284 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			284 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# dnsrecon/core/logger.py
 | 
						|
 | 
						|
import logging
 | 
						|
import threading
 | 
						|
from datetime import datetime
 | 
						|
from typing import Dict, Any, Optional, List
 | 
						|
from dataclasses import dataclass, asdict
 | 
						|
from datetime import timezone
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class APIRequest:
 | 
						|
    """Structured representation of an API request for forensic logging."""
 | 
						|
    timestamp: str
 | 
						|
    provider: str
 | 
						|
    url: str
 | 
						|
    method: str
 | 
						|
    status_code: Optional[int]
 | 
						|
    response_size: Optional[int]
 | 
						|
    duration_ms: Optional[float]
 | 
						|
    error: Optional[str]
 | 
						|
    target_indicator: str
 | 
						|
    discovery_context: Optional[str]
 | 
						|
 | 
						|
 | 
						|
@dataclass
 | 
						|
class RelationshipDiscovery:
 | 
						|
    """Structured representation of a discovered relationship."""
 | 
						|
    timestamp: str
 | 
						|
    source_node: str
 | 
						|
    target_node: str
 | 
						|
    relationship_type: str
 | 
						|
    confidence_score: float
 | 
						|
    provider: str
 | 
						|
    raw_data: Dict[str, Any]
 | 
						|
    discovery_method: str
 | 
						|
 | 
						|
 | 
						|
