diff --git a/src/wetgit/pipeline/downloader.py b/src/wetgit/pipeline/downloader.py new file mode 100644 index 0000000..06e04fe --- /dev/null +++ b/src/wetgit/pipeline/downloader.py @@ -0,0 +1,41 @@ +"""BWB XML downloader. + +Downloadt BWB toestand-XML bestanden van de officiele repository. +""" + +from __future__ import annotations + +import logging +from pathlib import Path + +import httpx + +from wetgit.pipeline.sru_client import SRURecord + +logger = logging.getLogger(__name__) + + +def download_xml(record: SRURecord, output_dir: Path) -> Path | None: + """Download de XML voor een SRU record. + + Args: + record: SRU record met de download-URL. + output_dir: Directory om de XML op te slaan. + + Returns: + Pad naar het gedownloade bestand, of None bij fout. + """ + xml_path = output_dir / f"{record.bwb_id}.xml" + + if xml_path.exists(): + return xml_path + + try: + resp = httpx.get(record.xml_url, timeout=60, follow_redirects=True) + resp.raise_for_status() + xml_path.write_bytes(resp.content) + logger.debug("Downloaded %s (%d KB)", record.bwb_id, len(resp.content) // 1024) + return xml_path + except httpx.HTTPError as e: + logger.warning("Download failed for %s: %s", record.bwb_id, e) + return None diff --git a/src/wetgit/pipeline/runner.py b/src/wetgit/pipeline/runner.py new file mode 100644 index 0000000..3b04a34 --- /dev/null +++ b/src/wetgit/pipeline/runner.py @@ -0,0 +1,151 @@ +"""Pipeline runner — orkestreert SRU crawl, download en parse. + +Usage: + python -m wetgit.pipeline.runner --type wet --limit 100 --output /tmp/wetgit-output +""" + +from __future__ import annotations + +import json +import logging +import time +from pathlib import Path + +from wetgit.pipeline.bwb_parser import parse_bwb_xml +from wetgit.pipeline.downloader import download_xml +from wetgit.pipeline.sru_client import BWB_TYPES, SRURecord, fetch_catalogue + +logger = logging.getLogger(__name__) + +# Mappenstructuur conform PRD +TYPE_TO_DIR = { + "wet": "wet", + "AMvB": "amvb", + "ministeriele-regeling": "ministeriele-regeling", + "KB": "kb", + "rijkswet": "rijkswet", + "verdrag": "verdrag", + "beleidsregel": "beleidsregel", + "circulaire": "circulaire", + "zbo": "zbo", + "pbo": "pbo", +} + + +def run_pipeline( + output_dir: Path, + xml_cache_dir: Path | None = None, + types: list[str] | None = None, + limit: int | None = None, + delay: float = 0.3, +) -> dict[str, int]: + """Draai de volledige BWB pipeline. + + Args: + output_dir: Waar de Markdown bestanden geschreven worden. + xml_cache_dir: Cache directory voor gedownloade XML (default: output_dir/.xml-cache). + types: Regelingtypen om te verwerken (default: alle). + limit: Maximum aantal regelingen (None = alles). + delay: Vertraging tussen downloads (sec). + + Returns: + Dict met statistieken (success, failed, skipped). + """ + xml_cache_dir = xml_cache_dir or output_dir / ".xml-cache" + xml_cache_dir.mkdir(parents=True, exist_ok=True) + output_dir.mkdir(parents=True, exist_ok=True) + + stats = {"catalogus": 0, "downloaded": 0, "parsed": 0, "failed": 0, "skipped": 0} + + # Stap 1: SRU catalogus ophalen + logger.info("Stap 1: SRU catalogus ophalen (types=%s, limit=%s)", types, limit) + records = fetch_catalogue(types=types, max_records=limit) + stats["catalogus"] = len(records) + logger.info("Catalogus: %d regelingen gevonden", len(records)) + + # Dedupliceer op BWB-ID (SRU kan meerdere toestanden per regeling geven) + # Neem de laatste (meest recente) per BWB-ID + seen: dict[str, SRURecord] = {} + for r in records: + seen[r.bwb_id] = r # Laatste wint + unique_records = list(seen.values()) + logger.info("Na deduplicatie: %d unieke regelingen", len(unique_records)) + + # Stap 2+3: Download en parse + for i, record in enumerate(unique_records): + if i > 0 and i % 100 == 0: + logger.info("Voortgang: %d/%d (%d parsed, %d failed)", + i, len(unique_records), stats["parsed"], stats["failed"]) + + # Bepaal output pad: output_dir/{type}/{slug}/{BWB_ID}/README.md + type_dir = TYPE_TO_DIR.get(record.type, "overig") + slug = _slugify(record.titel) if record.titel else record.bwb_id.lower() + regeling_dir = output_dir / type_dir / slug / record.bwb_id + md_path = regeling_dir / "README.md" + + if md_path.exists(): + stats["skipped"] += 1 + continue + + # Download + xml_path = download_xml(record, xml_cache_dir) + if xml_path is None: + stats["failed"] += 1 + continue + stats["downloaded"] += 1 + + # Parse + try: + result = parse_bwb_xml(str(xml_path)) + regeling_dir.mkdir(parents=True, exist_ok=True) + md_path.write_text(result.markdown, encoding="utf-8") + stats["parsed"] += 1 + except Exception as e: + logger.warning("Parse failed for %s: %s", record.bwb_id, e) + stats["failed"] += 1 + + if delay > 0: + time.sleep(delay) + + logger.info( + "Pipeline klaar: %d parsed, %d failed, %d skipped van %d", + stats["parsed"], stats["failed"], stats["skipped"], stats["catalogus"], + ) + return stats + + +def _slugify(text: str) -> str: + """Maak een URL-veilige slug van een titel.""" + import re + slug = text.lower().strip() + slug = re.sub(r"[^\w\s-]", "", slug) + slug = re.sub(r"[\s_]+", "-", slug) + slug = re.sub(r"-+", "-", slug) + return slug[:80].strip("-") + + +if __name__ == "__main__": + import argparse + + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%H:%M:%S", + ) + + parser = argparse.ArgumentParser(description="WetGit BWB pipeline") + parser.add_argument("--output", type=Path, required=True, help="Output directory") + parser.add_argument("--xml-cache", type=Path, help="XML cache directory") + parser.add_argument("--type", action="append", dest="types", help="Regelingtype (herhaalbaar)") + parser.add_argument("--limit", type=int, help="Maximum aantal regelingen") + parser.add_argument("--delay", type=float, default=0.3, help="Delay tussen downloads (sec)") + args = parser.parse_args() + + stats = run_pipeline( + output_dir=args.output, + xml_cache_dir=args.xml_cache, + types=args.types, + limit=args.limit, + delay=args.delay, + ) + print(json.dumps(stats, indent=2)) diff --git a/src/wetgit/pipeline/sru_client.py b/src/wetgit/pipeline/sru_client.py new file mode 100644 index 0000000..08e53c4 --- /dev/null +++ b/src/wetgit/pipeline/sru_client.py @@ -0,0 +1,186 @@ +"""SRU client voor het Basiswettenbestand. + +Crawlt de SRU-zoekservice van overheid.nl om BWB regelingen te vinden +en download-URLs voor de XML-toestanden op te halen. +""" + +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass, field + +import httpx +from lxml import etree + +logger = logging.getLogger(__name__) + +SRU_BASE = "https://zoekservice.overheid.nl/sru/Search" +SRU_NS = { + "srw": "http://www.loc.gov/zing/srw/", + "bwb": "http://standaarden.overheid.nl/bwb/terms/", + "dcterms": "http://purl.org/dc/terms/", + "overheid": "http://standaarden.overheid.nl/owms/terms/", +} + +# Alle regelingtypen in het BWB +BWB_TYPES = [ + "wet", "AMvB", "ministeriele-regeling", "KB", "rijkswet", + "verdrag", "beleidsregel", "circulaire", "zbo", "pbo", +] + + +@dataclass(frozen=True) +class SRURecord: + """Eén regeling uit de SRU-response.""" + + bwb_id: str + titel: str + type: str + xml_url: str + wti_url: str + datum_geldig_van: str | None = None + datum_geldig_tot: str | None = None + ministerie: str | None = None + + +def fetch_catalogue( + types: list[str] | None = None, + max_records: int | None = None, + batch_size: int = 100, + delay: float = 0.5, +) -> list[SRURecord]: + """Haal de volledige BWB catalogus op via SRU. + + Args: + types: Lijst van regelingtypen om op te halen (default: alle). + max_records: Maximum aantal records (None = alles). + batch_size: Aantal records per SRU-request (max 100). + delay: Vertraging tussen requests (sec) om de server niet te overbelasten. + + Returns: + Lijst van SRURecords met metadata en download-URLs. + """ + types = types or BWB_TYPES + all_records: list[SRURecord] = [] + + for type_name in types: + query = f"dcterms.type=={type_name}" + start = 1 + + while True: + records, total = _fetch_page(query, start, batch_size) + all_records.extend(records) + logger.info( + "SRU %s: %d-%d / %d (totaal: %d)", + type_name, start, start + len(records) - 1, total, len(all_records), + ) + + if max_records and len(all_records) >= max_records: + return all_records[:max_records] + + start += len(records) + if start > total or not records: + break + + time.sleep(delay) + + return all_records + + +def fetch_latest_toestand(bwb_id: str) -> SRURecord | None: + """Haal de meest recente toestand op voor één BWB-ID. + + Args: + bwb_id: Het BWB identificatienummer (bijv. BWBR0001840). + + Returns: + SRURecord of None als niet gevonden. + """ + records, _ = _fetch_page(f"dcterms.identifier={bwb_id}", 1, 100) + if not records: + return None + # Neem de laatste (meest recente toestand) + return records[-1] + + +def _fetch_page( + query: str, start_record: int, maximum_records: int, +) -> tuple[list[SRURecord], int]: + """Haal één pagina SRU-resultaten op.""" + params = { + "operation": "searchRetrieve", + "version": "1.2", + "x-connection": "BWB", + "query": query, + "startRecord": str(start_record), + "maximumRecords": str(maximum_records), + } + + resp = httpx.get(SRU_BASE, params=params, timeout=30) + resp.raise_for_status() + tree = etree.fromstring(resp.content) + + # Totaal aantal records + total_el = tree.find(".//srw:numberOfRecords", SRU_NS) + total = int(total_el.text) if total_el is not None and total_el.text else 0 + + records: list[SRURecord] = [] + for record in tree.findall(".//srw:record", SRU_NS): + parsed = _parse_record(record) + if parsed: + records.append(parsed) + + return records, total + + +def _parse_record(record: etree._Element) -> SRURecord | None: + """Parse één SRU record naar een SRURecord.""" + # Zoek in originalData en enrichedData + bwb_id = "" + titel = "" + type_ = "" + xml_url = "" + wti_url = "" + datum_van = None + datum_tot = None + ministerie = None + + for elem in record.iter(): + tag = elem.tag + text = (elem.text or "").strip() + + if not text: + continue + + if tag.endswith("}identifier") and "BWB" in text: + bwb_id = text + elif tag.endswith("}title"): + titel = text + elif tag.endswith("}type") and text in BWB_TYPES: + type_ = text + elif tag.endswith("}authority") or tag.endswith("}creator"): + if not ministerie: + ministerie = text + elif "locatie_toestand" in tag: + xml_url = text + elif "locatie_wti" in tag: + wti_url = text + elif "geldigheidsperiode_startdatum" in tag: + datum_van = text + elif "geldigheidsperiode_einddatum" in tag: + datum_tot = text + + if not bwb_id or not xml_url: + return None + + return SRURecord( + bwb_id=bwb_id, + titel=titel, + type=type_, + xml_url=xml_url, + wti_url=wti_url, + datum_geldig_van=datum_van, + datum_geldig_tot=datum_tot, + ministerie=ministerie, + )