"""Index analyzer plugin for MISP - Simple and reliable for large-scale processing."""

import ipaddress
import logging
import ntpath
import re
import time

import requests
from flask import current_app

from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager

logger = logging.getLogger("timesketch.analyzers.misp")
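
# Configuration sketch -- illustrative values only. The analyzer reads these
# two keys from the Flask app config (e.g. timesketch.conf):
#   MISP_URL = "https://misp.example.org"
#   MISP_API_KEY = "your-misp-auth-key"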


class MispAnalyzer(interface.BaseAnalyzer):
    """Simple, reliable MISP analyzer for large-scale processing."""

    NAME = "misp_analyzer"
    DISPLAY_NAME = "MISP"
    DESCRIPTION = "Mark events using MISP - simple and reliable"

    def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs):
        """Initialize the analyzer."""
        super().__init__(index_name, sketch_id, timeline_id=timeline_id)
        self.misp_url = current_app.config.get("MISP_URL")
        self.misp_api_key = current_app.config.get("MISP_API_KEY")
        self.total_event_counter = 0
        self.result_dict = {}
        self._query_string = kwargs.get("query_string")
        self._attr = kwargs.get("attr")
        self._timesketch_attr = kwargs.get("timesketch_attr")

        # Simple configuration, tuned for reliability.
        self.include_community = kwargs.get("include_community", False)  # Default to own org only.
        self.chunk_size = kwargs.get("chunk_size", 1000)  # Number of events per chunk.
        self.max_retries = kwargs.get("max_retries", 2)  # Minimal retries on timeout.
        self.request_delay = kwargs.get("request_delay", 0.5)  # Seconds between MISP requests.

        # Rough IPv4 pattern; candidates are validated with _is_valid_ip().
        self.ip_pattern = re.compile(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b")

        # Track processed and failed indicators so each is queried at most once.
        self.processed_indicators = set()
        self.failed_indicators = set()

        # Simple statistics, reported when the run completes.
        self.stats = {
            "events_processed": 0,
            "indicators_found": 0,
            "api_calls": 0,
            "api_timeouts": 0,
            "events_marked": 0,
            "community_hits": 0,
            "own_org_hits": 0,
            "total_correlations": 0,
            "multi_event_correlations": 0,
        }

    @staticmethod
    def get_kwargs():
        """Get kwargs for the analyzer, keeping the original working structure."""
        to_query = [
            {
                "query_string": "md5_hash:*",
                "attr": "md5",
                "timesketch_attr": "md5_hash",
                "include_community": False,  # Start with own org only.
            },
            {
                "query_string": "sha1_hash:*",
                "attr": "sha1",
                "timesketch_attr": "sha1_hash",
                "include_community": False,
            },
            {
                "query_string": "sha256_hash:*",
                "attr": "sha256",
                "timesketch_attr": "sha256_hash",
                "include_community": False,
            },
            {
                "query_string": "filename:*",
                "attr": "filename",
                "timesketch_attr": "filename",
                "include_community": False,
            },
            {
                "query_string": "message:*",
                "attr": "ip-src",
                "timesketch_attr": "message",
                "include_community": False,
            },
            {
                "query_string": "message:*",
                "attr": "ip-dst",
                "timesketch_attr": "message",
                "include_community": False,
            },
            {
                "query_string": "source_ip:*",
                "attr": "ip-src",
                "timesketch_attr": "source_ip",
                "include_community": False,
            },
        ]
        return to_query
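
    # Note: Timesketch runs the analyzer once per kwargs dict returned by
    # get_kwargs(), so each attribute/field pairing above is processed as an
    # independent analysis pass.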

    def _is_valid_ip(self, ip_str):
        """Validate an IP address and filter out non-routable ranges."""
        try:
            ip = ipaddress.ip_address(ip_str.strip())
        except (ValueError, AttributeError):
            return False
        # Skip private, loopback, link-local, multicast, reserved and
        # unspecified addresses: they are noise for threat-intel lookups.
        return not (
            ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            or ip.is_multicast
            or ip.is_reserved
            or ip.is_unspecified
        )

    def _is_valid_hash(self, hash_str, hash_type):
        """Validate a hex digest of the expected length for the hash type."""
        if not hash_str:
            return False
        hash_str = hash_str.strip().lower()
        expected_length = {"md5": 32, "sha1": 40, "sha256": 64}.get(hash_type)
        if expected_length is None or len(hash_str) != expected_length:
            return False
        return all(c in "0123456789abcdef" for c in hash_str)
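
    # Illustrative checks (the first value is the MD5 of the empty string):
    #   _is_valid_hash("d41d8cd98f00b204e9800998ecf8427e", "md5")  -> True
    #   _is_valid_hash("d41d8cd98f00b204e9800998ecf8427e", "sha1") -> False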

    def query_misp_single(self, value, attr, retry_count=0):
        """Query MISP for a single value, optionally across community feeds."""
        if value in self.failed_indicators:
            return []

        try:
            # Build the restSearch payload.
            payload = {
                "returnFormat": "json",
                "value": value,
                "type": attr,
                "enforceWarninglist": False,  # Do not filter known-good indicators.
                "includeEventTags": True,  # Include event tags for context.
                "includeContext": True,  # Include context information.
            }

            # Community search: include all distribution levels.
            if self.include_community:
                payload.update({
                    "distribution": [0, 1, 2, 3, 5],  # Own, community, connected, all, inherit.
                    "includeEventUuid": True,
                    "includeCorrelations": True,
                    "includeDecayScore": False,  # Skip decay scoring for speed.
                    "includeFullModel": False,  # Skip the full model for speed.
                })
                logger.debug(f"Community search enabled for {value} ({attr})")
            else:
                payload["distribution"] = [0]  # Own org only.
                logger.debug(f"Own org search only for {value} ({attr})")

            self.stats["api_calls"] += 1

            response = requests.post(
                f"{self.misp_url}/attributes/restSearch/",
                json=payload,
                headers={"Authorization": self.misp_api_key},
                verify=False,  # TLS verification disabled; only safe for trusted internal MISP instances.
                timeout=45,
            )

            if response.status_code != 200:
                logger.debug(f"MISP API returned status {response.status_code} for {value}")
                return []

            data = response.json()
            attributes = data.get("response", {}).get("Attribute", [])

            # Log community sources for debugging.
            if attributes and self.include_community:
                orgs = set()
                for attr_data in attributes:
                    org = attr_data.get("Event", {}).get("Orgc", {}).get("name", "Unknown")
                    orgs.add(org)
                if len(orgs) > 1 or (orgs and list(orgs)[0] not in ["Unknown", "Your Org"]):
                    logger.info(
                        f"Community hit for {value}: {len(attributes)} matches "
                        f"from {', '.join(list(orgs)[:3])}"
                    )

            return attributes

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            self.stats["api_timeouts"] += 1
            if retry_count < self.max_retries:
                wait_time = (retry_count + 1) * 2  # Simple linear backoff: 2s, 4s.
                logger.warning(
                    f"Timeout for {value}, retrying in {wait_time}s "
                    f"(attempt {retry_count + 1})"
                )
                time.sleep(wait_time)
                return self.query_misp_single(value, attr, retry_count + 1)
            logger.error(f"Max retries exceeded for {value}: {e}")
            self.failed_indicators.add(value)
            return []

        except Exception as e:
            logger.debug(f"Error querying MISP for {value}: {e}")
            return []
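
    # For reference, an abbreviated sketch of the restSearch response shape
    # this method consumes (field names as produced by MISP, values made up):
    #
    #   {"response": {"Attribute": [
    #       {"value": "1.2.3.4", "type": "ip-src",
    #        "Event": {"id": "42", "info": "Example campaign",
    #                  "Orgc": {"name": "OrgA"}}},
    #   ]}}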

    def mark_event(self, event, result, attr):
        """Mark an event with MISP intelligence, including correlating events."""
        try:
            if attr.startswith("ip-"):
                msg = "MISP: Malicious IP"
            else:
                msg = "MISP: Known indicator"

            # Collect unique correlating events and organizations.
            correlating_events = []
            orgs = set()
            event_ids = set()

            for res in result:
                event_info = res.get("Event", {})
                event_id = event_info.get("id")
                event_desc = event_info.get("info", "Unknown")
                org_name = event_info.get("Orgc", {}).get("name", "Unknown")

                # Avoid duplicate events (same ID).
                if event_id and event_id not in event_ids:
                    event_ids.add(event_id)
                    # Truncate long descriptions.
                    short_desc = event_desc[:60] + "..." if len(event_desc) > 60 else event_desc
                    correlating_events.append(short_desc)

                if org_name != "Unknown":
                    orgs.add(org_name)

            # Build a comprehensive message from the correlating events.
            if len(correlating_events) == 1:
                msg += f" | Event: {correlating_events[0]}"
            elif len(correlating_events) > 1:
                # Show the first two correlating events, then a count of the rest.
                msg += f" | Events: {correlating_events[0]} + {correlating_events[1]}"
                if len(correlating_events) > 2:
                    msg += f" + {len(correlating_events) - 2} more"

            # Add organization information for community awareness.
            if self.include_community and orgs:
                if len(orgs) > 1:
                    msg += f" | Sources: {', '.join(list(orgs)[:2])}"
                    if len(orgs) > 2:
                        msg += f" +{len(orgs) - 2} more"
                else:
                    msg += f" | Source: {list(orgs)[0]}"

            # Add the correlation count for context.
            if len(result) > 1:
                msg += f" | {len(result)} correlations"

            # Add tags, including correlation info.
            tags = [f"MISP-{attr}", "threat-intel"]
            if self.include_community:
                tags.append("community-intel")
            if len(correlating_events) > 1:
                tags.append("multi-event-correlation")

            event.add_comment(msg)
            event.add_tags(tags)
            event.commit()

            self.stats["events_marked"] += 1

            # Log detailed correlation info for debugging.
            logger.info(
                f"Marked event with {len(correlating_events)} correlating MISP "
                f"events: {', '.join(correlating_events[:3])}"
            )

        except Exception as e:
            logger.error(f"Error marking event: {e}")

    def extract_indicators_from_chunk(self, events_chunk, attr, timesketch_attr):
        """Extract indicators from a chunk of events."""
        chunk_indicators = []
        events_with_indicators = []

        for event in events_chunk:
            self.stats["events_processed"] += 1

            loc = event.source.get(timesketch_attr)
            if not loc:
                continue

            indicators = []

            # Extract based on the attribute type (same rules as the original analyzer).
            if attr.startswith("ip-") and timesketch_attr == "message":
                ip_matches = self.ip_pattern.findall(str(loc))
                indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]

            elif attr.startswith("ip-") and timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
                if self._is_valid_ip(str(loc)):
                    indicators = [str(loc)]

            elif attr in ["md5", "sha1", "sha256"]:
                if self._is_valid_hash(str(loc), attr):
                    indicators = [str(loc)]

            elif attr == "filename":
                filename = ntpath.basename(str(loc))
                if filename and len(filename) > 1:
                    indicators = [filename]

            # Store the event together with its indicators.
            if indicators:
                events_with_indicators.append((event, indicators))
                chunk_indicators.extend(indicators)

        return events_with_indicators, chunk_indicators
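
    # Illustrative return value (hypothetical events and indicators):
    #   ([(event_a, ["1.2.3.4"]), (event_b, ["5.6.7.8", "1.2.3.4"])],
    #    ["1.2.3.4", "5.6.7.8", "1.2.3.4"])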

    def process_chunk(self, events_chunk, attr, timesketch_attr):
        """Process a chunk of events."""
        events_with_indicators, chunk_indicators = self.extract_indicators_from_chunk(
            events_chunk, attr, timesketch_attr
        )

        if not chunk_indicators:
            return

        # Deduplicate, preserving order, and drop already-processed indicators.
        unique_indicators = list(dict.fromkeys(chunk_indicators))
        new_indicators = [
            ind for ind in unique_indicators if ind not in self.processed_indicators
        ]

        if not new_indicators:
            # Still check the existing result cache for matches.
            self.check_existing_matches(events_with_indicators, attr)
            return

        logger.info(
            f"Processing {len(new_indicators)} new {attr} indicators "
            f"from {len(events_chunk)} events"
        )
        self.stats["indicators_found"] += len(new_indicators)

        # Query MISP for each new indicator.
        for indicator in new_indicators:
            if indicator in self.failed_indicators:
                continue

            result = self.query_misp_single(indicator, attr)
            if result:
                self.result_dict[f"{attr}:{indicator}"] = result

                # Collect correlation details in a single pass over the results.
                self.stats["total_correlations"] += len(result)
                unique_events = set()
                orgs = set()
                events = set()
                for res in result:
                    misp_event = res.get("Event", {})
                    event_id = misp_event.get("id")
                    if event_id:
                        unique_events.add(event_id)
                    orgs.add(misp_event.get("Orgc", {}).get("name", "Unknown"))
                    events.add(misp_event.get("info", "Unknown")[:50])

                if len(unique_events) > 1:
                    self.stats["multi_event_correlations"] += 1

                # Distinguish community hits from own-org hits.
                # ("Your Organization" is a placeholder for the local org name.)
                if len(orgs) > 1 or any(
                    org not in ["Unknown", "Your Organization"] for org in orgs
                ):
                    self.stats["community_hits"] += 1
                    logger.info(
                        f"Community MISP hit: {indicator} | {len(result)} correlations | "
                        f"Events: {', '.join(list(events)[:2])} | "
                        f"Sources: {', '.join(list(orgs)[:3])}"
                    )
                else:
                    self.stats["own_org_hits"] += 1
                    logger.info(
                        f"Own org MISP hit: {indicator} | {len(result)} correlations | "
                        f"Events: {', '.join(list(events)[:2])}"
                    )

            self.processed_indicators.add(indicator)

            # Small delay to be nice to the MISP server.
            time.sleep(self.request_delay)

        # Mark events that have matches.
        self.check_existing_matches(events_with_indicators, attr)

    def test_community_connectivity(self):
        """Test whether community feeds are accessible."""
        if not self.include_community:
            return "Community search disabled"

        try:
            # Probe for any community indicator (if one is available).
            test_payload = {
                "returnFormat": "json",
                "distribution": [1, 2, 3],  # Community levels only.
                "limit": 1,
                "enforceWarninglist": False,
            }

            response = requests.post(
                f"{self.misp_url}/attributes/restSearch/",
                json=test_payload,
                headers={"Authorization": self.misp_api_key},
                verify=False,  # See note on TLS verification above.
                timeout=30,
            )

            if response.status_code != 200:
                return f"Community test failed: HTTP {response.status_code}"

            data = response.json()
            attributes = data.get("response", {}).get("Attribute", [])
            if not attributes:
                return "Community access OK but no community indicators found"

            orgs = set()
            for attribute in attributes[:5]:
                orgs.add(attribute.get("Event", {}).get("Orgc", {}).get("name", "Unknown"))
            return (
                f"Community access OK - {len(attributes)} indicators "
                f"from {len(orgs)} orgs: {', '.join(list(orgs)[:3])}"
            )

        except Exception as e:
            return f"Community test error: {e}"

    def check_existing_matches(self, events_with_indicators, attr):
        """Check events against cached MISP results and mark any matches."""
        for event, indicators in events_with_indicators:
            # Mark the event on the first indicator with a MISP match.
            for indicator in indicators:
                key = f"{attr}:{indicator}"
                if self.result_dict.get(key):
                    self.mark_event(event, self.result_dict[key], attr)
                    break  # Only mark once per event.

    def query_misp(self, query, attr, timesketch_attr):
        """Process events in chunks; designed for large datasets."""
        logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}")
        logger.info(f"Community search: {'enabled' if self.include_community else 'disabled'}")

        # Get the event stream.
        events_stream = self.event_stream(
            query_string=query,
            return_fields=[timesketch_attr, "_id"],
        )

        current_chunk = []

        try:
            for event in events_stream:
                current_chunk.append(event)

                # Process when the chunk is full.
                if len(current_chunk) >= self.chunk_size:
                    self.process_chunk(current_chunk, attr, timesketch_attr)
                    current_chunk = []

                    # Progress logging, roughly every 10,000 processed events.
                    if self.stats["events_processed"] % 10000 == 0:
                        logger.info(
                            f"Progress: {self.stats['events_processed']} events processed, "
                            f"{self.stats['events_marked']} marked, "
                            f"{self.stats['api_calls']} API calls, "
                            f"{self.stats['api_timeouts']} timeouts"
                        )

            # Process the final, partially filled chunk.
            if current_chunk:
                self.process_chunk(current_chunk, attr, timesketch_attr)

        except Exception as e:
            logger.error(f"Error during chunk processing: {e}")
            raise

        # Create a saved view if any events were marked.
        if self.stats["events_marked"] > 0:
            view_name = "MISP Threat Intelligence"
            if self.include_community:
                view_name += " (Community)"

            self.sketch.add_view(
                view_name=view_name,
                analyzer_name=self.NAME,
                query_string='tag:"MISP-*" OR tag:"threat-intel"',
            )

            # Create an additional view for multi-event correlations.
            correlation_count = sum(
                1 for results in self.result_dict.values() if results and len(results) > 1
            )
            if correlation_count > 0:
                self.sketch.add_view(
                    view_name="MISP Multi-Event Correlations",
                    analyzer_name=self.NAME,
                    query_string='tag:"multi-event-correlation"',
                )

    def run(self):
        """Entry point for the analyzer."""
        if not self.misp_url or not self.misp_api_key:
            return "No MISP configuration found"

        start_time = time.time()

        # Test community connectivity if enabled.
        if self.include_community:
            community_status = self.test_community_connectivity()
            logger.info(f"Community connectivity test: {community_status}")

        try:
            self.query_misp(self._query_string, self._attr, self._timesketch_attr)

            elapsed = time.time() - start_time
            success_rate = (
                (self.stats["api_calls"] - self.stats["api_timeouts"])
                / max(1, self.stats["api_calls"])
                * 100
            )

            # Build the result summary, including community and correlation statistics.
            result = (
                f"[{self._timesketch_attr}] MISP Analysis Complete: "
                f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | "
                f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | "
            )

            if self.include_community:
                result += (
                    f"Community hits: {self.stats['community_hits']}, "
                    f"Own org: {self.stats['own_org_hits']} | "
                )

            result += f"Total correlations: {self.stats['total_correlations']}"
            if self.stats["multi_event_correlations"] > 0:
                result += f", Multi-event: {self.stats['multi_event_correlations']}"

            result += f" | {elapsed:.0f}s"

            logger.info(result)
            return result

        except Exception as e:
            logger.error(f"MISP analyzer error: {e}")
            return f"[{self._timesketch_attr}] MISP Error: {e}"


manager.AnalysisManager.register_analyzer(MispAnalyzer)