Integrations: Building RAG with Supacrawler, LangChain, and Supabase pgvector for Enterprise
LangChain has become the go-to framework for building sophisticated AI applications, providing powerful abstractions for document processing, embeddings, and retrieval systems. This comprehensive guide shows you how to combine LangChain's robust ecosystem with Supacrawler's intelligent web crawling and Supabase's vector storage to build a production-ready RAG system.
By leveraging LangChain's document loaders, text splitters, and retrieval chains, you'll create a RAG system that's both powerful and maintainable, perfect for complex document processing and advanced retrieval scenarios.
If you'd like to try it yourself, you can check out the LangChain Vectors notebook.
Table of Contents
- LangChain RAG Architecture
- Setting Up the Development Environment
- Supabase pgvector Configuration
- Intelligent Web Crawling with Supacrawler
- LangChain Document Processing
- Advanced Text Chunking Strategies
- OpenAI Embeddings Integration
- LangChain PGVector Storage
- Building Retrieval Chains
- Advanced Query Processing
- Production Optimization
- Monitoring and Evaluation
LangChain RAG Architecture
Our RAG system leverages LangChain's modular architecture for maximum flexibility and maintainability:
Component | Technology | Purpose | LangChain Module |
---|---|---|---|
Web Crawling | Supacrawler | Extract clean content from websites | Custom Document Loader |
Document Processing | LangChain | Parse and structure crawled content | Document Loaders & Transformers |
Text Chunking | LangChain | Split documents into searchable segments | Text Splitters |
Embeddings | OpenAI | Convert text to high-dimensional vectors | OpenAI Embeddings |
Vector Storage | Supabase pgvector | Store and search vectors efficiently | PGVector |
Retrieval | LangChain | Find relevant documents for queries | Retrievers |
Generation | OpenAI | Generate responses with retrieved context | Chat Models |
Key Advantages of LangChain Integration
- Modular Design: Easy to swap components and experiment with different approaches
- Rich Ecosystem: Access to 100+ document loaders, text splitters, and integrations
- Chain Abstractions: Build complex workflows with simple, reusable components (see the minimal sketch after this list)
- Memory Management: Handle conversation context and multi-turn interactions
- Evaluation Tools: Built-in metrics and evaluation frameworks
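To make the chain-abstraction point concrete, here is a minimal sketch of LangChain's composition style (LCEL). The prompt text and model choice are illustrative only and not part of the tutorial's pipeline; it assumes `OPENAI_API_KEY` is set in your environment:

```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# Each component is independently swappable: change the model or parser
# without touching the rest of the chain
prompt = ChatPromptTemplate.from_template("Summarize in one sentence: {text}")
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# The | operator pipes prompt -> model -> parser into a single runnable
chain = prompt | llm | StrOutputParser()
print(chain.invoke({"text": "LangChain composes prompts, models, and parsers."}))
```

The same `|` composition pattern underlies the retrieval chains built later in this guide.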
Setting Up the Development Environment
First, install LangChain and all required dependencies:
```bash
# Core LangChain packages
pip install langchain langchain-community langchain-openai

# Vector storage and database
pip install langchain-postgres psycopg2-binary sqlalchemy

# Text processing and utilities
pip install langchain-text-splitters beautifulsoup4 markdownify

# Supacrawler and environment management
pip install supacrawler python-dotenv

# Optional: Advanced features
pip install langchain-experimental  # For experimental features
```
Create your environment configuration:
```bash
# .env
SUPACRAWLER_API_KEY=your_supacrawler_api_key
OPENAI_API_KEY=your_openai_api_key
SUPABASE_URL=your_supabase_project_url
SUPABASE_KEY=your_supabase_anon_key
DATABASE_URL=postgresql://postgres:[password]@db.[project].supabase.co:5432/postgres

# LangChain settings
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_langchain_api_key  # Optional: for LangSmith tracing
```
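Before going further, it can save debugging time to verify the configuration actually loads. A quick sketch (the variable names match the `.env` above):

```python
import os
from dotenv import load_dotenv

load_dotenv()

# Fail fast if any required credential is missing
required = ['SUPACRAWLER_API_KEY', 'OPENAI_API_KEY', 'SUPABASE_URL', 'SUPABASE_KEY', 'DATABASE_URL']
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise EnvironmentError(f"Missing environment variables: {', '.join(missing)}")
print("✅ All required environment variables are set")
```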
Supabase pgvector Configuration
Enable pgvector in your Supabase project:
```python
import os
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

load_dotenv()

def setup_supabase_pgvector():
    """Setup Supabase with the pgvector extension"""
    engine = create_engine(os.getenv('DATABASE_URL'))

    with engine.connect() as connection:
        # Enable pgvector extension
        connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
        connection.commit()

    print("✅ pgvector extension enabled in Supabase")
    return engine

# Setup database
engine = setup_supabase_pgvector()
```
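For larger collections you may also want an approximate-nearest-neighbor index. This optional sketch assumes pgvector ≥ 0.5 (for HNSW support) and the default `langchain_pg_embedding` table that LangChain's PGVector integration creates later in this guide, so run it only after documents have been inserted:

```python
from sqlalchemy import text

def create_hnsw_index(engine):
    """Optionally add an HNSW index for faster cosine-similarity search.

    Assumes the langchain_pg_embedding table created by PGVector exists.
    """
    with engine.connect() as connection:
        connection.execute(text(
            "CREATE INDEX IF NOT EXISTS langchain_embedding_hnsw_idx "
            "ON langchain_pg_embedding USING hnsw (embedding vector_cosine_ops);"
        ))
        connection.commit()
    print("✅ HNSW index created (or already present)")

# create_hnsw_index(engine)  # uncomment once the table exists and has data
```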
Intelligent Web Crawling with Supacrawler
Create a custom LangChain document loader that uses Supacrawler for intelligent web crawling:
```python
import os
from typing import List, Dict, Any, Optional
from langchain_core.documents import Document
from langchain_core.document_loaders import BaseLoader
from supacrawler import SupacrawlerClient
from dotenv import load_dotenv

load_dotenv()

class SupacrawlerDocumentLoader(BaseLoader):
    """LangChain document loader that uses Supacrawler for web crawling"""

    def __init__(
        self,
        url: str,
        api_key: Optional[str] = None,
        include_patterns: Optional[List[str]] = None,
        exclude_patterns: Optional[List[str]] = None,
        depth: int = 3,
        link_limit: int = 200,
        **crawl_kwargs
    ):
        """Initialize the Supacrawler document loader

        Args:
            url: Starting URL to crawl
            api_key: Supacrawler API key (defaults to environment variable)
            include_patterns: URL patterns to include (e.g., ['/docs/*'])
            exclude_patterns: URL patterns to exclude (e.g., ['/blog/*'])
            depth: Maximum crawl depth
            link_limit: Maximum number of pages to crawl
            **crawl_kwargs: Additional crawl parameters
        """
        self.url = url
        self.client = SupacrawlerClient(api_key=api_key or os.getenv('SUPACRAWLER_API_KEY'))

        # Default crawl configuration optimized for documentation
        self.crawl_config = {
            'url': url,
            'format': 'markdown',
            'depth': depth,
            'link_limit': link_limit,
            'render_js': True,
            'include_patterns': include_patterns or ['/docs/*', '/api/*', '/guides/*'],
            'exclude_patterns': exclude_patterns or ['/blog/*', '/changelog/*', '/privacy/*'],
            'timeout': 30000,
            'wait_for': '.main-content, .content, main, article',
            'block_ads': True,
            'block_cookies': True,
            **crawl_kwargs
        }

    def load(self) -> List[Document]:
        """Crawl the website and return LangChain Documents"""
        print(f"🚀 Starting crawl of {self.url}")

        # Create and execute the crawl job
        job = self.client.create_crawl_job(**self.crawl_config)
        result = self.client.wait_for_crawl(job.job_id)

        if result.status != 'completed':
            raise Exception(f"Crawl failed with status: {result.status}")

        crawl_data = result.data.get('crawl_data', {})
        print(f"✅ Crawl completed! Found {len(crawl_data)} pages")

        # Convert to LangChain Documents
        documents = []
        for url, page_data in crawl_data.items():
            content = page_data.get('markdown', '')
            metadata = page_data.get('metadata', {})

            if not content or len(content.strip()) < 100:
                continue

            # Create a LangChain Document with rich metadata
            doc = Document(
                page_content=content,
                metadata={
                    'url': url,
                    'title': metadata.get('title', ''),
                    'description': metadata.get('description', ''),
                    'keywords': metadata.get('keywords', ''),
                    'author': metadata.get('author', ''),
                    'source': 'supacrawler',
                    'crawl_timestamp': result.data.get('timestamp'),
                    'content_length': len(content),
                    'word_count': len(content.split())
                }
            )
            documents.append(doc)

        print(f"📄 Created {len(documents)} LangChain documents")
        return documents

    def lazy_load(self) -> List[Document]:
        """Lazy loading implementation (same as load for web crawling)"""
        return self.load()

# Example usage
loader = SupacrawlerDocumentLoader(
    url="https://python.langchain.com/docs",
    include_patterns=['/docs/*'],
    exclude_patterns=['/docs/changelog/*'],
    depth=3,
    link_limit=150
)

# Load documents
documents = loader.load()

print(f"\n📊 Loaded {len(documents)} documents")
print(f"📈 Total content: {sum(len(doc.page_content) for doc in documents):,} characters")

# Display a sample document
if documents:
    sample_doc = documents[0]
    print(f"\n📋 Sample Document:")
    print(f"Title: {sample_doc.metadata['title']}")
    print(f"URL: {sample_doc.metadata['url']}")
    print(f"Content preview: {sample_doc.page_content[:300]}...")
```
LangChain Document Processing
Process the crawled documents using LangChain's powerful document transformers:
```python
import re
from typing import List
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter, MarkdownHeaderTextSplitter

class AdvancedDocumentProcessor:
    """Advanced document processing using LangChain's text splitters"""

    def __init__(self):
        # Initialize different text splitters for different content types
        self.markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[
                ("#", "Header 1"),
                ("##", "Header 2"),
                ("###", "Header 3"),
                ("####", "Header 4"),
            ]
        )

        self.recursive_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )

    def clean_document_content(self, doc: Document) -> Document:
        """Clean and normalize document content"""
        content = doc.page_content

        # Remove excessive whitespace
        content = re.sub(r'\n\s*\n\s*\n', '\n\n', content)
        content = re.sub(r'[ \t]+', ' ', content)

        # Clean markdown artifacts
        content = re.sub(r'!\[([^\]]*)\]\([^)]+\)', r'[Image: \1]', content)  # Images
        content = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', content)  # Links to text

        # Remove code blocks for general content (keep for API docs)
        if 'api' not in doc.metadata.get('url', '').lower():
            content = re.sub(r'```[\s\S]*?```', '[Code Block]', content)
            content = re.sub(r'`([^`]+)`', r'\1', content)

        # Create the cleaned document
        cleaned_doc = Document(
            page_content=content.strip(),
            metadata={**doc.metadata, 'processed': True}
        )
        return cleaned_doc

    def split_documents_intelligently(self, documents: List[Document]) -> List[Document]:
        """Split documents using intelligent strategies based on content type"""
        all_chunks = []

        for doc in documents:
            # Clean the document first
            cleaned_doc = self.clean_document_content(doc)

            # Choose a splitting strategy based on the content
            content = cleaned_doc.page_content

            if self._is_structured_markdown(content):
                # Use markdown-aware splitting for structured content
                chunks = self._split_markdown_document(cleaned_doc)
            else:
                # Use recursive splitting for general content
                chunks = self.recursive_splitter.split_documents([cleaned_doc])

            # Add chunk metadata
            for i, chunk in enumerate(chunks):
                chunk.metadata.update({
                    'chunk_index': i,
                    'total_chunks': len(chunks),
                    'chunk_id': f"{doc.metadata.get('url', 'unknown')}#{i}",
                    'parent_document_id': doc.metadata.get('url', 'unknown')
                })
                all_chunks.append(chunk)

        print(f"📊 Split {len(documents)} documents into {len(all_chunks)} chunks")
        return all_chunks

    def _is_structured_markdown(self, content: str) -> bool:
        """Detect whether content has clear markdown structure"""
        header_count = len(re.findall(r'^#{1,6}\s+', content, re.MULTILINE))
        lines = content.count('\n')

        # If more than 10% of lines are headers, consider it structured
        return lines > 0 and (header_count / lines) > 0.1

    def _split_markdown_document(self, doc: Document) -> List[Document]:
        """Split a markdown document while preserving header hierarchy"""
        # First split by headers
        header_chunks = self.markdown_splitter.split_text(doc.page_content)

        # Then further split large chunks
        final_chunks = []
        for chunk in header_chunks:
            if len(chunk.page_content) > 1200:
                # Further split large sections
                sub_chunks = self.recursive_splitter.split_documents([chunk])
                final_chunks.extend(sub_chunks)
            else:
                final_chunks.append(chunk)

        # Preserve the original metadata
        for chunk in final_chunks:
            chunk.metadata.update(doc.metadata)

        return final_chunks

# Process documents
processor = AdvancedDocumentProcessor()
document_chunks = processor.split_documents_intelligently(documents)

print(f"\n📈 Processing Statistics:")
print(f"Original documents: {len(documents)}")
print(f"Generated chunks: {len(document_chunks)}")
print(f"Average chunk size: {sum(len(chunk.page_content) for chunk in document_chunks) // len(document_chunks)} characters")

# Show a sample chunk
if document_chunks:
    sample_chunk = document_chunks[0]
    print(f"\n📋 Sample Chunk:")
    print(f"Chunk ID: {sample_chunk.metadata['chunk_id']}")
    print(f"Title: {sample_chunk.metadata['title']}")
    print(f"Content: {sample_chunk.page_content[:200]}...")
```
Advanced Text Chunking Strategies
LangChain provides sophisticated text splitting options for different use cases:
```python
import re
from typing import List
from langchain_core.documents import Document
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    TokenTextSplitter,
    MarkdownHeaderTextSplitter,
)

class AdaptiveChunkingStrategy:
    """Adaptive chunking that selects the best strategy based on content type"""

    def __init__(self):
        self.strategies = {
            'recursive': RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=200,
                separators=["\n\n", "\n", ". ", " ", ""]
            ),
            'token_based': TokenTextSplitter(
                chunk_size=800,
                chunk_overlap=100
            ),
            'markdown': MarkdownHeaderTextSplitter(
                headers_to_split_on=[
                    ("#", "Header 1"),
                    ("##", "Header 2"),
                    ("###", "Header 3"),
                ]
            ),
            'semantic': RecursiveCharacterTextSplitter(
                chunk_size=1500,
                chunk_overlap=300,
                separators=["\n\n", "\n", ". ", " "]
            )
        }

    def choose_chunking_strategy(self, doc: Document) -> str:
        """Choose the optimal chunking strategy based on content analysis"""
        content = doc.page_content
        metadata = doc.metadata

        # Analyze content characteristics
        has_headers = bool(re.search(r'^#{1,6}\s+', content, re.MULTILINE))
        has_code = bool(re.search(r'```|`[^`]+`', content))
        is_api_doc = 'api' in metadata.get('url', '').lower()
        is_long_form = len(content) > 2000

        # Decision logic
        if has_headers and is_long_form:
            return 'markdown'
        elif is_api_doc or has_code:
            return 'semantic'  # Preserve more context for technical content
        elif is_long_form:
            return 'token_based'  # More precise for long content
        else:
            return 'recursive'  # Default for general content

    def chunk_document(self, doc: Document) -> List[Document]:
        """Chunk a document using the adaptive strategy"""
        strategy_name = self.choose_chunking_strategy(doc)
        strategy = self.strategies[strategy_name]

        print(f"📄 Using '{strategy_name}' strategy for: {doc.metadata.get('title', 'Unknown')}")

        if strategy_name == 'markdown':
            # Special handling for markdown (split_text returns Documents)
            chunks = strategy.split_text(doc.page_content)
            result_chunks = []
            for chunk in chunks:
                new_doc = Document(
                    page_content=chunk.page_content,
                    metadata={**doc.metadata, **chunk.metadata, 'chunking_strategy': strategy_name}
                )
                result_chunks.append(new_doc)
            return result_chunks
        else:
            chunks = strategy.split_documents([doc])
            for chunk in chunks:
                chunk.metadata['chunking_strategy'] = strategy_name
            return chunks

# Apply adaptive chunking
adaptive_chunker = AdaptiveChunkingStrategy()

# Process all documents with adaptive chunking
adaptive_chunks = []
for doc in documents:
    doc_chunks = adaptive_chunker.chunk_document(doc)
    adaptive_chunks.extend(doc_chunks)

print(f"\n🔄 Adaptive Chunking Results:")
print(f"Total chunks: {len(adaptive_chunks)}")

# Analyze the strategy distribution
strategy_counts = {}
for chunk in adaptive_chunks:
    strategy = chunk.metadata.get('chunking_strategy', 'unknown')
    strategy_counts[strategy] = strategy_counts.get(strategy, 0) + 1

print(f"📊 Strategy distribution:")
for strategy, count in strategy_counts.items():
    print(f"  {strategy}: {count} chunks")
```
OpenAI Embeddings Integration
Integrate OpenAI embeddings with LangChain's embedding abstractions:
```python
import os
from typing import List
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document

class OptimizedOpenAIEmbeddings:
    """Optimized OpenAI embeddings with batching and error handling"""

    def __init__(self, api_key: str = None, model: str = "text-embedding-3-small"):
        self.embeddings = OpenAIEmbeddings(
            openai_api_key=api_key or os.getenv('OPENAI_API_KEY'),
            model=model,
            show_progress_bar=True
        )
        self.model = model
        self.dimension = 1536 if model == "text-embedding-3-small" else 3072

    def embed_documents_with_metadata(self, documents: List[Document]) -> List[Document]:
        """Embed documents and add the embedding vectors to metadata"""
        print(f"🧠 Generating embeddings for {len(documents)} documents...")

        # Extract texts for embedding
        texts = [doc.page_content for doc in documents]

        try:
            # Generate embeddings in a batch
            embeddings = self.embeddings.embed_documents(texts)

            # Attach embeddings to the documents
            embedded_docs = []
            for doc, embedding in zip(documents, embeddings):
                # Create a new document with the embedding in metadata
                embedded_doc = Document(
                    page_content=doc.page_content,
                    metadata={
                        **doc.metadata,
                        'embedding': embedding,
                        'embedding_model': self.model,
                        'embedding_dimension': len(embedding)
                    }
                )
                embedded_docs.append(embedded_doc)

            print(f"✅ Successfully embedded {len(embedded_docs)} documents")
            return embedded_docs
        except Exception as e:
            print(f"❌ Error generating embeddings: {e}")
            return []

    def embed_query(self, query: str) -> List[float]:
        """Embed a query string"""
        return self.embeddings.embed_query(query)

# Generate embeddings for our chunks
embedder = OptimizedOpenAIEmbeddings()
embedded_chunks = embedder.embed_documents_with_metadata(document_chunks)

print(f"\n📊 Embedding Statistics:")
print(f"Embedded chunks: {len(embedded_chunks)}")
if embedded_chunks:
    print(f"Embedding dimension: {embedded_chunks[0].metadata['embedding_dimension']}")
    print(f"Model used: {embedded_chunks[0].metadata['embedding_model']}")
```
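A quick way to sanity-check the embeddings is to compare a query vector against a related and an unrelated sentence. This small sketch uses numpy (an extra dependency not installed above) and the `embedder` defined in the previous block; the example sentences are ours:

```python
import numpy as np

def cosine_similarity(a: list, b: list) -> float:
    """Cosine similarity between two embedding vectors."""
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

query_vec = embedder.embed_query("How do I split documents into chunks?")
related = embedder.embed_query("Text splitters divide documents into smaller segments.")
unrelated = embedder.embed_query("The weather in Paris is mild in spring.")

# The related sentence should score noticeably higher than the unrelated one
print(f"Related: {cosine_similarity(query_vec, related):.3f}")
print(f"Unrelated: {cosine_similarity(query_vec, unrelated):.3f}")
```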
LangChain PGVector Storage
Use LangChain's PGVector integration for seamless vector storage:
```python
import os
from typing import List
from langchain_core.documents import Document
from langchain_postgres import PGVector
from langchain_openai import OpenAIEmbeddings
from sqlalchemy import create_engine

class LangChainVectorStore:
    """LangChain-integrated vector store using Supabase pgvector"""

    def __init__(self, connection_string: str, collection_name: str = "langchain_documents"):
        self.connection_string = connection_string
        self.collection_name = collection_name
        self.engine = create_engine(connection_string)

        # Initialize embeddings
        self.embeddings = OpenAIEmbeddings(
            model="text-embedding-3-small",
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )

        # Initialize the vector store
        self.vector_store = PGVector(
            connection=self.engine,
            collection_name=collection_name,
            embeddings=self.embeddings,
            use_jsonb=True  # Use JSONB for metadata storage
        )

        print(f"📦 Initialized LangChain PGVector store: {collection_name}")

    def add_documents(self, documents: List[Document]) -> List[str]:
        """Add documents to the vector store"""
        print(f"💾 Adding {len(documents)} documents to vector store...")

        try:
            # Add documents (LangChain handles embedding automatically)
            doc_ids = self.vector_store.add_documents(documents)
            print(f"✅ Successfully added {len(doc_ids)} documents")
            return doc_ids
        except Exception as e:
            print(f"❌ Error adding documents: {e}")
            return []

    def create_retriever(self, search_type: str = "similarity", search_kwargs: dict = None):
        """Create a LangChain retriever"""
        search_kwargs = search_kwargs or {"k": 5}

        retriever = self.vector_store.as_retriever(
            search_type=search_type,
            search_kwargs=search_kwargs
        )

        print(f"🔍 Created retriever with search_type='{search_type}', k={search_kwargs.get('k', 5)}")
        return retriever

    def similarity_search_with_score(self, query: str, k: int = 5):
        """Search with similarity scores"""
        return self.vector_store.similarity_search_with_score(query, k=k)

# Initialize the vector store
vector_store = LangChainVectorStore(
    connection_string=os.getenv('DATABASE_URL'),
    collection_name="supacrawler_langchain_rag"
)

# Add our processed chunks (PGVector embeds them on insert)
document_ids = vector_store.add_documents(document_chunks)
print(f"\n🎯 Vector store ready with {len(document_ids)} documents!")
```
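Before wiring up full chains, it's worth confirming retrieval works end to end. A short smoke test using the helper defined above (the query text is illustrative; with pgvector's default cosine distance, lower scores mean closer matches):

```python
# Quick retrieval smoke test against the freshly populated store
results = vector_store.similarity_search_with_score("How do text splitters work?", k=3)

for doc, score in results:
    print(f"score={score:.3f}  {doc.metadata.get('title', 'Untitled')}")
    print(f"  {doc.page_content[:120]}...\n")
```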
Building Retrieval Chains
Create sophisticated retrieval chains using LangChain's composable components:
```python
import os
from typing import List
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain.chains import create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain

class AdvancedRAGChain:
    """Advanced RAG chain with multiple retrieval strategies"""

    def __init__(self, vector_store: LangChainVectorStore):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.1,
            openai_api_key=os.getenv('OPENAI_API_KEY')
        )

        # Create different retrievers
        self.retrievers = {
            'similarity': vector_store.create_retriever(
                search_type="similarity",
                search_kwargs={"k": 5}
            ),
            'mmr': vector_store.create_retriever(
                search_type="mmr",
                search_kwargs={"k": 5, "fetch_k": 20}
            ),
            'similarity_score': vector_store.create_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={"score_threshold": 0.7, "k": 5}
            )
        }

        self.chains = {}
        self._build_chains()

    def _build_chains(self):
        """Build different retrieval chains"""
        # Standard RAG prompt
        rag_prompt = ChatPromptTemplate.from_template("""You are a helpful assistant that answers questions based on the provided context.
Use only the information from the context to answer the question.
If the context doesn't contain enough information, say so clearly.

Context: {context}

Question: {input}

Answer:""")

        # Advanced RAG prompt with source citation
        citation_prompt = ChatPromptTemplate.from_template("""You are a helpful assistant that answers questions based on the provided context.
Use only the information from the context to answer the question.
Always cite your sources by mentioning the relevant document titles or URLs.
If the context doesn't contain enough information, say so clearly.

Context: {context}

Question: {input}

Answer (with citations):""")

        # Build chains for each retriever
        for name, retriever in self.retrievers.items():
            # Standard chain
            question_answer_chain = create_stuff_documents_chain(self.llm, rag_prompt)
            self.chains[f"{name}_basic"] = create_retrieval_chain(retriever, question_answer_chain)

            # Citation chain
            citation_chain = create_stuff_documents_chain(self.llm, citation_prompt)
            self.chains[f"{name}_citation"] = create_retrieval_chain(retriever, citation_chain)

    def ask(self, question: str, chain_type: str = "similarity_citation") -> dict:
        """Ask a question using the specified chain"""
        if chain_type not in self.chains:
            available_chains = list(self.chains.keys())
            raise ValueError(f"Chain type '{chain_type}' not available. Choose from: {available_chains}")

        chain = self.chains[chain_type]
        print(f"🤔 Processing question with '{chain_type}' chain...")

        result = chain.invoke({"input": question})

        # Enhanced result with metadata
        enhanced_result = {
            'question': question,
            'answer': result['answer'],
            'chain_type': chain_type,
            'source_documents': result.get('context', []),
            'num_sources': len(result.get('context', [])),
            'sources': self._extract_sources(result.get('context', []))
        }
        return enhanced_result

    def _extract_sources(self, documents) -> List[dict]:
        """Extract source information from retrieved documents"""
        sources = []
        seen_urls = set()

        for doc in documents:
            url = doc.metadata.get('url', 'Unknown')
            if url not in seen_urls:
                sources.append({
                    'url': url,
                    'title': doc.metadata.get('title', 'Untitled'),
                    'description': doc.metadata.get('description', ''),
                    'relevance_score': doc.metadata.get('score', 'N/A')
                })
                seen_urls.add(url)

        return sources

    def compare_retrieval_strategies(self, question: str):
        """Compare different retrieval strategies for a question"""
        print(f"\n🔍 Comparing retrieval strategies for: '{question}'")
        print("=" * 60)

        results = {}
        for chain_name in ['similarity_basic', 'mmr_basic', 'similarity_score_basic']:
            try:
                result = self.ask(question, chain_name)
                results[chain_name] = result
                print(f"\n{chain_name.upper()}:")
                print(f"Answer: {result['answer'][:200]}...")
                print(f"Sources: {result['num_sources']} documents")
            except Exception as e:
                print(f"❌ Error with {chain_name}: {e}")

        return results

# Create the advanced RAG chain
rag_chain = AdvancedRAGChain(vector_store)

# Test the system
test_questions = [
    "How do I install LangChain?",
    "What are the different types of text splitters in LangChain?",
    "How do I use OpenAI embeddings with LangChain?",
    "What is the difference between similarity search and MMR?"
]

print("\n🧪 Testing Advanced RAG System:")
print("=" * 50)

for question in test_questions:
    result = rag_chain.ask(question)
    print(f"\n❓ {result['question']}")
    print(f"💡 {result['answer']}")
    print(f"📚 Sources ({result['num_sources']}):")
    for source in result['sources'][:3]:  # Show top 3 sources
        print(f"  • {source['title']} - {source['url']}")
    print("-" * 50)
```
Advanced Query Processing
Implement sophisticated query processing with conversation memory and query enhancement:
```python
from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI

class ConversationalRAG:
    """Conversational RAG system with memory and context awareness"""

    def __init__(self, vector_store: LangChainVectorStore):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.1
        )

        # Initialize memory
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            output_key="answer",
            return_messages=True,
            k=5  # Remember the last 5 exchanges
        )

        # Create the retriever
        self.retriever = vector_store.create_retriever(
            search_type="mmr",
            search_kwargs={"k": 6, "fetch_k": 20}
        )

        # Custom prompt for conversational RAG
        self.qa_prompt = PromptTemplate(
            template="""You are a helpful AI assistant with expertise in the provided documentation.
Use the following context and conversation history to answer the question.

Previous conversation:
{chat_history}

Context from documentation:
{context}

Current question: {question}

Instructions:
1. Use only information from the provided context
2. Consider the conversation history for context
3. If you cannot answer based on the context, say so clearly
4. Provide specific examples when available
5. If referring to previous questions, be explicit about the connection

Answer:""",
            input_variables=["context", "question", "chat_history"]
        )

        # Create the conversational chain
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.retriever,
            memory=self.memory,
            combine_docs_chain_kwargs={"prompt": self.qa_prompt},
            return_source_documents=True,
            verbose=True
        )

    def chat(self, question: str) -> dict:
        """Have a conversation with the RAG system"""
        print(f"💬 User: {question}")

        try:
            result = self.qa_chain.invoke({"question": question})
            answer = result["answer"]
            source_docs = result.get("source_documents", [])

            print(f"🤖 Assistant: {answer}")

            return {
                "question": question,
                "answer": answer,
                "sources": [doc.metadata.get('url', 'Unknown') for doc in source_docs],
                "source_documents": source_docs
            }
        except Exception as e:
            error_msg = f"Sorry, I encountered an error: {e}"
            print(f"❌ {error_msg}")
            return {
                "question": question,
                "answer": error_msg,
                "sources": [],
                "source_documents": []
            }

    def get_conversation_history(self):
        """Get the current conversation history"""
        return self.memory.chat_memory.messages

    def clear_memory(self):
        """Clear the conversation memory"""
        self.memory.clear()
        print("🧹 Conversation memory cleared")

# Create the conversational RAG system
conversational_rag = ConversationalRAG(vector_store)

# Example conversation
print("\n💬 Starting Conversational RAG Demo:")
print("=" * 50)

conversation_flow = [
    "What is LangChain?",
    "How do I install it?",
    "What are the main components I should know about?",
    "Can you explain more about text splitters?",
    "Which text splitter should I use for long documents?"
]

for question in conversation_flow:
    result = conversational_rag.chat(question)
    print(f"📚 Sources: {len(result['sources'])} documents")
    print("-" * 30)
```
Production Optimization
Optimize the system for production deployment:
```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List
from langchain.callbacks import get_openai_callback
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

class ProductionRAGSystem:
    """Production-optimized RAG system with performance monitoring"""

    def __init__(self, vector_store: LangChainVectorStore):
        self.vector_store = vector_store
        self.llm = ChatOpenAI(
            model="gpt-3.5-turbo",
            temperature=0.1,
            max_retries=3,
            request_timeout=30
        )

        # Performance metrics
        self.metrics = {
            'queries_processed': 0,
            'total_tokens_used': 0,
            'total_cost': 0.0,
            'avg_response_time': 0.0,
            'error_count': 0
        }

        # Create an optimized retriever
        self.retriever = vector_store.create_retriever(
            search_type="mmr",
            search_kwargs={"k": 4, "fetch_k": 12}  # Reduced for speed
        )

        # Build the production chain
        self._build_production_chain()

    def _build_production_chain(self):
        """Build the optimized production chain"""
        from langchain.chains import create_retrieval_chain
        from langchain.chains.combine_documents import create_stuff_documents_chain

        # Optimized prompt
        prompt = ChatPromptTemplate.from_template("""Based on the provided context, answer the question concisely and accurately.

Context: {context}

Question: {input}

Answer:""")

        question_answer_chain = create_stuff_documents_chain(self.llm, prompt)
        self.chain = create_retrieval_chain(self.retriever, question_answer_chain)

    def query_with_monitoring(self, question: str) -> dict:
        """Process a query with performance monitoring"""
        start_time = time.time()

        try:
            with get_openai_callback() as cb:
                result = self.chain.invoke({"input": question})

            # Update metrics
            response_time = time.time() - start_time
            self.metrics['queries_processed'] += 1
            self.metrics['total_tokens_used'] += cb.total_tokens
            self.metrics['total_cost'] += cb.total_cost

            # Update the running average response time
            current_avg = self.metrics['avg_response_time']
            query_count = self.metrics['queries_processed']
            self.metrics['avg_response_time'] = (
                (current_avg * (query_count - 1) + response_time) / query_count
            )

            return {
                'answer': result['answer'],
                'sources': [doc.metadata.get('url') for doc in result.get('context', [])],
                'response_time': response_time,
                'tokens_used': cb.total_tokens,
                'cost': cb.total_cost,
                'success': True
            }
        except Exception as e:
            self.metrics['error_count'] += 1
            return {
                'error': str(e),
                'response_time': time.time() - start_time,
                'success': False
            }

    def batch_process_queries(self, questions: List[str], max_workers: int = 3) -> List[dict]:
        """Process multiple queries in parallel"""
        print(f"⚡ Processing {len(questions)} queries with {max_workers} workers...")

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            results = list(executor.map(self.query_with_monitoring, questions))

        print(f"✅ Batch processing complete!")
        return results

    def get_performance_report(self) -> dict:
        """Get a comprehensive performance report"""
        return {
            'queries_processed': self.metrics['queries_processed'],
            'total_tokens_used': self.metrics['total_tokens_used'],
            'total_cost': round(self.metrics['total_cost'], 4),
            'avg_response_time': round(self.metrics['avg_response_time'], 2),
            'error_count': self.metrics['error_count'],
            'error_rate': round(self.metrics['error_count'] / max(1, self.metrics['queries_processed']) * 100, 2),
            'cost_per_query': round(self.metrics['total_cost'] / max(1, self.metrics['queries_processed']), 4)
        }

# Create the production system
production_rag = ProductionRAGSystem(vector_store)

# Performance testing
test_queries = [
    "What is LangChain?",
    "How do I use OpenAI embeddings?",
    "What are the different text splitters?",
    "How do I build a retrieval chain?",
    "What is the difference between similarity and MMR search?"
]

print("\n⚡ Production Performance Test:")
results = production_rag.batch_process_queries(test_queries)

# Display results
for i, (query, result) in enumerate(zip(test_queries, results)):
    if result['success']:
        print(f"\n{i+1}. {query}")
        print(f"   Answer: {result['answer'][:100]}...")
        print(f"   Time: {result['response_time']:.2f}s, Tokens: {result['tokens_used']}, Cost: ${result['cost']:.4f}")
    else:
        print(f"\n{i+1}. {query} - ERROR: {result['error']}")

# Performance report
print(f"\n📊 Performance Report:")
report = production_rag.get_performance_report()
for key, value in report.items():
    print(f"  {key.replace('_', ' ').title()}: {value}")
```
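One optimization not shown above is caching repeated queries, which can cut both latency and token spend for FAQ-style traffic. A minimal sketch wrapping the system defined above (in-memory only; a production deployment would more likely use Redis or a similar shared cache):

```python
import hashlib

class CachedRAGSystem:
    """Thin caching wrapper around ProductionRAGSystem (illustrative sketch)."""

    def __init__(self, rag_system: ProductionRAGSystem):
        self.rag = rag_system
        self.cache: dict = {}

    def query(self, question: str) -> dict:
        # Normalize and hash the question to form a cache key
        key = hashlib.sha256(question.strip().lower().encode()).hexdigest()
        if key in self.cache:
            return {**self.cache[key], 'cached': True}

        result = self.rag.query_with_monitoring(question)
        if result.get('success'):
            self.cache[key] = result
        return {**result, 'cached': False}

cached_rag = CachedRAGSystem(production_rag)
first = cached_rag.query("What is LangChain?")   # hits the LLM
second = cached_rag.query("What is LangChain?")  # served from cache
print(f"Second call cached: {second['cached']}")
```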
Scale Beyond Local Development with Supacrawler
While this tutorial demonstrates building with LangChain locally, production RAG systems require sophisticated data ingestion, content management, and performance optimization:
- Large-Scale Knowledge Bases: Processing thousands of documents with consistent quality
- Content Freshness: Keeping embeddings current with website changes (a refresh sketch follows the loader code below)
- Advanced Processing: Handling complex document structures, multimedia content, and dynamic pages
- Performance Optimization: Balancing retrieval quality with response time
Supacrawler's Crawl API integrates seamlessly with LangChain for production-scale RAG systems:
```python
from typing import List
from langchain_core.documents import Document
from supacrawler import SupacrawlerClient

class ProductionSupacrawlerLoader:
    """Production-grade Supacrawler integration with LangChain"""

    def __init__(self, api_key: str):
        self.client = SupacrawlerClient(api_key=api_key)

    def load_knowledge_base(self, urls: List[str]) -> List[Document]:
        """Load multiple websites into a comprehensive knowledge base"""
        all_documents = []

        for url in urls:
            job = self.client.create_crawl_job(
                url=url,
                format='markdown',
                depth=4,  # Deep crawling for comprehensive coverage
                link_limit=5000,  # Large-scale processing
                render_js=True,
                # Production optimizations
                include_patterns=['/docs/*', '/api/*', '/guides/*', '/tutorials/*'],
                exclude_patterns=['/blog/*', '/news/*', '/privacy/*'],
                remove_selectors=['.sidebar', '.nav', '.footer', '.ads'],
                wait_for='.main-content, .content, main',
                block_ads=True,
                block_cookies=True,
                # Quality controls
                timeout=45000,
                concurrent_limit=8,
                respect_robots_txt=True
            )

            result = self.client.wait_for_crawl(job.job_id)

            if result.status == 'completed':
                crawl_data = result.data.get('crawl_data', {})
                for page_url, page_data in crawl_data.items():
                    content = page_data.get('markdown', '')
                    if len(content.strip()) > 200:  # Quality filter
                        doc = Document(
                            page_content=content,
                            metadata={
                                **page_data.get('metadata', {}),
                                'source_domain': page_url.split('/')[2],
                                'crawl_timestamp': result.data.get('timestamp'),
                                'content_quality_score': len(content) / 1000  # Simple quality metric
                            }
                        )
                        all_documents.append(doc)

        return all_documents
```
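To address the content-freshness concern above, one approach is to re-crawl on a schedule and rebuild the affected collection. A hedged sketch reusing the loader and vector store from earlier sections; it assumes the `delete_collection`/`create_collection` methods on langchain-postgres's PGVector, and in practice you would run the chunking pipeline from earlier before re-inserting:

```python
from typing import List

def refresh_knowledge_base(loader: ProductionSupacrawlerLoader,
                           store: LangChainVectorStore,
                           urls: List[str]) -> None:
    """Re-crawl the sources and replace stored vectors with fresh content."""
    fresh_documents = loader.load_knowledge_base(urls)

    # Drop stale vectors, recreate the collection, then re-embed and re-insert
    # (chunk fresh_documents with the earlier pipeline before adding, ideally)
    store.vector_store.delete_collection()
    store.vector_store.create_collection()
    doc_ids = store.add_documents(fresh_documents)

    print(f"🔄 Refreshed knowledge base with {len(doc_ids)} documents")

# e.g., invoke from a daily cron job or task scheduler:
# refresh_knowledge_base(loader, vector_store, ["https://python.langchain.com/docs"])
```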
Production Integration Benefits:
- ✅ Scalable Data Ingestion: Process 10,000+ pages without infrastructure management
- ✅ LangChain Compatibility: Direct integration with Document loaders and processors
- ✅ Content Quality: Clean, structured content optimized for embeddings
- ✅ Automated Updates: Easy re-crawling for fresh knowledge bases
- ✅ Error Resilience: Built-in retry logic and failure handling
- ✅ Performance Optimization: Concurrent processing and intelligent rate limiting
Getting Started:
- 📖 Crawl API Documentation for LangChain integration patterns
- 🔧 GitHub Repository for self-hosting options
- 🆓 Start with 1,000 free crawl operations
Conclusion
This comprehensive guide demonstrated how to build a sophisticated RAG system using LangChain's powerful abstractions with Supacrawler's intelligent web crawling and Supabase's vector storage. The combination provides:
- Modular Architecture: Easy to customize and extend with LangChain's ecosystem
- Advanced Processing: Sophisticated document chunking and retrieval strategies
- Production Ready: Performance monitoring, error handling, and scalability features
- Conversation Support: Memory and context-aware interactions
The LangChain integration offers unmatched flexibility for complex RAG scenarios, making it ideal for applications requiring advanced document processing, custom retrieval logic, or integration with existing LangChain workflows.
Whether building customer support systems, documentation search, or intelligent assistants, this LangChain-based RAG architecture provides the foundation for sophisticated AI applications that can scale from prototype to production.