feat: BWB pipeline — SRU crawler, downloader, runner

- SRU client: crawlt zoekservice.overheid.nl voor alle BWB regelingen
- Downloader: haalt XML op van repository.officiele-overheidspublicaties.nl
- Pipeline runner: orchestreert crawl → download → parse → schrijf Markdown
- Deduplicatie: meerdere SRU toestanden → meest recente per BWB-ID
- Mappenstructuur conform PRD: wet/{slug}/{BWB_ID}/README.md
- CLI: python -m wetgit.pipeline.runner --output /path --type wet --limit N

Getest: 28 wetten succesvol gedownload en geparsed, 0 failures.

Refs #6
This commit is contained in:
Coornhert 2026-03-29 21:27:47 +02:00
parent c481ebf9e7
commit da7d11deb9
3 changed files with 378 additions and 0 deletions

View file

@ -0,0 +1,41 @@
"""BWB XML downloader.
Downloadt BWB toestand-XML bestanden van de officiele repository.
"""
from __future__ import annotations
import logging
from pathlib import Path
import httpx
from wetgit.pipeline.sru_client import SRURecord
logger = logging.getLogger(__name__)
def download_xml(record: SRURecord, output_dir: Path) -> Path | None:
    """Download the toestand-XML for one SRU record.

    ``output_dir`` doubles as a cache: a previously downloaded, non-empty
    file is returned immediately without touching the network.

    Args:
        record: SRU record carrying the download URL.
        output_dir: Directory in which the XML is stored.

    Returns:
        Path to the downloaded file, or None on failure.
    """
    xml_path = output_dir / f"{record.bwb_id}.xml"
    # Cache hit — but ignore zero-byte files left behind by an interrupted
    # run, so they are re-downloaded instead of poisoning the parse step.
    if xml_path.exists() and xml_path.stat().st_size > 0:
        return xml_path
    try:
        resp = httpx.get(record.xml_url, timeout=60, follow_redirects=True)
        resp.raise_for_status()
        xml_path.write_bytes(resp.content)
        logger.debug("Downloaded %s (%d KB)", record.bwb_id, len(resp.content) // 1024)
        return xml_path
    except httpx.HTTPError as e:
        # Best effort: the pipeline runner counts this as a failure and moves on.
        logger.warning("Download failed for %s: %s", record.bwb_id, e)
        return None

View file

@ -0,0 +1,151 @@
"""Pipeline runner — orkestreert SRU crawl, download en parse.
Usage:
python -m wetgit.pipeline.runner --type wet --limit 100 --output /tmp/wetgit-output
"""
from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from wetgit.pipeline.bwb_parser import parse_bwb_xml
from wetgit.pipeline.downloader import download_xml
from wetgit.pipeline.sru_client import BWB_TYPES, SRURecord, fetch_catalogue
logger = logging.getLogger(__name__)
# Directory name per regeling type, per the PRD folder layout:
# output_dir/{type-dir}/{slug}/{BWB_ID}/README.md.
# Types not listed here fall back to "overig" in run_pipeline.
TYPE_TO_DIR = {
    "wet": "wet",
    "AMvB": "amvb",
    "ministeriele-regeling": "ministeriele-regeling",
    "KB": "kb",
    "rijkswet": "rijkswet",
    "verdrag": "verdrag",
    "beleidsregel": "beleidsregel",
    "circulaire": "circulaire",
    "zbo": "zbo",
    "pbo": "pbo",
}
def run_pipeline(
    output_dir: Path,
    xml_cache_dir: Path | None = None,
    types: list[str] | None = None,
    limit: int | None = None,
    delay: float = 0.3,
) -> dict[str, int]:
    """Run the full BWB pipeline: SRU crawl -> download -> parse -> write Markdown.

    Args:
        output_dir: Where the Markdown files are written.
        xml_cache_dir: Cache directory for downloaded XML (default: output_dir/.xml-cache).
        types: Regeling types to process (default: all).
        limit: Maximum number of regelingen (None = everything).
        delay: Pause between downloads, in seconds.

    Returns:
        Dict with statistics (catalogus, downloaded, parsed, failed, skipped).
    """
    xml_cache_dir = xml_cache_dir or output_dir / ".xml-cache"
    xml_cache_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)
    stats = {"catalogus": 0, "downloaded": 0, "parsed": 0, "failed": 0, "skipped": 0}

    # Step 1: fetch the SRU catalogue.
    logger.info("Stap 1: SRU catalogus ophalen (types=%s, limit=%s)", types, limit)
    records = fetch_catalogue(types=types, max_records=limit)
    stats["catalogus"] = len(records)
    logger.info("Catalogus: %d regelingen gevonden", len(records))

    # Deduplicate on BWB-ID (SRU can return multiple toestanden per regeling);
    # the last record per BWB-ID wins, i.e. the most recent toestand.
    seen: dict[str, SRURecord] = {}
    for r in records:
        seen[r.bwb_id] = r
    unique_records = list(seen.values())
    logger.info("Na deduplicatie: %d unieke regelingen", len(unique_records))

    # Steps 2+3: download and parse each regeling.
    for i, record in enumerate(unique_records):
        if i > 0 and i % 100 == 0:
            logger.info("Voortgang: %d/%d (%d parsed, %d failed)",
                        i, len(unique_records), stats["parsed"], stats["failed"])
        # Output path: output_dir/{type}/{slug}/{BWB_ID}/README.md.
        type_dir = TYPE_TO_DIR.get(record.type, "overig")
        # Fall back to the BWB-ID when the title is missing OR slugifies to an
        # empty string (punctuation-only titles): Path / "" would silently
        # drop the path component and break the PRD layout.
        slug = (_slugify(record.titel) if record.titel else "") or record.bwb_id.lower()
        regeling_dir = output_dir / type_dir / slug / record.bwb_id
        md_path = regeling_dir / "README.md"
        if md_path.exists():
            stats["skipped"] += 1
            continue

        # Download (cached XML is reused by download_xml).
        xml_path = download_xml(record, xml_cache_dir)
        if xml_path is None:
            stats["failed"] += 1
            continue
        stats["downloaded"] += 1

        # Parse and write Markdown; one malformed document must not abort the run.
        try:
            result = parse_bwb_xml(str(xml_path))
            regeling_dir.mkdir(parents=True, exist_ok=True)
            md_path.write_text(result.markdown, encoding="utf-8")
            stats["parsed"] += 1
        except Exception as e:
            logger.warning("Parse failed for %s: %s", record.bwb_id, e)
            stats["failed"] += 1
        # Throttle between downloads to be polite to the repository server.
        if delay > 0:
            time.sleep(delay)

    logger.info(
        "Pipeline klaar: %d parsed, %d failed, %d skipped van %d",
        stats["parsed"], stats["failed"], stats["skipped"], stats["catalogus"],
    )
    return stats
