dnsrecon/dnsrecon.py
overcuriousity 5d0e095a81 improvement
2025-09-09 10:15:31 +02:00

603 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Copyright (c) 2025 mstoeck3.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import argparse
import json
import os
import shlex
import subprocess
import sys
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence, Union

import requests

class DNSReconTool:
    """Gather DNS, WHOIS, certificate-transparency and Shodan data for a domain.

    External tools (``dig``, ``whois``) are executed without a shell, HTTP
    lookups go through one shared ``requests`` session, and every collector
    returns plain dictionaries so the combined result can be dumped as JSON.
    """

    # Record types requested from every resolver.
    RECORD_TYPES = ('A', 'AAAA', 'MX', 'NS', 'SOA', 'TXT', 'CNAME',
                    'CAA', 'SRV', 'PTR')

    # Resolvers to query; ``None`` means "whatever the system is configured with".
    DNS_SERVERS = (
        None,       # System default
        '1.1.1.1',  # Cloudflare
        '8.8.8.8',  # Google
        '9.9.9.9',  # Quad9
    )

    # Record types whose answers are compared across resolvers in the report.
    COMPARED_RECORD_TYPES = ('A', 'AAAA', 'MX', 'NS')

    # Class mnemonics accepted when parsing dig answer lines.
    _DNS_CLASSES = ('IN', 'CH', 'HS')

    # (report label, candidate WHOIS keys) - the first key present is printed.
    _WHOIS_SUMMARY_FIELDS = (
        ("Domain", ('domain_name', 'domain')),
        ("Registrar", ('registrar', 'sponsoring_registrar')),
        ("Created", ('creation_date', 'created')),
        ("Expires", ('expiration_date', 'registry_expiry_date', 'expires')),
        ("Last Updated", ('updated_date', 'changed', 'last_updated')),
        ("Status", ('status', 'domain_status')),
    )

    def __init__(self, shodan_api_key: Optional[str] = None):
        """Create the tool.

        Args:
            shodan_api_key: Optional Shodan API key; without it the Shodan
                lookup is skipped and reported as unsuccessful.
        """
        self.shodan_api_key = shodan_api_key
        self.output_dir = "dns_recon_results"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'DNSReconTool/1.0 (Educational/Research Purpose)'
        })

    # ------------------------------------------------------------------
    # Command execution
    # ------------------------------------------------------------------
    def check_dependencies(self) -> bool:
        """Check if required system tools are available.

        Returns:
            True when both ``dig`` and ``whois`` could be started, otherwise
            False (after printing which tools are missing).
        """
        required_tools = ['dig', 'whois']
        missing_tools = []
        for tool in required_tools:
            try:
                # Only the ability to start the program matters here, so the
                # exit status is deliberately ignored (check=False).
                subprocess.run([tool, '--help'],
                               capture_output=True, check=False, timeout=5)
            except (subprocess.TimeoutExpired, OSError):
                missing_tools.append(tool)
        if missing_tools:
            print(f"❌ Missing required tools: {', '.join(missing_tools)}")
            print("Install with: apt install dnsutils whois (Ubuntu/Debian)")
            return False
        return True

    def run_command(self, cmd: Union[str, Sequence[str]], timeout: int = 30) -> str:
        """Run an external command with timeout and error handling.

        The command is executed WITHOUT a shell so that user-supplied values
        (such as the target domain) can never be interpreted as shell syntax.

        Args:
            cmd: Either an argument list, or a command string that is split
                into arguments with ``shlex.split`` (shell features such as
                pipes or redirection are not supported).
            timeout: Maximum run time in seconds.

        Returns:
            Stripped stdout, or stripped stderr when stdout is empty.  On
            failure a string starting with ``"Error:"`` is returned instead
            of raising.
        """
        try:
            argv = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
            if not argv:
                return "Error: Empty command"
            result = subprocess.run(
                argv, capture_output=True, text=True, timeout=timeout
            )
        except subprocess.TimeoutExpired:
            return "Error: Command timed out"
        except Exception as e:  # best-effort: callers expect a string, not a raise
            return f"Error: {str(e)}"
        return result.stdout.strip() if result.stdout else result.stderr.strip()

    # ------------------------------------------------------------------
    # DNS
    # ------------------------------------------------------------------
    @classmethod
    def _parse_dig_answer(cls, output: str) -> List[Dict[str, str]]:
        """Turn ``dig +noall +answer`` output into a list of record dicts.

        Lines look like ``name [ttl] class type [data]``.  The TTL column is
        optional (dig omits it with ``+nottlid``); when absent, ``'ttl'`` is
        the empty string.  Comment lines and lines whose class column is not
        a DNS class (for example error text dig printed) are skipped.
        """
        records = []
        for raw_line in output.splitlines():
            line = raw_line.strip()
            if not line or line.startswith((';', '>>')):
                continue
            tokens = line.split(None, 2)
            if len(tokens) < 3:
                continue
            name, second, rest = tokens
            if second.isdigit():
                ttl = second
                fields = rest.split(None, 2)             # class, type, data
            else:
                ttl = ''
                fields = [second] + rest.split(None, 1)  # class, type, data
            if len(fields) < 2:
                continue
            rr_class, rr_type = fields[0], fields[1]
            if rr_class not in cls._DNS_CLASSES and not rr_class.startswith('CLASS'):
                continue
            records.append({
                'name': name.rstrip('.'),  # Remove trailing dot
                'ttl': ttl,
                'class': rr_class,
                'type': rr_type,
                # Keep the data column intact, including quoted strings.
                'data': fields[2] if len(fields) > 2 else '',
            })
        return records

    def get_dns_records(self, domain: str, record_type: str,
                        server: Optional[str] = None) -> Dict[str, Any]:
        """Fetch DNS records of one type from one resolver via ``dig``.

        Args:
            domain: Name to query.
            record_type: Record type mnemonic such as ``"A"`` or ``"MX"``.
            server: Resolver address, or None for the system resolver.

        Returns:
            Dict with ``query``, ``server``, ``raw_output``, the parsed
            ``records`` and their ``record_count``.
        """
        cmd = ["dig", domain, record_type]
        if server:
            cmd.append(f"@{server}")
        # TTLs are left in the output because each record stores a 'ttl' field.
        cmd += ["+noall", "+answer"]
        output = self.run_command(cmd)
        records = [] if output.startswith("Error:") else self._parse_dig_answer(output)
        return {
            'query': f"{domain} {record_type}",
            'server': server or 'system',
            'raw_output': output,
            'records': records,
            'record_count': len(records)
        }

    def get_comprehensive_dns(self, domain: str) -> Dict[str, Any]:
        """Query every record type on every configured resolver.

        Returns:
            ``{record_type: {server_name: get_dns_records(...)}}`` plus a
            ``'DNSSEC'`` entry holding the raw ``dig +dnssec`` output.
        """
        print("🔍 Gathering DNS records...")
        dns_results: Dict[str, Any] = {}
        for record_type in self.RECORD_TYPES:
            per_server = dns_results[record_type] = {}
            for server in self.DNS_SERVERS:
                per_server[server or 'system'] = self.get_dns_records(
                    domain, record_type, server
                )
                time.sleep(0.1)  # Rate limiting
        # Try DNSSEC validation
        dns_results['DNSSEC'] = {
            'system': {
                'query': f"{domain} +dnssec",
                'raw_output': self.run_command(
                    ["dig", domain, "+dnssec", "+noall", "+answer"]
                ),
                'records': [],
                'record_count': 0
            }
        }
        return dns_results

    # ------------------------------------------------------------------
    # WHOIS
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_whois(raw_whois: str) -> Dict[str, str]:
        """Extract ``key: value`` pairs from raw WHOIS text.

        Keys are lower-cased with spaces replaced by underscores.  Lines
        starting with ``%`` or ``#`` and pairs with an empty key or value are
        ignored.  A key that occurs several times (e.g. ``Name Server``)
        keeps every distinct value, joined with ``", "``.
        """
        parsed: Dict[str, str] = {}
        seen: Dict[str, set] = {}
        for raw_line in raw_whois.splitlines():
            line = raw_line.strip()
            if ':' not in line or line.startswith(('%', '#')):
                continue
            key, _, value = line.partition(':')
            key = key.strip().lower().replace(' ', '_')
            value = value.strip()
            if not key or not value:
                continue
            known = seen.setdefault(key, set())
            folded = value.casefold()
            if folded in known:
                continue  # same value repeated (e.g. registry + registrar block)
            known.add(folded)
            parsed[key] = f"{parsed[key]}, {value}" if key in parsed else value
        return parsed

    def get_whois_data(self, domain: str) -> Dict[str, Any]:
        """Fetch and parse WHOIS data.

        Returns:
            Dict with the ``raw`` command output and the ``parsed`` fields
            (empty when the command failed).
        """
        print("📋 Fetching WHOIS data...")
        raw_whois = self.run_command(["whois", domain])
        parsed = {} if raw_whois.startswith("Error:") else self._parse_whois(raw_whois)
        return {'raw': raw_whois, 'parsed': parsed}

    # ------------------------------------------------------------------
    # Certificate transparency / Shodan
    # ------------------------------------------------------------------
    def get_certificate_transparency(self, domain: str) -> Dict[str, Any]:
        """Query certificate transparency logs via crt.sh.

        Returns:
            On success a dict with ``success=True``, the certificate count,
            the sorted unique names and up to 50 certificate summaries; on
            failure ``success=False`` with ``error`` and ``message``.
        """
        print("🔐 Querying certificate transparency logs...")
        try:
            # Let requests build the query string so the '%' wildcard and the
            # domain are percent-encoded correctly.
            response = self.session.get(
                "https://crt.sh/",
                params={'q': f"%.{domain}", 'output': 'json'},
                timeout=30,
            )
            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f"HTTP {response.status_code}",
                    'message': 'Failed to fetch certificate data'
                }
            cert_data = response.json()
            subdomains = set()
            cert_details = []
            for cert in cert_data:
                name_value = cert.get('name_value', '')
                if name_value:
                    # One certificate can list several names, one per line.
                    subdomains.update(
                        name.strip() for name in name_value.split('\n') if name.strip()
                    )
                cert_details.append({
                    'id': cert.get('id'),
                    'issuer': cert.get('issuer_name'),
                    'common_name': cert.get('common_name'),
                    'name_value': cert.get('name_value'),
                    'not_before': cert.get('not_before'),
                    'not_after': cert.get('not_after'),
                    'serial_number': cert.get('serial_number')
                })
            return {
                'success': True,
                'total_certificates': len(cert_data),
                'unique_subdomains': sorted(subdomains),
                'subdomain_count': len(subdomains),
                'certificates': cert_details[:50]  # Limit for output size
            }
        except Exception as e:  # best-effort lookup: report instead of aborting the run
            return {
                'success': False,
                'error': str(e),
                'message': 'Request to crt.sh failed'
            }

    def _redact_secret(self, text: str) -> str:
        """Return *text* with the Shodan API key (if any) replaced by ``***``."""
        key = getattr(self, 'shodan_api_key', None)
        return text.replace(key, '***') if key else text

    def query_shodan(self, domain: str) -> Dict[str, Any]:
        """Query Shodan API for domain information.

        Returns:
            ``success=False`` with a message when no API key is configured or
            the request fails; otherwise the total, up to 10 matches and the
            facets from the response.
        """
        if not self.shodan_api_key:
            return {
                'success': False,
                'message': 'No Shodan API key provided'
            }
        print("🔎 Querying Shodan...")
        try:
            response = self.session.get(
                "https://api.shodan.io/shodan/host/search",
                params={
                    'key': self.shodan_api_key,
                    'query': f'hostname:{domain}'
                },
                timeout=30,
            )
            if response.status_code == 200:
                data = response.json()
                return {
                    'success': True,
                    'total_results': data.get('total', 0),
                    'matches': data.get('matches', [])[:10],  # Limit results
                    'facets': data.get('facets', {})
                }
            return {
                'success': False,
                'error': f"HTTP {response.status_code}",
                'message': self._redact_secret(response.text[:200])
            }
        except Exception as e:  # best-effort lookup: report instead of aborting the run
            return {
                'success': False,
                # Exception text can contain the request URL including the
                # key; never let it reach the saved results.
                'error': self._redact_secret(str(e)),
                'message': 'Shodan query failed'
            }

    # ------------------------------------------------------------------
    # Report writing
    # ------------------------------------------------------------------
    def _write_dns_section(self, f, title: str, records: List[Dict], data_extractor):
        """Write one titled list of records, or "<title>: None found"."""
        if records:
            f.write(f"\n{title}:\n")
            for record in records:
                data = data_extractor(record)
                f.write(f" {data}\n")
        else:
            f.write(f"\n{title}: None found\n")

    def _write_dns_server_comparison(self, f, dns_data: Dict):
        """Report record types whose answers differ between resolvers."""
        server_names = [server or 'system' for server in self.DNS_SERVERS]
        discrepancies_found = False
        for record_type in self.COMPARED_RECORD_TYPES:
            per_server = dns_data.get(record_type)
            if not per_server:
                continue
            answers = {}
            for name in server_names:
                if name in per_server:
                    answers[name] = {
                        r.get('data', '')
                        for r in per_server[name].get('records', [])
                        if r.get('data')
                    }
            # More than one distinct answer set means the resolvers disagree.
            if len({frozenset(values) for values in answers.values()}) > 1:
                discrepancies_found = True
                f.write(f" ⚠️ {record_type} records differ between DNS servers:\n")
                for name, values in answers.items():
                    f.write(f" {name}: {', '.join(sorted(values)) if values else 'No records'}\n")
        if not discrepancies_found:
            f.write(" ✅ All DNS servers return consistent results\n")

    @staticmethod
    def _clean_txt(data: str) -> str:
        """Strip the quoting dig puts around TXT data.

        A TXT record split into several quoted chunks (``"a" "b"``) is joined
        into one string, matching how the chunks are meant to be read.
        """
        text = data.strip()
        if not (len(text) >= 2 and text.startswith('"') and text.endswith('"')):
            return text
        try:
            return ''.join(shlex.split(text))
        except ValueError:
            # Unbalanced quoting: fall back to dropping the outer quotes only.
            return text[1:-1]

    def _write_dns_records(self, f, dns_data: Dict) -> None:
        """Write the per-type record sections using the system resolver's answers."""
        def system_records(record_type):
            return dns_data.get(record_type, {}).get('system', {}).get('records', [])

        def plain(record):
            return record.get('data', 'N/A')

        self._write_dns_section(f, "A Records (IPv4)", system_records('A'), plain)
        self._write_dns_section(f, "AAAA Records (IPv6)", system_records('AAAA'), plain)

        # MX Records (Mail Servers)
        mx_records = system_records('MX')
        if mx_records:
            f.write("\nMX Records (Mail Servers):\n")
            for record in mx_records:
                parts = record.get('data', '').split()
                priority = parts[0] if parts else 'N/A'
                server = ' '.join(parts[1:]) if len(parts) > 1 else 'N/A'
                f.write(f" Priority {priority}: {server}\n")
        else:
            f.write("\nMX Records (Mail Servers): None found\n")

        self._write_dns_section(f, "NS Records (Name Servers)", system_records('NS'), plain)
        self._write_dns_section(
            f, "CNAME Records", system_records('CNAME'),
            lambda r: f"{r.get('name', 'N/A')} -> {r.get('data', 'N/A')}")

        # TXT Records, tagged by well-known prefixes
        txt_records = system_records('TXT')
        if txt_records:
            f.write("\nTXT Records:\n")
            for record in txt_records:
                txt_data = self._clean_txt(record.get('data', ''))
                if txt_data.startswith('v=spf1'):
                    f.write(f" [SPF] {txt_data}\n")
                elif txt_data.startswith('v=DMARC1'):
                    f.write(f" [DMARC] {txt_data}\n")
                elif txt_data.startswith('v=DKIM1'):
                    f.write(f" [DKIM] {txt_data}\n")
                elif 'google-site-verification' in txt_data:
                    f.write(f" [Google Verification] {txt_data[:50]}...\n")
                else:
                    f.write(f" {txt_data}\n")
        else:
            f.write("\nTXT Records: None found\n")

        # CAA Records (Certificate Authority Authorization)
        caa_records = system_records('CAA')
        if caa_records:
            f.write("\nCAA Records (Certificate Authority Authorization):\n")
            for record in caa_records:
                parts = record.get('data', '').split()
                if len(parts) >= 3:
                    flags, tag = parts[0], parts[1]
                    value = ' '.join(parts[2:]).strip('"')
                    f.write(f" {flags} {tag} {value}\n")
                else:
                    f.write(f" {record.get('data', 'N/A')}\n")
        else:
            f.write("\nCAA Records: None found\n")

        # SRV Records
        srv_records = system_records('SRV')
        if srv_records:
            f.write("\nSRV Records (Service Records):\n")
            for record in srv_records:
                parts = record.get('data', '').split()
                if len(parts) >= 4:
                    priority, weight, port, target = parts[:4]
                    f.write(f" {record.get('name', 'N/A')}\n")
                    f.write(f" Priority: {priority}, Weight: {weight}\n")
                    f.write(f" Port: {port}, Target: {target}\n")
                else:
                    f.write(f" {record.get('data', 'N/A')}\n")
        else:
            f.write("\nSRV Records: None found\n")

        # SOA Record (Start of Authority)
        soa_records = system_records('SOA')
        if soa_records:
            f.write("\nSOA Record (Zone Authority):\n")
            for record in soa_records:
                parts = record.get('data', '').split()
                if len(parts) >= 7:
                    primary_ns, admin_email = parts[:2]
                    serial, refresh, retry, expire, minimum = parts[2:7]
                    f.write(f" Primary Name Server: {primary_ns}\n")
                    f.write(f" Admin Email: {admin_email}\n")
                    f.write(f" Serial: {serial}\n")
                    f.write(f" Refresh: {refresh}s, Retry: {retry}s\n")
                    f.write(f" Expire: {expire}s, Minimum TTL: {minimum}s\n")
                else:
                    f.write(f" {record.get('data', 'N/A')}\n")
        else:
            f.write("\nSOA Record: None found\n")

    def _write_dnssec_status(self, f, dns_data: Dict) -> None:
        """Write whether the raw ``dig +dnssec`` output contains signature records."""
        dnssec_output = dns_data.get('DNSSEC', {}).get('system', {}).get('raw_output', '')
        if dnssec_output.strip() and not dnssec_output.startswith('Error:'):
            f.write("\nDNSSEC Status:\n")
            f.write("-" * 14 + "\n")
            if 'RRSIG' in dnssec_output or 'DNSKEY' in dnssec_output:
                f.write(" ✅ DNSSEC is enabled\n")
            else:
                f.write(" ❌ DNSSEC not detected\n")
        else:
            f.write("\nDNSSEC Status: Unable to determine\n")

    def _write_whois_summary(self, f, whois_data: Dict) -> None:
        """Write the key WHOIS fields and name servers, if any were parsed."""
        parsed = whois_data.get('parsed')
        if not parsed:
            return
        f.write("\nWHOIS Information\n")
        f.write("-" * 17 + "\n")
        for label, candidates in self._WHOIS_SUMMARY_FIELDS:
            for field in candidates:
                if field in parsed:
                    f.write(f"{label}: {parsed[field]}\n")
                    break
        ns_fields = [k for k in parsed
                     if 'name_server' in k.lower() or k.lower().startswith('nserver')]
        if ns_fields:
            f.write("WHOIS Name Servers:\n")
            for field in sorted(ns_fields)[:10]:  # Limit to 10
                # Repeated WHOIS keys are stored joined with ", ".
                for server in str(parsed[field]).split(', '):
                    f.write(f" {server}\n")

    def _write_certificate_summary(self, f, cert_data: Dict) -> None:
        """Write up to 50 names found in certificate transparency logs."""
        if not cert_data.get('success'):
            return
        subdomain_count = cert_data.get('subdomain_count', 0)
        f.write(f"\nSubdomains from Certificate Logs ({subdomain_count} total):\n")
        f.write("-" * 45 + "\n")
        subdomains = cert_data.get('unique_subdomains', [])
        display_count = min(50, len(subdomains))
        for subdomain in subdomains[:display_count]:
            f.write(f" {subdomain}\n")
        if len(subdomains) > display_count:
            f.write(f" ... and {len(subdomains) - display_count} more\n")

    def _write_shodan_summary(self, f, shodan_data: Dict) -> None:
        """Write the first five Shodan matches when the query returned results."""
        if not (shodan_data.get('success') and shodan_data.get('total_results', 0) > 0):
            return
        f.write(f"\nShodan Results ({shodan_data.get('total_results', 0)} total):\n")
        f.write("-" * 25 + "\n")
        for match in shodan_data.get('matches', [])[:5]:
            f.write(f" IP: {match.get('ip_str', 'N/A')}\n")
            f.write(f" Port: {match.get('port', 'N/A')}\n")
            f.write(f" Service: {match.get('product', 'N/A')}\n")
            f.write(f" Organization: {match.get('org', 'N/A')}\n")
            f.write(" ---\n")

    def create_summary_report(self, results: Dict[str, Any], filename: str) -> None:
        """Create comprehensive human-readable summary report.

        Args:
            results: Result dict as produced by ``run_reconnaissance``; the
                ``domain`` and ``timestamp`` keys are required.
            filename: Path of the UTF-8 text file to (over)write.
        """
        with open(filename, 'w', encoding='utf-8') as f:
            f.write("DNS Reconnaissance Report\n")
            f.write(f"{'='*50}\n")
            f.write(f"Domain: {results['domain']}\n")
            f.write(f"Timestamp: {results['timestamp']}\n\n")
            dns_data = results.get('dns_records', {})
            self._write_dns_records(f, dns_data)
            self._write_dnssec_status(f, dns_data)
            # DNS Server Comparison (show discrepancies)
            f.write("\nDNS Server Comparison:\n")
            f.write("-" * 21 + "\n")
            self._write_dns_server_comparison(f, dns_data)
            self._write_whois_summary(f, results.get('whois', {}))
            self._write_certificate_summary(f, results.get('certificate_transparency', {}))
            self._write_shodan_summary(f, results.get('shodan', {}))

    # ------------------------------------------------------------------
    # Persistence / orchestration
    # ------------------------------------------------------------------
    @staticmethod
    def _safe_filename(domain: str) -> str:
        """Reduce *domain* to characters that cannot leave the output directory."""
        cleaned = ''.join(ch if ch.isalnum() or ch in '.-_' else '_' for ch in domain)
        return cleaned.lstrip('.') or 'domain'

    def save_results(self, domain: str, results: Dict[str, Any]) -> None:
        """Save results as JSON (complete data) and as a text summary.

        Both files are created inside ``self.output_dir`` and named after the
        sanitised domain plus a timestamp.
        """
        os.makedirs(self.output_dir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_filename = os.path.join(
            self.output_dir, f"{self._safe_filename(domain)}_{timestamp}"
        )
        # Save JSON (complete data)
        json_file = f"{base_filename}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False, default=str)
        # Save human-readable summary
        txt_file = f"{base_filename}_summary.txt"
        self.create_summary_report(results, txt_file)
        print("\n📄 Results saved:")
        print(f" JSON: {json_file}")
        print(f" Summary: {txt_file}")

    def run_reconnaissance(self, domain: str) -> Dict[str, Any]:
        """Run complete DNS reconnaissance.

        Returns:
            Dict with ``domain``, ``timestamp`` and one entry per collector
            (``dns_records``, ``whois``, ``certificate_transparency``,
            ``shodan``).
        """
        print(f"\n🚀 Starting DNS reconnaissance for: {domain}")
        results: Dict[str, Any] = {
            'domain': domain,
            'timestamp': datetime.now().isoformat(),
        }
        results['dns_records'] = self.get_comprehensive_dns(domain)
        results['whois'] = self.get_whois_data(domain)
        results['certificate_transparency'] = self.get_certificate_transparency(domain)
        # Shodan (reports "no key" itself when none was provided)
        results['shodan'] = self.query_shodan(domain)
        return results
def main():
    """Command-line entry point: parse options, run the scan, save the output.

    Exits with status 1 when a required tool is missing or the run fails,
    and with status 0 when the user interrupts it.
    """
    cli = argparse.ArgumentParser(
        description="DNS Reconnaissance Tool - Use only on domains you own or have permission to test",
        epilog="LEGAL NOTICE: Unauthorized reconnaissance may violate applicable laws.",
    )
    cli.add_argument('domain', help='Target domain (e.g., example.com)')
    cli.add_argument('--shodan-key', help='Shodan API key for additional reconnaissance')
    cli.add_argument(
        '--output-dir',
        default='dns_recon_results',
        help='Output directory for results',
    )
    opts = cli.parse_args()

    # Build the tool and point it at the requested output directory.
    recon = DNSReconTool(shodan_api_key=opts.shodan_key)
    recon.output_dir = opts.output_dir

    # Refuse to start when the external tools are unavailable.
    if not recon.check_dependencies():
        sys.exit(1)

    try:
        findings = recon.run_reconnaissance(opts.domain)
        recon.save_results(opts.domain, findings)
        print(f"\n✅ Reconnaissance completed for {opts.domain}")
    except KeyboardInterrupt:
        print("\n⏹️ Reconnaissance interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"\n❌ Error during reconnaissance: {e}")
        sys.exit(1)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()