
Document Retrieval & RAG (Retrieval-Augmented Generation)

A comprehensive, story-driven guide to Retrieval-Augmented Generation — covering every component from embeddings and chunking to vector databases, re-ranking, advanced patterns like Graph RAG and Agentic RAG, RAGAS evaluation, and production best practices, with fully colour-coded Python examples throughout.

Section 01

The Story That Explains RAG

The Detective and the Archive Room
Imagine a brilliant detective — let's call her Aria — who has read thousands of books, reports, and case files. She has tremendous general knowledge: criminology, chemistry, psychology, law. But last week, a new murder case arrived with clues she has never seen before. She cannot possibly have memorised that case file — it didn't exist when she studied.

So Aria does what every smart detective does: she walks into the archive room, searches for the most relevant folders, reads the specific pages that matter, and then reasons over them using everything she already knows.

She does not guess from memory. She retrieves, then generates.

That is Retrieval-Augmented Generation — RAG — in one sentence. A language model that retrieves relevant documents at inference time and uses them as live context before generating an answer.

Before RAG existed, language models were pure memory machines. They memorised patterns from training data and hallucinated confidently when asked about anything outside that knowledge. RAG changed everything — it turned a static, closed-book student into an open-book reasoner. And in this tutorial, we build every piece of it from scratch in pure Python, using ChromaDB as our vector store — no heavy frameworks in between.

💡
The Three Problems RAG Solves

LLMs have a knowledge cutoff: they cannot know what happened after training. They have a context limit: you can't stuff an entire company wiki into a prompt. And they hallucinate: they can fabricate citations and figures with complete confidence. RAG mitigates all three by grounding generation in retrieved, verifiable evidence, although it does not eliminate hallucination outright.


Section 02

Architecture Overview — What We Are Building

Our stack is deliberately minimal and transparent: pure Python for chunking and orchestration, sentence-transformers for embeddings, ChromaDB for vector storage and retrieval, and the OpenAI API for generation. No LangChain. No magic abstractions. Every line is yours to understand and control.

🥊 The Complete RAG Pipeline — 7 Steps
Step 1
Load Documents — Read raw files: PDFs, plain text, Markdown, HTML
Step 2
Chunk — Split documents into overlapping text windows with a custom recursive splitter
Step 3
Embed — Convert each chunk into a dense vector with sentence-transformers
Step 4
Index — Store vectors + metadata + raw text in ChromaDB with upsert
Step 5
Retrieve — Embed the user query; ChromaDB returns top-k nearest chunks
Step 6
Re-rank — Cross-encoder scores each candidate against the query; keep top-3
Step 7
Generate — Inject context into a strict prompt; call the LLM; return grounded answer with citations
🔑
Why No LangChain?

LangChain is powerful but opaque. When something breaks — wrong chunks retrieved, bad prompt format, silent truncation — tracing the bug through abstraction layers takes hours. Building RAG yourself means you own every decision: chunk size, prompt template, similarity threshold, re-rank cutoff. That control is what separates production engineers from tutorial-followers.

Library               | Role                                            | Install
chromadb              | Vector store: persist, query, filter            | pip install chromadb
sentence-transformers | Bi-encoder embeddings + cross-encoder re-ranking | pip install sentence-transformers
openai                | LLM generation via API                          | pip install openai
pypdf                 | PDF text extraction                             | pip install pypdf
tiktoken              | Token-accurate chunk sizing                     | pip install tiktoken

Section 03

Step 1 — Loading Documents

A real RAG system must handle multiple file types. We write a single load_documents() dispatcher that returns a list of Document dataclass objects — each holding raw text and a metadata dict. No magic, no dependencies beyond pypdf.

import os, re, hashlib, json
from dataclasses import dataclass, field
from pathlib     import Path
from typing      import List, Dict, Any, Tuple
from pypdf        import PdfReader


# ── Core data structure ───────────────────────────────────
@dataclass
class Document:
    text:     str
    metadata: Dict[str, Any] = field(default_factory=dict)


# ── Per-format loaders ────────────────────────────────────
def load_pdf(path: str) -> List[Document]:
    reader = PdfReader(path)
    docs   = []
    for i, page in enumerate(reader.pages):
        text = page.extract_text() or ""
        if text.strip():
            docs.append(Document(
                text=text.strip(),
                metadata={"source": path, "page": i + 1, "type": "pdf"}
            ))
    return docs


def load_txt(path: str) -> List[Document]:
    text = Path(path).read_text(encoding="utf-8", errors="ignore")
    return [Document(text=text, metadata={"source": path, "type": "txt"})]


def load_markdown(path: str) -> List[Document]:
    text = Path(path).read_text(encoding="utf-8", errors="ignore")
    text = re.sub(r"(?m)^#{1,6}\s+", "", text)           # heading markers at line start only
    text = re.sub(r"\*\*|__", "", text)                  # bold markers
    text = re.sub(r"(?<!\w)[*_]|[*_](?!\w)", "", text)   # italic markers; keeps snake_case intact
    text = re.sub(r"`{1,3}", "", text)                   # inline code ticks and code fences
    return [Document(text=text, metadata={"source": path, "type": "markdown"})]


# ── Extension → loader map ────────────────────────────────
LOADERS = {
    ".pdf":      load_pdf,
    ".txt":      load_txt,
    ".md":       load_markdown,
    ".markdown": load_markdown,
}


def load_documents(directory: str) -> List[Document]:
    docs = []
    for root, _, files in os.walk(directory):
        for fname in files:
            ext   = Path(fname).suffix.lower()
            if ext in LOADERS:
                fpath = os.path.join(root, fname)
                try:
                    docs.extend(LOADERS[ext](fpath))
                except Exception as e:
                    print(f"  [WARN] {fpath}: {e}")
    print(f"Loaded {len(docs)} document pages from '{directory}'")
    return docs


# ── Test ──────────────────────────────────────────────────
docs = load_documents("./docs")
print(docs[0].text[:120])
print(docs[0].metadata)
OUTPUT
Loaded 18 document pages from './docs'
Welcome to Acme Corp — Employee Handbook v3.2
This document outlines the policies, benefits, and expectations for all
{'source': './docs/hr_handbook.pdf', 'page': 1, 'type': 'pdf'}
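
Step 1 of the pipeline lists HTML among the input formats, but the LOADERS map above has no entry for it. The following is a minimal sketch of how the gap could be filled with the standard library only. The names `_TextExtractor` and `html_to_text` are hypothetical and not part of the original code, and real-world pages may need a more tolerant parser.

```python
from html.parser import HTMLParser


class _TextExtractor(HTMLParser):
    """Collects visible text and skips the bodies of <script> and <style>."""

    def __init__(self):
        super().__init__()
        self._skip_depth = 0   # > 0 while inside <script> or <style>
        self._parts = []

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Each text node becomes its own line, so inline tags split a sentence
        if self._skip_depth == 0 and data.strip():
            self._parts.append(data.strip())

    def text(self) -> str:
        return "\n".join(self._parts)


def html_to_text(html: str) -> str:
    parser = _TextExtractor()
    parser.feed(html)
    parser.close()
    return parser.text()


page = "<html><body><h1>Refunds</h1><p>Within 14 days.</p><script>track()</script></body></html>"
print(html_to_text(page))
```

To wire this in, a `load_html(path)` function would read the file, pass its contents through `html_to_text`, wrap the result in a Document with `"type": "html"`, and be registered under `".html"` in LOADERS.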

Section 04

Step 2 — Chunking — The Most Underrated Step in RAG

The Photocopier Problem
Imagine feeding a 200-page legal contract into a photocopier and cutting the output into random strips of paper — some strips cut mid-sentence, some mid-table, some mid-clause. Now try answering "What are the termination conditions?" by reading only three random strips.

That is what bad chunking does to your RAG system. The LLM receives fragments instead of coherent passages. It confabulates the missing pieces — and you blame the model when the real culprit is the scissors.

Good chunking preserves semantic completeness. Every chunk should be a self-contained unit of meaning, with enough overlap to preserve context at boundaries.
⚠️
Chunk Size Is a Retrieval–Context Trade-off

Small chunks (128–256 tokens): precise retrieval, but thin context — the LLM may lack surrounding information to answer fully.
Large chunks (1024+ tokens): rich context, but lower retrieval precision — the embedding averages too many topics and the wrong chunk can win.
Sweet spot in practice: 400–600 tokens with 10–15% overlap.
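
To see what the trade-off costs at indexing time, it helps to estimate how many chunks a document produces. The sketch below assumes plain fixed-stride windows (each new chunk starts `chunk_size - chunk_overlap` tokens after the previous one); a separator-aware splitter will produce somewhat different counts. The function name `estimate_chunk_count` is hypothetical.

```python
import math


def estimate_chunk_count(n_tokens: int, chunk_size: int = 500, chunk_overlap: int = 60) -> int:
    """Approximate number of overlapping windows needed to cover n_tokens."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    if n_tokens <= 0:
        return 0
    if n_tokens <= chunk_size:
        return 1
    stride = chunk_size - chunk_overlap          # new tokens covered per extra chunk
    return math.ceil((n_tokens - chunk_overlap) / stride)


# Compare three settings on a 50,000-token document
for size, overlap in [(128, 16), (500, 60), (1024, 128)]:
    print(f"size={size:<5} overlap={overlap:<4} chunks={estimate_chunk_count(50_000, size, overlap)}")
```

Smaller chunks mean more vectors to embed, store, and search, which is the indexing-side price of the sharper retrieval described above.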

import tiktoken

# ── Token counter (cl100k = GPT-4 tokeniser) ─────────────
_enc = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(_enc.encode(text))


# ── Recursive character-aware splitter ───────────────────
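def split_text(
    text:          str,
    chunk_size:    int  = 500,
    chunk_overlap: int  = 60,
    separators:    list = None,
) -> List[str]:
    if separators is None:
        separators = ["\n\n", "\n", ". ", " ", ""]

    def _split(txt: str, seps: list) -> List[str]:
        if count_tokens(txt) <= chunk_size:
            return [txt] if txt.strip() else []

        sep   = seps[0] if seps else ""
        parts = txt.split(sep) if sep else list(txt)

        if len(parts) == 1:
            if not sep:             # single character: cannot split further
                return [txt]
            return _split(txt, seps[1:])   # separator not found: try the next one

        chunks, current = [], ""
        for part in parts:
            # Join with the separator only when there is something to join to,
            # otherwise the first chunk would start with a stray separator.
            candidate = (current + sep + part).strip() if current else part.strip()
            if count_tokens(candidate) <= chunk_size:
                current = candidate
                continue
            if current:
                chunks.append(current)
            if count_tokens(part) > chunk_size:
                # A single part is too large on its own: recurse with finer separators
                chunks.extend(_split(part, seps[1:]))
                current = ""
                continue
            # Carry the tail of the last chunk forward as overlap.
            # NOTE: the overlap is counted in whitespace-separated words, not tokens.
            overlap_words = current.split()[-chunk_overlap:] if chunk_overlap > 0 else []
            current = (" ".join(overlap_words) + " " + part).strip()
            if count_tokens(current) > chunk_size:
                current = part.strip()      # overlap would overflow the budget: drop it
        if current:
            chunks.append(current)
        return chunks

    return _split(text, separators)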


# ── Chunk all loaded documents ────────────────────────────
def chunk_documents(
    docs:          List[Document],
    chunk_size:    int = 500,
    chunk_overlap: int = 60,
) -> List[Document]:
    chunks = []
    for doc in docs:
        for i, t in enumerate(split_text(doc.text, chunk_size, chunk_overlap)):
            chunks.append(Document(
                text=t,
                metadata={**doc.metadata, "chunk_index": i}
            ))
    print(f"Split {len(docs)} pages → {len(chunks)} chunks")
    return chunks


chunks = chunk_documents(docs, chunk_size=500, chunk_overlap=60)
sizes  = [count_tokens(c.text) for c in chunks]
print(f"Token stats — min:{min(sizes)}  avg:{sum(sizes)//len(sizes)}  max:{max(sizes)}")
OUTPUT
Split 18 pages → 134 chunks
Token stats — min:42  avg:461  max:500
✏️
Fixed-Size
Simplest
Split every N tokens, optionally with overlap. Fast and predictable — but cuts mid-sentence and mid-concept without respect for language boundaries.
📄
Recursive Character Split
What We Built Above
Try paragraphs first, then sentences, then words. Respects natural language structure. The production-grade workhorse for most document types.
🧠
Semantic Chunking
State of the Art
Embed each sentence; split when cosine similarity between adjacent sentences drops below a threshold — i.e. when the topic actually changes. Most coherent chunks, highest indexing cost.
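
The semantic chunking idea described above can be sketched in a few lines. This is a simplified illustration, not a tuned implementation: it assumes sentences are already split and embedded, uses toy 2-D vectors in place of real embeddings, and the `threshold` default is an arbitrary assumption. The names `cosine` and `semantic_chunks` are hypothetical.

```python
import math
from typing import List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def semantic_chunks(
    sentences: List[str],
    vectors: List[Sequence[float]],
    threshold: float = 0.5,
) -> List[str]:
    """Start a new chunk whenever adjacent sentences fall below the threshold."""
    if len(sentences) != len(vectors):
        raise ValueError("need exactly one vector per sentence")
    if not sentences:
        return []
    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        if cosine(vectors[i - 1], vectors[i]) < threshold:
            chunks.append(" ".join(current))   # topic changed: close the chunk
            current = []
        current.append(sentences[i])
    chunks.append(" ".join(current))
    return chunks


# Toy 2-D vectors stand in for real sentence embeddings
sents = ["Refunds take 14 days.", "Refunds need a receipt.", "The office opens at 9."]
vecs = [(1.0, 0.0), (0.9, 0.1), (0.0, 1.0)]
print(semantic_chunks(sents, vecs))
```

In a real pipeline the vectors would come from the same embedding model used for indexing, which is why this strategy has the highest indexing cost: every sentence is embedded before any chunk exists.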

Section 05

Step 3 — Embeddings — Turning Text into Numbers That Think

The Library Where Books Float in Space
Imagine a magical library where every book floats in a room with 384 dimensions. Books about similar topics float near each other. "Machine Learning" and "Deep Learning" hover millimetres apart. "Baking Sourdough" drifts miles away in the opposite direction.

When you walk in with a question — "How do neural networks learn?" — you transform your question into a point in that same space, then look for the books floating closest to you.

That transformation — text → point in space — is an embedding. Two sentences that mean the same thing produce vectors with very high cosine similarity, even if they share almost no words.
from sentence_transformers import SentenceTransformer
import numpy as np

# ── Load bi-encoder (runs locally, no API key needed) ─────
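# 'all-MiniLM-L6-v2'      → fast, 384-dim, excellent English
#                           (truncates input beyond 256 word pieces by default,
#                            so a 500-token chunk is only partly embedded)
# 'BAAI/bge-base-en-v1.5' → more accurate, 768-dim, 512-token input limit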
embed_model = SentenceTransformer("all-MiniLM-L6-v2")


def embed_texts(texts: List[str], batch_size: int = 64) -> np.ndarray:
    """Returns float32 array shape (N, dim). Pre-normalised for cosine."""
    return embed_model.encode(
        texts,
        batch_size=batch_size,
        show_progress_bar=False,
        normalize_embeddings=True,   # cosine sim = dot product
    )


# ── Quick similarity demo ─────────────────────────────────
sentences = [
    "What is the refund policy for digital products?",  # query
    "Digital items can be returned within 30 days.",     # relevant
    "The office is open Monday through Friday.",          # irrelevant
]
vecs = embed_texts(sentences)
q    = vecs[0]
for i, s in enumerate(sentences[1:], 1):
    sim = float(np.dot(q, vecs[i]))   # normalised → cosine = dot
    print(f"  [{sim:.4f}]  {s}")
print(f"Embedding dim: {vecs.shape[1]}")
OUTPUT
  [0.8271]  Digital items can be returned within 30 days.
  [0.1043]  The office is open Monday through Friday.
Embedding dim: 384
📊
Why normalize_embeddings=True?

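Normalising to unit length turns cosine similarity into a plain dot product, which is cheap to compute. ChromaDB indexes and queries with cosine distance (1 minus cosine similarity) when "hnsw:space": "cosine" is set, so pre-normalised vectors are scored consistently whether you compare them yourself with a dot product or let ChromaDB do it. Note that the HNSW index performs approximate nearest-neighbour search, so results are near-exact rather than guaranteed exact.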
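
The identity behind this is easy to check by hand. The sketch below uses plain Python lists rather than the embedding model, and the helper names (`l2_normalize`, `dot`, `cosine`) are illustrative only.

```python
import math


def l2_normalize(v):
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v] if norm else list(v)


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine(a, b):
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


a, b = [3.0, 4.0], [4.0, 3.0]
ua, ub = l2_normalize(a), l2_normalize(b)

print(cosine(a, b))        # cosine similarity on the raw vectors
print(dot(ua, ub))         # plain dot product on the unit vectors
print(1.0 - dot(ua, ub))   # the corresponding cosine distance
```

The first two values agree up to floating-point rounding, and the third is the quantity a cosine-space index reports as a distance.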


Section 06

Step 4 — Indexing with ChromaDB

The Librarian Who Reads Minds
A traditional database is a librarian who indexes books alphabetically. Ask "books about dogs" and she searches the exact word "dogs" in her index cards.

ChromaDB is a librarian who has read every book and feels the meaning of your question. You say "I want something about loyalty and warmth" and she pulls the right books even though you never said "dog". She reaches into 384-dimensional conceptual space and finds the closest entries, typically within milliseconds for a collection of this size.

ChromaDB stores your embeddings, raw text, and metadata together. One query returns all three — so you never need to re-fetch anything.
import chromadb
from chromadb.config import Settings


# ── Persistent client — survives restarts ─────────────────
chroma_client = chromadb.PersistentClient(
    path="./chroma_db",
    settings=Settings(anonymized_telemetry=False)
)

COLLECTION = "company_knowledge"

collection = chroma_client.get_or_create_collection(
    name=COLLECTION,
    metadata={"hnsw:space": "cosine"}   # cosine similarity index
)


# ── Deterministic chunk ID — same content = same ID ───────
def make_id(doc: Document, global_index: int) -> str:
    raw = doc.metadata.get("source", "?") + str(global_index) + doc.text[:80]
    return hashlib.md5(raw.encode()).hexdigest()


# ── Batch-index all chunks ────────────────────────────────
def index_chunks(chunks: List[Document], batch_size: int = 100) -> None:
    total = len(chunks)
    for start in range(0, total, batch_size):
        batch = chunks[start : start + batch_size]
        texts = [c.text          for c in batch]
        ids   = [make_id(c, start + i) for i, c in enumerate(batch)]
        metas = [c.metadata      for c in batch]
        embs  = embed_texts(texts).tolist()

        collection.upsert(          # upsert = add OR overwrite safely
            ids=ids,
            documents=texts,
            embeddings=embs,
            metadatas=metas,
        )
        done = min(start + batch_size, total)
        print(f"  Indexed {done}/{total} chunks", end="\r")

    print(f"\nIndex complete. Collection size: {collection.count()} chunks")


index_chunks(chunks)
print("Persisted to ./chroma_db — survives restart.")
OUTPUT
  Indexed 134/134 chunks
Index complete. Collection size: 134 chunks
Persisted to ./chroma_db — survives restart.
🌟
Why upsert instead of add?

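add() does not update a chunk whose ID already exists (depending on the ChromaDB version it raises an error or skips the record). upsert() overwrites, so re-running your indexing pipeline after a document update is safe and idempotent. Combined with deterministic MD5 IDs, unchanged chunks are overwritten with identical data and new chunks are inserted. One caveat: make_id() hashes the chunk's position as well as its text, so when a document is edited, the chunks after the edit get new IDs and the old entries stay in the collection until you delete them by source.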
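
If positional IDs are a concern, one alternative is to derive the ID from the source path and the full chunk text only. This is a sketch of that idea, not the tutorial's method; `content_id` is a hypothetical helper and the 32-character truncation is an arbitrary choice.

```python
import hashlib


def content_id(source: str, text: str) -> str:
    """Hypothetical alternative to make_id(): no positional index in the hash."""
    h = hashlib.sha256()
    h.update(source.encode("utf-8"))
    h.update(b"\x00")                  # separator so ("ab", "c") differs from ("a", "bc")
    h.update(text.encode("utf-8"))
    return h.hexdigest()[:32]


same_a = content_id("./docs/policies.pdf", "Digital products: full refund within 14 days.")
same_b = content_id("./docs/policies.pdf", "Digital products: full refund within 14 days.")
edited = content_id("./docs/policies.pdf", "Digital products: full refund within 30 days.")
print(same_a == same_b, same_a == edited)
```

Two things to keep in mind with this scheme. Identical chunks from the same file collapse to one ID, so deduplicate the ID list before a batch upsert. And an edited chunk still gets a new ID, so stale entries must still be removed, for example by deleting everything with a matching source before re-indexing a changed file.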


Section 07

Step 5 — Retrieval — Querying ChromaDB

Retrieval embeds the user query and asks ChromaDB for the k most similar chunks. ChromaDB returns distances, documents, and metadata in a single round-trip call. We also show metadata filtering — restricting search to a specific department, year, or file type before the similarity step runs.

# ── Core retrieval function ───────────────────────────────
def retrieve(
    query: str,
    k:     int  = 10,
    where: dict = None,
) -> List[Document]:
    """
    Retrieve top-k chunks for a query.
    where: ChromaDB metadata filter, e.g.:
           {"department": {"$eq": "HR"}}
           {"$and": [{"type": {"$eq": "pdf"}}, {"year": {"$gte": 2023}}]}
    """
    q_emb = embed_texts([query]).tolist()
    kw = {
        "query_embeddings": q_emb,
        "n_results":        k,
        "include":          ["documents", "metadatas", "distances"],
    }
    if where:
        kw["where"] = where

    r    = collection.query(**kw)
    docs = []
    for text, meta, dist in zip(
        r["documents"][0], r["metadatas"][0], r["distances"][0]
    ):
        meta["_distance"] = dist
        docs.append(Document(text=text, metadata=meta))
    return docs


# ── Unfiltered retrieval ──────────────────────────────────
query = "What is the refund policy for digital products?"
hits  = retrieve(query, k=5)
print(f"Top {len(hits)} results:\n")
for h in hits:
    print(f"  [dist={h.metadata['_distance']:.4f}] {h.metadata['source']}  {h.text[:70]}...")

print()

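# ── Filtered retrieval: PDF chunks with year >= 2023 ──────
# Assumes each chunk carries a numeric "year" metadata field. The loaders in
# Step 1 do not set one, so add it at load time or this filter matches nothing.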
filtered = retrieve(
    "How many vacation days do employees get?",
    k=5,
    where={"$and": [{"type": {"$eq": "pdf"}}, {"year": {"$gte": 2023}}]}
)
print(f"Filtered results ({len(filtered)}):")
for h in filtered:
    print(f"  {h.metadata['source']}  →  {h.text[:70]}...")
OUTPUT
Top 5 results:

  [dist=0.0821] ./docs/policies.pdf  Digital products: full refund within 14 days of purchase...
  [dist=0.1134] ./docs/policies.pdf  For physical goods, return shipping must be pre-paid and...
  [dist=0.2341] ./docs/faq.txt  If your download fails, contact support for a replacement...
  [dist=0.3102] ./docs/terms.pdf  Section 9.3 — Cancellation and Refund Terms apply to all...
  [dist=0.4219] ./docs/hr_handbook.pdf  Employee expenses are reimbursed within 5 business days...

Filtered results (3):
  ./docs/hr_handbook.pdf  →  Annual leave entitlement is 25 days per year for full-time...
  ./docs/policies.pdf  →  Public holidays are in addition to the 25-day annual leave...
  ./docs/benefits.pdf  →  Unused leave may be carried over up to a maximum of 5 days...
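
Top-k retrieval always returns k results, even when nothing in the index is relevant. A distance cutoff is one way to drop weak matches before they reach the prompt. The sketch below works on plain dicts so it runs on its own; in the pipeline above the distance lives in `doc.metadata["_distance"]`. The function names and the 0.35 cutoff are assumptions, and a sensible cutoff depends on the embedding model and the corpus.

```python
from typing import Dict, List


def to_similarity(distance: float) -> float:
    """Cosine distance is 1 - cosine similarity, so invert it for readability."""
    return 1.0 - distance


def filter_by_distance(hits: List[Dict], max_distance: float = 0.35) -> List[Dict]:
    """Keep hits at or below max_distance, closest first."""
    kept = [h for h in hits if h["distance"] <= max_distance]
    return sorted(kept, key=lambda h: h["distance"])


# Distances taken from the sample output above
hits = [
    {"source": "./docs/policies.pdf",    "distance": 0.0821},
    {"source": "./docs/policies.pdf",    "distance": 0.1134},
    {"source": "./docs/faq.txt",         "distance": 0.2341},
    {"source": "./docs/terms.pdf",       "distance": 0.3102},
    {"source": "./docs/hr_handbook.pdf", "distance": 0.4219},
]
for h in filter_by_distance(hits):
    print(f"sim={to_similarity(h['distance']):.4f}  {h['source']}")
```

With this cutoff the expense-reimbursement chunk from the HR handbook is dropped, which matches the intuition that it is unrelated to refunds.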

Section 08

Step 6 — Re-Ranking — Precision After Recall

The Fisherman and the Sorter
Dense retrieval is like casting a wide net — it pulls up many fish quickly. Some are exactly what you want. Some are topically adjacent but contextually wrong. A few are seaweed.

Re-ranking is the expert sorter on the boat. After the net is hauled in, she holds each fish next to your specific order form and scores them individually. She separates the precise matches from the near-misses.

The bi-encoder retrieves fast (the net). The cross-encoder re-ranks accurately (the sorter). Together: speed at scale, precision at delivery.

A cross-encoder takes the query and candidate document together as a single input, letting the model attend across both simultaneously. Far more accurate than comparing separate embeddings — but too slow to run on the full corpus. The pattern: retrieve top-20 fast with ChromaDB, then re-rank to top-3 with the cross-encoder.

from sentence_transformers import CrossEncoder

# ── Load cross-encoder re-ranker ──────────────────────────
# ms-marco-MiniLM-L-6-v2: fast + accurate passage relevance scorer
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")


def rerank(
    query:      str,
    candidates: List[Document],
    top_n:      int = 3,
) -> List[Document]:
    """
    Score each (query, chunk) pair with the cross-encoder.
    Attaches '_rerank_score' to metadata; returns top_n sorted desc.
    """
    if not candidates:                            # nothing retrieved, nothing to score
        return []
    pairs  = [[query, doc.text] for doc in candidates]
    scores = reranker.predict(pairs)              # shape: (N,)

    ranked = sorted(
        zip(scores, candidates),
        key=lambda x: x[0],
        reverse=True,
    )
    for score, doc in ranked:
        doc.metadata["_rerank_score"] = float(score)

    return [doc for _, doc in ranked[:top_n]]


# ── Two-stage pipeline in action ──────────────────────────
query      = "Can I get a refund if I already downloaded the file?"
candidates = retrieve(query, k=20)           # cast wide net
top_docs   = rerank(query, candidates, top_n=3)  # precision sort

print("After re-ranking — Top 3 passages:\n")
for i, doc in enumerate(top_docs, 1):
    print(f"  [{i}] score={doc.metadata['_rerank_score']:.3f}")
    print(f"       source: {doc.metadata['source']}")
    print(f"       text:   {doc.text[:100]}...\n")
OUTPUT
After re-ranking — Top 3 passages:

  [1] score=9.821
       source: ./docs/policies.pdf
       text:   Digital products are non-refundable once the download has been initiated. Exceptions apply only when the file is corrupt or...

  [2] score=7.334
       source: ./docs/policies.pdf
       text:   For digital downloads, refunds are granted within 14 days only if no download has been recorded in our system...

  [3] score=3.102
       source: ./docs/faq.txt
       text:   If you believe you were charged incorrectly, open a support ticket within 30 days of purchase...
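
The scores above are unbounded rather than probabilities, which makes a fixed cutoff hard to reason about. One common workaround is to squash them with a logistic function and threshold the result. This is a sketch under the assumption that the raw scores behave like logits; the output is not a calibrated probability, and `keep_confident` and its 0.5 default are hypothetical.

```python
import math
from typing import List, Tuple


def sigmoid(x: float) -> float:
    """Numerically stable logistic function."""
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def keep_confident(scored: List[Tuple[float, str]], min_prob: float = 0.5) -> List[Tuple[float, str]]:
    """scored holds (raw_score, text) pairs; keep those with sigmoid(score) >= min_prob."""
    return [(s, t) for s, t in scored if sigmoid(s) >= min_prob]


scored = [
    (9.821, "Digital products are non-refundable once the download has been initiated."),
    (7.334, "For digital downloads, refunds are granted within 14 days only if..."),
    (3.102, "If you believe you were charged incorrectly, open a support ticket..."),
    (-2.5,  "The office is open Monday through Friday."),   # made-up negative example
]
for score, text in keep_confident(scored):
    print(f"{sigmoid(score):.3f}  {text[:50]}")
```

If every candidate falls below the cutoff, returning no context and letting the system prompt's "I don't have that information" rule take over is usually safer than passing weak passages to the LLM.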

Section 09

Step 7 — Generation — Grounded Answers from the LLM

With top_docs in hand, we build a prompt that presents the retrieved context to the LLM and explicitly instructs it to answer only from that evidence. The system prompt enforces faithfulness; the user turn carries the query and context block.

from openai import OpenAI
import os

client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

SYSTEM_PROMPT = """You are a precise, helpful assistant.
Answer the user's question using ONLY the provided context passages.

Rules:
- If the answer is in the context, answer clearly and cite the source filename.
- If the answer is NOT in the context, say exactly:
  "I don't have that information in the provided documents."
- Never guess, infer beyond the context, or use outside knowledge.
- Keep answers concise and factual."""


def build_context_block(docs: List[Document]) -> str:
    parts = []
    for i, doc in enumerate(docs, 1):
        src  = doc.metadata.get("source", "unknown")
        page = doc.metadata.get("page",   "")
        ref  = f"{src} p.{page}" if page else src
        parts.append(f"[{i}] Source: {ref}\n{doc.text}")
    return "\n\n---\n\n".join(parts)


def generate_answer(query: str, context_docs: List[Document]) -> dict:
    context  = build_context_block(context_docs)
    user_msg = f"Context:\n{context}\n\nQuestion: {query}"

    response = client.chat.completions.create(
        model="gpt-4o",
        temperature=0,          # greedy decoding: reduces run-to-run variation
        max_tokens=512,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user",   "content": user_msg},
        ],
    )
    return {
        "answer":  response.choices[0].message.content,
        "sources": [d.metadata.get("source") for d in context_docs],
        "tokens":  response.usage.total_tokens,
    }


# ── Full three-stage pipeline in one call ─────────────────
query      = "Can I get a refund if I already downloaded the file?"
candidates = retrieve(query, k=20)
top_docs   = rerank(query, candidates, top_n=3)
result     = generate_answer(query, top_docs)

print(f"Q: {query}\n")
print(f"A: {result['answer']}\n")
print(f"Sources: {result['sources']}")
print(f"Tokens:  {result['tokens']}")
OUTPUT
Q: Can I get a refund if I already downloaded the file?

A: According to our policy (policies.pdf), digital products are non-refundable once the download has been initiated. Refunds are only granted within 14 days of purchase if no download has been recorded in the system. If you believe your case is an exception (e.g. a corrupt file), please open a support ticket within 30 days of purchase.

Sources: ['./docs/policies.pdf', './docs/policies.pdf', './docs/faq.txt']
Tokens:  387
Why temperature=0 Is the Sensible Default in RAG

In a RAG system the LLM's job is to read and synthesise, not to create. Temperature > 0 introduces stochastic sampling, so the model may take a token path that drifts from the evidence and introduces subtle hallucinations. At temperature=0 decoding is greedy, which makes answers far more repeatable and tends to keep them closer to the retrieved context. It is not a guarantee: hosted models can still vary slightly between identical calls, and a model can misread its context at any temperature.
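
The prompt built by build_context_block grows with every passage, so it is worth capping the context before the call rather than relying on the API to reject an oversized request. The following is a minimal sketch of a greedy packer. `pack_context` is a hypothetical helper, and its default counter (whitespace words) is a stand-in; in the pipeline above you would pass the tiktoken-based count_tokens instead.

```python
from typing import Callable, List


def pack_context(
    passages: List[str],
    max_tokens: int,
    count_tokens: Callable[[str], int] = lambda s: len(s.split()),
) -> List[str]:
    """Take passages in rank order until the next one would exceed the budget."""
    packed, used = [], 0
    for passage in passages:
        cost = count_tokens(passage)
        if used + cost > max_tokens:
            break                      # stop here so rank order is preserved
        packed.append(passage)
        used += cost
    return packed


ranked = [
    "Digital products are non-refundable once downloaded.",
    "Refunds are granted within 14 days if no download was recorded.",
    "Open a support ticket within 30 days if you were charged incorrectly.",
]
print(pack_context(ranked, max_tokens=18))
```

Stopping at the first passage that does not fit, instead of skipping it and trying smaller ones, keeps the highest-ranked evidence together. Whether that is the right choice depends on how much the ranking is trusted.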


Section 10

Putting It All Together — The RAGPipeline Class

All seven steps are now wrapped into a single, reusable class. Initialise once; call ask() forever. This is the clean, production-ready interface — every component still pure Python and fully inspectable.

class RAGPipeline:

    def __init__(
        self,
        docs_dir:        str = "./docs",
        chroma_path:     str = "./chroma_db",
        collection_name: str = "knowledge",
        embed_model_id:  str = "all-MiniLM-L6-v2",
        rerank_model_id: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
        llm_model:       str = "gpt-4o",
        chunk_size:      int = 500,
        chunk_overlap:   int = 60,
        retrieve_k:      int = 20,
        rerank_top_n:    int = 3,
    ):
        self.llm_model    = llm_model
        self.retrieve_k   = retrieve_k
        self.rerank_top_n = rerank_top_n

        self.embedder = SentenceTransformer(embed_model_id)
        self.reranker = CrossEncoder(rerank_model_id)
        self.oai      = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

        self.chroma = chromadb.PersistentClient(
            path=chroma_path,
            settings=Settings(anonymized_telemetry=False),
        )
        self.col = self.chroma.get_or_create_collection(
            collection_name, metadata={"hnsw:space": "cosine"}
        )
        if self.col.count() == 0:
            print("Empty collection — building index...")
            raw    = load_documents(docs_dir)
            chunks = chunk_documents(raw, chunk_size, chunk_overlap)
            self._upsert(chunks)

    def _embed(self, texts: List[str]) -> np.ndarray:
        return self.embedder.encode(
            texts, normalize_embeddings=True, show_progress_bar=False
        )

    def _upsert(self, chunks: List[Document], bs: int = 100) -> None:
        for s in range(0, len(chunks), bs):
            b = chunks[s : s + bs]
            self.col.upsert(
                ids        = [make_id(c, s + i) for i, c in enumerate(b)],
                documents  = [c.text          for c in b],
                embeddings = self._embed([c.text for c in b]).tolist(),
                metadatas  = [c.metadata      for c in b],
            )
        print(f"Indexed {len(chunks)} chunks. Total: {self.col.count()}")

    def retrieve(self, query: str, where: dict = None) -> List[Document]:
        q_emb = self._embed([query]).tolist()
        kw = {"query_embeddings": q_emb, "n_results": self.retrieve_k,
              "include": ["documents", "metadatas", "distances"]}
        if where: kw["where"] = where
        r = self.col.query(**kw)
        docs = []
        for t, m, d in zip(r["documents"][0], r["metadatas"][0], r["distances"][0]):
            m["_distance"] = d
            docs.append(Document(text=t, metadata=m))
        return docs

    def rerank(self, query: str, docs: List[Document]) -> List[Document]:
        scores = self.reranker.predict([[query, d.text] for d in docs])
        ranked = sorted(zip(scores, docs), key=lambda x: x[0], reverse=True)
        for sc, doc in ranked:
            doc.metadata["_rerank_score"] = float(sc)
        return [d for _, d in ranked[: self.rerank_top_n]]

    def ask(self, query: str, where: dict = None, verbose: bool = False) -> dict:
        candidates = self.retrieve(query, where)
        top_docs   = self.rerank(query, candidates)
        context    = build_context_block(top_docs)
        if verbose:
            print(f"\n── Context ──\n{context}\n")
        resp = self.oai.chat.completions.create(
            model=self.llm_model, temperature=0, max_tokens=512,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user",   "content": f"Context:\n{context}\n\nQuestion: {query}"},
            ],
        )
        return {
            "answer":  resp.choices[0].message.content,
            "sources": [dict(d.metadata) for d in top_docs],
            "tokens":  resp.usage.total_tokens,
        }

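    def add_documents(
        self, new_dir: str, chunk_size: int = 500, chunk_overlap: int = 60
    ) -> None:
        """Add new documents to an existing, live index.

        Pass the same chunk_size / chunk_overlap used when the index was built
        (the constructor does not store them), so old and new chunks match.
        """
        raw    = load_documents(new_dir)
        chunks = chunk_documents(raw, chunk_size, chunk_overlap)
        self._upsert(chunks)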


# ── Usage ─────────────────────────────────────────────────
rag = RAGPipeline(docs_dir="./docs")

result = rag.ask("What are the main risks in the Q3 report?")
print(f"Answer:\n{result['answer']}\n")
for s in result["sources"]:
    print(f"  [{s.get('_rerank_score', 0):.3f}]  {s['source']}")
OUTPUT
Empty collection — building index...
Loaded 18 document pages from './docs'
Split 18 pages → 134 chunks
Indexed 134 chunks. Total: 134
Answer:
The Q3 report identifies three primary risks: (1) supply chain disruptions in Southeast Asia affecting component availability, (2) rising interest rates compressing operating margins by ~1.8%, and (3) regulatory uncertainty around data-privacy compliance in the EU market. [Source: Q3_Report_2024.pdf pp.12-14]

  [9.821]  ./docs/Q3_Report_2024.pdf
  [7.334]  ./docs/Q3_Report_2024.pdf
  [4.102]  ./docs/Risk_Framework.pdf

Section 11

ChromaDB Deep Dive — Modes, Filters, and Operations

ChromaDB supports three client modes, rich metadata filter operators, full-text pre-filtering, and document-level CRUD. Here is everything you need for production use.

import chromadb
from chromadb.config import Settings

# ── Mode 1: In-memory — for tests and CI ─────────────────
mem_client = chromadb.EphemeralClient()

# ── Mode 2: Persistent on disk — local production ────────
disk_client = chromadb.PersistentClient(path="./chroma_db")

# ── Mode 3: Remote server — team / cloud deployment ──────
# server_client = chromadb.HttpClient(host="localhost", port=8000)

# ── Distance metrics ─────────────────────────────────────
# "hnsw:space": "cosine" — best for pre-normalised embeddings
# "hnsw:space": "l2"     — Euclidean (default)
# "hnsw:space": "ip"     — inner product (= cosine if normalised)

col = disk_client.get_or_create_collection(
    "demo",
    metadata={"hnsw:space": "cosine"}
)

# ── Metadata filter operators ─────────────────────────────
# Scalar comparisons:  $eq  $ne  $gt  $gte  $lt  $lte
# Set membership:      $in  $nin
# Logical combiners:   $and  $or

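# Query vector: embed_texts() comes from Step 3; the query text is only an example
q_emb = embed_texts(["How many vacation days do employees get?"]).tolist()

results = col.query(
    query_embeddings=q_emb,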
    n_results=5,
    where={
        "$and": [
            {"department": {"$eq":  "HR"}},
            {"year":       {"$gte": 2023}},
        ]
    },
    where_document={"$contains": "vacation"},   # substring text pre-filter
    include=["documents", "metadatas", "distances"],
)

# ── Peek at stored chunks ─────────────────────────────────
sample = col.peek(limit=3)
for doc, meta in zip(sample["documents"], sample["metadatas"]):
    print(f"  [{meta['source']}] {doc[:60]}...")

# ── Delete by ID list ─────────────────────────────────────
col.delete(ids=["abc123", "def456"])

# ── Delete all chunks from a specific file ────────────────
col.delete(where={"source": {"$eq": "./docs/old_handbook_2019.pdf"}})

# ── Count and list all collections ───────────────────────
print(f"Chunks in collection: {col.count()}")
print(f"All collections: {[c.name for c in disk_client.list_collections()]}")
🔧
Multi-Tenant RAG — One Collection Per Customer

For a SaaS product where each customer has their own documents, give each tenant a separate ChromaDB collection: get_or_create_collection(f"tenant_{tenant_id}"). Collections are logically isolated — one tenant's search never touches another's data. Faster, simpler, and more secure than filtering a shared collection by a tenant metadata field.
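
ChromaDB restricts collection names (length and allowed characters; the exact rules depend on the version, so check the documentation for yours). Raw tenant identifiers such as email addresses or long UUID strings may not qualify. One hedge is to derive the collection name from a hash of the tenant ID. The helper below is a hypothetical sketch, not part of ChromaDB.

```python
import hashlib


def tenant_collection_name(tenant_id: str) -> str:
    """Map any tenant identifier to a short, stable, alphanumeric collection name."""
    digest = hashlib.sha256(tenant_id.encode("utf-8")).hexdigest()
    return f"tenant_{digest[:32]}"     # 39 characters: letters, digits, underscore


for tid in ["acme-corp", "jane.doe@example.com", "客户-42"]:
    print(tid, "->", tenant_collection_name(tid))
```

The mapping is one-way, so keep the original tenant ID in the collection's metadata or in your own database if you need to go from a collection back to its owner.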


Section 12

Incremental Indexing — Keeping the Index Fresh

Documents change. New files appear. Old ones are deleted. We need a pipeline that runs on a schedule and only processes what actually changed — without rebuilding the whole index. We do this by hashing each file's content and comparing to a stored state file.

import hashlib
import json
from pathlib import Path

STATE_FILE = "./index_state.json"


def file_hash(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for blk in iter(lambda: f.read(65536), b""):
            h.update(blk)
    return h.hexdigest()


def load_state() -> dict:
    return json.loads(Path(STATE_FILE).read_text()) \
        if Path(STATE_FILE).exists() else {}


def save_state(state: dict) -> None:
    Path(STATE_FILE).write_text(json.dumps(state, indent=2))


def incremental_index(docs_dir: str, col: chromadb.Collection) -> None:
    state     = load_state()      # {filepath: sha256}
    new_state = {}
    added = deleted = skipped = 0

    all_paths = [
        str(p) for p in Path(docs_dir).rglob("*")
        if p.suffix.lower() in LOADERS
    ]

    # ── Changed / new files ───────────────────────────────
    for path in all_paths:
        fhash             = file_hash(path)
        new_state[path]   = fhash
        if state.get(path) == fhash:
            skipped += 1
            continue                          # unchanged — skip
        col.delete(where={"source": {"$eq": path}})  # remove old chunks
        raw    = LOADERS[Path(path).suffix.lower()](path)
        chunks = chunk_documents(raw)
        index_chunks(chunks)
        added += len(chunks)

    # ── Deleted files ─────────────────────────────────────
    for old_path in state:
        if old_path not in new_state:
            col.delete(where={"source": {"$eq": old_path}})
            deleted += 1

    save_state(new_state)
    print(f"Incremental index: +{added} chunks  -{deleted} files  {skipped} unchanged")


incremental_index("./docs", col=collection)
OUTPUT
Incremental index: +28 chunks -1 files 16 unchanged
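The comparison at the heart of the indexer can be pulled out into a pure function, which makes it easy to unit-test without touching ChromaDB or the file system. This is a sketch only; diff_states is a hypothetical helper, not part of the pipeline above.

```python
from typing import Dict, List, Tuple


def diff_states(
    old: Dict[str, str],
    new: Dict[str, str],
) -> Tuple[List[str], List[str], List[str]]:
    """Compare two {filepath: sha256} maps.

    Returns (changed_or_new, deleted, unchanged), each sorted by path.
    """
    changed = sorted(p for p, h in new.items() if old.get(p) != h)
    deleted = sorted(p for p in old if p not in new)
    unchanged = sorted(p for p, h in new.items() if old.get(p) == h)
    return changed, deleted, unchanged


old = {"a.md": "h1", "b.md": "h2", "c.md": "h3"}
new = {"a.md": "h1", "b.md": "h2-edited", "d.md": "h4"}

changed, deleted, unchanged = diff_states(old, new)
print(changed)    # ['b.md', 'd.md']  -> re-chunk and upsert
print(deleted)    # ['c.md']          -> delete chunks by source
print(unchanged)  # ['a.md']          -> skip
```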

Section 13

Multi-Turn Conversation — Adding Memory to RAG

Standard RAG is stateless — each question is independent. For a chatbot, users expect follow-up questions to work: "What were the risks?" → "Which had the most revenue impact?" The second question needs context from the first. We solve this by condensing the follow-up into a standalone query before retrieval.

History = List[Tuple[str, str]]   # [(user_msg, assistant_msg), ...]


def condense_query(history: History, new_query: str) -> str:
    """
    Rewrite a follow-up question as a standalone question that
    includes all necessary context from the conversation history.
    'Which of those affected revenue?'
    → 'Which of the Q3 risks had the greatest revenue impact?'
    Uses only the last 3 turns to stay within token budget.
    """
    if not history:
        return new_query

    hist_text = "\n".join(
        f"User: {u}\nAssistant: {a}" for u, a in history[-3:]
    )
    prompt = (
        f"Conversation so far:\n{hist_text}\n\n"
        f"Rewrite this follow-up as a standalone question "
        f"that includes all necessary context:\n"
        f"Follow-up: {new_query}\nStandalone:"
    )
    resp = client.chat.completions.create(
        model="gpt-4o-mini", temperature=0, max_tokens=120,
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content.strip()


def chat(
    rag: RAGPipeline,
    history: History,
    user_msg: str,
) -> Tuple[str, History]:
    standalone = condense_query(history, user_msg)
    result     = rag.ask(standalone)
    answer     = result["answer"]
    return answer, history + [(user_msg, answer)]


# ── Demo conversation ─────────────────────────────────────
rag     = RAGPipeline()
history = []

for q in [
    "What are the main risks in the Q3 report?",
    "Which of those had the greatest financial impact?",
    "What mitigation steps were proposed for it?",
]:
    answer, history = chat(rag, history, q)
    print(f"User: {q}")
    print(f"RAG:  {answer}\n")
OUTPUT
User: What are the main risks in the Q3 report?
RAG:  Three primary risks: supply chain disruptions, rising interest rates, and EU data-privacy regulatory uncertainty.

User: Which of those had the greatest financial impact?
RAG:  Supply chain disruptions caused the largest direct financial impact — estimated at £4.2M in Q3 alone. [Source: Q3_Report_2024.pdf p.13]

User: What mitigation steps were proposed for it?
RAG:  For supply chain risk, the report proposes dual-sourcing components from both Vietnam and Malaysia to reduce single-supplier dependency. [Source: Q3_Report_2024.pdf p.15]
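condense_query keeps a fixed window of the last 3 turns. If answers are long, a size-based cut-off may fit the token budget better. The sketch below is one possible variant, not the method used above: trim_history and the character budget are assumptions made for illustration.

```python
from typing import List, Tuple

History = List[Tuple[str, str]]   # [(user_msg, assistant_msg), ...]


def trim_history(history: History, max_chars: int = 2000) -> History:
    """Keep the most recent turns whose combined length fits max_chars.

    Walks backwards from the newest turn and stops at the first turn
    that would exceed the budget, so the kept turns stay contiguous.
    """
    kept: History = []
    used = 0
    for user_msg, assistant_msg in reversed(history):
        size = len(user_msg) + len(assistant_msg)
        if used + size > max_chars:
            break
        kept.append((user_msg, assistant_msg))
        used += size
    kept.reverse()                # restore chronological order
    return kept


history = [
    ("What are the main risks?", "Supply chain, interest rates, regulation."),
    ("Which had the most impact?", "Supply chain disruptions."),
    ("What mitigation was proposed?", "Dual-sourcing components."),
]
for user_msg, assistant_msg in trim_history(history, max_chars=120):
    print(f"User: {user_msg}\nAssistant: {assistant_msg}")
```

Inside condense_query, history[-3:] could then be replaced by trim_history(history), with max_chars tuned to the model's context size.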

Section 14

Evaluating RAG Quality — RAGAS Metrics

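You cannot improve what you cannot measure. The RAGAS framework provides LLM-graded metrics covering both retrieval and generation quality; the six below are the most useful. Faithfulness and answer relevancy are reference-free, while the recall and correctness metrics are scored against a ground-truth answer. The code that follows computes the first four.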

🎯
Faithfulness
Generation
Every claim in the answer is checked against retrieved context. Score 1.0 = fully grounded. Score 0.0 = hallucinated. The most important metric for factual Q&A.
📈
Answer Relevancy
Generation
Does the answer address the question? Penalises evasive answers that are technically grounded but don't actually respond to what was asked.
📊
Context Precision
Retrieval
Of all retrieved chunks, what fraction were actually useful? Low precision = noisy context that distracts or confuses the LLM at generation time.
🔍
Context Recall
Retrieval
What fraction of necessary evidence was present in the retrieved context? Low recall means the LLM is forced to guess, which is a common root cause of hallucinations.
📚
Answer Correctness
End-to-End
Semantic + factual overlap with a reference answer. The overall report-card metric. Only requires a ground-truth answer — no need to annotate retrieval relevance.
📉
Context Entity Recall
Retrieval
Checks that key named entities — people, dates, numbers — present in the reference answer also appear in the retrieved context. Catches topically correct but factually incomplete retrieval.
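To build intuition for the two retrieval metrics, here is a deliberately simplified, set-based version of precision and recall over chunk IDs. This is not how RAGAS computes them (RAGAS uses an LLM judge, and its context precision is rank-aware); the function names and the hand-labelled relevant set are assumptions for illustration.

```python
from typing import List, Set


def toy_context_precision(retrieved: List[str], relevant: Set[str]) -> float:
    """Fraction of retrieved chunks that are actually relevant."""
    if not retrieved:
        return 0.0
    hits = sum(1 for chunk_id in retrieved if chunk_id in relevant)
    return hits / len(retrieved)


def toy_context_recall(retrieved: List[str], relevant: Set[str]) -> float:
    """Fraction of relevant chunks that were retrieved."""
    if not relevant:
        return 1.0                # nothing was needed, so nothing was missed
    found = len(relevant.intersection(retrieved))
    return found / len(relevant)


retrieved = ["c1", "c2", "c3", "c4"]      # what the retriever returned
relevant = {"c1", "c3", "c9"}             # what a human marked as needed

print(toy_context_precision(retrieved, relevant))        # 0.5
print(round(toy_context_recall(retrieved, relevant), 3))  # 0.667
```

Half of the retrieved chunks were noise (precision 0.5), and one needed chunk, c9, was never retrieved (recall 0.667).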
from ragas         import evaluate
from ragas.metrics import (
    faithfulness, answer_relevancy,
    context_precision, context_recall
)
from datasets import Dataset

rag = RAGPipeline()

eval_qs = [
    {"q": "What is the refund window for digital products?",
     "gt": "14 days from the date of purchase."},
    {"q": "How many vacation days do employees receive?",
     "gt": "25 days per year for full-time employees."},
    {"q": "What is the API rate limit on the Premium plan?",
     "gt": "1000 requests per minute."},
]

rows = []
for item in eval_qs:
    cands    = rag.retrieve(item["q"])
    top_docs = rag.rerank(item["q"], cands)
    result   = rag.ask(item["q"])
    rows.append({
        "question":     item["q"],
        "answer":       result["answer"],
        "contexts":    [d.text for d in top_docs],
        "ground_truth": item["gt"],
    })

scores = evaluate(
    Dataset.from_list(rows),
    metrics=[faithfulness, answer_relevancy, context_precision, context_recall]
)
print(scores)
OUTPUT
{'faithfulness': 0.9667, 'answer_relevancy': 0.9821, 'context_precision': 0.9167, 'context_recall': 0.9444}
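Scores like these are most useful when a script checks them automatically. The sketch below gates on the two floors suggested in the Section 16 checklist (faithfulness 0.90, context recall 0.85). It assumes the scores have been copied into a plain dict; failing_metrics is a hypothetical helper, not a RAGAS function.

```python
from typing import Dict, List

# Floors taken from the production checklist in this guide.
THRESHOLDS = {"faithfulness": 0.90, "context_recall": 0.85}


def failing_metrics(
    scores: Dict[str, float],
    thresholds: Dict[str, float],
) -> List[str]:
    """Return one message per metric that is below its floor.

    A metric missing from `scores` is treated as 0.0, i.e. as failing.
    """
    return [
        f"{name}: {scores.get(name, 0.0):.4f} < {floor:.2f}"
        for name, floor in thresholds.items()
        if scores.get(name, 0.0) < floor
    ]


# Values copied from the output above.
scores = {
    "faithfulness": 0.9667,
    "answer_relevancy": 0.9821,
    "context_precision": 0.9167,
    "context_recall": 0.9444,
}

failures = failing_metrics(scores, THRESHOLDS)
print("PASS" if not failures else f"FAIL: {failures}")   # PASS
```

In a CI job, a non-empty failures list would typically end the run with a non-zero exit code.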

Section 15

Common Failure Modes — Diagnosis and Fixes

🔍
FAILURE 1
Wrong Chunks Retrieved
Bi-encoder finds topically adjacent but contextually wrong chunks. Fix: Add ChromaDB where_document={"$contains": keyword} as a pre-filter; use re-ranking; reduce chunk size for better topic specificity.
✂️
FAILURE 2
Chunks Cut Mid-Sentence
Fixed-size splitting severs sentences and tables; the answer lacks context. Fix: Use the recursive splitter with paragraph separators first and always use overlap of at least 10% of chunk_size.
😱
FAILURE 3
LLM Ignores Retrieved Context
Model answers from training weights rather than retrieved passages. Fix: Strengthen SYSTEM_PROMPT with "Answer ONLY from context", set temperature=0, and write a test that deliberately asks an out-of-corpus question.
🔄
FAILURE 4
Stale Index
Documents updated but ChromaDB not refreshed — answers come from old chunks. Fix: Use the incremental indexer from Section 12. Hash files; only re-index on SHA-256 change. Run on a nightly cron job.
🏴
FAILURE 5
Lost-in-the-Middle
LLMs focus on context at the start and end; middle passages are ignored. Fix: Reduce rerank_top_n to 3–4; order chunks so the highest-scoring passage appears first.
🔢
FAILURE 6
Context Window Overflow
Too many chunks exceed the LLM context window; text is silently truncated. Fix: Count tokens before sending. If context_tokens + 600 > model_limit (the 600 being headroom for the prompt template and the generated answer), reduce rerank_top_n or compress context with a summarisation pass.
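The token check from Failure 6 can be sketched as a small budget function. The token counter here is a crude assumption (about 4 characters per token); in practice you would pass in the tokenizer that matches your model. fit_to_budget and rough_token_count are hypothetical names.

```python
from typing import Callable, List


def rough_token_count(text: str) -> int:
    """Crude estimate: roughly 4 characters per token (an assumption)."""
    return max(1, len(text) // 4)


def fit_to_budget(
    chunks: List[str],
    model_limit: int,
    reserve: int = 600,
    count_tokens: Callable[[str], int] = rough_token_count,
) -> List[str]:
    """Keep the leading chunks that fit in model_limit - reserve tokens.

    `chunks` is assumed to be sorted best-first (e.g. by re-rank score),
    so cutting the tail drops the least relevant passages.
    """
    budget = model_limit - reserve
    kept: List[str] = []
    used = 0
    for chunk in chunks:
        cost = count_tokens(chunk)
        if used + cost > budget:
            break
        kept.append(chunk)
        used += cost
    return kept


chunks = ["a" * 400, "b" * 400, "c" * 400]     # about 100 tokens each
kept = fit_to_budget(chunks, model_limit=850)  # budget = 250 tokens
print(len(kept))                               # 2
```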

Section 16

Production RAG — The Non-Negotiable Checklist

📚 Production RAG — 8 Golden Rules
1
Always use upsert, never add in ChromaDB. Idempotent indexing means you can re-run the indexer on any schedule without fear of duplicate chunks corrupting your results.
2
Generate stable, deterministic chunk IDs. Hash source path + chunk index + first 80 chars of text. The same document always gets the same ID — enabling conflict-free upserts and surgical deletes by source file.
3
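Always re-rank after retrieval. Retrieve 15–20 candidates with ChromaDB; re-rank to 3–5 with a cross-encoder. The added latency (around 100 ms for a small cross-encoder) is usually negligible next to the LLM call, while the accuracy gain is not. Treat the often-quoted 5–15% improvement in answer quality as a rule of thumb and measure it on your own golden dataset.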
4
Set temperature=0 on the generation LLM. RAG is a comprehension task, not a creative one. Near-deterministic decoding keeps the answer close to the retrieved context, which means fewer hallucinations and repeatable outputs.
5
Return source citations with every answer. Include the source filename and page number in every response. Users who can verify an answer trust the system. Users who can't verify, won't — regardless of how accurate the answer actually is.
6
Run incremental indexing on a schedule. Use the SHA-256 state file from Section 12 on a nightly cron job or cloud function. A stale index is worse than no index — confidently wrong answers from outdated documents erode user trust fast.
7
Evaluate continuously with RAGAS. Maintain a golden dataset of 50+ question–answer pairs. Run evaluation after every prompt change or re-indexing event. Alert the team if faithfulness drops below 0.90 or context recall drops below 0.85.
8
Test the "I don't know" path explicitly. Craft questions whose answers are definitively not in your corpus. Confirm the LLM returns "I don't have that information" — not a hallucinated guess. This is the single most important integration test you can write for a RAG system.
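Rule 2 can be sketched in a few lines. The recipe (source path + chunk index + first 80 characters) comes from the rule itself; the "|" separator, the choice of SHA-256, and the truncation to 32 hex characters are assumptions made for this example.

```python
import hashlib


def chunk_id(source: str, index: int, text: str) -> str:
    """Deterministic ID: same source, position, and text -> same ID."""
    key = f"{source}|{index}|{text[:80]}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:32]


first = chunk_id("./docs/handbook.pdf", 0, "Employees receive 25 days...")
again = chunk_id("./docs/handbook.pdf", 0, "Employees receive 25 days...")
other = chunk_id("./docs/handbook.pdf", 1, "Employees receive 25 days...")

print(first == again)   # True: re-indexing produces the same ID
print(first == other)   # False: a different position gets a different ID
```

Because the ID is stable across runs, upsert overwrites the existing chunk instead of adding a duplicate.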

Section 17

What You Built — And Where to Go Next

Aria Gets an Upgrade
Remember Aria, our detective from the start? She began with a fixed memory and a habit of guessing. Now she has a structured archive (ChromaDB), a precise cataloguing system (recursive chunking + embeddings), a fast first-pass search (bi-encoder retrieval), an expert second opinion (cross-encoder re-ranking), a strict evidence rule (temperature=0), a quality auditor (RAGAS), and a living index (incremental hashing).

She never guesses. She retrieves, verifies, synthesises, and cites.

You built the exact same system — from scratch, in pure Python, with full visibility into every decision. No black boxes, no framework magic, no hidden abstractions. This is what separates production engineers from tutorial-followers.
What to Explore Next | How to Get There
Hybrid Search (BM25 + dense) | Add rank-bm25 retrieval; merge results with Reciprocal Rank Fusion (RRF)
Semantic Chunking | Embed each sentence; split when cosine similarity between adjacent sentences drops below a threshold
Streaming Answers | Pass stream=True to the OpenAI call; yield tokens as they arrive for real-time UI
ChromaDB Remote Server | Start a server with chroma run or the Chroma Docker image; switch to chromadb.HttpClient()
Graph RAG | Extract (subject, relation, object) triples with an LLM; store in NetworkX; traverse for multi-hop queries
Multimodal RAG | Replace text embeddings with CLIP; index images alongside text chunks in ChromaDB
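As a taste of the first item, Reciprocal Rank Fusion merges several ranked lists by giving each document a score of 1 / (k + rank) per list and summing. The sketch below works on plain lists of document IDs; k = 60 is the commonly used default, and the two example rankings are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked lists of doc IDs; the highest fused score comes first."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Sort by score (descending); break ties by ID for stable output.
    return sorted(scores, key=lambda d: (-scores[d], d))


bm25_ranking = ["d3", "d1", "d7"]    # keyword search results (hypothetical)
dense_ranking = ["d1", "d4", "d3"]   # vector search results (hypothetical)

print(reciprocal_rank_fusion([bm25_ranking, dense_ranking]))
# ['d1', 'd3', 'd4', 'd7']
```

Documents that appear in both lists (d1, d3) rise to the top, which is the behaviour hybrid search relies on.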
Complete Stack — Zero Framework Magic

Pure Python: document loading and recursive chunking · sentence-transformers: bi-encoder embeddings and cross-encoder re-ranking · ChromaDB: persistent, filterable, cosine-similarity vector store · OpenAI API: grounded generation at temperature=0 · RAGAS: continuous evaluation · SHA-256 hashing: incremental indexing · Query condensation: multi-turn conversation. Every line transparent. Every decision yours to tune.
