"""Timesketch analyzer that enriches events carrying a source IP with Shodan data.

NOTE(review): this module was reconstructed from the post-image of a
whitespace-collapsed unified diff of ``shodan_analyzer.py``. Parts of the real
file that the diff does not show are flagged with NOTE(review) below instead
of being guessed.
"""
import ipaddress
import json
import logging
import os
import time

import requests

from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager

logger = logging.getLogger(__name__)


class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
    """Analyzer to enrich IP addresses with Shodan data."""

    # NOTE(review): the class-level attributes that sit between this docstring
    # and __init__ in the real file fall between the two diff hunks and are
    # not visible; restore them from the actual module.

    # Endpoint queried for each IP; the API key travels as a query parameter.
    SHODAN_HOST_URL = "https://api.shodan.io/shodan/host/{ip}"
    # Seconds to wait for a Shodan response before giving up.
    REQUEST_TIMEOUT = 10
    # Seconds to pause after every real API call (not after cache hits).
    RATE_LIMIT_DELAY = 1

    def __init__(self, index_name, sketch_id, timeline_id=None):
        """Initialise the analyzer and read the API key from the environment.

        Args:
            index_name: Forwarded unchanged to the base analyzer.
            sketch_id: Forwarded unchanged to the base analyzer.
            timeline_id: Forwarded unchanged to the base analyzer.
        """
        super().__init__(index_name, sketch_id, timeline_id)
        # An empty string means "not configured"; run() checks for that.
        self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '')
        # IPs already looked up during this run.
        self.processed_ips = set()
        # ip -> lookup result (see _lookup_host) so that every event sharing
        # an IP is enriched while the API is queried only once per IP.
        self._shodan_cache = {}

    def run(self):
        """Main analyzer logic.

        Streams events that have a ``source_ip`` and carry neither a
        ``shodan_org`` nor a ``shodan_checked`` attribute, looks each distinct
        public IP up once, and writes the result to every matching event.

        Returns:
            A human-readable summary string, or an error string when no API
            key is configured.
        """
        if not self.shodan_api_key:
            return "Shodan API key not configured"

        # Exclude both markers: 'shodan_org' is written on enrichment and
        # 'shodan_checked' is written when Shodan has no record. Checking only
        # the former re-queries every "no data" IP on each run.
        query = {
            'query': {
                'bool': {
                    'must': [
                        {'exists': {'field': 'source_ip'}}
                    ],
                    'must_not': [
                        {'exists': {'field': 'shodan_org'}},
                        {'exists': {'field': 'shodan_checked'}}
                    ]
                }
            }
        }

        events = self.event_stream(query_dsl=query, return_fields=['source_ip'])

        processed_count = 0  # distinct IPs for which Shodan returned data
        skipped_count = 0    # events whose IP had already been looked up

        for event in events:
            source_ip = event.source.get('source_ip')
            # Non-string values (e.g. a list) are unhashable and cannot be
            # used as set/dict keys below, so they are skipped.
            if not source_ip or not isinstance(source_ip, str):
                continue

            if source_ip in self.processed_ips:
                # Duplicate IP: no second API call, but the event still gets
                # the cached result instead of being left un-enriched.
                skipped_count += 1
                self._apply_result(event, self._shodan_cache.get(source_ip))
                continue

            if not self._is_public_ip(source_ip):
                logger.info("Skipping private IP: %s", source_ip)
                continue

            logger.info("Processing new IP: %s", source_ip)
            self.processed_ips.add(source_ip)

            result = self._lookup_host(source_ip)
            self._shodan_cache[source_ip] = result
            if result:
                processed_count += 1
                logger.info("Enriched %s with Shodan data", source_ip)
            self._apply_result(event, result)

            # Rate limiting
            time.sleep(self.RATE_LIMIT_DELAY)

        return f"Processed {processed_count} IPs, skipped {skipped_count} duplicates"

    def _apply_result(self, event, result):
        """Write one lookup result (see _lookup_host) onto an event."""
        if result is None:
            # Lookup failed; leave the event unmarked so a later run retries.
            return
        if result:
            self._enrich_event(event, result)
        else:
            # Still mark as processed even if no data found
            self._mark_as_processed(event)

    def _lookup_host(self, ip):
        """Query Shodan for one IP, keeping "no record" and "failed" apart.

        Args:
            ip: IP address string to look up.

        Returns:
            The decoded JSON object on HTTP 200, an empty dict on HTTP 404
            (Shodan has no record), or None when the request failed, returned
            any other status, or the body was not a JSON object.
        """
        url = self.SHODAN_HOST_URL.format(ip=ip)
        params = {'key': self.shodan_api_key}

        logger.info("Querying Shodan for: %s", ip)
        try:
            response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as exc:
            # Log only the exception class: the exception text can contain the
            # full request URL, which includes the API key.
            logger.warning(
                "Error fetching Shodan data for %s: %s", ip, type(exc).__name__)
            return None

        if response.status_code == 404:
            logger.info("No Shodan data for %s", ip)
            return {}
        if response.status_code != 200:
            logger.warning(
                "Shodan API error for %s: %s", ip, response.status_code)
            return None

        try:
            payload = response.json()
        except ValueError:
            logger.warning("Shodan returned a non-JSON body for %s", ip)
            return None
        if not isinstance(payload, dict):
            return None

        logger.info("Found Shodan data for %s", ip)
        return payload

    def _get_shodan_data(self, ip):
        """Fetch Shodan data for IP.

        Args:
            ip: IP address string to look up.

        Returns:
            The decoded Shodan host record, or None when there is no data or
            the lookup failed.
        """
        return self._lookup_host(ip) or None

    def _enrich_event(self, event, shodan_data):
        """Add Shodan data to the event.

        Args:
            event: Event object yielded by ``event_stream``.
            shodan_data: Decoded Shodan host record (a dict).
        """
        try:
            # NOTE(review): Shodan's host endpoint appears to return
            # country_name / city at the top level of the record; the nested
            # 'location' lookup is kept as a fallback. Confirm against the
            # Shodan API reference.
            location = shodan_data.get('location') or {}
            # `or ''` / `or []` keep the stored values non-null even when the
            # record carries an explicit null for a key.
            enrichment = {
                'shodan_org': shodan_data.get('org') or '',
                'shodan_isp': shodan_data.get('isp') or '',
                'shodan_country': (shodan_data.get('country_name')
                                   or location.get('country_name', '')),
                'shodan_city': (shodan_data.get('city')
                                or location.get('city', '')),
                'shodan_ports': shodan_data.get('ports') or [],
                'shodan_hostnames': shodan_data.get('hostnames') or [],
                'shodan_last_update': shodan_data.get('last_update') or '',
                # Same marker _mark_as_processed writes, so run() can exclude
                # every handled event with one field.
                'shodan_checked': True,
            }

            # Add top services
            banners = shodan_data.get('data') or []
            if banners:
                enrichment['shodan_services'] = [
                    f"{banner.get('port', 'Unknown')}/{banner.get('product', 'Unknown')}"
                    for banner in banners[:3]  # Top 3 services
                ]

            event.add_attributes(enrichment)
            event.add_tags(['shodan-enriched'])
            event.commit()
        except Exception:
            # Broad on purpose: one bad event must not abort the whole run.
            # logger.exception records the traceback for diagnosis.
            logger.exception("Error enriching event")

    def _mark_as_processed(self, event):
        """Mark event as processed even if no Shodan data found.

        Args:
            event: Event object yielded by ``event_stream``.
        """
        try:
            event.add_attributes({'shodan_checked': True})
            event.commit()
        except Exception:
            # Best effort, but visible: an unmarked event is simply looked up
            # again on the next run.
            logger.exception("Could not mark event as checked against Shodan")

    def _is_public_ip(self, ip):
        """Check if IP is public.

        Args:
            ip: Candidate IP address.

        Returns:
            True when ``ip`` parses as an address whose ``is_global`` flag is
            set; False otherwise, including when it does not parse.
        """
        try:
            return ipaddress.ip_address(ip).is_global
        except ValueError:
            # ipaddress.AddressValueError is a ValueError subclass, so this
            # single clause covers it too.
            return False


# Register the analyzer
# NOTE(review): the registration statement that follows this comment in the
# real file lies beyond the last visible hunk line and is not reproduced here.