From 71a05f5b32e336c2c6c3592a95a480dffefad687 Mon Sep 17 00:00:00 2001
From: overcuriousity
Date: Sat, 20 Sep 2025 18:48:47 +0200
Subject: [PATCH] run correlation after stop

---
 core/scanner.py | 105 ++++++++++++++++++++++++++++++++++++------------
 1 file changed, 80 insertions(+), 25 deletions(-)

diff --git a/core/scanner.py b/core/scanner.py
index b8c7e8c..ea4abf3 100644
--- a/core/scanner.py
+++ b/core/scanner.py
@@ -627,10 +627,10 @@ class Scanner:
                 self.status = ScanStatus.FAILED
             else:
                 self.status = ScanStatus.COMPLETED
-
-            # Always run correlation at the end of a scan, regardless of status
-            print(f"\n=== Running Final Correlation Analysis ===")
-            self._run_correlation_phase(max_depth, processed_tasks)
+
+            if self.status in [ScanStatus.COMPLETED, ScanStatus.STOPPED]:
+                print(f"\n=== PHASE 2: Running correlation analysis ===")
+                self._run_correlation_phase(max_depth, processed_tasks)
 
             self.status_logger_stop_event.set()
             if self.status_logger_thread and self.status_logger_thread.is_alive():
@@ -657,60 +657,104 @@ class Scanner:
             print("No correlation provider found - skipping correlation phase")
             return
 
+        # Get all nodes from the graph for correlation analysis
         all_nodes = list(self.graph.graph.nodes())
+        correlation_tasks = []
+
+        print(f"Enqueueing correlation tasks for {len(all_nodes)} nodes")
 
         for node_id in all_nodes:
-            task_tuple = ('correlation', node_id, 0)
+            # Determine appropriate depth for correlation (use 0 for simplicity)
+            correlation_depth = 0
+            task_tuple = ('correlation', node_id, correlation_depth)
+
+            # Don't re-process already processed correlation tasks
             if task_tuple not in processed_tasks:
                 priority = self._get_priority('correlation')
-                self.task_queue.put((time.time(), priority, task_tuple))
-
-        # Give the queue a moment to populate
-        time.sleep(0.1)
-
-        while not self.task_queue.empty():
+                self.task_queue.put((time.time(), priority, ('correlation', node_id, correlation_depth)))
+                correlation_tasks.append(task_tuple)
+                self.total_tasks_ever_enqueued += 1
+
+        print(f"Enqueued {len(correlation_tasks)} correlation tasks")
+
+        # Process correlation tasks
+        consecutive_empty_iterations = 0
+        max_empty_iterations = 20  # Shorter timeout for correlation phase
+
+        while correlation_tasks:
+            queue_empty = self.task_queue.empty()
             with self.processing_lock:
-                if len(self.currently_processing) >= self.max_workers:
-                    time.sleep(0.1)
-                    continue
+                no_active_processing = len(self.currently_processing) == 0
+
+            if queue_empty and no_active_processing:
+                consecutive_empty_iterations += 1
+                if consecutive_empty_iterations >= max_empty_iterations:
+                    break
+                time.sleep(0.1)
+                continue
+            else:
+                consecutive_empty_iterations = 0
 
             try:
-                _, priority, (provider_name, target_item, depth) = self.task_queue.get_nowait()
+                run_at, priority, (provider_name, target_item, depth) = self.task_queue.get(timeout=0.1)
+
+                # Only process correlation tasks in this phase
+                if provider_name != 'correlation':
+                    continue
+
             except:
-                continue
-
-            if provider_name != 'correlation':
+                time.sleep(0.1)
                 continue
 
             task_tuple = (provider_name, target_item, depth)
+
+            # Skip if already processed
             if task_tuple in processed_tasks:
+                self.tasks_skipped += 1
+                self.indicators_completed += 1
+                if task_tuple in correlation_tasks:
+                    correlation_tasks.remove(task_tuple)
                 continue
 
            with self.processing_lock:
                processing_key = (provider_name, target_item)
                if processing_key in self.currently_processing:
+                    self.tasks_skipped += 1
+                    self.indicators_completed += 1
                    continue
                self.currently_processing.add(processing_key)
 
            try:
                self.current_indicator = target_item
-                _, _, success = self._process_provider_task(correlation_provider, target_item, depth)
+                self._update_session_state()
+
+                # Process correlation task
+                new_targets, _, success = self._process_provider_task(correlation_provider, target_item, depth)
+
                if success:
                    processed_tasks.add(task_tuple)
+                    self.indicators_completed += 1
+                    if task_tuple in correlation_tasks:
+                        correlation_tasks.remove(task_tuple)
+                else:
+                    # For correlations, don't retry - just mark as completed
+                    self.indicators_completed += 1
+                    if task_tuple in correlation_tasks:
+                        correlation_tasks.remove(task_tuple)
+
            finally:
                with self.processing_lock:
+                    processing_key = (provider_name, target_item)
                    self.currently_processing.discard(processing_key)
 
-        print("Correlation phase complete.")
+        print(f"Correlation phase complete. Remaining tasks: {len(correlation_tasks)}")
 
     def _process_provider_task(self, provider: BaseProvider, target: str, depth: int) -> Tuple[Set[str], Set[str], bool]:
         """
         Manages the entire process for a given target and provider.
         This version is generalized to handle all relationships dynamically.
         """
-        # This check is still useful here to prevent starting new work if a stop is requested during correlation
-        if self._is_stop_requested() and provider.get_name() != 'correlation':
+        if self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
             return set(), set(), False
 
         is_ip = _is_valid_ip(target)
@@ -727,7 +771,8 @@ class Scanner:
 
         if provider_result is None:
             provider_successful = False
-        else:
+        # Allow correlation provider to process results even if scan is stopped
+        elif not self._is_stop_requested() or isinstance(provider, CorrelationProvider):
             # Pass all relationships to be processed
             discovered, is_large_entity = self._process_provider_result_unified(
                 target, provider, provider_result, depth
             )
@@ -747,8 +792,7 @@ class Scanner:
         provider_name = provider.get_name()
         start_time = datetime.now(timezone.utc)
 
-        # This check is still useful here to prevent starting new work if a stop is requested during correlation
-        if self._is_stop_requested() and provider.get_name() != 'correlation':
+        if self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
             return None
 
         try:
@@ -757,6 +801,9 @@ class Scanner:
             else:
                 result = provider.query_domain(target)
 
+            if self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
+                return None
+
             relationship_count = result.get_relationship_count() if result else 0
             self._update_provider_state(target, provider_name, 'success', relationship_count, None, start_time)
 
@@ -848,6 +895,10 @@ class Scanner:
         large_entity_id = ""
         large_entity_members = set()
 
+        # Stop processing for non-correlation providers if requested
+        if self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
+            return discovered_targets, False
+
         eligible_rel_count = sum(
             1 for rel in provider_result.relationships if _is_valid_domain(rel.target_node) or _is_valid_ip(rel.target_node)
         )
@@ -859,6 +910,10 @@ class Scanner:
         )
 
         for i, relationship in enumerate(provider_result.relationships):
+            # Stop processing for non-correlation providers if requested
+            if i % 5 == 0 and self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
+                break
+
             source_node_id = relationship.source_node
             target_node_id = relationship.target_node
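
The sketch below is not part of the patch; it is a minimal, self-contained illustration of the two-phase pattern the patch moves toward: a stoppable enumeration phase followed by a correlation pass that runs whether the scan completed or was stopped, and that deliberately ignores the stop flag. All names here (MiniScanner, correlate, etc.) are hypothetical and do not exist in core/scanner.py.

    # Minimal sketch, assuming a threading.Event stop flag and a task queue.
    import threading
    from queue import Queue, Empty

    class MiniScanner:
        def __init__(self, targets):
            self.stop_event = threading.Event()
            self.queue = Queue()
            for t in targets:
                self.queue.put(t)
            self.discovered = []

        def run(self):
            # Phase 1: enumeration honours the stop flag.
            while not self.stop_event.is_set():
                try:
                    target = self.queue.get(timeout=0.1)
                except Empty:
                    break
                self.discovered.append(target)
            status = "stopped" if self.stop_event.is_set() else "completed"

            # Phase 2: correlation runs for both completed and stopped scans
            # and does not check the stop flag.
            if status in ("completed", "stopped"):
                for node in self.discovered:
                    self.correlate(node)
            return status

        def correlate(self, node):
            # Placeholder for the correlation provider call.
            print(f"correlating {node}")

    if __name__ == "__main__":
        scanner = MiniScanner(["example.com", "192.0.2.1"])
        # Setting scanner.stop_event mid-scan would yield "stopped",
        # but the correlation pass over discovered nodes would still run.
        print(scanner.run())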