Compare commits


No commits in common. "4c48917993b6b07838ee1fcc96ab251158b9c494" and "12f834bb6554c62ee2f68391a995a4764c437a22" have entirely different histories.

3 changed files with 130 additions and 336 deletions

View File

@@ -34,7 +34,7 @@ class Config:
         'crtsh': 5,
         'shodan': 60,
         'dns': 100,
-        'correlation': 0  # Set to 0 to make sure correlations run last
+        'correlation': 1000  # Set a high limit as it's a local operation
     }
     # --- Provider Settings ---

View File

@@ -258,36 +258,20 @@ class Scanner:
         time.sleep(2)

     def start_scan(self, target: str, max_depth: int = 2, clear_graph: bool = True, force_rescan_target: Optional[str] = None) -> bool:
-        """
-        Starts a new reconnaissance scan.
-        """
         if self.scan_thread and self.scan_thread.is_alive():
-            self.logger.logger.info("Stopping existing scan before starting new one")
             self._set_stop_signal()
             self.status = ScanStatus.STOPPED
-            # Clean up processing state
             with self.processing_lock:
                 self.currently_processing.clear()
                 self.currently_processing_display = []
-            # Clear task queue
-            while not self.task_queue.empty():
-                try:
-                    self.task_queue.get_nowait()
-                except:
-                    break
-            # Shutdown executor
+            self.task_queue = PriorityQueue()
             if self.executor:
-                try:
-                    self.executor.shutdown(wait=False, cancel_futures=True)
-                except:
-                    pass
-                finally:
-                    self.executor = None
-            # Wait for scan thread to finish (with timeout)
-            self.scan_thread.join(timeout=5.0)
-            if self.scan_thread.is_alive():
-                self.logger.logger.warning("Previous scan thread did not terminate cleanly")
+                self.executor.shutdown(wait=False, cancel_futures=True)
+                self.executor = None
+            self.scan_thread.join(5.0)

         self.status = ScanStatus.IDLE
         self.stop_event.clear()
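Aside: the right side replaces the left's drain-the-queue loop with a simple rebinding to a fresh PriorityQueue. A tiny standalone sketch of the two reset styles (illustrative data, not the repository's code):

    from queue import Empty, PriorityQueue

    q: PriorityQueue = PriorityQueue()
    q.put((1, 'stale-task'))

    # Left side: drain in place, keeping the same queue object.
    while not q.empty():
        try:
            q.get_nowait()
        except Empty:
            break

    # Right side: rebind to a fresh queue. Simpler, but any other thread
    # still holding the old reference keeps feeding the stale queue.
    q = PriorityQueue()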
@@ -310,40 +294,20 @@ class Scanner:
         try:
             if not hasattr(self, 'providers') or not self.providers:
-                self.logger.logger.error("No providers available for scanning")
                 return False
-            available_providers = [p for p in self.providers if p.is_available()]
-            if not available_providers:
-                self.logger.logger.error("No providers are currently available/configured")
-                return False

             if clear_graph:
                 self.graph.clear()
                 self.initial_targets.clear()

             if force_rescan_target and self.graph.graph.has_node(force_rescan_target):
-                try:
-                    node_data = self.graph.graph.nodes[force_rescan_target]
-                    if 'metadata' in node_data and 'provider_states' in node_data['metadata']:
-                        node_data['metadata']['provider_states'] = {}
-                    self.logger.logger.info(f"Cleared provider states for forced rescan of {force_rescan_target}")
-                except Exception as e:
-                    self.logger.logger.warning(f"Error clearing provider states for {force_rescan_target}: {e}")
+                node_data = self.graph.graph.nodes[force_rescan_target]
+                if 'metadata' in node_data and 'provider_states' in node_data['metadata']:
+                    node_data['metadata']['provider_states'] = {}

-            target = target.lower().strip()
-            if not target:
-                self.logger.logger.error("Empty target provided")
-                return False
-            from utils.helpers import is_valid_target
-            if not is_valid_target(target):
-                self.logger.logger.error(f"Invalid target format: {target}")
-                return False
-            self.current_target = target
+            self.current_target = target.lower().strip()
             self.initial_targets.add(self.current_target)
-            self.max_depth = max(1, min(5, max_depth))  # Clamp depth between 1-5
+            self.max_depth = max_depth
             self.current_depth = 0
             self.total_indicators_found = 0
@@ -356,77 +320,56 @@ class Scanner:
             self._update_session_state()
             self.logger = new_session()

-            try:
-                self.scan_thread = threading.Thread(
-                    target=self._execute_scan,
-                    args=(self.current_target, self.max_depth),
-                    daemon=True,
-                    name=f"ScanThread-{self.session_id or 'default'}"
-                )
-                self.scan_thread.start()
-
-                self.status_logger_stop_event.clear()
-                self.status_logger_thread = threading.Thread(
-                    target=self._status_logger_thread,
-                    daemon=True,
-                    name=f"StatusLogger-{self.session_id or 'default'}"
-                )
-                self.status_logger_thread.start()
-
-                self.logger.logger.info(f"Scan started successfully for {target} with depth {self.max_depth}")
-                return True
-            except Exception as e:
-                self.logger.logger.error(f"Error starting scan threads: {e}")
-                self.status = ScanStatus.FAILED
-                self._update_session_state()
-                return False
+            self.scan_thread = threading.Thread(
+                target=self._execute_scan,
+                args=(self.current_target, max_depth),
+                daemon=True
+            )
+            self.scan_thread.start()
+
+            self.status_logger_stop_event.clear()
+            self.status_logger_thread = threading.Thread(target=self._status_logger_thread, daemon=True)
+            self.status_logger_thread.start()
+
+            return True

         except Exception as e:
-            self.logger.logger.error(f"Error in scan startup: {e}")
             traceback.print_exc()
             self.status = ScanStatus.FAILED
             self._update_session_state()
             return False
     def _get_priority(self, provider_name):
-        if provider_name == 'correlation':
-            return 100  # Highest priority number = lowest priority (runs last)
         rate_limit = self.config.get_rate_limit(provider_name)
-        # Handle edge cases
-        if rate_limit <= 0:
-            return 90  # Very low priority for invalid/disabled providers
-        if provider_name == 'dns':
-            return 1  # DNS is fastest, should run first
-        elif provider_name == 'shodan':
-            return 3  # Shodan is medium speed, good priority
-        elif provider_name == 'crtsh':
-            return 5  # crt.sh is slower, lower priority
-        else:
-            # For any other providers, use rate limit as a guide
-            if rate_limit >= 100:
-                return 2  # High rate limit = high priority
-            elif rate_limit >= 50:
-                return 4  # Medium-high rate limit = medium-high priority
-            elif rate_limit >= 20:
-                return 6  # Medium rate limit = medium priority
-            elif rate_limit >= 5:
-                return 8  # Low rate limit = low priority
-            else:
-                return 10  # Very low rate limit = very low priority
+        # Define the logarithmic scale
+        if rate_limit < 10:
+            return 10  # Highest priority number (lowest priority) for very low rate limits
+
+        # Calculate logarithmic value and map to priority levels
+        # Lower rate limits get higher priority numbers (lower priority)
+        log_value = math.log10(rate_limit)
+        priority = 10 - int(log_value * 2)  # Scale factor to get more granular levels
+
+        # Ensure priority is within a reasonable range (1-10)
+        priority = max(1, min(10, priority))
+
+        return priority
     def _execute_scan(self, target: str, max_depth: int) -> None:
-        """
-        Execute the reconnaissance scan with a time-based, robust scheduler.
-        Handles rate-limiting via deferral and failures via exponential backoff.
-        """
         self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
-        processed_tasks = set()  # FIXED: Now includes depth to avoid incorrect skipping
+        processed_tasks = set()

         is_ip = _is_valid_ip(target)
         initial_providers = self._get_eligible_providers(target, is_ip, False)
         for provider in initial_providers:
             provider_name = provider.get_name()
             priority = self._get_priority(provider_name)
+            # OVERHAUL: Enqueue with current timestamp to run immediately
             self.task_queue.put((time.time(), priority, (provider_name, target, 0)))
             self.total_tasks_ever_enqueued += 1
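Note on the two priority schemes: the left side special-cases each provider by name, while the right side derives the priority purely from the configured rate limit on a logarithmic scale. A minimal standalone sketch of the right side's mapping (not the repository's code; the provider names and per-minute limits mirror the config hunk above):

    import math

    def priority_from_rate_limit(rate_limit: float) -> int:
        # Very low rate limits get the highest priority number (lowest priority).
        if rate_limit < 10:
            return 10
        # Higher rate limits yield lower numbers, i.e. they are scheduled sooner.
        priority = 10 - int(math.log10(rate_limit) * 2)
        return max(1, min(10, priority))

    for name, rpm in [('crtsh', 5), ('shodan', 60), ('dns', 100), ('correlation', 1000)]:
        print(f"{name}: {priority_from_rate_limit(rpm)}")
    # crtsh: 10, shodan: 7, dns: 6, correlation: 4

Under this mapping the correlation provider's new rate limit of 1000 gives it the best priority of the four, which appears to be why the config no longer needs the run-last sentinel of 0.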
@@ -440,156 +383,101 @@ class Scanner:
             node_type = NodeType.IP if is_ip else NodeType.DOMAIN
             self.graph.add_node(target, node_type)
             self._initialize_provider_states(target)

-            consecutive_empty_iterations = 0
-            max_empty_iterations = 50  # Allow 5 seconds of empty queue before considering completion

             while not self._is_stop_requested():
-                queue_empty = self.task_queue.empty()
-                with self.processing_lock:
-                    no_active_processing = len(self.currently_processing) == 0
-                if queue_empty and no_active_processing:
-                    consecutive_empty_iterations += 1
-                    if consecutive_empty_iterations >= max_empty_iterations:
-                        break  # Scan is complete
-                    time.sleep(0.1)
-                    continue
-                else:
-                    consecutive_empty_iterations = 0
-
-                # FIXED: Safe task retrieval without race conditions
+                if self.task_queue.empty() and not self.currently_processing:
+                    break  # Scan is complete

                 try:
-                    # Use timeout to avoid blocking indefinitely
-                    run_at, priority, (provider_name, target_item, depth) = self.task_queue.get(timeout=0.1)
-
-                    # FIXED: Check if task is ready to run
-                    current_time = time.time()
-                    if run_at > current_time:
-                        # Task is not ready yet, re-queue it and continue
-                        self.task_queue.put((run_at, priority, (provider_name, target_item, depth)))
-                        time.sleep(min(0.5, run_at - current_time))  # Sleep until closer to run time
+                    # OVERHAUL: Peek at the next task to see if it's ready to run
+                    next_run_at, _, _ = self.task_queue.queue[0]
+                    if next_run_at > time.time():
+                        time.sleep(0.1)  # Sleep to prevent busy-waiting for future tasks
                         continue
-                except:  # Queue is empty or timeout occurred
-                    time.sleep(0.1)
+
+                    # Task is ready, so get it from the queue
+                    run_at, priority, (provider_name, target_item, depth) = self.task_queue.get()
+                    self.last_task_from_queue = (run_at, priority, (provider_name, target_item, depth))
+                except IndexError:
+                    time.sleep(0.1)  # Queue is empty, but tasks might still be processing
                     continue

-                self.last_task_from_queue = (run_at, priority, (provider_name, target_item, depth))
-
-                # FIXED: Include depth in processed tasks to avoid incorrect skipping
-                task_tuple = (provider_name, target_item, depth)
+                task_tuple = (provider_name, target_item)
                 if task_tuple in processed_tasks:
                     self.tasks_skipped += 1
-                    self.indicators_completed += 1
+                    self.indicators_completed +=1
                     continue

-                # FIXED: Proper depth checking
                 if depth > max_depth:
-                    self.tasks_skipped += 1
-                    self.indicators_completed += 1
                     continue

-                # FIXED: Rate limiting with proper time-based deferral
+                # OVERHAUL: Handle rate limiting with time-based deferral
                 if self.rate_limiter.is_rate_limited(provider_name, self.config.get_rate_limit(provider_name), 60):
                     defer_until = time.time() + 60  # Defer for 60 seconds
                     self.task_queue.put((defer_until, priority, (provider_name, target_item, depth)))
                     self.tasks_re_enqueued += 1
                     continue

-                # FIXED: Thread-safe processing state management
                 with self.processing_lock:
-                    if self._is_stop_requested():
-                        break
-                    # Use provider+target (without depth) for duplicate processing check
-                    processing_key = (provider_name, target_item)
-                    if processing_key in self.currently_processing:
-                        # Already processing this provider+target combination, skip
-                        self.tasks_skipped += 1
-                        self.indicators_completed += 1
-                        continue
-                    self.currently_processing.add(processing_key)
+                    if self._is_stop_requested(): break
+                    self.currently_processing.add(task_tuple)

                 try:
                     self.current_depth = depth
                     self.current_indicator = target_item
                     self._update_session_state()

-                    if self._is_stop_requested():
-                        break
+                    if self._is_stop_requested(): break

                     provider = next((p for p in self.providers if p.get_name() == provider_name), None)
                     if provider:
                         new_targets, _, success = self._process_provider_task(provider, target_item, depth)

-                        if self._is_stop_requested():
-                            break
+                        if self._is_stop_requested(): break

                         if not success:
-                            # FIXED: Use depth-aware retry key
-                            retry_key = (provider_name, target_item, depth)
-                            self.target_retries[retry_key] += 1
-
-                            if self.target_retries[retry_key] <= self.config.max_retries_per_target:
-                                # FIXED: Exponential backoff with jitter for retries
-                                retry_count = self.target_retries[retry_key]
-                                backoff_delay = min(300, (2 ** retry_count) + random.uniform(0, 1))  # Cap at 5 minutes
+                            self.target_retries[task_tuple] += 1
+                            if self.target_retries[task_tuple] <= self.config.max_retries_per_target:
+                                # OVERHAUL: Exponential backoff for retries
+                                retry_count = self.target_retries[task_tuple]
+                                backoff_delay = (2 ** retry_count) + random.uniform(0, 1)  # Add jitter
                                 retry_at = time.time() + backoff_delay
                                 self.task_queue.put((retry_at, priority, (provider_name, target_item, depth)))
                                 self.tasks_re_enqueued += 1
-                                self.logger.logger.debug(f"Retrying {provider_name}:{target_item} in {backoff_delay:.1f}s (attempt {retry_count})")
                             else:
                                 self.scan_failed_due_to_retries = True
-                                self._log_target_processing_error(str(task_tuple), f"Max retries ({self.config.max_retries_per_target}) exceeded")
+                                self._log_target_processing_error(str(task_tuple), "Max retries exceeded")
                         else:
                             processed_tasks.add(task_tuple)
                             self.indicators_completed += 1

-                        # FIXED: Enqueue new targets with proper depth tracking
                         if not self._is_stop_requested():
                             for new_target in new_targets:
                                 is_ip_new = _is_valid_ip(new_target)
                                 eligible_providers_new = self._get_eligible_providers(new_target, is_ip_new, False)
                                 for p_new in eligible_providers_new:
                                     p_name_new = p_new.get_name()
-                                    new_depth = depth + 1  # Always increment depth for discovered targets
-                                    new_task_tuple = (p_name_new, new_target, new_depth)
-                                    # FIXED: Don't re-enqueue already processed tasks
-                                    if new_task_tuple not in processed_tasks and new_depth <= max_depth:
+                                    if (p_name_new, new_target) not in processed_tasks:
+                                        new_depth = depth + 1 if new_target in new_targets else depth
                                         new_priority = self._get_priority(p_name_new)
-                                        # Enqueue new tasks to run immediately
+                                        # OVERHAUL: Enqueue new tasks to run immediately
                                         self.task_queue.put((time.time(), new_priority, (p_name_new, new_target, new_depth)))
                                         self.total_tasks_ever_enqueued += 1
-                    else:
-                        self.logger.logger.warning(f"Provider {provider_name} not found in active providers")
-                        self.tasks_skipped += 1
-                        self.indicators_completed += 1
                 finally:
-                    # FIXED: Always clean up processing state
                     with self.processing_lock:
-                        processing_key = (provider_name, target_item)
-                        self.currently_processing.discard(processing_key)
+                        self.currently_processing.discard(task_tuple)

         except Exception as e:
             traceback.print_exc()
             self.status = ScanStatus.FAILED
             self.logger.logger.error(f"Scan failed: {e}")
         finally:
-            # FIXED: Comprehensive cleanup
             with self.processing_lock:
                 self.currently_processing.clear()
                 self.currently_processing_display = []
-
-            # FIXED: Clear any remaining tasks from queue to prevent memory leaks
-            while not self.task_queue.empty():
-                try:
-                    self.task_queue.get_nowait()
-                except:
-                    break

             if self._is_stop_requested():
                 self.status = ScanStatus.STOPPED
             elif self.scan_failed_due_to_retries:
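The right side's scheduler relies on peeking at the head of a PriorityQueue ordered by (run_at, priority, task) and deferring anything whose run_at is still in the future; failed tasks are re-enqueued with an exponential backoff plus jitter. A minimal self-contained sketch of that pattern (task names and the process() worker are illustrative; peeking at task_queue.queue[0] is only safe because a single thread consumes the queue, as in _execute_scan):

    import random
    import time
    from queue import PriorityQueue

    def process(task: str) -> bool:
        # Hypothetical stand-in for _process_provider_task.
        print('processing', task)
        return True

    task_queue: PriorityQueue = PriorityQueue()
    task_queue.put((time.time(), 5, 'task-a'))      # ready immediately
    task_queue.put((time.time() + 2, 1, 'task-b'))  # deferred, e.g. rate-limited

    retries = {}  # per-task retry counter
    while not task_queue.empty():
        next_run_at, _, _ = task_queue.queue[0]     # peek without removing
        if next_run_at > time.time():
            time.sleep(0.1)                         # sleep instead of busy-waiting
            continue
        run_at, priority, task = task_queue.get()
        if not process(task):
            retries[task] = retries.get(task, 0) + 1
            backoff = (2 ** retries[task]) + random.uniform(0, 1)  # backoff + jitter
            task_queue.put((time.time() + backoff, priority, task))

One trade-off visible in the hunk above: the left side caps the backoff at 300 seconds and keys retries by (provider, target, depth), while the right side leaves the backoff unbounded and retries per (provider, target) pair.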
@@ -598,19 +486,14 @@ class Scanner:
                 self.status = ScanStatus.COMPLETED

             self.status_logger_stop_event.set()
-            if self.status_logger_thread and self.status_logger_thread.is_alive():
-                self.status_logger_thread.join(timeout=2.0)  # Don't wait forever
+            if self.status_logger_thread:
+                self.status_logger_thread.join()

             self._update_session_state()
             self.logger.log_scan_complete()

             if self.executor:
-                try:
-                    self.executor.shutdown(wait=False, cancel_futures=True)
-                except Exception as e:
-                    self.logger.logger.warning(f"Error shutting down executor: {e}")
-                finally:
-                    self.executor = None
+                self.executor.shutdown(wait=False, cancel_futures=True)
+                self.executor = None

     def _process_provider_task(self, provider: BaseProvider, target: str, depth: int) -> Tuple[Set[str], Set[str], bool]:
         """
@@ -697,7 +580,30 @@ class Scanner:
         if self._is_stop_requested():
             return discovered_targets, False

+        # Process all attributes first, grouping by target node
+        attributes_by_node = defaultdict(list)
+        for attribute in provider_result.attributes:
+            attr_dict = {
+                "name": attribute.name,
+                "value": attribute.value,
+                "type": attribute.type,
+                "provider": attribute.provider,
+                "confidence": attribute.confidence,
+                "metadata": attribute.metadata
+            }
+            attributes_by_node[attribute.target_node].append(attr_dict)
+
+        # FIXED: Add attributes to existing nodes AND create new nodes (like correlation nodes)
+        for node_id, node_attributes_list in attributes_by_node.items():
+            if provider_name == 'correlation' and not self.graph.graph.has_node(node_id):
+                node_type = NodeType.CORRELATION_OBJECT
+            elif _is_valid_ip(node_id):
+                node_type = NodeType.IP
+            else:
+                node_type = NodeType.DOMAIN
+            self.graph.add_node(node_id, node_type, attributes=node_attributes_list)
+
         # Check if this should be a large entity
         if provider_result.get_relationship_count() > self.config.large_entity_threshold:
             members = self._create_large_entity_from_provider_result(target, provider_name, provider_result, current_depth)
@@ -748,30 +654,6 @@ class Scanner:
             if (_is_valid_domain(target_node) or _is_valid_ip(target_node)) and not max_depth_reached:
                 discovered_targets.add(target_node)

-        # Process all attributes, grouping by target node
-        attributes_by_node = defaultdict(list)
-        for attribute in provider_result.attributes:
-            attr_dict = {
-                "name": attribute.name,
-                "value": attribute.value,
-                "type": attribute.type,
-                "provider": attribute.provider,
-                "confidence": attribute.confidence,
-                "metadata": attribute.metadata
-            }
-            attributes_by_node[attribute.target_node].append(attr_dict)
-
-        # Add attributes to existing nodes OR create new nodes if they don't exist
-        for node_id, node_attributes_list in attributes_by_node.items():
-            if not self.graph.graph.has_node(node_id):
-                # If the node doesn't exist, create it with a default type
-                node_type = NodeType.IP if _is_valid_ip(node_id) else NodeType.DOMAIN
-                self.graph.add_node(node_id, node_type, attributes=node_attributes_list)
-            else:
-                # If the node already exists, just add the attributes
-                node_type_val = self.graph.graph.nodes[node_id].get('type', 'domain')
-                self.graph.add_node(node_id, NodeType(node_type_val), attributes=node_attributes_list)

         return discovered_targets, False

     def _create_large_entity_from_provider_result(self, source: str, provider_name: str,
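Both versions share the same grouping step before touching the graph: flatten provider attributes into dicts and bucket them by target node so each node is created or updated once. A standalone sketch of that defaultdict pattern, with made-up attribute records shaped like the attr_dict built above:

    from collections import defaultdict

    attributes = [
        {'target_node': 'example.com', 'name': 'asn', 'value': 'AS64500'},
        {'target_node': 'example.com', 'name': 'registrar', 'value': 'Example Registrar'},
        {'target_node': '203.0.113.7', 'name': 'ports', 'value': [80, 443]},
    ]

    attributes_by_node = defaultdict(list)
    for attr in attributes:
        attributes_by_node[attr['target_node']].append(attr)

    for node_id, node_attrs in attributes_by_node.items():
        print(node_id, '->', len(node_attrs), 'attributes')

The material difference is where this runs: the right side does it before the large-entity check and can mint CORRELATION_OBJECT nodes, while the left side does it after relationship processing and only creates IP or DOMAIN nodes.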
@@ -954,108 +836,47 @@ class Scanner:
             return { 'status': 'error', 'message': 'Failed to get status' }

     def _initialize_provider_states(self, target: str) -> None:
-        """
-        FIXED: Safer provider state initialization with error handling.
-        """
-        try:
-            if not self.graph.graph.has_node(target):
-                return
-            node_data = self.graph.graph.nodes[target]
-            if 'metadata' not in node_data:
-                node_data['metadata'] = {}
-            if 'provider_states' not in node_data['metadata']:
-                node_data['metadata']['provider_states'] = {}
-        except Exception as e:
-            self.logger.logger.warning(f"Error initializing provider states for {target}: {e}")
+        """Initialize provider states for forensic tracking."""
+        if not self.graph.graph.has_node(target): return
+        node_data = self.graph.graph.nodes[target]
+        if 'metadata' not in node_data: node_data['metadata'] = {}
+        if 'provider_states' not in node_data['metadata']: node_data['metadata']['provider_states'] = {}

     def _get_eligible_providers(self, target: str, is_ip: bool, dns_only: bool) -> List:
-        """
-        FIXED: Improved provider eligibility checking with better filtering.
-        """
+        """Get providers eligible for querying this target."""
         if dns_only:
             return [p for p in self.providers if p.get_name() == 'dns']

         eligible = []
         target_key = 'ips' if is_ip else 'domains'
         for provider in self.providers:
-            try:
-                # Check if provider supports this target type
-                if not provider.get_eligibility().get(target_key, False):
-                    continue
-                # Check if provider is available/configured
-                if not provider.is_available():
-                    continue
-                # Check if we already successfully queried this provider
+            if provider.get_eligibility().get(target_key):
                 if not self._already_queried_provider(target, provider.get_name()):
                     eligible.append(provider)
-            except Exception as e:
-                self.logger.logger.warning(f"Error checking provider eligibility {provider.get_name()}: {e}")
-                continue

         return eligible

     def _already_queried_provider(self, target: str, provider_name: str) -> bool:
-        """
-        FIXED: More robust check for already queried providers with proper error handling.
-        """
-        try:
-            if not self.graph.graph.has_node(target):
-                return False
-            node_data = self.graph.graph.nodes[target]
-            provider_states = node_data.get('metadata', {}).get('provider_states', {})
-            provider_state = provider_states.get(provider_name)
-            # Only consider it already queried if it was successful
-            return (provider_state is not None and
-                    provider_state.get('status') == 'success' and
-                    provider_state.get('results_count', 0) > 0)
-        except Exception as e:
-            self.logger.logger.warning(f"Error checking provider state for {target}:{provider_name}: {e}")
-            return False
+        """Check if we already successfully queried a provider for a target."""
+        if not self.graph.graph.has_node(target): return False
+        node_data = self.graph.graph.nodes[target]
+        provider_states = node_data.get('metadata', {}).get('provider_states', {})
+        provider_state = provider_states.get(provider_name)
+        return provider_state is not None and provider_state.get('status') == 'success'

     def _update_provider_state(self, target: str, provider_name: str, status: str,
                                results_count: int, error: Optional[str], start_time: datetime) -> None:
-        """
-        FIXED: More robust provider state updates with validation.
-        """
-        try:
-            if not self.graph.graph.has_node(target):
-                self.logger.logger.warning(f"Cannot update provider state: node {target} not found")
-                return
-
-            node_data = self.graph.graph.nodes[target]
-            if 'metadata' not in node_data:
-                node_data['metadata'] = {}
-            if 'provider_states' not in node_data['metadata']:
-                node_data['metadata']['provider_states'] = {}
-
-            # Calculate duration safely
-            try:
-                duration_ms = (datetime.now(timezone.utc) - start_time).total_seconds() * 1000
-            except Exception:
-                duration_ms = 0
-
-            node_data['metadata']['provider_states'][provider_name] = {
-                'status': status,
-                'timestamp': start_time.isoformat(),
-                'results_count': max(0, results_count),  # Ensure non-negative
-                'error': str(error) if error else None,
-                'duration_ms': duration_ms
-            }
-
-            # Update last modified time for forensic integrity
-            self.last_modified = datetime.now(timezone.utc).isoformat()
-        except Exception as e:
-            self.logger.logger.error(f"Error updating provider state for {target}:{provider_name}: {e}")
+        """Update provider state in node metadata for forensic tracking."""
+        if not self.graph.graph.has_node(target): return
+        node_data = self.graph.graph.nodes[target]
+        if 'metadata' not in node_data: node_data['metadata'] = {}
+        if 'provider_states' not in node_data['metadata']: node_data['metadata']['provider_states'] = {}
+        node_data['metadata']['provider_states'][provider_name] = {
+            'status': status,
+            'timestamp': start_time.isoformat(),
+            'results_count': results_count,
+            'error': error,
+            'duration_ms': (datetime.now(timezone.utc) - start_time).total_seconds() * 1000
+        }

     def _log_target_processing_error(self, target: str, error: str) -> None:
         self.logger.logger.error(f"Target processing failed for {target}: {error}")
@@ -1063,28 +884,8 @@ class Scanner:
         self.logger.logger.error(f"Provider {provider_name} failed for {target}: {error}")

     def _calculate_progress(self) -> float:
-        try:
-            if self.total_tasks_ever_enqueued == 0:
-                return 0.0
-            # Add small buffer for tasks still in queue to avoid showing 100% too early
-            queue_size = max(0, self.task_queue.qsize())
-            with self.processing_lock:
-                active_tasks = len(self.currently_processing)
-            # Adjust total to account for remaining work
-            adjusted_total = max(self.total_tasks_ever_enqueued,
-                                 self.indicators_completed + queue_size + active_tasks)
-            if adjusted_total == 0:
-                return 100.0
-            progress = (self.indicators_completed / adjusted_total) * 100
-            return max(0.0, min(100.0, progress))  # Clamp between 0 and 100
-        except Exception as e:
-            self.logger.logger.warning(f"Error calculating progress: {e}")
-            return 0.0
+        if self.total_tasks_ever_enqueued == 0: return 0.0
+        return min(100.0, (self.indicators_completed / self.total_tasks_ever_enqueued) * 100)

     def get_graph_data(self) -> Dict[str, Any]:
         graph_data = self.graph.get_graph_data()
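The two _calculate_progress variants weight pending work differently; a standalone comparison, with parameter names mirroring the Scanner counters:

    def progress_simple(completed: int, total_enqueued: int) -> float:
        # Right side: completed over everything ever enqueued, capped at 100.
        if total_enqueued == 0:
            return 0.0
        return min(100.0, (completed / total_enqueued) * 100)

    def progress_adjusted(completed: int, total_enqueued: int,
                          queue_size: int, active: int) -> float:
        # Left side: widen the denominator with queued and in-flight tasks
        # so the bar does not hit 100% while work remains.
        adjusted_total = max(total_enqueued, completed + queue_size + active)
        if adjusted_total == 0:
            return 100.0
        return max(0.0, min(100.0, (completed / adjusted_total) * 100))

    print(progress_simple(50, 80))           # 62.5
    print(progress_adjusted(50, 80, 40, 4))  # ~53.2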

View File

@@ -39,7 +39,6 @@ class ShodanProvider(BaseProvider):
             return False
         try:
             response = self.session.get(f"{self.base_url}/api-info?key={self.api_key}", timeout=5)
-            self.logger.logger.debug("Shodan is reacheable")
             return response.status_code == 200
         except requests.exceptions.RequestException:
             return False
@@ -108,12 +107,6 @@ class ShodanProvider(BaseProvider):
         except (json.JSONDecodeError, ValueError, KeyError):
             return "stale"

-    def query_domain(self, domain: str) -> ProviderResult:
-        """
-        Shodan does not support domain queries. This method returns an empty result.
-        """
-        return ProviderResult()
-
     def query_ip(self, ip: str) -> ProviderResult:
         """
         Query Shodan for information about an IP address (IPv4 or IPv6), with caching of processed data.