diff --git a/shodan_analyzer.py b/shodan_analyzer.py index a756970..bb15d26 100644 --- a/shodan_analyzer.py +++ b/shodan_analyzer.py @@ -1,9 +1,9 @@ from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager import requests -import json import ipaddress import os +import time class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer): """Analyzer to enrich IP addresses with Shodan data.""" @@ -15,14 +15,17 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer): def __init__(self, index_name, sketch_id, timeline_id=None): super().__init__(index_name, sketch_id, timeline_id) self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '') - self.processed_ips = set() # Track processed IPs to avoid duplicates def run(self): """Main analyzer logic.""" if not self.shodan_api_key: return "Shodan API key not configured" - - # Query for events with source_ip that haven't been processed + + # Process in small batches to avoid timeout + batch_size = 50 # Process 50 events at a time + total_processed = 0 + enriched_count = 0 + query = { 'query': { 'bool': { @@ -30,45 +33,69 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer): {'exists': {'field': 'source_ip'}} ], 'must_not': [ - {'exists': {'field': 'shodan_org'}} # Skip if already enriched + {'exists': {'field': 'shodan_checked'}} ] } } } - events = self.event_stream(query_dsl=query, return_fields=['source_ip']) - processed_count = 0 - skipped_count = 0 - - for event in events: - source_ip = event.source.get('source_ip') + try: + print(f"🚀 Starting Shodan enrichment in batches of {batch_size}") - if source_ip: - # Skip if we've already processed this IP in this run - if source_ip in self.processed_ips: - skipped_count += 1 + # Process events in smaller chunks + events = self.event_stream(query_dsl=query, return_fields=['source_ip']) + processed_ips = set() + batch_count = 0 + + for event in events: + source_ip = event.source.get('source_ip') + + if not source_ip: continue - - if self._is_public_ip(source_ip): - print(f"Processing new IP: {source_ip}") - self.processed_ips.add(source_ip) - - shodan_data = self._get_shodan_data(source_ip) - if shodan_data: - self._enrich_event(event, shodan_data) - processed_count += 1 - print(f"✅ Enriched {source_ip} with Shodan data") - else: - # Still mark as processed even if no data found - self._mark_as_processed(event) - - # Rate limiting - import time - time.sleep(1) + + if not self._is_public_ip(source_ip): + # Mark private IPs as checked to skip them next time + event.add_attributes({'shodan_checked': True, 'shodan_private_ip': True}) + event.commit() + continue + + # Skip if already processed in this batch + if source_ip in processed_ips: + event.add_attributes({'shodan_checked': True}) + event.commit() + continue + + processed_ips.add(source_ip) + print(f"🔍 Processing IP ({batch_count + 1}/{batch_size}): {source_ip}") + + # Get Shodan data + shodan_data = self._get_shodan_data(source_ip) + + if shodan_data: + self._enrich_event(event, shodan_data) + enriched_count += 1 + print(f"✅ Enriched {source_ip} with Shodan data") else: - print(f"Skipping private IP: {source_ip}") - - return f"Processed {processed_count} IPs, skipped {skipped_count} duplicates" + # Mark as checked even if no data found + event.add_attributes({'shodan_checked': True, 'shodan_no_data': True}) + event.commit() + print(f"❌ No Shodan data for {source_ip}") + + total_processed += 1 + batch_count += 1 + + # Rate limit and batch control + if batch_count >= batch_size: + print(f"📊 Completed batch: processed {total_processed} events, enriched {enriched_count}") + break + + # Rate limiting between API calls + time.sleep(1) + + except Exception as e: + print(f"💥 Error during processing: {e}") + + return f"Processed {total_processed} events, enriched {enriched_count} with Shodan data" def _get_shodan_data(self, ip): """Fetch Shodan data for IP.""" @@ -76,26 +103,24 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer): url = f"https://api.shodan.io/shodan/host/{ip}" params = {'key': self.shodan_api_key} - print(f"🔍 Querying Shodan for: {ip}") response = requests.get(url, params=params, timeout=10) if response.status_code == 200: - print(f"📊 Found Shodan data for {ip}") return response.json() elif response.status_code == 404: - print(f"❌ No Shodan data for {ip}") - return None + return None # No data found else: - print(f"⚠️ Shodan API error for {ip}: {response.status_code}") + print(f"⚠️ Shodan API error for {ip}: HTTP {response.status_code}") return None except Exception as e: - print(f"💥 Error fetching Shodan data for {ip}: {e}") + print(f"💥 Request error for {ip}: {e}") return None def _enrich_event(self, event, shodan_data): """Add Shodan data to the event.""" try: + # Core enrichment data enrichment = { 'shodan_org': shodan_data.get('org', ''), 'shodan_isp': shodan_data.get('isp', ''), @@ -104,37 +129,50 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer): 'shodan_ports': shodan_data.get('ports', []), 'shodan_hostnames': shodan_data.get('hostnames', []), 'shodan_last_update': shodan_data.get('last_update', ''), + 'shodan_checked': True } - # Add top services + # Add service information (top 3 services) if shodan_data.get('data'): services = [] - for service in shodan_data.get('data', [])[:3]: # Top 3 services + for service in shodan_data.get('data', [])[:3]: port = service.get('port', 'Unknown') - product = service.get('product', 'Unknown') - services.append(f"{port}/{product}") + product = service.get('product', '') + version = service.get('version', '') + + service_str = str(port) + if product: + service_str += f"/{product}" + if version: + service_str += f" {version}" + + services.append(service_str) + enrichment['shodan_services'] = services + # Add vulnerability information if available + if shodan_data.get('vulns'): + enrichment['shodan_vulns'] = list(shodan_data.get('vulns', []))[:5] # Top 5 vulns + event.add_attributes(enrichment) event.add_tags(['shodan-enriched']) event.commit() except Exception as e: - print(f"💥 Error enriching event: {e}") - - def _mark_as_processed(self, event): - """Mark event as processed even if no Shodan data found.""" - try: - event.add_attributes({'shodan_checked': True}) - event.commit() - except Exception: - pass + print(f"💥 Error enriching event for {event.source.get('source_ip', 'unknown')}: {e}") + # Still mark as checked to avoid reprocessing + try: + event.add_attributes({'shodan_checked': True, 'shodan_error': str(e)}) + event.commit() + except: + pass def _is_public_ip(self, ip): - """Check if IP is public.""" + """Check if IP is public (not private/reserved).""" try: ip_obj = ipaddress.ip_address(ip) - return ip_obj.is_global + # Check if IP is global (public) and not in reserved ranges + return ip_obj.is_global and not ip_obj.is_reserved except (ValueError, ipaddress.AddressValueError): return False