From 29e36e34be0be503819a4265490f418e2f3de72d Mon Sep 17 00:00:00 2001 From: overcuriousity Date: Tue, 9 Sep 2025 22:19:46 +0200 Subject: [PATCH] graph --- src/certificate_checker.py | 105 +++-- src/data_structures.py | 449 ++++++++++++++++---- src/dns_resolver.py | 80 +++- src/main.py | 93 +++-- src/reconnaissance.py | 811 ++++++++++++++++++++++++------------- src/report_generator.py | 496 +++++++++++++++++++---- src/shodan_client.py | 47 ++- src/virustotal_client.py | 53 ++- src/web_app.py | 282 ++++++++++--- static/script.js | 525 ++++++++++++++++++++++-- static/style.css | 249 ++++++++++++ templates/index.html | 41 +- 12 files changed, 2565 insertions(+), 666 deletions(-) diff --git a/src/certificate_checker.py b/src/certificate_checker.py index d91997d..a832d27 100644 --- a/src/certificate_checker.py +++ b/src/certificate_checker.py @@ -1,11 +1,12 @@ # File: src/certificate_checker.py -"""Certificate transparency log checker using crt.sh with minimal query caching.""" +"""Certificate transparency log checker using crt.sh with forensic operation tracking.""" import requests import json import time import logging import socket +import uuid from datetime import datetime from typing import List, Optional, Set from .data_structures import Certificate @@ -15,7 +16,7 @@ from .config import Config logger = logging.getLogger(__name__) class CertificateChecker: - """Check certificates using crt.sh with simple query caching to prevent duplicate HTTP requests.""" + """Check certificates using crt.sh with simple query caching and forensic tracking.""" CRT_SH_URL = "https://crt.sh/" @@ -29,7 +30,7 @@ class CertificateChecker: # Simple HTTP request cache to avoid duplicate queries self._http_cache = {} # query_string -> List[Certificate] - logger.info("Certificate checker initialized with HTTP request caching") + logger.info("πŸ” Certificate checker initialized with HTTP request caching") self._test_connectivity() def _test_connectivity(self): @@ -82,9 +83,13 @@ class CertificateChecker: self.last_request = time.time() self.query_count += 1 - def get_certificates(self, domain: str) -> List[Certificate]: - """Get certificates for a domain - EXACTLY the same behavior as original, just with HTTP caching.""" - logger.debug(f"Getting certificates for domain: {domain}") + def get_certificates(self, domain: str, operation_id: Optional[str] = None) -> List[Certificate]: + """Get certificates for a domain with forensic tracking.""" + # Generate operation ID if not provided + if operation_id is None: + operation_id = str(uuid.uuid4()) + + logger.debug(f"πŸ” Getting certificates for domain: {domain} (operation: {operation_id})") if self.connection_failures >= self.max_connection_failures: logger.warning(f"Skipping certificate lookup for {domain} due to repeated connection failures") @@ -93,11 +98,11 @@ class CertificateChecker: certificates = [] # Query for the domain itself - domain_certs = self._query_crt_sh(domain) + domain_certs = self._query_crt_sh(domain, operation_id) certificates.extend(domain_certs) # Query for wildcard certificates - wildcard_certs = self._query_crt_sh(f"%.{domain}") + wildcard_certs = self._query_crt_sh(f"%.{domain}", operation_id) certificates.extend(wildcard_certs) # Remove duplicates based on certificate ID @@ -105,34 +110,58 @@ class CertificateChecker: final_certs = list(unique_certs.values()) if final_certs: - logger.info(f"Found {len(final_certs)} unique certificates for {domain}") + logger.info(f"βœ… Found {len(final_certs)} unique certificates for {domain}") else: - 
logger.debug(f"No certificates found for {domain}") + logger.debug(f"ℹ️ No certificates found for {domain}") return final_certs - def _query_crt_sh(self, query: str) -> List[Certificate]: - """Query crt.sh API with HTTP caching to avoid duplicate requests.""" + def _query_crt_sh(self, query: str, operation_id: str) -> List[Certificate]: + """Query crt.sh API with HTTP caching and forensic tracking.""" # Check HTTP cache first + cache_key = f"{query}:{operation_id}" # Include operation_id in cache key if query in self._http_cache: logger.debug(f"Using cached HTTP result for crt.sh query: {query}") - return self._http_cache[query] + # Need to create new Certificate objects with current operation_id + cached_certs = self._http_cache[query] + return [ + Certificate( + id=cert.id, + issuer=cert.issuer, + subject=cert.subject, + not_before=cert.not_before, + not_after=cert.not_after, + is_wildcard=cert.is_wildcard, + operation_id=operation_id # Use current operation_id + ) for cert in cached_certs + ] # Not cached, make the HTTP request - certificates = self._make_http_request(query) + certificates = self._make_http_request(query, operation_id) - # Cache the HTTP result - self._http_cache[query] = certificates + # Cache the HTTP result (without operation_id) + if certificates: + self._http_cache[query] = [ + Certificate( + id=cert.id, + issuer=cert.issuer, + subject=cert.subject, + not_before=cert.not_before, + not_after=cert.not_after, + is_wildcard=cert.is_wildcard, + operation_id="" # Cache without operation_id + ) for cert in certificates + ] return certificates - def _make_http_request(self, query: str) -> List[Certificate]: - """Make actual HTTP request to crt.sh API with retry logic.""" + def _make_http_request(self, query: str, operation_id: str) -> List[Certificate]: + """Make actual HTTP request to crt.sh API with retry logic and forensic tracking.""" certificates = [] self._rate_limit() - logger.debug(f"Making HTTP request to crt.sh for: {query}") + logger.debug(f"🌐 Making HTTP request to crt.sh for: {query} (operation: {operation_id})") max_retries = 2 backoff_delays = [1, 3] @@ -151,7 +180,7 @@ class CertificateChecker: headers={'User-Agent': 'DNS-Recon-Tool/1.0'} ) - logger.debug(f"crt.sh API response for {query}: {response.status_code}") + logger.debug(f"πŸ“‘ crt.sh API response for {query}: {response.status_code}") if response.status_code == 200: try: @@ -164,16 +193,18 @@ class CertificateChecker: not_after = self._parse_date(cert_data.get('not_after')) if not_before and not_after: + # Create Certificate with forensic metadata certificate = Certificate( id=cert_data.get('id'), issuer=cert_data.get('issuer_name', ''), subject=cert_data.get('name_value', ''), not_before=not_before, not_after=not_after, - is_wildcard='*.' in cert_data.get('name_value', '') + is_wildcard='*.' 
in cert_data.get('name_value', ''), + operation_id=operation_id # Forensic tracking ) certificates.append(certificate) - logger.debug(f"Parsed certificate ID {certificate.id} for {query}") + logger.debug(f"πŸ“‹ Parsed certificate ID {certificate.id} for {query}") else: logger.debug(f"Skipped certificate with invalid dates: {cert_data.get('id')}") @@ -182,7 +213,7 @@ class CertificateChecker: continue self.connection_failures = 0 - logger.info(f"Successfully processed {len(certificates)} certificates from crt.sh for {query}") + logger.info(f"βœ… Successfully processed {len(certificates)} certificates from crt.sh for {query}") return certificates except json.JSONDecodeError as e: @@ -193,19 +224,19 @@ class CertificateChecker: return certificates elif response.status_code == 404: - logger.debug(f"No certificates found for {query} (404)") + logger.debug(f"ℹ️ No certificates found for {query} (404)") self.connection_failures = 0 return certificates elif response.status_code == 429: - logger.warning(f"crt.sh rate limit exceeded for {query}") + logger.warning(f"⚠️ crt.sh rate limit exceeded for {query}") if attempt < max_retries - 1: time.sleep(5) continue return certificates else: - logger.warning(f"crt.sh HTTP error for {query}: {response.status_code}") + logger.warning(f"⚠️ crt.sh HTTP error for {query}: {response.status_code}") if attempt < max_retries - 1: time.sleep(backoff_delays[attempt]) continue @@ -213,7 +244,7 @@ class CertificateChecker: except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: error_type = "Connection Error" if isinstance(e, requests.exceptions.ConnectionError) else "Timeout" - logger.warning(f"crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}") + logger.warning(f"⚠️ crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}") if isinstance(e, requests.exceptions.ConnectionError): self.connection_failures += 1 @@ -223,17 +254,17 @@ class CertificateChecker: continue except requests.exceptions.RequestException as e: - logger.warning(f"crt.sh network error for {query} (attempt {attempt+1}/{max_retries}): {e}") + logger.warning(f"🌐 crt.sh network error for {query} (attempt {attempt+1}/{max_retries}): {e}") if attempt < max_retries - 1: time.sleep(backoff_delays[attempt]) continue except Exception as e: - logger.error(f"Unexpected error querying crt.sh for {query}: {e}") + logger.error(f"❌ Unexpected error querying crt.sh for {query}: {e}") if attempt < max_retries - 1: time.sleep(backoff_delays[attempt]) continue - logger.warning(f"All {max_retries} attempts failed for crt.sh query: {query}") + logger.warning(f"⚠️ All {max_retries} attempts failed for crt.sh query: {query}") return certificates def _parse_date(self, date_str: str) -> Optional[datetime]: @@ -264,10 +295,10 @@ class CertificateChecker: return None def extract_subdomains_from_certificates(self, certificates: List[Certificate]) -> Set[str]: - """Extract subdomains from certificate subjects - EXACTLY the same as original.""" + """Extract subdomains from certificate subjects.""" subdomains = set() - logger.debug(f"Extracting subdomains from {len(certificates)} certificates") + logger.debug(f"🌿 Extracting subdomains from {len(certificates)} certificates") for cert in certificates: # Parse subject field for domain names @@ -278,22 +309,22 @@ class CertificateChecker: # Skip wildcard domains for recursion (they don't resolve directly) if line.startswith('*.'): - logger.debug(f"Skipping wildcard domain: {line}") + logger.debug(f"⭐ Skipping wildcard 
domain: {line}") continue if self._is_valid_domain(line): subdomains.add(line.lower()) - logger.debug(f"Found subdomain from certificate: {line}") + logger.debug(f"🌿 Found subdomain from certificate: {line}") if subdomains: - logger.info(f"Extracted {len(subdomains)} subdomains from certificates") + logger.info(f"🌿 Extracted {len(subdomains)} subdomains from certificates") else: - logger.debug("No subdomains extracted from certificates") + logger.debug("ℹ️ No subdomains extracted from certificates") return subdomains def _is_valid_domain(self, domain: str) -> bool: - """Basic domain validation - EXACTLY the same as original.""" + """Basic domain validation.""" if not domain or '.' not in domain: return False diff --git a/src/data_structures.py b/src/data_structures.py index a436942..4bb9168 100644 --- a/src/data_structures.py +++ b/src/data_structures.py @@ -1,38 +1,85 @@ # File: src/data_structures.py -"""Data structures for storing reconnaissance results.""" +"""Enhanced data structures for forensic DNS reconnaissance with full provenance tracking.""" from dataclasses import dataclass, field -from typing import Dict, List, Set, Optional, Any +from typing import Dict, List, Set, Optional, Any, Tuple from datetime import datetime +from enum import Enum import json import logging +import uuid # Set up logging for this module logger = logging.getLogger(__name__) +class OperationType(Enum): + """Types of discovery operations.""" + INITIAL_TARGET = "initial_target" + TLD_EXPANSION = "tld_expansion" + DNS_A = "dns_a" + DNS_AAAA = "dns_aaaa" + DNS_MX = "dns_mx" + DNS_NS = "dns_ns" + DNS_TXT = "dns_txt" + DNS_CNAME = "dns_cname" + DNS_SOA = "dns_soa" + DNS_PTR = "dns_ptr" + DNS_SRV = "dns_srv" + DNS_CAA = "dns_caa" + DNS_DNSKEY = "dns_dnskey" + DNS_DS = "dns_ds" + DNS_RRSIG = "dns_rrsig" + DNS_NSEC = "dns_nsec" + DNS_NSEC3 = "dns_nsec3" + DNS_REVERSE = "dns_reverse" + CERTIFICATE_CHECK = "certificate_check" + SHODAN_LOOKUP = "shodan_lookup" + VIRUSTOTAL_IP = "virustotal_ip" + VIRUSTOTAL_DOMAIN = "virustotal_domain" + +class DiscoveryMethod(Enum): + """How a hostname was discovered.""" + INITIAL_TARGET = "initial_target" + TLD_EXPANSION = "tld_expansion" + DNS_RECORD_VALUE = "dns_record_value" + CERTIFICATE_SUBJECT = "certificate_subject" + DNS_SUBDOMAIN_EXTRACTION = "dns_subdomain_extraction" + @dataclass class DNSRecord: - """DNS record information.""" + """DNS record information with enhanced metadata.""" record_type: str value: str ttl: Optional[int] = None + discovered_at: datetime = field(default_factory=datetime.now) + operation_id: str = field(default_factory=lambda: str(uuid.uuid4())) def to_dict(self) -> dict: return { 'record_type': self.record_type, 'value': self.value, - 'ttl': self.ttl + 'ttl': self.ttl, + 'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None, + 'operation_id': self.operation_id } @dataclass class Certificate: - """Certificate information from crt.sh.""" + """Certificate information with enhanced metadata.""" id: int issuer: str subject: str not_before: datetime not_after: datetime is_wildcard: bool = False + discovered_at: datetime = field(default_factory=datetime.now) + operation_id: str = field(default_factory=lambda: str(uuid.uuid4())) + is_valid_now: bool = field(default=True) # Based on current timestamp vs cert validity + + def __post_init__(self): + """Calculate if certificate is currently valid.""" + now = datetime.now() + self.is_valid_now = self.not_before <= now <= self.not_after def to_dict(self) -> dict: return { @@ -41,17 
+88,22 @@ class Certificate: 'subject': self.subject, 'not_before': self.not_before.isoformat() if self.not_before else None, 'not_after': self.not_after.isoformat() if self.not_after else None, - 'is_wildcard': self.is_wildcard + 'is_wildcard': self.is_wildcard, + 'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None, + 'operation_id': self.operation_id, + 'is_valid_now': self.is_valid_now } @dataclass class ShodanResult: - """Shodan scan result.""" + """Shodan scan result with metadata.""" ip: str ports: List[int] services: Dict[str, Any] organization: Optional[str] = None country: Optional[str] = None + discovered_at: datetime = field(default_factory=datetime.now) + operation_id: str = field(default_factory=lambda: str(uuid.uuid4())) def to_dict(self) -> dict: return { @@ -59,17 +111,21 @@ class ShodanResult: 'ports': self.ports, 'services': self.services, 'organization': self.organization, - 'country': self.country + 'country': self.country, + 'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None, + 'operation_id': self.operation_id } @dataclass class VirusTotalResult: - """VirusTotal scan result.""" + """VirusTotal scan result with metadata.""" resource: str # IP or domain positives: int total: int scan_date: datetime permalink: str + discovered_at: datetime = field(default_factory=datetime.now) + operation_id: str = field(default_factory=lambda: str(uuid.uuid4())) def to_dict(self) -> dict: return { @@ -77,118 +133,331 @@ class VirusTotalResult: 'positives': self.positives, 'total': self.total, 'scan_date': self.scan_date.isoformat() if self.scan_date else None, - 'permalink': self.permalink + 'permalink': self.permalink, + 'discovered_at': self.discovered_at.isoformat() if self.discovered_at else None, + 'operation_id': self.operation_id } @dataclass -class ReconData: - """Main data structure for reconnaissance results.""" +class DiscoveryOperation: + """A single discovery operation with complete metadata.""" + operation_id: str = field(default_factory=lambda: str(uuid.uuid4())) + operation_type: OperationType = OperationType.INITIAL_TARGET + target: str = "" + timestamp: datetime = field(default_factory=datetime.now) + success: bool = True + error_message: Optional[str] = None - # Core data - hostnames: Set[str] = field(default_factory=set) + # Results of the operation + dns_records: List[DNSRecord] = field(default_factory=list) + certificates: List[Certificate] = field(default_factory=list) + shodan_results: List[ShodanResult] = field(default_factory=list) + virustotal_results: List[VirusTotalResult] = field(default_factory=list) + discovered_hostnames: List[str] = field(default_factory=list) # New hostnames found + discovered_ips: List[str] = field(default_factory=list) # New IPs found + + # Operation-specific metadata + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict: + return { + 'operation_id': self.operation_id, + 'operation_type': self.operation_type.value, + 'target': self.target, + 'timestamp': self.timestamp.isoformat() if self.timestamp else None, + 'success': self.success, + 'error_message': self.error_message, + 'dns_records': [record.to_dict() for record in self.dns_records], + 'certificates': [cert.to_dict() for cert in self.certificates], + 'shodan_results': [result.to_dict() for result in self.shodan_results], + 'virustotal_results': [result.to_dict() for result in self.virustotal_results], + 'discovered_hostnames': self.discovered_hostnames, + 'discovered_ips': 
self.discovered_ips, + 'metadata': self.metadata + } + +@dataclass +class DiscoveryEdge: + """Represents a discovery relationship between two hostnames.""" + source_hostname: str # The hostname that led to the discovery + target_hostname: str # The hostname that was discovered + discovery_method: DiscoveryMethod + operation_id: str + timestamp: datetime + metadata: Dict[str, Any] = field(default_factory=dict) # Additional context + + def to_dict(self) -> dict: + return { + 'source_hostname': self.source_hostname, + 'target_hostname': self.target_hostname, + 'discovery_method': self.discovery_method.value, + 'operation_id': self.operation_id, + 'timestamp': self.timestamp.isoformat() if self.timestamp else None, + 'metadata': self.metadata + } + +@dataclass +class DiscoveryNode: + """A discovered hostname with complete provenance and current state.""" + hostname: str + depth: int = 0 + first_seen: datetime = field(default_factory=datetime.now) + last_updated: datetime = field(default_factory=datetime.now) + + # Current validity state + dns_exists: Optional[bool] = None # Does this hostname resolve? + last_dns_check: Optional[datetime] = None + resolved_ips: Set[str] = field(default_factory=set) + + # Discovery provenance - can have multiple discovery paths + discovery_methods: Set[DiscoveryMethod] = field(default_factory=set) + discovered_by_operations: Set[str] = field(default_factory=set) # operation_ids + + # Associated data (current state) + dns_records_by_type: Dict[str, List[DNSRecord]] = field(default_factory=dict) + certificates: List[Certificate] = field(default_factory=list) + shodan_results: List[ShodanResult] = field(default_factory=list) + virustotal_results: List[VirusTotalResult] = field(default_factory=list) + reverse_dns: Optional[str] = None + + def add_dns_record(self, record: DNSRecord) -> None: + """Add a DNS record to this node.""" + record_type = record.record_type + if record_type not in self.dns_records_by_type: + self.dns_records_by_type[record_type] = [] + self.dns_records_by_type[record_type].append(record) + self.last_updated = datetime.now() + + # Update resolved IPs if this is an A or AAAA record + if record_type in ['A', 'AAAA']: + self.resolved_ips.add(record.value) + self.dns_exists = True + self.last_dns_check = record.discovered_at + + def add_certificate(self, certificate: Certificate) -> None: + """Add a certificate to this node.""" + self.certificates.append(certificate) + self.last_updated = datetime.now() + + def get_all_dns_records(self) -> List[DNSRecord]: + """Get all DNS records for this node.""" + all_records = [] + for records in self.dns_records_by_type.values(): + all_records.extend(records) + return all_records + + def get_current_certificates(self) -> List[Certificate]: + """Get currently valid certificates.""" + return [cert for cert in self.certificates if cert.is_valid_now] + + def get_expired_certificates(self) -> List[Certificate]: + """Get expired certificates.""" + return [cert for cert in self.certificates if not cert.is_valid_now] + + def to_dict(self) -> dict: + return { + 'hostname': self.hostname, + 'depth': self.depth, + 'first_seen': self.first_seen.isoformat() if self.first_seen else None, + 'last_updated': self.last_updated.isoformat() if self.last_updated else None, + 'dns_exists': self.dns_exists, + 'last_dns_check': self.last_dns_check.isoformat() if self.last_dns_check else None, + 'resolved_ips': sorted(list(self.resolved_ips)), + 'discovery_methods': [method.value for method in self.discovery_methods], + 
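+            # Sorted so the serialized provenance is deterministic across runs (diff-friendly JSON).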
'discovered_by_operations': sorted(list(self.discovered_by_operations)), + 'dns_records_by_type': { + record_type: [record.to_dict() for record in records] + for record_type, records in self.dns_records_by_type.items() + }, + 'certificates': [cert.to_dict() for cert in self.certificates], + 'current_certificates': [cert.to_dict() for cert in self.get_current_certificates()], + 'expired_certificates': [cert.to_dict() for cert in self.get_expired_certificates()], + 'shodan_results': [result.to_dict() for result in self.shodan_results], + 'virustotal_results': [result.to_dict() for result in self.virustotal_results], + 'reverse_dns': self.reverse_dns + } + +@dataclass +class ForensicReconData: + """Enhanced reconnaissance data with full forensic tracking and graph structure.""" + + # Core graph structure + nodes: Dict[str, DiscoveryNode] = field(default_factory=dict) # hostname -> node + edges: List[DiscoveryEdge] = field(default_factory=list) + operations: Dict[str, DiscoveryOperation] = field(default_factory=dict) # operation_id -> operation + + # Quick lookup indexes ip_addresses: Set[str] = field(default_factory=set) - - # DNS information - dns_records: Dict[str, List[DNSRecord]] = field(default_factory=dict) - reverse_dns: Dict[str, str] = field(default_factory=dict) - - # Certificate information - certificates: Dict[str, List[Certificate]] = field(default_factory=dict) - - # External service results - shodan_results: Dict[str, ShodanResult] = field(default_factory=dict) - virustotal_results: Dict[str, VirusTotalResult] = field(default_factory=dict) + operation_timeline: List[str] = field(default_factory=list) # ordered operation_ids # Metadata start_time: datetime = field(default_factory=datetime.now) end_time: Optional[datetime] = None - depth_map: Dict[str, int] = field(default_factory=dict) # Track recursion depth + scan_config: Dict[str, Any] = field(default_factory=dict) - def add_hostname(self, hostname: str, depth: int = 0) -> None: - """Add a hostname to the dataset.""" + def add_node(self, hostname: str, depth: int = 0, + discovery_method: DiscoveryMethod = DiscoveryMethod.INITIAL_TARGET, + operation_id: Optional[str] = None) -> DiscoveryNode: + """Add or get a discovery node.""" hostname = hostname.lower() - self.hostnames.add(hostname) - self.depth_map[hostname] = depth - logger.info(f"Added hostname: {hostname} (depth: {depth})") + + if hostname not in self.nodes: + node = DiscoveryNode(hostname=hostname, depth=depth) + self.nodes[hostname] = node + logger.debug(f"Created new node: {hostname} at depth {depth}") + else: + node = self.nodes[hostname] + # Update depth if this is a shallower discovery + if depth < node.depth: + node.depth = depth + logger.debug(f"Updated node {hostname} depth: {node.depth} -> {depth}") + + # Track discovery method and operation + node.discovery_methods.add(discovery_method) + if operation_id: + node.discovered_by_operations.add(operation_id) + + return node - def add_ip_address(self, ip: str) -> None: - """Add an IP address to the dataset.""" - self.ip_addresses.add(ip) - logger.info(f"Added IP address: {ip}") + def add_edge(self, source: str, target: str, discovery_method: DiscoveryMethod, + operation_id: str, metadata: Dict[str, Any] = None) -> None: + """Add a discovery edge.""" + edge = DiscoveryEdge( + source_hostname=source.lower(), + target_hostname=target.lower(), + discovery_method=discovery_method, + operation_id=operation_id, + timestamp=datetime.now(), + metadata=metadata or {} + ) + self.edges.append(edge) + logger.debug(f"Added 
edge: {source} -> {target} via {discovery_method.value}") - def add_dns_record(self, hostname: str, record: DNSRecord) -> None: - """Add a DNS record for a hostname.""" + def add_operation(self, operation: DiscoveryOperation) -> None: + """Add an operation to the timeline.""" + self.operations[operation.operation_id] = operation + self.operation_timeline.append(operation.operation_id) + logger.debug(f"Added operation: {operation.operation_type.value} on {operation.target}") + + def get_node(self, hostname: str) -> Optional[DiscoveryNode]: + """Get a node by hostname.""" + return self.nodes.get(hostname.lower()) + + def get_children(self, hostname: str) -> List[str]: + """Get all hostnames discovered from this hostname.""" hostname = hostname.lower() - if hostname not in self.dns_records: - self.dns_records[hostname] = [] - self.dns_records[hostname].append(record) - logger.debug(f"Added DNS record for {hostname}: {record.record_type} -> {record.value}") + children = [] + for edge in self.edges: + if edge.source_hostname == hostname: + children.append(edge.target_hostname) + return children - def add_shodan_result(self, ip: str, result: ShodanResult) -> None: - """Add Shodan result.""" - self.shodan_results[ip] = result - logger.info(f"Added Shodan result for {ip}: {len(result.ports)} ports, org: {result.organization}") + def get_parents(self, hostname: str) -> List[str]: + """Get all hostnames that led to discovering this hostname.""" + hostname = hostname.lower() + parents = [] + for edge in self.edges: + if edge.target_hostname == hostname: + parents.append(edge.source_hostname) + return parents - def add_virustotal_result(self, resource: str, result: VirusTotalResult) -> None: - """Add VirusTotal result.""" - self.virustotal_results[resource] = result - logger.info(f"Added VirusTotal result for {resource}: {result.positives}/{result.total} detections") - - def get_new_subdomains(self, max_depth: int) -> Set[str]: - """Get subdomains that haven't been processed yet and are within depth limit.""" - new_domains = set() - for hostname in self.hostnames: - if (hostname not in self.dns_records and - self.depth_map.get(hostname, 0) < max_depth): - new_domains.add(hostname) - return new_domains + def get_discovery_path(self, hostname: str) -> List[Tuple[str, str, DiscoveryMethod]]: + """Get the discovery path(s) to a hostname.""" + hostname = hostname.lower() + paths = [] + + def trace_path(current: str, visited: Set[str], current_path: List): + if current in visited: + return # Avoid cycles + + visited.add(current) + parents = self.get_parents(current) + + if not parents: + # This is a root node, we have a complete path + paths.append(current_path.copy()) + else: + for parent in parents: + # Find the edge that connects parent to current + for edge in self.edges: + if edge.source_hostname == parent and edge.target_hostname == current: + new_path = [(parent, current, edge.discovery_method)] + current_path + trace_path(parent, visited.copy(), new_path) + + trace_path(hostname, set(), []) + return paths def get_stats(self) -> Dict[str, int]: """Get current statistics.""" + total_dns_records = sum(len(node.get_all_dns_records()) for node in self.nodes.values()) + total_certificates = sum(len(node.certificates) for node in self.nodes.values()) + current_certificates = sum(len(node.get_current_certificates()) for node in self.nodes.values()) + expired_certificates = sum(len(node.get_expired_certificates()) for node in self.nodes.values()) + total_shodan = sum(len(node.shodan_results) for node in 
self.nodes.values()) + total_virustotal = sum(len(node.virustotal_results) for node in self.nodes.values()) + return { - 'hostnames': len(self.hostnames), + 'hostnames': len(self.nodes), 'ip_addresses': len(self.ip_addresses), - 'dns_records': sum(len(records) for records in self.dns_records.values()), - 'certificates': sum(len(certs) for certs in self.certificates.values()), - 'shodan_results': len(self.shodan_results), - 'virustotal_results': len(self.virustotal_results) + 'discovery_edges': len(self.edges), + 'operations_performed': len(self.operations), + 'dns_records': total_dns_records, + 'certificates_total': total_certificates, + 'certificates_current': current_certificates, + 'certificates_expired': expired_certificates, + 'shodan_results': total_shodan, + 'virustotal_results': total_virustotal } def to_dict(self) -> dict: """Export data as a serializable dictionary.""" - logger.debug(f"Serializing ReconData with stats: {self.get_stats()}") + logger.info(f"Serializing ForensicReconData with {len(self.nodes)} nodes, {len(self.edges)} edges, {len(self.operations)} operations") - result = { - 'hostnames': sorted(list(self.hostnames)), + return { + 'nodes': {hostname: node.to_dict() for hostname, node in self.nodes.items()}, + 'edges': [edge.to_dict() for edge in self.edges], + 'operations': {op_id: op.to_dict() for op_id, op in self.operations.items()}, + 'operation_timeline': self.operation_timeline, 'ip_addresses': sorted(list(self.ip_addresses)), - 'dns_records': { - host: [record.to_dict() for record in records] - for host, records in self.dns_records.items() - }, - 'reverse_dns': dict(self.reverse_dns), - 'certificates': { - host: [cert.to_dict() for cert in certs] - for host, certs in self.certificates.items() - }, - 'shodan_results': { - ip: result.to_dict() for ip, result in self.shodan_results.items() - }, - 'virustotal_results': { - resource: result.to_dict() for resource, result in self.virustotal_results.items() - }, - 'depth_map': dict(self.depth_map), 'metadata': { 'start_time': self.start_time.isoformat() if self.start_time else None, 'end_time': self.end_time.isoformat() if self.end_time else None, + 'scan_config': self.scan_config, 'stats': self.get_stats() - } + }, + 'graph_analysis': self._generate_graph_analysis() } + + def _generate_graph_analysis(self) -> Dict[str, Any]: + """Generate graph analysis metadata.""" + # Depth distribution + depth_distribution = {} + for node in self.nodes.values(): + depth = node.depth + depth_distribution[depth] = depth_distribution.get(depth, 0) + 1 - logger.info(f"Serialized data contains: {len(result['hostnames'])} hostnames, " - f"{len(result['ip_addresses'])} IPs, {len(result['shodan_results'])} Shodan results, " - f"{len(result['virustotal_results'])} VirusTotal results") + # Root nodes (no parents) + root_nodes = [hostname for hostname in self.nodes.keys() + if not self.get_parents(hostname)] - return result + # Leaf nodes (no children) + leaf_nodes = [hostname for hostname in self.nodes.keys() + if not self.get_children(hostname)] + + # Discovery method distribution + method_distribution = {} + for edge in self.edges: + method = edge.discovery_method.value + method_distribution[method] = method_distribution.get(method, 0) + 1 + + return { + 'depth_distribution': depth_distribution, + 'max_depth': max(depth_distribution.keys()) if depth_distribution else 0, + 'root_nodes': root_nodes, + 'leaf_nodes': leaf_nodes, + 'discovery_method_distribution': method_distribution, + 'total_discovery_paths': len(self.edges) + } def 
to_json(self) -> str: """Export data as JSON.""" @@ -196,9 +465,11 @@ class ReconData: return json.dumps(self.to_dict(), indent=2, ensure_ascii=False) except Exception as e: logger.error(f"Failed to serialize to JSON: {e}") - # Return minimal JSON in case of error return json.dumps({ 'error': str(e), 'stats': self.get_stats(), 'timestamp': datetime.now().isoformat() - }, indent=2) \ No newline at end of file + }, indent=2) + +# Backward compatibility aliases (for gradual migration) +ReconData = ForensicReconData \ No newline at end of file diff --git a/src/dns_resolver.py b/src/dns_resolver.py index 3d2bd27..d698ab8 100644 --- a/src/dns_resolver.py +++ b/src/dns_resolver.py @@ -1,5 +1,5 @@ # File: src/dns_resolver.py -"""DNS resolution functionality with enhanced TLD testing.""" +"""DNS resolution functionality with enhanced TLD testing and forensic operation tracking.""" import dns.resolver import dns.reversename @@ -9,6 +9,7 @@ from typing import List, Dict, Optional, Set import socket import time import logging +import uuid from .data_structures import DNSRecord, ReconData from .config import Config @@ -16,7 +17,7 @@ from .config import Config logger = logging.getLogger(__name__) class DNSResolver: - """DNS resolution and record lookup with optimized TLD testing.""" + """DNS resolution and record lookup with optimized TLD testing and forensic tracking.""" # All DNS record types to query RECORD_TYPES = [ @@ -80,7 +81,7 @@ class DNSResolver: return ips - def resolve_hostname(self, hostname: str) -> List[str]: + def resolve_hostname(self, hostname: str, operation_id: Optional[str] = None) -> List[str]: """Resolve hostname to IP addresses (full resolution with retries).""" ips = [] @@ -126,12 +127,16 @@ class DNSResolver: return unique_ips - def get_all_dns_records(self, hostname: str) -> List[DNSRecord]: - """Get all DNS records for a hostname.""" + def get_all_dns_records(self, hostname: str, operation_id: Optional[str] = None) -> List[DNSRecord]: + """Get all DNS records for a hostname with forensic tracking.""" records = [] successful_queries = 0 - logger.debug(f"πŸ“‹ Getting all DNS records for: {hostname}") + # Generate operation ID if not provided + if operation_id is None: + operation_id = str(uuid.uuid4()) + + logger.debug(f"πŸ“‹ Getting all DNS records for: {hostname} (operation: {operation_id})") for record_type in self.RECORD_TYPES: type_found = False @@ -145,11 +150,15 @@ class DNSResolver: try: answers = resolver.resolve(hostname, record_type) for answer in answers: - records.append(DNSRecord( + # Create DNSRecord with forensic metadata + record = DNSRecord( record_type=record_type, value=str(answer), - ttl=answers.ttl - )) + ttl=answers.ttl, + operation_id=operation_id # Forensic tracking + ) + records.append(record) + if not type_found: logger.debug(f"βœ… Found {record_type} record for {hostname}: {answer}") type_found = True @@ -179,9 +188,56 @@ class DNSResolver: return records - def reverse_dns_lookup(self, ip: str) -> Optional[str]: - """Perform reverse DNS lookup.""" - logger.debug(f"πŸ” Reverse DNS lookup for: {ip}") + def query_specific_record_type(self, hostname: str, record_type: str, operation_id: Optional[str] = None) -> List[DNSRecord]: + """Query a specific DNS record type with forensic tracking.""" + records = [] + + # Generate operation ID if not provided + if operation_id is None: + operation_id = str(uuid.uuid4()) + + logger.debug(f"🎯 Querying {record_type} records for {hostname} (operation: {operation_id})") + + for dns_server in self.config.DNS_SERVERS: + 
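+            # Try resolvers in configured order; rate-limit each query and stop at the
+            # first definitive answer (records found, or NXDOMAIN from the server).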
self._rate_limit() + resolver = dns.resolver.Resolver() + resolver.nameservers = [dns_server] + resolver.timeout = self.config.DNS_TIMEOUT + + try: + answers = resolver.resolve(hostname, record_type) + for answer in answers: + # Create DNSRecord with forensic metadata + record = DNSRecord( + record_type=record_type, + value=str(answer), + ttl=answers.ttl, + operation_id=operation_id # Forensic tracking + ) + records.append(record) + logger.debug(f"βœ… {record_type} record for {hostname}: {answer}") + + break # Found records, no need to query other DNS servers + + except dns.resolver.NXDOMAIN: + logger.debug(f"❌ NXDOMAIN for {hostname} {record_type} on {dns_server}") + break # Domain doesn't exist, no point checking other servers + except dns.resolver.NoAnswer: + logger.debug(f"⚠️ No {record_type} record for {hostname} on {dns_server}") + continue # Try next DNS server + except dns.resolver.Timeout: + logger.debug(f"⏱️ Timeout for {hostname} {record_type} on {dns_server}") + continue # Try next DNS server + except Exception as e: + logger.debug(f"⚠️ Error querying {record_type} for {hostname} on {dns_server}: {e}") + continue # Try next DNS server + + logger.debug(f"🎯 Found {len(records)} {record_type} records for {hostname}") + return records + + def reverse_dns_lookup(self, ip: str, operation_id: Optional[str] = None) -> Optional[str]: + """Perform reverse DNS lookup with forensic tracking.""" + logger.debug(f"πŸ” Reverse DNS lookup for: {ip} (operation: {operation_id or 'auto'})") try: self._rate_limit() diff --git a/src/main.py b/src/main.py index 299a696..221139d 100644 --- a/src/main.py +++ b/src/main.py @@ -1,5 +1,5 @@ # File: src/main.py -"""Main CLI interface for the reconnaissance tool with two-mode operation.""" +"""Main CLI interface for the forensic reconnaissance tool with two-mode operation.""" import click import json @@ -7,8 +7,8 @@ import sys import logging from pathlib import Path from .config import Config -from .reconnaissance import ReconnaissanceEngine -from .report_generator import ReportGenerator +from .reconnaissance import ForensicReconnaissanceEngine +from .report_generator import ForensicReportGenerator from .web_app import create_app # Module logger @@ -27,7 +27,7 @@ logger = logging.getLogger(__name__) @click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging (DEBUG level)') @click.option('--quiet', '-q', is_flag=True, help='Quiet mode (WARNING level only)') def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, text_only, port, verbose, quiet): - """DNS Reconnaissance Tool - Two-Mode Operation + """Forensic DNS Reconnaissance Tool - Two-Mode Operation MODE 1 - Hostname-only (e.g., 'cc24'): Expands hostname to all TLDs (cc24.com, cc24.net, etc.) 
@@ -38,13 +38,14 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, Full recursive reconnaissance with subdomain discovery Maps complete infrastructure of the specified domain Uses max-depth for recursive enumeration + Creates forensic-grade evidence chain with operation tracking Examples: recon cc24 # Mode 1: Find all cc24.* domains (no recursion) - recon cc24.com # Mode 2: Map cc24.com infrastructure (with recursion) + recon cc24.com # Mode 2: Map cc24.com infrastructure (with forensic tracking) recon cc24.com --max-depth 3 # Mode 2: Deeper recursive enumeration recon cc24 -v # Mode 1: Verbose TLD expansion - recon --web # Start web interface + recon --web # Start web interface with forensic visualization """ # Determine log level @@ -61,19 +62,19 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, if web: # Start web interface - logger.info("Starting web interface...") + logger.info("🌐 Starting forensic web interface...") app = create_app(config) - logger.info(f"Web interface starting on http://0.0.0.0:{port}") - app.run(host='0.0.0.0', port=port, debug=False) # Changed debug to False to reduce noise + logger.info(f"πŸš€ Forensic web interface starting on http://0.0.0.0:{port}") + app.run(host='0.0.0.0', port=port, debug=False) return if not target: click.echo("Error: TARGET is required for CLI mode. Use --web for web interface.") sys.exit(1) - # Initialize reconnaissance engine - logger.info("Initializing reconnaissance engine...") - engine = ReconnaissanceEngine(config) + # Initialize forensic reconnaissance engine + logger.info("πŸ”¬ Initializing forensic reconnaissance engine...") + engine = ForensicReconnaissanceEngine(config) # Set up progress callback for CLI def progress_callback(message, percentage=None): @@ -85,53 +86,58 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, engine.set_progress_callback(progress_callback) # Display startup information - click.echo("=" * 60) - click.echo("DNS RECONNAISSANCE TOOL") - click.echo("=" * 60) + click.echo("=" * 80) + click.echo("FORENSIC DNS RECONNAISSANCE TOOL") + click.echo("=" * 80) click.echo(f"Target: {target}") # Show operation mode if '.' in target: click.echo(f"Mode: Full domain reconnaissance (recursive depth: {max_depth})") - click.echo(" β†’ Will map complete infrastructure of the specified domain") + click.echo(" β†’ Will map complete infrastructure with forensic tracking") + click.echo(" β†’ Creates evidence chain for each discovery operation") else: click.echo(f"Mode: Hostname-only reconnaissance (TLD expansion)") click.echo(" β†’ Will find all domains using this hostname (no recursion)") + click.echo(" β†’ Limited forensic tracking for efficiency") click.echo(f"DNS servers: {', '.join(config.DNS_SERVERS[:3])}{'...' 
if len(config.DNS_SERVERS) > 3 else ''}") click.echo(f"DNS rate limit: {config.DNS_RATE_LIMIT}/s") if shodan_key: - click.echo("Shodan integration enabled") - logger.info(f"Shodan API key provided (ends with: ...{shodan_key[-4:] if len(shodan_key) > 4 else shodan_key})") + click.echo("πŸ•΅οΈ Shodan integration enabled") else: click.echo("Shodan integration disabled (no API key)") if virustotal_key: - click.echo("VirusTotal integration enabled") - logger.info(f"VirusTotal API key provided (ends with: ...{virustotal_key[-4:] if len(virustotal_key) > 4 else virustotal_key})") + click.echo("πŸ›‘οΈ VirusTotal integration enabled") else: click.echo("VirusTotal integration disabled (no API key)") click.echo("") - # Run reconnaissance + # Run forensic reconnaissance try: - logger.info(f"Starting reconnaissance for target: {target}") + logger.info(f"🎯 Starting forensic reconnaissance for target: {target}") data = engine.run_reconnaissance(target) # Display final statistics stats = data.get_stats() + graph_analysis = data._generate_graph_analysis() + click.echo("") - click.echo("=" * 60) - click.echo("RECONNAISSANCE COMPLETE") - click.echo("=" * 60) + click.echo("=" * 80) + click.echo("FORENSIC RECONNAISSANCE COMPLETE") + click.echo("=" * 80) click.echo(f"Hostnames discovered: {stats['hostnames']}") click.echo(f"IP addresses found: {stats['ip_addresses']}") + click.echo(f"Discovery relationships: {stats['discovery_edges']}") + click.echo(f"Operations performed: {stats['operations_performed']}") click.echo(f"DNS records collected: {stats['dns_records']}") - click.echo(f"Certificates found: {stats['certificates']}") + click.echo(f"Certificates found: {stats['certificates_total']} ({stats['certificates_current']} valid, {stats['certificates_expired']} expired)") click.echo(f"Shodan results: {stats['shodan_results']}") click.echo(f"VirusTotal results: {stats['virustotal_results']}") + click.echo(f"Maximum discovery depth: {graph_analysis['max_depth']}") # Calculate and display timing if data.end_time and data.start_time: @@ -140,9 +146,9 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, click.echo("") - # Generate reports - logger.info("Generating reports...") - report_gen = ReportGenerator(data) + # Generate forensic reports + logger.info("πŸ“„ Generating forensic reports...") + report_gen = ForensicReportGenerator(data) if output: # Save to files @@ -155,9 +161,9 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, with open(json_file, 'w', encoding='utf-8') as f: f.write(json_content) saved_files.append(json_file) - logger.info(f"JSON report saved: {json_file}") + logger.info(f"πŸ“„ Forensic JSON report saved: {json_file}") except Exception as e: - logger.error(f"Failed to save JSON report: {e}") + logger.error(f"❌ Failed to save JSON report: {e}") if not json_only: text_file = f"{output}.txt" @@ -165,12 +171,12 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, with open(text_file, 'w', encoding='utf-8') as f: f.write(report_gen.generate_text_report()) saved_files.append(text_file) - logger.info(f"Text report saved: {text_file}") + logger.info(f"πŸ“„ Forensic text report saved: {text_file}") except Exception as e: - logger.error(f"Failed to save text report: {e}") + logger.error(f"❌ Failed to save text report: {e}") if saved_files: - click.echo(f"Reports saved:") + click.echo(f"πŸ“ Forensic reports saved:") for file in saved_files: click.echo(f" {file}") @@ -180,31 +186,32 @@ def main(target, web, 
shodan_key, virustotal_key, max_depth, output, json_only, try: click.echo(data.to_json()) except Exception as e: - logger.error(f"Failed to generate JSON output: {e}") + logger.error(f"❌ Failed to generate JSON output: {e}") click.echo(f"Error generating JSON: {e}") elif text_only: try: click.echo(report_gen.generate_text_report()) except Exception as e: - logger.error(f"Failed to generate text report: {e}") + logger.error(f"❌ Failed to generate text report: {e}") click.echo(f"Error generating text report: {e}") else: # Default: show text report try: click.echo(report_gen.generate_text_report()) - click.echo(f"\nTo get JSON output, use: --json-only") - click.echo(f"To save reports, use: --output filename") + click.echo(f"\nπŸ“Š To get JSON output with full forensic data, use: --json-only") + click.echo(f"πŸ’Ύ To save reports, use: --output filename") + click.echo(f"🌐 For interactive visualization, use: --web") except Exception as e: - logger.error(f"Failed to generate report: {e}") + logger.error(f"❌ Failed to generate report: {e}") click.echo(f"Error generating report: {e}") except KeyboardInterrupt: - logger.warning("Reconnaissance interrupted by user") - click.echo("\nReconnaissance interrupted by user.") + logger.warning("⚠️ Forensic reconnaissance interrupted by user") + click.echo("\nπŸ›‘ Reconnaissance interrupted by user.") sys.exit(1) except Exception as e: - logger.error(f"Error during reconnaissance: {e}", exc_info=True) - click.echo(f"Error during reconnaissance: {e}") + logger.error(f"❌ Error during forensic reconnaissance: {e}", exc_info=True) + click.echo(f"❌ Error during reconnaissance: {e}") if verbose: raise # Re-raise in verbose mode to show full traceback sys.exit(1) diff --git a/src/reconnaissance.py b/src/reconnaissance.py index 3ff2948..3815902 100644 --- a/src/reconnaissance.py +++ b/src/reconnaissance.py @@ -1,12 +1,15 @@ # File: src/reconnaissance.py -"""Main reconnaissance logic with two-mode operation for hostname vs domain targets.""" +"""Enhanced reconnaissance logic with forensic-grade operation tracking and provenance.""" import threading import concurrent.futures import logging from datetime import datetime -from typing import Set, List, Optional, Tuple -from .data_structures import ReconData +from typing import Set, List, Optional, Tuple, Dict, Any +from .data_structures import ( + ForensicReconData, DiscoveryNode, DiscoveryOperation, DiscoveryEdge, + OperationType, DiscoveryMethod, DNSRecord, Certificate +) from .config import Config from .dns_resolver import DNSResolver from .certificate_checker import CertificateChecker @@ -17,8 +20,8 @@ from .tld_fetcher import TLDFetcher # Set up logging for this module logger = logging.getLogger(__name__) -class ReconnaissanceEngine: - """Main reconnaissance engine with two-mode operation: hostname-only vs full domain.""" +class ForensicReconnaissanceEngine: + """Enhanced reconnaissance engine with complete forensic tracking and provenance.""" def __init__(self, config: Config): self.config = config @@ -32,32 +35,32 @@ class ReconnaissanceEngine: self.shodan_client = None if config.shodan_key: self.shodan_client = ShodanClient(config.shodan_key, config) - logger.info("Shodan client initialized") - else: - logger.info("Shodan API key not provided, skipping Shodan integration") + logger.info("πŸ•΅οΈ Shodan client initialized") self.virustotal_client = None if config.virustotal_key: self.virustotal_client = VirusTotalClient(config.virustotal_key, config) - logger.info("VirusTotal client initialized") - else: - 
logger.info("VirusTotal API key not provided, skipping VirusTotal integration") + logger.info("πŸ›‘οΈ VirusTotal client initialized") # Progress tracking self.progress_callback = None self._lock = threading.Lock() - # Shared data object for live updates - self.shared_data = None + # Track operation mode + self.is_hostname_only_mode = False + + # Operation tracking + self.pending_operations = [] + self.completed_operations = [] def set_progress_callback(self, callback): """Set callback for progress updates.""" self.progress_callback = callback - def set_shared_data(self, shared_data: ReconData): - """Set shared data object for live updates during web interface usage.""" - self.shared_data = shared_data - logger.info("Using shared data object for live updates") + def set_shared_data(self, shared_data: ForensicReconData): + """Set shared data object for live updates.""" + self.data = shared_data + logger.info("πŸ“Š Using shared forensic data object for live updates") def _update_progress(self, message: str, percentage: int = None): """Update progress if callback is set.""" @@ -65,347 +68,603 @@ class ReconnaissanceEngine: if self.progress_callback: self.progress_callback(message, percentage) - def run_reconnaissance(self, target: str) -> ReconData: - """Run reconnaissance on target using appropriate mode based on input type.""" - # Use shared data object if available, otherwise create new one - if self.shared_data is not None: - self.data = self.shared_data - logger.info("Using shared data object for reconnaissance") - else: - self.data = ReconData() - logger.info("Created new data object for reconnaissance") - + def run_reconnaissance(self, target: str) -> ForensicReconData: + """Run forensic reconnaissance with complete operation tracking.""" + # Initialize data structure + if not hasattr(self, 'data') or self.data is None: + self.data = ForensicReconData() + logger.info("πŸ”¬ Created new forensic data structure") + self.data.start_time = datetime.now() + self.data.scan_config = { + 'target': target, + 'max_depth': self.config.max_depth, + 'dns_servers': self.config.DNS_SERVERS, + 'shodan_enabled': self.shodan_client is not None, + 'virustotal_enabled': self.virustotal_client is not None + } - logger.info(f"Starting reconnaissance for target: {target}") - logger.info(f"Configuration: max_depth={self.config.max_depth}, " - f"DNS_rate={self.config.DNS_RATE_LIMIT}/s") - - # Store original max_depth for potential restoration - original_max_depth = self.config.max_depth - reconnaissance_mode = "full_domain" if '.' in target else "hostname_only" + logger.info(f"🎯 Starting forensic reconnaissance for target: {target}") try: - # Determine operation mode based on target format + # Determine operation mode and create initial operation if '.' 
in target: - logger.info(f"Target '{target}' appears to be a full domain name") - logger.info(f"Mode: Full domain reconnaissance with recursive enumeration (max_depth={self.config.max_depth})") - self._update_progress(f"Starting reconnaissance for {target}", 0) - self.data.add_hostname(target, 0) - initial_targets = {target} + self.is_hostname_only_mode = False + logger.info(f"πŸ” Full domain mode: {target}") + self._create_initial_operation(target, OperationType.INITIAL_TARGET) else: - logger.info(f"Target '{target}' appears to be a hostname, expanding to all TLDs") - logger.info(f"Mode: Hostname-only reconnaissance - TLD expansion without recursion") - self._update_progress(f"Expanding {target} to all TLDs", 5) - initial_targets = self._expand_hostname_to_tlds_smart(target) - - # Override max_depth for hostname-only queries to prevent infrastructure noise - self.config.max_depth = 0 - logger.info(f"Found {len(initial_targets)} valid domains after TLD expansion") - logger.info(f"Set max_depth=0 for hostname-only reconnaissance (avoiding third-party infrastructure)") + self.is_hostname_only_mode = True + logger.info(f"πŸ” Hostname expansion mode: {target}") + self._perform_tld_expansion(target) - self._update_progress("Resolving initial targets", 10) + # Process all discovery operations + self._process_discovery_queue() - # Process all targets with appropriate recursion depth - self._process_targets_recursively(initial_targets) - - # Final external lookups - self._update_progress("Performing external service lookups", 90) - self._perform_external_lookups() - - # Log final statistics with reconnaissance mode - stats = self.data.get_stats() - logger.info(f"Final statistics ({reconnaissance_mode}): {stats}") - - if reconnaissance_mode == "hostname_only": - logger.info(f"Hostname-only reconnaissance complete: discovered {stats['hostnames']} domains using '{target}' hostname") - logger.info(f"To perform recursive enumeration on specific domains, run with full domain names (e.g., '{target}.com')") - else: - logger.info(f"Full domain reconnaissance complete with recursive depth {original_max_depth}") - - self._update_progress("Reconnaissance complete", 100) - - except Exception as e: - logger.error(f"Error during reconnaissance: {e}", exc_info=True) - raise - finally: - # Restore original max_depth (though this engine instance is typically discarded) - self.config.max_depth = original_max_depth + # Perform external lookups for full domain mode + if not self.is_hostname_only_mode: + self._perform_external_lookups() + # Finalize self.data.end_time = datetime.now() duration = self.data.end_time - self.data.start_time - logger.info(f"Total reconnaissance time: {duration}") + + stats = self.data.get_stats() + logger.info(f"🏁 Forensic reconnaissance complete in {duration}") + logger.info(f"πŸ“Š Final stats: {stats}") + + self._update_progress("Forensic reconnaissance complete", 100) + + except Exception as e: + logger.error(f"❌ Error during forensic reconnaissance: {e}", exc_info=True) + raise return self.data - def _expand_hostname_to_tlds_smart(self, hostname: str) -> Set[str]: - """Smart TLD expansion with prioritization and parallel processing.""" - logger.info(f"Starting smart TLD expansion for hostname: {hostname}") + def _create_initial_operation(self, target: str, operation_type: OperationType): + """Create initial operation for the target.""" + operation = DiscoveryOperation( + operation_type=operation_type, + target=target, + discovered_hostnames=[target] + ) + + 
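+        # Register the operation first so the initial node below can carry its operation_id.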
self.data.add_operation(operation) + + # Create initial node + node = self.data.add_node( + hostname=target, + depth=0, + discovery_method=DiscoveryMethod.INITIAL_TARGET, + operation_id=operation.operation_id + ) + + # Queue initial DNS operations + self._queue_dns_operations_for_hostname(target, 0) + + logger.info(f"πŸ” Created initial operation for {target}") + + def _perform_tld_expansion(self, hostname: str): + """Perform TLD expansion with forensic tracking.""" + logger.info(f"🌐 Starting TLD expansion for: {hostname}") + + # Create TLD expansion operation + operation = DiscoveryOperation( + operation_type=OperationType.TLD_EXPANSION, + target=hostname, + metadata={'expansion_type': 'smart_prioritized'} + ) + + self._update_progress(f"Expanding {hostname} to all TLDs", 5) # Get prioritized TLD lists priority_tlds, normal_tlds, deprioritized_tlds = self.tld_fetcher.get_prioritized_tlds() - logger.info(f"TLD categories: {len(priority_tlds)} priority, " - f"{len(normal_tlds)} normal, {len(deprioritized_tlds)} deprioritized") - valid_domains = set() - # Phase 1: Check priority TLDs first (parallel processing) - logger.info("Phase 1: Checking priority TLDs...") - priority_results = self._check_tlds_parallel(hostname, priority_tlds, "priority") + # Phase 1: Priority TLDs + logger.info("🎯 Phase 1: Checking priority TLDs...") + priority_results = self._check_tlds_parallel(hostname, priority_tlds, operation.operation_id) valid_domains.update(priority_results) - self._update_progress(f"Phase 1 complete: {len(priority_results)} priority TLD matches", 6) - - # Phase 2: Check normal TLDs (if we found fewer than 5 results) + # Phase 2: Normal TLDs (if needed) if len(valid_domains) < 5: - logger.info("Phase 2: Checking normal TLDs...") - normal_results = self._check_tlds_parallel(hostname, normal_tlds, "normal") + logger.info("πŸ” Phase 2: Checking normal TLDs...") + normal_results = self._check_tlds_parallel(hostname, normal_tlds, operation.operation_id) valid_domains.update(normal_results) - - self._update_progress(f"Phase 2 complete: {len(normal_results)} normal TLD matches", 8) - else: - logger.info(f"Skipping normal TLDs (found {len(valid_domains)} matches in priority)") - # Phase 3: Check deprioritized TLDs only if we found very few results + # Phase 3: Deprioritized TLDs (if really needed) if len(valid_domains) < 2: - logger.info("Phase 3: Checking deprioritized TLDs (limited results so far)...") - depri_results = self._check_tlds_parallel(hostname, deprioritized_tlds, "deprioritized") + logger.info("πŸ”Ž Phase 3: Checking deprioritized TLDs...") + depri_results = self._check_tlds_parallel(hostname, deprioritized_tlds, operation.operation_id) valid_domains.update(depri_results) + + # Update operation with results + operation.discovered_hostnames = list(valid_domains) + operation.success = len(valid_domains) > 0 + self.data.add_operation(operation) + + # Create nodes for all discovered domains + for domain in valid_domains: + node = self.data.add_node( + hostname=domain, + depth=0, + discovery_method=DiscoveryMethod.TLD_EXPANSION, + operation_id=operation.operation_id + ) - self._update_progress(f"Phase 3 complete: {len(depri_results)} deprioritized TLD matches", 9) - else: - logger.info(f"Skipping deprioritized TLDs (found {len(valid_domains)} matches already)") + # Add discovery edge (synthetic source for TLD expansion) + self.data.add_edge( + source=f"tld_expansion:{hostname}", + target=domain, + discovery_method=DiscoveryMethod.TLD_EXPANSION, + operation_id=operation.operation_id, + 
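+                # Synthetic "tld_expansion:<hostname>" source keeps expansion roots
+                # distinct from real hostnames in the discovery graph.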
metadata={'original_hostname': hostname} + ) - logger.info(f"Smart TLD expansion complete: found {len(valid_domains)} valid domains") - return valid_domains + logger.info(f"βœ… TLD expansion complete: {len(valid_domains)} domains found") + + # Queue lightweight operations for hostname-only mode + for domain in valid_domains: + self._queue_lightweight_operations(domain) - def _check_tlds_parallel(self, hostname: str, tlds: List[str], phase_name: str) -> Set[str]: - """Check TLDs in parallel with optimized settings.""" + def _check_tlds_parallel(self, hostname: str, tlds: List[str], operation_id: str) -> Set[str]: + """Check TLDs in parallel with operation tracking.""" valid_domains = set() - tested_count = 0 - - # Use thread pool for parallel processing - max_workers = min(20, len(tlds)) # Limit concurrent requests - - logger.info(f"Starting parallel check of {len(tlds)} {phase_name} TLDs " - f"with {max_workers} workers") + max_workers = min(20, len(tlds)) with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - # Submit all tasks future_to_tld = { - executor.submit(self._check_single_tld, hostname, tld): tld + executor.submit(self._check_single_tld_forensic, hostname, tld, operation_id): tld for tld in tlds } - # Process results as they complete for future in concurrent.futures.as_completed(future_to_tld): tld = future_to_tld[future] - tested_count += 1 - try: - result = future.result(timeout=10) # 10 second timeout per future - + result = future.result(timeout=10) if result: - full_hostname, ips = result - - logger.info(f"Valid domain found: {full_hostname} -> {ips}") - self.data.add_hostname(full_hostname, 0) - valid_domains.add(full_hostname) + domain, ips = result + valid_domains.add(domain) + # Track discovered IPs for ip in ips: - self.data.add_ip_address(ip) - - # Progress update every 50 TLDs in this phase - if tested_count % 50 == 0: - logger.info(f"{phase_name.title()} phase progress: " - f"{tested_count}/{len(tlds)} tested, " - f"{len(valid_domains)} found") + self.data.ip_addresses.add(ip) - except concurrent.futures.TimeoutError: - logger.debug(f"Timeout checking {hostname}.{tld}") except Exception as e: logger.debug(f"Error checking {hostname}.{tld}: {e}") - logger.info(f"{phase_name.title()} phase complete: " - f"tested {tested_count} TLDs, found {len(valid_domains)} valid domains") - return valid_domains - def _check_single_tld(self, hostname: str, tld: str) -> Optional[Tuple[str, List[str]]]: - """Check a single TLD combination with optimized DNS resolution.""" + def _check_single_tld_forensic(self, hostname: str, tld: str, operation_id: str) -> Optional[Tuple[str, List[str]]]: + """Check single TLD with forensic tracking.""" full_hostname = f"{hostname}.{tld}" - # Use faster DNS resolution with shorter timeout for TLD testing + # Quick DNS resolution ips = self.dns_resolver.resolve_hostname_fast(full_hostname) if ips: - logger.debug(f"{full_hostname} -> {ips}") + # Create DNS operation record + dns_operation = DiscoveryOperation( + operation_type=OperationType.DNS_A, + target=full_hostname, + discovered_ips=ips, + metadata={'tld_expansion': True, 'parent_operation': operation_id} + ) + + # Add DNS records + for ip in ips: + record = DNSRecord( + record_type='A', + value=ip, + operation_id=dns_operation.operation_id + ) + dns_operation.dns_records.append(record) + + self.data.add_operation(dns_operation) return (full_hostname, ips) return None - def _process_targets_recursively(self, targets: Set[str]): - """Process targets with recursive 
subdomain discovery.""" - current_depth = 0 + def _queue_dns_operations_for_hostname(self, hostname: str, depth: int): + """Queue comprehensive DNS operations for a hostname.""" + # Map DNS record types to operation types + dns_operations = [ + (OperationType.DNS_A, 'A'), + (OperationType.DNS_AAAA, 'AAAA'), + (OperationType.DNS_MX, 'MX'), + (OperationType.DNS_NS, 'NS'), + (OperationType.DNS_TXT, 'TXT'), + (OperationType.DNS_CNAME, 'CNAME'), + (OperationType.DNS_SOA, 'SOA'), + (OperationType.DNS_SRV, 'SRV'), + (OperationType.DNS_CAA, 'CAA'), + ] - while current_depth <= self.config.max_depth and targets: - logger.info(f"Processing depth {current_depth} with {len(targets)} targets") - self._update_progress(f"Processing depth {current_depth} ({len(targets)} targets)", 15 + (current_depth * 25)) + for operation_type, record_type in dns_operations: + self.pending_operations.append({ + 'type': 'dns_query', + 'operation_type': operation_type, + 'record_type': record_type, + 'hostname': hostname, + 'depth': depth + }) + + # Queue certificate operation + self.pending_operations.append({ + 'type': 'certificate_check', + 'operation_type': OperationType.CERTIFICATE_CHECK, + 'hostname': hostname, + 'depth': depth + }) + + logger.debug(f"πŸ“‹ Queued {len(dns_operations) + 1} operations for {hostname}") + + def _queue_lightweight_operations(self, hostname: str): + """Queue lightweight operations for hostname-only mode.""" + # Only essential DNS operations + essential_operations = [ + (OperationType.DNS_A, 'A'), + (OperationType.DNS_AAAA, 'AAAA'), + (OperationType.DNS_MX, 'MX'), + (OperationType.DNS_TXT, 'TXT'), + ] + + for operation_type, record_type in essential_operations: + self.pending_operations.append({ + 'type': 'dns_query', + 'operation_type': operation_type, + 'record_type': record_type, + 'hostname': hostname, + 'depth': 0 + }) + + logger.debug(f"πŸ“‹ Queued {len(essential_operations)} lightweight operations for {hostname}") + + def _process_discovery_queue(self): + """Process all queued discovery operations.""" + operation_count = 0 + total_operations = len(self.pending_operations) + + logger.info(f"βš™οΈ Processing {total_operations} operations") + + while self.pending_operations: + operation_spec = self.pending_operations.pop(0) + operation_count += 1 - new_targets = set() + # Calculate progress + progress = 20 + (operation_count * 60 // max(total_operations, 1)) - for target in targets: - logger.debug(f"Processing target: {target}") + try: + if operation_spec['type'] == 'dns_query': + self._execute_dns_operation(operation_spec) + elif operation_spec['type'] == 'certificate_check': + self._execute_certificate_operation(operation_spec) - # DNS resolution and record gathering - self._process_single_target(target, current_depth) - - # Extract new subdomains - if current_depth < self.config.max_depth: - new_subdomains = self._extract_new_subdomains(target) - logger.debug(f"Found {len(new_subdomains)} new subdomains from {target}") + # Update progress periodically + if operation_count % 10 == 0: + self._update_progress(f"Processed {operation_count}/{total_operations} operations", progress) - for subdomain in new_subdomains: - self.data.add_hostname(subdomain, current_depth + 1) - new_targets.add(subdomain) - - logger.info(f"Depth {current_depth} complete. 
Found {len(new_targets)} new targets for next depth") - targets = new_targets - current_depth += 1 + except Exception as e: + logger.error(f"❌ Error processing operation {operation_spec}: {e}") + continue - logger.info(f"Recursive processing complete after {current_depth} levels") + logger.info(f"βœ… Completed processing {operation_count} operations") - def _process_single_target(self, hostname: str, depth: int): - """Process a single target hostname.""" - logger.debug(f"Processing single target: {hostname} at depth {depth}") + def _execute_dns_operation(self, operation_spec: Dict[str, Any]): + """Execute a single DNS operation with full tracking.""" + hostname = operation_spec['hostname'] + record_type = operation_spec['record_type'] + operation_type = operation_spec['operation_type'] + depth = operation_spec['depth'] - # Get all DNS records - dns_records = self.dns_resolver.get_all_dns_records(hostname) - logger.debug(f"Found {len(dns_records)} DNS records for {hostname}") + logger.debug(f"πŸ” DNS {record_type} query for {hostname}") - for record in dns_records: - self.data.add_dns_record(hostname, record) + # Create operation + operation = DiscoveryOperation( + operation_type=operation_type, + target=hostname, + metadata={'record_type': record_type, 'depth': depth} + ) + + try: + # Perform DNS query using the updated method with operation_id + records = self.dns_resolver.query_specific_record_type(hostname, record_type, operation.operation_id) + operation.dns_records = records + operation.success = len(records) > 0 - # Extract IP addresses from A and AAAA records - if record.record_type in ['A', 'AAAA']: - self.data.add_ip_address(record.value) + # Process results + node = self.data.get_node(hostname) + if not node: + node = self.data.add_node(hostname, depth) + + new_hostnames = set() + new_ips = set() + + for record in records: + node.add_dns_record(record) + + # Extract IPs and hostnames + if record.record_type in ['A', 'AAAA']: + new_ips.add(record.value) + self.data.ip_addresses.add(record.value) + elif record.record_type in ['MX', 'NS', 'CNAME']: + # Extract hostname from MX (format: "priority hostname") + if record.record_type == 'MX': + parts = record.value.split() + if len(parts) >= 2: + extracted_hostname = parts[-1].rstrip('.') + else: + continue + else: + extracted_hostname = record.value.rstrip('.') + + if self._is_valid_hostname(extracted_hostname) and extracted_hostname != hostname: + new_hostnames.add(extracted_hostname) + + # Update operation results + operation.discovered_hostnames = list(new_hostnames) + operation.discovered_ips = list(new_ips) + + # Add discovery edges and queue new operations + for new_hostname in new_hostnames: + # Add node + new_node = self.data.add_node( + hostname=new_hostname, + depth=depth + 1, + discovery_method=DiscoveryMethod.DNS_RECORD_VALUE, + operation_id=operation.operation_id + ) + + # Add edge + self.data.add_edge( + source=hostname, + target=new_hostname, + discovery_method=DiscoveryMethod.DNS_RECORD_VALUE, + operation_id=operation.operation_id, + metadata={'record_type': record_type} + ) + + # Queue operations for new hostname (if not in hostname-only mode and within depth) + if not self.is_hostname_only_mode and depth + 1 <= self.config.max_depth: + self._queue_dns_operations_for_hostname(new_hostname, depth + 1) + + logger.debug(f"βœ… DNS {record_type} for {hostname}: {len(records)} records, {len(new_hostnames)} new hostnames") + + except Exception as e: + operation.success = False + operation.error_message = str(e) + 
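+ # Failed queries are not dropped: add_operation() below still records
+ # the attempt, so negative results stay in the forensic timeline.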
logger.debug(f"❌ DNS {record_type} query failed for {hostname}: {e}") - # Get certificates - logger.debug(f"Checking certificates for {hostname}") - certificates = self.cert_checker.get_certificates(hostname) - if certificates: - self.data.certificates[hostname] = certificates - logger.info(f"Found {len(certificates)} certificates for {hostname}") - else: - logger.debug(f"No certificates found for {hostname}") + self.data.add_operation(operation) - def _extract_new_subdomains(self, hostname: str) -> Set[str]: - """Extract new subdomains from DNS records and certificates.""" - new_subdomains = set() + def _execute_certificate_operation(self, operation_spec: Dict[str, Any]): + """Execute certificate check operation.""" + hostname = operation_spec['hostname'] + depth = operation_spec['depth'] - # From DNS records - if hostname in self.data.dns_records: - dns_subdomains = self.dns_resolver.extract_subdomains_from_dns( - self.data.dns_records[hostname] - ) - new_subdomains.update(dns_subdomains) - logger.debug(f"Extracted {len(dns_subdomains)} subdomains from DNS records of {hostname}") + # Skip certificates in hostname-only mode + if self.is_hostname_only_mode: + logger.debug(f"⭐ Skipping certificate check for {hostname} (hostname-only mode)") + return - # From certificates - if hostname in self.data.certificates: - cert_subdomains = self.cert_checker.extract_subdomains_from_certificates( - self.data.certificates[hostname] - ) - new_subdomains.update(cert_subdomains) - logger.debug(f"Extracted {len(cert_subdomains)} subdomains from certificates of {hostname}") + logger.debug(f"πŸ” Certificate check for {hostname}") - # Filter out already known hostnames - filtered_subdomains = new_subdomains - self.data.hostnames - logger.debug(f"{len(filtered_subdomains)} new subdomains after filtering") + operation = DiscoveryOperation( + operation_type=OperationType.CERTIFICATE_CHECK, + target=hostname, + metadata={'depth': depth} + ) - return filtered_subdomains + try: + # Use updated method with operation_id + certificates = self.cert_checker.get_certificates(hostname, operation.operation_id) + operation.certificates = certificates + operation.success = len(certificates) > 0 + + # Process certificates + node = self.data.get_node(hostname) + if node: + new_hostnames = set() + + for cert in certificates: + node.add_certificate(cert) + + # Extract hostnames from certificate subjects + subject_hostnames = self.cert_checker.extract_subdomains_from_certificates([cert]) + for subject_hostname in subject_hostnames: + if subject_hostname != hostname and self._is_valid_hostname(subject_hostname): + new_hostnames.add(subject_hostname) + + # Update operation and create edges + operation.discovered_hostnames = list(new_hostnames) + + for new_hostname in new_hostnames: + # Add node + new_node = self.data.add_node( + hostname=new_hostname, + depth=depth + 1, + discovery_method=DiscoveryMethod.CERTIFICATE_SUBJECT, + operation_id=operation.operation_id + ) + + # Add edge + self.data.add_edge( + source=hostname, + target=new_hostname, + discovery_method=DiscoveryMethod.CERTIFICATE_SUBJECT, + operation_id=operation.operation_id + ) + + # Queue operations for new hostname (within depth limit) + if depth + 1 <= self.config.max_depth: + self._queue_dns_operations_for_hostname(new_hostname, depth + 1) + + logger.debug(f"βœ… Certificates for {hostname}: {len(certificates)} certs, {len(new_hostnames)} new hostnames") + + except Exception as e: + operation.success = False + operation.error_message = str(e) + logger.debug(f"❌ 
Certificate check failed for {hostname}: {e}") + + self.data.add_operation(operation) def _perform_external_lookups(self): - """Perform Shodan and VirusTotal lookups.""" - logger.info(f"Starting external lookups for {len(self.data.ip_addresses)} IPs and {len(self.data.hostnames)} hostnames") + """Perform external service lookups with operation tracking.""" + if self.is_hostname_only_mode: + logger.info("⭐ Skipping external lookups (hostname-only mode)") + return - # Reverse DNS for all IPs - logger.info("Performing reverse DNS lookups") - reverse_dns_count = 0 - for ip in self.data.ip_addresses: - reverse = self.dns_resolver.reverse_dns_lookup(ip) - if reverse: - self.data.reverse_dns[ip] = reverse - reverse_dns_count += 1 - logger.debug(f"Reverse DNS for {ip}: {reverse}") + self._update_progress("Performing external service lookups", 85) - logger.info(f"Completed reverse DNS: {reverse_dns_count}/{len(self.data.ip_addresses)} successful") + # Reverse DNS + self._perform_reverse_dns_lookups() # Shodan lookups if self.shodan_client: - logger.info(f"Starting Shodan lookups for {len(self.data.ip_addresses)} IPs") - shodan_success_count = 0 - - for ip in self.data.ip_addresses: - try: - logger.debug(f"Querying Shodan for IP: {ip}") - result = self.shodan_client.lookup_ip(ip) - if result: - self.data.add_shodan_result(ip, result) - shodan_success_count += 1 - logger.info(f"Shodan result for {ip}: {len(result.ports)} ports") - else: - logger.debug(f"No Shodan data for {ip}") - except Exception as e: - logger.warning(f"Error querying Shodan for {ip}: {e}") - - logger.info(f"Shodan lookups complete: {shodan_success_count}/{len(self.data.ip_addresses)} successful") - else: - logger.info("Skipping Shodan lookups (no API key)") + self._perform_shodan_lookups() # VirusTotal lookups if self.virustotal_client: - total_resources = len(self.data.ip_addresses) + len(self.data.hostnames) - logger.info(f"Starting VirusTotal lookups for {total_resources} resources") - vt_success_count = 0 - - # Check IPs - for ip in self.data.ip_addresses: - try: - logger.debug(f"Querying VirusTotal for IP: {ip}") - result = self.virustotal_client.lookup_ip(ip) - if result: - self.data.add_virustotal_result(ip, result) - vt_success_count += 1 - logger.info(f"VirusTotal result for {ip}: {result.positives}/{result.total} detections") - else: - logger.debug(f"No VirusTotal data for {ip}") - except Exception as e: - logger.warning(f"Error querying VirusTotal for IP {ip}: {e}") - - # Check domains - for hostname in self.data.hostnames: - try: - logger.debug(f"Querying VirusTotal for domain: {hostname}") - result = self.virustotal_client.lookup_domain(hostname) - if result: - self.data.add_virustotal_result(hostname, result) - vt_success_count += 1 - logger.info(f"VirusTotal result for {hostname}: {result.positives}/{result.total} detections") - else: - logger.debug(f"No VirusTotal data for {hostname}") - except Exception as e: - logger.warning(f"Error querying VirusTotal for domain {hostname}: {e}") - - logger.info(f"VirusTotal lookups complete: {vt_success_count}/{total_resources} successful") - else: - logger.info("Skipping VirusTotal lookups (no API key)") + self._perform_virustotal_lookups() + + def _perform_reverse_dns_lookups(self): + """Perform reverse DNS lookups with operation tracking.""" + logger.info(f"πŸ”„ Performing reverse DNS for {len(self.data.ip_addresses)} IPs") - # Final external lookup summary - ext_stats = { - 'reverse_dns': len(self.data.reverse_dns), - 'shodan_results': len(self.data.shodan_results), - 
'virustotal_results': len(self.data.virustotal_results) - } - logger.info(f"External lookups summary: {ext_stats}") \ No newline at end of file + for ip in self.data.ip_addresses: + operation = DiscoveryOperation( + operation_type=OperationType.DNS_REVERSE, + target=ip + ) + + try: + # Use updated method with operation_id + reverse_hostname = self.dns_resolver.reverse_dns_lookup(ip, operation.operation_id) + if reverse_hostname: + operation.discovered_hostnames = [reverse_hostname] + operation.success = True + + # Update nodes that have this IP + for node in self.data.nodes.values(): + if ip in node.resolved_ips: + node.reverse_dns = reverse_hostname + + logger.debug(f"πŸ”„ Reverse DNS {ip} -> {reverse_hostname}") + else: + operation.success = False + + except Exception as e: + operation.success = False + operation.error_message = str(e) + + self.data.add_operation(operation) + + def _perform_shodan_lookups(self): + """Perform Shodan lookups with operation tracking.""" + logger.info(f"πŸ•΅οΈ Performing Shodan lookups for {len(self.data.ip_addresses)} IPs") + + for ip in self.data.ip_addresses: + operation = DiscoveryOperation( + operation_type=OperationType.SHODAN_LOOKUP, + target=ip + ) + + try: + # Use updated method with operation_id + result = self.shodan_client.lookup_ip(ip, operation.operation_id) + if result: + operation.shodan_results = [result] + operation.success = True + + # Add to relevant nodes + for node in self.data.nodes.values(): + if ip in node.resolved_ips: + node.shodan_results.append(result) + + logger.debug(f"πŸ•΅οΈ Shodan {ip}: {len(result.ports)} ports") + else: + operation.success = False + + except Exception as e: + operation.success = False + operation.error_message = str(e) + + self.data.add_operation(operation) + + def _perform_virustotal_lookups(self): + """Perform VirusTotal lookups with operation tracking.""" + total_resources = len(self.data.ip_addresses) + len(self.data.nodes) + logger.info(f"πŸ›‘οΈ Performing VirusTotal lookups for {total_resources} resources") + + # Check IPs + for ip in self.data.ip_addresses: + operation = DiscoveryOperation( + operation_type=OperationType.VIRUSTOTAL_IP, + target=ip + ) + + try: + # Use updated method with operation_id + result = self.virustotal_client.lookup_ip(ip, operation.operation_id) + if result: + operation.virustotal_results = [result] + operation.success = True + + # Add to relevant nodes + for node in self.data.nodes.values(): + if ip in node.resolved_ips: + node.virustotal_results.append(result) + + logger.debug(f"πŸ›‘οΈ VirusTotal {ip}: {result.positives}/{result.total}") + else: + operation.success = False + + except Exception as e: + operation.success = False + operation.error_message = str(e) + + self.data.add_operation(operation) + + # Check domains + for hostname, node in self.data.nodes.items(): + operation = DiscoveryOperation( + operation_type=OperationType.VIRUSTOTAL_DOMAIN, + target=hostname + ) + + try: + # Use updated method with operation_id + result = self.virustotal_client.lookup_domain(hostname, operation.operation_id) + if result: + operation.virustotal_results = [result] + operation.success = True + node.virustotal_results.append(result) + + logger.debug(f"πŸ›‘οΈ VirusTotal {hostname}: {result.positives}/{result.total}") + else: + operation.success = False + + except Exception as e: + operation.success = False + operation.error_message = str(e) + + self.data.add_operation(operation) + + def _is_valid_hostname(self, hostname: str) -> bool: + """Validate hostname format.""" + if not 
hostname or '.' not in hostname or len(hostname) > 255: + return False + + # Basic validation + parts = hostname.split('.') + if len(parts) < 2: + return False + + for part in parts: + if not part or len(part) > 63: + return False + + return True + +# Backward compatibility +ReconnaissanceEngine = ForensicReconnaissanceEngine \ No newline at end of file diff --git a/src/report_generator.py b/src/report_generator.py index f9f8315..6fed8e8 100644 --- a/src/report_generator.py +++ b/src/report_generator.py @@ -1,111 +1,451 @@ # File: src/report_generator.py -"""Generate reports from reconnaissance data.""" +"""Enhanced report generation with forensic details and discovery graph visualization.""" from datetime import datetime -from typing import Dict, Any -from .data_structures import ReconData +from typing import Dict, Any, List, Set +from .data_structures import ForensicReconData, DiscoveryMethod, OperationType +import logging -class ReportGenerator: - """Generate various report formats.""" +logger = logging.getLogger(__name__) + +class ForensicReportGenerator: + """Generate comprehensive forensic reports with discovery provenance.""" - def __init__(self, data: ReconData): + def __init__(self, data: ForensicReconData): self.data = data def generate_text_report(self) -> str: - """Generate comprehensive text report.""" + """Generate comprehensive forensic text report.""" report = [] # Header - report.append("="*80) - report.append("DNS RECONNAISSANCE REPORT") - report.append("="*80) - report.append(f"Start Time: {self.data.start_time}") - report.append(f"End Time: {self.data.end_time}") + report.append("=" * 80) + report.append("FORENSIC DNS RECONNAISSANCE REPORT") + report.append("=" * 80) + report.append(f"Scan Start: {self.data.start_time}") if self.data.end_time: + report.append(f"Scan End: {self.data.end_time}") duration = self.data.end_time - self.data.start_time report.append(f"Duration: {duration}") + report.append(f"Target: {self.data.scan_config.get('target', 'Unknown')}") + report.append(f"Max Depth: {self.data.scan_config.get('max_depth', 'Unknown')}") report.append("") - # Summary - report.append("SUMMARY") + # Executive Summary + report.append("EXECUTIVE SUMMARY") report.append("-" * 40) - report.append(f"Total Hostnames Discovered: {len(self.data.hostnames)}") - report.append(f"Total IP Addresses Found: {len(self.data.ip_addresses)}") - report.append(f"Total DNS Records: {sum(len(records) for records in self.data.dns_records.values())}") - report.append(f"Total Certificates Found: {sum(len(certs) for certs in self.data.certificates.values())}") + stats = self.data.get_stats() + report.append(f"Discovered Hostnames: {stats['hostnames']}") + report.append(f"IP Addresses Found: {stats['ip_addresses']}") + report.append(f"Operations Performed: {stats['operations_performed']}") + report.append(f"Discovery Relationships: {stats['discovery_edges']}") + report.append(f"DNS Records Collected: {stats['dns_records']}") + report.append(f"Total Certificates: {stats['certificates_total']}") + report.append(f" └─ Currently Valid: {stats['certificates_current']}") + report.append(f" └─ Expired: {stats['certificates_expired']}") + report.append(f"Shodan Results: {stats['shodan_results']}") + report.append(f"VirusTotal Results: {stats['virustotal_results']}") report.append("") - # Hostnames by depth - report.append("HOSTNAMES BY DISCOVERY DEPTH") + # Discovery Graph Analysis + graph_analysis = self.data._generate_graph_analysis() + report.append("DISCOVERY GRAPH ANALYSIS") report.append("-" * 40) 
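_generate_graph_analysis() is assumed to bucket nodes by discovery depth for the distribution printed below; a minimal sketch of that bucketing (hypothetical helper, field names assumed):

    from collections import Counter

    def depth_distribution(nodes: dict) -> dict:
        """Bucket hostnames by discovery depth, e.g. {0: 1, 1: 14, 2: 37}."""
        return dict(Counter(node.depth for node in nodes.values()))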
- depth_groups = {} - for hostname, depth in self.data.depth_map.items(): - if depth not in depth_groups: - depth_groups[depth] = [] - depth_groups[depth].append(hostname) - - for depth in sorted(depth_groups.keys()): - report.append(f"Depth {depth}: {len(depth_groups[depth])} hostnames") - for hostname in sorted(depth_groups[depth]): - report.append(f" - {hostname}") + report.append(f"Maximum Discovery Depth: {graph_analysis['max_depth']}") + report.append(f"Root Nodes (Initial Targets): {len(graph_analysis['root_nodes'])}") + report.append(f"Leaf Nodes (No Further Discoveries): {len(graph_analysis['leaf_nodes'])}") report.append("") - # IP Addresses - report.append("IP ADDRESSES") + # Depth Distribution + report.append("Discovery Depth Distribution:") + for depth, count in sorted(graph_analysis['depth_distribution'].items()): + report.append(f" Depth {depth}: {count} hostnames") + report.append("") + + # Discovery Methods Distribution + report.append("Discovery Methods Used:") + for method, count in sorted(graph_analysis['discovery_method_distribution'].items()): + report.append(f" {method}: {count} discoveries") + report.append("") + + # Discovery Tree + report.append("DISCOVERY TREE") report.append("-" * 40) - for ip in sorted(self.data.ip_addresses): - report.append(f"{ip}") - if ip in self.data.reverse_dns: - report.append(f" Reverse DNS: {self.data.reverse_dns[ip]}") - if ip in self.data.shodan_results: - shodan = self.data.shodan_results[ip] - report.append(f" Shodan: {len(shodan.ports)} open ports") - if shodan.organization: - report.append(f" Organization: {shodan.organization}") - if shodan.country: - report.append(f" Country: {shodan.country}") + report.extend(self._generate_discovery_tree()) report.append("") - # DNS Records - report.append("DNS RECORDS") + # Detailed Node Analysis + report.append("DETAILED NODE ANALYSIS") report.append("-" * 40) - for hostname in sorted(self.data.dns_records.keys()): - report.append(f"{hostname}:") - records_by_type = {} - for record in self.data.dns_records[hostname]: - if record.record_type not in records_by_type: - records_by_type[record.record_type] = [] - records_by_type[record.record_type].append(record) - - for record_type in sorted(records_by_type.keys()): - report.append(f" {record_type}:") - for record in records_by_type[record_type]: - report.append(f" {record.value}") + report.extend(self._generate_node_details()) report.append("") - # Certificates - if self.data.certificates: - report.append("CERTIFICATES") - report.append("-" * 40) - for hostname in sorted(self.data.certificates.keys()): - report.append(f"{hostname}:") - for cert in self.data.certificates[hostname]: - report.append(f" Certificate ID: {cert.id}") - report.append(f" Issuer: {cert.issuer}") - report.append(f" Valid From: {cert.not_before}") - report.append(f" Valid Until: {cert.not_after}") - if cert.is_wildcard: - report.append(f" Type: Wildcard Certificate") - report.append("") + # Operations Timeline + report.append("OPERATIONS TIMELINE") + report.append("-" * 40) + report.extend(self._generate_operations_timeline()) + report.append("") # Security Analysis - if self.data.virustotal_results: + security_findings = self._analyze_security_findings() + if security_findings: report.append("SECURITY ANALYSIS") report.append("-" * 40) - for resource, result in self.data.virustotal_results.items(): - if result.positives > 0: - report.append(f"⚠️ {resource}: {result.positives}/{result.total} detections") - report.append(f" Scan Date: {result.scan_date}") - 
report.append(f" Report: {result.permalink}") + report.extend(security_findings) + report.append("") - return "\n".join(report) \ No newline at end of file + # Certificate Analysis + cert_analysis = self._analyze_certificates() + if cert_analysis: + report.append("CERTIFICATE ANALYSIS") + report.append("-" * 40) + report.extend(cert_analysis) + report.append("") + + # DNS Record Analysis + report.append("DNS RECORD ANALYSIS") + report.append("-" * 40) + report.extend(self._analyze_dns_records()) + report.append("") + + return "\n".join(report) + + def _generate_discovery_tree(self) -> List[str]: + """Generate a tree view of hostname discoveries.""" + tree_lines = [] + + # Find root nodes + graph_analysis = self.data._generate_graph_analysis() + root_nodes = graph_analysis['root_nodes'] + + if not root_nodes: + tree_lines.append("No root nodes found") + return tree_lines + + # Generate tree for each root + for root in sorted(root_nodes): + tree_lines.extend(self._build_tree_branch(root, "", set())) + + return tree_lines + + def _build_tree_branch(self, hostname: str, prefix: str, visited: Set[str]) -> List[str]: + """Build a tree branch for a hostname.""" + lines = [] + + # Avoid cycles + if hostname in visited: + lines.append(f"{prefix}{hostname} [CYCLE]") + return lines + + visited.add(hostname) + + # Get node info + node = self.data.get_node(hostname) + if not node: + lines.append(f"{prefix}{hostname} [NO NODE DATA]") + return lines + + # Node info + node_info = f"{hostname} (depth:{node.depth}" + if node.resolved_ips: + node_info += f", IPs:{len(node.resolved_ips)}" + if node.certificates: + valid_certs = len(node.get_current_certificates()) + expired_certs = len(node.get_expired_certificates()) + node_info += f", certs:{valid_certs}+{expired_certs}" + node_info += ")" + + lines.append(f"{prefix}{node_info}") + + # Get children + children = self.data.get_children(hostname) + children.sort() + + for i, child in enumerate(children): + is_last = (i == len(children) - 1) + child_prefix = prefix + ("└── " if is_last else "β”œβ”€β”€ ") + next_prefix = prefix + (" " if is_last else "β”‚ ") + + # Find discovery method for this child + discovery_method = "unknown" + for edge in self.data.edges: + if edge.source_hostname == hostname and edge.target_hostname == child: + discovery_method = edge.discovery_method.value + break + + lines.append(f"{child_prefix}[{discovery_method}]") + lines.extend(self._build_tree_branch(child, next_prefix, visited.copy())) + + return lines + + def _generate_node_details(self) -> List[str]: + """Generate detailed analysis of each node.""" + details = [] + + # Sort nodes by depth, then alphabetically + sorted_nodes = sorted(self.data.nodes.items(), + key=lambda x: (x[1].depth, x[0])) + + for hostname, node in sorted_nodes: + details.append(f"\n{hostname} (Depth {node.depth})") + details.append("-" * (len(hostname) + 20)) + + # Discovery provenance + details.append(f"First Seen: {node.first_seen}") + details.append(f"Last Updated: {node.last_updated}") + details.append(f"Discovery Methods: {', '.join(m.value for m in node.discovery_methods)}") + + # Discovery paths + paths = self.data.get_discovery_path(hostname) + if paths: + details.append("Discovery Paths:") + for i, path in enumerate(paths[:3]): # Show max 3 paths + path_str = " -> ".join([f"{src}[{method.value}]{tgt}" for src, tgt, method in path]) + details.append(f" Path {i+1}: {path_str}") + if len(paths) > 3: + details.append(f" ... 
and {len(paths) - 3} more paths") + + # DNS status + if node.dns_exists is not None: + status = "EXISTS" if node.dns_exists else "NOT FOUND" + details.append(f"DNS Status: {status} (checked: {node.last_dns_check})") + + # IP addresses + if node.resolved_ips: + details.append(f"Resolved IPs: {', '.join(sorted(node.resolved_ips))}") + + # Reverse DNS + if node.reverse_dns: + details.append(f"Reverse DNS: {node.reverse_dns}") + + # DNS records summary + total_records = len(node.get_all_dns_records()) + if total_records > 0: + record_types = list(node.dns_records_by_type.keys()) + details.append(f"DNS Records: {total_records} records ({', '.join(sorted(record_types))})") + + # Certificates summary + current_certs = len(node.get_current_certificates()) + expired_certs = len(node.get_expired_certificates()) + if current_certs > 0 or expired_certs > 0: + details.append(f"Certificates: {current_certs} valid, {expired_certs} expired") + + # External results + if node.shodan_results: + details.append(f"Shodan: {len(node.shodan_results)} results") + if node.virustotal_results: + vt_detections = sum(r.positives for r in node.virustotal_results) + details.append(f"VirusTotal: {len(node.virustotal_results)} scans, {vt_detections} total detections") + + return details + + def _generate_operations_timeline(self) -> List[str]: + """Generate operations timeline.""" + timeline = [] + + # Sort operations by timestamp + sorted_ops = [] + for op_id in self.data.operation_timeline: + if op_id in self.data.operations: + sorted_ops.append(self.data.operations[op_id]) + + # Group operations by type for summary + op_summary = {} + for op in sorted_ops: + op_type = op.operation_type.value + if op_type not in op_summary: + op_summary[op_type] = {'total': 0, 'successful': 0, 'failed': 0} + op_summary[op_type]['total'] += 1 + if op.success: + op_summary[op_type]['successful'] += 1 + else: + op_summary[op_type]['failed'] += 1 + + # Operations summary + timeline.append("Operations Summary:") + for op_type, counts in sorted(op_summary.items()): + timeline.append(f" {op_type}: {counts['successful']}/{counts['total']} successful") + timeline.append("") + + # Recent operations (last 20) + timeline.append("Recent Operations (last 20):") + recent_ops = sorted_ops[-20:] if len(sorted_ops) > 20 else sorted_ops + + for op in recent_ops: + timestamp = op.timestamp.strftime("%H:%M:%S.%f")[:-3] + status = "βœ“" if op.success else "βœ—" + target_short = op.target[:30] + "..." 
if len(op.target) > 30 else op.target + + timeline.append(f" {timestamp} {status} {op.operation_type.value:15} {target_short}") + + # Show key results + if op.discovered_hostnames: + hostname_list = ", ".join(op.discovered_hostnames[:3]) + if len(op.discovered_hostnames) > 3: + hostname_list += f" (+{len(op.discovered_hostnames) - 3} more)" + timeline.append(f" └─ Discovered: {hostname_list}") + + if op.error_message: + timeline.append(f" └─ Error: {op.error_message[:50]}...") + + return timeline + + def _analyze_security_findings(self) -> List[str]: + """Analyze security-related findings.""" + findings = [] + + # VirusTotal detections + high_risk_resources = [] + medium_risk_resources = [] + + for node in self.data.nodes.values(): + for vt_result in node.virustotal_results: + if vt_result.positives > 5: + high_risk_resources.append((node.hostname, vt_result)) + elif vt_result.positives > 0: + medium_risk_resources.append((node.hostname, vt_result)) + + if high_risk_resources: + findings.append("🚨 HIGH RISK FINDINGS:") + for hostname, vt_result in high_risk_resources: + findings.append(f" {hostname}: {vt_result.positives}/{vt_result.total} detections") + findings.append(f" Report: {vt_result.permalink}") + + if medium_risk_resources: + findings.append("⚠️ MEDIUM RISK FINDINGS:") + for hostname, vt_result in medium_risk_resources[:5]: # Show max 5 + findings.append(f" {hostname}: {vt_result.positives}/{vt_result.total} detections") + if len(medium_risk_resources) > 5: + findings.append(f" ... and {len(medium_risk_resources) - 5} more resources with detections") + + # Expired certificates still in use + nodes_with_expired_certs = [] + for hostname, node in self.data.nodes.items(): + expired = node.get_expired_certificates() + current = node.get_current_certificates() + if expired and not current: # Only expired certs, no valid ones + nodes_with_expired_certs.append((hostname, len(expired))) + + if nodes_with_expired_certs: + findings.append("πŸ“œ CERTIFICATE ISSUES:") + for hostname, count in nodes_with_expired_certs: + findings.append(f" {hostname}: {count} expired certificates, no valid ones") + + return findings + + def _analyze_certificates(self) -> List[str]: + """Analyze certificate findings.""" + cert_analysis = [] + + # Certificate statistics + total_certs = 0 + valid_certs = 0 + expired_certs = 0 + wildcard_certs = 0 + + cert_authorities = {} + + for node in self.data.nodes.values(): + for cert in node.certificates: + total_certs += 1 + if cert.is_valid_now: + valid_certs += 1 + else: + expired_certs += 1 + + if cert.is_wildcard: + wildcard_certs += 1 + + # Count certificate authorities + issuer_short = cert.issuer.split(',')[0] if ',' in cert.issuer else cert.issuer + cert_authorities[issuer_short] = cert_authorities.get(issuer_short, 0) + 1 + + if total_certs == 0: + cert_analysis.append("No certificates found.") + return cert_analysis + + cert_analysis.append(f"Total Certificates: {total_certs}") + cert_analysis.append(f" Currently Valid: {valid_certs}") + cert_analysis.append(f" Expired: {expired_certs}") + cert_analysis.append(f" Wildcard Certificates: {wildcard_certs}") + cert_analysis.append("") + + # Top certificate authorities + cert_analysis.append("Certificate Authorities:") + sorted_cas = sorted(cert_authorities.items(), key=lambda x: x[1], reverse=True) + for ca, count in sorted_cas[:5]: + cert_analysis.append(f" {ca}: {count} certificates") + cert_analysis.append("") + + # Expiring soon (within 30 days) + from datetime import timedelta + soon = datetime.now() + 
timedelta(days=30) + expiring_soon = [] + + for hostname, node in self.data.nodes.items(): + for cert in node.get_current_certificates(): + if cert.not_after <= soon: + expiring_soon.append((hostname, cert.not_after, cert.id)) + + if expiring_soon: + cert_analysis.append("Certificates Expiring Soon (within 30 days):") + for hostname, expiry, cert_id in sorted(expiring_soon, key=lambda x: x[1]): + cert_analysis.append(f" {hostname}: expires {expiry.strftime('%Y-%m-%d')} (cert ID: {cert_id})") + + return cert_analysis + + def _analyze_dns_records(self) -> List[str]: + """Analyze DNS record patterns.""" + dns_analysis = [] + + # Record type distribution + record_type_counts = {} + total_records = 0 + + for node in self.data.nodes.values(): + for record_type, records in node.dns_records_by_type.items(): + record_type_counts[record_type] = record_type_counts.get(record_type, 0) + len(records) + total_records += len(records) + + dns_analysis.append(f"Total DNS Records: {total_records}") + dns_analysis.append("Record Type Distribution:") + + for record_type, count in sorted(record_type_counts.items()): + percentage = (count / total_records * 100) if total_records > 0 else 0 + dns_analysis.append(f" {record_type}: {count} ({percentage:.1f}%)") + dns_analysis.append("") + + # Interesting findings + interesting = [] + + # Multiple MX records + multi_mx_nodes = [] + for hostname, node in self.data.nodes.items(): + mx_records = node.dns_records_by_type.get('MX', []) + if len(mx_records) > 1: + multi_mx_nodes.append((hostname, len(mx_records))) + + if multi_mx_nodes: + interesting.append("Multiple MX Records:") + for hostname, count in multi_mx_nodes: + interesting.append(f" {hostname}: {count} MX records") + + # CAA records (security-relevant) + caa_nodes = [] + for hostname, node in self.data.nodes.items(): + if 'CAA' in node.dns_records_by_type: + caa_nodes.append(hostname) + + if caa_nodes: + interesting.append(f"Domains with CAA Records: {len(caa_nodes)}") + for hostname in caa_nodes[:5]: # Show first 5 + interesting.append(f" {hostname}") + + if interesting: + dns_analysis.append("Interesting DNS Findings:") + dns_analysis.extend(interesting) + + return dns_analysis + +# Backward compatibility +ReportGenerator = ForensicReportGenerator \ No newline at end of file diff --git a/src/shodan_client.py b/src/shodan_client.py index a5a7384..ff83128 100644 --- a/src/shodan_client.py +++ b/src/shodan_client.py @@ -1,9 +1,10 @@ # File: src/shodan_client.py -"""Shodan API integration.""" +"""Shodan API integration with forensic operation tracking.""" import requests import time import logging +import uuid from typing import Optional, Dict, Any, List from .data_structures import ShodanResult from .config import Config @@ -12,7 +13,7 @@ from .config import Config logger = logging.getLogger(__name__) class ShodanClient: - """Shodan API client.""" + """Shodan API client with forensic tracking.""" BASE_URL = "https://api.shodan.io" @@ -21,7 +22,7 @@ class ShodanClient: self.config = config self.last_request = 0 - logger.info(f"πŸ•΅οΈ Shodan client initialized with API key ending in: ...{api_key[-4:] if len(api_key) > 4 else api_key}") + logger.info(f"πŸ•΅οΈ Shodan client initialized with API key ending in: ...{api_key[-4:] if len(api_key) > 4 else api_key}") def _rate_limit(self): """Apply rate limiting for Shodan.""" @@ -31,16 +32,20 @@ class ShodanClient: if time_since_last < min_interval: sleep_time = min_interval - time_since_last - logger.debug(f"⏸️ Shodan rate limiting: sleeping for {sleep_time:.2f}s") + 
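+ # e.g. on Shodan's ~1 request/second tier min_interval is 1.0s, so a
+ # call arriving 0.2s after the previous one sleeps the remaining 0.8s.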
logger.debug(f"⏸️ Shodan rate limiting: sleeping for {sleep_time:.2f}s") time.sleep(sleep_time) self.last_request = time.time() - def lookup_ip(self, ip: str) -> Optional[ShodanResult]: - """Lookup IP address information.""" + def lookup_ip(self, ip: str, operation_id: Optional[str] = None) -> Optional[ShodanResult]: + """Lookup IP address information with forensic tracking.""" self._rate_limit() - logger.debug(f"πŸ” Querying Shodan for IP: {ip}") + # Generate operation ID if not provided + if operation_id is None: + operation_id = str(uuid.uuid4()) + + logger.debug(f"πŸ” Querying Shodan for IP: {ip} (operation: {operation_id})") try: url = f"{self.BASE_URL}/shodan/host/{ip}" @@ -71,28 +76,30 @@ class ShodanClient: 'banner': service.get('data', '').strip()[:200] if service.get('data') else '' } + # Create ShodanResult with forensic metadata result = ShodanResult( ip=ip, ports=sorted(list(set(ports))), services=services, organization=data.get('org'), - country=data.get('country_name') + country=data.get('country_name'), + operation_id=operation_id # Forensic tracking ) logger.info(f"βœ… Shodan result for {ip}: {len(result.ports)} ports, org: {result.organization}") return result elif response.status_code == 404: - logger.debug(f"ℹ️ IP {ip} not found in Shodan database") + logger.debug(f"ℹ️ IP {ip} not found in Shodan database") return None elif response.status_code == 401: logger.error("❌ Shodan API key is invalid or expired") return None elif response.status_code == 429: - logger.warning("⚠️ Shodan API rate limit exceeded") + logger.warning("⚠️ Shodan API rate limit exceeded") return None else: - logger.warning(f"⚠️ Shodan API error for {ip}: HTTP {response.status_code}") + logger.warning(f"⚠️ Shodan API error for {ip}: HTTP {response.status_code}") try: error_data = response.json() logger.debug(f"Shodan error details: {error_data}") @@ -101,7 +108,7 @@ class ShodanClient: return None except requests.exceptions.Timeout: - logger.warning(f"⏱️ Shodan query timeout for {ip}") + logger.warning(f"⏱️ Shodan query timeout for {ip}") return None except requests.exceptions.RequestException as e: logger.error(f"🌐 Shodan network error for {ip}: {e}") @@ -110,11 +117,15 @@ class ShodanClient: logger.error(f"❌ Unexpected error querying Shodan for {ip}: {e}") return None - def search_domain(self, domain: str) -> List[str]: - """Search for IPs associated with a domain.""" + def search_domain(self, domain: str, operation_id: Optional[str] = None) -> List[str]: + """Search for IPs associated with a domain with forensic tracking.""" self._rate_limit() - logger.debug(f"πŸ” Searching Shodan for domain: {domain}") + # Generate operation ID if not provided + if operation_id is None: + operation_id = str(uuid.uuid4()) + + logger.debug(f"πŸ” Searching Shodan for domain: {domain} (operation: {operation_id})") try: url = f"{self.BASE_URL}/shodan/host/search" @@ -149,14 +160,14 @@ class ShodanClient: logger.error("❌ Shodan API key is invalid for search") return [] elif response.status_code == 429: - logger.warning("⚠️ Shodan search rate limit exceeded") + logger.warning("⚠️ Shodan search rate limit exceeded") return [] else: - logger.warning(f"⚠️ Shodan search error for {domain}: HTTP {response.status_code}") + logger.warning(f"⚠️ Shodan search error for {domain}: HTTP {response.status_code}") return [] except requests.exceptions.Timeout: - logger.warning(f"⏱️ Shodan search timeout for {domain}") + logger.warning(f"⏱️ Shodan search timeout for {domain}") return [] except requests.exceptions.RequestException as 
e: logger.error(f"🌐 Shodan search network error for {domain}: {e}") diff --git a/src/virustotal_client.py b/src/virustotal_client.py index bc5c4d3..6b067c2 100644 --- a/src/virustotal_client.py +++ b/src/virustotal_client.py @@ -1,9 +1,10 @@ # File: src/virustotal_client.py -"""VirusTotal API integration.""" +"""VirusTotal API integration with forensic operation tracking.""" import requests import time import logging +import uuid from datetime import datetime from typing import Optional from .data_structures import VirusTotalResult @@ -13,7 +14,7 @@ from .config import Config logger = logging.getLogger(__name__) class VirusTotalClient: - """VirusTotal API client.""" + """VirusTotal API client with forensic tracking.""" BASE_URL = "https://www.virustotal.com/vtapi/v2" @@ -22,7 +23,7 @@ class VirusTotalClient: self.config = config self.last_request = 0 - logger.info(f"πŸ›‘οΈ VirusTotal client initialized with API key ending in: ...{api_key[-4:] if len(api_key) > 4 else api_key}") + logger.info(f"πŸ›‘οΈ VirusTotal client initialized with API key ending in: ...{api_key[-4:] if len(api_key) > 4 else api_key}") def _rate_limit(self): """Apply rate limiting for VirusTotal.""" @@ -32,16 +33,20 @@ class VirusTotalClient: if time_since_last < min_interval: sleep_time = min_interval - time_since_last - logger.debug(f"⏸️ VirusTotal rate limiting: sleeping for {sleep_time:.2f}s") + logger.debug(f"⏸️ VirusTotal rate limiting: sleeping for {sleep_time:.2f}s") time.sleep(sleep_time) self.last_request = time.time() - def lookup_ip(self, ip: str) -> Optional[VirusTotalResult]: - """Lookup IP address reputation.""" + def lookup_ip(self, ip: str, operation_id: Optional[str] = None) -> Optional[VirusTotalResult]: + """Lookup IP address reputation with forensic tracking.""" self._rate_limit() - logger.debug(f"πŸ” Querying VirusTotal for IP: {ip}") + # Generate operation ID if not provided + if operation_id is None: + operation_id = str(uuid.uuid4()) + + logger.debug(f"πŸ” Querying VirusTotal for IP: {ip} (operation: {operation_id})") try: url = f"{self.BASE_URL}/ip-address/report" @@ -81,30 +86,32 @@ class VirusTotalClient: except ValueError: logger.debug(f"Could not parse scan_date: {data.get('scan_date')}") + # Create VirusTotalResult with forensic metadata result = VirusTotalResult( resource=ip, positives=positives, total=total, scan_date=scan_date, - permalink=data.get('permalink', f'https://www.virustotal.com/gui/ip-address/{ip}') + permalink=data.get('permalink', f'https://www.virustotal.com/gui/ip-address/{ip}'), + operation_id=operation_id # Forensic tracking ) logger.info(f"βœ… VirusTotal result for IP {ip}: {result.positives}/{result.total} detections") return result elif data.get('response_code') == 0: - logger.debug(f"ℹ️ IP {ip} not found in VirusTotal database") + logger.debug(f"ℹ️ IP {ip} not found in VirusTotal database") return None else: logger.debug(f"VirusTotal returned response_code: {data.get('response_code')}") return None elif response.status_code == 204: - logger.warning("⚠️ VirusTotal API rate limit exceeded") + logger.warning("⚠️ VirusTotal API rate limit exceeded") return None elif response.status_code == 403: logger.error("❌ VirusTotal API key is invalid or lacks permissions") return None else: - logger.warning(f"⚠️ VirusTotal API error for IP {ip}: HTTP {response.status_code}") + logger.warning(f"⚠️ VirusTotal API error for IP {ip}: HTTP {response.status_code}") try: error_data = response.json() logger.debug(f"VirusTotal error details: {error_data}") @@ -113,7 +120,7 @@ class 
VirusTotalClient: return None except requests.exceptions.Timeout: - logger.warning(f"⏱️ VirusTotal query timeout for IP {ip}") + logger.warning(f"⏱️ VirusTotal query timeout for IP {ip}") return None except requests.exceptions.RequestException as e: logger.error(f"🌐 VirusTotal network error for IP {ip}: {e}") @@ -122,11 +129,15 @@ class VirusTotalClient: logger.error(f"❌ Unexpected error querying VirusTotal for IP {ip}: {e}") return None - def lookup_domain(self, domain: str) -> Optional[VirusTotalResult]: - """Lookup domain reputation.""" + def lookup_domain(self, domain: str, operation_id: Optional[str] = None) -> Optional[VirusTotalResult]: + """Lookup domain reputation with forensic tracking.""" self._rate_limit() - logger.debug(f"πŸ” Querying VirusTotal for domain: {domain}") + # Generate operation ID if not provided + if operation_id is None: + operation_id = str(uuid.uuid4()) + + logger.debug(f"πŸ” Querying VirusTotal for domain: {domain} (operation: {operation_id})") try: url = f"{self.BASE_URL}/domain/report" @@ -172,30 +183,32 @@ class VirusTotalClient: except ValueError: logger.debug(f"Could not parse scan_date: {data.get('scan_date')}") + # Create VirusTotalResult with forensic metadata result = VirusTotalResult( resource=domain, positives=positives, total=max(total, 1), # Ensure total is at least 1 scan_date=scan_date, - permalink=data.get('permalink', f'https://www.virustotal.com/gui/domain/{domain}') + permalink=data.get('permalink', f'https://www.virustotal.com/gui/domain/{domain}'), + operation_id=operation_id # Forensic tracking ) logger.info(f"βœ… VirusTotal result for domain {domain}: {result.positives}/{result.total} detections") return result elif data.get('response_code') == 0: - logger.debug(f"ℹ️ Domain {domain} not found in VirusTotal database") + logger.debug(f"ℹ️ Domain {domain} not found in VirusTotal database") return None else: logger.debug(f"VirusTotal returned response_code: {data.get('response_code')}") return None elif response.status_code == 204: - logger.warning("⚠️ VirusTotal API rate limit exceeded") + logger.warning("⚠️ VirusTotal API rate limit exceeded") return None elif response.status_code == 403: logger.error("❌ VirusTotal API key is invalid or lacks permissions") return None else: - logger.warning(f"⚠️ VirusTotal API error for domain {domain}: HTTP {response.status_code}") + logger.warning(f"⚠️ VirusTotal API error for domain {domain}: HTTP {response.status_code}") try: error_data = response.json() logger.debug(f"VirusTotal error details: {error_data}") @@ -204,7 +217,7 @@ class VirusTotalClient: return None except requests.exceptions.Timeout: - logger.warning(f"⏱️ VirusTotal query timeout for domain {domain}") + logger.warning(f"⏱️ VirusTotal query timeout for domain {domain}") return None except requests.exceptions.RequestException as e: logger.error(f"🌐 VirusTotal network error for domain {domain}: {e}") diff --git a/src/web_app.py b/src/web_app.py index 29aa203..84a2e04 100644 --- a/src/web_app.py +++ b/src/web_app.py @@ -1,14 +1,14 @@ # File: src/web_app.py -"""Flask web application for reconnaissance tool.""" +"""Flask web application for forensic reconnaissance tool.""" from flask import Flask, render_template, request, jsonify, send_from_directory import threading import time import logging from .config import Config -from .reconnaissance import ReconnaissanceEngine -from .report_generator import ReportGenerator -from .data_structures import ReconData +from .reconnaissance import ForensicReconnaissanceEngine +from .report_generator 
import ForensicReportGenerator +from .data_structures import ForensicReconData # Set up logging for this module logger = logging.getLogger(__name__) @@ -23,11 +23,11 @@ def create_app(config: Config): template_folder='../templates', static_folder='../static') - app.config['SECRET_KEY'] = 'recon-tool-secret-key' + app.config['SECRET_KEY'] = 'forensic-recon-tool-secret-key' # Set up logging for web app config.setup_logging(cli_mode=False) - logger.info("🌐 Web application initialized") + logger.info("🌐 Forensic web application initialized") @app.route('/') def index(): @@ -36,7 +36,7 @@ def create_app(config: Config): @app.route('/api/scan', methods=['POST']) def start_scan(): - """Start a new reconnaissance scan.""" + """Start a new forensic reconnaissance scan.""" try: data = request.get_json() target = data.get('target') @@ -52,33 +52,32 @@ def create_app(config: Config): # Generate scan ID scan_id = f"{target}_{int(time.time())}" - logger.info(f"πŸš€ Starting new scan: {scan_id} for target: {target}") + logger.info(f"πŸš€ Starting new forensic scan: {scan_id} for target: {target}") - # Create shared ReconData object for live updates - shared_data = ReconData() + # Create shared ForensicReconData object for live updates + shared_data = ForensicReconData() + shared_data.scan_config = { + 'target': target, + 'max_depth': scan_config.max_depth, + 'shodan_enabled': scan_config.shodan_key is not None, + 'virustotal_enabled': scan_config.virustotal_key is not None + } - # Initialize scan data with the shared data object + # Initialize scan data with the shared forensic data object with scan_lock: active_scans[scan_id] = { 'status': 'starting', 'progress': 0, - 'message': 'Initializing...', - 'data': shared_data, # Share the data object from the start! + 'message': 'Initializing forensic scan...', + 'data': shared_data, # Share the forensic data object from the start 'error': None, - 'live_stats': { - 'hostnames': 0, - 'ip_addresses': 0, - 'dns_records': 0, - 'certificates': 0, - 'shodan_results': 0, - 'virustotal_results': 0 - }, + 'live_stats': shared_data.get_stats(), # Use forensic stats 'latest_discoveries': [] } - # Start reconnaissance in background thread + # Start forensic reconnaissance in background thread thread = threading.Thread( - target=run_reconnaissance_background, + target=run_forensic_reconnaissance_background, args=(scan_id, target, scan_config, shared_data) ) thread.daemon = True @@ -92,14 +91,18 @@ def create_app(config: Config): @app.route('/api/scan//status') def get_scan_status(scan_id): - """Get scan status and progress with live discoveries.""" + """Get scan status and progress with live forensic discoveries.""" with scan_lock: if scan_id not in active_scans: return jsonify({'error': 'Scan not found'}), 404 scan_data = active_scans[scan_id].copy() - # Don't include the full data object in status (too large) + # Update live stats from forensic data if available + if scan_data.get('data') and hasattr(scan_data['data'], 'get_stats'): + scan_data['live_stats'] = scan_data['data'].get_stats() + + # Don't include the full forensic data object in status (too large) if 'data' in scan_data: del scan_data['data'] @@ -107,7 +110,7 @@ def create_app(config: Config): @app.route('/api/scan//report') def get_scan_report(scan_id): - """Get scan report.""" + """Get forensic scan report.""" with scan_lock: if scan_id not in active_scans: return jsonify({'error': 'Scan not found'}), 404 @@ -118,8 +121,8 @@ def create_app(config: Config): return jsonify({'error': 'Scan not completed'}), 
400

        try:
-            # Generate report
-            report_gen = ReportGenerator(scan_data['data'])
+            # Generate forensic report
+            report_gen = ForensicReportGenerator(scan_data['data'])

            return jsonify({
                'json_report': scan_data['data'].to_json(),
@@ -128,49 +131,201 @@ def create_app(config: Config):
        except Exception as e:
            logger.error(f"❌ Error generating report for {scan_id}: {e}", exc_info=True)
            return jsonify({'error': f'Failed to generate report: {str(e)}'}), 500
-
-    @app.route('/api/scan/<scan_id>/live-data')
-    def get_live_scan_data(scan_id):
-        """Get current reconnaissance data (for real-time updates)."""
+
+
+    @app.route('/api/scan/<scan_id>/graph')
+    def get_scan_graph(scan_id):
+        """Get graph data for visualization."""
        with scan_lock:
            if scan_id not in active_scans:
                return jsonify({'error': 'Scan not found'}), 404

            scan_data = active_scans[scan_id]
-            # Now we always have a data object, even if it's empty initially
-            data_obj = scan_data['data']
+            if scan_data['status'] != 'completed' or not scan_data['data']:
+                return jsonify({'error': 'Scan not completed'}), 400
+
+            try:
+                forensic_data = scan_data['data']
-            if not data_obj:
+                # Extract nodes for graph
+                nodes = []
+                for hostname, node in forensic_data.nodes.items():
+                    # Determine node color based on depth
+                    color_map = {
+                        0: '#00ff41',  # Green for root
+                        1: '#ff9900',  # Orange for depth 1
+                        2: '#ff6b6b',  # Red for depth 2
+                        3: '#4ecdc4',  # Teal for depth 3
+                        4: '#45b7d1',  # Blue for depth 4+
+                    }
+                    color = color_map.get(node.depth, '#666666')
+
+                    # Calculate node size based on number of connections and data
+                    connections = len(forensic_data.get_children(hostname)) + len(forensic_data.get_parents(hostname))
+                    dns_records = len(node.get_all_dns_records())
+                    certificates = len(node.certificates)
+
+                    # Size based on importance (connections + data)
+                    size = max(8, min(20, 8 + connections * 2 + dns_records // 3 + certificates))
+
+                    nodes.append({
+                        'id': hostname,
+                        'label': hostname,
+                        'depth': node.depth,
+                        'color': color,
+                        'size': size,
+                        'dns_records': dns_records,
+                        'certificates': certificates,
+                        'ip_addresses': list(node.resolved_ips),
+                        'discovery_methods': [method.value for method in node.discovery_methods],
+                        'first_seen': node.first_seen.isoformat() if node.first_seen else None
+                    })
+
+                # Extract edges for graph
+                edges = []
+                for edge in forensic_data.edges:
+                    # Skip synthetic TLD expansion edges for cleaner visualization
+                    if edge.source_hostname.startswith('tld_expansion:'):
+                        continue
+
+                    # Color edges by discovery method
+                    method_colors = {
+                        'initial_target': '#00ff41',
+                        'tld_expansion': '#ff9900',
+                        'dns_record_value': '#4ecdc4',
+                        'certificate_subject': '#ff6b6b',
+                        'dns_subdomain_extraction': '#45b7d1'
+                    }
+
+                    edges.append({
+                        'source': edge.source_hostname,
+                        'target': edge.target_hostname,
+                        'method': edge.discovery_method.value,
+                        'color': method_colors.get(edge.discovery_method.value, '#666666'),
+                        'operation_id': edge.operation_id,
+                        'timestamp': edge.timestamp.isoformat() if edge.timestamp else None
+                    })
+
+                # Graph statistics
+                stats = {
+                    'node_count': len(nodes),
+                    'edge_count': len(edges),
+                    'max_depth': max([node['depth'] for node in nodes]) if nodes else 0,
+                    'discovery_methods': list(set([edge['method'] for edge in edges])),
+                    'root_nodes': [node['id'] for node in nodes if node['depth'] == 0]
+                }
+
+                return jsonify({
+                    'nodes': nodes,
+                    'edges': edges,
+                    'stats': stats
+                })
+
+            except Exception as e:
+                logger.error(f"⚠️ Error generating graph data for {scan_id}: {e}", exc_info=True)
+                return jsonify({'error': f'Failed to generate graph: {str(e)}'}), 500
+
+    @app.route('/api/scan/<scan_id>/live-data')
+    def get_live_scan_data(scan_id):
+        """Get current forensic reconnaissance data (for real-time updates)."""
+        with scan_lock:
+            if scan_id not in active_scans:
+                return jsonify({'error': 'Scan not found'}), 404
+
+            scan_data = active_scans[scan_id]
+            forensic_data = scan_data['data']
+
+            if not forensic_data:
                return jsonify({
                    'hostnames': [],
                    'ip_addresses': [],
-                    'stats': scan_data['live_stats'],
+                    'stats': {
+                        'hostnames': 0,
+                        'ip_addresses': 0,
+                        'discovery_edges': 0,
+                        'operations_performed': 0,
+                        'dns_records': 0,
+                        'certificates_total': 0,
+                        'certificates_current': 0,
+                        'certificates_expired': 0,
+                        'shodan_results': 0,
+                        'virustotal_results': 0
+                    },
                    'latest_discoveries': []
                })

-            # Return current discoveries from the shared data object
-            return jsonify({
-                'hostnames': sorted(list(data_obj.hostnames)),
-                'ip_addresses': sorted(list(data_obj.ip_addresses)),
-                'stats': data_obj.get_stats(),
-                'latest_discoveries': scan_data.get('latest_discoveries', [])
-            })
+            # Extract data from forensic structure for frontend
+            try:
+                hostnames = sorted(list(forensic_data.nodes.keys()))
+                ip_addresses = sorted(list(forensic_data.ip_addresses))
+                stats = forensic_data.get_stats()
+
+                # Generate activity log from recent operations
+                latest_discoveries = []
+                recent_operations = forensic_data.operation_timeline[-10:]  # Last 10 operations
+
+                for op_id in recent_operations:
+                    if op_id in forensic_data.operations:
+                        operation = forensic_data.operations[op_id]
+                        activity_entry = {
+                            'timestamp': operation.timestamp.timestamp(),
+                            'message': f"{operation.operation_type.value}: {operation.target}"
+                        }
+
+                        # Add result summary
+                        if operation.discovered_hostnames:
+                            activity_entry['message'] += f" → {len(operation.discovered_hostnames)} hostnames"
+                        if operation.discovered_ips:
+                            activity_entry['message'] += f" → {len(operation.discovered_ips)} IPs"
+
+                        latest_discoveries.append(activity_entry)
+
+                # Update scan data with latest discoveries
+                scan_data['latest_discoveries'] = latest_discoveries
+
+                return jsonify({
+                    'hostnames': hostnames,
+                    'ip_addresses': ip_addresses,
+                    'stats': stats,
+                    'latest_discoveries': latest_discoveries
+                })
+
+            except Exception as e:
+                logger.error(f"❌ Error extracting live data for {scan_id}: {e}", exc_info=True)
+                # Return minimal data structure
+                return jsonify({
+                    'hostnames': [],
+                    'ip_addresses': [],
+                    'stats': {
+                        'hostnames': len(forensic_data.nodes) if forensic_data.nodes else 0,
+                        'ip_addresses': len(forensic_data.ip_addresses) if forensic_data.ip_addresses else 0,
+                        'discovery_edges': 0,
+                        'operations_performed': 0,
+                        'dns_records': 0,
+                        'certificates_total': 0,
+                        'certificates_current': 0,
+                        'certificates_expired': 0,
+                        'shodan_results': 0,
+                        'virustotal_results': 0
+                    },
+                    'latest_discoveries': []
+                })

    return app

-def run_reconnaissance_background(scan_id: str, target: str, config: Config, shared_data: ReconData):
-    """Run reconnaissance in background thread with shared data object."""
+def run_forensic_reconnaissance_background(scan_id: str, target: str, config: Config, shared_data: ForensicReconData):
+    """Run forensic reconnaissance in background thread with shared forensic data object."""

    def update_progress(message: str, percentage: int = None):
-        """Update scan progress and live statistics."""
+        """Update scan progress and live forensic statistics."""
        with scan_lock:
            if scan_id in active_scans:
                active_scans[scan_id]['message'] = message
                if percentage is not None:
                    active_scans[scan_id]['progress'] = percentage

-                # Update live stats from the shared data object
+                # Update live stats from the shared forensic data object
                if shared_data:
                    active_scans[scan_id]['live_stats'] = shared_data.get_stats()

@@ -178,10 +333,13 @@ def run_reconnaissance_background(scan_id: str, target: str, config: Config, sha
                if 'latest_discoveries' not in active_scans[scan_id]:
                    active_scans[scan_id]['latest_discoveries'] = []

-                active_scans[scan_id]['latest_discoveries'].append({
+                # Create activity entry
+                activity_entry = {
                    'timestamp': time.time(),
                    'message': message
-                })
+                }
+
+                active_scans[scan_id]['latest_discoveries'].append(activity_entry)

                # Keep only last 10 discoveries
                active_scans[scan_id]['latest_discoveries'] = \
@@ -190,40 +348,44 @@ def run_reconnaissance_background(scan_id: str, target: str, config: Config, sha
        logger.info(f"[{scan_id}] {message} ({percentage}%)" if percentage else f"[{scan_id}] {message}")

    try:
-        logger.info(f"🔧 Initializing reconnaissance engine for scan: {scan_id}")
+        logger.info(f"🔧 Initializing forensic reconnaissance engine for scan: {scan_id}")

-        # Initialize engine
-        engine = ReconnaissanceEngine(config)
+        # Initialize forensic engine
+        engine = ForensicReconnaissanceEngine(config)
        engine.set_progress_callback(update_progress)

-        # IMPORTANT: Pass the shared data object to the engine
+        # IMPORTANT: Pass the shared forensic data object to the engine
        engine.set_shared_data(shared_data)

        # Update status
        with scan_lock:
            active_scans[scan_id]['status'] = 'running'

-        logger.info(f"🚀 Starting reconnaissance for: {target}")
+        logger.info(f"🚀 Starting forensic reconnaissance for: {target}")

-        # Run reconnaissance - this will populate the shared_data object incrementally
+        # Run forensic reconnaissance - this will populate the shared_data object incrementally
        final_data = engine.run_reconnaissance(target)

-        logger.info(f"✅ Reconnaissance completed for scan: {scan_id}")
+        logger.info(f"✅ Forensic reconnaissance completed for scan: {scan_id}")

        # Update with final results (the shared_data should already be populated)
        with scan_lock:
            active_scans[scan_id]['status'] = 'completed'
            active_scans[scan_id]['progress'] = 100
-            active_scans[scan_id]['message'] = 'Reconnaissance completed'
+            active_scans[scan_id]['message'] = 'Forensic reconnaissance completed'
            active_scans[scan_id]['data'] = final_data  # This should be the same as shared_data
            active_scans[scan_id]['live_stats'] = final_data.get_stats()

-        # Log final statistics
+        # Log final forensic statistics
        final_stats = final_data.get_stats()
-        logger.info(f"📊 Final stats for {scan_id}: {final_stats}")
+        logger.info(f"📊 Final forensic stats for {scan_id}: {final_stats}")
+
+        # Log discovery graph analysis
+        graph_analysis = final_data._generate_graph_analysis()
+        logger.info(f"🌐 Discovery graph: {len(final_data.nodes)} nodes, {len(final_data.edges)} edges, max depth: {graph_analysis['max_depth']}")

    except Exception as e:
-        logger.error(f"❌ Error in reconnaissance for {scan_id}: {e}", exc_info=True)
+        logger.error(f"❌ Error in forensic reconnaissance for {scan_id}: {e}", exc_info=True)

        # Handle errors
        with scan_lock:
            active_scans[scan_id]['status'] = 'error'
diff --git a/static/script.js b/static/script.js
index 195ecc6..2dc6097 100644
--- a/static/script.js
+++ b/static/script.js
@@ -1,4 +1,4 @@
-// DNS Reconnaissance Tool - Enhanced Frontend JavaScript with Debug Output
+// DNS Reconnaissance Tool - Enhanced Frontend JavaScript with Forensic Data Support

 class ReconTool {
     constructor() {
@@ -6,7 +6,8 @@ class ReconTool {
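The `/graph` route above packs several heuristics into one handler: depth-based colors, a clamped size formula, and filtering of synthetic TLD-expansion edges. Below is a minimal, self-contained Python sketch of the same payload shaping. `NodeStub`, `EdgeStub`, and the `degree` argument are simplified stand-ins for the real `ForensicReconData` API (`get_children`, `get_parents`, and friends); only the heuristics mirror the route.

# Standalone sketch of the payload shaping done by /api/scan/<scan_id>/graph.
# NodeStub/EdgeStub are stand-ins for the real classes in src/data_structures.py.
from dataclasses import dataclass
from typing import Dict, List

DEPTH_COLORS = {0: '#00ff41', 1: '#ff9900', 2: '#ff6b6b', 3: '#4ecdc4', 4: '#45b7d1'}

@dataclass
class NodeStub:
    depth: int
    dns_record_count: int = 0
    certificate_count: int = 0

@dataclass
class EdgeStub:
    source_hostname: str
    target_hostname: str
    method: str

def build_graph_payload(nodes: Dict[str, NodeStub], edges: List[EdgeStub],
                        degree: Dict[str, int]) -> dict:
    """Shape reconnaissance data into the nodes/edges/stats JSON the frontend expects."""
    out_nodes = []
    for hostname, node in nodes.items():
        # Node size grows with connectivity and collected data, clamped to 8..20
        size = max(8, min(20, 8 + degree.get(hostname, 0) * 2
                          + node.dns_record_count // 3 + node.certificate_count))
        out_nodes.append({
            'id': hostname,
            'depth': node.depth,
            'color': DEPTH_COLORS.get(node.depth, '#666666'),
            'size': size,
        })
    # Synthetic TLD-expansion edges are dropped, mirroring the route
    out_edges = [{'source': e.source_hostname, 'target': e.target_hostname, 'method': e.method}
                 for e in edges if not e.source_hostname.startswith('tld_expansion:')]
    return {
        'nodes': out_nodes,
        'edges': out_edges,
        'stats': {
            'node_count': len(out_nodes),
            'edge_count': len(out_edges),
            'max_depth': max((n['depth'] for n in out_nodes), default=0),
        },
    }

if __name__ == '__main__':
    nodes = {'example.com': NodeStub(depth=0, dns_record_count=6, certificate_count=2),
             'mail.example.com': NodeStub(depth=1, dns_record_count=3)}
    edges = [EdgeStub('example.com', 'mail.example.com', 'dns_subdomain_extraction')]
    print(build_graph_payload(nodes, edges, {'example.com': 1, 'mail.example.com': 1}))

The 8..20 clamp keeps isolated leaf hosts readable without letting hub nodes dominate the force layout.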
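The background runner relies on a small but easy-to-get-wrong pattern: a worker thread mutates per-scan state that HTTP handlers read concurrently, so every access goes through `scan_lock`, and the activity log is truncated so polling payloads stay bounded. A stripped-down sketch of that pattern (names are illustrative, not the module's exact API):

# Minimal sketch of the locked progress-callback pattern used by
# run_forensic_reconnaissance_background; not the module's real code.
import threading
import time

scan_lock = threading.Lock()
active_scans = {'scan-1': {'message': '', 'progress': 0, 'latest_discoveries': []}}

def update_progress(scan_id, message, percentage=None):
    with scan_lock:
        scan = active_scans.get(scan_id)
        if scan is None:
            return
        scan['message'] = message
        if percentage is not None:
            scan['progress'] = percentage
        scan['latest_discoveries'].append({'timestamp': time.time(), 'message': message})
        # Bound the activity log so polling responses stay small
        scan['latest_discoveries'] = scan['latest_discoveries'][-10:]

def worker(scan_id):
    for pct in (10, 50, 100):
        update_progress(scan_id, f'step at {pct}%', pct)

t = threading.Thread(target=worker, args=('scan-1',))
t.start()
t.join()
with scan_lock:
    print(active_scans['scan-1']['progress'], len(active_scans['scan-1']['latest_discoveries']))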
         this.pollInterval = null;
         this.liveDataInterval = null;
         this.currentReport = null;
-        this.debugMode = true; // Enable debug logging
+        this.debugMode = true;
+        this.graphVisualization = null; // created on first graph load after scan completion
         this.init();
     }

@@ -23,6 +24,13 @@ class ReconTool {
     init() {
         this.bindEvents();
         this.setupRealtimeElements();
+
+        // Handle window resize for graph
+        window.addEventListener('resize', () => {
+            if (this.graphVisualization) {
+                this.graphVisualization.handleResize();
+            }
+        });
     }

     setupRealtimeElements() {
@@ -43,6 +51,14 @@ class ReconTool {
                     <span class="stat-label">IP Addresses:</span>
                     <span class="stat-value" id="liveIPs">0</span>
                 </div>
+                <div class="stat-item">
+                    <span class="stat-label">Discovery Edges:</span>
+                    <span class="stat-value" id="liveEdges">0</span>
+                </div>
+                <div class="stat-item">
+                    <span class="stat-label">Operations:</span>
+                    <span class="stat-value" id="liveOperations">0</span>
+                </div>
                 <div class="stat-item">
                     <span class="stat-label">DNS Records:</span>
                     <span class="stat-value" id="liveDNS">0</span>
@@ -117,6 +133,27 @@ class ReconTool {
                 this.startScan();
             }
         });
+        document.getElementById('showGraphView').addEventListener('click', () => {
+            this.showGraphView();
+        });
+    }
+
+    showGraphView() {
+        if (this.graphVisualization && this.currentScanId) {
+            document.getElementById('graphSection').style.display = 'block';
+
+            // Update button states
+            document.getElementById('showGraphView').classList.add('active');
+            document.getElementById('showJson').classList.remove('active');
+            document.getElementById('showText').classList.remove('active');
+
+            // Scroll to graph section
+            document.getElementById('graphSection').scrollIntoView({
+                behavior: 'smooth'
+            });
+        } else {
+            alert('Graph data not available yet. Please wait for scan completion.');
+        }
     }

     async startScan() {
@@ -137,7 +174,7 @@
         try {
             // Show progress section
             this.showProgressSection();
-            this.updateProgress(0, 'Starting scan...');
+            this.updateProgress(0, 'Starting forensic scan...');

             this.debug('Starting scan with data:', scanData);

@@ -223,7 +260,7 @@
             // Update progress
             this.updateProgress(status.progress, status.message);

-            // Update live stats
+            // Update live stats (handle new forensic format)
             if (status.live_stats) {
                 this.debug('Received live stats:', status.live_stats);
                 this.updateLiveStats(status.live_stats);
@@ -270,7 +307,7 @@

             this.debug('Received live data:', data);

-            // Update live discoveries
+            // Update live discoveries (handle new forensic format)
             this.updateLiveDiscoveries(data);

         } catch (error) {
@@ -282,17 +319,19 @@
     updateLiveStats(stats) {
         this.debug('Updating live stats:', stats);

-        // Update the live statistics counters
-        const statElements = {
-            'liveHostnames': stats.hostnames || 0,
-            'liveIPs': stats.ip_addresses || 0,
+        // Handle both old and new stat formats for compatibility
+        const statMappings = {
+            'liveHostnames': stats.hostnames || 0,
+            'liveIPs': stats.ip_addresses || stats.ips || 0,
+            'liveEdges': stats.discovery_edges || 0, // New forensic field
+            'liveOperations': stats.operations_performed || 0, // New forensic field
             'liveDNS': stats.dns_records || 0,
-            'liveCerts': stats.certificates || 0,
+            'liveCerts': stats.certificates_total || stats.certificates || 0,
             'liveShodan': stats.shodan_results || 0,
             'liveVT': stats.virustotal_results || 0
         };

-        Object.entries(statElements).forEach(([elementId, value]) => {
+        Object.entries(statMappings).forEach(([elementId, value]) => {
             const element = document.getElementById(elementId);
             if (element) {
                 const currentValue = element.textContent;
@@ -315,43 +354,92 @@
     updateLiveDiscoveries(data) {
         this.debug('Updating live discoveries with data:', data);

+        // Handle new forensic data format
+        let hostnames = [];
+        let ipAddresses = [];
+        let activities = [];
+
+        // Extract data from forensic format or fallback to old format
+        if (data.hostnames && Array.isArray(data.hostnames)) {
+            hostnames = data.hostnames;
+        } else if (data.stats && data.stats.hostnames) {
+            // If we only have stats, create a placeholder list
+            hostnames = [`${data.stats.hostnames} discovered`];
+        }
+
+        if (data.ip_addresses && Array.isArray(data.ip_addresses)) {
+            ipAddresses = data.ip_addresses;
+        } else if (data.stats && data.stats.ip_addresses) {
+            // If we only have stats, create a placeholder list
+            ipAddresses = [`${data.stats.ip_addresses} discovered`];
+        }
+
+        // Handle activity log from forensic format
+        if (data.latest_discoveries &&
Array.isArray(data.latest_discoveries)) { + activities = data.latest_discoveries; + } + // Update hostnames list const hostnameList = document.querySelector('#recentHostnames .hostname-list'); - if (hostnameList && data.hostnames && data.hostnames.length > 0) { + if (hostnameList && hostnames.length > 0) { // Show last 10 hostnames - const recentHostnames = data.hostnames; + const recentHostnames = hostnames.slice(-10); hostnameList.innerHTML = recentHostnames.map(hostname => `${hostname}` ).join(''); this.debug(`Updated hostname list with ${recentHostnames.length} items`); } else if (hostnameList) { - this.debug(`No hostnames to display (${data.hostnames ? data.hostnames.length : 0} total)`); + this.debug(`No hostnames to display (${hostnames.length} total)`); } // Update IP addresses list const ipList = document.querySelector('#recentIPs .ip-list'); - if (ipList && data.ip_addresses && data.ip_addresses.length > 0) { + if (ipList && ipAddresses.length > 0) { // Show last 10 IPs - const recentIPs = data.ip_addresses; + const recentIPs = ipAddresses.slice(-10); ipList.innerHTML = recentIPs.map(ip => `${ip}` ).join(''); this.debug(`Updated IP list with ${recentIPs.length} items`); } else if (ipList) { - this.debug(`No IPs to display (${data.ip_addresses ? data.ip_addresses.length : 0} total)`); + this.debug(`No IPs to display (${ipAddresses.length} total)`); } // Update activity log const activityList = document.querySelector('#activityLog .activity-list'); - if (activityList && data.latest_discoveries && data.latest_discoveries.length > 0) { - const activities = data.latest_discoveries.slice(-5); // Last 5 activities - activityList.innerHTML = activities.map(activity => { - const time = new Date(activity.timestamp * 1000).toLocaleTimeString(); - return `
<div class="activity-item">[${time}] ${activity.message}</div>
`; - }).join(''); - this.debug(`Updated activity log with ${activities.length} items`); - } else if (activityList) { - this.debug(`No activities to display (${data.latest_discoveries ? data.latest_discoveries.length : 0} total)`); + if (activityList) { + if (activities.length > 0) { + const recentActivities = activities.slice(-5); // Last 5 activities + activityList.innerHTML = recentActivities.map(activity => { + const time = new Date(activity.timestamp * 1000).toLocaleTimeString(); + return `
<div class="activity-item">[${time}] ${activity.message}</div>
`; + }).join(''); + this.debug(`Updated activity log with ${recentActivities.length} items`); + } else { + // Fallback: show generic activity based on stats + const stats = data.stats || {}; + const genericActivities = []; + + if (stats.operations_performed > 0) { + genericActivities.push(`${stats.operations_performed} operations performed`); + } + if (stats.hostnames > 0) { + genericActivities.push(`${stats.hostnames} hostnames discovered`); + } + if (stats.dns_records > 0) { + genericActivities.push(`${stats.dns_records} DNS records collected`); + } + + if (genericActivities.length > 0) { + const now = new Date().toLocaleTimeString(); + activityList.innerHTML = genericActivities.map(activity => + `
<div class="activity-item">[${now}] ${activity}</div>
` + ).join(''); + this.debug(`Updated activity log with ${genericActivities.length} generic items`); + } else { + this.debug('No activities to display'); + } + } } } @@ -374,9 +462,15 @@ class ReconTool { this.debug('Report loaded successfully'); this.showResultsSection(); this.showReport('text'); // Default to text view + + // Load and show graph + if (!this.graphVisualization) { + this.graphVisualization = new GraphVisualization(); + } + await this.graphVisualization.loadAndShowGraph(this.currentScanId); } catch (error) { - console.error('❌ Error loading report:', error); + console.error('⚠️ Error loading report:', error); this.showError(`Error loading report: ${error.message}`); } } @@ -410,7 +504,7 @@ class ReconTool { if (liveSection) { const title = liveSection.querySelector('h3'); if (title) { - title.textContent = 'πŸ“Š Final Discovery Summary'; + title.textContent = 'πŸ“Š Final Forensic Summary'; } liveSection.style.display = 'block'; } @@ -424,7 +518,7 @@ class ReconTool { if (progressMessage) progressMessage.style.display = 'none'; if (scanControls) scanControls.style.display = 'none'; - this.debug('Showing results section with live discoveries'); + this.debug('Showing results section with forensic discoveries'); } resetToForm() { @@ -527,11 +621,11 @@ class ReconTool { content = typeof this.currentReport.json_report === 'string' ? this.currentReport.json_report : JSON.stringify(this.currentReport.json_report, null, 2); - filename = `recon-report-${this.currentScanId}.json`; + filename = `forensic-recon-report-${this.currentScanId}.json`; mimeType = 'application/json'; } else { content = this.currentReport.text_report; - filename = `recon-report-${this.currentScanId}.txt`; + filename = `forensic-recon-report-${this.currentScanId}.txt`; mimeType = 'text/plain'; } @@ -548,8 +642,375 @@ class ReconTool { } } +class GraphVisualization { + constructor() { + this.svg = null; + this.simulation = null; + this.graphData = null; + this.showLabels = false; + this.selectedNode = null; + this.zoom = null; + this.container = null; + } + + async loadAndShowGraph(scanId) { + try { + console.log('πŸ•ΈοΈ Loading graph data...'); + const response = await fetch(`/api/scan/${scanId}/graph`); + + if (!response.ok) { + throw new Error(`HTTP error! 
status: ${response.status}`); + } + + const graphData = await response.json(); + + if (graphData.error) { + throw new Error(graphData.error); + } + + this.graphData = graphData; + this.showGraphSection(); + this.initializeGraph(); + this.updateGraphStats(graphData.stats); + + console.log('βœ… Graph loaded successfully', graphData.stats); + + } catch (error) { + console.error('⚠️ Error loading graph:', error); + alert(`Failed to load graph: ${error.message}`); + } + } + + showGraphSection() { + document.getElementById('graphSection').style.display = 'block'; + this.bindGraphEvents(); + } + + bindGraphEvents() { + document.getElementById('showGraph').addEventListener('click', () => { + this.showGraph(); + }); + + document.getElementById('hideGraph').addEventListener('click', () => { + this.hideGraph(); + }); + + document.getElementById('resetZoom').addEventListener('click', () => { + this.resetZoom(); + }); + + document.getElementById('toggleLabels').addEventListener('click', () => { + this.toggleLabels(); + }); + } + + initializeGraph() { + if (!this.graphData) return; + + // Clear existing graph + d3.select('#discoveryGraph').selectAll('*').remove(); + + // Set up SVG + const container = d3.select('#discoveryGraph'); + const containerNode = container.node(); + const width = containerNode.clientWidth; + const height = containerNode.clientHeight; + + this.svg = container + .attr('width', width) + .attr('height', height); + + // Set up zoom behavior + this.zoom = d3.zoom() + .scaleExtent([0.1, 3]) + .on('zoom', (event) => { + this.container.attr('transform', event.transform); + }); + + this.svg.call(this.zoom); + + // Create container for graph elements + this.container = this.svg.append('g'); + + // Set up force simulation + this.simulation = d3.forceSimulation(this.graphData.nodes) + .force('link', d3.forceLink(this.graphData.edges) + .id(d => d.id) + .distance(80) + .strength(0.5)) + .force('charge', d3.forceManyBody() + .strength(-300) + .distanceMax(400)) + .force('center', d3.forceCenter(width / 2, height / 2)) + .force('collision', d3.forceCollide() + .radius(d => d.size + 5)); + + this.drawGraph(); + this.startSimulation(); + } + + drawGraph() { + // Draw links + const links = this.container.append('g') + .selectAll('line') + .data(this.graphData.edges) + .enter().append('line') + .attr('class', 'graph-link') + .attr('stroke', d => d.color) + .on('mouseover', (event, d) => { + this.showTooltip(event, `Discovery: ${d.method}
<br>From: ${d.source}<br>
To: ${d.target}`); + }) + .on('mouseout', () => { + this.hideTooltip(); + }); + + // Draw nodes + const nodes = this.container.append('g') + .selectAll('circle') + .data(this.graphData.nodes) + .enter().append('circle') + .attr('class', 'graph-node') + .attr('r', d => d.size) + .attr('fill', d => d.color) + .style('opacity', 0.8) + .on('mouseover', (event, d) => { + this.showNodeTooltip(event, d); + this.highlightConnections(d); + }) + .on('mouseout', (event, d) => { + this.hideTooltip(); + this.unhighlightConnections(); + }) + .on('click', (event, d) => { + this.selectNode(d); + }) + .call(d3.drag() + .on('start', (event, d) => { + if (!event.active) this.simulation.alphaTarget(0.3).restart(); + d.fx = d.x; + d.fy = d.y; + }) + .on('drag', (event, d) => { + d.fx = event.x; + d.fy = event.y; + }) + .on('end', (event, d) => { + if (!event.active) this.simulation.alphaTarget(0); + d.fx = null; + d.fy = null; + })); + + // Draw labels (initially hidden) + const labels = this.container.append('g') + .selectAll('text') + .data(this.graphData.nodes) + .enter().append('text') + .attr('class', 'graph-label') + .attr('dy', '.35em') + .style('opacity', this.showLabels ? 1 : 0) + .text(d => d.label); + + // Store references + this.links = links; + this.nodes = nodes; + this.labels = labels; + } + + startSimulation() { + this.simulation.on('tick', () => { + this.links + .attr('x1', d => d.source.x) + .attr('y1', d => d.source.y) + .attr('x2', d => d.target.x) + .attr('y2', d => d.target.y); + + this.nodes + .attr('cx', d => d.x) + .attr('cy', d => d.y); + + this.labels + .attr('x', d => d.x) + .attr('y', d => d.y + d.size + 12); + }); + } + + showNodeTooltip(event, node) { + const tooltip = ` + ${node.label}
+                <br>Depth: ${node.depth}<br>
+                DNS Records: ${node.dns_records}<br>
+                Certificates: ${node.certificates}<br>
+                IPs: ${node.ip_addresses.length}<br>
+                Discovery: ${node.discovery_methods.join(', ')}<br>
+ First Seen: ${node.first_seen ? new Date(node.first_seen).toLocaleString() : 'Unknown'} + `; + this.showTooltip(event, tooltip); + } + + showTooltip(event, content) { + const tooltip = document.getElementById('graphTooltip'); + tooltip.innerHTML = content; + tooltip.className = 'graph-tooltip visible'; + + const rect = tooltip.getBoundingClientRect(); + const containerRect = document.getElementById('discoveryGraph').getBoundingClientRect(); + + tooltip.style.left = `${event.clientX - containerRect.left + 10}px`; + tooltip.style.top = `${event.clientY - containerRect.top - 10}px`; + + // Adjust position if tooltip goes off screen + if (event.clientX + rect.width > window.innerWidth) { + tooltip.style.left = `${event.clientX - containerRect.left - rect.width - 10}px`; + } + + if (event.clientY - rect.height < 0) { + tooltip.style.top = `${event.clientY - containerRect.top + 20}px`; + } + } + + hideTooltip() { + const tooltip = document.getElementById('graphTooltip'); + tooltip.className = 'graph-tooltip'; + } + + highlightConnections(node) { + // Highlight connected links + this.links + .style('opacity', d => (d.source.id === node.id || d.target.id === node.id) ? 1 : 0.2) + .classed('highlighted', d => d.source.id === node.id || d.target.id === node.id); + + // Highlight connected nodes + this.nodes + .style('opacity', d => { + if (d.id === node.id) return 1; + const connected = this.graphData.edges.some(edge => + (edge.source.id === node.id && edge.target.id === d.id) || + (edge.target.id === node.id && edge.source.id === d.id) + ); + return connected ? 0.8 : 0.3; + }); + } + + unhighlightConnections() { + this.links + .style('opacity', 0.6) + .classed('highlighted', false); + + this.nodes + .style('opacity', 0.8); + } + + selectNode(node) { + // Update selected node styling + this.nodes + .classed('selected', d => d.id === node.id); + + // Show node details + this.showNodeDetails(node); + this.selectedNode = node; + } + + showNodeDetails(node) { + const detailsContainer = document.getElementById('nodeDetails'); + const selectedInfo = document.getElementById('selectedNodeInfo'); + + const details = ` +
+            <div class="detail-item">
+                <span class="detail-label">Hostname:</span>
+                <span class="detail-value">${node.label}</span>
+            </div>
+            <div class="detail-item">
+                <span class="detail-label">Discovery Depth:</span>
+                <span class="detail-value">${node.depth}</span>
+            </div>
+            <div class="detail-item">
+                <span class="detail-label">DNS Records:</span>
+                <span class="detail-value">${node.dns_records}</span>
+            </div>
+            <div class="detail-item">
+                <span class="detail-label">Certificates:</span>
+                <span class="detail-value">${node.certificates}</span>
+            </div>
+            <div class="detail-item">
+                <span class="detail-label">IP Addresses:</span>
+                <span class="detail-value">${node.ip_addresses.join(', ') || 'None'}</span>
+            </div>
+            <div class="detail-item">
+                <span class="detail-label">Discovery Methods:</span>
+                <span class="detail-value">${node.discovery_methods.join(', ')}</span>
+            </div>
+            <div class="detail-item">
+                <span class="detail-label">First Seen:</span>
+                <span class="detail-value">${node.first_seen ? new Date(node.first_seen).toLocaleString() : 'Unknown'}</span>
+            </div>
+ `; + + detailsContainer.innerHTML = details; + selectedInfo.style.display = 'block'; + } + + toggleLabels() { + this.showLabels = !this.showLabels; + if (this.labels) { + this.labels.transition() + .duration(300) + .style('opacity', this.showLabels ? 1 : 0); + } + + const button = document.getElementById('toggleLabels'); + button.textContent = this.showLabels ? 'Hide Labels' : 'Show Labels'; + } + + resetZoom() { + if (this.svg && this.zoom) { + this.svg.transition() + .duration(750) + .call(this.zoom.transform, d3.zoomIdentity); + } + } + + showGraph() { + if (this.container) { + this.container.style('display', 'block'); + } + document.getElementById('showGraph').classList.add('active'); + document.getElementById('hideGraph').classList.remove('active'); + } + + hideGraph() { + if (this.container) { + this.container.style('display', 'none'); + } + document.getElementById('hideGraph').classList.add('active'); + document.getElementById('showGraph').classList.remove('active'); + } + + updateGraphStats(stats) { + document.getElementById('graphNodes').textContent = stats.node_count || 0; + document.getElementById('graphEdges').textContent = stats.edge_count || 0; + document.getElementById('graphDepth').textContent = stats.max_depth || 0; + } + + // Handle window resize + handleResize() { + if (this.svg && this.graphData) { + const containerNode = document.getElementById('discoveryGraph'); + const width = containerNode.clientWidth; + const height = containerNode.clientHeight; + + this.svg + .attr('width', width) + .attr('height', height); + + this.simulation + .force('center', d3.forceCenter(width / 2, height / 2)) + .restart(); + } + } +} + // Initialize the application when DOM is loaded document.addEventListener('DOMContentLoaded', () => { - console.log('🌐 DNS Reconnaissance Tool initialized with debug mode'); + console.log('🌐 Forensic DNS Reconnaissance Tool initialized with debug mode'); new ReconTool(); }); \ No newline at end of file diff --git a/static/style.css b/static/style.css index bd9b69a..3ea3280 100644 --- a/static/style.css +++ b/static/style.css @@ -436,4 +436,253 @@ header p { grid-template-columns: repeat(2, 1fr); gap: 10px; } +} + + +/* Add this CSS to the existing style.css file */ + +/* Graph Section */ +.graph-section { + background: #2a2a2a; + border-radius: 4px; + border: 1px solid #444; + box-shadow: inset 0 0 15px rgba(0,0,0,0.5); + padding: 30px; + margin-bottom: 25px; +} + +.graph-controls { + margin-bottom: 20px; + text-align: center; +} + +.graph-controls button { + margin: 0 5px; +} + +.graph-legend { + display: flex; + justify-content: space-between; + margin-bottom: 20px; + padding: 15px; + background: rgba(0,0,0,0.3); + border-radius: 4px; + border: 1px solid #444; + flex-wrap: wrap; +} + +.graph-legend h4, .graph-legend h5 { + color: #e0e0e0; + margin-bottom: 10px; + text-transform: uppercase; + letter-spacing: 1px; +} + +.legend-items, .legend-methods { + display: flex; + flex-direction: column; + gap: 8px; +} + +.legend-item { + display: flex; + align-items: center; + gap: 10px; + font-size: 0.9rem; + color: #c7c7c7; +} + +.legend-color { + width: 16px; + height: 16px; + border-radius: 50%; + border: 1px solid #666; +} + +.method-item { + font-size: 0.9rem; + color: #a0a0a0; + margin-bottom: 4px; +} + +.graph-container { + position: relative; + width: 100%; + height: 600px; + background: #0a0a0a; + border-radius: 4px; + border: 1px solid #333; + margin-bottom: 20px; + overflow: hidden; +} + +#discoveryGraph { + width: 100%; + height: 100%; + cursor: grab; 
+} + +#discoveryGraph:active { + cursor: grabbing; +} + +.graph-tooltip { + position: absolute; + background: rgba(0, 0, 0, 0.9); + color: #00ff41; + padding: 10px; + border-radius: 4px; + border: 1px solid #00ff41; + font-family: 'Courier New', monospace; + font-size: 0.8rem; + pointer-events: none; + opacity: 0; + transition: opacity 0.2s ease; + z-index: 1000; + max-width: 300px; + line-height: 1.4; +} + +.graph-tooltip.visible { + opacity: 1; +} + +.graph-info { + display: flex; + justify-content: space-between; + align-items: flex-start; + flex-wrap: wrap; + gap: 20px; +} + +.graph-stats { + display: flex; + gap: 20px; + flex-wrap: wrap; +} + +.selected-node-info { + background: rgba(0, 40, 0, 0.8); + border: 1px solid #00ff41; + border-radius: 4px; + padding: 15px; + min-width: 250px; +} + +.selected-node-info h4 { + color: #00ff41; + margin-bottom: 10px; + text-transform: uppercase; + letter-spacing: 1px; +} + +#nodeDetails { + font-family: 'Courier New', monospace; + font-size: 0.8rem; + line-height: 1.4; + color: #c7c7c7; +} + +#nodeDetails .detail-item { + margin-bottom: 8px; + border-bottom: 1px solid #333; + padding-bottom: 4px; +} + +#nodeDetails .detail-item:last-child { + border-bottom: none; +} + +#nodeDetails .detail-label { + color: #00ff41; + font-weight: bold; +} + +#nodeDetails .detail-value { + color: #c7c7c7; + margin-left: 10px; +} + +/* Graph Nodes and Links Styling (applied via D3) */ +.graph-node { + stroke: #333; + stroke-width: 2px; + cursor: pointer; + transition: all 0.3s ease; +} + +.graph-node:hover { + stroke: #fff; + stroke-width: 3px; +} + +.graph-node.selected { + stroke: #ff9900; + stroke-width: 4px; +} + +.graph-link { + stroke-opacity: 0.6; + stroke-width: 2px; + transition: all 0.3s ease; +} + +.graph-link:hover { + stroke-opacity: 1; + stroke-width: 3px; +} + +.graph-link.highlighted { + stroke-opacity: 1; + stroke-width: 3px; +} + +.graph-label { + font-family: 'Courier New', monospace; + font-size: 10px; + fill: #c7c7c7; + text-anchor: middle; + pointer-events: none; + opacity: 0.8; +} + +.graph-label.visible { + opacity: 1; +} + +/* Responsive adjustments for graph */ +@media (max-width: 768px) { + .graph-container { + height: 400px; + } + + .graph-legend { + flex-direction: column; + gap: 15px; + } + + .legend-items, .legend-methods { + flex-direction: row; + flex-wrap: wrap; + gap: 15px; + } + + .graph-info { + flex-direction: column; + align-items: stretch; + } + + .graph-stats { + justify-content: center; + } + + .selected-node-info { + min-width: auto; + width: 100%; + } + + .graph-tooltip { + font-size: 0.7rem; + max-width: 250px; + } } \ No newline at end of file diff --git a/templates/index.html b/templates/index.html index eff9007..6bcfd21 100644 --- a/templates/index.html +++ b/templates/index.html @@ -65,6 +65,7 @@
+
@@ -73,8 +74,46 @@

             
- + + + + + \ No newline at end of file
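For completeness, a hypothetical command-line client for the endpoints in this patch. Only the polling fields used by the frontend (`status`, `progress`, `message`) and the `/graph` payload are taken from the code above; the scan-start route, its request body, and the exact status path are assumptions and may differ from the actual app:

# Hypothetical client: start a scan, poll status, then pull the graph.
import time
import requests

BASE = 'http://127.0.0.1:5000'  # assumed local dev address

def run_scan(target):
    # Assumed start route and response shape; adjust to the real app
    resp = requests.post(f'{BASE}/api/scan', json={'target': target}, timeout=10)
    resp.raise_for_status()
    scan_id = resp.json()['scan_id']

    while True:
        status = requests.get(f'{BASE}/api/scan/{scan_id}/status', timeout=10).json()
        print(f"{status.get('progress', 0):>3}% {status.get('message', '')}")
        if status.get('status') in ('completed', 'error'):
            break
        time.sleep(2)

    # The /graph route and its stats keys are defined in this patch
    graph = requests.get(f'{BASE}/api/scan/{scan_id}/graph', timeout=10).json()
    print(f"nodes={graph['stats']['node_count']} edges={graph['stats']['edge_count']}")
    return graph

if __name__ == '__main__':
    run_scan('example.com')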
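The old-versus-new stats compatibility handled by `updateLiveStats` is easy to regress. A small Python rendition of the same mapping, with the two fallbacks (`ips` and `certificates`), doubles as a spot check:

# Mirror of the updateLiveStats field mapping, with legacy fallbacks.
def map_live_stats(stats):
    return {
        'liveHostnames': stats.get('hostnames', 0),
        'liveIPs': stats.get('ip_addresses', stats.get('ips', 0)),
        'liveEdges': stats.get('discovery_edges', 0),            # forensic-only
        'liveOperations': stats.get('operations_performed', 0),  # forensic-only
        'liveDNS': stats.get('dns_records', 0),
        'liveCerts': stats.get('certificates_total', stats.get('certificates', 0)),
        'liveShodan': stats.get('shodan_results', 0),
        'liveVT': stats.get('virustotal_results', 0),
    }

# Legacy payload: certificate count still lands in liveCerts
assert map_live_stats({'hostnames': 3, 'certificates': 2})['liveCerts'] == 2
# Forensic payload: new counters come through
assert map_live_stats({'certificates_total': 5, 'discovery_edges': 7})['liveEdges'] == 7
print('stat mapping ok')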
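The D3 force simulation configured in `initializeGraph` (link distance 80 with strength 0.5, many-body charge of -300, centering, and per-node collision radii) can be hard to reason about from configuration alone. The following is an illustrative Python approximation of one relaxation step under those parameters, not D3's actual velocity-decay integrator:

# Illustrative spring/charge/centering step, loosely matching the JS config.
import math

def layout_step(pos, edges, width=800, height=600, link_dist=80.0, charge=-300.0, dt=0.02):
    force = {n: [0.0, 0.0] for n in pos}
    # Link force: spring toward link_dist along each edge (strength 0.5)
    for a, b in edges:
        dx, dy = pos[b][0] - pos[a][0], pos[b][1] - pos[a][1]
        dist = math.hypot(dx, dy) or 1e-6
        f = 0.5 * (dist - link_dist) / dist
        force[a][0] += f * dx
        force[a][1] += f * dy
        force[b][0] -= f * dx
        force[b][1] -= f * dy
    # Many-body force: inverse-square repulsion from the negative charge
    names = list(pos)
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            dx, dy = pos[b][0] - pos[a][0], pos[b][1] - pos[a][1]
            d2 = dx * dx + dy * dy or 1e-6
            f = charge / d2
            force[a][0] += f * dx
            force[a][1] += f * dy
            force[b][0] -= f * dx
            force[b][1] -= f * dy
    # Gentle pull toward the canvas center
    for n, (x, y) in pos.items():
        force[n][0] += (width / 2 - x) * 0.05
        force[n][1] += (height / 2 - y) * 0.05
    return {n: (x + force[n][0] * dt, y + force[n][1] * dt) for n, (x, y) in pos.items()}

pos = {'example.com': (380.0, 290.0), 'mail.example.com': (420.0, 310.0), 'www.example.com': (400.0, 260.0)}
for _ in range(100):
    pos = layout_step(pos, [('example.com', 'mail.example.com'), ('example.com', 'www.example.com')])
print({n: (round(x), round(y)) for n, (x, y) in pos.items()})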