dnsrecon/core/graph_manager.py
overcuriousity d3e1fcf35f it
2025-09-11 14:01:15 +02:00

422 lines
15 KiB
Python

"""
Graph data model for DNSRecon using NetworkX.
Manages in-memory graph storage with confidence scoring and forensic metadata.
"""
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
from enum import Enum
from datetime import timezone
import networkx as nx
class NodeType(Enum):
    """
    Enumeration of supported node types.

    Each member's ``value`` is the plain string stored as a node's ``type``
    attribute and emitted in exported graph data. Member order is the
    iteration order used when listing node types in exports.
    """

    DOMAIN = "domain"
    IP = "ip"
    ASN = "asn"
    DNS_RECORD = "dns_record"
    LARGE_ENTITY = "large_entity"
class RelationshipType(Enum):
    """
    Enumeration of supported relationship types with confidence scores.

    Every member's value is a ``(relationship_name, default_confidence)``
    tuple. ``__init__`` unpacks that tuple onto the member, so each member
    exposes:

    * ``relationship_name``  - string label of the relationship
    * ``default_confidence`` - confidence score used when none is supplied
    """

    # Certificate-derived relationship
    SAN_CERTIFICATE = ("san", 0.9)

    # Per-record-type DNS relationships
    A_RECORD = ("a_record", 0.8)
    AAAA_RECORD = ("aaaa_record", 0.8)
    CNAME_RECORD = ("cname", 0.8)
    MX_RECORD = ("mx_record", 0.7)
    NS_RECORD = ("ns_record", 0.7)
    PTR_RECORD = ("ptr_record", 0.8)
    SOA_RECORD = ("soa_record", 0.7)
    TXT_RECORD = ("txt_record", 0.7)
    SRV_RECORD = ("srv_record", 0.7)
    CAA_RECORD = ("caa_record", 0.7)
    DNSKEY_RECORD = ("dnskey_record", 0.7)
    DS_RECORD = ("ds_record", 0.7)
    RRSIG_RECORD = ("rrsig_record", 0.7)
    SSHFP_RECORD = ("sshfp_record", 0.7)
    TLSA_RECORD = ("tlsa_record", 0.7)
    NAPTR_RECORD = ("naptr_record", 0.7)
    SPF_RECORD = ("spf_record", 0.7)

    # Generic / other relationships
    DNS_RECORD = ("dns_record", 0.8)
    PASSIVE_DNS = ("passive_dns", 0.6)
    ASN_MEMBERSHIP = ("asn", 0.7)

    def __init__(self, relationship_name: str, default_confidence: float):
        # Enum passes the member's tuple value as positional arguments.
        self.relationship_name = relationship_name
        self.default_confidence = default_confidence
