extraction feature works
This commit is contained in:
parent
2c48316477
commit
ecc143ddbb
@ -1,4 +1,4 @@
|
|||||||
# dnsrecon/core/scanner.py
|
# dnsrecon-reduced/core/scanner.py
|
||||||
|
|
||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
@ -162,12 +162,12 @@ class Scanner:
|
|||||||
self.stop_event = threading.Event()
|
self.stop_event = threading.Event()
|
||||||
self.scan_thread = None
|
self.scan_thread = None
|
||||||
self.executor = None
|
self.executor = None
|
||||||
self.processing_lock = threading.Lock()
|
self.processing_lock = threading.Lock() # **NEW**: Recreate processing lock
|
||||||
self.task_queue = PriorityQueue()
|
self.task_queue = PriorityQueue()
|
||||||
self.rate_limiter = GlobalRateLimiter(redis.StrictRedis(db=0))
|
self.rate_limiter = GlobalRateLimiter(redis.StrictRedis(db=0))
|
||||||
self.logger = get_forensic_logger()
|
self.logger = get_forensic_logger()
|
||||||
|
|
||||||
# This ensures the scanner has access to providers for actions like node extraction.
|
# Re-initialize providers after unpickling from session storage
|
||||||
print("Re-initializing providers after loading session...")
|
print("Re-initializing providers after loading session...")
|
||||||
self._initialize_providers()
|
self._initialize_providers()
|
||||||
|
|
||||||
@ -348,7 +348,7 @@ class Scanner:
|
|||||||
for provider in initial_providers:
|
for provider in initial_providers:
|
||||||
provider_name = provider.get_name()
|
provider_name = provider.get_name()
|
||||||
self.task_queue.put((self._get_priority(provider_name), (provider_name, target, 0)))
|
self.task_queue.put((self._get_priority(provider_name), (provider_name, target, 0)))
|
||||||
self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE
|
self.total_tasks_ever_enqueued += 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.status = ScanStatus.RUNNING
|
self.status = ScanStatus.RUNNING
|
||||||
@ -412,7 +412,7 @@ class Scanner:
|
|||||||
print(f"Re-queueing task {task_tuple} (attempt {self.target_retries[task_tuple]})")
|
print(f"Re-queueing task {task_tuple} (attempt {self.target_retries[task_tuple]})")
|
||||||
self.task_queue.put((priority, (provider_name, target_item, depth)))
|
self.task_queue.put((priority, (provider_name, target_item, depth)))
|
||||||
self.tasks_re_enqueued += 1
|
self.tasks_re_enqueued += 1
|
||||||
self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE
|
self.total_tasks_ever_enqueued += 1
|
||||||
else:
|
else:
|
||||||
print(f"ERROR: Max retries exceeded for task {task_tuple}")
|
print(f"ERROR: Max retries exceeded for task {task_tuple}")
|
||||||
self.scan_failed_due_to_retries = True
|
self.scan_failed_due_to_retries = True
|
||||||
@ -431,7 +431,7 @@ class Scanner:
|
|||||||
if (p_name_new, new_target) not in processed_tasks:
|
if (p_name_new, new_target) not in processed_tasks:
|
||||||
new_depth = depth + 1 if new_target in new_targets else depth
|
new_depth = depth + 1 if new_target in new_targets else depth
|
||||||
self.task_queue.put((self._get_priority(p_name_new), (p_name_new, new_target, new_depth)))
|
self.task_queue.put((self._get_priority(p_name_new), (p_name_new, new_target, new_depth)))
|
||||||
self.total_tasks_ever_enqueued += 1 # <<< FIX: INCREMENT HERE
|
self.total_tasks_ever_enqueued += 1
|
||||||
finally:
|
finally:
|
||||||
with self.processing_lock:
|
with self.processing_lock:
|
||||||
self.currently_processing.discard(target_item)
|
self.currently_processing.discard(target_item)
|
||||||
@ -806,7 +806,6 @@ class Scanner:
|
|||||||
'source_provider': provider_name,
|
'source_provider': provider_name,
|
||||||
'discovery_depth': current_depth,
|
'discovery_depth': current_depth,
|
||||||
'threshold_exceeded': self.config.large_entity_threshold,
|
'threshold_exceeded': self.config.large_entity_threshold,
|
||||||
# <<< FIX: Removed 'raw_results'. It's inefficient and unnecessary.
|
|
||||||
}
|
}
|
||||||
description = f'Large entity created due to {len(targets)} results from {provider_name}'
|
description = f'Large entity created due to {len(targets)} results from {provider_name}'
|
||||||
|
|
||||||
@ -821,25 +820,47 @@ class Scanner:
|
|||||||
print(f"Created large entity {entity_id} for {len(targets)} {node_type}s from {provider_name}")
|
print(f"Created large entity {entity_id} for {len(targets)} {node_type}s from {provider_name}")
|
||||||
|
|
||||||
return set(targets)
|
return set(targets)
|
||||||
|
|
||||||
def extract_node_from_large_entity(self, large_entity_id: str, node_id_to_extract: str) -> bool:
|
def extract_node_from_large_entity(self, large_entity_id: str, node_id_to_extract: str) -> bool:
|
||||||
"""
|
"""
|
||||||
Extracts a node from a large entity by re-adding it to the main processing queue.
|
Extracts a node from a large entity, re-creates its original edge, and
|
||||||
This is a much cleaner approach than storing and replaying raw results.
|
re-queues it for full scanning.
|
||||||
"""
|
"""
|
||||||
if not self.graph.graph.has_node(large_entity_id):
|
if not self.graph.graph.has_node(large_entity_id):
|
||||||
print(f"ERROR: Large entity {large_entity_id} not found.")
|
print(f"ERROR: Large entity {large_entity_id} not found.")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# 1. Modify the graph data structure first
|
# 1. Get the original source node that discovered the large entity
|
||||||
# This removes the node from the container's internal list.
|
predecessors = list(self.graph.graph.predecessors(large_entity_id))
|
||||||
|
if not predecessors:
|
||||||
|
print(f"ERROR: No source node found for large entity {large_entity_id}.")
|
||||||
|
return False
|
||||||
|
source_node_id = predecessors[0]
|
||||||
|
|
||||||
|
# Get the original edge data to replicate it for the extracted node
|
||||||
|
original_edge_data = self.graph.graph.get_edge_data(source_node_id, large_entity_id)
|
||||||
|
if not original_edge_data:
|
||||||
|
print(f"ERROR: Could not find original edge data from {source_node_id} to {large_entity_id}.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 2. Modify the graph data structure first
|
||||||
success = self.graph.extract_node_from_large_entity(large_entity_id, node_id_to_extract)
|
success = self.graph.extract_node_from_large_entity(large_entity_id, node_id_to_extract)
|
||||||
if not success:
|
if not success:
|
||||||
print(f"ERROR: Node {node_id_to_extract} could not be removed from {large_entity_id}'s attributes.")
|
print(f"ERROR: Node {node_id_to_extract} could not be removed from {large_entity_id}'s attributes.")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# 2. Re-queue the extracted node for full processing by all eligible providers.
|
# 3. Create the direct edge from the original source to the newly extracted node
|
||||||
# This is the same logic used for any newly discovered node.
|
print(f"Re-creating direct edge from {source_node_id} to extracted node {node_id_to_extract}")
|
||||||
|
self.graph.add_edge(
|
||||||
|
source_id=source_node_id,
|
||||||
|
target_id=node_id_to_extract,
|
||||||
|
relationship_type=original_edge_data.get('relationship_type', 'extracted_from_large_entity'),
|
||||||
|
confidence_score=original_edge_data.get('confidence_score', 0.85), # Slightly lower confidence
|
||||||
|
source_provider=original_edge_data.get('source_provider', 'unknown'),
|
||||||
|
raw_data={'context': f'Extracted from large entity {large_entity_id}'}
|
||||||
|
)
|
||||||
|
|
||||||
|
# 4. Re-queue the extracted node for full processing by all eligible providers
|
||||||
print(f"Re-queueing extracted node {node_id_to_extract} for full reconnaissance...")
|
print(f"Re-queueing extracted node {node_id_to_extract} for full reconnaissance...")
|
||||||
is_ip = _is_valid_ip(node_id_to_extract)
|
is_ip = _is_valid_ip(node_id_to_extract)
|
||||||
current_depth = self.graph.graph.nodes[large_entity_id].get('attributes', {}).get('discovery_depth', 0)
|
current_depth = self.graph.graph.nodes[large_entity_id].get('attributes', {}).get('discovery_depth', 0)
|
||||||
@ -847,21 +868,19 @@ class Scanner:
|
|||||||
eligible_providers = self._get_eligible_providers(node_id_to_extract, is_ip, False)
|
eligible_providers = self._get_eligible_providers(node_id_to_extract, is_ip, False)
|
||||||
for provider in eligible_providers:
|
for provider in eligible_providers:
|
||||||
provider_name = provider.get_name()
|
provider_name = provider.get_name()
|
||||||
# Add the task to the main queue with the correct depth.
|
|
||||||
self.task_queue.put((self._get_priority(provider_name), (provider_name, node_id_to_extract, current_depth)))
|
self.task_queue.put((self._get_priority(provider_name), (provider_name, node_id_to_extract, current_depth)))
|
||||||
self.total_tasks_ever_enqueued += 1
|
self.total_tasks_ever_enqueued += 1
|
||||||
|
|
||||||
# 3. If the scanner is not running, we need to kickstart it to process this one item.
|
# 5. If the scanner is not running, we need to kickstart it to process this one item.
|
||||||
if self.status != ScanStatus.RUNNING:
|
if self.status != ScanStatus.RUNNING:
|
||||||
print("Scanner is idle. Starting a mini-scan to process the extracted node.")
|
print("Scanner is idle. Starting a mini-scan to process the extracted node.")
|
||||||
self.status = ScanStatus.RUNNING
|
self.status = ScanStatus.RUNNING
|
||||||
self._update_session_state()
|
self._update_session_state()
|
||||||
|
|
||||||
# Start a new thread for the scan execution if one isn't running
|
|
||||||
if not self.scan_thread or not self.scan_thread.is_alive():
|
if not self.scan_thread or not self.scan_thread.is_alive():
|
||||||
self.scan_thread = threading.Thread(
|
self.scan_thread = threading.Thread(
|
||||||
target=self._execute_scan,
|
target=self._execute_scan,
|
||||||
args=(self.current_target, self.max_depth), # Use existing target/depth
|
args=(self.current_target, self.max_depth),
|
||||||
daemon=True
|
daemon=True
|
||||||
)
|
)
|
||||||
self.scan_thread.start()
|
self.scan_thread.start()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user