From cbfd40ee98b10f931961fdc6c903d02a1a3d64f8 Mon Sep 17 00:00:00 2001 From: overcuriousity Date: Thu, 18 Sep 2025 19:22:58 +0200 Subject: [PATCH] adjustments to shodan & export manager --- providers/shodan_provider.py | 148 ++++++-- utils/export_manager.py | 683 ++++++++++++++++++++++++++++++----- 2 files changed, 707 insertions(+), 124 deletions(-) diff --git a/providers/shodan_provider.py b/providers/shodan_provider.py index 31e3ee2..6695740 100644 --- a/providers/shodan_provider.py +++ b/providers/shodan_provider.py @@ -117,6 +117,9 @@ class ShodanProvider(BaseProvider): Returns: ProviderResult containing discovered relationships and attributes + + Raises: + Exception: For temporary failures that should be retried (timeouts, 502/503 errors, connection issues) """ if not _is_valid_ip(ip) or not self.is_available(): return ProviderResult() @@ -129,50 +132,117 @@ class ShodanProvider(BaseProvider): cache_file = self._get_cache_file_path(normalized_ip) cache_status = self._get_cache_status(cache_file) - result = ProviderResult() + if cache_status == "fresh": + self.logger.logger.debug(f"Using fresh cache for Shodan query: {normalized_ip}") + return self._load_from_cache(cache_file) + + # Need to query API + self.logger.logger.debug(f"Querying Shodan API for: {normalized_ip}") + url = f"{self.base_url}/shodan/host/{normalized_ip}" + params = {'key': self.api_key} try: - if cache_status == "fresh": - result = self._load_from_cache(cache_file) - self.logger.logger.info(f"Using cached Shodan data for {normalized_ip}") - else: # "stale" or "not_found" - url = f"{self.base_url}/shodan/host/{normalized_ip}" - params = {'key': self.api_key} - response = self.make_request(url, method="GET", params=params, target_indicator=normalized_ip) - - if response and response.status_code == 200: - data = response.json() - # Process the data into ProviderResult BEFORE caching - result = self._process_shodan_data(normalized_ip, data) - self._save_to_cache(cache_file, result, data) # Save both result and raw data - elif response and response.status_code == 404: - # Handle all 404s as successful "no information available" responses - # Shodan returns 404 when no information is available for an IP - self.logger.logger.debug(f"Shodan has no information for {normalized_ip}") - result = ProviderResult() # Empty but successful result - # Cache the empty result to avoid repeated queries - self._save_to_cache(cache_file, result, {'error': 'No information available'}) - elif cache_status == "stale": - # If API fails on a stale cache, use the old data - result = self._load_from_cache(cache_file) - self.logger.logger.info(f"Using stale cache for {normalized_ip} due to API failure") + response = self.make_request(url, method="GET", params=params, target_indicator=normalized_ip) + + if not response: + # Connection failed - use stale cache if available, otherwise retry + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to connection failure") + return self._load_from_cache(cache_file) else: - # Other HTTP error codes should be treated as failures - status_code = response.status_code if response else "No response" - raise requests.exceptions.RequestException(f"Shodan API returned HTTP {status_code}") - - except requests.exceptions.RequestException as e: - self.logger.logger.debug(f"Shodan API error for {normalized_ip}: {e}") - if cache_status == "stale": - # Use stale cache if available - result = self._load_from_cache(cache_file) - self.logger.logger.info(f"Using stale cache 
for {normalized_ip} due to API error") + raise requests.exceptions.RequestException("No response from Shodan API - should retry") + + if response.status_code == 200: + self.logger.logger.debug(f"Shodan returned data for {normalized_ip}") + data = response.json() + result = self._process_shodan_data(normalized_ip, data) + self._save_to_cache(cache_file, result, data) + return result + + elif response.status_code == 404: + # 404 = "no information available" - successful but empty result, don't retry + self.logger.logger.debug(f"Shodan has no information for {normalized_ip} (404)") + result = ProviderResult() # Empty but successful result + # Cache the empty result to avoid repeated queries + self._save_to_cache(cache_file, result, {'shodan_status': 'no_information', 'status_code': 404}) + return result + + elif response.status_code in [401, 403]: + # Authentication/authorization errors - permanent failures, don't retry + self.logger.logger.error(f"Shodan API authentication failed for {normalized_ip} (HTTP {response.status_code})") + return ProviderResult() # Empty result, don't retry + + elif response.status_code in [429]: + # Rate limiting - should be handled by rate limiter, but if we get here, retry + self.logger.logger.warning(f"Shodan API rate limited for {normalized_ip} (HTTP {response.status_code})") + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to rate limiting") + return self._load_from_cache(cache_file) + else: + raise requests.exceptions.RequestException(f"Shodan API rate limited (HTTP {response.status_code}) - should retry") + + elif response.status_code in [500, 502, 503, 504]: + # Server errors - temporary failures that should be retried + self.logger.logger.warning(f"Shodan API server error for {normalized_ip} (HTTP {response.status_code})") + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to server error") + return self._load_from_cache(cache_file) + else: + raise requests.exceptions.RequestException(f"Shodan API server error (HTTP {response.status_code}) - should retry") + else: - # FIXED: Only re-raise for actual network/timeout errors, not 404s - # 404s are already handled above as successful empty results - raise e + # Other HTTP error codes - treat as temporary failures + self.logger.logger.warning(f"Shodan API returned unexpected status {response.status_code} for {normalized_ip}") + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to unexpected API error") + return self._load_from_cache(cache_file) + else: + raise requests.exceptions.RequestException(f"Shodan API error (HTTP {response.status_code}) - should retry") + + except requests.exceptions.Timeout: + # Timeout errors - should be retried + self.logger.logger.warning(f"Shodan API timeout for {normalized_ip}") + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to timeout") + return self._load_from_cache(cache_file) + else: + raise # Re-raise timeout for retry - return result + except requests.exceptions.ConnectionError: + # Connection errors - should be retried + self.logger.logger.warning(f"Shodan API connection error for {normalized_ip}") + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to connection error") + return self._load_from_cache(cache_file) + else: + raise # Re-raise connection error for retry + + except requests.exceptions.RequestException: + # Other 
request exceptions - should be retried + self.logger.logger.warning(f"Shodan API request exception for {normalized_ip}") + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to request exception") + return self._load_from_cache(cache_file) + else: + raise # Re-raise request exception for retry + + except json.JSONDecodeError: + # JSON parsing error on 200 response - treat as temporary failure + self.logger.logger.error(f"Invalid JSON response from Shodan for {normalized_ip}") + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to JSON parsing error") + return self._load_from_cache(cache_file) + else: + raise requests.exceptions.RequestException("Invalid JSON response from Shodan - should retry") + + except Exception as e: + # Unexpected exceptions - log and treat as temporary failures + self.logger.logger.error(f"Unexpected exception in Shodan query for {normalized_ip}: {e}") + if cache_status == "stale": + self.logger.logger.info(f"Using stale cache for {normalized_ip} due to unexpected exception") + return self._load_from_cache(cache_file) + else: + raise requests.exceptions.RequestException(f"Unexpected error in Shodan query: {e}") from e def _load_from_cache(self, cache_file_path: Path) -> ProviderResult: """Load processed Shodan data from a cache file.""" diff --git a/utils/export_manager.py b/utils/export_manager.py index 5bd6eb9..15cf965 100644 --- a/utils/export_manager.py +++ b/utils/export_manager.py @@ -3,21 +3,24 @@ """ Centralized export functionality for DNSRecon. Handles all data export operations with forensic integrity and proper formatting. +ENHANCED: Professional forensic executive summary generation for court-ready documentation. """ import json from datetime import datetime, timezone -from typing import Dict, Any, List, Optional +from typing import Dict, Any, List, Optional, Set, Tuple from decimal import Decimal +from collections import defaultdict, Counter +import networkx as nx from utils.helpers import _is_valid_domain, _is_valid_ip -import networkx as nx class ExportManager: """ Centralized manager for all DNSRecon export operations. Maintains forensic integrity and provides consistent export formats. + ENHANCED: Advanced forensic analysis and professional reporting capabilities. """ def __init__(self): @@ -84,105 +87,615 @@ class ExportManager: def generate_executive_summary(self, scanner) -> str: """ - Generate a natural-language executive summary of scan results. + ENHANCED: Generate a comprehensive, court-ready forensic executive summary. 
Args: scanner: Scanner instance with completed scan data Returns: - Formatted executive summary text + Professional forensic summary formatted for investigative use """ - summary = [] - now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z') - scan_metadata = scanner.get_scan_status() + report = [] + now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC') + + # Get comprehensive data for analysis graph_data = scanner.graph.get_graph_data() nodes = graph_data.get('nodes', []) edges = graph_data.get('edges', []) - - summary.append(f"DNSRecon Executive Summary") - summary.append(f"Report Generated: {now}") - summary.append("="*40) - - # Scan Overview - summary.append("\n## Scan Overview") - summary.append(f"- Initial Target: {scanner.current_target}") - summary.append(f"- Scan Status: {scanner.status.capitalize()}") - summary.append(f"- Analysis Depth: {scanner.max_depth}") - summary.append(f"- Total Indicators Found: {len(nodes)}") - summary.append(f"- Total Relationships Discovered: {len(edges)}") - - # Key Findings - summary.append("\n## Key Findings") - domains = [n for n in nodes if n['type'] == 'domain'] - ips = [n for n in nodes if n['type'] == 'ip'] - isps = [n for n in nodes if n['type'] == 'isp'] - cas = [n for n in nodes if n['type'] == 'ca'] - - summary.append(f"- Discovered {len(domains)} unique domain(s).") - summary.append(f"- Identified {len(ips)} unique IP address(es).") - if isps: - summary.append(f"- Infrastructure is hosted across {len(isps)} unique ISP(s).") - if cas: - summary.append(f"- Found certificates issued by {len(cas)} unique Certificate Authorit(y/ies).") - - # Detailed Findings - summary.append("\n## Detailed Findings") + audit_trail = scanner.logger.export_audit_trail() + + # Perform advanced analysis + infrastructure_analysis = self._analyze_infrastructure_patterns(nodes, edges) + + # === HEADER AND METADATA === + report.extend([ + "=" * 80, + "DIGITAL INFRASTRUCTURE RECONNAISSANCE REPORT", + "=" * 80, + "", + f"Report Generated: {now}", + f"Investigation Target: {scanner.current_target}", + f"Analysis Session: {scanner.session_id}", + f"Scan Depth: {scanner.max_depth} levels", + f"Final Status: {scanner.status.upper()}", + "" + ]) + + # === EXECUTIVE SUMMARY === + report.extend([ + "EXECUTIVE SUMMARY", + "-" * 40, + "", + f"This report presents the findings of a comprehensive passive reconnaissance analysis " + f"conducted against the target '{scanner.current_target}'. The investigation employed " + f"multiple intelligence sources and discovered {len(nodes)} distinct digital entities " + f"connected through {len(edges)} verified relationships.", + "", + f"The analysis reveals a digital infrastructure comprising {infrastructure_analysis['domains']} " + f"domain names, {infrastructure_analysis['ips']} IP addresses, and {infrastructure_analysis['isps']} " + f"infrastructure service providers. 
Certificate transparency analysis identified " + f"{infrastructure_analysis['cas']} certificate authorities managing the cryptographic " + f"infrastructure for the investigated entities.", + "", + ]) + + # === METHODOLOGY === + report.extend([ + "INVESTIGATIVE METHODOLOGY", + "-" * 40, + "", + "This analysis employed passive reconnaissance techniques using the following verified data sources:", + "" + ]) + + provider_info = { + 'dns': 'Standard DNS resolution and reverse DNS lookups', + 'crtsh': 'Certificate Transparency database analysis via crt.sh', + 'shodan': 'Internet-connected device intelligence via Shodan API' + } + + for provider in scanner.providers: + provider_name = provider.get_name() + stats = provider.get_statistics() + description = provider_info.get(provider_name, f'{provider_name} data provider') + + report.extend([ + f"• {provider.get_display_name()}: {description}", + f" - Total Requests: {stats['total_requests']}", + f" - Success Rate: {stats['success_rate']:.1f}%", + f" - Relationships Discovered: {stats['relationships_found']}", + "" + ]) + + # === INFRASTRUCTURE ANALYSIS === + report.extend([ + "INFRASTRUCTURE ANALYSIS", + "-" * 40, + "" + ]) # Domain Analysis - if domains: - summary.append("\n### Domain Analysis") - for domain in domains[:5]: # Report on first 5 - summary.append(f"\n- Domain: {domain['id']}") + if infrastructure_analysis['domains'] > 0: + report.extend([ + f"Domain Name Infrastructure ({infrastructure_analysis['domains']} entities):", + "" + ]) + + domain_details = self._get_detailed_domain_analysis(nodes, edges) + for domain_info in domain_details[:10]: # Top 10 domains + report.extend([ + f"• {domain_info['domain']}", + f" - Type: {domain_info['classification']}", + f" - Connected IPs: {len(domain_info['ips'])}", + f" - Certificate Status: {domain_info['cert_status']}", + f" - Relationship Confidence: {domain_info['avg_confidence']:.2f}", + ]) - # Associated IPs - associated_ips = [edge['to'] for edge in edges - if edge['from'] == domain['id'] and _is_valid_ip(edge['to'])] - if associated_ips: - summary.append(f" - Associated IPs: {', '.join(associated_ips)}") - - # Certificate info - cert_attributes = [attr for attr in domain.get('attributes', []) - if attr.get('name', '').startswith('cert_')] - if cert_attributes: - issuer = next((attr['value'] for attr in cert_attributes - if attr['name'] == 'cert_issuer_name'), 'N/A') - valid_until = next((attr['value'] for attr in cert_attributes - if attr['name'] == 'cert_not_after'), 'N/A') - summary.append(f" - Certificate Issuer: {issuer}") - summary.append(f" - Certificate Valid Until: {valid_until}") - + if domain_info['security_notes']: + report.extend([ + f" - Security Notes: {', '.join(domain_info['security_notes'])}", + ]) + report.append("") + # IP Address Analysis - if ips: - summary.append("\n### IP Address Analysis") - for ip in ips[:5]: # Report on first 5 - summary.append(f"\n- IP Address: {ip['id']}") + if infrastructure_analysis['ips'] > 0: + report.extend([ + f"IP Address Infrastructure ({infrastructure_analysis['ips']} entities):", + "" + ]) + + ip_details = self._get_detailed_ip_analysis(nodes, edges) + for ip_info in ip_details[:8]: # Top 8 IPs + report.extend([ + f"• {ip_info['ip']} ({ip_info['version']})", + f" - Associated Domains: {len(ip_info['domains'])}", + f" - ISP: {ip_info['isp'] or 'Unknown'}", + f" - Geographic Location: {ip_info['location'] or 'Not determined'}", + ]) - # Hostnames - hostnames = [edge['to'] for edge in edges - if edge['from'] == ip['id'] and 
_is_valid_domain(edge['to'])] - if hostnames: - summary.append(f" - Associated Hostnames: {', '.join(hostnames)}") - - # ISP - isp_edge = next((edge for edge in edges - if edge['from'] == ip['id'] and - any(node['id'] == edge['to'] and node['type'] == 'isp' - for node in nodes)), None) - if isp_edge: - summary.append(f" - ISP: {isp_edge['to']}") - - # Data Sources - summary.append("\n## Data Sources") - provider_stats = scanner.logger.get_forensic_summary().get('provider_statistics', {}) - for provider, stats in provider_stats.items(): - relationships = stats.get('relationships_discovered', 0) - requests = stats.get('successful_requests', 0) - summary.append(f"- {provider.capitalize()}: {relationships} relationships from {requests} requests.") + if ip_info['open_ports']: + report.extend([ + f" - Exposed Services: {', '.join(map(str, ip_info['open_ports'][:5]))}" + + (f" (and {len(ip_info['open_ports']) - 5} more)" if len(ip_info['open_ports']) > 5 else ""), + ]) + report.append("") - summary.append("\n" + "="*40) - summary.append("End of Report") + # === RELATIONSHIP ANALYSIS === + report.extend([ + "ENTITY RELATIONSHIP ANALYSIS", + "-" * 40, + "" + ]) - return "\n".join(summary) + # Network topology insights + topology = self._analyze_network_topology(nodes, edges) + report.extend([ + f"Network Topology Assessment:", + f"• Central Hubs: {len(topology['hubs'])} entities serve as primary connection points", + f"• Isolated Clusters: {len(topology['clusters'])} distinct groupings identified", + f"• Relationship Density: {topology['density']:.3f} (0=sparse, 1=fully connected)", + f"• Average Path Length: {topology['avg_path_length']:.2f} degrees of separation", + "" + ]) + + # Key relationships + key_relationships = self._identify_key_relationships(edges) + if key_relationships: + report.extend([ + "Critical Infrastructure Relationships:", + "" + ]) + + for rel in key_relationships[:8]: # Top 8 relationships + confidence_desc = self._describe_confidence(rel['confidence']) + report.extend([ + f"• {rel['source']} → {rel['target']}", + f" - Relationship: {self._humanize_relationship_type(rel['type'])}", + f" - Evidence Strength: {confidence_desc} ({rel['confidence']:.2f})", + f" - Discovery Method: {rel['provider']}", + "" + ]) + + # === CERTIFICATE ANALYSIS === + cert_analysis = self._analyze_certificate_infrastructure(nodes) + if cert_analysis['total_certs'] > 0: + report.extend([ + "CERTIFICATE INFRASTRUCTURE ANALYSIS", + "-" * 40, + "", + f"Certificate Status Overview:", + f"• Total Certificates Analyzed: {cert_analysis['total_certs']}", + f"• Valid Certificates: {cert_analysis['valid']}", + f"• Expired/Invalid: {cert_analysis['expired']}", + f"• Certificate Authorities: {len(cert_analysis['cas'])}", + "" + ]) + + if cert_analysis['cas']: + report.extend([ + "Certificate Authority Distribution:", + "" + ]) + for ca, count in cert_analysis['cas'].most_common(5): + report.extend([ + f"• {ca}: {count} certificate(s)", + ]) + report.append("") + + + # === TECHNICAL APPENDIX === + report.extend([ + "TECHNICAL APPENDIX", + "-" * 40, + "", + "Data Quality Assessment:", + f"• Total API Requests: {audit_trail.get('session_metadata', {}).get('total_requests', 0)}", + f"• Data Providers Used: {len(audit_trail.get('session_metadata', {}).get('providers_used', []))}", + f"• Relationship Confidence Distribution:", + ]) + + # Confidence distribution + confidence_dist = self._calculate_confidence_distribution(edges) + for level, count in confidence_dist.items(): + percentage = (count / len(edges) * 100) if 
edges else 0 + report.extend([ + f" - {level.title()} Confidence (≥{self._get_confidence_threshold(level)}): {count} ({percentage:.1f}%)", + ]) + + report.extend([ + "", + "Correlation Analysis:", + f"• Entity Correlations Identified: {len(scanner.graph.correlation_index)}", + f"• Cross-Reference Validation: {self._count_cross_validated_relationships(edges)} relationships verified by multiple sources", + "" + ]) + + # === CONCLUSION === + report.extend([ + "CONCLUSION", + "-" * 40, + "", + self._generate_conclusion(scanner.current_target, infrastructure_analysis, + len(edges)), + "", + "This analysis was conducted using passive reconnaissance techniques and represents " + "the digital infrastructure observable through public data sources at the time of investigation. " + "All findings are supported by verifiable technical evidence and documented through " + "a complete audit trail maintained for forensic integrity.", + "", + f"Investigation completed: {now}", + f"Report authenticated by: DNSRecon v{self._get_version()}", + "", + "=" * 80, + "END OF REPORT", + "=" * 80 + ]) + + return "\n".join(report) + + def _analyze_infrastructure_patterns(self, nodes: List[Dict], edges: List[Dict]) -> Dict[str, Any]: + """Analyze infrastructure patterns and classify entities.""" + analysis = { + 'domains': len([n for n in nodes if n['type'] == 'domain']), + 'ips': len([n for n in nodes if n['type'] == 'ip']), + 'isps': len([n for n in nodes if n['type'] == 'isp']), + 'cas': len([n for n in nodes if n['type'] == 'ca']), + 'correlations': len([n for n in nodes if n['type'] == 'correlation_object']) + } + return analysis + + def _get_detailed_domain_analysis(self, nodes: List[Dict], edges: List[Dict]) -> List[Dict[str, Any]]: + """Generate detailed analysis for each domain.""" + domain_nodes = [n for n in nodes if n['type'] == 'domain'] + domain_analysis = [] + + for domain in domain_nodes: + # Find connected IPs + connected_ips = [e['to'] for e in edges + if e['from'] == domain['id'] and _is_valid_ip(e['to'])] + + # Determine classification + classification = "Primary Domain" + if domain['id'].startswith('www.'): + classification = "Web Interface" + elif any(subdomain in domain['id'] for subdomain in ['api.', 'mail.', 'smtp.']): + classification = "Service Endpoint" + elif domain['id'].count('.') > 1: + classification = "Subdomain" + + # Certificate status + cert_status = self._determine_certificate_status(domain) + + # Security notes + security_notes = [] + if cert_status == "Expired/Invalid": + security_notes.append("Certificate validation issues") + if len(connected_ips) == 0: + security_notes.append("No IP resolution found") + if len(connected_ips) > 5: + security_notes.append("Multiple IP endpoints") + + # Average confidence + domain_edges = [e for e in edges if e['from'] == domain['id']] + avg_confidence = sum(e['confidence_score'] for e in domain_edges) / len(domain_edges) if domain_edges else 0 + + domain_analysis.append({ + 'domain': domain['id'], + 'classification': classification, + 'ips': connected_ips, + 'cert_status': cert_status, + 'security_notes': security_notes, + 'avg_confidence': avg_confidence + }) + + # Sort by number of connections (most connected first) + return sorted(domain_analysis, key=lambda x: len(x['ips']), reverse=True) + + def _get_detailed_ip_analysis(self, nodes: List[Dict], edges: List[Dict]) -> List[Dict[str, Any]]: + """Generate detailed analysis for each IP address.""" + ip_nodes = [n for n in nodes if n['type'] == 'ip'] + ip_analysis = [] + + for ip in ip_nodes: + # 
Find connected domains
+            connected_domains = [e['from'] for e in edges
+                                 if e['to'] == ip['id'] and _is_valid_domain(e['from'])]
+
+            # Extract metadata from attributes
+            ip_version = "IPv4"
+            location = None
+            isp = None
+            open_ports = []
+
+            for attr in ip.get('attributes', []):
+                if attr.get('name') == 'country':
+                    location = attr.get('value')
+                elif attr.get('name') == 'org':
+                    isp = attr.get('value')
+                elif attr.get('name') == 'shodan_open_port':
+                    open_ports.append(attr.get('value'))
+                elif 'ipv6' in str(attr.get('metadata', {})).lower():
+                    ip_version = "IPv6"
+
+            # Find ISP from relationships (tolerate edges without a 'label' key)
+            if not isp:
+                isp_edges = [e for e in edges if e['from'] == ip['id'] and e.get('label', '').endswith('_isp')]
+                isp = isp_edges[0]['to'] if isp_edges else None
+
+            ip_analysis.append({
+                'ip': ip['id'],
+                'version': ip_version,
+                'domains': connected_domains,
+                'isp': isp,
+                'location': location,
+                'open_ports': open_ports
+            })
+
+        # Sort by number of connected domains
+        return sorted(ip_analysis, key=lambda x: len(x['domains']), reverse=True)
+
+    def _analyze_network_topology(self, nodes: List[Dict], edges: List[Dict]) -> Dict[str, Any]:
+        """Analyze network topology and identify key structural patterns."""
+        if not nodes or not edges:
+            return {'hubs': [], 'clusters': [], 'density': 0, 'avg_path_length': 0}
+
+        # Create NetworkX graph
+        G = nx.DiGraph()
+        for node in nodes:
+            G.add_node(node['id'])
+        for edge in edges:
+            G.add_edge(edge['from'], edge['to'])
+
+        # Convert to undirected for certain analyses
+        G_undirected = G.to_undirected()
+
+        # Identify hubs (nodes with high degree centrality)
+        centrality = nx.degree_centrality(G_undirected)
+        hub_threshold = max(centrality.values()) * 0.7 if centrality else 0
+        hubs = [node for node, cent in centrality.items() if cent >= hub_threshold]
+
+        # Find connected components (clusters)
+        clusters = list(nx.connected_components(G_undirected))
+
+        # Calculate density
+        density = nx.density(G_undirected)
+
+        # Calculate average path length (for largest component)
+        if G_undirected.number_of_nodes() > 1:
+            largest_cc = max(nx.connected_components(G_undirected), key=len)
+            subgraph = G_undirected.subgraph(largest_cc)
+            try:
+                avg_path_length = nx.average_shortest_path_length(subgraph)
+            except Exception:  # metric undefined for degenerate graphs
+                avg_path_length = 0
+        else:
+            avg_path_length = 0
+
+        return {
+            'hubs': hubs,
+            'clusters': clusters,
+            'density': density,
+            'avg_path_length': avg_path_length
+        }
+
+    def _identify_key_relationships(self, edges: List[Dict]) -> List[Dict[str, Any]]:
+        """Identify the most significant relationships in the infrastructure."""
+        # Score relationships by confidence and type importance
+        relationship_importance = {
+            'dns_a_record': 0.9,
+            'dns_aaaa_record': 0.9,
+            'crtsh_cert_issuer': 0.8,
+            'shodan_isp': 0.8,
+            'crtsh_san_certificate': 0.7,
+            'dns_mx_record': 0.7,
+            'dns_ns_record': 0.7
+        }
+
+        scored_edges = []
+        for edge in edges:
+            base_confidence = edge.get('confidence_score', 0)
+            type_weight = relationship_importance.get(edge.get('label', ''), 0.5)
+            combined_score = (base_confidence * 0.7) + (type_weight * 0.3)
+
+            scored_edges.append({
+                'source': edge['from'],
+                'target': edge['to'],
+                'type': edge.get('label', ''),
+                'confidence': base_confidence,
+                'provider': edge.get('source_provider', ''),
+                'score': combined_score
+            })
+
+        # Return top relationships by score
+        return sorted(scored_edges, key=lambda x: x['score'], reverse=True)
+
+    def _analyze_certificate_infrastructure(self, nodes: List[Dict]) -> Dict[str, Any]:
+        """Analyze certificate infrastructure 
across all domains.""" + domain_nodes = [n for n in nodes if n['type'] == 'domain'] + ca_nodes = [n for n in nodes if n['type'] == 'ca'] + + valid_certs = 0 + expired_certs = 0 + total_certs = 0 + cas = Counter() + + for domain in domain_nodes: + for attr in domain.get('attributes', []): + if attr.get('name') == 'cert_is_currently_valid': + total_certs += 1 + if attr.get('value') is True: + valid_certs += 1 + else: + expired_certs += 1 + elif attr.get('name') == 'cert_issuer_name': + issuer = attr.get('value') + if issuer: + cas[issuer] += 1 + + return { + 'total_certs': total_certs, + 'valid': valid_certs, + 'expired': expired_certs, + 'cas': cas + } + + def _has_expired_certificates(self, domain_node: Dict) -> bool: + """Check if domain has expired certificates.""" + for attr in domain_node.get('attributes', []): + if (attr.get('name') == 'cert_is_currently_valid' and + attr.get('value') is False): + return True + return False + + def _determine_certificate_status(self, domain_node: Dict) -> str: + """Determine the certificate status for a domain.""" + has_valid = False + has_expired = False + has_any = False + + for attr in domain_node.get('attributes', []): + if attr.get('name') == 'cert_is_currently_valid': + has_any = True + if attr.get('value') is True: + has_valid = True + else: + has_expired = True + + if not has_any: + return "No Certificate Data" + elif has_valid and not has_expired: + return "Valid" + elif has_expired and not has_valid: + return "Expired/Invalid" + else: + return "Mixed Status" + + def _describe_confidence(self, confidence: float) -> str: + """Convert confidence score to descriptive text.""" + if confidence >= 0.9: + return "Very High" + elif confidence >= 0.8: + return "High" + elif confidence >= 0.6: + return "Medium" + elif confidence >= 0.4: + return "Low" + else: + return "Very Low" + + def _humanize_relationship_type(self, rel_type: str) -> str: + """Convert technical relationship types to human-readable descriptions.""" + type_map = { + 'dns_a_record': 'DNS A Record Resolution', + 'dns_aaaa_record': 'DNS AAAA Record (IPv6) Resolution', + 'dns_mx_record': 'Email Server (MX) Configuration', + 'dns_ns_record': 'Name Server Delegation', + 'dns_cname_record': 'DNS Alias (CNAME) Resolution', + 'crtsh_cert_issuer': 'SSL Certificate Issuer Relationship', + 'crtsh_san_certificate': 'Shared SSL Certificate', + 'shodan_isp': 'Internet Service Provider Assignment', + 'shodan_a_record': 'IP-to-Domain Resolution (Shodan)', + 'dns_ptr_record': 'Reverse DNS Resolution' + } + return type_map.get(rel_type, rel_type.replace('_', ' ').title()) + + def _calculate_confidence_distribution(self, edges: List[Dict]) -> Dict[str, int]: + """Calculate confidence score distribution.""" + distribution = {'high': 0, 'medium': 0, 'low': 0} + + for edge in edges: + confidence = edge.get('confidence_score', 0) + if confidence >= 0.8: + distribution['high'] += 1 + elif confidence >= 0.6: + distribution['medium'] += 1 + else: + distribution['low'] += 1 + + return distribution + + def _get_confidence_threshold(self, level: str) -> str: + """Get confidence threshold for a level.""" + thresholds = {'high': '0.80', 'medium': '0.60', 'low': '0.00'} + return thresholds.get(level, '0.00') + + def _count_cross_validated_relationships(self, edges: List[Dict]) -> int: + """Count relationships verified by multiple providers.""" + # Group edges by source-target pair + edge_pairs = defaultdict(list) + for edge in edges: + pair_key = f"{edge['from']}->{edge['to']}" + 
edge_pairs[pair_key].append(edge.get('source_provider', '')) + + # Count pairs with multiple providers + cross_validated = 0 + for pair, providers in edge_pairs.items(): + if len(set(providers)) > 1: # Multiple unique providers + cross_validated += 1 + + return cross_validated + + def _generate_security_recommendations(self, infrastructure_analysis: Dict) -> List[str]: + """Generate actionable security recommendations.""" + recommendations = [] + + # Check for complex infrastructure + if infrastructure_analysis['ips'] > 10: + recommendations.append( + "Document and validate the necessity of extensive IP address infrastructure" + ) + + if infrastructure_analysis['correlations'] > 5: + recommendations.append( + "Investigate shared infrastructure components for operational security implications" + ) + + if not recommendations: + recommendations.append( + "Continue monitoring for changes in the identified digital infrastructure" + ) + + return recommendations + + def _generate_conclusion(self, target: str, infrastructure_analysis: Dict, total_relationships: int) -> str: + """Generate a professional conclusion for the report.""" + conclusion_parts = [ + f"The passive reconnaissance analysis of '{target}' has successfully mapped " + f"a digital infrastructure ecosystem consisting of {infrastructure_analysis['domains']} " + f"domain names, {infrastructure_analysis['ips']} IP addresses, and " + f"{total_relationships} verified inter-entity relationships." + ] + + conclusion_parts.append( + "All findings in this report are based on publicly available information and " + "passive reconnaissance techniques. The analysis maintains full forensic integrity " + "with complete audit trails for all data collection activities." + ) + + return " ".join(conclusion_parts) + + def _count_bidirectional_relationships(self, graph) -> int: + """Count bidirectional relationships in the graph.""" + count = 0 + for u, v in graph.edges(): + if graph.has_edge(v, u): + count += 1 + return count // 2 # Each pair counted twice + + def _identify_hub_nodes(self, graph, nodes: List[Dict]) -> List[str]: + """Identify nodes that serve as major hubs in the network.""" + if not graph.nodes(): + return [] + + degree_centrality = nx.degree_centrality(graph.to_undirected()) + threshold = max(degree_centrality.values()) * 0.8 if degree_centrality else 0 + + return [node for node, centrality in degree_centrality.items() + if centrality >= threshold] + + def _get_version(self) -> str: + """Get DNSRecon version for report authentication.""" + return "1.0.0-forensic" def export_graph_json(self, graph_manager) -> Dict[str, Any]: """