feat: dagelijkse sync pipeline + cron job

- sync.py: vergelijkt SRU catalogus met lokale staat, verwerkt delta's
- daily-sync.sh: wrapper script voor cron (lock, clone, pull, sync, push)
- Cron job: dagelijks om 03:00 op dt-prod-01
- Forgejo API token in group_vars voor git push authenticatie

Refs #9
This commit is contained in:
Coornhert 2026-03-30 06:42:04 +02:00
parent 188f41c7ee
commit 03402cdfa0
4 changed files with 304 additions and 1 deletions

View file

@ -33,6 +33,9 @@ redis_host: "127.0.0.1"
# --- Celery ---
celery_concurrency: 2
# --- Forgejo API ---
forgejo_api_token: "{{ vault_forgejo_api_token }}"
# --- Codeberg mirror ---
codeberg_api_token: "{{ vault_codeberg_api_token | default('') }}"
@ -41,5 +44,5 @@ agentmail_api_key: "{{ vault_agentmail_api_key }}"
# --- Secrets (from vault.yml) ---
# vault_agentmail_api_key
# vault_forgejo_api_token
# vault_codeberg_api_token (add when Codeberg account is ready)
# vault_forgejo_admin_password (initial admin password)

View file

@ -99,6 +99,16 @@
group: wetgit
mode: "0755"
# --- Daily sync script ---
- name: Deploy daily sync script
template:
src: daily-sync.sh.j2
dest: "{{ app_dir }}/scripts/daily-sync.sh"
owner: wetgit
group: wetgit
mode: "0755"
# --- Mirror script ---
- name: Deploy Codeberg mirror script
@ -120,6 +130,14 @@
# --- Cron jobs ---
- name: Configure daily sync cron (03:00)
cron:
name: "wetgit-daily-sync"
user: wetgit
hour: "3"
minute: "0"
job: "{{ app_dir }}/scripts/daily-sync.sh >> {{ app_dir }}/logs/sync.log 2>&1"
- name: Configure backup cron (weekly Sunday 02:00)
cron:
name: "wetgit-backup"

View file

@ -0,0 +1,53 @@
#!/usr/bin/env bash
set -euo pipefail
# WetGIT dagelijkse sync — managed by Ansible
# Haalt delta-updates op via SRU en commit naar wetgit/rijk.
LOG_PREFIX="[$(date '+%Y-%m-%d %H:%M:%S')]"
SYNC_DIR="{{ app_dir }}/sync"
RIJK_REPO="$SYNC_DIR/rijk"
XML_CACHE="{{ data_dir }}/xml-cache"
META_REPO="$SYNC_DIR/meta"
LOCK_FILE="$SYNC_DIR/.sync.lock"
# Voorkom parallelle runs
if [ -f "$LOCK_FILE" ]; then
pid=$(cat "$LOCK_FILE")
if kill -0 "$pid" 2>/dev/null; then
echo "$LOG_PREFIX Sync al actief (PID $pid), overslaan."
exit 0
fi
fi
echo $$ > "$LOCK_FILE"
trap "rm -f $LOCK_FILE" EXIT
# Zorg dat de repos gecloned zijn
mkdir -p "$SYNC_DIR" "$XML_CACHE"
if [ ! -d "$RIJK_REPO/.git" ]; then
echo "$LOG_PREFIX Eerste run — clone wetgit/rijk..."
git clone https://{{ forgejo_admin_user }}:{{ forgejo_api_token }}@{{ forgejo_domain }}/wetgit/rijk.git "$RIJK_REPO"
cd "$RIJK_REPO"
git config user.name "Coornhert"
git config user.email "coornhert@wetgit.nl"
else
cd "$RIJK_REPO"
git pull --ff-only origin main 2>/dev/null || true
fi
if [ ! -d "$META_REPO/.git" ]; then
echo "$LOG_PREFIX Eerste run — clone wetgit/meta..."
git clone https://{{ forgejo_admin_user }}:{{ forgejo_api_token }}@{{ forgejo_domain }}/wetgit/meta.git "$META_REPO"
fi
cd "$META_REPO"
git pull --ff-only origin main 2>/dev/null || true
echo "$LOG_PREFIX Start sync..."
PYTHONPATH="$META_REPO/src" python3 -m wetgit.pipeline.sync \
--rijk-repo "$RIJK_REPO" \
--xml-cache "$XML_CACHE" \
--delay 0.15
echo "$LOG_PREFIX Sync voltooid."

