dnsrecon/dnsrecon.py
overcuriousity 88017edbeb finalize
2025-09-09 10:51:34 +02:00

892 lines
42 KiB
Python

#!/usr/bin/env python3
"""
Copyright (c) 2025 mstoeck3.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import argparse
import json
import os
import shlex
import shutil
import subprocess
import sys
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import requests
class DNSReconTool:
def __init__(self, shodan_api_key: Optional[str] = None):
    """Set up the tool's configuration and its shared HTTP session.

    Args:
        shodan_api_key: Optional Shodan API key; stored as given and read
            later by ``query_shodan``.
    """
    # One session is reused for every HTTP request so they all carry the
    # same identifying User-Agent header.
    http_session = requests.Session()
    http_session.headers['User-Agent'] = 'DNSReconTool/1.0 (Educational/Research Purpose)'
    self.session = http_session
    # Default directory for result files; callers may overwrite it.
    self.output_dir = "dns_recon_results"
    self.shodan_api_key = shodan_api_key
def check_dependencies(self) -> bool:
    """Check that the external command-line tools used by this class exist.

    Returns:
        True when both ``dig`` and ``whois`` are found on ``PATH``; False
        otherwise, after printing which tools are missing and an install
        hint.
    """
    required_tools = ('dig', 'whois')
    # Resolve the executables on PATH instead of running "<tool> --help":
    # executing the tool is slower, and an invocation that hangs until the
    # timeout would be misreported as a missing tool.
    missing_tools = [tool for tool in required_tools if shutil.which(tool) is None]
    if missing_tools:
        print(f"❌ Missing required tools: {', '.join(missing_tools)}")
        print("Install with: apt install dnsutils whois (Ubuntu/Debian)")
        return False
    return True
def run_command(self, cmd: Union[str, List[str]], timeout: int = 30) -> str:
    """Run an external command (without a shell) and return its output.

    Args:
        cmd: Either a command line string, which is split into arguments
            using POSIX shell quoting rules, or an already split argument
            list. No shell is involved, so metacharacters such as ``;``,
            ``|`` or ``$( )`` are passed to the program literally.
        timeout: Maximum run time in seconds.

    Returns:
        The command's stripped stdout, or its stripped stderr when stdout
        is empty. Failures never raise; they are reported as a string
        starting with ``"Error:"``.
    """
    try:
        # Parsing happens inside the try so that malformed quoting is
        # reported through the same "Error:" channel as other failures.
        argv = shlex.split(cmd) if isinstance(cmd, str) else [str(arg) for arg in cmd]
        if not argv:
            return "Error: Empty command"
        # Executing an argv list directly (no shell=True) keeps text that
        # callers interpolate into the command from being run as shell code.
        result = subprocess.run(
            argv, capture_output=True,
            text=True, timeout=timeout
        )
        return result.stdout.strip() if result.stdout else result.stderr.strip()
    except subprocess.TimeoutExpired:
        return "Error: Command timed out"
    except Exception as e:
        return f"Error: {str(e)}"
def get_dns_records(self, domain: str, record_type: str,
                    server: Optional[str] = None) -> Dict[str, Any]:
    """Query one DNS record type with ``dig`` and parse the answer section.

    Args:
        domain: Name to query.
        record_type: Record type to request (for example ``"A"``); only
            answer lines of this type are kept, compared case-insensitively.
        server: Resolver to query via ``@server``; ``None`` uses the system
            resolver.

    Returns:
        A dict with the keys ``query``, ``server``, ``raw_output``,
        ``records`` (list of dicts with ``name``, ``ttl``, ``class``,
        ``type``, ``data``) and ``record_count``.
    """
    argv = ['dig', domain, record_type]
    if server:
        argv.append(f"@{server}")
    argv += ['+noall', '+answer']
    # Quote every argument so caller-supplied text cannot inject shell
    # syntax, whichever way run_command executes the string.
    cmd = ' '.join(shlex.quote(arg) for arg in argv)
    output = self.run_command(cmd)

    records = []
    if output and not output.startswith("Error:"):
        wanted_type = record_type.upper()
        for raw_line in output.split('\n'):
            line = raw_line.strip()
            if not line or line.startswith(';') or line.startswith('>>'):
                continue
            # Split on any whitespace (dig separates columns with tabs or spaces).
            parts = line.split()
            if len(parts) < 4:
                continue
            if len(parts) >= 5 and parts[1].isdigit():
                # Format: name TTL class type data
                ttl, dns_class, dns_type = parts[1:4]
                data = ' '.join(parts[4:])
            else:
                # Format: name class type data (no TTL shown)
                ttl = ''
                dns_class, dns_type = parts[1:3]
                data = ' '.join(parts[3:])
            # Answers may contain other types (e.g. a CNAME in front of an
            # A record); keep only the requested type.
            if dns_type.upper() == wanted_type:
                records.append({
                    'name': parts[0].rstrip('.'),
                    'ttl': ttl,
                    'class': dns_class,
                    'type': dns_type,
                    'data': data
                })
    return {
        'query': f"{domain} {record_type}",
        'server': server or 'system',
        'raw_output': output,
        'records': records,
        'record_count': len(records)
    }
def get_comprehensive_dns(self, domain: str,
                          record_types: Optional[List[str]] = None,
                          dns_servers: Optional[List[Optional[str]]] = None,
                          delay: float = 0.1) -> Dict[str, Any]:
    """Query several record types against several resolvers, plus DNSSEC.

    Args:
        domain: Name to query.
        record_types: Record types to request; defaults to A, AAAA, MX, NS,
            SOA, TXT, CNAME, CAA, SRV and PTR.
        dns_servers: Resolvers to query, where ``None`` means the system
            resolver; defaults to the system resolver plus 1.1.1.1, 8.8.8.8
            and 9.9.9.9.
        delay: Pause in seconds after each query (rate limiting); zero or a
            negative value disables the pause.

    Returns:
        ``{record_type: {server_name: get_dns_records() result}}`` with an
        extra ``'DNSSEC'`` entry holding the raw ``dig +dnssec`` output
        under the ``'system'`` key.
    """
    print("🔍 Gathering DNS records...")
    if record_types is None:
        record_types = ['A', 'AAAA', 'MX', 'NS', 'SOA', 'TXT', 'CNAME',
                        'CAA', 'SRV', 'PTR']
    if dns_servers is None:
        dns_servers = [
            None,       # System default
            '1.1.1.1',  # Cloudflare
            '8.8.8.8',  # Google
            '9.9.9.9',  # Quad9
        ]

    dns_results = {}
    for record_type in record_types:
        print(f" Querying {record_type} records...")
        per_server = {}
        for server in dns_servers:
            server_name = server or 'system'
            result = self.get_dns_records(domain, record_type, server)
            per_server[server_name] = result
            # Progress output: report hits and failures, stay quiet on
            # empty answers.
            if result['records']:
                print(f" {server_name}: Found {len(result['records'])} {record_type} records")
            elif result['raw_output'].startswith('Error:'):
                print(f" {server_name}: {result['raw_output']}")
            if delay > 0:
                time.sleep(delay)  # Rate limiting
        dns_results[record_type] = per_server

    print(" Querying DNSSEC information...")
    # The domain is quoted so it cannot inject shell syntax into the command.
    dnssec_cmd = f"dig {shlex.quote(domain)} +dnssec +noall +answer"
    dns_results['DNSSEC'] = {
        'system': {
            'query': f"{domain} +dnssec",
            'raw_output': self.run_command(dnssec_cmd),
            # The DNSSEC answer is kept raw only; it is not parsed into records.
            'records': [],
            'record_count': 0
        }
    }
    return dns_results
def get_whois_data(self, domain: str) -> Dict[str, Any]:
    """Run ``whois`` for *domain* and parse its ``key: value`` lines.

    Args:
        domain: Name to look up.

    Returns:
        ``{'raw': <whois output>, 'parsed': {...}}``. Keys in ``parsed`` are
        lower-cased with spaces and hyphens replaced by underscores. A key
        that occurs more than once (for example name servers) maps to a
        list of its values in order of appearance. ``parsed`` stays empty
        when the command reported an error.
    """
    print("📋 Fetching WHOIS data...")
    # The domain is quoted so it cannot inject shell syntax into the command.
    raw_whois = self.run_command(f"whois {shlex.quote(domain)}")

    parsed = {}
    if not raw_whois.startswith("Error:"):
        for raw_line in raw_whois.split('\n'):
            line = raw_line.strip()
            # Skip lines without a field separator and registry
            # comment/notice lines.
            if ':' not in line or line.startswith(('%', '#', '>>>')):
                continue
            # Only the first colon separates key and value; later colons
            # (URLs, timestamps) belong to the value.
            key, _, value = line.partition(':')
            key = key.strip().lower().replace(' ', '_').replace('-', '_')
            value = value.strip()
            if not (key and value):
                continue
            if key not in parsed:
                parsed[key] = value
            elif isinstance(parsed[key], list):
                parsed[key].append(value)
            else:
                # Second occurrence: promote the single value to a list.
                parsed[key] = [parsed[key], value]
    return {
        'raw': raw_whois,
        'parsed': parsed
    }
def get_certificate_transparency(self, domain: str) -> Dict[str, Any]:
    """Look up certificates logged for *domain* and its subdomains on crt.sh.

    Args:
        domain: Base domain; the query sent is ``%.<domain>``.

    Returns:
        On success a dict with ``success=True``, ``total_certificates``,
        ``unique_subdomains`` (sorted), ``subdomain_count`` and
        ``certificates`` (details of at most 50 entries). On any failure a
        dict with ``success=False``, ``error`` and ``message``; this method
        does not raise.
    """
    print("🔐 Querying certificate transparency logs...")
    try:
        # Pass the query through params= so the literal '%' wildcard and
        # the domain are percent-encoded instead of being pasted into the
        # URL by hand.
        response = self.session.get(
            "https://crt.sh/",
            params={'q': f"%.{domain}", 'output': 'json'},
            timeout=30
        )
        if response.status_code != 200:
            return {
                'success': False,
                'error': f"HTTP {response.status_code}",
                'message': 'Failed to fetch certificate data'
            }

        cert_data = response.json()
        subdomains = set()
        cert_details = []
        for cert in cert_data:
            name_value = cert.get('name_value', '')
            if name_value:
                # One certificate can list several names, one per line;
                # blank lines are dropped so they don't count as a name.
                names = (entry.strip() for entry in name_value.split('\n'))
                subdomains.update(name for name in names if name)
            cert_details.append({
                'id': cert.get('id'),
                'issuer': cert.get('issuer_name'),
                'common_name': cert.get('common_name'),
                'name_value': cert.get('name_value'),
                'not_before': cert.get('not_before'),
                'not_after': cert.get('not_after'),
                'serial_number': cert.get('serial_number')
            })
        return {
            'success': True,
            'total_certificates': len(cert_data),
            'unique_subdomains': sorted(subdomains),
            'subdomain_count': len(subdomains),
            'certificates': cert_details[:50]  # Limit for output size
        }
    except Exception as e:
        # Best effort: network, decoding and unexpected-shape problems are
        # all reported in the result instead of aborting the whole run.
        return {
            'success': False,
            'error': str(e),
            'message': 'Request to crt.sh failed'
        }
def query_shodan(self, domain: str) -> Dict[str, Any]:
    """Search Shodan for hosts whose hostname matches *domain*.

    Args:
        domain: Domain used in the ``hostname:`` search filter.

    Returns:
        With no API key configured, a dict with ``success=False`` and a
        ``message``. Otherwise either a success dict (``total_results``,
        at most ten ``matches``, ``facets``) or a failure dict with
        ``error`` and ``message``. This method does not raise.
    """
    if not self.shodan_api_key:
        return {
            'success': False,
            'message': 'No Shodan API key provided'
        }

    print("🔎 Querying Shodan...")
    try:
        endpoint = "https://api.shodan.io/shodan/host/search"
        search_params = {
            'key': self.shodan_api_key,
            'query': f'hostname:{domain}'
        }
        reply = self.session.get(endpoint, params=search_params, timeout=30)

        # Anything but 200 is reported with the start of the response body.
        if reply.status_code != 200:
            return {
                'success': False,
                'error': f"HTTP {reply.status_code}",
                'message': reply.text[:200]
            }

        payload = reply.json()
        first_matches = payload.get('matches', [])[:10]  # Limit results
        return {
            'success': True,
            'total_results': payload.get('total', 0),
            'matches': first_matches,
            'facets': payload.get('facets', {})
        }
    except Exception as exc:
        return {
            'success': False,
            'error': str(exc),
            'message': 'Shodan query failed'
        }
def _write_dns_section(self, f, title: str, records: List[Dict], data_extractor):
    """Write one titled list of records to *f*.

    Args:
        f: Writable text stream.
        title: Section title.
        records: Records to list; when empty a single
            "<title>: None found" line is written instead.
        data_extractor: Callable turning one record into the text shown on
            its line.
    """
    if not records:
        f.write(f"\n{title}: None found\n")
        return

    f.write(f"\n{title}:\n")
    for entry in records:
        f.write(f" {data_extractor(entry)}\n")
def _write_dns_server_comparison(self, f, dns_data: Dict):
    """Write a per-resolver comparison of the answers for common record types.

    For A, AAAA, MX, NS and TXT (when present in *dns_data*) one line per
    resolver is written: the sorted record data, the query error, or
    "No records". A warning follows when resolvers that answered returned
    different record sets.

    Args:
        f: Writable text stream.
        dns_data: ``{record_type: {server_name: {'records': [...],
            'raw_output': str}}}``.
    """
    servers = ['system', '1.1.1.1', '8.8.8.8', '9.9.9.9']
    record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT']
    discrepancies_found = False

    for record_type in record_types:
        if record_type not in dns_data:
            continue
        f.write(f"\n{record_type} Records:\n")
        by_server = dns_data[record_type]

        # Record data per resolver, only for resolvers whose query did not
        # fail.
        answers = {}
        for server in servers:
            server_data = by_server.get(server, {})
            raw_output = server_data.get('raw_output', '')
            if raw_output.startswith('Error:'):
                # A failed query is not an answer: show the error but keep
                # it out of the comparison, otherwise a single timeout
                # would be flagged as a DNS inconsistency.
                f.write(f" {server:<12}: {raw_output}\n")
                continue
            data = {r.get('data') for r in server_data.get('records', []) if r.get('data')}
            answers[server] = data
            if data:
                f.write(f" {server:<12}: {', '.join(sorted(data))}\n")
            else:
                f.write(f" {server:<12}: No records\n")

        # More than one distinct record set means the resolvers disagree.
        if len({frozenset(data) for data in answers.values()}) > 1:
            f.write(" ⚠️ INCONSISTENCY DETECTED between servers!\n")
            discrepancies_found = True

    if not discrepancies_found:
        f.write("\n✅ All DNS servers return consistent results\n")
def create_summary_report(self, results: Dict[str, Any], filename: str) -> None:
    """Write the human-readable text report for one reconnaissance run.

    Sections, in order: per-resolver A and AAAA records, the system
    resolver's MX, NS, CNAME, TXT, CAA, SRV, PTR and SOA records, DNSSEC
    status, the resolver comparison, WHOIS, certificate transparency and
    Shodan.

    Args:
        results: Result dict with ``domain`` and ``timestamp`` (required)
            and the optional sections ``dns_records``, ``whois``,
            ``certificate_transparency`` and ``shodan``.
        filename: Path of the UTF-8 text file to create or overwrite.

    Raises:
        KeyError: If ``results`` lacks ``domain`` or ``timestamp``.
        OSError: If the file cannot be written.
    """
    servers = ('system', '1.1.1.1', '8.8.8.8', '9.9.9.9')
    dns_data = results.get('dns_records', {})

    with open(filename, 'w', encoding='utf-8') as f:

        def heading(title: str, width: int) -> None:
            """Write a section title followed by a dashed rule."""
            f.write(f"\n{title}:\n")
            f.write("-" * width + "\n")

        def ttl_of(record: Dict[str, Any]) -> str:
            """Return the record's TTL, or a placeholder when dig showed none."""
            return record.get('ttl') or 'Not shown'

        def show(value: Any) -> str:
            """Render a WHOIS value; repeated fields are stored as lists."""
            if isinstance(value, list):
                return ', '.join(str(item) for item in value)
            return str(value)

        def write_address_section(record_type: str) -> None:
            """Write the records of one type for every resolver that was queried."""
            by_server = dns_data.get(record_type, {})
            queried = {name: by_server[name] for name in servers if name in by_server}
            if not any(data.get('records') for data in queried.values()):
                f.write(f" No {record_type} records found on any server\n")
                return
            for server, data in queried.items():
                records = data.get('records', [])
                raw_output = data.get('raw_output', '')
                if records:
                    f.write(f" {server}:\n")
                    for record in records:
                        f.write(f" {record.get('data', 'N/A')} (TTL: {ttl_of(record)})\n")
                elif raw_output.startswith('Error:'):
                    f.write(f" {server}: {raw_output}\n")
                else:
                    f.write(f" {server}: No records found\n")

        def write_system_section(record_type: str, empty_line: str, render) -> None:
            """Write the system resolver's records of one type using *render*."""
            records = dns_data.get(record_type, {}).get('system', {}).get('records', [])
            if not records:
                f.write(empty_line)
                return
            for record in records:
                f.write(render(record, ttl_of(record)))

        def render_plain(record, ttl):
            return f" {record.get('data', 'N/A')} (TTL: {ttl})\n"

        def render_arrow(record, ttl):
            return f" {record.get('name', 'N/A')} -> {record.get('data', 'N/A')} (TTL: {ttl})\n"

        def render_mx(record, ttl):
            parts = record.get('data', '').split()
            priority = parts[0] if parts else 'N/A'
            host = ' '.join(parts[1:]) if len(parts) > 1 else 'N/A'
            return f" Priority {priority}: {host} (TTL: {ttl})\n"

        def render_txt(record, ttl):
            text = record.get('data', '').strip()
            if text.startswith('"') and text.endswith('"'):
                # Long TXT values arrive as several quoted chunks
                # ("part1" "part2"); drop the outer quotes and join the
                # chunks so no stray quotes end up in the report.
                text = text[1:-1].replace('" "', '')
            truncate = False
            # Order matters: the first matching category wins.
            if text.startswith('v=spf1'):
                label = 'SPF'
            elif text.startswith('v=DMARC1'):
                label = 'DMARC'
            elif text.startswith('v=DKIM1'):
                label = 'DKIM'
            elif 'google-site-verification' in text:
                label, truncate = 'Google Verification', True
            elif '_domainkey' in text:
                label = 'Domain Key'
            elif 'facebook-domain-verification' in text:
                label, truncate = 'Facebook Verification', True
            elif text.startswith('MS='):
                label = 'Microsoft Verification'
            else:
                label = ''
            shown = f"{text[:50]}..." if truncate else text
            prefix = f"[{label}] " if label else ''
            return f" {prefix}{shown} (TTL: {ttl})\n"

        def render_caa(record, ttl):
            parts = record.get('data', '').split()
            if len(parts) < 3:
                return render_plain(record, ttl)
            flags, tag = parts[:2]
            value = ' '.join(parts[2:]).strip('"')
            return f" {flags} {tag} {value} (TTL: {ttl})\n"

        def render_srv(record, ttl):
            parts = record.get('data', '').split()
            if len(parts) < 4:
                return render_plain(record, ttl)
            priority, weight, port, target = parts[:4]
            return (
                f" {record.get('name', 'N/A')}\n"
                f" Priority: {priority}, Weight: {weight}\n"
                f" Port: {port}, Target: {target}\n"
                f" TTL: {ttl}\n"
            )

        def render_soa(record, ttl):
            parts = record.get('data', '').split()
            if len(parts) < 7:
                return render_plain(record, ttl)
            primary_ns, admin_email, serial, refresh, retry, expire, minimum = parts[:7]
            return (
                f" Primary Name Server: {primary_ns}\n"
                f" Admin Email: {admin_email}\n"
                f" Serial Number: {serial}\n"
                f" Refresh Interval: {refresh} seconds\n"
                f" Retry Interval: {retry} seconds\n"
                f" Expire Time: {expire} seconds\n"
                f" Minimum TTL: {minimum} seconds\n"
                f" Record TTL: {ttl}\n"
            )

        # ---- Report header ----
        f.write("DNS Reconnaissance Report\n")
        f.write(f"{'='*50}\n")
        f.write(f"Domain: {results['domain']}\n")
        f.write(f"Timestamp: {results['timestamp']}\n\n")

        # ---- DNS records ----
        heading("A Records (IPv4)", 16)
        write_address_section('A')
        heading("AAAA Records (IPv6)", 17)
        write_address_section('AAAA')
        heading("MX Records (Mail Servers)", 26)
        write_system_section('MX', " No MX records found\n", render_mx)
        heading("NS Records (Name Servers)", 26)
        write_system_section('NS', " No NS records found\n", render_plain)
        heading("CNAME Records", 14)
        write_system_section('CNAME', " No CNAME records found\n", render_arrow)
        heading("TXT Records", 12)
        write_system_section('TXT', " No TXT records found\n", render_txt)
        heading("CAA Records (Certificate Authority Authorization)", 48)
        write_system_section('CAA', " No CAA records found\n", render_caa)
        heading("SRV Records (Service Records)", 30)
        write_system_section('SRV', " No SRV records found\n", render_srv)
        heading("PTR Records (Reverse DNS)", 26)
        write_system_section('PTR', " No PTR records found\n", render_arrow)
        heading("SOA Record (Zone Authority)", 27)
        write_system_section('SOA', " No SOA record found\n", render_soa)

        # ---- DNSSEC ----
        heading("DNSSEC Status", 14)
        dnssec_output = dns_data.get('DNSSEC', {}).get('system', {}).get('raw_output', '')
        if dnssec_output.strip() and not dnssec_output.startswith('Error:'):
            if 'RRSIG' in dnssec_output:
                f.write(" ✅ DNSSEC is enabled (RRSIG records found)\n")
            elif 'DNSKEY' in dnssec_output:
                f.write(" ✅ DNSSEC keys present (DNSKEY records found)\n")
            elif 'NSEC' in dnssec_output:  # also matches NSEC3
                f.write(" ✅ DNSSEC authenticated denial (NSEC/NSEC3 found)\n")
            else:
                f.write(" ❓ DNSSEC query returned data but no signatures detected\n")
            # Show only the first few answer lines as a sample.
            dnssec_lines = [line.strip() for line in dnssec_output.split('\n') if line.strip()]
            if dnssec_lines:
                f.write(" Sample DNSSEC records:\n")
                for line in dnssec_lines[:3]:
                    f.write(f" {line}\n")
                if len(dnssec_lines) > 3:
                    f.write(f" ... and {len(dnssec_lines) - 3} more\n")
        else:
            f.write(" ❌ DNSSEC not detected or query failed\n")
            if dnssec_output.startswith('Error:'):
                f.write(f" Error: {dnssec_output}\n")

        # ---- Resolver comparison ----
        heading("Complete DNS Server Comparison", 33)
        self._write_dns_server_comparison(f, dns_data)

        # ---- WHOIS ----
        whois_data = results.get('whois', {})
        heading("WHOIS Information (Complete)", 29)
        parsed = whois_data.get('parsed')
        if parsed:
            domain_fields = ['domain_name', 'domain']
            registrar_fields = ['registrar', 'sponsoring_registrar',
                                'registrar_whois_server', 'registrar_url']
            date_fields = ['creation_date', 'created', 'expiration_date',
                           'registry_expiry_date', 'expires', 'updated_date',
                           'changed', 'last_updated']
            status_fields = ['status', 'domain_status']
            ns_fields = [k for k in parsed
                         if 'name_server' in k.lower() or k.lower().startswith('nserver')]
            # NOTE(review): contact fields are kept out of "Other WHOIS
            # Data" below but are not printed anywhere - confirm that
            # omitting them is intended.
            contact_fields = [k for k in parsed
                              if any(x in k.lower() for x in ['registrant', 'admin', 'tech', 'billing'])]

            def write_first(label: str, candidates: List[str]) -> None:
                """Write the first candidate field that is present, if any."""
                for field in candidates:
                    if field in parsed:
                        f.write(f" {label}: {show(parsed[field])}\n")
                        break

            write_first("Domain", domain_fields)
            write_first("Registrar", registrar_fields)
            # Every date field found is shown, not just the first one.
            for field in date_fields:
                if field in parsed:
                    f.write(f" {field.replace('_', ' ').title()}: {show(parsed[field])}\n")
            write_first("Status", status_fields)

            if ns_fields:
                f.write(" WHOIS Name Servers:\n")
                for field in sorted(ns_fields):
                    value = parsed[field]
                    for ns in (value if isinstance(value, list) else [value]):
                        f.write(f" {ns}\n")

            covered_fields = set(domain_fields + registrar_fields + date_fields
                                 + status_fields + ns_fields + contact_fields)
            other_fields = [k for k in parsed
                            if k not in covered_fields and not k.startswith('nserver')]
            if other_fields:
                f.write(" Other WHOIS Data:\n")
                for field in sorted(other_fields)[:10]:  # Limit to prevent spam
                    value = parsed[field]
                    if isinstance(value, list):
                        text = ', '.join(str(item) for item in value[:3])  # First 3 only
                    else:
                        text = str(value)
                    shown = text[:100] + '...' if len(text) > 100 else text
                    f.write(f" {field.replace('_', ' ').title()}: {shown}\n")

        raw_whois = whois_data.get('raw', '')
        if raw_whois.startswith('Error:'):
            f.write(f" WHOIS Error: {raw_whois}\n")
        elif not parsed:
            f.write(" No WHOIS data could be parsed\n")

        # ---- Certificate transparency ----
        cert_data = results.get('certificate_transparency', {})
        heading("Certificate Transparency Logs", 30)
        if cert_data.get('success'):
            f.write(f" Total Certificates Found: {cert_data.get('total_certificates', 0)}\n")
            f.write(f" Unique Subdomains Discovered: {cert_data.get('subdomain_count', 0)}\n\n")

            certificates = cert_data.get('certificates', [])
            if certificates:
                issuers = {}
                for cert in certificates:
                    issuer = cert.get('issuer', 'Unknown')
                    issuers[issuer] = issuers.get(issuer, 0) + 1
                f.write(" Certificate Issuers:\n")
                for issuer, count in sorted(issuers.items(), key=lambda item: item[1], reverse=True):
                    f.write(f" {issuer}: {count} certificates\n")
                f.write("\n")

                f.write(" Recent SSL Certificates (detailed):\n")
                cert_fields = (('ID', 'id'), ('Common Name', 'common_name'),
                               ('Issuer', 'issuer'), ('Valid From', 'not_before'),
                               ('Valid Until', 'not_after'), ('Serial', 'serial_number'))
                for number, cert in enumerate(certificates[:10], start=1):
                    f.write(f" Certificate #{number}:\n")
                    for label, key in cert_fields:
                        f.write(f" {label}: {cert.get(key, 'N/A')}\n")
                    name_value = cert.get('name_value', '')
                    if name_value:
                        names = [entry.strip() for entry in name_value.split('\n')]
                        if len(names) > 1:
                            f.write(f" Covers {len(names)} domains: {', '.join(names[:5])}")
                            if len(names) > 5:
                                f.write(f" and {len(names) - 5} more")
                            f.write("\n")
                    f.write("\n")

            subdomains = cert_data.get('unique_subdomains', [])
            if subdomains:
                f.write(f" All Discovered Subdomains ({len(subdomains)} total):\n")
                for subdomain in subdomains:
                    f.write(f" {subdomain}\n")
        else:
            f.write(" Certificate Transparency Query Failed\n")
            f.write(f" Error: {cert_data.get('message', 'Unknown error')}\n")
            error_detail = cert_data.get('error', '')
            if error_detail:
                f.write(f" Details: {error_detail}\n")

        # ---- Shodan ----
        shodan_data = results.get('shodan', {})
        heading("Shodan Intelligence", 19)
        if shodan_data.get('success'):
            f.write(f" Total Shodan Results: {shodan_data.get('total_results', 0)}\n\n")
            matches = shodan_data.get('matches', [])
            if matches:
                f.write(" Detailed Host Information:\n")
                host_fields = (('IP Address', 'ip_str'), ('Port', 'port'),
                               ('Protocol', 'transport'), ('Service', 'product'),
                               ('Version', 'version'), ('Organization', 'org'),
                               ('ISP', 'isp'), ('ASN', 'asn'))
                for number, match in enumerate(matches, start=1):
                    f.write(f" Host #{number}:\n")
                    for label, key in host_fields:
                        f.write(f" {label}: {match.get(key, 'N/A')}\n")

                    location = match.get('location', {})
                    if location:
                        city = location.get('city', 'N/A')
                        region = location.get('region_code', 'N/A')
                        country = location.get('country_name', 'N/A')
                        f.write(f" Location: {city}, {region}, {country}\n")

                    # Banner sub-objects may be present but null, so fall
                    # back to empty dicts before reading from them.
                    cert = (match.get('ssl') or {}).get('cert')
                    if cert:
                        f.write(" SSL Certificate:\n")
                        f.write(f" Subject: {(cert.get('subject') or {}).get('CN', 'N/A')}\n")
                        f.write(f" Issuer: {(cert.get('issuer') or {}).get('CN', 'N/A')}\n")
                        f.write(f" Expires: {cert.get('expires', 'N/A')}\n")

                    http = match.get('http') or {}
                    title = http.get('title')
                    if title is not None:
                        title = str(title)
                        f.write(f" HTTP Title: {title[:100]}{'...' if len(title) > 100 else ''}\n")
                    if http.get('server') is not None:
                        f.write(f" HTTP Server: {http['server']}\n")

                    hostnames = match.get('hostnames', [])
                    if hostnames:
                        f.write(f" Hostnames: {', '.join(hostnames)}\n")
                    f.write(f" Last Updated: {match.get('timestamp', 'N/A')}\n")
                    f.write(" ---\n")

            facets = shodan_data.get('facets', {})
            if facets:
                f.write(" Shodan Facets (aggregated data):\n")
                for facet_name, facet_data in facets.items():
                    f.write(f" {facet_name}:\n")
                    for item in facet_data:
                        f.write(f" {item.get('value', 'N/A')}: {item.get('count', 'N/A')} occurrences\n")
        else:
            f.write(" Shodan Query Status: Failed\n")
            f.write(f" Reason: {shodan_data.get('message', 'Unknown error')}\n")
            if 'error' in shodan_data:
                f.write(f" Error Details: {shodan_data['error']}\n")

        # ---- Footer ----
        f.write(f"\n{'='*50}\n")
        f.write("Report Generation Complete\n")
        f.write("Total sections analyzed: DNS Records, WHOIS, Certificate Transparency, Shodan Intelligence\n")
