106 lines
3.9 KiB
Python
Executable file
106 lines
3.9 KiB
Python
Executable file
import asyncio
|
|
import os
|
|
import traceback
|
|
|
|
import asyncpg
|
|
|
|
from env_config import get_database_url
|
|
from scrape_job_runner import run_scrape_job
|
|
from scrape_jobs import (
|
|
classify_scrape_error,
|
|
claim_next_scrape_job,
|
|
complete_scrape_job,
|
|
compute_retry_delay_seconds,
|
|
ensure_scrape_jobs_table,
|
|
fail_scrape_job,
|
|
heartbeat_scrape_job,
|
|
update_scrape_job_progress,
|
|
)
|
|
|
|
# Connection string for the asyncpg pool created in main(); resolved once, at import time.
DB_URL = get_database_url()
|
|
def _env_int(name: str, default: int, *, minimum: int = 1) -> int:
    """Read an integer setting from environment variable ``name``.

    Falls back to ``default`` when the variable is unset.

    Raises:
        ValueError: if the value is not an integer or is below ``minimum``.
            The message names the variable so a misconfigured deployment
            fails at startup with an actionable error.
    """
    raw = os.getenv(name)
    if raw is None:
        value = default
    else:
        try:
            value = int(raw)
        except ValueError:
            raise ValueError(f"{name} must be an integer, got {raw!r}") from None
    if value < minimum:
        raise ValueError(f"{name} must be >= {minimum}, got {value}")
    return value


# Identifies this process when claiming jobs; defaults to a PID-based name.
WORKER_NAME = os.getenv("SCRAPE_WORKER_NAME", f"scrape-worker-{os.getpid()}")

# Seconds to sleep between claim attempts when no job is available.
# Must be >= 1: a value of 0 would poll the database in a tight loop.
POLL_INTERVAL_SECONDS = _env_int("SCRAPE_WORKER_POLL_INTERVAL", 5)

# Seconds between heartbeat updates while a job runs.
# Must be >= 1: a value of 0 makes the wait in heartbeat_loop time out
# immediately on every iteration, hammering the database.
HEARTBEAT_INTERVAL_SECONDS = _env_int("SCRAPE_WORKER_HEARTBEAT_INTERVAL", 15)

# Passed to claim_next_scrape_job as stale_heartbeat_seconds. Presumably this is
# the heartbeat age after which a running job may be reclaimed (TODO confirm
# in scrape_jobs). Defaults to four heartbeat intervals, but never below 60 s.
STALE_HEARTBEAT_SECONDS = _env_int(
    "SCRAPE_WORKER_STALE_HEARTBEAT_SECONDS",
    max(HEARTBEAT_INTERVAL_SECONDS * 4, 60),
)
|
|
|
|
|
|
async def heartbeat_loop(pool, job_id: int, stop_event: asyncio.Event) -> None:
    """Refresh the heartbeat of job ``job_id`` until ``stop_event`` is set.

    Each time HEARTBEAT_INTERVAL_SECONDS elapse without a stop request,
    ``heartbeat_scrape_job`` is called. If that call raises, a warning is
    printed and the loop keeps going, so a failed heartbeat never aborts the
    job it belongs to.

    Args:
        pool: Database pool handed through to ``heartbeat_scrape_job``.
        job_id: Id of the job whose heartbeat is refreshed.
        stop_event: Set by the caller when the job is finished.
    """
    while not stop_event.is_set():
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=HEARTBEAT_INTERVAL_SECONDS)
            # The stop event fired before the interval elapsed. Re-check the
            # loop condition instead of sending another heartbeat.
            continue
        except asyncio.TimeoutError:
            pass

        # A full interval passed with no stop request, so refresh the heartbeat.
        try:
            await heartbeat_scrape_job(pool, job_id)
        except Exception as err:
            print(f"Warning: Klarte ikke å oppdatere heartbeat for jobb {job_id}: {err}")
