"""
Certificate Transparency provider using crt.sh.
Discovers domain relationships through certificate SAN analysis.
"""

import json
import re
from typing import List, Dict, Any, Tuple, Set
from urllib.parse import quote

from .base_provider import BaseProvider
from core.graph_manager import RelationshipType


class CrtShProvider(BaseProvider):
    """
    Provider for querying crt.sh certificate transparency database.
    Discovers domain relationships through certificate Subject Alternative Names (SANs).
    """

    # Characters that are stripped from a candidate name during cleaning.
    # Compiled once at class level instead of on every call.
    _INVALID_NAME_CHARS = re.compile(r'[^\w\-\.]')

    def __init__(self):
        """Initialize CrtSh provider with appropriate rate limiting."""
        super().__init__(
            name="crtsh",
            rate_limit=60,  # Be respectful to the free service
            timeout=30
        )
        self.base_url = "https://crt.sh/"

    def get_name(self) -> str:
        """Return the provider name."""
        return "crtsh"

    def is_available(self) -> bool:
        """
        Check if the provider is configured to be used.

        This method is intentionally simple and does not perform a network
        request to avoid blocking application startup.
        """
        return True

    def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the domain.

        Every certificate that lists ``domain`` together with at least one
        other name yields one relationship per other name. The same related
        domain may therefore appear several times, once per certificate.

        Args:
            domain: Domain to investigate

        Returns:
            List of ``(source, target, relationship_type, confidence, raw_data)``
            tuples discovered from certificate analysis. Empty when the domain
            is invalid, the request fails, or nothing is found.
        """
        if not self._is_valid_domain(domain):
            return []

        # Names extracted from certificates are lower-cased by
        # _clean_domain_name, so membership must be tested against a
        # lower-cased key. The caller's original spelling is kept as the
        # source node of every relationship.
        lookup_domain = domain.lower()
        confidence = RelationshipType.SAN_CERTIFICATE.default_confidence
        relationships = []

        try:
            # Query crt.sh for certificates
            url = f"{self.base_url}?q={quote(domain)}&output=json"
            response = self.make_request(url, target_indicator=domain)

            if not response or response.status_code != 200:
                return []

            certificates = response.json()
            if not certificates:
                return []

            # Process certificates to extract relationships
            seen_certificates = set()

            for cert_data in certificates:
                # Skip anything that is not a certificate record instead of
                # aborting the whole run on an AttributeError.
                if not isinstance(cert_data, dict):
                    continue

                cert_id = cert_data.get('id')
                if not cert_id or cert_id in seen_certificates:
                    continue
                seen_certificates.add(cert_id)

                # Extract domains from certificate
                cert_domains = self._extract_domains_from_certificate(cert_data)
                if lookup_domain not in cert_domains or len(cert_domains) <= 1:
                    continue

                # Sorted so the output order does not depend on set iteration
                # order, which varies between interpreter runs for strings.
                all_domains = sorted(cert_domains)

                # Create relationships between domains found in the same certificate
                for related_domain in all_domains:
                    if related_domain == lookup_domain:
                        continue
                    if not self._is_valid_domain(related_domain):
                        continue

                    # Each relationship gets its own dict and list so that a
                    # consumer mutating one record cannot affect another.
                    raw_data = {
                        'certificate_id': cert_id,
                        'issuer': cert_data.get('issuer_name', ''),
                        'not_before': cert_data.get('not_before', ''),
                        'not_after': cert_data.get('not_after', ''),
                        'serial_number': cert_data.get('serial_number', ''),
                        'all_domains': list(all_domains)
                    }

                    relationships.append((
                        domain,
                        related_domain,
                        RelationshipType.SAN_CERTIFICATE,
                        confidence,
                        raw_data
                    ))

                    # Log the discovery
                    self.log_relationship_discovery(
                        source_node=domain,
                        target_node=related_domain,
                        relationship_type=RelationshipType.SAN_CERTIFICATE,
                        confidence_score=confidence,
                        raw_data=raw_data,
                        discovery_method="certificate_san_analysis"
                    )

        except json.JSONDecodeError as e:
            self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}")
        except Exception as e:
            # Best effort: keep whatever was collected before the failure.
            self.logger.logger.error(f"Error querying crt.sh for {domain}: {e}")

        return relationships

    def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the IP address.
        Note: crt.sh doesn't typically index by IP, so this returns empty results.

        Args:
            ip: IP address to investigate

        Returns:
            Empty list (crt.sh doesn't support IP-based certificate queries effectively)
        """
        # crt.sh doesn't effectively support IP-based certificate queries
        # This would require parsing certificate details for IP SANs, which is complex
        return []

    def _extract_domains_from_certificate(self, cert_data: Dict[str, Any]) -> Set[str]:
        """
        Extract all domains from certificate data.

        Reads the ``common_name`` field and every newline-separated entry of
        the ``name_value`` field, cleans each candidate and keeps those that
        pass ``_is_valid_domain``.

        Args:
            cert_data: Certificate data from crt.sh API

        Returns:
            Set of unique domain names found in the certificate
        """
        domains = set()

        # Extract from common name
        common_name = cert_data.get('common_name', '')
        if common_name:
            cleaned_cn = self._clean_domain_name(common_name)
            if cleaned_cn and self._is_valid_domain(cleaned_cn):
                domains.add(cleaned_cn)

        # Extract from name_value field (contains SANs)
        name_value = cert_data.get('name_value', '')
        if name_value:
            # Split by newlines and clean each domain
            for line in name_value.split('\n'):
                cleaned_domain = self._clean_domain_name(line.strip())
                if cleaned_domain and self._is_valid_domain(cleaned_domain):
                    domains.add(cleaned_domain)

        return domains

    def _clean_domain_name(self, domain_name: str) -> str:
        """
        Clean and normalize domain name from certificate data.

        The name is lower-cased, then a protocol prefix, a path, a single
        ``:port`` suffix and a leading ``*.`` wildcard are removed. Names that
        still contain ``@`` at that point (e-mail style identities) are
        rejected rather than cleaned, because stripping the ``@`` would glue
        the local part onto the host and invent a name that is not in the
        certificate.

        Args:
            domain_name: Raw domain name from certificate

        Returns:
            Cleaned domain name or empty string if invalid
        """
        if not domain_name:
            return ""

        # Remove common prefixes and clean up
        domain = domain_name.strip().lower()

        # Remove protocol if present
        if domain.startswith(('http://', 'https://')):
            domain = domain.split('://', 1)[1]

        # Remove path if present
        if '/' in domain:
            domain = domain.split('/', 1)[0]

        # An '@' here means an e-mail address or URL userinfo, not a bare
        # hostname; "admin@example.com" must not turn into "adminexample.com".
        if '@' in domain:
            return ""

        # Remove port if present
        if ':' in domain and not domain.count(':') > 1:  # Avoid breaking IPv6
            domain = domain.split(':', 1)[0]

        # Handle wildcard domains
        if domain.startswith('*.'):
            domain = domain[2:]

        # Remove any remaining invalid characters
        domain = self._INVALID_NAME_CHARS.sub('', domain)

        # Ensure it's not empty and doesn't start/end with dots or hyphens
        if domain and not domain.startswith(('.', '-')) and not domain.endswith(('.', '-')):
            return domain

        return ""

    def get_certificate_details(self, certificate_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a specific certificate.

        Args:
            certificate_id: Certificate ID from crt.sh

        Returns:
            Dictionary containing certificate details, or an empty dict when
            the request fails or the response cannot be parsed
        """
        try:
            # URL-encode the id like the sibling serial/issuer lookups do, so
            # a stray '&' or '#' cannot alter the query string. str() keeps
            # integer ids (as returned in crt.sh records) working.
            url = f"{self.base_url}?id={quote(str(certificate_id))}&output=json"
            response = self.make_request(url, target_indicator=f"cert_{certificate_id}")

            if response and response.status_code == 200:
                return response.json()

        except Exception as e:
            self.logger.logger.error(f"Error fetching certificate details for {certificate_id}: {e}")

        return {}

    def search_certificates_by_serial(self, serial_number: str) -> List[Dict[str, Any]]:
        """
        Search for certificates by serial number.

        Args:
            serial_number: Certificate serial number

        Returns:
            List of matching certificates, or an empty list when the request
            fails or the response cannot be parsed
        """
        try:
            url = f"{self.base_url}?serial={quote(serial_number)}&output=json"
            response = self.make_request(url, target_indicator=f"serial_{serial_number}")

            if response and response.status_code == 200:
                return response.json()

        except Exception as e:
            self.logger.logger.error(f"Error searching certificates by serial {serial_number}: {e}")

        return []

    def get_issuer_certificates(self, issuer_name: str) -> List[Dict[str, Any]]:
        """
        Get certificates issued by a specific CA.

        Args:
            issuer_name: Certificate Authority name

        Returns:
            List of certificates from the specified issuer, or an empty list
            when the request fails or the response cannot be parsed
        """
        try:
            url = f"{self.base_url}?issuer={quote(issuer_name)}&output=json"
            response = self.make_request(url, target_indicator=f"issuer_{issuer_name}")

            if response and response.status_code == 200:
                return response.json()

        except Exception as e:
            self.logger.logger.error(f"Error fetching certificates for issuer {issuer_name}: {e}")

        return []