From 03402cdfa0f4a0d27c3d2c1e661a57ac65120481 Mon Sep 17 00:00:00 2001
From: Coornhert <coornhert@wetgit.nl>
Date: Mon, 30 Mar 2026 06:42:04 +0200
Subject: [PATCH] feat: dagelijkse sync pipeline + cron job

- sync.py: vergelijkt SRU catalogus met lokale staat, verwerkt delta's
- daily-sync.sh: wrapper script voor cron (lock, clone, pull, sync, push)
- Cron job: dagelijks om 03:00 op dt-prod-01
- Forgejo API token in group_vars voor git push authenticatie

Refs #9
---
 ansible/group_vars/wetgit/main.yml            |   5 +-
 ansible/roles/wetgit-forgejo/tasks/main.yml   |  18 ++
 .../wetgit-forgejo/templates/daily-sync.sh.j2 |  53 ++++
 src/wetgit/pipeline/sync.py                   | 227 ++++++++++++++++++
 4 files changed, 302 insertions(+), 1 deletion(-)
 create mode 100644 ansible/roles/wetgit-forgejo/templates/daily-sync.sh.j2
 create mode 100644 src/wetgit/pipeline/sync.py

diff --git a/ansible/group_vars/wetgit/main.yml b/ansible/group_vars/wetgit/main.yml
index 070cf2a..fe6ddf3 100644
--- a/ansible/group_vars/wetgit/main.yml
+++ b/ansible/group_vars/wetgit/main.yml
@@ -33,6 +33,9 @@ redis_host: "127.0.0.1"
 # --- Celery ---
 celery_concurrency: 2
 
+# --- Forgejo API ---
+forgejo_api_token: "{{ vault_forgejo_api_token }}"
+
 # --- Codeberg mirror ---
 codeberg_api_token: "{{ vault_codeberg_api_token | default('') }}"
 
@@ -41,5 +44,5 @@ agentmail_api_key: "{{ vault_agentmail_api_key }}"
 
 # --- Secrets (from vault.yml) ---
 # vault_agentmail_api_key
+# vault_forgejo_api_token
 # vault_codeberg_api_token (add when Codeberg account is ready)
-# vault_forgejo_admin_password (initial admin password)
diff --git a/ansible/roles/wetgit-forgejo/tasks/main.yml b/ansible/roles/wetgit-forgejo/tasks/main.yml
index 5350af2..fee8a5f 100644
--- a/ansible/roles/wetgit-forgejo/tasks/main.yml
+++ b/ansible/roles/wetgit-forgejo/tasks/main.yml
@@ -99,6 +99,16 @@
     group: wetgit
     mode: "0755"
 
+# --- Daily sync script ---
+
+- name: Deploy daily sync script
+  template:
+    src: daily-sync.sh.j2
+    dest: "{{ app_dir }}/scripts/daily-sync.sh"
+    owner: wetgit
+    group: wetgit
+    mode: "0755"
+
 # --- Mirror script ---
 
 - name: Deploy Codeberg mirror script
@@ -120,6 +130,14 @@
 
 # --- Cron jobs ---
 
+- name: Configure daily sync cron (03:00)
+  cron:
+    name: "wetgit-daily-sync"
+    user: wetgit
+    hour: "3"
+    minute: "0"
+    job: "{{ app_dir }}/scripts/daily-sync.sh >> {{ app_dir }}/logs/sync.log 2>&1"
+
 - name: Configure backup cron (weekly Sunday 02:00)
   cron:
     name: "wetgit-backup"
