misp_analyzer.py updated

Mario Stöckl 2025-07-30 13:13:01 +00:00
parent 7404c0ee8d
commit 59ea0dd658


@@ -1,27 +1,30 @@
"""Index analyzer plugin for MISP"""

import logging
import ntpath
import re
import requests
import time
import json

from collections import defaultdict
from typing import List, Dict, Set, Any, Optional
from urllib3.exceptions import InsecureRequestWarning

from flask import current_app

from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

logger = logging.getLogger("timesketch.analyzers.misp")


class MispAnalyzer(interface.BaseAnalyzer):
    """Ultra-reliable MISP Analyzer for large-scale processing."""

    NAME = "misp_analyzer"
    DISPLAY_NAME = "MISP Enhanced"
    DESCRIPTION = "Mark events using MISP with ultra-reliable large-scale support"

    def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs):
        """Initialize the Analyzer."""
@@ -34,209 +37,241 @@ class MispAnalyzer(interface.BaseAnalyzer):
        self._attr = kwargs.get("attr")
        self._timesketch_attr = kwargs.get("timesketch_attr")

        self.include_community = kwargs.get("include_community", True)
        self.chunk_size = kwargs.get("chunk_size", 500)
        self.max_retries = kwargs.get("max_retries", 5)
        self.base_timeout = kwargs.get("base_timeout", 30)
        self.max_timeout = kwargs.get("max_timeout", 180)
        self.request_delay = kwargs.get("request_delay", 1.0)
        self.max_indicators_per_batch = kwargs.get("max_indicators_per_batch", 10)

        # Regex patterns
        self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')

        # Tracking sets
        self.marked_events = set()
        self.processed_indicators = set()
        self.failed_indicators = set()

        # Statistics
        self.stats = {
            'events_processed': 0,
            'indicators_extracted': 0,
            'api_calls_successful': 0,
            'api_calls_failed': 0,
            'events_marked': 0,
            'total_matches': 0,
            'timeouts': 0,
            'retries': 0
        }

        # Session for connection reuse
        self.session = requests.Session()
        self.session.verify = False
        self.session.headers.update({
            "Authorization": self.misp_api_key,
            "Content-Type": "application/json",
            "User-Agent": "Timesketch-MISP-Analyzer/1.0"
        })

    @staticmethod
    def get_kwargs():
        """Get kwargs for the analyzer with ultra-reliable settings."""
        base_config = {
            "include_community": True,
            "chunk_size": 500,
            "max_retries": 5,
            "base_timeout": 30,
            "request_delay": 1.0,
            "max_indicators_per_batch": 10,
        }

        to_query = [
            {
                "query_string": "md5_hash:*",
                "attr": "md5",
                "timesketch_attr": "md5_hash",
                **base_config
            },
            {
                "query_string": "sha1_hash:*",
                "attr": "sha1",
                "timesketch_attr": "sha1_hash",
                **base_config
            },
            {
                "query_string": "sha256_hash:*",
                "attr": "sha256",
                "timesketch_attr": "sha256_hash",
                **base_config
            },
            {
                "query_string": "filename:*",
                "attr": "filename",
                "timesketch_attr": "filename",
                **base_config
            },
            {
                "query_string": "message:*",
                "attr": "ip",
                "timesketch_attr": "message",
                **base_config
            },
            {
                "query_string": "source_ip:* OR src_ip:* OR client_ip:*",
                "attr": "ip",
                "timesketch_attr": "source_ip",
                **base_config
            },
        ]
        return to_query

    def _is_valid_ip(self, ip_str: str) -> bool:
        """Enhanced IP validation for nginx logs."""
        try:
            import ipaddress

            ip_str = ip_str.strip()

            # Basic format check first
            if not re.match(r'^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$', ip_str):
                return False

            ip_obj = ipaddress.ip_address(ip_str)

            # Filter out non-routable addresses
            if (ip_obj.is_private or ip_obj.is_loopback or
                    ip_obj.is_multicast or ip_obj.is_reserved or
                    ip_obj.is_link_local or ip_obj.is_unspecified):
                return False

            # Nginx-specific filters
            if (ip_str.startswith(('0.', '10.', '172.', '192.168.', '127.', '169.254.', '224.')) or
                    ip_str in ['255.255.255.255', '0.0.0.0']):
                return False

            return True
        except (ValueError, AttributeError, TypeError):
            return False

    def _is_valid_hash(self, hash_str: str, hash_type: str) -> bool:
        """Validate hash format with strict checking."""
        if not hash_str or not isinstance(hash_str, str):
            return False

        hash_str = hash_str.strip().lower()

        # Check for obvious non-hash patterns
        if not hash_str or hash_str in ['null', 'none', '0', '-']:
            return False

        hash_lengths = {"md5": 32, "sha1": 40, "sha256": 64}
        expected_length = hash_lengths.get(hash_type)
        if not expected_length or len(hash_str) != expected_length:
            return False

        return all(c in '0123456789abcdef' for c in hash_str)

    def _calculate_payload_size(self, payload: Dict[str, Any]) -> int:
        """Calculate approximate payload size in bytes."""
        try:
            return len(json.dumps(payload).encode('utf-8'))
        except:
            return 0

    def _make_misp_request_single(self, indicator: str, attr_type: str, retry_count: int = 0) -> List[Dict]:
        """Make single indicator MISP request with progressive timeout."""
        timeout = min(self.base_timeout + (retry_count * 10), self.max_timeout)

        # Determine search types
        search_types = ["ip-src", "ip-dst"] if attr_type == "ip" else [attr_type]
        results = []

        for search_type in search_types:
            try:
                # Build minimal payload
                distribution_levels = [0]  # Start with own org only
                if self.include_community:
                    distribution_levels.extend([1, 2])

                payload = {
                    "returnFormat": "json",
                    "value": indicator,
                    "type": search_type,
                    "enforceWarninglist": False,
                    "includeDecayScore": False,
                    "includeFullModel": False,
                    "distribution": distribution_levels,
                    "limit": 100,  # Conservative limit
                }

                payload_size = self._calculate_payload_size(payload)
                logger.debug(f"Querying {indicator} ({search_type}) - payload: {payload_size} bytes, timeout: {timeout}s")

                response = self.session.post(
                    f"{self.misp_url}/attributes/restSearch/",
                    json=payload,
                    timeout=timeout,
                )

                if response.status_code == 200:
                    data = response.json()
                    attributes = data.get("response", {}).get("Attribute", [])
                    results.extend(attributes)
                    self.stats['api_calls_successful'] += 1
                elif response.status_code == 429:  # Rate limited
                    wait_time = min(5 * (retry_count + 1), 30)
                    logger.warning(f"Rate limited for {indicator}, waiting {wait_time}s")
                    time.sleep(wait_time)
                    raise requests.exceptions.RequestException("Rate limited")
                elif response.status_code >= 500:  # Server error
                    logger.warning(f"Server error {response.status_code} for {indicator}")
                    if retry_count < self.max_retries:
                        raise requests.exceptions.RequestException(f"Server error {response.status_code}")
                else:
                    logger.debug(f"No results for {indicator} ({search_type}): status {response.status_code}")

                # Delay between search types
                time.sleep(0.2)

            except (requests.exceptions.Timeout, TimeoutError) as e:
                self.stats['timeouts'] += 1
                if retry_count < self.max_retries:
                    wait_time = min(2 ** retry_count, 30)
                    logger.warning(f"Timeout for {indicator} (attempt {retry_count + 1}/{self.max_retries}), retrying in {wait_time}s")
                    time.sleep(wait_time)
                    self.stats['retries'] += 1
                    return self._make_misp_request_single(indicator, attr_type, retry_count + 1)
                else:
                    logger.error(f"Max retries exceeded for {indicator}: {e}")
                    self.stats['api_calls_failed'] += 1
                    self.failed_indicators.add(indicator)
                    return []
            except requests.exceptions.ConnectionError as e:
                self.stats['api_calls_failed'] += 1
                if retry_count < self.max_retries:
                    wait_time = min(5 * (retry_count + 1), 60)
                    logger.warning(f"Connection error for {indicator} (attempt {retry_count + 1}), retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
                    self.stats['retries'] += 1
                    return self._make_misp_request_single(indicator, attr_type, retry_count + 1)
                else:
                    logger.error(f"Connection failed permanently for {indicator}: {e}")
                    self.failed_indicators.add(indicator)
                    return []
            except Exception as e:
                logger.error(f"Unexpected error querying {indicator}: {e}")
                self.stats['api_calls_failed'] += 1
                return []

        return results

    def extract_indicators_from_event(self, event: Any, attr: str, timesketch_attr: str) -> List[str]:
        """Extract and validate indicators from event."""
        try:
            loc = event.source.get(timesketch_attr)
            if not loc:
                return []
@@ -244,28 +279,32 @@
            indicators = []
            loc_str = str(loc)

            if attr == "ip":
                if timesketch_attr == "message":
                    # Extract IPs from nginx access log messages
                    ip_matches = self.ip_pattern.findall(loc_str)
                    indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]
                elif timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
                    if self._is_valid_ip(loc_str):
                        indicators = [loc_str]
            elif attr in ["md5", "sha1", "sha256"]:
                if self._is_valid_hash(loc_str, attr):
                    indicators = [loc_str.lower().strip()]
            elif attr == "filename":
                filename = ntpath.basename(loc_str).strip()
                if filename and len(filename) > 3 and '.' in filename:
                    indicators = [filename]

            return indicators

        except Exception as e:
            logger.debug(f"Error extracting indicators from event: {e}")
            return []

    def mark_event_with_intelligence(self, event: Any, misp_results: List[Dict], attr: str) -> None:
        """Mark event with MISP intelligence information."""
        try:
            event_id = event.source.get('_id', '')
            if event_id in self.marked_events:
