""" Enhanced per-session configuration management for DNSRecon. Provides isolated configuration instances for each user session while supporting global caching. """ import os from typing import Dict, Optional class SessionConfig: """ Enhanced session-specific configuration that inherits from global config but maintains isolated API keys and provider settings while supporting global caching. """ def __init__(self): """Initialize enhanced session config with global cache support.""" # Copy all attributes from global config self.api_keys: Dict[str, Optional[str]] = { 'shodan': None } # Default settings (copied from global config) self.default_recursion_depth = 2 self.default_timeout = 30 self.max_concurrent_requests = 5 self.large_entity_threshold = 100 # Enhanced rate limiting settings (per session) self.rate_limits = { 'crtsh': 60, 'shodan': 60, 'dns': 100 } # Enhanced provider settings (per session) self.enabled_providers = { 'crtsh': True, 'dns': True, 'shodan': False } # Task-based execution settings self.task_retry_settings = { 'max_retries': 3, 'base_backoff_seconds': 1.0, 'max_backoff_seconds': 60.0, 'retry_on_rate_limit': True, 'retry_on_connection_error': True, 'retry_on_timeout': True } # Cache settings (global across all sessions) self.cache_settings = { 'enabled': True, 'expiry_hours': 12, 'cache_base_dir': '.cache', 'per_provider_directories': True, 'thread_safe_operations': True } # Logging configuration self.log_level = 'INFO' self.log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # Flask configuration (shared) self.flask_host = '127.0.0.1' self.flask_port = 5000 self.flask_debug = True # Session isolation settings self.session_isolation = { 'enforce_single_session_per_user': True, 'consolidate_session_data_on_replacement': True, 'user_fingerprinting_enabled': True, 'session_timeout_minutes': 60 } # Circuit breaker settings for provider reliability self.circuit_breaker = { 'enabled': True, 'failure_threshold': 5, # Failures before opening circuit 'recovery_timeout_seconds': 300, # 5 minutes before trying again 'half_open_max_calls': 3 # Test calls when recovering } def set_api_key(self, provider: str, api_key: str) -> bool: """ Set API key for a provider in this session. Args: provider: Provider name (shodan, etc) api_key: API key string (empty string to clear) Returns: bool: True if key was set successfully """ if provider in self.api_keys: # Handle clearing of API keys if api_key and api_key.strip(): self.api_keys[provider] = api_key.strip() self.enabled_providers[provider] = True else: self.api_keys[provider] = None self.enabled_providers[provider] = False return True return False def get_api_key(self, provider: str) -> Optional[str]: """ Get API key for a provider in this session. Args: provider: Provider name Returns: API key or None if not set """ return self.api_keys.get(provider) def is_provider_enabled(self, provider: str) -> bool: """ Check if a provider is enabled in this session. Args: provider: Provider name Returns: bool: True if provider is enabled """ return self.enabled_providers.get(provider, False) def get_rate_limit(self, provider: str) -> int: """ Get rate limit for a provider in this session. Args: provider: Provider name Returns: Rate limit in requests per minute """ return self.rate_limits.get(provider, 60) def get_task_retry_config(self) -> Dict[str, any]: """ Get task retry configuration for this session. Returns: Dictionary with retry settings """ return self.task_retry_settings.copy() def get_cache_config(self) -> Dict[str, any]: """ Get cache configuration (global settings). Returns: Dictionary with cache settings """ return self.cache_settings.copy() def is_circuit_breaker_enabled(self) -> bool: """Check if circuit breaker is enabled for provider reliability.""" return self.circuit_breaker.get('enabled', True) def get_circuit_breaker_config(self) -> Dict[str, any]: """Get circuit breaker configuration.""" return self.circuit_breaker.copy() def update_provider_settings(self, provider_updates: Dict[str, Dict[str, any]]) -> bool: """ Update provider-specific settings in bulk. Args: provider_updates: Dictionary of provider -> settings updates Returns: bool: True if updates were applied successfully """ try: for provider_name, updates in provider_updates.items(): # Update rate limits if 'rate_limit' in updates: self.rate_limits[provider_name] = updates['rate_limit'] # Update enabled status if 'enabled' in updates: self.enabled_providers[provider_name] = updates['enabled'] # Update API key if 'api_key' in updates: self.set_api_key(provider_name, updates['api_key']) return True except Exception as e: print(f"Error updating provider settings: {e}") return False def validate_configuration(self) -> Dict[str, any]: """ Validate the current configuration and return validation results. Returns: Dictionary with validation results and any issues found """ validation_result = { 'valid': True, 'warnings': [], 'errors': [], 'provider_status': {} } # Validate provider configurations for provider_name, enabled in self.enabled_providers.items(): provider_status = { 'enabled': enabled, 'has_api_key': bool(self.api_keys.get(provider_name)), 'rate_limit': self.rate_limits.get(provider_name, 60) } # Check for potential issues if enabled and provider_name in ['shodan'] and not provider_status['has_api_key']: validation_result['warnings'].append( f"Provider '{provider_name}' is enabled but missing API key" ) validation_result['provider_status'][provider_name] = provider_status # Validate task settings if self.task_retry_settings['max_retries'] > 10: validation_result['warnings'].append( f"High retry count ({self.task_retry_settings['max_retries']}) may cause long delays" ) # Validate concurrent settings if self.max_concurrent_requests > 10: validation_result['warnings'].append( f"High concurrency ({self.max_concurrent_requests}) may overwhelm providers" ) # Validate cache settings if not os.path.exists(self.cache_settings['cache_base_dir']): try: os.makedirs(self.cache_settings['cache_base_dir'], exist_ok=True) except Exception as e: validation_result['errors'].append(f"Cannot create cache directory: {e}") validation_result['valid'] = False return validation_result def load_from_env(self): """Load configuration from environment variables with enhanced validation.""" # Load API keys from environment if os.getenv('SHODAN_API_KEY') and not self.api_keys['shodan']: self.set_api_key('shodan', os.getenv('SHODAN_API_KEY')) print("Loaded Shodan API key from environment") # Override default settings from environment self.default_recursion_depth = int(os.getenv('DEFAULT_RECURSION_DEPTH', '2')) self.default_timeout = int(os.getenv('DEFAULT_TIMEOUT', '30')) self.max_concurrent_requests = int(os.getenv('MAX_CONCURRENT_REQUESTS', '5')) # Load task retry settings from environment if os.getenv('TASK_MAX_RETRIES'): self.task_retry_settings['max_retries'] = int(os.getenv('TASK_MAX_RETRIES')) if os.getenv('TASK_BASE_BACKOFF'): self.task_retry_settings['base_backoff_seconds'] = float(os.getenv('TASK_BASE_BACKOFF')) # Load cache settings from environment if os.getenv('CACHE_EXPIRY_HOURS'): self.cache_settings['expiry_hours'] = int(os.getenv('CACHE_EXPIRY_HOURS')) if os.getenv('CACHE_DISABLED'): self.cache_settings['enabled'] = os.getenv('CACHE_DISABLED').lower() != 'true' # Load circuit breaker settings if os.getenv('CIRCUIT_BREAKER_DISABLED'): self.circuit_breaker['enabled'] = os.getenv('CIRCUIT_BREAKER_DISABLED').lower() != 'true' # Flask settings self.flask_debug = os.getenv('FLASK_DEBUG', 'True').lower() == 'true' print("Enhanced configuration loaded from environment") def export_config_summary(self) -> Dict[str, any]: """ Export a summary of the current configuration for debugging/logging. Returns: Dictionary with configuration summary (API keys redacted) """ return { 'providers': { provider: { 'enabled': self.enabled_providers.get(provider, False), 'has_api_key': bool(self.api_keys.get(provider)), 'rate_limit': self.rate_limits.get(provider, 60) } for provider in self.enabled_providers.keys() }, 'task_settings': { 'max_retries': self.task_retry_settings['max_retries'], 'max_concurrent_requests': self.max_concurrent_requests, 'large_entity_threshold': self.large_entity_threshold }, 'cache_settings': { 'enabled': self.cache_settings['enabled'], 'expiry_hours': self.cache_settings['expiry_hours'], 'base_directory': self.cache_settings['cache_base_dir'] }, 'session_settings': { 'isolation_enabled': self.session_isolation['enforce_single_session_per_user'], 'consolidation_enabled': self.session_isolation['consolidate_session_data_on_replacement'], 'timeout_minutes': self.session_isolation['session_timeout_minutes'] }, 'circuit_breaker': { 'enabled': self.circuit_breaker['enabled'], 'failure_threshold': self.circuit_breaker['failure_threshold'], 'recovery_timeout': self.circuit_breaker['recovery_timeout_seconds'] } } def create_session_config() -> SessionConfig: """ Create a new enhanced session configuration instance. Returns: Configured SessionConfig instance """ session_config = SessionConfig() session_config.load_from_env() # Validate configuration and log any issues validation = session_config.validate_configuration() if validation['warnings']: print("Configuration warnings:") for warning in validation['warnings']: print(f" WARNING: {warning}") if validation['errors']: print("Configuration errors:") for error in validation['errors']: print(f" ERROR: {error}") if not validation['valid']: raise ValueError("Configuration validation failed - see errors above") print(f"Enhanced session configuration created successfully") return session_config def create_test_config() -> SessionConfig: """ Create a test configuration with safe defaults for testing. Returns: Test-safe SessionConfig instance """ test_config = SessionConfig() # Override settings for testing test_config.max_concurrent_requests = 2 test_config.task_retry_settings['max_retries'] = 1 test_config.task_retry_settings['base_backoff_seconds'] = 0.1 test_config.cache_settings['expiry_hours'] = 1 test_config.session_isolation['session_timeout_minutes'] = 10 print("Test configuration created") return test_config