fix?
This commit is contained in:
		
							parent
							
								
									c0b820c96c
								
							
						
					
					
						commit
						d36fb7d814
					
				
							
								
								
									
										116
									
								
								core/scanner.py
									
									
									
									
									
								
							
							
						
						
									
										116
									
								
								core/scanner.py
									
									
									
									
									
								
							@ -5,7 +5,7 @@ import traceback
 | 
				
			|||||||
import time
 | 
					import time
 | 
				
			||||||
from typing import List, Set, Dict, Any, Tuple
 | 
					from typing import List, Set, Dict, Any, Tuple
 | 
				
			||||||
from concurrent.futures import ThreadPoolExecutor, as_completed, CancelledError, Future
 | 
					from concurrent.futures import ThreadPoolExecutor, as_completed, CancelledError, Future
 | 
				
			||||||
from collections import defaultdict
 | 
					from collections import defaultdict, deque
 | 
				
			||||||
from datetime import datetime, timezone
 | 
					from datetime import datetime, timezone
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from core.graph_manager import GraphManager, NodeType, RelationshipType
 | 
					from core.graph_manager import GraphManager, NodeType, RelationshipType
 | 
				
			||||||
@ -270,14 +270,16 @@ class Scanner:
 | 
				
			|||||||
            return False
 | 
					            return False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _execute_scan(self, target_domain: str, max_depth: int) -> None:
 | 
					    def _execute_scan(self, target_domain: str, max_depth: int) -> None:
 | 
				
			||||||
        """Execute the reconnaissance scan with frequent state updates for GUI."""
 | 
					        """Execute the reconnaissance scan using a task queue-based approach."""
 | 
				
			||||||
        print(f"_execute_scan started for {target_domain} with depth {max_depth}")
 | 
					        print(f"_execute_scan started for {target_domain} with depth {max_depth}")
 | 
				
			||||||
        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
 | 
					        self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
 | 
				
			||||||
        processed_targets = set()
 | 
					        processed_targets = set()
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        # Initialize the task queue with the starting target and its depth
 | 
				
			||||||
 | 
					        task_queue = deque([(target_domain, 0)])
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            self.status = ScanStatus.RUNNING
 | 
					            self.status = ScanStatus.RUNNING
 | 
				
			||||||
            # Immediate status update for GUI
 | 
					 | 
				
			||||||
            self._update_session_state()
 | 
					            self._update_session_state()
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            enabled_providers = [provider.get_name() for provider in self.providers]
 | 
					            enabled_providers = [provider.get_name() for provider in self.providers]
 | 
				
			||||||