def _slugify(text: str) -> str:
"""Maak een URL-veilige slug van een titel."""
import re
slug = text.lower().strip()
slug = re.sub(r"[^\w\s-]", "", slug)
slug = re.sub(r"[\s_]+", "-", slug)
slug = re.sub(r"-+", "-", slug)
return slug[:80].strip("-")
if __name__ == "__main__":
    import argparse

    # Simple console logging for CLI usage.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
    )
    cli = argparse.ArgumentParser(description="WetGit BWB pipeline")
    cli.add_argument("--output", type=Path, required=True, help="Output directory")
    cli.add_argument("--xml-cache", type=Path, help="XML cache directory")
    cli.add_argument("--type", action="append", dest="types", help="Regelingtype (herhaalbaar)")
    cli.add_argument("--limit", type=int, help="Maximum aantal regelingen")
    cli.add_argument("--delay", type=float, default=0.3, help="Delay tussen downloads (sec)")
    opts = cli.parse_args()
    summary = run_pipeline(
        output_dir=opts.output,
        xml_cache_dir=opts.xml_cache,
        types=opts.types,
        limit=opts.limit,
        delay=opts.delay,
    )
    # Emit the run statistics as JSON so callers can consume them.
    print(json.dumps(summary, indent=2))

View file

@ -0,0 +1,186 @@
"""SRU client voor het Basiswettenbestand.
Crawlt de SRU-zoekservice van overheid.nl om BWB regelingen te vinden
en download-URLs voor de XML-toestanden op te halen.
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
import httpx
from lxml import etree
logger = logging.getLogger(__name__)
# SRU search endpoint of overheid.nl for the Basiswettenbestand (BWB).
SRU_BASE = "https://zoekservice.overheid.nl/sru/Search"
# XML namespaces used when querying SRU response documents.
SRU_NS = {
    "srw": "http://www.loc.gov/zing/srw/",
    "bwb": "http://standaarden.overheid.nl/bwb/terms/",
    "dcterms": "http://purl.org/dc/terms/",
    "overheid": "http://standaarden.overheid.nl/owms/terms/",
}
# All regeling types present in the BWB.
BWB_TYPES = [
    "wet", "AMvB", "ministeriele-regeling", "KB", "rijkswet",
    "verdrag", "beleidsregel", "circulaire", "zbo", "pbo",
]
@dataclass(frozen=True)
class SRURecord:
    """One regeling (regulation) parsed from an SRU response record."""

    bwb_id: str  # BWB identifier, e.g. "BWBR0001840"; never empty
    titel: str  # official title; may be "" when the record has no title element
    type: str  # one of BWB_TYPES, or "" when the type was not recognised
    xml_url: str  # download URL of the toestand-XML; never empty
    wti_url: str  # download URL of the WTI metadata; may be ""
    datum_geldig_van: str | None = None  # validity start date, when present
    datum_geldig_tot: str | None = None  # validity end date, when present
    ministerie: str | None = None  # first authority/creator found, when present
def fetch_catalogue(
    types: list[str] | None = None,
    max_records: int | None = None,
    batch_size: int = 100,
    delay: float = 0.5,
) -> list[SRURecord]:
    """Fetch the full BWB catalogue via SRU.

    Pages through the SRU endpoint once per regeling type, sleeping
    between requests so the server is not overloaded.

    Args:
        types: Regeling types to fetch (default: all).
        max_records: Maximum number of records (None = everything).
        batch_size: Records per SRU request (max 100).
        delay: Pause between requests, in seconds.

    Returns:
        List of SRURecords with metadata and download URLs.
    """
    selected = types if types else BWB_TYPES
    collected: list[SRURecord] = []
    for regeling_type in selected:
        cql = f"dcterms.type=={regeling_type}"
        offset = 1
        while True:
            page, total = _fetch_page(cql, offset, batch_size)
            collected.extend(page)
            logger.info(
                "SRU %s: %d-%d / %d (totaal: %d)",
                regeling_type, offset, offset + len(page) - 1, total, len(collected),
            )
            # Stop early (across all types) once the caller's cap is reached.
            if max_records and len(collected) >= max_records:
                return collected[:max_records]
            offset += len(page)
            # Past the last page, or an empty page: move to the next type.
            if offset > total or not page:
                break
            time.sleep(delay)
    return collected
def fetch_latest_toestand(bwb_id: str) -> SRURecord | None:
    """Fetch the most recent toestand for a single BWB-ID.

    Args:
        bwb_id: The BWB identifier (e.g. BWBR0001840).

    Returns:
        The SRURecord, or None when nothing was found.
    """
    matches, _ = _fetch_page(f"dcterms.identifier={bwb_id}", 1, 100)
    # The last record in the response is the most recent toestand.
    return matches[-1] if matches else None
def _fetch_page(
    query: str, start_record: int, maximum_records: int,
) -> tuple[list[SRURecord], int]:
    """Fetch and parse a single page of SRU results for *query*."""
    params = {
        "operation": "searchRetrieve",
        "version": "1.2",
        "x-connection": "BWB",
        "query": query,
        "startRecord": str(start_record),
        "maximumRecords": str(maximum_records),
    }
    resp = httpx.get(SRU_BASE, params=params, timeout=30)
    resp.raise_for_status()
    root = etree.fromstring(resp.content)

    # Total result count across all pages, as reported by the server.
    count_el = root.find(".//srw:numberOfRecords", SRU_NS)
    total = int(count_el.text) if count_el is not None and count_el.text else 0

    # Keep only records that parsed successfully (i.e. had ID + XML URL).
    parsed = [
        rec
        for rec in map(_parse_record, root.findall(".//srw:record", SRU_NS))
        if rec
    ]
    return parsed, total
def _parse_record(record: etree._Element) -> SRURecord | None:
    """Parse one SRU record element into an SRURecord.

    Walks every descendant element (covering both originalData and
    enrichedData) and selects fields of interest by tag suffix. Returns
    None when the record lacks a BWB-ID or a toestand-XML URL.
    """
    bwb_id = ""
    titel = ""
    type_ = ""
    xml_url = ""
    wti_url = ""
    datum_van: str | None = None
    datum_tot: str | None = None
    ministerie: str | None = None

    for node in record.iter():
        value = (node.text or "").strip()
        # Elements without text carry no data for us.
        if not value:
            continue
        tag = node.tag
        if tag.endswith("}identifier") and "BWB" in value:
            bwb_id = value
        elif tag.endswith("}title"):
            titel = value
        elif tag.endswith("}type") and value in BWB_TYPES:
            type_ = value
        elif tag.endswith("}authority") or tag.endswith("}creator"):
            # Keep only the first authority/creator encountered.
            if not ministerie:
                ministerie = value
        elif "locatie_toestand" in tag:
            xml_url = value
        elif "locatie_wti" in tag:
            wti_url = value
        elif "geldigheidsperiode_startdatum" in tag:
            datum_van = value
        elif "geldigheidsperiode_einddatum" in tag:
            datum_tot = value

    # Without an identifier and a download URL the record is unusable.
    if not bwb_id or not xml_url:
        return None
    return SRURecord(
        bwb_id=bwb_id,
        titel=titel,
        type=type_,
        xml_url=xml_url,
        wti_url=wti_url,
        datum_geldig_van=datum_van,
        datum_geldig_tot=datum_tot,
        ministerie=ministerie,
    )