diff --git a/core/scanner.py b/core/scanner.py index 4b6d3b8..7a0268d 100644 --- a/core/scanner.py +++ b/core/scanner.py @@ -5,7 +5,7 @@ import traceback import time from typing import List, Set, Dict, Any, Tuple from concurrent.futures import ThreadPoolExecutor, as_completed, CancelledError, Future -from collections import defaultdict +from collections import defaultdict, deque from datetime import datetime, timezone from core.graph_manager import GraphManager, NodeType, RelationshipType @@ -270,14 +270,16 @@ class Scanner: return False def _execute_scan(self, target_domain: str, max_depth: int) -> None: - """Execute the reconnaissance scan with frequent state updates for GUI.""" + """Execute the reconnaissance scan using a task queue-based approach.""" print(f"_execute_scan started for {target_domain} with depth {max_depth}") self.executor = ThreadPoolExecutor(max_workers=self.max_workers) processed_targets = set() - + + # Initialize the task queue with the starting target and its depth + task_queue = deque([(target_domain, 0)]) + try: self.status = ScanStatus.RUNNING - # Immediate status update for GUI self._update_session_state() enabled_providers = [provider.get_name() for provider in self.providers] @@ -285,60 +287,44 @@ class Scanner: self.graph.add_node(target_domain, NodeType.DOMAIN) self._initialize_provider_states(target_domain) - current_level_targets = {target_domain} - all_discovered_targets = {target_domain} - - for depth in range(max_depth + 1): + while task_queue: if self._is_stop_requested(): - print(f"Stop requested at depth {depth}") + print("Stop requested, terminating scan.") break + target, depth = task_queue.popleft() + + if target in processed_targets: + continue + + if depth > max_depth: + continue + self.current_depth = depth - # Update current depth immediately for GUI + self.current_indicator = target self._update_session_state() - targets_to_process = current_level_targets - processed_targets - if not targets_to_process: - print("No new targets to process at this level.") - break - - print(f"Processing depth level {depth} with {len(targets_to_process)} new targets") - self.total_indicators_found += len(targets_to_process) - # Update total indicators for GUI - self._update_session_state() - - target_results = self._process_targets_sequential_with_stop_checks( - targets_to_process, processed_targets, all_discovered_targets, depth - ) - processed_targets.update(targets_to_process) - - next_level_targets = set() - for _target, new_targets in target_results: - all_discovered_targets.update(new_targets) - next_level_targets.update(new_targets) - - current_level_targets = next_level_targets - processed_targets - - # Additional stop check at end of each depth level - if self._is_stop_requested(): - print(f"Stop requested after processing depth {depth}") - break + # Process the current target + new_targets = self._query_providers_for_target(target, depth) + processed_targets.add(target) + + # Add new, unprocessed targets to the queue + for new_target in new_targets: + if new_target not in processed_targets: + task_queue.append((new_target, depth + 1)) except Exception as e: print(f"ERROR: Scan execution failed with error: {e}") traceback.print_exc() self.status = ScanStatus.FAILED - self._update_session_state() # Update failed status immediately self.logger.logger.error(f"Scan failed: {e}") finally: if self._is_stop_requested(): self.status = ScanStatus.STOPPED else: self.status = ScanStatus.COMPLETED - - # Final status update for GUI + self._update_session_state() - self.logger.log_scan_complete() if self.executor: self.executor.shutdown(wait=False, cancel_futures=True) @@ -348,6 +334,56 @@ class Scanner: print(f" - Total edges: {stats['basic_metrics']['total_edges']}") print(f" - Targets processed: {len(processed_targets)}") + def _query_providers_for_target(self, target: str, depth: int) -> Set[str]: + """Helper method to query providers for a single target.""" + is_ip = _is_valid_ip(target) + target_type = NodeType.IP if is_ip else NodeType.DOMAIN + print(f"Querying providers for {target_type.value}: {target} at depth {depth}") + + # Early stop check + if self._is_stop_requested(): + print(f"Stop requested before querying providers for {target}") + return set() + + # Initialize node and provider states + self.graph.add_node(target, target_type) + self._initialize_provider_states(target) + + new_targets = set() + target_metadata = defaultdict(lambda: defaultdict(list)) + + # Determine eligible providers for this target + eligible_providers = self._get_eligible_providers(target, is_ip) + + if not eligible_providers: + self._log_no_eligible_providers(target, is_ip) + return new_targets + + # Query each eligible provider sequentially with stop checks + for provider in eligible_providers: + if self._is_stop_requested(): + print(f"Stop requested while querying providers for {target}") + break + + try: + provider_results = self._query_single_provider_forensic(provider, target, is_ip, depth) + if provider_results and not self._is_stop_requested(): + discovered_targets = self._process_provider_results_forensic( + target, provider, provider_results, target_metadata, depth + ) + new_targets.update(discovered_targets) + except Exception as e: + self._log_provider_error(target, provider.get_name(), str(e)) + + # Update node metadata + for node_id, metadata_dict in target_metadata.items(): + if self.graph.graph.has_node(node_id): + node_is_ip = _is_valid_ip(node_id) + node_type_to_add = NodeType.IP if node_is_ip else NodeType.DOMAIN + self.graph.add_node(node_id, node_type_to_add, metadata=metadata_dict) + + return new_targets + def _update_session_state(self) -> None: """ Update the scanner state in Redis for GUI updates.