From 65780be815a22df9734003e8d29fc0100556603e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20St=C3=B6ckl?= Date: Mon, 25 Aug 2025 19:43:40 +0000 Subject: [PATCH] whois_analyzer.py aktualisiert --- whois_analyzer.py | 460 +++++++++++++++++----------------------------- 1 file changed, 165 insertions(+), 295 deletions(-) diff --git a/whois_analyzer.py b/whois_analyzer.py index 9893c3a..826eeb7 100644 --- a/whois_analyzer.py +++ b/whois_analyzer.py @@ -1,9 +1,8 @@ -"""Index analyzer plugin for WHOIS data enrichment - Debug Version.""" +"""Index analyzer plugin for WHOIS data enrichment - API-Only Version.""" import ipaddress import logging import time -import os from typing import Dict, Optional, Set import requests @@ -12,426 +11,297 @@ from flask import current_app from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager -# Try to import whois library, with fallback handling -try: - import whois - HAS_WHOIS = True -except ImportError: - HAS_WHOIS = False - logger = logging.getLogger("timesketch.analyzers.whois_enrichment") class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): - """Analyzer to enrich IP addresses with WHOIS data.""" + """Analyzer to enrich IP addresses with WHOIS/ASN data using APIs only.""" NAME = 'whois_enrichment' - DISPLAY_NAME = 'WHOIS IP Enrichment' - DESCRIPTION = 'Enriches source IP addresses with WHOIS/ASN data' + DISPLAY_NAME = 'WHOIS IP Enrichment' + DESCRIPTION = 'Enriches IP addresses with ASN/WHOIS data via APIs' - # Common IP fields to check (same as GeoIP analyzer for consistency) + # IP fields to check (consistent with GeoIP analyzer) IP_FIELDS = [ - 'ip', - 'host_ip', - 'src_ip', - 'dst_ip', - 'source_ip', - 'dest_ip', - 'ip_address', - 'client_ip', - 'address', - 'saddr', - 'daddr' + 'ip', 'host_ip', 'src_ip', 'dst_ip', 'source_ip', 'dest_ip', + 'ip_address', 'client_ip', 'address', 'saddr', 'daddr' ] def __init__(self, index_name, sketch_id, timeline_id=None): super().__init__(index_name, sketch_id, timeline_id) - # Configuration options - self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 50) - self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 1.0) - self.max_retries = current_app.config.get('WHOIS_MAX_RETRIES', 2) - self.timeout = current_app.config.get('WHOIS_TIMEOUT', 30) + # Configuration + self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 25) # Reduced for API limits + self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 2.0) # Increased + self.timeout = current_app.config.get('WHOIS_TIMEOUT', 10) # Cache to avoid duplicate queries self.whois_cache: Dict[str, Optional[Dict]] = {} self.processed_ips: Set[str] = set() - # Debug counters - self.debug_stats = { - 'total_events': 0, - 'events_with_ips': 0, - 'valid_ips_found': 0, - 'invalid_ips_found': 0, - 'api_calls_made': 0, + # Stats + self.stats = { + 'events_processed': 0, + 'valid_ips_found': 0, + 'api_calls': 0, 'api_successes': 0, 'api_failures': 0, - 'whois_lib_available': HAS_WHOIS + 'cached_results': 0 } - - logger.info(f"WHOIS Analyzer initialized. python-whois available: {HAS_WHOIS}") def _validate_ip(self, ip_address: str) -> bool: - """Validate an IP address for analysis. - - Args: - ip_address: The IP address to validate - - Returns: - True if IP is valid and should be processed - """ + """Validate IP address - less restrictive for better coverage.""" try: - ip_str = ip_address.strip() - ip = ipaddress.ip_address(ip_str) + ip = ipaddress.ip_address(ip_address.strip()) - # DEBUG: Log all IPs being validated - logger.debug(f"Validating IP: {ip_str} - is_global: {ip.is_global}, is_private: {ip.is_private}") - - # Be less restrictive than just is_global - include more IPs for testing - if ip.is_private or ip.is_loopback or ip.is_multicast: - logger.debug(f"Skipping private/loopback/multicast IP: {ip_str}") + # Skip only obvious invalid cases + if ip.is_loopback or ip.is_multicast or ip.is_link_local: return False - - # Accept global IPs and also some reserved ranges that might have WHOIS data + + # Accept both private and public IPs (some private ranges have ASN data) return True - except (ValueError, AttributeError) as e: - logger.debug(f"Invalid IP address format: {ip_address} - {e}") + except (ValueError, AttributeError): return False - def _get_asn_data_via_api(self, ip_address: str) -> Optional[Dict]: - """Get ASN data using a free API service as fallback.""" + def _get_asn_data_via_ipapi(self, ip_address: str) -> Optional[Dict]: + """Get ASN data using ip-api.com (150 requests/minute free tier).""" try: - self.debug_stats['api_calls_made'] += 1 + self.stats['api_calls'] += 1 - # Using ip-api.com which has a free tier (150 requests per minute) - url = f"http://ip-api.com/json/{ip_address}?fields=status,message,as,asname,isp,org,country,regionName,city" + # Comprehensive field list for ip-api.com + fields = "status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,asname,mobile,proxy,hosting" + url = f"http://ip-api.com/json/{ip_address}?fields={fields}" - logger.debug(f"Making API call to: {url}") + logger.debug(f"API call: {url}") response = requests.get(url, timeout=self.timeout) - logger.debug(f"API response status: {response.status_code}") if response.status_code == 200: data = response.json() - logger.debug(f"API response data: {data}") if data.get('status') == 'success': - # Parse ASN number from 'as' field (format: "AS15169 Google LLC") + # Parse ASN from 'as' field (format: "AS15169 Google LLC") as_info = data.get('as', '') asn = None + asn_description = None + if as_info and as_info.startswith('AS'): - asn = as_info.split()[0][2:] # Remove 'AS' prefix + parts = as_info.split(' ', 1) + asn = parts[0][2:] # Remove 'AS' prefix + if len(parts) > 1: + asn_description = parts[1] result = { + 'source': 'ip-api.com', 'asn': asn, + 'asn_description': asn_description, 'asn_name': data.get('asname'), 'isp': data.get('isp'), 'organization': data.get('org'), 'country': data.get('country'), + 'country_code': data.get('countryCode'), 'region': data.get('regionName'), - 'city': data.get('city') + 'city': data.get('city'), + 'latitude': data.get('lat'), + 'longitude': data.get('lon'), + 'timezone': data.get('timezone'), + 'is_mobile': data.get('mobile'), + 'is_proxy': data.get('proxy'), + 'is_hosting': data.get('hosting') } - self.debug_stats['api_successes'] += 1 - logger.debug(f"API lookup successful for {ip_address}: {result}") + # Remove None values + result = {k: v for k, v in result.items() if v is not None} + + self.stats['api_successes'] += 1 + logger.info(f"✅ Successfully retrieved data for {ip_address}") return result else: - logger.debug(f"API returned failure status for {ip_address}: {data.get('message', 'Unknown error')}") - self.debug_stats['api_failures'] += 1 + logger.debug(f"❌ API returned error for {ip_address}: {data.get('message')}") + self.stats['api_failures'] += 1 else: - logger.warning(f"API request failed with status {response.status_code}") - self.debug_stats['api_failures'] += 1 + logger.warning(f"❌ HTTP {response.status_code} for {ip_address}") + self.stats['api_failures'] += 1 return None + except requests.exceptions.Timeout: + logger.warning(f"⏰ API timeout for {ip_address}") + self.stats['api_failures'] += 1 + return None except Exception as e: - logger.error(f"API lookup failed for {ip_address}: {e}") - self.debug_stats['api_failures'] += 1 - return None - - def _get_whois_data_python_whois(self, ip_address: str) -> Optional[Dict]: - """Get WHOIS data using python-whois library.""" - if not HAS_WHOIS: - logger.debug("python-whois library not available") - return None - - try: - logger.debug(f"Attempting python-whois lookup for {ip_address}") - w = whois.whois(ip_address) - - # Extract relevant information - data = {} - - # Network information - if hasattr(w, 'nets') and w.nets: - net = w.nets[0] if isinstance(w.nets, list) else w.nets - data['network_name'] = getattr(net, 'name', None) - data['network_range'] = getattr(net, 'range', None) - data['network_type'] = getattr(net, 'type', None) - - # ASN information - if hasattr(w, 'asn'): - data['asn'] = w.asn - if hasattr(w, 'asn_description'): - data['asn_description'] = w.asn_description - - # Organization information - if hasattr(w, 'org'): - data['organization'] = w.org - if hasattr(w, 'address'): - data['address'] = w.address - if hasattr(w, 'city'): - data['city'] = w.city - if hasattr(w, 'state'): - data['state'] = w.state - if hasattr(w, 'country'): - data['country'] = w.country - - # Registration dates - if hasattr(w, 'creation_date'): - data['creation_date'] = str(w.creation_date) - if hasattr(w, 'updated_date'): - data['updated_date'] = str(w.updated_date) - - if data: - logger.debug(f"python-whois lookup successful for {ip_address}: {data}") - else: - logger.debug(f"python-whois returned no data for {ip_address}") - - return data if data else None - - except Exception as e: - logger.debug(f"Python-whois lookup failed for {ip_address}: {e}") + logger.error(f"💥 API error for {ip_address}: {e}") + self.stats['api_failures'] += 1 return None def _get_whois_data(self, ip_address: str) -> Optional[Dict]: - """Get WHOIS data for an IP address using available methods.""" + """Get WHOIS data for IP address.""" if ip_address in self.whois_cache: - logger.debug(f"Using cached WHOIS data for {ip_address}") + self.stats['cached_results'] += 1 return self.whois_cache[ip_address] - whois_data = None + # Use API-only approach + whois_data = self._get_asn_data_via_ipapi(ip_address) - # Try python-whois first if available - if HAS_WHOIS: - whois_data = self._get_whois_data_python_whois(ip_address) - - # Fallback to API if python-whois failed or unavailable - if not whois_data: - whois_data = self._get_asn_data_via_api(ip_address) - - # Cache the result (even if None) + # Cache result (even if None to avoid repeated failed lookups) self.whois_cache[ip_address] = whois_data - - if whois_data: - logger.info(f"Successfully retrieved WHOIS data for {ip_address}") - else: - logger.debug(f"No WHOIS data found for {ip_address}") - return whois_data - def _enrich_event(self, event, ip_field: str, whois_data: Dict): - """Add WHOIS data to the event.""" + def _enrich_event(self, event, ip_field: str, ip_address: str, whois_data: Dict): + """Add WHOIS data to event.""" try: - # Create enrichment attributes with field-specific naming - enrichment = {'whois_checked': True} + enrichment = { + 'whois_checked': True, + f'{ip_field}_whois_source': whois_data.get('source', 'unknown') + } - # Add ASN information + # ASN information if whois_data.get('asn'): enrichment[f'{ip_field}_asn'] = whois_data['asn'] - if whois_data.get('asn_name') or whois_data.get('asn_description'): - asn_name = whois_data.get('asn_name') or whois_data.get('asn_description') - enrichment[f'{ip_field}_asn_name'] = asn_name - - # Add organization information + if whois_data.get('asn_name'): + enrichment[f'{ip_field}_asn_name'] = whois_data['asn_name'] + if whois_data.get('asn_description'): + enrichment[f'{ip_field}_asn_description'] = whois_data['asn_description'] + + # Organization info if whois_data.get('organization'): enrichment[f'{ip_field}_organization'] = whois_data['organization'] if whois_data.get('isp'): enrichment[f'{ip_field}_isp'] = whois_data['isp'] - - # Add network information - if whois_data.get('network_name'): - enrichment[f'{ip_field}_network_name'] = whois_data['network_name'] - if whois_data.get('network_range'): - enrichment[f'{ip_field}_network_range'] = whois_data['network_range'] - if whois_data.get('network_type'): - enrichment[f'{ip_field}_network_type'] = whois_data['network_type'] - - # Add location information (if not covered by GeoIP) + + # Location (prefix with 'whois' to avoid conflicts with GeoIP) if whois_data.get('country'): enrichment[f'{ip_field}_whois_country'] = whois_data['country'] + if whois_data.get('country_code'): + enrichment[f'{ip_field}_whois_country_code'] = whois_data['country_code'] if whois_data.get('region'): enrichment[f'{ip_field}_whois_region'] = whois_data['region'] if whois_data.get('city'): enrichment[f'{ip_field}_whois_city'] = whois_data['city'] - - # Add registration dates - if whois_data.get('creation_date'): - enrichment[f'{ip_field}_creation_date'] = whois_data['creation_date'] - if whois_data.get('updated_date'): - enrichment[f'{ip_field}_updated_date'] = whois_data['updated_date'] - logger.debug(f"Adding enrichment data: {enrichment}") + # Additional metadata + if whois_data.get('timezone'): + enrichment[f'{ip_field}_timezone'] = whois_data['timezone'] + if whois_data.get('is_mobile') is not None: + enrichment[f'{ip_field}_is_mobile'] = whois_data['is_mobile'] + if whois_data.get('is_proxy') is not None: + enrichment[f'{ip_field}_is_proxy'] = whois_data['is_proxy'] + if whois_data.get('is_hosting') is not None: + enrichment[f'{ip_field}_is_hosting'] = whois_data['is_hosting'] event.add_attributes(enrichment) event.add_tags(['whois-enriched']) event.commit() - logger.info(f"Successfully enriched event for {ip_field}") + logger.info(f"✅ Enriched event for {ip_address} ({ip_field})") except Exception as e: - logger.error(f"Error enriching event for {ip_field}: {e}") - # Still mark as checked to avoid reprocessing + logger.error(f"💥 Error enriching event for {ip_address}: {e}") + # Mark as checked to avoid retry loops try: event.add_attributes({'whois_checked': True, 'whois_error': str(e)}) event.commit() - except Exception as commit_error: - logger.error(f"Error marking event as checked: {commit_error}") + except Exception: + pass def run(self): """Main analyzer logic.""" - logger.info("Starting WHOIS enrichment analysis") - logger.info(f"Debug stats: {self.debug_stats}") + logger.info("🚀 Starting WHOIS enrichment analysis") - # Test a single known IP to verify API connectivity - test_ip = "8.8.8.8" - logger.info(f"Testing API connectivity with {test_ip}") - test_result = self._get_asn_data_via_api(test_ip) - if test_result: - logger.info(f"API test successful: {test_result}") + # Test API connectivity first + test_result = self._get_whois_data("8.8.8.8") + if not test_result: + return "❌ API connectivity test failed - check internet connection" else: - logger.warning("API test failed - this may indicate connectivity issues") + logger.info(f"✅ API connectivity test passed: {test_result}") - # Build query for events with IP fields that haven't been checked - ip_exists_queries = [f'_exists_:{field}' for field in self.IP_FIELDS] - query = f'({" OR ".join(ip_exists_queries)}) AND NOT _exists_:whois_checked' + # Query for events with IP fields, excluding already processed ones + ip_queries = [f'_exists_:{field}' for field in self.IP_FIELDS] + query = f'({" OR ".join(ip_queries)}) AND NOT _exists_:whois_checked' - logger.info(f"Query: {query}") + logger.info(f"📝 Query: {query}") events = self.event_stream( - query_string=query, + query_string=query, return_fields=self.IP_FIELDS + ['whois_checked'] ) - total_processed = 0 enriched_count = 0 try: - current_batch = [] - for event in events: - current_batch.append(event) - self.debug_stats['total_events'] += 1 + self.stats['events_processed'] += 1 - # Debug: Log event fields - ip_fields_present = [] - for field in self.IP_FIELDS: - value = event.source.get(field) - if value: - ip_fields_present.append(f"{field}={value}") + # Process IP fields in this event + enriched_this_event = False - if ip_fields_present: - logger.debug(f"Event {self.debug_stats['total_events']} has IP fields: {ip_fields_present}") - self.debug_stats['events_with_ips'] += 1 - else: - logger.debug(f"Event {self.debug_stats['total_events']} has no IP fields") - - if len(current_batch) >= self.batch_size: - processed, enriched = self._process_batch(current_batch) - total_processed += processed - enriched_count += enriched - current_batch = [] + for ip_field in self.IP_FIELDS: + ip_value = event.source.get(ip_field) + if not ip_value: + continue - # Rate limiting + # Handle string or list of IPs + ip_list = [ip_value] if isinstance(ip_value, str) else (ip_value if isinstance(ip_value, list) else []) + + for ip_addr in ip_list: + ip_str = str(ip_addr).strip() + + if not self._validate_ip(ip_str): + continue + + self.stats['valid_ips_found'] += 1 + + if ip_str not in self.processed_ips: + self.processed_ips.add(ip_str) + + whois_data = self._get_whois_data(ip_str) + + if whois_data: + self._enrich_event(event, ip_field, ip_str, whois_data) + enriched_count += 1 + enriched_this_event = True + break # One enrichment per event + + # Mark as checked even if no enrichment + if not enriched_this_event: + try: + event.add_attributes({'whois_checked': True, 'whois_no_data': True}) + event.commit() + except Exception as e: + logger.error(f"Error marking event: {e}") + + # Rate limiting + if self.stats['events_processed'] % self.batch_size == 0: + logger.info(f"📊 Progress: {self.stats}") if self.rate_limit_delay > 0: time.sleep(self.rate_limit_delay) - # Log progress - if total_processed % (self.batch_size * 5) == 0: - logger.info(f"Progress: {total_processed} processed, {enriched_count} enriched") - logger.info(f"Debug stats: {self.debug_stats}") - - # Process remaining events - if current_batch: - processed, enriched = self._process_batch(current_batch) - total_processed += processed - enriched_count += enriched - except Exception as e: - logger.error(f"Error during WHOIS processing: {e}", exc_info=True) - - # Create a view if we enriched any events + logger.error(f"💥 Error during processing: {e}", exc_info=True) + + # Create view if we have enriched events if enriched_count > 0: self.sketch.add_view( view_name="WHOIS Enriched Events", analyzer_name=self.NAME, query_string='tag:"whois-enriched"' ) - - # Final debug summary - logger.info(f"WHOIS enrichment complete:") - logger.info(f" - Total events processed: {total_processed}") - logger.info(f" - Events enriched: {enriched_count}") - logger.info(f" - Debug stats: {self.debug_stats}") - return f"Processed {total_processed} events, enriched {enriched_count} with WHOIS data. Debug stats: {self.debug_stats}" - - def _process_batch(self, events): - """Process a batch of events.""" - processed_count = 0 - enriched_count = 0 + # Final summary + success_rate = (self.stats['api_successes'] / max(1, self.stats['api_calls'])) * 100 - for event in events: - processed_count += 1 - event_enriched = False - - # Check each IP field in the event - for ip_field in self.IP_FIELDS: - ip_value = event.source.get(ip_field) - if not ip_value: - continue - - # Handle both single IP and list of IPs - if isinstance(ip_value, str): - ip_addresses = [ip_value] - else: - ip_addresses = ip_value if isinstance(ip_value, list) else [str(ip_value)] - - for ip_addr in ip_addresses: - if not self._validate_ip(ip_addr): - self.debug_stats['invalid_ips_found'] += 1 - continue - - self.debug_stats['valid_ips_found'] += 1 - - if ip_addr in self.processed_ips: - logger.debug(f"IP {ip_addr} already processed") - continue - - self.processed_ips.add(ip_addr) - - # Get WHOIS data - whois_data = self._get_whois_data(ip_addr) - - if whois_data: - self._enrich_event(event, ip_field, whois_data) - enriched_count += 1 - event_enriched = True - logger.info(f"Enriched {ip_addr} with WHOIS data") - break # Only enrich once per event - else: - logger.debug(f"No WHOIS data for {ip_addr}") - - # Mark event as checked even if no enrichment occurred - if not event_enriched: - try: - event.add_attributes({'whois_checked': True, 'whois_no_data': True}) - event.commit() - except Exception as e: - logger.error(f"Error marking event as checked: {e}") + summary = ( + f"📈 WHOIS analysis complete: " + f"{enriched_count} events enriched, " + f"{self.stats['valid_ips_found']} valid IPs found, " + f"API success rate: {success_rate:.1f}%" + ) - return processed_count, enriched_count + logger.info(summary) + logger.info(f"📊 Final stats: {self.stats}") + + return summary # Register the analyzer