Nye-TeeOff/backend/worker.py

107 lines
3.9 KiB
Python
Raw Permalink Normal View History

2026-04-10 18:37:33 +02:00
import asyncio
import os
import traceback
import asyncpg
from env_config import get_database_url
2026-04-10 18:37:33 +02:00
from scrape_job_runner import run_scrape_job
from scrape_jobs import (
2026-04-11 16:01:36 +02:00
classify_scrape_error,
2026-04-10 18:37:33 +02:00
claim_next_scrape_job,
complete_scrape_job,
2026-04-11 16:01:36 +02:00
compute_retry_delay_seconds,
2026-04-10 18:37:33 +02:00
ensure_scrape_jobs_table,
fail_scrape_job,
heartbeat_scrape_job,
2026-04-12 10:11:23 +02:00
update_scrape_job_progress,
2026-04-10 18:37:33 +02:00
)
# Database connection string, resolved once when the module is imported.
DB_URL = get_database_url()

# Name this worker passes to claim_next_scrape_job (default: "scrape-worker-<pid>").
WORKER_NAME = os.environ.get("SCRAPE_WORKER_NAME", f"scrape-worker-{os.getpid()}")

# Seconds to sleep between polls when no job is available.
POLL_INTERVAL_SECONDS = int(os.environ.get("SCRAPE_WORKER_POLL_INTERVAL", "5"))

# Seconds between heartbeat writes while a job is running.
HEARTBEAT_INTERVAL_SECONDS = int(os.environ.get("SCRAPE_WORKER_HEARTBEAT_INTERVAL", "15"))

# Passed to claim_next_scrape_job as `stale_heartbeat_seconds`; presumably the heartbeat
# age after which a running job may be re-claimed (confirm in scrape_jobs). The default
# is four heartbeat intervals with a 60-second floor; an explicit env value is used as-is.
STALE_HEARTBEAT_SECONDS = int(
    os.environ.get(
        "SCRAPE_WORKER_STALE_HEARTBEAT_SECONDS",
        str(max(60, 4 * HEARTBEAT_INTERVAL_SECONDS)),
    )
)
async def heartbeat_loop(pool, job_id: int, stop_event: asyncio.Event) -> None:
    """Refresh the heartbeat of ``job_id`` until ``stop_event`` is set.

    Waits up to ``HEARTBEAT_INTERVAL_SECONDS`` for the event. If it has not been
    set by then, ``heartbeat_scrape_job`` is called and the wait starts over.
    The coroutine returns as soon as the event is observed as set.

    Args:
        pool: Database pool handed through to ``heartbeat_scrape_job``.
        job_id: Id of the job whose heartbeat is refreshed.
        stop_event: Set by the caller once the job has finished.
    """
    while True:
        if stop_event.is_set():
            return
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=HEARTBEAT_INTERVAL_SECONDS)
        except asyncio.TimeoutError:
            # No stop signal within one interval: fall through and send a heartbeat.
            pass
        else:
            # wait() completed, meaning the event was set.
            return
        try:
            await heartbeat_scrape_job(pool, job_id)
        except Exception as exc:
            # A failed heartbeat is only logged; the loop carries on with the next interval.
            print(f"Warning: Klarte ikke å oppdatere heartbeat for jobb {job_id}: {exc}")
def _transient_db_errors():
    """Return the exception types the worker tolerates in its bookkeeping DB calls.

    These are asyncpg server/interface errors plus OS-level connection errors and
    timeouts. They are logged, and the worker keeps running instead of the whole
    process exiting. Any other exception type (e.g. a TypeError from a programming
    mistake) still propagates. Built lazily so ``asyncpg`` is only touched when an
    exception is actually being matched.
    """
    return (asyncpg.PostgresError, asyncpg.InterfaceError, OSError, asyncio.TimeoutError)


