Compare commits


2 Commits

Author          SHA1        Message   Date
overcuriousity  29e36e34be  graph     2025-09-09 22:19:46 +02:00
overcuriousity  cee620f5f6  progress  2025-09-09 20:47:01 +02:00
12 changed files with 2626 additions and 697 deletions


@ -1,11 +1,12 @@
# File: src/certificate_checker.py
"""Certificate transparency log checker using crt.sh."""
"""Certificate transparency log checker using crt.sh with forensic operation tracking."""
import requests
import json
import time
import logging
import socket
import uuid
from datetime import datetime
from typing import List, Optional, Set
from .data_structures import Certificate
@ -15,7 +16,7 @@ from .config import Config
logger = logging.getLogger(__name__)
class CertificateChecker:
"""Check certificates using crt.sh."""
"""Check certificates using crt.sh with simple query caching and forensic tracking."""
CRT_SH_URL = "https://crt.sh/"
@ -24,27 +25,26 @@ class CertificateChecker:
self.last_request = 0
self.query_count = 0
self.connection_failures = 0
self.max_connection_failures = 3 # Stop trying after 3 consecutive failures
self.max_connection_failures = 3
logger.info("🔐 Certificate checker initialized")
# Simple HTTP request cache to avoid duplicate queries
self._http_cache = {} # query_string -> List[Certificate]
# Test connectivity to crt.sh on initialization
logger.info("🔐 Certificate checker initialized with HTTP request caching")
self._test_connectivity()
def _test_connectivity(self):
"""Test if we can reach crt.sh."""
try:
logger.info("🔗 Testing connectivity to crt.sh...")
logger.info("Testing connectivity to crt.sh...")
# First test DNS resolution
try:
socket.gethostbyname('crt.sh')
logger.debug("DNS resolution for crt.sh successful")
logger.debug("DNS resolution for crt.sh successful")
except socket.gaierror as e:
logger.warning(f"⚠️ DNS resolution failed for crt.sh: {e}")
logger.warning(f"DNS resolution failed for crt.sh: {e}")
return False
# Test HTTP connection with a simple request
response = requests.get(
self.CRT_SH_URL,
params={'q': 'example.com', 'output': 'json'},
@ -52,21 +52,21 @@ class CertificateChecker:
headers={'User-Agent': 'DNS-Recon-Tool/1.0'}
)
if response.status_code in [200, 404]: # 404 is also acceptable (no results)
logger.info("crt.sh connectivity test successful")
if response.status_code in [200, 404]:
logger.info("crt.sh connectivity test successful")
return True
else:
logger.warning(f"⚠️ crt.sh returned status {response.status_code}")
logger.warning(f"crt.sh returned status {response.status_code}")
return False
except requests.exceptions.ConnectionError as e:
logger.warning(f"⚠️ Cannot reach crt.sh: {e}")
logger.warning(f"Cannot reach crt.sh: {e}")
return False
except requests.exceptions.Timeout:
logger.warning("⚠️ crt.sh connectivity test timed out")
logger.warning("crt.sh connectivity test timed out")
return False
except Exception as e:
logger.warning(f"⚠️ Unexpected error testing crt.sh connectivity: {e}")
logger.warning(f"Unexpected error testing crt.sh connectivity: {e}")
return False
def _rate_limit(self):
@ -77,30 +77,32 @@ class CertificateChecker:
if time_since_last < min_interval:
sleep_time = min_interval - time_since_last
logger.debug(f"⏸️ crt.sh rate limiting: sleeping for {sleep_time:.2f}s")
logger.debug(f"crt.sh rate limiting: sleeping for {sleep_time:.2f}s")
time.sleep(sleep_time)
self.last_request = time.time()
self.query_count += 1
def get_certificates(self, domain: str) -> List[Certificate]:
"""Get certificates for a domain from crt.sh."""
logger.debug(f"🔍 Getting certificates for domain: {domain}")
def get_certificates(self, domain: str, operation_id: Optional[str] = None) -> List[Certificate]:
"""Get certificates for a domain with forensic tracking."""
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔐 Getting certificates for domain: {domain} (operation: {operation_id})")
# Skip if we've had too many connection failures
if self.connection_failures >= self.max_connection_failures:
logger.warning(f"⚠️ Skipping certificate lookup for {domain} due to repeated connection failures")
logger.warning(f"Skipping certificate lookup for {domain} due to repeated connection failures")
return []
certificates = []
# Query for the domain
domain_certs = self._query_crt_sh(domain)
# Query for the domain itself
domain_certs = self._query_crt_sh(domain, operation_id)
certificates.extend(domain_certs)
# Also query for wildcard certificates (if the main query succeeded)
if domain_certs or self.connection_failures < self.max_connection_failures:
wildcard_certs = self._query_crt_sh(f"%.{domain}")
# Query for wildcard certificates
wildcard_certs = self._query_crt_sh(f"%.{domain}", operation_id)
certificates.extend(wildcard_certs)
# Remove duplicates based on certificate ID
@ -108,21 +110,61 @@ class CertificateChecker:
final_certs = list(unique_certs.values())
if final_certs:
logger.info(f"📜 Found {len(final_certs)} unique certificates for {domain}")
logger.info(f" Found {len(final_certs)} unique certificates for {domain}")
else:
logger.debug(f" No certificates found for {domain}")
logger.debug(f" No certificates found for {domain}")
return final_certs
def _query_crt_sh(self, query: str) -> List[Certificate]:
"""Query crt.sh API with retry logic and better error handling."""
def _query_crt_sh(self, query: str, operation_id: str) -> List[Certificate]:
"""Query crt.sh API with HTTP caching and forensic tracking."""
# Check HTTP cache first (keyed by the query string alone; cached results
# are re-stamped with the caller's operation_id below)
if query in self._http_cache:
logger.debug(f"Using cached HTTP result for crt.sh query: {query}")
# Need to create new Certificate objects with current operation_id
cached_certs = self._http_cache[query]
return [
Certificate(
id=cert.id,
issuer=cert.issuer,
subject=cert.subject,
not_before=cert.not_before,
not_after=cert.not_after,
is_wildcard=cert.is_wildcard,
operation_id=operation_id # Use current operation_id
) for cert in cached_certs
]
# Not cached, make the HTTP request
certificates = self._make_http_request(query, operation_id)
# Cache the HTTP result (without operation_id)
if certificates:
self._http_cache[query] = [
Certificate(
id=cert.id,
issuer=cert.issuer,
subject=cert.subject,
not_before=cert.not_before,
not_after=cert.not_after,
is_wildcard=cert.is_wildcard,
operation_id="" # Cache without operation_id
) for cert in certificates
]
return certificates
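# Illustrative sketch: repeated queries hit the network once, and cached
# certificates are re-stamped with the caller's operation_id. Assumes the
# constructor takes a Config and that the first query returned results;
# the domain and operation IDs are hypothetical.
#
#   checker = CertificateChecker(Config())
#   first = checker._query_crt_sh("example.com", "op-1")   # HTTP request
#   second = checker._query_crt_sh("example.com", "op-2")  # served from cache
#   assert all(cert.operation_id == "op-2" for cert in second)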
def _make_http_request(self, query: str, operation_id: str) -> List[Certificate]:
"""Make actual HTTP request to crt.sh API with retry logic and forensic tracking."""
certificates = []
self._rate_limit()
logger.debug(f"📡 Querying crt.sh for: {query}")
logger.debug(f"🌐 Making HTTP request to crt.sh for: {query} (operation: {operation_id})")
max_retries = 2 # Reduced retries for faster failure
backoff_delays = [1, 3] # Shorter delays
max_retries = 2
backoff_delays = [1, 3]
for attempt in range(max_retries):
try:
@ -143,54 +185,53 @@ class CertificateChecker:
if response.status_code == 200:
try:
data = response.json()
logger.debug(f"📊 crt.sh returned {len(data)} certificate entries for {query}")
logger.debug(f"crt.sh returned {len(data)} certificate entries for {query}")
for cert_data in data:
try:
# Parse dates with better error handling
not_before = self._parse_date(cert_data.get('not_before'))
not_after = self._parse_date(cert_data.get('not_after'))
if not_before and not_after:
# Create Certificate with forensic metadata
certificate = Certificate(
id=cert_data.get('id'),
issuer=cert_data.get('issuer_name', ''),
subject=cert_data.get('name_value', ''),
not_before=not_before,
not_after=not_after,
is_wildcard='*.' in cert_data.get('name_value', '')
is_wildcard='*.' in cert_data.get('name_value', ''),
operation_id=operation_id # Forensic tracking
)
certificates.append(certificate)
logger.debug(f" Parsed certificate ID {certificate.id} for {query}")
logger.debug(f"📋 Parsed certificate ID {certificate.id} for {query}")
else:
logger.debug(f"⚠️ Skipped certificate with invalid dates: {cert_data.get('id')}")
logger.debug(f"Skipped certificate with invalid dates: {cert_data.get('id')}")
except (ValueError, TypeError, KeyError) as e:
logger.debug(f"⚠️ Error parsing certificate data: {e}")
continue # Skip malformed certificate data
logger.debug(f"Error parsing certificate data: {e}")
continue
# Success! Reset connection failure counter
self.connection_failures = 0
logger.info(f"✅ Successfully processed {len(certificates)} certificates from crt.sh for {query}")
return certificates
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON response from crt.sh for {query}: {e}")
logger.warning(f"Invalid JSON response from crt.sh for {query}: {e}")
if attempt < max_retries - 1:
time.sleep(backoff_delays[attempt])
continue
return certificates
elif response.status_code == 404:
# 404 is normal - no certificates found
logger.debug(f" No certificates found for {query} (404)")
self.connection_failures = 0 # Reset counter for successful connection
self.connection_failures = 0
return certificates
elif response.status_code == 429:
logger.warning(f"⚠️ crt.sh rate limit exceeded for {query}")
if attempt < max_retries - 1:
time.sleep(5) # Wait longer for rate limits
time.sleep(5)
continue
return certificates
@ -203,9 +244,8 @@ class CertificateChecker:
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
error_type = "Connection Error" if isinstance(e, requests.exceptions.ConnectionError) else "Timeout"
logger.warning(f"🌐 crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}")
logger.warning(f"⚠️ crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}")
# Track connection failures
if isinstance(e, requests.exceptions.ConnectionError):
self.connection_failures += 1
@ -224,8 +264,7 @@ class CertificateChecker:
time.sleep(backoff_delays[attempt])
continue
# If we get here, all retries failed
logger.warning(f"❌ All {max_retries} attempts failed for crt.sh query: {query}")
logger.warning(f"⚠️ All {max_retries} attempts failed for crt.sh query: {query}")
return certificates
def _parse_date(self, date_str: str) -> Optional[datetime]:
@ -233,13 +272,12 @@ class CertificateChecker:
if not date_str:
return None
# Common date formats from crt.sh
date_formats = [
'%Y-%m-%dT%H:%M:%S', # ISO format without timezone
'%Y-%m-%dT%H:%M:%SZ', # ISO format with Z
'%Y-%m-%d %H:%M:%S', # Space separated
'%Y-%m-%dT%H:%M:%S.%f', # With microseconds
'%Y-%m-%dT%H:%M:%S.%fZ', # With microseconds and Z
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S.%f',
'%Y-%m-%dT%H:%M:%S.%fZ',
]
for fmt in date_formats:
@ -248,13 +286,12 @@ class CertificateChecker:
except ValueError:
continue
# Try with timezone info
try:
return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except ValueError:
pass
logger.debug(f"⚠️ Could not parse date: {date_str}")
logger.debug(f"Could not parse date: {date_str}")
return None
def extract_subdomains_from_certificates(self, certificates: List[Certificate]) -> Set[str]:
@ -265,7 +302,6 @@ class CertificateChecker:
for cert in certificates:
# Parse subject field for domain names
# Certificate subjects can be multi-line with multiple domains
subject_lines = cert.subject.split('\n')
for line in subject_lines:
@ -273,7 +309,7 @@ class CertificateChecker:
# Skip wildcard domains for recursion (they don't resolve directly)
if line.startswith('*.'):
logger.debug(f"🌿 Skipping wildcard domain: {line}")
logger.debug(f" Skipping wildcard domain: {line}")
continue
if self._is_valid_domain(line):
@ -283,7 +319,7 @@ class CertificateChecker:
if subdomains:
logger.info(f"🌿 Extracted {len(subdomains)} subdomains from certificates")
else:
logger.debug(" No subdomains extracted from certificates")
logger.debug(" No subdomains extracted from certificates")
return subdomains
@ -292,20 +328,17 @@ class CertificateChecker:
if not domain or '.' not in domain:
return False
# Remove common prefixes
domain = domain.lower().strip()
if domain.startswith('www.'):
domain = domain[4:]
# Basic validation
if len(domain) < 3 or len(domain) > 255:
return False
# Must not be an IP address
try:
import socket
socket.inet_aton(domain)
return False # It's an IPv4 address
return False
except socket.error:
pass
@ -314,7 +347,6 @@ class CertificateChecker:
if len(parts) < 2:
return False
# Each part should be reasonable
for part in parts:
if len(part) < 1 or len(part) > 63:
return False
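# Illustrative usage sketch (module paths assumed from the file headers;
# the domain is hypothetical):
#
#   from src.certificate_checker import CertificateChecker
#   from src.config import Config
#
#   checker = CertificateChecker(Config())
#   certs = checker.get_certificates("example.com")  # operation_id auto-generated
#   subdomains = checker.extract_subdomains_from_certificates(certs)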


@ -1,38 +1,85 @@
# File: src/data_structures.py
"""Data structures for storing reconnaissance results."""
"""Enhanced data structures for forensic DNS reconnaissance with full provenance tracking."""
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Any
from typing import Dict, List, Set, Optional, Any, Tuple
from datetime import datetime
from enum import Enum
import json
import logging
import uuid
# Set up logging for this module
logger = logging.getLogger(__name__)
class OperationType(Enum):
"""Types of discovery operations."""
INITIAL_TARGET = "initial_target"
TLD_EXPANSION = "tld_expansion"
DNS_A = "dns_a"
DNS_AAAA = "dns_aaaa"
DNS_MX = "dns_mx"
DNS_NS = "dns_ns"
DNS_TXT = "dns_txt"
DNS_CNAME = "dns_cname"
DNS_SOA = "dns_soa"
DNS_PTR = "dns_ptr"
DNS_SRV = "dns_srv"
DNS_CAA = "dns_caa"
DNS_DNSKEY = "dns_dnskey"
DNS_DS = "dns_ds"
DNS_RRSIG = "dns_rrsig"
DNS_NSEC = "dns_nsec"
DNS_NSEC3 = "dns_nsec3"
DNS_REVERSE = "dns_reverse"
CERTIFICATE_CHECK = "certificate_check"
SHODAN_LOOKUP = "shodan_lookup"
VIRUSTOTAL_IP = "virustotal_ip"
VIRUSTOTAL_DOMAIN = "virustotal_domain"
class DiscoveryMethod(Enum):
"""How a hostname was discovered."""
INITIAL_TARGET = "initial_target"
TLD_EXPANSION = "tld_expansion"
DNS_RECORD_VALUE = "dns_record_value"
CERTIFICATE_SUBJECT = "certificate_subject"
DNS_SUBDOMAIN_EXTRACTION = "dns_subdomain_extraction"
@dataclass
class DNSRecord:
"""DNS record information."""
"""DNS record information with enhanced metadata."""
record_type: str
value: str
ttl: Optional[int] = None
discovered_at: datetime = field(default_factory=datetime.now)
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
def to_dict(self) -> dict:
return {
'record_type': self.record_type,
'value': self.value,
'ttl': self.ttl
'ttl': self.ttl,
'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None,
'operation_id': self.operation_id
}
@dataclass
class Certificate:
"""Certificate information from crt.sh."""
"""Certificate information with enhanced metadata."""
id: int
issuer: str
subject: str
not_before: datetime
not_after: datetime
is_wildcard: bool = False
discovered_at: datetime = field(default_factory=datetime.now)
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
is_valid_now: bool = field(default=True) # Based on current timestamp vs cert validity
def __post_init__(self):
"""Calculate if certificate is currently valid."""
now = datetime.now()
self.is_valid_now = self.not_before <= now <= self.not_after
def to_dict(self) -> dict:
return {
@ -41,17 +88,22 @@ class Certificate:
'subject': self.subject,
'not_before': self.not_before.isoformat() if self.not_before else None,
'not_after': self.not_after.isoformat() if self.not_after else None,
'is_wildcard': self.is_wildcard
'is_wildcard': self.is_wildcard,
'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None,
'operation_id': self.operation_id,
'is_valid_now': self.is_valid_now
}
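# Illustrative sketch: __post_init__ derives is_valid_now by comparing the
# naive datetimes parsed from crt.sh against datetime.now() (values hypothetical):
#
#   cert = Certificate(
#       id=1, issuer="CN=Example CA", subject="example.com",
#       not_before=datetime(2020, 1, 1), not_after=datetime(2021, 1, 1),
#   )
#   assert cert.is_valid_now is False  # already expired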
@dataclass
class ShodanResult:
"""Shodan scan result."""
"""Shodan scan result with metadata."""
ip: str
ports: List[int]
services: Dict[str, Any]
organization: Optional[str] = None
country: Optional[str] = None
discovered_at: datetime = field(default_factory=datetime.now)
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
def to_dict(self) -> dict:
return {
@ -59,17 +111,21 @@ class ShodanResult:
'ports': self.ports,
'services': self.services,
'organization': self.organization,
'country': self.country
'country': self.country,
'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None,
'operation_id': self.operation_id
}
@dataclass
class VirusTotalResult:
"""VirusTotal scan result."""
"""VirusTotal scan result with metadata."""
resource: str # IP or domain
positives: int
total: int
scan_date: datetime
permalink: str
discovered_at: datetime = field(default_factory=datetime.now)
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
def to_dict(self) -> dict:
return {
@ -77,118 +133,331 @@ class VirusTotalResult:
'positives': self.positives,
'total': self.total,
'scan_date': self.scan_date.isoformat() if self.scan_date else None,
'permalink': self.permalink
'permalink': self.permalink,
'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None,
'operation_id': self.operation_id
}
@dataclass
class ReconData:
"""Main data structure for reconnaissance results."""
class DiscoveryOperation:
"""A single discovery operation with complete metadata."""
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
operation_type: OperationType = OperationType.INITIAL_TARGET
target: str = ""
timestamp: datetime = field(default_factory=datetime.now)
success: bool = True
error_message: Optional[str] = None
# Core data
hostnames: Set[str] = field(default_factory=set)
# Results of the operation
dns_records: List[DNSRecord] = field(default_factory=list)
certificates: List[Certificate] = field(default_factory=list)
shodan_results: List[ShodanResult] = field(default_factory=list)
virustotal_results: List[VirusTotalResult] = field(default_factory=list)
discovered_hostnames: List[str] = field(default_factory=list) # New hostnames found
discovered_ips: List[str] = field(default_factory=list) # New IPs found
# Operation-specific metadata
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict:
return {
'operation_id': self.operation_id,
'operation_type': self.operation_type.value,
'target': self.target,
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
'success': self.success,
'error_message': self.error_message,
'dns_records': [record.to_dict() for record in self.dns_records],
'certificates': [cert.to_dict() for cert in self.certificates],
'shodan_results': [result.to_dict() for result in self.shodan_results],
'virustotal_results': [result.to_dict() for result in self.virustotal_results],
'discovered_hostnames': self.discovered_hostnames,
'discovered_ips': self.discovered_ips,
'metadata': self.metadata
}
@dataclass
class DiscoveryEdge:
"""Represents a discovery relationship between two hostnames."""
source_hostname: str # The hostname that led to the discovery
target_hostname: str # The hostname that was discovered
discovery_method: DiscoveryMethod
operation_id: str
timestamp: datetime
metadata: Dict[str, Any] = field(default_factory=dict) # Additional context
def to_dict(self) -> dict:
return {
'source_hostname': self.source_hostname,
'target_hostname': self.target_hostname,
'discovery_method': self.discovery_method.value,
'operation_id': self.operation_id,
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
'metadata': self.metadata
}
@dataclass
class DiscoveryNode:
"""A discovered hostname with complete provenance and current state."""
hostname: str
depth: int = 0
first_seen: datetime = field(default_factory=datetime.now)
last_updated: datetime = field(default_factory=datetime.now)
# Current validity state
dns_exists: Optional[bool] = None # Does this hostname resolve?
last_dns_check: Optional[datetime] = None
resolved_ips: Set[str] = field(default_factory=set)
# Discovery provenance - can have multiple discovery paths
discovery_methods: Set[DiscoveryMethod] = field(default_factory=set)
discovered_by_operations: Set[str] = field(default_factory=set) # operation_ids
# Associated data (current state)
dns_records_by_type: Dict[str, List[DNSRecord]] = field(default_factory=dict)
certificates: List[Certificate] = field(default_factory=list)
shodan_results: List[ShodanResult] = field(default_factory=list)
virustotal_results: List[VirusTotalResult] = field(default_factory=list)
reverse_dns: Optional[str] = None
def add_dns_record(self, record: DNSRecord) -> None:
"""Add a DNS record to this node."""
record_type = record.record_type
if record_type not in self.dns_records_by_type:
self.dns_records_by_type[record_type] = []
self.dns_records_by_type[record_type].append(record)
self.last_updated = datetime.now()
# Update resolved IPs if this is an A or AAAA record
if record_type in ['A', 'AAAA']:
self.resolved_ips.add(record.value)
self.dns_exists = True
self.last_dns_check = record.discovered_at
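# Illustrative sketch: an A record is stored under its type and also updates
# the node's resolution state (hostname and IP hypothetical):
#
#   node = DiscoveryNode(hostname="www.example.com", depth=1)
#   node.add_dns_record(DNSRecord(record_type="A", value="192.0.2.10"))
#   assert node.dns_exists and "192.0.2.10" in node.resolved_ips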
def add_certificate(self, certificate: Certificate) -> None:
"""Add a certificate to this node."""
self.certificates.append(certificate)
self.last_updated = datetime.now()
def get_all_dns_records(self) -> List[DNSRecord]:
"""Get all DNS records for this node."""
all_records = []
for records in self.dns_records_by_type.values():
all_records.extend(records)
return all_records
def get_current_certificates(self) -> List[Certificate]:
"""Get currently valid certificates."""
return [cert for cert in self.certificates if cert.is_valid_now]
def get_expired_certificates(self) -> List[Certificate]:
"""Get expired certificates."""
return [cert for cert in self.certificates if not cert.is_valid_now]
def to_dict(self) -> dict:
return {
'hostname': self.hostname,
'depth': self.depth,
'first_seen': self.first_seen.isoformat() if self.first_seen else None,
'last_updated': self.last_updated.isoformat() if self.last_updated else None,
'dns_exists': self.dns_exists,
'last_dns_check': self.last_dns_check.isoformat() if self.last_dns_check else None,
'resolved_ips': sorted(list(self.resolved_ips)),
'discovery_methods': [method.value for method in self.discovery_methods],
'discovered_by_operations': sorted(list(self.discovered_by_operations)),
'dns_records_by_type': {
record_type: [record.to_dict() for record in records]
for record_type, records in self.dns_records_by_type.items()
},
'certificates': [cert.to_dict() for cert in self.certificates],
'current_certificates': [cert.to_dict() for cert in self.get_current_certificates()],
'expired_certificates': [cert.to_dict() for cert in self.get_expired_certificates()],
'shodan_results': [result.to_dict() for result in self.shodan_results],
'virustotal_results': [result.to_dict() for result in self.virustotal_results],
'reverse_dns': self.reverse_dns
}
@dataclass
class ForensicReconData:
"""Enhanced reconnaissance data with full forensic tracking and graph structure."""
# Core graph structure
nodes: Dict[str, DiscoveryNode] = field(default_factory=dict) # hostname -> node
edges: List[DiscoveryEdge] = field(default_factory=list)
operations: Dict[str, DiscoveryOperation] = field(default_factory=dict) # operation_id -> operation
# Quick lookup indexes
ip_addresses: Set[str] = field(default_factory=set)
# DNS information
dns_records: Dict[str, List[DNSRecord]] = field(default_factory=dict)
reverse_dns: Dict[str, str] = field(default_factory=dict)
# Certificate information
certificates: Dict[str, List[Certificate]] = field(default_factory=dict)
# External service results
shodan_results: Dict[str, ShodanResult] = field(default_factory=dict)
virustotal_results: Dict[str, VirusTotalResult] = field(default_factory=dict)
operation_timeline: List[str] = field(default_factory=list) # ordered operation_ids
# Metadata
start_time: datetime = field(default_factory=datetime.now)
end_time: Optional[datetime] = None
depth_map: Dict[str, int] = field(default_factory=dict) # Track recursion depth
scan_config: Dict[str, Any] = field(default_factory=dict)
def add_hostname(self, hostname: str, depth: int = 0) -> None:
"""Add a hostname to the dataset."""
def add_node(self, hostname: str, depth: int = 0,
discovery_method: DiscoveryMethod = DiscoveryMethod.INITIAL_TARGET,
operation_id: Optional[str] = None) -> DiscoveryNode:
"""Add or get a discovery node."""
hostname = hostname.lower()
self.hostnames.add(hostname)
self.depth_map[hostname] = depth
logger.info(f"Added hostname: {hostname} (depth: {depth})")
def add_ip_address(self, ip: str) -> None:
"""Add an IP address to the dataset."""
self.ip_addresses.add(ip)
logger.info(f"Added IP address: {ip}")
if hostname not in self.nodes:
node = DiscoveryNode(hostname=hostname, depth=depth)
self.nodes[hostname] = node
logger.debug(f"Created new node: {hostname} at depth {depth}")
else:
node = self.nodes[hostname]
# Update depth if this is a shallower discovery
if depth < node.depth:
node.depth = depth
logger.debug(f"Updated node {hostname} depth: {node.depth} -> {depth}")
def add_dns_record(self, hostname: str, record: DNSRecord) -> None:
"""Add a DNS record for a hostname."""
# Track discovery method and operation
node.discovery_methods.add(discovery_method)
if operation_id:
node.discovered_by_operations.add(operation_id)
return node
def add_edge(self, source: str, target: str, discovery_method: DiscoveryMethod,
operation_id: str, metadata: Dict[str, Any] = None) -> None:
"""Add a discovery edge."""
edge = DiscoveryEdge(
source_hostname=source.lower(),
target_hostname=target.lower(),
discovery_method=discovery_method,
operation_id=operation_id,
timestamp=datetime.now(),
metadata=metadata or {}
)
self.edges.append(edge)
logger.debug(f"Added edge: {source} -> {target} via {discovery_method.value}")
def add_operation(self, operation: DiscoveryOperation) -> None:
"""Add an operation to the timeline."""
self.operations[operation.operation_id] = operation
self.operation_timeline.append(operation.operation_id)
logger.debug(f"Added operation: {operation.operation_type.value} on {operation.target}")
def get_node(self, hostname: str) -> Optional[DiscoveryNode]:
"""Get a node by hostname."""
return self.nodes.get(hostname.lower())
def get_children(self, hostname: str) -> List[str]:
"""Get all hostnames discovered from this hostname."""
hostname = hostname.lower()
if hostname not in self.dns_records:
self.dns_records[hostname] = []
self.dns_records[hostname].append(record)
logger.debug(f"Added DNS record for {hostname}: {record.record_type} -> {record.value}")
children = []
for edge in self.edges:
if edge.source_hostname == hostname:
children.append(edge.target_hostname)
return children
def add_shodan_result(self, ip: str, result: ShodanResult) -> None:
"""Add Shodan result."""
self.shodan_results[ip] = result
logger.info(f"Added Shodan result for {ip}: {len(result.ports)} ports, org: {result.organization}")
def get_parents(self, hostname: str) -> List[str]:
"""Get all hostnames that led to discovering this hostname."""
hostname = hostname.lower()
parents = []
for edge in self.edges:
if edge.target_hostname == hostname:
parents.append(edge.source_hostname)
return parents
def add_virustotal_result(self, resource: str, result: VirusTotalResult) -> None:
"""Add VirusTotal result."""
self.virustotal_results[resource] = result
logger.info(f"Added VirusTotal result for {resource}: {result.positives}/{result.total} detections")
def get_discovery_path(self, hostname: str) -> List[List[Tuple[str, str, DiscoveryMethod]]]:
"""Get the discovery path(s) to a hostname."""
hostname = hostname.lower()
paths = []
def get_new_subdomains(self, max_depth: int) -> Set[str]:
"""Get subdomains that haven't been processed yet and are within depth limit."""
new_domains = set()
for hostname in self.hostnames:
if (hostname not in self.dns_records and
self.depth_map.get(hostname, 0) < max_depth):
new_domains.add(hostname)
return new_domains
def trace_path(current: str, visited: Set[str], current_path: List):
if current in visited:
return # Avoid cycles
visited.add(current)
parents = self.get_parents(current)
if not parents:
# This is a root node, we have a complete path
paths.append(current_path.copy())
else:
for parent in parents:
# Find the edge that connects parent to current
for edge in self.edges:
if edge.source_hostname == parent and edge.target_hostname == current:
new_path = [(parent, current, edge.discovery_method)] + current_path
trace_path(parent, visited.copy(), new_path)
trace_path(hostname, set(), [])
return paths
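# Illustrative sketch: each returned path is a list of
# (source, target, DiscoveryMethod) hops from a root node down to the
# hostname. For the two-node graph sketched above:
#
#   data.get_discovery_path("mail.example.com")
#   # -> [[("example.com", "mail.example.com", DiscoveryMethod.DNS_RECORD_VALUE)]]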
def get_stats(self) -> Dict[str, int]:
"""Get current statistics."""
total_dns_records = sum(len(node.get_all_dns_records()) for node in self.nodes.values())
total_certificates = sum(len(node.certificates) for node in self.nodes.values())
current_certificates = sum(len(node.get_current_certificates()) for node in self.nodes.values())
expired_certificates = sum(len(node.get_expired_certificates()) for node in self.nodes.values())
total_shodan = sum(len(node.shodan_results) for node in self.nodes.values())
total_virustotal = sum(len(node.virustotal_results) for node in self.nodes.values())
return {
'hostnames': len(self.hostnames),
'hostnames': len(self.nodes),
'ip_addresses': len(self.ip_addresses),
'dns_records': sum(len(records) for records in self.dns_records.values()),
'certificates': sum(len(certs) for certs in self.certificates.values()),
'shodan_results': len(self.shodan_results),
'virustotal_results': len(self.virustotal_results)
'discovery_edges': len(self.edges),
'operations_performed': len(self.operations),
'dns_records': total_dns_records,
'certificates_total': total_certificates,
'certificates_current': current_certificates,
'certificates_expired': expired_certificates,
'shodan_results': total_shodan,
'virustotal_results': total_virustotal
}
def to_dict(self) -> dict:
"""Export data as a serializable dictionary."""
logger.debug(f"Serializing ReconData with stats: {self.get_stats()}")
logger.info(f"Serializing ForensicReconData with {len(self.nodes)} nodes, {len(self.edges)} edges, {len(self.operations)} operations")
result = {
'hostnames': sorted(list(self.hostnames)),
return {
'nodes': {hostname: node.to_dict() for hostname, node in self.nodes.items()},
'edges': [edge.to_dict() for edge in self.edges],
'operations': {op_id: op.to_dict() for op_id, op in self.operations.items()},
'operation_timeline': self.operation_timeline,
'ip_addresses': sorted(list(self.ip_addresses)),
'dns_records': {
host: [record.to_dict() for record in records]
for host, records in self.dns_records.items()
},
'reverse_dns': dict(self.reverse_dns),
'certificates': {
host: [cert.to_dict() for cert in certs]
for host, certs in self.certificates.items()
},
'shodan_results': {
ip: result.to_dict() for ip, result in self.shodan_results.items()
},
'virustotal_results': {
resource: result.to_dict() for resource, result in self.virustotal_results.items()
},
'depth_map': dict(self.depth_map),
'metadata': {
'start_time': self.start_time.isoformat() if self.start_time else None,
'end_time': self.end_time.isoformat() if self.end_time else None,
'scan_config': self.scan_config,
'stats': self.get_stats()
}
},
'graph_analysis': self._generate_graph_analysis()
}
logger.info(f"Serialized data contains: {len(result['hostnames'])} hostnames, "
f"{len(result['ip_addresses'])} IPs, {len(result['shodan_results'])} Shodan results, "
f"{len(result['virustotal_results'])} VirusTotal results")
def _generate_graph_analysis(self) -> Dict[str, Any]:
"""Generate graph analysis metadata."""
# Depth distribution
depth_distribution = {}
for node in self.nodes.values():
depth = node.depth
depth_distribution[depth] = depth_distribution.get(depth, 0) + 1
return result
# Root nodes (no parents)
root_nodes = [hostname for hostname in self.nodes.keys()
if not self.get_parents(hostname)]
# Leaf nodes (no children)
leaf_nodes = [hostname for hostname in self.nodes.keys()
if not self.get_children(hostname)]
# Discovery method distribution
method_distribution = {}
for edge in self.edges:
method = edge.discovery_method.value
method_distribution[method] = method_distribution.get(method, 0) + 1
return {
'depth_distribution': depth_distribution,
'max_depth': max(depth_distribution.keys()) if depth_distribution else 0,
'root_nodes': root_nodes,
'leaf_nodes': leaf_nodes,
'discovery_method_distribution': method_distribution,
'total_discovery_paths': len(self.edges)
}
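# Illustrative sketch: for the two-node graph sketched above, the analysis
# would come out roughly as:
#
#   data._generate_graph_analysis()
#   # {'depth_distribution': {0: 1, 1: 1}, 'max_depth': 1,
#   #  'root_nodes': ['example.com'], 'leaf_nodes': ['mail.example.com'],
#   #  'discovery_method_distribution': {'dns_record_value': 1},
#   #  'total_discovery_paths': 1}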
def to_json(self) -> str:
"""Export data as JSON."""
@ -196,9 +465,11 @@ class ReconData:
return json.dumps(self.to_dict(), indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Failed to serialize to JSON: {e}")
# Return minimal JSON in case of error
return json.dumps({
'error': str(e),
'stats': self.get_stats(),
'timestamp': datetime.now().isoformat()
}, indent=2)
# Backward compatibility aliases (for gradual migration)
ReconData = ForensicReconData


@ -1,5 +1,5 @@
# File: src/dns_resolver.py
"""DNS resolution functionality with enhanced TLD testing."""
"""DNS resolution functionality with enhanced TLD testing and forensic operation tracking."""
import dns.resolver
import dns.reversename
@ -9,6 +9,7 @@ from typing import List, Dict, Optional, Set
import socket
import time
import logging
import uuid
from .data_structures import DNSRecord, ReconData
from .config import Config
@ -16,7 +17,7 @@ from .config import Config
logger = logging.getLogger(__name__)
class DNSResolver:
"""DNS resolution and record lookup with optimized TLD testing."""
"""DNS resolution and record lookup with optimized TLD testing and forensic tracking."""
# All DNS record types to query
RECORD_TYPES = [
@ -80,7 +81,7 @@ class DNSResolver:
return ips
def resolve_hostname(self, hostname: str) -> List[str]:
def resolve_hostname(self, hostname: str, operation_id: Optional[str] = None) -> List[str]:
"""Resolve hostname to IP addresses (full resolution with retries)."""
ips = []
@ -126,12 +127,16 @@ class DNSResolver:
return unique_ips
def get_all_dns_records(self, hostname: str) -> List[DNSRecord]:
"""Get all DNS records for a hostname."""
def get_all_dns_records(self, hostname: str, operation_id: Optional[str] = None) -> List[DNSRecord]:
"""Get all DNS records for a hostname with forensic tracking."""
records = []
successful_queries = 0
logger.debug(f"📋 Getting all DNS records for: {hostname}")
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"📋 Getting all DNS records for: {hostname} (operation: {operation_id})")
for record_type in self.RECORD_TYPES:
type_found = False
@ -145,11 +150,15 @@ class DNSResolver:
try:
answers = resolver.resolve(hostname, record_type)
for answer in answers:
records.append(DNSRecord(
# Create DNSRecord with forensic metadata
record = DNSRecord(
record_type=record_type,
value=str(answer),
ttl=answers.ttl
))
ttl=answers.ttl,
operation_id=operation_id # Forensic tracking
)
records.append(record)
if not type_found:
logger.debug(f"✅ Found {record_type} record for {hostname}: {answer}")
type_found = True
@ -179,9 +188,56 @@ class DNSResolver:
return records
def reverse_dns_lookup(self, ip: str) -> Optional[str]:
"""Perform reverse DNS lookup."""
logger.debug(f"🔍 Reverse DNS lookup for: {ip}")
def query_specific_record_type(self, hostname: str, record_type: str, operation_id: Optional[str] = None) -> List[DNSRecord]:
"""Query a specific DNS record type with forensic tracking."""
records = []
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🎯 Querying {record_type} records for {hostname} (operation: {operation_id})")
for dns_server in self.config.DNS_SERVERS:
self._rate_limit()
resolver = dns.resolver.Resolver()
resolver.nameservers = [dns_server]
resolver.timeout = self.config.DNS_TIMEOUT
try:
answers = resolver.resolve(hostname, record_type)
for answer in answers:
# Create DNSRecord with forensic metadata
record = DNSRecord(
record_type=record_type,
value=str(answer),
ttl=answers.ttl,
operation_id=operation_id # Forensic tracking
)
records.append(record)
logger.debug(f"{record_type} record for {hostname}: {answer}")
break # Found records, no need to query other DNS servers
except dns.resolver.NXDOMAIN:
logger.debug(f"❌ NXDOMAIN for {hostname} {record_type} on {dns_server}")
break # Domain doesn't exist, no point checking other servers
except dns.resolver.NoAnswer:
logger.debug(f"⚠️ No {record_type} record for {hostname} on {dns_server}")
continue # Try next DNS server
except dns.resolver.Timeout:
logger.debug(f"⏱️ Timeout for {hostname} {record_type} on {dns_server}")
continue # Try next DNS server
except Exception as e:
logger.debug(f"⚠️ Error querying {record_type} for {hostname} on {dns_server}: {e}")
continue # Try next DNS server
logger.debug(f"🎯 Found {len(records)} {record_type} records for {hostname}")
return records
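# Illustrative usage sketch (constructor signature assumed; domain hypothetical):
#
#   resolver = DNSResolver(Config())
#   mx_records = resolver.query_specific_record_type("example.com", "MX")
#   # all records from one call share a single auto-generated operation_id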
def reverse_dns_lookup(self, ip: str, operation_id: Optional[str] = None) -> Optional[str]:
"""Perform reverse DNS lookup with forensic tracking."""
logger.debug(f"🔍 Reverse DNS lookup for: {ip} (operation: {operation_id or 'auto'})")
try:
self._rate_limit()


@ -1,5 +1,5 @@
# File: src/main.py
"""Main CLI interface for the reconnaissance tool."""
"""Main CLI interface for the forensic reconnaissance tool with two-mode operation."""
import click
import json
@ -7,8 +7,8 @@ import sys
import logging
from pathlib import Path
from .config import Config
from .reconnaissance import ReconnaissanceEngine
from .report_generator import ReportGenerator
from .reconnaissance import ForensicReconnaissanceEngine
from .report_generator import ForensicReportGenerator
from .web_app import create_app
# Module logger
@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
@click.option('--web', is_flag=True, help='Start web interface instead of CLI')
@click.option('--shodan-key', help='Shodan API key')
@click.option('--virustotal-key', help='VirusTotal API key')
@click.option('--max-depth', default=2, help='Maximum recursion depth (default: 2)')
@click.option('--max-depth', default=2, help='Maximum recursion depth for full domain mode (default: 2)')
@click.option('--output', '-o', help='Output file prefix (will create .json and .txt files)')
@click.option('--json-only', is_flag=True, help='Only output JSON')
@click.option('--text-only', is_flag=True, help='Only output text report')
@ -27,14 +27,25 @@ logger = logging.getLogger(__name__)
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging (DEBUG level)')
@click.option('--quiet', '-q', is_flag=True, help='Quiet mode (WARNING level only)')
def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, text_only, port, verbose, quiet):
"""DNS Reconnaissance Tool
"""Forensic DNS Reconnaissance Tool - Two-Mode Operation
MODE 1 - Hostname-only (e.g., 'cc24'):
Expands hostname to all TLDs (cc24.com, cc24.net, etc.)
No recursive enumeration to avoid third-party infrastructure noise
Perfect for discovering domains using a specific hostname
MODE 2 - Full domain (e.g., 'cc24.com'):
Full recursive reconnaissance with subdomain discovery
Maps complete infrastructure of the specified domain
Uses max-depth for recursive enumeration
Creates forensic-grade evidence chain with operation tracking
Examples:
recon example.com # Scan example.com
recon example # Try example.* for all TLDs
recon example.com --max-depth 3 # Deeper recursion
recon example.com -v # Verbose logging
recon --web # Start web interface
recon cc24 # Mode 1: Find all cc24.* domains (no recursion)
recon cc24.com # Mode 2: Map cc24.com infrastructure (with forensic tracking)
recon cc24.com --max-depth 3 # Mode 2: Deeper recursive enumeration
recon cc24 -v # Mode 1: Verbose TLD expansion
recon --web # Start web interface with forensic visualization
"""
# Determine log level
@ -51,19 +62,19 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
if web:
# Start web interface
logger.info("🌐 Starting web interface...")
logger.info("🌐 Starting forensic web interface...")
app = create_app(config)
logger.info(f"🚀 Web interface starting on http://0.0.0.0:{port}")
app.run(host='0.0.0.0', port=port, debug=False) # Changed debug to False to reduce noise
logger.info(f"🚀 Forensic web interface starting on http://0.0.0.0:{port}")
app.run(host='0.0.0.0', port=port, debug=False)
return
if not target:
click.echo("Error: TARGET is required for CLI mode. Use --web for web interface.")
click.echo("Error: TARGET is required for CLI mode. Use --web for web interface.")
sys.exit(1)
# Initialize reconnaissance engine
logger.info("🔧 Initializing reconnaissance engine...")
engine = ReconnaissanceEngine(config)
# Initialize forensic reconnaissance engine
logger.info("🔬 Initializing forensic reconnaissance engine...")
engine = ForensicReconnaissanceEngine(config)
# Set up progress callback for CLI
def progress_callback(message, percentage=None):
@ -75,56 +86,69 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
engine.set_progress_callback(progress_callback)
# Display startup information
click.echo("=" * 60)
click.echo("🔍 DNS RECONNAISSANCE TOOL")
click.echo("=" * 60)
click.echo(f"🎯 Target: {target}")
click.echo(f"📊 Max recursion depth: {max_depth}")
click.echo(f"🌐 DNS servers: {', '.join(config.DNS_SERVERS[:3])}{'...' if len(config.DNS_SERVERS) > 3 else ''}")
click.echo(f"⚡ DNS rate limit: {config.DNS_RATE_LIMIT}/s")
click.echo("=" * 80)
click.echo("FORENSIC DNS RECONNAISSANCE TOOL")
click.echo("=" * 80)
click.echo(f"Target: {target}")
# Show operation mode
if '.' in target:
click.echo(f"Mode: Full domain reconnaissance (recursive depth: {max_depth})")
click.echo(" → Will map complete infrastructure with forensic tracking")
click.echo(" → Creates evidence chain for each discovery operation")
else:
click.echo(f"Mode: Hostname-only reconnaissance (TLD expansion)")
click.echo(" → Will find all domains using this hostname (no recursion)")
click.echo(" → Limited forensic tracking for efficiency")
click.echo(f"DNS servers: {', '.join(config.DNS_SERVERS[:3])}{'...' if len(config.DNS_SERVERS) > 3 else ''}")
click.echo(f"DNS rate limit: {config.DNS_RATE_LIMIT}/s")
if shodan_key:
click.echo("✅ Shodan integration enabled")
logger.info(f"🕵️ Shodan API key provided (ends with: ...{shodan_key[-4:] if len(shodan_key) > 4 else shodan_key})")
click.echo("🕵️ Shodan integration enabled")
else:
click.echo("⚠️ Shodan integration disabled (no API key)")
click.echo("Shodan integration disabled (no API key)")
if virustotal_key:
click.echo("✅ VirusTotal integration enabled")
logger.info(f"🛡️ VirusTotal API key provided (ends with: ...{virustotal_key[-4:] if len(virustotal_key) > 4 else virustotal_key})")
click.echo("🛡️ VirusTotal integration enabled")
else:
click.echo("⚠️ VirusTotal integration disabled (no API key)")
click.echo("VirusTotal integration disabled (no API key)")
click.echo("")
# Run reconnaissance
# Run forensic reconnaissance
try:
logger.info(f"🚀 Starting reconnaissance for target: {target}")
logger.info(f"🎯 Starting forensic reconnaissance for target: {target}")
data = engine.run_reconnaissance(target)
# Display final statistics
stats = data.get_stats()
graph_analysis = data._generate_graph_analysis()
click.echo("")
click.echo("=" * 60)
click.echo("📊 RECONNAISSANCE COMPLETE")
click.echo("=" * 60)
click.echo(f"🏠 Hostnames discovered: {stats['hostnames']}")
click.echo(f"🌐 IP addresses found: {stats['ip_addresses']}")
click.echo(f"📋 DNS records collected: {stats['dns_records']}")
click.echo(f"📜 Certificates found: {stats['certificates']}")
click.echo(f"🕵️ Shodan results: {stats['shodan_results']}")
click.echo(f"🛡️ VirusTotal results: {stats['virustotal_results']}")
click.echo("=" * 80)
click.echo("FORENSIC RECONNAISSANCE COMPLETE")
click.echo("=" * 80)
click.echo(f"Hostnames discovered: {stats['hostnames']}")
click.echo(f"IP addresses found: {stats['ip_addresses']}")
click.echo(f"Discovery relationships: {stats['discovery_edges']}")
click.echo(f"Operations performed: {stats['operations_performed']}")
click.echo(f"DNS records collected: {stats['dns_records']}")
click.echo(f"Certificates found: {stats['certificates_total']} ({stats['certificates_current']} valid, {stats['certificates_expired']} expired)")
click.echo(f"Shodan results: {stats['shodan_results']}")
click.echo(f"VirusTotal results: {stats['virustotal_results']}")
click.echo(f"Maximum discovery depth: {graph_analysis['max_depth']}")
# Calculate and display timing
if data.end_time and data.start_time:
duration = data.end_time - data.start_time
click.echo(f"⏱️ Total time: {duration}")
click.echo(f"Total time: {duration}")
click.echo("")
# Generate reports
logger.info("📄 Generating reports...")
report_gen = ReportGenerator(data)
# Generate forensic reports
logger.info("📄 Generating forensic reports...")
report_gen = ForensicReportGenerator(data)
if output:
# Save to files
@ -137,7 +161,7 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
with open(json_file, 'w', encoding='utf-8') as f:
f.write(json_content)
saved_files.append(json_file)
logger.info(f"💾 JSON report saved: {json_file}")
logger.info(f"📄 Forensic JSON report saved: {json_file}")
except Exception as e:
logger.error(f"❌ Failed to save JSON report: {e}")
@ -147,14 +171,14 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
with open(text_file, 'w', encoding='utf-8') as f:
f.write(report_gen.generate_text_report())
saved_files.append(text_file)
logger.info(f"💾 Text report saved: {text_file}")
logger.info(f"📄 Forensic text report saved: {text_file}")
except Exception as e:
logger.error(f"❌ Failed to save text report: {e}")
if saved_files:
click.echo(f"💾 Reports saved:")
click.echo(f"📁 Forensic reports saved:")
for file in saved_files:
click.echo(f" 📄 {file}")
click.echo(f" {file}")
else:
# Output to stdout
@ -174,18 +198,19 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
# Default: show text report
try:
click.echo(report_gen.generate_text_report())
click.echo(f"\n💡 To get JSON output, use: --json-only")
click.echo(f"💡 To save reports, use: --output filename")
click.echo(f"\n📊 To get JSON output with full forensic data, use: --json-only")
click.echo(f"💾 To save reports, use: --output filename")
click.echo(f"🌐 For interactive visualization, use: --web")
except Exception as e:
logger.error(f"❌ Failed to generate report: {e}")
click.echo(f"Error generating report: {e}")
except KeyboardInterrupt:
logger.warning("⚠️ Reconnaissance interrupted by user")
click.echo("\n⚠️ Reconnaissance interrupted by user.")
logger.warning("⚠️ Forensic reconnaissance interrupted by user")
click.echo("\n🛑 Reconnaissance interrupted by user.")
sys.exit(1)
except Exception as e:
logger.error(f"❌ Error during reconnaissance: {e}", exc_info=True)
logger.error(f"❌ Error during forensic reconnaissance: {e}", exc_info=True)
click.echo(f"❌ Error during reconnaissance: {e}")
if verbose:
raise # Re-raise in verbose mode to show full traceback


@ -1,12 +1,15 @@
# File: src/reconnaissance.py
"""Main reconnaissance logic with enhanced TLD expansion."""
"""Enhanced reconnaissance logic with forensic-grade operation tracking and provenance."""
import threading
import concurrent.futures
import logging
from datetime import datetime
from typing import Set, List, Optional, Tuple
from .data_structures import ReconData
from typing import Set, List, Optional, Tuple, Dict, Any
from .data_structures import (
ForensicReconData, DiscoveryNode, DiscoveryOperation, DiscoveryEdge,
OperationType, DiscoveryMethod, DNSRecord, Certificate
)
from .config import Config
from .dns_resolver import DNSResolver
from .certificate_checker import CertificateChecker
@ -17,8 +20,8 @@ from .tld_fetcher import TLDFetcher
# Set up logging for this module
logger = logging.getLogger(__name__)
class ReconnaissanceEngine:
"""Main reconnaissance engine with smart TLD expansion."""
class ForensicReconnaissanceEngine:
"""Enhanced reconnaissance engine with complete forensic tracking and provenance."""
def __init__(self, config: Config):
self.config = config
@ -32,32 +35,32 @@ class ReconnaissanceEngine:
self.shodan_client = None
if config.shodan_key:
self.shodan_client = ShodanClient(config.shodan_key, config)
logger.info("✅ Shodan client initialized")
else:
logger.info("⚠️ Shodan API key not provided, skipping Shodan integration")
logger.info("🕵️ Shodan client initialized")
self.virustotal_client = None
if config.virustotal_key:
self.virustotal_client = VirusTotalClient(config.virustotal_key, config)
logger.info("✅ VirusTotal client initialized")
else:
logger.info("⚠️ VirusTotal API key not provided, skipping VirusTotal integration")
logger.info("🛡️ VirusTotal client initialized")
# Progress tracking
self.progress_callback = None
self._lock = threading.Lock()
# Shared data object for live updates
self.shared_data = None
# Track operation mode
self.is_hostname_only_mode = False
# Operation tracking
self.pending_operations = []
self.completed_operations = []
def set_progress_callback(self, callback):
"""Set callback for progress updates."""
self.progress_callback = callback
def set_shared_data(self, shared_data: ReconData):
"""Set shared data object for live updates during web interface usage."""
self.shared_data = shared_data
logger.info("📊 Using shared data object for live updates")
def set_shared_data(self, shared_data: ForensicReconData):
"""Set shared data object for live updates."""
self.data = shared_data
logger.info("📊 Using shared forensic data object for live updates")
def _update_progress(self, message: str, percentage: int = None):
"""Update progress if callback is set."""
@ -65,336 +68,603 @@ class ReconnaissanceEngine:
if self.progress_callback:
self.progress_callback(message, percentage)
def run_reconnaissance(self, target: str) -> ReconData:
"""Run full reconnaissance on target."""
# Use shared data object if available, otherwise create new one
if self.shared_data is not None:
self.data = self.shared_data
logger.info("📊 Using shared data object for reconnaissance")
else:
self.data = ReconData()
logger.info("📊 Created new data object for reconnaissance")
def run_reconnaissance(self, target: str) -> ForensicReconData:
"""Run forensic reconnaissance with complete operation tracking."""
# Initialize data structure
if not hasattr(self, 'data') or self.data is None:
self.data = ForensicReconData()
logger.info("🔬 Created new forensic data structure")
self.data.start_time = datetime.now()
self.data.scan_config = {
'target': target,
'max_depth': self.config.max_depth,
'dns_servers': self.config.DNS_SERVERS,
'shodan_enabled': self.shodan_client is not None,
'virustotal_enabled': self.virustotal_client is not None
}
logger.info(f"🚀 Starting reconnaissance for target: {target}")
logger.info(f"📊 Configuration: max_depth={self.config.max_depth}, "
f"DNS_rate={self.config.DNS_RATE_LIMIT}/s")
logger.info(f"🎯 Starting forensic reconnaissance for target: {target}")
try:
# Determine if target is hostname.tld or just hostname
# Determine operation mode and create initial operation
if '.' in target:
logger.info(f"🎯 Target '{target}' appears to be a full domain name")
self._update_progress(f"Starting reconnaissance for {target}", 0)
self.data.add_hostname(target, 0)
initial_targets = {target}
self.is_hostname_only_mode = False
logger.info(f"🔍 Full domain mode: {target}")
self._create_initial_operation(target, OperationType.INITIAL_TARGET)
else:
logger.info(f"🔍 Target '{target}' appears to be a hostname, expanding to all TLDs")
self._update_progress(f"Expanding {target} to all TLDs", 5)
initial_targets = self._expand_hostname_to_tlds_smart(target)
logger.info(f"📋 Found {len(initial_targets)} valid domains after TLD expansion")
self.is_hostname_only_mode = True
logger.info(f"🔍 Hostname expansion mode: {target}")
self._perform_tld_expansion(target)
self._update_progress("Resolving initial targets", 10)
# Process all discovery operations
self._process_discovery_queue()
# Process all targets recursively
self._process_targets_recursively(initial_targets)
# Final external lookups
self._update_progress("Performing external service lookups", 90)
# Perform external lookups for full domain mode
if not self.is_hostname_only_mode:
self._perform_external_lookups()
# Log final statistics
stats = self.data.get_stats()
logger.info(f"📈 Final statistics: {stats}")
self._update_progress("Reconnaissance complete", 100)
except Exception as e:
logger.error(f"❌ Error during reconnaissance: {e}", exc_info=True)
raise
finally:
# Finalize
self.data.end_time = datetime.now()
duration = self.data.end_time - self.data.start_time
logger.info(f"⏱️ Total reconnaissance time: {duration}")
stats = self.data.get_stats()
logger.info(f"🏁 Forensic reconnaissance complete in {duration}")
logger.info(f"📊 Final stats: {stats}")
self._update_progress("Forensic reconnaissance complete", 100)
except Exception as e:
logger.error(f"❌ Error during forensic reconnaissance: {e}", exc_info=True)
raise
return self.data
def _expand_hostname_to_tlds_smart(self, hostname: str) -> Set[str]:
"""Smart TLD expansion with prioritization and parallel processing."""
logger.info(f"🌐 Starting smart TLD expansion for hostname: {hostname}")
def _create_initial_operation(self, target: str, operation_type: OperationType):
"""Create initial operation for the target."""
operation = DiscoveryOperation(
operation_type=operation_type,
target=target,
discovered_hostnames=[target]
)
self.data.add_operation(operation)
# Create initial node
node = self.data.add_node(
hostname=target,
depth=0,
discovery_method=DiscoveryMethod.INITIAL_TARGET,
operation_id=operation.operation_id
)
# Queue initial DNS operations
self._queue_dns_operations_for_hostname(target, 0)
logger.info(f"🔍 Created initial operation for {target}")
def _perform_tld_expansion(self, hostname: str):
"""Perform TLD expansion with forensic tracking."""
logger.info(f"🌐 Starting TLD expansion for: {hostname}")
# Create TLD expansion operation
operation = DiscoveryOperation(
operation_type=OperationType.TLD_EXPANSION,
target=hostname,
metadata={'expansion_type': 'smart_prioritized'}
)
self._update_progress(f"Expanding {hostname} to all TLDs", 5)
# Get prioritized TLD lists
priority_tlds, normal_tlds, deprioritized_tlds = self.tld_fetcher.get_prioritized_tlds()
logger.info(f"📊 TLD categories: {len(priority_tlds)} priority, "
f"{len(normal_tlds)} normal, {len(deprioritized_tlds)} deprioritized")
valid_domains = set()
# Phase 1: Check priority TLDs first (parallel processing)
logger.info("🚀 Phase 1: Checking priority TLDs...")
priority_results = self._check_tlds_parallel(hostname, priority_tlds, "priority")
# Phase 1: Priority TLDs
logger.info("🎯 Phase 1: Checking priority TLDs...")
priority_results = self._check_tlds_parallel(hostname, priority_tlds, operation.operation_id)
valid_domains.update(priority_results)
self._update_progress(f"Phase 1 complete: {len(priority_results)} priority TLD matches", 6)
# Phase 2: Check normal TLDs (if we found fewer than 5 results)
# Phase 2: Normal TLDs (if needed)
if len(valid_domains) < 5:
logger.info("🔍 Phase 2: Checking normal TLDs...")
normal_results = self._check_tlds_parallel(hostname, normal_tlds, "normal")
normal_results = self._check_tlds_parallel(hostname, normal_tlds, operation.operation_id)
valid_domains.update(normal_results)
self._update_progress(f"Phase 2 complete: {len(normal_results)} normal TLD matches", 8)
else:
logger.info(f"⏭️ Skipping normal TLDs (found {len(valid_domains)} matches in priority)")
# Phase 3: Check deprioritized TLDs only if we found very few results
# Phase 3: Deprioritized TLDs (if really needed)
if len(valid_domains) < 2:
logger.info("🔍 Phase 3: Checking deprioritized TLDs (limited results so far)...")
depri_results = self._check_tlds_parallel(hostname, deprioritized_tlds, "deprioritized")
logger.info("🔎 Phase 3: Checking deprioritized TLDs...")
depri_results = self._check_tlds_parallel(hostname, deprioritized_tlds, operation.operation_id)
valid_domains.update(depri_results)
self._update_progress(f"Phase 3 complete: {len(depri_results)} deprioritized TLD matches", 9)
else:
logger.info(f"⏭️ Skipping deprioritized TLDs (found {len(valid_domains)} matches already)")
# Update operation with results
operation.discovered_hostnames = list(valid_domains)
operation.success = len(valid_domains) > 0
self.data.add_operation(operation)
logger.info(f"🎯 Smart TLD expansion complete: found {len(valid_domains)} valid domains")
return valid_domains
# Create nodes for all discovered domains
for domain in valid_domains:
node = self.data.add_node(
hostname=domain,
depth=0,
discovery_method=DiscoveryMethod.TLD_EXPANSION,
operation_id=operation.operation_id
)
def _check_tlds_parallel(self, hostname: str, tlds: List[str], phase_name: str) -> Set[str]:
"""Check TLDs in parallel with optimized settings."""
# Add discovery edge (synthetic source for TLD expansion)
self.data.add_edge(
source=f"tld_expansion:{hostname}",
target=domain,
discovery_method=DiscoveryMethod.TLD_EXPANSION,
operation_id=operation.operation_id,
metadata={'original_hostname': hostname}
)
logger.info(f"✅ TLD expansion complete: {len(valid_domains)} domains found")
# Queue lightweight operations for hostname-only mode
for domain in valid_domains:
self._queue_lightweight_operations(domain)
def _check_tlds_parallel(self, hostname: str, tlds: List[str], operation_id: str) -> Set[str]:
"""Check TLDs in parallel with operation tracking."""
valid_domains = set()
max_workers = min(20, len(tlds))
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_tld = {
executor.submit(self._check_single_tld_forensic, hostname, tld, operation_id): tld
for tld in tlds
}
# Process results as they complete
for future in concurrent.futures.as_completed(future_to_tld):
tld = future_to_tld[future]
try:
result = future.result(timeout=10)
if result:
domain, ips = result
valid_domains.add(domain)
# Track discovered IPs
for ip in ips:
self.data.ip_addresses.add(ip)
except concurrent.futures.TimeoutError:
logger.debug(f"⏱️ Timeout checking {hostname}.{tld}")
except Exception as e:
logger.debug(f"Error checking {hostname}.{tld}: {e}")
return valid_domains
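The same bounded fan-out pattern, isolated from the engine for reference (a sketch; `probe` and the limits are stand-ins, not engine API):

```python
# Bounded worker pool, results collected as they complete, failures isolated
# so one bad probe never sinks the batch.
import concurrent.futures
from typing import Callable, Iterable, List, Optional

def fan_out(items: Iterable[str], probe: Callable[[str], Optional[object]],
            max_workers: int = 20) -> List[object]:
    items = list(items)
    results: List[object] = []
    if not items:
        return results
    with concurrent.futures.ThreadPoolExecutor(
            max_workers=min(max_workers, len(items))) as executor:
        future_to_item = {executor.submit(probe, item): item for item in items}
        for future in concurrent.futures.as_completed(future_to_item):
            try:
                result = future.result(timeout=10)
            except Exception:
                continue  # skip timed-out or failed probes
            if result is not None:
                results.append(result)
    return results
```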
def _check_single_tld_forensic(self, hostname: str, tld: str, operation_id: str) -> Optional[Tuple[str, List[str]]]:
"""Check single TLD with forensic tracking."""
full_hostname = f"{hostname}.{tld}"
# Quick DNS resolution
ips = self.dns_resolver.resolve_hostname_fast(full_hostname)
if ips:
logger.debug(f"✅ {full_hostname} -> {ips}")
# Create DNS operation record
dns_operation = DiscoveryOperation(
operation_type=OperationType.DNS_A,
target=full_hostname,
discovered_ips=ips,
metadata={'tld_expansion': True, 'parent_operation': operation_id}
)
# Add DNS records
for ip in ips:
record = DNSRecord(
record_type='A',
value=ip,
operation_id=dns_operation.operation_id
)
dns_operation.dns_records.append(record)
self.data.add_operation(dns_operation)
return (full_hostname, ips)
return None
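`resolve_hostname_fast` lives in the DNS resolver module, which is outside this hunk. Assuming it is a short-timeout A lookup suited to bulk probing, a minimal dnspython equivalent might look like this (a sketch, not the project's implementation):

```python
# Tight per-query budget so a dead TLD costs ~2s, not the default ~30s.
import dns.resolver
import dns.exception
from typing import List

def resolve_hostname_fast(hostname: str) -> List[str]:
    resolver = dns.resolver.Resolver()
    resolver.timeout = 1.0   # per-nameserver wait
    resolver.lifetime = 2.0  # total budget for the query
    try:
        answer = resolver.resolve(hostname, 'A')
        return [rdata.address for rdata in answer]
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
            dns.resolver.NoNameservers, dns.exception.Timeout):
        return []
```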
def _queue_dns_operations_for_hostname(self, hostname: str, depth: int):
"""Queue comprehensive DNS operations for a hostname."""
# Map DNS record types to operation types
dns_operations = [
(OperationType.DNS_A, 'A'),
(OperationType.DNS_AAAA, 'AAAA'),
(OperationType.DNS_MX, 'MX'),
(OperationType.DNS_NS, 'NS'),
(OperationType.DNS_TXT, 'TXT'),
(OperationType.DNS_CNAME, 'CNAME'),
(OperationType.DNS_SOA, 'SOA'),
(OperationType.DNS_SRV, 'SRV'),
(OperationType.DNS_CAA, 'CAA'),
]
for operation_type, record_type in dns_operations:
self.pending_operations.append({
'type': 'dns_query',
'operation_type': operation_type,
'record_type': record_type,
'hostname': hostname,
'depth': depth
})
# Queue certificate operation
self.pending_operations.append({
'type': 'certificate_check',
'operation_type': OperationType.CERTIFICATE_CHECK,
'hostname': hostname,
'depth': depth
})
logger.debug(f"📋 Queued {len(dns_operations) + 1} operations for {hostname}")
def _queue_lightweight_operations(self, hostname: str):
"""Queue lightweight operations for hostname-only mode."""
# Only essential DNS operations
essential_operations = [
(OperationType.DNS_A, 'A'),
(OperationType.DNS_AAAA, 'AAAA'),
(OperationType.DNS_MX, 'MX'),
(OperationType.DNS_TXT, 'TXT'),
]
for operation_type, record_type in essential_operations:
self.pending_operations.append({
'type': 'dns_query',
'operation_type': operation_type,
'record_type': record_type,
'hostname': hostname,
'depth': 0
})
logger.debug(f"📋 Queued {len(essential_operations)} lightweight operations for {hostname}")
def _process_discovery_queue(self):
"""Process all queued discovery operations."""
operation_count = 0
total_operations = len(self.pending_operations)
logger.info(f"⚙️ Processing {total_operations} operations")
while self.pending_operations:
operation_spec = self.pending_operations.pop(0)
operation_count += 1
# Calculate progress
progress = 20 + (operation_count * 60 // max(total_operations, 1))
try:
if operation_spec['type'] == 'dns_query':
self._execute_dns_operation(operation_spec)
elif operation_spec['type'] == 'certificate_check':
self._execute_certificate_operation(operation_spec)
# Update progress periodically
if operation_count % 10 == 0:
self._update_progress(f"Processed {operation_count}/{total_operations} operations", progress)
except Exception as e:
logger.error(f"❌ Error processing operation {operation_spec}: {e}")
continue
logger.info(f"✅ Completed processing {operation_count} operations")
def _execute_dns_operation(self, operation_spec: Dict[str, Any]):
"""Execute a single DNS operation with full tracking."""
hostname = operation_spec['hostname']
record_type = operation_spec['record_type']
operation_type = operation_spec['operation_type']
depth = operation_spec['depth']
logger.debug(f"🔍 DNS {record_type} query for {hostname}")
# Create operation
operation = DiscoveryOperation(
operation_type=operation_type,
target=hostname,
metadata={'record_type': record_type, 'depth': depth}
)
try:
# Perform DNS query using the updated method with operation_id
records = self.dns_resolver.query_specific_record_type(hostname, record_type, operation.operation_id)
operation.dns_records = records
operation.success = len(records) > 0
# Process results
node = self.data.get_node(hostname)
if not node:
node = self.data.add_node(hostname, depth)
new_hostnames = set()
new_ips = set()
for record in records:
node.add_dns_record(record)
# Extract IPs and hostnames
if record.record_type in ['A', 'AAAA']:
new_ips.add(record.value)
self.data.ip_addresses.add(record.value)
elif record.record_type in ['MX', 'NS', 'CNAME']:
# Extract hostname from MX (format: "priority hostname")
if record.record_type == 'MX':
parts = record.value.split()
if len(parts) >= 2:
extracted_hostname = parts[-1].rstrip('.')
else:
continue
else:
extracted_hostname = record.value.rstrip('.')
if self._is_valid_hostname(extracted_hostname) and extracted_hostname != hostname:
new_hostnames.add(extracted_hostname)
# Update operation results
operation.discovered_hostnames = list(new_hostnames)
operation.discovered_ips = list(new_ips)
# Add discovery edges and queue new operations
for new_hostname in new_hostnames:
# Add node
new_node = self.data.add_node(
hostname=new_hostname,
depth=depth + 1,
discovery_method=DiscoveryMethod.DNS_RECORD_VALUE,
operation_id=operation.operation_id
)
# Add edge
self.data.add_edge(
source=hostname,
target=new_hostname,
discovery_method=DiscoveryMethod.DNS_RECORD_VALUE,
operation_id=operation.operation_id,
metadata={'record_type': record_type}
)
# Queue operations for new hostname (if not in hostname-only mode and within depth)
if not self.is_hostname_only_mode and depth + 1 <= self.config.max_depth:
self._queue_dns_operations_for_hostname(new_hostname, depth + 1)
logger.debug(f"✅ DNS {record_type} for {hostname}: {len(records)} records, {len(new_hostnames)} new hostnames")
except Exception as e:
operation.success = False
operation.error_message = str(e)
logger.debug(f"❌ DNS {record_type} query failed for {hostname}: {e}")
self.data.add_operation(operation)
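The MX branch above relies on the textual record format. A worked example of what the extraction yields (values are invented for illustration):

```python
# MX values look like "<preference> <exchange>"; CNAME/NS values are bare
# targets, usually with a trailing dot. Both reduce to a hostname.
mx_value = "10 mail.example.com."
extracted = mx_value.split()[-1].rstrip('.')   # -> "mail.example.com"

cname_value = "edge.cdn-host.example.net."
extracted_cname = cname_value.rstrip('.')      # -> "edge.cdn-host.example.net"
```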
def _execute_certificate_operation(self, operation_spec: Dict[str, Any]):
"""Execute certificate check operation."""
hostname = operation_spec['hostname']
depth = operation_spec['depth']
# Skip certificates in hostname-only mode
if self.is_hostname_only_mode:
logger.debug(f"⭐ Skipping certificate check for {hostname} (hostname-only mode)")
return
logger.debug(f"🔍 Certificate check for {hostname}")
operation = DiscoveryOperation(
operation_type=OperationType.CERTIFICATE_CHECK,
target=hostname,
metadata={'depth': depth}
)
try:
# Use updated method with operation_id
certificates = self.cert_checker.get_certificates(hostname, operation.operation_id)
operation.certificates = certificates
operation.success = len(certificates) > 0
# Process certificates
node = self.data.get_node(hostname)
if node:
new_hostnames = set()
for cert in certificates:
node.add_certificate(cert)
# Extract hostnames from certificate subjects
subject_hostnames = self.cert_checker.extract_subdomains_from_certificates([cert])
for subject_hostname in subject_hostnames:
if subject_hostname != hostname and self._is_valid_hostname(subject_hostname):
new_hostnames.add(subject_hostname)
# Update operation and create edges
operation.discovered_hostnames = list(new_hostnames)
for new_hostname in new_hostnames:
# Add node
new_node = self.data.add_node(
hostname=new_hostname,
depth=depth + 1,
discovery_method=DiscoveryMethod.CERTIFICATE_SUBJECT,
operation_id=operation.operation_id
)
# Add edge
self.data.add_edge(
source=hostname,
target=new_hostname,
discovery_method=DiscoveryMethod.CERTIFICATE_SUBJECT,
operation_id=operation.operation_id
)
# Queue operations for new hostname (within depth limit)
if depth + 1 <= self.config.max_depth:
self._queue_dns_operations_for_hostname(new_hostname, depth + 1)
logger.debug(f"✅ Certificates for {hostname}: {len(certificates)} certs, {len(new_hostnames)} new hostnames")
except Exception as e:
operation.success = False
operation.error_message = str(e)
logger.debug(f"❌ Certificate check failed for {hostname}: {e}")
self.data.add_operation(operation)
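`extract_subdomains_from_certificates` is implemented in the certificate checker, which is not part of this hunk. Since crt.sh returns subject alternative names newline-separated in its `name_value` field, a minimal version could look like this (a sketch under that assumption, not the project's code):

```python
# Split crt.sh's newline-separated name_value field and normalize wildcards.
from typing import Set

def hostnames_from_name_value(name_value: str) -> Set[str]:
    names: Set[str] = set()
    for raw in name_value.splitlines():
        name = raw.strip().lower().rstrip('.')
        if name.startswith('*.'):
            name = name[2:]  # keep the wildcard's base domain
        if name and '.' in name:
            names.add(name)
    return names

# hostnames_from_name_value("example.com\n*.example.com\nmail.example.com")
# -> {"example.com", "mail.example.com"}
```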
def _perform_external_lookups(self):
"""Perform external service lookups with operation tracking."""
if self.is_hostname_only_mode:
logger.info("⭐ Skipping external lookups (hostname-only mode)")
return
self._update_progress("Performing external service lookups", 85)
# Reverse DNS
self._perform_reverse_dns_lookups()
# Shodan lookups
if self.shodan_client:
self._perform_shodan_lookups()
else:
logger.info("⚠️ Skipping Shodan lookups (no API key)")
# VirusTotal lookups
if self.virustotal_client:
self._perform_virustotal_lookups()
else:
logger.info("⚠️ Skipping VirusTotal lookups (no API key)")
def _perform_reverse_dns_lookups(self):
"""Perform reverse DNS lookups with operation tracking."""
logger.info(f"🔄 Performing reverse DNS for {len(self.data.ip_addresses)} IPs")
for ip in self.data.ip_addresses:
operation = DiscoveryOperation(
operation_type=OperationType.DNS_REVERSE,
target=ip
)
try:
# Use updated method with operation_id
reverse_hostname = self.dns_resolver.reverse_dns_lookup(ip, operation.operation_id)
if reverse_hostname:
operation.discovered_hostnames = [reverse_hostname]
operation.success = True
# Update nodes that have this IP
for node in self.data.nodes.values():
if ip in node.resolved_ips:
node.reverse_dns = reverse_hostname
logger.debug(f"🔄 Reverse DNS {ip} -> {reverse_hostname}")
else:
operation.success = False
except Exception as e:
operation.success = False
operation.error_message = str(e)
self.data.add_operation(operation)
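`reverse_dns_lookup` belongs to the resolver module; the core of a PTR lookup can be as small as this (a sketch, with the operation_id plumbing omitted):

```python
# PTR lookup via the standard library; returns None when no record exists.
import socket
from typing import Optional

def reverse_dns_lookup(ip: str) -> Optional[str]:
    try:
        hostname, _aliases, _addresses = socket.gethostbyaddr(ip)
        return hostname.rstrip('.')
    except (socket.herror, socket.gaierror):
        return None  # no PTR record, or the lookup failed
```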
def _perform_shodan_lookups(self):
"""Perform Shodan lookups with operation tracking."""
logger.info(f"🕵️ Performing Shodan lookups for {len(self.data.ip_addresses)} IPs")
for ip in self.data.ip_addresses:
operation = DiscoveryOperation(
operation_type=OperationType.SHODAN_LOOKUP,
target=ip
)
try:
# Use updated method with operation_id
result = self.shodan_client.lookup_ip(ip, operation.operation_id)
if result:
operation.shodan_results = [result]
operation.success = True
# Add to relevant nodes
for node in self.data.nodes.values():
if ip in node.resolved_ips:
node.shodan_results.append(result)
logger.debug(f"🕵️ Shodan {ip}: {len(result.ports)} ports")
else:
operation.success = False
except Exception as e:
operation.success = False
operation.error_message = str(e)
self.data.add_operation(operation)
def _perform_virustotal_lookups(self):
"""Perform VirusTotal lookups with operation tracking."""
total_resources = len(self.data.ip_addresses) + len(self.data.nodes)
logger.info(f"🛡️ Performing VirusTotal lookups for {total_resources} resources")
# Check IPs
for ip in self.data.ip_addresses:
operation = DiscoveryOperation(
operation_type=OperationType.VIRUSTOTAL_IP,
target=ip
)
try:
# Use updated method with operation_id
result = self.virustotal_client.lookup_ip(ip, operation.operation_id)
if result:
operation.virustotal_results = [result]
operation.success = True
# Add to relevant nodes
for node in self.data.nodes.values():
if ip in node.resolved_ips:
node.virustotal_results.append(result)
logger.debug(f"🛡️ VirusTotal {ip}: {result.positives}/{result.total}")
else:
operation.success = False
except Exception as e:
operation.success = False
operation.error_message = str(e)
self.data.add_operation(operation)
# Check domains
for hostname, node in self.data.nodes.items():
operation = DiscoveryOperation(
operation_type=OperationType.VIRUSTOTAL_DOMAIN,
target=hostname
)
try:
# Use updated method with operation_id
result = self.virustotal_client.lookup_domain(hostname, operation.operation_id)
if result:
operation.virustotal_results = [result]
operation.success = True
node.virustotal_results.append(result)
logger.debug(f"🛡️ VirusTotal {hostname}: {result.positives}/{result.total}")
else:
operation.success = False
except Exception as e:
operation.success = False
operation.error_message = str(e)
self.data.add_operation(operation)
def _is_valid_hostname(self, hostname: str) -> bool:
"""Validate hostname format."""
if not hostname or '.' not in hostname or len(hostname) > 255:
return False
# Basic validation
parts = hostname.split('.')
if len(parts) < 2:
return False
for part in parts:
if not part or len(part) > 63:
return False
return True
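A few spot checks for the validation logic, on a standalone copy so they run without an engine instance:

```python
# Standalone copy of _is_valid_hostname's rules, with expected outcomes.
def is_valid_hostname(hostname: str) -> bool:
    if not hostname or '.' not in hostname or len(hostname) > 255:
        return False
    parts = hostname.split('.')
    if len(parts) < 2:
        return False
    return all(part and len(part) <= 63 for part in parts)

assert is_valid_hostname("mail.example.com")
assert not is_valid_hostname("localhost")          # no dot
assert not is_valid_hostname("a." + "b" * 64)      # label longer than 63
assert not is_valid_hostname("x" * 260 + ".com")   # name longer than 255
```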
# Backward compatibility
ReconnaissanceEngine = ForensicReconnaissanceEngine

View File

@ -1,111 +1,451 @@
# File: src/report_generator.py
"""Generate reports from reconnaissance data."""
"""Enhanced report generation with forensic details and discovery graph visualization."""
from datetime import datetime
from typing import Dict, Any
from .data_structures import ReconData
from typing import Dict, Any, List, Set
from .data_structures import ForensicReconData, DiscoveryMethod, OperationType
import logging
class ReportGenerator:
"""Generate various report formats."""
logger = logging.getLogger(__name__)
def __init__(self, data: ReconData):
class ForensicReportGenerator:
"""Generate comprehensive forensic reports with discovery provenance."""
def __init__(self, data: ForensicReconData):
self.data = data
def generate_text_report(self) -> str:
"""Generate comprehensive text report."""
"""Generate comprehensive forensic text report."""
report = []
# Header
report.append("=" * 80)
report.append("DNS RECONNAISSANCE REPORT")
report.append("FORENSIC DNS RECONNAISSANCE REPORT")
report.append("=" * 80)
report.append(f"Start Time: {self.data.start_time}")
report.append(f"End Time: {self.data.end_time}")
report.append(f"Scan Start: {self.data.start_time}")
if self.data.end_time:
report.append(f"Scan End: {self.data.end_time}")
duration = self.data.end_time - self.data.start_time
report.append(f"Duration: {duration}")
report.append(f"Target: {self.data.scan_config.get('target', 'Unknown')}")
report.append(f"Max Depth: {self.data.scan_config.get('max_depth', 'Unknown')}")
report.append("")
# Executive Summary
report.append("EXECUTIVE SUMMARY")
report.append("-" * 40)
stats = self.data.get_stats()
report.append(f"Discovered Hostnames: {stats['hostnames']}")
report.append(f"IP Addresses Found: {stats['ip_addresses']}")
report.append(f"Operations Performed: {stats['operations_performed']}")
report.append(f"Discovery Relationships: {stats['discovery_edges']}")
report.append(f"DNS Records Collected: {stats['dns_records']}")
report.append(f"Total Certificates: {stats['certificates_total']}")
report.append(f" └─ Currently Valid: {stats['certificates_current']}")
report.append(f" └─ Expired: {stats['certificates_expired']}")
report.append(f"Shodan Results: {stats['shodan_results']}")
report.append(f"VirusTotal Results: {stats['virustotal_results']}")
report.append("")
# Discovery Graph Analysis
graph_analysis = self.data._generate_graph_analysis()
report.append("DISCOVERY GRAPH ANALYSIS")
report.append("-" * 40)
report.append(f"Maximum Discovery Depth: {graph_analysis['max_depth']}")
report.append(f"Root Nodes (Initial Targets): {len(graph_analysis['root_nodes'])}")
report.append(f"Leaf Nodes (No Further Discoveries): {len(graph_analysis['leaf_nodes'])}")
report.append("")
# Depth Distribution
report.append("Discovery Depth Distribution:")
for depth, count in sorted(graph_analysis['depth_distribution'].items()):
report.append(f" Depth {depth}: {count} hostnames")
report.append("")
# Discovery Methods Distribution
report.append("Discovery Methods Used:")
for method, count in sorted(graph_analysis['discovery_method_distribution'].items()):
report.append(f" {method}: {count} discoveries")
report.append("")
# Discovery Tree
report.append("DISCOVERY TREE")
report.append("-" * 40)
report.extend(self._generate_discovery_tree())
report.append("")
# Detailed Node Analysis
report.append("DETAILED NODE ANALYSIS")
report.append("-" * 40)
report.extend(self._generate_node_details())
report.append("")
# Operations Timeline
report.append("OPERATIONS TIMELINE")
report.append("-" * 40)
report.extend(self._generate_operations_timeline())
report.append("")
# Security Analysis
security_findings = self._analyze_security_findings()
if security_findings:
report.append("SECURITY ANALYSIS")
report.append("-" * 40)
report.extend(security_findings)
report.append("")
# Certificate Analysis
cert_analysis = self._analyze_certificates()
if cert_analysis:
report.append("CERTIFICATE ANALYSIS")
report.append("-" * 40)
report.extend(cert_analysis)
report.append("")
# DNS Record Analysis
report.append("DNS RECORD ANALYSIS")
report.append("-" * 40)
report.extend(self._analyze_dns_records())
report.append("")
return "\n".join(report)
def _generate_discovery_tree(self) -> List[str]:
"""Generate a tree view of hostname discoveries."""
tree_lines = []
# Find root nodes
graph_analysis = self.data._generate_graph_analysis()
root_nodes = graph_analysis['root_nodes']
if not root_nodes:
tree_lines.append("No root nodes found")
return tree_lines
# Generate tree for each root
for root in sorted(root_nodes):
tree_lines.extend(self._build_tree_branch(root, "", set()))
return tree_lines
def _build_tree_branch(self, hostname: str, prefix: str, visited: Set[str]) -> List[str]:
"""Build a tree branch for a hostname."""
lines = []
# Avoid cycles
if hostname in visited:
lines.append(f"{prefix}{hostname} [CYCLE]")
return lines
visited.add(hostname)
# Get node info
node = self.data.get_node(hostname)
if not node:
lines.append(f"{prefix}{hostname} [NO NODE DATA]")
return lines
# Node info
node_info = f"{hostname} (depth:{node.depth}"
if node.resolved_ips:
node_info += f", IPs:{len(node.resolved_ips)}"
if node.certificates:
valid_certs = len(node.get_current_certificates())
expired_certs = len(node.get_expired_certificates())
node_info += f", certs:{valid_certs}+{expired_certs}"
node_info += ")"
lines.append(f"{prefix}{node_info}")
# Get children
children = self.data.get_children(hostname)
children.sort()
for i, child in enumerate(children):
is_last = (i == len(children) - 1)
child_prefix = prefix + ("└── " if is_last else "├── ")
next_prefix = prefix + (" " if is_last else "")
# Find discovery method for this child
discovery_method = "unknown"
for edge in self.data.edges:
if edge.source_hostname == hostname and edge.target_hostname == child:
discovery_method = edge.discovery_method.value
break
lines.append(f"{child_prefix}[{discovery_method}]")
lines.extend(self._build_tree_branch(child, next_prefix, visited.copy()))
return lines
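The `visited.copy()` in the recursion deserves a note: it lets the same hostname appear under two different parents (the discovery graph is a DAG), while a true cycle still trips the guard. A mini-sketch with a plain dict graph (hypothetical data) demonstrating both behaviors:

```python
# Shared children render twice; back-edges stop at the cycle guard.
graph = {
    "example.com": ["mail.example.com", "dev.example.com"],
    "mail.example.com": ["dev.example.com"],   # shared child -> shown twice
    "dev.example.com": ["example.com"],        # back-edge -> cycle guard trips
}

def walk(node, visited):
    if node in visited:
        return [f"{node} [CYCLE]"]
    visited = visited | {node}   # copy-on-write, like visited.copy() above
    lines = [node]
    for child in graph.get(node, []):
        lines.extend(walk(child, visited))
    return lines

print("\n".join(walk("example.com", set())))
```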
def _generate_node_details(self) -> List[str]:
"""Generate detailed analysis of each node."""
details = []
# Sort nodes by depth, then alphabetically
sorted_nodes = sorted(self.data.nodes.items(),
key=lambda x: (x[1].depth, x[0]))
for hostname, node in sorted_nodes:
details.append(f"\n{hostname} (Depth {node.depth})")
details.append("-" * (len(hostname) + 20))
# Discovery provenance
details.append(f"First Seen: {node.first_seen}")
details.append(f"Last Updated: {node.last_updated}")
details.append(f"Discovery Methods: {', '.join(m.value for m in node.discovery_methods)}")
# Discovery paths
paths = self.data.get_discovery_path(hostname)
if paths:
details.append("Discovery Paths:")
for i, path in enumerate(paths[:3]): # Show max 3 paths
path_str = " -> ".join([f"{src}[{method.value}]{tgt}" for src, tgt, method in path])
details.append(f" Path {i+1}: {path_str}")
if len(paths) > 3:
details.append(f" ... and {len(paths) - 3} more paths")
# DNS status
if node.dns_exists is not None:
status = "EXISTS" if node.dns_exists else "NOT FOUND"
details.append(f"DNS Status: {status} (checked: {node.last_dns_check})")
# IP addresses
if node.resolved_ips:
details.append(f"Resolved IPs: {', '.join(sorted(node.resolved_ips))}")
# Reverse DNS
if node.reverse_dns:
details.append(f"Reverse DNS: {node.reverse_dns}")
# DNS records summary
total_records = len(node.get_all_dns_records())
if total_records > 0:
record_types = list(node.dns_records_by_type.keys())
details.append(f"DNS Records: {total_records} records ({', '.join(sorted(record_types))})")
# Certificates summary
current_certs = len(node.get_current_certificates())
expired_certs = len(node.get_expired_certificates())
if current_certs > 0 or expired_certs > 0:
details.append(f"Certificates: {current_certs} valid, {expired_certs} expired")
# External results
if node.shodan_results:
details.append(f"Shodan: {len(node.shodan_results)} results")
if node.virustotal_results:
vt_detections = sum(r.positives for r in node.virustotal_results)
details.append(f"VirusTotal: {len(node.virustotal_results)} scans, {vt_detections} total detections")
return details
def _generate_operations_timeline(self) -> List[str]:
"""Generate operations timeline."""
timeline = []
# Sort operations by timestamp
sorted_ops = []
for op_id in self.data.operation_timeline:
if op_id in self.data.operations:
sorted_ops.append(self.data.operations[op_id])
# Group operations by type for summary
op_summary = {}
for op in sorted_ops:
op_type = op.operation_type.value
if op_type not in op_summary:
op_summary[op_type] = {'total': 0, 'successful': 0, 'failed': 0}
op_summary[op_type]['total'] += 1
if op.success:
op_summary[op_type]['successful'] += 1
else:
op_summary[op_type]['failed'] += 1
# Operations summary
timeline.append("Operations Summary:")
for op_type, counts in sorted(op_summary.items()):
timeline.append(f" {op_type}: {counts['successful']}/{counts['total']} successful")
timeline.append("")
# Recent operations (last 20)
timeline.append("Recent Operations (last 20):")
recent_ops = sorted_ops[-20:] if len(sorted_ops) > 20 else sorted_ops
for op in recent_ops:
timestamp = op.timestamp.strftime("%H:%M:%S.%f")[:-3]
status = "" if op.success else ""
target_short = op.target[:30] + "..." if len(op.target) > 30 else op.target
timeline.append(f" {timestamp} {status} {op.operation_type.value:15} {target_short}")
# Show key results
if op.discovered_hostnames:
hostname_list = ", ".join(op.discovered_hostnames[:3])
if len(op.discovered_hostnames) > 3:
hostname_list += f" (+{len(op.discovered_hostnames) - 3} more)"
timeline.append(f" └─ Discovered: {hostname_list}")
if op.error_message:
timeline.append(f" └─ Error: {op.error_message[:50]}...")
return timeline
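For reference, one timeline line rendered exactly as the format string above produces it (sample values are invented):

```python
# Reproduce the " {timestamp} {status} {op_type:15} {target}" layout.
from datetime import datetime

timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
status = "✅"
op_type = "dns_a"
target = "example.com"
print(f" {timestamp} {status} {op_type:15} {target}")
# e.g. " 14:02:11.482 ✅ dns_a           example.com"
```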
def _analyze_security_findings(self) -> List[str]:
"""Analyze security-related findings."""
findings = []
# VirusTotal detections
high_risk_resources = []
medium_risk_resources = []
for node in self.data.nodes.values():
for vt_result in node.virustotal_results:
if vt_result.positives > 5:
high_risk_resources.append((node.hostname, vt_result))
elif vt_result.positives > 0:
medium_risk_resources.append((node.hostname, vt_result))
if high_risk_resources:
findings.append("🚨 HIGH RISK FINDINGS:")
for hostname, vt_result in high_risk_resources:
findings.append(f" {hostname}: {vt_result.positives}/{vt_result.total} detections")
findings.append(f" Report: {vt_result.permalink}")
if medium_risk_resources:
findings.append("⚠️ MEDIUM RISK FINDINGS:")
for hostname, vt_result in medium_risk_resources[:5]: # Show max 5
findings.append(f" {hostname}: {vt_result.positives}/{vt_result.total} detections")
if len(medium_risk_resources) > 5:
findings.append(f" ... and {len(medium_risk_resources) - 5} more resources with detections")
# Expired certificates still in use
nodes_with_expired_certs = []
for hostname, node in self.data.nodes.items():
expired = node.get_expired_certificates()
current = node.get_current_certificates()
if expired and not current: # Only expired certs, no valid ones
nodes_with_expired_certs.append((hostname, len(expired)))
if nodes_with_expired_certs:
findings.append("📜 CERTIFICATE ISSUES:")
for hostname, count in nodes_with_expired_certs:
findings.append(f" {hostname}: {count} expired certificates, no valid ones")
return findings
def _analyze_certificates(self) -> List[str]:
"""Analyze certificate findings."""
cert_analysis = []
# Certificate statistics
total_certs = 0
valid_certs = 0
expired_certs = 0
wildcard_certs = 0
cert_authorities = {}
for node in self.data.nodes.values():
for cert in node.certificates:
total_certs += 1
if cert.is_valid_now:
valid_certs += 1
else:
expired_certs += 1
if cert.is_wildcard:
wildcard_certs += 1
# Count certificate authorities
issuer_short = cert.issuer.split(',')[0] if ',' in cert.issuer else cert.issuer
cert_authorities[issuer_short] = cert_authorities.get(issuer_short, 0) + 1
if total_certs == 0:
cert_analysis.append("No certificates found.")
return cert_analysis
cert_analysis.append(f"Total Certificates: {total_certs}")
cert_analysis.append(f" Currently Valid: {valid_certs}")
cert_analysis.append(f" Expired: {expired_certs}")
cert_analysis.append(f" Wildcard Certificates: {wildcard_certs}")
cert_analysis.append("")
# Top certificate authorities
cert_analysis.append("Certificate Authorities:")
sorted_cas = sorted(cert_authorities.items(), key=lambda x: x[1], reverse=True)
for ca, count in sorted_cas[:5]:
cert_analysis.append(f" {ca}: {count} certificates")
cert_analysis.append("")
# Expiring soon (within 30 days)
from datetime import timedelta
soon = datetime.now() + timedelta(days=30)
expiring_soon = []
for hostname, node in self.data.nodes.items():
for cert in node.get_current_certificates():
if cert.not_after <= soon:
expiring_soon.append((hostname, cert.not_after, cert.id))
if expiring_soon:
cert_analysis.append("Certificates Expiring Soon (within 30 days):")
for hostname, expiry, cert_id in sorted(expiring_soon, key=lambda x: x[1]):
cert_analysis.append(f" {hostname}: expires {expiry.strftime('%Y-%m-%d')} (cert ID: {cert_id})")
return cert_analysis
def _analyze_dns_records(self) -> List[str]:
"""Analyze DNS record patterns."""
dns_analysis = []
# Record type distribution
record_type_counts = {}
total_records = 0
for node in self.data.nodes.values():
for record_type, records in node.dns_records_by_type.items():
record_type_counts[record_type] = record_type_counts.get(record_type, 0) + len(records)
total_records += len(records)
dns_analysis.append(f"Total DNS Records: {total_records}")
dns_analysis.append("Record Type Distribution:")
for record_type, count in sorted(record_type_counts.items()):
percentage = (count / total_records * 100) if total_records > 0 else 0
dns_analysis.append(f" {record_type}: {count} ({percentage:.1f}%)")
dns_analysis.append("")
# Interesting findings
interesting = []
# Multiple MX records
multi_mx_nodes = []
for hostname, node in self.data.nodes.items():
mx_records = node.dns_records_by_type.get('MX', [])
if len(mx_records) > 1:
multi_mx_nodes.append((hostname, len(mx_records)))
if multi_mx_nodes:
interesting.append("Multiple MX Records:")
for hostname, count in multi_mx_nodes:
interesting.append(f" {hostname}: {count} MX records")
# CAA records (security-relevant)
caa_nodes = []
for hostname, node in self.data.nodes.items():
if 'CAA' in node.dns_records_by_type:
caa_nodes.append(hostname)
if caa_nodes:
interesting.append(f"Domains with CAA Records: {len(caa_nodes)}")
for hostname in caa_nodes[:5]: # Show first 5
interesting.append(f" {hostname}")
if interesting:
dns_analysis.append("Interesting DNS Findings:")
dns_analysis.extend(interesting)
return dns_analysis
# Backward compatibility
ReportGenerator = ForensicReportGenerator

View File

@ -1,9 +1,10 @@
# File: src/shodan_client.py
"""Shodan API integration."""
"""Shodan API integration with forensic operation tracking."""
import requests
import time
import logging
import uuid
from typing import Optional, Dict, Any, List
from .data_structures import ShodanResult
from .config import Config
@ -12,7 +13,7 @@ from .config import Config
logger = logging.getLogger(__name__)
class ShodanClient:
"""Shodan API client."""
"""Shodan API client with forensic tracking."""
BASE_URL = "https://api.shodan.io"
@ -36,11 +37,15 @@ class ShodanClient:
self.last_request = time.time()
def lookup_ip(self, ip: str, operation_id: Optional[str] = None) -> Optional[ShodanResult]:
"""Lookup IP address information with forensic tracking."""
self._rate_limit()
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔍 Querying Shodan for IP: {ip} (operation: {operation_id})")
try:
url = f"{self.BASE_URL}/shodan/host/{ip}"
@ -71,12 +76,14 @@ class ShodanClient:
'banner': service.get('data', '').strip()[:200] if service.get('data') else ''
}
# Create ShodanResult with forensic metadata
result = ShodanResult(
ip=ip,
ports=sorted(list(set(ports))),
services=services,
organization=data.get('org'),
country=data.get('country_name'),
operation_id=operation_id # Forensic tracking
)
logger.info(f"✅ Shodan result for {ip}: {len(result.ports)} ports, org: {result.organization}")
@ -110,11 +117,15 @@ class ShodanClient:
logger.error(f"❌ Unexpected error querying Shodan for {ip}: {e}")
return None
def search_domain(self, domain: str, operation_id: Optional[str] = None) -> List[str]:
"""Search for IPs associated with a domain, with forensic tracking."""
self._rate_limit()
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔍 Searching Shodan for domain: {domain} (operation: {operation_id})")
try:
url = f"{self.BASE_URL}/shodan/host/search"

View File

@ -1,9 +1,10 @@
# File: src/virustotal_client.py
"""VirusTotal API integration."""
"""VirusTotal API integration with forensic operation tracking."""
import requests
import time
import logging
import uuid
from datetime import datetime
from typing import Optional
from .data_structures import VirusTotalResult
@ -13,7 +14,7 @@ from .config import Config
logger = logging.getLogger(__name__)
class VirusTotalClient:
"""VirusTotal API client."""
"""VirusTotal API client with forensic tracking."""
BASE_URL = "https://www.virustotal.com/vtapi/v2"
@ -37,11 +38,15 @@ class VirusTotalClient:
self.last_request = time.time()
def lookup_ip(self, ip: str, operation_id: Optional[str] = None) -> Optional[VirusTotalResult]:
"""Lookup IP address reputation with forensic tracking."""
self._rate_limit()
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔍 Querying VirusTotal for IP: {ip} (operation: {operation_id})")
try:
url = f"{self.BASE_URL}/ip-address/report"
@ -81,12 +86,14 @@ class VirusTotalClient:
except ValueError:
logger.debug(f"Could not parse scan_date: {data.get('scan_date')}")
# Create VirusTotalResult with forensic metadata
result = VirusTotalResult(
resource=ip,
positives=positives,
total=total,
scan_date=scan_date,
permalink=data.get('permalink', f'https://www.virustotal.com/gui/ip-address/{ip}'),
operation_id=operation_id # Forensic tracking
)
logger.info(f"✅ VirusTotal result for IP {ip}: {result.positives}/{result.total} detections")
@ -122,11 +129,15 @@ class VirusTotalClient:
logger.error(f"❌ Unexpected error querying VirusTotal for IP {ip}: {e}")
return None
def lookup_domain(self, domain: str, operation_id: Optional[str] = None) -> Optional[VirusTotalResult]:
"""Lookup domain reputation with forensic tracking."""
self._rate_limit()
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔍 Querying VirusTotal for domain: {domain} (operation: {operation_id})")
try:
url = f"{self.BASE_URL}/domain/report"
@ -172,12 +183,14 @@ class VirusTotalClient:
except ValueError:
logger.debug(f"Could not parse scan_date: {data.get('scan_date')}")
# Create VirusTotalResult with forensic metadata
result = VirusTotalResult(
resource=domain,
positives=positives,
total=max(total, 1), # Ensure total is at least 1
scan_date=scan_date,
permalink=data.get('permalink', f'https://www.virustotal.com/gui/domain/{domain}'),
operation_id=operation_id # Forensic tracking
)
logger.info(f"✅ VirusTotal result for domain {domain}: {result.positives}/{result.total} detections")

View File

@ -1,14 +1,14 @@
# File: src/web_app.py
"""Flask web application for reconnaissance tool."""
"""Flask web application for forensic reconnaissance tool."""
from flask import Flask, render_template, request, jsonify, send_from_directory
import threading
import time
import logging
from .config import Config
from .reconnaissance import ForensicReconnaissanceEngine
from .report_generator import ForensicReportGenerator
from .data_structures import ForensicReconData
# Set up logging for this module
logger = logging.getLogger(__name__)
@ -23,11 +23,11 @@ def create_app(config: Config):
template_folder='../templates',
static_folder='../static')
app.config['SECRET_KEY'] = 'forensic-recon-tool-secret-key'
# Set up logging for web app
config.setup_logging(cli_mode=False)
logger.info("🌐 Web application initialized")
logger.info("🌐 Forensic web application initialized")
@app.route('/')
def index():
@ -36,7 +36,7 @@ def create_app(config: Config):
@app.route('/api/scan', methods=['POST'])
def start_scan():
"""Start a new reconnaissance scan."""
"""Start a new forensic reconnaissance scan."""
try:
data = request.get_json()
target = data.get('target')
@ -52,33 +52,32 @@ def create_app(config: Config):
# Generate scan ID
scan_id = f"{target}_{int(time.time())}"
logger.info(f"🚀 Starting new scan: {scan_id} for target: {target}")
logger.info(f"🚀 Starting new forensic scan: {scan_id} for target: {target}")
# Create shared ForensicReconData object for live updates
shared_data = ForensicReconData()
shared_data.scan_config = {
'target': target,
'max_depth': scan_config.max_depth,
'shodan_enabled': scan_config.shodan_key is not None,
'virustotal_enabled': scan_config.virustotal_key is not None
}
# Initialize scan data with the shared forensic data object
with scan_lock:
active_scans[scan_id] = {
'status': 'starting',
'progress': 0,
'message': 'Initializing forensic scan...',
'data': shared_data, # Share the forensic data object from the start
'error': None,
'live_stats': shared_data.get_stats(), # Use forensic stats
'latest_discoveries': []
}
# Start forensic reconnaissance in background thread
thread = threading.Thread(
target=run_forensic_reconnaissance_background,
args=(scan_id, target, scan_config, shared_data)
)
thread.daemon = True
@ -92,14 +91,18 @@ def create_app(config: Config):
@app.route('/api/scan/<scan_id>/status')
def get_scan_status(scan_id):
"""Get scan status and progress with live discoveries."""
"""Get scan status and progress with live forensic discoveries."""
with scan_lock:
if scan_id not in active_scans:
return jsonify({'error': 'Scan not found'}), 404
scan_data = active_scans[scan_id].copy()
# Update live stats from forensic data if available
if scan_data.get('data') and hasattr(scan_data['data'], 'get_stats'):
scan_data['live_stats'] = scan_data['data'].get_stats()
# Don't include the full forensic data object in status (too large)
if 'data' in scan_data:
del scan_data['data']
@ -107,7 +110,7 @@ def create_app(config: Config):
@app.route('/api/scan/<scan_id>/report')
def get_scan_report(scan_id):
"""Get scan report."""
"""Get forensic scan report."""
with scan_lock:
if scan_id not in active_scans:
return jsonify({'error': 'Scan not found'}), 404
@ -118,8 +121,8 @@ def create_app(config: Config):
return jsonify({'error': 'Scan not completed'}), 400
try:
# Generate forensic report
report_gen = ForensicReportGenerator(scan_data['data'])
return jsonify({
'json_report': scan_data['data'].to_json(),
@ -129,48 +132,200 @@ def create_app(config: Config):
logger.error(f"❌ Error generating report for {scan_id}: {e}", exc_info=True)
return jsonify({'error': f'Failed to generate report: {str(e)}'}), 500
@app.route('/api/scan/<scan_id>/graph')
def get_scan_graph(scan_id):
"""Get graph data for visualization."""
with scan_lock:
if scan_id not in active_scans:
return jsonify({'error': 'Scan not found'}), 404
scan_data = active_scans[scan_id]
if scan_data['status'] != 'completed' or not scan_data['data']:
return jsonify({'error': 'Scan not completed'}), 400
try:
forensic_data = scan_data['data']
# Extract nodes for graph
nodes = []
for hostname, node in forensic_data.nodes.items():
# Determine node color based on depth
color_map = {
0: '#00ff41', # Green for root
1: '#ff9900', # Orange for depth 1
2: '#ff6b6b', # Red for depth 2
3: '#4ecdc4', # Teal for depth 3
4: '#45b7d1', # Blue for depth 4+
}
color = color_map.get(node.depth, '#666666')
# Calculate node size based on number of connections and data
connections = len(forensic_data.get_children(hostname)) + len(forensic_data.get_parents(hostname))
dns_records = len(node.get_all_dns_records())
certificates = len(node.certificates)
# Size based on importance (connections + data)
size = max(8, min(20, 8 + connections * 2 + dns_records // 3 + certificates))
nodes.append({
'id': hostname,
'label': hostname,
'depth': node.depth,
'color': color,
'size': size,
'dns_records': dns_records,
'certificates': certificates,
'ip_addresses': list(node.resolved_ips),
'discovery_methods': [method.value for method in node.discovery_methods],
'first_seen': node.first_seen.isoformat() if node.first_seen else None
})
# Extract edges for graph
edges = []
for edge in forensic_data.edges:
# Skip synthetic TLD expansion edges for cleaner visualization
if edge.source_hostname.startswith('tld_expansion:'):
continue
# Color edges by discovery method
method_colors = {
'initial_target': '#00ff41',
'tld_expansion': '#ff9900',
'dns_record_value': '#4ecdc4',
'certificate_subject': '#ff6b6b',
'dns_subdomain_extraction': '#45b7d1'
}
edges.append({
'source': edge.source_hostname,
'target': edge.target_hostname,
'method': edge.discovery_method.value,
'color': method_colors.get(edge.discovery_method.value, '#666666'),
'operation_id': edge.operation_id,
'timestamp': edge.timestamp.isoformat() if edge.timestamp else None
})
# Graph statistics
stats = {
'node_count': len(nodes),
'edge_count': len(edges),
'max_depth': max([node['depth'] for node in nodes]) if nodes else 0,
'discovery_methods': list(set([edge['method'] for edge in edges])),
'root_nodes': [node['id'] for node in nodes if node['depth'] == 0]
}
return jsonify({
'nodes': nodes,
'edges': edges,
'stats': stats
})
except Exception as e:
logger.error(f"⚠️ Error generating graph data for {scan_id}: {e}", exc_info=True)
return jsonify({'error': f'Failed to generate graph: {str(e)}'}), 500
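For frontend work it helps to see the shape this endpoint returns. An illustrative payload (all values invented, structure per the route above):

```python
# Example /api/scan/<scan_id>/graph response, as a Python literal.
graph_payload = {
    "nodes": [{
        "id": "example.com", "label": "example.com", "depth": 0,
        "color": "#00ff41", "size": 14,
        "dns_records": 9, "certificates": 2,
        "ip_addresses": ["93.184.216.34"],
        "discovery_methods": ["initial_target"],
        "first_seen": "2025-09-09T20:47:01",
    }],
    "edges": [{
        "source": "example.com", "target": "mail.example.com",
        "method": "dns_record_value", "color": "#4ecdc4",
        "operation_id": "a-uuid4-string",
        "timestamp": "2025-09-09T20:47:05",
    }],
    "stats": {"node_count": 1, "edge_count": 1, "max_depth": 0,
              "discovery_methods": ["dns_record_value"],
              "root_nodes": ["example.com"]},
}
```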
@app.route('/api/scan/<scan_id>/live-data')
def get_live_scan_data(scan_id):
"""Get current forensic reconnaissance data (for real-time updates)."""
with scan_lock:
if scan_id not in active_scans:
return jsonify({'error': 'Scan not found'}), 404
scan_data = active_scans[scan_id]
forensic_data = scan_data['data']
if not forensic_data:
return jsonify({
'hostnames': [],
'ip_addresses': [],
'stats': {
'hostnames': 0,
'ip_addresses': 0,
'discovery_edges': 0,
'operations_performed': 0,
'dns_records': 0,
'certificates_total': 0,
'certificates_current': 0,
'certificates_expired': 0,
'shodan_results': 0,
'virustotal_results': 0
},
'latest_discoveries': []
})
# Extract data from forensic structure for frontend
try:
hostnames = sorted(list(forensic_data.nodes.keys()))
ip_addresses = sorted(list(forensic_data.ip_addresses))
stats = forensic_data.get_stats()
# Generate activity log from recent operations
latest_discoveries = []
recent_operations = forensic_data.operation_timeline[-10:] # Last 10 operations
for op_id in recent_operations:
if op_id in forensic_data.operations:
operation = forensic_data.operations[op_id]
activity_entry = {
'timestamp': operation.timestamp.timestamp(),
'message': f"{operation.operation_type.value}: {operation.target}"
}
# Add result summary
if operation.discovered_hostnames:
activity_entry['message'] += f" → {len(operation.discovered_hostnames)} hostnames"
if operation.discovered_ips:
activity_entry['message'] += f" → {len(operation.discovered_ips)} IPs"
latest_discoveries.append(activity_entry)
# Update scan data with latest discoveries
scan_data['latest_discoveries'] = latest_discoveries
return jsonify({
'hostnames': hostnames,
'ip_addresses': ip_addresses,
'stats': stats,
'latest_discoveries': latest_discoveries
})
except Exception as e:
logger.error(f"❌ Error extracting live data for {scan_id}: {e}", exc_info=True)
# Return minimal data structure
return jsonify({
'hostnames': [],
'ip_addresses': [],
'stats': {
'hostnames': len(forensic_data.nodes) if forensic_data.nodes else 0,
'ip_addresses': len(forensic_data.ip_addresses) if forensic_data.ip_addresses else 0,
'discovery_edges': 0,
'operations_performed': 0,
'dns_records': 0,
'certificates_total': 0,
'certificates_current': 0,
'certificates_expired': 0,
'shodan_results': 0,
'virustotal_results': 0
},
'latest_discoveries': []
})
return app
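A small client-side sketch of how these endpoints are meant to be driven (the base URL assumes Flask's default dev server; adjust as needed):

```python
# Poll the status endpoint until the scan finishes, then fetch the report.
import time
import requests

BASE = "http://localhost:5000"  # assumption: default Flask dev server

def poll_scan(scan_id: str, interval: float = 2.0) -> dict:
    while True:
        status = requests.get(f"{BASE}/api/scan/{scan_id}/status").json()
        print(status.get("progress"), status.get("message"))
        if status.get("status") in ("completed", "error"):
            break
        time.sleep(interval)
    return requests.get(f"{BASE}/api/scan/{scan_id}/report").json()
```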
def run_forensic_reconnaissance_background(scan_id: str, target: str, config: Config, shared_data: ForensicReconData):
"""Run forensic reconnaissance in background thread with shared forensic data object."""
def update_progress(message: str, percentage: int = None):
"""Update scan progress and live statistics."""
"""Update scan progress and live forensic statistics."""
with scan_lock:
if scan_id in active_scans:
active_scans[scan_id]['message'] = message
if percentage is not None:
active_scans[scan_id]['progress'] = percentage
# Update live stats from the shared forensic data object
if shared_data:
active_scans[scan_id]['live_stats'] = shared_data.get_stats()
@ -178,10 +333,13 @@ def run_reconnaissance_background(scan_id: str, target: str, config: Config, sha
if 'latest_discoveries' not in active_scans[scan_id]:
active_scans[scan_id]['latest_discoveries'] = []
# Create activity entry
activity_entry = {
'timestamp': time.time(),
'message': message
}
active_scans[scan_id]['latest_discoveries'].append(activity_entry)
# Keep only last 10 discoveries
active_scans[scan_id]['latest_discoveries'] = \
@ -190,40 +348,44 @@ def run_reconnaissance_background(scan_id: str, target: str, config: Config, sha
logger.info(f"[{scan_id}] {message} ({percentage}%)" if percentage else f"[{scan_id}] {message}")
try:
logger.info(f"🔧 Initializing reconnaissance engine for scan: {scan_id}")
logger.info(f"🔧 Initializing forensic reconnaissance engine for scan: {scan_id}")
# Initialize forensic engine
engine = ForensicReconnaissanceEngine(config)
engine.set_progress_callback(update_progress)
# IMPORTANT: Pass the shared forensic data object to the engine
engine.set_shared_data(shared_data)
# Update status
with scan_lock:
active_scans[scan_id]['status'] = 'running'
logger.info(f"🚀 Starting reconnaissance for: {target}")
logger.info(f"🚀 Starting forensic reconnaissance for: {target}")
# Run forensic reconnaissance - this will populate the shared_data object incrementally
final_data = engine.run_reconnaissance(target)
logger.info(f"Reconnaissance completed for scan: {scan_id}")
logger.info(f"Forensic reconnaissance completed for scan: {scan_id}")
# Update with final results (the shared_data should already be populated)
with scan_lock:
active_scans[scan_id]['status'] = 'completed'
active_scans[scan_id]['progress'] = 100
active_scans[scan_id]['message'] = 'Forensic reconnaissance completed'
active_scans[scan_id]['data'] = final_data # This should be the same as shared_data
active_scans[scan_id]['live_stats'] = final_data.get_stats()
# Log final forensic statistics
final_stats = final_data.get_stats()
logger.info(f"📊 Final forensic stats for {scan_id}: {final_stats}")
# Log discovery graph analysis
graph_analysis = final_data._generate_graph_analysis()
logger.info(f"🌐 Discovery graph: {len(final_data.nodes)} nodes, {len(final_data.edges)} edges, max depth: {graph_analysis['max_depth']}")
except Exception as e:
logger.error(f"❌ Error in reconnaissance for {scan_id}: {e}", exc_info=True)
logger.error(f"❌ Error in forensic reconnaissance for {scan_id}: {e}", exc_info=True)
# Handle errors
with scan_lock:
active_scans[scan_id]['status'] = 'error'

View File

@ -1,4 +1,4 @@
// DNS Reconnaissance Tool - Enhanced Frontend JavaScript with Forensic Data Support
class ReconTool {
constructor() {
@ -6,7 +6,8 @@ class ReconTool {
this.pollInterval = null;
this.liveDataInterval = null;
this.currentReport = null;
this.debugMode = true;
this.graphVisualization = null; // graph visualization instance
this.init();
}
@ -23,6 +24,13 @@ class ReconTool {
init() {
this.bindEvents();
this.setupRealtimeElements();
// Handle window resize for graph
window.addEventListener('resize', () => {
if (this.graphVisualization) {
this.graphVisualization.handleResize();
}
});
}
setupRealtimeElements() {
@ -43,6 +51,14 @@ class ReconTool {
<span class="stat-label">IP Addresses:</span>
<span id="liveIPs" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">Discovery Edges:</span>
<span id="liveEdges" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">Operations:</span>
<span id="liveOperations" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">DNS Records:</span>
<span id="liveDNS" class="stat-value">0</span>
@ -117,6 +133,27 @@ class ReconTool {
this.startScan();
}
});
document.getElementById('showGraphView').addEventListener('click', () => {
this.showGraphView();
});
}
showGraphView() {
if (this.graphVisualization && this.currentScanId) {
document.getElementById('graphSection').style.display = 'block';
// Update button states
document.getElementById('showGraphView').classList.add('active');
document.getElementById('showJson').classList.remove('active');
document.getElementById('showText').classList.remove('active');
// Scroll to graph section
document.getElementById('graphSection').scrollIntoView({
behavior: 'smooth'
});
} else {
alert('Graph data not available yet. Please wait for scan completion.');
}
}
async startScan() {
@ -137,7 +174,7 @@ class ReconTool {
try {
// Show progress section
this.showProgressSection();
this.updateProgress(0, 'Starting scan...');
this.updateProgress(0, 'Starting forensic scan...');
this.debug('Starting scan with data:', scanData);
@ -223,7 +260,7 @@ class ReconTool {
// Update progress
this.updateProgress(status.progress, status.message);
// Update live stats
// Update live stats (handle new forensic format)
if (status.live_stats) {
this.debug('Received live stats:', status.live_stats);
this.updateLiveStats(status.live_stats);
@ -270,7 +307,7 @@ class ReconTool {
this.debug('Received live data:', data);
// Update live discoveries
// Update live discoveries (handle new forensic format)
this.updateLiveDiscoveries(data);
} catch (error) {
@ -282,17 +319,19 @@ class ReconTool {
updateLiveStats(stats) {
this.debug('Updating live stats:', stats);
// Update the live statistics counters
const statElements = {
'liveHostnames': stats.hostnames || 0,
'liveIPs': stats.ip_addresses || 0,
// Handle both old and new stat formats for compatibility
const statMappings = {
'liveHostnames': stats.hostnames || 0,
'liveIPs': stats.ip_addresses || stats.ips || 0,
'liveEdges': stats.discovery_edges || 0, // New forensic field
'liveOperations': stats.operations_performed || 0, // New forensic field
'liveDNS': stats.dns_records || 0,
'liveCerts': stats.certificates || 0,
'liveCerts': stats.certificates_total || stats.certificates || 0,
'liveShodan': stats.shodan_results || 0,
'liveVT': stats.virustotal_results || 0
};
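// Illustrative only: the two stat payload shapes this mapping absorbs might
// look like the sketches below (field names taken from the lookups above;
// the exact server output is not part of this diff):
//   legacy:   { hostnames: 12, ip_addresses: 4, dns_records: 31, certificates: 7 }
//   forensic: { hostnames: 12, ips: 4, discovery_edges: 18, operations_performed: 42,
//               dns_records: 31, certificates_total: 9, shodan_results: 3, virustotal_results: 2 }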
Object.entries(statElements).forEach(([elementId, value]) => {
Object.entries(statMappings).forEach(([elementId, value]) => {
const element = document.getElementById(elementId);
if (element) {
const currentValue = element.textContent;
@ -315,43 +354,92 @@ class ReconTool {
updateLiveDiscoveries(data) {
this.debug('Updating live discoveries with data:', data);
// Handle new forensic data format
let hostnames = [];
let ipAddresses = [];
let activities = [];
// Extract data from forensic format or fallback to old format
if (data.hostnames && Array.isArray(data.hostnames)) {
hostnames = data.hostnames;
} else if (data.stats && data.stats.hostnames) {
// If we only have stats, create a placeholder list
hostnames = [`${data.stats.hostnames} discovered`];
}
if (data.ip_addresses && Array.isArray(data.ip_addresses)) {
ipAddresses = data.ip_addresses;
} else if (data.stats && data.stats.ip_addresses) {
// If we only have stats, create a placeholder list
ipAddresses = [`${data.stats.ip_addresses} discovered`];
}
// Handle activity log from forensic format
if (data.latest_discoveries && Array.isArray(data.latest_discoveries)) {
activities = data.latest_discoveries;
}
// Update hostnames list
const hostnameList = document.querySelector('#recentHostnames .hostname-list');
if (hostnameList && data.hostnames && data.hostnames.length > 0) {
if (hostnameList && hostnames.length > 0) {
// Show last 10 hostnames
const recentHostnames = data.hostnames;
const recentHostnames = hostnames.slice(-10);
hostnameList.innerHTML = recentHostnames.map(hostname =>
`<span class="discovery-item">${hostname}</span>`
).join('');
this.debug(`Updated hostname list with ${recentHostnames.length} items`);
} else if (hostnameList) {
this.debug(`No hostnames to display (${data.hostnames ? data.hostnames.length : 0} total)`);
this.debug(`No hostnames to display (${hostnames.length} total)`);
}
// Update IP addresses list
const ipList = document.querySelector('#recentIPs .ip-list');
if (ipList && data.ip_addresses && data.ip_addresses.length > 0) {
if (ipList && ipAddresses.length > 0) {
// Show last 10 IPs
const recentIPs = data.ip_addresses;
const recentIPs = ipAddresses.slice(-10);
ipList.innerHTML = recentIPs.map(ip =>
`<span class="discovery-item">${ip}</span>`
).join('');
this.debug(`Updated IP list with ${recentIPs.length} items`);
} else if (ipList) {
this.debug(`No IPs to display (${data.ip_addresses ? data.ip_addresses.length : 0} total)`);
this.debug(`No IPs to display (${ipAddresses.length} total)`);
}
// Update activity log
const activityList = document.querySelector('#activityLog .activity-list');
if (activityList && data.latest_discoveries && data.latest_discoveries.length > 0) {
const activities = data.latest_discoveries.slice(-5); // Last 5 activities
activityList.innerHTML = activities.map(activity => {
if (activityList) {
if (activities.length > 0) {
const recentActivities = activities.slice(-5); // Last 5 activities
activityList.innerHTML = recentActivities.map(activity => {
const time = new Date(activity.timestamp * 1000).toLocaleTimeString();
return `<div class="activity-item">[${time}] ${activity.message}</div>`;
}).join('');
this.debug(`Updated activity log with ${activities.length} items`);
} else if (activityList) {
this.debug(`No activities to display (${data.latest_discoveries ? data.latest_discoveries.length : 0} total)`);
this.debug(`Updated activity log with ${recentActivities.length} items`);
} else {
// Fallback: show generic activity based on stats
const stats = data.stats || {};
const genericActivities = [];
if (stats.operations_performed > 0) {
genericActivities.push(`${stats.operations_performed} operations performed`);
}
if (stats.hostnames > 0) {
genericActivities.push(`${stats.hostnames} hostnames discovered`);
}
if (stats.dns_records > 0) {
genericActivities.push(`${stats.dns_records} DNS records collected`);
}
if (genericActivities.length > 0) {
const now = new Date().toLocaleTimeString();
activityList.innerHTML = genericActivities.map(activity =>
`<div class="activity-item">[${now}] ${activity}</div>`
).join('');
this.debug(`Updated activity log with ${genericActivities.length} generic items`);
} else {
this.debug('No activities to display');
}
}
}
}
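// For reference, a live-data payload this method tolerates might look like the
// following (shape inferred from the property checks above, not a documented contract):
//   { hostnames: ['a.example.com'], ip_addresses: ['192.0.2.1'],
//     latest_discoveries: [{ timestamp: 1694288000, message: 'DNS enumeration complete' }],
//     stats: { hostnames: 1, ip_addresses: 1, operations_performed: 5, dns_records: 3 } }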
@ -375,8 +463,14 @@ class ReconTool {
this.showResultsSection();
this.showReport('text'); // Default to text view
// Load and show graph
if (!this.graphVisualization) {
this.graphVisualization = new GraphVisualization();
}
await this.graphVisualization.loadAndShowGraph(this.currentScanId);
} catch (error) {
console.error('❌ Error loading report:', error);
console.error('⚠️ Error loading report:', error);
this.showError(`Error loading report: ${error.message}`);
}
}
@ -410,7 +504,7 @@ class ReconTool {
if (liveSection) {
const title = liveSection.querySelector('h3');
if (title) {
title.textContent = '📊 Final Discovery Summary';
title.textContent = '📊 Final Forensic Summary';
}
liveSection.style.display = 'block';
}
@ -424,7 +518,7 @@ class ReconTool {
if (progressMessage) progressMessage.style.display = 'none';
if (scanControls) scanControls.style.display = 'none';
this.debug('Showing results section with live discoveries');
this.debug('Showing results section with forensic discoveries');
}
resetToForm() {
@ -527,11 +621,11 @@ class ReconTool {
content = typeof this.currentReport.json_report === 'string'
? this.currentReport.json_report
: JSON.stringify(this.currentReport.json_report, null, 2);
filename = `recon-report-${this.currentScanId}.json`;
filename = `forensic-recon-report-${this.currentScanId}.json`;
mimeType = 'application/json';
} else {
content = this.currentReport.text_report;
filename = `recon-report-${this.currentScanId}.txt`;
filename = `forensic-recon-report-${this.currentScanId}.txt`;
mimeType = 'text/plain';
}
@ -548,8 +642,375 @@ class ReconTool {
}
}
class GraphVisualization {
constructor() {
this.svg = null;
this.simulation = null;
this.graphData = null;
this.showLabels = false;
this.selectedNode = null;
this.zoom = null;
this.container = null;
}
async loadAndShowGraph(scanId) {
try {
console.log('🕸️ Loading graph data...');
const response = await fetch(`/api/scan/${scanId}/graph`);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const graphData = await response.json();
if (graphData.error) {
throw new Error(graphData.error);
}
this.graphData = graphData;
this.showGraphSection();
this.initializeGraph();
this.updateGraphStats(graphData.stats);
console.log('✅ Graph loaded successfully', graphData.stats);
} catch (error) {
console.error('⚠️ Error loading graph:', error);
alert(`Failed to load graph: ${error.message}`);
}
}
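// The graph endpoint's response, as consumed by this class (shape inferred
// from the field accesses below, not from server documentation):
//   {
//     nodes: [{ id, label, size, color, depth, dns_records, certificates,
//               ip_addresses: [...], discovery_methods: [...], first_seen }],
//     edges: [{ source, target, color, method }],
//     stats: { node_count, edge_count, max_depth }
//   }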
showGraphSection() {
document.getElementById('graphSection').style.display = 'block';
this.bindGraphEvents();
}
bindGraphEvents() {
document.getElementById('showGraph').addEventListener('click', () => {
this.showGraph();
});
document.getElementById('hideGraph').addEventListener('click', () => {
this.hideGraph();
});
document.getElementById('resetZoom').addEventListener('click', () => {
this.resetZoom();
});
document.getElementById('toggleLabels').addEventListener('click', () => {
this.toggleLabels();
});
}
initializeGraph() {
if (!this.graphData) return;
// Clear existing graph
d3.select('#discoveryGraph').selectAll('*').remove();
// Set up SVG
const container = d3.select('#discoveryGraph');
const containerNode = container.node();
const width = containerNode.clientWidth;
const height = containerNode.clientHeight;
this.svg = container
.attr('width', width)
.attr('height', height);
// Set up zoom behavior
this.zoom = d3.zoom()
.scaleExtent([0.1, 3])
.on('zoom', (event) => {
this.container.attr('transform', event.transform);
});
this.svg.call(this.zoom);
// Create container for graph elements
this.container = this.svg.append('g');
// Set up force simulation
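// Notes on the forces below (parameters as written in this diff): 'link'
// pulls connected nodes toward ~80px spacing, 'charge' repels nodes (ignored
// beyond 400px so distant clusters settle independently), 'center' anchors
// the layout mid-viewport, and 'collision' pads each node radius by 5px so
// circles do not overlap.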
this.simulation = d3.forceSimulation(this.graphData.nodes)
.force('link', d3.forceLink(this.graphData.edges)
.id(d => d.id)
.distance(80)
.strength(0.5))
.force('charge', d3.forceManyBody()
.strength(-300)
.distanceMax(400))
.force('center', d3.forceCenter(width / 2, height / 2))
.force('collision', d3.forceCollide()
.radius(d => d.size + 5));
this.drawGraph();
this.startSimulation();
}
drawGraph() {
// Draw links
const links = this.container.append('g')
.selectAll('line')
.data(this.graphData.edges)
.enter().append('line')
.attr('class', 'graph-link')
.attr('stroke', d => d.color)
.on('mouseover', (event, d) => {
this.showTooltip(event, `Discovery: ${d.method}<br>From: ${d.source.id || d.source}<br>To: ${d.target.id || d.target}`); // source/target become node objects once forceLink resolves ids
})
.on('mouseout', () => {
this.hideTooltip();
});
// Draw nodes
const nodes = this.container.append('g')
.selectAll('circle')
.data(this.graphData.nodes)
.enter().append('circle')
.attr('class', 'graph-node')
.attr('r', d => d.size)
.attr('fill', d => d.color)
.style('opacity', 0.8)
.on('mouseover', (event, d) => {
this.showNodeTooltip(event, d);
this.highlightConnections(d);
})
.on('mouseout', (event, d) => {
this.hideTooltip();
this.unhighlightConnections();
})
.on('click', (event, d) => {
this.selectNode(d);
})
.call(d3.drag()
.on('start', (event, d) => {
if (!event.active) this.simulation.alphaTarget(0.3).restart();
d.fx = d.x;
d.fy = d.y;
})
.on('drag', (event, d) => {
d.fx = event.x;
d.fy = event.y;
})
.on('end', (event, d) => {
if (!event.active) this.simulation.alphaTarget(0);
d.fx = null;
d.fy = null;
}));
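// fx/fy pin a node to the pointer during the drag; clearing them on 'end'
// releases the node back to the simulation instead of leaving it pinned.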
// Draw labels (initially hidden)
const labels = this.container.append('g')
.selectAll('text')
.data(this.graphData.nodes)
.enter().append('text')
.attr('class', 'graph-label')
.attr('dy', '.35em')
.style('opacity', this.showLabels ? 1 : 0)
.text(d => d.label);
// Store references
this.links = links;
this.nodes = nodes;
this.labels = labels;
}
startSimulation() {
this.simulation.on('tick', () => {
this.links
.attr('x1', d => d.source.x)
.attr('y1', d => d.source.y)
.attr('x2', d => d.target.x)
.attr('y2', d => d.target.y);
this.nodes
.attr('cx', d => d.x)
.attr('cy', d => d.y);
this.labels
.attr('x', d => d.x)
.attr('y', d => d.y + d.size + 12);
});
}
showNodeTooltip(event, node) {
const tooltip = `
<strong>${node.label}</strong><br>
Depth: ${node.depth}<br>
DNS Records: ${node.dns_records}<br>
Certificates: ${node.certificates}<br>
IPs: ${node.ip_addresses.length}<br>
Discovery: ${node.discovery_methods.join(', ')}<br>
First Seen: ${node.first_seen ? new Date(node.first_seen).toLocaleString() : 'Unknown'}
`;
this.showTooltip(event, tooltip);
}
showTooltip(event, content) {
const tooltip = document.getElementById('graphTooltip');
tooltip.innerHTML = content;
tooltip.className = 'graph-tooltip visible';
const rect = tooltip.getBoundingClientRect();
const containerRect = document.getElementById('discoveryGraph').getBoundingClientRect();
tooltip.style.left = `${event.clientX - containerRect.left + 10}px`;
tooltip.style.top = `${event.clientY - containerRect.top - 10}px`;
// Adjust position if tooltip goes off screen
if (event.clientX + rect.width > window.innerWidth) {
tooltip.style.left = `${event.clientX - containerRect.left - rect.width - 10}px`;
}
if (event.clientY - rect.height < 0) {
tooltip.style.top = `${event.clientY - containerRect.top + 20}px`;
}
}
hideTooltip() {
const tooltip = document.getElementById('graphTooltip');
tooltip.className = 'graph-tooltip';
}
highlightConnections(node) {
// Highlight connected links
this.links
.style('opacity', d => (d.source.id === node.id || d.target.id === node.id) ? 1 : 0.2)
.classed('highlighted', d => d.source.id === node.id || d.target.id === node.id);
// Highlight connected nodes
this.nodes
.style('opacity', d => {
if (d.id === node.id) return 1;
const connected = this.graphData.edges.some(edge =>
(edge.source.id === node.id && edge.target.id === d.id) ||
(edge.target.id === node.id && edge.source.id === d.id)
);
return connected ? 0.8 : 0.3;
});
}
unhighlightConnections() {
this.links
.style('opacity', 0.6)
.classed('highlighted', false);
this.nodes
.style('opacity', 0.8);
}
selectNode(node) {
// Update selected node styling
this.nodes
.classed('selected', d => d.id === node.id);
// Show node details
this.showNodeDetails(node);
this.selectedNode = node;
}
showNodeDetails(node) {
const detailsContainer = document.getElementById('nodeDetails');
const selectedInfo = document.getElementById('selectedNodeInfo');
const details = `
<div class="detail-item">
<span class="detail-label">Hostname:</span>
<span class="detail-value">${node.label}</span>
</div>
<div class="detail-item">
<span class="detail-label">Discovery Depth:</span>
<span class="detail-value">${node.depth}</span>
</div>
<div class="detail-item">
<span class="detail-label">DNS Records:</span>
<span class="detail-value">${node.dns_records}</span>
</div>
<div class="detail-item">
<span class="detail-label">Certificates:</span>
<span class="detail-value">${node.certificates}</span>
</div>
<div class="detail-item">
<span class="detail-label">IP Addresses:</span>
<span class="detail-value">${node.ip_addresses.join(', ') || 'None'}</span>
</div>
<div class="detail-item">
<span class="detail-label">Discovery Methods:</span>
<span class="detail-value">${node.discovery_methods.join(', ')}</span>
</div>
<div class="detail-item">
<span class="detail-label">First Seen:</span>
<span class="detail-value">${node.first_seen ? new Date(node.first_seen).toLocaleString() : 'Unknown'}</span>
</div>
`;
detailsContainer.innerHTML = details;
selectedInfo.style.display = 'block';
}
toggleLabels() {
this.showLabels = !this.showLabels;
if (this.labels) {
this.labels.transition()
.duration(300)
.style('opacity', this.showLabels ? 1 : 0);
}
const button = document.getElementById('toggleLabels');
button.textContent = this.showLabels ? 'Hide Labels' : 'Show Labels';
}
resetZoom() {
if (this.svg && this.zoom) {
this.svg.transition()
.duration(750)
.call(this.zoom.transform, d3.zoomIdentity);
}
}
showGraph() {
if (this.container) {
this.container.style('display', 'block');
}
document.getElementById('showGraph').classList.add('active');
document.getElementById('hideGraph').classList.remove('active');
}
hideGraph() {
if (this.container) {
this.container.style('display', 'none');
}
document.getElementById('hideGraph').classList.add('active');
document.getElementById('showGraph').classList.remove('active');
}
updateGraphStats(stats) {
document.getElementById('graphNodes').textContent = stats.node_count || 0;
document.getElementById('graphEdges').textContent = stats.edge_count || 0;
document.getElementById('graphDepth').textContent = stats.max_depth || 0;
}
// Handle window resize
handleResize() {
if (this.svg && this.graphData) {
const containerNode = document.getElementById('discoveryGraph');
const width = containerNode.clientWidth;
const height = containerNode.clientHeight;
this.svg
.attr('width', width)
.attr('height', height);
this.simulation
.force('center', d3.forceCenter(width / 2, height / 2))
.restart();
}
}
}
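// A minimal console sketch for inspecting a finished scan's graph without the
// UI (assumes a valid scanId from a completed scan; the endpoint path is the
// one used by loadAndShowGraph above):
//
//   fetch(`/api/scan/${scanId}/graph`)
//     .then(r => r.json())
//     .then(g => console.table(g.nodes.map(n =>
//       ({ id: n.id, depth: n.depth, ips: n.ip_addresses.length }))));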
// Initialize the application when DOM is loaded
document.addEventListener('DOMContentLoaded', () => {
console.log('🌐 DNS Reconnaissance Tool initialized with debug mode');
console.log('🌐 Forensic DNS Reconnaissance Tool initialized with debug mode');
new ReconTool();
});

View File

@ -437,3 +437,252 @@ header p {
gap: 10px;
}
}
/* Graph Section */
.graph-section {
background: #2a2a2a;
border-radius: 4px;
border: 1px solid #444;
box-shadow: inset 0 0 15px rgba(0,0,0,0.5);
padding: 30px;
margin-bottom: 25px;
}
.graph-controls {
margin-bottom: 20px;
text-align: center;
}
.graph-controls button {
margin: 0 5px;
}
.graph-legend {
display: flex;
justify-content: space-between;
margin-bottom: 20px;
padding: 15px;
background: rgba(0,0,0,0.3);
border-radius: 4px;
border: 1px solid #444;
flex-wrap: wrap;
}
.graph-legend h4, .graph-legend h5 {
color: #e0e0e0;
margin-bottom: 10px;
text-transform: uppercase;
letter-spacing: 1px;
}
.legend-items, .legend-methods {
display: flex;
flex-direction: column;
gap: 8px;
}
.legend-item {
display: flex;
align-items: center;
gap: 10px;
font-size: 0.9rem;
color: #c7c7c7;
}
.legend-color {
width: 16px;
height: 16px;
border-radius: 50%;
border: 1px solid #666;
}
.method-item {
font-size: 0.9rem;
color: #a0a0a0;
margin-bottom: 4px;
}
.graph-container {
position: relative;
width: 100%;
height: 600px;
background: #0a0a0a;
border-radius: 4px;
border: 1px solid #333;
margin-bottom: 20px;
overflow: hidden;
}
#discoveryGraph {
width: 100%;
height: 100%;
cursor: grab;
}
#discoveryGraph:active {
cursor: grabbing;
}
.graph-tooltip {
position: absolute;
background: rgba(0, 0, 0, 0.9);
color: #00ff41;
padding: 10px;
border-radius: 4px;
border: 1px solid #00ff41;
font-family: 'Courier New', monospace;
font-size: 0.8rem;
pointer-events: none;
opacity: 0;
transition: opacity 0.2s ease;
z-index: 1000;
max-width: 300px;
line-height: 1.4;
}
.graph-tooltip.visible {
opacity: 1;
}
.graph-info {
display: flex;
justify-content: space-between;
align-items: flex-start;
flex-wrap: wrap;
gap: 20px;
}
.graph-stats {
display: flex;
gap: 20px;
flex-wrap: wrap;
}
.selected-node-info {
background: rgba(0, 40, 0, 0.8);
border: 1px solid #00ff41;
border-radius: 4px;
padding: 15px;
min-width: 250px;
}
.selected-node-info h4 {
color: #00ff41;
margin-bottom: 10px;
text-transform: uppercase;
letter-spacing: 1px;
}
#nodeDetails {
font-family: 'Courier New', monospace;
font-size: 0.8rem;
line-height: 1.4;
color: #c7c7c7;
}
#nodeDetails .detail-item {
margin-bottom: 8px;
border-bottom: 1px solid #333;
padding-bottom: 4px;
}
#nodeDetails .detail-item:last-child {
border-bottom: none;
}
#nodeDetails .detail-label {
color: #00ff41;
font-weight: bold;
}
#nodeDetails .detail-value {
color: #c7c7c7;
margin-left: 10px;
}
/* Graph Nodes and Links Styling (applied via D3) */
.graph-node {
stroke: #333;
stroke-width: 2px;
cursor: pointer;
transition: all 0.3s ease;
}
.graph-node:hover {
stroke: #fff;
stroke-width: 3px;
}
.graph-node.selected {
stroke: #ff9900;
stroke-width: 4px;
}
.graph-link {
stroke-opacity: 0.6;
stroke-width: 2px;
transition: all 0.3s ease;
}
.graph-link:hover {
stroke-opacity: 1;
stroke-width: 3px;
}
.graph-link.highlighted {
stroke-opacity: 1;
stroke-width: 3px;
}
.graph-label {
font-family: 'Courier New', monospace;
font-size: 10px;
fill: #c7c7c7;
text-anchor: middle;
pointer-events: none;
opacity: 0.8;
}
.graph-label.visible {
opacity: 1;
}
/* Responsive adjustments for graph */
@media (max-width: 768px) {
.graph-container {
height: 400px;
}
.graph-legend {
flex-direction: column;
gap: 15px;
}
.legend-items, .legend-methods {
flex-direction: row;
flex-wrap: wrap;
gap: 15px;
}
.graph-info {
flex-direction: column;
align-items: stretch;
}
.graph-stats {
justify-content: center;
}
.selected-node-info {
min-width: auto;
width: 100%;
}
.graph-tooltip {
font-size: 0.7rem;
max-width: 250px;
}
}

View File

@ -65,6 +65,7 @@
<div class="results-controls">
<button id="showJson" class="btn-secondary">Show JSON</button>
<button id="showText" class="btn-secondary active">Show Text Report</button>
<button id="showGraphView" class="btn-secondary">Show Graph</button>
<button id="downloadJson" class="btn-secondary">Download JSON</button>
<button id="downloadText" class="btn-secondary">Download Text</button>
</div>
@ -73,8 +74,46 @@
<pre id="reportContent"></pre>
</div>
</div>
<div class="graph-section" id="graphSection" style="display: none;">
<h2>Discovery Graph</h2>
<div class="graph-stats">
<div class="stat-item">
<span class="stat-label">Nodes:</span>
<span id="graphNodes" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">Edges:</span>
<span id="graphEdges" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">Max Depth:</span>
<span id="graphDepth" class="stat-value">0</span>
</div>
</div>
<div class="graph-controls">
<button id="showGraph" class="btn-secondary active">Show Graph</button>
<button id="hideGraph" class="btn-secondary">Hide Graph</button>
<button id="resetZoom" class="btn-secondary">Reset Zoom</button>
<button id="toggleLabels" class="btn-secondary">Show Labels</button>
</div>
<div class="graph-display-area">
<svg id="discoveryGraph"></svg>
<div id="graphTooltip" class="graph-tooltip"></div>
</div>
<div class="selected-node-details" id="selectedNodeInfo" style="display: none;">
<h3>Selected Node Details</h3>
<div id="nodeDetails"></div>
</div>
</div>
</div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/d3/7.8.5/d3.min.js"></script>
<script src="{{ url_for('static', filename='script.js') }}"></script>
</body>
</html>