adjustments to shodan & export manager

overcuriousity 2025-09-18 19:22:58 +02:00
parent d4081e1a32
commit cbfd40ee98
2 changed files with 707 additions and 124 deletions


@@ -117,6 +117,9 @@ class ShodanProvider(BaseProvider):
        Returns:
            ProviderResult containing discovered relationships and attributes

        Raises:
            Exception: For temporary failures that should be retried (timeouts, 502/503 errors, connection issues)
        """
        if not _is_valid_ip(ip) or not self.is_available():
            return ProviderResult()
@@ -129,50 +132,117 @@ class ShodanProvider(BaseProvider):
        cache_file = self._get_cache_file_path(normalized_ip)
        cache_status = self._get_cache_status(cache_file)

        if cache_status == "fresh":
            self.logger.logger.debug(f"Using fresh cache for Shodan query: {normalized_ip}")
            return self._load_from_cache(cache_file)

        # Need to query API
        self.logger.logger.debug(f"Querying Shodan API for: {normalized_ip}")
        url = f"{self.base_url}/shodan/host/{normalized_ip}"
        params = {'key': self.api_key}

        try:
            response = self.make_request(url, method="GET", params=params, target_indicator=normalized_ip)

            if not response:
                # Connection failed - use stale cache if available, otherwise retry
                if cache_status == "stale":
                    self.logger.logger.info(f"Using stale cache for {normalized_ip} due to connection failure")
                    return self._load_from_cache(cache_file)
                else:
                    raise requests.exceptions.RequestException("No response from Shodan API - should retry")

            if response.status_code == 200:
                self.logger.logger.debug(f"Shodan returned data for {normalized_ip}")
                data = response.json()
                result = self._process_shodan_data(normalized_ip, data)
                self._save_to_cache(cache_file, result, data)
                return result
            elif response.status_code == 404:
                # 404 = "no information available" - successful but empty result, don't retry
                self.logger.logger.debug(f"Shodan has no information for {normalized_ip} (404)")
                result = ProviderResult() # Empty but successful result
                # Cache the empty result to avoid repeated queries
                self._save_to_cache(cache_file, result, {'shodan_status': 'no_information', 'status_code': 404})
                return result
            elif response.status_code in [401, 403]:
                # Authentication/authorization errors - permanent failures, don't retry
                self.logger.logger.error(f"Shodan API authentication failed for {normalized_ip} (HTTP {response.status_code})")
                return ProviderResult() # Empty result, don't retry
            elif response.status_code in [429]:
                # Rate limiting - should be handled by rate limiter, but if we get here, retry
                self.logger.logger.warning(f"Shodan API rate limited for {normalized_ip} (HTTP {response.status_code})")
                if cache_status == "stale":
                    self.logger.logger.info(f"Using stale cache for {normalized_ip} due to rate limiting")
                    return self._load_from_cache(cache_file)
                else:
                    raise requests.exceptions.RequestException(f"Shodan API rate limited (HTTP {response.status_code}) - should retry")
            elif response.status_code in [500, 502, 503, 504]:
                # Server errors - temporary failures that should be retried
                self.logger.logger.warning(f"Shodan API server error for {normalized_ip} (HTTP {response.status_code})")
                if cache_status == "stale":
                    self.logger.logger.info(f"Using stale cache for {normalized_ip} due to server error")
                    return self._load_from_cache(cache_file)
                else:
                    raise requests.exceptions.RequestException(f"Shodan API server error (HTTP {response.status_code}) - should retry")
            else:
                # Other HTTP error codes - treat as temporary failures
                self.logger.logger.warning(f"Shodan API returned unexpected status {response.status_code} for {normalized_ip}")
                if cache_status == "stale":
                    self.logger.logger.info(f"Using stale cache for {normalized_ip} due to unexpected API error")
                    return self._load_from_cache(cache_file)
                else:
                    raise requests.exceptions.RequestException(f"Shodan API error (HTTP {response.status_code}) - should retry")

        except requests.exceptions.Timeout:
            # Timeout errors - should be retried
            self.logger.logger.warning(f"Shodan API timeout for {normalized_ip}")
            if cache_status == "stale":
                self.logger.logger.info(f"Using stale cache for {normalized_ip} due to timeout")
                return self._load_from_cache(cache_file)
            else:
                raise # Re-raise timeout for retry
        except requests.exceptions.ConnectionError:
            # Connection errors - should be retried
            self.logger.logger.warning(f"Shodan API connection error for {normalized_ip}")
            if cache_status == "stale":
                self.logger.logger.info(f"Using stale cache for {normalized_ip} due to connection error")
                return self._load_from_cache(cache_file)
            else:
                raise # Re-raise connection error for retry
        except requests.exceptions.RequestException:
            # Other request exceptions - should be retried
            self.logger.logger.warning(f"Shodan API request exception for {normalized_ip}")
            if cache_status == "stale":
                self.logger.logger.info(f"Using stale cache for {normalized_ip} due to request exception")
                return self._load_from_cache(cache_file)
            else:
                raise # Re-raise request exception for retry
        except json.JSONDecodeError:
            # JSON parsing error on 200 response - treat as temporary failure
            self.logger.logger.error(f"Invalid JSON response from Shodan for {normalized_ip}")
            if cache_status == "stale":
                self.logger.logger.info(f"Using stale cache for {normalized_ip} due to JSON parsing error")
                return self._load_from_cache(cache_file)
            else:
                raise requests.exceptions.RequestException("Invalid JSON response from Shodan - should retry")
        except Exception as e:
            # Unexpected exceptions - log and treat as temporary failures
            self.logger.logger.error(f"Unexpected exception in Shodan query for {normalized_ip}: {e}")
            if cache_status == "stale":
                self.logger.logger.info(f"Using stale cache for {normalized_ip} due to unexpected exception")
                return self._load_from_cache(cache_file)
            else:
                raise requests.exceptions.RequestException(f"Unexpected error in Shodan query: {e}") from e
    def _load_from_cache(self, cache_file_path: Path) -> ProviderResult:
        """Load processed Shodan data from a cache file."""


@@ -3,21 +3,24 @@
"""
Centralized export functionality for DNSRecon.
Handles all data export operations with forensic integrity and proper formatting.
ENHANCED: Professional forensic executive summary generation for court-ready documentation.
"""
import json
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional, Set, Tuple
from decimal import Decimal
from collections import defaultdict, Counter
import networkx as nx
from utils.helpers import _is_valid_domain, _is_valid_ip


class ExportManager:
    """
    Centralized manager for all DNSRecon export operations.
    Maintains forensic integrity and provides consistent export formats.
    ENHANCED: Advanced forensic analysis and professional reporting capabilities.
    """

    def __init__(self):
@@ -84,105 +87,615 @@ class ExportManager:
    def generate_executive_summary(self, scanner) -> str:
        """
        ENHANCED: Generate a comprehensive, court-ready forensic executive summary.

        Args:
            scanner: Scanner instance with completed scan data

        Returns:
            Professional forensic summary formatted for investigative use
        """
        report = []
        now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')
        scan_metadata = scanner.get_scan_status()

        # Get comprehensive data for analysis
        graph_data = scanner.graph.get_graph_data()
        nodes = graph_data.get('nodes', [])
        edges = graph_data.get('edges', [])
        audit_trail = scanner.logger.export_audit_trail()

        # Perform advanced analysis
        infrastructure_analysis = self._analyze_infrastructure_patterns(nodes, edges)

        # === HEADER AND METADATA ===
        report.extend([
            "=" * 80,
            "DIGITAL INFRASTRUCTURE RECONNAISSANCE REPORT",
            "=" * 80,
            "",
            f"Report Generated: {now}",
            f"Investigation Target: {scanner.current_target}",
            f"Analysis Session: {scanner.session_id}",
            f"Scan Depth: {scanner.max_depth} levels",
            f"Final Status: {scanner.status.upper()}",
            ""
        ])

        # === EXECUTIVE SUMMARY ===
        report.extend([
            "EXECUTIVE SUMMARY",
            "-" * 40,
            "",
            f"This report presents the findings of a comprehensive passive reconnaissance analysis "
            f"conducted against the target '{scanner.current_target}'. The investigation employed "
            f"multiple intelligence sources and discovered {len(nodes)} distinct digital entities "
            f"connected through {len(edges)} verified relationships.",
            "",
            f"The analysis reveals a digital infrastructure comprising {infrastructure_analysis['domains']} "
            f"domain names, {infrastructure_analysis['ips']} IP addresses, and {infrastructure_analysis['isps']} "
            f"infrastructure service providers. Certificate transparency analysis identified "
            f"{infrastructure_analysis['cas']} certificate authorities managing the cryptographic "
            f"infrastructure for the investigated entities.",
            "",
        ])

        # === METHODOLOGY ===
        report.extend([
            "INVESTIGATIVE METHODOLOGY",
            "-" * 40,
            "",
            "This analysis employed passive reconnaissance techniques using the following verified data sources:",
            ""
        ])

        provider_info = {
            'dns': 'Standard DNS resolution and reverse DNS lookups',
            'crtsh': 'Certificate Transparency database analysis via crt.sh',
            'shodan': 'Internet-connected device intelligence via Shodan API'
        }

        for provider in scanner.providers:
            provider_name = provider.get_name()
            stats = provider.get_statistics()
            description = provider_info.get(provider_name, f'{provider_name} data provider')
            report.extend([
                f"{provider.get_display_name()}: {description}",
                f" - Total Requests: {stats['total_requests']}",
                f" - Success Rate: {stats['success_rate']:.1f}%",
                f" - Relationships Discovered: {stats['relationships_found']}",
                ""
            ])

        # === INFRASTRUCTURE ANALYSIS ===
        report.extend([
            "INFRASTRUCTURE ANALYSIS",
            "-" * 40,
            ""
        ])

        # Domain Analysis
        if infrastructure_analysis['domains'] > 0:
            report.extend([
                f"Domain Name Infrastructure ({infrastructure_analysis['domains']} entities):",
                ""
            ])
            domain_details = self._get_detailed_domain_analysis(nodes, edges)
            for domain_info in domain_details[:10]: # Top 10 domains
                report.extend([
                    f"{domain_info['domain']}",
                    f" - Type: {domain_info['classification']}",
                    f" - Connected IPs: {len(domain_info['ips'])}",
                    f" - Certificate Status: {domain_info['cert_status']}",
                    f" - Relationship Confidence: {domain_info['avg_confidence']:.2f}",
                ])
                if domain_info['security_notes']:
                    report.extend([
                        f" - Security Notes: {', '.join(domain_info['security_notes'])}",
                    ])
                report.append("")

        # IP Address Analysis
        if infrastructure_analysis['ips'] > 0:
            report.extend([
                f"IP Address Infrastructure ({infrastructure_analysis['ips']} entities):",
                ""
            ])
            ip_details = self._get_detailed_ip_analysis(nodes, edges)
            for ip_info in ip_details[:8]: # Top 8 IPs
                report.extend([
                    f"{ip_info['ip']} ({ip_info['version']})",
                    f" - Associated Domains: {len(ip_info['domains'])}",
                    f" - ISP: {ip_info['isp'] or 'Unknown'}",
                    f" - Geographic Location: {ip_info['location'] or 'Not determined'}",
                ])
                if ip_info['open_ports']:
                    report.extend([
                        f" - Exposed Services: {', '.join(map(str, ip_info['open_ports'][:5]))}"
                        + (f" (and {len(ip_info['open_ports']) - 5} more)" if len(ip_info['open_ports']) > 5 else ""),
                    ])
                report.append("")

        # === RELATIONSHIP ANALYSIS ===
        report.extend([
            "ENTITY RELATIONSHIP ANALYSIS",
            "-" * 40,
            ""
        ])

        # Network topology insights
        topology = self._analyze_network_topology(nodes, edges)
        report.extend([
            f"Network Topology Assessment:",
            f"• Central Hubs: {len(topology['hubs'])} entities serve as primary connection points",
            f"• Isolated Clusters: {len(topology['clusters'])} distinct groupings identified",
            f"• Relationship Density: {topology['density']:.3f} (0=sparse, 1=fully connected)",
            f"• Average Path Length: {topology['avg_path_length']:.2f} degrees of separation",
            ""
        ])

        # Key relationships
        key_relationships = self._identify_key_relationships(edges)
        if key_relationships:
            report.extend([
                "Critical Infrastructure Relationships:",
                ""
            ])
            for rel in key_relationships[:8]: # Top 8 relationships
                confidence_desc = self._describe_confidence(rel['confidence'])
                report.extend([
                    f"{rel['source']} -> {rel['target']}",
                    f" - Relationship: {self._humanize_relationship_type(rel['type'])}",
                    f" - Evidence Strength: {confidence_desc} ({rel['confidence']:.2f})",
                    f" - Discovery Method: {rel['provider']}",
                    ""
                ])

        # === CERTIFICATE ANALYSIS ===
        cert_analysis = self._analyze_certificate_infrastructure(nodes)
        if cert_analysis['total_certs'] > 0:
            report.extend([
                "CERTIFICATE INFRASTRUCTURE ANALYSIS",
                "-" * 40,
                "",
                f"Certificate Status Overview:",
                f"• Total Certificates Analyzed: {cert_analysis['total_certs']}",
                f"• Valid Certificates: {cert_analysis['valid']}",
                f"• Expired/Invalid: {cert_analysis['expired']}",
                f"• Certificate Authorities: {len(cert_analysis['cas'])}",
                ""
            ])
            if cert_analysis['cas']:
                report.extend([
                    "Certificate Authority Distribution:",
                    ""
                ])
                for ca, count in cert_analysis['cas'].most_common(5):
                    report.extend([
                        f"{ca}: {count} certificate(s)",
                    ])
                report.append("")

        # === TECHNICAL APPENDIX ===
        report.extend([
            "TECHNICAL APPENDIX",
            "-" * 40,
            "",
            "Data Quality Assessment:",
            f"• Total API Requests: {audit_trail.get('session_metadata', {}).get('total_requests', 0)}",
            f"• Data Providers Used: {len(audit_trail.get('session_metadata', {}).get('providers_used', []))}",
            f"• Relationship Confidence Distribution:",
        ])

        # Confidence distribution
        confidence_dist = self._calculate_confidence_distribution(edges)
        for level, count in confidence_dist.items():
            percentage = (count / len(edges) * 100) if edges else 0
            report.extend([
                f" - {level.title()} Confidence (≥{self._get_confidence_threshold(level)}): {count} ({percentage:.1f}%)",
            ])

        report.extend([
            "",
            "Correlation Analysis:",
            f"• Entity Correlations Identified: {len(scanner.graph.correlation_index)}",
            f"• Cross-Reference Validation: {self._count_cross_validated_relationships(edges)} relationships verified by multiple sources",
            ""
        ])

        # === CONCLUSION ===
        report.extend([
            "CONCLUSION",
            "-" * 40,
            "",
            self._generate_conclusion(scanner.current_target, infrastructure_analysis,
                                      len(edges)),
            "",
            "This analysis was conducted using passive reconnaissance techniques and represents "
            "the digital infrastructure observable through public data sources at the time of investigation. "
            "All findings are supported by verifiable technical evidence and documented through "
            "a complete audit trail maintained for forensic integrity.",
            "",
            f"Investigation completed: {now}",
            f"Report authenticated by: DNSRecon v{self._get_version()}",
            "",
            "=" * 80,
            "END OF REPORT",
            "=" * 80
        ])

        return "\n".join(report)
def _analyze_infrastructure_patterns(self, nodes: List[Dict], edges: List[Dict]) -> Dict[str, Any]:
"""Analyze infrastructure patterns and classify entities."""
analysis = {
'domains': len([n for n in nodes if n['type'] == 'domain']),
'ips': len([n for n in nodes if n['type'] == 'ip']),
'isps': len([n for n in nodes if n['type'] == 'isp']),
'cas': len([n for n in nodes if n['type'] == 'ca']),
'correlations': len([n for n in nodes if n['type'] == 'correlation_object'])
}
return analysis
def _get_detailed_domain_analysis(self, nodes: List[Dict], edges: List[Dict]) -> List[Dict[str, Any]]:
"""Generate detailed analysis for each domain."""
domain_nodes = [n for n in nodes if n['type'] == 'domain']
domain_analysis = []
for domain in domain_nodes:
# Find connected IPs
connected_ips = [e['to'] for e in edges
if e['from'] == domain['id'] and _is_valid_ip(e['to'])]
# Determine classification
classification = "Primary Domain"
if domain['id'].startswith('www.'):
classification = "Web Interface"
elif any(subdomain in domain['id'] for subdomain in ['api.', 'mail.', 'smtp.']):
classification = "Service Endpoint"
elif domain['id'].count('.') > 1:
classification = "Subdomain"
# Certificate status
cert_status = self._determine_certificate_status(domain)
# Security notes
security_notes = []
if cert_status == "Expired/Invalid":
security_notes.append("Certificate validation issues")
if len(connected_ips) == 0:
security_notes.append("No IP resolution found")
if len(connected_ips) > 5:
security_notes.append("Multiple IP endpoints")
# Average confidence
domain_edges = [e for e in edges if e['from'] == domain['id']]
avg_confidence = sum(e['confidence_score'] for e in domain_edges) / len(domain_edges) if domain_edges else 0
domain_analysis.append({
'domain': domain['id'],
'classification': classification,
'ips': connected_ips,
'cert_status': cert_status,
'security_notes': security_notes,
'avg_confidence': avg_confidence
})
# Sort by number of connections (most connected first)
return sorted(domain_analysis, key=lambda x: len(x['ips']), reverse=True)
def _get_detailed_ip_analysis(self, nodes: List[Dict], edges: List[Dict]) -> List[Dict[str, Any]]:
"""Generate detailed analysis for each IP address."""
ip_nodes = [n for n in nodes if n['type'] == 'ip']
ip_analysis = []
for ip in ip_nodes:
# Find connected domains
connected_domains = [e['from'] for e in edges
if e['to'] == ip['id'] and _is_valid_domain(e['from'])]
# Extract metadata from attributes
ip_version = "IPv4"
location = None
isp = None
open_ports = []
for attr in ip.get('attributes', []):
if attr.get('name') == 'country':
location = attr.get('value')
elif attr.get('name') == 'org':
isp = attr.get('value')
elif attr.get('name') == 'shodan_open_port':
open_ports.append(attr.get('value'))
elif 'ipv6' in str(attr.get('metadata', {})).lower():
ip_version = "IPv6"
# Find ISP from relationships
if not isp:
isp_edges = [e for e in edges if e['from'] == ip['id'] and e.get('label', '').endswith('_isp')]
isp = isp_edges[0]['to'] if isp_edges else None
ip_analysis.append({
'ip': ip['id'],
'version': ip_version,
'domains': connected_domains,
'isp': isp,
'location': location,
'open_ports': open_ports
})
# Sort by number of connected domains
return sorted(ip_analysis, key=lambda x: len(x['domains']), reverse=True)
def _analyze_network_topology(self, nodes: List[Dict], edges: List[Dict]) -> Dict[str, Any]:
"""Analyze network topology and identify key structural patterns."""
if not nodes or not edges:
return {'hubs': [], 'clusters': [], 'density': 0, 'avg_path_length': 0}
# Create NetworkX graph
G = nx.DiGraph()
for node in nodes:
G.add_node(node['id'])
for edge in edges:
G.add_edge(edge['from'], edge['to'])
# Convert to undirected for certain analyses
G_undirected = G.to_undirected()
# Identify hubs (nodes with high degree centrality)
centrality = nx.degree_centrality(G_undirected)
hub_threshold = max(centrality.values()) * 0.7 if centrality else 0
hubs = [node for node, cent in centrality.items() if cent >= hub_threshold]
# Find connected components (clusters)
clusters = list(nx.connected_components(G_undirected))
# Calculate density
density = nx.density(G_undirected)
# Calculate average path length (for largest component)
if G_undirected.number_of_nodes() > 1:
largest_cc = max(nx.connected_components(G_undirected), key=len)
subgraph = G_undirected.subgraph(largest_cc)
try:
avg_path_length = nx.average_shortest_path_length(subgraph)
except:
avg_path_length = 0
else:
avg_path_length = 0
return {
'hubs': hubs,
'clusters': clusters,
'density': density,
'avg_path_length': avg_path_length
}
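For reference, nx.density on the undirected view equals 2m / (n(n-1)) and nx.average_shortest_path_length is computed here only over the largest connected component, so the two figures quoted in the report can be sanity-checked on a toy graph (hypothetical example data):

    import networkx as nx

    # Toy check of the topology metrics used above.
    G = nx.Graph([("a.example.com", "192.0.2.1"), ("b.example.com", "192.0.2.1")])
    print(nx.density(G))                       # 2*2 / (3*2) = 0.667
    print(nx.average_shortest_path_length(G))  # (1 + 1 + 2) / 3 = 1.333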
def _identify_key_relationships(self, edges: List[Dict]) -> List[Dict[str, Any]]:
"""Identify the most significant relationships in the infrastructure."""
# Score relationships by confidence and type importance
relationship_importance = {
'dns_a_record': 0.9,
'dns_aaaa_record': 0.9,
'crtsh_cert_issuer': 0.8,
'shodan_isp': 0.8,
'crtsh_san_certificate': 0.7,
'dns_mx_record': 0.7,
'dns_ns_record': 0.7
}
scored_edges = []
for edge in edges:
base_confidence = edge.get('confidence_score', 0)
type_weight = relationship_importance.get(edge.get('label', ''), 0.5)
combined_score = (base_confidence * 0.7) + (type_weight * 0.3)
scored_edges.append({
'source': edge['from'],
'target': edge['to'],
'type': edge.get('label', ''),
'confidence': base_confidence,
'provider': edge.get('source_provider', ''),
'score': combined_score
})
# Return top relationships by score
return sorted(scored_edges, key=lambda x: x['score'], reverse=True)
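As a worked example of this weighting (illustrative data only): a dns_a_record edge with confidence 0.8 scores 0.8*0.7 + 0.9*0.3 = 0.83, while an edge of an unknown type with the same confidence falls back to the 0.5 weight and scores 0.71.

    # Illustrative only: reproduces the scoring used in _identify_key_relationships.
    edge = {'from': 'www.example.com', 'to': '192.0.2.1', 'label': 'dns_a_record',
            'confidence_score': 0.8, 'source_provider': 'dns'}
    score = (edge['confidence_score'] * 0.7) + (0.9 * 0.3)  # 0.9 = weight for dns_a_record
    print(round(score, 2))  # 0.83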
def _analyze_certificate_infrastructure(self, nodes: List[Dict]) -> Dict[str, Any]:
"""Analyze certificate infrastructure across all domains."""
domain_nodes = [n for n in nodes if n['type'] == 'domain']
ca_nodes = [n for n in nodes if n['type'] == 'ca']
valid_certs = 0
expired_certs = 0
total_certs = 0
cas = Counter()
for domain in domain_nodes:
for attr in domain.get('attributes', []):
if attr.get('name') == 'cert_is_currently_valid':
total_certs += 1
if attr.get('value') is True:
valid_certs += 1
else:
expired_certs += 1
elif attr.get('name') == 'cert_issuer_name':
issuer = attr.get('value')
if issuer:
cas[issuer] += 1
return {
'total_certs': total_certs,
'valid': valid_certs,
'expired': expired_certs,
'cas': cas
}
def _has_expired_certificates(self, domain_node: Dict) -> bool:
"""Check if domain has expired certificates."""
for attr in domain_node.get('attributes', []):
if (attr.get('name') == 'cert_is_currently_valid' and
attr.get('value') is False):
return True
return False
def _determine_certificate_status(self, domain_node: Dict) -> str:
"""Determine the certificate status for a domain."""
has_valid = False
has_expired = False
has_any = False
for attr in domain_node.get('attributes', []):
if attr.get('name') == 'cert_is_currently_valid':
has_any = True
if attr.get('value') is True:
has_valid = True
else:
has_expired = True
if not has_any:
return "No Certificate Data"
elif has_valid and not has_expired:
return "Valid"
elif has_expired and not has_valid:
return "Expired/Invalid"
else:
return "Mixed Status"
def _describe_confidence(self, confidence: float) -> str:
"""Convert confidence score to descriptive text."""
if confidence >= 0.9:
return "Very High"
elif confidence >= 0.8:
return "High"
elif confidence >= 0.6:
return "Medium"
elif confidence >= 0.4:
return "Low"
else:
return "Very Low"
def _humanize_relationship_type(self, rel_type: str) -> str:
"""Convert technical relationship types to human-readable descriptions."""
type_map = {
'dns_a_record': 'DNS A Record Resolution',
'dns_aaaa_record': 'DNS AAAA Record (IPv6) Resolution',
'dns_mx_record': 'Email Server (MX) Configuration',
'dns_ns_record': 'Name Server Delegation',
'dns_cname_record': 'DNS Alias (CNAME) Resolution',
'crtsh_cert_issuer': 'SSL Certificate Issuer Relationship',
'crtsh_san_certificate': 'Shared SSL Certificate',
'shodan_isp': 'Internet Service Provider Assignment',
'shodan_a_record': 'IP-to-Domain Resolution (Shodan)',
'dns_ptr_record': 'Reverse DNS Resolution'
}
return type_map.get(rel_type, rel_type.replace('_', ' ').title())
def _calculate_confidence_distribution(self, edges: List[Dict]) -> Dict[str, int]:
"""Calculate confidence score distribution."""
distribution = {'high': 0, 'medium': 0, 'low': 0}
for edge in edges:
confidence = edge.get('confidence_score', 0)
if confidence >= 0.8:
distribution['high'] += 1
elif confidence >= 0.6:
distribution['medium'] += 1
else:
distribution['low'] += 1
return distribution
def _get_confidence_threshold(self, level: str) -> str:
"""Get confidence threshold for a level."""
thresholds = {'high': '0.80', 'medium': '0.60', 'low': '0.00'}
return thresholds.get(level, '0.00')
def _count_cross_validated_relationships(self, edges: List[Dict]) -> int:
"""Count relationships verified by multiple providers."""
# Group edges by source-target pair
edge_pairs = defaultdict(list)
for edge in edges:
pair_key = f"{edge['from']}->{edge['to']}"
edge_pairs[pair_key].append(edge.get('source_provider', ''))
# Count pairs with multiple providers
cross_validated = 0
for pair, providers in edge_pairs.items():
if len(set(providers)) > 1: # Multiple unique providers
cross_validated += 1
return cross_validated
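For instance (hypothetical data), the same example.com -> 192.0.2.1 pair reported by both the dns and shodan providers counts once as cross-validated, while a pair seen by a single provider does not:

    from collections import defaultdict

    # Illustrative only: mirrors the counting logic of _count_cross_validated_relationships.
    edges = [
        {'from': 'example.com', 'to': '192.0.2.1', 'source_provider': 'dns'},
        {'from': 'example.com', 'to': '192.0.2.1', 'source_provider': 'shodan'},
        {'from': 'example.com', 'to': '192.0.2.2', 'source_provider': 'dns'},
    ]
    pairs = defaultdict(set)
    for e in edges:
        pairs[(e['from'], e['to'])].add(e['source_provider'])
    print(sum(1 for providers in pairs.values() if len(providers) > 1))  # 1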
def _generate_security_recommendations(self, infrastructure_analysis: Dict) -> List[str]:
"""Generate actionable security recommendations."""
recommendations = []
# Check for complex infrastructure
if infrastructure_analysis['ips'] > 10:
recommendations.append(
"Document and validate the necessity of extensive IP address infrastructure"
)
if infrastructure_analysis['correlations'] > 5:
recommendations.append(
"Investigate shared infrastructure components for operational security implications"
)
if not recommendations:
recommendations.append(
"Continue monitoring for changes in the identified digital infrastructure"
)
return recommendations
def _generate_conclusion(self, target: str, infrastructure_analysis: Dict, total_relationships: int) -> str:
"""Generate a professional conclusion for the report."""
conclusion_parts = [
f"The passive reconnaissance analysis of '{target}' has successfully mapped "
f"a digital infrastructure ecosystem consisting of {infrastructure_analysis['domains']} "
f"domain names, {infrastructure_analysis['ips']} IP addresses, and "
f"{total_relationships} verified inter-entity relationships."
]
conclusion_parts.append(
"All findings in this report are based on publicly available information and "
"passive reconnaissance techniques. The analysis maintains full forensic integrity "
"with complete audit trails for all data collection activities."
)
return " ".join(conclusion_parts)
def _count_bidirectional_relationships(self, graph) -> int:
"""Count bidirectional relationships in the graph."""
count = 0
for u, v in graph.edges():
if graph.has_edge(v, u):
count += 1
return count // 2 # Each pair counted twice
def _identify_hub_nodes(self, graph, nodes: List[Dict]) -> List[str]:
"""Identify nodes that serve as major hubs in the network."""
if not graph.nodes():
return []
degree_centrality = nx.degree_centrality(graph.to_undirected())
threshold = max(degree_centrality.values()) * 0.8 if degree_centrality else 0
return [node for node, centrality in degree_centrality.items()
if centrality >= threshold]
def _get_version(self) -> str:
"""Get DNSRecon version for report authentication."""
return "1.0.0-forensic"
    def export_graph_json(self, graph_manager) -> Dict[str, Any]:
        """