"""Index analyzer plugin for MISP.""" import logging import ntpath import re import requests from flask import current_app from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager logger = logging.getLogger("timesketch.analyzers.misp") class MispAnalyzer(interface.BaseAnalyzer): """Analyzer for MISP.""" NAME = "misp_analyzer" DISPLAY_NAME = "MISP" DESCRIPTION = "Mark events using MISP" def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs): """Initialize the Analyzer.""" super().__init__(index_name, sketch_id, timeline_id=timeline_id) self.misp_url = current_app.config.get("MISP_URL") self.misp_api_key = current_app.config.get("MISP_API_KEY") self.total_event_counter = 0 self.result_dict = {} self._query_string = kwargs.get("query_string") self._attr = kwargs.get("attr") self._timesketch_attr = kwargs.get("timesketch_attr") self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') # Track marked events to prevent duplicates self.marked_events = set() @staticmethod def get_kwargs(): """Get kwargs for the analyzer.""" to_query = [ { "query_string": "md5_hash:*", "attr": "md5", "timesketch_attr": "md5_hash", }, { "query_string": "sha1_hash:*", "attr": "sha1", "timesketch_attr": "sha1_hash", }, { "query_string": "sha256_hash:*", "attr": "sha256", "timesketch_attr": "sha256_hash", }, { "query_string": "filename:*", "attr": "filename", "timesketch_attr": "filename", }, { "query_string": "message:*", "attr": "ip", # Generic IP instead of ip-src/ip-dst "timesketch_attr": "message", }, { "query_string": "source_ip:*", "attr": "ip", "timesketch_attr": "source_ip", }, ] return to_query def _is_valid_ip(self, ip_str): """Validate IP address.""" try: import ipaddress ip_str = ip_str.strip() ipaddress.ip_address(ip_str) if ip_str.startswith(('0.', '127.', '255.255.255.255')): return False return True except (ValueError, AttributeError): return False def _is_valid_hash(self, hash_str, hash_type): """Validate hash format.""" if not hash_str: return False hash_str = hash_str.strip().lower() if hash_type == "md5": return len(hash_str) == 32 and all(c in '0123456789abcdef' for c in hash_str) elif hash_type == "sha1": return len(hash_str) == 40 and all(c in '0123456789abcdef' for c in hash_str) elif hash_type == "sha256": return len(hash_str) == 64 and all(c in '0123456789abcdef' for c in hash_str) return False def query_misp_single(self, value, attr): """Query MISP for a single value - ENHANCED for cross-org visibility.""" results = [] # Query both ip-src and ip-dst for IPs, include cross-org events if attr == "ip": search_types = ["ip-src", "ip-dst"] else: search_types = [attr] for search_type in search_types: try: # Include events from other organizations payload = { "returnFormat": "json", "value": value, "type": search_type, # Include events from all organizations with proper distribution "enforceWarninglist": False, # Don't filter known-good IPs "includeDecayScore": False, # Skip decay scores for speed "includeFullModel": False, # Skip full model for speed "decayingModel": [], # No decaying model filters "excludeDecayed": False, # Include older indicators # Distribution levels: 0=Own org, 1=Community, 2=Connected, 3=All, 5=Inherit "distribution": [0, 1, 2, 3, 5] # Include all distribution levels } response = requests.post( f"{self.misp_url}/attributes/restSearch/", json=payload, headers={"Authorization": self.misp_api_key}, verify=False, timeout=30, ) if response.status_code == 200: data = response.json() attributes = data.get("response", {}).get("Attribute", []) results.extend(attributes) except Exception: continue return results def mark_event(self, event, result, attr): """Add MISP intelligence to event - FIXED to prevent duplicates.""" try: # Check if event already marked event_id = event.source.get('_id', '') if event_id in self.marked_events: return self.marked_events.add(event_id) # Show organization info for cross-org awareness if attr == "ip": msg = "MISP: Malicious IP detected - " else: msg = "MISP: Known indicator - " # Collect unique events and organizations events_info = {} orgs_info = set() for misp_attr in result: event_info = misp_attr.get("Event", {}) event_id = event_info.get("id", "") event_desc = event_info.get("info", "Unknown") org_name = event_info.get("Orgc", {}).get("name", "Unknown Org") events_info[event_id] = f'"{event_desc}"' orgs_info.add(org_name) # Build message with org info event_descriptions = list(events_info.values())[:2] # First 2 events msg += " | ".join(event_descriptions) if len(result) > 2: msg += f" | +{len(result)-2} more" # Add organization information if len(orgs_info) > 1: msg += f" | Orgs: {', '.join(list(orgs_info)[:3])}" elif orgs_info: org_name = list(orgs_info)[0] if org_name != "Unknown Org": msg += f" | Org: {org_name}" event.add_comment(msg) event.add_tags([f"MISP-{attr}", "threat-intel", "cross-org-intel"]) event.commit() except Exception as e: logger.error(f"Error marking event: {e}") def query_misp(self, query, attr, timesketch_attr): """Extract indicators and query MISP.""" events = self.event_stream(query_string=query, return_fields=[timesketch_attr, '_id']) query_list = [] events_list = [] processed = 0 # Extract indicators from events for event in events: processed += 1 if processed > 5000: break loc = event.source.get(timesketch_attr) if not loc: continue events_list.append(event) indicators = [] # Extract based on attribute type if attr == "ip" and timesketch_attr == "message": ip_matches = self.ip_pattern.findall(str(loc)) indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] elif attr == "ip" and timesketch_attr in ["source_ip", "src_ip", "client_ip"]: if self._is_valid_ip(str(loc)): indicators = [str(loc)] elif attr in ["md5", "sha1", "sha256"]: if self._is_valid_hash(str(loc), attr): indicators = [str(loc)] elif attr == "filename": filename = ntpath.basename(str(loc)) if filename and len(filename) > 1: indicators = [filename] # Add valid indicators to query list for indicator in indicators: if indicator not in query_list: query_list.append(indicator) self.result_dict[f"{attr}:{indicator}"] = [] logger.info(f"Extracted {len(query_list)} {attr} indicators from {processed} events") if not query_list: return # Query MISP for each indicator for indicator in query_list: result = self.query_misp_single(indicator, attr) if result: self.result_dict[f"{attr}:{indicator}"] = result # Log organization diversity orgs = set() for r in result: org = r.get("Event", {}).get("Orgc", {}).get("name", "Unknown") orgs.add(org) logger.info(f"MISP hit: {indicator} ({len(result)} indicators from {len(orgs)} orgs)") # Mark matching events for event in events_list: loc = event.source.get(timesketch_attr) if not loc: continue # Check if event already processed event_id = event.source.get('_id', '') if event_id in self.marked_events: continue # Re-extract indicators from this event event_indicators = [] if attr == "ip" and timesketch_attr == "message": ip_matches = self.ip_pattern.findall(str(loc)) event_indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] elif attr == "ip" and timesketch_attr in ["source_ip", "src_ip", "client_ip"]: if self._is_valid_ip(str(loc)): event_indicators = [str(loc)] elif attr in ["md5", "sha1", "sha256"]: if self._is_valid_hash(str(loc), attr): event_indicators = [str(loc)] elif attr == "filename": filename = ntpath.basename(str(loc)) if filename: event_indicators = [filename] # Check if any indicator has MISP match for indicator in event_indicators: key = f"{attr}:{indicator}" if key in self.result_dict and self.result_dict[key]: self.total_event_counter += 1 self.mark_event(event, self.result_dict[key], attr) break # Only mark once per event # Create view if we found matches if self.total_event_counter > 0: self.sketch.add_view( view_name="MISP Cross-Org Threat Intel", analyzer_name=self.NAME, query_string='tag:"MISP-*" OR tag:"threat-intel" OR tag:"cross-org-intel"', ) def run(self): """Entry point for the analyzer.""" if not self.misp_url or not self.misp_api_key: return "No MISP configuration found" try: self.query_misp(self._query_string, self._attr, self._timesketch_attr) return f"[{self._timesketch_attr}] MISP Match: {self.total_event_counter}" except Exception as e: logger.error(f"MISP analyzer error: {e}") return f"[{self._timesketch_attr}] MISP Error: {str(e)}" manager.AnalysisManager.register_analyzer(MispAnalyzer)