fix run correlation after stop request
This commit is contained in:
		
							parent
							
								
									bcd79ae2f5
								
							
						
					
					
						commit
						1b0c630667
					
				
							
								
								
									
										110
									
								
								core/scanner.py
									
									
									
									
									
								
							
							
						
						
									
										110
									
								
								core/scanner.py
									
									
									
									
									
								
							@ -627,10 +627,10 @@ class Scanner:
 | 
				
			|||||||
                self.status = ScanStatus.FAILED
 | 
					                self.status = ScanStatus.FAILED
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                self.status = ScanStatus.COMPLETED
 | 
					                self.status = ScanStatus.COMPLETED
 | 
				
			||||||
 | 
					            
 | 
				
			||||||
            if self.status in [ScanStatus.COMPLETED, ScanStatus.STOPPED]:
 | 
					            # Always run correlation at the end of a scan, regardless of status
 | 
				
			||||||
                print(f"\n=== PHASE 2: Running correlation analysis ===")
 | 
					            print(f"\n=== Running Final Correlation Analysis ===")
 | 
				
			||||||
                self._run_correlation_phase(max_depth, processed_tasks)
 | 
					            self._run_correlation_phase(max_depth, processed_tasks)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            self.status_logger_stop_event.set()
 | 
					            self.status_logger_stop_event.set()
 | 
				
			||||||
            if self.status_logger_thread and self.status_logger_thread.is_alive():
 | 
					            if self.status_logger_thread and self.status_logger_thread.is_alive():
 | 
				
			||||||
