# Provenance: reconstructed from patch 7c34d76f3a4884dba33ebe1cd036fddff9dd991f
# (author Mario Stöckl, Mon, 25 Aug 2025 13:15:20 +0000), whose subject reads
# "added whois_analyzer.py" and which creates whois_analyzer.py as a new file.
"""Index analyzer plugin for WHOIS data enrichment."""

import ipaddress
import logging
import time
import os
from typing import Dict, Optional, Set

import requests
from flask import current_app

from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager

# Try to import whois library, with fallback handling
try:
    import whois
    HAS_WHOIS = True
except ImportError:
    HAS_WHOIS = False

logger = logging.getLogger("timesketch.analyzers.whois_enrichment")


class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
    """Analyzer to enrich IP addresses with WHOIS data.

    For every event that carries one of ``IP_FIELDS`` and has no
    ``whois_checked`` attribute yet, the first global (public) IP address
    found is looked up and the result is written back to the event as
    ``<field>_...`` attributes.  Every processed event is marked with
    ``whois_checked`` so that it is not selected again.
    """

    NAME = 'whois_enrichment'
    DISPLAY_NAME = 'WHOIS IP Enrichment'
    DESCRIPTION = 'Enriches source IP addresses with WHOIS/ASN data'

    # Common IP fields to check (same as GeoIP analyzer for consistency)
    IP_FIELDS = [
        'ip',
        'host_ip',
        'src_ip',
        'dst_ip',
        'source_ip',
        'dest_ip',
        'ip_address',
        'client_ip',
        'address',
        'saddr',
        'daddr'
    ]

    # Fields requested from ip-api.com.  The service returns only the fields
    # that are listed, so 'status' (and 'message', its error text) must be
    # requested for the success check in _get_asn_data_via_api to be able to
    # pass.
    API_FIELDS = 'status,message,as,asname,isp,org,country,regionName,city'

    # (key in the lookup result, suffix of the event attribute) pairs that
    # are copied one-to-one by _enrich_event, in output order.
    _ORG_ATTRIBUTES = (
        ('organization', 'organization'),
        ('isp', 'isp'),
        ('network_name', 'network_name'),
        ('network_range', 'network_range'),
        ('network_type', 'network_type'),
        ('country', 'whois_country'),
        ('region', 'whois_region'),
        ('city', 'whois_city'),
        ('creation_date', 'creation_date'),
        ('updated_date', 'updated_date'),
    )

    def __init__(self, index_name, sketch_id, timeline_id=None):
        """Initialize the analyzer and read its settings from the app config.

        Args:
            index_name: Passed through to the base analyzer.
            sketch_id: Passed through to the base analyzer.
            timeline_id: Passed through to the base analyzer.
        """
        super().__init__(index_name, sketch_id, timeline_id)

        # Configuration options
        config = current_app.config
        self.batch_size = config.get('WHOIS_BATCH_SIZE', 50)
        self.rate_limit_delay = config.get('WHOIS_RATE_LIMIT_DELAY', 1.0)
        # NOTE: max_retries is read here but not used anywhere in this file.
        self.max_retries = config.get('WHOIS_MAX_RETRIES', 2)
        self.timeout = config.get('WHOIS_TIMEOUT', 30)

        # Cache to avoid duplicate queries (failed lookups are cached as None)
        self.whois_cache: Dict[str, Optional[Dict]] = {}
        # Every IP address that was looked up by this instance.
        self.processed_ips: Set[str] = set()

    def _validate_ip(self, ip_address: str) -> bool:
        """Validate an IP address for analysis (same logic as GeoIP analyzer).

        Args:
            ip_address: The IP address to validate

        Returns:
            True if IP is valid and global (public)
        """
        try:
            return ipaddress.ip_address(ip_address.strip()).is_global
        except (ValueError, AttributeError):
            # ValueError: not an IP address; AttributeError: not a string.
            return False

    def _get_asn_data_via_api(self, ip_address: str) -> Optional[Dict]:
        """Get ASN data using a free API service as fallback.

        Args:
            ip_address: IP address to lookup

        Returns:
            Dictionary with ASN data or None
        """
        # Using ip-api.com which has a free tier
        # Alternative: ipinfo.io, whoisapi.org, etc.
        url = f"http://ip-api.com/json/{ip_address}?fields={self.API_FIELDS}"

        try:
            response = requests.get(url, timeout=self.timeout)
            if response.status_code != 200:
                return None
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # Network failure or a body that is not valid JSON.
            logger.debug(f"API lookup failed for {ip_address}: {e}")
            return None

        if not isinstance(data, dict) or data.get('status') != 'success':
            return None

        # Parse ASN number from 'as' field (format: "AS15169 Google LLC")
        as_info = data.get('as') or ''
        asn = None
        if as_info.startswith('AS'):
            asn = as_info.split()[0][2:]  # Remove 'AS' prefix

        return {
            'asn': asn,
            'asn_name': data.get('asname'),
            'isp': data.get('isp'),
            'organization': data.get('org'),
            'country': data.get('country'),
            'region': data.get('regionName'),
            'city': data.get('city')
        }

    def _get_whois_data_python_whois(self, ip_address: str) -> Optional[Dict]:
        """Get WHOIS data using python-whois library.

        Only attributes that are present and not None are returned.  A result
        without any usable value is reported as None so that the caller can
        fall back to the API lookup.

        Args:
            ip_address: IP address to lookup

        Returns:
            Dictionary with WHOIS data or None
        """
        if not HAS_WHOIS:
            return None

        try:
            w = whois.whois(ip_address)

            data = {}

            # Network information (first network entry only)
            nets = getattr(w, 'nets', None)
            if nets:
                net = nets[0] if isinstance(nets, list) else nets
                data['network_name'] = getattr(net, 'name', None)
                data['network_range'] = getattr(net, 'range', None)
                data['network_type'] = getattr(net, 'type', None)

            # ASN, organization and address information
            for key, attribute in (
                ('asn', 'asn'),
                ('asn_description', 'asn_description'),
                ('organization', 'org'),
                ('address', 'address'),
                ('city', 'city'),
                ('state', 'state'),
                ('country', 'country'),
            ):
                data[key] = getattr(w, attribute, None)

            # Registration dates.  Stringify only real values: str(None)
            # would produce the truthy text 'None'.
            for key in ('creation_date', 'updated_date'):
                value = getattr(w, key, None)
                data[key] = None if value is None else str(value)

            data = {key: value for key, value in data.items()
                    if value is not None}
            return data or None

        except Exception as e:
            # Best effort: the library may raise arbitrary errors for input
            # it cannot resolve; any failure means "no data from this source".
            logger.debug(f"Python-whois lookup failed for {ip_address}: {e}")
            return None

    def _get_whois_data(self, ip_address: str) -> Optional[Dict]:
        """Get WHOIS data for an IP address using available methods.

        Args:
            ip_address: IP address to lookup

        Returns:
            Dictionary with WHOIS data or None
        """
        if ip_address in self.whois_cache:
            return self.whois_cache[ip_address]

        # Try python-whois first if available
        whois_data = None
        if HAS_WHOIS:
            whois_data = self._get_whois_data_python_whois(ip_address)

        # Fallback to API if python-whois failed or unavailable
        if not whois_data:
            whois_data = self._get_asn_data_via_api(ip_address)

        # Cache the result (even if None)
        self.whois_cache[ip_address] = whois_data
        return whois_data

    def _enrich_event(self, event, ip_field: str, whois_data: Dict):
        """Add WHOIS data to the event.

        Args:
            event: The event object to enrich
            ip_field: The field name containing the IP address
            whois_data: Dictionary with WHOIS data

        Returns:
            True if the enrichment was committed, False if it failed (the
            event is then still marked as checked, together with the error).
        """
        try:
            # Create enrichment attributes with field-specific naming
            enrichment = {'whois_checked': True}

            # ASN information
            if whois_data.get('asn'):
                enrichment[f'{ip_field}_asn'] = whois_data['asn']
            asn_name = (whois_data.get('asn_name')
                        or whois_data.get('asn_description'))
            if asn_name:
                enrichment[f'{ip_field}_asn_name'] = asn_name

            # Organization, network, location and registration information
            for source_key, suffix in self._ORG_ATTRIBUTES:
                value = whois_data.get(source_key)
                if value:
                    enrichment[f'{ip_field}_{suffix}'] = value

            event.add_attributes(enrichment)
            event.add_tags(['whois-enriched'])
            event.commit()
            return True

        except Exception as e:
            logger.error(f"Error enriching event for {ip_field}: {e}")
            # Still mark as checked to avoid reprocessing
            try:
                event.add_attributes(
                    {'whois_checked': True, 'whois_error': str(e)})
                event.commit()
            except Exception as commit_error:
                logger.error(
                    f"Error marking event as checked: {commit_error}")
            return False

    def run(self):
        """Main analyzer logic.

        Returns:
            A summary string with the number of processed and enriched events.
        """
        logger.info("Starting WHOIS enrichment analysis")

        # Build query for events with IP fields that haven't been checked
        ip_exists_queries = [f'_exists_:{field}' for field in self.IP_FIELDS]
        query = (f'({" OR ".join(ip_exists_queries)}) '
                 f'AND NOT _exists_:whois_checked')

        events = self.event_stream(
            query_string=query,
            return_fields=self.IP_FIELDS + ['whois_checked']
        )

        total_processed = 0
        enriched_count = 0

        try:
            current_batch = []

            for event in events:
                current_batch.append(event)
                if len(current_batch) < self.batch_size:
                    continue

                processed, enriched = self._process_batch(current_batch)
                total_processed += processed
                enriched_count += enriched
                current_batch = []

                # Rate limiting
                if self.rate_limit_delay > 0:
                    time.sleep(self.rate_limit_delay)

                # Log progress
                if total_processed % (self.batch_size * 5) == 0:
                    logger.info(f"Progress: {total_processed} processed, {enriched_count} enriched")

            # Process remaining events
            if current_batch:
                processed, enriched = self._process_batch(current_batch)
                total_processed += processed
                enriched_count += enriched

        except Exception as e:
            # Top-level boundary: report what was achieved so far instead of
            # failing the whole analyzer run.
            logger.error(f"Error during WHOIS processing: {e}")

        # Create a view if we enriched any events
        if enriched_count > 0:
            self.sketch.add_view(
                view_name="WHOIS Enriched Events",
                analyzer_name=self.NAME,
                query_string='tag:"whois-enriched"'
            )

        logger.info(f"WHOIS enrichment complete: {total_processed} processed, {enriched_count} enriched")
        return f"Processed {total_processed} events, enriched {enriched_count} with WHOIS data"

    def _first_public_ip(self, event):
        """Find the first valid global IP address of an event.

        The fields are scanned in ``IP_FIELDS`` order; a field may hold a
        single value or a list of values.

        Args:
            event: Event whose ``source`` mapping is inspected.

        Returns:
            Tuple of (field_name, ip_address) with surrounding whitespace
            removed from the address, or None if the event has no usable IP.
        """
        for ip_field in self.IP_FIELDS:
            ip_value = event.source.get(ip_field)
            if not ip_value:
                continue

            # Handle both single IP and list of IPs
            if isinstance(ip_value, list):
                candidates = ip_value
            elif isinstance(ip_value, str):
                candidates = [ip_value]
            else:
                candidates = [str(ip_value)]

            for candidate in candidates:
                if self._validate_ip(candidate):
                    # _validate_ip accepted the stripped form, so look up
                    # (and cache) exactly that form.
                    return ip_field, candidate.strip()

        return None

    def _process_batch(self, events):
        """Process a batch of events.

        Each event is enriched from its first valid public IP only, to avoid
        duplicate enrichment.  Every event ends up marked ``whois_checked``,
        including events without a usable IP, so none is selected again.
        Repeated IPs are served from the lookup cache.

        Args:
            events: List of events to process

        Returns:
            Tuple of (processed_count, enriched_count)
        """
        processed_count = 0
        enriched_count = 0

        for event in events:
            processed_count += 1

            target = self._first_public_ip(event)
            if target is None:
                # No valid IPs found, still mark as checked
                event.add_attributes({'whois_checked': True})
                event.commit()
                continue

            ip_field, ip_addr = target
            self.processed_ips.add(ip_addr)

            # Get WHOIS data (cached per IP address)
            whois_data = self._get_whois_data(ip_addr)

            if not whois_data:
                # Mark as checked even if no data found
                event.add_attributes(
                    {'whois_checked': True, 'whois_no_data': True})
                event.commit()
                logger.debug(f"No WHOIS data for {ip_addr}")
            elif self._enrich_event(event, ip_field, whois_data):
                enriched_count += 1
                logger.debug(f"Enriched {ip_addr} with WHOIS data")

        return processed_count, enriched_count


# Register the analyzer
manager.AnalysisManager.register_analyzer(WhoisEnrichmentAnalyzer)