"""
Certificate Transparency provider using crt.sh.

Discovers domain relationships through certificate SAN analysis with
comprehensive certificate tracking. Certificate details are returned as
metadata attached to the domain relationships rather than as separate
certificate nodes.
"""

import json
import re
from typing import List, Dict, Any, Tuple, Set
from urllib.parse import quote
from datetime import datetime, timezone

from .base_provider import BaseProvider
from utils.helpers import _is_valid_domain
from core.graph_manager import RelationshipType


class CrtShProvider(BaseProvider):
    """
    Provider for querying the crt.sh certificate transparency database.

    The session-specific configuration is forwarded to ``BaseProvider``.
    """

    # Substrings (lowercase) that identify well-known certificate authorities
    # in an issuer name; a match grants a small confidence bonus.
    _TRUSTED_CA_KEYWORDS = ("let's encrypt", 'digicert', 'sectigo', 'globalsign')

    # Anything that is not a word character, hyphen or dot is stripped from
    # candidate domain names.
    _INVALID_DOMAIN_CHARS = re.compile(r'[^\w\-\.]')

    # A certificate "expires soon" when it has this many days or fewer left.
    _EXPIRY_WARNING_DAYS = 30

    def __init__(self, session_config=None):
        """Initialize CrtSh provider with session-specific configuration."""
        super().__init__(
            name="crtsh",
            rate_limit=60,
            timeout=15,
            session_config=session_config
        )
        self.base_url = "https://crt.sh/"
        # No stop event by default; the query methods only read this
        # attribute and treat a set event as a cancellation request.
        self._stop_event = None

    def get_name(self) -> str:
        """Return the provider name."""
        return "crtsh"

    def is_available(self) -> bool:
        """
        Check if the provider is configured to be used.

        This method is intentionally simple and does not perform a network
        request to avoid blocking application startup.
        """
        return True

    def _is_stop_requested(self) -> bool:
        """Return True when a stop event is attached and has been set."""
        stop_event = self._stop_event
        return stop_event is not None and stop_event.is_set()

    def _parse_certificate_date(self, date_string: str) -> datetime:
        """
        Parse a certificate date from crt.sh into an aware UTC datetime.

        Accepted inputs are ISO-8601 strings, optionally suffixed with ``Z``
        or ``UTC``. Strings without timezone information are assumed to be
        UTC. Strings carrying an explicit offset are *converted* to UTC
        (not merely relabelled).

        Args:
            date_string: Date string from crt.sh API

        Returns:
            Parsed datetime object in UTC

        Raises:
            ValueError: If the value is empty, not a string, or unparseable.
        """
        if not date_string:
            raise ValueError("Empty date string")
        if not isinstance(date_string, str):
            raise ValueError(f"Unable to parse date: {date_string!r}")

        text = date_string.strip()
        if text.endswith('UTC'):
            text = text[:-3].strip()
        elif text.endswith(('Z', 'z')):
            # Older Python versions do not accept the 'Z' designator.
            text = text[:-1]

        try:
            parsed = datetime.fromisoformat(text)
        except ValueError as e:
            # Fallback for variants fromisoformat rejects (e.g. unusual
            # fractional-second precision): keep only the seconds-resolution
            # prefix and assume UTC.
            try:
                parsed = datetime.strptime(text[:19], "%Y-%m-%dT%H:%M:%S")
            except ValueError:
                raise ValueError(f"Unable to parse date: {date_string}") from e

        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)

        try:
            return parsed.astimezone(timezone.utc)
        except OverflowError as e:
            # Dates at the edge of the supported range cannot be shifted.
            raise ValueError(f"Unable to parse date: {date_string}") from e

    def _is_cert_valid(self, cert_data: Dict[str, Any]) -> bool:
        """
        Check if a certificate is currently valid based on its date range.

        Args:
            cert_data: Certificate data from crt.sh

        Returns:
            True if ``not_after`` is in the future and ``not_before`` (when
            present) is not in the future; False otherwise, including when
            ``not_after`` is missing or a date cannot be parsed.
        """
        not_after_str = cert_data.get('not_after')
        if not not_after_str:
            return False

        try:
            not_after_date = self._parse_certificate_date(not_after_str)
            not_before_str = cert_data.get('not_before')
            not_before_date = (
                self._parse_certificate_date(not_before_str)
                if not_before_str else None
            )
        except ValueError as e:
            self.logger.logger.debug(f"Certificate validity check failed: {e}")
            return False

        now = datetime.now(timezone.utc)
        if not_after_date <= now:
            return False
        return not_before_date is None or not_before_date <= now

    def _extract_certificate_metadata(self, cert_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract comprehensive metadata from certificate data.

        The result always contains ``is_currently_valid`` and
        ``expires_soon``. ``expires_soon`` is True only for certificates that
        have not yet expired and have at most ``_EXPIRY_WARNING_DAYS`` days
        left; already-expired certificates are not reported as expiring soon.

        Args:
            cert_data: Raw certificate data from crt.sh

        Returns:
            Comprehensive certificate metadata dictionary
        """
        metadata = {
            'certificate_id': cert_data.get('id'),
            'serial_number': cert_data.get('serial_number'),
            'issuer_name': cert_data.get('issuer_name'),
            'issuer_ca_id': cert_data.get('issuer_ca_id'),
            'common_name': cert_data.get('common_name'),
            'not_before': cert_data.get('not_before'),
            'not_after': cert_data.get('not_after'),
            'entry_timestamp': cert_data.get('entry_timestamp'),
            'source': 'crt.sh'
        }

        # Computed flags are always present so consumers can rely on them.
        metadata['is_currently_valid'] = self._is_cert_valid(cert_data)
        metadata['expires_soon'] = False

        try:
            not_before = (
                self._parse_certificate_date(metadata['not_before'])
                if metadata['not_before'] else None
            )
            not_after = (
                self._parse_certificate_date(metadata['not_after'])
                if metadata['not_after'] else None
            )
        except ValueError as e:
            self.logger.logger.debug(f"Error computing certificate metadata: {e}")
            metadata['is_currently_valid'] = False
            return metadata

        # Human-readable dates
        if not_before is not None:
            metadata['not_before_formatted'] = not_before.strftime('%Y-%m-%d %H:%M:%S UTC')
        if not_after is not None:
            metadata['not_after_formatted'] = not_after.strftime('%Y-%m-%d %H:%M:%S UTC')
            # timedelta.days is negative once the certificate has expired,
            # so the lower bound keeps expired certificates out.
            days_left = (not_after - datetime.now(timezone.utc)).days
            metadata['expires_soon'] = 0 <= days_left <= self._EXPIRY_WARNING_DAYS

        if not_before is not None and not_after is not None:
            metadata['validity_period_days'] = (not_after - not_before).days

        return metadata

    def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the domain.

        Creates domain-to-domain relationships and stores certificate data as
        relationship metadata. Supports early termination via the stop event:
        cancellation before relationship creation yields an empty list, while
        cancellation during relationship creation returns the relationships
        built so far.

        Args:
            domain: Domain to investigate

        Returns:
            List of ``(source, target, relationship_type, confidence,
            raw_data)`` tuples; empty on invalid input, request failure,
            parse failure or early cancellation.
        """
        if not _is_valid_domain(domain):
            return []

        # Check for cancellation before starting
        if self._is_stop_requested():
            self.logger.logger.info(f"CrtSh query cancelled before start for domain: {domain}")
            return []

        relationships = []
        # Names extracted from certificates are lowercased, so lookups and
        # the self-relationship check must use a lowercased key as well.
        query_key = domain.lower()

        try:
            # Query crt.sh for certificates
            url = f"{self.base_url}?q={quote(domain)}&output=json"
            # Single retry keeps cancellation responsive
            response = self.make_request(url, target_indicator=domain, max_retries=1)

            if not response or response.status_code != 200:
                return []

            # Check for cancellation after request
            if self._is_stop_requested():
                self.logger.logger.info(f"CrtSh query cancelled after request for domain: {domain}")
                return []

            certificates = response.json()

            if not certificates:
                return []

            if not isinstance(certificates, list):
                self.logger.logger.error(
                    f"Unexpected crt.sh response type for {domain}: {type(certificates).__name__}"
                )
                return []

            # Check for cancellation before processing
            if self._is_stop_requested():
                self.logger.logger.info(f"CrtSh query cancelled before processing for domain: {domain}")
                return []

            # Aggregate certificate metadata by domain (insertion-ordered)
            domain_certificates: Dict[str, List[Dict[str, Any]]] = {}

            for i, cert_data in enumerate(certificates):
                # Check for cancellation every 10 certificates
                if i % 10 == 0 and self._is_stop_requested():
                    self.logger.logger.info(
                        f"CrtSh processing cancelled at certificate {i} for domain: {domain}"
                    )
                    break

                if not isinstance(cert_data, dict):
                    continue  # Skip malformed rows

                cert_metadata = self._extract_certificate_metadata(cert_data)
                # Extraction already cleans and validates every name.
                for cert_domain in self._extract_domains_from_certificate(cert_data):
                    domain_certificates.setdefault(cert_domain, []).append(cert_metadata)

            # Final cancellation check before creating relationships
            if self._is_stop_requested():
                self.logger.logger.info(
                    f"CrtSh query cancelled before relationship creation for domain: {domain}"
                )
                return []

            all_discovered_domains = set(domain_certificates)

            # Loop-invariant: the query domain's certificates and summary
            query_domain_certs = domain_certificates.get(query_key, [])
            query_domain_summary = self._summarize_certificates(query_domain_certs)

            # Create relationships from query domain to ALL discovered domains
            for discovered_domain, discovered_domain_certs in domain_certificates.items():
                if discovered_domain == query_key:
                    continue  # Skip self-relationships

                # Check for cancellation during relationship creation
                if self._is_stop_requested():
                    self.logger.logger.info(
                        f"CrtSh relationship creation cancelled for domain: {domain}"
                    )
                    break

                # Find shared certificates (for metadata purposes)
                shared_certificates = self._find_shared_certificates(
                    query_domain_certs, discovered_domain_certs
                )

                # Calculate confidence based on relationship type and shared certificates
                confidence = self._calculate_domain_relationship_confidence(
                    domain, discovered_domain, shared_certificates, all_discovered_domains
                )

                # Create comprehensive raw data for the relationship
                relationship_raw_data = {
                    'relationship_type': 'certificate_discovery',
                    'shared_certificates': shared_certificates,
                    'total_shared_certs': len(shared_certificates),
                    'discovery_context': self._determine_relationship_context(discovered_domain, domain),
                    'domain_certificates': {
                        # Shallow copy so each relationship owns its summary dict
                        domain: dict(query_domain_summary),
                        discovered_domain: self._summarize_certificates(discovered_domain_certs)
                    }
                }

                # Create domain -> domain relationship
                relationships.append((
                    domain,
                    discovered_domain,
                    RelationshipType.SAN_CERTIFICATE,
                    confidence,
                    relationship_raw_data
                ))

                # Log the relationship discovery
                self.log_relationship_discovery(
                    source_node=domain,
                    target_node=discovered_domain,
                    relationship_type=RelationshipType.SAN_CERTIFICATE,
                    confidence_score=confidence,
                    raw_data=relationship_raw_data,
                    discovery_method="certificate_transparency_analysis"
                )

        except json.JSONDecodeError as e:
            self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}")
        except Exception as e:
            # Provider boundary: never let a single lookup crash the caller.
            self.logger.logger.error(f"Error querying crt.sh for {domain}: {e}")

        return relationships

    def _find_shared_certificates(self, certs1: List[Dict[str, Any]], certs2: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Find certificates that are shared between two domain certificate lists.

        Certificates are matched on ``certificate_id``. Each shared
        certificate is returned once, even when the same id occurs several
        times in ``certs2``, so the shared count is not inflated by
        duplicate rows.

        Args:
            certs1: First domain's certificates
            certs2: Second domain's certificates

        Returns:
            List of shared certificate metadata (entries taken from
            ``certs2``, in their original order)
        """
        cert1_ids = {
            cert.get('certificate_id')
            for cert in certs1
            if cert.get('certificate_id') is not None
        }

        shared = []
        seen_ids = set()
        for cert in certs2:
            cert_id = cert.get('certificate_id')
            if cert_id in cert1_ids and cert_id not in seen_ids:
                seen_ids.add(cert_id)
                shared.append(cert)

        return shared

    def _summarize_certificates(self, certificates: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Create a summary of certificates for a domain.

        Args:
            certificates: List of certificate metadata

        Returns:
            Summary dictionary with aggregate statistics. The same keys are
            present for empty and non-empty input.
        """
        if not certificates:
            return {
                'total_certificates': 0,
                'valid_certificates': 0,
                'expired_certificates': 0,
                'expires_soon_count': 0,
                'unique_issuers': [],
                'latest_certificate': None,
                'has_valid_cert': False,
                'certificate_details': []
            }

        valid_count = sum(1 for cert in certificates if cert.get('is_currently_valid'))
        expired_count = len(certificates) - valid_count
        expires_soon_count = sum(1 for cert in certificates if cert.get('expires_soon'))

        # Unique issuers, sorted so the output is deterministic
        unique_issuers = sorted({
            cert.get('issuer_name') for cert in certificates if cert.get('issuer_name')
        })

        # Find the most recent certificate by its not_before date
        latest_cert = None
        latest_date = None
        for cert in certificates:
            if not cert.get('not_before'):
                continue
            try:
                cert_date = self._parse_certificate_date(cert['not_before'])
            except ValueError:
                continue  # Unparseable dates cannot take part in the ranking
            if latest_date is None or cert_date > latest_date:
                latest_date = cert_date
                latest_cert = cert

        return {
            'total_certificates': len(certificates),
            'valid_certificates': valid_count,
            'expired_certificates': expired_count,
            'expires_soon_count': expires_soon_count,
            'unique_issuers': unique_issuers,
            'latest_certificate': latest_cert,
            'has_valid_cert': valid_count > 0,
            'certificate_details': certificates  # Full details for forensic analysis
        }

    def _calculate_domain_relationship_confidence(self, domain1: str, domain2: str,
                                                  shared_certificates: List[Dict[str, Any]],
                                                  all_discovered_domains: Set[str]) -> float:
        """
        Calculate confidence score for domain relationship based on various factors.

        Args:
            domain1: Source domain (query domain)
            domain2: Target domain (discovered domain)
            shared_certificates: List of shared certificate metadata
            all_discovered_domains: All domains discovered in this query
                (accepted for interface compatibility; not used in the score)

        Returns:
            Confidence score clamped to the range 0.1 to 1.0
        """
        base_confidence = RelationshipType.SAN_CERTIFICATE.default_confidence

        # Adjust confidence based on domain relationship context
        relationship_context = self._determine_relationship_context(domain2, domain1)
        if relationship_context == 'subdomain':
            context_bonus = 0.1  # High confidence for subdomains
        elif relationship_context == 'parent_domain':
            context_bonus = 0.05  # Medium confidence for parent domains
        else:
            # 'exact_match' (not expected here) and 'related_domain'
            context_bonus = 0.0

        shared_bonus = 0.0
        validity_bonus = 0.0
        issuer_bonus = 0.0

        # Without shared certificates only the base and context apply.
        if shared_certificates:
            shared_count = len(shared_certificates)
            if shared_count >= 3:
                shared_bonus = 0.1
            elif shared_count >= 2:
                shared_bonus = 0.05
            else:
                shared_bonus = 0.02

            # Additional bonus when at least one shared certificate is valid
            if any(cert.get('is_currently_valid') for cert in shared_certificates):
                validity_bonus = 0.05

            # Bonus for a well-known issuer. issuer_name may be present with
            # a None value, hence the `or ''` before lowercasing.
            for cert in shared_certificates:
                issuer = (cert.get('issuer_name') or '').lower()
                if any(trusted_ca in issuer for trusted_ca in self._TRUSTED_CA_KEYWORDS):
                    issuer_bonus = 0.03
                    break

        # Calculate final confidence
        final_confidence = base_confidence + context_bonus + shared_bonus + validity_bonus + issuer_bonus
        return max(0.1, min(1.0, final_confidence))  # Clamp between 0.1 and 1.0

    def _determine_relationship_context(self, cert_domain: str, query_domain: str) -> str:
        """
        Determine the context of the relationship between certificate domain and query domain.

        The comparison is case-insensitive.

        Args:
            cert_domain: Domain found in certificate
            query_domain: Original query domain

        Returns:
            One of 'exact_match', 'subdomain', 'parent_domain' or
            'related_domain'
        """
        cert_name = cert_domain.lower()
        query_name = query_domain.lower()

        if cert_name == query_name:
            return 'exact_match'
        if cert_name.endswith(f'.{query_name}'):
            return 'subdomain'
        if query_name.endswith(f'.{cert_name}'):
            return 'parent_domain'
        return 'related_domain'

    def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the IP address.

        Note: crt.sh doesn't typically index by IP, so this returns empty results.

        Args:
            ip: IP address to investigate

        Returns:
            Empty list (crt.sh doesn't support IP-based certificate queries effectively)
        """
        # crt.sh doesn't effectively support IP-based certificate queries
        return []

    def _extract_domains_from_certificate(self, cert_data: Dict[str, Any]) -> Set[str]:
        """
        Extract all domains from certificate data.

        Names are taken from ``common_name`` and from each line of
        ``name_value``; every candidate is cleaned and validated.

        Args:
            cert_data: Certificate data from crt.sh API

        Returns:
            Set of unique domain names found in the certificate
        """
        candidates = []

        # Extract from common name
        common_name = cert_data.get('common_name')
        if isinstance(common_name, str):
            candidates.append(common_name)

        # Extract from name_value field (contains SANs, one per line)
        name_value = cert_data.get('name_value')
        if isinstance(name_value, str):
            candidates.extend(name_value.splitlines())

        domains = set()
        for candidate in candidates:
            cleaned_domain = self._clean_domain_name(candidate)
            if cleaned_domain and _is_valid_domain(cleaned_domain):
                domains.add(cleaned_domain)

        return domains

    def _clean_domain_name(self, domain_name: str) -> str:
        """
        Clean and normalize domain name from certificate data.

        Entries containing ``@`` (e-mail style names) are rejected rather
        than cleaned, because stripping the ``@`` would fabricate a domain
        that never appeared in the certificate.

        Args:
            domain_name: Raw domain name from certificate

        Returns:
            Cleaned domain name or empty string if invalid
        """
        if not domain_name:
            return ""

        domain = domain_name.strip().lower()

        # E-mail addresses are not domain names
        if '@' in domain:
            return ""

        # Remove protocol if present
        if domain.startswith(('http://', 'https://')):
            domain = domain.split('://', 1)[1]

        # Remove path if present
        domain = domain.split('/', 1)[0]

        # Remove port if present (a single colon only, to avoid breaking IPv6)
        if domain.count(':') == 1:
            domain = domain.split(':', 1)[0]

        # Handle wildcard domains
        if domain.startswith('*.'):
            domain = domain[2:]

        # Remove any remaining invalid characters
        domain = self._INVALID_DOMAIN_CHARS.sub('', domain)

        # Ensure it's not empty and doesn't start/end with dots or hyphens
        if domain and not domain.startswith(('.', '-')) and not domain.endswith(('.', '-')):
            return domain

        return ""