# dnsrecon-reduced/config.py

"""
Configuration management for the DNSRecon tool.
Handles API key storage, rate limiting, and default settings.
"""

import os
from typing import Dict, Optional

from dotenv import load_dotenv

# Load variables from a local .env file into the process environment at
# import time, so the os.getenv() lookups in Config.load_from_env() see them.
load_dotenv()

class Config:
    """Configuration manager for DNSRecon application.

    Holds API keys, per-provider rate limits and enable flags, logging
    options, and Flask/session settings. Defaults are assigned in
    ``__init__`` and then overridden from environment variables by
    ``load_from_env``.

    Provider names are treated case-insensitively: every accessor
    lower-cases the name before touching the underlying dictionaries.
    """

    # (attribute name, environment variable) pairs for the integer settings
    # that load_from_env() may override.
    _INT_ENV_OVERRIDES = (
        ('default_recursion_depth', 'DEFAULT_RECURSION_DEPTH'),
        ('default_timeout', 'DEFAULT_TIMEOUT'),
        ('max_concurrent_requests', 'MAX_CONCURRENT_REQUESTS'),
        ('large_entity_threshold', 'LARGE_ENTITY_THRESHOLD'),
        ('max_retries_per_target', 'MAX_RETRIES_PER_TARGET'),
        ('cache_timeout_hours', 'CACHE_TIMEOUT_HOURS'),
        ('graph_polling_node_threshold', 'GRAPH_POLLING_NODE_THRESHOLD'),
        ('flask_port', 'FLASK_PORT'),
        (
            'flask_permanent_session_lifetime_hours',
            'FLASK_PERMANENT_SESSION_LIFETIME_HOURS',
        ),
        ('session_timeout_minutes', 'SESSION_TIMEOUT_MINUTES'),
    )

    def __init__(self):
        """Initialize configuration with default values, then apply env overrides.

        Raises:
            ValueError: If an integer setting's environment variable holds a
                non-numeric value (propagated from ``load_from_env``).
        """
        self.api_keys: Dict[str, Optional[str]] = {}

        # --- General Settings ---
        self.default_recursion_depth = 2
        self.default_timeout = 60  # NOTE(review): presumably seconds - confirm
        self.max_concurrent_requests = 1
        self.large_entity_threshold = 100
        self.max_retries_per_target = 8

        # --- Graph Polling Performance Settings ---
        # Stop graph auto-polling above this many nodes
        self.graph_polling_node_threshold = 100

        # --- Provider Caching Settings ---
        self.cache_timeout_hours = 6  # Provider-specific cache timeout

        # --- Rate Limiting (requests per minute) ---
        self.rate_limits = {
            'crtsh': 5,
            'shodan': 60,
            'dns': 100,
            'correlation': 0,  # Set to 0 to make sure correlations run last
        }

        # --- Provider Settings ---
        self.enabled_providers = {
            'crtsh': True,
            'dns': True,
            'shodan': False,  # switched on by set_api_key() once a key exists
            'correlation': True,  # Enable the new provider by default
        }

        # --- Logging ---
        self.log_level = 'INFO'
        self.log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

        # --- Flask & Session Settings ---
        self.flask_host = '127.0.0.1'
        self.flask_port = 5000
        self.flask_debug = True
        # Placeholder only; set FLASK_SECRET_KEY for any real deployment.
        self.flask_secret_key = 'default-secret-key-change-me'
        self.flask_permanent_session_lifetime_hours = 2
        self.session_timeout_minutes = 60

        # Load environment variables to override defaults
        self.load_from_env()

    @staticmethod
    def _provider_key(provider: str) -> str:
        """Return the canonical (lower-case) dictionary key for a provider name."""
        return provider.lower()

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Read an integer setting from the environment.

        Args:
            name: Environment variable name.
            default: Value to use when the variable is unset or blank.

        Returns:
            The parsed integer, or ``default`` coerced to ``int``.

        Raises:
            ValueError: If the variable is set to a non-numeric value. The
                message names the offending variable.
        """
        raw = os.getenv(name)
        # A blank entry such as ``FLASK_PORT=`` in a .env file means "not
        # configured"; it must not crash the import of this module.
        if raw is None or not raw.strip():
            return int(default)
        try:
            return int(raw)
        except ValueError:
            raise ValueError(
                f"Environment variable {name} must be an integer, got {raw!r}"
            ) from None

    def load_from_env(self):
        """Load configuration from environment variables.

        Every setting keeps its current value when the corresponding variable
        is unset. Note that the Shodan key is always re-read, so an unset
        ``SHODAN_API_KEY`` stores ``None`` for that provider.

        Raises:
            ValueError: If an integer setting holds a non-numeric value.
        """
        self.set_api_key('shodan', os.getenv('SHODAN_API_KEY'))

        # Override integer settings from environment
        for attr, env_name in self._INT_ENV_OVERRIDES:
            setattr(self, attr, self._env_int(env_name, getattr(self, attr)))

        # Override Flask string/boolean settings
        self.flask_host = os.getenv('FLASK_HOST', self.flask_host)
        # Only the literal text "true" (any case) turns debug mode on.
        self.flask_debug = os.getenv('FLASK_DEBUG', str(self.flask_debug)).lower() == 'true'
        self.flask_secret_key = os.getenv('FLASK_SECRET_KEY', self.flask_secret_key)

    def set_api_key(self, provider: str, api_key: Optional[str]) -> bool:
        """Set API key for a provider.

        A non-empty key also enables the provider; an empty or ``None`` key is
        stored but leaves the enabled flag untouched.

        Args:
            provider: Provider name (case-insensitive).
            api_key: Key to store, or ``None`` to record that no key is set.

        Returns:
            Always True.
        """
        provider_key = self._provider_key(provider)
        self.api_keys[provider_key] = api_key
        if api_key:
            self.enabled_providers[provider_key] = True
        return True

    def set_provider_enabled(self, provider: str, enabled: bool) -> bool:
        """
        Set provider enabled status for the session.

        Args:
            provider: Provider name (case-insensitive)
            enabled: Whether the provider should be enabled

        Returns:
            True if the setting was applied successfully
        """
        self.enabled_providers[self._provider_key(provider)] = enabled
        return True

    def get_provider_enabled(self, provider: str) -> bool:
        """
        Get provider enabled status.

        Unknown providers are reported as enabled; use ``is_provider_enabled``
        for the stricter check that treats unknown providers as disabled.

        Args:
            provider: Provider name (case-insensitive)

        Returns:
            True if the provider is enabled
        """
        # Default to enabled
        return self.enabled_providers.get(self._provider_key(provider), True)

    def bulk_set_provider_settings(self, provider_settings: dict) -> dict:
        """
        Set multiple provider settings at once.

        Args:
            provider_settings: Dict of provider_name -> {'enabled': bool, ...}

        Returns:
            Dict keyed by lower-cased provider name. Each value is
            {'success': True, 'enabled': ...} when applied, or
            {'success': False, 'error': ...} otherwise.
        """
        results = {}

        for provider_name, settings in provider_settings.items():
            provider_key = self._provider_key(provider_name)

            # A malformed entry (e.g. a non-dict value) is reported in the
            # result for that provider instead of aborting the whole batch.
            try:
                if 'enabled' in settings:
                    enabled = settings['enabled']
                    self.enabled_providers[provider_key] = enabled
                    results[provider_key] = {'success': True, 'enabled': enabled}
                else:
                    results[provider_key] = {'success': False, 'error': 'No enabled setting provided'}
            except Exception as e:
                results[provider_key] = {'success': False, 'error': str(e)}

        return results

    def get_api_key(self, provider: str) -> Optional[str]:
        """Get API key for a provider (case-insensitive); None if not set."""
        return self.api_keys.get(self._provider_key(provider))

    def is_provider_enabled(self, provider: str) -> bool:
        """Check if a provider is enabled; unknown providers count as disabled."""
        return self.enabled_providers.get(self._provider_key(provider), False)

    def get_rate_limit(self, provider: str) -> int:
        """Get rate limit (requests per minute) for a provider; 60 if unknown."""
        return self.rate_limits.get(self._provider_key(provider), 60)

# Global configuration instance, created at import time. Constructing it reads
# the environment, so overrides must be in place before this module is imported.
config = Config()