Nye-TeeOff/backend/scrape_jobs.py
2026-04-15 08:15:53 +02:00

548 lines
21 KiB
Python
Executable file

import json
from datetime import date, datetime
from typing import Any, Iterable
# Job types the queue accepts. The CHECK constraints created in
# ensure_scrape_jobs_table list the same values; keep them in sync.
SCRAPE_JOB_TYPES = (
    "banestatus",
    "medlemskap",
    "greenfee",
    "vtg",
    "golfpakker",
)
# Lifecycle states of a job (also enforced by a CHECK constraint on status).
SCRAPE_JOB_STATUSES = (
    "pending",
    "running",
    "completed",
    "failed",
)
# Fallback max_attempts used by format_scrape_job_row.
DEFAULT_MAX_ATTEMPTS = 3
# How many of the newest progress events are kept in scrape_jobs.recent_events.
DEFAULT_RECENT_EVENTS_LIMIT = 12
def normalize_facility_ids(facility_ids: Iterable[int]) -> list[int]:
    """Return the distinct positive facility ids from *facility_ids*, sorted ascending.

    Items whose string form is blank (``""``, whitespace) are ignored; every
    other item is passed to ``int()``, so a non-numeric value raises
    ``ValueError``/``TypeError`` rather than being skipped. Zero and negative
    ids are dropped after conversion.
    """
    unique_ids: set[int] = set()
    for raw in facility_ids:
        if not str(raw).strip():
            continue
        unique_ids.add(int(raw))
    positive = [candidate for candidate in unique_ids if candidate > 0]
    positive.sort()
    return positive
def _normalize_int_list(values: Iterable[Any]) -> list[int]:
cleaned: set[int] = set()
for value in values:
try:
cleaned.add(int(value))
except (TypeError, ValueError):
continue
return sorted(item for item in cleaned if item > 0)
def _parse_json(value: Any, fallback: Any) -> Any:
    """Decode *value* if it is a JSON string; pass other values through.

    Returns *fallback* for ``None`` and for strings that are not valid JSON.
    Non-string values (e.g. an already-decoded list or dict) are returned
    unchanged. A string that decodes to JSON ``null`` yields ``None``, not
    *fallback*.
    """
    if value is None:
        return fallback
    if not isinstance(value, str):
        return value
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        decoded = fallback
    return decoded
