mirror of
https://github.com/overcuriousity/trace.git
synced 2025-12-20 13:02:21 +00:00
Merge pull request #2 from overcuriousity/claude/code-review-bugs-011Ga38jN53dLDkW2sLfzG1a
Review code for bugs and issues
This commit is contained in:
82
trace/cli.py
82
trace/cli.py
@@ -66,50 +66,54 @@ def quick_add_note(content: str):
|
||||
storage.save_data()
|
||||
|
||||
def export_markdown(output_file: str = "export.md"):
|
||||
storage = Storage()
|
||||
try:
|
||||
storage = Storage()
|
||||
|
||||
with open(output_file, "w") as f:
|
||||
f.write("# Forensic Notes Export\n\n")
|
||||
f.write(f"Generated on: {time.ctime()}\n\n")
|
||||
with open(output_file, "w") as f:
|
||||
f.write("# Forensic Notes Export\n\n")
|
||||
f.write(f"Generated on: {time.ctime()}\n\n")
|
||||
|
||||
for case in storage.cases:
|
||||
f.write(f"## Case: {case.case_number}\n")
|
||||
if case.name:
|
||||
f.write(f"**Name:** {case.name}\n")
|
||||
if case.investigator:
|
||||
f.write(f"**Investigator:** {case.investigator}\n")
|
||||
f.write(f"**Case ID:** {case.case_id}\n\n")
|
||||
for case in storage.cases:
|
||||
f.write(f"## Case: {case.case_number}\n")
|
||||
if case.name:
|
||||
f.write(f"**Name:** {case.name}\n")
|
||||
if case.investigator:
|
||||
f.write(f"**Investigator:** {case.investigator}\n")
|
||||
f.write(f"**Case ID:** {case.case_id}\n\n")
|
||||
|
||||
f.write("### Case Notes\n")
|
||||
if not case.notes:
|
||||
f.write("_No notes._\n")
|
||||
for note in case.notes:
|
||||
write_note(f, note)
|
||||
|
||||
f.write("\n### Evidence\n")
|
||||
if not case.evidence:
|
||||
f.write("_No evidence._\n")
|
||||
|
||||
for ev in case.evidence:
|
||||
f.write(f"#### Evidence: {ev.name}\n")
|
||||
if ev.description:
|
||||
f.write(f"_{ev.description}_\n")
|
||||
f.write(f"**ID:** {ev.evidence_id}\n")
|
||||
|
||||
# Include source hash if available
|
||||
source_hash = ev.metadata.get("source_hash")
|
||||
if source_hash:
|
||||
f.write(f"**Source Hash:** `{source_hash}`\n")
|
||||
f.write("\n")
|
||||
|
||||
f.write("##### Evidence Notes\n")
|
||||
if not ev.notes:
|
||||
f.write("### Case Notes\n")
|
||||
if not case.notes:
|
||||
f.write("_No notes._\n")
|
||||
for note in ev.notes:
|
||||
for note in case.notes:
|
||||
write_note(f, note)
|
||||
f.write("\n")
|
||||
f.write("---\n\n")
|
||||
print(f"Exported to {output_file}")
|
||||
|
||||
f.write("\n### Evidence\n")
|
||||
if not case.evidence:
|
||||
f.write("_No evidence._\n")
|
||||
|
||||
for ev in case.evidence:
|
||||
f.write(f"#### Evidence: {ev.name}\n")
|
||||
if ev.description:
|
||||
f.write(f"_{ev.description}_\n")
|
||||
f.write(f"**ID:** {ev.evidence_id}\n")
|
||||
|
||||
# Include source hash if available
|
||||
source_hash = ev.metadata.get("source_hash")
|
||||
if source_hash:
|
||||
f.write(f"**Source Hash:** `{source_hash}`\n")
|
||||
f.write("\n")
|
||||
|
||||
f.write("##### Evidence Notes\n")
|
||||
if not ev.notes:
|
||||
f.write("_No notes._\n")
|
||||
for note in ev.notes:
|
||||
write_note(f, note)
|
||||
f.write("\n")
|
||||
f.write("---\n\n")
|
||||
print(f"Exported to {output_file}")
|
||||
except (IOError, OSError, PermissionError) as e:
|
||||
print(f"Error: Failed to export to {output_file}: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def write_note(f, note: Note):
|
||||
f.write(f"- **{time.ctime(note.timestamp)}**\n")
|
||||
|
||||
@@ -15,7 +15,7 @@ class Crypto:
|
||||
stderr=subprocess.PIPE,
|
||||
text=True
|
||||
)
|
||||
stdout, stderr = proc.communicate()
|
||||
stdout, stderr = proc.communicate(timeout=10)
|
||||
|
||||
if proc.returncode != 0:
|
||||
return []
|
||||
@@ -41,8 +41,8 @@ class Crypto:
|
||||
|
||||
return keys
|
||||
|
||||
except FileNotFoundError:
|
||||
return [] # GPG not installed
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
return [] # GPG not installed or timed out
|
||||
|
||||
@staticmethod
|
||||
def sign_content(content: str, key_id: str = None) -> str:
|
||||
@@ -71,7 +71,7 @@ class Crypto:
|
||||
stderr=subprocess.PIPE,
|
||||
text=True
|
||||
)
|
||||
stdout, stderr = proc.communicate(input=content)
|
||||
stdout, stderr = proc.communicate(input=content, timeout=10)
|
||||
|
||||
if proc.returncode != 0:
|
||||
# Fallback: maybe no key is found or gpg error
|
||||
@@ -79,8 +79,8 @@ class Crypto:
|
||||
return ""
|
||||
|
||||
return stdout
|
||||
except FileNotFoundError:
|
||||
return "" # GPG not installed
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
return "" # GPG not installed or timed out
|
||||
|
||||
@staticmethod
|
||||
def hash_content(content: str, timestamp: float) -> str:
|
||||
|
||||
@@ -35,19 +35,26 @@ class Note:
|
||||
self.iocs = []
|
||||
|
||||
# IPv4 addresses
|
||||
ipv4_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
|
||||
ipv4_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
|
||||
for match in re.findall(ipv4_pattern, self.content):
|
||||
if match not in seen:
|
||||
seen.add(match)
|
||||
self.iocs.append(match)
|
||||
|
||||
# IPv6 addresses (simplified)
|
||||
ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'
|
||||
# IPv6 addresses (supports compressed format)
|
||||
ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b'
|
||||
for match in re.findall(ipv6_pattern, self.content):
|
||||
if match not in seen:
|
||||
seen.add(match)
|
||||
self.iocs.append(match)
|
||||
|
||||
# URLs (check before domains to prevent double-matching)
|
||||
url_pattern = r'https?://[^\s]+'
|
||||
for match in re.findall(url_pattern, self.content):
|
||||
if match not in seen:
|
||||
seen.add(match)
|
||||
self.iocs.append(match)
|
||||
|
||||
# Domain names (basic pattern)
|
||||
domain_pattern = r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b'
|
||||
for match in re.findall(domain_pattern, self.content):
|
||||
@@ -56,16 +63,9 @@ class Note:
|
||||
seen.add(match)
|
||||
self.iocs.append(match)
|
||||
|
||||
# URLs
|
||||
url_pattern = r'https?://[^\s]+'
|
||||
for match in re.findall(url_pattern, self.content):
|
||||
if match not in seen:
|
||||
seen.add(match)
|
||||
self.iocs.append(match)
|
||||
|
||||
# MD5 hashes (32 hex chars)
|
||||
md5_pattern = r'\b[a-fA-F0-9]{32}\b'
|
||||
for match in re.findall(md5_pattern, self.content):
|
||||
# SHA256 hashes (64 hex chars) - check longest first
|
||||
sha256_pattern = r'\b[a-fA-F0-9]{64}\b'
|
||||
for match in re.findall(sha256_pattern, self.content):
|
||||
if match not in seen:
|
||||
seen.add(match)
|
||||
self.iocs.append(match)
|
||||
@@ -77,9 +77,9 @@ class Note:
|
||||
seen.add(match)
|
||||
self.iocs.append(match)
|
||||
|
||||
# SHA256 hashes (64 hex chars)
|
||||
sha256_pattern = r'\b[a-fA-F0-9]{64}\b'
|
||||
for match in re.findall(sha256_pattern, self.content):
|
||||
# MD5 hashes (32 hex chars)
|
||||
md5_pattern = r'\b[a-fA-F0-9]{32}\b'
|
||||
for match in re.findall(md5_pattern, self.content):
|
||||
if match not in seen:
|
||||
seen.add(match)
|
||||
self.iocs.append(match)
|
||||
@@ -103,14 +103,14 @@ class Note:
|
||||
seen = set()
|
||||
|
||||
# IPv4 addresses
|
||||
ipv4_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
|
||||
ipv4_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
|
||||
for match in re.findall(ipv4_pattern, text):
|
||||
if match not in seen:
|
||||
seen.add(match)
|
||||
iocs.append((match, 'ipv4'))
|
||||
|
||||
# IPv6 addresses (simplified)
|
||||
ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'
|
||||
# IPv6 addresses (supports compressed format)
|
||||
ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b'
|
||||
for match in re.findall(ipv6_pattern, text):
|
||||
if match not in seen:
|
||||
seen.add(match)
|
||||
@@ -166,39 +166,56 @@ class Note:
|
||||
"""Extract IOCs with their positions for highlighting. Returns list of (text, start, end, type) tuples"""
|
||||
import re
|
||||
highlights = []
|
||||
covered_ranges = set()
|
||||
|
||||
def overlaps(start, end):
|
||||
"""Check if range overlaps with any covered range"""
|
||||
for covered_start, covered_end in covered_ranges:
|
||||
if not (end <= covered_start or start >= covered_end):
|
||||
return True
|
||||
return False
|
||||
|
||||
def add_highlight(match, ioc_type):
|
||||
"""Add highlight if it doesn't overlap with existing ones"""
|
||||
start, end = match.start(), match.end()
|
||||
if not overlaps(start, end):
|
||||
highlights.append((match.group(), start, end, ioc_type))
|
||||
covered_ranges.add((start, end))
|
||||
|
||||
# IPv4 addresses
|
||||
for match in re.finditer(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', text):
|
||||
highlights.append((match.group(), match.start(), match.end(), 'ipv4'))
|
||||
ipv4_pattern = r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b'
|
||||
for match in re.finditer(ipv4_pattern, text):
|
||||
add_highlight(match, 'ipv4')
|
||||
|
||||
# IPv6 addresses
|
||||
for match in re.finditer(r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b', text):
|
||||
highlights.append((match.group(), match.start(), match.end(), 'ipv6'))
|
||||
# IPv6 addresses (supports compressed format)
|
||||
ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|\b(?:[0-9a-fA-F]{1,4}:)*::(?:[0-9a-fA-F]{1,4}:)*[0-9a-fA-F]{0,4}\b'
|
||||
for match in re.finditer(ipv6_pattern, text):
|
||||
add_highlight(match, 'ipv6')
|
||||
|
||||
# URLs (check before domains)
|
||||
# URLs (check before domains to prevent double-matching)
|
||||
for match in re.finditer(r'https?://[^\s]+', text):
|
||||
highlights.append((match.group(), match.start(), match.end(), 'url'))
|
||||
add_highlight(match, 'url')
|
||||
|
||||
# Domain names
|
||||
for match in re.finditer(r'\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,}\b', text):
|
||||
if not match.group().startswith('example.'):
|
||||
highlights.append((match.group(), match.start(), match.end(), 'domain'))
|
||||
add_highlight(match, 'domain')
|
||||
|
||||
# SHA256 hashes
|
||||
# SHA256 hashes (64 hex chars) - check longest first
|
||||
for match in re.finditer(r'\b[a-fA-F0-9]{64}\b', text):
|
||||
highlights.append((match.group(), match.start(), match.end(), 'sha256'))
|
||||
add_highlight(match, 'sha256')
|
||||
|
||||
# SHA1 hashes
|
||||
# SHA1 hashes (40 hex chars)
|
||||
for match in re.finditer(r'\b[a-fA-F0-9]{40}\b', text):
|
||||
highlights.append((match.group(), match.start(), match.end(), 'sha1'))
|
||||
add_highlight(match, 'sha1')
|
||||
|
||||
# MD5 hashes
|
||||
# MD5 hashes (32 hex chars)
|
||||
for match in re.finditer(r'\b[a-fA-F0-9]{32}\b', text):
|
||||
highlights.append((match.group(), match.start(), match.end(), 'md5'))
|
||||
add_highlight(match, 'md5')
|
||||
|
||||
# Email addresses
|
||||
for match in re.finditer(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', text):
|
||||
highlights.append((match.group(), match.start(), match.end(), 'email'))
|
||||
add_highlight(match, 'email')
|
||||
|
||||
return highlights
|
||||
|
||||
|
||||
@@ -43,9 +43,6 @@ Key objectives:
|
||||
case_note1.extract_iocs()
|
||||
demo_case.notes.append(case_note1)
|
||||
|
||||
# Wait a moment for different timestamp
|
||||
time.sleep(0.1)
|
||||
|
||||
case_note2 = Note(content="""Investigation lead: Employee reported suspicious email from sender@phishing-domain.com
|
||||
Initial analysis shows potential credential harvesting attempt.
|
||||
Review email headers and attachments for IOCs. #phishing #email-analysis""")
|
||||
@@ -54,8 +51,6 @@ Review email headers and attachments for IOCs. #phishing #email-analysis""")
|
||||
case_note2.extract_iocs()
|
||||
demo_case.notes.append(case_note2)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
# Create evidence 1: Compromised laptop
|
||||
evidence1 = Evidence(
|
||||
name="Employee Laptop HDD",
|
||||
@@ -74,8 +69,6 @@ Chain of custody maintained throughout process. #forensics #imaging #chain-of-cu
|
||||
note1.extract_iocs()
|
||||
evidence1.notes.append(note1)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
note2 = Note(content="""Discovered suspicious connections to external IP addresses:
|
||||
- 192.168.1.100 (local gateway)
|
||||
- 203.0.113.45 (external, geolocation: Unknown)
|
||||
@@ -88,8 +81,6 @@ Browser history shows visits to malicious-site.com and data-exfil.net.
|
||||
note2.extract_iocs()
|
||||
evidence1.notes.append(note2)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
note3 = Note(content="""Malware identified in temp directory:
|
||||
File: evil.exe
|
||||
MD5: d41d8cd98f00b204e9800998ecf8427e
|
||||
@@ -102,8 +93,6 @@ Submitting to VirusTotal for analysis. #malware #hash-analysis #virustotal""")
|
||||
note3.extract_iocs()
|
||||
evidence1.notes.append(note3)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
note4 = Note(content="""Timeline analysis reveals:
|
||||
- 2024-01-15 09:23:45 - Suspicious email received
|
||||
- 2024-01-15 09:24:12 - User clicked phishing link https://evil-domain.com/login
|
||||
@@ -118,8 +107,6 @@ User credentials compromised. Recommend immediate password reset. #timeline #lat
|
||||
|
||||
demo_case.evidence.append(evidence1)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
# Create evidence 2: Network logs
|
||||
evidence2 = Evidence(
|
||||
name="Firewall Logs",
|
||||
@@ -139,8 +126,6 @@ Total data transferred: approximately 2.3 GB over 4 hours.
|
||||
note5.extract_iocs()
|
||||
evidence2.notes.append(note5)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
note6 = Note(content="""Contact information found in malware configuration:
|
||||
Email: attacker@malicious-domain.com
|
||||
Backup C2: 2001:0db8:85a3:0000:0000:8a2e:0370:7334 (IPv6)
|
||||
@@ -153,8 +138,6 @@ Cross-referencing with threat intelligence databases. #threat-intel #attribution
|
||||
|
||||
demo_case.evidence.append(evidence2)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
# Create evidence 3: Email forensics
|
||||
evidence3 = Evidence(
|
||||
name="Phishing Email",
|
||||
@@ -240,8 +223,11 @@ class StateManager:
|
||||
state = self.get_active()
|
||||
state["case_id"] = case_id
|
||||
state["evidence_id"] = evidence_id
|
||||
with open(self.state_file, 'w') as f:
|
||||
# Atomic write: write to temp file then rename
|
||||
temp_file = self.state_file.with_suffix(".tmp")
|
||||
with open(temp_file, 'w') as f:
|
||||
json.dump(state, f)
|
||||
temp_file.replace(self.state_file)
|
||||
|
||||
def get_active(self) -> dict:
|
||||
if not self.state_file.exists():
|
||||
@@ -250,7 +236,7 @@ class StateManager:
|
||||
with open(self.state_file, 'r') as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, IOError):
|
||||
return {"case_id": None, "evidence_id": None}
|
||||
return {"case_id": None, "evidence_id": None}
|
||||
|
||||
def get_settings(self) -> dict:
|
||||
if not self.settings_file.exists():
|
||||
@@ -264,5 +250,8 @@ class StateManager:
|
||||
def set_setting(self, key: str, value):
|
||||
settings = self.get_settings()
|
||||
settings[key] = value
|
||||
with open(self.settings_file, 'w') as f:
|
||||
# Atomic write: write to temp file then rename
|
||||
temp_file = self.settings_file.with_suffix(".tmp")
|
||||
with open(temp_file, 'w') as f:
|
||||
json.dump(settings, f)
|
||||
temp_file.replace(self.settings_file)
|
||||
|
||||
11
trace/tui.py
11
trace/tui.py
@@ -144,17 +144,18 @@ class TUI:
|
||||
def _classify_ioc(self, ioc):
|
||||
"""Classify IOC type based on pattern"""
|
||||
import re
|
||||
if re.match(r'^[a-fA-F0-9]{32}$', ioc):
|
||||
return 'MD5'
|
||||
# Check longest hashes first to avoid misclassification
|
||||
if re.match(r'^[a-fA-F0-9]{64}$', ioc):
|
||||
return 'SHA256'
|
||||
elif re.match(r'^[a-fA-F0-9]{40}$', ioc):
|
||||
return 'SHA1'
|
||||
elif re.match(r'^[a-fA-F0-9]{64}$', ioc):
|
||||
return 'SHA256'
|
||||
elif re.match(r'^[a-fA-F0-9]{32}$', ioc):
|
||||
return 'MD5'
|
||||
elif re.match(r'^https?://', ioc):
|
||||
return 'URL'
|
||||
elif '@' in ioc:
|
||||
return 'EMAIL'
|
||||
elif re.match(r'^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$', ioc):
|
||||
elif re.match(r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$', ioc):
|
||||
return 'IPv4'
|
||||
elif ':' in ioc and any(c in '0123456789abcdefABCDEF' for c in ioc):
|
||||
return 'IPv6'
|
||||
|
||||
Reference in New Issue
Block a user