From ae07635ab69cc4257ae21fff3bddb9cb8b7d2de5 Mon Sep 17 00:00:00 2001
From: overcuriousity
Date: Sun, 14 Sep 2025 20:50:09 +0200
Subject: [PATCH] update edge labels

---
Reviewer notes (not part of the commit message):

* This copy was reconstructed from a whitespace-collapsed version of the
  patch. The number of lines in every hunk was checked against its @@
  header and against the diffstat, but leading whitespace (and the spacing
  before inline comments) is inferred from Python block structure. The
  nesting depth of the add_node hunks (old lines 163 and 187) and of the
  last hunk (old line 276) is the least certain; compare with
  core/graph_manager.py before applying, or let
  `git apply --ignore-whitespace` relax the context match and re-check the
  indentation of the added lines afterwards.
* Amended relative to the original patch: the added signatures of
  _update_correlation_index and _check_for_correlations now default `path`
  to None instead of a shared mutable `[]`. Both bodies already contain
  `if path is None: path = []`, which previously could never trigger.
  The blob id after ".." on the "index" line predates this amendment.
* What the patch does: correlation-index entries change from plain path
  strings to dicts with 'path', 'parent_attr' and 'meaningful_attr';
  _check_for_correlations still accepts string entries. Correlation edges
  are labelled `c_<meaningful_attr>` per node via the new
  _find_best_attribute_name_for_node, instead of using the last path
  component of the first source for every node.
* Follow-up: the "Last resort" loop in _extract_meaningful_attribute applies
  the same filter to the same parts as the reversed loop above it, so it
  can never return a value the first loop did not; it can be dropped.

 core/graph_manager.py | 137 +++++++++++++++++++++++++++++++++---------
 1 file changed, 110 insertions(+), 27 deletions(-)

