"""Index analyzer plugin for MISP - Enhanced for large-scale processing.""" import logging import ntpath import re import requests import time from collections import defaultdict from typing import List, Dict, Set, Any from flask import current_app from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager logger = logging.getLogger("timesketch.analyzers.misp") class MispAnalyzer(interface.BaseAnalyzer): """Enhanced Analyzer for MISP with large-scale processing capabilities.""" NAME = "misp_analyzer" DISPLAY_NAME = "MISP Enhanced" DESCRIPTION = "Mark events using MISP with cross-org and large-scale support" def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs): """Initialize the Analyzer.""" super().__init__(index_name, sketch_id, timeline_id=timeline_id) self.misp_url = current_app.config.get("MISP_URL") self.misp_api_key = current_app.config.get("MISP_API_KEY") self.total_event_counter = 0 self.result_dict = {} self._query_string = kwargs.get("query_string") self._attr = kwargs.get("attr") self._timesketch_attr = kwargs.get("timesketch_attr") # Enhanced configuration self.include_community = kwargs.get("include_community", True) self.batch_size = kwargs.get("batch_size", 100) # Process events in batches self.api_batch_size = kwargs.get("api_batch_size", 50) # API call batching self.max_retries = kwargs.get("max_retries", 3) self.request_timeout = kwargs.get("request_timeout", 120) # 2 minutes self.chunk_size = kwargs.get("chunk_size", 1000) # Memory management # Regex patterns self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') # Track processed items to prevent duplicates self.marked_events = set() self.processed_indicators = set() # Statistics self.stats = { 'events_processed': 0, 'indicators_extracted': 0, 'api_calls_made': 0, 'events_marked': 0, 'errors': 0 } @staticmethod def get_kwargs(): """Get kwargs for the analyzer with enhanced options.""" to_query = [ { "query_string": "md5_hash:*", "attr": "md5", "timesketch_attr": "md5_hash", "include_community": True, "batch_size": 100, "api_batch_size": 50, }, { "query_string": "sha1_hash:*", "attr": "sha1", "timesketch_attr": "sha1_hash", "include_community": True, "batch_size": 100, "api_batch_size": 50, }, { "query_string": "sha256_hash:*", "attr": "sha256", "timesketch_attr": "sha256_hash", "include_community": True, "batch_size": 100, "api_batch_size": 50, }, { "query_string": "filename:*", "attr": "filename", "timesketch_attr": "filename", "include_community": True, "batch_size": 100, "api_batch_size": 50, }, { "query_string": "message:*", "attr": "ip", "timesketch_attr": "message", "include_community": True, "batch_size": 100, "api_batch_size": 50, }, { "query_string": "source_ip:* OR src_ip:* OR client_ip:*", "attr": "ip", "timesketch_attr": "source_ip", "include_community": True, "batch_size": 100, "api_batch_size": 50, }, ] return to_query def _is_valid_ip(self, ip_str: str) -> bool: """Validate IP address with enhanced filtering.""" try: import ipaddress ip_str = ip_str.strip() ip_obj = ipaddress.ip_address(ip_str) # Filter out private, loopback, and other non-routable IPs if (ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_multicast or ip_obj.is_reserved or ip_obj.is_link_local): return False # Additional nginx log specific filters if ip_str.startswith(('0.', '255.255.255.255', '169.254.')): return False return True except (ValueError, AttributeError): return False def _is_valid_hash(self, hash_str: str, hash_type: str) -> bool: """Validate hash format.""" if 
    def _is_valid_hash(self, hash_str: str, hash_type: str) -> bool:
        """Validate hash format."""
        if not hash_str:
            return False

        hash_str = hash_str.strip().lower()
        hash_lengths = {"md5": 32, "sha1": 40, "sha256": 64}
        expected_length = hash_lengths.get(hash_type)
        if not expected_length:
            return False

        return len(hash_str) == expected_length and all(
            c in "0123456789abcdef" for c in hash_str
        )

    def _make_misp_request(
        self, payload: Dict[str, Any], retry_count: int = 0
    ) -> List[Dict]:
        """Make a MISP API request with retry logic."""
        try:
            response = requests.post(
                f"{self.misp_url}/attributes/restSearch/",
                json=payload,
                headers={"Authorization": self.misp_api_key},
                verify=False,  # Self-hosted MISP often uses self-signed certs
                timeout=self.request_timeout,
            )

            if response.status_code == 200:
                data = response.json()
                return data.get("response", {}).get("Attribute", [])

            if response.status_code == 429:  # Rate limited
                if retry_count < self.max_retries:
                    wait_time = min(2**retry_count, 60)  # Exponential backoff, max 60s
                    logger.warning(f"Rate limited, waiting {wait_time}s before retry")
                    time.sleep(wait_time)
                    return self._make_misp_request(payload, retry_count + 1)
                logger.error(f"Rate limited after {self.max_retries} retries")
                self.stats["errors"] += 1
                return []

            logger.warning(f"MISP API returned status {response.status_code}")
            return []

        except (
            requests.exceptions.Timeout,
            requests.exceptions.ConnectionError,
        ) as e:
            if retry_count < self.max_retries:
                wait_time = min(2**retry_count * 5, 120)  # Exponential backoff
                logger.warning(
                    f"Request failed (attempt {retry_count + 1}), "
                    f"retrying in {wait_time}s: {e}"
                )
                time.sleep(wait_time)
                return self._make_misp_request(payload, retry_count + 1)
            logger.error(f"Request failed after {self.max_retries} retries: {e}")
            self.stats["errors"] += 1
            return []
        except Exception as e:
            logger.error(f"Unexpected error in MISP request: {e}")
            self.stats["errors"] += 1
            return []

    def query_misp_batch(
        self, indicators: List[str], attr: str
    ) -> Dict[str, List[Dict]]:
        """Query MISP for multiple indicators efficiently."""
        results = defaultdict(list)

        # Determine search types based on attribute
        if attr == "ip":
            search_types = ["ip-src", "ip-dst"]
        else:
            search_types = [attr]

        for search_type in search_types:
            # Batch indicators to reduce API calls
            for i in range(0, len(indicators), self.api_batch_size):
                batch = indicators[i : i + self.api_batch_size]

                # Build payload with distribution settings
                distribution_levels = [0]  # Own org
                if self.include_community:
                    distribution_levels.extend([1, 2])  # Community and connected orgs

                payload = {
                    "returnFormat": "json",
                    "value": batch,
                    "type": search_type,
                    "enforceWarninglist": False,
                    "includeDecayScore": False,
                    "includeFullModel": False,
                    "excludeDecayed": False,
                    "distribution": distribution_levels,
                    "limit": 10000,  # High limit for large datasets
                }

                self.stats["api_calls_made"] += 1
                logger.info(
                    f"Querying MISP for {len(batch)} {search_type} indicators "
                    f"(call #{self.stats['api_calls_made']})"
                )

                batch_results = self._make_misp_request(payload)

                # Group results by indicator value
                for result in batch_results:
                    indicator_value = result.get("value", "").strip()
                    if indicator_value in batch:
                        results[indicator_value].append(result)

                # Rate-limiting courtesy pause
                time.sleep(0.5)

        return dict(results)
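    # For orientation, a sketch of the restSearch exchange the two methods
    # above perform. The field names match what this module reads back from
    # the response; the exact response shape can vary across MISP versions,
    # and the values shown here are illustrative:
    #
    #   POST {MISP_URL}/attributes/restSearch/
    #   {"returnFormat": "json", "value": ["203.0.113.7"], "type": "ip-src", ...}
    #
    #   -> {"response": {"Attribute": [
    #          {"value": "203.0.113.7",
    #           "Event": {"id": "42", "info": "Campaign X",
    #                     "Orgc": {"name": "SomeOrg"},
    #                     "threat_level_id": "1"}}]}}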
    def extract_indicators_from_event(
        self, event: Any, attr: str, timesketch_attr: str
    ) -> List[str]:
        """Extract indicators from a single event."""
        loc = event.source.get(timesketch_attr)
        if not loc:
            return []

        indicators = []
        loc_str = str(loc)

        if attr == "ip" and timesketch_attr == "message":
            # Extract IPs from nginx access log messages
            ip_matches = self.ip_pattern.findall(loc_str)
            indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]
        elif attr == "ip" and timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
            if self._is_valid_ip(loc_str):
                indicators = [loc_str]
        elif attr in ["md5", "sha1", "sha256"]:
            if self._is_valid_hash(loc_str, attr):
                indicators = [loc_str]
        elif attr == "filename":
            filename = ntpath.basename(loc_str)
            if filename and len(filename) > 3:  # Meaningful filename
                indicators = [filename]

        return indicators

    def mark_event_with_intel(
        self, event: Any, misp_results: List[Dict], attr: str
    ) -> None:
        """Mark an event with MISP intelligence."""
        event_id = event.source.get("_id", "")
        try:
            if event_id in self.marked_events:
                return
            self.marked_events.add(event_id)

            # Build a comprehensive message
            if attr == "ip":
                msg = "MISP: Malicious IP detected"
            else:
                msg = f"MISP: Known {attr.upper()} indicator"

            # Collect event and organization info
            events_info = {}
            orgs_info = set()
            threat_levels = set()

            for misp_attr in misp_results:
                event_info = misp_attr.get("Event", {})
                event_id_misp = event_info.get("id", "")
                event_desc = event_info.get("info", "Unknown")
                org_name = event_info.get("Orgc", {}).get("name", "Unknown")
                threat_level = event_info.get("threat_level_id", "")

                # Truncate long descriptions
                events_info[event_id_misp] = event_desc[:50]
                orgs_info.add(org_name)
                if threat_level:
                    threat_levels.add(threat_level)

            # Enhanced message with threat context
            event_descriptions = list(events_info.values())[:2]
            if event_descriptions:
                msg += f" | Events: {' | '.join(event_descriptions)}"
                if len(misp_results) > 2:
                    msg += f" | +{len(misp_results) - 2} more indicators"

            # Organization information
            if len(orgs_info) > 1:
                msg += f" | Sources: {', '.join(list(orgs_info)[:3])}"
            elif orgs_info and list(orgs_info)[0] != "Unknown":
                msg += f" | Source: {list(orgs_info)[0]}"

            # Threat level context (in MISP, a lower number = a higher threat)
            if threat_levels:
                highest_threat = min(threat_levels)
                threat_map = {"1": "HIGH", "2": "MEDIUM", "3": "LOW", "4": "UNDEFINED"}
                msg += f" | Threat: {threat_map.get(str(highest_threat), 'UNKNOWN')}"

            # Add tags and comment
            tags = [f"MISP-{attr}", "threat-intel"]
            if self.include_community and len(orgs_info) > 1:
                tags.append("cross-org-intel")

            event.add_comment(msg)
            event.add_tags(tags)
            event.commit()
            self.stats["events_marked"] += 1

        except Exception as e:
            logger.error(f"Error marking event {event_id}: {e}")
            self.stats["errors"] += 1
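    # Summary of the chunked pipeline implemented by the methods below
    # (orientation comment; names refer to methods in this class):
    #
    #   event_stream --(chunks of `chunk_size` events)--> process_events_chunk()
    #       -> extract_indicators_from_event()  per event
    #       -> query_misp_batch()               deduplicated, `api_batch_size`
    #                                           indicators per API call
    #       -> mark_event_with_intel()          per event with a MISP match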
key = f"{attr}:{indicator}" if key in self.result_dict: matching_results.extend(self.result_dict[key]) if matching_results: self.mark_event_with_intel(event, matching_results, attr) def query_misp(self, query: str, attr: str, timesketch_attr: str) -> None: """Main processing function with chunked approach for large datasets.""" logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}") logger.info(f"Community querying: {'enabled' if self.include_community else 'disabled'}") # Process events in chunks to manage memory events_stream = self.event_stream( query_string=query, return_fields=[timesketch_attr, '_id', 'timestamp'] ) current_chunk = [] try: for event in events_stream: current_chunk.append(event) self.stats['events_processed'] += 1 # Process chunk when it reaches the specified size if len(current_chunk) >= self.chunk_size: self.process_events_chunk(current_chunk, attr, timesketch_attr) current_chunk = [] # Progress logging if self.stats['events_processed'] % 10000 == 0: logger.info(f"Progress: {self.stats['events_processed']} events processed, " f"{self.stats['events_marked']} marked, " f"{self.stats['api_calls_made']} API calls made") # Process remaining events in the last chunk if current_chunk: self.process_events_chunk(current_chunk, attr, timesketch_attr) except Exception as e: logger.error(f"Error during event processing: {e}") self.stats['errors'] += 1 # Create comprehensive view if we found matches if self.stats['events_marked'] > 0: view_name = f"MISP Threat Intel - {attr.upper()}" if self.include_community: view_name += " (Cross-Org)" self.sketch.add_view( view_name=view_name, analyzer_name=self.NAME, query_string=f'tag:"MISP-{attr}" OR tag:"threat-intel"', ) def run(self) -> str: """Entry point for the analyzer with comprehensive error handling.""" if not self.misp_url or not self.misp_api_key: return "Error: No MISP configuration found" start_time = time.time() try: logger.info(f"Starting MISP analyzer with config: " f"batch_size={self.batch_size}, " f"api_batch_size={self.api_batch_size}, " f"chunk_size={self.chunk_size}, " f"include_community={self.include_community}") self.query_misp(self._query_string, self._attr, self._timesketch_attr) elapsed_time = time.time() - start_time # Comprehensive results summary result_msg = (f"[{self._timesketch_attr}] MISP Analysis Complete: " f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | " f"{self.stats['api_calls_made']} API calls | " f"{len(self.processed_indicators)} indicators processed | " f"{elapsed_time:.1f}s") if self.stats['errors'] > 0: result_msg += f" | {self.stats['errors']} errors" logger.info(result_msg) return result_msg except Exception as e: logger.error(f"MISP analyzer critical error: {e}") return f"[{self._timesketch_attr}] MISP Error: {str(e)}" manager.AnalysisManager.register_analyzer(MispAnalyzer)