diff --git a/ansible/roles/wetgit-forgejo/templates/daily-sync.sh.j2 b/ansible/roles/wetgit-forgejo/templates/daily-sync.sh.j2
new file mode 100644
index 0000000..a1cfd50
--- /dev/null
+++ b/ansible/roles/wetgit-forgejo/templates/daily-sync.sh.j2
@@ -0,0 +1,53 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# WetGIT daily sync — managed by Ansible
+# Fetches delta updates via SRU and commits them to wetgit/rijk.
+
+LOG_PREFIX="[$(date '+%Y-%m-%d %H:%M:%S')]"
+SYNC_DIR="{{ app_dir }}/sync"
+RIJK_REPO="$SYNC_DIR/rijk"
+XML_CACHE="{{ data_dir }}/xml-cache"
+META_REPO="$SYNC_DIR/meta"
+LOCK_FILE="$SYNC_DIR/.sync.lock"
+
+# Directories must exist BEFORE the lock file is written (set -e would abort)
+mkdir -p "$SYNC_DIR" "$XML_CACHE"
+
+# Prevent parallel runs (stale locks from dead PIDs are overwritten)
+if [ -f "$LOCK_FILE" ]; then
+    pid=$(cat "$LOCK_FILE")
+    if kill -0 "$pid" 2>/dev/null; then
+        echo "$LOG_PREFIX Sync already running (PID $pid), skipping."
+        exit 0
+    fi
+fi
+echo $$ > "$LOCK_FILE"
+trap "rm -f $LOCK_FILE" EXIT
+
+if [ ! -d "$RIJK_REPO/.git" ]; then
+    echo "$LOG_PREFIX First run — cloning wetgit/rijk..."
+    git clone "https://{{ forgejo_admin_user }}:{{ forgejo_api_token }}@{{ forgejo_domain }}/wetgit/rijk.git" "$RIJK_REPO"
+    cd "$RIJK_REPO"
+    git config user.name "Coornhert"
+    git config user.email "coornhert@wetgit.nl"
+else
+    cd "$RIJK_REPO"
+    git pull --ff-only origin main 2>/dev/null || true
+fi
+
+if [ ! -d "$META_REPO/.git" ]; then
+    echo "$LOG_PREFIX First run — cloning wetgit/meta..."
+    git clone "https://{{ forgejo_admin_user }}:{{ forgejo_api_token }}@{{ forgejo_domain }}/wetgit/meta.git" "$META_REPO"
+fi
+
+cd "$META_REPO"
+git pull --ff-only origin main 2>/dev/null || true
+
+echo "$LOG_PREFIX Starting sync..."
+PYTHONPATH="$META_REPO/src" python3 -m wetgit.pipeline.sync \
+    --rijk-repo "$RIJK_REPO" \
+    --xml-cache "$XML_CACHE" \
+    --delay 0.15
+
+echo "$LOG_PREFIX Sync complete."
diff --git a/src/wetgit/pipeline/sync.py b/src/wetgit/pipeline/sync.py
new file mode 100644
index 0000000..e9be8e1
--- /dev/null
+++ b/src/wetgit/pipeline/sync.py
@@ -0,0 +1,227 @@
+"""Daily sync — fetches delta updates via SRU and commits them to the rijk repo.
+
+Compares the SRU catalogue against the local state and processes only changes.
+Runs as a standalone script or as a Celery task.
+
+Usage:
+    python -m wetgit.pipeline.sync --rijk-repo /path/to/rijk --xml-cache /path/to/cache
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import subprocess
+import time
+from datetime import date
+from pathlib import Path
+
+import httpx
+
+from wetgit.pipeline.bwb_parser import parse_bwb_xml
+from wetgit.pipeline.downloader import download_xml
+from wetgit.pipeline.sru_client import BWB_TYPES, SRURecord, fetch_catalogue
+
+logger = logging.getLogger(__name__)
+
+# Maps the SRU regulation type to the directory name inside the rijk repo.
+TYPE_TO_DIR = {
+    "wet": "wet",
+    "AMvB": "amvb",
+    "ministeriele-regeling": "ministeriele-regeling",
+    "KB": "kb",
+    "rijkswet": "rijkswet",
+    "verdrag": "verdrag",
+    "beleidsregel": "beleidsregel",
+    "circulaire": "circulaire",
+    "zbo": "zbo",
+    "pbo": "pbo",
+}
+
+
+def sync(
+    rijk_repo: Path,
+    xml_cache: Path,
+    types: list[str] | None = None,
+    delay: float = 0.15,
+    dry_run: bool = False,
+) -> dict[str, int]:
+    """Synchronise the rijk repo with the latest BWB state.
+
+    Args:
+        rijk_repo: Path to the local clone of wetgit/rijk.
+        xml_cache: Path to the XML cache directory.
+        types: Regulation types to sync (default: all).
+        delay: Delay between downloads (sec).
+        dry_run: If True, no git commits.
+
+    Returns:
+        Dict with statistics.
+    """
+    xml_cache.mkdir(parents=True, exist_ok=True)
+    types = types or BWB_TYPES
+
+    stats = {"checked": 0, "new": 0, "updated": 0, "failed": 0, "unchanged": 0}
+
+    # Step 1: Fetch the current catalogue
+    logger.info("Fetching SRU catalogue...")
+    records = fetch_catalogue(types=types, delay=0.3)
+    logger.info("Catalogue: %d SRU records", len(records))
+
+    # Deduplicate on BWB-ID (last record wins, i.e. most recent state)
+    seen: dict[str, SRURecord] = {}
+    for r in records:
+        seen[r.bwb_id] = r
+    unique = list(seen.values())
+    logger.info("After deduplication: %d unique regulations", len(unique))
+
+    # Step 2: Compare against local state
+    changes: list[tuple[SRURecord, str]] = []  # (record, "new"|"updated")
+
+    for record in unique:
+        stats["checked"] += 1
+        type_dir = TYPE_TO_DIR.get(record.type, "overig")
+        slug = _slugify(record.titel) if record.titel else record.bwb_id.lower()
+        md_path = rijk_repo / type_dir / slug / record.bwb_id / "README.md"
+
+        if not md_path.exists():
+            changes.append((record, "new"))
+        else:
+            # Download the XML and compare the generated output against the
+            # existing README to detect changed consolidated states.
+            new_xml = _download_fresh(record, xml_cache)
+            if new_xml is None:
+                continue
+
+            try:
+                result = parse_bwb_xml(str(new_xml))
+                existing = md_path.read_text(encoding="utf-8")
+                if result.markdown != existing:
+                    changes.append((record, "updated"))
+                else:
+                    stats["unchanged"] += 1
+            except Exception as e:
+                # Best effort: a parse failure during comparison is treated as
+                # unchanged rather than aborting the whole run.
+                logger.warning("Parse error comparing %s: %s", record.bwb_id, e)
+                stats["unchanged"] += 1
+
+        if stats["checked"] % 500 == 0:
+            logger.info("Checked: %d/%d", stats["checked"], len(unique))
+
+        if delay > 0 and stats["checked"] % 10 == 0:
+            time.sleep(delay)
+
+    logger.info("Changes found: %d new, %d updated",
+                sum(1 for _, t in changes if t == "new"),
+                sum(1 for _, t in changes if t == "updated"))
+
+    if not changes:
+        logger.info("No changes — everything up to date.")
+        return stats
+
+    # Step 3: Process changes
+    for record, change_type in changes:
+        xml_path = xml_cache / f"{record.bwb_id}.xml"
+        if not xml_path.exists():
+            # FIX: parse the path download_xml actually returned; previously
+            # the result was discarded and the nonexistent xml_path was parsed.
+            xml_path = download_xml(record, xml_cache)
+            if xml_path is None:
+                stats["failed"] += 1
+                continue
+
+        try:
+            result = parse_bwb_xml(str(xml_path))
+            type_dir = TYPE_TO_DIR.get(record.type, "overig")
+            slug = _slugify(record.titel) if record.titel else record.bwb_id.lower()
+            regeling_dir = rijk_repo / type_dir / slug / record.bwb_id
+            regeling_dir.mkdir(parents=True, exist_ok=True)
+            (regeling_dir / "README.md").write_text(result.markdown, encoding="utf-8")
+
+            if change_type == "new":
+                stats["new"] += 1
+            else:
+                stats["updated"] += 1
+
+        except Exception as e:
+            logger.warning("Processing failed for %s: %s", record.bwb_id, e)
+            stats["failed"] += 1
+
+    # Step 4: Git commit + push
+    if not dry_run and (stats["new"] > 0 or stats["updated"] > 0):
+        _git_commit_and_push(rijk_repo, stats)
+
+    logger.info(
+        "Sync done: %d new, %d updated, %d unchanged, %d failed",
+        stats["new"], stats["updated"], stats["unchanged"], stats["failed"],
+    )
+    return stats
+
+
+def _download_fresh(record: SRURecord, xml_cache: Path) -> Path | None:
+    """Download XML, overwriting any existing cache entry."""
+    xml_path = xml_cache / f"{record.bwb_id}.xml"
+    try:
+        resp = httpx.get(record.xml_url, timeout=60, follow_redirects=True)
+        resp.raise_for_status()
+        xml_path.write_bytes(resp.content)
+        return xml_path
+    except httpx.HTTPError as e:
+        logger.warning("Download failed for %s: %s", record.bwb_id, e)
+        return None
+
+
+def _git_commit_and_push(repo_path: Path, stats: dict[str, int]) -> None:
+    """Commit and push changes to the remote."""
+    today = date.today().isoformat()
+    msg = f"{today} | sync | {stats['new']} nieuw, {stats['updated']} gewijzigd"
+
+    try:
+        subprocess.run(["git", "add", "-A"], cwd=repo_path, check=True, capture_output=True)
+        subprocess.run(
+            ["git", "commit", "-m", msg,
+             "--author", "Coornhert <coornhert@wetgit.nl>"],
+            cwd=repo_path, check=True, capture_output=True,
+        )
+        subprocess.run(
+            ["git", "push", "origin", "main"],
+            cwd=repo_path, check=True, capture_output=True, timeout=300,
+        )
+        logger.info("Git push successful: %s", msg)
+    except subprocess.CalledProcessError as e:
+        logger.error("Git operation failed: %s", e.stderr.decode() if e.stderr else str(e))
+    except subprocess.TimeoutExpired:
+        logger.error("Git push timeout (300s)")
+
+
+def _slugify(text: str) -> str:
+    """Make a URL-safe slug from a title."""
+    import re
+    slug = text.lower().strip()
+    slug = re.sub(r"[^\w\s-]", "", slug)
+    slug = re.sub(r"[\s_]+", "-", slug)
+    slug = re.sub(r"-+", "-", slug)
+    return slug[:80].strip("-")
+
+
+if __name__ == "__main__":
+    import argparse
+
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s %(levelname)s %(message)s",
+        datefmt="%H:%M:%S",
+    )
+
+    parser = argparse.ArgumentParser(description="WetGit daily sync")
+    parser.add_argument("--rijk-repo", type=Path, required=True, help="Path to wetgit/rijk clone")
+    parser.add_argument("--xml-cache", type=Path, required=True, help="XML cache directory")
+    parser.add_argument("--type", action="append", dest="types", help="Regulation type (repeatable)")
+    parser.add_argument("--delay", type=float, default=0.15, help="Delay between downloads")
+    parser.add_argument("--dry-run", action="store_true", help="No git commits")
+    args = parser.parse_args()
+
+    stats = sync(
+        rijk_repo=args.rijk_repo,
+        xml_cache=args.xml_cache,
+        types=args.types,
+        delay=args.delay,
+        dry_run=args.dry_run,
+    )
+    print(json.dumps(stats, indent=2))