dnsrecon/src/certificate_checker.py
overcuriousity 0c9cf00a3b progress
2025-09-09 14:54:02 +02:00

257 lines
10 KiB
Python

# File: src/certificate_checker.py
"""Certificate transparency log checker using crt.sh."""
import requests
import json
import time
import logging
from datetime import datetime
from typing import List, Optional, Set
from .data_structures import Certificate
from .config import Config
# Module logger
logger = logging.getLogger(__name__)
class CertificateChecker:
    """Check certificates using crt.sh.

    Queries the crt.sh certificate-transparency search for a domain (and its
    ``%.domain`` pattern), turns the JSON entries into ``Certificate``
    objects and can extract candidate subdomains from their subjects.
    Requests are throttled according to ``config.CRT_SH_RATE_LIMIT``.
    """

    CRT_SH_URL = "https://crt.sh/"

    def __init__(self, config: "Config"):
        """Store the configuration and reset the rate-limit bookkeeping."""
        self.config = config
        self.last_request = 0  # time.time() of the most recent throttled call
        self.query_count = 0   # number of times _rate_limit() has run
        logger.info("🔐 Certificate checker initialized")

    def _rate_limit(self):
        """Apply rate limiting for crt.sh.

        Sleeps just long enough to keep at most ``CRT_SH_RATE_LIMIT`` calls
        per second, then records the call time and bumps ``query_count``.
        """
        now = time.time()
        time_since_last = now - self.last_request
        rate = self.config.CRT_SH_RATE_LIMIT
        # A missing or non-positive rate disables throttling instead of
        # failing with ZeroDivisionError/TypeError outside any error handler.
        min_interval = 1.0 / rate if rate and rate > 0 else 0.0
        if time_since_last < min_interval:
            # Clamp to one interval: if the wall clock stepped backwards,
            # time_since_last is negative and would inflate the sleep.
            sleep_time = min(min_interval - time_since_last, min_interval)
            logger.debug(f"⏸️ crt.sh rate limiting: sleeping for {sleep_time:.2f}s")
            time.sleep(sleep_time)
        self.last_request = time.time()
        self.query_count += 1

    def get_certificates(self, domain: str) -> List["Certificate"]:
        """Get certificates for a domain from crt.sh.

        Runs one query for ``domain`` itself and one for ``%.domain``, then
        de-duplicates the combined results by certificate ID.
        """
        logger.debug(f"🔐 Getting certificates for domain: {domain}")
        certificates = []
        # Exact name first, then the %-pattern for names beneath the domain.
        for pattern in (domain, f"%.{domain}"):
            certificates.extend(self._query_crt_sh(pattern))
        # Remove duplicates based on certificate ID (last occurrence wins).
        unique_certs = {cert.id: cert for cert in certificates}
        final_certs = list(unique_certs.values())
        if final_certs:
            logger.info(f"📜 Found {len(final_certs)} unique certificates for {domain}")
        else:
            logger.debug(f"❌ No certificates found for {domain}")
        return final_certs

    def _query_crt_sh(self, query: str) -> List["Certificate"]:
        """Query crt.sh API with retry logic.

        Makes up to three attempts. Invalid JSON is retried with exponential
        backoff, HTTP 429 after 5 seconds, other HTTP errors and network
        failures after 2 seconds. Returns the certificates parsed from the
        first successful response, or an empty list if every attempt fails.
        """
        self._rate_limit()
        logger.debug(f"📡 Querying crt.sh for: {query}")
        max_retries = 3
        for attempt in range(max_retries):
            retries_left = attempt < max_retries - 1
            try:
                response = requests.get(
                    self.CRT_SH_URL,
                    params={'q': query, 'output': 'json'},
                    timeout=self.config.HTTP_TIMEOUT,
                    headers={'User-Agent': 'DNS-Recon-Tool/1.0'},
                )
                logger.debug(f"📡 crt.sh API response for {query}: {response.status_code}")
                if response.status_code == 200:
                    try:
                        data = response.json()
                        if not isinstance(data, list):
                            # Anything but a JSON array is handled like an
                            # undecodable body (warn, back off, retry).
                            raise ValueError(
                                f"expected a JSON array, got {type(data).__name__}"
                            )
                    except ValueError as e:
                        # ValueError covers json.JSONDecodeError and the
                        # decode error raised by requests.
                        logger.warning(f"❌ Invalid JSON response from crt.sh for {query}: {e}")
                        if retries_left:
                            time.sleep(2 ** attempt)  # Exponential backoff
                            continue
                        return []
                    logger.debug(f"📊 crt.sh returned {len(data)} certificate entries for {query}")
                    certificates = self._parse_certificate_entries(data, query)
                    logger.info(f"✅ Successfully processed {len(certificates)} certificates from crt.sh for {query}")
                    return certificates  # Success, exit retry loop
                elif response.status_code == 429:
                    logger.warning(f"⚠️ crt.sh rate limit exceeded for {query}")
                    if retries_left:
                        time.sleep(5)  # Wait longer for rate limits
                        continue
                    return []
                else:
                    logger.warning(f"⚠️ crt.sh HTTP error for {query}: {response.status_code}")
                    if retries_left:
                        time.sleep(2)
                        continue
                    return []
            except requests.exceptions.Timeout:
                logger.warning(f"⏱️ crt.sh query timeout for {query} (attempt {attempt+1}/{max_retries})")
                if retries_left:
                    time.sleep(2)
                    continue
            except requests.exceptions.RequestException as e:
                logger.warning(f"🌐 crt.sh network error for {query} (attempt {attempt+1}/{max_retries}): {e}")
                if retries_left:
                    time.sleep(2)
                    continue
            except Exception as e:
                # Best-effort boundary: a failed lookup must not abort the caller.
                logger.error(f"❌ Unexpected error querying crt.sh for {query}: {e}")
                if retries_left:
                    time.sleep(2)
                    continue
        # If we get here, all retries failed
        logger.warning(f"❌ All {max_retries} attempts failed for crt.sh query: {query}")
        return []

    def _parse_certificate_entries(self, data: list, query: str) -> List["Certificate"]:
        """Convert decoded crt.sh JSON entries into ``Certificate`` objects.

        Entries that are malformed (not a mapping, wrong field types) or whose
        validity dates cannot be parsed are skipped. A fresh list is built on
        every call so a retried request can never duplicate earlier results.
        """
        certificates = []
        for cert_data in data:
            try:
                not_before = self._parse_date(cert_data.get('not_before'))
                not_after = self._parse_date(cert_data.get('not_after'))
                if not (not_before and not_after):
                    logger.debug(f"⚠️ Skipped certificate with invalid dates: {cert_data.get('id')}")
                    continue
                name_value = cert_data.get('name_value', '')
                certificate = Certificate(
                    id=cert_data.get('id'),
                    issuer=cert_data.get('issuer_name', ''),
                    subject=name_value,
                    not_before=not_before,
                    not_after=not_after,
                    is_wildcard='*.' in name_value,
                )
                certificates.append(certificate)
                logger.debug(f"✅ Parsed certificate ID {certificate.id} for {query}")
            except (AttributeError, ValueError, TypeError, KeyError) as e:
                # AttributeError: the entry is not a dict. Skip only this
                # entry instead of failing the whole response.
                logger.debug(f"⚠️ Error parsing certificate data: {e}")
        return certificates

    def _parse_date(self, date_str: Optional[str]) -> Optional[datetime]:
        """Parse date string with multiple format support.

        Returns a naive ``datetime`` or ``None`` when the value is empty or
        matches no supported format. Values carrying an explicit UTC offset
        are converted to UTC and returned naive, so every result is
        comparable with the others.
        """
        if not date_str:
            return None
        # Common date formats from crt.sh
        date_formats = (
            '%Y-%m-%dT%H:%M:%S',      # ISO format without timezone
            '%Y-%m-%dT%H:%M:%SZ',     # ISO format with Z
            '%Y-%m-%d %H:%M:%S',      # Space separated
            '%Y-%m-%dT%H:%M:%S.%f',   # With microseconds
            '%Y-%m-%dT%H:%M:%S.%fZ',  # With microseconds and Z
        )
        for fmt in date_formats:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        # Try with timezone info
        try:
            parsed = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except ValueError:
            logger.debug(f"⚠️ Could not parse date: {date_str}")
            return None
        offset = parsed.utcoffset()
        if offset is not None:
            # The strptime formats above yield naive values; shift aware ones
            # to UTC and drop tzinfo so naive and aware are never mixed.
            parsed = (parsed - offset).replace(tzinfo=None)
        return parsed

    def extract_subdomains_from_certificates(self, certificates: List["Certificate"]) -> Set[str]:
        """Extract subdomains from certificate subjects.

        Each subject may hold several newline-separated names. Wildcard names
        are skipped; every other name that passes ``_is_valid_domain`` is
        returned lower-cased.
        """
        subdomains = set()
        logger.debug(f"🌿 Extracting subdomains from {len(certificates)} certificates")
        for cert in certificates:
            # Certificate subjects can be multi-line with multiple domains.
            # A missing subject is treated as empty.
            for line in (cert.subject or '').split('\n'):
                line = line.strip()
                # Skip wildcard domains for recursion (they don't resolve directly)
                if line.startswith('*.'):
                    logger.debug(f"🌿 Skipping wildcard domain: {line}")
                    continue
                if self._is_valid_domain(line):
                    subdomains.add(line.lower())
                    logger.debug(f"🌿 Found subdomain from certificate: {line}")
        if subdomains:
            logger.info(f"🌿 Extracted {len(subdomains)} subdomains from certificates")
        else:
            logger.debug("❌ No subdomains extracted from certificates")
        return subdomains

    def _is_valid_domain(self, domain: str) -> bool:
        """Basic domain validation.

        Accepts dotted names that are not IPv4 addresses, whose labels are
        1-63 characters long and alphanumeric apart from ``-`` and ``_``.
        Never raises for string input, including names with control bytes.
        """
        if not domain or '.' not in domain:
            return False
        # Remove common prefixes
        # NOTE(review): the checks below run on the remainder, so a bare
        # "www.<tld>" name is rejected - confirm that is intended.
        domain = domain.lower().strip()
        if domain.startswith('www.'):
            domain = domain[4:]
        # Basic validation
        if len(domain) < 3 or len(domain) > 255:
            return False
        # Must not be an IP address
        import socket
        try:
            socket.inet_aton(domain)
            return False  # It's an IPv4 address
        except OSError:
            pass  # Not an IPv4 address; keep validating
        except ValueError:
            # inet_aton rejects strings with an embedded NUL byte this way;
            # such a name is never a valid domain.
            return False
        # Check for reasonable domain structure
        parts = domain.split('.')
        if len(parts) < 2:
            return False
        # Each part should be reasonable
        for part in parts:
            if len(part) < 1 or len(part) > 63:
                return False
            if not part.replace('-', '').replace('_', '').isalnum():
                return False
        return True