"""Sync per-facility precipitation forecasts from the MET locationforecast API.

For every facility with coordinates the forecast is fetched, folded into one
row per local calendar day (today .. today + WEATHER_MAX_DAY_OFFSET) and stored
in the ``facility_weather_forecast`` table.
"""

import asyncio
import os
import random
from datetime import date, datetime, time, timedelta, timezone
from email.utils import parsedate_to_datetime
from zoneinfo import ZoneInfo

import asyncpg
import httpx

LOCAL_TZ = ZoneInfo("Europe/Oslo")

MET_LOCATIONFORECAST_URL = os.getenv(
    "MET_LOCATIONFORECAST_URL",
    "https://api.met.no/weatherapi/locationforecast/2.0/compact",
).strip()
MET_API_USER_AGENT = os.getenv(
    "MET_API_USER_AGENT",
    "TeeOff.no/1.0 contact@teeoff.no https://teeoff.no",
).strip()

# Environment overrides are clamped: interval >= 900 s, initial delay >= 5 s,
# concurrency within 1..4.
WEATHER_SYNC_INTERVAL_SECONDS = max(900, int(os.getenv("WEATHER_SYNC_INTERVAL_SECONDS", "3600")))
WEATHER_SYNC_INITIAL_DELAY_SECONDS = max(5, int(os.getenv("WEATHER_SYNC_INITIAL_DELAY_SECONDS", "20")))
WEATHER_SYNC_CONCURRENCY = max(1, min(4, int(os.getenv("WEATHER_SYNC_CONCURRENCY", "3"))))

WEATHER_MAX_DAY_OFFSET = 7
DAYLIGHT_START_HOUR = 8
DAYLIGHT_END_HOUR = 20
DRY_MAX_PRECIP_MM = 0.3
DRY_MAX_PRECIP_PROBABILITY = 25.0


