dnsrecon/providers/crtsh_provider.py
overcuriousity a0caedcb1f upgrades
2025-09-10 16:26:44 +02:00

284 lines
10 KiB
Python

"""
Certificate Transparency provider using crt.sh.
Discovers domain relationships through certificate SAN analysis.
"""
import json
import re
from typing import List, Dict, Any, Tuple, Set
from urllib.parse import quote
from datetime import datetime, timezone
from .base_provider import BaseProvider
from core.graph_manager import RelationshipType
class CrtShProvider(BaseProvider):
    """
    Provider for querying the crt.sh certificate transparency database.

    Discovers domain relationships through the names (common name and
    Subject Alternative Names) listed on certificates returned by crt.sh.
    """

    # Anything outside word characters, '-' and '.' cannot be part of a
    # hostname as handled by this provider.
    _INVALID_HOSTNAME_CHARS = re.compile(r'[^\w\-\.]')

    def __init__(self):
        """Initialize CrtSh provider with appropriate rate limiting."""
        super().__init__(
            name="crtsh",
            rate_limit=60,  # Be respectful to the free service
            timeout=30
        )
        self.base_url = "https://crt.sh/"

    def get_name(self) -> str:
        """Return the provider name."""
        return "crtsh"

    def is_available(self) -> bool:
        """
        Check if the provider is configured to be used.

        This method is intentionally simple and does not perform a network
        request to avoid blocking application startup.
        """
        return True

    def _is_cert_valid(self, cert_data: Dict[str, Any]) -> bool:
        """
        Check whether a certificate has not yet expired.

        Args:
            cert_data: Certificate record; only its 'not_after' field is read

        Returns:
            True if 'not_after' parses as an ISO-8601 timestamp that lies in
            the future, False otherwise (including missing or malformed data)
        """
        try:
            not_after_str = cert_data.get('not_after')
        except AttributeError:
            # Not a mapping at all; treat as "no validity information".
            return False
        if not not_after_str or not isinstance(not_after_str, str):
            return False

        timestamp = not_after_str.strip()
        # datetime.fromisoformat() only accepts a literal 'Z' suffix on newer
        # Python versions, so spell the UTC offset out explicitly.
        if timestamp.endswith(('Z', 'z')):
            timestamp = timestamp[:-1] + '+00:00'
        try:
            not_after_date = datetime.fromisoformat(timestamp)
        except ValueError:
            return False

        # Timestamps without an offset are assumed to be UTC. Values that
        # already carry an offset are left untouched instead of having a
        # second offset appended (which previously made them unparseable).
        if not_after_date.tzinfo is None:
            not_after_date = not_after_date.replace(tzinfo=timezone.utc)
        return not_after_date > datetime.now(timezone.utc)

    def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the domain.

        Args:
            domain: Domain to investigate

        Returns:
            List of relationships discovered from certificate analysis. Each
            entry is (domain, related_name, relationship_type, confidence,
            raw_data). An empty list is returned for invalid input, failed
            requests, or when an error occurs while processing the response.
        """
        if not self._is_valid_domain(domain):
            return []

        relationships = []
        # Names extracted from certificates are lower-cased, so the queried
        # domain must be normalized the same way to filter out self-matches.
        normalized_domain = domain.strip().lower()

        try:
            # Query crt.sh for certificates
            url = f"{self.base_url}?q={quote(domain)}&output=json"
            response = self.make_request(url, target_indicator=domain)
            if not response or response.status_code != 200:
                return []

            certificates = response.json()
            if not certificates:
                return []

            # Aggregate per discovered name: whether any unexpired certificate
            # covers it, and which issuers have signed certificates for it.
            discovered: Dict[str, Dict[str, Any]] = {}
            for cert_data in certificates:
                if not isinstance(cert_data, dict):
                    # Skip malformed entries instead of discarding everything.
                    continue
                is_valid = self._is_cert_valid(cert_data)
                issuer = cert_data.get('issuer_name')
                for name in self._extract_domains_from_certificate(cert_data):
                    if name == normalized_domain or not self._is_valid_domain(name):
                        continue
                    entry = discovered.setdefault(
                        name, {'has_valid_cert': False, 'issuers': set()}
                    )
                    if is_valid:
                        entry['has_valid_cert'] = True
                    if issuer:
                        entry['issuers'].add(issuer)

            # Create relationships from the discovered names
            relationship_type = RelationshipType.SAN_CERTIFICATE
            confidence = relationship_type.default_confidence
            for name, data in discovered.items():
                raw_data = {
                    'has_valid_cert': data['has_valid_cert'],
                    # Sorted so repeated runs produce identical output.
                    'issuers': sorted(data['issuers'], key=str),
                    'source': 'crt.sh'
                }
                relationships.append((
                    domain,
                    name,
                    relationship_type,
                    confidence,
                    raw_data
                ))
                self.log_relationship_discovery(
                    source_node=domain,
                    target_node=name,
                    relationship_type=relationship_type,
                    confidence_score=confidence,
                    raw_data=raw_data,
                    discovery_method="certificate_san_analysis"
                )
        except json.JSONDecodeError as e:
            self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}")
        except Exception as e:
            # Best-effort provider: log and return whatever was collected.
            self.logger.logger.error(f"Error querying crt.sh for {domain}: {e}")

        return relationships

    def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
        """
        Query crt.sh for certificates containing the IP address.

        Note: crt.sh doesn't typically index by IP, so this returns empty results.

        Args:
            ip: IP address to investigate

        Returns:
            Empty list (crt.sh doesn't support IP-based certificate queries effectively)
        """
        # crt.sh doesn't effectively support IP-based certificate queries
        # This would require parsing certificate details for IP SANs, which is complex
        return []

    def _extract_domains_from_certificate(self, cert_data: Dict[str, Any]) -> Set[str]:
        """
        Extract all domains from certificate data.

        Args:
            cert_data: Certificate data from crt.sh API; the 'common_name'
                and newline-separated 'name_value' fields are read

        Returns:
            Set of unique, cleaned domain names found in the certificate
        """
        candidates = []

        # Extract from common name
        common_name = cert_data.get('common_name', '')
        if common_name:
            candidates.append(common_name)

        # Extract from name_value field (contains SANs), one name per line
        name_value = cert_data.get('name_value', '')
        if name_value:
            candidates.extend(line.strip() for line in name_value.split('\n'))

        domains = set()
        for candidate in candidates:
            cleaned = self._clean_domain_name(candidate)
            if cleaned and self._is_valid_domain(cleaned):
                domains.add(cleaned)
        return domains

    def _clean_domain_name(self, domain_name: str) -> str:
        """
        Clean and normalize domain name from certificate data.

        The value is lower-cased and stripped of any scheme, path, port and
        leading wildcard label. Values that still contain characters which
        cannot appear in a hostname (for example e-mail addresses or
        free-text organisation names) are rejected rather than having those
        characters deleted, because deleting them would fabricate hostnames
        that never appeared on the certificate.

        Args:
            domain_name: Raw domain name from certificate

        Returns:
            Cleaned domain name or empty string if invalid
        """
        if not domain_name:
            return ""

        domain = domain_name.strip().lower()

        # Remove protocol if present
        if domain.startswith(('http://', 'https://')):
            domain = domain.split('://', 1)[1]

        # Remove path if present
        domain = domain.split('/', 1)[0]

        # Remove port if present (a single colon only; more than one colon
        # indicates an IPv6 literal, which is not a hostname and is rejected
        # by the character check below)
        if domain.count(':') == 1:
            domain = domain.split(':', 1)[0]

        # Handle wildcard domains
        if domain.startswith('*.'):
            domain = domain[2:]

        # Reject anything that still carries non-hostname characters
        if not domain or self._INVALID_HOSTNAME_CHARS.search(domain):
            return ""

        # Must not start or end with a dot or hyphen
        if domain.startswith(('.', '-')) or domain.endswith(('.', '-')):
            return ""
        return domain

    def get_certificate_details(self, certificate_id: str) -> Dict[str, Any]:
        """
        Get detailed information about a specific certificate.

        Args:
            certificate_id: Certificate ID from crt.sh

        Returns:
            Dictionary containing certificate details, or an empty dictionary
            if the request fails or returns no data
        """
        try:
            # URL-encode the identifier, consistent with the other lookups.
            url = f"{self.base_url}?id={quote(str(certificate_id))}&output=json"
            response = self.make_request(url, target_indicator=f"cert_{certificate_id}")
            if response and response.status_code == 200:
                return response.json() or {}
        except Exception as e:
            self.logger.logger.error(f"Error fetching certificate details for {certificate_id}: {e}")
        return {}

    def search_certificates_by_serial(self, serial_number: str) -> List[Dict[str, Any]]:
        """
        Search for certificates by serial number.

        Args:
            serial_number: Certificate serial number

        Returns:
            List of matching certificates, or an empty list if the request
            fails or returns no data
        """
        try:
            url = f"{self.base_url}?serial={quote(serial_number)}&output=json"
            response = self.make_request(url, target_indicator=f"serial_{serial_number}")
            if response and response.status_code == 200:
                return response.json() or []
        except Exception as e:
            self.logger.logger.error(f"Error searching certificates by serial {serial_number}: {e}")
        return []

    def get_issuer_certificates(self, issuer_name: str) -> List[Dict[str, Any]]:
        """
        Get certificates issued by a specific CA.

        Args:
            issuer_name: Certificate Authority name

        Returns:
            List of certificates from the specified issuer, or an empty list
            if the request fails or returns no data
        """
        try:
            url = f"{self.base_url}?issuer={quote(issuer_name)}&output=json"
            response = self.make_request(url, target_indicator=f"issuer_{issuer_name}")
            if response and response.status_code == 200:
                return response.json() or []
        except Exception as e:
            self.logger.logger.error(f"Error fetching certificates for issuer {issuer_name}: {e}")
        return []