From 59ea0dd6584a3c068bda171d520f819094c329ae Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Mario=20St=C3=B6ckl?=
Date: Wed, 30 Jul 2025 13:13:01 +0000
Subject: [PATCH] Update misp_analyzer.py

---
 misp_analyzer.py | 575 +++++++++++++++++++++++++++--------------------
 1 file changed, 326 insertions(+), 249 deletions(-)

diff --git a/misp_analyzer.py b/misp_analyzer.py
index db51f3b..0a7aac1 100644
--- a/misp_analyzer.py
+++ b/misp_analyzer.py
@@ -1,27 +1,30 @@
-"""Index analyzer plugin for MISP - Enhanced for large-scale processing."""
+"""Index analyzer plugin for MISP"""
 
 import logging
 import ntpath
 import re
 import requests
 import time
+import json
 from collections import defaultdict
-from typing import List, Dict, Set, Any
+from typing import List, Dict, Set, Any, Optional
+from urllib3.exceptions import InsecureRequestWarning
 
 from flask import current_app
 
 from timesketch.lib.analyzers import interface
 from timesketch.lib.analyzers import manager
 
+requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+
 logger = logging.getLogger("timesketch.analyzers.misp")
 
 
 class MispAnalyzer(interface.BaseAnalyzer):
-    """Enhanced Analyzer for MISP with large-scale processing capabilities."""
+    """Ultra-reliable MISP Analyzer for large-scale processing."""
 
     NAME = "misp_analyzer"
     DISPLAY_NAME = "MISP Enhanced"
-    DESCRIPTION = "Mark events using MISP with cross-org and large-scale support"
+    DESCRIPTION = "Mark events using MISP with ultra-reliable large-scale support"
 
     def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs):
         """Initialize the Analyzer."""
@@ -34,238 +37,274 @@ class MispAnalyzer(interface.BaseAnalyzer):
         self._attr = kwargs.get("attr")
         self._timesketch_attr = kwargs.get("timesketch_attr")
 
-        # Enhanced configuration
         self.include_community = kwargs.get("include_community", True)
-        self.batch_size = kwargs.get("batch_size", 100)  # Process events in batches
-        self.api_batch_size = kwargs.get("api_batch_size", 50)  # API call batching
-        self.max_retries = kwargs.get("max_retries", 3)
-        self.request_timeout = kwargs.get("request_timeout", 120)  # 2 minutes
-        self.chunk_size = kwargs.get("chunk_size", 1000)  # Memory management
+        self.chunk_size = kwargs.get("chunk_size", 500)
+        self.max_retries = kwargs.get("max_retries", 5)
+        self.base_timeout = kwargs.get("base_timeout", 30)
+        self.max_timeout = kwargs.get("max_timeout", 180)
+        self.request_delay = kwargs.get("request_delay", 1.0)
+        self.max_indicators_per_batch = kwargs.get("max_indicators_per_batch", 10)
 
-        # Regex patterns
         self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
 
-        # Track processed items to prevent duplicates
+        # Tracking sets
         self.marked_events = set()
         self.processed_indicators = set()
+        self.failed_indicators = set()
 
-        # Statistics
         self.stats = {
             'events_processed': 0,
             'indicators_extracted': 0,
-            'api_calls_made': 0,
+            'api_calls_successful': 0,
+            'api_calls_failed': 0,
             'events_marked': 0,
-            'errors': 0
+            'total_matches': 0,
+            'timeouts': 0,
+            'retries': 0
         }
+
+        # Session for connection reuse
+        self.session = requests.Session()
+        self.session.verify = False
+        self.session.headers.update({
+            "Authorization": self.misp_api_key,
+            "Content-Type": "application/json",
+            "User-Agent": "Timesketch-MISP-Analyzer/1.0"
+        })
 
     @staticmethod
     def get_kwargs():
