# timesketch_analyzers/shodan_analyzer.py
#
# Original listing metadata: 135 lines, 5.3 KiB, Python

import ipaddress
import json
import os
import time
from datetime import datetime

import requests

from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager


class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
    """Analyzer that enriches events with Shodan host data for their source IP.

    For every event that has a ``source_ip`` field and has not yet been
    marked with ``__ts_analyzer_shodan_enrichment``, the analyzer queries the
    Shodan host endpoint (with ``history=true``) and stores a summary of the
    response on the event, together with the ``shodan-enriched`` tag.

    The API key is read from the ``SHODAN_API_KEY`` environment variable.

    NOTE(review): logging goes through ``self.logger``, which is not defined
    in this class -- presumably supplied by ``interface.BaseAnalyzer``;
    confirm against the Timesketch version in use.
    """

    NAME = 'shodan_enrichment'
    DISPLAY_NAME = 'Shodan IP Enrichment'
    DESCRIPTION = 'Enriches source IP addresses with Shodan historical data'

    # Host lookup endpoint; the validated IP address is substituted in.
    _SHODAN_HOST_URL = 'https://api.shodan.io/shodan/host/{ip}'
    # Upper bound on the number of banners summarised per host.
    _MAX_SERVICES = 5

    def __init__(self, index_name, sketch_id, timeline_id=None):
        """Initialise the analyzer and read the Shodan API key.

        Args:
            index_name: Passed through to the base analyzer.
            sketch_id: Passed through to the base analyzer.
            timeline_id: Passed through to the base analyzer.
        """
        super().__init__(index_name, sketch_id, timeline_id)
        # Get API key from environment variables.
        self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '')
        # Not referenced by any method of this class; kept so external code
        # that reads the attribute keeps working.
        self.max_time_diff_hours = 24
        # Seconds to pause after each Shodan API request.
        self.rate_limit_delay = 1
        if not self.shodan_api_key:
            self.logger.error(
                "Shodan API key not configured in environment variables")

    def run(self):
        """Enrich all not-yet-enriched events that carry a public source IP.

        Each distinct IP address is looked up at most once per run; the
        resulting enrichment is reused for every further event with the same
        address, and the rate-limit pause is only taken after a real request.

        Returns:
            A human-readable summary string.
        """
        if not self.shodan_api_key:
            return "Shodan API key not configured"

        query = {
            'query': {
                'bool': {
                    'must': [
                        {'exists': {'field': 'source_ip'}},
                        {'bool': {'must_not': [
                            {'term': {'__ts_analyzer_shodan_enrichment': True}},
                        ]}},
                    ]
                }
            }
        }
        events = self.event_stream(
            query_dsl=query, return_fields=['source_ip', 'timestamp'])

        # ip -> enrichment dict, or None when the lookup yielded nothing.
        # Only the small summary is cached, not the raw (potentially large)
        # history response.
        enrichment_by_ip = {}
        processed_count = 0

        for event in events:
            source_ip = event.source.get('source_ip')
            if not source_ip or not self._is_public_ip(source_ip):
                continue

            if source_ip not in enrichment_by_ip:
                self.logger.info("Processing IP: %s", source_ip)
                shodan_data = self._get_shodan_data(source_ip)
                enrichment_by_ip[source_ip] = (
                    self._build_enrichment(shodan_data) if shodan_data
                    else None
                )
                # Rate limiting: pause after every request actually sent,
                # whether or not it returned data.
                time.sleep(self.rate_limit_delay)

            enrichment = enrichment_by_ip[source_ip]
            # Hand each event its own copy so the cached dict is never shared.
            if enrichment and self._apply_enrichment(event, dict(enrichment)):
                processed_count += 1

        return f"Processed {processed_count} events with Shodan data"

    def _redact(self, text):
        """Return *text* with the API key replaced by a placeholder."""
        # Guard against an empty key: str.replace('', ...) would insert the
        # placeholder between every character.
        if not self.shodan_api_key:
            return text
        return text.replace(self.shodan_api_key, '***')

    def _get_shodan_data(self, ip):
        """Fetch Shodan host data (including history) for an IP address.

        Args:
            ip: The IP address to look up.

        Returns:
            The decoded JSON object as a dict, or None when Shodan has no
            data (HTTP 404), answers with another non-200 status, returns
            something that is not a JSON object, or the request fails.
        """
        url = self._SHODAN_HOST_URL.format(ip=ip)
        params = {
            'key': self.shodan_api_key,
            'history': 'true',
        }
        self.logger.info("Querying Shodan API for IP: %s", ip)
        try:
            response = requests.get(url, params=params, timeout=10)
            if response.status_code == 200:
                data = response.json()
                if not isinstance(data, dict):
                    self.logger.warning(
                        'Unexpected Shodan response type for %s: %s',
                        ip, type(data).__name__)
                    return None
                self.logger.info(
                    "Successfully retrieved Shodan data for %s", ip)
                return data
            if response.status_code == 404:
                self.logger.debug('No Shodan data found for %s', ip)
                return None
            self.logger.warning(
                'Shodan API error for %s: %s - %s',
                ip, response.status_code, self._redact(response.text))
            return None
        except (requests.RequestException, ValueError) as e:
            # requests error messages contain the full request URL, which
            # carries the API key as a query parameter -- redact it before
            # it reaches the log. ValueError covers an undecodable body.
            self.logger.warning(
                'Error fetching Shodan data for %s: %s',
                ip, self._redact(str(e)))
            return None

    @staticmethod
    def _describe_service(service):
        """Format one Shodan banner dict as 'Port <port>/<transport> ...'."""
        port = service.get('port', 'Unknown')
        transport = service.get('transport', 'tcp')
        description = f"Port {port}/{transport}"
        product = service.get('product')
        if product:
            description += f" - {product}"
            version = service.get('version')
            if version:
                description += f" {version}"
        return description

    def _build_enrichment(self, shodan_data):
        """Build the attribute dict stored on an event from a Shodan response.

        Args:
            shodan_data: Dict decoded from the Shodan host response.

        Returns:
            Dict of ``shodan_*`` attributes plus the
            ``__ts_analyzer_shodan_enrichment`` marker.
        """
        # Country and city are read from the top level of the response first;
        # a nested 'location' object is still honoured as a fallback.
        location = shodan_data.get('location')
        if not isinstance(location, dict):
            location = {}

        # `or` also normalises JSON nulls to the empty default.
        enrichment = {
            'shodan_ports': shodan_data.get('ports') or [],
            'shodan_org': shodan_data.get('org') or '',
            'shodan_isp': shodan_data.get('isp') or '',
            'shodan_country': (shodan_data.get('country_name')
                               or location.get('country_name') or ''),
            'shodan_city': (shodan_data.get('city')
                            or location.get('city') or ''),
            'shodan_hostnames': shodan_data.get('hostnames') or [],
            'shodan_last_update': shodan_data.get('last_update') or '',
            '__ts_analyzer_shodan_enrichment': True,
        }

        # Summarise the first few banners in the order Shodan returned them.
        banners = shodan_data.get('data')
        if isinstance(banners, list) and banners:
            enrichment['shodan_services'] = [
                self._describe_service(banner)
                for banner in banners[:self._MAX_SERVICES]
                if isinstance(banner, dict)
            ]
        return enrichment

    def _apply_enrichment(self, event, enrichment):
        """Write *enrichment* to *event*, tag it and commit.

        Returns:
            True when the event was committed, False when an error occurred
            (the error is logged, not raised).
        """
        try:
            event.add_attributes(enrichment)
            event.add_tags(['shodan-enriched'])
            event.commit()
        except Exception as e:  # Best effort: one bad event must not abort the run.
            self.logger.error("Error enriching event: %s", e)
            return False
        self.logger.info("Successfully enriched event with Shodan data")
        return True

    def _enrich_event(self, event, shodan_data):
        """Add Shodan data to the event.

        Args:
            event: The event object to update.
            shodan_data: Dict decoded from the Shodan host response.

        Returns:
            True when the event was enriched and committed, False otherwise.
        """
        if not isinstance(shodan_data, dict):
            self.logger.error(
                "Error enriching event: %s",
                f"unexpected Shodan data type {type(shodan_data).__name__}")
            return False
        return self._apply_enrichment(
            event, self._build_enrichment(shodan_data))

    def _is_public_ip(self, ip):
        """Return True when *ip* parses as a globally routable address.

        Uses ``ipaddress``'s ``is_global`` property, so every address that
        property reports as non-global is rejected, not only the RFC 1918
        private ranges. Values that cannot be parsed as an IP address yield
        False.
        """
        try:
            return ipaddress.ip_address(ip).is_global
        except ValueError:
            self.logger.debug("Invalid IP address format: %s", ip)
            return False
# Register the analyzer class with the analysis manager when this module is
# imported.
manager.AnalysisManager.register_analyzer(ShodanEnrichmentAnalyzer)