This commit is contained in:
overcuriousity
2025-09-13 23:45:36 +02:00
parent 41d556e2ce
commit b7a57f1552
11 changed files with 125 additions and 150 deletions

View File

@@ -9,7 +9,6 @@ from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple
from core.logger import get_forensic_logger
from core.graph_manager import RelationshipType
class RateLimiter:
@@ -147,7 +146,7 @@ class BaseProvider(ABC):
pass
@abstractmethod
def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query the provider for information about a domain.
@@ -160,7 +159,7 @@ class BaseProvider(ABC):
pass
@abstractmethod
def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query the provider for information about an IP address.
@@ -419,7 +418,7 @@ class BaseProvider(ABC):
return False
def log_relationship_discovery(self, source_node: str, target_node: str,
relationship_type: RelationshipType,
relationship_type: str,
confidence_score: float,
raw_data: Dict[str, Any],
discovery_method: str) -> None:
@@ -439,7 +438,7 @@ class BaseProvider(ABC):
self.logger.log_relationship_discovery(
source_node=source_node,
target_node=target_node,
relationship_type=relationship_type.relationship_name,
relationship_type=relationship_type,
confidence_score=confidence_score,
provider=self.name,
raw_data=raw_data,

View File

@@ -9,10 +9,10 @@ import re
from typing import List, Dict, Any, Tuple, Set
from urllib.parse import quote
from datetime import datetime, timezone
import requests
from .base_provider import BaseProvider
from utils.helpers import _is_valid_domain
from core.graph_manager import RelationshipType
class CrtShProvider(BaseProvider):
@@ -145,7 +145,6 @@ class CrtShProvider(BaseProvider):
'source': 'crt.sh'
}
# Add computed fields
try:
if metadata['not_before'] and metadata['not_after']:
not_before = self._parse_certificate_date(metadata['not_before'])
@@ -166,10 +165,9 @@ class CrtShProvider(BaseProvider):
return metadata
def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query crt.sh for certificates containing the domain.
Enhanced with more frequent stop signal checking for reliable termination.
"""
if not _is_valid_domain(domain):
return []
@@ -184,7 +182,7 @@ class CrtShProvider(BaseProvider):
try:
# Query crt.sh for certificates
url = f"{self.base_url}?q={quote(domain)}&output=json"
response = self.make_request(url, target_indicator=domain, max_retries=1) # Reduce retries for faster cancellation
response = self.make_request(url, target_indicator=domain, max_retries=3)
if not response or response.status_code != 200:
return []
@@ -208,7 +206,7 @@ class CrtShProvider(BaseProvider):
domain_certificates = {}
all_discovered_domains = set()
# Process certificates with enhanced cancellation checking
# Process certificates with cancellation checking
for i, cert_data in enumerate(certificates):
# Check for cancellation every 5 certificates instead of 10 for faster response
if i % 5 == 0 and self._stop_event and self._stop_event.is_set():
@@ -283,7 +281,7 @@ class CrtShProvider(BaseProvider):
relationships.append((
domain,
discovered_domain,
RelationshipType.SAN_CERTIFICATE,
'san_certificate',
confidence,
relationship_raw_data
))
@@ -292,7 +290,7 @@ class CrtShProvider(BaseProvider):
self.log_relationship_discovery(
source_node=domain,
target_node=discovered_domain,
relationship_type=RelationshipType.SAN_CERTIFICATE,
relationship_type='san_certificate',
confidence_score=confidence,
raw_data=relationship_raw_data,
discovery_method="certificate_transparency_analysis"
@@ -300,6 +298,9 @@ class CrtShProvider(BaseProvider):
except json.JSONDecodeError as e:
self.logger.logger.error(f"Failed to parse JSON response from crt.sh: {e}")
except requests.exceptions.RequestException as e:
self.logger.logger.error(f"HTTP request to crt.sh failed: {e}")
return relationships
@@ -394,7 +395,7 @@ class CrtShProvider(BaseProvider):
Returns:
Confidence score between 0.0 and 1.0
"""
base_confidence = RelationshipType.SAN_CERTIFICATE.default_confidence
base_confidence = 0.9
# Adjust confidence based on domain relationship context
relationship_context = self._determine_relationship_context(domain2, domain1)
@@ -462,7 +463,7 @@ class CrtShProvider(BaseProvider):
else:
return 'related_domain'
def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query crt.sh for certificates containing the IP address.
Note: crt.sh doesn't typically index by IP, so this returns empty results.

View File

