# dnsrecon-reduced/core/provider_result.py """ Unified data model for DNSRecon passive reconnaissance. Standardizes the data structure across all providers to ensure consistent processing. """ from typing import Any, Optional, List, Dict from dataclasses import dataclass, field from datetime import datetime, timezone @dataclass class StandardAttribute: """A unified data structure for a single piece of information about a node.""" target_node: str name: str value: Any type: str provider: str confidence: float timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) metadata: Optional[Dict[str, Any]] = field(default_factory=dict) def __post_init__(self): """Validate the attribute after initialization.""" if not isinstance(self.confidence, (int, float)) or not 0.0 <= self.confidence <= 1.0: raise ValueError(f"Confidence must be between 0.0 and 1.0, got {self.confidence}") @dataclass class Relationship: """A unified data structure for a directional link between two nodes.""" source_node: str target_node: str relationship_type: str confidence: float provider: str timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) raw_data: Optional[Dict[str, Any]] = field(default_factory=dict) def __post_init__(self): """Validate the relationship after initialization.""" if not isinstance(self.confidence, (int, float)) or not 0.0 <= self.confidence <= 1.0: raise ValueError(f"Confidence must be between 0.0 and 1.0, got {self.confidence}") @dataclass class ProviderResult: """A container for all data returned by a provider from a single query.""" attributes: List[StandardAttribute] = field(default_factory=list) relationships: List[Relationship] = field(default_factory=list) def add_attribute(self, target_node: str, name: str, value: Any, attr_type: str, provider: str, confidence: float = 0.8, metadata: Optional[Dict[str, Any]] = None) -> None: """Helper method to add an attribute to the result.""" self.attributes.append(StandardAttribute( target_node=target_node, name=name, value=value, type=attr_type, provider=provider, confidence=confidence, metadata=metadata or {} )) def add_relationship(self, source_node: str, target_node: str, relationship_type: str, provider: str, confidence: float = 0.8, raw_data: Optional[Dict[str, Any]] = None) -> None: """Helper method to add a relationship to the result.""" self.relationships.append(Relationship( source_node=source_node, target_node=target_node, relationship_type=relationship_type, confidence=confidence, provider=provider, raw_data=raw_data or {} )) def get_discovered_nodes(self) -> set: """Get all unique node identifiers discovered in this result.""" nodes = set() # Add nodes from relationships for rel in self.relationships: nodes.add(rel.source_node) nodes.add(rel.target_node) # Add nodes from attributes for attr in self.attributes: nodes.add(attr.target_node) return nodes def get_relationship_count(self) -> int: """Get the total number of relationships in this result.""" return len(self.relationships) def get_attribute_count(self) -> int: """Get the total number of attributes in this result.""" return len(self.attributes) def is_large_entity(self, threshold: int) -> bool: """Check if this result qualifies as a large entity based on relationship count.""" return self.get_relationship_count() > threshold