89 lines
3.2 KiB
Python
89 lines
3.2 KiB
Python
# DNScope-reduced/core/provider_result.py
|
|
|
|
"""
Unified data model for DNScope passive reconnaissance.

Standardizes the data structure across all providers to ensure consistent processing.
"""
|
|
|
|
from typing import Any, Optional, List, Dict
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
@dataclass
class StandardAttribute:
    """A unified data structure for a single piece of information about a node.

    ``metadata`` is always a dict after construction: an explicit
    ``metadata=None`` is normalised to an empty dict, so consumers never have
    to distinguish "no metadata" spelled as ``None`` from ``{}``.
    """

    # Identifier of the node this attribute is attached to.
    target_node: str
    # Attribute name.
    name: str
    # Attribute payload; intentionally untyped.
    value: Any
    # Free-form type label for the attribute (the field name shadows the
    # builtin ``type`` only inside this class namespace; kept for callers).
    type: str
    # Name of the provider that produced this attribute.
    provider: str
    # Creation time; defaults to the current timezone-aware UTC time.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Optional extra key/value data; a fresh dict per instance by default.
    metadata: Optional[Dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Normalise an explicitly passed ``metadata=None`` to an empty dict."""
        # The annotation permits None, but the default and the
        # ProviderResult.add_attribute helper both produce {}; keep a single
        # representation for "empty".
        if self.metadata is None:
            self.metadata = {}
|
|
|
|
@dataclass
class Relationship:
    """A unified data structure for a directional link between two nodes.

    The link points from ``source_node`` to ``target_node``. ``raw_data`` is
    always a dict after construction: an explicit ``raw_data=None`` is
    normalised to an empty dict.
    """

    # Identifier of the node the link starts from.
    source_node: str
    # Identifier of the node the link points to.
    target_node: str
    # Label describing the kind of link.
    relationship_type: str
    # Name of the provider that produced this relationship.
    provider: str
    # Creation time; defaults to the current timezone-aware UTC time.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    # Optional provider payload backing the link; a fresh dict per instance
    # by default.
    raw_data: Optional[Dict[str, Any]] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Normalise an explicitly passed ``raw_data=None`` to an empty dict."""
        # The annotation permits None, but the default and the
        # ProviderResult.add_relationship helper both produce {}; keep a
        # single representation for "empty".
        if self.raw_data is None:
            self.raw_data = {}
|
|
|
|
@dataclass
class ProviderResult:
    """Container for everything a provider returned from a single query.

    Holds two independent lists, ``attributes`` and ``relationships``, plus
    convenience helpers to append to them and to summarise their contents.
    """

    attributes: List[StandardAttribute] = field(default_factory=list)
    relationships: List[Relationship] = field(default_factory=list)

    def add_attribute(self, target_node: str, name: str, value: Any, attr_type: str,
                      provider: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        """Build a ``StandardAttribute`` and append it to ``attributes``.

        ``attr_type`` is stored in the attribute's ``type`` field. A missing
        or empty ``metadata`` is replaced by a new empty dict.
        """
        extra = metadata if metadata else {}
        attribute = StandardAttribute(
            target_node=target_node,
            name=name,
            value=value,
            type=attr_type,
            provider=provider,
            metadata=extra,
        )
        self.attributes.append(attribute)

    def add_relationship(self, source_node: str, target_node: str, relationship_type: str,
                         provider: str, raw_data: Optional[Dict[str, Any]] = None) -> None:
        """Build a ``Relationship`` and append it to ``relationships``.

        A missing or empty ``raw_data`` is replaced by a new empty dict.
        """
        payload = raw_data if raw_data else {}
        link = Relationship(
            source_node=source_node,
            target_node=target_node,
            relationship_type=relationship_type,
            provider=provider,
            raw_data=payload,
        )
        self.relationships.append(link)

    def get_discovered_nodes(self) -> set:
        """Return the set of unique node identifiers seen in this result.

        Collects both endpoints of every relationship and the target node of
        every attribute.
        """
        # Start from the attribute targets, then fold in both ends of each link.
        discovered = {attribute.target_node for attribute in self.attributes}
        for link in self.relationships:
            discovered.update((link.source_node, link.target_node))
        return discovered

    def get_relationship_count(self) -> int:
        """Return how many relationships this result holds."""
        total = len(self.relationships)
        return total

    def get_attribute_count(self) -> int:
        """Return how many attributes this result holds."""
        total = len(self.attributes)
        return total
|
|
|
|
    # TODO
    # def is_large_entity(self, threshold: int) -> bool:
    #     """Check if this result qualifies as a large entity based on relationship count."""
    #     return self.get_relationship_count() > threshold