From 39ce0e9d1130a90af03e34693b370e7684d4c030 Mon Sep 17 00:00:00 2001 From: overcuriousity Date: Sun, 14 Sep 2025 19:12:12 +0200 Subject: [PATCH] great progress --- core/logger.py | 2 - core/scanner.py | 371 ++++++++++++++++++++++++++++++------------------ 2 files changed, 234 insertions(+), 139 deletions(-) diff --git a/core/logger.py b/core/logger.py index a916853..fcbffeb 100644 --- a/core/logger.py +++ b/core/logger.py @@ -203,8 +203,6 @@ class ForensicLogger: self.session_metadata['target_domains'] = list(self.session_metadata['target_domains']) self.logger.info(f"Scan Complete - Session: {self.session_id}") - self.logger.info(f"Total API Requests: {self.session_metadata['total_requests']}") - self.logger.info(f"Total Relationships: {self.session_metadata['total_relationships']}") def export_audit_trail(self) -> Dict[str, Any]: """ diff --git a/core/scanner.py b/core/scanner.py index f033629..9afab06 100644 --- a/core/scanner.py +++ b/core/scanner.py @@ -54,6 +54,10 @@ class Scanner: self.target_retries = defaultdict(int) self.scan_failed_due_to_retries = False + # **NEW**: Track currently processing tasks to prevent processing after stop + self.currently_processing = set() + self.processing_lock = threading.Lock() + # Scanning progress tracking self.total_indicators_found = 0 self.indicators_processed = 0 @@ -124,7 +128,8 @@ class Scanner: unpicklable_attrs = [ 'stop_event', 'scan_thread', - 'executor' + 'executor', + 'processing_lock' # **NEW**: Exclude the processing lock ] for attr in unpicklable_attrs: @@ -148,6 +153,11 @@ class Scanner: self.stop_event = threading.Event() self.scan_thread = None self.executor = None + self.processing_lock = threading.Lock() # **NEW**: Recreate processing lock + + # **NEW**: Reset processing tracking + if not hasattr(self, 'currently_processing'): + self.currently_processing = set() # Re-set stop events for providers if hasattr(self, 'providers'): @@ -195,30 +205,59 @@ class Scanner: print("Session configuration updated") def start_scan(self, target_domain: str, max_depth: int = 2, clear_graph: bool = True) -> bool: - """Start a new reconnaissance scan with immediate GUI feedback.""" + """Start a new reconnaissance scan with proper cleanup of previous scans.""" print(f"=== STARTING SCAN IN SCANNER {id(self)} ===") print(f"Session ID: {self.session_id}") print(f"Initial scanner status: {self.status}") - # Clean up previous scan thread if needed + # **IMPROVED**: More aggressive cleanup of previous scan if self.scan_thread and self.scan_thread.is_alive(): - print("A previous scan thread is still alive. Sending termination signal and waiting...") - self.stop_scan() - # Wait for the thread to die, with a timeout - self.scan_thread.join(10.0) - + print("A previous scan thread is still alive. Forcing termination...") + + # Set stop signals immediately + self._set_stop_signal() + self.status = ScanStatus.STOPPED + + # Clear all processing state + with self.processing_lock: + self.currently_processing.clear() + self.task_queue.clear() + + # Shutdown executor aggressively + if self.executor: + print("Shutting down executor forcefully...") + self.executor.shutdown(wait=False, cancel_futures=True) + self.executor = None + + # Wait for thread termination with shorter timeout + print("Waiting for previous scan thread to terminate...") + self.scan_thread.join(5.0) # Reduced from 10 seconds + if self.scan_thread.is_alive(): - print("ERROR: The previous scan thread is unresponsive and could not be stopped.") - self.status = ScanStatus.FAILED - self._update_session_state() - return False - print("Previous scan thread terminated successfully.") + print("WARNING: Previous scan thread is still alive after 5 seconds") + # Continue anyway, but log the issue + self.logger.logger.warning("Previous scan thread failed to terminate cleanly") - # Reset state for new scan + # Reset state for new scan with proper forensic logging + print("Resetting scanner state for new scan...") self.status = ScanStatus.IDLE - # This update is crucial for the UI to reflect the reset before the new scan starts. - self._update_session_state() - print("Scanner state is now clean for a new scan.") + self.stop_event.clear() + + # **NEW**: Clear Redis stop signal explicitly + if self.session_id: + from core.session_manager import session_manager + session_manager.clear_stop_signal(self.session_id) + + with self.processing_lock: + self.currently_processing.clear() + + self.task_queue.clear() + self.target_retries.clear() + self.scan_failed_due_to_retries = False + + # Update session state immediately for GUI feedback + self._update_session_state() + print("Scanner state reset complete.") try: if not hasattr(self, 'providers') or not self.providers: @@ -233,19 +272,11 @@ class Scanner: self.max_depth = max_depth self.current_depth = 0 - # Clear both local and Redis stop signals for the new scan - self.stop_event.clear() - if self.session_id: - from core.session_manager import session_manager - session_manager.clear_stop_signal(self.session_id) - self.total_indicators_found = 0 self.indicators_processed = 0 self.indicators_completed = 0 self.tasks_re_enqueued = 0 self.current_indicator = self.current_target - self.target_retries = defaultdict(int) - self.scan_failed_due_to_retries = False # Update GUI with scan preparation state self._update_session_state() @@ -270,17 +301,16 @@ class Scanner: print(f"ERROR: Exception in start_scan for scanner {id(self)}: {e}") traceback.print_exc() self.status = ScanStatus.FAILED - # Update GUI with failed status immediately self._update_session_state() return False def _execute_scan(self, target_domain: str, max_depth: int) -> None: - """Execute the reconnaissance scan using a task queue-based approach.""" + """Execute the reconnaissance scan with proper termination handling.""" print(f"_execute_scan started for {target_domain} with depth {max_depth}") self.executor = ThreadPoolExecutor(max_workers=self.max_workers) processed_targets = set() - self.task_queue.append((target_domain, 0, False)) # target, depth, is_large_entity_member + self.task_queue.append((target_domain, 0, False)) try: self.status = ScanStatus.RUNNING @@ -291,46 +321,80 @@ class Scanner: self.graph.add_node(target_domain, NodeType.DOMAIN) self._initialize_provider_states(target_domain) - while self.task_queue: - if self._is_stop_requested(): - print("Stop requested, terminating scan.") + # **IMPROVED**: Better termination checking in main loop + while self.task_queue and not self._is_stop_requested(): + try: + target, depth, is_large_entity_member = self.task_queue.popleft() + except IndexError: + # Queue became empty during processing break - target, depth, is_large_entity_member = self.task_queue.popleft() - if target in processed_targets: continue if depth > max_depth: continue - self.current_depth = depth - self.current_indicator = target - self._update_session_state() - - new_targets, large_entity_members, success = self._query_providers_for_target(target, depth, is_large_entity_member) - if not success: - self.target_retries[target] += 1 - if self.target_retries[target] <= self.config.max_retries_per_target: - print(f"Re-queueing target {target} (attempt {self.target_retries[target]})") - self.task_queue.append((target, depth, is_large_entity_member)) - self.tasks_re_enqueued += 1 - else: - print(f"ERROR: Max retries exceeded for target {target}") - self.scan_failed_due_to_retries = True - self._log_target_processing_error(target, "Max retries exceeded") - else: - processed_targets.add(target) - self.indicators_completed += 1 - - for new_target in new_targets: - if new_target not in processed_targets: - self.task_queue.append((new_target, depth + 1, False)) - - for member in large_entity_members: - if member not in processed_targets: - self.task_queue.append((member, depth, True)) + # **NEW**: Track this target as currently processing + with self.processing_lock: + if self._is_stop_requested(): + print(f"Stop requested before processing {target}") + break + self.currently_processing.add(target) + try: + self.current_depth = depth + self.current_indicator = target + self._update_session_state() + + # **IMPROVED**: More frequent stop checking during processing + if self._is_stop_requested(): + print(f"Stop requested during processing setup for {target}") + break + + new_targets, large_entity_members, success = self._query_providers_for_target(target, depth, is_large_entity_member) + + # **NEW**: Check stop signal after provider queries + if self._is_stop_requested(): + print(f"Stop requested after querying providers for {target}") + break + + if not success: + self.target_retries[target] += 1 + if self.target_retries[target] <= self.config.max_retries_per_target: + print(f"Re-queueing target {target} (attempt {self.target_retries[target]})") + self.task_queue.append((target, depth, is_large_entity_member)) + self.tasks_re_enqueued += 1 + else: + print(f"ERROR: Max retries exceeded for target {target}") + self.scan_failed_due_to_retries = True + self._log_target_processing_error(target, "Max retries exceeded") + else: + processed_targets.add(target) + self.indicators_completed += 1 + + # **NEW**: Only add new targets if not stopped + if not self._is_stop_requested(): + for new_target in new_targets: + if new_target not in processed_targets: + self.task_queue.append((new_target, depth + 1, False)) + + for member in large_entity_members: + if member not in processed_targets: + self.task_queue.append((member, depth, True)) + + finally: + # **NEW**: Always remove from processing set + with self.processing_lock: + self.currently_processing.discard(target) + + # **NEW**: Log termination reason + if self._is_stop_requested(): + print("Scan terminated due to stop request") + self.logger.logger.info("Scan terminated by user request") + elif not self.task_queue: + print("Scan completed - no more targets to process") + self.logger.logger.info("Scan completed - all targets processed") except Exception as e: print(f"ERROR: Scan execution failed with error: {e}") @@ -338,6 +402,10 @@ class Scanner: self.status = ScanStatus.FAILED self.logger.logger.error(f"Scan failed: {e}") finally: + # **NEW**: Clear processing state on exit + with self.processing_lock: + self.currently_processing.clear() + if self._is_stop_requested(): self.status = ScanStatus.STOPPED elif self.scan_failed_due_to_retries: @@ -349,6 +417,7 @@ class Scanner: self.logger.log_scan_complete() if self.executor: self.executor.shutdown(wait=False, cancel_futures=True) + self.executor = None stats = self.graph.get_statistics() print("Final scan statistics:") print(f" - Total nodes: {stats['basic_metrics']['total_nodes']}") @@ -356,15 +425,16 @@ class Scanner: print(f" - Targets processed: {len(processed_targets)}") def _query_providers_for_target(self, target: str, depth: int, dns_only: bool = False) -> Tuple[Set[str], Set[str], bool]: - """Helper method to query providers for a single target.""" - is_ip = _is_valid_ip(target) - target_type = NodeType.IP if is_ip else NodeType.DOMAIN - print(f"Querying providers for {target_type.value}: {target} at depth {depth}") - + """Query providers for a single target with enhanced stop checking.""" + # **NEW**: Early termination check if self._is_stop_requested(): print(f"Stop requested before querying providers for {target}") return set(), set(), False + is_ip = _is_valid_ip(target) + target_type = NodeType.IP if is_ip else NodeType.DOMAIN + print(f"Querying providers for {target_type.value}: {target} at depth {depth}") + self.graph.add_node(target, target_type) self._initialize_provider_states(target) @@ -379,9 +449,10 @@ class Scanner: self._log_no_eligible_providers(target, is_ip) return new_targets, large_entity_members, True - for provider in eligible_providers: + # **IMPROVED**: Check stop signal before each provider + for i, provider in enumerate(eligible_providers): if self._is_stop_requested(): - print(f"Stop requested while querying providers for {target}") + print(f"Stop requested while querying provider {i+1}/{len(eligible_providers)} for {target}") all_providers_successful = False break @@ -397,18 +468,66 @@ class Scanner: large_entity_members.update(discovered) else: new_targets.update(discovered) + else: + print(f"Stop requested after processing results from {provider.get_name()}") + break except Exception as e: all_providers_successful = False self._log_provider_error(target, provider.get_name(), str(e)) - for node_id, attributes in node_attributes.items(): - if self.graph.graph.has_node(node_id): - node_is_ip = _is_valid_ip(node_id) - node_type_to_add = NodeType.IP if node_is_ip else NodeType.DOMAIN - self.graph.add_node(node_id, node_type_to_add, attributes=attributes) + # **NEW**: Only update node attributes if not stopped + if not self._is_stop_requested(): + for node_id, attributes in node_attributes.items(): + if self.graph.graph.has_node(node_id): + node_is_ip = _is_valid_ip(node_id) + node_type_to_add = NodeType.IP if node_is_ip else NodeType.DOMAIN + self.graph.add_node(node_id, node_type_to_add, attributes=attributes) return new_targets, large_entity_members, all_providers_successful + def stop_scan(self) -> bool: + """Request immediate scan termination with proper cleanup.""" + try: + print("=== INITIATING IMMEDIATE SCAN TERMINATION ===") + self.logger.logger.info("Scan termination requested by user") + + # **IMPROVED**: More aggressive stop signal setting + self._set_stop_signal() + self.status = ScanStatus.STOPPED + + # **NEW**: Clear processing state immediately + with self.processing_lock: + currently_processing_copy = self.currently_processing.copy() + self.currently_processing.clear() + print(f"Cleared {len(currently_processing_copy)} currently processing targets: {currently_processing_copy}") + + # **IMPROVED**: Clear task queue and log what was discarded + discarded_tasks = list(self.task_queue) + self.task_queue.clear() + print(f"Discarded {len(discarded_tasks)} pending tasks") + + # **IMPROVED**: Aggressively shut down executor + if self.executor: + print("Shutting down executor with immediate cancellation...") + try: + # Cancel all pending futures + self.executor.shutdown(wait=False, cancel_futures=True) + print("Executor shutdown completed") + except Exception as e: + print(f"Error during executor shutdown: {e}") + + # Immediately update GUI with stopped status + self._update_session_state() + + print("Termination signals sent. The scan will stop as soon as possible.") + return True + + except Exception as e: + print(f"ERROR: Exception in stop_scan: {e}") + self.logger.logger.error(f"Error during scan termination: {e}") + traceback.print_exc() + return False + def _update_session_state(self) -> None: """ Update the scanner state in Redis for GUI updates. @@ -423,6 +542,49 @@ class Scanner: except Exception as e: print(f"ERROR: Failed to update session state: {e}") + def get_scan_status(self) -> Dict[str, Any]: + """Get current scan status with processing information.""" + try: + with self.processing_lock: + currently_processing_count = len(self.currently_processing) + currently_processing_list = list(self.currently_processing) + + return { + 'status': self.status, + 'target_domain': self.current_target, + 'current_depth': self.current_depth, + 'max_depth': self.max_depth, + 'current_indicator': self.current_indicator, + 'indicators_processed': self.indicators_processed, + 'indicators_completed': self.indicators_completed, + 'tasks_re_enqueued': self.tasks_re_enqueued, + 'progress_percentage': self._calculate_progress(), + 'enabled_providers': [provider.get_name() for provider in self.providers], + 'graph_statistics': self.graph.get_statistics(), + 'task_queue_size': len(self.task_queue), + 'currently_processing_count': currently_processing_count, # **NEW** + 'currently_processing': currently_processing_list[:5] # **NEW**: Show first 5 for debugging + } + except Exception as e: + print(f"ERROR: Exception in get_scan_status: {e}") + traceback.print_exc() + return { + 'status': 'error', + 'target_domain': None, + 'current_depth': 0, + 'max_depth': 0, + 'current_indicator': '', + 'indicators_processed': 0, + 'indicators_completed': 0, + 'tasks_re_enqueued': 0, + 'progress_percentage': 0.0, + 'enabled_providers': [], + 'graph_statistics': {}, + 'task_queue_size': 0, + 'currently_processing_count': 0, + 'currently_processing': [] + } + def _initialize_provider_states(self, target: str) -> None: """Initialize provider states for forensic tracking.""" if not self.graph.graph.has_node(target): @@ -534,7 +696,7 @@ class Scanner: return members, True for i, (source, rel_target, rel_type, confidence, raw_data) in enumerate(results): - if i % 10 == 0 and self._is_stop_requested(): + if i % 5 == 0 and self._is_stop_requested(): # Check more frequently print(f"Stop requested while processing results from {provider_name} for {target}") break @@ -588,7 +750,6 @@ class Scanner: return discovered_targets, False - def _create_large_entity(self, source: str, provider_name: str, results: List, current_depth: int) -> Set[str]: """Create a large entity node and returns the members for DNS processing.""" entity_id = f"large_entity_{provider_name}_{hash(source) & 0x7FFFFFFF}" @@ -672,7 +833,6 @@ class Scanner: if target not in attributes[record_type_name]: attributes[record_type_name].append(target) - def _log_target_processing_error(self, target: str, error: str) -> None: """Log target processing errors for forensic trail.""" self.logger.logger.error(f"Target processing failed for {target}: {error}") @@ -686,69 +846,6 @@ class Scanner: target_type = 'IP' if is_ip else 'domain' self.logger.logger.warning(f"No eligible providers for {target_type}: {target}") - def stop_scan(self) -> bool: - """Request immediate scan termination with immediate GUI feedback.""" - try: - print("=== INITIATING IMMEDIATE SCAN TERMINATION ===") - self.logger.logger.info("Scan termination requested by user") - - # Set both local and Redis stop signals - self._set_stop_signal() - self.status = ScanStatus.STOPPED - self.task_queue.clear() - - # Immediately update GUI with stopped status - self._update_session_state() - - # Aggressively shut down the executor and cancel all pending tasks - if self.executor: - print("Shutting down executor with immediate cancellation...") - self.executor.shutdown(wait=False, cancel_futures=True) - - print("Termination signals sent. The scan will stop as soon as possible.") - return True - - except Exception as e: - print(f"ERROR: Exception in stop_scan: {e}") - self.logger.logger.error(f"Error during scan termination: {e}") - traceback.print_exc() - return False - - def get_scan_status(self) -> Dict[str, Any]: - """Get current scan status with forensic information.""" - try: - return { - 'status': self.status, - 'target_domain': self.current_target, - 'current_depth': self.current_depth, - 'max_depth': self.max_depth, - 'current_indicator': self.current_indicator, - 'indicators_processed': self.indicators_processed, - 'indicators_completed': self.indicators_completed, - 'tasks_re_enqueued': self.tasks_re_enqueued, - 'progress_percentage': self._calculate_progress(), - 'enabled_providers': [provider.get_name() for provider in self.providers], - 'graph_statistics': self.graph.get_statistics(), - 'task_queue_size': len(self.task_queue) - } - except Exception as e: - print(f"ERROR: Exception in get_scan_status: {e}") - traceback.print_exc() - return { - 'status': 'error', - 'target_domain': None, - 'current_depth': 0, - 'max_depth': 0, - 'current_indicator': '', - 'indicators_processed': 0, - 'indicators_completed': 0, - 'tasks_re_enqueued': 0, - 'progress_percentage': 0.0, - 'enabled_providers': [], - 'graph_statistics': {}, - 'task_queue_size': 0 - } - def _calculate_progress(self) -> float: """Calculate scan progress percentage based on task completion.""" total_tasks = self.indicators_completed + len(self.task_queue)