From 4fd7bc0fbc9f34e085c68f40a769114e726cbcff Mon Sep 17 00:00:00 2001 From: SERVICE TIMESKETCH Date: Mon, 25 Aug 2025 12:42:45 +0000 Subject: [PATCH] add scripts --- __init__.py | 19 ++ misp_ip_analyzer.py | 500 ++++++++++++++++++++++++++++++++++++++++++++ shodan_analyzer.py | 130 ++++++++++++ 3 files changed, 649 insertions(+) create mode 100644 __init__.py create mode 100644 misp_ip_analyzer.py create mode 100644 shodan_analyzer.py diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..8d54f27 --- /dev/null +++ b/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2022 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# NOTE(review): SOURCE was a whitespace-mangled `git format-patch` adding three
# files (__init__.py, misp_ip_analyzer.py, shodan_analyzer.py).  Reconstructed
# below as clean, formatted code; file boundaries are marked with banner
# comments.  Split back into the separate files before committing.

# ---------------------------------------------------------------------------
# File: timesketch/lib/analyzers/contrib/__init__.py
# ---------------------------------------------------------------------------
"""Contrib Analyzer module."""
from timesketch.lib.analyzers.contrib import bigquery_matcher
from timesketch.lib.analyzers.contrib import misp_analyzer
from timesketch.lib.analyzers.contrib import misp_ip_analyzer
from timesketch.lib.analyzers.contrib import shodan_analyzer
from timesketch.lib.analyzers.contrib import hashlookup_analyzer

# ---------------------------------------------------------------------------
# File: timesketch/lib/analyzers/contrib/misp_ip_analyzer.py
# ---------------------------------------------------------------------------
"""Index analyzer plugin for MISP - Simple and reliable for large-scale processing."""

import ipaddress
import json
import logging
import ntpath
import re
import time

import requests

from flask import current_app
from timesketch.lib.analyzers import interface
from timesketch.lib.analyzers import manager


logger = logging.getLogger("timesketch.analyzers.misp")

# Valid characters of a (lower-cased) hex digest.
_HEX_DIGITS = frozenset("0123456789abcdef")