class ForensicLogger:
 | 
						|
    """
 | 
						|
    Thread-safe forensic logging system for DNSRecon.
 | 
						|
    Maintains detailed audit trail of all reconnaissance activities.
 | 
						|
    """
 | 
						|
    
 | 
						|
    def __init__(self, session_id: str = ""):
 | 
						|
        """
 | 
						|
        Initialize forensic logger.
 | 
						|
        
 | 
						|
        Args:
 | 
						|
            session_id: Unique identifier for this reconnaissance session
 | 
						|
        """
 | 
						|
        self.session_id = session_id or self._generate_session_id()
 | 
						|
        self.lock = threading.Lock()
 | 
						|
        
 | 
						|
        # Initialize audit trail storage
 | 
						|
        self.api_requests: List[APIRequest] = []
 | 
						|
        self.relationships: List[RelationshipDiscovery] = []
 | 
						|
        self.session_metadata = {
 | 
						|
            'session_id': self.session_id,
 | 
						|
            'start_time': datetime.now(timezone.utc).isoformat(),
 | 
						|
            'end_time': None,
 | 
						|
            'total_requests': 0,
 | 
						|
            'total_relationships': 0,
 | 
						|
            'providers_used': set(),
 | 
						|
            'target_domains': set()
 | 
						|
        }
 | 
						|
        
 | 
						|
        # Configure standard logger
 | 
						|
        self.logger = logging.getLogger(f'dnsrecon.{self.session_id}')
 | 
						|
        self.logger.setLevel(logging.INFO)
 | 
						|
        
 | 
						|
        # Create formatter for structured logging
 | 
						|
        formatter = logging.Formatter(
 | 
						|
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
 | 
						|
        )
 | 
						|
        
 | 
						|
        # Add console handler if not already present
 | 
						|
        if not self.logger.handlers:
 | 
						|
            console_handler = logging.StreamHandler()
 | 
						|
            console_handler.setFormatter(formatter)
 | 
						|
            self.logger.addHandler(console_handler)
 | 
						|
 | 
						|
    def __getstate__(self):
 | 
						|
        """Prepare ForensicLogger for pickling by excluding unpicklable objects."""
 | 
						|
        state = self.__dict__.copy()
 | 
						|
        # Remove the unpickleable 'logger' attribute
 | 
						|
        if 'logger' in state:
 | 
						|
            del state['logger']
 | 
						|
        if 'lock' in state:
 | 
						|
            del state['lock']
 | 
						|
        return state
 | 
						|
 | 
						|
    def __setstate__(self, state):
 | 
						|
        """Restore ForensicLogger after unpickling by reconstructing logger."""
 | 
						|
        self.__dict__.update(state)
 | 
						|
        # Re-initialize the 'logger' attribute
 | 
						|
        self.logger = logging.getLogger(f'dnsrecon.{self.session_id}')
 | 
						|
        self.logger.setLevel(logging.INFO)
 | 
						|
        formatter = logging.Formatter(
 | 
						|
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
 | 
						|
        )
 | 
						|
        if not self.logger.handlers:
 | 
						|
            console_handler = logging.StreamHandler()
 | 
						|
            console_handler.setFormatter(formatter)
 | 
						|
            self.logger.addHandler(console_handler)
 | 
						|
        self.lock = threading.Lock()
 | 
						|
 | 
						|
    def _generate_session_id(self) -> str:
 | 
						|
        """Generate unique session identifier."""
 | 
						|
        return f"dnsrecon_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"
 | 
						|
    
 | 
						|
    def log_api_request(self, provider: str, url: str, method: str = "GET", 
 | 
						|
                       status_code: Optional[int] = None, 
 | 
						|
                       response_size: Optional[int] = None,
 | 
						|
                       duration_ms: Optional[float] = None,
 | 
						|
                       error: Optional[str] = None,
 | 
						|
                       target_indicator: str = "",
 | 
						|
                       discovery_context: Optional[str] = None) -> None:
 | 
						|
        """
 | 
						|
        Log an API request for forensic audit trail.
 | 
						|
        
 | 
						|
        Args:
 | 
						|
            provider: Name of the data provider
 | 
						|
            url: Request URL
 | 
						|
            method: HTTP method
 | 
						|
            status_code: HTTP response status code
 | 
						|
            response_size: Size of response in bytes
 | 
						|
            duration_ms: Request duration in milliseconds
 | 
						|
            error: Error message if request failed
 | 
						|
            target_indicator: The indicator being investigated
 | 
						|
            discovery_context: Context of how this indicator was discovered
 | 
						|
        """
 | 
						|
        api_request = APIRequest(
 | 
						|
            timestamp=datetime.now(timezone.utc).isoformat(),
 | 
						|
            provider=provider,
 | 
						|
            url=url,
 | 
						|
            method=method,
 | 
						|
            status_code=status_code,
 | 
						|
            response_size=response_size,
 | 
						|
            duration_ms=duration_ms,
 | 
						|
            error=error,
 | 
						|
            target_indicator=target_indicator,
 | 
						|
            discovery_context=discovery_context
 | 
						|
        )
 | 
						|
        
 | 
						|
        self.api_requests.append(api_request)
 | 
						|
        self.session_metadata['total_requests'] += 1
 | 
						|
        self.session_metadata['providers_used'].add(provider)
 | 
						|
        
 | 
						|
        if target_indicator:
 | 
						|
            self.session_metadata['target_domains'].add(target_indicator)
 | 
						|
        
 | 
						|
        # Log to standard logger
 | 
						|
        if error:
 | 
						|
            self.logger.error(f"API Request Failed - {provider}: {url} - {error}")
 | 
						|
        else:
 | 
						|
            self.logger.info(f"API Request - {provider}: {url} - Status: {status_code}")
 | 
						|
    
 | 
						|
    def log_relationship_discovery(self, source_node: str, target_node: str,
 | 
						|
                                 relationship_type: str, confidence_score: float,
 | 
						|
                                 provider: str, raw_data: Dict[str, Any],
 | 
						|
                                 discovery_method: str) -> None:
 | 
						|
        """
 | 
						|
        Log discovery of a new relationship between indicators.
 | 
						|
        
 | 
						|
        Args:
 | 
						|
            source_node: Source node identifier
 | 
						|
            target_node: Target node identifier
 | 
						|
            relationship_type: Type of relationship (e.g., 'SAN', 'A_Record')
 | 
						|
            confidence_score: Confidence score (0.0 to 1.0)
 | 
						|
            provider: Provider that discovered this relationship
 | 
						|
            raw_data: Raw data from provider response
 | 
						|
            discovery_method: Method used to discover relationship
 | 
						|
        """
 | 
						|
        relationship = RelationshipDiscovery(
 | 
						|
            timestamp=datetime.now(timezone.utc).isoformat(),
 | 
						|
            source_node=source_node,
 | 
						|
            target_node=target_node,
 | 
						|
            relationship_type=relationship_type,
 | 
						|
            confidence_score=confidence_score,
 | 
						|
            provider=provider,
 | 
						|
            raw_data=raw_data,
 | 
						|
            discovery_method=discovery_method
 | 
						|
        )
 | 
						|
        
 | 
						|
        self.relationships.append(relationship)
 | 
						|
        self.session_metadata['total_relationships'] += 1
 | 
						|
        
 | 
						|
        self.logger.info(
 | 
						|
            f"Relationship Discovered - {source_node} -> {target_node} "
 | 
						|
            f"({relationship_type}) - Confidence: {confidence_score:.2f} - Provider: {provider}"
 | 
						|
        )
 | 
						|
    
 | 
						|
    def log_scan_start(self, target_domain: str, recursion_depth: int, 
 | 
						|
                      enabled_providers: List[str]) -> None:
 | 
						|
        """Log the start of a reconnaissance scan."""
 | 
						|
        self.logger.info(f"Scan Started - Target: {target_domain}, Depth: {recursion_depth}")
 | 
						|
        self.logger.info(f"Enabled Providers: {', '.join(enabled_providers)}")
 | 
						|
        
 | 
						|
        self.session_metadata['target_domains'].add(target_domain)
 | 
						|
    
 | 
						|
    def log_scan_complete(self) -> None:
 | 
						|
        """Log the completion of a reconnaissance scan."""
 | 
						|
        self.session_metadata['end_time'] = datetime.now(timezone.utc).isoformat()
 | 
						|
        self.session_metadata['providers_used'] = list(self.session_metadata['providers_used'])
 | 
						|
        self.session_metadata['target_domains'] = list(self.session_metadata['target_domains'])
 | 
						|
        
 | 
						|
        self.logger.info(f"Scan Complete - Session: {self.session_id}")
 | 
						|
    
 | 
						|
    def export_audit_trail(self) -> Dict[str, Any]:
 | 
						|
        """
 | 
						|
        Export complete audit trail for forensic analysis.
 | 
						|
        
 | 
						|
        Returns:
 | 
						|
            Dictionary containing complete session audit trail
 | 
						|
        """
 | 
						|
        return {
 | 
						|
            'session_metadata': self.session_metadata.copy(),
 | 
						|
            'api_requests': [asdict(req) for req in self.api_requests],
 | 
						|
            'relationships': [asdict(rel) for rel in self.relationships],
 | 
						|
            'export_timestamp': datetime.now(timezone.utc).isoformat()
 | 
						|
        }
 | 
						|
    
 | 
						|
    def get_forensic_summary(self) -> Dict[str, Any]:
 | 
						|
        """
 | 
						|
        Get summary statistics for forensic reporting.
 | 
						|
        
 | 
						|
        Returns:
 | 
						|
            Dictionary containing summary statistics
 | 
						|
        """
 | 
						|
        provider_stats = {}
 | 
						|
        for provider in self.session_metadata['providers_used']:
 | 
						|
            provider_requests = [req for req in self.api_requests if req.provider == provider]
 | 
						|
            provider_relationships = [rel for rel in self.relationships if rel.provider == provider]
 | 
						|
            
 | 
						|
            provider_stats[provider] = {
 | 
						|
                'total_requests': len(provider_requests),
 | 
						|
                'successful_requests': len([req for req in provider_requests if req.error is None]),
 | 
						|
                'failed_requests': len([req for req in provider_requests if req.error is not None]),
 | 
						|
                'relationships_discovered': len(provider_relationships),
 | 
						|
                'avg_confidence': sum(rel.confidence_score for rel in provider_relationships) / len(provider_relationships) if provider_relationships else 0
 | 
						|
            }
 | 
						|
        
 | 
						|
        return {
 | 
						|
            'session_id': self.session_id,
 | 
						|
            'duration_minutes': self._calculate_session_duration(),
 | 
						|
            'total_requests': self.session_metadata['total_requests'],
 | 
						|
            'total_relationships': self.session_metadata['total_relationships'],
 | 
						|
            'unique_indicators': len(set([rel.source_node for rel in self.relationships] + [rel.target_node for rel in self.relationships])),
 | 
						|
            'provider_statistics': provider_stats
 | 
						|
        }
 | 
						|
    
 | 
						|
    def _calculate_session_duration(self) -> float:
 | 
						|
        """Calculate session duration in minutes."""
 | 
						|
        if not self.session_metadata['end_time']:
 | 
						|
            end_time = datetime.now(timezone.utc)
 | 
						|
        else:
 | 
						|
            end_time = datetime.fromisoformat(self.session_metadata['end_time'])
 | 
						|
        
 | 
						|
        start_time = datetime.fromisoformat(self.session_metadata['start_time'])
 | 
						|
        duration = (end_time - start_time).total_seconds() / 60
 | 
						|
        return round(duration, 2)
 | 
						|
 | 
						|
 | 
						|
# Global logger instance for the current session
 | 
						|
_current_logger: Optional[ForensicLogger] = None
 | 
						|
_logger_lock = threading.Lock()
 | 
						|
 | 
						|
 | 
						|
def get_forensic_logger() -> ForensicLogger:
 | 
						|
    """Get or create the current forensic logger instance."""
 | 
						|
    global _current_logger
 | 
						|
    with _logger_lock:
 | 
						|
        if _current_logger is None:
 | 
						|
            _current_logger = ForensicLogger()
 | 
						|
        return _current_logger
 | 
						|
 | 
						|
 | 
						|
def new_session() -> ForensicLogger:
 | 
						|
    """Start a new forensic logging session."""
 | 
						|
    global _current_logger
 | 
						|
    with _logger_lock:
 | 
						|
        _current_logger = ForensicLogger()
 | 
						|
        return _current_logger |