# dnsrecon/core/logger.py
import logging
import threading
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional


@dataclass
class APIRequest:
    """Structured representation of an API request for forensic logging."""
    timestamp: str
    provider: str
    url: str
    method: str
    status_code: Optional[int]
    response_size: Optional[int]
    duration_ms: Optional[float]
    error: Optional[str]
    target_indicator: str
    discovery_context: Optional[str]


@dataclass
class RelationshipDiscovery:
    """Structured representation of a discovered relationship."""
    timestamp: str
    source_node: str
    target_node: str
    relationship_type: str
    confidence_score: float
    provider: str
    raw_data: Dict[str, Any]
    discovery_method: str


class ForensicLogger:
    """
    Thread-safe forensic logging system for DNSRecon.
    Maintains detailed audit trail of all reconnaissance activities.

    All mutations of the audit-trail state (request list, relationship list,
    session metadata) are serialized through ``self.lock`` so that multiple
    provider threads can log concurrently.
    """

    def __init__(self, session_id: str = ""):
        """
        Initialize forensic logger.

        Args:
            session_id: Unique identifier for this reconnaissance session
        """
        self.session_id = session_id or self._generate_session_id()
        self.lock = threading.Lock()

        # Initialize audit trail storage
        self.api_requests: List[APIRequest] = []
        self.relationships: List[RelationshipDiscovery] = []
        self.session_metadata = {
            'session_id': self.session_id,
            'start_time': datetime.now(timezone.utc).isoformat(),
            'end_time': None,
            'total_requests': 0,
            'total_relationships': 0,
            'providers_used': set(),
            'target_domains': set()
        }

        self._setup_logger()

    def _setup_logger(self) -> None:
        """Configure the stdlib logger (shared by __init__ and __setstate__)."""
        self.logger = logging.getLogger(f'dnsrecon.{self.session_id}')
        self.logger.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        # Add console handler only once per named logger to avoid duplicate output.
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

    def __getstate__(self):
        """Prepare ForensicLogger for pickling by excluding unpicklable objects."""
        state = self.__dict__.copy()
        # Logger and lock objects cannot be pickled; both are rebuilt on restore.
        state.pop('logger', None)
        state.pop('lock', None)
        return state

    def __setstate__(self, state):
        """Restore ForensicLogger after unpickling by reconstructing logger/lock."""
        self.__dict__.update(state)
        self._setup_logger()
        self.lock = threading.Lock()

    def _generate_session_id(self) -> str:
        """Generate unique session identifier."""
        return f"dnsrecon_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"

    def log_api_request(self, provider: str, url: str, method: str = "GET",
                        status_code: Optional[int] = None,
                        response_size: Optional[int] = None,
                        duration_ms: Optional[float] = None,
                        error: Optional[str] = None,
                        target_indicator: str = "",
                        discovery_context: Optional[str] = None) -> None:
        """
        Log an API request for forensic audit trail.

        Args:
            provider: Name of the data provider
            url: Request URL
            method: HTTP method
            status_code: HTTP response status code
            response_size: Size of response in bytes
            duration_ms: Request duration in milliseconds
            error: Error message if request failed
            target_indicator: The indicator being investigated
            discovery_context: Context of how this indicator was discovered
        """
        api_request = APIRequest(
            timestamp=datetime.now(timezone.utc).isoformat(),
            provider=provider,
            url=url,
            method=method,
            status_code=status_code,
            response_size=response_size,
            duration_ms=duration_ms,
            error=error,
            target_indicator=target_indicator,
            discovery_context=discovery_context
        )

        # Mutate shared audit state under the lock; the class is advertised
        # as thread-safe and providers may log from worker threads.
        with self.lock:
            self.api_requests.append(api_request)
            self.session_metadata['total_requests'] += 1
            self.session_metadata['providers_used'].add(provider)
            if target_indicator:
                self.session_metadata['target_domains'].add(target_indicator)

        # Emit to the standard logger outside the lock (handler I/O can block).
        if error:
            self.logger.error(f"API Request Failed - {provider}: {url} - {error}")
        else:
            self.logger.info(f"API Request - {provider}: {url} - Status: {status_code}")

    def log_relationship_discovery(self, source_node: str, target_node: str,
                                   relationship_type: str, confidence_score: float,
                                   provider: str, raw_data: Dict[str, Any],
                                   discovery_method: str) -> None:
        """
        Log discovery of a new relationship between indicators.

        Args:
            source_node: Source node identifier
            target_node: Target node identifier
            relationship_type: Type of relationship (e.g., 'SAN', 'A_Record')
            confidence_score: Confidence score (0.0 to 1.0)
            provider: Provider that discovered this relationship
            raw_data: Raw data from provider response
            discovery_method: Method used to discover relationship
        """
        relationship = RelationshipDiscovery(
            timestamp=datetime.now(timezone.utc).isoformat(),
            source_node=source_node,
            target_node=target_node,
            relationship_type=relationship_type,
            confidence_score=confidence_score,
            provider=provider,
            raw_data=raw_data,
            discovery_method=discovery_method
        )

        with self.lock:
            self.relationships.append(relationship)
            self.session_metadata['total_relationships'] += 1

        self.logger.info(
            f"Relationship Discovered - {source_node} -> {target_node} "
            f"({relationship_type}) - Confidence: {confidence_score:.2f} - Provider: {provider}"
        )

    def log_scan_start(self, target_domain: str, recursion_depth: int,
                       enabled_providers: List[str]) -> None:
        """Log the start of a reconnaissance scan."""
        self.logger.info(f"Scan Started - Target: {target_domain}, Depth: {recursion_depth}")
        self.logger.info(f"Enabled Providers: {', '.join(enabled_providers)}")
        with self.lock:
            self.session_metadata['target_domains'].add(target_domain)

    def log_scan_complete(self) -> None:
        """Log the completion of a reconnaissance scan."""
        with self.lock:
            self.session_metadata['end_time'] = datetime.now(timezone.utc).isoformat()
            # sorted() accepts both set and list, so a repeated call is harmless
            # and the resulting order is deterministic.
            self.session_metadata['providers_used'] = sorted(self.session_metadata['providers_used'])
            self.session_metadata['target_domains'] = sorted(self.session_metadata['target_domains'])

        self.logger.info(f"Scan Complete - Session: {self.session_id}")

    def export_audit_trail(self) -> Dict[str, Any]:
        """
        Export complete audit trail for forensic analysis.

        Returns:
            Dictionary containing complete session audit trail. Set-valued
            metadata fields are converted to sorted lists so the result is
            JSON-serializable even if the scan has not completed yet.
        """
        with self.lock:
            metadata = self.session_metadata.copy()
            # Before log_scan_complete() these are still sets; normalize them.
            for key in ('providers_used', 'target_domains'):
                if isinstance(metadata[key], set):
                    metadata[key] = sorted(metadata[key])
            return {
                'session_metadata': metadata,
                'api_requests': [asdict(req) for req in self.api_requests],
                'relationships': [asdict(rel) for rel in self.relationships],
                'export_timestamp': datetime.now(timezone.utc).isoformat()
            }

    def get_forensic_summary(self) -> Dict[str, Any]:
        """
        Get summary statistics for forensic reporting.

        Returns:
            Dictionary containing summary statistics
        """
        with self.lock:
            provider_stats = {}
            for provider in self.session_metadata['providers_used']:
                provider_requests = [req for req in self.api_requests if req.provider == provider]
                provider_relationships = [rel for rel in self.relationships if rel.provider == provider]

                provider_stats[provider] = {
                    'total_requests': len(provider_requests),
                    'successful_requests': len([req for req in provider_requests if req.error is None]),
                    'failed_requests': len([req for req in provider_requests if req.error is not None]),
                    'relationships_discovered': len(provider_relationships),
                    'avg_confidence': sum(rel.confidence_score for rel in provider_relationships) / len(provider_relationships) if provider_relationships else 0
                }

            return {
                'session_id': self.session_id,
                'duration_minutes': self._calculate_session_duration(),
                'total_requests': self.session_metadata['total_requests'],
                'total_relationships': self.session_metadata['total_relationships'],
                'unique_indicators': len(set([rel.source_node for rel in self.relationships] +
                                             [rel.target_node for rel in self.relationships])),
                'provider_statistics': provider_stats
            }

    def _calculate_session_duration(self) -> float:
        """Calculate session duration in minutes."""
        if not self.session_metadata['end_time']:
            end_time = datetime.now(timezone.utc)
        else:
            end_time = datetime.fromisoformat(self.session_metadata['end_time'])
        start_time = datetime.fromisoformat(self.session_metadata['start_time'])
        duration = (end_time - start_time).total_seconds() / 60
        return round(duration, 2)


# Global logger instance for the current session
# Module-level singleton holding the active session's forensic logger.
_current_logger: Optional[ForensicLogger] = None
_logger_lock = threading.Lock()


def get_forensic_logger() -> ForensicLogger:
    """Return the active forensic logger, creating one lazily if none exists."""
    global _current_logger
    with _logger_lock:
        if _current_logger is None:
            _current_logger = ForensicLogger()
        instance = _current_logger
    return instance


def new_session() -> ForensicLogger:
    """Discard any existing logger and begin a fresh forensic session."""
    global _current_logger
    with _logger_lock:
        _current_logger = ForensicLogger()
        instance = _current_logger
    return instance