-        """Get kwargs for the analyzer with enhanced options."""
+        """Get kwargs for the analyzer with ultra-reliable settings."""
+        base_config = {
+            "include_community": True,
+            "chunk_size": 500,
+            "max_retries": 5,
+            "base_timeout": 30,
"request_delay": 1.0, + "max_indicators_per_batch": 10, + } + to_query = [ { "query_string": "md5_hash:*", "attr": "md5", "timesketch_attr": "md5_hash", - "include_community": True, - "batch_size": 100, - "api_batch_size": 50, + **base_config }, { "query_string": "sha1_hash:*", "attr": "sha1", "timesketch_attr": "sha1_hash", - "include_community": True, - "batch_size": 100, - "api_batch_size": 50, + **base_config }, { "query_string": "sha256_hash:*", "attr": "sha256", "timesketch_attr": "sha256_hash", - "include_community": True, - "batch_size": 100, - "api_batch_size": 50, + **base_config }, { "query_string": "filename:*", "attr": "filename", "timesketch_attr": "filename", - "include_community": True, - "batch_size": 100, - "api_batch_size": 50, + **base_config }, { "query_string": "message:*", "attr": "ip", "timesketch_attr": "message", - "include_community": True, - "batch_size": 100, - "api_batch_size": 50, + **base_config }, { "query_string": "source_ip:* OR src_ip:* OR client_ip:*", "attr": "ip", "timesketch_attr": "source_ip", - "include_community": True, - "batch_size": 100, - "api_batch_size": 50, + **base_config }, ] return to_query def _is_valid_ip(self, ip_str: str) -> bool: - """Validate IP address with enhanced filtering.""" + """Enhanced IP validation for nginx logs.""" try: import ipaddress ip_str = ip_str.strip() + + # Basic format check first + if not re.match(r'^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$', ip_str): + return False + ip_obj = ipaddress.ip_address(ip_str) - # Filter out private, loopback, and other non-routable IPs + # Filter out non-routable addresses if (ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_multicast or ip_obj.is_reserved or - ip_obj.is_link_local): + ip_obj.is_link_local or ip_obj.is_unspecified): return False - # Additional nginx log specific filters - if ip_str.startswith(('0.', '255.255.255.255', '169.254.')): + # Nginx-specific filters + if (ip_str.startswith(('0.', '10.', '172.', '192.168.', '127.', '169.254.', '224.')) or + ip_str in ['255.255.255.255', '0.0.0.0']): return False return True - except (ValueError, AttributeError): + except (ValueError, AttributeError, TypeError): return False def _is_valid_hash(self, hash_str: str, hash_type: str) -> bool: - """Validate hash format.""" - if not hash_str: + """Validate hash format with strict checking.""" + if not hash_str or not isinstance(hash_str, str): return False + hash_str = hash_str.strip().lower() + # Check for obvious non-hash patterns + if not hash_str or hash_str in ['null', 'none', '0', '-']: + return False + hash_lengths = {"md5": 32, "sha1": 40, "sha256": 64} expected_length = hash_lengths.get(hash_type) - if not expected_length: + if not expected_length or len(hash_str) != expected_length: return False - return (len(hash_str) == expected_length and - all(c in '0123456789abcdef' for c in hash_str)) + return all(c in '0123456789abcdef' for c in hash_str) - def _make_misp_request(self, payload: Dict[str, Any], retry_count: int = 0) -> List[Dict]: - """Make MISP API request with retry logic.""" + def _calculate_payload_size(self, payload: Dict[str, Any]) -> int: + """Calculate approximate payload size in bytes.""" try: - response = requests.post( - f"{self.misp_url}/attributes/restSearch/", - json=payload, - headers={"Authorization": self.misp_api_key}, - verify=False, - timeout=self.request_timeout, - ) - - if response.status_code == 200: - data = response.json() - return data.get("response", {}).get("Attribute", []) - elif response.status_code == 429: # Rate limited 
-                wait_time = min(2 ** retry_count, 60)  # Exponential backoff, max 60s
-                logger.warning(f"Rate limited, waiting {wait_time}s before retry")
-                time.sleep(wait_time)
-                raise requests.exceptions.RequestException("Rate limited")
-            else:
-                logger.warning(f"MISP API returned status {response.status_code}")
-                return []
-
-        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
-            if retry_count < self.max_retries:
-                wait_time = min(2 ** retry_count * 5, 120)  # Exponential backoff
-                logger.warning(f"Request failed (attempt {retry_count + 1}), retrying in {wait_time}s: {e}")
-                time.sleep(wait_time)
-                return self._make_misp_request(payload, retry_count + 1)
-            else:
-                logger.error(f"Request failed after {self.max_retries} retries: {e}")
-                self.stats['errors'] += 1
-                return []
-        except Exception as e:
-            logger.error(f"Unexpected error in MISP request: {e}")
-            self.stats['errors'] += 1
-            return []
+            return len(json.dumps(payload).encode('utf-8'))
+        except Exception:
+            return 0
 
