improving the display
This commit is contained in:
@@ -56,15 +56,22 @@ class GraphManager:
|
||||
self.date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}')
|
||||
|
||||
def process_correlations_for_node(self, node_id: str):
|
||||
"""Process correlations for a given node based on its attributes."""
|
||||
"""
|
||||
UPDATED: Process correlations for a given node with enhanced tracking.
|
||||
Now properly tracks which attribute/provider created each correlation.
|
||||
"""
|
||||
if not self.graph.has_node(node_id):
|
||||
return
|
||||
|
||||
node_attributes = self.graph.nodes[node_id].get('attributes', [])
|
||||
|
||||
# Process each attribute for potential correlations
|
||||
for attr in node_attributes:
|
||||
attr_name = attr.get('name')
|
||||
attr_value = attr.get('value')
|
||||
attr_provider = attr.get('provider', 'unknown')
|
||||
|
||||
# Skip excluded attributes and invalid values
|
||||
if attr_name in self.EXCLUDED_KEYS or not isinstance(attr_value, (str, int, float, bool)) or attr_value is None:
|
||||
continue
|
||||
|
||||
@@ -74,25 +81,91 @@ class GraphManager:
|
||||
if isinstance(attr_value, str) and (len(attr_value) < 4 or self.date_pattern.match(attr_value)):
|
||||
continue
|
||||
|
||||
# Initialize correlation tracking for this value
|
||||
if attr_value not in self.correlation_index:
|
||||
self.correlation_index[attr_value] = set()
|
||||
self.correlation_index[attr_value] = {
|
||||
'nodes': set(),
|
||||
'sources': [] # Track which provider/attribute combinations contributed
|
||||
}
|
||||
|
||||
self.correlation_index[attr_value].add(node_id)
|
||||
# Add this node and source information
|
||||
self.correlation_index[attr_value]['nodes'].add(node_id)
|
||||
|
||||
# Track the source of this correlation value
|
||||
source_info = {
|
||||
'node_id': node_id,
|
||||
'provider': attr_provider,
|
||||
'attribute': attr_name,
|
||||
'path': f"{attr_provider}_{attr_name}"
|
||||
}
|
||||
|
||||
# Add source if not already present (avoid duplicates)
|
||||
existing_sources = [s for s in self.correlation_index[attr_value]['sources']
|
||||
if s['node_id'] == node_id and s['path'] == source_info['path']]
|
||||
if not existing_sources:
|
||||
self.correlation_index[attr_value]['sources'].append(source_info)
|
||||
|
||||
if len(self.correlation_index[attr_value]) > 1:
|
||||
self._create_correlation_node_and_edges(attr_value, self.correlation_index[attr_value])
|
||||
# Create correlation node if we have multiple nodes with this value
|
||||
if len(self.correlation_index[attr_value]['nodes']) > 1:
|
||||
self._create_enhanced_correlation_node_and_edges(attr_value, self.correlation_index[attr_value])
|
||||
|
||||
def _create_correlation_node_and_edges(self, value, nodes):
|
||||
"""Create a correlation node and edges to the correlated nodes."""
|
||||
correlation_node_id = f"corr_{value}"
|
||||
def _create_enhanced_correlation_node_and_edges(self, value, correlation_data):
|
||||
"""
|
||||
UPDATED: Create correlation node and edges with detailed provider tracking.
|
||||
"""
|
||||
correlation_node_id = f"corr_{hash(str(value)) & 0x7FFFFFFF}"
|
||||
nodes = correlation_data['nodes']
|
||||
sources = correlation_data['sources']
|
||||
|
||||
# Create or update correlation node
|
||||
if not self.graph.has_node(correlation_node_id):
|
||||
self.add_node(correlation_node_id, NodeType.CORRELATION_OBJECT,
|
||||
metadata={'value': value, 'correlated_nodes': list(nodes)})
|
||||
# Determine the most common provider/attribute combination
|
||||
provider_counts = {}
|
||||
for source in sources:
|
||||
key = f"{source['provider']}_{source['attribute']}"
|
||||
provider_counts[key] = provider_counts.get(key, 0) + 1
|
||||
|
||||
# Use the most common provider/attribute as the primary label
|
||||
primary_source = max(provider_counts.items(), key=lambda x: x[1])[0] if provider_counts else "unknown_correlation"
|
||||
|
||||
metadata = {
|
||||
'value': value,
|
||||
'correlated_nodes': list(nodes),
|
||||
'sources': sources,
|
||||
'primary_source': primary_source,
|
||||
'correlation_count': len(nodes)
|
||||
}
|
||||
|
||||
self.add_node(correlation_node_id, NodeType.CORRELATION_OBJECT, metadata=metadata)
|
||||
print(f"Created correlation node {correlation_node_id} for value '{value}' with {len(nodes)} nodes")
|
||||
|
||||
for node_id in nodes:
|
||||
# Create edges from each node to the correlation node
|
||||
for source in sources:
|
||||
node_id = source['node_id']
|
||||
provider = source['provider']
|
||||
attribute = source['attribute']
|
||||
|
||||
if self.graph.has_node(node_id) and not self.graph.has_edge(node_id, correlation_node_id):
|
||||
self.add_edge(node_id, correlation_node_id, "correlation", confidence_score=0.9)
|
||||
|
||||
# Format relationship label as "provider: attribute"
|
||||
display_provider = provider
|
||||
display_attribute = attribute.replace('_', ' ').replace('cert ', '').strip()
|
||||
|
||||
relationship_label = f"{display_provider}: {display_attribute}"
|
||||
|
||||
self.add_edge(
|
||||
source_id=node_id,
|
||||
target_id=correlation_node_id,
|
||||
relationship_type=relationship_label,
|
||||
confidence_score=0.9,
|
||||
source_provider=provider,
|
||||
raw_data={
|
||||
'correlation_value': value,
|
||||
'original_attribute': attribute,
|
||||
'correlation_type': 'attribute_matching'
|
||||
}
|
||||
)
|
||||
|
||||
print(f"Added correlation edge: {node_id} -> {correlation_node_id} ({relationship_label})")
|
||||
|
||||
def add_node(self, node_id: str, node_type: NodeType, attributes: Optional[List[Dict[str, Any]]] = None,
|
||||
description: str = "", metadata: Optional[Dict[str, Any]] = None) -> bool:
|
||||
@@ -335,26 +408,56 @@ class GraphManager:
|
||||
def get_graph_data(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Export graph data formatted for frontend visualization.
|
||||
Compatible with unified data model - preserves all attribute information for frontend display.
|
||||
UPDATED: Fixed certificate validity styling logic for unified data model.
|
||||
"""
|
||||
nodes = []
|
||||
for node_id, attrs in self.graph.nodes(data=True):
|
||||
node_data = {'id': node_id, 'label': node_id, 'type': attrs.get('type', 'unknown'),
|
||||
'attributes': attrs.get('attributes', []), # Ensure attributes is a list
|
||||
'description': attrs.get('description', ''),
|
||||
'metadata': attrs.get('metadata', {}),
|
||||
'added_timestamp': attrs.get('added_timestamp')}
|
||||
'attributes': attrs.get('attributes', []), # Ensure attributes is a list
|
||||
'description': attrs.get('description', ''),
|
||||
'metadata': attrs.get('metadata', {}),
|
||||
'added_timestamp': attrs.get('added_timestamp')}
|
||||
|
||||
# Customize node appearance based on type and attributes
|
||||
# UPDATED: Fixed certificate validity styling logic
|
||||
node_type = node_data['type']
|
||||
attributes_list = node_data['attributes']
|
||||
|
||||
# CORRECTED LOGIC: Handle certificate validity styling
|
||||
if node_type == 'domain' and isinstance(attributes_list, list):
|
||||
# Find the certificates attribute in the list
|
||||
cert_attr = next((attr for attr in attributes_list if attr.get('name') == 'certificates'), None)
|
||||
if cert_attr and cert_attr.get('value', {}).get('has_valid_cert') is False:
|
||||
node_data['color'] = {'background': '#c7c7c7', 'border': '#999'} # Gray for invalid cert
|
||||
# Check for certificate-related attributes
|
||||
has_certificates = False
|
||||
has_valid_certificates = False
|
||||
has_expired_certificates = False
|
||||
|
||||
for attr in attributes_list:
|
||||
attr_name = attr.get('name', '').lower()
|
||||
attr_provider = attr.get('provider', '').lower()
|
||||
attr_value = attr.get('value')
|
||||
|
||||
# Look for certificate attributes from crt.sh provider
|
||||
if attr_provider == 'crtsh' or 'cert' in attr_name:
|
||||
has_certificates = True
|
||||
|
||||
# Check certificate validity
|
||||
if attr_name == 'cert_is_currently_valid':
|
||||
if attr_value is True:
|
||||
has_valid_certificates = True
|
||||
elif attr_value is False:
|
||||
has_expired_certificates = True
|
||||
|
||||
# Also check for certificate expiry indicators
|
||||
elif 'expires_soon' in attr_name and attr_value is True:
|
||||
has_expired_certificates = True
|
||||
elif 'expired' in attr_name and attr_value is True:
|
||||
has_expired_certificates = True
|
||||
|
||||
# Apply styling based on certificate status
|
||||
if has_expired_certificates and not has_valid_certificates:
|
||||
# Red for expired/invalid certificates
|
||||
node_data['color'] = {'background': '#ff6b6b', 'border': '#cc5555'}
|
||||
elif not has_certificates:
|
||||
# Grey for domains with no certificates
|
||||
node_data['color'] = {'background': '#c7c7c7', 'border': '#999999'}
|
||||
# Default green styling is handled by the frontend for domains with valid certificates
|
||||
|
||||
# Add incoming and outgoing edges to node data
|
||||
if self.graph.has_node(node_id):
|
||||
@@ -366,10 +469,10 @@ class GraphManager:
|
||||
edges = []
|
||||
for source, target, attrs in self.graph.edges(data=True):
|
||||
edges.append({'from': source, 'to': target,
|
||||
'label': attrs.get('relationship_type', ''),
|
||||
'confidence_score': attrs.get('confidence_score', 0),
|
||||
'source_provider': attrs.get('source_provider', ''),
|
||||
'discovery_timestamp': attrs.get('discovery_timestamp')})
|
||||
'label': attrs.get('relationship_type', ''),
|
||||
'confidence_score': attrs.get('confidence_score', 0),
|
||||
'source_provider': attrs.get('source_provider', ''),
|
||||
'discovery_timestamp': attrs.get('discovery_timestamp')})
|
||||
return {
|
||||
'nodes': nodes, 'edges': edges,
|
||||
'statistics': self.get_statistics()['basic_metrics']
|
||||
|
||||
Reference in New Issue
Block a user