# Source: Nye-TeeOff/backend/weather_forecast.py (451 lines, 17 KiB, Python)

import asyncio
import os
import random
from datetime import date, datetime, time, timedelta, timezone
from email.utils import parsedate_to_datetime
from zoneinfo import ZoneInfo
import asyncpg
import httpx
# Forecast days are bucketed by calendar date in this time zone.
LOCAL_TZ = ZoneInfo("Europe/Oslo")

# Forecast endpoint; can be overridden through the environment.
MET_LOCATIONFORECAST_URL = os.environ.get(
    "MET_LOCATIONFORECAST_URL",
    "https://api.met.no/weatherapi/locationforecast/2.0/compact",
).strip()

# Sent as the User-Agent header on every forecast request.
MET_API_USER_AGENT = os.environ.get(
    "MET_API_USER_AGENT",
    "TeeOff.no/1.0 contact@teeoff.no https://teeoff.no",
).strip()

# Base pause between sync batches; never below 900 seconds.
WEATHER_SYNC_INTERVAL_SECONDS = max(
    int(os.environ.get("WEATHER_SYNC_INTERVAL_SECONDS", "3600")),
    900,
)

# Base pause before the first sync batch; never below 5 seconds.
WEATHER_SYNC_INITIAL_DELAY_SECONDS = max(
    int(os.environ.get("WEATHER_SYNC_INITIAL_DELAY_SECONDS", "20")),
    5,
)

# Number of facilities synced in parallel, clamped to the range 1..4.
WEATHER_SYNC_CONCURRENCY = min(
    4,
    max(1, int(os.environ.get("WEATHER_SYNC_CONCURRENCY", "3"))),
)

# Forecast window covers today (offset 0) through this offset, inclusive.
WEATHER_MAX_DAY_OFFSET = 7

# Local wall-clock hours delimiting the "daylight" part of a day.
DAYLIGHT_START_HOUR = 8
DAYLIGHT_END_HOUR = 20

# A period counts as dry only when it is strictly below both thresholds.
DRY_MAX_PRECIP_MM = 0.3
DRY_MAX_PRECIP_PROBABILITY = 25.0
def _parse_http_datetime(value: str | None) -> datetime | None:
raw = str(value or "").strip()
if not raw:
return None
try:
parsed = parsedate_to_datetime(raw)
except (TypeError, ValueError, IndexError):
return None
if parsed.tzinfo is None:
return parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
def _parse_iso_datetime(value: str | None) -> datetime | None:
raw = str(value or "").strip()
if not raw:
return None
try:
return datetime.fromisoformat(raw.replace("Z", "+00:00"))
except ValueError:
return None
def _format_confidence(day_offset: int) -> str:
if day_offset <= 2:
return "high"
if day_offset <= 5:
return "medium"
return "low"
def _overlap_hours(start: datetime, end: datetime, window_start: datetime, window_end: datetime) -> float:
overlap_start = max(start, window_start)
overlap_end = min(end, window_end)
if overlap_end <= overlap_start:
return 0.0
return (overlap_end - overlap_start).total_seconds() / 3600
def _select_period_data(data: dict) -> tuple[int, dict] | None:
for key, hours in (("next_1_hours", 1), ("next_6_hours", 6), ("next_12_hours", 12)):
details = ((data.get(key) or {}).get("details") or {})
if "precipitation_amount" in details or "probability_of_precipitation" in details:
return hours, details
return None
def _local_clock_to_utc(day: date, clock: time) -> datetime:
    """Return the UTC instant at which ``clock`` occurs on ``day`` in LOCAL_TZ."""
    return datetime.combine(day, clock, tzinfo=LOCAL_TZ).astimezone(timezone.utc)


