This commit is contained in:
overcuriousity 2025-09-09 10:05:19 +02:00
parent c79955c1d6
commit a9addf7031

498
dnsrecon.py Normal file
View File

@ -0,0 +1,498 @@
#!/usr/bin/env python3
"""
Copyright (c) 2025 mstoeck3.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import subprocess
import json
import requests
import argparse
import sys
import time
import os
from datetime import datetime
from typing import Dict, List, Optional, Any
class DNSReconTool:
def __init__(self, shodan_api_key: Optional[str] = None):
self.shodan_api_key = shodan_api_key
self.output_dir = "dns_recon_results"
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'DNSReconTool/1.0 (Educational/Research Purpose)'
})
def check_dependencies(self) -> bool:
"""Check if required system tools are available."""
required_tools = ['dig', 'whois']
missing_tools = []
for tool in required_tools:
try:
subprocess.run([tool, '--help'],
capture_output=True, check=False, timeout=5)
except (subprocess.TimeoutExpired, FileNotFoundError):
missing_tools.append(tool)
if missing_tools:
print(f"❌ Missing required tools: {', '.join(missing_tools)}")
print("Install with: apt install dnsutils whois (Ubuntu/Debian)")
return False
return True
def run_command(self, cmd: str, timeout: int = 30) -> str:
"""Run shell command with timeout and error handling."""
try:
result = subprocess.run(
cmd, shell=True, capture_output=True,
text=True, timeout=timeout
)
return result.stdout.strip() if result.stdout else result.stderr.strip()
except subprocess.TimeoutExpired:
return "Error: Command timed out"
except Exception as e:
return f"Error: {str(e)}"
def get_dns_records(self, domain: str, record_type: str,
server: Optional[str] = None) -> Dict[str, Any]:
"""Fetch DNS records with comprehensive error handling."""
server_flag = f"@{server}" if server else ""
cmd = f"dig {domain} {record_type} {server_flag} +noall +answer +nottlid"
output = self.run_command(cmd)
# Parse the output into structured data
records = []
if output and not output.startswith("Error:"):
for line in output.split('\n'):
line = line.strip()
if line and not line.startswith(';'):
parts = line.split(None, 4)
if len(parts) >= 4:
records.append({
'name': parts[0],
'ttl': parts[1],
'class': parts[2],
'type': parts[3],
'data': ' '.join(parts[4:]) if len(parts) > 4 else ''
})
return {
'query': f"{domain} {record_type}",
'server': server or 'system',
'raw_output': output,
'records': records,
'record_count': len(records)
}
def get_comprehensive_dns(self, domain: str) -> Dict[str, Any]:
"""Get comprehensive DNS information."""
print("🔍 Gathering DNS records...")
# Standard record types
record_types = ['A', 'AAAA', 'MX', 'NS', 'SOA', 'TXT', 'CNAME',
'CAA', 'SRV', 'PTR']
# DNS servers to query
dns_servers = [
None, # System default
'1.1.1.1', # Cloudflare
'8.8.8.8', # Google
'9.9.9.9', # Quad9
]
dns_results = {}
for record_type in record_types:
dns_results[record_type] = {}
for server in dns_servers:
server_name = server or 'system'
dns_results[record_type][server_name] = self.get_dns_records(
domain, record_type, server
)
time.sleep(0.1) # Rate limiting
# Try DNSSEC validation
dnssec_cmd = f"dig {domain} +dnssec +noall +answer"
dns_results['DNSSEC'] = {
'system': {
'query': f"{domain} +dnssec",
'raw_output': self.run_command(dnssec_cmd),
'records': [],
'record_count': 0
}
}
return dns_results
def get_whois_data(self, domain: str) -> Dict[str, Any]:
"""Fetch and parse WHOIS data."""
print("📋 Fetching WHOIS data...")
raw_whois = self.run_command(f"whois {domain}")
# Basic parsing of common WHOIS fields
whois_data = {
'raw': raw_whois,
'parsed': {}
}
if not raw_whois.startswith("Error:"):
lines = raw_whois.split('\n')
for line in lines:
line = line.strip()
if ':' in line and not line.startswith('%') and not line.startswith('#'):
key, value = line.split(':', 1)
key = key.strip().lower().replace(' ', '_')
value = value.strip()
if value:
whois_data['parsed'][key] = value
return whois_data
def get_certificate_transparency(self, domain: str) -> Dict[str, Any]:
"""Query certificate transparency logs via crt.sh."""
print("🔐 Querying certificate transparency logs...")
try:
# Query crt.sh API
url = f"https://crt.sh/?q=%.{domain}&output=json"
response = self.session.get(url, timeout=30)
if response.status_code == 200:
cert_data = response.json()
# Extract unique subdomains
subdomains = set()
cert_details = []
for cert in cert_data:
# Extract subdomains from name_value
name_value = cert.get('name_value', '')
if name_value:
# Handle multiple domains in one certificate
domains_in_cert = [d.strip() for d in name_value.split('\n')]
subdomains.update(domains_in_cert)
cert_details.append({
'id': cert.get('id'),
'issuer': cert.get('issuer_name'),
'common_name': cert.get('common_name'),
'name_value': cert.get('name_value'),
'not_before': cert.get('not_before'),
'not_after': cert.get('not_after'),
'serial_number': cert.get('serial_number')
})
return {
'success': True,
'total_certificates': len(cert_data),
'unique_subdomains': sorted(list(subdomains)),
'subdomain_count': len(subdomains),
'certificates': cert_details[:50] # Limit for output size
}
else:
return {
'success': False,
'error': f"HTTP {response.status_code}",
'message': 'Failed to fetch certificate data'
}
except Exception as e:
return {
'success': False,
'error': str(e),
'message': 'Request to crt.sh failed'
}
def query_shodan(self, domain: str) -> Dict[str, Any]:
"""Query Shodan API for domain information."""
if not self.shodan_api_key:
return {
'success': False,
'message': 'No Shodan API key provided'
}
print("🔎 Querying Shodan...")
try:
# Search for the domain
url = f"https://api.shodan.io/shodan/host/search"
params = {
'key': self.shodan_api_key,
'query': f'hostname:{domain}'
}
response = self.session.get(url, params=params, timeout=30)
if response.status_code == 200:
data = response.json()
return {
'success': True,
'total_results': data.get('total', 0),
'matches': data.get('matches', [])[:10], # Limit results
'facets': data.get('facets', {})
}
else:
return {
'success': False,
'error': f"HTTP {response.status_code}",
'message': response.text[:200]
}
except Exception as e:
return {
'success': False,
'error': str(e),
'message': 'Shodan query failed'
}
def save_results(self, domain: str, results: Dict[str, Any]) -> None:
"""Save results in multiple formats."""
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_filename = f"{self.output_dir}/{domain}_{timestamp}"
# Save JSON (complete data)
json_file = f"{base_filename}.json"
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False, default=str)
# Save human-readable summary
txt_file = f"{base_filename}_summary.txt"
self.create_summary_report(results, txt_file)
print(f"\n📄 Results saved:")
print(f" JSON: {json_file}")
print(f" Summary: {txt_file}")
def create_summary_report(self, results: Dict[str, Any], filename: str) -> None:
"""Create human-readable summary report."""
with open(filename, 'w', encoding='utf-8') as f:
f.write(f"DNS Reconnaissance Report\n")
f.write(f"{'='*50}\n")
f.write(f"Domain: {results['domain']}\n")
f.write(f"Timestamp: {results['timestamp']}\n\n")
# DNS Summary - improved parsing
f.write("DNS Records Summary\n")
f.write("-" * 20 + "\n")
dns_data = results.get('dns_records', {})
# A Records
if 'A' in dns_data:
system_records = dns_data['A'].get('system', {}).get('records', [])
if system_records:
f.write(f"\nA Records (IPv4):\n")
for record in system_records:
ip = record.get('data') or record.get('type', 'N/A')
f.write(f" {ip}\n")
else:
f.write(f"\nA Records (IPv4): None found\n")
# AAAA Records
if 'AAAA' in dns_data:
system_records = dns_data['AAAA'].get('system', {}).get('records', [])
if system_records:
f.write(f"\nAAAA Records (IPv6):\n")
for record in system_records:
ipv6 = record.get('data') or record.get('type', 'N/A')
f.write(f" {ipv6}\n")
else:
f.write(f"\nAAAA Records (IPv6): None found\n")
# MX Records
if 'MX' in dns_data:
system_records = dns_data['MX'].get('system', {}).get('records', [])
if system_records:
f.write(f"\nMX Records (Mail Servers):\n")
for record in system_records:
priority = record.get('type', '')
server = record.get('data', 'N/A')
f.write(f" Priority {priority}: {server}\n")
else:
f.write(f"\nMX Records (Mail Servers): None found\n")
# NS Records
if 'NS' in dns_data:
system_records = dns_data['NS'].get('system', {}).get('records', [])
if system_records:
f.write(f"\nNS Records (Name Servers):\n")
for record in system_records:
ns = record.get('data') or record.get('type', 'N/A')
f.write(f" {ns}\n")
else:
f.write(f"\nNS Records (Name Servers): None found\n")
# TXT Records
if 'TXT' in dns_data:
system_records = dns_data['TXT'].get('system', {}).get('records', [])
if system_records:
f.write(f"\nTXT Records:\n")
for record in system_records:
txt_data = record.get('data', '')
txt_type = record.get('type', '')
full_txt = f"{txt_type} {txt_data}".strip()
if full_txt.startswith('"') and full_txt.endswith('"'):
full_txt = full_txt[1:-1] # Remove quotes
f.write(f" {full_txt}\n")
else:
f.write(f"\nTXT Records: None found\n")
# SOA Record
if 'SOA' in dns_data:
system_records = dns_data['SOA'].get('system', {}).get('records', [])
if system_records:
f.write(f"\nSOA Record (Zone Authority):\n")
for record in system_records:
primary_ns = record.get('type', 'N/A')
soa_data = record.get('data', 'N/A')
f.write(f" Primary NS: {primary_ns}\n")
f.write(f" Details: {soa_data}\n")
else:
f.write(f"\nSOA Record: None found\n")
# WHOIS Summary
whois_data = results.get('whois', {})
if whois_data.get('parsed'):
f.write(f"\nWHOIS Information\n")
f.write("-" * 17 + "\n")
parsed = whois_data['parsed']
if 'domain' in parsed:
f.write(f"Domain: {parsed['domain']}\n")
if 'changed' in parsed:
f.write(f"Last Updated: {parsed['changed']}\n")
if 'status' in parsed:
f.write(f"Status: {parsed['status']}\n")
# Certificate Transparency - show more subdomains
cert_data = results.get('certificate_transparency', {})
if cert_data.get('success'):
subdomain_count = cert_data.get('subdomain_count', 0)
f.write(f"\nSubdomains from Certificate Logs ({subdomain_count} total):\n")
f.write("-" * 45 + "\n")
subdomains = cert_data.get('unique_subdomains', [])
# Show more subdomains, not just 20
display_count = min(50, len(subdomains)) # Show up to 50
for subdomain in subdomains[:display_count]:
f.write(f" {subdomain}\n")
if len(subdomains) > display_count:
f.write(f" ... and {len(subdomains) - display_count} more\n")
# Show recent certificates
certs = cert_data.get('certificates', [])
if certs:
f.write(f"\nRecent SSL Certificates:\n")
f.write("-" * 23 + "\n")
for cert in certs[:5]: # Show first 5 certificates
f.write(f" {cert.get('common_name', 'N/A')}\n")
f.write(f" Issuer: {cert.get('issuer', 'N/A')}\n")
f.write(f" Valid: {cert.get('not_before', 'N/A')} to {cert.get('not_after', 'N/A')}\n\n")
# Shodan Summary - more detailed
shodan_data = results.get('shodan', {})
if shodan_data.get('success') and shodan_data.get('total_results', 0) > 0:
f.write(f"Shodan Results ({shodan_data.get('total_results', 0)} total):\n")
f.write("-" * 25 + "\n")
for match in shodan_data.get('matches', [])[:10]: # Show up to 10 matches
f.write(f" IP: {match.get('ip_str', 'N/A')}\n")
f.write(f" Port: {match.get('port', 'N/A')}\n")
f.write(f" Transport: {match.get('transport', 'N/A')}\n")
f.write(f" ISP: {match.get('isp', 'N/A')}\n")
f.write(f" Organization: {match.get('org', 'N/A')}\n")
f.write(f" Location: {match.get('location', {}).get('city', 'N/A')}, {match.get('location', {}).get('country_name', 'N/A')}\n")
# SSL info if available
if 'ssl' in match and match['ssl'].get('cert'):
cert = match['ssl']['cert']
f.write(f" SSL Subject: {cert.get('subject', {}).get('CN', 'N/A')}\n")
f.write(f" SSL Expires: {cert.get('expires', 'N/A')}\n")
f.write(f" ---\n")
else:
f.write(f"\nShodan Results: No data available\n")
if not shodan_data.get('success'):
error_msg = shodan_data.get('message', 'Unknown error')
f.write(f" Error: {error_msg}\n")
def run_reconnaissance(self, domain: str) -> Dict[str, Any]:
"""Run complete DNS reconnaissance."""
print(f"\n🚀 Starting DNS reconnaissance for: {domain}")
results = {
'domain': domain,
'timestamp': datetime.now().isoformat(),
'dns_records': {},
'whois': {},
'certificate_transparency': {},
'shodan': {}
}
# DNS Records
results['dns_records'] = self.get_comprehensive_dns(domain)
# WHOIS
results['whois'] = self.get_whois_data(domain)
# Certificate Transparency
results['certificate_transparency'] = self.get_certificate_transparency(domain)
# Shodan (if API key provided)
results['shodan'] = self.query_shodan(domain)
return results
def main():
parser = argparse.ArgumentParser(
description="DNS Reconnaissance Tool - Use only on domains you own or have permission to test",
epilog="LEGAL NOTICE: Unauthorized reconnaissance may violate applicable laws."
)
parser.add_argument('domain', help='Target domain (e.g., example.com)')
parser.add_argument('--shodan-key', help='Shodan API key for additional reconnaissance')
parser.add_argument('--output-dir', default='dns_recon_results',
help='Output directory for results')
args = parser.parse_args()
# Initialize tool
tool = DNSReconTool(shodan_api_key=args.shodan_key)
tool.output_dir = args.output_dir
# Check dependencies
if not tool.check_dependencies():
sys.exit(1)
try:
# Run reconnaissance
results = tool.run_reconnaissance(args.domain)
# Save results
tool.save_results(args.domain, results)
print(f"\n✅ Reconnaissance completed for {args.domain}")
except KeyboardInterrupt:
print("\n⏹️ Reconnaissance interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error during reconnaissance: {e}")
sys.exit(1)
if __name__ == "__main__":
main()