Nodes & Steps
The Cortex is built from two types of components: Nodes (graph wrappers) and Steps (pure business logic). Understanding this separation is key to extending ContextRouter effectively.
The Node/Step Pattern
```
┌────────────────────────────────────────────────────┐
│                        NODE                        │
│        (Registered in graph, handles state)        │
│                                                    │
│   ┌────────────────────────────────────────────┐   │
│   │                    STEP                    │   │
│   │    (Pure function, business logic only)    │   │
│   │                                            │   │
│   │   • No state access                        │   │
│   │   • Receives parameters, returns results   │   │
│   │   • Easy to test in isolation              │   │
│   └────────────────────────────────────────────┘   │
│                                                    │
│   Node responsibilities:                           │
│   • Extract needed data from state                 │
│   • Call the step function                         │
│   • Return partial state update                    │
└────────────────────────────────────────────────────┘
```

Why this pattern?
- Steps are testable — Pure functions with clear inputs/outputs
- Nodes handle plumbing — State access, error handling, tracing
- Reusable logic — Same step can be used in different nodes/graphs
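The pattern can be sketched in plain Python, independent of the framework. The names here (`word_count_step`, `WordCountNode`, the state keys) are illustrative stand-ins, not the real ContextRouter API:

```python
from typing import Any

# STEP: a pure function -- no state access, trivial to test in isolation.
def word_count_step(text: str) -> int:
    """Count the words in a piece of text."""
    return len(text.split())

# NODE: extracts data from state, delegates to the step, returns a partial update.
class WordCountNode:
    def process(self, state: dict[str, Any]) -> dict[str, Any]:
        text = state["last_message"]   # extract needed data from state
        count = word_count_step(text)  # call the pure step
        return {"word_count": count}   # partial state update, no mutation
```

Because the step never touches state, it can be unit-tested with a plain string, while the node only needs a dict.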
RAG Retrieval Nodes
The standard RAG workflow includes these nodes:
extract_query
Extracts and normalizes the search query from user input.
Input: Raw messages from user
Output: Normalized query string
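This node's cleanup relies on a `normalize_text` helper whose implementation is not shown on this page. A minimal sketch of what such a helper might do (whitespace collapse, trimming, lowercasing — an assumption, not the actual implementation):

```python
import re

def normalize_text(text: str) -> str:
    """Collapse internal whitespace, strip the edges, and lowercase the query."""
    return re.sub(r"\s+", " ", text).strip().lower()
```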
```python
# What it does internally
def extract_query_step(messages: list[BaseMessage]) -> str:
    # Get the last user message
    last_message = messages[-1]
    # Clean and normalize
    query = normalize_text(last_message.content)
    return query
```

detect_intent
Uses an LLM to classify what the user wants and extract search queries.
Input: Messages, taxonomy (for concept extraction)
Output: Intent classification, retrieval queries, taxonomy concepts
```python
# Possible intents
class Intent(Enum):
    RAG_SEARCH = "rag_search"    # Needs knowledge base
    WEB_SEARCH = "web_search"    # Needs live web data
    TRANSLATION = "translation"  # Language translation
    IDENTITY = "identity"        # About the assistant
    DIRECT = "direct"            # Can answer directly
```

The node also extracts:
- retrieval_queries: Optimized queries for search (up to 3)
- taxonomy_concepts: Categories for filtering/graph lookup
should_retrieve
A conditional router that examines the detected intent.
Logic:
```python
def should_retrieve(state: AgentState) -> str:
    intent = state["intent"]
    if intent in [Intent.RAG_SEARCH, Intent.WEB_SEARCH]:
        return "retrieve"  # Go to retrieve node
    else:
        return "generate"  # Skip to generation
```

retrieve
Orchestrates parallel search across all data sources.
What it does:
- Queries Providers (Postgres, Vertex AI Search)
- Queries Connectors (Web search, RSS) if enabled
- Fetches Graph Facts from knowledge graph
- Deduplicates results using SHA256 hashing
- Reranks using configured strategy
- Selects top results per source type
Output: RetrievalResult with documents and graph facts
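The SHA-256 deduplication step can be sketched as follows. This is a simplified stand-in operating on plain strings; the real node hashes richer document objects:

```python
import hashlib

def dedupe_documents(docs: list[str]) -> list[str]:
    """Drop exact duplicates by SHA-256 content hash, keeping first occurrence."""
    seen: set[str] = set()
    unique: list[str] = []
    for doc in docs:
        digest = hashlib.sha256(doc.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(doc)
    return unique
```

Hashing the content (rather than comparing strings directly) keeps the seen-set small and makes the same approach work for large documents.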
generate
Produces the final response using retrieved context.
Input: Messages, retrieved documents, system prompts
Output: Generated response text
The node:
- Builds a prompt with context from retrieval
- Includes system instructions (identity, style)
- Streams the response from the LLM
- Formats citations
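Prompt assembly might look roughly like this. This is a hedged sketch; the actual template, citation format, and function names are internal to the node:

```python
def build_generation_prompt(question: str, documents: list[str], system: str) -> str:
    """Combine system instructions, numbered context snippets, and the question."""
    # Number each snippet so the model can cite it as [1], [2], ...
    context = "\n".join(f"[{i + 1}] {doc}" for i, doc in enumerate(documents))
    return f"{system}\n\nContext:\n{context}\n\nQuestion: {question}"
```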
suggest
Generates follow-up question suggestions.
Input: Conversation, retrieved documents
Output: List of suggested questions (typically 3)
Steps (Pure Functions)
Steps live in contextrouter/cortex/steps/ and contain the actual business logic:
```python
async def detect_intent_step(
    messages: list[BaseMessage],
    llm: BaseLLM,
    taxonomy: dict[str, Any],
    language: str = "en",
) -> IntentResult:
    """
    Pure function to detect user intent.

    No state access — receives everything as parameters.
    Returns a clean result object.
    """
    # Build the intent detection prompt
    prompt = build_intent_prompt(messages, taxonomy, language)

    # Call the LLM
    response = await llm.generate(prompt)

    # Parse the response
    result = parse_intent_response(response)

    return IntentResult(
        intent=result.intent,
        retrieval_queries=result.queries,
        taxonomy_concepts=result.concepts,
        confidence=result.confidence,
    )
```

Node Wrapper Contract
All nodes must follow this contract:
```python
# BaseAgent lives in contextrouter/cortex/nodes/utils.py
from contextrouter.cortex.nodes.utils import BaseAgent

class BaseAgent:
    async def process(self, state: AgentState) -> dict[str, Any]:
        """
        Process the current state and return updates.

        MUST return a dict (partial state update).
        MUST await async operations (not return coroutines).
        SHOULD NOT mutate state directly.
        """
        raise NotImplementedError
```

Critical rules:
- Return type: Always return `dict[str, Any]`
- Async steps: Always `await` async operations
- State updates: Return partial updates, don't mutate
```python
# ✅ Correct
async def process(self, state):
    result = await some_async_step(state["messages"])
    return {"my_result": result}

# ❌ Wrong - returning a coroutine
async def process(self, state):
    return {"my_result": some_async_step(state["messages"])}  # Missing await!

# ❌ Wrong - mutating state
async def process(self, state):
    state["my_result"] = await some_async_step()
    return state
```

Creating Custom Nodes
Register a custom node to add new capabilities:
```python
from contextrouter.core.registry import register_agent
from contextrouter.cortex.nodes.utils import BaseAgent
from contextrouter.cortex.state import AgentState

@register_agent("sentiment_analysis")
class SentimentAnalysisNode(BaseAgent):
    """Analyze sentiment of user messages."""

    async def process(self, state: AgentState) -> dict[str, Any]:
        messages = state["messages"]
        config = state["config"]

        # Get LLM from registry
        llm = model_registry.create_llm(config.models.default_llm, config=config)

        # Call our pure step function
        sentiment = await analyze_sentiment_step(
            messages=messages,
            llm=llm,
        )

        # Return partial state update
        return {
            "sentiment": sentiment,
            "sentiment_score": sentiment.score,
        }

# The pure step function (testable in isolation)
async def analyze_sentiment_step(
    messages: list[BaseMessage],
    llm: BaseLLM,
) -> SentimentResult:
    """Pure function for sentiment analysis."""
    prompt = build_sentiment_prompt(messages)
    response = await llm.generate(prompt)
    return parse_sentiment_response(response)
```

Using Custom Nodes in a Graph
Add your custom node to a workflow:
```python
from langgraph.graph import StateGraph, START, END
from contextrouter.core.registry import register_graph, agent_registry

@register_graph("sentiment_aware_chat")
def build_sentiment_graph():
    workflow = StateGraph(AgentState)

    # Get node classes from registry
    sentiment_node = agent_registry.get("sentiment_analysis")
    generate_node = agent_registry.get("generate")

    # Add nodes
    workflow.add_node("analyze_sentiment", sentiment_node)
    workflow.add_node("generate", generate_node)

    # Define flow
    workflow.add_edge(START, "analyze_sentiment")
    workflow.add_edge("analyze_sentiment", "generate")
    workflow.add_edge("generate", END)

    return workflow
```

Testing Nodes and Steps
The separation makes testing easy:
```python
# Test the step (pure function) in isolation
async def test_detect_intent_step():
    messages = [HumanMessage(content="What is RAG?")]
    mock_llm = MockLLM(response='{"intent": "rag_search"}')

    result = await detect_intent_step(
        messages=messages,
        llm=mock_llm,
        taxonomy={"AI": {"RAG": {}}},
    )

    assert result.intent == Intent.RAG_SEARCH

# Test the node with mocked state
async def test_intent_node():
    node = IntentDetectionNode()
    state = {
        "messages": [HumanMessage(content="What is RAG?")],
        "config": mock_config,
    }

    result = await node.process(state)

    assert "intent" in result
    assert result["intent"] == Intent.RAG_SEARCH
```
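The `MockLLM` used above is a test double. A minimal version might look like this (its shape here is an assumption — match it to your actual `BaseLLM` interface):

```python
class MockLLM:
    """Test double that returns a canned response instead of calling a model."""

    def __init__(self, response: str) -> None:
        self.response = response
        self.calls: list[str] = []  # record prompts so tests can assert on them

    async def generate(self, prompt: str) -> str:
        self.calls.append(prompt)
        return self.response
```

Recording the prompts lets tests verify not just the parsed result but also what the step actually sent to the model.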