diff --git a/src/certificate_checker.py b/src/certificate_checker.py
index 73cea67..d91997d 100644
--- a/src/certificate_checker.py
+++ b/src/certificate_checker.py
@@ -1,5 +1,5 @@
 # File: src/certificate_checker.py
-"""Certificate transparency log checker using crt.sh."""
+"""Certificate transparency log checker using crt.sh with minimal query caching."""
 
 import requests
 import json
@@ -15,7 +15,7 @@ from .config import Config
 
 logger = logging.getLogger(__name__)
 
 class CertificateChecker:
-    """Check certificates using crt.sh."""
+    """Check certificates using crt.sh with simple query caching to prevent duplicate HTTP requests."""
 
     CRT_SH_URL = "https://crt.sh/"
@@ -24,27 +24,26 @@ class CertificateChecker:
         self.last_request = 0
         self.query_count = 0
         self.connection_failures = 0
-        self.max_connection_failures = 3  # Stop trying after 3 consecutive failures
+        self.max_connection_failures = 3
 
-        logger.info("πŸ” Certificate checker initialized")
+        # Simple HTTP request cache to avoid duplicate queries
+        self._http_cache = {}  # query_string -> List[Certificate]
 
-        # Test connectivity to crt.sh on initialization
+        logger.info("Certificate checker initialized with HTTP request caching")
         self._test_connectivity()
 
     def _test_connectivity(self):
         """Test if we can reach crt.sh."""
         try:
-            logger.info("πŸ”— Testing connectivity to crt.sh...")
+            logger.info("Testing connectivity to crt.sh...")
 
-            # First test DNS resolution
             try:
                 socket.gethostbyname('crt.sh')
-                logger.debug("βœ… DNS resolution for crt.sh successful")
+                logger.debug("DNS resolution for crt.sh successful")
             except socket.gaierror as e:
-                logger.warning(f"⚠️ DNS resolution failed for crt.sh: {e}")
+                logger.warning(f"DNS resolution failed for crt.sh: {e}")
                 return False
 
-            # Test HTTP connection with a simple request
             response = requests.get(
                 self.CRT_SH_URL,
                 params={'q': 'example.com', 'output': 'json'},
@@ -52,21 +51,21 @@ class CertificateChecker:
                 headers={'User-Agent': 'DNS-Recon-Tool/1.0'}
             )
 
-            if response.status_code in [200, 404]:  # 404 is also acceptable (no results)
-                logger.info("βœ… crt.sh connectivity test successful")
+            if response.status_code in [200, 404]:
+                logger.info("crt.sh connectivity test successful")
                 return True
             else:
-                logger.warning(f"⚠️ crt.sh returned status {response.status_code}")
+                logger.warning(f"crt.sh returned status {response.status_code}")
                 return False
 
         except requests.exceptions.ConnectionError as e:
-            logger.warning(f"⚠️ Cannot reach crt.sh: {e}")
+            logger.warning(f"Cannot reach crt.sh: {e}")
            return False
         except requests.exceptions.Timeout:
-            logger.warning("⚠️ crt.sh connectivity test timed out")
+            logger.warning("crt.sh connectivity test timed out")
            return False
         except Exception as e:
-            logger.warning(f"⚠️ Unexpected error testing crt.sh connectivity: {e}")
+            logger.warning(f"Unexpected error testing crt.sh connectivity: {e}")
            return False
 
     def _rate_limit(self):
@@ -77,52 +76,66 @@
 
         if time_since_last < min_interval:
             sleep_time = min_interval - time_since_last
-            logger.debug(f"⏸️ crt.sh rate limiting: sleeping for {sleep_time:.2f}s")
+            logger.debug(f"crt.sh rate limiting: sleeping for {sleep_time:.2f}s")
             time.sleep(sleep_time)
 
         self.last_request = time.time()
         self.query_count += 1
 
     def get_certificates(self, domain: str) -> List[Certificate]:
-        """Get certificates for a domain from crt.sh."""
-        logger.debug(f"πŸ” Getting certificates for domain: {domain}")
+        """Get certificates for a domain; behavior is unchanged, but crt.sh queries are cached."""
+        logger.debug(f"Getting certificates for domain: {domain}")
 
-        # Skip if we've had too many connection failures
         if self.connection_failures >= self.max_connection_failures:
-            logger.warning(f"⚠️ Skipping certificate lookup for {domain} due to repeated connection failures")
+            logger.warning(f"Skipping certificate lookup for {domain} due to repeated connection failures")
             return []
 
         certificates = []
 
-        # Query for the domain
+        # Query for the domain itself
         domain_certs = self._query_crt_sh(domain)
         certificates.extend(domain_certs)
 
-        # Also query for wildcard certificates (if the main query succeeded)
-        if domain_certs or self.connection_failures < self.max_connection_failures:
-            wildcard_certs = self._query_crt_sh(f"%.{domain}")
-            certificates.extend(wildcard_certs)
+        # Query for wildcard certificates
+        wildcard_certs = self._query_crt_sh(f"%.{domain}")
+        certificates.extend(wildcard_certs)
 
         # Remove duplicates based on certificate ID
         unique_certs = {cert.id: cert for cert in certificates}
         final_certs = list(unique_certs.values())
 
         if final_certs:
-            logger.info(f"πŸ“œ Found {len(final_certs)} unique certificates for {domain}")
+            logger.info(f"Found {len(final_certs)} unique certificates for {domain}")
         else:
-            logger.debug(f"❌ No certificates found for {domain}")
+            logger.debug(f"No certificates found for {domain}")
 
         return final_certs
 
     def _query_crt_sh(self, query: str) -> List[Certificate]:
-        """Query crt.sh API with retry logic and better error handling."""
+        """Query the crt.sh API with HTTP caching to avoid duplicate requests."""
+
+        # Check HTTP cache first
+        if query in self._http_cache:
+            logger.debug(f"Using cached HTTP result for crt.sh query: {query}")
+            return self._http_cache[query]
+
+        # Not cached, make the HTTP request
+        certificates = self._make_http_request(query)
+
+        # Cache the HTTP result
+        self._http_cache[query] = certificates
+
+        return certificates
+
+    def _make_http_request(self, query: str) -> List[Certificate]:
+        """Make the actual HTTP request to the crt.sh API, with retry logic."""
        certificates = []
 
         self._rate_limit()
-        logger.debug(f"πŸ“‘ Querying crt.sh for: {query}")
+        logger.debug(f"Making HTTP request to crt.sh for: {query}")
 
-        max_retries = 2  # Reduced retries for faster failure
-        backoff_delays = [1, 3]  # Shorter delays
+        max_retries = 2
+        backoff_delays = [1, 3]
 
         for attempt in range(max_retries):
             try:
@@ -138,16 +151,15 @@ class CertificateChecker:
                     headers={'User-Agent': 'DNS-Recon-Tool/1.0'}
                 )
 
-                logger.debug(f"πŸ“‘ crt.sh API response for {query}: {response.status_code}")
+                logger.debug(f"crt.sh API response for {query}: {response.status_code}")
 
                 if response.status_code == 200:
                     try:
                         data = response.json()
-                        logger.debug(f"πŸ“Š crt.sh returned {len(data)} certificate entries for {query}")
+                        logger.debug(f"crt.sh returned {len(data)} certificate entries for {query}")
 
                         for cert_data in data:
                             try:
-                                # Parse dates with better error handling
                                 not_before = self._parse_date(cert_data.get('not_before'))
                                 not_after = self._parse_date(cert_data.get('not_after'))
 
@@ -161,41 +173,39 @@ class CertificateChecker:
                                         is_wildcard='*.' in cert_data.get('name_value', '')
                                     )
                                     certificates.append(certificate)
-                                    logger.debug(f"βœ… Parsed certificate ID {certificate.id} for {query}")
+                                    logger.debug(f"Parsed certificate ID {certificate.id} for {query}")
                                 else:
-                                    logger.debug(f"⚠️ Skipped certificate with invalid dates: {cert_data.get('id')}")
+                                    logger.debug(f"Skipped certificate with invalid dates: {cert_data.get('id')}")
                             except (ValueError, TypeError, KeyError) as e:
-                                logger.debug(f"⚠️ Error parsing certificate data: {e}")
-                                continue  # Skip malformed certificate data
+                                logger.debug(f"Error parsing certificate data: {e}")
+                                continue
 
-                        # Success! Reset connection failure counter
                         self.connection_failures = 0
-                        logger.info(f"βœ… Successfully processed {len(certificates)} certificates from crt.sh for {query}")
+                        logger.info(f"Successfully processed {len(certificates)} certificates from crt.sh for {query}")
                         return certificates
 
                     except json.JSONDecodeError as e:
-                        logger.warning(f"❌ Invalid JSON response from crt.sh for {query}: {e}")
+                        logger.warning(f"Invalid JSON response from crt.sh for {query}: {e}")
                         if attempt < max_retries - 1:
                             time.sleep(backoff_delays[attempt])
                             continue
                         return certificates
 
                 elif response.status_code == 404:
-                    # 404 is normal - no certificates found
-                    logger.debug(f"ℹ️ No certificates found for {query} (404)")
-                    self.connection_failures = 0  # Reset counter for successful connection
+                    logger.debug(f"No certificates found for {query} (404)")
+                    self.connection_failures = 0
                     return certificates
 
                 elif response.status_code == 429:
-                    logger.warning(f"⚠️ crt.sh rate limit exceeded for {query}")
+                    logger.warning(f"crt.sh rate limit exceeded for {query}")
                     if attempt < max_retries - 1:
-                        time.sleep(5)  # Wait longer for rate limits
+                        time.sleep(5)
                         continue
                     return certificates
 
                 else:
-                    logger.warning(f"⚠️ crt.sh HTTP error for {query}: {response.status_code}")
+                    logger.warning(f"crt.sh HTTP error for {query}: {response.status_code}")
                     if attempt < max_retries - 1:
                         time.sleep(backoff_delays[attempt])
                         continue
@@ -203,9 +213,8 @@ class CertificateChecker:
 
             except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                 error_type = "Connection Error" if isinstance(e, requests.exceptions.ConnectionError) else "Timeout"
-                logger.warning(f"🌐 crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}")
+                logger.warning(f"crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}")
 
-                # Track connection failures
                 if isinstance(e, requests.exceptions.ConnectionError):
                     self.connection_failures += 1
 
@@ -214,18 +223,17 @@
                     continue
 
             except requests.exceptions.RequestException as e:
-                logger.warning(f"🌐 crt.sh network error for {query} (attempt {attempt+1}/{max_retries}): {e}")
+                logger.warning(f"crt.sh network error for {query} (attempt {attempt+1}/{max_retries}): {e}")
                 if attempt < max_retries - 1:
                     time.sleep(backoff_delays[attempt])
                     continue
 
             except Exception as e:
-                logger.error(f"❌ Unexpected error querying crt.sh for {query}: {e}")
+                logger.error(f"Unexpected error querying crt.sh for {query}: {e}")
                 if attempt < max_retries - 1:
                     time.sleep(backoff_delays[attempt])
                     continue
 
-        # If we get here, all retries failed
-        logger.warning(f"❌ All {max_retries} attempts failed for crt.sh query: {query}")
+        logger.warning(f"All {max_retries} attempts failed for crt.sh query: {query}")
         return certificates
 
     def _parse_date(self, date_str: str) -> Optional[datetime]:
@@ -233,13 +241,12 @@
         if not date_str:
             return None
 
-        # Common date formats from crt.sh
         date_formats = [
-            '%Y-%m-%dT%H:%M:%S',      # ISO format without timezone
-            '%Y-%m-%dT%H:%M:%SZ',     # ISO format with Z
-            '%Y-%m-%d %H:%M:%S',      # Space separated
-            '%Y-%m-%dT%H:%M:%S.%f',   # With microseconds
-            '%Y-%m-%dT%H:%M:%S.%fZ',  # With microseconds and Z
+            '%Y-%m-%dT%H:%M:%S',
+            '%Y-%m-%dT%H:%M:%SZ',
+            '%Y-%m-%d %H:%M:%S',
+            '%Y-%m-%dT%H:%M:%S.%f',
+            '%Y-%m-%dT%H:%M:%S.%fZ',
         ]
 
         for fmt in date_formats:
@@ -248,24 +255,22 @@ class CertificateChecker:
             except ValueError:
                 continue
 
-        # Try with timezone info
         try:
             return datetime.fromisoformat(date_str.replace('Z', '+00:00'))
         except ValueError:
             pass
 
-        logger.debug(f"⚠️ Could not parse date: {date_str}")
+        logger.debug(f"Could not parse date: {date_str}")
         return None
 
     def extract_subdomains_from_certificates(self, certificates: List[Certificate]) -> Set[str]:
-        """Extract subdomains from certificate subjects."""
+        """Extract subdomains from certificate subjects (behavior unchanged)."""
         subdomains = set()
 
-        logger.debug(f"🌿 Extracting subdomains from {len(certificates)} certificates")
+        logger.debug(f"Extracting subdomains from {len(certificates)} certificates")
 
         for cert in certificates:
             # Parse subject field for domain names
-            # Certificate subjects can be multi-line with multiple domains
             subject_lines = cert.subject.split('\n')
 
             for line in subject_lines:
@@ -273,39 +278,36 @@ class CertificateChecker:
                 # Skip wildcard domains for recursion (they don't resolve directly)
                 if line.startswith('*.'):
-                    logger.debug(f"🌿 Skipping wildcard domain: {line}")
+                    logger.debug(f"Skipping wildcard domain: {line}")
                     continue
 
                 if self._is_valid_domain(line):
                     subdomains.add(line.lower())
-                    logger.debug(f"🌿 Found subdomain from certificate: {line}")
+                    logger.debug(f"Found subdomain from certificate: {line}")
 
         if subdomains:
-            logger.info(f"🌿 Extracted {len(subdomains)} subdomains from certificates")
+            logger.info(f"Extracted {len(subdomains)} subdomains from certificates")
         else:
-            logger.debug("❌ No subdomains extracted from certificates")
+            logger.debug("No subdomains extracted from certificates")
 
         return subdomains
 
     def _is_valid_domain(self, domain: str) -> bool:
-        """Basic domain validation."""
+        """Basic domain validation (behavior unchanged)."""
         if not domain or '.' not in domain:
             return False
 
-        # Remove common prefixes
         domain = domain.lower().strip()
         if domain.startswith('www.'):
             domain = domain[4:]
 
-        # Basic validation
         if len(domain) < 3 or len(domain) > 255:
             return False
 
         # Must not be an IP address
         try:
-            import socket
             socket.inet_aton(domain)
-            return False  # It's an IPv4 address
+            return False
         except socket.error:
             pass
 
@@ -314,7 +316,6 @@ class CertificateChecker:
         if len(parts) < 2:
             return False
 
-        # Each part should be reasonable
         for part in parts:
             if len(part) < 1 or len(part) > 63:
                 return False
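Review note: the caching added to `_query_crt_sh` is a plain dict-backed memo in front of the network call. A minimal standalone sketch of that pattern, for reviewers who want to poke at it outside the tool (the `CrtShClient` class and its stub fetch below are illustrative stand-ins, not code from this patch):

```python
from typing import Dict, List


class CrtShClient:
    """Toy model of the cache-in-front-of-fetch pattern used in CertificateChecker."""

    def __init__(self) -> None:
        # Mirrors self._http_cache in the diff: query_string -> parsed results
        self._http_cache: Dict[str, List[dict]] = {}
        self.request_count = 0

    def query(self, query: str) -> List[dict]:
        # Serve repeated queries from memory; only cache misses hit the network
        if query in self._http_cache:
            return self._http_cache[query]
        results = self._make_http_request(query)
        self._http_cache[query] = results
        return results

    def _make_http_request(self, query: str) -> List[dict]:
        # Placeholder for the real requests.get(...) call with retries
        self.request_count += 1
        return [{"id": 1, "name_value": query}]


client = CrtShClient()
client.query("example.com")
client.query("example.com")  # second call is served from the cache
assert client.request_count == 1
```

One behavioral consequence worth flagging: `_query_crt_sh` caches whatever `_make_http_request` returns, including the empty list produced when all retries fail, so a transient outage pins an empty result for that query for the lifetime of the checker.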
diff --git a/src/main.py b/src/main.py
index 9965ce9..299a696 100644
--- a/src/main.py
+++ b/src/main.py
@@ -1,5 +1,5 @@
 # File: src/main.py
-"""Main CLI interface for the reconnaissance tool."""
+"""Main CLI interface for the reconnaissance tool with two-mode operation."""
 
 import click
 import json
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
 @click.option('--web', is_flag=True, help='Start web interface instead of CLI')
 @click.option('--shodan-key', help='Shodan API key')
 @click.option('--virustotal-key', help='VirusTotal API key')
-@click.option('--max-depth', default=2, help='Maximum recursion depth (default: 2)')
+@click.option('--max-depth', default=2, help='Maximum recursion depth for full domain mode (default: 2)')
 @click.option('--output', '-o', help='Output file prefix (will create .json and .txt files)')
 @click.option('--json-only', is_flag=True, help='Only output JSON')
 @click.option('--text-only', is_flag=True, help='Only output text report')
@@ -27,13 +27,23 @@ logger = logging.getLogger(__name__)
 @click.option('--verbose', '-v', is_flag=True, help='Enable verbose logging (DEBUG level)')
 @click.option('--quiet', '-q', is_flag=True, help='Quiet mode (WARNING level only)')
 def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only, text_only, port, verbose, quiet):
-    """DNS Reconnaissance Tool
+    """DNS Reconnaissance Tool - Two-Mode Operation
+
+    MODE 1 - Hostname-only (e.g., 'cc24'):
+        Expands the hostname to all TLDs (cc24.com, cc24.net, etc.)
+        No recursive enumeration, to avoid third-party infrastructure noise
+        Useful for discovering domains that use a specific hostname
+
+    MODE 2 - Full domain (e.g., 'cc24.com'):
+        Full recursive reconnaissance with subdomain discovery
+        Maps the complete infrastructure of the specified domain
+        Uses max-depth for recursive enumeration
 
     Examples:
-        recon example.com                # Scan example.com
-        recon example                    # Try example.* for all TLDs
-        recon example.com --max-depth 3  # Deeper recursion
-        recon example.com -v             # Verbose logging
+        recon cc24                       # Mode 1: Find all cc24.* domains (no recursion)
+        recon cc24.com                   # Mode 2: Map cc24.com infrastructure (with recursion)
+        recon cc24.com --max-depth 3     # Mode 2: Deeper recursive enumeration
+        recon cc24 -v                    # Mode 1: Verbose TLD expansion
         recon --web                      # Start web interface
     """
@@ -51,18 +61,18 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
 
     if web:
         # Start web interface
-        logger.info("🌐 Starting web interface...")
+        logger.info("Starting web interface...")
         app = create_app(config)
-        logger.info(f"πŸš€ Web interface starting on http://0.0.0.0:{port}")
+        logger.info(f"Web interface starting on http://0.0.0.0:{port}")
         app.run(host='0.0.0.0', port=port, debug=False)  # Changed debug to False to reduce noise
         return
 
     if not target:
-        click.echo("❌ Error: TARGET is required for CLI mode. Use --web for web interface.")
+        click.echo("Error: TARGET is required for CLI mode. Use --web for web interface.")
         sys.exit(1)
 
     # Initialize reconnaissance engine
-    logger.info("πŸ”§ Initializing reconnaissance engine...")
+    logger.info("Initializing reconnaissance engine...")
     engine = ReconnaissanceEngine(config)
 
     # Set up progress callback for CLI
@@ -76,54 +86,62 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
 
     # Display startup information
     click.echo("=" * 60)
-    click.echo("πŸ” DNS RECONNAISSANCE TOOL")
+    click.echo("DNS RECONNAISSANCE TOOL")
     click.echo("=" * 60)
-    click.echo(f"🎯 Target: {target}")
-    click.echo(f"πŸ“Š Max recursion depth: {max_depth}")
-    click.echo(f"🌐 DNS servers: {', '.join(config.DNS_SERVERS[:3])}{'...' if len(config.DNS_SERVERS) > 3 else ''}")
-    click.echo(f"⚑ DNS rate limit: {config.DNS_RATE_LIMIT}/s")
+    click.echo(f"Target: {target}")
+
+    # Show operation mode
+    if '.' in target:
+        click.echo(f"Mode: Full domain reconnaissance (recursive depth: {max_depth})")
+        click.echo("  β†’ Will map complete infrastructure of the specified domain")
+    else:
+        click.echo("Mode: Hostname-only reconnaissance (TLD expansion)")
+        click.echo("  β†’ Will find all domains using this hostname (no recursion)")
+
+    click.echo(f"DNS servers: {', '.join(config.DNS_SERVERS[:3])}{'...' if len(config.DNS_SERVERS) > 3 else ''}")
+    click.echo(f"DNS rate limit: {config.DNS_RATE_LIMIT}/s")
 
     if shodan_key:
-        click.echo("βœ… Shodan integration enabled")
-        logger.info(f"πŸ•΅οΈ Shodan API key provided (ends with: ...{shodan_key[-4:] if len(shodan_key) > 4 else shodan_key})")
+        click.echo("Shodan integration enabled")
+        logger.info(f"Shodan API key provided (ends with: ...{shodan_key[-4:] if len(shodan_key) > 4 else shodan_key})")
     else:
-        click.echo("⚠️ Shodan integration disabled (no API key)")
+        click.echo("Shodan integration disabled (no API key)")
 
     if virustotal_key:
-        click.echo("βœ… VirusTotal integration enabled")
-        logger.info(f"πŸ›‘οΈ VirusTotal API key provided (ends with: ...{virustotal_key[-4:] if len(virustotal_key) > 4 else virustotal_key})")
+        click.echo("VirusTotal integration enabled")
+        logger.info(f"VirusTotal API key provided (ends with: ...{virustotal_key[-4:] if len(virustotal_key) > 4 else virustotal_key})")
     else:
-        click.echo("⚠️ VirusTotal integration disabled (no API key)")
+        click.echo("VirusTotal integration disabled (no API key)")
 
     click.echo("")
 
     # Run reconnaissance
     try:
-        logger.info(f"πŸš€ Starting reconnaissance for target: {target}")
+        logger.info(f"Starting reconnaissance for target: {target}")
         data = engine.run_reconnaissance(target)
 
         # Display final statistics
         stats = data.get_stats()
         click.echo("")
         click.echo("=" * 60)
-        click.echo("πŸ“Š RECONNAISSANCE COMPLETE")
+        click.echo("RECONNAISSANCE COMPLETE")
         click.echo("=" * 60)
-        click.echo(f"🏠 Hostnames discovered: {stats['hostnames']}")
-        click.echo(f"🌐 IP addresses found: {stats['ip_addresses']}")
-        click.echo(f"πŸ“‹ DNS records collected: {stats['dns_records']}")
-        click.echo(f"πŸ“œ Certificates found: {stats['certificates']}")
-        click.echo(f"πŸ•΅οΈ Shodan results: {stats['shodan_results']}")
-        click.echo(f"πŸ›‘οΈ VirusTotal results: {stats['virustotal_results']}")
+        click.echo(f"Hostnames discovered: {stats['hostnames']}")
+        click.echo(f"IP addresses found: {stats['ip_addresses']}")
+        click.echo(f"DNS records collected: {stats['dns_records']}")
+        click.echo(f"Certificates found: {stats['certificates']}")
+        click.echo(f"Shodan results: {stats['shodan_results']}")
+        click.echo(f"VirusTotal results: {stats['virustotal_results']}")
 
         # Calculate and display timing
         if data.end_time and data.start_time:
             duration = data.end_time - data.start_time
-            click.echo(f"⏱️ Total time: {duration}")
+            click.echo(f"Total time: {duration}")
 
         click.echo("")
 
         # Generate reports
-        logger.info("πŸ“„ Generating reports...")
+        logger.info("Generating reports...")
         report_gen = ReportGenerator(data)
 
         if output:
@@ -137,9 +155,9 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
                 with open(json_file, 'w', encoding='utf-8') as f:
                     f.write(json_content)
                 saved_files.append(json_file)
-                logger.info(f"πŸ’Ύ JSON report saved: {json_file}")
+                logger.info(f"JSON report saved: {json_file}")
             except Exception as e:
-                logger.error(f"❌ Failed to save JSON report: {e}")
+                logger.error(f"Failed to save JSON report: {e}")
 
             if not json_only:
                 text_file = f"{output}.txt"
@@ -147,14 +165,14 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
                 try:
                     with open(text_file, 'w', encoding='utf-8') as f:
                         f.write(report_gen.generate_text_report())
                     saved_files.append(text_file)
-                    logger.info(f"πŸ’Ύ Text report saved: {text_file}")
+                    logger.info(f"Text report saved: {text_file}")
                 except Exception as e:
-                    logger.error(f"❌ Failed to save text report: {e}")
+                    logger.error(f"Failed to save text report: {e}")
 
             if saved_files:
-                click.echo(f"πŸ’Ύ Reports saved:")
+                click.echo("Reports saved:")
                 for file in saved_files:
-                    click.echo(f"   πŸ“„ {file}")
+                    click.echo(f"   {file}")
 
         else:
             # Output to stdout
@@ -162,31 +180,31 @@ def main(target, web, shodan_key, virustotal_key, max_depth, output, json_only,
             if json_only:
                 try:
                     click.echo(data.to_json())
                 except Exception as e:
-                    logger.error(f"❌ Failed to generate JSON output: {e}")
+                    logger.error(f"Failed to generate JSON output: {e}")
                     click.echo(f"Error generating JSON: {e}")
             elif text_only:
                 try:
                     click.echo(report_gen.generate_text_report())
                 except Exception as e:
-                    logger.error(f"❌ Failed to generate text report: {e}")
+                    logger.error(f"Failed to generate text report: {e}")
                     click.echo(f"Error generating text report: {e}")
             else:
                 # Default: show text report
                 try:
                     click.echo(report_gen.generate_text_report())
-                    click.echo(f"\nπŸ’‘ To get JSON output, use: --json-only")
-                    click.echo(f"πŸ’‘ To save reports, use: --output filename")
+                    click.echo("\nTo get JSON output, use: --json-only")
+                    click.echo("To save reports, use: --output filename")
                 except Exception as e:
-                    logger.error(f"❌ Failed to generate report: {e}")
+                    logger.error(f"Failed to generate report: {e}")
                     click.echo(f"Error generating report: {e}")
 
     except KeyboardInterrupt:
-        logger.warning("⚠️ Reconnaissance interrupted by user")
-        click.echo("\n⚠️ Reconnaissance interrupted by user.")
+        logger.warning("Reconnaissance interrupted by user")
+        click.echo("\nReconnaissance interrupted by user.")
         sys.exit(1)
     except Exception as e:
-        logger.error(f"❌ Error during reconnaissance: {e}", exc_info=True)
-        click.echo(f"❌ Error during reconnaissance: {e}")
+        logger.error(f"Error during reconnaissance: {e}", exc_info=True)
+        click.echo(f"Error during reconnaissance: {e}")
         if verbose:
             raise  # Re-raise in verbose mode to show full traceback
         sys.exit(1)
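The dispatch rule that the CLI and the engine now share is simply whether the target contains a dot. A toy restatement of that contract (the `select_mode` helper is hypothetical, written only to make the rule testable at a glance, not code from this patch):

```python
def select_mode(target: str, max_depth: int) -> tuple:
    """Return (mode, effective_max_depth) for a given CLI target."""
    if '.' in target:
        # Full domain: keep the user-supplied recursion depth
        return ("full_domain", max_depth)
    # Bare hostname: expand across TLDs but force depth 0 so the run
    # does not recurse into third-party infrastructure
    return ("hostname_only", 0)


assert select_mode("cc24.com", 2) == ("full_domain", 2)
assert select_mode("cc24", 2) == ("hostname_only", 0)
```

The engine below implements this by temporarily writing `max_depth = 0` into the shared `Config` and restoring it in `finally`; if a single `Config` ever serves concurrent scans (e.g., through the web interface), returning the effective depth instead of mutating shared state would be the safer shape.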
diff --git a/src/reconnaissance.py b/src/reconnaissance.py
index 512b485..3ff2948 100644
--- a/src/reconnaissance.py
+++ b/src/reconnaissance.py
@@ -1,5 +1,5 @@
 # File: src/reconnaissance.py
-"""Main reconnaissance logic with enhanced TLD expansion."""
+"""Main reconnaissance logic with two-mode operation for hostname vs domain targets."""
 
 import threading
 import concurrent.futures
@@ -18,7 +18,7 @@ from .tld_fetcher import TLDFetcher
 logger = logging.getLogger(__name__)
 
 class ReconnaissanceEngine:
-    """Main reconnaissance engine with smart TLD expansion."""
+    """Main reconnaissance engine with two-mode operation: hostname-only vs full domain."""
 
     def __init__(self, config: Config):
         self.config = config
@@ -32,16 +32,16 @@ class ReconnaissanceEngine:
         self.shodan_client = None
         if config.shodan_key:
             self.shodan_client = ShodanClient(config.shodan_key, config)
-            logger.info("βœ… Shodan client initialized")
+            logger.info("Shodan client initialized")
         else:
-            logger.info("⚠️ Shodan API key not provided, skipping Shodan integration")
+            logger.info("Shodan API key not provided, skipping Shodan integration")
 
         self.virustotal_client = None
         if config.virustotal_key:
             self.virustotal_client = VirusTotalClient(config.virustotal_key, config)
-            logger.info("βœ… VirusTotal client initialized")
+            logger.info("VirusTotal client initialized")
         else:
-            logger.info("⚠️ VirusTotal API key not provided, skipping VirusTotal integration")
+            logger.info("VirusTotal API key not provided, skipping VirusTotal integration")
 
         # Progress tracking
         self.progress_callback = None
@@ -57,7 +57,7 @@ class ReconnaissanceEngine:
     def set_shared_data(self, shared_data: ReconData):
         """Set shared data object for live updates during web interface usage."""
         self.shared_data = shared_data
-        logger.info("πŸ“Š Using shared data object for live updates")
+        logger.info("Using shared data object for live updates")
 
     def _update_progress(self, message: str, percentage: int = None):
         """Update progress if callback is set."""
@@ -66,73 +66,92 @@ class ReconnaissanceEngine:
             self.progress_callback(message, percentage)
 
     def run_reconnaissance(self, target: str) -> ReconData:
-        """Run full reconnaissance on target."""
+        """Run reconnaissance on target using the appropriate mode for the input type."""
         # Use shared data object if available, otherwise create new one
         if self.shared_data is not None:
             self.data = self.shared_data
-            logger.info("πŸ“Š Using shared data object for reconnaissance")
+            logger.info("Using shared data object for reconnaissance")
         else:
             self.data = ReconData()
-            logger.info("πŸ“Š Created new data object for reconnaissance")
+            logger.info("Created new data object for reconnaissance")
 
         self.data.start_time = datetime.now()
 
-        logger.info(f"πŸš€ Starting reconnaissance for target: {target}")
-        logger.info(f"πŸ“Š Configuration: max_depth={self.config.max_depth}, "
+        logger.info(f"Starting reconnaissance for target: {target}")
+        logger.info(f"Configuration: max_depth={self.config.max_depth}, "
                     f"DNS_rate={self.config.DNS_RATE_LIMIT}/s")
 
+        # Store original max_depth for potential restoration
+        original_max_depth = self.config.max_depth
+        reconnaissance_mode = "full_domain" if '.' in target else "hostname_only"
+
         try:
-            # Determine if target is hostname.tld or just hostname
+            # Determine operation mode based on target format
             if '.' in target:
-                logger.info(f"🎯 Target '{target}' appears to be a full domain name")
+                logger.info(f"Target '{target}' appears to be a full domain name")
+                logger.info(f"Mode: Full domain reconnaissance with recursive enumeration (max_depth={self.config.max_depth})")
                 self._update_progress(f"Starting reconnaissance for {target}", 0)
                 self.data.add_hostname(target, 0)
                 initial_targets = {target}
             else:
-                logger.info(f"πŸ” Target '{target}' appears to be a hostname, expanding to all TLDs")
+                logger.info(f"Target '{target}' appears to be a hostname, expanding to all TLDs")
+                logger.info("Mode: Hostname-only reconnaissance - TLD expansion without recursion")
                 self._update_progress(f"Expanding {target} to all TLDs", 5)
                 initial_targets = self._expand_hostname_to_tlds_smart(target)
-                logger.info(f"πŸ“‹ Found {len(initial_targets)} valid domains after TLD expansion")
+
+                # Override max_depth for hostname-only queries to prevent infrastructure noise
+                self.config.max_depth = 0
+                logger.info(f"Found {len(initial_targets)} valid domains after TLD expansion")
+                logger.info("Set max_depth=0 for hostname-only reconnaissance (avoiding third-party infrastructure)")
 
             self._update_progress("Resolving initial targets", 10)
 
-            # Process all targets recursively
+            # Process all targets with the appropriate recursion depth
             self._process_targets_recursively(initial_targets)
 
             # Final external lookups
             self._update_progress("Performing external service lookups", 90)
             self._perform_external_lookups()
 
-            # Log final statistics
+            # Log final statistics with reconnaissance mode
             stats = self.data.get_stats()
-            logger.info(f"πŸ“ˆ Final statistics: {stats}")
+            logger.info(f"Final statistics ({reconnaissance_mode}): {stats}")
+
+            if reconnaissance_mode == "hostname_only":
+                logger.info(f"Hostname-only reconnaissance complete: discovered {stats['hostnames']} domains using '{target}' hostname")
+                logger.info(f"To perform recursive enumeration on specific domains, run with full domain names (e.g., '{target}.com')")
+            else:
+                logger.info(f"Full domain reconnaissance complete with recursive depth {original_max_depth}")
 
             self._update_progress("Reconnaissance complete", 100)
 
         except Exception as e:
-            logger.error(f"❌ Error during reconnaissance: {e}", exc_info=True)
+            logger.error(f"Error during reconnaissance: {e}", exc_info=True)
             raise
         finally:
+            # Restore the original max_depth (though this engine instance is typically discarded)
+            self.config.max_depth = original_max_depth
+
             self.data.end_time = datetime.now()
             duration = self.data.end_time - self.data.start_time
-            logger.info(f"⏱️ Total reconnaissance time: {duration}")
+            logger.info(f"Total reconnaissance time: {duration}")
 
         return self.data
 
     def _expand_hostname_to_tlds_smart(self, hostname: str) -> Set[str]:
         """Smart TLD expansion with prioritization and parallel processing."""
-        logger.info(f"🌐 Starting smart TLD expansion for hostname: {hostname}")
+        logger.info(f"Starting smart TLD expansion for hostname: {hostname}")
 
         # Get prioritized TLD lists
         priority_tlds, normal_tlds, deprioritized_tlds = self.tld_fetcher.get_prioritized_tlds()
-        logger.info(f"πŸ“Š TLD categories: {len(priority_tlds)} priority, "
+        logger.info(f"TLD categories: {len(priority_tlds)} priority, "
                     f"{len(normal_tlds)} normal, {len(deprioritized_tlds)} deprioritized")
 
         valid_domains = set()
 
         # Phase 1: Check priority TLDs first (parallel processing)
-        logger.info("πŸš€ Phase 1: Checking priority TLDs...")
+        logger.info("Phase 1: Checking priority TLDs...")
         priority_results = self._check_tlds_parallel(hostname, priority_tlds, "priority")
"priority") valid_domains.update(priority_results) @@ -140,37 +159,36 @@ class ReconnaissanceEngine: # Phase 2: Check normal TLDs (if we found fewer than 5 results) if len(valid_domains) < 5: - logger.info("πŸ” Phase 2: Checking normal TLDs...") + logger.info("Phase 2: Checking normal TLDs...") normal_results = self._check_tlds_parallel(hostname, normal_tlds, "normal") valid_domains.update(normal_results) self._update_progress(f"Phase 2 complete: {len(normal_results)} normal TLD matches", 8) else: - logger.info(f"⏭️ Skipping normal TLDs (found {len(valid_domains)} matches in priority)") + logger.info(f"Skipping normal TLDs (found {len(valid_domains)} matches in priority)") # Phase 3: Check deprioritized TLDs only if we found very few results if len(valid_domains) < 2: - logger.info("πŸ” Phase 3: Checking deprioritized TLDs (limited results so far)...") + logger.info("Phase 3: Checking deprioritized TLDs (limited results so far)...") depri_results = self._check_tlds_parallel(hostname, deprioritized_tlds, "deprioritized") valid_domains.update(depri_results) self._update_progress(f"Phase 3 complete: {len(depri_results)} deprioritized TLD matches", 9) else: - logger.info(f"⏭️ Skipping deprioritized TLDs (found {len(valid_domains)} matches already)") + logger.info(f"Skipping deprioritized TLDs (found {len(valid_domains)} matches already)") - logger.info(f"🎯 Smart TLD expansion complete: found {len(valid_domains)} valid domains") + logger.info(f"Smart TLD expansion complete: found {len(valid_domains)} valid domains") return valid_domains def _check_tlds_parallel(self, hostname: str, tlds: List[str], phase_name: str) -> Set[str]: """Check TLDs in parallel with optimized settings.""" valid_domains = set() tested_count = 0 - wildcard_detected = set() # Use thread pool for parallel processing max_workers = min(20, len(tlds)) # Limit concurrent requests - logger.info(f"⚑ Starting parallel check of {len(tlds)} {phase_name} TLDs " + logger.info(f"Starting parallel check of {len(tlds)} {phase_name} TLDs " f"with {max_workers} workers") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: @@ -191,8 +209,7 @@ class ReconnaissanceEngine: if result: full_hostname, ips = result - - logger.info(f"βœ… Valid domain found: {full_hostname} -> {ips}") + logger.info(f"Valid domain found: {full_hostname} -> {ips}") self.data.add_hostname(full_hostname, 0) valid_domains.add(full_hostname) @@ -201,18 +218,17 @@ class ReconnaissanceEngine: # Progress update every 50 TLDs in this phase if tested_count % 50 == 0: - logger.info(f"πŸ“Š {phase_name.title()} phase progress: " + logger.info(f"{phase_name.title()} phase progress: " f"{tested_count}/{len(tlds)} tested, " f"{len(valid_domains)} found") except concurrent.futures.TimeoutError: - logger.debug(f"⏱️ Timeout checking {hostname}.{tld}") + logger.debug(f"Timeout checking {hostname}.{tld}") except Exception as e: - logger.debug(f"⚠️ Error checking {hostname}.{tld}: {e}") + logger.debug(f"Error checking {hostname}.{tld}: {e}") - logger.info(f"πŸ“Š {phase_name.title()} phase complete: " - f"tested {tested_count} TLDs, found {len(valid_domains)} valid domains, " - f"detected {len(wildcard_detected)} wildcards") + logger.info(f"{phase_name.title()} phase complete: " + f"tested {tested_count} TLDs, found {len(valid_domains)} valid domains") return valid_domains @@ -224,7 +240,7 @@ class ReconnaissanceEngine: ips = self.dns_resolver.resolve_hostname_fast(full_hostname) if ips: - logger.debug(f"βœ… {full_hostname} -> {ips}") + 
logger.debug(f"{full_hostname} -> {ips}") return (full_hostname, ips) return None @@ -234,13 +250,13 @@ class ReconnaissanceEngine: current_depth = 0 while current_depth <= self.config.max_depth and targets: - logger.info(f"πŸ”„ Processing depth {current_depth} with {len(targets)} targets") + logger.info(f"Processing depth {current_depth} with {len(targets)} targets") self._update_progress(f"Processing depth {current_depth} ({len(targets)} targets)", 15 + (current_depth * 25)) new_targets = set() for target in targets: - logger.debug(f"🎯 Processing target: {target}") + logger.debug(f"Processing target: {target}") # DNS resolution and record gathering self._process_single_target(target, current_depth) @@ -248,25 +264,25 @@ class ReconnaissanceEngine: # Extract new subdomains if current_depth < self.config.max_depth: new_subdomains = self._extract_new_subdomains(target) - logger.debug(f"🌿 Found {len(new_subdomains)} new subdomains from {target}") + logger.debug(f"Found {len(new_subdomains)} new subdomains from {target}") for subdomain in new_subdomains: self.data.add_hostname(subdomain, current_depth + 1) new_targets.add(subdomain) - logger.info(f"πŸ“Š Depth {current_depth} complete. Found {len(new_targets)} new targets for next depth") + logger.info(f"Depth {current_depth} complete. Found {len(new_targets)} new targets for next depth") targets = new_targets current_depth += 1 - logger.info(f"🏁 Recursive processing complete after {current_depth} levels") + logger.info(f"Recursive processing complete after {current_depth} levels") def _process_single_target(self, hostname: str, depth: int): """Process a single target hostname.""" - logger.debug(f"🎯 Processing single target: {hostname} at depth {depth}") + logger.debug(f"Processing single target: {hostname} at depth {depth}") # Get all DNS records dns_records = self.dns_resolver.get_all_dns_records(hostname) - logger.debug(f"πŸ“‹ Found {len(dns_records)} DNS records for {hostname}") + logger.debug(f"Found {len(dns_records)} DNS records for {hostname}") for record in dns_records: self.data.add_dns_record(hostname, record) @@ -276,13 +292,13 @@ class ReconnaissanceEngine: self.data.add_ip_address(record.value) # Get certificates - logger.debug(f"πŸ” Checking certificates for {hostname}") + logger.debug(f"Checking certificates for {hostname}") certificates = self.cert_checker.get_certificates(hostname) if certificates: self.data.certificates[hostname] = certificates - logger.info(f"πŸ“œ Found {len(certificates)} certificates for {hostname}") + logger.info(f"Found {len(certificates)} certificates for {hostname}") else: - logger.debug(f"❌ No certificates found for {hostname}") + logger.debug(f"No certificates found for {hostname}") def _extract_new_subdomains(self, hostname: str) -> Set[str]: """Extract new subdomains from DNS records and certificates.""" @@ -294,7 +310,7 @@ class ReconnaissanceEngine: self.data.dns_records[hostname] ) new_subdomains.update(dns_subdomains) - logger.debug(f"🌐 Extracted {len(dns_subdomains)} subdomains from DNS records of {hostname}") + logger.debug(f"Extracted {len(dns_subdomains)} subdomains from DNS records of {hostname}") # From certificates if hostname in self.data.certificates: @@ -302,89 +318,89 @@ class ReconnaissanceEngine: self.data.certificates[hostname] ) new_subdomains.update(cert_subdomains) - logger.debug(f"πŸ” Extracted {len(cert_subdomains)} subdomains from certificates of {hostname}") + logger.debug(f"Extracted {len(cert_subdomains)} subdomains from certificates of {hostname}") # Filter out 
         filtered_subdomains = new_subdomains - self.data.hostnames
-        logger.debug(f"πŸ†• {len(filtered_subdomains)} new subdomains after filtering")
+        logger.debug(f"{len(filtered_subdomains)} new subdomains after filtering")
 
         return filtered_subdomains
 
     def _perform_external_lookups(self):
         """Perform Shodan and VirusTotal lookups."""
-        logger.info(f"πŸ” Starting external lookups for {len(self.data.ip_addresses)} IPs and {len(self.data.hostnames)} hostnames")
+        logger.info(f"Starting external lookups for {len(self.data.ip_addresses)} IPs and {len(self.data.hostnames)} hostnames")
 
         # Reverse DNS for all IPs
-        logger.info("πŸ”„ Performing reverse DNS lookups")
+        logger.info("Performing reverse DNS lookups")
         reverse_dns_count = 0
         for ip in self.data.ip_addresses:
             reverse = self.dns_resolver.reverse_dns_lookup(ip)
             if reverse:
                 self.data.reverse_dns[ip] = reverse
                 reverse_dns_count += 1
-                logger.debug(f"πŸ”™ Reverse DNS for {ip}: {reverse}")
+                logger.debug(f"Reverse DNS for {ip}: {reverse}")
 
-        logger.info(f"βœ… Completed reverse DNS: {reverse_dns_count}/{len(self.data.ip_addresses)} successful")
+        logger.info(f"Completed reverse DNS: {reverse_dns_count}/{len(self.data.ip_addresses)} successful")
 
         # Shodan lookups
         if self.shodan_client:
-            logger.info(f"πŸ•΅οΈ Starting Shodan lookups for {len(self.data.ip_addresses)} IPs")
+            logger.info(f"Starting Shodan lookups for {len(self.data.ip_addresses)} IPs")
             shodan_success_count = 0
             for ip in self.data.ip_addresses:
                 try:
-                    logger.debug(f"πŸ” Querying Shodan for IP: {ip}")
+                    logger.debug(f"Querying Shodan for IP: {ip}")
                     result = self.shodan_client.lookup_ip(ip)
                     if result:
                         self.data.add_shodan_result(ip, result)
                         shodan_success_count += 1
-                        logger.info(f"βœ… Shodan result for {ip}: {len(result.ports)} ports")
+                        logger.info(f"Shodan result for {ip}: {len(result.ports)} ports")
                     else:
-                        logger.debug(f"❌ No Shodan data for {ip}")
+                        logger.debug(f"No Shodan data for {ip}")
                 except Exception as e:
-                    logger.warning(f"⚠️ Error querying Shodan for {ip}: {e}")
+                    logger.warning(f"Error querying Shodan for {ip}: {e}")
 
-            logger.info(f"βœ… Shodan lookups complete: {shodan_success_count}/{len(self.data.ip_addresses)} successful")
+            logger.info(f"Shodan lookups complete: {shodan_success_count}/{len(self.data.ip_addresses)} successful")
         else:
-            logger.info("⚠️ Skipping Shodan lookups (no API key)")
+            logger.info("Skipping Shodan lookups (no API key)")
 
         # VirusTotal lookups
         if self.virustotal_client:
             total_resources = len(self.data.ip_addresses) + len(self.data.hostnames)
-            logger.info(f"πŸ›‘οΈ Starting VirusTotal lookups for {total_resources} resources")
+            logger.info(f"Starting VirusTotal lookups for {total_resources} resources")
             vt_success_count = 0
 
             # Check IPs
             for ip in self.data.ip_addresses:
                 try:
-                    logger.debug(f"πŸ” Querying VirusTotal for IP: {ip}")
+                    logger.debug(f"Querying VirusTotal for IP: {ip}")
                     result = self.virustotal_client.lookup_ip(ip)
                     if result:
                         self.data.add_virustotal_result(ip, result)
                         vt_success_count += 1
-                        logger.info(f"πŸ›‘οΈ VirusTotal result for {ip}: {result.positives}/{result.total} detections")
+                        logger.info(f"VirusTotal result for {ip}: {result.positives}/{result.total} detections")
                     else:
-                        logger.debug(f"❌ No VirusTotal data for {ip}")
+                        logger.debug(f"No VirusTotal data for {ip}")
                 except Exception as e:
-                    logger.warning(f"⚠️ Error querying VirusTotal for IP {ip}: {e}")
+                    logger.warning(f"Error querying VirusTotal for IP {ip}: {e}")
 
             # Check domains
             for hostname in self.data.hostnames:
                 try:
-                    logger.debug(f"πŸ” Querying VirusTotal for domain: {hostname}")
+                    logger.debug(f"Querying VirusTotal for domain: {hostname}")
                     result = self.virustotal_client.lookup_domain(hostname)
                     if result:
                         self.data.add_virustotal_result(hostname, result)
                         vt_success_count += 1
-                        logger.info(f"πŸ›‘οΈ VirusTotal result for {hostname}: {result.positives}/{result.total} detections")
+                        logger.info(f"VirusTotal result for {hostname}: {result.positives}/{result.total} detections")
                     else:
-                        logger.debug(f"❌ No VirusTotal data for {hostname}")
+                        logger.debug(f"No VirusTotal data for {hostname}")
                 except Exception as e:
-                    logger.warning(f"⚠️ Error querying VirusTotal for domain {hostname}: {e}")
+                    logger.warning(f"Error querying VirusTotal for domain {hostname}: {e}")
 
-            logger.info(f"βœ… VirusTotal lookups complete: {vt_success_count}/{total_resources} successful")
+            logger.info(f"VirusTotal lookups complete: {vt_success_count}/{total_resources} successful")
         else:
-            logger.info("⚠️ Skipping VirusTotal lookups (no API key)")
+            logger.info("Skipping VirusTotal lookups (no API key)")
 
         # Final external lookup summary
         ext_stats = {
@@ -392,9 +408,4 @@ class ReconnaissanceEngine:
             'shodan_results': len(self.data.shodan_results),
             'virustotal_results': len(self.data.virustotal_results)
         }
-        logger.info(f"πŸ“Š External lookups summary: {ext_stats}")
-
-    # Keep the original method name for backward compatibility
-    def _expand_hostname_to_tlds(self, hostname: str) -> Set[str]:
-        """Legacy method - redirects to smart expansion."""
-        return self._expand_hostname_to_tlds_smart(hostname)
\ No newline at end of file
+        logger.info(f"External lookups summary: {ext_stats}")
\ No newline at end of file
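Taken together, Mode 1 rests on the phased, parallel TLD probe in `_expand_hostname_to_tlds_smart` / `_check_tlds_parallel`: probe a priority list in a bounded thread pool, then widen to the normal and deprioritized lists only while results stay sparse. A compressed, self-contained sketch of that flow (the `resolve` stub and the tiny TLD lists are placeholders for `DNSResolver.resolve_hostname_fast` and the real `TLDFetcher` data, not code from this patch):

```python
import concurrent.futures
from typing import Iterable, Optional, Set


def resolve(fqdn: str) -> Optional[list]:
    # Stub resolver: pretend only the .com and .net variants exist
    return ["192.0.2.1"] if fqdn.endswith((".com", ".net")) else None


def check_tlds_parallel(hostname: str, tlds: Iterable[str]) -> Set[str]:
    tlds = list(tlds)
    found: Set[str] = set()
    max_workers = min(20, len(tlds)) or 1  # same worker cap as the diff
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(resolve, f"{hostname}.{tld}"): tld for tld in tlds}
        for future in concurrent.futures.as_completed(futures):
            if future.result():
                found.add(f"{hostname}.{futures[future]}")
    return found


def expand_hostname(hostname: str) -> Set[str]:
    # Phase 1 always runs; phases 2 and 3 mirror the <5 and <2 thresholds
    valid = check_tlds_parallel(hostname, ["com", "net", "org", "de"])
    if len(valid) < 5:
        valid |= check_tlds_parallel(hostname, ["io", "dev", "app"])
    if len(valid) < 2:
        valid |= check_tlds_parallel(hostname, ["xyz", "top"])
    return valid


print(expand_hostname("example"))  # {'example.com', 'example.net'}
```

The sketch also shows why dropping the unused `wildcard_detected` set from `_check_tlds_parallel` was safe: nothing in the visible code ever added to it, so the phase-complete log line was always reporting zero wildcards.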