Nye-TeeOff/backend/scrape_jobs.py

404 lines
14 KiB
Python
Raw Normal View History

2026-04-10 18:37:33 +02:00
import json
from datetime import date, datetime
from typing import Any, Iterable
# Job kinds accepted by enqueue_scrape_job(); the same values appear in the
# CHECK constraint on scrape_jobs.job_type.
SCRAPE_JOB_TYPES = (
    "banestatus",
    "medlemskap",
    "greenfee",
    "vtg",
)

# Lifecycle states of a job row; the same values appear in the CHECK
# constraint on scrape_jobs.status.
SCRAPE_JOB_STATUSES = (
    "pending",
    "running",
    "completed",
    "failed",
)
2026-04-11 16:01:36 +02:00
# Fallback used by format_scrape_job_row() when a row carries no max_attempts
# value; equal to the column default declared in the scrape_jobs DDL.
DEFAULT_MAX_ATTEMPTS: int = 3
2026-04-10 18:37:33 +02:00
def normalize_facility_ids(facility_ids: Iterable[int]) -> list[int]:
    """Return the unique, strictly positive facility ids in ascending order.

    ``None`` entries and values whose string form is blank are skipped.
    Every other value is passed to ``int()``, so clearly invalid input such
    as ``"abc"`` still raises ``ValueError``.

    Args:
        facility_ids: Iterable of ids; ints or int-like strings.

    Returns:
        Sorted list of distinct ids greater than zero.
    """
    cleaned = {
        int(facility_id)
        for facility_id in facility_ids
        # str(None) is "None" (truthy), so None needs its own check to be
        # treated as a blank entry instead of crashing int().
        if facility_id is not None and str(facility_id).strip()
    }
    return sorted(facility_id for facility_id in cleaned if facility_id > 0)
2026-04-11 16:01:36 +02:00
def _normalize_int_list(values: Iterable[Any]) -> list[int]:
    """Best-effort conversion of *values* to sorted, unique, positive ints.

    Values that ``int()`` cannot convert are silently dropped rather than
    raising, as are zero and negative numbers.

    Args:
        values: Arbitrary items, typically decoded from JSON.

    Returns:
        Sorted list of distinct integers greater than zero.
    """
    cleaned: set[int] = set()
    for value in values:
        try:
            cleaned.add(int(value))
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers int(float("inf")), which json.loads can
            # produce from the non-standard "Infinity" token.
            continue
    return sorted(item for item in cleaned if item > 0)
2026-04-10 18:37:33 +02:00
def _parse_json(value: Any, fallback: Any) -> Any:
    """Decode *value* when it is a JSON string; otherwise return it unchanged.

    Args:
        value: A JSON string, an already-decoded object, or ``None``.
        fallback: Returned when *value* is ``None`` or is a string that is
            not valid JSON.

    Returns:
        The decoded object, the untouched non-string *value*, or *fallback*.
    """
    if value is None:
        return fallback
    if not isinstance(value, str):
        # Already decoded (or at least not text): nothing to parse.
        return value
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        return fallback
    return decoded
def format_scrape_job_row(row: Any) -> dict[str, Any] | None:
    """Convert a ``scrape_jobs`` row into a plain, serialisation-friendly dict.

    Date/datetime columns become ISO-8601 strings, the JSON columns are
    decoded (falling back to an empty list/dict when they are missing,
    malformed or of the wrong container type), and the retry bookkeeping
    fields are coerced to ``bool``/``int``.

    Args:
        row: Anything ``dict()`` accepts (e.g. a database record), or ``None``.

    Returns:
        The normalised dict, or ``None`` when *row* is ``None``.
    """
    if row is None:
        return None

    job = dict(row)

    timestamp_columns = (
        "created_at",
        "started_at",
        "finished_at",
        "updated_at",
        "last_heartbeat_at",
        "next_retry_at",
        "last_error_at",
    )
    for column in timestamp_columns:
        stamp = job.get(column)
        if isinstance(stamp, (date, datetime)):
            job[column] = stamp.isoformat()

    facility_ids = _parse_json(job.get("facility_ids"), [])
    if not isinstance(facility_ids, list):
        facility_ids = []
    job["facility_ids"] = facility_ids

    result_summary = _parse_json(job.get("result_summary"), {})
    if not isinstance(result_summary, dict):
        result_summary = {}
    job["result_summary"] = result_summary

    overlapping = _parse_json(job.get("overlapping_facility_ids"), [])
    if not isinstance(overlapping, list):
        overlapping = []
    job["overlapping_facility_ids"] = _normalize_int_list(overlapping)

    job["retryable"] = bool(job.get("retryable", False))
    # "or" also replaces an explicit NULL/0 with the fallback value.
    job["attempt_count"] = int(job.get("attempt_count") or 0)
    job["max_attempts"] = int(job.get("max_attempts") or DEFAULT_MAX_ATTEMPTS)
    return job