def summarize_weather_forecast(payload: dict, today_local: date | None = None) -> list[dict]:
    """Aggregate a forecast payload into one precipitation summary per local day.

    The payload is expected to hold a list of entries under
    ``properties.timeseries``, each with a ``time`` and a ``data`` mapping.
    Entries without usable precipitation data or without a parseable time are
    skipped. Each entry's precipitation amount is distributed over the local
    days it overlaps, proportionally to the overlapping share of the period;
    the precipitation probability contributes to a per-day maximum.

    Args:
        payload: Decoded forecast response. ``None`` or missing keys yield
            all-zero summaries.
        today_local: Local date treated as day offset 0. Defaults to the
            current date in ``LOCAL_TZ``.

    Returns:
        ``WEATHER_MAX_DAY_OFFSET + 1`` dicts ordered by date, each with
        ``forecast_date``, ``day_offset``, ``dry_all_day``, ``dry_daylight``,
        ``precip_mm``, ``precip_probability_max``, ``daylight_precip_mm``,
        ``daylight_precip_probability_max`` and ``confidence``.
    """
    timeseries = (((payload or {}).get("properties") or {}).get("timeseries") or [])
    today = today_local or datetime.now(LOCAL_TZ).date()
    daylight_from = time(hour=DAYLIGHT_START_HOUR)
    daylight_to = time(hour=DAYLIGHT_END_HOUR)

    buckets: dict[date, dict] = {}
    # Per-day (day_start, day_end, daylight_start, daylight_end), all in UTC.
    # Computed once here rather than once per forecast entry.
    windows: dict[date, tuple[datetime, datetime, datetime, datetime]] = {}
    for offset in range(WEATHER_MAX_DAY_OFFSET + 1):
        forecast_date = today + timedelta(days=offset)
        buckets[forecast_date] = {
            "forecast_date": forecast_date,
            "day_offset": offset,
            "precip_mm": 0.0,
            "precip_probability_max": 0.0,
            "daylight_precip_mm": 0.0,
            "daylight_precip_probability_max": 0.0,
            "confidence": _format_confidence(offset),
        }
        windows[forecast_date] = (
            _local_clock_to_utc(forecast_date, time.min),
            # Next local midnight, so the window is 23 or 25 real hours long
            # on daylight-saving transition days.
            _local_clock_to_utc(forecast_date + timedelta(days=1), time.min),
            _local_clock_to_utc(forecast_date, daylight_from),
            _local_clock_to_utc(forecast_date, daylight_to),
        )

    for entry in timeseries:
        entry = entry or {}
        period = _select_period_data(entry.get("data") or {})
        if not period:
            continue
        period_hours, details = period
        base_time = _parse_iso_datetime(entry.get("time"))
        if base_time is None or period_hours <= 0:
            continue
        if base_time.tzinfo is None:
            # Treat an offset-less timestamp as UTC instead of letting
            # astimezone() assume the server's system time zone.
            base_time = base_time.replace(tzinfo=timezone.utc)
        precip_amount = float(details.get("precipitation_amount") or 0.0)
        precip_probability = float(details.get("probability_of_precipitation") or 0.0)

        # Interval maths is done in UTC. Datetimes that share a ZoneInfo
        # tzinfo are compared and subtracted by wall-clock value, which
        # mis-measures periods that straddle a daylight-saving transition.
        start_utc = base_time.astimezone(timezone.utc)
        end_utc = start_utc + timedelta(hours=period_hours)

        for forecast_date, bucket in buckets.items():
            day_start, day_end, daylight_start, daylight_end = windows[forecast_date]

            day_overlap = _overlap_hours(start_utc, end_utc, day_start, day_end)
            if day_overlap > 0:
                bucket["precip_mm"] += precip_amount * (day_overlap / period_hours)
                bucket["precip_probability_max"] = max(
                    bucket["precip_probability_max"],
                    precip_probability,
                )

            daylight_overlap = _overlap_hours(start_utc, end_utc, daylight_start, daylight_end)
            if daylight_overlap > 0:
                bucket["daylight_precip_mm"] += precip_amount * (daylight_overlap / period_hours)
                bucket["daylight_precip_probability_max"] = max(
                    bucket["daylight_precip_probability_max"],
                    precip_probability,
                )

    rows: list[dict] = []
    for forecast_date in sorted(buckets):
        bucket = buckets[forecast_date]
        precip_mm = round(bucket["precip_mm"], 2)
        precip_probability_max = round(bucket["precip_probability_max"], 1)
        daylight_precip_mm = round(bucket["daylight_precip_mm"], 2)
        daylight_precip_probability_max = round(bucket["daylight_precip_probability_max"], 1)
        rows.append(
            {
                "forecast_date": forecast_date,
                "day_offset": bucket["day_offset"],
                # Dry means strictly below both the amount and the
                # probability threshold.
                "dry_all_day": precip_mm < DRY_MAX_PRECIP_MM
                and precip_probability_max < DRY_MAX_PRECIP_PROBABILITY,
                "dry_daylight": daylight_precip_mm < DRY_MAX_PRECIP_MM
                and daylight_precip_probability_max < DRY_MAX_PRECIP_PROBABILITY,
                "precip_mm": precip_mm,
                "precip_probability_max": precip_probability_max,
                "daylight_precip_mm": daylight_precip_mm,
                "daylight_precip_probability_max": daylight_precip_probability_max,
                "confidence": bucket["confidence"],
            }
        )
    return rows
