## Layer 1: Surface
Before a query reaches your LLM, it passes through a retrieval system. Before retrieval, the knowledge base must be populated. Before population, raw documents must be ingested, cleaned, chunked, and indexed. Every failure in retrieval quality has a root cause somewhere in that chain — and most of them are data problems, not model problems.
The AI data pipeline lifecycle:
| Stage | What happens | Common failure |
|---|---|---|
| Ingest | Pull documents from sources | Duplicate content, missing updates, encoding errors |
| Clean | Normalise format, strip noise | Boilerplate bleeds into chunks, OCR artifacts |
| Version | Track corpus state over time | No rollback when ingestion breaks retrieval |
| Chunk & embed | Prepare for retrieval | Chunk size mismatch with query distribution |
| Index | Write to vector store | Stale index after source update |
| Monitor | Detect drift and gaps | Unknown coverage holes, query miss rate creep |
A data contract is an explicit, machine-checkable agreement between a data producer and a data consumer about what the data will look like, how complete it will be, and how fresh it must be. Without contracts, data quality problems surface as mysterious retrieval failures.
## Layer 2: Guided

### Implementing a data contract
```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass
class DataContract:
    source_name: str
    max_age_hours: int
    min_completeness: float  # 0.0–1.0: fraction of expected fields present
    required_fields: list[str]

    def validate(self, record: dict[str, Any]) -> list[str]:
        errors = []

        # Freshness check (assumes updated_at is a timezone-aware datetime)
        updated_at = record.get("updated_at")
        if updated_at:
            age = datetime.now(timezone.utc) - updated_at
            if age.total_seconds() / 3600 > self.max_age_hours:
                errors.append(
                    f"Record is {age.total_seconds() / 3600:.1f}h old, "
                    f"max allowed: {self.max_age_hours}h"
                )

        # Completeness check
        present = sum(1 for f in self.required_fields if record.get(f))
        completeness = present / len(self.required_fields)
        if completeness < self.min_completeness:
            errors.append(
                f"Completeness {completeness:.0%} below threshold "
                f"{self.min_completeness:.0%}"
            )
        return errors


contract = DataContract(
    source_name="product_docs",
    max_age_hours=24,
    min_completeness=0.8,
    required_fields=["title", "body", "updated_at", "category"],
)


def ingest_with_contract(
    records: list[dict], contract: DataContract
) -> tuple[list[dict], list[dict]]:
    valid, rejected = [], []
    for record in records:
        errors = contract.validate(record)
        if errors:
            record["_contract_errors"] = errors
            rejected.append(record)
        else:
            valid.append(record)
    return valid, rejected
```
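Running the contract against a hypothetical record shows how failures surface (the sample values here are illustrative):

```python
from datetime import datetime, timedelta, timezone

# Hypothetical record: 30 hours stale and missing the "category" field
record = {
    "title": "Widget installation guide",
    "body": "Step 1: ...",
    "updated_at": datetime.now(timezone.utc) - timedelta(hours=30),
}

print(contract.validate(record))
# ['Record is 30.0h old, max allowed: 24h',
#  'Completeness 75% below threshold 80%']
```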
### Corpus versioning
Track each ingestion run as a versioned snapshot so you can roll back when a bad ingestion degrades retrieval:
```python
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path


class CorpusVersion:
    def __init__(self, store_path: Path):
        self.store_path = store_path
        self.store_path.mkdir(parents=True, exist_ok=True)

    def snapshot(self, run_id: str, stats: dict) -> str:
        entry = {
            "run_id": run_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "document_count": stats["document_count"],
            "chunk_count": stats["chunk_count"],
            "rejected_count": stats["rejected_count"],
            "content_hash": self._hash(stats),
        }
        path = self.store_path / f"{run_id}.json"
        path.write_text(json.dumps(entry, indent=2))
        return entry["content_hash"]

    def _hash(self, stats: dict) -> str:
        return hashlib.sha256(
            json.dumps(stats, sort_keys=True).encode()
        ).hexdigest()[:16]

    def latest(self) -> dict | None:
        # Lexicographic sort on filenames; run IDs must sort chronologically
        snapshots = sorted(self.store_path.glob("*.json"))
        if not snapshots:
            return None
        return json.loads(snapshots[-1].read_text())
```
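Recording a run then takes a single call per ingestion (the stats below are illustrative):

```python
versioner = CorpusVersion(Path("corpus_versions"))

versioner.snapshot(
    run_id="run_1700000000",
    stats={"document_count": 1240, "chunk_count": 9871, "rejected_count": 12},
)

print(versioner.latest()["run_id"])  # -> "run_1700000000"
```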
### Query miss rate monitoring
The single most important signal for corpus quality is how often retrieval returns nothing useful:
```python
from datetime import datetime, timezone


class RetrievalMonitor:
    def __init__(self, miss_threshold: float = 0.05):
        self.miss_threshold = miss_threshold
        self._window: list[tuple[datetime, bool]] = []

    def record(self, query: str, top_score: float, min_score: float = 0.70):
        # A "miss": the best retrieved chunk scores below the relevance floor
        is_miss = top_score < min_score
        self._window.append((datetime.now(timezone.utc), is_miss))
        self._prune(hours=1)

    def miss_rate(self) -> float:
        if not self._window:
            return 0.0
        misses = sum(1 for _, is_miss in self._window if is_miss)
        return misses / len(self._window)

    def alert_if_degraded(self) -> str | None:
        rate = self.miss_rate()
        if rate > self.miss_threshold:
            return (
                f"Miss rate {rate:.1%} exceeds threshold {self.miss_threshold:.1%}. "
                f"Check corpus freshness and coverage gaps."
            )
        return None

    def _prune(self, hours: int):
        cutoff = datetime.now(timezone.utc).timestamp() - hours * 3600
        self._window = [
            (ts, miss) for ts, miss in self._window
            if ts.timestamp() > cutoff
        ]
```
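Wiring it into the query path is one call per retrieval; the scores here are made up for illustration:

```python
monitor = RetrievalMonitor(miss_threshold=0.05)

monitor.record("how do I reset my widget?", top_score=0.82)
monitor.record("warranty for discontinued models", top_score=0.41)  # a miss

if alert := monitor.alert_if_degraded():
    print(alert)  # Miss rate 50.0% exceeds threshold 5.0%. ...
```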
### Before vs. after: uncontrolled ingestion
Before — no contracts, no monitoring:
```python
def ingest(docs):
    for doc in docs:
        chunks = chunk(doc["body"])
        embed_and_store(chunks)
    # No version tracking. No freshness check. No miss rate.
    # When retrieval degrades, you find out from users.
```
After — contracted ingestion:
```python
def ingest(docs, contract: DataContract, versioner: CorpusVersion):
    valid, rejected = ingest_with_contract(docs, contract)
    chunk_count = 0
    for doc in valid:
        # Chunk once per document; count chunks here instead of re-chunking for stats
        chunks = chunk(doc["body"])
        chunk_count += len(chunks)
        embed_and_store(chunks)
    run_stats = {
        "document_count": len(valid),
        "chunk_count": chunk_count,
        "rejected_count": len(rejected),
    }
    versioner.snapshot(
        run_id=f"run_{int(datetime.now(timezone.utc).timestamp())}",
        stats=run_stats,
    )
    if rejected:
        alert_on_rejected(rejected)
```
## Layer 3: Deep Dive

### Why AI data contracts are different from database schemas
Database schemas validate structure. A JSON schema tells you a field is present and typed correctly. A data contract for AI goes further:
- Semantic coherence: Is the content internally consistent? A product description that contradicts the product title will confuse retrieval even if both fields are present and non-null.
- Freshness relevance: For a knowledge base over live data, a document updated 90 days ago may be more harmful than no document at all — it competes with fresher content and can win retrieval contests.
- Coverage intent: Does the corpus cover the query distribution your users actually have? A corpus that answers 95% of training queries but 40% of real queries has a coverage gap that schema validation cannot detect.
The Great Expectations framework brings this discipline to data pipelines. Its “expectations” are executable assertions about data quality — not just schema, but distributions, freshness, and cross-field consistency.
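As a sketch, here is what a freshness expectation might look like with Great Expectations' classic pandas-backed API (method names follow the pre-1.0 interface and may differ in newer releases; the `age_hours` column is a hypothetical field derived upstream from `updated_at`):

```python
import great_expectations as ge
import pandas as pd

df = pd.DataFrame(records)  # records from the ingest step
gdf = ge.from_pandas(df)

# Schema-level assertion: field present and non-null
gdf.expect_column_values_to_not_be_null("title")

# Beyond schema: freshness as an executable assertion
gdf.expect_column_max_to_be_between("age_hours", min_value=0, max_value=24)
```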
### Production failure taxonomy
Coverage drift: The query distribution shifts (new product launched, regulation changed) while the corpus stays static. Miss rate climbs but everything else looks healthy. Detection: track query topics over time and compare against corpus coverage.
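A minimal detection sketch, assuming queries and corpus documents are already tagged with topics (by a classifier or keyword rules; both the tagging and the threshold are yours to choose):

```python
from collections import Counter

def coverage_gaps(
    query_topics: list[str], corpus_topics: set[str], min_queries: int = 20
) -> list[str]:
    """Topics users ask about frequently that the corpus does not cover."""
    counts = Counter(query_topics)
    return [
        topic for topic, n in counts.items()
        if n >= min_queries and topic not in corpus_topics
    ]
```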
Ingestion regression: A pipeline change silently drops a document class (e.g., PDFs that fail to parse). The corpus shrinks but chunk count per document looks normal. Detection: track document count per source separately from chunk count.
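Detection can be a per-source diff across runs; this sketch assumes each snapshot also records a hypothetical `docs_per_source` map:

```python
def ingestion_regressions(
    prev: dict[str, int], curr: dict[str, int], max_drop: float = 0.10
) -> list[str]:
    """Sources whose document count fell by more than max_drop since the last run."""
    return [
        source for source, prev_count in prev.items()
        if prev_count and (prev_count - curr.get(source, 0)) / prev_count > max_drop
    ]
```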
Boilerplate contamination: Legal disclaimers, nav menus, and cookie banners appear in chunks because HTML cleaning was imperfect. These chunks win retrieval for generic queries and displace relevant content. Detection: flag chunks with entropy below a threshold — boilerplate has low lexical diversity.
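A rough version of that entropy filter, computing Shannon entropy over whitespace tokens (`chunks` stands in for the output of your chunking step, and the 4.0-bit cutoff is a placeholder to tune on your own corpus):

```python
import math
from collections import Counter

def token_entropy(chunk: str) -> float:
    """Shannon entropy (in bits) of the chunk's token distribution."""
    tokens = chunk.lower().split()
    counts = Counter(tokens)
    total = len(tokens)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())

# Flag suspiciously repetitive chunks, e.g. nav menus and disclaimers
suspect = [c for c in chunks if token_entropy(c) < 4.0]
```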
Stale embedding model mismatch: Corpus was embedded with model version A; new queries are embedded with model version B after a library upgrade. Semantic similarity scores degrade silently. Detection: version your embedding model alongside your corpus and validate consistency before deploying a new model.
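A lightweight guard along those lines, assuming the embedding model identifier is written into the corpus snapshot at index time (the metadata field name is hypothetical):

```python
def check_embedding_compatibility(corpus_meta: dict, query_model: str) -> None:
    corpus_model = corpus_meta.get("embedding_model")  # written at index time
    if corpus_model != query_model:
        raise RuntimeError(
            f"Corpus embedded with {corpus_model!r} but queries use "
            f"{query_model!r}; re-embed the corpus or pin the model version."
        )
```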
Circular synthetic contamination: LLM-generated summaries are ingested alongside source documents. The model then retrieves its own outputs as context and amplifies whatever biases or errors were in those summaries. Detection: tag synthetic content and track retrieval frequency per content type.
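The tracking half of that detection might look like this; `content_type` is a hypothetical metadata tag set at ingestion:

```python
from collections import Counter

retrieval_counts: Counter[str] = Counter()

def on_retrieval(retrieved_chunks: list[dict]) -> None:
    """Count retrievals per content type, e.g. 'source' vs. 'synthetic'."""
    retrieval_counts.update(
        c.get("content_type", "unknown") for c in retrieved_chunks
    )

def synthetic_share() -> float:
    """Fraction of retrieved context that is model-generated."""
    total = sum(retrieval_counts.values())
    return retrieval_counts["synthetic"] / total if total else 0.0
```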
### Unstructured ETL at scale
For large corpora, parsing performance matters. Key tools:
- Unstructured.io — handles PDF, DOCX, HTML, PPTX with layout-aware chunking; designed for AI pipelines
- Apache Tika — broad format support via JVM; useful for legacy enterprise formats
- LlamaParse — LLM-powered parsing for complex PDFs with tables and figures
For freshness tracking at scale, maintain a document fingerprint store — a hash of each document’s content at last ingestion. On each run, compare current hashes against stored hashes. Only re-embed documents that changed. This reduces embedding cost by 80–90% on mature corpora where most content is stable.
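A minimal sketch of that fingerprint comparison, assuming a JSON file as the store and full document text as input:

```python
import hashlib
import json
from pathlib import Path

def changed_documents(docs: dict[str, str], store: Path) -> list[str]:
    """Return IDs of documents whose content hash differs from last ingestion."""
    old = json.loads(store.read_text()) if store.exists() else {}
    new = {
        doc_id: hashlib.sha256(text.encode()).hexdigest()
        for doc_id, text in docs.items()
    }
    store.write_text(json.dumps(new, indent=2))
    return [doc_id for doc_id, h in new.items() if old.get(doc_id) != h]
```

Only the returned IDs need re-chunking and re-embedding on the current run.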
### Further reading
- Data Management for Machine Learning Systems; Shankar et al., 2023. Taxonomy of data management challenges specific to ML pipelines; the “data cascades” concept applies directly to RAG corpus quality.
- Great Expectations documentation; Superconductive, 2023. Practical reference for data contract implementation; the “expectation suite” pattern maps cleanly to AI pipeline contracts.
- The Data Quality Imperative; Batini et al., 2024. Survey of data quality dimensions and measurement methods; theoretical foundation for completeness and consistency metrics.