feat: indexers productieklaar voor volledige 40k-regelingen run

Kwaliteits- en betrouwbaarheidsverbeteringen zodat een full re-index
van alle Nederlandse regelingen veilig en observeerbaar draait.

- Deterministische Qdrant point_id via SHA-256 → idempotente re-runs
  zonder duplicaten, resumable bij gedeeltelijke fout
- Chunking voor lange artikelen (>1800 chars) met 200-char overlap op
  paragraaf-grenzen (hard-split fallback), voorkomt informatieverlies
  voor lange wetsartikelen
- Retry + exponential backoff op Mistral en Meilisearch (4 pogingen,
  respecteert Retry-After header, netwerkfouten apart van HTTP-fouten)
- Mistral batch_size 10 → 32 (~3x minder API-calls voor ~400k chunks)
- Meilisearch buffered uploads (5000 docs/batch over regelingen heen)
- Progress logging met rate/ETA in beide indexers (elke 100 regelingen)
  + --limit flag voor test-runs op subset
- Nieuwe CLI: `wetgit reindex --target all|meili|qdrant [--limit N]`
This commit is contained in:
Coornhert 2026-04-21 21:13:31 +02:00
parent 2b0f858b46
commit 6a5bdf3f08
3 changed files with 356 additions and 106 deletions

View file

@ -10,10 +10,12 @@ Usage:
from __future__ import annotations
import hashlib
import json
import logging
import os
import re
import time
from pathlib import Path
import httpx
@ -26,6 +28,65 @@ QDRANT_URL = "http://127.0.0.1:6333"
COLLECTION = "wetgit_artikelen"
VECTOR_DIM = 1024 # mistral-embed output dimension
MISTRAL_BATCH_SIZE = 32
CHUNK_CHAR_LIMIT = 1800
CHUNK_OVERLAP = 200
RETRY_STATUS = {429, 500, 502, 503, 504}
RETRY_MAX_ATTEMPTS = 4
def _point_id(article_id: str) -> int:
    """Derive a deterministic 63-bit Qdrant point ID from an article ID.

    Why: Python's built-in hash() is salted per process (PYTHONHASHSEED), so
    it cannot be used for stable IDs. A stable ID makes upserts idempotent
    and lets an interrupted run be resumed without creating duplicates.
    """
    hex_digest = hashlib.sha256(article_id.encode("utf-8")).hexdigest()
    # The first 16 hex characters are the first 8 digest bytes (big-endian);
    # dropping the lowest bit leaves 63 bits.
    leading_64_bits = int(hex_digest[:16], 16)
    return leading_64_bits >> 1
