This commit is contained in:
overcuriousity 2025-09-14 22:37:23 +02:00
parent ae07635ab6
commit eb9eea127b
3 changed files with 283 additions and 136 deletions

View File

@ -25,6 +25,9 @@ class Config:
self.max_retries_per_target = 3 self.max_retries_per_target = 3
self.cache_expiry_hours = 12 self.cache_expiry_hours = 12
# --- Provider Caching Settings ---
self.cache_timeout_hours = 6 # Provider-specific cache timeout
# --- Rate Limiting (requests per minute) --- # --- Rate Limiting (requests per minute) ---
self.rate_limits = { self.rate_limits = {
'crtsh': 30, 'crtsh': 30,
@ -65,6 +68,7 @@ class Config:
self.large_entity_threshold = int(os.getenv('LARGE_ENTITY_THRESHOLD', self.large_entity_threshold)) self.large_entity_threshold = int(os.getenv('LARGE_ENTITY_THRESHOLD', self.large_entity_threshold))
self.max_retries_per_target = int(os.getenv('MAX_RETRIES_PER_TARGET', self.max_retries_per_target)) self.max_retries_per_target = int(os.getenv('MAX_RETRIES_PER_TARGET', self.max_retries_per_target))
self.cache_expiry_hours = int(os.getenv('CACHE_EXPIRY_HOURS', self.cache_expiry_hours)) self.cache_expiry_hours = int(os.getenv('CACHE_EXPIRY_HOURS', self.cache_expiry_hours))
self.cache_timeout_hours = int(os.getenv('CACHE_TIMEOUT_HOURS', self.cache_timeout_hours))
# Override Flask and session settings # Override Flask and session settings
self.flask_host = os.getenv('FLASK_HOST', self.flask_host) self.flask_host = os.getenv('FLASK_HOST', self.flask_host)

View File

@ -3,8 +3,6 @@
import time import time
import requests import requests
import threading import threading
import os
import json
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple from typing import List, Dict, Any, Optional, Tuple
@ -80,12 +78,6 @@ class BaseProvider(ABC):
self.logger = get_forensic_logger() self.logger = get_forensic_logger()
self._stop_event = None self._stop_event = None
# Caching configuration (per session)
self.cache_dir = f'.cache/{id(self.config)}' # Unique cache per session config
self.cache_expiry = self.config.cache_expiry_hours * 3600
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
# Statistics (per provider instance) # Statistics (per provider instance)
self.total_requests = 0 self.total_requests = 0
self.successful_requests = 0 self.successful_requests = 0
@ -180,21 +172,6 @@ class BaseProvider(ABC):
print(f"Request cancelled before start: {url}") print(f"Request cancelled before start: {url}")
return None return None
cache_key = f"{self.name}_{hash(f'{method}:{url}:{json.dumps(params, sort_keys=True)}')}.json"
cache_path = os.path.join(self.cache_dir, cache_key)
if os.path.exists(cache_path):
cache_age = time.time() - os.path.getmtime(cache_path)
if cache_age < self.cache_expiry:
print(f"Returning cached response for: {url}")
with open(cache_path, 'r') as f:
cached_data = json.load(f)
response = requests.Response()
response.status_code = cached_data['status_code']
response._content = cached_data['content'].encode('utf-8')
response.headers = cached_data['headers']
return response
self.rate_limiter.wait_if_needed() self.rate_limiter.wait_if_needed()
start_time = time.time() start_time = time.time()
@ -242,12 +219,7 @@ class BaseProvider(ABC):
error=None, error=None,
target_indicator=target_indicator target_indicator=target_indicator
) )
with open(cache_path, 'w') as f:
json.dump({
'status_code': response.status_code,
'content': response.text,
'headers': dict(response.headers)
}, f)
return response return response
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:

View File

