"""Index analyzer plugin for MISP.""" import logging import ntpath import re import requests from flask import current_app from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager logger = logging.getLogger("timesketch.analyzers.misp") class MispAnalyzer(interface.BaseAnalyzer): """Analyzer for MISP.""" NAME = "misp_analyzer" DISPLAY_NAME = "MISP" DESCRIPTION = "Mark events using MISP" def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs): """Initialize the Analyzer.""" super().__init__(index_name, sketch_id, timeline_id=timeline_id) self.misp_url = current_app.config.get("MISP_URL") self.misp_api_key = current_app.config.get("MISP_API_KEY") self.total_event_counter = 0 self.result_dict = {} self._query_string = kwargs.get("query_string") self._attr = kwargs.get("attr") self._timesketch_attr = kwargs.get("timesketch_attr") self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') @staticmethod def get_kwargs(): """Get kwargs for the analyzer.""" to_query = [ { "query_string": "md5_hash:*", "attr": "md5", "timesketch_attr": "md5_hash", }, { "query_string": "sha1_hash:*", "attr": "sha1", "timesketch_attr": "sha1_hash", }, { "query_string": "sha256_hash:*", "attr": "sha256", "timesketch_attr": "sha256_hash", }, { "query_string": "filename:*", "attr": "filename", "timesketch_attr": "filename", }, { "query_string": "message:*", "attr": "ip-src", "timesketch_attr": "message", }, { "query_string": "message:*", "attr": "ip-dst", "timesketch_attr": "message", }, { "query_string": "source_ip:*", "attr": "ip-src", "timesketch_attr": "source_ip", }, ] return to_query def _is_valid_ip(self, ip_str): """Validate IP address.""" try: import ipaddress ip_str = ip_str.strip() ipaddress.ip_address(ip_str) # Filter out invalid ranges if ip_str.startswith(('0.', '127.', '255.255.255.255')): return False return True except (ValueError, AttributeError): return False def _is_valid_hash(self, hash_str, hash_type): """Validate hash format.""" if not hash_str: return False hash_str = hash_str.strip().lower() if hash_type == "md5": return len(hash_str) == 32 and all(c in '0123456789abcdef' for c in hash_str) elif hash_type == "sha1": return len(hash_str) == 40 and all(c in '0123456789abcdef' for c in hash_str) elif hash_type == "sha256": return len(hash_str) == 64 and all(c in '0123456789abcdef' for c in hash_str) return False def query_misp_single(self, value, attr): """Query MISP for a single value.""" try: response = requests.post( f"{self.misp_url}/attributes/restSearch/", json={"returnFormat": "json", "value": value, "type": attr}, headers={"Authorization": self.misp_api_key}, verify=False, timeout=30, ) if response.status_code != 200: return [] data = response.json() return data.get("response", {}).get("Attribute", []) except Exception: return [] def mark_event(self, event, result, attr): """Add MISP intelligence to event.""" try: if attr.startswith("ip-"): msg = "MISP: Malicious IP - " else: msg = "MISP: Known indicator - " event_info = result[0].get("Event", {}).get("info", "Unknown") msg += event_info if len(result) > 1: msg += f" (+{len(result)-1} more)" event.add_comment(msg) event.add_tags([f"MISP-{attr}", "threat-intel"]) event.commit() except Exception as e: logger.error(f"Error marking event: {e}") def query_misp(self, query, attr, timesketch_attr): """Extract indicators and query MISP.""" events = self.event_stream(query_string=query, return_fields=[timesketch_attr]) query_list = [] events_list = [] processed = 0 # Extract indicators from events for event in events: processed += 1 if processed > 5000: # Reasonable limit break loc = event.source.get(timesketch_attr) if not loc: continue events_list.append(event) indicators = [] # Extract based on attribute type - STRICT VALIDATION if attr.startswith("ip-") and timesketch_attr == "message": # Extract IPs from message field ip_matches = self.ip_pattern.findall(str(loc)) indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] elif attr.startswith("ip-") and timesketch_attr in ["source_ip", "src_ip", "client_ip"]: # Direct IP field if self._is_valid_ip(str(loc)): indicators = [str(loc)] elif attr in ["md5", "sha1", "sha256"]: # Hash fields if self._is_valid_hash(str(loc), attr): indicators = [str(loc)] elif attr == "filename": # Filename extraction filename = ntpath.basename(str(loc)) if filename and len(filename) > 1: indicators = [filename] # Add valid indicators to query list for indicator in indicators: if indicator not in query_list: query_list.append(indicator) self.result_dict[f"{attr}:{indicator}"] = [] logger.info(f"Extracted {len(query_list)} {attr} indicators from {processed} events") if not query_list: return # Query MISP for each indicator for indicator in query_list: result = self.query_misp_single(indicator, attr) if result: self.result_dict[f"{attr}:{indicator}"] = result logger.info(f"MISP hit: {indicator}") # Mark matching events for event in events_list: loc = event.source.get(timesketch_attr) if not loc: continue # Re-extract indicators from this event event_indicators = [] if attr.startswith("ip-") and timesketch_attr == "message": ip_matches = self.ip_pattern.findall(str(loc)) event_indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] elif attr.startswith("ip-") and timesketch_attr in ["source_ip", "src_ip", "client_ip"]: if self._is_valid_ip(str(loc)): event_indicators = [str(loc)] elif attr in ["md5", "sha1", "sha256"]: if self._is_valid_hash(str(loc), attr): event_indicators = [str(loc)] elif attr == "filename": filename = ntpath.basename(str(loc)) if filename: event_indicators = [filename] # Check if any indicator has MISP match for indicator in event_indicators: key = f"{attr}:{indicator}" if key in self.result_dict and self.result_dict[key]: self.total_event_counter += 1 self.mark_event(event, self.result_dict[key], attr) break # Only mark once per event # Create view if we found matches if self.total_event_counter > 0: self.sketch.add_view( view_name="MISP Threat Intelligence", analyzer_name=self.NAME, query_string='tag:"MISP-*" OR tag:"threat-intel"', ) def run(self): """Entry point for the analyzer.""" if not self.misp_url or not self.misp_api_key: return "No MISP configuration found" try: self.query_misp(self._query_string, self._attr, self._timesketch_attr) return f"[{self._timesketch_attr}] MISP Match: {self.total_event_counter}" except Exception as e: logger.error(f"MISP analyzer error: {e}") return f"[{self._timesketch_attr}] MISP Error: {str(e)}" manager.AnalysisManager.register_analyzer(MispAnalyzer)