dnscope/src/report_generator.py
overcuriousity 29e36e34be graph
2025-09-09 22:19:46 +02:00

451 lines
18 KiB
Python

# File: src/report_generator.py
"""Enhanced report generation with forensic details and discovery graph visualization."""
from datetime import datetime
from typing import Dict, Any, List, Set
from .data_structures import ForensicReconData, DiscoveryMethod, OperationType
import logging
logger = logging.getLogger(__name__)
class ForensicReportGenerator:
"""Generate comprehensive forensic reports with discovery provenance."""
def __init__(self, data: ForensicReconData):
self.data = data
def generate_text_report(self) -> str:
"""Generate comprehensive forensic text report."""
report = []
# Header
report.append("=" * 80)
report.append("FORENSIC DNS RECONNAISSANCE REPORT")
report.append("=" * 80)
report.append(f"Scan Start: {self.data.start_time}")
if self.data.end_time:
report.append(f"Scan End: {self.data.end_time}")
duration = self.data.end_time - self.data.start_time
report.append(f"Duration: {duration}")
report.append(f"Target: {self.data.scan_config.get('target', 'Unknown')}")
report.append(f"Max Depth: {self.data.scan_config.get('max_depth', 'Unknown')}")
report.append("")
# Executive Summary
report.append("EXECUTIVE SUMMARY")
report.append("-" * 40)
stats = self.data.get_stats()
report.append(f"Discovered Hostnames: {stats['hostnames']}")
report.append(f"IP Addresses Found: {stats['ip_addresses']}")
report.append(f"Operations Performed: {stats['operations_performed']}")
report.append(f"Discovery Relationships: {stats['discovery_edges']}")
report.append(f"DNS Records Collected: {stats['dns_records']}")
report.append(f"Total Certificates: {stats['certificates_total']}")
report.append(f" └─ Currently Valid: {stats['certificates_current']}")
report.append(f" └─ Expired: {stats['certificates_expired']}")
report.append(f"Shodan Results: {stats['shodan_results']}")
report.append(f"VirusTotal Results: {stats['virustotal_results']}")
report.append("")
# Discovery Graph Analysis
graph_analysis = self.data._generate_graph_analysis()
report.append("DISCOVERY GRAPH ANALYSIS")
report.append("-" * 40)
report.append(f"Maximum Discovery Depth: {graph_analysis['max_depth']}")
report.append(f"Root Nodes (Initial Targets): {len(graph_analysis['root_nodes'])}")
report.append(f"Leaf Nodes (No Further Discoveries): {len(graph_analysis['leaf_nodes'])}")
report.append("")
# Depth Distribution
report.append("Discovery Depth Distribution:")
for depth, count in sorted(graph_analysis['depth_distribution'].items()):
report.append(f" Depth {depth}: {count} hostnames")
report.append("")
# Discovery Methods Distribution
report.append("Discovery Methods Used:")
for method, count in sorted(graph_analysis['discovery_method_distribution'].items()):
report.append(f" {method}: {count} discoveries")
report.append("")
# Discovery Tree
report.append("DISCOVERY TREE")
report.append("-" * 40)
report.extend(self._generate_discovery_tree())
report.append("")
# Detailed Node Analysis
report.append("DETAILED NODE ANALYSIS")
report.append("-" * 40)
report.extend(self._generate_node_details())
report.append("")
# Operations Timeline
report.append("OPERATIONS TIMELINE")
report.append("-" * 40)
report.extend(self._generate_operations_timeline())
report.append("")
# Security Analysis
security_findings = self._analyze_security_findings()
if security_findings:
report.append("SECURITY ANALYSIS")
report.append("-" * 40)
report.extend(security_findings)
report.append("")
# Certificate Analysis
cert_analysis = self._analyze_certificates()
if cert_analysis:
report.append("CERTIFICATE ANALYSIS")
report.append("-" * 40)
report.extend(cert_analysis)
report.append("")
# DNS Record Analysis
report.append("DNS RECORD ANALYSIS")
report.append("-" * 40)
report.extend(self._analyze_dns_records())
report.append("")
return "\n".join(report)
def _generate_discovery_tree(self) -> List[str]:
"""Generate a tree view of hostname discoveries."""
tree_lines = []
# Find root nodes
graph_analysis = self.data._generate_graph_analysis()
root_nodes = graph_analysis['root_nodes']
if not root_nodes:
tree_lines.append("No root nodes found")
return tree_lines
# Generate tree for each root
for root in sorted(root_nodes):
tree_lines.extend(self._build_tree_branch(root, "", set()))
return tree_lines
def _build_tree_branch(self, hostname: str, prefix: str, visited: Set[str]) -> List[str]:
"""Build a tree branch for a hostname."""
lines = []
# Avoid cycles
if hostname in visited:
lines.append(f"{prefix}{hostname} [CYCLE]")
return lines
visited.add(hostname)
# Get node info
node = self.data.get_node(hostname)
if not node:
lines.append(f"{prefix}{hostname} [NO NODE DATA]")
return lines
# Node info
node_info = f"{hostname} (depth:{node.depth}"
if node.resolved_ips:
node_info += f", IPs:{len(node.resolved_ips)}"
if node.certificates:
valid_certs = len(node.get_current_certificates())
expired_certs = len(node.get_expired_certificates())
node_info += f", certs:{valid_certs}+{expired_certs}"
node_info += ")"
lines.append(f"{prefix}{node_info}")
# Get children
children = self.data.get_children(hostname)
children.sort()
for i, child in enumerate(children):
is_last = (i == len(children) - 1)
child_prefix = prefix + ("└── " if is_last else "├── ")
next_prefix = prefix + (" " if is_last else "")
# Find discovery method for this child
discovery_method = "unknown"
for edge in self.data.edges:
if edge.source_hostname == hostname and edge.target_hostname == child:
discovery_method = edge.discovery_method.value
break
lines.append(f"{child_prefix}[{discovery_method}]")
lines.extend(self._build_tree_branch(child, next_prefix, visited.copy()))
return lines
def _generate_node_details(self) -> List[str]:
"""Generate detailed analysis of each node."""
details = []
# Sort nodes by depth, then alphabetically
sorted_nodes = sorted(self.data.nodes.items(),
key=lambda x: (x[1].depth, x[0]))
for hostname, node in sorted_nodes:
details.append(f"\n{hostname} (Depth {node.depth})")
details.append("-" * (len(hostname) + 20))
# Discovery provenance
details.append(f"First Seen: {node.first_seen}")
details.append(f"Last Updated: {node.last_updated}")
details.append(f"Discovery Methods: {', '.join(m.value for m in node.discovery_methods)}")
# Discovery paths
paths = self.data.get_discovery_path(hostname)
if paths:
details.append("Discovery Paths:")
for i, path in enumerate(paths[:3]): # Show max 3 paths
path_str = " -> ".join([f"{src}[{method.value}]{tgt}" for src, tgt, method in path])
details.append(f" Path {i+1}: {path_str}")
if len(paths) > 3:
details.append(f" ... and {len(paths) - 3} more paths")
# DNS status
if node.dns_exists is not None:
status = "EXISTS" if node.dns_exists else "NOT FOUND"
details.append(f"DNS Status: {status} (checked: {node.last_dns_check})")
# IP addresses
if node.resolved_ips:
details.append(f"Resolved IPs: {', '.join(sorted(node.resolved_ips))}")
# Reverse DNS
if node.reverse_dns:
details.append(f"Reverse DNS: {node.reverse_dns}")
# DNS records summary
total_records = len(node.get_all_dns_records())
if total_records > 0:
record_types = list(node.dns_records_by_type.keys())
details.append(f"DNS Records: {total_records} records ({', '.join(sorted(record_types))})")
# Certificates summary
current_certs = len(node.get_current_certificates())
expired_certs = len(node.get_expired_certificates())
if current_certs > 0 or expired_certs > 0:
details.append(f"Certificates: {current_certs} valid, {expired_certs} expired")
# External results
if node.shodan_results:
details.append(f"Shodan: {len(node.shodan_results)} results")
if node.virustotal_results:
vt_detections = sum(r.positives for r in node.virustotal_results)
details.append(f"VirusTotal: {len(node.virustotal_results)} scans, {vt_detections} total detections")
return details
def _generate_operations_timeline(self) -> List[str]:
"""Generate operations timeline."""
timeline = []
# Sort operations by timestamp
sorted_ops = []
for op_id in self.data.operation_timeline:
if op_id in self.data.operations:
sorted_ops.append(self.data.operations[op_id])
# Group operations by type for summary
op_summary = {}
for op in sorted_ops:
op_type = op.operation_type.value
if op_type not in op_summary:
op_summary[op_type] = {'total': 0, 'successful': 0, 'failed': 0}
op_summary[op_type]['total'] += 1
if op.success:
op_summary[op_type]['successful'] += 1
else:
op_summary[op_type]['failed'] += 1
# Operations summary
timeline.append("Operations Summary:")
for op_type, counts in sorted(op_summary.items()):
timeline.append(f" {op_type}: {counts['successful']}/{counts['total']} successful")
timeline.append("")
# Recent operations (last 20)
timeline.append("Recent Operations (last 20):")
recent_ops = sorted_ops[-20:] if len(sorted_ops) > 20 else sorted_ops
for op in recent_ops:
timestamp = op.timestamp.strftime("%H:%M:%S.%f")[:-3]
status = "" if op.success else ""
target_short = op.target[:30] + "..." if len(op.target) > 30 else op.target
timeline.append(f" {timestamp} {status} {op.operation_type.value:15} {target_short}")
# Show key results
if op.discovered_hostnames:
hostname_list = ", ".join(op.discovered_hostnames[:3])
if len(op.discovered_hostnames) > 3:
hostname_list += f" (+{len(op.discovered_hostnames) - 3} more)"
timeline.append(f" └─ Discovered: {hostname_list}")
if op.error_message:
timeline.append(f" └─ Error: {op.error_message[:50]}...")
return timeline
def _analyze_security_findings(self) -> List[str]:
"""Analyze security-related findings."""
findings = []
# VirusTotal detections
high_risk_resources = []
medium_risk_resources = []
for node in self.data.nodes.values():
for vt_result in node.virustotal_results:
if vt_result.positives > 5:
high_risk_resources.append((node.hostname, vt_result))
elif vt_result.positives > 0:
medium_risk_resources.append((node.hostname, vt_result))
if high_risk_resources:
findings.append("🚨 HIGH RISK FINDINGS:")
for hostname, vt_result in high_risk_resources:
findings.append(f" {hostname}: {vt_result.positives}/{vt_result.total} detections")
findings.append(f" Report: {vt_result.permalink}")
if medium_risk_resources:
findings.append("⚠️ MEDIUM RISK FINDINGS:")
for hostname, vt_result in medium_risk_resources[:5]: # Show max 5
findings.append(f" {hostname}: {vt_result.positives}/{vt_result.total} detections")
if len(medium_risk_resources) > 5:
findings.append(f" ... and {len(medium_risk_resources) - 5} more resources with detections")
# Expired certificates still in use
nodes_with_expired_certs = []
for hostname, node in self.data.nodes.items():
expired = node.get_expired_certificates()
current = node.get_current_certificates()
if expired and not current: # Only expired certs, no valid ones
nodes_with_expired_certs.append((hostname, len(expired)))
if nodes_with_expired_certs:
findings.append("📜 CERTIFICATE ISSUES:")
for hostname, count in nodes_with_expired_certs:
findings.append(f" {hostname}: {count} expired certificates, no valid ones")
return findings
def _analyze_certificates(self) -> List[str]:
"""Analyze certificate findings."""
cert_analysis = []
# Certificate statistics
total_certs = 0
valid_certs = 0
expired_certs = 0
wildcard_certs = 0
cert_authorities = {}
for node in self.data.nodes.values():
for cert in node.certificates:
total_certs += 1
if cert.is_valid_now:
valid_certs += 1
else:
expired_certs += 1
if cert.is_wildcard:
wildcard_certs += 1
# Count certificate authorities
issuer_short = cert.issuer.split(',')[0] if ',' in cert.issuer else cert.issuer
cert_authorities[issuer_short] = cert_authorities.get(issuer_short, 0) + 1
if total_certs == 0:
cert_analysis.append("No certificates found.")
return cert_analysis
cert_analysis.append(f"Total Certificates: {total_certs}")
cert_analysis.append(f" Currently Valid: {valid_certs}")
cert_analysis.append(f" Expired: {expired_certs}")
cert_analysis.append(f" Wildcard Certificates: {wildcard_certs}")
cert_analysis.append("")
# Top certificate authorities
cert_analysis.append("Certificate Authorities:")
sorted_cas = sorted(cert_authorities.items(), key=lambda x: x[1], reverse=True)
for ca, count in sorted_cas[:5]:
cert_analysis.append(f" {ca}: {count} certificates")
cert_analysis.append("")
# Expiring soon (within 30 days)
from datetime import timedelta
soon = datetime.now() + timedelta(days=30)
expiring_soon = []
for hostname, node in self.data.nodes.items():
for cert in node.get_current_certificates():
if cert.not_after <= soon:
expiring_soon.append((hostname, cert.not_after, cert.id))
if expiring_soon:
cert_analysis.append("Certificates Expiring Soon (within 30 days):")
for hostname, expiry, cert_id in sorted(expiring_soon, key=lambda x: x[1]):
cert_analysis.append(f" {hostname}: expires {expiry.strftime('%Y-%m-%d')} (cert ID: {cert_id})")
return cert_analysis
def _analyze_dns_records(self) -> List[str]:
"""Analyze DNS record patterns."""
dns_analysis = []
# Record type distribution
record_type_counts = {}
total_records = 0
for node in self.data.nodes.values():
for record_type, records in node.dns_records_by_type.items():
record_type_counts[record_type] = record_type_counts.get(record_type, 0) + len(records)
total_records += len(records)
dns_analysis.append(f"Total DNS Records: {total_records}")
dns_analysis.append("Record Type Distribution:")
for record_type, count in sorted(record_type_counts.items()):
percentage = (count / total_records * 100) if total_records > 0 else 0
dns_analysis.append(f" {record_type}: {count} ({percentage:.1f}%)")
dns_analysis.append("")
# Interesting findings
interesting = []
# Multiple MX records
multi_mx_nodes = []
for hostname, node in self.data.nodes.items():
mx_records = node.dns_records_by_type.get('MX', [])
if len(mx_records) > 1:
multi_mx_nodes.append((hostname, len(mx_records)))
if multi_mx_nodes:
interesting.append("Multiple MX Records:")
for hostname, count in multi_mx_nodes:
interesting.append(f" {hostname}: {count} MX records")
# CAA records (security-relevant)
caa_nodes = []
for hostname, node in self.data.nodes.items():
if 'CAA' in node.dns_records_by_type:
caa_nodes.append(hostname)
if caa_nodes:
interesting.append(f"Domains with CAA Records: {len(caa_nodes)}")
for hostname in caa_nodes[:5]: # Show first 5
interesting.append(f" {hostname}")
if interesting:
dns_analysis.append("Interesting DNS Findings:")
dns_analysis.extend(interesting)
return dns_analysis
# Backward compatibility
ReportGenerator = ForensicReportGenerator