
Data Engineering for AI Systems

Most AI failures blamed on the model are actually data quality failures upstream. This module covers corpus lifecycle management, data contracts for AI pipelines, and the ingestion patterns that determine whether your RAG system retrieves signal or noise.

Layer 1: Surface

Before a query reaches your LLM, it passes through a retrieval system. Before retrieval, the knowledge base must be populated. Before population, raw documents must be ingested, cleaned, chunked, and indexed. Every failure in retrieval quality has a root cause somewhere in that chain — and most of them are data problems, not model problems.

The AI data pipeline lifecycle:

Stage           | What happens                   | Common failure
Ingest          | Pull documents from sources    | Duplicate content, missing updates, encoding errors
Clean           | Normalise format, strip noise  | Boilerplate bleeds into chunks, OCR artifacts
Version         | Track corpus state over time   | No rollback when ingestion breaks retrieval
Chunk & embed   | Prepare for retrieval          | Chunk size mismatch with query distribution
Index           | Write to vector store          | Stale index after source update
Monitor         | Detect drift and gaps          | Unknown coverage holes, query miss rate creep

A data contract is an explicit, machine-checkable agreement between a data producer and a data consumer about what the data will look like, how complete it will be, and how fresh it must be. Without contracts, data quality problems surface as mysterious retrieval failures.


Layer 2: Guided

Implementing a data contract

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

@dataclass
class DataContract:
    source_name: str
    max_age_hours: int
    min_completeness: float  # 0.0–1.0: fraction of expected fields present
    required_fields: list[str]

    def validate(self, record: dict[str, Any]) -> list[str]:
        errors = []

        # Freshness check (assumes updated_at is a timezone-aware datetime)
        updated_at = record.get("updated_at")
        if updated_at:
            age = datetime.now(timezone.utc) - updated_at
            if age.total_seconds() / 3600 > self.max_age_hours:
                errors.append(
                    f"Record is {age.total_seconds() / 3600:.1f}h old, "
                    f"max allowed: {self.max_age_hours}h"
                )

        # Completeness check
        present = sum(1 for f in self.required_fields if record.get(f))
        completeness = present / len(self.required_fields)
        if completeness < self.min_completeness:
            errors.append(
                f"Completeness {completeness:.0%} below threshold {self.min_completeness:.0%}"
            )

        return errors


contract = DataContract(
    source_name="product_docs",
    max_age_hours=24,
    min_completeness=0.8,
    required_fields=["title", "body", "updated_at", "category"],
)


def ingest_with_contract(
    records: list[dict], contract: DataContract
) -> tuple[list[dict], list[dict]]:
    valid, rejected = [], []
    for record in records:
        errors = contract.validate(record)
        if errors:
            record["_contract_errors"] = errors
            rejected.append(record)
        else:
            valid.append(record)
    return valid, rejected

Corpus versioning

Track each ingestion run as a versioned snapshot so you can roll back when a bad ingestion degrades retrieval:

import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

class CorpusVersion:
    def __init__(self, store_path: Path):
        self.store_path = store_path
        self.store_path.mkdir(parents=True, exist_ok=True)

    def snapshot(self, run_id: str, stats: dict) -> str:
        entry = {
            "run_id": run_id,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "document_count": stats["document_count"],
            "chunk_count": stats["chunk_count"],
            "rejected_count": stats["rejected_count"],
            "content_hash": self._hash(stats),
        }
        path = self.store_path / f"{run_id}.json"
        path.write_text(json.dumps(entry, indent=2))
        return entry["content_hash"]

    def _hash(self, stats: dict) -> str:
        # Cheap fingerprint of the run stats; enough to tell two runs apart.
        return hashlib.sha256(json.dumps(stats, sort_keys=True).encode()).hexdigest()[:16]

    def latest(self) -> dict | None:
        # Pick the most recent snapshot by its recorded timestamp rather than
        # relying on filename ordering.
        snapshots = [json.loads(p.read_text()) for p in self.store_path.glob("*.json")]
        if not snapshots:
            return None
        return max(snapshots, key=lambda s: s["timestamp"])

Query miss rate monitoring

The single most important signal for corpus quality is how often retrieval returns nothing useful:

from collections import defaultdict
from datetime import datetime, timezone

class RetrievalMonitor:
    def __init__(self, miss_threshold: float = 0.05):
        self.miss_threshold = miss_threshold
        self._window: list[tuple[datetime, bool]] = []

    def record(self, query: str, top_score: float, min_score: float = 0.70):
        is_miss = top_score < min_score
        self._window.append((datetime.now(timezone.utc), is_miss))
        self._prune(hours=1)

    def miss_rate(self) -> float:
        if not self._window:
            return 0.0
        misses = sum(1 for _, is_miss in self._window if is_miss)
        return misses / len(self._window)

    def alert_if_degraded(self) -> str | None:
        rate = self.miss_rate()
        if rate > self.miss_threshold:
            return (
                f"Miss rate {rate:.1%} exceeds threshold {self.miss_threshold:.1%}. "
                f"Check corpus freshness and coverage gaps."
            )
        return None

    def _prune(self, hours: int):
        cutoff = datetime.now(timezone.utc).timestamp() - hours * 3600
        self._window = [
            (ts, miss) for ts, miss in self._window
            if ts.timestamp() > cutoff
        ]

Before vs. after: uncontrolled ingestion

Before — no contracts, no monitoring:

def ingest(docs):
    for doc in docs:
        chunks = chunk(doc["body"])
        embed_and_store(chunks)
    # No version tracking. No freshness check. No miss rate.
    # When retrieval degrades, you find out from users.

After — contracted ingestion:

def ingest(docs, contract: DataContract, versioner: CorpusVersion):
    valid, rejected = ingest_with_contract(docs, contract)
    chunk_count = 0
    for doc in valid:
        chunks = chunk(doc["body"])
        chunk_count += len(chunks)
        embed_and_store(chunks)
    run_stats = {
        "document_count": len(valid),
        "chunk_count": chunk_count,
        "rejected_count": len(rejected),
    }
    versioner.snapshot(run_id=f"run_{int(datetime.now(timezone.utc).timestamp())}", stats=run_stats)
    if rejected:
        alert_on_rejected(rejected)
Layer 3: Deep Dive

Why AI data contracts are different from database schemas

Database schemas validate structure. A JSON schema tells you a field is present and typed correctly. A data contract for AI goes further:

  • Semantic coherence: Is the content internally consistent? A product description that contradicts the product title will confuse retrieval even if both fields are present and non-null (one rough proxy is sketched after this list).
  • Freshness relevance: For a knowledge base over live data, a document updated 90 days ago may be more harmful than no document at all — it competes with fresher content and can win retrieval contests.
  • Coverage intent: Does the corpus cover the query distribution your users actually have? A corpus that answers 95% of training queries but 40% of real queries has a coverage gap that schema validation cannot detect.
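
Of the three, semantic coherence is the hardest to check mechanically. One rough proxy (not something the contract above implements) is to require that a document's title and body embed close together. In the sketch below, embed stands in for whatever embedding function your pipeline already uses, and the threshold is illustrative.

import math

def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def title_body_coherence(doc: dict, embed, min_similarity: float = 0.3) -> bool:
    # A title that is semantically distant from its own body is a hint that the
    # record is internally inconsistent. The 0.3 cutoff is model-dependent.
    return cosine(embed(doc["title"]), embed(doc["body"])) >= min_similarity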

The Great Expectations framework brings this discipline to data pipelines. Its “expectations” are executable assertions about data quality — not just schema, but distributions, freshness, and cross-field consistency.
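
As a flavour of what that looks like, the sketch below uses the legacy pandas-style API (the exact interface varies by version; recent releases use a fluent, suite-based API instead). The records and category values here are hypothetical.

import great_expectations as ge
import pandas as pd

# Hypothetical batch of ingested records using the fields from the contract above.
records = [
    {"title": "Setup guide", "body": "How to install...", "updated_at": "2024-05-01", "category": "guide"},
    {"title": None, "body": "Orphan body with no title", "updated_at": "2024-05-02", "category": "reference"},
]

ge_df = ge.from_pandas(pd.DataFrame(records))
ge_df.expect_column_values_to_not_be_null("title")
ge_df.expect_column_values_to_not_be_null("body")
ge_df.expect_column_values_to_be_in_set("category", ["guide", "reference", "release_note"])

result = ge_df.validate()
if not result.success:
    print("Expectation suite failed; reject this ingestion batch")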

Production failure taxonomy

Coverage drift: The query distribution shifts (new product launched, regulation changed) while the corpus stays static. Miss rate climbs but everything else looks healthy. Detection: track query topics over time and compare against corpus coverage.
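
One way to make that detection concrete: label recent queries and corpus documents with the same topic scheme and diff the two distributions. The sketch below assumes you already have a topic label per query and a topic count per corpus document; how you assign those labels (a lightweight classifier, keyword rules) is up to you.

from collections import Counter

def coverage_gaps(
    recent_query_topics: list[str],
    corpus_topic_counts: Counter,
    min_queries: int = 20,
) -> list[str]:
    # Topics users ask about often but the corpus does not cover at all.
    query_counts = Counter(recent_query_topics)
    return [
        topic
        for topic, n in query_counts.most_common()
        if n >= min_queries and corpus_topic_counts.get(topic, 0) == 0
    ]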

Ingestion regression: A pipeline change silently drops a document class (e.g., PDFs that fail to parse). The corpus shrinks but chunk count per document looks normal. Detection: track document count per source separately from chunk count.
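
A minimal sketch of that per-source check, assuming each ingestion run records a document count per source alongside the corpus snapshot:

def detect_ingestion_regression(
    previous_counts: dict[str, int],
    current_counts: dict[str, int],
    max_drop: float = 0.2,
) -> list[str]:
    # Flag any source whose document count fell by more than max_drop between runs,
    # including sources that disappeared entirely.
    alerts = []
    for source, prev in previous_counts.items():
        curr = current_counts.get(source, 0)
        if prev > 0 and (prev - curr) / prev > max_drop:
            alerts.append(f"{source}: document count dropped from {prev} to {curr}")
    return alerts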

Boilerplate contamination: Legal disclaimers, nav menus, and cookie banners appear in chunks because HTML cleaning was imperfect. These chunks win retrieval for generic queries and displace relevant content. Detection: flag chunks with entropy below a threshold — boilerplate has low lexical diversity.
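
A simple version of that entropy check uses Shannon entropy over the word distribution of each chunk. The threshold is illustrative and should be calibrated against known-good chunks; very short chunks also score low and may need a separate length filter.

import math
from collections import Counter

def lexical_entropy(text: str) -> float:
    # Shannon entropy (in bits) of the word distribution; repetitive boilerplate
    # concentrates probability mass on a few tokens and scores low.
    words = text.lower().split()
    if not words:
        return 0.0
    counts = Counter(words)
    total = len(words)
    return -sum((c / total) * math.log2(c / total) for c in counts.values())

def flag_low_diversity_chunks(chunks: list[str], min_entropy: float = 3.5) -> list[str]:
    return [c for c in chunks if lexical_entropy(c) < min_entropy]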

Stale embedding model mismatch: Corpus was embedded with model version A; new queries are embedded with model version B after a library upgrade. Semantic similarity scores degrade silently. Detection: version your embedding model alongside your corpus and validate consistency before deploying a new model.
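
The guard can be as simple as recording the embedding model identifier alongside each corpus snapshot and refusing to serve queries when it differs from the query-time model. The embedding_model field below is an assumption, not something the CorpusVersion class above stores by default.

def check_embedding_consistency(corpus_meta: dict, query_model: str) -> None:
    # Fail fast rather than silently serving degraded similarity scores.
    indexed_model = corpus_meta.get("embedding_model")
    if indexed_model != query_model:
        raise RuntimeError(
            f"Corpus was embedded with {indexed_model!r} but queries use {query_model!r}; "
            "re-embed the corpus or pin the query-time model before serving."
        )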

Circular synthetic contamination: LLM-generated summaries are ingested alongside source documents. The model then retrieves its own outputs as context and amplifies whatever biases or errors were in those summaries. Detection: tag synthetic content and track retrieval frequency per content type.
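
A sketch of that tracking, assuming every chunk carries a provenance tag set at ingestion (the tag names here are illustrative):

from collections import Counter

class ProvenanceTracker:
    # Counts how often each content type (original source vs. synthetic summary)
    # is returned by retrieval, so synthetic amplification becomes visible early.

    def __init__(self):
        self.retrievals = Counter()

    def record(self, retrieved_chunks: list[dict]) -> None:
        for chunk in retrieved_chunks:
            self.retrievals[chunk.get("provenance", "unknown")] += 1

    def synthetic_share(self) -> float:
        total = sum(self.retrievals.values())
        return self.retrievals["synthetic_summary"] / total if total else 0.0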

Unstructured ETL at scale

For large corpora, parsing performance matters. Key tools:

  • Unstructured.io — handles PDF, DOCX, HTML, PPTX with layout-aware chunking; designed for AI pipelines
  • Apache Tika — broad format support via JVM; useful for legacy enterprise formats
  • LlamaParse — LLM-powered parsing for complex PDFs with tables and figures

For freshness tracking at scale, maintain a document fingerprint store — a hash of each document’s content at last ingestion. On each run, compare current hashes against stored hashes. Only re-embed documents that changed. This reduces embedding cost by 80–90% on mature corpora where most content is stable.
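
A minimal fingerprint store can be a JSON file mapping document IDs to content hashes; the sketch below assumes each document carries an id and a body field.

import hashlib
import json
from pathlib import Path

def changed_documents(docs: list[dict], fingerprint_path: Path) -> list[dict]:
    # Compare each document's current content hash against the stored fingerprint
    # and return only the documents that need re-embedding.
    stored = json.loads(fingerprint_path.read_text()) if fingerprint_path.exists() else {}
    changed, fingerprints = [], {}
    for doc in docs:
        digest = hashlib.sha256(doc["body"].encode()).hexdigest()
        fingerprints[doc["id"]] = digest
        if stored.get(doc["id"]) != digest:
            changed.append(doc)
    fingerprint_path.write_text(json.dumps(fingerprints, indent=2))
    return changed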

Further reading

  • Data Management for Machine Learning Systems; Shankar et al., 2023. Taxonomy of data management challenges specific to ML pipelines; the “data cascades” concept applies directly to RAG corpus quality.
  • Great Expectations documentation; Superconductive, 2023. Practical reference for data contract implementation; the “expectation suite” pattern maps cleanly to AI pipeline contracts.
  • The Data Quality Imperative; Batini et al., 2024. Survey of data quality dimensions and measurement methods; theoretical foundation for completeness and consistency metrics.

Data Engineering for AI Systems — Check your understanding

Q1

Your RAG system's retrieval quality was excellent at launch but has been degrading for two months. Your embedding model, chunk size, and retrieval parameters are unchanged. What is the most likely root cause?

Q2

You implement a data contract that validates document schema before ingestion. After deployment, users report that the system confidently gives outdated answers on product pricing. Why didn't your contract catch this?

Q3

You want to re-embed only documents that changed since the last ingestion run to reduce embedding API cost. What data structure makes this most efficient?

Q4

Your query miss rate — the fraction of queries that retrieve no chunk above the relevance threshold — is stable at 3% for the first six months, then spikes to 18% after a product launch. The corpus ingestion pipeline shows no errors. What is the most likely explanation?

Q5

You ingest LLM-generated summaries of your source documents alongside the original documents to improve retrieval. Six months later, your system starts confidently repeating a specific factual error across multiple query types. Tracing the error, you find it originated in a hallucinated summary that was ingested and is now being retrieved frequently. Which data engineering principle does this violate?