def _parse_http_datetime(value: str | None) -> datetime | None:
    """Parse an HTTP date header (e.g. ``Expires``) into an aware UTC datetime.

    Returns ``None`` for empty or unparseable input. A parsed value without
    timezone information is taken to be UTC.
    """
    raw = str(value or "").strip()
    if not raw:
        return None
    try:
        parsed = parsedate_to_datetime(raw)
    except (TypeError, ValueError, IndexError):
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _parse_iso_datetime(value: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp into an aware datetime.

    A trailing ``Z`` is rewritten to ``+00:00`` before parsing. Returns
    ``None`` for empty or unparseable input. A timestamp without an offset is
    taken to be UTC (consistent with ``_parse_http_datetime``) so that later
    ``astimezone`` calls never depend on the host's local timezone.
    """
    raw = str(value or "").strip()
    if not raw:
        return None
    try:
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed


def _format_confidence(day_offset: int) -> str:
    """Map a day offset to a confidence label: <=2 high, <=5 medium, else low."""
    if day_offset <= 2:
        return "high"
    if day_offset <= 5:
        return "medium"
    return "low"


def _overlap_hours(start: datetime, end: datetime, window_start: datetime, window_end: datetime) -> float:
    """Return the length in hours of the overlap of [start, end) and the window.

    Returns 0.0 when the intervals do not overlap.

    NOTE: aware datetimes that share the same ``tzinfo`` object are compared
    and subtracted on wall-clock time, so callers should pass values in a
    fixed-offset zone such as UTC to get correct results across DST changes.
    """
    overlap_start = max(start, window_start)
    overlap_end = min(end, window_end)
    if overlap_end <= overlap_start:
        return 0.0
    return (overlap_end - overlap_start).total_seconds() / 3600


def _select_period_data(data: dict) -> tuple[int, dict] | None:
    """Pick the shortest forecast period that carries precipitation details.

    Checks ``next_1_hours``, ``next_6_hours`` and ``next_12_hours`` in that
    order and returns ``(period_hours, details)`` for the first one whose
    details contain a precipitation amount or probability, else ``None``.
    """
    for key, hours in (("next_1_hours", 1), ("next_6_hours", 6), ("next_12_hours", 12)):
        details = ((data.get(key) or {}).get("details") or {})
        if "precipitation_amount" in details or "probability_of_precipitation" in details:
            return hours, details
    return None


def _local_instant_utc(day: date, at: time) -> datetime:
    """Return the UTC instant of wall-clock time ``at`` on ``day`` in LOCAL_TZ."""
    return datetime.combine(day, at, tzinfo=LOCAL_TZ).astimezone(timezone.utc)


def summarize_weather_forecast(payload: dict, today_local: date | None = None) -> list[dict]:
    """Fold a locationforecast payload into one summary row per local day.

    Args:
        payload: Decoded JSON response; entries are read from
            ``properties.timeseries``.
        today_local: First day of the window; defaults to today's date in
            ``LOCAL_TZ``.

    Returns:
        ``WEATHER_MAX_DAY_OFFSET + 1`` dicts ordered by date. Each holds the
        precipitation sum and maximum probability for the whole local day and
        for the daylight window (DAYLIGHT_START_HOUR..DAYLIGHT_END_HOUR), the
        derived ``dry_all_day`` / ``dry_daylight`` flags and a confidence
        label. A period's precipitation is split across days in proportion to
        the overlapping time.
    """
    timeseries = (((payload or {}).get("properties") or {}).get("timeseries") or [])
    today = today_local or datetime.now(LOCAL_TZ).date()

    buckets: dict[date, dict] = {}
    # (bucket, day_start, day_end, daylight_start, daylight_end), all in UTC.
    # Aware datetimes sharing one ZoneInfo are compared/subtracted on
    # wall-clock time, which mis-measures intervals that span a DST change,
    # so every interval computation below is done in UTC.
    windows: list[tuple[dict, datetime, datetime, datetime, datetime]] = []
    for offset in range(WEATHER_MAX_DAY_OFFSET + 1):
        forecast_date = today + timedelta(days=offset)
        bucket = {
            "forecast_date": forecast_date,
            "day_offset": offset,
            "precip_mm": 0.0,
            "precip_probability_max": 0.0,
            "daylight_precip_mm": 0.0,
            "daylight_precip_probability_max": 0.0,
            "confidence": _format_confidence(offset),
        }
        buckets[forecast_date] = bucket
        windows.append(
            (
                bucket,
                _local_instant_utc(forecast_date, time.min),
                _local_instant_utc(forecast_date + timedelta(days=1), time.min),
                _local_instant_utc(forecast_date, time(hour=DAYLIGHT_START_HOUR)),
                _local_instant_utc(forecast_date, time(hour=DAYLIGHT_END_HOUR)),
            )
        )

    for entry in timeseries:
        period = _select_period_data((entry or {}).get("data") or {})
        if not period:
            continue
        period_hours, details = period
        base_time = _parse_iso_datetime((entry or {}).get("time"))
        if base_time is None or period_hours <= 0:
            continue

        precip_amount = float(details.get("precipitation_amount") or 0.0)
        precip_probability = float(details.get("probability_of_precipitation") or 0.0)
        start_utc = base_time.astimezone(timezone.utc)
        end_utc = start_utc + timedelta(hours=period_hours)

        for bucket, day_start, day_end, daylight_start, daylight_end in windows:
            day_overlap = _overlap_hours(start_utc, end_utc, day_start, day_end)
            if day_overlap > 0:
                ratio = day_overlap / period_hours
                bucket["precip_mm"] += precip_amount * ratio
                bucket["precip_probability_max"] = max(
                    bucket["precip_probability_max"],
                    precip_probability,
                )

            daylight_overlap = _overlap_hours(start_utc, end_utc, daylight_start, daylight_end)
            if daylight_overlap > 0:
                ratio = daylight_overlap / period_hours
                bucket["daylight_precip_mm"] += precip_amount * ratio
                bucket["daylight_precip_probability_max"] = max(
                    bucket["daylight_precip_probability_max"],
                    precip_probability,
                )

    rows: list[dict] = []
    for forecast_date in sorted(buckets):
        bucket = buckets[forecast_date]
        precip_mm = round(bucket["precip_mm"], 2)
        precip_probability_max = round(bucket["precip_probability_max"], 1)
        daylight_precip_mm = round(bucket["daylight_precip_mm"], 2)
        daylight_precip_probability_max = round(bucket["daylight_precip_probability_max"], 1)
        rows.append(
            {
                "forecast_date": forecast_date,
                "day_offset": bucket["day_offset"],
                "dry_all_day": precip_mm < DRY_MAX_PRECIP_MM
                and precip_probability_max < DRY_MAX_PRECIP_PROBABILITY,
                "dry_daylight": daylight_precip_mm < DRY_MAX_PRECIP_MM
                and daylight_precip_probability_max < DRY_MAX_PRECIP_PROBABILITY,
                "precip_mm": precip_mm,
                "precip_probability_max": precip_probability_max,
                "daylight_precip_mm": daylight_precip_mm,
                "daylight_precip_probability_max": daylight_precip_probability_max,
                "confidence": bucket["confidence"],
            }
        )
    return rows


async def ensure_weather_forecast_table(conn) -> None:
    """Create the ``facility_weather_forecast`` table and its indexes if missing."""
    await conn.execute(
        """
        CREATE TABLE IF NOT EXISTS facility_weather_forecast (
            facility_id INTEGER NOT NULL REFERENCES facilities(id) ON DELETE CASCADE,
            forecast_date DATE NOT NULL,
            day_offset SMALLINT NOT NULL CHECK (day_offset BETWEEN 0 AND 7),
            dry_all_day BOOLEAN NOT NULL DEFAULT FALSE,
            dry_daylight BOOLEAN NOT NULL DEFAULT FALSE,
            precip_mm NUMERIC(6,2),
            precip_probability_max NUMERIC(5,2),
            daylight_precip_mm NUMERIC(6,2),
            daylight_precip_probability_max NUMERIC(5,2),
            confidence TEXT NOT NULL DEFAULT 'medium',
            source_updated_at TIMESTAMPTZ,
            source_expires_at TIMESTAMPTZ,
            source_last_modified TEXT,
            calculated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            PRIMARY KEY (facility_id, forecast_date)
        )
        """
    )
    await conn.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_facility_weather_forecast_daylight
        ON facility_weather_forecast (day_offset, dry_daylight)
        """
    )
    await conn.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_facility_weather_forecast_all_day
        ON facility_weather_forecast (day_offset, dry_all_day)
        """
    )


async def _read_existing_metadata(conn, facility_id: int) -> dict | None:
    """Fetch aggregate cache metadata for one facility's stored forecast rows.

    The result exposes ``row_count``, ``min_date``, ``max_date``,
    ``source_expires_at`` and ``source_last_modified``.
    """
    return await conn.fetchrow(
        """
        SELECT
            COUNT(*)::int AS row_count,
            MIN(forecast_date) AS min_date,
            MAX(forecast_date) AS max_date,
            MAX(source_expires_at) AS source_expires_at,
            MAX(source_last_modified) AS source_last_modified
        FROM facility_weather_forecast
        WHERE facility_id = $1
        """,
        facility_id,
    )


async def _persist_weather_rows(
    conn,
    facility_id: int,
    rows: list[dict],
    *,
    source_updated_at: datetime | None,
    source_expires_at: datetime | None,
    source_last_modified: str | None,
) -> None:
    """Replace a facility's stored forecast with ``rows`` in one transaction.

    Existing rows for the facility are deleted first, then every row is
    upserted together with the given source metadata, and finally any row
    whose date is not among ``rows`` is removed.
    """
    async with conn.transaction():
        valid_dates = [row["forecast_date"] for row in rows]
        await conn.execute("DELETE FROM facility_weather_forecast WHERE facility_id = $1", facility_id)
        for row in rows:
            await conn.execute(
                """
                INSERT INTO facility_weather_forecast (
                    facility_id, forecast_date, day_offset, dry_all_day, dry_daylight,
                    precip_mm, precip_probability_max, daylight_precip_mm,
                    daylight_precip_probability_max, confidence, source_updated_at,
                    source_expires_at, source_last_modified, calculated_at
                )
                VALUES (
                    $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW()
                )
                ON CONFLICT (facility_id, forecast_date) DO UPDATE SET
                    day_offset = EXCLUDED.day_offset,
                    dry_all_day = EXCLUDED.dry_all_day,
                    dry_daylight = EXCLUDED.dry_daylight,
                    precip_mm = EXCLUDED.precip_mm,
                    precip_probability_max = EXCLUDED.precip_probability_max,
                    daylight_precip_mm = EXCLUDED.daylight_precip_mm,
                    daylight_precip_probability_max = EXCLUDED.daylight_precip_probability_max,
                    confidence = EXCLUDED.confidence,
                    source_updated_at = EXCLUDED.source_updated_at,
                    source_expires_at = EXCLUDED.source_expires_at,
                    source_last_modified = EXCLUDED.source_last_modified,
                    calculated_at = NOW()
                """,
                facility_id,
                row["forecast_date"],
                row["day_offset"],
                row["dry_all_day"],
                row["dry_daylight"],
                row["precip_mm"],
                row["precip_probability_max"],
                row["daylight_precip_mm"],
                row["daylight_precip_probability_max"],
                row["confidence"],
                source_updated_at,
                source_expires_at,
                source_last_modified,
            )
        if valid_dates:
            await conn.execute(
                """
                DELETE FROM facility_weather_forecast
                WHERE facility_id = $1
                  AND NOT (forecast_date = ANY($2::date[]))
                """,
                facility_id,
                valid_dates,
            )


async def sync_facility_weather_forecast(
    conn,
    client: httpx.AsyncClient,
    facility_id: int,
    lat: float,
    lng: float,
    *,
    today_local: date | None = None,
    force: bool = False,
) -> str:
    """Refresh the stored forecast for one facility.

    Args:
        conn: Database connection used for reads and writes.
        client: HTTP client used for the forecast request.
        facility_id: Facility whose rows are refreshed.
        lat: Latitude, rounded to 4 decimals for the request.
        lng: Longitude, rounded to 4 decimals for the request.
        today_local: First day of the forecast window; defaults to today's
            date in ``LOCAL_TZ``.
        force: Skip the expiry check and always contact the API.

    Returns:
        ``"cached"`` when the stored rows cover the full window and have not
        expired, ``"not_modified"`` when the API answered 304 (only the source
        metadata is refreshed), otherwise ``"updated"``.

    Raises:
        httpx.HTTPStatusError: For non-success responses other than 304.
    """
    today = today_local or datetime.now(LOCAL_TZ).date()
    metadata = await _read_existing_metadata(conn, facility_id)
    row_count = int(metadata["row_count"] or 0) if metadata else 0
    min_date = metadata["min_date"] if metadata else None
    max_date = metadata["max_date"] if metadata else None
    source_expires_at = metadata["source_expires_at"] if metadata else None
    source_last_modified = str(metadata["source_last_modified"] or "").strip() if metadata else ""

    # Stored rows must cover exactly today .. today + WEATHER_MAX_DAY_OFFSET.
    needs_full_window_refresh = (
        row_count != WEATHER_MAX_DAY_OFFSET + 1
        or min_date != today
        or max_date != today + timedelta(days=WEATHER_MAX_DAY_OFFSET)
    )

    if not force and not needs_full_window_refresh and isinstance(source_expires_at, datetime):
        if source_expires_at.tzinfo is None:
            source_expires_at = source_expires_at.replace(tzinfo=timezone.utc)
        if source_expires_at > datetime.now(timezone.utc):
            return "cached"

    headers: dict[str, str] = {}
    # A conditional request is only safe when the stored window is complete;
    # a 304 would otherwise leave the missing days unfilled.
    if source_last_modified and not needs_full_window_refresh:
        headers["If-Modified-Since"] = source_last_modified

    response = await client.get(
        MET_LOCATIONFORECAST_URL,
        params={
            "lat": round(float(lat), 4),
            "lon": round(float(lng), 4),
        },
        headers=headers,
    )

    if response.status_code == 304:
        refreshed_expires_at = _parse_http_datetime(response.headers.get("Expires")) or source_expires_at
        refreshed_last_modified = response.headers.get("Last-Modified") or source_last_modified
        await conn.execute(
            """
            UPDATE facility_weather_forecast
            SET source_expires_at = $2,
                source_last_modified = $3,
                calculated_at = NOW()
            WHERE facility_id = $1
            """,
            facility_id,
            refreshed_expires_at,
            refreshed_last_modified,
        )
        return "not_modified"

    response.raise_for_status()
    payload = response.json()
    rows = summarize_weather_forecast(payload, today_local=today)
    updated_at = _parse_iso_datetime((((payload.get("properties") or {}).get("meta") or {}).get("updated_at")))
    expires_at = _parse_http_datetime(response.headers.get("Expires"))
    last_modified = response.headers.get("Last-Modified")
    await _persist_weather_rows(
        conn,
        facility_id,
        rows,
        source_updated_at=updated_at,
        source_expires_at=expires_at,
        source_last_modified=last_modified,
    )
    return "updated"


async def sync_all_weather_forecasts(pool, *, force: bool = False) -> dict[str, int]:
    """Sync forecasts for every facility that has coordinates.

    At most ``WEATHER_SYNC_CONCURRENCY`` facilities are processed at a time.
    A failure for one facility (including failing to obtain a database
    connection) is printed and counted under ``"failed"`` without aborting
    the batch. Afterwards rows outside today .. today + WEATHER_MAX_DAY_OFFSET
    are deleted.

    Returns:
        Counters keyed by ``facilities``, ``updated``, ``cached``,
        ``not_modified`` and ``failed``.
    """
    today_local = datetime.now(LOCAL_TZ).date()
    latest_local_date = today_local + timedelta(days=WEATHER_MAX_DAY_OFFSET)

    async with pool.acquire() as conn:
        facilities = await conn.fetch(
            """
            SELECT id, name, lat, lng
            FROM facilities
            WHERE lat IS NOT NULL
              AND lng IS NOT NULL
            ORDER BY id ASC
            """
        )

    if not facilities:
        return {"facilities": 0, "updated": 0, "cached": 0, "not_modified": 0, "failed": 0}

    stats = {"facilities": len(facilities), "updated": 0, "cached": 0, "not_modified": 0, "failed": 0}
    semaphore = asyncio.Semaphore(WEATHER_SYNC_CONCURRENCY)
    timeout = httpx.Timeout(20.0, connect=10.0)

    async with httpx.AsyncClient(
        timeout=timeout,
        headers={
            "User-Agent": MET_API_USER_AGENT,
            "Accept": "application/json",
        },
        follow_redirects=True,
    ) as client:

        async def handle_facility(facility) -> None:
            async with semaphore:
                try:
                    # Acquire inside the try so a pool error is a per-facility
                    # failure instead of escaping gather() mid-batch.
                    async with pool.acquire() as conn:
                        outcome = await sync_facility_weather_forecast(
                            conn,
                            client,
                            int(facility["id"]),
                            float(facility["lat"]),
                            float(facility["lng"]),
                            # Same date as the cleanup below, even if the
                            # batch runs across local midnight.
                            today_local=today_local,
                            force=force,
                        )
                except Exception as exc:
                    stats["failed"] += 1
                    print(
                        f"Vær-sync feilet for {facility['name']} (id={facility['id']}): {exc}"
                    )
                else:
                    stats[outcome] = stats.get(outcome, 0) + 1

        await asyncio.gather(*(handle_facility(facility) for facility in facilities))

    async with pool.acquire() as conn:
        await conn.execute(
            """
            DELETE FROM facility_weather_forecast
            WHERE forecast_date < $1
               OR forecast_date > $2
            """,
            today_local,
            latest_local_date,
        )

    return stats


async def _wait_for_stop(stop_event: asyncio.Event, seconds: float) -> bool:
    """Wait up to ``seconds`` for ``stop_event``; return True if it was set."""
    try:
        await asyncio.wait_for(stop_event.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        return False
    return True


async def weather_sync_loop(pool, stop_event: asyncio.Event) -> None:
    """Run ``sync_all_weather_forecasts`` periodically until ``stop_event`` is set.

    Starts after ``WEATHER_SYNC_INITIAL_DELAY_SECONDS`` plus up to 30 s of
    jitter, then repeats every ``WEATHER_SYNC_INTERVAL_SECONDS`` plus up to
    600 s of jitter. Both waits end early when ``stop_event`` is set.
    Batch-level errors are printed and the loop continues.
    """
    initial_delay = WEATHER_SYNC_INITIAL_DELAY_SECONDS + random.uniform(0, 30)
    if await _wait_for_stop(stop_event, initial_delay):
        return

    while not stop_event.is_set():
        try:
            stats = await sync_all_weather_forecasts(pool)
            print(
                "Vær-sync fullført: "
                f"{stats['updated']} oppdatert, "
                f"{stats['not_modified']} uendret, "
                f"{stats['cached']} cachet, "
                f"{stats['failed']} feilet"
            )
        except Exception as exc:
            print(f"Vær-sync feilet på batch-nivå: {exc}")

        sleep_seconds = WEATHER_SYNC_INTERVAL_SECONDS + random.uniform(0, 600)
        await _wait_for_stop(stop_event, sleep_seconds)