# timesketch_analyzers/whois_analyzer.py

"""Index analyzer plugin for WHOIS data enrichment."""
import ipaddress
import logging
import time
import os
from typing import Dict, Optional, Set
import requests
from flask import current_app
from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager
# Try to import whois library, with fallback handling
try:
import whois
HAS_WHOIS = True
except ImportError:
HAS_WHOIS = False
logger = logging.getLogger("timesketch.analyzers.whois_enrichment")


class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
    """Analyzer to enrich IP addresses with WHOIS data."""

    NAME = 'whois_enrichment'
    DISPLAY_NAME = 'WHOIS IP Enrichment'
    DESCRIPTION = 'Enriches source IP addresses with WHOIS/ASN data'

    # Common IP fields to check (same as the GeoIP analyzer, for consistency).
    IP_FIELDS = [
        'ip',
        'host_ip',
        'src_ip',
        'dst_ip',
        'source_ip',
        'dest_ip',
        'ip_address',
        'client_ip',
        'address',
        'saddr',
        'daddr',
    ]

    def __init__(self, index_name, sketch_id, timeline_id=None):
        super().__init__(index_name, sketch_id, timeline_id)
        # Configuration options (read from the Flask app config).
        self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 50)
        self.rate_limit_delay = current_app.config.get('WHOIS_RATE_LIMIT_DELAY', 1.0)
        self.max_retries = current_app.config.get('WHOIS_MAX_RETRIES', 2)
        self.timeout = current_app.config.get('WHOIS_TIMEOUT', 30)
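        # These options are this analyzer's own (not core Timesketch settings);
        # they can be overridden in timesketch.conf, for example:
        #   WHOIS_BATCH_SIZE = 50
        #   WHOIS_RATE_LIMIT_DELAY = 1.0
        #   WHOIS_MAX_RETRIES = 2
        #   WHOIS_TIMEOUT = 30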
        # Caches to avoid duplicate lookups within a single run.
        self.whois_cache: Dict[str, Optional[Dict]] = {}
        self.processed_ips: Set[str] = set()

    def _validate_ip(self, ip_address: str) -> bool:
        """Validate an IP address for analysis (same logic as the GeoIP analyzer).

        Args:
            ip_address: The IP address to validate.

        Returns:
            True if the IP is valid and global (public).
        """
        try:
            ip = ipaddress.ip_address(ip_address.strip())
            # is_global filters out private (RFC 1918), loopback, link-local
            # and other special-purpose ranges.
            return ip.is_global
        except (ValueError, AttributeError):
            return False

    def _get_asn_data_via_api(self, ip_address: str) -> Optional[Dict]:
        """Get ASN data using a free API service as a fallback.

        Args:
            ip_address: IP address to look up.

        Returns:
            Dictionary with ASN data, or None.
        """
        try:
            # Using ip-api.com, which has a free tier (HTTP only, rate limited
            # to roughly 45 requests per minute).
            # Alternatives: ipinfo.io, whoisapi.org, etc.
            # 'status' and 'message' are requested explicitly so the success
            # check below still works with a restricted field list.
            url = (
                f'http://ip-api.com/json/{ip_address}'
                '?fields=status,message,as,asname,isp,org,country,regionName,city'
            )
            response = requests.get(url, timeout=self.timeout)
            if response.status_code == 200:
                data = response.json()
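                # Illustrative shape of a successful response (example values
                # only, not data produced by this code):
                #   {"status": "success", "as": "AS15169 Google LLC",
                #    "asname": "GOOGLE", "isp": "Google LLC", "org": "Google LLC",
                #    "country": "United States", "regionName": "California",
                #    "city": "Mountain View"}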
                if data.get('status') == 'success':
                    # Parse the ASN number from the 'as' field
                    # (format: "AS15169 Google LLC").
                    as_info = data.get('as', '')
                    asn = None
                    if as_info and as_info.startswith('AS'):
                        asn = as_info.split()[0][2:]  # Strip the 'AS' prefix.
                    return {
                        'asn': asn,
                        'asn_name': data.get('asname'),
                        'isp': data.get('isp'),
                        'organization': data.get('org'),
                        'country': data.get('country'),
                        'region': data.get('regionName'),
                        'city': data.get('city'),
                    }
            return None
        except Exception as e:
            logger.debug(f"API lookup failed for {ip_address}: {e}")
            return None

    def _get_whois_data_ipwhois(self, ip_address: str) -> Optional[Dict]:
        """Get WHOIS data using the ipwhois library.

        Args:
            ip_address: IP address to look up.

        Returns:
            Dictionary with WHOIS data, or None.
        """
        if not HAS_WHOIS:
            return None
        try:
            results = IPWhois(ip_address).lookup_whois()
            data = {}

            # Network information (taken from the first registered network).
            nets = results.get('nets') or []
            if nets:
                net = nets[0]
                data['network_name'] = net.get('name')
                data['network_range'] = net.get('range')
                data['organization'] = net.get('description')
                data['address'] = net.get('address')
                data['city'] = net.get('city')
                data['state'] = net.get('state')
                data['country'] = net.get('country')
                # Registration dates.
                if net.get('created'):
                    data['creation_date'] = str(net['created'])
                if net.get('updated'):
                    data['updated_date'] = str(net['updated'])

            # ASN information.
            if results.get('asn'):
                data['asn'] = results['asn']
            if results.get('asn_description'):
                data['asn_description'] = results['asn_description']

            # Drop empty values so only populated fields are added to events.
            data = {key: value for key, value in data.items() if value}
            return data if data else None
        except Exception as e:
            logger.debug(f"ipwhois lookup failed for {ip_address}: {e}")
            return None

    def _get_whois_data(self, ip_address: str) -> Optional[Dict]:
        """Get WHOIS data for an IP address using the available methods.

        Args:
            ip_address: IP address to look up.

        Returns:
            Dictionary with WHOIS data, or None.
        """
        if ip_address in self.whois_cache:
            return self.whois_cache[ip_address]

        whois_data = None
        # Try a direct WHOIS lookup first if the library is available.
        if HAS_WHOIS:
            whois_data = self._get_whois_data_ipwhois(ip_address)
        # Fall back to the API if the direct lookup failed or is unavailable.
        if not whois_data:
            whois_data = self._get_asn_data_via_api(ip_address)

        # Cache the result (even if None) so each IP is only queried once.
        self.whois_cache[ip_address] = whois_data
        return whois_data

    def _enrich_event(self, event, ip_field: str, whois_data: Dict):
        """Add WHOIS data to the event.

        Args:
            event: The event object to enrich.
            ip_field: The field name containing the IP address.
            whois_data: Dictionary with WHOIS data.
        """
        try:
            # Create enrichment attributes with field-specific naming.
            enrichment = {'whois_checked': True}

            # ASN information.
            if whois_data.get('asn'):
                enrichment[f'{ip_field}_asn'] = whois_data['asn']
            if whois_data.get('asn_name') or whois_data.get('asn_description'):
                asn_name = whois_data.get('asn_name') or whois_data.get('asn_description')
                enrichment[f'{ip_field}_asn_name'] = asn_name

            # Organization information.
            if whois_data.get('organization'):
                enrichment[f'{ip_field}_organization'] = whois_data['organization']
            if whois_data.get('isp'):
                enrichment[f'{ip_field}_isp'] = whois_data['isp']

            # Network information.
            if whois_data.get('network_name'):
                enrichment[f'{ip_field}_network_name'] = whois_data['network_name']
            if whois_data.get('network_range'):
                enrichment[f'{ip_field}_network_range'] = whois_data['network_range']
            if whois_data.get('network_type'):
                enrichment[f'{ip_field}_network_type'] = whois_data['network_type']

            # Location information (complements GeoIP enrichment).
            if whois_data.get('country'):
                enrichment[f'{ip_field}_whois_country'] = whois_data['country']
            if whois_data.get('region'):
                enrichment[f'{ip_field}_whois_region'] = whois_data['region']
            if whois_data.get('city'):
                enrichment[f'{ip_field}_whois_city'] = whois_data['city']

            # Registration dates.
            if whois_data.get('creation_date'):
                enrichment[f'{ip_field}_creation_date'] = whois_data['creation_date']
            if whois_data.get('updated_date'):
                enrichment[f'{ip_field}_updated_date'] = whois_data['updated_date']

            event.add_attributes(enrichment)
            event.add_tags(['whois-enriched'])
            event.commit()
        except Exception as e:
            logger.error(f"Error enriching event for {ip_field}: {e}")
            # Still mark as checked to avoid reprocessing.
            try:
                event.add_attributes({'whois_checked': True, 'whois_error': str(e)})
                event.commit()
            except Exception as commit_error:
                logger.error(f"Error marking event as checked: {commit_error}")

    def run(self):
        """Main analyzer logic."""
        logger.info("Starting WHOIS enrichment analysis")

        # Build a query for events with IP fields that have not been checked yet.
        ip_exists_queries = [f'_exists_:{field}' for field in self.IP_FIELDS]
        query = f'({" OR ".join(ip_exists_queries)}) AND NOT _exists_:whois_checked'

        events = self.event_stream(
            query_string=query,
            return_fields=self.IP_FIELDS + ['whois_checked']
        )

        total_processed = 0
        enriched_count = 0
        try:
            current_batch = []
            for event in events:
                current_batch.append(event)
                if len(current_batch) >= self.batch_size:
                    processed, enriched = self._process_batch(current_batch)
                    total_processed += processed
                    enriched_count += enriched
                    current_batch = []

                    # Rate limiting between batches.
                    if self.rate_limit_delay > 0:
                        time.sleep(self.rate_limit_delay)

                    # Log progress every five batches.
                    if total_processed % (self.batch_size * 5) == 0:
                        logger.info(
                            f"Progress: {total_processed} processed, "
                            f"{enriched_count} enriched")

            # Process any remaining events.
            if current_batch:
                processed, enriched = self._process_batch(current_batch)
                total_processed += processed
                enriched_count += enriched
        except Exception as e:
            logger.error(f"Error during WHOIS processing: {e}")

        # Create a view if any events were enriched.
        if enriched_count > 0:
            self.sketch.add_view(
                view_name="WHOIS Enriched Events",
                analyzer_name=self.NAME,
                query_string='tag:"whois-enriched"'
            )

        logger.info(
            f"WHOIS enrichment complete: {total_processed} processed, "
            f"{enriched_count} enriched")
        return (
            f"Processed {total_processed} events, "
            f"enriched {enriched_count} with WHOIS data")

    def _process_batch(self, events):
        """Process a batch of events.

        Args:
            events: List of events to process.

        Returns:
            Tuple of (processed_count, enriched_count).
        """
        processed_count = 0
        enriched_count = 0

        for event in events:
            processed_count += 1

            # Check each IP field in the event.
            for ip_field in self.IP_FIELDS:
                ip_value = event.source.get(ip_field)
                if not ip_value:
                    continue

                # Handle both a single IP and a list of IPs.
                if isinstance(ip_value, str):
                    ip_addresses = [ip_value]
                else:
                    ip_addresses = ip_value if isinstance(ip_value, list) else [str(ip_value)]

                for ip_addr in ip_addresses:
                    if not self._validate_ip(ip_addr):
                        continue
                    self.processed_ips.add(ip_addr)

                    # Get WHOIS data. Repeated IPs are served from the cache,
                    # so every event is still enriched without extra lookups.
                    whois_data = self._get_whois_data(ip_addr)
                    if whois_data:
                        self._enrich_event(event, ip_field, whois_data)
                        enriched_count += 1
                        logger.debug(f"Enriched {ip_addr} with WHOIS data")
                    else:
                        # Mark as checked even if no data was found.
                        event.add_attributes({'whois_checked': True, 'whois_no_data': True})
                        event.commit()
                        logger.debug(f"No WHOIS data for {ip_addr}")
                    # Stop after the first processed IP to avoid duplicate enrichment.
                    break
                else:
                    # No valid IP in this field; try the next one.
                    continue
                break

            # If no IP values were found at all, still mark the event as checked.
            if not any(event.source.get(field) for field in self.IP_FIELDS):
                event.add_attributes({'whois_checked': True})
                event.commit()

        return processed_count, enriched_count

# Register the analyzer
manager.AnalysisManager.register_analyzer(WhoisEnrichmentAnalyzer)
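
# Deployment note (a sketch, assuming a standard Timesketch install): analyzers
# are discovered by being importable from the timesketch.lib.analyzers package,
# so a custom module like this one is typically placed in that package (or
# exposed via its __init__) on the Timesketch server, after which it appears in
# the sketch UI under the name defined by DISPLAY_NAME above.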