"""Index analyzer plugin for MISP - Simple and reliable for large-scale processing.""" import logging import ntpath import re import requests import time from flask import current_app from timesketch.lib.analyzers import interface from timesketch.lib.analyzers import manager logger = logging.getLogger("timesketch.analyzers.misp") class MispIpAnalyzer(interface.BaseAnalyzer): # Changed from MispAnalyzer """Enhanced MISP Analyzer for IP address analysis.""" NAME = "misp_ip_analyzer" DISPLAY_NAME = "MISP-IP Enhanced" DESCRIPTION = "Mark events using MISP - IP address analysis using source_ip" def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs): """Initialize the Analyzer.""" super().__init__(index_name, sketch_id, timeline_id=timeline_id) self.misp_url = current_app.config.get("MISP_URL") self.misp_api_key = current_app.config.get("MISP_API_KEY") self.total_event_counter = 0 self.result_dict = {} self._query_string = kwargs.get("query_string") self._attr = kwargs.get("attr") self._timesketch_attr = kwargs.get("timesketch_attr") self.include_community = kwargs.get("include_community", False) self.chunk_size = kwargs.get("chunk_size", 1000) self.max_retries = kwargs.get("max_retries", 2) self.request_delay = kwargs.get("request_delay", 0.5) self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') self.processed_indicators = set() self.failed_indicators = set() self.stats = { 'events_processed': 0, 'indicators_found': 0, 'api_calls': 0, 'api_timeouts': 0, 'events_marked': 0, 'community_hits': 0, 'own_org_hits': 0, 'total_correlations': 0, 'multi_event_correlations': 0 } @staticmethod def get_kwargs(): """Get kwargs for the analyzer.""" to_query = [ { "query_string": "md5_hash:*", "attr": "md5", "timesketch_attr": "md5_hash", "include_community": True, }, { "query_string": "sha1_hash:*", "attr": "sha1", "timesketch_attr": "sha1_hash", "include_community": True, }, { "query_string": "sha256_hash:*", "attr": "sha256", "timesketch_attr": "sha256_hash", "include_community": True, }, { "query_string": "filename:*", "attr": "filename", "timesketch_attr": "filename", "include_community": True, }, { "query_string": "source_ip:*", "attr": "ip-src", "timesketch_attr": "source_ip", "include_community": True, }, ] return to_query def _is_valid_ip(self, ip_str): """Simple IP validation.""" try: import ipaddress ip_str = ip_str.strip() ipaddress.ip_address(ip_str) if ip_str.startswith(('0.', '127.', '255.255.255.255', '10.', '192.168.', '172.')): return False return True except (ValueError, AttributeError): return False def _is_valid_hash(self, hash_str, hash_type): """Simple hash validation.""" if not hash_str: return False hash_str = hash_str.strip().lower() if hash_type == "md5": return len(hash_str) == 32 and all(c in '0123456789abcdef' for c in hash_str) elif hash_type == "sha1": return len(hash_str) == 40 and all(c in '0123456789abcdef' for c in hash_str) elif hash_type == "sha256": return len(hash_str) == 64 and all(c in '0123456789abcdef' for c in hash_str) return False def query_misp_single(self, value, attr, retry_count=0): """Query MISP for a single value.""" if value in self.failed_indicators: return [] try: # For IP searches, query both ip-src and ip-dst search_types = [] if attr.startswith("ip-"): search_types = ["ip-src", "ip-dst"] else: search_types = [attr] all_results = [] for search_type in search_types: payload = { "returnFormat": "json", "value": value, "type": search_type, "enforceWarninglist": False, "includeEventTags": True, "includeContext": True, } if 
self.include_community: payload.update({ "distribution": [0, 1, 2, 3, 5], "includeEventUuid": True, "includeCorrelations": True, "includeDecayScore": False, "includeFullModel": False, }) else: payload["distribution"] = [0] self.stats['api_calls'] += 1 response = requests.post( f"{self.misp_url}/attributes/restSearch/", json=payload, headers={"Authorization": self.misp_api_key}, verify=False, timeout=45, ) if response.status_code == 200: data = response.json() attributes = data.get("response", {}).get("Attribute", []) all_results.extend(attributes) time.sleep(0.1) return all_results except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: self.stats['api_timeouts'] += 1 if retry_count < self.max_retries: wait_time = (retry_count + 1) * 2 logger.warning(f"Timeout for {value}, retrying in {wait_time}s (attempt {retry_count + 1})") time.sleep(wait_time) return self.query_misp_single(value, attr, retry_count + 1) else: logger.error(f"Max retries exceeded for {value}: {e}") self.failed_indicators.add(value) return [] except Exception as e: logger.debug(f"Error querying MISP for {value}: {e}") return [] def mark_event(self, event, result, attr): """Mark event with MISP intelligence including event links.""" try: if attr.startswith("ip-"): msg = "MISP: Malicious IP" else: msg = "MISP: Known indicator" unique_events = {} orgs = set() for res in result: event_info = res.get("Event", {}) event_id = event_info.get("id") event_desc = event_info.get("info", "Unknown") org_name = event_info.get("Orgc", {}).get("name", "Unknown") if event_id and event_id not in unique_events: unique_events[event_id] = { 'description': event_desc, 'url': f"{self.misp_url}/events/view/{event_id}" } if org_name != "Unknown": orgs.add(org_name) unique_event_list = list(unique_events.values()) if len(unique_event_list) == 1: event_data = unique_event_list[0] short_desc = event_data['description'][:50] + "..." if len(event_data['description']) > 50 else event_data['description'] msg += f" | Event: {short_desc} | Link: {event_data['url']}" elif len(unique_event_list) > 1: msg += f" | {len(unique_event_list)} Events:" for i, event_data in enumerate(unique_event_list[:2]): short_desc = event_data['description'][:40] + "..." 
if len(event_data['description']) > 40 else event_data['description'] msg += f" [{i+1}] {short_desc} ({event_data['url']})" if i < len(unique_event_list) - 1 and i < 1: msg += " |" if len(unique_event_list) > 2: msg += f" | +{len(unique_event_list)-2} more events" if self.include_community and orgs: if len(orgs) > 1: msg += f" | Sources: {', '.join(list(orgs)[:2])}" if len(orgs) > 2: msg += f" +{len(orgs)-2} more" else: msg += f" | Source: {list(orgs)[0]}" if len(result) > 1: msg += f" | {len(result)} total correlations" tags = [f"MISP-{attr}", "threat-intel"] if self.include_community: tags.append("community-intel") if len(unique_event_list) > 1: tags.append("multi-event-correlation") event.add_comment(msg) event.add_tags(tags) event.commit() self.stats['events_marked'] += 1 logger.info(f"Marked event with {len(unique_event_list)} unique MISP events") except Exception as e: logger.error(f"Error marking event: {e}") def extract_indicators_from_chunk(self, events_chunk, attr, timesketch_attr): """Extract indicators from a chunk of events.""" chunk_indicators = [] events_with_indicators = [] for event in events_chunk: self.stats['events_processed'] += 1 loc = event.source.get(timesketch_attr) if not loc: continue indicators = [] if attr.startswith("ip-") and timesketch_attr == "message": ip_matches = self.ip_pattern.findall(str(loc)) indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] elif attr.startswith("ip-") and timesketch_attr in ["source_ip", "src_ip", "client_ip"]: if self._is_valid_ip(str(loc)): indicators = [str(loc)] elif attr in ["md5", "sha1", "sha256"]: if self._is_valid_hash(str(loc), attr): indicators = [str(loc)] elif attr == "filename": filename = ntpath.basename(str(loc)) if filename and len(filename) > 1: indicators = [filename] if indicators: events_with_indicators.append((event, indicators)) chunk_indicators.extend(indicators) return events_with_indicators, chunk_indicators def process_chunk(self, events_chunk, attr, timesketch_attr): """Process a chunk of events.""" events_with_indicators, chunk_indicators = self.extract_indicators_from_chunk( events_chunk, attr, timesketch_attr ) if not chunk_indicators: return unique_indicators = list(dict.fromkeys(chunk_indicators)) new_indicators = [ind for ind in unique_indicators if ind not in self.processed_indicators] if not new_indicators: self.check_existing_matches(events_with_indicators, attr) return logger.info(f"Processing {len(new_indicators)} new {attr} indicators from {len(events_chunk)} events") self.stats['indicators_found'] += len(new_indicators) for indicator in new_indicators: if indicator in self.failed_indicators: continue result = self.query_misp_single(indicator, attr) if result: self.result_dict[f"{attr}:{indicator}"] = result self.stats['total_correlations'] += len(result) unique_events = set() for res in result: event_id = res.get("Event", {}).get("id") if event_id: unique_events.add(event_id) if len(unique_events) > 1: self.stats['multi_event_correlations'] += 1 orgs = set() events = set() for res in result: org = res.get("Event", {}).get("Orgc", {}).get("name", "Unknown") event_info = res.get("Event", {}).get("info", "Unknown")[:50] orgs.add(org) events.add(event_info) if len(orgs) > 1 or any(org not in ["Unknown", "Your Organization"] for org in orgs): self.stats['community_hits'] += 1 logger.info(f"Community MISP hit: {indicator} | {len(result)} correlations | " f"Events: {', '.join(list(events)[:2])} | Sources: {', '.join(list(orgs)[:3])}") else: self.stats['own_org_hits'] += 1 
logger.info(f"Own org MISP hit: {indicator} | {len(result)} correlations | " f"Events: {', '.join(list(events)[:2])}") self.processed_indicators.add(indicator) time.sleep(self.request_delay) self.check_existing_matches(events_with_indicators, attr) def test_community_connectivity(self): """Test if community feeds are accessible.""" if not self.include_community: return "Community search disabled" try: test_payload = { "returnFormat": "json", "distribution": [1, 2, 3], "limit": 1, "enforceWarninglist": False, } response = requests.post( f"{self.misp_url}/attributes/restSearch/", json=test_payload, headers={"Authorization": self.misp_api_key}, verify=False, timeout=30, ) if response.status_code == 200: data = response.json() attributes = data.get("response", {}).get("Attribute", []) if attributes: orgs = set() for attr in attributes[:5]: org = attr.get("Event", {}).get("Orgc", {}).get("name", "Unknown") orgs.add(org) return f"Community access OK - {len(attributes)} indicators from {len(orgs)} orgs: {', '.join(list(orgs)[:3])}" else: return "Community access OK but no community indicators found" else: return f"Community test failed: HTTP {response.status_code}" except Exception as e: return f"Community test error: {e}" def check_existing_matches(self, events_with_indicators, attr): """Check events against existing MISP results.""" for event, indicators in events_with_indicators: for indicator in indicators: key = f"{attr}:{indicator}" if key in self.result_dict and self.result_dict[key]: self.mark_event(event, self.result_dict[key], attr) break def query_misp(self, query, attr, timesketch_attr): """Process events in chunks.""" logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}") logger.info(f"Community search: {'enabled' if self.include_community else 'disabled'}") events_stream = self.event_stream( query_string=query, return_fields=[timesketch_attr, '_id'] ) current_chunk = [] try: for event in events_stream: current_chunk.append(event) if len(current_chunk) >= self.chunk_size: self.process_chunk(current_chunk, attr, timesketch_attr) current_chunk = [] if self.stats['events_processed'] % 10000 == 0: logger.info(f"Progress: {self.stats['events_processed']} events processed, " f"{self.stats['events_marked']} marked, " f"{self.stats['api_calls']} API calls, " f"{self.stats['api_timeouts']} timeouts") if current_chunk: self.process_chunk(current_chunk, attr, timesketch_attr) except Exception as e: logger.error(f"Error during chunk processing: {e}") raise if self.stats['events_marked'] > 0: view_name = "MISP Threat Intelligence" if self.include_community: view_name += " (Community)" self.sketch.add_view( view_name=view_name, analyzer_name=self.NAME, query_string='tag:"MISP-*" OR tag:"threat-intel"', ) correlation_count = sum(1 for key, results in self.result_dict.items() if results and len(results) > 1) if correlation_count > 0: self.sketch.add_view( view_name="MISP Multi-Event Correlations", analyzer_name=self.NAME, query_string='tag:"multi-event-correlation"', ) def run(self): """Entry point for the analyzer.""" if not self.misp_url or not self.misp_api_key: return "No MISP configuration found" start_time = time.time() if self.include_community: community_status = self.test_community_connectivity() logger.info(f"Community connectivity test: {community_status}") try: self.query_misp(self._query_string, self._attr, self._timesketch_attr) elapsed = time.time() - start_time success_rate = ((self.stats['api_calls'] - self.stats['api_timeouts']) / max(1, self.stats['api_calls']) * 
100) result = (f"[{self._timesketch_attr}] MISP Analysis Complete: " f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | " f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | ") if self.include_community: result += f"Community hits: {self.stats['community_hits']}, Own org: {self.stats['own_org_hits']} | " result += f"Total correlations: {self.stats['total_correlations']}" if self.stats['multi_event_correlations'] > 0: result += f", Multi-event: {self.stats['multi_event_correlations']}" result += f" | {elapsed:.0f}s" logger.info(result) return result except Exception as e: logger.error(f"MISP analyzer error: {e}") return f"[{self._timesketch_attr}] MISP Error: {str(e)}" manager.AnalysisManager.register_analyzer(MispIpAnalyzer)