# dnsrecon/core/logger.py
import logging
import threading
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict


@dataclass
class APIRequest:
    """Structured representation of an API request for forensic logging."""

    timestamp: str
    provider: str
    url: str
    method: str
    status_code: Optional[int]
    response_size: Optional[int]
    duration_ms: Optional[float]
    error: Optional[str]
    target_indicator: str
    discovery_context: Optional[str]


@dataclass
class RelationshipDiscovery:
    """Structured representation of a discovered relationship."""

    timestamp: str
    source_node: str
    target_node: str
    relationship_type: str
    confidence_score: float
    provider: str
    raw_data: Dict[str, Any]
    discovery_method: str


class ForensicLogger:
    """
    Thread-safe forensic logging system for DNSRecon.

    Maintains a detailed audit trail of all reconnaissance activities.
    """

    def __init__(self, session_id: str = ""):
        """
        Initialize the forensic logger.

        Args:
            session_id: Unique identifier for this reconnaissance session
        """
        self.session_id = session_id or self._generate_session_id()
        # Guards audit-trail mutations; excluded from pickling and recreated on unpickle.
        self.lock = threading.Lock()

        # Initialize audit trail storage
        self.api_requests: List[APIRequest] = []
        self.relationships: List[RelationshipDiscovery] = []
        self.session_metadata = {
            'session_id': self.session_id,
            'start_time': datetime.now(timezone.utc).isoformat(),
            'end_time': None,
            'total_requests': 0,
            'total_relationships': 0,
            'providers_used': set(),
            'target_domains': set()
        }

        # Configure standard logger
        self.logger = logging.getLogger(f'dnsrecon.{self.session_id}')
        self.logger.setLevel(logging.INFO)

        # Create formatter for structured logging
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        # Add console handler if not already present
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

    def __getstate__(self):
        """Prepare ForensicLogger for pickling by excluding unpicklable objects."""
        state = self.__dict__.copy()
        # Remove the unpicklable 'logger' and 'lock' attributes
        state.pop('logger', None)
        state.pop('lock', None)
        return state

    def __setstate__(self, state):
        """Restore ForensicLogger after unpickling by reconstructing the logger and lock."""
        self.__dict__.update(state)
        # Re-initialize the excluded attributes
        self.lock = threading.Lock()
        self.logger = logging.getLogger(f'dnsrecon.{self.session_id}')
        self.logger.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

    def _generate_session_id(self) -> str:
        """Generate unique session identifier."""
        return f"dnsrecon_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"

    def log_api_request(self, provider: str, url: str, method: str = "GET",
                        status_code: Optional[int] = None,
                        response_size: Optional[int] = None,
                        duration_ms: Optional[float] = None,
                        error: Optional[str] = None,
                        target_indicator: str = "",
                        discovery_context: Optional[str] = None) -> None:
        """
        Log an API request for the forensic audit trail.

        Args:
            provider: Name of the data provider
            url: Request URL
            method: HTTP method
            status_code: HTTP response status code
            response_size: Size of response in bytes
            duration_ms: Request duration in milliseconds
            error: Error message if request failed
            target_indicator: The indicator being investigated
            discovery_context: Context of how this indicator was discovered
        """
        api_request = APIRequest(
            timestamp=datetime.now(timezone.utc).isoformat(),
            provider=provider,
            url=url,
            method=method,
            status_code=status_code,
            response_size=response_size,
            duration_ms=duration_ms,
            error=error,
            target_indicator=target_indicator,
            discovery_context=discovery_context
        )

        with self.lock:
            self.api_requests.append(api_request)
            self.session_metadata['total_requests'] += 1
            self.session_metadata['providers_used'].add(provider)

            if target_indicator:
                self.session_metadata['target_domains'].add(target_indicator)

        # Log to standard logger
        if error:
            self.logger.error(f"API Request Failed - {provider}: {url} - {error}")
        else:
            self.logger.info(f"API Request - {provider}: {url} - Status: {status_code}")

    def log_relationship_discovery(self, source_node: str, target_node: str,
                                   relationship_type: str, confidence_score: float,
                                   provider: str, raw_data: Dict[str, Any],
                                   discovery_method: str) -> None:
        """
        Log the discovery of a new relationship between indicators.

        Args:
            source_node: Source node identifier
            target_node: Target node identifier
            relationship_type: Type of relationship (e.g., 'SAN', 'A_Record')
            confidence_score: Confidence score (0.0 to 1.0)
            provider: Provider that discovered this relationship
            raw_data: Raw data from provider response
            discovery_method: Method used to discover the relationship
        """
        relationship = RelationshipDiscovery(
            timestamp=datetime.now(timezone.utc).isoformat(),
            source_node=source_node,
            target_node=target_node,
            relationship_type=relationship_type,
            confidence_score=confidence_score,
            provider=provider,
            raw_data=raw_data,
            discovery_method=discovery_method
        )

        with self.lock:
            self.relationships.append(relationship)
            self.session_metadata['total_relationships'] += 1

        self.logger.info(
            f"Relationship Discovered - {source_node} -> {target_node} "
            f"({relationship_type}) - Confidence: {confidence_score:.2f} - Provider: {provider}"
        )

    def log_scan_start(self, target_domain: str, recursion_depth: int,
                       enabled_providers: List[str]) -> None:
        """Log the start of a reconnaissance scan."""
        self.logger.info(f"Scan Started - Target: {target_domain}, Depth: {recursion_depth}")
        self.logger.info(f"Enabled Providers: {', '.join(enabled_providers)}")

        with self.lock:
            self.session_metadata['target_domains'].add(target_domain)

    def log_scan_complete(self) -> None:
        """Log the completion of a reconnaissance scan."""
        with self.lock:
            self.session_metadata['end_time'] = datetime.now(timezone.utc).isoformat()
            self.session_metadata['providers_used'] = list(self.session_metadata['providers_used'])
            self.session_metadata['target_domains'] = list(self.session_metadata['target_domains'])

        self.logger.info(f"Scan Complete - Session: {self.session_id}")
        self.logger.info(f"Total API Requests: {self.session_metadata['total_requests']}")
        self.logger.info(f"Total Relationships: {self.session_metadata['total_relationships']}")

    def export_audit_trail(self) -> Dict[str, Any]:
        """
        Export the complete audit trail for forensic analysis.

        Returns:
            Dictionary containing the complete session audit trail
        """
        # Convert set-valued metadata to lists so the export is JSON-serializable
        # even if the scan has not completed yet.
        metadata = self.session_metadata.copy()
        metadata['providers_used'] = sorted(metadata['providers_used'])
        metadata['target_domains'] = sorted(metadata['target_domains'])

        return {
            'session_metadata': metadata,
            'api_requests': [asdict(req) for req in self.api_requests],
            'relationships': [asdict(rel) for rel in self.relationships],
            'export_timestamp': datetime.now(timezone.utc).isoformat()
        }

    def get_forensic_summary(self) -> Dict[str, Any]:
        """
        Get summary statistics for forensic reporting.

        Returns:
            Dictionary containing summary statistics
        """
        provider_stats = {}
        for provider in self.session_metadata['providers_used']:
            provider_requests = [req for req in self.api_requests if req.provider == provider]
            provider_relationships = [rel for rel in self.relationships if rel.provider == provider]

            provider_stats[provider] = {
                'total_requests': len(provider_requests),
                'successful_requests': len([req for req in provider_requests if req.error is None]),
                'failed_requests': len([req for req in provider_requests if req.error is not None]),
                'relationships_discovered': len(provider_relationships),
                'avg_confidence': (
                    sum(rel.confidence_score for rel in provider_relationships) / len(provider_relationships)
                    if provider_relationships else 0.0
                )
            }

        return {
            'session_id': self.session_id,
            'duration_minutes': self._calculate_session_duration(),
            'total_requests': self.session_metadata['total_requests'],
            'total_relationships': self.session_metadata['total_relationships'],
            'unique_indicators': len(set(
                [rel.source_node for rel in self.relationships]
                + [rel.target_node for rel in self.relationships]
            )),
            'provider_statistics': provider_stats
        }

    def _calculate_session_duration(self) -> float:
        """Calculate session duration in minutes."""
        if not self.session_metadata['end_time']:
            end_time = datetime.now(timezone.utc)
        else:
            end_time = datetime.fromisoformat(self.session_metadata['end_time'])

        start_time = datetime.fromisoformat(self.session_metadata['start_time'])
        duration = (end_time - start_time).total_seconds() / 60
        return round(duration, 2)


# Global logger instance for the current session
_current_logger: Optional[ForensicLogger] = None
_logger_lock = threading.Lock()


def get_forensic_logger() -> ForensicLogger:
    """Get or create the current forensic logger instance."""
    global _current_logger
    with _logger_lock:
        if _current_logger is None:
            _current_logger = ForensicLogger()
        return _current_logger


def new_session() -> ForensicLogger:
    """Start a new forensic logging session."""
    global _current_logger
    with _logger_lock:
        _current_logger = ForensicLogger()
        return _current_logger


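# Minimal usage sketch (not part of the library API): exercises a session end to
# end with a hypothetical provider name, URL, and relationship purely for
# illustration, then prints the summary and the JSON-serialized audit trail.
if __name__ == "__main__":
    import json

    logger = new_session()
    logger.log_scan_start("example.com", recursion_depth=2,
                          enabled_providers=["crtsh", "dns"])
    logger.log_api_request(provider="crtsh",
                           url="https://crt.sh/?q=example.com&output=json",
                           status_code=200, response_size=2048,
                           duration_ms=312.5, target_indicator="example.com")
    logger.log_relationship_discovery(source_node="example.com",
                                      target_node="www.example.com",
                                      relationship_type="SAN",
                                      confidence_score=0.9,
                                      provider="crtsh",
                                      raw_data={"source": "certificate"},
                                      discovery_method="certificate_san_extraction")
    logger.log_scan_complete()

    print(json.dumps(logger.get_forensic_summary(), indent=2))
    print(json.dumps(logger.export_audit_trail(), indent=2))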