feat: Fase 2/3 features — hybrid search, feed, domeinen, referenties

- Hybrid search met Reciprocal Rank Fusion (Meilisearch + Qdrant)
- Atom feed endpoint (/api/v1/feed.xml) op basis van git log
- Compliance-domeinen classificatie (NIS2, DORA, AVG, etc.)
  en endpoints /api/v1/domeinen + /api/v1/regelingen/{id}/domeinen
- Cross-referentie extractie tussen regelingen, endpoint
  /api/v1/regelingen/{id}/referenties
- Change-alerts uitgebreid met webhook-notificaties en
  optionele domein-filter
- Rate limiting via slowapi (60/min default, 30/min zoeken)
- datum_verval veld in RegelingMeta + indexer
- Celery app module (wetgit.tasks) voor sync/reindex/alerts
- REPO_PATH respecteert WETGIT_GIT_REPOS_DIR (voor Ansible deploy)
This commit is contained in:
Coornhert 2026-04-21 20:58:27 +02:00
parent 3065243f73
commit 34fd5a2bf3
8 changed files with 680 additions and 49 deletions

View file

@ -13,7 +13,6 @@ classifiers = [
"Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Developers",
"Intended Audience :: Legal Industry",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.13",
"Topic :: Text Processing :: Markup",
]
@ -34,6 +33,9 @@ api = [
"uvicorn>=0.30",
"celery>=5.4",
"redis>=5.0",
"markdown>=3.5",
"jinja2>=3.1",
"slowapi>=0.1.9",
]
dev = [
"pytest>=8.0",
@ -61,6 +63,9 @@ build-backend = "setuptools.build_meta"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools.package-data]
"wetgit.web" = ["static/**/*", "templates/**/*"]
[tool.pytest.ini_options]
testpaths = ["tests"]
markers = [

View file

@ -1,21 +1,30 @@
"""Change-alerts — stuur notificaties bij wetswijzigingen.
Vergelijkt de huidige staat met de vorige en stuurt een e-mail
met een AI-gegenereerde change-summary via AgentMail.
Vergelijkt de huidige staat met de vorige en stuurt notificaties
via e-mail (AgentMail) en/of webhooks.
Ondersteunt domein-filtering (NIS2, DORA, AVG, etc.) zodat
abonnees alleen relevante wijzigingen ontvangen.
Usage:
python -m wetgit.ai.alerts --bwb-id BWBR0001840 --diff "..."
python -m wetgit.ai.alerts --test # Stuur test-alert
python -m wetgit.ai.alerts --domains # Toon beschikbare domeinen
"""
from __future__ import annotations
import json
import logging
import os
from datetime import date
import sys
import httpx
from wetgit.ai.domains import classify_regeling
logger = logging.getLogger(__name__)
MISTRAL_API_URL = "https://api.mistral.ai/v1/chat/completions"
@ -36,22 +45,34 @@ def send_change_alert(
titel: str,
diff_text: str,
recipients: list[str] | None = None,
webhooks: list[str] | None = None,
domain_filter: list[str] | None = None,
mistral_api_key: str | None = None,
agentmail_api_key: str | None = None,
) -> bool:
"""Genereer een change-summary en stuur een e-mail alert.
"""Genereer een change-summary en stuur notificaties.
Args:
bwb_id: BWB identificatienummer van de gewijzigde regeling.
titel: Titel van de regeling.
diff_text: Git diff van de wijziging.
recipients: E-mailadressen (default: coornhert@wetgit.nl).
webhooks: Lijst van webhook URLs om een POST naar te sturen.
domain_filter: Alleen alert sturen als regeling bij deze domeinen hoort.
Als None, altijd sturen.
mistral_api_key: Mistral API key.
agentmail_api_key: AgentMail API key.
Returns:
True als de alert succesvol verstuurd is.
True als minstens één notificatie verstuurd is.
"""
# Domein-filtering
if domain_filter:
matched_domains = classify_regeling(titel, diff_text)
if not any(d in domain_filter for d in matched_domains):
logger.info("Regeling %s matcht niet met domeinen %s, alert overgeslagen", bwb_id, domain_filter)
return False
mistral_key = mistral_api_key or os.environ.get("MISTRAL_API_KEY", "")
agentmail_key = agentmail_api_key or os.environ.get("AGENTMAIL_API_KEY", "")
recipients = recipients or ["coornhert@wetgit.nl"]
@ -100,7 +121,7 @@ Dit is geen juridisch advies.
"""
# Stap 4: Verstuur via AgentMail
return _send_email(
email_ok = _send_email(
from_address="coornhert@wetgit.nl",
to_addresses=recipients,
subject=subject,
@ -108,6 +129,23 @@ Dit is geen juridisch advies.
agentmail_key=agentmail_key,
)
# Stap 5: Verstuur webhooks
webhook_ok = True
matched_domains = classify_regeling(titel, diff_text)
for url in (webhooks or []):
webhook_ok = _send_webhook(url, {
"event": "regeling.gewijzigd",
"bwb_id": bwb_id,
"titel": titel,
"datum": date.today().isoformat(),
"regels_toegevoegd": added,
"regels_verwijderd": removed,
"domeinen": matched_domains,
"samenvatting": summary,
}) and webhook_ok
return email_ok or webhook_ok
def _generate_change_summary(titel: str, diff_text: str, api_key: str) -> str | None:
"""Genereer een AI-samenvatting van de wetswijziging."""
@ -170,6 +208,23 @@ def _send_email(
return False
def _send_webhook(url: str, payload: dict) -> bool:
"""Verstuur een webhook POST."""
try:
resp = httpx.post(
url,
json=payload,
headers={"Content-Type": "application/json", "User-Agent": "WetGIT/1.0"},
timeout=10,
)
resp.raise_for_status()
logger.info("Webhook verstuurd naar %s", url)
return True
except httpx.HTTPError as e:
logger.error("Webhook fout voor %s: %s", url, e)
return False
if __name__ == "__main__":
import argparse
@ -177,10 +232,19 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WetGit change alerts")
parser.add_argument("--test", action="store_true", help="Stuur een test-alert")
parser.add_argument("--domains", action="store_true", help="Toon beschikbare domeinen")
parser.add_argument("--bwb-id", default="BWBR0001840")
parser.add_argument("--to", default="coornhert@wetgit.nl")
parser.add_argument("--domain-filter", nargs="*", help="Filter op domeinen")
parser.add_argument("--webhook", nargs="*", help="Webhook URLs")
args = parser.parse_args()
if args.domains:
from wetgit.ai.domains import list_domains
for d in list_domains():
print(f" {d['naam']}: {', '.join(d['keywords'][:3])}...")
sys.exit(0)
if args.test:
# Simuleer een wijziging in de Grondwet
test_diff = """--- a/wet/grondwet/BWBR0001840/README.md

209
src/wetgit/ai/crossref.py Normal file
View file

@ -0,0 +1,209 @@
"""Cross-referentie analyse — extract verwijzingen tussen regelingen.
Parseert wetteksten op verwijzingen naar andere regelingen en bouwt
een doorzoekbare graaf op als JSON adjacency list.
Usage:
python -m wetgit.ai.crossref --repo /path/to/rijk
python -m wetgit.ai.crossref --repo /path/to/rijk --query BWBR0001840
"""
from __future__ import annotations
import json
import logging
import re
from pathlib import Path
logger = logging.getLogger(__name__)
# Patronen voor verwijzingen naar andere regelingen
# "de Telecommunicatiewet", "de Wet open overheid", "het Burgerlijk Wetboek"
WET_REF_PATTERN = re.compile(
r"(?:de|het|van de|in de|bij de|krachtens de|bedoeld in de)\s+"
r"((?:Wet|Algemene wet|Wetboek|Boek|Grondwet|Besluit|Regeling|Verordening)"
r"(?:\s+(?:op|van|tot|inzake|betreffende|ter))?"
r"(?:\s+\w+){0,6}?)"
r"(?=[,.\s;)])",
re.IGNORECASE,
)
# "artikel 6 van de AVG", "artikel 1.1, tweede lid"
ARTIKEL_REF_PATTERN = re.compile(
r"artikel(?:en)?\s+([\d.]+(?:\s*(?:,\s*[\d.]+|tot en met\s+[\d.]+))*)"
r"(?:\s*,?\s*(?:eerste|tweede|derde|vierde|vijfde|zesde|zevende|achtste|negende|tiende)\s+lid)?"
r"(?:\s+van\s+(?:de|het)\s+(.+?))?(?=[,.\s;)])",
re.IGNORECASE,
)
# BWB-ID verwijzingen (zeldzaam in tekst, maar soms in metadata)
BWB_REF_PATTERN = re.compile(r"BWBR\d{7}")
def extract_references(bwb_id: str, tekst: str) -> list[dict]:
"""Extraheer verwijzingen uit een wettekst.
Args:
bwb_id: BWB-ID van de bronregeling.
tekst: Volledige Markdown tekst.
Returns:
Lijst van dicts met bron, doel, type, en context.
"""
refs: list[dict] = []
seen: set[str] = set()
# Zoek directe BWB-ID verwijzingen
for match in BWB_REF_PATTERN.finditer(tekst):
target_bwb = match.group(0)
if target_bwb != bwb_id and target_bwb not in seen:
seen.add(target_bwb)
start = max(0, match.start() - 50)
end = min(len(tekst), match.end() + 50)
refs.append({
"bron": bwb_id,
"doel": target_bwb,
"type": "bwb_id",
"context": tekst[start:end].strip(),
})
# Zoek wet-naam verwijzingen
for match in WET_REF_PATTERN.finditer(tekst):
wet_naam = match.group(1).strip().rstrip(".,;")
if len(wet_naam) < 5 or wet_naam.lower() in ("wet", "wetboek", "besluit"):
continue
ref_key = wet_naam.lower()
if ref_key not in seen:
seen.add(ref_key)
start = max(0, match.start() - 30)
end = min(len(tekst), match.end() + 30)
refs.append({
"bron": bwb_id,
"doel_naam": wet_naam,
"type": "wet_naam",
"context": tekst[start:end].strip(),
})
return refs
def build_reference_graph(repo_path: Path) -> dict:
"""Bouw de volledige cross-referentie graaf.
Args:
repo_path: Pad naar de wetgit/rijk repo.
Returns:
Dict met nodes (regelingen) en edges (verwijzingen).
"""
index_path = repo_path / "index.json"
if index_path.exists():
data = json.loads(index_path.read_text(encoding="utf-8"))
regelingen = data.get("regelingen", [])
else:
from wetgit.pipeline.indexer import generate_index
regelingen = generate_index(repo_path)
# Bouw titel → bwb_id lookup
titel_to_bwb: dict[str, str] = {}
for r in regelingen:
titel_to_bwb[r.get("titel", "").lower()] = r["bwb_id"]
if r.get("citeertitel"):
titel_to_bwb[r["citeertitel"].lower()] = r["bwb_id"]
all_refs: list[dict] = []
edges: dict[str, list[str]] = {} # bwb_id → [verwezen bwb_ids]
for regeling in regelingen:
bwb_id = regeling["bwb_id"]
md_path = repo_path / regeling["pad"] / "README.md"
if not md_path.exists():
continue
tekst = md_path.read_text(encoding="utf-8")
refs = extract_references(bwb_id, tekst)
targets: list[str] = []
for ref in refs:
# Probeer wet_naam te resolven naar bwb_id
if ref["type"] == "wet_naam":
doel_naam = ref["doel_naam"].lower()
resolved = titel_to_bwb.get(doel_naam)
if resolved:
ref["doel"] = resolved
targets.append(ref.get("doel", ref.get("doel_naam", "")))
else:
targets.append(ref["doel"])
all_refs.append(ref)
if targets:
edges[bwb_id] = list(set(targets))
# Bereken inbound references
inbound: dict[str, list[str]] = {}
for src, dests in edges.items():
for dest in dests:
inbound.setdefault(dest, []).append(src)
logger.info(
"Graaf: %d regelingen met verwijzingen, %d unieke edges",
len(edges),
sum(len(v) for v in edges.values()),
)
return {
"nodes": len(edges),
"total_edges": sum(len(v) for v in edges.values()),
"outbound": edges,
"inbound": inbound,
"references": all_refs,
}
def query_references(
graph: dict, bwb_id: str, direction: str = "both",
) -> dict:
"""Query verwijzingen voor een specifieke regeling.
Args:
graph: De cross-referentie graaf.
bwb_id: BWB-ID om te querien.
direction: "outbound" (verwijst naar), "inbound" (verwezen door), "both".
Returns:
Dict met verwijzingen in de gevraagde richting.
"""
result: dict = {"bwb_id": bwb_id}
if direction in ("outbound", "both"):
result["verwijst_naar"] = graph.get("outbound", {}).get(bwb_id, [])
if direction in ("inbound", "both"):
result["verwezen_door"] = graph.get("inbound", {}).get(bwb_id, [])
return result
if __name__ == "__main__":
import argparse
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
parser = argparse.ArgumentParser(description="WetGit cross-referentie analyse")
parser.add_argument("--repo", type=Path, required=True)
parser.add_argument("--query", help="BWB-ID om te querien")
parser.add_argument("--output", type=Path, help="Schrijf graaf naar JSON bestand")
args = parser.parse_args()
graph = build_reference_graph(args.repo)
print(f"Graaf: {graph['nodes']} regelingen, {graph['total_edges']} verwijzingen")
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
json.dump(graph, f, ensure_ascii=False, indent=2)
print(f"Geschreven: {args.output}")
if args.query:
result = query_references(graph, args.query)
print(json.dumps(result, ensure_ascii=False, indent=2))

112
src/wetgit/ai/domains.py Normal file
View file

@ -0,0 +1,112 @@
"""Domein-classificatie voor change-alerts.
Classificeert regelingen naar compliance-domeinen (NIS2, DORA, AVG, etc.)
op basis van keyword-matching in titel en tekst.
Domeinen zijn gedefinieerd als sets van zoektermen. Een regeling hoort bij
een domein als minstens één term voorkomt in de titel of tekst.
"""
from __future__ import annotations
import re
# Domein-definities: naam → keywords die matchen in titel/tekst
DOMAINS: dict[str, list[str]] = {
"nis2-cybersecurity": [
"netwerk- en informatiebeveiliging",
"NIS2",
"cybersecurity",
"cyberbeveiligingswet",
"beveiligingsincident",
"digitale weerbaarheid",
"CSIRT",
"Telecommunicatiewet",
"Wet beveiliging netwerk- en informatiesystemen",
],
"dora-financieel": [
"DORA",
"digitale operationele veerkracht",
"financiële sector",
"Wet op het financieel toezicht",
"DNB",
"AFM",
"ICT-risicobeheer",
"Pensioenwet",
"Bankwet",
],
"avg-privacy": [
"persoonsgegevens",
"AVG",
"GDPR",
"Uitvoeringswet Algemene verordening gegevensbescherming",
"verwerking",
"Autoriteit Persoonsgegevens",
"gegevensbescherming",
"privacy",
"betrokkene",
],
"omgevingswet": [
"Omgevingswet",
"omgevingsplan",
"omgevingsvergunning",
"omgevingsvisie",
"Besluit activiteiten leefomgeving",
"Besluit kwaliteit leefomgeving",
"milieueffectrapportage",
],
"arbeidsrecht": [
"arbeidsovereenkomst",
"Burgerlijk Wetboek Boek 7",
"Arbeidstijdenwet",
"Arbeidsomstandighedenwet",
"Wet minimumloon",
"Wet werk en zekerheid",
"Ontslagrecht",
"CAO",
"Wet arbeid vreemdelingen",
"WIA",
"Werkloosheidswet",
],
"belastingrecht": [
"Wet inkomstenbelasting",
"Wet op de vennootschapsbelasting",
"Wet op de omzetbelasting",
"Algemene wet inzake rijksbelastingen",
"Invorderingswet",
"belastingplichtige",
],
}
def classify_regeling(titel: str, tekst: str | None = None) -> list[str]:
"""Classificeer een regeling naar domeinen.
Args:
titel: Titel van de regeling.
tekst: Optioneel: volledige tekst (voor diepere matching).
Returns:
Lijst van domein-namen waar de regeling bij hoort.
"""
search_text = titel.lower()
if tekst:
# Eerste 5000 chars is genoeg voor domein-detectie
search_text += " " + tekst[:5000].lower()
matched: list[str] = []
for domain, keywords in DOMAINS.items():
for kw in keywords:
if kw.lower() in search_text:
matched.append(domain)
break
return matched
def list_domains() -> list[dict[str, str | list[str]]]:
"""Lijst van alle beschikbare domeinen met hun keywords."""
return [
{"naam": name, "keywords": keywords}
for name, keywords in DOMAINS.items()
]

View file

@ -9,11 +9,16 @@ from __future__ import annotations
import os
from pathlib import Path
from fastapi import FastAPI, HTTPException, Query
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
from wetgit import __version__
from wetgit.api.data import RegelingStore
from fastapi.responses import Response
from wetgit.api.models import (
ArtikelDetail,
ArtikelItem,
@ -25,10 +30,13 @@ from wetgit.api.models import (
ZoekResultaat,
)
REPO_PATH = Path(os.environ.get("WETGIT_REPO", "/tmp/wetgit-index-test"))
_git_repos = os.environ.get("WETGIT_GIT_REPOS_DIR", os.environ.get("WETGIT_REPO", "/data/wetgit/git-repos"))
REPO_PATH = Path(_git_repos) / "rijk" if "WETGIT_GIT_REPOS_DIR" in os.environ else Path(_git_repos)
MEILI_URL = os.environ.get("MEILI_URL", "http://127.0.0.1:7700")
QDRANT_URL = os.environ.get("QDRANT_URL", "http://127.0.0.1:6333")
limiter = Limiter(key_func=get_remote_address, default_limits=["60/minute"])
app = FastAPI(
title="WetGit API",
description="Nederlandse wetgeving als code — REST API",
@ -38,6 +46,9 @@ app = FastAPI(
openapi_url="/api/openapi.json",
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
@ -156,63 +167,182 @@ def get_diff(
# --- Zoeken ---
@app.get("/api/v1/zoeken", response_model=list[ZoekResultaat])
@limiter.limit("30/minute")
def zoeken(
request: Request,
q: str = Query(..., min_length=2, description="Zoekterm"),
type: str | None = Query(None, description="Filter op type"),
mode: str = Query("keyword", description="Zoekmodus: keyword, semantic, of hybrid"),
limit: int = Query(20, ge=1, le=100, description="Max resultaten"),
) -> list[dict]:
"""Doorzoek alle wetgeving. Modes: keyword (Meilisearch), semantic (Qdrant), hybrid (beide)."""
from wetgit.api.search import MeiliSearch
from wetgit.ai.semantic import SemanticSearch
semantic_results: list[dict] = []
keyword_results: list[dict] = []
# Semantic search
if mode in ("semantic", "hybrid"):
from wetgit.ai.semantic import SemanticSearch
sem = SemanticSearch(qdrant_url=QDRANT_URL)
if sem.health():
results = sem.search(q, limit=limit)
semantic_results = sem.search(q, limit=limit)
if mode == "semantic":
return results
return semantic_results
# Hybrid: combineer met keyword
semantic_results = {r["artikel"]: r for r in results}
# Keyword search (Meilisearch → grep fallback)
if mode in ("keyword", "hybrid"):
meili = MeiliSearch(url=MEILI_URL)
if meili.health():
filter_str = f'type = "{type}"' if type else None
result = meili.search(q, filter_=filter_str, limit=limit)
keyword_results = [
{
"bwb_id": hit["bwb_id"],
"titel": hit.get("regeling_titel", ""),
"artikel": f"Artikel {hit.get('artikel_nummer', '?')}",
"context": hit.get("tekst", "")[:200],
}
for hit in result.get("hits", [])
]
else:
# Fallback: grep-style zoeken
for regeling in store.list_regelingen():
if type and regeling.get("type") != type:
continue
tekst = store.get_tekst(regeling["bwb_id"])
if tekst is None or q.lower() not in tekst.lower():
continue
current_artikel = ""
for line in tekst.split("\n"):
if line.startswith("### Artikel"):
current_artikel = line.replace("### ", "")
if q.lower() in line.lower() and current_artikel:
keyword_results.append({
"bwb_id": regeling["bwb_id"],
"titel": regeling.get("titel", ""),
"artikel": current_artikel,
"context": line.strip()[:200],
})
if len(keyword_results) >= limit:
break
from wetgit.api.search import MeiliSearch
meili = MeiliSearch(url=MEILI_URL)
if mode == "keyword":
return keyword_results
# Probeer Meilisearch, fallback naar grep
if meili.health():
filter_str = f'type = "{type}"' if type else None
result = meili.search(q, filter_=filter_str, limit=limit)
# Hybrid: Reciprocal Rank Fusion (RRF)
k = 60 # RRF constant
scores: dict[str, float] = {}
items: dict[str, dict] = {}
return [
{
"bwb_id": hit["bwb_id"],
"titel": hit.get("regeling_titel", ""),
"artikel": f"Artikel {hit.get('artikel_nummer', '?')}",
"context": hit.get("tekst", "")[:200],
}
for hit in result.get("hits", [])
]
for rank, r in enumerate(semantic_results):
key = f"{r['bwb_id']}_{r['artikel']}"
scores[key] = scores.get(key, 0) + 1 / (k + rank + 1)
items[key] = r
# Fallback: grep-style zoeken
resultaten: list[dict] = []
for regeling in store.list_regelingen():
if type and regeling.get("type") != type:
for rank, r in enumerate(keyword_results):
key = f"{r['bwb_id']}_{r['artikel']}"
scores[key] = scores.get(key, 0) + 1 / (k + rank + 1)
if key not in items:
items[key] = r
fused = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return [
{**items[key], "score": round(score, 4)} for key, score in fused[:limit]
]
# --- Feed ---
@app.get("/api/v1/feed.xml", response_class=Response)
def feed(limit: int = Query(50, ge=1, le=200, description="Max entries")) -> Response:
"""Atom feed van recente wijzigingen in de wetgeving."""
import subprocess
from datetime import datetime
try:
result = subprocess.run(
["git", "log", f"--max-count={limit}", "--format=%H%n%ai%n%s%n---"],
cwd=store.repo_path, capture_output=True, text=True, check=True,
)
except subprocess.CalledProcessError:
return Response(content="<feed/>", media_type="application/atom+xml")
entries = []
lines = result.stdout.strip().split("\n---\n")
for block in lines:
parts = block.strip().split("\n")
if len(parts) < 3:
continue
tekst = store.get_tekst(regeling["bwb_id"])
if tekst is None or q.lower() not in tekst.lower():
continue
current_artikel = ""
for line in tekst.split("\n"):
if line.startswith("### Artikel"):
current_artikel = line.replace("### ", "")
if q.lower() in line.lower() and current_artikel:
resultaten.append({
"bwb_id": regeling["bwb_id"],
"titel": regeling.get("titel", ""),
"artikel": current_artikel,
"context": line.strip()[:200],
})
if len(resultaten) >= limit:
return resultaten
return resultaten
commit_hash, date_str, subject = parts[0], parts[1], parts[2]
# Parse "2026-03-30 12:00:00 +0200"
dt = date_str.split(" +")[0].split(" -")[0]
entries.append(
f' <entry>\n'
f' <title>{_xml_escape(subject)}</title>\n'
f' <id>urn:wetgit:commit:{commit_hash}</id>\n'
f' <updated>{dt.replace(" ", "T")}Z</updated>\n'
f' <link href="https://git.wetgit.nl/wetgit/rijk/commit/{commit_hash}"/>\n'
f' <summary>{_xml_escape(subject)}</summary>\n'
f' </entry>'
)
now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
atom = (
'<?xml version="1.0" encoding="utf-8"?>\n'
'<feed xmlns="http://www.w3.org/2005/Atom">\n'
f' <title>WetGIT — Wijzigingen in Nederlandse wetgeving</title>\n'
f' <link href="https://api.wetgit.nl/api/v1/feed.xml" rel="self"/>\n'
f' <link href="https://wetgit.nl"/>\n'
f' <id>urn:wetgit:feed:wijzigingen</id>\n'
f' <updated>{now}</updated>\n'
f' <subtitle>Elke wet een Markdown-bestand, elke wijziging een Git-commit.</subtitle>\n'
+ "\n".join(entries)
+ "\n</feed>"
)
return Response(content=atom, media_type="application/atom+xml")
def _xml_escape(s: str) -> str:
"""Escape XML special characters."""
return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace('"', "&quot;")
# --- Domeinen ---
@app.get("/api/v1/domeinen")
def list_domeinen() -> list[dict]:
"""Lijst van beschikbare compliance-domeinen voor alerts."""
from wetgit.ai.domains import list_domains
return list_domains()
@app.get("/api/v1/regelingen/{bwb_id}/domeinen")
def get_regeling_domeinen(bwb_id: str) -> dict:
"""Classificeer een regeling naar compliance-domeinen."""
from wetgit.ai.domains import classify_regeling as classify
regeling = store.get_regeling(bwb_id)
if not regeling:
raise HTTPException(status_code=404, detail=f"Regeling {bwb_id} niet gevonden")
tekst = store.get_tekst(bwb_id)
domeinen = classify(regeling.get("titel", ""), tekst)
return {"bwb_id": bwb_id, "domeinen": domeinen}
# --- Cross-referenties ---
@app.get("/api/v1/regelingen/{bwb_id}/referenties")
def get_referenties(
bwb_id: str,
richting: str = Query("both", description="outbound, inbound, of both"),
) -> dict:
"""Toon cross-referenties van/naar een regeling."""
from wetgit.ai.crossref import extract_references
regeling = store.get_regeling(bwb_id)
if not regeling:
raise HTTPException(status_code=404, detail=f"Regeling {bwb_id} niet gevonden")
tekst = store.get_tekst(bwb_id)
if tekst is None:
return {"bwb_id": bwb_id, "referenties": []}
refs = extract_references(bwb_id, tekst)
return {"bwb_id": bwb_id, "referenties": refs, "aantal": len(refs)}

View file

@ -14,6 +14,7 @@ class RegelingMeta(BaseModel):
type: str
status: str
datum_inwerkingtreding: str | None = None
datum_verval: str | None = None
bron: str
pad: str
artikelen: int

View file

@ -73,6 +73,7 @@ def _parse_regeling(md_path: Path, repo_path: Path) -> dict | None:
"type": meta.get("type", ""),
"status": meta.get("status", ""),
"datum_inwerkingtreding": meta.get("datum_inwerkingtreding"),
"datum_verval": meta.get("datum_verval"),
"bron": meta.get("bron", ""),
"pad": rel_path,
"artikelen": artikel_count,

109
src/wetgit/tasks.py Normal file
View file

@ -0,0 +1,109 @@
"""Celery achtergrondtaken voor WetGIT.
Taken voor dagelijkse sync, alert-verwerking en indexering.
Usage:
celery -A wetgit.tasks worker --loglevel=info
celery -A wetgit.tasks beat --loglevel=info # voor scheduling
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from celery import Celery
from celery.schedules import crontab
logger = logging.getLogger(__name__)
REDIS_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/1")
REPO_PATH = Path(os.environ.get("WETGIT_GIT_REPOS_DIR", "/data/wetgit/git-repos"))
app = Celery("wetgit", broker=REDIS_URL, backend=RESULT_BACKEND)
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Europe/Amsterdam",
task_track_started=True,
beat_schedule={
"daily-sync": {
"task": "wetgit.tasks.daily_sync",
"schedule": crontab(hour=3, minute=0),
},
"daily-index": {
"task": "wetgit.tasks.reindex",
"schedule": crontab(hour=3, minute=30),
},
},
)
@app.task(name="wetgit.tasks.daily_sync")
def daily_sync() -> dict:
"""Voer de dagelijkse sync uit (SRU delta-updates)."""
from wetgit.pipeline.sync import run_sync
rijk_repo = REPO_PATH / "rijk"
xml_cache = Path(os.environ.get("WETGIT_DATA_DIR", "/data/wetgit")) / "xml-cache"
result = run_sync(
rijk_repo=rijk_repo,
xml_cache=xml_cache,
)
logger.info("Sync voltooid: %s", result)
return result
@app.task(name="wetgit.tasks.reindex")
def reindex() -> dict:
"""Herindexeer Meilisearch en Qdrant."""
meili_url = os.environ.get("MEILI_URL", "http://127.0.0.1:7700")
qdrant_url = os.environ.get("QDRANT_URL", "http://127.0.0.1:6333")
rijk_repo = REPO_PATH / "rijk"
results: dict = {}
# Meilisearch
try:
from wetgit.api.search import index_repo as meili_index
results["meilisearch"] = meili_index(rijk_repo, meili_url)
except Exception as e:
logger.error("Meilisearch indexering mislukt: %s", e)
results["meilisearch_error"] = str(e)
# Qdrant
try:
from wetgit.ai.semantic import index_repo as qdrant_index
results["qdrant"] = qdrant_index(rijk_repo, qdrant_url)
except Exception as e:
logger.error("Qdrant indexering mislukt: %s", e)
results["qdrant_error"] = str(e)
return results
@app.task(name="wetgit.tasks.send_alert")
def send_alert(
bwb_id: str,
titel: str,
diff_text: str,
recipients: list[str] | None = None,
webhooks: list[str] | None = None,
domain_filter: list[str] | None = None,
) -> bool:
"""Verstuur een change-alert als achtergrondtaak."""
from wetgit.ai.alerts import send_change_alert
return send_change_alert(
bwb_id=bwb_id,
titel=titel,
diff_text=diff_text,
recipients=recipients,
webhooks=webhooks,
domain_filter=domain_filter,
)