successfully implemented scheduler
This commit is contained in:
@@ -65,6 +65,7 @@ class Scanner:
|
||||
self.indicators_processed = 0
|
||||
self.indicators_completed = 0
|
||||
self.tasks_re_enqueued = 0
|
||||
self.total_tasks_ever_enqueued = 0
|
||||
self.current_indicator = ""
|
||||
|
||||
# Concurrent processing configuration
|
||||
@@ -220,6 +221,7 @@ class Scanner:
|
||||
print(f"=== STARTING SCAN IN SCANNER {id(self)} ===")
|
||||
print(f"Session ID: {self.session_id}")
|
||||
print(f"Initial scanner status: {self.status}")
|
||||
self.total_tasks_ever_enqueued = 0
|
||||
|
||||
# **IMPROVED**: More aggressive cleanup of previous scan
|
||||
if self.scan_thread and self.scan_thread.is_alive():
|
||||
@@ -343,6 +345,7 @@ class Scanner:
|
||||
for provider in initial_providers:
|
||||
provider_name = provider.get_name()
|
||||
self.task_queue.put((self._get_priority(provider_name), (provider_name, target, 0)))
|
||||
self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE
|
||||
|
||||
try:
|
||||
self.status = ScanStatus.RUNNING
|
||||
@@ -406,6 +409,7 @@ class Scanner:
|
||||
print(f"Re-queueing task {task_tuple} (attempt {self.target_retries[task_tuple]})")
|
||||
self.task_queue.put((priority, (provider_name, target_item, depth)))
|
||||
self.tasks_re_enqueued += 1
|
||||
self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE
|
||||
else:
|
||||
print(f"ERROR: Max retries exceeded for task {task_tuple}")
|
||||
self.scan_failed_due_to_retries = True
|
||||
@@ -424,6 +428,7 @@ class Scanner:
|
||||
if (p_name_new, new_target) not in processed_tasks:
|
||||
new_depth = depth + 1 if new_target in new_targets else depth
|
||||
self.task_queue.put((self._get_priority(p_name_new), (p_name_new, new_target, new_depth)))
|
||||
self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE
|
||||
finally:
|
||||
with self.processing_lock:
|
||||
self.currently_processing.discard(target_item)
|
||||
@@ -582,11 +587,12 @@ class Scanner:
|
||||
'indicators_completed': self.indicators_completed,
|
||||
'tasks_re_enqueued': self.tasks_re_enqueued,
|
||||
'progress_percentage': self._calculate_progress(),
|
||||
'total_tasks_ever_enqueued': self.total_tasks_ever_enqueued,
|
||||
'enabled_providers': [provider.get_name() for provider in self.providers],
|
||||
'graph_statistics': self.graph.get_statistics(),
|
||||
'task_queue_size': self.task_queue.qsize(),
|
||||
'currently_processing_count': currently_processing_count, # **NEW**
|
||||
'currently_processing': currently_processing_list[:5] # **NEW**: Show first 5 for debugging
|
||||
'currently_processing_count': currently_processing_count,
|
||||
'currently_processing': currently_processing_list[:5]
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"ERROR: Exception in get_scan_status: {e}")
|
||||
@@ -871,10 +877,9 @@ class Scanner:
|
||||
|
||||
def _calculate_progress(self) -> float:
|
||||
"""Calculate scan progress percentage based on task completion."""
|
||||
total_tasks = self.indicators_completed + self.task_queue.qsize()
|
||||
if total_tasks == 0:
|
||||
if self.total_tasks_ever_enqueued == 0:
|
||||
return 0.0
|
||||
return min(100.0, (self.indicators_completed / total_tasks) * 100)
|
||||
return min(100.0, (self.indicators_completed / self.total_tasks_ever_enqueued) * 100)
|
||||
|
||||
def get_graph_data(self) -> Dict[str, Any]:
|
||||
"""Get current graph data for visualization."""
|
||||
|
||||
Reference in New Issue
Block a user