diff --git a/misp_analyzer.py b/misp_analyzer.py
index e8b6498..db51f3b 100644
--- a/misp_analyzer.py
+++ b/misp_analyzer.py
@@ -1,9 +1,12 @@
-"""Index analyzer plugin for MISP."""
+"""Index analyzer plugin for MISP - Enhanced for large-scale processing."""
 import logging
 import ntpath
 import re
 import requests
+import time
+from collections import defaultdict
+from typing import Any, Dict, List
 from flask import current_app
 from timesketch.lib.analyzers import interface
@@ -14,11 +17,11 @@ logger = logging.getLogger("timesketch.analyzers.misp")
 
 
 class MispAnalyzer(interface.BaseAnalyzer):
-    """Analyzer for MISP."""
+    """Enhanced Analyzer for MISP with large-scale processing capabilities."""
 
     NAME = "misp_analyzer"
-    DISPLAY_NAME = "MISP"
-    DESCRIPTION = "Mark events using MISP"
+    DISPLAY_NAME = "MISP Enhanced"
+    DESCRIPTION = "Mark events using MISP with cross-org and large-scale support"
 
     def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs):
         """Initialize the Analyzer."""
@@ -30,285 +33,431 @@ class MispAnalyzer(interface.BaseAnalyzer):
         self._query_string = kwargs.get("query_string")
         self._attr = kwargs.get("attr")
         self._timesketch_attr = kwargs.get("timesketch_attr")
+
+        # Enhanced configuration
+        self.include_community = kwargs.get("include_community", True)
+        self.batch_size = kwargs.get("batch_size", 100)  # Process events in batches
+        self.api_batch_size = kwargs.get("api_batch_size", 50)  # API call batching
+        self.max_retries = kwargs.get("max_retries", 3)
+        self.request_timeout = kwargs.get("request_timeout", 120)  # 2 minutes
+        self.chunk_size = kwargs.get("chunk_size", 1000)  # Memory management
+
+        # Regex patterns
         self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
-        # Track marked events to prevent duplicates
+
+        # Track processed items to prevent duplicates
         self.marked_events = set()
+        self.processed_indicators = set()
+
+        # Statistics
+        self.stats = {
+            'events_processed': 0,
+            'indicators_extracted': 0,
+            'api_calls_made': 0,
+            'events_marked': 0,
+            'errors': 0,
+        }
 
     @staticmethod
     def get_kwargs():
-        """Get kwargs for the analyzer."""
+        """Get kwargs for the analyzer with enhanced options."""
         to_query = [
             {
                 "query_string": "md5_hash:*",
                 "attr": "md5",
                 "timesketch_attr": "md5_hash",
+                "include_community": True,
+                "batch_size": 100,
+                "api_batch_size": 50,
             },
             {
                 "query_string": "sha1_hash:*",
                 "attr": "sha1",
                 "timesketch_attr": "sha1_hash",
+                "include_community": True,
+                "batch_size": 100,
+                "api_batch_size": 50,
             },
             {
                 "query_string": "sha256_hash:*",
                 "attr": "sha256",
                 "timesketch_attr": "sha256_hash",
+                "include_community": True,
+                "batch_size": 100,
+                "api_batch_size": 50,
             },
             {
                 "query_string": "filename:*",
                 "attr": "filename",
                 "timesketch_attr": "filename",
+                "include_community": True,
+                "batch_size": 100,
+                "api_batch_size": 50,
             },
             {
                 "query_string": "message:*",
-                "attr": "ip",  # Generic IP instead of ip-src/ip-dst
+                "attr": "ip",
                 "timesketch_attr": "message",
+                "include_community": True,
+                "batch_size": 100,
+                "api_batch_size": 50,
             },
             {
-                "query_string": "source_ip:*",
+                "query_string": "source_ip:* OR src_ip:* OR client_ip:*",
                 "attr": "ip",
                 "timesketch_attr": "source_ip",
+                "include_community": True,
+                "batch_size": 100,
+                "api_batch_size": 50,
             },
         ]
         return to_query
 
-    def _is_valid_ip(self, ip_str):
-        """Validate IP address."""
+    def _is_valid_ip(self, ip_str: str) -> bool:
+        """Validate IP address with enhanced filtering."""
         try:
             import ipaddress
             ip_str = ip_str.strip()
-            ipaddress.ip_address(ip_str)
-            if ip_str.startswith(('0.', '127.', '255.255.255.255')):
+            ip_obj = ipaddress.ip_address(ip_str)
+
+            # Filter out private, loopback, and other non-routable IPs
+            if (ip_obj.is_private or ip_obj.is_loopback or
+                    ip_obj.is_multicast or ip_obj.is_reserved or
+                    ip_obj.is_link_local):
                 return False
+
+            # Additional nginx log specific filters
+            if ip_str.startswith(('0.', '255.255.255.255', '169.254.')):
+                return False
+
             return True
         except (ValueError, AttributeError):
             return False
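The rewritten `_is_valid_ip` swaps the old string-prefix checks for `ipaddress` classification. A standalone sketch of that predicate (illustrative only, not part of the patch) shows what now gets dropped before any MISP lookup:

```python
import ipaddress

def is_routable(ip_str: str) -> bool:
    """Mirror of the patched _is_valid_ip classification logic."""
    try:
        ip_obj = ipaddress.ip_address(ip_str.strip())
    except ValueError:
        return False  # the regex can match non-addresses like 999.1.1.1
    return not (ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_multicast
                or ip_obj.is_reserved or ip_obj.is_link_local)

assert is_routable("8.8.8.8")          # public -> kept for MISP lookup
assert not is_routable("10.0.0.1")     # RFC 1918 private -> dropped
assert not is_routable("127.0.0.1")    # loopback -> dropped
assert not is_routable("224.0.0.1")    # multicast -> dropped
assert not is_routable("999.1.1.1")    # regex artifact -> dropped
```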
 
-    def _is_valid_hash(self, hash_str, hash_type):
+    def _is_valid_hash(self, hash_str: str, hash_type: str) -> bool:
         """Validate hash format."""
         if not hash_str:
             return False
         hash_str = hash_str.strip().lower()
-        if hash_type == "md5":
-            return len(hash_str) == 32 and all(c in '0123456789abcdef' for c in hash_str)
-        elif hash_type == "sha1":
-            return len(hash_str) == 40 and all(c in '0123456789abcdef' for c in hash_str)
-        elif hash_type == "sha256":
-            return len(hash_str) == 64 and all(c in '0123456789abcdef' for c in hash_str)
-        return False
+        hash_lengths = {"md5": 32, "sha1": 40, "sha256": 64}
+        expected_length = hash_lengths.get(hash_type)
+        if not expected_length:
+            return False
+
+        return (len(hash_str) == expected_length and
+                all(c in '0123456789abcdef' for c in hash_str))
 
-    def query_misp_single(self, value, attr):
-        """Query MISP for a single value - ENHANCED for cross-org visibility."""
-        results = []
+    def _make_misp_request(self, payload: Dict[str, Any], retry_count: int = 0) -> List[Dict]:
+        """Make MISP API request with retry logic."""
+        try:
+            response = requests.post(
+                f"{self.misp_url}/attributes/restSearch/",
+                json=payload,
+                headers={"Authorization": self.misp_api_key},
+                verify=False,
+                timeout=self.request_timeout,
+            )
+
+            if response.status_code == 200:
+                data = response.json()
+                return data.get("response", {}).get("Attribute", [])
+            if response.status_code == 429:  # Rate limited
+                if retry_count < self.max_retries:
+                    wait_time = min(2 ** retry_count, 60)  # Exponential backoff, max 60s
+                    logger.warning(f"Rate limited, waiting {wait_time}s before retry")
+                    time.sleep(wait_time)
+                    return self._make_misp_request(payload, retry_count + 1)
+                logger.error(f"Still rate limited after {self.max_retries} retries")
+                self.stats['errors'] += 1
+                return []
+            logger.warning(f"MISP API returned status {response.status_code}")
+            return []
+
+        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
+            if retry_count < self.max_retries:
+                wait_time = min(2 ** retry_count * 5, 120)  # Exponential backoff
+                logger.warning(f"Request failed (attempt {retry_count + 1}), "
+                               f"retrying in {wait_time}s: {e}")
+                time.sleep(wait_time)
+                return self._make_misp_request(payload, retry_count + 1)
+            logger.error(f"Request failed after {self.max_retries} retries: {e}")
+            self.stats['errors'] += 1
+            return []
+        except Exception as e:
+            logger.error(f"Unexpected error in MISP request: {e}")
+            self.stats['errors'] += 1
+            return []
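For reference, the two backoff formulas above yield the following schedule with the default `max_retries=3` (arithmetic illustration only, not additional patch code):

```python
for retry_count in range(3):
    rate_limit_wait = min(2 ** retry_count, 60)    # 429 responses: 1s, 2s, 4s
    network_wait = min(2 ** retry_count * 5, 120)  # timeouts/connection errors: 5s, 10s, 20s
    print(f"retry {retry_count + 1}: 429 wait {rate_limit_wait}s, network wait {network_wait}s")
```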
 
+    def query_misp_batch(self, indicators: List[str], attr: str) -> Dict[str, List[Dict]]:
+        """Query MISP for multiple indicators efficiently."""
+        results = defaultdict(list)
 
-        # Query both ip-src and ip-dst for IPs, include cross-org events
+        # Determine search types based on attribute
         if attr == "ip":
             search_types = ["ip-src", "ip-dst"]
         else:
             search_types = [attr]
 
         for search_type in search_types:
-            try:
-                # Include events from other organizations
+            # Batch indicators to reduce API calls
+            for i in range(0, len(indicators), self.api_batch_size):
+                batch = indicators[i:i + self.api_batch_size]
+
+                # Build payload with distribution settings
+                distribution_levels = [0]  # Own org
+                if self.include_community:
+                    distribution_levels.extend([1, 2])  # Community and connected orgs
+
                 payload = {
                     "returnFormat": "json",
-                    "value": value,
+                    "value": batch,
                     "type": search_type,
-                    # Include events from all organizations with proper distribution
-                    "enforceWarninglist": False,  # Don't filter known-good IPs
-                    "includeDecayScore": False,  # Skip decay scores for speed
-                    "includeFullModel": False,  # Skip full model for speed
-                    "decayingModel": [],  # No decaying model filters
-                    "excludeDecayed": False,  # Include older indicators
-                    # Distribution levels: 0=Own org, 1=Community, 2=Connected, 3=All, 5=Inherit
-                    "distribution": [0, 1]
+                    "enforceWarninglist": False,
+                    "includeDecayScore": False,
+                    "includeFullModel": False,
+                    "excludeDecayed": False,
+                    "distribution": distribution_levels,
+                    "limit": 10000,  # High limit for large datasets
                 }
 
-                response = requests.post(
-                    f"{self.misp_url}/attributes/restSearch/",
-                    json=payload,
-                    headers={"Authorization": self.misp_api_key},
-                    verify=False,
-                    timeout=30,
-                )
-
-                if response.status_code == 200:
-                    data = response.json()
-                    attributes = data.get("response", {}).get("Attribute", [])
-                    results.extend(attributes)
-
-            except Exception:
-                continue
+                self.stats['api_calls_made'] += 1
+                logger.info(f"Querying MISP for {len(batch)} {search_type} indicators "
+                            f"(call #{self.stats['api_calls_made']})")
 
-        return results
+                batch_results = self._make_misp_request(payload)
+
+                # Group results by indicator value
+                for result in batch_results:
+                    indicator_value = result.get("value", "").strip()
+                    if indicator_value in batch:
+                        results[indicator_value].append(result)
+
+                # Rate limiting courtesy pause
+                time.sleep(0.5)
+
+        return dict(results)
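The batching relies on `/attributes/restSearch` accepting a list under `value`, so one POST covers up to `api_batch_size` indicators instead of one request each. A sketch of the payload the inner loop builds (indicator values are made up; `include_community=True` assumed):

```python
payload = {
    "returnFormat": "json",
    "value": ["198.18.0.9", "203.0.113.77"],  # one batch of indicators
    "type": "ip-src",
    "enforceWarninglist": False,
    "includeDecayScore": False,
    "includeFullModel": False,
    "excludeDecayed": False,
    "distribution": [0, 1, 2],  # own org + community + connected communities
    "limit": 10000,
}
```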
 
-    def mark_event(self, event, result, attr):
-        """Add MISP intelligence to event - FIXED to prevent duplicates."""
+    def extract_indicators_from_event(self, event: Any, attr: str, timesketch_attr: str) -> List[str]:
+        """Extract indicators from a single event."""
+        loc = event.source.get(timesketch_attr)
+        if not loc:
+            return []
+
+        indicators = []
+        loc_str = str(loc)
+
+        if attr == "ip" and timesketch_attr == "message":
+            # Extract IPs from nginx access log messages
+            ip_matches = self.ip_pattern.findall(loc_str)
+            indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]
+
+        elif attr == "ip" and timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
+            if self._is_valid_ip(loc_str):
+                indicators = [loc_str]
+
+        elif attr in ["md5", "sha1", "sha256"]:
+            if self._is_valid_hash(loc_str, attr):
+                indicators = [loc_str]
+
+        elif attr == "filename":
+            filename = ntpath.basename(loc_str)
+            if filename and len(filename) > 3:  # Meaningful filename
+                indicators = [filename]
+
+        return indicators
+
+    def mark_event_with_intel(self, event: Any, misp_results: List[Dict], attr: str) -> None:
+        """Mark event with MISP intelligence."""
         try:
-            # Check if event already marked
             event_id = event.source.get('_id', '')
             if event_id in self.marked_events:
                 return
             self.marked_events.add(event_id)
 
-            # Show organization info for cross-org awareness
+            # Build comprehensive message
             if attr == "ip":
-                msg = "MISP: Malicious IP detected - "
+                msg = "MISP: Malicious IP detected"
             else:
-                msg = "MISP: Known indicator - "
+                msg = f"MISP: Known {attr.upper()} indicator"
 
-            # Collect unique events and organizations
+            # Collect event and organization info
             events_info = {}
             orgs_info = set()
+            threat_levels = set()
 
-            for misp_attr in result:
+            for misp_attr in misp_results:
                 event_info = misp_attr.get("Event", {})
-                event_id = event_info.get("id", "")
+                event_id_misp = event_info.get("id", "")
                 event_desc = event_info.get("info", "Unknown")
-                org_name = event_info.get("Orgc", {}).get("name", "Unknown Org")
+                org_name = event_info.get("Orgc", {}).get("name", "Unknown")
+                threat_level = event_info.get("threat_level_id", "")
 
-                events_info[event_id] = f'"{event_desc}"'
+                events_info[event_id_misp] = event_desc[:50]  # Truncate long descriptions
                 orgs_info.add(org_name)
+                if threat_level:
+                    threat_levels.add(threat_level)
 
-            # Build message with org info
-            event_descriptions = list(events_info.values())[:2]  # First 2 events
-            msg += " | ".join(event_descriptions)
+            # Enhanced message with threat context
+            event_descriptions = list(events_info.values())[:2]
+            if event_descriptions:
+                msg += f" | Events: {' | '.join(event_descriptions)}"
 
-            if len(result) > 2:
-                msg += f" | +{len(result)-2} more"
+            if len(misp_results) > 2:
+                msg += f" | +{len(misp_results)-2} more indicators"
 
-            # Add organization information
+            # Organization information
             if len(orgs_info) > 1:
-                msg += f" | Orgs: {', '.join(list(orgs_info)[:3])}"
-            elif orgs_info:
-                org_name = list(orgs_info)[0]
-                if org_name != "Unknown Org":
-                    msg += f" | Org: {org_name}"
-
+                msg += f" | Sources: {', '.join(list(orgs_info)[:3])}"
+            elif orgs_info and list(orgs_info)[0] != "Unknown":
+                msg += f" | Source: {list(orgs_info)[0]}"
+
+            # Threat level context
+            if threat_levels:
+                highest_threat = min(threat_levels)  # Lower number = higher threat
+                threat_map = {"1": "HIGH", "2": "MEDIUM", "3": "LOW", "4": "UNDEFINED"}
+                msg += f" | Threat: {threat_map.get(str(highest_threat), 'UNKNOWN')}"
+
+            # Add tags and comment
+            tags = [f"MISP-{attr}", "threat-intel"]
+            if self.include_community and len(orgs_info) > 1:
+                tags.append("cross-org-intel")
+
             event.add_comment(msg)
-            event.add_tags([f"MISP-{attr}", "threat-intel", "cross-org-intel"])
+            event.add_tags(tags)
             event.commit()
 
+            self.stats['events_marked'] += 1
+
         except Exception as e:
-            logger.error(f"Error marking event: {e}")
+            logger.error(f"Error marking event {event_id}: {e}")
+            self.stats['errors'] += 1
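To see what the extraction step yields for the `message` field, here is a hypothetical nginx access-log line run through the same regex; `_is_valid_ip` then filters the hits, so only routable addresses reach MISP (the client address below is a stand-in):

```python
import re

ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
message = '8.8.8.8 - - [10/Oct/2024:13:55:36 +0000] "GET /wp-login.php HTTP/1.1" 404 162'

print(ip_pattern.findall(message))  # ['8.8.8.8'] - timestamps and sizes don't match
```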
 
-    def query_misp(self, query, attr, timesketch_attr):
-        """Extract indicators and query MISP."""
-        events = self.event_stream(query_string=query, return_fields=[timesketch_attr, '_id'])
-        query_list = []
-        events_list = []
-        processed = 0
-
-        # Extract indicators from events
-        for event in events:
-            processed += 1
-            if processed > 5000:
-                break
-
-            loc = event.source.get(timesketch_attr)
-            if not loc:
-                continue
-
-            events_list.append(event)
-            indicators = []
-
-            # Extract based on attribute type
-            if attr == "ip" and timesketch_attr == "message":
-                ip_matches = self.ip_pattern.findall(str(loc))
-                indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]
-
-            elif attr == "ip" and timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
-                if self._is_valid_ip(str(loc)):
-                    indicators = [str(loc)]
-
-            elif attr in ["md5", "sha1", "sha256"]:
-                if self._is_valid_hash(str(loc), attr):
-                    indicators = [str(loc)]
-
-            elif attr == "filename":
-                filename = ntpath.basename(str(loc))
-                if filename and len(filename) > 1:
-                    indicators = [filename]
-
-            # Add valid indicators to query list
-            for indicator in indicators:
-                if indicator not in query_list:
-                    query_list.append(indicator)
-                    self.result_dict[f"{attr}:{indicator}"] = []
-
-        logger.info(f"Extracted {len(query_list)} {attr} indicators from {processed} events")
-
-        if not query_list:
+    def process_events_chunk(self, events_chunk: List[Any], attr: str, timesketch_attr: str) -> None:
+        """Process a chunk of events efficiently."""
+        # Extract all indicators from the chunk
+        chunk_indicators = []
+        event_to_indicators = {}
+
+        for event in events_chunk:
+            indicators = self.extract_indicators_from_event(event, attr, timesketch_attr)
+            if indicators:
+                event_id = event.source.get('_id', '')
+                event_to_indicators[event_id] = (event, indicators)
+                chunk_indicators.extend(indicators)
+                self.stats['indicators_extracted'] += len(indicators)
+
+        # Remove duplicates while preserving order
+        unique_indicators = list(dict.fromkeys(chunk_indicators))
+        new_indicators = [ind for ind in unique_indicators if ind not in self.processed_indicators]
+
+        if not new_indicators:
             return
-
-        # Query MISP for each indicator
-        for indicator in query_list:
-            result = self.query_misp_single(indicator, attr)
-            if result:
-                self.result_dict[f"{attr}:{indicator}"] = result
-                # Log organization diversity
-                orgs = set()
-                for r in result:
-                    org = r.get("Event", {}).get("Orgc", {}).get("name", "Unknown")
-                    orgs.add(org)
-                logger.info(f"MISP hit: {indicator} ({len(result)} indicators from {len(orgs)} orgs)")
-
-        # Mark matching events
-        for event in events_list:
-            loc = event.source.get(timesketch_attr)
-            if not loc:
-                continue
-
-            # Check if event already processed
-            event_id = event.source.get('_id', '')
+
+        logger.info(f"Processing {len(new_indicators)} new indicators "
+                    f"from chunk of {len(events_chunk)} events")
+
+        # Query MISP for new indicators
+        misp_results = self.query_misp_batch(new_indicators, attr)
+
+        # Update processed indicators and result cache
+        self.processed_indicators.update(new_indicators)
+        for indicator, results in misp_results.items():
+            if results:
+                self.result_dict[f"{attr}:{indicator}"] = results
+
+        # Mark events that have matching indicators
+        for event_id, (event, indicators) in event_to_indicators.items():
             if event_id in self.marked_events:
                 continue
-
-            # Re-extract indicators from this event
-            event_indicators = []
-
-            if attr == "ip" and timesketch_attr == "message":
-                ip_matches = self.ip_pattern.findall(str(loc))
-                event_indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]
-            elif attr == "ip" and timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
-                if self._is_valid_ip(str(loc)):
-                    event_indicators = [str(loc)]
-            elif attr in ["md5", "sha1", "sha256"]:
-                if self._is_valid_hash(str(loc), attr):
-                    event_indicators = [str(loc)]
-            elif attr == "filename":
-                filename = ntpath.basename(str(loc))
-                if filename:
-                    event_indicators = [filename]
-
-            # Check if any indicator has MISP match
-            for indicator in event_indicators:
+
+            matching_results = []
+            for indicator in indicators:
                 key = f"{attr}:{indicator}"
-                if key in self.result_dict and self.result_dict[key]:
-                    self.total_event_counter += 1
-                    self.mark_event(event, self.result_dict[key], attr)
-                    break  # Only mark once per event
+                if key in self.result_dict:
+                    matching_results.extend(self.result_dict[key])
+
+            if matching_results:
+                self.mark_event_with_intel(event, matching_results, attr)
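One detail in the dedup step above: `dict.fromkeys` removes duplicates while keeping first-seen order, which a plain `set` would not guarantee:

```python
hits = ["8.8.8.8", "9.9.9.9", "8.8.8.8"]
print(list(dict.fromkeys(hits)))  # ['8.8.8.8', '9.9.9.9'] - first-seen order kept
```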
 
-        # Create view if we found matches
-        if self.total_event_counter > 0:
+    def query_misp(self, query: str, attr: str, timesketch_attr: str) -> None:
+        """Main processing function with chunked approach for large datasets."""
+        logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}")
+        logger.info(f"Community querying: {'enabled' if self.include_community else 'disabled'}")
+
+        # Process events in chunks to manage memory
+        events_stream = self.event_stream(
+            query_string=query,
+            return_fields=[timesketch_attr, '_id', 'timestamp']
+        )
+
+        current_chunk = []
+
+        try:
+            for event in events_stream:
+                current_chunk.append(event)
+                self.stats['events_processed'] += 1
+
+                # Process chunk when it reaches the specified size
+                if len(current_chunk) >= self.chunk_size:
+                    self.process_events_chunk(current_chunk, attr, timesketch_attr)
+                    current_chunk = []
+
+                # Progress logging
+                if self.stats['events_processed'] % 10000 == 0:
+                    logger.info(f"Progress: {self.stats['events_processed']} events processed, "
+                                f"{self.stats['events_marked']} marked, "
+                                f"{self.stats['api_calls_made']} API calls made")
+
+            # Process remaining events in the last chunk
+            if current_chunk:
+                self.process_events_chunk(current_chunk, attr, timesketch_attr)
+
+        except Exception as e:
+            logger.error(f"Error during event processing: {e}")
+            self.stats['errors'] += 1
+
+        # Create comprehensive view if we found matches
+        if self.stats['events_marked'] > 0:
+            view_name = f"MISP Threat Intel - {attr.upper()}"
+            if self.include_community:
+                view_name += " (Cross-Org)"
+
             self.sketch.add_view(
-                view_name="MISP Cross-Org Threat Intel",
+                view_name=view_name,
                 analyzer_name=self.NAME,
-                query_string='tag:"MISP-*" OR tag:"threat-intel" OR tag:"cross-org-intel"',
+                query_string=f'tag:"MISP-{attr}" OR tag:"threat-intel"',
             )
 
-    def run(self):
-        """Entry point for the analyzer."""
+    def run(self) -> str:
+        """Entry point for the analyzer with comprehensive error handling."""
         if not self.misp_url or not self.misp_api_key:
-            return "No MISP configuration found"
+            return "Error: No MISP configuration found"
 
+        start_time = time.time()
+
         try:
+            logger.info(f"Starting MISP analyzer with config: "
+                        f"batch_size={self.batch_size}, "
+                        f"api_batch_size={self.api_batch_size}, "
+                        f"chunk_size={self.chunk_size}, "
+                        f"include_community={self.include_community}")
+
             self.query_misp(self._query_string, self._attr, self._timesketch_attr)
-            return f"[{self._timesketch_attr}] MISP Match: {self.total_event_counter}"
+
+            elapsed_time = time.time() - start_time
+
+            # Comprehensive results summary
+            result_msg = (f"[{self._timesketch_attr}] MISP Analysis Complete: "
+                          f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | "
+                          f"{self.stats['api_calls_made']} API calls | "
+                          f"{len(self.processed_indicators)} indicators processed | "
+                          f"{elapsed_time:.1f}s")
+
+            if self.stats['errors'] > 0:
+                result_msg += f" | {self.stats['errors']} errors"
+
+            logger.info(result_msg)
+            return result_msg
+
         except Exception as e:
-            logger.error(f"MISP analyzer error: {e}")
+            logger.error(f"MISP analyzer critical error: {e}")
             return f"[{self._timesketch_attr}] MISP Error: {str(e)}"
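For local testing, the analyzer can be driven directly with one of the `get_kwargs()` entries. A hypothetical invocation sketch (IDs and values are placeholders; in Timesketch, `misp_url` and `misp_api_key` come from the server config and the analyzer is normally scheduled automatically):

```python
analyzer = MispAnalyzer(
    index_name="my_index",
    sketch_id=1,
    timeline_id=1,
    query_string="source_ip:* OR src_ip:* OR client_ip:*",
    attr="ip",
    timesketch_attr="source_ip",
    include_community=True,  # also pull community/connected-org events
    api_batch_size=50,       # indicators per restSearch POST
    chunk_size=1000,         # events held in memory per chunk
)
print(analyzer.run())
# e.g. "[source_ip] MISP Analysis Complete: 42/120000 events marked | 36 API calls | ..."
```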