class GraphManager:
    """
    Graph manager for DNSRecon infrastructure mapping.

    Uses a NetworkX ``DiGraph`` for in-memory storage. Nodes carry a type,
    a creation timestamp and a metadata dict; edges carry a relationship
    type, a confidence score and provenance information.

    NOTE: this class performs no locking of its own. Callers that share one
    instance between threads must serialize access themselves.
    """

    # Confidence thresholds shared by edge styling and distribution stats.
    _HIGH_CONFIDENCE = 0.8
    _MEDIUM_CONFIDENCE = 0.6

    # Node colour configuration keyed by the node's ``type`` string.
    _NODE_COLORS = {
        'domain': {
            'background': '#00ff41',
            'border': '#00aa2e',
            'highlight': {'background': '#44ff75', 'border': '#00ff41'},
            'hover': {'background': '#22ff63', 'border': '#00cc35'},
        },
        'ip': {
            'background': '#ff9900',
            'border': '#cc7700',
            'highlight': {'background': '#ffbb44', 'border': '#ff9900'},
            'hover': {'background': '#ffaa22', 'border': '#dd8800'},
        },
        'asn': {
            'background': '#00aaff',
            'border': '#0088cc',
            'highlight': {'background': '#44ccff', 'border': '#00aaff'},
            'hover': {'background': '#22bbff', 'border': '#0099dd'},
        },
        'dns_record': {
            'background': '#9d4edd',
            'border': '#7b2cbf',
            'highlight': {'background': '#c77dff', 'border': '#9d4edd'},
            'hover': {'background': '#b392f0', 'border': '#8b5cf6'},
        },
        'large_entity': {
            'background': '#ff6b6b',
            'border': '#cc3a3a',
            'highlight': {'background': '#ff8c8c', 'border': '#ff6b6b'},
            'hover': {'background': '#ff7a7a', 'border': '#dd4a4a'},
        },
    }

    # Edge colour/width keyed by confidence tier (see _confidence_tier).
    _EDGE_STYLES = {
        'high': {
            'color': {
                'color': '#00ff41',
                'highlight': '#44ff75',
                'hover': '#22ff63',
                'inherit': False,
            },
            'width': 4,
        },
        'medium': {
            'color': {
                'color': '#ff9900',
                'highlight': '#ffbb44',
                'hover': '#ffaa22',
                'inherit': False,
            },
            'width': 3,
        },
        'low': {
            'color': {
                'color': '#666666',
                'highlight': '#888888',
                'hover': '#777777',
                'inherit': False,
            },
            'width': 2,
        },
    }

    def __init__(self):
        """Initialize empty directed graph."""
        self.graph = nx.DiGraph()
        self.creation_time = self._utc_now()
        self.last_modified = self.creation_time

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _utc_now() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def _touch(self, timestamp: Optional[str] = None) -> None:
        """Record that the graph was modified (now, or at ``timestamp``)."""
        self.last_modified = timestamp or self._utc_now()

    @classmethod
    def _confidence_tier(cls, confidence: float) -> str:
        """Classify a confidence score as ``'high'``, ``'medium'`` or ``'low'``."""
        if confidence >= cls._HIGH_CONFIDENCE:
            return 'high'
        if confidence >= cls._MEDIUM_CONFIDENCE:
            return 'medium'
        return 'low'

    @staticmethod
    def _copy_style(style: Dict[str, Any]) -> Dict[str, Any]:
        """Copy a (at most two-level) style dict so exports never share state."""
        return {
            key: dict(value) if isinstance(value, dict) else value
            for key, value in style.items()
        }

    # ------------------------------------------------------------------
    # Mutation
    # ------------------------------------------------------------------
    def add_node(self, node_id: str, node_type: NodeType,
                 metadata: Optional[Dict[str, Any]] = None) -> bool:
        """
        Add a node to the graph, or merge metadata into an existing node.

        Args:
            node_id: Unique identifier for the node
            node_type: Type of the node (see NodeType)
            metadata: Additional metadata for the node; copied on insert and
                merged (``dict.update``) into the stored metadata on update

        Returns:
            bool: True if node was added, False if it already exists
        """
        now = self._utc_now()

        if self.graph.has_node(node_id):
            attrs = self.graph.nodes[node_id]
            changed = False
            # A node created implicitly by add_edge() has no attributes yet;
            # backfill them so it is typed like any other node.
            if 'type' not in attrs:
                attrs['type'] = node_type.value
                attrs.setdefault('added_timestamp', now)
                changed = True
            if metadata:
                attrs.setdefault('metadata', {}).update(metadata)
                changed = True
            if changed:
                self._touch(now)
            return False

        self.graph.add_node(
            node_id,
            type=node_type.value,
            added_timestamp=now,
            # Copy so later merges never mutate the caller's dict.
            metadata=dict(metadata) if metadata else {},
        )
        self._touch(now)
        return True

    def add_edge(self, source_id: str, target_id: str,
                 relationship_type: RelationshipType,
                 confidence_score: Optional[float] = None,
                 source_provider: str = "unknown",
                 raw_data: Optional[Dict[str, Any]] = None) -> bool:
        """
        Add an edge between two nodes.

        If the edge already exists it is kept, and its confidence score is
        raised when the new score is higher. Endpoints are not required to
        exist beforehand: NetworkX creates missing endpoint nodes implicitly
        (without attributes), and callers are expected to register them via
        add_node().

        Args:
            source_id: Source node identifier
            target_id: Target node identifier
            relationship_type: Type of relationship
            confidence_score: Custom confidence score (overrides default);
                an explicit 0.0 is honoured
            source_provider: Provider that discovered this relationship
            raw_data: Raw data from provider response

        Returns:
            bool: True if edge was added, False if it already exists
        """
        # ``is None`` rather than ``or`` so that a score of 0.0 is not
        # mistaken for "no score supplied".
        if confidence_score is None:
            confidence = relationship_type.default_confidence
        else:
            confidence = confidence_score
        now = self._utc_now()

        if self.graph.has_edge(source_id, target_id):
            edge = self.graph.edges[source_id, target_id]
            # Only ever raise the stored confidence, never lower it.
            if confidence > edge.get('confidence_score', 0):
                edge['confidence_score'] = confidence
                edge['updated_timestamp'] = now
                edge['updated_by'] = source_provider
                self._touch(now)
            return False

        self.graph.add_edge(
            source_id,
            target_id,
            relationship_type=relationship_type.relationship_name,
            confidence_score=confidence,
            source_provider=source_provider,
            discovery_timestamp=now,
            raw_data=raw_data or {},
        )
        self._touch(now)
        return True

    def clear(self) -> None:
        """Clear all nodes and edges from the graph and reset both timestamps."""
        self.graph.clear()
        self.creation_time = self._utc_now()
        self.last_modified = self.creation_time

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_node_count(self) -> int:
        """Get total number of nodes in the graph."""
        return self.graph.number_of_nodes()

    def get_edge_count(self) -> int:
        """Get total number of edges in the graph."""
        return self.graph.number_of_edges()

    def get_nodes_by_type(self, node_type: NodeType) -> List[str]:
        """
        Get all nodes of a specific type.

        Args:
            node_type: Type of nodes to retrieve

        Returns:
            List of node identifiers
        """
        wanted = node_type.value
        return [
            node_id
            for node_id, attributes in self.graph.nodes(data=True)
            if attributes.get('type') == wanted
        ]

    def get_neighbors(self, node_id: str) -> List[str]:
        """
        Get all neighboring nodes (both incoming and outgoing).

        Args:
            node_id: Node identifier

        Returns:
            List of unique neighboring node identifiers (predecessors first,
            then successors); empty if the node does not exist
        """
        if not self.graph.has_node(node_id):
            return []
        # dict.fromkeys de-duplicates while keeping a deterministic order.
        return list(dict.fromkeys([
            *self.graph.predecessors(node_id),
            *self.graph.successors(node_id),
        ]))

    def get_high_confidence_edges(self, min_confidence: float = 0.8) -> List[Tuple[str, str, Dict]]:
        """
        Get edges whose confidence score is at or above a threshold.

        Args:
            min_confidence: Minimum confidence threshold (inclusive)

        Returns:
            List of tuples (source, target, attributes)
        """
        return [
            (source, target, attributes)
            for source, target, attributes in self.graph.edges(data=True)
            if attributes.get('confidence_score', 0) >= min_confidence
        ]

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------
    def _node_view(self, node_id: str, attributes: Dict[str, Any]) -> Dict[str, Any]:
        """Build the visualization dict for a single node."""
        metadata = attributes.get('metadata', {})
        node_type = attributes.get('type', 'unknown')
        node_data = {
            'id': node_id,
            'label': node_id,
            'type': node_type,
            'metadata': metadata,
            'added_timestamp': attributes.get('added_timestamp'),
        }

        # Certificate nodes get a more informative label than their id.
        if node_id.startswith('cert_'):
            issuer = metadata.get('issuer_name', 'Unknown')
            # NOTE(review): the original chose a status marker from
            # metadata['is_currently_valid'], but both branches were the
            # empty string (the markers were presumably lost). The resulting
            # label is reproduced unchanged here; restore the intended
            # markers if they are known.
            node_data['label'] = f"Certificate \n{issuer[:30]}..."

        # Color coding by type; unknown types fall back to the domain colours.
        palette = self._NODE_COLORS.get(node_type, self._NODE_COLORS['domain'])
        node_data['color'] = self._copy_style(palette)

        # Add certificate validity indicator if available
        cert_data = metadata.get('certificate_data')
        if cert_data and 'has_valid_cert' in cert_data:
            node_data['has_valid_cert'] = cert_data['has_valid_cert']
        return node_data

    def _edge_view(self, source: str, target: str,
                   attributes: Dict[str, Any]) -> Dict[str, Any]:
        """Build the visualization dict for a single edge."""
        confidence = attributes.get('confidence_score', 0)
        tier = self._confidence_tier(confidence)
        style = self._EDGE_STYLES[tier]
        edge_data = {
            'from': source,
            'to': target,
            'label': attributes.get('relationship_type', ''),
            'confidence_score': confidence,
            'source_provider': attributes.get('source_provider', ''),
            'discovery_timestamp': attributes.get('discovery_timestamp'),
            'color': dict(style['color']),
            'width': style['width'],
        }
        # Low-confidence edges are additionally drawn dashed.
        if tier == 'low':
            edge_data['dashes'] = [5, 5]
        return edge_data

    def get_graph_data(self) -> Dict[str, Any]:
        """
        Export graph data for visualization.

        Returns:
            Dictionary with ``nodes`` (id, label, type, metadata, timestamp,
            colour config), ``edges`` (endpoints, label, confidence,
            provenance, confidence-based styling) and ``statistics``
            (counts plus creation / last-modified timestamps)
        """
        nodes = [
            self._node_view(node_id, attributes)
            for node_id, attributes in self.graph.nodes(data=True)
        ]
        edges = [
            self._edge_view(source, target, attributes)
            for source, target, attributes in self.graph.edges(data=True)
        ]
        return {
            'nodes': nodes,
            'edges': edges,
            'statistics': {
                'node_count': len(nodes),
                'edge_count': len(edges),
                'creation_time': self.creation_time,
                'last_modified': self.last_modified,
            },
        }

    def export_json(self) -> Dict[str, Any]:
        """
        Export complete graph data as a JSON-ready dictionary for download.

        Returns:
            Dictionary containing complete graph data with metadata
        """
        graph_data = self.get_graph_data()
        return {
            'export_metadata': {
                'export_timestamp': self._utc_now(),
                'graph_creation_time': self.creation_time,
                'last_modified': self.last_modified,
                'total_nodes': self.graph.number_of_nodes(),
                'total_edges': self.graph.number_of_edges(),
                'graph_format': 'dnsrecon_v1',
            },
            'nodes': graph_data['nodes'],
            'edges': graph_data['edges'],
            'node_types': [node_type.value for node_type in NodeType],
            'relationship_types': [
                {
                    'name': rel_type.relationship_name,
                    'default_confidence': rel_type.default_confidence,
                }
                for rel_type in RelationshipType
            ],
            'confidence_distribution': self._get_confidence_distribution(),
        }

    def _get_confidence_distribution(self) -> Dict[str, int]:
        """Get distribution of edge confidence scores by tier."""
        distribution = {'high': 0, 'medium': 0, 'low': 0}
        for _, _, attributes in self.graph.edges(data=True):
            tier = self._confidence_tier(attributes.get('confidence_score', 0))
            distribution[tier] += 1
        return distribution

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive graph statistics.

        Returns:
            Dictionary containing basic metrics plus node-type,
            relationship-type, confidence and provider distributions
        """
        # Every known node type is reported, including those with no nodes;
        # nodes whose type is not a NodeType value are not counted here.
        node_types = {node_type.value: 0 for node_type in NodeType}
        for _, attributes in self.graph.nodes(data=True):
            type_name = attributes.get('type')
            if type_name in node_types:
                node_types[type_name] += 1

        # Relationship and provider counts are gathered in one edge pass.
        relationships: Dict[str, int] = {}
        providers: Dict[str, int] = {}
        for _, _, attributes in self.graph.edges(data=True):
            rel_type = attributes.get('relationship_type', 'unknown')
            relationships[rel_type] = relationships.get(rel_type, 0) + 1
            provider = attributes.get('source_provider', 'unknown')
            providers[provider] = providers.get(provider, 0) + 1

        return {
            'basic_metrics': {
                'total_nodes': self.graph.number_of_nodes(),
                'total_edges': self.graph.number_of_edges(),
                'creation_time': self.creation_time,
                'last_modified': self.last_modified,
            },
            'node_type_distribution': node_types,
            'relationship_type_distribution': relationships,
            'confidence_distribution': self._get_confidence_distribution(),
            'provider_distribution': providers,
        }