"""Index analyzer plugin for WHOIS data enrichment - Debug Version.""" import ipaddress import logging import time import os from typing import Dict, Optional, Set import requests from flask import current_app from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager # Try to import whois library, with fallback handling try: import whois HAS_WHOIS = True except ImportError: HAS_WHOIS = False logger = logging.getLogger("timesketch.analyzers.whois_enrichment") class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): """Analyzer to enrich IP addresses with WHOIS data.""" NAME = 'whois_enrichment' DISPLAY_NAME = 'WHOIS IP Enrichment' DESCRIPTION = 'Enriches source IP addresses with WHOIS/ASN data' # Common IP fields to check (same as GeoIP analyzer for consistency) IP_FIELDS = [ 'ip', 'host_ip', 'src_ip', 'dst_ip', 'source_ip', 'dest_ip', 'ip_address', 'client_ip', 'address', 'saddr', 'daddr' ] def __init__(self, index_name, sketch_id, timeline_id=None): super().__init__(index_name, sketch_id, timeline_id) # Configuration options self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 50) self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 1.0) self.max_retries = current_app.config.get('WHOIS_MAX_RETRIES', 2) self.timeout = current_app.config.get('WHOIS_TIMEOUT', 30) # Cache to avoid duplicate queries self.whois_cache: Dict[str, Optional[Dict]] = {} self.processed_ips: Set[str] = set() # Debug counters self.debug_stats = { 'total_events': 0, 'events_with_ips': 0, 'valid_ips_found': 0, 'invalid_ips_found': 0, 'api_calls_made': 0, 'api_successes': 0, 'api_failures': 0, 'whois_lib_available': HAS_WHOIS } logger.info(f"WHOIS Analyzer initialized. python-whois available: {HAS_WHOIS}") def _validate_ip(self, ip_address: str) -> bool: """Validate an IP address for analysis. Args: ip_address: The IP address to validate Returns: True if IP is valid and should be processed """ try: ip_str = ip_address.strip() ip = ipaddress.ip_address(ip_str) # DEBUG: Log all IPs being validated logger.debug(f"Validating IP: {ip_str} - is_global: {ip.is_global}, is_private: {ip.is_private}") # Be less restrictive than just is_global - include more IPs for testing if ip.is_private or ip.is_loopback or ip.is_multicast: logger.debug(f"Skipping private/loopback/multicast IP: {ip_str}") return False # Accept global IPs and also some reserved ranges that might have WHOIS data return True except (ValueError, AttributeError) as e: logger.debug(f"Invalid IP address format: {ip_address} - {e}") return False def _get_asn_data_via_api(self, ip_address: str) -> Optional[Dict]: """Get ASN data using a free API service as fallback.""" try: self.debug_stats['api_calls_made'] += 1 # Using ip-api.com which has a free tier (150 requests per minute) url = f"http://ip-api.com/json/{ip_address}?fields=status,message,as,asname,isp,org,country,regionName,city" logger.debug(f"Making API call to: {url}") response = requests.get(url, timeout=self.timeout) logger.debug(f"API response status: {response.status_code}") if response.status_code == 200: data = response.json() logger.debug(f"API response data: {data}") if data.get('status') == 'success': # Parse ASN number from 'as' field (format: "AS15169 Google LLC") as_info = data.get('as', '') asn = None if as_info and as_info.startswith('AS'): asn = as_info.split()[0][2:] # Remove 'AS' prefix result = { 'asn': asn, 'asn_name': data.get('asname'), 'isp': data.get('isp'), 'organization': data.get('org'), 'country': data.get('country'), 'region': data.get('regionName'), 'city': data.get('city') } self.debug_stats['api_successes'] += 1 logger.debug(f"API lookup successful for {ip_address}: {result}") return result else: logger.debug(f"API returned failure status for {ip_address}: {data.get('message', 'Unknown error')}") self.debug_stats['api_failures'] += 1 else: logger.warning(f"API request failed with status {response.status_code}") self.debug_stats['api_failures'] += 1 return None except Exception as e: logger.error(f"API lookup failed for {ip_address}: {e}") self.debug_stats['api_failures'] += 1 return None def _get_whois_data_python_whois(self, ip_address: str) -> Optional[Dict]: """Get WHOIS data using python-whois library.""" if not HAS_WHOIS: logger.debug("python-whois library not available") return None try: logger.debug(f"Attempting python-whois lookup for {ip_address}") w = whois.whois(ip_address) # Extract relevant information data = {} # Network information if hasattr(w, 'nets') and w.nets: net = w.nets[0] if isinstance(w.nets, list) else w.nets data['network_name'] = getattr(net, 'name', None) data['network_range'] = getattr(net, 'range', None) data['network_type'] = getattr(net, 'type', None) # ASN information if hasattr(w, 'asn'): data['asn'] = w.asn if hasattr(w, 'asn_description'): data['asn_description'] = w.asn_description # Organization information if hasattr(w, 'org'): data['organization'] = w.org if hasattr(w, 'address'): data['address'] = w.address if hasattr(w, 'city'): data['city'] = w.city if hasattr(w, 'state'): data['state'] = w.state if hasattr(w, 'country'): data['country'] = w.country # Registration dates if hasattr(w, 'creation_date'): data['creation_date'] = str(w.creation_date) if hasattr(w, 'updated_date'): data['updated_date'] = str(w.updated_date) if data: logger.debug(f"python-whois lookup successful for {ip_address}: {data}") else: logger.debug(f"python-whois returned no data for {ip_address}") return data if data else None except Exception as e: logger.debug(f"Python-whois lookup failed for {ip_address}: {e}") return None def _get_whois_data(self, ip_address: str) -> Optional[Dict]: """Get WHOIS data for an IP address using available methods.""" if ip_address in self.whois_cache: logger.debug(f"Using cached WHOIS data for {ip_address}") return self.whois_cache[ip_address] whois_data = None # Try python-whois first if available if HAS_WHOIS: whois_data = self._get_whois_data_python_whois(ip_address) # Fallback to API if python-whois failed or unavailable if not whois_data: whois_data = self._get_asn_data_via_api(ip_address) # Cache the result (even if None) self.whois_cache[ip_address] = whois_data if whois_data: logger.info(f"Successfully retrieved WHOIS data for {ip_address}") else: logger.debug(f"No WHOIS data found for {ip_address}") return whois_data def _enrich_event(self, event, ip_field: str, whois_data: Dict): """Add WHOIS data to the event.""" try: # Create enrichment attributes with field-specific naming enrichment = {'whois_checked': True} # Add ASN information if whois_data.get('asn'): enrichment[f'{ip_field}_asn'] = whois_data['asn'] if whois_data.get('asn_name') or whois_data.get('asn_description'): asn_name = whois_data.get('asn_name') or whois_data.get('asn_description') enrichment[f'{ip_field}_asn_name'] = asn_name # Add organization information if whois_data.get('organization'): enrichment[f'{ip_field}_organization'] = whois_data['organization'] if whois_data.get('isp'): enrichment[f'{ip_field}_isp'] = whois_data['isp'] # Add network information if whois_data.get('network_name'): enrichment[f'{ip_field}_network_name'] = whois_data['network_name'] if whois_data.get('network_range'): enrichment[f'{ip_field}_network_range'] = whois_data['network_range'] if whois_data.get('network_type'): enrichment[f'{ip_field}_network_type'] = whois_data['network_type'] # Add location information (if not covered by GeoIP) if whois_data.get('country'): enrichment[f'{ip_field}_whois_country'] = whois_data['country'] if whois_data.get('region'): enrichment[f'{ip_field}_whois_region'] = whois_data['region'] if whois_data.get('city'): enrichment[f'{ip_field}_whois_city'] = whois_data['city'] # Add registration dates if whois_data.get('creation_date'): enrichment[f'{ip_field}_creation_date'] = whois_data['creation_date'] if whois_data.get('updated_date'): enrichment[f'{ip_field}_updated_date'] = whois_data['updated_date'] logger.debug(f"Adding enrichment data: {enrichment}") event.add_attributes(enrichment) event.add_tags(['whois-enriched']) event.commit() logger.info(f"Successfully enriched event for {ip_field}") except Exception as e: logger.error(f"Error enriching event for {ip_field}: {e}") # Still mark as checked to avoid reprocessing try: event.add_attributes({'whois_checked': True, 'whois_error': str(e)}) event.commit() except Exception as commit_error: logger.error(f"Error marking event as checked: {commit_error}") def run(self): """Main analyzer logic.""" logger.info("Starting WHOIS enrichment analysis") logger.info(f"Debug stats: {self.debug_stats}") # Test a single known IP to verify API connectivity test_ip = "8.8.8.8" logger.info(f"Testing API connectivity with {test_ip}") test_result = self._get_asn_data_via_api(test_ip) if test_result: logger.info(f"API test successful: {test_result}") else: logger.warning("API test failed - this may indicate connectivity issues") # Build query for events with IP fields that haven't been checked ip_exists_queries = [f'_exists_:{field}' for field in self.IP_FIELDS] query = f'({" OR ".join(ip_exists_queries)}) AND NOT _exists_:whois_checked' logger.info(f"Query: {query}") events = self.event_stream( query_string=query, return_fields=self.IP_FIELDS + ['whois_checked'] ) total_processed = 0 enriched_count = 0 try: current_batch = [] for event in events: current_batch.append(event) self.debug_stats['total_events'] += 1 # Debug: Log event fields ip_fields_present = [] for field in self.IP_FIELDS: value = event.source.get(field) if value: ip_fields_present.append(f"{field}={value}") if ip_fields_present: logger.debug(f"Event {self.debug_stats['total_events']} has IP fields: {ip_fields_present}") self.debug_stats['events_with_ips'] += 1 else: logger.debug(f"Event {self.debug_stats['total_events']} has no IP fields") if len(current_batch) >= self.batch_size: processed, enriched = self._process_batch(current_batch) total_processed += processed enriched_count += enriched current_batch = [] # Rate limiting if self.rate_limit_delay > 0: time.sleep(self.rate_limit_delay) # Log progress if total_processed % (self.batch_size * 5) == 0: logger.info(f"Progress: {total_processed} processed, {enriched_count} enriched") logger.info(f"Debug stats: {self.debug_stats}") # Process remaining events if current_batch: processed, enriched = self._process_batch(current_batch) total_processed += processed enriched_count += enriched except Exception as e: logger.error(f"Error during WHOIS processing: {e}", exc_info=True) # Create a view if we enriched any events if enriched_count > 0: self.sketch.add_view( view_name="WHOIS Enriched Events", analyzer_name=self.NAME, query_string='tag:"whois-enriched"' ) # Final debug summary logger.info(f"WHOIS enrichment complete:") logger.info(f" - Total events processed: {total_processed}") logger.info(f" - Events enriched: {enriched_count}") logger.info(f" - Debug stats: {self.debug_stats}") return f"Processed {total_processed} events, enriched {enriched_count} with WHOIS data. Debug stats: {self.debug_stats}" def _process_batch(self, events): """Process a batch of events.""" processed_count = 0 enriched_count = 0 for event in events: processed_count += 1 event_enriched = False # Check each IP field in the event for ip_field in self.IP_FIELDS: ip_value = event.source.get(ip_field) if not ip_value: continue # Handle both single IP and list of IPs if isinstance(ip_value, str): ip_addresses = [ip_value] else: ip_addresses = ip_value if isinstance(ip_value, list) else [str(ip_value)] for ip_addr in ip_addresses: if not self._validate_ip(ip_addr): self.debug_stats['invalid_ips_found'] += 1 continue self.debug_stats['valid_ips_found'] += 1 if ip_addr in self.processed_ips: logger.debug(f"IP {ip_addr} already processed") continue self.processed_ips.add(ip_addr) # Get WHOIS data whois_data = self._get_whois_data(ip_addr) if whois_data: self._enrich_event(event, ip_field, whois_data) enriched_count += 1 event_enriched = True logger.info(f"Enriched {ip_addr} with WHOIS data") break # Only enrich once per event else: logger.debug(f"No WHOIS data for {ip_addr}") # Mark event as checked even if no enrichment occurred if not event_enriched: try: event.add_attributes({'whois_checked': True, 'whois_no_data': True}) event.commit() except Exception as e: logger.error(f"Error marking event as checked: {e}") return processed_count, enriched_count # Register the analyzer manager.AnalysisManager.register_analyzer(WhoisEnrichmentAnalyzer)