# File viewer metadata: 451 lines, 17 KiB, Python
import asyncio
|
|
import os
|
|
import random
|
|
from datetime import date, datetime, time, timedelta, timezone
|
|
from email.utils import parsedate_to_datetime
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import asyncpg
|
|
import httpx
|
|
|
|
# --- Time zone ---------------------------------------------------------------
# Calendar days and the daylight window of the forecast are evaluated here.
LOCAL_TZ = ZoneInfo("Europe/Oslo")

# --- MET Norway API ----------------------------------------------------------
# Both values can be overridden through environment variables of the same name;
# surrounding whitespace is stripped.
MET_LOCATIONFORECAST_URL = os.environ.get(
    "MET_LOCATIONFORECAST_URL",
    "https://api.met.no/weatherapi/locationforecast/2.0/compact",
).strip()
MET_API_USER_AGENT = os.environ.get(
    "MET_API_USER_AGENT",
    "TeeOff.no/1.0 contact@teeoff.no https://teeoff.no",
).strip()

# --- Scheduler tuning --------------------------------------------------------
# Each value is read from the environment and clamped: the interval is at
# least 900 s, the initial delay at least 5 s, and concurrency stays in 1..4.
WEATHER_SYNC_INTERVAL_SECONDS = max(
    900,
    int(os.environ.get("WEATHER_SYNC_INTERVAL_SECONDS", "3600")),
)
WEATHER_SYNC_INITIAL_DELAY_SECONDS = max(
    5,
    int(os.environ.get("WEATHER_SYNC_INITIAL_DELAY_SECONDS", "20")),
)
WEATHER_SYNC_CONCURRENCY = min(
    4,
    max(1, int(os.environ.get("WEATHER_SYNC_CONCURRENCY", "3"))),
)

