Nye-TeeOff/backend/course_status_history.py

137 lines
3.9 KiB
Python

from datetime import date, datetime, time, timedelta
from zoneinfo import ZoneInfo
# Time zone used by this module to decide what "today" is and where a
# calendar day starts and ends when querying the status history.
OSLO_TIMEZONE = ZoneInfo("Europe/Oslo")
def get_oslo_today() -> date:
    """Return the current calendar date as observed in ``OSLO_TIMEZONE``."""
    oslo_now = datetime.now(tz=OSLO_TIMEZONE)
    return oslo_now.date()
# Schema statements, executed in order by ensure_course_status_history_table.
_COURSE_STATUS_HISTORY_SCHEMA_SQL = (
    # One row per recorded status transition.
    """
CREATE TABLE IF NOT EXISTS course_status_history (
id SERIAL PRIMARY KEY,
course_id INTEGER NOT NULL REFERENCES courses(id) ON DELETE CASCADE,
facility_id INTEGER NOT NULL REFERENCES facilities(id) ON DELETE CASCADE,
old_status TEXT,
new_status TEXT NOT NULL,
change_source TEXT NOT NULL,
changed_by TEXT,
changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""",
    # Newest-first ordering across all courses.
    """
CREATE INDEX IF NOT EXISTS idx_course_status_history_changed_at
ON course_status_history (changed_at DESC)
""",
    # Newest-first ordering per course.
    """
CREATE INDEX IF NOT EXISTS idx_course_status_history_course_id
ON course_status_history (course_id, changed_at DESC)
""",
    # Newest-first ordering per facility.
    """
CREATE INDEX IF NOT EXISTS idx_course_status_history_facility_id
ON course_status_history (facility_id, changed_at DESC)
""",
)


async def ensure_course_status_history_table(conn) -> None:
    """Create the ``course_status_history`` table and its indexes if missing.

    Every statement uses ``IF NOT EXISTS``, so objects that already exist are
    left in place. The table is created first, then the three indexes.

    Args:
        conn: Database connection exposing an awaitable ``execute(sql)``
            (presumably an asyncpg connection -- TODO confirm).
    """
    for ddl in _COURSE_STATUS_HISTORY_SCHEMA_SQL:
        await conn.execute(ddl)
async def log_course_status_change(
conn,
*,
course_id: int,
facility_id: int,
old_status: str | None,
new_status: str | None,
change_source: str,
changed_by: str | None = None,
) -> bool:
normalized_old = str(old_status or "").strip().lower() or "ukjent"
normalized_new = str(new_status or "").strip().lower() or "ukjent"
normalized_source = str(change_source or "").strip().lower()
normalized_changed_by = str(changed_by or "").strip() or None
if not normalized_source:
raise ValueError("change_source må være satt")
if normalized_old == normalized_new:
return False
await conn.execute(
"""
INSERT INTO course_status_history (
course_id,
facility_id,
old_status,
new_status,
change_source,
changed_by
)
VALUES ($1, $2, $3, $4, $5, $6)
""",
course_id,
facility_id,
normalized_old,
normalized_new,
normalized_source,
normalized_changed_by,
)
return True
# Query used by list_course_status_history: $1/$2 bound the half-open
# [start, end) time window, $3 caps the number of rows.
_SELECT_COURSE_STATUS_HISTORY_SQL = """
SELECT
h.id,
h.course_id,
h.facility_id,
h.old_status,
h.new_status,
h.change_source,
h.changed_by,
h.changed_at,
c.name AS course_name,
f.name AS facility_name,
f.slug AS facility_slug
FROM course_status_history h
JOIN courses c ON c.id = h.course_id
JOIN facilities f ON f.id = h.facility_id
WHERE h.changed_at >= $1
AND h.changed_at < $2
ORDER BY h.changed_at DESC, h.id DESC
LIMIT $3
"""


def _course_status_history_row_to_dict(record) -> dict:
    """Convert one fetched history row into a plain dict."""
    stamp = record["changed_at"]
    return {
        "id": record["id"],
        "course_id": record["course_id"],
        "facility_id": record["facility_id"],
        "old_status": record["old_status"],
        "new_status": record["new_status"],
        "change_source": record["change_source"],
        "changed_by": record["changed_by"],
        # Timestamps are emitted as ISO-8601 strings; a falsy value maps to None.
        "changed_at": stamp.isoformat() if stamp else None,
        "course_name": record["course_name"],
        "facility_name": record["facility_name"],
        "facility_slug": record["facility_slug"],
    }


async def list_course_status_history(conn, *, changed_on: date | None = None, limit: int = 100):
    """List status changes recorded during one calendar day, newest first.

    The day runs from midnight in ``OSLO_TIMEZONE`` up to, but not including,
    midnight of the following day.

    Args:
        conn: Database connection exposing an awaitable
            ``fetch(sql, *args)`` (presumably asyncpg -- TODO confirm).
        changed_on: Day to list; defaults to ``get_oslo_today()``.
        limit: Maximum number of rows to return.

    Returns:
        A list of dicts with the history columns plus ``course_name``,
        ``facility_name`` and ``facility_slug``; ``changed_at`` is an
        ISO-8601 string (or ``None``).
    """
    day = changed_on if changed_on else get_oslo_today()
    window_start = datetime.combine(day, time.min, tzinfo=OSLO_TIMEZONE)
    window_end = window_start + timedelta(days=1)

    records = await conn.fetch(
        _SELECT_COURSE_STATUS_HISTORY_SQL,
        window_start,
        window_end,
        limit,
    )
    return [_course_status_history_row_to_dict(record) for record in records]