#!/usr/bin/env python3
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
from urllib.error import HTTPError, URLError
|
|
from urllib.parse import quote, urlsplit, urlunsplit
|
|
from urllib.request import Request, urlopen
|
|
|
|
# Matches absolute http(s) URLs on teeoff.no, nye.teeoff.no or wp.teeoff.no
# (each optionally prefixed with "www.") that point into the WordPress uploads
# tree. Group 1 is the host-relative path: it starts at "/wp-content/uploads/"
# and runs up to the next whitespace, quote or angle bracket. Matching is
# case-insensitive for the whole pattern, including the path part.
WP_UPLOAD_PATH_PATTERN = re.compile(r"https?://(?:www\.)?(?:teeoff\.no|nye\.teeoff\.no|wp\.teeoff\.no)(/wp-content/uploads/[^\s\"'<>]+)", flags=re.IGNORECASE)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Define the command-line options and parse them from ``sys.argv``.

    Returns:
        A namespace with the attributes ``source_json``, ``target_root``,
        ``origin``, ``limit``, ``sleep``, ``retries``, ``retry_backoff``,
        ``refresh`` and ``dry_run``.
    """
    cli = argparse.ArgumentParser(
        description=(
            "Mirror old TeeOff WordPress media from wp.teeoff.no into frontend/public/wp-content/uploads "
            "so article media can be served locally."
        )
    )

    # One (flag, add_argument keyword arguments) pair per option, in the order
    # they should appear in --help.
    option_table = (
        (
            "--source-json",
            {
                "default": "frontend/src/content/importedMeninger.json",
                "help": "Path to imported article JSON. Default: %(default)s",
            },
        ),
        (
            "--target-root",
            {
                "default": "frontend/public",
                "help": "Root directory where mirrored files should be stored. Default: %(default)s",
            },
        ),
        (
            "--origin",
            {
                "default": "https://wp.teeoff.no",
                "help": "Origin host to download legacy uploads from. Default: %(default)s",
            },
        ),
        (
            "--limit",
            {
                "type": int,
                "default": 0,
                "help": "Only download the first N unique files. 0 means no limit.",
            },
        ),
        (
            "--sleep",
            {
                "type": float,
                "default": 0.2,
                "help": "Optional pause in seconds between downloads.",
            },
        ),
        (
            "--retries",
            {
                "type": int,
                "default": 5,
                "help": "How many times to retry a failed download, especially for HTTP 429. Default: %(default)s",
            },
        ),
        (
            "--retry-backoff",
            {
                "type": float,
                "default": 2.0,
                "help": "Base seconds to wait before retrying after a rate limit. Default: %(default)s",
            },
        ),
        (
            "--refresh",
            {
                "action": "store_true",
                "help": "Re-download files even if they already exist locally.",
            },
        ),
        (
            "--dry-run",
            {
                "action": "store_true",
                "help": "List files that would be mirrored without downloading them.",
            },
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    return cli.parse_args()
|
|
|
|
|
|
def extract_upload_paths(records: list[dict]) -> list[str]:
    """Collect the unique WordPress upload paths referenced by article records.

    For each record two places are inspected with ``WP_UPLOAD_PATH_PATTERN``:
    the ``featuredImage["url"]`` value (which must match from its first
    character) and every occurrence inside the ``contentHtml`` string.

    Args:
        records: Decoded JSON article entries. Entries that are not dicts are
            skipped, and a ``featuredImage`` value that is not a dict is
            treated as absent, so malformed input no longer aborts the run
            with an ``AttributeError``.

    Returns:
        Host-relative paths starting with ``/wp-content/uploads/``, without
        duplicates, in order of first appearance.
    """
    upload_prefix = "/wp-content/uploads/"
    # A dict keeps insertion order, giving ordered de-duplication in one structure.
    ordered: dict[str, None] = {}

    def remember(candidate: str) -> None:
        path = candidate.strip()
        # The URL pattern is case-insensitive, but only the exact lowercase
        # prefix is accepted here; differently-cased matches are dropped.
        if path.startswith(upload_prefix):
            ordered.setdefault(path, None)

    for record in records:
        if not isinstance(record, dict):
            continue

        featured_image = record.get("featuredImage")
        if isinstance(featured_image, dict):
            featured_url = str(featured_image.get("url") or "").strip()
            featured_match = WP_UPLOAD_PATH_PATTERN.match(featured_url)
            if featured_match:
                remember(featured_match.group(1))

        content_html = str(record.get("contentHtml") or "")
        for match in WP_UPLOAD_PATH_PATTERN.finditer(content_html):
            remember(match.group(1))

    return list(ordered)
|
|
|
|
|
|
def iter_downloads(paths: Iterable[str], origin: str, target_root: Path, refresh: bool) -> Iterable[tuple[str, str, Path]]:
    """Yield the downloads that still have to be performed.

    Args:
        paths: Host-relative upload paths (expected to start with "/").
        origin: Base URL to download from; trailing slashes are ignored.
        target_root: Local directory the paths are mirrored under.
        refresh: When true, files that already exist locally are yielded too.

    Yields:
        ``(path, url, destination)`` tuples, where ``url`` is ``origin`` plus
        ``path`` and ``destination`` is ``target_root`` joined with ``path``
        minus its leading slashes.

    Paths containing a ``..`` segment are reported on stderr and skipped: they
    are extracted from article HTML, and such a segment would let the
    destination resolve outside ``target_root``.
    """
    base_origin = origin.rstrip("/")

    for path in paths:
        # Split on both separators so "..\\" cannot slip through on Windows.
        if ".." in re.split(r"[\\/]", path):
            print(f"Hopper over utrygg sti: {path}", file=sys.stderr)
            continue

        destination = target_root / path.lstrip("/")
        if destination.exists() and not refresh:
            continue
        yield path, f"{base_origin}{path}", destination
|
|
|
|
|
|
def encode_url(url: str) -> str:
    """Return *url* with its path and query percent-encoded.

    In the path, ``/`` is kept; in the query, ``=`` and ``&`` are kept. In
    both, existing ``%`` characters are left untouched, so sequences that are
    already percent-encoded are not encoded a second time. Scheme, network
    location and fragment are passed through unchanged.
    """
    parts = urlsplit(url)
    safe_parts = parts._replace(
        path=quote(parts.path, safe="/%"),
        query=quote(parts.query, safe="=&%"),
    )
    return urlunsplit(safe_parts)
|
|
|
|
|
|
def download_file(url: str, destination: Path, retries: int, retry_backoff: float) -> None:
    """Download *url* and store the response body at *destination*.

    Args:
        url: Source URL; it is passed through ``encode_url`` before the request.
        destination: Target file. Missing parent directories are created.
        retries: Maximum number of retries after the first attempt.
        retry_backoff: Base delay in seconds; the wait before retry number
            ``k`` (1-based) is ``retry_backoff * k`` unless the server sends a
            usable ``Retry-After`` value.

    Raises:
        HTTPError: For any HTTP status other than 429, or for 429 once the
            retries are used up.
        URLError: When a network-level failure persists after all retries.
        OSError: For other persistent transport errors, or when writing the
            file fails.
    """
    request = Request(
        encode_url(url),
        headers={
            "User-Agent": "teeoff-media-mirror/1.0 (+https://teeoff.no)",
        },
    )
    destination.parent.mkdir(parents=True, exist_ok=True)
    attempt = 0

    while True:
        fallback_delay = retry_backoff * (attempt + 1)
        try:
            with urlopen(request, timeout=60) as response:
                payload = response.read()
        except HTTPError as exc:
            # Only rate limiting is retried; every other status is final.
            if exc.code != 429 or attempt >= retries:
                raise
            retry_after = exc.headers.get("Retry-After") if exc.headers is not None else None
            try:
                delay = float(retry_after) if retry_after else fallback_delay
            except ValueError:
                # Retry-After may also be an HTTP-date; use the linear backoff then.
                delay = fallback_delay
            # Rejects negative, infinite and NaN values, which time.sleep cannot handle.
            if not 0.0 <= delay < float("inf"):
                delay = fallback_delay
        except OSError:
            # URLError is an OSError subclass. Timeouts and resets raised while
            # reading the body are plain OSErrors and are retried the same way.
            if attempt >= retries:
                raise
            delay = fallback_delay
        else:
            break

        time.sleep(max(delay, 0.0))
        attempt += 1

    # Write to a sibling file and rename it into place, so an interrupted write
    # never leaves a truncated file that later runs would treat as mirrored.
    partial = destination.with_name(destination.name + ".part")
    try:
        partial.write_bytes(payload)
        partial.replace(destination)
    finally:
        # After a successful rename the partial file is gone; this only
        # removes leftovers from a failed write or rename.
        if partial.exists():
            partial.unlink()
|
|
|
|
|
|
def main() -> int:
    """Run the mirror: read the article JSON, then download missing uploads.

    Returns:
        ``0`` when everything succeeded (or for a dry run), ``1`` when the
        source file is missing, is not valid JSON or does not contain a list,
        and ``2`` when at least one download failed.
    """
    args = parse_args()
    source_json = Path(args.source_json)
    target_root = Path(args.target_root)

    if not source_json.exists():
        print(f"Fant ikke kildefilen: {source_json}", file=sys.stderr)
        return 1

    try:
        raw_text = source_json.read_text(encoding="utf-8")
        records = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        print(f"Kunne ikke lese JSON fra {source_json}: {exc}", file=sys.stderr)
        return 1

    if not isinstance(records, list):
        print(f"Forventet en liste i {source_json}", file=sys.stderr)
        return 1

    unique_paths = extract_upload_paths(records)
    # The limit is applied to the discovered paths, before files that already
    # exist locally are filtered out.
    if args.limit > 0:
        unique_paths = unique_paths[: args.limit]
    print(f"Fant {len(unique_paths)} unike WordPress-uploadfiler.")

    queue = list(iter_downloads(unique_paths, args.origin, target_root, args.refresh))
    total = len(queue)
    print(f"{total} filer trenger speiling til {target_root}.")

    if args.dry_run:
        for _, url, destination in queue:
            print(f"DRY RUN {url} -> {destination}")
        return 0

    succeeded = 0
    failures: list[tuple[str, str]] = []

    for position, (path, url, destination) in enumerate(queue, start=1):
        tag = f"[{position}/{total}]"
        # (reason stored for the summary, detail shown on the progress line)
        problem = None
        try:
            download_file(url, destination, args.retries, args.retry_backoff)
            succeeded += 1
            print(f"{tag} OK {path}")
        except HTTPError as exc:
            problem = (f"HTTP {exc.code}", f"{exc.code}")
        except URLError as exc:
            problem = (str(exc.reason), f"{exc.reason}")
        except Exception as exc:  # noqa: BLE001
            # One bad file must not abort the whole run.
            problem = (str(exc), f"{exc}")

        if problem is not None:
            recorded, shown = problem
            failures.append((path, recorded))
            print(f"{tag} FEIL {path} ({shown})", file=sys.stderr)

        # The pause applies after every attempt, successful or not.
        if args.sleep > 0:
            time.sleep(args.sleep)

    print(f"Lastet ned {succeeded} filer.")

    if not failures:
        return 0

    print(f"{len(failures)} filer feilet:", file=sys.stderr)
    for path, reason in failures:
        print(f" - {path}: {reason}", file=sys.stderr)
    return 2
|
|
|
|
|
|
# Script entry point: the integer returned by main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|