# --- Forecast window and "dry" thresholds ------------------------------------
# Highest day offset summarized (today is offset 0).
WEATHER_MAX_DAY_OFFSET = 7
# Local hours delimiting the daylight window [start, end).
DAYLIGHT_START_HOUR = 8
DAYLIGHT_END_HOUR = 20
# A day counts as dry only when it is strictly below both limits.
DRY_MAX_PRECIP_MM = 0.3
DRY_MAX_PRECIP_PROBABILITY = 25.0
|
|
|
|
|
|
def _parse_http_datetime(value: str | None) -> datetime | None:
|
|
raw = str(value or "").strip()
|
|
if not raw:
|
|
return None
|
|
try:
|
|
parsed = parsedate_to_datetime(raw)
|
|
except (TypeError, ValueError, IndexError):
|
|
return None
|
|
if parsed.tzinfo is None:
|
|
return parsed.replace(tzinfo=timezone.utc)
|
|
return parsed.astimezone(timezone.utc)
|
|
|
|
|
|
def _parse_iso_datetime(value: str | None) -> datetime | None:
|
|
raw = str(value or "").strip()
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return datetime.fromisoformat(raw.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _format_confidence(day_offset: int) -> str:
    """Translate a forecast day offset into a confidence label.

    Offsets up to 2 are ``"high"``, offsets up to 5 are ``"medium"``, and
    anything further out is ``"low"``.
    """
    for upper_bound, label in ((2, "high"), (5, "medium")):
        if day_offset <= upper_bound:
            return label
    return "low"
|
|
|
|
|
|
def _overlap_hours(start: datetime, end: datetime, window_start: datetime, window_end: datetime) -> float:
    """Return how many hours of ``[start, end)`` fall inside the window.

    The result is ``0.0`` when the two intervals do not intersect or only
    touch at a boundary.
    """
    # Clip the interval to the window: latest start, earliest end.
    clipped_start = start if start >= window_start else window_start
    clipped_end = end if end <= window_end else window_end
    if clipped_end > clipped_start:
        return (clipped_end - clipped_start).total_seconds() / 3600
    return 0.0
|
|
|
|
|
|
def _select_period_data(data: dict) -> tuple[int, dict] | None:
|
|
for key, hours in (("next_1_hours", 1), ("next_6_hours", 6), ("next_12_hours", 12)):
|
|
details = ((data.get(key) or {}).get("details") or {})
|
|
if "precipitation_amount" in details or "probability_of_precipitation" in details:
|
|
return hours, details
|
|
return None
|
|
|
|
|
|
def _local_wall_time_to_utc(day: date, clock: time) -> datetime:
    """Return the UTC instant at which *clock* occurs on *day* in ``LOCAL_TZ``."""
    return datetime.combine(day, clock, tzinfo=LOCAL_TZ).astimezone(timezone.utc)


def summarize_weather_forecast(payload: dict, today_local: date | None = None) -> list[dict]:
    """Aggregate a forecast payload into one precipitation summary per local day.

    The payload's ``properties.timeseries`` entries are read. For each entry
    the shortest period with precipitation details is used (see
    ``_select_period_data``). Its precipitation amount is distributed over the
    local calendar days it overlaps, proportionally to the overlapping share
    of the period, and again over each day's daylight window
    (``DAYLIGHT_START_HOUR`` to ``DAYLIGHT_END_HOUR`` local time). The
    precipitation probability is tracked as a per-day maximum.

    All interval arithmetic is done in UTC. Python compares and subtracts
    aware datetimes that share one ``tzinfo`` object by wall-clock value, so
    doing this in ``LOCAL_TZ`` mis-measures periods that straddle a
    daylight-saving change. A one-hour period across the autumn change has
    zero wall-clock length and would be dropped; across the spring change it
    would count twice.

    Args:
        payload: Decoded forecast JSON. ``None`` or missing keys yield rows
            with zero precipitation.
        today_local: Local date used as day offset 0. Defaults to the current
            date in ``LOCAL_TZ``.

    Returns:
        ``WEATHER_MAX_DAY_OFFSET + 1`` dicts ordered by date. Each has the keys
        ``forecast_date``, ``day_offset``, ``dry_all_day``, ``dry_daylight``,
        ``precip_mm``, ``precip_probability_max``, ``daylight_precip_mm``,
        ``daylight_precip_probability_max`` and ``confidence``.

    Raises:
        ValueError, TypeError: If a precipitation field is not numeric.
    """
    timeseries = (((payload or {}).get("properties") or {}).get("timeseries") or [])
    today = today_local or datetime.now(LOCAL_TZ).date()

    # The day and daylight windows do not depend on the timeseries entry, so
    # they are built once per bucket here rather than inside the entry loop.
    buckets: list[dict] = []
    for offset in range(WEATHER_MAX_DAY_OFFSET + 1):
        forecast_date = today + timedelta(days=offset)
        buckets.append(
            {
                "forecast_date": forecast_date,
                "day_offset": offset,
                "day_window": (
                    _local_wall_time_to_utc(forecast_date, time.min),
                    # Next local midnight, so a 23- or 25-hour day is covered exactly.
                    _local_wall_time_to_utc(forecast_date + timedelta(days=1), time.min),
                ),
                "daylight_window": (
                    _local_wall_time_to_utc(forecast_date, time(hour=DAYLIGHT_START_HOUR)),
                    _local_wall_time_to_utc(forecast_date, time(hour=DAYLIGHT_END_HOUR)),
                ),
                "precip_mm": 0.0,
                "precip_probability_max": 0.0,
                "daylight_precip_mm": 0.0,
                "daylight_precip_probability_max": 0.0,
                "confidence": _format_confidence(offset),
            }
        )

    for entry in timeseries:
        entry = entry or {}
        period = _select_period_data(entry.get("data") or {})
        if not period:
            continue

        period_hours, details = period
        base_time = _parse_iso_datetime(entry.get("time"))
        if base_time is None or period_hours <= 0:
            continue
        if base_time.tzinfo is None:
            # A timestamp without an offset is read as UTC, not machine-local time.
            base_time = base_time.replace(tzinfo=timezone.utc)

        precip_amount = float(details.get("precipitation_amount") or 0.0)
        precip_probability = float(details.get("probability_of_precipitation") or 0.0)
        start_utc = base_time.astimezone(timezone.utc)
        end_utc = start_utc + timedelta(hours=period_hours)

        for bucket in buckets:
            day_overlap = _overlap_hours(start_utc, end_utc, *bucket["day_window"])
            if day_overlap <= 0:
                # The daylight window lies inside the day, so it cannot overlap either.
                continue
            bucket["precip_mm"] += precip_amount * (day_overlap / period_hours)
            bucket["precip_probability_max"] = max(
                bucket["precip_probability_max"],
                precip_probability,
            )

            daylight_overlap = _overlap_hours(start_utc, end_utc, *bucket["daylight_window"])
            if daylight_overlap > 0:
                bucket["daylight_precip_mm"] += precip_amount * (daylight_overlap / period_hours)
                bucket["daylight_precip_probability_max"] = max(
                    bucket["daylight_precip_probability_max"],
                    precip_probability,
                )

    rows: list[dict] = []
    for bucket in buckets:  # already in ascending date order
        precip_mm = round(bucket["precip_mm"], 2)
        precip_probability_max = round(bucket["precip_probability_max"], 1)
        daylight_precip_mm = round(bucket["daylight_precip_mm"], 2)
        daylight_precip_probability_max = round(bucket["daylight_precip_probability_max"], 1)
        rows.append(
            {
                "forecast_date": bucket["forecast_date"],
                "day_offset": bucket["day_offset"],
                "dry_all_day": precip_mm < DRY_MAX_PRECIP_MM
                and precip_probability_max < DRY_MAX_PRECIP_PROBABILITY,
                "dry_daylight": daylight_precip_mm < DRY_MAX_PRECIP_MM
                and daylight_precip_probability_max < DRY_MAX_PRECIP_PROBABILITY,
                "precip_mm": precip_mm,
                "precip_probability_max": precip_probability_max,
                "daylight_precip_mm": daylight_precip_mm,
                "daylight_precip_probability_max": daylight_precip_probability_max,
                "confidence": bucket["confidence"],
            }
        )
    return rows
|
|
|
|
|
|
async def ensure_weather_forecast_table(conn) -> None:
    """Create the forecast table and its two lookup indexes when absent.

    Every statement uses ``IF NOT EXISTS``. The statements are executed one
    after another on ``conn``: the table first, then the indexes on
    ``(day_offset, dry_daylight)`` and ``(day_offset, dry_all_day)``.
    """
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS facility_weather_forecast (
            facility_id INTEGER NOT NULL REFERENCES facilities(id) ON DELETE CASCADE,
            forecast_date DATE NOT NULL,
            day_offset SMALLINT NOT NULL CHECK (day_offset BETWEEN 0 AND 7),
            dry_all_day BOOLEAN NOT NULL DEFAULT FALSE,
            dry_daylight BOOLEAN NOT NULL DEFAULT FALSE,
            precip_mm NUMERIC(6,2),
            precip_probability_max NUMERIC(5,2),
            daylight_precip_mm NUMERIC(6,2),
            daylight_precip_probability_max NUMERIC(5,2),
            confidence TEXT NOT NULL DEFAULT 'medium',
            source_updated_at TIMESTAMPTZ,
            source_expires_at TIMESTAMPTZ,
            source_last_modified TEXT,
            calculated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            PRIMARY KEY (facility_id, forecast_date)
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_facility_weather_forecast_daylight
        ON facility_weather_forecast (day_offset, dry_daylight)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_facility_weather_forecast_all_day
        ON facility_weather_forecast (day_offset, dry_all_day)
        """,
    )
    for statement in ddl_statements:
        await conn.execute(statement)
|
|
|
|
|
|
async def _read_existing_metadata(conn, facility_id: int) -> dict | None:
    """Fetch aggregate facts about the rows already stored for a facility.

    The returned record exposes ``row_count``, ``min_date``, ``max_date``,
    ``source_expires_at`` and ``source_last_modified``. Each is an aggregate
    over all of the facility's rows in ``facility_weather_forecast``.
    """
    summary_query = """
        SELECT
            COUNT(*)::int AS row_count,
            MIN(forecast_date) AS min_date,
            MAX(forecast_date) AS max_date,
            MAX(source_expires_at) AS source_expires_at,
            MAX(source_last_modified) AS source_last_modified
        FROM facility_weather_forecast
        WHERE facility_id = $1
        """
    record = await conn.fetchrow(summary_query, facility_id)
    return record
|
|
|
|
|
|
async def _persist_weather_rows(
    conn,
    facility_id: int,
    rows: list[dict],
    *,
    source_updated_at: datetime | None,
    source_expires_at: datetime | None,
    source_last_modified: str | None,
) -> None:
    """Replace a facility's stored forecast with ``rows`` in one transaction.

    All existing rows for the facility are deleted, then the new rows are
    written as a single batch. An empty ``rows`` therefore leaves the facility
    without any forecast rows.

    Two things differ from a row-by-row loop followed by a prune of dates not
    in ``rows``. The rows are sent with ``executemany`` instead of one round
    trip per row. The prune is omitted, because the facility's rows were just
    deleted inside this transaction and only ``rows`` are inserted.

    Args:
        conn: Database connection offering ``transaction``, ``execute`` and
            ``executemany``.
        facility_id: Facility whose forecast is replaced.
        rows: Daily summaries carrying the keys read below.
        source_updated_at: Update time reported by the forecast source.
        source_expires_at: Expiry time reported by the forecast source.
        source_last_modified: ``Last-Modified`` value reported by the source.
    """
    records = [
        (
            facility_id,
            row["forecast_date"],
            row["day_offset"],
            row["dry_all_day"],
            row["dry_daylight"],
            row["precip_mm"],
            row["precip_probability_max"],
            row["daylight_precip_mm"],
            row["daylight_precip_probability_max"],
            row["confidence"],
            source_updated_at,
            source_expires_at,
            source_last_modified,
        )
        for row in rows
    ]

    async with conn.transaction():
        await conn.execute("DELETE FROM facility_weather_forecast WHERE facility_id = $1", facility_id)
        if not records:
            return
        # ON CONFLICT is kept so a duplicated forecast_date within ``rows``
        # updates the earlier row instead of aborting the transaction.
        await conn.executemany(
            """
            INSERT INTO facility_weather_forecast (
                facility_id,
                forecast_date,
                day_offset,
                dry_all_day,
                dry_daylight,
                precip_mm,
                precip_probability_max,
                daylight_precip_mm,
                daylight_precip_probability_max,
                confidence,
                source_updated_at,
                source_expires_at,
                source_last_modified,
                calculated_at
            ) VALUES (
                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW()
            )
            ON CONFLICT (facility_id, forecast_date) DO UPDATE SET
                day_offset = EXCLUDED.day_offset,
                dry_all_day = EXCLUDED.dry_all_day,
                dry_daylight = EXCLUDED.dry_daylight,
                precip_mm = EXCLUDED.precip_mm,
                precip_probability_max = EXCLUDED.precip_probability_max,
                daylight_precip_mm = EXCLUDED.daylight_precip_mm,
                daylight_precip_probability_max = EXCLUDED.daylight_precip_probability_max,
                confidence = EXCLUDED.confidence,
                source_updated_at = EXCLUDED.source_updated_at,
                source_expires_at = EXCLUDED.source_expires_at,
                source_last_modified = EXCLUDED.source_last_modified,
                calculated_at = NOW()
            """,
            records,
        )
|
|
|
|
|
|
async def sync_facility_weather_forecast(
    conn,
    client: httpx.AsyncClient,
    facility_id: int,
    lat: float,
    lng: float,
    *,
    today_local: date | None = None,
    force: bool = False,
) -> str:
    """Refresh one facility's stored forecast, fetching only when needed.

    Steps:
      1. Read what is already stored for the facility.
      2. Return ``"cached"`` without a request when the stored window covers
         exactly today through ``WEATHER_MAX_DAY_OFFSET`` days ahead, its
         expiry lies in the future, and ``force`` is false.
      3. Otherwise request the forecast. ``If-Modified-Since`` is sent only
         when the stored window is complete and a ``Last-Modified`` value is
         known.
      4. On HTTP 304, update the stored expiry and last-modified values and
         return ``"not_modified"``.
      5. On any other status, raise for HTTP errors, summarize the payload,
         persist the rows and return ``"updated"``.

    Coordinates are rounded to four decimals before being sent.

    Raises:
        httpx.HTTPStatusError: For an error status other than 304.
    """
    reference_day = today_local or datetime.now(LOCAL_TZ).date()

    existing = await _read_existing_metadata(conn, facility_id)
    if existing:
        stored_rows = int(existing["row_count"] or 0)
        first_day = existing["min_date"]
        last_day = existing["max_date"]
        cached_until = existing["source_expires_at"]
        cached_last_modified = str(existing["source_last_modified"] or "").strip()
    else:
        stored_rows, first_day, last_day = 0, None, None
        cached_until, cached_last_modified = None, ""

    # The stored window is usable only if it spans exactly the expected days.
    window_is_complete = (
        stored_rows == WEATHER_MAX_DAY_OFFSET + 1
        and first_day == reference_day
        and last_day == reference_day + timedelta(days=WEATHER_MAX_DAY_OFFSET)
    )

    if window_is_complete and not force and isinstance(cached_until, datetime):
        if cached_until.tzinfo is None:
            cached_until = cached_until.replace(tzinfo=timezone.utc)
        if cached_until > datetime.now(timezone.utc):
            return "cached"

    conditional_headers: dict[str, str] = {}
    if cached_last_modified and window_is_complete:
        conditional_headers["If-Modified-Since"] = cached_last_modified

    query = {
        "lat": round(float(lat), 4),
        "lon": round(float(lng), 4),
    }
    response = await client.get(
        MET_LOCATIONFORECAST_URL,
        params=query,
        headers=conditional_headers,
    )

    if response.status_code == 304:
        # Nothing changed upstream: only extend the cache bookkeeping,
        # keeping the previous values when the response omits a header.
        new_expiry = _parse_http_datetime(response.headers.get("Expires")) or cached_until
        new_last_modified = response.headers.get("Last-Modified") or cached_last_modified
        await conn.execute(
            """
            UPDATE facility_weather_forecast
            SET source_expires_at = $2,
                source_last_modified = $3,
                calculated_at = NOW()
            WHERE facility_id = $1
            """,
            facility_id,
            new_expiry,
            new_last_modified,
        )
        return "not_modified"

    response.raise_for_status()
    payload = response.json()
    daily_rows = summarize_weather_forecast(payload, today_local=reference_day)

    meta = (payload.get("properties") or {}).get("meta") or {}
    await _persist_weather_rows(
        conn,
        facility_id,
        daily_rows,
        source_updated_at=_parse_iso_datetime(meta.get("updated_at")),
        source_expires_at=_parse_http_datetime(response.headers.get("Expires")),
        source_last_modified=response.headers.get("Last-Modified"),
    )
    return "updated"
|
|
|
|
|
|
async def sync_all_weather_forecasts(pool, *, force: bool = False) -> dict[str, int]:
    """Sync the forecast of every facility that has coordinates, then prune.

    Facilities are processed concurrently, at most
    ``WEATHER_SYNC_CONCURRENCY`` at a time, sharing one HTTP client. A failure
    for one facility is counted and logged and never aborts the others. This
    includes failing to obtain a database connection: connection acquisition
    sits inside the per-facility error handling, so such an error cannot
    escape ``gather`` and close the shared client under the remaining tasks.

    After the batch, rows outside the window from today to
    ``WEATHER_MAX_DAY_OFFSET`` days ahead are deleted for all facilities. The
    window is computed after the batch, not before. Each facility sync
    determines "today" itself, so a window fixed beforehand would delete the
    newest row just written whenever the run crosses local midnight.

    Args:
        pool: Connection pool offering ``acquire()`` as an async context manager.
        force: Passed through to each facility sync to bypass the cache check.

    Returns:
        Counters keyed ``facilities``, ``updated``, ``cached``,
        ``not_modified`` and ``failed``. When there are no facilities, all
        counters are zero and no pruning is done.
    """
    async with pool.acquire() as conn:
        facilities = await conn.fetch(
            """
            SELECT id, name, lat, lng
            FROM facilities
            WHERE lat IS NOT NULL AND lng IS NOT NULL
            ORDER BY id ASC
            """
        )

    stats = {"facilities": len(facilities), "updated": 0, "cached": 0, "not_modified": 0, "failed": 0}
    if not facilities:
        return stats

    semaphore = asyncio.Semaphore(WEATHER_SYNC_CONCURRENCY)
    timeout = httpx.Timeout(20.0, connect=10.0)

    async with httpx.AsyncClient(
        timeout=timeout,
        headers={
            "User-Agent": MET_API_USER_AGENT,
            "Accept": "application/json",
        },
        follow_redirects=True,
    ) as client:

        async def handle_facility(facility) -> None:
            async with semaphore:
                try:
                    async with pool.acquire() as conn:
                        outcome = await sync_facility_weather_forecast(
                            conn,
                            client,
                            int(facility["id"]),
                            float(facility["lat"]),
                            float(facility["lng"]),
                            force=force,
                        )
                except Exception as exc:
                    # Per-facility boundary: record and report, keep the batch going.
                    stats["failed"] += 1
                    print(
                        f"Vær-sync feilet for {facility['name']} (id={facility['id']}): {exc}"
                    )
                else:
                    stats[outcome] = stats.get(outcome, 0) + 1

        await asyncio.gather(*(handle_facility(facility) for facility in facilities))

    # Determine the retention window now, after the batch, so rows written
    # for a day that began during the run are not pruned.
    today_local = datetime.now(LOCAL_TZ).date()
    latest_local_date = today_local + timedelta(days=WEATHER_MAX_DAY_OFFSET)

    async with pool.acquire() as conn:
        await conn.execute(
            """
            DELETE FROM facility_weather_forecast
            WHERE forecast_date < $1
               OR forecast_date > $2
            """,
            today_local,
            latest_local_date,
        )

    return stats
|
|
|
|
|
|
async def weather_sync_loop(pool, stop_event: asyncio.Event) -> None:
    """Run the forecast sync periodically until ``stop_event`` is set.

    The loop waits ``WEATHER_SYNC_INITIAL_DELAY_SECONDS`` plus up to 30 s of
    jitter, then repeatedly runs ``sync_all_weather_forecasts`` and waits
    ``WEATHER_SYNC_INTERVAL_SECONDS`` plus up to 600 s of jitter. A failing
    batch is logged and does not end the loop.

    Both waits observe ``stop_event``, including the initial delay, so a stop
    request ends the loop promptly instead of after the initial sleep.

    Args:
        pool: Connection pool handed to ``sync_all_weather_forecasts``.
        stop_event: Set by the owner to request shutdown.
    """

    async def _stop_requested_within(seconds: float) -> bool:
        """Wait up to *seconds*; return True if the stop event was set."""
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=seconds)
        except asyncio.TimeoutError:
            return False
        return True

    if await _stop_requested_within(WEATHER_SYNC_INITIAL_DELAY_SECONDS + random.uniform(0, 30)):
        return

    while not stop_event.is_set():
        try:
            stats = await sync_all_weather_forecasts(pool)
            print(
                "Vær-sync fullført: "
                f"{stats['updated']} oppdatert, "
                f"{stats['not_modified']} uendret, "
                f"{stats['cached']} cachet, "
                f"{stats['failed']} feilet"
            )
        except Exception as exc:
            # Batch-level boundary: report and try again on the next cycle.
            print(f"Vær-sync feilet på batch-nivå: {exc}")

        if await _stop_requested_within(WEATHER_SYNC_INTERVAL_SECONDS + random.uniform(0, 600)):
            return
|