shodan_analyzer.py aktualisiert
This commit is contained in:
parent
8efb506fc4
commit
cf15c9f200
@ -1,9 +1,9 @@
|
||||
from timesketch.lib.analyzers import interface
|
||||
from timesketch.lib.analyzers import manager
|
||||
import requests
|
||||
import json
|
||||
import ipaddress
|
||||
import os
|
||||
import time
|
||||
|
||||
class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
|
||||
"""Analyzer to enrich IP addresses with Shodan data."""
|
||||
@ -15,14 +15,17 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
|
||||
def __init__(self, index_name, sketch_id, timeline_id=None):
|
||||
super().__init__(index_name, sketch_id, timeline_id)
|
||||
self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '')
|
||||
self.processed_ips = set() # Track processed IPs to avoid duplicates
|
||||
|
||||
def run(self):
|
||||
"""Main analyzer logic."""
|
||||
if not self.shodan_api_key:
|
||||
return "Shodan API key not configured"
|
||||
|
||||
# Query for events with source_ip that haven't been processed
|
||||
# Process in small batches to avoid timeout
|
||||
batch_size = 50 # Process 50 events at a time
|
||||
total_processed = 0
|
||||
enriched_count = 0
|
||||
|
||||
query = {
|
||||
'query': {
|
||||
'bool': {
|
||||
@ -30,45 +33,69 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
|
||||
{'exists': {'field': 'source_ip'}}
|
||||
],
|
||||
'must_not': [
|
||||
{'exists': {'field': 'shodan_org'}} # Skip if already enriched
|
||||
{'exists': {'field': 'shodan_checked'}}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
events = self.event_stream(query_dsl=query, return_fields=['source_ip'])
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
try:
|
||||
print(f"🚀 Starting Shodan enrichment in batches of {batch_size}")
|
||||
|
||||
for event in events:
|
||||
source_ip = event.source.get('source_ip')
|
||||
# Process events in smaller chunks
|
||||
events = self.event_stream(query_dsl=query, return_fields=['source_ip'])
|
||||
processed_ips = set()
|
||||
batch_count = 0
|
||||
|
||||
if source_ip:
|
||||
# Skip if we've already processed this IP in this run
|
||||
if source_ip in self.processed_ips:
|
||||
skipped_count += 1
|
||||
for event in events:
|
||||
source_ip = event.source.get('source_ip')
|
||||
|
||||
if not source_ip:
|
||||
continue
|
||||
|
||||
if self._is_public_ip(source_ip):
|
||||
print(f"Processing new IP: {source_ip}")
|
||||
self.processed_ips.add(source_ip)
|
||||
if not self._is_public_ip(source_ip):
|
||||
# Mark private IPs as checked to skip them next time
|
||||
event.add_attributes({'shodan_checked': True, 'shodan_private_ip': True})
|
||||
event.commit()
|
||||
continue
|
||||
|
||||
shodan_data = self._get_shodan_data(source_ip)
|
||||
if shodan_data:
|
||||
self._enrich_event(event, shodan_data)
|
||||
processed_count += 1
|
||||
print(f"✅ Enriched {source_ip} with Shodan data")
|
||||
else:
|
||||
# Still mark as processed even if no data found
|
||||
self._mark_as_processed(event)
|
||||
# Skip if already processed in this batch
|
||||
if source_ip in processed_ips:
|
||||
event.add_attributes({'shodan_checked': True})
|
||||
event.commit()
|
||||
continue
|
||||
|
||||
# Rate limiting
|
||||
import time
|
||||
time.sleep(1)
|
||||
processed_ips.add(source_ip)
|
||||
print(f"🔍 Processing IP ({batch_count + 1}/{batch_size}): {source_ip}")
|
||||
|
||||
# Get Shodan data
|
||||
shodan_data = self._get_shodan_data(source_ip)
|
||||
|
||||
if shodan_data:
|
||||
self._enrich_event(event, shodan_data)
|
||||
enriched_count += 1
|
||||
print(f"✅ Enriched {source_ip} with Shodan data")
|
||||
else:
|
||||
print(f"Skipping private IP: {source_ip}")
|
||||
# Mark as checked even if no data found
|
||||
event.add_attributes({'shodan_checked': True, 'shodan_no_data': True})
|
||||
event.commit()
|
||||
print(f"❌ No Shodan data for {source_ip}")
|
||||
|
||||
return f"Processed {processed_count} IPs, skipped {skipped_count} duplicates"
|
||||
total_processed += 1
|
||||
batch_count += 1
|
||||
|
||||
# Rate limit and batch control
|
||||
if batch_count >= batch_size:
|
||||
print(f"📊 Completed batch: processed {total_processed} events, enriched {enriched_count}")
|
||||
break
|
||||
|
||||
# Rate limiting between API calls
|
||||
time.sleep(1)
|
||||
|
||||
except Exception as e:
|
||||
print(f"💥 Error during processing: {e}")
|
||||
|
||||
return f"Processed {total_processed} events, enriched {enriched_count} with Shodan data"
|
||||
|
||||
def _get_shodan_data(self, ip):
|
||||
"""Fetch Shodan data for IP."""
|
||||
@ -76,26 +103,24 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
|
||||
url = f"https://api.shodan.io/shodan/host/{ip}"
|
||||
params = {'key': self.shodan_api_key}
|
||||
|
||||
print(f"🔍 Querying Shodan for: {ip}")
|
||||
response = requests.get(url, params=params, timeout=10)
|
||||
|
||||
if response.status_code == 200:
|
||||
print(f"📊 Found Shodan data for {ip}")
|
||||
return response.json()
|
||||
elif response.status_code == 404:
|
||||
print(f"❌ No Shodan data for {ip}")
|
||||
return None
|
||||
return None # No data found
|
||||
else:
|
||||
print(f"⚠️ Shodan API error for {ip}: {response.status_code}")
|
||||
print(f"⚠️ Shodan API error for {ip}: HTTP {response.status_code}")
|
||||
return None
|
||||
|
||||
except Exception as e:
|
||||
print(f"💥 Error fetching Shodan data for {ip}: {e}")
|
||||
print(f"💥 Request error for {ip}: {e}")
|
||||
return None
|
||||
|
||||
def _enrich_event(self, event, shodan_data):
|
||||
"""Add Shodan data to the event."""
|
||||
try:
|
||||
# Core enrichment data
|
||||
enrichment = {
|
||||
'shodan_org': shodan_data.get('org', ''),
|
||||
'shodan_isp': shodan_data.get('isp', ''),
|
||||
@ -104,37 +129,50 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
|
||||
'shodan_ports': shodan_data.get('ports', []),
|
||||
'shodan_hostnames': shodan_data.get('hostnames', []),
|
||||
'shodan_last_update': shodan_data.get('last_update', ''),
|
||||
'shodan_checked': True
|
||||
}
|
||||
|
||||
# Add top services
|
||||
# Add service information (top 3 services)
|
||||
if shodan_data.get('data'):
|
||||
services = []
|
||||
for service in shodan_data.get('data', [])[:3]: # Top 3 services
|
||||
for service in shodan_data.get('data', [])[:3]:
|
||||
port = service.get('port', 'Unknown')
|
||||
product = service.get('product', 'Unknown')
|
||||
services.append(f"{port}/{product}")
|
||||
product = service.get('product', '')
|
||||
version = service.get('version', '')
|
||||
|
||||
service_str = str(port)
|
||||
if product:
|
||||
service_str += f"/{product}"
|
||||
if version:
|
||||
service_str += f" {version}"
|
||||
|
||||
services.append(service_str)
|
||||
|
||||
enrichment['shodan_services'] = services
|
||||
|
||||
# Add vulnerability information if available
|
||||
if shodan_data.get('vulns'):
|
||||
enrichment['shodan_vulns'] = list(shodan_data.get('vulns', []))[:5] # Top 5 vulns
|
||||
|
||||
event.add_attributes(enrichment)
|
||||
event.add_tags(['shodan-enriched'])
|
||||
event.commit()
|
||||
|
||||
except Exception as e:
|
||||
print(f"💥 Error enriching event: {e}")
|
||||
|
||||
def _mark_as_processed(self, event):
|
||||
"""Mark event as processed even if no Shodan data found."""
|
||||
try:
|
||||
event.add_attributes({'shodan_checked': True})
|
||||
event.commit()
|
||||
except Exception:
|
||||
pass
|
||||
print(f"💥 Error enriching event for {event.source.get('source_ip', 'unknown')}: {e}")
|
||||
# Still mark as checked to avoid reprocessing
|
||||
try:
|
||||
event.add_attributes({'shodan_checked': True, 'shodan_error': str(e)})
|
||||
event.commit()
|
||||
except:
|
||||
pass
|
||||
|
||||
def _is_public_ip(self, ip):
|
||||
"""Check if IP is public."""
|
||||
"""Check if IP is public (not private/reserved)."""
|
||||
try:
|
||||
ip_obj = ipaddress.ip_address(ip)
|
||||
return ip_obj.is_global
|
||||
# Check if IP is global (public) and not in reserved ranges
|
||||
return ip_obj.is_global and not ip_obj.is_reserved
|
||||
except (ValueError, ipaddress.AddressValueError):
|
||||
return False
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user