This commit is contained in:
overcuriousity 2025-07-30 22:01:21 +02:00
parent e97f8d3a70
commit 40cd619180
2 changed files with 134 additions and 11 deletions

View File

@ -124,7 +124,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
return [] return []
try: try:
# For IP searches, query both ip-src and ip-dst
search_types = [] search_types = []
if attr.startswith("ip-"): if attr.startswith("ip-"):
search_types = ["ip-src", "ip-dst"] search_types = ["ip-src", "ip-dst"]
@ -134,6 +133,7 @@ class MispAnalyzer(interface.BaseAnalyzer):
all_results = [] all_results = []
for search_type in search_types: for search_type in search_types:
payload = { payload = {
"returnFormat": "json", "returnFormat": "json",
"value": value, "value": value,
@ -169,20 +169,17 @@ class MispAnalyzer(interface.BaseAnalyzer):
if response.status_code == 200: if response.status_code == 200:
data = response.json() data = response.json()
attributes = data.get("response", {}).get("Attribute", []) attributes = data.get("response", {}).get("Attribute", [])
for i, attr_data in enumerate(attributes):
event_data = attr_data.get("Event", {})
all_results.extend(attributes) all_results.extend(attributes)
else:
logger.debug(f"MISP API returned status {response.status_code} for {value} ({search_type})")
# Small delay between search types
time.sleep(0.1) time.sleep(0.1)
if all_results and self.include_community: unique_event_ids = set()
orgs = set() for attr_data in all_results:
for attr_data in all_results: event_id = attr_data.get("Event", {}).get("id")
org = attr_data.get("Event", {}).get("Orgc", {}).get("name", "Unknown") if event_id:
orgs.add(org) unique_event_ids.add(event_id)
if len(orgs) > 1 or (orgs and list(orgs)[0] not in ["Unknown", "Your Org"]):
logger.info(f"Community hit for {value}: {len(all_results)} matches from {', '.join(list(orgs)[:3])}")
return all_results return all_results

126
testscript.py Normal file
View File

@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""
MISP Debug Test Script
Run this standalone to test exactly what the script sees from MISP API
"""
import requests
import json
# Configuration
MISP_URL = "https://misp.cc24.dev"
MISP_API_KEY = "QUcv9s0M7nl10eIIxbEJchZyJpO4oZeu9i3sPOlZ"
TEST_IP = "196.251.115.108"
def test_misp_queries():
"""Test both ip-src and ip-dst queries like the script does."""
print(f"Testing MISP queries for {TEST_IP}")
print("=" * 60)
all_results = []
search_types = ["ip-src", "ip-dst"]
for search_type in search_types:
print(f"\n🔍 Searching for {search_type}...")
payload = {
"returnFormat": "json",
"value": TEST_IP,
"type": search_type,
"enforceWarninglist": False,
"includeEventTags": True,
"includeContext": True,
"distribution": [0, 1, 2, 3, 5],
"includeEventUuid": True,
"includeCorrelations": True,
"includeDecayScore": False,
"includeFullModel": False,
}
try:
response = requests.post(
f"{MISP_URL}/attributes/restSearch/",
json=payload,
headers={"Authorization": MISP_API_KEY},
verify=False,
timeout=45,
)
if response.status_code == 200:
data = response.json()
attributes = data.get("response", {}).get("Attribute", [])
print(f" ✅ Found {len(attributes)} attributes")
for i, attr in enumerate(attributes):
event = attr.get("Event", {})
event_id = event.get("id")
event_info = event.get("info", "Unknown")
attr_type = attr.get("type")
print(f" [{i+1}] Event {event_id}: {event_info[:60]}... (type: {attr_type})")
all_results.extend(attributes)
else:
print(f" ❌ API returned status {response.status_code}")
except Exception as e:
print(f" ❌ Error: {e}")
print(f"\n📊 COMBINED RESULTS:")
print(f" Total attributes: {len(all_results)}")
# Test deduplication logic (like the script does)
unique_events = {}
for res in all_results:
event_info = res.get("Event", {})
event_id = event_info.get("id")
event_desc = event_info.get("info", "Unknown")
if event_id and event_id not in unique_events:
unique_events[event_id] = {
'description': event_desc,
'url': f"{MISP_URL}/events/view/{event_id}"
}
unique_event_list = list(unique_events.values())
print(f" Unique events after dedup: {len(unique_event_list)}")
print(f"\n🎯 FINAL EVENT LIST:")
for i, event_data in enumerate(unique_event_list):
short_desc = event_data['description'][:50] + "..." if len(event_data['description']) > 50 else event_data['description']
print(f" [{i+1}] {short_desc}")
print(f" URL: {event_data['url']}")
# Test the message building logic
print(f"\n💬 SIMULATED MESSAGE:")
if len(unique_event_list) == 1:
event_data = unique_event_list[0]
short_desc = event_data['description'][:50] + "..." if len(event_data['description']) > 50 else event_data['description']
msg = f"MISP: Malicious IP | Event: {short_desc} | Link: {event_data['url']}"
elif len(unique_event_list) > 1:
msg = f"MISP: Malicious IP | {len(unique_event_list)} Events:"
for i, event_data in enumerate(unique_event_list[:2]):
short_desc = event_data['description'][:40] + "..." if len(event_data['description']) > 40 else event_data['description']
msg += f" [{i+1}] {short_desc} ({event_data['url']})"
if i < len(unique_event_list) - 1 and i < 1:
msg += " |"
if len(unique_event_list) > 2:
msg += f" | +{len(unique_event_list)-2} more events"
msg += f" | {len(all_results)} total correlations"
print(f" {msg}")
print(f"\n🔍 EVENT ID CHECK:")
event_ids = [str(event_info.get("Event", {}).get("id", "")) for event_info in all_results]
unique_ids = list(set(event_ids))
print(f" All event IDs found: {sorted(event_ids)}")
print(f" Unique event IDs: {sorted(unique_ids)}")
print(f" Event 5 present: {'5' in unique_ids}")
print(f" Event 4425 present: {'4425' in unique_ids}")
print(f" Event 4427 present: {'4427' in unique_ids}")
print(f" Event 4430 present: {'4430' in unique_ids}")
if __name__ == "__main__":
test_misp_queries()