This repository has been archived on 2025-08-27. You can view files and clone it, but cannot push or open issues or pull requests.
timesketch_misp/misp_analyzer.py

367 lines
14 KiB
Python

"""Index analyzer plugin for MISP - Simple and reliable for large-scale processing."""
import ipaddress
import logging
import ntpath
import re
import time

import requests
from flask import current_app

from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager
# Module-level logger; every log record in this file is emitted under this name.
logger = logging.getLogger("timesketch.analyzers.misp")
class MispAnalyzer(interface.BaseAnalyzer):
    """Analyzer that marks Timesketch events whose indicators are found in MISP.

    Each instance handles one (query string, MISP attribute type, Timesketch
    field) combination, as produced by ``get_kwargs``. Events are read from
    the event stream in chunks; indicators are extracted and de-duplicated;
    every new indicator is looked up once through the MISP
    ``attributes/restSearch`` endpoint; and each event carrying a matching
    indicator receives a comment and tags.
    """

    NAME = "misp_analyzer"
    DISPLAY_NAME = "MISP"
    DESCRIPTION = "Mark events using MISP - Simple and Reliable"

    # Expected hex-digest length for every supported hash attribute type.
    _HASH_LENGTHS = {"md5": 32, "sha1": 40, "sha256": 64}
    _HEX_DIGITS = frozenset("0123456789abcdef")

    # Timesketch fields treated as holding exactly one IP address.
    _IP_FIELDS = ("source_ip", "src_ip", "client_ip")

    # IPv4 ranges that are never looked up. Same set as the historical prefix
    # filter, except that "172." is narrowed to the actual RFC 1918 block
    # 172.16.0.0/12 so that public 172.x.x.x addresses are no longer dropped.
    _IGNORED_NETWORKS = tuple(
        ipaddress.ip_network(cidr)
        for cidr in (
            "0.0.0.0/8",
            "10.0.0.0/8",
            "127.0.0.0/8",
            "172.16.0.0/12",
            "192.168.0.0/16",
            "255.255.255.255/32",
        )
    )

    # A progress line is logged each time this many more events were processed.
    _PROGRESS_INTERVAL = 10000

    def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs):
        """Initialize the analyzer.

        Args:
            index_name: Forwarded to the base analyzer.
            sketch_id: Forwarded to the base analyzer.
            timeline_id: Forwarded to the base analyzer.
            **kwargs: One entry of ``get_kwargs`` (``query_string``, ``attr``,
                ``timesketch_attr``, ``include_community``) plus the optional
                tuning knobs ``chunk_size``, ``max_retries`` and
                ``request_delay``.
        """
        super().__init__(index_name, sketch_id, timeline_id=timeline_id)
        self.misp_url = current_app.config.get("MISP_URL")
        self.misp_api_key = current_app.config.get("MISP_API_KEY")
        self.total_event_counter = 0
        # Maps "<attr>:<indicator>" to the list of MISP attributes returned.
        self.result_dict = {}
        self._query_string = kwargs.get("query_string")
        self._attr = kwargs.get("attr")
        self._timesketch_attr = kwargs.get("timesketch_attr")

        # Tuning knobs, all optional.
        self.include_community = kwargs.get("include_community", False)
        self.chunk_size = kwargs.get("chunk_size", 1000)
        self.max_retries = kwargs.get("max_retries", 2)
        self.request_delay = kwargs.get("request_delay", 0.5)

        self.ip_pattern = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')

        # Indicators already looked up, and indicators that exhausted retries.
        self.processed_indicators = set()
        self.failed_indicators = set()

        self.stats = {
            'events_processed': 0,
            'indicators_found': 0,
            'api_calls': 0,
            'api_timeouts': 0,
            'events_marked': 0,
        }

    @staticmethod
    def get_kwargs():
        """Return one kwargs dict per (query, MISP type, field) to analyze.

        Returns:
            list[dict]: Each dict has the keys ``query_string``, ``attr``,
            ``timesketch_attr`` and ``include_community`` (always False).
        """
        targets = (
            ("md5_hash:*", "md5", "md5_hash"),
            ("sha1_hash:*", "sha1", "sha1_hash"),
            ("sha256_hash:*", "sha256", "sha256_hash"),
            ("filename:*", "filename", "filename"),
            ("message:*", "ip-src", "message"),
            ("message:*", "ip-dst", "message"),
            ("source_ip:*", "ip-src", "source_ip"),
        )
        return [
            {
                "query_string": query_string,
                "attr": attr,
                "timesketch_attr": timesketch_attr,
                "include_community": False,
            }
            for query_string, attr, timesketch_attr in targets
        ]

    def _is_valid_ip(self, ip_str):
        """Return True if ``ip_str`` parses as an IP address worth looking up.

        Surrounding whitespace is ignored. Addresses inside one of
        ``_IGNORED_NETWORKS`` are rejected; anything that does not parse (or
        is not a string) is rejected as well.
        """
        try:
            address = ipaddress.ip_address(ip_str.strip())
        except (ValueError, AttributeError):
            return False
        # Membership across IP versions is simply False, so IPv6 addresses
        # pass through this IPv4-only filter unchanged.
        return not any(address in network for network in self._IGNORED_NETWORKS)

    def _is_valid_hash(self, hash_str, hash_type):
        """Return True if ``hash_str`` is a hex digest of the right length.

        Args:
            hash_str: Candidate digest; whitespace and letter case are ignored.
            hash_type: One of ``"md5"``, ``"sha1"`` or ``"sha256"``; any other
                value yields False.
        """
        if not hash_str:
            return False
        expected_length = self._HASH_LENGTHS.get(hash_type)
        if expected_length is None:
            return False
        digest = hash_str.strip().lower()
        return len(digest) == expected_length and self._HEX_DIGITS.issuperset(digest)

    def query_misp_single(self, value, attr, retry_count=0):
        """Look up a single value in MISP.

        Args:
            value: Indicator value to search for.
            attr: MISP attribute type sent as the ``type`` filter.
            retry_count: Number of attempts already made; used by the
                recursive retry on timeouts and connection errors.

        Returns:
            list: The ``Attribute`` list from the MISP response, or an empty
            list on a non-200 status, on any error, or when ``value`` already
            exhausted its retries earlier.
        """
        if value in self.failed_indicators:
            return []

        payload = {"returnFormat": "json", "value": value, "type": attr}
        if self.include_community:
            payload["distribution"] = [0, 1, 2]  # Own, community, connected

        try:
            self.stats['api_calls'] += 1
            # NOTE(review): verify=False disables TLS certificate validation
            # while the API key is sent in the Authorization header. Kept for
            # backward compatibility (self-signed MISP instances); consider
            # making this configurable.
            response = requests.post(
                f"{self.misp_url}/attributes/restSearch/",
                json=payload,
                headers={"Authorization": self.misp_api_key},
                verify=False,
                timeout=45,
            )
            if response.status_code != 200:
                logger.debug(
                    "MISP API returned status %s for %s",
                    response.status_code,
                    value,
                )
                return []
            data = response.json()
            return data.get("response", {}).get("Attribute", [])
        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            self.stats['api_timeouts'] += 1
            if retry_count < self.max_retries:
                wait_time = (retry_count + 1) * 2  # Linear backoff: 2s, 4s, ...
                logger.warning(
                    "Timeout for %s, retrying in %ss (attempt %s)",
                    value,
                    wait_time,
                    retry_count + 1,
                )
                time.sleep(wait_time)
                return self.query_misp_single(value, attr, retry_count + 1)
            logger.error("Max retries exceeded for %s: %s", value, e)
            # Remember the failure so this value is not retried again.
            self.failed_indicators.add(value)
            return []
        except Exception as e:
            # Best effort: a malformed response must not abort the whole run.
            logger.debug("Error querying MISP for %s: %s", value, e)
            return []

    def mark_event(self, event, result, attr):
        """Comment, tag and commit ``event`` based on a MISP match.

        Args:
            event: Timesketch event object to annotate.
            result: Non-empty list of MISP attributes matching the event.
            attr: MISP attribute type; used for the tag name and the wording
                of the comment.
        """
        try:
            if attr.startswith("ip-"):
                msg = "MISP: Malicious IP - "
            else:
                msg = "MISP: Known indicator - "
            msg += result[0].get("Event", {}).get("info", "Unknown")
            if len(result) > 1:
                msg += f" (+{len(result)-1} more)"
            event.add_comment(msg)
            event.add_tags([f"MISP-{attr}", "threat-intel"])
            event.commit()
            self.stats['events_marked'] += 1
        except Exception as e:
            # A single event that cannot be annotated must not stop the run.
            logger.error("Error marking event: %s", e)

    def extract_indicators_from_chunk(self, events_chunk, attr, timesketch_attr):
        """Extract lookup-worthy indicators from a chunk of events.

        Hashes are returned stripped and lower-cased and single-field IPs are
        returned stripped, i.e. in the same form that was validated, so that
        the lookup cache is not fragmented by case or padding differences.

        Args:
            events_chunk: Iterable of events exposing a ``source`` mapping.
            attr: MISP attribute type being analyzed.
            timesketch_attr: Name of the event field to read.

        Returns:
            tuple: ``(events_with_indicators, chunk_indicators)`` where the
            first item is a list of ``(event, [indicator, ...])`` pairs and
            the second is the flat list of all indicators in the chunk.
        """
        chunk_indicators = []
        events_with_indicators = []
        is_ip_attr = attr.startswith("ip-")

        for event in events_chunk:
            self.stats['events_processed'] += 1
            loc = event.source.get(timesketch_attr)
            if not loc:
                continue
            text = str(loc)

            indicators = []
            if is_ip_attr and timesketch_attr == "message":
                # Free text: pull out everything that looks like an IPv4.
                indicators = [
                    ip for ip in self.ip_pattern.findall(text)
                    if self._is_valid_ip(ip)
                ]
            elif is_ip_attr and timesketch_attr in self._IP_FIELDS:
                candidate = text.strip()
                if self._is_valid_ip(candidate):
                    indicators = [candidate]
            elif attr in self._HASH_LENGTHS:
                if self._is_valid_hash(text, attr):
                    indicators = [text.strip().lower()]
            elif attr == "filename":
                # ntpath handles both "\\" and "/" separators.
                filename = ntpath.basename(text)
                if len(filename) > 1:
                    indicators = [filename]

            if indicators:
                events_with_indicators.append((event, indicators))
                chunk_indicators.extend(indicators)

        return events_with_indicators, chunk_indicators

    def process_chunk(self, events_chunk, attr, timesketch_attr):
        """Look up the new indicators of one chunk and mark matching events.

        Args:
            events_chunk: List of events to process.
            attr: MISP attribute type being analyzed.
            timesketch_attr: Name of the event field to read.
        """
        events_with_indicators, chunk_indicators = self.extract_indicators_from_chunk(
            events_chunk, attr, timesketch_attr
        )
        if not chunk_indicators:
            return

        # De-duplicate while preserving first-seen order.
        unique_indicators = list(dict.fromkeys(chunk_indicators))
        new_indicators = [
            indicator for indicator in unique_indicators
            if indicator not in self.processed_indicators
        ]

        if new_indicators:
            logger.info(
                "Processing %s new %s indicators from %s events",
                len(new_indicators),
                attr,
                len(events_chunk),
            )
            self.stats['indicators_found'] += len(new_indicators)

            for indicator in new_indicators:
                if indicator in self.failed_indicators:
                    continue
                result = self.query_misp_single(indicator, attr)
                if result:
                    self.result_dict[f"{attr}:{indicator}"] = result
                    logger.info("MISP hit: %s (%s matches)", indicator, len(result))
                self.processed_indicators.add(indicator)
                # Small delay between requests to go easy on the MISP server.
                time.sleep(self.request_delay)

        # Events may match results cached by this or by an earlier chunk.
        self.check_existing_matches(events_with_indicators, attr)

    def check_existing_matches(self, events_with_indicators, attr):
        """Mark every event that has at least one indicator with cached hits.

        Args:
            events_with_indicators: List of ``(event, [indicator, ...])``.
            attr: MISP attribute type; part of the cache key.
        """
        for event, indicators in events_with_indicators:
            for indicator in indicators:
                hits = self.result_dict.get(f"{attr}:{indicator}")
                if hits:
                    self.mark_event(event, hits, attr)
                    break  # Only mark once per event

    def query_misp(self, query, attr, timesketch_attr):
        """Stream the events matching ``query`` and process them in chunks.

        A saved view is added to the sketch when at least one event was
        marked.

        Args:
            query: Query string selecting the events to analyze.
            attr: MISP attribute type being analyzed.
            timesketch_attr: Name of the event field to read.

        Raises:
            Exception: Any error raised while streaming or processing events
                is logged and re-raised.
        """
        logger.info("Starting MISP analysis for %s in %s", attr, timesketch_attr)
        logger.info(
            "Community search: %s",
            "enabled" if self.include_community else "disabled",
        )

        events_stream = self.event_stream(
            query_string=query,
            return_fields=[timesketch_attr, '_id'],
        )

        current_chunk = []
        next_progress = self._PROGRESS_INTERVAL
        try:
            for event in events_stream:
                current_chunk.append(event)
                if len(current_chunk) < self.chunk_size:
                    continue

                self.process_chunk(current_chunk, attr, timesketch_attr)
                current_chunk = []

                # The processed counter only advances per chunk, so progress
                # is checked here, against a threshold rather than an exact
                # multiple, to work for any chunk_size and log once per step.
                processed = self.stats['events_processed']
                if processed >= next_progress:
                    logger.info(
                        "Progress: %s events processed, %s marked, "
                        "%s API calls, %s timeouts",
                        processed,
                        self.stats['events_marked'],
                        self.stats['api_calls'],
                        self.stats['api_timeouts'],
                    )
                    next_progress = (
                        processed // self._PROGRESS_INTERVAL + 1
                    ) * self._PROGRESS_INTERVAL

            # Process the final, partially filled chunk.
            if current_chunk:
                self.process_chunk(current_chunk, attr, timesketch_attr)
        except Exception as e:
            logger.error("Error during chunk processing: %s", e)
            raise

        if self.stats['events_marked'] > 0:
            view_name = "MISP Threat Intelligence"
            if self.include_community:
                view_name += " (Community)"
            self.sketch.add_view(
                view_name=view_name,
                analyzer_name=self.NAME,
                query_string='tag:"MISP-*" OR tag:"threat-intel"',
            )

    def run(self):
        """Entry point for the analyzer.

        Returns:
            str: A one-line summary of the run, a notice when the MISP URL or
            API key is not configured, or an error message if the analysis
            raised.
        """
        if not self.misp_url or not self.misp_api_key:
            return "No MISP configuration found"

        start_time = time.time()
        try:
            self.query_misp(self._query_string, self._attr, self._timesketch_attr)
            elapsed = time.time() - start_time
            # Only timeouts/connection errors count as failed calls here.
            success_rate = ((self.stats['api_calls'] - self.stats['api_timeouts']) /
                            max(1, self.stats['api_calls']) * 100)
            result = (f"[{self._timesketch_attr}] MISP Analysis Complete: "
                      f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | "
                      f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | "
                      f"{elapsed:.0f}s")
            logger.info(result)
            return result
        except Exception as e:
            logger.error("MISP analyzer error: %s", e)
            return f"[{self._timesketch_attr}] MISP Error: {str(e)}"
# Runs at import time: hands the analyzer class to AnalysisManager.register_analyzer.
manager.AnalysisManager.register_analyzer(MispAnalyzer)