async def ensure_weather_forecast_table(conn) -> None:
    """Create the forecast table and its two lookup indexes if they are missing.

    Every statement uses ``IF NOT EXISTS``. The statements are executed one
    after another on ``conn``, table first.
    """
    ddl_statements = (
        """
CREATE TABLE IF NOT EXISTS facility_weather_forecast (
facility_id INTEGER NOT NULL REFERENCES facilities(id) ON DELETE CASCADE,
forecast_date DATE NOT NULL,
day_offset SMALLINT NOT NULL CHECK (day_offset BETWEEN 0 AND 7),
dry_all_day BOOLEAN NOT NULL DEFAULT FALSE,
dry_daylight BOOLEAN NOT NULL DEFAULT FALSE,
precip_mm NUMERIC(6,2),
precip_probability_max NUMERIC(5,2),
daylight_precip_mm NUMERIC(6,2),
daylight_precip_probability_max NUMERIC(5,2),
confidence TEXT NOT NULL DEFAULT 'medium',
source_updated_at TIMESTAMPTZ,
source_expires_at TIMESTAMPTZ,
source_last_modified TEXT,
calculated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (facility_id, forecast_date)
)
""",
        """
CREATE INDEX IF NOT EXISTS idx_facility_weather_forecast_daylight
ON facility_weather_forecast (day_offset, dry_daylight)
""",
        """
CREATE INDEX IF NOT EXISTS idx_facility_weather_forecast_all_day
ON facility_weather_forecast (day_offset, dry_all_day)
""",
    )
    for statement in ddl_statements:
        await conn.execute(statement)
async def _read_existing_metadata(conn, facility_id: int) -> dict | None:
    """Fetch aggregate information about a facility's stored forecast rows.

    The returned row exposes ``row_count``, ``min_date``, ``max_date``,
    ``source_expires_at`` and ``source_last_modified``; the last two are the
    maximum over all of the facility's rows.
    """
    summary_sql = """
SELECT
COUNT(*)::int AS row_count,
MIN(forecast_date) AS min_date,
MAX(forecast_date) AS max_date,
MAX(source_expires_at) AS source_expires_at,
MAX(source_last_modified) AS source_last_modified
FROM facility_weather_forecast
WHERE facility_id = $1
"""
    record = await conn.fetchrow(summary_sql, facility_id)
    return record
async def _persist_weather_rows(
    conn,
    facility_id: int,
    rows: list[dict],
    *,
    source_updated_at: datetime | None,
    source_expires_at: datetime | None,
    source_last_modified: str | None,
) -> None:
    """Replace a facility's stored forecast with ``rows`` in one transaction.

    Existing rows for the facility are deleted, each summary row is upserted
    together with the given source metadata, and finally any row whose date
    is not among the supplied ones is removed.
    """
    clear_sql = "DELETE FROM facility_weather_forecast WHERE facility_id = $1"
    upsert_sql = """
INSERT INTO facility_weather_forecast (
facility_id,
forecast_date,
day_offset,
dry_all_day,
dry_daylight,
precip_mm,
precip_probability_max,
daylight_precip_mm,
daylight_precip_probability_max,
confidence,
source_updated_at,
source_expires_at,
source_last_modified,
calculated_at
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW()
)
ON CONFLICT (facility_id, forecast_date) DO UPDATE SET
day_offset = EXCLUDED.day_offset,
dry_all_day = EXCLUDED.dry_all_day,
dry_daylight = EXCLUDED.dry_daylight,
precip_mm = EXCLUDED.precip_mm,
precip_probability_max = EXCLUDED.precip_probability_max,
daylight_precip_mm = EXCLUDED.daylight_precip_mm,
daylight_precip_probability_max = EXCLUDED.daylight_precip_probability_max,
confidence = EXCLUDED.confidence,
source_updated_at = EXCLUDED.source_updated_at,
source_expires_at = EXCLUDED.source_expires_at,
source_last_modified = EXCLUDED.source_last_modified,
calculated_at = NOW()
"""
    prune_sql = """
DELETE FROM facility_weather_forecast
WHERE facility_id = $1
AND NOT (forecast_date = ANY($2::date[]))
"""
    # Row keys in the order of placeholders $2..$10 of the upsert statement.
    row_fields = (
        "forecast_date",
        "day_offset",
        "dry_all_day",
        "dry_daylight",
        "precip_mm",
        "precip_probability_max",
        "daylight_precip_mm",
        "daylight_precip_probability_max",
        "confidence",
    )

    async with conn.transaction():
        kept_dates = [item["forecast_date"] for item in rows]
        await conn.execute(clear_sql, facility_id)
        for item in rows:
            await conn.execute(
                upsert_sql,
                facility_id,
                *(item[field] for field in row_fields),
                source_updated_at,
                source_expires_at,
                source_last_modified,
            )
        if kept_dates:
            await conn.execute(prune_sql, facility_id, kept_dates)
