dnsrecon/src/certificate_checker.py
overcuriousity cee620f5f6 progress
2025-09-09 20:47:01 +02:00

325 lines
13 KiB
Python

# File: src/certificate_checker.py
"""Certificate transparency log checker using crt.sh with minimal query caching."""
import requests
import json
import time
import logging
import socket
from datetime import datetime
from typing import List, Optional, Set
from .data_structures import Certificate
from .config import Config
# Module-level logger, named after this module via __name__; every log call in
# this file goes through it.
logger = logging.getLogger(__name__)
class CertificateChecker:
    """Look up certificates for a domain on crt.sh, caching results per query.

    Each distinct query string is sent to crt.sh at most once per successful
    lookup: definitive answers (HTTP 200 with a JSON list, or HTTP 404) are
    kept in an in-memory cache for the lifetime of the instance. Failed
    lookups are *not* cached, so a transient error does not hide certificates
    from later calls.

    After ``max_connection_failures`` connection errors without an
    intervening successful response, ``get_certificates`` stops issuing
    lookups and returns an empty list.
    """

    CRT_SH_URL = "https://crt.sh/"

    # Request settings shared by the connectivity probe and the real lookups.
    _USER_AGENT = 'DNS-Recon-Tool/1.0'
    _MAX_RETRIES = 2
    _BACKOFF_DELAYS = (1, 3)       # seconds to wait after attempt 1, 2, ...
    _RATE_LIMIT_RETRY_DELAY = 5    # seconds to wait after an HTTP 429

    # strptime patterns tried in order before falling back to fromisoformat().
    _DATE_FORMATS = (
        '%Y-%m-%dT%H:%M:%S',
        '%Y-%m-%dT%H:%M:%SZ',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M:%S.%f',
        '%Y-%m-%dT%H:%M:%S.%fZ',
    )

    def __init__(self, config: "Config"):
        """Store *config*, reset counters and probe crt.sh once.

        Args:
            config: Object providing ``CRT_SH_RATE_LIMIT`` (requests per
                second) and ``HTTP_TIMEOUT`` (passed to ``requests.get``).

        Note:
            The connectivity probe performs network I/O; its result is only
            logged and does not affect construction.
        """
        self.config = config
        self.last_request = 0          # time.time() of the latest lookup
        self.query_count = 0           # number of rate-limited lookups so far
        self.connection_failures = 0
        self.max_connection_failures = 3

        # query string -> list of Certificate objects from a successful lookup
        self._http_cache = {}

        logger.info("Certificate checker initialized with HTTP request caching")
        self._test_connectivity()

    def _test_connectivity(self) -> bool:
        """Check that crt.sh resolves and answers an HTTP request.

        Returns:
            True when DNS resolution succeeds and the HTTP status is 200 or
            404; False otherwise. Never raises: every failure is logged as a
            warning instead.
        """
        logger.info("Testing connectivity to crt.sh...")
        try:
            try:
                socket.gethostbyname('crt.sh')
            except socket.gaierror as e:
                logger.warning(f"DNS resolution failed for crt.sh: {e}")
                return False
            logger.debug("DNS resolution for crt.sh successful")

            response = requests.get(
                self.CRT_SH_URL,
                params={'q': 'example.com', 'output': 'json'},
                timeout=10,
                headers={'User-Agent': self._USER_AGENT},
            )
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"Cannot reach crt.sh: {e}")
            return False
        except requests.exceptions.Timeout:
            logger.warning("crt.sh connectivity test timed out")
            return False
        except Exception as e:
            # Best-effort probe: never let it break construction.
            logger.warning(f"Unexpected error testing crt.sh connectivity: {e}")
            return False

        if response.status_code in (200, 404):
            logger.info("crt.sh connectivity test successful")
            return True
        logger.warning(f"crt.sh returned status {response.status_code}")
        return False

    def _rate_limit(self):
        """Sleep as needed to honour ``config.CRT_SH_RATE_LIMIT``, then record the call.

        The limit is interpreted as requests per second. A missing, zero or
        negative value disables throttling instead of raising
        ``ZeroDivisionError``.
        NOTE(review): confirm that "unthrottled" is the intended meaning of a
        non-positive limit.
        """
        rate = self.config.CRT_SH_RATE_LIMIT
        if rate and rate > 0:
            min_interval = 1.0 / rate
            elapsed = time.time() - self.last_request
            if elapsed < min_interval:
                sleep_time = min_interval - elapsed
                logger.debug(f"crt.sh rate limiting: sleeping for {sleep_time:.2f}s")
                time.sleep(sleep_time)
        self.last_request = time.time()
        self.query_count += 1

    def get_certificates(self, domain: str) -> "List[Certificate]":
        """Return the unique certificates crt.sh lists for *domain*.

        Two queries are issued (or served from the cache): one for the domain
        itself and one for ``%.<domain>``. Results are de-duplicated by
        certificate ``id``.

        Args:
            domain: Domain name to look up.

        Returns:
            A list of certificates; empty when nothing was found, the lookups
            failed, or too many connection failures have accumulated.
        """
        logger.debug(f"Getting certificates for domain: {domain}")
        if self.connection_failures >= self.max_connection_failures:
            logger.warning(f"Skipping certificate lookup for {domain} due to repeated connection failures")
            return []

        certificates = []
        # The exact name first, then everything below it.
        for query in (domain, f"%.{domain}"):
            certificates.extend(self._query_crt_sh(query))

        # Later entries win when two results share an id.
        final_certs = list({cert.id: cert for cert in certificates}.values())
        if final_certs:
            logger.info(f"Found {len(final_certs)} unique certificates for {domain}")
        else:
            logger.debug(f"No certificates found for {domain}")
        return final_certs

    def _query_crt_sh(self, query: str) -> "List[Certificate]":
        """Return certificates for *query*, using the cache when possible.

        Only definitive answers are cached. When the lookup fails an empty
        list is returned but nothing is stored, so the next call retries.
        """
        cached = self._http_cache.get(query)
        if cached is not None:
            logger.debug(f"Using cached HTTP result for crt.sh query: {query}")
            return cached

        certificates = self._fetch_certificates(query)
        if certificates is None:
            # Transient failure: do not remember it as "no certificates".
            return []
        self._http_cache[query] = certificates
        return certificates

    def _make_http_request(self, query: str) -> "List[Certificate]":
        """Query crt.sh for *query* with retries; return ``[]`` on failure.

        Kept for callers that expect a list in every case; it does not touch
        the cache.
        """
        certificates = self._fetch_certificates(query)
        return [] if certificates is None else certificates

    def _fetch_certificates(self, query: str) -> "Optional[List[Certificate]]":
        """Query crt.sh with retries, distinguishing failure from "no results".

        Returns:
            The parsed certificates (possibly empty) for HTTP 200 or 404, or
            ``None`` when every attempt failed.
        """
        self._rate_limit()
        logger.debug(f"Making HTTP request to crt.sh for: {query}")

        max_retries = self._MAX_RETRIES
        for attempt in range(max_retries):
            # Default wait before the next attempt; HTTP 429 overrides it.
            retry_delay = self._BACKOFF_DELAYS[min(attempt, len(self._BACKOFF_DELAYS) - 1)]
            try:
                response = requests.get(
                    self.CRT_SH_URL,
                    params={'q': query, 'output': 'json'},
                    timeout=self.config.HTTP_TIMEOUT,
                    headers={'User-Agent': self._USER_AGENT},
                )
                status = response.status_code
                logger.debug(f"crt.sh API response for {query}: {status}")

                if status == 200:
                    certificates = self._decode_response(response, query)
                    if certificates is not None:
                        self.connection_failures = 0
                        logger.info(f"Successfully processed {len(certificates)} certificates from crt.sh for {query}")
                        return certificates
                    # Malformed body: fall through to the retry logic below.
                elif status == 404:
                    logger.debug(f"No certificates found for {query} (404)")
                    self.connection_failures = 0
                    return []
                elif status == 429:
                    logger.warning(f"crt.sh rate limit exceeded for {query}")
                    retry_delay = self._RATE_LIMIT_RETRY_DELAY
                else:
                    logger.warning(f"crt.sh HTTP error for {query}: {status}")
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                is_connection_error = isinstance(e, requests.exceptions.ConnectionError)
                error_type = "Connection Error" if is_connection_error else "Timeout"
                logger.warning(f"crt.sh {error_type} for {query} (attempt {attempt+1}/{max_retries}): {e}")
                if is_connection_error:
                    # Only connection errors feed the circuit breaker.
                    self.connection_failures += 1
            except requests.exceptions.RequestException as e:
                logger.warning(f"crt.sh network error for {query} (attempt {attempt+1}/{max_retries}): {e}")
            except Exception as e:
                # Lookups are best-effort: log and retry rather than crash the scan.
                logger.error(f"Unexpected error querying crt.sh for {query}: {e}")

            if attempt < max_retries - 1:
                time.sleep(retry_delay)

        logger.warning(f"All {max_retries} attempts failed for crt.sh query: {query}")
        return None

    def _decode_response(self, response, query: str) -> "Optional[List[Certificate]]":
        """Turn a 200 response into certificates; ``None`` if the body is malformed."""
        try:
            data = response.json()
        except ValueError as e:
            # Every JSON decode error requests can raise is a ValueError.
            logger.warning(f"Invalid JSON response from crt.sh for {query}: {e}")
            return None

        if not isinstance(data, list):
            logger.warning(f"Unexpected JSON payload from crt.sh for {query}: {type(data).__name__}")
            return None

        logger.debug(f"crt.sh returned {len(data)} certificate entries for {query}")
        # A fresh list per attempt, so a retry can never duplicate entries.
        certificates = []
        for cert_data in data:
            certificate = self._build_certificate(cert_data, query)
            if certificate is not None:
                certificates.append(certificate)
        return certificates

    def _build_certificate(self, cert_data, query: str) -> "Optional[Certificate]":
        """Build one Certificate from a crt.sh JSON entry; ``None`` if unusable."""
        if not isinstance(cert_data, dict):
            logger.debug(f"Error parsing certificate data: unexpected entry type {type(cert_data).__name__}")
            return None
        try:
            not_before = self._parse_date(cert_data.get('not_before'))
            not_after = self._parse_date(cert_data.get('not_after'))
            if not_before is None or not_after is None:
                logger.debug(f"Skipped certificate with invalid dates: {cert_data.get('id')}")
                return None
            name_value = cert_data.get('name_value', '')
            certificate = Certificate(
                id=cert_data.get('id'),
                issuer=cert_data.get('issuer_name', ''),
                subject=name_value,
                not_before=not_before,
                not_after=not_after,
                # A null name_value raises TypeError here and the entry is skipped.
                is_wildcard='*.' in name_value,
            )
        except (ValueError, TypeError, KeyError) as e:
            logger.debug(f"Error parsing certificate data: {e}")
            return None
        logger.debug(f"Parsed certificate ID {certificate.id} for {query}")
        return certificate

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse a timestamp string into a naive ``datetime``.

        The fixed patterns in ``_DATE_FORMATS`` are tried first, then
        ``datetime.fromisoformat``. Values carrying a UTC offset are converted
        to UTC and returned without tzinfo, so every result is naive and can
        be compared with the others.

        Args:
            date_str: Timestamp text; ``None``, empty or non-string values
                yield ``None``.

        Returns:
            The parsed datetime, or ``None`` when no format matches.
        """
        if not date_str or not isinstance(date_str, str):
            return None

        for fmt in self._DATE_FORMATS:
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue

        try:
            # Older fromisoformat() implementations reject a trailing "Z".
            parsed = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
        except ValueError:
            logger.debug(f"Could not parse date: {date_str}")
            return None

        offset = parsed.utcoffset()
        if offset is not None:
            # Normalise to naive UTC to match the strptime-based results.
            parsed = (parsed - offset).replace(tzinfo=None)
        return parsed

    def extract_subdomains_from_certificates(self, certificates: "List[Certificate]") -> Set[str]:
        """Collect the valid, non-wildcard names found in certificate subjects.

        Each certificate's ``subject`` is split on newlines; every line that
        passes ``_is_valid_domain`` is added in lower case. Lines starting
        with ``*.`` are skipped.

        Args:
            certificates: Certificates whose ``subject`` text is scanned.

        Returns:
            The set of lower-cased domain names found.
        """
        subdomains = set()
        logger.debug(f"Extracting subdomains from {len(certificates)} certificates")

        for cert in certificates:
            for raw_line in cert.subject.split('\n'):
                line = raw_line.strip()
                # Wildcard names do not resolve directly, so they are useless for recursion.
                if line.startswith('*.'):
                    logger.debug(f"Skipping wildcard domain: {line}")
                    continue
                if self._is_valid_domain(line):
                    subdomains.add(line.lower())
                    logger.debug(f"Found subdomain from certificate: {line}")

        if subdomains:
            logger.info(f"Extracted {len(subdomains)} subdomains from certificates")
        else:
            logger.debug("No subdomains extracted from certificates")
        return subdomains

    def _is_valid_domain(self, domain: str) -> bool:
        """Return True if *domain* looks like a plausible host name.

        Checks, after lower-casing, trimming and dropping a leading ``www.``:
        total length 3-255, not parseable by ``socket.inet_aton``, at least
        two dot-separated labels, each 1-63 characters and alphanumeric apart
        from ``-`` and ``_``.
        """
        if not domain or '.' not in domain:
            return False

        domain = domain.lower().strip()
        if domain.startswith('www.'):
            domain = domain[4:]
        if not 3 <= len(domain) <= 255:
            return False

        # Reject anything inet_aton accepts as an IPv4 address.
        try:
            socket.inet_aton(domain)
        except (OSError, ValueError):
            # OSError: not an address. ValueError: e.g. embedded NUL; such
            # input is rejected by the label check below.
            pass
        else:
            return False

        labels = domain.split('.')
        if len(labels) < 2:
            return False
        for label in labels:
            if not 1 <= len(label) <= 63:
                return False
            if not label.replace('-', '').replace('_', '').isalnum():
                return False
        return True