229
src/wetgit/pipeline/sync.py Normal file
View file

@ -0,0 +1,229 @@
"""Dagelijkse sync — haalt delta-updates op via SRU en commit naar de rijk repo.
Vergelijkt de SRU catalogus met de lokale staat en verwerkt alleen wijzigingen.
Draait als standalone script of als Celery-taak.
Usage:
python -m wetgit.pipeline.sync --rijk-repo /path/to/rijk --xml-cache /path/to/cache
"""
from __future__ import annotations
import json
import logging
import subprocess
import time
from datetime import date, datetime
from pathlib import Path
import httpx
from lxml import etree
from wetgit.pipeline.bwb_parser import parse_bwb_xml
from wetgit.pipeline.downloader import download_xml
from wetgit.pipeline.sru_client import BWB_TYPES, SRURecord, fetch_catalogue
logger = logging.getLogger(__name__)
TYPE_TO_DIR = {
"wet": "wet",
"AMvB": "amvb",
"ministeriele-regeling": "ministeriele-regeling",
"KB": "kb",
"rijkswet": "rijkswet",
"verdrag": "verdrag",
"beleidsregel": "beleidsregel",
"circulaire": "circulaire",
"zbo": "zbo",
"pbo": "pbo",
}
def sync(
rijk_repo: Path,
xml_cache: Path,
types: list[str] | None = None,
delay: float = 0.15,
dry_run: bool = False,
) -> dict[str, int]:
"""Synchroniseer de rijk repo met de laatste BWB stand.
Args:
rijk_repo: Pad naar de lokale clone van wetgit/rijk.
xml_cache: Pad naar de XML cache directory.
types: Regelingtypen om te synchen (default: alle).
delay: Vertraging tussen downloads (sec).
dry_run: Als True, geen git commits.
Returns:
Dict met statistieken.
"""
xml_cache.mkdir(parents=True, exist_ok=True)
types = types or BWB_TYPES
stats = {"checked": 0, "new": 0, "updated": 0, "failed": 0, "unchanged": 0}
# Stap 1: Haal de huidige catalogus op
logger.info("SRU catalogus ophalen...")
records = fetch_catalogue(types=types, delay=0.3)
logger.info("Catalogus: %d SRU records", len(records))
# Dedupliceer op BWB-ID (neem meest recente toestand)
seen: dict[str, SRURecord] = {}
for r in records:
seen[r.bwb_id] = r
unique = list(seen.values())
logger.info("Na deduplicatie: %d unieke regelingen", len(unique))
# Stap 2: Vergelijk met lokale staat
changes: list[tuple[SRURecord, str]] = [] # (record, "new"|"updated")
for record in unique:
stats["checked"] += 1
type_dir = TYPE_TO_DIR.get(record.type, "overig")
slug = _slugify(record.titel) if record.titel else record.bwb_id.lower()
md_path = rijk_repo / type_dir / slug / record.bwb_id / "README.md"
if not md_path.exists():
changes.append((record, "new"))
else:
# Check of de XML URL veranderd is (nieuwe toestand)
# Download de XML en vergelijk met bestaande output
xml_path = xml_cache / f"{record.bwb_id}.xml"
new_xml = _download_fresh(record, xml_cache)
if new_xml is None:
continue
try:
result = parse_bwb_xml(str(new_xml))
existing = md_path.read_text(encoding="utf-8")
if result.markdown != existing:
changes.append((record, "updated"))
else:
stats["unchanged"] += 1
except Exception as e:
logger.warning("Parse error bij vergelijking %s: %s", record.bwb_id, e)
stats["unchanged"] += 1
if stats["checked"] % 500 == 0:
logger.info("Gecontroleerd: %d/%d", stats["checked"], len(unique))
if delay > 0 and stats["checked"] % 10 == 0:
time.sleep(delay)
logger.info("Wijzigingen gevonden: %d nieuw, %d gewijzigd",
sum(1 for _, t in changes if t == "new"),
sum(1 for _, t in changes if t == "updated"))
if not changes:
logger.info("Geen wijzigingen — alles up-to-date.")
return stats
# Stap 3: Verwerk wijzigingen
for record, change_type in changes:
xml_path = xml_cache / f"{record.bwb_id}.xml"
if not xml_path.exists():
xml_path_result = download_xml(record, xml_cache)
if xml_path_result is None:
stats["failed"] += 1
continue
try:
result = parse_bwb_xml(str(xml_path))
type_dir = TYPE_TO_DIR.get(record.type, "overig")
slug = _slugify(record.titel) if record.titel else record.bwb_id.lower()
regeling_dir = rijk_repo / type_dir / slug / record.bwb_id
regeling_dir.mkdir(parents=True, exist_ok=True)
(regeling_dir / "README.md").write_text(result.markdown, encoding="utf-8")
if change_type == "new":
stats["new"] += 1
else:
stats["updated"] += 1
except Exception as e:
logger.warning("Verwerking mislukt voor %s: %s", record.bwb_id, e)
stats["failed"] += 1
# Stap 4: Git commit + push
if not dry_run and (stats["new"] > 0 or stats["updated"] > 0):
_git_commit_and_push(rijk_repo, stats)
logger.info(
"Sync klaar: %d nieuw, %d gewijzigd, %d ongewijzigd, %d mislukt",
stats["new"], stats["updated"], stats["unchanged"], stats["failed"],
)
return stats
def _download_fresh(record: SRURecord, xml_cache: Path) -> Path | None:
"""Download XML, overschrijf bestaande cache."""
xml_path = xml_cache / f"{record.bwb_id}.xml"
try:
resp = httpx.get(record.xml_url, timeout=60, follow_redirects=True)
resp.raise_for_status()
xml_path.write_bytes(resp.content)
return xml_path
except httpx.HTTPError as e:
logger.warning("Download failed for %s: %s", record.bwb_id, e)
return None
def _git_commit_and_push(repo_path: Path, stats: dict[str, int]) -> None:
"""Commit en push wijzigingen naar de remote."""
today = date.today().isoformat()
msg = f"{today} | sync | {stats['new']} nieuw, {stats['updated']} gewijzigd"
try:
subprocess.run(["git", "add", "-A"], cwd=repo_path, check=True, capture_output=True)
subprocess.run(
["git", "commit", "-m", msg,
"--author", "Coornhert <coornhert@wetgit.nl>"],
cwd=repo_path, check=True, capture_output=True,
)
subprocess.run(
["git", "push", "origin", "main"],
cwd=repo_path, check=True, capture_output=True, timeout=300,
)
logger.info("Git push succesvol: %s", msg)
except subprocess.CalledProcessError as e:
logger.error("Git operatie mislukt: %s", e.stderr.decode() if e.stderr else str(e))
except subprocess.TimeoutExpired:
logger.error("Git push timeout (300s)")
def _slugify(text: str) -> str:
"""Maak een URL-veilige slug van een titel."""
import re
slug = text.lower().strip()
slug = re.sub(r"[^\w\s-]", "", slug)
slug = re.sub(r"[\s_]+", "-", slug)
slug = re.sub(r"-+", "-", slug)
return slug[:80].strip("-")
if __name__ == "__main__":
import argparse
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
datefmt="%H:%M:%S",
)
parser = argparse.ArgumentParser(description="WetGit dagelijkse sync")
parser.add_argument("--rijk-repo", type=Path, required=True, help="Pad naar wetgit/rijk clone")
parser.add_argument("--xml-cache", type=Path, required=True, help="XML cache directory")
parser.add_argument("--type", action="append", dest="types", help="Regelingtype (herhaalbaar)")
parser.add_argument("--delay", type=float, default=0.15, help="Delay tussen downloads")
parser.add_argument("--dry-run", action="store_true", help="Geen git commits")
args = parser.parse_args()
stats = sync(
rijk_repo=args.rijk_repo,
xml_cache=args.xml_cache,
types=args.types,
delay=args.delay,
dry_run=args.dry_run,
)
print(json.dumps(stats, indent=2))