@ -285,60 +287,44 @@ class Scanner:
 | 
				
			|||||||
            self.graph.add_node(target_domain, NodeType.DOMAIN)
 | 
					            self.graph.add_node(target_domain, NodeType.DOMAIN)
 | 
				
			||||||
            self._initialize_provider_states(target_domain)
 | 
					            self._initialize_provider_states(target_domain)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            current_level_targets = {target_domain}
 | 
					            while task_queue:
 | 
				
			||||||
            all_discovered_targets = {target_domain}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            for depth in range(max_depth + 1):
 | 
					 | 
				
			||||||
                if self._is_stop_requested():
 | 
					                if self._is_stop_requested():
 | 
				
			||||||
                    print(f"Stop requested at depth {depth}")
 | 
					                    print("Stop requested, terminating scan.")
 | 
				
			||||||
                    break
 | 
					                    break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                target, depth = task_queue.popleft()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if target in processed_targets:
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if depth > max_depth:
 | 
				
			||||||
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                self.current_depth = depth
 | 
					                self.current_depth = depth
 | 
				
			||||||
                # Update current depth immediately for GUI
 | 
					                self.current_indicator = target
 | 
				
			||||||
                self._update_session_state()
 | 
					                self._update_session_state()
 | 
				
			||||||
                
 | 
					                
 | 
				
			||||||
                targets_to_process = current_level_targets - processed_targets
 | 
					                # Process the current target
 | 
				
			||||||
                if not targets_to_process:
 | 
					                new_targets = self._query_providers_for_target(target, depth)
 | 
				
			||||||
                    print("No new targets to process at this level.")
 | 
					                processed_targets.add(target)
 | 
				
			||||||
                    break
 | 
					                
 | 
				
			||||||
 | 
					                # Add new, unprocessed targets to the queue
 | 
				
			||||||
                print(f"Processing depth level {depth} with {len(targets_to_process)} new targets")
 | 
					                for new_target in new_targets:
 | 
				
			||||||
                self.total_indicators_found += len(targets_to_process)
 | 
					                    if new_target not in processed_targets:
 | 
				
			||||||
                # Update total indicators for GUI
 | 
					                        task_queue.append((new_target, depth + 1))
 | 
				
			||||||
                self._update_session_state()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                target_results = self._process_targets_sequential_with_stop_checks(
 | 
					 | 
				
			||||||
                    targets_to_process, processed_targets, all_discovered_targets, depth
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                processed_targets.update(targets_to_process)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                next_level_targets = set()
 | 
					 | 
				
			||||||
                for _target, new_targets in target_results:
 | 
					 | 
				
			||||||
                    all_discovered_targets.update(new_targets)
 | 
					 | 
				
			||||||
                    next_level_targets.update(new_targets)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                current_level_targets = next_level_targets - processed_targets
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                # Additional stop check at end of each depth level
 | 
					 | 
				
			||||||
                if self._is_stop_requested():
 | 
					 | 
				
			||||||
                    print(f"Stop requested after processing depth {depth}")
 | 
					 | 
				
			||||||
                    break
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        except Exception as e:
 | 
					        except Exception as e:
 | 
				
			||||||
            print(f"ERROR: Scan execution failed with error: {e}")
 | 
					            print(f"ERROR: Scan execution failed with error: {e}")
 | 
				
			||||||
            traceback.print_exc()
 | 
					            traceback.print_exc()
 | 
				
			||||||
            self.status = ScanStatus.FAILED
 | 
					            self.status = ScanStatus.FAILED
 | 
				
			||||||
            self._update_session_state()  # Update failed status immediately
 | 
					 | 
				
			||||||
            self.logger.logger.error(f"Scan failed: {e}")
 | 
					            self.logger.logger.error(f"Scan failed: {e}")
 | 
				
			||||||
        finally:
 | 
					        finally:
 | 
				
			||||||
            if self._is_stop_requested():
 | 
					            if self._is_stop_requested():
 | 
				
			||||||
                self.status = ScanStatus.STOPPED
 | 
					                self.status = ScanStatus.STOPPED
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                self.status = ScanStatus.COMPLETED
 | 
					                self.status = ScanStatus.COMPLETED
 | 
				
			||||||
            
 | 
					                
 | 
				
			||||||
            # Final status update for GUI
 | 
					 | 
				
			||||||
            self._update_session_state()
 | 
					            self._update_session_state()
 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            self.logger.log_scan_complete()
 | 
					            self.logger.log_scan_complete()
 | 
				
			||||||
            if self.executor:
 | 
					            if self.executor:
 | 
				
			||||||
                self.executor.shutdown(wait=False, cancel_futures=True)
 | 
					                self.executor.shutdown(wait=False, cancel_futures=True)
 | 
				
			||||||
@ -348,6 +334,56 @@ class Scanner:
 | 
				
			|||||||
            print(f"  - Total edges: {stats['basic_metrics']['total_edges']}")
 | 
					            print(f"  - Total edges: {stats['basic_metrics']['total_edges']}")
 | 
				
			||||||
            print(f"  - Targets processed: {len(processed_targets)}")
 | 
					            print(f"  - Targets processed: {len(processed_targets)}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def _query_providers_for_target(self, target: str, depth: int) -> Set[str]:
 | 
				
			||||||
 | 
					        """Helper method to query providers for a single target."""
 | 
				
			||||||
 | 
					        is_ip = _is_valid_ip(target)
 | 
				
			||||||
 | 
					        target_type = NodeType.IP if is_ip else NodeType.DOMAIN
 | 
				
			||||||
 | 
					        print(f"Querying providers for {target_type.value}: {target} at depth {depth}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Early stop check
 | 
				
			||||||
 | 
					        if self._is_stop_requested():
 | 
				
			||||||
 | 
					            print(f"Stop requested before querying providers for {target}")
 | 
				
			||||||
 | 
					            return set()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Initialize node and provider states
 | 
				
			||||||
 | 
					        self.graph.add_node(target, target_type)
 | 
				
			||||||
 | 
					        self._initialize_provider_states(target)
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        new_targets = set()
 | 
				
			||||||
 | 
					        target_metadata = defaultdict(lambda: defaultdict(list))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Determine eligible providers for this target
 | 
				
			||||||
 | 
					        eligible_providers = self._get_eligible_providers(target, is_ip)
 | 
				
			||||||
 | 
					        
 | 
				
			||||||
 | 
					        if not eligible_providers:
 | 
				
			||||||
 | 
					            self._log_no_eligible_providers(target, is_ip)
 | 
				
			||||||
 | 
					            return new_targets
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Query each eligible provider sequentially with stop checks
 | 
				
			||||||
 | 
					        for provider in eligible_providers:
 | 
				
			||||||
 | 
					            if self._is_stop_requested():
 | 
				
			||||||
 | 
					                print(f"Stop requested while querying providers for {target}")
 | 
				
			||||||
 | 
					                break
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                provider_results = self._query_single_provider_forensic(provider, target, is_ip, depth)
 | 
				
			||||||
 | 
					                if provider_results and not self._is_stop_requested():
 | 
				
			||||||
 | 
					                    discovered_targets = self._process_provider_results_forensic(
 | 
				
			||||||
 | 
					                        target, provider, provider_results, target_metadata, depth
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                    new_targets.update(discovered_targets)
 | 
				
			||||||
 | 
					            except Exception as e:
 | 
				
			||||||
 | 
					                self._log_provider_error(target, provider.get_name(), str(e))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Update node metadata
 | 
				
			||||||
 | 
					        for node_id, metadata_dict in target_metadata.items():
 | 
				
			||||||
 | 
					            if self.graph.graph.has_node(node_id):
 | 
				
			||||||
 | 
					                node_is_ip = _is_valid_ip(node_id)
 | 
				
			||||||
 | 
					                node_type_to_add = NodeType.IP if node_is_ip else NodeType.DOMAIN
 | 
				
			||||||
 | 
					                self.graph.add_node(node_id, node_type_to_add, metadata=metadata_dict)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return new_targets
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _update_session_state(self) -> None:
 | 
					    def _update_session_state(self) -> None:
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        Update the scanner state in Redis for GUI updates.
 | 
					        Update the scanner state in Redis for GUI updates.
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user