Compare commits

...

2 Commits

Author          SHA1        Message             Date
overcuriousity  4378146d0c  it                  2025-09-14 13:14:02 +02:00
overcuriousity  b26002eff9  fix race condition  2025-09-14 01:40:17 +02:00
8 changed files with 1827 additions and 711 deletions

.gitignore (vendored): 1 addition

@@ -169,3 +169,4 @@ cython_debug/
#.idea/
dump.rdb
+.vscode

app.py: 155 lines changed

@@ -1,6 +1,6 @@
"""
Flask application entry point for DNSRecon web interface.
-Provides REST API endpoints and serves the web interface with user session support.
+Enhanced with user session management and task-based completion model.
"""
import json
@@ -9,7 +9,7 @@ from flask import Flask, render_template, request, jsonify, send_file, session
from datetime import datetime, timezone, timedelta
import io
-from core.session_manager import session_manager
+from core.session_manager import session_manager, UserIdentifier
from config import config
@@ -17,46 +17,73 @@ app = Flask(__name__)
app.config['SECRET_KEY'] = 'dnsrecon-dev-key-change-in-production'
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(hours=2) # 2 hour session lifetime
def get_user_scanner():
"""
-User scanner retrieval with better error handling and debugging.
+Enhanced user scanner retrieval with user identification and session consolidation.
+Implements single session per user with seamless consolidation.
"""
-# Get current Flask session info for debugging
-current_flask_session_id = session.get('dnsrecon_session_id')
-client_ip = request.remote_addr
-user_agent = request.headers.get('User-Agent', '')[:100] # Truncate for logging
-# Try to get existing session
-if current_flask_session_id:
-existing_scanner = session_manager.get_session(current_flask_session_id)
-if existing_scanner:
-# Ensure session ID is set
-existing_scanner.session_id = current_flask_session_id
-return current_flask_session_id, existing_scanner
-else:
-print(f"Session {current_flask_session_id} not found in session manager")
-# Create new session
-print("Creating new session...")
-new_session_id = session_manager.create_session()
-new_scanner = session_manager.get_session(new_session_id)
-if not new_scanner:
-print(f"ERROR: Failed to retrieve newly created session {new_session_id}")
-raise Exception("Failed to create new scanner session")
-# Store in Flask session
-session['dnsrecon_session_id'] = new_session_id
-session.permanent = True
-# Ensure session ID is set on scanner
-new_scanner.session_id = new_session_id
-print(f"Created new session: {new_session_id}")
-print(f"New scanner status: {new_scanner.status}")
-print("=== END SESSION DEBUG ===")
-return new_session_id, new_scanner
+print("=== ENHANCED GET_USER_SCANNER ===")
+try:
+# Extract user identification from request
+client_ip, user_agent = UserIdentifier.extract_request_info(request)
+user_fingerprint = UserIdentifier.generate_user_fingerprint(client_ip, user_agent)
+print(f"User fingerprint: {user_fingerprint}")
+print(f"Client IP: {client_ip}")
+print(f"User Agent: {user_agent[:50]}...")
+# Get current Flask session info for debugging
+current_flask_session_id = session.get('dnsrecon_session_id')
+print(f"Flask session ID: {current_flask_session_id}")
+# Try to get existing session first
+if current_flask_session_id:
+existing_scanner = session_manager.get_session(current_flask_session_id)
+if existing_scanner:
+# Verify session belongs to current user
+session_info = session_manager.get_session_info(current_flask_session_id)
+if session_info.get('user_fingerprint') == user_fingerprint:
+print(f"Found valid existing session {current_flask_session_id} for user {user_fingerprint}")
+existing_scanner.session_id = current_flask_session_id
+return current_flask_session_id, existing_scanner
+else:
+print(f"Session {current_flask_session_id} belongs to different user, will create new session")
+else:
+print(f"Session {current_flask_session_id} not found in Redis, will create new session")
+# Create or replace user session (this handles consolidation automatically)
+new_session_id = session_manager.create_or_replace_user_session(client_ip, user_agent)
+new_scanner = session_manager.get_session(new_session_id)
+if not new_scanner:
+print(f"ERROR: Failed to retrieve newly created session {new_session_id}")
+raise Exception("Failed to create new scanner session")
+# Store in Flask session for browser persistence
+session['dnsrecon_session_id'] = new_session_id
+session.permanent = True
+# Ensure session ID is set on scanner
+new_scanner.session_id = new_session_id
+# Get session info for user feedback
+session_info = session_manager.get_session_info(new_session_id)
+print(f"Session created/consolidated successfully")
+print(f"  - Session ID: {new_session_id}")
+print(f"  - User: {user_fingerprint}")
+print(f"  - Scanner status: {new_scanner.status}")
+print(f"  - Session age: {session_info.get('session_age_minutes', 0)} minutes")
+return new_session_id, new_scanner
+except Exception as e:
+print(f"ERROR: Exception in get_user_scanner: {e}")
+traceback.print_exc()
+raise
@app.route('/')
def index():
@@ -67,7 +94,7 @@ def index():
@app.route('/api/scan/start', methods=['POST'])
def start_scan():
"""
-Start a new reconnaissance scan with immediate GUI feedback.
+Start a new reconnaissance scan with enhanced user session management.
"""
print("=== API: /api/scan/start called ===")
@@ -87,7 +114,7 @@ def start_scan():
max_depth = data.get('max_depth', config.default_recursion_depth)
clear_graph = data.get('clear_graph', True)
-print(f"Parsed - target_domain: '{target_domain}', max_depth: {max_depth}")
+print(f"Parsed - target_domain: '{target_domain}', max_depth: {max_depth}, clear_graph: {clear_graph}")
# Validation
if not target_domain:
@@ -106,7 +133,7 @@ def start_scan():
print("Validation passed, getting user scanner...") print("Validation passed, getting user scanner...")
# Get user-specific scanner # Get user-specific scanner with enhanced session management
user_session_id, scanner = get_user_scanner() user_session_id, scanner = get_user_scanner()
# Ensure session ID is properly set # Ensure session ID is properly set
@ -126,12 +153,21 @@ def start_scan():
if success:
scan_session_id = scanner.logger.session_id
print(f"Scan started successfully with scan session ID: {scan_session_id}")
+# Get session info for user feedback
+session_info = session_manager.get_session_info(user_session_id)
return jsonify({
'success': True,
'message': 'Scan started successfully',
'scan_id': scan_session_id,
'user_session_id': user_session_id,
'scanner_status': scanner.status,
+'session_info': {
+'user_fingerprint': session_info.get('user_fingerprint', 'unknown'),
+'session_age_minutes': session_info.get('session_age_minutes', 0),
+'consolidated': session_info.get('session_age_minutes', 0) > 0
+},
'debug_info': {
'scanner_object_id': id(scanner),
'scanner_status': scanner.status
@@ -216,7 +252,7 @@ def stop_scan():
@app.route('/api/scan/status', methods=['GET'])
def get_scan_status():
-"""Get current scan status with error handling."""
+"""Get current scan status with enhanced session information."""
try:
# Get user-specific scanner
user_session_id, scanner = get_user_scanner()
@@ -247,6 +283,15 @@ def get_scan_status():
status = scanner.get_scan_status()
status['user_session_id'] = user_session_id
+# Add enhanced session information
+session_info = session_manager.get_session_info(user_session_id)
+status['session_info'] = {
+'user_fingerprint': session_info.get('user_fingerprint', 'unknown'),
+'session_age_minutes': session_info.get('session_age_minutes', 0),
+'client_ip': session_info.get('client_ip', 'unknown'),
+'last_activity': session_info.get('last_activity')
+}
# Additional debug info
status['debug_info'] = {
'scanner_object_id': id(scanner),
@@ -275,7 +320,6 @@ def get_scan_status():
}), 500
@app.route('/api/graph', methods=['GET'])
def get_graph_data():
"""Get current graph data with error handling."""
@@ -321,7 +365,6 @@ def get_graph_data():
}), 500
@app.route('/api/export', methods=['GET'])
def export_results():
"""Export complete scan results as downloadable JSON for the user session."""
@@ -332,17 +375,22 @@ def export_results():
# Get complete results
results = scanner.export_results()
-# Add session information to export
+# Add enhanced session information to export
+session_info = session_manager.get_session_info(user_session_id)
results['export_metadata'] = {
'user_session_id': user_session_id,
+'user_fingerprint': session_info.get('user_fingerprint', 'unknown'),
+'client_ip': session_info.get('client_ip', 'unknown'),
+'session_age_minutes': session_info.get('session_age_minutes', 0),
'export_timestamp': datetime.now(timezone.utc).isoformat(),
'export_type': 'user_session_results'
}
-# Create filename with timestamp
+# Create filename with user fingerprint
timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
target = scanner.current_target or 'unknown'
-filename = f"dnsrecon_{target}_{timestamp}_{user_session_id[:8]}.json"
+user_fp = session_info.get('user_fingerprint', 'unknown')[:8]
+filename = f"dnsrecon_{target}_{timestamp}_{user_fp}.json"
# Create in-memory file
json_data = json.dumps(results, indent=2, ensure_ascii=False)
@@ -450,7 +498,7 @@ def set_api_keys():
@app.route('/api/session/info', methods=['GET'])
def get_session_info():
-"""Get information about the current user session."""
+"""Get enhanced information about the current user session."""
try:
user_session_id, scanner = get_user_scanner()
session_info = session_manager.get_session_info(user_session_id)
@@ -501,7 +549,7 @@ def terminate_session():
@app.route('/api/admin/sessions', methods=['GET'])
def list_sessions():
-"""Admin endpoint to list all active sessions."""
+"""Admin endpoint to list all active sessions with enhanced information."""
try:
sessions = session_manager.list_active_sessions()
stats = session_manager.get_statistics()
@@ -523,7 +571,7 @@ def list_sessions():
@app.route('/api/health', methods=['GET'])
def health_check():
-"""Health check endpoint."""
+"""Health check endpoint with enhanced session statistics."""
try:
# Get session stats
session_stats = session_manager.get_statistics()
@@ -532,8 +580,8 @@ def health_check():
'success': True,
'status': 'healthy',
'timestamp': datetime.now(timezone.utc).isoformat(),
-'version': '1.0.0-phase2',
-'phase': 2,
+'version': '2.0.0-enhanced',
+'phase': 'enhanced_architecture',
'features': {
'multi_provider': True,
'concurrent_processing': True,
@@ -542,9 +590,18 @@ def health_check():
'visualization': True,
'retry_logic': True,
'user_sessions': True,
-'session_isolation': True
+'session_isolation': True,
+'global_provider_caching': True,
+'single_session_per_user': True,
+'session_consolidation': True,
+'task_completion_model': True
},
-'session_statistics': session_stats
+'session_statistics': session_stats,
+'cache_info': {
+'global_provider_cache': True,
+'cache_location': '.cache/<provider_name>/',
+'cache_expiry_hours': 12
+}
})
except Exception as e:
print(f"ERROR: Exception in health_check endpoint: {e}")
@@ -575,7 +632,7 @@ def internal_error(error):
if __name__ == '__main__':
-print("Starting DNSRecon Flask application with user session support...")
+print("Starting DNSRecon Flask application with enhanced user session support...")
# Load configuration from environment
config.load_from_env()
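
The endpoints above now return a session_info block alongside the existing fields. Below is a minimal sketch of how a client might exercise the new session-aware flow; it assumes the app is running locally on the default Flask port (5000), uses the third-party requests library (not part of this change), and example.com is just a placeholder target. Field names mirror the JSON shown in the diff.

import requests

BASE = "http://127.0.0.1:5000"
client = requests.Session()  # keeps the Flask session cookie, so the same user session is reused

# Start a scan; the response echoes the user session and fingerprint
start = client.post(f"{BASE}/api/scan/start",
                    json={"target_domain": "example.com", "max_depth": 2, "clear_graph": True}).json()
print(start["user_session_id"], start["session_info"]["user_fingerprint"])

# Poll status; session_info carries the enhanced session metadata
status = client.get(f"{BASE}/api/scan/status").json()
print(status["session_info"]["session_age_minutes"], status["session_info"]["client_ip"])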

core/__init__.py

@@ -8,6 +8,7 @@ from .scanner import Scanner, ScanStatus
from .logger import ForensicLogger, get_forensic_logger, new_session
from .session_manager import session_manager
from .session_config import SessionConfig, create_session_config
+from .task_manager import TaskManager, TaskType, ReconTask
__all__ = [
'GraphManager',
@@ -19,7 +20,10 @@ __all__ = [
'new_session',
'session_manager',
'SessionConfig',
-'create_session_config'
+'create_session_config',
+'TaskManager',
+'TaskType',
+'ReconTask'
]
__version__ = "1.0.0-phase2"
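
With the new exports, the task primitives (TaskManager, TaskType, ReconTask) can be imported from the package root. A small illustrative sketch, assuming it runs inside the project so the package imports resolve:

from core import TaskType, ReconTask

# ReconTask is a dataclass; an empty task_id is replaced by a short uuid in __post_init__
task = ReconTask(task_id="", task_type=TaskType.DOMAIN_QUERY,
                 target="example.com", provider_name="crtsh", depth=1)
print(task.task_id, task.status.value)  # e.g. "a1b2c3d4 pending"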

File diff suppressed because it is too large.

core/session_config.py

@@ -1,6 +1,6 @@
"""
-Per-session configuration management for DNSRecon.
-Provides isolated configuration instances for each user session.
+Enhanced per-session configuration management for DNSRecon.
+Provides isolated configuration instances for each user session while supporting global caching.
"""
import os
@@ -9,12 +9,12 @@ from typing import Dict, Optional
class SessionConfig:
"""
-Session-specific configuration that inherits from global config
-but maintains isolated API keys and provider settings.
+Enhanced session-specific configuration that inherits from global config
+but maintains isolated API keys and provider settings while supporting global caching.
"""
def __init__(self):
-"""Initialize session config with global defaults."""
+"""Initialize enhanced session config with global cache support."""
# Copy all attributes from global config
self.api_keys: Dict[str, Optional[str]] = {
'shodan': None
@@ -26,20 +26,39 @@ class SessionConfig:
self.max_concurrent_requests = 5
self.large_entity_threshold = 100
-# Rate limiting settings (per session)
+# Enhanced rate limiting settings (per session)
self.rate_limits = {
'crtsh': 60,
'shodan': 60,
'dns': 100
}
-# Provider settings (per session)
+# Enhanced provider settings (per session)
self.enabled_providers = {
'crtsh': True,
'dns': True,
'shodan': False
}
# Task-based execution settings
self.task_retry_settings = {
'max_retries': 3,
'base_backoff_seconds': 1.0,
'max_backoff_seconds': 60.0,
'retry_on_rate_limit': True,
'retry_on_connection_error': True,
'retry_on_timeout': True
}
# Cache settings (global across all sessions)
self.cache_settings = {
'enabled': True,
'expiry_hours': 12,
'cache_base_dir': '.cache',
'per_provider_directories': True,
'thread_safe_operations': True
}
# Logging configuration
self.log_level = 'INFO'
self.log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
@@ -49,20 +68,41 @@ class SessionConfig:
self.flask_port = 5000
self.flask_debug = True
# Session isolation settings
self.session_isolation = {
'enforce_single_session_per_user': True,
'consolidate_session_data_on_replacement': True,
'user_fingerprinting_enabled': True,
'session_timeout_minutes': 60
}
# Circuit breaker settings for provider reliability
self.circuit_breaker = {
'enabled': True,
'failure_threshold': 5, # Failures before opening circuit
'recovery_timeout_seconds': 300, # 5 minutes before trying again
'half_open_max_calls': 3 # Test calls when recovering
}
def set_api_key(self, provider: str, api_key: str) -> bool:
"""
Set API key for a provider in this session.
Args:
provider: Provider name (shodan, etc)
-api_key: API key string
+api_key: API key string (empty string to clear)
Returns:
bool: True if key was set successfully
"""
if provider in self.api_keys:
-self.api_keys[provider] = api_key
-self.enabled_providers[provider] = True if api_key else False
+# Handle clearing of API keys
+if api_key and api_key.strip():
+self.api_keys[provider] = api_key.strip()
+self.enabled_providers[provider] = True
+else:
+self.api_keys[provider] = None
+self.enabled_providers[provider] = False
return True
return False
@@ -102,19 +142,231 @@ class SessionConfig:
"""
return self.rate_limits.get(provider, 60)
def get_task_retry_config(self) -> Dict[str, any]:
"""
Get task retry configuration for this session.
Returns:
Dictionary with retry settings
"""
return self.task_retry_settings.copy()
def get_cache_config(self) -> Dict[str, any]:
"""
Get cache configuration (global settings).
Returns:
Dictionary with cache settings
"""
return self.cache_settings.copy()
def is_circuit_breaker_enabled(self) -> bool:
"""Check if circuit breaker is enabled for provider reliability."""
return self.circuit_breaker.get('enabled', True)
def get_circuit_breaker_config(self) -> Dict[str, any]:
"""Get circuit breaker configuration."""
return self.circuit_breaker.copy()
def update_provider_settings(self, provider_updates: Dict[str, Dict[str, any]]) -> bool:
"""
Update provider-specific settings in bulk.
Args:
provider_updates: Dictionary of provider -> settings updates
Returns:
bool: True if updates were applied successfully
"""
try:
for provider_name, updates in provider_updates.items():
# Update rate limits
if 'rate_limit' in updates:
self.rate_limits[provider_name] = updates['rate_limit']
# Update enabled status
if 'enabled' in updates:
self.enabled_providers[provider_name] = updates['enabled']
# Update API key
if 'api_key' in updates:
self.set_api_key(provider_name, updates['api_key'])
return True
except Exception as e:
print(f"Error updating provider settings: {e}")
return False
def validate_configuration(self) -> Dict[str, any]:
"""
Validate the current configuration and return validation results.
Returns:
Dictionary with validation results and any issues found
"""
validation_result = {
'valid': True,
'warnings': [],
'errors': [],
'provider_status': {}
}
# Validate provider configurations
for provider_name, enabled in self.enabled_providers.items():
provider_status = {
'enabled': enabled,
'has_api_key': bool(self.api_keys.get(provider_name)),
'rate_limit': self.rate_limits.get(provider_name, 60)
}
# Check for potential issues
if enabled and provider_name in ['shodan'] and not provider_status['has_api_key']:
validation_result['warnings'].append(
f"Provider '{provider_name}' is enabled but missing API key"
)
validation_result['provider_status'][provider_name] = provider_status
# Validate task settings
if self.task_retry_settings['max_retries'] > 10:
validation_result['warnings'].append(
f"High retry count ({self.task_retry_settings['max_retries']}) may cause long delays"
)
# Validate concurrent settings
if self.max_concurrent_requests > 10:
validation_result['warnings'].append(
f"High concurrency ({self.max_concurrent_requests}) may overwhelm providers"
)
# Validate cache settings
if not os.path.exists(self.cache_settings['cache_base_dir']):
try:
os.makedirs(self.cache_settings['cache_base_dir'], exist_ok=True)
except Exception as e:
validation_result['errors'].append(f"Cannot create cache directory: {e}")
validation_result['valid'] = False
return validation_result
def load_from_env(self):
-"""Load configuration from environment variables (only if not already set)."""
-# Load API keys from environment
+"""Load configuration from environment variables with enhanced validation."""
if os.getenv('SHODAN_API_KEY') and not self.api_keys['shodan']:
self.set_api_key('shodan', os.getenv('SHODAN_API_KEY'))
+print("Loaded Shodan API key from environment")
# Override default settings from environment
self.default_recursion_depth = int(os.getenv('DEFAULT_RECURSION_DEPTH', '2'))
-self.default_timeout = 30
-self.max_concurrent_requests = 5
+self.default_timeout = int(os.getenv('DEFAULT_TIMEOUT', '30'))
+self.max_concurrent_requests = int(os.getenv('MAX_CONCURRENT_REQUESTS', '5'))
# Load task retry settings from environment
if os.getenv('TASK_MAX_RETRIES'):
self.task_retry_settings['max_retries'] = int(os.getenv('TASK_MAX_RETRIES'))
if os.getenv('TASK_BASE_BACKOFF'):
self.task_retry_settings['base_backoff_seconds'] = float(os.getenv('TASK_BASE_BACKOFF'))
# Load cache settings from environment
if os.getenv('CACHE_EXPIRY_HOURS'):
self.cache_settings['expiry_hours'] = int(os.getenv('CACHE_EXPIRY_HOURS'))
if os.getenv('CACHE_DISABLED'):
self.cache_settings['enabled'] = os.getenv('CACHE_DISABLED').lower() != 'true'
# Load circuit breaker settings
if os.getenv('CIRCUIT_BREAKER_DISABLED'):
self.circuit_breaker['enabled'] = os.getenv('CIRCUIT_BREAKER_DISABLED').lower() != 'true'
# Flask settings
self.flask_debug = os.getenv('FLASK_DEBUG', 'True').lower() == 'true'
print("Enhanced configuration loaded from environment")
def export_config_summary(self) -> Dict[str, any]:
"""
Export a summary of the current configuration for debugging/logging.
Returns:
Dictionary with configuration summary (API keys redacted)
"""
return {
'providers': {
provider: {
'enabled': self.enabled_providers.get(provider, False),
'has_api_key': bool(self.api_keys.get(provider)),
'rate_limit': self.rate_limits.get(provider, 60)
}
for provider in self.enabled_providers.keys()
},
'task_settings': {
'max_retries': self.task_retry_settings['max_retries'],
'max_concurrent_requests': self.max_concurrent_requests,
'large_entity_threshold': self.large_entity_threshold
},
'cache_settings': {
'enabled': self.cache_settings['enabled'],
'expiry_hours': self.cache_settings['expiry_hours'],
'base_directory': self.cache_settings['cache_base_dir']
},
'session_settings': {
'isolation_enabled': self.session_isolation['enforce_single_session_per_user'],
'consolidation_enabled': self.session_isolation['consolidate_session_data_on_replacement'],
'timeout_minutes': self.session_isolation['session_timeout_minutes']
},
'circuit_breaker': {
'enabled': self.circuit_breaker['enabled'],
'failure_threshold': self.circuit_breaker['failure_threshold'],
'recovery_timeout': self.circuit_breaker['recovery_timeout_seconds']
}
}
def create_session_config() -> SessionConfig:
-"""Create a new session configuration instance."""
+"""
+Create a new enhanced session configuration instance.
+Returns:
+Configured SessionConfig instance
+"""
session_config = SessionConfig()
session_config.load_from_env()
# Validate configuration and log any issues
validation = session_config.validate_configuration()
if validation['warnings']:
print("Configuration warnings:")
for warning in validation['warnings']:
print(f" WARNING: {warning}")
if validation['errors']:
print("Configuration errors:")
for error in validation['errors']:
print(f" ERROR: {error}")
if not validation['valid']:
raise ValueError("Configuration validation failed - see errors above")
print(f"Enhanced session configuration created successfully")
return session_config
def create_test_config() -> SessionConfig:
"""
Create a test configuration with safe defaults for testing.
Returns:
Test-safe SessionConfig instance
"""
test_config = SessionConfig()
# Override settings for testing
test_config.max_concurrent_requests = 2
test_config.task_retry_settings['max_retries'] = 1
test_config.task_retry_settings['base_backoff_seconds'] = 0.1
test_config.cache_settings['expiry_hours'] = 1
test_config.session_isolation['session_timeout_minutes'] = 10
print("Test configuration created")
return test_config
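
load_from_env() above pulls its tuning knobs from environment variables. A minimal sketch of driving the enhanced configuration from the environment and inspecting the redacted summary; the variable names are the ones checked in the diff, while the specific values are illustrative only:

import os
from core.session_config import create_session_config

os.environ["TASK_MAX_RETRIES"] = "2"
os.environ["CACHE_EXPIRY_HOURS"] = "6"
os.environ["MAX_CONCURRENT_REQUESTS"] = "3"

cfg = create_session_config()           # loads env vars, then validates and prints any warnings
summary = cfg.export_config_summary()   # API keys are redacted; only has_api_key booleans appear
print(summary["task_settings"], summary["cache_settings"])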

core/session_manager.py

@@ -5,37 +5,153 @@ import time
import uuid
import redis
import pickle
-from typing import Dict, Optional, Any, List
+import hashlib
+from typing import Dict, Optional, Any, List, Tuple
from core.scanner import Scanner
-# WARNING: Using pickle can be a security risk if the data source is not trusted.
-# In this case, we are only serializing/deserializing our own trusted Scanner objects,
-# which is generally safe. Do not unpickle data from untrusted sources.
+class UserIdentifier:
+"""Handles user identification for session management."""
@staticmethod
def generate_user_fingerprint(client_ip: str, user_agent: str) -> str:
"""
Generate a unique fingerprint for a user based on IP and User-Agent.
Args:
client_ip: Client IP address
user_agent: User-Agent header value
Returns:
Unique user fingerprint hash
"""
# Create deterministic user identifier
user_data = f"{client_ip}:{user_agent[:100]}" # Limit UA to 100 chars
fingerprint = hashlib.sha256(user_data.encode()).hexdigest()[:16] # 16 char fingerprint
return f"user_{fingerprint}"
@staticmethod
def extract_request_info(request) -> Tuple[str, str]:
"""
Extract client IP and User-Agent from Flask request.
Args:
request: Flask request object
Returns:
Tuple of (client_ip, user_agent)
"""
# Handle proxy headers for real IP
client_ip = request.headers.get('X-Forwarded-For', '').split(',')[0].strip()
if not client_ip:
client_ip = request.headers.get('X-Real-IP', '')
if not client_ip:
client_ip = request.remote_addr or 'unknown'
user_agent = request.headers.get('User-Agent', 'unknown')
return client_ip, user_agent
class SessionConsolidator:
"""Handles consolidation of session data when replacing sessions."""
@staticmethod
def consolidate_scanner_data(old_scanner: 'Scanner', new_scanner: 'Scanner') -> 'Scanner':
"""
Consolidate useful data from old scanner into new scanner.
Args:
old_scanner: Scanner from terminated session
new_scanner: New scanner instance
Returns:
Enhanced new scanner with consolidated data
"""
try:
# Consolidate graph data if old scanner has valuable data
if old_scanner and hasattr(old_scanner, 'graph') and old_scanner.graph:
old_stats = old_scanner.graph.get_statistics()
if old_stats['basic_metrics']['total_nodes'] > 0:
print(f"Consolidating graph data: {old_stats['basic_metrics']['total_nodes']} nodes, {old_stats['basic_metrics']['total_edges']} edges")
# Transfer nodes and edges to new scanner's graph
for node_id, node_data in old_scanner.graph.graph.nodes(data=True):
# Add node to new graph with all attributes
new_scanner.graph.graph.add_node(node_id, **node_data)
for source, target, edge_data in old_scanner.graph.graph.edges(data=True):
# Add edge to new graph with all attributes
new_scanner.graph.graph.add_edge(source, target, **edge_data)
# Update correlation index
if hasattr(old_scanner.graph, 'correlation_index'):
new_scanner.graph.correlation_index = old_scanner.graph.correlation_index.copy()
# Update timestamps
new_scanner.graph.creation_time = old_scanner.graph.creation_time
new_scanner.graph.last_modified = old_scanner.graph.last_modified
# Consolidate provider statistics
if old_scanner and hasattr(old_scanner, 'providers') and old_scanner.providers:
for old_provider in old_scanner.providers:
# Find matching provider in new scanner
matching_new_provider = None
for new_provider in new_scanner.providers:
if new_provider.get_name() == old_provider.get_name():
matching_new_provider = new_provider
break
if matching_new_provider:
# Transfer cumulative statistics
matching_new_provider.total_requests += old_provider.total_requests
matching_new_provider.successful_requests += old_provider.successful_requests
matching_new_provider.failed_requests += old_provider.failed_requests
matching_new_provider.total_relationships_found += old_provider.total_relationships_found
# Transfer cache statistics if available
if hasattr(old_provider, 'cache_hits'):
matching_new_provider.cache_hits += getattr(old_provider, 'cache_hits', 0)
matching_new_provider.cache_misses += getattr(old_provider, 'cache_misses', 0)
print(f"Consolidated {old_provider.get_name()} provider stats: {old_provider.total_requests} requests")
return new_scanner
except Exception as e:
print(f"Warning: Error during session consolidation: {e}")
return new_scanner
class SessionManager:
"""
-Manages multiple scanner instances for concurrent user sessions using Redis.
+Manages single scanner session per user using Redis with user identification.
+Enforces one active session per user for consistent state management.
"""
def __init__(self, session_timeout_minutes: int = 60):
"""
-Initialize session manager with a Redis backend.
+Initialize session manager with Redis backend and user tracking.
"""
self.redis_client = redis.StrictRedis(db=0, decode_responses=False)
self.session_timeout = session_timeout_minutes * 60 # Convert to seconds
-self.lock = threading.Lock() # Lock for local operations, Redis handles atomic ops
+self.lock = threading.Lock()
+# User identification helper
+self.user_identifier = UserIdentifier()
+self.consolidator = SessionConsolidator()
# Start cleanup thread
self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self.cleanup_thread.start()
-print(f"SessionManager initialized with Redis backend and {session_timeout_minutes}min timeout")
+print(f"SessionManager initialized with Redis backend, user tracking, and {session_timeout_minutes}min timeout")
def __getstate__(self):
"""Prepare SessionManager for pickling."""
state = self.__dict__.copy()
-# Exclude unpickleable attributes - Redis client and threading objects
+# Exclude unpickleable attributes
unpicklable_attrs = ['lock', 'cleanup_thread', 'redis_client']
for attr in unpicklable_attrs:
if attr in state:
@@ -53,67 +169,108 @@ class SessionManager:
self.cleanup_thread.start()
def _get_session_key(self, session_id: str) -> str:
-"""Generates the Redis key for a session."""
+"""Generate Redis key for a session."""
return f"dnsrecon:session:{session_id}"
+def _get_user_session_key(self, user_fingerprint: str) -> str:
+"""Generate Redis key for user -> session mapping."""
+return f"dnsrecon:user:{user_fingerprint}"
def _get_stop_signal_key(self, session_id: str) -> str:
-"""Generates the Redis key for a session's stop signal."""
+"""Generate Redis key for session stop signal."""
return f"dnsrecon:stop:{session_id}"
-def create_session(self) -> str:
+def create_or_replace_user_session(self, client_ip: str, user_agent: str) -> str:
"""
-Create a new user session and store it in Redis.
+Create new session for user, replacing any existing session.
+Consolidates data from previous session if it exists.
+Args:
+client_ip: Client IP address
+user_agent: User-Agent header
+Returns:
+New session ID
"""
-session_id = str(uuid.uuid4())
-print(f"=== CREATING SESSION {session_id} IN REDIS ===")
+user_fingerprint = self.user_identifier.generate_user_fingerprint(client_ip, user_agent)
+new_session_id = str(uuid.uuid4())
+print(f"=== CREATING/REPLACING SESSION FOR USER {user_fingerprint} ===")
try:
+# Check for existing user session
+existing_session_id = self._get_user_current_session(user_fingerprint)
+old_scanner = None
+if existing_session_id:
+print(f"Found existing session {existing_session_id} for user {user_fingerprint}")
+# Get old scanner data for consolidation
+old_scanner = self.get_session(existing_session_id)
+# Terminate old session
+self._terminate_session_internal(existing_session_id, cleanup_user_mapping=False)
+print(f"Terminated old session {existing_session_id}")
+# Create new session config and scanner
from core.session_config import create_session_config
session_config = create_session_config()
-scanner_instance = Scanner(session_config=session_config)
-# Set the session ID on the scanner for cross-process stop signal management
-scanner_instance.session_id = session_id
+new_scanner = Scanner(session_config=session_config)
+# Set session ID on scanner for cross-process operations
+new_scanner.session_id = new_session_id
+# Consolidate data from old session if available
+if old_scanner:
+new_scanner = self.consolidator.consolidate_scanner_data(old_scanner, new_scanner)
+print(f"Consolidated data from previous session")
+# Create session data
session_data = {
-'scanner': scanner_instance,
+'scanner': new_scanner,
'config': session_config,
'created_at': time.time(),
'last_activity': time.time(),
-'status': 'active'
+'status': 'active',
+'user_fingerprint': user_fingerprint,
+'client_ip': client_ip,
+'user_agent': user_agent[:200] # Truncate for storage
}
-# Serialize the entire session data dictionary using pickle
+# Store session in Redis
+session_key = self._get_session_key(new_session_id)
serialized_data = pickle.dumps(session_data)
-# Store in Redis
-session_key = self._get_session_key(session_id)
self.redis_client.setex(session_key, self.session_timeout, serialized_data)
-# Initialize stop signal as False
-stop_key = self._get_stop_signal_key(session_id)
+# Update user -> session mapping
+user_session_key = self._get_user_session_key(user_fingerprint)
+self.redis_client.setex(user_session_key, self.session_timeout, new_session_id.encode('utf-8'))
+# Initialize stop signal
+stop_key = self._get_stop_signal_key(new_session_id)
self.redis_client.setex(stop_key, self.session_timeout, b'0')
-print(f"Session {session_id} stored in Redis with stop signal initialized")
-return session_id
+print(f"Created new session {new_session_id} for user {user_fingerprint}")
+return new_session_id
except Exception as e:
-print(f"ERROR: Failed to create session {session_id}: {e}")
+print(f"ERROR: Failed to create session for user {user_fingerprint}: {e}")
raise
def _get_user_current_session(self, user_fingerprint: str) -> Optional[str]:
"""Get current session ID for a user."""
try:
user_session_key = self._get_user_session_key(user_fingerprint)
session_id_bytes = self.redis_client.get(user_session_key)
if session_id_bytes:
return session_id_bytes.decode('utf-8')
return None
except Exception as e:
print(f"Error getting user session: {e}")
return None
def set_stop_signal(self, session_id: str) -> bool:
-"""
-Set the stop signal for a session (cross-process safe).
-Args:
-session_id: Session identifier
-Returns:
-bool: True if signal was set successfully
-"""
+"""Set stop signal for session (cross-process safe)."""
try:
stop_key = self._get_stop_signal_key(session_id)
-# Set stop signal to '1' with the same TTL as the session
self.redis_client.setex(stop_key, self.session_timeout, b'1')
print(f"Stop signal set for session {session_id}")
return True
@@ -122,15 +279,7 @@
return False
def is_stop_requested(self, session_id: str) -> bool:
-"""
-Check if stop is requested for a session (cross-process safe).
-Args:
-session_id: Session identifier
-Returns:
-bool: True if stop is requested
-"""
+"""Check if stop is requested for session (cross-process safe)."""
try:
stop_key = self._get_stop_signal_key(session_id)
value = self.redis_client.get(stop_key)
@@ -140,15 +289,7 @@
return False
def clear_stop_signal(self, session_id: str) -> bool:
-"""
-Clear the stop signal for a session.
-Args:
-session_id: Session identifier
-Returns:
-bool: True if signal was cleared successfully
-"""
+"""Clear stop signal for session."""
try:
stop_key = self._get_stop_signal_key(session_id)
self.redis_client.setex(stop_key, self.session_timeout, b'0')
@@ -159,13 +300,13 @@
return False
def _get_session_data(self, session_id: str) -> Optional[Dict[str, Any]]:
-"""Retrieves and deserializes session data from Redis."""
+"""Retrieve and deserialize session data from Redis."""
try:
session_key = self._get_session_key(session_id)
serialized_data = self.redis_client.get(session_key)
if serialized_data:
session_data = pickle.loads(serialized_data)
-# Ensure the scanner has the correct session ID for stop signal checking
+# Ensure scanner has correct session ID
if 'scanner' in session_data and session_data['scanner']:
session_data['scanner'].session_id = session_id
return session_data
@@ -175,37 +316,32 @@
return None
def _save_session_data(self, session_id: str, session_data: Dict[str, Any]) -> bool:
-"""
-Serializes and saves session data back to Redis with updated TTL.
-Returns:
-bool: True if save was successful
-"""
+"""Serialize and save session data to Redis with updated TTL."""
try:
session_key = self._get_session_key(session_id)
serialized_data = pickle.dumps(session_data)
result = self.redis_client.setex(session_key, self.session_timeout, serialized_data)
+# Also refresh user mapping TTL if available
+if 'user_fingerprint' in session_data:
+user_session_key = self._get_user_session_key(session_data['user_fingerprint'])
+self.redis_client.setex(user_session_key, self.session_timeout, session_id.encode('utf-8'))
return result
except Exception as e:
print(f"ERROR: Failed to save session data for {session_id}: {e}")
return False
def update_session_scanner(self, session_id: str, scanner: 'Scanner') -> bool:
-"""
-Updates just the scanner object in a session with immediate persistence.
-Returns:
-bool: True if update was successful
-"""
+"""Update scanner object in session with immediate persistence."""
try:
session_data = self._get_session_data(session_id)
if session_data:
-# Ensure scanner has the session ID
+# Ensure scanner has session ID
scanner.session_id = session_id
session_data['scanner'] = scanner
session_data['last_activity'] = time.time()
-# Immediately save to Redis for GUI updates
success = self._save_session_data(session_id, session_data)
if success:
print(f"Scanner state updated for session {session_id} (status: {scanner.status})")
@@ -220,16 +356,7 @@
return False
def update_scanner_status(self, session_id: str, status: str) -> bool:
-"""
-Quickly update just the scanner status for immediate GUI feedback.
-Args:
-session_id: Session identifier
-status: New scanner status
-Returns:
-bool: True if update was successful
-"""
+"""Quickly update scanner status for immediate GUI feedback."""
try:
session_data = self._get_session_data(session_id)
if session_data and 'scanner' in session_data:
@@ -248,9 +375,7 @@
return False
def get_session(self, session_id: str) -> Optional[Scanner]:
-"""
-Get scanner instance for a session from Redis with session ID management.
-"""
+"""Get scanner instance for session with session ID management."""
if not session_id:
return None
@@ -265,21 +390,13 @@
scanner = session_data.get('scanner')
if scanner:
-# Ensure the scanner can check the Redis-based stop signal
+# Ensure scanner can check Redis-based stop signal
scanner.session_id = session_id
return scanner
def get_session_status_only(self, session_id: str) -> Optional[str]:
-"""
-Get just the scanner status without full session retrieval (for performance).
-Args:
-session_id: Session identifier
-Returns:
-Scanner status string or None if not found
-"""
+"""Get scanner status without full session retrieval (for performance)."""
try:
session_data = self._get_session_data(session_id)
if session_data and 'scanner' in session_data:
@@ -290,16 +407,18 @@
return None
def terminate_session(self, session_id: str) -> bool:
-"""
-Terminate a specific session in Redis with reliable stop signal and immediate status update.
-"""
+"""Terminate specific session with reliable stop signal and immediate status update."""
+return self._terminate_session_internal(session_id, cleanup_user_mapping=True)
+def _terminate_session_internal(self, session_id: str, cleanup_user_mapping: bool = True) -> bool:
+"""Internal session termination with configurable user mapping cleanup."""
print(f"=== TERMINATING SESSION {session_id} ===")
try:
-# First, set the stop signal
+# Set stop signal first
self.set_stop_signal(session_id)
-# Update scanner status to stopped immediately for GUI feedback
+# Update scanner status immediately for GUI feedback
self.update_scanner_status(session_id, 'stopped')
session_data = self._get_session_data(session_id)
@@ -310,16 +429,19 @@
scanner = session_data.get('scanner')
if scanner and scanner.status == 'running':
print(f"Stopping scan for session: {session_id}")
-# The scanner will check the Redis stop signal
scanner.stop_scan()
-# Update the scanner state immediately
self.update_session_scanner(session_id, scanner)
-# Wait a moment for graceful shutdown
+# Wait for graceful shutdown
time.sleep(0.5)
-# Delete session data and stop signal from Redis
+# Clean up user mapping if requested
+if cleanup_user_mapping and 'user_fingerprint' in session_data:
+user_session_key = self._get_user_session_key(session_data['user_fingerprint'])
+self.redis_client.delete(user_session_key)
+print(f"Cleaned up user mapping for {session_data['user_fingerprint']}")
+# Delete session data and stop signal
session_key = self._get_session_key(session_id)
stop_key = self._get_stop_signal_key(session_id)
self.redis_client.delete(session_key)
@@ -333,23 +455,31 @@
return False
def _cleanup_loop(self) -> None:
-"""
-Background thread to cleanup inactive sessions and orphaned stop signals.
-"""
+"""Background thread to cleanup inactive sessions and orphaned signals."""
while True:
try:
# Clean up orphaned stop signals
stop_keys = self.redis_client.keys("dnsrecon:stop:*")
for stop_key in stop_keys:
-# Extract session ID from stop key
session_id = stop_key.decode('utf-8').split(':')[-1]
session_key = self._get_session_key(session_id)
-# If session doesn't exist but stop signal does, clean it up
if not self.redis_client.exists(session_key):
self.redis_client.delete(stop_key)
print(f"Cleaned up orphaned stop signal for session {session_id}")
+# Clean up orphaned user mappings
+user_keys = self.redis_client.keys("dnsrecon:user:*")
+for user_key in user_keys:
+session_id_bytes = self.redis_client.get(user_key)
+if session_id_bytes:
+session_id = session_id_bytes.decode('utf-8')
+session_key = self._get_session_key(session_id)
+if not self.redis_client.exists(session_key):
+self.redis_client.delete(user_key)
+print(f"Cleaned up orphaned user mapping for session {session_id}")
except Exception as e:
print(f"Error in cleanup loop: {e}")
@@ -369,6 +499,8 @@
scanner = session_data.get('scanner')
sessions.append({
'session_id': session_id,
+'user_fingerprint': session_data.get('user_fingerprint', 'unknown'),
+'client_ip': session_data.get('client_ip', 'unknown'),
'created_at': session_data.get('created_at'),
'last_activity': session_data.get('last_activity'),
'scanner_status': scanner.status if scanner else 'unknown',
@@ -384,9 +516,11 @@
"""Get session manager statistics."""
try:
session_keys = self.redis_client.keys("dnsrecon:session:*")
+user_keys = self.redis_client.keys("dnsrecon:user:*")
stop_keys = self.redis_client.keys("dnsrecon:stop:*")
active_sessions = len(session_keys)
+unique_users = len(user_keys)
running_scans = 0
for session_key in session_keys:
@@ -397,16 +531,46 @@
return {
'total_active_sessions': active_sessions,
+'unique_users': unique_users,
'running_scans': running_scans,
-'total_stop_signals': len(stop_keys)
+'total_stop_signals': len(stop_keys),
+'average_sessions_per_user': round(active_sessions / unique_users, 2) if unique_users > 0 else 0
}
except Exception as e:
print(f"ERROR: Failed to get statistics: {e}")
return {
'total_active_sessions': 0,
+'unique_users': 0,
'running_scans': 0,
-'total_stop_signals': 0
+'total_stop_signals': 0,
+'average_sessions_per_user': 0
}
def get_session_info(self, session_id: str) -> Dict[str, Any]:
"""Get detailed information about a specific session."""
try:
session_data = self._get_session_data(session_id)
if not session_data:
return {'error': 'Session not found'}
scanner = session_data.get('scanner')
return {
'session_id': session_id,
'user_fingerprint': session_data.get('user_fingerprint', 'unknown'),
'client_ip': session_data.get('client_ip', 'unknown'),
'user_agent': session_data.get('user_agent', 'unknown'),
'created_at': session_data.get('created_at'),
'last_activity': session_data.get('last_activity'),
'status': session_data.get('status'),
'scanner_status': scanner.status if scanner else 'unknown',
'current_target': scanner.current_target if scanner else None,
'session_age_minutes': round((time.time() - session_data.get('created_at', time.time())) / 60, 1)
}
except Exception as e:
print(f"ERROR: Failed to get session info for {session_id}: {e}")
return {'error': f'Failed to get session info: {str(e)}'}
# Global session manager instance
session_manager = SessionManager(session_timeout_minutes=60)
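
The single-session-per-user model above rests on a deterministic user fingerprint plus three Redis keys. A short sketch of the scheme, mirroring UserIdentifier.generate_user_fingerprint and the _get_*_key helpers; the IP, User-Agent, and session ID values here are made up:

import hashlib
import uuid

client_ip, user_agent = "203.0.113.7", "Mozilla/5.0 (X11; Linux x86_64)"
user_data = f"{client_ip}:{user_agent[:100]}"          # UA truncated to 100 chars
fingerprint = "user_" + hashlib.sha256(user_data.encode()).hexdigest()[:16]

session_id = str(uuid.uuid4())
print(f"dnsrecon:user:{fingerprint}")    # user -> current session id mapping
print(f"dnsrecon:session:{session_id}")  # pickled session data (scanner, config, metadata)
print(f"dnsrecon:stop:{session_id}")     # cross-process stop signal, b'0' or b'1'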

core/task_manager.py (new file): 564 lines added

@@ -0,0 +1,564 @@
# dnsrecon/core/task_manager.py
import threading
import time
import uuid
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Set
from datetime import datetime, timezone, timedelta
from collections import deque
from utils.helpers import _is_valid_ip, _is_valid_domain
class TaskStatus(Enum):
"""Enumeration of task execution statuses."""
PENDING = "pending"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED_RETRYING = "failed_retrying"
FAILED_PERMANENT = "failed_permanent"
CANCELLED = "cancelled"
class TaskType(Enum):
"""Enumeration of task types for provider queries."""
DOMAIN_QUERY = "domain_query"
IP_QUERY = "ip_query"
GRAPH_UPDATE = "graph_update"
@dataclass
class TaskResult:
"""Result of a task execution."""
success: bool
data: Optional[Any] = None
error: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ReconTask:
"""Represents a single reconnaissance task with retry logic."""
task_id: str
task_type: TaskType
target: str
provider_name: str
depth: int
status: TaskStatus = TaskStatus.PENDING
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
# Retry configuration
max_retries: int = 3
current_retry: int = 0
base_backoff_seconds: float = 1.0
max_backoff_seconds: float = 60.0
# Execution tracking
last_attempt_at: Optional[datetime] = None
next_retry_at: Optional[datetime] = None
execution_history: List[Dict[str, Any]] = field(default_factory=list)
# Results
result: Optional[TaskResult] = None
def __post_init__(self):
"""Initialize additional fields after creation."""
if not self.task_id:
self.task_id = str(uuid.uuid4())[:8]
def calculate_next_retry_time(self) -> datetime:
"""Calculate next retry time with exponential backoff and jitter."""
if self.current_retry >= self.max_retries:
return None
# Exponential backoff with jitter
backoff_time = min(
self.max_backoff_seconds,
self.base_backoff_seconds * (2 ** self.current_retry)
)
# Add jitter (±25%)
jitter = backoff_time * 0.25 * (0.5 - hash(self.task_id) % 1000 / 1000.0)
final_backoff = max(self.base_backoff_seconds, backoff_time + jitter)
return datetime.now(timezone.utc) + timedelta(seconds=final_backoff)
def should_retry(self) -> bool:
"""Determine if task should be retried based on status and retry count."""
if self.status != TaskStatus.FAILED_RETRYING:
return False
if self.current_retry >= self.max_retries:
return False
if self.next_retry_at and datetime.now(timezone.utc) < self.next_retry_at:
return False
return True
def mark_failed(self, error: str, metadata: Dict[str, Any] = None):
"""Mark task as failed and prepare for retry or permanent failure."""
self.current_retry += 1
self.last_attempt_at = datetime.now(timezone.utc)
# Record execution history
execution_record = {
'attempt': self.current_retry,
'timestamp': self.last_attempt_at.isoformat(),
'error': error,
'metadata': metadata or {}
}
self.execution_history.append(execution_record)
if self.current_retry >= self.max_retries:
self.status = TaskStatus.FAILED_PERMANENT
self.result = TaskResult(success=False, error=f"Permanent failure after {self.max_retries} attempts: {error}")
else:
self.status = TaskStatus.FAILED_RETRYING
self.next_retry_at = self.calculate_next_retry_time()
def mark_succeeded(self, data: Any = None, metadata: Dict[str, Any] = None):
"""Mark task as successfully completed."""
self.status = TaskStatus.SUCCEEDED
self.last_attempt_at = datetime.now(timezone.utc)
self.result = TaskResult(success=True, data=data, metadata=metadata or {})
# Record successful execution
execution_record = {
'attempt': self.current_retry + 1,
'timestamp': self.last_attempt_at.isoformat(),
'success': True,
'metadata': metadata or {}
}
self.execution_history.append(execution_record)
def get_summary(self) -> Dict[str, Any]:
"""Get task summary for progress reporting."""
return {
'task_id': self.task_id,
'task_type': self.task_type.value,
'target': self.target,
'provider': self.provider_name,
'status': self.status.value,
'current_retry': self.current_retry,
'max_retries': self.max_retries,
'created_at': self.created_at.isoformat(),
'last_attempt_at': self.last_attempt_at.isoformat() if self.last_attempt_at else None,
'next_retry_at': self.next_retry_at.isoformat() if self.next_retry_at else None,
'total_attempts': len(self.execution_history),
'has_result': self.result is not None
}
class TaskQueue:
"""Thread-safe task queue with retry logic and priority handling."""
def __init__(self, max_concurrent_tasks: int = 5):
"""Initialize task queue."""
self.max_concurrent_tasks = max_concurrent_tasks
self.tasks: Dict[str, ReconTask] = {}
self.pending_queue = deque()
self.retry_queue = deque()
self.running_tasks: Set[str] = set()
self._lock = threading.Lock()
self._stop_event = threading.Event()
def __getstate__(self):
"""Prepare TaskQueue for pickling by excluding unpicklable objects."""
state = self.__dict__.copy()
# Exclude the unpickleable '_lock' and '_stop_event' attributes
if '_lock' in state:
del state['_lock']
if '_stop_event' in state:
del state['_stop_event']
return state
def __setstate__(self, state):
"""Restore TaskQueue after unpickling by reconstructing threading objects."""
self.__dict__.update(state)
# Re-initialize the '_lock' and '_stop_event' attributes
self._lock = threading.Lock()
self._stop_event = threading.Event()
def add_task(self, task: ReconTask) -> str:
"""Add task to queue."""
with self._lock:
self.tasks[task.task_id] = task
self.pending_queue.append(task.task_id)
print(f"Added task {task.task_id}: {task.provider_name} query for {task.target}")
return task.task_id
def get_next_ready_task(self) -> Optional[ReconTask]:
"""Get next task ready for execution."""
with self._lock:
# Check if we have room for more concurrent tasks
if len(self.running_tasks) >= self.max_concurrent_tasks:
return None
            # First priority: retry queue (tasks whose backoff has elapsed)
            for _ in range(len(self.retry_queue)):
                task_id = self.retry_queue.popleft()
                task = self.tasks.get(task_id)
                if task is None:
                    continue
                if task.should_retry():
                    task.status = TaskStatus.RUNNING
                    self.running_tasks.add(task_id)
                    print(f"Retrying task {task_id} (attempt {task.current_retry + 1})")
                    return task
                if task.status == TaskStatus.FAILED_RETRYING:
                    # Backoff not yet elapsed - keep the task queued instead of silently dropping it
                    self.retry_queue.append(task_id)
# Second priority: pending queue (new tasks)
while self.pending_queue:
task_id = self.pending_queue.popleft()
if task_id in self.tasks:
task = self.tasks[task_id]
if task.status == TaskStatus.PENDING:
task.status = TaskStatus.RUNNING
self.running_tasks.add(task_id)
print(f"Starting task {task_id}")
return task
return None
def complete_task(self, task_id: str, success: bool, data: Any = None,
error: str = None, metadata: Dict[str, Any] = None):
"""Mark task as completed (success or failure)."""
with self._lock:
if task_id not in self.tasks:
return
task = self.tasks[task_id]
self.running_tasks.discard(task_id)
if success:
task.mark_succeeded(data=data, metadata=metadata)
print(f"Task {task_id} succeeded")
else:
task.mark_failed(error or "Unknown error", metadata=metadata)
if task.status == TaskStatus.FAILED_RETRYING:
self.retry_queue.append(task_id)
print(f"Task {task_id} failed, scheduled for retry at {task.next_retry_at}")
else:
print(f"Task {task_id} permanently failed after {task.current_retry} attempts")
def cancel_all_tasks(self):
"""Cancel all pending and running tasks."""
with self._lock:
self._stop_event.set()
for task in self.tasks.values():
if task.status in [TaskStatus.PENDING, TaskStatus.RUNNING, TaskStatus.FAILED_RETRYING]:
task.status = TaskStatus.CANCELLED
self.pending_queue.clear()
self.retry_queue.clear()
self.running_tasks.clear()
print("All tasks cancelled")
def is_complete(self) -> bool:
"""Check if all tasks are complete (succeeded, permanently failed, or cancelled)."""
with self._lock:
for task in self.tasks.values():
if task.status in [TaskStatus.PENDING, TaskStatus.RUNNING, TaskStatus.FAILED_RETRYING]:
return False
return True
def get_statistics(self) -> Dict[str, Any]:
"""Get queue statistics."""
with self._lock:
stats = {
'total_tasks': len(self.tasks),
'pending': len(self.pending_queue),
'running': len(self.running_tasks),
'retry_queue': len(self.retry_queue),
'succeeded': 0,
'failed_permanent': 0,
'cancelled': 0,
'failed_retrying': 0
}
for task in self.tasks.values():
if task.status == TaskStatus.SUCCEEDED:
stats['succeeded'] += 1
elif task.status == TaskStatus.FAILED_PERMANENT:
stats['failed_permanent'] += 1
elif task.status == TaskStatus.CANCELLED:
stats['cancelled'] += 1
elif task.status == TaskStatus.FAILED_RETRYING:
stats['failed_retrying'] += 1
            stats['completion_rate'] = (stats['succeeded'] / stats['total_tasks'] * 100) if stats['total_tasks'] > 0 else 0
            # Compute completion inline; calling is_complete() here would deadlock on the non-reentrant lock
            stats['is_complete'] = not any(
                task.status in (TaskStatus.PENDING, TaskStatus.RUNNING, TaskStatus.FAILED_RETRYING)
                for task in self.tasks.values()
            )
            return stats
def get_task_summaries(self) -> List[Dict[str, Any]]:
"""Get summaries of all tasks for detailed progress reporting."""
with self._lock:
return [task.get_summary() for task in self.tasks.values()]
def get_failed_tasks(self) -> List[ReconTask]:
"""Get all permanently failed tasks for analysis."""
with self._lock:
return [task for task in self.tasks.values() if task.status == TaskStatus.FAILED_PERMANENT]
class TaskExecutor:
"""Executes reconnaissance tasks using providers."""
def __init__(self, providers: List, graph_manager, logger):
"""Initialize task executor."""
self.providers = {provider.get_name(): provider for provider in providers}
self.graph = graph_manager
self.logger = logger
def execute_task(self, task: ReconTask) -> TaskResult:
"""
Execute a single reconnaissance task.
Args:
task: Task to execute
Returns:
TaskResult with success/failure information
"""
try:
print(f"Executing task {task.task_id}: {task.provider_name} query for {task.target}")
provider = self.providers.get(task.provider_name)
if not provider:
return TaskResult(
success=False,
error=f"Provider {task.provider_name} not available"
)
if not provider.is_available():
return TaskResult(
success=False,
error=f"Provider {task.provider_name} is not available (missing API key or configuration)"
)
# Execute provider query based on task type
if task.task_type == TaskType.DOMAIN_QUERY:
if not _is_valid_domain(task.target):
return TaskResult(success=False, error=f"Invalid domain: {task.target}")
relationships = provider.query_domain(task.target)
elif task.task_type == TaskType.IP_QUERY:
if not _is_valid_ip(task.target):
return TaskResult(success=False, error=f"Invalid IP: {task.target}")
relationships = provider.query_ip(task.target)
else:
return TaskResult(success=False, error=f"Unsupported task type: {task.task_type}")
# Process results and update graph
new_targets = set()
relationships_added = 0
for source, target, rel_type, confidence, raw_data in relationships:
# Add nodes to graph
from core.graph_manager import NodeType
if _is_valid_ip(target):
self.graph.add_node(target, NodeType.IP)
new_targets.add(target)
elif target.startswith('AS') and target[2:].isdigit():
self.graph.add_node(target, NodeType.ASN)
elif _is_valid_domain(target):
self.graph.add_node(target, NodeType.DOMAIN)
new_targets.add(target)
# Add edge to graph
if self.graph.add_edge(source, target, rel_type, confidence, task.provider_name, raw_data):
relationships_added += 1
# Log forensic information
self.logger.logger.info(
f"Task {task.task_id} completed: {len(relationships)} relationships found, "
f"{relationships_added} added to graph, {len(new_targets)} new targets"
)
return TaskResult(
success=True,
data={
'relationships': relationships,
'new_targets': list(new_targets),
'relationships_added': relationships_added
},
metadata={
'provider': task.provider_name,
'target': task.target,
'depth': task.depth,
'execution_time': datetime.now(timezone.utc).isoformat()
}
)
except Exception as e:
error_msg = f"Task execution failed: {str(e)}"
print(f"ERROR: {error_msg} for task {task.task_id}")
self.logger.logger.error(error_msg)
return TaskResult(
success=False,
error=error_msg,
metadata={
'provider': task.provider_name,
'target': task.target,
'exception_type': type(e).__name__
}
)
class TaskManager:
"""High-level task management for reconnaissance scans."""
def __init__(self, providers: List, graph_manager, logger, max_concurrent_tasks: int = 5):
"""Initialize task manager."""
self.task_queue = TaskQueue(max_concurrent_tasks)
self.task_executor = TaskExecutor(providers, graph_manager, logger)
self.logger = logger
# Execution control
self._stop_event = threading.Event()
self._execution_threads: List[threading.Thread] = []
self._is_running = False
def create_provider_tasks(self, target: str, depth: int, providers: List) -> List[str]:
"""
Create tasks for querying all eligible providers for a target.
Args:
target: Domain or IP to query
depth: Current recursion depth
providers: List of available providers
Returns:
List of created task IDs
"""
task_ids = []
is_ip = _is_valid_ip(target)
target_key = 'ips' if is_ip else 'domains'
task_type = TaskType.IP_QUERY if is_ip else TaskType.DOMAIN_QUERY
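        # Example: for the target "example.com" this creates one DOMAIN_QUERY task per provider
        # that both supports domains and is currently available.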
for provider in providers:
if provider.get_eligibility().get(target_key) and provider.is_available():
task = ReconTask(
task_id=str(uuid.uuid4())[:8],
task_type=task_type,
target=target,
provider_name=provider.get_name(),
depth=depth,
max_retries=3 # Configure retries per task type/provider
)
task_id = self.task_queue.add_task(task)
task_ids.append(task_id)
return task_ids
def start_execution(self, max_workers: int = 3):
"""Start task execution with specified number of worker threads."""
if self._is_running:
print("Task execution already running")
return
self._is_running = True
self._stop_event.clear()
print(f"Starting task execution with {max_workers} workers")
for i in range(max_workers):
worker_thread = threading.Thread(
target=self._worker_loop,
name=f"TaskWorker-{i+1}",
daemon=True
)
worker_thread.start()
self._execution_threads.append(worker_thread)
def stop_execution(self):
"""Stop task execution and cancel all tasks."""
print("Stopping task execution")
self._stop_event.set()
self.task_queue.cancel_all_tasks()
self._is_running = False
# Wait for worker threads to finish
for thread in self._execution_threads:
thread.join(timeout=5.0)
self._execution_threads.clear()
print("Task execution stopped")
def _worker_loop(self):
"""Worker thread loop for executing tasks."""
thread_name = threading.current_thread().name
print(f"{thread_name} started")
while not self._stop_event.is_set():
try:
# Get next task to execute
task = self.task_queue.get_next_ready_task()
if task is None:
# No tasks ready, check if we should exit
if self.task_queue.is_complete() or self._stop_event.is_set():
break
time.sleep(0.1) # Brief sleep before checking again
continue
# Execute the task
result = self.task_executor.execute_task(task)
# Complete the task in queue
self.task_queue.complete_task(
task.task_id,
success=result.success,
data=result.data,
error=result.error,
metadata=result.metadata
)
except Exception as e:
print(f"ERROR: Worker {thread_name} encountered error: {e}")
# Continue running even if individual task fails
continue
print(f"{thread_name} finished")
def wait_for_completion(self, timeout_seconds: int = 300) -> bool:
"""
Wait for all tasks to complete.
Args:
timeout_seconds: Maximum time to wait
Returns:
True if all tasks completed, False if timeout
"""
start_time = time.time()
while time.time() - start_time < timeout_seconds:
if self.task_queue.is_complete():
return True
if self._stop_event.is_set():
return False
time.sleep(1.0) # Check every second
print(f"Timeout waiting for task completion after {timeout_seconds} seconds")
return False
def get_progress_report(self) -> Dict[str, Any]:
"""Get detailed progress report for UI updates."""
stats = self.task_queue.get_statistics()
failed_tasks = self.task_queue.get_failed_tasks()
return {
'statistics': stats,
'failed_tasks': [task.get_summary() for task in failed_tasks],
'is_running': self._is_running,
'worker_count': len(self._execution_threads),
'detailed_tasks': self.task_queue.get_task_summaries() if stats['total_tasks'] < 50 else [] # Limit detail for performance
}
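
A minimal usage sketch of how these pieces fit together (illustrative only, not part of the diff; it assumes `providers`, `graph_manager`, and `logger` objects have already been constructed elsewhere in the scanner):

manager = TaskManager(providers, graph_manager, logger, max_concurrent_tasks=5)
manager.create_provider_tasks("example.com", depth=0, providers=providers)
manager.start_execution(max_workers=3)
if manager.wait_for_completion(timeout_seconds=300):
    print(manager.get_progress_report()['statistics'])
manager.stop_execution()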

View File

@@ -5,14 +5,16 @@ import requests
import threading
import os
import json
import hashlib
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timezone

from core.logger import get_forensic_logger


class RateLimiter:
    """Thread-safe rate limiter for API calls."""

    def __init__(self, requests_per_minute: int):
        """
@@ -24,36 +26,152 @@ class RateLimiter:
        self.requests_per_minute = requests_per_minute
        self.min_interval = 60.0 / requests_per_minute
        self.last_request_time = 0
        self._lock = threading.Lock()

    def __getstate__(self):
        """Prepare RateLimiter for pickling by excluding the lock."""
        state = self.__dict__.copy()
        # Exclude unpickleable lock
        if '_lock' in state:
            del state['_lock']
        return state

    def __setstate__(self, state):
        """Restore RateLimiter state and recreate the lock."""
        self.__dict__.update(state)
        self._lock = threading.Lock()

    def wait_if_needed(self) -> None:
        """Wait if necessary to respect rate limits."""
        with self._lock:
            current_time = time.time()
            time_since_last = current_time - self.last_request_time
            if time_since_last < self.min_interval:
                sleep_time = self.min_interval - time_since_last
                time.sleep(sleep_time)
            self.last_request_time = time.time()
class ProviderCache:
"""Thread-safe global cache for provider queries."""
def __init__(self, provider_name: str, cache_expiry_hours: int = 12):
"""
Initialize provider-specific cache.
Args:
provider_name: Name of the provider for cache directory
cache_expiry_hours: Cache expiry time in hours
"""
self.provider_name = provider_name
self.cache_expiry = cache_expiry_hours * 3600 # Convert to seconds
self.cache_dir = os.path.join('.cache', provider_name)
self._lock = threading.Lock()
# Ensure cache directory exists with thread-safe creation
os.makedirs(self.cache_dir, exist_ok=True)
def _generate_cache_key(self, method: str, url: str, params: Optional[Dict[str, Any]]) -> str:
"""Generate unique cache key for request."""
cache_data = f"{method}:{url}:{json.dumps(params or {}, sort_keys=True)}"
return hashlib.md5(cache_data.encode()).hexdigest() + ".json"
def get_cached_response(self, method: str, url: str, params: Optional[Dict[str, Any]]) -> Optional[requests.Response]:
"""
Retrieve cached response if available and not expired.
Returns:
Cached Response object or None if cache miss/expired
"""
cache_key = self._generate_cache_key(method, url, params)
cache_path = os.path.join(self.cache_dir, cache_key)
with self._lock:
if not os.path.exists(cache_path):
return None
# Check if cache is expired
cache_age = time.time() - os.path.getmtime(cache_path)
if cache_age >= self.cache_expiry:
try:
os.remove(cache_path)
except OSError:
pass # File might have been removed by another thread
return None
try:
with open(cache_path, 'r', encoding='utf-8') as f:
cached_data = json.load(f)
# Reconstruct Response object
response = requests.Response()
response.status_code = cached_data['status_code']
response._content = cached_data['content'].encode('utf-8')
response.headers.update(cached_data['headers'])
return response
except (json.JSONDecodeError, KeyError, IOError) as e:
# Cache file corrupted, remove it
try:
os.remove(cache_path)
except OSError:
pass
return None
def cache_response(self, method: str, url: str, params: Optional[Dict[str, Any]],
response: requests.Response) -> bool:
"""
Cache successful response to disk.
Returns:
True if cached successfully, False otherwise
"""
if response.status_code != 200:
return False
cache_key = self._generate_cache_key(method, url, params)
cache_path = os.path.join(self.cache_dir, cache_key)
with self._lock:
try:
cache_data = {
'status_code': response.status_code,
'content': response.text,
'headers': dict(response.headers),
'cached_at': datetime.now(timezone.utc).isoformat()
}
# Write to temporary file first, then rename for atomic operation
temp_path = cache_path + '.tmp'
with open(temp_path, 'w', encoding='utf-8') as f:
json.dump(cache_data, f)
# Atomic rename to prevent partial cache files
os.rename(temp_path, cache_path)
return True
except (IOError, OSError) as e:
# Clean up temp file if it exists
try:
if os.path.exists(temp_path):
os.remove(temp_path)
except OSError:
pass
return False
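
# Example on-disk layout (illustrative): .cache/<provider_name>/<md5 of "method:url:params">.json,
# each file holding the cached status_code, headers, body text, and a cached_at timestamp.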
class BaseProvider(ABC):
    """
    Abstract base class for all DNSRecon data providers.
    Now supports global provider-specific caching and session-specific configuration.
    """

    def __init__(self, name: str, rate_limit: int = 60, timeout: int = 30, session_config=None):
        """
        Initialize base provider with global caching and session-specific configuration.

        Args:
            name: Provider name for logging
@@ -80,28 +198,25 @@ class BaseProvider(ABC):
        self.logger = get_forensic_logger()
        self._stop_event = None

        # GLOBAL provider-specific caching (not session-based)
        self.cache = ProviderCache(name, cache_expiry_hours=12)

        # Statistics (per provider instance)
        self.total_requests = 0
        self.successful_requests = 0
        self.failed_requests = 0
        self.total_relationships_found = 0
        self.cache_hits = 0
        self.cache_misses = 0

        print(f"Initialized {name} provider with global cache and session config (rate: {actual_rate_limit}/min)")

    def __getstate__(self):
        """Prepare BaseProvider for pickling by excluding unpicklable objects."""
        state = self.__dict__.copy()
        # Exclude the unpickleable '_local' attribute and stop event
        state['_local'] = None
        state['_stop_event'] = None
        return state

    def __setstate__(self, state):
@@ -116,7 +231,7 @@ class BaseProvider(ABC):
        if not hasattr(self._local, 'session'):
            self._local.session = requests.Session()
            self._local.session.headers.update({
                'User-Agent': 'DNSRecon/2.0 (Passive Reconnaissance Tool)'
            })
        return self._local.session
@@ -177,37 +292,28 @@ class BaseProvider(ABC):
                     target_indicator: str = "",
                     max_retries: int = 3) -> Optional[requests.Response]:
        """
        Make a rate-limited HTTP request with global caching and aggressive stop signal handling.
        """
        # Check for cancellation before starting
        if self._is_stop_requested():
            print(f"Request cancelled before start: {url}")
            return None

        # Check global cache first
        cached_response = self.cache.get_cached_response(method, url, params)
        if cached_response is not None:
            print(f"Cache hit for {self.name}: {url}")
            self.cache_hits += 1
            return cached_response

        self.cache_misses += 1

        # Determine effective max_retries based on stop signal
        effective_max_retries = 0 if self._is_stop_requested() else max_retries
        last_exception = None

        for attempt in range(effective_max_retries + 1):
            # Check for cancellation before each attempt
            if self._is_stop_requested():
                print(f"Request cancelled during attempt {attempt + 1}: {url}")
                return None
@@ -217,7 +323,7 @@ class BaseProvider(ABC):
                print(f"Request cancelled during rate limiting: {url}")
                return None

            # Final check before making HTTP request
            if self._is_stop_requested():
                print(f"Request cancelled before HTTP call: {url}")
                return None
@@ -236,11 +342,8 @@ class BaseProvider(ABC):
                print(f"Making {method} request to: {url} (attempt {attempt + 1})")

                # Use a shorter timeout if termination is requested
                request_timeout = 2 if self._is_stop_requested() else self.timeout

                # Make request
                if method.upper() == "GET":
@@ -276,13 +379,9 @@ class BaseProvider(ABC):
                    error=None,
                    target_indicator=target_indicator
                )

                # Cache the successful response globally
                self.cache.cache_response(method, url, params, response)

                return response

            except requests.exceptions.RequestException as e:
@@ -291,23 +390,21 @@ class BaseProvider(ABC):
                print(f"Request failed (attempt {attempt + 1}): {error}")
                last_exception = e

                # Immediately abort retries if stop requested
                if self._is_stop_requested():
                    print(f"Stop requested - aborting retries for: {url}")
                    break

                # Check if we should retry
                if attempt < effective_max_retries and self._should_retry(e):
                    # Exponential backoff for 429 errors, capped at 60 seconds
                    if isinstance(e, requests.exceptions.HTTPError) and e.response and e.response.status_code == 429:
                        backoff_time = min(60, 10 * (2 ** attempt))
                        print(f"Rate limit hit. Retrying in {backoff_time} seconds...")
                    else:
                        backoff_time = min(2.0, (2 ** attempt) * 0.5)
                        print(f"Retrying in {backoff_time} seconds...")

                    if not self._sleep_with_cancellation_check(backoff_time):
                        print(f"Stop requested during backoff - aborting: {url}")
                        return None
@@ -348,7 +445,6 @@ class BaseProvider(ABC):
                return True
        return False

    def _wait_with_cancellation_check(self) -> bool:
        """
        Wait for rate limiting while aggressively checking for cancellation.
@@ -447,7 +543,7 @@ class BaseProvider(ABC):
    def get_statistics(self) -> Dict[str, Any]:
        """
        Get provider statistics including cache performance.

        Returns:
            Dictionary containing provider performance metrics
@@ -459,5 +555,8 @@ class BaseProvider(ABC):
            'failed_requests': self.failed_requests,
            'success_rate': (self.successful_requests / self.total_requests * 100) if self.total_requests > 0 else 0,
            'relationships_found': self.total_relationships_found,
            'rate_limit': self.rate_limiter.requests_per_minute,
            'cache_hits': self.cache_hits,
            'cache_misses': self.cache_misses,
            'cache_hit_rate': (self.cache_hits / (self.cache_hits + self.cache_misses) * 100) if (self.cache_hits + self.cache_misses) > 0 else 0
        }