2026-04-11 16:01:36 +02:00
def classify_scrape_error(exc: Exception) -> tuple[str, bool]:
    """Map an exception to an ``(error_code, retryable)`` pair.

    Classification is heuristic: it inspects the lower-cased message, the
    exception class name and the module that defines the class. Checks run
    in order and the first match wins.

    Args:
        exc: The exception raised while processing a scrape job.

    Returns:
        ``(error_code, retryable)`` where ``error_code`` is one of
        ``json_parse``, ``configuration``, ``timeout``, ``browser``,
        ``network``, ``database``, ``validation`` or ``unknown``.
    """
    message = str(exc).lower()
    type_name = type(exc).__name__.lower()
    module_name = type(exc).__module__.lower()

    # "jsondecodeerror" contains "json", so a single type-name test suffices.
    if "json" in type_name or "expecting value" in message:
        return "json_parse", False
    if "gemini_api_key" in message or "api_key" in message or "mangler i .env" in message:
        return "configuration", False
    # Parenthesised to make the and/or precedence explicit.
    if "permission denied" in message or ("not found" in message and "module" in message):
        return "configuration", False
    if "timeout" in message or "timed out" in message or "timeouterror" in type_name:
        return "timeout", True
    if "playwright" in module_name or "browser" in message or "page.goto" in message:
        return "browser", True
    if (
        # Covers the whole built-in family (reset, refused, aborted, broken
        # pipe), whose messages do not always contain the word "connection".
        isinstance(exc, ConnectionError)
        or "connectionreseterror" in type_name
        or "connection" in message
        or "dns" in message
        or "network" in message
    ):
        return "network", True
    if "asyncpg" in module_name or "postgres" in message or "database" in message:
        return "database", True
    if "valueerror" in type_name:
        return "validation", False
    return "unknown", False
def compute_retry_delay_seconds(attempt_count: int, error_code: str) -> int:
    """Return how many seconds to wait before retrying a failed job.

    The delay grows linearly with the attempt number and is capped at 300
    seconds. Error codes without a configured base delay yield ``0``.

    Args:
        attempt_count: Number of attempts made so far; values below 1 are
            treated as 1.
        error_code: Code as produced by ``classify_scrape_error``.

    Returns:
        Delay in seconds, between 0 and 300.
    """
    seconds_per_attempt = {
        "timeout": 30,
        "network": 45,
        "browser": 60,
        "database": 30,
        "unknown": 90,
    }
    if error_code not in seconds_per_attempt:
        return 0

    multiplier = attempt_count if attempt_count > 1 else 1
    delay = seconds_per_attempt[error_code] * multiplier
    return delay if delay < 300 else 300
