548 lines
21 KiB
Python
Executable file
548 lines
21 KiB
Python
Executable file
import json
|
|
from datetime import date, datetime
|
|
from typing import Any, Iterable
|
|
|
|
# Job types accepted by the queue; the same values appear in the
# scrape_jobs.job_type CHECK constraint created by ensure_scrape_jobs_table.
SCRAPE_JOB_TYPES: tuple[str, ...] = (
    "banestatus",
    "medlemskap",
    "greenfee",
    "vtg",
    "golfpakker",
)

# Lifecycle states of a job; the same values appear in the
# scrape_jobs.status CHECK constraint.
SCRAPE_JOB_STATUSES: tuple[str, ...] = (
    "pending",
    "running",
    "completed",
    "failed",
)

# Fallback used by format_scrape_job_row when a row has no max_attempts value.
DEFAULT_MAX_ATTEMPTS: int = 3

# Number of most recent progress events kept per job.
DEFAULT_RECENT_EVENTS_LIMIT: int = 12
|
|
|
|
|
|
def normalize_facility_ids(facility_ids: Iterable[int]) -> list[int]:
    """Return the distinct positive facility ids in ascending order.

    Entries whose string form is blank are skipped. Every other entry is
    passed to ``int()``, so a non-numeric entry raises instead of being
    silently dropped.

    Args:
        facility_ids: Raw ids, typically ints or numeric strings.

    Returns:
        A sorted list of unique ids greater than zero.
    """
    unique_ids: set[int] = set()
    for raw_id in facility_ids:
        if not str(raw_id).strip():
            # Blank entries carry no id at all.
            continue
        unique_ids.add(int(raw_id))

    positive_ids = [candidate for candidate in unique_ids if candidate > 0]
    positive_ids.sort()
    return positive_ids
|
|
|
|
|
|
def _normalize_int_list(values: Iterable[Any]) -> list[int]:
    """Return the distinct positive integers found in ``values``, sorted.

    This is a lenient normaliser: any entry that ``int()`` cannot convert
    is skipped rather than raised.

    Args:
        values: Arbitrary values, e.g. a list decoded from JSON.

    Returns:
        A sorted list of unique integers greater than zero.
    """
    cleaned: set[int] = set()
    for value in values:
        try:
            cleaned.add(int(value))
        except (TypeError, ValueError, OverflowError):
            # OverflowError covers float infinities, which int() rejects
            # with a different exception type than ordinary junk values.
            continue
    return sorted(item for item in cleaned if item > 0)
|
|
|
|
|
|
def _parse_json(value: Any, fallback: Any) -> Any:
    """Decode ``value`` when it is a JSON string, otherwise pass it through.

    Args:
        value: A JSON string, an already-decoded object, or None.
        fallback: Returned when ``value`` is None or is a string that is
            not valid JSON.

    Returns:
        The decoded JSON for strings, ``fallback`` as described above, and
        ``value`` unchanged for every other type.
    """
    if value is None:
        return fallback
    if not isinstance(value, str):
        # Already decoded (or some other non-string value): return as-is.
        return value
    try:
        decoded = json.loads(value)
    except json.JSONDecodeError:
        return fallback
    return decoded
|
|
|
|
|
|
def _normalize_progress_event(value: Any) -> dict[str, Any] | None:
    """Coerce a raw progress event into the canonical event dict.

    Args:
        value: An event mapping, a JSON string encoding one, or None.

    Returns:
        A dict with the keys ``timestamp``, ``facility_id``,
        ``facility_name``, ``outcome``, ``message``, ``processed`` and
        ``total``, or None when ``value`` does not decode to a dict.
        Counters that cannot be converted to an integer become 0, so a
        single malformed event does not raise.
    """
    parsed = _parse_json(value, None)
    if not isinstance(parsed, dict):
        return None

    def _int_or(raw: Any, default: int | None) -> int | None:
        # int() raises on non-numeric strings, containers and infinities;
        # a normaliser should degrade to a default instead of crashing.
        try:
            return int(raw)
        except (TypeError, ValueError, OverflowError):
            return default

    raw_facility_id = parsed.get("facility_id")
    # Only values whose string form is all digits count as an id; missing,
    # falsy (including 0), negative and fractional values map to None.
    looks_like_id = str(raw_facility_id or "").strip().isdigit()

    return {
        "timestamp": str(parsed.get("timestamp") or ""),
        # isdigit() also accepts characters int() rejects (e.g. superscript
        # digits), hence the guarded conversion.
        "facility_id": _int_or(raw_facility_id, None) if looks_like_id else None,
        "facility_name": str(parsed.get("facility_name") or ""),
        "outcome": str(parsed.get("outcome") or "info"),
        "message": str(parsed.get("message") or ""),
        "processed": _int_or(parsed.get("processed") or 0, 0),
        "total": _int_or(parsed.get("total") or 0, 0),
    }
