From 6a5bdf3f088f4d3865a798b98afb914691e86279 Mon Sep 17 00:00:00 2001 From: Coornhert Date: Tue, 21 Apr 2026 21:13:31 +0200 Subject: [PATCH] feat: indexers productieklaar voor volledige 40k-regelingen run MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Kwaliteits- en betrouwbaarheidsverbeteringen zodat een full re-index van alle Nederlandse regelingen veilig en observeerbaar draait. - Deterministische Qdrant point_id via SHA-256 → idempotente re-runs zonder duplicaten, resumable bij gedeeltelijke fout - Chunking voor lange artikelen (>1800 chars) met 200-char overlap op paragraaf-grenzen (hard-split fallback), voorkomt informatieverlies voor lange wetsartikelen - Retry + exponential backoff op Mistral en Meilisearch (4 pogingen, respecteert Retry-After header, netwerkfouten apart van HTTP-fouten) - Mistral batch_size 10 → 32 (~3x minder API-calls voor ~400k chunks) - Meilisearch buffered uploads (5000 docs/batch over regelingen heen) - Progress logging met rate/ETA in beide indexers (elke 100 regelingen) + --limit flag voor test-runs op subset - Nieuwe CLI: `wetgit reindex --target all|meili|qdrant [--limit N]` --- src/wetgit/ai/semantic.py | 267 ++++++++++++++++++++++++++++---------- src/wetgit/api/search.py | 159 +++++++++++++++++------ src/wetgit/cli/main.py | 36 +++++ 3 files changed, 356 insertions(+), 106 deletions(-) diff --git a/src/wetgit/ai/semantic.py b/src/wetgit/ai/semantic.py index 3234c2c..5639196 100644 --- a/src/wetgit/ai/semantic.py +++ b/src/wetgit/ai/semantic.py @@ -10,10 +10,12 @@ Usage: from __future__ import annotations +import hashlib import json import logging import os import re +import time from pathlib import Path import httpx @@ -26,6 +28,65 @@ QDRANT_URL = "http://127.0.0.1:6333" COLLECTION = "wetgit_artikelen" VECTOR_DIM = 1024 # mistral-embed output dimension +MISTRAL_BATCH_SIZE = 32 +CHUNK_CHAR_LIMIT = 1800 +CHUNK_OVERLAP = 200 + +RETRY_STATUS = {429, 500, 502, 503, 504} +RETRY_MAX_ATTEMPTS = 4 + + +def _point_id(article_id: str) -> int: + """Deterministische 63-bit point-ID uit een artikel-ID. + + Why: Python's hash() is niet stabiel tussen processen (PYTHONHASHSEED). + Upsert-by-ID maakt idempotentie en resumability mogelijk. + """ + digest = hashlib.sha256(article_id.encode("utf-8")).digest() + return int.from_bytes(digest[:8], "big") >> 1 # 63 bits, past in Qdrant int ID + + +def _chunk_text(text: str, limit: int = CHUNK_CHAR_LIMIT, overlap: int = CHUNK_OVERLAP) -> list[str]: + """Splits een artikel in overlappende chunks op paragraaf-grenzen waar mogelijk. + + Why: `mistral-embed` accepteert ~8k tokens maar lange artikelen zonder chunking + verliezen detail. Overlap voorkomt dat concepten op chunk-grenzen vallen. + """ + if len(text) <= limit: + return [text] + + chunks: list[str] = [] + buf = "" + for para in text.split("\n\n"): + if len(buf) + len(para) + 2 <= limit: + buf = f"{buf}\n\n{para}" if buf else para + continue + if buf: + chunks.append(buf) + # Start volgende buffer met overlap-staart van vorige chunk + tail = buf[-overlap:] if overlap and len(buf) > overlap else "" + buf = f"{tail}\n\n{para}" if tail else para + else: + buf = para + # Paragraaf zelf te groot: hard-split + while len(buf) > limit: + chunks.append(buf[:limit]) + buf = buf[limit - overlap:] + if buf: + chunks.append(buf) + return chunks + + +def _parse_retry_after(resp: httpx.Response) -> float | None: + """Parse Retry-After header (seconds of HTTP-date).""" + val = resp.headers.get("retry-after") + if not val: + return None + try: + return float(val) + except ValueError: + return None # HTTP-date format — niet geparsed, val terug op backoff + class SemanticSearch: """Semantisch zoeken via Qdrant + Mistral embeddings.""" @@ -59,9 +120,12 @@ class SemanticSearch: logger.info("Collectie '%s' aangemaakt", COLLECTION) def index_regeling(self, bwb_id: str, titel: str, type_: str, tekst: str) -> int: - """Indexeer alle artikelen van een regeling als embeddings.""" - # Extraheer artikelen - articles: list[dict] = [] + """Indexeer alle artikelen van een regeling als embeddings. + + Lange artikelen worden gesplitst in overlappende chunks; elke chunk + krijgt een eigen embedding en Qdrant-punt met deterministische ID. + """ + chunks: list[dict] = [] pattern = r"### Artikel (\S+)(.*?)(?=\n### Artikel |\n## |\Z)" for match in re.finditer(pattern, tekst, re.DOTALL): @@ -70,58 +134,57 @@ class SemanticSearch: if len(body) < 10: continue - articles.append({ - "id": f"{bwb_id}_art_{nummer}", - "nummer": nummer, - "tekst": body[:2000], # Beperk voor embedding - "bwb_id": bwb_id, - "regeling_titel": titel, - "type": type_, - }) + for chunk_idx, chunk in enumerate(_chunk_text(body)): + chunk_id = f"{bwb_id}_art_{nummer}" + if chunk_idx > 0 or len(body) > CHUNK_CHAR_LIMIT: + chunk_id = f"{chunk_id}_chunk_{chunk_idx}" + chunks.append({ + "id": chunk_id, + "nummer": nummer, + "tekst": chunk, + "bwb_id": bwb_id, + "regeling_titel": titel, + "type": type_, + }) - if not articles: + if not chunks: return 0 - # Genereer embeddings in batches van 10 - batch_size = 10 points: list[dict] = [] - - for i in range(0, len(articles), batch_size): - batch = articles[i:i + batch_size] + for i in range(0, len(chunks), MISTRAL_BATCH_SIZE): + batch = chunks[i:i + MISTRAL_BATCH_SIZE] texts = [ - f"{a['regeling_titel']} Artikel {a['nummer']}: {a['tekst']}" - for a in batch + f"{c['regeling_titel']} Artikel {c['nummer']}: {c['tekst']}" + for c in batch ] embeddings = self._get_embeddings(texts) if not embeddings: - logger.warning("Embedding mislukt voor batch %d", i) + logger.warning("Embeddings mislukt voor %s batch %d/%d", bwb_id, i, len(chunks)) continue - for j, (article, embedding) in enumerate(zip(batch, embeddings)): - point_id = abs(hash(article["id"])) % (2**63) + for chunk, embedding in zip(batch, embeddings): points.append({ - "id": point_id, + "id": _point_id(chunk["id"]), "vector": embedding, "payload": { - "article_id": article["id"], - "bwb_id": article["bwb_id"], - "regeling_titel": article["regeling_titel"], - "type": article["type"], - "artikel_nummer": article["nummer"], - "tekst": article["tekst"][:500], + "article_id": chunk["id"], + "bwb_id": chunk["bwb_id"], + "regeling_titel": chunk["regeling_titel"], + "type": chunk["type"], + "artikel_nummer": chunk["nummer"], + "tekst": chunk["tekst"][:500], }, }) - # Upload naar Qdrant if points: resp = httpx.put( f"{self.qdrant_url}/collections/{COLLECTION}/points", json={"points": points}, - timeout=30, + timeout=60, ) resp.raise_for_status() - logger.info("Geïndexeerd: %s — %d artikelen", bwb_id, len(points)) + logger.debug("Qdrant upsert %s — %d punten", bwb_id, len(points)) return len(points) @@ -163,35 +226,67 @@ class SemanticSearch: return False def _get_embeddings(self, texts: list[str]) -> list[list[float]] | None: - """Genereer embeddings via Mistral API.""" - try: - transport = httpx.HTTPTransport(local_address="0.0.0.0") - with httpx.Client(transport=transport, timeout=30) as client: - resp = client.post( - MISTRAL_EMBED_URL, - headers={ - "Authorization": f"Bearer {self.api_key}", - "Content-Type": "application/json", - }, - json={ - "model": MISTRAL_EMBED_MODEL, - "input": texts, - }, - ) - resp.raise_for_status() - data = resp.json() - return [item["embedding"] for item in data["data"]] - except httpx.HTTPError as e: - logger.error("Mistral embedding fout: %s", e) - return None + """Genereer embeddings via Mistral API met retry + exponential backoff. + + Retries op 429/5xx en netwerkfouten; respecteert Retry-After header. + """ + transport = httpx.HTTPTransport(local_address="0.0.0.0") + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + payload = {"model": MISTRAL_EMBED_MODEL, "input": texts} + + last_error: str = "" + with httpx.Client(transport=transport, timeout=60) as client: + for attempt in range(1, RETRY_MAX_ATTEMPTS + 1): + try: + resp = client.post(MISTRAL_EMBED_URL, headers=headers, json=payload) + if resp.status_code in RETRY_STATUS: + wait = _parse_retry_after(resp) or min(60, 2 ** attempt) + logger.warning( + "Mistral HTTP %d (poging %d/%d), backoff %.1fs", + resp.status_code, attempt, RETRY_MAX_ATTEMPTS, wait, + ) + last_error = f"HTTP {resp.status_code}" + time.sleep(wait) + continue + resp.raise_for_status() + data = resp.json() + return [item["embedding"] for item in data["data"]] + except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e: + last_error = f"{type(e).__name__}: {e}" + wait = min(60, 2 ** attempt) + logger.warning( + "Mistral netwerkfout (poging %d/%d): %s — backoff %.1fs", + attempt, RETRY_MAX_ATTEMPTS, last_error, wait, + ) + time.sleep(wait) + except httpx.HTTPStatusError as e: + logger.error("Mistral HTTP %d — niet-retry: %s", e.response.status_code, e) + return None + + logger.error("Mistral embedding opgegeven na %d pogingen: %s", RETRY_MAX_ATTEMPTS, last_error) + return None -def index_repo(repo_path: Path, qdrant_url: str = QDRANT_URL) -> int: - """Indexeer de hele repo in Qdrant.""" +def index_repo( + repo_path: Path, + qdrant_url: str = QDRANT_URL, + limit: int | None = None, + progress_every: int = 100, +) -> int: + """Indexeer de hele repo in Qdrant. + + Args: + repo_path: Pad naar de wetgit/rijk repository. + qdrant_url: Qdrant endpoint. + limit: Max aantal regelingen (None = alle). Handig voor test-runs. + progress_every: Log voortgang elke N regelingen. + """ search = SemanticSearch(qdrant_url=qdrant_url) search.setup_collection() - total = 0 index_path = repo_path / "index.json" if index_path.exists(): data = json.loads(index_path.read_text(encoding="utf-8")) @@ -200,21 +295,50 @@ def index_repo(repo_path: Path, qdrant_url: str = QDRANT_URL) -> int: from wetgit.pipeline.indexer import generate_index regelingen = generate_index(repo_path) - for regeling in regelingen: + if limit: + regelingen = regelingen[:limit] + + total_regelingen = len(regelingen) + total_points = 0 + skipped = 0 + failed = 0 + start = time.perf_counter() + logger.info("Start Qdrant indexering: %d regelingen", total_regelingen) + + for idx, regeling in enumerate(regelingen, start=1): md_path = repo_path / regeling["pad"] / "README.md" if not md_path.exists(): + skipped += 1 continue - tekst = md_path.read_text(encoding="utf-8") - count = search.index_regeling( - bwb_id=regeling["bwb_id"], - titel=regeling.get("titel", ""), - type_=regeling.get("type", ""), - tekst=tekst, - ) - total += count + try: + tekst = md_path.read_text(encoding="utf-8") + count = search.index_regeling( + bwb_id=regeling["bwb_id"], + titel=regeling.get("titel", ""), + type_=regeling.get("type", ""), + tekst=tekst, + ) + total_points += count + except Exception as e: + failed += 1 + logger.error("Fout bij %s: %s", regeling.get("bwb_id", "?"), e) - logger.info("Totaal geïndexeerd: %d artikelen", total) - return total + if idx % progress_every == 0 or idx == total_regelingen: + elapsed = time.perf_counter() - start + rate = idx / elapsed if elapsed > 0 else 0 + eta_sec = (total_regelingen - idx) / rate if rate > 0 else 0 + logger.info( + "Qdrant %d/%d (%.1f%%) | %.1f reg/s | %d punten | skip=%d fail=%d | ETA %dm%02ds", + idx, total_regelingen, 100 * idx / total_regelingen, + rate, total_points, skipped, failed, + int(eta_sec // 60), int(eta_sec % 60), + ) + + logger.info( + "Qdrant indexering klaar: %d punten uit %d regelingen (skip=%d fail=%d, %.0fs)", + total_points, total_regelingen, skipped, failed, time.perf_counter() - start, + ) + return total_points if __name__ == "__main__": @@ -228,6 +352,8 @@ if __name__ == "__main__": idx = sub.add_parser("index", help="Indexeer regelingen") idx.add_argument("--repo", type=Path, required=True) idx.add_argument("--qdrant-url", default=QDRANT_URL) + idx.add_argument("--limit", type=int, default=None, help="Max regelingen (test)") + idx.add_argument("--progress-every", type=int, default=100) srch = sub.add_parser("search", help="Zoek semantisch") srch.add_argument("query") @@ -237,8 +363,11 @@ if __name__ == "__main__": args = parser.parse_args() if args.command == "index": - total = index_repo(args.repo, args.qdrant_url) - print(f"Geïndexeerd: {total} artikelen") + total = index_repo( + args.repo, args.qdrant_url, + limit=args.limit, progress_every=args.progress_every, + ) + print(f"Geïndexeerd: {total} punten") elif args.command == "search": s = SemanticSearch(qdrant_url=args.qdrant_url) results = s.search(args.query, limit=args.limit) diff --git a/src/wetgit/api/search.py b/src/wetgit/api/search.py index d2c55f8..43805d5 100644 --- a/src/wetgit/api/search.py +++ b/src/wetgit/api/search.py @@ -8,6 +8,7 @@ from __future__ import annotations import logging import re +import time from pathlib import Path import httpx @@ -17,6 +18,10 @@ logger = logging.getLogger(__name__) DEFAULT_MEILI_URL = "http://127.0.0.1:7700" INDEX_NAME = "artikelen" +MEILI_BATCH_DOCS = 5000 # Documenten per POST (ca. 500 regelingen avg) +RETRY_STATUS = {429, 500, 502, 503, 504} +RETRY_MAX_ATTEMPTS = 4 + class MeiliSearch: """Meilisearch client voor WetGit.""" @@ -54,23 +59,17 @@ class MeiliSearch: ) logger.info("Meilisearch index '%s' geconfigureerd", INDEX_NAME) - def index_regeling(self, bwb_id: str, titel: str, type_: str, tekst: str) -> int: - """Indexeer alle artikelen van een regeling. - - Returns: - Aantal geïndexeerde artikelen. - """ - documents = [] + def build_documents(self, bwb_id: str, titel: str, type_: str, tekst: str) -> list[dict]: + """Bouw Meilisearch-documenten voor een regeling, zonder te uploaden.""" + documents: list[dict] = [] pattern = r"### Artikel (\S+)(.*?)(?=\n### Artikel |\n## |\Z)" for match in re.finditer(pattern, tekst, re.DOTALL): nummer = match.group(1) body = match.group(2).strip() - # Extraheer eventuele artikel-titel artikel_titel = None - lines = body.split("\n") - for line in lines: + for line in body.split("\n"): line = line.strip() if line.startswith("*") and line.endswith("*") and not line.startswith("**"): artikel_titel = line.strip("*").strip() @@ -85,16 +84,44 @@ class MeiliSearch: "artikel_titel": artikel_titel, "tekst": body, }) + return documents - if documents: - resp = httpx.post( - f"{self.url}/indexes/{INDEX_NAME}/documents", - json=documents, - headers=self.headers, - timeout=30, - ) - logger.info("Geïndexeerd: %s — %d artikelen", bwb_id, len(documents)) + def upload_documents(self, documents: list[dict]) -> bool: + """Upload een batch documenten met retry + backoff.""" + if not documents: + return True + for attempt in range(1, RETRY_MAX_ATTEMPTS + 1): + try: + resp = httpx.post( + f"{self.url}/indexes/{INDEX_NAME}/documents", + json=documents, + headers=self.headers, + timeout=60, + ) + if resp.status_code in RETRY_STATUS: + wait = min(60, 2 ** attempt) + logger.warning("Meili HTTP %d (poging %d/%d), backoff %.1fs", + resp.status_code, attempt, RETRY_MAX_ATTEMPTS, wait) + time.sleep(wait) + continue + resp.raise_for_status() + return True + except (httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError) as e: + wait = min(60, 2 ** attempt) + logger.warning("Meili netwerkfout (poging %d/%d): %s — backoff %.1fs", + attempt, RETRY_MAX_ATTEMPTS, e, wait) + time.sleep(wait) + except httpx.HTTPStatusError as e: + logger.error("Meili HTTP %d — niet-retry: %s", e.response.status_code, e) + return False + logger.error("Meili upload opgegeven na %d pogingen (%d docs)", RETRY_MAX_ATTEMPTS, len(documents)) + return False + def index_regeling(self, bwb_id: str, titel: str, type_: str, tekst: str) -> int: + """Indexeer alle artikelen van een regeling (single POST per regeling).""" + documents = self.build_documents(bwb_id, titel, type_, tekst) + if documents and self.upload_documents(documents): + logger.debug("Meili upload %s — %d artikelen", bwb_id, len(documents)) return len(documents) def search( @@ -133,20 +160,27 @@ class MeiliSearch: return False -def index_repo(repo_path: Path, meili_url: str = DEFAULT_MEILI_URL) -> int: - """Indexeer de hele repo in Meilisearch. +def index_repo( + repo_path: Path, + meili_url: str = DEFAULT_MEILI_URL, + limit: int | None = None, + progress_every: int = 100, + batch_docs: int = MEILI_BATCH_DOCS, +) -> int: + """Indexeer de hele repo in Meilisearch met batching over regelingen. - Returns: - Totaal aantal geïndexeerde artikelen. + Args: + repo_path: Pad naar wetgit/rijk. + meili_url: Meilisearch endpoint. + limit: Max aantal regelingen (None = alle). + progress_every: Log voortgang elke N regelingen. + batch_docs: Documenten-buffer grootte voordat we flushen naar Meili. """ import json meili = MeiliSearch(url=meili_url) meili.setup_index() - total = 0 - - # Laad index.json index_path = repo_path / "index.json" if index_path.exists(): data = json.loads(index_path.read_text(encoding="utf-8")) @@ -155,23 +189,68 @@ def index_repo(repo_path: Path, meili_url: str = DEFAULT_MEILI_URL) -> int: from wetgit.pipeline.indexer import generate_index regelingen = generate_index(repo_path) - for regeling in regelingen: + if limit: + regelingen = regelingen[:limit] + + total_regelingen = len(regelingen) + total_docs = 0 + skipped = 0 + failed = 0 + buffer: list[dict] = [] + start = time.perf_counter() + logger.info("Start Meilisearch indexering: %d regelingen", total_regelingen) + + def flush() -> None: + nonlocal failed + if not buffer: + return + ok = meili.upload_documents(buffer) + if not ok: + failed_in_flush = len({d["bwb_id"] for d in buffer}) + failed += failed_in_flush + buffer.clear() + + for idx, regeling in enumerate(regelingen, start=1): bwb_id = regeling["bwb_id"] md_path = repo_path / regeling["pad"] / "README.md" if not md_path.exists(): + skipped += 1 continue + try: + tekst = md_path.read_text(encoding="utf-8") + docs = meili.build_documents( + bwb_id=bwb_id, + titel=regeling.get("titel", ""), + type_=regeling.get("type", ""), + tekst=tekst, + ) + buffer.extend(docs) + total_docs += len(docs) + except Exception as e: + failed += 1 + logger.error("Fout bij %s: %s", bwb_id, e) - tekst = md_path.read_text(encoding="utf-8") - count = meili.index_regeling( - bwb_id=bwb_id, - titel=regeling.get("titel", ""), - type_=regeling.get("type", ""), - tekst=tekst, - ) - total += count + if len(buffer) >= batch_docs: + flush() - logger.info("Totaal geïndexeerd: %d artikelen", total) - return total + if idx % progress_every == 0 or idx == total_regelingen: + elapsed = time.perf_counter() - start + rate = idx / elapsed if elapsed > 0 else 0 + eta_sec = (total_regelingen - idx) / rate if rate > 0 else 0 + logger.info( + "Meili %d/%d (%.1f%%) | %.1f reg/s | %d docs | skip=%d fail=%d | ETA %dm%02ds", + idx, total_regelingen, 100 * idx / total_regelingen, + rate, total_docs, skipped, failed, + int(eta_sec // 60), int(eta_sec % 60), + ) + + flush() + + logger.info( + "Meilisearch indexering klaar: %d docs uit %d regelingen (skip=%d fail=%d, %.0fs)", + total_docs, total_regelingen, skipped, failed, time.perf_counter() - start, + ) + return total_docs if __name__ == "__main__": @@ -182,7 +261,13 @@ if __name__ == "__main__": parser = argparse.ArgumentParser(description="WetGit Meilisearch indexer") parser.add_argument("--repo", type=Path, required=True) parser.add_argument("--meili-url", default=DEFAULT_MEILI_URL) + parser.add_argument("--limit", type=int, default=None, help="Max regelingen (test)") + parser.add_argument("--progress-every", type=int, default=100) + parser.add_argument("--batch-docs", type=int, default=MEILI_BATCH_DOCS) args = parser.parse_args() - total = index_repo(args.repo, args.meili_url) + total = index_repo( + args.repo, args.meili_url, + limit=args.limit, progress_every=args.progress_every, batch_docs=args.batch_docs, + ) print(f"Geïndexeerd: {total} artikelen") diff --git a/src/wetgit/cli/main.py b/src/wetgit/cli/main.py index 22b4e3b..2b81a76 100644 --- a/src/wetgit/cli/main.py +++ b/src/wetgit/cli/main.py @@ -132,6 +132,42 @@ def search(ctx: click.Context, query: str, reg_type: str | None) -> None: console.print(f"\n{count} resultaten gevonden.") +@cli.command() +@click.option("--target", type=click.Choice(["meili", "qdrant", "all"]), default="all", + help="Welke index(en) (her)bouwen") +@click.option("--limit", type=int, default=None, help="Max regelingen (voor test-runs)") +@click.option("--progress-every", type=int, default=100) +@click.option("--meili-url", envvar="MEILI_URL", default="http://127.0.0.1:7700") +@click.option("--qdrant-url", envvar="QDRANT_URL", default="http://127.0.0.1:6333") +@click.pass_context +def reindex( + ctx: click.Context, + target: str, + limit: int | None, + progress_every: int, + meili_url: str, + qdrant_url: str, +) -> None: + """(Her)indexeer alle regelingen in Meilisearch en/of Qdrant.""" + import logging + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") + + repo = _find_repo(ctx.obj.get("repo")) + console.print(f"[bold]Repo:[/bold] {repo}") + + if target in ("meili", "all"): + from wetgit.api.search import index_repo as meili_index + console.print(f"[cyan]→ Meilisearch indexing ({meili_url})[/cyan]") + n = meili_index(repo, meili_url=meili_url, limit=limit, progress_every=progress_every) + console.print(f"[green]✓ Meili: {n} documenten[/green]") + + if target in ("qdrant", "all"): + from wetgit.ai.semantic import index_repo as qdrant_index + console.print(f"[cyan]→ Qdrant semantic indexing ({qdrant_url})[/cyan]") + n = qdrant_index(repo, qdrant_url=qdrant_url, limit=limit, progress_every=progress_every) + console.print(f"[green]✓ Qdrant: {n} punten[/green]") + + @cli.command(name="log") @click.argument("bwb_id") @click.pass_context