diff --git a/core/graph_manager.py b/core/graph_manager.py
index 36a2e98..b0a2124 100644
--- a/core/graph_manager.py
+++ b/core/graph_manager.py
@@ -1,3 +1,5 @@
+# core/graph_manager.py
+
 """
 Graph data model for DNSRecon using NetworkX.
 Manages in-memory graph storage with confidence scoring and forensic metadata.
@@ -50,21 +52,23 @@ class GraphManager:
         self.__dict__.update(state)
         self.date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}')
 
-    def _update_correlation_index(self, node_id: str, data: Any, path: List[str] = []):
-        """Recursively traverse metadata and add hashable values to the index."""
+    def _update_correlation_index(self, node_id: str, data: Any, path: List[str] = None, parent_attr: str = ""):
+        """Recursively traverse metadata and add hashable values to the index with better path tracking."""
         if path is None:
             path = []
 
         if isinstance(data, dict):
             for key, value in data.items():
-                self._update_correlation_index(node_id, value, path + [key])
+                self._update_correlation_index(node_id, value, path + [key], key)
         elif isinstance(data, list):
             for i, item in enumerate(data):
-                self._update_correlation_index(node_id, item, path + [f"[{i}]"])
+                # Instead of just using [i], include the parent attribute context
+                list_path_component = f"[{i}]" if not parent_attr else f"{parent_attr}[{i}]"
+                self._update_correlation_index(node_id, item, path + [list_path_component], parent_attr)
         else:
-            self._add_to_correlation_index(node_id, data, ".".join(path))
+            self._add_to_correlation_index(node_id, data, ".".join(path), parent_attr)
 
-    def _add_to_correlation_index(self, node_id: str, value: Any, path_str: str):
+    def _add_to_correlation_index(self, node_id: str, value: Any, path_str: str, parent_attr: str = ""):
         """Add a hashable value to the correlation index, filtering out noise."""
         if not isinstance(value, (str, int, float, bool)) or value is None:
             return
@@ -90,10 +94,47 @@ class GraphManager:
             self.correlation_index[value] = {}
         if node_id not in self.correlation_index[value]:
             self.correlation_index[value][node_id] = []
-        if path_str not in self.correlation_index[value][node_id]:
-            self.correlation_index[value][node_id].append(path_str)
+
+        # Store both the full path and the parent attribute for better edge labeling
+        correlation_entry = {
+            'path': path_str,
+            'parent_attr': parent_attr,
+            'meaningful_attr': self._extract_meaningful_attribute(path_str, parent_attr)
+        }
+
+        if correlation_entry not in self.correlation_index[value][node_id]:
+            self.correlation_index[value][node_id].append(correlation_entry)
 
-    def _check_for_correlations(self, new_node_id: str, data: Any, path: List[str] = []) -> List[Dict]:
+    def _extract_meaningful_attribute(self, path_str: str, parent_attr: str = "") -> str:
+        """Extract the most meaningful attribute name from a path string."""
+        if not path_str:
+            return "unknown"
+
+        path_parts = path_str.split('.')
+
+        # Look for the last non-array-index part
+        for part in reversed(path_parts):
+            # Skip array indices like [0], [1], etc.
+            if not (part.startswith('[') and part.endswith(']') and part[1:-1].isdigit()):
+                # Clean up compound names like "hostnames[0]" to just "hostnames"
+                clean_part = re.sub(r'\[\d+\]$', '', part)
+                if clean_part:
+                    return clean_part
+
+        # Fallback to parent attribute if available
+        if parent_attr:
+            return parent_attr
+
+        # Last resort - use the first meaningful part
+        for part in path_parts:
+            if not (part.startswith('[') and part.endswith(']') and part[1:-1].isdigit()):
+                clean_part = re.sub(r'\[\d+\]$', '', part)
+                if clean_part:
+                    return clean_part
+
+        return "correlation"
+
+    def _check_for_correlations(self, new_node_id: str, data: Any, path: List[str] = None, parent_attr: str = "") -> List[Dict]:
         """Recursively traverse metadata to find correlations with existing data."""
         if path is None:
             path = []
@@ -103,10 +144,11 @@ class GraphManager:
             for key, value in data.items():
                 if key == 'source':  # Avoid correlating on the provider name
                     continue
-                all_correlations.extend(self._check_for_correlations(new_node_id, value, path + [key]))
+                all_correlations.extend(self._check_for_correlations(new_node_id, value, path + [key], key))
         elif isinstance(data, list):
             for i, item in enumerate(data):
-                all_correlations.extend(self._check_for_correlations(new_node_id, item, path + [f"[{i}]"]))
+                list_path_component = f"[{i}]" if not parent_attr else f"{parent_attr}[{i}]"
+                all_correlations.extend(self._check_for_correlations(new_node_id, item, path + [list_path_component], parent_attr))
         else:
             value = data
             if value in self.correlation_index:
@@ -117,11 +159,31 @@ class GraphManager:
                 if len(unique_nodes) < 2:
                     return all_correlations  # Correlation must involve at least two distinct nodes
 
-                new_source = {'node_id': new_node_id, 'path': ".".join(path)}
+                new_source = {
+                    'node_id': new_node_id,
+                    'path': ".".join(path),
+                    'parent_attr': parent_attr,
+                    'meaningful_attr': self._extract_meaningful_attribute(".".join(path), parent_attr)
+                }
                 all_sources = [new_source]
-                for node_id, paths in existing_nodes_with_paths.items():
-                    for p_str in paths:
-                        all_sources.append({'node_id': node_id, 'path': p_str})
+
+                for node_id, path_entries in existing_nodes_with_paths.items():
+                    for entry in path_entries:
+                        if isinstance(entry, dict):
+                            all_sources.append({
+                                'node_id': node_id,
+                                'path': entry['path'],
+                                'parent_attr': entry.get('parent_attr', ''),
+                                'meaningful_attr': entry.get('meaningful_attr', self._extract_meaningful_attribute(entry['path'], entry.get('parent_attr', '')))
+                            })
+                        else:
+                            # Handle legacy string-only entries
+                            all_sources.append({
+                                'node_id': node_id,
+                                'path': str(entry),
+                                'parent_attr': '',
+                                'meaningful_attr': self._extract_meaningful_attribute(str(entry))
+                            })
 
                 all_correlations.append({
                     'value': value,
@@ -163,11 +225,7 @@ class GraphManager:
                     # Skip creating correlation node - would be redundant
                     continue
 
-                # *** CHANGE START ***
-                # The overly aggressive filtering logic has been removed.
-                # All nodes involved in the correlation will now be used.
                 eligible_nodes = set(corr['nodes'])
-                # *** CHANGE END ***
 
                 if len(eligible_nodes) < 2:
                     # Need at least 2 nodes to create a correlation
@@ -187,11 +245,12 @@ class GraphManager:
                                 metadata={'values': [value], 'sources': corr['sources'],
                                             'correlated_nodes': list(eligible_nodes)})
 
-                    # Create edges from eligible nodes to this correlation node
+                    # Create edges from eligible nodes to this correlation node with better labeling
                     for c_node_id in eligible_nodes:
                         if self.graph.has_node(c_node_id):
-                            attribute = corr['sources'][0]['path'].split('.')[-1]
-                            relationship_type = f"c_{attribute}"
+                            # Find the best attribute name for this node
+                            meaningful_attr = self._find_best_attribute_name_for_node(c_node_id, corr['sources'])
+                            relationship_type = f"c_{meaningful_attr}"
                             self.add_edge(c_node_id, correlation_node_id, relationship_type, confidence_score=0.9)
 
             self._update_correlation_index(node_id, attributes)
@@ -199,10 +258,34 @@ class GraphManager:
         self.last_modified = datetime.now(timezone.utc).isoformat()
         return is_new_node
 
-    # *** CHANGE START ***
-    # The following function is no longer needed and has been removed to avoid confusion.
-    # def _filter_nodes_without_direct_edges(self, node_set: set) -> set:
-    # *** CHANGE END ***
+    def _find_best_attribute_name_for_node(self, node_id: str, sources: List[Dict]) -> str:
+        """Find the best attribute name for a correlation edge by looking at the sources."""
+        node_sources = [s for s in sources if s['node_id'] == node_id]
+
+        if not node_sources:
+            return "correlation"
+
+        # Use the meaningful_attr if available
+        for source in node_sources:
+            meaningful_attr = source.get('meaningful_attr')
+            if meaningful_attr and meaningful_attr != "unknown":
+                return meaningful_attr
+
+        # Fallback to parent_attr
+        for source in node_sources:
+            parent_attr = source.get('parent_attr')
+            if parent_attr:
+                return parent_attr
+
+        # Last resort - extract from path
+        for source in node_sources:
+            path = source.get('path', '')
+            if path:
+                extracted = self._extract_meaningful_attribute(path)
+                if extracted != "unknown":
+                    return extracted
+
+        return "correlation"
 
     def _has_direct_edge_bidirectional(self, node_a: str, node_b: str) -> bool:
         """
@@ -276,7 +359,7 @@ class GraphManager:
         # Create set of unique sources based on (node_id, path) tuples
         source_set = set()
         for source in existing_sources + new_sources:
-            source_tuple = (source['node_id'], source['path'])
+            source_tuple = (source['node_id'], source.get('path', ''))
             source_set.add(source_tuple)
 
         # Convert back to list of dictionaries