217 lines
8.8 KiB
Python
217 lines
8.8 KiB
Python
"""Index analyzer plugin for WHOIS data enrichment - Production version."""
|
|
|
|
import ipaddress
|
|
import logging
|
|
import time
|
|
from typing import Dict, Optional, Set
|
|
|
|
import requests
|
|
from flask import current_app
|
|
|
|
from timesketch.lib.analyzers import interface
|
|
from timesketch.lib.analyzers import manager
|
|
|
|
# Module-level logger, namespaced under the Timesketch analyzer hierarchy.
logger = logging.getLogger("timesketch.analyzers.whois_enrichment")
|
|
|
|
|
|
class WhoisEnrichmentAnalyzer(interface.BaseAnalyzer):
    """Analyzer to enrich IP addresses with WHOIS/ASN data.

    For every event that carries one of the known IP fields, the first
    valid public IP address is looked up via the ip-api.com JSON API and
    the resulting ASN / organization / location attributes are written
    back onto the event, prefixed with the originating field name.
    """

    NAME = 'whois_enrichment'
    DISPLAY_NAME = 'WHOIS IP Enrichment'
    DESCRIPTION = 'Enriches IP addresses with ASN/WHOIS data via APIs'

    # Event attribute names that may contain IP addresses.
    IP_FIELDS = [
        'ip', 'host_ip', 'src_ip', 'dst_ip', 'source_ip', 'dest_ip',
        'ip_address', 'client_ip', 'address', 'saddr', 'daddr'
    ]

    def __init__(self, index_name, sketch_id, timeline_id=None):
        """Initialize the analyzer.

        Args:
            index_name: Name of the OpenSearch index to analyze.
            sketch_id: ID of the sketch being analyzed.
            timeline_id: Optional ID of the timeline being analyzed.
        """
        super().__init__(index_name, sketch_id, timeline_id)

        # Tunables, overridable through the Flask application config.
        self.batch_size = current_app.config.get('WHOIS_BATCH_SIZE', 25)
        self.rate_limit_delay = current_app.config.get(
            'WHOIS_RATE_LIMIT_DELAY', 2.0)
        self.timeout = current_app.config.get('WHOIS_TIMEOUT', 10)

        # Per-run cache: IP string -> parsed WHOIS dict, or None for a
        # failed lookup (also cached, to avoid retrying known failures).
        self.whois_cache: Dict[str, Optional[Dict]] = {}

    def _validate_ip(self, ip_address: str) -> bool:
        """Return True if *ip_address* is a lookup-worthy public IP.

        Loopback, multicast, link-local, private, reserved and
        unspecified addresses are rejected: the public WHOIS API cannot
        resolve them, so querying would only burn rate-limited calls.

        Args:
            ip_address: Candidate IP address string.

        Returns:
            True for a routable public IPv4/IPv6 address, else False.
        """
        try:
            ip = ipaddress.ip_address(ip_address.strip())
        except (ValueError, AttributeError):
            # Not a parseable address (or not a string at all).
            return False
        return not (
            ip.is_loopback
            or ip.is_multicast
            or ip.is_link_local
            or ip.is_private
            or ip.is_reserved
            or ip.is_unspecified
        )

    def _get_whois_data(self, ip_address: str) -> Optional[Dict]:
        """Fetch WHOIS/ASN data for *ip_address* from ip-api.com.

        Results — including failures, stored as None — are cached for the
        lifetime of the run, so each distinct IP triggers at most one
        HTTP request.

        Args:
            ip_address: Validated IP address string.

        Returns:
            Dict of WHOIS attributes, or None if the lookup failed.
        """
        if ip_address in self.whois_cache:
            return self.whois_cache[ip_address]

        try:
            fields = (
                "status,country,countryCode,region,regionName,city,"
                "isp,org,as,asname,mobile,proxy,hosting"
            )
            # NOTE: the free ip-api.com tier is HTTP-only; HTTPS requires
            # a paid API key.
            url = f"http://ip-api.com/json/{ip_address}?fields={fields}"

            response = requests.get(url, timeout=self.timeout)

            if response.status_code == 200:
                data = response.json()
                if data.get('status') == 'success':
                    result = self._parse_api_response(data)
                    self.whois_cache[ip_address] = result
                    return result

            # Non-200 response or API-level failure: cache the miss.
            self.whois_cache[ip_address] = None
            return None

        except Exception as e:
            # Network / timeout / JSON decode errors: log, cache the
            # miss, and keep going — enrichment is best-effort.
            logger.error("API error for %s: %s", ip_address, e)
            self.whois_cache[ip_address] = None
            return None

    @staticmethod
    def _parse_api_response(data: Dict) -> Dict:
        """Convert a successful ip-api.com payload into enrichment keys.

        Args:
            data: Decoded JSON body with ``status == 'success'``.

        Returns:
            Dict with only the keys the API actually populated.
        """
        # The 'as' field looks like "AS15169 Google LLC"; split it into
        # the bare ASN number and the free-text description.
        as_info = data.get('as', '')
        asn = None
        asn_description = None
        if as_info and as_info.startswith('AS'):
            parts = as_info.split(' ', 1)
            asn = parts[0][2:]
            if len(parts) > 1:
                asn_description = parts[1]

        result = {
            'asn': asn,
            'asn_description': asn_description,
            'asn_name': data.get('asname'),
            'isp': data.get('isp'),
            'organization': data.get('org'),
            'country': data.get('country'),
            'country_code': data.get('countryCode'),
            'region': data.get('regionName'),
            'city': data.get('city'),
            'is_mobile': data.get('mobile'),
            'is_proxy': data.get('proxy'),
            'is_hosting': data.get('hosting')
        }
        # Remove None values so absent data never creates attributes.
        return {k: v for k, v in result.items() if v is not None}

    def _enrich_event(self, event, ip_field: str, ip_address: str,
                      whois_data: Dict):
        """Add WHOIS attributes for *ip_address* to *event*.

        Attribute names are prefixed with *ip_field* so enrichment for
        e.g. ``src_ip`` and ``dst_ip`` could coexist on one event.

        Args:
            event: Timesketch event object to mutate and commit.
            ip_field: Name of the event field the IP came from.
            ip_address: The enriched IP (used for error logging).
            whois_data: Parsed WHOIS dict from _get_whois_data().
        """
        try:
            enrichment = {'whois_checked': True}

            # String-valued keys: copied only when truthy.
            # (whois key in data, suffix of the event attribute name)
            value_keys = (
                ('asn', 'asn'),
                ('asn_name', 'asn_name'),
                ('asn_description', 'asn_description'),
                ('organization', 'organization'),
                ('isp', 'isp'),
                ('country', 'whois_country'),
                ('country_code', 'whois_country_code'),
                ('region', 'whois_region'),
                ('city', 'whois_city'),
            )
            for whois_key, suffix in value_keys:
                value = whois_data.get(whois_key)
                if value:
                    enrichment[f'{ip_field}_{suffix}'] = value

            # Boolean flags: False is meaningful, so only skip None.
            for whois_key in ('is_mobile', 'is_proxy', 'is_hosting'):
                value = whois_data.get(whois_key)
                if value is not None:
                    enrichment[f'{ip_field}_{whois_key}'] = value

            event.add_attributes(enrichment)
            event.add_tags(['whois-enriched'])
            event.commit()

        except Exception as e:
            logger.error("Error enriching event for %s: %s", ip_address, e)
            # Best effort: still mark the event as checked (with the
            # error) so it can be identified later; swallow secondary
            # failures since there is nothing further to do.
            try:
                event.add_attributes(
                    {'whois_checked': True, 'whois_error': str(e)})
                event.commit()
            except Exception:
                pass

    def _enrich_first_valid_ip(self, event) -> bool:
        """Enrich *event* using its first valid public IP, if any.

        Scans IP_FIELDS in order; as soon as one IP is successfully
        enriched the search stops across ALL fields, enforcing at most
        one enrichment per event. (A plain ``break`` in the inner loop
        would only exit the per-field IP list and allow a second field
        to enrich the same event again.)

        Args:
            event: Timesketch event object.

        Returns:
            True if the event was enriched.
        """
        for ip_field in self.IP_FIELDS:
            ip_value = event.source.get(ip_field)
            if not ip_value:
                continue

            # The field may hold a single string or a list of IPs;
            # anything else is ignored.
            if isinstance(ip_value, str):
                ip_list = [ip_value]
            elif isinstance(ip_value, list):
                ip_list = ip_value
            else:
                ip_list = []

            for ip_addr in ip_list:
                ip_str = str(ip_addr).strip()
                if not self._validate_ip(ip_str):
                    continue

                # The cache in _get_whois_data() avoids duplicate API
                # calls for IPs seen on earlier events.
                whois_data = self._get_whois_data(ip_str)
                if whois_data:
                    self._enrich_event(event, ip_field, ip_str, whois_data)
                    return True

        return False

    def run(self):
        """Run the analyzer over all events carrying an IP field.

        Returns:
            Human-readable summary string with processed/enriched counts.
        """
        logger.info("Starting WHOIS enrichment analysis")

        # Match any event that has at least one of the known IP fields.
        ip_queries = [f'_exists_:{field}' for field in self.IP_FIELDS]
        query = f'({" OR ".join(ip_queries)})'

        events = self.event_stream(
            query_string=query,
            return_fields=self.IP_FIELDS
        )

        events_processed = 0
        enriched_count = 0

        try:
            for event in events:
                events_processed += 1

                if self._enrich_first_valid_ip(event):
                    enriched_count += 1

                # Crude rate limiting: pause between batches to stay
                # within the free API tier's request quota.
                if (events_processed % self.batch_size == 0
                        and self.rate_limit_delay > 0):
                    time.sleep(self.rate_limit_delay)

                if events_processed % (self.batch_size * 10) == 0:
                    logger.info(
                        "Progress: %d processed, %d enriched",
                        events_processed, enriched_count)

        except Exception as e:
            # Keep partial results rather than failing the whole run.
            logger.error("Error during processing: %s", e)

        # Create a saved view only when something was enriched.
        if enriched_count > 0:
            self.sketch.add_view(
                view_name="WHOIS Enriched Events",
                analyzer_name=self.NAME,
                query_string='tag:"whois-enriched"'
            )

        logger.info(
            "WHOIS analysis complete: %d/%d events enriched",
            enriched_count, events_processed)
        return (
            f"Processed {events_processed} events, "
            f"enriched {enriched_count} with WHOIS data"
        )
|
|
|
|
|
|
# Register the analyzer class with the Timesketch AnalysisManager.
manager.AnalysisManager.register_analyzer(WhoisEnrichmentAnalyzer)