Nye-TeeOff/backend/main.py

3241 lines
119 KiB
Python

"""
TEE OFF BACKEND API v3.8.0 - KOBLET PÅ FULL ADMIN REDIGERING
---------------------------------------------------------------------------
REGEL 1: Bruk str (ikke string) for type-hinting.
REGEL 2: Inkluder alle subqueries for banestatus og hull-data.
REGEL 3: Robust JSON-parsing (format_row) for å hindre Frontend-krasj.
REGEL 4: JWT-sesjoner lagres i HTTP-only cookies.
LOV: Aldri trunker eller slett logikk for "effektivitet".
---------------------------------------------------------------------------
"""
from fastapi import FastAPI, HTTPException, Response, Request, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
import asyncio
import asyncpg
import json
import pyotp
import os
import re
import secrets
import hashlib
import smtplib
from datetime import datetime, date, timedelta
from email.message import EmailMessage
from pathlib import Path
from jose import jwt, JWTError
from passlib.context import CryptContext
import qrcode
import qrcode.image.svg
import httpx
from pydantic import BaseModel
from typing import Optional, List, Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from scrape_jobs import (
SCRAPE_JOB_TYPES,
enqueue_scrape_job,
ensure_scrape_jobs_table,
list_scrape_jobs,
)
from env_config import get_database_url, get_required_env
from weather_forecast import ensure_weather_forecast_table, weather_sync_loop
# --- CONFIGURATION ---
# Database URL and the admin JWT secret are resolved through the project's env_config helpers.
DB_URL = get_database_url()
SECRET_KEY = get_required_env("JWT_SECRET")
ALGORITHM = "HS256"
# Public (reader) sessions use their own signing secret; it falls back to the admin secret when unset.
PUBLIC_SESSION_SECRET = os.getenv("PUBLIC_SESSION_SECRET", SECRET_KEY)
PUBLIC_SESSION_COOKIE = "teeoff_user_session"
PUBLIC_SESSION_MAX_AGE_SECONDS = 60 * 60 * 24 * 30  # 30 days
PUBLIC_COMMENT_STATUSES = {"pending", "published", "rejected", "deleted"}
# Status given to new comments; any configured value outside PUBLIC_COMMENT_STATUSES falls back to "published".
PUBLIC_COMMENT_DEFAULT_STATUS = (
    os.getenv("PUBLIC_COMMENT_DEFAULT_STATUS", "published").strip().lower()
    if os.getenv("PUBLIC_COMMENT_DEFAULT_STATUS", "published").strip().lower() in PUBLIC_COMMENT_STATUSES
    else "published"
)
# Google OAuth / OpenID Connect settings (empty strings mean "not configured").
GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "").strip()
GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "").strip()
GOOGLE_OAUTH_SCOPES = os.getenv("GOOGLE_OAUTH_SCOPES", "openid email profile").strip()
GOOGLE_DISCOVERY_URL = os.getenv(
    "GOOGLE_DISCOVERY_URL",
    "https://accounts.google.com/.well-known/openid-configuration",
).strip()
# Outgoing mail settings. SMTP_PORT stays a string here and is converted with int() where it is used.
SMTP_SERVER = os.getenv("SMTP_SERVER", "").strip()
SMTP_PORT = os.getenv("SMTP_PORT", "").strip()
SMTP_USER = os.getenv("SMTP_USER", "").strip()
SMTP_PASS = os.getenv("SMTP_PASS", "").strip()
# The sender address defaults to the SMTP user.
PUBLIC_FROM_EMAIL = os.getenv("PUBLIC_FROM_EMAIL", SMTP_USER).strip()
CONTACT_FORM_TO_EMAIL = os.getenv("CONTACT_FORM_TO_EMAIL", "teeoff@teeoff.no").strip()
# Comment notifications default to the contact-form recipient.
COMMENT_NOTIFICATION_TO_EMAIL = os.getenv(
    "COMMENT_NOTIFICATION_TO_EMAIL",
    CONTACT_FORM_TO_EMAIL,
).strip()
# IndexNow settings; an empty INDEXNOW_KEY disables submissions in the IndexNow helpers.
INDEXNOW_KEY = os.getenv("INDEXNOW_KEY", "").strip()
INDEXNOW_KEY_LOCATION = os.getenv("INDEXNOW_KEY_LOCATION", "").strip()
INDEXNOW_ENDPOINT = os.getenv("INDEXNOW_ENDPOINT", "https://api.indexnow.org/indexnow").strip()
# Password hashing context (PBKDF2-SHA256 via passlib).
pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
def get_int_env(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    Returns ``default`` when the variable is unset or its stripped value is
    not a valid integer literal.
    """
    configured = os.getenv(name)
    text = str(default) if configured is None else str(configured)
    try:
        parsed = int(text.strip())
    except ValueError:
        return default
    return parsed
# Integer settings read through get_int_env; the second argument is the fallback value.
ADMIN_SESSION_MAX_AGE_SECONDS = get_int_env("ADMIN_SESSION_MAX_AGE_SECONDS", 60 * 60 * 12)  # default 12 hours
ADMIN_REMEMBER_ME_MAX_AGE_SECONDS = get_int_env("ADMIN_REMEMBER_ME_MAX_AGE_SECONDS", 60 * 60 * 24 * 30)  # default 30 days
PUBLIC_MAGIC_LINK_MAX_AGE_MINUTES = get_int_env("PUBLIC_MAGIC_LINK_MAX_AGE_MINUTES", 20)
PUBLIC_MAGIC_LINK_REQUEST_COOLDOWN_SECONDS = get_int_env("PUBLIC_MAGIC_LINK_REQUEST_COOLDOWN_SECONDS", 60)
CONTACT_FORM_RATE_LIMIT_WINDOW_SECONDS = get_int_env("CONTACT_FORM_RATE_LIMIT_WINDOW_SECONDS", 60 * 60)  # default 1 hour
CONTACT_FORM_RATE_LIMIT_MAX_SUBMISSIONS = get_int_env("CONTACT_FORM_RATE_LIMIT_MAX_SUBMISSIONS", 3)
CONTACT_FORM_MIN_FILL_SECONDS = get_int_env("CONTACT_FORM_MIN_FILL_SECONDS", 5)
def resolve_imported_meninger_path() -> Path:
    """Locate ``importedMeninger.json`` on disk.

    The ``IMPORTED_MENINGER_PATH`` environment variable (when set) is tried
    first, followed by the fixed deployment locations and finally the
    repository-relative frontend content folder.

    Raises:
        HTTPException: 404 when none of the candidate paths exists.
    """
    search_order: list[Path] = [
        Path("/opt/teeoff/frontend/src/content/importedMeninger.json"),
        Path("/shared/frontend-content/importedMeninger.json"),
        Path(__file__).resolve().parent.parent / "frontend" / "src" / "content" / "importedMeninger.json",
    ]
    override = os.getenv("IMPORTED_MENINGER_PATH")
    if override:
        # An explicit override always takes precedence over the built-in locations.
        search_order.insert(0, Path(override))
    located = next((path for path in search_order if path.exists()), None)
    if located is not None:
        return located
    raise HTTPException(
        status_code=404,
        detail=(
            "Fant ikke importedMeninger.json. Sjekk at frontend/src/content er tilgjengelig "
            "for API-et eller sett IMPORTED_MENINGER_PATH."
        ),
    )
def is_google_login_configured() -> bool:
    """Return True when both Google OAuth client credentials are set."""
    return all((GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET))
def is_magic_link_configured() -> bool:
    """Return True when every SMTP setting and the sender address are set."""
    required_settings = (SMTP_SERVER, SMTP_PORT, SMTP_USER, SMTP_PASS, PUBLIC_FROM_EMAIL)
    return all(required_settings)
def is_contact_form_configured() -> bool:
    """Return True when mail delivery is configured and a contact recipient is set."""
    if not is_magic_link_configured():
        return False
    return bool(CONTACT_FORM_TO_EMAIL)
def get_public_auth_config() -> dict[str, Any]:
    """Describe which public login methods are available.

    Returns a dict with ``google`` and ``magic_link`` flags plus
    ``configured``, which is True when at least one method is enabled.
    """
    providers = {
        "google": is_google_login_configured(),
        "magic_link": is_magic_link_configured(),
    }
    return {"configured": any(providers.values()), **providers}
def get_configured_public_base_url() -> str:
    """Return the configured public site URL without trailing slashes.

    ``PUBLIC_BASE_URL`` wins over ``NEXT_PUBLIC_SITE_URL``; an empty string
    is returned when neither is set.
    """
    configured_values = (
        os.getenv(env_name, "").strip().rstrip("/")
        for env_name in ("PUBLIC_BASE_URL", "NEXT_PUBLIC_SITE_URL")
    )
    return next((url for url in configured_values if url), "")
def build_public_base_url(request: Request) -> str:
    """Return the externally visible base URL for this request.

    Resolution order:
    1. The configured public base URL, when set.
    2. ``X-Forwarded-Proto`` + ``X-Forwarded-Host`` as set by a reverse proxy.
    3. The request's own base URL.

    Forwarded headers may carry a comma-separated list when several proxies
    are chained; only the first entry is used, matching how
    ``should_use_secure_cookies`` reads ``X-Forwarded-Proto``.
    """
    configured = get_configured_public_base_url()
    if configured:
        return configured
    forwarded_proto = request.headers.get("x-forwarded-proto", "").split(",")[0].strip()
    forwarded_host = request.headers.get("x-forwarded-host", "").split(",")[0].strip()
    if forwarded_proto and forwarded_host:
        return f"{forwarded_proto}://{forwarded_host}".rstrip("/")
    return str(request.base_url).rstrip("/")
def build_google_redirect_uri(request: Request) -> str:
    """Return the absolute Google OAuth callback URL for this request."""
    site_root = build_public_base_url(request)
    return f"{site_root}/api/public/auth/google/callback"
def build_absolute_public_url(path: str) -> str | None:
    """Join ``path`` onto the configured public base URL.

    Returns None when no base URL is configured, and the bare base URL when
    ``path`` is empty. A missing leading slash on ``path`` is added.
    """
    site_root = get_configured_public_base_url()
    if not site_root:
        return None
    if not path:
        return site_root
    suffix = path if path.startswith("/") else f"/{path}"
    return f"{site_root}{suffix}"
def dedupe_strings(values: list[str]) -> list[str]:
    """Strip every value, drop empties and keep the first occurrence of each string."""
    stripped = (str(value or "").strip() for value in values)
    # dict.fromkeys keeps insertion order, which preserves first-seen ordering.
    return list(dict.fromkeys(text for text in stripped if text))
def build_article_public_url(section: str | None, slug: str | None) -> str | None:
    """Return the absolute public URL of an article.

    Returns None when the section is not ``banebesok``/``meninger``, the slug
    is empty, or no public base URL is configured.
    """
    section_key = str(section or "").strip().lower()
    slug_value = str(slug or "").strip()
    if slug_value and section_key in {"banebesok", "meninger"}:
        return build_absolute_public_url(f"/{section_key}/{slug_value}")
    return None
def build_facility_public_url(slug: str | None) -> str | None:
    """Return the absolute ``/golfbaner/<slug>`` URL, or None for an empty slug or missing base URL."""
    slug_value = str(slug or "").strip()
    return build_absolute_public_url(f"/golfbaner/{slug_value}") if slug_value else None
def collect_article_indexnow_urls(
    previous_article: dict[str, Any] | None = None,
    current_article: dict[str, Any] | None = None,
) -> list[str]:
    """Collect the URLs to submit to IndexNow after an article change.

    For each of the two article snapshots whose status is ``published`` the
    article URL and its section listing URL are included. The result is
    de-duplicated with the previous snapshot's URLs first.
    """
    known_sections = {"banebesok", "meninger"}
    collected: list[str] = []
    for snapshot in (previous_article, current_article):
        if not snapshot:
            continue
        if str(snapshot.get("status") or "").strip().lower() != "published":
            continue
        article_url = build_article_public_url(snapshot.get("section"), snapshot.get("slug"))
        if article_url:
            collected.append(article_url)
        # The section listing page changes whenever one of its articles does.
        section_key = str(snapshot.get("section") or "").strip().lower()
        if section_key not in known_sections:
            continue
        section_url = build_absolute_public_url(f"/{section_key}")
        if section_url:
            collected.append(section_url)
    return dedupe_strings(collected)
def collect_facility_indexnow_urls(slugs: list[str], extra_paths: list[str] | None = None) -> list[str]:
    """Build de-duplicated IndexNow URLs for facility pages plus optional extra site paths.

    Facility URLs come first, followed by the extra paths; entries that
    cannot be resolved to an absolute URL are left out.
    """
    facility_urls = [build_facility_public_url(slug) for slug in slugs]
    listing_urls = [build_absolute_public_url(path) for path in extra_paths or []]
    return dedupe_strings([url for url in facility_urls + listing_urls if url])
def get_indexnow_key_location() -> str | None:
    """Return the URL of the IndexNow key file.

    An explicitly configured location wins; otherwise ``/<key>.txt`` on the
    public site is used. Returns None when no key is configured.
    """
    explicit_location = INDEXNOW_KEY_LOCATION.strip()
    if explicit_location:
        return explicit_location
    return build_absolute_public_url(f"/{INDEXNOW_KEY}.txt") if INDEXNOW_KEY else None
def normalize_indexnow_urls(urls: list[str]) -> list[str]:
    """Filter and canonicalise URLs before they are sent to IndexNow.

    Only http(s) URLs on the configured public host are kept. Fragments are
    removed, an empty path becomes ``/`` and duplicates are dropped while the
    original order is preserved. Returns an empty list when no public base
    URL (or host) is configured.
    """
    site_root = get_configured_public_base_url()
    expected_host = urlsplit(site_root).netloc.strip().lower() if site_root else ""
    if not expected_host:
        return []
    accepted: dict[str, None] = {}
    for raw_url in urls:
        text = str(raw_url or "").strip()
        if not text:
            continue
        parts = urlsplit(text)
        on_site = parts.scheme in {"http", "https"} and parts.netloc.strip().lower() == expected_host
        if not on_site:
            continue
        canonical = urlunsplit((parts.scheme, parts.netloc, parts.path or "/", parts.query, ""))
        accepted.setdefault(canonical)
    return list(accepted)
async def submit_indexnow_urls(urls: list[str], reason: str) -> None:
    """POST the given URLs to the IndexNow endpoint (best effort).

    Does nothing when no key is configured or when the URL list, host or key
    location cannot be resolved. Network errors and HTTP error responses are
    printed and otherwise ignored. ``reason`` is only used in log output.
    """
    if not INDEXNOW_KEY:
        return
    url_list = normalize_indexnow_urls(urls)
    site_root = get_configured_public_base_url()
    key_location = get_indexnow_key_location()
    host = urlsplit(site_root).netloc.strip() if site_root else ""
    if not (url_list and host and key_location):
        return
    request_body = {
        "host": host,
        "key": INDEXNOW_KEY,
        "keyLocation": key_location,
        "urlList": url_list,
    }
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(INDEXNOW_ENDPOINT, json=request_body)
    except Exception as exc:
        # Submission is fire-and-forget; a failure must never affect the caller.
        print(f"IndexNow-feil ({reason}): {exc}")
        return
    status = response.status_code
    if status < 400:
        print(f"IndexNow sendt ({reason}): {len(url_list)} URL-er, HTTP {status}")
        return
    body_preview = response.text.strip().replace("\n", " ")
    print(f"IndexNow avvist ({reason}): HTTP {status} {body_preview[:300]}")
# Strong references to in-flight IndexNow tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
_INDEXNOW_BACKGROUND_TASKS: set[asyncio.Task] = set()


def schedule_indexnow_submission(urls: list[str], reason: str) -> None:
    """Schedule a background IndexNow submission on the running event loop.

    Does nothing when no IndexNow key is configured or when no URL survives
    normalisation. Must be called while an event loop is running, because it
    relies on ``asyncio.create_task``.
    """
    if not INDEXNOW_KEY:
        return
    normalized_urls = normalize_indexnow_urls(urls)
    if not normalized_urls:
        return
    task = asyncio.create_task(submit_indexnow_urls(normalized_urls, reason))
    # Hold a reference until the task completes, then release it.
    _INDEXNOW_BACKGROUND_TASKS.add(task)
    task.add_done_callback(_INDEXNOW_BACKGROUND_TASKS.discard)
def should_use_secure_cookies(request: Request) -> bool:
    """Decide whether cookies should carry the ``Secure`` flag.

    True when ``PUBLIC_BASE_URL`` is an https URL; otherwise the first entry
    of ``X-Forwarded-Proto`` decides, falling back to the request's own
    scheme when that header is absent or empty.
    """
    if os.getenv("PUBLIC_BASE_URL", "").strip().lower().startswith("https://"):
        return True
    first_hop, _, _ = request.headers.get("x-forwarded-proto", "").partition(",")
    # Only consult the request scheme when the proxy header gave no answer.
    effective_scheme = first_hop.strip().lower() or request.url.scheme
    return effective_scheme == "https"
async def get_google_discovery_document() -> dict[str, Any]:
    """Fetch and return the OpenID Connect discovery document from GOOGLE_DISCOVERY_URL.

    Raises whatever ``httpx`` raises for transport errors or non-success
    status codes (via ``raise_for_status``).
    """
    async with httpx.AsyncClient(timeout=20.0) as http_client:
        discovery_response = await http_client.get(GOOGLE_DISCOVERY_URL)
        discovery_response.raise_for_status()
        document = discovery_response.json()
    return document
def normalize_return_to_path(value: str | None) -> str:
candidate = str(value or "/").strip()
if not candidate.startswith("/"):
return "/"
if candidate.startswith("//"):
return "/"
return candidate or "/"
def append_query_param(url: str, key: str, value: str) -> str:
    """Return ``url`` with query parameter ``key`` set to ``value``.

    An existing ``key`` is replaced in place (extra occurrences are dropped);
    otherwise the pair is appended. All other parameters are preserved as-is,
    including blank values and repeated keys, which a plain ``dict`` would
    silently collapse. Scheme, host, path and fragment are untouched.
    """
    parsed = urlsplit(url)
    updated_pairs: list[tuple[str, str]] = []
    replaced = False
    for existing_key, existing_value in parse_qsl(parsed.query, keep_blank_values=True):
        if existing_key != key:
            updated_pairs.append((existing_key, existing_value))
        elif not replaced:
            updated_pairs.append((key, value))
            replaced = True
    if not replaced:
        updated_pairs.append((key, value))
    return urlunsplit(
        (
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            urlencode(updated_pairs),
            parsed.fragment,
        )
    )
def create_public_session_token(user_id: int) -> str:
    """Create a signed JWT for a public (reader) session.

    The token carries ``sub`` (``public:<id>``), ``uid`` and an ``exp`` claim
    set ``PUBLIC_SESSION_MAX_AGE_SECONDS`` from now, and is signed with
    ``PUBLIC_SESSION_SECRET`` using ``ALGORITHM``.
    """
    # Local import so this block does not depend on the file-level import list.
    from datetime import timezone

    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easy to misinterpret as local time.
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=PUBLIC_SESSION_MAX_AGE_SECONDS)
    return jwt.encode(
        {
            "sub": f"public:{user_id}",
            "uid": user_id,
            "exp": expires_at,
        },
        PUBLIC_SESSION_SECRET,
        algorithm=ALGORITHM,
    )
def decode_public_session_token(token: str) -> dict[str, Any]:
    """Verify a public session JWT and return its claims.

    Raises:
        HTTPException: 401 when the token fails verification or has no
            truthy ``uid`` claim.
    """
    try:
        claims = jwt.decode(token, PUBLIC_SESSION_SECRET, algorithms=[ALGORITHM])
        if claims.get("uid"):
            return claims
        # A token without a user id is treated like any other invalid token.
        raise JWTError()
    except JWTError as exc:
        raise HTTPException(status_code=401, detail="Ugyldig eller utløpt brukerøkt") from exc
def hash_request_ip(request: Request) -> str | None:
    """Return the SHA-256 hex digest of the client IP, or None when it is unknown.

    The first entry of ``X-Forwarded-For`` is preferred; otherwise the
    connection's peer address is used.
    """
    forwarded_chain = request.headers.get("x-forwarded-for", "")
    if forwarded_chain:
        client_ip = forwarded_chain.split(",")[0].strip()
    elif request.client:
        client_ip = request.client.host
    else:
        client_ip = ""
    if not client_ip:
        return None
    return hashlib.sha256(client_ip.encode("utf-8")).hexdigest()
def hash_magic_link_token(token: str) -> str:
    """Return the SHA-256 hex digest of a magic-link token."""
    hasher = hashlib.sha256()
    hasher.update(token.encode("utf-8"))
    return hasher.hexdigest()
def normalize_public_email(email: str | None) -> str | None:
normalized = str(email or "").strip().lower()
return normalized or None
async def send_magic_link_email(email_address: str, login_url: str) -> None:
    """E-mail a login link to ``email_address``.

    The blocking SMTP-over-SSL exchange runs in a worker thread via
    ``asyncio.to_thread`` so the event loop is not blocked.
    """
    subject = "Logg inn på TeeOff"
    body = "".join(
        [
            "Hei,\n\n",
            "Klikk på lenken under for å logge inn på TeeOff og kommentere:\n\n",
            f"{login_url}\n\n",
            f"Lenken utløper om {PUBLIC_MAGIC_LINK_MAX_AGE_MINUTES} minutter.\n\n",
            "Hvis du ikke ba om denne lenken, kan du se bort fra e-posten.\n",
        ]
    )

    def _deliver() -> None:
        # Build the message and hand it to the SMTP server (blocking).
        mail = EmailMessage()
        mail["From"] = PUBLIC_FROM_EMAIL
        mail["To"] = email_address
        mail["Subject"] = subject
        mail.set_content(body)
        with smtplib.SMTP_SSL(SMTP_SERVER, int(SMTP_PORT)) as smtp:
            smtp.login(SMTP_USER, SMTP_PASS)
            smtp.send_message(mail)

    await asyncio.to_thread(_deliver)
async def send_contact_form_email(
    *,
    sender_name: str,
    sender_email: str,
    topic: str,
    message: str,
    ip_hash: str | None,
) -> None:
    """Forward a contact-form submission to ``CONTACT_FORM_TO_EMAIL``.

    ``Reply-To`` is set to the submitter's address. The blocking SMTP call
    runs in a worker thread via ``asyncio.to_thread``.
    """
    subject = f"[TeeOff Kontakt] {topic}"
    body = "".join(
        [
            "Ny melding fra kontaktskjemaet på TeeOff.no\n\n",
            f"Navn: {sender_name}\n",
            f"E-post: {sender_email}\n",
            f"Emne: {topic}\n",
            f"IP-hash: {ip_hash or 'ukjent'}\n\n",
            "Melding:\n",
            f"{message.strip()}\n",
        ]
    )

    def _deliver() -> None:
        # Build the message and hand it to the SMTP server (blocking).
        outgoing = EmailMessage()
        outgoing["From"] = PUBLIC_FROM_EMAIL
        outgoing["To"] = CONTACT_FORM_TO_EMAIL
        outgoing["Reply-To"] = sender_email
        outgoing["Subject"] = subject
        outgoing.set_content(body)
        with smtplib.SMTP_SSL(SMTP_SERVER, int(SMTP_PORT)) as smtp:
            smtp.login(SMTP_USER, SMTP_PASS)
            smtp.send_message(outgoing)

    await asyncio.to_thread(_deliver)
async def send_facility_feedback_email(
    *,
    facility_name: str,
    facility_slug: str,
    sender_name: str,
    sender_email: str,
    message: str,
    ip_hash: str | None,
) -> None:
    """Forward feedback about a facility profile to ``CONTACT_FORM_TO_EMAIL``.

    The facility link in the mail is built with ``build_facility_public_url``
    so it follows the configured public base URL like the other public links
    in this module. When no base URL is configured it falls back to the
    production host. ``Reply-To`` is set to the submitter's address. The
    blocking SMTP call runs in a worker thread via ``asyncio.to_thread``.
    """
    subject = f"[TeeOff Golfbane] Vedrørende {facility_name}"
    if facility_slug:
        facility_url = (
            build_facility_public_url(facility_slug)
            or f"https://teeoff.no/golfbaner/{facility_slug}"
        )
    else:
        facility_url = "ukjent"
    body = (
        "Ny tilbakemelding fra baneprofil på TeeOff.no\n\n"
        f"Golfanlegg: {facility_name}\n"
        f"Side: {facility_url}\n"
        f"Navn: {sender_name}\n"
        f"E-post: {sender_email}\n"
        f"IP-hash: {ip_hash or 'ukjent'}\n\n"
        "Melding:\n"
        f"{message.strip()}\n"
    )

    def _send() -> None:
        # Build the message and hand it to the SMTP server (blocking).
        mail = EmailMessage()
        mail["From"] = PUBLIC_FROM_EMAIL
        mail["To"] = CONTACT_FORM_TO_EMAIL
        mail["Reply-To"] = sender_email
        mail["Subject"] = subject
        mail.set_content(body)
        with smtplib.SMTP_SSL(SMTP_SERVER, int(SMTP_PORT)) as server:
            server.login(SMTP_USER, SMTP_PASS)
            server.send_message(mail)

    await asyncio.to_thread(_send)
async def send_comment_notification_email(
    *,
    article_title: str,
    article_url: str,
    article_section: str,
    comment_body: str,
    comment_status: str,
    commenter_name: str,
    commenter_email: str | None,
    parent_author_name: str | None = None,
) -> None:
    """Notify ``COMMENT_NOTIFICATION_TO_EMAIL`` about a new comment.

    Returns silently when mail delivery or the recipient is not configured.
    ``Reply-To`` is set only when the commenter's address is known. The
    blocking SMTP call runs in a worker thread via ``asyncio.to_thread``.
    """
    can_notify = is_magic_link_configured() and COMMENT_NOTIFICATION_TO_EMAIL
    if not can_notify:
        return
    subject = f"[TeeOff Kommentar] {article_title}"
    body = "".join(
        [
            "Ny kommentar på TeeOff.no\n\n",
            f"Seksjon: {article_section}\n",
            f"Artikkel: {article_title}\n",
            f"Lenke: {article_url}\n",
            f"Status: {comment_status}\n",
            f"Forfatter: {commenter_name}\n",
            f"E-post: {commenter_email or 'ikke tilgjengelig'}\n",
            f"Svar på kommentar fra: {parent_author_name or 'ingen, dette er toppnivå'}\n\n",
            "Kommentar:\n",
            f"{comment_body.strip()}\n",
        ]
    )

    def _deliver() -> None:
        # Build the message and hand it to the SMTP server (blocking).
        notification = EmailMessage()
        notification["From"] = PUBLIC_FROM_EMAIL
        notification["To"] = COMMENT_NOTIFICATION_TO_EMAIL
        if commenter_email:
            notification["Reply-To"] = commenter_email
        notification["Subject"] = subject
        notification.set_content(body)
        with smtplib.SMTP_SSL(SMTP_SERVER, int(SMTP_PORT)) as smtp:
            smtp.login(SMTP_USER, SMTP_PASS)
            smtp.send_message(notification)

    await asyncio.to_thread(_deliver)
async def validate_admin_session_token(token: str) -> str:
    """Verify an admin session JWT and return its ``sub`` claim (the username).

    Raises:
        HTTPException: 401 when the token fails verification or has no
            ``sub`` claim.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        subject = claims.get("sub")
        if subject:
            return subject
        # A token without a subject is treated like any other invalid token.
        raise JWTError()
    except JWTError as exc:
        raise HTTPException(status_code=401, detail="Ugyldig eller utløpt admin-sesjon") from exc
# --- PYDANTIC MODELS ---
# A course id paired with its new status string; used as the item type of
# ScrapeSettingsUpdate.courses.
class CourseStatusUpdate(BaseModel):
    id: int
    status: str
# Scrape configuration payload; every field is optional.
# NOTE(review): presumably a facility's scrape settings — confirm against the endpoint that accepts it.
class ScrapeSettingsUpdate(BaseModel):
    scrape_method: Optional[str] = None
    scrape_status_url: Optional[str] = None
    scrape_status_selector: Optional[str] = None
    ai_instruction: Optional[str] = None
    courses: Optional[List[CourseStatusUpdate]] = []  # per-course status updates; defaults to an empty list
# New model for receiving the IDs to scrape.
class ScrapeRunRequest(BaseModel):
    facility_ids: List[int]
# Membership values for one facility; only facility_id is required.
# NOTE(review): the integer fields are presumably prices — confirm unit and currency with the consumer.
class MembershipDraftApproval(BaseModel):
    facility_id: int
    navn_standard_medlemskap: Optional[str] = None
    standard_medlemskap: Optional[int] = None
    standard_medlemskap_kommentarer: Optional[str] = None
    navn_rimeligste_alternativ: Optional[str] = None
    rimeligste_alternativ: Optional[int] = None
# Batch wrapper carrying several MembershipDraftApproval items in one request.
class BulkApprovalRequest(BaseModel):
    approvals: List[MembershipDraftApproval]
# Single field/value pair, both sent as strings.
# NOTE(review): which field names are accepted is decided by the handler, not by this model.
class QuickEditRequest(BaseModel):
    field: str
    value: str
# Green fee entries for one facility; the entries are free-form dicts (no schema enforced here).
class GreenfeeApproval(BaseModel):
    facility_id: int
    greenfee: List[dict]
# Golf package ("golfpakker") entries for one facility; the entries are free-form dicts.
class GolfpakkerApproval(BaseModel):
    facility_id: int
    golfpakker: List[dict]
# "VTG" values for one facility. The three vtg_* fields accept None but declare no default.
# NOTE(review): whether they may be omitted depends on the pydantic major version — confirm.
class VtgApproval(BaseModel):
    facility_id: int
    vtg_pris: int | None
    vtg_beskrivelse: str | None
    vtg_datoer: List[dict] | None
# Batch wrapper carrying several VtgApproval items in one request.
class BulkVtgRequest(BaseModel):
    approvals: List[VtgApproval]
# Batch wrapper carrying several GolfpakkerApproval items in one request.
class BulkGolfpakkerRequest(BaseModel):
    approvals: List[GolfpakkerApproval]
# Request body carrying a single password string.
# NOTE(review): presumably used to re-confirm the admin password before a sensitive action — confirm.
class AdminPasswordConfirm(BaseModel):
    password: str
# Article create/update payload. Only slug and title are required.
# section defaults to "banebesok" and status to "draft".
class ArticleUpsertRequest(BaseModel):
    section: Optional[str] = "banebesok"
    slug: str
    title: str
    description: Optional[str] = None
    excerpt: Optional[str] = None
    eyebrow: Optional[str] = None
    location_label: Optional[str] = None
    facility_name: Optional[str] = None
    facility_slug: Optional[str] = None
    author_name: Optional[str] = None
    status: Optional[str] = "draft"
    hero_images: Optional[List[dict[str, Any]]] = []
    media_gallery: Optional[List[dict[str, Any]]] = []
    featured_media_id: Optional[str] = None
    content_html: Optional[str] = None
    source_url: Optional[str] = None
    source_label: Optional[str] = None
    # Timestamps arrive as strings. NOTE(review): presumably ISO 8601 parsed by parse_optional_datetime — confirm.
    published_at: Optional[str] = None
    updated_at: Optional[str] = None
# New public comment: the text plus an optional parent comment id.
# NOTE(review): parent_id presumably marks a reply to an existing comment — confirm.
class PublicCommentCreateRequest(BaseModel):
    body: str
    parent_id: Optional[int] = None
# Magic-link login request: the e-mail address plus a return path that defaults to "/".
class PublicMagicLinkRequest(BaseModel):
    email: str
    return_to: Optional[str] = "/"
# Contact-form submission.
class PublicContactFormRequest(BaseModel):
    name: str
    email: str
    topic: str
    message: str
    website: Optional[str] = ""  # NOTE(review): presumably a honeypot field that real users leave empty — confirm
    started_at: Optional[int] = None  # NOTE(review): presumably the form-open timestamp checked against CONTACT_FORM_MIN_FILL_SECONDS; unit not visible here
# Feedback submission tied to one facility.
class PublicFacilityFeedbackRequest(BaseModel):
    facility_id: int
    name: str
    email: str
    message: str
    website: Optional[str] = ""  # NOTE(review): presumably a honeypot field that real users leave empty — confirm
    started_at: Optional[int] = None  # NOTE(review): presumably the form-open timestamp; unit not visible here
# --- FUNCTIONS ---
def _coerce_json_field(value: Any, expected_type: type) -> Any:
    """Return ``value`` as an ``expected_type`` instance (``list`` or ``dict``).

    JSON text is decoded first. Anything that is missing, undecodable or of
    the wrong type after decoding yields an empty ``expected_type()``.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except (ValueError, RecursionError):
            return expected_type()
    return value if isinstance(value, expected_type) else expected_type()


def format_row(row):
    """Clean a database row for the frontend.

    1. Converts the known date/datetime columns to ISO-format strings.
    2. Decodes stringified JSON into real Python lists/dicts.
    3. Guarantees that list and dict fields are never None and never of the
       wrong type, including when the stored JSON text itself decodes to
       something else (e.g. ``"null"`` or an object in a list column).

    Returns None when ``row`` is None, otherwise a new dict. Only keys
    present in the row are touched.
    """
    if row is None:
        return None
    d = dict(row)
    for key in [
        'status_updated_at', 'created_at', 'slope_valid_until',
        'membership_updated_at', 'greenfee_updated_at', 'vtg_updated_at', 'footnote_updated_at',
        'golfpakker_updated_at'
    ]:
        if isinstance(d.get(key), (date, datetime)):
            d[key] = d[key].isoformat()
    json_list_fields = [
        'course_statuses', 'courses', 'gallery', 'greenfee',
        'faqs', 'shotzoom', 'social_links', 'holes', 'golfpakker', 'cooperating_clubs', 'vtg_datoer',
        'weather_forecast'
    ]
    json_dict_fields = [
        'amenities', 'vtg', 'nsg_data', 'golfamore_data',
        'membership_draft', 'greenfee_draft', 'vtg_draft', 'golfpakker_draft'
    ]
    for field in json_list_fields:
        if field in d:
            d[field] = _coerce_json_field(d[field], list)
    for field in json_dict_fields:
        if field in d:
            d[field] = _coerce_json_field(d[field], dict)
    return d
def normalize_club_lookup_value(value: str | None) -> str:
text = str(value or "").strip().lower()
if not text:
return ""
text = text.replace("&", " og ")
text = re.sub(r"\bgk\b", " golfklubb ", text)
text = re.sub(r"\bgs\b", " golfsenter ", text)
text = re.sub(r"[^a-z0-9æøå]+", " ", text)
text = re.sub(r"\s+", " ", text).strip()
return text
async def resolve_cooperating_club_slugs(
    conn,
    suggested_names: list[str] | None,
    *,
    exclude_facility_id: int | None = None,
) -> list[str]:
    """Map free-text club names to facility slugs.

    Each suggested name is normalised with ``normalize_club_lookup_value``
    and matched against every row of ``facilities``:

    1. Exact match on the normalised name, the normalised slug, or the two
       joined; used when exactly one facility matches.
    2. Otherwise a substring match in either direction against the joined
       "name slug" key; used when exactly one facility matches.

    Ambiguous or unmatched names are skipped. Facilities whose name and slug
    both normalise to nothing are ignored, because an empty key is a
    substring of every suggestion and would otherwise match everything.

    Args:
        conn: Database connection exposing an awaitable ``fetch``.
        suggested_names: Candidate club names; None/blank entries are ignored.
        exclude_facility_id: Facility id to leave out of the candidates.

    Returns:
        Resolved slugs in the order of the suggestions, without duplicates.
    """
    cleaned_names = [str(name or "").strip() for name in (suggested_names or []) if str(name or "").strip()]
    if not cleaned_names:
        return []
    rows = await conn.fetch("SELECT id, name, slug FROM facilities ORDER BY name ASC")
    # Each candidate: (slug, keys accepted for an exact match, combined key for substring matching).
    candidates: list[tuple[str, set[str], str]] = []
    for row in rows:
        facility_id = int(row["id"])
        if exclude_facility_id and facility_id == exclude_facility_id:
            continue
        name = str(row["name"] or "").strip()
        slug = str(row["slug"] or "").strip()
        normalized_name = normalize_club_lookup_value(name)
        normalized_slug = normalize_club_lookup_value(slug.replace("-", " "))
        combined = " ".join(part for part in (normalized_name, normalized_slug) if part)
        if not combined:
            continue
        candidates.append((slug, {normalized_name, normalized_slug, combined} - {""}, combined))
    resolved_slugs: list[str] = []
    seen_slugs: set[str] = set()
    for suggested_name in cleaned_names:
        normalized_suggestion = normalize_club_lookup_value(suggested_name)
        if not normalized_suggestion:
            continue
        matches = [candidate for candidate in candidates if normalized_suggestion in candidate[1]]
        if len(matches) != 1:
            # No unique exact hit: fall back to substring containment in either direction.
            matches = [
                candidate
                for candidate in candidates
                if normalized_suggestion in candidate[2] or candidate[2] in normalized_suggestion
            ]
        if len(matches) != 1:
            continue
        slug = matches[0][0]
        if slug and slug not in seen_slugs:
            resolved_slugs.append(slug)
            seen_slugs.add(slug)
    return resolved_slugs
async def get_table_columns(conn, table_name: str, schema_name: str = "public") -> set[str]:
    """Return the column names of ``schema_name.table_name`` from ``information_schema.columns``."""
    # SQL text is kept exactly as in the original literal.
    column_query = """
SELECT column_name
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2
"""
    records = await conn.fetch(column_query, schema_name, table_name)
    column_names: set[str] = set()
    for record in records:
        column_names.add(str(record["column_name"]))
    return column_names
def generate_totp_qr_svg(provisioning_uri: str) -> str:
    """Render ``provisioning_uri`` as a QR code and return it as an SVG string."""
    render_options = {
        "image_factory": qrcode.image.svg.SvgPathImage,
        "box_size": 8,
        "border": 3,
    }
    qr_image = qrcode.make(provisioning_uri, **render_options)
    return qr_image.to_string(encoding="unicode")
def _coerce_article_json_list(value: Any) -> list:
    """Return ``value`` as a list, decoding JSON text first; anything else yields ``[]``."""
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except (ValueError, RecursionError):
            return []
    return value if isinstance(value, list) else []


def format_article_row(row):
    """Clean an article row for API output.

    * Converts ``published_at``/``updated_at``/``created_at`` to ISO strings.
    * Forces ``section`` to ``banebesok`` or ``meninger`` (default
      ``banebesok``).
    * Guarantees ``hero_images`` and ``media_gallery`` are lists, also when
      the stored JSON text decodes to something else (``"null"``, an object).
    * Derives ``media_gallery`` from ``hero_images`` when the gallery is
      empty.
    * Defaults ``featured_media_id`` to the first image in the gallery.

    Returns None when ``row`` is None, otherwise a new dict.
    """
    if row is None:
        return None
    data = dict(row)
    for key in ["published_at", "updated_at", "created_at"]:
        if isinstance(data.get(key), (date, datetime)):
            data[key] = data[key].isoformat()
    section = str(data.get("section") or "banebesok").strip().lower()
    data["section"] = section if section in {"banebesok", "meninger"} else "banebesok"
    data["hero_images"] = _coerce_article_json_list(data.get("hero_images"))
    data["media_gallery"] = _coerce_article_json_list(data.get("media_gallery"))
    if not data["media_gallery"] and data["hero_images"]:
        # Older rows only have hero images; expose them as an image-only gallery.
        data["media_gallery"] = [
            {
                "id": build_article_media_id("image", image.get("src") or ""),
                "type": "image",
                "src": image.get("src") or "",
                "alt": image.get("alt") or "",
                "caption": image.get("caption") or "",
                "poster": "",
            }
            for image in data["hero_images"]
            if isinstance(image, dict) and str(image.get("src") or "").strip()
        ]
    if not data.get("featured_media_id") and data["media_gallery"]:
        first_image = next(
            (
                item
                for item in data["media_gallery"]
                if isinstance(item, dict) and str(item.get("type") or "image").strip().lower() == "image"
            ),
            None,
        )
        if first_image:
            data["featured_media_id"] = str(first_image.get("id") or "").strip() or None
    return data
def normalize_article_status(status: str | None) -> str:
normalized = str(status or "draft").strip().lower()
if normalized not in {"draft", "published"}:
raise HTTPException(status_code=400, detail="Ugyldig artikkelstatus. Bruk 'draft' eller 'published'.")
return normalized
def normalize_article_section(section: str | None, allow_all: bool = False) -> str:
normalized = str(section or "banebesok").strip().lower()
valid_sections = {"banebesok", "meninger"}
if allow_all:
valid_sections = valid_sections | {"all"}
if normalized not in valid_sections:
raise HTTPException(
status_code=400,
detail="Ugyldig artikkelseksjon. Bruk 'banebesok' eller 'meninger'.",
)
return normalized
def parse_optional_datetime(value: str | None) -> datetime | None:
if value is None:
return None
trimmed = str(value).strip()
if not trimmed:
return None
try:
return datetime.fromisoformat(trimmed.replace("Z", "+00:00"))
except ValueError as exc:
raise HTTPException(status_code=400, detail=f"Ugyldig datoformat: {value}") from exc
def sanitize_hero_images(value: Any) -> list[dict[str, str]]:
    """Return hero images as clean ``{"src", "alt", "caption"}`` dicts.

    Non-list input yields ``[]``. Entries that are not dicts or have no
    ``src`` are dropped; ``src`` is passed through
    ``normalize_article_media_url``.
    """
    if not isinstance(value, list):
        return []
    cleaned: list[dict[str, str]] = []
    for entry in value:
        source = str(entry.get("src") or "").strip() if isinstance(entry, dict) else ""
        if not source:
            continue
        cleaned.append(
            {
                "src": normalize_article_media_url(source),
                "alt": str(entry.get("alt") or "").strip(),
                "caption": str(entry.get("caption") or "").strip(),
            }
        )
    return cleaned
# Matches absolute http(s) URLs on teeoff.no, nye.teeoff.no or wp.teeoff.no (optionally with "www.")
# whose path starts with /wp-content/uploads/ or /uploads/articles/. The path is captured as "path".
LEGACY_ARTICLE_MEDIA_PATTERN = re.compile(
    r"^https?://(?:www\.)?(?:teeoff\.no|nye\.teeoff\.no|wp\.teeoff\.no)(?P<path>/(?:wp-content/uploads|uploads/articles)/.+)$",
    re.IGNORECASE,
)
# Matches the start of i.ytimg.com thumbnail URLs (/vi/ or /vi_webp/) and captures the
# following path segment as "video_id".
YOUTUBE_THUMBNAIL_PATTERN = re.compile(
    r"^https?://i\.ytimg\.com/vi(?:_webp)?/(?P<video_id>[^/]+)/",
    re.IGNORECASE,
)
def normalize_article_media_url(value: str | None) -> str:
trimmed = str(value or "").strip()
if not trimmed:
return ""
legacy_match = LEGACY_ARTICLE_MEDIA_PATTERN.match(trimmed)
if legacy_match:
path = legacy_match.group("path")
if path.startswith("/wp-content/uploads/"):
return f"https://wp.teeoff.no{path}"
return path
return trimmed
def build_article_media_id(media_type: str, src: str) -> str:
    """Return a stable id ``<media_type>-<first 12 hex chars of SHA-1("<media_type>:<src>")>``."""
    fingerprint_source = f"{media_type}:{src}".encode("utf-8")
    short_digest = hashlib.sha1(fingerprint_source).hexdigest()[:12]
    return "-".join((media_type, short_digest))
def sanitize_article_media(value: Any, title: str | None = None) -> list[dict[str, str]]:
if not isinstance(value, list):
return []
sanitized: list[dict[str, str]] = []
seen: set[tuple[str, str]] = set()
fallback_text = str(title or "").strip()
for item in value:
if not isinstance(item, dict):
continue
media_type = str(item.get("type") or "image").strip().lower()
if media_type not in {"image", "video"}:
continue
src = normalize_article_media_url(item.get("src"))
if not src:
continue
dedupe_key = (media_type, src)
if dedupe_key in seen:
continue
seen.add(dedupe_key)
poster = normalize_article_media_url(item.get("poster"))
alt = str(item.get("alt") or "").strip()
caption = str(item.get("caption") or "").strip()
media_id = str(item.get("id") or "").strip() or build_article_media_id(media_type, src)
sanitized.append(
{
"id": media_id,
"type": media_type,
"src": src,
"alt": alt or fallback_text,
"caption": caption or alt or fallback_text,
"poster": poster,
}
)
return sanitized
def build_media_gallery_from_hero_images(hero_images: list[dict[str, str]]) -> list[dict[str, str]]:
    """Convert hero images into image-type gallery items, skipping entries without ``src``."""
    gallery: list[dict[str, str]] = []
    for hero in hero_images:
        if not hero.get("src"):
            continue
        source = hero["src"]
        gallery.append(
            {
                "id": build_article_media_id("image", source),
                "type": "image",
                "src": source,
                "alt": hero.get("alt") or "",
                "caption": hero.get("caption") or "",
                "poster": "",
            }
        )
    return gallery
def sanitize_featured_media_id(featured_media_id: str | None, media_gallery: list[dict[str, str]]) -> str | None:
candidate = str(featured_media_id or "").strip()
if candidate and any(item.get("id") == candidate and item.get("type") == "image" for item in media_gallery):
return candidate
for item in media_gallery:
if item.get("type") == "image":
return item.get("id")
return None
def build_hero_images_from_media_gallery(
media_gallery: list[dict[str, str]],
fallback_hero_images: list[dict[str, str]],
featured_media_id: str | None,
) -> list[dict[str, str]]:
image_media = [
{
"src": item["src"],
"alt": item.get("alt") or "",
"caption": item.get("caption") or item.get("alt") or "",
"id": item.get("id") or "",
}
for item in media_gallery
if item.get("type") == "image" and item.get("src")
]
if not image_media:
return fallback_hero_images
if featured_media_id:
featured_index = next(
(index for index, item in enumerate(image_media) if item.get("id") == featured_media_id),
None,
)
if featured_index is not None and featured_index > 0:
featured_item = image_media.pop(featured_index)
image_media.insert(0, featured_item)
return [
{
"src": item["src"],
"alt": item.get("alt") or "",
"caption": item.get("caption") or item.get("alt") or "",
}
for item in image_media
]
def humanize_slug(slug: str | None) -> str:
if not slug:
return "Ukjent bane"
return " ".join(part.capitalize() for part in str(slug).split("-") if part)
def resolve_imported_article_section(item: dict[str, Any]) -> tuple[str, str]:
    """Map an imported item to ``(section, label)``.

    The ``banebesok`` category slug wins, then ``siste-nytt``. Otherwise the
    section is ``meninger``, labelled with the first non-empty category name
    or ``"Meninger"`` when there is none.
    """
    slugs: set[str] = set()
    for raw_slug in item.get("categorySlugs") or []:
        if str(raw_slug or "").strip():
            slugs.add(str(raw_slug).strip().lower())
    if "banebesok" in slugs:
        return "banebesok", "Banebesøk"
    if "siste-nytt" in slugs:
        return "meninger", "Siste nytt"
    category_names = (
        str(category.get("name") or "").strip()
        for category in (item.get("categories") or [])
        if isinstance(category, dict)
    )
    label = next((name for name in category_names if name), "Meninger")
    return "meninger", label
def format_public_user_row(row: Any) -> dict[str, Any] | None:
if row is None:
return None
data = dict(row)
for key in ["created_at", "updated_at", "last_login_at", "email_verified_at"]:
if isinstance(data.get(key), (date, datetime)):
data[key] = data[key].isoformat()
return {
"id": data.get("id"),
"vipps_sub": data.get("vipps_sub"),
"google_sub": data.get("google_sub"),
"full_name": data.get("full_name"),
"given_name": data.get("given_name"),
"family_name": data.get("family_name"),
"email": data.get("email"),
"phone_number": data.get("phone_number"),
"display_name": data.get("display_name"),
"is_blocked": bool(data.get("is_blocked")),
"email_verified_at": data.get("email_verified_at"),
"created_at": data.get("created_at"),
"updated_at": data.get("updated_at"),
"last_login_at": data.get("last_login_at"),
}
def format_comment_row(row: Any) -> dict[str, Any] | None:
if row is None:
return None
data = dict(row)
for key in ["created_at", "updated_at", "published_at"]:
if isinstance(data.get(key), (date, datetime)):
data[key] = data[key].isoformat()
return {
"id": data.get("id"),
"article_id": data.get("article_id"),
"user_id": data.get("user_id"),
"parent_id": data.get("parent_id"),
"body": data.get("body"),
"status": data.get("status"),
"created_at": data.get("created_at"),
"updated_at": data.get("updated_at"),
"published_at": data.get("published_at"),
"author": {
"display_name": data.get("display_name") or data.get("full_name") or "TeeOff-leser",
"given_name": data.get("given_name"),
},
"is_pending_for_viewer": data.get("status") == "pending",
}
async def find_public_user_by_email(conn, email: str | None):
    """Return the lowest-id ``public_users`` row whose e-mail matches case-insensitively.

    Returns None without querying when the address normalises to nothing;
    otherwise returns whatever ``conn.fetchrow`` yields.
    """
    lookup_email = normalize_public_email(email)
    if lookup_email is None:
        return None
    # SQL text is kept exactly as in the original literal.
    lookup_query = """
SELECT *
FROM public_users
WHERE LOWER(email) = $1
ORDER BY id ASC
LIMIT 1
"""
    return await conn.fetchrow(lookup_query, lookup_email)
async def upsert_public_user_from_google_profile(conn, profile: dict[str, Any]):
    """Create or refresh a public user from a Google userinfo profile.

    An existing user is matched first on ``google_sub`` and then on e-mail.
    A match gets its name and e-mail refreshed; an already stored
    ``google_sub`` or ``display_name`` is kept. Without a match a new row is
    inserted. ``email_verified_at`` is only set when the profile reports the
    e-mail as verified, and an existing value is never overwritten.

    Returns:
        The updated or inserted ``public_users`` row.

    Raises:
        HTTPException: 500 when the profile has no ``sub``; 400 when it has no
            usable e-mail address.
    """

    def _text(key: str) -> str:
        # Missing and falsy values collapse to an empty string.
        return str(profile.get(key) or "").strip()

    google_sub = _text("sub")
    if not google_sub:
        raise HTTPException(status_code=500, detail="Google svarte uten bruker-ID.")

    email = normalize_public_email(profile.get("email"))
    if not email:
        raise HTTPException(status_code=400, detail="Google-kontoen mangler e-postadresse.")

    full_name = _text("name") or None
    given_name = _text("given_name") or None
    family_name = _text("family_name") or None
    email_verified = bool(profile.get("email_verified"))
    default_display_name = full_name or given_name or email.split("@")[0] or "TeeOff-leser"

    matched_user = await conn.fetchrow("SELECT * FROM public_users WHERE google_sub = $1", google_sub)
    if not matched_user:
        matched_user = await find_public_user_by_email(conn, email)

    if not matched_user:
        return await conn.fetchrow(
            """
            INSERT INTO public_users (
                google_sub, email, full_name, given_name, family_name, display_name,
                email_verified_at, last_login_at
            ) VALUES (
                $1, $2, $3, $4, $5, $6,
                CASE WHEN $7 THEN NOW() ELSE NULL END, NOW()
            )
            RETURNING *
            """,
            google_sub,
            email,
            full_name,
            given_name,
            family_name,
            default_display_name,
            email_verified,
        )

    return await conn.fetchrow(
        """
        UPDATE public_users
        SET
            google_sub = COALESCE(public_users.google_sub, $2),
            email = $3,
            full_name = $4,
            given_name = $5,
            family_name = $6,
            display_name = COALESCE(public_users.display_name, $7),
            email_verified_at = CASE
                WHEN $8 THEN COALESCE(public_users.email_verified_at, NOW())
                ELSE public_users.email_verified_at
            END,
            updated_at = NOW(),
            last_login_at = NOW()
        WHERE id = $1
        RETURNING *
        """,
        matched_user["id"],
        google_sub,
        email,
        full_name,
        given_name,
        family_name,
        default_display_name,
        email_verified,
    )
async def get_or_create_public_user_by_email(conn, email: str):
    """Return the public user for ``email``, inserting a new row if none exists.

    A newly inserted user gets the local part of the address as display name,
    falling back to ``"TeeOff-leser"`` when that part is empty.

    Raises:
        HTTPException: 400 when the normalized address is empty.
    """
    address = normalize_public_email(email)
    if not address:
        raise HTTPException(status_code=400, detail="Ugyldig e-postadresse.")

    user_row = await find_public_user_by_email(conn, address)
    if not user_row:
        fallback_display_name = address.split("@")[0] or "TeeOff-leser"
        user_row = await conn.fetchrow(
            """
            INSERT INTO public_users (
                email, display_name
            ) VALUES (
                $1, $2
            )
            RETURNING *
            """,
            address,
            fallback_display_name,
        )
    return user_row
async def get_authenticated_public_user(request: Request) -> dict[str, Any] | None:
    """Resolve the public user behind the request's session cookie.

    Returns:
        The formatted user dict, or ``None`` when there is no cookie, the
        token cannot be decoded, the token carries no usable ``uid`` claim, or
        the user no longer exists.

    Raises:
        HTTPException: 403 when the user exists but is blocked.
    """
    token = request.cookies.get(PUBLIC_SESSION_COOKIE)
    if not token:
        return None

    try:
        payload = decode_public_session_token(token)
    except HTTPException:
        return None

    # A validly signed token without a numeric ``uid`` (for example a token
    # minted for another purpose with the same secret) is treated as "not
    # logged in" instead of crashing the request with a 500.
    try:
        user_id = int(payload["uid"])
    except (KeyError, TypeError, ValueError):
        return None

    async with app.state.pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM public_users WHERE id = $1", user_id)
    if not row:
        return None

    user = format_public_user_row(row)
    if user and user["is_blocked"]:
        raise HTTPException(status_code=403, detail="Brukeren er blokkert fra å kommentere.")
    return user
async def require_authenticated_public_user(request: Request) -> dict[str, Any]:
    """Return the logged-in public user or reject the request.

    Raises:
        HTTPException: 401 when ``get_authenticated_public_user`` yields no user.
    """
    current_user = await get_authenticated_public_user(request)
    if current_user:
        return current_user
    raise HTTPException(status_code=401, detail="Du må logge inn for å kommentere.")
async def find_published_article_by_slug(conn, slug: str, section: str | None = None):
if section:
return await conn.fetchrow(
"""
SELECT *
FROM articles
WHERE slug = $1 AND status = 'published' AND section = $2
""",
slug,
section,
)
return await conn.fetchrow(
"""
SELECT *
FROM articles
WHERE slug = $1 AND status = 'published'
""",
slug,
)
async def fetch_facility_slugs(conn, facility_ids: list[int]) -> list[str]:
    """Return the non-empty, stripped slugs of the given facilities.

    Falsy IDs are ignored and the remaining IDs are de-duplicated and sorted
    before the query. No query is issued when nothing is left.
    """
    wanted_ids = sorted(set(int(raw_id) for raw_id in facility_ids if raw_id))
    if not wanted_ids:
        return []

    records = await conn.fetch(
        "SELECT slug FROM facilities WHERE id = ANY($1::int[])",
        wanted_ids,
    )

    slugs: list[str] = []
    for record in records:
        if not record.get("slug"):
            continue
        cleaned = str(record["slug"]).strip()
        if cleaned:
            slugs.append(cleaned)
    return slugs
ARTICLE_IMAGE_PATTERN = re.compile(r"<img\b[^>]*\bsrc=['\"]([^'\"]+)['\"]", re.IGNORECASE)
def extract_html_image_urls(html: str | None) -> list[str]:
urls: list[str] = []
for url in ARTICLE_IMAGE_PATTERN.findall(html or ""):
if not isinstance(url, str) or not url.strip():
continue
urls.append(url.strip())
deduped: dict[str, None] = {}
for url in urls:
deduped[url] = None
return list(deduped.keys())
async def queue_scrape_job(job_type: str, facility_ids: List[int], requested_by: str | None = None):
    """Validate a scrape request, enqueue it and describe the outcome.

    Args:
        job_type: Must be one of ``SCRAPE_JOB_TYPES``.
        facility_ids: Facility IDs to scrape; they are de-duplicated and sorted
            before being handed to ``enqueue_scrape_job``.
        requested_by: Optional identifier passed through to the queue.

    Returns:
        A dict with the queue ``status``, a user-facing ``message``, the
        ``job`` itself, the facility IDs that overlap with an active job and
        the requested IDs that do not.

    Raises:
        HTTPException: 400 for an unknown job type or an empty ID list.
    """
    if job_type not in SCRAPE_JOB_TYPES:
        raise HTTPException(status_code=400, detail=f"Ugyldig jobbtype: {job_type}")
    if not facility_ids:
        raise HTTPException(status_code=400, detail="Ingen anleggs-IDer ble oppgitt.")

    requested_ids = sorted(set(int(raw_id) for raw_id in facility_ids if str(raw_id).strip()))
    job, status = await enqueue_scrape_job(
        app.state.pool,
        job_type,
        requested_ids,
        requested_by=requested_by,
    )

    reported_overlap = job.get("overlapping_facility_ids") or []
    overlapping_ids = sorted(set(int(overlap_id) for overlap_id in reported_overlap))
    available_ids = [candidate for candidate in requested_ids if candidate not in overlapping_ids]

    if status == "conflict":
        message = (
            f"Kunne ikke legge jobben i kø fordi {len(overlapping_ids)} valgt"
            f" anlegg allerede inngår i en aktiv {job_type}-jobb."
        )
    elif status == "queued":
        message = f"{job_type.capitalize()}-skraping for {len(job['facility_ids'])} anlegg ble lagt i kø."
    else:
        message = f"Fant allerede en aktiv {job_type}-jobb for samme anlegg."

    return {
        "status": status,
        "message": message,
        "job": job,
        "conflicting_facility_ids": overlapping_ids,
        "idle_facility_ids": available_ids,
    }
async def ensure_facility_columns(conn):
    """Add newer ``facilities`` columns when they are missing."""
    add_columns_sql = """
        ALTER TABLE facilities
            ADD COLUMN IF NOT EXISTS footnote_updated_at TIMESTAMPTZ,
            ADD COLUMN IF NOT EXISTS golfamore_url TEXT,
            ADD COLUMN IF NOT EXISTS golfpakker_url TEXT,
            ADD COLUMN IF NOT EXISTS golfpakker_draft JSONB,
            ADD COLUMN IF NOT EXISTS golfpakker_updated_at TIMESTAMPTZ
    """
    await conn.execute(add_columns_sql)
async def ensure_articles_table(conn):
    """Bring the ``articles`` table, its later-added columns and indexes up to date.

    The statements run in order: create the table if missing, add columns
    introduced after the first version, default blank sections to
    ``'banebesok'`` and create the lookup indexes.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS articles (
            id SERIAL PRIMARY KEY,
            section VARCHAR(32) NOT NULL DEFAULT 'banebesok',
            slug VARCHAR(255) UNIQUE NOT NULL,
            title VARCHAR(255) NOT NULL,
            description TEXT,
            excerpt TEXT,
            eyebrow VARCHAR(120) DEFAULT 'Banebesøk',
            location_label VARCHAR(255),
            facility_name VARCHAR(255),
            facility_slug VARCHAR(255),
            author_name VARCHAR(255),
            status VARCHAR(32) NOT NULL DEFAULT 'draft',
            hero_images JSONB NOT NULL DEFAULT '[]'::jsonb,
            media_gallery JSONB NOT NULL DEFAULT '[]'::jsonb,
            featured_media_id VARCHAR(255),
            content_html TEXT,
            source_url TEXT,
            source_label VARCHAR(255),
            published_at TIMESTAMPTZ,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        """,
        """
        ALTER TABLE articles
            ADD COLUMN IF NOT EXISTS section VARCHAR(32) NOT NULL DEFAULT 'banebesok'
        """,
        """
        ALTER TABLE articles
            ADD COLUMN IF NOT EXISTS media_gallery JSONB NOT NULL DEFAULT '[]'::jsonb
        """,
        """
        ALTER TABLE articles
            ADD COLUMN IF NOT EXISTS featured_media_id VARCHAR(255)
        """,
        """
        UPDATE articles
        SET section = 'banebesok'
        WHERE section IS NULL OR TRIM(section) = ''
        """,
        "CREATE INDEX IF NOT EXISTS articles_status_idx ON articles (status)",
        "CREATE INDEX IF NOT EXISTS articles_section_idx ON articles (section)",
        "CREATE INDEX IF NOT EXISTS articles_published_at_idx ON articles (published_at DESC)",
    )
    for statement in statements:
        await conn.execute(statement)
async def ensure_public_user_tables(conn):
    """Create or upgrade the tables behind public login and article comments.

    The statements run in order and cover ``public_users`` (plus its
    later-added columns and indexes), ``article_comments`` and
    ``public_magic_links``. ``article_comments`` references ``articles``, so
    that table must already exist.
    """
    statements = (
        """
        CREATE TABLE IF NOT EXISTS public_users (
            id SERIAL PRIMARY KEY,
            vipps_sub VARCHAR(255) UNIQUE,
            google_sub VARCHAR(255) UNIQUE,
            full_name VARCHAR(255),
            given_name VARCHAR(255),
            family_name VARCHAR(255),
            email VARCHAR(255),
            phone_number VARCHAR(64),
            display_name VARCHAR(255),
            is_blocked BOOLEAN NOT NULL DEFAULT FALSE,
            email_verified_at TIMESTAMPTZ,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            last_login_at TIMESTAMPTZ
        )
        """,
        """
        ALTER TABLE public_users
            ALTER COLUMN vipps_sub DROP NOT NULL
        """,
        """
        ALTER TABLE public_users
            ADD COLUMN IF NOT EXISTS google_sub VARCHAR(255)
        """,
        """
        ALTER TABLE public_users
            ADD COLUMN IF NOT EXISTS email_verified_at TIMESTAMPTZ
        """,
        """
        CREATE UNIQUE INDEX IF NOT EXISTS public_users_google_sub_idx
            ON public_users (google_sub)
            WHERE google_sub IS NOT NULL
        """,
        """
        CREATE INDEX IF NOT EXISTS public_users_email_lower_idx
            ON public_users (LOWER(email))
        """,
        """
        CREATE TABLE IF NOT EXISTS article_comments (
            id SERIAL PRIMARY KEY,
            article_id INTEGER NOT NULL REFERENCES articles(id) ON DELETE CASCADE,
            user_id INTEGER NOT NULL REFERENCES public_users(id) ON DELETE CASCADE,
            parent_id INTEGER REFERENCES article_comments(id) ON DELETE SET NULL,
            body TEXT NOT NULL,
            status VARCHAR(32) NOT NULL DEFAULT 'pending',
            ip_hash VARCHAR(128),
            user_agent TEXT,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            published_at TIMESTAMPTZ
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS article_comments_article_idx
            ON article_comments (article_id, created_at ASC)
        """,
        """
        CREATE INDEX IF NOT EXISTS article_comments_user_idx
            ON article_comments (user_id, created_at DESC)
        """,
        """
        CREATE TABLE IF NOT EXISTS public_magic_links (
            id SERIAL PRIMARY KEY,
            user_id INTEGER NOT NULL REFERENCES public_users(id) ON DELETE CASCADE,
            token_hash VARCHAR(128) NOT NULL UNIQUE,
            requested_ip_hash VARCHAR(128),
            user_agent TEXT,
            expires_at TIMESTAMPTZ NOT NULL,
            consumed_at TIMESTAMPTZ,
            created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS public_magic_links_user_idx
            ON public_magic_links (user_id, created_at DESC)
        """,
    )
    for statement in statements:
        await conn.execute(statement)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: set up shared state at startup, tear it down at shutdown.

    Startup creates the asyncpg pool, runs the ``ensure_*`` schema helpers,
    starts the background weather sync task and initialises the in-memory
    contact-form tracker. Shutdown signals the weather task to stop, waits for
    it and closes the pool.

    Raises:
        Exception: Any startup failure is logged and re-raised so the
            application does not start in a half-initialised state.
    """
    # Create the database pool at startup.
    try:
        print("📡 Forsøker å koble til database")
        app.state.pool = await asyncpg.create_pool(
            DB_URL,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        async with app.state.pool.acquire() as conn:
            await ensure_facility_columns(conn)
            await ensure_articles_table(conn)
            await ensure_public_user_tables(conn)
            await ensure_scrape_jobs_table(conn)
            await ensure_weather_forecast_table(conn)
        app.state.weather_sync_stop_event = asyncio.Event()
        app.state.weather_sync_task = asyncio.create_task(
            weather_sync_loop(app.state.pool, app.state.weather_sync_stop_event)
        )
        app.state.contact_submission_tracker = {}
        print("✅ Database tilkoblet og pool opprettet")
    except Exception as e:
        print(f"❌ Databasefeil under oppstart: {e}")
        # Bare ``raise`` keeps the original traceback intact.
        raise

    yield

    # Stop the weather sync and close the pool at shutdown.
    weather_stop_event = getattr(app.state, "weather_sync_stop_event", None)
    weather_sync_task = getattr(app.state, "weather_sync_task", None)
    if weather_stop_event is not None:
        weather_stop_event.set()
    try:
        if weather_sync_task is not None:
            await weather_sync_task
    finally:
        # Awaiting the task re-raises any exception it died with; the pool
        # must still be closed in that case so connections are not leaked.
        await app.state.pool.close()
# Application instance; startup and shutdown work is delegated to ``lifespan``.
app = FastAPI(title="TeeOff API v3.8.0", lifespan=lifespan)
# CORS - allows both local development and the production domain.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "https://nye.teeoff.no",
        "http://nye.teeoff.no",
        "http://localhost:3000"
    ],
    # Credentials are allowed so the session cookies are sent cross-origin.
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.middleware("http")
async def require_admin_session_for_admin_routes(request: Request, call_next):
    """Reject requests to ``/api/admin*`` that lack a valid admin session cookie.

    On success the admin's username is stored on ``request.state.admin_username``
    before the request continues. Failures are answered directly with a JSON
    body containing ``detail``.

    ``OPTIONS`` requests are passed through unchecked: this middleware is
    registered after ``CORSMiddleware`` and therefore runs before it, while
    CORS preflight requests never carry cookies. Rejecting them here would
    make every cross-origin admin call fail at the preflight stage.
    """
    is_admin_path = request.url.path.startswith("/api/admin")
    if is_admin_path and request.method != "OPTIONS":
        token = request.cookies.get("admin_session")
        if not token:
            return JSONResponse(status_code=401, content={"detail": "Admin-innlogging kreves"})
        try:
            username = await validate_admin_session_token(token)
        except HTTPException as exc:
            return JSONResponse(status_code=exc.status_code, content={"detail": exc.detail})
        request.state.admin_username = username
    return await call_next(request)
# --- AUTH ENDPOINTS ---
@app.post("/api/auth/login")
async def login(data: dict):
    """Step 1 of admin login: check the password and return a temp token for 2FA.

    Args:
        data: Request body with ``username`` (username or e-mail), ``password``
            and an optional ``remember_me`` / ``rememberMe`` flag.

    Returns:
        ``{"step": "2fa", "temp_token": ...}`` where the token is a 5-minute
        JWT marked ``partial`` that carries the remember-me choice.

    Raises:
        HTTPException: 401 for missing, malformed or wrong credentials; 500
            when the stored password hash cannot be verified.
    """
    invalid_credentials_detail = "Ugyldig brukernavn eller passord"
    username = data.get("username")
    password = data.get("password")
    remember_me = bool(data.get("remember_me") or data.get("rememberMe"))

    # Missing or non-string credentials can never match. Reject them as a
    # normal failed login instead of letting the query or the hash check
    # raise and surface as a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        raise HTTPException(status_code=401, detail=invalid_credentials_detail)

    async with app.state.pool.acquire() as conn:
        admin = await conn.fetchrow(
            "SELECT * FROM admins WHERE username = $1 OR email = $1",
            username
        )
    if not admin:
        raise HTTPException(status_code=401, detail=invalid_credentials_detail)

    try:
        is_valid = pwd_context.verify(password, admin['password_hash'])
    except Exception as e:
        # Reached only when the stored hash itself is unusable.
        print("❌ Kunne ikke verifisere admin-passord")
        raise HTTPException(status_code=500, detail="Internt problem med passord-format") from e
    if not is_valid:
        raise HTTPException(status_code=401, detail=invalid_credentials_detail)

    temp_token = jwt.encode(
        {
            "sub": admin['username'],
            "partial": True,
            "remember_me": remember_me,
            "exp": datetime.utcnow() + timedelta(minutes=5),
        },
        SECRET_KEY, algorithm=ALGORITHM
    )
    return {"step": "2fa", "temp_token": temp_token}
@app.post("/api/auth/verify-2fa")
async def verify_2fa(data: dict, response: Response, request: Request):
    """Step 2 of admin login: verify the TOTP code and set the session cookie.

    Args:
        data: Request body with the ``temp_token`` from step 1 and the TOTP ``code``.
        response: Response object that receives the ``admin_session`` cookie.
        request: Used to decide whether the cookie is marked ``Secure``.

    Returns:
        ``{"status": "success"}`` once the HTTP-only cookie has been set.

    Raises:
        HTTPException: 401 when the temp token is missing, invalid, expired or
            not a step-1 token, when the admin no longer exists or has no OTP
            secret, or when the TOTP code is wrong.
    """
    invalid_session_detail = "Sesjonen har utløpt eller er ugyldig"

    temp_token = data.get('temp_token')
    # A missing or non-string token must be a clean 401; depending on the
    # value, decoding it could otherwise fail with something other than
    # JWTError and surface as a 500.
    if not isinstance(temp_token, str) or not temp_token:
        raise HTTPException(status_code=401, detail=invalid_session_detail)

    try:
        payload = jwt.decode(temp_token, SECRET_KEY, algorithms=[ALGORITHM])
        if not payload.get("partial"):
            raise JWTError()
        username = payload.get("sub")
        remember_me = bool(payload.get("remember_me"))
    except JWTError:
        raise HTTPException(status_code=401, detail=invalid_session_detail)

    async with app.state.pool.acquire() as conn:
        admin = await conn.fetchrow("SELECT otp_secret FROM admins WHERE username = $1", username)

    # The admin may have been removed (or have no OTP secret) after step 1.
    if not admin or not admin['otp_secret']:
        raise HTTPException(status_code=401, detail=invalid_session_detail)

    totp = pyotp.TOTP(admin['otp_secret'])
    if not totp.verify(data.get('code')):
        print("❌ Ugyldig 2FA-kode ved admin-innlogging")
        raise HTTPException(status_code=401, detail="Feil 2FA-kode")

    session_max_age = (
        ADMIN_REMEMBER_ME_MAX_AGE_SECONDS
        if remember_me
        else ADMIN_SESSION_MAX_AGE_SECONDS
    )
    final_token = jwt.encode(
        {"sub": username, "exp": datetime.utcnow() + timedelta(seconds=session_max_age)},
        SECRET_KEY, algorithm=ALGORITHM
    )
    # Set as an HTTP-only cookie.
    response.set_cookie(
        key="admin_session",
        value=final_token,
        max_age=session_max_age,
        expires=session_max_age,
        httponly=True,
        samesite="lax",
        secure=should_use_secure_cookies(request),
    )
    return {"status": "success"}
@app.post("/api/auth/logout")
async def logout(response: Response, request: Request):
    """Log the admin out by deleting the ``admin_session`` cookie."""
    cookie_attributes = {
        "httponly": True,
        "samesite": "lax",
        "secure": should_use_secure_cookies(request),
    }
    response.delete_cookie(key="admin_session", **cookie_attributes)
    return {"status": "success"}
@app.get("/api/public/me")
async def get_public_me(request: Request):
    """Return the current public user (or ``None``) together with the auth configuration."""
    current_user = await get_authenticated_public_user(request)
    auth_configured = get_public_auth_config()["configured"]
    auth_providers = get_public_auth_config()
    return {
        "auth_configured": auth_configured,
        "auth_providers": auth_providers,
        "user": current_user,
    }
@app.get("/api/public/auth/google/start")
async def google_login_start(request: Request, return_to: Optional[str] = Query(default="/")):
    """Start the Google login flow by redirecting to Google's authorization endpoint.

    A random ``state`` value and the sanitized return path are stored in
    HTTP-only cookies that live for 10 minutes, so the callback can validate
    the response and send the user back.

    Raises:
        HTTPException: 503 when Google login is not configured; 500 when the
            discovery document has no authorization endpoint.
    """
    if not is_google_login_configured():
        raise HTTPException(status_code=503, detail="Google-login er ikke konfigurert ennå.")

    discovery = await get_google_discovery_document()
    authorization_endpoint = str(discovery.get("authorization_endpoint") or "").strip()
    if not authorization_endpoint:
        raise HTTPException(status_code=500, detail="Fant ikke Google authorization endpoint.")

    state = secrets.token_urlsafe(24)
    redirect_uri = build_google_redirect_uri(request)
    sanitized_return_to = normalize_return_to_path(return_to)
    secure_cookies = should_use_secure_cookies(request)

    authorization_params = {
        "client_id": GOOGLE_CLIENT_ID,
        "response_type": "code",
        "scope": GOOGLE_OAUTH_SCOPES,
        "state": state,
        "redirect_uri": redirect_uri,
        "prompt": "select_account",
    }

    response = Response(status_code=302)
    response.headers["Location"] = f"{authorization_endpoint}?{urlencode(authorization_params)}"

    flow_cookies = (
        ("google_login_state", state),
        ("google_login_return_to", sanitized_return_to),
    )
    for cookie_name, cookie_value in flow_cookies:
        response.set_cookie(
            key=cookie_name,
            value=cookie_value,
            httponly=True,
            samesite="lax",
            secure=secure_cookies,
            max_age=600,
        )
    return response
@app.get("/api/public/auth/google/callback")
async def google_login_callback(
    request: Request,
    code: Optional[str] = Query(default=None),
    state: Optional[str] = Query(default=None),
    error: Optional[str] = Query(default=None),
):
    """Finish the Google login flow and redirect back to the stored return path.

    The outcome is reported through the ``comment_auth`` query parameter on
    the redirect target. On success the public session cookie is set. The two
    short-lived flow cookies are always cleared.

    Network failures and malformed responses from Google are reported through
    the same redirect mechanism (``google_token_error`` /
    ``google_userinfo_error``) rather than as an unhandled server error.

    Raises:
        HTTPException: 500 when the discovery document lacks the token or
            userinfo endpoint. Errors raised by
            ``upsert_public_user_from_google_profile`` also propagate.
    """
    return_to = normalize_return_to_path(request.cookies.get("google_login_return_to"))
    cookie_state = request.cookies.get("google_login_state")
    secure_cookies = should_use_secure_cookies(request)

    redirect_response = Response(status_code=302)
    redirect_response.delete_cookie("google_login_state", httponly=True, samesite="lax", secure=secure_cookies)
    redirect_response.delete_cookie("google_login_return_to", httponly=True, samesite="lax", secure=secure_cookies)

    def _redirect(outcome: str) -> Response:
        # Every exit path reports its outcome via ?comment_auth=<outcome>.
        redirect_response.headers["Location"] = append_query_param(return_to, "comment_auth", outcome)
        return redirect_response

    if error:
        return _redirect("google_cancelled")
    if not code or not state or not cookie_state or state != cookie_state:
        return _redirect("google_invalid_state")
    if not is_google_login_configured():
        return _redirect("google_not_configured")

    discovery = await get_google_discovery_document()
    token_endpoint = str(discovery.get("token_endpoint") or "").strip()
    userinfo_endpoint = str(discovery.get("userinfo_endpoint") or "").strip()
    if not token_endpoint or not userinfo_endpoint:
        raise HTTPException(status_code=500, detail="Google discovery mangler token/userinfo endpoint.")

    redirect_uri = build_google_redirect_uri(request)

    # Tracks which exchange is in progress so a transport or JSON error is
    # reported with the matching error code.
    failure_outcome = "google_token_error"
    try:
        async with httpx.AsyncClient(timeout=20.0) as client:
            token_response = await client.post(
                token_endpoint,
                auth=httpx.BasicAuth(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET),
                data={
                    "grant_type": "authorization_code",
                    "code": code,
                    "redirect_uri": redirect_uri,
                },
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                },
            )
            if token_response.status_code >= 400:
                return _redirect("google_token_error")
            token_payload = token_response.json()
            if not isinstance(token_payload, dict):
                return _redirect("google_token_error")
            access_token = str(token_payload.get("access_token") or "").strip()
            if not access_token:
                return _redirect("google_token_error")

            failure_outcome = "google_userinfo_error"
            userinfo_response = await client.get(
                userinfo_endpoint,
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "Accept": "application/json",
                },
            )
            if userinfo_response.status_code >= 400:
                return _redirect("google_userinfo_error")
            profile = userinfo_response.json()
            if not isinstance(profile, dict):
                return _redirect("google_userinfo_error")
    except (httpx.HTTPError, ValueError):
        # httpx.HTTPError covers transport failures and timeouts; ValueError
        # covers response bodies that are not valid JSON.
        return _redirect(failure_outcome)

    async with app.state.pool.acquire() as conn:
        user_row = await upsert_public_user_from_google_profile(conn, profile)
    user = format_public_user_row(user_row)
    if user and user["is_blocked"]:
        return _redirect("blocked")

    redirect_response.set_cookie(
        key=PUBLIC_SESSION_COOKIE,
        value=create_public_session_token(int(user["id"])),
        httponly=True,
        samesite="lax",
        secure=secure_cookies,
        max_age=PUBLIC_SESSION_MAX_AGE_SECONDS,
    )
    return _redirect("google_success")
@app.post("/api/public/auth/magic-link/request")
async def request_magic_link(request: Request, payload: PublicMagicLinkRequest):
    """Create a single-use login link and e-mail it to the requested address.

    The response is identical whether the link was sent, the user is blocked,
    or a link was requested within the cooldown window, so callers cannot
    probe which addresses are usable. Only the hash of the token is stored.

    Raises:
        HTTPException: 503 when magic links are not configured; 400 for an
            invalid e-mail address.
    """
    # Local import: the module header only imports datetime, date and timedelta.
    from datetime import timezone

    if not is_magic_link_configured():
        raise HTTPException(status_code=503, detail="Magic link er ikke konfigurert ennå.")
    email = normalize_public_email(payload.email)
    if not email or "@" not in email:
        raise HTTPException(status_code=400, detail="Oppgi en gyldig e-postadresse.")

    generic_response = {
        "status": "success",
        "detail": "Hvis adressen kan brukes til innlogging, sender vi deg en lenke nå.",
    }

    return_to = normalize_return_to_path(payload.return_to)
    token = secrets.token_urlsafe(32)
    token_hash = hash_magic_link_token(token)
    # Timezone-aware UTC values: a naive ``utcnow()`` bound to a TIMESTAMPTZ
    # parameter is interpreted as local time, which shifts the expiry and the
    # cooldown window by the UTC offset on any host not running in UTC.
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=PUBLIC_MAGIC_LINK_MAX_AGE_MINUTES)

    async with app.state.pool.acquire() as conn:
        user_row = await get_or_create_public_user_by_email(conn, email)
        user = format_public_user_row(user_row)
        if user and user["is_blocked"]:
            return generic_response

        recent_threshold = datetime.now(timezone.utc) - timedelta(
            seconds=PUBLIC_MAGIC_LINK_REQUEST_COOLDOWN_SECONDS
        )
        recent = await conn.fetchrow(
            """
            SELECT id
            FROM public_magic_links
            WHERE user_id = $1 AND created_at >= $2
            ORDER BY created_at DESC
            LIMIT 1
            """,
            int(user_row["id"]),
            recent_threshold,
        )
        if recent:
            # Still inside the cooldown: answer as usual but send nothing.
            return generic_response

        await conn.execute(
            """
            INSERT INTO public_magic_links (
                user_id, token_hash, requested_ip_hash, user_agent, expires_at
            ) VALUES (
                $1, $2, $3, $4, $5
            )
            """,
            int(user_row["id"]),
            token_hash,
            hash_request_ip(request),
            request.headers.get("user-agent"),
            expires_at,
        )

    # The e-mail is sent after the connection is back in the pool.
    login_url = (
        f"{build_public_base_url(request)}/api/public/auth/magic-link/verify?"
        f"{urlencode({'token': token, 'return_to': return_to})}"
    )
    await send_magic_link_email(email, login_url)
    return generic_response
@app.post("/api/public/contact")
async def submit_public_contact_form(request: Request, payload: PublicContactFormRequest):
    """Validate a contact-form submission, rate-limit it and forward it by e-mail.

    Spam heuristics (a filled ``website`` honeypot field, or a form submitted
    sooner than ``CONTACT_FORM_MIN_FILL_SECONDS`` after ``started_at``) are
    answered with the normal success message but nothing is sent. Rate
    limiting uses the in-memory tracker on ``app.state``, keyed on both the
    e-mail address and the hashed client IP.

    Raises:
        HTTPException: 503 when the form is not configured; 400 for invalid
            fields; 429 when a rate-limit key has too many recent submissions.
    """
    # Local import: the module header does not import ``time``.
    import time

    if not is_contact_form_configured():
        raise HTTPException(status_code=503, detail="Kontaktskjema er ikke konfigurert ennå.")

    accepted_response = {
        "status": "success",
        "detail": "Takk for meldingen. Vi svarer så snart vi kan.",
    }
    if str(payload.website or "").strip():
        # Honeypot field was filled in.
        return accepted_response

    name = str(payload.name or "").strip()
    email = normalize_public_email(payload.email)
    topic = str(payload.topic or "").strip()
    message = str(payload.message or "").strip()
    if len(name) < 2 or len(name) > 120:
        raise HTTPException(status_code=400, detail="Oppgi et gyldig navn.")
    if not email or "@" not in email or len(email) > 255:
        raise HTTPException(status_code=400, detail="Oppgi en gyldig e-postadresse.")
    if len(topic) < 2 or len(topic) > 140:
        raise HTTPException(status_code=400, detail="Oppgi et gyldig emne.")
    if len(message) < 20 or len(message) > 5000:
        raise HTTPException(status_code=400, detail="Meldingen må være mellom 20 og 5000 tegn.")

    # ``time.time()`` is the real Unix epoch. ``datetime.utcnow().timestamp()``
    # treats the naive UTC value as local time, so on a host not running in
    # UTC it is off by the UTC offset and the fill-time check misfires.
    now_ts = int(time.time())
    if payload.started_at and now_ts - int(payload.started_at) < CONTACT_FORM_MIN_FILL_SECONDS:
        return accepted_response

    ip_hash = hash_request_ip(request)
    tracker: dict[str, list[int]] = getattr(app.state, "contact_submission_tracker", {})
    cutoff = now_ts - CONTACT_FORM_RATE_LIMIT_WINDOW_SECONDS
    # Drop timestamps that fell out of the window, and keys with none left.
    for key in list(tracker.keys()):
        recent = [ts for ts in tracker.get(key, []) if ts >= cutoff]
        if recent:
            tracker[key] = recent
        else:
            tracker.pop(key, None)

    rate_keys = [f"email:{email}"]
    if ip_hash:
        rate_keys.append(f"ip:{ip_hash}")
    for key in rate_keys:
        attempts = tracker.get(key, [])
        if len(attempts) >= CONTACT_FORM_RATE_LIMIT_MAX_SUBMISSIONS:
            raise HTTPException(
                status_code=429,
                detail="For mange meldinger på kort tid. Prøv igjen senere.",
            )

    await send_contact_form_email(
        sender_name=name,
        sender_email=email,
        topic=topic,
        message=message,
        ip_hash=ip_hash,
    )

    # Only submissions that were actually sent count towards the limit.
    for key in rate_keys:
        tracker.setdefault(key, []).append(now_ts)
    app.state.contact_submission_tracker = tracker
    return accepted_response
@app.post("/api/public/facility-feedback")
async def submit_public_facility_feedback(request: Request, payload: PublicFacilityFeedbackRequest):
    """Validate feedback about a facility, rate-limit it and forward it by e-mail.

    Spam heuristics (a filled ``website`` honeypot field, or a form submitted
    sooner than ``CONTACT_FORM_MIN_FILL_SECONDS`` after ``started_at``) are
    answered with the normal success message but nothing is sent. Rate
    limiting uses the same in-memory tracker on ``app.state`` as the contact
    form, keyed on both the e-mail address and the hashed client IP.

    Raises:
        HTTPException: 503 when the form is not configured; 400 for invalid
            fields; 404 when the facility does not exist; 429 when a
            rate-limit key has too many recent submissions.
    """
    # Local import: the module header does not import ``time``.
    import time

    if not is_contact_form_configured():
        raise HTTPException(status_code=503, detail="Skjemaet er ikke konfigurert ennå.")

    accepted_response = {
        "status": "success",
        "detail": "Takk for meldingen. Vi ser på innspillet så snart vi kan.",
    }
    if str(payload.website or "").strip():
        # Honeypot field was filled in.
        return accepted_response

    name = str(payload.name or "").strip()
    email = normalize_public_email(payload.email)
    message = str(payload.message or "").strip()
    if len(name) < 2 or len(name) > 120:
        raise HTTPException(status_code=400, detail="Oppgi et gyldig navn.")
    if not email or "@" not in email or len(email) > 255:
        raise HTTPException(status_code=400, detail="Oppgi en gyldig e-postadresse.")
    if len(message) < 20 or len(message) > 5000:
        raise HTTPException(status_code=400, detail="Meldingen må være mellom 20 og 5000 tegn.")

    # ``time.time()`` is the real Unix epoch. ``datetime.utcnow().timestamp()``
    # treats the naive UTC value as local time, so on a host not running in
    # UTC it is off by the UTC offset and the fill-time check misfires.
    now_ts = int(time.time())
    if payload.started_at and now_ts - int(payload.started_at) < CONTACT_FORM_MIN_FILL_SECONDS:
        return accepted_response

    async with app.state.pool.acquire() as conn:
        facility = await conn.fetchrow(
            "SELECT id, name, slug FROM facilities WHERE id = $1",
            payload.facility_id,
        )
    if not facility:
        raise HTTPException(status_code=404, detail="Golfanlegget ble ikke funnet.")

    ip_hash = hash_request_ip(request)
    tracker: dict[str, list[int]] = getattr(app.state, "contact_submission_tracker", {})
    cutoff = now_ts - CONTACT_FORM_RATE_LIMIT_WINDOW_SECONDS
    # Drop timestamps that fell out of the window, and keys with none left.
    for key in list(tracker.keys()):
        recent = [ts for ts in tracker.get(key, []) if ts >= cutoff]
        if recent:
            tracker[key] = recent
        else:
            tracker.pop(key, None)

    rate_keys = [f"email:{email}"]
    if ip_hash:
        rate_keys.append(f"ip:{ip_hash}")
    for key in rate_keys:
        attempts = tracker.get(key, [])
        if len(attempts) >= CONTACT_FORM_RATE_LIMIT_MAX_SUBMISSIONS:
            raise HTTPException(
                status_code=429,
                detail="For mange meldinger på kort tid. Prøv igjen senere.",
            )

    await send_facility_feedback_email(
        facility_name=str(facility["name"] or "").strip() or "ukjent golfanlegg",
        facility_slug=str(facility["slug"] or "").strip(),
        sender_name=name,
        sender_email=email,
        message=message,
        ip_hash=ip_hash,
    )

    # Only submissions that were actually sent count towards the limit.
    for key in rate_keys:
        tracker.setdefault(key, []).append(now_ts)
    app.state.contact_submission_tracker = tracker
    return accepted_response
@app.get("/api/public/auth/magic-link/verify")
async def verify_magic_link(
    request: Request,
    token: Optional[str] = Query(default=None),
    return_to: Optional[str] = Query(default="/"),
):
    """Redeem a magic-link token and redirect back with a session cookie.

    The outcome is reported through the ``comment_auth`` query parameter on
    the redirect target: ``magic_invalid``, ``magic_expired``, ``blocked`` or
    ``magic_success``. On success the link is marked consumed, the user's
    e-mail is marked verified (if it was not already) and the public session
    cookie is set.

    The link is consumed with a conditional UPDATE so that two concurrent
    requests carrying the same token cannot both succeed.
    """
    secure_cookies = should_use_secure_cookies(request)
    redirect_target = normalize_return_to_path(return_to)

    def _redirect(outcome: str) -> Response:
        # Every exit path reports its outcome via ?comment_auth=<outcome>.
        redirect = Response(status_code=302)
        redirect.headers["Location"] = append_query_param(redirect_target, "comment_auth", outcome)
        return redirect

    if not token:
        return _redirect("magic_invalid")

    token_hash = hash_magic_link_token(token)
    async with app.state.pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT
                ml.id AS magic_link_id,
                ml.user_id AS magic_link_user_id,
                ml.expires_at AS magic_link_expires_at,
                ml.consumed_at AS magic_link_consumed_at,
                u.*
            FROM public_magic_links ml
            JOIN public_users u ON u.id = ml.user_id
            WHERE ml.token_hash = $1
            LIMIT 1
            """,
            token_hash,
        )
        if not row:
            return _redirect("magic_invalid")
        if row["magic_link_consumed_at"] is not None:
            return _redirect("magic_invalid")

        expires_at = row["magic_link_expires_at"]
        # Compare like with like: aware "now" for an aware expiry, naive otherwise.
        now = datetime.now(expires_at.tzinfo) if getattr(expires_at, "tzinfo", None) else datetime.utcnow()
        if expires_at <= now:
            return _redirect("magic_expired")

        user = format_public_user_row(row)
        if user and user["is_blocked"]:
            return _redirect("blocked")

        # Atomic single-use guard: only the request that flips ``consumed_at``
        # from NULL gets a row back; a concurrent duplicate gets nothing.
        consumed = await conn.fetchrow(
            """
            UPDATE public_magic_links
            SET consumed_at = NOW()
            WHERE id = $1 AND consumed_at IS NULL
            RETURNING id
            """,
            row["magic_link_id"],
        )
        if not consumed:
            return _redirect("magic_invalid")

        await conn.execute(
            """
            UPDATE public_users
            SET
                email_verified_at = COALESCE(email_verified_at, NOW()),
                updated_at = NOW(),
                last_login_at = NOW()
            WHERE id = $1
            """,
            row["magic_link_user_id"],
        )

    response = _redirect("magic_success")
    response.set_cookie(
        key=PUBLIC_SESSION_COOKIE,
        value=create_public_session_token(int(row["magic_link_user_id"])),
        httponly=True,
        samesite="lax",
        secure=secure_cookies,
        max_age=PUBLIC_SESSION_MAX_AGE_SECONDS,
    )
    return response
@app.get("/api/public/auth/providers")
async def get_public_auth_providers():
    """Return the public auth configuration as given by ``get_public_auth_config``."""
    auth_config = get_public_auth_config()
    return auth_config
@app.post("/api/public/auth/logout")
async def public_logout(request: Request, response: Response):
    """Log the public user out by deleting the public session cookie."""
    cookie_attributes = {
        "httponly": True,
        "samesite": "lax",
        "secure": should_use_secure_cookies(request),
    }
    response.delete_cookie(key=PUBLIC_SESSION_COOKIE, **cookie_attributes)
    return {"status": "success"}
# --- DATA ENDPOINTS ---
@app.get("/api/facilities")
async def get_facilities():
    """Fetch all golf facilities with aggregated course status for the front page.

    Each facility row also carries ``course_statuses`` (id, name and status of
    its courses, excluding status ``'finnes_ingen_bane_to'``, main course
    first) and ``weather_forecast`` (forecast rows ordered by ``day_offset``).
    Facilities are ordered by name and every row is passed through
    ``format_row``.
    """
    facilities_query = """
        SELECT f.*, (
            SELECT jsonb_agg(cs) FROM (
                SELECT id, name, status FROM courses
                WHERE facility_id = f.id AND status != 'finnes_ingen_bane_to'
                ORDER BY is_main_course DESC, id ASC
            ) cs
        ) as course_statuses, (
            SELECT jsonb_agg(w_data ORDER BY w_data.day_offset ASC) FROM (
                SELECT
                    forecast_date,
                    day_offset,
                    dry_all_day,
                    dry_daylight,
                    precip_mm,
                    precip_probability_max,
                    daylight_precip_mm,
                    daylight_precip_probability_max,
                    confidence,
                    source_updated_at,
                    source_expires_at,
                    calculated_at
                FROM facility_weather_forecast
                WHERE facility_id = f.id
                ORDER BY day_offset ASC
            ) w_data
        ) as weather_forecast
        FROM facilities f
        ORDER BY f.name ASC
    """
    async with app.state.pool.acquire() as conn:
        facility_rows = await conn.fetch(facilities_query)

    formatted = []
    for facility_row in facility_rows:
        formatted.append(format_row(facility_row))
    return formatted
@app.get("/api/facilities/{slug}")
async def get_facility(slug: str):
    """Fetch the details for one golf facility, including all courses and holes.

    The row carries ``courses`` (each with its ``holes`` ordered by hole
    number; the main course is always included, other courses only when their
    status is neither ``'finnes_ingen_bane_to'`` nor ``'ukjent'``) and
    ``weather_forecast`` ordered by ``day_offset``. The result is passed
    through ``format_row``.

    Raises:
        HTTPException: 404 when no facility has the given slug.
    """
    facility_query = """
        SELECT f.*, (
            SELECT jsonb_agg(c_data) FROM (
                SELECT c.*, (
                    SELECT jsonb_agg(h_data ORDER BY h_data.hole_number ASC)
                    FROM (SELECT * FROM holes WHERE course_id = c.id) h_data
                ) as holes
                FROM courses c
                WHERE c.facility_id = f.id
                AND (c.is_main_course = true OR (c.status NOT IN ('finnes_ingen_bane_to', 'ukjent')))
                ORDER BY c.is_main_course DESC, c.id ASC
            ) c_data
        ) as courses, (
            SELECT jsonb_agg(w_data ORDER BY w_data.day_offset ASC) FROM (
                SELECT
                    forecast_date,
                    day_offset,
                    dry_all_day,
                    dry_daylight,
                    precip_mm,
                    precip_probability_max,
                    daylight_precip_mm,
                    daylight_precip_probability_max,
                    confidence,
                    source_updated_at,
                    source_expires_at,
                    calculated_at
                FROM facility_weather_forecast
                WHERE facility_id = f.id
                ORDER BY day_offset ASC
            ) w_data
        ) as weather_forecast
        FROM facilities f WHERE f.slug = $1
    """
    async with app.state.pool.acquire() as conn:
        facility_row = await conn.fetchrow(facility_query, slug)

    if facility_row:
        return format_row(facility_row)
    raise HTTPException(status_code=404, detail="Golfanlegget ble ikke funnet")
@app.get("/api/course-visits")
async def get_course_visits():
    """Fetch published articles in the ``banebesok`` (course visit) section.

    Articles are ordered newest first by ``published_at`` (falling back to
    ``created_at``), then by ``id``, and passed through ``format_article_row``.
    """
    course_visits_query = """
        SELECT *
        FROM articles
        WHERE status = 'published' AND section = 'banebesok'
        ORDER BY COALESCE(published_at, created_at) DESC, id DESC
    """
    async with app.state.pool.acquire() as conn:
        article_rows = await conn.fetch(course_visits_query)

    formatted = []
    for article_row in article_rows:
        formatted.append(format_article_row(article_row))
    return formatted
@app.get("/api/course-visits/{slug}")
async def get_course_visit(slug: str):
    """Fetch one published article from the ``banebesok`` (course visit) section.

    Raises:
        HTTPException: 404 when no published course-visit article has the slug.
    """
    course_visit_query = """
        SELECT *
        FROM articles
        WHERE slug = $1 AND status = 'published' AND section = 'banebesok'
    """
    async with app.state.pool.acquire() as conn:
        article_row = await conn.fetchrow(course_visit_query, slug)

    if article_row:
        return format_article_row(article_row)
    raise HTTPException(status_code=404, detail="Artikkelen ble ikke funnet")
@app.get("/api/articles")
async def get_articles(section: Optional[str] = Query(default="all")):
    """Fetch published articles, optionally filtered by section.

    ``section`` is normalized with ``normalize_article_section``; the value
    ``"all"`` disables the filter. Articles are ordered newest first by
    ``published_at`` (falling back to ``created_at``), then by ``id``.
    """
    normalized_section = normalize_article_section(section, allow_all=True)

    # Only fixed SQL fragments are interpolated; the section value itself is
    # always passed as a bound parameter.
    if normalized_section == "all":
        section_clause = ""
        query_args: tuple = ()
    else:
        section_clause = "AND section = $1"
        query_args = (normalized_section,)

    articles_query = """
        SELECT *
        FROM articles
        WHERE status = 'published'
        {section_clause}
        ORDER BY COALESCE(published_at, created_at) DESC, id DESC
    """.format(section_clause=section_clause)

    async with app.state.pool.acquire() as conn:
        article_rows = await conn.fetch(articles_query, *query_args)
    return [format_article_row(article_row) for article_row in article_rows]
@app.get("/api/articles/{slug}")
async def get_article(slug: str, section: Optional[str] = Query(default="all")):
    """Return one published article by slug, optionally restricted to a section.

    Raises:
        HTTPException: 404 when no published article matches.
    """
    wanted_section = normalize_article_section(section, allow_all=True)
    sql_template = """
        SELECT *
        FROM articles
        WHERE slug = $1 AND status = 'published'
        {section_clause}
    """
    bind_args = [slug]
    if wanted_section == "all":
        sql = sql_template.format(section_clause="")
    else:
        sql = sql_template.format(section_clause="AND section = $2")
        bind_args.append(wanted_section)

    async with app.state.pool.acquire() as db:
        record = await db.fetchrow(sql, *bind_args)
    if not record:
        raise HTTPException(status_code=404, detail="Artikkelen ble ikke funnet")
    return format_article_row(record)
@app.get("/api/articles/{slug}/comments")
async def get_article_comments(
    request: Request,
    slug: str,
    section: Optional[str] = Query(default=None),
):
    """List the comments attached to a published article.

    Published comments are always included. When the request carries an
    authenticated public user, that user's own pending comments are
    included as well. The response also carries the public auth
    configuration and the resolved viewer.

    Raises:
        HTTPException: 404 when the article cannot be found.
    """
    section_filter = normalize_article_section(section) if section else None
    viewer = await get_authenticated_public_user(request)

    # $1 is always the article id; $2 is only bound for a signed-in viewer.
    if viewer:
        visibility_sql = "(c.status = 'published' OR (c.status = 'pending' AND c.user_id = $2))"
        viewer_args = [viewer["id"]]
    else:
        visibility_sql = "c.status = 'published'"
        viewer_args = []

    async with app.state.pool.acquire() as db:
        article = await find_published_article_by_slug(db, slug, section_filter)
        if not article:
            raise HTTPException(status_code=404, detail="Artikkelen ble ikke funnet")
        comment_rows = await db.fetch(
            f"""
            SELECT
            c.*,
            u.display_name,
            u.full_name,
            u.given_name
            FROM article_comments c
            JOIN public_users u ON u.id = c.user_id
            WHERE c.article_id = $1
            AND {visibility_sql}
            ORDER BY c.created_at ASC
            """,
            article["id"],
            *viewer_args,
        )

    response: dict[str, Any] = {}
    response["auth_configured"] = get_public_auth_config()["configured"]
    response["auth_providers"] = get_public_auth_config()
    response["viewer"] = viewer
    response["comments"] = [format_comment_row(comment) for comment in comment_rows]
    return response
@app.post("/api/articles/{slug}/comments")
async def create_article_comment(
    request: Request,
    payload: PublicCommentCreateRequest,
    slug: str,
    section: Optional[str] = Query(default=None),
):
    """Store a new comment (or reply) on a published article.

    Requires an authenticated public user. The body must be 3-4000
    characters after trimming. The comment is stored with
    ``PUBLIC_COMMENT_DEFAULT_STATUS`` and a notification email is attempted
    afterwards; a failure to send the email is only printed and never
    fails the request.

    Raises:
        HTTPException: 400 for an invalid body or parent id, 404 when the
            article or the parent comment cannot be found.
    """
    viewer = await require_authenticated_public_user(request)
    section_filter = normalize_article_section(section) if section else None
    comment_text = str(payload.body or "").strip()
    reply_to_id = payload.parent_id

    text_length = len(comment_text)
    if text_length < 3:
        raise HTTPException(status_code=400, detail="Kommentaren må være minst 3 tegn.")
    if text_length > 4000:
        raise HTTPException(status_code=400, detail="Kommentaren er for lang.")
    if reply_to_id is not None and reply_to_id <= 0:
        raise HTTPException(status_code=400, detail="Ugyldig kommentar å svare på.")

    replied_author: str | None = None
    async with app.state.pool.acquire() as db:
        article = await find_published_article_by_slug(db, slug, section_filter)
        if not article:
            raise HTTPException(status_code=404, detail="Artikkelen ble ikke funnet")

        if article["section"] == "banebesok":
            section_label = "Banebesøk"
        else:
            section_label = "Meninger"

        if reply_to_id is not None:
            # The parent must belong to the same article and must not be deleted.
            parent = await db.fetchrow(
                """
                SELECT
                c.*,
                u.display_name,
                u.full_name
                FROM article_comments c
                JOIN public_users u ON u.id = c.user_id
                WHERE c.id = $1
                AND c.article_id = $2
                AND c.status != 'deleted'
                LIMIT 1
                """,
                reply_to_id,
                article["id"],
            )
            if not parent:
                raise HTTPException(status_code=404, detail="Fant ikke kommentaren du prøver å svare på.")
            replied_author = (
                parent.get("display_name")
                or parent.get("full_name")
                or "TeeOff-leser"
            )

        inserted = await db.fetchrow(
            """
            INSERT INTO article_comments (
            article_id, user_id, parent_id, body, status, ip_hash, user_agent
            ) VALUES (
            $1, $2, $3, $4, $5, $6, $7
            )
            RETURNING *
            """,
            article["id"],
            viewer["id"],
            reply_to_id,
            comment_text,
            PUBLIC_COMMENT_DEFAULT_STATUS,
            hash_request_ip(request),
            request.headers.get("user-agent"),
        )

    # Best-effort notification: the comment is already stored at this point.
    try:
        public_url = f"{build_public_base_url(request)}/{article['section']}/{article['slug']}"
        viewer_email = viewer.get("email")
        await send_comment_notification_email(
            article_title=str(article["title"] or article["slug"]),
            article_url=public_url,
            article_section=section_label,
            comment_body=comment_text,
            comment_status=PUBLIC_COMMENT_DEFAULT_STATUS,
            commenter_name=str(viewer.get("display_name") or viewer.get("full_name") or "TeeOff-leser"),
            commenter_email=(str(viewer_email).strip() if viewer_email else None),
            parent_author_name=replied_author,
        )
    except Exception as exc:
        print(f"Kunne ikke sende kommentarvarsel: {exc}")

    if PUBLIC_COMMENT_DEFAULT_STATUS == "published":
        outcome_text = "Kommentaren er publisert."
    else:
        outcome_text = "Kommentaren er mottatt og venter på godkjenning."
    return {
        "status": "success",
        "detail": outcome_text,
        "comment": format_comment_row(inserted),
    }
# --- ADMIN ENDPOINTS ---
@app.get("/api/admin/articles")
async def get_admin_articles(status: Optional[str] = Query(default="all")):
    """Return articles for the admin UI, optionally filtered by status.

    Accepted status values are "all", "draft" and "published"
    (case-insensitive, surrounding whitespace ignored).

    Raises:
        HTTPException: 400 for any other status value.
    """
    status_filter = str(status or "all").strip().lower()
    if status_filter not in {"all", "draft", "published"}:
        raise HTTPException(status_code=400, detail="Ugyldig statusfilter")

    sql_template = """
        SELECT *
        FROM articles
        {where_clause}
        ORDER BY COALESCE(published_at, created_at) DESC, updated_at DESC, id DESC
    """
    if status_filter == "all":
        sql = sql_template.format(where_clause="")
        bind_args = ()
    else:
        sql = sql_template.format(where_clause="WHERE status = $1")
        bind_args = (status_filter,)

    async with app.state.pool.acquire() as db:
        records = await db.fetch(sql, *bind_args)
    return [format_article_row(record) for record in records]
@app.get("/api/admin/articles/{article_id}")
async def get_admin_article(article_id: int):
    """Return a single article by primary key, regardless of its status.

    Raises:
        HTTPException: 404 when no article has the given id.
    """
    async with app.state.pool.acquire() as db:
        record = await db.fetchrow("SELECT * FROM articles WHERE id = $1", article_id)
    if not record:
        raise HTTPException(status_code=404, detail="Artikkelen ble ikke funnet")
    return format_article_row(record)
@app.post("/api/admin/articles")
async def upsert_admin_article(request: ArticleUpsertRequest):
    """Create an article, or overwrite the existing article with the same slug.

    The media gallery is sanitised first; when it ends up empty the legacy
    hero images are converted into a gallery instead. A publish timestamp
    is filled in automatically when a published article arrives without
    one. After saving, the affected public URLs are queued for IndexNow.
    """
    section = normalize_article_section(request.section)
    status = normalize_article_status(request.status)
    slug = request.slug.strip()

    published_at = parse_optional_datetime(request.published_at)
    updated_at = parse_optional_datetime(request.updated_at) or datetime.utcnow()
    if not published_at and status == "published":
        published_at = datetime.utcnow()

    legacy_hero_images = sanitize_hero_images(request.hero_images)
    title = request.title.strip()
    gallery = sanitize_article_media(request.media_gallery, title)
    if legacy_hero_images and not gallery:
        gallery = build_media_gallery_from_hero_images(legacy_hero_images)
    featured_media_id = sanitize_featured_media_id(request.featured_media_id, gallery)
    hero_images = build_hero_images_from_media_gallery(gallery, legacy_hero_images, featured_media_id)

    def _blank_to_none(value):
        # Optional text columns store NULL instead of empty/whitespace strings.
        return (value or "").strip() or None

    # Positional values for $1..$19, in the column order of the INSERT below.
    column_values = (
        section,
        slug,
        title,
        _blank_to_none(request.description),
        _blank_to_none(request.excerpt),
        (request.eyebrow or "Banebesøk").strip(),
        _blank_to_none(request.location_label),
        _blank_to_none(request.facility_name),
        _blank_to_none(request.facility_slug),
        (request.author_name or "TeeOff").strip(),
        status,
        json.dumps(hero_images),
        json.dumps(gallery),
        featured_media_id,
        request.content_html or "",
        _blank_to_none(request.source_url),
        _blank_to_none(request.source_label),
        published_at,
        updated_at,
    )

    upsert_sql = """
        INSERT INTO articles (
        section, slug, title, description, excerpt, eyebrow, location_label,
        facility_name, facility_slug, author_name, status, hero_images,
        media_gallery, featured_media_id, content_html, source_url, source_label, published_at, updated_at
        ) VALUES (
        $1, $2, $3, $4, $5, $6, $7,
        $8, $9, $10, $11, $12::jsonb,
        $13::jsonb, $14, $15, $16, $17, $18, $19
        )
        ON CONFLICT (slug) DO UPDATE SET
        section = EXCLUDED.section,
        title = EXCLUDED.title,
        description = EXCLUDED.description,
        excerpt = EXCLUDED.excerpt,
        eyebrow = EXCLUDED.eyebrow,
        location_label = EXCLUDED.location_label,
        facility_name = EXCLUDED.facility_name,
        facility_slug = EXCLUDED.facility_slug,
        author_name = EXCLUDED.author_name,
        status = EXCLUDED.status,
        hero_images = EXCLUDED.hero_images,
        media_gallery = EXCLUDED.media_gallery,
        featured_media_id = EXCLUDED.featured_media_id,
        content_html = EXCLUDED.content_html,
        source_url = EXCLUDED.source_url,
        source_label = EXCLUDED.source_label,
        published_at = EXCLUDED.published_at,
        updated_at = EXCLUDED.updated_at
        RETURNING *
    """

    async with app.state.pool.acquire() as db:
        # Snapshot the prior slug/section/status so old URLs can be resubmitted too.
        before_row = await db.fetchrow(
            "SELECT slug, section, status FROM articles WHERE slug = $1",
            slug,
        )
        after_row = await db.fetchrow(upsert_sql, *column_values)

    saved_article = format_article_row(after_row)
    previous_article = None
    if before_row:
        previous_article = format_article_row(before_row)
    schedule_indexnow_submission(
        collect_article_indexnow_urls(previous_article=previous_article, current_article=saved_article),
        reason="admin article upsert",
    )
    return saved_article
@app.delete("/api/admin/articles/{article_id}")
async def delete_admin_article(article_id: int):
    """Delete an article by id and queue its public URLs for IndexNow.

    Raises:
        HTTPException: 404 when no article has the given id.
    """
    delete_sql = "DELETE FROM articles WHERE id = $1 RETURNING slug, section, status"
    async with app.state.pool.acquire() as db:
        removed_row = await db.fetchrow(delete_sql, article_id)
    if not removed_row:
        raise HTTPException(status_code=404, detail="Artikkelen ble ikke funnet")

    removed_article = format_article_row(removed_row)
    urls_to_refresh = collect_article_indexnow_urls(previous_article=removed_article)
    schedule_indexnow_submission(urls_to_refresh, reason="admin article delete")
    return {"status": "success"}
def _build_imported_media_gallery(item: dict, content_html: str, title: str) -> list[dict[str, str]]:
    """Build the raw media gallery for one imported article.

    The featured image (if any) comes first, followed by up to five image
    URLs extracted from the article HTML, skipping URLs already present.
    """
    gallery: list[dict[str, str]] = []
    featured_image = item.get("featuredImage") or {}
    featured_url = str(featured_image.get("url") or "").strip()
    if featured_url:
        gallery.append(
            {
                "id": build_article_media_id("image", featured_url),
                "type": "image",
                "src": featured_url,
                "alt": str(featured_image.get("alt") or title).strip(),
                "caption": str(featured_image.get("caption") or title).strip(),
                "poster": "",
            }
        )
    for url in extract_html_image_urls(content_html)[:5]:
        if any(existing["src"] == url for existing in gallery):
            continue
        gallery.append(
            {
                "id": build_article_media_id("image", url),
                "type": "image",
                "src": url,
                "alt": title,
                "caption": title,
                "poster": "",
            }
        )
    return gallery


@app.post("/api/admin/articles/seed-imported")
async def seed_admin_articles_from_imported_json():
    """Upsert every article from the imported JSON file as a published article.

    Entries that are not JSON objects, or that lack a slug, are skipped
    and counted instead of being written under an empty slug (where they
    would overwrite each other via ``ON CONFLICT (slug)``).

    Returns:
        ``{"status": "success", "count": <upserted>, "skipped": <skipped>}``.

    Raises:
        HTTPException: 500 when the file cannot be read, is not valid JSON,
            or does not contain a JSON array.
    """
    imported_path = resolve_imported_meninger_path()
    try:
        imported_articles = json.loads(imported_path.read_text(encoding="utf-8"))
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Kunne ikke lese importedMeninger.json") from exc
    if not isinstance(imported_articles, list):
        # Iterating a dict/str here would only fail later with an opaque AttributeError.
        raise HTTPException(status_code=500, detail="Kunne ikke lese importedMeninger.json")

    upserted_count = 0
    skipped_count = 0
    submitted_urls: list[str] = []

    async with app.state.pool.acquire() as conn:
        facility_rows = await conn.fetch("SELECT slug, name, county FROM facilities")
        facility_lookup = {
            str(row["slug"]): {
                "name": row["name"],
                "county": row["county"],
            }
            for row in facility_rows
        }

        # One transaction: either the whole import lands or none of it does.
        async with conn.transaction():
            for item in imported_articles:
                if not isinstance(item, dict):
                    skipped_count += 1
                    continue
                slug = str(item.get("slug") or "").strip()
                if not slug:
                    skipped_count += 1
                    continue

                title = str(item.get("title") or "").strip()
                excerpt = str(item.get("excerpt") or "").strip() or None
                content_html = str(item.get("contentHtml") or "")
                facility_slug = item.get("primaryFacilitySlug") or ((item.get("facilitySlugs") or [None])[0])
                facility = facility_lookup.get(str(facility_slug), {})
                section, eyebrow = resolve_imported_article_section(item)

                sanitized_media_gallery = sanitize_article_media(
                    _build_imported_media_gallery(item, content_html, title),
                    title,
                )
                featured_media_id = sanitize_featured_media_id(
                    sanitized_media_gallery[0]["id"] if sanitized_media_gallery else None,
                    sanitized_media_gallery,
                )
                hero_images = build_hero_images_from_media_gallery(
                    sanitized_media_gallery,
                    [],
                    featured_media_id,
                )
                published_at = parse_optional_datetime(item.get("publishedAt"))
                updated_at = parse_optional_datetime(item.get("updatedAt")) or published_at or datetime.utcnow()

                await conn.execute(
                    """
                    INSERT INTO articles (
                    section, slug, title, description, excerpt, eyebrow, location_label,
                    facility_name, facility_slug, author_name, status, hero_images,
                    media_gallery, featured_media_id, content_html, source_url, source_label, published_at, updated_at
                    ) VALUES (
                    $1, $2, $3, $4, $5, $6, $7,
                    $8, $9, $10, 'published', $11::jsonb,
                    $12::jsonb, $13, $14, $15, $16, $17, $18
                    )
                    ON CONFLICT (slug) DO UPDATE SET
                    section = EXCLUDED.section,
                    title = EXCLUDED.title,
                    description = EXCLUDED.description,
                    excerpt = EXCLUDED.excerpt,
                    eyebrow = EXCLUDED.eyebrow,
                    location_label = EXCLUDED.location_label,
                    facility_name = EXCLUDED.facility_name,
                    facility_slug = EXCLUDED.facility_slug,
                    author_name = EXCLUDED.author_name,
                    status = EXCLUDED.status,
                    hero_images = EXCLUDED.hero_images,
                    media_gallery = EXCLUDED.media_gallery,
                    featured_media_id = EXCLUDED.featured_media_id,
                    content_html = EXCLUDED.content_html,
                    source_url = EXCLUDED.source_url,
                    source_label = EXCLUDED.source_label,
                    published_at = EXCLUDED.published_at,
                    updated_at = EXCLUDED.updated_at
                    """,
                    section,
                    slug,
                    title,
                    excerpt,  # description: the import has no separate description field
                    excerpt,
                    eyebrow,
                    str(facility.get("county") or "Norge") if facility_slug else "Norge",
                    str(facility.get("name") or humanize_slug(str(facility_slug))) if facility_slug else None,
                    str(facility_slug) if facility_slug else None,
                    str(((item.get("author") or {}).get("name")) or "TeeOff"),
                    json.dumps(hero_images),
                    json.dumps(sanitized_media_gallery),
                    featured_media_id,
                    content_html,
                    str(item.get("link") or "").strip() or None,
                    "Importert fra gamle TeeOff",
                    published_at,
                    updated_at,
                )
                upserted_count += 1

                article_url = build_article_public_url(section, slug)
                if article_url:
                    submitted_urls.append(article_url)
                section_url = build_absolute_public_url(f"/{section}")
                if section_url:
                    submitted_urls.append(section_url)

    # Section URLs repeat once per article, so dedupe before submitting.
    schedule_indexnow_submission(dedupe_strings(submitted_urls), reason="admin article seed import")
    return {"status": "success", "count": upserted_count, "skipped": skipped_count}
@app.patch("/api/admin/facilities/{facility_id}/scrape-settings")
async def update_scrape_settings(facility_id: int, settings: ScrapeSettingsUpdate):
    """Update how a facility is scraped (method, status URL, selector, AI instruction).

    When the method is 'manual' and course statuses are supplied, those
    statuses are written directly to the facility's courses. All writes
    run in one transaction so a failure cannot leave the facility updated
    while its course statuses are only partly applied.

    Raises:
        HTTPException: 404 when the facility does not exist, 500 when the
            database update fails.
    """
    async with app.state.pool.acquire() as conn:
        try:
            async with conn.transaction():
                # Check first that the facility exists.
                facility = await conn.fetchrow("SELECT id FROM facilities WHERE id = $1", facility_id)
                if not facility:
                    raise HTTPException(status_code=404, detail="Anlegget finnes ikke.")

                await conn.execute(
                    """
                    UPDATE facilities
                    SET scrape_method = $1,
                    scrape_status_url = $2,
                    scrape_status_selector = $3,
                    ai_instruction = $4
                    WHERE id = $5
                    """,
                    settings.scrape_method,
                    settings.scrape_status_url,
                    settings.scrape_status_selector,
                    settings.ai_instruction,
                    facility_id,
                )

                if settings.scrape_method == 'manual' and settings.courses:
                    for course in settings.courses:
                        # Scope to this facility so a stray course id cannot
                        # change another facility's course status.
                        await conn.execute(
                            "UPDATE courses SET status = $1 WHERE id = $2 AND facility_id = $3",
                            course.status,
                            course.id,
                            facility_id,
                        )
        except HTTPException:
            raise
        except Exception as exc:
            print(f"❌ Oppdatering av skrapeinnstillinger feilet: {type(exc).__name__}")
            raise HTTPException(
                status_code=500,
                detail="Kunne ikke oppdatere skrapeinnstillingene",
            ) from exc
    return {"status": "success", "message": f"Skrapeinnstillinger for anlegg ID {facility_id} ble oppdatert."}
# --- NYTT ADMIN ENDPOINT FOR FULL OPPDATERING (JSON-EDITOR) ---
def _parse_facility_timestamp(value: Any) -> Optional[datetime]:
    """Convert an incoming timestamp to ``datetime``; blank or unparsable input becomes None."""
    if value is None or value == "":
        return None
    if isinstance(value, datetime):
        return value
    try:
        # fromisoformat() does not accept a trailing "Z" on older Pythons.
        return datetime.fromisoformat(str(value).replace("Z", "+00:00"))
    except ValueError:
        return None


def _parse_slope_valid_until(value: Any) -> Optional[date]:
    """Convert a date or date-time string to ``date``; blank or unparsable input becomes None."""
    if value is None or value == "":
        return None
    try:
        # Keep only the date part of an ISO date-time such as "2025-05-01T00:00:00Z".
        return datetime.strptime(str(value).split('T')[0], "%Y-%m-%d").date()
    except ValueError:
        return None


@app.put("/api/admin/facilities/{facility_id}/full")
async def update_facility_full(facility_id: int, request: Request):
    """Update a facility together with its courses and holes (the full editor).

    Only allow-listed facility fields that also exist as columns are
    written. Courses and holes are updated only when they carry an id,
    and only when they belong to this facility/course. Everything runs in
    one transaction. Afterwards the affected public URLs are queued for
    IndexNow.

    Raises:
        HTTPException: 400 when the body is not a JSON object, 404 when
            the facility does not exist.
    """
    try:
        data = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Ugyldig JSON i forespørselen.") from exc
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Forespørselen må være et JSON-objekt.")

    # Older clients send the legacy field names; the canonical name wins if both are present.
    legacy_field_aliases = {
        'vtg_presentasjon': 'vtg_beskrivelse',
        'vtg_kursdatoer': 'vtg_datoer',
    }
    for legacy_field, canonical_field in legacy_field_aliases.items():
        if legacy_field in data and canonical_field not in data:
            data[canonical_field] = data[legacy_field]

    # Fields that are safe to update manually on the facility. Because the
    # column names are interpolated into SQL below, this allow-list (plus the
    # column check) is what keeps the statement safe.
    allowed_fields = {
        'name', 'description', 'established_year', 'season', 'banetype', 'architect', 'length_meters',
        'address', 'zipcode', 'city', 'county', 'lat', 'lng',
        'email', 'phone', 'website_url', 'golfbox_booking_url', 'golfbox_tournament_url',
        'weather_url', 'webcam_url', 'video_url', 'baneguide_url', 'flyfoto_url',
        'amenities', 'greenfee', 'golfpakker', 'rabattert_greenfee',
        'nsg_url', 'nsg_data', 'golfamore', 'golfamore_url', 'golfamore_data',
        'navn_standard_medlemskap', 'standard_medlemskap', 'standard_medlemskap_kommentarer',
        'navn_rimeligste_alternativ', 'rimeligste_alternativ', 'medlemskap_url',
        'vtg_beskrivelse', 'vtg_lenke', 'vtg_pris', 'vtg_datoer',
        'guest_requirements', 'scrape_method', 'scrape_status_url',
        'social_links', 'footnote', 'cooperating_clubs', 'membership_draft', 'membership_updated_at',
        'greenfee_url', 'golfpakker_url', 'greenfee_draft', 'greenfee_updated_at', 'scrape_status_selector',
        'vtg_updated_at', 'vtg_draft', 'footnote_updated_at',
        'golfpakker_draft', 'golfpakker_updated_at',
    }
    update_data = {k: v for k, v in data.items() if k in allowed_fields}

    membership_fields = {
        'navn_standard_medlemskap', 'standard_medlemskap', 'standard_medlemskap_kommentarer',
        'navn_rimeligste_alternativ', 'rimeligste_alternativ', 'medlemskap_url', 'membership_updated_at'
    }
    vtg_fields = {'vtg_beskrivelse', 'vtg_lenke', 'vtg_pris', 'vtg_datoer', 'vtg_updated_at'}
    changed_field_names = set(update_data.keys())

    # Columns stored as timestamps; their values need converting for asyncpg.
    date_fields = {
        'membership_updated_at',
        'greenfee_updated_at',
        'vtg_updated_at',
        'status_updated_at',
        'footnote_updated_at',
        'golfpakker_updated_at',
    }

    async with app.state.pool.acquire() as conn:
        async with conn.transaction():  # Ensures everything is saved together
            facility_row = await conn.fetchrow("SELECT slug FROM facilities WHERE id = $1", facility_id)
            if facility_row is None:
                # Otherwise every UPDATE below silently matches nothing and we report success.
                raise HTTPException(status_code=404, detail="Anlegget finnes ikke.")
            facility_slug = str(facility_row["slug"] or "").strip()

            facility_columns = await get_table_columns(conn, "facilities")
            update_data = {k: v for k, v in update_data.items() if k in facility_columns}

            # 1. UPDATE FACILITY
            if update_data:
                if 'footnote' in update_data and 'footnote_updated_at' not in update_data:
                    existing_footnote = await conn.fetchval(
                        "SELECT footnote FROM facilities WHERE id = $1",
                        facility_id
                    )
                    incoming_footnote = str(update_data.get('footnote') or '').strip()
                    current_footnote = str(existing_footnote or '').strip()
                    # Stamp the footnote only when its text actually changed;
                    # clearing the footnote clears the timestamp as well.
                    if incoming_footnote != current_footnote:
                        update_data['footnote_updated_at'] = datetime.utcnow() if incoming_footnote else None

                set_clauses = []
                values = []
                for i, (k, v) in enumerate(update_data.items(), 1):
                    if isinstance(v, (dict, list)):
                        set_clauses.append(f"{k} = ${i}::jsonb")
                        values.append(json.dumps(v))
                    elif k in date_fields:
                        set_clauses.append(f"{k} = ${i}")
                        values.append(_parse_facility_timestamp(v))
                    else:
                        set_clauses.append(f"{k} = ${i}")
                        values.append(v)
                values.append(facility_id)
                query = f"UPDATE facilities SET {', '.join(set_clauses)} WHERE id = ${len(values)}"
                await conn.execute(query, *values)

            # 2. UPDATE COURSES AND HOLES
            for course in data.get('courses') or []:
                if not course or not isinstance(course, dict):
                    continue
                course_id = course.get('id')
                if not course_id:
                    continue
                valid_until = _parse_slope_valid_until(course.get('slope_valid_until'))
                await conn.execute(
                    """
                    UPDATE courses
                    SET name=$1, par=$2, length_meters=$3, architect=$4,
                    status=$5, is_main_course=$6, tee_boxes=$7::jsonb,
                    slope_valid_until=$8
                    WHERE id=$9 AND facility_id=$10
                    """,
                    course.get('name'), course.get('par'), course.get('length_meters'),
                    course.get('architect'), course.get('status'), course.get('is_main_course'),
                    json.dumps(course.get('tee_boxes') or {}), valid_until, course_id, facility_id,
                )

                # 3. UPDATE HOLES ON THE COURSE
                for hole in course.get('holes') or []:
                    if not hole or not isinstance(hole, dict):
                        continue
                    hole_id = hole.get('id')
                    if not hole_id:
                        continue
                    await conn.execute(
                        """
                        UPDATE holes
                        SET par=$1, hcp_index=$2, lengths=$3::jsonb
                        WHERE id=$4 AND course_id=$5
                        """,
                        hole.get('par'), hole.get('hcp_index'),
                        json.dumps(hole.get('lengths') or {}), hole_id, course_id,
                    )

    extra_paths = ["/golfbaner"]
    if changed_field_names & membership_fields:
        extra_paths.append("/medlemskap")
    if changed_field_names & vtg_fields:
        extra_paths.append("/vtg")
    schedule_indexnow_submission(
        collect_facility_indexnow_urls([facility_slug], extra_paths=extra_paths),
        reason="facility full update",
    )
    return {"status": "success", "message": "Anlegg, baner og scorekort ble oppdatert."}
# --- ADMIN ENDPOINTS: SCRAPE JOB QUEUE, 2FA SETUP AND HEALTH CHECK ---
@app.get("/api/admin/scrape-jobs")
async def get_scrape_jobs(job_type: Optional[str] = Query(default=None), limit: int = Query(default=10, ge=1, le=50)):
    """Return the latest scrape jobs, optionally filtered by job type.

    Raises:
        HTTPException: 400 when a job type is given that is not in
            ``SCRAPE_JOB_TYPES``.
    """
    type_is_acceptable = not job_type or job_type in SCRAPE_JOB_TYPES
    if not type_is_acceptable:
        raise HTTPException(status_code=400, detail="Ugyldig jobbtype.")
    jobs = await list_scrape_jobs(app.state.pool, job_type=job_type, limit=limit)
    return jobs
@app.post("/api/admin/2fa/setup")
async def get_admin_2fa_setup(request: AdminPasswordConfirm, http_request: Request):
    """Re-verify the admin password and return the TOTP setup details.

    The admin username is read from ``http_request.state.admin_username``.
    The response contains the issuer, account name, OTP secret,
    provisioning URI and a QR code rendered as SVG.

    Raises:
        HTTPException: 401 when no admin is logged in or the password is
            wrong, 404 when the admin row is missing, 500 when password
            verification itself fails, 400 when the account has no OTP
            secret yet.
    """
    username = getattr(http_request.state, "admin_username", None)
    if not username:
        raise HTTPException(status_code=401, detail="Admin-innlogging kreves")

    async with app.state.pool.acquire() as db:
        admin = await db.fetchrow(
            "SELECT username, email, password_hash, otp_secret FROM admins WHERE username = $1",
            username,
        )
    if not admin:
        raise HTTPException(status_code=404, detail="Admin-bruker ble ikke funnet")

    try:
        password_matches = pwd_context.verify(request.password, admin["password_hash"])
    except Exception as exc:
        raise HTTPException(status_code=500, detail="Kunne ikke verifisere passordet") from exc
    if not password_matches:
        raise HTTPException(status_code=401, detail="Feil passord")

    otp_secret = admin["otp_secret"]
    if not otp_secret:
        raise HTTPException(status_code=400, detail="Denne kontoen har ikke 2FA-nøkkel ennå")

    issuer_name = "TeeOff.no"
    # Prefer the email as the account label; fall back to the username.
    account_name = admin["email"] or admin["username"]
    totp = pyotp.TOTP(otp_secret)
    provisioning_uri = totp.provisioning_uri(
        name=account_name,
        issuer_name=issuer_name,
    )
    return {
        "issuer": issuer_name,
        "account_name": account_name,
        "otp_secret": otp_secret,
        "provisioning_uri": provisioning_uri,
        "qr_svg": generate_totp_qr_svg(provisioning_uri),
    }
@app.post("/api/admin/run-scraper")
async def run_scraper_endpoint(request: ScrapeRunRequest, http_request: Request):
    """Queue a course-status ("banestatus") scrape job for the given facility ids."""
    facility_ids = request.facility_ids
    print(f"📡 API mottok forespørsel om å kjøre banestatus-skraping for IDer: {facility_ids}")
    # The admin username, when present on the request state, is passed along with the job.
    requested_by = getattr(http_request.state, "admin_username", None)
    return await queue_scrape_job("banestatus", facility_ids, requested_by=requested_by)
@app.post("/api/admin/run-membership-scraper")
async def run_membership_scraper_endpoint(request: ScrapeRunRequest, http_request: Request):
    """Queue a membership ("medlemskap") scrape job for the given facility ids."""
    facility_ids = request.facility_ids
    print(f"📡 API mottok forespørsel om medlemskapsskraping for IDer: {facility_ids}")
    requested_by = getattr(http_request.state, "admin_username", None)
    return await queue_scrape_job("medlemskap", facility_ids, requested_by=requested_by)
@app.get("/api/health")
async def health_check():
    """Report whether the API can reach the database.

    Runs ``SELECT 1`` on a pooled connection. Any failure is reported in
    the response body as "unhealthy"; only the exception type name is
    printed.
    """
    try:
        async with app.state.pool.acquire() as db:
            await db.execute("SELECT 1")
    except Exception as exc:
        print(f"❌ Health check feilet: {type(exc).__name__}")
        return {"status": "unhealthy", "error": "database_unavailable"}
    return {"status": "healthy", "database": "connected"}
# --- MEDLEMSKAP "VASKERI" ENDEPUNKTER ---
@app.get("/api/admin/membership/drafts")
async def get_membership_drafts():
    """Return facilities that have a non-empty pending membership draft, ordered by name."""
    pending_drafts_sql = """
        SELECT id, name, slug, medlemskap_url,
        navn_standard_medlemskap, standard_medlemskap,
        navn_rimeligste_alternativ, rimeligste_alternativ,
        membership_draft
        FROM facilities
        WHERE membership_draft IS NOT NULL
        AND membership_draft::text != '{}'
        ORDER BY name ASC
    """
    async with app.state.pool.acquire() as db:
        records = await db.fetch(pending_drafts_sql)
    return list(map(format_row, records))
@app.post("/api/admin/membership/approve-bulk")
async def approve_membership_bulk(request: BulkApprovalRequest):
    """Apply approved membership values, stamp the update time and clear the draft.

    All approvals are written inside one transaction, after which the
    affected facility pages are queued for IndexNow.
    """
    approvals = request.approvals
    approved_ids = [entry.facility_id for entry in approvals]
    apply_sql = """
        UPDATE facilities
        SET navn_standard_medlemskap = $1,
        standard_medlemskap = $2,
        standard_medlemskap_kommentarer = $3,
        navn_rimeligste_alternativ = $4,
        rimeligste_alternativ = $5,
        membership_updated_at = NOW(),
        membership_draft = NULL
        WHERE id = $6
    """
    async with app.state.pool.acquire() as db:
        async with db.transaction():
            for entry in approvals:
                await db.execute(
                    apply_sql,
                    entry.navn_standard_medlemskap,
                    entry.standard_medlemskap,
                    entry.standard_medlemskap_kommentarer,
                    entry.navn_rimeligste_alternativ,
                    entry.rimeligste_alternativ,
                    entry.facility_id,
                )
        slugs = await fetch_facility_slugs(db, approved_ids)

    urls_to_refresh = collect_facility_indexnow_urls(slugs, extra_paths=["/medlemskap", "/golfbaner"])
    schedule_indexnow_submission(urls_to_refresh, reason="membership bulk approval")
    return {"status": "success", "message": f"{len(approvals)} anlegg ble oppdatert med nye priser!"}
@app.patch("/api/admin/facilities/{facility_id}/quick-edit")
async def quick_edit_facility(facility_id: int, request: QuickEditRequest):
    """Quick-edit a single allow-listed URL/text field on a facility.

    Editing the footnote also maintains ``footnote_updated_at``: it is
    stamped when a non-empty footnote is stored and cleared when the
    footnote is emptied.

    Raises:
        HTTPException: 400 when the field is not allow-listed, 404 when
            the facility does not exist.
    """
    # Security: ONLY these fields may be quick-edited. The field name is
    # interpolated into SQL below, so this check is what keeps that safe.
    allowed_fields = ['scrape_status_url', 'medlemskap_url', 'greenfee_url', 'golfpakker_url', 'vtg_lenke', 'scrape_status_selector', 'footnote', 'website_url']
    if request.field not in allowed_fields:
        raise HTTPException(status_code=400, detail="Ugyldig felt for hurtigredigering.")

    async with app.state.pool.acquire() as conn:
        facility_row = await conn.fetchrow("SELECT slug FROM facilities WHERE id = $1", facility_id)
        if facility_row is None:
            # Without this the UPDATE matches nothing yet the caller is told "success".
            raise HTTPException(status_code=404, detail="Anlegget finnes ikke.")
        facility_slug = str(facility_row["slug"] or "").strip()

        if request.field == 'footnote':
            normalized_value = str(request.value or '').strip() or None
            footnote_updated_at = datetime.utcnow() if normalized_value else None
            await conn.execute(
                """
                UPDATE facilities
                SET footnote = $1,
                footnote_updated_at = $2
                WHERE id = $3
                """,
                normalized_value,
                footnote_updated_at,
                facility_id,
            )
        else:
            # The f-string is safe here because request.field was checked against allowed_fields.
            await conn.execute(
                f"UPDATE facilities SET {request.field} = $1 WHERE id = $2",
                request.value,
                facility_id,
            )

    schedule_indexnow_submission(
        collect_facility_indexnow_urls([facility_slug], extra_paths=["/golfbaner"]),
        reason=f"facility quick edit ({request.field})",
    )
    return {"status": "success"}
# --- GREENFEE "VASKERI" ENDEPUNKTER ---
@app.get("/api/admin/greenfee/drafts")
async def get_greenfee_drafts():
    """Return facilities that have a non-empty pending greenfee draft, ordered by name."""
    pending_drafts_sql = """
        SELECT id, name, slug, greenfee_url, greenfee, greenfee_draft
        FROM facilities
        WHERE greenfee_draft IS NOT NULL
        AND greenfee_draft::text != '{}'
        ORDER BY name ASC
    """
    async with app.state.pool.acquire() as db:
        records = await db.fetch(pending_drafts_sql)
    return list(map(format_row, records))
class BulkGreenfeeRequest(BaseModel):
    """Request body for the bulk greenfee approval endpoint."""
    # One entry per facility being approved; the approval endpoint reads
    # `facility_id` and `greenfee` from each entry.
    approvals: List[GreenfeeApproval]
@app.post("/api/admin/greenfee/approve-bulk")
async def approve_greenfee_bulk(request: BulkGreenfeeRequest):
    """Apply approved greenfee data, stamp the update time and clear the draft.

    For each approval the stored draft is read to pick up any suggested
    cooperating clubs ("foreslatt_avtaleklubber"), which are resolved to
    facility slugs. When the ``cooperating_clubs`` column exists it is
    overwritten only if the resolved list is non-empty. Everything runs
    in one transaction.
    """
    approved_ids = [entry.facility_id for entry in request.approvals]

    with_clubs_sql = """
        UPDATE facilities
        SET greenfee = $1::jsonb,
        cooperating_clubs = CASE
        WHEN $2::jsonb = '[]'::jsonb THEN cooperating_clubs
        ELSE $2::jsonb
        END,
        greenfee_updated_at = NOW(),
        greenfee_draft = NULL
        WHERE id = $3
    """
    without_clubs_sql = """
        UPDATE facilities
        SET greenfee = $1::jsonb,
        greenfee_updated_at = NOW(),
        greenfee_draft = NULL
        WHERE id = $2
    """

    async with app.state.pool.acquire() as db:
        async with db.transaction():
            columns = await get_table_columns(db, "facilities")
            clubs_column_exists = "cooperating_clubs" in columns

            for entry in request.approvals:
                stored = await db.fetchrow(
                    "SELECT greenfee_draft FROM facilities WHERE id = $1",
                    entry.facility_id
                )

                # Unwrap row -> draft -> suggested clubs, tolerating any
                # level that is missing or has an unexpected type.
                formatted = format_row(stored) if stored else {}
                if isinstance(formatted, dict):
                    draft = formatted.get("greenfee_draft", {})
                else:
                    draft = {}
                if isinstance(draft, dict):
                    suggestions = draft.get("foreslatt_avtaleklubber", [])
                else:
                    suggestions = []
                if not isinstance(suggestions, list):
                    suggestions = []

                club_slugs = await resolve_cooperating_club_slugs(
                    db,
                    suggestions,
                    exclude_facility_id=entry.facility_id,
                )

                greenfee_json = json.dumps(entry.greenfee)
                if clubs_column_exists:
                    await db.execute(with_clubs_sql, greenfee_json, json.dumps(club_slugs), entry.facility_id)
                else:
                    await db.execute(without_clubs_sql, greenfee_json, entry.facility_id)
        slugs = await fetch_facility_slugs(db, approved_ids)

    urls_to_refresh = collect_facility_indexnow_urls(slugs, extra_paths=["/golfbaner"])
    schedule_indexnow_submission(urls_to_refresh, reason="greenfee bulk approval")
    return {"status": "success"}
@app.post("/api/admin/run-greenfee-scraper")
async def run_greenfee_scraper_endpoint(request: ScrapeRunRequest, http_request: Request):
    """Queue a greenfee scrape job for the given facility ids."""
    facility_ids = request.facility_ids
    print(f"📡 API mottok forespørsel om greenfee-skraping for IDer: {facility_ids}")
    requested_by = getattr(http_request.state, "admin_username", None)
    return await queue_scrape_job("greenfee", facility_ids, requested_by=requested_by)
# --- VEIEN TIL GOLF (VTG) "VASKERI" ENDEPUNKTER ---
@app.get("/api/admin/vtg/drafts")
async def get_vtg_drafts():
    """Return facilities that have a non-empty pending VTG draft, ordered by name."""
    pending_drafts_sql = """
        SELECT id, name, slug, vtg_lenke, vtg_pris, vtg_beskrivelse, vtg_datoer, vtg_draft
        FROM facilities
        WHERE vtg_draft IS NOT NULL
        AND vtg_draft::text != '{}'
        ORDER BY name ASC
    """
    async with app.state.pool.acquire() as db:
        records = await db.fetch(pending_drafts_sql)
    return list(map(format_row, records))
@app.post("/api/admin/vtg/approve-bulk")
async def approve_vtg_bulk(request: BulkVtgRequest):
    """Apply approved VTG values, stamp the update time and clear the draft.

    A missing date list is stored as an empty JSON array. All approvals
    are written inside one transaction.
    """
    approved_ids = [entry.facility_id for entry in request.approvals]
    apply_sql = """
        UPDATE facilities
        SET vtg_pris = $1,
        vtg_beskrivelse = $2,
        vtg_datoer = $3::jsonb,
        vtg_updated_at = NOW(),
        vtg_draft = NULL
        WHERE id = $4
    """
    async with app.state.pool.acquire() as db:
        async with db.transaction():
            for entry in request.approvals:
                if entry.vtg_datoer is None:
                    dates_json = '[]'
                else:
                    dates_json = json.dumps(entry.vtg_datoer)
                await db.execute(
                    apply_sql,
                    entry.vtg_pris,
                    entry.vtg_beskrivelse,
                    dates_json,
                    entry.facility_id,
                )
        slugs = await fetch_facility_slugs(db, approved_ids)

    urls_to_refresh = collect_facility_indexnow_urls(slugs, extra_paths=["/vtg", "/golfbaner"])
    schedule_indexnow_submission(urls_to_refresh, reason="vtg bulk approval")
    return {"status": "success"}
@app.post("/api/admin/run-vtg-scraper")
async def run_vtg_scraper_endpoint(request: ScrapeRunRequest, http_request: Request):
    """Queue a VTG scrape job for the given facility ids."""
    facility_ids = request.facility_ids
    print(f"📡 API mottok forespørsel om VTG-skraping for IDer: {facility_ids}")
    requested_by = getattr(http_request.state, "admin_username", None)
    return await queue_scrape_job("vtg", facility_ids, requested_by=requested_by)
@app.get("/api/admin/golfpakker/drafts")
async def get_golfpakker_drafts():
    """Return facilities that have a non-empty pending golf-package draft, ordered by name."""
    pending_drafts_sql = """
        SELECT id, name, slug, website_url, golfpakker_url, golfpakker, golfpakker_draft
        FROM facilities
        WHERE golfpakker_draft IS NOT NULL
        AND golfpakker_draft::text != '{}'
        ORDER BY name ASC
    """
    async with app.state.pool.acquire() as db:
        records = await db.fetch(pending_drafts_sql)
    return list(map(format_row, records))
@app.post("/api/admin/golfpakker/approve-bulk")
async def approve_golfpakker_bulk(request: BulkGolfpakkerRequest):
    """Apply approved golf-package data, stamp the update time and clear the draft.

    All approvals are written inside one transaction, after which the
    affected facility pages are queued for IndexNow.
    """
    approved_ids = [entry.facility_id for entry in request.approvals]
    apply_sql = """
        UPDATE facilities
        SET golfpakker = $1::jsonb,
        golfpakker_updated_at = NOW(),
        golfpakker_draft = NULL
        WHERE id = $2
    """
    async with app.state.pool.acquire() as db:
        async with db.transaction():
            for entry in request.approvals:
                packages_json = json.dumps(entry.golfpakker)
                await db.execute(apply_sql, packages_json, entry.facility_id)
        slugs = await fetch_facility_slugs(db, approved_ids)

    urls_to_refresh = collect_facility_indexnow_urls(slugs, extra_paths=["/golfbaner"])
    schedule_indexnow_submission(urls_to_refresh, reason="golfpakker bulk approval")
    return {"status": "success"}
@app.post("/api/admin/run-golfpakker-scraper")
async def run_golfpakker_scraper_endpoint(request: ScrapeRunRequest, http_request: Request):
    """Queue a golf-package ("golfpakker") scrape job for the given facility ids."""
    facility_ids = request.facility_ids
    print(f"📡 API mottok forespørsel om golfpakkeskraping for IDer: {facility_ids}")
    requested_by = getattr(http_request.state, "admin_username", None)
    return await queue_scrape_job("golfpakker", facility_ids, requested_by=requested_by)
# Script entry point: when this file is executed directly, serve the app
# with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)