great progress
This commit is contained in:
parent
926f9e1096
commit
39ce0e9d11
@ -203,8 +203,6 @@ class ForensicLogger:
|
||||
self.session_metadata['target_domains'] = list(self.session_metadata['target_domains'])
|
||||
|
||||
self.logger.info(f"Scan Complete - Session: {self.session_id}")
|
||||
self.logger.info(f"Total API Requests: {self.session_metadata['total_requests']}")
|
||||
self.logger.info(f"Total Relationships: {self.session_metadata['total_relationships']}")
|
||||
|
||||
def export_audit_trail(self) -> Dict[str, Any]:
|
||||
"""
|
||||
|
359
core/scanner.py
359
core/scanner.py
@ -54,6 +54,10 @@ class Scanner:
|
||||
self.target_retries = defaultdict(int)
|
||||
self.scan_failed_due_to_retries = False
|
||||
|
||||
# **NEW**: Track currently processing tasks to prevent processing after stop
|
||||
self.currently_processing = set()
|
||||
self.processing_lock = threading.Lock()
|
||||
|
||||
# Scanning progress tracking
|
||||
self.total_indicators_found = 0
|
||||
self.indicators_processed = 0
|
||||
@ -124,7 +128,8 @@ class Scanner:
|
||||
unpicklable_attrs = [
|
||||
'stop_event',
|
||||
'scan_thread',
|
||||
'executor'
|
||||
'executor',
|
||||
'processing_lock' # **NEW**: Exclude the processing lock
|
||||
]
|
||||
|
||||
for attr in unpicklable_attrs:
|
||||
@ -148,6 +153,11 @@ class Scanner:
|
||||
self.stop_event = threading.Event()
|
||||
self.scan_thread = None
|
||||
self.executor = None
|
||||
self.processing_lock = threading.Lock() # **NEW**: Recreate processing lock
|
||||
|
||||
# **NEW**: Reset processing tracking
|
||||
if not hasattr(self, 'currently_processing'):
|
||||
self.currently_processing = set()
|
||||
|
||||
# Re-set stop events for providers
|
||||
if hasattr(self, 'providers'):
|
||||
@ -195,30 +205,59 @@ class Scanner:
|
||||
print("Session configuration updated")
|
||||
|
||||
def start_scan(self, target_domain: str, max_depth: int = 2, clear_graph: bool = True) -> bool:
|
||||
"""Start a new reconnaissance scan with immediate GUI feedback."""
|
||||
"""Start a new reconnaissance scan with proper cleanup of previous scans."""
|
||||
print(f"=== STARTING SCAN IN SCANNER {id(self)} ===")
|
||||
print(f"Session ID: {self.session_id}")
|
||||
print(f"Initial scanner status: {self.status}")
|
||||
|
||||
# Clean up previous scan thread if needed
|
||||
# **IMPROVED**: More aggressive cleanup of previous scan
|
||||
if self.scan_thread and self.scan_thread.is_alive():
|
||||
print("A previous scan thread is still alive. Sending termination signal and waiting...")
|
||||
self.stop_scan()
|
||||
# Wait for the thread to die, with a timeout
|
||||
self.scan_thread.join(10.0)
|
||||
print("A previous scan thread is still alive. Forcing termination...")
|
||||
|
||||
# Set stop signals immediately
|
||||
self._set_stop_signal()
|
||||
self.status = ScanStatus.STOPPED
|
||||
|
||||
# Clear all processing state
|
||||
with self.processing_lock:
|
||||
self.currently_processing.clear()
|
||||
self.task_queue.clear()
|
||||
|
||||
# Shutdown executor aggressively
|
||||
if self.executor:
|
||||
print("Shutting down executor forcefully...")
|
||||
self.executor.shutdown(wait=False, cancel_futures=True)
|
||||
self.executor = None
|
||||
|
||||
# Wait for thread termination with shorter timeout
|
||||
print("Waiting for previous scan thread to terminate...")
|
||||
self.scan_thread.join(5.0) # Reduced from 10 seconds
|
||||
|
||||
if self.scan_thread.is_alive():
|
||||
print("ERROR: The previous scan thread is unresponsive and could not be stopped.")
|
||||
self.status = ScanStatus.FAILED
|
||||
self._update_session_state()
|
||||
return False
|
||||
print("Previous scan thread terminated successfully.")
|
||||
print("WARNING: Previous scan thread is still alive after 5 seconds")
|
||||
# Continue anyway, but log the issue
|
||||
self.logger.logger.warning("Previous scan thread failed to terminate cleanly")
|
||||
|
||||
# Reset state for new scan
|
||||
# Reset state for new scan with proper forensic logging
|
||||
print("Resetting scanner state for new scan...")
|
||||
self.status = ScanStatus.IDLE
|
||||
# This update is crucial for the UI to reflect the reset before the new scan starts.
|
||||
self.stop_event.clear()
|
||||
|
||||
# **NEW**: Clear Redis stop signal explicitly
|
||||
if self.session_id:
|
||||
from core.session_manager import session_manager
|
||||
session_manager.clear_stop_signal(self.session_id)
|
||||
|
||||
with self.processing_lock:
|
||||
self.currently_processing.clear()
|
||||
|
||||
self.task_queue.clear()
|
||||
self.target_retries.clear()
|
||||
self.scan_failed_due_to_retries = False
|
||||
|
||||
# Update session state immediately for GUI feedback
|
||||
self._update_session_state()
|
||||
print("Scanner state is now clean for a new scan.")
|
||||
print("Scanner state reset complete.")
|
||||
|
||||
try:
|
||||
if not hasattr(self, 'providers') or not self.providers:
|
||||
@ -233,19 +272,11 @@ class Scanner:
|
||||
self.max_depth = max_depth
|
||||
self.current_depth = 0
|
||||
|
||||
# Clear both local and Redis stop signals for the new scan
|
||||
self.stop_event.clear()
|
||||
if self.session_id:
|
||||
from core.session_manager import session_manager
|
||||
session_manager.clear_stop_signal(self.session_id)
|
||||
|
||||
self.total_indicators_found = 0
|
||||
self.indicators_processed = 0
|
||||
self.indicators_completed = 0
|
||||
self.tasks_re_enqueued = 0
|
||||
self.current_indicator = self.current_target
|
||||
self.target_retries = defaultdict(int)
|
||||
self.scan_failed_due_to_retries = False
|
||||
|
||||
# Update GUI with scan preparation state
|
||||
self._update_session_state()
|
||||
@ -270,17 +301,16 @@ class Scanner:
|
||||
print(f"ERROR: Exception in start_scan for scanner {id(self)}: {e}")
|
||||
traceback.print_exc()
|
||||
self.status = ScanStatus.FAILED
|
||||
# Update GUI with failed status immediately
|
||||
self._update_session_state()
|
||||
return False
|
||||
|
||||
def _execute_scan(self, target_domain: str, max_depth: int) -> None:
|
||||
"""Execute the reconnaissance scan using a task queue-based approach."""
|
||||
"""Execute the reconnaissance scan with proper termination handling."""
|
||||
print(f"_execute_scan started for {target_domain} with depth {max_depth}")
|
||||
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
|
||||
processed_targets = set()
|
||||
|
||||
self.task_queue.append((target_domain, 0, False)) # target, depth, is_large_entity_member
|
||||
self.task_queue.append((target_domain, 0, False))
|
||||
|
||||
try:
|
||||
self.status = ScanStatus.RUNNING
|
||||
@ -291,46 +321,80 @@ class Scanner:
|
||||
self.graph.add_node(target_domain, NodeType.DOMAIN)
|
||||
self._initialize_provider_states(target_domain)
|
||||
|
||||
while self.task_queue:
|
||||
if self._is_stop_requested():
|
||||
print("Stop requested, terminating scan.")
|
||||
# **IMPROVED**: Better termination checking in main loop
|
||||
while self.task_queue and not self._is_stop_requested():
|
||||
try:
|
||||
target, depth, is_large_entity_member = self.task_queue.popleft()
|
||||
except IndexError:
|
||||
# Queue became empty during processing
|
||||
break
|
||||
|
||||
target, depth, is_large_entity_member = self.task_queue.popleft()
|
||||
|
||||
if target in processed_targets:
|
||||
continue
|
||||
|
||||
if depth > max_depth:
|
||||
continue
|
||||
|
||||
self.current_depth = depth
|
||||
self.current_indicator = target
|
||||
self._update_session_state()
|
||||
# **NEW**: Track this target as currently processing
|
||||
with self.processing_lock:
|
||||
if self._is_stop_requested():
|
||||
print(f"Stop requested before processing {target}")
|
||||
break
|
||||
self.currently_processing.add(target)
|
||||
|
||||
new_targets, large_entity_members, success = self._query_providers_for_target(target, depth, is_large_entity_member)
|
||||
if not success:
|
||||
self.target_retries[target] += 1
|
||||
if self.target_retries[target] <= self.config.max_retries_per_target:
|
||||
print(f"Re-queueing target {target} (attempt {self.target_retries[target]})")
|
||||
self.task_queue.append((target, depth, is_large_entity_member))
|
||||
self.tasks_re_enqueued += 1
|
||||
try:
|
||||
self.current_depth = depth
|
||||
self.current_indicator = target
|
||||
self._update_session_state()
|
||||
|
||||
# **IMPROVED**: More frequent stop checking during processing
|
||||
if self._is_stop_requested():
|
||||
print(f"Stop requested during processing setup for {target}")
|
||||
break
|
||||
|
||||
new_targets, large_entity_members, success = self._query_providers_for_target(target, depth, is_large_entity_member)
|
||||
|
||||
# **NEW**: Check stop signal after provider queries
|
||||
if self._is_stop_requested():
|
||||
print(f"Stop requested after querying providers for {target}")
|
||||
break
|
||||
|
||||
if not success:
|
||||
self.target_retries[target] += 1
|
||||
if self.target_retries[target] <= self.config.max_retries_per_target:
|
||||
print(f"Re-queueing target {target} (attempt {self.target_retries[target]})")
|
||||
self.task_queue.append((target, depth, is_large_entity_member))
|
||||
self.tasks_re_enqueued += 1
|
||||
else:
|
||||
print(f"ERROR: Max retries exceeded for target {target}")
|
||||
self.scan_failed_due_to_retries = True
|
||||
self._log_target_processing_error(target, "Max retries exceeded")
|
||||
else:
|
||||
print(f"ERROR: Max retries exceeded for target {target}")
|
||||
self.scan_failed_due_to_retries = True
|
||||
self._log_target_processing_error(target, "Max retries exceeded")
|
||||
else:
|
||||
processed_targets.add(target)
|
||||
self.indicators_completed += 1
|
||||
processed_targets.add(target)
|
||||
self.indicators_completed += 1
|
||||
|
||||
for new_target in new_targets:
|
||||
if new_target not in processed_targets:
|
||||
self.task_queue.append((new_target, depth + 1, False))
|
||||
# **NEW**: Only add new targets if not stopped
|
||||
if not self._is_stop_requested():
|
||||
for new_target in new_targets:
|
||||
if new_target not in processed_targets:
|
||||
self.task_queue.append((new_target, depth + 1, False))
|
||||
|
||||
for member in large_entity_members:
|
||||
if member not in processed_targets:
|
||||
self.task_queue.append((member, depth, True))
|
||||
for member in large_entity_members:
|
||||
if member not in processed_targets:
|
||||
self.task_queue.append((member, depth, True))
|
||||
|
||||
finally:
|
||||
# **NEW**: Always remove from processing set
|
||||
with self.processing_lock:
|
||||
self.currently_processing.discard(target)
|
||||
|
||||
# **NEW**: Log termination reason
|
||||
if self._is_stop_requested():
|
||||
print("Scan terminated due to stop request")
|
||||
self.logger.logger.info("Scan terminated by user request")
|
||||
elif not self.task_queue:
|
||||
print("Scan completed - no more targets to process")
|
||||
self.logger.logger.info("Scan completed - all targets processed")
|
||||
|
||||
except Exception as e:
|
||||
print(f"ERROR: Scan execution failed with error: {e}")
|
||||
@ -338,6 +402,10 @@ class Scanner:
|
||||
self.status = ScanStatus.FAILED
|
||||
self.logger.logger.error(f"Scan failed: {e}")
|
||||
finally:
|
||||
# **NEW**: Clear processing state on exit
|
||||
with self.processing_lock:
|
||||
self.currently_processing.clear()
|
||||
|
||||
if self._is_stop_requested():
|
||||
self.status = ScanStatus.STOPPED
|
||||
elif self.scan_failed_due_to_retries:
|
||||
@ -349,6 +417,7 @@ class Scanner:
|
||||
self.logger.log_scan_complete()
|
||||
if self.executor:
|
||||
self.executor.shutdown(wait=False, cancel_futures=True)
|
||||
self.executor = None
|
||||
stats = self.graph.get_statistics()
|
||||
print("Final scan statistics:")
|
||||
print(f" - Total nodes: {stats['basic_metrics']['total_nodes']}")
|
||||
@ -356,15 +425,16 @@ class Scanner:
|
||||
print(f" - Targets processed: {len(processed_targets)}")
|
||||
|
||||
def _query_providers_for_target(self, target: str, depth: int, dns_only: bool = False) -> Tuple[Set[str], Set[str], bool]:
|
||||
"""Helper method to query providers for a single target."""
|
||||
is_ip = _is_valid_ip(target)
|
||||
target_type = NodeType.IP if is_ip else NodeType.DOMAIN
|
||||
print(f"Querying providers for {target_type.value}: {target} at depth {depth}")
|
||||
|
||||
"""Query providers for a single target with enhanced stop checking."""
|
||||
# **NEW**: Early termination check
|
||||
if self._is_stop_requested():
|
||||
print(f"Stop requested before querying providers for {target}")
|
||||
return set(), set(), False
|
||||
|
||||
is_ip = _is_valid_ip(target)
|
||||
target_type = NodeType.IP if is_ip else NodeType.DOMAIN
|
||||
print(f"Querying providers for {target_type.value}: {target} at depth {depth}")
|
||||
|
||||
self.graph.add_node(target, target_type)
|
||||
self._initialize_provider_states(target)
|
||||
|
||||
@ -379,9 +449,10 @@ class Scanner:
|
||||
self._log_no_eligible_providers(target, is_ip)
|
||||
return new_targets, large_entity_members, True
|
||||
|
||||
for provider in eligible_providers:
|
||||
# **IMPROVED**: Check stop signal before each provider
|
||||
for i, provider in enumerate(eligible_providers):
|
||||
if self._is_stop_requested():
|
||||
print(f"Stop requested while querying providers for {target}")
|
||||
print(f"Stop requested while querying provider {i+1}/{len(eligible_providers)} for {target}")
|
||||
all_providers_successful = False
|
||||
break
|
||||
|
||||
@ -397,18 +468,66 @@ class Scanner:
|
||||
large_entity_members.update(discovered)
|
||||
else:
|
||||
new_targets.update(discovered)
|
||||
else:
|
||||
print(f"Stop requested after processing results from {provider.get_name()}")
|
||||
break
|
||||
except Exception as e:
|
||||
all_providers_successful = False
|
||||
self._log_provider_error(target, provider.get_name(), str(e))
|
||||
|
||||
for node_id, attributes in node_attributes.items():
|
||||
if self.graph.graph.has_node(node_id):
|
||||
node_is_ip = _is_valid_ip(node_id)
|
||||
node_type_to_add = NodeType.IP if node_is_ip else NodeType.DOMAIN
|
||||
self.graph.add_node(node_id, node_type_to_add, attributes=attributes)
|
||||
# **NEW**: Only update node attributes if not stopped
|
||||
if not self._is_stop_requested():
|
||||
for node_id, attributes in node_attributes.items():
|
||||
if self.graph.graph.has_node(node_id):
|
||||
node_is_ip = _is_valid_ip(node_id)
|
||||
node_type_to_add = NodeType.IP if node_is_ip else NodeType.DOMAIN
|
||||
self.graph.add_node(node_id, node_type_to_add, attributes=attributes)
|
||||
|
||||
return new_targets, large_entity_members, all_providers_successful
|
||||
|
||||
def stop_scan(self) -> bool:
|
||||
"""Request immediate scan termination with proper cleanup."""
|
||||
try:
|
||||
print("=== INITIATING IMMEDIATE SCAN TERMINATION ===")
|
||||
self.logger.logger.info("Scan termination requested by user")
|
||||
|
||||
# **IMPROVED**: More aggressive stop signal setting
|
||||
self._set_stop_signal()
|
||||
self.status = ScanStatus.STOPPED
|
||||
|
||||
# **NEW**: Clear processing state immediately
|
||||
with self.processing_lock:
|
||||
currently_processing_copy = self.currently_processing.copy()
|
||||
self.currently_processing.clear()
|
||||
print(f"Cleared {len(currently_processing_copy)} currently processing targets: {currently_processing_copy}")
|
||||
|
||||
# **IMPROVED**: Clear task queue and log what was discarded
|
||||
discarded_tasks = list(self.task_queue)
|
||||
self.task_queue.clear()
|
||||
print(f"Discarded {len(discarded_tasks)} pending tasks")
|
||||
|
||||
# **IMPROVED**: Aggressively shut down executor
|
||||
if self.executor:
|
||||
print("Shutting down executor with immediate cancellation...")
|
||||
try:
|
||||
# Cancel all pending futures
|
||||
self.executor.shutdown(wait=False, cancel_futures=True)
|
||||
print("Executor shutdown completed")
|
||||
except Exception as e:
|
||||
print(f"Error during executor shutdown: {e}")
|
||||
|
||||
# Immediately update GUI with stopped status
|
||||
self._update_session_state()
|
||||
|
||||
print("Termination signals sent. The scan will stop as soon as possible.")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"ERROR: Exception in stop_scan: {e}")
|
||||
self.logger.logger.error(f"Error during scan termination: {e}")
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
def _update_session_state(self) -> None:
|
||||
"""
|
||||
Update the scanner state in Redis for GUI updates.
|
||||
@ -423,6 +542,49 @@ class Scanner:
|
||||
except Exception as e:
|
||||
print(f"ERROR: Failed to update session state: {e}")
|
||||
|
||||
def get_scan_status(self) -> Dict[str, Any]:
|
||||
"""Get current scan status with processing information."""
|
||||
try:
|
||||
with self.processing_lock:
|
||||
currently_processing_count = len(self.currently_processing)
|
||||
currently_processing_list = list(self.currently_processing)
|
||||
|
||||
return {
|
||||
'status': self.status,
|
||||
'target_domain': self.current_target,
|
||||
'current_depth': self.current_depth,
|
||||
'max_depth': self.max_depth,
|
||||
'current_indicator': self.current_indicator,
|
||||
'indicators_processed': self.indicators_processed,
|
||||
'indicators_completed': self.indicators_completed,
|
||||
'tasks_re_enqueued': self.tasks_re_enqueued,
|
||||
'progress_percentage': self._calculate_progress(),
|
||||
'enabled_providers': [provider.get_name() for provider in self.providers],
|
||||
'graph_statistics': self.graph.get_statistics(),
|
||||
'task_queue_size': len(self.task_queue),
|
||||
'currently_processing_count': currently_processing_count, # **NEW**
|
||||
'currently_processing': currently_processing_list[:5] # **NEW**: Show first 5 for debugging
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"ERROR: Exception in get_scan_status: {e}")
|
||||
traceback.print_exc()
|
||||
return {
|
||||
'status': 'error',
|
||||
'target_domain': None,
|
||||
'current_depth': 0,
|
||||
'max_depth': 0,
|
||||
'current_indicator': '',
|
||||
'indicators_processed': 0,
|
||||
'indicators_completed': 0,
|
||||
'tasks_re_enqueued': 0,
|
||||
'progress_percentage': 0.0,
|
||||
'enabled_providers': [],
|
||||
'graph_statistics': {},
|
||||
'task_queue_size': 0,
|
||||
'currently_processing_count': 0,
|
||||
'currently_processing': []
|
||||
}
|
||||
|
||||
def _initialize_provider_states(self, target: str) -> None:
|
||||
"""Initialize provider states for forensic tracking."""
|
||||
if not self.graph.graph.has_node(target):
|
||||
@ -534,7 +696,7 @@ class Scanner:
|
||||
return members, True
|
||||
|
||||
for i, (source, rel_target, rel_type, confidence, raw_data) in enumerate(results):
|
||||
if i % 10 == 0 and self._is_stop_requested():
|
||||
if i % 5 == 0 and self._is_stop_requested(): # Check more frequently
|
||||
print(f"Stop requested while processing results from {provider_name} for {target}")
|
||||
break
|
||||
|
||||
@ -588,7 +750,6 @@ class Scanner:
|
||||
|
||||
return discovered_targets, False
|
||||
|
||||
|
||||
def _create_large_entity(self, source: str, provider_name: str, results: List, current_depth: int) -> Set[str]:
|
||||
"""Create a large entity node and returns the members for DNS processing."""
|
||||
entity_id = f"large_entity_{provider_name}_{hash(source) & 0x7FFFFFFF}"
|
||||
@ -672,7 +833,6 @@ class Scanner:
|
||||
if target not in attributes[record_type_name]:
|
||||
attributes[record_type_name].append(target)
|
||||
|
||||
|
||||
def _log_target_processing_error(self, target: str, error: str) -> None:
|
||||
"""Log target processing errors for forensic trail."""
|
||||
self.logger.logger.error(f"Target processing failed for {target}: {error}")
|
||||
@ -686,69 +846,6 @@ class Scanner:
|
||||
target_type = 'IP' if is_ip else 'domain'
|
||||
self.logger.logger.warning(f"No eligible providers for {target_type}: {target}")
|
||||
|
||||
def stop_scan(self) -> bool:
|
||||
"""Request immediate scan termination with immediate GUI feedback."""
|
||||
try:
|
||||
print("=== INITIATING IMMEDIATE SCAN TERMINATION ===")
|
||||
self.logger.logger.info("Scan termination requested by user")
|
||||
|
||||
# Set both local and Redis stop signals
|
||||
self._set_stop_signal()
|
||||
self.status = ScanStatus.STOPPED
|
||||
self.task_queue.clear()
|
||||
|
||||
# Immediately update GUI with stopped status
|
||||
self._update_session_state()
|
||||
|
||||
# Aggressively shut down the executor and cancel all pending tasks
|
||||
if self.executor:
|
||||
print("Shutting down executor with immediate cancellation...")
|
||||
self.executor.shutdown(wait=False, cancel_futures=True)
|
||||
|
||||
print("Termination signals sent. The scan will stop as soon as possible.")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"ERROR: Exception in stop_scan: {e}")
|
||||
self.logger.logger.error(f"Error during scan termination: {e}")
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
def get_scan_status(self) -> Dict[str, Any]:
|
||||
"""Get current scan status with forensic information."""
|
||||
try:
|
||||
return {
|
||||
'status': self.status,
|
||||
'target_domain': self.current_target,
|
||||
'current_depth': self.current_depth,
|
||||
'max_depth': self.max_depth,
|
||||
'current_indicator': self.current_indicator,
|
||||
'indicators_processed': self.indicators_processed,
|
||||
'indicators_completed': self.indicators_completed,
|
||||
'tasks_re_enqueued': self.tasks_re_enqueued,
|
||||
'progress_percentage': self._calculate_progress(),
|
||||
'enabled_providers': [provider.get_name() for provider in self.providers],
|
||||
'graph_statistics': self.graph.get_statistics(),
|
||||
'task_queue_size': len(self.task_queue)
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"ERROR: Exception in get_scan_status: {e}")
|
||||
traceback.print_exc()
|
||||
return {
|
||||
'status': 'error',
|
||||
'target_domain': None,
|
||||
'current_depth': 0,
|
||||
'max_depth': 0,
|
||||
'current_indicator': '',
|
||||
'indicators_processed': 0,
|
||||
'indicators_completed': 0,
|
||||
'tasks_re_enqueued': 0,
|
||||
'progress_percentage': 0.0,
|
||||
'enabled_providers': [],
|
||||
'graph_statistics': {},
|
||||
'task_queue_size': 0
|
||||
}
|
||||
|
||||
def _calculate_progress(self) -> float:
|
||||
"""Calculate scan progress percentage based on task completion."""
|
||||
total_tasks = self.indicators_completed + len(self.task_queue)
|
||||
|
Loading…
x
Reference in New Issue
Block a user