"""Timesketch analyzer that enriches events with Shodan host information."""

from datetime import datetime
import ipaddress
import json
import time

import requests

from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager


class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
    """Analyzer to enrich IP addresses with Shodan data.

    For every event that has a ``source_ip`` field and has not already been
    marked with ``__ts_analyzer_shodan_enrichment``, the analyzer looks the
    address up in the Shodan host API (with history enabled) and stores a
    summary of the answer as extra attributes on the event.
    """

    NAME = 'shodan_enrichment'
    DISPLAY_NAME = 'Shodan IP Enrichment'
    DESCRIPTION = 'Enriches source IP addresses with Shodan historical data'

    def __init__(self, index_name, sketch_id, timeline_id=None):
        """Initialize the analyzer and read its configuration.

        Args:
            index_name: Forwarded unchanged to the base analyzer.
            sketch_id: Forwarded unchanged to the base analyzer.
            timeline_id: Forwarded unchanged to the base analyzer.
        """
        super().__init__(index_name, sketch_id, timeline_id)

        # NOTE(review): get_config() and self.logger are assumed to be
        # provided by the base class or a mixin outside this file - confirm.
        config = self.get_config()
        self.shodan_api_key = config.get('api_key', '')
        # NOTE(review): read from config but not used anywhere in this file.
        self.max_time_diff_hours = config.get('max_time_diff_hours', 24)
        # Seconds to wait after each Shodan API request.
        self.rate_limit_delay = config.get('rate_limit_delay', 1)

        if not self.shodan_api_key:
            self.logger.error("Shodan API key not configured")

    def run(self):
        """Main analyzer logic.

        Streams all not-yet-enriched events that carry a ``source_ip``,
        looks up each globally routable address in Shodan and writes the
        result back onto the event.

        Returns:
            A human-readable summary string: either the number of enriched
            events or a notice that the API key is missing.
        """
        if not self.shodan_api_key:
            return "Shodan API key not configured"

        query = {
            'query': {
                'bool': {
                    'must': [
                        {'exists': {'field': 'source_ip'}},
                        {
                            'bool': {
                                'must_not': [
                                    {
                                        'term': {
                                            '__ts_analyzer_shodan_enrichment': True
                                        }
                                    }
                                ]
                            }
                        },
                    ]
                }
            }
        }

        events = self.event_stream(
            query_dsl=query, return_fields=['source_ip', 'timestamp']
        )

        # One lookup per distinct IP per run. Failed lookups (None) are
        # cached as well so a rate-limited or unknown address is not retried
        # for every event; such events stay unmarked and are picked up again
        # on the next analyzer run.
        lookup_cache = {}
        processed_count = 0

        for event in events:
            source_ip = event.source.get('source_ip')
            if not source_ip or not self._is_public_ip(source_ip):
                continue

            if source_ip not in lookup_cache:
                lookup_cache[source_ip] = self._get_shodan_data(source_ip)
                # Rate limiting: only pause after a real API request.
                time.sleep(self.rate_limit_delay)

            shodan_data = lookup_cache[source_ip]
            if shodan_data:
                self._enrich_event(event, shodan_data)
                processed_count += 1

        return f"Processed {processed_count} events with Shodan data"

    def _redact_api_key(self, text):
        """Return ``text`` with any occurrence of the API key masked."""
        if not self.shodan_api_key:
            return text
        return text.replace(self.shodan_api_key, '<redacted>')

    def _get_shodan_data(self, ip):
        """Fetch Shodan data for IP.

        Args:
            ip: IP address string to look up.

        Returns:
            The decoded JSON response on HTTP 200, otherwise None (unknown
            host, API error, network error or undecodable body).
        """
        url = f"https://api.shodan.io/shodan/host/{ip}"
        params = {
            'key': self.shodan_api_key,
            'history': 'true',
        }

        try:
            response = requests.get(url, params=params, timeout=10)

            if response.status_code == 200:
                return response.json()
            if response.status_code == 404:
                self.logger.debug(f'No Shodan data found for {ip}')
                return None

            self.logger.warning(
                f'Shodan API error for {ip}: {response.status_code}'
            )
            return None

        except requests.exceptions.RequestException as e:
            # The exception text usually embeds the request URL, which
            # carries the API key as a query parameter - mask it.
            self.logger.warning(
                f'Request error for {ip}: {self._redact_api_key(str(e))}'
            )
            return None
        except ValueError as e:
            # Depending on the requests version/JSON backend, a bad body
            # surfaces as some ValueError subclass (json.JSONDecodeError
            # included).
            self.logger.warning(f'JSON decode error for {ip}: {e}')
            return None

    def _enrich_event(self, event, shodan_data):
        """Add Shodan data to the event.

        Args:
            event: Event object exposing add_attributes(), add_tags() and
                commit().
            shodan_data: Dict decoded from the Shodan host API response.
        """
        # Country/city are read from a nested 'location' dict when present,
        # falling back to the same keys at the top level of the response.
        # 'or {}' also guards against an explicit null value.
        location = shodan_data.get('location') or {}
        banners = shodan_data.get('data') or []

        enrichment = {
            'shodan_ports': shodan_data.get('ports', []),
            'shodan_org': shodan_data.get('org', ''),
            'shodan_isp': shodan_data.get('isp', ''),
            'shodan_country': (
                location.get('country_name')
                or shodan_data.get('country_name')
                or ''
            ),
            'shodan_city': (
                location.get('city') or shodan_data.get('city') or ''
            ),
            'shodan_hostnames': shodan_data.get('hostnames', []),
            'shodan_last_update': shodan_data.get('last_update', ''),
            # Marker used by run() to skip events on later runs.
            '__ts_analyzer_shodan_enrichment': True,
        }

        # One summary entry per banner returned by Shodan (all of them, not
        # only the most recent one).
        if banners:
            enrichment['shodan_services'] = [
                {
                    'port': service.get('port'),
                    'protocol': service.get('transport', 'tcp'),
                    'service': service.get('product', ''),
                    'version': service.get('version', ''),
                    'timestamp': service.get('timestamp', ''),
                }
                for service in banners
            ]

        event.add_attributes(enrichment)
        event.add_tags(['shodan-enriched'])
        event.commit()

    def _is_public_ip(self, ip):
        """Check whether ``ip`` is a globally routable address.

        Relies on ``ipaddress``' ``is_global`` property, so private,
        loopback, link-local and other non-global ranges are all rejected,
        not only the RFC 1918 networks.

        Args:
            ip: Candidate IP address value.

        Returns:
            True if ``ip`` parses as an IP address and is global, else False.
        """
        try:
            return ipaddress.ip_address(ip).is_global
        except (ValueError, ipaddress.AddressValueError):
            return False


# Register the analyzer
manager.AnalysisManager.register_analyzer(ShodanEnrichmentAnalyzer)