mirror of https://github.com/overcuriousity/trace.git (synced 2025-12-20 13:02:21 +00:00)
Merge pull request #9 from overcuriousity/claude/restructure-for-ai-agents-01VkkJKiFXNXajfM6DUohVfG
CLAUDE.md (67 lines changed)
@@ -52,18 +52,30 @@ The application uses a three-level hierarchy:
  Each level has unique IDs (UUIDs) for reliable lookups across the hierarchy.

- ### Core Modules
+ ### Modular Structure (Optimized for AI Coding Agents)
+
+ The codebase is organized into focused, single-responsibility modules to make it easier for AI agents and developers to navigate, understand, and modify specific functionality:

- **`trace/models.py`**: Data models using dataclasses
- - `Note`: Content + timestamp + SHA256 hash + optional GPG signature + auto-extracted tags/IOCs
- - `Evidence`: Container for notes about a specific piece of evidence, includes metadata dict for source hashes
- - `Case`: Top-level container with case number, investigator, evidence list, and notes
+ **`trace/models/`**: Data models package
+ - `__init__.py`: Main model classes (Note, Evidence, Case) with dataclass definitions
+ - `extractors/tag_extractor.py`: Tag extraction logic (hashtag parsing)
+ - `extractors/ioc_extractor.py`: IOC extraction logic (IPs, domains, URLs, hashes, emails)
  - All models implement `to_dict()`/`from_dict()` for JSON serialization
+ - Models use extractors for automatic tag and IOC detection
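To make the serialization contract above concrete, here is a minimal round-trip sketch (the classes and methods are the ones defined later in this diff; the sample values are illustrative):

```python
from trace.models import Case, Note

# A note computes its own integrity hash and extracts tags/IOCs on demand
note = Note(content="Beacon from 203.0.113.45 #triage")
note.calculate_hash()   # SHA256 over "timestamp:content"
note.extract_tags()     # -> ["triage"]
note.extract_iocs()     # -> ["203.0.113.45"]

case = Case(case_number="2024-001", investigator="Jane Doe")
case.notes.append(note)

# to_dict()/from_dict() round-trip through the JSON-friendly dict form
restored = Case.from_dict(case.to_dict())
assert restored.notes[0].content_hash == note.content_hash
```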
- **`trace/storage.py`**: Persistence layer
- - `Storage`: Manages `~/.trace/data.json` with atomic writes (temp file + rename)
- - `StateManager`: Manages `~/.trace/state` (active case/evidence) and `~/.trace/settings.json` (PGP enabled/disabled)
- - Data is loaded into memory on init, modified, then saved atomically
+ **`trace/storage_impl/`**: Storage implementation package
+ - `storage.py`: Main Storage class managing `~/.trace/data.json` with atomic writes
+ - `state_manager.py`: StateManager for active context and settings persistence
+ - `lock_manager.py`: Cross-platform file locking to prevent concurrent access
+ - `demo_data.py`: Demo case creation for first-time users
+ - Backward compatible via `trace/storage.py` wrapper
+
+ **`trace/tui/`**: Text User Interface package
+ - `tui.py`: Main TUI class with view hierarchy and event loop (3307 lines - target for future refactoring)
+ - `rendering/colors.py`: Color pair initialization and constants
+ - `rendering/text_renderer.py`: Text rendering with IOC/tag highlighting
+ - `handlers/export_handler.py`: Export functionality (IOCs, markdown reports)
+ - Future refactoring will extract views, dialogs, and input handlers
  **`trace/crypto.py`**: Integrity features
  - `sign_content()`: GPG clearsign via subprocess (falls back gracefully if GPG unavailable)
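`trace/crypto.py` itself is not touched by this diff; as a hedged sketch of the behavior described above (clearsign via the `gpg` subprocess, returning `None` when signing is not possible), it might look like:

```python
import subprocess
from typing import Optional

def sign_content(content: str) -> Optional[str]:
    """Clearsign content with the user's default GPG key.

    Returns the clearsigned text, or None if signing is not possible
    (no gpg binary, no secret key, agent locked), so callers can
    degrade gracefully to unsigned notes.
    """
    try:
        result = subprocess.run(
            ["gpg", "--clearsign"],
            input=content.encode("utf-8"),
            capture_output=True,
            timeout=10,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return None  # gpg missing or unresponsive
    if result.returncode != 0:
        return None  # e.g. no secret key configured
    return result.stdout.decode("utf-8")
```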
@@ -74,13 +86,6 @@ Each level has unique IDs (UUIDs) for reliable lookups across the hierarchy.
  - `export_markdown()`: Generates full case report with hashes and signatures
  - `main()`: Argument parsing, routes to TUI or CLI functions

- **`trace/tui.py`**: Curses-based Text User Interface
- - View hierarchy: case_list → case_detail → evidence_detail
- - Additional views: tags_list, tag_notes_list, ioc_list, ioc_notes_list, note_detail
- - Multi-line note editor with Ctrl+G to submit, Esc to cancel
- - Filter mode (press `/`), active context management (press `a`)
- - All note additions automatically extract tags (#hashtag) and IOCs (IPs, domains, URLs, hashes, emails)

  ### Key Features Implementation

  **Integrity System**: Every note automatically gets:
@@ -129,3 +134,33 @@ temp_file.replace(self.data_file)
  ## Testing Notes

  Tests use temporary directories created with `tempfile.mkdtemp()` and cleaned up in `tearDown()` to avoid polluting `~/.trace/`.
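A minimal sketch of that test pattern (the repository's actual test classes are not part of this diff; `StorageTest` and the test name here are illustrative):

```python
import shutil
import tempfile
import unittest
from pathlib import Path

from trace.storage import Storage

class StorageTest(unittest.TestCase):
    def setUp(self):
        # Isolated app dir so tests never touch ~/.trace/
        self.app_dir = Path(tempfile.mkdtemp())

    def tearDown(self):
        shutil.rmtree(self.app_dir, ignore_errors=True)

    def test_first_run_creates_data_file(self):
        Storage(app_dir=self.app_dir)  # seeds demo data on first run
        self.assertTrue((self.app_dir / "data.json").exists())
```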
+ ## AI Agent Optimization
+
+ The codebase has been restructured to be optimal for AI coding agents:
+
+ ### Module Organization Benefits
+ - **Focused Files**: Each module has a single, clear responsibility (50-250 lines typically)
+ - **Easy Navigation**: Functionality is easy to locate by purpose (e.g., IOC extraction, export handlers)
+ - **Independent Modification**: Changes to one module rarely affect others
+ - **Clear Interfaces**: Modules communicate through well-defined imports
+ - **Reduced Context**: AI agents can focus on relevant files without loading massive monoliths
+
+ ### File Size Guidelines
+ - **Small modules** (< 150 lines): Ideal for focused tasks
+ - **Medium modules** (150-300 lines): Acceptable for cohesive functionality
+ - **Large modules** (> 500 lines): Consider refactoring into smaller components
+ - **Very large modules** (> 1000 lines): Priority target for extraction and modularization
+
+ ### Current Status
+ - ✅ Models: Organized into package with extractors separated
+ - ✅ Storage: Split into focused modules (storage, state, locking, demo data)
+ - ✅ TUI Utilities: Rendering and export handlers extracted
+ - ⏳ TUI Main: Still monolithic (3307 lines) - future refactoring needed
+
+ ### Future Refactoring Targets
+
+ The `trace/tui.py` file (3307 lines) should be further split into:
+ - `tui/views/` - Individual view classes (case list, evidence detail, etc.)
+ - `tui/dialogs/` - Dialog functions (input, confirm, settings, etc.)
+ - `tui/handlers/` - Input and navigation handlers
+ - `tui/app.py` - Main TUI orchestration class
@@ -163,7 +163,7 @@ def main():
      # Launch TUI (with optional direct navigation to active context)
      try:
-         from .tui import run_tui
+         from .tui_app import run_tui
          run_tui(open_active=args.open)
      except ImportError as e:
          print(f"Error launching TUI: {e}")
trace/models.py (311 lines removed)
@@ -1,311 +0,0 @@
import time
import hashlib
import uuid
import re
from dataclasses import dataclass, field
from typing import List, Optional, Dict


@dataclass
class Note:
    content: str
    timestamp: float = field(default_factory=time.time)
    note_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    content_hash: str = ""
    signature: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    iocs: List[str] = field(default_factory=list)

    def extract_tags(self):
        """Extract hashtags from content (case-insensitive, stored lowercase)"""
        # Match hashtags: # followed by word characters
        tag_pattern = r'#(\w+)'
        matches = re.findall(tag_pattern, self.content)
        # Convert to lowercase and remove duplicates while preserving order
        seen = set()
        self.tags = []
        for tag in matches:
            tag_lower = tag.lower()
            if tag_lower not in seen:
                seen.add(tag_lower)
                self.tags.append(tag_lower)

    def extract_iocs(self):
        """Extract Indicators of Compromise from content"""
        seen = set()
        covered_ranges = set()
        self.iocs = []

        def add_ioc_if_not_covered(match_obj):
            """Add IOC if its range doesn't overlap with already covered ranges"""
            start, end = match_obj.start(), match_obj.end()
            # Check if this range overlaps with any covered range
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return False  # Overlaps, don't add
            text = match_obj.group()
            if text not in seen:
                seen.add(text)
                covered_ranges.add((start, end))
                self.iocs.append(text)
                return True
            return False

        # Process in order of priority to avoid false positives
        # SHA256 hashes (64 hex chars) - check longest first to avoid substring matches
        sha256_pattern = r'\b[a-fA-F0-9]{64}\b'
        for match in re.finditer(sha256_pattern, self.content):
            add_ioc_if_not_covered(match)

        # SHA1 hashes (40 hex chars)
        sha1_pattern = r'\b[a-fA-F0-9]{40}\b'
        for match in re.finditer(sha1_pattern, self.content):
            add_ioc_if_not_covered(match)

        # MD5 hashes (32 hex chars)
        md5_pattern = r'\b[a-fA-F0-9]{32}\b'
        for match in re.finditer(md5_pattern, self.content):
            add_ioc_if_not_covered(match)

        # IPv4 addresses
        ipv4_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
        for match in re.finditer(ipv4_pattern, self.content):
            add_ioc_if_not_covered(match)

        # IPv6 addresses (supports compressed format)
        ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b'
        for match in re.finditer(ipv6_pattern, self.content):
            add_ioc_if_not_covered(match)

        # URLs (check before domains to prevent double-matching)
        # Fix: exclude trailing punctuation
        url_pattern = r'https?://[^\s<>\"\']+(?<![.,;:!?\)\]\}])'
        for match in re.finditer(url_pattern, self.content):
            add_ioc_if_not_covered(match)

        # Domain names (basic pattern)
        domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b'
        for match in re.finditer(domain_pattern, self.content):
            # Filter out common false positives
            if not match.group().startswith('example.'):
                add_ioc_if_not_covered(match)

        # Email addresses
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        for match in re.finditer(email_pattern, self.content):
            add_ioc_if_not_covered(match)

    def calculate_hash(self):
        # We hash the content + timestamp to ensure integrity of 'when' it was said
        data = f"{self.timestamp}:{self.content}".encode('utf-8')
        self.content_hash = hashlib.sha256(data).hexdigest()

    @staticmethod
    def extract_iocs_from_text(text):
        """Extract IOCs from text and return as list of (ioc, type) tuples"""
        iocs = []
        seen = set()
        covered_ranges = set()

        def add_ioc_if_not_covered(match_obj, ioc_type):
            """Add IOC if its range doesn't overlap with already covered ranges"""
            start, end = match_obj.start(), match_obj.end()
            # Check if this range overlaps with any covered range
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return False  # Overlaps, don't add
            ioc_text = match_obj.group()
            if ioc_text not in seen:
                seen.add(ioc_text)
                covered_ranges.add((start, end))
                iocs.append((ioc_text, ioc_type))
                return True
            return False

        # Process in priority order: longest hashes first
        # SHA256 hashes (64 hex chars)
        sha256_pattern = r'\b[a-fA-F0-9]{64}\b'
        for match in re.finditer(sha256_pattern, text):
            add_ioc_if_not_covered(match, 'sha256')

        # SHA1 hashes (40 hex chars)
        sha1_pattern = r'\b[a-fA-F0-9]{40}\b'
        for match in re.finditer(sha1_pattern, text):
            add_ioc_if_not_covered(match, 'sha1')

        # MD5 hashes (32 hex chars)
        md5_pattern = r'\b[a-fA-F0-9]{32}\b'
        for match in re.finditer(md5_pattern, text):
            add_ioc_if_not_covered(match, 'md5')

        # IPv4 addresses
        ipv4_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
        for match in re.finditer(ipv4_pattern, text):
            add_ioc_if_not_covered(match, 'ipv4')

        # IPv6 addresses (supports compressed format)
        ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b'
        for match in re.finditer(ipv6_pattern, text):
            add_ioc_if_not_covered(match, 'ipv6')

        # URLs (check before domains to avoid double-matching)
        # Fix: exclude trailing punctuation
        url_pattern = r'https?://[^\s<>\"\']+(?<![.,;:!?\)\]\}])'
        for match in re.finditer(url_pattern, text):
            add_ioc_if_not_covered(match, 'url')

        # Domain names (basic pattern)
        domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b'
        for match in re.finditer(domain_pattern, text):
            # Filter out common false positives
            if not match.group().startswith('example.'):
                add_ioc_if_not_covered(match, 'domain')

        # Email addresses
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        for match in re.finditer(email_pattern, text):
            add_ioc_if_not_covered(match, 'email')

        return iocs

    @staticmethod
    def extract_iocs_with_positions(text):
        """Extract IOCs with their positions for highlighting. Returns list of (text, start, end, type) tuples"""
        import re
        highlights = []
        covered_ranges = set()

        def overlaps(start, end):
            """Check if range overlaps with any covered range"""
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return True
            return False

        def add_highlight(match, ioc_type):
            """Add highlight if it doesn't overlap with existing ones"""
            start, end = match.start(), match.end()
            if not overlaps(start, end):
                highlights.append((match.group(), start, end, ioc_type))
                covered_ranges.add((start, end))

        # Process in priority order: longest hashes first to avoid substring matches
        # SHA256 hashes (64 hex chars)
        for match in re.finditer(r'\b[a-fA-F0-9]{64}\b', text):
            add_highlight(match, 'sha256')

        # SHA1 hashes (40 hex chars)
        for match in re.finditer(r'\b[a-fA-F0-9]{40}\b', text):
            add_highlight(match, 'sha1')

        # MD5 hashes (32 hex chars)
        for match in re.finditer(r'\b[a-fA-F0-9]{32}\b', text):
            add_highlight(match, 'md5')

        # IPv4 addresses
        ipv4_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
        for match in re.finditer(ipv4_pattern, text):
            add_highlight(match, 'ipv4')

        # IPv6 addresses (supports compressed format)
        ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b'
        for match in re.finditer(ipv6_pattern, text):
            add_highlight(match, 'ipv6')

        # URLs (check before domains to prevent double-matching)
        # Fix: exclude trailing punctuation
        for match in re.finditer(r'https?://[^\s<>\"\']+(?<![.,;:!?\)\]\}])', text):
            add_highlight(match, 'url')

        # Domain names
        for match in re.finditer(r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b', text):
            if not match.group().startswith('example.'):
                add_highlight(match, 'domain')

        # Email addresses
        for match in re.finditer(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text):
            add_highlight(match, 'email')

        return highlights

    def to_dict(self):
        return {
            "note_id": self.note_id,
            "content": self.content,
            "timestamp": self.timestamp,
            "content_hash": self.content_hash,
            "signature": self.signature,
            "tags": self.tags,
            "iocs": self.iocs
        }

    @staticmethod
    def from_dict(data):
        note = Note(
            content=data["content"],
            timestamp=data["timestamp"],
            note_id=data["note_id"],
            content_hash=data.get("content_hash", ""),
            signature=data.get("signature"),
            tags=data.get("tags", []),
            iocs=data.get("iocs", [])
        )
        return note


@dataclass
class Evidence:
    name: str
    evidence_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    description: str = ""
    metadata: Dict[str, str] = field(default_factory=dict)
    notes: List[Note] = field(default_factory=list)

    def to_dict(self):
        return {
            "evidence_id": self.evidence_id,
            "name": self.name,
            "description": self.description,
            "metadata": self.metadata,
            "notes": [n.to_dict() for n in self.notes]
        }

    @staticmethod
    def from_dict(data):
        ev = Evidence(
            name=data["name"],
            evidence_id=data["evidence_id"],
            description=data.get("description", ""),
            metadata=data.get("metadata", {})
        )
        ev.notes = [Note.from_dict(n) for n in data.get("notes", [])]
        return ev


@dataclass
class Case:
    case_number: str
    case_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    investigator: str = ""
    evidence: List[Evidence] = field(default_factory=list)
    notes: List[Note] = field(default_factory=list)

    def to_dict(self):
        return {
            "case_id": self.case_id,
            "case_number": self.case_number,
            "name": self.name,
            "investigator": self.investigator,
            "evidence": [e.to_dict() for e in self.evidence],
            "notes": [n.to_dict() for n in self.notes]
        }

    @staticmethod
    def from_dict(data):
        case = Case(
            case_number=data["case_number"],
            case_id=data["case_id"],
            name=data.get("name", ""),
            investigator=data.get("investigator", "")
        )
        case.evidence = [Evidence.from_dict(e) for e in data.get("evidence", [])]
        case.notes = [Note.from_dict(n) for n in data.get("notes", [])]
        return case
trace/models/__init__.py (new file, 131 lines)
@@ -0,0 +1,131 @@
"""Data models for trace application"""

import time
import hashlib
import uuid
from dataclasses import dataclass, field
from typing import List, Optional, Dict

from .extractors import TagExtractor, IOCExtractor


@dataclass
class Note:
    content: str
    timestamp: float = field(default_factory=time.time)
    note_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    content_hash: str = ""
    signature: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    iocs: List[str] = field(default_factory=list)

    def extract_tags(self):
        """Extract hashtags from content (case-insensitive, stored lowercase)"""
        self.tags = TagExtractor.extract_tags(self.content)

    def extract_iocs(self):
        """Extract Indicators of Compromise from content"""
        self.iocs = IOCExtractor.extract_iocs(self.content)

    def calculate_hash(self):
        # We hash the content + timestamp to ensure integrity of 'when' it was said
        data = f"{self.timestamp}:{self.content}".encode('utf-8')
        self.content_hash = hashlib.sha256(data).hexdigest()

    @staticmethod
    def extract_iocs_from_text(text):
        """Extract IOCs from text and return as list of (ioc, type) tuples"""
        return IOCExtractor.extract_iocs_with_types(text)

    @staticmethod
    def extract_iocs_with_positions(text):
        """Extract IOCs with their positions for highlighting. Returns list of (text, start, end, type) tuples"""
        return IOCExtractor.extract_iocs_with_positions(text)

    def to_dict(self):
        return {
            "note_id": self.note_id,
            "content": self.content,
            "timestamp": self.timestamp,
            "content_hash": self.content_hash,
            "signature": self.signature,
            "tags": self.tags,
            "iocs": self.iocs
        }

    @staticmethod
    def from_dict(data):
        note = Note(
            content=data["content"],
            timestamp=data["timestamp"],
            note_id=data["note_id"],
            content_hash=data.get("content_hash", ""),
            signature=data.get("signature"),
            tags=data.get("tags", []),
            iocs=data.get("iocs", [])
        )
        return note


@dataclass
class Evidence:
    name: str
    evidence_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    description: str = ""
    metadata: Dict[str, str] = field(default_factory=dict)
    notes: List[Note] = field(default_factory=list)

    def to_dict(self):
        return {
            "evidence_id": self.evidence_id,
            "name": self.name,
            "description": self.description,
            "metadata": self.metadata,
            "notes": [n.to_dict() for n in self.notes]
        }

    @staticmethod
    def from_dict(data):
        ev = Evidence(
            name=data["name"],
            evidence_id=data["evidence_id"],
            description=data.get("description", ""),
            metadata=data.get("metadata", {})
        )
        ev.notes = [Note.from_dict(n) for n in data.get("notes", [])]
        return ev


@dataclass
class Case:
    case_number: str
    case_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    investigator: str = ""
    evidence: List[Evidence] = field(default_factory=list)
    notes: List[Note] = field(default_factory=list)

    def to_dict(self):
        return {
            "case_id": self.case_id,
            "case_number": self.case_number,
            "name": self.name,
            "investigator": self.investigator,
            "evidence": [e.to_dict() for e in self.evidence],
            "notes": [n.to_dict() for n in self.notes]
        }

    @staticmethod
    def from_dict(data):
        case = Case(
            case_number=data["case_number"],
            case_id=data["case_id"],
            name=data.get("name", ""),
            investigator=data.get("investigator", "")
        )
        case.evidence = [Evidence.from_dict(e) for e in data.get("evidence", [])]
        case.notes = [Note.from_dict(n) for n in data.get("notes", [])]
        return case


__all__ = ['Note', 'Evidence', 'Case', 'TagExtractor', 'IOCExtractor']
trace/models/extractors/__init__.py (new file, 6 lines)
@@ -0,0 +1,6 @@
"""Extractors for tags and IOCs from note content"""

from .tag_extractor import TagExtractor
from .ioc_extractor import IOCExtractor

__all__ = ['TagExtractor', 'IOCExtractor']
trace/models/extractors/ioc_extractor.py (new file, 236 lines)
@@ -0,0 +1,236 @@
"""IOC (Indicator of Compromise) extraction logic for notes"""

import re
from typing import List, Tuple


class IOCExtractor:
    """Extract Indicators of Compromise from text content"""

    # Regex patterns for different IOC types
    SHA256_PATTERN = r'\b[a-fA-F0-9]{64}\b'
    SHA1_PATTERN = r'\b[a-fA-F0-9]{40}\b'
    MD5_PATTERN = r'\b[a-fA-F0-9]{32}\b'
    IPV4_PATTERN = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
    IPV6_PATTERN = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b'
    URL_PATTERN = r'https?://[^\s<>\"\']+(?<![.,;:!?\)\]\}])'
    DOMAIN_PATTERN = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b'
    EMAIL_PATTERN = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'

    @staticmethod
    def extract_iocs(text: str) -> List[str]:
        """
        Extract IOCs from text and return as simple list

        Args:
            text: The text to extract IOCs from

        Returns:
            List of unique IOC strings
        """
        seen = set()
        covered_ranges = set()
        iocs = []

        def add_ioc_if_not_covered(match_obj):
            """Add IOC if its range doesn't overlap with already covered ranges"""
            start, end = match_obj.start(), match_obj.end()
            # Check if this range overlaps with any covered range
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return False  # Overlaps, don't add
            ioc_text = match_obj.group()
            if ioc_text not in seen:
                seen.add(ioc_text)
                covered_ranges.add((start, end))
                iocs.append(ioc_text)
                return True
            return False

        # Process in order of priority to avoid false positives
        # SHA256 hashes (64 hex chars) - check longest first to avoid substring matches
        for match in re.finditer(IOCExtractor.SHA256_PATTERN, text):
            add_ioc_if_not_covered(match)

        # SHA1 hashes (40 hex chars)
        for match in re.finditer(IOCExtractor.SHA1_PATTERN, text):
            add_ioc_if_not_covered(match)

        # MD5 hashes (32 hex chars)
        for match in re.finditer(IOCExtractor.MD5_PATTERN, text):
            add_ioc_if_not_covered(match)

        # IPv4 addresses
        for match in re.finditer(IOCExtractor.IPV4_PATTERN, text):
            add_ioc_if_not_covered(match)

        # IPv6 addresses (supports compressed format)
        for match in re.finditer(IOCExtractor.IPV6_PATTERN, text):
            add_ioc_if_not_covered(match)

        # URLs (check before domains to prevent double-matching)
        for match in re.finditer(IOCExtractor.URL_PATTERN, text):
            add_ioc_if_not_covered(match)

        # Domain names (basic pattern)
        for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text):
            # Filter out common false positives
            if not match.group().startswith('example.'):
                add_ioc_if_not_covered(match)

        # Email addresses
        for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text):
            add_ioc_if_not_covered(match)

        return iocs

    @staticmethod
    def extract_iocs_with_types(text: str) -> List[Tuple[str, str]]:
        """
        Extract IOCs from text and return as list of (ioc, type) tuples

        Args:
            text: The text to extract IOCs from

        Returns:
            List of (ioc_text, ioc_type) tuples
        """
        iocs = []
        seen = set()
        covered_ranges = set()

        def add_ioc_if_not_covered(match_obj, ioc_type):
            """Add IOC if its range doesn't overlap with already covered ranges"""
            start, end = match_obj.start(), match_obj.end()
            # Check if this range overlaps with any covered range
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return False  # Overlaps, don't add
            ioc_text = match_obj.group()
            if ioc_text not in seen:
                seen.add(ioc_text)
                covered_ranges.add((start, end))
                iocs.append((ioc_text, ioc_type))
                return True
            return False

        # Process in priority order: longest hashes first
        for match in re.finditer(IOCExtractor.SHA256_PATTERN, text):
            add_ioc_if_not_covered(match, 'sha256')

        for match in re.finditer(IOCExtractor.SHA1_PATTERN, text):
            add_ioc_if_not_covered(match, 'sha1')

        for match in re.finditer(IOCExtractor.MD5_PATTERN, text):
            add_ioc_if_not_covered(match, 'md5')

        for match in re.finditer(IOCExtractor.IPV4_PATTERN, text):
            add_ioc_if_not_covered(match, 'ipv4')

        for match in re.finditer(IOCExtractor.IPV6_PATTERN, text):
            add_ioc_if_not_covered(match, 'ipv6')

        # URLs (check before domains to avoid double-matching)
        for match in re.finditer(IOCExtractor.URL_PATTERN, text):
            add_ioc_if_not_covered(match, 'url')

        # Domain names
        for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text):
            # Filter out common false positives
            if not match.group().startswith('example.'):
                add_ioc_if_not_covered(match, 'domain')

        # Email addresses
        for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text):
            add_ioc_if_not_covered(match, 'email')

        return iocs

    @staticmethod
    def extract_iocs_with_positions(text: str) -> List[Tuple[str, int, int, str]]:
        """
        Extract IOCs with their positions for highlighting

        Args:
            text: The text to extract IOCs from

        Returns:
            List of (ioc_text, start_pos, end_pos, ioc_type) tuples
        """
        highlights = []
        covered_ranges = set()

        def overlaps(start, end):
            """Check if range overlaps with any covered range"""
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return True
            return False

        def add_highlight(match, ioc_type):
            """Add highlight if it doesn't overlap with existing ones"""
            start, end = match.start(), match.end()
            if not overlaps(start, end):
                highlights.append((match.group(), start, end, ioc_type))
                covered_ranges.add((start, end))

        # Process in priority order: longest hashes first to avoid substring matches
        for match in re.finditer(IOCExtractor.SHA256_PATTERN, text):
            add_highlight(match, 'sha256')

        for match in re.finditer(IOCExtractor.SHA1_PATTERN, text):
            add_highlight(match, 'sha1')

        for match in re.finditer(IOCExtractor.MD5_PATTERN, text):
            add_highlight(match, 'md5')

        for match in re.finditer(IOCExtractor.IPV4_PATTERN, text):
            add_highlight(match, 'ipv4')

        for match in re.finditer(IOCExtractor.IPV6_PATTERN, text):
            add_highlight(match, 'ipv6')

        # URLs (check before domains to prevent double-matching)
        for match in re.finditer(IOCExtractor.URL_PATTERN, text):
            add_highlight(match, 'url')

        # Domain names
        for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text):
            if not match.group().startswith('example.'):
                add_highlight(match, 'domain')

        # Email addresses
        for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text):
            add_highlight(match, 'email')

        return highlights

    @staticmethod
    def classify_ioc(ioc: str) -> str:
        """
        Classify an IOC by its type

        Args:
            ioc: The IOC string to classify

        Returns:
            The IOC type as a string
        """
        if re.fullmatch(IOCExtractor.SHA256_PATTERN, ioc):
            return 'sha256'
        elif re.fullmatch(IOCExtractor.SHA1_PATTERN, ioc):
            return 'sha1'
        elif re.fullmatch(IOCExtractor.MD5_PATTERN, ioc):
            return 'md5'
        elif re.fullmatch(IOCExtractor.IPV4_PATTERN, ioc):
            return 'ipv4'
        elif re.fullmatch(IOCExtractor.IPV6_PATTERN, ioc):
            return 'ipv6'
        elif re.fullmatch(IOCExtractor.EMAIL_PATTERN, ioc):
            return 'email'
        elif re.fullmatch(IOCExtractor.URL_PATTERN, ioc):
            return 'url'
        elif re.fullmatch(IOCExtractor.DOMAIN_PATTERN, ioc):
            return 'domain'
        else:
            return 'unknown'
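A short usage sketch of the extractor (the expected output follows from the patterns and processing order defined above; the sample text is illustrative):

```python
from trace.models.extractors import IOCExtractor

text = "Beacon to https://evil-domain.com/login from 203.0.113.45, payload MD5 d41d8cd98f00b204e9800998ecf8427e"
print(IOCExtractor.extract_iocs_with_types(text))
# [('d41d8cd98f00b204e9800998ecf8427e', 'md5'),
#  ('203.0.113.45', 'ipv4'),
#  ('https://evil-domain.com/login', 'url')]
# 'evil-domain.com' is not reported separately as a domain because the
# URL match already covers that character range.

print(IOCExtractor.classify_ioc("203.0.113.45"))  # 'ipv4'
```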
trace/models/extractors/tag_extractor.py (new file, 34 lines)
@@ -0,0 +1,34 @@
"""Tag extraction logic for notes"""

import re


class TagExtractor:
    """Extract hashtags from text content"""

    TAG_PATTERN = r'#(\w+)'

    @staticmethod
    def extract_tags(text: str) -> list[str]:
        """
        Extract hashtags from content (case-insensitive, stored lowercase)

        Args:
            text: The text to extract tags from

        Returns:
            List of unique tags in lowercase, preserving order
        """
        # Match hashtags: # followed by word characters
        matches = re.findall(TagExtractor.TAG_PATTERN, text)

        # Convert to lowercase and remove duplicates while preserving order
        seen = set()
        tags = []
        for tag in matches:
            tag_lower = tag.lower()
            if tag_lower not in seen:
                seen.add(tag_lower)
                tags.append(tag_lower)

        return tags
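A quick usage sketch; note that because `\w+` does not match hyphens, a tag like `#email-analysis` is captured as just `email`:

```python
from trace.models.extractors import TagExtractor

print(TagExtractor.extract_tags("Review headers #Phishing #phishing #email-analysis"))
# ['phishing', 'email']  -- case-folded, deduplicated, order preserved
```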
trace/storage.py (404 lines changed)
@@ -1,402 +1,6 @@
+ """Storage module - backward compatibility wrapper"""
+
+ # For backward compatibility, export all classes from storage_impl
+ from .storage_impl import Storage, StateManager, LockManager, create_demo_case
+
+ __all__ = ['Storage', 'StateManager', 'LockManager', 'create_demo_case']

Removed (the previous in-file implementation, now split across trace/storage_impl/):

import json
import time
import os
import sys
from pathlib import Path
from typing import List, Optional, Tuple
from .models import Case, Evidence, Note

DEFAULT_APP_DIR = Path.home() / ".trace"


class LockManager:
    """Cross-platform file lock manager to prevent concurrent access"""
    def __init__(self, lock_file: Path):
        self.lock_file = lock_file
        self.acquired = False

    def acquire(self, timeout: int = 5):
        """Acquire lock with timeout. Returns True if successful."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                # Try to create lock file exclusively (fails if exists)
                # Use 'x' mode which fails if file exists (atomic on most systems)
                fd = os.open(str(self.lock_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                os.write(fd, str(os.getpid()).encode())
                os.close(fd)
                self.acquired = True
                return True
            except FileExistsError:
                # Lock file exists, check if process is still alive
                if self._is_stale_lock():
                    # Remove stale lock and retry
                    try:
                        self.lock_file.unlink()
                    except FileNotFoundError:
                        pass
                    continue
                # Active lock, wait a bit
                time.sleep(0.1)
            except Exception:
                # Other errors, wait and retry
                time.sleep(0.1)
        return False

    def _is_stale_lock(self):
        """Check if lock file is stale (process no longer exists)"""
        try:
            if not self.lock_file.exists():
                return False
            with open(self.lock_file, 'r') as f:
                pid = int(f.read().strip())

            # Check if process exists (cross-platform)
            if sys.platform == 'win32':
                import ctypes
                kernel32 = ctypes.windll.kernel32
                PROCESS_QUERY_INFORMATION = 0x0400
                handle = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid)
                if handle:
                    kernel32.CloseHandle(handle)
                    return False
                return True
            else:
                # Unix/Linux - send signal 0 to check if process exists
                try:
                    os.kill(pid, 0)
                    return False  # Process exists
                except OSError:
                    return True  # Process doesn't exist
        except (ValueError, FileNotFoundError, PermissionError):
            return True

    def release(self):
        """Release the lock"""
        if self.acquired:
            try:
                self.lock_file.unlink()
            except FileNotFoundError:
                pass
            self.acquired = False

    def __enter__(self):
        if not self.acquire():
            raise RuntimeError("Could not acquire lock: another instance is running")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


class Storage:
    def __init__(self, app_dir: Path = DEFAULT_APP_DIR, acquire_lock: bool = True):
        self.app_dir = app_dir
        self.data_file = self.app_dir / "data.json"
        self.lock_file = self.app_dir / "app.lock"
        self.lock_manager = None
        self._ensure_app_dir()

        # Acquire lock to prevent concurrent access
        if acquire_lock:
            self.lock_manager = LockManager(self.lock_file)
            if not self.lock_manager.acquire(timeout=5):
                raise RuntimeError("Another instance of trace is already running. Please close it first.")

        self.cases: List[Case] = self._load_data()

        # Create demo case on first launch (only if data loaded successfully and is empty)
        if not self.cases and self.data_file.exists():
            # File exists but is empty - could be first run after successful load
            pass
        elif not self.cases and not self.data_file.exists():
            # No file exists - first run
            self._create_demo_case()

    def __del__(self):
        """Release lock when Storage object is destroyed"""
        if self.lock_manager:
            self.lock_manager.release()

    def _ensure_app_dir(self):
        if not self.app_dir.exists():
            self.app_dir.mkdir(parents=True, exist_ok=True)

    def _create_demo_case(self):
        """Create a demo case with evidence showcasing all features"""
        demo_case = Case(
            case_number="DEMO-2024-001",
            name="Sample Investigation",
            investigator="Demo User"
        )

        # Add case-level notes to demonstrate case notes feature
        case_note1 = Note(content="""Initial case briefing: Suspected data exfiltration incident.

Key objectives:
- Identify compromised systems
- Determine scope of data loss
- Document timeline of events

#incident-response #data-breach #investigation""")
        case_note1.calculate_hash()
        case_note1.extract_tags()
        case_note1.extract_iocs()
        demo_case.notes.append(case_note1)

        case_note2 = Note(content="""Investigation lead: Employee reported suspicious email from sender@phishing-domain.com
Initial analysis shows potential credential harvesting attempt.
Review email headers and attachments for IOCs. #phishing #email-analysis""")
        case_note2.calculate_hash()
        case_note2.extract_tags()
        case_note2.extract_iocs()
        demo_case.notes.append(case_note2)

        # Create evidence 1: Compromised laptop
        evidence1 = Evidence(
            name="Employee Laptop HDD",
            description="Primary workstation hard drive - user reported suspicious activity"
        )
        # Add source hash for chain of custody demonstration
        evidence1.metadata["source_hash"] = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

        # Add notes to evidence 1 with various features
        note1 = Note(content="""Forensic imaging completed. Drive imaged using FTK Imager.
Image hash verified: SHA256 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

Chain of custody maintained throughout process. #forensics #imaging #chain-of-custody""")
        note1.calculate_hash()
        note1.extract_tags()
        note1.extract_iocs()
        evidence1.notes.append(note1)

        note2 = Note(content="""Discovered suspicious connections to external IP addresses:
- 192.168.1.100 (local gateway)
- 203.0.113.45 (external, geolocation: Unknown)
- 198.51.100.78 (command and control server suspected)

Browser history shows visits to malicious-site.com and data-exfil.net.
#network-analysis #ioc #c2-server""")
        note2.calculate_hash()
        note2.extract_tags()
        note2.extract_iocs()
        evidence1.notes.append(note2)

        note3 = Note(content="""Malware identified in temp directory:
File: evil.exe
MD5: d41d8cd98f00b204e9800998ecf8427e
SHA1: da39a3ee5e6b4b0d3255bfef95601890afd80709
SHA256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

Submitting to VirusTotal for analysis. #malware #hash-analysis #virustotal""")
        note3.calculate_hash()
        note3.extract_tags()
        note3.extract_iocs()
        evidence1.notes.append(note3)

        note4 = Note(content="""Timeline analysis reveals:
- 2024-01-15 09:23:45 - Suspicious email received
- 2024-01-15 09:24:12 - User clicked phishing link https://evil-domain.com/login
- 2024-01-15 09:25:03 - Credentials submitted to attacker-controlled site
- 2024-01-15 09:30:15 - Lateral movement detected

User credentials compromised. Recommend immediate password reset. #timeline #lateral-movement""")
        note4.calculate_hash()
        note4.extract_tags()
        note4.extract_iocs()
        evidence1.notes.append(note4)

        demo_case.evidence.append(evidence1)

        # Create evidence 2: Network logs
        evidence2 = Evidence(
            name="Firewall Logs",
            description="Corporate firewall logs from incident timeframe"
        )
        evidence2.metadata["source_hash"] = "a3f5c8b912e4d67f89b0c1a2e3d4f5a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2"

        note5 = Note(content="""Log analysis shows outbound connections to suspicious domains:
- attacker-c2.com on port 443 (encrypted channel)
- data-upload.net on port 8080 (unencrypted)
- exfil-server.org on port 22 (SSH tunnel)

Total data transferred: approximately 2.3 GB over 4 hours.
#log-analysis #data-exfiltration #network-traffic""")
        note5.calculate_hash()
        note5.extract_tags()
        note5.extract_iocs()
        evidence2.notes.append(note5)

        note6 = Note(content="""Contact information found in malware configuration:
Email: attacker@malicious-domain.com
Backup C2: 2001:0db8:85a3:0000:0000:8a2e:0370:7334 (IPv6)

Cross-referencing with threat intelligence databases. #threat-intel #attribution""")
        note6.calculate_hash()
        note6.extract_tags()
        note6.extract_iocs()
        evidence2.notes.append(note6)

        demo_case.evidence.append(evidence2)

        # Create evidence 3: Email forensics
        evidence3 = Evidence(
            name="Phishing Email",
            description="Original phishing email preserved in .eml format"
        )

        note7 = Note(content="""Email headers analysis:
From: sender@phishing-domain.com (spoofed)
Reply-To: attacker@evil-mail-server.net
X-Originating-IP: 198.51.100.99

Email contains embedded tracking pixel at http://tracking.malicious-site.com/pixel.gif
Attachment: invoice.pdf.exe (double extension trick) #email-forensics #phishing-analysis""")
        note7.calculate_hash()
        note7.extract_tags()
        note7.extract_iocs()
        evidence3.notes.append(note7)

        demo_case.evidence.append(evidence3)

        # Add the demo case to storage
        self.cases.append(demo_case)
        self.save_data()

    def _load_data(self) -> List[Case]:
        if not self.data_file.exists():
            return []
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return [Case.from_dict(c) for c in data]
        except (json.JSONDecodeError, IOError, KeyError, ValueError) as e:
            # Corrupted JSON - create backup and raise exception
            import shutil
            from datetime import datetime
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_file = self.app_dir / f"data.json.corrupted.{timestamp}"
            try:
                shutil.copy2(self.data_file, backup_file)
            except Exception:
                pass
            # Raise exception with information about backup
            raise RuntimeError(f"Data file is corrupted. Backup saved to: {backup_file}\nError: {e}")

    def start_fresh(self):
        """Start with fresh data (for corrupted JSON recovery)"""
        self.cases = []
        self._create_demo_case()

    def save_data(self):
        data = [c.to_dict() for c in self.cases]
        # Write to temp file then rename for atomic-ish write
        temp_file = self.data_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        temp_file.replace(self.data_file)

    def add_case(self, case: Case):
        self.cases.append(case)
        self.save_data()

    def get_case(self, case_id: str) -> Optional[Case]:
        # Case ID lookup
        for c in self.cases:
            if c.case_id == case_id:
                return c
        return None

    def delete_case(self, case_id: str):
        self.cases = [c for c in self.cases if c.case_id != case_id]
        self.save_data()

    def delete_evidence(self, case_id: str, evidence_id: str):
        case = self.get_case(case_id)
        if case:
            case.evidence = [e for e in case.evidence if e.evidence_id != evidence_id]
            self.save_data()

    def find_evidence(self, evidence_id: str) -> Tuple[Optional[Case], Optional[Evidence]]:
        for c in self.cases:
            for e in c.evidence:
                if e.evidence_id == evidence_id:
                    return c, e
        return None, None


class StateManager:
    def __init__(self, app_dir: Path = DEFAULT_APP_DIR):
        self.app_dir = app_dir
        self.state_file = self.app_dir / "state"
        self.settings_file = self.app_dir / "settings.json"
        self._ensure_app_dir()

    def _ensure_app_dir(self):
        if not self.app_dir.exists():
            self.app_dir.mkdir(parents=True, exist_ok=True)

    def set_active(self, case_id: Optional[str] = None, evidence_id: Optional[str] = None):
        state = self.get_active()
        state["case_id"] = case_id
        state["evidence_id"] = evidence_id
        # Atomic write: write to temp file then rename
        temp_file = self.state_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(state, f, ensure_ascii=False)
        temp_file.replace(self.state_file)

    def get_active(self) -> dict:
        if not self.state_file.exists():
            return {"case_id": None, "evidence_id": None}
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {"case_id": None, "evidence_id": None}

    def validate_and_clear_stale(self, storage: 'Storage') -> str:
        """Validate active state against storage and clear stale references.
        Returns warning message if state was cleared, empty string otherwise."""
        state = self.get_active()
        case_id = state.get("case_id")
        evidence_id = state.get("evidence_id")
        warning = ""

        if case_id:
            case = storage.get_case(case_id)
            if not case:
                warning = f"Active case (ID: {case_id[:8]}...) no longer exists. Clearing active context."
                self.set_active(None, None)
                return warning

            # Validate evidence if set
            if evidence_id:
                _, evidence = storage.find_evidence(evidence_id)
                if not evidence:
                    warning = f"Active evidence (ID: {evidence_id[:8]}...) no longer exists. Clearing to case level."
                    self.set_active(case_id, None)
                    return warning

        elif evidence_id:
            # Evidence set but no case - invalid state
            warning = "Invalid state: evidence set without case. Clearing active context."
            self.set_active(None, None)
            return warning

        return warning

    def get_settings(self) -> dict:
        if not self.settings_file.exists():
            return {"pgp_enabled": True}
        try:
            with open(self.settings_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {"pgp_enabled": True}

    def set_setting(self, key: str, value):
        settings = self.get_settings()
        settings[key] = value
        # Atomic write: write to temp file then rename
        temp_file = self.settings_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(settings, f, ensure_ascii=False)
        temp_file.replace(self.settings_file)
trace/storage_impl/__init__.py (new file, 8 lines)
@@ -0,0 +1,8 @@
"""Storage implementation modules"""

from .lock_manager import LockManager
from .state_manager import StateManager
from .storage import Storage
from .demo_data import create_demo_case

__all__ = ['LockManager', 'StateManager', 'Storage', 'create_demo_case']
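How the pieces fit together after the split, as a brief sketch (this assumes the extracted Storage and StateManager keep the constructor signatures shown in the removed trace/storage.py above; the import path is the backward-compatible wrapper):

```python
from pathlib import Path
from trace.storage import Storage, StateManager  # re-exported from storage_impl

app_dir = Path.home() / ".trace"

# Storage acquires app.lock on init and seeds demo data on first run
storage = Storage(app_dir=app_dir)
case = storage.cases[0]

# StateManager persists the active case/evidence across sessions
state = StateManager(app_dir=app_dir)
state.set_active(case_id=case.case_id)
print(state.get_active())  # {'case_id': '<uuid>', 'evidence_id': None}
```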
trace/storage_impl/demo_data.py (new file, 143 lines)
@@ -0,0 +1,143 @@
"""Demo case creation for first-time users"""

from ..models import Case, Evidence, Note


def create_demo_case() -> Case:
    """Create a demo case with evidence showcasing all features"""
    demo_case = Case(
        case_number="DEMO-2024-001",
        name="Sample Investigation",
        investigator="Demo User"
    )

    # Add case-level notes to demonstrate case notes feature
    case_note1 = Note(content="""Initial case briefing: Suspected data exfiltration incident.

Key objectives:
- Identify compromised systems
- Determine scope of data loss
- Document timeline of events

#incident-response #data-breach #investigation""")
    case_note1.calculate_hash()
    case_note1.extract_tags()
    case_note1.extract_iocs()
    demo_case.notes.append(case_note1)

    case_note2 = Note(content="""Investigation lead: Employee reported suspicious email from sender@phishing-domain.com
Initial analysis shows potential credential harvesting attempt.
Review email headers and attachments for IOCs. #phishing #email-analysis""")
    case_note2.calculate_hash()
    case_note2.extract_tags()
    case_note2.extract_iocs()
    demo_case.notes.append(case_note2)

    # Create evidence 1: Compromised laptop
    evidence1 = Evidence(
        name="Employee Laptop HDD",
        description="Primary workstation hard drive - user reported suspicious activity"
    )
    # Add source hash for chain of custody demonstration
    evidence1.metadata["source_hash"] = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

    # Add notes to evidence 1 with various features
    note1 = Note(content="""Forensic imaging completed. Drive imaged using FTK Imager.
Image hash verified: SHA256 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

Chain of custody maintained throughout process. #forensics #imaging #chain-of-custody""")
    note1.calculate_hash()
    note1.extract_tags()
    note1.extract_iocs()
    evidence1.notes.append(note1)

    note2 = Note(content="""Discovered suspicious connections to external IP addresses:
- 192.168.1.100 (local gateway)
- 203.0.113.45 (external, geolocation: Unknown)
- 198.51.100.78 (command and control server suspected)

Browser history shows visits to malicious-site.com and data-exfil.net.
#network-analysis #ioc #c2-server""")
    note2.calculate_hash()
    note2.extract_tags()
    note2.extract_iocs()
    evidence1.notes.append(note2)

    note3 = Note(content="""Malware identified in temp directory:
File: evil.exe
MD5: d41d8cd98f00b204e9800998ecf8427e
SHA1: da39a3ee5e6b4b0d3255bfef95601890afd80709
SHA256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

Submitting to VirusTotal for analysis. #malware #hash-analysis #virustotal""")
    note3.calculate_hash()
    note3.extract_tags()
    note3.extract_iocs()
    evidence1.notes.append(note3)

    note4 = Note(content="""Timeline analysis reveals:
- 2024-01-15 09:23:45 - Suspicious email received
- 2024-01-15 09:24:12 - User clicked phishing link https://evil-domain.com/login
- 2024-01-15 09:25:03 - Credentials submitted to attacker-controlled site
- 2024-01-15 09:30:15 - Lateral movement detected

User credentials compromised. Recommend immediate password reset. #timeline #lateral-movement""")
    note4.calculate_hash()
    note4.extract_tags()
    note4.extract_iocs()
    evidence1.notes.append(note4)

    demo_case.evidence.append(evidence1)

    # Create evidence 2: Network logs
    evidence2 = Evidence(
        name="Firewall Logs",
        description="Corporate firewall logs from incident timeframe"
    )
    evidence2.metadata["source_hash"] = "a3f5c8b912e4d67f89b0c1a2e3d4f5a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2"

    note5 = Note(content="""Log analysis shows outbound connections to suspicious domains:
- attacker-c2.com on port 443 (encrypted channel)
- data-upload.net on port 8080 (unencrypted)
- exfil-server.org on port 22 (SSH tunnel)

Total data transferred: approximately 2.3 GB over 4 hours.
#log-analysis #data-exfiltration #network-traffic""")
    note5.calculate_hash()
    note5.extract_tags()
    note5.extract_iocs()
    evidence2.notes.append(note5)

    note6 = Note(content="""Contact information found in malware configuration:
Email: attacker@malicious-domain.com
Backup C2: 2001:0db8:85a3:0000:0000:8a2e:0370:7334 (IPv6)

Cross-referencing with threat intelligence databases. #threat-intel #attribution""")
    note6.calculate_hash()
    note6.extract_tags()
    note6.extract_iocs()
    evidence2.notes.append(note6)

    demo_case.evidence.append(evidence2)

    # Create evidence 3: Email forensics
    evidence3 = Evidence(
        name="Phishing Email",
        description="Original phishing email preserved in .eml format"
    )

    note7 = Note(content="""Email headers analysis:
From: sender@phishing-domain.com (spoofed)
Reply-To: attacker@evil-mail-server.net
X-Originating-IP: 198.51.100.99

Email contains embedded tracking pixel at http://tracking.malicious-site.com/pixel.gif
Attachment: invoice.pdf.exe (double extension trick) #email-forensics #phishing-analysis""")
    note7.calculate_hash()
    note7.extract_tags()
    note7.extract_iocs()
    evidence3.notes.append(note7)

    demo_case.evidence.append(evidence3)

    return demo_case
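For orientation, here is an illustrative snippet (not part of the commit) that builds the demo case and inspects what the extractors picked up; the attribute names (notes, evidence, tags) follow the model descriptions above and should be treated as assumptions.

# Hedged sketch: exercises create_demo_case() outside the TUI.
from trace.storage_impl.demo_data import create_demo_case

case = create_demo_case()
print(case.case_number)      # DEMO-2024-001
print(len(case.evidence))    # 3 evidence items
for note in case.notes:
    print(note.tags)         # e.g. ['incident-response', 'data-breach', ...]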
87  trace/storage_impl/lock_manager.py  Normal file
@@ -0,0 +1,87 @@
"""File lock manager for preventing concurrent access"""

import os
import sys
import time
from pathlib import Path


class LockManager:
    """Cross-platform file lock manager to prevent concurrent access"""

    def __init__(self, lock_file: Path):
        self.lock_file = lock_file
        self.acquired = False

    def acquire(self, timeout: int = 5):
        """Acquire lock with timeout. Returns True if successful."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                # Create the lock file exclusively; O_EXCL makes the open fail
                # atomically if the file already exists (on most systems)
                fd = os.open(str(self.lock_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                os.write(fd, str(os.getpid()).encode())
                os.close(fd)
                self.acquired = True
                return True
            except FileExistsError:
                # Lock file exists, check if process is still alive
                if self._is_stale_lock():
                    # Remove stale lock and retry
                    try:
                        self.lock_file.unlink()
                    except FileNotFoundError:
                        pass
                    continue
                # Active lock, wait a bit
                time.sleep(0.1)
            except Exception:
                # Other errors, wait and retry
                time.sleep(0.1)
        return False

    def _is_stale_lock(self):
        """Check if lock file is stale (process no longer exists)"""
        try:
            if not self.lock_file.exists():
                return False
            with open(self.lock_file, 'r') as f:
                pid = int(f.read().strip())

            # Check if process exists (cross-platform)
            if sys.platform == 'win32':
                import ctypes
                kernel32 = ctypes.windll.kernel32
                PROCESS_QUERY_INFORMATION = 0x0400
                handle = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid)
                if handle:
                    kernel32.CloseHandle(handle)
                    return False
                return True
            else:
                # Unix/Linux - send signal 0 to check if process exists
                try:
                    os.kill(pid, 0)
                    return False  # Process exists
                except OSError:
                    return True   # Process doesn't exist
        except (ValueError, FileNotFoundError, PermissionError):
            return True

    def release(self):
        """Release the lock"""
        if self.acquired:
            try:
                self.lock_file.unlink()
            except FileNotFoundError:
                pass
            self.acquired = False

    def __enter__(self):
        if not self.acquire():
            raise RuntimeError("Could not acquire lock: another instance is running")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
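Since the class implements `__enter__`/`__exit__`, the intended usage is as a context manager; a minimal sketch (not part of the commit, lock path matching the Storage default):

from pathlib import Path
from trace.storage_impl.lock_manager import LockManager

with LockManager(Path.home() / ".trace" / "app.lock"):
    ...  # exclusive section; the lock file holds this process's PID
# lock file is removed on exit, even if the body raised;
# a live lock held elsewhere raises RuntimeError instead of blocking forever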
92  trace/storage_impl/state_manager.py  Normal file
@@ -0,0 +1,92 @@
"""State manager for active context and settings"""

import json
from pathlib import Path
from typing import Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from .storage import Storage

DEFAULT_APP_DIR = Path.home() / ".trace"


class StateManager:
    """Manages active context and user settings"""

    def __init__(self, app_dir: Path = DEFAULT_APP_DIR):
        self.app_dir = app_dir
        self.state_file = self.app_dir / "state"
        self.settings_file = self.app_dir / "settings.json"
        self._ensure_app_dir()

    def _ensure_app_dir(self):
        if not self.app_dir.exists():
            self.app_dir.mkdir(parents=True, exist_ok=True)

    def set_active(self, case_id: Optional[str] = None, evidence_id: Optional[str] = None):
        state = self.get_active()
        state["case_id"] = case_id
        state["evidence_id"] = evidence_id
        # Atomic write: write to temp file then rename
        temp_file = self.state_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(state, f, ensure_ascii=False)
        temp_file.replace(self.state_file)

    def get_active(self) -> dict:
        if not self.state_file.exists():
            return {"case_id": None, "evidence_id": None}
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {"case_id": None, "evidence_id": None}

    def validate_and_clear_stale(self, storage: 'Storage') -> str:
        """Validate active state against storage and clear stale references.
        Returns a warning message if state was cleared, empty string otherwise."""
        state = self.get_active()
        case_id = state.get("case_id")
        evidence_id = state.get("evidence_id")
        warning = ""

        if case_id:
            case = storage.get_case(case_id)
            if not case:
                warning = f"Active case (ID: {case_id[:8]}...) no longer exists. Clearing active context."
                self.set_active(None, None)
                return warning

            # Validate evidence if set
            if evidence_id:
                _, evidence = storage.find_evidence(evidence_id)
                if not evidence:
                    warning = f"Active evidence (ID: {evidence_id[:8]}...) no longer exists. Clearing to case level."
                    self.set_active(case_id, None)
                    return warning

        elif evidence_id:
            # Evidence set but no case - invalid state
            warning = "Invalid state: evidence set without case. Clearing active context."
            self.set_active(None, None)
            return warning

        return warning

    def get_settings(self) -> dict:
        if not self.settings_file.exists():
            return {"pgp_enabled": True}
        try:
            with open(self.settings_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {"pgp_enabled": True}

    def set_setting(self, key: str, value):
        settings = self.get_settings()
        settings[key] = value
        # Atomic write: write to temp file then rename
        temp_file = self.settings_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(settings, f, ensure_ascii=False)
        temp_file.replace(self.settings_file)
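A short behavior sketch (not part of the commit): settings fall back to the `{"pgp_enabled": True}` default until `set_setting()` has written `settings.json`, after which reads come from disk.

from trace.storage_impl.state_manager import StateManager

sm = StateManager()
print(sm.get_settings())                  # {'pgp_enabled': True} on first run
sm.set_setting("pgp_enabled", False)      # persisted via temp file + rename
print(sm.get_settings()["pgp_enabled"])   # False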
112  trace/storage_impl/storage.py  Normal file
@@ -0,0 +1,112 @@
"""Main storage class for persisting cases, evidence, and notes"""

import json
from pathlib import Path
from typing import List, Optional, Tuple

from ..models import Case, Evidence
from .lock_manager import LockManager
from .demo_data import create_demo_case

DEFAULT_APP_DIR = Path.home() / ".trace"


class Storage:
    """Manages persistence of all forensic data"""

    def __init__(self, app_dir: Path = DEFAULT_APP_DIR, acquire_lock: bool = True):
        self.app_dir = app_dir
        self.data_file = self.app_dir / "data.json"
        self.lock_file = self.app_dir / "app.lock"
        self.lock_manager = None
        self._ensure_app_dir()

        # Acquire lock to prevent concurrent access
        if acquire_lock:
            self.lock_manager = LockManager(self.lock_file)
            if not self.lock_manager.acquire(timeout=5):
                raise RuntimeError("Another instance of trace is already running. Please close it first.")

        self.cases: List[Case] = self._load_data()

        # Create demo case only on first run, i.e. when no data file exists yet.
        # An existing data file that loaded to an empty case list is left alone.
        if not self.cases and not self.data_file.exists():
            demo_case = create_demo_case()
            self.cases.append(demo_case)
            self.save_data()

    def __del__(self):
        """Release lock when Storage object is destroyed"""
        if self.lock_manager:
            self.lock_manager.release()

    def _ensure_app_dir(self):
        if not self.app_dir.exists():
            self.app_dir.mkdir(parents=True, exist_ok=True)

    def _load_data(self) -> List[Case]:
        if not self.data_file.exists():
            return []
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return [Case.from_dict(c) for c in data]
        except (json.JSONDecodeError, IOError, KeyError, ValueError) as e:
            # Corrupted JSON - create backup and raise exception
            import shutil
            from datetime import datetime
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_file = self.app_dir / f"data.json.corrupted.{timestamp}"
            try:
                shutil.copy2(self.data_file, backup_file)
            except Exception:
                pass
            # Raise exception with information about backup
            raise RuntimeError(f"Data file is corrupted. Backup saved to: {backup_file}\nError: {e}")

    def start_fresh(self):
        """Start with fresh data (for corrupted JSON recovery)"""
        self.cases = []
        demo_case = create_demo_case()
        self.cases.append(demo_case)
        self.save_data()

    def save_data(self):
        data = [c.to_dict() for c in self.cases]
        # Write to temp file then rename for atomic-ish write
        temp_file = self.data_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        temp_file.replace(self.data_file)

    def add_case(self, case: Case):
        self.cases.append(case)
        self.save_data()

    def get_case(self, case_id: str) -> Optional[Case]:
        # Case ID lookup
        for c in self.cases:
            if c.case_id == case_id:
                return c
        return None

    def delete_case(self, case_id: str):
        self.cases = [c for c in self.cases if c.case_id != case_id]
        self.save_data()

    def delete_evidence(self, case_id: str, evidence_id: str):
        case = self.get_case(case_id)
        if case:
            case.evidence = [e for e in case.evidence if e.evidence_id != evidence_id]
            self.save_data()

    def find_evidence(self, evidence_id: str) -> Tuple[Optional[Case], Optional[Evidence]]:
        for c in self.cases:
            for e in c.evidence:
                if e.evidence_id == evidence_id:
                    return c, e
        return None, None
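A hedged usage sketch (not part of the commit): `Storage` seeds the demo case only when `~/.trace/data.json` is missing, and `acquire_lock=False` is an option for tooling that should not contend for the app lock (whether that is the intended use is an assumption here).

from trace.storage_impl.storage import Storage

storage = Storage(acquire_lock=False)  # skip the app.lock handshake
case = storage.cases[0] if storage.cases else None
if case and case.evidence:
    print(case.case_number)
    found_case, evidence = storage.find_evidence(case.evidence[0].evidence_id)
    assert found_case is case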
7  trace/tui/__init__.py  Normal file
@@ -0,0 +1,7 @@
"""TUI (Text User Interface) package for trace application"""

# Import from the main tui_app module for backward compatibility
# The tui_app.py file contains the main TUI class and run_tui function
from ..tui_app import run_tui, TUI

__all__ = ['run_tui', 'TUI']
5  trace/tui/handlers/__init__.py  Normal file
@@ -0,0 +1,5 @@
"""TUI handlers for various operations"""

from .export_handler import ExportHandler

__all__ = ['ExportHandler']
238  trace/tui/handlers/export_handler.py  Normal file
@@ -0,0 +1,238 @@
"""Export functionality for TUI"""

import time
import datetime
from pathlib import Path
from typing import List, Tuple, Optional

from ...models import Note, Case, Evidence


class ExportHandler:
    """Handles exporting IOCs and notes to files"""

    @staticmethod
    def export_iocs_to_file(
        iocs_with_counts: List[Tuple[str, int, str]],
        active_case: Optional[Case],
        active_evidence: Optional[Evidence],
        get_iocs_func=None
    ) -> Tuple[bool, str]:
        """
        Export IOCs to a text file

        Args:
            iocs_with_counts: List of (ioc, count, type) tuples
            active_case: Active case context
            active_evidence: Active evidence context
            get_iocs_func: Function to get IOCs for a list of notes

        Returns:
            Tuple of (success: bool, message: str)
        """
        if not iocs_with_counts:
            return False, "No IOCs to export."

        # Determine context for filename
        if active_evidence:
            context_name = f"{active_case.case_number}_{active_evidence.name}" if active_case else active_evidence.name
        elif active_case:
            context_name = active_case.case_number
        else:
            context_name = "unknown"

        # Clean filename
        context_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in context_name)

        # Create exports directory if it doesn't exist
        export_dir = Path.home() / ".trace" / "exports"
        export_dir.mkdir(parents=True, exist_ok=True)

        # Generate filename with timestamp
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"iocs_{context_name}_{timestamp}.txt"
        filepath = export_dir / filename

        # Build export content
        lines = []
        lines.append(f"# IOC Export - {context_name}")
        lines.append(f"# Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        lines.append("")

        if active_evidence:
            # Evidence context - only evidence IOCs
            lines.append(f"## Evidence: {active_evidence.name}")
            lines.append("")
            for ioc, count, ioc_type in iocs_with_counts:
                lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)")
        elif active_case and get_iocs_func:
            # Case context - show case IOCs + evidence IOCs with separators
            # Get case notes IOCs
            case_iocs = get_iocs_func(active_case.notes)
            if case_iocs:
                lines.append("## Case Notes")
                lines.append("")
                for ioc, count, ioc_type in case_iocs:
                    lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)")
                lines.append("")

            # Get IOCs from each evidence
            for ev in active_case.evidence:
                ev_iocs = get_iocs_func(ev.notes)
                if ev_iocs:
                    lines.append(f"## Evidence: {ev.name}")
                    lines.append("")
                    for ioc, count, ioc_type in ev_iocs:
                        lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)")
                    lines.append("")

        # Write to file
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write('\n'.join(lines))
            return True, f"IOCs exported to: {filepath}"
        except Exception as e:
            return False, f"Export failed: {str(e)}"

    @staticmethod
    def export_case_to_markdown(case: Case) -> Tuple[bool, str]:
        """
        Export case (and all its evidence) to markdown

        Args:
            case: The case to export

        Returns:
            Tuple of (success: bool, message: str)
        """
        # Create exports directory if it doesn't exist
        export_dir = Path.home() / ".trace" / "exports"
        export_dir.mkdir(parents=True, exist_ok=True)

        # Generate filename
        case_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in case.case_number)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"case_{case_name}_{timestamp}.md"
        filepath = export_dir / filename

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write("# Forensic Notes Export\n\n")
                f.write(f"Generated on: {time.ctime()}\n\n")

                # Write case info
                f.write(f"## Case: {case.case_number}\n")
                if case.name:
                    f.write(f"**Name:** {case.name}\n")
                if case.investigator:
                    f.write(f"**Investigator:** {case.investigator}\n")
                f.write(f"**Case ID:** {case.case_id}\n\n")

                # Case notes
                f.write("### Case Notes\n")
                if not case.notes:
                    f.write("_No notes._\n")
                for note in case.notes:
                    ExportHandler._write_note_markdown(f, note)

                # Evidence
                f.write("\n### Evidence\n")
                if not case.evidence:
                    f.write("_No evidence._\n")

                for ev in case.evidence:
                    f.write(f"#### Evidence: {ev.name}\n")
                    if ev.description:
                        f.write(f"_{ev.description}_\n")
                    f.write(f"**ID:** {ev.evidence_id}\n")

                    # Include source hash if available
                    source_hash = ev.metadata.get("source_hash")
                    if source_hash:
                        f.write(f"**Source Hash:** `{source_hash}`\n")
                    f.write("\n")

                    f.write("##### Evidence Notes\n")
                    if not ev.notes:
                        f.write("_No notes._\n")
                    for note in ev.notes:
                        ExportHandler._write_note_markdown(f, note)
                    f.write("\n")

            return True, f"Case exported to: {filepath}"
        except Exception as e:
            return False, f"Export failed: {str(e)}"

    @staticmethod
    def export_evidence_to_markdown(
        evidence: Evidence,
        case: Optional[Case]
    ) -> Tuple[bool, str]:
        """
        Export evidence to markdown

        Args:
            evidence: The evidence to export
            case: The parent case (for context)

        Returns:
            Tuple of (success: bool, message: str)
        """
        # Create exports directory if it doesn't exist
        export_dir = Path.home() / ".trace" / "exports"
        export_dir.mkdir(parents=True, exist_ok=True)

        # Generate filename
        case_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in case.case_number) if case else "unknown"
        ev_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in evidence.name)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"evidence_{case_name}_{ev_name}_{timestamp}.md"
        filepath = export_dir / filename

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write("# Forensic Evidence Export\n\n")
                f.write(f"Generated on: {time.ctime()}\n\n")

                # Case context
                if case:
                    f.write(f"**Case:** {case.case_number}\n")
                    if case.name:
                        f.write(f"**Case Name:** {case.name}\n")
                    f.write("\n")

                # Evidence info
                f.write(f"## Evidence: {evidence.name}\n")
                if evidence.description:
                    f.write(f"**Description:** {evidence.description}\n")
                if evidence.metadata.get("source_hash"):
                    f.write(f"**Source Hash:** `{evidence.metadata['source_hash']}`\n")
                f.write(f"**Evidence ID:** {evidence.evidence_id}\n\n")

                # Notes
                f.write("### Notes\n")
                if not evidence.notes:
                    f.write("_No notes._\n")
                for note in evidence.notes:
                    ExportHandler._write_note_markdown(f, note)

            return True, f"Evidence exported to: {filepath}"
        except Exception as e:
            return False, f"Export failed: {str(e)}"

    @staticmethod
    def _write_note_markdown(f, note: Note):
        """Helper to write a note in markdown format"""
        f.write(f"- **{time.ctime(note.timestamp)}**\n")
        f.write(f"  - Content: {note.content}\n")
        if note.tags:
            tags_str = " ".join([f"#{tag}" for tag in note.tags])
            f.write(f"  - Tags: {tags_str}\n")
        f.write(f"  - Hash: `{note.content_hash}`\n")
        if note.signature:
            f.write("  - **Signature Verified:**\n")
            f.write("    ```\n")
            for line in note.signature.splitlines():
                f.write(f"    {line}\n")
            f.write("    ```\n")
        f.write("\n")
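An illustrative call (not part of the commit) exporting the demo case's full markdown report; the Storage wiring here is an assumption made for the example.

from trace.storage_impl import Storage
from trace.tui.handlers import ExportHandler

storage = Storage(acquire_lock=False)  # assumption: safe for a one-off script
ok, message = ExportHandler.export_case_to_markdown(storage.cases[0])
print(message)  # e.g. "Case exported to: ~/.trace/exports/case_DEMO-2024-001_<timestamp>.md"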
6  trace/tui/rendering/__init__.py  Normal file
@@ -0,0 +1,6 @@
"""Rendering utilities for TUI"""

from .colors import init_colors, ColorPairs
from .text_renderer import TextRenderer

__all__ = ['init_colors', 'ColorPairs', 'TextRenderer']
43  trace/tui/rendering/colors.py  Normal file
@@ -0,0 +1,43 @@
"""Color pair initialization and constants for TUI"""

import curses


class ColorPairs:
    """Color pair constants"""
    SELECTION = 1      # Black on cyan
    SUCCESS = 2        # Green on black
    WARNING = 3        # Yellow on black
    ERROR = 4          # Red on black
    HEADER = 5         # Cyan on black
    METADATA = 6       # White on black
    BORDER = 7         # Blue on black
    TAG = 8            # Magenta on black
    IOC_SELECTED = 9   # Red on cyan
    TAG_SELECTED = 10  # Yellow on cyan


def init_colors():
    """Initialize color pairs for the TUI"""
    curses.start_color()
    if curses.has_colors():
        # Selection / Highlight
        curses.init_pair(ColorPairs.SELECTION, curses.COLOR_BLACK, curses.COLOR_CYAN)
        # Success / Active indicators
        curses.init_pair(ColorPairs.SUCCESS, curses.COLOR_GREEN, curses.COLOR_BLACK)
        # Info / Warnings
        curses.init_pair(ColorPairs.WARNING, curses.COLOR_YELLOW, curses.COLOR_BLACK)
        # Errors / Critical / IOCs
        curses.init_pair(ColorPairs.ERROR, curses.COLOR_RED, curses.COLOR_BLACK)
        # Headers / Titles (bright cyan)
        curses.init_pair(ColorPairs.HEADER, curses.COLOR_CYAN, curses.COLOR_BLACK)
        # Metadata / Secondary text (dim)
        curses.init_pair(ColorPairs.METADATA, curses.COLOR_WHITE, curses.COLOR_BLACK)
        # Borders / Separators (blue)
        curses.init_pair(ColorPairs.BORDER, curses.COLOR_BLUE, curses.COLOR_BLACK)
        # Tags (magenta)
        curses.init_pair(ColorPairs.TAG, curses.COLOR_MAGENTA, curses.COLOR_BLACK)
        # IOCs on selected background (red on cyan)
        curses.init_pair(ColorPairs.IOC_SELECTED, curses.COLOR_RED, curses.COLOR_CYAN)
        # Tags on selected background (yellow on cyan)
        curses.init_pair(ColorPairs.TAG_SELECTED, curses.COLOR_YELLOW, curses.COLOR_CYAN)
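A minimal curses sketch (not part of the commit) showing the expected call order: initialize the pairs once the screen exists, then reference them via the constants.

import curses
from trace.tui.rendering.colors import init_colors, ColorPairs

def main(stdscr):
    init_colors()  # must run after curses has set up the terminal
    stdscr.addstr(0, 0, "198.51.100.78", curses.color_pair(ColorPairs.ERROR) | curses.A_BOLD)
    stdscr.getch()

curses.wrapper(main)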
137  trace/tui/rendering/text_renderer.py  Normal file
@@ -0,0 +1,137 @@
"""Text rendering utilities with highlighting support"""

import curses
import re

from ...models import Note
from .colors import ColorPairs


class TextRenderer:
    """Utility class for rendering text with highlights"""

    @staticmethod
    def safe_truncate(text, max_width, ellipsis="..."):
        """
        Safely truncate text to fit within max_width, handling Unicode characters.
        Uses a conservative approach to avoid curses display errors.
        """
        if not text:
            return text

        # Try to fit the text as-is
        if len(text) <= max_width:
            return text

        # Need to truncate - account for ellipsis
        if max_width <= len(ellipsis):
            return ellipsis[:max_width]

        # Truncate conservatively (character by character) to handle multi-byte UTF-8
        target_len = max_width - len(ellipsis)
        truncated = text[:target_len]

        # If the result is still too long when displayed, trim further
        while len(truncated) > 0:
            test_str = truncated + ellipsis
            if len(test_str) <= max_width:
                return test_str
            # Trim one more character
            truncated = truncated[:-1]

        return ellipsis[:max_width]

    @staticmethod
    def display_line_with_highlights(screen, y, x_start, line, is_selected=False):
        """
        Display a line with intelligent highlighting.
        - IOCs are highlighted with ColorPairs.ERROR (red)
        - Tags are highlighted with ColorPairs.WARNING (yellow)
        - Selection background is ColorPairs.SELECTION (cyan) for non-IOC text
        - IOC highlighting takes priority over selection
        """
        # Extract IOCs and tags
        highlights = []

        # Get IOCs with positions
        for text, start, end, ioc_type in Note.extract_iocs_with_positions(line):
            highlights.append((text, start, end, 'ioc'))

        # Get tags
        for match in re.finditer(r'#\w+', line):
            highlights.append((match.group(), match.start(), match.end(), 'tag'))

        # Sort by position and remove overlaps (IOCs take priority over tags)
        highlights.sort(key=lambda x: x[1])
        deduplicated = []
        last_end = -1
        for text, start, end, htype in highlights:
            if start >= last_end:
                deduplicated.append((text, start, end, htype))
                last_end = end
        highlights = deduplicated

        if not highlights:
            # No highlights - use selection color if selected
            if is_selected:
                screen.attron(curses.color_pair(ColorPairs.SELECTION))
                screen.addstr(y, x_start, line)
                screen.attroff(curses.color_pair(ColorPairs.SELECTION))
            else:
                screen.addstr(y, x_start, line)
            return

        # Display with intelligent highlighting
        x_pos = x_start
        last_pos = 0

        for text, start, end, htype in highlights:
            # Add text before this highlight
            if start > last_pos:
                text_before = line[last_pos:start]
                if is_selected:
                    screen.attron(curses.color_pair(ColorPairs.SELECTION))
                    screen.addstr(y, x_pos, text_before)
                    screen.attroff(curses.color_pair(ColorPairs.SELECTION))
                else:
                    screen.addstr(y, x_pos, text_before)
                x_pos += len(text_before)

            # Add highlighted text
            if htype == 'ioc':
                # IOC highlighting: red on cyan if selected, red on black otherwise
                if is_selected:
                    screen.attron(curses.color_pair(ColorPairs.IOC_SELECTED) | curses.A_BOLD)
                    screen.addstr(y, x_pos, text)
                    screen.attroff(curses.color_pair(ColorPairs.IOC_SELECTED) | curses.A_BOLD)
                else:
                    screen.attron(curses.color_pair(ColorPairs.ERROR) | curses.A_BOLD)
                    screen.addstr(y, x_pos, text)
                    screen.attroff(curses.color_pair(ColorPairs.ERROR) | curses.A_BOLD)
            else:  # tag
                # Tag highlighting: yellow on cyan if selected, yellow on black otherwise
                if is_selected:
                    screen.attron(curses.color_pair(ColorPairs.TAG_SELECTED))
                    screen.addstr(y, x_pos, text)
                    screen.attroff(curses.color_pair(ColorPairs.TAG_SELECTED))
                else:
                    screen.attron(curses.color_pair(ColorPairs.WARNING))
                    screen.addstr(y, x_pos, text)
                    screen.attroff(curses.color_pair(ColorPairs.WARNING))

            x_pos += len(text)
            last_pos = end

        # Add remaining text
        if last_pos < len(line):
            text_after = line[last_pos:]
            if is_selected:
                screen.attron(curses.color_pair(ColorPairs.SELECTION))
                screen.addstr(y, x_pos, text_after)
                screen.attroff(curses.color_pair(ColorPairs.SELECTION))
            else:
                screen.addstr(y, x_pos, text_after)
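A behavior sketch for `safe_truncate` (not part of the commit), traced directly from the code above:

from trace.tui.rendering.text_renderer import TextRenderer

TextRenderer.safe_truncate("short", 10)             # -> "short" (fits as-is)
TextRenderer.safe_truncate("a very long line", 10)  # -> "a very ..." (7 chars + "...")
TextRenderer.safe_truncate("abc", 2)                # -> ".." (width smaller than the ellipsis)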
@@ -2213,9 +2213,9 @@ class TUI:
         options = ["GPG Signing", "Select GPG Key", "Save", "Cancel"]

         curses.curs_set(0)
-        h = 12
+        h = 15  # Increased from 12 to properly show all 4 options + footer
         w = 60
-        y = self.height // 2 - 6
+        y = self.height // 2 - 7  # Adjusted to keep centered
         x = (self.width - w) // 2

         win = curses.newwin(h, w, y, x)