it
This commit is contained in:
parent
89ae06482e
commit
9499e62ccc
@ -452,13 +452,16 @@ class Scanner:
|
|||||||
return eligible
|
return eligible
|
||||||
|
|
||||||
def _already_queried_provider(self, target: str, provider_name: str) -> bool:
|
def _already_queried_provider(self, target: str, provider_name: str) -> bool:
|
||||||
"""Check if we already queried a provider for a target."""
|
"""Check if we already successfully queried a provider for a target."""
|
||||||
if not self.graph.graph.has_node(target):
|
if not self.graph.graph.has_node(target):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
node_data = self.graph.graph.nodes[target]
|
node_data = self.graph.graph.nodes[target]
|
||||||
provider_states = node_data.get('metadata', {}).get('provider_states', {})
|
provider_states = node_data.get('metadata', {}).get('provider_states', {})
|
||||||
return provider_name in provider_states
|
|
||||||
|
# A provider has been successfully queried if a state exists and its status is 'success'
|
||||||
|
provider_state = provider_states.get(provider_name)
|
||||||
|
return provider_state is not None and provider_state.get('status') == 'success'
|
||||||
|
|
||||||
def _query_single_provider_forensic(self, provider, target: str, is_ip: bool, current_depth: int) -> Optional[List]:
|
def _query_single_provider_forensic(self, provider, target: str, is_ip: bool, current_depth: int) -> Optional[List]:
|
||||||
"""Query a single provider with stop signal checking."""
|
"""Query a single provider with stop signal checking."""
|
||||||
|
@ -16,7 +16,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
Provider for querying crt.sh certificate transparency database.
|
Provider for querying crt.sh certificate transparency database.
|
||||||
Now uses session-specific configuration and caching.
|
Now uses session-specific configuration and caching.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, name=None, session_config=None):
|
def __init__(self, name=None, session_config=None):
|
||||||
"""Initialize CrtSh provider with session-specific configuration."""
|
"""Initialize CrtSh provider with session-specific configuration."""
|
||||||
super().__init__(
|
super().__init__(
|
||||||
@ -27,11 +27,11 @@ class CrtShProvider(BaseProvider):
|
|||||||
)
|
)
|
||||||
self.base_url = "https://crt.sh/"
|
self.base_url = "https://crt.sh/"
|
||||||
self._stop_event = None
|
self._stop_event = None
|
||||||
|
|
||||||
def get_name(self) -> str:
|
def get_name(self) -> str:
|
||||||
"""Return the provider name."""
|
"""Return the provider name."""
|
||||||
return "crtsh"
|
return "crtsh"
|
||||||
|
|
||||||
def get_display_name(self) -> str:
|
def get_display_name(self) -> str:
|
||||||
"""Return the provider display name for the UI."""
|
"""Return the provider display name for the UI."""
|
||||||
return "crt.sh"
|
return "crt.sh"
|
||||||
@ -51,7 +51,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
to avoid blocking application startup.
|
to avoid blocking application startup.
|
||||||
"""
|
"""
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _parse_certificate_date(self, date_string: str) -> datetime:
|
def _parse_certificate_date(self, date_string: str) -> datetime:
|
||||||
"""
|
"""
|
||||||
Parse certificate date from crt.sh format.
|
Parse certificate date from crt.sh format.
|
||||||
@ -122,10 +122,10 @@ class CrtShProvider(BaseProvider):
|
|||||||
def _extract_certificate_metadata(self, cert_data: Dict[str, Any]) -> Dict[str, Any]:
|
def _extract_certificate_metadata(self, cert_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Extract comprehensive metadata from certificate data.
|
Extract comprehensive metadata from certificate data.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
cert_data: Raw certificate data from crt.sh
|
cert_data: Raw certificate data from crt.sh
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Comprehensive certificate metadata dictionary
|
Comprehensive certificate metadata dictionary
|
||||||
"""
|
"""
|
||||||
@ -140,25 +140,25 @@ class CrtShProvider(BaseProvider):
|
|||||||
'entry_timestamp': cert_data.get('entry_timestamp'),
|
'entry_timestamp': cert_data.get('entry_timestamp'),
|
||||||
'source': 'crt.sh'
|
'source': 'crt.sh'
|
||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if metadata['not_before'] and metadata['not_after']:
|
if metadata['not_before'] and metadata['not_after']:
|
||||||
not_before = self._parse_certificate_date(metadata['not_before'])
|
not_before = self._parse_certificate_date(metadata['not_before'])
|
||||||
not_after = self._parse_certificate_date(metadata['not_after'])
|
not_after = self._parse_certificate_date(metadata['not_after'])
|
||||||
|
|
||||||
metadata['validity_period_days'] = (not_after - not_before).days
|
metadata['validity_period_days'] = (not_after - not_before).days
|
||||||
metadata['is_currently_valid'] = self._is_cert_valid(cert_data)
|
metadata['is_currently_valid'] = self._is_cert_valid(cert_data)
|
||||||
metadata['expires_soon'] = (not_after - datetime.now(timezone.utc)).days <= 30
|
metadata['expires_soon'] = (not_after - datetime.now(timezone.utc)).days <= 30
|
||||||
|
|
||||||
# Add human-readable dates
|
# Add human-readable dates
|
||||||
metadata['not_before'] = not_before.strftime('%Y-%m-%d %H:%M:%S UTC')
|
metadata['not_before'] = not_before.strftime('%Y-%m-%d %H:%M:%S UTC')
|
||||||
metadata['not_after'] = not_after.strftime('%Y-%m-%d %H:%M:%S UTC')
|
metadata['not_after'] = not_after.strftime('%Y-%m-%d %H:%M:%S UTC')
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.logger.debug(f"Error computing certificate metadata: {e}")
|
self.logger.logger.debug(f"Error computing certificate metadata: {e}")
|
||||||
metadata['is_currently_valid'] = False
|
metadata['is_currently_valid'] = False
|
||||||
metadata['expires_soon'] = False
|
metadata['expires_soon'] = False
|
||||||
|
|
||||||
return metadata
|
return metadata
|
||||||
|
|
||||||
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
|
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
|
||||||
@ -167,32 +167,32 @@ class CrtShProvider(BaseProvider):
|
|||||||
"""
|
"""
|
||||||
if not _is_valid_domain(domain):
|
if not _is_valid_domain(domain):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Check for cancellation before starting
|
# Check for cancellation before starting
|
||||||
if self._stop_event and self._stop_event.is_set():
|
if self._stop_event and self._stop_event.is_set():
|
||||||
print(f"CrtSh query cancelled before start for domain: {domain}")
|
print(f"CrtSh query cancelled before start for domain: {domain}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
relationships = []
|
relationships = []
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Query crt.sh for certificates
|
# Query crt.sh for certificates
|
||||||
url = f"{self.base_url}?q={quote(domain)}&output=json"
|
url = f"{self.base_url}?q={quote(domain)}&output=json"
|
||||||
response = self.make_request(url, target_indicator=domain)
|
response = self.make_request(url, target_indicator=domain)
|
||||||
|
|
||||||
if not response or response.status_code != 200:
|
if not response or response.status_code != 200:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Check for cancellation after request
|
# Check for cancellation after request
|
||||||
if self._stop_event and self._stop_event.is_set():
|
if self._stop_event and self._stop_event.is_set():
|
||||||
print(f"CrtSh query cancelled after request for domain: {domain}")
|
print(f"CrtSh query cancelled after request for domain: {domain}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
certificates = response.json()
|
certificates = response.json()
|
||||||
|
|
||||||
if not certificates:
|
if not certificates:
|
||||||
return []
|
return []
|
||||||
|
|
||||||
# Check for cancellation before processing
|
# Check for cancellation before processing
|
||||||
if self._stop_event and self._stop_event.is_set():
|
if self._stop_event and self._stop_event.is_set():
|
||||||
print(f"CrtSh query cancelled before processing for domain: {domain}")
|
print(f"CrtSh query cancelled before processing for domain: {domain}")
|
||||||
@ -201,36 +201,36 @@ class CrtShProvider(BaseProvider):
|
|||||||
# Aggregate certificate data by domain
|
# Aggregate certificate data by domain
|
||||||
domain_certificates = {}
|
domain_certificates = {}
|
||||||
all_discovered_domains = set()
|
all_discovered_domains = set()
|
||||||
|
|
||||||
# Process certificates with cancellation checking
|
# Process certificates with cancellation checking
|
||||||
for i, cert_data in enumerate(certificates):
|
for i, cert_data in enumerate(certificates):
|
||||||
# Check for cancellation every 5 certificates instead of 10 for faster response
|
# Check for cancellation every 5 certificates instead of 10 for faster response
|
||||||
if i % 5 == 0 and self._stop_event and self._stop_event.is_set():
|
if i % 5 == 0 and self._stop_event and self._stop_event.is_set():
|
||||||
print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}")
|
print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}")
|
||||||
break
|
break
|
||||||
|
|
||||||
cert_metadata = self._extract_certificate_metadata(cert_data)
|
cert_metadata = self._extract_certificate_metadata(cert_data)
|
||||||
cert_domains = self._extract_domains_from_certificate(cert_data)
|
cert_domains = self._extract_domains_from_certificate(cert_data)
|
||||||
|
|
||||||
# Add all domains from this certificate to our tracking
|
# Add all domains from this certificate to our tracking
|
||||||
for cert_domain in cert_domains:
|
for cert_domain in cert_domains:
|
||||||
# Additional stop check during domain processing
|
# Additional stop check during domain processing
|
||||||
if i % 20 == 0 and self._stop_event and self._stop_event.is_set():
|
if i % 20 == 0 and self._stop_event and self._stop_event.is_set():
|
||||||
print(f"CrtSh domain processing cancelled for domain: {domain}")
|
print(f"CrtSh domain processing cancelled for domain: {domain}")
|
||||||
break
|
break
|
||||||
|
|
||||||
if not _is_valid_domain(cert_domain):
|
if not _is_valid_domain(cert_domain):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
all_discovered_domains.add(cert_domain)
|
all_discovered_domains.add(cert_domain)
|
||||||
|
|
||||||
# Initialize domain certificate list if needed
|
# Initialize domain certificate list if needed
|
||||||
if cert_domain not in domain_certificates:
|
if cert_domain not in domain_certificates:
|
||||||
domain_certificates[cert_domain] = []
|
domain_certificates[cert_domain] = []
|
||||||
|
|
||||||
# Add this certificate to the domain's certificate list
|
# Add this certificate to the domain's certificate list
|
||||||
domain_certificates[cert_domain].append(cert_metadata)
|
domain_certificates[cert_domain].append(cert_metadata)
|
||||||
|
|
||||||
# Final cancellation check before creating relationships
|
# Final cancellation check before creating relationships
|
||||||
if self._stop_event and self._stop_event.is_set():
|
if self._stop_event and self._stop_event.is_set():
|
||||||
print(f"CrtSh query cancelled before relationship creation for domain: {domain}")
|
print(f"CrtSh query cancelled before relationship creation for domain: {domain}")
|
||||||
@ -240,7 +240,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
for i, discovered_domain in enumerate(all_discovered_domains):
|
for i, discovered_domain in enumerate(all_discovered_domains):
|
||||||
if discovered_domain == domain:
|
if discovered_domain == domain:
|
||||||
continue # Skip self-relationships
|
continue # Skip self-relationships
|
||||||
|
|
||||||
# Check for cancellation every 10 relationships
|
# Check for cancellation every 10 relationships
|
||||||
if i % 10 == 0 and self._stop_event and self._stop_event.is_set():
|
if i % 10 == 0 and self._stop_event and self._stop_event.is_set():
|
||||||
print(f"CrtSh relationship creation cancelled for domain: {domain}")
|
print(f"CrtSh relationship creation cancelled for domain: {domain}")
|
||||||
@ -248,19 +248,19 @@ class CrtShProvider(BaseProvider):
|
|||||||
|
|
||||||
if not _is_valid_domain(discovered_domain):
|
if not _is_valid_domain(discovered_domain):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Get certificates for both domains
|
# Get certificates for both domains
|
||||||
query_domain_certs = domain_certificates.get(domain, [])
|
query_domain_certs = domain_certificates.get(domain, [])
|
||||||
discovered_domain_certs = domain_certificates.get(discovered_domain, [])
|
discovered_domain_certs = domain_certificates.get(discovered_domain, [])
|
||||||
|
|
||||||
# Find shared certificates (for metadata purposes)
|
# Find shared certificates (for metadata purposes)
|
||||||
shared_certificates = self._find_shared_certificates(query_domain_certs, discovered_domain_certs)
|
shared_certificates = self._find_shared_certificates(query_domain_certs, discovered_domain_certs)
|
||||||
|
|
||||||
# Calculate confidence based on relationship type and shared certificates
|
# Calculate confidence based on relationship type and shared certificates
|
||||||
confidence = self._calculate_domain_relationship_confidence(
|
confidence = self._calculate_domain_relationship_confidence(
|
||||||
domain, discovered_domain, shared_certificates, all_discovered_domains
|
domain, discovered_domain, shared_certificates, all_discovered_domains
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create comprehensive raw data for the relationship
|
# Create comprehensive raw data for the relationship
|
||||||
relationship_raw_data = {
|
relationship_raw_data = {
|
||||||
'relationship_type': 'certificate_discovery',
|
'relationship_type': 'certificate_discovery',
|
||||||
@ -272,7 +272,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
discovered_domain: self._summarize_certificates(discovered_domain_certs)
|
discovered_domain: self._summarize_certificates(discovered_domain_certs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Create domain -> domain relationship
|
# Create domain -> domain relationship
|
||||||
relationships.append((
|
relationships.append((
|
||||||
domain,
|
domain,
|
||||||
@ -281,7 +281,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
confidence,
|
confidence,
|
||||||
relationship_raw_data
|
relationship_raw_data
|
||||||
))
|
))
|
||||||
|
|
||||||
# Log the relationship discovery
|
# Log the relationship discovery
|
||||||
self.log_relationship_discovery(
|
self.log_relationship_discovery(
|
||||||
source_node=domain,
|
source_node=domain,
|
||||||
@ -294,39 +294,39 @@ class CrtShProvider(BaseProvider):
|
|||||||
|
|
||||||
except json.JSONDecodeError as e:
|
except json.JSONDecodeError as e:
|
||||||
self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}")
|
self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}")
|
||||||
|
|
||||||
return relationships
|
return relationships
|
||||||
|
|
||||||
def _find_shared_certificates(self, certs1: List[Dict[str, Any]], certs2: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
def _find_shared_certificates(self, certs1: List[Dict[str, Any]], certs2: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||||
"""
|
"""
|
||||||
Find certificates that are shared between two domain certificate lists.
|
Find certificates that are shared between two domain certificate lists.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
certs1: First domain's certificates
|
certs1: First domain's certificates
|
||||||
certs2: Second domain's certificates
|
certs2: Second domain's certificates
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of shared certificate metadata
|
List of shared certificate metadata
|
||||||
"""
|
"""
|
||||||
shared = []
|
shared = []
|
||||||
|
|
||||||
# Create a set of certificate IDs from the first list for quick lookup
|
# Create a set of certificate IDs from the first list for quick lookup
|
||||||
cert1_ids = {cert.get('certificate_id') for cert in certs1 if cert.get('certificate_id')}
|
cert1_ids = {cert.get('certificate_id') for cert in certs1 if cert.get('certificate_id')}
|
||||||
|
|
||||||
# Find certificates in the second list that match
|
# Find certificates in the second list that match
|
||||||
for cert in certs2:
|
for cert in certs2:
|
||||||
if cert.get('certificate_id') in cert1_ids:
|
if cert.get('certificate_id') in cert1_ids:
|
||||||
shared.append(cert)
|
shared.append(cert)
|
||||||
|
|
||||||
return shared
|
return shared
|
||||||
|
|
||||||
def _summarize_certificates(self, certificates: List[Dict[str, Any]]) -> Dict[str, Any]:
|
def _summarize_certificates(self, certificates: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
Create a summary of certificates for a domain.
|
Create a summary of certificates for a domain.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
certificates: List of certificate metadata
|
certificates: List of certificate metadata
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Summary dictionary with aggregate statistics
|
Summary dictionary with aggregate statistics
|
||||||
"""
|
"""
|
||||||
@ -340,18 +340,18 @@ class CrtShProvider(BaseProvider):
|
|||||||
'latest_certificate': None,
|
'latest_certificate': None,
|
||||||
'has_valid_cert': False
|
'has_valid_cert': False
|
||||||
}
|
}
|
||||||
|
|
||||||
valid_count = sum(1 for cert in certificates if cert.get('is_currently_valid'))
|
valid_count = sum(1 for cert in certificates if cert.get('is_currently_valid'))
|
||||||
expired_count = len(certificates) - valid_count
|
expired_count = len(certificates) - valid_count
|
||||||
expires_soon_count = sum(1 for cert in certificates if cert.get('expires_soon'))
|
expires_soon_count = sum(1 for cert in certificates if cert.get('expires_soon'))
|
||||||
|
|
||||||
# Get unique issuers
|
# Get unique issuers
|
||||||
unique_issuers = list(set(cert.get('issuer_name') for cert in certificates if cert.get('issuer_name')))
|
unique_issuers = list(set(cert.get('issuer_name') for cert in certificates if cert.get('issuer_name')))
|
||||||
|
|
||||||
# Find the most recent certificate
|
# Find the most recent certificate
|
||||||
latest_cert = None
|
latest_cert = None
|
||||||
latest_date = None
|
latest_date = None
|
||||||
|
|
||||||
for cert in certificates:
|
for cert in certificates:
|
||||||
try:
|
try:
|
||||||
if cert.get('not_before'):
|
if cert.get('not_before'):
|
||||||
@ -361,7 +361,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
latest_cert = cert
|
latest_cert = cert
|
||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'total_certificates': len(certificates),
|
'total_certificates': len(certificates),
|
||||||
'valid_certificates': valid_count,
|
'valid_certificates': valid_count,
|
||||||
@ -373,26 +373,26 @@ class CrtShProvider(BaseProvider):
|
|||||||
'certificate_details': certificates # Full details for forensic analysis
|
'certificate_details': certificates # Full details for forensic analysis
|
||||||
}
|
}
|
||||||
|
|
||||||
def _calculate_domain_relationship_confidence(self, domain1: str, domain2: str,
|
def _calculate_domain_relationship_confidence(self, domain1: str, domain2: str,
|
||||||
shared_certificates: List[Dict[str, Any]],
|
shared_certificates: List[Dict[str, Any]],
|
||||||
all_discovered_domains: Set[str]) -> float:
|
all_discovered_domains: Set[str]) -> float:
|
||||||
"""
|
"""
|
||||||
Calculate confidence score for domain relationship based on various factors.
|
Calculate confidence score for domain relationship based on various factors.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
domain1: Source domain (query domain)
|
domain1: Source domain (query domain)
|
||||||
domain2: Target domain (discovered domain)
|
domain2: Target domain (discovered domain)
|
||||||
shared_certificates: List of shared certificate metadata
|
shared_certificates: List of shared certificate metadata
|
||||||
all_discovered_domains: All domains discovered in this query
|
all_discovered_domains: All domains discovered in this query
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Confidence score between 0.0 and 1.0
|
Confidence score between 0.0 and 1.0
|
||||||
"""
|
"""
|
||||||
base_confidence = 0.9
|
base_confidence = 0.9
|
||||||
|
|
||||||
# Adjust confidence based on domain relationship context
|
# Adjust confidence based on domain relationship context
|
||||||
relationship_context = self._determine_relationship_context(domain2, domain1)
|
relationship_context = self._determine_relationship_context(domain2, domain1)
|
||||||
|
|
||||||
if relationship_context == 'exact_match':
|
if relationship_context == 'exact_match':
|
||||||
context_bonus = 0.0 # This shouldn't happen, but just in case
|
context_bonus = 0.0 # This shouldn't happen, but just in case
|
||||||
elif relationship_context == 'subdomain':
|
elif relationship_context == 'subdomain':
|
||||||
@ -401,7 +401,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
context_bonus = 0.05 # Medium confidence for parent domains
|
context_bonus = 0.05 # Medium confidence for parent domains
|
||||||
else:
|
else:
|
||||||
context_bonus = 0.0 # Related domains get base confidence
|
context_bonus = 0.0 # Related domains get base confidence
|
||||||
|
|
||||||
# Adjust confidence based on shared certificates
|
# Adjust confidence based on shared certificates
|
||||||
if shared_certificates:
|
if shared_certificates:
|
||||||
shared_count = len(shared_certificates)
|
shared_count = len(shared_certificates)
|
||||||
@ -411,7 +411,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
shared_bonus = 0.05
|
shared_bonus = 0.05
|
||||||
else:
|
else:
|
||||||
shared_bonus = 0.02
|
shared_bonus = 0.02
|
||||||
|
|
||||||
# Additional bonus for valid shared certificates
|
# Additional bonus for valid shared certificates
|
||||||
valid_shared = sum(1 for cert in shared_certificates if cert.get('is_currently_valid'))
|
valid_shared = sum(1 for cert in shared_certificates if cert.get('is_currently_valid'))
|
||||||
if valid_shared > 0:
|
if valid_shared > 0:
|
||||||
@ -422,7 +422,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
# Even without shared certificates, domains found in the same query have some relationship
|
# Even without shared certificates, domains found in the same query have some relationship
|
||||||
shared_bonus = 0.0
|
shared_bonus = 0.0
|
||||||
validity_bonus = 0.0
|
validity_bonus = 0.0
|
||||||
|
|
||||||
# Adjust confidence based on certificate issuer reputation (if shared certificates exist)
|
# Adjust confidence based on certificate issuer reputation (if shared certificates exist)
|
||||||
issuer_bonus = 0.0
|
issuer_bonus = 0.0
|
||||||
if shared_certificates:
|
if shared_certificates:
|
||||||
@ -431,7 +431,7 @@ class CrtShProvider(BaseProvider):
|
|||||||
if any(trusted_ca in issuer for trusted_ca in ['let\'s encrypt', 'digicert', 'sectigo', 'globalsign']):
|
if any(trusted_ca in issuer for trusted_ca in ['let\'s encrypt', 'digicert', 'sectigo', 'globalsign']):
|
||||||
issuer_bonus = max(issuer_bonus, 0.03)
|
issuer_bonus = max(issuer_bonus, 0.03)
|
||||||
break
|
break
|
||||||
|
|
||||||
# Calculate final confidence
|
# Calculate final confidence
|
||||||
final_confidence = base_confidence + context_bonus + shared_bonus + validity_bonus + issuer_bonus
|
final_confidence = base_confidence + context_bonus + shared_bonus + validity_bonus + issuer_bonus
|
||||||
return max(0.1, min(1.0, final_confidence)) # Clamp between 0.1 and 1.0
|
return max(0.1, min(1.0, final_confidence)) # Clamp between 0.1 and 1.0
|
||||||
@ -439,11 +439,11 @@ class CrtShProvider(BaseProvider):
|
|||||||
def _determine_relationship_context(self, cert_domain: str, query_domain: str) -> str:
|
def _determine_relationship_context(self, cert_domain: str, query_domain: str) -> str:
|
||||||
"""
|
"""
|
||||||
Determine the context of the relationship between certificate domain and query domain.
|
Determine the context of the relationship between certificate domain and query domain.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
cert_domain: Domain found in certificate
|
cert_domain: Domain found in certificate
|
||||||
query_domain: Original query domain
|
query_domain: Original query domain
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
String describing the relationship context
|
String describing the relationship context
|
||||||
"""
|
"""
|
||||||
@ -455,40 +455,40 @@ class CrtShProvider(BaseProvider):
|
|||||||
return 'parent_domain'
|
return 'parent_domain'
|
||||||
else:
|
else:
|
||||||
return 'related_domain'
|
return 'related_domain'
|
||||||
|
|
||||||
def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
|
def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
|
||||||
"""
|
"""
|
||||||
Query crt.sh for certificates containing the IP address.
|
Query crt.sh for certificates containing the IP address.
|
||||||
Note: crt.sh doesn't typically index by IP, so this returns empty results.
|
Note: crt.sh doesn't typically index by IP, so this returns empty results.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
ip: IP address to investigate
|
ip: IP address to investigate
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Empty list (crt.sh doesn't support IP-based certificate queries effectively)
|
Empty list (crt.sh doesn't support IP-based certificate queries effectively)
|
||||||
"""
|
"""
|
||||||
# crt.sh doesn't effectively support IP-based certificate queries
|
# crt.sh doesn't effectively support IP-based certificate queries
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def _extract_domains_from_certificate(self, cert_data: Dict[str, Any]) -> Set[str]:
|
def _extract_domains_from_certificate(self, cert_data: Dict[str, Any]) -> Set[str]:
|
||||||
"""
|
"""
|
||||||
Extract all domains from certificate data.
|
Extract all domains from certificate data.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
cert_data: Certificate data from crt.sh API
|
cert_data: Certificate data from crt.sh API
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Set of unique domain names found in the certificate
|
Set of unique domain names found in the certificate
|
||||||
"""
|
"""
|
||||||
domains = set()
|
domains = set()
|
||||||
|
|
||||||
# Extract from common name
|
# Extract from common name
|
||||||
common_name = cert_data.get('common_name', '')
|
common_name = cert_data.get('common_name', '')
|
||||||
if common_name:
|
if common_name:
|
||||||
cleaned_cn = self._clean_domain_name(common_name)
|
cleaned_cn = self._clean_domain_name(common_name)
|
||||||
if cleaned_cn:
|
if cleaned_cn:
|
||||||
domains.update(cleaned_cn)
|
domains.update(cleaned_cn)
|
||||||
|
|
||||||
# Extract from name_value field (contains SANs)
|
# Extract from name_value field (contains SANs)
|
||||||
name_value = cert_data.get('name_value', '')
|
name_value = cert_data.get('name_value', '')
|
||||||
if name_value:
|
if name_value:
|
||||||
@ -497,9 +497,9 @@ class CrtShProvider(BaseProvider):
|
|||||||
cleaned_domains = self._clean_domain_name(line.strip())
|
cleaned_domains = self._clean_domain_name(line.strip())
|
||||||
if cleaned_domains:
|
if cleaned_domains:
|
||||||
domains.update(cleaned_domains)
|
domains.update(cleaned_domains)
|
||||||
|
|
||||||
return domains
|
return domains
|
||||||
|
|
||||||
def _clean_domain_name(self, domain_name: str) -> List[str]:
|
def _clean_domain_name(self, domain_name: str) -> List[str]:
|
||||||
"""
|
"""
|
||||||
Clean and normalize domain name from certificate data.
|
Clean and normalize domain name from certificate data.
|
||||||
|
@ -50,12 +50,7 @@ class DNSProvider(BaseProvider):
|
|||||||
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
|
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
|
||||||
"""
|
"""
|
||||||
Query DNS records for the domain to discover relationships.
|
Query DNS records for the domain to discover relationships.
|
||||||
|
...
|
||||||
Args:
|
|
||||||
domain: Domain to investigate
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of relationships discovered from DNS analysis
|
|
||||||
"""
|
"""
|
||||||
if not _is_valid_domain(domain):
|
if not _is_valid_domain(domain):
|
||||||
return []
|
return []
|
||||||
@ -66,11 +61,13 @@ class DNSProvider(BaseProvider):
|
|||||||
for record_type in ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA']:
|
for record_type in ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA']:
|
||||||
try:
|
try:
|
||||||
relationships.extend(self._query_record(domain, record_type))
|
relationships.extend(self._query_record(domain, record_type))
|
||||||
|
except resolver.NoAnswer:
|
||||||
|
# This is not an error, just a confirmation that the record doesn't exist.
|
||||||
|
self.logger.logger.debug(f"No {record_type} record found for {domain}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.failed_requests += 1
|
self.failed_requests += 1
|
||||||
self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}")
|
self.logger.logger.debug(f"{record_type} record query failed for {domain}: {e}")
|
||||||
# Re-raise the exception so the scanner can handle it
|
# Optionally, you might want to re-raise other, more serious exceptions.
|
||||||
raise e
|
|
||||||
|
|
||||||
return relationships
|
return relationships
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user