#!/usr/bin/env python3
"""
Enhanced DNS Reconnaissance Tool with Recursive Analysis

Copyright (c) 2025 mstoeck3.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice,
   this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.

3. Neither the name of the copyright holder nor the names of its contributors
   may be used to endorse or promote products derived from this software
   without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""

import argparse
import ipaddress
import json
import os
import re
import shlex
import subprocess
import sys
import threading
import time
import traceback
from datetime import datetime
from queue import Queue, Empty
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union
from urllib.parse import urlparse

try:
    import requests
except ImportError:  # Reported with a clear message when the tool is constructed.
    requests = None

# A name that is safe to hand to dig/whois as a positional argument: it must
# start with an alphanumeric/underscore so it can never be parsed as an option
# ("-x"), a dig flag ("+short") or a server ("@host"), and it may not contain
# wildcards or whitespace.
_SAFE_DNS_NAME_RE = re.compile(r'[A-Za-z0-9_][A-Za-z0-9_.-]{0,252}')
_RECORD_TYPE_RE = re.compile(r'[A-Za-z0-9]+')
_REDACTED = '***REDACTED***'


def _ip_sort_key(ip: str) -> Tuple[int, int]:
    """Return a sort key that orders IPv4 before IPv6, numerically within each."""
    address = ipaddress.ip_address(ip)
    return (address.version, int(address))


class EnhancedDNSReconTool:
    """Recursive DNS / WHOIS / certificate / threat-intel reconnaissance helper.

    The tool shells out to ``dig`` and ``whois`` and queries crt.sh,
    VirusTotal (API v2) and Shodan over HTTP. Results of every analysed
    domain are accumulated in ``all_results``.
    """

    def __init__(self, shodan_api_key: Optional[str] = None,
                 virustotal_api_key: Optional[str] = None):
        """Create the tool.

        Args:
            shodan_api_key: Shodan API key, or None to skip Shodan queries.
            virustotal_api_key: VirusTotal API key, or None to skip VirusTotal.

        Raises:
            ImportError: If the ``requests`` package is not installed.
        """
        if requests is None:
            raise ImportError(
                "The 'requests' package is required (pip install requests)")

        self.shodan_api_key = shodan_api_key
        self.virustotal_api_key = virustotal_api_key
        self.output_dir = "dns_recon_results"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'EnhancedDNSReconTool/2.0 (Educational/Research Purpose)'
        })

        # Track processed items to avoid infinite recursion
        self.processed_domains: Set[str] = set()
        self.processed_ips: Set[str] = set()

        # Results storage for recursive analysis
        self.all_results: Dict[str, Any] = {}

        # Rate limiting
        self.last_vt_request = 0
        self.last_shodan_request = 0
        self.vt_rate_limit = 4  # 4 requests per minute for free tier
        self.shodan_rate_limit = 1  # 1 request per second for free tier

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_safe_dns_name(name: str) -> bool:
        """Return True if *name* can safely be passed to dig/whois."""
        return isinstance(name, str) and _SAFE_DNS_NAME_RE.fullmatch(name) is not None

    def _redact(self, text: str) -> str:
        """Remove API keys from *text* (requests errors embed the full URL)."""
        for secret in (self.shodan_api_key, self.virustotal_api_key):
            if secret:
                text = text.replace(secret, _REDACTED)
        return text

    @staticmethod
    def _filter_certificate_names(names: Iterable[str], domain: str) -> Set[str]:
        """Keep only names that are *domain* itself or one of its subdomains.

        Names are lower-cased, a leading ``*.`` wildcard label is stripped and
        anything that is not a plain hostname (e-mail addresses, embedded
        wildcards, ...) is dropped.
        """
        target = domain.lower().rstrip('.')
        suffix = f".{target}"
        matches: Set[str] = set()
        for name in names:
            candidate = name.lower().strip()
            if candidate.startswith('*.'):
                candidate = candidate[2:]
            if '.' not in candidate:
                continue
            if _SAFE_DNS_NAME_RE.fullmatch(candidate) is None:
                continue
            if candidate == target or candidate.endswith(suffix):
                matches.add(candidate)
        return matches

    @staticmethod
    def _parse_dig_answer(output: str, record_type: str) -> List[Dict[str, str]]:
        """Parse ``dig +noall +answer`` output into a list of record dicts.

        Only records whose type equals *record_type* (case-insensitive) are
        returned; e.g. CNAME lines in the answer to an A query are skipped.
        """
        records: List[Dict[str, str]] = []
        if not output or output.startswith("Error:"):
            return records

        wanted = record_type.upper()
        for raw_line in output.split('\n'):
            line = raw_line.strip()
            if not line or line.startswith((';', '>>')):
                continue

            # Split on any whitespace (handles both tabs and spaces)
            parts = line.split()
            if len(parts) < 4:
                continue

            name = parts[0].rstrip('.')
            if len(parts) >= 5 and parts[1].isdigit():
                # Format: name TTL class type data
                ttl, dns_class, dns_type = parts[1], parts[2], parts[3]
                data = ' '.join(parts[4:])
            else:
                # Format: name class type data (no TTL shown)
                ttl = ''
                dns_class, dns_type = parts[1], parts[2]
                data = ' '.join(parts[3:])

            if dns_type.upper() == wanted:
                records.append({
                    'name': name,
                    'ttl': ttl,
                    'class': dns_class,
                    'type': dns_type,
                    'data': data
                })
        return records

    # ------------------------------------------------------------------
    # System tools
    # ------------------------------------------------------------------

    def check_dependencies(self) -> bool:
        """Check if required system tools are available.

        Returns:
            True when both ``dig`` and ``whois`` can be started, else False
            (after printing an installation hint).
        """
        required_tools = ['dig', 'whois']
        missing_tools = []

        for tool in required_tools:
            try:
                subprocess.run([tool, '--help'], capture_output=True,
                               check=False, timeout=5)
            except (subprocess.TimeoutExpired, FileNotFoundError):
                missing_tools.append(tool)

        if missing_tools:
            print(f"❌ Missing required tools: {', '.join(missing_tools)}")
            print("Install with: apt install dnsutils whois (Ubuntu/Debian)")
            return False

        return True

    def run_command(self, cmd: Union[str, Sequence[str]], timeout: int = 30) -> str:
        """Run an external command with timeout and error handling.

        The command is executed WITHOUT a shell so that untrusted values
        (domains, PTR hostnames, certificate names) can never be interpreted
        as shell syntax. A string is split into arguments with ``shlex``;
        shell features such as pipes or globbing are therefore not available.

        Args:
            cmd: Argument list (preferred) or a command string.
            timeout: Seconds to wait before giving up.

        Returns:
            Stripped stdout, or stripped stderr when stdout is empty, or a
            string starting with ``"Error:"`` on failure.
        """
        try:
            argv = shlex.split(cmd) if isinstance(cmd, str) else [str(arg) for arg in cmd]
            if not argv:
                return "Error: Empty command"
            result = subprocess.run(
                argv, capture_output=True, text=True, timeout=timeout
            )
            return result.stdout.strip() if result.stdout else result.stderr.strip()
        except subprocess.TimeoutExpired:
            return "Error: Command timed out"
        except Exception as e:
            return f"Error: {str(e)}"

    # ------------------------------------------------------------------
    # Rate limiting
    # ------------------------------------------------------------------

    def rate_limit_virustotal(self):
        """Sleep as needed to respect ``vt_rate_limit`` requests per minute."""
        current_time = time.time()
        time_since_last = current_time - self.last_vt_request
        min_interval = 60 / self.vt_rate_limit  # seconds between requests

        if time_since_last < min_interval:
            sleep_time = min_interval - time_since_last
            print(f" Rate limiting: waiting {sleep_time:.1f}s for VirusTotal...")
            time.sleep(sleep_time)

        self.last_vt_request = time.time()

    def rate_limit_shodan(self):
        """Sleep as needed to respect ``shodan_rate_limit`` requests per second."""
        current_time = time.time()
        time_since_last = current_time - self.last_shodan_request
        min_interval = 1 / self.shodan_rate_limit  # seconds between requests

        if time_since_last < min_interval:
            sleep_time = min_interval - time_since_last
            time.sleep(sleep_time)

        self.last_shodan_request = time.time()

    # ------------------------------------------------------------------
    # VirusTotal
    # ------------------------------------------------------------------

    def query_virustotal_domain(self, domain: str) -> Dict[str, Any]:
        """Query the VirusTotal v2 domain report.

        Returns:
            A dict with ``success`` plus either the extracted report fields
            or ``error``/``message`` describing the failure.
        """
        if not self.virustotal_api_key:
            return {
                'success': False,
                'message': 'No VirusTotal API key provided'
            }

        print(f"🔍 Querying VirusTotal for domain: {domain}")

        try:
            self.rate_limit_virustotal()

            url = "https://www.virustotal.com/vtapi/v2/domain/report"
            params = {
                'apikey': self.virustotal_api_key,
                'domain': domain
            }

            response = self.session.get(url, params=params, timeout=30)

            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f"HTTP {response.status_code}",
                    'message': response.text[:200]
                }

            data = response.json()

            # Extract key information
            result = {
                'success': True,
                'domain': domain,
                'response_code': data.get('response_code', 0),
                'verbose_msg': data.get('verbose_msg', ''),
                'detection_ratio': f"{data.get('positives', 0)}/{data.get('total', 0)}"
            }

            # Add scan results if available
            if 'scans' in data:
                scans = data['scans']
                result['scan_engines'] = len(scans)
                result['malicious_engines'] = sum(
                    1 for scan in scans.values() if scan.get('detected', False))
                result['scan_summary'] = {}

                # Group the engines that flagged the domain by their verdict
                for engine, scan_result in scans.items():
                    if scan_result.get('detected', False):
                        category = scan_result.get('result', 'malicious')
                        result['scan_summary'].setdefault(category, []).append(engine)

            # Add additional data if available
            for key in ['subdomains', 'detected_urls', 'undetected_urls', 'resolutions']:
                if key in data:
                    result[key] = data[key]

            return result

        except Exception as e:
            return {
                'success': False,
                'error': self._redact(str(e)),
                'message': 'VirusTotal domain query failed'
            }

    def query_virustotal_ip(self, ip: str) -> Dict[str, Any]:
        """Query the VirusTotal v2 IP-address report.

        Returns:
            A dict with ``success`` plus either the extracted report fields
            or ``error``/``message`` describing the failure.
        """
        if not self.virustotal_api_key:
            return {
                'success': False,
                'message': 'No VirusTotal API key provided'
            }

        print(f"🔍 Querying VirusTotal for IP: {ip}")

        try:
            self.rate_limit_virustotal()

            url = "https://www.virustotal.com/vtapi/v2/ip-address/report"
            params = {
                'apikey': self.virustotal_api_key,
                'ip': ip
            }

            response = self.session.get(url, params=params, timeout=30)

            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f"HTTP {response.status_code}",
                    'message': response.text[:200]
                }

            data = response.json()

            result = {
                'success': True,
                'ip': ip,
                'response_code': data.get('response_code', 0),
                'verbose_msg': data.get('verbose_msg', ''),
                'detection_ratio': f"{data.get('positives', 0)}/{data.get('total', 0)}"
            }

            # Add scan results if available
            if 'scans' in data:
                scans = data['scans']
                result['scan_engines'] = len(scans)
                result['malicious_engines'] = sum(
                    1 for scan in scans.values() if scan.get('detected', False))

            # Add additional data
            for key in ['detected_urls', 'undetected_urls', 'resolutions', 'asn', 'country']:
                if key in data:
                    result[key] = data[key]

            return result

        except Exception as e:
            return {
                'success': False,
                'error': self._redact(str(e)),
                'message': 'VirusTotal IP query failed'
            }

    # ------------------------------------------------------------------
    # DNS
    # ------------------------------------------------------------------

    def get_dns_records(self, domain: str, record_type: str,
                        server: Optional[str] = None) -> Dict[str, Any]:
        """Fetch DNS records of one type with ``dig`` and parse the answer.

        Args:
            domain: Name to query.
            record_type: Record type such as ``"A"`` or ``"MX"``.
            server: Resolver to query, or None for the system default.

        Returns:
            A dict with ``query``, ``server``, ``raw_output``, ``records`` and
            ``record_count``. When an argument is not a safe dig argument the
            command is not run and ``raw_output`` starts with ``"Error:"``.
        """
        arguments_ok = (
            self._is_safe_dns_name(domain)
            and isinstance(record_type, str)
            and _RECORD_TYPE_RE.fullmatch(record_type) is not None
            and (not server or self.is_valid_ip(server) or self._is_safe_dns_name(server))
        )

        if arguments_ok:
            cmd = ['dig', domain, record_type]
            if server:
                cmd.append(f"@{server}")
            cmd.extend(['+noall', '+answer'])
            output = self.run_command(cmd)
        else:
            output = "Error: Invalid DNS query arguments"

        records = self._parse_dig_answer(output, record_type) if arguments_ok else []

        return {
            'query': f"{domain} {record_type}",
            'server': server or 'system',
            'raw_output': output,
            'records': records,
            'record_count': len(records)
        }

    def get_comprehensive_dns(self, domain: str) -> Dict[str, Any]:
        """Query a fixed set of record types against several resolvers.

        Returns:
            ``{record_type: {server_name: get_dns_records(...) result}}`` plus
            a ``DNSSEC`` entry holding the raw ``dig +dnssec`` output.
        """
        print(f"🔍 Gathering DNS records for {domain}...")

        # Standard record types
        record_types = ['A', 'AAAA', 'MX', 'NS', 'SOA', 'TXT', 'CNAME', 'CAA', 'SRV', 'PTR']

        # DNS servers to query
        dns_servers = [
            None,       # System default
            '1.1.1.1',  # Cloudflare
            '8.8.8.8',  # Google
            '9.9.9.9',  # Quad9
        ]

        dns_results: Dict[str, Any] = {}

        for record_type in record_types:
            dns_results[record_type] = {}
            for server in dns_servers:
                server_name = server or 'system'
                dns_results[record_type][server_name] = self.get_dns_records(
                    domain, record_type, server)
                time.sleep(0.1)  # Rate limiting

        # Try DNSSEC validation
        if self._is_safe_dns_name(domain):
            dnssec_output = self.run_command(
                ['dig', domain, '+dnssec', '+noall', '+answer'])
        else:
            dnssec_output = "Error: Invalid DNS query arguments"

        dns_results['DNSSEC'] = {
            'system': {
                'query': f"{domain} +dnssec",
                'raw_output': dnssec_output,
                'records': [],
                'record_count': 0
            }
        }

        return dns_results

    def perform_reverse_dns(self, ip: str) -> Dict[str, Any]:
        """Perform a reverse DNS (PTR) lookup with ``dig -x``.

        Returns:
            A dict with ``success``, ``ip``, ``hostnames`` and
            ``hostname_count``; ``error`` is set when *ip* is not a valid
            address.
        """
        print(f"🔄 Reverse DNS lookup for {ip}")

        try:
            # Validate IP address (raises ValueError for anything else)
            ipaddress.ip_address(ip)

            output = self.run_command(['dig', '-x', ip, '+short'])

            hostnames = []
            if output and not output.startswith("Error:"):
                hostnames = [line.strip().rstrip('.')
                             for line in output.split('\n') if line.strip()]

            return {
                'success': True,
                'ip': ip,
                'hostnames': hostnames,
                'hostname_count': len(hostnames),
                'raw_output': output
            }

        except Exception as e:
            return {
                'success': False,
                'ip': ip,
                'error': str(e),
                'hostnames': [],
                'hostname_count': 0
            }

    def extract_subdomains_from_certificates(self, domain: str) -> Set[str]:
        """Fetch crt.sh entries for *domain* and return its (sub)domain names.

        Wildcard prefixes are stripped and names outside *domain* are
        discarded. Any failure yields an empty set.
        """
        print(f"📋 Extracting subdomains from certificates for {domain}")

        try:
            # Passing the query via params lets requests encode "%" as "%25".
            response = self.session.get(
                "https://crt.sh/",
                params={'q': f"%.{domain}", 'output': 'json'},
                timeout=30)

            names: List[str] = []
            if response.status_code == 200:
                for cert in response.json():
                    name_value = cert.get('name_value', '')
                    if name_value:
                        # One certificate may list several names, one per line
                        names.extend(name_value.split('\n'))

            return self._filter_certificate_names(names, domain)

        except Exception as e:
            print(f" Error extracting subdomains: {e}")
            return set()

    def extract_ips_from_dns(self, dns_data: Dict[str, Any]) -> Set[str]:
        """Collect the valid IP addresses found in the A and AAAA records."""
        ips = set()

        for record_type in ('A', 'AAAA'):
            for server_data in dns_data.get(record_type, {}).values():
                for record in server_data.get('records', []):
                    address = record.get('data', '')
                    if address and self.is_valid_ip(address):
                        ips.add(address)

        return ips

    def is_valid_ip(self, ip: str) -> bool:
        """Check if string is a valid IPv4 or IPv6 address."""
        try:
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False

    # ------------------------------------------------------------------
    # WHOIS / certificate transparency
    # ------------------------------------------------------------------

    def get_whois_data(self, domain: str) -> Dict[str, Any]:
        """Fetch WHOIS data and parse ``key: value`` lines.

        Returns:
            ``{'raw': <whois output>, 'parsed': {key: value-or-list}}``.
            Keys are lower-cased with spaces and hyphens turned into
            underscores; repeated keys are collected into a list.
        """
        print(f"📋 Fetching WHOIS data for {domain}...")

        if self._is_safe_dns_name(domain):
            raw_whois = self.run_command(['whois', domain])
        else:
            raw_whois = "Error: Invalid domain name"

        whois_data: Dict[str, Any] = {
            'raw': raw_whois,
            'parsed': {}
        }

        if raw_whois.startswith("Error:"):
            return whois_data

        parsed = whois_data['parsed']
        for raw_line in raw_whois.split('\n'):
            line = raw_line.strip()
            if ':' not in line or line.startswith(('%', '#', '>>>')):
                continue

            # First colon separates key and value; later colons stay in the value
            key, _, value = line.partition(':')
            key = key.strip().lower().replace(' ', '_').replace('-', '_')
            value = value.strip()
            if not (value and key):
                continue

            # Handle multiple values for same key (like name servers)
            if key in parsed:
                if not isinstance(parsed[key], list):
                    parsed[key] = [parsed[key]]
                parsed[key].append(value)
            else:
                parsed[key] = value

        return whois_data

    def get_certificate_transparency(self, domain: str) -> Dict[str, Any]:
        """Query certificate transparency logs via crt.sh.

        Returns:
            On success a dict with ``total_certificates``,
            ``unique_subdomains`` (raw names as listed in the certificates),
            ``subdomain_count`` and up to 50 ``certificates``; otherwise
            ``success: False`` with ``error``/``message``.
        """
        print(f"🔍 Querying certificate transparency logs for {domain}...")

        try:
            # Passing the query via params lets requests encode "%" as "%25".
            response = self.session.get(
                "https://crt.sh/",
                params={'q': f"%.{domain}", 'output': 'json'},
                timeout=30)

            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f"HTTP {response.status_code}",
                    'message': 'Failed to fetch certificate data'
                }

            cert_data = response.json()

            subdomains = set()
            cert_details = []

            for cert in cert_data:
                name_value = cert.get('name_value', '')
                if name_value:
                    # One certificate may list several names, one per line
                    subdomains.update(d.strip() for d in name_value.split('\n'))

                cert_details.append({
                    'id': cert.get('id'),
                    'issuer': cert.get('issuer_name'),
                    'common_name': cert.get('common_name'),
                    'name_value': cert.get('name_value'),
                    'not_before': cert.get('not_before'),
                    'not_after': cert.get('not_after'),
                    'serial_number': cert.get('serial_number')
                })

            return {
                'success': True,
                'total_certificates': len(cert_data),
                'unique_subdomains': sorted(subdomains),
                'subdomain_count': len(subdomains),
                'certificates': cert_details[:50]  # Limit for output size
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'message': 'Request to crt.sh failed'
            }

    # ------------------------------------------------------------------
    # Shodan
    # ------------------------------------------------------------------

    def query_shodan(self, domain: str) -> Dict[str, Any]:
        """Search Shodan for hosts whose hostname matches *domain*.

        Returns:
            A dict with ``success`` plus ``total_results``, the first 10
            ``matches`` and ``facets``, or ``error``/``message`` on failure.
        """
        if not self.shodan_api_key:
            return {
                'success': False,
                'message': 'No Shodan API key provided'
            }

        print(f"🔎 Querying Shodan for {domain}...")

        try:
            self.rate_limit_shodan()

            url = "https://api.shodan.io/shodan/host/search"
            params = {
                'key': self.shodan_api_key,
                'query': f'hostname:{domain}'
            }

            response = self.session.get(url, params=params, timeout=30)

            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f"HTTP {response.status_code}",
                    'message': response.text[:200]
                }

            data = response.json()
            return {
                'success': True,
                'total_results': data.get('total', 0),
                'matches': data.get('matches', [])[:10],  # Limit results
                'facets': data.get('facets', {})
            }

        except Exception as e:
            return {
                'success': False,
                'error': self._redact(str(e)),
                'message': 'Shodan query failed'
            }

    def query_shodan_ip(self, ip: str) -> Dict[str, Any]:
        """Fetch the Shodan host record for *ip*.

        Returns:
            A dict with ``success`` plus ``ip`` and the raw ``data``, or
            ``error``/``message`` on failure.
        """
        if not self.shodan_api_key:
            return {
                'success': False,
                'message': 'No Shodan API key provided'
            }

        print(f"🔎 Querying Shodan for IP {ip}...")

        try:
            self.rate_limit_shodan()

            url = f"https://api.shodan.io/shodan/host/{ip}"
            params = {'key': self.shodan_api_key}

            response = self.session.get(url, params=params, timeout=30)

            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f"HTTP {response.status_code}",
                    'message': response.text[:200]
                }

            return {
                'success': True,
                'ip': ip,
                'data': response.json()
            }

        except Exception as e:
            return {
                'success': False,
                'error': self._redact(str(e)),
                'message': 'Shodan IP query failed'
            }

    # ------------------------------------------------------------------
    # Recursive analysis
    # ------------------------------------------------------------------

    def analyze_domain_recursively(self, domain: str, depth: int = 0,
                                   max_depth: int = 2) -> Dict[str, Any]:
        """Analyse *domain*, then its certificate subdomains and its IPs.

        Args:
            domain: Domain to analyse.
            depth: Current recursion depth (0 for the primary domain).
            max_depth: Deepest level that is still analysed.

        Returns:
            The result dict for *domain* (also stored in ``all_results``), or
            an empty dict when the domain was already processed or *depth*
            exceeds *max_depth*.
        """
        if domain in self.processed_domains or depth > max_depth:
            return {}

        self.processed_domains.add(domain)

        print(f"\n{' ' * depth}🎯 Analyzing domain: {domain} (depth {depth})")

        results: Dict[str, Any] = {
            'domain': domain,
            'timestamp': datetime.now().isoformat(),
            'depth': depth,
            'dns_records': {},
            'whois': {},
            'certificate_transparency': {},
            'virustotal_domain': {},
            'shodan': {},
            'discovered_ips': {},
            'discovered_subdomains': {}
        }

        # DNS Records
        results['dns_records'] = self.get_comprehensive_dns(domain)

        # Extract IP addresses from DNS records
        discovered_ips = self.extract_ips_from_dns(results['dns_records'])

        # WHOIS (only for primary domain to avoid rate limiting)
        if depth == 0:
            results['whois'] = self.get_whois_data(domain)

        # Certificate Transparency
        results['certificate_transparency'] = self.get_certificate_transparency(domain)

        # VirusTotal Domain Analysis
        results['virustotal_domain'] = self.query_virustotal_domain(domain)

        # Shodan Domain Analysis
        results['shodan'] = self.query_shodan(domain)

        # Extract subdomains from certificate transparency
        if depth < max_depth:
            ct_result = results['certificate_transparency']
            if ct_result.get('success'):
                # Reuse the names already downloaded instead of hitting
                # crt.sh a second time for the same query.
                subdomains = self._filter_certificate_names(
                    ct_result.get('unique_subdomains', []), domain)
            else:
                subdomains = self.extract_subdomains_from_certificates(domain)

            # Filter out already processed subdomains
            new_subdomains = subdomains - self.processed_domains
            new_subdomains.discard(domain)  # Remove the current domain itself

            print(f"{' ' * depth}📋 Found {len(new_subdomains)} new subdomains to analyze")

            # Limit to 20 subdomains per domain; sorted so the selection is
            # reproducible between runs.
            for subdomain in sorted(new_subdomains)[:20]:
                if subdomain not in self.processed_domains:
                    subdomain_results = self.analyze_domain_recursively(
                        subdomain, depth + 1, max_depth)
                    if subdomain_results:
                        results['discovered_subdomains'][subdomain] = subdomain_results

        # Analyze discovered IP addresses. Hostnames found through reverse DNS
        # are followed at most one level, and never beyond the caller's limit.
        for ip in discovered_ips:
            if ip not in self.processed_ips:
                ip_results = self.analyze_ip_recursively(
                    ip, depth, max_depth=min(max_depth, 1))
                if ip_results:
                    results['discovered_ips'][ip] = ip_results

        # Store in global results
        self.all_results[domain] = results

        return results

    def analyze_ip_recursively(self, ip: str, depth: int = 0,
                               max_depth: int = 1) -> Dict[str, Any]:
        """Analyse *ip* and the hostnames its PTR records point to.

        Args:
            ip: IP address to analyse.
            depth: Depth of the domain that led to this address.
            max_depth: Depth limit applied to hostnames discovered through
                reverse DNS (defaults to 1, the historical behaviour).

        Returns:
            The result dict for *ip*, or an empty dict when it was already
            processed.
        """
        if ip in self.processed_ips:
            return {}

        self.processed_ips.add(ip)

        print(f"{' ' * depth}🌐 Analyzing IP: {ip}")

        results: Dict[str, Any] = {
            'ip': ip,
            'timestamp': datetime.now().isoformat(),
            'reverse_dns': {},
            'virustotal_ip': {},
            'shodan_ip': {},
            'discovered_domains': {}
        }

        # Reverse DNS lookup
        results['reverse_dns'] = self.perform_reverse_dns(ip)

        # VirusTotal IP Analysis
        results['virustotal_ip'] = self.query_virustotal_ip(ip)

        # Shodan IP Analysis
        results['shodan_ip'] = self.query_shodan_ip(ip)

        # Analyze discovered domains from reverse DNS
        reverse_dns = results['reverse_dns']
        if reverse_dns.get('success') and reverse_dns.get('hostnames'):
            for hostname in reverse_dns['hostnames'][:5]:  # Limit to 5 hostnames
                # Only analyze if it's a reasonable hostname and not already processed
                if hostname not in self.processed_domains and hostname.count('.') >= 1:
                    domain_results = self.analyze_domain_recursively(
                        hostname, depth + 1, max_depth=max_depth)
                    if domain_results:
                        results['discovered_domains'][hostname] = domain_results

        return results

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def create_comprehensive_summary(self, filename: str) -> None:
        """Write the plain-text report for everything in ``all_results``.

        Args:
            filename: Path of the report file to create (UTF-8).
        """
        with open(filename, 'w', encoding='utf-8') as f:
            f.write("Enhanced DNS Reconnaissance Report with Recursive Analysis\n")
            f.write("=" * 65 + "\n")
            f.write(f"Analysis completed at: {datetime.now().isoformat()}\n")
            f.write(f"Total domains analyzed: {len(self.processed_domains)}\n")
            f.write(f"Total IP addresses analyzed: {len(self.processed_ips)}\n\n")

            # Executive Summary
            f.write("EXECUTIVE SUMMARY\n")
            f.write("-" * 17 + "\n")

            total_threats = 0
            domains_with_issues = []
            ips_with_issues = []

            # Count threats across all analyzed domains and IPs
            for domain, domain_data in self.all_results.items():
                # Check VirusTotal results for domain
                vt_domain = domain_data.get('virustotal_domain', {})
                if vt_domain.get('success') and vt_domain.get('malicious_engines', 0) > 0:
                    total_threats += 1
                    domains_with_issues.append(domain)

                # Check discovered IPs
                for ip, ip_data in domain_data.get('discovered_ips', {}).items():
                    vt_ip = ip_data.get('virustotal_ip', {})
                    if vt_ip.get('success') and vt_ip.get('malicious_engines', 0) > 0:
                        total_threats += 1
                        ips_with_issues.append(ip)

            f.write(f"Security Status: {'⚠️ THREATS DETECTED' if total_threats > 0 else '✅ NO THREATS DETECTED'}\n")
            f.write(f"Total Security Issues: {total_threats}\n")
            if domains_with_issues:
                f.write(f"Domains with issues: {', '.join(domains_with_issues[:5])}\n")
            if ips_with_issues:
                f.write(f"IPs with issues: {', '.join(ips_with_issues[:5])}\n")
            f.write("\n")

            # Process each domain in detail
            for domain, domain_data in self.all_results.items():
                if domain_data.get('depth', 0) == 0:  # Only show primary domains in detail
                    self._write_domain_analysis(f, domain, domain_data)

            # Summary of all discovered assets
            f.write("\nASSET DISCOVERY SUMMARY\n")
            f.write("-" * 23 + "\n")

            f.write(f"All Discovered Domains ({len(self.processed_domains)}):\n")
            for domain in sorted(self.processed_domains):
                f.write(f" {domain}\n")

            f.write(f"\nAll Discovered IP Addresses ({len(self.processed_ips)}):\n")
            # The key handles IPv4 and IPv6; IPv4Address alone rejects AAAA data.
            for ip in sorted(self.processed_ips, key=_ip_sort_key):
                f.write(f" {ip}\n")

            f.write(f"\n{'=' * 65}\n")
            f.write("Report Generation Complete\n")

    def _write_domain_analysis(self, f, domain: str, domain_data: Dict[str, Any]) -> None:
        """Write the detailed section for one primary domain to *f*.

        Args:
            f: Open text file to write to.
            domain: Domain the section describes.
            domain_data: Result dict produced by analyze_domain_recursively.
        """
        f.write(f"\nDETAILED ANALYSIS: {domain.upper()}\n")
        f.write("=" * (20 + len(domain)) + "\n")

        # DNS Records Summary
        dns_data = domain_data.get('dns_records', {})
        f.write("DNS Records Summary:\n")

        for record_type in ['A', 'AAAA', 'MX', 'NS', 'TXT']:
            system_records = dns_data.get(record_type, {}).get('system', {}).get('records', [])
            f.write(f" {record_type}: {len(system_records)} records\n")

        # Security Analysis
        f.write("\nSecurity Analysis:\n")

        # VirusTotal Domain Results
        vt_domain = domain_data.get('virustotal_domain', {})
        if vt_domain.get('success'):
            detection_ratio = vt_domain.get('detection_ratio', '0/0')
            malicious_engines = vt_domain.get('malicious_engines', 0)
            f.write(f" VirusTotal Domain: {detection_ratio} ({malicious_engines} flagged as malicious)\n")

            if malicious_engines > 0:
                f.write(f" ⚠️ SECURITY ALERT: Domain flagged by {malicious_engines} security engines\n")
                scan_summary = vt_domain.get('scan_summary', {})
                for category, engines in scan_summary.items():
                    f.write(f" {category}: {', '.join(engines[:3])}\n")
        else:
            f.write(f" VirusTotal Domain: {vt_domain.get('message', 'Not available')}\n")

        # Certificate Information
        cert_data = domain_data.get('certificate_transparency', {})
        if cert_data.get('success'):
            f.write(f" SSL Certificates: {cert_data.get('total_certificates', 0)} found\n")
            f.write(f" Subdomains from Certificates: {cert_data.get('subdomain_count', 0)}\n")

        # Discovered Assets
        discovered_ips = domain_data.get('discovered_ips', {})
        discovered_subdomains = domain_data.get('discovered_subdomains', {})

        if discovered_ips:
            f.write(f"\nDiscovered IP Addresses ({len(discovered_ips)}):\n")
            for ip, ip_data in discovered_ips.items():
                vt_ip = ip_data.get('virustotal_ip', {})
                reverse_dns = ip_data.get('reverse_dns', {})

                f.write(f" {ip}:\n")

                # Reverse DNS
                if reverse_dns.get('success') and reverse_dns.get('hostnames'):
                    f.write(f" Reverse DNS: {', '.join(reverse_dns['hostnames'][:3])}\n")

                # VirusTotal IP results
                if vt_ip.get('success'):
                    detection_ratio = vt_ip.get('detection_ratio', '0/0')
                    malicious_engines = vt_ip.get('malicious_engines', 0)
                    f.write(f" VirusTotal: {detection_ratio}")
                    if malicious_engines > 0:
                        f.write(f" ⚠️ FLAGGED BY {malicious_engines} ENGINES")
                    f.write("\n")

                # Shodan IP results
                shodan_ip = ip_data.get('shodan_ip', {})
                if shodan_ip.get('success'):
                    shodan_data = shodan_ip.get('data', {})
                    ports = shodan_data.get('ports', [])
                    if ports:
                        f.write(f" Shodan Ports: {', '.join(map(str, ports[:10]))}\n")

                f.write("\n")

        if discovered_subdomains:
            f.write(f"Discovered Subdomains ({len(discovered_subdomains)}):\n")
            for subdomain, subdomain_data in discovered_subdomains.items():
                f.write(f" {subdomain}\n")

                # Quick security check for subdomain
                vt_subdomain = subdomain_data.get('virustotal_domain', {})
                if vt_subdomain.get('success') and vt_subdomain.get('malicious_engines', 0) > 0:
                    f.write(" ⚠️ Security Issue: Flagged by VirusTotal\n")

                subdomain_ips = subdomain_data.get('discovered_ips', {})
                if subdomain_ips:
                    f.write(f" IPs: {', '.join(list(subdomain_ips)[:3])}\n")
            f.write("\n")

    def save_results(self, domain: str) -> None:
        """Save the collected results as JSON, a text report and an asset list.

        Files are written to ``output_dir`` (created when missing) and are
        named ``<domain>_<timestamp>_{complete.json,analysis.txt,assets.txt}``.

        Args:
            domain: Primary domain, used as the file-name prefix.
        """
        os.makedirs(self.output_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_filename = os.path.join(self.output_dir, f"{domain}_{timestamp}")

        # Save complete JSON (all recursive data)
        json_file = f"{base_filename}_complete.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(self.all_results, f, indent=2, ensure_ascii=False, default=str)

        # Save comprehensive summary
        summary_file = f"{base_filename}_analysis.txt"
        self.create_comprehensive_summary(summary_file)

        # Save asset list (domains and IPs)
        assets_file = f"{base_filename}_assets.txt"
        with open(assets_file, 'w', encoding='utf-8') as f:
            f.write("Discovered Assets Summary\n")
            f.write("=" * 25 + "\n\n")

            f.write(f"Domains ({len(self.processed_domains)}):\n")
            for found_domain in sorted(self.processed_domains):
                f.write(f"{found_domain}\n")

            f.write(f"\nIP Addresses ({len(self.processed_ips)}):\n")
            # The key handles IPv4 and IPv6; IPv4Address alone rejects AAAA data.
            for ip in sorted(self.processed_ips, key=_ip_sort_key):
                f.write(f"{ip}\n")

        print("\n📄 Results saved:")
        print(f" Complete JSON: {json_file}")
        print(f" Analysis Report: {summary_file}")
        print(f" Asset List: {assets_file}")

    def run_enhanced_reconnaissance(self, domain: str, max_depth: int = 2) -> Dict[str, Any]:
        """Reset all state and run the recursive analysis for *domain*.

        Args:
            domain: Primary domain to analyse.
            max_depth: Maximum recursion depth for subdomain analysis.

        Returns:
            The result dict of the primary domain.
        """
        print(f"\n🚀 Starting enhanced DNS reconnaissance for: {domain}")
        print(f" Max recursion depth: {max_depth}")
        print(f" APIs enabled: VirusTotal={bool(self.virustotal_api_key)}, Shodan={bool(self.shodan_api_key)}")

        start_time = time.time()

        # Clear previous results
        self.processed_domains.clear()
        self.processed_ips.clear()
        self.all_results.clear()

        # Start recursive analysis
        results = self.analyze_domain_recursively(domain, depth=0, max_depth=max_depth)

        duration = time.time() - start_time

        print(f"\n✅ Enhanced reconnaissance completed in {duration:.1f} seconds")
        print(f" Domains analyzed: {len(self.processed_domains)}")
        print(f" IP addresses analyzed: {len(self.processed_ips)}")

        return results


def main():
    """Parse the command line, run the reconnaissance and save the results."""
    parser = argparse.ArgumentParser(
        description="Enhanced DNS Reconnaissance Tool with Recursive Analysis - Use only on domains you own or have permission to test",
        epilog="LEGAL NOTICE: Unauthorized reconnaissance may violate applicable laws. Use responsibly."
    )
    parser.add_argument('domain', help='Target domain (e.g., example.com)')
    parser.add_argument('--shodan-key', help='Shodan API key for additional reconnaissance')
    parser.add_argument('--virustotal-key', help='VirusTotal API key for threat intelligence')
    parser.add_argument('--max-depth', type=int, default=2,
                        help='Maximum recursion depth for subdomain analysis (default: 2)')
    parser.add_argument('--output-dir', default='dns_recon_results',
                        help='Output directory for results')

    args = parser.parse_args()

    if args.max_depth < 0:
        parser.error("--max-depth must be zero or greater")

    # Validate domain format. The first character must be alphanumeric so the
    # value can never be mistaken for a command-line option by dig/whois.
    if not re.fullmatch(r'[a-zA-Z0-9][a-zA-Z0-9.-]*\.[a-zA-Z]{2,}', args.domain):
        print("❌ Invalid domain format. Please provide a valid domain (e.g., example.com)")
        sys.exit(1)

    # DNS names are case-insensitive; normalise so de-duplication works.
    domain = args.domain.lower()

    # Initialize tool
    try:
        tool = EnhancedDNSReconTool(
            shodan_api_key=args.shodan_key,
            virustotal_api_key=args.virustotal_key
        )
    except ImportError as e:
        print(f"❌ {e}")
        sys.exit(1)
    tool.output_dir = args.output_dir

    # Check dependencies
    if not tool.check_dependencies():
        sys.exit(1)

    # Warn about API keys
    if not args.virustotal_key:
        print("⚠️ No VirusTotal API key provided. Threat intelligence will be limited.")
    if not args.shodan_key:
        print("⚠️ No Shodan API key provided. Host intelligence will be limited.")

    try:
        # Run enhanced reconnaissance
        tool.run_enhanced_reconnaissance(domain, args.max_depth)

        # Save results
        tool.save_results(domain)

        print(f"\n🎯 Enhanced reconnaissance completed for {domain}")

    except KeyboardInterrupt:
        print("\n⏹️ Reconnaissance interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"❌ Error during reconnaissance: {e}")
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()