diff --git a/config.py b/config.py index 6122157..3f29c0a 100644 --- a/config.py +++ b/config.py @@ -25,6 +25,9 @@ class Config: self.max_retries_per_target = 3 self.cache_expiry_hours = 12 + # --- Provider Caching Settings --- + self.cache_timeout_hours = 6 # Provider-specific cache timeout + # --- Rate Limiting (requests per minute) --- self.rate_limits = { 'crtsh': 30, @@ -65,6 +68,7 @@ class Config: self.large_entity_threshold = int(os.getenv('LARGE_ENTITY_THRESHOLD', self.large_entity_threshold)) self.max_retries_per_target = int(os.getenv('MAX_RETRIES_PER_TARGET', self.max_retries_per_target)) self.cache_expiry_hours = int(os.getenv('CACHE_EXPIRY_HOURS', self.cache_expiry_hours)) + self.cache_timeout_hours = int(os.getenv('CACHE_TIMEOUT_HOURS', self.cache_timeout_hours)) # Override Flask and session settings self.flask_host = os.getenv('FLASK_HOST', self.flask_host) diff --git a/providers/base_provider.py b/providers/base_provider.py index 76ebd10..eebf631 100644 --- a/providers/base_provider.py +++ b/providers/base_provider.py @@ -3,8 +3,6 @@ import time import requests import threading -import os -import json from abc import ABC, abstractmethod from typing import List, Dict, Any, Optional, Tuple @@ -80,12 +78,6 @@ class BaseProvider(ABC): self.logger = get_forensic_logger() self._stop_event = None - # Caching configuration (per session) - self.cache_dir = f'.cache/{id(self.config)}' # Unique cache per session config - self.cache_expiry = self.config.cache_expiry_hours * 3600 - if not os.path.exists(self.cache_dir): - os.makedirs(self.cache_dir) - # Statistics (per provider instance) self.total_requests = 0 self.successful_requests = 0 @@ -180,21 +172,6 @@ class BaseProvider(ABC): print(f"Request cancelled before start: {url}") return None - cache_key = f"{self.name}_{hash(f'{method}:{url}:{json.dumps(params, sort_keys=True)}')}.json" - cache_path = os.path.join(self.cache_dir, cache_key) - - if os.path.exists(cache_path): - cache_age = time.time() - 
os.path.getmtime(cache_path) - if cache_age < self.cache_expiry: - print(f"Returning cached response for: {url}") - with open(cache_path, 'r') as f: - cached_data = json.load(f) - response = requests.Response() - response.status_code = cached_data['status_code'] - response._content = cached_data['content'].encode('utf-8') - response.headers = cached_data['headers'] - return response - self.rate_limiter.wait_if_needed() start_time = time.time() @@ -242,12 +219,7 @@ class BaseProvider(ABC): error=None, target_indicator=target_indicator ) - with open(cache_path, 'w') as f: - json.dump({ - 'status_code': response.status_code, - 'content': response.text, - 'headers': dict(response.headers) - }, f) + return response except requests.exceptions.RequestException as e: diff --git a/providers/crtsh_provider.py b/providers/crtsh_provider.py index c2aea01..c7b47ee 100644 --- a/providers/crtsh_provider.py +++ b/providers/crtsh_provider.py @@ -2,6 +2,8 @@ import json import re +import os +from pathlib import Path from typing import List, Dict, Any, Tuple, Set from urllib.parse import quote from datetime import datetime, timezone @@ -14,7 +16,7 @@ from utils.helpers import _is_valid_domain class CrtShProvider(BaseProvider): """ Provider for querying crt.sh certificate transparency database. - Now uses session-specific configuration and caching. + Now uses session-specific configuration and caching with accumulative behavior. 
""" def __init__(self, name=None, session_config=None): @@ -27,6 +29,10 @@ class CrtShProvider(BaseProvider): ) self.base_url = "https://crt.sh/" self._stop_event = None + + # Initialize cache directory + self.cache_dir = Path('cache') / 'crtsh' + self.cache_dir.mkdir(parents=True, exist_ok=True) def get_name(self) -> str: """Return the provider name.""" @@ -52,6 +58,125 @@ class CrtShProvider(BaseProvider): """ return True + def _get_cache_file_path(self, domain: str) -> Path: + """Generate cache file path for a domain.""" + # Sanitize domain for filename safety + safe_domain = domain.replace('.', '_').replace('/', '_').replace('\\', '_') + return self.cache_dir / f"{safe_domain}.json" + + def _get_cache_status(self, cache_file_path: Path) -> str: + """ + Check cache status for a domain. + Returns: 'not_found', 'fresh', or 'stale' + """ + if not cache_file_path.exists(): + return "not_found" + + try: + with open(cache_file_path, 'r') as f: + cache_data = json.load(f) + + last_query_str = cache_data.get("last_upstream_query") + if not last_query_str: + return "stale" # Invalid cache format + + last_query = datetime.fromisoformat(last_query_str.replace('Z', '+00:00')) + hours_since_query = (datetime.now(timezone.utc) - last_query).total_seconds() / 3600 + + cache_timeout = self.config.cache_timeout_hours + if hours_since_query < cache_timeout: + return "fresh" + else: + return "stale" + + except (json.JSONDecodeError, ValueError, KeyError) as e: + self.logger.logger.warning(f"Invalid cache file format for {cache_file_path}: {e}") + return "stale" + + def _load_cached_certificates(self, cache_file_path: Path) -> List[Dict[str, Any]]: + """Load certificates from cache file.""" + try: + with open(cache_file_path, 'r') as f: + cache_data = json.load(f) + return cache_data.get('certificates', []) + except (json.JSONDecodeError, FileNotFoundError, KeyError) as e: + self.logger.logger.error(f"Failed to load cached certificates from {cache_file_path}: {e}") + return [] + + 
def _query_crtsh_api(self, domain: str) -> List[Dict[str, Any]]: + """ + Query crt.sh API for raw certificate data. + Raises exceptions for network errors to allow core logic to retry. + """ + url = f"{self.base_url}?q={quote(domain)}&output=json" + response = self.make_request(url, target_indicator=domain) + + if not response or response.status_code != 200: + # This could be a temporary error - raise exception so core can retry + raise requests.exceptions.RequestException(f"crt.sh API returned status {response.status_code if response else 'None'}") + + certificates = response.json() + if not certificates: + return [] + + return certificates + + def _create_cache_file(self, cache_file_path: Path, domain: str, certificates: List[Dict[str, Any]]) -> None: + """Create new cache file with certificates.""" + try: + cache_data = { + "domain": domain, + "first_cached": datetime.now(timezone.utc).isoformat(), + "last_upstream_query": datetime.now(timezone.utc).isoformat(), + "upstream_query_count": 1, + "certificates": certificates + } + + cache_file_path.parent.mkdir(parents=True, exist_ok=True) + with open(cache_file_path, 'w') as f: + json.dump(cache_data, f, separators=(',', ':')) + + self.logger.logger.info(f"Created cache file for {domain} with {len(certificates)} certificates") + + except Exception as e: + self.logger.logger.warning(f"Failed to create cache file for {domain}: {e}") + + def _append_to_cache(self, cache_file_path: Path, new_certificates: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Append new certificates to existing cache and return all certificates.""" + try: + # Load existing cache + with open(cache_file_path, 'r') as f: + cache_data = json.load(f) + + # Track existing certificate IDs to avoid duplicates + existing_ids = {cert.get('id') for cert in cache_data.get('certificates', [])} + + # Add only new certificates + added_count = 0 + for cert in new_certificates: + cert_id = cert.get('id') + if cert_id and cert_id not in existing_ids: + 
cache_data['certificates'].append(cert) + existing_ids.add(cert_id) + added_count += 1 + + # Update metadata + cache_data['last_upstream_query'] = datetime.now(timezone.utc).isoformat() + cache_data['upstream_query_count'] = cache_data.get('upstream_query_count', 0) + 1 + + # Write updated cache + with open(cache_file_path, 'w') as f: + json.dump(cache_data, f, separators=(',', ':')) + + total_certs = len(cache_data['certificates']) + self.logger.logger.info(f"Appended {added_count} new certificates to cache. Total: {total_certs}") + + return cache_data['certificates'] + + except Exception as e: + self.logger.logger.warning(f"Failed to append to cache: {e}") + return new_certificates # Fallback to new certificates only + def _parse_issuer_organization(self, issuer_dn: str) -> str: """ Parse the issuer Distinguished Name to extract just the organization name. @@ -201,7 +326,8 @@ class CrtShProvider(BaseProvider): def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]: """ - Query crt.sh for certificates containing the domain. + Query crt.sh for certificates containing the domain with caching support. + Properly raises exceptions for network errors to allow core logic retries. 
""" if not _is_valid_domain(domain): return [] @@ -211,122 +337,167 @@ class CrtShProvider(BaseProvider): print(f"CrtSh query cancelled before start for domain: {domain}") return [] - relationships = [] + # === CACHING LOGIC === + cache_file = self._get_cache_file_path(domain) + cache_status = self._get_cache_status(cache_file) + + certificates = [] try: - # Query crt.sh for certificates - url = f"{self.base_url}?q={quote(domain)}&output=json" - response = self.make_request(url, target_indicator=domain) - - if not response or response.status_code != 200: - return [] - - # Check for cancellation after request - if self._stop_event and self._stop_event.is_set(): - print(f"CrtSh query cancelled after request for domain: {domain}") - return [] - - certificates = response.json() - - if not certificates: - return [] - - # Check for cancellation before processing - if self._stop_event and self._stop_event.is_set(): - print(f"CrtSh query cancelled before processing for domain: {domain}") - return [] - - # Aggregate certificate data by domain - domain_certificates = {} - all_discovered_domains = set() - - # Process certificates with cancellation checking - for i, cert_data in enumerate(certificates): - # Check for cancellation every 5 certificates instead of 10 for faster response - if i % 5 == 0 and self._stop_event and self._stop_event.is_set(): - print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}") - break - - cert_metadata = self._extract_certificate_metadata(cert_data) - cert_domains = self._extract_domains_from_certificate(cert_data) + if cache_status == "fresh": + # Use cached data + certificates = self._load_cached_certificates(cache_file) + self.logger.logger.info(f"Using cached data for {domain} ({len(certificates)} certificates)") - # Add all domains from this certificate to our tracking - all_discovered_domains.update(cert_domains) - for cert_domain in cert_domains: - if not _is_valid_domain(cert_domain): - continue + elif cache_status == 
"not_found": + # Fresh query, create new cache + certificates = self._query_crtsh_api(domain) + if certificates: # Only cache if we got results + self._create_cache_file(cache_file, domain, certificates) + self.logger.logger.info(f"Cached fresh data for {domain} ({len(certificates)} certificates)") + else: + self.logger.logger.info(f"No certificates found for {domain}, not caching") - # Initialize domain certificate list if needed - if cert_domain not in domain_certificates: - domain_certificates[cert_domain] = [] - - # Add this certificate to the domain's certificate list - domain_certificates[cert_domain].append(cert_metadata) + elif cache_status == "stale": + # Append query, update existing cache + try: + new_certificates = self._query_crtsh_api(domain) + if new_certificates: + certificates = self._append_to_cache(cache_file, new_certificates) + self.logger.logger.info(f"Refreshed and appended cache for {domain}") + else: + # Use existing cache if API returns no results + certificates = self._load_cached_certificates(cache_file) + self.logger.logger.info(f"API returned no new results, using existing cache for {domain}") + except requests.exceptions.RequestException: + # If API call fails for stale cache, use cached data and re-raise for retry logic + certificates = self._load_cached_certificates(cache_file) + if certificates: + self.logger.logger.warning(f"API call failed for {domain}, using stale cache data ({len(certificates)} certificates)") + # Don't re-raise here, just use cached data + else: + # No cached data and API failed - re-raise for retry + raise - # Final cancellation check before creating relationships - if self._stop_event and self._stop_event.is_set(): - print(f"CrtSh query cancelled before relationship creation for domain: {domain}") - return [] + except requests.exceptions.RequestException as e: + # Network/API errors should be re-raised so core logic can retry + self.logger.logger.error(f"API query failed for {domain}: {e}") + raise e + 
except json.JSONDecodeError as e: + # JSON parsing errors should also be raised for retry + self.logger.logger.error(f"Failed to parse JSON response from crt.sh for {domain}: {e}") + raise e + + # Check for cancellation after cache operations + if self._stop_event and self._stop_event.is_set(): + print(f"CrtSh query cancelled after cache operations for domain: {domain}") + return [] + + if not certificates: + return [] + + return self._process_certificates_to_relationships(domain, certificates) + + def _process_certificates_to_relationships(self, domain: str, certificates: List[Dict[str, Any]]) -> List[Tuple[str, str, str, float, Dict[str, Any]]]: + """ + Process certificates to relationships using existing logic. + This method contains the original processing logic from query_domain. + """ + relationships = [] + + # Check for cancellation before processing + if self._stop_event and self._stop_event.is_set(): + print(f"CrtSh processing cancelled before processing for domain: {domain}") + return [] - # Create relationships from query domain to ALL discovered domains with stop checking - for i, discovered_domain in enumerate(all_discovered_domains): - if discovered_domain == domain: - continue # Skip self-relationships + # Aggregate certificate data by domain + domain_certificates = {} + all_discovered_domains = set() + + # Process certificates with cancellation checking + for i, cert_data in enumerate(certificates): + # Check for cancellation every 5 certificates for faster response + if i % 5 == 0 and self._stop_event and self._stop_event.is_set(): + print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}") + break - # Check for cancellation every 10 relationships - if i % 10 == 0 and self._stop_event and self._stop_event.is_set(): - print(f"CrtSh relationship creation cancelled for domain: {domain}") - break - - if not _is_valid_domain(discovered_domain): + cert_metadata = self._extract_certificate_metadata(cert_data) + cert_domains = 
    def _process_certificates_to_relationships(self, domain: str, certificates: List[Dict[str, Any]]) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Convert raw crt.sh certificate records into (source, target, type,
        confidence, raw_data) relationship tuples for *domain*.

        This is the original processing logic extracted from query_domain:
        aggregate per-domain certificate metadata, then emit one
        'san_certificate' relationship from *domain* to every other valid
        domain discovered across the certificates. Cancellation via
        self._stop_event is checked at several points and short-circuits
        with whatever has been built so far (or []).
        """
        relationships = []

        # Check for cancellation before processing
        if self._stop_event and self._stop_event.is_set():
            print(f"CrtSh processing cancelled before processing for domain: {domain}")
            return []

        # Aggregate certificate data by domain
        domain_certificates = {}
        all_discovered_domains = set()

        # Process certificates with cancellation checking
        for i, cert_data in enumerate(certificates):
            # Check for cancellation every 5 certificates for faster response
            if i % 5 == 0 and self._stop_event and self._stop_event.is_set():
                print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}")
                break

            cert_metadata = self._extract_certificate_metadata(cert_data)
            cert_domains = self._extract_domains_from_certificate(cert_data)

            # Track every domain seen on any certificate, even ones that fail
            # validation below (they still count for discovery statistics)
            all_discovered_domains.update(cert_domains)

            for cert_domain in cert_domains:
                if not _is_valid_domain(cert_domain):
                    continue

                # Initialize domain certificate list if needed
                if cert_domain not in domain_certificates:
                    domain_certificates[cert_domain] = []

                # Add this certificate to the domain's certificate list
                domain_certificates[cert_domain].append(cert_metadata)

        # Final cancellation check before creating relationships
        if self._stop_event and self._stop_event.is_set():
            print(f"CrtSh query cancelled before relationship creation for domain: {domain}")
            return []

        # Create relationships from query domain to ALL discovered domains with stop checking
        for i, discovered_domain in enumerate(all_discovered_domains):
            if discovered_domain == domain:
                continue  # Skip self-relationships

            # Check for cancellation every 10 relationships
            if i % 10 == 0 and self._stop_event and self._stop_event.is_set():
                print(f"CrtSh relationship creation cancelled for domain: {domain}")
                break

            if not _is_valid_domain(discovered_domain):
                continue

            # Get certificates for both domains
            query_domain_certs = domain_certificates.get(domain, [])
            discovered_domain_certs = domain_certificates.get(discovered_domain, [])

            # Find shared certificates (for metadata purposes)
            shared_certificates = self._find_shared_certificates(query_domain_certs, discovered_domain_certs)

            # Calculate confidence based on relationship type and shared certificates
            confidence = self._calculate_domain_relationship_confidence(
                domain, discovered_domain, shared_certificates, all_discovered_domains
            )

            # Create comprehensive raw data for the relationship
            relationship_raw_data = {
                'relationship_type': 'certificate_discovery',
                'shared_certificates': shared_certificates,
                'total_shared_certs': len(shared_certificates),
                'discovery_context': self._determine_relationship_context(discovered_domain, domain),
                'domain_certificates': {
                    domain: self._summarize_certificates(query_domain_certs),
                    discovered_domain: self._summarize_certificates(discovered_domain_certs)
                }
            }

            # Create domain -> domain relationship
            relationships.append((
                domain,
                discovered_domain,
                'san_certificate',
                confidence,
                relationship_raw_data
            ))

            # Log the relationship discovery for the forensic audit trail
            self.log_relationship_discovery(
                source_node=domain,
                target_node=discovered_domain,
                relationship_type='san_certificate',
                confidence_score=confidence,
                raw_data=relationship_raw_data,
                discovery_method="certificate_transparency_analysis"
            )

        return relationships