correlation engine
This commit is contained in:
@@ -40,270 +40,6 @@ class GraphManager:
|
||||
self.graph = nx.DiGraph()
|
||||
self.creation_time = datetime.now(timezone.utc).isoformat()
|
||||
self.last_modified = self.creation_time
|
||||
self.correlation_index = {}
|
||||
# Compile regex for date filtering for efficiency
|
||||
self.date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}')
|
||||
|
||||
# FIXED: Exclude cert_issuer_name since we already create proper CA relationships
|
||||
self.EXCLUDED_KEYS = [
|
||||
# Certificate metadata that creates noise or has dedicated node types
|
||||
'cert_source', # Always 'crtsh' for crtsh provider
|
||||
'cert_common_name',
|
||||
'cert_validity_period_days', # Numerical, not useful for correlation
|
||||
'cert_issuer_name', # FIXED: Has dedicated CA nodes, don't correlate
|
||||
#'cert_certificate_id', # Unique per certificate
|
||||
#'cert_serial_number', # Unique per certificate
|
||||
'cert_entry_timestamp', # Timestamp, filtered by date regex anyway
|
||||
'cert_not_before', # Date, filtered by date regex anyway
|
||||
'cert_not_after', # Date, filtered by date regex anyway
|
||||
# DNS metadata that creates noise
|
||||
'dns_ttl', # TTL values are not meaningful for correlation
|
||||
# Shodan metadata that might create noise
|
||||
'timestamp', # Generic timestamp fields
|
||||
'last_update', # Generic timestamp fields
|
||||
#'org', # Too generic, causes false correlations
|
||||
#'isp', # Too generic, causes false correlations
|
||||
# Generic noisy attributes
|
||||
'updated_timestamp', # Any timestamp field
|
||||
'discovery_timestamp', # Any timestamp field
|
||||
'query_timestamp', # Any timestamp field
|
||||
]
|
||||
|
||||
def __getstate__(self):
|
||||
"""Prepare GraphManager for pickling, excluding compiled regex."""
|
||||
state = self.__dict__.copy()
|
||||
# Compiled regex patterns are not always picklable
|
||||
if 'date_pattern' in state:
|
||||
del state['date_pattern']
|
||||
return state
|
||||
|
||||
def __setstate__(self, state):
|
||||
"""Restore GraphManager state and recompile regex."""
|
||||
self.__dict__.update(state)
|
||||
self.date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}')
|
||||
|
||||
def process_correlations_for_node(self, node_id: str):
|
||||
"""
|
||||
UPDATED: Process correlations for a given node with enhanced tracking.
|
||||
Now properly tracks which attribute/provider created each correlation.
|
||||
"""
|
||||
if not self.graph.has_node(node_id):
|
||||
return
|
||||
|
||||
node_attributes = self.graph.nodes[node_id].get('attributes', [])
|
||||
|
||||
# Process each attribute for potential correlations
|
||||
for attr in node_attributes:
|
||||
attr_name = attr.get('name')
|
||||
attr_value = attr.get('value')
|
||||
attr_provider = attr.get('provider', 'unknown')
|
||||
|
||||
# IMPROVED: More comprehensive exclusion logic
|
||||
should_exclude = (
|
||||
# Check against excluded keys (exact match or substring)
|
||||
any(excluded_key in attr_name or attr_name == excluded_key for excluded_key in self.EXCLUDED_KEYS) or
|
||||
# Invalid value types
|
||||
not isinstance(attr_value, (str, int, float, bool)) or
|
||||
attr_value is None or
|
||||
# Boolean values are not useful for correlation
|
||||
isinstance(attr_value, bool) or
|
||||
# String values that are too short or are dates
|
||||
(isinstance(attr_value, str) and (
|
||||
len(attr_value) < 4 or
|
||||
self.date_pattern.match(attr_value) or
|
||||
# Exclude common generic values that create noise
|
||||
attr_value.lower() in ['unknown', 'none', 'null', 'n/a', 'true', 'false', '0', '1']
|
||||
)) or
|
||||
# Numerical values that are likely to be unique identifiers
|
||||
(isinstance(attr_value, (int, float)) and (
|
||||
attr_value == 0 or # Zero values are not meaningful
|
||||
attr_value == 1 or # One values are too common
|
||||
abs(attr_value) > 1000000 # Very large numbers are likely IDs
|
||||
))
|
||||
)
|
||||
|
||||
if should_exclude:
|
||||
continue
|
||||
|
||||
# Initialize correlation tracking for this value
|
||||
if attr_value not in self.correlation_index:
|
||||
self.correlation_index[attr_value] = {
|
||||
'nodes': set(),
|
||||
'sources': [] # Track which provider/attribute combinations contributed
|
||||
}
|
||||
|
||||
# Add this node and source information
|
||||
self.correlation_index[attr_value]['nodes'].add(node_id)
|
||||
|
||||
# Track the source of this correlation value
|
||||
source_info = {
|
||||
'node_id': node_id,
|
||||
'provider': attr_provider,
|
||||
'attribute': attr_name,
|
||||
'path': f"{attr_provider}_{attr_name}"
|
||||
}
|
||||
|
||||
# Add source if not already present (avoid duplicates)
|
||||
existing_sources = [s for s in self.correlation_index[attr_value]['sources']
|
||||
if s['node_id'] == node_id and s['path'] == source_info['path']]
|
||||
if not existing_sources:
|
||||
self.correlation_index[attr_value]['sources'].append(source_info)
|
||||
|
||||
# Create correlation node if we have multiple nodes with this value
|
||||
if len(self.correlation_index[attr_value]['nodes']) > 1:
|
||||
self._create_enhanced_correlation_node_and_edges(attr_value, self.correlation_index[attr_value])
|
||||
|
||||
def _create_enhanced_correlation_node_and_edges(self, value, correlation_data):
|
||||
"""
|
||||
UPDATED: Create correlation node and edges with raw provider data (no formatting).
|
||||
"""
|
||||
correlation_node_id = f"corr_{hash(str(value)) & 0x7FFFFFFF}"
|
||||
nodes = correlation_data['nodes']
|
||||
sources = correlation_data['sources']
|
||||
|
||||
# Create or update correlation node
|
||||
if not self.graph.has_node(correlation_node_id):
|
||||
# Use raw provider/attribute data - no formatting
|
||||
provider_counts = {}
|
||||
for source in sources:
|
||||
# Keep original provider and attribute names
|
||||
key = f"{source['provider']}_{source['attribute']}"
|
||||
provider_counts[key] = provider_counts.get(key, 0) + 1
|
||||
|
||||
# Use the most common provider/attribute as the primary label (raw)
|
||||
primary_source = max(provider_counts.items(), key=lambda x: x[1])[0] if provider_counts else "unknown_correlation"
|
||||
|
||||
metadata = {
|
||||
'value': value,
|
||||
'correlated_nodes': list(nodes),
|
||||
'sources': sources,
|
||||
'primary_source': primary_source,
|
||||
'correlation_count': len(nodes)
|
||||
}
|
||||
|
||||
self.add_node(correlation_node_id, NodeType.CORRELATION_OBJECT, metadata=metadata)
|
||||
#print(f"Created correlation node {correlation_node_id} for value '{value}' with {len(nodes)} nodes")
|
||||
|
||||
# Create edges from each node to the correlation node
|
||||
for source in sources:
|
||||
node_id = source['node_id']
|
||||
provider = source['provider']
|
||||
attribute = source['attribute']
|
||||
|
||||
if self.graph.has_node(node_id) and not self.graph.has_edge(node_id, correlation_node_id):
|
||||
# Format relationship label as "corr_provider_attribute"
|
||||
relationship_label = f"corr_{provider}_{attribute}"
|
||||
|
||||
self.add_edge(
|
||||
source_id=node_id,
|
||||
target_id=correlation_node_id,
|
||||
relationship_type=relationship_label,
|
||||
confidence_score=0.9,
|
||||
source_provider=provider,
|
||||
raw_data={
|
||||
'correlation_value': value,
|
||||
'original_attribute': attribute,
|
||||
'correlation_type': 'attribute_matching'
|
||||
}
|
||||
)
|
||||
|
||||
#print(f"Added correlation edge: {node_id} -> {correlation_node_id} ({relationship_label})")
|
||||
|
||||
|
||||
def _has_direct_edge_bidirectional(self, node_a: str, node_b: str) -> bool:
|
||||
"""
|
||||
Check if there's a direct edge between two nodes in either direction.
|
||||
Returns True if node_aâ†'node_b OR node_bâ†'node_a exists.
|
||||
"""
|
||||
return (self.graph.has_edge(node_a, node_b) or
|
||||
self.graph.has_edge(node_b, node_a))
|
||||
|
||||
def _correlation_value_matches_existing_node(self, correlation_value: str) -> bool:
|
||||
"""
|
||||
Check if correlation value contains any existing node ID as substring.
|
||||
Returns True if match found (correlation node should NOT be created).
|
||||
"""
|
||||
correlation_str = str(correlation_value).lower()
|
||||
|
||||
# Check against all existing nodes
|
||||
for existing_node_id in self.graph.nodes():
|
||||
if existing_node_id.lower() in correlation_str:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _find_correlation_nodes_with_same_pattern(self, node_set: set) -> List[str]:
|
||||
"""
|
||||
Find existing correlation nodes that have the exact same pattern of connected nodes.
|
||||
Returns list of correlation node IDs with matching patterns.
|
||||
"""
|
||||
correlation_nodes = self.get_nodes_by_type(NodeType.CORRELATION_OBJECT)
|
||||
matching_nodes = []
|
||||
|
||||
for corr_node_id in correlation_nodes:
|
||||
# Get all nodes connected to this correlation node
|
||||
connected_nodes = set()
|
||||
|
||||
# Add all predecessors (nodes pointing TO the correlation node)
|
||||
connected_nodes.update(self.graph.predecessors(corr_node_id))
|
||||
|
||||
# Add all successors (nodes pointed TO by the correlation node)
|
||||
connected_nodes.update(self.graph.successors(corr_node_id))
|
||||
|
||||
# Check if the pattern matches exactly
|
||||
if connected_nodes == node_set:
|
||||
matching_nodes.append(corr_node_id)
|
||||
|
||||
return matching_nodes
|
||||
|
||||
def _merge_correlation_values(self, target_node_id: str, new_value: Any, corr_data: Dict) -> None:
|
||||
"""
|
||||
Merge a new correlation value into an existing correlation node.
|
||||
Uses same logic as large entity merging.
|
||||
"""
|
||||
if not self.graph.has_node(target_node_id):
|
||||
return
|
||||
|
||||
target_metadata = self.graph.nodes[target_node_id]['metadata']
|
||||
|
||||
# Get existing values (ensure it's a list)
|
||||
existing_values = target_metadata.get('values', [])
|
||||
if not isinstance(existing_values, list):
|
||||
existing_values = [existing_values]
|
||||
|
||||
# Add new value if not already present
|
||||
if new_value not in existing_values:
|
||||
existing_values.append(new_value)
|
||||
|
||||
# Merge sources
|
||||
existing_sources = target_metadata.get('sources', [])
|
||||
new_sources = corr_data.get('sources', [])
|
||||
|
||||
# Create set of unique sources based on (node_id, path) tuples
|
||||
source_set = set()
|
||||
for source in existing_sources + new_sources:
|
||||
source_tuple = (source['node_id'], source.get('path', ''))
|
||||
source_set.add(source_tuple)
|
||||
|
||||
# Convert back to list of dictionaries
|
||||
merged_sources = [{'node_id': nid, 'path': path} for nid, path in source_set]
|
||||
|
||||
# Update metadata
|
||||
target_metadata.update({
|
||||
'values': existing_values,
|
||||
'sources': merged_sources,
|
||||
'correlated_nodes': list(set(target_metadata.get('correlated_nodes', []) + corr_data.get('nodes', []))),
|
||||
'merge_count': len(existing_values),
|
||||
'last_merge_timestamp': datetime.now(timezone.utc).isoformat()
|
||||
})
|
||||
|
||||
# Update description to reflect merged nature
|
||||
value_count = len(existing_values)
|
||||
node_count = len(target_metadata['correlated_nodes'])
|
||||
self.graph.nodes[target_node_id]['description'] = (
|
||||
f"Correlation container with {value_count} merged values "
|
||||
f"across {node_count} nodes"
|
||||
)
|
||||
|
||||
def add_node(self, node_id: str, node_type: NodeType, attributes: Optional[List[Dict[str, Any]]] = None,
|
||||
description: str = "", metadata: Optional[Dict[str, Any]] = None) -> bool:
|
||||
@@ -415,29 +151,7 @@ class GraphManager:
|
||||
|
||||
# Remove node from the graph (NetworkX handles removing connected edges)
|
||||
self.graph.remove_node(node_id)
|
||||
|
||||
# Clean up the correlation index
|
||||
keys_to_delete = []
|
||||
for value, data in self.correlation_index.items():
|
||||
if isinstance(data, dict) and 'nodes' in data:
|
||||
# Updated correlation structure
|
||||
if node_id in data['nodes']:
|
||||
data['nodes'].discard(node_id)
|
||||
# Remove sources for this node
|
||||
data['sources'] = [s for s in data['sources'] if s['node_id'] != node_id]
|
||||
if not data['nodes']: # If no other nodes are associated, remove it
|
||||
keys_to_delete.append(value)
|
||||
else:
|
||||
# Legacy correlation structure (fallback)
|
||||
if isinstance(data, set) and node_id in data:
|
||||
data.discard(node_id)
|
||||
if not data:
|
||||
keys_to_delete.append(value)
|
||||
|
||||
for key in keys_to_delete:
|
||||
if key in self.correlation_index:
|
||||
del self.correlation_index[key]
|
||||
|
||||
self.last_modified = datetime.now(timezone.utc).isoformat()
|
||||
return True
|
||||
|
||||
@@ -562,8 +276,7 @@ class GraphManager:
|
||||
return stats
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Clear all nodes, edges, and indices from the graph."""
|
||||
"""Clear all nodes and edges from the graph."""
|
||||
self.graph.clear()
|
||||
self.correlation_index.clear()
|
||||
self.creation_time = datetime.now(timezone.utc).isoformat()
|
||||
self.last_modified = self.creation_time
|
||||
@@ -6,6 +6,7 @@ import os
|
||||
import importlib
|
||||
import redis
|
||||
import time
|
||||
import math
|
||||
import random # Imported for jitter
|
||||
from typing import List, Set, Dict, Any, Tuple, Optional
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
@@ -19,6 +20,7 @@ from core.provider_result import ProviderResult
|
||||
from utils.helpers import _is_valid_ip, _is_valid_domain
|
||||
from utils.export_manager import export_manager
|
||||
from providers.base_provider import BaseProvider
|
||||
from providers.correlation_provider import CorrelationProvider
|
||||
from core.rate_limiter import GlobalRateLimiter
|
||||
|
||||
class ScanStatus:
|
||||
@@ -196,12 +198,15 @@ class Scanner:
|
||||
attribute = getattr(module, attribute_name)
|
||||
if isinstance(attribute, type) and issubclass(attribute, BaseProvider) and attribute is not BaseProvider:
|
||||
provider_class = attribute
|
||||
# FIXED: Pass the 'name' argument during initialization
|
||||
provider = provider_class(name=attribute_name, session_config=self.config)
|
||||
provider_name = provider.get_name()
|
||||
|
||||
if self.config.is_provider_enabled(provider_name):
|
||||
if provider.is_available():
|
||||
provider.set_stop_event(self.stop_event)
|
||||
if isinstance(provider, CorrelationProvider):
|
||||
provider.set_graph_manager(self.graph)
|
||||
self.providers.append(provider)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
@@ -336,12 +341,20 @@ class Scanner:
|
||||
|
||||
def _get_priority(self, provider_name):
|
||||
rate_limit = self.config.get_rate_limit(provider_name)
|
||||
if rate_limit > 90:
|
||||
return 1 # Highest priority
|
||||
elif rate_limit > 50:
|
||||
return 2
|
||||
else:
|
||||
return 3 # Lowest priority
|
||||
|
||||
# Define the logarithmic scale
|
||||
if rate_limit < 10:
|
||||
return 10 # Highest priority number (lowest priority) for very low rate limits
|
||||
|
||||
# Calculate logarithmic value and map to priority levels
|
||||
# Lower rate limits get higher priority numbers (lower priority)
|
||||
log_value = math.log10(rate_limit)
|
||||
priority = 10 - int(log_value * 2) # Scale factor to get more granular levels
|
||||
|
||||
# Ensure priority is within a reasonable range (1-10)
|
||||
priority = max(1, min(10, priority))
|
||||
|
||||
return priority
|
||||
|
||||
def _execute_scan(self, target: str, max_depth: int) -> None:
|
||||
"""
|
||||
@@ -420,7 +433,7 @@ class Scanner:
|
||||
provider = next((p for p in self.providers if p.get_name() == provider_name), None)
|
||||
|
||||
if provider:
|
||||
new_targets, _, success = self._query_single_provider_for_target(provider, target_item, depth)
|
||||
new_targets, _, success = self._process_provider_task(provider, target_item, depth)
|
||||
|
||||
if self._is_stop_requested(): break
|
||||
|
||||
@@ -482,9 +495,10 @@ class Scanner:
|
||||
self.executor.shutdown(wait=False, cancel_futures=True)
|
||||
self.executor = None
|
||||
|
||||
def _query_single_provider_for_target(self, provider: BaseProvider, target: str, depth: int) -> Tuple[Set[str], Set[str], bool]:
|
||||
def _process_provider_task(self, provider: BaseProvider, target: str, depth: int) -> Tuple[Set[str], Set[str], bool]:
|
||||
"""
|
||||
Query a single provider and process the unified ProviderResult.
|
||||
Manages the entire process for a given target and provider.
|
||||
It uses the "worker" function to get the data and then manages the consequences.
|
||||
"""
|
||||
if self._is_stop_requested():
|
||||
return set(), set(), False
|
||||
@@ -500,7 +514,7 @@ class Scanner:
|
||||
provider_successful = True
|
||||
|
||||
try:
|
||||
provider_result = self._query_single_provider_unified(provider, target, is_ip, depth)
|
||||
provider_result = self._execute_provider_query(provider, target, is_ip)
|
||||
|
||||
if provider_result is None:
|
||||
provider_successful = False
|
||||
@@ -512,16 +526,24 @@ class Scanner:
|
||||
large_entity_members.update(discovered)
|
||||
else:
|
||||
new_targets.update(discovered)
|
||||
self.graph.process_correlations_for_node(target)
|
||||
|
||||
# After processing a provider, queue a correlation task for the target
|
||||
correlation_provider = next((p for p in self.providers if isinstance(p, CorrelationProvider)), None)
|
||||
if correlation_provider and not isinstance(provider, CorrelationProvider):
|
||||
priority = self._get_priority(correlation_provider.get_name())
|
||||
self.task_queue.put((time.time(), priority, (correlation_provider.get_name(), target, depth)))
|
||||
# FIXED: Increment total tasks when a correlation task is enqueued
|
||||
self.total_tasks_ever_enqueued += 1
|
||||
|
||||
except Exception as e:
|
||||
provider_successful = False
|
||||
self._log_provider_error(target, provider.get_name(), str(e))
|
||||
|
||||
return new_targets, large_entity_members, provider_successful
|
||||
|
||||
def _query_single_provider_unified(self, provider: BaseProvider, target: str, is_ip: bool, current_depth: int) -> Optional[ProviderResult]:
|
||||
def _execute_provider_query(self, provider: BaseProvider, target: str, is_ip: bool) -> Optional[ProviderResult]:
|
||||
"""
|
||||
Query a single provider with stop signal checking.
|
||||
The "worker" function that directly communicates with the provider to fetch data.
|
||||
"""
|
||||
provider_name = provider.get_name()
|
||||
start_time = datetime.now(timezone.utc)
|
||||
@@ -572,16 +594,15 @@ class Scanner:
|
||||
}
|
||||
attributes_by_node[attribute.target_node].append(attr_dict)
|
||||
|
||||
# Add attributes to existing nodes (important for ISP nodes to get ASN attributes)
|
||||
# FIXED: Add attributes to existing nodes AND create new nodes (like correlation nodes)
|
||||
for node_id, node_attributes_list in attributes_by_node.items():
|
||||
if self.graph.graph.has_node(node_id):
|
||||
# Node already exists, just add attributes
|
||||
if _is_valid_ip(node_id):
|
||||
node_type = NodeType.IP
|
||||
else:
|
||||
node_type = NodeType.DOMAIN
|
||||
|
||||
self.graph.add_node(node_id, node_type, attributes=node_attributes_list)
|
||||
if provider_name == 'correlation' and not self.graph.graph.has_node(node_id):
|
||||
node_type = NodeType.CORRELATION_OBJECT
|
||||
elif _is_valid_ip(node_id):
|
||||
node_type = NodeType.IP
|
||||
else:
|
||||
node_type = NodeType.DOMAIN
|
||||
self.graph.add_node(node_id, node_type, attributes=node_attributes_list)
|
||||
|
||||
# Check if this should be a large entity
|
||||
if provider_result.get_relationship_count() > self.config.large_entity_threshold:
|
||||
@@ -604,6 +625,8 @@ class Scanner:
|
||||
target_type = NodeType.ISP # ISP node for Shodan organization data
|
||||
elif provider_name == 'crtsh' and relationship.relationship_type == 'crtsh_cert_issuer':
|
||||
target_type = NodeType.CA # CA node for certificate issuers
|
||||
elif provider_name == 'correlation':
|
||||
target_type = NodeType.CORRELATION_OBJECT
|
||||
elif _is_valid_ip(target_node):
|
||||
target_type = NodeType.IP
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user