-    def query_misp_batch(self, indicators: List[str], attr: str) -> Dict[str, List[Dict]]:
-        """Query MISP for multiple indicators efficiently."""
-        results = defaultdict(list)
+    def _make_misp_request_single(self, indicator: str, attr_type: str, retry_count: int = 0) -> List[Dict]:
+        """Make single indicator MISP request with progressive timeout."""
+        timeout = min(self.base_timeout + (retry_count * 10), self.max_timeout)
 
-        # Determine search types based on attribute
-        if attr == "ip":
-            search_types = ["ip-src", "ip-dst"]
-        else:
-            search_types = [attr]
+        # Determine search types
+        search_types = ["ip-src", "ip-dst"] if attr_type == "ip" else [attr_type]
+        results = []
 
         for search_type in search_types:
-            # Batch indicators to reduce API calls
-            for i in range(0, len(indicators), self.api_batch_size):
-                batch = indicators[i:i + self.api_batch_size]
-
-                # Build payload with distribution settings
-                distribution_levels = [0]  # Own org
+            try:
+                # Build minimal payload
+                distribution_levels = [0]  # Start with own org only
                 if self.include_community:
-                    distribution_levels.extend([1, 2])  # Community and connected orgs
+                    distribution_levels.extend([1, 2])
 
                 payload = {
                     "returnFormat": "json",
-                    "value": batch,
+                    "value": indicator,
                     "type": search_type,
                     "enforceWarninglist": False,
                     "includeDecayScore": False,
                     "includeFullModel": False,
-                    "excludeDecayed": False,
                     "distribution": distribution_levels,
-                    "limit": 10000,  # High limit for large datasets
+                    "limit": 100,  # Conservative limit
                 }
 
-                self.stats['api_calls_made'] += 1
-                logger.info(f"Querying MISP for {len(batch)} {search_type} indicators (call #{self.stats['api_calls_made']})")
+                payload_size = self._calculate_payload_size(payload)
+                logger.debug(f"Querying {indicator} ({search_type}) - payload: {payload_size} bytes, timeout: {timeout}s")
 
-                batch_results = self._make_misp_request(payload)
+                response = self.session.post(
+                    f"{self.misp_url}/attributes/restSearch/",
+                    json=payload,
+                    timeout=timeout,
+                )
 
-                # Group results by indicator value
-                for result in batch_results:
-                    indicator_value = result.get("value", "").strip()
-                    if indicator_value in batch:
-                        results[indicator_value].append(result)
+                if response.status_code == 200:
+                    data = response.json()
+                    attributes = data.get("response", {}).get("Attribute", [])
+                    results.extend(attributes)
+                    self.stats['api_calls_successful'] += 1
+
+                elif response.status_code == 429:  # Rate limited
+                    wait_time = min(5 * (retry_count + 1), 30)
+                    logger.warning(f"Rate limited for {indicator}, waiting {wait_time}s")
+                    time.sleep(wait_time)
+                    raise requests.exceptions.RequestException("Rate limited")
+
+                elif response.status_code >= 500:  # Server error
+                    logger.warning(f"Server error {response.status_code} for {indicator}")
+                    if retry_count < self.max_retries:
+                        raise requests.exceptions.RequestException(f"Server error {response.status_code}")
+
+                else:
+                    logger.debug(f"No results for {indicator} ({search_type}): status {response.status_code}")
 
