whois_analyzer.py updated

Mario Stöckl 2025-08-25 19:43:40 +00:00
parent 9e338f7923
commit 65780be815


@@ -1,9 +1,8 @@
-"""Index analyzer plugin for WHOIS data enrichment - Debug Version."""
+"""Index analyzer plugin for WHOIS data enrichment - API-Only Version."""
 import ipaddress
 import logging
 import time
-import os
 from typing import Dict, Optional, Set
 import requests
@@ -12,426 +11,297 @@ from flask import current_app
 from timesketch.lib.analyzers import interface
 from timesketch.lib.analyzers import manager
 
-# Try to import whois library, with fallback handling
-try:
-    import whois
-    HAS_WHOIS = True
-except ImportError:
-    HAS_WHOIS = False
-
 logger = logging.getLogger("timesketch.analyzers.whois_enrichment")
 class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
-    """Analyzer to enrich IP addresses with WHOIS data."""
+    """Analyzer to enrich IP addresses with WHOIS/ASN data using APIs only."""
 
     NAME = 'whois_enrichment'
     DISPLAY_NAME = 'WHOIS IP Enrichment'
-    DESCRIPTION = 'Enriches source IP addresses with WHOIS/ASN data'
+    DESCRIPTION = 'Enriches IP addresses with ASN/WHOIS data via APIs'
 
-    # Common IP fields to check (same as GeoIP analyzer for consistency)
+    # IP fields to check (consistent with GeoIP analyzer)
     IP_FIELDS = [
-        'ip',
-        'host_ip',
-        'src_ip',
-        'dst_ip',
-        'source_ip',
-        'dest_ip',
-        'ip_address',
-        'client_ip',
-        'address',
-        'saddr',
-        'daddr'
+        'ip', 'host_ip', 'src_ip', 'dst_ip', 'source_ip', 'dest_ip',
+        'ip_address', 'client_ip', 'address', 'saddr', 'daddr'
     ]
     def __init__(self, index_name, sketch_id, timeline_id=None):
         super().__init__(index_name, sketch_id, timeline_id)
 
-        # Configuration options
-        self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 50)
-        self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 1.0)
-        self.max_retries = current_app.config.get('WHOIS_MAX_RETRIES', 2)
-        self.timeout = current_app.config.get('WHOIS_TIMEOUT', 30)
+        # Configuration
+        self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 25)  # Reduced for API limits
+        self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 2.0)  # Increased
+        self.timeout = current_app.config.get('WHOIS_TIMEOUT', 10)
 
         # Cache to avoid duplicate queries
         self.whois_cache: Dict[str, Optional[Dict]] = {}
         self.processed_ips: Set[str] = set()
 
-        # Debug counters
-        self.debug_stats = {
-            'total_events': 0,
-            'events_with_ips': 0,
-            'valid_ips_found': 0,
-            'invalid_ips_found': 0,
-            'api_calls_made': 0,
-            'api_successes': 0,
-            'api_failures': 0,
-            'whois_lib_available': HAS_WHOIS
-        }
-        logger.info(f"WHOIS Analyzer initialized. python-whois available: {HAS_WHOIS}")
+        # Stats
+        self.stats = {
+            'events_processed': 0,
+            'valid_ips_found': 0,
+            'api_calls': 0,
+            'api_successes': 0,
+            'api_failures': 0,
+            'cached_results': 0
+        }
     def _validate_ip(self, ip_address: str) -> bool:
-        """Validate an IP address for analysis.
-
-        Args:
-            ip_address: The IP address to validate
-
-        Returns:
-            True if IP is valid and should be processed
-        """
+        """Validate IP address - less restrictive for better coverage."""
         try:
-            ip_str = ip_address.strip()
-            ip = ipaddress.ip_address(ip_str)
-            # DEBUG: Log all IPs being validated
-            logger.debug(f"Validating IP: {ip_str} - is_global: {ip.is_global}, is_private: {ip.is_private}")
-            # Be less restrictive than just is_global - include more IPs for testing
-            if ip.is_private or ip.is_loopback or ip.is_multicast:
-                logger.debug(f"Skipping private/loopback/multicast IP: {ip_str}")
+            ip = ipaddress.ip_address(ip_address.strip())
+            # Skip only obvious invalid cases
+            if ip.is_loopback or ip.is_multicast or ip.is_link_local:
                 return False
-            # Accept global IPs and also some reserved ranges that might have WHOIS data
+            # Accept both private and public IPs (some private ranges have ASN data)
             return True
-        except (ValueError, AttributeError) as e:
-            logger.debug(f"Invalid IP address format: {ip_address} - {e}")
+        except (ValueError, AttributeError):
             return False
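Note: the relaxed validation now rejects only loopback, multicast, and link-local addresses; unlike the previous version, private RFC 1918 addresses pass. A quick check of the ipaddress predicates the method relies on (interpreter session for illustration):

    >>> import ipaddress
    >>> ip = ipaddress.ip_address("10.0.0.5")
    >>> ip.is_private, ip.is_loopback, ip.is_multicast, ip.is_link_local
    (True, False, False, False)    # private address -> accepted now
    >>> ipaddress.ip_address("127.0.0.1").is_loopback
    True                           # loopback -> still rejected

Be aware that ip-api.com typically answers queries for private ranges with a failure status, so accepting them mostly costs an API call and a cached None.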
-    def _get_asn_data_via_api(self, ip_address: str) -> Optional[Dict]:
-        """Get ASN data using a free API service as fallback."""
+    def _get_asn_data_via_ipapi(self, ip_address: str) -> Optional[Dict]:
+        """Get ASN data using ip-api.com (150 requests/minute free tier)."""
         try:
-            self.debug_stats['api_calls_made'] += 1
-            # Using ip-api.com which has a free tier (150 requests per minute)
-            url = f"http://ip-api.com/json/{ip_address}?fields=status,message,as,asname,isp,org,country,regionName,city"
-            logger.debug(f"Making API call to: {url}")
+            self.stats['api_calls'] += 1
+            # Comprehensive field list for ip-api.com
+            fields = "status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,asname,mobile,proxy,hosting"
+            url = f"http://ip-api.com/json/{ip_address}?fields={fields}"
+            logger.debug(f"API call: {url}")
             response = requests.get(url, timeout=self.timeout)
-            logger.debug(f"API response status: {response.status_code}")
             if response.status_code == 200:
                 data = response.json()
-                logger.debug(f"API response data: {data}")
                 if data.get('status') == 'success':
-                    # Parse ASN number from 'as' field (format: "AS15169 Google LLC")
+                    # Parse ASN from 'as' field (format: "AS15169 Google LLC")
                     as_info = data.get('as', '')
                     asn = None
+                    asn_description = None
                     if as_info and as_info.startswith('AS'):
-                        asn = as_info.split()[0][2:]  # Remove 'AS' prefix
+                        parts = as_info.split(' ', 1)
+                        asn = parts[0][2:]  # Remove 'AS' prefix
+                        if len(parts) > 1:
+                            asn_description = parts[1]
                     result = {
+                        'source': 'ip-api.com',
                         'asn': asn,
+                        'asn_description': asn_description,
                         'asn_name': data.get('asname'),
                         'isp': data.get('isp'),
                         'organization': data.get('org'),
                         'country': data.get('country'),
+                        'country_code': data.get('countryCode'),
                         'region': data.get('regionName'),
-                        'city': data.get('city')
+                        'city': data.get('city'),
+                        'latitude': data.get('lat'),
+                        'longitude': data.get('lon'),
+                        'timezone': data.get('timezone'),
+                        'is_mobile': data.get('mobile'),
+                        'is_proxy': data.get('proxy'),
+                        'is_hosting': data.get('hosting')
                     }
-                    self.debug_stats['api_successes'] += 1
-                    logger.debug(f"API lookup successful for {ip_address}: {result}")
+                    # Remove None values
+                    result = {k: v for k, v in result.items() if v is not None}
+                    self.stats['api_successes'] += 1
+                    logger.info(f"✅ Successfully retrieved data for {ip_address}")
                     return result
                 else:
-                    logger.debug(f"API returned failure status for {ip_address}: {data.get('message', 'Unknown error')}")
-                    self.debug_stats['api_failures'] += 1
+                    logger.debug(f"❌ API returned error for {ip_address}: {data.get('message')}")
+                    self.stats['api_failures'] += 1
             else:
-                logger.warning(f"API request failed with status {response.status_code}")
-                self.debug_stats['api_failures'] += 1
+                logger.warning(f"❌ HTTP {response.status_code} for {ip_address}")
+                self.stats['api_failures'] += 1
             return None
+        except requests.exceptions.Timeout:
+            logger.warning(f"⏰ API timeout for {ip_address}")
+            self.stats['api_failures'] += 1
+            return None
         except Exception as e:
-            logger.error(f"API lookup failed for {ip_address}: {e}")
-            self.debug_stats['api_failures'] += 1
+            logger.error(f"💥 API error for {ip_address}: {e}")
+            self.stats['api_failures'] += 1
             return None
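Note: the same lookup can be reproduced outside the analyzer; a minimal sketch against the ip-api.com JSON endpoint (the response values shown are illustrative):

    import requests

    fields = "status,message,as,asname,isp,org,country"
    resp = requests.get(f"http://ip-api.com/json/8.8.8.8?fields={fields}", timeout=10)
    print(resp.json())
    # Expected shape, roughly:
    # {'status': 'success', 'country': 'United States', 'isp': 'Google LLC',
    #  'org': 'Google Public DNS', 'as': 'AS15169 Google LLC', 'asname': 'GOOGLE'}

The free tier is served over plain HTTP and is rate limited (the exact per-minute quota has changed over time), which is what the increased rate_limit_delay compensates for.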
-    def _get_whois_data_python_whois(self, ip_address: str) -> Optional[Dict]:
-        """Get WHOIS data using python-whois library."""
-        if not HAS_WHOIS:
-            logger.debug("python-whois library not available")
-            return None
-        try:
-            logger.debug(f"Attempting python-whois lookup for {ip_address}")
-            w = whois.whois(ip_address)
-            # Extract relevant information
-            data = {}
-            # Network information
-            if hasattr(w, 'nets') and w.nets:
-                net = w.nets[0] if isinstance(w.nets, list) else w.nets
-                data['network_name'] = getattr(net, 'name', None)
-                data['network_range'] = getattr(net, 'range', None)
-                data['network_type'] = getattr(net, 'type', None)
-            # ASN information
-            if hasattr(w, 'asn'):
-                data['asn'] = w.asn
-            if hasattr(w, 'asn_description'):
-                data['asn_description'] = w.asn_description
-            # Organization information
-            if hasattr(w, 'org'):
-                data['organization'] = w.org
-            if hasattr(w, 'address'):
-                data['address'] = w.address
-            if hasattr(w, 'city'):
-                data['city'] = w.city
-            if hasattr(w, 'state'):
-                data['state'] = w.state
-            if hasattr(w, 'country'):
-                data['country'] = w.country
-            # Registration dates
-            if hasattr(w, 'creation_date'):
-                data['creation_date'] = str(w.creation_date)
-            if hasattr(w, 'updated_date'):
-                data['updated_date'] = str(w.updated_date)
-            if data:
-                logger.debug(f"python-whois lookup successful for {ip_address}: {data}")
-            else:
-                logger.debug(f"python-whois returned no data for {ip_address}")
-            return data if data else None
-        except Exception as e:
-            logger.debug(f"Python-whois lookup failed for {ip_address}: {e}")
-            return None
     def _get_whois_data(self, ip_address: str) -> Optional[Dict]:
-        """Get WHOIS data for an IP address using available methods."""
+        """Get WHOIS data for IP address."""
         if ip_address in self.whois_cache:
-            logger.debug(f"Using cached WHOIS data for {ip_address}")
+            self.stats['cached_results'] += 1
             return self.whois_cache[ip_address]
-        whois_data = None
-        # Try python-whois first if available
-        if HAS_WHOIS:
-            whois_data = self._get_whois_data_python_whois(ip_address)
-        # Fallback to API if python-whois failed or unavailable
-        if not whois_data:
-            whois_data = self._get_asn_data_via_api(ip_address)
-        # Cache the result (even if None)
+        # Use API-only approach
+        whois_data = self._get_asn_data_via_ipapi(ip_address)
+        # Cache result (even if None to avoid repeated failed lookups)
         self.whois_cache[ip_address] = whois_data
-        if whois_data:
-            logger.info(f"Successfully retrieved WHOIS data for {ip_address}")
-        else:
-            logger.debug(f"No WHOIS data found for {ip_address}")
         return whois_data
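Note: caching None for failed lookups is deliberate; a later event carrying the same unresolvable IP is answered from whois_cache instead of spending another request against the quota. Illustratively (analyzer is a hypothetical instance, and 203.0.113.7 is a documentation-range IP used here as a stand-in):

    analyzer._get_whois_data("203.0.113.7")  # API miss, None is cached
    analyzer._get_whois_data("203.0.113.7")  # served from whois_cache, no API call,
                                             # stats['cached_results'] incremented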
-    def _enrich_event(self, event, ip_field: str, whois_data: Dict):
-        """Add WHOIS data to the event."""
+    def _enrich_event(self, event, ip_field: str, ip_address: str, whois_data: Dict):
+        """Add WHOIS data to event."""
         try:
-            # Create enrichment attributes with field-specific naming
-            enrichment = {'whois_checked': True}
+            enrichment = {
+                'whois_checked': True,
+                f'{ip_field}_whois_source': whois_data.get('source', 'unknown')
+            }
-            # Add ASN information
+            # ASN information
             if whois_data.get('asn'):
                 enrichment[f'{ip_field}_asn'] = whois_data['asn']
-            if whois_data.get('asn_name') or whois_data.get('asn_description'):
-                asn_name = whois_data.get('asn_name') or whois_data.get('asn_description')
-                enrichment[f'{ip_field}_asn_name'] = asn_name
+            if whois_data.get('asn_name'):
+                enrichment[f'{ip_field}_asn_name'] = whois_data['asn_name']
+            if whois_data.get('asn_description'):
+                enrichment[f'{ip_field}_asn_description'] = whois_data['asn_description']
-            # Add organization information
+            # Organization info
             if whois_data.get('organization'):
                 enrichment[f'{ip_field}_organization'] = whois_data['organization']
             if whois_data.get('isp'):
                 enrichment[f'{ip_field}_isp'] = whois_data['isp']
-            # Add network information
-            if whois_data.get('network_name'):
-                enrichment[f'{ip_field}_network_name'] = whois_data['network_name']
-            if whois_data.get('network_range'):
-                enrichment[f'{ip_field}_network_range'] = whois_data['network_range']
-            if whois_data.get('network_type'):
-                enrichment[f'{ip_field}_network_type'] = whois_data['network_type']
-            # Add location information (if not covered by GeoIP)
+            # Location (prefix with 'whois' to avoid conflicts with GeoIP)
             if whois_data.get('country'):
                 enrichment[f'{ip_field}_whois_country'] = whois_data['country']
+            if whois_data.get('country_code'):
+                enrichment[f'{ip_field}_whois_country_code'] = whois_data['country_code']
             if whois_data.get('region'):
                 enrichment[f'{ip_field}_whois_region'] = whois_data['region']
             if whois_data.get('city'):
                 enrichment[f'{ip_field}_whois_city'] = whois_data['city']
-            # Add registration dates
-            if whois_data.get('creation_date'):
-                enrichment[f'{ip_field}_creation_date'] = whois_data['creation_date']
-            if whois_data.get('updated_date'):
-                enrichment[f'{ip_field}_updated_date'] = whois_data['updated_date']
-            logger.debug(f"Adding enrichment data: {enrichment}")
+            # Additional metadata
+            if whois_data.get('timezone'):
+                enrichment[f'{ip_field}_timezone'] = whois_data['timezone']
+            if whois_data.get('is_mobile') is not None:
+                enrichment[f'{ip_field}_is_mobile'] = whois_data['is_mobile']
+            if whois_data.get('is_proxy') is not None:
+                enrichment[f'{ip_field}_is_proxy'] = whois_data['is_proxy']
+            if whois_data.get('is_hosting') is not None:
+                enrichment[f'{ip_field}_is_hosting'] = whois_data['is_hosting']
             event.add_attributes(enrichment)
             event.add_tags(['whois-enriched'])
             event.commit()
-            logger.info(f"Successfully enriched event for {ip_field}")
+            logger.info(f"✅ Enriched event for {ip_address} ({ip_field})")
         except Exception as e:
-            logger.error(f"Error enriching event for {ip_field}: {e}")
-            # Still mark as checked to avoid reprocessing
+            logger.error(f"💥 Error enriching event for {ip_address}: {e}")
+            # Mark as checked to avoid retry loops
             try:
                 event.add_attributes({'whois_checked': True, 'whois_error': str(e)})
                 event.commit()
-            except Exception as commit_error:
-                logger.error(f"Error marking event as checked: {commit_error}")
+            except Exception:
+                pass
     def run(self):
         """Main analyzer logic."""
-        logger.info("Starting WHOIS enrichment analysis")
-        logger.info(f"Debug stats: {self.debug_stats}")
+        logger.info("🚀 Starting WHOIS enrichment analysis")
 
-        # Test a single known IP to verify API connectivity
-        test_ip = "8.8.8.8"
-        logger.info(f"Testing API connectivity with {test_ip}")
-        test_result = self._get_asn_data_via_api(test_ip)
-        if test_result:
-            logger.info(f"API test successful: {test_result}")
+        # Test API connectivity first
+        test_result = self._get_whois_data("8.8.8.8")
+        if not test_result:
+            return "❌ API connectivity test failed - check internet connection"
         else:
-            logger.warning("API test failed - this may indicate connectivity issues")
+            logger.info(f"✅ API connectivity test passed: {test_result}")
 
-        # Build query for events with IP fields that haven't been checked
-        ip_exists_queries = [f'_exists_:{field}' for field in self.IP_FIELDS]
-        query = f'({" OR ".join(ip_exists_queries)}) AND NOT _exists_:whois_checked'
-        logger.info(f"Query: {query}")
+        # Query for events with IP fields, excluding already processed ones
+        ip_queries = [f'_exists_:{field}' for field in self.IP_FIELDS]
+        query = f'({" OR ".join(ip_queries)}) AND NOT _exists_:whois_checked'
+        logger.info(f"📝 Query: {query}")
         events = self.event_stream(
             query_string=query,
             return_fields=self.IP_FIELDS + ['whois_checked']
         )
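Note: with the default IP_FIELDS list, the generated query string expands to (abbreviated here):

    (_exists_:ip OR _exists_:host_ip OR _exists_:src_ip OR ... OR _exists_:daddr) AND NOT _exists_:whois_checked

The NOT _exists_:whois_checked clause is what makes reruns incremental: every event the analyzer touches gets whois_checked set, whether or not data was found.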
-        total_processed = 0
         enriched_count = 0
         try:
-            current_batch = []
             for event in events:
-                current_batch.append(event)
-                self.debug_stats['total_events'] += 1
-                # Debug: Log event fields
-                ip_fields_present = []
-                for field in self.IP_FIELDS:
-                    value = event.source.get(field)
-                    if value:
-                        ip_fields_present.append(f"{field}={value}")
-                if ip_fields_present:
-                    logger.debug(f"Event {self.debug_stats['total_events']} has IP fields: {ip_fields_present}")
-                    self.debug_stats['events_with_ips'] += 1
-                else:
-                    logger.debug(f"Event {self.debug_stats['total_events']} has no IP fields")
-                if len(current_batch) >= self.batch_size:
-                    processed, enriched = self._process_batch(current_batch)
-                    total_processed += processed
-                    enriched_count += enriched
-                    current_batch = []
-                    # Rate limiting
-                    if self.rate_limit_delay > 0:
-                        time.sleep(self.rate_limit_delay)
-                    # Log progress
-                    if total_processed % (self.batch_size * 5) == 0:
-                        logger.info(f"Progress: {total_processed} processed, {enriched_count} enriched")
-                        logger.info(f"Debug stats: {self.debug_stats}")
-            # Process remaining events
-            if current_batch:
-                processed, enriched = self._process_batch(current_batch)
-                total_processed += processed
-                enriched_count += enriched
+                self.stats['events_processed'] += 1
+                # Process IP fields in this event
+                enriched_this_event = False
+                for ip_field in self.IP_FIELDS:
+                    ip_value = event.source.get(ip_field)
+                    if not ip_value:
+                        continue
+                    # Handle string or list of IPs
+                    ip_list = [ip_value] if isinstance(ip_value, str) else (ip_value if isinstance(ip_value, list) else [])
+                    for ip_addr in ip_list:
+                        ip_str = str(ip_addr).strip()
+                        if not self._validate_ip(ip_str):
+                            continue
+                        self.stats['valid_ips_found'] += 1
+                        if ip_str not in self.processed_ips:
+                            self.processed_ips.add(ip_str)
+                        whois_data = self._get_whois_data(ip_str)
+                        if whois_data:
+                            self._enrich_event(event, ip_field, ip_str, whois_data)
+                            enriched_count += 1
+                            enriched_this_event = True
+                            break  # One enrichment per event
+                # Mark as checked even if no enrichment
+                if not enriched_this_event:
+                    try:
+                        event.add_attributes({'whois_checked': True, 'whois_no_data': True})
+                        event.commit()
+                    except Exception as e:
+                        logger.error(f"Error marking event: {e}")
+                # Rate limiting
+                if self.stats['events_processed'] % self.batch_size == 0:
+                    logger.info(f"📊 Progress: {self.stats}")
+                if self.rate_limit_delay > 0:
+                    time.sleep(self.rate_limit_delay)
         except Exception as e:
-            logger.error(f"Error during WHOIS processing: {e}", exc_info=True)
+            logger.error(f"💥 Error during processing: {e}", exc_info=True)
 
-        # Create a view if we enriched any events
+        # Create view if we have enriched events
         if enriched_count > 0:
             self.sketch.add_view(
                 view_name="WHOIS Enriched Events",
                 analyzer_name=self.NAME,
                 query_string='tag:"whois-enriched"'
             )
 
-        # Final debug summary
-        logger.info(f"WHOIS enrichment complete:")
-        logger.info(f"  - Total events processed: {total_processed}")
-        logger.info(f"  - Events enriched: {enriched_count}")
-        logger.info(f"  - Debug stats: {self.debug_stats}")
return f"Processed {total_processed} events, enriched {enriched_count} with WHOIS data. Debug stats: {self.debug_stats}" # Final summary
success_rate = (self.stats['api_successes'] / max(1, self.stats['api_calls'])) * 100
def _process_batch(self, events):
"""Process a batch of events."""
processed_count = 0
enriched_count = 0
for event in events: summary = (
processed_count += 1 f"📈 WHOIS analysis complete: "
event_enriched = False f"{enriched_count} events enriched, "
f"{self.stats['valid_ips_found']} valid IPs found, "
# Check each IP field in the event f"API success rate: {success_rate:.1f}%"
for ip_field in self.IP_FIELDS: )
ip_value = event.source.get(ip_field)
if not ip_value:
continue
# Handle both single IP and list of IPs
if isinstance(ip_value, str):
ip_addresses = [ip_value]
else:
ip_addresses = ip_value if isinstance(ip_value, list) else [str(ip_value)]
for ip_addr in ip_addresses:
if not self._validate_ip(ip_addr):
self.debug_stats['invalid_ips_found'] += 1
continue
self.debug_stats['valid_ips_found'] += 1
if ip_addr in self.processed_ips:
logger.debug(f"IP {ip_addr} already processed")
continue
self.processed_ips.add(ip_addr)
# Get WHOIS data
whois_data = self._get_whois_data(ip_addr)
if whois_data:
self._enrich_event(event, ip_field, whois_data)
enriched_count += 1
event_enriched = True
logger.info(f"Enriched {ip_addr} with WHOIS data")
break # Only enrich once per event
else:
logger.debug(f"No WHOIS data for {ip_addr}")
# Mark event as checked even if no enrichment occurred
if not event_enriched:
try:
event.add_attributes({'whois_checked': True, 'whois_no_data': True})
event.commit()
except Exception as e:
logger.error(f"Error marking event as checked: {e}")
return processed_count, enriched_count logger.info(summary)
logger.info(f"📊 Final stats: {self.stats}")
return summary
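Note: a run that enriched events therefore returns a one-line summary of the form (illustrative numbers):

    📈 WHOIS analysis complete: 42 events enriched, 57 valid IPs found, API success rate: 89.5%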
 # Register the analyzer