diff --git a/core/scanner.py b/core/scanner.py index b59059e..358fa48 100644 --- a/core/scanner.py +++ b/core/scanner.py @@ -1,4 +1,4 @@ -# dnsrecon/core/scanner.py +# dnsrecon-reduced/core/scanner.py import threading import traceback @@ -162,12 +162,12 @@ class Scanner: self.stop_event = threading.Event() self.scan_thread = None self.executor = None - self.processing_lock = threading.Lock() + self.processing_lock = threading.Lock() # **NEW**: Recreate processing lock self.task_queue = PriorityQueue() self.rate_limiter = GlobalRateLimiter(redis.StrictRedis(db=0)) self.logger = get_forensic_logger() - - # This ensures the scanner has access to providers for actions like node extraction. + + # Re-initialize providers after unpickling from session storage print("Re-initializing providers after loading session...") self._initialize_providers() @@ -348,7 +348,7 @@ class Scanner: for provider in initial_providers: provider_name = provider.get_name() self.task_queue.put((self._get_priority(provider_name), (provider_name, target, 0))) - self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE + self.total_tasks_ever_enqueued += 1 try: self.status = ScanStatus.RUNNING @@ -412,7 +412,7 @@ class Scanner: print(f"Re-queueing task {task_tuple} (attempt {self.target_retries[task_tuple]})") self.task_queue.put((priority, (provider_name, target_item, depth))) self.tasks_re_enqueued += 1 - self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE + self.total_tasks_ever_enqueued += 1 else: print(f"ERROR: Max retries exceeded for task {task_tuple}") self.scan_failed_due_to_retries = True @@ -431,7 +431,7 @@ class Scanner: if (p_name_new, new_target) not in processed_tasks: new_depth = depth + 1 if new_target in new_targets else depth self.task_queue.put((self._get_priority(p_name_new), (p_name_new, new_target, new_depth))) - self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE + self.total_tasks_ever_enqueued += 1 finally: with self.processing_lock: self.currently_processing.discard(target_item) @@ -806,7 +806,6 @@ class Scanner: 'source_provider': provider_name, 'discovery_depth': current_depth, 'threshold_exceeded': self.config.large_entity_threshold, - # <<< FIX: Removed 'raw_results'. It's inefficient and unnecessary. } description = f'Large entity created due to {len(targets)} results from {provider_name}' @@ -821,25 +820,47 @@ class Scanner: print(f"Created large entity {entity_id} for {len(targets)} {node_type}s from {provider_name}") return set(targets) - + def extract_node_from_large_entity(self, large_entity_id: str, node_id_to_extract: str) -> bool: """ - Extracts a node from a large entity by re-adding it to the main processing queue. - This is a much cleaner approach than storing and replaying raw results. + Extracts a node from a large entity, re-creates its original edge, and + re-queues it for full scanning. """ if not self.graph.graph.has_node(large_entity_id): print(f"ERROR: Large entity {large_entity_id} not found.") return False - # 1. Modify the graph data structure first - # This removes the node from the container's internal list. + # 1. Get the original source node that discovered the large entity + predecessors = list(self.graph.graph.predecessors(large_entity_id)) + if not predecessors: + print(f"ERROR: No source node found for large entity {large_entity_id}.") + return False + source_node_id = predecessors[0] + + # Get the original edge data to replicate it for the extracted node + original_edge_data = self.graph.graph.get_edge_data(source_node_id, large_entity_id) + if not original_edge_data: + print(f"ERROR: Could not find original edge data from {source_node_id} to {large_entity_id}.") + return False + + # 2. Modify the graph data structure first success = self.graph.extract_node_from_large_entity(large_entity_id, node_id_to_extract) if not success: print(f"ERROR: Node {node_id_to_extract} could not be removed from {large_entity_id}'s attributes.") return False - # 2. Re-queue the extracted node for full processing by all eligible providers. - # This is the same logic used for any newly discovered node. + # 3. Create the direct edge from the original source to the newly extracted node + print(f"Re-creating direct edge from {source_node_id} to extracted node {node_id_to_extract}") + self.graph.add_edge( + source_id=source_node_id, + target_id=node_id_to_extract, + relationship_type=original_edge_data.get('relationship_type', 'extracted_from_large_entity'), + confidence_score=original_edge_data.get('confidence_score', 0.85), # Slightly lower confidence + source_provider=original_edge_data.get('source_provider', 'unknown'), + raw_data={'context': f'Extracted from large entity {large_entity_id}'} + ) + + # 4. Re-queue the extracted node for full processing by all eligible providers print(f"Re-queueing extracted node {node_id_to_extract} for full reconnaissance...") is_ip = _is_valid_ip(node_id_to_extract) current_depth = self.graph.graph.nodes[large_entity_id].get('attributes', {}).get('discovery_depth', 0) @@ -847,21 +868,19 @@ class Scanner: eligible_providers = self._get_eligible_providers(node_id_to_extract, is_ip, False) for provider in eligible_providers: provider_name = provider.get_name() - # Add the task to the main queue with the correct depth. self.task_queue.put((self._get_priority(provider_name), (provider_name, node_id_to_extract, current_depth))) self.total_tasks_ever_enqueued += 1 - # 3. If the scanner is not running, we need to kickstart it to process this one item. + # 5. If the scanner is not running, we need to kickstart it to process this one item. if self.status != ScanStatus.RUNNING: print("Scanner is idle. Starting a mini-scan to process the extracted node.") self.status = ScanStatus.RUNNING self._update_session_state() - # Start a new thread for the scan execution if one isn't running if not self.scan_thread or not self.scan_thread.is_alive(): self.scan_thread = threading.Thread( target=self._execute_scan, - args=(self.current_target, self.max_depth), # Use existing target/depth + args=(self.current_target, self.max_depth), daemon=True ) self.scan_thread.start()