From bcbe7a8bd730de881825ca54f7a01fad564313ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20St=C3=B6ckl?= Date: Mon, 25 Aug 2025 19:51:03 +0000 Subject: [PATCH] whois_analyzer.py aktualisiert --- whois_analyzer.py | 168 +++++++++++----------------------------------- 1 file changed, 40 insertions(+), 128 deletions(-) diff --git a/whois_analyzer.py b/whois_analyzer.py index 826eeb7..8b7850b 100644 --- a/whois_analyzer.py +++ b/whois_analyzer.py @@ -1,4 +1,4 @@ -"""Index analyzer plugin for WHOIS data enrichment - API-Only Version.""" +"""Index analyzer plugin for WHOIS data enrichment - Production version.""" import ipaddress import logging @@ -15,13 +15,12 @@ logger = logging.getLogger("timesketch.analyzers.whois_enrichment") class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): - """Analyzer to enrich IP addresses with WHOIS/ASN data using APIs only.""" + """Analyzer to enrich IP addresses with WHOIS/ASN data.""" NAME = 'whois_enrichment' DISPLAY_NAME = 'WHOIS IP Enrichment' DESCRIPTION = 'Enriches IP addresses with ASN/WHOIS data via APIs' - # IP fields to check (consistent with GeoIP analyzer) IP_FIELDS = [ 'ip', 'host_ip', 'src_ip', 'dst_ip', 'source_ip', 'dest_ip', 'ip_address', 'client_ip', 'address', 'saddr', 'daddr' @@ -30,131 +29,80 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): def __init__(self, index_name, sketch_id, timeline_id=None): super().__init__(index_name, sketch_id, timeline_id) - # Configuration - self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 25) # Reduced for API limits - self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 2.0) # Increased + self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 25) + self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 2.0) self.timeout = current_app.config.get('WHOIS_TIMEOUT', 10) - # Cache to avoid duplicate queries self.whois_cache: Dict[str, Optional[Dict]] = {} self.processed_ips: Set[str] = set() - - # Stats - self.stats = { - 'events_processed': 0, - 'valid_ips_found': 0, - 'api_calls': 0, - 'api_successes': 0, - 'api_failures': 0, - 'cached_results': 0 - } def _validate_ip(self, ip_address: str) -> bool: - """Validate IP address - less restrictive for better coverage.""" + """Validate IP address.""" try: ip = ipaddress.ip_address(ip_address.strip()) - - # Skip only obvious invalid cases - if ip.is_loopback or ip.is_multicast or ip.is_link_local: - return False - - # Accept both private and public IPs (some private ranges have ASN data) - return True - + return not (ip.is_loopback or ip.is_multicast or ip.is_link_local) except (ValueError, AttributeError): return False - def _get_asn_data_via_ipapi(self, ip_address: str) -> Optional[Dict]: - """Get ASN data using ip-api.com (150 requests/minute free tier).""" + def _get_whois_data(self, ip_address: str) -> Optional[Dict]: + """Get WHOIS data via API.""" + if ip_address in self.whois_cache: + return self.whois_cache[ip_address] + try: - self.stats['api_calls'] += 1 - - # Comprehensive field list for ip-api.com - fields = "status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,asname,mobile,proxy,hosting" + fields = "status,country,countryCode,region,regionName,city,isp,org,as,asname,mobile,proxy,hosting" url = f"http://ip-api.com/json/{ip_address}?fields={fields}" - logger.debug(f"API call: {url}") - response = requests.get(url, timeout=self.timeout) if response.status_code == 200: data = response.json() if data.get('status') == 'success': - # Parse ASN from 'as' field (format: "AS15169 Google LLC") + # Parse ASN from 'as' field as_info = data.get('as', '') asn = None asn_description = None if as_info and as_info.startswith('AS'): parts = as_info.split(' ', 1) - asn = parts[0][2:] # Remove 'AS' prefix + asn = parts[0][2:] if len(parts) > 1: asn_description = parts[1] result = { - 'source': 'ip-api.com', 'asn': asn, 'asn_description': asn_description, 'asn_name': data.get('asname'), 'isp': data.get('isp'), 'organization': data.get('org'), 'country': data.get('country'), - 'country_code': data.get('countryCode'), + 'country_code': data.get('countryCode'), 'region': data.get('regionName'), 'city': data.get('city'), - 'latitude': data.get('lat'), - 'longitude': data.get('lon'), - 'timezone': data.get('timezone'), 'is_mobile': data.get('mobile'), - 'is_proxy': data.get('proxy'), + 'is_proxy': data.get('proxy'), 'is_hosting': data.get('hosting') } # Remove None values result = {k: v for k, v in result.items() if v is not None} - self.stats['api_successes'] += 1 - logger.info(f"✅ Successfully retrieved data for {ip_address}") + self.whois_cache[ip_address] = result return result - else: - logger.debug(f"❌ API returned error for {ip_address}: {data.get('message')}") - self.stats['api_failures'] += 1 - else: - logger.warning(f"❌ HTTP {response.status_code} for {ip_address}") - self.stats['api_failures'] += 1 + self.whois_cache[ip_address] = None return None - except requests.exceptions.Timeout: - logger.warning(f"⏰ API timeout for {ip_address}") - self.stats['api_failures'] += 1 - return None except Exception as e: - logger.error(f"💥 API error for {ip_address}: {e}") - self.stats['api_failures'] += 1 + logger.error(f"API error for {ip_address}: {e}") + self.whois_cache[ip_address] = None return None - def _get_whois_data(self, ip_address: str) -> Optional[Dict]: - """Get WHOIS data for IP address.""" - if ip_address in self.whois_cache: - self.stats['cached_results'] += 1 - return self.whois_cache[ip_address] - - # Use API-only approach - whois_data = self._get_asn_data_via_ipapi(ip_address) - - # Cache result (even if None to avoid repeated failed lookups) - self.whois_cache[ip_address] = whois_data - return whois_data - def _enrich_event(self, event, ip_field: str, ip_address: str, whois_data: Dict): """Add WHOIS data to event.""" try: - enrichment = { - 'whois_checked': True, - f'{ip_field}_whois_source': whois_data.get('source', 'unknown') - } + enrichment = {'whois_checked': True} # ASN information if whois_data.get('asn'): @@ -170,7 +118,7 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): if whois_data.get('isp'): enrichment[f'{ip_field}_isp'] = whois_data['isp'] - # Location (prefix with 'whois' to avoid conflicts with GeoIP) + # Location info if whois_data.get('country'): enrichment[f'{ip_field}_whois_country'] = whois_data['country'] if whois_data.get('country_code'): @@ -180,13 +128,11 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): if whois_data.get('city'): enrichment[f'{ip_field}_whois_city'] = whois_data['city'] - # Additional metadata - if whois_data.get('timezone'): - enrichment[f'{ip_field}_timezone'] = whois_data['timezone'] + # Additional flags if whois_data.get('is_mobile') is not None: enrichment[f'{ip_field}_is_mobile'] = whois_data['is_mobile'] if whois_data.get('is_proxy') is not None: - enrichment[f'{ip_field}_is_proxy'] = whois_data['is_proxy'] + enrichment[f'{ip_field}_is_proxy'] = whois_data['is_proxy'] if whois_data.get('is_hosting') is not None: enrichment[f'{ip_field}_is_hosting'] = whois_data['is_hosting'] @@ -194,11 +140,8 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): event.add_tags(['whois-enriched']) event.commit() - logger.info(f"✅ Enriched event for {ip_address} ({ip_field})") - except Exception as e: - logger.error(f"💥 Error enriching event for {ip_address}: {e}") - # Mark as checked to avoid retry loops + logger.error(f"Error enriching event for {ip_address}: {e}") try: event.add_attributes({'whois_checked': True, 'whois_error': str(e)}) event.commit() @@ -207,35 +150,25 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): def run(self): """Main analyzer logic.""" - logger.info("🚀 Starting WHOIS enrichment analysis") + logger.info("Starting WHOIS enrichment analysis") - # Test API connectivity first - test_result = self._get_whois_data("8.8.8.8") - if not test_result: - return "❌ API connectivity test failed - check internet connection" - else: - logger.info(f"✅ API connectivity test passed: {test_result}") - - # Query for events with IP fields, excluding already processed ones + # Query ALL events with IP fields, ignoring previous processing ip_queries = [f'_exists_:{field}' for field in self.IP_FIELDS] - query = f'({" OR ".join(ip_queries)}) AND NOT _exists_:whois_checked' - - logger.info(f"📝 Query: {query}") + query = f'({" OR ".join(ip_queries)})' events = self.event_stream( query_string=query, - return_fields=self.IP_FIELDS + ['whois_checked'] + return_fields=self.IP_FIELDS ) + events_processed = 0 enriched_count = 0 try: for event in events: - self.stats['events_processed'] += 1 - - # Process IP fields in this event - enriched_this_event = False + events_processed += 1 + # Find first valid IP in this event for ip_field in self.IP_FIELDS: ip_value = event.source.get(ip_field) if not ip_value: @@ -249,8 +182,6 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): if not self._validate_ip(ip_str): continue - - self.stats['valid_ips_found'] += 1 if ip_str not in self.processed_ips: self.processed_ips.add(ip_str) @@ -260,25 +191,18 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): if whois_data: self._enrich_event(event, ip_field, ip_str, whois_data) enriched_count += 1 - enriched_this_event = True - break # One enrichment per event - - # Mark as checked even if no enrichment - if not enriched_this_event: - try: - event.add_attributes({'whois_checked': True, 'whois_no_data': True}) - event.commit() - except Exception as e: - logger.error(f"Error marking event: {e}") + break # Rate limiting - if self.stats['events_processed'] % self.batch_size == 0: - logger.info(f"📊 Progress: {self.stats}") + if events_processed % self.batch_size == 0: if self.rate_limit_delay > 0: time.sleep(self.rate_limit_delay) + + if events_processed % (self.batch_size * 10) == 0: + logger.info(f"Progress: {events_processed} processed, {enriched_count} enriched") except Exception as e: - logger.error(f"💥 Error during processing: {e}", exc_info=True) + logger.error(f"Error during processing: {e}") # Create view if we have enriched events if enriched_count > 0: @@ -288,20 +212,8 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer): query_string='tag:"whois-enriched"' ) - # Final summary - success_rate = (self.stats['api_successes'] / max(1, self.stats['api_calls'])) * 100 - - summary = ( - f"📈 WHOIS analysis complete: " - f"{enriched_count} events enriched, " - f"{self.stats['valid_ips_found']} valid IPs found, " - f"API success rate: {success_rate:.1f}%" - ) - - logger.info(summary) - logger.info(f"📊 Final stats: {self.stats}") - - return summary + logger.info(f"WHOIS analysis complete: {enriched_count}/{events_processed} events enriched") + return f"Processed {events_processed} events, enriched {enriched_count} with WHOIS data" # Register the analyzer