# dnsrecon/core/logger.py
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict


@dataclass
class APIRequest:
    """Structured representation of an API request for forensic logging."""
    timestamp: str
    provider: str
    url: str
    method: str
    status_code: Optional[int]
    response_size: Optional[int]
    duration_ms: Optional[float]
    error: Optional[str]
    target_indicator: str
    discovery_context: Optional[str]


@dataclass
class RelationshipDiscovery:
    """Structured representation of a discovered relationship."""
    timestamp: str
    source_node: str
    target_node: str
    relationship_type: str
    confidence_score: float
    provider: str
    raw_data: Dict[str, Any]
    discovery_method: str


class ForensicLogger:
    """
    Thread-safe forensic logging system for DNSRecon.

    Maintains a detailed audit trail of all reconnaissance activities.
    """

    def __init__(self, session_id: Optional[str] = None):
        """
        Initialize the forensic logger.

        Args:
            session_id: Unique identifier for this reconnaissance session
        """
        self.session_id = session_id or self._generate_session_id()

        # Initialize audit trail storage
        self.api_requests: List[APIRequest] = []
        self.relationships: List[RelationshipDiscovery] = []
        self.session_metadata = {
            'session_id': self.session_id,
            'start_time': datetime.now(timezone.utc).isoformat(),
            'end_time': None,
            'total_requests': 0,
            'total_relationships': 0,
            'providers_used': set(),
            'target_domains': set()
        }

        # Configure standard logger
        self.logger = logging.getLogger(f'dnsrecon.{self.session_id}')
        self.logger.setLevel(logging.INFO)

        # Create formatter for structured logging
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # Add console handler if not already present
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

    def __getstate__(self):
        """Return picklable state, excluding the live logging.Logger handle."""
        state = self.__dict__.copy()
        # Exclude the unpickleable 'logger' attribute
        if 'logger' in state:
            del state['logger']
        return state

    def __setstate__(self, state):
        """Restore state and rebuild the logger dropped by __getstate__."""
        self.__dict__.update(state)
        # Re-initialize the 'logger' attribute
        self.logger = logging.getLogger(f'dnsrecon.{self.session_id}')
        self.logger.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

    def _generate_session_id(self) -> str:
        """Generate unique session identifier."""
        return f"dnsrecon_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"

    def log_api_request(self, provider: str, url: str, method: str = "GET",
                        status_code: Optional[int] = None,
                        response_size: Optional[int] = None,
                        duration_ms: Optional[float] = None,
                        error: Optional[str] = None,
                        target_indicator: str = "",
                        discovery_context: Optional[str] = None) -> None:
        """
        Log an API request for forensic audit trail.

        Args:
            provider: Name of the data provider
            url: Request URL
            method: HTTP method
            status_code: HTTP response status code
            response_size: Size of response in bytes
            duration_ms: Request duration in milliseconds
            error: Error message if request failed
            target_indicator: The indicator being investigated
            discovery_context: Context of how this indicator was discovered
        """
        api_request = APIRequest(
            timestamp=datetime.now(timezone.utc).isoformat(),
            provider=provider,
            url=url,
            method=method,
            status_code=status_code,
            response_size=response_size,
            duration_ms=duration_ms,
            error=error,
            target_indicator=target_indicator,
            discovery_context=discovery_context
        )

        self.api_requests.append(api_request)
        self.session_metadata['total_requests'] += 1
        self.session_metadata['providers_used'].add(provider)
        if target_indicator:
            self.session_metadata['target_domains'].add(target_indicator)

        # Log to standard logger
        if error:
            self.logger.error(f"API Request Failed - {provider}: {url} - {error}")
        else:
            self.logger.info(f"API Request - {provider}: {url} - Status: {status_code}")

    def log_relationship_discovery(self, source_node: str, target_node: str,
                                   relationship_type: str, confidence_score: float,
                                   provider: str, raw_data: Dict[str, Any],
                                   discovery_method: str) -> None:
        """
        Log discovery of a new relationship between indicators.

        Args:
            source_node: Source node identifier
            target_node: Target node identifier
            relationship_type: Type of relationship (e.g., 'SAN', 'A_Record')
            confidence_score: Confidence score (0.0 to 1.0)
            provider: Provider that discovered this relationship
            raw_data: Raw data from provider response
            discovery_method: Method used to discover relationship
        """
        relationship = RelationshipDiscovery(
            timestamp=datetime.now(timezone.utc).isoformat(),
            source_node=source_node,
            target_node=target_node,
            relationship_type=relationship_type,
            confidence_score=confidence_score,
            provider=provider,
            raw_data=raw_data,
            discovery_method=discovery_method
        )

        self.relationships.append(relationship)
        self.session_metadata['total_relationships'] += 1

        self.logger.info(
            f"Relationship Discovered - {source_node} -> {target_node} "
            f"({relationship_type}) - Confidence: {confidence_score:.2f} - Provider: {provider}"
        )

    def log_scan_start(self, target_domain: str, recursion_depth: int,
                       enabled_providers: List[str]) -> None:
        """Log the start of a reconnaissance scan."""
        self.logger.info(f"Scan Started - Target: {target_domain}, Depth: {recursion_depth}")
        self.logger.info(f"Enabled Providers: {', '.join(enabled_providers)}")
        self.session_metadata['target_domains'].add(target_domain)

    def log_scan_complete(self) -> None:
        """Log the completion of a reconnaissance scan."""
        self.session_metadata['end_time'] = datetime.now(timezone.utc).isoformat()
        self.session_metadata['providers_used'] = list(self.session_metadata['providers_used'])
        self.session_metadata['target_domains'] = list(self.session_metadata['target_domains'])

        self.logger.info(f"Scan Complete - Session: {self.session_id}")
        self.logger.info(f"Total API Requests: {self.session_metadata['total_requests']}")
        self.logger.info(f"Total Relationships: {self.session_metadata['total_relationships']}")

    def export_audit_trail(self) -> Dict[str, Any]:
        """
        Export complete audit trail for forensic analysis.

        Returns:
            Dictionary containing complete session audit trail
        """
        metadata = self.session_metadata.copy()
        # Convert any remaining sets to lists so the trail stays serializable
        # (e.g., to JSON) even if log_scan_complete() has not run yet.
        for key in ('providers_used', 'target_domains'):
            if isinstance(metadata.get(key), set):
                metadata[key] = list(metadata[key])

        return {
            'session_metadata': metadata,
            'api_requests': [asdict(req) for req in self.api_requests],
            'relationships': [asdict(rel) for rel in self.relationships],
            'export_timestamp': datetime.now(timezone.utc).isoformat()
        }

    def get_forensic_summary(self) -> Dict[str, Any]:
        """
        Get summary statistics for forensic reporting.

        Returns:
            Dictionary containing summary statistics
        """
        provider_stats = {}
        for provider in self.session_metadata['providers_used']:
            provider_requests = [req for req in self.api_requests if req.provider == provider]
            provider_relationships = [rel for rel in self.relationships if rel.provider == provider]

            provider_stats[provider] = {
                'total_requests': len(provider_requests),
                'successful_requests': len([req for req in provider_requests if req.error is None]),
                'failed_requests': len([req for req in provider_requests if req.error is not None]),
                'relationships_discovered': len(provider_relationships),
                'avg_confidence': (
                    sum(rel.confidence_score for rel in provider_relationships) / len(provider_relationships)
                    if provider_relationships else 0
                )
            }

        return {
            'session_id': self.session_id,
            'duration_minutes': self._calculate_session_duration(),
            'total_requests': self.session_metadata['total_requests'],
            'total_relationships': self.session_metadata['total_relationships'],
            'unique_indicators': len(
                {rel.source_node for rel in self.relationships}
                | {rel.target_node for rel in self.relationships}
            ),
            'provider_statistics': provider_stats
        }

    def _calculate_session_duration(self) -> float:
        """Calculate session duration in minutes."""
        if not self.session_metadata['end_time']:
            end_time = datetime.now(timezone.utc)
        else:
            end_time = datetime.fromisoformat(self.session_metadata['end_time'])

        start_time = datetime.fromisoformat(self.session_metadata['start_time'])
        duration = (end_time - start_time).total_seconds() / 60
        return round(duration, 2)
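

if __name__ == "__main__":
    # Illustrative usage sketch, not part of the original module: it exercises the
    # public logging API defined above. "example.com", "crtsh", and the crt.sh URL
    # are placeholder values chosen for this demo, not values mandated by DNSRecon.
    import json
    import pickle

    demo_logger = ForensicLogger()
    demo_logger.log_scan_start("example.com", recursion_depth=2,
                               enabled_providers=["crtsh", "dns"])

    # A successful certificate-transparency lookup for the seed domain.
    demo_logger.log_api_request(
        provider="crtsh",
        url="https://crt.sh/?q=example.com&output=json",
        status_code=200,
        response_size=2048,
        duration_ms=412.7,
        target_indicator="example.com",
    )

    # A relationship derived from that response (values are illustrative).
    demo_logger.log_relationship_discovery(
        source_node="example.com",
        target_node="www.example.com",
        relationship_type="SAN",
        confidence_score=0.9,
        provider="crtsh",
        raw_data={"certificate_id": 1234567},
        discovery_method="certificate_san_extraction",
    )
    demo_logger.log_scan_complete()

    # The __getstate__/__setstate__ hooks let the logger survive a pickle
    # round trip; the logging.Logger handle is rebuilt on load.
    restored = pickle.loads(pickle.dumps(demo_logger))

    print(json.dumps(restored.export_audit_trail(), indent=2))
    print(json.dumps(restored.get_forensic_summary(), indent=2))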