@@ -273,115 +312,105 @@
            self.marked_events.add(event_id)

            # Build intelligence message
            if attr == "ip":
                msg_prefix = "MISP: Threat IP"
            elif attr in ["md5", "sha1", "sha256"]:
                msg_prefix = f"MISP: Malicious {attr.upper()}"
            else:
                msg_prefix = f"MISP: Known {attr.upper()}"

            # Extract key information
            events_info = []
            orgs_info = set()
            threat_levels = []

            for misp_attr in misp_results[:3]:  # Limit to first 3 for message clarity
                event_info = misp_attr.get("Event", {})
                event_desc = event_info.get("info", "Unknown")[:40]  # Truncate
                org_name = event_info.get("Orgc", {}).get("name", "Unknown")
                threat_level = event_info.get("threat_level_id")

                if event_desc and event_desc != "Unknown":
                    events_info.append(event_desc)
                if org_name and org_name != "Unknown":
                    orgs_info.add(org_name)
                if threat_level:
                    threat_levels.append(int(threat_level))

            # Build comprehensive message
            msg_parts = [msg_prefix]

            if events_info:
                msg_parts.append(f"Events: {' | '.join(events_info[:2])}")
            if len(misp_results) > 3:
                msg_parts.append(f"({len(misp_results)} total matches)")

            # Organization information
            if len(orgs_info) > 1:
                msg_parts.append(f"Sources: {', '.join(list(orgs_info)[:2])}")
            elif orgs_info and list(orgs_info)[0] != "Unknown":
                msg_parts.append(f"Source: {list(orgs_info)[0]}")

            # Threat level context
            if threat_levels:
                min_threat = min(threat_levels)  # Lower = higher threat
                threat_names = {1: "HIGH", 2: "MEDIUM", 3: "LOW", 4: "UNDEFINED"}
                msg_parts.append(f"Threat: {threat_names.get(min_threat, 'UNKNOWN')}")

            final_message = " | ".join(msg_parts)

            # Add tags
            tags = [f"MISP-{attr}", "threat-intel"]
            if self.include_community and len(orgs_info) > 1:
                tags.append("cross-org-intel")

            event.add_comment(final_message)
            event.add_tags(tags)
            event.commit()

            self.stats['events_marked'] += 1
            self.stats['total_matches'] += len(misp_results)

        except Exception as e:
            logger.error(f"Error marking event: {e}")

    def process_indicators_batch(self, indicators: List[str], attr: str) -> Dict[str, List[Dict]]:
        """Process indicators with careful rate limiting."""
        results = {}

        for i, indicator in enumerate(indicators):
            if indicator in self.failed_indicators:
                continue

            logger.debug(f"Processing indicator {i+1}/{len(indicators)}: {indicator}")

            misp_results = self._make_misp_request_single(indicator, attr)

            if misp_results:
                results[indicator] = misp_results
                logger.info(f"MISP hit: {indicator} ({len(misp_results)} matches)")

            # Rate limiting between requests
            time.sleep(self.request_delay)

            # Progress update every 50 indicators
            if (i + 1) % 50 == 0:
                logger.info(f"Processed {i+1}/{len(indicators)} indicators, "
                            f"{len(results)} hits, "
                            f"{self.stats['timeouts']} timeouts, "
                            f"{self.stats['api_calls_failed']} failures")

        return results

    def query_misp(self, query: str, attr: str, timesketch_attr: str) -> None:
        """Main processing with ultra-reliable chunked approach."""
        logger.info(f"Starting ultra-reliable MISP analysis for {attr} in {timesketch_attr}")
        logger.info(f"Configuration: chunk_size={self.chunk_size}, "
                    f"max_retries={self.max_retries}, "
                    f"request_delay={self.request_delay}s, "
                    f"include_community={self.include_community}")

        # Process events in chunks
        events_stream = self.event_stream(
            query_string=query,
            return_fields=[timesketch_attr, '_id', 'timestamp']
@@ -394,71 +423,119 @@
                current_chunk.append(event)
                self.stats['events_processed'] += 1

                # Process when chunk is full
                if len(current_chunk) >= self.chunk_size:
                    self._process_events_chunk(current_chunk, attr, timesketch_attr)
                    current_chunk = []

                # Progress logging
                if self.stats['events_processed'] % 5000 == 0:
                    success_rate = (self.stats['api_calls_successful'] /
                                    max(1, self.stats['api_calls_successful'] + self.stats['api_calls_failed']) * 100)
                    logger.info(f"Progress: {self.stats['events_processed']} events, "
                                f"{self.stats['events_marked']} marked, "
                                f"{len(self.processed_indicators)} indicators processed, "
                                f"{success_rate:.1f}% API success rate")

            # Process final chunk
            if current_chunk:
                self._process_events_chunk(current_chunk, attr, timesketch_attr)

        except Exception as e:
            logger.error(f"Critical error during processing: {e}")
            raise

    def _process_events_chunk(self, events_chunk: List[Any], attr: str, timesketch_attr: str) -> None:
        """Process a chunk of events with indicator extraction and MISP queries."""
        # Extract all unique indicators from chunk
        chunk_indicators = []
        event_to_indicators = {}

        for event in events_chunk:
            indicators = self.extract_indicators_from_event(event, attr, timesketch_attr)
            if indicators:
                event_id = event.source.get('_id', '')
                event_to_indicators[event_id] = (event, indicators)
                chunk_indicators.extend(indicators)

        # Get unique new indicators
        unique_indicators = list(dict.fromkeys(chunk_indicators))
        new_indicators = [ind for ind in unique_indicators
                          if ind not in self.processed_indicators and ind not in self.failed_indicators]

        if not new_indicators:
            return

        logger.info(f"Processing {len(new_indicators)} new {attr} indicators from {len(events_chunk)} events")
        self.stats['indicators_extracted'] += len(new_indicators)

        # Query MISP for new indicators
        misp_results = self.process_indicators_batch(new_indicators, attr)

        # Update cache
        self.processed_indicators.update(new_indicators)
        for indicator, results in misp_results.items():
            self.result_dict[f"{attr}:{indicator}"] = results

        # Mark matching events
        for event_id, (event, indicators) in event_to_indicators.items():
            if event_id in self.marked_events:
                continue

            matching_results = []
            for indicator in indicators:
                key = f"{attr}:{indicator}"
                if key in self.result_dict:
                    matching_results.extend(self.result_dict[key])

            if matching_results:
                self.mark_event_with_intelligence(event, matching_results, attr)

    def run(self) -> str:
        """Entry point with comprehensive error handling and reporting."""
        if not self.misp_url or not self.misp_api_key:
            return "Error: MISP configuration missing"

        start_time = time.time()

        try:
            self.query_misp(self._query_string, self._attr, self._timesketch_attr)

            # Create view for matches
            if self.stats['events_marked'] > 0:
                view_name = f"MISP {self._attr.upper()} Threats"
                if self.include_community:
                    view_name += " (Cross-Org)"

                self.sketch.add_view(
                    view_name=view_name,
                    analyzer_name=self.NAME,
                    query_string=f'tag:"MISP-{self._attr}" OR tag:"threat-intel"',
                )

            # Comprehensive results
            elapsed = time.time() - start_time
            total_api_calls = self.stats['api_calls_successful'] + self.stats['api_calls_failed']
            success_rate = (self.stats['api_calls_successful'] / max(1, total_api_calls)) * 100

            result = (f"[{self._timesketch_attr}] MISP Analysis: "
                      f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | "
                      f"{len(self.processed_indicators)} indicators processed | "
                      f"{total_api_calls} API calls ({success_rate:.1f}% success) | "
                      f"{self.stats['timeouts']} timeouts | "
                      f"{elapsed:.0f}s")

            logger.info(result)
            return result

        except Exception as e:
            logger.error(f"MISP analyzer failed: {e}")
            return f"[{self._timesketch_attr}] MISP Error: {str(e)}"
        finally:
            try:
                self.session.close()
            except:
                pass


manager.AnalysisManager.register_analyzer(MispAnalyzer)