Compare commits


No commits in common. "29e36e34be0be503819a4265490f418e2f3de72d" and "cd80d6f569e7bc2068cbc8ede3cf88d077d8789e" have entirely different histories.

12 changed files with 698 additions and 2627 deletions


@@ -1,12 +1,11 @@
# File: src/certificate_checker.py
"""Certificate transparency log checker using crt.sh with forensic operation tracking."""
"""Certificate transparency log checker using crt.sh."""
import requests
import json
import time
import logging
import socket
import uuid
from datetime import datetime
from typing import List, Optional, Set
from .data_structures import Certificate
@@ -16,7 +15,7 @@ from .config import Config
logger = logging.getLogger(__name__)
class CertificateChecker:
"""Check certificates using crt.sh with simple query caching and forensic tracking."""
"""Check certificates using crt.sh."""
CRT_SH_URL = "https://crt.sh/"
@@ -25,26 +24,27 @@ class CertificateChecker:
self.last_request = 0
self.query_count = 0
self.connection_failures = 0
self.max_connection_failures = 3
self.max_connection_failures = 3 # Stop trying after 3 consecutive failures
# Simple HTTP request cache to avoid duplicate queries
self._http_cache = {} # query_string -> List[Certificate]
logger.info("🔐 Certificate checker initialized")
logger.info("🔐 Certificate checker initialized with HTTP request caching")
# Test connectivity to crt.sh on initialization
self._test_connectivity()
def _test_connectivity(self):
"""Test if we can reach crt.sh."""
try:
logger.info("Testing connectivity to crt.sh...")
logger.info("🔗 Testing connectivity to crt.sh...")
# First test DNS resolution
try:
socket.gethostbyname('crt.sh')
logger.debug("DNS resolution for crt.sh successful")
logger.debug("DNS resolution for crt.sh successful")
except socket.gaierror as e:
logger.warning(f"DNS resolution failed for crt.sh: {e}")
logger.warning(f"⚠️ DNS resolution failed for crt.sh: {e}")
return False
# Test HTTP connection with a simple request
response = requests.get(
self.CRT_SH_URL,
params={'q': 'example.com', 'output': 'json'},
@@ -52,21 +52,21 @@ class CertificateChecker:
headers={'User-Agent': 'DNS-Recon-Tool/1.0'}
)
if response.status_code in [200, 404]:
logger.info("crt.sh connectivity test successful")
if response.status_code in [200, 404]: # 404 is also acceptable (no results)
logger.info("crt.sh connectivity test successful")
return True
else:
logger.warning(f"crt.sh returned status {response.status_code}")
logger.warning(f"⚠️ crt.sh returned status {response.status_code}")
return False
except requests.exceptions.ConnectionError as e:
logger.warning(f"Cannot reach crt.sh: {e}")
logger.warning(f"⚠️ Cannot reach crt.sh: {e}")
return False
except requests.exceptions.Timeout:
logger.warning("crt.sh connectivity test timed out")
logger.warning("⚠️ crt.sh connectivity test timed out")
return False
except Exception as e:
logger.warning(f"Unexpected error testing crt.sh connectivity: {e}")
logger.warning(f"⚠️ Unexpected error testing crt.sh connectivity: {e}")
return False
def _rate_limit(self):
@@ -77,32 +77,30 @@ class CertificateChecker:
if time_since_last < min_interval:
sleep_time = min_interval - time_since_last
logger.debug(f"crt.sh rate limiting: sleeping for {sleep_time:.2f}s")
logger.debug(f"⏸️ crt.sh rate limiting: sleeping for {sleep_time:.2f}s")
time.sleep(sleep_time)
self.last_request = time.time()
self.query_count += 1
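# A minimal standalone sketch of the interval-based throttle that
# _rate_limit() above implements, assuming a requests-per-second budget;
# the class and names are illustrative, not part of the repository:
import time

class IntervalThrottle:
    """Sleep just long enough to stay under a requests-per-second budget."""

    def __init__(self, rate_per_second: float):
        self.min_interval = 1.0 / rate_per_second
        self.last_request = 0.0

    def wait(self) -> None:
        # If the previous call was too recent, pause for the remainder
        # of the minimum interval before allowing the next request.
        elapsed = time.time() - self.last_request
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_request = time.time()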
def get_certificates(self, domain: str, operation_id: Optional[str] = None) -> List[Certificate]:
"""Get certificates for a domain with forensic tracking."""
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔐 Getting certificates for domain: {domain} (operation: {operation_id})")
def get_certificates(self, domain: str) -> List[Certificate]:
"""Get certificates for a domain from crt.sh."""
logger.debug(f"🔍 Getting certificates for domain: {domain}")
# Skip if we've had too many connection failures
if self.connection_failures >= self.max_connection_failures:
logger.warning(f"Skipping certificate lookup for {domain} due to repeated connection failures")
logger.warning(f"⚠️ Skipping certificate lookup for {domain} due to repeated connection failures")
return []
certificates = []
# Query for the domain itself
domain_certs = self._query_crt_sh(domain, operation_id)
# Query for the domain
domain_certs = self._query_crt_sh(domain)
certificates.extend(domain_certs)
# Query for wildcard certificates
wildcard_certs = self._query_crt_sh(f"%.{domain}", operation_id)
# Also query for wildcard certificates (if the main query succeeded)
if domain_certs or self.connection_failures < self.max_connection_failures:
wildcard_certs = self._query_crt_sh(f"%.{domain}")
certificates.extend(wildcard_certs)
# Remove duplicates based on certificate ID
@@ -110,61 +108,21 @@ class CertificateChecker:
final_certs = list(unique_certs.values())
if final_certs:
logger.info(f" Found {len(final_certs)} unique certificates for {domain}")
logger.info(f"📜 Found {len(final_certs)} unique certificates for {domain}")
else:
logger.debug(f" No certificates found for {domain}")
logger.debug(f" No certificates found for {domain}")
return final_certs
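# A short sketch of the ID-based de-duplication referenced above
# (unique_certs keyed by certificate ID); the helper is illustrative:
def dedupe_by_id(certs):
    """Keep one Certificate per crt.sh ID, preferring the first seen."""
    unique = {}
    for cert in certs:
        unique.setdefault(cert.id, cert)
    return list(unique.values())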
def _query_crt_sh(self, query: str, operation_id: str) -> List[Certificate]:
"""Query crt.sh API with HTTP caching and forensic tracking."""
# Check HTTP cache first. The cache is keyed by the query string alone;
# cached results are re-stamped with the caller's operation_id below.
if query in self._http_cache:
logger.debug(f"Using cached HTTP result for crt.sh query: {query}")
# Need to create new Certificate objects with current operation_id
cached_certs = self._http_cache[query]
return [
Certificate(
id=cert.id,
issuer=cert.issuer,
subject=cert.subject,
not_before=cert.not_before,
not_after=cert.not_after,
is_wildcard=cert.is_wildcard,
operation_id=operation_id # Use current operation_id
) for cert in cached_certs
]
# Not cached, make the HTTP request
certificates = self._make_http_request(query, operation_id)
# Cache the HTTP result (without operation_id)
if certificates:
self._http_cache[query] = [
Certificate(
id=cert.id,
issuer=cert.issuer,
subject=cert.subject,
not_before=cert.not_before,
not_after=cert.not_after,
is_wildcard=cert.is_wildcard,
operation_id="" # Cache without operation_id
) for cert in certificates
]
return certificates
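# A minimal sketch of the cache-then-restamp pattern in the removed
# _query_crt_sh() above: results are cached with a neutral operation_id
# and re-attributed on each cache hit. dataclasses.replace() is used here
# instead of rebuilding each Certificate field by field; the helper name
# and the fetch callable are illustrative:
from dataclasses import replace

_http_cache = {}  # query string -> list of cached Certificate objects

def cached_query(query, operation_id, fetch):
    if query in _http_cache:
        # Cache hit: reuse the data, but stamp the operation asking now.
        return [replace(c, operation_id=operation_id) for c in _http_cache[query]]
    certs = fetch(query, operation_id)
    if certs:
        # Store neutral copies so no stale operation_id leaks from the cache.
        _http_cache[query] = [replace(c, operation_id="") for c in certs]
    return certs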
def _make_http_request(self, query: str, operation_id: str) -> List[Certificate]:
"""Make actual HTTP request to crt.sh API with retry logic and forensic tracking."""
def _query_crt_sh(self, query: str) -> List[Certificate]:
"""Query crt.sh API with retry logic and better error handling."""
certificates = []
self._rate_limit()
logger.debug(f"🌐 Making HTTP request to crt.sh for: {query} (operation: {operation_id})")
logger.debug(f"📡 Querying crt.sh for: {query}")
max_retries = 2
backoff_delays = [1, 3]
max_retries = 2 # Reduced retries for faster failure
backoff_delays = [1, 3] # Shorter delays
for attempt in range(max_retries):
try:
@@ -185,53 +143,54 @@ class CertificateChecker:
if response.status_code == 200:
try:
data = response.json()
logger.debug(f"crt.sh returned {len(data)} certificate entries for {query}")
logger.debug(f"📊 crt.sh returned {len(data)} certificate entries for {query}")
for cert_data in data:
try:
# Parse dates with better error handling
not_before = self._parse_date(cert_data.get('not_before'))
not_after = self._parse_date(cert_data.get('not_after'))
if not_before and not_after:
# Create Certificate with forensic metadata
certificate = Certificate(
id=cert_data.get('id'),
issuer=cert_data.get('issuer_name', ''),
subject=cert_data.get('name_value', ''),
not_before=not_before,
not_after=not_after,
is_wildcard='*.' in cert_data.get('name_value', ''),
operation_id=operation_id # Forensic tracking
is_wildcard='*.' in cert_data.get('name_value', '')
)
certificates.append(certificate)
logger.debug(f"📋 Parsed certificate ID {certificate.id} for {query}")
logger.debug(f" Parsed certificate ID {certificate.id} for {query}")
else:
logger.debug(f"Skipped certificate with invalid dates: {cert_data.get('id')}")
logger.debug(f"⚠️ Skipped certificate with invalid dates: {cert_data.get('id')}")
except (ValueError, TypeError, KeyError) as e:
logger.debug(f"Error parsing certificate data: {e}")
continue
logger.debug(f"⚠️ Error parsing certificate data: {e}")
continue # Skip malformed certificate data
# Success! Reset connection failure counter
self.connection_failures = 0
logger.info(f"✅ Successfully processed {len(certificates)} certificates from crt.sh for {query}")
return certificates
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON response from crt.sh for {query}: {e}")
logger.warning(f"Invalid JSON response from crt.sh for {query}: {e}")
if attempt < max_retries - 1:
time.sleep(backoff_delays[attempt])
continue
return certificates
elif response.status_code == 404:
# 404 is normal - no certificates found
logger.debug(f" No certificates found for {query} (404)")
self.connection_failures = 0
self.connection_failures = 0 # Reset counter for successful connection
return certificates
elif response.status_code == 429:
logger.warning(f"⚠️ crt.sh rate limit exceeded for {query}")
if attempt < max_retries - 1:
time.sleep(5)
time.sleep(5) # Wait longer for rate limits
continue
return certificates
@@ -244,8 +203,9 @@ class CertificateChecker:
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
error_type = "Connection Error" if isinstance(e, requests.exceptions.ConnectionError) else "Timeout"
logger.warning(f"⚠️ crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}")
logger.warning(f"🌐 crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}")
# Track connection failures
if isinstance(e, requests.exceptions.ConnectionError):
self.connection_failures += 1
@@ -264,7 +224,8 @@ class CertificateChecker:
time.sleep(backoff_delays[attempt])
continue
logger.warning(f"⚠️ All {max_retries} attempts failed for crt.sh query: {query}")
# If we get here, all retries failed
logger.warning(f"❌ All {max_retries} attempts failed for crt.sh query: {query}")
return certificates
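# A condensed sketch of the retry policy above: two attempts, short
# backoff, 404 treated as an empty result, and 429 given a longer pause.
# The function name and parameters are illustrative:
import time
import requests

def fetch_json_with_retries(url, params, timeout=10):
    backoff_delays = [1, 3]
    max_retries = len(backoff_delays)
    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=timeout)
            if response.status_code == 200:
                try:
                    return response.json()
                except ValueError:
                    pass  # invalid JSON: retry after backoff
            elif response.status_code == 404:
                return []  # no results is a normal outcome, not an error
            elif response.status_code == 429:
                time.sleep(5)  # rate limited: wait longer than usual
                continue
        except (requests.exceptions.ConnectionError,
                requests.exceptions.Timeout):
            pass  # transient network failure: fall through to the backoff
        if attempt < max_retries - 1:
            time.sleep(backoff_delays[attempt])
    return []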
def _parse_date(self, date_str: str) -> Optional[datetime]:
@@ -272,12 +233,13 @@ class CertificateChecker:
if not date_str:
return None
# Common date formats from crt.sh
date_formats = [
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%dT%H:%M:%SZ',
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%dT%H:%M:%S.%f',
'%Y-%m-%dT%H:%M:%S.%fZ',
'%Y-%m-%dT%H:%M:%S', # ISO format without timezone
'%Y-%m-%dT%H:%M:%SZ', # ISO format with Z
'%Y-%m-%d %H:%M:%S', # Space separated
'%Y-%m-%dT%H:%M:%S.%f', # With microseconds
'%Y-%m-%dT%H:%M:%S.%fZ', # With microseconds and Z
]
for fmt in date_formats:
@@ -286,12 +248,13 @@ class CertificateChecker:
except ValueError:
continue
# Try with timezone info
try:
return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
except ValueError:
pass
logger.debug(f"Could not parse date: {date_str}")
logger.debug(f"⚠️ Could not parse date: {date_str}")
return None
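# The fallback chain above as a self-contained helper: try each known
# crt.sh format, then let fromisoformat() handle explicit UTC offsets.
# Illustrative sketch:
from datetime import datetime
from typing import Optional

CRTSH_FORMATS = [
    '%Y-%m-%dT%H:%M:%S',
    '%Y-%m-%dT%H:%M:%SZ',
    '%Y-%m-%d %H:%M:%S',
    '%Y-%m-%dT%H:%M:%S.%f',
    '%Y-%m-%dT%H:%M:%S.%fZ',
]

def parse_crtsh_date(date_str: str) -> Optional[datetime]:
    if not date_str:
        return None
    for fmt in CRTSH_FORMATS:
        try:
            return datetime.strptime(date_str, fmt)
        except ValueError:
            continue
    try:
        # 'Z' is not understood by fromisoformat() before Python 3.11,
        # so normalize it to an explicit +00:00 offset first.
        return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
    except ValueError:
        return None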
def extract_subdomains_from_certificates(self, certificates: List[Certificate]) -> Set[str]:
@@ -302,6 +265,7 @@ class CertificateChecker:
for cert in certificates:
# Parse subject field for domain names
# Certificate subjects can be multi-line with multiple domains
subject_lines = cert.subject.split('\n')
for line in subject_lines:
@@ -309,7 +273,7 @@ class CertificateChecker:
# Skip wildcard domains for recursion (they don't resolve directly)
if line.startswith('*.'):
logger.debug(f" Skipping wildcard domain: {line}")
logger.debug(f"🌿 Skipping wildcard domain: {line}")
continue
if self._is_valid_domain(line):
@@ -319,7 +283,7 @@ class CertificateChecker:
if subdomains:
logger.info(f"🌿 Extracted {len(subdomains)} subdomains from certificates")
else:
logger.debug(" No subdomains extracted from certificates")
logger.debug(" No subdomains extracted from certificates")
return subdomains
@@ -328,17 +292,20 @@ class CertificateChecker:
if not domain or '.' not in domain:
return False
# Remove common prefixes
domain = domain.lower().strip()
if domain.startswith('www.'):
domain = domain[4:]
# Basic validation
if len(domain) < 3 or len(domain) > 255:
return False
# Must not be an IP address
try:
import socket
socket.inet_aton(domain)
return False
return False # It's an IPv4 address
except socket.error:
pass
@@ -347,6 +314,7 @@ class CertificateChecker:
if len(parts) < 2:
return False
# Each part should be reasonable
for part in parts:
if len(part) < 1 or len(part) > 63:
return False
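# The full validation flow of _is_valid_domain(), gathered into one
# illustrative helper: strip a www. prefix, bound the overall length,
# reject IPv4 literals, and bound each dot-separated label:
import socket

def is_probably_domain(name: str) -> bool:
    name = name.lower().strip()
    if name.startswith('www.'):
        name = name[4:]
    if '.' not in name or not 3 <= len(name) <= 255:
        return False
    try:
        socket.inet_aton(name)  # parses -> it is an IPv4 literal
        return False
    except socket.error:
        pass
    parts = name.split('.')
    return len(parts) >= 2 and all(1 <= len(p) <= 63 for p in parts)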


@@ -1,85 +1,38 @@
# File: src/data_structures.py
"""Enhanced data structures for forensic DNS reconnaissance with full provenance tracking."""
"""Data structures for storing reconnaissance results."""
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional, Any, Tuple
from typing import Dict, List, Set, Optional, Any
from datetime import datetime
from enum import Enum
import json
import logging
import uuid
# Set up logging for this module
logger = logging.getLogger(__name__)
class OperationType(Enum):
"""Types of discovery operations."""
INITIAL_TARGET = "initial_target"
TLD_EXPANSION = "tld_expansion"
DNS_A = "dns_a"
DNS_AAAA = "dns_aaaa"
DNS_MX = "dns_mx"
DNS_NS = "dns_ns"
DNS_TXT = "dns_txt"
DNS_CNAME = "dns_cname"
DNS_SOA = "dns_soa"
DNS_PTR = "dns_ptr"
DNS_SRV = "dns_srv"
DNS_CAA = "dns_caa"
DNS_DNSKEY = "dns_dnskey"
DNS_DS = "dns_ds"
DNS_RRSIG = "dns_rrsig"
DNS_NSEC = "dns_nsec"
DNS_NSEC3 = "dns_nsec3"
DNS_REVERSE = "dns_reverse"
CERTIFICATE_CHECK = "certificate_check"
SHODAN_LOOKUP = "shodan_lookup"
VIRUSTOTAL_IP = "virustotal_ip"
VIRUSTOTAL_DOMAIN = "virustotal_domain"
class DiscoveryMethod(Enum):
"""How a hostname was discovered."""
INITIAL_TARGET = "initial_target"
TLD_EXPANSION = "tld_expansion"
DNS_RECORD_VALUE = "dns_record_value"
CERTIFICATE_SUBJECT = "certificate_subject"
DNS_SUBDOMAIN_EXTRACTION = "dns_subdomain_extraction"
@dataclass
class DNSRecord:
"""DNS record information with enhanced metadata."""
"""DNS record information."""
record_type: str
value: str
ttl: Optional[int] = None
discovered_at: datetime = field(default_factory=datetime.now)
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
def to_dict(self) -> dict:
return {
'record_type': self.record_type,
'value': self.value,
'ttl': self.ttl,
'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None,
'operation_id': self.operation_id
'ttl': self.ttl
}
@dataclass
class Certificate:
"""Certificate information with enhanced metadata."""
"""Certificate information from crt.sh."""
id: int
issuer: str
subject: str
not_before: datetime
not_after: datetime
is_wildcard: bool = False
discovered_at: datetime = field(default_factory=datetime.now)
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
is_valid_now: bool = field(default=True) # Based on current timestamp vs cert validity
def __post_init__(self):
"""Calculate if certificate is currently valid."""
now = datetime.now()
self.is_valid_now = self.not_before <= now <= self.not_after
def to_dict(self) -> dict:
return {
@@ -88,22 +41,17 @@ class Certificate:
'subject': self.subject,
'not_before': self.not_before.isoformat() if self.not_before else None,
'not_after': self.not_after.isoformat() if self.not_after else None,
'is_wildcard': self.is_wildcard,
'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None,
'operation_id': self.operation_id,
'is_valid_now': self.is_valid_now
'is_wildcard': self.is_wildcard
}
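# A minimal sketch of the derived-field pattern used by Certificate's
# __post_init__ above: validity is computed once at construction rather
# than on every access. The class name and init=False variant are
# illustrative, not from the repository:
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class ValidityWindow:
    not_before: datetime
    not_after: datetime
    is_valid_now: bool = field(init=False)

    def __post_init__(self) -> None:
        now = datetime.now()
        self.is_valid_now = self.not_before <= now <= self.not_after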
@dataclass
class ShodanResult:
"""Shodan scan result with metadata."""
"""Shodan scan result."""
ip: str
ports: List[int]
services: Dict[str, Any]
organization: Optional[str] = None
country: Optional[str] = None
discovered_at: datetime = field(default_factory=datetime.now)
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
def to_dict(self) -> dict:
return {
@@ -111,21 +59,17 @@ class ShodanResult:
'ports': self.ports,
'services': self.services,
'organization': self.organization,
'country': self.country,
'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None,
'operation_id': self.operation_id
'country': self.country
}
@dataclass
class VirusTotalResult:
"""VirusTotal scan result with metadata."""
"""VirusTotal scan result."""
resource: str # IP or domain
positives: int
total: int
scan_date: datetime
permalink: str
discovered_at: datetime = field(default_factory=datetime.now)
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
def to_dict(self) -> dict:
return {
@@ -133,331 +77,118 @@ class VirusTotalResult:
'positives': self.positives,
'total': self.total,
'scan_date': self.scan_date.isoformat() if self.scan_date else None,
'permalink': self.permalink,
'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None,
'operation_id': self.operation_id
'permalink': self.permalink
}
@dataclass
class DiscoveryOperation:
"""A single discovery operation with complete metadata."""
operation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
operation_type: OperationType = OperationType.INITIAL_TARGET
target: str = ""
timestamp: datetime = field(default_factory=datetime.now)
success: bool = True
error_message: Optional[str] = None
class ReconData:
"""Main data structure for reconnaissance results."""
# Results of the operation
dns_records: List[DNSRecord] = field(default_factory=list)
certificates: List[Certificate] = field(default_factory=list)
shodan_results: List[ShodanResult] = field(default_factory=list)
virustotal_results: List[VirusTotalResult] = field(default_factory=list)
discovered_hostnames: List[str] = field(default_factory=list) # New hostnames found
discovered_ips: List[str] = field(default_factory=list) # New IPs found
# Operation-specific metadata
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict:
return {
'operation_id': self.operation_id,
'operation_type': self.operation_type.value,
'target': self.target,
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
'success': self.success,
'error_message': self.error_message,
'dns_records': [record.to_dict() for record in self.dns_records],
'certificates': [cert.to_dict() for cert in self.certificates],
'shodan_results': [result.to_dict() for result in self.shodan_results],
'virustotal_results': [result.to_dict() for result in self.virustotal_results],
'discovered_hostnames': self.discovered_hostnames,
'discovered_ips': self.discovered_ips,
'metadata': self.metadata
}
@dataclass
class DiscoveryEdge:
"""Represents a discovery relationship between two hostnames."""
source_hostname: str # The hostname that led to the discovery
target_hostname: str # The hostname that was discovered
discovery_method: DiscoveryMethod
operation_id: str
timestamp: datetime
metadata: Dict[str, Any] = field(default_factory=dict) # Additional context
def to_dict(self) -> dict:
return {
'source_hostname': self.source_hostname,
'target_hostname': self.target_hostname,
'discovery_method': self.discovery_method.value,
'operation_id': self.operation_id,
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
'metadata': self.metadata
}
@dataclass
class DiscoveryNode:
"""A discovered hostname with complete provenance and current state."""
hostname: str
depth: int = 0
first_seen: datetime = field(default_factory=datetime.now)
last_updated: datetime = field(default_factory=datetime.now)
# Current validity state
dns_exists: Optional[bool] = None # Does this hostname resolve?
last_dns_check: Optional[datetime] = None
resolved_ips: Set[str] = field(default_factory=set)
# Discovery provenance - can have multiple discovery paths
discovery_methods: Set[DiscoveryMethod] = field(default_factory=set)
discovered_by_operations: Set[str] = field(default_factory=set) # operation_ids
# Associated data (current state)
dns_records_by_type: Dict[str, List[DNSRecord]] = field(default_factory=dict)
certificates: List[Certificate] = field(default_factory=list)
shodan_results: List[ShodanResult] = field(default_factory=list)
virustotal_results: List[VirusTotalResult] = field(default_factory=list)
reverse_dns: Optional[str] = None
def add_dns_record(self, record: DNSRecord) -> None:
"""Add a DNS record to this node."""
record_type = record.record_type
if record_type not in self.dns_records_by_type:
self.dns_records_by_type[record_type] = []
self.dns_records_by_type[record_type].append(record)
self.last_updated = datetime.now()
# Update resolved IPs if this is an A or AAAA record
if record_type in ['A', 'AAAA']:
self.resolved_ips.add(record.value)
self.dns_exists = True
self.last_dns_check = record.discovered_at
def add_certificate(self, certificate: Certificate) -> None:
"""Add a certificate to this node."""
self.certificates.append(certificate)
self.last_updated = datetime.now()
def get_all_dns_records(self) -> List[DNSRecord]:
"""Get all DNS records for this node."""
all_records = []
for records in self.dns_records_by_type.values():
all_records.extend(records)
return all_records
def get_current_certificates(self) -> List[Certificate]:
"""Get currently valid certificates."""
return [cert for cert in self.certificates if cert.is_valid_now]
def get_expired_certificates(self) -> List[Certificate]:
"""Get expired certificates."""
return [cert for cert in self.certificates if not cert.is_valid_now]
def to_dict(self) -> dict:
return {
'hostname': self.hostname,
'depth': self.depth,
'first_seen': self.first_seen.isoformat() if self.first_seen else None,
'last_updated': self.last_updated.isoformat() if self.last_updated else None,
'dns_exists': self.dns_exists,
'last_dns_check': self.last_dns_check.isoformat() if self.last_dns_check else None,
'resolved_ips': sorted(list(self.resolved_ips)),
'discovery_methods': [method.value for method in self.discovery_methods],
'discovered_by_operations': sorted(list(self.discovered_by_operations)),
'dns_records_by_type': {
record_type: [record.to_dict() for record in records]
for record_type, records in self.dns_records_by_type.items()
},
'certificates': [cert.to_dict() for cert in self.certificates],
'current_certificates': [cert.to_dict() for cert in self.get_current_certificates()],
'expired_certificates': [cert.to_dict() for cert in self.get_expired_certificates()],
'shodan_results': [result.to_dict() for result in self.shodan_results],
'virustotal_results': [result.to_dict() for result in self.virustotal_results],
'reverse_dns': self.reverse_dns
}
@dataclass
class ForensicReconData:
"""Enhanced reconnaissance data with full forensic tracking and graph structure."""
# Core graph structure
nodes: Dict[str, DiscoveryNode] = field(default_factory=dict) # hostname -> node
edges: List[DiscoveryEdge] = field(default_factory=list)
operations: Dict[str, DiscoveryOperation] = field(default_factory=dict) # operation_id -> operation
# Quick lookup indexes
# Core data
hostnames: Set[str] = field(default_factory=set)
ip_addresses: Set[str] = field(default_factory=set)
operation_timeline: List[str] = field(default_factory=list) # ordered operation_ids
# DNS information
dns_records: Dict[str, List[DNSRecord]] = field(default_factory=dict)
reverse_dns: Dict[str, str] = field(default_factory=dict)
# Certificate information
certificates: Dict[str, List[Certificate]] = field(default_factory=dict)
# External service results
shodan_results: Dict[str, ShodanResult] = field(default_factory=dict)
virustotal_results: Dict[str, VirusTotalResult] = field(default_factory=dict)
# Metadata
start_time: datetime = field(default_factory=datetime.now)
end_time: Optional[datetime] = None
scan_config: Dict[str, Any] = field(default_factory=dict)
depth_map: Dict[str, int] = field(default_factory=dict) # Track recursion depth
def add_node(self, hostname: str, depth: int = 0,
discovery_method: DiscoveryMethod = DiscoveryMethod.INITIAL_TARGET,
operation_id: Optional[str] = None) -> DiscoveryNode:
"""Add or get a discovery node."""
def add_hostname(self, hostname: str, depth: int = 0) -> None:
"""Add a hostname to the dataset."""
hostname = hostname.lower()
self.hostnames.add(hostname)
self.depth_map[hostname] = depth
logger.info(f"Added hostname: {hostname} (depth: {depth})")
if hostname not in self.nodes:
node = DiscoveryNode(hostname=hostname, depth=depth)
self.nodes[hostname] = node
logger.debug(f"Created new node: {hostname} at depth {depth}")
else:
node = self.nodes[hostname]
# Update depth if this is a shallower discovery
if depth < node.depth:
node.depth = depth
logger.debug(f"Updated node {hostname} depth: {node.depth} -> {depth}")
def add_ip_address(self, ip: str) -> None:
"""Add an IP address to the dataset."""
self.ip_addresses.add(ip)
logger.info(f"Added IP address: {ip}")
# Track discovery method and operation
node.discovery_methods.add(discovery_method)
if operation_id:
node.discovered_by_operations.add(operation_id)
return node
def add_edge(self, source: str, target: str, discovery_method: DiscoveryMethod,
operation_id: str, metadata: Dict[str, Any] = None) -> None:
"""Add a discovery edge."""
edge = DiscoveryEdge(
source_hostname=source.lower(),
target_hostname=target.lower(),
discovery_method=discovery_method,
operation_id=operation_id,
timestamp=datetime.now(),
metadata=metadata or {}
)
self.edges.append(edge)
logger.debug(f"Added edge: {source} -> {target} via {discovery_method.value}")
def add_operation(self, operation: DiscoveryOperation) -> None:
"""Add an operation to the timeline."""
self.operations[operation.operation_id] = operation
self.operation_timeline.append(operation.operation_id)
logger.debug(f"Added operation: {operation.operation_type.value} on {operation.target}")
def get_node(self, hostname: str) -> Optional[DiscoveryNode]:
"""Get a node by hostname."""
return self.nodes.get(hostname.lower())
def get_children(self, hostname: str) -> List[str]:
"""Get all hostnames discovered from this hostname."""
def add_dns_record(self, hostname: str, record: DNSRecord) -> None:
"""Add a DNS record for a hostname."""
hostname = hostname.lower()
children = []
for edge in self.edges:
if edge.source_hostname == hostname:
children.append(edge.target_hostname)
return children
if hostname not in self.dns_records:
self.dns_records[hostname] = []
self.dns_records[hostname].append(record)
logger.debug(f"Added DNS record for {hostname}: {record.record_type} -> {record.value}")
def get_parents(self, hostname: str) -> List[str]:
"""Get all hostnames that led to discovering this hostname."""
hostname = hostname.lower()
parents = []
for edge in self.edges:
if edge.target_hostname == hostname:
parents.append(edge.source_hostname)
return parents
def add_shodan_result(self, ip: str, result: ShodanResult) -> None:
"""Add Shodan result."""
self.shodan_results[ip] = result
logger.info(f"Added Shodan result for {ip}: {len(result.ports)} ports, org: {result.organization}")
def get_discovery_path(self, hostname: str) -> List[Tuple[str, str, DiscoveryMethod]]:
"""Get the discovery path(s) to a hostname."""
hostname = hostname.lower()
paths = []
def add_virustotal_result(self, resource: str, result: VirusTotalResult) -> None:
"""Add VirusTotal result."""
self.virustotal_results[resource] = result
logger.info(f"Added VirusTotal result for {resource}: {result.positives}/{result.total} detections")
def trace_path(current: str, visited: Set[str], current_path: List):
if current in visited:
return # Avoid cycles
visited.add(current)
parents = self.get_parents(current)
if not parents:
# This is a root node, we have a complete path
paths.append(current_path.copy())
else:
for parent in parents:
# Find the edge that connects parent to current
for edge in self.edges:
if edge.source_hostname == parent and edge.target_hostname == current:
new_path = [(parent, current, edge.discovery_method)] + current_path
trace_path(parent, visited.copy(), new_path)
trace_path(hostname, set(), [])
return paths
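# A compact sketch of the edge-list traversal that get_parents() and
# get_discovery_path() perform above, written against plain
# (source, target) tuples; function names are illustrative:
def parents_of(edges, hostname):
    """All hostnames with a discovery edge pointing at this one."""
    return [src for src, dst in edges if dst == hostname]

def ancestors_of(edges, hostname, seen=None):
    """Walk parent links upward, guarding against cycles."""
    seen = set() if seen is None else seen
    for parent in parents_of(edges, hostname):
        if parent not in seen:
            seen.add(parent)
            ancestors_of(edges, parent, seen)
    return seen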
def get_new_subdomains(self, max_depth: int) -> Set[str]:
"""Get subdomains that haven't been processed yet and are within depth limit."""
new_domains = set()
for hostname in self.hostnames:
if (hostname not in self.dns_records and
self.depth_map.get(hostname, 0) < max_depth):
new_domains.add(hostname)
return new_domains
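# The depth-gated frontier selection above, in isolation: a hostname is
# queued only if it has not been queried yet and is still inside the
# recursion budget. Illustrative sketch:
def next_frontier(hostnames, processed, depth_map, max_depth):
    return {h for h in hostnames
            if h not in processed and depth_map.get(h, 0) < max_depth}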
def get_stats(self) -> Dict[str, int]:
"""Get current statistics."""
total_dns_records = sum(len(node.get_all_dns_records()) for node in self.nodes.values())
total_certificates = sum(len(node.certificates) for node in self.nodes.values())
current_certificates = sum(len(node.get_current_certificates()) for node in self.nodes.values())
expired_certificates = sum(len(node.get_expired_certificates()) for node in self.nodes.values())
total_shodan = sum(len(node.shodan_results) for node in self.nodes.values())
total_virustotal = sum(len(node.virustotal_results) for node in self.nodes.values())
return {
'hostnames': len(self.nodes),
'hostnames': len(self.hostnames),
'ip_addresses': len(self.ip_addresses),
'discovery_edges': len(self.edges),
'operations_performed': len(self.operations),
'dns_records': total_dns_records,
'certificates_total': total_certificates,
'certificates_current': current_certificates,
'certificates_expired': expired_certificates,
'shodan_results': total_shodan,
'virustotal_results': total_virustotal
'dns_records': sum(len(records) for records in self.dns_records.values()),
'certificates': sum(len(certs) for certs in self.certificates.values()),
'shodan_results': len(self.shodan_results),
'virustotal_results': len(self.virustotal_results)
}
def to_dict(self) -> dict:
"""Export data as a serializable dictionary."""
logger.info(f"Serializing ForensicReconData with {len(self.nodes)} nodes, {len(self.edges)} edges, {len(self.operations)} operations")
logger.debug(f"Serializing ReconData with stats: {self.get_stats()}")
return {
'nodes': {hostname: node.to_dict() for hostname, node in self.nodes.items()},
'edges': [edge.to_dict() for edge in self.edges],
'operations': {op_id: op.to_dict() for op_id, op in self.operations.items()},
'operation_timeline': self.operation_timeline,
result = {
'hostnames': sorted(list(self.hostnames)),
'ip_addresses': sorted(list(self.ip_addresses)),
'dns_records': {
host: [record.to_dict() for record in records]
for host, records in self.dns_records.items()
},
'reverse_dns': dict(self.reverse_dns),
'certificates': {
host: [cert.to_dict() for cert in certs]
for host, certs in self.certificates.items()
},
'shodan_results': {
ip: result.to_dict() for ip, result in self.shodan_results.items()
},
'virustotal_results': {
resource: result.to_dict() for resource, result in self.virustotal_results.items()
},
'depth_map': dict(self.depth_map),
'metadata': {
'start_time': self.start_time.isoformat() if self.start_time else None,
'end_time': self.end_time.isoformat() if self.end_time else None,
'scan_config': self.scan_config,
'stats': self.get_stats()
},
'graph_analysis': self._generate_graph_analysis()
}
}
def _generate_graph_analysis(self) -> Dict[str, Any]:
"""Generate graph analysis metadata."""
# Depth distribution
depth_distribution = {}
for node in self.nodes.values():
depth = node.depth
depth_distribution[depth] = depth_distribution.get(depth, 0) + 1
logger.info(f"Serialized data contains: {len(result['hostnames'])} hostnames, "
f"{len(result['ip_addresses'])} IPs, {len(result['shodan_results'])} Shodan results, "
f"{len(result['virustotal_results'])} VirusTotal results")
# Root nodes (no parents)
root_nodes = [hostname for hostname in self.nodes.keys()
if not self.get_parents(hostname)]
# Leaf nodes (no children)
leaf_nodes = [hostname for hostname in self.nodes.keys()
if not self.get_children(hostname)]
# Discovery method distribution
method_distribution = {}
for edge in self.edges:
method = edge.discovery_method.value
method_distribution[method] = method_distribution.get(method, 0) + 1
return {
'depth_distribution': depth_distribution,
'max_depth': max(depth_distribution.keys()) if depth_distribution else 0,
'root_nodes': root_nodes,
'leaf_nodes': leaf_nodes,
'discovery_method_distribution': method_distribution,
'total_discovery_paths': len(self.edges)
}
return result
def to_json(self) -> str:
"""Export data as JSON."""
@@ -465,11 +196,9 @@ class ForensicReconData:
return json.dumps(self.to_dict(), indent=2, ensure_ascii=False)
except Exception as e:
logger.error(f"Failed to serialize to JSON: {e}")
# Return minimal JSON in case of error
return json.dumps({
'error': str(e),
'stats': self.get_stats(),
'timestamp': datetime.now().isoformat()
}, indent=2)
# Backward compatibility aliases (for gradual migration)
ReconData = ForensicReconData


@@ -1,5 +1,5 @@
# File: src/dns_resolver.py
"""DNS resolution functionality with enhanced TLD testing and forensic operation tracking."""
"""DNS resolution functionality with enhanced TLD testing."""
import dns.resolver
import dns.reversename
@@ -9,7 +9,6 @@ from typing import List, Dict, Optional, Set
import socket
import time
import logging
import uuid
from .data_structures import DNSRecord, ReconData
from .config import Config
@@ -17,7 +16,7 @@ from .config import Config
logger = logging.getLogger(__name__)
class DNSResolver:
"""DNS resolution and record lookup with optimized TLD testing and forensic tracking."""
"""DNS resolution and record lookup with optimized TLD testing."""
# All DNS record types to query
RECORD_TYPES = [
@@ -81,7 +80,7 @@
return ips
def resolve_hostname(self, hostname: str, operation_id: Optional[str] = None) -> List[str]:
def resolve_hostname(self, hostname: str) -> List[str]:
"""Resolve hostname to IP addresses (full resolution with retries)."""
ips = []
@@ -127,16 +126,12 @@
return unique_ips
def get_all_dns_records(self, hostname: str, operation_id: Optional[str] = None) -> List[DNSRecord]:
"""Get all DNS records for a hostname with forensic tracking."""
def get_all_dns_records(self, hostname: str) -> List[DNSRecord]:
"""Get all DNS records for a hostname."""
records = []
successful_queries = 0
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"📋 Getting all DNS records for: {hostname} (operation: {operation_id})")
logger.debug(f"📋 Getting all DNS records for: {hostname}")
for record_type in self.RECORD_TYPES:
type_found = False
@@ -150,15 +145,11 @@
try:
answers = resolver.resolve(hostname, record_type)
for answer in answers:
# Create DNSRecord with forensic metadata
record = DNSRecord(
records.append(DNSRecord(
record_type=record_type,
value=str(answer),
ttl=answers.ttl,
operation_id=operation_id # Forensic tracking
)
records.append(record)
ttl=answers.ttl
))
if not type_found:
logger.debug(f"✅ Found {record_type} record for {hostname}: {answer}")
type_found = True
@@ -188,56 +179,9 @@
return records
def query_specific_record_type(self, hostname: str, record_type: str, operation_id: Optional[str] = None) -> List[DNSRecord]:
"""Query a specific DNS record type with forensic tracking."""
records = []
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🎯 Querying {record_type} records for {hostname} (operation: {operation_id})")
for dns_server in self.config.DNS_SERVERS:
self._rate_limit()
resolver = dns.resolver.Resolver()
resolver.nameservers = [dns_server]
resolver.timeout = self.config.DNS_TIMEOUT
try:
answers = resolver.resolve(hostname, record_type)
for answer in answers:
# Create DNSRecord with forensic metadata
record = DNSRecord(
record_type=record_type,
value=str(answer),
ttl=answers.ttl,
operation_id=operation_id # Forensic tracking
)
records.append(record)
logger.debug(f"{record_type} record for {hostname}: {answer}")
break # Found records, no need to query other DNS servers
except dns.resolver.NXDOMAIN:
logger.debug(f"❌ NXDOMAIN for {hostname} {record_type} on {dns_server}")
break # Domain doesn't exist, no point checking other servers
except dns.resolver.NoAnswer:
logger.debug(f"⚠️ No {record_type} record for {hostname} on {dns_server}")
continue # Try next DNS server
except dns.resolver.Timeout:
logger.debug(f"⏱️ Timeout for {hostname} {record_type} on {dns_server}")
continue # Try next DNS server
except Exception as e:
logger.debug(f"⚠️ Error querying {record_type} for {hostname} on {dns_server}: {e}")
continue # Try next DNS server
logger.debug(f"🎯 Found {len(records)} {record_type} records for {hostname}")
return records
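# A reduced sketch of the per-server fallback loop above, using the same
# dnspython calls: NXDOMAIN ends the search outright, while NoAnswer and
# Timeout move on to the next resolver. The function name and default
# timeout are illustrative:
import dns.resolver

def query_with_fallback(hostname, record_type, servers, timeout=5.0):
    for server in servers:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = [server]
        resolver.timeout = timeout
        try:
            answers = resolver.resolve(hostname, record_type)
            return [str(answer) for answer in answers]
        except dns.resolver.NXDOMAIN:
            return []  # the name does not exist; asking others won't help
        except (dns.resolver.NoAnswer, dns.resolver.Timeout):
            continue  # this server had nothing or was slow: try the next
    return []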
def reverse_dns_lookup(self, ip: str, operation_id: Optional[str] = None) -> Optional[str]:
"""Perform reverse DNS lookup with forensic tracking."""
logger.debug(f"🔍 Reverse DNS lookup for: {ip} (operation: {operation_id or 'auto'})")
def reverse_dns_lookup(self, ip: str) -> Optional[str]:
"""Perform reverse DNS lookup."""
logger.debug(f"🔍 Reverse DNS lookup for: {ip}")
try:
self._rate_limit()


@@ -1,5 +1,5 @@
# File: src/main.py
"""Main CLI interface for the forensic reconnaissance tool with two-mode operation."""
"""Main CLI interface for the reconnaissance tool."""
import click
import json
@@ -7,8 +7,8 @@ import sys
import logging
from pathlib import Path
from .config import Config
from .reconnaissance import ForensicReconnaissanceEngine
from .report_generator import ForensicReportGenerator
from .reconnaissance import ReconnaissanceEngine
from .report_generator import ReportGenerator
from .web_app import create_app
# Module logger
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
@click.option('--web', is_flag=True, help='Start web interface instead of CLI')
@click.option('--shodan-key', help='Shodan API key')
@click.option('--virustotal-key', help='VirusTotal API key')
@click.option('--max-depth', default=2, help='Maximum recursion depth for full domain mode (default: 2)')
@click.option('--max-depth', default=2, help='Maximum recursion depth (default: 2)')
@click.option('--output', '-o', help='Output file prefix (will create .json and .txt files)')
@click.option('--json-only', is_flag=True, help='Only output JSON')
@click.option('--text-only', is_flag=True, help='Only output text report')
@@ -27,25 +27,14 @@ logger = logging.getLogger(__name__)
@click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging (DEBUG level)')
@click.option('--quiet', '-q', is_flag=True, help='Quiet mode (WARNING level only)')
def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, text_only, port, verbose, quiet):
"""Forensic DNS Reconnaissance Tool - Two-Mode Operation
MODE 1 - Hostname-only (e.g., 'cc24'):
Expands hostname to all TLDs (cc24.com, cc24.net, etc.)
No recursive enumeration to avoid third-party infrastructure noise
Perfect for discovering domains using a specific hostname
MODE 2 - Full domain (e.g., 'cc24.com'):
Full recursive reconnaissance with subdomain discovery
Maps complete infrastructure of the specified domain
Uses max-depth for recursive enumeration
Creates forensic-grade evidence chain with operation tracking
"""DNS Reconnaissance Tool
Examples:
recon cc24 # Mode 1: Find all cc24.* domains (no recursion)
recon cc24.com # Mode 2: Map cc24.com infrastructure (with forensic tracking)
recon cc24.com --max-depth 3 # Mode 2: Deeper recursive enumeration
recon cc24 -v # Mode 1: Verbose TLD expansion
recon --web # Start web interface with forensic visualization
recon example.com # Scan example.com
recon example # Try example.* for all TLDs
recon example.com --max-depth 3 # Deeper recursion
recon example.com -v # Verbose logging
recon --web # Start web interface
"""
# Determine log level
@@ -62,19 +51,19 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
if web:
# Start web interface
logger.info("🌐 Starting forensic web interface...")
logger.info("🌐 Starting web interface...")
app = create_app(config)
logger.info(f"🚀 Forensic web interface starting on http://0.0.0.0:{port}")
app.run(host='0.0.0.0', port=port, debug=False)
logger.info(f"🚀 Web interface starting on http://0.0.0.0:{port}")
app.run(host='0.0.0.0', port=port, debug=False) # Changed debug to False to reduce noise
return
if not target:
click.echo("Error: TARGET is required for CLI mode. Use --web for web interface.")
click.echo("Error: TARGET is required for CLI mode. Use --web for web interface.")
sys.exit(1)
# Initialize forensic reconnaissance engine
logger.info("🔬 Initializing forensic reconnaissance engine...")
engine = ForensicReconnaissanceEngine(config)
# Initialize reconnaissance engine
logger.info("🔧 Initializing reconnaissance engine...")
engine = ReconnaissanceEngine(config)
# Set up progress callback for CLI
def progress_callback(message, percentage=None):
@@ -86,69 +75,56 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
engine.set_progress_callback(progress_callback)
# Display startup information
click.echo("=" * 80)
click.echo("FORENSIC DNS RECONNAISSANCE TOOL")
click.echo("=" * 80)
click.echo(f"Target: {target}")
# Show operation mode
if '.' in target:
click.echo(f"Mode: Full domain reconnaissance (recursive depth: {max_depth})")
click.echo(" → Will map complete infrastructure with forensic tracking")
click.echo(" → Creates evidence chain for each discovery operation")
else:
click.echo(f"Mode: Hostname-only reconnaissance (TLD expansion)")
click.echo(" → Will find all domains using this hostname (no recursion)")
click.echo(" → Limited forensic tracking for efficiency")
click.echo(f"DNS servers: {', '.join(config.DNS_SERVERS[:3])}{'...' if len(config.DNS_SERVERS) > 3 else ''}")
click.echo(f"DNS rate limit: {config.DNS_RATE_LIMIT}/s")
click.echo("=" * 60)
click.echo("🔍 DNS RECONNAISSANCE TOOL")
click.echo("=" * 60)
click.echo(f"🎯 Target: {target}")
click.echo(f"📊 Max recursion depth: {max_depth}")
click.echo(f"🌐 DNS servers: {', '.join(config.DNS_SERVERS[:3])}{'...' if len(config.DNS_SERVERS) > 3 else ''}")
click.echo(f"⚡ DNS rate limit: {config.DNS_RATE_LIMIT}/s")
if shodan_key:
click.echo("🕵️ Shodan integration enabled")
click.echo("✅ Shodan integration enabled")
logger.info(f"🕵️ Shodan API key provided (ends with: ...{shodan_key[-4:] if len(shodan_key) > 4 else shodan_key})")
else:
click.echo("Shodan integration disabled (no API key)")
click.echo("⚠️ Shodan integration disabled (no API key)")
if virustotal_key:
click.echo("🛡️ VirusTotal integration enabled")
click.echo("✅ VirusTotal integration enabled")
logger.info(f"🛡️ VirusTotal API key provided (ends with: ...{virustotal_key[-4:] if len(virustotal_key) > 4 else virustotal_key})")
else:
click.echo("VirusTotal integration disabled (no API key)")
click.echo("⚠️ VirusTotal integration disabled (no API key)")
click.echo("")
# Run forensic reconnaissance
# Run reconnaissance
try:
logger.info(f"🎯 Starting forensic reconnaissance for target: {target}")
logger.info(f"🚀 Starting reconnaissance for target: {target}")
data = engine.run_reconnaissance(target)
# Display final statistics
stats = data.get_stats()
graph_analysis = data._generate_graph_analysis()
click.echo("")
click.echo("=" * 80)
click.echo("FORENSIC RECONNAISSANCE COMPLETE")
click.echo("=" * 80)
click.echo(f"Hostnames discovered: {stats['hostnames']}")
click.echo(f"IP addresses found: {stats['ip_addresses']}")
click.echo(f"Discovery relationships: {stats['discovery_edges']}")
click.echo(f"Operations performed: {stats['operations_performed']}")
click.echo(f"DNS records collected: {stats['dns_records']}")
click.echo(f"Certificates found: {stats['certificates_total']} ({stats['certificates_current']} valid, {stats['certificates_expired']} expired)")
click.echo(f"Shodan results: {stats['shodan_results']}")
click.echo(f"VirusTotal results: {stats['virustotal_results']}")
click.echo(f"Maximum discovery depth: {graph_analysis['max_depth']}")
click.echo("=" * 60)
click.echo("📊 RECONNAISSANCE COMPLETE")
click.echo("=" * 60)
click.echo(f"🏠 Hostnames discovered: {stats['hostnames']}")
click.echo(f"🌐 IP addresses found: {stats['ip_addresses']}")
click.echo(f"📋 DNS records collected: {stats['dns_records']}")
click.echo(f"📜 Certificates found: {stats['certificates']}")
click.echo(f"🕵️ Shodan results: {stats['shodan_results']}")
click.echo(f"🛡️ VirusTotal results: {stats['virustotal_results']}")
# Calculate and display timing
if data.end_time and data.start_time:
duration = data.end_time - data.start_time
click.echo(f"Total time: {duration}")
click.echo(f"⏱️ Total time: {duration}")
click.echo("")
# Generate forensic reports
logger.info("📄 Generating forensic reports...")
report_gen = ForensicReportGenerator(data)
# Generate reports
logger.info("📄 Generating reports...")
report_gen = ReportGenerator(data)
if output:
# Save to files
@@ -161,7 +137,7 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
with open(json_file, 'w', encoding='utf-8') as f:
f.write(json_content)
saved_files.append(json_file)
logger.info(f"📄 Forensic JSON report saved: {json_file}")
logger.info(f"💾 JSON report saved: {json_file}")
except Exception as e:
logger.error(f"❌ Failed to save JSON report: {e}")
@@ -171,14 +147,14 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
with open(text_file, 'w', encoding='utf-8') as f:
f.write(report_gen.generate_text_report())
saved_files.append(text_file)
logger.info(f"📄 Forensic text report saved: {text_file}")
logger.info(f"💾 Text report saved: {text_file}")
except Exception as e:
logger.error(f"❌ Failed to save text report: {e}")
if saved_files:
click.echo(f"📁 Forensic reports saved:")
click.echo(f"💾 Reports saved:")
for file in saved_files:
click.echo(f" {file}")
click.echo(f" 📄 {file}")
else:
# Output to stdout
@@ -198,19 +174,18 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
# Default: show text report
try:
click.echo(report_gen.generate_text_report())
click.echo(f"\n📊 To get JSON output with full forensic data, use: --json-only")
click.echo(f"💾 To save reports, use: --output filename")
click.echo(f"🌐 For interactive visualization, use: --web")
click.echo(f"\n💡 To get JSON output, use: --json-only")
click.echo(f"💡 To save reports, use: --output filename")
except Exception as e:
logger.error(f"❌ Failed to generate report: {e}")
click.echo(f"Error generating report: {e}")
except KeyboardInterrupt:
logger.warning("⚠️ Forensic reconnaissance interrupted by user")
click.echo("\n🛑 Reconnaissance interrupted by user.")
logger.warning("⚠️ Reconnaissance interrupted by user")
click.echo("\n⚠️ Reconnaissance interrupted by user.")
sys.exit(1)
except Exception as e:
logger.error(f"❌ Error during forensic reconnaissance: {e}", exc_info=True)
logger.error(f"❌ Error during reconnaissance: {e}", exc_info=True)
click.echo(f"❌ Error during reconnaissance: {e}")
if verbose:
raise # Re-raise in verbose mode to show full traceback


@@ -1,15 +1,12 @@
# File: src/reconnaissance.py
"""Enhanced reconnaissance logic with forensic-grade operation tracking and provenance."""
"""Main reconnaissance logic with enhanced TLD expansion."""
import threading
import concurrent.futures
import logging
from datetime import datetime
from typing import Set, List, Optional, Tuple, Dict, Any
from .data_structures import (
ForensicReconData, DiscoveryNode, DiscoveryOperation, DiscoveryEdge,
OperationType, DiscoveryMethod, DNSRecord, Certificate
)
from typing import Set, List, Optional, Tuple
from .data_structures import ReconData
from .config import Config
from .dns_resolver import DNSResolver
from .certificate_checker import CertificateChecker
@@ -20,8 +17,8 @@ from .tld_fetcher import TLDFetcher
# Set up logging for this module
logger = logging.getLogger(__name__)
class ForensicReconnaissanceEngine:
"""Enhanced reconnaissance engine with complete forensic tracking and provenance."""
class ReconnaissanceEngine:
"""Main reconnaissance engine with smart TLD expansion."""
def __init__(self, config: Config):
self.config = config
@@ -35,32 +32,32 @@ class ForensicReconnaissanceEngine:
self.shodan_client = None
if config.shodan_key:
self.shodan_client = ShodanClient(config.shodan_key, config)
logger.info("🕵️ Shodan client initialized")
logger.info("✅ Shodan client initialized")
else:
logger.info("⚠️ Shodan API key not provided, skipping Shodan integration")
self.virustotal_client = None
if config.virustotal_key:
self.virustotal_client = VirusTotalClient(config.virustotal_key, config)
logger.info("🛡️ VirusTotal client initialized")
logger.info("✅ VirusTotal client initialized")
else:
logger.info("⚠️ VirusTotal API key not provided, skipping VirusTotal integration")
# Progress tracking
self.progress_callback = None
self._lock = threading.Lock()
# Track operation mode
self.is_hostname_only_mode = False
# Operation tracking
self.pending_operations = []
self.completed_operations = []
# Shared data object for live updates
self.shared_data = None
def set_progress_callback(self, callback):
"""Set callback for progress updates."""
self.progress_callback = callback
def set_shared_data(self, shared_data: ForensicReconData):
"""Set shared data object for live updates."""
self.data = shared_data
logger.info("📊 Using shared forensic data object for live updates")
def set_shared_data(self, shared_data: ReconData):
"""Set shared data object for live updates during web interface usage."""
self.shared_data = shared_data
logger.info("📊 Using shared data object for live updates")
def _update_progress(self, message: str, percentage: int = None):
"""Update progress if callback is set."""
@@ -68,603 +65,336 @@ class ForensicReconnaissanceEngine:
if self.progress_callback:
self.progress_callback(message, percentage)
def run_reconnaissance(self, target: str) -> ForensicReconData:
"""Run forensic reconnaissance with complete operation tracking."""
# Initialize data structure
if not hasattr(self, 'data') or self.data is None:
self.data = ForensicReconData()
logger.info("🔬 Created new forensic data structure")
def run_reconnaissance(self, target: str) -> ReconData:
"""Run full reconnaissance on target."""
# Use shared data object if available, otherwise create new one
if self.shared_data is not None:
self.data = self.shared_data
logger.info("📊 Using shared data object for reconnaissance")
else:
self.data = ReconData()
logger.info("📊 Created new data object for reconnaissance")
self.data.start_time = datetime.now()
self.data.scan_config = {
'target': target,
'max_depth': self.config.max_depth,
'dns_servers': self.config.DNS_SERVERS,
'shodan_enabled': self.shodan_client is not None,
'virustotal_enabled': self.virustotal_client is not None
}
logger.info(f"🎯 Starting forensic reconnaissance for target: {target}")
logger.info(f"🚀 Starting reconnaissance for target: {target}")
logger.info(f"📊 Configuration: max_depth={self.config.max_depth}, "
f"DNS_rate={self.config.DNS_RATE_LIMIT}/s")
try:
# Determine operation mode and create initial operation
# Determine if target is hostname.tld or just hostname
if '.' in target:
self.is_hostname_only_mode = False
logger.info(f"🔍 Full domain mode: {target}")
self._create_initial_operation(target, OperationType.INITIAL_TARGET)
logger.info(f"🎯 Target '{target}' appears to be a full domain name")
self._update_progress(f"Starting reconnaissance for {target}", 0)
self.data.add_hostname(target, 0)
initial_targets = {target}
else:
self.is_hostname_only_mode = True
logger.info(f"🔍 Hostname expansion mode: {target}")
self._perform_tld_expansion(target)
logger.info(f"🔍 Target '{target}' appears to be a hostname, expanding to all TLDs")
self._update_progress(f"Expanding {target} to all TLDs", 5)
initial_targets = self._expand_hostname_to_tlds_smart(target)
logger.info(f"📋 Found {len(initial_targets)} valid domains after TLD expansion")
# Process all discovery operations
self._process_discovery_queue()
self._update_progress("Resolving initial targets", 10)
# Perform external lookups for full domain mode
if not self.is_hostname_only_mode:
# Process all targets recursively
self._process_targets_recursively(initial_targets)
# Final external lookups
self._update_progress("Performing external service lookups", 90)
self._perform_external_lookups()
# Finalize
self.data.end_time = datetime.now()
duration = self.data.end_time - self.data.start_time
# Log final statistics
stats = self.data.get_stats()
logger.info(f"🏁 Forensic reconnaissance complete in {duration}")
logger.info(f"📊 Final stats: {stats}")
logger.info(f"📈 Final statistics: {stats}")
self._update_progress("Forensic reconnaissance complete", 100)
self._update_progress("Reconnaissance complete", 100)
except Exception as e:
logger.error(f"❌ Error during forensic reconnaissance: {e}", exc_info=True)
logger.error(f"❌ Error during reconnaissance: {e}", exc_info=True)
raise
finally:
self.data.end_time = datetime.now()
duration = self.data.end_time - self.data.start_time
logger.info(f"⏱️ Total reconnaissance time: {duration}")
return self.data
def _create_initial_operation(self, target: str, operation_type: OperationType):
"""Create initial operation for the target."""
operation = DiscoveryOperation(
operation_type=operation_type,
target=target,
discovered_hostnames=[target]
)
self.data.add_operation(operation)
# Create initial node
node = self.data.add_node(
hostname=target,
depth=0,
discovery_method=DiscoveryMethod.INITIAL_TARGET,
operation_id=operation.operation_id
)
# Queue initial DNS operations
self._queue_dns_operations_for_hostname(target, 0)
logger.info(f"🔍 Created initial operation for {target}")
def _perform_tld_expansion(self, hostname: str):
"""Perform TLD expansion with forensic tracking."""
logger.info(f"🌐 Starting TLD expansion for: {hostname}")
# Create TLD expansion operation
operation = DiscoveryOperation(
operation_type=OperationType.TLD_EXPANSION,
target=hostname,
metadata={'expansion_type': 'smart_prioritized'}
)
self._update_progress(f"Expanding {hostname} to all TLDs", 5)
def _expand_hostname_to_tlds_smart(self, hostname: str) -> Set[str]:
"""Smart TLD expansion with prioritization and parallel processing."""
logger.info(f"🌐 Starting smart TLD expansion for hostname: {hostname}")
# Get prioritized TLD lists
priority_tlds, normal_tlds, deprioritized_tlds = self.tld_fetcher.get_prioritized_tlds()
logger.info(f"📊 TLD categories: {len(priority_tlds)} priority, "
f"{len(normal_tlds)} normal, {len(deprioritized_tlds)} deprioritized")
valid_domains = set()
# Phase 1: Priority TLDs
logger.info("🎯 Phase 1: Checking priority TLDs...")
priority_results = self._check_tlds_parallel(hostname, priority_tlds, operation.operation_id)
# Phase 1: Check priority TLDs first (parallel processing)
logger.info("🚀 Phase 1: Checking priority TLDs...")
priority_results = self._check_tlds_parallel(hostname, priority_tlds, "priority")
valid_domains.update(priority_results)
# Phase 2: Normal TLDs (if needed)
self._update_progress(f"Phase 1 complete: {len(priority_results)} priority TLD matches", 6)
# Phase 2: Check normal TLDs (if we found fewer than 5 results)
if len(valid_domains) < 5:
logger.info("🔍 Phase 2: Checking normal TLDs...")
normal_results = self._check_tlds_parallel(hostname, normal_tlds, operation.operation_id)
normal_results = self._check_tlds_parallel(hostname, normal_tlds, "normal")
valid_domains.update(normal_results)
# Phase 3: Deprioritized TLDs (if really needed)
self._update_progress(f"Phase 2 complete: {len(normal_results)} normal TLD matches", 8)
else:
logger.info(f"⏭️ Skipping normal TLDs (found {len(valid_domains)} matches in priority)")
# Phase 3: Check deprioritized TLDs only if we found very few results
if len(valid_domains) < 2:
logger.info("🔎 Phase 3: Checking deprioritized TLDs...")
depri_results = self._check_tlds_parallel(hostname, deprioritized_tlds, operation.operation_id)
logger.info("🔍 Phase 3: Checking deprioritized TLDs (limited results so far)...")
depri_results = self._check_tlds_parallel(hostname, deprioritized_tlds, "deprioritized")
valid_domains.update(depri_results)
# Update operation with results
operation.discovered_hostnames = list(valid_domains)
operation.success = len(valid_domains) > 0
self.data.add_operation(operation)
self._update_progress(f"Phase 3 complete: {len(depri_results)} deprioritized TLD matches", 9)
else:
logger.info(f"⏭️ Skipping deprioritized TLDs (found {len(valid_domains)} matches already)")
# Create nodes for all discovered domains
for domain in valid_domains:
node = self.data.add_node(
hostname=domain,
depth=0,
discovery_method=DiscoveryMethod.TLD_EXPANSION,
operation_id=operation.operation_id
)
logger.info(f"🎯 Smart TLD expansion complete: found {len(valid_domains)} valid domains")
return valid_domains
# Add discovery edge (synthetic source for TLD expansion)
self.data.add_edge(
source=f"tld_expansion:{hostname}",
target=domain,
discovery_method=DiscoveryMethod.TLD_EXPANSION,
operation_id=operation.operation_id,
metadata={'original_hostname': hostname}
)
logger.info(f"✅ TLD expansion complete: {len(valid_domains)} domains found")
# Queue lightweight operations for hostname-only mode
for domain in valid_domains:
self._queue_lightweight_operations(domain)
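# The three-phase cutoff logic above, condensed: later phases run only
# while earlier ones produced too few matches (fewer than 5 before the
# normal batch, fewer than 2 before the deprioritized batch). The helper
# and its check() callable are illustrative:
def expand_in_phases(hostname, priority, normal, deprioritized, check):
    found = {f"{hostname}.{t}" for t in priority if check(f"{hostname}.{t}")}
    if len(found) < 5:
        found |= {f"{hostname}.{t}" for t in normal if check(f"{hostname}.{t}")}
    if len(found) < 2:
        found |= {f"{hostname}.{t}" for t in deprioritized
                  if check(f"{hostname}.{t}")}
    return found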
def _check_tlds_parallel(self, hostname: str, tlds: List[str], operation_id: str) -> Set[str]:
"""Check TLDs in parallel with operation tracking."""
def _check_tlds_parallel(self, hostname: str, tlds: List[str], phase_name: str) -> Set[str]:
"""Check TLDs in parallel with optimized settings."""
valid_domains = set()
max_workers = min(20, len(tlds))
tested_count = 0
wildcard_detected = set()
# Use thread pool for parallel processing
max_workers = min(20, len(tlds)) # Limit concurrent requests
logger.info(f"⚡ Starting parallel check of {len(tlds)} {phase_name} TLDs "
f"with {max_workers} workers")
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_tld = {
executor.submit(self._check_single_tld_forensic, hostname, tld, operation_id): tld
executor.submit(self._check_single_tld, hostname, tld): tld
for tld in tlds
}
# Process results as they complete
for future in concurrent.futures.as_completed(future_to_tld):
tld = future_to_tld[future]
tested_count += 1
try:
result = future.result(timeout=10)
result = future.result(timeout=10) # 10 second timeout per future
if result:
domain, ips = result
valid_domains.add(domain)
full_hostname, ips = result
logger.info(f"✅ Valid domain found: {full_hostname} -> {ips}")
self.data.add_hostname(full_hostname, 0)
valid_domains.add(full_hostname)
# Track discovered IPs
for ip in ips:
self.data.ip_addresses.add(ip)
self.data.add_ip_address(ip)
# Progress update every 50 TLDs in this phase
if tested_count % 50 == 0:
logger.info(f"📊 {phase_name.title()} phase progress: "
f"{tested_count}/{len(tlds)} tested, "
f"{len(valid_domains)} found")
except concurrent.futures.TimeoutError:
logger.debug(f"⏱️ Timeout checking {hostname}.{tld}")
except Exception as e:
logger.debug(f"Error checking {hostname}.{tld}: {e}")
logger.debug(f"⚠️ Error checking {hostname}.{tld}: {e}")
logger.info(f"📊 {phase_name.title()} phase complete: "
f"tested {tested_count} TLDs, found {len(valid_domains)} valid domains, "
f"detected {len(wildcard_detected)} wildcards")
return valid_domains
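A minimal sketch of the fan-out pattern used by all three phases above, assuming a `resolve(name)` helper that returns a list of IPs or None (a stand-in for the tool's fast resolver, not its actual implementation):

import concurrent.futures

def check_tlds(hostname, tlds, resolve, max_workers=20):
    """Return the set of '<hostname>.<tld>' names that resolve."""
    found = set()
    workers = min(max_workers, max(1, len(tlds)))
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(resolve, f"{hostname}.{tld}"): tld for tld in tlds}
        for future in concurrent.futures.as_completed(futures):
            try:
                ips = future.result(timeout=10)
            except Exception:
                continue  # timeouts and resolver errors count as "no result"
            if ips:
                found.add(f"{hostname}.{futures[future]}")
    return found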
def _check_single_tld_forensic(self, hostname: str, tld: str, operation_id: str) -> Optional[Tuple[str, List[str]]]:
"""Check single TLD with forensic tracking."""
def _check_single_tld(self, hostname: str, tld: str) -> Optional[Tuple[str, List[str]]]:
"""Check a single TLD combination with optimized DNS resolution."""
full_hostname = f"{hostname}.{tld}"
# Quick DNS resolution
# Use faster DNS resolution with shorter timeout for TLD testing
ips = self.dns_resolver.resolve_hostname_fast(full_hostname)
if ips:
# Create DNS operation record
dns_operation = DiscoveryOperation(
operation_type=OperationType.DNS_A,
target=full_hostname,
discovered_ips=ips,
metadata={'tld_expansion': True, 'parent_operation': operation_id}
)
# Add DNS records
for ip in ips:
record = DNSRecord(
record_type='A',
value=ip,
operation_id=dns_operation.operation_id
)
dns_operation.dns_records.append(record)
self.data.add_operation(dns_operation)
logger.debug(f"{full_hostname} -> {ips}")
return (full_hostname, ips)
return None
def _queue_dns_operations_for_hostname(self, hostname: str, depth: int):
"""Queue comprehensive DNS operations for a hostname."""
# Map DNS record types to operation types
dns_operations = [
(OperationType.DNS_A, 'A'),
(OperationType.DNS_AAAA, 'AAAA'),
(OperationType.DNS_MX, 'MX'),
(OperationType.DNS_NS, 'NS'),
(OperationType.DNS_TXT, 'TXT'),
(OperationType.DNS_CNAME, 'CNAME'),
(OperationType.DNS_SOA, 'SOA'),
(OperationType.DNS_SRV, 'SRV'),
(OperationType.DNS_CAA, 'CAA'),
]
def _process_targets_recursively(self, targets: Set[str]):
"""Process targets with recursive subdomain discovery."""
current_depth = 0
for operation_type, record_type in dns_operations:
self.pending_operations.append({
'type': 'dns_query',
'operation_type': operation_type,
'record_type': record_type,
'hostname': hostname,
'depth': depth
})
while current_depth <= self.config.max_depth and targets:
logger.info(f"🔄 Processing depth {current_depth} with {len(targets)} targets")
self._update_progress(f"Processing depth {current_depth} ({len(targets)} targets)", 15 + (current_depth * 25))
# Queue certificate operation
self.pending_operations.append({
'type': 'certificate_check',
'operation_type': OperationType.CERTIFICATE_CHECK,
'hostname': hostname,
'depth': depth
})
new_targets = set()
logger.debug(f"📋 Queued {len(dns_operations) + 1} operations for {hostname}")
for target in targets:
logger.debug(f"🎯 Processing target: {target}")
def _queue_lightweight_operations(self, hostname: str):
"""Queue lightweight operations for hostname-only mode."""
# Only essential DNS operations
essential_operations = [
(OperationType.DNS_A, 'A'),
(OperationType.DNS_AAAA, 'AAAA'),
(OperationType.DNS_MX, 'MX'),
(OperationType.DNS_TXT, 'TXT'),
]
# DNS resolution and record gathering
self._process_single_target(target, current_depth)
for operation_type, record_type in essential_operations:
self.pending_operations.append({
'type': 'dns_query',
'operation_type': operation_type,
'record_type': record_type,
'hostname': hostname,
'depth': 0
})
# Extract new subdomains
if current_depth < self.config.max_depth:
new_subdomains = self._extract_new_subdomains(target)
logger.debug(f"🌿 Found {len(new_subdomains)} new subdomains from {target}")
logger.debug(f"📋 Queued {len(essential_operations)} lightweight operations for {hostname}")
for subdomain in new_subdomains:
self.data.add_hostname(subdomain, current_depth + 1)
new_targets.add(subdomain)
def _process_discovery_queue(self):
"""Process all queued discovery operations."""
operation_count = 0
total_operations = len(self.pending_operations)
logger.info(f"📊 Depth {current_depth} complete. Found {len(new_targets)} new targets for next depth")
targets = new_targets
current_depth += 1
logger.info(f"⚙️ Processing {total_operations} operations")
logger.info(f"🏁 Recursive processing complete after {current_depth} levels")
while self.pending_operations:
operation_spec = self.pending_operations.pop(0)
operation_count += 1
def _process_single_target(self, hostname: str, depth: int):
"""Process a single target hostname."""
logger.debug(f"🎯 Processing single target: {hostname} at depth {depth}")
# Calculate progress
progress = 20 + (operation_count * 60 // max(total_operations, 1))
# Get all DNS records
dns_records = self.dns_resolver.get_all_dns_records(hostname)
logger.debug(f"📋 Found {len(dns_records)} DNS records for {hostname}")
try:
if operation_spec['type'] == 'dns_query':
self._execute_dns_operation(operation_spec)
elif operation_spec['type'] == 'certificate_check':
self._execute_certificate_operation(operation_spec)
for record in dns_records:
self.data.add_dns_record(hostname, record)
# Update progress periodically
if operation_count % 10 == 0:
self._update_progress(f"Processed {operation_count}/{total_operations} operations", progress)
except Exception as e:
logger.error(f"❌ Error processing operation {operation_spec}: {e}")
continue
logger.info(f"✅ Completed processing {operation_count} operations")
def _execute_dns_operation(self, operation_spec: Dict[str, Any]):
"""Execute a single DNS operation with full tracking."""
hostname = operation_spec['hostname']
record_type = operation_spec['record_type']
operation_type = operation_spec['operation_type']
depth = operation_spec['depth']
logger.debug(f"🔍 DNS {record_type} query for {hostname}")
# Create operation
operation = DiscoveryOperation(
operation_type=operation_type,
target=hostname,
metadata={'record_type': record_type, 'depth': depth}
)
try:
# Perform DNS query using the updated method with operation_id
records = self.dns_resolver.query_specific_record_type(hostname, record_type, operation.operation_id)
operation.dns_records = records
operation.success = len(records) > 0
# Process results
node = self.data.get_node(hostname)
if not node:
node = self.data.add_node(hostname, depth)
new_hostnames = set()
new_ips = set()
for record in records:
node.add_dns_record(record)
# Extract IPs and hostnames
# Extract IP addresses from A and AAAA records
if record.record_type in ['A', 'AAAA']:
new_ips.add(record.value)
self.data.ip_addresses.add(record.value)
elif record.record_type in ['MX', 'NS', 'CNAME']:
# Extract hostname from MX (format: "priority hostname")
if record.record_type == 'MX':
parts = record.value.split()
if len(parts) >= 2:
extracted_hostname = parts[-1].rstrip('.')
self.data.add_ip_address(record.value)
# Get certificates
logger.debug(f"🔍 Checking certificates for {hostname}")
certificates = self.cert_checker.get_certificates(hostname)
if certificates:
self.data.certificates[hostname] = certificates
logger.info(f"📜 Found {len(certificates)} certificates for {hostname}")
else:
continue
else:
extracted_hostname = record.value.rstrip('.')
logger.debug(f"❌ No certificates found for {hostname}")
if self._is_valid_hostname(extracted_hostname) and extracted_hostname != hostname:
new_hostnames.add(extracted_hostname)
def _extract_new_subdomains(self, hostname: str) -> Set[str]:
"""Extract new subdomains from DNS records and certificates."""
new_subdomains = set()
# Update operation results
operation.discovered_hostnames = list(new_hostnames)
operation.discovered_ips = list(new_ips)
# Add discovery edges and queue new operations
for new_hostname in new_hostnames:
# Add node
new_node = self.data.add_node(
hostname=new_hostname,
depth=depth + 1,
discovery_method=DiscoveryMethod.DNS_RECORD_VALUE,
operation_id=operation.operation_id
# From DNS records
if hostname in self.data.dns_records:
dns_subdomains = self.dns_resolver.extract_subdomains_from_dns(
self.data.dns_records[hostname]
)
new_subdomains.update(dns_subdomains)
logger.debug(f"🌐 Extracted {len(dns_subdomains)} subdomains from DNS records of {hostname}")
# Add edge
self.data.add_edge(
source=hostname,
target=new_hostname,
discovery_method=DiscoveryMethod.DNS_RECORD_VALUE,
operation_id=operation.operation_id,
metadata={'record_type': record_type}
# From certificates
if hostname in self.data.certificates:
cert_subdomains = self.cert_checker.extract_subdomains_from_certificates(
self.data.certificates[hostname]
)
new_subdomains.update(cert_subdomains)
logger.debug(f"🔍 Extracted {len(cert_subdomains)} subdomains from certificates of {hostname}")
# Queue operations for new hostname (if not in hostname-only mode and within depth)
if not self.is_hostname_only_mode and depth + 1 <= self.config.max_depth:
self._queue_dns_operations_for_hostname(new_hostname, depth + 1)
# Filter out already known hostnames
filtered_subdomains = new_subdomains - self.data.hostnames
logger.debug(f"🆕 {len(filtered_subdomains)} new subdomains after filtering")
logger.debug(f"✅ DNS {record_type} for {hostname}: {len(records)} records, {len(new_hostnames)} new hostnames")
except Exception as e:
operation.success = False
operation.error_message = str(e)
logger.debug(f"❌ DNS {record_type} query failed for {hostname}: {e}")
self.data.add_operation(operation)
def _execute_certificate_operation(self, operation_spec: Dict[str, Any]):
"""Execute certificate check operation."""
hostname = operation_spec['hostname']
depth = operation_spec['depth']
# Skip certificates in hostname-only mode
if self.is_hostname_only_mode:
logger.debug(f"⭐ Skipping certificate check for {hostname} (hostname-only mode)")
return
logger.debug(f"🔍 Certificate check for {hostname}")
operation = DiscoveryOperation(
operation_type=OperationType.CERTIFICATE_CHECK,
target=hostname,
metadata={'depth': depth}
)
try:
# Use updated method with operation_id
certificates = self.cert_checker.get_certificates(hostname, operation.operation_id)
operation.certificates = certificates
operation.success = len(certificates) > 0
# Process certificates
node = self.data.get_node(hostname)
if node:
new_hostnames = set()
for cert in certificates:
node.add_certificate(cert)
# Extract hostnames from certificate subjects
subject_hostnames = self.cert_checker.extract_subdomains_from_certificates([cert])
for subject_hostname in subject_hostnames:
if subject_hostname != hostname and self._is_valid_hostname(subject_hostname):
new_hostnames.add(subject_hostname)
# Update operation and create edges
operation.discovered_hostnames = list(new_hostnames)
for new_hostname in new_hostnames:
# Add node
new_node = self.data.add_node(
hostname=new_hostname,
depth=depth + 1,
discovery_method=DiscoveryMethod.CERTIFICATE_SUBJECT,
operation_id=operation.operation_id
)
# Add edge
self.data.add_edge(
source=hostname,
target=new_hostname,
discovery_method=DiscoveryMethod.CERTIFICATE_SUBJECT,
operation_id=operation.operation_id
)
# Queue operations for new hostname (within depth limit)
if depth + 1 <= self.config.max_depth:
self._queue_dns_operations_for_hostname(new_hostname, depth + 1)
logger.debug(f"✅ Certificates for {hostname}: {len(certificates)} certs, {len(new_hostnames)} new hostnames")
except Exception as e:
operation.success = False
operation.error_message = str(e)
logger.debug(f"❌ Certificate check failed for {hostname}: {e}")
self.data.add_operation(operation)
return filtered_subdomains
def _perform_external_lookups(self):
"""Perform external service lookups with operation tracking."""
if self.is_hostname_only_mode:
logger.info("⭐ Skipping external lookups (hostname-only mode)")
return
"""Perform Shodan and VirusTotal lookups."""
logger.info(f"🔍 Starting external lookups for {len(self.data.ip_addresses)} IPs and {len(self.data.hostnames)} hostnames")
self._update_progress("Performing external service lookups", 85)
# Reverse DNS for all IPs
logger.info("🔄 Performing reverse DNS lookups")
reverse_dns_count = 0
for ip in self.data.ip_addresses:
reverse = self.dns_resolver.reverse_dns_lookup(ip)
if reverse:
self.data.reverse_dns[ip] = reverse
reverse_dns_count += 1
logger.debug(f"🔙 Reverse DNS for {ip}: {reverse}")
# Reverse DNS
self._perform_reverse_dns_lookups()
logger.info(f"✅ Completed reverse DNS: {reverse_dns_count}/{len(self.data.ip_addresses)} successful")
# Shodan lookups
if self.shodan_client:
self._perform_shodan_lookups()
logger.info(f"🕵️ Starting Shodan lookups for {len(self.data.ip_addresses)} IPs")
shodan_success_count = 0
for ip in self.data.ip_addresses:
try:
logger.debug(f"🔍 Querying Shodan for IP: {ip}")
result = self.shodan_client.lookup_ip(ip)
if result:
self.data.add_shodan_result(ip, result)
shodan_success_count += 1
logger.info(f"✅ Shodan result for {ip}: {len(result.ports)} ports")
else:
logger.debug(f"❌ No Shodan data for {ip}")
except Exception as e:
logger.warning(f"⚠️ Error querying Shodan for {ip}: {e}")
logger.info(f"✅ Shodan lookups complete: {shodan_success_count}/{len(self.data.ip_addresses)} successful")
else:
logger.info("⚠️ Skipping Shodan lookups (no API key)")
# VirusTotal lookups
if self.virustotal_client:
self._perform_virustotal_lookups()
def _perform_reverse_dns_lookups(self):
"""Perform reverse DNS lookups with operation tracking."""
logger.info(f"🔄 Performing reverse DNS for {len(self.data.ip_addresses)} IPs")
for ip in self.data.ip_addresses:
operation = DiscoveryOperation(
operation_type=OperationType.DNS_REVERSE,
target=ip
)
try:
# Use updated method with operation_id
reverse_hostname = self.dns_resolver.reverse_dns_lookup(ip, operation.operation_id)
if reverse_hostname:
operation.discovered_hostnames = [reverse_hostname]
operation.success = True
# Update nodes that have this IP
for node in self.data.nodes.values():
if ip in node.resolved_ips:
node.reverse_dns = reverse_hostname
logger.debug(f"🔄 Reverse DNS {ip} -> {reverse_hostname}")
else:
operation.success = False
except Exception as e:
operation.success = False
operation.error_message = str(e)
self.data.add_operation(operation)
def _perform_shodan_lookups(self):
"""Perform Shodan lookups with operation tracking."""
logger.info(f"🕵️ Performing Shodan lookups for {len(self.data.ip_addresses)} IPs")
for ip in self.data.ip_addresses:
operation = DiscoveryOperation(
operation_type=OperationType.SHODAN_LOOKUP,
target=ip
)
try:
# Use updated method with operation_id
result = self.shodan_client.lookup_ip(ip, operation.operation_id)
if result:
operation.shodan_results = [result]
operation.success = True
# Add to relevant nodes
for node in self.data.nodes.values():
if ip in node.resolved_ips:
node.shodan_results.append(result)
logger.debug(f"🕵️ Shodan {ip}: {len(result.ports)} ports")
else:
operation.success = False
except Exception as e:
operation.success = False
operation.error_message = str(e)
self.data.add_operation(operation)
def _perform_virustotal_lookups(self):
"""Perform VirusTotal lookups with operation tracking."""
total_resources = len(self.data.ip_addresses) + len(self.data.nodes)
logger.info(f"🛡️ Performing VirusTotal lookups for {total_resources} resources")
total_resources = len(self.data.ip_addresses) + len(self.data.hostnames)
logger.info(f"🛡️ Starting VirusTotal lookups for {total_resources} resources")
vt_success_count = 0
# Check IPs
for ip in self.data.ip_addresses:
operation = DiscoveryOperation(
operation_type=OperationType.VIRUSTOTAL_IP,
target=ip
)
try:
# Use updated method with operation_id
result = self.virustotal_client.lookup_ip(ip, operation.operation_id)
logger.debug(f"🔍 Querying VirusTotal for IP: {ip}")
result = self.virustotal_client.lookup_ip(ip)
if result:
operation.virustotal_results = [result]
operation.success = True
# Add to relevant nodes
for node in self.data.nodes.values():
if ip in node.resolved_ips:
node.virustotal_results.append(result)
logger.debug(f"🛡️ VirusTotal {ip}: {result.positives}/{result.total}")
self.data.add_virustotal_result(ip, result)
vt_success_count += 1
logger.info(f"🛡️ VirusTotal result for {ip}: {result.positives}/{result.total} detections")
else:
operation.success = False
logger.debug(f"❌ No VirusTotal data for {ip}")
except Exception as e:
operation.success = False
operation.error_message = str(e)
self.data.add_operation(operation)
logger.warning(f"⚠️ Error querying VirusTotal for IP {ip}: {e}")
# Check domains
for hostname, node in self.data.nodes.items():
operation = DiscoveryOperation(
operation_type=OperationType.VIRUSTOTAL_DOMAIN,
target=hostname
)
for hostname in self.data.hostnames:
try:
# Use updated method with operation_id
result = self.virustotal_client.lookup_domain(hostname, operation.operation_id)
logger.debug(f"🔍 Querying VirusTotal for domain: {hostname}")
result = self.virustotal_client.lookup_domain(hostname)
if result:
operation.virustotal_results = [result]
operation.success = True
node.virustotal_results.append(result)
logger.debug(f"🛡️ VirusTotal {hostname}: {result.positives}/{result.total}")
self.data.add_virustotal_result(hostname, result)
vt_success_count += 1
logger.info(f"🛡️ VirusTotal result for {hostname}: {result.positives}/{result.total} detections")
else:
operation.success = False
logger.debug(f"❌ No VirusTotal data for {hostname}")
except Exception as e:
operation.success = False
operation.error_message = str(e)
logger.warning(f"⚠️ Error querying VirusTotal for domain {hostname}: {e}")
self.data.add_operation(operation)
logger.info(f"✅ VirusTotal lookups complete: {vt_success_count}/{total_resources} successful")
else:
logger.info("⚠️ Skipping VirusTotal lookups (no API key)")
def _is_valid_hostname(self, hostname: str) -> bool:
"""Validate hostname format."""
if not hostname or '.' not in hostname or len(hostname) > 255:
return False
# Final external lookup summary
ext_stats = {
'reverse_dns': len(self.data.reverse_dns),
'shodan_results': len(self.data.shodan_results),
'virustotal_results': len(self.data.virustotal_results)
}
logger.info(f"📊 External lookups summary: {ext_stats}")
# Basic validation
parts = hostname.split('.')
if len(parts) < 2:
return False
for part in parts:
if not part or len(part) > 63:
return False
return True
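# Illustrative cases for the validator above (expected outcomes, not from the source):
#   "example.com"       -> True   (two labels, each 1-63 chars)
#   "mail.example.com"  -> True
#   "localhost"         -> False  (no dot)
#   "a" * 64 + ".com"   -> False  (first label exceeds 63 chars)
#   "x" * 300           -> False  (longer than 255 chars, and no dot)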
# Backward compatibility
ReconnaissanceEngine = ForensicReconnaissanceEngine
# Keep the original method name for backward compatibility
def _expand_hostname_to_tlds(self, hostname: str) -> Set[str]:
"""Legacy method - redirects to smart expansion."""
return self._expand_hostname_to_tlds_smart(hostname)
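End to end, embedding the engine looks roughly like this; a sketch assuming a default-constructible Config, not a verbatim entry point from this repository:

from src.config import Config
from src.reconnaissance import ReconnaissanceEngine  # alias defined above

config = Config()  # assumed defaults; max_depth and API keys optional
engine = ReconnaissanceEngine(config)
engine.set_progress_callback(lambda msg, pct=None: print(pct, msg))
data = engine.run_reconnaissance("example.com")
print(data.get_stats())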

View File

@ -1,451 +1,111 @@
# File: src/report_generator.py
"""Enhanced report generation with forensic details and discovery graph visualization."""
"""Generate reports from reconnaissance data."""
from datetime import datetime
from typing import Dict, Any, List, Set
from .data_structures import ForensicReconData, DiscoveryMethod, OperationType
import logging
from typing import Dict, Any
from .data_structures import ReconData
logger = logging.getLogger(__name__)
class ReportGenerator:
"""Generate various report formats."""
class ForensicReportGenerator:
"""Generate comprehensive forensic reports with discovery provenance."""
def __init__(self, data: ForensicReconData):
def __init__(self, data: ReconData):
self.data = data
def generate_text_report(self) -> str:
"""Generate comprehensive forensic text report."""
"""Generate comprehensive text report."""
report = []
# Header
report.append("="*80)
report.append("FORENSIC DNS RECONNAISSANCE REPORT")
report.append("DNS RECONNAISSANCE REPORT")
report.append("="*80)
report.append(f"Scan Start: {self.data.start_time}")
report.append(f"Start Time: {self.data.start_time}")
report.append(f"End Time: {self.data.end_time}")
if self.data.end_time:
report.append(f"Scan End: {self.data.end_time}")
duration = self.data.end_time - self.data.start_time
report.append(f"Duration: {duration}")
report.append(f"Target: {self.data.scan_config.get('target', 'Unknown')}")
report.append(f"Max Depth: {self.data.scan_config.get('max_depth', 'Unknown')}")
report.append("")
# Executive Summary
report.append("EXECUTIVE SUMMARY")
# Summary
report.append("SUMMARY")
report.append("-" * 40)
stats = self.data.get_stats()
report.append(f"Discovered Hostnames: {stats['hostnames']}")
report.append(f"IP Addresses Found: {stats['ip_addresses']}")
report.append(f"Operations Performed: {stats['operations_performed']}")
report.append(f"Discovery Relationships: {stats['discovery_edges']}")
report.append(f"DNS Records Collected: {stats['dns_records']}")
report.append(f"Total Certificates: {stats['certificates_total']}")
report.append(f" └─ Currently Valid: {stats['certificates_current']}")
report.append(f" └─ Expired: {stats['certificates_expired']}")
report.append(f"Shodan Results: {stats['shodan_results']}")
report.append(f"VirusTotal Results: {stats['virustotal_results']}")
report.append(f"Total Hostnames Discovered: {len(self.data.hostnames)}")
report.append(f"Total IP Addresses Found: {len(self.data.ip_addresses)}")
report.append(f"Total DNS Records: {sum(len(records) for records in self.data.dns_records.values())}")
report.append(f"Total Certificates Found: {sum(len(certs) for certs in self.data.certificates.values())}")
report.append("")
# Discovery Graph Analysis
graph_analysis = self.data._generate_graph_analysis()
report.append("DISCOVERY GRAPH ANALYSIS")
# Hostnames by depth
report.append("HOSTNAMES BY DISCOVERY DEPTH")
report.append("-" * 40)
report.append(f"Maximum Discovery Depth: {graph_analysis['max_depth']}")
report.append(f"Root Nodes (Initial Targets): {len(graph_analysis['root_nodes'])}")
report.append(f"Leaf Nodes (No Further Discoveries): {len(graph_analysis['leaf_nodes'])}")
depth_groups = {}
for hostname, depth in self.data.depth_map.items():
if depth not in depth_groups:
depth_groups[depth] = []
depth_groups[depth].append(hostname)
for depth in sorted(depth_groups.keys()):
report.append(f"Depth {depth}: {len(depth_groups[depth])} hostnames")
for hostname in sorted(depth_groups[depth]):
report.append(f" - {hostname}")
report.append("")
# Depth Distribution
report.append("Discovery Depth Distribution:")
for depth, count in sorted(graph_analysis['depth_distribution'].items()):
report.append(f" Depth {depth}: {count} hostnames")
report.append("")
# Discovery Methods Distribution
report.append("Discovery Methods Used:")
for method, count in sorted(graph_analysis['discovery_method_distribution'].items()):
report.append(f" {method}: {count} discoveries")
report.append("")
# Discovery Tree
report.append("DISCOVERY TREE")
# IP Addresses
report.append("IP ADDRESSES")
report.append("-" * 40)
report.extend(self._generate_discovery_tree())
for ip in sorted(self.data.ip_addresses):
report.append(f"{ip}")
if ip in self.data.reverse_dns:
report.append(f" Reverse DNS: {self.data.reverse_dns[ip]}")
if ip in self.data.shodan_results:
shodan = self.data.shodan_results[ip]
report.append(f" Shodan: {len(shodan.ports)} open ports")
if shodan.organization:
report.append(f" Organization: {shodan.organization}")
if shodan.country:
report.append(f" Country: {shodan.country}")
report.append("")
# Detailed Node Analysis
report.append("DETAILED NODE ANALYSIS")
# DNS Records
report.append("DNS RECORDS")
report.append("-" * 40)
report.extend(self._generate_node_details())
for hostname in sorted(self.data.dns_records.keys()):
report.append(f"{hostname}:")
records_by_type = {}
for record in self.data.dns_records[hostname]:
if record.record_type not in records_by_type:
records_by_type[record.record_type] = []
records_by_type[record.record_type].append(record)
for record_type in sorted(records_by_type.keys()):
report.append(f" {record_type}:")
for record in records_by_type[record_type]:
report.append(f" {record.value}")
report.append("")
# Operations Timeline
report.append("OPERATIONS TIMELINE")
# Certificates
if self.data.certificates:
report.append("CERTIFICATES")
report.append("-" * 40)
report.extend(self._generate_operations_timeline())
for hostname in sorted(self.data.certificates.keys()):
report.append(f"{hostname}:")
for cert in self.data.certificates[hostname]:
report.append(f" Certificate ID: {cert.id}")
report.append(f" Issuer: {cert.issuer}")
report.append(f" Valid From: {cert.not_before}")
report.append(f" Valid Until: {cert.not_after}")
if cert.is_wildcard:
report.append(f" Type: Wildcard Certificate")
report.append("")
# Security Analysis
security_findings = self._analyze_security_findings()
if security_findings:
if self.data.virustotal_results:
report.append("SECURITY ANALYSIS")
report.append("-" * 40)
report.extend(security_findings)
report.append("")
# Certificate Analysis
cert_analysis = self._analyze_certificates()
if cert_analysis:
report.append("CERTIFICATE ANALYSIS")
report.append("-" * 40)
report.extend(cert_analysis)
report.append("")
# DNS Record Analysis
report.append("DNS RECORD ANALYSIS")
report.append("-" * 40)
report.extend(self._analyze_dns_records())
report.append("")
for resource, result in self.data.virustotal_results.items():
if result.positives > 0:
report.append(f"⚠️ {resource}: {result.positives}/{result.total} detections")
report.append(f" Scan Date: {result.scan_date}")
report.append(f" Report: {result.permalink}")
return "\n".join(report)
def _generate_discovery_tree(self) -> List[str]:
"""Generate a tree view of hostname discoveries."""
tree_lines = []
# Find root nodes
graph_analysis = self.data._generate_graph_analysis()
root_nodes = graph_analysis['root_nodes']
if not root_nodes:
tree_lines.append("No root nodes found")
return tree_lines
# Generate tree for each root
for root in sorted(root_nodes):
tree_lines.extend(self._build_tree_branch(root, "", set()))
return tree_lines
def _build_tree_branch(self, hostname: str, prefix: str, visited: Set[str]) -> List[str]:
"""Build a tree branch for a hostname."""
lines = []
# Avoid cycles
if hostname in visited:
lines.append(f"{prefix}{hostname} [CYCLE]")
return lines
visited.add(hostname)
# Get node info
node = self.data.get_node(hostname)
if not node:
lines.append(f"{prefix}{hostname} [NO NODE DATA]")
return lines
# Node info
node_info = f"{hostname} (depth:{node.depth}"
if node.resolved_ips:
node_info += f", IPs:{len(node.resolved_ips)}"
if node.certificates:
valid_certs = len(node.get_current_certificates())
expired_certs = len(node.get_expired_certificates())
node_info += f", certs:{valid_certs}+{expired_certs}"
node_info += ")"
lines.append(f"{prefix}{node_info}")
# Get children
children = self.data.get_children(hostname)
children.sort()
for i, child in enumerate(children):
is_last = (i == len(children) - 1)
child_prefix = prefix + ("└── " if is_last else "├── ")
next_prefix = prefix + (" " if is_last else "")
# Find discovery method for this child
discovery_method = "unknown"
for edge in self.data.edges:
if edge.source_hostname == hostname and edge.target_hostname == child:
discovery_method = edge.discovery_method.value
break
lines.append(f"{child_prefix}[{discovery_method}]")
lines.extend(self._build_tree_branch(child, next_prefix, visited.copy()))
return lines
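With the prefixes above, a root with two children renders along these lines (hostnames invented for illustration):

example.com (depth:0, IPs:2, certs:3+1)
├── [dns_record_value]
│   mail.example.com (depth:1, IPs:1)
└── [certificate_subject]
    www.example.com (depth:1, certs:1+0)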
def _generate_node_details(self) -> List[str]:
"""Generate detailed analysis of each node."""
details = []
# Sort nodes by depth, then alphabetically
sorted_nodes = sorted(self.data.nodes.items(),
key=lambda x: (x[1].depth, x[0]))
for hostname, node in sorted_nodes:
details.append(f"\n{hostname} (Depth {node.depth})")
details.append("-" * (len(hostname) + 20))
# Discovery provenance
details.append(f"First Seen: {node.first_seen}")
details.append(f"Last Updated: {node.last_updated}")
details.append(f"Discovery Methods: {', '.join(m.value for m in node.discovery_methods)}")
# Discovery paths
paths = self.data.get_discovery_path(hostname)
if paths:
details.append("Discovery Paths:")
for i, path in enumerate(paths[:3]): # Show max 3 paths
path_str = " -> ".join([f"{src} -[{method.value}]-> {tgt}" for src, tgt, method in path])
details.append(f" Path {i+1}: {path_str}")
if len(paths) > 3:
details.append(f" ... and {len(paths) - 3} more paths")
# DNS status
if node.dns_exists is not None:
status = "EXISTS" if node.dns_exists else "NOT FOUND"
details.append(f"DNS Status: {status} (checked: {node.last_dns_check})")
# IP addresses
if node.resolved_ips:
details.append(f"Resolved IPs: {', '.join(sorted(node.resolved_ips))}")
# Reverse DNS
if node.reverse_dns:
details.append(f"Reverse DNS: {node.reverse_dns}")
# DNS records summary
total_records = len(node.get_all_dns_records())
if total_records > 0:
record_types = list(node.dns_records_by_type.keys())
details.append(f"DNS Records: {total_records} records ({', '.join(sorted(record_types))})")
# Certificates summary
current_certs = len(node.get_current_certificates())
expired_certs = len(node.get_expired_certificates())
if current_certs > 0 or expired_certs > 0:
details.append(f"Certificates: {current_certs} valid, {expired_certs} expired")
# External results
if node.shodan_results:
details.append(f"Shodan: {len(node.shodan_results)} results")
if node.virustotal_results:
vt_detections = sum(r.positives for r in node.virustotal_results)
details.append(f"VirusTotal: {len(node.virustotal_results)} scans, {vt_detections} total detections")
return details
def _generate_operations_timeline(self) -> List[str]:
"""Generate operations timeline."""
timeline = []
# Sort operations by timestamp
sorted_ops = []
for op_id in self.data.operation_timeline:
if op_id in self.data.operations:
sorted_ops.append(self.data.operations[op_id])
# Group operations by type for summary
op_summary = {}
for op in sorted_ops:
op_type = op.operation_type.value
if op_type not in op_summary:
op_summary[op_type] = {'total': 0, 'successful': 0, 'failed': 0}
op_summary[op_type]['total'] += 1
if op.success:
op_summary[op_type]['successful'] += 1
else:
op_summary[op_type]['failed'] += 1
# Operations summary
timeline.append("Operations Summary:")
for op_type, counts in sorted(op_summary.items()):
timeline.append(f" {op_type}: {counts['successful']}/{counts['total']} successful")
timeline.append("")
# Recent operations (last 20)
timeline.append("Recent Operations (last 20):")
recent_ops = sorted_ops[-20:] if len(sorted_ops) > 20 else sorted_ops
for op in recent_ops:
timestamp = op.timestamp.strftime("%H:%M:%S.%f")[:-3]
status = "✅" if op.success else "❌"
target_short = op.target[:30] + "..." if len(op.target) > 30 else op.target
timeline.append(f" {timestamp} {status} {op.operation_type.value:15} {target_short}")
# Show key results
if op.discovered_hostnames:
hostname_list = ", ".join(op.discovered_hostnames[:3])
if len(op.discovered_hostnames) > 3:
hostname_list += f" (+{len(op.discovered_hostnames) - 3} more)"
timeline.append(f" └─ Discovered: {hostname_list}")
if op.error_message:
timeline.append(f" └─ Error: {op.error_message[:50]}...")
return timeline
def _analyze_security_findings(self) -> List[str]:
"""Analyze security-related findings."""
findings = []
# VirusTotal detections
high_risk_resources = []
medium_risk_resources = []
for node in self.data.nodes.values():
for vt_result in node.virustotal_results:
if vt_result.positives > 5:
high_risk_resources.append((node.hostname, vt_result))
elif vt_result.positives > 0:
medium_risk_resources.append((node.hostname, vt_result))
if high_risk_resources:
findings.append("🚨 HIGH RISK FINDINGS:")
for hostname, vt_result in high_risk_resources:
findings.append(f" {hostname}: {vt_result.positives}/{vt_result.total} detections")
findings.append(f" Report: {vt_result.permalink}")
if medium_risk_resources:
findings.append("⚠️ MEDIUM RISK FINDINGS:")
for hostname, vt_result in medium_risk_resources[:5]: # Show max 5
findings.append(f" {hostname}: {vt_result.positives}/{vt_result.total} detections")
if len(medium_risk_resources) > 5:
findings.append(f" ... and {len(medium_risk_resources) - 5} more resources with detections")
# Expired certificates still in use
nodes_with_expired_certs = []
for hostname, node in self.data.nodes.items():
expired = node.get_expired_certificates()
current = node.get_current_certificates()
if expired and not current: # Only expired certs, no valid ones
nodes_with_expired_certs.append((hostname, len(expired)))
if nodes_with_expired_certs:
findings.append("📜 CERTIFICATE ISSUES:")
for hostname, count in nodes_with_expired_certs:
findings.append(f" {hostname}: {count} expired certificates, no valid ones")
return findings
def _analyze_certificates(self) -> List[str]:
"""Analyze certificate findings."""
cert_analysis = []
# Certificate statistics
total_certs = 0
valid_certs = 0
expired_certs = 0
wildcard_certs = 0
cert_authorities = {}
for node in self.data.nodes.values():
for cert in node.certificates:
total_certs += 1
if cert.is_valid_now:
valid_certs += 1
else:
expired_certs += 1
if cert.is_wildcard:
wildcard_certs += 1
# Count certificate authorities
issuer_short = cert.issuer.split(',')[0] if ',' in cert.issuer else cert.issuer
cert_authorities[issuer_short] = cert_authorities.get(issuer_short, 0) + 1
if total_certs == 0:
cert_analysis.append("No certificates found.")
return cert_analysis
cert_analysis.append(f"Total Certificates: {total_certs}")
cert_analysis.append(f" Currently Valid: {valid_certs}")
cert_analysis.append(f" Expired: {expired_certs}")
cert_analysis.append(f" Wildcard Certificates: {wildcard_certs}")
cert_analysis.append("")
# Top certificate authorities
cert_analysis.append("Certificate Authorities:")
sorted_cas = sorted(cert_authorities.items(), key=lambda x: x[1], reverse=True)
for ca, count in sorted_cas[:5]:
cert_analysis.append(f" {ca}: {count} certificates")
cert_analysis.append("")
# Expiring soon (within 30 days)
from datetime import timedelta
soon = datetime.now() + timedelta(days=30)
expiring_soon = []
for hostname, node in self.data.nodes.items():
for cert in node.get_current_certificates():
if cert.not_after <= soon:
expiring_soon.append((hostname, cert.not_after, cert.id))
if expiring_soon:
cert_analysis.append("Certificates Expiring Soon (within 30 days):")
for hostname, expiry, cert_id in sorted(expiring_soon, key=lambda x: x[1]):
cert_analysis.append(f" {hostname}: expires {expiry.strftime('%Y-%m-%d')} (cert ID: {cert_id})")
return cert_analysis
def _analyze_dns_records(self) -> List[str]:
"""Analyze DNS record patterns."""
dns_analysis = []
# Record type distribution
record_type_counts = {}
total_records = 0
for node in self.data.nodes.values():
for record_type, records in node.dns_records_by_type.items():
record_type_counts[record_type] = record_type_counts.get(record_type, 0) + len(records)
total_records += len(records)
dns_analysis.append(f"Total DNS Records: {total_records}")
dns_analysis.append("Record Type Distribution:")
for record_type, count in sorted(record_type_counts.items()):
percentage = (count / total_records * 100) if total_records > 0 else 0
dns_analysis.append(f" {record_type}: {count} ({percentage:.1f}%)")
dns_analysis.append("")
# Interesting findings
interesting = []
# Multiple MX records
multi_mx_nodes = []
for hostname, node in self.data.nodes.items():
mx_records = node.dns_records_by_type.get('MX', [])
if len(mx_records) > 1:
multi_mx_nodes.append((hostname, len(mx_records)))
if multi_mx_nodes:
interesting.append("Multiple MX Records:")
for hostname, count in multi_mx_nodes:
interesting.append(f" {hostname}: {count} MX records")
# CAA records (security-relevant)
caa_nodes = []
for hostname, node in self.data.nodes.items():
if 'CAA' in node.dns_records_by_type:
caa_nodes.append(hostname)
if caa_nodes:
interesting.append(f"Domains with CAA Records: {len(caa_nodes)}")
for hostname in caa_nodes[:5]: # Show first 5
interesting.append(f" {hostname}")
if interesting:
dns_analysis.append("Interesting DNS Findings:")
dns_analysis.extend(interesting)
return dns_analysis
# Backward compatibility
ReportGenerator = ForensicReportGenerator
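Driving either generator name is a two-liner; a sketch assuming `recon_data` holds the populated data object from a completed scan:

from src.report_generator import ReportGenerator

report_gen = ReportGenerator(recon_data)
with open("recon_report.txt", "w") as fh:
    fh.write(report_gen.generate_text_report())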

View File

@ -1,10 +1,9 @@
# File: src/shodan_client.py
"""Shodan API integration with forensic operation tracking."""
"""Shodan API integration."""
import requests
import time
import logging
import uuid
from typing import Optional, Dict, Any, List
from .data_structures import ShodanResult
from .config import Config
@ -13,7 +12,7 @@ from .config import Config
logger = logging.getLogger(__name__)
class ShodanClient:
"""Shodan API client with forensic tracking."""
"""Shodan API client."""
BASE_URL = "https://api.shodan.io"
@ -37,15 +36,11 @@ class ShodanClient:
self.last_request = time.time()
def lookup_ip(self, ip: str, operation_id: Optional[str] = None) -> Optional[ShodanResult]:
"""Lookup IP address information with forensic tracking."""
def lookup_ip(self, ip: str) -> Optional[ShodanResult]:
"""Lookup IP address information."""
self._rate_limit()
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔍 Querying Shodan for IP: {ip} (operation: {operation_id})")
logger.debug(f"🔍 Querying Shodan for IP: {ip}")
try:
url = f"{self.BASE_URL}/shodan/host/{ip}"
@ -76,14 +71,12 @@ class ShodanClient:
'banner': service.get('data', '').strip()[:200] if service.get('data') else ''
}
# Create ShodanResult with forensic metadata
result = ShodanResult(
ip=ip,
ports=sorted(list(set(ports))),
services=services,
organization=data.get('org'),
country=data.get('country_name'),
operation_id=operation_id # Forensic tracking
country=data.get('country_name')
)
logger.info(f"✅ Shodan result for {ip}: {len(result.ports)} ports, org: {result.organization}")
@ -117,15 +110,11 @@ class ShodanClient:
logger.error(f"❌ Unexpected error querying Shodan for {ip}: {e}")
return None
def search_domain(self, domain: str, operation_id: Optional[str] = None) -> List[str]:
"""Search for IPs associated with a domain with forensic tracking."""
def search_domain(self, domain: str) -> List[str]:
"""Search for IPs associated with a domain."""
self._rate_limit()
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔍 Searching Shodan for domain: {domain} (operation: {operation_id})")
logger.debug(f"🔍 Searching Shodan for domain: {domain}")
try:
url = f"{self.BASE_URL}/shodan/host/search"

View File

@ -1,10 +1,9 @@
# File: src/virustotal_client.py
"""VirusTotal API integration with forensic operation tracking."""
"""VirusTotal API integration."""
import requests
import time
import logging
import uuid
from datetime import datetime
from typing import Optional
from .data_structures import VirusTotalResult
@ -14,7 +13,7 @@ from .config import Config
logger = logging.getLogger(__name__)
class VirusTotalClient:
"""VirusTotal API client with forensic tracking."""
"""VirusTotal API client."""
BASE_URL = "https://www.virustotal.com/vtapi/v2"
@ -38,15 +37,11 @@ class VirusTotalClient:
self.last_request = time.time()
def lookup_ip(self, ip: str, operation_id: Optional[str] = None) -> Optional[VirusTotalResult]:
"""Lookup IP address reputation with forensic tracking."""
def lookup_ip(self, ip: str) -> Optional[VirusTotalResult]:
"""Lookup IP address reputation."""
self._rate_limit()
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔍 Querying VirusTotal for IP: {ip} (operation: {operation_id})")
logger.debug(f"🔍 Querying VirusTotal for IP: {ip}")
try:
url = f"{self.BASE_URL}/ip-address/report"
@ -86,14 +81,12 @@ class VirusTotalClient:
except ValueError:
logger.debug(f"Could not parse scan_date: {data.get('scan_date')}")
# Create VirusTotalResult with forensic metadata
result = VirusTotalResult(
resource=ip,
positives=positives,
total=total,
scan_date=scan_date,
permalink=data.get('permalink', f'https://www.virustotal.com/gui/ip-address/{ip}'),
operation_id=operation_id # Forensic tracking
permalink=data.get('permalink', f'https://www.virustotal.com/gui/ip-address/{ip}')
)
logger.info(f"✅ VirusTotal result for IP {ip}: {result.positives}/{result.total} detections")
@ -129,15 +122,11 @@ class VirusTotalClient:
logger.error(f"❌ Unexpected error querying VirusTotal for IP {ip}: {e}")
return None
def lookup_domain(self, domain: str, operation_id: Optional[str] = None) -> Optional[VirusTotalResult]:
"""Lookup domain reputation with forensic tracking."""
def lookup_domain(self, domain: str) -> Optional[VirusTotalResult]:
"""Lookup domain reputation."""
self._rate_limit()
# Generate operation ID if not provided
if operation_id is None:
operation_id = str(uuid.uuid4())
logger.debug(f"🔍 Querying VirusTotal for domain: {domain} (operation: {operation_id})")
logger.debug(f"🔍 Querying VirusTotal for domain: {domain}")
try:
url = f"{self.BASE_URL}/domain/report"
@ -183,14 +172,12 @@ class VirusTotalClient:
except ValueError:
logger.debug(f"Could not parse scan_date: {data.get('scan_date')}")
# Create VirusTotalResult with forensic metadata
result = VirusTotalResult(
resource=domain,
positives=positives,
total=max(total, 1), # Ensure total is at least 1
scan_date=scan_date,
permalink=data.get('permalink', f'https://www.virustotal.com/gui/domain/{domain}'),
operation_id=operation_id # Forensic tracking
permalink=data.get('permalink', f'https://www.virustotal.com/gui/domain/{domain}')
)
logger.info(f"✅ VirusTotal result for domain {domain}: {result.positives}/{result.total} detections")

View File

@ -1,14 +1,14 @@
# File: src/web_app.py
"""Flask web application for forensic reconnaissance tool."""
"""Flask web application for reconnaissance tool."""
from flask import Flask, render_template, request, jsonify, send_from_directory
import threading
import time
import logging
from .config import Config
from .reconnaissance import ForensicReconnaissanceEngine
from .report_generator import ForensicReportGenerator
from .data_structures import ForensicReconData
from .reconnaissance import ReconnaissanceEngine
from .report_generator import ReportGenerator
from .data_structures import ReconData
# Set up logging for this module
logger = logging.getLogger(__name__)
@ -23,11 +23,11 @@ def create_app(config: Config):
template_folder='../templates',
static_folder='../static')
app.config['SECRET_KEY'] = 'forensic-recon-tool-secret-key'
app.config['SECRET_KEY'] = 'recon-tool-secret-key'
# Set up logging for web app
config.setup_logging(cli_mode=False)
logger.info("🌐 Forensic web application initialized")
logger.info("🌐 Web application initialized")
@app.route('/')
def index():
@ -36,7 +36,7 @@ def create_app(config: Config):
@app.route('/api/scan', methods=['POST'])
def start_scan():
"""Start a new forensic reconnaissance scan."""
"""Start a new reconnaissance scan."""
try:
data = request.get_json()
target = data.get('target')
@ -52,32 +52,33 @@ def create_app(config: Config):
# Generate scan ID
scan_id = f"{target}_{int(time.time())}"
logger.info(f"🚀 Starting new forensic scan: {scan_id} for target: {target}")
logger.info(f"🚀 Starting new scan: {scan_id} for target: {target}")
# Create shared ForensicReconData object for live updates
shared_data = ForensicReconData()
shared_data.scan_config = {
'target': target,
'max_depth': scan_config.max_depth,
'shodan_enabled': scan_config.shodan_key is not None,
'virustotal_enabled': scan_config.virustotal_key is not None
}
# Create shared ReconData object for live updates
shared_data = ReconData()
# Initialize scan data with the shared forensic data object
# Initialize scan data with the shared data object
with scan_lock:
active_scans[scan_id] = {
'status': 'starting',
'progress': 0,
'message': 'Initializing forensic scan...',
'data': shared_data, # Share the forensic data object from the start
'message': 'Initializing...',
'data': shared_data, # Share the data object from the start!
'error': None,
'live_stats': shared_data.get_stats(), # Use forensic stats
'live_stats': {
'hostnames': 0,
'ip_addresses': 0,
'dns_records': 0,
'certificates': 0,
'shodan_results': 0,
'virustotal_results': 0
},
'latest_discoveries': []
}
# Start forensic reconnaissance in background thread
# Start reconnaissance in background thread
thread = threading.Thread(
target=run_forensic_reconnaissance_background,
target=run_reconnaissance_background,
args=(scan_id, target, scan_config, shared_data)
)
thread.daemon = True
@ -91,18 +92,14 @@ def create_app(config: Config):
@app.route('/api/scan/<scan_id>/status')
def get_scan_status(scan_id):
"""Get scan status and progress with live forensic discoveries."""
"""Get scan status and progress with live discoveries."""
with scan_lock:
if scan_id not in active_scans:
return jsonify({'error': 'Scan not found'}), 404
scan_data = active_scans[scan_id].copy()
# Update live stats from forensic data if available
if scan_data.get('data') and hasattr(scan_data['data'], 'get_stats'):
scan_data['live_stats'] = scan_data['data'].get_stats()
# Don't include the full forensic data object in status (too large)
# Don't include the full data object in status (too large)
if 'data' in scan_data:
del scan_data['data']
@ -110,7 +107,7 @@ def create_app(config: Config):
@app.route('/api/scan/<scan_id>/report')
def get_scan_report(scan_id):
"""Get forensic scan report."""
"""Get scan report."""
with scan_lock:
if scan_id not in active_scans:
return jsonify({'error': 'Scan not found'}), 404
@ -121,8 +118,8 @@ def create_app(config: Config):
return jsonify({'error': 'Scan not completed'}), 400
try:
# Generate forensic report
report_gen = ForensicReportGenerator(scan_data['data'])
# Generate report
report_gen = ReportGenerator(scan_data['data'])
return jsonify({
'json_report': scan_data['data'].to_json(),
@ -132,200 +129,48 @@ def create_app(config: Config):
logger.error(f"❌ Error generating report for {scan_id}: {e}", exc_info=True)
return jsonify({'error': f'Failed to generate report: {str(e)}'}), 500
@app.route('/api/scan/<scan_id>/graph')
def get_scan_graph(scan_id):
"""Get graph data for visualization."""
with scan_lock:
if scan_id not in active_scans:
return jsonify({'error': 'Scan not found'}), 404
scan_data = active_scans[scan_id]
if scan_data['status'] != 'completed' or not scan_data['data']:
return jsonify({'error': 'Scan not completed'}), 400
try:
forensic_data = scan_data['data']
# Extract nodes for graph
nodes = []
for hostname, node in forensic_data.nodes.items():
# Determine node color based on depth
color_map = {
0: '#00ff41', # Green for root
1: '#ff9900', # Orange for depth 1
2: '#ff6b6b', # Red for depth 2
3: '#4ecdc4', # Teal for depth 3
4: '#45b7d1', # Blue for depth 4+
}
color = color_map.get(node.depth, '#666666')
# Calculate node size based on number of connections and data
connections = len(forensic_data.get_children(hostname)) + len(forensic_data.get_parents(hostname))
dns_records = len(node.get_all_dns_records())
certificates = len(node.certificates)
# Size based on importance (connections + data)
size = max(8, min(20, 8 + connections * 2 + dns_records // 3 + certificates))
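# Worked example of the sizing above (illustrative numbers, not from a real scan):
#   connections=3, dns_records=9, certificates=2
#   8 + 3*2 + 9//3 + 2 = 19, already within [8, 20] -> size 19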
nodes.append({
'id': hostname,
'label': hostname,
'depth': node.depth,
'color': color,
'size': size,
'dns_records': dns_records,
'certificates': certificates,
'ip_addresses': list(node.resolved_ips),
'discovery_methods': [method.value for method in node.discovery_methods],
'first_seen': node.first_seen.isoformat() if node.first_seen else None
})
# Extract edges for graph
edges = []
for edge in forensic_data.edges:
# Skip synthetic TLD expansion edges for cleaner visualization
if edge.source_hostname.startswith('tld_expansion:'):
continue
# Color edges by discovery method
method_colors = {
'initial_target': '#00ff41',
'tld_expansion': '#ff9900',
'dns_record_value': '#4ecdc4',
'certificate_subject': '#ff6b6b',
'dns_subdomain_extraction': '#45b7d1'
}
edges.append({
'source': edge.source_hostname,
'target': edge.target_hostname,
'method': edge.discovery_method.value,
'color': method_colors.get(edge.discovery_method.value, '#666666'),
'operation_id': edge.operation_id,
'timestamp': edge.timestamp.isoformat() if edge.timestamp else None
})
# Graph statistics
stats = {
'node_count': len(nodes),
'edge_count': len(edges),
'max_depth': max([node['depth'] for node in nodes]) if nodes else 0,
'discovery_methods': list(set([edge['method'] for edge in edges])),
'root_nodes': [node['id'] for node in nodes if node['depth'] == 0]
}
return jsonify({
'nodes': nodes,
'edges': edges,
'stats': stats
})
except Exception as e:
logger.error(f"⚠️ Error generating graph data for {scan_id}: {e}", exc_info=True)
return jsonify({'error': f'Failed to generate graph: {str(e)}'}), 500
@app.route('/api/scan/<scan_id>/live-data')
def get_live_scan_data(scan_id):
"""Get current forensic reconnaissance data (for real-time updates)."""
"""Get current reconnaissance data (for real-time updates)."""
with scan_lock:
if scan_id not in active_scans:
return jsonify({'error': 'Scan not found'}), 404
scan_data = active_scans[scan_id]
forensic_data = scan_data['data']
if not forensic_data:
# Now we always have a data object, even if it's empty initially
data_obj = scan_data['data']
if not data_obj:
return jsonify({
'hostnames': [],
'ip_addresses': [],
'stats': {
'hostnames': 0,
'ip_addresses': 0,
'discovery_edges': 0,
'operations_performed': 0,
'dns_records': 0,
'certificates_total': 0,
'certificates_current': 0,
'certificates_expired': 0,
'shodan_results': 0,
'virustotal_results': 0
},
'stats': scan_data['live_stats'],
'latest_discoveries': []
})
# Extract data from forensic structure for frontend
try:
hostnames = sorted(list(forensic_data.nodes.keys()))
ip_addresses = sorted(list(forensic_data.ip_addresses))
stats = forensic_data.get_stats()
# Generate activity log from recent operations
latest_discoveries = []
recent_operations = forensic_data.operation_timeline[-10:] # Last 10 operations
for op_id in recent_operations:
if op_id in forensic_data.operations:
operation = forensic_data.operations[op_id]
activity_entry = {
'timestamp': operation.timestamp.timestamp(),
'message': f"{operation.operation_type.value}: {operation.target}"
}
# Add result summary
if operation.discovered_hostnames:
activity_entry['message'] += f" → {len(operation.discovered_hostnames)} hostnames"
if operation.discovered_ips:
activity_entry['message'] += f", {len(operation.discovered_ips)} IPs"
latest_discoveries.append(activity_entry)
# Update scan data with latest discoveries
scan_data['latest_discoveries'] = latest_discoveries
# Return current discoveries from the shared data object
return jsonify({
'hostnames': hostnames,
'ip_addresses': ip_addresses,
'stats': stats,
'latest_discoveries': latest_discoveries
})
except Exception as e:
logger.error(f"❌ Error extracting live data for {scan_id}: {e}", exc_info=True)
# Return minimal data structure
return jsonify({
'hostnames': [],
'ip_addresses': [],
'stats': {
'hostnames': len(forensic_data.nodes) if forensic_data.nodes else 0,
'ip_addresses': len(forensic_data.ip_addresses) if forensic_data.ip_addresses else 0,
'discovery_edges': 0,
'operations_performed': 0,
'dns_records': 0,
'certificates_total': 0,
'certificates_current': 0,
'certificates_expired': 0,
'shodan_results': 0,
'virustotal_results': 0
},
'latest_discoveries': []
'hostnames': sorted(list(data_obj.hostnames)),
'ip_addresses': sorted(list(data_obj.ip_addresses)),
'stats': data_obj.get_stats(),
'latest_discoveries': scan_data.get('latest_discoveries', [])
})
return app
def run_forensic_reconnaissance_background(scan_id: str, target: str, config: Config, shared_data: ForensicReconData):
"""Run forensic reconnaissance in background thread with shared forensic data object."""
def run_reconnaissance_background(scan_id: str, target: str, config: Config, shared_data: ReconData):
"""Run reconnaissance in background thread with shared data object."""
def update_progress(message: str, percentage: int = None):
"""Update scan progress and live forensic statistics."""
"""Update scan progress and live statistics."""
with scan_lock:
if scan_id in active_scans:
active_scans[scan_id]['message'] = message
if percentage is not None:
active_scans[scan_id]['progress'] = percentage
# Update live stats from the shared forensic data object
# Update live stats from the shared data object
if shared_data:
active_scans[scan_id]['live_stats'] = shared_data.get_stats()
@ -333,13 +178,10 @@ def run_forensic_reconnaissance_background(scan_id: str, target: str, config: Co
if 'latest_discoveries' not in active_scans[scan_id]:
active_scans[scan_id]['latest_discoveries'] = []
# Create activity entry
activity_entry = {
active_scans[scan_id]['latest_discoveries'].append({
'timestamp': time.time(),
'message': message
}
active_scans[scan_id]['latest_discoveries'].append(activity_entry)
})
# Keep only last 10 discoveries
active_scans[scan_id]['latest_discoveries'] = \
@ -348,44 +190,40 @@ def run_forensic_reconnaissance_background(scan_id: str, target: str, config: Co
logger.info(f"[{scan_id}] {message} ({percentage}%)" if percentage else f"[{scan_id}] {message}")
try:
logger.info(f"🔧 Initializing forensic reconnaissance engine for scan: {scan_id}")
logger.info(f"🔧 Initializing reconnaissance engine for scan: {scan_id}")
# Initialize forensic engine
engine = ForensicReconnaissanceEngine(config)
# Initialize engine
engine = ReconnaissanceEngine(config)
engine.set_progress_callback(update_progress)
# IMPORTANT: Pass the shared forensic data object to the engine
# IMPORTANT: Pass the shared data object to the engine
engine.set_shared_data(shared_data)
# Update status
with scan_lock:
active_scans[scan_id]['status'] = 'running'
logger.info(f"🚀 Starting forensic reconnaissance for: {target}")
logger.info(f"🚀 Starting reconnaissance for: {target}")
# Run forensic reconnaissance - this will populate the shared_data object incrementally
# Run reconnaissance - this will populate the shared_data object incrementally
final_data = engine.run_reconnaissance(target)
logger.info(f"Forensic reconnaissance completed for scan: {scan_id}")
logger.info(f"Reconnaissance completed for scan: {scan_id}")
# Update with final results (the shared_data should already be populated)
with scan_lock:
active_scans[scan_id]['status'] = 'completed'
active_scans[scan_id]['progress'] = 100
active_scans[scan_id]['message'] = 'Forensic reconnaissance completed'
active_scans[scan_id]['message'] = 'Reconnaissance completed'
active_scans[scan_id]['data'] = final_data # This should be the same as shared_data
active_scans[scan_id]['live_stats'] = final_data.get_stats()
# Log final forensic statistics
# Log final statistics
final_stats = final_data.get_stats()
logger.info(f"📊 Final forensic stats for {scan_id}: {final_stats}")
# Log discovery graph analysis
graph_analysis = final_data._generate_graph_analysis()
logger.info(f"🌐 Discovery graph: {len(final_data.nodes)} nodes, {len(final_data.edges)} edges, max depth: {graph_analysis['max_depth']}")
logger.info(f"📊 Final stats for {scan_id}: {final_stats}")
except Exception as e:
logger.error(f"❌ Error in forensic reconnaissance for {scan_id}: {e}", exc_info=True)
logger.error(f"❌ Error in reconnaissance for {scan_id}: {e}", exc_info=True)
# Handle errors
with scan_lock:
active_scans[scan_id]['status'] = 'error'
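For completeness, the endpoints above can be exercised with a small polling client; the JSON field names are inferred from the handlers and the address is the usual Flask dev default, neither confirmed by this diff:

import time
import requests

BASE = "http://127.0.0.1:5000"  # assumed dev address

resp = requests.post(f"{BASE}/api/scan", json={"target": "example.com"}).json()
scan_id = resp["scan_id"]  # key assumed; the handler builds f"{target}_{int(time.time())}"

while True:
    status = requests.get(f"{BASE}/api/scan/{scan_id}/status").json()
    print(status.get("progress"), status.get("message"))
    if status.get("status") in ("completed", "error"):
        break
    time.sleep(2)

report = requests.get(f"{BASE}/api/scan/{scan_id}/report").json()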

View File

@ -1,4 +1,4 @@
// DNS Reconnaissance Tool - Enhanced Frontend JavaScript with Forensic Data Support
// DNS Reconnaissance Tool - Enhanced Frontend JavaScript with Debug Output
class ReconTool {
constructor() {
@ -6,8 +6,7 @@ class ReconTool {
this.pollInterval = null;
this.liveDataInterval = null;
this.currentReport = null;
this.debugMode = true;
this.graphVisualization = null; // Add this line
this.debugMode = true; // Enable debug logging
this.init();
}
@ -24,13 +23,6 @@ class ReconTool {
init() {
this.bindEvents();
this.setupRealtimeElements();
// Handle window resize for graph
window.addEventListener('resize', () => {
if (this.graphVisualization) {
this.graphVisualization.handleResize();
}
});
}
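// Sketch: the resize hook removed above re-renders on every resize event.
// A debounced variant, as it could appear inside init() (an assumed
// improvement, present in neither commit):
// let resizeTimer = null;
// window.addEventListener('resize', () => {
//     clearTimeout(resizeTimer);
//     resizeTimer = setTimeout(() => {
//         if (this.graphVisualization) {
//             this.graphVisualization.handleResize();
//         }
//     }, 150); // redraw once, 150 ms after the last resize event
// });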
setupRealtimeElements() {
@ -51,14 +43,6 @@ class ReconTool {
<span class="stat-label">IP Addresses:</span>
<span id="liveIPs" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">Discovery Edges:</span>
<span id="liveEdges" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">Operations:</span>
<span id="liveOperations" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">DNS Records:</span>
<span id="liveDNS" class="stat-value">0</span>
@ -133,27 +117,6 @@ class ReconTool {
this.startScan();
}
});
document.getElementById('showGraphView').addEventListener('click', () => {
this.showGraphView();
});
}
showGraphView() {
if (this.graphVisualization && this.currentScanId) {
document.getElementById('graphSection').style.display = 'block';
// Update button states
document.getElementById('showGraphView').classList.add('active');
document.getElementById('showJson').classList.remove('active');
document.getElementById('showText').classList.remove('active');
// Scroll to graph section
document.getElementById('graphSection').scrollIntoView({
behavior: 'smooth'
});
} else {
alert('Graph data not available yet. Please wait for scan completion.');
}
}
async startScan() {
@ -174,7 +137,7 @@ class ReconTool {
try {
// Show progress section
this.showProgressSection();
this.updateProgress(0, 'Starting forensic scan...');
this.updateProgress(0, 'Starting scan...');
this.debug('Starting scan with data:', scanData);
@ -260,7 +223,7 @@ class ReconTool {
// Update progress
this.updateProgress(status.progress, status.message);
// Update live stats (handle new forensic format)
// Update live stats
if (status.live_stats) {
this.debug('Received live stats:', status.live_stats);
this.updateLiveStats(status.live_stats);
@ -307,7 +270,7 @@ class ReconTool {
this.debug('Received live data:', data);
// Update live discoveries (handle new forensic format)
// Update live discoveries
this.updateLiveDiscoveries(data);
} catch (error) {
@ -319,19 +282,17 @@ class ReconTool {
updateLiveStats(stats) {
this.debug('Updating live stats:', stats);
// Handle both old and new stat formats for compatibility
const statMappings = {
'liveHostnames': stats.hostnames || 0,
'liveIPs': stats.ip_addresses || stats.ips || 0,
'liveEdges': stats.discovery_edges || 0, // New forensic field
'liveOperations': stats.operations_performed || 0, // New forensic field
// Update the live statistics counters
const statElements = {
'liveHostnames': stats.hostnames || 0,
'liveIPs': stats.ip_addresses || 0,
'liveDNS': stats.dns_records || 0,
'liveCerts': stats.certificates_total || stats.certificates || 0,
'liveCerts': stats.certificates || 0,
'liveShodan': stats.shodan_results || 0,
'liveVT': stats.virustotal_results || 0
};
Object.entries(statMappings).forEach(([elementId, value]) => {
Object.entries(statElements).forEach(([elementId, value]) => {
const element = document.getElementById(elementId);
if (element) {
const currentValue = element.textContent;
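// Sketch of the likely continuation truncated by the hunk boundary above:
// compare value against currentValue and flash counters that changed. The
// 'stat-updated' class name is an assumption, present in neither commit.
// if (String(value) !== currentValue) {
//     element.textContent = value;
//     element.classList.add('stat-updated');
//     setTimeout(() => element.classList.remove('stat-updated'), 500);
// }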
@ -354,92 +315,43 @@ class ReconTool {
updateLiveDiscoveries(data) {
this.debug('Updating live discoveries with data:', data);
// Handle new forensic data format
let hostnames = [];
let ipAddresses = [];
let activities = [];
// Extract data from forensic format or fallback to old format
if (data.hostnames && Array.isArray(data.hostnames)) {
hostnames = data.hostnames;
} else if (data.stats && data.stats.hostnames) {
// If we only have stats, create a placeholder list
hostnames = [`${data.stats.hostnames} discovered`];
}
if (data.ip_addresses && Array.isArray(data.ip_addresses)) {
ipAddresses = data.ip_addresses;
} else if (data.stats && data.stats.ip_addresses) {
// If we only have stats, create a placeholder list
ipAddresses = [`${data.stats.ip_addresses} discovered`];
}
// Handle activity log from forensic format
if (data.latest_discoveries && Array.isArray(data.latest_discoveries)) {
activities = data.latest_discoveries;
}
// Update hostnames list
const hostnameList = document.querySelector('#recentHostnames .hostname-list');
if (hostnameList && hostnames.length > 0) {
if (hostnameList && data.hostnames && data.hostnames.length > 0) {
// Show last 10 hostnames
const recentHostnames = hostnames.slice(-10);
const recentHostnames = data.hostnames.slice(-10);
hostnameList.innerHTML = recentHostnames.map(hostname =>
`<span class="discovery-item">${hostname}</span>`
).join('');
this.debug(`Updated hostname list with ${recentHostnames.length} items`);
} else if (hostnameList) {
this.debug(`No hostnames to display (${hostnames.length} total)`);
this.debug(`No hostnames to display (${data.hostnames ? data.hostnames.length : 0} total)`);
}
// Update IP addresses list
const ipList = document.querySelector('#recentIPs .ip-list');
if (ipList && ipAddresses.length > 0) {
if (ipList && data.ip_addresses && data.ip_addresses.length > 0) {
// Show last 10 IPs
const recentIPs = ipAddresses.slice(-10);
const recentIPs = data.ip_addresses.slice(-10);
ipList.innerHTML = recentIPs.map(ip =>
`<span class="discovery-item">${ip}</span>`
).join('');
this.debug(`Updated IP list with ${recentIPs.length} items`);
} else if (ipList) {
this.debug(`No IPs to display (${ipAddresses.length} total)`);
this.debug(`No IPs to display (${data.ip_addresses ? data.ip_addresses.length : 0} total)`);
}
// Update activity log
const activityList = document.querySelector('#activityLog .activity-list');
if (activityList) {
if (activities.length > 0) {
const recentActivities = activities.slice(-5); // Last 5 activities
activityList.innerHTML = recentActivities.map(activity => {
if (activityList && data.latest_discoveries && data.latest_discoveries.length > 0) {
const activities = data.latest_discoveries.slice(-5); // Last 5 activities
activityList.innerHTML = activities.map(activity => {
const time = new Date(activity.timestamp * 1000).toLocaleTimeString();
return `<div class="activity-item">[${time}] ${activity.message}</div>`;
}).join('');
this.debug(`Updated activity log with ${recentActivities.length} items`);
} else {
// Fallback: show generic activity based on stats
const stats = data.stats || {};
const genericActivities = [];
if (stats.operations_performed > 0) {
genericActivities.push(`${stats.operations_performed} operations performed`);
}
if (stats.hostnames > 0) {
genericActivities.push(`${stats.hostnames} hostnames discovered`);
}
if (stats.dns_records > 0) {
genericActivities.push(`${stats.dns_records} DNS records collected`);
}
if (genericActivities.length > 0) {
const now = new Date().toLocaleTimeString();
activityList.innerHTML = genericActivities.map(activity =>
`<div class="activity-item">[${now}] ${activity}</div>`
).join('');
this.debug(`Updated activity log with ${genericActivities.length} generic items`);
} else {
this.debug('No activities to display');
}
}
this.debug(`Updated activity log with ${activities.length} items`);
} else if (activityList) {
this.debug(`No activities to display (${data.latest_discoveries ? data.latest_discoveries.length : 0} total)`);
}
}
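// Sketch: updateLiveDiscoveries above reads data.hostnames, data.ip_addresses
// and data.latest_discoveries straight off the payload with repeated array
// checks. A small normalizer (assumed helper, in neither commit) would
// collapse those checks into one place:
// function normalizeLiveData(data) {
//     const asArray = (v) => (Array.isArray(v) ? v : []);
//     return {
//         hostnames: asArray(data.hostnames),
//         ipAddresses: asArray(data.ip_addresses),
//         activities: asArray(data.latest_discoveries),
//     };
// }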
@ -463,14 +375,8 @@ class ReconTool {
this.showResultsSection();
this.showReport('text'); // Default to text view
// Load and show graph
if (!this.graphVisualization) {
this.graphVisualization = new GraphVisualization();
}
await this.graphVisualization.loadAndShowGraph(this.currentScanId);
} catch (error) {
console.error('⚠️ Error loading report:', error);
console.error('❌ Error loading report:', error);
this.showError(`Error loading report: ${error.message}`);
}
}
@ -504,7 +410,7 @@ class ReconTool {
if (liveSection) {
const title = liveSection.querySelector('h3');
if (title) {
title.textContent = '📊 Final Forensic Summary';
title.textContent = '📊 Final Discovery Summary';
}
liveSection.style.display = 'block';
}
@ -518,7 +424,7 @@ class ReconTool {
if (progressMessage) progressMessage.style.display = 'none';
if (scanControls) scanControls.style.display = 'none';
this.debug('Showing results section with forensic discoveries');
this.debug('Showing results section with live discoveries');
}
resetToForm() {
@ -621,11 +527,11 @@ class ReconTool {
content = typeof this.currentReport.json_report === 'string'
? this.currentReport.json_report
: JSON.stringify(this.currentReport.json_report, null, 2);
filename = `forensic-recon-report-${this.currentScanId}.json`;
filename = `recon-report-${this.currentScanId}.json`;
mimeType = 'application/json';
} else {
content = this.currentReport.text_report;
filename = `forensic-recon-report-${this.currentScanId}.txt`;
filename = `recon-report-${this.currentScanId}.txt`;
mimeType = 'text/plain';
}
@ -642,375 +548,8 @@ class ReconTool {
}
}
class GraphVisualization {
constructor() {
this.svg = null;
this.simulation = null;
this.graphData = null;
this.showLabels = false;
this.selectedNode = null;
this.zoom = null;
this.container = null;
}
async loadAndShowGraph(scanId) {
try {
console.log('🕸️ Loading graph data...');
const response = await fetch(`/api/scan/${scanId}/graph`);
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const graphData = await response.json();
if (graphData.error) {
throw new Error(graphData.error);
}
this.graphData = graphData;
this.showGraphSection();
this.initializeGraph();
this.updateGraphStats(graphData.stats);
console.log('✅ Graph loaded successfully', graphData.stats);
} catch (error) {
console.error('⚠️ Error loading graph:', error);
alert(`Failed to load graph: ${error.message}`);
}
}
showGraphSection() {
document.getElementById('graphSection').style.display = 'block';
this.bindGraphEvents();
}
bindGraphEvents() {
document.getElementById('showGraph').addEventListener('click', () => {
this.showGraph();
});
document.getElementById('hideGraph').addEventListener('click', () => {
this.hideGraph();
});
document.getElementById('resetZoom').addEventListener('click', () => {
this.resetZoom();
});
document.getElementById('toggleLabels').addEventListener('click', () => {
this.toggleLabels();
});
}
initializeGraph() {
if (!this.graphData) return;
// Clear existing graph
d3.select('#discoveryGraph').selectAll('*').remove();
// Set up SVG
const container = d3.select('#discoveryGraph');
const containerNode = container.node();
const width = containerNode.clientWidth;
const height = containerNode.clientHeight;
this.svg = container
.attr('width', width)
.attr('height', height);
// Set up zoom behavior
this.zoom = d3.zoom()
.scaleExtent([0.1, 3])
.on('zoom', (event) => {
this.container.attr('transform', event.transform);
});
this.svg.call(this.zoom);
// Create container for graph elements
this.container = this.svg.append('g');
// Set up force simulation
this.simulation = d3.forceSimulation(this.graphData.nodes)
.force('link', d3.forceLink(this.graphData.edges)
.id(d => d.id)
.distance(80)
.strength(0.5))
.force('charge', d3.forceManyBody()
.strength(-300)
.distanceMax(400))
.force('center', d3.forceCenter(width / 2, height / 2))
.force('collision', d3.forceCollide()
.radius(d => d.size + 5));
this.drawGraph();
this.startSimulation();
}
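// Note on the simulation config above: forceLink's .id(d => d.id) accessor
// lets edges name their endpoints by string id; during initialization D3
// replaces edge.source/edge.target with the matching node objects, which is
// why the tick handler can read d.source.x directly. Minimal standalone
// illustration (hypothetical data):
// const nodes = [{ id: 'a' }, { id: 'b' }];
// const edges = [{ source: 'a', target: 'b' }];
// d3.forceSimulation(nodes).force('link', d3.forceLink(edges).id(d => d.id));
// // afterwards edges[0].source === nodes[0], not the string 'a'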
drawGraph() {
// Draw links
const links = this.container.append('g')
.selectAll('line')
.data(this.graphData.edges)
.enter().append('line')
.attr('class', 'graph-link')
.attr('stroke', d => d.color)
.on('mouseover', (event, d) => {
this.showTooltip(event, `Discovery: ${d.method}<br>From: ${d.source}<br>To: ${d.target}`);
})
.on('mouseout', () => {
this.hideTooltip();
});
// Draw nodes
const nodes = this.container.append('g')
.selectAll('circle')
.data(this.graphData.nodes)
.enter().append('circle')
.attr('class', 'graph-node')
.attr('r', d => d.size)
.attr('fill', d => d.color)
.style('opacity', 0.8)
.on('mouseover', (event, d) => {
this.showNodeTooltip(event, d);
this.highlightConnections(d);
})
.on('mouseout', (event, d) => {
this.hideTooltip();
this.unhighlightConnections();
})
.on('click', (event, d) => {
this.selectNode(d);
})
.call(d3.drag()
.on('start', (event, d) => {
if (!event.active) this.simulation.alphaTarget(0.3).restart();
d.fx = d.x;
d.fy = d.y;
})
.on('drag', (event, d) => {
d.fx = event.x;
d.fy = event.y;
})
.on('end', (event, d) => {
if (!event.active) this.simulation.alphaTarget(0);
d.fx = null;
d.fy = null;
}));
// Draw labels (initially hidden)
const labels = this.container.append('g')
.selectAll('text')
.data(this.graphData.nodes)
.enter().append('text')
.attr('class', 'graph-label')
.attr('dy', '.35em')
.style('opacity', this.showLabels ? 1 : 0)
.text(d => d.label);
// Store references
this.links = links;
this.nodes = nodes;
this.labels = labels;
}
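// Note on the drag handlers in drawGraph above: setting d.fx/d.fy pins the
// node at the pointer so the forces cannot move it while dragging, and
// alphaTarget(0.3).restart() reheats the simulation so neighbors keep
// adjusting; resetting fx/fy to null on 'end' releases the node back to the
// force layout.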
startSimulation() {
this.simulation.on('tick', () => {
this.links
.attr('x1', d => d.source.x)
.attr('y1', d => d.source.y)
.attr('x2', d => d.target.x)
.attr('y2', d => d.target.y);
this.nodes
.attr('cx', d => d.x)
.attr('cy', d => d.y);
this.labels
.attr('x', d => d.x)
.attr('y', d => d.y + d.size + 12);
});
}
showNodeTooltip(event, node) {
const tooltip = `
<strong>${node.label}</strong><br>
Depth: ${node.depth}<br>
DNS Records: ${node.dns_records}<br>
Certificates: ${node.certificates}<br>
IPs: ${node.ip_addresses.length}<br>
Discovery: ${node.discovery_methods.join(', ')}<br>
First Seen: ${node.first_seen ? new Date(node.first_seen).toLocaleString() : 'Unknown'}
`;
this.showTooltip(event, tooltip);
}
showTooltip(event, content) {
const tooltip = document.getElementById('graphTooltip');
tooltip.innerHTML = content;
tooltip.className = 'graph-tooltip visible';
const rect = tooltip.getBoundingClientRect();
const containerRect = document.getElementById('discoveryGraph').getBoundingClientRect();
tooltip.style.left = `${event.clientX - containerRect.left + 10}px`;
tooltip.style.top = `${event.clientY - containerRect.top - 10}px`;
// Adjust position if tooltip goes off screen
if (event.clientX + rect.width > window.innerWidth) {
tooltip.style.left = `${event.clientX - containerRect.left - rect.width - 10}px`;
}
if (event.clientY - rect.height < 0) {
tooltip.style.top = `${event.clientY - containerRect.top + 20}px`;
}
}
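// Note on showTooltip above: coordinates are container-relative, then nudged
// back on-screen when the tooltip would overflow the right or top edge. A
// clamp-based variant of the horizontal fix (assumed alternative, same
// variables, inside showTooltip):
// const maxX = window.innerWidth - containerRect.left - rect.width - 10;
// tooltip.style.left = `${Math.max(0, Math.min(event.clientX - containerRect.left + 10, maxX))}px`;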
hideTooltip() {
const tooltip = document.getElementById('graphTooltip');
tooltip.className = 'graph-tooltip';
}
highlightConnections(node) {
// Highlight connected links
this.links
.style('opacity', d => (d.source.id === node.id || d.target.id === node.id) ? 1 : 0.2)
.classed('highlighted', d => d.source.id === node.id || d.target.id === node.id);
// Highlight connected nodes
this.nodes
.style('opacity', d => {
if (d.id === node.id) return 1;
const connected = this.graphData.edges.some(edge =>
(edge.source.id === node.id && edge.target.id === d.id) ||
(edge.target.id === node.id && edge.source.id === d.id)
);
return connected ? 0.8 : 0.3;
});
}
unhighlightConnections() {
this.links
.style('opacity', 0.6)
.classed('highlighted', false);
this.nodes
.style('opacity', 0.8);
}
selectNode(node) {
// Update selected node styling
this.nodes
.classed('selected', d => d.id === node.id);
// Show node details
this.showNodeDetails(node);
this.selectedNode = node;
}
showNodeDetails(node) {
const detailsContainer = document.getElementById('nodeDetails');
const selectedInfo = document.getElementById('selectedNodeInfo');
const details = `
<div class="detail-item">
<span class="detail-label">Hostname:</span>
<span class="detail-value">${node.label}</span>
</div>
<div class="detail-item">
<span class="detail-label">Discovery Depth:</span>
<span class="detail-value">${node.depth}</span>
</div>
<div class="detail-item">
<span class="detail-label">DNS Records:</span>
<span class="detail-value">${node.dns_records}</span>
</div>
<div class="detail-item">
<span class="detail-label">Certificates:</span>
<span class="detail-value">${node.certificates}</span>
</div>
<div class="detail-item">
<span class="detail-label">IP Addresses:</span>
<span class="detail-value">${node.ip_addresses.join(', ') || 'None'}</span>
</div>
<div class="detail-item">
<span class="detail-label">Discovery Methods:</span>
<span class="detail-value">${node.discovery_methods.join(', ')}</span>
</div>
<div class="detail-item">
<span class="detail-label">First Seen:</span>
<span class="detail-value">${node.first_seen ? new Date(node.first_seen).toLocaleString() : 'Unknown'}</span>
</div>
`;
detailsContainer.innerHTML = details;
selectedInfo.style.display = 'block';
}
toggleLabels() {
this.showLabels = !this.showLabels;
if (this.labels) {
this.labels.transition()
.duration(300)
.style('opacity', this.showLabels ? 1 : 0);
}
const button = document.getElementById('toggleLabels');
button.textContent = this.showLabels ? 'Hide Labels' : 'Show Labels';
}
resetZoom() {
if (this.svg && this.zoom) {
this.svg.transition()
.duration(750)
.call(this.zoom.transform, d3.zoomIdentity);
}
}
showGraph() {
if (this.container) {
this.container.style('display', 'block');
}
document.getElementById('showGraph').classList.add('active');
document.getElementById('hideGraph').classList.remove('active');
}
hideGraph() {
if (this.container) {
this.container.style('display', 'none');
}
document.getElementById('hideGraph').classList.add('active');
document.getElementById('showGraph').classList.remove('active');
}
updateGraphStats(stats) {
document.getElementById('graphNodes').textContent = stats.node_count || 0;
document.getElementById('graphEdges').textContent = stats.edge_count || 0;
document.getElementById('graphDepth').textContent = stats.max_depth || 0;
}
// Handle window resize
handleResize() {
if (this.svg && this.graphData) {
const containerNode = document.getElementById('discoveryGraph');
const width = containerNode.clientWidth;
const height = containerNode.clientHeight;
this.svg
.attr('width', width)
.attr('height', height);
this.simulation
.force('center', d3.forceCenter(width / 2, height / 2))
.restart();
}
}
}
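// Usage sketch for the class removed above, against the same backend route
// it already calls (GET /api/scan/<id>/graph):
// const viz = new GraphVisualization();
// await viz.loadAndShowGraph(currentScanId);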
// Initialize the application when DOM is loaded
document.addEventListener('DOMContentLoaded', () => {
console.log('🌐 Forensic DNS Reconnaissance Tool initialized with debug mode');
console.log('🌐 DNS Reconnaissance Tool initialized with debug mode');
new ReconTool();
});

View File

@ -437,252 +437,3 @@ header p {
gap: 10px;
}
}
/* Add this CSS to the existing style.css file */
/* Graph Section */
.graph-section {
background: #2a2a2a;
border-radius: 4px;
border: 1px solid #444;
box-shadow: inset 0 0 15px rgba(0,0,0,0.5);
padding: 30px;
margin-bottom: 25px;
}
.graph-controls {
margin-bottom: 20px;
text-align: center;
}
.graph-controls button {
margin: 0 5px;
}
.graph-legend {
display: flex;
justify-content: space-between;
margin-bottom: 20px;
padding: 15px;
background: rgba(0,0,0,0.3);
border-radius: 4px;
border: 1px solid #444;
flex-wrap: wrap;
}
.graph-legend h4, .graph-legend h5 {
color: #e0e0e0;
margin-bottom: 10px;
text-transform: uppercase;
letter-spacing: 1px;
}
.legend-items, .legend-methods {
display: flex;
flex-direction: column;
gap: 8px;
}
.legend-item {
display: flex;
align-items: center;
gap: 10px;
font-size: 0.9rem;
color: #c7c7c7;
}
.legend-color {
width: 16px;
height: 16px;
border-radius: 50%;
border: 1px solid #666;
}
.method-item {
font-size: 0.9rem;
color: #a0a0a0;
margin-bottom: 4px;
}
.graph-container {
position: relative;
width: 100%;
height: 600px;
background: #0a0a0a;
border-radius: 4px;
border: 1px solid #333;
margin-bottom: 20px;
overflow: hidden;
}
#discoveryGraph {
width: 100%;
height: 100%;
cursor: grab;
}
#discoveryGraph:active {
cursor: grabbing;
}
.graph-tooltip {
position: absolute;
background: rgba(0, 0, 0, 0.9);
color: #00ff41;
padding: 10px;
border-radius: 4px;
border: 1px solid #00ff41;
font-family: 'Courier New', monospace;
font-size: 0.8rem;
pointer-events: none;
opacity: 0;
transition: opacity 0.2s ease;
z-index: 1000;
max-width: 300px;
line-height: 1.4;
}
.graph-tooltip.visible {
opacity: 1;
}
.graph-info {
display: flex;
justify-content: space-between;
align-items: flex-start;
flex-wrap: wrap;
gap: 20px;
}
.graph-stats {
display: flex;
gap: 20px;
flex-wrap: wrap;
}
.selected-node-info {
background: rgba(0, 40, 0, 0.8);
border: 1px solid #00ff41;
border-radius: 4px;
padding: 15px;
min-width: 250px;
}
.selected-node-info h4 {
color: #00ff41;
margin-bottom: 10px;
text-transform: uppercase;
letter-spacing: 1px;
}
#nodeDetails {
font-family: 'Courier New', monospace;
font-size: 0.8rem;
line-height: 1.4;
color: #c7c7c7;
}
#nodeDetails .detail-item {
margin-bottom: 8px;
border-bottom: 1px solid #333;
padding-bottom: 4px;
}
#nodeDetails .detail-item:last-child {
border-bottom: none;
}
#nodeDetails .detail-label {
color: #00ff41;
font-weight: bold;
}
#nodeDetails .detail-value {
color: #c7c7c7;
margin-left: 10px;
}
/* Graph Nodes and Links Styling (applied via D3) */
.graph-node {
stroke: #333;
stroke-width: 2px;
cursor: pointer;
transition: all 0.3s ease;
}
.graph-node:hover {
stroke: #fff;
stroke-width: 3px;
}
.graph-node.selected {
stroke: #ff9900;
stroke-width: 4px;
}
.graph-link {
stroke-opacity: 0.6;
stroke-width: 2px;
transition: all 0.3s ease;
}
.graph-link:hover {
stroke-opacity: 1;
stroke-width: 3px;
}
.graph-link.highlighted {
stroke-opacity: 1;
stroke-width: 3px;
}
.graph-label {
font-family: 'Courier New', monospace;
font-size: 10px;
fill: #c7c7c7;
text-anchor: middle;
pointer-events: none;
opacity: 0.8;
}
.graph-label.visible {
opacity: 1;
}
/* Responsive adjustments for graph */
@media (max-width: 768px) {
.graph-container {
height: 400px;
}
.graph-legend {
flex-direction: column;
gap: 15px;
}
.legend-items, .legend-methods {
flex-direction: row;
flex-wrap: wrap;
gap: 15px;
}
.graph-info {
flex-direction: column;
align-items: stretch;
}
.graph-stats {
justify-content: center;
}
.selected-node-info {
min-width: auto;
width: 100%;
}
.graph-tooltip {
font-size: 0.7rem;
max-width: 250px;
}
}

View File

@ -65,7 +65,6 @@
<div class="results-controls">
<button id="showJson" class="btn-secondary">Show JSON</button>
<button id="showText" class="btn-secondary active">Show Text Report</button>
<button id="showGraphView" class="btn-secondary">Show Graph</button>
<button id="downloadJson" class="btn-secondary">Download JSON</button>
<button id="downloadText" class="btn-secondary">Download Text</button>
</div>
@ -74,46 +73,8 @@
<pre id="reportContent"></pre>
</div>
</div>
<div class="graph-section" id="graphSection" style="display: none;">
<h2>Discovery Graph</h2>
<div class="graph-stats">
<div class="stat-item">
<span class="stat-label">Nodes:</span>
<span id="graphNodes" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">Edges:</span>
<span id="graphEdges" class="stat-value">0</span>
</div>
<div class="stat-item">
<span class="stat-label">Max Depth:</span>
<span id="graphDepth" class="stat-value">0</span>
</div>
</div>
<div class="graph-controls">
<button id="showGraph" class="btn-secondary active">Show Graph</button>
<button id="hideGraph" class="btn-secondary">Hide Graph</button>
<button id="resetZoom" class="btn-secondary">Reset Zoom</button>
<button id="toggleLabels" class="btn-secondary">Show Labels</button>
</div>
<div class="graph-display-area">
<svg id="discoveryGraph"></svg>
<div id="graphTooltip" class="graph-tooltip"></div>
</div>
<div class="selected-node-details" id="selectedNodeInfo" style="display: none;">
<h3>Selected Node Details</h3>
<div id="nodeDetails"></div>
</div>
</div>
</div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/d3/7.8.5/d3.min.js"></script>
<script src="{{ url_for('static', filename='script.js') }}"></script>
</body>
</html>