whois_analyzer.py updated

Mario Stöckl 2025-08-25 19:51:03 +00:00
parent 65780be815
commit bcbe7a8bd7


@@ -1,4 +1,4 @@
-"""Index analyzer plugin for WHOIS data enrichment - API-Only Version."""
+"""Index analyzer plugin for WHOIS data enrichment - Production version."""
 import ipaddress
 import logging
@@ -15,13 +15,12 @@ logger = logging.getLogger("timesketch.analyzers.whois_enrichment")
 class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
-    """Analyzer to enrich IP addresses with WHOIS/ASN data using APIs only."""
+    """Analyzer to enrich IP addresses with WHOIS/ASN data."""

     NAME = 'whois_enrichment'
     DISPLAY_NAME = 'WHOIS IP Enrichment'
     DESCRIPTION = 'Enriches IP addresses with ASN/WHOIS data via APIs'

-    # IP fields to check (consistent with GeoIP analyzer)
     IP_FIELDS = [
         'ip', 'host_ip', 'src_ip', 'dst_ip', 'source_ip', 'dest_ip',
         'ip_address', 'client_ip', 'address', 'saddr', 'daddr'
@@ -30,131 +29,80 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
     def __init__(self, index_name, sketch_id, timeline_id=None):
         super().__init__(index_name, sketch_id, timeline_id)
-        # Configuration
-        self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 25)  # Reduced for API limits
-        self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 2.0)  # Increased
+        self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 25)
+        self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 2.0)
         self.timeout = current_app.config.get('WHOIS_TIMEOUT', 10)
-        # Cache to avoid duplicate queries
         self.whois_cache: Dict[str, Optional[Dict]] = {}
         self.processed_ips: Set[str] = set()
-        # Stats
-        self.stats = {
-            'events_processed': 0,
-            'valid_ips_found': 0,
-            'api_calls': 0,
-            'api_successes': 0,
-            'api_failures': 0,
-            'cached_results': 0
-        }

     def _validate_ip(self, ip_address: str) -> bool:
-        """Validate IP address - less restrictive for better coverage."""
+        """Validate IP address."""
         try:
             ip = ipaddress.ip_address(ip_address.strip())
-            # Skip only obvious invalid cases
-            if ip.is_loopback or ip.is_multicast or ip.is_link_local:
-                return False
-            # Accept both private and public IPs (some private ranges have ASN data)
-            return True
+            return not (ip.is_loopback or ip.is_multicast or ip.is_link_local)
         except (ValueError, AttributeError):
             return False

-    def _get_asn_data_via_ipapi(self, ip_address: str) -> Optional[Dict]:
-        """Get ASN data using ip-api.com (150 requests/minute free tier)."""
+    def _get_whois_data(self, ip_address: str) -> Optional[Dict]:
+        """Get WHOIS data via API."""
+        if ip_address in self.whois_cache:
+            return self.whois_cache[ip_address]
         try:
-            self.stats['api_calls'] += 1
-            # Comprehensive field list for ip-api.com
-            fields = "status,message,country,countryCode,region,regionName,city,zip,lat,lon,timezone,isp,org,as,asname,mobile,proxy,hosting"
+            fields = "status,country,countryCode,region,regionName,city,isp,org,as,asname,mobile,proxy,hosting"
             url = f"http://ip-api.com/json/{ip_address}?fields={fields}"
-            logger.debug(f"API call: {url}")
             response = requests.get(url, timeout=self.timeout)
             if response.status_code == 200:
                 data = response.json()
                 if data.get('status') == 'success':
-                    # Parse ASN from 'as' field (format: "AS15169 Google LLC")
+                    # Parse ASN from 'as' field
                     as_info = data.get('as', '')
                     asn = None
                     asn_description = None
                     if as_info and as_info.startswith('AS'):
                         parts = as_info.split(' ', 1)
-                        asn = parts[0][2:]  # Remove 'AS' prefix
+                        asn = parts[0][2:]
                         if len(parts) > 1:
                             asn_description = parts[1]
                     result = {
-                        'source': 'ip-api.com',
                         'asn': asn,
                         'asn_description': asn_description,
                         'asn_name': data.get('asname'),
                         'isp': data.get('isp'),
                         'organization': data.get('org'),
                         'country': data.get('country'),
                         'country_code': data.get('countryCode'),
                         'region': data.get('regionName'),
                         'city': data.get('city'),
-                        'latitude': data.get('lat'),
-                        'longitude': data.get('lon'),
-                        'timezone': data.get('timezone'),
                         'is_mobile': data.get('mobile'),
                         'is_proxy': data.get('proxy'),
                         'is_hosting': data.get('hosting')
                     }
                     # Remove None values
                     result = {k: v for k, v in result.items() if v is not None}
-                    self.stats['api_successes'] += 1
-                    logger.info(f"✅ Successfully retrieved data for {ip_address}")
+                    self.whois_cache[ip_address] = result
                     return result
-                else:
-                    logger.debug(f"❌ API returned error for {ip_address}: {data.get('message')}")
-                    self.stats['api_failures'] += 1
-            else:
-                logger.warning(f"❌ HTTP {response.status_code} for {ip_address}")
-                self.stats['api_failures'] += 1
+            self.whois_cache[ip_address] = None
             return None
-        except requests.exceptions.Timeout:
-            logger.warning(f"⏰ API timeout for {ip_address}")
-            self.stats['api_failures'] += 1
-            return None
         except Exception as e:
-            logger.error(f"💥 API error for {ip_address}: {e}")
-            self.stats['api_failures'] += 1
+            logger.error(f"API error for {ip_address}: {e}")
+            self.whois_cache[ip_address] = None
             return None

-    def _get_whois_data(self, ip_address: str) -> Optional[Dict]:
-        """Get WHOIS data for IP address."""
-        if ip_address in self.whois_cache:
-            self.stats['cached_results'] += 1
-            return self.whois_cache[ip_address]
-        # Use API-only approach
-        whois_data = self._get_asn_data_via_ipapi(ip_address)
-        # Cache result (even if None to avoid repeated failed lookups)
-        self.whois_cache[ip_address] = whois_data
-        return whois_data
-
     def _enrich_event(self, event, ip_field: str, ip_address: str, whois_data: Dict):
         """Add WHOIS data to event."""
         try:
-            enrichment = {
-                'whois_checked': True,
-                f'{ip_field}_whois_source': whois_data.get('source', 'unknown')
-            }
+            enrichment = {'whois_checked': True}

             # ASN information
             if whois_data.get('asn'):
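Note on the ASN parsing kept by this hunk: ip-api.com packs the ASN number and its owner into a single `as` string (e.g. "AS15169 Google LLC"), so the code splits once on the first space and strips the "AS" prefix. A minimal standalone sketch of the same logic, with an illustrative sample value:

# Sketch of the 'as'-field parsing used above; the sample value
# "AS15169 Google LLC" mirrors ip-api.com's response format.
def parse_as_field(as_info: str):
    asn = None
    asn_description = None
    if as_info and as_info.startswith('AS'):
        parts = as_info.split(' ', 1)  # split only on the first space
        asn = parts[0][2:]             # drop the 'AS' prefix
        if len(parts) > 1:
            asn_description = parts[1]
    return asn, asn_description

assert parse_as_field("AS15169 Google LLC") == ("15169", "Google LLC")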
@@ -170,7 +118,7 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
             if whois_data.get('isp'):
                 enrichment[f'{ip_field}_isp'] = whois_data['isp']

-            # Location (prefix with 'whois' to avoid conflicts with GeoIP)
+            # Location info
             if whois_data.get('country'):
                 enrichment[f'{ip_field}_whois_country'] = whois_data['country']
             if whois_data.get('country_code'):
@@ -180,13 +128,11 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
             if whois_data.get('city'):
                 enrichment[f'{ip_field}_whois_city'] = whois_data['city']

-            # Additional metadata
-            if whois_data.get('timezone'):
-                enrichment[f'{ip_field}_timezone'] = whois_data['timezone']
+            # Additional flags
             if whois_data.get('is_mobile') is not None:
                 enrichment[f'{ip_field}_is_mobile'] = whois_data['is_mobile']
             if whois_data.get('is_proxy') is not None:
                 enrichment[f'{ip_field}_is_proxy'] = whois_data['is_proxy']
             if whois_data.get('is_hosting') is not None:
                 enrichment[f'{ip_field}_is_hosting'] = whois_data['is_hosting']
@@ -194,11 +140,8 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
             event.add_tags(['whois-enriched'])
             event.commit()
-            logger.info(f"✅ Enriched event for {ip_address} ({ip_field})")
         except Exception as e:
-            logger.error(f"💥 Error enriching event for {ip_address}: {e}")
-            # Mark as checked to avoid retry loops
+            logger.error(f"Error enriching event for {ip_address}: {e}")
             try:
                 event.add_attributes({'whois_checked': True, 'whois_error': str(e)})
                 event.commit()
@@ -207,35 +150,25 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
     def run(self):
         """Main analyzer logic."""
-        logger.info("🚀 Starting WHOIS enrichment analysis")
+        logger.info("Starting WHOIS enrichment analysis")

-        # Test API connectivity first
-        test_result = self._get_whois_data("8.8.8.8")
-        if not test_result:
-            return "❌ API connectivity test failed - check internet connection"
-        else:
-            logger.info(f"✅ API connectivity test passed: {test_result}")
-
-        # Query for events with IP fields, excluding already processed ones
+        # Query ALL events with IP fields, ignoring previous processing
         ip_queries = [f'_exists_:{field}' for field in self.IP_FIELDS]
-        query = f'({" OR ".join(ip_queries)}) AND NOT _exists_:whois_checked'
-        logger.info(f"📝 Query: {query}")
+        query = f'({" OR ".join(ip_queries)})'

         events = self.event_stream(
             query_string=query,
-            return_fields=self.IP_FIELDS + ['whois_checked']
+            return_fields=self.IP_FIELDS
         )

+        events_processed = 0
         enriched_count = 0

         try:
             for event in events:
-                self.stats['events_processed'] += 1
-                # Process IP fields in this event
-                enriched_this_event = False
+                events_processed += 1

+                # Find first valid IP in this event
                 for ip_field in self.IP_FIELDS:
                     ip_value = event.source.get(ip_field)
                     if not ip_value:
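The query built above is now a plain OR over `_exists_:` clauses, with the old `AND NOT _exists_:whois_checked` filter dropped. A small sketch reproducing the construction outside the class, abridged to three fields:

# Sketch: the same query construction with an abridged field list.
IP_FIELDS = ['ip', 'host_ip', 'src_ip']
ip_queries = [f'_exists_:{field}' for field in IP_FIELDS]
query = f'({" OR ".join(ip_queries)})'
print(query)  # (_exists_:ip OR _exists_:host_ip OR _exists_:src_ip)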
@@ -249,8 +182,6 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
                     if not self._validate_ip(ip_str):
                         continue

-                    self.stats['valid_ips_found'] += 1
-
                     if ip_str not in self.processed_ips:
                         self.processed_ips.add(ip_str)
@@ -260,25 +191,18 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
                         if whois_data:
                             self._enrich_event(event, ip_field, ip_str, whois_data)
                             enriched_count += 1
-                            enriched_this_event = True
-                            break  # One enrichment per event
-
-                # Mark as checked even if no enrichment
-                if not enriched_this_event:
-                    try:
-                        event.add_attributes({'whois_checked': True, 'whois_no_data': True})
-                        event.commit()
-                    except Exception as e:
-                        logger.error(f"Error marking event: {e}")
+                            break

                 # Rate limiting
-                if self.stats['events_processed'] % self.batch_size == 0:
-                    logger.info(f"📊 Progress: {self.stats}")
+                if events_processed % self.batch_size == 0:
                     if self.rate_limit_delay > 0:
                         time.sleep(self.rate_limit_delay)

+                if events_processed % (self.batch_size * 10) == 0:
+                    logger.info(f"Progress: {events_processed} processed, {enriched_count} enriched")
+
         except Exception as e:
-            logger.error(f"💥 Error during processing: {e}", exc_info=True)
+            logger.error(f"Error during processing: {e}")

         # Create view if we have enriched events
         if enriched_count > 0:
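The rate limiting kept here sleeps once per batch of events rather than per request. In isolation, with the defaults read in `__init__`, the pacing pattern looks like this sketch:

import time

batch_size = 25         # WHOIS_BATCH_SIZE default
rate_limit_delay = 2.0  # WHOIS_RATE_LIMIT_DELAY default, in seconds

for events_processed in range(1, 51):
    # ... one event's IP lookups would happen here ...
    if events_processed % batch_size == 0 and rate_limit_delay > 0:
        time.sleep(rate_limit_delay)  # pause after every full batch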
@@ -288,20 +212,8 @@ class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
                 query_string='tag:"whois-enriched"'
             )

-        # Final summary
-        success_rate = (self.stats['api_successes'] / max(1, self.stats['api_calls'])) * 100
-        summary = (
-            f"📈 WHOIS analysis complete: "
-            f"{enriched_count} events enriched, "
-            f"{self.stats['valid_ips_found']} valid IPs found, "
-            f"API success rate: {success_rate:.1f}%"
-        )
-        logger.info(summary)
-        logger.info(f"📊 Final stats: {self.stats}")
-        return summary
+        logger.info(f"WHOIS analysis complete: {enriched_count}/{events_processed} events enriched")
+        return f"Processed {events_processed} events, enriched {enriched_count} with WHOIS data"

 # Register the analyzer
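If the knobs read in `__init__` need tuning, they can be overridden in the Timesketch Flask config. This is a sketch: the key names come from the `current_app.config.get` calls above, and the defaults shown match the code:

# timesketch.conf (Python-syntax Flask config) - optional overrides
WHOIS_BATCH_SIZE = 25          # events per rate-limit batch
WHOIS_RATE_LIMIT_DELAY = 2.0   # seconds to sleep between batches
WHOIS_TIMEOUT = 10             # per-request HTTP timeout, seconds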