# dnsrecon/providers/crtsh_provider.py

import json
import re
import os
from pathlib import Path
from typing import List, Dict, Any, Tuple, Set
from urllib.parse import quote
from datetime import datetime, timezone

import requests

from .base_provider import BaseProvider
from utils.helpers import _is_valid_domain


# Characters removed from candidate host names before validation.
_INVALID_DOMAIN_CHARS_RE = re.compile(r'[^\w\-\.]')

# Lower-case substrings of issuer organisations that earn a small confidence bonus.
_TRUSTED_CA_KEYWORDS = ("let's encrypt", 'digicert', 'sectigo', 'globalsign')

# Sort key used for certificates that carry no parseable date.
_EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


class CrtShProvider(BaseProvider):
    """
    Provider for querying the crt.sh certificate transparency database.

    Results are cached on disk, one JSON file per queried domain, below
    ``cache/crtsh``. The cache is accumulative: when a stale cache is
    refreshed, newly returned certificates are merged into the existing file
    instead of replacing it.
    """

    def __init__(self, name=None, session_config=None):
        """
        Initialize the crt.sh provider with session-specific configuration.

        Args:
            name: Accepted for signature compatibility; the provider always
                registers itself as "crtsh".
            session_config: Passed through unchanged to ``BaseProvider``.
        """
        super().__init__(
            name="crtsh",
            rate_limit=60,
            timeout=15,
            session_config=session_config
        )
        self.base_url = "https://crt.sh/"
        # Assigned from outside this class; when set, long-running work checks
        # it (via is_set()) to abort early.
        self._stop_event = None

        # Initialize cache directory (relative to the current working directory)
        self.cache_dir = Path('cache') / 'crtsh'
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def get_name(self) -> str:
        """Return the provider name."""
        return "crtsh"

    def get_display_name(self) -> str:
        """Return the provider display name for the UI."""
        return "crt.sh"

    def requires_api_key(self) -> bool:
        """Return True if the provider requires an API key."""
        return False

    def get_eligibility(self) -> Dict[str, bool]:
        """Return a dictionary indicating if the provider can query domains and/or IPs."""
        return {'domains': True, 'ips': False}

    def is_available(self) -> bool:
        """
        Check if the provider is configured to be used.

        This method is intentionally simple and does not perform a network
        request to avoid blocking application startup.
        """
        return True

    def _is_cancelled(self) -> bool:
        """Return True when a stop event has been attached and is set."""
        return bool(self._stop_event and self._stop_event.is_set())

    def _get_cache_file_path(self, domain: str) -> Path:
        """Generate the cache file path for a domain."""
        # Sanitize domain for filename safety
        safe_domain = domain.replace('.', '_').replace('/', '_').replace('\\', '_')
        return self.cache_dir / f"{safe_domain}.json"

    def _get_cache_status(self, cache_file_path: Path) -> str:
        """
        Check the cache status for a domain.

        Args:
            cache_file_path: Path of the cache file to inspect.

        Returns:
            'not_found' when the file does not exist, 'fresh' when the last
            upstream query is younger than ``self.config.cache_timeout_hours``,
            otherwise 'stale'. Unreadable or malformed files count as 'stale'.
        """
        if not cache_file_path.exists():
            return "not_found"

        try:
            with open(cache_file_path, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            last_query_str = cache_data.get("last_upstream_query")
            if not last_query_str:
                return "stale"  # Invalid cache format

            last_query = datetime.fromisoformat(last_query_str.replace('Z', '+00:00'))
            if last_query.tzinfo is None:
                # A naive timestamp cannot be subtracted from an aware "now";
                # timestamps written by this class are UTC, so assume UTC.
                last_query = last_query.replace(tzinfo=timezone.utc)
            hours_since_query = (datetime.now(timezone.utc) - last_query).total_seconds() / 3600
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
            # ValueError covers json.JSONDecodeError and bad ISO strings;
            # TypeError/AttributeError cover payloads that are not the expected dict.
            self.logger.logger.warning(f"Invalid cache file format for {cache_file_path}: {e}")
            return "stale"

        # Read outside the try block so a configuration problem is not
        # silently reported as a stale cache.
        cache_timeout = self.config.cache_timeout_hours
        return "fresh" if hours_since_query < cache_timeout else "stale"

    def _load_cached_certificates(self, cache_file_path: Path) -> List[Dict[str, Any]]:
        """
        Load certificates from a cache file.

        Returns:
            The cached certificate list, or an empty list when the file is
            missing, unreadable, malformed, or has no usable 'certificates' entry.
        """
        try:
            with open(cache_file_path, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            # "or []" guards against an explicit null stored under the key.
            return cache_data.get('certificates', []) or []
        except (OSError, ValueError, KeyError, AttributeError) as e:
            self.logger.logger.error(f"Failed to load cached certificates from {cache_file_path}: {e}")
            return []

    def _query_crtsh_api(self, domain: str) -> List[Dict[str, Any]]:
        """
        Query the crt.sh API for raw certificate data.

        Raises:
            requests.exceptions.RequestException: When no response was
                obtained or the status code is not 200, so the core logic
                can retry.
        """
        url = f"{self.base_url}?q={quote(domain)}&output=json"
        response = self.make_request(url, target_indicator=domain)

        if not response or response.status_code != 200:
            # This could be a temporary error - raise exception so core can retry
            raise requests.exceptions.RequestException(
                f"crt.sh API returned status {response.status_code if response else 'None'}"
            )

        certificates = response.json()
        if not certificates:
            return []
        return certificates

    def _create_cache_file(self, cache_file_path: Path, domain: str,
                           certificates: List[Dict[str, Any]]) -> None:
        """
        Create a new cache file with the given certificates.

        Failures are logged and swallowed: caching is best-effort and must
        not break the query.
        """
        try:
            # One timestamp so first_cached and last_upstream_query agree.
            now_iso = datetime.now(timezone.utc).isoformat()
            cache_data = {
                "domain": domain,
                "first_cached": now_iso,
                "last_upstream_query": now_iso,
                "upstream_query_count": 1,
                "certificates": certificates
            }

            cache_file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(cache_file_path, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, separators=(',', ':'))

            self.logger.logger.info(f"Created cache file for {domain} with {len(certificates)} certificates")
        except Exception as e:
            self.logger.logger.warning(f"Failed to create cache file for {domain}: {e}")

    def _append_to_cache(self, cache_file_path: Path,
                         new_certificates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Merge new certificates into the existing cache and return all certificates.

        Certificates are de-duplicated on their 'id'; entries without an id
        are not added. If the existing cache file cannot be read or does not
        have the expected structure, it is rebuilt from the new certificates
        instead of being left broken.

        Returns:
            The merged certificate list. If merging itself fails unexpectedly,
            ``new_certificates`` is returned unchanged as a fallback.
        """
        try:
            now_iso = datetime.now(timezone.utc).isoformat()

            # Load existing cache; tolerate a damaged file.
            cache_data = None
            try:
                with open(cache_file_path, 'r', encoding='utf-8') as f:
                    cache_data = json.load(f)
            except (OSError, ValueError) as e:
                self.logger.logger.warning(
                    f"Existing cache {cache_file_path} is unreadable ({e}); rebuilding it"
                )

            if not isinstance(cache_data, dict):
                cache_data = {"first_cached": now_iso, "upstream_query_count": 0}
            if not isinstance(cache_data.get('certificates'), list):
                cache_data['certificates'] = []
            cached_certs = cache_data['certificates']

            # Track existing certificate IDs to avoid duplicates
            existing_ids = {cert.get('id') for cert in cached_certs}

            added_count = 0
            for cert in new_certificates:
                cert_id = cert.get('id')
                if cert_id and cert_id not in existing_ids:
                    cached_certs.append(cert)
                    existing_ids.add(cert_id)
                    added_count += 1

            # Update metadata
            cache_data['last_upstream_query'] = now_iso
            cache_data['upstream_query_count'] = cache_data.get('upstream_query_count', 0) + 1

            # Write updated cache; a failed write must not discard the merged result.
            try:
                with open(cache_file_path, 'w', encoding='utf-8') as f:
                    json.dump(cache_data, f, separators=(',', ':'))
            except (OSError, TypeError, ValueError) as e:
                self.logger.logger.warning(f"Failed to append to cache: {e}")
            else:
                self.logger.logger.info(
                    f"Appended {added_count} new certificates to cache. Total: {len(cached_certs)}"
                )

            return cached_certs
        except Exception as e:
            self.logger.logger.warning(f"Failed to append to cache: {e}")
            return new_certificates  # Fallback to new certificates only

    @staticmethod
    def _split_dn_components(issuer_dn: str) -> List[str]:
        """
        Split a distinguished name on commas that are not inside double quotes.

        Falls back to a plain comma split when the quotes are unbalanced.
        Backslash-escaped commas are not handled.
        """
        if issuer_dn.count('"') % 2:
            return [part.strip() for part in issuer_dn.split(',')]

        components = []
        current = []
        in_quotes = False
        for ch in issuer_dn:
            if ch == ',' and not in_quotes:
                components.append(''.join(current).strip())
                current = []
                continue
            if ch == '"':
                in_quotes = not in_quotes
            current.append(ch)
        components.append(''.join(current).strip())
        return components

    def _parse_issuer_organization(self, issuer_dn: str) -> str:
        """
        Parse the issuer Distinguished Name to extract just the organization name.

        Args:
            issuer_dn: Full issuer DN string (e.g., "C=US, O=Let's Encrypt, CN=R11")

        Returns:
            Organization name (e.g., "Let's Encrypt"), with surrounding double
            quotes removed, or the original value if there is no O= component
            or the input is empty / not a string.
        """
        if not issuer_dn or not isinstance(issuer_dn, str):
            return issuer_dn

        for component in self._split_dn_components(issuer_dn):
            if component.startswith('O='):
                org_name = component[2:].strip()
                # Remove quotes if present (used when the value contains a comma)
                if len(org_name) >= 2 and org_name.startswith('"') and org_name.endswith('"'):
                    org_name = org_name[1:-1]
                return org_name

        # If no O= component found, return the original string
        return issuer_dn

    def _parse_certificate_date(self, date_string: str) -> datetime:
        """
        Parse a certificate date into a timezone-aware UTC datetime.

        Accepts ISO-8601 strings with or without an offset, a trailing 'Z',
        and the '%Y-%m-%d %H:%M:%S UTC' form that
        ``_extract_certificate_metadata`` writes back into its metadata.
        Strings without timezone information are assumed to be UTC; explicit
        offsets are converted to UTC.

        Args:
            date_string: Date string from the crt.sh API or from metadata

        Returns:
            Parsed datetime object in UTC

        Raises:
            ValueError: If the value is empty, not a string, or unparseable.
        """
        if not date_string:
            raise ValueError("Empty date string")
        if not isinstance(date_string, str):
            raise ValueError(f"Unable to parse date: {date_string}")

        text = date_string.strip()
        if text.endswith('UTC'):
            text = text[:-3].strip()
        elif text.endswith('Z'):
            # Strip the designator ourselves instead of relying on
            # fromisoformat() accepting it; the value is UTC either way.
            text = text[:-1]

        try:
            parsed = datetime.fromisoformat(text)
        except ValueError as e:
            # Fallback: keep only the seconds-resolution prefix and assume UTC
            try:
                parsed = datetime.strptime(text[:19], "%Y-%m-%dT%H:%M:%S")
            except ValueError:
                raise ValueError(f"Unable to parse date: {date_string}") from e

        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    def _is_cert_valid(self, cert_data: Dict[str, Any]) -> bool:
        """
        Check if a certificate is currently valid based on its validity dates.

        Args:
            cert_data: Certificate data from crt.sh

        Returns:
            True if 'not_after' lies in the future and, when 'not_before' is
            present, it does not lie in the future. False when 'not_after' is
            missing or any date cannot be parsed.
        """
        try:
            not_after_str = cert_data.get('not_after')
            if not not_after_str:
                return False

            not_after_date = self._parse_certificate_date(not_after_str)
            not_before_str = cert_data.get('not_before')

            now = datetime.now(timezone.utc)
            is_not_expired = not_after_date > now

            if not_before_str:
                not_before_date = self._parse_certificate_date(not_before_str)
                return is_not_expired and not_before_date <= now

            return is_not_expired
        except Exception as e:
            self.logger.logger.debug(f"Certificate validity check failed: {e}")
            return False

    def _extract_certificate_metadata(self, cert_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract metadata from raw certificate data.

        When both validity dates are present and parseable, the result also
        carries 'validity_period_days', 'is_currently_valid' and
        'expires_soon', and the two dates are rewritten in the human-readable
        form '%Y-%m-%d %H:%M:%S UTC'. If date handling fails, the two flags
        are set to False.

        Args:
            cert_data: Raw certificate data from crt.sh

        Returns:
            Certificate metadata dictionary
        """
        # Parse the issuer name to get just the organization
        raw_issuer_name = cert_data.get('issuer_name', '')
        parsed_issuer_name = self._parse_issuer_organization(raw_issuer_name)

        metadata = {
            'certificate_id': cert_data.get('id'),
            'serial_number': cert_data.get('serial_number'),
            'issuer_name': parsed_issuer_name,  # Use parsed organization name
            # The full issuer DN is deliberately left out; it is rarely useful.
            'issuer_ca_id': cert_data.get('issuer_ca_id'),
            'common_name': cert_data.get('common_name'),
            'not_before': cert_data.get('not_before'),
            'not_after': cert_data.get('not_after'),
            'entry_timestamp': cert_data.get('entry_timestamp'),
            'source': 'crt.sh'
        }

        try:
            if metadata['not_before'] and metadata['not_after']:
                not_before = self._parse_certificate_date(metadata['not_before'])
                not_after = self._parse_certificate_date(metadata['not_after'])

                metadata['validity_period_days'] = (not_after - not_before).days
                metadata['is_currently_valid'] = self._is_cert_valid(cert_data)

                # Only certificates that have not expired yet can "expire
                # soon"; without the lower bound every expired certificate
                # would be flagged as well.
                remaining = not_after - datetime.now(timezone.utc)
                metadata['expires_soon'] = remaining.total_seconds() >= 0 and remaining.days <= 30

                # Add human-readable dates
                metadata['not_before'] = not_before.strftime('%Y-%m-%d %H:%M:%S UTC')
                metadata['not_after'] = not_after.strftime('%Y-%m-%d %H:%M:%S UTC')
        except Exception as e:
            self.logger.logger.debug(f"Error computing certificate metadata: {e}")
            metadata['is_currently_valid'] = False
            metadata['expires_soon'] = False

        return metadata

    def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the domain, with caching support.

        A fresh cache is used directly. A missing cache triggers an API query
        whose non-empty result is cached. A stale cache triggers an API query
        whose result is merged into the cache; if that query fails but cached
        data exists, the cached data is used instead.

        Args:
            domain: Domain to investigate

        Returns:
            List of (source, target, relationship_type, confidence, raw_data)
            tuples; empty for invalid domains, cancelled queries, or no results.

        Raises:
            requests.exceptions.RequestException: On API failure without
                usable cached data, so the core logic can retry.
            json.JSONDecodeError: When the API response cannot be parsed.
        """
        if not _is_valid_domain(domain):
            return []

        # Check for cancellation before starting
        if self._is_cancelled():
            print(f"CrtSh query cancelled before start for domain: {domain}")
            return []

        # === CACHING LOGIC ===
        cache_file = self._get_cache_file_path(domain)
        cache_status = self._get_cache_status(cache_file)

        certificates = []

        try:
            if cache_status == "fresh":
                # Use cached data
                certificates = self._load_cached_certificates(cache_file)
                self.logger.logger.info(f"Using cached data for {domain} ({len(certificates)} certificates)")

            elif cache_status == "not_found":
                # Fresh query, create new cache
                certificates = self._query_crtsh_api(domain)
                if certificates:  # Only cache if we got results
                    self._create_cache_file(cache_file, domain, certificates)
                    self.logger.logger.info(f"Cached fresh data for {domain} ({len(certificates)} certificates)")
                else:
                    self.logger.logger.info(f"No certificates found for {domain}, not caching")

            elif cache_status == "stale":
                # Append query, update existing cache
                try:
                    new_certificates = self._query_crtsh_api(domain)
                    if new_certificates:
                        certificates = self._append_to_cache(cache_file, new_certificates)
                        self.logger.logger.info(f"Refreshed and appended cache for {domain}")
                    else:
                        # Use existing cache if API returns no results
                        certificates = self._load_cached_certificates(cache_file)
                        self.logger.logger.info(f"API returned no new results, using existing cache for {domain}")
                except requests.exceptions.RequestException:
                    # API call failed: fall back to whatever the stale cache holds.
                    certificates = self._load_cached_certificates(cache_file)
                    if certificates:
                        self.logger.logger.warning(
                            f"API call failed for {domain}, using stale cache data ({len(certificates)} certificates)"
                        )
                    else:
                        # No cached data and API failed - re-raise for retry
                        raise

        except requests.exceptions.RequestException as e:
            # Network/API errors should be re-raised so core logic can retry
            self.logger.logger.error(f"API query failed for {domain}: {e}")
            raise
        except json.JSONDecodeError as e:
            # JSON parsing errors should also be raised for retry
            self.logger.logger.error(f"Failed to parse JSON response from crt.sh for {domain}: {e}")
            raise

        # Check for cancellation after cache operations
        if self._is_cancelled():
            print(f"CrtSh query cancelled after cache operations for domain: {domain}")
            return []

        if not certificates:
            return []

        return self._process_certificates_to_relationships(domain, certificates)

    def _process_certificates_to_relationships(
        self, domain: str, certificates: List[Dict[str, Any]]
    ) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Turn raw certificates into domain -> domain relationships.

        Every domain named in any certificate (other than the query domain
        itself) yields one 'san_certificate' relationship from the query
        domain. Cancellation via the stop event is checked periodically; a
        cancellation inside a loop stops that loop and keeps what was built
        so far.

        Args:
            domain: The query domain (relationship source)
            certificates: Raw certificate dictionaries from crt.sh or the cache

        Returns:
            List of (source, target, relationship_type, confidence, raw_data) tuples
        """
        relationships = []

        # Check for cancellation before processing
        if self._is_cancelled():
            print(f"CrtSh processing cancelled before processing for domain: {domain}")
            return []

        # Aggregate certificate data by domain
        domain_certificates = {}
        all_discovered_domains = set()

        for i, cert_data in enumerate(certificates):
            # Check for cancellation every 5 certificates for faster response
            if i % 5 == 0 and self._is_cancelled():
                print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}")
                break

            cert_metadata = self._extract_certificate_metadata(cert_data)
            cert_domains = self._extract_domains_from_certificate(cert_data)

            # Add all domains from this certificate to our tracking
            all_discovered_domains.update(cert_domains)
            for cert_domain in cert_domains:
                if not _is_valid_domain(cert_domain):
                    continue
                domain_certificates.setdefault(cert_domain, []).append(cert_metadata)

        # Final cancellation check before creating relationships
        if self._is_cancelled():
            print(f"CrtSh query cancelled before relationship creation for domain: {domain}")
            return []

        # The query domain's certificates do not change inside the loop below.
        query_domain_certs = domain_certificates.get(domain, [])

        # Create relationships from query domain to ALL discovered domains
        for i, discovered_domain in enumerate(all_discovered_domains):
            if discovered_domain == domain:
                continue  # Skip self-relationships

            # Check for cancellation every 10 relationships
            if i % 10 == 0 and self._is_cancelled():
                print(f"CrtSh relationship creation cancelled for domain: {domain}")
                break

            if not _is_valid_domain(discovered_domain):
                continue

            discovered_domain_certs = domain_certificates.get(discovered_domain, [])

            # Find shared certificates (for metadata purposes)
            shared_certificates = self._find_shared_certificates(
                query_domain_certs, discovered_domain_certs
            )

            # Calculate confidence based on relationship type and shared certificates
            confidence = self._calculate_domain_relationship_confidence(
                domain, discovered_domain, shared_certificates, all_discovered_domains
            )

            # Create comprehensive raw data for the relationship
            relationship_raw_data = {
                'relationship_type': 'certificate_discovery',
                'shared_certificates': shared_certificates,
                'total_shared_certs': len(shared_certificates),
                'discovery_context': self._determine_relationship_context(discovered_domain, domain),
                'domain_certificates': {
                    domain: self._summarize_certificates(query_domain_certs),
                    discovered_domain: self._summarize_certificates(discovered_domain_certs)
                }
            }

            # Create domain -> domain relationship
            relationships.append((
                domain,
                discovered_domain,
                'san_certificate',
                confidence,
                relationship_raw_data
            ))

            # Log the relationship discovery
            self.log_relationship_discovery(
                source_node=domain,
                target_node=discovered_domain,
                relationship_type='san_certificate',
                confidence_score=confidence,
                raw_data=relationship_raw_data,
                discovery_method="certificate_transparency_analysis"
            )

        return relationships

    def _find_shared_certificates(self, certs1: List[Dict[str, Any]],
                                  certs2: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Find certificates that are shared between two domain certificate lists.

        Args:
            certs1: First domain's certificates
            certs2: Second domain's certificates

        Returns:
            Entries of ``certs2`` whose 'certificate_id' also occurs in
            ``certs1``; entries without an id never match.
        """
        # Set of certificate IDs from the first list for quick lookup
        cert1_ids = {cert.get('certificate_id') for cert in certs1 if cert.get('certificate_id')}
        return [cert for cert in certs2 if cert.get('certificate_id') in cert1_ids]

    def _summarize_certificates(self, certificates: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Create a summary of certificates for a domain.

        Args:
            certificates: List of certificate metadata

        Returns:
            Summary dictionary with aggregate statistics, the certificate with
            the latest 'not_before', and all certificates sorted newest first.
        """
        if not certificates:
            return {
                'total_certificates': 0,
                'valid_certificates': 0,
                'expired_certificates': 0,
                'expires_soon_count': 0,
                'unique_issuers': [],
                'latest_certificate': None,
                'has_valid_cert': False,
                'certificate_details': []  # Always include empty list
            }

        valid_count = sum(1 for cert in certificates if cert.get('is_currently_valid'))
        expired_count = len(certificates) - valid_count
        expires_soon_count = sum(1 for cert in certificates if cert.get('expires_soon'))

        # Get unique issuers (using parsed organization names)
        unique_issuers = list({cert.get('issuer_name') for cert in certificates if cert.get('issuer_name')})

        # Find the most recent certificate; entries with an unparseable date are skipped
        latest_cert = None
        latest_date = None
        for cert in certificates:
            if not cert.get('not_before'):
                continue
            try:
                cert_date = self._parse_certificate_date(cert['not_before'])
            except ValueError:
                continue
            if latest_date is None or cert_date > latest_date:
                latest_date = cert_date
                latest_cert = cert

        # Sort certificates by date for better display (newest first)
        sorted_certificates = sorted(
            certificates,
            key=self._get_certificate_sort_date,
            reverse=True
        )

        return {
            'total_certificates': len(certificates),
            'valid_certificates': valid_count,
            'expired_certificates': expired_count,
            'expires_soon_count': expires_soon_count,
            'unique_issuers': unique_issuers,
            'latest_certificate': latest_cert,
            'has_valid_cert': valid_count > 0,
            'certificate_details': sorted_certificates  # Include full certificate details
        }

    def _get_certificate_sort_date(self, cert: Dict[str, Any]) -> datetime:
        """
        Get a sortable date from certificate data for chronological ordering.

        Args:
            cert: Certificate metadata dictionary

        Returns:
            'not_before' if present, else 'entry_timestamp', parsed to UTC;
            the Unix epoch if neither is present or parsing fails.
        """
        try:
            # Try not_before first (issue date)
            if cert.get('not_before'):
                return self._parse_certificate_date(cert['not_before'])

            # Fall back to entry_timestamp if available
            if cert.get('entry_timestamp'):
                return self._parse_certificate_date(cert['entry_timestamp'])
        except Exception:
            pass  # Sorting must never fail; fall through to the epoch

        return _EPOCH_UTC

    def _calculate_domain_relationship_confidence(self, domain1: str, domain2: str,
                                                  shared_certificates: List[Dict[str, Any]],
                                                  all_discovered_domains: Set[str]) -> float:
        """
        Calculate the confidence score for a domain relationship.

        Starts from a base of 0.9 and adds bonuses for the relationship
        context (subdomain / parent domain), the number of shared
        certificates, at least one currently valid shared certificate, and a
        shared certificate issued by one of a few well-known CAs.

        Args:
            domain1: Source domain (query domain)
            domain2: Target domain (discovered domain)
            shared_certificates: List of shared certificate metadata
            all_discovered_domains: All domains discovered in this query
                (currently not used in the calculation)

        Returns:
            Confidence score clamped to the range 0.1 .. 1.0
        """
        base_confidence = 0.9

        # Adjust confidence based on domain relationship context
        relationship_context = self._determine_relationship_context(domain2, domain1)
        if relationship_context == 'subdomain':
            context_bonus = 0.1  # High confidence for subdomains
        elif relationship_context == 'parent_domain':
            context_bonus = 0.05  # Medium confidence for parent domains
        else:
            context_bonus = 0.0  # 'exact_match' and related domains get base confidence

        shared_bonus = 0.0
        validity_bonus = 0.0
        issuer_bonus = 0.0

        if shared_certificates:
            # Adjust confidence based on the number of shared certificates
            shared_count = len(shared_certificates)
            if shared_count >= 3:
                shared_bonus = 0.1
            elif shared_count >= 2:
                shared_bonus = 0.05
            else:
                shared_bonus = 0.02

            # Additional bonus for valid shared certificates
            if any(cert.get('is_currently_valid') for cert in shared_certificates):
                validity_bonus = 0.05

            # Bonus for a well-known issuer. 'issuer_name' may be present
            # with value None, hence "or ''" rather than a .get() default.
            for cert in shared_certificates:
                issuer = (cert.get('issuer_name') or '').lower()
                if any(trusted_ca in issuer for trusted_ca in _TRUSTED_CA_KEYWORDS):
                    issuer_bonus = 0.03
                    break

        # Calculate final confidence
        final_confidence = base_confidence + context_bonus + shared_bonus + validity_bonus + issuer_bonus

        return max(0.1, min(1.0, final_confidence))  # Clamp between 0.1 and 1.0

    def _determine_relationship_context(self, cert_domain: str, query_domain: str) -> str:
        """
        Determine how a certificate domain relates to the query domain.

        Args:
            cert_domain: Domain found in certificate
            query_domain: Original query domain

        Returns:
            'exact_match', 'subdomain', 'parent_domain' or 'related_domain'
        """
        if cert_domain == query_domain:
            return 'exact_match'
        if cert_domain.endswith(f'.{query_domain}'):
            return 'subdomain'
        if query_domain.endswith(f'.{cert_domain}'):
            return 'parent_domain'
        return 'related_domain'

    def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the IP address.

        Args:
            ip: IP address to investigate

        Returns:
            Always an empty list; this provider does not perform IP queries.
        """
        # crt.sh doesn't effectively support IP-based certificate queries
        return []

    def _extract_domains_from_certificate(self, cert_data: Dict[str, Any]) -> Set[str]:
        """
        Extract all domains from certificate data.

        Args:
            cert_data: Certificate data from the crt.sh API

        Returns:
            Set of unique, cleaned domain names taken from 'common_name' and
            from the newline-separated 'name_value' field.
        """
        domains = set()

        # Extract from common name
        common_name = cert_data.get('common_name', '')
        if common_name:
            domains.update(self._clean_domain_name(common_name))

        # Extract from name_value field (contains SANs), one name per line
        name_value = cert_data.get('name_value', '')
        if name_value:
            for line in name_value.split('\n'):
                domains.update(self._clean_domain_name(line.strip()))

        return domains

    def _clean_domain_name(self, domain_name: str) -> List[str]:
        """
        Clean and normalize a domain name from certificate data.

        The name is lower-cased and stripped of scheme, path and port. A
        wildcard name ('*.example.com') is reduced to its base domain.

        Args:
            domain_name: Raw name from a certificate field

        Returns:
            List with the cleaned domain if it passes ``_is_valid_domain``,
            otherwise an empty list.
        """
        if not domain_name:
            return []

        domain = domain_name.strip().lower()

        # Remove protocol if present
        if domain.startswith(('http://', 'https://')):
            domain = domain.split('://', 1)[1]

        # Remove path if present
        if '/' in domain:
            domain = domain.split('/', 1)[0]

        # Remove port if present (a single colon only, to avoid breaking IPv6)
        if domain.count(':') == 1:
            domain = domain.split(':', 1)[0]

        # Wildcards contribute only their base domain: '*' is not an allowed
        # character, so the wildcard form itself could never survive cleaning.
        if domain.startswith('*.'):
            domain = domain[2:]

        # Remove any remaining invalid characters and validate
        domain = _INVALID_DOMAIN_CHARS_RE.sub('', domain)
        if not domain or domain.startswith(('.', '-')) or domain.endswith(('.', '-')):
            return []

        return [domain] if _is_valid_domain(domain) else []