from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager import requests import ipaddress import os import time class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer): """Analyzer to enrich IP addresses with Shodan data.""" NAME = 'shodan_enrichment' DISPLAY_NAME = 'Shodan IP Enrichment' DESCRIPTION = 'Enriches source IP addresses with Shodan historical data' def __init__(self, index_name, sketch_id, timeline_id=None): super().__init__(index_name, sketch_id, timeline_id) self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '') def run(self): """Main analyzer logic.""" if not self.shodan_api_key: return "Shodan API key not configured" # Process in small batches to avoid timeout batch_size = 50 # Process 50 events at a time total_processed = 0 enriched_count = 0 query = { 'query': { 'bool': { 'must': [ {'exists': {'field': 'source_ip'}} ], 'must_not': [ {'exists': {'field': 'shodan_checked'}} ] } } } try: print(f"🚀 Starting Shodan enrichment in batches of {batch_size}") # Process events in smaller chunks events = self.event_stream(query_dsl=query, return_fields=['source_ip']) processed_ips = set() batch_count = 0 for event in events: source_ip = event.source.get('source_ip') if not source_ip: continue if not self._is_public_ip(source_ip): # Mark private IPs as checked to skip them next time event.add_attributes({'shodan_checked': True, 'shodan_private_ip': True}) event.commit() continue # Skip if already processed in this batch if source_ip in processed_ips: event.add_attributes({'shodan_checked': True}) event.commit() continue processed_ips.add(source_ip) print(f"🔍 Processing IP ({batch_count + 1}/{batch_size}): {source_ip}") # Get Shodan data shodan_data = self._get_shodan_data(source_ip) if shodan_data: self._enrich_event(event, shodan_data) enriched_count += 1 print(f"✅ Enriched {source_ip} with Shodan data") else: # Mark as checked even if no data found event.add_attributes({'shodan_checked': True, 'shodan_no_data': True}) event.commit() print(f"❌ No Shodan data for {source_ip}") total_processed += 1 batch_count += 1 # Rate limit and batch control if batch_count >= batch_size: print(f"📊 Completed batch: processed {total_processed} events, enriched {enriched_count}") break # Rate limiting between API calls time.sleep(1) except Exception as e: print(f"💥 Error during processing: {e}") return f"Processed {total_processed} events, enriched {enriched_count} with Shodan data" def _get_shodan_data(self, ip): """Fetch Shodan data for IP.""" try: url = f"https://api.shodan.io/shodan/host/{ip}" params = {'key': self.shodan_api_key} response = requests.get(url, params=params, timeout=10) if response.status_code == 200: return response.json() elif response.status_code == 404: return None # No data found else: print(f"⚠️ Shodan API error for {ip}: HTTP {response.status_code}") return None except Exception as e: print(f"💥 Request error for {ip}: {e}") return None def _enrich_event(self, event, shodan_data): """Add Shodan data to the event.""" try: # Core enrichment data enrichment = { 'shodan_org': shodan_data.get('org', ''), 'shodan_isp': shodan_data.get('isp', ''), 'shodan_country': shodan_data.get('location', {}).get('country_name', ''), 'shodan_city': shodan_data.get('location', {}).get('city', ''), 'shodan_ports': shodan_data.get('ports', []), 'shodan_hostnames': shodan_data.get('hostnames', []), 'shodan_last_update': shodan_data.get('last_update', ''), 'shodan_checked': True } # Add service information (top 3 services) if shodan_data.get('data'): services = [] for service in shodan_data.get('data', [])[:3]: port = service.get('port', 'Unknown') product = service.get('product', '') version = service.get('version', '') service_str = str(port) if product: service_str += f"/{product}" if version: service_str += f" {version}" services.append(service_str) enrichment['shodan_services'] = services # Add vulnerability information if available if shodan_data.get('vulns'): enrichment['shodan_vulns'] = list(shodan_data.get('vulns', []))[:5] # Top 5 vulns event.add_attributes(enrichment) event.add_tags(['shodan-enriched']) event.commit() except Exception as e: print(f"💥 Error enriching event for {event.source.get('source_ip', 'unknown')}: {e}") # Still mark as checked to avoid reprocessing try: event.add_attributes({'shodan_checked': True, 'shodan_error': str(e)}) event.commit() except: pass def _is_public_ip(self, ip): """Check if IP is public (not private/reserved).""" try: ip_obj = ipaddress.ip_address(ip) # Check if IP is global (public) and not in reserved ranges return ip_obj.is_global and not ip_obj.is_reserved except (ValueError, ipaddress.AddressValueError): return False # Register the analyzer manager.AnalysisManager.register_analyzer(ShodanEnrichmentAnalyzer)