|
|
|
|
|
|
def format_scrape_job_row(row: Any) -> dict[str, Any] | None:
    """Convert a ``scrape_jobs`` row into a plain, normalised dict.

    Date/datetime columns become ISO-8601 strings, JSON columns are decoded
    and type-checked, counters are coerced to ``int`` and missing values
    receive defaults.

    Args:
        row: Anything ``dict()`` accepts (e.g. a database record), or None.

    Returns:
        The normalised dict, or None when ``row`` is None.
    """
    if row is None:
        return None

    data = dict(row)

    timestamp_columns = (
        "created_at",
        "started_at",
        "finished_at",
        "updated_at",
        "last_heartbeat_at",
        "next_retry_at",
        "last_error_at",
    )
    for column in timestamp_columns:
        stamp = data.get(column)
        if isinstance(stamp, (date, datetime)):
            data[column] = stamp.isoformat()

    facility_ids = _parse_json(data.get("facility_ids"), [])
    if not isinstance(facility_ids, list):
        facility_ids = []
    data["facility_ids"] = facility_ids

    summary = _parse_json(data.get("result_summary"), {})
    if not isinstance(summary, dict):
        summary = {}
    data["result_summary"] = summary

    overlap = _parse_json(data.get("overlapping_facility_ids"), [])
    if not isinstance(overlap, list):
        overlap = []
    data["overlapping_facility_ids"] = _normalize_int_list(overlap)

    raw_events = _parse_json(data.get("recent_events"), [])
    normalized_events = []
    if isinstance(raw_events, list):
        for raw_event in raw_events:
            normalized = _normalize_progress_event(raw_event)
            if normalized:
                normalized_events.append(normalized)
    data["recent_events"] = normalized_events

    progress_counters = (
        "progress_total",
        "progress_completed",
        "progress_ok",
        "progress_failed",
        "progress_skipped",
    )
    for counter in progress_counters:
        data[counter] = int(data.get(counter) or 0)

    raw_current_id = data.get("current_facility_id")
    if str(raw_current_id or "").strip().isdigit():
        data["current_facility_id"] = int(raw_current_id)
    else:
        # Missing, falsy or non-digit values are reported as "no facility".
        data["current_facility_id"] = None

    data["current_facility_name"] = str(data.get("current_facility_name") or "")
    data["retryable"] = bool(data.get("retryable", False))
    data["attempt_count"] = int(data.get("attempt_count") or 0)
    data["max_attempts"] = int(data.get("max_attempts") or DEFAULT_MAX_ATTEMPTS)

    return data
|
|
|
|
|
|
def classify_scrape_error(exc: Exception) -> tuple[str, bool]:
    """Map an exception to an ``(error_code, retryable)`` pair.

    Classification is heuristic: it inspects the lower-cased message, the
    exception class name and the module that defines the class. Rules are
    evaluated top to bottom and the first match wins.

    Args:
        exc: The exception raised by a scrape run.

    Returns:
        A tuple of the error code and whether a retry is worthwhile.
    """
    message = str(exc).lower()
    exc_type = type(exc)
    type_name = exc_type.__name__.lower()
    module_name = exc_type.__module__.lower()

    def mentions(*needles: str) -> bool:
        # True when the message contains at least one of the needles.
        return any(needle in message for needle in needles)

    if "json" in type_name or mentions("expecting value"):
        return "json_parse", False
    if mentions("gemini_api_key", "api_key", "mangler i .env"):
        return "configuration", False
    # "permission denied" alone is enough; "not found" only counts together
    # with "module" (`and` binds tighter than `or`).
    if mentions("permission denied") or (mentions("not found") and mentions("module")):
        return "configuration", False
    if mentions("timeout", "timed out") or "timeouterror" in type_name:
        return "timeout", True
    if "playwright" in module_name or mentions("browser", "page.goto"):
        return "browser", True
    if mentions("connection", "dns", "network") or "connectionreseterror" in type_name:
        return "network", True
    if "asyncpg" in module_name or mentions("postgres", "database"):
        return "database", True
    if "valueerror" in type_name:
        return "validation", False
    return "unknown", False
