diff --git a/CLAUDE.md b/CLAUDE.md index cab2056..e7b5539 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -52,18 +52,30 @@ The application uses a three-level hierarchy: Each level has unique IDs (UUIDs) for reliable lookups across the hierarchy. -### Core Modules +### Modular Structure (Optimized for AI Coding Agents) -**`trace/models.py`**: Data models using dataclasses -- `Note`: Content + timestamp + SHA256 hash + optional GPG signature + auto-extracted tags/IOCs -- `Evidence`: Container for notes about a specific piece of evidence, includes metadata dict for source hashes -- `Case`: Top-level container with case number, investigator, evidence list, and notes +The codebase is organized into focused, single-responsibility modules so that AI agents and developers can quickly navigate, understand, and modify specific functionality: + +**`trace/models/`**: Data models package +- `__init__.py`: Main model classes (Note, Evidence, Case) with dataclass definitions +- `extractors/tag_extractor.py`: Tag extraction logic (hashtag parsing) +- `extractors/ioc_extractor.py`: IOC extraction logic (IPs, domains, URLs, hashes, emails) - All models implement `to_dict()`/`from_dict()` for JSON serialization +- Models use extractors for automatic tag and IOC detection -**`trace/storage.py`**: Persistence layer -- `Storage`: Manages `~/.trace/data.json` with atomic writes (temp file + rename) -- `StateManager`: Manages `~/.trace/state` (active case/evidence) and `~/.trace/settings.json` (PGP enabled/disabled) -- Data is loaded into memory on init, modified, then saved atomically +**`trace/storage_impl/`**: Storage implementation package +- `storage.py`: Main Storage class managing `~/.trace/data.json` with atomic writes +- `state_manager.py`: StateManager for active context and settings persistence +- `lock_manager.py`: Cross-platform file locking to prevent concurrent access +- `demo_data.py`: Demo case creation for first-time users +- Backward compatible via `trace/storage.py` wrapper + +**`trace/tui/`**: Text User Interface package +- `trace/tui_app.py` (re-exported via this package's `__init__.py`): Main TUI class with view hierarchy and event loop (3307 lines; target for future refactoring) +- `rendering/colors.py`: Color pair initialization and constants +- `rendering/text_renderer.py`: Text rendering with IOC/tag highlighting +- `handlers/export_handler.py`: Export functionality (IOCs, markdown reports) +- Future refactoring will extract views, dialogs, and input handlers **`trace/crypto.py`**: Integrity features - `sign_content()`: GPG clearsign via subprocess (falls back gracefully if GPG unavailable) @@ -74,13 +86,6 @@ Each level has unique IDs (UUIDs) for reliable lookups across the hierarchy. - `export_markdown()`: Generates full case report with hashes and signatures - `main()`: Argument parsing, routes to TUI or CLI functions -**`trace/tui.py`**: Curses-based Text User Interface -- View hierarchy: case_list → case_detail → evidence_detail -- Additional views: tags_list, tag_notes_list, ioc_list, ioc_notes_list, note_detail -- Multi-line note editor with Ctrl+G to submit, Esc to cancel -- Filter mode (press `/`), active context management (press `a`) -- All note additions automatically extract tags (#hashtag) and IOCs (IPs, domains, URLs, hashes, emails) - ### Key Features Implementation **Integrity System**: Every note automatically gets: @@ -129,3 +134,33 @@ temp_file.replace(self.data_file) ## Testing Notes Tests use temporary directories created with `tempfile.mkdtemp()` and cleaned up in `tearDown()` to avoid polluting `~/.trace/`.
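A minimal sketch of that setup/teardown pattern (hypothetical test class; `acquire_lock=False` skips the single-instance lock so tests don't collide with a running app):

```python
import shutil
import tempfile
import unittest
from pathlib import Path

from trace.storage import Storage


class StorageTestCase(unittest.TestCase):
    def setUp(self):
        # Isolated app dir so the test never touches ~/.trace/
        self.app_dir = Path(tempfile.mkdtemp())
        self.storage = Storage(app_dir=self.app_dir, acquire_lock=False)

    def tearDown(self):
        shutil.rmtree(self.app_dir, ignore_errors=True)

    def test_first_run_seeds_demo_case(self):
        # A fresh app dir gets the demo case on first launch
        self.assertTrue(any(c.case_number == "DEMO-2024-001"
                            for c in self.storage.cases))
```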
+ +## AI Agent Optimization + +The codebase has been restructured to be easy for AI coding agents to work with: + +### Module Organization Benefits +- **Focused Files**: Each module has a single, clear responsibility (typically 50-250 lines) +- **Easy Navigation**: Functionality is easy to locate by purpose (e.g., IOC extraction, export handlers) +- **Independent Modification**: Changes to one module rarely affect others +- **Clear Interfaces**: Modules communicate through well-defined imports +- **Reduced Context**: AI agents can focus on relevant files without loading massive monoliths + +### File Size Guidelines +- **Small modules** (< 150 lines): Ideal for focused tasks +- **Medium modules** (150-500 lines): Acceptable for cohesive functionality +- **Large modules** (> 500 lines): Consider refactoring into smaller components +- **Very large modules** (> 1000 lines): Priority target for extraction and modularization + +### Current Status +- ✅ Models: Organized into package with extractors separated +- ✅ Storage: Split into focused modules (storage, state, locking, demo data) +- ✅ TUI Utilities: Rendering and export handlers extracted +- ⏳ TUI Main: Still monolithic (`trace/tui_app.py`, 3307 lines); future refactoring needed + +### Future Refactoring Targets +The `trace/tui_app.py` file (3307 lines) should be further split into: +- `tui/views/` - Individual view classes (case list, evidence detail, etc.) +- `tui/dialogs/` - Dialog functions (input, confirm, settings, etc.) +- `tui/handlers/` - Input and navigation handlers +- `tui/app.py` - Main TUI orchestration class diff --git a/trace/cli.py b/trace/cli.py index 33a841a..6101a19 100644 --- a/trace/cli.py +++ b/trace/cli.py @@ -163,7 +163,7 @@ def main(): # Launch TUI (with optional direct navigation to active context) try: - from .tui import run_tui + from .tui_app import run_tui run_tui(open_active=args.open) except ImportError as e: print(f"Error launching TUI: {e}") diff --git a/trace/models.py b/trace/models.py deleted file mode 100644 index 27ae3e8..0000000 --- a/trace/models.py +++ /dev/null @@ -1,311 +0,0 @@ -import time -import hashlib -import uuid -import re -from dataclasses import dataclass, field -from typing import List, Optional, Dict - -@dataclass -class Note: - content: str - timestamp: float = field(default_factory=time.time) - note_id: str = field(default_factory=lambda: str(uuid.uuid4())) - content_hash: str = "" - signature: Optional[str] = None - tags: List[str] = field(default_factory=list) - iocs: List[str] = field(default_factory=list) - - def extract_tags(self): - """Extract hashtags from content (case-insensitive, stored lowercase)""" - # Match hashtags: # followed by word characters - tag_pattern = r'#(\w+)' - matches = re.findall(tag_pattern, self.content) - # Convert to lowercase and remove duplicates while preserving order - seen = set() - self.tags = [] - for tag in matches: - tag_lower = tag.lower() - if tag_lower not in seen: - seen.add(tag_lower) - self.tags.append(tag_lower) - - def extract_iocs(self): - """Extract Indicators of Compromise from content""" - seen = set() - covered_ranges = set() - self.iocs = [] - - def add_ioc_if_not_covered(match_obj): - """Add IOC if its range doesn't overlap with already covered ranges""" - start, end = match_obj.start(), match_obj.end() - # Check if this range overlaps with any covered range - for covered_start, covered_end in covered_ranges: - if not (end <= covered_start or start >= covered_end): - return False # Overlaps, don't add - text = match_obj.group() - if text not in seen: -
seen.add(text) - covered_ranges.add((start, end)) - self.iocs.append(text) - return True - return False - - # Process in order of priority to avoid false positives - # SHA256 hashes (64 hex chars) - check longest first to avoid substring matches - sha256_pattern = r'\b[a-fA-F0-9]{64}\b' - for match in re.finditer(sha256_pattern, self.content): - add_ioc_if_not_covered(match) - - # SHA1 hashes (40 hex chars) - sha1_pattern = r'\b[a-fA-F0-9]{40}\b' - for match in re.finditer(sha1_pattern, self.content): - add_ioc_if_not_covered(match) - - # MD5 hashes (32 hex chars) - md5_pattern = r'\b[a-fA-F0-9]{32}\b' - for match in re.finditer(md5_pattern, self.content): - add_ioc_if_not_covered(match) - - # IPv4 addresses - ipv4_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b' - for match in re.finditer(ipv4_pattern, self.content): - add_ioc_if_not_covered(match) - - # IPv6 addresses (supports compressed format) - ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b' - for match in re.finditer(ipv6_pattern, self.content): - add_ioc_if_not_covered(match) - - # URLs (check before domains to prevent double-matching) - # Fix: exclude trailing punctuation - url_pattern = r'https?://[^\s<>\"\']+(?<![.,;:!?)\]])' - for match in re.finditer(url_pattern, self.content): - add_ioc_if_not_covered(match) - - # Domain names (basic pattern) - domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' - for match in re.finditer(domain_pattern, self.content): - # Filter out common false positives - if not match.group().startswith('example.'): - add_ioc_if_not_covered(match) - - # Email addresses - email_pattern = r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b' - for match in re.finditer(email_pattern, self.content): - add_ioc_if_not_covered(match) - - @staticmethod - def extract_iocs_with_types(text: str): - """Extract IOCs from text as (ioc_text, ioc_type) tuples""" - iocs = [] - seen = set() - covered_ranges = set() - - def add_ioc_if_not_covered(match_obj, ioc_type): - """Add IOC if its range doesn't overlap with already covered ranges""" - start, end = match_obj.start(), match_obj.end() - # Check if this range overlaps with any covered range - for covered_start, covered_end in covered_ranges: - if not (end <= covered_start or start >= covered_end): - return False # Overlaps, don't add - ioc_text = match_obj.group() - if ioc_text not in seen: - seen.add(ioc_text) - covered_ranges.add((start, end)) - iocs.append((ioc_text, ioc_type)) - return True - return False - - # Process in priority order: longest hashes first - # SHA256 hashes (64 hex chars) - sha256_pattern = r'\b[a-fA-F0-9]{64}\b' - for match in re.finditer(sha256_pattern, text): - add_ioc_if_not_covered(match, 'sha256') - - # SHA1 hashes (40 hex chars) - sha1_pattern = r'\b[a-fA-F0-9]{40}\b' - for match in re.finditer(sha1_pattern, text): - add_ioc_if_not_covered(match, 'sha1') - - # MD5 hashes (32 hex chars) - md5_pattern = r'\b[a-fA-F0-9]{32}\b' - for match in re.finditer(md5_pattern, text): - add_ioc_if_not_covered(match, 'md5') - - # IPv4 addresses - ipv4_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b' - for match in re.finditer(ipv4_pattern, text): - add_ioc_if_not_covered(match, 'ipv4') - - # IPv6 addresses (supports compressed format) - ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b' - for match in re.finditer(ipv6_pattern, text): - add_ioc_if_not_covered(match, 'ipv6') - - # URLs (check before domains to avoid double-matching) - # Fix: exclude trailing punctuation - url_pattern = r'https?://[^\s<>\"\']+(?<![.,;:!?)\]])' - for match in re.finditer(url_pattern, text): - add_ioc_if_not_covered(match, 'url') - - # Domain names - domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' - for match in re.finditer(domain_pattern, text): - # Filter out common false positives - if not match.group().startswith('example.'): - add_ioc_if_not_covered(match, 'domain') - - # Email addresses - email_pattern = r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b' - for match in re.finditer(email_pattern, text): - add_ioc_if_not_covered(match, 'email') - - return iocs - - @staticmethod - def extract_iocs_with_positions(text: str): - """Extract IOCs with their positions for highlighting""" - highlights = [] - covered_ranges = set() - - def overlaps(start, end): - """Check if range overlaps with any covered range""" - for covered_start, covered_end in covered_ranges: - if not (end <= covered_start or start >= covered_end): - return True - return False - - def add_highlight(match, ioc_type): - """Add highlight if it doesn't overlap with existing ones""" - start, end = match.start(), match.end() - if not overlaps(start, end): - highlights.append((match.group(), start, end, ioc_type)) - covered_ranges.add((start, end)) - - # Process in priority order: longest hashes first to avoid substring matches - # SHA256 hashes (64 hex chars) - for match in re.finditer(r'\b[a-fA-F0-9]{64}\b', text): - add_highlight(match, 'sha256') - - # SHA1 hashes (40 hex chars) - for match in re.finditer(r'\b[a-fA-F0-9]{40}\b', text): - add_highlight(match, 'sha1') - - # MD5 hashes (32 hex chars) - for match in re.finditer(r'\b[a-fA-F0-9]{32}\b', text): - add_highlight(match, 'md5') - - # IPv4 addresses - ipv4_pattern =
r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b' - for match in re.finditer(ipv4_pattern, text): - add_highlight(match, 'ipv4') - - # IPv6 addresses (supports compressed format) - ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b' - for match in re.finditer(ipv6_pattern, text): - add_highlight(match, 'ipv6') - - # URLs (check before domains to prevent double-matching) - # Fix: exclude trailing punctuation - for match in re.finditer(r'https?://[^\s<>\"\']+(?<![.,;:!?)\]])', text): - add_highlight(match, 'url') - - # Domain names - for match in re.finditer(r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b', text): - if not match.group().startswith('example.'): - add_highlight(match, 'domain') - - # Email addresses - for match in re.finditer(r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b', text): - add_highlight(match, 'email') - - return highlights diff --git a/trace/models/extractors/__init__.py b/trace/models/extractors/__init__.py new file mode 100644 --- /dev/null +++ b/trace/models/extractors/__init__.py @@ -0,0 +1,6 @@ +"""Extractors for automatic tag and IOC detection""" + +from .tag_extractor import TagExtractor +from .ioc_extractor import IOCExtractor + +__all__ = ['TagExtractor', 'IOCExtractor'] diff --git a/trace/models/extractors/ioc_extractor.py b/trace/models/extractors/ioc_extractor.py new file mode 100644 --- /dev/null +++ b/trace/models/extractors/ioc_extractor.py +"""IOC (Indicator of Compromise) extraction logic""" + +import re +from typing import List, Tuple + + +class IOCExtractor: + """Extract Indicators of Compromise from text""" + + # Shared pattern constants + SHA256_PATTERN = r'\b[a-fA-F0-9]{64}\b' + SHA1_PATTERN = r'\b[a-fA-F0-9]{40}\b' + MD5_PATTERN = r'\b[a-fA-F0-9]{32}\b' + IPV4_PATTERN = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b' + IPV6_PATTERN = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b' + URL_PATTERN = r'https?://[^\s<>\"\']+(?<![.,;:!?)\]])' + DOMAIN_PATTERN = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b' + EMAIL_PATTERN = r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b' + + @staticmethod + def extract_iocs(text: str) -> List[str]: + """ + Extract IOCs from text and return as simple list + + Args: + text: The text to extract IOCs from + + Returns: + List of unique IOC strings + """ + seen = set() + covered_ranges = set() + iocs = [] + + def add_ioc_if_not_covered(match_obj): + """Add IOC if its range doesn't overlap with already covered ranges""" + start, end = match_obj.start(), match_obj.end() + # Check if this range overlaps with any covered range + for covered_start, covered_end in covered_ranges: + if not (end <= covered_start or start >= covered_end): + return False # Overlaps, don't add + ioc_text = match_obj.group() + if ioc_text not in seen: + seen.add(ioc_text) + covered_ranges.add((start, end)) + iocs.append(ioc_text) + return True + return False + + # Process in order of priority to avoid false positives + # SHA256 hashes (64 hex chars) - check longest first to avoid substring matches + for match in re.finditer(IOCExtractor.SHA256_PATTERN, text): + add_ioc_if_not_covered(match) + + # SHA1 hashes (40 hex chars) + for match in re.finditer(IOCExtractor.SHA1_PATTERN, text): + add_ioc_if_not_covered(match) + + # MD5 hashes (32 hex chars) + for match in re.finditer(IOCExtractor.MD5_PATTERN, text): + add_ioc_if_not_covered(match) + + # IPv4 addresses + for match in re.finditer(IOCExtractor.IPV4_PATTERN, text): + add_ioc_if_not_covered(match) + + # IPv6 addresses (supports compressed format) + for match in re.finditer(IOCExtractor.IPV6_PATTERN, text): + add_ioc_if_not_covered(match) + + # URLs (check before domains to prevent double-matching) + for match in re.finditer(IOCExtractor.URL_PATTERN, text): + add_ioc_if_not_covered(match) + + # Domain names (basic pattern) + for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text): + # Filter out common false positives + if not match.group().startswith('example.'): + add_ioc_if_not_covered(match) + + # Email addresses + for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text): + add_ioc_if_not_covered(match) + + return iocs + + @staticmethod + def extract_iocs_with_types(text: str) -> List[Tuple[str, str]]: + """ + Extract IOCs from text and return as list of (ioc, type) tuples + + Args: + text: The text to extract IOCs from + + Returns: + List of (ioc_text, ioc_type) tuples + """ + iocs = [] + seen = set() + covered_ranges = set() + + def add_ioc_if_not_covered(match_obj, ioc_type): + """Add IOC if its range doesn't overlap with already covered ranges""" + start, end = match_obj.start(), match_obj.end() + # Check if this range overlaps with any covered range + for covered_start, covered_end in covered_ranges: + if not (end <= covered_start or start >= covered_end): + return False # Overlaps, don't add + ioc_text = match_obj.group() + if ioc_text not in seen: + seen.add(ioc_text) + covered_ranges.add((start, end)) + iocs.append((ioc_text, ioc_type)) + return True + return False + + # Process in priority order: longest hashes first + for match in
re.finditer(IOCExtractor.SHA256_PATTERN, text): + add_ioc_if_not_covered(match, 'sha256') + + for match in re.finditer(IOCExtractor.SHA1_PATTERN, text): + add_ioc_if_not_covered(match, 'sha1') + + for match in re.finditer(IOCExtractor.MD5_PATTERN, text): + add_ioc_if_not_covered(match, 'md5') + + for match in re.finditer(IOCExtractor.IPV4_PATTERN, text): + add_ioc_if_not_covered(match, 'ipv4') + + for match in re.finditer(IOCExtractor.IPV6_PATTERN, text): + add_ioc_if_not_covered(match, 'ipv6') + + # URLs (check before domains to avoid double-matching) + for match in re.finditer(IOCExtractor.URL_PATTERN, text): + add_ioc_if_not_covered(match, 'url') + + # Domain names + for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text): + # Filter out common false positives + if not match.group().startswith('example.'): + add_ioc_if_not_covered(match, 'domain') + + # Email addresses + for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text): + add_ioc_if_not_covered(match, 'email') + + return iocs + + @staticmethod + def extract_iocs_with_positions(text: str) -> List[Tuple[str, int, int, str]]: + """ + Extract IOCs with their positions for highlighting + + Args: + text: The text to extract IOCs from + + Returns: + List of (ioc_text, start_pos, end_pos, ioc_type) tuples + """ + highlights = [] + covered_ranges = set() + + def overlaps(start, end): + """Check if range overlaps with any covered range""" + for covered_start, covered_end in covered_ranges: + if not (end <= covered_start or start >= covered_end): + return True + return False + + def add_highlight(match, ioc_type): + """Add highlight if it doesn't overlap with existing ones""" + start, end = match.start(), match.end() + if not overlaps(start, end): + highlights.append((match.group(), start, end, ioc_type)) + covered_ranges.add((start, end)) + + # Process in priority order: longest hashes first to avoid substring matches + for match in re.finditer(IOCExtractor.SHA256_PATTERN, text): + add_highlight(match, 'sha256') + + for match in re.finditer(IOCExtractor.SHA1_PATTERN, text): + add_highlight(match, 'sha1') + + for match in re.finditer(IOCExtractor.MD5_PATTERN, text): + add_highlight(match, 'md5') + + for match in re.finditer(IOCExtractor.IPV4_PATTERN, text): + add_highlight(match, 'ipv4') + + for match in re.finditer(IOCExtractor.IPV6_PATTERN, text): + add_highlight(match, 'ipv6') + + # URLs (check before domains to prevent double-matching) + for match in re.finditer(IOCExtractor.URL_PATTERN, text): + add_highlight(match, 'url') + + # Domain names + for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text): + if not match.group().startswith('example.'): + add_highlight(match, 'domain') + + # Email addresses + for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text): + add_highlight(match, 'email') + + return highlights + + @staticmethod + def classify_ioc(ioc: str) -> str: + """ + Classify an IOC by its type + + Args: + ioc: The IOC string to classify + + Returns: + The IOC type as a string + """ + if re.fullmatch(IOCExtractor.SHA256_PATTERN, ioc): + return 'sha256' + elif re.fullmatch(IOCExtractor.SHA1_PATTERN, ioc): + return 'sha1' + elif re.fullmatch(IOCExtractor.MD5_PATTERN, ioc): + return 'md5' + elif re.fullmatch(IOCExtractor.IPV4_PATTERN, ioc): + return 'ipv4' + elif re.fullmatch(IOCExtractor.IPV6_PATTERN, ioc): + return 'ipv6' + elif re.fullmatch(IOCExtractor.EMAIL_PATTERN, ioc): + return 'email' + elif re.fullmatch(IOCExtractor.URL_PATTERN, ioc): + return 'url' + elif re.fullmatch(IOCExtractor.DOMAIN_PATTERN, ioc): + 
return 'domain' + else: + return 'unknown' diff --git a/trace/models/extractors/tag_extractor.py b/trace/models/extractors/tag_extractor.py new file mode 100644 index 0000000..18dd066 --- /dev/null +++ b/trace/models/extractors/tag_extractor.py @@ -0,0 +1,34 @@ +"""Tag extraction logic for notes""" + +import re + + +class TagExtractor: + """Extract hashtags from text content""" + + TAG_PATTERN = r'#(\w+)' + + @staticmethod + def extract_tags(text: str) -> list[str]: + """ + Extract hashtags from content (case-insensitive, stored lowercase) + + Args: + text: The text to extract tags from + + Returns: + List of unique tags in lowercase, preserving order + """ + # Match hashtags: # followed by word characters + matches = re.findall(TagExtractor.TAG_PATTERN, text) + + # Convert to lowercase and remove duplicates while preserving order + seen = set() + tags = [] + for tag in matches: + tag_lower = tag.lower() + if tag_lower not in seen: + seen.add(tag_lower) + tags.append(tag_lower) + + return tags diff --git a/trace/storage.py b/trace/storage.py index e6dda58..3e1f5c6 100644 --- a/trace/storage.py +++ b/trace/storage.py @@ -1,402 +1,6 @@ -import json -import time -import os -import sys -from pathlib import Path -from typing import List, Optional, Tuple -from .models import Case, Evidence, Note +"""Storage module - backward compatibility wrapper""" -DEFAULT_APP_DIR = Path.home() / ".trace" +# For backward compatibility, export all classes from storage_impl +from .storage_impl import Storage, StateManager, LockManager, create_demo_case -class LockManager: - """Cross-platform file lock manager to prevent concurrent access""" - def __init__(self, lock_file: Path): - self.lock_file = lock_file - self.acquired = False - - def acquire(self, timeout: int = 5): - """Acquire lock with timeout. 
Returns True if successful.""" - start_time = time.time() - while time.time() - start_time < timeout: - try: - # Try to create lock file exclusively (fails if exists) - # Use 'x' mode which fails if file exists (atomic on most systems) - fd = os.open(str(self.lock_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY) - os.write(fd, str(os.getpid()).encode()) - os.close(fd) - self.acquired = True - return True - except FileExistsError: - # Lock file exists, check if process is still alive - if self._is_stale_lock(): - # Remove stale lock and retry - try: - self.lock_file.unlink() - except FileNotFoundError: - pass - continue - # Active lock, wait a bit - time.sleep(0.1) - except Exception: - # Other errors, wait and retry - time.sleep(0.1) - return False - - def _is_stale_lock(self): - """Check if lock file is stale (process no longer exists)""" - try: - if not self.lock_file.exists(): - return False - with open(self.lock_file, 'r') as f: - pid = int(f.read().strip()) - - # Check if process exists (cross-platform) - if sys.platform == 'win32': - import ctypes - kernel32 = ctypes.windll.kernel32 - PROCESS_QUERY_INFORMATION = 0x0400 - handle = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid) - if handle: - kernel32.CloseHandle(handle) - return False - return True - else: - # Unix/Linux - send signal 0 to check if process exists - try: - os.kill(pid, 0) - return False # Process exists - except OSError: - return True # Process doesn't exist - except (ValueError, FileNotFoundError, PermissionError): - return True - - def release(self): - """Release the lock""" - if self.acquired: - try: - self.lock_file.unlink() - except FileNotFoundError: - pass - self.acquired = False - - def __enter__(self): - if not self.acquire(): - raise RuntimeError("Could not acquire lock: another instance is running") - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.release() - -class Storage: - def __init__(self, app_dir: Path = DEFAULT_APP_DIR, acquire_lock: bool = True): - self.app_dir = app_dir - self.data_file = self.app_dir / "data.json" - self.lock_file = self.app_dir / "app.lock" - self.lock_manager = None - self._ensure_app_dir() - - # Acquire lock to prevent concurrent access - if acquire_lock: - self.lock_manager = LockManager(self.lock_file) - if not self.lock_manager.acquire(timeout=5): - raise RuntimeError("Another instance of trace is already running. Please close it first.") - - self.cases: List[Case] = self._load_data() - - # Create demo case on first launch (only if data loaded successfully and is empty) - if not self.cases and self.data_file.exists(): - # File exists but is empty - could be first run after successful load - pass - elif not self.cases and not self.data_file.exists(): - # No file exists - first run - self._create_demo_case() - - def __del__(self): - """Release lock when Storage object is destroyed""" - if self.lock_manager: - self.lock_manager.release() - - def _ensure_app_dir(self): - if not self.app_dir.exists(): - self.app_dir.mkdir(parents=True, exist_ok=True) - - def _create_demo_case(self): - """Create a demo case with evidence showcasing all features""" - demo_case = Case( - case_number="DEMO-2024-001", - name="Sample Investigation", - investigator="Demo User" - ) - - # Add case-level notes to demonstrate case notes feature - case_note1 = Note(content="""Initial case briefing: Suspected data exfiltration incident. 
- -Key objectives: -- Identify compromised systems -- Determine scope of data loss -- Document timeline of events - -#incident-response #data-breach #investigation""") - case_note1.calculate_hash() - case_note1.extract_tags() - case_note1.extract_iocs() - demo_case.notes.append(case_note1) - - case_note2 = Note(content="""Investigation lead: Employee reported suspicious email from sender@phishing-domain.com -Initial analysis shows potential credential harvesting attempt. -Review email headers and attachments for IOCs. #phishing #email-analysis""") - case_note2.calculate_hash() - case_note2.extract_tags() - case_note2.extract_iocs() - demo_case.notes.append(case_note2) - - # Create evidence 1: Compromised laptop - evidence1 = Evidence( - name="Employee Laptop HDD", - description="Primary workstation hard drive - user reported suspicious activity" - ) - # Add source hash for chain of custody demonstration - evidence1.metadata["source_hash"] = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" - - # Add notes to evidence 1 with various features - note1 = Note(content="""Forensic imaging completed. Drive imaged using FTK Imager. -Image hash verified: SHA256 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 - -Chain of custody maintained throughout process. #forensics #imaging #chain-of-custody""") - note1.calculate_hash() - note1.extract_tags() - note1.extract_iocs() - evidence1.notes.append(note1) - - note2 = Note(content="""Discovered suspicious connections to external IP addresses: -- 192.168.1.100 (local gateway) -- 203.0.113.45 (external, geolocation: Unknown) -- 198.51.100.78 (command and control server suspected) - -Browser history shows visits to malicious-site.com and data-exfil.net. -#network-analysis #ioc #c2-server""") - note2.calculate_hash() - note2.extract_tags() - note2.extract_iocs() - evidence1.notes.append(note2) - - note3 = Note(content="""Malware identified in temp directory: -File: evil.exe -MD5: d41d8cd98f00b204e9800998ecf8427e -SHA1: da39a3ee5e6b4b0d3255bfef95601890afd80709 -SHA256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 - -Submitting to VirusTotal for analysis. #malware #hash-analysis #virustotal""") - note3.calculate_hash() - note3.extract_tags() - note3.extract_iocs() - evidence1.notes.append(note3) - - note4 = Note(content="""Timeline analysis reveals: -- 2024-01-15 09:23:45 - Suspicious email received -- 2024-01-15 09:24:12 - User clicked phishing link https://evil-domain.com/login -- 2024-01-15 09:25:03 - Credentials submitted to attacker-controlled site -- 2024-01-15 09:30:15 - Lateral movement detected - -User credentials compromised. Recommend immediate password reset. #timeline #lateral-movement""") - note4.calculate_hash() - note4.extract_tags() - note4.extract_iocs() - evidence1.notes.append(note4) - - demo_case.evidence.append(evidence1) - - # Create evidence 2: Network logs - evidence2 = Evidence( - name="Firewall Logs", - description="Corporate firewall logs from incident timeframe" - ) - evidence2.metadata["source_hash"] = "a3f5c8b912e4d67f89b0c1a2e3d4f5a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2" - - note5 = Note(content="""Log analysis shows outbound connections to suspicious domains: -- attacker-c2.com on port 443 (encrypted channel) -- data-upload.net on port 8080 (unencrypted) -- exfil-server.org on port 22 (SSH tunnel) - -Total data transferred: approximately 2.3 GB over 4 hours. 
-#log-analysis #data-exfiltration #network-traffic""") - note5.calculate_hash() - note5.extract_tags() - note5.extract_iocs() - evidence2.notes.append(note5) - - note6 = Note(content="""Contact information found in malware configuration: -Email: attacker@malicious-domain.com -Backup C2: 2001:0db8:85a3:0000:0000:8a2e:0370:7334 (IPv6) - -Cross-referencing with threat intelligence databases. #threat-intel #attribution""") - note6.calculate_hash() - note6.extract_tags() - note6.extract_iocs() - evidence2.notes.append(note6) - - demo_case.evidence.append(evidence2) - - # Create evidence 3: Email forensics - evidence3 = Evidence( - name="Phishing Email", - description="Original phishing email preserved in .eml format" - ) - - note7 = Note(content="""Email headers analysis: -From: sender@phishing-domain.com (spoofed) -Reply-To: attacker@evil-mail-server.net -X-Originating-IP: 198.51.100.99 - -Email contains embedded tracking pixel at http://tracking.malicious-site.com/pixel.gif -Attachment: invoice.pdf.exe (double extension trick) #email-forensics #phishing-analysis""") - note7.calculate_hash() - note7.extract_tags() - note7.extract_iocs() - evidence3.notes.append(note7) - - demo_case.evidence.append(evidence3) - - # Add the demo case to storage - self.cases.append(demo_case) - self.save_data() - - def _load_data(self) -> List[Case]: - if not self.data_file.exists(): - return [] - try: - with open(self.data_file, 'r', encoding='utf-8') as f: - data = json.load(f) - return [Case.from_dict(c) for c in data] - except (json.JSONDecodeError, IOError, KeyError, ValueError) as e: - # Corrupted JSON - create backup and raise exception - import shutil - from datetime import datetime - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - backup_file = self.app_dir / f"data.json.corrupted.{timestamp}" - try: - shutil.copy2(self.data_file, backup_file) - except Exception: - pass - # Raise exception with information about backup - raise RuntimeError(f"Data file is corrupted. 
Backup saved to: {backup_file}\nError: {e}") - - def start_fresh(self): - """Start with fresh data (for corrupted JSON recovery)""" - self.cases = [] - self._create_demo_case() - - def save_data(self): - data = [c.to_dict() for c in self.cases] - # Write to temp file then rename for atomic-ish write - temp_file = self.data_file.with_suffix(".tmp") - with open(temp_file, 'w', encoding='utf-8') as f: - json.dump(data, f, indent=2, ensure_ascii=False) - temp_file.replace(self.data_file) - - def add_case(self, case: Case): - self.cases.append(case) - self.save_data() - - def get_case(self, case_id: str) -> Optional[Case]: - # Case ID lookup - for c in self.cases: - if c.case_id == case_id: - return c - return None - - def delete_case(self, case_id: str): - self.cases = [c for c in self.cases if c.case_id != case_id] - self.save_data() - - def delete_evidence(self, case_id: str, evidence_id: str): - case = self.get_case(case_id) - if case: - case.evidence = [e for e in case.evidence if e.evidence_id != evidence_id] - self.save_data() - - def find_evidence(self, evidence_id: str) -> Tuple[Optional[Case], Optional[Evidence]]: - for c in self.cases: - for e in c.evidence: - if e.evidence_id == evidence_id: - return c, e - return None, None - -class StateManager: - def __init__(self, app_dir: Path = DEFAULT_APP_DIR): - self.app_dir = app_dir - self.state_file = self.app_dir / "state" - self.settings_file = self.app_dir / "settings.json" - self._ensure_app_dir() - - def _ensure_app_dir(self): - if not self.app_dir.exists(): - self.app_dir.mkdir(parents=True, exist_ok=True) - - def set_active(self, case_id: Optional[str] = None, evidence_id: Optional[str] = None): - state = self.get_active() - state["case_id"] = case_id - state["evidence_id"] = evidence_id - # Atomic write: write to temp file then rename - temp_file = self.state_file.with_suffix(".tmp") - with open(temp_file, 'w', encoding='utf-8') as f: - json.dump(state, f, ensure_ascii=False) - temp_file.replace(self.state_file) - - def get_active(self) -> dict: - if not self.state_file.exists(): - return {"case_id": None, "evidence_id": None} - try: - with open(self.state_file, 'r', encoding='utf-8') as f: - return json.load(f) - except (json.JSONDecodeError, IOError): - return {"case_id": None, "evidence_id": None} - - def validate_and_clear_stale(self, storage: 'Storage') -> str: - """Validate active state against storage and clear stale references. - Returns warning message if state was cleared, empty string otherwise.""" - state = self.get_active() - case_id = state.get("case_id") - evidence_id = state.get("evidence_id") - warning = "" - - if case_id: - case = storage.get_case(case_id) - if not case: - warning = f"Active case (ID: {case_id[:8]}...) no longer exists. Clearing active context." - self.set_active(None, None) - return warning - - # Validate evidence if set - if evidence_id: - _, evidence = storage.find_evidence(evidence_id) - if not evidence: - warning = f"Active evidence (ID: {evidence_id[:8]}...) no longer exists. Clearing to case level." - self.set_active(case_id, None) - return warning - - elif evidence_id: - # Evidence set but no case - invalid state - warning = "Invalid state: evidence set without case. Clearing active context." 
- self.set_active(None, None) - return warning - - return warning - - def get_settings(self) -> dict: - if not self.settings_file.exists(): - return {"pgp_enabled": True} - try: - with open(self.settings_file, 'r', encoding='utf-8') as f: - return json.load(f) - except (json.JSONDecodeError, IOError): - return {"pgp_enabled": True} - - def set_setting(self, key: str, value): - settings = self.get_settings() - settings[key] = value - # Atomic write: write to temp file then rename - temp_file = self.settings_file.with_suffix(".tmp") - with open(temp_file, 'w', encoding='utf-8') as f: - json.dump(settings, f, ensure_ascii=False) - temp_file.replace(self.settings_file) +__all__ = ['Storage', 'StateManager', 'LockManager', 'create_demo_case'] diff --git a/trace/storage_impl/__init__.py b/trace/storage_impl/__init__.py new file mode 100644 index 0000000..b44da70 --- /dev/null +++ b/trace/storage_impl/__init__.py @@ -0,0 +1,8 @@ +"""Storage implementation modules""" + +from .lock_manager import LockManager +from .state_manager import StateManager +from .storage import Storage +from .demo_data import create_demo_case + +__all__ = ['LockManager', 'StateManager', 'Storage', 'create_demo_case'] diff --git a/trace/storage_impl/demo_data.py b/trace/storage_impl/demo_data.py new file mode 100644 index 0000000..378ff9d --- /dev/null +++ b/trace/storage_impl/demo_data.py @@ -0,0 +1,143 @@ +"""Demo case creation for first-time users""" + +from ..models import Case, Evidence, Note + + +def create_demo_case() -> Case: + """Create a demo case with evidence showcasing all features""" + demo_case = Case( + case_number="DEMO-2024-001", + name="Sample Investigation", + investigator="Demo User" + ) + + # Add case-level notes to demonstrate case notes feature + case_note1 = Note(content="""Initial case briefing: Suspected data exfiltration incident. + +Key objectives: +- Identify compromised systems +- Determine scope of data loss +- Document timeline of events + +#incident-response #data-breach #investigation""") + case_note1.calculate_hash() + case_note1.extract_tags() + case_note1.extract_iocs() + demo_case.notes.append(case_note1) + + case_note2 = Note(content="""Investigation lead: Employee reported suspicious email from sender@phishing-domain.com +Initial analysis shows potential credential harvesting attempt. +Review email headers and attachments for IOCs. #phishing #email-analysis""") + case_note2.calculate_hash() + case_note2.extract_tags() + case_note2.extract_iocs() + demo_case.notes.append(case_note2) + + # Create evidence 1: Compromised laptop + evidence1 = Evidence( + name="Employee Laptop HDD", + description="Primary workstation hard drive - user reported suspicious activity" + ) + # Add source hash for chain of custody demonstration + evidence1.metadata["source_hash"] = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + + # Add notes to evidence 1 with various features + note1 = Note(content="""Forensic imaging completed. Drive imaged using FTK Imager. +Image hash verified: SHA256 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + +Chain of custody maintained throughout process. 
#forensics #imaging #chain-of-custody""") + note1.calculate_hash() + note1.extract_tags() + note1.extract_iocs() + evidence1.notes.append(note1) + + note2 = Note(content="""Discovered suspicious connections to external IP addresses: +- 192.168.1.100 (local gateway) +- 203.0.113.45 (external, geolocation: Unknown) +- 198.51.100.78 (command and control server suspected) + +Browser history shows visits to malicious-site.com and data-exfil.net. +#network-analysis #ioc #c2-server""") + note2.calculate_hash() + note2.extract_tags() + note2.extract_iocs() + evidence1.notes.append(note2) + + note3 = Note(content="""Malware identified in temp directory: +File: evil.exe +MD5: d41d8cd98f00b204e9800998ecf8427e +SHA1: da39a3ee5e6b4b0d3255bfef95601890afd80709 +SHA256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 + +Submitting to VirusTotal for analysis. #malware #hash-analysis #virustotal""") + note3.calculate_hash() + note3.extract_tags() + note3.extract_iocs() + evidence1.notes.append(note3) + + note4 = Note(content="""Timeline analysis reveals: +- 2024-01-15 09:23:45 - Suspicious email received +- 2024-01-15 09:24:12 - User clicked phishing link https://evil-domain.com/login +- 2024-01-15 09:25:03 - Credentials submitted to attacker-controlled site +- 2024-01-15 09:30:15 - Lateral movement detected + +User credentials compromised. Recommend immediate password reset. #timeline #lateral-movement""") + note4.calculate_hash() + note4.extract_tags() + note4.extract_iocs() + evidence1.notes.append(note4) + + demo_case.evidence.append(evidence1) + + # Create evidence 2: Network logs + evidence2 = Evidence( + name="Firewall Logs", + description="Corporate firewall logs from incident timeframe" + ) + evidence2.metadata["source_hash"] = "a3f5c8b912e4d67f89b0c1a2e3d4f5a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2" + + note5 = Note(content="""Log analysis shows outbound connections to suspicious domains: +- attacker-c2.com on port 443 (encrypted channel) +- data-upload.net on port 8080 (unencrypted) +- exfil-server.org on port 22 (SSH tunnel) + +Total data transferred: approximately 2.3 GB over 4 hours. +#log-analysis #data-exfiltration #network-traffic""") + note5.calculate_hash() + note5.extract_tags() + note5.extract_iocs() + evidence2.notes.append(note5) + + note6 = Note(content="""Contact information found in malware configuration: +Email: attacker@malicious-domain.com +Backup C2: 2001:0db8:85a3:0000:0000:8a2e:0370:7334 (IPv6) + +Cross-referencing with threat intelligence databases. 
#threat-intel #attribution""") + note6.calculate_hash() + note6.extract_tags() + note6.extract_iocs() + evidence2.notes.append(note6) + + demo_case.evidence.append(evidence2) + + # Create evidence 3: Email forensics + evidence3 = Evidence( + name="Phishing Email", + description="Original phishing email preserved in .eml format" + ) + + note7 = Note(content="""Email headers analysis: +From: sender@phishing-domain.com (spoofed) +Reply-To: attacker@evil-mail-server.net +X-Originating-IP: 198.51.100.99 + +Email contains embedded tracking pixel at http://tracking.malicious-site.com/pixel.gif +Attachment: invoice.pdf.exe (double extension trick) #email-forensics #phishing-analysis""") + note7.calculate_hash() + note7.extract_tags() + note7.extract_iocs() + evidence3.notes.append(note7) + + demo_case.evidence.append(evidence3) + + return demo_case diff --git a/trace/storage_impl/lock_manager.py b/trace/storage_impl/lock_manager.py new file mode 100644 index 0000000..64caaf9 --- /dev/null +++ b/trace/storage_impl/lock_manager.py @@ -0,0 +1,87 @@ +"""File lock manager for preventing concurrent access""" + +import os +import sys +import time +from pathlib import Path + + +class LockManager: + """Cross-platform file lock manager to prevent concurrent access""" + + def __init__(self, lock_file: Path): + self.lock_file = lock_file + self.acquired = False + + def acquire(self, timeout: int = 5): + """Acquire lock with timeout. Returns True if successful.""" + start_time = time.time() + while time.time() - start_time < timeout: + try: + # Try to create lock file exclusively (fails if exists) + # Use 'x' mode which fails if file exists (atomic on most systems) + fd = os.open(str(self.lock_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY) + os.write(fd, str(os.getpid()).encode()) + os.close(fd) + self.acquired = True + return True + except FileExistsError: + # Lock file exists, check if process is still alive + if self._is_stale_lock(): + # Remove stale lock and retry + try: + self.lock_file.unlink() + except FileNotFoundError: + pass + continue + # Active lock, wait a bit + time.sleep(0.1) + except Exception: + # Other errors, wait and retry + time.sleep(0.1) + return False + + def _is_stale_lock(self): + """Check if lock file is stale (process no longer exists)""" + try: + if not self.lock_file.exists(): + return False + with open(self.lock_file, 'r') as f: + pid = int(f.read().strip()) + + # Check if process exists (cross-platform) + if sys.platform == 'win32': + import ctypes + kernel32 = ctypes.windll.kernel32 + PROCESS_QUERY_INFORMATION = 0x0400 + handle = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid) + if handle: + kernel32.CloseHandle(handle) + return False + return True + else: + # Unix/Linux - send signal 0 to check if process exists + try: + os.kill(pid, 0) + return False # Process exists + except OSError: + return True # Process doesn't exist + except (ValueError, FileNotFoundError, PermissionError): + return True + + def release(self): + """Release the lock""" + if self.acquired: + try: + self.lock_file.unlink() + except FileNotFoundError: + pass + self.acquired = False + + def __enter__(self): + if not self.acquire(): + raise RuntimeError("Could not acquire lock: another instance is running") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() diff --git a/trace/storage_impl/state_manager.py b/trace/storage_impl/state_manager.py new file mode 100644 index 0000000..a7e3c99 --- /dev/null +++ b/trace/storage_impl/state_manager.py @@ -0,0 +1,92 @@ +"""State 
manager for active context and settings""" + +import json +from pathlib import Path +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from .storage import Storage + +DEFAULT_APP_DIR = Path.home() / ".trace" + + +class StateManager: + """Manages active context and user settings""" + + def __init__(self, app_dir: Path = DEFAULT_APP_DIR): + self.app_dir = app_dir + self.state_file = self.app_dir / "state" + self.settings_file = self.app_dir / "settings.json" + self._ensure_app_dir() + + def _ensure_app_dir(self): + if not self.app_dir.exists(): + self.app_dir.mkdir(parents=True, exist_ok=True) + + def set_active(self, case_id: Optional[str] = None, evidence_id: Optional[str] = None): + state = self.get_active() + state["case_id"] = case_id + state["evidence_id"] = evidence_id + # Atomic write: write to temp file then rename + temp_file = self.state_file.with_suffix(".tmp") + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(state, f, ensure_ascii=False) + temp_file.replace(self.state_file) + + def get_active(self) -> dict: + if not self.state_file.exists(): + return {"case_id": None, "evidence_id": None} + try: + with open(self.state_file, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {"case_id": None, "evidence_id": None} + + def validate_and_clear_stale(self, storage: 'Storage') -> str: + """Validate active state against storage and clear stale references. + Returns warning message if state was cleared, empty string otherwise.""" + state = self.get_active() + case_id = state.get("case_id") + evidence_id = state.get("evidence_id") + warning = "" + + if case_id: + case = storage.get_case(case_id) + if not case: + warning = f"Active case (ID: {case_id[:8]}...) no longer exists. Clearing active context." + self.set_active(None, None) + return warning + + # Validate evidence if set + if evidence_id: + _, evidence = storage.find_evidence(evidence_id) + if not evidence: + warning = f"Active evidence (ID: {evidence_id[:8]}...) no longer exists. Clearing to case level." + self.set_active(case_id, None) + return warning + + elif evidence_id: + # Evidence set but no case - invalid state + warning = "Invalid state: evidence set without case. Clearing active context." 
+ self.set_active(None, None) + return warning + + return warning + + def get_settings(self) -> dict: + if not self.settings_file.exists(): + return {"pgp_enabled": True} + try: + with open(self.settings_file, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {"pgp_enabled": True} + + def set_setting(self, key: str, value): + settings = self.get_settings() + settings[key] = value + # Atomic write: write to temp file then rename + temp_file = self.settings_file.with_suffix(".tmp") + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(settings, f, ensure_ascii=False) + temp_file.replace(self.settings_file) diff --git a/trace/storage_impl/storage.py b/trace/storage_impl/storage.py new file mode 100644 index 0000000..2d839a7 --- /dev/null +++ b/trace/storage_impl/storage.py @@ -0,0 +1,112 @@ +"""Main storage class for persisting cases, evidence, and notes""" + +import json +from pathlib import Path +from typing import List, Optional, Tuple + +from ..models import Case, Evidence +from .lock_manager import LockManager +from .demo_data import create_demo_case + +DEFAULT_APP_DIR = Path.home() / ".trace" + + +class Storage: + """Manages persistence of all forensic data""" + + def __init__(self, app_dir: Path = DEFAULT_APP_DIR, acquire_lock: bool = True): + self.app_dir = app_dir + self.data_file = self.app_dir / "data.json" + self.lock_file = self.app_dir / "app.lock" + self.lock_manager = None + self._ensure_app_dir() + + # Acquire lock to prevent concurrent access + if acquire_lock: + self.lock_manager = LockManager(self.lock_file) + if not self.lock_manager.acquire(timeout=5): + raise RuntimeError("Another instance of trace is already running. Please close it first.") + + self.cases: List[Case] = self._load_data() + + # Create demo case on first launch (only if data loaded successfully and is empty) + if not self.cases and self.data_file.exists(): + # File exists but is empty - could be first run after successful load + pass + elif not self.cases and not self.data_file.exists(): + # No file exists - first run + demo_case = create_demo_case() + self.cases.append(demo_case) + self.save_data() + + def __del__(self): + """Release lock when Storage object is destroyed""" + if self.lock_manager: + self.lock_manager.release() + + def _ensure_app_dir(self): + if not self.app_dir.exists(): + self.app_dir.mkdir(parents=True, exist_ok=True) + + def _load_data(self) -> List[Case]: + if not self.data_file.exists(): + return [] + try: + with open(self.data_file, 'r', encoding='utf-8') as f: + data = json.load(f) + return [Case.from_dict(c) for c in data] + except (json.JSONDecodeError, IOError, KeyError, ValueError) as e: + # Corrupted JSON - create backup and raise exception + import shutil + from datetime import datetime + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + backup_file = self.app_dir / f"data.json.corrupted.{timestamp}" + try: + shutil.copy2(self.data_file, backup_file) + except Exception: + pass + # Raise exception with information about backup + raise RuntimeError(f"Data file is corrupted. 
Backup saved to: {backup_file}\nError: {e}") + + def start_fresh(self): + """Start with fresh data (for corrupted JSON recovery)""" + self.cases = [] + demo_case = create_demo_case() + self.cases.append(demo_case) + self.save_data() + + def save_data(self): + data = [c.to_dict() for c in self.cases] + # Write to temp file then rename for atomic-ish write + temp_file = self.data_file.with_suffix(".tmp") + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + temp_file.replace(self.data_file) + + def add_case(self, case: Case): + self.cases.append(case) + self.save_data() + + def get_case(self, case_id: str) -> Optional[Case]: + # Case ID lookup + for c in self.cases: + if c.case_id == case_id: + return c + return None + + def delete_case(self, case_id: str): + self.cases = [c for c in self.cases if c.case_id != case_id] + self.save_data() + + def delete_evidence(self, case_id: str, evidence_id: str): + case = self.get_case(case_id) + if case: + case.evidence = [e for e in case.evidence if e.evidence_id != evidence_id] + self.save_data() + + def find_evidence(self, evidence_id: str) -> Tuple[Optional[Case], Optional[Evidence]]: + for c in self.cases: + for e in c.evidence: + if e.evidence_id == evidence_id: + return c, e + return None, None diff --git a/trace/tui/__init__.py b/trace/tui/__init__.py new file mode 100644 index 0000000..d1ead92 --- /dev/null +++ b/trace/tui/__init__.py @@ -0,0 +1,7 @@ +"""TUI (Text User Interface) package for trace application""" + +# Import from the main tui_app module for backward compatibility +# The tui_app.py file contains the main TUI class and run_tui function +from ..tui_app import run_tui, TUI + +__all__ = ['run_tui', 'TUI'] diff --git a/trace/tui/handlers/__init__.py b/trace/tui/handlers/__init__.py new file mode 100644 index 0000000..dc1d510 --- /dev/null +++ b/trace/tui/handlers/__init__.py @@ -0,0 +1,5 @@ +"""TUI handlers for various operations""" + +from .export_handler import ExportHandler + +__all__ = ['ExportHandler'] diff --git a/trace/tui/handlers/export_handler.py b/trace/tui/handlers/export_handler.py new file mode 100644 index 0000000..c83b441 --- /dev/null +++ b/trace/tui/handlers/export_handler.py @@ -0,0 +1,238 @@ +"""Export functionality for TUI""" + +import time +import datetime +from pathlib import Path +from typing import List, Tuple, Optional + +from ...models import Note, Case, Evidence + + +class ExportHandler: + """Handles exporting IOCs and notes to files""" + + @staticmethod + def export_iocs_to_file( + iocs_with_counts: List[Tuple[str, int, str]], + active_case: Optional[Case], + active_evidence: Optional[Evidence], + get_iocs_func=None + ) -> Tuple[bool, str]: + """ + Export IOCs to a text file + + Args: + iocs_with_counts: List of (ioc, count, type) tuples + active_case: Active case context + active_evidence: Active evidence context + get_iocs_func: Function to get IOCs for a list of notes + + Returns: + Tuple of (success: bool, message: str) + """ + if not iocs_with_counts: + return False, "No IOCs to export." 
+ + # Determine context for filename + if active_evidence: + context_name = f"{active_case.case_number}_{active_evidence.name}" if active_case else active_evidence.name + elif active_case: + context_name = active_case.case_number + else: + context_name = "unknown" + + # Clean filename + context_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in context_name) + + # Create exports directory if it doesn't exist + export_dir = Path.home() / ".trace" / "exports" + export_dir.mkdir(parents=True, exist_ok=True) + + # Generate filename with timestamp + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"iocs_{context_name}_{timestamp}.txt" + filepath = export_dir / filename + + # Build export content + lines = [] + lines.append(f"# IOC Export - {context_name}") + lines.append(f"# Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + lines.append("") + + if active_evidence: + # Evidence context - only evidence IOCs + lines.append(f"## Evidence: {active_evidence.name}") + lines.append("") + for ioc, count, ioc_type in iocs_with_counts: + lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)") + elif active_case and get_iocs_func: + # Case context - show case IOCs + evidence IOCs with separators + # Get case notes IOCs + case_iocs = get_iocs_func(active_case.notes) + if case_iocs: + lines.append("## Case Notes") + lines.append("") + for ioc, count, ioc_type in case_iocs: + lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)") + lines.append("") + + # Get IOCs from each evidence + for ev in active_case.evidence: + ev_iocs = get_iocs_func(ev.notes) + if ev_iocs: + lines.append(f"## Evidence: {ev.name}") + lines.append("") + for ioc, count, ioc_type in ev_iocs: + lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)") + lines.append("") + + # Write to file + try: + with open(filepath, 'w', encoding='utf-8') as f: + f.write('\n'.join(lines)) + return True, f"IOCs exported to: {filepath}" + except Exception as e: + return False, f"Export failed: {str(e)}" + + @staticmethod + def export_case_to_markdown(case: Case) -> Tuple[bool, str]: + """ + Export case (and all its evidence) to markdown + + Args: + case: The case to export + + Returns: + Tuple of (success: bool, message: str) + """ + # Create exports directory if it doesn't exist + export_dir = Path.home() / ".trace" / "exports" + export_dir.mkdir(parents=True, exist_ok=True) + + # Generate filename + case_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in case.case_number) + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"case_{case_name}_{timestamp}.md" + filepath = export_dir / filename + + try: + with open(filepath, 'w', encoding='utf-8') as f: + f.write("# Forensic Notes Export\n\n") + f.write(f"Generated on: {time.ctime()}\n\n") + + # Write case info + f.write(f"## Case: {case.case_number}\n") + if case.name: + f.write(f"**Name:** {case.name}\n") + if case.investigator: + f.write(f"**Investigator:** {case.investigator}\n") + f.write(f"**Case ID:** {case.case_id}\n\n") + + # Case notes + f.write("### Case Notes\n") + if not case.notes: + f.write("_No notes._\n") + for note in case.notes: + ExportHandler._write_note_markdown(f, note) + + # Evidence + f.write("\n### Evidence\n") + if not case.evidence: + f.write("_No evidence._\n") + + for ev in case.evidence: + f.write(f"#### Evidence: {ev.name}\n") + if ev.description: + f.write(f"_{ev.description}_\n") + f.write(f"**ID:** {ev.evidence_id}\n") + + # Include source hash if 
available + source_hash = ev.metadata.get("source_hash") + if source_hash: + f.write(f"**Source Hash:** `{source_hash}`\n") + f.write("\n") + + f.write("##### Evidence Notes\n") + if not ev.notes: + f.write("_No notes._\n") + for note in ev.notes: + ExportHandler._write_note_markdown(f, note) + f.write("\n") + + return True, f"Case exported to: {filepath}" + except Exception as e: + return False, f"Export failed: {str(e)}" + + @staticmethod + def export_evidence_to_markdown( + evidence: Evidence, + case: Optional[Case] + ) -> Tuple[bool, str]: + """ + Export evidence to markdown + + Args: + evidence: The evidence to export + case: The parent case (for context) + + Returns: + Tuple of (success: bool, message: str) + """ + # Create exports directory if it doesn't exist + export_dir = Path.home() / ".trace" / "exports" + export_dir.mkdir(parents=True, exist_ok=True) + + # Generate filename + case_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in case.case_number) if case else "unknown" + ev_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in evidence.name) + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"evidence_{case_name}_{ev_name}_{timestamp}.md" + filepath = export_dir / filename + + try: + with open(filepath, 'w', encoding='utf-8') as f: + f.write("# Forensic Evidence Export\n\n") + f.write(f"Generated on: {time.ctime()}\n\n") + + # Case context + if case: + f.write(f"**Case:** {case.case_number}\n") + if case.name: + f.write(f"**Case Name:** {case.name}\n") + f.write("\n") + + # Evidence info + f.write(f"## Evidence: {evidence.name}\n") + if evidence.description: + f.write(f"**Description:** {evidence.description}\n") + if evidence.metadata.get("source_hash"): + f.write(f"**Source Hash:** `{evidence.metadata['source_hash']}`\n") + f.write(f"**Evidence ID:** {evidence.evidence_id}\n\n") + + # Notes + f.write("### Notes\n") + if not evidence.notes: + f.write("_No notes._\n") + for note in evidence.notes: + ExportHandler._write_note_markdown(f, note) + + return True, f"Evidence exported to: {filepath}" + except Exception as e: + return False, f"Export failed: {str(e)}" + + @staticmethod + def _write_note_markdown(f, note: Note): + """Helper to write a note in markdown format""" + f.write(f"- **{time.ctime(note.timestamp)}**\n") + f.write(f" - Content: {note.content}\n") + if note.tags: + tags_str = " ".join([f"#{tag}" for tag in note.tags]) + f.write(f" - Tags: {tags_str}\n") + f.write(f" - Hash: `{note.content_hash}`\n") + if note.signature: + f.write(" - **Signature Verified:**\n") + f.write(" ```\n") + for line in note.signature.splitlines(): + f.write(f" {line}\n") + f.write(" ```\n") + f.write("\n") diff --git a/trace/tui/rendering/__init__.py b/trace/tui/rendering/__init__.py new file mode 100644 index 0000000..fc14067 --- /dev/null +++ b/trace/tui/rendering/__init__.py @@ -0,0 +1,6 @@ +"""Rendering utilities for TUI""" + +from .colors import init_colors, ColorPairs +from .text_renderer import TextRenderer + +__all__ = ['init_colors', 'ColorPairs', 'TextRenderer'] diff --git a/trace/tui/rendering/colors.py b/trace/tui/rendering/colors.py new file mode 100644 index 0000000..3d67ce7 --- /dev/null +++ b/trace/tui/rendering/colors.py @@ -0,0 +1,43 @@ +"""Color pair initialization and constants for TUI""" + +import curses + + +class ColorPairs: + """Color pair constants""" + SELECTION = 1 # Black on cyan + SUCCESS = 2 # Green on black + WARNING = 3 # Yellow on black + ERROR = 4 # Red on black + HEADER = 5 # Cyan on black + 
METADATA = 6 # White on black + BORDER = 7 # Blue on black + TAG = 8 # Magenta on black + IOC_SELECTED = 9 # Red on cyan + TAG_SELECTED = 10 # Yellow on cyan + + +def init_colors(): + """Initialize color pairs for the TUI""" + curses.start_color() + if curses.has_colors(): + # Selection / Highlight + curses.init_pair(ColorPairs.SELECTION, curses.COLOR_BLACK, curses.COLOR_CYAN) + # Success / Active indicators + curses.init_pair(ColorPairs.SUCCESS, curses.COLOR_GREEN, curses.COLOR_BLACK) + # Info / Warnings + curses.init_pair(ColorPairs.WARNING, curses.COLOR_YELLOW, curses.COLOR_BLACK) + # Errors / Critical / IOCs + curses.init_pair(ColorPairs.ERROR, curses.COLOR_RED, curses.COLOR_BLACK) + # Headers / Titles (bright cyan) + curses.init_pair(ColorPairs.HEADER, curses.COLOR_CYAN, curses.COLOR_BLACK) + # Metadata / Secondary text (dim) + curses.init_pair(ColorPairs.METADATA, curses.COLOR_WHITE, curses.COLOR_BLACK) + # Borders / Separators (blue) + curses.init_pair(ColorPairs.BORDER, curses.COLOR_BLUE, curses.COLOR_BLACK) + # Tags (magenta) + curses.init_pair(ColorPairs.TAG, curses.COLOR_MAGENTA, curses.COLOR_BLACK) + # IOCs on selected background (red on cyan) + curses.init_pair(ColorPairs.IOC_SELECTED, curses.COLOR_RED, curses.COLOR_CYAN) + # Tags on selected background (yellow on cyan) + curses.init_pair(ColorPairs.TAG_SELECTED, curses.COLOR_YELLOW, curses.COLOR_CYAN) diff --git a/trace/tui/rendering/text_renderer.py b/trace/tui/rendering/text_renderer.py new file mode 100644 index 0000000..2fe3103 --- /dev/null +++ b/trace/tui/rendering/text_renderer.py @@ -0,0 +1,137 @@ +"""Text rendering utilities with highlighting support""" + +import curses +import re +from ...models import Note +from .colors import ColorPairs + + +class TextRenderer: + """Utility class for rendering text with highlights""" + + @staticmethod + def safe_truncate(text, max_width, ellipsis="..."): + """ + Safely truncate text to fit within max_width, handling Unicode characters. + Uses a conservative approach to avoid curses display errors. + """ + if not text: + return text + + # Try to fit the text as-is + if len(text) <= max_width: + return text + + # Need to truncate - account for ellipsis + if max_width <= len(ellipsis): + return ellipsis[:max_width] + + # Truncate conservatively (character by character) to handle multi-byte UTF-8 + target_len = max_width - len(ellipsis) + truncated = text[:target_len] + + # Encode and check actual byte length to be safe with UTF-8 + # If it's too long, trim further + while len(truncated) > 0: + try: + # Test if this will fit when displayed + test_str = truncated + ellipsis + if len(test_str) <= max_width: + return test_str + except: + pass + # Trim one more character + truncated = truncated[:-1] + + return ellipsis[:max_width] + + @staticmethod + def display_line_with_highlights(screen, y, x_start, line, is_selected=False): + """ + Display a line with intelligent highlighting. 
+ - IOCs are highlighted with ColorPairs.ERROR (red) + - Tags are highlighted with ColorPairs.WARNING (yellow) + - Selection background is ColorPairs.SELECTION (cyan) for non-IOC text + - IOC highlighting takes priority over selection + """ + # Extract IOCs and tags + highlights = [] + + # Get IOCs with positions + for text, start, end, ioc_type in Note.extract_iocs_with_positions(line): + highlights.append((text, start, end, 'ioc')) + + # Get tags + for match in re.finditer(r'#\w+', line): + highlights.append((match.group(), match.start(), match.end(), 'tag')) + + # Sort by position and remove overlaps (IOCs take priority over tags) + highlights.sort(key=lambda x: x[1]) + deduplicated = [] + last_end = -1 + for text, start, end, htype in highlights: + if start >= last_end: + deduplicated.append((text, start, end, htype)) + last_end = end + highlights = deduplicated + + if not highlights: + # No highlights - use selection color if selected + if is_selected: + screen.attron(curses.color_pair(ColorPairs.SELECTION)) + screen.addstr(y, x_start, line) + screen.attroff(curses.color_pair(ColorPairs.SELECTION)) + else: + screen.addstr(y, x_start, line) + return + + # Display with intelligent highlighting + x_pos = x_start + last_pos = 0 + + for text, start, end, htype in highlights: + # Add text before this highlight + if start > last_pos: + text_before = line[last_pos:start] + if is_selected: + screen.attron(curses.color_pair(ColorPairs.SELECTION)) + screen.addstr(y, x_pos, text_before) + screen.attroff(curses.color_pair(ColorPairs.SELECTION)) + else: + screen.addstr(y, x_pos, text_before) + x_pos += len(text_before) + + # Add highlighted text + if htype == 'ioc': + # IOC highlighting: red on cyan if selected, red on black otherwise + if is_selected: + screen.attron(curses.color_pair(ColorPairs.IOC_SELECTED) | curses.A_BOLD) + screen.addstr(y, x_pos, text) + screen.attroff(curses.color_pair(ColorPairs.IOC_SELECTED) | curses.A_BOLD) + else: + screen.attron(curses.color_pair(ColorPairs.ERROR) | curses.A_BOLD) + screen.addstr(y, x_pos, text) + screen.attroff(curses.color_pair(ColorPairs.ERROR) | curses.A_BOLD) + else: # tag + # Tag highlighting: yellow on cyan if selected, yellow on black otherwise + if is_selected: + screen.attron(curses.color_pair(ColorPairs.TAG_SELECTED)) + screen.addstr(y, x_pos, text) + screen.attroff(curses.color_pair(ColorPairs.TAG_SELECTED)) + else: + screen.attron(curses.color_pair(ColorPairs.WARNING)) + screen.addstr(y, x_pos, text) + screen.attroff(curses.color_pair(ColorPairs.WARNING)) + + x_pos += len(text) + last_pos = end + + # Add remaining text + if last_pos < len(line): + text_after = line[last_pos:] + if is_selected: + screen.attron(curses.color_pair(ColorPairs.SELECTION)) + screen.addstr(y, x_pos, text_after) + screen.attroff(curses.color_pair(ColorPairs.SELECTION)) + else: + screen.addstr(y, x_pos, text_after) diff --git a/trace/tui.py b/trace/tui_app.py similarity index 99% rename from trace/tui.py rename to trace/tui_app.py index 4cbef05..32350c8 100644 --- a/trace/tui.py +++ b/trace/tui_app.py @@ -2213,9 +2213,9 @@ class TUI: options = ["GPG Signing", "Select GPG Key", "Save", "Cancel"] curses.curs_set(0) - h = 12 + h = 15 # Increased from 12 to properly show all 4 options + footer w = 60 - y = self.height // 2 - 6 + y = self.height // 2 - 7 # Adjusted to keep centered x = (self.width - w) // 2 win = curses.newwin(h, w, y, x)