def save_results(self, domain: str, results: Dict[str, Any]) -> None:
    """Save the results as a JSON file and a text summary in ``self.output_dir``.

    Both files are named ``<domain>_<YYYYmmdd_HHMMSS>``; the JSON file gets
    the ``.json`` suffix and the summary ``_summary.txt``.

    Args:
        domain: Target domain, used for the file names.
        results: Result dict; values JSON cannot encode are written via
            ``str()``.

    Raises:
        OSError: If the directory or a file cannot be created.
    """
    # exist_ok avoids the check-then-create race of testing for the
    # directory first.
    os.makedirs(self.output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    # The domain is user input: replace anything that is not a plain
    # file-name character so path separators cannot move the files outside
    # the output directory.
    safe_domain = ''.join(ch if ch.isalnum() or ch in '._-' else '_' for ch in domain)
    base_filename = os.path.join(self.output_dir, f"{safe_domain}_{timestamp}")

    # Save JSON (complete data)
    json_file = f"{base_filename}.json"
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False, default=str)

    # Save human-readable summary
    txt_file = f"{base_filename}_summary.txt"
    self.create_summary_report(results, txt_file)

    print(f"\n📄 Results saved:")
    print(f" JSON: {json_file}")
    print(f" Summary: {txt_file}")
def run_reconnaissance(self, domain: str) -> Dict[str, Any]:
    """Collect every data source for *domain* into one result dict.

    Args:
        domain: Target domain, passed unchanged to each collector.

    Returns:
        A dict with ``domain``, ``timestamp`` (ISO format, taken before the
        first query) and one entry per collector: ``dns_records``,
        ``whois``, ``certificate_transparency`` and ``shodan``.
    """
    print(f"\n🚀 Starting DNS reconnaissance for: {domain}")
    report = {
        'domain': domain,
        'timestamp': datetime.now().isoformat(),
    }
    # Collectors run in this order; each receives the domain and returns
    # the value stored under its section key.
    collectors = (
        ('dns_records', self.get_comprehensive_dns),
        ('whois', self.get_whois_data),
        ('certificate_transparency', self.get_certificate_transparency),
        ('shodan', self.query_shodan),  # reports a failure dict without an API key
    )
    for section, collect in collectors:
        report[section] = collect(domain)
    return report
