This commit is contained in:
overcuriousity 2025-07-30 21:39:11 +02:00
parent f342760ff0
commit dfa3a9fc53

View File

@ -32,19 +32,16 @@ class MispAnalyzer(interface.BaseAnalyzer):
self._attr = kwargs.get("attr") self._attr = kwargs.get("attr")
self._timesketch_attr = kwargs.get("timesketch_attr") self._timesketch_attr = kwargs.get("timesketch_attr")
# Simple configuration for reliability self.include_community = kwargs.get("include_community", False)
self.include_community = kwargs.get("include_community", False) # Default to false for reliability self.chunk_size = kwargs.get("chunk_size", 1000)
self.chunk_size = kwargs.get("chunk_size", 1000) # Process in chunks self.max_retries = kwargs.get("max_retries", 2)
self.max_retries = kwargs.get("max_retries", 2) # Minimal retries self.request_delay = kwargs.get("request_delay", 0.5)
self.request_delay = kwargs.get("request_delay", 0.5) # Small delay between requests
self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b') self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
# Track processed items
self.processed_indicators = set() self.processed_indicators = set()
self.failed_indicators = set() self.failed_indicators = set()
# Simple stats
self.stats = { self.stats = {
'events_processed': 0, 'events_processed': 0,
'indicators_found': 0, 'indicators_found': 0,
@ -59,60 +56,47 @@ class MispAnalyzer(interface.BaseAnalyzer):
@staticmethod @staticmethod
def get_kwargs(): def get_kwargs():
"""Get kwargs for the analyzer - keeping original working structure.""" """Get kwargs for the analyzer."""
to_query = [ to_query = [
{ {
"query_string": "md5_hash:*", "query_string": "md5_hash:*",
"attr": "md5", "attr": "md5",
"timesketch_attr": "md5_hash", "timesketch_attr": "md5_hash",
"include_community": False, # Start with own org only "include_community": True,
}, },
{ {
"query_string": "sha1_hash:*", "query_string": "sha1_hash:*",
"attr": "sha1", "attr": "sha1",
"timesketch_attr": "sha1_hash", "timesketch_attr": "sha1_hash",
"include_community": False, "include_community": True,
}, },
{ {
"query_string": "sha256_hash:*", "query_string": "sha256_hash:*",
"attr": "sha256", "attr": "sha256",
"timesketch_attr": "sha256_hash", "timesketch_attr": "sha256_hash",
"include_community": False, "include_community": True,
}, },
{ {
"query_string": "filename:*", "query_string": "filename:*",
"attr": "filename", "attr": "filename",
"timesketch_attr": "filename", "timesketch_attr": "filename",
"include_community": False, "include_community": True,
},
{
"query_string": "message:*",
"attr": "ip-src",
"timesketch_attr": "message",
"include_community": False,
},
{
"query_string": "message:*",
"attr": "ip-dst",
"timesketch_attr": "message",
"include_community": False,
}, },
{ {
"query_string": "source_ip:*", "query_string": "source_ip:*",
"attr": "ip-src", "attr": "ip-src",
"timesketch_attr": "source_ip", "timesketch_attr": "source_ip",
"include_community": False, "include_community": True,
}, },
] ]
return to_query return to_query
def _is_valid_ip(self, ip_str): def _is_valid_ip(self, ip_str):
"""Simple IP validation - keeping original working version.""" """Simple IP validation."""
try: try:
import ipaddress import ipaddress
ip_str = ip_str.strip() ip_str = ip_str.strip()
ipaddress.ip_address(ip_str) ipaddress.ip_address(ip_str)
# Filter out invalid ranges
if ip_str.startswith(('0.', '127.', '255.255.255.255', '10.', '192.168.', '172.')): if ip_str.startswith(('0.', '127.', '255.255.255.255', '10.', '192.168.', '172.')):
return False return False
return True return True
@ -120,7 +104,7 @@ class MispAnalyzer(interface.BaseAnalyzer):
return False return False
def _is_valid_hash(self, hash_str, hash_type): def _is_valid_hash(self, hash_str, hash_type):
"""Simple hash validation - keeping original working version.""" """Simple hash validation."""
if not hash_str: if not hash_str:
return False return False
hash_str = hash_str.strip().lower() hash_str = hash_str.strip().lower()
@ -135,33 +119,31 @@ class MispAnalyzer(interface.BaseAnalyzer):
return False return False
def query_misp_single(self, value, attr, retry_count=0): def query_misp_single(self, value, attr, retry_count=0):
"""Query MISP for a single value - enhanced with community search.""" """Query MISP for a single value."""
if value in self.failed_indicators: if value in self.failed_indicators:
return [] return []
try: try:
# Build enhanced payload for community search
payload = { payload = {
"returnFormat": "json", "returnFormat": "json",
"value": value, "value": value,
"type": attr, "type": attr,
"enforceWarninglist": False, # Don't filter known-good indicators "enforceWarninglist": False,
"includeEventTags": True, # Include event tags for context "includeEventTags": True,
"includeContext": True, # Include context information "includeContext": True,
} }
# Enhanced community search - include ALL distribution levels
if self.include_community: if self.include_community:
payload.update({ payload.update({
"distribution": [0, 1, 2, 3, 5], # Own, community, connected, all, inherit "distribution": [0, 1, 2, 3, 5],
"includeEventUuid": True, # Include event UUIDs "includeEventUuid": True,
"includeCorrelations": True, # Include correlations "includeCorrelations": True,
"includeDecayScore": False, # Skip decay for speed "includeDecayScore": False,
"includeFullModel": False, # Skip full model for speed "includeFullModel": False,
}) })
logger.debug(f"Community search enabled for {value} ({attr})") logger.debug(f"Community search enabled for {value} ({attr})")
else: else:
payload["distribution"] = [0] # Own org only payload["distribution"] = [0]
logger.debug(f"Own org search only for {value} ({attr})") logger.debug(f"Own org search only for {value} ({attr})")
self.stats['api_calls'] += 1 self.stats['api_calls'] += 1
@ -181,7 +163,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
data = response.json() data = response.json()
attributes = data.get("response", {}).get("Attribute", []) attributes = data.get("response", {}).get("Attribute", [])
# Log community sources for debugging
if attributes and self.include_community: if attributes and self.include_community:
orgs = set() orgs = set()
for attr_data in attributes: for attr_data in attributes:
@ -196,7 +177,7 @@ class MispAnalyzer(interface.BaseAnalyzer):
self.stats['api_timeouts'] += 1 self.stats['api_timeouts'] += 1
if retry_count < self.max_retries: if retry_count < self.max_retries:
wait_time = (retry_count + 1) * 2 # Simple backoff: 2s, 4s wait_time = (retry_count + 1) * 2
logger.warning(f"Timeout for {value}, retrying in {wait_time}s (attempt {retry_count + 1})") logger.warning(f"Timeout for {value}, retrying in {wait_time}s (attempt {retry_count + 1})")
time.sleep(wait_time) time.sleep(wait_time)
return self.query_misp_single(value, attr, retry_count + 1) return self.query_misp_single(value, attr, retry_count + 1)
@ -210,17 +191,17 @@ class MispAnalyzer(interface.BaseAnalyzer):
return [] return []
def mark_event(self, event, result, attr): def mark_event(self, event, result, attr):
"""Mark event with MISP intelligence - enhanced with correlating events.""" """Mark event with MISP intelligence including event links."""
try: try:
if attr.startswith("ip-"): if attr.startswith("ip-"):
msg = "MISP: Malicious IP" msg = "MISP: Malicious IP"
else: else:
msg = "MISP: Known indicator" msg = "MISP: Known indicator"
# Collect unique correlating events and organizations
correlating_events = [] correlating_events = []
orgs = set() orgs = set()
event_ids = set() event_ids = set()
event_links = []
for res in result: for res in result:
event_info = res.get("Event", {}) event_info = res.get("Event", {})
@ -228,28 +209,24 @@ class MispAnalyzer(interface.BaseAnalyzer):
event_desc = event_info.get("info", "Unknown") event_desc = event_info.get("info", "Unknown")
org_name = event_info.get("Orgc", {}).get("name", "Unknown") org_name = event_info.get("Orgc", {}).get("name", "Unknown")
# Avoid duplicate events (same ID)
if event_id and event_id not in event_ids: if event_id and event_id not in event_ids:
event_ids.add(event_id) event_ids.add(event_id)
# Truncate long descriptions
short_desc = event_desc[:60] + "..." if len(event_desc) > 60 else event_desc short_desc = event_desc[:60] + "..." if len(event_desc) > 60 else event_desc
correlating_events.append(short_desc) correlating_events.append(short_desc)
event_links.append(f"{self.misp_url}/events/view/{event_id}")
if org_name != "Unknown": if org_name != "Unknown":
orgs.add(org_name) orgs.add(org_name)
# Build comprehensive message with correlating events
if len(correlating_events) == 1: if len(correlating_events) == 1:
msg += f" | Event: {correlating_events[0]}" msg += f" | Event: {correlating_events[0]}"
elif len(correlating_events) > 1: elif len(correlating_events) > 1:
# Show first 2 correlating events, then count
msg += f" | Events: {correlating_events[0]}" msg += f" | Events: {correlating_events[0]}"
if len(correlating_events) > 1: if len(correlating_events) > 1:
msg += f" + {correlating_events[1]}" msg += f" + {correlating_events[1]}"
if len(correlating_events) > 2: if len(correlating_events) > 2:
msg += f" + {len(correlating_events)-2} more" msg += f" + {len(correlating_events)-2} more"
# Add organization information for community awareness
if self.include_community and orgs: if self.include_community and orgs:
if len(orgs) > 1: if len(orgs) > 1:
msg += f" | Sources: {', '.join(list(orgs)[:2])}" msg += f" | Sources: {', '.join(list(orgs)[:2])}"
@ -258,11 +235,19 @@ class MispAnalyzer(interface.BaseAnalyzer):
else: else:
msg += f" | Source: {list(orgs)[0]}" msg += f" | Source: {list(orgs)[0]}"
# Add correlation count for context
if len(result) > 1: if len(result) > 1:
msg += f" | {len(result)} correlations" msg += f" | {len(result)} correlations"
# Add appropriate tags including correlation info if event_links:
if len(event_links) == 1:
msg += f" | MISP Event: {event_links[0]}"
else:
msg += f" | MISP Events: {event_links[0]}"
if len(event_links) > 1:
msg += f", {event_links[1]}"
if len(event_links) > 2:
msg += f" +{len(event_links)-2} more"
tags = [f"MISP-{attr}", "threat-intel"] tags = [f"MISP-{attr}", "threat-intel"]
if self.include_community: if self.include_community:
tags.append("community-intel") tags.append("community-intel")
@ -275,7 +260,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
self.stats['events_marked'] += 1 self.stats['events_marked'] += 1
# Log detailed correlation info for debugging
logger.info(f"Marked event with {len(correlating_events)} correlating MISP events: {', '.join(correlating_events[:3])}") logger.info(f"Marked event with {len(correlating_events)} correlating MISP events: {', '.join(correlating_events[:3])}")
except Exception as e: except Exception as e:
@ -295,7 +279,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
indicators = [] indicators = []
# Extract based on attribute type - SAME AS ORIGINAL
if attr.startswith("ip-") and timesketch_attr == "message": if attr.startswith("ip-") and timesketch_attr == "message":
ip_matches = self.ip_pattern.findall(str(loc)) ip_matches = self.ip_pattern.findall(str(loc))
indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)] indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]
@ -313,7 +296,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
if filename and len(filename) > 1: if filename and len(filename) > 1:
indicators = [filename] indicators = [filename]
# Store event with its indicators
if indicators: if indicators:
events_with_indicators.append((event, indicators)) events_with_indicators.append((event, indicators))
chunk_indicators.extend(indicators) chunk_indicators.extend(indicators)
@ -329,19 +311,16 @@ class MispAnalyzer(interface.BaseAnalyzer):
if not chunk_indicators: if not chunk_indicators:
return return
# Get unique new indicators
unique_indicators = list(dict.fromkeys(chunk_indicators)) unique_indicators = list(dict.fromkeys(chunk_indicators))
new_indicators = [ind for ind in unique_indicators if ind not in self.processed_indicators] new_indicators = [ind for ind in unique_indicators if ind not in self.processed_indicators]
if not new_indicators: if not new_indicators:
# Still need to check existing cache for matches
self.check_existing_matches(events_with_indicators, attr) self.check_existing_matches(events_with_indicators, attr)
return return
logger.info(f"Processing {len(new_indicators)} new {attr} indicators from {len(events_chunk)} events") logger.info(f"Processing {len(new_indicators)} new {attr} indicators from {len(events_chunk)} events")
self.stats['indicators_found'] += len(new_indicators) self.stats['indicators_found'] += len(new_indicators)
# Query MISP for each new indicator
for indicator in new_indicators: for indicator in new_indicators:
if indicator in self.failed_indicators: if indicator in self.failed_indicators:
continue continue
@ -350,7 +329,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
if result: if result:
self.result_dict[f"{attr}:{indicator}"] = result self.result_dict[f"{attr}:{indicator}"] = result
# Track correlation statistics
self.stats['total_correlations'] += len(result) self.stats['total_correlations'] += len(result)
unique_events = set() unique_events = set()
for res in result: for res in result:
@ -361,7 +339,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
if len(unique_events) > 1: if len(unique_events) > 1:
self.stats['multi_event_correlations'] += 1 self.stats['multi_event_correlations'] += 1
# Track community vs own org hits with correlation details
orgs = set() orgs = set()
events = set() events = set()
for res in result: for res in result:
@ -381,10 +358,8 @@ class MispAnalyzer(interface.BaseAnalyzer):
self.processed_indicators.add(indicator) self.processed_indicators.add(indicator)
# Small delay to be nice to MISP server
time.sleep(self.request_delay) time.sleep(self.request_delay)
# Mark events that have matches
self.check_existing_matches(events_with_indicators, attr) self.check_existing_matches(events_with_indicators, attr)
def test_community_connectivity(self): def test_community_connectivity(self):
@ -393,10 +368,9 @@ class MispAnalyzer(interface.BaseAnalyzer):
return "Community search disabled" return "Community search disabled"
try: try:
# Test with a known community indicator (if available)
test_payload = { test_payload = {
"returnFormat": "json", "returnFormat": "json",
"distribution": [1, 2, 3], # Community levels only "distribution": [1, 2, 3],
"limit": 1, "limit": 1,
"enforceWarninglist": False, "enforceWarninglist": False,
} }
@ -429,19 +403,17 @@ class MispAnalyzer(interface.BaseAnalyzer):
def check_existing_matches(self, events_with_indicators, attr): def check_existing_matches(self, events_with_indicators, attr):
"""Check events against existing MISP results.""" """Check events against existing MISP results."""
for event, indicators in events_with_indicators: for event, indicators in events_with_indicators:
# Check if any indicator has MISP match
for indicator in indicators: for indicator in indicators:
key = f"{attr}:{indicator}" key = f"{attr}:{indicator}"
if key in self.result_dict and self.result_dict[key]: if key in self.result_dict and self.result_dict[key]:
self.mark_event(event, self.result_dict[key], attr) self.mark_event(event, self.result_dict[key], attr)
break # Only mark once per event break
def query_misp(self, query, attr, timesketch_attr): def query_misp(self, query, attr, timesketch_attr):
"""Process events in chunks - enhanced for large datasets.""" """Process events in chunks."""
logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}") logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}")
logger.info(f"Community search: {'enabled' if self.include_community else 'disabled'}") logger.info(f"Community search: {'enabled' if self.include_community else 'disabled'}")
# Get event stream
events_stream = self.event_stream( events_stream = self.event_stream(
query_string=query, query_string=query,
return_fields=[timesketch_attr, '_id'] return_fields=[timesketch_attr, '_id']
@ -453,19 +425,16 @@ class MispAnalyzer(interface.BaseAnalyzer):
for event in events_stream: for event in events_stream:
current_chunk.append(event) current_chunk.append(event)
# Process when chunk is full
if len(current_chunk) >= self.chunk_size: if len(current_chunk) >= self.chunk_size:
self.process_chunk(current_chunk, attr, timesketch_attr) self.process_chunk(current_chunk, attr, timesketch_attr)
current_chunk = [] current_chunk = []
# Progress logging
if self.stats['events_processed'] % 10000 == 0: if self.stats['events_processed'] % 10000 == 0:
logger.info(f"Progress: {self.stats['events_processed']} events processed, " logger.info(f"Progress: {self.stats['events_processed']} events processed, "
f"{self.stats['events_marked']} marked, " f"{self.stats['events_marked']} marked, "
f"{self.stats['api_calls']} API calls, " f"{self.stats['api_calls']} API calls, "
f"{self.stats['api_timeouts']} timeouts") f"{self.stats['api_timeouts']} timeouts")
# Process final chunk
if current_chunk: if current_chunk:
self.process_chunk(current_chunk, attr, timesketch_attr) self.process_chunk(current_chunk, attr, timesketch_attr)
@ -473,7 +442,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
logger.error(f"Error during chunk processing: {e}") logger.error(f"Error during chunk processing: {e}")
raise raise
# Create view if we found matches
if self.stats['events_marked'] > 0: if self.stats['events_marked'] > 0:
view_name = "MISP Threat Intelligence" view_name = "MISP Threat Intelligence"
if self.include_community: if self.include_community:
@ -485,7 +453,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
query_string='tag:"MISP-*" OR tag:"threat-intel"', query_string='tag:"MISP-*" OR tag:"threat-intel"',
) )
# Create additional view for multi-event correlations
correlation_count = sum(1 for key, results in self.result_dict.items() correlation_count = sum(1 for key, results in self.result_dict.items()
if results and len(results) > 1) if results and len(results) > 1)
if correlation_count > 0: if correlation_count > 0:
@ -502,7 +469,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
start_time = time.time() start_time = time.time()
# Test community connectivity if enabled
if self.include_community: if self.include_community:
community_status = self.test_community_connectivity() community_status = self.test_community_connectivity()
logger.info(f"Community connectivity test: {community_status}") logger.info(f"Community connectivity test: {community_status}")
@ -514,7 +480,6 @@ class MispAnalyzer(interface.BaseAnalyzer):
success_rate = ((self.stats['api_calls'] - self.stats['api_timeouts']) / success_rate = ((self.stats['api_calls'] - self.stats['api_timeouts']) /
max(1, self.stats['api_calls']) * 100) max(1, self.stats['api_calls']) * 100)
# Enhanced results with community and correlation statistics
result = (f"[{self._timesketch_attr}] MISP Analysis Complete: " result = (f"[{self._timesketch_attr}] MISP Analysis Complete: "
f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | " f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | "
f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | ") f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | ")