Introduction to RAG Systems
Retrieval-Augmented Generation (RAG) combines the power of large language models with external knowledge retrieval, enabling AI systems to access and reason over vast document collections without requiring model retraining. RAG addresses fundamental LLM limitations—hallucination, knowledge cutoffs, and lack of domain specificity—by grounding responses in retrieved evidence. This section introduces RAG architecture, embedding models, vector databases, and the core components that form modern knowledge-augmented AI systems.
The RAG Paradigm
RAG bridges parametric and non-parametric knowledge:
import numpy as np
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import hashlib
import json
@dataclass
class Document:
"""Represents a document or chunk in the RAG system."""
content: str
metadata: Dict[str, Any] = field(default_factory=dict)
doc_id: Optional[str] = None
embedding: Optional[np.ndarray] = None
def __post_init__(self):
if self.doc_id is None:
# Generate deterministic ID from content
self.doc_id = hashlib.md5(self.content.encode()).hexdigest()[:16]
@dataclass
class RetrievalResult:
"""Result from retrieval operation."""
document: Document
score: float
rank: int
class RAGSystem:
"""
Retrieval-Augmented Generation System.
Architecture:
1. Indexing Pipeline: Documents → Chunks → Embeddings → Vector Store
2. Retrieval Pipeline: Query → Embedding → Search → Rerank → Top-K
3. Generation Pipeline: Query + Context → LLM → Response
Key Benefits:
- Access to current/private knowledge without retraining
- Reduced hallucination through grounding
- Transparent sourcing with citations
- Cost-effective knowledge updates
"""
def __init__(
self,
embedding_model: 'EmbeddingModel',
vector_store: 'VectorStore',
llm: 'LanguageModel',
chunker: 'TextChunker',
top_k: int = 5
):
self.embedding_model = embedding_model
self.vector_store = vector_store
self.llm = llm
self.chunker = chunker
self.top_k = top_k
def index_documents(self, documents: List[Document]) -> int:
"""
Index documents into the vector store.
Pipeline:
1. Chunk documents into smaller pieces
2. Generate embeddings for each chunk
3. Store in vector database
"""
all_chunks = []
for doc in documents:
# Chunk the document
chunks = self.chunker.chunk(doc.content, doc.metadata)
all_chunks.extend(chunks)
# Generate embeddings
contents = [chunk.content for chunk in all_chunks]
embeddings = self.embedding_model.embed_documents(contents)
# Attach embeddings to chunks
for chunk, embedding in zip(all_chunks, embeddings):
chunk.embedding = embedding
# Store in vector database
self.vector_store.add(all_chunks)
return len(all_chunks)
def query(
self,
question: str,
filters: Optional[Dict[str, Any]] = None
) -> Tuple[str, List[RetrievalResult]]:
"""
Answer a question using RAG.
Pipeline:
1. Embed the query
2. Retrieve relevant documents
3. Generate answer with context
"""
# Retrieve relevant context
results = self.retrieve(question, filters)
# Build context string
context = self._build_context(results)
# Generate response
response = self._generate(question, context)
return response, results
def retrieve(
self,
query: str,
filters: Optional[Dict[str, Any]] = None
) -> List[RetrievalResult]:
"""Retrieve relevant documents for a query."""
# Embed query
query_embedding = self.embedding_model.embed_query(query)
# Search vector store
results = self.vector_store.search(
query_embedding,
top_k=self.top_k,
filters=filters
)
return results
def _build_context(self, results: List[RetrievalResult]) -> str:
"""Build context string from retrieval results."""
context_parts = []
for i, result in enumerate(results, 1):
source = result.document.metadata.get('source', 'Unknown')
context_parts.append(
f"[Source {i}: {source}]\n{result.document.content}"
)
return "\n\n".join(context_parts)
def _generate(self, question: str, context: str) -> str:
"""Generate answer using LLM with retrieved context."""
prompt = f"""Answer the question based on the provided context.
If the context doesn't contain relevant information, say so.
Context:
{context}
Question: {question}
Answer:"""
return self.llm.generate(prompt)
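Before comparing RAG with fine-tuning, here is a minimal wiring sketch showing how the pieces of RAGSystem fit together. It assumes the EmbeddingModel, VectorStore, and TextChunker implementations defined later in this section, and uses a hypothetical EchoLLM stub (and a hypothetical handbook.txt path) in place of a real LanguageModel so the example stays self-contained.
class EchoLLM:
    """Hypothetical stand-in for a real LanguageModel client."""
    def generate(self, prompt: str) -> str:
        # A real implementation would call an LLM API here.
        return "[stub answer grounded in]\n" + prompt[-500:]

def build_demo_rag() -> RAGSystem:
    # SentenceTransformerEmbeddings, InMemoryVectorStore, and FixedSizeChunker
    # are defined later in this section.
    embedder = SentenceTransformerEmbeddings()
    store = InMemoryVectorStore(dimension=embedder.dimension)
    chunker = FixedSizeChunker(chunk_size=512, chunk_overlap=50)
    return RAGSystem(embedder, store, EchoLLM(), chunker, top_k=3)

# Usage (hypothetical file path):
# rag = build_demo_rag()
# rag.index_documents([Document(content=open("handbook.txt").read(),
#                               metadata={"source": "handbook.txt"})])
# answer, hits = rag.query("What is the refund policy?")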
def rag_vs_fine_tuning():
"""Compare RAG with fine-tuning approaches."""
comparison = {
'RAG': {
'knowledge_update': 'Instant (update documents)',
'cost': 'Lower (no retraining)',
'hallucination': 'Reduced (grounded)',
'transparency': 'High (citations)',
'knowledge_scope': 'Unlimited (external)',
'latency': 'Higher (retrieval step)',
'best_for': 'Dynamic knowledge, QA, search'
},
'Fine-tuning': {
'knowledge_update': 'Requires retraining',
'cost': 'Higher (compute intensive)',
'hallucination': 'Can increase',
'transparency': 'Low (black box)',
'knowledge_scope': 'Limited to training data',
'latency': 'Lower (single forward pass)',
'best_for': 'Style, format, specialized tasks'
},
'RAG + Fine-tuning': {
'knowledge_update': 'Hybrid approach',
'cost': 'Highest',
'hallucination': 'Lowest',
'transparency': 'High',
'knowledge_scope': 'Comprehensive',
'latency': 'Medium',
'best_for': 'Production systems, high accuracy'
}
}
print("RAG vs Fine-tuning Comparison:")
print("=" * 70)
for approach, attrs in comparison.items():
print(f"\n{approach}:")
for k, v in attrs.items():
print(f" {k}: {v}")
rag_vs_fine_tuning()
Text Embeddings
Embeddings convert text to dense vectors for semantic similarity:
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
class EmbeddingModel(ABC):
"""Abstract base class for embedding models."""
@abstractmethod
def embed_documents(self, texts: List[str]) -> np.ndarray:
"""Embed a list of documents."""
pass
@abstractmethod
def embed_query(self, text: str) -> np.ndarray:
"""Embed a single query."""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""Return embedding dimension."""
pass
class SentenceTransformerEmbeddings(EmbeddingModel):
"""
Sentence Transformer embeddings for RAG.
Popular models:
- all-MiniLM-L6-v2: Fast, good quality (384 dim)
    - all-mpnet-base-v2: Strong all-round quality (768 dim)
- multi-qa-mpnet-base-dot-v1: Optimized for QA
- e5-large-v2: State-of-the-art (1024 dim)
"""
def __init__(
self,
model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
device: str = "cuda" if torch.cuda.is_available() else "cpu",
normalize: bool = True,
max_length: int = 512
):
self.device = device
self.normalize = normalize
self.max_length = max_length
# Load model and tokenizer
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name).to(device)
self.model.eval()
self._dimension = self.model.config.hidden_size
@property
def dimension(self) -> int:
return self._dimension
def _mean_pooling(
self,
model_output: torch.Tensor,
attention_mask: torch.Tensor
) -> torch.Tensor:
"""Mean pooling over token embeddings."""
token_embeddings = model_output[0]
input_mask_expanded = attention_mask.unsqueeze(-1).expand(
token_embeddings.size()
).float()
sum_embeddings = torch.sum(token_embeddings * input_mask_expanded, dim=1)
sum_mask = torch.clamp(input_mask_expanded.sum(dim=1), min=1e-9)
return sum_embeddings / sum_mask
@torch.no_grad()
def embed_documents(
self,
texts: List[str],
batch_size: int = 32
) -> np.ndarray:
"""Embed multiple documents with batching."""
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
# Tokenize
encoded = self.tokenizer(
batch,
padding=True,
truncation=True,
max_length=self.max_length,
return_tensors='pt'
).to(self.device)
# Forward pass
outputs = self.model(**encoded)
# Pool
embeddings = self._mean_pooling(outputs, encoded['attention_mask'])
# Normalize
if self.normalize:
embeddings = F.normalize(embeddings, p=2, dim=1)
all_embeddings.append(embeddings.cpu().numpy())
return np.vstack(all_embeddings)
def embed_query(self, text: str) -> np.ndarray:
"""Embed a single query."""
return self.embed_documents([text])[0]
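A quick sanity check of what these vectors buy us (a sketch; it downloads all-MiniLM-L6-v2 on first use): because embed_documents L2-normalizes its output, a plain dot product is the cosine similarity, and related sentences should score noticeably higher than unrelated ones.
embedder = SentenceTransformerEmbeddings()
vecs = embedder.embed_documents([
    "How do I reset my password?",
    "Steps to recover account access",
    "The weather is sunny today",
])
# Related pair vs. unrelated pair: the first similarity should be clearly higher.
print("related:  ", float(vecs[0] @ vecs[1]))
print("unrelated:", float(vecs[0] @ vecs[2]))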
class E5Embeddings(EmbeddingModel):
"""
E5 embeddings with query/passage prefixes.
E5 models are trained with contrastive learning and
require specific prefixes:
- Query: "query: {text}"
- Passage: "passage: {text}"
"""
def __init__(
self,
model_name: str = "intfloat/e5-large-v2",
device: str = "cuda" if torch.cuda.is_available() else "cpu"
):
self.device = device
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name).to(device)
self.model.eval()
self._dimension = self.model.config.hidden_size
@property
def dimension(self) -> int:
return self._dimension
def _average_pool(
self,
last_hidden_states: torch.Tensor,
attention_mask: torch.Tensor
) -> torch.Tensor:
last_hidden = last_hidden_states.masked_fill(
~attention_mask[..., None].bool(), 0.0
)
return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]
@torch.no_grad()
def embed_documents(self, texts: List[str]) -> np.ndarray:
"""Embed documents with passage prefix."""
prefixed = [f"passage: {text}" for text in texts]
encoded = self.tokenizer(
prefixed,
padding=True,
truncation=True,
max_length=512,
return_tensors='pt'
).to(self.device)
outputs = self.model(**encoded)
embeddings = self._average_pool(outputs.last_hidden_state, encoded['attention_mask'])
embeddings = F.normalize(embeddings, p=2, dim=1)
return embeddings.cpu().numpy()
def embed_query(self, text: str) -> np.ndarray:
"""Embed query with query prefix."""
prefixed = f"query: {text}"
encoded = self.tokenizer(
[prefixed],
padding=True,
truncation=True,
max_length=512,
return_tensors='pt'
).to(self.device)
with torch.no_grad():
outputs = self.model(**encoded)
embedding = self._average_pool(outputs.last_hidden_state, encoded['attention_mask'])
embedding = F.normalize(embedding, p=2, dim=1)
return embedding.cpu().numpy()[0]
class OpenAIEmbeddings(EmbeddingModel):
"""
OpenAI embedding API wrapper.
Models:
- text-embedding-3-small: Fast, economical (1536 dim)
- text-embedding-3-large: Best quality (3072 dim)
- text-embedding-ada-002: Legacy (1536 dim)
"""
def __init__(
self,
model: str = "text-embedding-3-small",
api_key: Optional[str] = None,
dimensions: Optional[int] = None
):
self.model = model
self.api_key = api_key
self._dimensions = dimensions
# Dimension lookup
self._default_dims = {
"text-embedding-3-small": 1536,
"text-embedding-3-large": 3072,
"text-embedding-ada-002": 1536
}
@property
def dimension(self) -> int:
if self._dimensions:
return self._dimensions
return self._default_dims.get(self.model, 1536)
def embed_documents(self, texts: List[str]) -> np.ndarray:
"""Embed documents using OpenAI API."""
# Simulated - actual implementation would call API
# import openai
# response = openai.embeddings.create(
# model=self.model,
# input=texts,
# dimensions=self._dimensions
# )
# return np.array([d.embedding for d in response.data])
# Placeholder for demonstration
return np.random.randn(len(texts), self.dimension).astype(np.float32)
def embed_query(self, text: str) -> np.ndarray:
return self.embed_documents([text])[0]
def embedding_model_comparison():
"""Compare popular embedding models."""
models = {
'all-MiniLM-L6-v2': {
'dimension': 384,
'max_tokens': 256,
'speed': 'Very fast',
'quality': 'Good',
'use_case': 'General purpose, resource-constrained'
},
'all-mpnet-base-v2': {
'dimension': 768,
'max_tokens': 384,
'speed': 'Fast',
'quality': 'Very good',
'use_case': 'General purpose'
},
'e5-large-v2': {
'dimension': 1024,
'max_tokens': 512,
'speed': 'Medium',
'quality': 'Excellent',
'use_case': 'High-accuracy retrieval'
},
'bge-large-en-v1.5': {
'dimension': 1024,
'max_tokens': 512,
'speed': 'Medium',
'quality': 'Excellent',
'use_case': 'Benchmark leader'
},
'text-embedding-3-small': {
'dimension': 1536,
'max_tokens': 8191,
'speed': 'API latency',
'quality': 'Very good',
'use_case': 'Long documents, ease of use'
},
'text-embedding-3-large': {
'dimension': 3072,
'max_tokens': 8191,
'speed': 'API latency',
'quality': 'Excellent',
'use_case': 'Maximum quality'
}
}
print("Embedding Model Comparison:")
print("=" * 60)
for name, attrs in models.items():
print(f"\n{name}:")
for k, v in attrs.items():
print(f" {k}: {v}")
embedding_model_comparison()
Vector Stores
Vector databases enable efficient similarity search at scale:
class VectorStore(ABC):
"""Abstract base class for vector stores."""
@abstractmethod
def add(self, documents: List[Document]) -> None:
"""Add documents to the store."""
pass
@abstractmethod
def search(
self,
query_embedding: np.ndarray,
top_k: int = 10,
filters: Optional[Dict[str, Any]] = None
) -> List[RetrievalResult]:
"""Search for similar documents."""
pass
@abstractmethod
def delete(self, doc_ids: List[str]) -> None:
"""Delete documents by ID."""
pass
class InMemoryVectorStore(VectorStore):
"""
Simple in-memory vector store using NumPy.
Good for:
- Development and testing
- Small document collections (<100K)
- Prototyping RAG systems
"""
def __init__(self, dimension: int):
self.dimension = dimension
self.documents: List[Document] = []
self.embeddings: Optional[np.ndarray] = None
def add(self, documents: List[Document]) -> None:
"""Add documents to the store."""
for doc in documents:
if doc.embedding is None:
raise ValueError(f"Document {doc.doc_id} has no embedding")
if doc.embedding.shape[0] != self.dimension:
raise ValueError(f"Embedding dimension mismatch")
self.documents.extend(documents)
# Rebuild embedding matrix
embeddings = np.array([doc.embedding for doc in self.documents])
self.embeddings = embeddings
def search(
self,
query_embedding: np.ndarray,
top_k: int = 10,
filters: Optional[Dict[str, Any]] = None
) -> List[RetrievalResult]:
"""Search using cosine similarity."""
if self.embeddings is None or len(self.documents) == 0:
return []
# Normalize for cosine similarity
query_norm = query_embedding / np.linalg.norm(query_embedding)
embeddings_norm = self.embeddings / np.linalg.norm(
self.embeddings, axis=1, keepdims=True
)
# Compute similarities
similarities = embeddings_norm @ query_norm
# Apply filters
if filters:
mask = self._apply_filters(filters)
similarities = np.where(mask, similarities, -np.inf)
# Get top-k
top_indices = np.argsort(similarities)[-top_k:][::-1]
results = []
for rank, idx in enumerate(top_indices):
if similarities[idx] > -np.inf:
results.append(RetrievalResult(
document=self.documents[idx],
score=float(similarities[idx]),
rank=rank
))
return results
def _apply_filters(self, filters: Dict[str, Any]) -> np.ndarray:
"""Apply metadata filters."""
mask = np.ones(len(self.documents), dtype=bool)
for key, value in filters.items():
for i, doc in enumerate(self.documents):
if doc.metadata.get(key) != value:
mask[i] = False
return mask
def delete(self, doc_ids: List[str]) -> None:
"""Delete documents by ID."""
doc_ids_set = set(doc_ids)
self.documents = [d for d in self.documents if d.doc_id not in doc_ids_set]
if self.documents:
self.embeddings = np.array([doc.embedding for doc in self.documents])
else:
self.embeddings = None
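A small smoke test for the in-memory store (a sketch with hand-made 3-dimensional vectors instead of real embeddings), mainly to show how metadata filters combine with top-k ranking:
store = InMemoryVectorStore(dimension=3)
store.add([
    Document(content="refund policy", metadata={"team": "billing"},
             embedding=np.array([1.0, 0.0, 0.0])),
    Document(content="password reset", metadata={"team": "support"},
             embedding=np.array([0.0, 1.0, 0.0])),
    Document(content="invoice schedule", metadata={"team": "billing"},
             embedding=np.array([0.9, 0.1, 0.0])),
])
# The filter removes the "support" document before top-k selection.
for hit in store.search(np.array([1.0, 0.0, 0.0]), top_k=2,
                        filters={"team": "billing"}):
    print(hit.rank, round(hit.score, 3), hit.document.content)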
class FAISSVectorStore(VectorStore):
"""
FAISS-based vector store for efficient similarity search.
FAISS (Facebook AI Similarity Search) supports:
- Multiple index types (Flat, IVF, HNSW, PQ)
- GPU acceleration
- Billion-scale search
"""
def __init__(
self,
dimension: int,
index_type: str = "Flat",
nlist: int = 100,
nprobe: int = 10,
use_gpu: bool = False
):
import faiss
self.dimension = dimension
self.index_type = index_type
self.documents: List[Document] = []
# Create index
if index_type == "Flat":
# Exact search (brute force)
self.index = faiss.IndexFlatIP(dimension)
elif index_type == "IVF":
# Inverted file index for approximate search
quantizer = faiss.IndexFlatIP(dimension)
self.index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
self.index.nprobe = nprobe
elif index_type == "HNSW":
# Hierarchical Navigable Small World graph
self.index = faiss.IndexHNSWFlat(dimension, 32)
else:
raise ValueError(f"Unknown index type: {index_type}")
# Move to GPU if requested
if use_gpu and faiss.get_num_gpus() > 0:
self.index = faiss.index_cpu_to_gpu(
faiss.StandardGpuResources(),
0,
self.index
)
self._needs_training = index_type == "IVF"
self._is_trained = False
def add(self, documents: List[Document]) -> None:
"""Add documents to FAISS index."""
        import faiss  # local import, matching __init__ and search
        embeddings = np.array([doc.embedding for doc in documents]).astype('float32')
# Normalize for inner product (cosine similarity)
faiss.normalize_L2(embeddings)
# Train if needed (IVF requires training)
if self._needs_training and not self._is_trained:
self.index.train(embeddings)
self._is_trained = True
# Add to index
self.index.add(embeddings)
self.documents.extend(documents)
def search(
self,
query_embedding: np.ndarray,
top_k: int = 10,
filters: Optional[Dict[str, Any]] = None
) -> List[RetrievalResult]:
"""Search FAISS index."""
import faiss
query = query_embedding.reshape(1, -1).astype('float32')
faiss.normalize_L2(query)
# Search (may need to fetch more for filtering)
fetch_k = top_k * 3 if filters else top_k
scores, indices = self.index.search(query, fetch_k)
results = []
for rank, (idx, score) in enumerate(zip(indices[0], scores[0])):
if idx == -1: # FAISS returns -1 for missing results
continue
doc = self.documents[idx]
# Apply filters
if filters:
if not self._matches_filters(doc, filters):
continue
results.append(RetrievalResult(
document=doc,
score=float(score),
rank=len(results)
))
if len(results) >= top_k:
break
return results
def _matches_filters(self, doc: Document, filters: Dict[str, Any]) -> bool:
"""Check if document matches filters."""
for key, value in filters.items():
if doc.metadata.get(key) != value:
return False
return True
def delete(self, doc_ids: List[str]) -> None:
"""Delete is complex in FAISS - typically rebuild index."""
# Mark for deletion and rebuild
doc_ids_set = set(doc_ids)
new_docs = [d for d in self.documents if d.doc_id not in doc_ids_set]
# Rebuild index
self.documents = []
self.index.reset()
if self._needs_training:
self._is_trained = False
if new_docs:
self.add(new_docs)
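Stripped of the wrapper, the core trick the FAISS store relies on is small enough to show directly (a sketch with random stand-in vectors; requires the faiss-cpu or faiss-gpu package): L2-normalizing both documents and queries turns inner-product search on IndexFlatIP into exact cosine-similarity search.
import faiss

dim = 384
doc_vecs = np.random.rand(1000, dim).astype('float32')   # stand-in document vectors
query_vec = np.random.rand(1, dim).astype('float32')      # stand-in query vector
faiss.normalize_L2(doc_vecs)
faiss.normalize_L2(query_vec)

index = faiss.IndexFlatIP(dim)        # exact (brute-force) inner-product index
index.add(doc_vecs)
scores, ids = index.search(query_vec, 5)
print(ids[0])      # indices of the 5 nearest documents
print(scores[0])   # their cosine similarities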
class ChromaVectorStore(VectorStore):
"""
ChromaDB vector store wrapper.
Chroma provides:
- Simple API for RAG applications
- Metadata filtering
- Persistent storage
- Embedding function integration
"""
def __init__(
self,
collection_name: str = "rag_collection",
persist_directory: Optional[str] = None
):
# import chromadb
# if persist_directory:
# self.client = chromadb.PersistentClient(path=persist_directory)
# else:
# self.client = chromadb.Client()
# self.collection = self.client.get_or_create_collection(collection_name)
# Simulated for demonstration
self.collection_name = collection_name
self.documents: Dict[str, Document] = {}
def add(self, documents: List[Document]) -> None:
"""Add documents to Chroma."""
for doc in documents:
self.documents[doc.doc_id] = doc
# Actual Chroma API:
# self.collection.add(
# ids=[d.doc_id for d in documents],
# embeddings=[d.embedding.tolist() for d in documents],
# documents=[d.content for d in documents],
# metadatas=[d.metadata for d in documents]
# )
def search(
self,
query_embedding: np.ndarray,
top_k: int = 10,
filters: Optional[Dict[str, Any]] = None
) -> List[RetrievalResult]:
"""Search Chroma collection."""
# Actual Chroma API:
# results = self.collection.query(
# query_embeddings=[query_embedding.tolist()],
# n_results=top_k,
# where=filters
# )
# Simulated search
results = []
for doc in self.documents.values():
if doc.embedding is not None:
score = np.dot(query_embedding, doc.embedding)
results.append((doc, score))
results.sort(key=lambda x: x[1], reverse=True)
return [
RetrievalResult(document=doc, score=score, rank=i)
for i, (doc, score) in enumerate(results[:top_k])
]
def delete(self, doc_ids: List[str]) -> None:
"""Delete documents from Chroma."""
for doc_id in doc_ids:
self.documents.pop(doc_id, None)
# Actual Chroma API:
# self.collection.delete(ids=doc_ids)
def vector_store_comparison():
"""Compare vector store options."""
stores = {
'In-Memory (NumPy)': {
'scale': '<100K vectors',
'search_type': 'Exact (brute force)',
'persistence': 'None',
'best_for': 'Development, small datasets'
},
'FAISS': {
'scale': 'Billions of vectors',
'search_type': 'Exact or approximate (IVF, HNSW)',
'persistence': 'File-based',
'best_for': 'High-performance, large scale'
},
'ChromaDB': {
'scale': 'Millions of vectors',
'search_type': 'Approximate (HNSW)',
'persistence': 'SQLite or DuckDB',
'best_for': 'Simple RAG applications'
},
'Pinecone': {
'scale': 'Billions (managed)',
'search_type': 'Approximate',
'persistence': 'Cloud managed',
'best_for': 'Production, serverless'
},
'Weaviate': {
'scale': 'Billions',
'search_type': 'HNSW + hybrid',
'persistence': 'Docker/Cloud',
'best_for': 'Hybrid search, GraphQL API'
},
'Qdrant': {
'scale': 'Billions',
'search_type': 'HNSW',
'persistence': 'Docker/Cloud',
'best_for': 'Filtering, payload support'
},
'Milvus': {
'scale': 'Trillions',
'search_type': 'Multiple indexes',
'persistence': 'Distributed',
'best_for': 'Enterprise, high availability'
}
}
print("Vector Store Comparison:")
print("=" * 60)
for name, attrs in stores.items():
print(f"\n{name}:")
for k, v in attrs.items():
print(f" {k}: {v}")
vector_store_comparison()
Document Chunking
Effective chunking is critical for RAG quality:
class TextChunker(ABC):
"""Abstract base class for text chunkers."""
@abstractmethod
def chunk(
self,
text: str,
metadata: Optional[Dict[str, Any]] = None
) -> List[Document]:
"""Split text into chunks."""
pass
class FixedSizeChunker(TextChunker):
"""
Fixed-size chunking with overlap.
Simple but effective baseline approach.
"""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 50,
length_function: callable = len
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.length_function = length_function
def chunk(
self,
text: str,
metadata: Optional[Dict[str, Any]] = None
) -> List[Document]:
"""Split text into fixed-size chunks with overlap."""
chunks = []
start = 0
while start < len(text):
end = start + self.chunk_size
# Find chunk text
chunk_text = text[start:end]
# Create document
chunk_metadata = {**(metadata or {}), 'chunk_index': len(chunks)}
chunks.append(Document(
content=chunk_text.strip(),
metadata=chunk_metadata
))
# Move start with overlap
start = end - self.chunk_overlap
return chunks
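A short sketch of what the overlap buys: consecutive windows share chunk_overlap characters, so text cut off at one chunk boundary reappears at the start of the next.
chunker = FixedSizeChunker(chunk_size=20, chunk_overlap=5)
sample = "The quick brown fox jumps over the lazy dog."
for c in chunker.chunk(sample, metadata={"source": "demo"}):
    print(c.metadata["chunk_index"], repr(c.content))
# Raw windows: sample[0:20], sample[15:35], sample[30:50] (5-character overlap),
# each stripped of surrounding whitespace before being stored.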
class RecursiveCharacterChunker(TextChunker):
"""
Recursive chunking that respects text structure.
Tries to split on natural boundaries:
paragraphs > sentences > words > characters
"""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
separators: Optional[List[str]] = None
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.separators = separators or ["\n\n", "\n", ". ", " ", ""]
def chunk(
self,
text: str,
metadata: Optional[Dict[str, Any]] = None
) -> List[Document]:
"""Recursively split text on natural boundaries."""
chunks = self._split_text(text, self.separators)
return [
Document(
content=chunk.strip(),
metadata={**(metadata or {}), 'chunk_index': i}
)
for i, chunk in enumerate(chunks)
if chunk.strip()
]
def _split_text(
self,
text: str,
separators: List[str]
) -> List[str]:
"""Recursively split text."""
final_chunks = []
# Find the appropriate separator
separator = separators[-1]
for sep in separators:
if sep in text:
separator = sep
break
# Split
if separator:
splits = text.split(separator)
else:
splits = list(text)
# Merge small chunks
current_chunk = []
current_length = 0
for split in splits:
split_length = len(split)
if current_length + split_length > self.chunk_size:
if current_chunk:
merged = separator.join(current_chunk)
final_chunks.append(merged)
# Keep overlap
overlap_chunks = []
overlap_length = 0
for chunk in reversed(current_chunk):
if overlap_length + len(chunk) <= self.chunk_overlap:
overlap_chunks.insert(0, chunk)
overlap_length += len(chunk) + len(separator)
else:
break
current_chunk = overlap_chunks
current_chunk.append(split)
current_length = sum(len(c) for c in current_chunk)
else:
current_chunk.append(split)
current_length += split_length + len(separator)
# Add remaining
if current_chunk:
final_chunks.append(separator.join(current_chunk))
return final_chunks
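A usage sketch to make the behaviour concrete: because paragraph breaks are present, the splitter picks "\n\n" as the separator and keeps each paragraph intact rather than cutting mid-sentence.
rc = RecursiveCharacterChunker(chunk_size=80, chunk_overlap=0)
doc_text = (
    "RAG retrieves evidence before generating.\n\n"
    "Chunking controls what each retrieved unit contains.\n\n"
    "Overlap keeps context that straddles a boundary."
)
# Each paragraph fits under chunk_size on its own, but any two together would
# exceed it, so every paragraph becomes its own chunk.
for d in rc.chunk(doc_text, metadata={"source": "demo"}):
    print(d.metadata["chunk_index"], repr(d.content))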
class SemanticChunker(TextChunker):
"""
Semantic chunking based on embedding similarity.
Splits where semantic similarity between sentences drops,
creating more coherent chunks.
"""
def __init__(
self,
embedding_model: EmbeddingModel,
breakpoint_threshold: float = 0.5,
min_chunk_size: int = 100,
max_chunk_size: int = 2000
):
self.embedding_model = embedding_model
self.breakpoint_threshold = breakpoint_threshold
self.min_chunk_size = min_chunk_size
self.max_chunk_size = max_chunk_size
def chunk(
self,
text: str,
metadata: Optional[Dict[str, Any]] = None
) -> List[Document]:
"""Split text based on semantic similarity."""
# Split into sentences
sentences = self._split_sentences(text)
if len(sentences) <= 1:
            return [Document(content=text, metadata={**(metadata or {}), 'chunk_index': 0})]
# Get embeddings for each sentence
embeddings = self.embedding_model.embed_documents(sentences)
# Calculate similarities between adjacent sentences
similarities = []
for i in range(len(embeddings) - 1):
sim = np.dot(embeddings[i], embeddings[i + 1])
similarities.append(sim)
# Find breakpoints (low similarity)
breakpoints = []
for i, sim in enumerate(similarities):
if sim < self.breakpoint_threshold:
breakpoints.append(i + 1)
# Create chunks
chunks = []
start = 0
for bp in breakpoints:
chunk_text = " ".join(sentences[start:bp])
# Check size constraints
if len(chunk_text) >= self.min_chunk_size:
chunks.append(chunk_text)
start = bp
# If too small, continue to next breakpoint
# Add remaining
if start < len(sentences):
chunk_text = " ".join(sentences[start:])
if chunks and len(chunk_text) < self.min_chunk_size:
chunks[-1] += " " + chunk_text
else:
chunks.append(chunk_text)
return [
Document(
content=chunk,
metadata={**(metadata or {}), 'chunk_index': i}
)
for i, chunk in enumerate(chunks)
]
def _split_sentences(self, text: str) -> List[str]:
"""Simple sentence splitting."""
import re
sentences = re.split(r'(?<=[.!?])\s+', text)
return [s.strip() for s in sentences if s.strip()]
class MarkdownChunker(TextChunker):
"""
Markdown-aware chunking that respects document structure.
Preserves:
- Headers and hierarchy
- Code blocks
- Lists
"""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 100
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def chunk(
self,
text: str,
metadata: Optional[Dict[str, Any]] = None
) -> List[Document]:
"""Split markdown respecting structure."""
import re
chunks = []
current_headers = []
# Split by headers
header_pattern = r'^(#{1,6})\s+(.+)$'
sections = re.split(r'(?=^#{1,6}\s)', text, flags=re.MULTILINE)
for section in sections:
if not section.strip():
continue
# Extract header if present
header_match = re.match(header_pattern, section, re.MULTILINE)
if header_match:
level = len(header_match.group(1))
header_text = header_match.group(2)
# Update header hierarchy
current_headers = current_headers[:level-1] + [header_text]
# Create chunk with header context
chunk_metadata = {
**(metadata or {}),
'headers': ' > '.join(current_headers),
'chunk_index': len(chunks)
}
# Split large sections
if len(section) > self.chunk_size:
sub_chunks = self._split_large_section(section)
for i, sub_chunk in enumerate(sub_chunks):
sub_metadata = {**chunk_metadata, 'sub_chunk': i}
chunks.append(Document(content=sub_chunk, metadata=sub_metadata))
else:
chunks.append(Document(content=section, metadata=chunk_metadata))
return chunks
def _split_large_section(self, section: str) -> List[str]:
"""Split large section while preserving code blocks."""
import re
# Protect code blocks
        code_blocks = re.findall(r'```[\s\S]*?```', section)
        placeholders = [f"__CODE_BLOCK_{i}__" for i in range(len(code_blocks))]
        for placeholder, block in zip(placeholders, code_blocks):
            section = section.replace(block, placeholder)
        # Split on paragraphs
        chunks = []
        paragraphs = section.split('\n\n')
        current_chunk = []
        current_length = 0
        for para in paragraphs:
            para_length = len(para)
            if current_length + para_length > self.chunk_size and current_chunk:
                chunks.append('\n\n'.join(current_chunk))
                current_chunk = []
                current_length = 0
            current_chunk.append(para)
            current_length += para_length
        if current_chunk:
            chunks.append('\n\n'.join(current_chunk))
        # Restore code blocks
        restored_chunks = []
        for chunk in chunks:
            for placeholder, block in zip(placeholders, code_blocks):
                chunk = chunk.replace(placeholder, block)
            restored_chunks.append(chunk)
        return restored_chunks
Key Takeaways
RAG systems combine retrieval and generation to ground LLM responses in external knowledge. The core pipeline involves: (1) chunking documents into manageable pieces, (2) embedding chunks using models like E5 or OpenAI embeddings, (3) storing embeddings in vector databases like FAISS or ChromaDB, (4) retrieving relevant context for queries, and (5) generating responses conditioned on retrieved content. Key considerations include chunk size/overlap trade-offs, embedding model selection based on quality vs. speed, and vector store choice based on scale requirements. RAG offers significant advantages over fine-tuning for knowledge-intensive tasks—instant updates, reduced hallucination, and transparent sourcing—while fine-tuning remains better for style and format adaptation.