-                # Rate limiting courtesy pause
-                time.sleep(0.5)
+                # Delay between search types
+                time.sleep(0.2)
+
+            except (requests.exceptions.Timeout, TimeoutError) as e:
+                self.stats['timeouts'] += 1
+                if retry_count < self.max_retries:
+                    wait_time = min(2 ** retry_count, 30)
+                    logger.warning(f"Timeout for {indicator} (attempt {retry_count + 1}/{self.max_retries}), retrying in {wait_time}s")
+                    time.sleep(wait_time)
+                    self.stats['retries'] += 1
+                    return self._make_misp_request_single(indicator, attr_type, retry_count + 1)
+                else:
+                    logger.error(f"Max retries exceeded for {indicator}: {e}")
+                    self.stats['api_calls_failed'] += 1
+                    self.failed_indicators.add(indicator)
+                    return []
+
+            except requests.exceptions.RequestException as e:
+                # Catches connection errors and the deliberate re-raises above
+                # (rate limit / server error), so those paths actually retry.
+                self.stats['api_calls_failed'] += 1
+                if retry_count < self.max_retries:
+                    wait_time = min(5 * (retry_count + 1), 60)
+                    logger.warning(f"Request error for {indicator} (attempt {retry_count + 1}), retrying in {wait_time}s: {e}")
+                    time.sleep(wait_time)
+                    self.stats['retries'] += 1
+                    return self._make_misp_request_single(indicator, attr_type, retry_count + 1)
+                else:
+                    logger.error(f"Request failed permanently for {indicator}: {e}")
+                    self.failed_indicators.add(indicator)
+                    return []
+
+            except Exception as e:
+                logger.error(f"Unexpected error querying {indicator}: {e}")
+                self.stats['api_calls_failed'] += 1
+                return []
 
-        return dict(results)
+        return results
 
     def extract_indicators_from_event(self, event: Any, attr: str, timesketch_attr: str) -> List[str]:
-        """Extract indicators from a single event."""
-        loc = event.source.get(timesketch_attr)
-        if not loc:
-            return []
-
-        indicators = []
-        loc_str = str(loc)
-
-        if attr == "ip" and timesketch_attr == "message":
-            # Extract IPs from nginx access log messages
-            ip_matches = self.ip_pattern.findall(loc_str)
-            indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]
+        """Extract and validate indicators from event."""
+        try:
+            loc = event.source.get(timesketch_attr)
+            if not loc:
+                return []
 
-        elif attr == "ip" and timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
-            if self._is_valid_ip(loc_str):
-                indicators = [loc_str]
-
-        elif attr in ["md5", "sha1", "sha256"]:
-            if self._is_valid_hash(loc_str, attr):
-                indicators = [loc_str]
-
-        elif attr == "filename":
-            filename = ntpath.basename(loc_str)
-            if filename and len(filename) > 3:  # Meaningful filename
-                indicators = [filename]
-
-        return indicators
+            indicators = []
+            loc_str = str(loc)
+
+            if attr == "ip":
+                if timesketch_attr == "message":
+                    # Extract IPs from nginx access log messages
+                    ip_matches = self.ip_pattern.findall(loc_str)
+                    indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]
+                elif timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
+                    if self._is_valid_ip(loc_str):
+                        indicators = [loc_str]
+
+            elif attr in ["md5", "sha1", "sha256"]:
+                if self._is_valid_hash(loc_str, attr):
+                    indicators = [loc_str.lower().strip()]
+
+            elif attr == "filename":
+                filename = ntpath.basename(loc_str).strip()
+                if filename and len(filename) > 3 and '.' in filename:
+                    indicators = [filename]
+
+            return indicators
+
+        except Exception as e:
+            logger.debug(f"Error extracting indicators from event: {e}")
+            return []
 
-    def mark_event_with_intel(self, event: Any, misp_results: List[Dict], attr: str) -> None:
-        """Mark event with MISP intelligence."""
+    def mark_event_with_intelligence(self, event: Any, misp_results: List[Dict], attr: str) -> None:
+        """Mark event with MISP intelligence information."""
         try:
             event_id = event.source.get('_id', '')
             if event_id in self.marked_events:
@@ -273,67 +312,142 @@ class MispAnalyzer(interface.BaseAnalyzer):
                 return
 
             self.marked_events.add(event_id)
 
-            # Build comprehensive message
+            # Build intelligence message
             if attr == "ip":
-                msg = "MISP: Malicious IP detected"
+                msg_prefix = "MISP: Threat IP"
+            elif attr in ["md5", "sha1", "sha256"]:
+                msg_prefix = f"MISP: Malicious {attr.upper()}"
             else:
-                msg = f"MISP: Known {attr.upper()} indicator"
+                msg_prefix = f"MISP: Known {attr.upper()}"
 
-            # Collect event and organization info
-            events_info = {}
+            # Extract key information
+            events_info = []
             orgs_info = set()
