284 lines
10 KiB
Python
284 lines
10 KiB
Python
"""
Certificate Transparency provider using crt.sh.
Discovers domain relationships through certificate SAN analysis.
"""
|
|
|
|
import json
|
|
import re
|
|
from typing import List, Dict, Any, Tuple, Set
|
|
from urllib.parse import quote
|
|
from datetime import datetime, timezone
|
|
|
|
from .base_provider import BaseProvider
|
|
from core.graph_manager import RelationshipType
|
|
|
|
|
|
class CrtShProvider(BaseProvider):
    """
    Provider for querying the crt.sh certificate transparency database.

    Discovers domain relationships through the names listed in certificates
    (common name and Subject Alternative Names).
    """

    # Every character that is not a word character, a hyphen or a dot.
    # Compiled once so it is not rebuilt for each cleaned name.
    _INVALID_DOMAIN_CHARS = re.compile(r'[^\w\-\.]')

    def __init__(self):
        """Initialize the provider with its name, rate limit and timeout."""
        super().__init__(
            name="crtsh",
            rate_limit=60,  # Be respectful to the free service
            timeout=30
        )
        self.base_url = "https://crt.sh/"

    def get_name(self) -> str:
        """Return the provider name."""
        return "crtsh"

    def is_available(self) -> bool:
        """
        Check if the provider is configured to be used.

        This method is intentionally simple and does not perform a network
        request to avoid blocking application startup.

        Returns:
            Always True.
        """
        return True

    def _is_cert_valid(self, cert_data: Dict[str, Any]) -> bool:
        """
        Check whether a certificate has not yet expired.

        Only the 'not_after' field is inspected; 'not_before' is not checked.

        Args:
            cert_data: Certificate record; its 'not_after' entry is expected
                to be an ISO-8601 timestamp string.

        Returns:
            True if 'not_after' parses and lies in the future, False if it is
            missing, not a string, unparseable, or already in the past.
        """
        if not isinstance(cert_data, dict):
            return False

        not_after = cert_data.get('not_after')
        if not not_after or not isinstance(not_after, str):
            return False

        timestamp = not_after.strip()
        # Older fromisoformat() implementations reject a trailing 'Z', so
        # spell the UTC designator as an explicit offset.
        if timestamp.endswith(('Z', 'z')):
            timestamp = timestamp[:-1] + '+00:00'

        try:
            expires_at = datetime.fromisoformat(timestamp)
        except ValueError:
            return False

        # A timestamp without an offset is treated as UTC. Deciding this
        # after parsing keeps values that already carry an offset intact.
        if expires_at.tzinfo is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)

        return expires_at > datetime.now(timezone.utc)

    def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the domain.

        Args:
            domain: Domain to investigate

        Returns:
            List of (source, target, relationship type, confidence, raw data)
            tuples, one per distinct name found in the returned certificates.
            Empty if the domain is invalid, the request fails, or the
            response cannot be used. Errors are logged, not raised.
        """
        if not self._is_valid_domain(domain):
            return []

        relationships = []

        try:
            url = f"{self.base_url}?q={quote(domain)}&output=json"
            response = self.make_request(url, target_indicator=domain)

            if not response or response.status_code != 200:
                return []

            try:
                certificates = response.json()
            except ValueError as e:
                # json.JSONDecodeError is a ValueError; catching the base
                # class also covers decoders that raise their own subclass.
                self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}")
                return []

            if not certificates:
                return []

            if not isinstance(certificates, list):
                self.logger.logger.error(
                    f"Unexpected response format from crt.sh for {domain}: "
                    f"expected a list, got {type(certificates).__name__}"
                )
                return []

            discovered = self._collect_san_domains(certificates, domain)
            confidence = RelationshipType.SAN_CERTIFICATE.default_confidence

            for related_domain, data in discovered.items():
                raw_data = {
                    'has_valid_cert': data['has_valid_cert'],
                    # Sorted so repeated queries give a stable ordering.
                    'issuers': sorted(data['issuers'], key=str),
                    'source': 'crt.sh'
                }
                relationships.append((
                    domain,
                    related_domain,
                    RelationshipType.SAN_CERTIFICATE,
                    confidence,
                    raw_data
                ))
                self.log_relationship_discovery(
                    source_node=domain,
                    target_node=related_domain,
                    relationship_type=RelationshipType.SAN_CERTIFICATE,
                    confidence_score=confidence,
                    raw_data=raw_data,
                    discovery_method="certificate_san_analysis"
                )

        except Exception as e:
            self.logger.logger.error(f"Error querying crt.sh for {domain}: {e}")

        return relationships

    def _collect_san_domains(self, certificates: List[Any], domain: str) -> Dict[str, Dict[str, Any]]:
        """
        Aggregate the names found across a list of certificate records.

        Args:
            certificates: Decoded JSON list; entries that are not dicts are
                skipped.
            domain: The queried domain, excluded from the result.

        Returns:
            Mapping of each discovered name to a dict with 'has_valid_cert'
            (True if any certificate listing the name is unexpired) and
            'issuers' (set of 'issuer_name' values of those certificates).
        """
        # Extracted names are lower-cased, so compare against a lower-cased
        # query to avoid reporting the queried domain as related to itself.
        queried = domain.strip().lower()
        discovered: Dict[str, Dict[str, Any]] = {}

        for cert_data in certificates:
            if not isinstance(cert_data, dict):
                continue

            # Names returned here have already passed _is_valid_domain.
            cert_domains = self._extract_domains_from_certificate(cert_data)
            if not cert_domains:
                continue

            is_valid = self._is_cert_valid(cert_data)
            issuer = cert_data.get('issuer_name')

            for name in cert_domains:
                if name == queried:
                    continue
                entry = discovered.setdefault(
                    name, {'has_valid_cert': False, 'issuers': set()}
                )
                if is_valid:
                    entry['has_valid_cert'] = True
                if issuer:
                    entry['issuers'].add(issuer)

        return discovered

    def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the IP address.

        Note: crt.sh doesn't typically index by IP, so this returns empty results.

        Args:
            ip: IP address to investigate

        Returns:
            Empty list (crt.sh doesn't support IP-based certificate queries effectively)
        """
        # crt.sh doesn't effectively support IP-based certificate queries
        # This would require parsing certificate details for IP SANs, which is complex
        return []

    def _extract_domains_from_certificate(self, cert_data: Dict[str, Any]) -> Set[str]:
        """
        Extract all domains from certificate data.

        Args:
            cert_data: Certificate record; the 'common_name' and 'name_value'
                entries are read. 'name_value' is split on newlines.

        Returns:
            Set of unique, cleaned names that pass _is_valid_domain.
        """
        candidates: List[str] = []

        common_name = cert_data.get('common_name', '')
        if isinstance(common_name, str) and common_name:
            candidates.append(common_name)

        # name_value holds one name per line.
        name_value = cert_data.get('name_value', '')
        if isinstance(name_value, str) and name_value:
            candidates.extend(name_value.split('\n'))

        domains = set()
        for candidate in candidates:
            cleaned = self._clean_domain_name(candidate.strip())
            if cleaned and self._is_valid_domain(cleaned):
                domains.add(cleaned)

        return domains

    def _clean_domain_name(self, domain_name: str) -> str:
        """
        Clean and normalize domain name from certificate data.

        The name is stripped and lower-cased; a leading http(s) scheme, any
        path, a single ':port' suffix and a leading '*.' wildcard are
        removed; remaining characters other than word characters, hyphens
        and dots are dropped.

        Args:
            domain_name: Raw domain name from certificate

        Returns:
            Cleaned domain name, or an empty string if the input is empty,
            is not a string, contains '@', or the result starts or ends with
            a dot or hyphen.
        """
        if not domain_name or not isinstance(domain_name, str):
            return ""

        domain = domain_name.strip().lower()

        # Remove protocol if present
        if domain.startswith(('http://', 'https://')):
            domain = domain.split('://', 1)[1]

        # Remove path if present
        domain = domain.split('/', 1)[0]

        # Entries containing '@' (e-mail style identities) are rejected:
        # dropping the '@' below would splice the local part onto the host
        # and fabricate a name that appears nowhere in the certificate.
        if '@' in domain:
            return ""

        # Remove port if present; more than one colon is left alone so that
        # IPv6 literals are not cut in half.
        if domain.count(':') == 1:
            domain = domain.split(':', 1)[0]

        # Handle wildcard domains
        if domain.startswith('*.'):
            domain = domain[2:]

        # Remove any remaining invalid characters
        domain = self._INVALID_DOMAIN_CHARS.sub('', domain)

        # Ensure it's not empty and doesn't start/end with dots or hyphens
        if domain and not domain.startswith(('.', '-')) and not domain.endswith(('.', '-')):
            return domain

        return ""

    def _fetch_json(self, url: str, target_indicator: str, error_prefix: str, default: Any) -> Any:
        """
        Fetch a URL through make_request and decode the body as JSON.

        Args:
            url: Fully built request URL.
            target_indicator: Value forwarded to make_request.
            error_prefix: Text logged before the exception if anything fails.
            default: Value returned when the request fails, the status is not
                200, or an exception is raised.

        Returns:
            The decoded JSON body on a 200 response, otherwise `default`.
        """
        try:
            response = self.make_request(url, target_indicator=target_indicator)
            if response and response.status_code == 200:
                return response.json()
        except Exception as e:
            self.logger.logger.error(f"{error_prefix}: {e}")

        return default

    def get_certificate_details(self, certificate_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a specific certificate.

        Args:
            certificate_id: Certificate ID from crt.sh

        Returns:
            The decoded JSON response, or an empty dict on any failure.
        """
        # Quote the id like the sibling lookups do so it cannot alter the
        # query string.
        url = f"{self.base_url}?id={quote(str(certificate_id))}&output=json"
        return self._fetch_json(
            url,
            f"cert_{certificate_id}",
            f"Error fetching certificate details for {certificate_id}",
            {}
        )

    def search_certificates_by_serial(self, serial_number: str) -> List[Dict[str, Any]]:
        """
        Search for certificates by serial number.

        Args:
            serial_number: Certificate serial number

        Returns:
            The decoded JSON response, or an empty list on any failure.
        """
        url = f"{self.base_url}?serial={quote(str(serial_number))}&output=json"
        return self._fetch_json(
            url,
            f"serial_{serial_number}",
            f"Error searching certificates by serial {serial_number}",
            []
        )

    def get_issuer_certificates(self, issuer_name: str) -> List[Dict[str, Any]]:
        """
        Get certificates issued by a specific CA.

        Args:
            issuer_name: Certificate Authority name

        Returns:
            The decoded JSON response, or an empty list on any failure.
        """
        url = f"{self.base_url}?issuer={quote(str(issuer_name))}&output=json"
        return self._fetch_json(
            url,
            f"issuer_{issuer_name}",
            f"Error fetching certificates for issuer {issuer_name}",
            []
        )