shodan_analyzer.py aktualisiert

This commit is contained in:
Mario Stöckl 2025-08-25 12:49:48 +00:00
parent 6a22fa9c1f
commit b309a7c7d8

View File

@ -5,6 +5,7 @@ import json
from datetime import datetime from datetime import datetime
import ipaddress import ipaddress
import os import os
import logging
class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer): class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
"""Analyzer to enrich IP addresses with Shodan data.""" """Analyzer to enrich IP addresses with Shodan data."""
@ -16,6 +17,9 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
def __init__(self, index_name, sketch_id, timeline_id=None): def __init__(self, index_name, sketch_id, timeline_id=None):
super().__init__(index_name, sketch_id, timeline_id) super().__init__(index_name, sketch_id, timeline_id)
# Set up logging
self.logger = logging.getLogger(self.__class__.__name__)
# Get API key from environment variables # Get API key from environment variables
self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '') self.shodan_api_key = os.environ.get('SHODAN_API_KEY', '')
self.max_time_diff_hours = 24 self.max_time_diff_hours = 24
@ -48,7 +52,7 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
timestamp = event.source.get('timestamp') timestamp = event.source.get('timestamp')
if source_ip and self._is_public_ip(source_ip): if source_ip and self._is_public_ip(source_ip):
self.logger.info(f"Processing IP: {source_ip}") print(f"Processing IP: {source_ip}") # Use print for now
shodan_data = self._get_shodan_data(source_ip) shodan_data = self._get_shodan_data(source_ip)
if shodan_data: if shodan_data:
self._enrich_event(event, shodan_data) self._enrich_event(event, shodan_data)
@ -69,21 +73,21 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
'history': 'true' 'history': 'true'
} }
self.logger.info(f"Querying Shodan API for IP: {ip}") print(f"Querying Shodan API for IP: {ip}") # Use print for now
response = requests.get(url, params=params, timeout=10) response = requests.get(url, params=params, timeout=10)
if response.status_code == 200: if response.status_code == 200:
self.logger.info(f"Successfully retrieved Shodan data for {ip}") print(f"Successfully retrieved Shodan data for {ip}")
return response.json() return response.json()
elif response.status_code == 404: elif response.status_code == 404:
self.logger.debug(f'No Shodan data found for {ip}') print(f'No Shodan data found for {ip}')
return None return None
else: else:
self.logger.warning(f'Shodan API error for {ip}: {response.status_code} - {response.text}') print(f'Shodan API error for {ip}: {response.status_code} - {response.text}')
return None return None
except Exception as e: except Exception as e:
self.logger.warning(f'Error fetching Shodan data for {ip}: {e}') print(f'Error fetching Shodan data for {ip}: {e}')
return None return None
def _enrich_event(self, event, shodan_data): def _enrich_event(self, event, shodan_data):
@ -117,10 +121,10 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
event.add_tags(['shodan-enriched']) event.add_tags(['shodan-enriched'])
event.commit() event.commit()
self.logger.info(f"Successfully enriched event with Shodan data") print(f"Successfully enriched event with Shodan data")
except Exception as e: except Exception as e:
self.logger.error(f"Error enriching event: {e}") print(f"Error enriching event: {e}")
def _is_public_ip(self, ip): def _is_public_ip(self, ip):
"""Check if IP is public (not RFC1918 private ranges).""" """Check if IP is public (not RFC1918 private ranges)."""
@ -128,7 +132,7 @@ class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
ip_obj = ipaddress.ip_address(ip) ip_obj = ipaddress.ip_address(ip)
return ip_obj.is_global return ip_obj.is_global
except (ValueError, ipaddress.AddressValueError): except (ValueError, ipaddress.AddressValueError):
self.logger.debug(f"Invalid IP address format: {ip}") print(f"Invalid IP address format: {ip}")
return False return False
# Register the analyzer # Register the analyzer