diff --git a/misp_analyzer.py b/misp_analyzer.py new file mode 100644 index 0000000..20b73ed --- /dev/null +++ b/misp_analyzer.py @@ -0,0 +1,258 @@ +"""Index analyzer plugin for MISP.""" + +import logging +import ntpath +import re +import requests + +from flask import current_app +from timesketch.lib.analyzers import interface +from timesketch.lib.analyzers import manager + + +logger = logging.getLogger("timesketch.analyzers.misp") + + +class MispAnalyzer(interface.BaseAnalyzer): + """Analyzer for MISP.""" + + NAME = "misp_analyzer" + DISPLAY_NAME = "MISP" + DESCRIPTION = "Mark events using MISP" + + def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs): + """Initialize the Analyzer.""" + super().__init__(index_name, sketch_id, timeline_id=timeline_id) + self.misp_url = current_app.config.get("MISP_URL") + self.misp_api_key = current_app.config.get("MISP_API_KEY") + self.total_event_counter = 0 + self.result_dict = {} + self._query_string = kwargs.get("query_string") + self._attr = kwargs.get("attr") + self._timesketch_attr = kwargs.get("timesketch_attr") + self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') + + @staticmethod + def get_kwargs(): + """Get kwargs for the analyzer.""" + to_query = [ + { + "query_string": "md5_hash:*", + "attr": "md5", + "timesketch_attr": "md5_hash", + }, + { + "query_string": "sha1_hash:*", + "attr": "sha1", + "timesketch_attr": "sha1_hash", + }, + { + "query_string": "sha256_hash:*", + "attr": "sha256", + "timesketch_attr": "sha256_hash", + }, + { + "query_string": "filename:*", + "attr": "filename", + "timesketch_attr": "filename", + }, + { + "query_string": "message:*", + "attr": "ip-src", + "timesketch_attr": "message", + }, + { + "query_string": "message:*", + "attr": "ip-dst", + "timesketch_attr": "message", + }, + { + "query_string": "source_ip:*", + "attr": "ip-src", + "timesketch_attr": "source_ip", + }, + ] + return to_query + + def _is_valid_ip(self, ip_str): + """Validate IP address.""" + try: + import ipaddress + ip_str = ip_str.strip() + ipaddress.ip_address(ip_str) + # Filter out invalid ranges + if ip_str.startswith(('0.', '127.', '255.255.255.255')): + return False + return True + except (ValueError, AttributeError): + return False + + def _is_valid_hash(self, hash_str, hash_type): + """Validate hash format.""" + if not hash_str: + return False + hash_str = hash_str.strip().lower() + + if hash_type == "md5": + return len(hash_str) == 32 and all(c in '0123456789abcdef' for c in hash_str) + elif hash_type == "sha1": + return len(hash_str) == 40 and all(c in '0123456789abcdef' for c in hash_str) + elif hash_type == "sha256": + return len(hash_str) == 64 and all(c in '0123456789abcdef' for c in hash_str) + + return False + + def query_misp_single(self, value, attr): + """Query MISP for a single value.""" + try: + response = requests.post( + f"{self.misp_url}/attributes/restSearch/", + json={"returnFormat": "json", "value": value, "type": attr}, + headers={"Authorization": self.misp_api_key}, + verify=False, + timeout=30, + ) + + if response.status_code != 200: + return [] + + data = response.json() + return data.get("response", {}).get("Attribute", []) + + except Exception: + return [] + + def mark_event(self, event, result, attr): + """Add MISP intelligence to event.""" + try: + if attr.startswith("ip-"): + msg = "MISP: Malicious IP - " + else: + msg = "MISP: Known indicator - " + + event_info = result[0].get("Event", {}).get("info", "Unknown") + msg += event_info + + if len(result) > 1: + msg += f" (+{len(result)-1} more)" + + event.add_comment(msg) + event.add_tags([f"MISP-{attr}", "threat-intel"]) + event.commit() + + except Exception as e: + logger.error(f"Error marking event: {e}") + + def query_misp(self, query, attr, timesketch_attr): + """Extract indicators and query MISP.""" + events = self.event_stream(query_string=query, return_fields=[timesketch_attr]) + query_list = [] + events_list = [] + processed = 0 + + # Extract indicators from events + for event in events: + processed += 1 + if processed > 5000: # Reasonable limit + break + + loc = event.source.get(timesketch_attr) + if not loc: + continue + + events_list.append(event) + indicators = [] + + # Extract based on attribute type - STRICT VALIDATION + if attr.startswith("ip-") and timesketch_attr == "message": + # Extract IPs from message field + ip_matches = self.ip_pattern.findall(str(loc)) + indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] + + elif attr.startswith("ip-") and timesketch_attr in ["source_ip", "src_ip", "client_ip"]: + # Direct IP field + if self._is_valid_ip(str(loc)): + indicators = [str(loc)] + + elif attr in ["md5", "sha1", "sha256"]: + # Hash fields + if self._is_valid_hash(str(loc), attr): + indicators = [str(loc)] + + elif attr == "filename": + # Filename extraction + filename = ntpath.basename(str(loc)) + if filename and len(filename) > 1: + indicators = [filename] + + # Add valid indicators to query list + for indicator in indicators: + if indicator not in query_list: + query_list.append(indicator) + self.result_dict[f"{attr}:{indicator}"] = [] + + logger.info(f"Extracted {len(query_list)} {attr} indicators from {processed} events") + + if not query_list: + return + + # Query MISP for each indicator + for indicator in query_list: + result = self.query_misp_single(indicator, attr) + if result: + self.result_dict[f"{attr}:{indicator}"] = result + logger.info(f"MISP hit: {indicator}") + + # Mark matching events + for event in events_list: + loc = event.source.get(timesketch_attr) + if not loc: + continue + + # Re-extract indicators from this event + event_indicators = [] + + if attr.startswith("ip-") and timesketch_attr == "message": + ip_matches = self.ip_pattern.findall(str(loc)) + event_indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] + elif attr.startswith("ip-") and timesketch_attr in ["source_ip", "src_ip", "client_ip"]: + if self._is_valid_ip(str(loc)): + event_indicators = [str(loc)] + elif attr in ["md5", "sha1", "sha256"]: + if self._is_valid_hash(str(loc), attr): + event_indicators = [str(loc)] + elif attr == "filename": + filename = ntpath.basename(str(loc)) + if filename: + event_indicators = [filename] + + # Check if any indicator has MISP match + for indicator in event_indicators: + key = f"{attr}:{indicator}" + if key in self.result_dict and self.result_dict[key]: + self.total_event_counter += 1 + self.mark_event(event, self.result_dict[key], attr) + break # Only mark once per event + + # Create view if we found matches + if self.total_event_counter > 0: + self.sketch.add_view( + view_name="MISP Threat Intelligence", + analyzer_name=self.NAME, + query_string='tag:"MISP-*" OR tag:"threat-intel"', + ) + + def run(self): + """Entry point for the analyzer.""" + if not self.misp_url or not self.misp_api_key: + return "No MISP configuration found" + + try: + self.query_misp(self._query_string, self._attr, self._timesketch_attr) + return f"[{self._timesketch_attr}] MISP Match: {self.total_event_counter}" + except Exception as e: + logger.error(f"MISP analyzer error: {e}") + return f"[{self._timesketch_attr}] MISP Error: {str(e)}" + + +manager.AnalysisManager.register_analyzer(MispAnalyzer) \ No newline at end of file