Back to Blog
Tutorials

Building Production RAG Systems with LangChain & Pinecone

Master production-ready RAG systems with this comprehensive guide. From setup to scaling, learn how to combine LangChain's flexibility with Pinecone's vector search for enterprise-grade AI applications.

TensorHQ Team·December 1, 2025·5 min read
Share:
Building Production RAG Systems with LangChain & Pinecone

Building a chatbot that can answer questions about your company docs? Sounds simple until you hit production. Suddenly you're dealing with retrieval accuracy issues, response times that make users rage-quit, and embedding costs that make your CFO cry.

I've built RAG systems that handle millions of queries monthly, and I'll show you exactly how to do it right with LangChain and Pinecone. No fluff—just the architecture patterns and gotchas that actually matter in production.

Why LangChain + Pinecone for Production RAG?

Let's be honest: there are dozens of RAG frameworks and vector databases out there. But this combo consistently delivers in production because:

  • LangChain gives you flexibility without reinventing everything from scratch
  • Pinecone handles the heavy lifting of vector search with enterprise reliability
  • Both scale horizontally without architectural rewrites

I've seen teams waste months building custom vector search only to discover Pinecone's query performance at scale. Learn from their pain.

Core Architecture: Getting the Foundations Right

Here's the production-ready architecture I use for most RAG systems:

from langchain.vectorstores import Pinecone
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
import pinecone
import os

class ProductionRAGSystem:
    def __init__(self, pinecone_api_key: str, openai_api_key: str):
        # Initialize Pinecone
        pinecone.init(
            api_key=pinecone_api_key,
            environment="us-west1-gcp-free"  # Choose your region
        )
        
        self.index_name = "production-rag-index"
        self.embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
        
        # Create index if it doesn't exist
        if self.index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=self.index_name,
                dimension=1536,  # OpenAI embedding dimension
                metric="cosine"
            )
        
        self.vectorstore = Pinecone.from_existing_index(
            index_name=self.index_name,
            embedding=self.embeddings
        )
        
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=OpenAI(temperature=0, openai_api_key=openai_api_key),
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 5}  # Return top 5 chunks
            )
        )

Document Processing: The Make-or-Break Component

Most RAG failures happen here. Your chunking strategy determines everything downstream. Here's what actually works in production:

def process_documents(self, documents: list[str], metadata: list[dict]):
    """Process documents with production-ready chunking strategy"""
    
    # Use recursive splitter - it's smarter than basic splitting
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,        # Sweet spot for most use cases
        chunk_overlap=200,      # Prevents context loss
        separators=["\n\n", "\n", " ", ""]  # Respect document structure
    )
    
    all_chunks = []
    all_metadata = []
    
    for doc, meta in zip(documents, metadata):
        chunks = text_splitter.split_text(doc)
        
        for i, chunk in enumerate(chunks):
            # Add chunk-specific metadata
            chunk_metadata = {
                **meta,
                "chunk_id": f"{meta.get('doc_id', 'unknown')}_{i}",
                "chunk_index": i,
                "total_chunks": len(chunks)
            }
            
            all_chunks.append(chunk)
            all_metadata.append(chunk_metadata)
    
    # Batch upsert for efficiency
    self.vectorstore.add_texts(
        texts=all_chunks,
        metadatas=all_metadata
    )
    
    return len(all_chunks)

Pro Tips for Document Processing

  • Chunk size matters: 1000 tokens works for most cases, but experiment with your data
  • Overlap is crucial: 200 characters prevents important context from being split
  • Metadata is gold: Store document source, timestamp, and any business logic fields

Query Pipeline: Optimizing for Speed and Accuracy

A production query pipeline needs more than just "ask and pray." Here's how to build one that actually works:

