89 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			89 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# DNScope-reduced/core/provider_result.py

"""
Unified data model for DNScope passive reconnaissance.
Standardizes the data structure across all providers to ensure consistent processing.
"""

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set


@dataclass
class StandardAttribute:
    """A unified data structure for a single piece of information about a node.

    Attributes:
        target_node: Identifier of the node this attribute is attached to.
        name: Name of the attribute.
        value: The attribute's value; any Python object is accepted.
        type: Free-form type label for the attribute.
        provider: Provider the attribute is attributed to.
        timestamp: Defaults to the moment the instance is created, as a
            timezone-aware UTC datetime.
        metadata: Optional extra key/value data. Defaults to a new empty
            dict per instance; an explicit ``None`` is stored as ``None``.
    """

    target_node: str
    name: str
    value: Any
    # Field name shadows the builtin only within this class namespace; it is
    # part of the public constructor signature, so it is kept as-is.
    type: str
    provider: str
    # default_factory is evaluated per instance, so each object gets its own
    # creation time and its own dict rather than a shared default.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: Optional[Dict[str, Any]] = field(default_factory=dict)


@dataclass
class Relationship:
    """A unified data structure for a directional link between two nodes.

    Attributes:
        source_node: Identifier of the node the link starts from.
        target_node: Identifier of the node the link points to.
        relationship_type: Free-form label describing the kind of link.
        provider: Provider the relationship is attributed to.
        timestamp: Defaults to the moment the instance is created, as a
            timezone-aware UTC datetime.
        raw_data: Optional extra key/value data. Defaults to a new empty
            dict per instance; an explicit ``None`` is stored as ``None``.
    """

    source_node: str
    target_node: str
    relationship_type: str
    provider: str
    # default_factory is evaluated per instance, so each object gets its own
    # creation time and its own dict rather than a shared default.
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    raw_data: Optional[Dict[str, Any]] = field(default_factory=dict)


@dataclass
class ProviderResult:
    """A container for all data returned by a provider from a single query.

    Attributes:
        attributes: StandardAttribute entries collected so far.
        relationships: Relationship entries collected so far.
    """

    attributes: List[StandardAttribute] = field(default_factory=list)
    relationships: List[Relationship] = field(default_factory=list)

    def add_attribute(self, target_node: str, name: str, value: Any, attr_type: str,
                      provider: str, metadata: Optional[Dict[str, Any]] = None) -> None:
        """Build a StandardAttribute from the arguments and append it.

        Args:
            target_node: Identifier of the node the attribute is attached to.
            name: Name of the attribute.
            value: The attribute's value.
            attr_type: Stored in the attribute's ``type`` field.
            provider: Provider the attribute is attributed to.
            metadata: Optional extra data; ``None`` (or any other falsy
                value) is stored as a new empty dict.
        """
        self.attributes.append(StandardAttribute(
            target_node=target_node,
            name=name,
            value=value,
            type=attr_type,
            provider=provider,
            # Falsy input is normalised so the stored field is always a dict.
            metadata=metadata or {},
        ))

    def add_relationship(self, source_node: str, target_node: str, relationship_type: str,
                         provider: str, raw_data: Optional[Dict[str, Any]] = None) -> None:
        """Build a Relationship from the arguments and append it.

        Args:
            source_node: Identifier of the node the link starts from.
            target_node: Identifier of the node the link points to.
            relationship_type: Label describing the kind of link.
            provider: Provider the relationship is attributed to.
            raw_data: Optional extra data; ``None`` (or any other falsy
                value) is stored as a new empty dict.
        """
        self.relationships.append(Relationship(
            source_node=source_node,
            target_node=target_node,
            relationship_type=relationship_type,
            provider=provider,
            # Falsy input is normalised so the stored field is always a dict.
            raw_data=raw_data or {},
        ))

    def get_discovered_nodes(self) -> Set[str]:
        """Return the unique node identifiers referenced by this result.

        The set is built fresh on every call from both endpoints of each
        relationship and the target node of each attribute.
        """
        nodes: Set[str] = set()

        # Both ends of every relationship count as discovered.
        for rel in self.relationships:
            nodes.update((rel.source_node, rel.target_node))

        # Attributes contribute the node they are attached to.
        nodes.update(attr.target_node for attr in self.attributes)

        return nodes

    def get_relationship_count(self) -> int:
        """Return the number of relationships in this result."""
        return len(self.relationships)

    def get_attribute_count(self) -> int:
        """Return the number of attributes in this result."""
        return len(self.attributes)

    # TODO: is_large_entity is currently disabled.
    # def is_large_entity(self, threshold: int) -> bool:
    #     """Check if this result qualifies as a large entity based on relationship count."""
    #     return self.get_relationship_count() > threshold