""" Certificate Transparency provider using crt.sh. Discovers domain relationships through certificate SAN analysis with comprehensive certificate tracking. Stores certificates as metadata on domain nodes rather than creating certificate nodes. """ import json import re from typing import List, Dict, Any, Tuple, Set from urllib.parse import quote from datetime import datetime, timezone import requests from .base_provider import BaseProvider from utils.helpers import _is_valid_domain class CrtShProvider(BaseProvider): """ Provider for querying crt.sh certificate transparency database. Now uses session-specific configuration and caching. """ def __init__(self, session_config=None): """Initialize CrtSh provider with session-specific configuration.""" super().__init__( name="crtsh", rate_limit=60, timeout=15, session_config=session_config ) self.base_url = "https://crt.sh/" self._stop_event = None def get_name(self) -> str: """Return the provider name.""" return "crtsh" def get_display_name(self) -> str: """Return the provider display name for the UI.""" return "crt.sh" def requires_api_key(self) -> bool: """Return True if the provider requires an API key.""" return False def get_eligibility(self) -> Dict[str, bool]: """Return a dictionary indicating if the provider can query domains and/or IPs.""" return {'domains': True, 'ips': False} def is_available(self) -> bool: """ Check if the provider is configured to be used. This method is intentionally simple and does not perform a network request to avoid blocking application startup. """ return True def _parse_certificate_date(self, date_string: str) -> datetime: """ Parse certificate date from crt.sh format. Args: date_string: Date string from crt.sh API Returns: Parsed datetime object in UTC """ if not date_string: raise ValueError("Empty date string") try: # Handle various possible formats from crt.sh if date_string.endswith('Z'): return datetime.fromisoformat(date_string[:-1]).replace(tzinfo=timezone.utc) elif '+' in date_string or date_string.endswith('UTC'): # Handle timezone-aware strings date_string = date_string.replace('UTC', '').strip() if '+' in date_string: date_string = date_string.split('+')[0] return datetime.fromisoformat(date_string).replace(tzinfo=timezone.utc) else: # Assume UTC if no timezone specified return datetime.fromisoformat(date_string).replace(tzinfo=timezone.utc) except Exception as e: # Fallback: try parsing without timezone info and assume UTC try: return datetime.strptime(date_string[:19], "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc) except Exception: raise ValueError(f"Unable to parse date: {date_string}") from e def _is_cert_valid(self, cert_data: Dict[str, Any]) -> bool: """ Check if a certificate is currently valid based on its expiry date. Args: cert_data: Certificate data from crt.sh Returns: True if certificate is currently valid (not expired) """ try: not_after_str = cert_data.get('not_after') if not not_after_str: return False not_after_date = self._parse_certificate_date(not_after_str) not_before_str = cert_data.get('not_before') now = datetime.now(timezone.utc) # Check if certificate is within valid date range is_not_expired = not_after_date > now if not_before_str: not_before_date = self._parse_certificate_date(not_before_str) is_not_before_valid = not_before_date <= now return is_not_expired and is_not_before_valid return is_not_expired except Exception as e: self.logger.logger.debug(f"Certificate validity check failed: {e}") return False def _extract_certificate_metadata(self, cert_data: Dict[str, Any]) -> Dict[str, Any]: """ Extract comprehensive metadata from certificate data. Args: cert_data: Raw certificate data from crt.sh Returns: Comprehensive certificate metadata dictionary """ metadata = { 'certificate_id': cert_data.get('id'), 'serial_number': cert_data.get('serial_number'), 'issuer_name': cert_data.get('issuer_name'), 'issuer_ca_id': cert_data.get('issuer_ca_id'), 'common_name': cert_data.get('common_name'), 'not_before': cert_data.get('not_before'), 'not_after': cert_data.get('not_after'), 'entry_timestamp': cert_data.get('entry_timestamp'), 'source': 'crt.sh' } try: if metadata['not_before'] and metadata['not_after']: not_before = self._parse_certificate_date(metadata['not_before']) not_after = self._parse_certificate_date(metadata['not_after']) metadata['validity_period_days'] = (not_after - not_before).days metadata['is_currently_valid'] = self._is_cert_valid(cert_data) metadata['expires_soon'] = (not_after - datetime.now(timezone.utc)).days <= 30 # Add human-readable dates metadata['not_before'] = not_before.strftime('%Y-%m-%d %H:%M:%S UTC') metadata['not_after'] = not_after.strftime('%Y-%m-%d %H:%M:%S UTC') except Exception as e: self.logger.logger.debug(f"Error computing certificate metadata: {e}") metadata['is_currently_valid'] = False metadata['expires_soon'] = False return metadata def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]: """ Query crt.sh for certificates containing the domain. """ if not _is_valid_domain(domain): return [] # Check for cancellation before starting if self._stop_event and self._stop_event.is_set(): print(f"CrtSh query cancelled before start for domain: {domain}") return [] relationships = [] try: # Query crt.sh for certificates url = f"{self.base_url}?q={quote(domain)}&output=json" response = self.make_request(url, target_indicator=domain, max_retries=3) if not response or response.status_code != 200: return [] # Check for cancellation after request if self._stop_event and self._stop_event.is_set(): print(f"CrtSh query cancelled after request for domain: {domain}") return [] certificates = response.json() if not certificates: return [] # Check for cancellation before processing if self._stop_event and self._stop_event.is_set(): print(f"CrtSh query cancelled before processing for domain: {domain}") return [] # Aggregate certificate data by domain domain_certificates = {} all_discovered_domains = set() # Process certificates with cancellation checking for i, cert_data in enumerate(certificates): # Check for cancellation every 5 certificates instead of 10 for faster response if i % 5 == 0 and self._stop_event and self._stop_event.is_set(): print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}") break cert_metadata = self._extract_certificate_metadata(cert_data) cert_domains = self._extract_domains_from_certificate(cert_data) # Add all domains from this certificate to our tracking for cert_domain in cert_domains: # Additional stop check during domain processing if i % 20 == 0 and self._stop_event and self._stop_event.is_set(): print(f"CrtSh domain processing cancelled for domain: {domain}") break if not _is_valid_domain(cert_domain): continue all_discovered_domains.add(cert_domain) # Initialize domain certificate list if needed if cert_domain not in domain_certificates: domain_certificates[cert_domain] = [] # Add this certificate to the domain's certificate list domain_certificates[cert_domain].append(cert_metadata) # Final cancellation check before creating relationships if self._stop_event and self._stop_event.is_set(): print(f"CrtSh query cancelled before relationship creation for domain: {domain}") return [] # Create relationships from query domain to ALL discovered domains with stop checking for i, discovered_domain in enumerate(all_discovered_domains): if discovered_domain == domain: continue # Skip self-relationships # Check for cancellation every 10 relationships if i % 10 == 0 and self._stop_event and self._stop_event.is_set(): print(f"CrtSh relationship creation cancelled for domain: {domain}") break if not _is_valid_domain(discovered_domain): continue # Get certificates for both domains query_domain_certs = domain_certificates.get(domain, []) discovered_domain_certs = domain_certificates.get(discovered_domain, []) # Find shared certificates (for metadata purposes) shared_certificates = self._find_shared_certificates(query_domain_certs, discovered_domain_certs) # Calculate confidence based on relationship type and shared certificates confidence = self._calculate_domain_relationship_confidence( domain, discovered_domain, shared_certificates, all_discovered_domains ) # Create comprehensive raw data for the relationship relationship_raw_data = { 'relationship_type': 'certificate_discovery', 'shared_certificates': shared_certificates, 'total_shared_certs': len(shared_certificates), 'discovery_context': self._determine_relationship_context(discovered_domain, domain), 'domain_certificates': { domain: self._summarize_certificates(query_domain_certs), discovered_domain: self._summarize_certificates(discovered_domain_certs) } } # Create domain -> domain relationship relationships.append(( domain, discovered_domain, 'san_certificate', confidence, relationship_raw_data )) # Log the relationship discovery self.log_relationship_discovery( source_node=domain, target_node=discovered_domain, relationship_type='san_certificate', confidence_score=confidence, raw_data=relationship_raw_data, discovery_method="certificate_transparency_analysis" ) except json.JSONDecodeError as e: self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}") except requests.exceptions.RequestException as e: self.logger.logger.error(f"HTTP request to crt.sh failed: {e}") return relationships def _find_shared_certificates(self, certs1: List[Dict[str, Any]], certs2: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Find certificates that are shared between two domain certificate lists. Args: certs1: First domain's certificates certs2: Second domain's certificates Returns: List of shared certificate metadata """ shared = [] # Create a set of certificate IDs from the first list for quick lookup cert1_ids = {cert.get('certificate_id') for cert in certs1 if cert.get('certificate_id')} # Find certificates in the second list that match for cert in certs2: if cert.get('certificate_id') in cert1_ids: shared.append(cert) return shared def _summarize_certificates(self, certificates: List[Dict[str, Any]]) -> Dict[str, Any]: """ Create a summary of certificates for a domain. Args: certificates: List of certificate metadata Returns: Summary dictionary with aggregate statistics """ if not certificates: return { 'total_certificates': 0, 'valid_certificates': 0, 'expired_certificates': 0, 'expires_soon_count': 0, 'unique_issuers': [], 'latest_certificate': None, 'has_valid_cert': False } valid_count = sum(1 for cert in certificates if cert.get('is_currently_valid')) expired_count = len(certificates) - valid_count expires_soon_count = sum(1 for cert in certificates if cert.get('expires_soon')) # Get unique issuers unique_issuers = list(set(cert.get('issuer_name') for cert in certificates if cert.get('issuer_name'))) # Find the most recent certificate latest_cert = None latest_date = None for cert in certificates: try: if cert.get('not_before'): cert_date = self._parse_certificate_date(cert['not_before']) if latest_date is None or cert_date > latest_date: latest_date = cert_date latest_cert = cert except Exception: continue return { 'total_certificates': len(certificates), 'valid_certificates': valid_count, 'expired_certificates': expired_count, 'expires_soon_count': expires_soon_count, 'unique_issuers': unique_issuers, 'latest_certificate': latest_cert, 'has_valid_cert': valid_count > 0, 'certificate_details': certificates # Full details for forensic analysis } def _calculate_domain_relationship_confidence(self, domain1: str, domain2: str, shared_certificates: List[Dict[str, Any]], all_discovered_domains: Set[str]) -> float: """ Calculate confidence score for domain relationship based on various factors. Args: domain1: Source domain (query domain) domain2: Target domain (discovered domain) shared_certificates: List of shared certificate metadata all_discovered_domains: All domains discovered in this query Returns: Confidence score between 0.0 and 1.0 """ base_confidence = 0.9 # Adjust confidence based on domain relationship context relationship_context = self._determine_relationship_context(domain2, domain1) if relationship_context == 'exact_match': context_bonus = 0.0 # This shouldn't happen, but just in case elif relationship_context == 'subdomain': context_bonus = 0.1 # High confidence for subdomains elif relationship_context == 'parent_domain': context_bonus = 0.05 # Medium confidence for parent domains else: context_bonus = 0.0 # Related domains get base confidence # Adjust confidence based on shared certificates if shared_certificates: shared_count = len(shared_certificates) if shared_count >= 3: shared_bonus = 0.1 elif shared_count >= 2: shared_bonus = 0.05 else: shared_bonus = 0.02 # Additional bonus for valid shared certificates valid_shared = sum(1 for cert in shared_certificates if cert.get('is_currently_valid')) if valid_shared > 0: validity_bonus = 0.05 else: validity_bonus = 0.0 else: # Even without shared certificates, domains found in the same query have some relationship shared_bonus = 0.0 validity_bonus = 0.0 # Adjust confidence based on certificate issuer reputation (if shared certificates exist) issuer_bonus = 0.0 if shared_certificates: for cert in shared_certificates: issuer = cert.get('issuer_name', '').lower() if any(trusted_ca in issuer for trusted_ca in ['let\'s encrypt', 'digicert', 'sectigo', 'globalsign']): issuer_bonus = max(issuer_bonus, 0.03) break # Calculate final confidence final_confidence = base_confidence + context_bonus + shared_bonus + validity_bonus + issuer_bonus return max(0.1, min(1.0, final_confidence)) # Clamp between 0.1 and 1.0 def _determine_relationship_context(self, cert_domain: str, query_domain: str) -> str: """ Determine the context of the relationship between certificate domain and query domain. Args: cert_domain: Domain found in certificate query_domain: Original query domain Returns: String describing the relationship context """ if cert_domain == query_domain: return 'exact_match' elif cert_domain.endswith(f'.{query_domain}'): return 'subdomain' elif query_domain.endswith(f'.{cert_domain}'): return 'parent_domain' else: return 'related_domain' def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]: """ Query crt.sh for certificates containing the IP address. Note: crt.sh doesn't typically index by IP, so this returns empty results. Args: ip: IP address to investigate Returns: Empty list (crt.sh doesn't support IP-based certificate queries effectively) """ # crt.sh doesn't effectively support IP-based certificate queries return [] def _extract_domains_from_certificate(self, cert_data: Dict[str, Any]) -> Set[str]: """ Extract all domains from certificate data. Args: cert_data: Certificate data from crt.sh API Returns: Set of unique domain names found in the certificate """ domains = set() # Extract from common name common_name = cert_data.get('common_name', '') if common_name: cleaned_cn = self._clean_domain_name(common_name) if cleaned_cn: domains.update(cleaned_cn) # Extract from name_value field (contains SANs) name_value = cert_data.get('name_value', '') if name_value: # Split by newlines and clean each domain for line in name_value.split('\n'): cleaned_domains = self._clean_domain_name(line.strip()) if cleaned_domains: domains.update(cleaned_domains) return domains def _clean_domain_name(self, domain_name: str) -> List[str]: """ Clean and normalize domain name from certificate data. Now returns a list to handle wildcards correctly. """ if not domain_name: return [] domain = domain_name.strip().lower() # Remove protocol if present if domain.startswith(('http://', 'https://')): domain = domain.split('://', 1)[1] # Remove path if present if '/' in domain: domain = domain.split('/', 1)[0] # Remove port if present if ':' in domain and not domain.count(':') > 1: # Avoid breaking IPv6 domain = domain.split(':', 1)[0] # Handle wildcard domains cleaned_domains = [] if domain.startswith('*.'): # Add both the wildcard and the base domain cleaned_domains.append(domain) cleaned_domains.append(domain[2:]) else: cleaned_domains.append(domain) # Remove any remaining invalid characters and validate final_domains = [] for d in cleaned_domains: d = re.sub(r'[^\w\-\.]', '', d) if d and not d.startswith(('.', '-')) and not d.endswith(('.', '-')): final_domains.append(d) return [d for d in final_domains if _is_valid_domain(d)]