From f342760ff01f7dde4e0c97736fc4753b54612429 Mon Sep 17 00:00:00 2001 From: overcuriousity Date: Wed, 30 Jul 2025 21:34:02 +0200 Subject: [PATCH] progress --- misp_analyzer.py | 139 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 107 insertions(+), 32 deletions(-) diff --git a/misp_analyzer.py b/misp_analyzer.py index 9252dce..2703417 100644 --- a/misp_analyzer.py +++ b/misp_analyzer.py @@ -32,7 +32,8 @@ class MispAnalyzer(interface.BaseAnalyzer): self._attr = kwargs.get("attr") self._timesketch_attr = kwargs.get("timesketch_attr") - self.include_community = kwargs.get("include_community", False) + # Simple configuration for reliability + self.include_community = kwargs.get("include_community", False) # Default to false for reliability self.chunk_size = kwargs.get("chunk_size", 1000) # Process in chunks self.max_retries = kwargs.get("max_retries", 2) # Minimal retries self.request_delay = kwargs.get("request_delay", 0.5) # Small delay between requests @@ -51,46 +52,62 @@ class MispAnalyzer(interface.BaseAnalyzer): 'api_timeouts': 0, 'events_marked': 0, 'community_hits': 0, - 'own_org_hits': 0 + 'own_org_hits': 0, + 'total_correlations': 0, + 'multi_event_correlations': 0 } @staticmethod def get_kwargs(): + """Get kwargs for the analyzer - keeping original working structure.""" to_query = [ { "query_string": "md5_hash:*", "attr": "md5", "timesketch_attr": "md5_hash", - "include_community": True, + "include_community": False, # Start with own org only }, { "query_string": "sha1_hash:*", "attr": "sha1", "timesketch_attr": "sha1_hash", - "include_community": True, + "include_community": False, }, { "query_string": "sha256_hash:*", "attr": "sha256", "timesketch_attr": "sha256_hash", - "include_community": True, + "include_community": False, }, { "query_string": "filename:*", "attr": "filename", "timesketch_attr": "filename", - "include_community": True, + "include_community": False, + }, + { + "query_string": "message:*", + "attr": "ip-src", + "timesketch_attr": "message", + "include_community": False, + }, + { + "query_string": "message:*", + "attr": "ip-dst", + "timesketch_attr": "message", + "include_community": False, }, { "query_string": "source_ip:*", "attr": "ip-src", "timesketch_attr": "source_ip", - "include_community": True, + "include_community": False, }, ] return to_query def _is_valid_ip(self, ip_str): + """Simple IP validation - keeping original working version.""" try: import ipaddress ip_str = ip_str.strip() @@ -133,7 +150,7 @@ class MispAnalyzer(interface.BaseAnalyzer): "includeContext": True, # Include context information } - # community search - include ALL distribution levels + # Enhanced community search - include ALL distribution levels if self.include_community: payload.update({ "distribution": [0, 1, 2, 3, 5], # Own, community, connected, all, inherit @@ -179,7 +196,7 @@ class MispAnalyzer(interface.BaseAnalyzer): self.stats['api_timeouts'] += 1 if retry_count < self.max_retries: - wait_time = (retry_count + 1) * 2 # backoff: 2s, 4s + wait_time = (retry_count + 1) * 2 # Simple backoff: 2s, 4s logger.warning(f"Timeout for {value}, retrying in {wait_time}s (attempt {retry_count + 1})") time.sleep(wait_time) return self.query_misp_single(value, attr, retry_count + 1) @@ -193,41 +210,64 @@ class MispAnalyzer(interface.BaseAnalyzer): return [] def mark_event(self, event, result, attr): - """Mark event with MISP intelligence - enhanced with community info.""" + """Mark event with MISP intelligence - enhanced with correlating events.""" try: if attr.startswith("ip-"): msg = "MISP: Malicious IP" else: msg = "MISP: Known indicator" - # Extract event and organization information - event_info = result[0].get("Event", {}).get("info", "Unknown") - org_info = result[0].get("Event", {}).get("Orgc", {}).get("name", "Unknown") + # Collect unique correlating events and organizations + correlating_events = [] + orgs = set() + event_ids = set() - msg += f" - {event_info}" + for res in result: + event_info = res.get("Event", {}) + event_id = event_info.get("id") + event_desc = event_info.get("info", "Unknown") + org_name = event_info.get("Orgc", {}).get("name", "Unknown") + + # Avoid duplicate events (same ID) + if event_id and event_id not in event_ids: + event_ids.add(event_id) + # Truncate long descriptions + short_desc = event_desc[:60] + "..." if len(event_desc) > 60 else event_desc + correlating_events.append(short_desc) + + if org_name != "Unknown": + orgs.add(org_name) - if len(result) > 1: - msg += f" (+{len(result)-1} more)" + # Build comprehensive message with correlating events + if len(correlating_events) == 1: + msg += f" | Event: {correlating_events[0]}" + elif len(correlating_events) > 1: + # Show first 2 correlating events, then count + msg += f" | Events: {correlating_events[0]}" + if len(correlating_events) > 1: + msg += f" + {correlating_events[1]}" + if len(correlating_events) > 2: + msg += f" + {len(correlating_events)-2} more" # Add organization information for community awareness - if self.include_community and org_info != "Unknown": - # Collect unique organizations - orgs = set() - for res in result[:3]: - org = res.get("Event", {}).get("Orgc", {}).get("name", "Unknown") - if org != "Unknown": - orgs.add(org) - + if self.include_community and orgs: if len(orgs) > 1: msg += f" | Sources: {', '.join(list(orgs)[:2])}" if len(orgs) > 2: msg += f" +{len(orgs)-2} more" - elif orgs: + else: msg += f" | Source: {list(orgs)[0]}" + + # Add correlation count for context + if len(result) > 1: + msg += f" | {len(result)} correlations" + # Add appropriate tags including correlation info tags = [f"MISP-{attr}", "threat-intel"] if self.include_community: tags.append("community-intel") + if len(correlating_events) > 1: + tags.append("multi-event-correlation") event.add_comment(msg) event.add_tags(tags) @@ -235,6 +275,9 @@ class MispAnalyzer(interface.BaseAnalyzer): self.stats['events_marked'] += 1 + # Log detailed correlation info for debugging + logger.info(f"Marked event with {len(correlating_events)} correlating MISP events: {', '.join(correlating_events[:3])}") + except Exception as e: logger.error(f"Error marking event: {e}") @@ -252,7 +295,7 @@ class MispAnalyzer(interface.BaseAnalyzer): indicators = [] - # Extract based on attribute type + # Extract based on attribute type - SAME AS ORIGINAL if attr.startswith("ip-") and timesketch_attr == "message": ip_matches = self.ip_pattern.findall(str(loc)) indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] @@ -307,21 +350,38 @@ class MispAnalyzer(interface.BaseAnalyzer): if result: self.result_dict[f"{attr}:{indicator}"] = result - # Track community vs own org hits + # Track correlation statistics + self.stats['total_correlations'] += len(result) + unique_events = set() + for res in result: + event_id = res.get("Event", {}).get("id") + if event_id: + unique_events.add(event_id) + + if len(unique_events) > 1: + self.stats['multi_event_correlations'] += 1 + + # Track community vs own org hits with correlation details orgs = set() + events = set() for res in result: org = res.get("Event", {}).get("Orgc", {}).get("name", "Unknown") + event_info = res.get("Event", {}).get("info", "Unknown")[:50] orgs.add(org) + events.add(event_info) if len(orgs) > 1 or any(org not in ["Unknown", "Your Organization"] for org in orgs): self.stats['community_hits'] += 1 - logger.info(f"Community MISP hit: {indicator} ({len(result)} matches from {', '.join(list(orgs)[:3])})") + logger.info(f"Community MISP hit: {indicator} | {len(result)} correlations | " + f"Events: {', '.join(list(events)[:2])} | Sources: {', '.join(list(orgs)[:3])}") else: self.stats['own_org_hits'] += 1 - logger.info(f"Own org MISP hit: {indicator} ({len(result)} matches)") + logger.info(f"Own org MISP hit: {indicator} | {len(result)} correlations | " + f"Events: {', '.join(list(events)[:2])}") self.processed_indicators.add(indicator) + # Small delay to be nice to MISP server time.sleep(self.request_delay) # Mark events that have matches @@ -333,9 +393,10 @@ class MispAnalyzer(interface.BaseAnalyzer): return "Community search disabled" try: + # Test with a known community indicator (if available) test_payload = { "returnFormat": "json", - "distribution": [1, 2, 3], + "distribution": [1, 2, 3], # Community levels only "limit": 1, "enforceWarninglist": False, } @@ -423,6 +484,16 @@ class MispAnalyzer(interface.BaseAnalyzer): analyzer_name=self.NAME, query_string='tag:"MISP-*" OR tag:"threat-intel"', ) + + # Create additional view for multi-event correlations + correlation_count = sum(1 for key, results in self.result_dict.items() + if results and len(results) > 1) + if correlation_count > 0: + self.sketch.add_view( + view_name="MISP Multi-Event Correlations", + analyzer_name=self.NAME, + query_string='tag:"multi-event-correlation"', + ) def run(self): """Entry point for the analyzer.""" @@ -443,7 +514,7 @@ class MispAnalyzer(interface.BaseAnalyzer): success_rate = ((self.stats['api_calls'] - self.stats['api_timeouts']) / max(1, self.stats['api_calls']) * 100) - # Enhanced results with community statistics + # Enhanced results with community and correlation statistics result = (f"[{self._timesketch_attr}] MISP Analysis Complete: " f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | " f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | ") @@ -451,7 +522,11 @@ class MispAnalyzer(interface.BaseAnalyzer): if self.include_community: result += f"Community hits: {self.stats['community_hits']}, Own org: {self.stats['own_org_hits']} | " - result += f"{elapsed:.0f}s" + result += f"Total correlations: {self.stats['total_correlations']}" + if self.stats['multi_event_correlations'] > 0: + result += f", Multi-event: {self.stats['multi_event_correlations']}" + + result += f" | {elapsed:.0f}s" logger.info(result) return result