# dnscope/core/provider_result.py
# overcuriousity 897bb80183 gradient
# 2025-09-24 09:30:42 +02:00
#
# 89 lines
# 3.2 KiB
# Python

# DNScope-reduced/core/provider_result.py
"""
Unified data model for DNScope passive reconnaissance.
Standardizes the data structure across all providers to ensure consistent processing.
"""
from typing import Any, Optional, List, Dict
from dataclasses import dataclass, field
from datetime import datetime, timezone
@dataclass
class StandardAttribute:
    """A unified data structure for a single piece of information about a node.

    Attributes:
        target_node: Identifier of the node the attribute belongs to.
        name: Name of the attribute.
        value: The attribute's value; any type is accepted.
        type: Caller-supplied type label for the value (the set of valid
            labels is not defined in this module).
        provider: Label of the provider that produced the attribute
            (presumably the provider's name).
        timestamp: Creation time; defaults to the current timezone-aware
            UTC time when the instance is built.
        metadata: Optional extra key/value data; defaults to a new empty
            dict for every instance.
    """

    target_node: str
    name: str
    value: Any
    # NOTE: ``type`` shadows the builtin only inside this class body; the name
    # is kept because it is part of the constructor interface.
    type: str
    provider: str
    # default_factory runs per instance, so each object gets its own timestamp
    # and its own dict instead of sharing one evaluated at import time.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: Optional[Dict[str, Any]] = field(default_factory=dict)
@dataclass
class Relationship:
    """A unified data structure for a directional link between two nodes.

    Attributes:
        source_node: Identifier of the node the link starts from.
        target_node: Identifier of the node the link points to.
        relationship_type: Caller-supplied label describing the kind of link
            (the set of valid labels is not defined in this module).
        provider: Label of the provider that reported the link
            (presumably the provider's name).
        timestamp: Creation time; defaults to the current timezone-aware
            UTC time when the instance is built.
        raw_data: Optional extra key/value data attached to the link;
            defaults to a new empty dict for every instance.
    """

    source_node: str
    target_node: str
    relationship_type: str
    provider: str
    # default_factory runs per instance, so each object gets its own timestamp
    # and its own dict instead of sharing one evaluated at import time.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    raw_data: Optional[Dict[str, Any]] = field(default_factory=dict)
@dataclass
class ProviderResult:
    """A container for all data returned by a provider from a single query.

    Attributes:
        attributes: ``StandardAttribute`` records collected so far.
        relationships: ``Relationship`` records collected so far.

    Both lists start empty and are created per instance.
    """

    attributes: List[StandardAttribute] = field(default_factory=list)
    relationships: List[Relationship] = field(default_factory=list)

    def add_attribute(self, target_node: str, name: str, value: Any, attr_type: str,
                      provider: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        """Build a ``StandardAttribute`` and append it to ``attributes``.

        Args:
            target_node: Identifier of the node the attribute belongs to.
            name: Name of the attribute.
            value: The attribute's value.
            attr_type: Stored in the attribute's ``type`` field.
            provider: Stored in the attribute's ``provider`` field.
            metadata: Optional extra data; any falsy value (``None`` or an
                empty dict) is replaced by a fresh empty dict.
        """
        self.attributes.append(StandardAttribute(
            target_node=target_node,
            name=name,
            value=value,
            type=attr_type,
            provider=provider,
            metadata=metadata or {},
        ))

    def add_relationship(self, source_node: str, target_node: str, relationship_type: str,
                         provider: str, raw_data: Optional[Dict[str, Any]] = None) -> None:
        """Build a ``Relationship`` and append it to ``relationships``.

        Args:
            source_node: Identifier of the node the link starts from.
            target_node: Identifier of the node the link points to.
            relationship_type: Stored in the relationship's
                ``relationship_type`` field.
            provider: Stored in the relationship's ``provider`` field.
            raw_data: Optional extra data; any falsy value (``None`` or an
                empty dict) is replaced by a fresh empty dict.
        """
        self.relationships.append(Relationship(
            source_node=source_node,
            target_node=target_node,
            relationship_type=relationship_type,
            provider=provider,
            raw_data=raw_data or {},
        ))

    def get_discovered_nodes(self) -> set:
        """Return the unique node identifiers referenced by this result.

        The set contains the ``target_node`` of every attribute plus the
        ``source_node`` and ``target_node`` of every relationship. A new set
        is built on each call.
        """
        nodes = {attr.target_node for attr in self.attributes}
        for rel in self.relationships:
            # Both ends of a link count as discovered nodes.
            nodes.update((rel.source_node, rel.target_node))
        return nodes

    def get_relationship_count(self) -> int:
        """Return the number of relationships stored in this result."""
        return len(self.relationships)

    def get_attribute_count(self) -> int:
        """Return the number of attributes stored in this result."""
        return len(self.attributes)
# TODO: decide whether to reinstate this helper (kept commented out for now).
# def is_large_entity(self, threshold: int) -> bool:
#     """Check if this result qualifies as a large entity based on relationship count."""
#     return self.get_relationship_count() > threshold