Nye-TeeOff/backend/scrape_jobs.py

236 lines
7.5 KiB
Python
Executable file

import json
from datetime import date, datetime
from typing import Any, Iterable
SCRAPE_JOB_TYPES = ("banestatus", "medlemskap", "greenfee", "vtg")
SCRAPE_JOB_STATUSES = ("pending", "running", "completed", "failed")
def normalize_facility_ids(facility_ids: Iterable[int]) -> list[int]:
cleaned = {
int(facility_id)
for facility_id in facility_ids
if str(facility_id).strip()
}
return sorted(facility_id for facility_id in cleaned if facility_id > 0)
def _parse_json(value: Any, fallback: Any) -> Any:
if value is None:
return fallback
if isinstance(value, str):
try:
return json.loads(value)
except json.JSONDecodeError:
return fallback
return value
def format_scrape_job_row(row: Any) -> dict[str, Any] | None:
if row is None:
return None
data = dict(row)
for key in ("created_at", "started_at", "finished_at", "updated_at", "last_heartbeat_at"):
if isinstance(data.get(key), (date, datetime)):
data[key] = data[key].isoformat()
facility_ids = _parse_json(data.get("facility_ids"), [])
data["facility_ids"] = facility_ids if isinstance(facility_ids, list) else []
result_summary = _parse_json(data.get("result_summary"), {})
data["result_summary"] = result_summary if isinstance(result_summary, dict) else {}
return data
async def ensure_scrape_jobs_table(conn) -> None:
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS scrape_jobs (
id SERIAL PRIMARY KEY,
job_type VARCHAR(50) NOT NULL CHECK (job_type IN ('banestatus', 'medlemskap', 'greenfee', 'vtg')),
facility_ids JSONB NOT NULL DEFAULT '[]'::jsonb,
total_facilities INTEGER NOT NULL DEFAULT 0,
status VARCHAR(20) NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'running', 'completed', 'failed')),
requested_by TEXT,
worker_name TEXT,
attempt_count INTEGER NOT NULL DEFAULT 0,
error_message TEXT,
result_summary JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
last_heartbeat_at TIMESTAMPTZ
)
"""
)
await conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_status_created_at
ON scrape_jobs (status, created_at)
"""
)
await conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_scrape_jobs_job_type_created_at
ON scrape_jobs (job_type, created_at DESC)
"""
)
async def enqueue_scrape_job(pool, job_type: str, facility_ids: Iterable[int], requested_by: str | None = None) -> tuple[dict[str, Any], bool]:
if job_type not in SCRAPE_JOB_TYPES:
raise ValueError(f"Ugyldig job_type: {job_type}")
normalized_ids = normalize_facility_ids(facility_ids)
facility_ids_json = json.dumps(normalized_ids)
async with pool.acquire() as conn:
async with conn.transaction():
existing = await conn.fetchrow(
"""
SELECT *
FROM scrape_jobs
WHERE job_type = $1
AND status IN ('pending', 'running')
AND facility_ids = $2::jsonb
ORDER BY created_at DESC
LIMIT 1
""",
job_type,
facility_ids_json,
)
if existing:
return format_scrape_job_row(existing), False
row = await conn.fetchrow(
"""
INSERT INTO scrape_jobs (
job_type,
facility_ids,
total_facilities,
requested_by
)
VALUES ($1, $2::jsonb, $3, $4)
RETURNING *
""",
job_type,
facility_ids_json,
len(normalized_ids),
requested_by,
)
return format_scrape_job_row(row), True
async def list_scrape_jobs(pool, job_type: str | None = None, limit: int = 10) -> list[dict[str, Any]]:
safe_limit = max(1, min(limit, 50))
async with pool.acquire() as conn:
if job_type:
rows = await conn.fetch(
"""
SELECT *
FROM scrape_jobs
WHERE job_type = $1
ORDER BY created_at DESC
LIMIT $2
""",
job_type,
safe_limit,
)
else:
rows = await conn.fetch(
"""
SELECT *
FROM scrape_jobs
ORDER BY created_at DESC
LIMIT $1
""",
safe_limit,
)
return [format_scrape_job_row(row) for row in rows]
async def claim_next_scrape_job(pool, worker_name: str) -> dict[str, Any] | None:
async with pool.acquire() as conn:
async with conn.transaction():
row = await conn.fetchrow(
"""
WITH next_job AS (
SELECT id
FROM scrape_jobs
WHERE status = 'pending'
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE scrape_jobs AS job
SET status = 'running',
worker_name = $1,
attempt_count = job.attempt_count + 1,
started_at = COALESCE(job.started_at, NOW()),
updated_at = NOW(),
last_heartbeat_at = NOW(),
error_message = NULL
FROM next_job
WHERE job.id = next_job.id
RETURNING job.*
""",
worker_name,
)
return format_scrape_job_row(row)
async def heartbeat_scrape_job(pool, job_id: int) -> None:
async with pool.acquire() as conn:
await conn.execute(
"""
UPDATE scrape_jobs
SET last_heartbeat_at = NOW(),
updated_at = NOW()
WHERE id = $1
""",
job_id,
)
async def complete_scrape_job(pool, job_id: int, result_summary: dict[str, Any] | None = None) -> None:
payload = json.dumps(result_summary or {})
async with pool.acquire() as conn:
await conn.execute(
"""
UPDATE scrape_jobs
SET status = 'completed',
finished_at = NOW(),
updated_at = NOW(),
last_heartbeat_at = NOW(),
error_message = NULL,
result_summary = $2::jsonb
WHERE id = $1
""",
job_id,
payload,
)
async def fail_scrape_job(pool, job_id: int, error_message: str, result_summary: dict[str, Any] | None = None) -> None:
payload = json.dumps(result_summary or {})
async with pool.acquire() as conn:
await conn.execute(
"""
UPDATE scrape_jobs
SET status = 'failed',
finished_at = NOW(),
updated_at = NOW(),
last_heartbeat_at = NOW(),
error_message = $2,
result_summary = $3::jsonb
WHERE id = $1
""",
job_id,
error_message[:1000],
payload,
)