"""Index analyzer plugin for WHOIS data enrichment - Production version.""" import ipaddress import logging import time from typing import Dict, Optional, Set import requests from flask import current_app from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager logger = logging.getLogger("timesketch.analyzers.whois_enrichment") class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): """Analyzer to enrich IP addresses with WHOIS/ASN data.""" NAME = 'whois_enrichment' DISPLAY_NAME = 'WHOIS IP Enrichment' DESCRIPTION = 'Enriches IP addresses with ASN/WHOIS data via APIs' IP_FIELDS = [ 'ip', 'host_ip', 'src_ip', 'dst_ip', 'source_ip', 'dest_ip', 'ip_address', 'client_ip', 'address', 'saddr', 'daddr' ] def __init__(self, index_name, sketch_id, timeline_id=None): super().__init__(index_name, sketch_id, timeline_id) self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 25) self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 2.0) self.timeout = current_app.config.get('WHOIS_TIMEOUT', 10) self.whois_cache: Dict[str, Optional[Dict]] = {} def _validate_ip(self, ip_address: str) -> bool: """Validate IP address.""" try: ip = ipaddress.ip_address(ip_address.strip()) return not (ip.is_loopback or ip.is_multicast or ip.is_link_local) except (ValueError, AttributeError): return False def _get_whois_data(self, ip_address: str) -> Optional[Dict]: """Get WHOIS data via API.""" if ip_address in self.whois_cache: return self.whois_cache[ip_address] try: fields = "status,country,countryCode,region,regionName,city,isp,org,as,asname,mobile,proxy,hosting" url = f"http://ip-api.com/json/{ip_address}?fields={fields}" response = requests.get(url, timeout=self.timeout) if response.status_code == 200: data = response.json() if data.get('status') == 'success': # Parse ASN from 'as' field as_info = data.get('as', '') asn = None asn_description = None if as_info and as_info.startswith('AS'): parts = as_info.split(' ', 1) asn = parts[0][2:] if len(parts) > 1: asn_description = parts[1] result = { 'asn': asn, 'asn_description': asn_description, 'asn_name': data.get('asname'), 'isp': data.get('isp'), 'organization': data.get('org'), 'country': data.get('country'), 'country_code': data.get('countryCode'), 'region': data.get('regionName'), 'city': data.get('city'), 'is_mobile': data.get('mobile'), 'is_proxy': data.get('proxy'), 'is_hosting': data.get('hosting') } # Remove None values result = {k: v for k, v in result.items() if v is not None} self.whois_cache[ip_address] = result return result self.whois_cache[ip_address] = None return None except Exception as e: logger.error(f"API error for {ip_address}: {e}") self.whois_cache[ip_address] = None return None def _enrich_event(self, event, ip_field: str, ip_address: str, whois_data: Dict): """Add WHOIS data to event.""" try: enrichment = {'whois_checked': True} # ASN information if whois_data.get('asn'): enrichment[f'{ip_field}_asn'] = whois_data['asn'] if whois_data.get('asn_name'): enrichment[f'{ip_field}_asn_name'] = whois_data['asn_name'] if whois_data.get('asn_description'): enrichment[f'{ip_field}_asn_description'] = whois_data['asn_description'] # Organization info if whois_data.get('organization'): enrichment[f'{ip_field}_organization'] = whois_data['organization'] if whois_data.get('isp'): enrichment[f'{ip_field}_isp'] = whois_data['isp'] # Location info if whois_data.get('country'): enrichment[f'{ip_field}_whois_country'] = whois_data['country'] if whois_data.get('country_code'): enrichment[f'{ip_field}_whois_country_code'] = whois_data['country_code'] if whois_data.get('region'): enrichment[f'{ip_field}_whois_region'] = whois_data['region'] if whois_data.get('city'): enrichment[f'{ip_field}_whois_city'] = whois_data['city'] # Additional flags if whois_data.get('is_mobile') is not None: enrichment[f'{ip_field}_is_mobile'] = whois_data['is_mobile'] if whois_data.get('is_proxy') is not None: enrichment[f'{ip_field}_is_proxy'] = whois_data['is_proxy'] if whois_data.get('is_hosting') is not None: enrichment[f'{ip_field}_is_hosting'] = whois_data['is_hosting'] event.add_attributes(enrichment) event.add_tags(['whois-enriched']) event.commit() except Exception as e: logger.error(f"Error enriching event for {ip_address}: {e}") try: event.add_attributes({'whois_checked': True, 'whois_error': str(e)}) event.commit() except Exception: pass def run(self): """Main analyzer logic.""" logger.info("Starting WHOIS enrichment analysis") # Query ALL events with IP fields, ignoring previous processing ip_queries = [f'_exists_:{field}' for field in self.IP_FIELDS] query = f'({" OR ".join(ip_queries)})' events = self.event_stream( query_string=query, return_fields=self.IP_FIELDS ) events_processed = 0 enriched_count = 0 try: for event in events: events_processed += 1 # Find first valid IP in this event and enrich it for ip_field in self.IP_FIELDS: ip_value = event.source.get(ip_field) if not ip_value: continue # Handle string or list of IPs ip_list = [ip_value] if isinstance(ip_value, str) else (ip_value if isinstance(ip_value, list) else []) for ip_addr in ip_list: ip_str = str(ip_addr).strip() if not self._validate_ip(ip_str): continue # Process EVERY IP, no duplicate checking whois_data = self._get_whois_data(ip_str) # Uses cache to avoid duplicate API calls if whois_data: self._enrich_event(event, ip_field, ip_str, whois_data) enriched_count += 1 break # Only enrich first valid IP per event # Rate limiting if events_processed % self.batch_size == 0: if self.rate_limit_delay > 0: time.sleep(self.rate_limit_delay) if events_processed % (self.batch_size * 10) == 0: logger.info(f"Progress: {events_processed} processed, {enriched_count} enriched") except Exception as e: logger.error(f"Error during processing: {e}") # Create view if we have enriched events if enriched_count > 0: self.sketch.add_view( view_name="WHOIS Enriched Events", analyzer_name=self.NAME, query_string='tag:"whois-enriched"' ) logger.info(f"WHOIS analysis complete: {enriched_count}/{events_processed} events enriched") return f"Processed {events_processed} events, enriched {enriched_count} with WHOIS data" # Register the analyzer manager.AnalysisManager.register_analyzer(WhoisEnrichmentAnalyzer)