def _chunk_text(text: str, limit: int = CHUNK_CHAR_LIMIT, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split an article into overlapping chunks, on paragraph boundaries where possible.

    Why: long articles lose detail when embedded as a single vector; the
    overlap keeps a concept that straddles a chunk boundary visible in both
    neighbouring chunks.

    Args:
        text: Full article body. Paragraphs are separated by blank lines.
        limit: Maximum number of characters per chunk (must be > 0).
        overlap: Number of trailing characters of the previous chunk that are
            repeated at the start of the next one (0 <= overlap < limit).

    Returns:
        The chunks in document order. Text that fits within ``limit`` is
        returned unchanged as a single-element list.

    Raises:
        ValueError: If ``limit`` or ``overlap`` is out of range. Without this
            check ``overlap >= limit`` would make the hard-split loop below
            spin forever, because the buffer would never shrink.
    """
    if limit <= 0:
        raise ValueError(f"limit must be positive, got {limit}")
    if not 0 <= overlap < limit:
        raise ValueError(f"overlap must satisfy 0 <= overlap < limit, got {overlap} (limit={limit})")

    if len(text) <= limit:
        return [text]

    chunks: list[str] = []
    buf = ""
    for para in text.split("\n\n"):
        # +2 accounts for the "\n\n" separator that joins paragraphs.
        if len(buf) + len(para) + 2 <= limit:
            buf = f"{buf}\n\n{para}" if buf else para
            continue

        if buf:
            chunks.append(buf)
            # Seed the next buffer with the overlap tail of the chunk just emitted.
            tail = buf[-overlap:] if overlap and len(buf) > overlap else ""
            buf = f"{tail}\n\n{para}" if tail else para
        else:
            buf = para

        # The buffer (tail + paragraph) may itself exceed the limit: hard-split
        # it, stepping forward by (limit - overlap) so consecutive pieces overlap.
        while len(buf) > limit:
            chunks.append(buf[:limit])
            buf = buf[limit - overlap:]

    if buf:
        chunks.append(buf)
    return chunks
def _parse_retry_after(resp: httpx.Response) -> float | None:
    """Parse the Retry-After header into a number of seconds to wait.

    Both forms of the header are supported: delay-seconds ("120") and an
    HTTP-date ("Wed, 21 Oct 2015 07:28:00 GMT").

    Args:
        resp: Response whose ``headers`` mapping is inspected.

    Returns:
        The non-negative, finite delay in seconds, or None when the header is
        absent or unusable, so the caller can fall back to its own backoff.
        An HTTP-date in the past yields 0.0.
    """
    import math
    from datetime import datetime, timezone
    from email.utils import parsedate_to_datetime

    val = resp.headers.get("retry-after")
    if not val:
        return None
    val = val.strip()

    try:
        seconds = float(val)
    except ValueError:
        # Not a number: try the HTTP-date form.
        try:
            when = parsedate_to_datetime(val)
        except (TypeError, ValueError):
            return None
        if when.tzinfo is None:
            # No usable zone information was parsed; treat the value as UTC.
            when = when.replace(tzinfo=timezone.utc)
        return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())

    # float() also accepts "nan", "inf" and negative numbers, none of which
    # may reach time.sleep(): it raises on nan/negatives and blocks on inf.
    if not math.isfinite(seconds) or seconds < 0:
        return None
    return seconds
class SemanticSearch:
"""Semantisch zoeken via Qdrant + Mistral embeddings."""
@ -59,9 +120,12 @@ class SemanticSearch:
logger.info("Collectie '%s' aangemaakt", COLLECTION)
def index_regeling(self, bwb_id: str, titel: str, type_: str, tekst: str) -> int:
"""Indexeer alle artikelen van een regeling als embeddings."""
# Extraheer artikelen
articles: list[dict] = []
"""Indexeer alle artikelen van een regeling als embeddings.
Lange artikelen worden gesplitst in overlappende chunks; elke chunk
krijgt een eigen embedding en Qdrant-punt met deterministische ID.
"""
chunks: list[dict] = []
pattern = r"### Artikel (\S+)(.*?)(?=\n### Artikel |\n## |\Z)"
for match in re.finditer(pattern, tekst, re.DOTALL):
@ -70,58 +134,57 @@ class SemanticSearch:
if len(body) < 10:
continue
articles.append({
"id": f"{bwb_id}_art_{nummer}",
"nummer": nummer,
"tekst": body[:2000], # Beperk voor embedding
"bwb_id": bwb_id,
"regeling_titel": titel,
"type": type_,
})
for chunk_idx, chunk in enumerate(_chunk_text(body)):
chunk_id = f"{bwb_id}_art_{nummer}"
if chunk_idx > 0 or len(body) > CHUNK_CHAR_LIMIT:
chunk_id = f"{chunk_id}_chunk_{chunk_idx}"
chunks.append({
"id": chunk_id,
"nummer": nummer,
"tekst": chunk,
"bwb_id": bwb_id,
"regeling_titel": titel,
"type": type_,
})
if not articles:
if not chunks:
return 0
# Genereer embeddings in batches van 10
batch_size = 10
points: list[dict] = []
for i in range(0, len(articles), batch_size):
batch = articles[i:i + batch_size]
for i in range(0, len(chunks), MISTRAL_BATCH_SIZE):
batch = chunks[i:i + MISTRAL_BATCH_SIZE]
texts = [
f"{a['regeling_titel']} Artikel {a['nummer']}: {a['tekst']}"
for a in batch
f"{c['regeling_titel']} Artikel {c['nummer']}: {c['tekst']}"
for c in batch
]
embeddings = self._get_embeddings(texts)
if not embeddings:
logger.warning("Embedding mislukt voor batch %d", i)
logger.warning("Embeddings mislukt voor %s batch %d/%d", bwb_id, i, len(chunks))
continue
for j, (article, embedding) in enumerate(zip(batch, embeddings)):
point_id = abs(hash(article["id"])) % (2**63)
for chunk, embedding in zip(batch, embeddings):
points.append({
"id": point_id,
"id": _point_id(chunk["id"]),
"vector": embedding,
"payload": {
"article_id": article["id"],
"bwb_id": article["bwb_id"],
"regeling_titel": article["regeling_titel"],
"type": article["type"],
"artikel_nummer": article["nummer"],
"tekst": article["tekst"][:500],
"article_id": chunk["id"],
"bwb_id": chunk["bwb_id"],
"regeling_titel": chunk["regeling_titel"],
"type": chunk["type"],
"artikel_nummer": chunk["nummer"],
"tekst": chunk["tekst"][:500],
},
})
# Upload naar Qdrant
if points:
resp = httpx.put(
f"{self.qdrant_url}/collections/{COLLECTION}/points",
json={"points": points},
timeout=30,
timeout=60,
)
resp.raise_for_status()
logger.info("Geïndexeerd: %s%d artikelen", bwb_id, len(points))
logger.debug("Qdrant upsert %s%d punten", bwb_id, len(points))
return len(points)
@ -163,35 +226,67 @@ class SemanticSearch:
return False
def _get_embeddings(self, texts: list[str]) -> list[list[float]] | None:
    """Request embeddings from the Mistral API with retry and exponential backoff.

    Responses with a status in RETRY_STATUS and network-level errors are
    retried up to RETRY_MAX_ATTEMPTS times; a usable Retry-After header takes
    precedence over the computed backoff. No backoff is slept after the final
    attempt, since nothing follows it.

    Args:
        texts: Input strings, one embedding is expected per entry.

    Returns:
        One embedding per input text, in input order; an empty list for empty
        input; None when the request ultimately fails, the API answers with a
        non-retryable error, or the response body is unusable.
    """
    if not texts:
        # Nothing to embed: skip the API round-trip entirely.
        return []

    # Bind to an IPv4 local address so the connection goes out over IPv4.
    transport = httpx.HTTPTransport(local_address="0.0.0.0")
    headers = {
        "Authorization": f"Bearer {self.api_key}",
        "Content-Type": "application/json",
    }
    payload = {"model": MISTRAL_EMBED_MODEL, "input": texts}
    last_error: str = ""

    with httpx.Client(transport=transport, timeout=60) as client:
        for attempt in range(1, RETRY_MAX_ATTEMPTS + 1):
            is_last = attempt == RETRY_MAX_ATTEMPTS
            try:
                resp = client.post(MISTRAL_EMBED_URL, headers=headers, json=payload)
                if resp.status_code not in RETRY_STATUS:
                    resp.raise_for_status()
                    data = resp.json()
                    embeddings = [item["embedding"] for item in data["data"]]
                    if len(embeddings) != len(texts):
                        # The caller zips texts with embeddings; a short answer
                        # would silently drop chunks, so treat it as a failure.
                        logger.error(
                            "Mistral gaf %d embeddings terug voor %d teksten",
                            len(embeddings), len(texts),
                        )
                        return None
                    return embeddings

                wait = 0.0 if is_last else (_parse_retry_after(resp) or min(60, 2 ** attempt))
                logger.warning(
                    "Mistral HTTP %d (poging %d/%d), backoff %.1fs",
                    resp.status_code, attempt, RETRY_MAX_ATTEMPTS, wait,
                )
                last_error = f"HTTP {resp.status_code}"
            except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e:
                last_error = f"{type(e).__name__}: {e}"
                wait = 0.0 if is_last else min(60, 2 ** attempt)
                logger.warning(
                    "Mistral netwerkfout (poging %d/%d): %s — backoff %.1fs",
                    attempt, RETRY_MAX_ATTEMPTS, last_error, wait,
                )
            except httpx.HTTPStatusError as e:
                logger.error("Mistral HTTP %d — niet-retry: %s", e.response.status_code, e)
                return None
            except (ValueError, KeyError, TypeError) as e:
                # 2xx response whose body is not the expected JSON shape.
                logger.error("Mistral antwoord ongeldig: %s: %s", type(e).__name__, e)
                return None

            if wait:
                time.sleep(wait)

    logger.error("Mistral embedding opgegeven na %d pogingen: %s", RETRY_MAX_ATTEMPTS, last_error)
    return None
def index_repo(repo_path: Path, qdrant_url: str = QDRANT_URL) -> int:
"""Indexeer de hele repo in Qdrant."""
def index_repo(
repo_path: Path,
qdrant_url: str = QDRANT_URL,
limit: int | None = None,
progress_every: int = 100,
) -> int:
"""Indexeer de hele repo in Qdrant.
Args:
repo_path: Pad naar de wetgit/rijk repository.
qdrant_url: Qdrant endpoint.
limit: Max aantal regelingen (None = alle). Handig voor test-runs.
progress_every: Log voortgang elke N regelingen.
"""
search = SemanticSearch(qdrant_url=qdrant_url)
search.setup_collection()
total = 0
index_path = repo_path / "index.json"
if index_path.exists():
data = json.loads(index_path.read_text(encoding="utf-8"))
@ -200,21 +295,50 @@ def index_repo(repo_path: Path, qdrant_url: str = QDRANT_URL) -> int:
from wetgit.pipeline.indexer import generate_index
regelingen = generate_index(repo_path)
for regeling in regelingen:
if limit:
regelingen = regelingen[:limit]
total_regelingen = len(regelingen)
total_points = 0
skipped = 0
failed = 0
start = time.perf_counter()
logger.info("Start Qdrant indexering: %d regelingen", total_regelingen)
for idx, regeling in enumerate(regelingen, start=1):
md_path = repo_path / regeling["pad"] / "README.md"
if not md_path.exists():
skipped += 1
continue
tekst = md_path.read_text(encoding="utf-8")
count = search.index_regeling(
bwb_id=regeling["bwb_id"],
titel=regeling.get("titel", ""),
type_=regeling.get("type", ""),
tekst=tekst,
)
total += count
try:
tekst = md_path.read_text(encoding="utf-8")
count = search.index_regeling(
bwb_id=regeling["bwb_id"],
titel=regeling.get("titel", ""),
type_=regeling.get("type", ""),
tekst=tekst,
)
total_points += count
except Exception as e:
failed += 1
logger.error("Fout bij %s: %s", regeling.get("bwb_id", "?"), e)
logger.info("Totaal geïndexeerd: %d artikelen", total)
return total
if idx % progress_every == 0 or idx == total_regelingen:
elapsed = time.perf_counter() - start
rate = idx / elapsed if elapsed > 0 else 0
eta_sec = (total_regelingen - idx) / rate if rate > 0 else 0
logger.info(
"Qdrant %d/%d (%.1f%%) | %.1f reg/s | %d punten | skip=%d fail=%d | ETA %dm%02ds",
idx, total_regelingen, 100 * idx / total_regelingen,
rate, total_points, skipped, failed,
int(eta_sec // 60), int(eta_sec % 60),
)
logger.info(
"Qdrant indexering klaar: %d punten uit %d regelingen (skip=%d fail=%d, %.0fs)",
total_points, total_regelingen, skipped, failed, time.perf_counter() - start,
)
return total_points
if __name__ == "__main__":
@ -228,6 +352,8 @@ if __name__ == "__main__":
idx = sub.add_parser("index", help="Indexeer regelingen")
idx.add_argument("--repo", type=Path, required=True)
idx.add_argument("--qdrant-url", default=QDRANT_URL)
idx.add_argument("--limit", type=int, default=None, help="Max regelingen (test)")
idx.add_argument("--progress-every", type=int, default=100)
srch = sub.add_parser("search", help="Zoek semantisch")
srch.add_argument("query")
@ -237,8 +363,11 @@ if __name__ == "__main__":
args = parser.parse_args()
if args.command == "index":
total = index_repo(args.repo, args.qdrant_url)
print(f"Geïndexeerd: {total} artikelen")
total = index_repo(
args.repo, args.qdrant_url,
limit=args.limit, progress_every=args.progress_every,
)
print(f"Geïndexeerd: {total} punten")
elif args.command == "search":
s = SemanticSearch(qdrant_url=args.qdrant_url)
results = s.search(args.query, limit=args.limit)

View file

@ -8,6 +8,7 @@ from __future__ import annotations
import logging
import re
import time
from pathlib import Path
import httpx
@ -17,6 +18,10 @@ logger = logging.getLogger(__name__)
DEFAULT_MEILI_URL = "http://127.0.0.1:7700"
INDEX_NAME = "artikelen"
MEILI_BATCH_DOCS = 5000 # Documenten per POST (ca. 500 regelingen avg)
RETRY_STATUS = {429, 500, 502, 503, 504}
RETRY_MAX_ATTEMPTS = 4
class MeiliSearch:
"""Meilisearch client voor WetGit."""
@ -54,23 +59,17 @@ class MeiliSearch:
)
logger.info("Meilisearch index '%s' geconfigureerd", INDEX_NAME)
def index_regeling(self, bwb_id: str, titel: str, type_: str, tekst: str) -> int:
"""Indexeer alle artikelen van een regeling.
Returns:
Aantal geïndexeerde artikelen.
"""
documents = []
def build_documents(self, bwb_id: str, titel: str, type_: str, tekst: str) -> list[dict]:
"""Bouw Meilisearch-documenten voor een regeling, zonder te uploaden."""
documents: list[dict] = []
pattern = r"### Artikel (\S+)(.*?)(?=\n### Artikel |\n## |\Z)"
for match in re.finditer(pattern, tekst, re.DOTALL):
nummer = match.group(1)
body = match.group(2).strip()
# Extraheer eventuele artikel-titel
artikel_titel = None
lines = body.split("\n")
for line in lines:
for line in body.split("\n"):
line = line.strip()
if line.startswith("*") and line.endswith("*") and not line.startswith("**"):
artikel_titel = line.strip("*").strip()
@ -85,16 +84,44 @@ class MeiliSearch:
"artikel_titel": artikel_titel,
"tekst": body,
})
return documents
if documents:
resp = httpx.post(
f"{self.url}/indexes/{INDEX_NAME}/documents",
json=documents,
headers=self.headers,
timeout=30,
)
logger.info("Geïndexeerd: %s%d artikelen", bwb_id, len(documents))
def upload_documents(self, documents: list[dict]) -> bool:
    """Upload one batch of documents to Meilisearch with retry and backoff.

    Responses with a status in RETRY_STATUS and network-level errors are
    retried up to RETRY_MAX_ATTEMPTS times with exponential backoff. A
    numeric Retry-After header overrides the computed backoff. No backoff is
    slept after the final attempt.

    NOTE(review): Meilisearch appears to process document additions
    asynchronously; True here means the POST was accepted, not that the task
    finished indexing — confirm against the Meilisearch tasks API.

    Args:
        documents: Documents to add; an empty list is a no-op.

    Returns:
        True when the POST succeeded (or there was nothing to upload), False
        on a non-retryable HTTP error or when all attempts are exhausted.
    """
    if not documents:
        return True

    endpoint = f"{self.url}/indexes/{INDEX_NAME}/documents"
    for attempt in range(1, RETRY_MAX_ATTEMPTS + 1):
        is_last = attempt == RETRY_MAX_ATTEMPTS
        backoff = 0.0 if is_last else float(min(60, 2 ** attempt))
        try:
            resp = httpx.post(
                endpoint,
                json=documents,
                headers=self.headers,
                timeout=60,
            )
            if resp.status_code not in RETRY_STATUS:
                resp.raise_for_status()
                return True

            wait = backoff
            if not is_last:
                # Honour a numeric Retry-After; ignore missing, non-numeric,
                # non-finite or non-positive values and keep the backoff.
                try:
                    hinted = float(resp.headers.get("retry-after", ""))
                except ValueError:
                    hinted = 0.0
                if 0 < hinted < float("inf"):
                    wait = hinted
            logger.warning("Meili HTTP %d (poging %d/%d), backoff %.1fs",
                           resp.status_code, attempt, RETRY_MAX_ATTEMPTS, wait)
        except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e:
            wait = backoff
            logger.warning("Meili netwerkfout (poging %d/%d): %s — backoff %.1fs",
                           attempt, RETRY_MAX_ATTEMPTS, e, wait)
        except httpx.HTTPStatusError as e:
            logger.error("Meili HTTP %d — niet-retry: %s", e.response.status_code, e)
            return False

        if wait:
            time.sleep(wait)

    logger.error("Meili upload opgegeven na %d pogingen (%d docs)", RETRY_MAX_ATTEMPTS, len(documents))
    return False
def index_regeling(self, bwb_id: str, titel: str, type_: str, tekst: str) -> int:
    """Index all articles of one regeling with a single POST.

    Args:
        bwb_id: Identifier of the regeling.
        titel: Title of the regeling.
        type_: Type of the regeling.
        tekst: Markdown text from which the article documents are built.

    Returns:
        The number of documents actually uploaded: 0 when no articles were
        found or when the upload failed, so callers never count documents
        that did not reach Meilisearch.
    """
    documents = self.build_documents(bwb_id, titel, type_, tekst)
    if not documents:
        return 0
    if not self.upload_documents(documents):
        # upload_documents already logged the failure.
        return 0
    logger.debug("Meili upload %s%d artikelen", bwb_id, len(documents))
    return len(documents)
def search(
@ -133,20 +160,27 @@ class MeiliSearch:
return False
def index_repo(repo_path: Path, meili_url: str = DEFAULT_MEILI_URL) -> int:
"""Indexeer de hele repo in Meilisearch.
def index_repo(
repo_path: Path,
meili_url: str = DEFAULT_MEILI_URL,
limit: int | None = None,
progress_every: int = 100,
batch_docs: int = MEILI_BATCH_DOCS,
) -> int:
"""Indexeer de hele repo in Meilisearch met batching over regelingen.
Returns:
Totaal aantal geïndexeerde artikelen.
Args:
repo_path: Pad naar wetgit/rijk.
meili_url: Meilisearch endpoint.
limit: Max aantal regelingen (None = alle).
progress_every: Log voortgang elke N regelingen.
batch_docs: Documenten-buffer grootte voordat we flushen naar Meili.
"""
import json
meili = MeiliSearch(url=meili_url)
meili.setup_index()
total = 0
# Laad index.json
index_path = repo_path / "index.json"
if index_path.exists():
data = json.loads(index_path.read_text(encoding="utf-8"))
@ -155,23 +189,68 @@ def index_repo(repo_path: Path, meili_url: str = DEFAULT_MEILI_URL) -> int:
from wetgit.pipeline.indexer import generate_index
regelingen = generate_index(repo_path)
for regeling in regelingen:
if limit:
regelingen = regelingen[:limit]
total_regelingen = len(regelingen)
total_docs = 0
skipped = 0
failed = 0
buffer: list[dict] = []
start = time.perf_counter()
logger.info("Start Meilisearch indexering: %d regelingen", total_regelingen)
def flush() -> None:
nonlocal failed
if not buffer:
return
ok = meili.upload_documents(buffer)
if not ok:
failed_in_flush = len({d["bwb_id"] for d in buffer})
failed += failed_in_flush
buffer.clear()
for idx, regeling in enumerate(regelingen, start=1):
bwb_id = regeling["bwb_id"]
md_path = repo_path / regeling["pad"] / "README.md"
if not md_path.exists():
skipped += 1
continue
try:
tekst = md_path.read_text(encoding="utf-8")
docs = meili.build_documents(
bwb_id=bwb_id,
titel=regeling.get("titel", ""),
type_=regeling.get("type", ""),
tekst=tekst,
)
buffer.extend(docs)
total_docs += len(docs)
except Exception as e:
failed += 1
logger.error("Fout bij %s: %s", bwb_id, e)
tekst = md_path.read_text(encoding="utf-8")
count = meili.index_regeling(
bwb_id=bwb_id,
titel=regeling.get("titel", ""),
type_=regeling.get("type", ""),
tekst=tekst,
)
total += count
if len(buffer) >= batch_docs:
flush()
logger.info("Totaal geïndexeerd: %d artikelen", total)
return total
if idx % progress_every == 0 or idx == total_regelingen:
elapsed = time.perf_counter() - start
rate = idx / elapsed if elapsed > 0 else 0
eta_sec = (total_regelingen - idx) / rate if rate > 0 else 0
logger.info(
"Meili %d/%d (%.1f%%) | %.1f reg/s | %d docs | skip=%d fail=%d | ETA %dm%02ds",
idx, total_regelingen, 100 * idx / total_regelingen,
rate, total_docs, skipped, failed,
int(eta_sec // 60), int(eta_sec % 60),
)
flush()
logger.info(
"Meilisearch indexering klaar: %d docs uit %d regelingen (skip=%d fail=%d, %.0fs)",
total_docs, total_regelingen, skipped, failed, time.perf_counter() - start,
)
return total_docs
if __name__ == "__main__":
@ -182,7 +261,13 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(description="WetGit Meilisearch indexer")
parser.add_argument("--repo", type=Path, required=True)
parser.add_argument("--meili-url", default=DEFAULT_MEILI_URL)
parser.add_argument("--limit", type=int, default=None, help="Max regelingen (test)")
parser.add_argument("--progress-every", type=int, default=100)
parser.add_argument("--batch-docs", type=int, default=MEILI_BATCH_DOCS)
args = parser.parse_args()
total = index_repo(args.repo, args.meili_url)
total = index_repo(
args.repo, args.meili_url,
limit=args.limit, progress_every=args.progress_every, batch_docs=args.batch_docs,
)
print(f"Geïndexeerd: {total} artikelen")

View file

@ -132,6 +132,42 @@ def search(ctx: click.Context, query: str, reg_type: str | None) -> None:
console.print(f"\n{count} resultaten gevonden.")
# CLI command that rebuilds the Meilisearch and/or Qdrant index from the repo.
# --limit and --progress-every are validated as integers >= 1: a value of 0 for
# --progress-every would otherwise reach `idx % progress_every` in the indexers
# and raise ZeroDivisionError, and a zero/negative --limit would silently mean
# "no limit" or slice regelingen off the end of the list.
@cli.command()
@click.option("--target", type=click.Choice(["meili", "qdrant", "all"]), default="all",
              help="Welke index(en) (her)bouwen")
@click.option("--limit", type=click.IntRange(min=1), default=None,
              help="Max regelingen (voor test-runs)")
@click.option("--progress-every", type=click.IntRange(min=1), default=100)
@click.option("--meili-url", envvar="MEILI_URL", default="http://127.0.0.1:7700")
@click.option("--qdrant-url", envvar="QDRANT_URL", default="http://127.0.0.1:6333")
@click.pass_context
def reindex(
    ctx: click.Context,
    target: str,
    limit: int | None,
    progress_every: int,
    meili_url: str,
    qdrant_url: str,
) -> None:
    """(Her)indexeer alle regelingen in Meilisearch en/of Qdrant."""
    # NOTE: the docstring above is the text click shows in --help, so it is
    # kept verbatim.
    import logging

    # The indexers report progress through the logging module; make INFO visible.
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

    repo = _find_repo(ctx.obj.get("repo"))
    console.print(f"[bold]Repo:[/bold] {repo}")

    # Indexers are imported lazily so only the selected backend's module is loaded.
    if target in ("meili", "all"):
        from wetgit.api.search import index_repo as meili_index

        console.print(f"[cyan]→ Meilisearch indexing ({meili_url})[/cyan]")
        n = meili_index(repo, meili_url=meili_url, limit=limit, progress_every=progress_every)
        console.print(f"[green]✓ Meili: {n} documenten[/green]")

    if target in ("qdrant", "all"):
        from wetgit.ai.semantic import index_repo as qdrant_index

        console.print(f"[cyan]→ Qdrant semantic indexing ({qdrant_url})[/cyan]")
        n = qdrant_index(repo, qdrant_url=qdrant_url, limit=limit, progress_every=progress_every)
        console.print(f"[green]✓ Qdrant: {n} punten[/green]")
@cli.command(name="log")
@click.argument("bwb_id")
@click.pass_context