RAG Pipeline
ContextRouter’s RAG implementation goes far beyond simple vector search. It’s a sophisticated, multi-stage pipeline designed for accuracy, diversity, and full provenance tracking.
Pipeline Overview
When a user asks a question, the RAG pipeline runs the following stages:
```
  Query
    │
    ▼
┌────────────────────────────────────────────────────────────┐
│ Intent Detection                                           │
│  • Classify query type (RAG, web, direct)                  │
│  • Generate optimized search queries                       │
│  • Extract taxonomy concepts                               │
└────────────────────────┬───────────────────────────────────┘
                         │
                         ▼
┌────────────────────────────────────────────────────────────┐
│ Parallel Retrieval                                         │
│                                                            │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐                │
│  │Providers │   │Connectors│   │  Graph   │                │
│  │(Postgres)│   │  (Web)   │   │  Facts   │                │
│  └────┬─────┘   └────┬─────┘   └────┬─────┘                │
│       │              │              │                      │
│       └──────────────┴──────────────┘                      │
│                      │                                     │
└──────────────────────┼─────────────────────────────────────┘
                       │
                       ▼
┌────────────────────────────────────────────────────────────┐
│ Deduplication                                              │
│  • SHA256 hash of URL + snippet + content                  │
│  • Prevents duplicate citations                            │
└────────────────────────┬───────────────────────────────────┘
                         │
                         ▼
┌────────────────────────────────────────────────────────────┐
│ Reranking                                                  │
│  • Vertex AI Ranking (neural)                              │
│  • MMR for diversity                                       │
│  • Hybrid fusion (vector + keyword)                        │
└────────────────────────┬───────────────────────────────────┘
                         │
                         ▼
┌────────────────────────────────────────────────────────────┐
│ Citation Building                                          │
│  • Per-source type limits                                  │
│  • Deduplication by source                                 │
│  • Provenance attachment                                   │
└────────────────────────┬───────────────────────────────────┘
                         │
                         ▼
                    Generation
```
Key Features
Multi-Source Retrieval
Fetch context from multiple sources simultaneously:
- Vector stores: Postgres + pgvector, Vertex AI Search
- Knowledge graphs: Cognee integration, Postgres KG
- Live connectors: Web search, RSS feeds
All sources are queried in parallel to minimize latency.
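The fan-out described above can be sketched with `asyncio.gather`; the `fetch_*` helpers below are hypothetical stand-ins for ContextRouter's actual providers, connectors, and graph client:

```python
import asyncio

# Hypothetical per-source fetchers; in ContextRouter these would be the
# Postgres provider, the web connectors, and the knowledge-graph client.
async def fetch_vectors(query: str) -> list[str]:
    return [f"vector-hit:{query}"]

async def fetch_web(query: str) -> list[str]:
    return [f"web-hit:{query}"]

async def fetch_graph_facts(query: str) -> list[str]:
    return [f"graph-fact:{query}"]

async def retrieve_all(query: str) -> list[str]:
    # Fan out to every source at once; total latency is bounded by the
    # slowest source, not the sum of all sources.
    results = await asyncio.gather(
        fetch_vectors(query),
        fetch_web(query),
        fetch_graph_facts(query),
    )
    return [doc for batch in results for doc in batch]

docs = asyncio.run(retrieve_all("machine learning"))
print(docs)
```

Because each fetcher is awaited concurrently, adding another source increases breadth of context without adding serial latency.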
Hybrid Search
Combine semantic and keyword search for better recall:
```toml
[rag]
hybrid_fusion = "rrf"        # or "weighted"
enable_fts = true            # Enable full-text search
rrf_k = 60                   # RRF constant
hybrid_vector_weight = 0.7
hybrid_text_weight = 0.3
```
Intelligent Reranking
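For reference, reciprocal rank fusion (the `rrf` mode configured above) is simple to sketch; the ranked ID lists here are hypothetical stand-ins for the vector and full-text result sets:

```python
def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse ranked lists: each doc scores sum(1 / (k + rank)) over lists."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["a", "b", "c"]   # semantic (vector) ranking
text_hits = ["b", "d", "a"]     # full-text (FTS) ranking
print(rrf_fuse([vector_hits, text_hits]))
# → ['b', 'a', 'd', 'c']
```

Documents ranked well by both retrievers ("b" here) float to the top, without needing the two score scales to be comparable; that scale-independence is the usual reason to prefer RRF over weighted fusion.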
Improve precision with a second-pass ranking:
- Vertex AI Ranking: Neural cross-encoder reranking
- MMR: Balance relevance with diversity
- Configurable limits: Control results per source type
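As a reference for the diversity step, here is a minimal greedy MMR sketch over plain scores. Real implementations compute similarity from embeddings, and the `lam` trade-off parameter is an illustrative name, not a ContextRouter setting:

```python
def mmr_select(candidates, relevance, similarity, lam=0.7, top_k=3):
    """Greedy Maximal Marginal Relevance: at each step pick the doc that
    maximizes lam * relevance - (1 - lam) * (max similarity to picks)."""
    selected, remaining = [], list(candidates)
    while remaining and len(selected) < top_k:
        def mmr_score(doc):
            redundancy = max((similarity(doc, s) for s in selected), default=0.0)
            return lam * relevance[doc] - (1 - lam) * redundancy
        best = max(remaining, key=mmr_score)
        selected.append(best)
        remaining.remove(best)
    return selected

# Toy data: "a" and "b" are near-duplicates, "c" is distinct.
relevance = {"a": 0.9, "b": 0.85, "c": 0.5}
pair_sim = {frozenset({"a", "b"}): 0.95}
sim = lambda x, y: pair_sim.get(frozenset({x, y}), 0.1)

print(mmr_select(["a", "b", "c"], relevance, sim, lam=0.7, top_k=2))
# → ['a', 'c']  ("b" is demoted as redundant with "a")
```

Lowering `lam` favors diversity over raw relevance; at `lam = 1.0` MMR degenerates to plain relevance ranking.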
Full Provenance
Every retrieved document carries its complete history:
```python
citation = {
    "text": "RAG combines retrieval with generation...",
    "source": {"type": "book", "title": "AI Patterns", "page": 42},
    "provenance": [
        "provider:postgres",
        "reranker:vertex",
        "formatter:citations",
    ],
    "confidence": 0.94,
}
```
Quick Configuration
```toml
[rag]
provider = "postgres"
reranking_enabled = true
citations_enabled = true

# Result limits
general_retrieval_final_count = 10
max_books = 5
max_videos = 3
max_qa = 5
max_web = 3

# Citation limits
citations_books = 3
citations_videos = 2
citations_qa = 3
```
Basic Usage
With ChatRunner
```python
from contextrouter.cortex.runners import ChatRunner

runner = ChatRunner(config)

citations = []
async for event in runner.stream("What is machine learning?"):
    if hasattr(event, 'content'):
        print(event.content, end="")
    if hasattr(event, 'citations'):
        citations = event.citations

# Display sources
for c in citations:
    print(f"Source: {c['source']['title']}")
```
Standalone Pipeline
Use the RAG pipeline directly for custom workflows:
```python
from contextrouter.modules.retrieval.rag import RagPipeline
from contextrouter.modules.retrieval.rag.settings import RagRetrievalSettings

settings = RagRetrievalSettings(
    reranking_enabled=True,
    general_retrieval_final_count=10,
)

pipeline = RagPipeline(config, settings=settings)

result = await pipeline.retrieve(
    user_query="machine learning basics",
    retrieval_queries=["ML fundamentals", "intro to machine learning"],
    taxonomy_concepts=["AI", "Machine Learning"],
)

for doc in result.documents:
    print(f"{doc.source_type}: {doc.snippet[:100]}...")
```
Runtime Overrides
Customize behavior per-request:
```python
runtime_settings = {
    # Enable/disable features
    "web_search_enabled": True,
    "reranking_enabled": False,

    # Change limits
    "max_results": 5,

    # Switch providers
    "provider": "vertex",
}

async for event in runner.stream(query, runtime_settings=runtime_settings):
    process(event)
```