270 lines
9.2 KiB
Python
270 lines
9.2 KiB
Python
import argparse
|
|
import asyncio
|
|
import csv
|
|
import json
|
|
import re
|
|
import unicodedata
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
import asyncpg
|
|
|
|
from env_config import get_database_url
|
|
|
|
|
|
DEFAULT_CSV_PATH = Path("/opt/teeoff/Regneark uten navn - Ark 1.csv")
|
|
DB_URL = get_database_url()
|
|
|
|
SLUG_OVERRIDES = {
|
|
"Bodø Golfpark": "salten-golfklubb-bodo-golfpark",
|
|
"Hemsedal (IKKE ALPIN)": "hemsedal-golfklubb",
|
|
"Randsfjorden": "land-golfklubb",
|
|
"Romerike": "aurskog-golfpark",
|
|
"Steinkjær": "steinkjer-golfklubb",
|
|
"Tønsberg Re": "re-golfklubb",
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class FacilityMatch:
|
|
id: int
|
|
slug: str
|
|
name: str
|
|
normalized_blob: str
|
|
tokens: set[str]
|
|
|
|
|
|
def normalize_text(value: str) -> str:
|
|
source = (value or "").translate(
|
|
str.maketrans(
|
|
{
|
|
"æ": "ae",
|
|
"ø": "o",
|
|
"å": "a",
|
|
"Æ": "Ae",
|
|
"Ø": "O",
|
|
"Å": "A",
|
|
}
|
|
)
|
|
)
|
|
normalized = unicodedata.normalize("NFKD", source)
|
|
ascii_text = normalized.encode("ascii", "ignore").decode("ascii").lower()
|
|
ascii_text = re.sub(r"\([^)]*\)", " ", ascii_text)
|
|
ascii_text = ascii_text.replace("&", " and ")
|
|
ascii_text = re.sub(r"[^a-z0-9]+", " ", ascii_text)
|
|
return re.sub(r"\s+", " ", ascii_text).strip()
|
|
|
|
|
|
def tokenize(value: str) -> set[str]:
|
|
return {token for token in normalize_text(value).split() if token}
|
|
|
|
|
|
def clean_cell(value: str | None) -> str | None:
|
|
if value is None:
|
|
return None
|
|
cleaned = value.strip()
|
|
return cleaned or None
|
|
|
|
|
|
def build_social_links(facebook_url: str | None, instagram_url: str | None) -> list[dict[str, str]]:
|
|
social_links: list[dict[str, str]] = []
|
|
if facebook_url:
|
|
social_links.append({"platform": "Facebook", "url": facebook_url})
|
|
if instagram_url:
|
|
social_links.append({"platform": "Instagram", "url": instagram_url})
|
|
return social_links
|
|
|
|
|
|
def build_golfamore_data(description: str | None) -> dict[str, str]:
|
|
if not description:
|
|
return {}
|
|
return {"terms": description}
|
|
|
|
|
|
def select_facility(row_name: str, facilities: list[FacilityMatch]) -> FacilityMatch:
|
|
override_slug = SLUG_OVERRIDES.get(row_name)
|
|
if override_slug:
|
|
for facility in facilities:
|
|
if facility.slug == override_slug:
|
|
return facility
|
|
raise ValueError(f"Fant ikke override-slug '{override_slug}' for '{row_name}'.")
|
|
|
|
query_tokens = tokenize(row_name)
|
|
query_norm = normalize_text(row_name)
|
|
if not query_tokens:
|
|
raise ValueError(f"Tom eller ugyldig anleggsidentifikator: '{row_name}'.")
|
|
|
|
scored: list[tuple[int, FacilityMatch]] = []
|
|
for facility in facilities:
|
|
if not query_tokens.issubset(facility.tokens):
|
|
continue
|
|
|
|
score = 0
|
|
if query_norm == normalize_text(facility.slug):
|
|
score += 100
|
|
if query_norm == normalize_text(facility.name):
|
|
score += 100
|
|
if query_norm and query_norm in facility.normalized_blob:
|
|
score += 25
|
|
score += max(0, 20 - (len(facility.tokens) - len(query_tokens)))
|
|
scored.append((score, facility))
|
|
|
|
if not scored:
|
|
raise ValueError(f"Fant ingen facility-match for '{row_name}'.")
|
|
|
|
scored.sort(key=lambda item: (-item[0], item[1].name))
|
|
best_score = scored[0][0]
|
|
best_matches = [facility for score, facility in scored if score == best_score]
|
|
|
|
if len(best_matches) != 1:
|
|
options = ", ".join(f"{facility.name} ({facility.slug})" for facility in best_matches)
|
|
raise ValueError(f"Flertydig match for '{row_name}': {options}")
|
|
|
|
return best_matches[0]
|
|
|
|
|
|
async def fetch_facilities(conn: asyncpg.Connection) -> list[FacilityMatch]:
|
|
rows = await conn.fetch("SELECT id, slug, name FROM facilities")
|
|
facilities: list[FacilityMatch] = []
|
|
for row in rows:
|
|
blob = f"{row['name']} {row['slug']}"
|
|
facilities.append(
|
|
FacilityMatch(
|
|
id=row["id"],
|
|
slug=row["slug"],
|
|
name=row["name"],
|
|
normalized_blob=normalize_text(blob),
|
|
tokens=tokenize(blob),
|
|
)
|
|
)
|
|
return facilities
|
|
|
|
|
|
async def run_import(csv_path: Path, apply_changes: bool) -> None:
|
|
if not csv_path.exists():
|
|
raise FileNotFoundError(f"Fant ikke CSV-fil: {csv_path}")
|
|
|
|
with csv_path.open("r", encoding="utf-8-sig", newline="") as handle:
|
|
reader = csv.DictReader(handle)
|
|
rows = list(reader)
|
|
|
|
conn = await asyncpg.connect(DB_URL)
|
|
try:
|
|
facilities = await fetch_facilities(conn)
|
|
updates: list[dict[str, object]] = []
|
|
warnings: list[str] = []
|
|
|
|
for row in rows:
|
|
row_name = clean_cell(row.get("Anlegg"))
|
|
if not row_name:
|
|
continue
|
|
|
|
facility = select_facility(row_name, facilities)
|
|
facebook_url = clean_cell(row.get("Facebook"))
|
|
instagram_url = clean_cell(row.get("Instagram"))
|
|
golfamore_description = clean_cell(row.get("Golfamore beskrivelse"))
|
|
golfamore_url = clean_cell(row.get("Golfamore url"))
|
|
social_links = build_social_links(facebook_url, instagram_url)
|
|
golfamore_data = build_golfamore_data(golfamore_description)
|
|
|
|
if facebook_url and "facebook.com" not in facebook_url.lower():
|
|
warnings.append(f"{row_name}: Facebook-kolonnen peker ikke til facebook.com -> {facebook_url}")
|
|
if instagram_url and "instagram.com" not in instagram_url.lower():
|
|
warnings.append(f"{row_name}: Instagram-kolonnen peker ikke til instagram.com -> {instagram_url}")
|
|
|
|
updates.append(
|
|
{
|
|
"row_name": row_name,
|
|
"facility_id": facility.id,
|
|
"slug": facility.slug,
|
|
"name": facility.name,
|
|
"facebook_url": facebook_url,
|
|
"instagram_url": instagram_url,
|
|
"social_links": social_links,
|
|
"golfamore": bool(golfamore_description or golfamore_url),
|
|
"golfamore_url": golfamore_url,
|
|
"golfamore_data": golfamore_data,
|
|
}
|
|
)
|
|
|
|
print(f"Klar til å oppdatere {len(updates)} anlegg fra {csv_path}.")
|
|
for update in updates:
|
|
print(f"- {update['row_name']} -> {update['name']} ({update['slug']})")
|
|
|
|
if warnings:
|
|
print("\nAdvarsler:")
|
|
for warning in warnings:
|
|
print(f"- {warning}")
|
|
|
|
if not apply_changes:
|
|
print("\nDry-run fullført. Ingen data ble skrevet.")
|
|
return
|
|
|
|
async with conn.transaction():
|
|
for update in updates:
|
|
await conn.execute(
|
|
"""
|
|
UPDATE facilities
|
|
SET
|
|
facebook_url = $1,
|
|
instagram_url = $2,
|
|
social_links = $3::jsonb,
|
|
golfamore = $4,
|
|
golfamore_url = $5,
|
|
golfamore_data = $6::jsonb
|
|
WHERE id = $7
|
|
""",
|
|
update["facebook_url"],
|
|
update["instagram_url"],
|
|
json.dumps(update["social_links"]),
|
|
update["golfamore"],
|
|
update["golfamore_url"],
|
|
json.dumps(update["golfamore_data"]),
|
|
update["facility_id"],
|
|
)
|
|
|
|
await conn.execute(
|
|
"""
|
|
WITH cleaned AS (
|
|
SELECT
|
|
id,
|
|
COALESCE(
|
|
(
|
|
SELECT jsonb_agg(entry)
|
|
FROM jsonb_array_elements(COALESCE(social_links, '[]'::jsonb)) AS entry
|
|
WHERE NULLIF(BTRIM(entry->>'url'), '') IS NOT NULL
|
|
AND NULLIF(BTRIM(entry->>'platform'), '') IS NOT NULL
|
|
),
|
|
'[]'::jsonb
|
|
) AS social_links
|
|
FROM facilities
|
|
)
|
|
UPDATE facilities AS target
|
|
SET
|
|
facebook_url = NULLIF(BTRIM(target.facebook_url), ''),
|
|
instagram_url = NULLIF(BTRIM(target.instagram_url), ''),
|
|
social_links = cleaned.social_links
|
|
FROM cleaned
|
|
WHERE target.id = cleaned.id
|
|
"""
|
|
)
|
|
|
|
print("\nImport fullført.")
|
|
finally:
|
|
await conn.close()
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Importer sosiale medier og Golfamore-felter fra CSV.")
|
|
parser.add_argument("--csv", type=Path, default=DEFAULT_CSV_PATH, help="Sti til CSV-filen som skal importeres.")
|
|
parser.add_argument(
|
|
"--apply",
|
|
action="store_true",
|
|
help="Skriver endringene til databasen. Uten dette kjøres bare dry-run.",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
arguments = parse_args()
|
|
asyncio.run(run_import(arguments.csv, arguments.apply))
|