From 941d8155952f5682d1782e013fd8d54bf1540767 Mon Sep 17 00:00:00 2001 From: overcuriousity Date: Tue, 9 Sep 2025 12:27:58 +0200 Subject: [PATCH] progress --- dnsrecon.py | 1154 +++++++++++++++++++++++++++------------------------ 1 file changed, 619 insertions(+), 535 deletions(-) diff --git a/dnsrecon.py b/dnsrecon.py index 01ee3e8..997b1f9 100644 --- a/dnsrecon.py +++ b/dnsrecon.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 """ +Enhanced DNS Reconnaissance Tool with Recursive Analysis + Copyright (c) 2025 mstoeck3. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: @@ -21,18 +23,37 @@ import argparse import sys import time import os +import re +import ipaddress from datetime import datetime -from typing import Dict, List, Optional, Any +from typing import Dict, List, Optional, Any, Set +from urllib.parse import urlparse +import threading +from queue import Queue, Empty -class DNSReconTool: - def __init__(self, shodan_api_key: Optional[str] = None): +class EnhancedDNSReconTool: + def __init__(self, shodan_api_key: Optional[str] = None, virustotal_api_key: Optional[str] = None): self.shodan_api_key = shodan_api_key + self.virustotal_api_key = virustotal_api_key self.output_dir = "dns_recon_results" self.session = requests.Session() self.session.headers.update({ - 'User-Agent': 'DNSReconTool/1.0 (Educational/Research Purpose)' + 'User-Agent': 'EnhancedDNSReconTool/2.0 (Educational/Research Purpose)' }) + # Track processed items to avoid infinite recursion + self.processed_domains: Set[str] = set() + self.processed_ips: Set[str] = set() + + # Results storage for recursive analysis + self.all_results: Dict[str, Any] = {} + + # Rate limiting + self.last_vt_request = 0 + self.last_shodan_request = 0 + self.vt_rate_limit = 4 # 4 requests per minute for free tier + self.shodan_rate_limit = 1 # 1 request per second for free tier + def check_dependencies(self) -> bool: """Check if required system tools are available.""" required_tools = ['dig', 'whois'] @@ -64,6 +85,155 @@ class DNSReconTool: except Exception as e: return f"Error: {str(e)}" + def rate_limit_virustotal(self): + """Implement rate limiting for VirusTotal API.""" + current_time = time.time() + time_since_last = current_time - self.last_vt_request + min_interval = 60 / self.vt_rate_limit # seconds between requests + + if time_since_last < min_interval: + sleep_time = min_interval - time_since_last + print(f" Rate limiting: waiting {sleep_time:.1f}s for VirusTotal...") + time.sleep(sleep_time) + + self.last_vt_request = time.time() + + def rate_limit_shodan(self): + """Implement rate limiting for Shodan API.""" + current_time = time.time() + time_since_last = current_time - self.last_shodan_request + min_interval = 1 / self.shodan_rate_limit # seconds between requests + + if time_since_last < min_interval: + sleep_time = min_interval - time_since_last + time.sleep(sleep_time) + + self.last_shodan_request = time.time() + + def query_virustotal_domain(self, domain: str) -> Dict[str, Any]: + """Query VirusTotal API for domain information.""" + if not self.virustotal_api_key: + return { + 'success': False, + 'message': 'No VirusTotal API key provided' + } + + print(f"šŸ” Querying VirusTotal for domain: {domain}") + + try: + self.rate_limit_virustotal() + + url = f"https://www.virustotal.com/vtapi/v2/domain/report" + params = { + 'apikey': self.virustotal_api_key, + 'domain': domain + } + + response = self.session.get(url, params=params, timeout=30) + + if response.status_code == 200: + data = response.json() + + # Extract key information + result = { + 'success': True, + 'domain': domain, + 'response_code': data.get('response_code', 0), + 'verbose_msg': data.get('verbose_msg', ''), + 'detection_ratio': f"{data.get('positives', 0)}/{data.get('total', 0)}" + } + + # Add scan results if available + if 'scans' in data: + result['scan_engines'] = len(data['scans']) + result['malicious_engines'] = sum(1 for scan in data['scans'].values() if scan.get('detected', False)) + result['scan_summary'] = {} + + # Categorize detections + for engine, scan_result in data['scans'].items(): + if scan_result.get('detected', False): + category = scan_result.get('result', 'malicious') + if category not in result['scan_summary']: + result['scan_summary'][category] = [] + result['scan_summary'][category].append(engine) + + # Add additional data if available + for key in ['subdomains', 'detected_urls', 'undetected_urls', 'resolutions']: + if key in data: + result[key] = data[key] + + return result + else: + return { + 'success': False, + 'error': f"HTTP {response.status_code}", + 'message': response.text[:200] + } + + except Exception as e: + return { + 'success': False, + 'error': str(e), + 'message': 'VirusTotal domain query failed' + } + + def query_virustotal_ip(self, ip: str) -> Dict[str, Any]: + """Query VirusTotal API for IP information.""" + if not self.virustotal_api_key: + return { + 'success': False, + 'message': 'No VirusTotal API key provided' + } + + print(f"šŸ” Querying VirusTotal for IP: {ip}") + + try: + self.rate_limit_virustotal() + + url = f"https://www.virustotal.com/vtapi/v2/ip-address/report" + params = { + 'apikey': self.virustotal_api_key, + 'ip': ip + } + + response = self.session.get(url, params=params, timeout=30) + + if response.status_code == 200: + data = response.json() + + result = { + 'success': True, + 'ip': ip, + 'response_code': data.get('response_code', 0), + 'verbose_msg': data.get('verbose_msg', ''), + 'detection_ratio': f"{data.get('positives', 0)}/{data.get('total', 0)}" + } + + # Add scan results if available + if 'scans' in data: + result['scan_engines'] = len(data['scans']) + result['malicious_engines'] = sum(1 for scan in data['scans'].values() if scan.get('detected', False)) + + # Add additional data + for key in ['detected_urls', 'undetected_urls', 'resolutions', 'asn', 'country']: + if key in data: + result[key] = data[key] + + return result + else: + return { + 'success': False, + 'error': f"HTTP {response.status_code}", + 'message': response.text[:200] + } + + except Exception as e: + return { + 'success': False, + 'error': str(e), + 'message': 'VirusTotal IP query failed' + } + def get_dns_records(self, domain: str, record_type: str, server: Optional[str] = None) -> Dict[str, Any]: """Fetch DNS records with comprehensive error handling and proper parsing.""" @@ -118,7 +288,7 @@ class DNSReconTool: def get_comprehensive_dns(self, domain: str) -> Dict[str, Any]: """Get comprehensive DNS information.""" - print("šŸ” Gathering DNS records...") + print(f"šŸ” Gathering DNS records for {domain}...") # Standard record types record_types = ['A', 'AAAA', 'MX', 'NS', 'SOA', 'TXT', 'CNAME', @@ -135,23 +305,15 @@ class DNSReconTool: dns_results = {} for record_type in record_types: - print(f" Querying {record_type} records...") dns_results[record_type] = {} for server in dns_servers: server_name = server or 'system' result = self.get_dns_records(domain, record_type, server) dns_results[record_type][server_name] = result - # Debug output for troubleshooting - if result['records']: - print(f" {server_name}: Found {len(result['records'])} {record_type} records") - elif result['raw_output'].startswith('Error:'): - print(f" {server_name}: {result['raw_output']}") - time.sleep(0.1) # Rate limiting # Try DNSSEC validation - print(" Querying DNSSEC information...") dnssec_cmd = f"dig {domain} +dnssec +noall +answer" dns_results['DNSSEC'] = { 'system': { @@ -164,9 +326,107 @@ class DNSReconTool: return dns_results + def perform_reverse_dns(self, ip: str) -> Dict[str, Any]: + """Perform reverse DNS lookup on IP address.""" + print(f"šŸ”„ Reverse DNS lookup for {ip}") + + try: + # Validate IP address + ipaddress.ip_address(ip) + + # Perform reverse DNS lookup + cmd = f"dig -x {ip} +short" + output = self.run_command(cmd) + + hostnames = [] + if output and not output.startswith("Error:"): + hostnames = [line.strip().rstrip('.') for line in output.split('\n') if line.strip()] + + return { + 'success': True, + 'ip': ip, + 'hostnames': hostnames, + 'hostname_count': len(hostnames), + 'raw_output': output + } + + except Exception as e: + return { + 'success': False, + 'ip': ip, + 'error': str(e), + 'hostnames': [], + 'hostname_count': 0 + } + + def extract_subdomains_from_certificates(self, domain: str) -> Set[str]: + """Extract subdomains from certificate transparency logs.""" + print(f"šŸ“‹ Extracting subdomains from certificates for {domain}") + + try: + url = f"https://crt.sh/?q=%.{domain}&output=json" + response = self.session.get(url, timeout=30) + + subdomains = set() + + if response.status_code == 200: + cert_data = response.json() + + for cert in cert_data: + name_value = cert.get('name_value', '') + if name_value: + # Handle multiple domains in one certificate + domains_in_cert = [d.strip() for d in name_value.split('\n')] + for subdomain in domains_in_cert: + # Clean up the subdomain + subdomain = subdomain.lower().strip() + if subdomain and '.' in subdomain: + # Only include subdomains of the target domain + if subdomain.endswith(f".{domain}") or subdomain == domain: + subdomains.add(subdomain) + elif subdomain.startswith("*."): + # Handle wildcard certificates + clean_subdomain = subdomain[2:] + if clean_subdomain.endswith(f".{domain}") or clean_subdomain == domain: + subdomains.add(clean_subdomain) + + return subdomains + + except Exception as e: + print(f" Error extracting subdomains: {e}") + return set() + + def extract_ips_from_dns(self, dns_data: Dict[str, Any]) -> Set[str]: + """Extract IP addresses from DNS records.""" + ips = set() + + # Extract from A records + for server_data in dns_data.get('A', {}).values(): + for record in server_data.get('records', []): + ip = record.get('data', '') + if ip and self.is_valid_ip(ip): + ips.add(ip) + + # Extract from AAAA records + for server_data in dns_data.get('AAAA', {}).values(): + for record in server_data.get('records', []): + ipv6 = record.get('data', '') + if ipv6 and self.is_valid_ip(ipv6): + ips.add(ipv6) + + return ips + + def is_valid_ip(self, ip: str) -> bool: + """Check if string is a valid IP address.""" + try: + ipaddress.ip_address(ip) + return True + except ValueError: + return False + def get_whois_data(self, domain: str) -> Dict[str, Any]: """Fetch and parse WHOIS data with improved parsing.""" - print("šŸ“‹ Fetching WHOIS data...") + print(f"šŸ“‹ Fetching WHOIS data for {domain}...") raw_whois = self.run_command(f"whois {domain}") @@ -205,7 +465,7 @@ class DNSReconTool: def get_certificate_transparency(self, domain: str) -> Dict[str, Any]: """Query certificate transparency logs via crt.sh.""" - print("šŸ” Querying certificate transparency logs...") + print(f"šŸ” Querying certificate transparency logs for {domain}...") try: # Query crt.sh API @@ -266,9 +526,11 @@ class DNSReconTool: 'message': 'No Shodan API key provided' } - print("šŸ”Ž Querying Shodan...") + print(f"šŸ”Ž Querying Shodan for {domain}...") try: + self.rate_limit_shodan() + # Search for the domain url = f"https://api.shodan.io/shodan/host/search" params = { @@ -299,512 +561,302 @@ class DNSReconTool: 'error': str(e), 'message': 'Shodan query failed' } + + def query_shodan_ip(self, ip: str) -> Dict[str, Any]: + """Query Shodan API for IP information.""" + if not self.shodan_api_key: + return { + 'success': False, + 'message': 'No Shodan API key provided' + } + + print(f"šŸ”Ž Querying Shodan for IP {ip}...") + + try: + self.rate_limit_shodan() + + url = f"https://api.shodan.io/shodan/host/{ip}" + params = {'key': self.shodan_api_key} + + response = self.session.get(url, params=params, timeout=30) + + if response.status_code == 200: + data = response.json() + return { + 'success': True, + 'ip': ip, + 'data': data + } + else: + return { + 'success': False, + 'error': f"HTTP {response.status_code}", + 'message': response.text[:200] + } + + except Exception as e: + return { + 'success': False, + 'error': str(e), + 'message': 'Shodan IP query failed' + } - def _write_dns_section(self, f, title: str, records: List[Dict], data_extractor): - """Helper method to write DNS record sections.""" - if records: - f.write(f"\n{title}:\n") - for record in records: - data = data_extractor(record) - f.write(f" {data}\n") - else: - f.write(f"\n{title}: None found\n") - - def _write_dns_server_comparison(self, f, dns_data: Dict): - """Compare responses from different DNS servers.""" - servers = ['system', '1.1.1.1', '8.8.8.8', '9.9.9.9'] - record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT'] + def analyze_domain_recursively(self, domain: str, depth: int = 0, max_depth: int = 2) -> Dict[str, Any]: + """Perform comprehensive analysis on a domain with recursive subdomain discovery.""" + if domain in self.processed_domains or depth > max_depth: + return {} - discrepancies_found = False + self.processed_domains.add(domain) - for record_type in record_types: - if record_type in dns_data: - f.write(f"\n{record_type} Records:\n") - server_results = {} - errors = {} - - for server in servers: - if server in dns_data[record_type]: - server_data = dns_data[record_type][server] - records = server_data.get('records', []) - raw_output = server_data.get('raw_output', '') - - if raw_output.startswith('Error:'): - errors[server] = raw_output - server_results[server] = set() - else: - server_results[server] = set(r.get('data', '') for r in records if r.get('data')) - else: - server_results[server] = set() - - # Show results for each server - for server in servers: - records = server_results.get(server, set()) - if server in errors: - f.write(f" {server:<12}: {errors[server]}\n") - elif records: - f.write(f" {server:<12}: {', '.join(sorted(records))}\n") - else: - f.write(f" {server:<12}: No records\n") - - # Check for discrepancies - if len(server_results) > 1: - unique_results = set(frozenset(result) for result in server_results.values()) - if len(unique_results) > 1: - f.write(f" āš ļø INCONSISTENCY DETECTED between servers!\n") - discrepancies_found = True + print(f"\n{' ' * depth}šŸŽÆ Analyzing domain: {domain} (depth {depth})") - if not discrepancies_found: - f.write(f"\nāœ… All DNS servers return consistent results\n") - - def create_summary_report(self, results: Dict[str, Any], filename: str) -> None: - """Create comprehensive human-readable summary report including ALL collected data.""" + results = { + 'domain': domain, + 'timestamp': datetime.now().isoformat(), + 'depth': depth, + 'dns_records': {}, + 'whois': {}, + 'certificate_transparency': {}, + 'virustotal_domain': {}, + 'shodan': {}, + 'discovered_ips': {}, + 'discovered_subdomains': {} + } + + # DNS Records + results['dns_records'] = self.get_comprehensive_dns(domain) + + # Extract IP addresses from DNS records + discovered_ips = self.extract_ips_from_dns(results['dns_records']) + + # WHOIS (only for primary domain to avoid rate limiting) + if depth == 0: + results['whois'] = self.get_whois_data(domain) + + # Certificate Transparency + results['certificate_transparency'] = self.get_certificate_transparency(domain) + + # VirusTotal Domain Analysis + results['virustotal_domain'] = self.query_virustotal_domain(domain) + + # Shodan Domain Analysis + results['shodan'] = self.query_shodan(domain) + + # Extract subdomains from certificate transparency + if depth < max_depth: + subdomains = self.extract_subdomains_from_certificates(domain) + + # Filter out already processed subdomains + new_subdomains = subdomains - self.processed_domains + new_subdomains.discard(domain) # Remove the current domain itself + + print(f"{' ' * depth}šŸ“‹ Found {len(new_subdomains)} new subdomains to analyze") + + # Recursively analyze subdomains (limit to prevent excessive recursion) + for subdomain in list(new_subdomains)[:20]: # Limit to 20 subdomains per domain + if subdomain not in self.processed_domains: + subdomain_results = self.analyze_domain_recursively(subdomain, depth + 1, max_depth) + if subdomain_results: + results['discovered_subdomains'][subdomain] = subdomain_results + + # Analyze discovered IP addresses + for ip in discovered_ips: + if ip not in self.processed_ips: + ip_results = self.analyze_ip_recursively(ip, depth) + if ip_results: + results['discovered_ips'][ip] = ip_results + + # Store in global results + self.all_results[domain] = results + + return results + + def analyze_ip_recursively(self, ip: str, depth: int = 0) -> Dict[str, Any]: + """Perform comprehensive analysis on an IP address.""" + if ip in self.processed_ips: + return {} + + self.processed_ips.add(ip) + + print(f"{' ' * depth}🌐 Analyzing IP: {ip}") + + results = { + 'ip': ip, + 'timestamp': datetime.now().isoformat(), + 'reverse_dns': {}, + 'virustotal_ip': {}, + 'shodan_ip': {}, + 'discovered_domains': {} + } + + # Reverse DNS lookup + results['reverse_dns'] = self.perform_reverse_dns(ip) + + # VirusTotal IP Analysis + results['virustotal_ip'] = self.query_virustotal_ip(ip) + + # Shodan IP Analysis + results['shodan_ip'] = self.query_shodan_ip(ip) + + # Analyze discovered domains from reverse DNS + reverse_dns = results['reverse_dns'] + if reverse_dns.get('success') and reverse_dns.get('hostnames'): + for hostname in reverse_dns['hostnames'][:5]: # Limit to 5 hostnames + if hostname not in self.processed_domains and hostname.count('.') >= 1: + # Only analyze if it's a reasonable hostname and not already processed + domain_results = self.analyze_domain_recursively(hostname, depth + 1, max_depth=1) + if domain_results: + results['discovered_domains'][hostname] = domain_results + + return results + + def create_comprehensive_summary(self, filename: str) -> None: + """Create comprehensive summary report with recursive analysis results.""" with open(filename, 'w', encoding='utf-8') as f: - f.write(f"DNS Reconnaissance Report\n") - f.write(f"{'='*50}\n") - f.write(f"Domain: {results['domain']}\n") - f.write(f"Timestamp: {results['timestamp']}\n\n") + f.write("Enhanced DNS Reconnaissance Report with Recursive Analysis\n") + f.write("=" * 65 + "\n") + f.write(f"Analysis completed at: {datetime.now().isoformat()}\n") + f.write(f"Total domains analyzed: {len(self.processed_domains)}\n") + f.write(f"Total IP addresses analyzed: {len(self.processed_ips)}\n\n") - dns_data = results.get('dns_records', {}) - - # Helper function to get records from system DNS - def get_system_records(record_type): - return dns_data.get(record_type, {}).get('system', {}).get('records', []) - - # Helper function to get all server records for a type - def get_all_server_records(record_type): - servers = ['system', '1.1.1.1', '8.8.8.8', '9.9.9.9'] - results = {} - for server in servers: - if record_type in dns_data and server in dns_data[record_type]: - server_data = dns_data[record_type][server] - results[server] = { - 'records': server_data.get('records', []), - 'raw_output': server_data.get('raw_output', ''), - 'record_count': server_data.get('record_count', 0) - } - return results - - # A Records (IPv4) with TTL and all servers - f.write(f"\nA Records (IPv4):\n") - f.write("-" * 16 + "\n") - a_servers = get_all_server_records('A') - if any(server_data['records'] for server_data in a_servers.values()): - for server, server_data in a_servers.items(): - records = server_data['records'] - if records: - f.write(f" {server}:\n") - for record in records: - ip = record.get('data', 'N/A') - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - f.write(f" {ip} (TTL: {ttl})\n") - elif server_data['raw_output'].startswith('Error:'): - f.write(f" {server}: {server_data['raw_output']}\n") - else: - f.write(f" {server}: No records found\n") - else: - f.write(" No A records found on any server\n") - - # AAAA Records (IPv6) with TTL and all servers - f.write(f"\nAAAA Records (IPv6):\n") + # Executive Summary + f.write("EXECUTIVE SUMMARY\n") f.write("-" * 17 + "\n") - aaaa_servers = get_all_server_records('AAAA') - if any(server_data['records'] for server_data in aaaa_servers.values()): - for server, server_data in aaaa_servers.items(): - records = server_data['records'] - if records: - f.write(f" {server}:\n") - for record in records: - ipv6 = record.get('data', 'N/A') - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - f.write(f" {ipv6} (TTL: {ttl})\n") - elif server_data['raw_output'].startswith('Error:'): - f.write(f" {server}: {server_data['raw_output']}\n") - else: - f.write(f" {server}: No records found\n") - else: - f.write(" No AAAA records found on any server\n") - # MX Records (Mail Servers) with TTL - mx_records = get_system_records('MX') - f.write(f"\nMX Records (Mail Servers):\n") - f.write("-" * 26 + "\n") - if mx_records: - for record in mx_records: - data_parts = record.get('data', '').split() - priority = data_parts[0] if data_parts else 'N/A' - server = ' '.join(data_parts[1:]) if len(data_parts) > 1 else 'N/A' - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - f.write(f" Priority {priority}: {server} (TTL: {ttl})\n") - else: - f.write(" No MX records found\n") + total_threats = 0 + domains_with_issues = [] + ips_with_issues = [] - # NS Records (Name Servers) with TTL - ns_records = get_system_records('NS') - f.write(f"\nNS Records (Name Servers):\n") - f.write("-" * 26 + "\n") - if ns_records: - for record in ns_records: - ns = record.get('data', 'N/A') - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - f.write(f" {ns} (TTL: {ttl})\n") - else: - f.write(" No NS records found\n") - - # CNAME Records with TTL - cname_records = get_system_records('CNAME') - f.write(f"\nCNAME Records:\n") - f.write("-" * 14 + "\n") - if cname_records: - for record in cname_records: - name = record.get('name', 'N/A') - target = record.get('data', 'N/A') - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - f.write(f" {name} -> {target} (TTL: {ttl})\n") - else: - f.write(" No CNAME records found\n") - - # TXT Records with categorization and TTL - txt_records = get_system_records('TXT') - f.write(f"\nTXT Records:\n") - f.write("-" * 12 + "\n") - if txt_records: - for record in txt_records: - txt_data = record.get('data', '').strip() - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - - # Clean up quoted text - if txt_data.startswith('"') and txt_data.endswith('"'): - txt_data = txt_data[1:-1] - - # Identify common TXT record types - if txt_data.startswith('v=spf1'): - f.write(f" [SPF] {txt_data} (TTL: {ttl})\n") - elif txt_data.startswith('v=DMARC1'): - f.write(f" [DMARC] {txt_data} (TTL: {ttl})\n") - elif txt_data.startswith('v=DKIM1'): - f.write(f" [DKIM] {txt_data} (TTL: {ttl})\n") - elif 'google-site-verification' in txt_data: - f.write(f" [Google Verification] {txt_data[:50]}... (TTL: {ttl})\n") - elif '_domainkey' in txt_data: - f.write(f" [Domain Key] {txt_data} (TTL: {ttl})\n") - elif 'facebook-domain-verification' in txt_data: - f.write(f" [Facebook Verification] {txt_data[:50]}... (TTL: {ttl})\n") - elif txt_data.startswith('MS='): - f.write(f" [Microsoft Verification] {txt_data} (TTL: {ttl})\n") - else: - f.write(f" {txt_data} (TTL: {ttl})\n") - else: - f.write(" No TXT records found\n") - - # CAA Records (Certificate Authority Authorization) with TTL - caa_records = get_system_records('CAA') - f.write(f"\nCAA Records (Certificate Authority Authorization):\n") - f.write("-" * 48 + "\n") - if caa_records: - for record in caa_records: - data_parts = record.get('data', '').split() - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - if len(data_parts) >= 3: - flags = data_parts[0] - tag = data_parts[1] - value = ' '.join(data_parts[2:]).strip('"') - f.write(f" {flags} {tag} {value} (TTL: {ttl})\n") - else: - f.write(f" {record.get('data', 'N/A')} (TTL: {ttl})\n") - else: - f.write(" No CAA records found\n") - - # SRV Records with TTL - srv_records = get_system_records('SRV') - f.write(f"\nSRV Records (Service Records):\n") - f.write("-" * 30 + "\n") - if srv_records: - for record in srv_records: - data_parts = record.get('data', '').split() - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - if len(data_parts) >= 4: - priority, weight, port, target = data_parts[:4] - f.write(f" {record.get('name', 'N/A')}\n") - f.write(f" Priority: {priority}, Weight: {weight}\n") - f.write(f" Port: {port}, Target: {target}\n") - f.write(f" TTL: {ttl}\n") - else: - f.write(f" {record.get('data', 'N/A')} (TTL: {ttl})\n") - else: - f.write(" No SRV records found\n") - - # PTR Records (Reverse DNS) - MISSING FROM ORIGINAL - ptr_records = get_system_records('PTR') - f.write(f"\nPTR Records (Reverse DNS):\n") - f.write("-" * 26 + "\n") - if ptr_records: - for record in ptr_records: - ptr_data = record.get('data', 'N/A') - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - name = record.get('name', 'N/A') - f.write(f" {name} -> {ptr_data} (TTL: {ttl})\n") - else: - f.write(" No PTR records found\n") - - # SOA Record (Start of Authority) with detailed parsing - soa_records = get_system_records('SOA') - f.write(f"\nSOA Record (Zone Authority):\n") - f.write("-" * 27 + "\n") - if soa_records: - for record in soa_records: - data_parts = record.get('data', '').split() - ttl = record.get('ttl', 'N/A') if record.get('ttl') else 'Not shown' - if len(data_parts) >= 7: - primary_ns, admin_email = data_parts[:2] - serial, refresh, retry, expire, minimum = data_parts[2:7] - f.write(f" Primary Name Server: {primary_ns}\n") - f.write(f" Admin Email: {admin_email}\n") - f.write(f" Serial Number: {serial}\n") - f.write(f" Refresh Interval: {refresh} seconds\n") - f.write(f" Retry Interval: {retry} seconds\n") - f.write(f" Expire Time: {expire} seconds\n") - f.write(f" Minimum TTL: {minimum} seconds\n") - f.write(f" Record TTL: {ttl}\n") - else: - f.write(f" {record.get('data', 'N/A')} (TTL: {ttl})\n") - else: - f.write(" No SOA record found\n") - - # DNSSEC Information with detailed analysis - dnssec_data = dns_data.get('DNSSEC', {}).get('system', {}) - dnssec_output = dnssec_data.get('raw_output', '') - f.write(f"\nDNSSEC Status:\n") - f.write("-" * 14 + "\n") - if dnssec_output and not dnssec_output.startswith('Error:') and dnssec_output.strip(): - if 'RRSIG' in dnssec_output: - f.write(" āœ… DNSSEC is enabled (RRSIG records found)\n") - elif 'DNSKEY' in dnssec_output: - f.write(" āœ… DNSSEC keys present (DNSKEY records found)\n") - elif 'NSEC' in dnssec_output or 'NSEC3' in dnssec_output: - f.write(" āœ… DNSSEC authenticated denial (NSEC/NSEC3 found)\n") - else: - f.write(" ā“ DNSSEC query returned data but no signatures detected\n") + # Count threats across all analyzed domains and IPs + for domain, domain_data in self.all_results.items(): + # Check VirusTotal results for domain + vt_domain = domain_data.get('virustotal_domain', {}) + if vt_domain.get('success') and vt_domain.get('malicious_engines', 0) > 0: + total_threats += 1 + domains_with_issues.append(domain) - # Show sample DNSSEC records (first few lines) - dnssec_lines = [line.strip() for line in dnssec_output.split('\n') if line.strip()] - if dnssec_lines: - f.write(" Sample DNSSEC records:\n") - for line in dnssec_lines[:3]: # Show first 3 lines - f.write(f" {line}\n") - if len(dnssec_lines) > 3: - f.write(f" ... and {len(dnssec_lines) - 3} more\n") - else: - f.write(" āŒ DNSSEC not detected or query failed\n") - if dnssec_output.startswith('Error:'): - f.write(f" Error: {dnssec_output}\n") + # Check discovered IPs + for ip, ip_data in domain_data.get('discovered_ips', {}).items(): + vt_ip = ip_data.get('virustotal_ip', {}) + if vt_ip.get('success') and vt_ip.get('malicious_engines', 0) > 0: + total_threats += 1 + ips_with_issues.append(ip) - # Complete DNS Server Comparison Table - f.write(f"\nComplete DNS Server Comparison:\n") - f.write("-" * 33 + "\n") - self._write_dns_server_comparison(f, dns_data) + f.write(f"Security Status: {'āš ļø THREATS DETECTED' if total_threats > 0 else 'āœ… NO THREATS DETECTED'}\n") + f.write(f"Total Security Issues: {total_threats}\n") + if domains_with_issues: + f.write(f"Domains with issues: {', '.join(domains_with_issues[:5])}\n") + if ips_with_issues: + f.write(f"IPs with issues: {', '.join(ips_with_issues[:5])}\n") + f.write("\n") - # Enhanced WHOIS Information with ALL parsed fields - whois_data = results.get('whois', {}) - f.write(f"\nWHOIS Information (Complete):\n") - f.write("-" * 29 + "\n") - if whois_data.get('parsed'): - parsed = whois_data['parsed'] - - # Group fields by category for better organization - domain_fields = ['domain_name', 'domain'] - registrar_fields = ['registrar', 'sponsoring_registrar', 'registrar_whois_server', 'registrar_url'] - date_fields = ['creation_date', 'created', 'expiration_date', 'registry_expiry_date', 'expires', 'updated_date', 'changed', 'last_updated'] - status_fields = ['status', 'domain_status'] - ns_fields = [k for k in parsed.keys() if 'name_server' in k.lower() or k.lower().startswith('nserver')] - contact_fields = [k for k in parsed.keys() if any(x in k.lower() for x in ['registrant', 'admin', 'tech', 'billing'])] - - # Display organized sections - for field in domain_fields: - if field in parsed: - f.write(f" Domain: {parsed[field]}\n") - break - - for field in registrar_fields: - if field in parsed: - f.write(f" Registrar: {parsed[field]}\n") - break - - # Show all date fields found - for field in date_fields: - if field in parsed: - field_display = field.replace('_', ' ').title() - f.write(f" {field_display}: {parsed[field]}\n") - - for field in status_fields: - if field in parsed: - f.write(f" Status: {parsed[field]}\n") - break - - # Name servers from WHOIS - if ns_fields: - f.write(f" WHOIS Name Servers:\n") - for field in sorted(ns_fields): - value = parsed[field] - if isinstance(value, list): - for ns in value: - f.write(f" {ns}\n") - else: - f.write(f" {value}\n") - - # Show any other significant fields not covered above - covered_fields = set(domain_fields + registrar_fields + date_fields + status_fields + ns_fields + contact_fields) - other_fields = [k for k in parsed.keys() if k not in covered_fields and not k.startswith('nserver')] - if other_fields: - f.write(f" Other WHOIS Data:\n") - for field in sorted(other_fields)[:10]: # Limit to prevent spam - field_display = field.replace('_', ' ').title() - value = parsed[field] - if isinstance(value, list): - value = ', '.join(value[:3]) # Show first 3 if list - value_display = value[:100] + '...' if len(str(value)) > 100 else str(value) - f.write(f" {field_display}: {value_display}\n") + # Process each domain in detail + for domain, domain_data in self.all_results.items(): + if domain_data.get('depth', 0) == 0: # Only show primary domains in detail + self._write_domain_analysis(f, domain, domain_data) - raw_whois = whois_data.get('raw', '') - if raw_whois.startswith('Error:'): - f.write(f" WHOIS Error: {raw_whois}\n") - elif not whois_data.get('parsed'): - f.write(" No WHOIS data could be parsed\n") + # Summary of all discovered assets + f.write("\nASSET DISCOVERY SUMMARY\n") + f.write("-" * 23 + "\n") + f.write(f"All Discovered Domains ({len(self.processed_domains)}):\n") + for domain in sorted(self.processed_domains): + f.write(f" {domain}\n") - # Certificate Transparency with comprehensive details - cert_data = results.get('certificate_transparency', {}) - f.write(f"\nCertificate Transparency Logs:\n") - f.write("-" * 30 + "\n") - if cert_data.get('success'): - total_certs = cert_data.get('total_certificates', 0) - subdomain_count = cert_data.get('subdomain_count', 0) - f.write(f" Total Certificates Found: {total_certs}\n") - f.write(f" Unique Subdomains Discovered: {subdomain_count}\n\n") + f.write(f"\nAll Discovered IP Addresses ({len(self.processed_ips)}):\n") + for ip in sorted(self.processed_ips, key=ipaddress.IPv4Address): + f.write(f" {ip}\n") + + f.write(f"\n{'=' * 65}\n") + f.write("Report Generation Complete\n") + + def _write_domain_analysis(self, f, domain: str, domain_data: Dict[str, Any]) -> None: + """Write detailed domain analysis to file.""" + f.write(f"\nDETAILED ANALYSIS: {domain.upper()}\n") + f.write("=" * (20 + len(domain)) + "\n") + + # DNS Records Summary + dns_data = domain_data.get('dns_records', {}) + f.write("DNS Records Summary:\n") + for record_type in ['A', 'AAAA', 'MX', 'NS', 'TXT']: + system_records = dns_data.get(record_type, {}).get('system', {}).get('records', []) + f.write(f" {record_type}: {len(system_records)} records\n") + + # Security Analysis + f.write(f"\nSecurity Analysis:\n") + + # VirusTotal Domain Results + vt_domain = domain_data.get('virustotal_domain', {}) + if vt_domain.get('success'): + detection_ratio = vt_domain.get('detection_ratio', '0/0') + malicious_engines = vt_domain.get('malicious_engines', 0) + f.write(f" VirusTotal Domain: {detection_ratio} ({malicious_engines} flagged as malicious)\n") + + if malicious_engines > 0: + f.write(f" āš ļø SECURITY ALERT: Domain flagged by {malicious_engines} security engines\n") + scan_summary = vt_domain.get('scan_summary', {}) + for category, engines in scan_summary.items(): + f.write(f" {category}: {', '.join(engines[:3])}\n") + else: + f.write(f" VirusTotal Domain: {vt_domain.get('message', 'Not available')}\n") + + # Certificate Information + cert_data = domain_data.get('certificate_transparency', {}) + if cert_data.get('success'): + f.write(f" SSL Certificates: {cert_data.get('total_certificates', 0)} found\n") + f.write(f" Subdomains from Certificates: {cert_data.get('subdomain_count', 0)}\n") + + # Discovered Assets + discovered_ips = domain_data.get('discovered_ips', {}) + discovered_subdomains = domain_data.get('discovered_subdomains', {}) + + if discovered_ips: + f.write(f"\nDiscovered IP Addresses ({len(discovered_ips)}):\n") + for ip, ip_data in discovered_ips.items(): + vt_ip = ip_data.get('virustotal_ip', {}) + reverse_dns = ip_data.get('reverse_dns', {}) - # Show certificate statistics by issuer - certificates = cert_data.get('certificates', []) - if certificates: - issuers = {} - for cert in certificates: - issuer = cert.get('issuer', 'Unknown') - issuers[issuer] = issuers.get(issuer, 0) + 1 - - f.write(" Certificate Issuers:\n") - for issuer, count in sorted(issuers.items(), key=lambda x: x[1], reverse=True): - f.write(f" {issuer}: {count} certificates\n") + f.write(f" {ip}:\n") + + # Reverse DNS + if reverse_dns.get('success') and reverse_dns.get('hostnames'): + f.write(f" Reverse DNS: {', '.join(reverse_dns['hostnames'][:3])}\n") + + # VirusTotal IP results + if vt_ip.get('success'): + detection_ratio = vt_ip.get('detection_ratio', '0/0') + malicious_engines = vt_ip.get('malicious_engines', 0) + f.write(f" VirusTotal: {detection_ratio}") + if malicious_engines > 0: + f.write(f" āš ļø FLAGGED BY {malicious_engines} ENGINES") f.write("\n") - # Show recent certificates with more details - if certificates: - f.write(" Recent SSL Certificates (detailed):\n") - for i, cert in enumerate(certificates[:10]): # Show top 10 - f.write(f" Certificate #{i+1}:\n") - f.write(f" ID: {cert.get('id', 'N/A')}\n") - f.write(f" Common Name: {cert.get('common_name', 'N/A')}\n") - f.write(f" Issuer: {cert.get('issuer', 'N/A')}\n") - f.write(f" Valid From: {cert.get('not_before', 'N/A')}\n") - f.write(f" Valid Until: {cert.get('not_after', 'N/A')}\n") - f.write(f" Serial: {cert.get('serial_number', 'N/A')}\n") - - # Show domains covered by this certificate - name_value = cert.get('name_value', '') - if name_value: - domains_in_cert = [d.strip() for d in name_value.split('\n')] - if len(domains_in_cert) > 1: - f.write(f" Covers {len(domains_in_cert)} domains: {', '.join(domains_in_cert[:5])}") - if len(domains_in_cert) > 5: - f.write(f" and {len(domains_in_cert) - 5} more") - f.write("\n") - f.write("\n") + # Shodan IP results + shodan_ip = ip_data.get('shodan_ip', {}) + if shodan_ip.get('success'): + shodan_data = shodan_ip.get('data', {}) + ports = shodan_data.get('ports', []) + if ports: + f.write(f" Shodan Ports: {', '.join(map(str, ports[:10]))}\n") - # Show all discovered subdomains - subdomains = cert_data.get('unique_subdomains', []) - if subdomains: - f.write(f" All Discovered Subdomains ({len(subdomains)} total):\n") - for subdomain in subdomains: - f.write(f" {subdomain}\n") - else: - error_msg = cert_data.get('message', 'Unknown error') - error_detail = cert_data.get('error', '') - f.write(f" Certificate Transparency Query Failed\n") - f.write(f" Error: {error_msg}\n") - if error_detail: - f.write(f" Details: {error_detail}\n") - - # Enhanced Shodan Results with all available data - shodan_data = results.get('shodan', {}) - f.write(f"\nShodan Intelligence:\n") - f.write("-" * 19 + "\n") - if shodan_data.get('success'): - total_results = shodan_data.get('total_results', 0) - f.write(f" Total Shodan Results: {total_results}\n\n") + f.write("\n") + + if discovered_subdomains: + f.write(f"Discovered Subdomains ({len(discovered_subdomains)}):\n") + for subdomain, subdomain_data in discovered_subdomains.items(): + f.write(f" {subdomain}\n") - matches = shodan_data.get('matches', []) - if matches: - f.write(f" Detailed Host Information:\n") - for i, match in enumerate(matches): - f.write(f" Host #{i+1}:\n") - f.write(f" IP Address: {match.get('ip_str', 'N/A')}\n") - f.write(f" Port: {match.get('port', 'N/A')}\n") - f.write(f" Protocol: {match.get('transport', 'N/A')}\n") - f.write(f" Service: {match.get('product', 'N/A')}\n") - f.write(f" Version: {match.get('version', 'N/A')}\n") - f.write(f" Organization: {match.get('org', 'N/A')}\n") - f.write(f" ISP: {match.get('isp', 'N/A')}\n") - f.write(f" ASN: {match.get('asn', 'N/A')}\n") - - # Location information - location = match.get('location', {}) - if location: - city = location.get('city', 'N/A') - region = location.get('region_code', 'N/A') - country = location.get('country_name', 'N/A') - f.write(f" Location: {city}, {region}, {country}\n") - - # SSL certificate information - if 'ssl' in match and match['ssl'].get('cert'): - cert = match['ssl']['cert'] - f.write(f" SSL Certificate:\n") - f.write(f" Subject: {cert.get('subject', {}).get('CN', 'N/A')}\n") - f.write(f" Issuer: {cert.get('issuer', {}).get('CN', 'N/A')}\n") - f.write(f" Expires: {cert.get('expires', 'N/A')}\n") - - # HTTP information if available - if 'http' in match: - http = match['http'] - if 'title' in http: - f.write(f" HTTP Title: {http['title'][:100]}{'...' if len(http['title']) > 100 else ''}\n") - if 'server' in http: - f.write(f" HTTP Server: {http['server']}\n") - - # Hostnames - hostnames = match.get('hostnames', []) - if hostnames: - f.write(f" Hostnames: {', '.join(hostnames)}\n") - - f.write(f" Last Updated: {match.get('timestamp', 'N/A')}\n") - f.write(" ---\n") + # Quick security check for subdomain + vt_subdomain = subdomain_data.get('virustotal_domain', {}) + if vt_subdomain.get('success') and vt_subdomain.get('malicious_engines', 0) > 0: + f.write(f" āš ļø Security Issue: Flagged by VirusTotal\n") - # Show facets if available - facets = shodan_data.get('facets', {}) - if facets: - f.write(f" Shodan Facets (aggregated data):\n") - for facet_name, facet_data in facets.items(): - f.write(f" {facet_name}:\n") - for item in facet_data: - f.write(f" {item.get('value', 'N/A')}: {item.get('count', 'N/A')} occurrences\n") - else: - error_msg = shodan_data.get('message', 'Unknown error') - f.write(f" Shodan Query Status: Failed\n") - f.write(f" Reason: {error_msg}\n") - if 'error' in shodan_data: - f.write(f" Error Details: {shodan_data['error']}\n") - - f.write(f"\n{'='*50}\n") - f.write(f"Report Generation Complete\n") - f.write(f"Total sections analyzed: DNS Records, WHOIS, Certificate Transparency, Shodan Intelligence\n") + subdomain_ips = subdomain_data.get('discovered_ips', {}) + if subdomain_ips: + f.write(f" IPs: {', '.join(list(subdomain_ips.keys())[:3])}\n") + + f.write("\n") - def save_results(self, domain: str, results: Dict[str, Any]) -> None: + def save_results(self, domain: str) -> None: """Save results in multiple formats.""" if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) @@ -812,80 +864,112 @@ class DNSReconTool: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") base_filename = f"{self.output_dir}/{domain}_{timestamp}" - # Save JSON (complete data) - json_file = f"{base_filename}.json" + # Save complete JSON (all recursive data) + json_file = f"{base_filename}_complete.json" with open(json_file, 'w', encoding='utf-8') as f: - json.dump(results, f, indent=2, ensure_ascii=False, default=str) + json.dump(self.all_results, f, indent=2, ensure_ascii=False, default=str) - # Save human-readable summary - txt_file = f"{base_filename}_summary.txt" - self.create_summary_report(results, txt_file) + # Save comprehensive summary + summary_file = f"{base_filename}_analysis.txt" + self.create_comprehensive_summary(summary_file) + + # Save asset list (domains and IPs) + assets_file = f"{base_filename}_assets.txt" + with open(assets_file, 'w', encoding='utf-8') as f: + f.write("Discovered Assets Summary\n") + f.write("=" * 25 + "\n\n") + + f.write(f"Domains ({len(self.processed_domains)}):\n") + for domain in sorted(self.processed_domains): + f.write(f"{domain}\n") + + f.write(f"\nIP Addresses ({len(self.processed_ips)}):\n") + for ip in sorted(self.processed_ips, key=lambda x: ipaddress.IPv4Address(x)): + f.write(f"{ip}\n") print(f"\nšŸ“„ Results saved:") - print(f" JSON: {json_file}") - print(f" Summary: {txt_file}") + print(f" Complete JSON: {json_file}") + print(f" Analysis Report: {summary_file}") + print(f" Asset List: {assets_file}") - def run_reconnaissance(self, domain: str) -> Dict[str, Any]: - """Run complete DNS reconnaissance.""" - print(f"\nšŸš€ Starting DNS reconnaissance for: {domain}") + def run_enhanced_reconnaissance(self, domain: str, max_depth: int = 2) -> Dict[str, Any]: + """Run enhanced recursive DNS reconnaissance.""" + print(f"\nšŸš€ Starting enhanced DNS reconnaissance for: {domain}") + print(f" Max recursion depth: {max_depth}") + print(f" APIs enabled: VirusTotal={bool(self.virustotal_api_key)}, Shodan={bool(self.shodan_api_key)}") - results = { - 'domain': domain, - 'timestamp': datetime.now().isoformat(), - 'dns_records': {}, - 'whois': {}, - 'certificate_transparency': {}, - 'shodan': {} - } + start_time = time.time() - # DNS Records - results['dns_records'] = self.get_comprehensive_dns(domain) + # Clear previous results + self.processed_domains.clear() + self.processed_ips.clear() + self.all_results.clear() - # WHOIS - results['whois'] = self.get_whois_data(domain) + # Start recursive analysis + results = self.analyze_domain_recursively(domain, depth=0, max_depth=max_depth) - # Certificate Transparency - results['certificate_transparency'] = self.get_certificate_transparency(domain) + end_time = time.time() + duration = end_time - start_time - # Shodan (if API key provided) - results['shodan'] = self.query_shodan(domain) + print(f"\nāœ… Enhanced reconnaissance completed in {duration:.1f} seconds") + print(f" Domains analyzed: {len(self.processed_domains)}") + print(f" IP addresses analyzed: {len(self.processed_ips)}") return results def main(): parser = argparse.ArgumentParser( - description="DNS Reconnaissance Tool - Use only on domains you own or have permission to test", - epilog="LEGAL NOTICE: Unauthorized reconnaissance may violate applicable laws." + description="Enhanced DNS Reconnaissance Tool with Recursive Analysis - Use only on domains you own or have permission to test", + epilog="LEGAL NOTICE: Unauthorized reconnaissance may violate applicable laws. Use responsibly." ) parser.add_argument('domain', help='Target domain (e.g., example.com)') parser.add_argument('--shodan-key', help='Shodan API key for additional reconnaissance') + parser.add_argument('--virustotal-key', help='VirusTotal API key for threat intelligence') + parser.add_argument('--max-depth', type=int, default=2, + help='Maximum recursion depth for subdomain analysis (default: 2)') parser.add_argument('--output-dir', default='dns_recon_results', help='Output directory for results') args = parser.parse_args() + # Validate domain format + if not re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', args.domain): + print("āŒ Invalid domain format. Please provide a valid domain (e.g., example.com)") + sys.exit(1) + # Initialize tool - tool = DNSReconTool(shodan_api_key=args.shodan_key) + tool = EnhancedDNSReconTool( + shodan_api_key=args.shodan_key, + virustotal_api_key=args.virustotal_key + ) tool.output_dir = args.output_dir # Check dependencies if not tool.check_dependencies(): sys.exit(1) + # Warn about API keys + if not args.virustotal_key: + print("āš ļø No VirusTotal API key provided. Threat intelligence will be limited.") + if not args.shodan_key: + print("āš ļø No Shodan API key provided. Host intelligence will be limited.") + try: - # Run reconnaissance - results = tool.run_reconnaissance(args.domain) + # Run enhanced reconnaissance + results = tool.run_enhanced_reconnaissance(args.domain, args.max_depth) # Save results - tool.save_results(args.domain, results) + tool.save_results(args.domain) - print(f"\nāœ… Reconnaissance completed for {args.domain}") + print(f"\nšŸŽÆ Enhanced reconnaissance completed for {args.domain}") except KeyboardInterrupt: print("\nā¹ļø Reconnaissance interrupted by user") sys.exit(0) except Exception as e: print(f"āŒ Error during reconnaissance: {e}") + import traceback + traceback.print_exc() sys.exit(1) if __name__ == "__main__":