|
|
|
|
|
|
def compute_retry_delay_seconds(attempt_count: int, error_code: str) -> int:
    """Return how long to wait before retrying a failed job.

    The delay grows linearly with the attempt count and is capped at 300
    seconds. Error codes without a base delay yield 0.

    Args:
        attempt_count: Attempts made so far; values below 1 count as 1.
        error_code: Code as produced by ``classify_scrape_error``.

    Returns:
        The delay in seconds.
    """
    base_delays = {
        "timeout": 30,
        "network": 45,
        "browser": 60,
        "database": 30,
        "unknown": 90,
    }
    seconds_per_attempt = base_delays.get(error_code, 0)
    if seconds_per_attempt > 0:
        scaled = seconds_per_attempt * max(1, attempt_count)
        return min(300, scaled)
    return 0
|
|
|
|
|
|
async def ensure_scrape_jobs_table(conn) -> None:
    """Create the ``scrape_jobs`` table and bring older copies up to date.

    Runs, in order: ``CREATE TABLE IF NOT EXISTS``, one
    ``ADD COLUMN IF NOT EXISTS`` per later-added column, a drop and re-add
    of the ``job_type`` CHECK constraint, and ``CREATE INDEX IF NOT EXISTS``
    for the three lookup indexes.

    Args:
        conn: A database connection exposing an awaitable ``execute(sql)``.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS scrape_jobs (
            id SERIAL PRIMARY KEY,
            job_type VARCHAR(50) NOT NULL CHECK (job_type IN ('banestatus', 'medlemskap', 'greenfee', 'vtg', 'golfpakker')),
            facility_ids JSONB NOT NULL DEFAULT '[]'::jsonb,
            total_facilities INTEGER NOT NULL DEFAULT 0,
            status VARCHAR(20) NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'completed', 'failed')),
            requested_by TEXT,
            worker_name TEXT,
            attempt_count INTEGER NOT NULL DEFAULT 0,
            max_attempts INTEGER NOT NULL DEFAULT 3,
            error_message TEXT,
            error_code TEXT,
            retryable BOOLEAN NOT NULL DEFAULT FALSE,
            result_summary JSONB NOT NULL DEFAULT '{}'::jsonb,
            recent_events JSONB NOT NULL DEFAULT '[]'::jsonb,
            progress_total INTEGER NOT NULL DEFAULT 0,
            progress_completed INTEGER NOT NULL DEFAULT 0,
            progress_ok INTEGER NOT NULL DEFAULT 0,
            progress_failed INTEGER NOT NULL DEFAULT 0,
            progress_skipped INTEGER NOT NULL DEFAULT 0,
            current_facility_id INTEGER,
            current_facility_name TEXT,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            started_at TIMESTAMPTZ,
            finished_at TIMESTAMPTZ,
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            last_heartbeat_at TIMESTAMPTZ,
            next_retry_at TIMESTAMPTZ,
            last_error_at TIMESTAMPTZ
        )
        """
    await conn.execute(create_table_sql)

    # Columns added after the first version of the table; each is applied
    # idempotently so existing tables pick them up.
    added_columns = (
        "max_attempts INTEGER NOT NULL DEFAULT 3",
        "error_code TEXT",
        "retryable BOOLEAN NOT NULL DEFAULT FALSE",
        "recent_events JSONB NOT NULL DEFAULT '[]'::jsonb",
        "progress_total INTEGER NOT NULL DEFAULT 0",
        "progress_completed INTEGER NOT NULL DEFAULT 0",
        "progress_ok INTEGER NOT NULL DEFAULT 0",
        "progress_failed INTEGER NOT NULL DEFAULT 0",
        "progress_skipped INTEGER NOT NULL DEFAULT 0",
        "current_facility_id INTEGER",
        "current_facility_name TEXT",
        "next_retry_at TIMESTAMPTZ",
        "last_error_at TIMESTAMPTZ",
    )
    for column_definition in added_columns:
        await conn.execute(f"ALTER TABLE scrape_jobs ADD COLUMN IF NOT EXISTS {column_definition}")

    # The job_type constraint is dropped and re-created so that the allowed
    # values always match the list below; the indexes follow.
    follow_up_statements = (
        """
        DO $$
        BEGIN
            IF EXISTS (
                SELECT 1
                FROM pg_constraint
                WHERE conname = 'scrape_jobs_job_type_check'
            ) THEN
                ALTER TABLE scrape_jobs DROP CONSTRAINT scrape_jobs_job_type_check;
            END IF;
        EXCEPTION
            WHEN undefined_object THEN NULL;
        END $$;
        """,
        """
        ALTER TABLE scrape_jobs
        ADD CONSTRAINT scrape_jobs_job_type_check
        CHECK (job_type IN ('banestatus', 'medlemskap', 'greenfee', 'vtg', 'golfpakker'))
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_status_created_at
        ON scrape_jobs (status, created_at)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_job_type_created_at
        ON scrape_jobs (job_type, created_at DESC)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_scrape_jobs_status_next_retry_at
        ON scrape_jobs (status, next_retry_at)
        """,
    )
    for statement in follow_up_statements:
        await conn.execute(statement)