-            threat_levels = set()
+            threat_levels = []
 
-            for misp_attr in misp_results:
+            for misp_attr in misp_results[:3]:  # Limit to first 3 for message clarity
                 event_info = misp_attr.get("Event", {})
-                event_id_misp = event_info.get("id", "")
-                event_desc = event_info.get("info", "Unknown")
+                event_desc = event_info.get("info", "Unknown")[:40]  # Truncate
                 org_name = event_info.get("Orgc", {}).get("name", "Unknown")
-                threat_level = event_info.get("threat_level_id", "")
+                threat_level = event_info.get("threat_level_id")
 
-                events_info[event_id_misp] = event_desc[:50]  # Truncate long descriptions
-                orgs_info.add(org_name)
+                if event_desc and event_desc != "Unknown":
+                    events_info.append(event_desc)
+                if org_name and org_name != "Unknown":
+                    orgs_info.add(org_name)
                 if threat_level:
-                    threat_levels.add(threat_level)
+                    threat_levels.append(int(threat_level))
 
-            # Enhanced message with threat context
-            event_descriptions = list(events_info.values())[:2]
-            if event_descriptions:
-                msg += f" | Events: {' | '.join(event_descriptions)}"
+            # Build comprehensive message
+            msg_parts = [msg_prefix]
 
-            if len(misp_results) > 2:
-                msg += f" | +{len(misp_results)-2} more indicators"
+            if events_info:
+                msg_parts.append(f"Events: {' | '.join(events_info[:2])}")
+
+            if len(misp_results) > 3:
+                msg_parts.append(f"({len(misp_results)} total matches)")
 
-            # Organization information
             if len(orgs_info) > 1:
-                msg += f" | Sources: {', '.join(list(orgs_info)[:3])}"
+                msg_parts.append(f"Sources: {', '.join(list(orgs_info)[:2])}")
             elif orgs_info and list(orgs_info)[0] != "Unknown":
-                msg += f" | Source: {list(orgs_info)[0]}"
+                msg_parts.append(f"Source: {list(orgs_info)[0]}")
 
-            # Threat level context
             if threat_levels:
-                highest_threat = min(threat_levels)  # Lower number = higher threat
-                threat_map = {"1": "HIGH", "2": "MEDIUM", "3": "LOW", "4": "UNDEFINED"}
-                msg += f" | Threat: {threat_map.get(str(highest_threat), 'UNKNOWN')}"
+                min_threat = min(threat_levels)  # Lower = higher threat
+                threat_names = {1: "HIGH", 2: "MEDIUM", 3: "LOW", 4: "UNDEFINED"}
+                msg_parts.append(f"Threat: {threat_names.get(min_threat, 'UNKNOWN')}")
 
-            # Add tags and comment
+            final_message = " | ".join(msg_parts)
+
+            # Add tags
             tags = [f"MISP-{attr}", "threat-intel"]
             if self.include_community and len(orgs_info) > 1:
                 tags.append("cross-org-intel")
 
-            event.add_comment(msg)
+            event.add_comment(final_message)
             event.add_tags(tags)
             event.commit()
 
             self.stats['events_marked'] += 1
+            self.stats['total_matches'] += len(misp_results)
 
         except Exception as e:
-            logger.error(f"Error marking event {event_id}: {e}")
-            self.stats['errors'] += 1
+            logger.error(f"Error marking event: {e}")
 
