Security Guide
Building secure AI systems requires careful consideration of data privacy, access control, audit trails, and compliance. ContextRouter supports each of these with provenance tracking, token-based access control, and audit logging hooks intended for production deployment.
Security Principles
ContextRouter follows these core security principles:
- Data Provenance - Every piece of data is traceable to its source
- Access Control - Fine-grained permissions for data access
- Audit Trails - Complete logging of all operations
- Defense in Depth - Multiple security layers
- Privacy by Design - Privacy protections built into the architecture
Bisquit Protocol Security
The BisquitEnvelope provides the foundation for secure data handling.
Data Provenance
Every envelope carries a provenance list that records its source and each processing step applied to it:

    from datetime import datetime

    from contextrouter.core.bisquit import BisquitEnvelope

    # Data from an external source
    envelope = BisquitEnvelope(
        content=user_data,
        provenance=["connector:web", "transformer:validation"],
        metadata={
            "source_ip": "192.168.1.100",
            "user_agent": "Mozilla/5.0...",
            "timestamp": datetime.now().isoformat(),
            "data_classification": "public",
        },
    )

    # Each processing step adds a trace entry
    envelope.add_trace("transformer:anonymization")
    envelope.add_trace("provider:storage")

Access Tokens
Implement row-level security with token-based access control:

    from contextrouter.core.tokens import create_token, TokenPermissions

    # Create a user-specific token
    user_token = create_token(
        user_id="user_123",
        permissions=[
            TokenPermissions.RAG_READ,
            TokenPermissions.RAG_WRITE,
            TokenPermissions.INGEST_READ,
        ],
        expires_in_hours=24,
        metadata={"department": "engineering"},
    )

    # Attach to the envelope
    envelope.sign(user_token.id)

    # Later, verify access
    def check_access(envelope: BisquitEnvelope, required_permission: str) -> bool:
        # Deny when the envelope was never signed
        if not envelope.token_id:
            return False

        # get_token is your token-store lookup (not shown here)
        token = get_token(envelope.token_id)
        if not token or token.is_expired():
            return False

        return required_permission in token.permissions

Authentication & Authorization
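The deny-by-default token check described under Access Tokens can be tried out without ContextRouter installed. The following is a minimal sketch under stated assumptions: `SketchToken`, `TOKEN_STORE`, `issue_token`, and `has_permission` are hypothetical stand-ins written for illustration, not part of the ContextRouter API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class SketchToken:
    """Hypothetical stand-in for an access token (not the ContextRouter class)."""
    id: str
    permissions: frozenset
    expires_at: datetime

    def is_expired(self, now=None):
        now = now or datetime.now(timezone.utc)
        return now >= self.expires_at


# Hypothetical in-memory token store; a real deployment would use persistent storage.
TOKEN_STORE = {}


def issue_token(token_id, permissions, expires_in_hours=24):
    """Create a token and register it in the in-memory store."""
    expires_at = datetime.now(timezone.utc) + timedelta(hours=expires_in_hours)
    token = SketchToken(token_id, frozenset(permissions), expires_at)
    TOKEN_STORE[token_id] = token
    return token


def has_permission(token_id, required_permission):
    """Deny by default: a missing, unknown, or expired token grants nothing."""
    if not token_id:
        return False
    token = TOKEN_STORE.get(token_id)
    if token is None or token.is_expired():
        return False
    return required_permission in token.permissions
```

Every failure path returns False, so a caller that forgets to handle a case still ends up denying access rather than granting it.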
API Security
When deploying ContextRouter as an API service:

    from fastapi import Depends, HTTPException, Security
    from fastapi.responses import StreamingResponse
    from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
    from jose import ExpiredSignatureError, JWTError, jwt  # python-jose

    # app, SECRET_KEY, ChatRequest, and stream_agent are defined elsewhere in your application
    security = HTTPBearer()

    async def get_current_user(
        credentials: HTTPAuthorizationCredentials = Security(security),
    ):
        """Validate JWT token and return user context."""
        try:
            payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
        except ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token expired")
        except JWTError:
            raise HTTPException(status_code=401, detail="Invalid token")

        user_id = payload.get("sub")
        if not user_id:
            raise HTTPException(status_code=401, detail="Invalid token")
        return {"user_id": user_id, "permissions": payload.get("permissions", [])}

    @app.post("/rag/chat")
    async def chat_endpoint(
        request: ChatRequest,
        user_ctx: dict = Depends(get_current_user),
    ):
        """Secure chat endpoint with user context."""
        async def event_stream():
            # User context is passed through to ContextRouter
            async for event in stream_agent(request.messages, user_ctx=user_ctx):
                yield event  # assumes each event is already str or bytes

        # A route handler cannot yield directly, so wrap the generator in a streaming response
        return StreamingResponse(event_stream())

Session Management
Implement secure session handling:

    from contextrouter.core.security import SessionManager

    session_manager = SessionManager(config)

    # Create a session with security context
    session = session_manager.create_session(
        user_id="user_123",
        permissions=["rag:read", "rag:write"],
        ip_address=request.client.host,
        user_agent=request.headers.get("user-agent"),
    )

    # Validate the session on each request
    if not session_manager.validate_session(session.id):
        raise HTTPException(status_code=401, detail="Invalid session")

    # ContextRouter automatically respects session permissions
    runner = ChatRunner(config, session=session)

Data Privacy & Compliance
PII Detection & Handling
Automatically detect and handle personally identifiable information:

    import re

    # Illustrative version of PIITransformer (contextrouter.modules.transformers.privacy).
    # BaseTransformer and register_transformer are assumed to be imported from ContextRouter.

    class PIITransformer(BaseTransformer):
        """Detect and anonymize PII."""

        def __init__(self, config):
            self.pii_patterns = {
                "email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
                "phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
                "ssn": r'\b\d{3}[-]?\d{2}[-]?\d{4}\b',
            }

        def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
            content = envelope.content

            if isinstance(content, dict) and "text" in content:
                text = content["text"]
                pii_found = {}

                for pii_type, pattern in self.pii_patterns.items():
                    # Anonymize: replace each match with a placeholder and count the replacements
                    text, count = re.subn(pattern, f"[{pii_type.upper()}]", text)
                    if count:
                        pii_found[pii_type] = count

                if pii_found:
                    envelope.content["text"] = text
                    # Record only PII types and counts, never the raw matched values
                    envelope.metadata["pii_detected"] = pii_found
                    envelope.metadata["anonymized"] = True
                    envelope.add_trace("transformer:pii_anonymization")

            return envelope

    # Register and use
    @register_transformer("pii_protection")
    class PIIProtectionTransformer(PIITransformer):
        pass

Data Classification
Implement data classification for compliance:

    class DataClassifier(BaseTransformer):
        """Classify data sensitivity level."""

        SENSITIVITY_LEVELS = {
            "public": 0,
            "internal": 1,
            "confidential": 2,
            "restricted": 3,
        }

        def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
            content = envelope.content

            # Analyze content for sensitive keywords
            sensitivity_score = self._analyze_sensitivity(content)

            # Determine classification
            if sensitivity_score >= 0.8:
                classification = "restricted"
            elif sensitivity_score >= 0.6:
                classification = "confidential"
            elif sensitivity_score >= 0.3:
                classification = "internal"
            else:
                classification = "public"

            envelope.metadata["sensitivity_level"] = self.SENSITIVITY_LEVELS[classification]
            envelope.metadata["data_classification"] = classification
            envelope.add_trace("transformer:data_classification")

            return envelope

        def _analyze_sensitivity(self, content) -> float:
            """Analyze content for sensitive information."""
            sensitive_keywords = [
                "password", "secret", "confidential", "internal",
                "salary", "medical", "personal", "private",
            ]

            if isinstance(content, dict) and "text" in content:
                text = content["text"].lower()
                # Score is the fraction of distinct keywords present in the text
                matches = sum(1 for keyword in sensitive_keywords if keyword in text)
                return min(matches / len(sensitive_keywords), 1.0)

            return 0.0

Secure Deployment
Environment Security
Secure configuration management:

    # Use environment variables, never hardcode secrets
    export VERTEX_PROJECT_ID=your-secure-project
    export OPENAI_API_KEY=sk-your-secure-key
    export POSTGRES_PASSWORD=your-secure-password

    # Use .env files with restricted permissions
    chmod 600 .env

    # Never commit secrets to version control
    echo ".env" >> .gitignore
    echo "secrets/" >> .gitignore

Network Security
Deploy behind security layers:

    # Nginx configuration for API deployment
    # Note: limit_req below needs a matching zone declared in the http context, for example
    #   limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    # (the size and rate shown are example values)
    server {
        listen 443 ssl http2;
        server_name api.contextrouter.example.com;

        # SSL configuration
        ssl_certificate /etc/ssl/certs/contextrouter.crt;
        ssl_certificate_key /etc/ssl/private/contextrouter.key;

        # Security headers
        add_header X-Frame-Options DENY;
        add_header X-Content-Type-Options nosniff;
        add_header X-XSS-Protection "1; mode=block";
        add_header Strict-Transport-Security "max-age=31536000; includeSubDomains";

        # Rate limiting
        limit_req zone=api burst=10 nodelay;

        location /api/ {
            proxy_pass http://localhost:8000;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }
    }

Database Security
Secure database access:

    -- Create restricted database user
    CREATE USER contextrouter_user WITH PASSWORD 'secure_password';
    GRANT CONNECT ON DATABASE contextrouter TO contextrouter_user;

    -- Grant minimal permissions
    GRANT USAGE ON SCHEMA public TO contextrouter_user;
    GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO contextrouter_user;
    GRANT USAGE ON ALL SEQUENCES IN SCHEMA public TO contextrouter_user;

    -- Row-level security (PostgreSQL 9.5+)
    -- current_user_id() is an application-defined function, not a PostgreSQL built-in
    ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
    CREATE POLICY user_data_policy ON documents
        USING (user_id = current_user_id());

Monitoring & Auditing
Implement comprehensive security monitoring:

    from datetime import datetime

    from contextrouter.core.observability import SecurityMonitor

    security_monitor = SecurityMonitor(config)

    class SecurityLoggingTransformer(BaseTransformer):
        """Log security-relevant operations."""

        def transform(self, envelope: BisquitEnvelope) -> BisquitEnvelope:
            # Log access patterns
            security_monitor.log_access({
                "user_id": envelope.token_id,
                "operation": "transform",
                "data_type": envelope.metadata.get("data_classification"),
                "timestamp": datetime.now().isoformat(),
            })

            # Check for suspicious patterns
            if self._is_suspicious(envelope):
                security_monitor.alert_suspicious_activity(envelope)

            envelope.add_trace("transformer:security_logging")
            return envelope

        def _is_suspicious(self, envelope: BisquitEnvelope) -> bool:
            """Detect potentially suspicious activity."""
            # Check for unusual access patterns
            access_count = envelope.metadata.get("access_count", 0)
            if access_count > 100:  # High access frequency
                return True

            # Check for sensitive data access
            sensitivity = envelope.metadata.get("sensitivity_level", 0)
            if sensitivity >= 2:  # Confidential or restricted
                return True

            return False

Compliance Features
GDPR Compliance
Implement data subject rights:

    from datetime import datetime

    class GDPRController:
        """Handle GDPR data subject requests."""

        def __init__(self, config, provider):
            self.config = config
            # Storage provider used by the read and write calls below
            self.provider = provider

        async def right_to_access(self, user_id: str) -> list[BisquitEnvelope]:
            """Return all data for a user."""
            query = f"user_id:{user_id}"
            return await self.provider.read(query)

        async def right_to_be_forgotten(self, user_id: str) -> bool:
            """Delete all user data."""
            # Mark for deletion (don't actually delete immediately)
            envelopes = await self.right_to_access(user_id)

            for envelope in envelopes:
                envelope.metadata["gdpr_deletion_requested"] = True
                envelope.metadata["deletion_timestamp"] = datetime.now().isoformat()
                await self.provider.write(envelope)

            # Schedule actual deletion after the retention period (_schedule_deletion not shown)
            await self._schedule_deletion(user_id)
            return True

        async def data_portability(self, user_id: str) -> dict:
            """Export user data in portable format."""
            envelopes = await self.right_to_access(user_id)

            return {
                "user_id": user_id,
                "export_timestamp": datetime.now().isoformat(),
                "data": [envelope.model_dump() for envelope in envelopes],
            }

Audit Logging
Comprehensive audit trails:

    from datetime import datetime

    class AuditLogger:
        """Centralized audit logging."""

        def __init__(self, config):
            self.config = config

        async def log_operation(self, operation: str, envelope: BisquitEnvelope,
                                user_id: str, metadata: dict | None = None):
            """Log auditable operation."""
            # Avoid AttributeError when no request metadata is supplied
            metadata = metadata or {}

            audit_entry = {
                "timestamp": datetime.now().isoformat(),
                "operation": operation,
                "user_id": user_id,
                "envelope_id": envelope.id,
                "provenance": envelope.provenance,
                "data_classification": envelope.metadata.get("data_classification"),
                "ip_address": metadata.get("ip_address"),
                "user_agent": metadata.get("user_agent"),
            }

            # Store in secure audit log (_store_audit_entry not shown)
            await self._store_audit_entry(audit_entry)

            # Alert on sensitive operations (_send_security_alert not shown)
            if self._is_sensitive_operation(operation, envelope):
                await self._send_security_alert(audit_entry)

        def _is_sensitive_operation(self, operation: str, envelope: BisquitEnvelope) -> bool:
            """Determine if operation requires alerting."""
            sensitive_ops = ["delete", "export", "admin_access"]
            high_sensitivity = envelope.metadata.get("sensitivity_level", 0) >= 2

            return operation in sensitive_ops or high_sensitivity

Security Best Practices
Development
- Input Validation - Always validate user inputs
- Output Encoding - Encode outputs to prevent injection
- Error Handling - Don’t leak sensitive information in errors
- Dependency Updates - Keep dependencies updated for security patches
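
As an illustration of the input validation and error handling items above, here is a minimal sketch. The names `validate_query`, `safe_error_response`, and `MAX_QUERY_LENGTH`, along with the length limit itself, are assumptions chosen for this example and not part of ContextRouter.

```python
import logging
import re
import uuid

logger = logging.getLogger("security_sketch")

MAX_QUERY_LENGTH = 2000  # illustrative limit, tune for your application
_CONTROL_CHARS = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")


class ValidationError(ValueError):
    """Raised for input problems that are safe to describe to the caller."""


def validate_query(raw):
    """Return a cleaned query string or raise ValidationError."""
    if not isinstance(raw, str):
        raise ValidationError("query must be a string")
    query = raw.strip()
    if not query:
        raise ValidationError("query must not be empty")
    if len(query) > MAX_QUERY_LENGTH:
        raise ValidationError("query is too long")
    if _CONTROL_CHARS.search(query):
        raise ValidationError("query contains control characters")
    return query


def safe_error_response(exc):
    """Map an exception to a client-facing payload without leaking internals."""
    if isinstance(exc, ValidationError):
        return {"status": 400, "detail": str(exc)}
    # Log full details server-side; give the client only an opaque reference id.
    error_id = uuid.uuid4().hex
    logger.error("unhandled error %s", error_id, exc_info=exc)
    return {"status": 500, "detail": "Internal error", "error_id": error_id}
```

The reference id lets operators find the full traceback in server logs, while the client never sees exception messages that might contain credentials or query text.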
Deployment
- Principle of Least Privilege - Give minimal required permissions
- Network Segmentation - Isolate sensitive components
- Regular Backups - Secure backup and recovery procedures
- Monitoring - Continuous security monitoring and alerting
Operations
- Access Reviews - Regular review of user permissions
- Incident Response - Documented security incident procedures
- Training - Security awareness training for team members
- Compliance Audits - Regular security and compliance audits
Security Checklist
Pre-Deployment
- All secrets stored in environment variables
- Database user has minimal required permissions
- SSL/TLS enabled for all connections
- Security headers configured in web server
- Rate limiting implemented
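
Part of this checklist can be automated before each release. The sketch below checks that the secrets named under Environment Security are present and that the `.env` file is not readable by group or others. The function names are hypothetical, and the permission check assumes a POSIX filesystem.

```python
import os
import stat

# Variable names taken from the Environment Security example in this guide.
REQUIRED_SECRETS = ("VERTEX_PROJECT_ID", "OPENAI_API_KEY", "POSTGRES_PASSWORD")


def missing_secrets(environ, required=REQUIRED_SECRETS):
    """Return the names of required secrets that are unset or empty."""
    return [name for name in required if not environ.get(name)]


def env_file_is_private(path):
    """True if the file grants no permissions to group or others (chmod 600 style)."""
    mode = stat.S_IMODE(os.stat(path).st_mode)
    return (mode & (stat.S_IRWXG | stat.S_IRWXO)) == 0


def predeploy_problems(environ, env_file=".env"):
    """Collect human-readable problems; an empty list means the checks passed."""
    problems = [f"missing secret: {name}" for name in missing_secrets(environ)]
    if os.path.exists(env_file) and not env_file_is_private(env_file):
        problems.append(f"{env_file} is accessible by group or others")
    return problems
```

A deployment script could call `predeploy_problems(os.environ)` and abort when the returned list is not empty.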
Runtime Verification
- Authentication required for all API endpoints
- Authorization checks on data access
- Audit logging enabled and monitored
- Data classification implemented
- PII detection and handling active
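
The last two items can be verified with a small smoke test. The sketch below is self-contained and does not call ContextRouter: `redact` and `classify` are hypothetical helpers that reuse the regular expressions, keyword list, and thresholds from the PII and data classification examples in this guide.

```python
import re

PII_PATTERNS = {
    "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
    "phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
    "ssn": r"\b\d{3}-?\d{2}-?\d{4}\b",
}

SENSITIVE_KEYWORDS = [
    "password", "secret", "confidential", "internal",
    "salary", "medical", "personal", "private",
]


def redact(text):
    """Replace PII matches with placeholders; return (text, counts per PII type)."""
    counts = {}
    for pii_type, pattern in PII_PATTERNS.items():
        text, n = re.subn(pattern, f"[{pii_type.upper()}]", text)
        if n:
            counts[pii_type] = n
    return text, counts


def classify(text):
    """Score = fraction of distinct keywords present, mapped to a label."""
    lowered = text.lower()
    hits = sum(1 for keyword in SENSITIVE_KEYWORDS if keyword in lowered)
    score = hits / len(SENSITIVE_KEYWORDS)
    if score >= 0.8:
        return "restricted"
    if score >= 0.6:
        return "confidential"
    if score >= 0.3:
        return "internal"
    return "public"


sample = "Mail jane@example.com or call 555-123-4567, SSN 123-45-6789"
redacted, found = redact(sample)
print(redacted)  # Mail [EMAIL] or call [PHONE], SSN [SSN]
print(found)     # {'email': 1, 'phone': 1, 'ssn': 1}
```

With eight keywords, each distinct keyword adds 0.125 to the score: three keywords give 0.375 ("internal"), five give 0.625 ("confidential"), and seven give 0.875 ("restricted"). Regular expressions and keyword counts are coarse heuristics, so a check like this confirms the pipeline is wired up rather than proving that all PII is caught.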
Monitoring
- Security events logged and alerted
- Regular vulnerability scans performed
- Access patterns monitored for anomalies
- Compliance requirements met and documented
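
One way to monitor access patterns for anomalies is a per-user sliding window counter. The sketch below is an assumption-laden illustration: `AccessRateMonitor` is a hypothetical class, the default of 100 accesses mirrors the `access_count > 100` check used earlier in this guide, and the 60-second window is an arbitrary example value.

```python
from collections import defaultdict, deque


class AccessRateMonitor:
    """Flag users whose access count within a time window exceeds a limit."""

    def __init__(self, max_accesses=100, window_seconds=60.0):
        self.max_accesses = max_accesses
        self.window_seconds = window_seconds
        self._events = defaultdict(deque)  # user_id -> timestamps inside the window

    def record(self, user_id, timestamp):
        """Record one access; return True if the user is now over the limit."""
        events = self._events[user_id]
        events.append(timestamp)
        # Drop timestamps that have fallen out of the window.
        cutoff = timestamp - self.window_seconds
        while events and events[0] <= cutoff:
            events.popleft()
        return len(events) > self.max_accesses


monitor = AccessRateMonitor(max_accesses=3, window_seconds=10)
flags = [monitor.record("user_123", t) for t in (0, 1, 2, 3)]
print(flags)  # [False, False, False, True]
```

Timestamps are passed in explicitly, which keeps the class easy to test; in a service they would typically come from `time.monotonic()`.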
This security guide provides the foundation for deploying ContextRouter securely. Always consult with security experts for your specific compliance requirements and threat model.