# Nye-TeeOff/backend/export_facility_contacts_csv.py
#
# Listing metadata: 201 lines, 5.9 KiB, Python

import argparse
import asyncio
import csv
import json
import re
import unicodedata
from pathlib import Path
import asyncpg
from env_config import get_database_url
# Default CSV destination: a file placed in the same directory as this script.
DEFAULT_OUTPUT_PATH = Path(__file__).resolve().parent / "facility_contacts_export.csv"
# Database connection string, resolved once at import time.
# NOTE(review): presumably read from environment/config by env_config - confirm.
DB_URL = get_database_url()
# Columns read from the facilities table (when they exist there) and written
# to the CSV in exactly this order.
BASE_EXPORT_COLUMNS = [
    "id",
    "slug",
    "name",
    "address",
    "zipcode",
    "city",
    "county",
    "phone",
    "email",
    "website_url",
    "golfbox_booking_url",
    "golfbox_tournament_url",
    "facebook_url",
    "instagram_url",
]
# Columns read from the table when present but not exported verbatim:
# "social_links" is parsed and re-emitted as derived CSV columns.
OPTIONAL_SOURCE_COLUMNS = [
    "social_links",
]
def normalize_platform_name(value: str | None) -> str:
text = str(value or "").strip().lower()
if not text:
return ""
text = (
text.replace("&", " and ")
.replace("+", " plus ")
.replace("/", " ")
)
text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
text = re.sub(r"[^a-z0-9]+", "_", text)
text = re.sub(r"_+", "_", text).strip("_")
aliases = {
"x": "twitter",
"x_com": "twitter",
"twitter_x": "twitter",
"tik_tok": "tiktok",
"face_book": "facebook",
"fscebook": "facebook",
"you_tube": "youtube",
}
return aliases.get(text, text)
def parse_social_links(value) -> list[dict[str, str]]:
    """Parse a stored social-links value into clean platform/url pairs.

    Args:
        value: Either an already decoded list of dicts, or JSON text
            (``str``, ``bytes`` or ``bytearray``) encoding such a list.
            ``None`` is accepted.

    Returns:
        One ``{"platform": ..., "url": ...}`` dict per usable entry, with
        both values stripped. Anything that is not a list, any entry that is
        not a dict, and any entry missing a platform or url is dropped.
        Undecodable input yields an empty list instead of raising.
    """
    if isinstance(value, (str, bytes, bytearray)):
        try:
            value = json.loads(value)
        except ValueError:
            # Covers json.JSONDecodeError and, for byte input, UnicodeDecodeError.
            return []
    if not isinstance(value, list):
        # Also handles None and non-list JSON documents (objects, numbers, ...).
        return []
    parsed: list[dict[str, str]] = []
    for entry in value:
        if not isinstance(entry, dict):
            continue
        platform = str(entry.get("platform") or "").strip()
        url = str(entry.get("url") or "").strip()
        if platform and url:
            parsed.append({"platform": platform, "url": url})
    return parsed
def build_social_platform_map(social_links: list[dict[str, str]]) -> dict[str, list[str]]:
    """Group social link URLs under their normalized platform key.

    Args:
        social_links: Dicts carrying "platform" and "url" values.

    Returns:
        A mapping from normalized platform key to its URLs, in first-seen
        order and without duplicates. Entries whose platform normalizes to
        an empty key, or whose url is blank, are left out.
    """
    urls_by_platform: dict[str, list[str]] = {}
    for link in social_links:
        platform_key = normalize_platform_name(link.get("platform"))
        link_url = str(link.get("url") or "").strip()
        if platform_key and link_url:
            known_urls = urls_by_platform.setdefault(platform_key, [])
            if link_url not in known_urls:
                known_urls.append(link_url)
    return urls_by_platform
async def get_facilities_columns(conn: asyncpg.Connection) -> set[str]:
    """Look up which columns the ``public.facilities`` table currently has.

    Args:
        conn: Open database connection used to query ``information_schema``.

    Returns:
        The set of column names, each stripped of surrounding whitespace.
    """
    column_query = (
        "\n"
        "SELECT column_name\n"
        "FROM information_schema.columns\n"
        "WHERE table_schema = 'public' AND table_name = 'facilities'\n"
    )
    column_names: set[str] = set()
    for record in await conn.fetch(column_query):
        column_names.add(str(record["column_name"]).strip())
    return column_names
async def fetch_facility_rows(conn: asyncpg.Connection) -> list[asyncpg.Record]:
    """Fetch every facility row, limited to the export columns that exist.

    The wanted columns (BASE_EXPORT_COLUMNS plus OPTIONAL_SOURCE_COLUMNS) are
    intersected with the table's actual columns, so a missing column does
    not break the export.

    Args:
        conn: Open database connection.

    Returns:
        All rows of ``facilities``, ordered by ``name`` when that column
        exists, otherwise by ``id`` when available, otherwise unordered.

    Raises:
        RuntimeError: If none of the wanted columns exist in the table.
    """
    available_columns = await get_facilities_columns(conn)
    selected_columns = [
        column
        for column in (*BASE_EXPORT_COLUMNS, *OPTIONAL_SOURCE_COLUMNS)
        if column in available_columns
    ]
    if not selected_columns:
        raise RuntimeError("Fant ingen eksportbare kolonner i facilities-tabellen.")

    # Apply the same "only if it exists" rule to the sort key, so a table
    # without a "name" column no longer makes the query fail.
    order_column = next(
        (candidate for candidate in ("name", "id") if candidate in available_columns),
        None,
    )
    order_clause = f" ORDER BY {order_column} ASC" if order_column else ""

    # Identifiers are interpolated, but they only ever come from the fixed
    # column lists in this module, never from external input.
    query = f"SELECT {', '.join(selected_columns)} FROM facilities{order_clause}"
    return await conn.fetch(query)
