dnsrecon/core/session_manager.py
overcuriousity d3e1fcf35f it
2025-09-11 14:01:15 +02:00

281 lines
10 KiB
Python

"""
Session manager for DNSRecon multi-user support.
Manages individual scanner instances per user session with automatic cleanup.
"""
import threading
import time
import uuid
from typing import Dict, Optional, Any
from datetime import datetime, timezone
from core.scanner import Scanner
class SessionManager:
"""
Manages multiple scanner instances for concurrent user sessions.
Provides session isolation and automatic cleanup of inactive sessions.
"""
def __init__(self, session_timeout_minutes: int = 60):
"""
Initialize session manager.
Args:
session_timeout_minutes: Minutes of inactivity before session cleanup
"""
self.sessions: Dict[str, Dict[str, Any]] = {}
self.session_timeout = session_timeout_minutes * 60 # Convert to seconds
self.lock = threading.Lock()
# Start cleanup thread
self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self.cleanup_thread.start()
print(f"SessionManager initialized with {session_timeout_minutes}min timeout")
def create_session(self) -> str:
"""
Create a new user session with dedicated scanner instance and configuration.
Enhanced with better debugging and race condition protection.
Returns:
Unique session ID
"""
session_id = str(uuid.uuid4())
print(f"=== CREATING SESSION {session_id} ===")
try:
# Create session-specific configuration
from core.session_config import create_session_config
session_config = create_session_config()
print(f"Created session config for {session_id}")
# Create scanner with session config
from core.scanner import Scanner
scanner_instance = Scanner(session_config=session_config)
print(f"Created scanner instance {id(scanner_instance)} for session {session_id}")
print(f"Initial scanner status: {scanner_instance.status}")
with self.lock:
self.sessions[session_id] = {
'scanner': scanner_instance,
'config': session_config,
'created_at': time.time(),
'last_activity': time.time(),
'user_agent': '',
'status': 'active'
}
print(f"Session {session_id} stored in session manager")
print(f"Total active sessions: {len([s for s in self.sessions.values() if s['status'] == 'active'])}")
print(f"=== SESSION {session_id} CREATED SUCCESSFULLY ===")
return session_id
except Exception as e:
print(f"ERROR: Failed to create session {session_id}: {e}")
raise
def get_session(self, session_id: str) -> Optional[object]:
"""
Get scanner instance for a session with enhanced debugging.
Args:
session_id: Session identifier
Returns:
Scanner instance or None if session doesn't exist
"""
if not session_id:
print("get_session called with empty session_id")
return None
with self.lock:
if session_id not in self.sessions:
print(f"Session {session_id} not found in session manager")
print(f"Available sessions: {list(self.sessions.keys())}")
return None
session_data = self.sessions[session_id]
# Check if session is still active
if session_data['status'] != 'active':
print(f"Session {session_id} is not active (status: {session_data['status']})")
return None
# Update last activity
session_data['last_activity'] = time.time()
scanner = session_data['scanner']
print(f"Retrieved scanner {id(scanner)} for session {session_id}")
print(f"Scanner status: {scanner.status}")
return scanner
def get_or_create_session(self, session_id: Optional[str] = None) -> tuple[str, Scanner]:
"""
Get existing session or create new one.
Args:
session_id: Optional existing session ID
Returns:
Tuple of (session_id, scanner_instance)
"""
if session_id and self.get_session(session_id):
return session_id, self.get_session(session_id)
else:
new_session_id = self.create_session()
return new_session_id, self.get_session(new_session_id)
def terminate_session(self, session_id: str) -> bool:
"""
Terminate a specific session and cleanup resources.
Args:
session_id: Session to terminate
Returns:
True if session was terminated successfully
"""
with self.lock:
if session_id not in self.sessions:
return False
session_data = self.sessions[session_id]
scanner = session_data['scanner']
# Stop any running scan
try:
if scanner.status == 'running':
scanner.stop_scan()
print(f"Stopped scan for session: {session_id}")
except Exception as e:
print(f"Error stopping scan for session {session_id}: {e}")
# Mark as terminated
session_data['status'] = 'terminated'
session_data['terminated_at'] = time.time()
# Remove from active sessions after a brief delay to allow cleanup
threading.Timer(5.0, lambda: self._remove_session(session_id)).start()
print(f"Terminated session: {session_id}")
return True
def _remove_session(self, session_id: str) -> None:
"""Remove session from memory."""
with self.lock:
if session_id in self.sessions:
del self.sessions[session_id]
print(f"Removed session from memory: {session_id}")
def get_session_info(self, session_id: str) -> Optional[Dict[str, Any]]:
"""
Get session information without updating activity.
Args:
session_id: Session identifier
Returns:
Session information dictionary or None
"""
with self.lock:
if session_id not in self.sessions:
return None
session_data = self.sessions[session_id]
scanner = session_data['scanner']
return {
'session_id': session_id,
'created_at': datetime.fromtimestamp(session_data['created_at'], timezone.utc).isoformat(),
'last_activity': datetime.fromtimestamp(session_data['last_activity'], timezone.utc).isoformat(),
'status': session_data['status'],
'scan_status': scanner.status,
'current_target': scanner.current_target,
'uptime_seconds': time.time() - session_data['created_at']
}
def list_active_sessions(self) -> Dict[str, Dict[str, Any]]:
"""
List all active sessions with enhanced debugging info.
Returns:
Dictionary of session information
"""
active_sessions = {}
with self.lock:
for session_id, session_data in self.sessions.items():
if session_data['status'] == 'active':
scanner = session_data['scanner']
active_sessions[session_id] = {
'session_id': session_id,
'created_at': datetime.fromtimestamp(session_data['created_at'], timezone.utc).isoformat(),
'last_activity': datetime.fromtimestamp(session_data['last_activity'], timezone.utc).isoformat(),
'status': session_data['status'],
'scan_status': scanner.status,
'current_target': scanner.current_target,
'uptime_seconds': time.time() - session_data['created_at'],
'scanner_object_id': id(scanner)
}
return active_sessions
def _cleanup_loop(self) -> None:
"""Background thread to cleanup inactive sessions."""
while True:
try:
current_time = time.time()
sessions_to_cleanup = []
with self.lock:
for session_id, session_data in self.sessions.items():
if session_data['status'] != 'active':
continue
inactive_time = current_time - session_data['last_activity']
if inactive_time > self.session_timeout:
sessions_to_cleanup.append(session_id)
# Cleanup outside of lock to avoid deadlock
for session_id in sessions_to_cleanup:
print(f"Cleaning up inactive session: {session_id}")
self.terminate_session(session_id)
# Sleep for 5 minutes between cleanup cycles
time.sleep(300)
except Exception as e:
print(f"Error in session cleanup loop: {e}")
time.sleep(60) # Sleep for 1 minute on error
def get_statistics(self) -> Dict[str, Any]:
"""
Get session manager statistics.
Returns:
Statistics dictionary
"""
with self.lock:
active_count = sum(1 for s in self.sessions.values() if s['status'] == 'active')
running_scans = sum(1 for s in self.sessions.values()
if s['status'] == 'active' and s['scanner'].status == 'running')
return {
'total_sessions': len(self.sessions),
'active_sessions': active_count,
'running_scans': running_scans,
'session_timeout_minutes': self.session_timeout / 60
}
# Global session manager instance
session_manager = SessionManager(session_timeout_minutes=60)