"""Index analyzer plugin for MISP - Simple and reliable for large-scale processing."""

import ipaddress
import logging
import ntpath
import re
import time

import requests
from flask import current_app

from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager

logger = logging.getLogger("timesketch.analyzers.misp")


class MispAnalyzer(interface.BaseAnalyzer):
    """Simple, reliable MISP Analyzer for large-scale processing.

    One instance handles one (query_string, attr, timesketch_attr) combination
    taken from ``get_kwargs``. It streams matching events in chunks and
    extracts indicators from the ``timesketch_attr`` field. Each unique
    indicator is looked up once via MISP ``attributes/restSearch``. Every
    event whose indicator produced a MISP hit gets a comment and tags.
    """

    NAME = "misp_analyzer"
    DISPLAY_NAME = "MISP"
    DESCRIPTION = "Mark events using MISP - Simple and Reliable"

    # Expected hex-digest length for each supported hash attribute type.
    _HASH_LENGTHS = {"md5": 32, "sha1": 40, "sha256": 64}
    _HEX_DIGITS = frozenset("0123456789abcdef")

    # Org names that do NOT make a hit count as "community":
    # - "Unknown" is the fallback used when a result has no Orgc name.
    # - The other two are placeholder names the analyzer previously
    #   hard-coded, inconsistently, in two separate code paths.
    # Set MISP_ORG_NAME in the config to also treat the real local org as own.
    _DEFAULT_OWN_ORG_NAMES = frozenset({"Unknown", "Your Org", "Your Organization"})

    # Log a progress line each time roughly this many more events are processed.
    _PROGRESS_LOG_INTERVAL = 10000

    def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs):
        """Initialize the Analyzer.

        Args:
            index_name: Index name, passed through to BaseAnalyzer.
            sketch_id: Sketch ID, passed through to BaseAnalyzer.
            timeline_id: Optional timeline ID, passed through to BaseAnalyzer.
            **kwargs: One entry of ``get_kwargs()`` (``query_string``,
                ``attr``, ``timesketch_attr``, ``include_community``) plus
                optional tuning values ``chunk_size``, ``max_retries`` and
                ``request_delay``.
        """
        super().__init__(index_name, sketch_id, timeline_id=timeline_id)
        misp_url = current_app.config.get("MISP_URL")
        # Drop a trailing slash so request URLs never contain "//".
        self.misp_url = misp_url.rstrip("/") if misp_url else misp_url
        self.misp_api_key = current_app.config.get("MISP_API_KEY")
        # TLS verification used to be hard-coded off. False remains the
        # default so existing (e.g. self-signed) setups keep working.
        # Operators can set True, or a CA bundle path, to enable verification.
        self.verify_ssl = current_app.config.get("MISP_VERIFY_SSL", False)
        self.own_org_names = set(self._DEFAULT_OWN_ORG_NAMES)
        own_org = current_app.config.get("MISP_ORG_NAME")
        if own_org:
            self.own_org_names.add(own_org)

        self.total_event_counter = 0
        # Maps "<attr>:<indicator>" to the list of MISP attributes that matched.
        self.result_dict = {}
        self._query_string = kwargs.get("query_string")
        self._attr = kwargs.get("attr")
        self._timesketch_attr = kwargs.get("timesketch_attr")
        self.include_community = kwargs.get("include_community", False)
        self.chunk_size = kwargs.get("chunk_size", 1000)  # Process in chunks
        self.max_retries = kwargs.get("max_retries", 2)  # Minimal retries
        self.request_delay = kwargs.get("request_delay", 0.5)  # Small delay between requests
        self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')

        # Track processed items so each indicator is queried at most once.
        self.processed_indicators = set()
        # Indicators whose lookups exhausted all retries; never queried again.
        self.failed_indicators = set()

        # Simple stats, reported by run().
        self.stats = {
            'events_processed': 0,
            'indicators_found': 0,
            'api_calls': 0,
            'api_timeouts': 0,
            'events_marked': 0,
            'community_hits': 0,
            'own_org_hits': 0,
        }

    @staticmethod
    def get_kwargs():
        """Return one kwargs dict per indicator type to analyze.

        ``__init__`` reads these keys from ``**kwargs``:
        - ``query_string`` selects the events.
        - ``timesketch_attr`` names the event field to read.
        - ``attr`` is the MISP attribute type to search for.
        - ``include_community`` widens the distribution filter.

        Presumably the analyzer framework creates one instance per dict.
        """
        to_query = [
            {
                "query_string": "md5_hash:*",
                "attr": "md5",
                "timesketch_attr": "md5_hash",
                "include_community": True,
            },
            {
                "query_string": "sha1_hash:*",
                "attr": "sha1",
                "timesketch_attr": "sha1_hash",
                "include_community": True,
            },
            {
                "query_string": "sha256_hash:*",
                "attr": "sha256",
                "timesketch_attr": "sha256_hash",
                "include_community": True,
            },
            {
                "query_string": "filename:*",
                "attr": "filename",
                "timesketch_attr": "filename",
                "include_community": True,
            },
            {
                "query_string": "source_ip:*",
                "attr": "ip-src",
                "timesketch_attr": "source_ip",
                "include_community": True,
            },
        ]
        return to_query

    def _is_valid_ip(self, ip_str):
        """Return True if ``ip_str`` is an IP address worth looking up in MISP.

        Rejects non-string input, unparsable strings, and addresses that
        ``ipaddress`` classifies as private, loopback or unspecified.
        """
        if not isinstance(ip_str, str):
            return False
        try:
            ip = ipaddress.ip_address(ip_str.strip())
        except ValueError:
            return False
        # The old prefix list rejected every "172." address. That wrongly
        # dropped public space such as 172.217.0.0/16; only 172.16.0.0/12
        # is private. is_private still covers 0.0.0.0/8, 10/8, 127/8,
        # 192.168/16 and 255.255.255.255, which the old list named explicitly.
        return not (ip.is_private or ip.is_loopback or ip.is_unspecified)

    def _is_valid_hash(self, hash_str, hash_type):
        """Return True if ``hash_str`` is a hex digest of the length ``hash_type`` requires.

        The check is case-insensitive and ignores surrounding whitespace.
        Unsupported hash types always return False.
        """
        expected_length = self._HASH_LENGTHS.get(hash_type)
        if not hash_str or expected_length is None:
            return False
        hash_str = hash_str.strip().lower()
        return len(hash_str) == expected_length and set(hash_str) <= self._HEX_DIGITS

    @staticmethod
    def _collect_orgs(results):
        """Return the set of ``Event.Orgc.name`` values found in ``results``.

        A missing name is reported as "Unknown".
        """
        return {
            ((res.get("Event") or {}).get("Orgc") or {}).get("name", "Unknown")
            for res in results
        }

    def _is_community_hit(self, orgs):
        """Return True if ``orgs`` contains more than one org or any non-own org."""
        return len(orgs) > 1 or any(org not in self.own_org_names for org in orgs)

    def _post(self, payload, timeout):
        """POST ``payload`` to MISP ``attributes/restSearch`` and return the response."""
        return requests.post(
            f"{self.misp_url}/attributes/restSearch/",
            json=payload,
            headers={"Authorization": self.misp_api_key},
            verify=self.verify_ssl,
            timeout=timeout,
        )

    def _build_search_payload(self, value, attr):
        """Build the restSearch payload for one indicator ``value`` of MISP type ``attr``."""
        payload = {
            "returnFormat": "json",
            "value": value,
            "type": attr,
            "enforceWarninglist": False,  # Don't filter known-good indicators
            "includeEventTags": True,  # Include event tags for context
            "includeContext": True,  # Include context information
        }
        if self.include_community:
            payload.update({
                # MISP distribution levels requested:
                #   0 own org, 1 this community, 2 connected communities,
                #   3 all communities, 5 inherit event.
                # Level 4 (sharing group) is NOT requested.
                "distribution": [0, 1, 2, 3, 5],
                "includeEventUuid": True,  # Include event UUIDs
                "includeCorrelations": True,  # Include correlations
                "includeDecayScore": False,  # Skip decay for speed
                "includeFullModel": False,  # Skip full model for speed
            })
            logger.debug("Community search enabled for %s (%s)", value, attr)
        else:
            # NOTE(review): this filters on the attribute's distribution
            # setting ("your organisation only"). It does not filter on which
            # org created the attribute. Confirm this is the intended meaning
            # of "own org".
            payload["distribution"] = [0]
            logger.debug("Own org search only for %s (%s)", value, attr)
        return payload

    def _parse_search_response(self, response, value):
        """Return the ``Attribute`` list from a restSearch response.

        Returns an empty list on any non-200 status or malformed body.
        """
        if response.status_code != 200:
            # Without this warning, a non-200 (e.g. an auth failure) would look
            # like "no match" for every indicator and go unnoticed.
            logger.warning(
                "MISP API returned status %s for %s", response.status_code, value
            )
            return []
        try:
            attributes = (response.json().get("response") or {}).get("Attribute") or []
        except (ValueError, AttributeError) as e:
            logger.warning("Error querying MISP for %s: %s", value, e)
            return []

        # Debug-only here; process_chunk logs each hit at info level.
        if attributes and self.include_community:
            orgs = self._collect_orgs(attributes)
            if self._is_community_hit(orgs):
                logger.debug(
                    "Community hit for %s: %s matches from %s",
                    value, len(attributes), ", ".join(sorted(map(str, orgs))[:3]),
                )
        return attributes

    def query_misp_single(self, value, attr, retry_count=0):
        """Query MISP for a single indicator value.

        Args:
            value: Indicator value to search for.
            attr: MISP attribute type (e.g. "md5", "ip-src").
            retry_count: Attempts already consumed. Kept for backward
                compatibility with the previous recursive retry.

        Returns:
            List of matching MISP attribute dicts. The list is empty when
            nothing matches or on any error. After timeouts/connection errors
            exhaust ``max_retries``, the value is added to
            ``failed_indicators``. Later calls for that value return [] at once.
        """
        if value in self.failed_indicators:
            return []

        payload = self._build_search_payload(value, attr)
        attempt = retry_count
        while True:
            self.stats['api_calls'] += 1
            try:
                response = self._post(payload, timeout=45)
            except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
                self.stats['api_timeouts'] += 1
                if attempt >= self.max_retries:
                    logger.error("Max retries exceeded for %s: %s", value, e)
                    self.failed_indicators.add(value)
                    return []
                wait_time = (attempt + 1) * 2  # linear backoff: 2s, 4s, ...
                logger.warning(
                    "Timeout for %s, retrying in %ss (attempt %s)",
                    value, wait_time, attempt + 1,
                )
                time.sleep(wait_time)
                attempt += 1
                continue
            except requests.exceptions.RequestException as e:
                logger.warning("Error querying MISP for %s: %s", value, e)
                return []
            return self._parse_search_response(response, value)

    def mark_event(self, event, result, attr):
        """Comment on and tag ``event`` with the MISP hit(s) in ``result``.

        Failures are logged, never raised, so one bad event cannot abort the run.
        """
        try:
            msg = "MISP: Malicious IP" if attr.startswith("ip-") else "MISP: Known indicator"

            # Event info and org are taken from the first matching attribute.
            first_event = result[0].get("Event") or {}
            event_info = first_event.get("info", "Unknown")
            org_info = (first_event.get("Orgc") or {}).get("name", "Unknown")

            msg += f" - {event_info}"
            if len(result) > 1:
                msg += f" (+{len(result)-1} more)"

            # List the source orgs of the first three hits, sorted for
            # deterministic output.
            if self.include_community and org_info != "Unknown":
                orgs = sorted(map(str, self._collect_orgs(result[:3]) - {"Unknown"}))
                if len(orgs) > 1:
                    msg += f" | Sources: {', '.join(orgs[:2])}"
                    if len(orgs) > 2:
                        msg += f" +{len(orgs)-2} more"
                elif orgs:
                    msg += f" | Source: {orgs[0]}"

            tags = [f"MISP-{attr}", "threat-intel"]
            if self.include_community:
                tags.append("community-intel")

            event.add_comment(msg)
            event.add_tags(tags)
            event.commit()
            self.stats['events_marked'] += 1
        except Exception as e:  # best-effort per event
            logger.error("Error marking event: %s", e)

    def _extract_indicators(self, value, attr, timesketch_attr):
        """Return the indicators of MISP type ``attr`` found in field string ``value``."""
        if attr.startswith("ip-") and timesketch_attr == "message":
            # Free text may contain several (or repeated) IPs; keep unique, in order.
            return list(dict.fromkeys(
                ip for ip in self.ip_pattern.findall(value) if self._is_valid_ip(ip)
            ))
        if attr.startswith("ip-") and timesketch_attr in ("source_ip", "src_ip", "client_ip"):
            return [value.strip()] if self._is_valid_ip(value) else []
        if attr in self._HASH_LENGTHS:
            # Normalize so case/whitespace variants dedupe to one lookup.
            return [value.strip().lower()] if self._is_valid_hash(value, attr) else []
        if attr == "filename":
            filename = ntpath.basename(value)  # handles both "\" and "/" paths
            return [filename] if len(filename) > 1 else []
        return []

    def extract_indicators_from_chunk(self, events_chunk, attr, timesketch_attr):
        """Extract indicators from a chunk of events.

        Returns:
            Tuple ``(events_with_indicators, chunk_indicators)``:
            - ``events_with_indicators`` is a list of ``(event, indicators)``
              pairs, one per event that yielded at least one indicator.
            - ``chunk_indicators`` is the flat list of those indicators,
              possibly with duplicates.
        """
        chunk_indicators = []
        events_with_indicators = []

        for event in events_chunk:
            self.stats['events_processed'] += 1
            loc = event.source.get(timesketch_attr)
            if not loc:
                continue
            indicators = self._extract_indicators(str(loc), attr, timesketch_attr)
            if indicators:
                events_with_indicators.append((event, indicators))
                chunk_indicators.extend(indicators)

        return events_with_indicators, chunk_indicators

    def _lookup_indicator(self, indicator, attr):
        """Query MISP for one new indicator, cache any hit and update hit stats."""
        if indicator in self.failed_indicators:
            return
        result = self.query_misp_single(indicator, attr)
        if result:
            self.result_dict[f"{attr}:{indicator}"] = result
            orgs = self._collect_orgs(result)
            if self._is_community_hit(orgs):
                self.stats['community_hits'] += 1
                logger.info(
                    "Community MISP hit: %s (%s matches from %s)",
                    indicator, len(result), ", ".join(sorted(map(str, orgs))[:3]),
                )
            else:
                self.stats['own_org_hits'] += 1
                logger.info("Own org MISP hit: %s (%s matches)", indicator, len(result))
        self.processed_indicators.add(indicator)
        time.sleep(self.request_delay)

    def process_chunk(self, events_chunk, attr, timesketch_attr):
        """Look up a chunk's not-yet-seen indicators, then mark matching events."""
        events_with_indicators, chunk_indicators = self.extract_indicators_from_chunk(
            events_chunk, attr, timesketch_attr
        )
        if not chunk_indicators:
            return

        new_indicators = [
            ind for ind in dict.fromkeys(chunk_indicators)
            if ind not in self.processed_indicators
        ]
        if new_indicators:
            logger.info(
                "Processing %s new %s indicators from %s events",
                len(new_indicators), attr, len(events_chunk),
            )
            self.stats['indicators_found'] += len(new_indicators)
            for indicator in new_indicators:
                self._lookup_indicator(indicator, attr)

        # Mark events from both fresh lookups and the cache of earlier chunks.
        self.check_existing_matches(events_with_indicators, attr)

    def test_community_connectivity(self):
        """Probe MISP for community-distributed attributes.

        Returns:
            A human-readable status string. This method never raises.
        """
        if not self.include_community:
            return "Community search disabled"

        test_payload = {
            "returnFormat": "json",
            "distribution": [1, 2, 3],
            # Up to 5 attributes are sampled below to list source orgs.
            "limit": 5,
            "enforceWarninglist": False,
        }
        try:
            response = self._post(test_payload, timeout=30)
            if response.status_code != 200:
                return f"Community test failed: HTTP {response.status_code}"
            attributes = (response.json().get("response") or {}).get("Attribute") or []
            if not attributes:
                return "Community access OK but no community indicators found"
            orgs = self._collect_orgs(attributes[:5])
            return (
                f"Community access OK - {len(attributes)} indicators from "
                f"{len(orgs)} orgs: {', '.join(sorted(map(str, orgs))[:3])}"
            )
        except Exception as e:  # diagnostic only; report rather than raise
            return f"Community test error: {e}"

    def check_existing_matches(self, events_with_indicators, attr):
        """Mark each event whose indicators have a cached MISP hit (at most once per event)."""
        for event, indicators in events_with_indicators:
            for indicator in indicators:
                result = self.result_dict.get(f"{attr}:{indicator}")
                if result:
                    self.mark_event(event, result, attr)
                    break  # Only mark once per event

    def query_misp(self, query, attr, timesketch_attr):
        """Stream events matching ``query`` and process them in chunks.

        After processing, a sketch view is added if any event was marked.
        Errors during streaming/processing are logged and re-raised.
        """
        logger.info("Starting MISP analysis for %s in %s", attr, timesketch_attr)
        logger.info(
            "Community search: %s", "enabled" if self.include_community else "disabled"
        )

        events_stream = self.event_stream(
            query_string=query, return_fields=[timesketch_attr, '_id']
        )

        current_chunk = []
        next_progress_log = self._PROGRESS_LOG_INTERVAL
        try:
            for event in events_stream:
                current_chunk.append(event)
                if len(current_chunk) < self.chunk_size:
                    continue

                self.process_chunk(current_chunk, attr, timesketch_attr)
                current_chunk = []

                # Threshold-based rather than modulo, so progress is still
                # logged when chunk_size does not divide the interval evenly.
                processed = self.stats['events_processed']
                if processed >= next_progress_log:
                    logger.info(
                        "Progress: %s events processed, %s marked, %s API calls, %s timeouts",
                        processed,
                        self.stats['events_marked'],
                        self.stats['api_calls'],
                        self.stats['api_timeouts'],
                    )
                    next_progress_log = (
                        processed // self._PROGRESS_LOG_INTERVAL + 1
                    ) * self._PROGRESS_LOG_INTERVAL

            # Process final (partial) chunk
            if current_chunk:
                self.process_chunk(current_chunk, attr, timesketch_attr)
        except Exception as e:
            logger.error("Error during chunk processing: %s", e)
            raise

        if self.stats['events_marked'] > 0:
            view_name = "MISP Threat Intelligence"
            if self.include_community:
                view_name += " (Community)"
            # NOTE(review): a wildcard inside a quoted phrase may not be
            # expanded by the query parser. The view still matches because
            # every marked event also gets the "threat-intel" tag.
            self.sketch.add_view(
                view_name=view_name,
                analyzer_name=self.NAME,
                query_string='tag:"MISP-*" OR tag:"threat-intel"',
            )

    def run(self):
        """Entry point for the analyzer.

        Returns:
            A summary string on success, or an error string if MISP is not
            configured or analysis fails.
        """
        if not self.misp_url or not self.misp_api_key:
            return "No MISP configuration found"

        start_time = time.monotonic()

        if self.include_community:
            community_status = self.test_community_connectivity()
            logger.info("Community connectivity test: %s", community_status)

        try:
            self.query_misp(self._query_string, self._attr, self._timesketch_attr)
        except Exception as e:  # top-level boundary: report instead of crashing
            logger.exception("MISP analyzer error: %s", e)
            return f"[{self._timesketch_attr}] MISP Error: {str(e)}"

        elapsed = time.monotonic() - start_time
        success_rate = ((self.stats['api_calls'] - self.stats['api_timeouts']) /
                        max(1, self.stats['api_calls']) * 100)

        result = (f"[{self._timesketch_attr}] MISP Analysis Complete: "
                  f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | "
                  f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | ")
        if self.include_community:
            result += (f"Community hits: {self.stats['community_hits']}, "
                       f"Own org: {self.stats['own_org_hits']} | ")
        result += f"{elapsed:.0f}s"

        logger.info(result)
        return result


manager.AnalysisManager.register_analyzer(MispAnalyzer)