async def sync_facility_weather_forecast(
    conn,
    client: httpx.AsyncClient,
    facility_id: int,
    lat: float,
    lng: float,
    *,
    today_local: date | None = None,
    force: bool = False,
) -> str:
    """Bring one facility's stored forecast up to date.

    Returns ``"cached"`` when the stored rows cover the current window and
    have not expired, ``"not_modified"`` when the server answers 304 (only
    the stored source metadata is refreshed), and ``"updated"`` when a new
    payload was fetched, summarized and persisted. Other HTTP error statuses
    propagate through ``response.raise_for_status()``.
    """
    today = today_local or datetime.now(LOCAL_TZ).date()
    metadata = await _read_existing_metadata(conn, facility_id)
    if metadata:
        stored_rows = int(metadata["row_count"] or 0)
        earliest = metadata["min_date"]
        latest = metadata["max_date"]
        source_expires_at = metadata["source_expires_at"]
        source_last_modified = str(metadata["source_last_modified"] or "").strip()
    else:
        stored_rows = 0
        earliest = None
        latest = None
        source_expires_at = None
        source_last_modified = ""

    # The stored window is current when it holds exactly one row per day
    # offset, starting today and ending at the maximum offset.
    window_is_current = (
        stored_rows == WEATHER_MAX_DAY_OFFSET + 1
        and earliest == today
        and latest == today + timedelta(days=WEATHER_MAX_DAY_OFFSET)
    )

    if window_is_current and not force and isinstance(source_expires_at, datetime):
        if source_expires_at.tzinfo is None:
            source_expires_at = source_expires_at.replace(tzinfo=timezone.utc)
        if source_expires_at > datetime.now(timezone.utc):
            return "cached"

    # A conditional request is only useful when the stored rows would still
    # be valid if the server reports no change.
    request_headers: dict[str, str] = {}
    if window_is_current and source_last_modified:
        request_headers["If-Modified-Since"] = source_last_modified

    coordinates = {
        "lat": round(float(lat), 4),
        "lon": round(float(lng), 4),
    }
    response = await client.get(
        MET_LOCATIONFORECAST_URL,
        params=coordinates,
        headers=request_headers,
    )

    if response.status_code == 304:
        new_expiry = _parse_http_datetime(response.headers.get("Expires")) or source_expires_at
        new_last_modified = response.headers.get("Last-Modified") or source_last_modified
        await conn.execute(
            """
UPDATE facility_weather_forecast
SET source_expires_at = $2,
source_last_modified = $3,
calculated_at = NOW()
WHERE facility_id = $1
""",
            facility_id,
            new_expiry,
            new_last_modified,
        )
        return "not_modified"

    response.raise_for_status()
    payload = response.json()
    rows = summarize_weather_forecast(payload, today_local=today)
    meta_section = (payload.get("properties") or {}).get("meta") or {}
    await _persist_weather_rows(
        conn,
        facility_id,
        rows,
        source_updated_at=_parse_iso_datetime(meta_section.get("updated_at")),
        source_expires_at=_parse_http_datetime(response.headers.get("Expires")),
        source_last_modified=response.headers.get("Last-Modified"),
    )
    return "updated"