def main():
    """Command-line entry point: parse arguments, run the scan, save results.

    Exits with status 1 when a required tool is missing or the scan fails,
    and with status 0 when the user interrupts the run.
    """
    cli = argparse.ArgumentParser(
        description="DNS Reconnaissance Tool - Use only on domains you own or have permission to test",
        epilog="LEGAL NOTICE: Unauthorized reconnaissance may violate applicable laws."
    )
    cli.add_argument('domain', help='Target domain (e.g., example.com)')
    cli.add_argument('--shodan-key', help='Shodan API key for additional reconnaissance')
    cli.add_argument('--output-dir', default='dns_recon_results',
                     help='Output directory for results')
    options = cli.parse_args()

    # Build the tool and point it at the requested output directory.
    recon = DNSReconTool(shodan_api_key=options.shodan_key)
    recon.output_dir = options.output_dir

    # Stop early when dig/whois are unavailable.
    if not recon.check_dependencies():
        sys.exit(1)

    target = options.domain
    try:
        findings = recon.run_reconnaissance(target)
        recon.save_results(target, findings)
        print(f"\n✅ Reconnaissance completed for {target}")
    except KeyboardInterrupt:
        print("\n⏹️ Reconnaissance interrupted by user")
        sys.exit(0)
    except Exception as err:
        print(f"❌ Error during reconnaissance: {err}")
        sys.exit(1)


if __name__ == "__main__":
    main()