This commit is contained in:
overcuriousity 2025-09-13 11:52:22 +02:00
parent 84810cdbb0
commit 53baf2e291
7 changed files with 270 additions and 401 deletions

View File

@ -2,11 +2,10 @@
Graph data model for DNSRecon using NetworkX.
Manages in-memory graph storage with confidence scoring and forensic metadata.
"""
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
import re
from datetime import datetime, timezone
from enum import Enum
from datetime import timezone
from typing import Dict, List, Any, Optional, Tuple
import networkx as nx
@ -16,8 +15,11 @@ class NodeType(Enum):
DOMAIN = "domain"
IP = "ip"
ASN = "asn"
DNS_RECORD = "dns_record"
LARGE_ENTITY = "large_entity"
CORRELATION_OBJECT = "correlation_object"
def __repr__(self):
return self.value
class RelationshipType(Enum):
@ -30,25 +32,17 @@ class RelationshipType(Enum):
NS_RECORD = ("ns_record", 0.7)
PTR_RECORD = ("ptr_record", 0.8)
SOA_RECORD = ("soa_record", 0.7)
TXT_RECORD = ("txt_record", 0.7)
SRV_RECORD = ("srv_record", 0.7)
CAA_RECORD = ("caa_record", 0.7)
DNSKEY_RECORD = ("dnskey_record", 0.7)
DS_RECORD = ("ds_record", 0.7)
RRSIG_RECORD = ("rrsig_record", 0.7)
SSHFP_RECORD = ("sshfp_record", 0.7)
TLSA_RECORD = ("tlsa_record", 0.7)
NAPTR_RECORD = ("naptr_record", 0.7)
SPF_RECORD = ("spf_record", 0.7)
DNS_RECORD = ("dns_record", 0.8)
PASSIVE_DNS = ("passive_dns", 0.6)
ASN_MEMBERSHIP = ("asn", 0.7)
CORRELATED_TO = ("correlated_to", 0.9)
def __init__(self, relationship_name: str, default_confidence: float):
self.relationship_name = relationship_name
self.default_confidence = default_confidence
def __repr__(self):
return self.relationship_name
class GraphManager:
"""
@ -59,96 +53,185 @@ class GraphManager:
def __init__(self):
"""Initialize empty directed graph."""
self.graph = nx.DiGraph()
# self.lock = threading.Lock()
self.creation_time = datetime.now(timezone.utc).isoformat()
self.last_modified = self.creation_time
self.correlation_index = {}
# Compile regex for date filtering for efficiency
self.date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}')
def __getstate__(self):
"""GraphManager is fully picklable, return full state."""
return self.__dict__.copy()
"""Prepare GraphManager for pickling, excluding compiled regex."""
state = self.__dict__.copy()
# Compiled regex patterns are not always picklable
if 'date_pattern' in state:
del state['date_pattern']
return state
def __setstate__(self, state):
"""Restore GraphManager state."""
"""Restore GraphManager state and recompile regex."""
self.__dict__.update(state)
self.date_pattern = re.compile(r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}')
def add_node(self, node_id: str, node_type: NodeType,
metadata: Optional[Dict[str, Any]] = None) -> bool:
"""
Add a node to the graph.
def _update_correlation_index(self, node_id: str, data: Any, path: List[str] = None):
"""Recursively traverse metadata and add hashable values to the index."""
if path is None:
path = []
Args:
node_id: Unique identifier for the node
node_type: Type of the node (Domain, IP, Certificate, ASN)
metadata: Additional metadata for the node
if isinstance(data, dict):
for key, value in data.items():
self._update_correlation_index(node_id, value, path + [key])
elif isinstance(data, list):
for i, item in enumerate(data):
self._update_correlation_index(node_id, item, path + [f"[{i}]"])
else:
self._add_to_correlation_index(node_id, data, ".".join(path))
Returns:
bool: True if node was added, False if it already exists
"""
if self.graph.has_node(node_id):
# Update metadata if node exists
def _add_to_correlation_index(self, node_id: str, value: Any, path_str: str):
"""Add a hashable value to the correlation index, filtering out noise."""
if not isinstance(value, (str, int, float, bool)) or value is None:
return
# Ignore certain paths that contain noisy, non-unique identifiers
if any(keyword in path_str.lower() for keyword in ['count', 'total', 'timestamp', 'date']):
return
# Filter out common low-entropy values and date-like strings
if isinstance(value, str):
# FIXED: Prevent correlation on date/time strings.
if self.date_pattern.match(value):
return
if len(value) < 4 or value.lower() in ['true', 'false', 'unknown', 'none', 'crt.sh']:
return
elif isinstance(value, int) and abs(value) < 9999:
return # Ignore small integers
elif isinstance(value, bool):
return # Ignore boolean values
# Add the valuable correlation data to the index
if value not in self.correlation_index:
self.correlation_index[value] = {}
if node_id not in self.correlation_index[value]:
self.correlation_index[value][node_id] = []
if path_str not in self.correlation_index[value][node_id]:
self.correlation_index[value][node_id].append(path_str)
def _check_for_correlations(self, new_node_id: str, data: Any, path: List[str] = None) -> List[Dict]:
"""Recursively traverse metadata to find correlations with existing data."""
if path is None:
path = []
all_correlations = []
if isinstance(data, dict):
for key, value in data.items():
if key == 'source': # Avoid correlating on the provider name
continue
all_correlations.extend(self._check_for_correlations(new_node_id, value, path + [key]))
elif isinstance(data, list):
for i, item in enumerate(data):
all_correlations.extend(self._check_for_correlations(new_node_id, item, path + [f"[{i}]"]))
else:
value = data
if value in self.correlation_index:
existing_nodes_with_paths = self.correlation_index[value]
unique_nodes = set(existing_nodes_with_paths.keys())
unique_nodes.add(new_node_id)
if len(unique_nodes) < 2:
return all_correlations # Correlation must involve at least two distinct nodes
new_source = {'node_id': new_node_id, 'path': ".".join(path)}
all_sources = [new_source]
for node_id, paths in existing_nodes_with_paths.items():
for p_str in paths:
all_sources.append({'node_id': node_id, 'path': p_str})
all_correlations.append({
'value': value,
'sources': all_sources,
'nodes': list(unique_nodes)
})
return all_correlations
def add_node(self, node_id: str, node_type: NodeType, metadata: Optional[Dict[str, Any]] = None) -> bool:
"""Add a node to the graph, update metadata, and process correlations."""
is_new_node = not self.graph.has_node(node_id)
if is_new_node:
self.graph.add_node(node_id, type=node_type.value,
added_timestamp=datetime.now(timezone.utc).isoformat(),
metadata=metadata or {})
elif metadata:
# Safely merge new metadata into existing metadata
existing_metadata = self.graph.nodes[node_id].get('metadata', {})
if metadata:
existing_metadata.update(metadata)
self.graph.nodes[node_id]['metadata'] = existing_metadata
if metadata and node_type != NodeType.CORRELATION_OBJECT:
correlations = self._check_for_correlations(node_id, metadata)
for corr in correlations:
value = corr['value']
# FIXED: Check if the correlation value contains an existing node ID.
found_major_node_id = None
if isinstance(value, str):
for existing_node in self.graph.nodes():
if existing_node in value:
found_major_node_id = existing_node
break
if found_major_node_id:
# An existing major node is part of the value; link to it directly.
for c_node_id in set(corr['nodes']):
if self.graph.has_node(c_node_id) and c_node_id != found_major_node_id:
self.add_edge(c_node_id, found_major_node_id, RelationshipType.CORRELATED_TO)
continue # Skip creating a redundant correlation node
# Proceed to create a new correlation node if no major node was found.
correlation_node_id = f"corr_{hash(value) & 0x7FFFFFFF}"
if not self.graph.has_node(correlation_node_id):
self.add_node(correlation_node_id, NodeType.CORRELATION_OBJECT,
metadata={'value': value, 'sources': corr['sources'],
'correlated_nodes': list(set(corr['nodes']))})
else: # Update existing correlation node
existing_meta = self.graph.nodes[correlation_node_id]['metadata']
existing_nodes = set(existing_meta.get('correlated_nodes', []))
existing_meta['correlated_nodes'] = list(existing_nodes.union(set(corr['nodes'])))
existing_sources = {(s['node_id'], s['path']) for s in existing_meta.get('sources', [])}
for s in corr['sources']:
existing_sources.add((s['node_id'], s['path']))
existing_meta['sources'] = [{'node_id': nid, 'path': p} for nid, p in existing_sources]
for c_node_id in set(corr['nodes']):
self.add_edge(c_node_id, correlation_node_id, RelationshipType.CORRELATED_TO)
self._update_correlation_index(node_id, metadata)
self.last_modified = datetime.now(timezone.utc).isoformat()
return is_new_node
def add_edge(self, source_id: str, target_id: str, relationship_type: RelationshipType,
confidence_score: Optional[float] = None, source_provider: str = "unknown",
raw_data: Optional[Dict[str, Any]] = None) -> bool:
"""Add or update an edge between two nodes, ensuring nodes exist."""
# LOGIC FIX: Ensure both source and target nodes exist before adding an edge.
if not self.graph.has_node(source_id) or not self.graph.has_node(target_id):
return False
node_attributes = {
'type': node_type.value,
'added_timestamp': datetime.now(timezone.utc).isoformat(),
'metadata': metadata or {}
}
self.graph.add_node(node_id, **node_attributes)
self.last_modified = datetime.now(timezone.utc).isoformat()
return True
def add_edge(self, source_id: str, target_id: str,
relationship_type: RelationshipType,
confidence_score: Optional[float] = None,
source_provider: str = "unknown",
raw_data: Optional[Dict[str, Any]] = None) -> bool:
"""
Add an edge between two nodes.
Args:
source_id: Source node identifier
target_id: Target node identifier
relationship_type: Type of relationship
confidence_score: Custom confidence score (overrides default)
source_provider: Provider that discovered this relationship
raw_data: Raw data from provider response
Returns:
bool: True if edge was added, False if it already exists
"""
if not self.graph.has_node(source_id) or not self.graph.has_node(target_id):
# If the target node is a subdomain, it should be added.
# The scanner will handle this logic.
pass
# Check if edge already exists
if self.graph.has_edge(source_id, target_id):
# Update confidence score if new score is higher
existing_confidence = self.graph.edges[source_id, target_id]['confidence_score']
new_confidence = confidence_score or relationship_type.default_confidence
if new_confidence > existing_confidence:
if self.graph.has_edge(source_id, target_id):
# If edge exists, update confidence if the new score is higher.
if new_confidence > self.graph.edges[source_id, target_id].get('confidence_score', 0):
self.graph.edges[source_id, target_id]['confidence_score'] = new_confidence
self.graph.edges[source_id, target_id]['updated_timestamp'] = datetime.now(timezone.utc).isoformat()
self.graph.edges[source_id, target_id]['updated_by'] = source_provider
return False
edge_attributes = {
'relationship_type': relationship_type.relationship_name,
'confidence_score': confidence_score or relationship_type.default_confidence,
'source_provider': source_provider,
'discovery_timestamp': datetime.now(timezone.utc).isoformat(),
'raw_data': raw_data or {}
}
self.graph.add_edge(source_id, target_id, **edge_attributes)
# Add a new edge with all attributes.
self.graph.add_edge(source_id, target_id,
relationship_type=relationship_type.relationship_name,
confidence_score=new_confidence,
source_provider=source_provider,
discovery_timestamp=datetime.now(timezone.utc).isoformat(),
raw_data=raw_data or {})
self.last_modified = datetime.now(timezone.utc).isoformat()
return True
@ -161,270 +244,92 @@ class GraphManager:
return self.graph.number_of_edges()
def get_nodes_by_type(self, node_type: NodeType) -> List[str]:
"""
Get all nodes of a specific type.
Args:
node_type: Type of nodes to retrieve
Returns:
List of node identifiers
"""
return [
node_id for node_id, attributes in self.graph.nodes(data=True)
if attributes.get('type') == node_type.value
]
"""Get all nodes of a specific type."""
return [n for n, d in self.graph.nodes(data=True) if d.get('type') == node_type.value]
def get_neighbors(self, node_id: str) -> List[str]:
"""
Get all neighboring nodes (both incoming and outgoing).
Args:
node_id: Node identifier
Returns:
List of neighboring node identifiers
"""
"""Get all unique neighbors (predecessors and successors) for a node."""
if not self.graph.has_node(node_id):
return []
predecessors = list(self.graph.predecessors(node_id))
successors = list(self.graph.successors(node_id))
return list(set(predecessors + successors))
return list(set(self.graph.predecessors(node_id)) | set(self.graph.successors(node_id)))
def get_high_confidence_edges(self, min_confidence: float = 0.8) -> List[Tuple[str, str, Dict]]:
"""
Get edges with confidence score above threshold.
Args:
min_confidence: Minimum confidence threshold
Returns:
List of tuples (source, target, attributes)
"""
return [
(source, target, attributes)
for source, target, attributes in self.graph.edges(data=True)
if attributes.get('confidence_score', 0) >= min_confidence
]
"""Get edges with confidence score above a given threshold."""
return [(u, v, d) for u, v, d in self.graph.edges(data=True)
if d.get('confidence_score', 0) >= min_confidence]
def get_graph_data(self) -> Dict[str, Any]:
"""
Export graph data for visualization.
Uses comprehensive metadata collected during scanning.
"""
"""Export graph data formatted for frontend visualization."""
nodes = []
edges = []
# Create nodes with the comprehensive metadata already collected
for node_id, attributes in self.graph.nodes(data=True):
node_data = {
'id': node_id,
'label': node_id,
'type': attributes.get('type', 'unknown'),
'metadata': attributes.get('metadata', {}),
'added_timestamp': attributes.get('added_timestamp')
}
# Handle certificate node labeling
if node_id.startswith('cert_'):
# For certificate nodes, create a more informative label
cert_metadata = node_data['metadata']
issuer = cert_metadata.get('issuer_name', 'Unknown')
valid_status = "" if cert_metadata.get('is_currently_valid') else ""
node_data['label'] = f"Certificate {valid_status}\n{issuer[:30]}..."
# Color coding by type
type_colors = {
'domain': {
'background': '#00ff41',
'border': '#00aa2e',
'highlight': {'background': '#44ff75', 'border': '#00ff41'},
'hover': {'background': '#22ff63', 'border': '#00cc35'}
},
'ip': {
'background': '#ff9900',
'border': '#cc7700',
'highlight': {'background': '#ffbb44', 'border': '#ff9900'},
'hover': {'background': '#ffaa22', 'border': '#dd8800'}
},
'asn': {
'background': '#00aaff',
'border': '#0088cc',
'highlight': {'background': '#44ccff', 'border': '#00aaff'},
'hover': {'background': '#22bbff', 'border': '#0099dd'}
},
'dns_record': {
'background': '#9d4edd',
'border': '#7b2cbf',
'highlight': {'background': '#c77dff', 'border': '#9d4edd'},
'hover': {'background': '#b392f0', 'border': '#8b5cf6'}
},
'large_entity': {
'background': '#ff6b6b',
'border': '#cc3a3a',
'highlight': {'background': '#ff8c8c', 'border': '#ff6b6b'},
'hover': {'background': '#ff7a7a', 'border': '#dd4a4a'}
}
}
node_color_config = type_colors.get(attributes.get('type', 'unknown'), type_colors['domain'])
node_data['color'] = node_color_config
# Add certificate validity indicator if available
for node_id, attrs in self.graph.nodes(data=True):
node_data = {'id': node_id, 'label': node_id, 'type': attrs.get('type', 'unknown'),
'metadata': attrs.get('metadata', {}),
'added_timestamp': attrs.get('added_timestamp')}
# Customize node appearance based on type and metadata
node_type = node_data['type']
metadata = node_data['metadata']
if 'certificate_data' in metadata and 'has_valid_cert' in metadata['certificate_data']:
node_data['has_valid_cert'] = metadata['certificate_data']['has_valid_cert']
if node_type == 'domain' and metadata.get('certificate_data', {}).get('has_valid_cert') is False:
node_data['color'] = {'background': '#c7c7c7', 'border': '#999'} # Gray for invalid cert
nodes.append(node_data)
# Create edges (unchanged from original)
for source, target, attributes in self.graph.edges(data=True):
edge_data = {
'from': source,
'to': target,
'label': attributes.get('relationship_type', ''),
'confidence_score': attributes.get('confidence_score', 0),
'source_provider': attributes.get('source_provider', ''),
'discovery_timestamp': attributes.get('discovery_timestamp')
}
# Enhanced edge styling based on confidence
confidence = attributes.get('confidence_score', 0)
if confidence >= 0.8:
edge_data['color'] = {
'color': '#00ff41',
'highlight': '#44ff75',
'hover': '#22ff63',
'inherit': False
}
edge_data['width'] = 4
elif confidence >= 0.6:
edge_data['color'] = {
'color': '#ff9900',
'highlight': '#ffbb44',
'hover': '#ffaa22',
'inherit': False
}
edge_data['width'] = 3
else:
edge_data['color'] = {
'color': '#666666',
'highlight': '#888888',
'hover': '#777777',
'inherit': False
}
edge_data['width'] = 2
# Add dashed line for low confidence
if confidence < 0.6:
edge_data['dashes'] = [5, 5]
edges.append(edge_data)
edges = []
for source, target, attrs in self.graph.edges(data=True):
edges.append({'from': source, 'to': target,
'label': attrs.get('relationship_type', ''),
'confidence_score': attrs.get('confidence_score', 0),
'source_provider': attrs.get('source_provider', ''),
'discovery_timestamp': attrs.get('discovery_timestamp')})
return {
'nodes': nodes,
'edges': edges,
'statistics': {
'node_count': len(nodes),
'edge_count': len(edges),
'creation_time': self.creation_time,
'last_modified': self.last_modified
}
'nodes': nodes, 'edges': edges,
'statistics': self.get_statistics()['basic_metrics']
}
def export_json(self) -> Dict[str, Any]:
"""
Export complete graph data as JSON for download.
Returns:
Dictionary containing complete graph data with metadata
"""
# Get basic graph data
graph_data = self.get_graph_data()
# Add comprehensive metadata
export_data = {
"""Export complete graph data as a JSON-serializable dictionary."""
graph_data = nx.node_link_data(self.graph) # Use NetworkX's built-in robust serializer
return {
'export_metadata': {
'export_timestamp': datetime.now(timezone.utc).isoformat(),
'graph_creation_time': self.creation_time,
'last_modified': self.last_modified,
'total_nodes': self.graph.number_of_nodes(),
'total_edges': self.graph.number_of_edges(),
'graph_format': 'dnsrecon_v1'
'total_nodes': self.get_node_count(),
'total_edges': self.get_edge_count(),
'graph_format': 'dnsrecon_v1_nodeling'
},
'nodes': graph_data['nodes'],
'edges': graph_data['edges'],
'node_types': [node_type.value for node_type in NodeType],
'relationship_types': [
{
'name': rel_type.relationship_name,
'default_confidence': rel_type.default_confidence
'graph': graph_data,
'statistics': self.get_statistics()
}
for rel_type in RelationshipType
],
'confidence_distribution': self._get_confidence_distribution()
}
return export_data
def _get_confidence_distribution(self) -> Dict[str, int]:
"""Get distribution of confidence scores."""
"""Get distribution of edge confidence scores."""
distribution = {'high': 0, 'medium': 0, 'low': 0}
for _, _, attributes in self.graph.edges(data=True):
confidence = attributes.get('confidence_score', 0)
if confidence >= 0.8:
distribution['high'] += 1
elif confidence >= 0.6:
distribution['medium'] += 1
else:
distribution['low'] += 1
for _, _, confidence in self.graph.edges(data='confidence_score', default=0):
if confidence >= 0.8: distribution['high'] += 1
elif confidence >= 0.6: distribution['medium'] += 1
else: distribution['low'] += 1
return distribution
def get_statistics(self) -> Dict[str, Any]:
"""
Get comprehensive graph statistics.
Returns:
Dictionary containing various graph metrics
"""
stats = {
'basic_metrics': {
'total_nodes': self.graph.number_of_nodes(),
'total_edges': self.graph.number_of_edges(),
"""Get comprehensive statistics about the graph."""
stats = {'basic_metrics': {'total_nodes': self.get_node_count(),
'total_edges': self.get_edge_count(),
'creation_time': self.creation_time,
'last_modified': self.last_modified
},
'node_type_distribution': {},
'relationship_type_distribution': {},
'last_modified': self.last_modified},
'node_type_distribution': {}, 'relationship_type_distribution': {},
'confidence_distribution': self._get_confidence_distribution(),
'provider_distribution': {}
}
# Node type distribution
'provider_distribution': {}}
# Calculate distributions
for node_type in NodeType:
count = len(self.get_nodes_by_type(node_type))
stats['node_type_distribution'][node_type.value] = count
# Relationship type distribution
for _, _, attributes in self.graph.edges(data=True):
rel_type = attributes.get('relationship_type', 'unknown')
stats['relationship_type_distribution'][rel_type] = \
stats['relationship_type_distribution'].get(rel_type, 0) + 1
# Provider distribution
for _, _, attributes in self.graph.edges(data=True):
provider = attributes.get('source_provider', 'unknown')
stats['provider_distribution'][provider] = \
stats['provider_distribution'].get(provider, 0) + 1
stats['node_type_distribution'][node_type.value] = self.get_nodes_by_type(node_type).__len__()
for _, _, rel_type in self.graph.edges(data='relationship_type', default='unknown'):
stats['relationship_type_distribution'][rel_type] = stats['relationship_type_distribution'].get(rel_type, 0) + 1
for _, _, provider in self.graph.edges(data='source_provider', default='unknown'):
stats['provider_distribution'][provider] = stats['provider_distribution'].get(provider, 0) + 1
return stats
def clear(self) -> None:
"""Clear all nodes and edges from the graph."""
"""Clear all nodes, edges, and indices from the graph."""
self.graph.clear()
self.correlation_index.clear()
self.creation_time = datetime.now(timezone.utc).isoformat()
self.last_modified = self.creation_time

View File

@ -707,8 +707,6 @@ class Scanner:
return discovered_targets
# Process each relationship
dns_records_to_create = {}
for i, (source, rel_target, rel_type, confidence, raw_data) in enumerate(results):
# Check stop signal periodically during result processing
if i % 10 == 0 and self._is_stop_requested():
@ -751,11 +749,9 @@ class Scanner:
self._collect_node_metadata_forensic(rel_target, provider_name, rel_type, source, raw_data, target_metadata[rel_target])
else:
# Handle DNS record content
self._handle_dns_record_content(source, rel_target, rel_type, confidence, raw_data, provider_name, dns_records_to_create)
# Store the record content in the domain's metadata
self._collect_node_metadata_forensic(source, provider_name, rel_type, rel_target, raw_data, target_metadata[source])
# Create DNS record nodes
self._create_dns_record_nodes(dns_records_to_create)
return discovered_targets
@ -834,56 +830,15 @@ class Scanner:
'country': raw_data.get('country', '')
}
def _handle_dns_record_content(self, source: str, rel_target: str, rel_type: RelationshipType,
confidence: float, raw_data: Dict[str, Any], provider_name: str,
dns_records: Dict) -> None:
"""Handle DNS record content with forensic tracking."""
dns_record_types = [
RelationshipType.TXT_RECORD, RelationshipType.SPF_RECORD,
RelationshipType.CAA_RECORD, RelationshipType.SRV_RECORD,
RelationshipType.DNSKEY_RECORD, RelationshipType.DS_RECORD,
RelationshipType.RRSIG_RECORD, RelationshipType.SSHFP_RECORD,
RelationshipType.TLSA_RECORD, RelationshipType.NAPTR_RECORD
]
record_type_name = rel_type.relationship_name
if record_type_name not in metadata:
metadata[record_type_name] = []
if rel_type in dns_record_types:
record_type = rel_type.relationship_name.upper().replace('_RECORD', '')
record_content = rel_target.strip()
content_hash = hash(record_content) & 0x7FFFFFFF
dns_record_id = f"{record_type}:{content_hash}"
if isinstance(target, list):
metadata[record_type_name].extend(target)
else:
metadata[record_type_name].append(target)
if dns_record_id not in dns_records:
dns_records[dns_record_id] = {
'content': record_content,
'type': record_type,
'domains': set(),
'raw_data': raw_data,
'provider_name': provider_name,
'confidence': confidence
}
dns_records[dns_record_id]['domains'].add(source)
def _create_dns_record_nodes(self, dns_records: Dict) -> None:
"""Create DNS record nodes with forensic metadata."""
for dns_record_id, record_info in dns_records.items():
record_metadata = {
'record_type': record_info['type'],
'content': record_info['content'],
'content_hash': dns_record_id.split(':')[1],
'associated_domains': list(record_info['domains']),
'source_data': record_info['raw_data'],
'forensic_note': f"DNS record created from {record_info['provider_name']} query"
}
self.graph.add_node(dns_record_id, NodeType.DNS_RECORD, metadata=record_metadata)
for domain_name in record_info['domains']:
self.graph.add_edge(domain_name, dns_record_id, RelationshipType.DNS_RECORD,
record_info['confidence'], record_info['provider_name'],
record_info['raw_data'])
# Forensic logging for DNS record creation
self.logger.logger.info(f"DNS record node created: {dns_record_id} for {len(record_info['domains'])} domains")
def _log_target_processing_error(self, target: str, error: str) -> None:
"""Log target processing errors for forensic trail."""

View File

@ -144,8 +144,8 @@ class CrtShProvider(BaseProvider):
metadata['expires_soon'] = (not_after - datetime.now(timezone.utc)).days <= 30
# Add human-readable dates
metadata['not_before_formatted'] = not_before.strftime('%Y-%m-%d %H:%M:%S UTC')
metadata['not_after_formatted'] = not_after.strftime('%Y-%m-%d %H:%M:%S UTC')
metadata['not_before'] = not_before.strftime('%Y-%m-%d %H:%M:%S UTC')
metadata['not_after'] = not_after.strftime('%Y-%m-%d %H:%M:%S UTC')
except Exception as e:
self.logger.logger.debug(f"Error computing certificate metadata: {e}")

View File

@ -27,6 +27,7 @@ class DNSProvider(BaseProvider):
self.resolver = dns.resolver.Resolver()
self.resolver.timeout = 5
self.resolver.lifetime = 10
#self.resolver.nameservers = ['127.0.0.1']
def get_name(self) -> str:
"""Return the provider name."""
@ -52,7 +53,7 @@ class DNSProvider(BaseProvider):
relationships = []
# Query all record types
for record_type in ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA', 'DNSKEY', 'DS', 'RRSIG', 'SSHFP', 'TLSA', 'NAPTR', 'SPF']:
for record_type in ['A', 'AAAA', 'CNAME', 'MX', 'NS', 'SOA', 'TXT', 'SRV', 'CAA']:
relationships.extend(self._query_record(domain, record_type))
return relationships
@ -133,7 +134,7 @@ class DNSProvider(BaseProvider):
target = str(record.exchange).rstrip('.')
elif record_type == 'SOA':
target = str(record.mname).rstrip('.')
elif record_type in ['TXT', 'SPF']:
elif record_type in ['TXT']:
target = b' '.join(record.strings).decode('utf-8', 'ignore')
elif record_type == 'SRV':
target = str(record.target).rstrip('.')
@ -151,7 +152,13 @@ class DNSProvider(BaseProvider):
'ttl': response.ttl
}
try:
relationship_type_enum = getattr(RelationshipType, f"{record_type}_RECORD")
relationship_type_enum_name = f"{record_type}_RECORD"
# Handle TXT records as metadata, not relationships
if record_type == 'TXT':
relationship_type_enum = RelationshipType.A_RECORD # Dummy value, won't be used
else:
relationship_type_enum = getattr(RelationshipType, relationship_type_enum_name)
relationships.append((
domain,
target,

View File

@ -336,6 +336,10 @@ class GraphManager {
}
}
if (node.type === 'correlation_object') {
processedNode.label = this.formatNodeLabel(node.metadata.value, node.type);
}
return processedNode;
}
@ -406,7 +410,7 @@ class GraphManager {
'ip': '#ff9900', // Amber
'asn': '#00aaff', // Blue
'large_entity': '#ff6b6b', // Red for large entities
'dns_record': '#9620c0ff'
'correlation_object': '#9620c0ff'
};
return colors[nodeType] || '#ffffff';
}
@ -422,7 +426,7 @@ class GraphManager {
'domain': '#00aa2e',
'ip': '#cc7700',
'asn': '#0088cc',
'dns_record': '#c235c9ff'
'correlation_object': '#c235c9ff'
};
return borderColors[nodeType] || '#666666';
}
@ -437,7 +441,7 @@ class GraphManager {
'domain': 12,
'ip': 14,
'asn': 16,
'dns_record': 8
'correlation_object': 8
};
return sizes[nodeType] || 12;
}
@ -452,7 +456,7 @@ class GraphManager {
'domain': 'dot',
'ip': 'square',
'asn': 'triangle',
'dns_record': 'hexagon'
'correlation_object': 'hexagon'
};
return shapes[nodeType] || 'dot';
}
@ -850,20 +854,6 @@ class GraphManager {
};
}
/**
* Export graph as image (if needed for future implementation)
* @param {string} format - Image format ('png', 'jpeg')
* @returns {string} Data URL of the image
*/
exportAsImage(format = 'png') {
if (!this.network) return null;
// This would require additional vis.js functionality
// Placeholder for future implementation
console.log('Image export not yet implemented');
return null;
}
/**
* Apply filters to the graph
* @param {string} nodeType - The type of node to show ('all' for no filter)

View File

@ -859,6 +859,18 @@ class DNSReconApp {
detailsHtml += createDetailRow('Shodan Data', metadata.shodan);
detailsHtml += createDetailRow('VirusTotal Data', metadata.virustotal);
break;
case 'correlation_object':
detailsHtml += createDetailRow('Correlated Value', metadata.value);
if (metadata.correlated_nodes) {
detailsHtml += createDetailRow('Correlated Nodes', metadata.correlated_nodes.join(', '));
}
if (metadata.sources) {
detailsHtml += `<div class="detail-section-header">Correlation Sources</div>`;
for (const source of metadata.sources) {
detailsHtml += createDetailRow(source.node_id, source.path);
}
}
break;
}
if (metadata.certificate_data && Object.keys(metadata.certificate_data).length > 0) {

View File

@ -120,7 +120,7 @@
<option value="domain">Domain</option>
<option value="ip">IP</option>
<option value="asn">ASN</option>
<option value="dns_record">DNS Record</option>
<option value="correlation_object">Correlation Object</option>
<option value="large_entity">Large Entity</option>
</select>
</div>
@ -157,7 +157,7 @@
</div>
<div class="legend-item">
<div class="legend-color" style="background-color: #9d4edd;"></div>
<span>DNS Records</span>
<span>Correlation Objects</span>
</div>
<div class="legend-item">
<div class="legend-edge high-confidence"></div>