|
|
|
|
|
|
async def _find_active_job_conflict(conn, job_type: str, normalized_ids: list[int]) -> tuple[dict[str, Any] | None, str | None]:
    """Look for a pending or running job that overlaps the requested facilities.

    Args:
        conn: Database connection exposing an awaitable ``fetch``.
        job_type: Job type to search within.
        normalized_ids: Requested facility ids, as returned by
            ``normalize_facility_ids`` (sorted, unique).

    Returns:
        ``(job, "already_queued")`` for an active job with exactly the same
        facility ids, ``(job, "conflict")`` for the most recently created
        active job that shares at least one id, or ``(None, None)`` when
        nothing overlaps or ``normalized_ids`` is empty. The returned job
        carries the shared ids under ``overlapping_facility_ids``.
    """
    if not normalized_ids:
        return None, None

    wanted = set(normalized_ids)
    active_rows = await conn.fetch(
        """
        SELECT *
        FROM scrape_jobs
        WHERE job_type = $1
          AND status IN ('pending', 'running')
        ORDER BY created_at DESC
        """,
        job_type,
    )

    first_partial: dict[str, Any] | None = None
    for active_row in active_rows:
        job = format_scrape_job_row(active_row)
        if not job:
            continue

        job_ids = normalize_facility_ids(job.get("facility_ids", []))
        shared = sorted(wanted.intersection(job_ids))
        if shared:
            job["overlapping_facility_ids"] = shared
            if job_ids == normalized_ids:
                # An identical request wins over any partial overlap.
                return job, "already_queued"
            if first_partial is None:
                first_partial = job

    return (first_partial, "conflict") if first_partial else (None, None)
|
|
|
|
|
|
async def enqueue_scrape_job(pool, job_type: str, facility_ids: Iterable[int], requested_by: str | None = None) -> tuple[dict[str, Any], str]:
    """Queue a scrape job unless an overlapping active job already exists.

    Args:
        pool: Connection pool exposing ``acquire()``.
        job_type: One of ``SCRAPE_JOB_TYPES``.
        facility_ids: Facilities to scrape; normalised before use.
        requested_by: Optional identifier stored with the job.

    Returns:
        ``(job, status)`` where status is ``"queued"`` for a newly inserted
        job, or the status reported by ``_find_active_job_conflict``
        together with the existing job.

    Raises:
        ValueError: If ``job_type`` is not a known job type.
    """
    if job_type not in SCRAPE_JOB_TYPES:
        raise ValueError(f"Ugyldig job_type: {job_type}")

    normalized_ids = normalize_facility_ids(facility_ids)
    facility_ids_json = json.dumps(normalized_ids)

    insert_sql = """
        INSERT INTO scrape_jobs (
            job_type,
            facility_ids,
            total_facilities,
            requested_by
        )
        VALUES ($1, $2::jsonb, $3, $4)
        RETURNING *
        """

    async with pool.acquire() as conn:
        # The conflict check and the insert share one transaction.
        async with conn.transaction():
            existing_job, queue_status = await _find_active_job_conflict(conn, job_type, normalized_ids)
            if existing_job and queue_status:
                return existing_job, queue_status

            inserted = await conn.fetchrow(
                insert_sql,
                job_type,
                facility_ids_json,
                len(normalized_ids),
                requested_by,
            )
            return format_scrape_job_row(inserted), "queued"
