Nye-TeeOff/scripts/mirror_wp_article_media.py

224 lines
7 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import re
import sys
import time
from pathlib import Path
from typing import Iterable
from urllib.error import HTTPError, URLError
from urllib.parse import quote, urlsplit, urlunsplit
from urllib.request import Request, urlopen
# Matches absolute http/https URLs on teeoff.no, nye.teeoff.no or wp.teeoff.no
# (optionally prefixed with "www.") that point below /wp-content/uploads/.
# Group 1 captures the site-relative path, which runs until the next
# whitespace character, quote or angle bracket. Matching is case-insensitive.
WP_UPLOAD_PATH_PATTERN = re.compile(
    r"https?://(?:www\.)?(?:teeoff\.no|nye\.teeoff\.no|wp\.teeoff\.no)(/wp-content/uploads/[^\s\"'<>]+)",
    flags=re.IGNORECASE,
)
def parse_args() -> argparse.Namespace:
    """Define the command-line options and parse the process arguments.

    Returns:
        The parsed namespace with the attributes ``source_json``,
        ``target_root``, ``origin``, ``limit``, ``sleep``, ``retries``,
        ``retry_backoff``, ``refresh`` and ``dry_run``.
    """
    cli = argparse.ArgumentParser(
        description=(
            "Mirror old TeeOff WordPress media from wp.teeoff.no into frontend/public/wp-content/uploads "
            "so article media can be served locally."
        )
    )

    # One (flag, add_argument keyword arguments) pair per option, registered
    # in the order they should appear in --help.
    option_table = (
        (
            "--source-json",
            {
                "default": "frontend/src/content/importedMeninger.json",
                "help": "Path to imported article JSON. Default: %(default)s",
            },
        ),
        (
            "--target-root",
            {
                "default": "frontend/public",
                "help": "Root directory where mirrored files should be stored. Default: %(default)s",
            },
        ),
        (
            "--origin",
            {
                "default": "https://wp.teeoff.no",
                "help": "Origin host to download legacy uploads from. Default: %(default)s",
            },
        ),
        (
            "--limit",
            {
                "type": int,
                "default": 0,
                "help": "Only download the first N unique files. 0 means no limit.",
            },
        ),
        (
            "--sleep",
            {
                "type": float,
                "default": 0.2,
                "help": "Optional pause in seconds between downloads.",
            },
        ),
        (
            "--retries",
            {
                "type": int,
                "default": 5,
                "help": "How many times to retry a failed download, especially for HTTP 429. Default: %(default)s",
            },
        ),
        (
            "--retry-backoff",
            {
                "type": float,
                "default": 2.0,
                "help": "Base seconds to wait before retrying after a rate limit. Default: %(default)s",
            },
        ),
        (
            "--refresh",
            {
                "action": "store_true",
                "help": "Re-download files even if they already exist locally.",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "List files that would be mirrored without downloading them.",
            },
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    return cli.parse_args()
def extract_upload_paths(records: list[dict]) -> list[str]:
    """Collect the unique WordPress upload paths referenced by article records.

    Two places are inspected per record: the ``url`` entry of its
    ``featuredImage`` mapping (which must match the upload URL pattern from
    its first character) and every upload URL found anywhere in its
    ``contentHtml`` text.

    Args:
        records: Parsed article records. Entries that are not dicts, and
            ``featuredImage`` values that are not dicts, are ignored instead
            of raising ``AttributeError``.

    Returns:
        Site-relative paths starting with ``/wp-content/uploads/``, without
        duplicates, in order of first appearance.
    """
    discovered: list[str] = []
    seen: set[str] = set()

    def add_path(candidate: str) -> None:
        path = candidate.strip()
        if not path.startswith("/wp-content/uploads/"):
            return
        # The path is later joined onto a local directory. A ".." segment
        # (with either slash style) could escape that directory, so such
        # paths are never treated as valid upload paths.
        if ".." in re.split(r"[/\\]", path):
            return
        if path in seen:
            return
        seen.add(path)
        discovered.append(path)

    for record in records:
        if not isinstance(record, dict):
            # Malformed entry in the export; nothing to extract from it.
            continue

        featured_image = record.get("featuredImage")
        if isinstance(featured_image, dict):
            featured_url = str(featured_image.get("url") or "").strip()
            featured_match = WP_UPLOAD_PATH_PATTERN.match(featured_url)
            if featured_match:
                add_path(featured_match.group(1))

        content_html = str(record.get("contentHtml") or "")
        for match in WP_UPLOAD_PATH_PATTERN.finditer(content_html):
            add_path(match.group(1))

    return discovered
def iter_downloads(paths: Iterable[str], origin: str, target_root: Path, refresh: bool) -> Iterable[tuple[str, str, Path]]:
    """Yield the downloads that still have to be performed.

    Args:
        paths: Site-relative upload paths (each beginning with ``/``).
        origin: Base URL of the server to fetch from; trailing slashes are
            dropped before the path is appended.
        target_root: Local directory under which each path is mirrored.
        refresh: When true, files that already exist locally are yielded too.

    Yields:
        ``(path, url, destination)`` tuples, where ``url`` is the origin
        followed by the path and ``destination`` is the path placed below
        ``target_root``.
    """
    origin_prefix = origin.rstrip("/")
    for upload_path in paths:
        local_file = target_root / upload_path.lstrip("/")
        # Existing files are skipped unless a refresh was requested.
        if not local_file.exists() or refresh:
            yield upload_path, origin_prefix + upload_path, local_file
def encode_url(url: str) -> str:
    """Return ``url`` with its path and query percent-encoded.

    ``/`` and ``%`` are left untouched in the path, and ``=``, ``&`` and
    ``%`` in the query, so existing percent escapes are not encoded a second
    time. Scheme, host and fragment are passed through unchanged.
    """
    scheme, netloc, raw_path, raw_query, fragment = urlsplit(url)
    safe_path = quote(raw_path, safe="/%")
    safe_query = quote(raw_query, safe="=&%")
    return urlunsplit((scheme, netloc, safe_path, safe_query, fragment))
def _retry_after_delay(header_value: str | None, fallback: float) -> float:
    """Convert a ``Retry-After`` header value into a delay in seconds.

    The header may carry either a number of seconds or an HTTP-date. A
    missing, unparseable or non-finite value yields ``fallback``; a value in
    the past yields ``0.0`` so the result is always safe for ``time.sleep``
    as long as ``fallback`` is.
    """
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    if not header_value:
        return fallback
    text = header_value.strip()
    try:
        seconds = float(text)
    except ValueError:
        try:
            retry_at = parsedate_to_datetime(text)
        except (TypeError, ValueError):
            # Neither a number nor a parseable date.
            return fallback
        if retry_at.tzinfo is None:
            # A date without usable zone information is treated as UTC.
            retry_at = retry_at.replace(tzinfo=timezone.utc)
        seconds = (retry_at - datetime.now(timezone.utc)).total_seconds()
    # Rejects NaN (never equal to itself) and both infinities.
    if seconds != seconds or seconds in (float("inf"), float("-inf")):
        return fallback
    return max(0.0, seconds)


def download_file(url: str, destination: Path, retries: int, retry_backoff: float) -> None:
    """Download ``url`` and store the response body at ``destination``.

    Args:
        url: Address to fetch; it is percent-encoded via ``encode_url`` first.
        destination: Local file to create. Missing parent directories are
            created.
        retries: Maximum number of additional attempts after a failure.
        retry_backoff: Base delay in seconds; the wait before retry ``k``
            (counting from 1) is ``retry_backoff * k`` unless a usable
            ``Retry-After`` header says otherwise.

    Raises:
        HTTPError: For any HTTP error other than 429, or for 429 once the
            retries are exhausted.
        URLError, TimeoutError, ConnectionError: For network failures once
            the retries are exhausted.
        OSError: If the file cannot be written.
    """
    request = Request(
        encode_url(url),
        headers={
            "User-Agent": "teeoff-media-mirror/1.0 (+https://teeoff.no)",
        },
    )
    destination.parent.mkdir(parents=True, exist_ok=True)

    attempt = 0
    while True:
        try:
            # Only the network transfer is retried; file-system errors below
            # must not trigger another download.
            with urlopen(request, timeout=60) as response:
                data = response.read()
        except HTTPError as exc:
            # HTTPError is a URLError subclass, so it has to be handled first.
            if exc.code != 429 or attempt >= retries:
                raise
            delay = _retry_after_delay(
                exc.headers.get("Retry-After"),
                retry_backoff * (attempt + 1),
            )
        except (URLError, TimeoutError, ConnectionError):
            # Failures while reading the body surface as plain timeouts or
            # connection errors rather than URLError; retry those as well.
            if attempt >= retries:
                raise
            delay = retry_backoff * (attempt + 1)
        else:
            break
        time.sleep(delay)
        attempt += 1

    # Write to a sibling temp file and rename it into place, so an
    # interrupted run never leaves a truncated file at the destination.
    partial = destination.with_name(destination.name + ".part")
    try:
        partial.write_bytes(data)
        partial.replace(destination)
    except OSError:
        try:
            partial.unlink()
        except OSError:
            # Best-effort cleanup only; the original error is what matters.
            pass
        raise
def main() -> int:
    """Run the mirroring workflow and return a process exit code.

    Reads the article JSON, collects the referenced upload paths, works out
    which files are missing locally and downloads them one by one, printing
    progress to stdout and failures to stderr.

    Returns:
        ``0`` when everything succeeded (or on a dry run), ``1`` when the
        source file is missing, unreadable, not valid JSON or not a list,
        and ``2`` when at least one download failed.
    """
    args = parse_args()
    source_json = Path(args.source_json)
    target_root = Path(args.target_root)

    if not source_json.exists():
        print(f"Fant ikke kildefilen: {source_json}", file=sys.stderr)
        return 1

    # Reading can fail even though the path exists (directory, permissions,
    # invalid UTF-8); report that like every other input problem.
    try:
        raw_text = source_json.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        print(f"Kunne ikke lese kildefilen {source_json}: {exc}", file=sys.stderr)
        return 1

    try:
        records = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        print(f"Kunne ikke lese JSON fra {source_json}: {exc}", file=sys.stderr)
        return 1

    if not isinstance(records, list):
        print(f"Forventet en liste i {source_json}", file=sys.stderr)
        return 1

    unique_paths = extract_upload_paths(records)
    if args.limit and args.limit > 0:
        unique_paths = unique_paths[: args.limit]
    print(f"Fant {len(unique_paths)} unike WordPress-uploadfiler.")

    pending = list(iter_downloads(unique_paths, args.origin, target_root, args.refresh))
    print(f"{len(pending)} filer trenger speiling til {target_root}.")

    if args.dry_run:
        for _path, url, destination in pending:
            print(f"DRY RUN {url} -> {destination}")
        return 0

    total = len(pending)
    downloaded = 0
    failed: list[tuple[str, str]] = []
    for index, (path, url, destination) in enumerate(pending, start=1):
        try:
            download_file(url, destination, args.retries, args.retry_backoff)
            downloaded += 1
            print(f"[{index}/{total}] OK {path}")
        except HTTPError as exc:
            failed.append((path, f"HTTP {exc.code}"))
            print(f"[{index}/{total}] FEIL {path} ({exc.code})", file=sys.stderr)
        except URLError as exc:
            failed.append((path, str(exc.reason)))
            print(f"[{index}/{total}] FEIL {path} ({exc.reason})", file=sys.stderr)
        except Exception as exc:  # noqa: BLE001
            # One bad file must not abort the whole run; record it and go on.
            failed.append((path, str(exc)))
            print(f"[{index}/{total}] FEIL {path} ({exc})", file=sys.stderr)
        # The pause belongs between downloads, so skip it after the last one.
        if args.sleep > 0 and index < total:
            time.sleep(args.sleep)

    print(f"Lastet ned {downloaded} filer.")
    if failed:
        print(f"{len(failed)} filer feilet:", file=sys.stderr)
        for path, reason in failed:
            print(f" - {path}: {reason}", file=sys.stderr)
        return 2
    return 0
# Run the workflow only when executed as a script, and use the value
# returned by main() as the process exit status.
if __name__ == "__main__":
    sys.exit(main())