Transformers
Transformers are modular components that process, analyze, and enrich data as it flows through ContextRouter pipelines. They provide the “intelligence” that turns raw data into structured, searchable knowledge.
What Are Transformers?
Transformers are functions or classes that take a BisquitEnvelope as input, process its content, and return an enriched envelope (see the sketch after this list). They can:
- Extract entities and relationships
- Analyze sentiment and tone
- Classify content by topic
- Generate summaries and keywords
- Convert between formats
- Add metadata and provenance
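A minimal sketch of that flow, using the registry helper and envelope fields introduced later on this page (the example text and printed output are illustrative):

```python
from contextrouter.core.registry import select_transformer
from contextrouter.core.bisquit import BisquitEnvelope

# Raw data enters as an envelope...
envelope = BisquitEnvelope(content={"text": "Acme Corp. opened a lab in Berlin."})

# ...a transformer enriches it and hands it back.
enriched = select_transformer("ner").transform(envelope)

# The envelope now carries structured knowledge alongside the raw text.
print(enriched.content.get("entities"))
# e.g. [{"text": "Acme Corp.", "type": "ORG", ...}, {"text": "Berlin", "type": "GPE", ...}]
```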
Core Architecture
Base Interface
All transformers implement the BaseTransformer interface:
```python
from abc import ABC, abstractmethod

from contextrouter.core.bisquit import BisquitEnvelope


class BaseTransformer(ABC):
    """Base class for all transformers."""

    @abstractmethod
    def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
        """Transform the envelope content."""
        pass
```

Registration
Transformers are registered using decorators:
```python
from contextrouter.core.registry import register_transformer


@register_transformer("my_transformer")
class MyTransformer(BaseTransformer):
    def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
        # Process envelope
        return envelope
```

Built-in Transformers
NER Transformer (@register_transformer("ner"))
Extracts named entities from text content.
Features:
- Person, Organization, Location recognition
- Date, money, percentage extraction
- Custom entity types support
- Confidence scoring
Configuration:
```toml
[ingestion.rag.transformers.ner]
enabled = true
model = "vertex/gemini-2.0-flash"
entity_types = ["PERSON", "ORG", "GPE", "DATE", "MONEY"]
confidence_threshold = 0.7
max_entities_per_chunk = 20
```

Example Output:
```python
# Input envelope
envelope = BisquitEnvelope(
    content={"text": "John Smith works at Google in New York."}
)

# After NER transformation
envelope.content["entities"] = [
    {"text": "John Smith", "type": "PERSON", "confidence": 0.95},
    {"text": "Google", "type": "ORG", "confidence": 0.92},
    {"text": "New York", "type": "GPE", "confidence": 0.88},
]
```

Taxonomy Transformer (@register_transformer("taxonomy"))
Categorizes content into hierarchical topics.
Features:
- Automatic topic classification
- Hierarchical category trees
- Confidence-based categorization
- Custom taxonomy support
Configuration:
```toml
[ingestion.rag.transformers.taxonomy]
enabled = true
model = "vertex/gemini-2.0-flash"
max_categories = 5
category_depth = 3
confidence_threshold = 0.6
```

Example Output:
envelope.content["categories"] = [ {"name": "Technology", "confidence": 0.89}, {"name": "Artificial Intelligence", "confidence": 0.76}, {"name": "Machine Learning", "confidence": 0.68}]Keyphrases Transformer (@register_transformer("keyphrases"))
Extracts important phrases and keywords.
Features:
- Statistical and LLM-based extraction
- Multi-language support
- Phrase length control
- Relevance ranking
Configuration:
```toml
[ingestion.rag.transformers.keyphrases]
enabled = true
algorithm = "mixed"  # "llm", "tfidf", "mixed"
max_phrases = 10
min_phrase_length = 2
max_phrase_length = 5
```
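Example Output (illustrative; the field names below are assumptions by analogy with the NER and taxonomy examples, not confirmed output):

```python
# Illustrative shape only — the exact keys are not confirmed by this page
envelope.content["keyphrases"] = [
    {"text": "machine learning pipeline", "score": 0.91},
    {"text": "vector search", "score": 0.84},
    {"text": "data ingestion", "score": 0.77},
]
```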
Summarization Transformer (@register_transformer("summarization"))
Generates concise summaries of content.
Features:
- Extractive and abstractive summarization
- Length control
- Multi-language support
- Focus area specification
Configuration:
```toml
[ingestion.rag.transformers.summarization]
enabled = true
model = "vertex/gemini-2.0-flash"
max_length = 200
min_length = 50
focus_areas = ["main_points", "conclusion", "key_facts"]
```
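Example Output (illustrative; the field name is an assumption by analogy with the examples above):

```python
# Illustrative shape only — the exact key is not confirmed by this page
envelope.content["summary"] = (
    "The article describes the company's new research lab, its funding, "
    "and the main areas it will focus on."
)
```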
Sentiment Transformer (@register_transformer("sentiment"))
Analyzes emotional tone and sentiment.
Features:
- Positive/negative/neutral classification
- Intensity scoring
- Emotion detection
- Context-aware analysis
Configuration:
```toml
[ingestion.rag.transformers.sentiment]
enabled = true
model = "vertex/gemini-2.0-flash"
include_emotions = true
intensity_threshold = 0.3
```
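Example Output (illustrative; field names assumed from the feature list above):

```python
# Illustrative shape only — the exact keys are not confirmed by this page
envelope.content["sentiment"] = {
    "label": "positive",                  # positive / negative / neutral
    "intensity": 0.72,                    # intensity scoring
    "emotions": ["joy", "anticipation"],  # present when include_emotions = true
}
```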
Shadow Record Transformer (@register_transformer("shadow"))
Creates optimized metadata for search indexing.
Features:
- Combines multiple analysis results
- Generates search-friendly representations
- Pre-computes frequently accessed fields
- Optimizes for vector search
Configuration:
```toml
[ingestion.rag.transformers.shadow]
enabled = true
include_keywords = true
include_entities = true
include_taxonomy = true
include_summary = true
summary_length = 200
```
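Example Output (illustrative; a shadow record combines the earlier analyses, but the exact keys are assumptions):

```python
# Illustrative shape only — the exact keys are not confirmed by this page
envelope.content["shadow_record"] = {
    "keywords": ["machine learning", "vector search"],
    "entities": ["John Smith", "Google"],
    "taxonomy": ["Technology", "Artificial Intelligence"],
    "summary": "John Smith describes Google's new machine-learning work.",
}
```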
Using Transformers
Direct Usage
```python
from contextrouter.core.registry import select_transformer
from contextrouter.core.bisquit import BisquitEnvelope

# Get a transformer
ner_transformer = select_transformer("ner")

# Create envelope
envelope = BisquitEnvelope(
    content={"text": "Apple Inc. was founded by Steve Jobs in Cupertino."}
)

# Apply transformation
enriched = ner_transformer.transform(envelope)

print(enriched.content["entities"])
# [{"text": "Apple Inc.", "type": "ORG", ...}, {"text": "Steve Jobs", "type": "PERSON", ...}]
```

Pipeline Composition
```python
from typing import List


def create_enrichment_pipeline(transformer_names: List[str]):
    """Create a pipeline of transformers."""
    transformers = [select_transformer(name) for name in transformer_names]

    def process(envelope: BisquitEnvelope) -> BisquitEnvelope:
        for transformer in transformers:
            envelope = transformer.transform(envelope)
        return envelope

    return process


# Create pipeline
enrich_pipeline = create_enrichment_pipeline([
    "ner", "taxonomy", "keyphrases", "summarization"
])

# Process content
envelope = BisquitEnvelope(content={"text": article_text})
enriched = enrich_pipeline(envelope)
```

Conditional Transformation
```python
def smart_transform(envelope: BisquitEnvelope) -> BisquitEnvelope:
    """Apply transformers based on content analysis."""
    content = envelope.content
    text = content.get("text", "")

    # Detect content type (is_code / is_news are placeholder helpers)
    if is_code(text):
        # Apply code-specific transformers
        code_transformer = select_transformer("code_analyzer")
        envelope = code_transformer.transform(envelope)
    elif is_news(text):
        # Apply news-specific transformers
        sentiment_transformer = select_transformer("sentiment")
        envelope = sentiment_transformer.transform(envelope)
    else:
        # Apply general transformers
        ner_transformer = select_transformer("ner")
        envelope = ner_transformer.transform(envelope)

    return envelope
```

Custom Transformers
Basic Custom Transformer
```python
from contextrouter.core.registry import register_transformer
from contextrouter.core.interfaces import BaseTransformer
from contextrouter.core.bisquit import BisquitEnvelope


@register_transformer("word_count")
class WordCountTransformer(BaseTransformer):
    """Counts words in text content."""

    def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
        content = envelope.content

        if isinstance(content, dict) and "text" in content:
            text = content["text"]
            word_count = len(text.split())

            # Add metadata
            envelope.metadata["word_count"] = word_count
            envelope.metadata["character_count"] = len(text)

            # Add trace
            envelope.add_trace("transformer:word_count")

        return envelope
```

Advanced Custom Transformer
@register_transformer("complexity_analyzer")class ComplexityAnalyzer(BaseTransformer): """Analyzes text complexity using multiple metrics."""
def __init__(self, config=None): self.config = config or {} self.readability_metrics = self.config.get("readability", True)
def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope: content = envelope.content
if not isinstance(content, dict) or "text" not in content: return envelope
text = content["text"] analysis = {}
# Basic metrics analysis["word_count"] = len(text.split()) analysis["sentence_count"] = len(text.split('.')) analysis["avg_word_length"] = sum(len(word) for word in text.split()) / max(1, analysis["word_count"])
# Readability (if enabled) if self.readability_metrics: analysis["flesch_score"] = self._calculate_flesch(text) analysis["reading_level"] = self._get_reading_level(analysis["flesch_score"])
# Complexity classification analysis["complexity"] = self._classify_complexity(analysis)
# Update envelope content["complexity_analysis"] = analysis envelope.metadata.update({ "complexity_score": analysis["complexity"], "reading_level": analysis.get("reading_level", "unknown") }) envelope.add_trace("transformer:complexity_analyzer")
return envelope
def _calculate_flesch(self, text: str) -> float: """Calculate Flesch Reading Ease score.""" # Simplified implementation words = len(text.split()) sentences = len(text.split('.')) syllables = sum(self._count_syllables(word) for word in text.split())
if words == 0 or sentences == 0: return 0.0
return 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)
def _count_syllables(self, word: str) -> int: """Count syllables in a word.""" word = word.lower() count = 0 vowels = "aeiouy" prev_was_vowel = False
for char in word: is_vowel = char in vowels if is_vowel and not prev_was_vowel: count += 1 prev_was_vowel = is_vowel
if word.endswith("e"): count -= 1 if count == 0: count = 1
return count
def _get_reading_level(self, flesch_score: float) -> str: """Convert Flesch score to reading level.""" if flesch_score >= 90: return "5th grade" elif flesch_score >= 80: return "6th grade" elif flesch_score >= 70: return "7th grade" elif flesch_score >= 60: return "8th-9th grade" elif flesch_score >= 50: return "10th-12th grade" elif flesch_score >= 30: return "college" else: return "college graduate"
def _classify_complexity(self, analysis: dict) -> float: """Classify text complexity on 0-1 scale.""" # Combine multiple factors word_length_factor = min(1.0, analysis["avg_word_length"] / 8.0) sentence_length_factor = min(1.0, analysis["word_count"] / max(1, analysis["sentence_count"]) / 20.0)
return (word_length_factor + sentence_length_factor) / 2.0Asynchronous Transformers
```python
import asyncio
from typing import List


@register_transformer("async_batch_processor")
class AsyncBatchTransformer(BaseTransformer):
    """Processes multiple envelopes asynchronously."""

    async def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
        """Async transformation with batch processing."""
        content = envelope.content

        if isinstance(content, dict) and "batch_items" in content:
            # Process items in parallel
            items = content["batch_items"]
            tasks = [self._process_item(item) for item in items]
            processed_items = await asyncio.gather(*tasks)

            content["processed_batch"] = processed_items
            envelope.add_trace("transformer:async_batch_processor")

        return envelope

    async def _process_item(self, item: dict) -> dict:
        """Process a single item (simulate async work)."""
        await asyncio.sleep(0.1)  # Simulate I/O
        return {
            "original": item,
            "processed": True,
            "timestamp": asyncio.get_event_loop().time(),
        }
```

Configuration & Deployment
Global Configuration
Configure transformers globally in settings.toml:
```toml
[ingestion.rag.transformers]
# Global settings for all transformers
timeout_seconds = 30
retry_attempts = 3
batch_size = 10

[ingestion.rag.transformers.ner]
enabled = true
model = "vertex/gemini-2.0-flash"
entity_types = ["PERSON", "ORG", "GPE"]
confidence_threshold = 0.7

[ingestion.rag.transformers.taxonomy]
enabled = true
model = "vertex/gemini-2.0-flash"
max_categories = 5
custom_categories = ["AI", "Machine Learning", "Data Science"]
```

Runtime Configuration
Override settings at runtime:
```python
# Runtime transformer configuration
runtime_config = {
    "transformers": {
        "ner": {
            "enabled": True,
            "confidence_threshold": 0.8,
            "entity_types": ["PERSON", "ORG", "TECH"],
        },
        "taxonomy": {
            "enabled": False  # Disable taxonomy for this request
        },
    }
}

# Apply to processing
result = await process_with_config(envelope, runtime_config)
```

Plugin-Based Transformers
Package transformers as plugins:
```python
# my_transformers/__init__.py
from .sentiment import SentimentTransformer
from .code_analysis import CodeAnalysisTransformer

# Register on import
__all__ = ["SentimentTransformer", "CodeAnalysisTransformer"]
```

```python
# my_transformers/sentiment.py
from contextrouter.core.registry import register_transformer
from contextrouter.core.interfaces import BaseTransformer


@register_transformer("advanced_sentiment")
class SentimentTransformer(BaseTransformer):
    # Advanced sentiment analysis implementation
    pass
```

Best Practices
Performance
- Batch processing for multiple envelopes
- Caching for expensive operations (see the sketch after this list)
- Async I/O for external API calls
- Early termination for irrelevant content
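A minimal sketch of the caching and early-termination points above, using the registry helpers shown earlier (the cached_ner name and the memoization scheme are illustrative, not part of the framework):

```python
from contextrouter.core.registry import register_transformer, select_transformer
from contextrouter.core.interfaces import BaseTransformer
from contextrouter.core.bisquit import BisquitEnvelope


@register_transformer("cached_ner")
class CachedNERTransformer(BaseTransformer):
    """Illustrative wrapper combining caching with early termination."""

    def __init__(self):
        self._ner = select_transformer("ner")  # expensive inner transformer
        self._cache: dict = {}                 # simple in-memory memo

    def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
        content = envelope.content
        text = content.get("text", "") if isinstance(content, dict) else ""

        # Early termination: skip chunks too short to be worth analyzing
        if len(text.split()) < 3:
            return envelope

        # Caching: identical chunks are analyzed only once
        if text not in self._cache:
            enriched = self._ner.transform(envelope)
            self._cache[text] = enriched.content.get("entities", [])
        content["entities"] = self._cache[text]
        return envelope
```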
Error Handling
- Graceful degradation when transformers fail (see the sketch after this list)
- Timeout protection for long-running operations
- Retry logic for transient failures
- Logging of transformation errors
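A sketch of the retry, logging, and graceful-degradation points, assuming transformer errors surface as ordinary exceptions (timeouts are better handled by the timeout_seconds setting shown under Global Configuration):

```python
import logging
import time

logger = logging.getLogger(__name__)


def transform_safely(transformer, envelope, retries: int = 3, backoff: float = 1.0):
    """Retry transient failures; degrade gracefully instead of failing the pipeline."""
    for attempt in range(1, retries + 1):
        try:
            return transformer.transform(envelope)
        except Exception as exc:  # narrow this to the framework's error types in practice
            logger.warning("transform attempt %d/%d failed: %s", attempt, retries, exc)
            time.sleep(backoff * attempt)  # simple linear backoff
    # Graceful degradation: hand back the envelope unenriched
    return envelope
```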
Data Quality
- Input validation before processing
- Confidence scoring for uncertain results (see the sketch after this list)
- Fallback values for missing data
- Provenance tracking for all transformations
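One way these points can combine, assuming the entity shape from the NER example above (the threshold and function name are illustrative):

```python
from contextrouter.core.bisquit import BisquitEnvelope

CONFIDENCE_FLOOR = 0.5  # illustrative threshold


def filter_low_confidence(envelope: BisquitEnvelope) -> BisquitEnvelope:
    """Validate input, drop uncertain entities, and record provenance."""
    content = envelope.content
    if not isinstance(content, dict):              # input validation
        return envelope

    entities = content.get("entities") or []       # fallback for missing data
    content["entities"] = [
        e for e in entities if e.get("confidence", 0.0) >= CONFIDENCE_FLOOR
    ]
    envelope.add_trace("postprocess:confidence_filter")  # provenance tracking
    return envelope
```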
Testing
- Unit tests for individual transformers (see the sketch after this list)
- Integration tests for transformer pipelines
- Mock external dependencies for reliable testing
- Performance benchmarks for optimization
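A unit-test sketch (pytest style) for the WordCountTransformer defined earlier, assuming an envelope can be constructed with just its content and that the transformer lives in your my_transformers package:

```python
from contextrouter.core.bisquit import BisquitEnvelope
from my_transformers import WordCountTransformer  # wherever you defined it


def test_word_count_adds_metadata():
    envelope = BisquitEnvelope(content={"text": "one two three"})
    result = WordCountTransformer().transform(envelope)
    assert result.metadata["word_count"] == 3
    assert result.metadata["character_count"] == len("one two three")


def test_non_text_content_is_untouched():
    envelope = BisquitEnvelope(content={"blob": b"\x00"})
    result = WordCountTransformer().transform(envelope)
    assert "word_count" not in result.metadata
```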
Troubleshooting
Common Issues
Transformer not found:
```python
# Check if transformer is registered
from contextrouter.core.registry import transformer_registry
print(list(transformer_registry.keys()))

# Ensure import happened
import my_transformers  # Triggers registration
```

Timeout errors:
```toml
# Increase timeout
[ingestion.rag.transformers]
timeout_seconds = 60
```

Memory issues:
```python
# Process in batches
def batch_transform(envelopes: List[BisquitEnvelope], batch_size: int = 10):
    for i in range(0, len(envelopes), batch_size):
        batch = envelopes[i:i + batch_size]
        # Process batch
        yield process_batch(batch)
```

Inconsistent results:
```python
# Add a seed for reproducible results
@register_transformer("consistent_ner")
class ConsistentNERTransformer(BaseTransformer):
    def __init__(self, seed: int = 42):
        self.seed = seed

    def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
        # Use the seed for consistent randomization
        import random
        random.seed(self.seed)
        # ... rest of implementation
        return envelope
```

Advanced Topics
Transformer Chains
Create complex processing pipelines:
```python
class TransformerChain:
    """Chain multiple transformers with conditional logic."""

    def __init__(self, transformers_config: dict):
        self.config = transformers_config
        self.transformers = {}

        # Initialize transformers
        for name, settings in self.config.items():
            if settings.get("enabled", True):
                self.transformers[name] = select_transformer(name)

    def process(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
        """Process envelope through the transformer chain."""
        content_type = self._detect_content_type(envelope)

        # Apply content-specific transformers
        if content_type == "code":
            chain = ["code_analyzer", "complexity_analyzer"]
        elif content_type == "news":
            chain = ["sentiment", "taxonomy", "summarization"]
        else:
            chain = ["ner", "keyphrases", "taxonomy"]

        for transformer_name in chain:
            if transformer_name in self.transformers:
                transformer = self.transformers[transformer_name]
                envelope = transformer.transform(envelope)

        return envelope

    def _detect_content_type(self, envelope: BisquitEnvelope) -> str:
        """Detect content type for conditional processing."""
        content = envelope.content
        if isinstance(content, dict) and "text" in content:
            text = content["text"].lower()
            if any(ext in text for ext in [".py", ".js", ".java", "function", "class "]):
                return "code"
            elif any(word in text for word in ["breaking", "news", "announced", "reported"]):
                return "news"
        return "general"
```

Custom Transformer Registry
Create domain-specific transformer registries:
```python
from contextrouter.core.registry import ComponentFactory


class DomainTransformerRegistry:
    """Registry for domain-specific transformers."""

    def __init__(self, domain: str):
        self.domain = domain
        self.transformers = {}

    def register(self, name: str, transformer_class: type):
        """Register a domain transformer."""
        self.transformers[name] = transformer_class

    def get(self, name: str):
        """Get a transformer with domain-specific logic."""
        transformer_class = self.transformers.get(name)
        if not transformer_class:
            raise KeyError(f"Transformer '{name}' not found in {self.domain} domain")

        # Initialize with domain-specific config
        return transformer_class(domain_config=self._get_domain_config())

    def _get_domain_config(self) -> dict:
        """Get configuration for this domain."""
        # Domain-specific configuration logic
        return {"domain": self.domain}


# Usage (DiagnosisExtractor is a domain transformer class defined elsewhere)
medical_registry = DomainTransformerRegistry("medical")
medical_registry.register("diagnosis_extractor", DiagnosisExtractor)

extractor = medical_registry.get("diagnosis_extractor")
```

This comprehensive transformer system enables flexible, modular data processing that can be easily extended and customized for specific use cases.