@ -657,112 +657,60 @@ class Scanner:
 | 
				
			|||||||
            print("No correlation provider found - skipping correlation phase")
 | 
					            print("No correlation provider found - skipping correlation phase")
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Get all nodes from the graph for correlation analysis
 | 
					 | 
				
			||||||
        all_nodes = list(self.graph.graph.nodes())
 | 
					        all_nodes = list(self.graph.graph.nodes())
 | 
				
			||||||
        correlation_tasks = []
 | 
					 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        print(f"Enqueueing correlation tasks for {len(all_nodes)} nodes")
 | 
					        print(f"Enqueueing correlation tasks for {len(all_nodes)} nodes")
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        for node_id in all_nodes:
 | 
					        for node_id in all_nodes:
 | 
				
			||||||
            if self._is_stop_requested():
 | 
					            task_tuple = ('correlation', node_id, 0)
 | 
				
			||||||
                break
 | 
					 | 
				
			||||||
                
 | 
					 | 
				
			||||||
            # Determine appropriate depth for correlation (use 0 for simplicity)
 | 
					 | 
				
			||||||
            correlation_depth = 0
 | 
					 | 
				
			||||||
            task_tuple = ('correlation', node_id, correlation_depth)
 | 
					 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            # Don't re-process already processed correlation tasks
 | 
					 | 
				
			||||||
            if task_tuple not in processed_tasks:
 | 
					            if task_tuple not in processed_tasks:
 | 
				
			||||||
                priority = self._get_priority('correlation')
 | 
					                priority = self._get_priority('correlation')
 | 
				
			||||||
                self.task_queue.put((time.time(), priority, ('correlation', node_id, correlation_depth)))
 | 
					                self.task_queue.put((time.time(), priority, task_tuple))
 | 
				
			||||||
                correlation_tasks.append(task_tuple)
 | 
					
 | 
				
			||||||
                self.total_tasks_ever_enqueued += 1
 | 
					        # Give the queue a moment to populate
 | 
				
			||||||
        
 | 
					        time.sleep(0.1)
 | 
				
			||||||
        print(f"Enqueued {len(correlation_tasks)} correlation tasks")
 | 
					
 | 
				
			||||||
        
 | 
					        while not self.task_queue.empty():
 | 
				
			||||||
        # Process correlation tasks
 | 
					 | 
				
			||||||
        consecutive_empty_iterations = 0
 | 
					 | 
				
			||||||
        max_empty_iterations = 20  # Shorter timeout for correlation phase
 | 
					 | 
				
			||||||
        
 | 
					 | 
				
			||||||
        while not self._is_stop_requested() and correlation_tasks:
 | 
					 | 
				
			||||||
            queue_empty = self.task_queue.empty()
 | 
					 | 
				
			||||||
            with self.processing_lock:
 | 
					            with self.processing_lock:
 | 
				
			||||||
                no_active_processing = len(self.currently_processing) == 0
 | 
					                if len(self.currently_processing) >= self.max_workers:
 | 
				
			||||||
            
 | 
					                    time.sleep(0.1)
 | 
				
			||||||
            if queue_empty and no_active_processing:
 | 
					                    continue
 | 
				
			||||||
                consecutive_empty_iterations += 1
 | 
					 | 
				
			||||||
                if consecutive_empty_iterations >= max_empty_iterations:
 | 
					 | 
				
			||||||
                    break
 | 
					 | 
				
			||||||
                time.sleep(0.1)
 | 
					 | 
				
			||||||
                continue
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                consecutive_empty_iterations = 0
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                run_at, priority, (provider_name, target_item, depth) = self.task_queue.get(timeout=0.1)
 | 
					                _, priority, (provider_name, target_item, depth) = self.task_queue.get_nowait()
 | 
				
			||||||
                
 | 
					 | 
				
			||||||
                # Only process correlation tasks in this phase
 | 
					 | 
				
			||||||
                if provider_name != 'correlation':
 | 
					 | 
				
			||||||
                    continue
 | 
					 | 
				
			||||||
                    
 | 
					 | 
				
			||||||
            except:
 | 
					            except:
 | 
				
			||||||
                time.sleep(0.1)
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if provider_name != 'correlation':
 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            task_tuple = (provider_name, target_item, depth)
 | 
					            task_tuple = (provider_name, target_item, depth)
 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            # Skip if already processed
 | 
					 | 
				
			||||||
            if task_tuple in processed_tasks:
 | 
					            if task_tuple in processed_tasks:
 | 
				
			||||||
                self.tasks_skipped += 1
 | 
					 | 
				
			||||||
                self.indicators_completed += 1
 | 
					 | 
				
			||||||
                if task_tuple in correlation_tasks:
 | 
					 | 
				
			||||||
                    correlation_tasks.remove(task_tuple)
 | 
					 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            with self.processing_lock:
 | 
					            with self.processing_lock:
 | 
				
			||||||
                if self._is_stop_requested():
 | 
					 | 
				
			||||||
                    break
 | 
					 | 
				
			||||||
                processing_key = (provider_name, target_item)
 | 
					                processing_key = (provider_name, target_item)
 | 
				
			||||||
                if processing_key in self.currently_processing:
 | 
					                if processing_key in self.currently_processing:
 | 
				
			||||||
                    self.tasks_skipped += 1
 | 
					 | 
				
			||||||
                    self.indicators_completed += 1
 | 
					 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
                self.currently_processing.add(processing_key)
 | 
					                self.currently_processing.add(processing_key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                self.current_indicator = target_item
 | 
					                self.current_indicator = target_item
 | 
				
			||||||
                self._update_session_state()
 | 
					                _, _, success = self._process_provider_task(correlation_provider, target_item, depth)
 | 
				
			||||||
                
 | 
					 | 
				
			||||||
                if self._is_stop_requested():
 | 
					 | 
				
			||||||
                    break
 | 
					 | 
				
			||||||
                
 | 
					 | 
				
			||||||
                # Process correlation task
 | 
					 | 
				
			||||||
                new_targets, _, success = self._process_provider_task(correlation_provider, target_item, depth)
 | 
					 | 
				
			||||||
                
 | 
					 | 
				
			||||||
                if success:
 | 
					                if success:
 | 
				
			||||||
                    processed_tasks.add(task_tuple)
 | 
					                    processed_tasks.add(task_tuple)
 | 
				
			||||||
                    self.indicators_completed += 1
 | 
					 | 
				
			||||||
                    if task_tuple in correlation_tasks:
 | 
					 | 
				
			||||||
                        correlation_tasks.remove(task_tuple)
 | 
					 | 
				
			||||||
                else:
 | 
					 | 
				
			||||||
                    # For correlations, don't retry - just mark as completed
 | 
					 | 
				
			||||||
                    self.indicators_completed += 1
 | 
					 | 
				
			||||||
                    if task_tuple in correlation_tasks:
 | 
					 | 
				
			||||||
                        correlation_tasks.remove(task_tuple)
 | 
					 | 
				
			||||||
                    
 | 
					 | 
				
			||||||
            finally:
 | 
					            finally:
 | 
				
			||||||
                with self.processing_lock:
 | 
					                with self.processing_lock:
 | 
				
			||||||
                    processing_key = (provider_name, target_item)
 | 
					 | 
				
			||||||
                    self.currently_processing.discard(processing_key)
 | 
					                    self.currently_processing.discard(processing_key)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        print(f"Correlation phase complete. Remaining tasks: {len(correlation_tasks)}")
 | 
					        print("Correlation phase complete.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _process_provider_task(self, provider: BaseProvider, target: str, depth: int) -> Tuple[Set[str], Set[str], bool]:
 | 
					    def _process_provider_task(self, provider: BaseProvider, target: str, depth: int) -> Tuple[Set[str], Set[str], bool]:
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        Manages the entire process for a given target and provider.
 | 
					        Manages the entire process for a given target and provider.
 | 
				
			||||||
        This version is generalized to handle all relationships dynamically.
 | 
					        This version is generalized to handle all relationships dynamically.
 | 
				
			||||||
        """
 | 
					        """
 | 
				
			||||||
        if self._is_stop_requested():
 | 
					        # This check is still useful here to prevent starting new work if a stop is requested during correlation
 | 
				
			||||||
 | 
					        if self._is_stop_requested() and provider.get_name() != 'correlation':
 | 
				
			||||||
            return set(), set(), False
 | 
					            return set(), set(), False
 | 
				
			||||||
    
 | 
					    
 | 
				
			||||||
        is_ip = _is_valid_ip(target)
 | 
					        is_ip = _is_valid_ip(target)
 | 
				
			||||||
@ -779,7 +727,7 @@ class Scanner:
 | 
				
			|||||||
    
 | 
					    
 | 
				
			||||||
            if provider_result is None:
 | 
					            if provider_result is None:
 | 
				
			||||||
                provider_successful = False
 | 
					                provider_successful = False
 | 
				
			||||||
            elif not self._is_stop_requested():
 | 
					            else:
 | 
				
			||||||
                # Pass all relationships to be processed
 | 
					                # Pass all relationships to be processed
 | 
				
			||||||
                discovered, is_large_entity = self._process_provider_result_unified(
 | 
					                discovered, is_large_entity = self._process_provider_result_unified(
 | 
				
			||||||
                    target, provider, provider_result, depth
 | 
					                    target, provider, provider_result, depth
 | 
				
			||||||
@ -799,7 +747,8 @@ class Scanner:
 | 
				
			|||||||
        provider_name = provider.get_name()
 | 
					        provider_name = provider.get_name()
 | 
				
			||||||
        start_time = datetime.now(timezone.utc)
 | 
					        start_time = datetime.now(timezone.utc)
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        if self._is_stop_requested():
 | 
					        # This check is still useful here to prevent starting new work if a stop is requested during correlation
 | 
				
			||||||
 | 
					        if self._is_stop_requested() and provider.get_name() != 'correlation':
 | 
				
			||||||
            return None
 | 
					            return None
 | 
				
			||||||
        
 | 
					        
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
@ -808,9 +757,6 @@ class Scanner:
 | 
				
			|||||||
            else:
 | 
					            else:
 | 
				
			||||||
                result = provider.query_domain(target)
 | 
					                result = provider.query_domain(target)
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
            if self._is_stop_requested():
 | 
					 | 
				
			||||||
                return None
 | 
					 | 
				
			||||||
            
 | 
					 | 
				
			||||||
            relationship_count = result.get_relationship_count() if result else 0
 | 
					            relationship_count = result.get_relationship_count() if result else 0
 | 
				
			||||||
            self._update_provider_state(target, provider_name, 'success', relationship_count, None, start_time)
 | 
					            self._update_provider_state(target, provider_name, 'success', relationship_count, None, start_time)
 | 
				
			||||||
            
 | 
					            
 | 
				
			||||||
@ -902,9 +848,6 @@ class Scanner:
 | 
				
			|||||||
        large_entity_id = ""
 | 
					        large_entity_id = ""
 | 
				
			||||||
        large_entity_members = set()
 | 
					        large_entity_members = set()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if self._is_stop_requested():
 | 
					 | 
				
			||||||
            return discovered_targets, False
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        eligible_rel_count = sum(
 | 
					        eligible_rel_count = sum(
 | 
				
			||||||
            1 for rel in provider_result.relationships if _is_valid_domain(rel.target_node) or _is_valid_ip(rel.target_node)
 | 
					            1 for rel in provider_result.relationships if _is_valid_domain(rel.target_node) or _is_valid_ip(rel.target_node)
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
@ -916,9 +859,6 @@ class Scanner:
 | 
				
			|||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for i, relationship in enumerate(provider_result.relationships):
 | 
					        for i, relationship in enumerate(provider_result.relationships):
 | 
				
			||||||
            if i % 5 == 0 and self._is_stop_requested():
 | 
					 | 
				
			||||||
                break
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            source_node_id = relationship.source_node
 | 
					            source_node_id = relationship.source_node
 | 
				
			||||||
            target_node_id = relationship.target_node
 | 
					            target_node_id = relationship.target_node
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user