run correlation after stop

parent 1b0c630667
commit 71a05f5b32
@@ -628,8 +628,8 @@ class Scanner:
             else:
                 self.status = ScanStatus.COMPLETED
 
-        # Always run correlation at the end of a scan, regardless of status
-        print(f"\n=== Running Final Correlation Analysis ===")
-        self._run_correlation_phase(max_depth, processed_tasks)
+        if self.status in [ScanStatus.COMPLETED, ScanStatus.STOPPED]:
+            print(f"\n=== PHASE 2: Running correlation analysis ===")
+            self._run_correlation_phase(max_depth, processed_tasks)
 
         self.status_logger_stop_event.set()
@@ -657,60 +657,104 @@ class Scanner:
             print("No correlation provider found - skipping correlation phase")
             return
 
+        # Get all nodes from the graph for correlation analysis
         all_nodes = list(self.graph.graph.nodes())
+        correlation_tasks = []
 
         print(f"Enqueueing correlation tasks for {len(all_nodes)} nodes")
 
         for node_id in all_nodes:
-            task_tuple = ('correlation', node_id, 0)
+            # Determine appropriate depth for correlation (use 0 for simplicity)
+            correlation_depth = 0
+            task_tuple = ('correlation', node_id, correlation_depth)
 
+            # Don't re-process already processed correlation tasks
            if task_tuple not in processed_tasks:
                 priority = self._get_priority('correlation')
-                self.task_queue.put((time.time(), priority, task_tuple))
+                self.task_queue.put((time.time(), priority, ('correlation', node_id, correlation_depth)))
+                correlation_tasks.append(task_tuple)
+                self.total_tasks_ever_enqueued += 1
 
-        # Give the queue a moment to populate
-        time.sleep(0.1)
+        print(f"Enqueued {len(correlation_tasks)} correlation tasks")
 
-        while not self.task_queue.empty():
+        # Process correlation tasks
+        consecutive_empty_iterations = 0
+        max_empty_iterations = 20  # Shorter timeout for correlation phase
+
+        while correlation_tasks:
+            queue_empty = self.task_queue.empty()
             with self.processing_lock:
-                if len(self.currently_processing) >= self.max_workers:
-                    time.sleep(0.1)
-                    continue
+                no_active_processing = len(self.currently_processing) == 0
+
+            if queue_empty and no_active_processing:
+                consecutive_empty_iterations += 1
+                if consecutive_empty_iterations >= max_empty_iterations:
+                    break
+                time.sleep(0.1)
+                continue
+            else:
+                consecutive_empty_iterations = 0
 
             try:
-                _, priority, (provider_name, target_item, depth) = self.task_queue.get_nowait()
-            except:
-                continue
+                run_at, priority, (provider_name, target_item, depth) = self.task_queue.get(timeout=0.1)
 
-            if provider_name != 'correlation':
-                continue
+                # Only process correlation tasks in this phase
+                if provider_name != 'correlation':
+                    continue
+
+            except:
+                time.sleep(0.1)
+                continue
 
             task_tuple = (provider_name, target_item, depth)
 
+            # Skip if already processed
             if task_tuple in processed_tasks:
+                self.tasks_skipped += 1
+                self.indicators_completed += 1
+                if task_tuple in correlation_tasks:
+                    correlation_tasks.remove(task_tuple)
                 continue
 
             with self.processing_lock:
                 processing_key = (provider_name, target_item)
                 if processing_key in self.currently_processing:
+                    self.tasks_skipped += 1
+                    self.indicators_completed += 1
                     continue
                 self.currently_processing.add(processing_key)
 
             try:
                 self.current_indicator = target_item
-                _, _, success = self._process_provider_task(correlation_provider, target_item, depth)
+                self._update_session_state()
+
+                # Process correlation task
+                new_targets, _, success = self._process_provider_task(correlation_provider, target_item, depth)
 
                 if success:
                     processed_tasks.add(task_tuple)
+                    self.indicators_completed += 1
+                    if task_tuple in correlation_tasks:
+                        correlation_tasks.remove(task_tuple)
+                else:
+                    # For correlations, don't retry - just mark as completed
+                    self.indicators_completed += 1
+                    if task_tuple in correlation_tasks:
+                        correlation_tasks.remove(task_tuple)
 
             finally:
                 with self.processing_lock:
+                    processing_key = (provider_name, target_item)
                     self.currently_processing.discard(processing_key)
 
-        print("Correlation phase complete.")
+        print(f"Correlation phase complete. Remaining tasks: {len(correlation_tasks)}")
 
     def _process_provider_task(self, provider: BaseProvider, target: str, depth: int) -> Tuple[Set[str], Set[str], bool]:
         """
         Manages the entire process for a given target and provider.
         This version is generalized to handle all relationships dynamically.
         """
-        # This check is still useful here to prevent starting new work if a stop is requested during correlation
-        if self._is_stop_requested() and provider.get_name() != 'correlation':
+        if self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
             return set(), set(), False
 
         is_ip = _is_valid_ip(target)
@@ -727,7 +771,8 @@ class Scanner:
 
         if provider_result is None:
             provider_successful = False
-        else:
+        # Allow correlation provider to process results even if scan is stopped
+        elif not self._is_stop_requested() or isinstance(provider, CorrelationProvider):
             # Pass all relationships to be processed
             discovered, is_large_entity = self._process_provider_result_unified(
                 target, provider, provider_result, depth
@@ -747,8 +792,7 @@ class Scanner:
         provider_name = provider.get_name()
         start_time = datetime.now(timezone.utc)
 
-        # This check is still useful here to prevent starting new work if a stop is requested during correlation
-        if self._is_stop_requested() and provider.get_name() != 'correlation':
+        if self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
             return None
 
         try:
@@ -757,6 +801,9 @@ class Scanner:
             else:
                 result = provider.query_domain(target)
 
+            if self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
+                return None
+
             relationship_count = result.get_relationship_count() if result else 0
             self._update_provider_state(target, provider_name, 'success', relationship_count, None, start_time)
 
@@ -848,6 +895,10 @@ class Scanner:
         large_entity_id = ""
         large_entity_members = set()
 
+        # Stop processing for non-correlation providers if requested
+        if self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
+            return discovered_targets, False
+
         eligible_rel_count = sum(
             1 for rel in provider_result.relationships if _is_valid_domain(rel.target_node) or _is_valid_ip(rel.target_node)
         )
@@ -859,6 +910,10 @@ class Scanner:
         )
 
         for i, relationship in enumerate(provider_result.relationships):
+            # Stop processing for non-correlation providers if requested
+            if i % 5 == 0 and self._is_stop_requested() and not isinstance(provider, CorrelationProvider):
+                break
+
             source_node_id = relationship.source_node
             target_node_id = relationship.target_node
 
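Note on the new drain loop: the correlation phase above no longer spins on "while not self.task_queue.empty()"; it keeps polling until the queue has been empty and nothing is in flight for max_empty_iterations consecutive checks. The minimal sketch below shows that pattern in isolation. It is an illustration only, not the Scanner's code: names such as pending, in_flight, and the bare queue.Queue are stand-ins (the real queue carries (time, priority, task) tuples and the work is done by _process_provider_task).

import queue
import time

# Illustrative stand-ins; the real Scanner keeps these on self.
task_queue = queue.Queue()
pending = ["node-a", "node-b", "node-c"]   # like correlation_tasks
in_flight = set()                          # like currently_processing
for item in pending:
    task_queue.put(item)

consecutive_empty = 0
max_empty = 20                             # bounded wait, as in the diff

while pending:
    if task_queue.empty() and not in_flight:
        consecutive_empty += 1
        if consecutive_empty >= max_empty:
            break                          # nothing queued, nothing running: stop waiting
        time.sleep(0.01)
        continue
    consecutive_empty = 0

    try:
        item = task_queue.get(timeout=0.1)
    except queue.Empty:
        time.sleep(0.01)
        continue

    in_flight.add(item)
    try:
        print(f"processing {item}")        # the real loop calls the correlation provider here
        pending.remove(item)
    finally:
        in_flight.discard(item)

print(f"drain complete, {len(pending)} tasks left")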