def build_csv_rows(rows: list[asyncpg.Record]) -> tuple[list[str], list[dict[str, str]]]:
    """Turn facility records into CSV row dicts plus the header order.

    For each record:
      * every column in BASE_EXPORT_COLUMNS is copied as a stripped string
        (``None`` becomes an empty string);
      * "facebook_url" / "instagram_url" keep the stored column value when it
        is non-blank, otherwise they fall back to the matching URLs found in
        "social_links", joined with " | ";
      * the parsed social links are serialized into "social_links_json";
      * every other platform found gets its own "social_<platform>_url" column.

    Args:
        rows: Records (anything ``dict()`` can convert) from the facilities table.

    Returns:
        ``(fieldnames, csv_rows)``: the base columns, then the sorted
        per-platform columns discovered across all rows, then
        "social_links_json". A row only contains the per-platform keys it
        actually has.
    """
    # (CSV column, normalized platform key) pairs that have a dedicated base column.
    dedicated_columns = (("facebook_url", "facebook"), ("instagram_url", "instagram"))
    dedicated_platforms = {platform for _, platform in dedicated_columns}

    def as_cell(raw_value) -> str:
        return "" if raw_value is None else str(raw_value).strip()

    csv_rows: list[dict[str, str]] = []
    extra_columns: set[str] = set()

    for record in rows:
        source = dict(record)
        links = parse_social_links(source.get("social_links"))
        urls_by_platform = build_social_platform_map(links)

        out: dict[str, str] = {
            column: as_cell(source.get(column)) for column in BASE_EXPORT_COLUMNS
        }
        for column, platform in dedicated_columns:
            stored = str(source.get(column) or "").strip()
            if stored:
                out[column] = stored
            else:
                out[column] = " | ".join(urls_by_platform.get(platform, []))
        out["social_links_json"] = json.dumps(links, ensure_ascii=False)

        for platform in sorted(urls_by_platform):
            if platform in dedicated_platforms:
                continue
            extra_column = f"social_{platform}_url"
            extra_columns.add(extra_column)
            out[extra_column] = " | ".join(urls_by_platform[platform])

        csv_rows.append(out)

    header = [*BASE_EXPORT_COLUMNS, *sorted(extra_columns), "social_links_json"]
    return header, csv_rows
def write_csv(output_path: Path, fieldnames: list[str], rows: list[dict[str, str]]) -> None:
    """Write the rows to a UTF-8 CSV file with a header line.

    Missing parent directories are created. Keys in a row that are not
    listed in ``fieldnames`` are ignored.

    Args:
        output_path: Destination file; overwritten if it exists.
        fieldnames: Column order for the header and every row.
        rows: One dict per CSV line.
    """
    target_dir = output_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    # newline="" leaves line-ending handling to the csv module.
    with output_path.open("w", encoding="utf-8", newline="") as csv_file:
        dict_writer = csv.DictWriter(csv_file, fieldnames=fieldnames, extrasaction="ignore")
        dict_writer.writeheader()
        dict_writer.writerows(rows)
async def run(output_path: Path) -> None:
    """Export all facilities to a CSV file and print a short summary.

    Connects using DB_URL, fetches the rows, always closes the connection,
    then builds and writes the CSV.

    Args:
        output_path: Where the CSV file is written.
    """
    connection = await asyncpg.connect(DB_URL)
    try:
        facility_records = await fetch_facility_rows(connection)
    finally:
        # Release the connection before the local CSV work starts.
        await connection.close()

    header, csv_rows = build_csv_rows(facility_records)
    write_csv(output_path, header, csv_rows)

    row_count = len(csv_rows)
    column_list = ", ".join(header)
    print(f"Skrev {row_count} rader til {output_path}")
    print(f"Kolonner: {column_list}")
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the export script.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what running the script does.

    Returns:
        Namespace with ``output``: the CSV destination as a ``Path``
        (DEFAULT_OUTPUT_PATH when ``--output`` is not given).
    """
    parser = argparse.ArgumentParser(description="Eksporter kontaktfelter for alle klubber til CSV.")
    parser.add_argument(
        "--output",
        type=Path,
        default=DEFAULT_OUTPUT_PATH,
        # argparse %-formats help text, so the path is inserted through
        # %(default)s; embedding it directly would make --help fail whenever
        # the path contains a literal "%".
        help="Sti til CSV-fil. Standard: %(default)s",
    )
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Script entry point: read the CLI options, then run the async export to completion.
    asyncio.run(run(parse_args().output))