class MispAnalyzer(interface.BaseAnalyzer):
    """Simple, reliable MISP Analyzer for large-scale processing.

    Streams events in chunks, extracts indicators (IPs, hashes, filenames),
    queries the configured MISP instance once per unique indicator, and marks
    matching Timesketch events with comments/tags linking back to the MISP
    events that correlated.
    """

    NAME = "misp_ip_analyzer"
    DISPLAY_NAME = "MISP-IP"
    DESCRIPTION = "Mark events using MISP - Simple and Reliable"

    # Expected hex-digest length per supported hash attribute.
    _HASH_LENGTHS = {"md5": 32, "sha1": 40, "sha256": 64}

    def __init__(self, index_name, sketch_id, timeline_id=None, **kwargs):
        """Initialize the Analyzer.

        Args:
            index_name: OpenSearch index name.
            sketch_id: Sketch ID.
            timeline_id: Optional timeline ID.
            **kwargs: per-run configuration produced by get_kwargs():
                query_string, attr, timesketch_attr, include_community,
                plus optional chunk_size, max_retries, request_delay.
        """
        super().__init__(index_name, sketch_id, timeline_id=timeline_id)
        self.misp_url = current_app.config.get("MISP_URL")
        self.misp_api_key = current_app.config.get("MISP_API_KEY")
        self.total_event_counter = 0
        # Cache of MISP results, keyed "<attr>:<indicator>".
        self.result_dict = {}
        self._query_string = kwargs.get("query_string")
        self._attr = kwargs.get("attr")
        self._timesketch_attr = kwargs.get("timesketch_attr")

        self.include_community = kwargs.get("include_community", False)
        self.chunk_size = kwargs.get("chunk_size", 1000)
        self.max_retries = kwargs.get("max_retries", 2)
        self.request_delay = kwargs.get("request_delay", 0.5)

        # Compiled once; matches dotted-quad IPv4 candidates in free text.
        self.ip_pattern = re.compile(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b")

        # Indicators already queried / permanently failed this run.
        self.processed_indicators = set()
        self.failed_indicators = set()

        self.stats = {
            "events_processed": 0,
            "indicators_found": 0,
            "api_calls": 0,
            "api_timeouts": 0,
            "events_marked": 0,
            "community_hits": 0,
            "own_org_hits": 0,
            "total_correlations": 0,
            "multi_event_correlations": 0,
        }

    @staticmethod
    def get_kwargs():
        """Get kwargs for the analyzer.

        Returns:
            List of dicts, one per analyzer instance to spawn (one per
            indicator type to search for).
        """
        to_query = [
            {
                "query_string": "md5_hash:*",
                "attr": "md5",
                "timesketch_attr": "md5_hash",
                "include_community": True,
            },
            {
                "query_string": "sha1_hash:*",
                "attr": "sha1",
                "timesketch_attr": "sha1_hash",
                "include_community": True,
            },
            {
                "query_string": "sha256_hash:*",
                "attr": "sha256",
                "timesketch_attr": "sha256_hash",
                "include_community": True,
            },
            {
                "query_string": "filename:*",
                "attr": "filename",
                "timesketch_attr": "filename",
                "include_community": True,
            },
            {
                "query_string": "source_ip:*",
                "attr": "ip-src",
                "timesketch_attr": "source_ip",
                "include_community": True,
            },
        ]
        return to_query

    def _is_valid_ip(self, ip_str):
        """Return True if ip_str is a publicly-routable IP address.

        BUG FIX: the previous string-prefix check rejected every address in
        172.0.0.0/8 (RFC1918 only reserves 172.16.0.0/12) and let link-local
        (169.254.0.0/16) and other reserved ranges through.  ``is_global``
        from the stdlib ipaddress module classifies all of these correctly.
        """
        try:
            return ipaddress.ip_address(ip_str.strip()).is_global
        except (ValueError, AttributeError):
            # Not parseable as an IP (or not a string at all).
            return False

    def _is_valid_hash(self, hash_str, hash_type):
        """Return True if hash_str is a well-formed hex digest of hash_type.

        Args:
            hash_str: candidate digest string.
            hash_type: one of "md5", "sha1", "sha256".
        """
        if not hash_str:
            return False
        hash_str = hash_str.strip().lower()
        expected_len = self._HASH_LENGTHS.get(hash_type)
        if expected_len is None or len(hash_str) != expected_len:
            return False
        return all(c in _HEX_DIGITS for c in hash_str)

    def query_misp_single(self, value, attr, retry_count=0):
        """Query MISP for a single indicator value.

        For IP attributes both ip-src and ip-dst are searched.  Retries
        with linear backoff on timeout/connection errors up to
        self.max_retries; permanently failed values are remembered so they
        are not re-queried.

        Returns:
            List of MISP Attribute dicts (possibly empty).
        """
        if value in self.failed_indicators:
            return []

        try:
            # For IP searches, query both ip-src and ip-dst.
            if attr.startswith("ip-"):
                search_types = ["ip-src", "ip-dst"]
            else:
                search_types = [attr]

            all_results = []

            for search_type in search_types:
                payload = {
                    "returnFormat": "json",
                    "value": value,
                    "type": search_type,
                    "enforceWarninglist": False,
                    "includeEventTags": True,
                    "includeContext": True,
                }

                if self.include_community:
                    # Widen distribution to community/connected/all + inherit.
                    payload.update({
                        "distribution": [0, 1, 2, 3, 5],
                        "includeEventUuid": True,
                        "includeCorrelations": True,
                        "includeDecayScore": False,
                        "includeFullModel": False,
                    })
                else:
                    payload["distribution"] = [0]  # own organisation only

                self.stats["api_calls"] += 1

                # NOTE(review): verify=False disables TLS certificate
                # verification — acceptable only for self-signed internal
                # MISP instances; consider making this configurable.
                response = requests.post(
                    f"{self.misp_url}/attributes/restSearch/",
                    json=payload,
                    headers={"Authorization": self.misp_api_key},
                    verify=False,
                    timeout=45,
                )

                if response.status_code == 200:
                    data = response.json()
                    attributes = data.get("response", {}).get("Attribute", [])
                    all_results.extend(attributes)

                # Small pause between the ip-src / ip-dst sub-queries.
                time.sleep(0.1)

            return all_results

        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError) as e:
            self.stats["api_timeouts"] += 1

            if retry_count < self.max_retries:
                wait_time = (retry_count + 1) * 2  # linear backoff: 2s, 4s, ...
                logger.warning(f"Timeout for {value}, retrying in {wait_time}s (attempt {retry_count + 1})")
                time.sleep(wait_time)
                return self.query_misp_single(value, attr, retry_count + 1)

            logger.error(f"Max retries exceeded for {value}: {e}")
            self.failed_indicators.add(value)
            return []

        except Exception as e:  # best-effort: a single bad value must not kill the run
            logger.debug(f"Error querying MISP for {value}: {e}")
            return []

    def mark_event(self, event, result, attr):
        """Mark event with MISP intelligence including event links.

        Builds a human-readable comment summarising the correlated MISP
        events (with direct URLs) and applies threat-intel tags, then
        commits the event.
        """
        try:
            if attr.startswith("ip-"):
                msg = "MISP: Malicious IP"
            else:
                msg = "MISP: Known indicator"

            # De-duplicate by MISP event id; collect source organisations.
            unique_events = {}
            orgs = set()

            for res in result:
                event_info = res.get("Event", {})
                event_id = event_info.get("id")
                event_desc = event_info.get("info", "Unknown")
                org_name = event_info.get("Orgc", {}).get("name", "Unknown")

                if event_id and event_id not in unique_events:
                    unique_events[event_id] = {
                        'description': event_desc,
                        'url': f"{self.misp_url}/events/view/{event_id}"
                    }

                if org_name != "Unknown":
                    orgs.add(org_name)

            unique_event_list = list(unique_events.values())

            if len(unique_event_list) == 1:
                event_data = unique_event_list[0]
                short_desc = event_data['description'][:50] + "..." if len(event_data['description']) > 50 else event_data['description']
                msg += f" | Event: {short_desc} | Link: {event_data['url']}"
            elif len(unique_event_list) > 1:
                msg += f" | {len(unique_event_list)} Events:"
                # Show at most the first two events inline.
                for i, event_data in enumerate(unique_event_list[:2]):
                    short_desc = event_data['description'][:40] + "..." if len(event_data['description']) > 40 else event_data['description']
                    msg += f" [{i+1}] {short_desc} ({event_data['url']})"
                    if i < len(unique_event_list) - 1 and i < 1:
                        msg += " |"

                if len(unique_event_list) > 2:
                    msg += f" | +{len(unique_event_list)-2} more events"

            if self.include_community and orgs:
                if len(orgs) > 1:
                    msg += f" | Sources: {', '.join(list(orgs)[:2])}"
                    if len(orgs) > 2:
                        msg += f" +{len(orgs)-2} more"
                else:
                    msg += f" | Source: {list(orgs)[0]}"

            if len(result) > 1:
                msg += f" | {len(result)} total correlations"

            tags = [f"MISP-{attr}", "threat-intel"]
            if self.include_community:
                tags.append("community-intel")
            if len(unique_event_list) > 1:
                tags.append("multi-event-correlation")

            event.add_comment(msg)
            event.add_tags(tags)
            event.commit()

            self.stats['events_marked'] += 1

            logger.info(f"Marked event with {len(unique_event_list)} unique MISP events")

        except Exception as e:  # never let annotation failures abort the run
            logger.error(f"Error marking event: {e}")

    def extract_indicators_from_chunk(self, events_chunk, attr, timesketch_attr):
        """Extract indicators from a chunk of events.

        Returns:
            Tuple (events_with_indicators, chunk_indicators) where the first
            is a list of (event, [indicators]) pairs and the second is a flat
            list of all indicators seen in the chunk (with duplicates).
        """
        chunk_indicators = []
        events_with_indicators = []

        for event in events_chunk:
            self.stats['events_processed'] += 1

            loc = event.source.get(timesketch_attr)
            if not loc:
                continue

            indicators = []

            if attr.startswith("ip-") and timesketch_attr == "message":
                # Free-text field: regex-scan for IPv4 candidates.
                ip_matches = self.ip_pattern.findall(str(loc))
                indicators = [ip for ip in ip_matches if self._is_valid_ip(ip)]

            elif attr.startswith("ip-") and timesketch_attr in ["source_ip", "src_ip", "client_ip"]:
                if self._is_valid_ip(str(loc)):
                    indicators = [str(loc)]

            elif attr in ["md5", "sha1", "sha256"]:
                if self._is_valid_hash(str(loc), attr):
                    indicators = [str(loc)]

            elif attr == "filename":
                # ntpath handles both Windows and POSIX separators.
                filename = ntpath.basename(str(loc))
                if filename and len(filename) > 1:
                    indicators = [filename]

            if indicators:
                events_with_indicators.append((event, indicators))
                chunk_indicators.extend(indicators)

        return events_with_indicators, chunk_indicators

    def process_chunk(self, events_chunk, attr, timesketch_attr):
        """Process a chunk of events: query new indicators, mark matches."""
        events_with_indicators, chunk_indicators = self.extract_indicators_from_chunk(
            events_chunk, attr, timesketch_attr
        )

        if not chunk_indicators:
            return

        # Order-preserving de-duplication, then drop already-queried values.
        unique_indicators = list(dict.fromkeys(chunk_indicators))
        new_indicators = [ind for ind in unique_indicators if ind not in self.processed_indicators]

        if not new_indicators:
            # Everything was queried before; just apply cached results.
            self.check_existing_matches(events_with_indicators, attr)
            return

        logger.info(f"Processing {len(new_indicators)} new {attr} indicators from {len(events_chunk)} events")
        self.stats['indicators_found'] += len(new_indicators)

        for indicator in new_indicators:
            if indicator in self.failed_indicators:
                continue

            result = self.query_misp_single(indicator, attr)
            if result:
                self.result_dict[f"{attr}:{indicator}"] = result

                self.stats['total_correlations'] += len(result)
                unique_events = set()
                for res in result:
                    event_id = res.get("Event", {}).get("id")
                    if event_id:
                        unique_events.add(event_id)

                if len(unique_events) > 1:
                    self.stats['multi_event_correlations'] += 1

                orgs = set()
                events = set()
                for res in result:
                    org = res.get("Event", {}).get("Orgc", {}).get("name", "Unknown")
                    event_info = res.get("Event", {}).get("info", "Unknown")[:50]
                    orgs.add(org)
                    events.add(event_info)

                # Any unfamiliar org counts as a community hit.
                if len(orgs) > 1 or any(org not in ["Unknown", "Your Organization"] for org in orgs):
                    self.stats['community_hits'] += 1
                    logger.info(f"Community MISP hit: {indicator} | {len(result)} correlations | "
                                f"Events: {', '.join(list(events)[:2])} | Sources: {', '.join(list(orgs)[:3])}")
                else:
                    self.stats['own_org_hits'] += 1
                    logger.info(f"Own org MISP hit: {indicator} | {len(result)} correlations | "
                                f"Events: {', '.join(list(events)[:2])}")

            # Mark as processed whether or not MISP returned anything, so
            # misses are not re-queried on later chunks.
            self.processed_indicators.add(indicator)

            time.sleep(self.request_delay)  # client-side rate limiting

        self.check_existing_matches(events_with_indicators, attr)

    def test_community_connectivity(self):
        """Test if community feeds are accessible.

        Returns:
            Human-readable status string (never raises).
        """
        if not self.include_community:
            return "Community search disabled"

        try:
            test_payload = {
                "returnFormat": "json",
                "distribution": [1, 2, 3],
                "limit": 1,
                "enforceWarninglist": False,
            }

            response = requests.post(
                f"{self.misp_url}/attributes/restSearch/",
                json=test_payload,
                headers={"Authorization": self.misp_api_key},
                verify=False,  # NOTE(review): TLS verification disabled (see above)
                timeout=30,
            )

            if response.status_code == 200:
                data = response.json()
                attributes = data.get("response", {}).get("Attribute", [])
                if attributes:
                    orgs = set()
                    for attr in attributes[:5]:
                        org = attr.get("Event", {}).get("Orgc", {}).get("name", "Unknown")
                        orgs.add(org)
                    return f"Community access OK - {len(attributes)} indicators from {len(orgs)} orgs: {', '.join(list(orgs)[:3])}"
                return "Community access OK but no community indicators found"
            return f"Community test failed: HTTP {response.status_code}"

        except Exception as e:  # diagnostics only — report, never raise
            return f"Community test error: {e}"

    def check_existing_matches(self, events_with_indicators, attr):
        """Check events against cached MISP results and mark any hits.

        Each event is marked at most once (on its first matching indicator).
        """
        for event, indicators in events_with_indicators:
            for indicator in indicators:
                key = f"{attr}:{indicator}"
                if key in self.result_dict and self.result_dict[key]:
                    self.mark_event(event, self.result_dict[key], attr)
                    break

    def query_misp(self, query, attr, timesketch_attr):
        """Stream matching events and process them in chunks.

        Also creates saved views summarising the results when anything
        was marked.
        """
        logger.info(f"Starting MISP analysis for {attr} in {timesketch_attr}")
        logger.info(f"Community search: {'enabled' if self.include_community else 'disabled'}")

        events_stream = self.event_stream(
            query_string=query,
            return_fields=[timesketch_attr, '_id']
        )

        current_chunk = []

        try:
            for event in events_stream:
                current_chunk.append(event)

                if len(current_chunk) >= self.chunk_size:
                    self.process_chunk(current_chunk, attr, timesketch_attr)
                    current_chunk = []

                    # Periodic progress log (chunk counter is a multiple of
                    # chunk_size, so this fires roughly every 10k events).
                    if self.stats['events_processed'] % 10000 == 0:
                        logger.info(f"Progress: {self.stats['events_processed']} events processed, "
                                    f"{self.stats['events_marked']} marked, "
                                    f"{self.stats['api_calls']} API calls, "
                                    f"{self.stats['api_timeouts']} timeouts")

            # Flush the trailing partial chunk.
            if current_chunk:
                self.process_chunk(current_chunk, attr, timesketch_attr)

        except Exception as e:
            logger.error(f"Error during chunk processing: {e}")
            raise

        if self.stats['events_marked'] > 0:
            view_name = "MISP Threat Intelligence"
            if self.include_community:
                view_name += " (Community)"

            self.sketch.add_view(
                view_name=view_name,
                analyzer_name=self.NAME,
                query_string='tag:"MISP-*" OR tag:"threat-intel"',
            )

            correlation_count = sum(1 for key, results in self.result_dict.items()
                                    if results and len(results) > 1)
            if correlation_count > 0:
                self.sketch.add_view(
                    view_name="MISP Multi-Event Correlations",
                    analyzer_name=self.NAME,
                    query_string='tag:"multi-event-correlation"',
                )

    def run(self):
        """Entry point for the analyzer.

        Returns:
            Human-readable summary string of the analysis run.
        """
        if not self.misp_url or not self.misp_api_key:
            return "No MISP configuration found"

        start_time = time.time()

        if self.include_community:
            community_status = self.test_community_connectivity()
            logger.info(f"Community connectivity test: {community_status}")

        try:
            self.query_misp(self._query_string, self._attr, self._timesketch_attr)

            elapsed = time.time() - start_time
            success_rate = ((self.stats['api_calls'] - self.stats['api_timeouts']) /
                            max(1, self.stats['api_calls']) * 100)

            result = (f"[{self._timesketch_attr}] MISP Analysis Complete: "
                      f"{self.stats['events_marked']}/{self.stats['events_processed']} events marked | "
                      f"{self.stats['api_calls']} API calls ({success_rate:.1f}% success) | ")

            if self.include_community:
                result += f"Community hits: {self.stats['community_hits']}, Own org: {self.stats['own_org_hits']} | "

            result += f"Total correlations: {self.stats['total_correlations']}"
            if self.stats['multi_event_correlations'] > 0:
                result += f", Multi-event: {self.stats['multi_event_correlations']}"

            result += f" | {elapsed:.0f}s"

            logger.info(result)
            return result

        except Exception as e:  # top-level boundary: report failure as a result string
            logger.error(f"MISP analyzer error: {e}")
            return f"[{self._timesketch_attr}] MISP Error: {str(e)}"