@@ -5,7 +5,6 @@ import dns.reversename
from typing import List, Dict, Any, Tuple
from .base_provider import BaseProvider
from utils.helpers import _is_valid_ip, _is_valid_domain
from core.graph_manager import RelationshipType
class DNSProvider(BaseProvider):
@@ -49,7 +48,7 @@ class DNSProvider(BaseProvider):
"""DNS is always available - no API key required."""
return True
def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query DNS records for the domain to discover relationships.
@@ -70,7 +69,7 @@ class DNSProvider(BaseProvider):
return relationships
def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query reverse DNS for the IP address.
@@ -106,16 +105,16 @@ class DNSProvider(BaseProvider):
relationships.append((
ip,
hostname,
RelationshipType.PTR_RECORD,
RelationshipType.PTR_RECORD.default_confidence,
'ptr_record',
0.8,
raw_data
))
self.log_relationship_discovery(
source_node=ip,
target_node=hostname,
relationship_type=RelationshipType.PTR_RECORD,
confidence_score=RelationshipType.PTR_RECORD.default_confidence,
relationship_type='ptr_record',
confidence_score=0.8,
raw_data=raw_data,
discovery_method="reverse_dns_lookup"
)
@@ -126,7 +125,7 @@ class DNSProvider(BaseProvider):
return relationships
def _query_record(self, domain: str, record_type: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def _query_record(self, domain: str, record_type: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query a specific type of DNS record for the domain.
"""
@@ -147,7 +146,8 @@ class DNSProvider(BaseProvider):
elif record_type == 'SOA':
target = str(record.mname).rstrip('.')
elif record_type in ['TXT']:
target = b' '.join(record.strings).decode('utf-8', 'ignore')
# TXT records are treated as metadata, not relationships.
continue
elif record_type == 'SRV':
target = str(record.target).rstrip('.')
elif record_type == 'CAA':
@@ -155,7 +155,6 @@ class DNSProvider(BaseProvider):
else:
target = str(record)
if target:
raw_data = {
'query_type': record_type,
@@ -163,32 +162,25 @@ class DNSProvider(BaseProvider):
'value': target,
'ttl': response.ttl
}
try:
relationship_type_enum_name = f"{record_type}_RECORD"
# Handle TXT records as metadata, not relationships
if record_type == 'TXT':
relationship_type_enum = RelationshipType.A_RECORD # Dummy value, won't be used
else:
relationship_type_enum = getattr(RelationshipType, relationship_type_enum_name)
relationship_type = f"{record_type.lower()}_record"
confidence = 0.8 # Default confidence for DNS records
relationships.append((
domain,
target,
relationship_type_enum,
relationship_type_enum.default_confidence,
raw_data
))
relationships.append((
domain,
target,
relationship_type,
confidence,
raw_data
))
self.log_relationship_discovery(
source_node=domain,
target_node=target,
relationship_type=relationship_type_enum,
confidence_score=relationship_type_enum.default_confidence,
raw_data=raw_data,
discovery_method=f"dns_{record_type.lower()}_record"
)
except AttributeError:
self.logger.logger.error(f"Unsupported record type '{record_type}' encountered for domain {domain}")
self.log_relationship_discovery(
source_node=domain,
target_node=target,
relationship_type=relationship_type,
confidence_score=confidence,
raw_data=raw_data,
discovery_method=f"dns_{record_type.lower()}_record"
)
except Exception as e:
self.failed_requests += 1

View File

@@ -7,7 +7,6 @@ import json
from typing import List, Dict, Any, Tuple
from .base_provider import BaseProvider
from utils.helpers import _is_valid_ip, _is_valid_domain
from core.graph_manager import RelationshipType
class ShodanProvider(BaseProvider):
@@ -47,7 +46,7 @@ class ShodanProvider(BaseProvider):
"""Return a dictionary indicating if the provider can query domains and/or IPs."""
return {'domains': True, 'ips': True}
def query_domain(self, domain: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def query_domain(self, domain: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query Shodan for information about a domain.
Uses Shodan's hostname search to find associated IPs.
@@ -103,16 +102,16 @@ class ShodanProvider(BaseProvider):
relationships.append((
domain,
ip_address,
RelationshipType.A_RECORD, # Domain resolves to IP
RelationshipType.A_RECORD.default_confidence,
'a_record', # Domain resolves to IP
0.8,
raw_data
))
self.log_relationship_discovery(
source_node=domain,
target_node=ip_address,
relationship_type=RelationshipType.A_RECORD,
confidence_score=RelationshipType.A_RECORD.default_confidence,
relationship_type='a_record',
confidence_score=0.8,
raw_data=raw_data,
discovery_method="shodan_hostname_search"
)
@@ -129,7 +128,7 @@ class ShodanProvider(BaseProvider):
relationships.append((
domain,
hostname,
RelationshipType.PASSIVE_DNS, # Shared hosting relationship
'passive_dns', # Shared hosting relationship
0.6, # Lower confidence for shared hosting
hostname_raw_data
))
@@ -137,7 +136,7 @@ class ShodanProvider(BaseProvider):
self.log_relationship_discovery(
source_node=domain,
target_node=hostname,
relationship_type=RelationshipType.PASSIVE_DNS,
relationship_type='passive_dns',
confidence_score=0.6,
raw_data=hostname_raw_data,
discovery_method="shodan_shared_hosting"
@@ -148,7 +147,7 @@ class ShodanProvider(BaseProvider):
return relationships
def query_ip(self, ip: str) -> List[Tuple[str, str, RelationshipType, float, Dict[str, Any]]]:
def query_ip(self, ip: str) -> List[Tuple[str, str, str, float, Dict[str, Any]]]:
"""
Query Shodan for information about an IP address.
@@ -195,16 +194,16 @@ class ShodanProvider(BaseProvider):
relationships.append((
ip,
hostname,
RelationshipType.A_RECORD, # IP resolves to hostname
RelationshipType.A_RECORD.default_confidence,
'a_record', # IP resolves to hostname
0.8,
raw_data
))
self.log_relationship_discovery(
source_node=ip,
target_node=hostname,
relationship_type=RelationshipType.A_RECORD,
confidence_score=RelationshipType.A_RECORD.default_confidence,
relationship_type='a_record',
confidence_score=0.8,
raw_data=raw_data,
discovery_method="shodan_host_lookup"
)
@@ -230,16 +229,16 @@ class ShodanProvider(BaseProvider):
relationships.append((
ip,
asn_name,
RelationshipType.ASN_MEMBERSHIP,
RelationshipType.ASN_MEMBERSHIP.default_confidence,
'asn_membership',
0.7,
asn_raw_data
))
self.log_relationship_discovery(
source_node=ip,
target_node=asn_name,
relationship_type=RelationshipType.ASN_MEMBERSHIP,
confidence_score=RelationshipType.ASN_MEMBERSHIP.default_confidence,
relationship_type='asn_membership',
confidence_score=0.7,
raw_data=asn_raw_data,
discovery_method="shodan_asn_lookup"
)