From dfa3a9fc537d4cb5bc67bc94e9b5c5a3605ed767 Mon Sep 17 00:00:00 2001 From: overcuriousity Date: Wed, 30 Jul 2025 21:39:11 +0200 Subject: [PATCH] progress --- misp_analyzer.py | 113 ++++++++++++++++------------------------------- 1 file changed, 39 insertions(+), 74 deletions(-) diff --git a/misp_analyzer.py b/misp_analyzer.py index 2703417..5777edc 100644 --- a/misp_analyzer.py +++ b/misp_analyzer.py @@ -32,19 +32,16 @@ class MispAnalyzer(interface.BaseAnalyzer): self._attr = kwargs.get("attr") self._timesketch_attr = kwargs.get("timesketch_attr") - # Simple configuration for reliability - self.include_community = kwargs.get("include_community", False) # Default to false for reliability - self.chunk_size = kwargs.get("chunk_size", 1000) # Process in chunks - self.max_retries = kwargs.get("max_retries", 2) # Minimal retries - self.request_delay = kwargs.get("request_delay", 0.5) # Small delay between requests + self.include_community = kwargs.get("include_community", False) + self.chunk_size = kwargs.get("chunk_size", 1000) + self.max_retries = kwargs.get("max_retries", 2) + self.request_delay = kwargs.get("request_delay", 0.5) self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') - # Track processed items self.processed_indicators = set() self.failed_indicators = set() - # Simple stats self.stats = { 'events_processed': 0, 'indicators_found': 0, @@ -59,60 +56,47 @@ class MispAnalyzer(interface.BaseAnalyzer): @staticmethod def get_kwargs(): - """Get kwargs for the analyzer - keeping original working structure.""" + """Get kwargs for the analyzer.""" to_query = [ { "query_string": "md5_hash:*", "attr": "md5", "timesketch_attr": "md5_hash", - "include_community": False, # Start with own org only + "include_community": True, }, { "query_string": "sha1_hash:*", "attr": "sha1", "timesketch_attr": "sha1_hash", - "include_community": False, + "include_community": True, }, { "query_string": "sha256_hash:*", "attr": "sha256", "timesketch_attr": "sha256_hash", - "include_community": False, + "include_community": True, }, { "query_string": "filename:*", "attr": "filename", "timesketch_attr": "filename", - "include_community": False, - }, - { - "query_string": "message:*", - "attr": "ip-src", - "timesketch_attr": "message", - "include_community": False, - }, - { - "query_string": "message:*", - "attr": "ip-dst", - "timesketch_attr": "message", - "include_community": False, + "include_community": True, }, { "query_string": "source_ip:*", "attr": "ip-src", "timesketch_attr": "source_ip", - "include_community": False, + "include_community": True, }, ] return to_query def _is_valid_ip(self, ip_str): - """Simple IP validation - keeping original working version.""" + """Simple IP validation.""" try: import ipaddress ip_str = ip_str.strip() ipaddress.ip_address(ip_str) - # Filter out invalid ranges if ip_str.startswith(('0.', '127.', '255.255.255.255', '10.', '192.168.', '172.')): return False return True @@ -120,7 +104,7 @@ class MispAnalyzer(interface.BaseAnalyzer): return False def _is_valid_hash(self, hash_str, hash_type): - """Simple hash validation - keeping original working version.""" + """Simple hash validation.""" if not hash_str: return False hash_str = hash_str.strip().lower() @@ -135,33 +119,31 @@ class MispAnalyzer(interface.BaseAnalyzer): return False def query_misp_single(self, value, attr, retry_count=0): - """Query MISP for a single value - enhanced with community search.""" + """Query MISP for a single value.""" if value in self.failed_indicators: return [] try: - # Build enhanced payload for community search payload = { "returnFormat": "json", "value": value, "type": attr, - "enforceWarninglist": False, # Don't filter known-good indicators - "includeEventTags": True, # Include event tags for context - "includeContext": True, # Include context information + "enforceWarninglist": False, + "includeEventTags": True, + "includeContext": True, } - # Enhanced community search - include ALL distribution levels if self.include_community: payload.update({ - "distribution": [0, 1, 2, 3, 5], # Own, community, connected, all, inherit - "includeEventUuid": True, # Include event UUIDs - "includeCorrelations": True, # Include correlations - "includeDecayScore": False, # Skip decay for speed - "includeFullModel": False, # Skip full model for speed + "distribution": [0, 1, 2, 3, 5], + "includeEventUuid": True, + "includeCorrelations": True, + "includeDecayScore": False, + "includeFullModel": False, }) logger.debug(f"Community search enabled for {value} ({attr})") else: - payload["distribution"] = [0] # Own org only + payload["distribution"] = [0] logger.debug(f"Own org search only for {value} ({attr})") self.stats['api_calls'] += 1 @@ -181,7 +163,6 @@ class MispAnalyzer(interface.BaseAnalyzer): data = response.json() attributes = data.get("response", {}).get("Attribute", []) - # Log community sources for debugging if attributes and self.include_community: orgs = set() for attr_data in attributes: @@ -196,7 +177,7 @@ class MispAnalyzer(interface.BaseAnalyzer): self.stats['api_timeouts'] += 1 if retry_count < self.max_retries: - wait_time = (retry_count + 1) * 2 # Simple backoff: 2s, 4s + wait_time = (retry_count + 1) * 2 logger.warning(f"Timeout for {value}, retrying in {wait_time}s (attempt {retry_count + 1})") time.sleep(wait_time) return self.query_misp_single(value, attr, retry_count + 1) @@ -210,17 +191,17 @@ class MispAnalyzer(interface.BaseAnalyzer): return [] def mark_event(self, event, result, attr): - """Mark event with MISP intelligence - enhanced with correlating events.""" + """Mark event with MISP intelligence including event links.""" try: if attr.startswith("ip-"): msg = "MISP: Malicious IP" else: msg = "MISP: Known indicator" - # Collect unique correlating events and organizations correlating_events = [] orgs = set() event_ids = set() + event_links = [] for res in result: event_info = res.get("Event", {}) @@ -228,28 +209,24 @@ class MispAnalyzer(interface.BaseAnalyzer): event_desc = event_info.get("info", "Unknown") org_name = event_info.get("Orgc", {}).get("name", "Unknown") - # Avoid duplicate events (same ID) if event_id and event_id not in event_ids: event_ids.add(event_id) - # Truncate long descriptions short_desc = event_desc[:60] + "..." if len(event_desc) > 60 else event_desc correlating_events.append(short_desc) + event_links.append(f"{self.misp_url}/events/view/{event_id}") if org_name != "Unknown": orgs.add(org_name) - # Build comprehensive message with correlating events if len(correlating_events) == 1: msg += f" | Event: {correlating_events[0]}" elif len(correlating_events) > 1: - # Show first 2 correlating events, then count msg += f" | Events: {correlating_events[0]}" if len(correlating_events) > 1: msg += f" + {correlating_events[1]}" if len(correlating_events) > 2: msg += f" + {len(correlating_events)-2} more" - # Add organization information for community awareness if self.include_community and orgs: if len(orgs) > 1: msg += f" | Sources: {', '.join(list(orgs)[:2])}" @@ -258,11 +235,19 @@ class MispAnalyzer(interface.BaseAnalyzer): else: msg += f" | Source: {list(orgs)[0]}" - # Add correlation count for context if len(result) > 1: msg += f" | {len(result)} correlations" - # Add appropriate tags including correlation info + if event_links: + if len(event_links) == 1: + msg += f" | MISP Event: {event_links[0]}" + else: + msg += f" | MISP Events: {event_links[0]}" + if len(event_links) > 1: + msg += f", {event_links[1]}" + if len(event_links) > 2: + msg += f" +{len(event_links)-2} more" + tags = [f"MISP-{attr}", "threat-intel"] if self.include_community: tags.append("community-intel") @@ -275,7 +260,6 @@ class MispAnalyzer(interface.BaseAnalyzer): self.stats['events_marked'] += 1 - # Log detailed correlation info for debugging logger.info(f"Marked event with {len(correlating_events)} correlating MISP events: {', '.join(correlating_events[:3])}") except Exception as e: @@ -295,7 +279,6 @@ class MispAnalyzer(interface.BaseAnalyzer): indicators = [] - # Extract based on attribute type - SAME AS ORIGINAL if attr.startswith("ip-") and timesketch_attr == "message": ip_matches = self.ip_pattern.findall(str(loc)) indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] @@ -313,7 +296,6 @@ class MispAnalyzer(interface.BaseAnalyzer): if filename and len(filename) > 1: indicators = [filename] - # Store event with its indicators if indicators: events_with_indicators.append((event, indicators)) chunk_indicators.extend(indicators) @@ -329,19 +311,16 @@ class MispAnalyzer(interface.BaseAnalyzer): if not chunk_indicators: return - # Get unique new indicators unique_indicators = list(dict.fromkeys(chunk_indicators)) new_indicators = [ind for ind in unique_indicators if ind not in self.processed_indicators] if not new_indicators: - # Still need to check existing cache for matches self.check_existing_matches(events_with_indicators, attr) return logger.info(f"Processing {len(new_indicators)} new {attr} indicators from {len(events_chunk)} events") self.stats['indicators_found'] += len(new_indicators) - # Query MISP for each new indicator for indicator in new_indicators: if indicator in self.failed_indicators: continue @@ -350,7 +329,6 @@ class MispAnalyzer(interface.BaseAnalyzer): if result: self.result_dict[f"{attr}:{indicator}"] = result - # Track correlation statistics self.stats['total_correlations'] += len(result) unique_events = set() for res in result: @@ -361,7 +339,6 @@ class MispAnalyzer(interface.BaseAnalyzer): if len(unique_events) > 1: self.stats['multi_event_correlations'] += 1 - # Track community vs own org hits with correlation details orgs = set() events = set() for res in result: @@ -381,10 +358,8 @@ class MispAnalyzer(interface.BaseAnalyzer): self.processed_indicators.add(indicator) - # Small delay to be nice to MISP server time.sleep(self.request_delay) - # Mark events that have matches self.check_existing_matches(events_with_indicators, attr) def test_community_connectivity(self): @@ -393,10 +368,9 @@ class MispAnalyzer(interface.BaseAnalyzer): return "Community search disabled" try: - # Test with a known community indicator (if available) test_payload = { "returnFormat": "json", - "distribution": [1, 2, 3], # Community levels only + "distribution": [1, 2, 3], "limit": 1, "enforceWarninglist": False, } @@ -429,19 +403,17 @@ class MispAnalyzer(interface.BaseAnalyzer): def check_existing_matches(self, events_with_indicators, attr): """Check events against existing MISP results.""" for event, indicators in events_with_indicators: - # Check if any indicator has MISP match for indicator in indicators: key = f"{attr}:{indicator}" if key in self.result_dict and self.result_dict[key]: self.mark_event(event, self.result_dict[key], attr) - break # Only mark once per event + break def query_misp(self, query, attr, timesketch_attr): - """Process events in chunks - enhanced for large datasets.""" + """Process events in chunks.""" logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}") logger.info(f"Community search: {'enabled' if self.include_community else 'disabled'}") - # Get event stream events_stream = self.event_stream( query_string=query, return_fields=[timesketch_attr, '_id'] @@ -453,19 +425,16 @@ class MispAnalyzer(interface.BaseAnalyzer): for event in events_stream: current_chunk.append(event) - # Process when chunk is full if len(current_chunk) >= self.chunk_size: self.process_chunk(current_chunk, attr, timesketch_attr) current_chunk = [] - # Progress logging if self.stats['events_processed'] % 10000 == 0: logger.info(f"Progress: {self.stats['events_processed']} events processed, " f"{self.stats['events_marked']} marked, " f"{self.stats['api_calls']} API calls, " f"{self.stats['api_timeouts']} timeouts") - # Process final chunk if current_chunk: self.process_chunk(current_chunk, attr, timesketch_attr) @@ -473,7 +442,6 @@ class MispAnalyzer(interface.BaseAnalyzer): logger.error(f"Error during chunk processing: {e}") raise - # Create view if we found matches if self.stats['events_marked'] > 0: view_name = "MISP Threat Intelligence" if self.include_community: @@ -485,7 +453,6 @@ class MispAnalyzer(interface.BaseAnalyzer): query_string='tag:"MISP-*" OR tag:"threat-intel"', ) - # Create additional view for multi-event correlations correlation_count = sum(1 for key, results in self.result_dict.items() if results and len(results) > 1) if correlation_count > 0: @@ -502,7 +469,6 @@ class MispAnalyzer(interface.BaseAnalyzer): start_time = time.time() - # Test community connectivity if enabled if self.include_community: community_status = self.test_community_connectivity() logger.info(f"Community connectivity test: {community_status}") @@ -514,7 +480,6 @@ class MispAnalyzer(interface.BaseAnalyzer): success_rate = ((self.stats['api_calls'] - self.stats['api_timeouts']) / max(1, self.stats['api_calls']) * 100) - # Enhanced results with community and correlation statistics result = (f"[{self._timesketch_attr}] MISP Analysis Complete: " f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | " f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | ")