new data model refinement

This commit is contained in:
overcuriousity
2025-09-16 21:23:02 +02:00
parent 97aa18f788
commit 733e1da640
6 changed files with 129 additions and 647 deletions

View File

@@ -134,7 +134,7 @@ class CrtShProvider(BaseProvider):
self.logger.logger.info(f"Refreshed and merged cache for {domain}")
else: # "not_found"
# Create new result from processed certs
result = self._process_certificates_to_result(domain, current_processed_certs)
result = self._process_certificates_to_result(domain, raw_certificates)
self.logger.logger.info(f"Created fresh result for {domain} ({result.get_relationship_count()} relationships)")
# Save the result to cache
@@ -272,109 +272,73 @@ class CrtShProvider(BaseProvider):
Process certificates to create ProviderResult with relationships and attributes.
"""
result = ProviderResult()
if self._stop_event and self._stop_event.is_set():
print(f"CrtSh processing cancelled before processing for domain: {domain}")
return result
# Aggregate certificate data by domain
domain_certificates = {}
all_discovered_domains = set()
# Process certificates with cancellation checking
for i, cert_data in enumerate(certificates):
if i % 5 == 0 and self._stop_event and self._stop_event.is_set():
print(f"CrtSh processing cancelled at certificate {i} for domain: {domain}")
break
cert_metadata = self._extract_certificate_metadata(cert_data)
cert_domains = self._extract_domains_from_certificate(cert_data)
all_discovered_domains.update(cert_domains)
for cert_domain in cert_domains:
if not _is_valid_domain(cert_domain):
continue
if cert_domain not in domain_certificates:
domain_certificates[cert_domain] = []
domain_certificates[cert_domain].append(cert_metadata)
for key, value in self._extract_certificate_metadata(cert_data).items():
if value is not None:
result.add_attribute(
target_node=cert_domain,
name=f"cert_{key}",
value=value,
attr_type='certificate_data',
provider=self.name,
confidence=0.9
)
if self._stop_event and self._stop_event.is_set():
print(f"CrtSh query cancelled before relationship creation for domain: {domain}")
return result
# Create relationships from query domain to ALL discovered domains
for i, discovered_domain in enumerate(all_discovered_domains):
if discovered_domain == domain:
continue # Skip self-relationships
continue
if i % 10 == 0 and self._stop_event and self._stop_event.is_set():
print(f"CrtSh relationship creation cancelled for domain: {domain}")
break
if not _is_valid_domain(discovered_domain):
continue
# Get certificates for both domains
query_domain_certs = domain_certificates.get(domain, [])
discovered_domain_certs = domain_certificates.get(discovered_domain, [])
# Find shared certificates
shared_certificates = self._find_shared_certificates(query_domain_certs, discovered_domain_certs)
# Calculate confidence
confidence = self._calculate_domain_relationship_confidence(
domain, discovered_domain, shared_certificates, all_discovered_domains
domain, discovered_domain, [], all_discovered_domains
)
# Create comprehensive raw data for the relationship
relationship_raw_data = {
'relationship_type': 'certificate_discovery',
'shared_certificates': shared_certificates,
'total_shared_certs': len(shared_certificates),
'discovery_context': self._determine_relationship_context(discovered_domain, domain),
'domain_certificates': {
domain: self._summarize_certificates(query_domain_certs),
discovered_domain: self._summarize_certificates(discovered_domain_certs)
}
}
# Add relationship
result.add_relationship(
source_node=domain,
target_node=discovered_domain,
relationship_type='san_certificate',
provider=self.name,
confidence=confidence,
raw_data=relationship_raw_data
raw_data={'relationship_type': 'certificate_discovery'}
)
# Log the relationship discovery
self.log_relationship_discovery(
source_node=domain,
target_node=discovered_domain,
relationship_type='san_certificate',
confidence_score=confidence,
raw_data=relationship_raw_data,
raw_data={'relationship_type': 'certificate_discovery'},
discovery_method="certificate_transparency_analysis"
)
# Add certificate summary as attributes for all domains that have certificates
for cert_domain, cert_list in domain_certificates.items():
if cert_list:
cert_summary = self._summarize_certificates(cert_list)
result.add_attribute(
target_node=cert_domain,
name='certificates',
value=cert_summary,
attr_type='certificate_data',
provider=self.name,
confidence=0.9,
metadata={'total_certificates': len(cert_list)}
)
return result
def _extract_certificate_metadata(self, cert_data: Dict[str, Any]) -> Dict[str, Any]:

View File

@@ -222,110 +222,62 @@ class ShodanProvider(BaseProvider):
"""
result = ProviderResult()
# Extract hostname relationships
hostnames = data.get('hostnames', [])
for hostname in hostnames:
if _is_valid_domain(hostname):
for key, value in data.items():
if key == 'hostnames':
for hostname in value:
if _is_valid_domain(hostname):
result.add_relationship(
source_node=ip,
target_node=hostname,
relationship_type='a_record',
provider=self.name,
confidence=0.8,
raw_data=data
)
self.log_relationship_discovery(
source_node=ip,
target_node=hostname,
relationship_type='a_record',
confidence_score=0.8,
raw_data=data,
discovery_method="shodan_host_lookup"
)
elif key == 'asn':
asn_name = f"AS{value[2:]}" if isinstance(value, str) and value.startswith('AS') else f"AS{value}"
result.add_relationship(
source_node=ip,
target_node=hostname,
relationship_type='a_record',
target_node=asn_name,
relationship_type='asn_membership',
provider=self.name,
confidence=0.8,
confidence=0.7,
raw_data=data
)
self.log_relationship_discovery(
source_node=ip,
target_node=hostname,
relationship_type='a_record',
confidence_score=0.8,
target_node=asn_name,
relationship_type='asn_membership',
confidence_score=0.7,
raw_data=data,
discovery_method="shodan_host_lookup"
discovery_method="shodan_asn_lookup"
)
elif key == 'ports':
for port in value:
result.add_attribute(
target_node=ip,
name='open_port',
value=port,
attr_type='network_info',
provider=self.name,
confidence=0.9
)
elif isinstance(value, (str, int, float, bool)) and value is not None:
result.add_attribute(
target_node=ip,
name=f"shodan_{key}",
value=value,
attr_type='shodan_info',
provider=self.name,
confidence=0.9
)
# Extract ASN relationship
asn = data.get('asn')
if asn:
asn_name = f"AS{asn[2:]}" if isinstance(asn, str) and asn.startswith('AS') else f"AS{asn}"
result.add_relationship(
source_node=ip,
target_node=asn_name,
relationship_type='asn_membership',
provider=self.name,
confidence=0.7,
raw_data=data
)
self.log_relationship_discovery(
source_node=ip,
target_node=asn_name,
relationship_type='asn_membership',
confidence_score=0.7,
raw_data=data,
discovery_method="shodan_asn_lookup"
)
# Add comprehensive Shodan host information as attributes
if 'ports' in data:
result.add_attribute(
target_node=ip,
name='ports',
value=data['ports'],
attr_type='network_info',
provider=self.name,
confidence=0.9
)
if 'os' in data and data['os']:
result.add_attribute(
target_node=ip,
name='operating_system',
value=data['os'],
attr_type='system_info',
provider=self.name,
confidence=0.8
)
if 'org' in data:
result.add_attribute(
target_node=ip,
name='organization',
value=data['org'],
attr_type='network_info',
provider=self.name,
confidence=0.8
)
if 'country_name' in data:
result.add_attribute(
target_node=ip,
name='country',
value=data['country_name'],
attr_type='location_info',
provider=self.name,
confidence=0.9
)
if 'city' in data:
result.add_attribute(
target_node=ip,
name='city',
value=data['city'],
attr_type='location_info',
provider=self.name,
confidence=0.8
)
# Store complete Shodan data as a comprehensive attribute
result.add_attribute(
target_node=ip,
name='shodan_host_info',
value=data, # Complete Shodan response for full forensic detail
attr_type='comprehensive_data',
provider=self.name,
confidence=0.9,
metadata={'data_source': 'shodan_api', 'query_type': 'host_lookup'}
)
return result