manager.AnalysisManager.register_analyzer(MispAnalyzer)


# ---------------------------------------------------------------------------
# File: timesketch/lib/analyzers/contrib/shodan_analyzer.py
# ---------------------------------------------------------------------------
class ShodanEnrichmentAnalyzer(interface.BaseAnalyzer):
    """Analyzer to enrich IP addresses with Shodan data."""

    NAME = 'shodan_enrichment'
    DISPLAY_NAME = 'Shodan IP Enrichment'
    DESCRIPTION = 'Enriches source IP addresses with Shodan historical data'

    def __init__(self, index_name, sketch_id, timeline_id=None):
        """Initialize the analyzer and read Shodan settings from config."""
        super().__init__(index_name, sketch_id, timeline_id)

        # Get API key from config or environment.
        config = self.get_config()
        self.shodan_api_key = config.get('api_key', '')
        # NOTE(review): max_time_diff_hours is read but never used below —
        # either apply it when selecting historical scans or remove it.
        self.max_time_diff_hours = config.get('max_time_diff_hours', 24)
        self.rate_limit_delay = config.get('rate_limit_delay', 1)

        if not self.shodan_api_key:
            self.logger.error("Shodan API key not configured")

    def run(self):
        """Main analyzer logic.

        Streams events that have a source_ip and are not yet enriched,
        queries Shodan for each public IP, and attaches the results.

        Returns:
            Human-readable summary string.
        """
        if not self.shodan_api_key:
            return "Shodan API key not configured"

        # Only events with a source_ip that this analyzer has not touched yet.
        query = {
            'query': {
                'bool': {
                    'must': [
                        {'exists': {'field': 'source_ip'}},
                        {'bool': {'must_not': [{'term': {'__ts_analyzer_shodan_enrichment': True}}]}}
                    ]
                }
            }
        }

        events = self.event_stream(query_dsl=query, return_fields=['source_ip', 'timestamp'])
        processed_count = 0

        for event in events:
            source_ip = event.source.get('source_ip')

            if source_ip and self._is_public_ip(source_ip):
                shodan_data = self._get_shodan_data(source_ip)
                if shodan_data:
                    self._enrich_event(event, shodan_data)
                    processed_count += 1

                # Rate limiting: pause after every Shodan API call.
                time.sleep(self.rate_limit_delay)

        return f"Processed {processed_count} events with Shodan data"

    def _get_shodan_data(self, ip):
        """Fetch Shodan host data (with history) for an IP.

        Returns:
            Parsed JSON dict on success, None on 404 or any error.
        """
        try:
            url = f"https://api.shodan.io/shodan/host/{ip}"
            params = {
                'key': self.shodan_api_key,
                'history': 'true'
            }

            response = requests.get(url, params=params, timeout=10)

            if response.status_code == 200:
                return response.json()
            if response.status_code == 404:
                self.logger.debug(f'No Shodan data found for {ip}')
                return None
            self.logger.warning(f'Shodan API error for {ip}: {response.status_code}')
            return None

        except requests.exceptions.RequestException as e:
            self.logger.warning(f'Request error for {ip}: {e}')
            return None
        except json.JSONDecodeError as e:
            self.logger.warning(f'JSON decode error for {ip}: {e}')
            return None

    def _enrich_event(self, event, shodan_data):
        """Add Shodan attributes and tag to the event, then commit it."""
        enrichment = {
            'shodan_ports': shodan_data.get('ports', []),
            'shodan_org': shodan_data.get('org', ''),
            'shodan_isp': shodan_data.get('isp', ''),
            'shodan_country': shodan_data.get('location', {}).get('country_name', ''),
            'shodan_city': shodan_data.get('location', {}).get('city', ''),
            'shodan_hostnames': shodan_data.get('hostnames', []),
            'shodan_last_update': shodan_data.get('last_update', ''),
            # Marker so run()'s query skips this event on re-runs.
            '__ts_analyzer_shodan_enrichment': True
        }

        # Flatten every historical scan entry into a service list.
        scans = shodan_data.get('data')
        if scans:
            enrichment['shodan_services'] = [
                {
                    'port': service.get('port'),
                    'protocol': service.get('transport', 'tcp'),
                    'service': service.get('product', ''),
                    'version': service.get('version', ''),
                    'timestamp': service.get('timestamp', '')
                }
                for service in scans
            ]

        event.add_attributes(enrichment)
        event.add_tags(['shodan-enriched'])
        event.commit()

    def _is_public_ip(self, ip):
        """Check if IP is publicly routable (not private/reserved ranges)."""
        try:
            ip_obj = ipaddress.ip_address(ip)
            return ip_obj.is_global
        except (ValueError, ipaddress.AddressValueError):
            return False


# Register the analyzer
manager.AnalysisManager.register_analyzer(ShodanEnrichmentAnalyzer)