# dnscope/utils/export_manager.py
# 2025-09-18 17:42:39 +02:00
# 336 lines, 13 KiB, Python
# dnsrecon-reduced/utils/export_manager.py
"""
Centralized export functionality for DNSRecon.
Handles all data export operations with forensic integrity and proper formatting.
"""
import json
from datetime import datetime, timezone
from decimal import Decimal
from enum import Enum
from typing import Any, Dict, List, Optional

import networkx as nx

from utils.helpers import _is_valid_domain, _is_valid_ip
class ExportManager:
    """
    Centralized manager for all DNSRecon export operations.

    Maintains forensic integrity and provides consistent export formats:
    JSON result dumps, plain-text target lists and executive summaries.
    """

    def __init__(self):
        """Initialize export manager (stateless; nothing to configure)."""
        pass

    def export_scan_results(self, scanner) -> Dict[str, Any]:
        """
        Export complete scan results with forensic metadata.

        Args:
            scanner: Scanner instance with completed scan data

        Returns:
            Complete scan results dictionary: scan metadata, graph data,
            forensic audit trail, per-provider statistics and a summary,
            plus export metadata stamping when the artifact was produced.
        """
        graph_data = self.export_graph_json(scanner.graph)
        audit_trail = scanner.logger.export_audit_trail()
        provider_stats = {
            provider.get_name(): provider.get_statistics()
            for provider in scanner.providers
        }

        results = {
            'scan_metadata': {
                'target_domain': scanner.current_target,
                'max_depth': scanner.max_depth,
                'final_status': scanner.status,
                'total_indicators_processed': scanner.indicators_processed,
                'enabled_providers': list(provider_stats.keys()),
                'session_id': scanner.session_id
            },
            'graph_data': graph_data,
            'forensic_audit': audit_trail,
            'provider_statistics': provider_stats,
            'scan_summary': scanner.logger.get_forensic_summary()
        }

        # Stamp the export itself so downstream consumers can verify when
        # and under which format version this artifact was produced.
        results['export_metadata'] = {
            'export_timestamp': datetime.now(timezone.utc).isoformat(),
            'export_version': '1.0.0',
            'forensic_integrity': 'maintained'
        }
        return results

    def export_targets_list(self, scanner) -> str:
        """
        Export all discovered domains and IPs as a text file.

        Args:
            scanner: Scanner instance with graph data

        Returns:
            Newline-separated, sorted list of unique valid targets
        """
        nodes = scanner.graph.get_graph_data().get('nodes', [])
        targets = {
            node['id'] for node in nodes
            if _is_valid_domain(node['id']) or _is_valid_ip(node['id'])
        }
        # sorted() accepts the set directly; no intermediate list needed.
        return "\n".join(sorted(targets))

    def generate_executive_summary(self, scanner) -> str:
        """
        Generate a natural-language executive summary of scan results.

        Args:
            scanner: Scanner instance with completed scan data

        Returns:
            Formatted executive summary text
        """
        summary = []
        now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')
        graph_data = scanner.graph.get_graph_data()
        nodes = graph_data.get('nodes', [])
        edges = graph_data.get('edges', [])

        summary.append("DNSRecon Executive Summary")
        summary.append(f"Report Generated: {now}")
        summary.append("=" * 40)

        # Scan Overview
        summary.append("\n## Scan Overview")
        summary.append(f"- Initial Target: {scanner.current_target}")
        summary.append(f"- Scan Status: {scanner.status.capitalize()}")
        summary.append(f"- Analysis Depth: {scanner.max_depth}")
        summary.append(f"- Total Indicators Found: {len(nodes)}")
        summary.append(f"- Total Relationships Discovered: {len(edges)}")

        # Key Findings: bucket nodes by type for the headline counts.
        summary.append("\n## Key Findings")
        domains = [n for n in nodes if n['type'] == 'domain']
        ips = [n for n in nodes if n['type'] == 'ip']
        isps = [n for n in nodes if n['type'] == 'isp']
        cas = [n for n in nodes if n['type'] == 'ca']
        summary.append(f"- Discovered {len(domains)} unique domain(s).")
        summary.append(f"- Identified {len(ips)} unique IP address(es).")
        if isps:
            summary.append(f"- Infrastructure is hosted across {len(isps)} unique ISP(s).")
        if cas:
            summary.append(f"- Found certificates issued by {len(cas)} unique Certificate Authorit(y/ies).")

        # Detailed Findings
        summary.append("\n## Detailed Findings")

        # Domain Analysis (first 5 domains only, to keep the report short)
        if domains:
            summary.append("\n### Domain Analysis")
            for domain in domains[:5]:
                summary.append(f"\n- Domain: {domain['id']}")

                # Associated IPs: outgoing edges from this domain to an IP.
                associated_ips = [edge['to'] for edge in edges
                                  if edge['from'] == domain['id'] and _is_valid_ip(edge['to'])]
                if associated_ips:
                    summary.append(f" - Associated IPs: {', '.join(associated_ips)}")

                # Certificate info is read from 'cert_*' node attributes.
                cert_attributes = [attr for attr in domain.get('attributes', [])
                                   if attr.get('name', '').startswith('cert_')]
                if cert_attributes:
                    issuer = next((attr['value'] for attr in cert_attributes
                                   if attr['name'] == 'cert_issuer_name'), 'N/A')
                    valid_until = next((attr['value'] for attr in cert_attributes
                                        if attr['name'] == 'cert_not_after'), 'N/A')
                    summary.append(f" - Certificate Issuer: {issuer}")
                    summary.append(f" - Certificate Valid Until: {valid_until}")

        # IP Address Analysis (first 5 IPs only)
        if ips:
            summary.append("\n### IP Address Analysis")
            for ip in ips[:5]:
                summary.append(f"\n- IP Address: {ip['id']}")

                # Hostnames: outgoing edges from this IP to a domain.
                hostnames = [edge['to'] for edge in edges
                             if edge['from'] == ip['id'] and _is_valid_domain(edge['to'])]
                if hostnames:
                    summary.append(f" - Associated Hostnames: {', '.join(hostnames)}")

                # ISP: first outgoing edge whose target node is typed 'isp'.
                isp_edge = next((edge for edge in edges
                                 if edge['from'] == ip['id'] and
                                 any(node['id'] == edge['to'] and node['type'] == 'isp'
                                     for node in nodes)), None)
                if isp_edge:
                    summary.append(f" - ISP: {isp_edge['to']}")

        # Data Sources
        summary.append("\n## Data Sources")
        provider_stats = scanner.logger.get_forensic_summary().get('provider_statistics', {})
        for provider, stats in provider_stats.items():
            relationships = stats.get('relationships_discovered', 0)
            requests = stats.get('successful_requests', 0)
            summary.append(f"- {provider.capitalize()}: {relationships} relationships from {requests} requests.")

        summary.append("\n" + "=" * 40)
        summary.append("End of Report")

        return "\n".join(summary)

    def export_graph_json(self, graph_manager) -> Dict[str, Any]:
        """
        Export complete graph data as a JSON-serializable dictionary.

        Moved from GraphManager to centralize export functionality.

        Args:
            graph_manager: GraphManager instance with graph data

        Returns:
            Complete graph data with export metadata
        """
        # edges="edges" keeps the node-link key named 'edges' (instead of
        # the legacy 'links') in the exported structure.
        graph_data = nx.node_link_data(graph_manager.graph, edges="edges")

        return {
            'export_metadata': {
                'export_timestamp': datetime.now(timezone.utc).isoformat(),
                'graph_creation_time': graph_manager.creation_time,
                'last_modified': graph_manager.last_modified,
                'total_nodes': graph_manager.get_node_count(),
                'total_edges': graph_manager.get_edge_count(),
                'graph_format': 'dnsrecon_v1_unified_model'
            },
            'graph': graph_data,
            'statistics': graph_manager.get_statistics()
        }

    def serialize_to_json(self, data: Dict[str, Any], indent: int = 2) -> str:
        """
        Serialize data to JSON with custom handling for non-serializable objects.

        Args:
            data: Data to serialize
            indent: JSON indentation level

        Returns:
            JSON string representation
        """
        try:
            return json.dumps(data, indent=indent, cls=CustomJSONEncoder, ensure_ascii=False)
        except Exception:
            # Fallback: aggressively clean the structure first, then encode
            # with the default encoder. Keeps the export best-effort rather
            # than failing the whole scan on one bad value.
            cleaned_data = self._clean_for_json(data)
            return json.dumps(cleaned_data, indent=indent, ensure_ascii=False)

    def _clean_for_json(self, obj, max_depth: int = 10, current_depth: int = 0) -> Any:
        """
        Recursively clean an object to make it JSON serializable.

        Caps recursion depth (guards against circular references) and
        degrades problematic values to descriptive strings instead of raising.

        Args:
            obj: Object to clean
            max_depth: Maximum recursion depth
            current_depth: Current recursion depth

        Returns:
            JSON-serializable object
        """
        if current_depth > max_depth:
            return f"<max_depth_exceeded_{type(obj).__name__}>"

        if obj is None or isinstance(obj, (bool, int, float, str)):
            return obj
        elif isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, Decimal):
            # Mirror CustomJSONEncoder: Decimal becomes float, not str.
            return float(obj)
        elif isinstance(obj, Enum):
            # Must be checked before the generic __dict__ branch below:
            # enum members carry an instance __dict__, which would otherwise
            # serialize their internals instead of their value.
            return self._clean_for_json(obj.value, max_depth, current_depth + 1)
        elif isinstance(obj, (set, frozenset)):
            return list(obj)
        elif isinstance(obj, dict):
            cleaned = {}
            for key, value in obj.items():
                try:
                    # JSON object keys must be strings.
                    clean_key = str(key) if not isinstance(key, str) else key
                    cleaned[clean_key] = self._clean_for_json(value, max_depth, current_depth + 1)
                except Exception:
                    cleaned[str(key)] = f"<serialization_error_{type(value).__name__}>"
            return cleaned
        elif isinstance(obj, (list, tuple)):
            cleaned = []
            for item in obj:
                try:
                    cleaned.append(self._clean_for_json(item, max_depth, current_depth + 1))
                except Exception:
                    cleaned.append(f"<serialization_error_{type(item).__name__}>")
            return cleaned
        elif hasattr(obj, '__dict__'):
            try:
                return self._clean_for_json(obj.__dict__, max_depth, current_depth + 1)
            except Exception:
                return str(obj)
        elif hasattr(obj, 'value'):
            # Enum-like objects without a __dict__ (e.g. using __slots__).
            return obj.value
        else:
            return str(obj)

    def generate_filename(self, target: str, export_type: str, timestamp: Optional[datetime] = None) -> str:
        """
        Generate standardized filename for exports.

        Args:
            target: Target domain/IP being scanned
            export_type: Type of export (json, txt, summary, targets)
            timestamp: Optional timestamp (defaults to now, UTC)

        Returns:
            Formatted filename with forensic naming convention:
            dnsrecon_<type>_<target>_<YYYYmmdd_HHMMSS>.<ext>
        """
        if timestamp is None:
            timestamp = datetime.now(timezone.utc)
        timestamp_str = timestamp.strftime('%Y%m%d_%H%M%S')

        # Keep only filesystem-safe characters from the target; whitespace
        # is filtered out here, so no trailing-strip is needed.
        safe_target = "".join(c for c in target if c.isalnum() or c in ('-', '_', '.'))

        extension_map = {
            'json': 'json',
            'txt': 'txt',
            'summary': 'txt',
            'targets': 'txt'
        }
        extension = extension_map.get(export_type, 'txt')

        return f"dnsrecon_{export_type}_{safe_target}_{timestamp_str}.{extension}"
class CustomJSONEncoder(json.JSONEncoder):
    """Custom JSON encoder to handle non-serializable objects."""

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for *obj*.

        Called by json.dumps only for objects the stock encoder rejects.
        Falls back to str(obj) so serialization never raises from here.
        """
        if isinstance(obj, datetime):
            return obj.isoformat()
        elif isinstance(obj, Enum):
            # Must precede the __dict__ branch: enum members carry an
            # instance __dict__, which would otherwise serialize their
            # internals (_name_/_value_) instead of their value.
            return obj.value
        elif isinstance(obj, (set, frozenset)):
            return list(obj)
        elif isinstance(obj, Decimal):
            return float(obj)
        elif hasattr(obj, '__dict__'):
            # For custom objects, try to serialize their dict representation.
            try:
                return obj.__dict__
            except Exception:
                return str(obj)
        elif hasattr(obj, 'value') and hasattr(obj, 'name'):
            # Enum-like objects without a __dict__ (e.g. using __slots__).
            return obj.value
        else:
            # For any other non-serializable object, convert to string.
            return str(obj)
# Global export manager instance: import and reuse this singleton rather
# than constructing ExportManager per call (the class is stateless).
export_manager = ExportManager()