2026-04-10 18:37:33 +02:00
async def ensure_scrape_jobs_table(conn) -> None:
    """Create the ``scrape_jobs`` table, its later-added columns and indexes.

    Every statement is guarded by ``IF NOT EXISTS``. The statements run one
    after another on *conn* in this order: table, column additions, indexes.

    Args:
        conn: Database connection exposing an awaitable ``execute(sql)``.
    """
    create_table = """
        CREATE TABLE IF NOT EXISTS scrape_jobs (
            id SERIAL PRIMARY KEY,
            job_type VARCHAR(50) NOT NULL CHECK (job_type IN ('banestatus', 'medlemskap', 'greenfee', 'vtg')),
            facility_ids JSONB NOT NULL DEFAULT '[]'::jsonb,
            total_facilities INTEGER NOT NULL DEFAULT 0,
            status VARCHAR(20) NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'completed', 'failed')),
            requested_by TEXT,
            worker_name TEXT,
            attempt_count INTEGER NOT NULL DEFAULT 0,
            max_attempts INTEGER NOT NULL DEFAULT 3,
            error_message TEXT,
            error_code TEXT,
            retryable BOOLEAN NOT NULL DEFAULT FALSE,
            result_summary JSONB NOT NULL DEFAULT '{}'::jsonb,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            started_at TIMESTAMPTZ,
            finished_at TIMESTAMPTZ,
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            last_heartbeat_at TIMESTAMPTZ,
            next_retry_at TIMESTAMPTZ,
            last_error_at TIMESTAMPTZ
        )
    """

    # Columns added to a table that exists without them.
    add_columns = (
        "ALTER TABLE scrape_jobs ADD COLUMN IF NOT EXISTS max_attempts INTEGER NOT NULL DEFAULT 3",
        "ALTER TABLE scrape_jobs ADD COLUMN IF NOT EXISTS error_code TEXT",
        "ALTER TABLE scrape_jobs ADD COLUMN IF NOT EXISTS retryable BOOLEAN NOT NULL DEFAULT FALSE",
        "ALTER TABLE scrape_jobs ADD COLUMN IF NOT EXISTS next_retry_at TIMESTAMPTZ",
        "ALTER TABLE scrape_jobs ADD COLUMN IF NOT EXISTS last_error_at TIMESTAMPTZ",
    )

    create_indexes = (
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_status_created_at
        ON scrape_jobs (status, created_at)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_job_type_created_at
        ON scrape_jobs (job_type, created_at DESC)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_status_next_retry_at
        ON scrape_jobs (status, next_retry_at)
        """,
    )

    for statement in (create_table, *add_columns, *create_indexes):
        await conn.execute(statement)
async def _find_active_job_conflict(conn, job_type: str, normalized_ids: list[int]) -> tuple[dict[str, Any] | None, str | None]:
    """Look for a pending/running job of *job_type* that shares facilities.

    Active jobs are scanned newest first. A job whose normalised facility
    list equals *normalized_ids* wins immediately, even if an overlapping
    job was seen earlier in the scan; otherwise the first overlapping job
    is reported.

    Args:
        conn: Database connection exposing an awaitable ``fetch``.
        job_type: Job type to search within.
        normalized_ids: Requested facility ids. Compared with list equality,
            so they should be in the sorted form ``normalize_facility_ids``
            produces.

    Returns:
        ``(job, "already_queued")`` for an identical job, ``(job, "conflict")``
        for a partial overlap, or ``(None, None)`` when nothing overlaps or
        *normalized_ids* is empty. The job dict has the shared ids stored
        under ``overlapping_facility_ids``.
    """
    if not normalized_ids:
        return None, None

    wanted = set(normalized_ids)
    active_rows = await conn.fetch(
        """
        SELECT *
        FROM scrape_jobs
        WHERE job_type = $1
          AND status IN ('pending', 'running')
        ORDER BY created_at DESC
        """,
        job_type,
    )

    first_overlap: dict[str, Any] | None = None
    for record in active_rows:
        job = format_scrape_job_row(record)
        if not job:
            continue

        job_ids = normalize_facility_ids(job.get("facility_ids", []))
        shared = sorted(wanted & set(job_ids))
        if not shared:
            continue

        job["overlapping_facility_ids"] = shared
        if job_ids == normalized_ids:
            return job, "already_queued"
        if first_overlap is None:
            first_overlap = job

    if first_overlap:
        return first_overlap, "conflict"
    return None, None
2026-04-10 18:37:33 +02:00
2026-04-11 16:01:36 +02:00
async def enqueue_scrape_job(pool, job_type: str, facility_ids: Iterable[int], requested_by: str | None = None) -> tuple[dict[str, Any], str]:
    """Queue a scrape job unless an active job already covers its facilities.

    Args:
        pool: Connection pool exposing ``acquire()`` as an async context
            manager.
        job_type: One of ``SCRAPE_JOB_TYPES``.
        facility_ids: Facilities to scrape; normalised with
            ``normalize_facility_ids`` before use.
        requested_by: Optional identifier stored in ``requested_by``.

    Returns:
        ``(job, status)``: the newly inserted job with ``"queued"``, or the
        existing active job with ``"already_queued"`` / ``"conflict"`` as
        reported by ``_find_active_job_conflict``.

    Raises:
        ValueError: If *job_type* is not a known job type.
    """
    if job_type not in SCRAPE_JOB_TYPES:
        raise ValueError(f"Ugyldig job_type: {job_type}")

    normalized_ids = normalize_facility_ids(facility_ids)
    ids_payload = json.dumps(normalized_ids)
    insert_sql = """
        INSERT INTO scrape_jobs (
            job_type,
            facility_ids,
            total_facilities,
            requested_by
        )
        VALUES ($1, $2::jsonb, $3, $4)
        RETURNING *
    """

    # NOTE(review): the conflict check and the insert share one transaction,
    # but no lock is taken here, so two concurrent callers can presumably
    # both pass the check - confirm whether that matters.
    async with pool.acquire() as conn, conn.transaction():
        conflict_job, conflict_status = await _find_active_job_conflict(conn, job_type, normalized_ids)
        if conflict_job and conflict_status:
            return conflict_job, conflict_status

        inserted = await conn.fetchrow(
            insert_sql,
            job_type,
            ids_payload,
            len(normalized_ids),
            requested_by,
        )
        return format_scrape_job_row(inserted), "queued"
2026-04-10 18:37:33 +02:00
async def list_scrape_jobs(pool, job_type: str | None = None, limit: int = 10) -> list[dict[str, Any]]:
    """Return the most recently created scrape jobs, newest first.

    Args:
        pool: Connection pool exposing ``acquire()`` as an async context
            manager.
        job_type: When truthy, only jobs of this type are returned.
        limit: Requested number of rows; clamped to the range 1..50.

    Returns:
        Job dicts as produced by ``format_scrape_job_row``.
    """
    safe_limit = max(1, min(limit, 50))

    if job_type:
        query = """
            SELECT *
            FROM scrape_jobs
            WHERE job_type = $1
            ORDER BY created_at DESC
            LIMIT $2
        """
        params = (job_type, safe_limit)
    else:
        query = """
            SELECT *
            FROM scrape_jobs
            ORDER BY created_at DESC
            LIMIT $1
        """
        params = (safe_limit,)

    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
    return [format_scrape_job_row(row) for row in rows]
2026-04-11 16:01:36 +02:00
async def claim_next_scrape_job(pool, worker_name: str, stale_heartbeat_seconds: int = 120) -> dict[str, Any] | None:
    """Recover stale jobs, then claim the oldest due pending job for a worker.

    Inside one transaction this:

    1. moves ``running`` jobs whose heartbeat is older than
       *stale_heartbeat_seconds* back to ``pending`` (retry in 15 seconds)
       when they still have attempts left;
    2. marks such stale jobs ``failed`` when their attempts are used up;
    3. picks the oldest ``pending`` job whose ``next_retry_at`` (or, when
       that is NULL, ``created_at``) is not in the future, using
       ``FOR UPDATE SKIP LOCKED``, and marks it ``running`` for
       *worker_name* with ``attempt_count`` incremented.

    Args:
        pool: Connection pool exposing ``acquire()`` as an async context
            manager.
        worker_name: Name stored in ``worker_name`` on the claimed job.
        stale_heartbeat_seconds: Heartbeat age after which a running job is
            considered abandoned.

    Returns:
        The claimed job as formatted by ``format_scrape_job_row``, or
        ``None`` when the claim query returns no row.
    """
    requeue_stale_sql = """
        UPDATE scrape_jobs
        SET status = 'pending',
            worker_name = NULL,
            updated_at = NOW(),
            retryable = TRUE,
            error_code = COALESCE(error_code, 'worker_stale'),
            error_message = COALESCE(error_message, 'Forrige worker mistet heartbeat før jobben ble ferdig.'),
            last_error_at = NOW(),
            next_retry_at = NOW() + INTERVAL '15 seconds'
        WHERE status = 'running'
          AND attempt_count < max_attempts
          AND last_heartbeat_at IS NOT NULL
          AND last_heartbeat_at < NOW() - ($1 * INTERVAL '1 second')
    """
    fail_stale_sql = """
        UPDATE scrape_jobs
        SET status = 'failed',
            worker_name = NULL,
            updated_at = NOW(),
            finished_at = NOW(),
            retryable = FALSE,
            error_code = COALESCE(error_code, 'worker_stale'),
            error_message = COALESCE(error_message, 'Forrige worker mistet heartbeat og maks forsok er oppbrukt.'),
            last_error_at = NOW(),
            next_retry_at = NULL
        WHERE status = 'running'
          AND attempt_count >= max_attempts
          AND last_heartbeat_at IS NOT NULL
          AND last_heartbeat_at < NOW() - ($1 * INTERVAL '1 second')
    """
    # NOTE(review): the claim clears error_message but leaves error_code, so
    # the COALESCE calls above can pair an older error_code with the
    # stale-worker message - confirm this is intended.
    claim_sql = """
        WITH next_job AS (
            SELECT id
            FROM scrape_jobs
            WHERE status = 'pending'
              AND COALESCE(next_retry_at, created_at) <= NOW()
            ORDER BY created_at ASC
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        UPDATE scrape_jobs AS job
        SET status = 'running',
            worker_name = $1,
            attempt_count = job.attempt_count + 1,
            started_at = COALESCE(job.started_at, NOW()),
            updated_at = NOW(),
            last_heartbeat_at = NOW(),
            error_message = NULL,
            next_retry_at = NULL
        FROM next_job
        WHERE job.id = next_job.id
        RETURNING job.*
    """

    async with pool.acquire() as conn, conn.transaction():
        await conn.execute(requeue_stale_sql, stale_heartbeat_seconds)
        await conn.execute(fail_stale_sql, stale_heartbeat_seconds)
        claimed = await conn.fetchrow(claim_sql, worker_name)
        return format_scrape_job_row(claimed)
async def heartbeat_scrape_job(pool, job_id: int) -> None:
    """Stamp ``last_heartbeat_at`` and ``updated_at`` of a job with ``NOW()``.

    The update is keyed on the id only; the job's status is not checked.

    Args:
        pool: Connection pool exposing ``acquire()`` as an async context
            manager.
        job_id: Primary key of the job.
    """
    touch_sql = """
        UPDATE scrape_jobs
        SET last_heartbeat_at = NOW(),
            updated_at = NOW()
        WHERE id = $1
    """
    async with pool.acquire() as conn:
        await conn.execute(touch_sql, job_id)
async def complete_scrape_job(pool, job_id: int, result_summary: dict[str, Any] | None = None) -> None:
    """Mark a job ``completed``, store its summary and clear its error state.

    Args:
        pool: Connection pool exposing ``acquire()`` as an async context
            manager.
        job_id: Primary key of the job.
        result_summary: JSON-serialisable summary; ``None`` or an empty dict
            is stored as ``{}``.
    """
    summary_json = json.dumps(result_summary if result_summary else {})
    complete_sql = """
        UPDATE scrape_jobs
        SET status = 'completed',
            finished_at = NOW(),
            updated_at = NOW(),
            last_heartbeat_at = NOW(),
            error_message = NULL,
            error_code = NULL,
            retryable = FALSE,
            next_retry_at = NULL,
            result_summary = $2::jsonb
        WHERE id = $1
    """
    async with pool.acquire() as conn:
        await conn.execute(complete_sql, job_id, summary_json)
2026-04-11 16:01:36 +02:00
async def fail_scrape_job(
    pool,
    job_id: int,
    error_message: str,
    result_summary: dict[str, Any] | None = None,
    *,
    error_code: str = "unknown",
    retryable: bool = False,
    retry_delay_seconds: int = 0,
) -> dict[str, Any] | None:
    """Record a failure and either schedule a retry or mark the job failed.

    When *retryable* is true and the job's ``attempt_count`` is still below
    ``max_attempts``, the job returns to ``pending`` with ``worker_name``
    cleared and ``next_retry_at`` set *retry_delay_seconds* from now.
    Otherwise it becomes ``failed`` with ``finished_at`` set to now.

    Args:
        pool: Connection pool exposing ``acquire()`` as an async context
            manager.
        job_id: Primary key of the job.
        error_message: Human-readable error; only the first 1000 characters
            are stored.
        result_summary: JSON-serialisable summary; ``None`` or an empty dict
            is stored as ``{}``.
        error_code: Classification stored in ``error_code``.
        retryable: Whether the failure may be retried.
        retry_delay_seconds: Delay before the retry; negative values are
            treated as 0.

    Returns:
        The updated job as formatted by ``format_scrape_job_row``, or
        ``None`` when the update returns no row.
    """
    summary_json = json.dumps(result_summary if result_summary else {})
    stored_message = error_message[:1000]
    delay_seconds = max(0, retry_delay_seconds)

    # "$4 AND attempt_count < max_attempts" is the single retry condition;
    # it is repeated in each CASE so all columns are decided by one UPDATE.
    fail_sql = """
        UPDATE scrape_jobs
        SET status = CASE
                WHEN $4 AND attempt_count < max_attempts THEN 'pending'
                ELSE 'failed'
            END,
            finished_at = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NULL
                ELSE NOW()
            END,
            updated_at = NOW(),
            last_heartbeat_at = NOW(),
            error_message = $2,
            error_code = $3,
            retryable = $4,
            result_summary = $5::jsonb,
            last_error_at = NOW(),
            worker_name = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NULL
                ELSE worker_name
            END,
            next_retry_at = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NOW() + ($6 * INTERVAL '1 second')
                ELSE NULL
            END
        WHERE id = $1
        RETURNING *
    """

    async with pool.acquire() as conn:
        updated = await conn.fetchrow(
            fail_sql,
            job_id,
            stored_message,
            error_code,
            retryable,
            summary_json,
            delay_seconds,
        )
    return format_scrape_job_row(updated)