The Story That Explains RAG
So Aria does what every smart detective does: she walks into the archive room, searches for the most relevant folders, reads the specific pages that matter, and then reasons over them using everything she already knows.
She does not guess from memory. She retrieves, then generates.
That is Retrieval-Augmented Generation — RAG — in one sentence. A language model that retrieves relevant documents at inference time and uses them as live context before generating an answer.
Before RAG existed, language models were pure memory machines. They memorised patterns from training data and hallucinated confidently when asked about anything outside that knowledge. RAG changed everything — it turned a static, closed-book student into an open-book reasoner. And in this tutorial, we build every piece of it from scratch in pure Python, using ChromaDB as our vector store — no heavy frameworks in between.
LLMs have a knowledge cutoff — they cannot know what happened after training. They have a context limit — you can't stuff an entire company wiki into a prompt. And they hallucinate — they fabricate citations and figures with supreme confidence. RAG fixes all three by grounding generation in retrieved, verifiable evidence.
Architecture Overview — What We Are Building
Our stack is deliberately minimal and transparent: pure Python for chunking and orchestration, sentence-transformers for embeddings, ChromaDB for vector storage and retrieval, and the OpenAI API for generation. No LangChain. No magic abstractions. Every line is yours to understand and control.
LangChain is powerful but opaque. When something breaks — wrong chunks retrieved, bad prompt format, silent truncation — tracing the bug through abstraction layers takes hours. Building RAG yourself means you own every decision: chunk size, prompt template, similarity threshold, re-rank cutoff. That control is what separates production engineers from tutorial-followers.
| Library | Role | Install |
|---|---|---|
| chromadb | Vector store — persist, query, filter | pip install chromadb |
| sentence-transformers | Bi-encoder embeddings + cross-encoder re-ranking | pip install sentence-transformers |
| openai | LLM generation via API | pip install openai |
| pypdf | PDF text extraction | pip install pypdf |
| tiktoken | Token-accurate chunk sizing | pip install tiktoken |
Step 1 — Loading Documents
A real RAG system must handle multiple file types. We write a single
load_documents() dispatcher that returns a list of Document
dataclass objects — each holding raw text and a metadata dict. No magic, no
dependencies beyond pypdf.
import os, re, hashlib, json
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Any, Tuple
from pypdf import PdfReader

# ── Core data structure ───────────────────────────────────
@dataclass
class Document:
    text: str
    metadata: Dict[str, Any] = field(default_factory=dict)

# ── Per-format loaders ────────────────────────────────────
def load_pdf(path: str) -> List[Document]:
    reader = PdfReader(path)
    docs = []
    for i, page in enumerate(reader.pages):
        text = page.extract_text() or ""
        if text.strip():
            docs.append(Document(
                text=text.strip(),
                metadata={"source": path, "page": i + 1, "type": "pdf"}
            ))
    return docs

def load_txt(path: str) -> List[Document]:
    text = Path(path).read_text(encoding="utf-8", errors="ignore")
    return [Document(text=text, metadata={"source": path, "type": "txt"})]

def load_markdown(path: str) -> List[Document]:
    text = Path(path).read_text(encoding="utf-8")
    text = re.sub(r"#{1,6}\s", "", text)      # headings
    text = re.sub(r"\*\*|__|\*|_", "", text)  # bold / italic
    text = re.sub(r"`{1,3}", "", text)        # code fences
    return [Document(text=text, metadata={"source": path, "type": "markdown"})]

# ── Extension → loader map ────────────────────────────────
LOADERS = {
    ".pdf": load_pdf,
    ".txt": load_txt,
    ".md": load_markdown,
    ".markdown": load_markdown,
}

def load_documents(directory: str) -> List[Document]:
    docs = []
    for root, _, files in os.walk(directory):
        for fname in files:
            ext = Path(fname).suffix.lower()
            if ext in LOADERS:
                fpath = os.path.join(root, fname)
                try:
                    docs.extend(LOADERS[ext](fpath))
                except Exception as e:
                    print(f" [WARN] {fpath}: {e}")
    print(f"Loaded {len(docs)} document pages from '{directory}'")
    return docs

# ── Test ──────────────────────────────────────────────────
docs = load_documents("./docs")
print(docs[0].text[:120])
print(docs[0].metadata)
Step 2 — Chunking — The Most Underrated Step in RAG
That is what bad chunking does to your RAG system. The LLM receives fragments instead of coherent passages. It confabulates the missing pieces — and you blame the model when the real culprit is the scissors.
Good chunking preserves semantic completeness. Every chunk should be a self-contained unit of meaning, with enough overlap to preserve context at boundaries.
Small chunks (128–256 tokens): precise retrieval, but thin context — the LLM
may lack surrounding information to answer fully.
Large chunks (1024+ tokens): rich context, but lower retrieval precision —
the embedding averages too many topics and the wrong chunk can win.
Sweet spot in practice: 400–600 tokens with 10–15% overlap.
import tiktoken

# ── Token counter (cl100k_base = GPT-4 tokeniser) ─────────
_enc = tiktoken.get_encoding("cl100k_base")

def count_tokens(text: str) -> int:
    return len(_enc.encode(text))

def tail_tokens(text: str, n: int) -> str:
    """Decode roughly the last n tokens of text (used for overlap)."""
    return _enc.decode(_enc.encode(text)[-n:]) if n > 0 else ""

# ── Recursive character-aware splitter ───────────────────
def split_text(
    text: str,
    chunk_size: int = 500,
    chunk_overlap: int = 60,
    separators: list = None,
) -> List[str]:
    if separators is None:
        separators = ["\n\n", "\n", ". ", " ", ""]

    def _split(txt: str, seps: list) -> List[str]:
        if count_tokens(txt) <= chunk_size:
            return [txt] if txt.strip() else []
        sep = seps[0] if seps else ""
        parts = txt.split(sep) if sep else list(txt)
        if len(parts) == 1:  # separator not found: try the next one
            return _split(txt, seps[1:])
        chunks, current = [], ""
        for part in parts:
            if count_tokens(part) > chunk_size:
                # A single piece is too big: flush, then split it with finer separators
                if current:
                    chunks.append(current)
                    current = ""
                chunks.extend(_split(part, seps[1:]))
                continue
            candidate = (current + sep + part).strip() if current else part.strip()
            if count_tokens(candidate) <= chunk_size:
                current = candidate
            else:
                if current:
                    chunks.append(current)
                # Carry the tail of the last chunk forward as overlap,
                # measured in tokens and capped so the new chunk still fits
                room = max(chunk_size - count_tokens(part), 0)
                tail = tail_tokens(current, min(chunk_overlap, room))
                current = (tail + " " + part).strip()
        if current:
            chunks.append(current)
        return chunks

    return _split(text, separators)

# ── Chunk all loaded documents ────────────────────────────
def chunk_documents(
    docs: List[Document],
    chunk_size: int = 500,
    chunk_overlap: int = 60,
) -> List[Document]:
    chunks = []
    for doc in docs:
        for i, t in enumerate(split_text(doc.text, chunk_size, chunk_overlap)):
            chunks.append(Document(
                text=t,
                metadata={**doc.metadata, "chunk_index": i}
            ))
    print(f"Split {len(docs)} pages → {len(chunks)} chunks")
    return chunks

chunks = chunk_documents(docs, chunk_size=500, chunk_overlap=60)
sizes = [count_tokens(c.text) for c in chunks]
print(f"Token stats: min:{min(sizes)} avg:{sum(sizes)//len(sizes)} max:{max(sizes)}")
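To make the overlap arithmetic concrete, here is a minimal sketch of a fixed-size sliding window over an already tokenised sequence. `sliding_windows` is a hypothetical helper, not part of the splitter above, and it ignores separators entirely; it only shows how chunk size and overlap together determine the stride.

```python
from typing import List, Sequence

def sliding_windows(tokens: Sequence[int], size: int, overlap: int) -> List[List[int]]:
    """Cut tokens into windows of `size`; each shares `overlap` tokens with the previous one."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    stride = size - overlap  # how far the window advances each step
    windows = []
    for start in range(0, len(tokens), stride):
        windows.append(list(tokens[start:start + size]))
        if start + size >= len(tokens):  # this window already reached the end
            break
    return windows

# 500-token chunks with 60 tokens (12%) of overlap, so the stride is 440
fake_tokens = list(range(1200))  # stand-in for real token IDs
windows = sliding_windows(fake_tokens, size=500, overlap=60)
print([(w[0], w[-1]) for w in windows])
```

With 1200 tokens this yields three windows; the last 60 tokens of each window reappear as the first 60 of the next, which is what keeps a sentence that straddles a boundary retrievable from either side.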
Step 3 — Embeddings — Turning Text into Numbers That Think
When you walk in with a question — "How do neural networks learn?" — you transform your question into a point in that same space, then look for the books floating closest to you.
That transformation — text → point in space — is an embedding. Two sentences that mean the same thing produce vectors with very high cosine similarity, even if they share almost no words.
from sentence_transformers import SentenceTransformer
import numpy as np

# ── Load bi-encoder (runs locally, no API key needed) ─────
# 'all-MiniLM-L6-v2'      → fast, 384-dim, excellent English
# 'BAAI/bge-base-en-v1.5' → more accurate, 768-dim
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

def embed_texts(texts: List[str], batch_size: int = 64) -> np.ndarray:
    """Returns float32 array shape (N, dim). Pre-normalised for cosine."""
    return embed_model.encode(
        texts,
        batch_size=batch_size,
        show_progress_bar=False,
        normalize_embeddings=True,  # cosine sim = dot product
    )

# ── Quick similarity demo ─────────────────────────────────
sentences = [
    "What is the refund policy for digital products?",  # query
    "Digital items can be returned within 30 days.",    # relevant
    "The office is open Monday through Friday.",        # irrelevant
]
vecs = embed_texts(sentences)
q = vecs[0]
for i, s in enumerate(sentences[1:], 1):
    sim = float(np.dot(q, vecs[i]))  # normalised → cosine = dot
    print(f" [{sim:.4f}] {s}")
print(f"Embedding dim: {vecs.shape[1]}")
Normalising to unit length converts cosine similarity into a plain dot product —
the fastest possible similarity computation. ChromaDB stores and queries with
cosine distance when "hnsw:space": "cosine" is set, so pre-normalised
vectors give exact results with minimal overhead.
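The claim that normalising turns cosine similarity into a dot product can be checked by hand. The sketch below uses plain Python lists and two made-up vectors rather than real embeddings; `normalise`, `dot`, and `cosine` are illustrative helpers, not library functions.

```python
import math
from typing import List

def dot(a: List[float], b: List[float]) -> float:
    return sum(x * y for x, y in zip(a, b))

def normalise(v: List[float]) -> List[float]:
    """Scale v to unit length."""
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v]

def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity computed the long way, with explicit norms."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

a = [3.0, 4.0, 0.0]   # made-up vectors, lengths 5 and 3
b = [1.0, 2.0, 2.0]

cos_ab = cosine(a, b)                        # 11 / 15
dot_unit = dot(normalise(a), normalise(b))   # same value, no norms needed at query time
print(f"cosine={cos_ab:.6f}  dot_of_unit_vectors={dot_unit:.6f}")
```

Both numbers agree up to floating-point rounding, so once vectors are stored at unit length the division by the norms can be skipped for every comparison.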
Step 4 — Indexing with ChromaDB
ChromaDB is a librarian who has read every book and feels the meaning of your question. You say "I want something about loyalty and warmth" and she pulls the right books even though you never said "dog". She reaches into the embedding space (384 dimensions for all-MiniLM-L6-v2) and finds the closest entries, typically within milliseconds for a collection of this size.
ChromaDB stores your embeddings, raw text, and metadata together. One query returns all three — so you never need to re-fetch anything.
import chromadb
from chromadb.config import Settings

# ── Persistent client: survives restarts ──────────────────
chroma_client = chromadb.PersistentClient(
    path="./chroma_db",
    settings=Settings(anonymized_telemetry=False)
)
COLLECTION = "company_knowledge"
collection = chroma_client.get_or_create_collection(
    name=COLLECTION,
    metadata={"hnsw:space": "cosine"}  # cosine similarity index
)

# ── Deterministic chunk ID ────────────────────────────────
# Same source + same position + same leading text = same ID.
# Note that the ID depends on the chunk's position, not only its content.
def make_id(doc: Document, global_index: int) -> str:
    raw = doc.metadata.get("source", "?") + str(global_index) + doc.text[:80]
    return hashlib.md5(raw.encode()).hexdigest()

# ── Batch-index all chunks ────────────────────────────────
def index_chunks(chunks: List[Document], batch_size: int = 100) -> None:
    total = len(chunks)
    for start in range(0, total, batch_size):
        batch = chunks[start : start + batch_size]
        texts = [c.text for c in batch]
        ids = [make_id(c, start + i) for i, c in enumerate(batch)]
        metas = [c.metadata for c in batch]
        embs = embed_texts(texts).tolist()
        collection.upsert(  # upsert = add OR overwrite safely
            ids=ids,
            documents=texts,
            embeddings=embs,
            metadatas=metas,
        )
        done = min(start + batch_size, total)
        print(f" Indexed {done}/{total} chunks", end="\r")
    print(f"\nIndex complete. Collection size: {collection.count()} chunks")

index_chunks(chunks)
print("Persisted to ./chroma_db; the index survives a restart.")
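upsert() inserts a record, or overwrites the one that already has that ID. add() never
overwrites: depending on the ChromaDB version it raises an error or ignores the duplicate.
Re-running your indexing pipeline with upsert() is therefore safe and idempotent. Combine it with
deterministic MD5 IDs and unchanged chunks are overwritten with identical data while new chunks are inserted.
One caveat: make_id() includes the chunk position, so an edit that shifts chunk boundaries changes the IDs of every later chunk, and the stale chunks must be deleted separately (the incremental indexing section does this by deleting on source).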
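The idempotency argument can be seen without a database. The following sketch swaps ChromaDB for a plain dictionary and uses a hypothetical `content_id` scheme that hashes the source and the full chunk text (an assumption for illustration; it differs from `make_id` above, which also includes the position). `ToyStore` mimics only the insert-or-overwrite contract of upsert.

```python
import hashlib
from typing import Dict

def content_id(source: str, text: str) -> str:
    """Hypothetical ID scheme: hash of source path plus full chunk text."""
    return hashlib.sha256(f"{source}\n{text}".encode("utf-8")).hexdigest()

class ToyStore:
    """Dict-backed stand-in for a vector store; models upsert semantics only."""
    def __init__(self) -> None:
        self.rows: Dict[str, str] = {}

    def upsert(self, id_: str, text: str) -> None:
        self.rows[id_] = text  # insert if new, overwrite if the ID exists

store = ToyStore()
sample_chunks = [
    "Refunds are accepted within 30 days.",
    "Offices are open Monday to Friday.",
]

for run in range(3):  # simulate running the indexer three times
    for text in sample_chunks:
        store.upsert(content_id("policy.txt", text), text)

print(f"rows after 3 runs: {len(store.rows)}")
```

Three runs still leave two rows, because identical input always maps to the same ID. A content-only ID has its own trade-off: two identical chunks in the same file collapse into one record.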
Step 5 — Retrieval — Querying ChromaDB
Retrieval embeds the user query and asks ChromaDB for the k most similar chunks.
ChromaDB returns distances, documents, and metadata in a single round-trip call.
We also show metadata filtering — restricting search to a specific department,
year, or file type before the similarity step runs.
# ── Core retrieval function ───────────────────────────────
def retrieve(
    query: str,
    k: int = 10,
    where: dict = None,
) -> List[Document]:
    """
    Retrieve top-k chunks for a query.
    where: ChromaDB metadata filter, e.g.:
      {"department": {"$eq": "HR"}}
      {"$and": [{"type": {"$eq": "pdf"}}, {"year": {"$gte": 2023}}]}
    """
    q_emb = embed_texts([query]).tolist()
    kw = {
        "query_embeddings": q_emb,
        "n_results": k,
        "include": ["documents", "metadatas", "distances"],
    }
    if where:
        kw["where"] = where
    r = collection.query(**kw)
    docs = []
    for text, meta, dist in zip(
        r["documents"][0], r["metadatas"][0], r["distances"][0]
    ):
        meta["_distance"] = dist
        docs.append(Document(text=text, metadata=meta))
    return docs

# ── Unfiltered retrieval ──────────────────────────────────
query = "What is the refund policy for digital products?"
hits = retrieve(query, k=5)
print(f"Top {len(hits)} results:\n")
for h in hits:
    print(f" [dist={h.metadata['_distance']:.4f}] {h.metadata['source']} {h.text[:70]}...")
print()

# ── Filtered retrieval: PDF docs from 2023 onward ─────────
# Note: the loaders in Step 1 only set source/page/type. A `year` (or
# `department`) field must be added to the metadata at load time,
# otherwise this filter matches nothing.
filtered = retrieve(
    "How many vacation days do employees get?",
    k=5,
    where={"$and": [{"type": {"$eq": "pdf"}}, {"year": {"$gte": 2023}}]}
)
print(f"Filtered results ({len(filtered)}):")
for h in filtered:
    print(f" {h.metadata['source']} → {h.text[:70]}...")
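The `_distance` values printed above are distances, so smaller is better. Assuming the collection was created with "hnsw:space": "cosine", the distance is 1 minus the cosine similarity, which makes a similarity cut-off easy to apply before re-ranking. The sketch below is self-contained: hits are plain dicts with made-up distances, and both `filter_by_similarity` and the 0.35 threshold are hypothetical values to tune on your own data.

```python
from typing import Dict, List

def filter_by_similarity(hits: List[Dict], min_similarity: float = 0.35) -> List[Dict]:
    """Drop hits whose cosine similarity (1 - cosine distance) is below the threshold."""
    kept = []
    for hit in hits:
        similarity = 1.0 - hit["distance"]
        if similarity >= min_similarity:
            kept.append({**hit, "similarity": round(similarity, 4)})
    return kept

# Made-up distances, for illustration only
example_hits = [
    {"text": "Digital items can be returned within 30 days.", "distance": 0.28},
    {"text": "The office is open Monday through Friday.", "distance": 0.91},
]

kept_hits = filter_by_similarity(example_hits)
for h in kept_hits:
    print(f"[sim={h['similarity']:.2f}] {h['text']}")
```

A cut-off like this lets the pipeline return "no relevant context" instead of passing weak matches to the LLM, which pairs well with the refusal rule in the system prompt.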
Step 6 — Re-Ranking — Precision After Recall
Re-ranking is the expert sorter on the boat. After the net is hauled in, she holds each fish next to your specific order form and scores them individually. She separates the precise matches from the near-misses.
The bi-encoder retrieves fast (the net). The cross-encoder re-ranks accurately (the sorter). Together: speed at scale, precision at delivery.
A cross-encoder takes the query and candidate document together as a single input, letting the model attend across both simultaneously. Far more accurate than comparing separate embeddings — but too slow to run on the full corpus. The pattern: retrieve top-20 fast with ChromaDB, then re-rank to top-3 with the cross-encoder.
from sentence_transformers import CrossEncoder

# ── Load cross-encoder re-ranker ──────────────────────────
# ms-marco-MiniLM-L-6-v2: fast + accurate passage relevance scorer
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank(
    query: str,
    candidates: List[Document],
    top_n: int = 3,
) -> List[Document]:
    """
    Score each (query, chunk) pair with the cross-encoder.
    Attaches '_rerank_score' to metadata; returns top_n sorted desc.
    """
    if not candidates:  # nothing retrieved, nothing to score
        return []
    pairs = [[query, doc.text] for doc in candidates]
    scores = reranker.predict(pairs)  # shape: (N,)
    ranked = sorted(
        zip(scores, candidates),
        key=lambda x: x[0],
        reverse=True,
    )
    for score, doc in ranked:
        doc.metadata["_rerank_score"] = float(score)
    return [doc for _, doc in ranked[:top_n]]

# ── Two-stage pipeline in action ──────────────────────────
query = "Can I get a refund if I already downloaded the file?"
candidates = retrieve(query, k=20)              # cast wide net
top_docs = rerank(query, candidates, top_n=3)   # precision sort
print("After re-ranking, top 3 passages:\n")
for i, doc in enumerate(top_docs, 1):
    print(f" [{i}] score={doc.metadata['_rerank_score']:.3f}")
    print(f" source: {doc.metadata['source']}")
    print(f" text: {doc.text[:100]}...\n")
Step 7 — Generation — Grounded Answers from the LLM
With top_docs in hand, we build a prompt that presents the retrieved context
to the LLM and explicitly instructs it to answer only from that evidence.
The system prompt enforces faithfulness; the user turn carries the query and context block.
from openai import OpenAI
import os

client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])

SYSTEM_PROMPT = """You are a precise, helpful assistant.
Answer the user's question using ONLY the provided context passages.
Rules:
- If the answer is in the context, answer clearly and cite the source filename.
- If the answer is NOT in the context, say exactly:
  "I don't have that information in the provided documents."
- Never guess, infer beyond the context, or use outside knowledge.
- Keep answers concise and factual."""

def build_context_block(docs: List[Document]) -> str:
    parts = []
    for i, doc in enumerate(docs, 1):
        src = doc.metadata.get("source", "unknown")
        page = doc.metadata.get("page", "")
        ref = f"{src} p.{page}" if page else src
        parts.append(f"[{i}] Source: {ref}\n{doc.text}")
    return "\n\n---\n\n".join(parts)

def generate_answer(query: str, context_docs: List[Document]) -> dict:
    context = build_context_block(context_docs)
    user_msg = f"Context:\n{context}\n\nQuestion: {query}"
    response = client.chat.completions.create(
        model="gpt-4o",
        temperature=0,  # greedy-style decoding: minimal sampling variance
        max_tokens=512,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_msg},
        ],
    )
    return {
        "answer": response.choices[0].message.content,
        "sources": [d.metadata.get("source") for d in context_docs],
        "tokens": response.usage.total_tokens,
    }

# ── Full three-stage pipeline in one call ─────────────────
query = "Can I get a refund if I already downloaded the file?"
candidates = retrieve(query, k=20)
top_docs = rerank(query, candidates, top_n=3)
result = generate_answer(query, top_docs)
print(f"Q: {query}\n")
print(f"A: {result['answer']}\n")
print(f"Sources: {result['sources']}")
print(f"Tokens: {result['tokens']}")
In a RAG system the LLM's job is to read and synthesise, not to create. Temperature > 0 introduces stochastic sampling: the model may choose a slightly different token path that drifts from the evidence and introduces subtle hallucinations. At temperature=0 decoding is close to deterministic (hosted APIs can still show small run-to-run variation), which keeps answers as close as possible to the retrieved context. It reduces drift; it does not by itself guarantee faithfulness.
Putting It All Together — The RAGPipeline Class
All seven steps are now wrapped into a single, reusable class. Initialise once; call
ask() forever. This is the clean, production-ready interface — every component
still pure Python and fully inspectable.
class RAGPipeline:
    def __init__(
        self,
        docs_dir: str = "./docs",
        chroma_path: str = "./chroma_db",
        collection_name: str = "knowledge",
        embed_model_id: str = "all-MiniLM-L6-v2",
        rerank_model_id: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
        llm_model: str = "gpt-4o",
        chunk_size: int = 500,
        chunk_overlap: int = 60,
        retrieve_k: int = 20,
        rerank_top_n: int = 3,
    ):
        self.llm_model = llm_model
        self.retrieve_k = retrieve_k
        self.rerank_top_n = rerank_top_n
        # Stored so add_documents() chunks new files the same way as the initial build
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.embedder = SentenceTransformer(embed_model_id)
        self.reranker = CrossEncoder(rerank_model_id)
        self.oai = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        self.chroma = chromadb.PersistentClient(
            path=chroma_path,
            settings=Settings(anonymized_telemetry=False),
        )
        self.col = self.chroma.get_or_create_collection(
            collection_name, metadata={"hnsw:space": "cosine"}
        )
        if self.col.count() == 0:
            print("Empty collection, building index...")
            raw = load_documents(docs_dir)
            chunks = chunk_documents(raw, chunk_size, chunk_overlap)
            self._upsert(chunks)

    def _embed(self, texts: List[str]) -> np.ndarray:
        return self.embedder.encode(
            texts, normalize_embeddings=True, show_progress_bar=False
        )

    def _upsert(self, chunks: List[Document], bs: int = 100) -> None:
        for s in range(0, len(chunks), bs):
            b = chunks[s : s + bs]
            self.col.upsert(
                ids=[make_id(c, s + i) for i, c in enumerate(b)],
                documents=[c.text for c in b],
                embeddings=self._embed([c.text for c in b]).tolist(),
                metadatas=[c.metadata for c in b],
            )
        print(f"Indexed {len(chunks)} chunks. Total: {self.col.count()}")

    def retrieve(self, query: str, where: dict = None) -> List[Document]:
        q_emb = self._embed([query]).tolist()
        kw = {"query_embeddings": q_emb, "n_results": self.retrieve_k,
              "include": ["documents", "metadatas", "distances"]}
        if where:
            kw["where"] = where
        r = self.col.query(**kw)
        docs = []
        for t, m, d in zip(r["documents"][0], r["metadatas"][0], r["distances"][0]):
            m["_distance"] = d
            docs.append(Document(text=t, metadata=m))
        return docs

    def rerank(self, query: str, docs: List[Document]) -> List[Document]:
        if not docs:  # nothing retrieved, nothing to score
            return []
        scores = self.reranker.predict([[query, d.text] for d in docs])
        ranked = sorted(zip(scores, docs), key=lambda x: x[0], reverse=True)
        for sc, doc in ranked:
            doc.metadata["_rerank_score"] = float(sc)
        return [d for _, d in ranked[: self.rerank_top_n]]

    def ask(self, query: str, where: dict = None, verbose: bool = False) -> dict:
        candidates = self.retrieve(query, where)
        top_docs = self.rerank(query, candidates)
        context = build_context_block(top_docs)
        if verbose:
            print(f"\n── Context ──\n{context}\n")
        resp = self.oai.chat.completions.create(
            model=self.llm_model, temperature=0, max_tokens=512,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"},
            ],
        )
        return {
            "answer": resp.choices[0].message.content,
            "sources": [dict(d.metadata) for d in top_docs],
            "tokens": resp.usage.total_tokens,
        }

    def add_documents(self, new_dir: str) -> None:
        """Add new documents to an existing, live index."""
        raw = load_documents(new_dir)
        chunks = chunk_documents(raw, self.chunk_size, self.chunk_overlap)
        self._upsert(chunks)

# ── Usage ─────────────────────────────────────────────────
rag = RAGPipeline(docs_dir="./docs")
result = rag.ask("What are the main risks in the Q3 report?")
print(f"Answer:\n{result['answer']}\n")
for s in result["sources"]:
    print(f" [{s.get('_rerank_score', 0):.3f}] {s['source']}")
ChromaDB Deep Dive — Modes, Filters, and Operations
ChromaDB supports three client modes, rich metadata filter operators, full-text pre-filtering, and document-level CRUD. Here is everything you need for production use.
import chromadb
from chromadb.config import Settings

# ── Mode 1: In-memory, for tests and CI ──────────────────
mem_client = chromadb.EphemeralClient()

# ── Mode 2: Persistent on disk, local production ─────────
disk_client = chromadb.PersistentClient(path="./chroma_db")

# ── Mode 3: Remote server, team / cloud deployment ───────
# server_client = chromadb.HttpClient(host="localhost", port=8000)

# ── Distance metrics ─────────────────────────────────────
# "hnsw:space": "cosine"  best for pre-normalised embeddings
# "hnsw:space": "l2"      Euclidean (default)
# "hnsw:space": "ip"      inner product (= cosine if normalised)
col = disk_client.get_or_create_collection(
    "demo",
    metadata={"hnsw:space": "cosine"}
)

# ── Metadata filter operators ─────────────────────────────
# Scalar comparisons: $eq $ne $gt $gte $lt $lte
# Set membership:     $in $nin
# Logical combiners:  $and $or
q_emb = embed_texts(["How many vacation days do employees get?"]).tolist()  # embed_texts from Step 3
results = col.query(
    query_embeddings=q_emb,
    n_results=5,
    where={
        "$and": [
            {"department": {"$eq": "HR"}},
            {"year": {"$gte": 2023}},
        ]
    },
    where_document={"$contains": "vacation"},  # substring text pre-filter
    include=["documents", "metadatas", "distances"],
)

# ── Peek at stored chunks ─────────────────────────────────
sample = col.peek(limit=3)
for doc, meta in zip(sample["documents"], sample["metadatas"]):
    print(f" [{meta['source']}] {doc[:60]}...")

# ── Delete by ID list ─────────────────────────────────────
col.delete(ids=["abc123", "def456"])

# ── Delete all chunks from a specific file ────────────────
col.delete(where={"source": {"$eq": "./docs/old_handbook_2019.pdf"}})

# ── Count and list all collections ───────────────────────
print(f"Chunks in collection: {col.count()}")
# Some chromadb releases return plain names here instead of Collection objects;
# drop the `.name` access if that applies to your version.
print(f"All collections: {[c.name for c in disk_client.list_collections()]}")
For a SaaS product where each customer has their own documents, give each tenant
a separate ChromaDB collection: get_or_create_collection(f"tenant_{tenant_id}").
Collections are logically isolated — one tenant's search never touches another's data.
Faster, simpler, and more secure than filtering a shared collection by a tenant metadata field.
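Tenant IDs often contain characters that are not valid in a collection name, so the f-string above usually needs a sanitising step. ChromaDB restricts collection names; the documented rules have included a bounded length (3 to 63 characters in many releases) and a limited character set with alphanumeric first and last characters, so check the rules for your version. `tenant_collection_name` below is a hypothetical helper that stays conservatively within those limits.

```python
import hashlib
import re

def tenant_collection_name(tenant_id: str, prefix: str = "tenant_") -> str:
    """Map an arbitrary tenant ID to a conservative, deterministic collection name."""
    # Keep only letters, digits, underscore and hyphen; cap the readable part at 40 chars
    slug = re.sub(r"[^a-zA-Z0-9_-]", "-", tenant_id).strip("-_")[:40]
    # A short hash keeps names distinct when two IDs collapse to the same slug
    digest = hashlib.sha256(tenant_id.encode("utf-8")).hexdigest()[:8]
    return f"{prefix}{slug}_{digest}" if slug else f"{prefix}{digest}"

name_a = tenant_collection_name("acme corp/eu")
name_b = tenant_collection_name("acme-corp-eu")
print(name_a)
print(name_b)
```

The two example IDs share the same readable slug but get different hash suffixes, so they never end up in the same collection. The resulting name can be passed straight to get_or_create_collection().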
Incremental Indexing — Keeping the Index Fresh
Documents change. New files appear. Old ones are deleted. We need a pipeline that runs on a schedule and only processes what actually changed — without rebuilding the whole index. We do this by hashing each file's content and comparing to a stored state file.
STATE_FILE = "./index_state.json"

def file_hash(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for blk in iter(lambda: f.read(65536), b""):
            h.update(blk)
    return h.hexdigest()

def load_state() -> dict:
    return json.loads(Path(STATE_FILE).read_text()) \
        if Path(STATE_FILE).exists() else {}

def save_state(state: dict) -> None:
    Path(STATE_FILE).write_text(json.dumps(state, indent=2))

def incremental_index(docs_dir: str, col: chromadb.Collection) -> None:
    state = load_state()  # {filepath: sha256}
    new_state = {}
    added = deleted = skipped = 0
    # Walk the tree exactly like load_documents() does, so each path matches
    # the "source" metadata written at initial indexing time
    all_paths = [
        os.path.join(root, fname)
        for root, _, files in os.walk(docs_dir)
        for fname in files
        if Path(fname).suffix.lower() in LOADERS
    ]
    # ── Changed / new files ───────────────────────────────
    for path in all_paths:
        fhash = file_hash(path)
        new_state[path] = fhash
        if state.get(path) == fhash:
            skipped += 1
            continue  # unchanged: skip
        col.delete(where={"source": {"$eq": path}})  # remove old chunks
        raw = LOADERS[Path(path).suffix.lower()](path)
        chunks = chunk_documents(raw)
        if chunks:
            # Write to the collection passed in, not the module-level one
            col.upsert(
                ids=[make_id(c, i) for i, c in enumerate(chunks)],
                documents=[c.text for c in chunks],
                embeddings=embed_texts([c.text for c in chunks]).tolist(),
                metadatas=[c.metadata for c in chunks],
            )
        added += len(chunks)
    # ── Deleted files ─────────────────────────────────────
    for old_path in state:
        if old_path not in new_state:
            col.delete(where={"source": {"$eq": old_path}})
            deleted += 1
    save_state(new_state)
    print(f"Incremental index: +{added} chunks -{deleted} files {skipped} unchanged")

incremental_index("./docs", col=collection)
Multi-Turn Conversation — Adding Memory to RAG
Standard RAG is stateless — each question is independent. For a chatbot, users expect follow-up questions to work: "What were the risks?" → "Which had the most revenue impact?" The second question needs context from the first. We solve this by condensing the follow-up into a standalone query before retrieval.
History = List[Tuple[str, str]]  # [(user_msg, assistant_msg), ...]

def condense_query(history: History, new_query: str) -> str:
    """
    Rewrite a follow-up question as a standalone question that
    includes all necessary context from the conversation history.
      'Which of those affected revenue?'
      → 'Which of the Q3 risks had the greatest revenue impact?'
    Uses only the last 3 turns to stay within token budget.
    """
    if not history:
        return new_query
    hist_text = "\n".join(
        f"User: {u}\nAssistant: {a}" for u, a in history[-3:]
    )
    prompt = (
        f"Conversation so far:\n{hist_text}\n\n"
        f"Rewrite this follow-up as a standalone question "
        f"that includes all necessary context:\n"
        f"Follow-up: {new_query}\nStandalone:"
    )
    resp = client.chat.completions.create(
        model="gpt-4o-mini", temperature=0, max_tokens=120,
        messages=[{"role": "user", "content": prompt}],
    )
    return resp.choices[0].message.content.strip()

def chat(
    rag: RAGPipeline,
    history: History,
    user_msg: str,
) -> Tuple[str, History]:
    standalone = condense_query(history, user_msg)
    result = rag.ask(standalone)
    answer = result["answer"]
    return answer, history + [(user_msg, answer)]

# ── Demo conversation ─────────────────────────────────────
rag = RAGPipeline()
history = []
for q in [
    "What are the main risks in the Q3 report?",
    "Which of those had the greatest financial impact?",
    "What mitigation steps were proposed for it?",
]:
    answer, history = chat(rag, history, q)
    print(f"User: {q}")
    print(f"RAG: {answer}\n")
Evaluating RAG Quality — RAGAS Metrics
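You cannot improve what you cannot measure. The RAGAS framework provides four LLM-graded metrics covering both retrieval and generation quality. Faithfulness and answer relevancy are reference-free; context recall (and, depending on the RAGAS version, context precision) also needs a ground-truth answer, which is why each evaluation question below carries one.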
from ragas import evaluate
from ragas.metrics import (
    faithfulness, answer_relevancy,
    context_precision, context_recall
)
from datasets import Dataset

rag = RAGPipeline()
eval_qs = [
    {"q": "What is the refund window for digital products?",
     "gt": "14 days from the date of purchase."},
    {"q": "How many vacation days do employees receive?",
     "gt": "25 days per year for full-time employees."},
    {"q": "What is the API rate limit on the Premium plan?",
     "gt": "1000 requests per minute."},
]

rows = []
for item in eval_qs:
    # ask() repeats retrieval and re-ranking internally; both are repeatable
    # for a fixed index, so these contexts match what the LLM saw.
    cands = rag.retrieve(item["q"])
    top_docs = rag.rerank(item["q"], cands)
    result = rag.ask(item["q"])
    # Column names follow the question/answer/contexts/ground_truth schema;
    # other RAGAS releases may expect different names.
    rows.append({
        "question": item["q"],
        "answer": result["answer"],
        "contexts": [d.text for d in top_docs],
        "ground_truth": item["gt"],
    })

scores = evaluate(
    Dataset.from_list(rows),
    metrics=[faithfulness, answer_relevancy, context_precision, context_recall]
)
print(scores)
Common Failure Modes — Diagnosis and Fixes
Add where_document={"$contains": keyword} as a pre-filter; use re-ranking; reduce chunk size for better topic specificity.
Limit rerank_top_n to 3–4; order chunks so the highest-scoring passage appears first.
If context_tokens + 600 > model_limit, reduce rerank_top_n or compress context with a summarisation pass.
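The context_tokens + 600 > model_limit check can be turned into a small guard that trims passages before the prompt is built. This is a minimal sketch under stated assumptions: `fit_to_budget` is a hypothetical helper, passages are (score, text) pairs, and the default counter splits on whitespace as a stand-in for a real tokeniser such as the tiktoken-based count_tokens() from Step 2.

```python
from typing import Callable, List, Tuple

def fit_to_budget(
    passages: List[Tuple[float, str]],
    model_limit: int,
    reserve: int = 600,  # headroom for system prompt, question, and answer
    count_tokens: Callable[[str], int] = lambda t: len(t.split()),
) -> List[str]:
    """Keep the highest-scoring passages whose combined size fits model_limit - reserve."""
    budget = model_limit - reserve
    kept, used = [], 0
    for score, text in sorted(passages, key=lambda p: p[0], reverse=True):
        cost = count_tokens(text)
        if used + cost > budget:
            continue  # too large for the remaining budget; a smaller one may still fit
        kept.append(text)
        used += cost
    return kept

# Toy numbers: a 610-"token" limit leaves a budget of 10 after the reserve
scored = [
    (0.9, "one two three four five"),                # 5 words
    (0.4, "a b c d e f g h"),                        # 8 words
    (0.7, "six seven eight nine"),                   # 4 words
]
selected = fit_to_budget(scored, model_limit=610)
print(selected)
```

The two best passages fit (9 words) and the third is dropped. Output stays in best-first order, which also satisfies the advice to put the highest-scoring passage first.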
Production RAG — The Non-Negotiable Checklist
Use upsert, never add, in ChromaDB.
Idempotent indexing means you can re-run the indexer on any schedule without fear
of duplicate chunks corrupting your results.
Set temperature=0 on the generation LLM.
RAG is a comprehension task, not a creative one. Near-deterministic decoding keeps
answers close to the retrieved context and reduces, though it does not eliminate, hallucinations.
What You Built — And Where to Go Next
She never guesses. She retrieves, verifies, synthesises, and cites.
You built the exact same system — from scratch, in pure Python, with full visibility into every decision. No black boxes, no framework magic, no hidden abstractions. This is what separates production engineers from tutorial-followers.
| What to Explore Next | How to Get There |
|---|---|
| Hybrid Search (BM25 + dense) | Add rank-bm25 retrieval; merge results with Reciprocal Rank Fusion (RRF) |
| Semantic Chunking | Embed each sentence; split when cosine similarity between adjacent sentences drops below a threshold |
| Streaming Answers | Pass stream=True to the OpenAI call; yield tokens as they arrive for real-time UI |
| ChromaDB Remote Server | Run chroma run as a Docker container; switch to chromadb.HttpClient() |
| Graph RAG | Extract (subject, relation, object) triples with an LLM; store in NetworkX; traverse for multi-hop queries |
| Multimodal RAG | Replace text embeddings with CLIP; index images alongside text chunks in ChromaDB |
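As a starting point for the hybrid search row, Reciprocal Rank Fusion needs only the ranked ID lists from each retriever, not their raw scores. The sketch below assumes each retriever returns chunk IDs in best-first order; the IDs are made up, and k=60 is the commonly used smoothing constant rather than a tuned value.

```python
from collections import defaultdict
from typing import Dict, List

def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> List[str]:
    """Fuse ranked ID lists: score(d) = sum over lists of 1 / (k + rank), rank starting at 1."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=lambda d: scores[d], reverse=True)

bm25_ids = ["c2", "c7", "c1"]    # hypothetical keyword ranking
dense_ids = ["c7", "c3", "c2"]   # hypothetical embedding ranking

fused = reciprocal_rank_fusion([bm25_ids, dense_ids])
print(fused)
```

Chunks that appear in both lists (c7, c2) rise to the top, and c7 wins because its two ranks (2nd and 1st) beat c2's (1st and 3rd). The fused list can then go to the cross-encoder exactly as the dense-only candidates do.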
Pure Python for document loading and recursive chunking · sentence-transformers for bi-encoder embeddings and cross-encoder re-ranking · ChromaDB as a persistent, filterable, cosine-similarity vector store · OpenAI API for grounded generation at temperature=0 · RAGAS for continuous evaluation · SHA-256 hashing for incremental indexing · Query condensation for multi-turn conversation. Every line transparent. Every decision yours to tune.