shodan_analyzer.py aktualisiert

This commit is contained in:
Mario Stöckl 2025-08-25 12:56:00 +00:00
parent b309a7c7d8
commit 8efb506fc4

View File

@ -2,10 +2,8 @@ from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager from timesketch.lib.analyzers import manager
import requests import requests
import json import json
from datetime import datetime
import ipaddress import ipaddress
import os import os
import logging
class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer): class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
"""Analyzer to enrich IP addresses with Shodan data.""" """Analyzer to enrich IP addresses with Shodan data."""
@ -16,123 +14,128 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
def __init__(self, index_name, sketch_id, timeline_id=None): def __init__(self, index_name, sketch_id, timeline_id=None):
super().__init__(index_name, sketch_id, timeline_id) super().__init__(index_name, sketch_id, timeline_id)
# Set up logging
self.logger = logging.getLogger(self.__class__.__name__)
# Get API key from environment variables
self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '') self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '')
self.max_time_diff_hours = 24 self.processed_ips = set() # Track processed IPs to avoid duplicates
self.rate_limit_delay = 1
if not self.shodan_api_key:
self.logger.error("Shodan API key not configured in environment variables")
def run(self): def run(self):
"""Main analyzer logic.""" """Main analyzer logic."""
if not self.shodan_api_key: if not self.shodan_api_key:
return "Shodan API key not configured" return "Shodan API key not configured"
# Query for events with source_ip that haven't been processed
query = { query = {
'query': { 'query': {
'bool': { 'bool': {
'must': [ 'must': [
{'exists': {'field': 'source_ip'}}, {'exists': {'field': 'source_ip'}}
{'bool': {'must_not': [{'term': {'__ts_analyzer_shodan_enrichment': True}}]}} ],
'must_not': [
{'exists': {'field': 'shodan_org'}} # Skip if already enriched
] ]
} }
} }
} }
events = self.event_stream(query_dsl=query, return_fields=['source_ip', 'timestamp']) events = self.event_stream(query_dsl=query, return_fields=['source_ip'])
processed_count = 0 processed_count = 0
skipped_count = 0
for event in events: for event in events:
source_ip = event.source.get('source_ip') source_ip = event.source.get('source_ip')
timestamp = event.source.get('timestamp')
if source_ip and self._is_public_ip(source_ip): if source_ip:
print(f"Processing IP: {source_ip}") # Use print for now # Skip if we've already processed this IP in this run
shodan_data = self._get_shodan_data(source_ip) if source_ip in self.processed_ips:
if shodan_data: skipped_count += 1
self._enrich_event(event, shodan_data) continue
processed_count += 1
# Rate limiting if self._is_public_ip(source_ip):
import time print(f"Processing new IP: {source_ip}")
time.sleep(self.rate_limit_delay) self.processed_ips.add(source_ip)
shodan_data = self._get_shodan_data(source_ip)
if shodan_data:
self._enrich_event(event, shodan_data)
processed_count += 1
print(f"✅ Enriched {source_ip} with Shodan data")
else:
# Still mark as processed even if no data found
self._mark_as_processed(event)
# Rate limiting
import time
time.sleep(1)
else:
print(f"Skipping private IP: {source_ip}")
return f"Processed {processed_count} events with Shodan data" return f"Processed {processed_count} IPs, skipped {skipped_count} duplicates"
def _get_shodan_data(self, ip): def _get_shodan_data(self, ip):
"""Fetch Shodan data for IP.""" """Fetch Shodan data for IP."""
try: try:
url = f"https://api.shodan.io/shodan/host/{ip}" url = f"https://api.shodan.io/shodan/host/{ip}"
params = { params = {'key': self.shodan_api_key}
'key': self.shodan_api_key,
'history': 'true'
}
print(f"Querying Shodan API for IP: {ip}") # Use print for now print(f"🔍 Querying Shodan for: {ip}")
response = requests.get(url, params=params, timeout=10) response = requests.get(url, params=params, timeout=10)
if response.status_code == 200: if response.status_code == 200:
print(f"Successfully retrieved Shodan data for {ip}") print(f"📊 Found Shodan data for {ip}")
return response.json() return response.json()
elif response.status_code == 404: elif response.status_code == 404:
print(f'No Shodan data found for {ip}') print(f"❌ No Shodan data for {ip}")
return None return None
else: else:
print(f'Shodan API error for {ip}: {response.status_code} - {response.text}') print(f"⚠️ Shodan API error for {ip}: {response.status_code}")
return None return None
except Exception as e: except Exception as e:
print(f'Error fetching Shodan data for {ip}: {e}') print(f"💥 Error fetching Shodan data for {ip}: {e}")
return None return None
def _enrich_event(self, event, shodan_data): def _enrich_event(self, event, shodan_data):
"""Add Shodan data to the event.""" """Add Shodan data to the event."""
try: try:
enrichment = { enrichment = {
'shodan_ports': shodan_data.get('ports', []),
'shodan_org': shodan_data.get('org', ''), 'shodan_org': shodan_data.get('org', ''),
'shodan_isp': shodan_data.get('isp', ''), 'shodan_isp': shodan_data.get('isp', ''),
'shodan_country': shodan_data.get('location', {}).get('country_name', ''), 'shodan_country': shodan_data.get('location', {}).get('country_name', ''),
'shodan_city': shodan_data.get('location', {}).get('city', ''), 'shodan_city': shodan_data.get('location', {}).get('city', ''),
'shodan_ports': shodan_data.get('ports', []),
'shodan_hostnames': shodan_data.get('hostnames', []), 'shodan_hostnames': shodan_data.get('hostnames', []),
'shodan_last_update': shodan_data.get('last_update', ''), 'shodan_last_update': shodan_data.get('last_update', ''),
'__ts_analyzer_shodan_enrichment': True
} }
# Add service information from latest scan # Add top services
if shodan_data.get('data'): if shodan_data.get('data'):
services = [] services = []
for service in shodan_data.get('data', [])[:5]: # Limit to first 5 services for service in shodan_data.get('data', [])[:3]: # Top 3 services
service_info = f"Port {service.get('port', 'Unknown')}/{service.get('transport', 'tcp')}" port = service.get('port', 'Unknown')
if service.get('product'): product = service.get('product', 'Unknown')
service_info += f" - {service.get('product', '')}" services.append(f"{port}/{product}")
if service.get('version'):
service_info += f" {service.get('version', '')}"
services.append(service_info)
enrichment['shodan_services'] = services enrichment['shodan_services'] = services
event.add_attributes(enrichment) event.add_attributes(enrichment)
event.add_tags(['shodan-enriched']) event.add_tags(['shodan-enriched'])
event.commit() event.commit()
print(f"Successfully enriched event with Shodan data")
except Exception as e: except Exception as e:
print(f"Error enriching event: {e}") print(f"💥 Error enriching event: {e}")
def _mark_as_processed(self, event):
"""Mark event as processed even if no Shodan data found."""
try:
event.add_attributes({'shodan_checked': True})
event.commit()
except Exception:
pass
def _is_public_ip(self, ip): def _is_public_ip(self, ip):
"""Check if IP is public (not RFC1918 private ranges).""" """Check if IP is public."""
try: try:
ip_obj = ipaddress.ip_address(ip) ip_obj = ipaddress.ip_address(ip)
return ip_obj.is_global return ip_obj.is_global
except (ValueError, ipaddress.AddressValueError): except (ValueError, ipaddress.AddressValueError):
print(f"Invalid IP address format: {ip}")
return False return False
# Register the analyzer # Register the analyzer