Mirror of https://github.com/overcuriousity/trace.git (synced 2025-12-20 21:12:22 +00:00)

Compare commits: v0.1.1-alp ... master (62 commits)
| SHA1 |
|---|
| 37b6503b29 |
| f54e2560f3 |
| a2e7798a2d |
| 15bc00a195 |
| 053369df78 |
| eca56c0d54 |
| 06b7680982 |
| bfefb42761 |
| 070e76467c |
| b80dd10901 |
| fe3c0710c6 |
| 809a4a498f |
| 931e5debc8 |
| f91f434f7f |
| 85ca483a1d |
| f50fd1800d |
| b830d15d85 |
| 4a4e1e7c06 |
| 2a7d00d221 |
| c68fc66de6 |
| f68c8389da |
| 50ffeb1b6e |
| d6b8231bae |
| 8b13cfc37b |
| 62fa781350 |
| f4f276160a |
| 33cad5bd5f |
| 4fad8a3561 |
| 48525fe505 |
| 085c9e9aa8 |
| 06548df373 |
| dff27ac7e4 |
| a1f95548fd |
| 425a169217 |
| 1598b16b85 |
| 90a82dc0d3 |
| 9248799e79 |
| 96309319b9 |
| 6e4bb9b265 |
| d3e3383fc6 |
| eec759aafb |
| b6387f4b0c |
| 09729ee7a3 |
| 68834858e0 |
| 5fdf6d0aba |
| 2453bd4f2a |
| ba7a8fdd5d |
| 107feaf560 |
| d97207633b |
| 7df42cb811 |
| 71ae0eef35 |
| d94901a41d |
| ec5a3d9f31 |
| ac7e442970 |
| b973aa1009 |
| 461da25c93 |
| b61b818952 |
| fa90aeb063 |
| e38b018e41 |
| 3c53969b45 |
| a829275ce0 |
| e59f7be3e4 |
CLAUDE.md (144)
@@ -9,20 +9,75 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co

## Development Commands

### Running the Application

#### Launching TUI
```bash
-# Run directly from source
# Launch TUI (default behavior)
python3 main.py

# Quick CLI note addition (requires active case/evidence set in TUI)
python3 main.py "Your note content here"

# Export to markdown
python3 main.py --export --output report.md

# Open TUI directly at active case/evidence
python3 main.py --open
```

#### Context Management
```bash
# Show current active case and evidence
python3 main.py --show-context

# List all cases and evidence in hierarchy
python3 main.py --list

# Switch active case (by case number or UUID)
python3 main.py --switch-case 2024-001

# Switch active evidence (by name or UUID, within active case)
python3 main.py --switch-evidence "disk-image-1"
```

#### Creating Cases and Evidence
```bash
# Create new case (automatically sets as active)
python3 main.py --new-case 2024-001

# Create case with metadata
python3 main.py --new-case 2024-001 --name "Ransomware Investigation" --investigator "John Doe"

# Create evidence in active case (automatically sets as active)
python3 main.py --new-evidence "Laptop HDD"

# Create evidence with description
python3 main.py --new-evidence "Server Logs" --description "Apache access logs from compromised server"
```

#### Adding Notes
```bash
# Quick note to active context
python3 main.py "Observed suspicious process at 14:32"

# Read note from stdin (for piping command output)
echo "Network spike detected" | python3 main.py --stdin
ps aux | grep malware | python3 main.py --stdin
tail -f logfile.txt | grep error | python3 main.py --stdin

# Add note to specific case without changing active context
python3 main.py --case 2024-002 "Found malware in temp folder"

# Add note to specific evidence without changing active context
python3 main.py --evidence "disk-image-2" "Bad sectors detected"

# Add note to specific case and evidence (both overrides)
python3 main.py --case 2024-001 --evidence "Laptop HDD" "Recovered deleted files"
```

#### Export
```bash
# Export all data to markdown
python3 main.py --export --output report.md

# Export with default filename (trace_export.md)
python3 main.py --export
```

### Building Binary
```bash
# Install dependencies first
@@ -52,18 +107,30 @@ The application uses a three-level hierarchy:

Each level has unique IDs (UUIDs) for reliable lookups across the hierarchy.

-### Core Modules
### Modular Structure (Optimized for AI Coding Agents)

-**`trace/models.py`**: Data models using dataclasses
-- `Note`: Content + timestamp + SHA256 hash + optional GPG signature + auto-extracted tags/IOCs
-- `Evidence`: Container for notes about a specific piece of evidence, includes metadata dict for source hashes
-- `Case`: Top-level container with case number, investigator, evidence list, and notes
The codebase is organized into focused, single-responsibility modules to make it easier for AI agents and developers to navigate, understand, and modify specific functionality:

**`trace/models/`**: Data models package
- `__init__.py`: Main model classes (Note, Evidence, Case) with dataclass definitions
- `extractors/tag_extractor.py`: Tag extraction logic (hashtag parsing)
- `extractors/ioc_extractor.py`: IOC extraction logic (IPs, domains, URLs, hashes, emails)
- All models implement `to_dict()`/`from_dict()` for JSON serialization
- Models use extractors for automatic tag and IOC detection

-**`trace/storage.py`**: Persistence layer
-- `Storage`: Manages `~/.trace/data.json` with atomic writes (temp file + rename)
-- `StateManager`: Manages `~/.trace/state` (active case/evidence) and `~/.trace/settings.json` (PGP enabled/disabled)
-- Data is loaded into memory on init, modified, then saved atomically
**`trace/storage_impl/`**: Storage implementation package
- `storage.py`: Main Storage class managing `~/.trace/data.json` with atomic writes
- `state_manager.py`: StateManager for active context and settings persistence
- `lock_manager.py`: Cross-platform file locking to prevent concurrent access
- `demo_data.py`: Demo case creation for first-time users
- Backward compatible via `trace/storage.py` wrapper

**`trace/tui/`**: Text User Interface package
- `tui.py`: Main TUI class with view hierarchy and event loop (3307 lines - target for future refactoring)
- `rendering/colors.py`: Color pair initialization and constants
- `rendering/text_renderer.py`: Text rendering with IOC/tag highlighting
- `handlers/export_handler.py`: Export functionality (IOCs, markdown reports)
- Future refactoring will extract views, dialogs, and input handlers

**`trace/crypto.py`**: Integrity features
- `sign_content()`: GPG clearsign via subprocess (falls back gracefully if GPG unavailable)
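The atomic-write pattern called out above (temp file + rename; the `temp_file.replace(self.data_file)` call is quoted in a later hunk header of this file) is small enough to spell out. A minimal sketch, assuming a JSON payload and pathlib paths; `atomic_save` is an illustrative name, not the actual `Storage` method:

```python
import json
import tempfile
from pathlib import Path

def atomic_save(data: dict, data_file: Path) -> None:
    """Write JSON so that a crash mid-write never corrupts data_file."""
    # Write to a sibling temp file in the same directory, so the final
    # rename stays on one filesystem (required for an atomic replace).
    with tempfile.NamedTemporaryFile(
        "w", dir=data_file.parent, suffix=".tmp", delete=False
    ) as tmp:
        json.dump(data, tmp, indent=2)
        tmp_path = Path(tmp.name)
    # Path.replace is atomic on POSIX and overwrites the target on Windows.
    tmp_path.replace(data_file)
```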
@@ -74,17 +141,18 @@ Each level has unique IDs (UUIDs) for reliable lookups across the hierarchy.
- `export_markdown()`: Generates full case report with hashes and signatures
- `main()`: Argument parsing, routes to TUI or CLI functions

-**`trace/tui.py`**: Curses-based Text User Interface
-- View hierarchy: case_list → case_detail → evidence_detail
-- Additional views: tags_list, tag_notes_list, ioc_list, ioc_notes_list, note_detail
-- Multi-line note editor with Ctrl+G to submit, Esc to cancel
-- Filter mode (press `/`), active context management (press `a`)
-- All note additions automatically extract tags (#hashtag) and IOCs (IPs, domains, URLs, hashes, emails)

### Key Features Implementation

**Integrity System**: Every note automatically gets:
1. SHA256 hash of `timestamp:content` (via `Note.calculate_hash()`)
   - **Timestamp Format**: Unix epoch timestamp as float (seconds since 1970-01-01 00:00:00 UTC)
   - **Hash Input Format**: `"{timestamp}:{content}"` where timestamp is converted to string using Python's default str() conversion
   - **Example**: For content "Suspicious process detected" with timestamp 1702345678.123456, the hash input is:
     ```
     1702345678.123456:Suspicious process detected
     ```
   - This ensures integrity of both WHAT was said (content) and WHEN it was said (timestamp)
   - The exact float precision is preserved in the hash, making timestamps forensically tamper-evident
2. Optional GPG clearsign signature (if `pgp_enabled` in settings and GPG available)

**Tag System**: Regex-based hashtag extraction (`#word`)
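The hash recipe above is small enough to reproduce directly; a minimal sketch (`note_hash` is an illustrative helper name, but the input format matches `Note.calculate_hash()` as described):

```python
import hashlib

def note_hash(content: str, timestamp: float) -> str:
    """SHA256 over "{timestamp}:{content}"; str() keeps full float precision."""
    return hashlib.sha256(f"{timestamp}:{content}".encode("utf-8")).hexdigest()

# Matches the worked example: hash input "1702345678.123456:Suspicious process detected"
assert note_hash("Suspicious process detected", 1702345678.123456) == hashlib.sha256(
    b"1702345678.123456:Suspicious process detected"
).hexdigest()
```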
@@ -129,3 +197,33 @@ temp_file.replace(self.data_file)

## Testing Notes

Tests use temporary directories created with `tempfile.mkdtemp()` and cleaned up in `tearDown()` to avoid polluting `~/.trace/`.
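A minimal sketch of that isolation pattern, assuming the storage paths are derived from `Path.home()`; the patching shown here is illustrative rather than a transcript of the actual test suite:

```python
import shutil
import tempfile
import unittest
from pathlib import Path
from unittest import mock

class StorageTestCase(unittest.TestCase):
    def setUp(self):
        # Fresh sandbox per test; ~/.trace then resolves inside it
        self.tmp_dir = tempfile.mkdtemp()
        self._home = mock.patch.object(Path, "home", return_value=Path(self.tmp_dir))
        self._home.start()

    def tearDown(self):
        # Undo the patch and remove the sandbox so the real ~/.trace is never touched
        self._home.stop()
        shutil.rmtree(self.tmp_dir, ignore_errors=True)
```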
## AI Agent Optimization

The codebase has been restructured to be optimal for AI coding agents:

### Module Organization Benefits
- **Focused Files**: Each module has a single, clear responsibility (50-250 lines typically)
- **Easy Navigation**: Functionality is easy to locate by purpose (e.g., IOC extraction, export handlers)
- **Independent Modification**: Changes to one module rarely affect others
- **Clear Interfaces**: Modules communicate through well-defined imports
- **Reduced Context**: AI agents can focus on relevant files without loading massive monoliths

### File Size Guidelines
- **Small modules** (< 150 lines): Ideal for focused tasks
- **Medium modules** (150-300 lines): Acceptable for cohesive functionality
- **Large modules** (> 500 lines): Consider refactoring into smaller components
- **Very large modules** (> 1000 lines): Priority target for extraction and modularization

### Current Status
- ✅ Models: Organized into package with extractors separated
- ✅ Storage: Split into focused modules (storage, state, locking, demo data)
- ✅ TUI Utilities: Rendering and export handlers extracted
- ⏳ TUI Main: Still monolithic (3307 lines) - future refactoring needed

### Future Refactoring Targets
The `trace/tui.py` file (3307 lines) should be further split into:
- `tui/views/` - Individual view classes (case list, evidence detail, etc.)
- `tui/dialogs/` - Dialog functions (input, confirm, settings, etc.)
- `tui/handlers/` - Input and navigation handlers
- `tui/app.py` - Main TUI orchestration class
README.md (282)
@@ -20,7 +20,77 @@ trace "IR team gained shell access. Initial persistence checks running."
trace "Observed outbound connection to 192.168.1.55 on port 80. #suspicious #network"
```

-**System Integrity Chain:** Each command-line note is immediately stamped, concatenated with its content, and hashed using SHA256 before storage. This ensures a non-repudiable log entry.
**System Integrity Chain:** Each command-line note is immediately stamped with a Unix epoch timestamp (seconds since 1970-01-01 00:00:00 UTC as float, e.g., `1702345678.123456`), concatenated with its content in the format `"{timestamp}:{content}"`, and hashed using SHA256 before storage. This ensures a non-repudiable log entry with forensically tamper-evident timestamps.

## CLI Command Reference

### Context Management

View and switch between cases and evidence without opening the TUI:

```bash
# Show current active case and evidence
trace --show-context

# List all cases and evidence in hierarchy
trace --list

# Switch active case (by case number or UUID)
trace --switch-case 2024-001

# Switch active evidence (by name or UUID, within active case)
trace --switch-evidence "disk-image-1"
```

### Case and Evidence Creation

Create new cases and evidence directly from the command line:

```bash
# Create new case (automatically becomes active)
trace --new-case 2024-001

# Create case with full metadata
trace --new-case 2024-001 --name "Ransomware Investigation" --investigator "Jane Doe"

# Create evidence in active case (automatically becomes active)
trace --new-evidence "Laptop HDD"

# Create evidence with description
trace --new-evidence "Server Logs" --description "Apache logs from compromised server"
```

### Advanced Note-Taking

Beyond basic hot logging, trace supports stdin piping and context overrides:

```bash
# Pipe command output directly into notes
ps aux | grep malware | trace --stdin
tail -f /var/log/auth.log | grep "Failed password" | trace --stdin
netstat -an | trace --stdin

# Add note to specific case without changing active context
trace --case 2024-002 "Found malware in temp folder"

# Add note to specific evidence without changing active context
trace --evidence "Memory Dump" "Suspicious process identified"

# Override both case and evidence for a single note
trace --case 2024-001 --evidence "Disk Image" "Recovered deleted files"
```

**Identifiers:** All commands accept both human-friendly identifiers (case numbers like `2024-001`, evidence names like `Laptop HDD`) and UUIDs. Use `--list` to see available identifiers.

### Export

```bash
# Export all data to markdown (GPG-signed if enabled)
trace --export --output investigation-report.md

# Export with default filename (trace_export.md)
trace --export
```

## Installation & Deployment
@@ -125,12 +195,220 @@ After this, you can log with just: `t "Your note here"`

| Feature | Description | Operational Impact |
| :--- | :--- | :--- |
-| **Integrity Hashing** | SHA256 applied to every log entry (content + timestamp). | **Guaranteed log integrity.** No modification possible post-entry. |
| **Integrity Hashing** | SHA256 applied to every log entry using format `"{unix_timestamp}:{content}"`. Timestamp is Unix epoch as float (e.g., `1702345678.123456`). | **Guaranteed log integrity.** No modification possible post-entry. Timestamps are forensically tamper-evident with full float precision. |
| **GPG Signing** | Optional PGP/GPG signature applied to notes. | **Non-repudiation** for formal evidence handling. |
| **IOC Extraction** | Automatic parsing of IPv4, FQDNs, URLs, hashes, and email addresses. | **Immediate intelligence gathering** from raw text. |
| **Tag System** | Supports `#hashtags` for classification and filtering. | **Efficient triage** of large log sets. |
| **Minimal Footprint** | Built solely on Python standard library modules. | **Maximum portability** on restricted forensic environments. |

## Cryptographic Integrity & Chain of Custody

`trace` implements a dual-layer cryptographic system designed for legal admissibility and forensic integrity:

### Layer 1: Note-Level Integrity (Always Active)

**Process:**
1. **Timestamp Generation** - Precise Unix epoch timestamp (float) captured at note creation
   - Format: Seconds since 1970-01-01 00:00:00 UTC (e.g., `1702345678.123456`)
   - Full float precision preserved for forensic tamper-evidence
2. **Content Hashing** - SHA256 hash computed from `"{timestamp}:{content}"`
3. **Optional Signature** - Hash is signed with investigator's GPG private key

**Mathematical Representation:**
```
timestamp  = Unix epoch time as float (e.g., 1702345678.123456)
hash_input = "{timestamp}:{content}"
hash       = SHA256(hash_input)
signature  = GPG_Sign(hash, private_key)
```

**Example:**
```
Content:    "Suspicious process detected"
Timestamp:  1702345678.123456
Hash input: "1702345678.123456:Suspicious process detected"
Hash:       SHA256 of above = a3f5b2c8d9e1f4a7b6c3d8e2f5a9b4c7d1e6f3a8b5c2d9e4f7a1b8c6d3e0f5a2
```

**Security Properties:**
- **Temporal Integrity**: Timestamp is cryptographically bound to content (cannot backdate notes)
- **Tamper Detection**: Any modification to content or timestamp invalidates the hash
- **Non-Repudiation**: GPG signature proves who created the note (if signing enabled)
- **Hash Reproducibility**: Exported markdown includes Unix timestamp for independent verification
- **Efficient Storage**: Signing only the hash (64 hex chars) instead of full content

### Layer 2: Export-Level Integrity (On Demand)

When exporting to markdown (`--export`), the **entire export document** is GPG-signed if signing is enabled.

**Process:**
1. Generate complete markdown export with all cases, evidence, and notes
2. Individual note signatures are preserved within the export
3. Entire document is clearsigned with GPG

**Security Properties:**
- **Document Integrity**: Proves export hasn't been modified after generation
- **Dual Verification**: Both individual notes AND complete document can be verified
- **Chain of Custody**: Establishes provenance from evidence collection through report generation

### First-Run GPG Setup

On first launch, `trace` runs an interactive wizard to configure GPG signing:

1. **GPG Detection** - Checks if GPG is installed (gracefully continues without if missing)
2. **Key Selection** - Lists available secret keys from your GPG keyring
3. **Configuration** - Saves selected key ID to `~/.trace/settings.json`

**If GPG is not available:**
- Application continues to function normally
- Notes are hashed (SHA256) but not signed
- You can enable GPG later by editing `~/.trace/settings.json`
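One way to flip that switch later without hand-editing the JSON is the same `StateManager` the wizard itself uses; a minimal sketch (the key names match the wizard's settings dict, the key ID value is illustrative):

```python
from trace.storage import StateManager

# Same keys the first-run wizard persists to ~/.trace/settings.json
sm = StateManager()
sm.set_setting("pgp_enabled", True)
sm.set_setting("gpg_key_id", "ABC123DEF456")  # or None to use the default GPG key
```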
### Verification Workflows

#### Internal Verification (Within trace TUI)

The TUI automatically verifies signatures and displays status symbols:
- `✓` - Signature verified with public key in keyring
- `✗` - Signature verification failed (tampered or missing key)
- `?` - Note is unsigned

**To verify a specific note:**
1. Navigate to the note in TUI
2. Press `Enter` to view note details
3. Press `v` to see detailed verification information

#### External Verification (Manual/Court)

**Scenario**: Forensic investigator sends evidence to court/auditor

**Step 1 - Investigator exports evidence:**
```bash
# Export all notes with signatures
trace --export --output investigation-2024-001.md

# Export public key for verification
gpg --armor --export investigator@agency.gov > investigator-pubkey.asc

# Send both files to recipient
```

**Step 2 - Recipient verifies document:**
```bash
# Import investigator's public key
gpg --import investigator-pubkey.asc

# Verify entire export document
gpg --verify investigation-2024-001.md
```

**Expected output if valid:**
```
gpg: Signature made Mon Dec 13 14:23:45 2024
gpg: using RSA key ABC123DEF456
gpg: Good signature from "John Investigator <investigator@agency.gov>"
```

**Step 3 - Verify individual notes (optional):**

Individual note signatures are embedded in the markdown export. To verify a specific note:

1. Open `investigation-2024-001.md` in a text editor
2. Locate the note's signature block:
````
- **GPG Signature of Hash:**
  ```
  -----BEGIN PGP SIGNED MESSAGE-----
  Hash: SHA256

  a3f5b2c8d9e1f4a7b6c3d8e2f5a9b4c7d1e6f3a8b5c2d9e4f7a1b8c6d3e0f5a2
  -----BEGIN PGP SIGNATURE-----
  ...
  -----END PGP SIGNATURE-----
  ```
````
3. Extract the signature block (from `-----BEGIN PGP SIGNED MESSAGE-----` to `-----END PGP SIGNATURE-----`)
4. Save to a file and verify:
```bash
cat > note-signature.txt
<paste signature block>
Ctrl+D

gpg --verify note-signature.txt
```

**What gets verified:**
- The SHA256 hash proves the note content and timestamp haven't changed
- The GPG signature proves who created that hash
- Together: Proves this specific content was created by this investigator at this time

**Hash Verification (Manual):**

To independently verify a note's hash from the markdown export:

1. Locate the note in the export file and extract:
   - Unix Timestamp (e.g., `1702345678.123456`)
   - Content (e.g., `"Suspicious process detected"`)
   - Claimed Hash (e.g., `a3f5b2c8...`)

2. Recompute the hash:
```bash
# Using Python
python3 -c "import hashlib; print(hashlib.sha256(b'1702345678.123456:Suspicious process detected').hexdigest())"

# Using command-line tools
echo -n "1702345678.123456:Suspicious process detected" | sha256sum
```

3. Compare the computed hash with the claimed hash - they must match exactly
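Those steps can also be scripted; a minimal sketch that recomputes a note's hash and compares it to the claimed value (the arguments are placeholders taken from the example above; a real check would substitute values extracted from the export):

```python
import hashlib

def verify_note_hash(timestamp: str, content: str, claimed_hash: str) -> bool:
    """Recompute SHA256("{timestamp}:{content}") and compare to the claimed hash."""
    computed = hashlib.sha256(f"{timestamp}:{content}".encode("utf-8")).hexdigest()
    return computed == claimed_hash

# Placeholder values; substitute the ones extracted from your export
ok = verify_note_hash("1702345678.123456", "Suspicious process detected", "a3f5b2c8...")
print("hash matches" if ok else "hash mismatch")
```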
### Cryptographic Trust Model

```
┌─────────────────────────────────────────────────────────┐
│ Note Creation (Investigator)                            │
├─────────────────────────────────────────────────────────┤
│ 1. Content: "Malware detected on host-192.168.1.50"     │
│ 2. Timestamp: 1702483425.123456                         │
│ 3. Hash: SHA256(timestamp:content)                      │
│    → a3f5b2c8d9e1f4a7b6c3d8e2f5a9b4c7...                │
│ 4. Signature: GPG_Sign(hash, private_key)               │
└─────────────────────────────────────────────────────────┘
                             ↓
┌─────────────────────────────────────────────────────────┐
│ Export Generation                                       │
├─────────────────────────────────────────────────────────┤
│ 1. Build markdown with all notes + individual sigs      │
│ 2. Sign entire document: GPG_Sign(document)             │
└─────────────────────────────────────────────────────────┘
                             ↓
┌─────────────────────────────────────────────────────────┐
│ Verification (Court/Auditor)                            │
├─────────────────────────────────────────────────────────┤
│ 1. Import investigator's public key                     │
│ 2. Verify document signature → Proves export integrity  │
│ 3. Verify individual notes → Proves note authenticity   │
│ 4. Recompute hashes → Proves content hasn't changed     │
└─────────────────────────────────────────────────────────┘
```

### Security Considerations

**What is protected:**
- ✓ Content integrity (hash detects any modification)
- ✓ Temporal integrity (timestamp cryptographically bound)
- ✓ Attribution (signature proves who created it)
- ✓ Export completeness (document signature proves no additions/removals)

**What is NOT protected:**
- ✗ Note deletion (signatures can't prevent removal from database)
- ✗ Selective disclosure (investigator can choose which notes to export)
- ✗ Sequential ordering (signatures are per-note, not chained)

**Trust Dependencies:**
- You must trust the investigator's GPG key (verify fingerprint out-of-band)
- You must trust the investigator's system clock was accurate
- You must trust the investigator didn't destroy contradictory evidence

## TUI Reference (Management Console)

Execute `trace` (no arguments) to enter the Text User Interface. This environment is used for setup, review, and reporting.
trace/cli.py (463)
@@ -1,36 +1,252 @@
import argparse
import sys
import time
-from .models import Note, Case
from typing import Optional, Tuple
from .models import Note, Case, Evidence
from .storage import Storage, StateManager
from .crypto import Crypto

-def quick_add_note(content: str):
-    storage = Storage()
-    state_manager = StateManager()
-    state = state_manager.get_active()
-    settings = state_manager.get_settings()
def find_case(storage: Storage, identifier: str) -> Optional[Case]:
    """Find a case by case_id (UUID) or case_number."""
    for case in storage.cases:
        if case.case_id == identifier or case.case_number == identifier:
            return case
    return None

def find_evidence(case: Case, identifier: str) -> Optional[Evidence]:
    """Find evidence by evidence_id (UUID) or name within a case."""
    for evidence in case.evidence:
        if evidence.evidence_id == identifier or evidence.name == identifier:
            return evidence
    return None

def show_context():
    """Display the current active context."""
    state_manager = StateManager()
    storage = Storage()

    state = state_manager.get_active()
    case_id = state.get("case_id")
    evidence_id = state.get("evidence_id")

    if not case_id:
-        print("Error: No active case set. Open the TUI to select a case first.")
        print("No active context set.")
        print("Use --switch-case to set an active case, or open the TUI to select one.")
        return

    case = storage.get_case(case_id)
    if not case:
        print("Error: Active case not found in storage.")
        return

    print(f"Active context:")
    print(f"  Case: {case.case_number}", end="")
    if case.name:
        print(f" - {case.name}", end="")
    print(f" [{case.case_id[:8]}...]")

    if evidence_id:
        evidence = find_evidence(case, evidence_id)
        if evidence:
            print(f"  Evidence: {evidence.name}", end="")
            if evidence.description:
                print(f" - {evidence.description}", end="")
            print(f" [{evidence.evidence_id[:8]}...]")
        else:
            print(f"  Evidence: [not found - stale reference]")
    else:
        print(f"  Evidence: [none - notes will attach to case]")

def list_contexts():
    """List all cases and their evidence in a hierarchical format."""
    storage = Storage()

    if not storage.cases:
        print("No cases found.")
        print("Use --new-case to create one, or open the TUI.")
        return

    print("Cases and Evidence:")
    for case in storage.cases:
        # Show case
        print(f"  [{case.case_id[:8]}...] {case.case_number}", end="")
        if case.name:
            print(f" - {case.name}", end="")
        if case.investigator:
            print(f" (Investigator: {case.investigator})", end="")
        print()

        # Show evidence under this case
        for evidence in case.evidence:
            print(f"    [{evidence.evidence_id[:8]}...] {evidence.name}", end="")
            if evidence.description:
                print(f" - {evidence.description}", end="")
            print()

        # Add blank line between cases for readability
        if storage.cases[-1] != case:
            print()

def create_case(case_number: str, name: Optional[str] = None, investigator: Optional[str] = None):
    """Create a new case and set it as active."""
    storage = Storage()
    state_manager = StateManager()

    # Check if case number already exists
    existing = find_case(storage, case_number)
    if existing:
        print(f"Error: Case with number '{case_number}' already exists.", file=sys.stderr)
        sys.exit(1)

    # Create new case
    case = Case(case_number=case_number, name=name, investigator=investigator)
    storage.cases.append(case)
    storage.save_data()

    # Set as active case
    state_manager.set_active(case.case_id, None)

    print(f"✓ Created case '{case_number}' [{case.case_id[:8]}...]")
    if name:
        print(f"  Name: {name}")
    if investigator:
        print(f"  Investigator: {investigator}")
    print(f"✓ Set as active case")

def create_evidence(name: str, description: Optional[str] = None):
    """Create new evidence and attach to active case."""
    storage = Storage()
    state_manager = StateManager()

    state = state_manager.get_active()
    case_id = state.get("case_id")

    if not case_id:
        print("Error: No active case set. Use --switch-case or --new-case first.", file=sys.stderr)
        sys.exit(1)

    case = storage.get_case(case_id)
    if not case:
-        print("Error: Active case not found in storage. Ensure you have set an active case in the TUI.")
        print("Error: Active case not found in storage.", file=sys.stderr)
        sys.exit(1)

    # Check if evidence with this name already exists in the case
    existing = find_evidence(case, name)
    if existing:
        print(f"Error: Evidence named '{name}' already exists in case '{case.case_number}'.", file=sys.stderr)
        sys.exit(1)

    # Create new evidence
    evidence = Evidence(name=name, description=description)
    case.evidence.append(evidence)
    storage.save_data()

    # Set as active evidence
    state_manager.set_active(case.case_id, evidence.evidence_id)

    print(f"✓ Created evidence '{name}' [{evidence.evidence_id[:8]}...]")
    if description:
        print(f"  Description: {description}")
    print(f"✓ Added to case '{case.case_number}'")
    print(f"✓ Set as active evidence")

def switch_case(identifier: str):
    """Switch active case context."""
    storage = Storage()
    state_manager = StateManager()

    case = find_case(storage, identifier)
    if not case:
        print(f"Error: Case '{identifier}' not found.", file=sys.stderr)
        print("Use --list to see available cases.", file=sys.stderr)
        sys.exit(1)

    # Set as active case, clear evidence
    state_manager.set_active(case.case_id, None)

    print(f"✓ Switched to case '{case.case_number}' [{case.case_id[:8]}...]")
    if case.name:
        print(f"  {case.name}")

def switch_evidence(identifier: str):
    """Switch active evidence context within the active case."""
    storage = Storage()
    state_manager = StateManager()

    state = state_manager.get_active()
    case_id = state.get("case_id")

    if not case_id:
        print("Error: No active case set. Use --switch-case first.", file=sys.stderr)
        sys.exit(1)

    case = storage.get_case(case_id)
    if not case:
        print("Error: Active case not found in storage.", file=sys.stderr)
        sys.exit(1)

    evidence = find_evidence(case, identifier)
    if not evidence:
        print(f"Error: Evidence '{identifier}' not found in case '{case.case_number}'.", file=sys.stderr)
        print("Use --list to see available evidence.", file=sys.stderr)
        sys.exit(1)

    # Set as active evidence
    state_manager.set_active(case.case_id, evidence.evidence_id)

    print(f"✓ Switched to evidence '{evidence.name}' [{evidence.evidence_id[:8]}...]")
    if evidence.description:
        print(f"  {evidence.description}")

def quick_add_note(content: str, case_override: Optional[str] = None, evidence_override: Optional[str] = None):
    storage = Storage()
    state_manager = StateManager()

    # Validate and clear stale state
    warning = state_manager.validate_and_clear_stale(storage)
    if warning:
        print(f"Warning: {warning}", file=sys.stderr)

    state = state_manager.get_active()
    settings = state_manager.get_settings()

    # Handle case override or use active case
    if case_override:
        case = find_case(storage, case_override)
        if not case:
            print(f"Error: Case '{case_override}' not found.", file=sys.stderr)
            print("Use --list to see available cases.", file=sys.stderr)
            sys.exit(1)
    else:
        case_id = state.get("case_id")
        if not case_id:
            print("Error: No active case set. Use --switch-case, --new-case, or open the TUI to select a case.", file=sys.stderr)
            sys.exit(1)

        case = storage.get_case(case_id)
        if not case:
            print("Error: Active case not found in storage.", file=sys.stderr)
            sys.exit(1)

    # Handle evidence override or use active evidence
    target_evidence = None

-    if evidence_id:
-        # Find evidence
-        for ev in case.evidence:
-            if ev.evidence_id == evidence_id:
-                target_evidence = ev
-                break
    if evidence_override:
        target_evidence = find_evidence(case, evidence_override)
        if not target_evidence:
            print(f"Error: Evidence '{evidence_override}' not found in case '{case.case_number}'.", file=sys.stderr)
            print("Use --list to see available evidence.", file=sys.stderr)
            sys.exit(1)
    elif not case_override:  # Only use active evidence if not overriding case
        evidence_id = state.get("evidence_id")
        if evidence_id:
            # Find and validate evidence belongs to active case
            target_evidence = find_evidence(case, evidence_id)

            if not target_evidence:
                # Evidence ID is set but doesn't exist in case - clear it
                print(f"Warning: Active evidence not found in case. Clearing to case level.", file=sys.stderr)
                state_manager.set_active(case.case_id, None)

    # Create note
    note = Note(content=content)
@@ -38,27 +254,24 @@ def quick_add_note(content: str):
    note.extract_tags()  # Extract hashtags from content
    note.extract_iocs()  # Extract IOCs from content

-    # Try signing if enabled
    # Try signing the hash if enabled
    signature = None
    if settings.get("pgp_enabled", True):
        gpg_key_id = settings.get("gpg_key_id", None)
        if gpg_key_id:
-            signature = Crypto.sign_content(f"Hash: {note.content_hash}\nContent: {note.content}", key_id=gpg_key_id)
            # Sign only the hash (hash already includes timestamp:content for integrity)
            signature = Crypto.sign_content(note.content_hash, key_id=gpg_key_id)
            if signature:
                note.signature = signature
            else:
-                print("Warning: GPG signature failed (GPG not found or no key). Note saved without signature.")
                print("Warning: GPG signature failed (GPG not found or no key). Note saved without signature.", file=sys.stderr)
        else:
-            print("Warning: No GPG key ID configured. Note saved without signature.")
            print("Warning: No GPG key ID configured. Note saved without signature.", file=sys.stderr)

    # Attach to evidence or case
    if target_evidence:
        target_evidence.notes.append(note)
        print(f"✓ Note added to evidence '{target_evidence.name}'")
-    elif evidence_id:
-        print("Warning: Active evidence not found. Adding to case instead.")
-        case.notes.append(note)
-        print(f"✓ Note added to case '{case.case_number}'")
    else:
        case.notes.append(note)
        print(f"✓ Note added to case '{case.case_number}'")
@@ -66,86 +279,208 @@ def quick_add_note(content: str):
    storage.save_data()

def export_markdown(output_file: str = "export.md"):
-    storage = Storage()
    try:
        storage = Storage()
        state_manager = StateManager()
        settings = state_manager.get_settings()

-    with open(output_file, "w") as f:
-        f.write("# Forensic Notes Export\n\n")
-        f.write(f"Generated on: {time.ctime()}\n\n")
        # Build the export content in memory first
        content_lines = []
        content_lines.append("# Forensic Notes Export\n\n")
        content_lines.append(f"Generated on: {time.ctime()}\n\n")

        for case in storage.cases:
-            f.write(f"## Case: {case.case_number}\n")
            content_lines.append(f"## Case: {case.case_number}\n")
            if case.name:
-                f.write(f"**Name:** {case.name}\n")
                content_lines.append(f"**Name:** {case.name}\n")
            if case.investigator:
-                f.write(f"**Investigator:** {case.investigator}\n")
-            f.write(f"**Case ID:** {case.case_id}\n\n")
                content_lines.append(f"**Investigator:** {case.investigator}\n")
            content_lines.append(f"**Case ID:** {case.case_id}\n\n")

-            f.write("### Case Notes\n")
            content_lines.append("### Case Notes\n")
            if not case.notes:
-                f.write("_No notes._\n")
                content_lines.append("_No notes._\n")
            for note in case.notes:
-                write_note(f, note)
                note_content = format_note_for_export(note)
                content_lines.append(note_content)

-            f.write("\n### Evidence\n")
            content_lines.append("\n### Evidence\n")
            if not case.evidence:
-                f.write("_No evidence._\n")
                content_lines.append("_No evidence._\n")

            for ev in case.evidence:
-                f.write(f"#### Evidence: {ev.name}\n")
                content_lines.append(f"#### Evidence: {ev.name}\n")
                if ev.description:
-                    f.write(f"_{ev.description}_\n")
-                f.write(f"**ID:** {ev.evidence_id}\n")
                    content_lines.append(f"_{ev.description}_\n")
                content_lines.append(f"**ID:** {ev.evidence_id}\n")

                # Include source hash if available
                source_hash = ev.metadata.get("source_hash")
                if source_hash:
-                    f.write(f"**Source Hash:** `{source_hash}`\n")
-                f.write("\n")
                    content_lines.append(f"**Source Hash:** `{source_hash}`\n")
                content_lines.append("\n")

-                f.write("##### Evidence Notes\n")
                content_lines.append("##### Evidence Notes\n")
                if not ev.notes:
-                    f.write("_No notes._\n")
                    content_lines.append("_No notes._\n")
                for note in ev.notes:
-                    write_note(f, note)
-                f.write("\n")
-            f.write("---\n\n")
-        print(f"Exported to {output_file}")
                    note_content = format_note_for_export(note)
                    content_lines.append(note_content)
                content_lines.append("\n")
            content_lines.append("---\n\n")

-def write_note(f, note: Note):
-    f.write(f"- **{time.ctime(note.timestamp)}**\n")
-    f.write(f"  - Content: {note.content}\n")
-    f.write(f"  - Hash: `{note.content_hash}`\n")
        # Join all content
        export_content = "".join(content_lines)

        # Sign the entire export if GPG is enabled
        if settings.get("pgp_enabled", False):
            gpg_key_id = settings.get("gpg_key_id", None)
            signed_export = Crypto.sign_content(export_content, key_id=gpg_key_id)

            if signed_export:
                # Write the signed version
                final_content = signed_export
                print(f"✓ Export signed with GPG")
            else:
                # Signing failed - write unsigned
                final_content = export_content
                print("⚠ Warning: GPG signing failed. Export saved unsigned.", file=sys.stderr)
        else:
            final_content = export_content

        # Write to file
        with open(output_file, "w", encoding='utf-8') as f:
            f.write(final_content)

        print(f"✓ Exported to {output_file}")

        # Show verification instructions
        if settings.get("pgp_enabled", False) and signed_export:
            print(f"\nTo verify the export:")
            print(f"  gpg --verify {output_file}")

    except (IOError, OSError, PermissionError) as e:
        print(f"Error: Failed to export to {output_file}: {e}")
        sys.exit(1)

def format_note_for_export(note: Note) -> str:
    """Format a single note for export (returns string instead of writing to file)

    Includes Unix timestamp for hash reproducibility - anyone can recompute the hash
    using the formula: SHA256("{unix_timestamp}:{content}")
    """
    lines = []
    lines.append(f"- **{time.ctime(note.timestamp)}**\n")
    lines.append(f"  - Unix Timestamp: `{note.timestamp}` (for hash verification)\n")
    lines.append(f"  - Content:\n")
    # Properly indent multi-line content
    for line in note.content.splitlines():
        lines.append(f"    {line}\n")
    lines.append(f"  - SHA256 Hash (timestamp:content): `{note.content_hash}`\n")
    if note.signature:
-        f.write("  - **Signature Verified:**\n")
-        f.write("    ```\n")
        lines.append("  - **GPG Signature of Hash:**\n")
        lines.append("    ```\n")
        # Indent signature for markdown block
        for line in note.signature.splitlines():
-            f.write(f"    {line}\n")
-        f.write("    ```\n")
-    f.write("\n")
            lines.append(f"    {line}\n")
        lines.append("    ```\n")
    lines.append("\n")
    return "".join(lines)

def main():
-    parser = argparse.ArgumentParser(description="trace: Forensic Note Taking Tool")
-    parser.add_argument("note", nargs="?", help="Quick note content to add to active context")
-    parser.add_argument("--export", help="Export all data to Markdown file", action="store_true")
-    parser.add_argument("--output", help="Output file for export", default="trace_export.md")
-    parser.add_argument("--open", "-o", help="Open TUI directly at active case/evidence", action="store_true")
    parser = argparse.ArgumentParser(
        description="trace: Forensic Note Taking Tool",
        epilog="Examples:\n"
               "  trace 'Found suspicious process'     Add note to active context\n"
               "  trace --stdin < output.txt           Add file contents as note\n"
               "  trace --list                         List all cases and evidence\n"
               "  trace --new-case 2024-001            Create new case\n"
               "  trace --switch-case 2024-001         Switch active case\n",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

-    # We will import TUI only if needed to keep start time fast
    # Note content (positional or stdin)
    parser.add_argument("note", nargs="?", help="Quick note content to add to active context")
    parser.add_argument("--stdin", action="store_true", help="Read note content from stdin")

    # Context management
    parser.add_argument("--show-context", action="store_true", help="Show active case and evidence")
    parser.add_argument("--list", action="store_true", help="List all cases and evidence")
    parser.add_argument("--switch-case", metavar="IDENTIFIER", help="Switch active case (by ID or case number)")
    parser.add_argument("--switch-evidence", metavar="IDENTIFIER", help="Switch active evidence (by ID or name)")

    # Temporary overrides for note addition
    parser.add_argument("--case", metavar="IDENTIFIER", help="Use specific case for this note (doesn't change active)")
    parser.add_argument("--evidence", metavar="IDENTIFIER", help="Use specific evidence for this note (doesn't change active)")

    # Case and evidence creation
    parser.add_argument("--new-case", metavar="CASE_NUMBER", help="Create new case")
    parser.add_argument("--name", metavar="NAME", help="Name for new case")
    parser.add_argument("--investigator", metavar="INVESTIGATOR", help="Investigator name for new case")
    parser.add_argument("--new-evidence", metavar="EVIDENCE_NAME", help="Create new evidence in active case")
    parser.add_argument("--description", metavar="DESC", help="Description for new evidence")

    # Export
    parser.add_argument("--export", action="store_true", help="Export all data to Markdown file")
    parser.add_argument("--output", metavar="FILE", default="trace_export.md", help="Output file for export")

    # TUI
    parser.add_argument("--open", "-o", action="store_true", help="Open TUI directly at active case/evidence")

    args = parser.parse_args()

    # Handle context management commands
    if args.show_context:
        show_context()
        return

    if args.list:
        list_contexts()
        return

    if args.switch_case:
        switch_case(args.switch_case)
        return

    if args.switch_evidence:
        switch_evidence(args.switch_evidence)
        return

    # Handle case/evidence creation
    if args.new_case:
        create_case(args.new_case, name=args.name, investigator=args.investigator)
        return

    if args.new_evidence:
        create_evidence(args.new_evidence, description=args.description)
        return

    # Handle export
    if args.export:
        export_markdown(args.output)
        return

-    if args.note:
-        quick_add_note(args.note)
    # Handle note addition
    if args.stdin:
        # Read from stdin
        content = sys.stdin.read().strip()
        if not content:
            print("Error: No content provided from stdin.", file=sys.stderr)
            sys.exit(1)
        quick_add_note(content, case_override=args.case, evidence_override=args.evidence)
        return

    if args.note:
        quick_add_note(args.note, case_override=args.case, evidence_override=args.evidence)
        return

    # No arguments - check for first run and launch TUI
    from .gpg_wizard import check_and_run_wizard
    check_and_run_wizard()

    # Launch TUI (with optional direct navigation to active context)
    try:
-        from .tui import run_tui
        from .tui_app import run_tui
        run_tui(open_active=args.open)
    except ImportError as e:
        print(f"Error launching TUI: {e}")
trace/crypto.py (134)
@@ -2,6 +2,106 @@ import subprocess
import hashlib

class Crypto:
    @staticmethod
    def is_gpg_available() -> bool:
        """
        Check if GPG is available on the system.

        Returns:
            True if GPG is available, False otherwise.
        """
        try:
            proc = subprocess.Popen(
                ['gpg', '--version'],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            stdout, stderr = proc.communicate(timeout=5)
            return proc.returncode == 0
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return False

    @staticmethod
    def verify_signature(signed_content: str) -> tuple[bool, str]:
        """
        Verify a GPG clearsigned message.

        Args:
            signed_content: The clearsigned content to verify

        Returns:
            A tuple of (verified: bool, signer_info: str)
            - verified: True if signature is valid, False otherwise
            - signer_info: Information about the signer (key ID, name) or error message
        """
        if not signed_content or not signed_content.strip():
            return False, "No signature present"

        # Check if content looks like a GPG signed message
        if "-----BEGIN PGP SIGNED MESSAGE-----" not in signed_content:
            return False, "Not a GPG signed message"

        try:
            # Force English output for consistent parsing across locales
            # Linux/macOS: LC_ALL/LANG variables control GPG's output language
            # Windows: GPG may ignore these, but encoding='utf-8' + errors='replace' provides robustness
            import os
            env = os.environ.copy()
            # Use C.UTF-8 for English messages with UTF-8 encoding support
            # Falls back gracefully via errors='replace' if locale not available
            env['LC_ALL'] = 'C.UTF-8'
            env['LANG'] = 'C.UTF-8'

            proc = subprocess.Popen(
                ['gpg', '--verify'],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                encoding='utf-8',
                errors='replace',  # Handle encoding issues on any platform
                env=env
            )
            stdout, stderr = proc.communicate(input=signed_content, timeout=10)

            if proc.returncode == 0:
                # Parse signer info from stderr (GPG outputs verification info to stderr)
                signer_info = "Unknown signer"
                for line in stderr.split('\n'):
                    if "Good signature from" in line:
                        # Extract the signer name/email
                        parts = line.split('"')
                        if len(parts) >= 2:
                            signer_info = parts[1]
                            break  # Only break after successfully extracting signer info
                    elif "using" in line:
                        # Try to get key ID as fallback
                        if "key" in line.lower():
                            signer_info = line.strip()

                return True, signer_info
            else:
                # Signature verification failed
                error_msg = "Verification failed"
                for line in stderr.split('\n'):
                    if "BAD signature" in line:
                        error_msg = "BAD signature"
                        break
                    elif "no public key" in line or "public key not found" in line:
                        error_msg = "Public key not found in keyring"
                        break
                    elif "Can't check signature" in line:
                        error_msg = "Cannot check signature"
                        break

                return False, error_msg

        except (FileNotFoundError, subprocess.TimeoutExpired):
            return False, "GPG not available or timeout"
        except Exception as e:
            return False, f"Error: {str(e)}"

    @staticmethod
    def list_gpg_keys():
        """
@@ -15,7 +115,7 @@ class Crypto:
                stderr=subprocess.PIPE,
                text=True
            )
-            stdout, stderr = proc.communicate()
            stdout, stderr = proc.communicate(timeout=10)

            if proc.returncode != 0:
                return []
@@ -37,12 +137,12 @@ class Crypto:
                elif fields[0] == 'uid' and current_key_id:
                    user_id = fields[9] if len(fields) > 9 else "Unknown"
                    keys.append((current_key_id, user_id))
-                    current_key_id = None  # Reset after matching
                    # Don't reset current_key_id - allow multiple UIDs per key

            return keys

-        except FileNotFoundError:
-            return []  # GPG not installed
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return []  # GPG not installed or timed out

    @staticmethod
    def sign_content(content: str, key_id: str = None) -> str:
@@ -71,7 +171,7 @@ class Crypto:
                stderr=subprocess.PIPE,
                text=True
            )
-            stdout, stderr = proc.communicate(input=content)
            stdout, stderr = proc.communicate(input=content, timeout=10)

            if proc.returncode != 0:
                # Fallback: maybe no key is found or gpg error
@@ -79,10 +179,30 @@ class Crypto:
                return ""

            return stdout
-        except FileNotFoundError:
-            return ""  # GPG not installed
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return ""  # GPG not installed or timed out

    @staticmethod
    def hash_content(content: str, timestamp: float) -> str:
        """Calculate SHA256 hash of timestamp:content.

        Hash input format: "{timestamp}:{content}"
        - timestamp: Unix epoch timestamp as float (seconds since 1970-01-01 00:00:00 UTC)
          Example: 1702345678.123456
        - The float is converted to string using Python's default str() conversion
        - Colon (':') separator between timestamp and content
        - Ensures integrity of both WHAT was said and WHEN it was said

        Args:
            content: The note content to hash
            timestamp: Unix epoch timestamp as float

        Returns:
            SHA256 hash as hexadecimal string (64 characters)

        Example:
            >>> hash_content("Suspicious process detected", 1702345678.123456)
            Computes SHA256 of: "1702345678.123456:Suspicious process detected"
        """
        data = f"{timestamp}:{content}".encode('utf-8')
        return hashlib.sha256(data).hexdigest()
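Taken together, the pieces above round-trip: hash a note, clearsign the hash, verify the result. A minimal usage sketch (requires a local GPG installation with a secret key; otherwise `sign_content()` returns an empty string, as the fallback paths above show):

```python
from trace.crypto import Crypto

timestamp, content = 1702345678.123456, "Suspicious process detected"
digest = Crypto.hash_content(content, timestamp)  # 64-char hex SHA256

signed = Crypto.sign_content(digest)  # clearsigned hash, "" if GPG/key unavailable
if signed:
    verified, signer = Crypto.verify_signature(signed)
    print(verified, signer)
```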
trace/gpg_wizard.py (126, Normal file)
@@ -0,0 +1,126 @@
"""First-run GPG setup wizard for trace application"""

import sys
from .crypto import Crypto
from .storage import StateManager


def run_gpg_wizard():
    """
    Run the first-time GPG setup wizard.

    Returns:
        dict: Settings to save (gpg_enabled, gpg_key_id)
    """
    print("\n" + "="*60)
    print("Welcome to trace - Forensic Note Taking Tool")
    print("="*60)
    print("\nFirst-time setup: GPG Signature Configuration\n")
    print("trace can digitally sign all notes using GPG for authenticity")
    print("and integrity verification. This is useful for legal evidence")
    print("and chain-of-custody documentation.\n")

    # Check if GPG is available
    gpg_available = Crypto.is_gpg_available()

    if not gpg_available:
        print("⚠ GPG is not installed or not available on your system.")
        print("\nTo use GPG signing, please install GPG:")
        print("  - Linux: apt install gnupg / yum install gnupg")
        print("  - macOS: brew install gnupg")
        print("  - Windows: Install Gpg4win (https://gpg4win.org)")
        print("\nYou can enable GPG signing later by editing ~/.trace/settings.json")
        print("\nPress Enter to continue without GPG signing...")
        input()
        return {"pgp_enabled": False, "gpg_key_id": None}

    # GPG is available - ask if user wants to enable it
    print("✓ GPG is available on your system.\n")

    while True:
        response = input("Do you want to enable GPG signing for notes? (y/n): ").strip().lower()
        if response in ['y', 'yes']:
            enable_gpg = True
            break
        elif response in ['n', 'no']:
            enable_gpg = False
            break
        else:
            print("Please enter 'y' or 'n'")

    if not enable_gpg:
        print("\nGPG signing disabled. You can enable it later in settings.")
        return {"pgp_enabled": False, "gpg_key_id": None}

    # List available GPG keys
    print("\nSearching for GPG secret keys...\n")
    keys = Crypto.list_gpg_keys()

    if not keys:
        print("⚠ No GPG secret keys found in your keyring.")
        print("\nTo use GPG signing, you need to generate a GPG key first:")
        print("  - Use 'gpg --gen-key' (Linux/macOS)")
        print("  - Use Kleopatra (Windows)")
        print("\nAfter generating a key, you can enable GPG signing by editing")
        print("~/.trace/settings.json and setting 'gpg_enabled': true")
        print("\nPress Enter to continue without GPG signing...")
        input()
        return {"pgp_enabled": False, "gpg_key_id": None}

    # Display available keys
    print("Available GPG keys:\n")
    for i, (key_id, user_id) in enumerate(keys, 1):
        print(f"  {i}. {user_id}")
        print(f"     Key ID: {key_id}\n")

    # Let user select a key
    selected_key = None

    while True:
        try:
            choice = input(f"Select a key (1-{len(keys)}, or 0 to use default key): ").strip()
            choice_num = int(choice)

            if choice_num == 0:
                print("Using GPG default key (no specific key ID)")
                selected_key = None
                break
            elif 1 <= choice_num <= len(keys):
                selected_key = keys[choice_num - 1][0]
                print(f"Selected: {keys[choice_num - 1][1]}")
                break
            else:
                print(f"Please enter a number between 0 and {len(keys)}")
        except ValueError:
            print("Please enter a valid number")

    print("\n✓ GPG signing enabled!")
    if selected_key:
        print(f"  Using key: {selected_key}")
    else:
        print("  Using default GPG key")

    print("\nSetup complete. Starting trace...\n")

    return {"pgp_enabled": True, "gpg_key_id": selected_key}


def check_and_run_wizard():
    """
    Check if this is first run and run wizard if needed.
    Returns True if wizard was run, False otherwise.
    """
    state_manager = StateManager()

    # Check if settings file exists - if it does, wizard has already been run
    if state_manager.settings_file.exists():
        return False

    # First run - run wizard
    wizard_settings = run_gpg_wizard()

    # Save settings
    for key, value in wizard_settings.items():
        state_manager.set_setting(key, value)

    return True
trace/models.py (deleted, 286 lines)
@@ -1,286 +0,0 @@
import time
import hashlib
import uuid
import re
from dataclasses import dataclass, field
from typing import List, Optional, Dict


@dataclass
class Note:
    content: str
    timestamp: float = field(default_factory=time.time)
    note_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    content_hash: str = ""
    signature: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    iocs: List[str] = field(default_factory=list)

    def extract_tags(self):
        """Extract hashtags from content (case-insensitive, stored lowercase)"""
        # Match hashtags: # followed by word characters
        tag_pattern = r'#(\w+)'
        matches = re.findall(tag_pattern, self.content)
        # Convert to lowercase and remove duplicates while preserving order
        seen = set()
        self.tags = []
        for tag in matches:
            tag_lower = tag.lower()
            if tag_lower not in seen:
                seen.add(tag_lower)
                self.tags.append(tag_lower)

    def extract_iocs(self):
        """Extract Indicators of Compromise from content"""
        seen = set()
        self.iocs = []

        # IPv4 addresses
        ipv4_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
        for match in re.findall(ipv4_pattern, self.content):
            if match not in seen:
                seen.add(match)
                self.iocs.append(match)

        # IPv6 addresses (simplified)
        ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'
        for match in re.findall(ipv6_pattern, self.content):
            if match not in seen:
                seen.add(match)
                self.iocs.append(match)

        # Domain names (basic pattern)
        domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b'
        for match in re.findall(domain_pattern, self.content):
            # Filter out common false positives
            if match not in seen and not match.startswith('example.'):
                seen.add(match)
                self.iocs.append(match)

        # URLs
        url_pattern = r'https?://[^\s]+'
        for match in re.findall(url_pattern, self.content):
            if match not in seen:
                seen.add(match)
                self.iocs.append(match)

        # MD5 hashes (32 hex chars)
        md5_pattern = r'\b[a-fA-F0-9]{32}\b'
        for match in re.findall(md5_pattern, self.content):
            if match not in seen:
                seen.add(match)
                self.iocs.append(match)

        # SHA1 hashes (40 hex chars)
        sha1_pattern = r'\b[a-fA-F0-9]{40}\b'
        for match in re.findall(sha1_pattern, self.content):
            if match not in seen:
                seen.add(match)
                self.iocs.append(match)

        # SHA256 hashes (64 hex chars)
        sha256_pattern = r'\b[a-fA-F0-9]{64}\b'
        for match in re.findall(sha256_pattern, self.content):
            if match not in seen:
                seen.add(match)
                self.iocs.append(match)

        # Email addresses
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        for match in re.findall(email_pattern, self.content):
            if match not in seen:
                seen.add(match)
                self.iocs.append(match)

    def calculate_hash(self):
        # We hash the content + timestamp to ensure integrity of 'when' it was said
        data = f"{self.timestamp}:{self.content}".encode('utf-8')
        self.content_hash = hashlib.sha256(data).hexdigest()

    @staticmethod
    def extract_iocs_from_text(text):
        """Extract IOCs from text and return as list of (ioc, type) tuples"""
        iocs = []
        seen = set()

        # IPv4 addresses
        ipv4_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
        for match in re.findall(ipv4_pattern, text):
            if match not in seen:
                seen.add(match)
                iocs.append((match, 'ipv4'))

        # IPv6 addresses (simplified)
        ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'
        for match in re.findall(ipv6_pattern, text):
            if match not in seen:
                seen.add(match)
                iocs.append((match, 'ipv6'))

        # URLs (check before domains to avoid double-matching)
        url_pattern = r'https?://[^\s]+'
        for match in re.findall(url_pattern, text):
            if match not in seen:
                seen.add(match)
                iocs.append((match, 'url'))

        # Domain names (basic pattern)
        domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b'
        for match in re.findall(domain_pattern, text):
            # Filter out common false positives and already seen URLs
            if match not in seen and not match.startswith('example.'):
                seen.add(match)
                iocs.append((match, 'domain'))

        # SHA256 hashes (64 hex chars) - check before SHA1 and MD5
        sha256_pattern = r'\b[a-fA-F0-9]{64}\b'
        for match in re.findall(sha256_pattern, text):
            if match not in seen:
                seen.add(match)
                iocs.append((match, 'sha256'))

        # SHA1 hashes (40 hex chars) - check before MD5
        sha1_pattern = r'\b[a-fA-F0-9]{40}\b'
        for match in re.findall(sha1_pattern, text):
            if match not in seen:
                seen.add(match)
                iocs.append((match, 'sha1'))

        # MD5 hashes (32 hex chars)
        md5_pattern = r'\b[a-fA-F0-9]{32}\b'
        for match in re.findall(md5_pattern, text):
            if match not in seen:
                seen.add(match)
                iocs.append((match, 'md5'))

        # Email addresses
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        for match in re.findall(email_pattern, text):
            if match not in seen:
                seen.add(match)
                iocs.append((match, 'email'))

        return iocs

    @staticmethod
    def extract_iocs_with_positions(text):
        """Extract IOCs with their positions for highlighting. Returns list of (text, start, end, type) tuples"""
        import re
        highlights = []

        # IPv4 addresses
        for match in re.finditer(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', text):
            highlights.append((match.group(), match.start(), match.end(), 'ipv4'))

        # IPv6 addresses
        for match in re.finditer(r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b', text):
            highlights.append((match.group(), match.start(), match.end(), 'ipv6'))

        # URLs (check before domains)
        for match in re.finditer(r'https?://[^\s]+', text):
            highlights.append((match.group(), match.start(), match.end(), 'url'))

        # Domain names
        for match in re.finditer(r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b', text):
            if not match.group().startswith('example.'):
                highlights.append((match.group(), match.start(), match.end(), 'domain'))

        # SHA256 hashes
        for match in re.finditer(r'\b[a-fA-F0-9]{64}\b', text):
            highlights.append((match.group(), match.start(), match.end(), 'sha256'))

        # SHA1 hashes
        for match in re.finditer(r'\b[a-fA-F0-9]{40}\b', text):
            highlights.append((match.group(), match.start(), match.end(), 'sha1'))

        # MD5 hashes
        for match in re.finditer(r'\b[a-fA-F0-9]{32}\b', text):
            highlights.append((match.group(), match.start(), match.end(), 'md5'))

        # Email addresses
        for match in re.finditer(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text):
            highlights.append((match.group(), match.start(), match.end(), 'email'))

        return highlights

    def to_dict(self):
        return {
            "note_id": self.note_id,
            "content": self.content,
            "timestamp": self.timestamp,
            "content_hash": self.content_hash,
            "signature": self.signature,
            "tags": self.tags,
            "iocs": self.iocs
        }

    @staticmethod
    def from_dict(data):
        note = Note(
            content=data["content"],
            timestamp=data["timestamp"],
            note_id=data["note_id"],
            content_hash=data.get("content_hash", ""),
            signature=data.get("signature"),
            tags=data.get("tags", []),
            iocs=data.get("iocs", [])
        )
        return note


@dataclass
class Evidence:
    name: str
    evidence_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    description: str = ""
    metadata: Dict[str, str] = field(default_factory=dict)
    notes: List[Note] = field(default_factory=list)

    def to_dict(self):
        return {
            "evidence_id": self.evidence_id,
            "name": self.name,
            "description": self.description,
            "metadata": self.metadata,
            "notes": [n.to_dict() for n in self.notes]
        }

    @staticmethod
    def from_dict(data):
        ev = Evidence(
            name=data["name"],
            evidence_id=data["evidence_id"],
            description=data.get("description", ""),
            metadata=data.get("metadata", {})
        )
        ev.notes = [Note.from_dict(n) for n in data.get("notes", [])]
        return ev


@dataclass
class Case:
    case_number: str
    case_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    investigator: str = ""
    evidence: List[Evidence] = field(default_factory=list)
    notes: List[Note] = field(default_factory=list)

    def to_dict(self):
        return {
            "case_id": self.case_id,
            "case_number": self.case_number,
            "name": self.name,
            "investigator": self.investigator,
            "evidence": [e.to_dict() for e in self.evidence],
            "notes": [n.to_dict() for n in self.notes]
        }

    @staticmethod
    def from_dict(data):
        case = Case(
            case_number=data["case_number"],
            case_id=data["case_id"],
            name=data.get("name", ""),
            investigator=data.get("investigator", "")
        )
        case.evidence = [Evidence.from_dict(e) for e in data.get("evidence", [])]
        case.notes = [Note.from_dict(n) for n in data.get("notes", [])]
        return case
trace/models/__init__.py (new file, 160 lines)
@@ -0,0 +1,160 @@
"""Data models for trace application"""

import time
import hashlib
import uuid
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple

from .extractors import TagExtractor, IOCExtractor


@dataclass
class Note:
    content: str
    # Unix timestamp: seconds since 1970-01-01 00:00:00 UTC as float
    # Example: 1702345678.123456
    # This exact float value (with full precision) is used in hash calculation
    timestamp: float = field(default_factory=time.time)
    note_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    content_hash: str = ""
    signature: Optional[str] = None
    tags: List[str] = field(default_factory=list)
    iocs: List[str] = field(default_factory=list)

    def extract_tags(self):
        """Extract hashtags from content (case-insensitive, stored lowercase)"""
        self.tags = TagExtractor.extract_tags(self.content)

    def extract_iocs(self):
        """Extract Indicators of Compromise from content"""
        self.iocs = IOCExtractor.extract_iocs(self.content)

    def calculate_hash(self):
        """Calculate SHA256 hash of timestamp:content.

        Hash input format: "{timestamp}:{content}"
        - timestamp: Unix epoch timestamp as float (e.g., "1702345678.123456")
        - The float is converted to string using Python's default str() conversion
        - Colon separator between timestamp and content
        - Ensures integrity of both WHAT was said and WHEN it was said

        Example hash input: "1702345678.123456:Suspicious process detected"
        """
        data = f"{self.timestamp}:{self.content}".encode('utf-8')
        self.content_hash = hashlib.sha256(data).hexdigest()

    def verify_signature(self) -> Tuple[bool, str]:
        """
        Verify the GPG signature of this note.

        Returns:
            A tuple of (verified: bool, info: str)
            - verified: True if signature is valid, False if invalid or unsigned
            - info: Signer information or error/status message
        """
        # Import here to avoid circular dependency
        from ..crypto import Crypto

        if not self.signature:
            return False, "unsigned"

        return Crypto.verify_signature(self.signature)

    @staticmethod
    def extract_iocs_from_text(text):
        """Extract IOCs from text and return as list of (ioc, type) tuples"""
        return IOCExtractor.extract_iocs_with_types(text)

    @staticmethod
    def extract_iocs_with_positions(text):
        """Extract IOCs with their positions for highlighting. Returns list of (text, start, end, type) tuples"""
        return IOCExtractor.extract_iocs_with_positions(text)

    def to_dict(self):
        return {
            "note_id": self.note_id,
            "content": self.content,
            "timestamp": self.timestamp,
            "content_hash": self.content_hash,
            "signature": self.signature,
            "tags": self.tags,
            "iocs": self.iocs
        }

    @staticmethod
    def from_dict(data):
        note = Note(
            content=data["content"],
            timestamp=data["timestamp"],
            note_id=data["note_id"],
            content_hash=data.get("content_hash", ""),
            signature=data.get("signature"),
            tags=data.get("tags", []),
            iocs=data.get("iocs", [])
        )
        return note


@dataclass
class Evidence:
    name: str
    evidence_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    description: str = ""
    metadata: Dict[str, str] = field(default_factory=dict)
    notes: List[Note] = field(default_factory=list)

    def to_dict(self):
        return {
            "evidence_id": self.evidence_id,
            "name": self.name,
            "description": self.description,
            "metadata": self.metadata,
            "notes": [n.to_dict() for n in self.notes]
        }

    @staticmethod
    def from_dict(data):
        ev = Evidence(
            name=data["name"],
            evidence_id=data["evidence_id"],
            description=data.get("description", ""),
            metadata=data.get("metadata", {})
        )
        ev.notes = [Note.from_dict(n) for n in data.get("notes", [])]
        return ev


@dataclass
class Case:
    case_number: str
    case_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    investigator: str = ""
    evidence: List[Evidence] = field(default_factory=list)
    notes: List[Note] = field(default_factory=list)

    def to_dict(self):
        return {
            "case_id": self.case_id,
            "case_number": self.case_number,
            "name": self.name,
            "investigator": self.investigator,
            "evidence": [e.to_dict() for e in self.evidence],
            "notes": [n.to_dict() for n in self.notes]
        }

    @staticmethod
    def from_dict(data):
        case = Case(
            case_number=data["case_number"],
            case_id=data["case_id"],
            name=data.get("name", ""),
            investigator=data.get("investigator", "")
        )
        case.evidence = [Evidence.from_dict(e) for e in data.get("evidence", [])]
        case.notes = [Note.from_dict(n) for n in data.get("notes", [])]
        return case


__all__ = ['Note', 'Evidence', 'Case', 'TagExtractor', 'IOCExtractor']
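A minimal usage sketch of the Note lifecycle defined above (not part of the diff; assumes the repository root is on sys.path so the local package imports as trace.models rather than the stdlib trace module):

from trace.models import Note

note = Note(content="Beacon to 203.0.113.45 observed #c2")
note.calculate_hash()   # SHA256 over "{timestamp}:{content}"
note.extract_tags()     # -> ["c2"]
note.extract_iocs()     # -> ["203.0.113.45"]

# Serialization round-trips the hash, so integrity can be re-checked later.
restored = Note.from_dict(note.to_dict())
assert restored.content_hash == note.content_hash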
trace/models/extractors/__init__.py (new file, 6 lines)
@@ -0,0 +1,6 @@
"""Extractors for tags and IOCs from note content"""

from .tag_extractor import TagExtractor
from .ioc_extractor import IOCExtractor

__all__ = ['TagExtractor', 'IOCExtractor']
trace/models/extractors/ioc_extractor.py (new file, 236 lines)
@@ -0,0 +1,236 @@
"""IOC (Indicator of Compromise) extraction logic for notes"""

import re
from typing import List, Tuple


class IOCExtractor:
    """Extract Indicators of Compromise from text content"""

    # Regex patterns for different IOC types
    SHA256_PATTERN = r'\b[a-fA-F0-9]{64}\b'
    SHA1_PATTERN = r'\b[a-fA-F0-9]{40}\b'
    MD5_PATTERN = r'\b[a-fA-F0-9]{32}\b'
    IPV4_PATTERN = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
    IPV6_PATTERN = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b'
    URL_PATTERN = r'https?://[^\s<>\"\']+(?<![.,;:!?\)\]\}])'
    DOMAIN_PATTERN = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b'
    EMAIL_PATTERN = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

    @staticmethod
    def extract_iocs(text: str) -> List[str]:
        """
        Extract IOCs from text and return as simple list

        Args:
            text: The text to extract IOCs from

        Returns:
            List of unique IOC strings
        """
        seen = set()
        covered_ranges = set()
        iocs = []

        def add_ioc_if_not_covered(match_obj):
            """Add IOC if its range doesn't overlap with already covered ranges"""
            start, end = match_obj.start(), match_obj.end()
            # Check if this range overlaps with any covered range
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return False  # Overlaps, don't add
            ioc_text = match_obj.group()
            if ioc_text not in seen:
                seen.add(ioc_text)
                covered_ranges.add((start, end))
                iocs.append(ioc_text)
                return True
            return False

        # Process in order of priority to avoid false positives
        # SHA256 hashes (64 hex chars) - check longest first to avoid substring matches
        for match in re.finditer(IOCExtractor.SHA256_PATTERN, text):
            add_ioc_if_not_covered(match)

        # SHA1 hashes (40 hex chars)
        for match in re.finditer(IOCExtractor.SHA1_PATTERN, text):
            add_ioc_if_not_covered(match)

        # MD5 hashes (32 hex chars)
        for match in re.finditer(IOCExtractor.MD5_PATTERN, text):
            add_ioc_if_not_covered(match)

        # IPv4 addresses
        for match in re.finditer(IOCExtractor.IPV4_PATTERN, text):
            add_ioc_if_not_covered(match)

        # IPv6 addresses (supports compressed format)
        for match in re.finditer(IOCExtractor.IPV6_PATTERN, text):
            add_ioc_if_not_covered(match)

        # URLs (check before domains to prevent double-matching)
        for match in re.finditer(IOCExtractor.URL_PATTERN, text):
            add_ioc_if_not_covered(match)

        # Domain names (basic pattern)
        for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text):
            # Filter out common false positives
            if not match.group().startswith('example.'):
                add_ioc_if_not_covered(match)

        # Email addresses
        for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text):
            add_ioc_if_not_covered(match)

        return iocs

    @staticmethod
    def extract_iocs_with_types(text: str) -> List[Tuple[str, str]]:
        """
        Extract IOCs from text and return as list of (ioc, type) tuples

        Args:
            text: The text to extract IOCs from

        Returns:
            List of (ioc_text, ioc_type) tuples
        """
        iocs = []
        seen = set()
        covered_ranges = set()

        def add_ioc_if_not_covered(match_obj, ioc_type):
            """Add IOC if its range doesn't overlap with already covered ranges"""
            start, end = match_obj.start(), match_obj.end()
            # Check if this range overlaps with any covered range
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return False  # Overlaps, don't add
            ioc_text = match_obj.group()
            if ioc_text not in seen:
                seen.add(ioc_text)
                covered_ranges.add((start, end))
                iocs.append((ioc_text, ioc_type))
                return True
            return False

        # Process in priority order: longest hashes first
        for match in re.finditer(IOCExtractor.SHA256_PATTERN, text):
            add_ioc_if_not_covered(match, 'sha256')

        for match in re.finditer(IOCExtractor.SHA1_PATTERN, text):
            add_ioc_if_not_covered(match, 'sha1')

        for match in re.finditer(IOCExtractor.MD5_PATTERN, text):
            add_ioc_if_not_covered(match, 'md5')

        for match in re.finditer(IOCExtractor.IPV4_PATTERN, text):
            add_ioc_if_not_covered(match, 'ipv4')

        for match in re.finditer(IOCExtractor.IPV6_PATTERN, text):
            add_ioc_if_not_covered(match, 'ipv6')

        # URLs (check before domains to avoid double-matching)
        for match in re.finditer(IOCExtractor.URL_PATTERN, text):
            add_ioc_if_not_covered(match, 'url')

        # Domain names
        for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text):
            # Filter out common false positives
            if not match.group().startswith('example.'):
                add_ioc_if_not_covered(match, 'domain')

        # Email addresses
        for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text):
            add_ioc_if_not_covered(match, 'email')

        return iocs

    @staticmethod
    def extract_iocs_with_positions(text: str) -> List[Tuple[str, int, int, str]]:
        """
        Extract IOCs with their positions for highlighting

        Args:
            text: The text to extract IOCs from

        Returns:
            List of (ioc_text, start_pos, end_pos, ioc_type) tuples
        """
        highlights = []
        covered_ranges = set()

        def overlaps(start, end):
            """Check if range overlaps with any covered range"""
            for covered_start, covered_end in covered_ranges:
                if not (end <= covered_start or start >= covered_end):
                    return True
            return False

        def add_highlight(match, ioc_type):
            """Add highlight if it doesn't overlap with existing ones"""
            start, end = match.start(), match.end()
            if not overlaps(start, end):
                highlights.append((match.group(), start, end, ioc_type))
                covered_ranges.add((start, end))

        # Process in priority order: longest hashes first to avoid substring matches
        for match in re.finditer(IOCExtractor.SHA256_PATTERN, text):
            add_highlight(match, 'sha256')

        for match in re.finditer(IOCExtractor.SHA1_PATTERN, text):
            add_highlight(match, 'sha1')

        for match in re.finditer(IOCExtractor.MD5_PATTERN, text):
            add_highlight(match, 'md5')

        for match in re.finditer(IOCExtractor.IPV4_PATTERN, text):
            add_highlight(match, 'ipv4')

        for match in re.finditer(IOCExtractor.IPV6_PATTERN, text):
            add_highlight(match, 'ipv6')

        # URLs (check before domains to prevent double-matching)
        for match in re.finditer(IOCExtractor.URL_PATTERN, text):
            add_highlight(match, 'url')

        # Domain names
        for match in re.finditer(IOCExtractor.DOMAIN_PATTERN, text):
            if not match.group().startswith('example.'):
                add_highlight(match, 'domain')

        # Email addresses
        for match in re.finditer(IOCExtractor.EMAIL_PATTERN, text):
            add_highlight(match, 'email')

        return highlights

    @staticmethod
    def classify_ioc(ioc: str) -> str:
        """
        Classify an IOC by its type

        Args:
            ioc: The IOC string to classify

        Returns:
            The IOC type as a string
        """
        if re.fullmatch(IOCExtractor.SHA256_PATTERN, ioc):
            return 'sha256'
        elif re.fullmatch(IOCExtractor.SHA1_PATTERN, ioc):
            return 'sha1'
        elif re.fullmatch(IOCExtractor.MD5_PATTERN, ioc):
            return 'md5'
        elif re.fullmatch(IOCExtractor.IPV4_PATTERN, ioc):
            return 'ipv4'
        elif re.fullmatch(IOCExtractor.IPV6_PATTERN, ioc):
            return 'ipv6'
        elif re.fullmatch(IOCExtractor.EMAIL_PATTERN, ioc):
            return 'email'
        elif re.fullmatch(IOCExtractor.URL_PATTERN, ioc):
            return 'url'
        elif re.fullmatch(IOCExtractor.DOMAIN_PATTERN, ioc):
            return 'domain'
        else:
            return 'unknown'
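The covered-range bookkeeping above is the main behavioural change versus the old extractor; a short standalone sketch (same import-path assumption as before):

from trace.models.extractors import IOCExtractor

text = ("dropper e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 "
        "beacons to 198.51.100.78 via https://bad.invalid/stage2")
for ioc, ioc_type in IOCExtractor.extract_iocs_with_types(text):
    print(f"{ioc_type:6} {ioc}")
# Prints the sha256, the ipv4, and the url. Because URLs are claimed before
# bare domains, "bad.invalid" inside the URL is never reported a second time.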
trace/models/extractors/tag_extractor.py (new file, 34 lines)
@@ -0,0 +1,34 @@
"""Tag extraction logic for notes"""

import re


class TagExtractor:
    """Extract hashtags from text content"""

    TAG_PATTERN = r'#(\w+)'

    @staticmethod
    def extract_tags(text: str) -> list[str]:
        """
        Extract hashtags from content (case-insensitive, stored lowercase)

        Args:
            text: The text to extract tags from

        Returns:
            List of unique tags in lowercase, preserving order
        """
        # Match hashtags: # followed by word characters
        matches = re.findall(TagExtractor.TAG_PATTERN, text)

        # Convert to lowercase and remove duplicates while preserving order
        seen = set()
        tags = []
        for tag in matches:
            tag_lower = tag.lower()
            if tag_lower not in seen:
                seen.add(tag_lower)
                tags.append(tag_lower)

        return tags
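A one-line check of the de-duplication described above (standalone sketch, same import-path assumption):

from trace.models.extractors import TagExtractor

# Tags are lowercased and de-duplicated; the first occurrence wins.
assert TagExtractor.extract_tags("See #Phishing, then #phishing and #C2") == ["phishing", "c2"]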
trace/storage.py (270 lines changed)
@@ -1,268 +1,6 @@
import json
import time
from pathlib import Path
from typing import List, Optional, Tuple
from .models import Case, Evidence, Note
"""Storage module - backward compatibility wrapper"""

DEFAULT_APP_DIR = Path.home() / ".trace"
# For backward compatibility, export all classes from storage_impl
from .storage_impl import Storage, StateManager, LockManager, create_demo_case


class Storage:
    def __init__(self, app_dir: Path = DEFAULT_APP_DIR):
        self.app_dir = app_dir
        self.data_file = self.app_dir / "data.json"
        self._ensure_app_dir()
        self.cases: List[Case] = self._load_data()

        # Create demo case on first launch
        if not self.cases:
            self._create_demo_case()

    def _ensure_app_dir(self):
        if not self.app_dir.exists():
            self.app_dir.mkdir(parents=True, exist_ok=True)

    def _create_demo_case(self):
        """Create a demo case with evidence showcasing all features"""
        demo_case = Case(
            case_number="DEMO-2024-001",
            name="Sample Investigation",
            investigator="Demo User"
        )

        # Add case-level notes to demonstrate case notes feature
        case_note1 = Note(content="""Initial case briefing: Suspected data exfiltration incident.

Key objectives:
- Identify compromised systems
- Determine scope of data loss
- Document timeline of events

#incident-response #data-breach #investigation""")
        case_note1.calculate_hash()
        case_note1.extract_tags()
        case_note1.extract_iocs()
        demo_case.notes.append(case_note1)

        # Wait a moment for different timestamp
        time.sleep(0.1)

        case_note2 = Note(content="""Investigation lead: Employee reported suspicious email from sender@phishing-domain.com
Initial analysis shows potential credential harvesting attempt.
Review email headers and attachments for IOCs. #phishing #email-analysis""")
        case_note2.calculate_hash()
        case_note2.extract_tags()
        case_note2.extract_iocs()
        demo_case.notes.append(case_note2)

        time.sleep(0.1)

        # Create evidence 1: Compromised laptop
        evidence1 = Evidence(
            name="Employee Laptop HDD",
            description="Primary workstation hard drive - user reported suspicious activity"
        )
        # Add source hash for chain of custody demonstration
        evidence1.metadata["source_hash"] = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

        # Add notes to evidence 1 with various features
        note1 = Note(content="""Forensic imaging completed. Drive imaged using FTK Imager.
Image hash verified: SHA256 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

Chain of custody maintained throughout process. #forensics #imaging #chain-of-custody""")
        note1.calculate_hash()
        note1.extract_tags()
        note1.extract_iocs()
        evidence1.notes.append(note1)

        time.sleep(0.1)

        note2 = Note(content="""Discovered suspicious connections to external IP addresses:
- 192.168.1.100 (local gateway)
- 203.0.113.45 (external, geolocation: Unknown)
- 198.51.100.78 (command and control server suspected)

Browser history shows visits to malicious-site.com and data-exfil.net.
#network-analysis #ioc #c2-server""")
        note2.calculate_hash()
        note2.extract_tags()
        note2.extract_iocs()
        evidence1.notes.append(note2)

        time.sleep(0.1)

        note3 = Note(content="""Malware identified in temp directory:
File: evil.exe
MD5: d41d8cd98f00b204e9800998ecf8427e
SHA1: da39a3ee5e6b4b0d3255bfef95601890afd80709
SHA256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

Submitting to VirusTotal for analysis. #malware #hash-analysis #virustotal""")
        note3.calculate_hash()
        note3.extract_tags()
        note3.extract_iocs()
        evidence1.notes.append(note3)

        time.sleep(0.1)

        note4 = Note(content="""Timeline analysis reveals:
- 2024-01-15 09:23:45 - Suspicious email received
- 2024-01-15 09:24:12 - User clicked phishing link https://evil-domain.com/login
- 2024-01-15 09:25:03 - Credentials submitted to attacker-controlled site
- 2024-01-15 09:30:15 - Lateral movement detected

User credentials compromised. Recommend immediate password reset. #timeline #lateral-movement""")
        note4.calculate_hash()
        note4.extract_tags()
        note4.extract_iocs()
        evidence1.notes.append(note4)

        demo_case.evidence.append(evidence1)

        time.sleep(0.1)

        # Create evidence 2: Network logs
        evidence2 = Evidence(
            name="Firewall Logs",
            description="Corporate firewall logs from incident timeframe"
        )
        evidence2.metadata["source_hash"] = "a3f5c8b912e4d67f89b0c1a2e3d4f5a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2"

        note5 = Note(content="""Log analysis shows outbound connections to suspicious domains:
- attacker-c2.com on port 443 (encrypted channel)
- data-upload.net on port 8080 (unencrypted)
- exfil-server.org on port 22 (SSH tunnel)

Total data transferred: approximately 2.3 GB over 4 hours.
#log-analysis #data-exfiltration #network-traffic""")
        note5.calculate_hash()
        note5.extract_tags()
        note5.extract_iocs()
        evidence2.notes.append(note5)

        time.sleep(0.1)

        note6 = Note(content="""Contact information found in malware configuration:
Email: attacker@malicious-domain.com
Backup C2: 2001:0db8:85a3:0000:0000:8a2e:0370:7334 (IPv6)

Cross-referencing with threat intelligence databases. #threat-intel #attribution""")
        note6.calculate_hash()
        note6.extract_tags()
        note6.extract_iocs()
        evidence2.notes.append(note6)

        demo_case.evidence.append(evidence2)

        time.sleep(0.1)

        # Create evidence 3: Email forensics
        evidence3 = Evidence(
            name="Phishing Email",
            description="Original phishing email preserved in .eml format"
        )

        note7 = Note(content="""Email headers analysis:
From: sender@phishing-domain.com (spoofed)
Reply-To: attacker@evil-mail-server.net
X-Originating-IP: 198.51.100.99

Email contains embedded tracking pixel at http://tracking.malicious-site.com/pixel.gif
Attachment: invoice.pdf.exe (double extension trick) #email-forensics #phishing-analysis""")
        note7.calculate_hash()
        note7.extract_tags()
        note7.extract_iocs()
        evidence3.notes.append(note7)

        demo_case.evidence.append(evidence3)

        # Add the demo case to storage
        self.cases.append(demo_case)
        self.save_data()

    def _load_data(self) -> List[Case]:
        if not self.data_file.exists():
            return []
        try:
            with open(self.data_file, 'r') as f:
                data = json.load(f)
                return [Case.from_dict(c) for c in data]
        except (json.JSONDecodeError, IOError):
            return []

    def save_data(self):
        data = [c.to_dict() for c in self.cases]
        # Write to temp file then rename for atomic-ish write
        temp_file = self.data_file.with_suffix(".tmp")
        with open(temp_file, 'w') as f:
            json.dump(data, f, indent=2)
        temp_file.replace(self.data_file)

    def add_case(self, case: Case):
        self.cases.append(case)
        self.save_data()

    def get_case(self, case_id: str) -> Optional[Case]:
        # Case ID lookup
        for c in self.cases:
            if c.case_id == case_id:
                return c
        return None

    def delete_case(self, case_id: str):
        self.cases = [c for c in self.cases if c.case_id != case_id]
        self.save_data()

    def delete_evidence(self, case_id: str, evidence_id: str):
        case = self.get_case(case_id)
        if case:
            case.evidence = [e for e in case.evidence if e.evidence_id != evidence_id]
            self.save_data()

    def find_evidence(self, evidence_id: str) -> Tuple[Optional[Case], Optional[Evidence]]:
        for c in self.cases:
            for e in c.evidence:
                if e.evidence_id == evidence_id:
                    return c, e
        return None, None


class StateManager:
    def __init__(self, app_dir: Path = DEFAULT_APP_DIR):
        self.app_dir = app_dir
        self.state_file = self.app_dir / "state"
        self.settings_file = self.app_dir / "settings.json"
        self._ensure_app_dir()

    def _ensure_app_dir(self):
        if not self.app_dir.exists():
            self.app_dir.mkdir(parents=True, exist_ok=True)

    def set_active(self, case_id: Optional[str] = None, evidence_id: Optional[str] = None):
        state = self.get_active()
        state["case_id"] = case_id
        state["evidence_id"] = evidence_id
        with open(self.state_file, 'w') as f:
            json.dump(state, f)

    def get_active(self) -> dict:
        if not self.state_file.exists():
            return {"case_id": None, "evidence_id": None}
        try:
            with open(self.state_file, 'r') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {"case_id": None, "evidence_id": None}

    def get_settings(self) -> dict:
        if not self.settings_file.exists():
            return {"pgp_enabled": True}
        try:
            with open(self.settings_file, 'r') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {"pgp_enabled": True}

    def set_setting(self, key: str, value):
        settings = self.get_settings()
        settings[key] = value
        with open(self.settings_file, 'w') as f:
            json.dump(settings, f)


__all__ = ['Storage', 'StateManager', 'LockManager', 'create_demo_case']
trace/storage_impl/__init__.py (new file, 8 lines)
@@ -0,0 +1,8 @@
"""Storage implementation modules"""

from .lock_manager import LockManager
from .state_manager import StateManager
from .storage import Storage
from .demo_data import create_demo_case

__all__ = ['LockManager', 'StateManager', 'Storage', 'create_demo_case']
trace/storage_impl/demo_data.py (new file, 143 lines)
@@ -0,0 +1,143 @@
"""Demo case creation for first-time users"""

from ..models import Case, Evidence, Note


def create_demo_case() -> Case:
    """Create a demo case with evidence showcasing all features"""
    demo_case = Case(
        case_number="DEMO-2024-001",
        name="Sample Investigation",
        investigator="Demo User"
    )

    # Add case-level notes to demonstrate case notes feature
    case_note1 = Note(content="""Initial case briefing: Suspected data exfiltration incident.

Key objectives:
- Identify compromised systems
- Determine scope of data loss
- Document timeline of events

#incident-response #data-breach #investigation""")
    case_note1.calculate_hash()
    case_note1.extract_tags()
    case_note1.extract_iocs()
    demo_case.notes.append(case_note1)

    case_note2 = Note(content="""Investigation lead: Employee reported suspicious email from sender@phishing-domain.com
Initial analysis shows potential credential harvesting attempt.
Review email headers and attachments for IOCs. #phishing #email-analysis""")
    case_note2.calculate_hash()
    case_note2.extract_tags()
    case_note2.extract_iocs()
    demo_case.notes.append(case_note2)

    # Create evidence 1: Compromised laptop
    evidence1 = Evidence(
        name="Employee Laptop HDD",
        description="Primary workstation hard drive - user reported suspicious activity"
    )
    # Add source hash for chain of custody demonstration
    evidence1.metadata["source_hash"] = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"

    # Add notes to evidence 1 with various features
    note1 = Note(content="""Forensic imaging completed. Drive imaged using FTK Imager.
Image hash verified: SHA256 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

Chain of custody maintained throughout process. #forensics #imaging #chain-of-custody""")
    note1.calculate_hash()
    note1.extract_tags()
    note1.extract_iocs()
    evidence1.notes.append(note1)

    note2 = Note(content="""Discovered suspicious connections to external IP addresses:
- 192.168.1.100 (local gateway)
- 203.0.113.45 (external, geolocation: Unknown)
- 198.51.100.78 (command and control server suspected)

Browser history shows visits to malicious-site.com and data-exfil.net.
#network-analysis #ioc #c2-server""")
    note2.calculate_hash()
    note2.extract_tags()
    note2.extract_iocs()
    evidence1.notes.append(note2)

    note3 = Note(content="""Malware identified in temp directory:
File: evil.exe
MD5: d41d8cd98f00b204e9800998ecf8427e
SHA1: da39a3ee5e6b4b0d3255bfef95601890afd80709
SHA256: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855

Submitting to VirusTotal for analysis. #malware #hash-analysis #virustotal""")
    note3.calculate_hash()
    note3.extract_tags()
    note3.extract_iocs()
    evidence1.notes.append(note3)

    note4 = Note(content="""Timeline analysis reveals:
- 2024-01-15 09:23:45 - Suspicious email received
- 2024-01-15 09:24:12 - User clicked phishing link https://evil-domain.com/login
- 2024-01-15 09:25:03 - Credentials submitted to attacker-controlled site
- 2024-01-15 09:30:15 - Lateral movement detected

User credentials compromised. Recommend immediate password reset. #timeline #lateral-movement""")
    note4.calculate_hash()
    note4.extract_tags()
    note4.extract_iocs()
    evidence1.notes.append(note4)

    demo_case.evidence.append(evidence1)

    # Create evidence 2: Network logs
    evidence2 = Evidence(
        name="Firewall Logs",
        description="Corporate firewall logs from incident timeframe"
    )
    evidence2.metadata["source_hash"] = "a3f5c8b912e4d67f89b0c1a2e3d4f5a6b7c8d9e0f1a2b3c4d5e6f7a8b9c0d1e2"

    note5 = Note(content="""Log analysis shows outbound connections to suspicious domains:
- attacker-c2.com on port 443 (encrypted channel)
- data-upload.net on port 8080 (unencrypted)
- exfil-server.org on port 22 (SSH tunnel)

Total data transferred: approximately 2.3 GB over 4 hours.
#log-analysis #data-exfiltration #network-traffic""")
    note5.calculate_hash()
    note5.extract_tags()
    note5.extract_iocs()
    evidence2.notes.append(note5)

    note6 = Note(content="""Contact information found in malware configuration:
Email: attacker@malicious-domain.com
Backup C2: 2001:0db8:85a3:0000:0000:8a2e:0370:7334 (IPv6)

Cross-referencing with threat intelligence databases. #threat-intel #attribution""")
    note6.calculate_hash()
    note6.extract_tags()
    note6.extract_iocs()
    evidence2.notes.append(note6)

    demo_case.evidence.append(evidence2)

    # Create evidence 3: Email forensics
    evidence3 = Evidence(
        name="Phishing Email",
        description="Original phishing email preserved in .eml format"
    )

    note7 = Note(content="""Email headers analysis:
From: sender@phishing-domain.com (spoofed)
Reply-To: attacker@evil-mail-server.net
X-Originating-IP: 198.51.100.99

Email contains embedded tracking pixel at http://tracking.malicious-site.com/pixel.gif
Attachment: invoice.pdf.exe (double extension trick) #email-forensics #phishing-analysis""")
    note7.calculate_hash()
    note7.extract_tags()
    note7.extract_iocs()
    evidence3.notes.append(note7)

    demo_case.evidence.append(evidence3)

    return demo_case
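A quick sanity sketch of the factory above (standalone, same import-path assumption):

from trace.storage_impl import create_demo_case

demo = create_demo_case()
assert demo.case_number == "DEMO-2024-001"
assert len(demo.notes) == 2 and len(demo.evidence) == 3
# Every demo note arrives pre-hashed with tags and IOCs already extracted.
assert all(n.content_hash for ev in demo.evidence for n in ev.notes)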
trace/storage_impl/lock_manager.py (new file, 87 lines)
@@ -0,0 +1,87 @@
"""File lock manager for preventing concurrent access"""

import os
import sys
import time
from pathlib import Path


class LockManager:
    """Cross-platform file lock manager to prevent concurrent access"""

    def __init__(self, lock_file: Path):
        self.lock_file = lock_file
        self.acquired = False

    def acquire(self, timeout: int = 5):
        """Acquire lock with timeout. Returns True if successful."""
        start_time = time.time()
        while time.time() - start_time < timeout:
            try:
                # Try to create lock file exclusively (fails if exists)
                # Use 'x' mode which fails if file exists (atomic on most systems)
                fd = os.open(str(self.lock_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                os.write(fd, str(os.getpid()).encode())
                os.close(fd)
                self.acquired = True
                return True
            except FileExistsError:
                # Lock file exists, check if process is still alive
                if self._is_stale_lock():
                    # Remove stale lock and retry
                    try:
                        self.lock_file.unlink()
                    except FileNotFoundError:
                        pass
                    continue
                # Active lock, wait a bit
                time.sleep(0.1)
            except Exception:
                # Other errors, wait and retry
                time.sleep(0.1)
        return False

    def _is_stale_lock(self):
        """Check if lock file is stale (process no longer exists)"""
        try:
            if not self.lock_file.exists():
                return False
            with open(self.lock_file, 'r') as f:
                pid = int(f.read().strip())

            # Check if process exists (cross-platform)
            if sys.platform == 'win32':
                import ctypes
                kernel32 = ctypes.windll.kernel32
                PROCESS_QUERY_INFORMATION = 0x0400
                handle = kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid)
                if handle:
                    kernel32.CloseHandle(handle)
                    return False
                return True
            else:
                # Unix/Linux - send signal 0 to check if process exists
                try:
                    os.kill(pid, 0)
                    return False  # Process exists
                except OSError:
                    return True  # Process doesn't exist
        except (ValueError, FileNotFoundError, PermissionError):
            return True

    def release(self):
        """Release the lock"""
        if self.acquired:
            try:
                self.lock_file.unlink()
            except FileNotFoundError:
                pass
            self.acquired = False

    def __enter__(self):
        if not self.acquire():
            raise RuntimeError("Could not acquire lock: another instance is running")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
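Because acquire/release are wrapped in __enter__/__exit__, callers can hold the lock for exactly one block; a minimal sketch:

from pathlib import Path
from trace.storage_impl import LockManager

lock = LockManager(Path.home() / ".trace" / "app.lock")
try:
    with lock:  # raises RuntimeError if a live process already holds the lock
        pass    # exclusive section: safe to read/write data.json here
except RuntimeError as err:
    print(err)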
trace/storage_impl/state_manager.py (new file, 92 lines)
@@ -0,0 +1,92 @@
"""State manager for active context and settings"""

import json
from pathlib import Path
from typing import Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from .storage import Storage

DEFAULT_APP_DIR = Path.home() / ".trace"


class StateManager:
    """Manages active context and user settings"""

    def __init__(self, app_dir: Path = DEFAULT_APP_DIR):
        self.app_dir = app_dir
        self.state_file = self.app_dir / "state"
        self.settings_file = self.app_dir / "settings.json"
        self._ensure_app_dir()

    def _ensure_app_dir(self):
        if not self.app_dir.exists():
            self.app_dir.mkdir(parents=True, exist_ok=True)

    def set_active(self, case_id: Optional[str] = None, evidence_id: Optional[str] = None):
        state = self.get_active()
        state["case_id"] = case_id
        state["evidence_id"] = evidence_id
        # Atomic write: write to temp file then rename
        temp_file = self.state_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(state, f, ensure_ascii=False)
        temp_file.replace(self.state_file)

    def get_active(self) -> dict:
        if not self.state_file.exists():
            return {"case_id": None, "evidence_id": None}
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {"case_id": None, "evidence_id": None}

    def validate_and_clear_stale(self, storage: 'Storage') -> str:
        """Validate active state against storage and clear stale references.
        Returns warning message if state was cleared, empty string otherwise."""
        state = self.get_active()
        case_id = state.get("case_id")
        evidence_id = state.get("evidence_id")
        warning = ""

        if case_id:
            case = storage.get_case(case_id)
            if not case:
                warning = f"Active case (ID: {case_id[:8]}...) no longer exists. Clearing active context."
                self.set_active(None, None)
                return warning

            # Validate evidence if set
            if evidence_id:
                _, evidence = storage.find_evidence(evidence_id)
                if not evidence:
                    warning = f"Active evidence (ID: {evidence_id[:8]}...) no longer exists. Clearing to case level."
                    self.set_active(case_id, None)
                    return warning

        elif evidence_id:
            # Evidence set but no case - invalid state
            warning = "Invalid state: evidence set without case. Clearing active context."
            self.set_active(None, None)
            return warning

        return warning

    def get_settings(self) -> dict:
        if not self.settings_file.exists():
            return {"pgp_enabled": True}
        try:
            with open(self.settings_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError):
            return {"pgp_enabled": True}

    def set_setting(self, key: str, value):
        settings = self.get_settings()
        settings[key] = value
        # Atomic write: write to temp file then rename
        temp_file = self.settings_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(settings, f, ensure_ascii=False)
        temp_file.replace(self.settings_file)
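The stale-reference validation above is intended to run at startup, once a Storage instance is open; a sketch of that call pattern (lock handling elided):

from trace.storage_impl import StateManager, Storage

storage = Storage(acquire_lock=False)
state = StateManager()
warning = state.validate_and_clear_stale(storage)
if warning:
    print(warning)          # e.g. "Active case (ID: 1a2b3c4d...) no longer exists. ..."
print(state.get_active())   # {"case_id": ..., "evidence_id": ...}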
trace/storage_impl/storage.py (new file, 112 lines)
@@ -0,0 +1,112 @@
"""Main storage class for persisting cases, evidence, and notes"""

import json
from pathlib import Path
from typing import List, Optional, Tuple

from ..models import Case, Evidence
from .lock_manager import LockManager
from .demo_data import create_demo_case

DEFAULT_APP_DIR = Path.home() / ".trace"


class Storage:
    """Manages persistence of all forensic data"""

    def __init__(self, app_dir: Path = DEFAULT_APP_DIR, acquire_lock: bool = True):
        self.app_dir = app_dir
        self.data_file = self.app_dir / "data.json"
        self.lock_file = self.app_dir / "app.lock"
        self.lock_manager = None
        self._ensure_app_dir()

        # Acquire lock to prevent concurrent access
        if acquire_lock:
            self.lock_manager = LockManager(self.lock_file)
            if not self.lock_manager.acquire(timeout=5):
                raise RuntimeError("Another instance of trace is already running. Please close it first.")

        self.cases: List[Case] = self._load_data()

        # Create demo case on first launch (only if data loaded successfully and is empty)
        if not self.cases and self.data_file.exists():
            # File exists but is empty - could be first run after successful load
            pass
        elif not self.cases and not self.data_file.exists():
            # No file exists - first run
            demo_case = create_demo_case()
            self.cases.append(demo_case)
            self.save_data()

    def __del__(self):
        """Release lock when Storage object is destroyed"""
        if self.lock_manager:
            self.lock_manager.release()

    def _ensure_app_dir(self):
        if not self.app_dir.exists():
            self.app_dir.mkdir(parents=True, exist_ok=True)

    def _load_data(self) -> List[Case]:
        if not self.data_file.exists():
            return []
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
                return [Case.from_dict(c) for c in data]
        except (json.JSONDecodeError, IOError, KeyError, ValueError) as e:
            # Corrupted JSON - create backup and raise exception
            import shutil
            from datetime import datetime
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_file = self.app_dir / f"data.json.corrupted.{timestamp}"
            try:
                shutil.copy2(self.data_file, backup_file)
            except Exception:
                pass
            # Raise exception with information about backup
            raise RuntimeError(f"Data file is corrupted. Backup saved to: {backup_file}\nError: {e}")

    def start_fresh(self):
        """Start with fresh data (for corrupted JSON recovery)"""
        self.cases = []
        demo_case = create_demo_case()
        self.cases.append(demo_case)
        self.save_data()

    def save_data(self):
        data = [c.to_dict() for c in self.cases]
        # Write to temp file then rename for atomic-ish write
        temp_file = self.data_file.with_suffix(".tmp")
        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        temp_file.replace(self.data_file)

    def add_case(self, case: Case):
        self.cases.append(case)
        self.save_data()

    def get_case(self, case_id: str) -> Optional[Case]:
        # Case ID lookup
        for c in self.cases:
            if c.case_id == case_id:
                return c
        return None

    def delete_case(self, case_id: str):
        self.cases = [c for c in self.cases if c.case_id != case_id]
        self.save_data()

    def delete_evidence(self, case_id: str, evidence_id: str):
        case = self.get_case(case_id)
        if case:
            case.evidence = [e for e in case.evidence if e.evidence_id != evidence_id]
            self.save_data()

    def find_evidence(self, evidence_id: str) -> Tuple[Optional[Case], Optional[Evidence]]:
        for c in self.cases:
            for e in c.evidence:
                if e.evidence_id == evidence_id:
                    return c, e
        return None, None
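A minimal persistence round-trip using the class above, mirroring what the updated tests below do (acquire_lock=False lets two instances share one directory):

import tempfile
from pathlib import Path
from trace.models import Case
from trace.storage_impl import Storage

app_dir = Path(tempfile.mkdtemp())
storage = Storage(app_dir=app_dir, acquire_lock=False)
case = Case(case_number="2024-002")
storage.add_case(case)  # persists atomically via the .tmp rename
reloaded = Storage(app_dir=app_dir, acquire_lock=False)
assert reloaded.get_case(case.case_id) is not None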
@@ -21,7 +21,8 @@ class TestModels(unittest.TestCase):
class TestStorage(unittest.TestCase):
    def setUp(self):
        self.test_dir = Path(tempfile.mkdtemp())
        self.storage = Storage(app_dir=self.test_dir)
        # Disable lock for tests to allow multiple Storage instances
        self.storage = Storage(app_dir=self.test_dir, acquire_lock=False)

    def tearDown(self):
        shutil.rmtree(self.test_dir)
@@ -31,7 +32,7 @@ class TestStorage(unittest.TestCase):
        self.storage.add_case(case)

        # Reload storage from same dir
        new_storage = Storage(app_dir=self.test_dir)
        new_storage = Storage(app_dir=self.test_dir, acquire_lock=False)
        loaded_case = new_storage.get_case(case.case_id)

        self.assertIsNotNone(loaded_case)
trace/tui/__init__.py (new file, 7 lines)
@@ -0,0 +1,7 @@
"""TUI (Text User Interface) package for trace application"""

# Import from the main tui_app module for backward compatibility
# The tui_app.py file contains the main TUI class and run_tui function
from ..tui_app import run_tui, TUI

__all__ = ['run_tui', 'TUI']
trace/tui/handlers/__init__.py (new file, 5 lines)
@@ -0,0 +1,5 @@
"""TUI handlers for various operations"""

from .export_handler import ExportHandler

__all__ = ['ExportHandler']
trace/tui/handlers/export_handler.py (new file, 246 lines)
@@ -0,0 +1,246 @@
"""Export functionality for TUI"""

import time
import datetime
from pathlib import Path
from typing import List, Tuple, Optional

from ...models import Note, Case, Evidence


class ExportHandler:
    """Handles exporting IOCs and notes to files"""

    @staticmethod
    def export_iocs_to_file(
        iocs_with_counts: List[Tuple[str, int, str]],
        active_case: Optional[Case],
        active_evidence: Optional[Evidence],
        get_iocs_func=None
    ) -> Tuple[bool, str]:
        """
        Export IOCs to a text file

        Args:
            iocs_with_counts: List of (ioc, count, type) tuples
            active_case: Active case context
            active_evidence: Active evidence context
            get_iocs_func: Function to get IOCs for a list of notes

        Returns:
            Tuple of (success: bool, message: str)
        """
        if not iocs_with_counts:
            return False, "No IOCs to export."

        # Determine context for filename
        if active_evidence:
            context_name = f"{active_case.case_number}_{active_evidence.name}" if active_case else active_evidence.name
        elif active_case:
            context_name = active_case.case_number
        else:
            context_name = "unknown"

        # Clean filename
        context_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in context_name)

        # Create exports directory if it doesn't exist
        export_dir = Path.home() / ".trace" / "exports"
        export_dir.mkdir(parents=True, exist_ok=True)

        # Generate filename with timestamp
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"iocs_{context_name}_{timestamp}.txt"
        filepath = export_dir / filename

        # Build export content
        lines = []
        lines.append(f"# IOC Export - {context_name}")
        lines.append(f"# Generated: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        lines.append("")

        if active_evidence:
            # Evidence context - only evidence IOCs
            lines.append(f"## Evidence: {active_evidence.name}")
            lines.append("")
            for ioc, count, ioc_type in iocs_with_counts:
                lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)")
        elif active_case and get_iocs_func:
            # Case context - show case IOCs + evidence IOCs with separators
            # Get case notes IOCs
            case_iocs = get_iocs_func(active_case.notes)
            if case_iocs:
                lines.append("## Case Notes")
                lines.append("")
                for ioc, count, ioc_type in case_iocs:
                    lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)")
                lines.append("")

            # Get IOCs from each evidence
            for ev in active_case.evidence:
                ev_iocs = get_iocs_func(ev.notes)
                if ev_iocs:
                    lines.append(f"## Evidence: {ev.name}")
                    lines.append("")
                    for ioc, count, ioc_type in ev_iocs:
                        lines.append(f"{ioc}\t[{ioc_type}]\t({count} occurrences)")
                    lines.append("")

        # Write to file
        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write('\n'.join(lines))
            return True, f"IOCs exported to: {filepath}"
        except Exception as e:
            return False, f"Export failed: {str(e)}"

    @staticmethod
    def export_case_to_markdown(case: Case) -> Tuple[bool, str]:
        """
        Export case (and all its evidence) to markdown

        Args:
            case: The case to export

        Returns:
            Tuple of (success: bool, message: str)
        """
        # Create exports directory if it doesn't exist
        export_dir = Path.home() / ".trace" / "exports"
        export_dir.mkdir(parents=True, exist_ok=True)

        # Generate filename
        case_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in case.case_number)
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"case_{case_name}_{timestamp}.md"
        filepath = export_dir / filename

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write("# Forensic Notes Export\n\n")
                f.write(f"Generated on: {time.ctime()}\n\n")

                # Write case info
                f.write(f"## Case: {case.case_number}\n")
                if case.name:
                    f.write(f"**Name:** {case.name}\n")
                if case.investigator:
                    f.write(f"**Investigator:** {case.investigator}\n")
                f.write(f"**Case ID:** {case.case_id}\n\n")

                # Case notes
                f.write("### Case Notes\n")
                if not case.notes:
                    f.write("_No notes._\n")
                for note in case.notes:
                    ExportHandler._write_note_markdown(f, note)

                # Evidence
                f.write("\n### Evidence\n")
                if not case.evidence:
                    f.write("_No evidence._\n")

                for ev in case.evidence:
                    f.write(f"#### Evidence: {ev.name}\n")
                    if ev.description:
                        f.write(f"_{ev.description}_\n")
                    f.write(f"**ID:** {ev.evidence_id}\n")

                    # Include source hash if available
                    source_hash = ev.metadata.get("source_hash")
                    if source_hash:
                        f.write(f"**Source Hash:** `{source_hash}`\n")
                    f.write("\n")

                    f.write("##### Evidence Notes\n")
                    if not ev.notes:
                        f.write("_No notes._\n")
                    for note in ev.notes:
                        ExportHandler._write_note_markdown(f, note)
                    f.write("\n")

            return True, f"Case exported to: {filepath}"
        except Exception as e:
            return False, f"Export failed: {str(e)}"
||||
@staticmethod
|
||||
def export_evidence_to_markdown(
|
||||
evidence: Evidence,
|
||||
case: Optional[Case]
|
||||
) -> Tuple[bool, str]:
|
||||
"""
|
||||
Export evidence to markdown
|
||||
|
||||
Args:
|
||||
evidence: The evidence to export
|
||||
case: The parent case (for context)
|
||||
|
||||
Returns:
|
||||
Tuple of (success: bool, message: str)
|
||||
"""
|
||||
# Create exports directory if it doesn't exist
|
||||
export_dir = Path.home() / ".trace" / "exports"
|
||||
export_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Generate filename
|
||||
case_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in case.case_number) if case else "unknown"
|
||||
ev_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in evidence.name)
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
filename = f"evidence_{case_name}_{ev_name}_{timestamp}.md"
|
||||
filepath = export_dir / filename
|
||||
|
||||
try:
|
||||
with open(filepath, 'w', encoding='utf-8') as f:
|
||||
f.write("# Forensic Evidence Export\n\n")
|
||||
f.write(f"Generated on: {time.ctime()}\n\n")
|
||||
|
||||
# Case context
|
||||
if case:
|
||||
f.write(f"**Case:** {case.case_number}\n")
|
||||
if case.name:
|
||||
f.write(f"**Case Name:** {case.name}\n")
|
||||
f.write("\n")
|
||||
|
||||
# Evidence info
|
||||
f.write(f"## Evidence: {evidence.name}\n")
|
||||
if evidence.description:
|
||||
f.write(f"**Description:** {evidence.description}\n")
|
||||
if evidence.metadata.get("source_hash"):
|
||||
f.write(f"**Source Hash:** `{evidence.metadata['source_hash']}`\n")
|
||||
f.write(f"**Evidence ID:** {evidence.evidence_id}\n\n")
|
||||
|
||||
# Notes
|
||||
f.write("### Notes\n")
|
||||
if not evidence.notes:
|
||||
f.write("_No notes._\n")
|
||||
for note in evidence.notes:
|
||||
ExportHandler._write_note_markdown(f, note)
|
||||
|
||||
return True, f"Evidence exported to: {filepath}"
|
||||
except Exception as e:
|
||||
return False, f"Export failed: {str(e)}"
|
||||
|
||||
@staticmethod
|
||||
def _write_note_markdown(f, note: Note):
|
||||
"""Helper to write a note in markdown format
|
||||
|
||||
Includes Unix timestamp for hash reproducibility - anyone can recompute the hash
|
||||
using the formula: SHA256("{unix_timestamp}:{content}")
|
||||
"""
|
||||
f.write(f"- **{time.ctime(note.timestamp)}**\n")
|
||||
f.write(f" - Unix Timestamp: `{note.timestamp}` (for hash verification)\n")
|
||||
f.write(f" - Content:\n")
|
||||
# Properly indent multi-line content
|
||||
for line in note.content.splitlines():
|
||||
f.write(f" {line}\n")
|
||||
if note.tags:
|
||||
tags_str = " ".join([f"#{tag}" for tag in note.tags])
|
||||
f.write(f" - Tags: {tags_str}\n")
|
||||
f.write(f" - SHA256 Hash (timestamp:content): `{note.content_hash}`\n")
|
||||
if note.signature:
|
||||
f.write(" - **GPG Signature of Hash:**\n")
|
||||
f.write(" ```\n")
|
||||
for line in note.signature.splitlines():
|
||||
f.write(f" {line}\n")
|
||||
f.write(" ```\n")
|
||||
f.write("\n")
|
||||
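The `_write_note_markdown` helper spells out why the raw Unix timestamp appears next to each note: the hash is SHA256 over the string `"{unix_timestamp}:{content}"`, so anyone holding the export can recompute it. A minimal verifier, assuming only that formula (the sample values are illustrative, and the timestamp must be copied exactly as printed, since `1714656000` and `1714656000.0` hash differently):

```python
import hashlib


def verify_note_hash(unix_timestamp, content: str, expected_hash: str) -> bool:
    """Recompute SHA256("{unix_timestamp}:{content}") and compare to the export."""
    digest = hashlib.sha256(f"{unix_timestamp}:{content}".encode("utf-8")).hexdigest()
    return digest == expected_hash


# Values copied from an exported report (illustrative placeholders):
print(verify_note_hash(
    1714656000,
    "Observed suspicious process at 14:32",
    "paste the SHA256 hash from the export here",
))
```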

6  trace/tui/rendering/__init__.py  Normal file
@@ -0,0 +1,6 @@
```python
"""Rendering utilities for TUI"""

from .colors import init_colors, ColorPairs
from .text_renderer import TextRenderer

__all__ = ['init_colors', 'ColorPairs', 'TextRenderer']
```

43  trace/tui/rendering/colors.py  Normal file
@@ -0,0 +1,43 @@
```python
"""Color pair initialization and constants for TUI"""

import curses


class ColorPairs:
    """Color pair constants"""
    SELECTION = 1      # Black on cyan
    SUCCESS = 2        # Green on black
    WARNING = 3        # Yellow on black
    ERROR = 4          # Red on black
    HEADER = 5         # Cyan on black
    METADATA = 6       # White on black
    BORDER = 7         # Blue on black
    TAG = 8            # Magenta on black
    IOC_SELECTED = 9   # Red on cyan
    TAG_SELECTED = 10  # Magenta on cyan


def init_colors():
    """Initialize color pairs for the TUI"""
    curses.start_color()
    if curses.has_colors():
        # Selection / Highlight
        curses.init_pair(ColorPairs.SELECTION, curses.COLOR_BLACK, curses.COLOR_CYAN)
        # Success / Active indicators
        curses.init_pair(ColorPairs.SUCCESS, curses.COLOR_GREEN, curses.COLOR_BLACK)
        # Info / Warnings
        curses.init_pair(ColorPairs.WARNING, curses.COLOR_YELLOW, curses.COLOR_BLACK)
        # Errors / Critical / IOCs
        curses.init_pair(ColorPairs.ERROR, curses.COLOR_RED, curses.COLOR_BLACK)
        # Headers / Titles (bright cyan)
        curses.init_pair(ColorPairs.HEADER, curses.COLOR_CYAN, curses.COLOR_BLACK)
        # Metadata / Secondary text (dim)
        curses.init_pair(ColorPairs.METADATA, curses.COLOR_WHITE, curses.COLOR_BLACK)
        # Borders / Separators (blue)
        curses.init_pair(ColorPairs.BORDER, curses.COLOR_BLUE, curses.COLOR_BLACK)
        # Tags (magenta)
        curses.init_pair(ColorPairs.TAG, curses.COLOR_MAGENTA, curses.COLOR_BLACK)
        # IOCs on selected background (red on cyan)
        curses.init_pair(ColorPairs.IOC_SELECTED, curses.COLOR_RED, curses.COLOR_CYAN)
        # Tags on selected background (magenta on cyan)
        curses.init_pair(ColorPairs.TAG_SELECTED, curses.COLOR_MAGENTA, curses.COLOR_CYAN)
```

(Note: the `TAG_SELECTED` constant's inline comment previously said "Yellow on cyan", but `init_colors` registers it as magenta on cyan; the comment above reflects what the code does.)
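curses color pairs are global state: `init_colors()` has to run once inside an active curses session before any `color_pair(...)` attribute is used. A tiny harness, assuming only the module above (the sample strings are illustrative):

```python
import curses

from trace.tui.rendering.colors import ColorPairs, init_colors


def demo(stdscr):
    init_colors()  # register the pairs; wrapper has already set up the terminal
    stdscr.addstr(0, 0, "198.51.100.7", curses.color_pair(ColorPairs.ERROR) | curses.A_BOLD)
    stdscr.addstr(1, 0, "#persistence", curses.color_pair(ColorPairs.TAG))
    stdscr.addstr(3, 0, "press any key to exit", curses.color_pair(ColorPairs.METADATA))
    stdscr.getch()


curses.wrapper(demo)
```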

137  trace/tui/rendering/text_renderer.py  Normal file
@@ -0,0 +1,137 @@
```python
"""Text rendering utilities with highlighting support"""

import curses
import re

from ...models import Note
from .colors import ColorPairs


class TextRenderer:
    """Utility class for rendering text with highlights"""

    @staticmethod
    def safe_truncate(text, max_width, ellipsis="..."):
        """
        Safely truncate text to fit within max_width, handling Unicode characters.
        Uses a conservative approach to avoid curses display errors.
        """
        if not text:
            return text

        # Try to fit the text as-is
        if len(text) <= max_width:
            return text

        # Need to truncate - account for ellipsis
        if max_width <= len(ellipsis):
            return ellipsis[:max_width]

        # Truncate conservatively (character by character) to handle multi-byte UTF-8
        target_len = max_width - len(ellipsis)
        truncated = text[:target_len]

        # Encode and check actual byte length to be safe with UTF-8
        # If it's too long, trim further
        while len(truncated) > 0:
            try:
                # Test if this will fit when displayed
                test_str = truncated + ellipsis
                if len(test_str) <= max_width:
                    return test_str
            except:
                pass
            # Trim one more character
            truncated = truncated[:-1]

        return ellipsis[:max_width]

    @staticmethod
    def display_line_with_highlights(screen, y, x_start, line, is_selected=False):
        """
        Display a line with intelligent highlighting.
        - IOCs are highlighted with ColorPairs.ERROR (red)
        - Tags are highlighted with ColorPairs.TAG (magenta)
        - Selection background is ColorPairs.SELECTION (cyan) for non-IOC text
        - IOC highlighting takes priority over selection
        """
        # Extract IOCs and tags
        highlights = []

        # Get IOCs with positions
        for text, start, end, ioc_type in Note.extract_iocs_with_positions(line):
            highlights.append((text, start, end, 'ioc'))

        # Get tags
        for match in re.finditer(r'#\w+', line):
            highlights.append((match.group(), match.start(), match.end(), 'tag'))

        # Sort by position and remove overlaps (IOCs take priority over tags)
        highlights.sort(key=lambda x: x[1])
        deduplicated = []
        last_end = -1
        for text, start, end, htype in highlights:
            if start >= last_end:
                deduplicated.append((text, start, end, htype))
                last_end = end
        highlights = deduplicated

        if not highlights:
            # No highlights - use selection color if selected
            if is_selected:
                screen.attron(curses.color_pair(ColorPairs.SELECTION))
                screen.addstr(y, x_start, line)
                screen.attroff(curses.color_pair(ColorPairs.SELECTION))
            else:
                screen.addstr(y, x_start, line)
            return

        # Display with intelligent highlighting
        x_pos = x_start
        last_pos = 0

        for text, start, end, htype in highlights:
            # Add text before this highlight
            if start > last_pos:
                text_before = line[last_pos:start]
                if is_selected:
                    screen.attron(curses.color_pair(ColorPairs.SELECTION))
                    screen.addstr(y, x_pos, text_before)
                    screen.attroff(curses.color_pair(ColorPairs.SELECTION))
                else:
                    screen.addstr(y, x_pos, text_before)
                x_pos += len(text_before)

            # Add highlighted text
            if htype == 'ioc':
                # IOC highlighting: red on cyan if selected, red on black otherwise
                if is_selected:
                    screen.attron(curses.color_pair(ColorPairs.IOC_SELECTED) | curses.A_BOLD)
                    screen.addstr(y, x_pos, text)
                    screen.attroff(curses.color_pair(ColorPairs.IOC_SELECTED) | curses.A_BOLD)
                else:
                    screen.attron(curses.color_pair(ColorPairs.ERROR) | curses.A_BOLD)
                    screen.addstr(y, x_pos, text)
                    screen.attroff(curses.color_pair(ColorPairs.ERROR) | curses.A_BOLD)
            else:  # tag
                # Tag highlighting: magenta on cyan if selected, magenta on black otherwise
                if is_selected:
                    screen.attron(curses.color_pair(ColorPairs.TAG_SELECTED))
                    screen.addstr(y, x_pos, text)
                    screen.attroff(curses.color_pair(ColorPairs.TAG_SELECTED))
                else:
                    screen.attron(curses.color_pair(ColorPairs.TAG))
                    screen.addstr(y, x_pos, text)
                    screen.attroff(curses.color_pair(ColorPairs.TAG))

            x_pos += len(text)
            last_pos = end

        # Add remaining text
        if last_pos < len(line):
            text_after = line[last_pos:]
            if is_selected:
                screen.attron(curses.color_pair(ColorPairs.SELECTION))
                screen.addstr(y, x_pos, text_after)
                screen.attroff(curses.color_pair(ColorPairs.SELECTION))
            else:
                screen.addstr(y, x_pos, text_after)
```

(Note: the docstring previously claimed tags use `ColorPairs.WARNING` (yellow), but the branch below applies `ColorPairs.TAG` (magenta); the docstring above matches the code.)
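The overlap pass is worth tracing: spans are sorted by start position only, and each span is kept only if it starts at or after the previous keeper's end, so whichever highlight starts first claims the overlapping region (IOCs win ties because they are appended first). A standalone run of that step, with hand-made spans in place of `Note.extract_iocs_with_positions` output:

```python
highlights = [
    ("10.0.0.5", 4, 12, 'ioc'),
    ("#c2", 10, 13, 'tag'),           # overlaps the IOC, starts later -> dropped
    ("#persistence", 20, 32, 'tag'),  # no overlap -> kept
]
highlights.sort(key=lambda h: h[1])

deduplicated, last_end = [], -1
for text, start, end, htype in highlights:
    if start >= last_end:  # keep only non-overlapping spans
        deduplicated.append((text, start, end, htype))
        last_end = end

print(deduplicated)
# [('10.0.0.5', 4, 12, 'ioc'), ('#persistence', 20, 32, 'tag')]
```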

88  trace/tui/visual_constants.py  Normal file
@@ -0,0 +1,88 @@
```python
"""Visual constants for consistent TUI layout and styling"""


class Layout:
    """Screen layout constants"""
    HEADER_Y = 0
    HEADER_X = 2
    CONTENT_START_Y = 2
    CONTENT_INDENT = 4
    FOOTER_OFFSET_FROM_BOTTOM = 3
    BORDER_OFFSET_FROM_BOTTOM = 2
    STATUS_LINE_OFFSET_FROM_BOTTOM = 1  # height - 1 for status bar
    NOTE_DETAIL_BOTTOM_RESERVE = 6      # height - 6 for note detail view


class Spacing:
    """Spacing and padding constants"""
    SECTION_VERTICAL_GAP = 2
    ITEM_VERTICAL_GAP = 1
    DIALOG_MARGIN = 4
    HORIZONTAL_PADDING = 6     # width - 6 for truncation
    HASH_DISPLAY_PADDING = 20  # width - 20
    HASH_SHORT_PADDING = 12    # width - 12 for shorter hash displays
    EMPTY_STATE_PADDING = 8    # width - 8 for empty state boxes
    STATUS_BAR_PADDING = 2     # width - 2 for status bar


class ColumnWidths:
    """Column widths for list displays - can be percentage-based"""
    TAG_COLUMN_MIN = 30
    IOC_COLUMN_MIN = 50
    CONTENT_PREVIEW_MIN = 50
    NOTE_PREVIEW_MIN = 60

    @staticmethod
    def get_tag_width(terminal_width):
        """Get responsive tag column width (40% of terminal or min 30)"""
        return max(ColumnWidths.TAG_COLUMN_MIN, int(terminal_width * 0.4))

    @staticmethod
    def get_ioc_width(terminal_width):
        """Get responsive IOC column width (50% of terminal or min 50)"""
        return max(ColumnWidths.IOC_COLUMN_MIN, int(terminal_width * 0.5))

    @staticmethod
    def get_content_preview_width(terminal_width):
        """Get responsive content preview width (50% of terminal or min 50)"""
        return max(ColumnWidths.CONTENT_PREVIEW_MIN, int(terminal_width * 0.5))


class DialogSize:
    """Standard dialog dimensions (width, height)"""
    SMALL = (40, 8)    # Confirm dialogs
    MEDIUM = (60, 15)  # Settings, single input
    LARGE = (70, 20)   # Multiline, help


class Icons:
    """Unicode symbols used throughout UI"""
    ACTIVE = "●"
    INACTIVE = "○"
    DIAMOND = "◆"
    SQUARE = "■"
    SMALL_SQUARE = "▪"
    ARROW_RIGHT = "▸"
    WARNING = "⚠"
    HASH = "⌗"
    FILTER = "◈"
    VERIFIED = "✓"
    FAILED = "✗"
    UNSIGNED = "?"
    SEPARATOR_H = "─"
    SEPARATOR_V = "│"
    SEPARATOR_GROUP = "│"  # For grouping footer commands
    BOX_TL = "┌"
    BOX_BL = "└"
    # Box drawing for improved empty states
    BOX_DOUBLE_TL = "╔"
    BOX_DOUBLE_TR = "╗"
    BOX_DOUBLE_BL = "╚"
    BOX_DOUBLE_BR = "╝"
    BOX_DOUBLE_H = "═"
    BOX_DOUBLE_V = "║"


class Timing:
    """Timing constants"""
    FLASH_MESSAGE_DURATION = 3  # seconds
```
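The `max(minimum, int(width * fraction))` pattern in `ColumnWidths` means the percentage only takes effect once the terminal is wide enough; below that threshold, the minimum floor wins. The values below follow directly from the formulas above:

```python
from trace.tui.visual_constants import ColumnWidths

# 60-column terminal: 40% of 60 = 24, floored to the 30-column minimum
assert ColumnWidths.get_tag_width(60) == 30
# 120-column terminal: 40% of 120 = 48, percentage wins
assert ColumnWidths.get_tag_width(120) == 48
# IOC column: 50% of 120 = 60, above the 50-column minimum
assert ColumnWidths.get_ioc_width(120) == 60
```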
File diff suppressed because it is too large