# dnsrecon/core/scanner.py
import threading
 | 
						|
import traceback
 | 
						|
import time
 | 
						|
import os
 | 
						|
import importlib
 | 
						|
from typing import List, Set, Dict, Any, Tuple
 | 
						|
from concurrent.futures import ThreadPoolExecutor, as_completed, CancelledError, Future
 | 
						|
from collections import defaultdict, deque
 | 
						|
from datetime import datetime, timezone
 | 
						|
 | 
						|
from core.graph_manager import GraphManager, NodeType
 | 
						|
from core.logger import get_forensic_logger, new_session
 | 
						|
from utils.helpers import _is_valid_ip, _is_valid_domain
 | 
						|
from providers.base_provider import BaseProvider
 | 
						|
 | 
						|
 | 
						|
class ScanStatus:
    """Plain string constants naming the states a scan can be in."""

    # State the scanner starts in and is reset to before a new scan.
    IDLE = "idle"
    # Set by the scan thread once it begins working.
    RUNNING = "running"
    # Terminal states recorded when the scan thread finishes.
    COMPLETED = "completed"
    FAILED = "failed"
    STOPPED = "stopped"
 | 
						|
 | 
						|
 | 
						|
class Scanner:
 | 
						|
    """
 | 
						|
    Main scanning orchestrator for DNSRecon passive reconnaissance.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self, session_config=None):
        """
        Build a scanner bound to one session's configuration.

        Args:
            session_config: Configuration object for this session. When
                omitted, one is obtained from
                ``core.session_config.create_session_config()``.

        Raises:
            Exception: Any error raised during setup is printed together
                with its traceback and then re-raised unchanged.
        """
        print("Initializing Scanner instance...")

        try:
            if session_config is None:
                # Imported lazily: only needed when no config was supplied.
                from core.session_config import create_session_config
                session_config = create_session_config()

            self.config = session_config
            self.graph = GraphManager()
            self.providers = []
            self.status = ScanStatus.IDLE
            self.session_id = None  # Will be set by session manager

            # Target and depth bookkeeping for the active scan.
            self.current_target = None
            self.current_depth = 0
            self.max_depth = 2

            # Thread control primitives.
            self.stop_event = threading.Event()
            self.scan_thread = None

            # Progress counters.
            self.total_indicators_found = 0
            self.indicators_processed = 0
            self.current_indicator = ""

            # Worker pool sizing; the pool itself is created per scan.
            self.max_workers = self.config.max_concurrent_requests
            self.executor = None

            # Providers need self.stop_event, so this runs after it exists.
            print("Calling _initialize_providers with session config...")
            self._initialize_providers()

            print("Initializing forensic logger...")
            self.logger = get_forensic_logger()

            print("Scanner initialization complete")

        except Exception as e:
            print(f"ERROR: Scanner initialization failed: {e}")
            traceback.print_exc()
            raise
 | 
						|
 | 
						|
    def _is_stop_requested(self) -> bool:
 | 
						|
        """
 | 
						|
        Check if stop is requested using both local and Redis-based signals.
 | 
						|
        This ensures reliable termination across process boundaries.
 | 
						|
        """
 | 
						|
        # Check local threading event first (fastest)
 | 
						|
        if self.stop_event.is_set():
 | 
						|
            return True
 | 
						|
        
 | 
						|
        # Check Redis-based stop signal if session ID is available
 | 
						|
        if self.session_id:
 | 
						|
            try:
 | 
						|
                from core.session_manager import session_manager
 | 
						|
                return session_manager.is_stop_requested(self.session_id)
 | 
						|
            except Exception as e:
 | 
						|
                print(f"Error checking Redis stop signal: {e}")
 | 
						|
                # Fall back to local event
 | 
						|
                return self.stop_event.is_set()
 | 
						|
        
 | 
						|
        return False
 | 
						|
 | 
						|
    def _set_stop_signal(self) -> None:
 | 
						|
        """
 | 
						|
        Set stop signal both locally and in Redis.
 | 
						|
        """
 | 
						|
        # Set local event
 | 
						|
        self.stop_event.set()
 | 
						|
        
 | 
						|
        # Set Redis signal if session ID is available
 | 
						|
        if self.session_id:
 | 
						|
            try:
 | 
						|
                from core.session_manager import session_manager
 | 
						|
                session_manager.set_stop_signal(self.session_id)
 | 
						|
            except Exception as e:
 | 
						|
                print(f"Error setting Redis stop signal: {e}")
 | 
						|
 | 
						|
    def __getstate__(self):
 | 
						|
        """Prepare object for pickling by excluding unpicklable attributes."""
 | 
						|
        state = self.__dict__.copy()
 | 
						|
        
 | 
						|
        # Remove unpicklable threading objects
 | 
						|
        unpicklable_attrs = [
 | 
						|
            'stop_event',
 | 
						|
            'scan_thread', 
 | 
						|
            'executor'
 | 
						|
        ]
 | 
						|
        
 | 
						|
        for attr in unpicklable_attrs:
 | 
						|
            if attr in state:
 | 
						|
                del state[attr]
 | 
						|
        
 | 
						|
        # Handle providers separately to ensure they're picklable
 | 
						|
        if 'providers' in state:
 | 
						|
            # The providers should be picklable now, but let's ensure clean state
 | 
						|
            for provider in state['providers']:
 | 
						|
                if hasattr(provider, '_stop_event'):
 | 
						|
                    provider._stop_event = None
 | 
						|
        
 | 
						|
        return state
 | 
						|
 | 
						|
    def __setstate__(self, state):
 | 
						|
        """Restore object after unpickling by reconstructing threading objects."""
 | 
						|
        self.__dict__.update(state)
 | 
						|
        
 | 
						|
        # Reconstruct threading objects
 | 
						|
        self.stop_event = threading.Event()
 | 
						|
        self.scan_thread = None
 | 
						|
        self.executor = None
 | 
						|
        
 | 
						|
        # Re-set stop events for providers
 | 
						|
        if hasattr(self, 'providers'):
 | 
						|
            for provider in self.providers:
 | 
						|
                if hasattr(provider, 'set_stop_event'):
 | 
						|
                    provider.set_stop_event(self.stop_event)
 | 
						|
 | 
						|
    def _initialize_providers(self) -> None:
        """
        Discover provider classes and instantiate the enabled, available ones.

        Every ``*_provider.py`` file in the sibling ``providers`` directory
        (except those starting with ``base``) is imported as
        ``providers.<module>``. Each ``BaseProvider`` subclass found in a
        module is instantiated once with the session config. It is kept when
        the config reports it as enabled and the provider reports itself as
        available, and kept providers receive this scanner's ``stop_event``.

        Files are visited in sorted order so the provider order is
        deterministic, and a class reachable from several modules (for
        example via an import) is only considered once.

        A failure while handling one file is printed with its traceback and
        does not prevent the remaining files from being processed.
        """
        self.providers = []
        print("Initializing providers with session config...")

        provider_dir = os.path.join(os.path.dirname(__file__), '..', 'providers')
        seen_classes = set()

        for filename in sorted(os.listdir(provider_dir)):
            if not filename.endswith('_provider.py') or filename.startswith('base'):
                continue

            module_name = f"providers.{filename[:-3]}"
            try:
                module = importlib.import_module(module_name)
                for attribute_name in dir(module):
                    attribute = getattr(module, attribute_name)
                    is_provider_class = (
                        isinstance(attribute, type)
                        and issubclass(attribute, BaseProvider)
                        and attribute is not BaseProvider
                    )
                    if not is_provider_class or attribute in seen_classes:
                        continue
                    seen_classes.add(attribute)

                    # One instance serves both the name lookup and later use.
                    provider = attribute(session_config=self.config)
                    if not self.config.is_provider_enabled(provider.get_name()):
                        continue

                    if provider.is_available():
                        provider.set_stop_event(self.stop_event)
                        self.providers.append(provider)
                        print(f"✓ {provider.get_display_name()} provider initialized successfully for session")
                    else:
                        print(f"✗ {provider.get_display_name()} provider is not available")
            except Exception as e:
                print(f"✗ Failed to initialize provider from {filename}: {e}")
                traceback.print_exc()

        print(f"Initialized {len(self.providers)} providers for session")
 | 
						|
 | 
						|
    def update_session_config(self, new_config) -> None:
 | 
						|
        """Update session configuration and reinitialize providers."""
 | 
						|
        print("Updating session configuration...")
 | 
						|
        self.config = new_config
 | 
						|
        self.max_workers = self.config.max_concurrent_requests
 | 
						|
        self._initialize_providers()
 | 
						|
        print("Session configuration updated")
 | 
						|
 | 
						|
    def start_scan(self, target_domain: str, max_depth: int = 2, clear_graph: bool = True) -> bool:
 | 
						|
        """Start a new reconnaissance scan with immediate GUI feedback."""
 | 
						|
        print(f"=== STARTING SCAN IN SCANNER {id(self)} ===")
 | 
						|
        print(f"Session ID: {self.session_id}")
 | 
						|
        print(f"Initial scanner status: {self.status}")
 | 
						|
 | 
						|
        # Clean up previous scan thread if needed
 | 
						|
        if self.scan_thread and self.scan_thread.is_alive():
 | 
						|
            print("A previous scan thread is still alive. Sending termination signal and waiting...")
 | 
						|
            self.stop_scan()
 | 
						|
            # Wait for the thread to die, with a timeout
 | 
						|
            self.scan_thread.join(10.0) 
 | 
						|
 | 
						|
            if self.scan_thread.is_alive():
 | 
						|
                print("ERROR: The previous scan thread is unresponsive and could not be stopped.")
 | 
						|
                self.status = ScanStatus.FAILED
 | 
						|
                self._update_session_state()
 | 
						|
                return False
 | 
						|
            print("Previous scan thread terminated successfully.")
 | 
						|
 | 
						|
        # Reset state for new scan
 | 
						|
        self.status = ScanStatus.IDLE
 | 
						|
        # This update is crucial for the UI to reflect the reset before the new scan starts.
 | 
						|
        self._update_session_state() 
 | 
						|
        print("Scanner state is now clean for a new scan.")
 | 
						|
 | 
						|
        try:
 | 
						|
            if not hasattr(self, 'providers') or not self.providers:
 | 
						|
                print(f"ERROR: No providers available in scanner {id(self)}, cannot start scan")
 | 
						|
                return False
 | 
						|
            
 | 
						|
            print(f"Scanner {id(self)} validation passed, providers available: {[p.get_name() for p in self.providers]}")
 | 
						|
 | 
						|
            if clear_graph:
 | 
						|
                self.graph.clear()
 | 
						|
            self.current_target = target_domain.lower().strip()
 | 
						|
            self.max_depth = max_depth
 | 
						|
            self.current_depth = 0
 | 
						|
            
 | 
						|
            # Clear both local and Redis stop signals for the new scan
 | 
						|
            self.stop_event.clear()
 | 
						|
            if self.session_id:
 | 
						|
                from core.session_manager import session_manager
 | 
						|
                session_manager.clear_stop_signal(self.session_id)
 | 
						|
            
 | 
						|
            self.total_indicators_found = 0
 | 
						|
            self.indicators_processed = 0
 | 
						|
            self.current_indicator = self.current_target
 | 
						|
 | 
						|
            # Update GUI with scan preparation state
 | 
						|
            self._update_session_state()
 | 
						|
 | 
						|
            # Start new forensic session
 | 
						|
            print(f"Starting new forensic session for scanner {id(self)}...")
 | 
						|
            self.logger = new_session()
 | 
						|
 | 
						|
            # Start scan in a separate thread
 | 
						|
            print(f"Starting scan thread for scanner {id(self)}...")
 | 
						|
            self.scan_thread = threading.Thread(
 | 
						|
                target=self._execute_scan,
 | 
						|
                args=(self.current_target, max_depth),
 | 
						|
                daemon=True
 | 
						|
            )
 | 
						|
            self.scan_thread.start()
 | 
						|
 | 
						|
            print(f"=== SCAN STARTED SUCCESSFULLY IN SCANNER {id(self)} ===")
 | 
						|
            return True
 | 
						|
 | 
						|
        except Exception as e:
 | 
						|
            print(f"ERROR: Exception in start_scan for scanner {id(self)}: {e}")
 | 
						|
            traceback.print_exc()
 | 
						|
            self.status = ScanStatus.FAILED
 | 
						|
            # Update GUI with failed status immediately
 | 
						|
            self._update_session_state() 
 | 
						|
            return False
 | 
						|
 | 
						|
    def _execute_scan(self, target_domain: str, max_depth: int) -> None:
 | 
						|
        """Execute the reconnaissance scan using a task queue-based approach."""
 | 
						|
        print(f"_execute_scan started for {target_domain} with depth {max_depth}")
 | 
						|
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
 | 
						|
        processed_targets = set()
 | 
						|
        
 | 
						|
        task_queue = deque([(target_domain, 0, False)])  # target, depth, is_large_entity_member
 | 
						|
        
 | 
						|
        try:
 | 
						|
            self.status = ScanStatus.RUNNING
 | 
						|
            self._update_session_state()
 | 
						|
            
 | 
						|
            enabled_providers = [provider.get_name() for provider in self.providers]
 | 
						|
            self.logger.log_scan_start(target_domain, max_depth, enabled_providers)
 | 
						|
            self.graph.add_node(target_domain, NodeType.DOMAIN)
 | 
						|
            self._initialize_provider_states(target_domain)
 | 
						|
 | 
						|
            while task_queue:
 | 
						|
                if self._is_stop_requested():
 | 
						|
                    print("Stop requested, terminating scan.")
 | 
						|
                    break
 | 
						|
 | 
						|
                target, depth, is_large_entity_member = task_queue.popleft()
 | 
						|
 | 
						|
                if target in processed_targets:
 | 
						|
                    continue
 | 
						|
 | 
						|
                if depth > max_depth:
 | 
						|
                    continue
 | 
						|
 | 
						|
                self.current_depth = depth
 | 
						|
                self.current_indicator = target
 | 
						|
                self._update_session_state()
 | 
						|
                
 | 
						|
                new_targets, large_entity_members = self._query_providers_for_target(target, depth, is_large_entity_member)
 | 
						|
                processed_targets.add(target)
 | 
						|
                
 | 
						|
                for new_target in new_targets:
 | 
						|
                    if new_target not in processed_targets:
 | 
						|
                        task_queue.append((new_target, depth + 1, False))
 | 
						|
                
 | 
						|
                for member in large_entity_members:
 | 
						|
                    if member not in processed_targets:
 | 
						|
                        task_queue.append((member, depth, True))
 | 
						|
 | 
						|
 | 
						|
        except Exception as e:
 | 
						|
            print(f"ERROR: Scan execution failed with error: {e}")
 | 
						|
            traceback.print_exc()
 | 
						|
            self.status = ScanStatus.FAILED
 | 
						|
            self.logger.logger.error(f"Scan failed: {e}")
 | 
						|
        finally:
 | 
						|
            if self._is_stop_requested():
 | 
						|
                self.status = ScanStatus.STOPPED
 | 
						|
            else:
 | 
						|
                self.status = ScanStatus.COMPLETED
 | 
						|
                
 | 
						|
            self._update_session_state()
 | 
						|
            self.logger.log_scan_complete()
 | 
						|
            if self.executor:
 | 
						|
                self.executor.shutdown(wait=False, cancel_futures=True)
 | 
						|
            stats = self.graph.get_statistics()
 | 
						|
            print("Final scan statistics:")
 | 
						|
            print(f"  - Total nodes: {stats['basic_metrics']['total_nodes']}")
 | 
						|
            print(f"  - Total edges: {stats['basic_metrics']['total_edges']}")
 | 
						|
            print(f"  - Targets processed: {len(processed_targets)}")
 | 
						|
 | 
						|
    def _query_providers_for_target(self, target: str, depth: int, dns_only: bool = False) -> Tuple[Set[str], Set[str]]:
        """
        Query every eligible provider for one target and merge the findings.

        Args:
            target: Domain or IP to investigate.
            depth: Depth at which the target was reached.
            dns_only: Forwarded to ``_get_eligible_providers`` to restrict
                the provider selection.

        Returns:
            ``(new_targets, large_entity_members)``: targets discovered
            through ordinary results, and targets that came back as members
            of a large entity. Both are empty when a stop was requested up
            front or no provider is eligible.
        """
        is_ip = _is_valid_ip(target)
        target_type = NodeType.IP if is_ip else NodeType.DOMAIN
        print(f"Querying providers for {target_type.value}: {target} at depth {depth}")

        if self._is_stop_requested():
            print(f"Stop requested before querying providers for {target}")
            return set(), set()

        self.graph.add_node(target, target_type)
        self._initialize_provider_states(target)

        new_targets = set()
        large_entity_members = set()
        # node id -> attribute name -> list of collected values
        node_attributes = defaultdict(lambda: defaultdict(list))

        eligible_providers = self._get_eligible_providers(target, is_ip, dns_only)
        if not eligible_providers:
            self._log_no_eligible_providers(target, is_ip)
            return new_targets, large_entity_members

        for provider in eligible_providers:
            if self._is_stop_requested():
                print(f"Stop requested while querying providers for {target}")
                break

            try:
                provider_results = self._query_single_provider_forensic(provider, target, is_ip, depth)
                # Nothing to process, or a stop arrived while querying.
                if not provider_results or self._is_stop_requested():
                    continue

                discovered, is_large_entity = self._process_provider_results_forensic(
                    target, provider, provider_results, node_attributes, depth
                )
                destination = large_entity_members if is_large_entity else new_targets
                destination.update(discovered)
            except Exception as e:
                self._log_provider_error(target, provider.get_name(), str(e))

        # Attach the attributes gathered above to nodes present in the graph.
        for node_id, attributes in node_attributes.items():
            if not self.graph.graph.has_node(node_id):
                continue
            node_type_to_add = NodeType.IP if _is_valid_ip(node_id) else NodeType.DOMAIN
            self.graph.add_node(node_id, node_type_to_add, attributes=attributes)

        return new_targets, large_entity_members
 | 
						|
 | 
						|
    def _update_session_state(self) -> None:
 | 
						|
        """
 | 
						|
        Update the scanner state in Redis for GUI updates.
 | 
						|
        This ensures the web interface sees real-time updates.
 | 
						|
        """
 | 
						|
        if self.session_id:
 | 
						|
            try:
 | 
						|
                from core.session_manager import session_manager
 | 
						|
                success = session_manager.update_session_scanner(self.session_id, self)
 | 
						|
                if not success:
 | 
						|
                    print(f"WARNING: Failed to update session state for {self.session_id}")
 | 
						|
            except Exception as e:
 | 
						|
                print(f"ERROR: Failed to update session state: {e}")
 | 
						|
 | 
						|
    def _initialize_provider_states(self, target: str) -> None:
 | 
						|
        """Initialize provider states for forensic tracking."""
 | 
						|
        if not self.graph.graph.has_node(target):
 | 
						|
            return
 | 
						|
            
 | 
						|
        node_data = self.graph.graph.nodes[target]
 | 
						|
        if 'metadata' not in node_data:
 | 
						|
            node_data['metadata'] = {}
 | 
						|
        if 'provider_states' not in node_data['metadata']:
 | 
						|
            node_data['metadata']['provider_states'] = {}
 | 
						|
 | 
						|
    def _get_eligible_providers(self, target: str, is_ip: bool, dns_only: bool) -> List:
 | 
						|
        """Get providers eligible for querying this target."""
 | 
						|
        if dns_only:
 | 
						|
            return [p for p in self.providers if p.get_name() == 'dns']
 | 
						|
 | 
						|
        eligible = []
 | 
						|
        target_key = 'ips' if is_ip else 'domains'
 | 
						|
        
 | 
						|
        for provider in self.providers:
 | 
						|
            if provider.get_eligibility().get(target_key):
 | 
						|
                if not self._already_queried_provider(target, provider.get_name()):
 | 
						|
                    eligible.append(provider)
 | 
						|
                else:
 | 
						|
                    print(f"Skipping {provider.get_name()} for {target} - already queried")
 | 
						|
        
 | 
						|
        return eligible
 | 
						|
 | 
						|
    def _already_queried_provider(self, target: str, provider_name: str) -> bool:
 | 
						|
        """Check if we already queried a provider for a target."""
 | 
						|
        if not self.graph.graph.has_node(target):
 | 
						|
            return False
 | 
						|
            
 | 
						|
        node_data = self.graph.graph.nodes[target]
 | 
						|
        provider_states = node_data.get('metadata', {}).get('provider_states', {})
 | 
						|
        return provider_name in provider_states
 | 
						|
 | 
						|
    def _query_single_provider_forensic(self, provider, target: str, is_ip: bool, current_depth: int) -> List:
 | 
						|
        """Query a single provider with stop signal checking."""
 | 
						|
        provider_name = provider.get_name()
 | 
						|
        start_time = datetime.now(timezone.utc)
 | 
						|
        
 | 
						|
        if self._is_stop_requested():
 | 
						|
            print(f"Stop requested before querying {provider_name} for {target}")
 | 
						|
            return []
 | 
						|
        
 | 
						|
        print(f"Querying {provider_name} for {target}")
 | 
						|
        
 | 
						|
        self.logger.logger.info(f"Attempting {provider_name} query for {target} at depth {current_depth}")
 | 
						|
        
 | 
						|
        try:
 | 
						|
            if is_ip:
 | 
						|
                results = provider.query_ip(target)
 | 
						|
            else:
 | 
						|
                results = provider.query_domain(target)
 | 
						|
            
 | 
						|
            if self._is_stop_requested():
 | 
						|
                print(f"Stop requested after querying {provider_name} for {target}")
 | 
						|
                return []
 | 
						|
            
 | 
						|
            self._update_provider_state(target, provider_name, 'success', len(results), None, start_time)
 | 
						|
            
 | 
						|
            print(f"✓ {provider_name} returned {len(results)} results for {target}")
 | 
						|
            return results
 | 
						|
            
 | 
						|
        except Exception as e:
 | 
						|
            self._update_provider_state(target, provider_name, 'failed', 0, str(e), start_time)
 | 
						|
            print(f"✗ {provider_name} failed for {target}: {e}")
 | 
						|
            return []
 | 
						|
 | 
						|
    def _update_provider_state(self, target: str, provider_name: str, status: str, 
 | 
						|
                              results_count: int, error: str, start_time: datetime) -> None:
 | 
						|
        """Update provider state in node metadata for forensic tracking."""
 | 
						|
        if not self.graph.graph.has_node(target):
 | 
						|
            return
 | 
						|
            
 | 
						|
        node_data = self.graph.graph.nodes[target]
 | 
						|
        if 'metadata' not in node_data:
 | 
						|
            node_data['metadata'] = {}
 | 
						|
        if 'provider_states' not in node_data['metadata']:
 | 
						|
            node_data['metadata']['provider_states'] = {}
 | 
						|
        
 | 
						|
        node_data['metadata']['provider_states'][provider_name] = {
 | 
						|
            'status': status,
 | 
						|
            'timestamp': start_time.isoformat(),
 | 
						|
            'results_count': results_count,
 | 
						|
            'error': error,
 | 
						|
            'duration_ms': (datetime.now(timezone.utc) - start_time).total_seconds() * 1000
 | 
						|
        }
 | 
						|
        
 | 
						|
        self.logger.logger.info(f"Provider state updated: {target} -> {provider_name} -> {status} ({results_count} results)")
 | 
						|
 | 
						|
    def _process_provider_results_forensic(self, target: str, provider, results: List,
 | 
						|
                                        node_attributes: Dict, current_depth: int) -> Tuple[Set[str], bool]:
 | 
						|
        """Process provider results, returns (discovered_targets, is_large_entity)."""
 | 
						|
        provider_name = provider.get_name()
 | 
						|
        discovered_targets = set()
 | 
						|
 | 
						|
        if self._is_stop_requested():
 | 
						|
            print(f"Stop requested before processing results from {provider_name} for {target}")
 | 
						|
            return discovered_targets, False
 | 
						|
 | 
						|
        if len(results) > self.config.large_entity_threshold:
 | 
						|
            print(f"Large entity detected: {provider_name} returned {len(results)} results for {target}")
 | 
						|
            members = self._create_large_entity(target, provider_name, results, current_depth)
 | 
						|
            return members, True
 | 
						|
 | 
						|
        for i, (source, rel_target, rel_type, confidence, raw_data) in enumerate(results):
 | 
						|
            if i % 10 == 0 and self._is_stop_requested():
 | 
						|
                print(f"Stop requested while processing results from {provider_name} for {target}")
 | 
						|
                break
 | 
						|
 | 
						|
            self.logger.log_relationship_discovery(
 | 
						|
                source_node=source,
 | 
						|
                target_node=rel_target,
 | 
						|
                relationship_type=rel_type,
 | 
						|
                confidence_score=confidence,
 | 
						|
                provider=provider_name,
 | 
						|
                raw_data=raw_data,
 | 
						|
                discovery_method=f"{provider_name}_query_depth_{current_depth}"
 | 
						|
            )
 | 
						|
 | 
						|
            self._collect_node_attributes(source, provider_name, rel_type, rel_target, raw_data, node_attributes[source])
 | 
						|
 | 
						|
            if _is_valid_ip(rel_target):
 | 
						|
                self.graph.add_node(rel_target, NodeType.IP)
 | 
						|
                if self.graph.add_edge(source, rel_target, rel_type, confidence, provider_name, raw_data):
 | 
						|
                    print(f"Added IP relationship: {source} -> {rel_target} ({rel_type})")
 | 
						|
                discovered_targets.add(rel_target)
 | 
						|
 | 
						|
            elif rel_target.startswith('AS') and rel_target[2:].isdigit():
 | 
						|
                self.graph.add_node(rel_target, NodeType.ASN)
 | 
						|
                if self.graph.add_edge(source, rel_target, rel_type, confidence, provider_name, raw_data):
 | 
						|
                    print(f"Added ASN relationship: {source} -> {rel_target} ({rel_type})")
 | 
						|
 | 
						|
            elif _is_valid_domain(rel_target):
 | 
						|
                self.graph.add_node(rel_target, NodeType.DOMAIN)
 | 
						|
                if self.graph.add_edge(source, rel_target, rel_type, confidence, provider_name, raw_data):
 | 
						|
                    print(f"Added domain relationship: {source} -> {rel_target} ({rel_type})")
 | 
						|
                discovered_targets.add(rel_target)
 | 
						|
                self._collect_node_attributes(rel_target, provider_name, rel_type, source, raw_data, node_attributes[rel_target])
 | 
						|
            
 | 
						|
            else:
 | 
						|
                self._collect_node_attributes(source, provider_name, rel_type, rel_target, raw_data, node_attributes[source])
 | 
						|
 | 
						|
        return discovered_targets, False
 | 
						|
 | 
						|
    def _create_large_entity(self, source: str, provider_name: str, results: List, current_depth: int) -> Set[str]:
        """
        Collapse an oversized provider result set into one large-entity node.

        Every target found in ``results`` is still added to the graph as its own
        node. In addition, a single ``NodeType.LARGE_ENTITY`` node is created
        whose attributes list those targets, and ``source`` is linked to it.

        Args:
            source: Node the results were discovered from.
            provider_name: Name of the provider that produced the results.
            results: Relationship tuples; index 1 is read as the target and
                index 2 as the relationship type.
            current_depth: Discovery depth recorded on the entity node.

        Returns:
            The set of targets contained in the entity.
        """
        # NOTE(review): hash() of a str is salted per interpreter process unless
        # PYTHONHASHSEED is pinned, so this id is only stable within one process.
        entity_id = f"large_entity_{provider_name}_{hash(source) & 0x7FFFFFFF}"

        # Tuples too short to carry a target are ignored.
        targets = [rel[1] for rel in results if len(rel) > 1]

        # The entity as a whole is labelled after its first target.
        node_type = 'unknown'
        if targets:
            if _is_valid_domain(targets[0]):
                node_type = 'domain'
            elif _is_valid_ip(targets[0]):
                node_type = 'ip'

        # Type each member individually so that a mixed result set does not get
        # every node stamped with the first target's type. Targets that validate
        # as neither keep the group-based fallback used before.
        fallback_type = NodeType.DOMAIN if node_type == 'domain' else NodeType.IP
        for target in targets:
            if _is_valid_domain(target):
                member_type = NodeType.DOMAIN
            elif _is_valid_ip(target):
                member_type = NodeType.IP
            else:
                member_type = fallback_type
            self.graph.add_node(target, member_type)

        attributes = {
            'count': len(targets),
            'nodes': targets,
            'node_type': node_type,
            'source_provider': provider_name,
            'discovery_depth': current_depth,
            'threshold_exceeded': self.config.large_entity_threshold,
        }
        description = f'Large entity created due to {len(targets)} results from {provider_name}'

        self.graph.add_node(entity_id, NodeType.LARGE_ENTITY, attributes=attributes, description=description)

        # Take the relationship type from the first result that actually has one;
        # indexing results[0][2] directly raised IndexError on a short tuple.
        typed_results = [rel for rel in results if len(rel) > 2]
        if typed_results:
            rel_type = typed_results[0][2]
            self.graph.add_edge(source, entity_id, rel_type, 0.9, provider_name,
                                {'large_entity_info': f'Contains {len(targets)} {node_type}s'})

        self.logger.logger.warning(f"Large entity created: {entity_id} contains {len(targets)} targets from {provider_name}")
        print(f"Created large entity {entity_id} for {len(targets)} {node_type}s from {provider_name}")

        return set(targets)
 | 
						|
 | 
						|
    def _collect_node_attributes(self, node_id: str, provider_name: str, rel_type: str,
 | 
						|
                                    target: str, raw_data: Dict[str, Any], attributes: Dict[str, Any]) -> None:
 | 
						|
        """Collect and organize attributes for a node."""
 | 
						|
        self.logger.logger.debug(f"Collecting attributes for {node_id} from {provider_name}: {rel_type}")
 | 
						|
 | 
						|
        if provider_name == 'dns':
 | 
						|
            record_type = raw_data.get('query_type', 'UNKNOWN')
 | 
						|
            value = raw_data.get('value', target)
 | 
						|
            dns_entry = f"{record_type}: {value}"
 | 
						|
            if dns_entry not in attributes.get('dns_records', []):
 | 
						|
                attributes.setdefault('dns_records', []).append(dns_entry)
 | 
						|
 | 
						|
        elif provider_name == 'crtsh':
 | 
						|
            if rel_type == "san_certificate":
 | 
						|
                domain_certs = raw_data.get('domain_certificates', {})
 | 
						|
                if node_id in domain_certs:
 | 
						|
                    cert_summary = domain_certs[node_id]
 | 
						|
                    attributes['certificates'] = cert_summary
 | 
						|
                    if target not in attributes.get('related_domains_san', []):
 | 
						|
                        attributes.setdefault('related_domains_san', []).append(target)
 | 
						|
 | 
						|
        elif provider_name == 'shodan':
 | 
						|
            shodan_attributes = attributes.setdefault('shodan', {})
 | 
						|
            for key, value in raw_data.items():
 | 
						|
                if key not in shodan_attributes or not shodan_attributes.get(key):
 | 
						|
                    shodan_attributes[key] = value
 | 
						|
 | 
						|
        if rel_type == "asn_membership":
 | 
						|
            attributes['asn'] = {
 | 
						|
                'id': target,
 | 
						|
                'description': raw_data.get('org', ''),
 | 
						|
                'isp': raw_data.get('isp', ''),
 | 
						|
                'country': raw_data.get('country', '')
 | 
						|
            }
 | 
						|
        
 | 
						|
        record_type_name = rel_type
 | 
						|
        if record_type_name not in attributes:
 | 
						|
            attributes[record_type_name] = []
 | 
						|
        
 | 
						|
        if isinstance(target, list):
 | 
						|
            attributes[record_type_name].extend(target)
 | 
						|
        else:
 | 
						|
            if target not in attributes[record_type_name]:
 | 
						|
                attributes[record_type_name].append(target)
 | 
						|
 | 
						|
 | 
						|
    def _log_target_processing_error(self, target: str, error: str) -> None:
 | 
						|
        """Log target processing errors for forensic trail."""
 | 
						|
        self.logger.logger.error(f"Target processing failed for {target}: {error}")
 | 
						|
 | 
						|
    def _log_provider_error(self, target: str, provider_name: str, error: str) -> None:
 | 
						|
        """Log provider query errors for forensic trail."""
 | 
						|
        self.logger.logger.error(f"Provider {provider_name} failed for {target}: {error}")
 | 
						|
 | 
						|
    def _log_no_eligible_providers(self, target: str, is_ip: bool) -> None:
 | 
						|
        """Log when no providers are eligible for a target."""
 | 
						|
        target_type = 'IP' if is_ip else 'domain'
 | 
						|
        self.logger.logger.warning(f"No eligible providers for {target_type}: {target}")
 | 
						|
 | 
						|
    def stop_scan(self) -> bool:
        """
        Ask the running scan to terminate and publish the stopped state right away.

        Returns:
            True when every termination step ran without raising; False when a
            step raised, in which case the error is printed and logged.
        """
        try:
            print("=== INITIATING IMMEDIATE SCAN TERMINATION ===")
            self.logger.logger.info("Scan termination requested by user")

            # Raise the stop signal first, then mark this scanner as stopped.
            self._set_stop_signal()
            self.status = ScanStatus.STOPPED

            # Push the new status out before the worker pool is torn down.
            self._update_session_state()

            if self.executor:
                print("Shutting down executor with immediate cancellation...")
                # wait=False returns without blocking; cancel_futures=True
                # cancels tasks that have not started yet.
                self.executor.shutdown(wait=False, cancel_futures=True)

            print("Termination signals sent. The scan will stop as soon as possible.")
        except Exception as exc:
            print(f"ERROR: Exception in stop_scan: {exc}")
            self.logger.logger.error(f"Error during scan termination: {exc}")
            traceback.print_exc()
            return False
        else:
            return True
 | 
						|
 | 
						|
    def get_scan_status(self) -> Dict[str, Any]:
        """
        Build a snapshot of the scan's current state.

        Returns:
            A dict with status, target, depth, indicator counters, progress
            percentage, enabled provider names and graph statistics. If anything
            raises while collecting it, the error is printed and a placeholder
            dict with status ``'error'`` and empty/zero values is returned.
        """
        try:
            snapshot = {
                'status': self.status,
                'target_domain': self.current_target,
                'current_depth': self.current_depth,
                'max_depth': self.max_depth,
                'current_indicator': self.current_indicator,
                'total_indicators_found': self.total_indicators_found,
                'indicators_processed': self.indicators_processed,
            }
            snapshot['progress_percentage'] = self._calculate_progress()

            provider_names = []
            for provider in self.providers:
                provider_names.append(provider.get_name())
            snapshot['enabled_providers'] = provider_names

            snapshot['graph_statistics'] = self.graph.get_statistics()
            return snapshot
        except Exception as exc:
            print(f"ERROR: Exception in get_scan_status: {exc}")
            traceback.print_exc()
            fallback = {
                'status': 'error',
                'target_domain': None,
                'current_depth': 0,
                'max_depth': 0,
                'current_indicator': '',
                'total_indicators_found': 0,
                'indicators_processed': 0,
                'progress_percentage': 0.0,
                'enabled_providers': [],
                'graph_statistics': {}
            }
            return fallback
 | 
						|
 | 
						|
    def _calculate_progress(self) -> float:
 | 
						|
        """Calculate scan progress percentage."""
 | 
						|
        if self.total_indicators_found == 0:
 | 
						|
            return 0.0
 | 
						|
        return min(100.0, (self.indicators_processed / self.total_indicators_found) * 100)
 | 
						|
 | 
						|
    def get_graph_data(self) -> Dict[str, Any]:
        """Return whatever the graph manager's ``get_graph_data()`` produces for the current graph."""
        graph_manager = self.graph
        return graph_manager.get_graph_data()
 | 
						|
 | 
						|
    def export_results(self) -> Dict[str, Any]:
        """
        Assemble the full export of a scan.

        Returns:
            A dict with scan metadata, the graph's JSON export, the logger's
            audit trail, per-provider statistics and the logger's forensic
            summary.
        """
        exported_graph = self.graph.export_json()
        audit_log = self.logger.export_audit_trail()

        # Statistics are fetched before the name, matching the call order the
        # providers have always seen.
        stats_by_provider = {}
        for provider in self.providers:
            provider_statistics = provider.get_statistics()
            stats_by_provider[provider.get_name()] = provider_statistics

        metadata = {
            'target_domain': self.current_target,
            'max_depth': self.max_depth,
            'final_status': self.status,
            'total_indicators_processed': self.indicators_processed,
            'enabled_providers': list(stats_by_provider.keys()),
            'session_id': self.session_id
        }

        return {
            'scan_metadata': metadata,
            'graph_data': exported_graph,
            'forensic_audit': audit_log,
            'provider_statistics': stats_by_provider,
            'scan_summary': self.logger.get_forensic_summary()
        }
 | 
						|
 | 
						|
    def get_provider_statistics(self) -> Dict[str, Dict[str, Any]]:
        """Return each active provider's statistics, keyed by the provider's name."""
        stats_by_name = {}
        for active_provider in self.providers:
            current_stats = active_provider.get_statistics()
            stats_by_name[active_provider.get_name()] = current_stats
        return stats_by_name
 | 
						|
 | 
						|
    def get_provider_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Describe every provider class found in the ``providers`` directory.

        Each ``*_provider.py`` file whose name does not start with ``base`` is
        imported, and every ``BaseProvider`` subclass found in the module is
        instantiated with the session config to read its metadata. Statistics
        come from the matching instance in ``self.providers`` when there is
        one, otherwise from the temporary instance.

        Returns:
            Mapping of provider name to a dict with ``display_name``,
            ``requires_api_key``, ``statistics``, ``enabled`` and
            ``rate_limit``. A module that raises while being inspected is
            reported on stdout and skipped from that point on.
        """
        catalog = {}
        providers_path = os.path.join(os.path.dirname(__file__), '..', 'providers')

        for filename in os.listdir(providers_path):
            if not filename.endswith('_provider.py'):
                continue
            if filename.startswith('base'):
                continue

            # Strip the ".py" suffix to get the importable module path.
            module_name = f"providers.{filename[:-3]}"
            try:
                module = importlib.import_module(module_name)
                for member_name in dir(module):
                    candidate = getattr(module, member_name)
                    if not isinstance(candidate, type):
                        continue
                    if candidate is BaseProvider or not issubclass(candidate, BaseProvider):
                        continue

                    # A throwaway instance is enough to read the metadata,
                    # even if the provider is not fully configured.
                    temp_provider = candidate(session_config=self.config)
                    provider_name = temp_provider.get_name()

                    # Prefer the instance actually in use, if any, for stats.
                    live_provider = None
                    for active in self.providers:
                        if active.get_name() == provider_name:
                            live_provider = active
                            break
                    stats_source = live_provider if live_provider else temp_provider

                    catalog[provider_name] = {
                        'display_name': temp_provider.get_display_name(),
                        'requires_api_key': temp_provider.requires_api_key(),
                        'statistics': stats_source.get_statistics(),
                        'enabled': self.config.is_provider_enabled(provider_name),
                        'rate_limit': self.config.get_rate_limit(provider_name),
                    }
            except Exception as exc:
                print(f"✗ Failed to get info for provider from {filename}: {exc}")
                traceback.print_exc()

        return catalog