async def _record_job_failure(pool, job, exc: Exception, trace: str) -> None:
    """Classify ``exc``, persist it via ``fail_scrape_job`` and log the outcome.

    Retryable errors get a backoff from ``compute_retry_delay_seconds``, based on
    the job's ``attempt_count`` (1 when missing, NULL or 0). Non-retryable errors
    use a delay of 0. The log reports a retry when the updated job comes back with
    status ``"pending"``, and a permanent failure otherwise.

    Args:
        pool: Database pool used by ``fail_scrape_job``.
        job: The claimed job mapping (must support ``job["id"]`` and ``.get``).
        exc: The exception that aborted the job.
        trace: Pre-formatted traceback text stored alongside the error.
    """
    job_id = job["id"]
    error_code, retryable = classify_scrape_error(exc)
    retry_delay_seconds = (
        compute_retry_delay_seconds(int(job.get("attempt_count") or 1), error_code)
        if retryable
        else 0
    )
    updated_job = await fail_scrape_job(
        pool,
        job_id,
        str(exc),
        {
            "error_code": error_code,
            "traceback": trace,
        },
        error_code=error_code,
        retryable=retryable,
        retry_delay_seconds=retry_delay_seconds,
    )
    if updated_job and updated_job.get("status") == "pending":
        print(
            f"Jobb #{job_id} feilet midlertidig ({error_code}). "
            f"Nytt forsøk {updated_job.get('attempt_count', 0)}/"
            f"{updated_job.get('max_attempts', 0)} om {retry_delay_seconds} sek."
        )
    else:
        print(f"Jobb #{job_id} feilet permanent ({error_code}): {exc}\n{trace}")


async def _process_job(pool, job) -> None:
    """Execute one claimed job and record its outcome.

    A heartbeat task runs while ``run_scrape_job`` executes. On success the job
    is marked complete. On any ``Exception`` the failure is recorded through
    ``_record_job_failure``. The heartbeat task is always stopped and awaited
    before returning.
    """
    job_id = job["id"]
    # facility_ids may be absent or NULL. `or []` keeps len() from raising a
    # TypeError here, outside the try below. Such an error would otherwise end
    # the worker with this job still claimed.
    facility_ids = job.get("facility_ids") or []
    print(
        f"Worker plukket jobb #{job_id} ({job['job_type']}) "
        f"for {len(facility_ids)} anlegg"
    )

    stop_event = asyncio.Event()
    heartbeat_task = asyncio.create_task(heartbeat_loop(pool, job_id, stop_event))
    try:
        async def progress_callback(payload):
            # Progress reporting is bookkeeping, like the heartbeat. A transient DB
            # error here is logged instead of aborting the scrape itself.
            try:
                await update_scrape_job_progress(pool, job_id, **payload)
            except _transient_db_errors() as exc:
                print(f"Warning: Klarte ikke å oppdatere fremdrift for jobb {job_id}: {exc}")

        result_summary = await run_scrape_job(job, progress_callback=progress_callback)
        await complete_scrape_job(pool, job_id, result_summary)
        print(f"Jobb #{job_id} fullført")
    except Exception as exc:
        # Format the traceback here, while the exception is still being handled.
        trace = traceback.format_exc(limit=5)
        try:
            await _record_job_failure(pool, job, exc, trace)
        except _transient_db_errors() as report_exc:
            # Recording the failure hit a DB/connection error. Log both errors and
            # keep the worker alive rather than letting the process exit.
            print(
                f"Warning: Klarte ikke å registrere feil for jobb #{job_id}: {report_exc}\n"
                f"Opprinnelig feil: {exc}\n{trace}"
            )
    finally:
        stop_event.set()
        await heartbeat_task


async def main() -> None:
    """Run the scrape worker loop until the process is stopped.

    Creates an asyncpg pool, ensures the jobs table exists, then repeatedly
    claims the next job and processes it. When no job is available, or polling
    hits a transient DB/connection error, it sleeps ``POLL_INTERVAL_SECONDS``
    first. The pool is closed when the loop exits.
    """
    print(f"Starter scrape worker: {WORKER_NAME}")
    pool = await asyncpg.create_pool(DB_URL, min_size=1, max_size=5, command_timeout=60)
    try:
        async with pool.acquire() as conn:
            await ensure_scrape_jobs_table(conn)
        while True:
            try:
                job = await claim_next_scrape_job(
                    pool, WORKER_NAME, stale_heartbeat_seconds=STALE_HEARTBEAT_SECONDS
                )
            except _transient_db_errors() as exc:
                # A transient DB error while polling must not end a long-running
                # worker. Back off and poll again.
                print(f"Warning: Klarte ikke å hente neste jobb: {exc}")
                await asyncio.sleep(POLL_INTERVAL_SECONDS)
                continue
            if not job:
                await asyncio.sleep(POLL_INTERVAL_SECONDS)
                continue
            await _process_job(pool, job)
    finally:
        await pool.close()


if __name__ == "__main__":
    asyncio.run(main())