async def sync_all_weather_forecasts(pool, *, force: bool = False) -> dict[str, int]:
    """Sync the forecast of every facility that has coordinates.

    Facilities are processed concurrently, limited by
    ``WEATHER_SYNC_CONCURRENCY``. A failure for one facility is counted and
    printed but does not abort the batch. After all facilities have been
    handled, stored rows outside the current forecast window are deleted.

    Args:
        pool: Connection pool; a connection is acquired per facility.
        force: Passed through to the per-facility sync to bypass its cache
            check.

    Returns:
        Counters keyed by ``facilities``, ``updated``, ``cached``,
        ``not_modified`` and ``failed``.
    """
    today_local = datetime.now(LOCAL_TZ).date()
    latest_local_date = today_local + timedelta(days=WEATHER_MAX_DAY_OFFSET)
    async with pool.acquire() as conn:
        facilities = await conn.fetch(
            """
SELECT id, name, lat, lng
FROM facilities
WHERE lat IS NOT NULL AND lng IS NOT NULL
ORDER BY id ASC
"""
        )
    if not facilities:
        return {"facilities": 0, "updated": 0, "cached": 0, "not_modified": 0, "failed": 0}

    stats = {"facilities": len(facilities), "updated": 0, "cached": 0, "not_modified": 0, "failed": 0}
    semaphore = asyncio.Semaphore(WEATHER_SYNC_CONCURRENCY)
    timeout = httpx.Timeout(20.0, connect=10.0)
    async with httpx.AsyncClient(
        timeout=timeout,
        headers={
            "User-Agent": MET_API_USER_AGENT,
            "Accept": "application/json",
        },
        follow_redirects=True,
    ) as client:

        async def handle_facility(facility) -> None:
            async with semaphore:
                async with pool.acquire() as conn:
                    try:
                        outcome = await sync_facility_weather_forecast(
                            conn,
                            client,
                            int(facility["id"]),
                            float(facility["lat"]),
                            float(facility["lng"]),
                            # Pin every facility to the batch's date. Without
                            # this, a run that spans local midnight writes
                            # rows for the next day's window, and the cleanup
                            # below deletes the newest of them again.
                            today_local=today_local,
                            force=force,
                        )
                        stats[outcome] = stats.get(outcome, 0) + 1
                    except Exception as exc:
                        # Best effort: one failing facility must not stop
                        # the remaining ones from syncing.
                        stats["failed"] += 1
                        print(
                            f"Vær-sync feilet for {facility['name']} (id={facility['id']}): {exc}"
                        )

        await asyncio.gather(*(handle_facility(facility) for facility in facilities))

    # Drop rows that fall outside the window used for this batch.
    async with pool.acquire() as conn:
        await conn.execute(
            """
DELETE FROM facility_weather_forecast
WHERE forecast_date < $1
OR forecast_date > $2
""",
            today_local,
            latest_local_date,
        )
    return stats
async def weather_sync_loop(pool, stop_event: asyncio.Event) -> None:
    """Run forecast sync batches periodically until ``stop_event`` is set.

    The first batch starts after ``WEATHER_SYNC_INITIAL_DELAY_SECONDS`` plus
    up to 30 seconds of random jitter; later batches are spaced by
    ``WEATHER_SYNC_INTERVAL_SECONDS`` plus up to 600 seconds of jitter.
    Both pauses end early when ``stop_event`` is set, so a shutdown request
    is honoured during the initial delay as well. A failing batch is printed
    and the loop carries on.
    """

    async def stopped_within(seconds: float) -> bool:
        """Wait up to ``seconds``; return True if the stop event fired."""
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=seconds)
        except asyncio.TimeoutError:
            return False
        return True

    if await stopped_within(WEATHER_SYNC_INITIAL_DELAY_SECONDS + random.uniform(0, 30)):
        return

    while not stop_event.is_set():
        try:
            stats = await sync_all_weather_forecasts(pool)
            print(
                "Vær-sync fullført: "
                f"{stats['updated']} oppdatert, "
                f"{stats['not_modified']} uendret, "
                f"{stats['cached']} cachet, "
                f"{stats['failed']} feilet"
            )
        except Exception as exc:
            # Top-level boundary of the background task: report the failure
            # and try again on the next cycle.
            print(f"Vær-sync feilet på batch-nivå: {exc}")

        if await stopped_within(WEATHER_SYNC_INTERVAL_SECONDS + random.uniform(0, 600)):
            return