|
|
|
|
|
|
async def list_scrape_jobs(pool, job_type: str | None = None, limit: int = 10) -> list[dict[str, Any]]:
    """Return the most recently created scrape jobs, newest first.

    Args:
        pool: Connection pool exposing ``acquire()``.
        job_type: When truthy, only jobs of this type are returned.
        limit: Requested number of rows; clamped to the range 1..50.

    Returns:
        The formatted job rows.
    """
    safe_limit = max(1, min(limit, 50))

    if job_type:
        query = """
            SELECT *
            FROM scrape_jobs
            WHERE job_type = $1
            ORDER BY created_at DESC
            LIMIT $2
            """
        params = (job_type, safe_limit)
    else:
        query = """
            SELECT *
            FROM scrape_jobs
            ORDER BY created_at DESC
            LIMIT $1
            """
        params = (safe_limit,)

    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
        return [format_scrape_job_row(row) for row in rows]
|
|
|
|
|
|
async def claim_next_scrape_job(pool, worker_name: str, stale_heartbeat_seconds: int = 120) -> dict[str, Any] | None:
    """Recover stale jobs, then claim the oldest due pending job.

    All three statements run in one transaction:

    1. Running jobs whose heartbeat is older than
       ``stale_heartbeat_seconds`` and that have attempts left go back to
       ``pending`` with a 15 second retry delay.
    2. Stale running jobs with no attempts left are marked ``failed``.
    3. The oldest pending job whose retry time (or creation time) has
       passed is locked with ``FOR UPDATE SKIP LOCKED``, switched to
       ``running`` for ``worker_name``, its attempt count incremented and
       its progress fields reset.

    Args:
        pool: Connection pool exposing ``acquire()``.
        worker_name: Name stored on the claimed job.
        stale_heartbeat_seconds: Heartbeat age after which a running job
            counts as abandoned.

    Returns:
        The claimed job as a formatted dict, or None when nothing is due.
    """
    requeue_stale_sql = """
        UPDATE scrape_jobs
        SET status = 'pending',
            worker_name = NULL,
            updated_at = NOW(),
            retryable = TRUE,
            error_code = COALESCE(error_code, 'worker_stale'),
            error_message = COALESCE(error_message, 'Forrige worker mistet heartbeat før jobben ble ferdig.'),
            last_error_at = NOW(),
            next_retry_at = NOW() + INTERVAL '15 seconds'
        WHERE status = 'running'
          AND attempt_count < max_attempts
          AND last_heartbeat_at IS NOT NULL
          AND last_heartbeat_at < NOW() - ($1 * INTERVAL '1 second')
        """
    fail_exhausted_sql = """
        UPDATE scrape_jobs
        SET status = 'failed',
            worker_name = NULL,
            updated_at = NOW(),
            finished_at = NOW(),
            retryable = FALSE,
            error_code = COALESCE(error_code, 'worker_stale'),
            error_message = COALESCE(error_message, 'Forrige worker mistet heartbeat og maks forsok er oppbrukt.'),
            last_error_at = NOW(),
            next_retry_at = NULL
        WHERE status = 'running'
          AND attempt_count >= max_attempts
          AND last_heartbeat_at IS NOT NULL
          AND last_heartbeat_at < NOW() - ($1 * INTERVAL '1 second')
        """
    claim_sql = """
        WITH next_job AS (
            SELECT id
            FROM scrape_jobs
            WHERE status = 'pending'
              AND COALESCE(next_retry_at, created_at) <= NOW()
            ORDER BY created_at ASC
            FOR UPDATE SKIP LOCKED
            LIMIT 1
        )
        UPDATE scrape_jobs AS job
        SET status = 'running',
            worker_name = $1,
            attempt_count = job.attempt_count + 1,
            started_at = COALESCE(job.started_at, NOW()),
            updated_at = NOW(),
            last_heartbeat_at = NOW(),
            error_message = NULL,
            progress_completed = 0,
            progress_ok = 0,
            progress_failed = 0,
            progress_skipped = 0,
            current_facility_id = NULL,
            current_facility_name = NULL,
            recent_events = '[]'::jsonb,
            next_retry_at = NULL
        FROM next_job
        WHERE job.id = next_job.id
        RETURNING job.*
        """

    async with pool.acquire() as conn:
        async with conn.transaction():
            # Recovery runs first so a requeued job can be picked up by a
            # later claim once its retry delay has passed.
            await conn.execute(requeue_stale_sql, stale_heartbeat_seconds)
            await conn.execute(fail_exhausted_sql, stale_heartbeat_seconds)
            claimed = await conn.fetchrow(claim_sql, worker_name)
            return format_scrape_job_row(claimed)
