"""Background worker that claims and executes scrape jobs stored in Postgres.

The worker polls for a job with ``claim_next_scrape_job``. It runs each claimed
job via ``run_scrape_job`` while a side task periodically calls
``heartbeat_scrape_job``. It then records the outcome: the job is completed,
re-queued for retry, or marked as permanently failed.

Configuration (environment variables):

* ``SCRAPE_WORKER_NAME`` -- worker identifier passed when claiming jobs
  (default ``scrape-worker-<pid>``).
* ``SCRAPE_WORKER_POLL_INTERVAL`` -- seconds to sleep when no job is available
  (default 5).
* ``SCRAPE_WORKER_HEARTBEAT_INTERVAL`` -- seconds between heartbeats (default 15).
* ``SCRAPE_WORKER_STALE_HEARTBEAT_SECONDS`` -- passed to ``claim_next_scrape_job``;
  defaults to 4x the heartbeat interval, but at least 60. NOTE(review):
  presumably lets the claim pick up jobs whose worker stopped heartbeating --
  confirm in scrape_jobs.
"""

import asyncio
import os
import traceback

import asyncpg

from env_config import get_database_url
from scrape_job_runner import run_scrape_job
from scrape_jobs import (
    classify_scrape_error,
    claim_next_scrape_job,
    complete_scrape_job,
    compute_retry_delay_seconds,
    ensure_scrape_jobs_table,
    fail_scrape_job,
    heartbeat_scrape_job,
    update_scrape_job_progress,
)

DB_URL = get_database_url()
WORKER_NAME = os.getenv("SCRAPE_WORKER_NAME", f"scrape-worker-{os.getpid()}")
POLL_INTERVAL_SECONDS = int(os.getenv("SCRAPE_WORKER_POLL_INTERVAL", "5"))
HEARTBEAT_INTERVAL_SECONDS = int(os.getenv("SCRAPE_WORKER_HEARTBEAT_INTERVAL", "15"))
STALE_HEARTBEAT_SECONDS = int(
    os.getenv("SCRAPE_WORKER_STALE_HEARTBEAT_SECONDS", str(max(HEARTBEAT_INTERVAL_SECONDS * 4, 60)))
)

# Number of traceback frames kept when a job fails (stored with the failure and printed).
_TRACEBACK_MAX_FRAMES = 5


async def heartbeat_loop(pool, job_id: int, stop_event: asyncio.Event) -> None:
    """Send a heartbeat for ``job_id`` every HEARTBEAT_INTERVAL_SECONDS until stopped.

    Waiting on ``stop_event`` with a timeout, instead of sleeping, lets the loop
    exit as soon as the caller sets the event. A failed heartbeat is logged and
    the loop keeps going.
    """
    while not stop_event.is_set():
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=HEARTBEAT_INTERVAL_SECONDS)
        except asyncio.TimeoutError:
            # The interval elapsed without a stop request: refresh the heartbeat.
            try:
                await heartbeat_scrape_job(pool, job_id)
            except Exception as exc:
                print(f"Warning: Klarte ikke å oppdatere heartbeat for jobb {job_id}: {exc}")


def _error_message(exc: BaseException) -> str:
    """Return ``str(exc)``, or the exception class name when that string is empty."""
    # Some exceptions (e.g. a bare asyncio.TimeoutError()) stringify to "",
    # which would otherwise record a blank error message.
    return str(exc) or type(exc).__name__


def _format_traceback(exc: BaseException) -> str:
    """Format the innermost ``_TRACEBACK_MAX_FRAMES`` frames of ``exc``'s traceback.

    A negative ``limit`` keeps the *last* frames, i.e. the ones closest to where
    the error was raised. A positive limit would keep the outermost frames (this
    worker's own call into ``run_scrape_job``) and drop the frame that actually
    failed.
    """
    return "".join(
        traceback.format_exception(
            type(exc), exc, exc.__traceback__, limit=-_TRACEBACK_MAX_FRAMES
        )
    )


async def _record_failure(pool, job, exc: Exception) -> None:
    """Classify ``exc``, persist the failure for ``job`` via ``fail_scrape_job`` and log it."""
    job_id = job["id"]
    trace = _format_traceback(exc)
    message = _error_message(exc)
    error_code, retryable = classify_scrape_error(exc)
    # Only retryable errors get a back-off delay; attempt_count falls back to 1 when missing/0.
    retry_delay_seconds = (
        compute_retry_delay_seconds(int(job.get("attempt_count") or 1), error_code)
        if retryable
        else 0
    )
    updated_job = await fail_scrape_job(
        pool,
        job_id,
        message,
        {
            "error_code": error_code,
            "traceback": trace,
        },
        error_code=error_code,
        retryable=retryable,
        retry_delay_seconds=retry_delay_seconds,
    )
    # A job returned in "pending" status has been queued for another attempt.
    # Any other status, or no returned row, is reported as a permanent failure.
    if updated_job and updated_job.get("status") == "pending":
        print(
            f"Jobb #{job_id} feilet midlertidig ({error_code}). "
            f"Nytt forsøk {updated_job.get('attempt_count', 0)}/"
            f"{updated_job.get('max_attempts', 0)} om {retry_delay_seconds} sek."
        )
    else:
        print(f"Jobb #{job_id} feilet permanent ({error_code}): {message}\n{trace}")


async def _run_job(pool, job) -> None:
    """Run one claimed job with a live heartbeat, then record success or failure."""
    job_id = job["id"]
    # `or []` also covers facility_ids being present but None (e.g. a NULL column).
    # len(None) here would crash the whole worker outside the job's error handling.
    print(
        f"Worker plukket jobb #{job_id} ({job['job_type']}) "
        f"for {len(job.get('facility_ids') or [])} anlegg"
    )

    stop_event = asyncio.Event()
    heartbeat_task = asyncio.create_task(heartbeat_loop(pool, job_id, stop_event))
    try:
        async def progress_callback(payload):
            await update_scrape_job_progress(pool, job_id, **payload)

        result_summary = await run_scrape_job(job, progress_callback=progress_callback)
        await complete_scrape_job(pool, job_id, result_summary)
        print(f"Jobb #{job_id} fullført")
    except Exception as exc:
        await _record_failure(pool, job, exc)
    finally:
        # Always stop the heartbeat and wait for it, so it never outlives the job.
        stop_event.set()
        await heartbeat_task


async def main() -> None:
    """Run the worker loop forever: claim a job and process it, or sleep when idle.

    Creates the connection pool, ensures the jobs table exists, and closes the
    pool when the loop exits (including on an unhandled error).
    """
    print(f"Starter scrape worker: {WORKER_NAME}")
    pool = await asyncpg.create_pool(DB_URL, min_size=1, max_size=5, command_timeout=60)
    try:
        async with pool.acquire() as conn:
            await ensure_scrape_jobs_table(conn)

        while True:
            job = await claim_next_scrape_job(
                pool, WORKER_NAME, stale_heartbeat_seconds=STALE_HEARTBEAT_SECONDS
            )
            if not job:
                await asyncio.sleep(POLL_INTERVAL_SECONDS)
                continue
            await _run_job(pool, job)
    finally:
        await pool.close()


if __name__ == "__main__":
    asyncio.run(main())