From 40cd61918096eb57bc270873c13b0a0840e4b080 Mon Sep 17 00:00:00 2001 From: overcuriousity Date: Wed, 30 Jul 2025 22:01:21 +0200 Subject: [PATCH] commit --- misp_analyzer.py | 19 +++---- testscript.py | 126 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 11 deletions(-) create mode 100644 testscript.py diff --git a/misp_analyzer.py b/misp_analyzer.py index 090197c..163b1f6 100644 --- a/misp_analyzer.py +++ b/misp_analyzer.py @@ -124,7 +124,6 @@ class MispAnalyzer(interface.BaseAnalyzer): return [] try: - # For IP searches, query both ip-src and ip-dst search_types = [] if attr.startswith("ip-"): search_types = ["ip-src", "ip-dst"] @@ -134,6 +133,7 @@ class MispAnalyzer(interface.BaseAnalyzer): all_results = [] for search_type in search_types: + payload = { "returnFormat": "json", "value": value, @@ -169,20 +169,17 @@ class MispAnalyzer(interface.BaseAnalyzer): if response.status_code == 200: data = response.json() attributes = data.get("response", {}).get("Attribute", []) + for i, attr_data in enumerate(attributes): + event_data = attr_data.get("Event", {}) all_results.extend(attributes) - else: - logger.debug(f"MISP API returned status {response.status_code} for {value} ({search_type})") - # Small delay between search types time.sleep(0.1) - if all_results and self.include_community: - orgs = set() - for attr_data in all_results: - org = attr_data.get("Event", {}).get("Orgc", {}).get("name", "Unknown") - orgs.add(org) - if len(orgs) > 1 or (orgs and list(orgs)[0] not in ["Unknown", "Your Org"]): - logger.info(f"Community hit for {value}: {len(all_results)} matches from {', '.join(list(orgs)[:3])}") + unique_event_ids = set() + for attr_data in all_results: + event_id = attr_data.get("Event", {}).get("id") + if event_id: + unique_event_ids.add(event_id) return all_results diff --git a/testscript.py b/testscript.py new file mode 100644 index 0000000..00c8b1a --- /dev/null +++ b/testscript.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +""" +MISP Debug Test Script +Run this standalone to test exactly what the script sees from MISP API +""" + +import requests +import json + +# Configuration +MISP_URL = "https://misp.cc24.dev" +MISP_API_KEY = "QUcv9s0M7nl10eIIxbEJchZyJpO4oZeu9i3sPOlZ" +TEST_IP = "196.251.115.108" + +def test_misp_queries(): + """Test both ip-src and ip-dst queries like the script does.""" + + print(f"Testing MISP queries for {TEST_IP}") + print("=" * 60) + + all_results = [] + search_types = ["ip-src", "ip-dst"] + + for search_type in search_types: + print(f"\nšŸ” Searching for {search_type}...") + + payload = { + "returnFormat": "json", + "value": TEST_IP, + "type": search_type, + "enforceWarninglist": False, + "includeEventTags": True, + "includeContext": True, + "distribution": [0, 1, 2, 3, 5], + "includeEventUuid": True, + "includeCorrelations": True, + "includeDecayScore": False, + "includeFullModel": False, + } + + try: + response = requests.post( + f"{MISP_URL}/attributes/restSearch/", + json=payload, + headers={"Authorization": MISP_API_KEY}, + verify=False, + timeout=45, + ) + + if response.status_code == 200: + data = response.json() + attributes = data.get("response", {}).get("Attribute", []) + print(f" āœ… Found {len(attributes)} attributes") + + for i, attr in enumerate(attributes): + event = attr.get("Event", {}) + event_id = event.get("id") + event_info = event.get("info", "Unknown") + attr_type = attr.get("type") + print(f" [{i+1}] Event {event_id}: {event_info[:60]}... (type: {attr_type})") + + all_results.extend(attributes) + else: + print(f" āŒ API returned status {response.status_code}") + + except Exception as e: + print(f" āŒ Error: {e}") + + print(f"\nšŸ“Š COMBINED RESULTS:") + print(f" Total attributes: {len(all_results)}") + + # Test deduplication logic (like the script does) + unique_events = {} + for res in all_results: + event_info = res.get("Event", {}) + event_id = event_info.get("id") + event_desc = event_info.get("info", "Unknown") + + if event_id and event_id not in unique_events: + unique_events[event_id] = { + 'description': event_desc, + 'url': f"{MISP_URL}/events/view/{event_id}" + } + + unique_event_list = list(unique_events.values()) + print(f" Unique events after dedup: {len(unique_event_list)}") + + print(f"\nšŸŽÆ FINAL EVENT LIST:") + for i, event_data in enumerate(unique_event_list): + short_desc = event_data['description'][:50] + "..." if len(event_data['description']) > 50 else event_data['description'] + print(f" [{i+1}] {short_desc}") + print(f" URL: {event_data['url']}") + + # Test the message building logic + print(f"\nšŸ’¬ SIMULATED MESSAGE:") + if len(unique_event_list) == 1: + event_data = unique_event_list[0] + short_desc = event_data['description'][:50] + "..." if len(event_data['description']) > 50 else event_data['description'] + msg = f"MISP: Malicious IP | Event: {short_desc} | Link: {event_data['url']}" + elif len(unique_event_list) > 1: + msg = f"MISP: Malicious IP | {len(unique_event_list)} Events:" + for i, event_data in enumerate(unique_event_list[:2]): + short_desc = event_data['description'][:40] + "..." if len(event_data['description']) > 40 else event_data['description'] + msg += f" [{i+1}] {short_desc} ({event_data['url']})" + if i < len(unique_event_list) - 1 and i < 1: + msg += " |" + + if len(unique_event_list) > 2: + msg += f" | +{len(unique_event_list)-2} more events" + + msg += f" | {len(all_results)} total correlations" + + print(f" {msg}") + + print(f"\nšŸ” EVENT ID CHECK:") + event_ids = [str(event_info.get("Event", {}).get("id", "")) for event_info in all_results] + unique_ids = list(set(event_ids)) + print(f" All event IDs found: {sorted(event_ids)}") + print(f" Unique event IDs: {sorted(unique_ids)}") + print(f" Event 5 present: {'5' in unique_ids}") + print(f" Event 4425 present: {'4425' in unique_ids}") + print(f" Event 4427 present: {'4427' in unique_ids}") + print(f" Event 4430 present: {'4430' in unique_ids}") + +if __name__ == "__main__": + test_misp_queries()