|
|
|
|
|
|
async def heartbeat_scrape_job(pool, job_id: int) -> None:
    """Stamp the job's ``last_heartbeat_at`` and ``updated_at`` with the current time.

    Args:
        pool: Connection pool exposing ``acquire()``.
        job_id: Primary key of the job to touch.
    """
    touch_sql = """
        UPDATE scrape_jobs
        SET last_heartbeat_at = NOW(),
            updated_at = NOW()
        WHERE id = $1
        """
    async with pool.acquire() as conn:
        await conn.execute(touch_sql, job_id)
|
|
|
|
|
|
async def complete_scrape_job(pool, job_id: int, result_summary: dict[str, Any] | None = None) -> None:
    """Mark a job as completed and store its result summary.

    Error fields, the current-facility pointer and any scheduled retry are
    cleared in the same statement.

    Args:
        pool: Connection pool exposing ``acquire()``.
        job_id: Primary key of the job.
        result_summary: JSON-serialisable summary; a falsy value is stored
            as an empty object.
    """
    payload = json.dumps(result_summary or {})
    complete_sql = """
        UPDATE scrape_jobs
        SET status = 'completed',
            finished_at = NOW(),
            updated_at = NOW(),
            last_heartbeat_at = NOW(),
            error_message = NULL,
            error_code = NULL,
            retryable = FALSE,
            current_facility_id = NULL,
            current_facility_name = NULL,
            next_retry_at = NULL,
            result_summary = $2::jsonb
        WHERE id = $1
        """
    async with pool.acquire() as conn:
        await conn.execute(complete_sql, job_id, payload)
|
|
|
|
|
|
async def fail_scrape_job(
    pool,
    job_id: int,
    error_message: str,
    result_summary: dict[str, Any] | None = None,
    *,
    error_code: str = "unknown",
    retryable: bool = False,
    retry_delay_seconds: int = 0,
) -> dict[str, Any] | None:
    """Record a failure and either schedule a retry or fail the job for good.

    When ``retryable`` is true and the job still has attempts left, it goes
    back to ``pending`` with ``next_retry_at`` set ``retry_delay_seconds``
    in the future and its worker name cleared. Otherwise it becomes
    ``failed`` and ``finished_at`` is set.

    Args:
        pool: Connection pool exposing ``acquire()``.
        job_id: Primary key of the job.
        error_message: Error text; only the first 1000 characters are stored.
        result_summary: JSON-serialisable summary; a falsy value is stored
            as an empty object.
        error_code: Short error category stored with the job.
        retryable: Whether the error may be retried.
        retry_delay_seconds: Delay before the retry; negative values are
            treated as 0.

    Returns:
        The updated job as a formatted dict, or None when no row matched.
    """
    payload = json.dumps(result_summary or {})
    fail_sql = """
        UPDATE scrape_jobs
        SET status = CASE
                WHEN $4 AND attempt_count < max_attempts THEN 'pending'
                ELSE 'failed'
            END,
            finished_at = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NULL
                ELSE NOW()
            END,
            updated_at = NOW(),
            last_heartbeat_at = NOW(),
            error_message = $2,
            error_code = $3,
            retryable = $4,
            result_summary = $5::jsonb,
            last_error_at = NOW(),
            current_facility_id = NULL,
            current_facility_name = NULL,
            worker_name = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NULL
                ELSE worker_name
            END,
            next_retry_at = CASE
                WHEN $4 AND attempt_count < max_attempts THEN NOW() + ($6 * INTERVAL '1 second')
                ELSE NULL
            END
        WHERE id = $1
        RETURNING *
        """
    async with pool.acquire() as conn:
        stored_message = error_message[:1000]
        delay_seconds = max(0, retry_delay_seconds)
        updated = await conn.fetchrow(
            fail_sql,
            job_id,
            stored_message,
            error_code,
            retryable,
            payload,
            delay_seconds,
        )
        return format_scrape_job_row(updated)
|
|
|
|
|
|
async def update_scrape_job_progress(
    pool,
    job_id: int,
    *,
    progress_total: int | None = None,
    progress_completed: int | None = None,
    progress_ok: int | None = None,
    progress_failed: int | None = None,
    progress_skipped: int | None = None,
    current_facility_id: int | None = None,
    current_facility_name: str | None = None,
    event: dict[str, Any] | None = None,
    clear_current: bool = False,
) -> dict[str, Any] | None:
    """Update a job's progress counters, current facility and event log.

    The job is read first; every argument left as None keeps the stored
    value. A supplied ``event`` is normalised, stamped with the current
    local time (any timestamp it carried is overwritten), appended, and the
    log is trimmed to the last ``DEFAULT_RECENT_EVENTS_LIMIT`` entries.

    Args:
        pool: Connection pool exposing ``acquire()``.
        job_id: Primary key of the job.
        progress_total: New total, or None to keep the stored one.
        progress_completed: New completed count, or None to keep it.
        progress_ok: New success count, or None to keep it.
        progress_failed: New failure count, or None to keep it.
        progress_skipped: New skipped count, or None to keep it.
        current_facility_id: Facility now being processed, or None to keep it.
        current_facility_name: Name of that facility, or None to keep it.
        event: Optional progress event to append.
        clear_current: When true, the current facility is reset and the two
            ``current_facility_*`` arguments are ignored.

    Returns:
        The updated job as a formatted dict, or None if the job does not exist.
    """
    update_sql = """
        UPDATE scrape_jobs
        SET progress_total = $2,
            progress_completed = $3,
            progress_ok = $4,
            progress_failed = $5,
            progress_skipped = $6,
            current_facility_id = $7,
            current_facility_name = $8,
            recent_events = $9::jsonb,
            updated_at = NOW()
        WHERE id = $1
        RETURNING *
        """

    async with pool.acquire() as conn:
        existing_row = await conn.fetchrow("SELECT * FROM scrape_jobs WHERE id = $1", job_id)
        current = format_scrape_job_row(existing_row)
        if not current:
            return None

        def stored_or(override: int | None, column: str) -> Any:
            # An explicit argument wins; otherwise keep what the row holds.
            if override is not None:
                return override
            return current.get(column, 0)

        events = list(current.get("recent_events") or [])
        normalized_event = _normalize_progress_event(event) if event else None
        if normalized_event:
            normalized_event["timestamp"] = datetime.now().astimezone().isoformat()
            events.append(normalized_event)
            events = events[-DEFAULT_RECENT_EVENTS_LIMIT:]

        if clear_current:
            facility_id = None
            facility_name = ""
        else:
            facility_id = current.get("current_facility_id")
            if current_facility_id is not None:
                facility_id = current_facility_id
            facility_name = current.get("current_facility_name") or ""
            if current_facility_name is not None:
                facility_name = current_facility_name

        updated_row = await conn.fetchrow(
            update_sql,
            job_id,
            stored_or(progress_total, "progress_total"),
            stored_or(progress_completed, "progress_completed"),
            stored_or(progress_ok, "progress_ok"),
            stored_or(progress_failed, "progress_failed"),
            stored_or(progress_skipped, "progress_skipped"),
            facility_id,
            facility_name,
            json.dumps(events),
        )
        return format_scrape_job_row(updated_row)
|