def enhanced_query(self, question: str, filters: dict = None) -> dict:
    """Enhanced query with filtering and response metadata"""
    
    # Apply metadata filters if provided
    retriever = self.vectorstore.as_retriever(
        search_kwargs={
            "k": 10,  # Get more candidates initially
            "filter": filters or {}
        }
    )
    
    # Get relevant documents
    docs = retriever.get_relevant_documents(question)
    
    # Re-rank by relevance score (Pinecone provides this)
    scored_docs = [(doc, doc.metadata.get('score', 0)) for doc in docs]
    scored_docs.sort(key=lambda x: x[1], reverse=True)
    
    # Take top 5 after re-ranking
    top_docs = [doc for doc, _ in scored_docs[:5]]
    
    # Generate answer
    response = self.qa_chain.run({
        "query": question,
        "source_documents": top_docs
    })
    
    return {
        "answer": response,
        "source_chunks": len(top_docs),
        "sources": [doc.metadata.get('source', 'unknown') for doc in top_docs]
    }

Production Considerations: What They Don't Tell You

Monitoring and Observability

You can't optimize what you can't measure. Track these metrics religiously:

import time
import logging
from functools import wraps

def monitor_rag_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        
        try:
            result = func(*args, **kwargs)
            
            # Log successful query metrics
            logging.info({
                "query_time": time.time() - start_time,
                "chunks_retrieved": result.get('source_chunks', 0),
                "query_length": len(args[1]) if len(args) > 1 else 0,
                "status": "success"
            })
            
            return result
            
        except Exception as e:
            # Log errors with context
            logging.error({
                "query_time": time.time() - start_time,
                "error": str(e),
                "status": "error"
            })
            raise
    
    return wrapper

Cost Optimization Strategies

Reality check: Embedding costs add up fast. A system processing 100k queries/day can easily hit $500/month in OpenAI embedding costs alone.

Here's how to keep costs sane:

  • Cache embeddings: Hash query text and cache embeddings for repeated queries
  • Batch processing: Process documents in batches during off-peak hours
  • Hybrid search: Combine semantic search with keyword filters to reduce vector operations

Scaling Strategies That Actually Work

When your RAG system grows beyond the prototype phase, you'll hit these bottlenecks. Here's how to handle them:

Horizontal Scaling

class ScalableRAGSystem(ProductionRAGSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        # Use multiple Pinecone indexes for different domains
        self.domain_indexes = {
            "legal": "legal-docs-index",
            "technical": "tech-docs-index",
            "general": "general-docs-index"
        }
    
    def route_query(self, question: str, domain: str = "general") -> dict:
        """Route queries to domain-specific indexes"""
        
        index_name = self.domain_indexes.get(domain, "general")
        
        # Switch to domain-specific vectorstore
        domain_vectorstore = Pinecone.from_existing_index(
            index_name=index_name,
            embedding=self.embeddings
        )
        
        # Create domain-specific QA chain
        domain_qa = RetrievalQA.from_chain_type(
            llm=self.qa_chain.llm,
            chain_type="stuff",
            retriever=domain_vectorstore.as_retriever(
                search_kwargs={"k": 5}
            )
        )
        
        return domain_qa.run(question)

Common Pitfalls and How to Avoid Them

  • Chunk size too small: Results in fragmented context and poor answers
  • No metadata strategy: Makes debugging and filtering impossible
  • Ignoring embedding model limits: OpenAI has token limits that will break your pipeline
  • No fallback handling: Always have a graceful degradation strategy

Actionable Takeaways

Building production RAG systems isn't just about connecting APIs. Focus on:

  1. Document processing strategy—get this right and everything else becomes easier
  2. Monitoring from day one—you'll need those metrics when things break
  3. Cost management—embedding costs scale linearly with usage
  4. Domain-specific optimization—one size doesn't fit all use cases

The architecture I've shown you handles millions of queries monthly across multiple production systems. Start with this foundation, then optimize based on your specific metrics and constraints.

Remember: the best RAG system is the one your users actually want to use. Focus on response quality and speed over fancy features, and you'll build something that lasts.

📬

Subscribe to Our Newsletter

Get the latest AI insights, tutorials, and industry news delivered to your inbox weekly.

Free, weekly, unsubscribe anytime. No spam, ever.