|
|
|
|
|
|
async def _record_job_failure(pool, job, exc: Exception) -> None:
    """Classify ``exc``, persist it as the failure of ``job``, and log the outcome.

    The retry delay comes from ``compute_retry_delay_seconds`` when
    ``classify_scrape_error`` reports the error as retryable; otherwise it is 0.
    Exceptions raised while recording the failure propagate to the caller.
    """
    job_id = job["id"]
    # A negative limit keeps the innermost frames, where the error was raised.
    # A positive limit would keep only the outermost frames, starting at main(),
    # and usually truncate the actual failure point away.
    trace = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__, limit=-5)
    )
    error_code, retryable = classify_scrape_error(exc)
    retry_delay_seconds = (
        compute_retry_delay_seconds(int(job.get("attempt_count") or 1), error_code)
        if retryable
        else 0
    )
    # Some exceptions (e.g. asyncio.TimeoutError()) stringify to "". Fall back to
    # the class name so the stored error message is never empty.
    error_message = str(exc) or type(exc).__name__

    updated_job = await fail_scrape_job(
        pool,
        job_id,
        error_message,
        {
            "error_code": error_code,
            "traceback": trace,
        },
        error_code=error_code,
        retryable=retryable,
        retry_delay_seconds=retry_delay_seconds,
    )
    if updated_job and updated_job.get("status") == "pending":
        print(
            f"Jobb #{job_id} feilet midlertidig ({error_code}). "
            f"Nytt forsøk {updated_job.get('attempt_count', 0)}/"
            f"{updated_job.get('max_attempts', 0)} om {retry_delay_seconds} sek."
        )
    else:
        print(f"Jobb #{job_id} feilet permanent ({error_code}): {error_message}\n{trace}")


async def _run_claimed_job(pool, job) -> None:
    """Run one claimed job and record whether it completed or failed.

    A heartbeat task runs alongside the scrape. It is always stopped and
    awaited before this returns. Any ``Exception`` from the scrape, or from
    marking the job complete, is handed to ``_record_job_failure`` instead of
    propagating.
    """
    job_id = job["id"]
    stop_event = asyncio.Event()
    heartbeat_task = asyncio.create_task(heartbeat_loop(pool, job_id, stop_event))

    # Closes over this call's job_id, so it can never report progress for a
    # later job, even if it is invoked after this function returns.
    async def progress_callback(payload):
        await update_scrape_job_progress(pool, job_id, **payload)

    try:
        result_summary = await run_scrape_job(job, progress_callback=progress_callback)
        await complete_scrape_job(pool, job_id, result_summary)
        print(f"Jobb #{job_id} fullført")
    except Exception as exc:
        await _record_job_failure(pool, job, exc)
    finally:
        stop_event.set()
        await heartbeat_task


async def main() -> None:
    """Run the scrape worker loop; it has no exit path of its own.

    Creates the connection pool and calls ``ensure_scrape_jobs_table``. It then
    repeatedly claims the next job via ``claim_next_scrape_job``, passing
    STALE_HEARTBEAT_SECONDS (presumably so stale running jobs can be reclaimed —
    see scrape_jobs). When no job is available it sleeps POLL_INTERVAL_SECONDS.
    The pool is closed when the loop ends through an exception or cancellation.
    """
    print(f"Starter scrape worker: {WORKER_NAME}")
    pool = await asyncpg.create_pool(DB_URL, min_size=1, max_size=5, command_timeout=60)

    try:
        async with pool.acquire() as conn:
            await ensure_scrape_jobs_table(conn)

        while True:
            job = await claim_next_scrape_job(
                pool, WORKER_NAME, stale_heartbeat_seconds=STALE_HEARTBEAT_SECONDS
            )
            if not job:
                await asyncio.sleep(POLL_INTERVAL_SECONDS)
                continue

            # facility_ids may be present but NULL. Without `or []`, len(None)
            # would raise here, outside the per-job error handling, and kill
            # the worker.
            facility_count = len(job.get("facility_ids") or [])
            print(
                f"Worker plukket jobb #{job['id']} ({job['job_type']}) "
                f"for {facility_count} anlegg"
            )
            await _run_claimed_job(pool, job)
    finally:
        await pool.close()
|
|
|
|
|
|
if __name__ == "__main__":
    # Only start the worker when executed as a script, not when imported.
    # main() contains an endless loop, so asyncio.run returns only when it
    # raises or the process is interrupted.
    worker = main()
    asyncio.run(worker)
|