@ -2,6 +2,8 @@
import json import json
import re import re
import os
from pathlib import Path
from typing import List, Dict, Any, Tuple, Set from typing import List, Dict, Any, Tuple, Set
from urllib.parse import quote from urllib.parse import quote
from datetime import datetime, timezone from datetime import datetime, timezone
@ -14,7 +16,7 @@ from utils.helpers import _is_valid_domain
class CrtShProvider(BaseProvider): class CrtShProvider(BaseProvider):
""" """
Provider for querying crt.sh certificate transparency database. Provider for querying crt.sh certificate transparency database.
Now uses session-specific configuration and caching. Now uses session-specific configuration and caching with accumulative behavior.
""" """
def __init__(self, name=None, session_config=None): def __init__(self, name=None, session_config=None):
@ -27,6 +29,10 @@ class CrtShProvider(BaseProvider):
) )
self.base_url = "https://crt.sh/" self.base_url = "https://crt.sh/"
self._stop_event = None self._stop_event = None
# Initialize cache directory
self.cache_dir = Path('cache') / 'crtsh'
self.cache_dir.mkdir(parents=True, exist_ok=True)
def get_name(self) -> str: def get_name(self) -> str:
"""Return the provider name.""" """Return the provider name."""
@ -52,6 +58,125 @@ class CrtShProvider(BaseProvider):
""" """
return True return True
def _get_cache_file_path(self, domain: str) -> Path:
"""Generate cache file path for a domain."""
# Sanitize domain for filename safety
safe_domain = domain.replace('.', '_').replace('/', '_').replace('\\', '_')
return self.cache_dir / f"{safe_domain}.json"
def _get_cache_status(self, cache_file_path: Path) -> str:
"""
Check cache status for a domain.
Returns: 'not_found', 'fresh', or 'stale'
"""
if not cache_file_path.exists():
return "not_found"
try:
with open(cache_file_path, 'r') as f:
cache_data = json.load(f)
last_query_str = cache_data.get("last_upstream_query")
if not last_query_str:
return "stale" # Invalid cache format
last_query = datetime.fromisoformat(last_query_str.replace('Z', '+00:00'))
hours_since_query = (datetime.now(timezone.utc) - last_query).total_seconds() / 3600
cache_timeout = self.config.cache_timeout_hours
if hours_since_query < cache_timeout:
return "fresh"
else:
return "stale"
except (json.JSONDecodeError, ValueError, KeyError) as e:
self.logger.logger.warning(f"Invalid cache file format for {cache_file_path}: {e}")
return "stale"
def _load_cached_certificates(self, cache_file_path: Path) -> List[Dict[str, Any]]:
"""Load certificates from cache file."""
try:
with open(cache_file_path, 'r') as f:
cache_data = json.load(f)
return cache_data.get('certificates', [])
except (json.JSONDecodeError, FileNotFoundError, KeyError) as e:
self.logger.logger.error(f"Failed to load cached certificates from {cache_file_path}: {e}")
return []
def _query_crtsh_api(self, domain: str) -> List[Dict[str, Any]]:
"""
Query crt.sh API for raw certificate data.
Raises exceptions for network errors to allow core logic to retry.
"""
url = f"{self.base_url}?q={quote(domain)}&output=json"
response = self.make_request(url, target_indicator=domain)
if not response or response.status_code != 200:
# This could be a temporary error - raise exception so core can retry
raise requests.exceptions.RequestException(f"crt.sh API returned status {response.status_code if response else 'None'}")
certificates = response.json()
if not certificates:
return []
return certificates
def _create_cache_file(self, cache_file_path: Path, domain: str, certificates: List[Dict[str, Any]]) -> None:
"""Create new cache file with certificates."""
try:
cache_data = {
"domain": domain,
"first_cached": datetime.now(timezone.utc).isoformat(),
"last_upstream_query": datetime.now(timezone.utc).isoformat(),
"upstream_query_count": 1,
"certificates": certificates
}
cache_file_path.parent.mkdir(parents=True, exist_ok=True)
with open(cache_file_path, 'w') as f:
json.dump(cache_data, f, separators=(',', ':'))
self.logger.logger.info(f"Created cache file for {domain} with {len(certificates)} certificates")
except Exception as e:
self.logger.logger.warning(f"Failed to create cache file for {domain}: {e}")
def _append_to_cache(self, cache_file_path: Path, new_certificates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Append new certificates to existing cache and return all certificates."""
try:
# Load existing cache
with open(cache_file_path, 'r') as f:
cache_data = json.load(f)
# Track existing certificate IDs to avoid duplicates
existing_ids = {cert.get('id') for cert in cache_data.get('certificates', [])}
# Add only new certificates
added_count = 0
for cert in new_certificates:
cert_id = cert.get('id')
if cert_id and cert_id not in existing_ids:
cache_data['certificates'].append(cert)
existing_ids.add(cert_id)
added_count += 1
# Update metadata
cache_data['last_upstream_query'] = datetime.now(timezone.utc).isoformat()
cache_data['upstream_query_count'] = cache_data.get('upstream_query_count', 0) + 1
# Write updated cache
with open(cache_file_path, 'w') as f:
json.dump(cache_data, f, separators=(',', ':'))
total_certs = len(cache_data['certificates'])
self.logger.logger.info(f"Appended {added_count} new certificates to cache. Total: {total_certs}")
return cache_data['certificates']
except Exception as e:
self.logger.logger.warning(f"Failed to append to cache: {e}")
return new_certificates # Fallback to new certificates only
def _parse_issuer_organization(self, issuer_dn: str) -> str: def _parse_issuer_organization(self, issuer_dn: str) -> str:
""" """
Parse the issuer Distinguished Name to extract just the organization name. Parse the issuer Distinguished Name to extract just the organization name.
@ -201,7 +326,8 @@ class CrtShProvider(BaseProvider):
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]: def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
""" """
Query crt.sh for certificates containing the domain. Query crt.sh for certificates containing the domain with caching support.
Properly raises exceptions for network errors to allow core logic retries.
""" """
if not _is_valid_domain(domain): if not _is_valid_domain(domain):
return [] return []
@ -211,122 +337,167 @@ class CrtShProvider(BaseProvider):
print(f"CrtSh query cancelled before start for domain: {domain}") print(f"CrtSh query cancelled before start for domain: {domain}")
return [] return []
relationships = [] # === CACHING LOGIC ===
cache_file = self._get_cache_file_path(domain)
cache_status = self._get_cache_status(cache_file)
certificates = []
try: try:
# Query crt.sh for certificates if cache_status == "fresh":
url = f"{self.base_url}?q={quote(domain)}&output=json" # Use cached data
response = self.make_request(url, target_indicator=domain) certificates = self._load_cached_certificates(cache_file)
self.logger.logger.info(f"Using cached data for {domain} ({len(certificates)} certificates)")
if not response or response.status_code != 200:
return []
# Check for cancellation after request
if self._stop_event and self._stop_event.is_set():
print(f"CrtSh query cancelled after request for domain: {domain}")
return []
certificates = response.json()
if not certificates:
return []
# Check for cancellation before processing
if self._stop_event and self._stop_event.is_set():
print(f"CrtSh query cancelled before processing for domain: {domain}")
return []
# Aggregate certificate data by domain
domain_certificates = {}
all_discovered_domains = set()
# Process certificates with cancellation checking
for i, cert_data in enumerate(certificates):
# Check for cancellation every 5 certificates instead of 10 for faster response
if i % 5 == 0 and self._stop_event and self._stop_event.is_set():
print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}")
break
cert_metadata = self._extract_certificate_metadata(cert_data)
cert_domains = self._extract_domains_from_certificate(cert_data)
# Add all domains from this certificate to our tracking elif cache_status == "not_found":
all_discovered_domains.update(cert_domains) # Fresh query, create new cache
for cert_domain in cert_domains: certificates = self._query_crtsh_api(domain)
if not _is_valid_domain(cert_domain): if certificates: # Only cache if we got results
continue self._create_cache_file(cache_file, domain, certificates)
self.logger.logger.info(f"Cached fresh data for {domain} ({len(certificates)} certificates)")
else:
self.logger.logger.info(f"No certificates found for {domain}, not caching")
# Initialize domain certificate list if needed elif cache_status == "stale":
if cert_domain not in domain_certificates: # Append query, update existing cache
domain_certificates[cert_domain] = [] try:
new_certificates = self._query_crtsh_api(domain)
# Add this certificate to the domain's certificate list if new_certificates:
domain_certificates[cert_domain].append(cert_metadata) certificates = self._append_to_cache(cache_file, new_certificates)
self.logger.logger.info(f"Refreshed and appended cache for {domain}")
else:
# Use existing cache if API returns no results
certificates = self._load_cached_certificates(cache_file)
self.logger.logger.info(f"API returned no new results, using existing cache for {domain}")
except requests.exceptions.RequestException:
# If API call fails for stale cache, use cached data and re-raise for retry logic
certificates = self._load_cached_certificates(cache_file)
if certificates:
self.logger.logger.warning(f"API call failed for {domain}, using stale cache data ({len(certificates)} certificates)")
# Don't re-raise here, just use cached data
else:
# No cached data and API failed - re-raise for retry
raise
# Final cancellation check before creating relationships except requests.exceptions.RequestException as e:
if self._stop_event and self._stop_event.is_set(): # Network/API errors should be re-raised so core logic can retry
print(f"CrtSh query cancelled before relationship creation for domain: {domain}") self.logger.logger.error(f"API query failed for {domain}: {e}")
return [] raise e
except json.JSONDecodeError as e:
# JSON parsing errors should also be raised for retry
self.logger.logger.error(f"Failed to parse JSON response from crt.sh for {domain}: {e}")
raise e
# Check for cancellation after cache operations
if self._stop_event and self._stop_event.is_set():
print(f"CrtSh query cancelled after cache operations for domain: {domain}")
return []
if not certificates:
return []
return self._process_certificates_to_relationships(domain, certificates)
def _process_certificates_to_relationships(self, domain: str, certificates: List[Dict[str, Any]]) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Process certificates to relationships using existing logic.
This method contains the original processing logic from query_domain.
"""
relationships = []
# Check for cancellation before processing
if self._stop_event and self._stop_event.is_set():
print(f"CrtSh processing cancelled before processing for domain: {domain}")
return []
# Create relationships from query domain to ALL discovered domains with stop checking # Aggregate certificate data by domain
for i, discovered_domain in enumerate(all_discovered_domains): domain_certificates = {}
if discovered_domain == domain: all_discovered_domains = set()
continue # Skip self-relationships
# Process certificates with cancellation checking
for i, cert_data in enumerate(certificates):
# Check for cancellation every 5 certificates for faster response
if i % 5 == 0 and self._stop_event and self._stop_event.is_set():
print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}")
break
# Check for cancellation every 10 relationships cert_metadata = self._extract_certificate_metadata(cert_data)
if i % 10 == 0 and self._stop_event and self._stop_event.is_set(): cert_domains = self._extract_domains_from_certificate(cert_data)
print(f"CrtSh relationship creation cancelled for domain: {domain}")
break # Add all domains from this certificate to our tracking
all_discovered_domains.update(cert_domains)
if not _is_valid_domain(discovered_domain): for cert_domain in cert_domains:
if not _is_valid_domain(cert_domain):
continue continue
# Get certificates for both domains # Initialize domain certificate list if needed
query_domain_certs = domain_certificates.get(domain, []) if cert_domain not in domain_certificates:
discovered_domain_certs = domain_certificates.get(discovered_domain, []) domain_certificates[cert_domain] = []
# Find shared certificates (for metadata purposes) # Add this certificate to the domain's certificate list
shared_certificates = self._find_shared_certificates(query_domain_certs, discovered_domain_certs) domain_certificates[cert_domain].append(cert_metadata)
# Calculate confidence based on relationship type and shared certificates
confidence = self._calculate_domain_relationship_confidence(
domain, discovered_domain, shared_certificates, all_discovered_domains
)
# Create comprehensive raw data for the relationship
relationship_raw_data = {
'relationship_type': 'certificate_discovery',
'shared_certificates': shared_certificates,
'total_shared_certs': len(shared_certificates),
'discovery_context': self._determine_relationship_context(discovered_domain, domain),
'domain_certificates': {
domain: self._summarize_certificates(query_domain_certs),
discovered_domain: self._summarize_certificates(discovered_domain_certs)
}
}
# Create domain -> domain relationship
relationships.append((
domain,
discovered_domain,
'san_certificate',
confidence,
relationship_raw_data
))
# Log the relationship discovery
self.log_relationship_discovery(
source_node=domain,
target_node=discovered_domain,
relationship_type='san_certificate',
confidence_score=confidence,
raw_data=relationship_raw_data,
discovery_method="certificate_transparency_analysis"
)
except json.JSONDecodeError as e:
self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}")
# Final cancellation check before creating relationships
if self._stop_event and self._stop_event.is_set():
print(f"CrtSh query cancelled before relationship creation for domain: {domain}")
return []
# Create relationships from query domain to ALL discovered domains with stop checking
for i, discovered_domain in enumerate(all_discovered_domains):
if discovered_domain == domain:
continue # Skip self-relationships
# Check for cancellation every 10 relationships
if i % 10 == 0 and self._stop_event and self._stop_event.is_set():
print(f"CrtSh relationship creation cancelled for domain: {domain}")
break
if not _is_valid_domain(discovered_domain):
continue
# Get certificates for both domains
query_domain_certs = domain_certificates.get(domain, [])
discovered_domain_certs = domain_certificates.get(discovered_domain, [])
# Find shared certificates (for metadata purposes)
shared_certificates = self._find_shared_certificates(query_domain_certs, discovered_domain_certs)
# Calculate confidence based on relationship type and shared certificates
confidence = self._calculate_domain_relationship_confidence(
domain, discovered_domain, shared_certificates, all_discovered_domains
)
# Create comprehensive raw data for the relationship
relationship_raw_data = {
'relationship_type': 'certificate_discovery',
'shared_certificates': shared_certificates,
'total_shared_certs': len(shared_certificates),
'discovery_context': self._determine_relationship_context(discovered_domain, domain),
'domain_certificates': {
domain: self._summarize_certificates(query_domain_certs),
discovered_domain: self._summarize_certificates(discovered_domain_certs)
}
}
# Create domain -> domain relationship
relationships.append((
domain,
discovered_domain,
'san_certificate',
confidence,
relationship_raw_data
))
# Log the relationship discovery
self.log_relationship_discovery(
source_node=domain,
target_node=discovered_domain,
relationship_type='san_certificate',
confidence_score=confidence,
raw_data=relationship_raw_data,
discovery_method="certificate_transparency_analysis"
)
return relationships return relationships
def _find_shared_certificates(self, certs1: List[Dict[str, Any]], certs2: List[Dict[str, Any]]) -> List[Dict[str, Any]]: def _find_shared_certificates(self, certs1: List[Dict[str, Any]], certs2: List[Dict[str, Any]]) -> List[Dict[str, Any]]: