# core/graph_manager.py

"""
Graph data model for DNSRecon using NetworkX.
Manages in-memory graph storage with confidence scoring and forensic metadata.
"""
import re
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Any, Optional, Tuple

import networkx as nx


class NodeType(Enum):
    """Enumeration of supported node types."""
    DOMAIN = "domain"
    IP = "ip"
    ASN = "asn"
    LARGE_ENTITY = "large_entity"
    CORRELATION_OBJECT = "correlation_object"

    def __repr__(self):
        return self.value


class GraphManager:
    """
    Graph manager for DNSRecon infrastructure mapping.

    Wraps a NetworkX ``DiGraph`` and maintains a correlation index that maps
    attribute values to the nodes (and attribute paths) they were seen on.

    NOTE(review): this class was originally described as thread-safe, but no
    locking is performed here; callers sharing an instance across threads
    presumably synchronize externally - confirm.
    """

    # Matches strings that start with "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DDTHH:MM:SS".
    _DATE_REGEX = r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}'

    # Matches a trailing list index such as the "[0]" in "hostnames[0]".
    _TRAILING_INDEX_REGEX = r'\[\d+\]$'

    def __init__(self):
        """Initialize empty directed graph."""
        self.graph = nx.DiGraph()
        self.creation_time = self._now_iso()
        self.last_modified = self.creation_time
        # value -> {node_id -> [ {'path', 'parent_attr', 'meaningful_attr'}, ... ]}
        self.correlation_index = {}
        # Compile regex for date filtering for efficiency
        self.date_pattern = re.compile(self._DATE_REGEX)

    @staticmethod
    def _now_iso() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def __getstate__(self):
        """Prepare GraphManager for pickling, excluding compiled regex."""
        state = self.__dict__.copy()
        # Compiled regex patterns are not always picklable
        state.pop('date_pattern', None)
        return state

    def __setstate__(self, state):
        """Restore GraphManager state and recompile regex."""
        self.__dict__.update(state)
        self.date_pattern = re.compile(self._DATE_REGEX)

    def _update_correlation_index(self, node_id: str, data: Any,
                                  path: Optional[List[str]] = None,
                                  parent_attr: str = ""):
        """
        Recursively traverse metadata and add scalar leaf values to the index.

        Args:
            node_id: Node the data belongs to.
            data: Dict, list, or leaf value to index.
            path: Attribute path leading to ``data`` (``None`` means root).
            parent_attr: Name of the closest enclosing dict key; used to build
                list path components such as ``"hostnames[0]"``.
        """
        if path is None:
            path = []

        if isinstance(data, dict):
            for key, value in data.items():
                self._update_correlation_index(node_id, value, path + [key], key)
        elif isinstance(data, list):
            for i, item in enumerate(data):
                # Instead of just using [i], include the parent attribute context
                list_path_component = f"[{i}]" if not parent_attr else f"{parent_attr}[{i}]"
                self._update_correlation_index(node_id, item, path + [list_path_component], parent_attr)
        else:
            self._add_to_correlation_index(node_id, data, ".".join(path), parent_attr)

    def _add_to_correlation_index(self, node_id: str, value: Any, path_str: str, parent_attr: str = ""):
        """
        Add a scalar value to the correlation index, filtering out noise.

        Only ``str``, ``int`` and ``float`` values are indexed. Booleans,
        ``None`` and any other type are ignored, as are values found under
        paths containing 'count', 'total', 'timestamp' or 'date'.
        """
        # bool is a subclass of int, so it must be rejected explicitly.
        if isinstance(value, bool) or not isinstance(value, (str, int, float)):
            return

        # Ignore certain paths that contain noisy, non-unique identifiers
        lowered_path = path_str.lower()
        if any(keyword in lowered_path for keyword in ['count', 'total', 'timestamp', 'date']):
            return

        # Filter out common low-entropy values and date-like strings
        if isinstance(value, str):
            # Prevent correlation on date/time strings.
            if self.date_pattern.match(value):
                return
            if len(value) < 4 or value.lower() in ['true', 'false', 'unknown', 'none', 'crt.sh']:
                return
        elif isinstance(value, int) and (abs(value) < 1024 or abs(value) > 65535):
            # Only integers whose magnitude lies in 1024..65535 are indexed.
            return

        # Add the valuable correlation data to the index
        node_entries = self.correlation_index.setdefault(value, {}).setdefault(node_id, [])

        # Store both the full path and the parent attribute for better edge labeling
        correlation_entry = {
            'path': path_str,
            'parent_attr': parent_attr,
            'meaningful_attr': self._extract_meaningful_attribute(path_str, parent_attr)
        }

        if correlation_entry not in node_entries:
            node_entries.append(correlation_entry)

    def _extract_meaningful_attribute(self, path_str: str, parent_attr: str = "") -> str:
        """
        Extract the most meaningful attribute name from a path string.

        Returns "unknown" for an empty path. Otherwise returns the last path
        component that is not a bare list index (``"[0]"``), with any trailing
        index removed (``"hostnames[0]"`` -> ``"hostnames"``). Falls back to
        ``parent_attr`` and finally to "correlation".
        """
        if not path_str:
            return "unknown"

        # Look for the last non-array-index part
        for part in reversed(path_str.split('.')):
            # Skip array indices like [0], [1], etc.
            is_bare_index = part.startswith('[') and part.endswith(']') and part[1:-1].isdigit()
            if is_bare_index:
                continue
            # Clean up compound names like "hostnames[0]" to just "hostnames"
            clean_part = re.sub(self._TRAILING_INDEX_REGEX, '', part)
            if clean_part:
                return clean_part

        # Fallback to parent attribute if available
        if parent_attr:
            return parent_attr

        return "correlation"

    def _check_for_correlations(self, new_node_id: str, data: Any,
                                path: Optional[List[str]] = None,
                                parent_attr: str = "") -> List[Dict]:
        """
        Recursively traverse metadata to find correlations with existing data.

        Returns:
            A list of dicts with keys 'value', 'sources' and 'nodes', one per
            leaf value that is already present in the correlation index for
            at least one other node.
        """
        if path is None:
            path = []

        all_correlations = []

        if isinstance(data, dict):
            for key, value in data.items():
                if key == 'source':  # Avoid correlating on the provider name
                    continue
                all_correlations.extend(
                    self._check_for_correlations(new_node_id, value, path + [key], key))
            return all_correlations

        if isinstance(data, list):
            for i, item in enumerate(data):
                list_path_component = f"[{i}]" if not parent_attr else f"{parent_attr}[{i}]"
                all_correlations.extend(
                    self._check_for_correlations(new_node_id, item, path + [list_path_component], parent_attr))
            return all_correlations

        value = data
        try:
            existing_nodes_with_paths = self.correlation_index.get(value)
        except TypeError:
            # Unhashable leaf (e.g. a set) can never be an index key.
            return all_correlations

        if existing_nodes_with_paths is None:
            return all_correlations

        unique_nodes = set(existing_nodes_with_paths)
        unique_nodes.add(new_node_id)

        if len(unique_nodes) < 2:
            return all_correlations  # Correlation must involve at least two distinct nodes

        path_str = ".".join(path)
        all_sources = [{
            'node_id': new_node_id,
            'path': path_str,
            'parent_attr': parent_attr,
            'meaningful_attr': self._extract_meaningful_attribute(path_str, parent_attr)
        }]

        for node_id, path_entries in existing_nodes_with_paths.items():
            for entry in path_entries:
                if isinstance(entry, dict):
                    entry_parent = entry.get('parent_attr', '')
                    if 'meaningful_attr' in entry:
                        entry_meaningful = entry['meaningful_attr']
                    else:
                        entry_meaningful = self._extract_meaningful_attribute(entry['path'], entry_parent)
                    all_sources.append({
                        'node_id': node_id,
                        'path': entry['path'],
                        'parent_attr': entry_parent,
                        'meaningful_attr': entry_meaningful
                    })
                else:
                    # Handle legacy string-only entries
                    all_sources.append({
                        'node_id': node_id,
                        'path': str(entry),
                        'parent_attr': '',
                        'meaningful_attr': self._extract_meaningful_attribute(str(entry))
                    })

        all_correlations.append({
            'value': value,
            'sources': all_sources,
            'nodes': list(unique_nodes)
        })
        return all_correlations

    def add_node(self, node_id: str, node_type: NodeType, attributes: Optional[Dict[str, Any]] = None,
                 description: str = "", metadata: Optional[Dict[str, Any]] = None) -> bool:
        """
        Add a node to the graph, update attributes, and process correlations.

        For an existing node, ``attributes`` and ``metadata`` are merged into
        the stored dicts and a non-empty ``description`` replaces the old one.
        When ``attributes`` are given for a non-correlation node, shared values
        are looked up in the correlation index and correlation nodes are
        created or merged accordingly.

        Returns:
            True if the node did not exist before this call, False otherwise.
        """
        is_new_node = not self.graph.has_node(node_id)
        if is_new_node:
            self.graph.add_node(node_id, type=node_type.value,
                                added_timestamp=self._now_iso(),
                                attributes=attributes or {},
                                description=description,
                                metadata=metadata or {})
        else:
            node_record = self.graph.nodes[node_id]
            # Safely merge new attributes into existing attributes
            if attributes:
                existing_attributes = node_record.get('attributes', {})
                existing_attributes.update(attributes)
                node_record['attributes'] = existing_attributes
            if description:
                node_record['description'] = description
            if metadata:
                existing_metadata = node_record.get('metadata', {})
                existing_metadata.update(metadata)
                node_record['metadata'] = existing_metadata

        if attributes and node_type != NodeType.CORRELATION_OBJECT:
            correlations = self._check_for_correlations(node_id, attributes)
            for corr in correlations:
                value = corr['value']

                # STEP 1: Substring check against all existing nodes
                if self._correlation_value_matches_existing_node(value):
                    # Skip creating correlation node - would be redundant
                    continue

                eligible_nodes = set(corr['nodes'])

                if len(eligible_nodes) < 2:
                    # Need at least 2 nodes to create a correlation
                    continue

                # STEP 3: Check for existing correlation node with same connection pattern
                correlation_nodes_with_pattern = self._find_correlation_nodes_with_same_pattern(eligible_nodes)

                if correlation_nodes_with_pattern:
                    # STEP 4: Merge with existing correlation node
                    target_correlation_node = correlation_nodes_with_pattern[0]
                    self._merge_correlation_values(target_correlation_node, value, corr)
                else:
                    # STEP 5: Create new correlation node for eligible nodes only
                    # NOTE: hash() of a str is randomized per process unless
                    # PYTHONHASHSEED is fixed, so this id is not stable across runs.
                    correlation_node_id = f"corr_{abs(hash(str(sorted(eligible_nodes))))}"
                    self.add_node(correlation_node_id, NodeType.CORRELATION_OBJECT,
                                  metadata={'values': [value], 'sources': corr['sources'],
                                            'correlated_nodes': list(eligible_nodes)})

                    # Create edges from eligible nodes to this correlation node with better labeling
                    for c_node_id in eligible_nodes:
                        if self.graph.has_node(c_node_id):
                            # Find the best attribute name for this node
                            meaningful_attr = self._find_best_attribute_name_for_node(c_node_id, corr['sources'])
                            relationship_type = f"c_{meaningful_attr}"
                            self.add_edge(c_node_id, correlation_node_id, relationship_type, confidence_score=0.9)

            # Index only after checking, so the node does not correlate with itself.
            self._update_correlation_index(node_id, attributes)

        self.last_modified = self._now_iso()
        return is_new_node

    def _find_best_attribute_name_for_node(self, node_id: str, sources: List[Dict]) -> str:
        """Find the best attribute name for a correlation edge by looking at the sources."""
        node_sources = [s for s in sources if s['node_id'] == node_id]

        if not node_sources:
            return "correlation"

        # Use the meaningful_attr if available
        for source in node_sources:
            meaningful_attr = source.get('meaningful_attr')
            if meaningful_attr and meaningful_attr != "unknown":
                return meaningful_attr

        # Fallback to parent_attr
        for source in node_sources:
            parent_attr = source.get('parent_attr')
            if parent_attr:
                return parent_attr

        # Last resort - extract from path
        for source in node_sources:
            path = source.get('path', '')
            if path:
                extracted = self._extract_meaningful_attribute(path)
                if extracted != "unknown":
                    return extracted

        return "correlation"

    def _has_direct_edge_bidirectional(self, node_a: str, node_b: str) -> bool:
        """
        Check if there's a direct edge between two nodes in either direction.
        Returns True if node_a→node_b OR node_b→node_a exists.
        """
        return (self.graph.has_edge(node_a, node_b) or
                self.graph.has_edge(node_b, node_a))

    def _correlation_value_matches_existing_node(self, correlation_value: Any) -> bool:
        """
        Check if correlation value contains any existing node ID as substring.
        Returns True if match found (correlation node should NOT be created).

        The comparison is case-insensitive; both sides are converted with
        ``str()`` first.
        """
        correlation_str = str(correlation_value).lower()

        # Check against all existing nodes
        return any(str(existing_node_id).lower() in correlation_str
                   for existing_node_id in self.graph.nodes())

    def _find_correlation_nodes_with_same_pattern(self, node_set: set) -> List[str]:
        """
        Find existing correlation nodes that have the exact same pattern of connected nodes.
        Returns list of correlation node IDs with matching patterns.
        """
        matching_nodes = []

        for corr_node_id in self.get_nodes_by_type(NodeType.CORRELATION_OBJECT):
            # All nodes connected to this correlation node, in either direction
            connected_nodes = set(self.graph.predecessors(corr_node_id))
            connected_nodes.update(self.graph.successors(corr_node_id))

            # Check if the pattern matches exactly
            if connected_nodes == node_set:
                matching_nodes.append(corr_node_id)

        return matching_nodes

    def _merge_correlation_values(self, target_node_id: str, new_value: Any, corr_data: Dict) -> None:
        """
        Merge a new correlation value into an existing correlation node.

        Updates the node's metadata ('values', 'sources', 'correlated_nodes',
        'merge_count', 'last_merge_timestamp') and its description. Sources
        are de-duplicated on ``(node_id, path)``; the first occurrence wins and
        keeps all of its fields. Does nothing if the target node is missing.
        """
        if not self.graph.has_node(target_node_id):
            return

        target_metadata = self.graph.nodes[target_node_id].setdefault('metadata', {})

        # Get existing values (ensure it's a list)
        existing_values = target_metadata.get('values', [])
        if not isinstance(existing_values, list):
            existing_values = [existing_values]

        # Add new value if not already present
        if new_value not in existing_values:
            existing_values.append(new_value)

        # Merge sources, keeping insertion order and the full source records
        existing_sources = target_metadata.get('sources', [])
        new_sources = corr_data.get('sources', [])

        merged_sources = []
        seen_source_keys = set()
        for source in list(existing_sources) + list(new_sources):
            source_key = (source['node_id'], source.get('path', ''))
            if source_key in seen_source_keys:
                continue
            seen_source_keys.add(source_key)
            merged_sources.append(dict(source))

        # Ordered de-duplication of the correlated node ids
        combined_nodes = list(target_metadata.get('correlated_nodes', [])) + list(corr_data.get('nodes', []))
        correlated_nodes = list(dict.fromkeys(combined_nodes))

        # Update metadata
        target_metadata.update({
            'values': existing_values,
            'sources': merged_sources,
            'correlated_nodes': correlated_nodes,
            'merge_count': len(existing_values),
            'last_merge_timestamp': self._now_iso()
        })

        # Update description to reflect merged nature
        value_count = len(existing_values)
        node_count = len(target_metadata['correlated_nodes'])
        self.graph.nodes[target_node_id]['description'] = (
            f"Correlation container with {value_count} merged values "
            f"across {node_count} nodes"
        )

    def add_edge(self, source_id: str, target_id: str, relationship_type: str,
                 confidence_score: float = 0.5, source_provider: str = "unknown",
                 raw_data: Optional[Dict[str, Any]] = None) -> bool:
        """
        Add or update an edge between two nodes, ensuring nodes exist.

        Relationship types starting with "c_" are used as the edge label
        as-is; all others are prefixed with the provider name.

        Returns:
            True only when a new edge was created. False if either endpoint is
            missing or the edge already existed (in which case its confidence
            is raised if the new score is higher).
        """
        if not self.graph.has_node(source_id) or not self.graph.has_node(target_id):
            return False

        new_confidence = confidence_score

        if relationship_type.startswith("c_"):
            edge_label = relationship_type
        else:
            edge_label = f"{source_provider}_{relationship_type}"

        if self.graph.has_edge(source_id, target_id):
            # If edge exists, update confidence if the new score is higher.
            edge_data = self.graph.edges[source_id, target_id]
            if new_confidence > edge_data.get('confidence_score', 0):
                edge_data['confidence_score'] = new_confidence
                edge_data['updated_timestamp'] = self._now_iso()
                edge_data['updated_by'] = source_provider
            return False

        # Add a new edge with all attributes.
        self.graph.add_edge(source_id, target_id,
                            relationship_type=edge_label,
                            confidence_score=new_confidence,
                            source_provider=source_provider,
                            discovery_timestamp=self._now_iso(),
                            raw_data=raw_data or {})
        self.last_modified = self._now_iso()
        return True

    def extract_node_from_large_entity(self, large_entity_id: str, node_id_to_extract: str) -> bool:
        """
        Removes a node from a large entity's internal lists and updates its count.
        This prepares the large entity for the node's promotion to a regular node.

        Returns:
            False if the large entity does not exist, True otherwise (including
            when the node was not listed in the entity).
        """
        if not self.graph.has_node(large_entity_id):
            return False

        node_data = self.graph.nodes[large_entity_id]
        attributes = node_data.get('attributes', {})

        # Remove from the list of member nodes
        if 'nodes' in attributes and node_id_to_extract in attributes['nodes']:
            attributes['nodes'].remove(node_id_to_extract)
            # Update the count
            attributes['count'] = len(attributes['nodes'])
        else:
            # This can happen if the node was already extracted, which is not an error.
            print(f"Warning: Node {node_id_to_extract} not found in the 'nodes' list of {large_entity_id}.")
            return True  # Proceed as if successful

        self.last_modified = self._now_iso()
        return True

    def remove_node(self, node_id: str) -> bool:
        """
        Remove a node and its connected edges from the graph.

        Also drops the node from the correlation index and deletes index
        entries that no longer reference any node.

        Returns:
            False if the node does not exist, True otherwise.
        """
        if not self.graph.has_node(node_id):
            return False

        # Remove node from the graph (NetworkX handles removing connected edges)
        self.graph.remove_node(node_id)

        # Clean up the correlation index (iterate over a snapshot of the keys
        # because entries are deleted while walking it)
        for value in list(self.correlation_index):
            nodes = self.correlation_index[value]
            nodes.pop(node_id, None)
            if not nodes:  # If no other nodes are associated with this value, remove it
                del self.correlation_index[value]

        self.last_modified = self._now_iso()
        return True

    def get_node_count(self) -> int:
        """Get total number of nodes in the graph."""
        return self.graph.number_of_nodes()

    def get_edge_count(self) -> int:
        """Get total number of edges in the graph."""
        return self.graph.number_of_edges()

    def get_nodes_by_type(self, node_type: NodeType) -> List[str]:
        """Get all nodes of a specific type."""
        return [n for n, d in self.graph.nodes(data=True) if d.get('type') == node_type.value]

    def get_neighbors(self, node_id: str) -> List[str]:
        """Get all unique neighbors (predecessors and successors) for a node."""
        if not self.graph.has_node(node_id):
            return []
        return list(set(self.graph.predecessors(node_id)) | set(self.graph.successors(node_id)))

    def get_high_confidence_edges(self, min_confidence: float = 0.8) -> List[Tuple[str, str, Dict]]:
        """Get edges whose confidence score is at least ``min_confidence``."""
        return [(u, v, d) for u, v, d in self.graph.edges(data=True)
                if d.get('confidence_score', 0) >= min_confidence]

    def get_graph_data(self) -> Dict[str, Any]:
        """
        Export graph data formatted for frontend visualization.

        Returns:
            Dict with 'nodes', 'edges' and 'statistics' (the basic metrics
            from :meth:`get_statistics`).
        """
        nodes = []
        for node_id, attrs in self.graph.nodes(data=True):
            node_data = {'id': node_id, 'label': node_id, 'type': attrs.get('type', 'unknown'),
                         'attributes': attrs.get('attributes', {}),
                         'description': attrs.get('description', ''),
                         'metadata': attrs.get('metadata', {}),
                         'added_timestamp': attrs.get('added_timestamp')}

            # Customize node appearance based on type and attributes
            node_type = node_data['type']
            attributes = node_data['attributes']
            if node_type == 'domain' and attributes.get('certificates', {}).get('has_valid_cert') is False:
                node_data['color'] = {'background': '#c7c7c7', 'border': '#999'}  # Gray for invalid cert

            # Add incoming and outgoing edges to node data
            node_data['incoming_edges'] = [{'from': u, 'data': d}
                                           for u, _, d in self.graph.in_edges(node_id, data=True)]
            node_data['outgoing_edges'] = [{'to': v, 'data': d}
                                           for _, v, d in self.graph.out_edges(node_id, data=True)]

            nodes.append(node_data)

        edges = [
            {'from': source, 'to': target,
             'label': attrs.get('relationship_type', ''),
             'confidence_score': attrs.get('confidence_score', 0),
             'source_provider': attrs.get('source_provider', ''),
             'discovery_timestamp': attrs.get('discovery_timestamp')}
            for source, target, attrs in self.graph.edges(data=True)
        ]

        return {
            'nodes': nodes, 'edges': edges,
            'statistics': self.get_statistics()['basic_metrics']
        }

    def export_json(self) -> Dict[str, Any]:
        """Export complete graph data as a JSON-serializable dictionary."""
        graph_data = nx.node_link_data(self.graph)  # Use NetworkX's built-in robust serializer
        return {
            'export_metadata': {
                'export_timestamp': self._now_iso(),
                'graph_creation_time': self.creation_time,
                'last_modified': self.last_modified,
                'total_nodes': self.get_node_count(),
                'total_edges': self.get_edge_count(),
                'graph_format': 'dnsrecon_v1_nodeling'
            },
            'graph': graph_data,
            'statistics': self.get_statistics()
        }

    def _get_confidence_distribution(self) -> Dict[str, int]:
        """
        Get distribution of edge confidence scores.

        Buckets: 'high' (>= 0.8), 'medium' (>= 0.6), 'low' (everything else,
        including edges without a score).
        """
        distribution = {'high': 0, 'medium': 0, 'low': 0}
        for _, _, data in self.graph.edges(data=True):
            confidence = data.get('confidence_score', 0)
            if confidence >= 0.8:
                distribution['high'] += 1
            elif confidence >= 0.6:
                distribution['medium'] += 1
            else:
                distribution['low'] += 1
        return distribution

    def get_statistics(self) -> Dict[str, Any]:
        """Get comprehensive statistics about the graph."""
        stats = {'basic_metrics': {'total_nodes': self.get_node_count(),
                                   'total_edges': self.get_edge_count(),
                                   'creation_time': self.creation_time,
                                   'last_modified': self.last_modified},
                 'node_type_distribution': {},
                 'relationship_type_distribution': {},
                 'confidence_distribution': self._get_confidence_distribution(),
                 'provider_distribution': {}}

        # Calculate distributions
        for node_type in NodeType:
            stats['node_type_distribution'][node_type.value] = len(self.get_nodes_by_type(node_type))

        relationship_counts = stats['relationship_type_distribution']
        provider_counts = stats['provider_distribution']
        for _, _, data in self.graph.edges(data=True):
            rel_type = data.get('relationship_type', 'unknown')
            relationship_counts[rel_type] = relationship_counts.get(rel_type, 0) + 1
            provider = data.get('source_provider', 'unknown')
            provider_counts[provider] = provider_counts.get(provider, 0) + 1

        return stats

    def clear(self) -> None:
        """Clear all nodes, edges, and indices from the graph."""
        self.graph.clear()
        self.correlation_index.clear()
        self.creation_time = self._now_iso()
        self.last_modified = self.creation_time