def _normalize_progress_event(value: Any) -> dict[str, Any] | None:
    """Coerce one progress event into a dict with a fixed set of keys.

    *value* may be a dict or a JSON string (decoded with ``_parse_json``);
    anything that does not decode to a dict yields ``None``.

    Returned keys: ``timestamp``, ``facility_name``, ``message`` (strings,
    default ``""``), ``outcome`` (default ``"info"``), ``facility_id`` (int, or
    ``None`` unless the value is a non-negative integer string/number) and
    ``processed``/``total`` (ints, default 0).

    Unusable values fall back to these defaults instead of raising. These events
    are re-read from ``recent_events`` every time a job row is formatted, so a
    single malformed event must not break job listing.
    """
    parsed = _parse_json(value, None)
    if not isinstance(parsed, dict):
        return None

    def _count(key: str) -> int:
        # Counters are informational; "n/a", a dict, NaN or inf become 0
        # rather than raising.
        try:
            return int(parsed.get(key) or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    # isdecimal() rather than isdigit(): characters such as "²" satisfy
    # isdigit() but make int() raise ValueError.
    raw_facility_id = str(parsed.get("facility_id") or "").strip()
    return {
        "timestamp": str(parsed.get("timestamp") or ""),
        "facility_id": int(raw_facility_id) if raw_facility_id.isdecimal() else None,
        "facility_name": str(parsed.get("facility_name") or ""),
        "outcome": str(parsed.get("outcome") or "info"),
        "message": str(parsed.get("message") or ""),
        "processed": _count("processed"),
        "total": _count("total"),
    }
def format_scrape_job_row(row: Any) -> dict[str, Any] | None:
    """Convert a ``scrape_jobs`` row into a JSON-friendly dict.

    - Timestamp columns are rendered with ``isoformat()``.
    - JSON columns (``facility_ids``, ``result_summary``, ``recent_events``)
      are decoded and replaced by an empty list/dict when malformed.
    - ``overlapping_facility_ids`` (present only when set by the caller) is
      normalized to sorted positive ints.
    - Counter columns are coerced to ``int``.

    Returns ``None`` when *row* is ``None``. *row* can be anything ``dict()``
    accepts (presumably an asyncpg ``Record`` — confirm with callers).
    """
    if row is None:
        return None
    data = dict(row)
    for key in ("created_at", "started_at", "finished_at", "updated_at", "last_heartbeat_at", "next_retry_at", "last_error_at"):
        if isinstance(data.get(key), (date, datetime)):
            data[key] = data[key].isoformat()

    facility_ids = _parse_json(data.get("facility_ids"), [])
    data["facility_ids"] = facility_ids if isinstance(facility_ids, list) else []

    result_summary = _parse_json(data.get("result_summary"), {})
    data["result_summary"] = result_summary if isinstance(result_summary, dict) else {}

    overlapping_facility_ids = _parse_json(data.get("overlapping_facility_ids"), [])
    data["overlapping_facility_ids"] = _normalize_int_list(overlapping_facility_ids if isinstance(overlapping_facility_ids, list) else [])

    recent_events = _parse_json(data.get("recent_events"), [])
    if isinstance(recent_events, list):
        data["recent_events"] = [event for event in (_normalize_progress_event(item) for item in recent_events) if event]
    else:
        data["recent_events"] = []

    for key in ("progress_total", "progress_completed", "progress_ok", "progress_failed", "progress_skipped"):
        data[key] = int(data.get(key) or 0)

    # isdecimal() rather than isdigit(): "²" passes isdigit() but int() rejects it.
    raw_current_id = str(data.get("current_facility_id") or "").strip()
    data["current_facility_id"] = int(raw_current_id) if raw_current_id.isdecimal() else None
    data["current_facility_name"] = str(data.get("current_facility_name") or "")
    data["retryable"] = bool(data.get("retryable", False))
    data["attempt_count"] = int(data.get("attempt_count") or 0)

    # Only substitute the default when the value is missing. A stored 0 is what
    # the retry SQL compares against, so it must be reported as 0, not 3.
    max_attempts = data.get("max_attempts")
    data["max_attempts"] = DEFAULT_MAX_ATTEMPTS if max_attempts is None else int(max_attempts)
    return data
def classify_scrape_error(exc: Exception) -> tuple[str, bool]:
    """Map an exception raised while scraping to ``(error_code, retryable)``.

    Rules are checked in order and the first match wins. For example, a timeout
    raised by Playwright is reported as ``"timeout"``, not ``"browser"``.

    Possible codes:
    - retryable: ``timeout``, ``browser``, ``network``, ``database``
    - not retryable: ``json_parse``, ``configuration``, ``validation``,
      ``unknown``
    """
    message = str(exc).lower()
    type_name = type(exc).__name__.lower()
    module_name = type(exc).__module__.lower()

    # "json" in the type name also covers JSONDecodeError.
    if "json" in type_name or "expecting value" in message:
        return "json_parse", False
    # "api_key" also covers "gemini_api_key".
    if "api_key" in message or "mangler i .env" in message:
        return "configuration", False
    # Missing modules or permissions will not fix themselves on retry.
    # ImportError matters here because ModuleNotFoundError's message
    # ("No module named ...") contains neither "not found" nor "module" together.
    if (
        isinstance(exc, ImportError)
        or "permission denied" in message
        or ("not found" in message and "module" in message)
    ):
        return "configuration", False
    if "timeout" in message or "timed out" in message or "timeouterror" in type_name:
        return "timeout", True
    if "playwright" in module_name or "browser" in message or "page.goto" in message:
        return "browser", True
    # ConnectionError covers ConnectionRefusedError, ConnectionResetError,
    # BrokenPipeError, etc. Their messages (e.g. "Connect call failed") need
    # not contain the word "connection".
    if (
        isinstance(exc, ConnectionError)
        or "connection" in message
        or "connectionreseterror" in type_name
        or "dns" in message
        or "network" in message
    ):
        return "network", True
    if "asyncpg" in module_name or "postgres" in message or "database" in message:
        return "database", True
    if "valueerror" in type_name:
        return "validation", False
    return "unknown", False
def compute_retry_delay_seconds(attempt_count: int, error_code: str) -> int:
    """Return how many seconds to wait before retrying a failed job.

    The delay is a per-error-code base multiplied by *attempt_count* (treated
    as at least 1) and capped at 300 seconds. Error codes without a base delay
    (anything except timeout, network, browser, database and unknown) return 0.
    """
    step = {
        "timeout": 30,
        "network": 45,
        "browser": 60,
        "database": 30,
        "unknown": 90,
    }.get(error_code)
    if not step:
        return 0
    scaled = step * (attempt_count if attempt_count > 1 else 1)
    return scaled if scaled < 300 else 300
async def ensure_scrape_jobs_table(conn) -> None:
    """Create or upgrade the ``scrape_jobs`` table and its indexes.

    Every statement is idempotent (``IF NOT EXISTS`` or an atomic drop-and-recreate),
    so this can run on each startup.

    The job_type lists in the SQL below must stay in sync with
    ``SCRAPE_JOB_TYPES``.

    Args:
        conn: an open connection with an awaitable ``execute(sql)`` method
            (presumably asyncpg — confirm with callers).
    """
    await conn.execute(
        """
        CREATE TABLE IF NOT EXISTS scrape_jobs (
            id SERIAL PRIMARY KEY,
            job_type VARCHAR(50) NOT NULL CHECK (job_type IN ('banestatus', 'medlemskap', 'greenfee', 'vtg', 'golfpakker')),
            facility_ids JSONB NOT NULL DEFAULT '[]'::jsonb,
            total_facilities INTEGER NOT NULL DEFAULT 0,
            status VARCHAR(20) NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'completed', 'failed')),
            requested_by TEXT,
            worker_name TEXT,
            attempt_count INTEGER NOT NULL DEFAULT 0,
            max_attempts INTEGER NOT NULL DEFAULT 3,
            error_message TEXT,
            error_code TEXT,
            retryable BOOLEAN NOT NULL DEFAULT FALSE,
            result_summary JSONB NOT NULL DEFAULT '{}'::jsonb,
            recent_events JSONB NOT NULL DEFAULT '[]'::jsonb,
            progress_total INTEGER NOT NULL DEFAULT 0,
            progress_completed INTEGER NOT NULL DEFAULT 0,
            progress_ok INTEGER NOT NULL DEFAULT 0,
            progress_failed INTEGER NOT NULL DEFAULT 0,
            progress_skipped INTEGER NOT NULL DEFAULT 0,
            current_facility_id INTEGER,
            current_facility_name TEXT,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            started_at TIMESTAMPTZ,
            finished_at TIMESTAMPTZ,
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            last_heartbeat_at TIMESTAMPTZ,
            next_retry_at TIMESTAMPTZ,
            last_error_at TIMESTAMPTZ
        )
        """
    )

    # Columns added after the first version of the table. ADD COLUMN IF NOT
    # EXISTS upgrades older databases in place and does nothing otherwise.
    added_columns = (
        "max_attempts INTEGER NOT NULL DEFAULT 3",
        "error_code TEXT",
        "retryable BOOLEAN NOT NULL DEFAULT FALSE",
        "recent_events JSONB NOT NULL DEFAULT '[]'::jsonb",
        "progress_total INTEGER NOT NULL DEFAULT 0",
        "progress_completed INTEGER NOT NULL DEFAULT 0",
        "progress_ok INTEGER NOT NULL DEFAULT 0",
        "progress_failed INTEGER NOT NULL DEFAULT 0",
        "progress_skipped INTEGER NOT NULL DEFAULT 0",
        "current_facility_id INTEGER",
        "current_facility_name TEXT",
        "next_retry_at TIMESTAMPTZ",
        "last_error_at TIMESTAMPTZ",
    )
    for column_definition in added_columns:
        await conn.execute(f"ALTER TABLE scrape_jobs ADD COLUMN IF NOT EXISTS {column_definition}")

    # Recreate the job_type CHECK so newly added job types are accepted by
    # existing tables. Drop and add run in ONE statement: as two statements,
    # two processes starting at once could both drop, then both ADD, and the
    # second would fail with duplicate_object. A single ALTER TABLE is atomic
    # and holds the table lock for both actions.
    await conn.execute(
        """
        ALTER TABLE scrape_jobs
            DROP CONSTRAINT IF EXISTS scrape_jobs_job_type_check,
            ADD CONSTRAINT scrape_jobs_job_type_check
                CHECK (job_type IN ('banestatus', 'medlemskap', 'greenfee', 'vtg', 'golfpakker'))
        """
    )

    for index_sql in (
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_status_created_at
        ON scrape_jobs (status, created_at)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_job_type_created_at
        ON scrape_jobs (job_type, created_at DESC)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_status_next_retry_at
        ON scrape_jobs (status, next_retry_at)
        """,
    ):
        await conn.execute(index_sql)
async def _find_active_job_conflict(conn, job_type: str, normalized_ids: list[int]) -> tuple[dict[str, Any] | None, str | None]:
    """Look for a pending/running job of *job_type* that overlaps *normalized_ids*.

    Returns:
    - ``(job, "already_queued")`` if any active job has exactly the same ids;
      an exact match wins over partial overlaps.
    - otherwise ``(job, "conflict")`` for the newest active job with at least
      one id in common;
    - ``(None, None)`` if nothing overlaps or *normalized_ids* is empty.

    The returned job carries ``overlapping_facility_ids`` (the sorted
    intersection). Exact match is list equality against
    ``normalize_facility_ids`` output, so *normalized_ids* should be
    normalized the same way.
    """
    if not normalized_ids:
        return None, None
    wanted = set(normalized_ids)
    active_rows = await conn.fetch(
        """
        SELECT *
        FROM scrape_jobs
        WHERE job_type = $1
          AND status IN ('pending', 'running')
        ORDER BY created_at DESC
        """,
        job_type,
    )

    first_overlap: dict[str, Any] | None = None
    for record in active_rows:
        job = format_scrape_job_row(record)
        if not job:
            continue
        job_ids = normalize_facility_ids(job.get("facility_ids", []))
        shared = sorted(wanted & set(job_ids))
        if not shared:
            continue
        job["overlapping_facility_ids"] = shared
        if job_ids == normalized_ids:
            return job, "already_queued"
        if first_overlap is None:
            first_overlap = job

    if first_overlap is not None:
        return first_overlap, "conflict"
    return None, None
# Advisory-lock namespace for enqueue_scrape_job (0x5343524A is ASCII "SCRJ");
# the second lock key is the job type's index in SCRAPE_JOB_TYPES.
_ENQUEUE_LOCK_NAMESPACE = 0x5343524A


async def enqueue_scrape_job(pool, job_type: str, facility_ids: Iterable[int], requested_by: str | None = None) -> tuple[dict[str, Any], str]:
    """Queue a scrape job unless an active job of the same type already covers it.

    Args:
        pool: connection pool providing ``acquire()``.
        job_type: one of ``SCRAPE_JOB_TYPES``.
        facility_ids: facility ids to scrape; normalized with
            ``normalize_facility_ids``.
        requested_by: free-text requester, stored as-is.

    Returns:
        ``(job, status)``, where *status* is one of:
        - ``"queued"`` for a newly inserted job;
        - ``"already_queued"`` when a pending/running job has exactly these ids;
        - ``"conflict"`` when one only partially overlaps (the returned job then
          carries ``overlapping_facility_ids``).

    Raises:
        ValueError: for an unknown *job_type*. ``ValueError``/``TypeError`` can
            also propagate from ``int()`` on a non-numeric facility id.
    """
    if job_type not in SCRAPE_JOB_TYPES:
        raise ValueError(f"Ugyldig job_type: {job_type}")
    normalized_ids = normalize_facility_ids(facility_ids)
    facility_ids_json = json.dumps(normalized_ids)
    async with pool.acquire() as conn:
        async with conn.transaction():
            # The conflict check below reads, then INSERTs. Under READ COMMITTED
            # two concurrent requests (e.g. a double-click) can both pass the
            # check and both insert, defeating de-duplication. A transaction-
            # scoped advisory lock per job type serializes them; PostgreSQL
            # releases it automatically at commit/rollback.
            await conn.execute(
                "SELECT pg_advisory_xact_lock($1, $2)",
                _ENQUEUE_LOCK_NAMESPACE,
                SCRAPE_JOB_TYPES.index(job_type),
            )
            existing_job, queue_status = await _find_active_job_conflict(conn, job_type, normalized_ids)
            if existing_job and queue_status:
                return existing_job, queue_status
            row = await conn.fetchrow(
                """
                INSERT INTO scrape_jobs (
                    job_type,
                    facility_ids,
                    total_facilities,
                    requested_by
                )
                VALUES ($1, $2::jsonb, $3, $4)
                RETURNING *
                """,
                job_type,
                facility_ids_json,
                len(normalized_ids),
                requested_by,
            )
    return format_scrape_job_row(row), "queued"
async def list_scrape_jobs(pool, job_type: str | None = None, limit: int = 10) -> list[dict[str, Any]]:
    """Return the most recently created jobs, newest first.

    *limit* is clamped to the range 1..50. When *job_type* is truthy, only jobs
    of that type are returned. Each row is passed through
    ``format_scrape_job_row``.
    """
    bounded_limit = min(max(limit, 1), 50)
    async with pool.acquire() as conn:
        if not job_type:
            records = await conn.fetch(
                """
                SELECT *
                FROM scrape_jobs
                ORDER BY created_at DESC
                LIMIT $1
                """,
                bounded_limit,
            )
        else:
            records = await conn.fetch(
                """
                SELECT *
                FROM scrape_jobs
                WHERE job_type = $1
                ORDER BY created_at DESC
                LIMIT $2
                """,
                job_type,
                bounded_limit,
            )
    return [format_scrape_job_row(record) for record in records]
async def claim_next_scrape_job(pool, worker_name: str, stale_heartbeat_seconds: int = 120) -> dict[str, Any] | None:
    """Recover stale jobs, then atomically claim the oldest due pending job.

    All three steps run inside one transaction:

    1. ``running`` jobs whose last heartbeat is older than
       *stale_heartbeat_seconds* and that have attempts left go back to
       ``pending``, with a retry 15 seconds later.
    2. Stale ``running`` jobs that have used all attempts become ``failed``.
    3. The oldest ``pending`` job whose ``next_retry_at`` (or ``created_at``)
       has passed is selected with ``FOR UPDATE SKIP LOCKED``, so concurrent
       workers skip a row another worker has locked. It is switched to
       ``running`` for *worker_name*, ``attempt_count`` is incremented, and
       progress plus per-attempt error fields are reset.

    Returns the claimed job formatted by ``format_scrape_job_row``, or ``None``
    when no job is due.
    """
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                """
                UPDATE scrape_jobs
                SET status = 'pending',
                    worker_name = NULL,
                    updated_at = NOW(),
                    retryable = TRUE,
                    error_code = COALESCE(error_code, 'worker_stale'),
                    error_message = COALESCE(error_message, 'Forrige worker mistet heartbeat før jobben ble ferdig.'),
                    last_error_at = NOW(),
                    next_retry_at = NOW() + INTERVAL '15 seconds'
                WHERE status = 'running'
                  AND attempt_count < max_attempts
                  AND last_heartbeat_at IS NOT NULL
                  AND last_heartbeat_at < NOW() - ($1 * INTERVAL '1 second')
                """,
                stale_heartbeat_seconds,
            )
            await conn.execute(
                """
                UPDATE scrape_jobs
                SET status = 'failed',
                    worker_name = NULL,
                    updated_at = NOW(),
                    finished_at = NOW(),
                    retryable = FALSE,
                    error_code = COALESCE(error_code, 'worker_stale'),
                    error_message = COALESCE(error_message, 'Forrige worker mistet heartbeat og maks forsok er oppbrukt.'),
                    last_error_at = NOW(),
                    next_retry_at = NULL
                WHERE status = 'running'
                  AND attempt_count >= max_attempts
                  AND last_heartbeat_at IS NOT NULL
                  AND last_heartbeat_at < NOW() - ($1 * INTERVAL '1 second')
                """,
                stale_heartbeat_seconds,
            )
            # error_code is cleared together with error_message. Otherwise a
            # code left over from the previous failed attempt survives into the
            # running state, and the stale-heartbeat COALESCE above would pair
            # that old code with the stale-heartbeat message.
            row = await conn.fetchrow(
                """
                WITH next_job AS (
                    SELECT id
                    FROM scrape_jobs
                    WHERE status = 'pending'
                      AND COALESCE(next_retry_at, created_at) <= NOW()
                    ORDER BY created_at ASC
                    FOR UPDATE SKIP LOCKED
                    LIMIT 1
                )
                UPDATE scrape_jobs AS job
                SET status = 'running',
                    worker_name = $1,
                    attempt_count = job.attempt_count + 1,
                    started_at = COALESCE(job.started_at, NOW()),
                    updated_at = NOW(),
                    last_heartbeat_at = NOW(),
                    error_message = NULL,
                    error_code = NULL,
                    progress_completed = 0,
                    progress_ok = 0,
                    progress_failed = 0,
                    progress_skipped = 0,
                    current_facility_id = NULL,
                    current_facility_name = NULL,
                    recent_events = '[]'::jsonb,
                    next_retry_at = NULL
                FROM next_job
                WHERE job.id = next_job.id
                RETURNING job.*
                """,
                worker_name,
            )
    return format_scrape_job_row(row)
async def heartbeat_scrape_job(pool, job_id: int) -> None:
    """Stamp ``last_heartbeat_at`` and ``updated_at`` with the current time.

    Matches on id only; the job's status is not checked.
    """
    statement = """
        UPDATE scrape_jobs
        SET last_heartbeat_at = NOW(),
            updated_at = NOW()
        WHERE id = $1
    """
    async with pool.acquire() as conn:
        await conn.execute(statement, job_id)
async def complete_scrape_job(pool, job_id: int, result_summary: dict[str, Any] | None = None) -> None:
    """Mark a job ``completed`` and store its result summary.

    Clears the error fields, ``retryable``, the current facility and
    ``next_retry_at``. A missing or empty *result_summary* is stored as ``{}``.
    Matches on id only; the job's status is not checked.
    """
    summary_json = json.dumps(result_summary if result_summary else {})
    statement = """
        UPDATE scrape_jobs
        SET status = 'completed',
            finished_at = NOW(),
            updated_at = NOW(),
            last_heartbeat_at = NOW(),
            error_message = NULL,
            error_code = NULL,
            retryable = FALSE,
            current_facility_id = NULL,
            current_facility_name = NULL,
            next_retry_at = NULL,
            result_summary = $2::jsonb
        WHERE id = $1
    """
    async with pool.acquire() as conn:
        await conn.execute(statement, job_id, summary_json)
async def fail_scrape_job(
    pool,
    job_id: int,
    error_message: str,
    result_summary: dict[str, Any] | None = None,
    *,
    error_code: str = "unknown",
    retryable: bool = False,
    retry_delay_seconds: int = 0,
) -> dict[str, Any] | None:
    """Record a failed attempt and either reschedule the job or fail it for good.

    If *retryable* and ``attempt_count < max_attempts``, the job goes back to
    ``pending``. ``worker_name`` and ``finished_at`` are cleared and
    ``next_retry_at`` is set to now + *retry_delay_seconds* (negative values
    become 0). Otherwise the job becomes ``failed`` with ``finished_at`` set.

    In both cases the error fields, ``last_error_at`` and *result_summary*
    (``{}`` when empty) are stored, and the current facility is cleared. The
    message is truncated to 1000 characters.

    Returns the updated job formatted by ``format_scrape_job_row``, or ``None``
    when no row has *job_id*.
    """
    summary_json = json.dumps(result_summary if result_summary else {})
    statement = """
        UPDATE scrape_jobs
        SET status = CASE
                WHEN $4 AND attempt_count < max_attempts THEN 'pending'
                ELSE 'failed'
            END,
            finished_at = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NULL
                ELSE NOW()
            END,
            updated_at = NOW(),
            last_heartbeat_at = NOW(),
            error_message = $2,
            error_code = $3,
            retryable = $4,
            result_summary = $5::jsonb,
            last_error_at = NOW(),
            current_facility_id = NULL,
            current_facility_name = NULL,
            worker_name = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NULL
                ELSE worker_name
            END,
            next_retry_at = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NOW() + ($6 * INTERVAL '1 second')
                ELSE NULL
            END
        WHERE id = $1
        RETURNING *
    """
    async with pool.acquire() as conn:
        updated = await conn.fetchrow(
            statement,
            job_id,
            error_message[:1000],
            error_code,
            retryable,
            summary_json,
            max(0, retry_delay_seconds),
        )
    return format_scrape_job_row(updated)
async def update_scrape_job_progress(
    pool,
    job_id: int,
    *,
    progress_total: int | None = None,
    progress_completed: int | None = None,
    progress_ok: int | None = None,
    progress_failed: int | None = None,
    progress_skipped: int | None = None,
    current_facility_id: int | None = None,
    current_facility_name: str | None = None,
    event: dict[str, Any] | None = None,
    clear_current: bool = False,
) -> dict[str, Any] | None:
    """Merge progress counters, the current facility and an optional event into a job.

    - Counters passed as ``None`` keep their stored values.
    - A truthy *event* is normalized, stamped with the current local time
      (replacing any caller-supplied timestamp) and appended to
      ``recent_events``. The list is then trimmed to the newest
      ``DEFAULT_RECENT_EVENTS_LIMIT`` entries.
    - *clear_current* resets the current facility and takes precedence over
      *current_facility_id* / *current_facility_name*.

    The job is read, merged in Python and written back (two statements on one
    connection, no explicit transaction). Returns the updated job formatted by
    ``format_scrape_job_row``, or ``None`` if *job_id* does not exist.
    """
    async with pool.acquire() as conn:
        snapshot = format_scrape_job_row(
            await conn.fetchrow("SELECT * FROM scrape_jobs WHERE id = $1", job_id)
        )
        if not snapshot:
            return None

        history = list(snapshot.get("recent_events") or [])
        new_event = _normalize_progress_event(event) if event else None
        if new_event:
            new_event["timestamp"] = datetime.now().astimezone().isoformat()
            history.append(new_event)
        history = history[-DEFAULT_RECENT_EVENTS_LIMIT:]

        if clear_current:
            facility_id = None
            facility_name = ""
        else:
            facility_id = (
                snapshot.get("current_facility_id")
                if current_facility_id is None
                else current_facility_id
            )
            facility_name = (
                (snapshot.get("current_facility_name") or "")
                if current_facility_name is None
                else current_facility_name
            )

        def merged(override: int | None, column: str) -> Any:
            # None means "keep the stored counter".
            return snapshot.get(column, 0) if override is None else override

        updated = await conn.fetchrow(
            """
            UPDATE scrape_jobs
            SET progress_total = $2,
                progress_completed = $3,
                progress_ok = $4,
                progress_failed = $5,
                progress_skipped = $6,
                current_facility_id = $7,
                current_facility_name = $8,
                recent_events = $9::jsonb,
                updated_at = NOW()
            WHERE id = $1
            RETURNING *
            """,
            job_id,
            merged(progress_total, "progress_total"),
            merged(progress_completed, "progress_completed"),
            merged(progress_ok, "progress_ok"),
            merged(progress_failed, "progress_failed"),
            merged(progress_skipped, "progress_skipped"),
            facility_id,
            facility_name,
            json.dumps(history),
        )
    return format_scrape_job_row(updated)