-    def process_events_chunk(self, events_chunk: List[Any], attr: str, timesketch_attr: str) -> None:
-        """Process a chunk of events efficiently."""
-        # Extract all indicators from the chunk
+    def process_indicators_batch(self, indicators: List[str], attr: str) -> Dict[str, List[Dict]]:
+        """Process indicators with careful rate limiting."""
+        results = {}
+
+        for i, indicator in enumerate(indicators):
+            if indicator in self.failed_indicators:
+                continue
+
+            logger.debug(f"Processing indicator {i+1}/{len(indicators)}: {indicator}")
+
+            misp_results = self._make_misp_request_single(indicator, attr)
+
+            if misp_results:
+                results[indicator] = misp_results
+                logger.info(f"MISP hit: {indicator} ({len(misp_results)} matches)")
+
+            # Rate limiting between requests
+            time.sleep(self.request_delay)
+
+            # Progress update every 50 indicators
+            if (i + 1) % 50 == 0:
+                logger.info(f"Processed {i+1}/{len(indicators)} indicators, "
+                            f"{len(results)} hits, "
+                            f"{self.stats['timeouts']} timeouts, "
+                            f"{self.stats['api_calls_failed']} failures")
+
+        return results
+
+    def query_misp(self, query: str, attr: str, timesketch_attr: str) -> None:
+        """Main processing with ultra-reliable chunked approach."""
+        logger.info(f"Starting ultra-reliable MISP analysis for {attr} in {timesketch_attr}")
+        logger.info(f"Configuration: chunk_size={self.chunk_size}, "
+                    f"max_retries={self.max_retries}, "
+                    f"request_delay={self.request_delay}s, "
+                    f"include_community={self.include_community}")
+
+        # Process events in chunks
+        events_stream = self.event_stream(
+            query_string=query,
+            return_fields=[timesketch_attr, '_id', 'timestamp']
+        )
+
+        current_chunk = []
+
+        try:
+            for event in events_stream:
+                current_chunk.append(event)
+                self.stats['events_processed'] += 1
+
+                # Process when chunk is full
+                if len(current_chunk) >= self.chunk_size:
+                    self._process_events_chunk(current_chunk, attr, timesketch_attr)
+                    current_chunk = []
+
+                # Progress logging
+                if self.stats['events_processed'] % 5000 == 0:
+                    success_rate = (self.stats['api_calls_successful'] /
+                                    max(1, self.stats['api_calls_successful'] + self.stats['api_calls_failed']) * 100)
+                    logger.info(f"Progress: {self.stats['events_processed']} events, "
                                f"{self.stats['events_marked']} marked, "
+                                f"{len(self.processed_indicators)} indicators processed, "
+                                f"{success_rate:.1f}% API success rate")
+
+            # Process final chunk
+            if current_chunk:
+                self._process_events_chunk(current_chunk, attr, timesketch_attr)
+
+        except Exception as e:
+            logger.error(f"Critical error during processing: {e}")
+            raise
+
+    def _process_events_chunk(self, events_chunk: List[Any], attr: str, timesketch_attr: str) -> None:
+        """Process a chunk of events with indicator extraction and MISP queries."""
+        # Extract all unique indicators from chunk
         chunk_indicators = []
         event_to_indicators = {}
 
@@ -344,25 +458,26 @@ class MispAnalyzer(interface.BaseAnalyzer):
             event_to_indicators[event_id] = (event, indicators)
             chunk_indicators.extend(indicators)
 
-        # Remove duplicates while preserving order
+        # Get unique new indicators
         unique_indicators = list(dict.fromkeys(chunk_indicators))
-        new_indicators = [ind for ind in unique_indicators if ind not in self.processed_indicators]
+        new_indicators = [ind for ind in unique_indicators
+                          if ind not in self.processed_indicators and ind not in self.failed_indicators]
 
         if not new_indicators:
            return
 
-        logger.info(f"Processing {len(new_indicators)} new indicators from chunk of {len(events_chunk)} events")
+        logger.info(f"Processing {len(new_indicators)} new {attr} indicators from {len(events_chunk)} events")
+        self.stats['indicators_extracted'] += len(new_indicators)
 
         # Query MISP for new indicators
-        misp_results = self.query_misp_batch(new_indicators, attr)
+        misp_results = self.process_indicators_batch(new_indicators, attr)
 
-        # Update processed indicators and result cache
+        # Update cache
         self.processed_indicators.update(new_indicators)
         for indicator, results in misp_results.items():
-            if results:
-                self.result_dict[f"{attr}:{indicator}"] = results
+            self.result_dict[f"{attr}:{indicator}"] = results
 
-        # Mark events that have matching indicators
+        # Mark matching events
        for event_id, (event, indicators) in event_to_indicators.items():
             if event_id in self.marked_events:
                 continue
@@ -374,91 +489,53 @@ class MispAnalyzer(interface.BaseAnalyzer):
                     matching_results.extend(self.result_dict[key])
 
             if matching_results:
-                self.mark_event_with_intel(event, matching_results, attr)
-
-    def query_misp(self, query: str, attr: str, timesketch_attr: str) -> None:
-        """Main processing function with chunked approach for large datasets."""
-        logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}")
-        logger.info(f"Community querying: {'enabled' if self.include_community else 'disabled'}")
-
-        # Process events in chunks to manage memory
-        events_stream = self.event_stream(
-            query_string=query,
-            return_fields=[timesketch_attr, '_id', 'timestamp']
-        )
-
-        current_chunk = []
-
-        try:
-            for event in events_stream:
-                current_chunk.append(event)
-                self.stats['events_processed'] += 1
-
-                # Process chunk when it reaches the specified size
-                if len(current_chunk) >= self.chunk_size:
-                    self.process_events_chunk(current_chunk, attr, timesketch_attr)
-                    current_chunk = []
-
-                # Progress logging
-                if self.stats['events_processed'] % 10000 == 0:
-                    logger.info(f"Progress: {self.stats['events_processed']} events processed, "
-                                f"{self.stats['events_marked']} marked, "
-                                f"{self.stats['api_calls_made']} API calls made")
-
-            # Process remaining events in the last chunk
-            if current_chunk:
-                self.process_events_chunk(current_chunk, attr, timesketch_attr)
-
-        except Exception as e:
-            logger.error(f"Error during event processing: {e}")
-            self.stats['errors'] += 1
-
-        # Create comprehensive view if we found matches
-        if self.stats['events_marked'] > 0:
-            view_name = f"MISP Threat Intel - {attr.upper()}"
-            if self.include_community:
-                view_name += " (Cross-Org)"
-
-            self.sketch.add_view(
-                view_name=view_name,
-                analyzer_name=self.NAME,
-                query_string=f'tag:"MISP-{attr}" OR tag:"threat-intel"',
-            )
+                self.mark_event_with_intelligence(event, matching_results, attr)
 
     def run(self) -> str:
-        """Entry point for the analyzer with comprehensive error handling."""
+        """Entry point with comprehensive error handling and reporting."""
         if not self.misp_url or not self.misp_api_key:
-            return "Error: No MISP configuration found"
+            return "Error: MISP configuration missing"
 
         start_time = time.time()
 
         try:
-            logger.info(f"Starting MISP analyzer with config: "
-                        f"batch_size={self.batch_size}, "
-                        f"api_batch_size={self.api_batch_size}, "
-                        f"chunk_size={self.chunk_size}, "
-                        f"include_community={self.include_community}")
-
             self.query_misp(self._query_string, self._attr, self._timesketch_attr)
 
-            elapsed_time = time.time() - start_time
+            # Create view for matches
+            if self.stats['events_marked'] > 0:
view_name = f"MISP {self._attr.upper()} Threats" + if self.include_community: + view_name += " (Cross-Org)" + + self.sketch.add_view( + view_name=view_name, + analyzer_name=self.NAME, + query_string=f'tag:"MISP-{self._attr}" OR tag:"threat-intel"', + ) - # Comprehensive results summary - result_msg = (f"[{self._timesketch_attr}] MISP Analysis Complete: " - f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | " - f"{self.stats['api_calls_made']} API calls | " - f"{len(self.processed_indicators)} indicators processed | " - f"{elapsed_time:.1f}s") + # Comprehensive results + elapsed = time.time() - start_time + total_api_calls = self.stats['api_calls_successful'] + self.stats['api_calls_failed'] + success_rate = (self.stats['api_calls_successful'] / max(1, total_api_calls)) * 100 - if self.stats['errors'] > 0: - result_msg += f" | {self.stats['errors']} errors" + result = (f"[{self._timesketch_attr}] MISP Analysis: " + f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | " + f"{len(self.processed_indicators)} indicators processed | " + f"{total_api_calls} API calls ({success_rate:.1f}% success) | " + f"{self.stats['timeouts']} timeouts | " + f"{elapsed:.0f}s") - logger.info(result_msg) - return result_msg + logger.info(result) + return result except Exception as e: - logger.error(f"MISP analyzer critical error: {e}") + logger.error(f"MISP analyzer failed: {e}") return f"[{self._timesketch_attr}] MISP Error: {str(e)}" + finally: + try: + self.session.close() + except: + pass manager.AnalysisManager.register_analyzer(MispAnalyzer) \ No newline at end of file