Building Production RAG Systems with LangChain & Pinecone
Master production-ready RAG systems with this comprehensive guide. From setup to scaling, learn how to combine LangChain's flexibility with Pinecone's vector search for enterprise-grade AI applications.
Building a chatbot that can answer questions about your company docs? Sounds simple until you hit production. Suddenly you're dealing with retrieval accuracy issues, response times that make users rage-quit, and embedding costs that make your CFO cry.
I've built RAG systems that handle millions of queries monthly, and I'll show you exactly how to do it right with LangChain and Pinecone. No fluff—just the architecture patterns and gotchas that actually matter in production.
Why LangChain + Pinecone for Production RAG?
Let's be honest: there are dozens of RAG frameworks and vector databases out there. But this combo consistently delivers in production because:
- LangChain gives you flexibility without reinventing everything from scratch
- Pinecone handles the heavy lifting of vector search with enterprise reliability
- Both scale horizontally without architectural rewrites
I've seen teams waste months building custom vector search only to discover Pinecone's query performance at scale. Learn from their pain.
Core Architecture: Getting the Foundations Right
Here's the production-ready architecture I use for most RAG systems:
from langchain.vectorstores import Pinecone
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
import pinecone
import os
class ProductionRAGSystem:
    def __init__(self, pinecone_api_key: str, openai_api_key: str):
        # Initialize Pinecone
        pinecone.init(
            api_key=pinecone_api_key,
            environment="us-west1-gcp-free"  # Choose your region
        )

        self.index_name = "production-rag-index"
        self.embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)

        # Create index if it doesn't exist
        if self.index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=self.index_name,
                dimension=1536,  # OpenAI embedding dimension
                metric="cosine"
            )

        self.vectorstore = Pinecone.from_existing_index(
            index_name=self.index_name,
            embedding=self.embeddings
        )

        self.qa_chain = RetrievalQA.from_chain_type(
            llm=OpenAI(temperature=0, openai_api_key=openai_api_key),
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 5}  # Return top 5 chunks
            )
        )
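With the class defined, wiring it up takes only a few lines. This is a minimal sketch, assuming your keys are exported as PINECONE_API_KEY and OPENAI_API_KEY environment variables; the question is just a placeholder:

# Illustrative setup; key handling is up to your deployment
rag = ProductionRAGSystem(
    pinecone_api_key=os.environ["PINECONE_API_KEY"],
    openai_api_key=os.environ["OPENAI_API_KEY"]
)
print(rag.qa_chain.run("What is our refund policy?"))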
Document Processing: The Make-or-Break Component
Most RAG failures happen here. Your chunking strategy determines everything downstream. Here's what actually works in production:
def process_documents(self, documents: list[str], metadata: list[dict]):
    """Process documents with production-ready chunking strategy"""
    # Use recursive splitter - it's smarter than basic splitting
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,    # Sweet spot for most use cases
        chunk_overlap=200,  # Prevents context loss
        separators=["\n\n", "\n", " ", ""]  # Respect document structure
    )

    all_chunks = []
    all_metadata = []

    for doc, meta in zip(documents, metadata):
        chunks = text_splitter.split_text(doc)
        for i, chunk in enumerate(chunks):
            # Add chunk-specific metadata
            chunk_metadata = {
                **meta,
                "chunk_id": f"{meta.get('doc_id', 'unknown')}_{i}",
                "chunk_index": i,
                "total_chunks": len(chunks)
            }
            all_chunks.append(chunk)
            all_metadata.append(chunk_metadata)

    # Batch upsert for efficiency
    self.vectorstore.add_texts(
        texts=all_chunks,
        metadatas=all_metadata
    )
    return len(all_chunks)
Pro Tips for Document Processing
- Chunk size matters: around 1,000 characters works for most cases (the recursive splitter counts characters, not tokens), but experiment with your data
- Overlap is crucial: 200 characters prevents important context from being split
- Metadata is gold: Store document source, timestamp, and any business logic fields
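To make the metadata point concrete, here's roughly what an ingestion call looks like with the process_documents method above. The file names and metadata fields are illustrative; store whatever your business logic needs:

from pathlib import Path

# Illustrative documents and metadata (field names are placeholders)
docs = [Path("handbook.md").read_text(), Path("refund_policy.md").read_text()]
metadata = [
    {"doc_id": "handbook-v3", "source": "handbook.md", "updated_at": "2024-01-15"},
    {"doc_id": "refunds-v1", "source": "refund_policy.md", "updated_at": "2024-02-02"},
]

num_chunks = rag.process_documents(docs, metadata)
print(f"Indexed {num_chunks} chunks")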
Query Pipeline: Optimizing for Speed and Accuracy
A production query pipeline needs more than just "ask and pray." Here's how to build one that actually works:
def enhanced_query(self, question: str, filters: dict = None) -> dict:
    """Enhanced query with filtering and response metadata"""
    # Pull more candidates than we need, along with their Pinecone relevance
    # scores, applying metadata filters if provided
    scored_docs = self.vectorstore.similarity_search_with_score(
        question,
        k=10,  # Get more candidates initially
        filter=filters
    )

    # Re-rank by relevance score (Pinecone returns one per match)
    scored_docs.sort(key=lambda x: x[1], reverse=True)

    # Take top 5 after re-ranking
    top_docs = [doc for doc, _ in scored_docs[:5]]

    # Generate the answer over exactly these chunks using the stuff chain
    response = self.qa_chain.combine_documents_chain.run(
        input_documents=top_docs,
        question=question
    )

    return {
        "answer": response,
        "source_chunks": len(top_docs),
        "sources": [doc.metadata.get('source', 'unknown') for doc in top_docs]
    }
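Calling it with a metadata filter looks something like this; the department field is hypothetical and only works if you stored it at ingestion time:

result = rag.enhanced_query(
    "How do we handle refunds over $500?",
    filters={"department": "finance"}  # hypothetical field set during ingestion
)
print(result["answer"])
print("Sources:", result["sources"])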
Production Considerations: What They Don't Tell You
Monitoring and Observability
You can't optimize what you can't measure. Track these metrics religiously:
import time
import logging
from functools import wraps

def monitor_rag_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            # Log successful query metrics
            logging.info({
                "query_time": time.time() - start_time,
                "chunks_retrieved": result.get('source_chunks', 0),
                "query_length": len(args[1]) if len(args) > 1 else 0,
                "status": "success"
            })
            return result
        except Exception as e:
            # Log errors with context
            logging.error({
                "query_time": time.time() - start_time,
                "error": str(e),
                "status": "error"
            })
            raise
    return wrapper
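Applying the decorator is straightforward. One option is a thin subclass, shown here as a sketch (the MonitoredRAGSystem name is illustrative):

class MonitoredRAGSystem(ProductionRAGSystem):
    """Same behavior as ProductionRAGSystem, with per-query timing and logging."""

    @monitor_rag_performance
    def enhanced_query(self, question: str, filters: dict = None) -> dict:
        return super().enhanced_query(question, filters)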
Cost Optimization Strategies
Reality check: Embedding costs add up fast. A system processing 100k queries/day can easily hit $500/month in OpenAI embedding costs alone.
Here's how to keep costs sane:
- Cache embeddings: Hash query text and cache embeddings for repeated queries (see the sketch after this list)
- Batch processing: Process documents in batches during off-peak hours
- Hybrid search: Combine semantic search with keyword filters to reduce vector operations
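For the caching point, here's a minimal sketch of a query-embedding cache. The CachedQueryEmbeddings wrapper and in-memory dict are illustrative; in production you'd likely back this with Redis or another shared store:

import hashlib

class CachedQueryEmbeddings:
    """Caches query embeddings keyed by a hash of the query text."""

    def __init__(self, embeddings):
        self.embeddings = embeddings
        self._cache = {}

    def embed_query(self, text: str) -> list[float]:
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if key not in self._cache:
            self._cache[key] = self.embeddings.embed_query(text)
        return self._cache[key]

Even a naive cache like this pays off quickly when the same questions show up repeatedly.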
Scaling Strategies That Actually Work
When your RAG system grows beyond the prototype phase, you'll hit these bottlenecks. Here's how to handle them:
Horizontal Scaling
class ScalableRAGSystem(ProductionRAGSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Use multiple Pinecone indexes for different domains
        self.domain_indexes = {
            "legal": "legal-docs-index",
            "technical": "tech-docs-index",
            "general": "general-docs-index"
        }

    def route_query(self, question: str, domain: str = "general") -> str:
        """Route queries to domain-specific indexes"""
        # Fall back to the general index for unknown domains
        index_name = self.domain_indexes.get(domain, self.domain_indexes["general"])

        # Switch to the domain-specific vectorstore
        domain_vectorstore = Pinecone.from_existing_index(
            index_name=index_name,
            embedding=self.embeddings
        )

        # Create a domain-specific QA chain, reusing the LLM from the base chain
        domain_qa = RetrievalQA.from_chain_type(
            llm=self.qa_chain.combine_documents_chain.llm_chain.llm,
            chain_type="stuff",
            retriever=domain_vectorstore.as_retriever(
                search_kwargs={"k": 5}
            )
        )
        return domain_qa.run(question)
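Calling the router is then a one-liner. A sketch, assuming the domain indexes above already exist and are populated:

scaled_rag = ScalableRAGSystem(
    pinecone_api_key=os.environ["PINECONE_API_KEY"],
    openai_api_key=os.environ["OPENAI_API_KEY"]
)
answer = scaled_rag.route_query("What does the standard MSA say about liability?", domain="legal")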
Common Pitfalls and How to Avoid Them
- Chunk size too small: Results in fragmented context and poor answers
- No metadata strategy: Makes debugging and filtering impossible
- Ignoring embedding model limits: OpenAI has token limits that will break your pipeline
- No fallback handling: Always have a graceful degradation strategy
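For the last point, graceful degradation can be as simple as a wrapper that catches failures and returns a canned response. A minimal sketch, assuming the ProductionRAGSystem class from earlier; the query_with_fallback helper is hypothetical:

import logging

def query_with_fallback(rag: ProductionRAGSystem, question: str) -> dict:
    """Return a safe canned response if retrieval or the LLM call fails."""
    try:
        return rag.enhanced_query(question)
    except Exception:
        logging.exception("RAG query failed; returning fallback response")
        return {
            "answer": "Sorry, I can't answer that right now. Please try again shortly.",
            "source_chunks": 0,
            "sources": []
        }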
Actionable Takeaways
Building production RAG systems isn't just about connecting APIs. Focus on:
- Document processing strategy—get this right and everything else becomes easier
- Monitoring from day one—you'll need those metrics when things break
- Cost management—embedding costs scale linearly with usage
- Domain-specific optimization—one size doesn't fit all use cases
The architecture I've shown you handles millions of queries monthly across multiple production systems. Start with this foundation, then optimize based on your specific metrics and constraints.
Remember: the best RAG system is the one your users actually want to use. Focus on response quality and speed over fancy features, and you'll build something that lasts.