new database scheme

This commit is contained in:
overcuriousity 2025-10-11 00:19:12 +02:00
parent 252dbbdcce
commit 690e2a261b
8 changed files with 1506 additions and 354 deletions

View File

@ -0,0 +1,6 @@
"""Core business logic modules for ForensicTrails."""
from .case_manager import CaseManager
from .note_manager import NoteManager
__all__ = ['CaseManager', 'NoteManager']

View File

@ -1,69 +1,208 @@
"""Case management for forensic investigations.
This module handles all operations related to investigation cases,
including creation, retrieval, updates, and lifecycle management.
"""
from ..utils.config import config from ..utils.config import config
from ..db.database import get_db_connection from ..db.database import get_db_connection
import logging import logging
from datetime import datetime, timezone from datetime import datetime, timezone
class CaseManager():
class CaseManager:
"""Manager for forensic investigation cases."""
def __init__(self, db_path=None): def __init__(self, db_path=None):
"""Initialize the CaseManager.
Args:
db_path: Optional path to database file. Uses config default if None.
"""
if db_path is None: if db_path is None:
db_path = config.database_path db_path = config.database_path
self.db_path = db_path self.db_path = db_path
self.conn = get_db_connection(self.db_path) self.conn = get_db_connection(self.db_path)
self.cursor = self.conn.cursor() self.cursor = self.conn.cursor()
logging.debug(f"Connected to database at {self.db_path}") logging.debug(f"CaseManager connected to database at {self.db_path}")
def create_case(self, case_id, case_title, investigator, classification=None, summary=None): def create_case(self, description, investigator_name, investigator_role=None, status='Open'):
"""Create a new case.
Args:
description: Case description text
investigator_name: Name of the investigator
investigator_role: Optional role of the investigator
status: Case status ('Open', 'Closed', 'Archived'), default 'Open'
Returns:
case_id: The ID of the newly created case
"""
with self.conn: with self.conn:
# Insert case
self.cursor.execute(""" self.cursor.execute("""
INSERT INTO cases (case_id, case_title, investigator, classification, summary, status) INSERT INTO "case" (description, status)
VALUES (?, ?, ?, ?, ?, 'active') VALUES (?, ?)
""", (case_id, case_title, investigator, classification, summary)) """, (description, status))
case_id = self.cursor.lastrowid
logging.info(f"Created new case with ID: {case_id}") logging.info(f"Created new case with ID: {case_id}")
# Handle investigator - find or create
investigator_id = self._get_or_create_investigator(investigator_name, investigator_role)
# Link investigator to case
self.cursor.execute("""
INSERT INTO investigator_case (investigator_id, case_id)
VALUES (?, ?)
""", (investigator_id, case_id))
return case_id
def _get_or_create_investigator(self, name, role=None):
"""Get or create an investigator by name.
Args:
name: Investigator name
role: Optional investigator role
Returns:
Integer investigator ID
"""
self.cursor.execute("SELECT investigator_id FROM investigator WHERE name = ?", (name,))
result = self.cursor.fetchone()
if result:
return result['investigator_id']
else:
self.cursor.execute("""
INSERT INTO investigator (name, role)
VALUES (?, ?)
""", (name, role))
return self.cursor.lastrowid
def get_case(self, case_id): def get_case(self, case_id):
"""Get a case by ID.
Args:
case_id: Integer case ID
Returns:
dict with case data including investigators, or None if not found
"""
with self.conn: with self.conn:
self.cursor.execute("SELECT * FROM cases WHERE case_id = ?", (case_id,)) self.cursor.execute("""
SELECT c.case_id, c.description, c.status, c.created, c.modified, c.closed,
GROUP_CONCAT(i.name, ', ') as investigators
FROM "case" c
LEFT JOIN investigator_case ic ON c.case_id = ic.case_id
LEFT JOIN investigator i ON ic.investigator_id = i.investigator_id
WHERE c.case_id = ?
GROUP BY c.case_id
""", (case_id,))
case = self.cursor.fetchone() case = self.cursor.fetchone()
if case: if case:
return dict(case) return {
'case_id': case['case_id'],
'description': case['description'],
'investigators': case['investigators'] or '',
'status': case['status'],
'created': case['created'],
'modified': case['modified'],
'closed': case['closed']
}
else: else:
logging.warning(f"No case found with ID: {case_id}") logging.warning(f"No case found with ID: {case_id}")
return None return None
def list_cases(self, status=None, search_term=None): def list_cases(self, status=None, search_term=None):
"""List cases with optional filtering.
Args:
status: Optional status filter ('Open', 'Closed', 'Archived')
search_term: Optional search term for description or investigators
Returns:
list of case dictionaries
"""
with self.conn: with self.conn:
query = "SELECT * FROM cases WHERE 1=1" query = """
SELECT c.case_id, c.description, c.status, c.created, c.modified, c.closed,
GROUP_CONCAT(i.name, ', ') as investigators
FROM "case" c
LEFT JOIN investigator_case ic ON c.case_id = ic.case_id
LEFT JOIN investigator i ON ic.investigator_id = i.investigator_id
WHERE 1=1
"""
params = [] params = []
conditions = []
if status: if status:
query += " AND status = ?" query += " AND c.status = ?"
params.append(status) params.append(status)
if search_term:
query += " AND (case_id LIKE ? OR case_title LIKE ? OR investigator LIKE ?)"
like_term = f"%{search_term}%"
params.extend([like_term, like_term, like_term])
query += " ORDER BY created_at DESC" query += " GROUP BY c.case_id"
if search_term:
query += " HAVING c.description LIKE ? OR investigators LIKE ?"
like_term = f"%{search_term}%"
params.extend([like_term, like_term])
query += " ORDER BY c.created DESC"
self.cursor.execute(query, params) self.cursor.execute(query, params)
return [dict(row) for row in self.cursor.fetchall()] cases = []
for row in self.cursor.fetchall():
case_dict = {
'case_id': row['case_id'],
'description': row['description'],
'investigators': row['investigators'] or '',
'status': row['status'],
'created': row['created'],
'modified': row['modified'],
'closed': row['closed']
}
cases.append(case_dict)
def update_case(self, case_id, **kwargs): return cases
allowed_fields = ['case_title', 'investigator', 'classification', 'summary', 'status']
updates = {k: v for k, v in kwargs.items() if k in allowed_fields} def update_case(self, case_id, description=None, status=None):
"""Update a case.
Args:
case_id: Integer case ID
description: Optional new description
status: Optional new status ('Open', 'Closed', 'Archived')
Returns:
bool indicating success
"""
updates = {}
if description is not None:
updates['description'] = description
if status is not None:
updates['status'] = status
if not updates: if not updates:
logging.warning("No valid fields provided for update.") logging.warning("No valid fields provided for update.")
return return False
updates['modified_at'] = datetime.now(timezone.utc).isoformat() # Always update modified timestamp
set_clause = ", ".join([f"{k} = ?" for k in updates.keys()]) set_parts = []
values = list(updates.values()) + [case_id] values = []
for k, v in updates.items():
set_parts.append(f"{k} = ?")
values.append(v)
set_parts.append("modified = CURRENT_TIMESTAMP")
set_clause = ", ".join(set_parts)
values.append(case_id)
with self.conn: with self.conn:
self.cursor.execute(f""" self.cursor.execute(f"""
UPDATE cases UPDATE "case"
SET {set_clause} SET {set_clause}
WHERE case_id = ? WHERE case_id = ?
""", values) """, values)
@ -71,69 +210,81 @@ class CaseManager():
return self.cursor.rowcount > 0 return self.cursor.rowcount > 0
def close_case(self, case_id): def close_case(self, case_id):
return self.update_case(case_id, status='closed') """Close a case and set the closed timestamp.
Args:
case_id: Integer case ID
Returns:
bool indicating success
"""
with self.conn:
self.cursor.execute("""
UPDATE "case"
SET status = 'Closed', closed = CURRENT_TIMESTAMP, modified = CURRENT_TIMESTAMP
WHERE case_id = ?
""", (case_id,))
logging.info(f"Closed case with ID: {case_id}")
return self.cursor.rowcount > 0
def delete_case(self, case_id): def delete_case(self, case_id):
"""Delete a case.
Args:
case_id: Integer case ID
Returns:
bool indicating success
"""
with self.conn: with self.conn:
self.cursor.execute("DELETE FROM cases WHERE case_id = ?", (case_id,)) self.cursor.execute("DELETE FROM \"case\" WHERE case_id = ?", (case_id,))
logging.info(f"Deleted case with ID: {case_id}") logging.info(f"Deleted case with ID: {case_id}")
return self.cursor.rowcount > 0 return self.cursor.rowcount > 0
def archive_case(self, case_id): def archive_case(self, case_id):
return self.update_case(case_id, status='archived') """Archive a case.
Args:
case_id: Integer case ID
Returns:
bool indicating success
"""
return self.update_case(case_id, status='Archived')
def export_case_db(self, case_id, export_path): def export_case_db(self, case_id, export_path):
"""Export a case and all related data to a separate SQLite file.
Args:
case_id: Integer case ID
export_path: Path where to save the exported database
Returns:
bool indicating success
TODO: Implement export functionality
"""
# TODO: Implement export functionality # TODO: Implement export functionality
# should export a .sqlite file with only the data related to the specified case_id # should export a .sqlite file with only the data related to the specified case_id
pass raise NotImplementedError("Case export functionality not yet implemented")
def import_case_db(self, import_path): def import_case_db(self, import_path):
"""Import a case database and merge into the main database.
Args:
import_path: Path to the database file to import
Returns:
Integer case ID of the imported case
TODO: Implement import functionality
"""
# TODO: Implement import functionality # TODO: Implement import functionality
# should import a .sqlite file and merge its data into the main database # should import a .sqlite file and merge its data into the main database
pass raise NotImplementedError("Case import functionality not yet implemented")
def create_note(self, case_id, content, investigator, question_tags=None):
"""Create a new note for a case."""
import hashlib
import uuid
import json
note_id = str(uuid.uuid4())
timestamp = datetime.now(timezone.utc).isoformat()
# Create hash of content + timestamp for integrity
hash_content = f"{content}{timestamp}".encode('utf-8')
content_hash = hashlib.sha256(hash_content).hexdigest()
# Convert tags to JSON
tags_json = json.dumps(question_tags) if question_tags else None
with self.conn:
self.cursor.execute("""
INSERT INTO notes (note_id, case_id, timestamp, content, investigator, question_tags, hash)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (note_id, case_id, timestamp, content, investigator, tags_json, content_hash))
logging.info(f"Created note {note_id} for case {case_id}")
return note_id
def get_notes(self, case_id):
"""Get all notes for a case, ordered by timestamp."""
with self.conn:
self.cursor.execute("""
SELECT * FROM notes
WHERE case_id = ?
ORDER BY timestamp DESC
""", (case_id,))
return [dict(row) for row in self.cursor.fetchall()]
def get_note(self, note_id):
"""Get a specific note by ID."""
with self.conn:
self.cursor.execute("SELECT * FROM notes WHERE note_id = ?", (note_id,))
note = self.cursor.fetchone()
if note:
return dict(note)
else:
logging.warning(f"No note found with ID: {note_id}")
return None
def close(self):
"""Close the database connection."""
if self.conn:
self.conn.close()
logging.debug("CaseManager database connection closed")

View File

@ -0,0 +1,414 @@
"""Note management for forensic investigation cases.
This module handles all operations related to investigation notes,
including creation, retrieval, updates, and revision history.
Notes are immutable by design - updates create new revisions.
"""
from ..utils.config import config
from ..db.database import get_db_connection
import logging
class NoteManager:
"""Manager for investigation notes with immutable revision history."""
def __init__(self, db_path=None):
"""Initialize the NoteManager.
Args:
db_path: Optional path to database file. Uses config default if None.
"""
if db_path is None:
db_path = config.database_path
self.db_path = db_path
self.conn = get_db_connection(self.db_path)
self.cursor = self.conn.cursor()
logging.debug(f"NoteManager connected to database at {self.db_path}")
def _get_or_create_investigator(self, name, role=None):
"""Get or create an investigator by name.
Args:
name: Investigator name
role: Optional investigator role
Returns:
Integer investigator ID
"""
self.cursor.execute("SELECT investigator_id FROM investigator WHERE name = ?", (name,))
result = self.cursor.fetchone()
if result:
return result['investigator_id']
else:
self.cursor.execute("""
INSERT INTO investigator (name, role)
VALUES (?, ?)
""", (name, role))
return self.cursor.lastrowid
def _get_or_create_question(self, case_id, question_text):
"""Get or create a question definition for a case.
Args:
case_id: Integer case ID
question_text: Question text (e.g., "WHO", "WHAT", etc.)
Returns:
Integer question ID
"""
# Check if this question already exists for this case
self.cursor.execute("""
SELECT question_id FROM question_definition
WHERE case_id = ? AND question_text = ?
""", (case_id, question_text))
result = self.cursor.fetchone()
if result:
return result['question_id']
# Create new question
# Determine display order (last + 1)
self.cursor.execute("""
SELECT MAX(display_order) as max_order
FROM question_definition
WHERE case_id = ?
""", (case_id,))
max_order = self.cursor.fetchone()['max_order']
next_order = (max_order or 0) + 1
self.cursor.execute("""
INSERT INTO question_definition (case_id, question_text, display_order)
VALUES (?, ?, ?)
""", (case_id, question_text, next_order))
return self.cursor.lastrowid
def create_note(self, case_id, content, investigator, question_tags=None):
"""Create a new note for a case.
Args:
case_id: Integer case ID
content: Note content text
investigator: Investigator name
question_tags: Optional list of question tags (e.g., ["WHO", "WHAT"])
Returns:
Integer note ID or None on error
"""
# Get or create investigator
investigator_id = self._get_or_create_investigator(investigator)
with self.conn:
# Create the note container
self.cursor.execute("""
INSERT INTO note (case_id, investigator_id)
VALUES (?, ?)
""", (case_id, investigator_id))
note_id = self.cursor.lastrowid
# Create the first revision
self.cursor.execute("""
INSERT INTO note_revision (note_id, content, revision_number)
VALUES (?, ?, 1)
""", (note_id, content))
# Handle question tags if provided
if question_tags:
for tag in question_tags:
# Find or create question definition for this case
question_id = self._get_or_create_question(case_id, tag)
# Tag the note with the question
self.cursor.execute("""
INSERT INTO note_question_tag (note_id, question_id)
VALUES (?, ?)
""", (note_id, question_id))
logging.info(f"Created note {note_id} for case {case_id}")
return note_id
def get_note(self, note_id):
"""Get a specific note by ID with its latest revision.
Args:
note_id: Integer note ID
Returns:
Note dictionary with keys:
- note_id: Integer note ID
- case_id: Integer case ID
- timestamp: Note creation timestamp
- content: Latest revision content
- investigator: Investigator name
- question_tags: List of question tag strings
Returns None if not found.
"""
with self.conn:
self.cursor.execute("""
SELECT
n.note_id,
n.case_id,
n.created as timestamp,
i.name as investigator,
nr.content,
nr.revision_number,
GROUP_CONCAT(qd.question_text, '|') as question_tags
FROM note n
JOIN investigator i ON n.investigator_id = i.investigator_id
JOIN note_revision nr ON n.note_id = nr.note_id
LEFT JOIN note_question_tag nqt ON n.note_id = nqt.note_id
LEFT JOIN question_definition qd ON nqt.question_id = qd.question_id
WHERE n.note_id = ?
AND nr.revision_number = (
SELECT MAX(revision_number)
FROM note_revision
WHERE note_id = n.note_id
)
GROUP BY n.note_id
""", (note_id,))
row = self.cursor.fetchone()
if row:
note_dict = {
'note_id': row['note_id'],
'case_id': row['case_id'],
'timestamp': row['timestamp'],
'content': row['content'],
'investigator': row['investigator'],
'question_tags': row['question_tags'].split('|') if row['question_tags'] else []
}
return note_dict
else:
logging.warning(f"No note found with ID: {note_id}")
return None
def get_notes(self, case_id):
"""Get all notes for a case with their latest revision.
Args:
case_id: Integer case ID
Returns:
List of note dictionaries (see get_note for dictionary structure)
"""
with self.conn:
self.cursor.execute("""
SELECT
n.note_id,
n.case_id,
n.created as timestamp,
i.name as investigator,
nr.content,
nr.revision_number,
GROUP_CONCAT(qd.question_text, '|') as question_tags
FROM note n
JOIN investigator i ON n.investigator_id = i.investigator_id
JOIN note_revision nr ON n.note_id = nr.note_id
LEFT JOIN note_question_tag nqt ON n.note_id = nqt.note_id
LEFT JOIN question_definition qd ON nqt.question_id = qd.question_id
WHERE n.case_id = ?
AND nr.revision_number = (
SELECT MAX(revision_number)
FROM note_revision
WHERE note_id = n.note_id
)
GROUP BY n.note_id
ORDER BY n.created DESC
""", (case_id,))
notes = []
for row in self.cursor.fetchall():
note_dict = {
'note_id': row['note_id'],
'case_id': row['case_id'],
'timestamp': row['timestamp'],
'content': row['content'],
'investigator': row['investigator'],
'question_tags': row['question_tags'].split('|') if row['question_tags'] else []
}
notes.append(note_dict)
return notes
def update_note(self, note_id, content):
"""Update a note by creating a new revision (immutable history).
Args:
note_id: Integer note ID
content: New content text
Returns:
bool indicating success
"""
with self.conn:
# Get current max revision number
self.cursor.execute("""
SELECT MAX(revision_number) as max_rev
FROM note_revision
WHERE note_id = ?
""", (note_id,))
result = self.cursor.fetchone()
max_rev = result['max_rev'] if result else 0
next_rev = max_rev + 1
# Insert new revision
self.cursor.execute("""
INSERT INTO note_revision (note_id, content, revision_number)
VALUES (?, ?, ?)
""", (note_id, content, next_rev))
logging.info(f"Created revision {next_rev} for note {note_id}")
return True
def get_note_history(self, note_id):
"""Get all revisions of a note (revision history).
Args:
note_id: Integer note ID
Returns:
List of revision dictionaries with keys:
- revision_id: Integer revision ID
- note_id: Integer note ID
- content: Revision content
- timestamp: Revision creation timestamp
- revision_number: Sequential revision number
Ordered by revision_number DESC (newest first).
"""
with self.conn:
self.cursor.execute("""
SELECT revision_id, note_id, content, timestamp, revision_number
FROM note_revision
WHERE note_id = ?
ORDER BY revision_number DESC
""", (note_id,))
return [dict(row) for row in self.cursor.fetchall()]
def update_note_tags(self, note_id, question_tags):
"""Update the question tags for a note.
Args:
note_id: Integer note ID
question_tags: List of question tag strings
Returns:
bool indicating success
"""
with self.conn:
# First, get the case_id for this note
self.cursor.execute("SELECT case_id FROM note WHERE note_id = ?", (note_id,))
result = self.cursor.fetchone()
if not result:
logging.warning(f"Cannot update tags: note {note_id} not found")
return False
case_id = result['case_id']
# Remove existing tags
self.cursor.execute("DELETE FROM note_question_tag WHERE note_id = ?", (note_id,))
# Add new tags
if question_tags:
for tag in question_tags:
question_id = self._get_or_create_question(case_id, tag)
self.cursor.execute("""
INSERT INTO note_question_tag (note_id, question_id)
VALUES (?, ?)
""", (note_id, question_id))
logging.info(f"Updated tags for note {note_id}")
return True
def delete_note(self, note_id):
"""Delete a note and all its revisions.
Args:
note_id: Integer note ID
Returns:
bool indicating success
"""
with self.conn:
self.cursor.execute("DELETE FROM note WHERE note_id = ?", (note_id,))
logging.info(f"Deleted note with ID: {note_id}")
return self.cursor.rowcount > 0
def search_notes(self, case_id=None, search_term=None, question_tags=None):
"""Search notes with optional filters.
Args:
case_id: Optional case ID filter
search_term: Optional text to search in note content
question_tags: Optional list of question tags to filter by
Returns:
List of note dictionaries matching the criteria
"""
with self.conn:
query = """
SELECT
n.note_id,
n.case_id,
n.created as timestamp,
i.name as investigator,
nr.content,
nr.revision_number,
GROUP_CONCAT(qd.question_text, '|') as question_tags
FROM note n
JOIN investigator i ON n.investigator_id = i.investigator_id
JOIN note_revision nr ON n.note_id = nr.note_id
LEFT JOIN note_question_tag nqt ON n.note_id = nqt.note_id
LEFT JOIN question_definition qd ON nqt.question_id = qd.question_id
WHERE nr.revision_number = (
SELECT MAX(revision_number)
FROM note_revision
WHERE note_id = n.note_id
)
"""
params = []
if case_id is not None:
query += " AND n.case_id = ?"
params.append(case_id)
query += " GROUP BY n.note_id"
if search_term:
query += " HAVING nr.content LIKE ?"
params.append(f"%{search_term}%")
query += " ORDER BY n.created DESC"
self.cursor.execute(query, params)
notes = []
for row in self.cursor.fetchall():
note_dict = {
'note_id': row['note_id'],
'case_id': row['case_id'],
'timestamp': row['timestamp'],
'content': row['content'],
'investigator': row['investigator'],
'question_tags': row['question_tags'].split('|') if row['question_tags'] else []
}
# Filter by question tags if specified
if question_tags:
if any(tag in note_dict['question_tags'] for tag in question_tags):
notes.append(note_dict)
else:
notes.append(note_dict)
return notes
def close(self):
"""Close the database connection."""
if self.conn:
self.conn.close()
logging.debug("NoteManager database connection closed")

View File

@ -6,10 +6,20 @@ import os
from ..utils.config import config from ..utils.config import config
def get_db_connection(db_path=None): def get_db_connection(db_path=None):
"""Get a connection to the SQLite database.
Args:
db_path: Optional path to database file. Uses config default if None.
Returns:
sqlite3.Connection with Row factory and foreign keys enabled
"""
if db_path is None: if db_path is None:
db_path = config.database_path db_path = config.database_path
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
# Enable foreign key constraints (not enabled by default in SQLite)
conn.execute("PRAGMA foreign_keys = ON")
return conn return conn
def validate_database_schema(db_path): def validate_database_schema(db_path):
@ -23,8 +33,9 @@ def validate_database_schema(db_path):
tables = {row[0] for row in cursor.fetchall()} tables = {row[0] for row in cursor.fetchall()}
required_tables = { required_tables = {
'cases', 'notes', 'evidence', 'chain_of_custody', 'case', 'investigator', 'note', 'note_revision', 'evidence',
'attachments', 'question_entries', 'users', 'tasks' 'question_definition', 'investigator_case', 'note_question_tag',
'chain_of_custody'
} }
# Check if all required tables exist # Check if all required tables exist

View File

@ -1,107 +1,130 @@
-- Cases table -- ForensicTrails Database Schema
CREATE TABLE cases ( -- SQLite Version
case_id TEXT PRIMARY KEY, -- Based on normalized ER Diagram with immutable note revisions
case_title TEXT NOT NULL,
date_opened TIMESTAMP DEFAULT CURRENT_TIMESTAMP, PRAGMA foreign_keys = ON;
investigator TEXT NOT NULL,
classification TEXT, -- ---
summary TEXT, -- Table: "case"
status TEXT DEFAULT 'active', -- Represents a forensic investigation case
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- ---
modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP CREATE TABLE IF NOT EXISTS "case" (
case_id INTEGER PRIMARY KEY AUTOINCREMENT,
description TEXT,
status TEXT CHECK(status IN ('Open', 'Closed', 'Archived')) NOT NULL DEFAULT 'Open',
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
closed TIMESTAMP DEFAULT NULL
); );
-- Notes table (append-only, immutable) -- ---
CREATE TABLE notes ( -- Table: investigator
note_id TEXT PRIMARY KEY, -- Represents forensic investigators
case_id TEXT NOT NULL, -- ---
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, CREATE TABLE IF NOT EXISTS investigator (
investigator_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
role TEXT DEFAULT NULL
);
-- ---
-- Table: note
-- Represents investigation notes (immutable containers)
-- ---
CREATE TABLE IF NOT EXISTS note (
note_id INTEGER PRIMARY KEY AUTOINCREMENT,
case_id INTEGER NOT NULL,
investigator_id INTEGER NOT NULL,
created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (case_id) REFERENCES "case"(case_id) ON DELETE CASCADE,
FOREIGN KEY (investigator_id) REFERENCES investigator(investigator_id)
);
CREATE INDEX IF NOT EXISTS idx_note_case ON note(case_id);
CREATE INDEX IF NOT EXISTS idx_note_created ON note(created);
-- ---
-- Table: note_revision
-- Represents versions/edits of notes (immutable history)
-- ---
CREATE TABLE IF NOT EXISTS note_revision (
revision_id INTEGER PRIMARY KEY AUTOINCREMENT,
note_id INTEGER NOT NULL,
content TEXT NOT NULL, content TEXT NOT NULL,
investigator TEXT NOT NULL, timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
question_tags TEXT, -- JSON array: ["WHO", "WHAT", etc.] revision_number INTEGER NOT NULL,
hash TEXT NOT NULL, -- SHA256 of content + timestamp FOREIGN KEY (note_id) REFERENCES note(note_id) ON DELETE CASCADE
FOREIGN KEY (case_id) REFERENCES cases(case_id)
); );
-- Evidence table CREATE INDEX IF NOT EXISTS idx_revision_note ON note_revision(note_id);
CREATE TABLE evidence (
evidence_id TEXT PRIMARY KEY, -- ---
case_id TEXT, -- Table: evidence
description TEXT NOT NULL, -- Represents physical or digital evidence items
filename TEXT, -- ---
file_size INTEGER, CREATE TABLE IF NOT EXISTS evidence (
md5_hash TEXT, evidence_id INTEGER PRIMARY KEY AUTOINCREMENT,
sha256_hash TEXT, case_id INTEGER NOT NULL,
source_origin TEXT, description TEXT,
received_date DATE, sha256_hash TEXT DEFAULT NULL,
received_by TEXT, FOREIGN KEY (case_id) REFERENCES "case"(case_id) ON DELETE CASCADE
physical_location TEXT,
notes TEXT,
status TEXT DEFAULT 'Active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (case_id) REFERENCES cases(case_id)
); );
-- Chain of Custody table CREATE INDEX IF NOT EXISTS idx_evidence_case ON evidence(case_id);
CREATE TABLE chain_of_custody (
coc_id TEXT PRIMARY KEY, -- ---
evidence_id TEXT NOT NULL, -- Table: question_definition
-- Represents configurable investigation questions per case
-- ---
CREATE TABLE IF NOT EXISTS question_definition (
question_id INTEGER PRIMARY KEY AUTOINCREMENT,
case_id INTEGER NOT NULL,
question_text TEXT NOT NULL,
display_order INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY (case_id) REFERENCES "case"(case_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_question_case ON question_definition(case_id);
-- ---
-- Junction Table: investigator_case
-- Many-to-many: Investigators can work on multiple cases
-- ---
CREATE TABLE IF NOT EXISTS investigator_case (
investigator_id INTEGER NOT NULL,
case_id INTEGER NOT NULL,
PRIMARY KEY (investigator_id, case_id),
FOREIGN KEY (investigator_id) REFERENCES investigator(investigator_id) ON DELETE CASCADE,
FOREIGN KEY (case_id) REFERENCES "case"(case_id) ON DELETE CASCADE
);
-- ---
-- Junction Table: note_question_tag
-- Many-to-many: Notes can answer multiple questions
-- ---
CREATE TABLE IF NOT EXISTS note_question_tag (
note_id INTEGER NOT NULL,
question_id INTEGER NOT NULL,
tagged_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (note_id, question_id),
FOREIGN KEY (note_id) REFERENCES note(note_id) ON DELETE CASCADE,
FOREIGN KEY (question_id) REFERENCES question_definition(question_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_note_question_question ON note_question_tag(question_id);
-- ---
-- Chain of Custody table (kept for compatibility)
-- ---
CREATE TABLE IF NOT EXISTS chain_of_custody (
coc_id INTEGER PRIMARY KEY AUTOINCREMENT,
evidence_id INTEGER NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
action TEXT NOT NULL, -- 'received', 'transferred', 'accessed', 'archived' action TEXT NOT NULL,
from_person TEXT, from_person TEXT,
to_person TEXT, to_person TEXT,
location TEXT, location TEXT,
purpose TEXT, purpose TEXT,
signature_hash TEXT, -- Digital signature if needed signature_hash TEXT,
FOREIGN KEY (evidence_id) REFERENCES evidence(evidence_id) FOREIGN KEY (evidence_id) REFERENCES evidence(evidence_id) ON DELETE CASCADE
);
-- Attachments table (screenshots, documents)
CREATE TABLE attachments (
attachment_id TEXT PRIMARY KEY,
case_id TEXT NOT NULL,
note_id TEXT, -- Optional link to specific note
filename TEXT NOT NULL,
file_path TEXT NOT NULL,
file_hash TEXT NOT NULL,
mime_type TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (case_id) REFERENCES cases(case_id),
FOREIGN KEY (note_id) REFERENCES notes(note_id)
);
-- Investigation Questions tracking
CREATE TABLE question_entries (
entry_id TEXT PRIMARY KEY,
case_id TEXT NOT NULL,
note_id TEXT NOT NULL,
question_type TEXT NOT NULL, -- WHO/WHAT/WHEN/WHERE/HOW/WHY/WITH_WHAT
entry_text TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (case_id) REFERENCES cases(case_id),
FOREIGN KEY (note_id) REFERENCES notes(note_id)
);
-- User settings (for multi-user)
CREATE TABLE users (
user_id TEXT PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
full_name TEXT NOT NULL,
role TEXT DEFAULT 'Investigator', -- Investigator/Manager/Admin
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Optional: Task assignments (team mode)
CREATE TABLE tasks (
task_id TEXT PRIMARY KEY,
case_id TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT,
assigned_to TEXT,
assigned_by TEXT,
priority TEXT,
due_date DATE,
status TEXT DEFAULT 'Open',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (case_id) REFERENCES cases(case_id)
); );

View File

@ -6,6 +6,7 @@ import sqlite3
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
from datetime import datetime from datetime import datetime
import time
# Add the src directory to the path # Add the src directory to the path
import sys import sys
@ -47,43 +48,53 @@ class TestCaseManager(unittest.TestCase):
def test_create_case(self): def test_create_case(self):
"""Test creating a new case.""" """Test creating a new case."""
self.case_manager.create_case( case_id = self.case_manager.create_case(
case_id='CASE-001', description='Test homicide investigation',
case_title='Test Case', investigator_name='Detective Smith',
investigator='Detective Smith', investigator_role='Lead Detective'
classification='Homicide',
summary='Test summary'
) )
self.assertIsNotNone(case_id)
self.assertIsInstance(case_id, int)
# Verify case was created # Verify case was created
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertIsNotNone(case) self.assertIsNotNone(case)
self.assertEqual(case['case_id'], 'CASE-001') assert case is not None # Type narrowing for Pylance
self.assertEqual(case['case_title'], 'Test Case') self.assertEqual(case['case_id'], case_id)
self.assertEqual(case['investigator'], 'Detective Smith') self.assertEqual(case['description'], 'Test homicide investigation')
self.assertEqual(case['classification'], 'Homicide') self.assertEqual(case['investigators'], 'Detective Smith')
self.assertEqual(case['summary'], 'Test summary') self.assertEqual(case['status'], 'Open')
self.assertEqual(case['status'], 'active')
def test_create_case_minimal_fields(self): def test_create_case_minimal(self):
"""Test creating a case with only required fields.""" """Test creating a case with minimal fields."""
self.case_manager.create_case( case_id = self.case_manager.create_case(
case_id='CASE-002', description='Minimal case',
case_title='Minimal Case', investigator_name='Detective Jones'
investigator='Detective Jones'
) )
case = self.case_manager.get_case('CASE-002') case = self.case_manager.get_case(case_id)
self.assertIsNotNone(case) self.assertIsNotNone(case)
self.assertEqual(case['case_id'], 'CASE-002') assert case is not None # Type narrowing for Pylance
self.assertEqual(case['case_title'], 'Minimal Case') self.assertEqual(case['description'], 'Minimal case')
self.assertEqual(case['investigator'], 'Detective Jones') self.assertEqual(case['investigators'], 'Detective Jones')
self.assertIsNone(case['classification']) self.assertEqual(case['status'], 'Open')
self.assertIsNone(case['summary'])
def test_create_case_with_status(self):
"""Test creating a case with specific status."""
case_id = self.case_manager.create_case(
description='Archived case',
investigator_name='Detective Brown',
status='Archived'
)
case = self.case_manager.get_case(case_id)
assert case is not None # Type narrowing for Pylance
self.assertEqual(case['status'], 'Archived')
def test_get_case_nonexistent(self): def test_get_case_nonexistent(self):
"""Test getting a non-existent case returns None.""" """Test getting a non-existent case returns None."""
case = self.case_manager.get_case('NONEXISTENT') case = self.case_manager.get_case(99999)
self.assertIsNone(case) self.assertIsNone(case)
def test_list_cases_empty(self): def test_list_cases_empty(self):
@ -94,184 +105,178 @@ class TestCaseManager(unittest.TestCase):
def test_list_cases_multiple(self): def test_list_cases_multiple(self):
"""Test listing multiple cases.""" """Test listing multiple cases."""
# Create multiple cases # Create multiple cases
self.case_manager.create_case('CASE-001', 'Case One', 'Detective A') case_id1 = self.case_manager.create_case('Case One', 'Detective A')
self.case_manager.create_case('CASE-002', 'Case Two', 'Detective B') case_id2 = self.case_manager.create_case('Case Two', 'Detective B')
self.case_manager.create_case('CASE-003', 'Case Three', 'Detective C') case_id3 = self.case_manager.create_case('Case Three', 'Detective C')
cases = self.case_manager.list_cases() cases = self.case_manager.list_cases()
self.assertEqual(len(cases), 3) self.assertEqual(len(cases), 3)
case_ids = [c['case_id'] for c in cases] case_ids = [c['case_id'] for c in cases]
self.assertIn('CASE-001', case_ids) self.assertIn(case_id1, case_ids)
self.assertIn('CASE-002', case_ids) self.assertIn(case_id2, case_ids)
self.assertIn('CASE-003', case_ids) self.assertIn(case_id3, case_ids)
def test_list_cases_filter_by_status(self): def test_list_cases_filter_by_status(self):
"""Test listing cases filtered by status.""" """Test listing cases filtered by status."""
self.case_manager.create_case('CASE-001', 'Active Case', 'Detective A') case_id1 = self.case_manager.create_case('Active Case', 'Detective A')
self.case_manager.create_case('CASE-002', 'Another Active', 'Detective B') case_id2 = self.case_manager.create_case('Another Active', 'Detective B')
# Close one case # Close one case
self.case_manager.close_case('CASE-001') self.case_manager.close_case(case_id1)
# List only active cases # List only Open cases
active_cases = self.case_manager.list_cases(status='active') open_cases = self.case_manager.list_cases(status='Open')
self.assertEqual(len(active_cases), 1) self.assertEqual(len(open_cases), 1)
self.assertEqual(active_cases[0]['case_id'], 'CASE-002') self.assertEqual(open_cases[0]['case_id'], case_id2)
# List only closed cases # List only Closed cases
closed_cases = self.case_manager.list_cases(status='closed') closed_cases = self.case_manager.list_cases(status='Closed')
self.assertEqual(len(closed_cases), 1) self.assertEqual(len(closed_cases), 1)
self.assertEqual(closed_cases[0]['case_id'], 'CASE-001') self.assertEqual(closed_cases[0]['case_id'], case_id1)
def test_list_cases_search_term(self): def test_list_cases_search_term(self):
"""Test listing cases with search term.""" """Test listing cases with search term."""
self.case_manager.create_case('CASE-001', 'Murder Investigation', 'Detective Smith') self.case_manager.create_case('Murder investigation', 'Detective Smith')
self.case_manager.create_case('CASE-002', 'Theft Case', 'Detective Jones') self.case_manager.create_case('Theft case', 'Detective Jones')
self.case_manager.create_case('CASE-003', 'Murder Trial', 'Detective Brown') self.case_manager.create_case('Murder trial', 'Detective Brown')
# Search by title # Search by description
results = self.case_manager.list_cases(search_term='Murder') results = self.case_manager.list_cases(search_term='Murder')
self.assertEqual(len(results), 2) self.assertEqual(len(results), 2)
# Search by investigator # Search by investigator
results = self.case_manager.list_cases(search_term='Smith') results = self.case_manager.list_cases(search_term='Smith')
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0]['case_id'], 'CASE-001')
# Search by case ID
results = self.case_manager.list_cases(search_term='002')
self.assertEqual(len(results), 1)
self.assertEqual(results[0]['case_id'], 'CASE-002')
def test_list_cases_combined_filters(self): def test_list_cases_combined_filters(self):
"""Test listing cases with combined status and search filters.""" """Test listing cases with combined status and search filters."""
self.case_manager.create_case('CASE-001', 'Active Murder', 'Detective A') case_id1 = self.case_manager.create_case('Active Murder', 'Detective A')
self.case_manager.create_case('CASE-002', 'Active Theft', 'Detective B') case_id2 = self.case_manager.create_case('Active Theft', 'Detective B')
self.case_manager.create_case('CASE-003', 'Closed Murder', 'Detective C') case_id3 = self.case_manager.create_case('Closed Murder', 'Detective C')
self.case_manager.close_case('CASE-003') self.case_manager.close_case(case_id3)
# Search for "Murder" in active cases only # Search for "Murder" in Open cases only
results = self.case_manager.list_cases(status='active', search_term='Murder') results = self.case_manager.list_cases(status='Open', search_term='Murder')
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
self.assertEqual(results[0]['case_id'], 'CASE-001') self.assertEqual(results[0]['case_id'], case_id1)
def test_update_case(self): def test_update_case_description(self):
"""Test updating a case.""" """Test updating a case description."""
self.case_manager.create_case('CASE-001', 'Original Title', 'Detective A') case_id = self.case_manager.create_case('Original description', 'Detective A')
result = self.case_manager.update_case( result = self.case_manager.update_case(
'CASE-001', case_id,
case_title='Updated Title', description='Updated description'
classification='Homicide',
summary='Updated summary'
) )
self.assertTrue(result) self.assertTrue(result)
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertEqual(case['case_title'], 'Updated Title') assert case is not None # Type narrowing for Pylance
self.assertEqual(case['classification'], 'Homicide') self.assertEqual(case['description'], 'Updated description')
self.assertEqual(case['summary'], 'Updated summary')
self.assertEqual(case['investigator'], 'Detective A') # Unchanged
def test_update_case_invalid_fields(self): def test_update_case_status(self):
"""Test updating case with invalid fields ignores them.""" """Test updating a case status."""
self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') case_id = self.case_manager.create_case('Test case', 'Detective A')
result = self.case_manager.update_case( result = self.case_manager.update_case(case_id, status='Archived')
'CASE-001',
case_title='Updated Title',
invalid_field='Should be ignored'
)
# Should still work, just ignoring invalid field
self.assertTrue(result) self.assertTrue(result)
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertEqual(case['case_title'], 'Updated Title') assert case is not None # Type narrowing for Pylance
self.assertNotIn('invalid_field', case) self.assertEqual(case['status'], 'Archived')
def test_update_case_no_valid_fields(self): def test_update_case_both_fields(self):
"""Test updating case with no valid fields.""" """Test updating both description and status."""
self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') case_id = self.case_manager.create_case('Test case', 'Detective A')
result = self.case_manager.update_case( result = self.case_manager.update_case(
'CASE-001', case_id,
invalid_field='Should be ignored' description='Updated description',
status='Closed'
) )
self.assertTrue(result)
# Should return None since no valid fields case = self.case_manager.get_case(case_id)
self.assertIsNone(result) assert case is not None # Type narrowing for Pylance
self.assertEqual(case['description'], 'Updated description')
self.assertEqual(case['status'], 'Closed')
def test_update_case_sets_modified_at(self): def test_update_case_no_fields(self):
"""Test that updating a case sets modified_at timestamp.""" """Test updating case with no fields returns False."""
self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') case_id = self.case_manager.create_case('Test case', 'Detective A')
case_before = self.case_manager.get_case('CASE-001') result = self.case_manager.update_case(case_id)
created_at = case_before['created_at'] self.assertFalse(result)
# Small delay to ensure different timestamp def test_update_case_sets_modified_timestamp(self):
import time """Test that updating a case sets modified timestamp."""
time.sleep(0.01) case_id = self.case_manager.create_case('Test case', 'Detective A')
self.case_manager.update_case('CASE-001', case_title='Updated') case_before = self.case_manager.get_case(case_id)
assert case_before is not None # Type narrowing for Pylance
created = case_before['created']
case_after = self.case_manager.get_case('CASE-001') time.sleep(1) # Ensure timestamp difference
modified_at = case_after['modified_at']
# modified_at should be different from created_at self.case_manager.update_case(case_id, description='Updated')
self.assertNotEqual(created_at, modified_at)
case_after = self.case_manager.get_case(case_id)
assert case_after is not None # Type narrowing for Pylance
modified = case_after['modified']
# Modified should be different from created
self.assertNotEqual(created, modified)
def test_update_nonexistent_case(self): def test_update_nonexistent_case(self):
"""Test updating a non-existent case returns False.""" """Test updating a non-existent case returns False."""
result = self.case_manager.update_case( result = self.case_manager.update_case(99999, description='Updated')
'NONEXISTENT',
case_title='Updated Title'
)
self.assertFalse(result) self.assertFalse(result)
def test_close_case(self): def test_close_case(self):
"""Test closing a case.""" """Test closing a case."""
self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') case_id = self.case_manager.create_case('Test case', 'Detective A')
result = self.case_manager.close_case('CASE-001') result = self.case_manager.close_case(case_id)
self.assertTrue(result) self.assertTrue(result)
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertEqual(case['status'], 'closed') assert case is not None # Type narrowing for Pylance
self.assertEqual(case['status'], 'Closed')
self.assertIsNotNone(case['closed'])
def test_archive_case(self): def test_archive_case(self):
"""Test archiving a case.""" """Test archiving a case."""
self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') case_id = self.case_manager.create_case('Test case', 'Detective A')
result = self.case_manager.archive_case('CASE-001') result = self.case_manager.archive_case(case_id)
self.assertTrue(result) self.assertTrue(result)
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertEqual(case['status'], 'archived') assert case is not None # Type narrowing for Pylance
self.assertEqual(case['status'], 'Archived')
def test_delete_case(self): def test_delete_case(self):
"""Test deleting a case.""" """Test deleting a case."""
self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') case_id = self.case_manager.create_case('Test case', 'Detective A')
# Verify case exists # Verify case exists
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertIsNotNone(case) self.assertIsNotNone(case)
# Delete case # Delete case
result = self.case_manager.delete_case('CASE-001') result = self.case_manager.delete_case(case_id)
self.assertTrue(result) self.assertTrue(result)
# Verify case is gone # Verify case is gone
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertIsNone(case) self.assertIsNone(case)
def test_delete_nonexistent_case(self): def test_delete_nonexistent_case(self):
"""Test deleting a non-existent case returns False.""" """Test deleting a non-existent case returns False."""
result = self.case_manager.delete_case('NONEXISTENT') result = self.case_manager.delete_case(99999)
self.assertFalse(result) self.assertFalse(result)
def test_case_manager_with_default_db_path(self): def test_case_manager_with_default_db_path(self):
@ -283,6 +288,19 @@ class TestCaseManager(unittest.TestCase):
self.assertEqual(cm.db_path, self.test_db_path) self.assertEqual(cm.db_path, self.test_db_path)
cm.conn.close() cm.conn.close()
def test_multiple_investigators_on_case(self):
"""Test adding multiple investigators to a case."""
case_id1 = self.case_manager.create_case('Case 1', 'Detective Smith')
case_id2 = self.case_manager.create_case('Case 2', 'Detective Smith')
# Verify investigator is reused
case1 = self.case_manager.get_case(case_id1)
case2 = self.case_manager.get_case(case_id2)
assert case1 is not None and case2 is not None # Type narrowing for Pylance
self.assertEqual(case1['investigators'], 'Detective Smith')
self.assertEqual(case2['investigators'], 'Detective Smith')
class TestCaseManagerIntegration(unittest.TestCase): class TestCaseManagerIntegration(unittest.TestCase):
"""Integration tests for CaseManager.""" """Integration tests for CaseManager."""
@ -308,70 +326,69 @@ class TestCaseManagerIntegration(unittest.TestCase):
def test_full_case_lifecycle(self): def test_full_case_lifecycle(self):
"""Test complete case lifecycle: create, update, close, archive, delete.""" """Test complete case lifecycle: create, update, close, archive, delete."""
# Create case # Create case
self.case_manager.create_case( case_id = self.case_manager.create_case(
'CASE-001', 'Investigation case',
'Investigation Case', 'Detective Smith'
'Detective Smith',
'Robbery',
'Initial summary'
) )
# Verify creation # Verify creation
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertEqual(case['status'], 'active') assert case is not None # Type narrowing for Pylance
self.assertEqual(case['status'], 'Open')
# Update case # Update case
self.case_manager.update_case( self.case_manager.update_case(case_id, description='Updated findings')
'CASE-001', case = self.case_manager.get_case(case_id)
summary='Updated with new findings' assert case is not None # Type narrowing for Pylance
) self.assertEqual(case['description'], 'Updated findings')
case = self.case_manager.get_case('CASE-001')
self.assertEqual(case['summary'], 'Updated with new findings')
# Close case # Close case
self.case_manager.close_case('CASE-001') self.case_manager.close_case(case_id)
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertEqual(case['status'], 'closed') assert case is not None # Type narrowing for Pylance
self.assertEqual(case['status'], 'Closed')
# Archive case # Archive case
self.case_manager.archive_case('CASE-001') self.case_manager.archive_case(case_id)
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertEqual(case['status'], 'archived') assert case is not None # Type narrowing for Pylance
self.assertEqual(case['status'], 'Archived')
# Delete case # Delete case
self.case_manager.delete_case('CASE-001') self.case_manager.delete_case(case_id)
case = self.case_manager.get_case('CASE-001') case = self.case_manager.get_case(case_id)
self.assertIsNone(case) self.assertIsNone(case)
def test_multiple_cases_workflow(self): def test_multiple_cases_workflow(self):
"""Test working with multiple cases simultaneously.""" """Test working with multiple cases simultaneously."""
# Create multiple cases # Create multiple cases
case_ids = []
for i in range(1, 6): for i in range(1, 6):
self.case_manager.create_case( case_id = self.case_manager.create_case(
f'CASE-{i:03d}',
f'Case {i}', f'Case {i}',
f'Detective {chr(64+i)}' # Detective A, B, C, etc. f'Detective {chr(64+i)}' # Detective A, B, C, etc.
) )
case_ids.append(case_id)
# Verify all created # Verify all created
all_cases = self.case_manager.list_cases() all_cases = self.case_manager.list_cases()
self.assertEqual(len(all_cases), 5) self.assertEqual(len(all_cases), 5)
# Close some cases # Close some cases
self.case_manager.close_case('CASE-001') self.case_manager.close_case(case_ids[0])
self.case_manager.close_case('CASE-003') self.case_manager.close_case(case_ids[2])
# Archive one # Archive one
self.case_manager.archive_case('CASE-005') self.case_manager.archive_case(case_ids[4])
# Check status distribution # Check status distribution
active_cases = self.case_manager.list_cases(status='active') open_cases = self.case_manager.list_cases(status='Open')
closed_cases = self.case_manager.list_cases(status='closed') closed_cases = self.case_manager.list_cases(status='Closed')
archived_cases = self.case_manager.list_cases(status='archived') archived_cases = self.case_manager.list_cases(status='Archived')
self.assertEqual(len(active_cases), 2) # 002, 004 self.assertEqual(len(open_cases), 2) # case_ids[1], case_ids[3]
self.assertEqual(len(closed_cases), 2) # 001, 003 self.assertEqual(len(closed_cases), 2) # case_ids[0], case_ids[2]
self.assertEqual(len(archived_cases), 1) # 005 self.assertEqual(len(archived_cases), 1) # case_ids[4]
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -78,15 +78,15 @@ class TestValidateDatabaseSchema(unittest.TestCase):
# Create only some of the required tables # Create only some of the required tables
cursor.execute(""" cursor.execute("""
CREATE TABLE cases ( CREATE TABLE "case" (
case_id TEXT PRIMARY KEY, case_id INTEGER PRIMARY KEY,
case_title TEXT NOT NULL description TEXT
) )
""") """)
cursor.execute(""" cursor.execute("""
CREATE TABLE notes ( CREATE TABLE note (
note_id TEXT PRIMARY KEY, note_id INTEGER PRIMARY KEY,
case_id TEXT NOT NULL case_id INTEGER NOT NULL
) )
""") """)
conn.commit() conn.commit()
@ -143,11 +143,13 @@ class TestCreateFreshDatabase(unittest.TestCase):
conn.close() conn.close()
required_tables = { required_tables = {
'cases', 'notes', 'evidence', 'chain_of_custody', 'case', 'note', 'note_revision', 'investigator',
'attachments', 'question_entries', 'users', 'tasks' 'evidence', 'chain_of_custody', 'question_definition',
'note_question_tag', 'investigator_case'
} }
self.assertEqual(tables, required_tables) # sqlite_sequence is automatically created by SQLite for AUTOINCREMENT
self.assertTrue(required_tables.issubset(tables))
def test_create_fresh_database_returns_path(self): def test_create_fresh_database_returns_path(self):
"""Test that create_fresh_database returns the database path.""" """Test that create_fresh_database returns the database path."""
@ -170,10 +172,12 @@ class TestCreateFreshDatabase(unittest.TestCase):
conn.close() conn.close()
required_tables = { required_tables = {
'cases', 'notes', 'evidence', 'chain_of_custody', 'case', 'note', 'note_revision', 'investigator',
'attachments', 'question_entries', 'users', 'tasks' 'evidence', 'chain_of_custody', 'question_definition',
'note_question_tag', 'investigator_case'
} }
self.assertEqual(tables, required_tables) # sqlite_sequence is automatically created by SQLite for AUTOINCREMENT
self.assertTrue(required_tables.issubset(tables))
class TestInitializeDatabase(unittest.TestCase): class TestInitializeDatabase(unittest.TestCase):
@ -219,10 +223,11 @@ class TestInitializeDatabase(unittest.TestCase):
conn = sqlite3.connect(self.test_db_path) conn = sqlite3.connect(self.test_db_path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(""" cursor.execute("""
INSERT INTO cases (case_id, case_title, investigator) INSERT INTO "case" (description, status)
VALUES ('TEST-001', 'Test Case', 'Test Investigator') VALUES ('Test Case', 'Open')
""") """)
conn.commit() conn.commit()
case_id = cursor.lastrowid
conn.close() conn.close()
# Initialize again # Initialize again
@ -231,12 +236,12 @@ class TestInitializeDatabase(unittest.TestCase):
# Verify data still exists # Verify data still exists
conn = sqlite3.connect(self.test_db_path) conn = sqlite3.connect(self.test_db_path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT case_id FROM cases WHERE case_id = 'TEST-001'") cursor.execute("SELECT case_id FROM \"case\" WHERE case_id = ?", (case_id,))
result = cursor.fetchone() result = cursor.fetchone()
conn.close() conn.close()
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEqual(result[0], 'TEST-001') self.assertEqual(result[0], case_id)
@patch('forensictrails.db.database.config') @patch('forensictrails.db.database.config')
def test_initialize_database_recreates_invalid_database(self, mock_config): def test_initialize_database_recreates_invalid_database(self, mock_config):
@ -248,8 +253,8 @@ class TestInitializeDatabase(unittest.TestCase):
# Create an incomplete database # Create an incomplete database
conn = sqlite3.connect(self.test_db_path) conn = sqlite3.connect(self.test_db_path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("CREATE TABLE cases (case_id TEXT PRIMARY KEY)") cursor.execute("CREATE TABLE \"case\" (case_id INTEGER PRIMARY KEY)")
cursor.execute("INSERT INTO cases VALUES ('TEST-001')") cursor.execute("INSERT INTO \"case\" (case_id) VALUES (999)")
conn.commit() conn.commit()
conn.close() conn.close()
@ -261,7 +266,7 @@ class TestInitializeDatabase(unittest.TestCase):
conn = sqlite3.connect(self.test_db_path) conn = sqlite3.connect(self.test_db_path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT case_id FROM cases WHERE case_id = 'TEST-001'") cursor.execute("SELECT case_id FROM \"case\" WHERE case_id = 999")
result = cursor.fetchone() result = cursor.fetchone()
conn.close() conn.close()
@ -293,9 +298,9 @@ class TestShowDbSchema(unittest.TestCase):
# Verify that logging.debug was called # Verify that logging.debug was called
self.assertTrue(mock_logging.debug.called) self.assertTrue(mock_logging.debug.called)
# Check that it was called for each table (8 tables + 1 header message) # Check that it was called for each table (9 tables + 1 header message)
# Should be at least 9 calls (header + 8 tables) # Should be at least 10 calls (header + 9 tables)
self.assertGreaterEqual(mock_logging.debug.call_count, 9) self.assertGreaterEqual(mock_logging.debug.call_count, 10)
def test_show_db_schema_doesnt_raise_exception(self): def test_show_db_schema_doesnt_raise_exception(self):
"""Test that show_db_schema handles execution without raising exceptions.""" """Test that show_db_schema handles execution without raising exceptions."""
@ -338,12 +343,13 @@ class TestDatabaseIntegration(unittest.TestCase):
conn = get_db_connection(self.test_db_path) conn = get_db_connection(self.test_db_path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(""" cursor.execute("""
INSERT INTO cases (case_id, case_title, investigator) INSERT INTO "case" (description, status)
VALUES ('CASE-001', 'Murder Investigation', 'Detective Smith') VALUES ('Murder Investigation', 'Open')
""") """)
case_id = cursor.lastrowid
cursor.execute(""" cursor.execute("""
INSERT INTO users (user_id, username, full_name) INSERT INTO investigator (name, role)
VALUES ('USER-001', 'dsmith', 'Detective Smith') VALUES ('Detective Smith', 'Lead Investigator')
""") """)
conn.commit() conn.commit()
conn.close() conn.close()
@ -351,15 +357,15 @@ class TestDatabaseIntegration(unittest.TestCase):
# Step 3: Verify data exists # Step 3: Verify data exists
conn = get_db_connection(self.test_db_path) conn = get_db_connection(self.test_db_path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT case_title FROM cases WHERE case_id = 'CASE-001'") cursor.execute("SELECT description FROM \"case\" WHERE case_id = ?", (case_id,))
result = cursor.fetchone() result = cursor.fetchone()
self.assertEqual(result['case_title'], 'Murder Investigation') self.assertEqual(result['description'], 'Murder Investigation')
conn.close() conn.close()
# Step 4: Corrupt database (remove a required table) # Step 4: Corrupt database (remove a required table)
conn = sqlite3.connect(self.test_db_path) conn = sqlite3.connect(self.test_db_path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("DROP TABLE users") cursor.execute("DROP TABLE investigator")
conn.commit() conn.commit()
conn.close() conn.close()
@ -373,7 +379,7 @@ class TestDatabaseIntegration(unittest.TestCase):
self.assertTrue(validate_database_schema(self.test_db_path)) self.assertTrue(validate_database_schema(self.test_db_path))
conn = get_db_connection(self.test_db_path) conn = get_db_connection(self.test_db_path)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT case_id FROM cases WHERE case_id = 'CASE-001'") cursor.execute("SELECT case_id FROM \"case\" WHERE case_id = ?", (case_id,))
result = cursor.fetchone() result = cursor.fetchone()
self.assertIsNone(result) self.assertIsNone(result)
conn.close() conn.close()

524
tests/test_note_manager.py Normal file
View File

@ -0,0 +1,524 @@
"""Unit tests for the note_manager module."""
import unittest
import tempfile
import os
import sqlite3
from pathlib import Path
from unittest.mock import patch
from datetime import datetime
import time
# Add the src directory to the path
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
from forensictrails.core.note_manager import NoteManager
from forensictrails.core.case_manager import CaseManager
from forensictrails.db.database import create_fresh_database
class TestNoteManager(unittest.TestCase):
"""Test cases for NoteManager class."""
def setUp(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
self.test_db_path = os.path.join(self.temp_dir, 'test.db')
self.schema_path = Path(__file__).parent.parent / 'src' / 'forensictrails' / 'db' / 'schema.sql'
# Create a fresh database for testing
create_fresh_database(self.test_db_path, self.schema_path)
# Create manager instances
self.case_manager = CaseManager(self.test_db_path)
self.note_manager = NoteManager(self.test_db_path)
# Create a test case for notes
self.case_id = self.case_manager.create_case(
description='Test Case for Notes',
investigator_name='Detective Smith'
)
def tearDown(self):
"""Clean up test fixtures."""
if hasattr(self, 'note_manager') and hasattr(self.note_manager, 'conn'):
self.note_manager.conn.close()
if hasattr(self, 'case_manager') and hasattr(self.case_manager, 'conn'):
self.case_manager.conn.close()
if os.path.exists(self.test_db_path):
os.remove(self.test_db_path)
os.rmdir(self.temp_dir)
def test_note_manager_initialization(self):
"""Test NoteManager initializes correctly."""
self.assertEqual(self.note_manager.db_path, self.test_db_path)
self.assertIsNotNone(self.note_manager.conn)
self.assertIsNotNone(self.note_manager.cursor)
def test_create_note_basic(self):
"""Test creating a basic note."""
note_id = self.note_manager.create_note(
case_id=self.case_id,
content='This is a test note.',
investigator='Detective Smith'
)
self.assertIsNotNone(note_id)
self.assertIsInstance(note_id, int)
# Verify note was created
note = self.note_manager.get_note(note_id)
self.assertIsNotNone(note)
assert note is not None # Type narrowing for Pylance
self.assertEqual(note['note_id'], note_id)
self.assertEqual(note['case_id'], self.case_id)
self.assertEqual(note['content'], 'This is a test note.')
self.assertEqual(note['investigator'], 'Detective Smith')
self.assertEqual(note['question_tags'], [])
def test_create_note_with_tags(self):
"""Test creating a note with question tags."""
note_id = self.note_manager.create_note(
case_id=self.case_id,
content='Who was at the scene?',
investigator='Detective Jones',
question_tags=['WHO', 'WHEN']
)
note = self.note_manager.get_note(note_id)
self.assertIsNotNone(note)
assert note is not None # Type narrowing for Pylance
self.assertIn('WHO', note['question_tags'])
self.assertIn('WHEN', note['question_tags'])
def test_get_note_nonexistent(self):
"""Test getting a non-existent note returns None."""
note = self.note_manager.get_note(99999)
self.assertIsNone(note)
def test_get_notes_empty(self):
"""Test getting notes for a case with no notes."""
notes = self.note_manager.get_notes(self.case_id)
self.assertEqual(notes, [])
def test_get_notes_multiple(self):
"""Test getting multiple notes for a case."""
# Create multiple notes
note_id1 = self.note_manager.create_note(
self.case_id, 'First note', 'Detective A'
)
time.sleep(0.01) # Ensure different timestamps
note_id2 = self.note_manager.create_note(
self.case_id, 'Second note', 'Detective B'
)
time.sleep(0.01)
note_id3 = self.note_manager.create_note(
self.case_id, 'Third note', 'Detective C'
)
notes = self.note_manager.get_notes(self.case_id)
self.assertEqual(len(notes), 3)
note_ids = [n['note_id'] for n in notes]
self.assertIn(note_id1, note_ids)
self.assertIn(note_id2, note_ids)
self.assertIn(note_id3, note_ids)
# Verify they're ordered by creation time (DESC)
# Check that timestamps are in descending order
timestamps = [n['timestamp'] for n in notes]
self.assertEqual(timestamps, sorted(timestamps, reverse=True))
def test_update_note_creates_revision(self):
"""Test that updating a note creates a new revision."""
note_id = self.note_manager.create_note(
self.case_id, 'Original content', 'Detective Smith'
)
# Update the note
result = self.note_manager.update_note(note_id, 'Updated content')
self.assertTrue(result)
# Verify the note now shows updated content
note = self.note_manager.get_note(note_id)
assert note is not None # Type narrowing for Pylance
self.assertEqual(note['content'], 'Updated content')
# Verify history exists
history = self.note_manager.get_note_history(note_id)
self.assertEqual(len(history), 2)
self.assertEqual(history[0]['content'], 'Updated content') # Newest first
self.assertEqual(history[0]['revision_number'], 2)
self.assertEqual(history[1]['content'], 'Original content')
self.assertEqual(history[1]['revision_number'], 1)
def test_update_note_multiple_times(self):
"""Test updating a note multiple times."""
note_id = self.note_manager.create_note(
self.case_id, 'Version 1', 'Detective Smith'
)
self.note_manager.update_note(note_id, 'Version 2')
self.note_manager.update_note(note_id, 'Version 3')
self.note_manager.update_note(note_id, 'Version 4')
# Verify latest version
note = self.note_manager.get_note(note_id)
assert note is not None # Type narrowing for Pylance
self.assertEqual(note['content'], 'Version 4')
# Verify all revisions exist
history = self.note_manager.get_note_history(note_id)
self.assertEqual(len(history), 4)
for i, rev in enumerate(history):
self.assertEqual(rev['revision_number'], 4 - i)
def test_get_note_history_empty(self):
"""Test getting history for non-existent note."""
history = self.note_manager.get_note_history(99999)
self.assertEqual(history, [])
def test_update_note_tags(self):
"""Test updating question tags for a note."""
note_id = self.note_manager.create_note(
self.case_id, 'Test note', 'Detective Smith',
question_tags=['WHO']
)
# Verify initial tags
note = self.note_manager.get_note(note_id)
assert note is not None # Type narrowing for Pylance
self.assertEqual(note['question_tags'], ['WHO'])
# Update tags
result = self.note_manager.update_note_tags(note_id, ['WHAT', 'WHEN', 'WHERE'])
self.assertTrue(result)
# Verify updated tags
note = self.note_manager.get_note(note_id)
assert note is not None # Type narrowing for Pylance
self.assertIn('WHAT', note['question_tags'])
self.assertIn('WHEN', note['question_tags'])
self.assertIn('WHERE', note['question_tags'])
self.assertNotIn('WHO', note['question_tags'])
def test_update_note_tags_remove_all(self):
"""Test removing all tags from a note."""
note_id = self.note_manager.create_note(
self.case_id, 'Test note', 'Detective Smith',
question_tags=['WHO', 'WHAT']
)
# Remove all tags
result = self.note_manager.update_note_tags(note_id, [])
self.assertTrue(result)
# Verify no tags
note = self.note_manager.get_note(note_id)
assert note is not None # Type narrowing for Pylance
self.assertEqual(note['question_tags'], [])
def test_update_note_tags_nonexistent_note(self):
"""Test updating tags for non-existent note."""
result = self.note_manager.update_note_tags(99999, ['WHO'])
self.assertFalse(result)
def test_delete_note(self):
"""Test deleting a note."""
note_id = self.note_manager.create_note(
self.case_id, 'Test note', 'Detective Smith'
)
# Verify note exists
note = self.note_manager.get_note(note_id)
self.assertIsNotNone(note)
# Delete note
result = self.note_manager.delete_note(note_id)
self.assertTrue(result)
# Verify note is gone
note = self.note_manager.get_note(note_id)
self.assertIsNone(note)
def test_delete_note_deletes_revisions(self):
"""Test that deleting a note also deletes all revisions."""
note_id = self.note_manager.create_note(
self.case_id, 'Original', 'Detective Smith'
)
self.note_manager.update_note(note_id, 'Updated')
self.note_manager.update_note(note_id, 'Updated again')
# Delete note
self.note_manager.delete_note(note_id)
# Verify revisions are gone
history = self.note_manager.get_note_history(note_id)
self.assertEqual(history, [])
def test_delete_nonexistent_note(self):
"""Test deleting a non-existent note returns False."""
result = self.note_manager.delete_note(99999)
self.assertFalse(result)
def test_search_notes_by_case(self):
"""Test searching notes by case ID."""
# Create second case
case_id2 = self.case_manager.create_case(
description='Second Case',
investigator_name='Detective Jones'
)
# Create notes in both cases
self.note_manager.create_note(self.case_id, 'Case 1 Note 1', 'Detective A')
self.note_manager.create_note(self.case_id, 'Case 1 Note 2', 'Detective B')
self.note_manager.create_note(case_id2, 'Case 2 Note 1', 'Detective C')
# Search by case
case1_notes = self.note_manager.search_notes(case_id=self.case_id)
case2_notes = self.note_manager.search_notes(case_id=case_id2)
self.assertEqual(len(case1_notes), 2)
self.assertEqual(len(case2_notes), 1)
def test_search_notes_by_content(self):
"""Test searching notes by content text."""
self.note_manager.create_note(self.case_id, 'This is about murder', 'Detective A')
self.note_manager.create_note(self.case_id, 'This is about theft', 'Detective B')
self.note_manager.create_note(self.case_id, 'Another murder note', 'Detective C')
# Search for "murder"
results = self.note_manager.search_notes(case_id=self.case_id, search_term='murder')
self.assertEqual(len(results), 2)
# Search for "theft"
results = self.note_manager.search_notes(case_id=self.case_id, search_term='theft')
self.assertEqual(len(results), 1)
def test_search_notes_by_question_tags(self):
"""Test searching notes by question tags."""
self.note_manager.create_note(
self.case_id, 'Note 1', 'Detective A', question_tags=['WHO', 'WHAT']
)
self.note_manager.create_note(
self.case_id, 'Note 2', 'Detective B', question_tags=['WHEN', 'WHERE']
)
self.note_manager.create_note(
self.case_id, 'Note 3', 'Detective C', question_tags=['WHO', 'WHEN']
)
# Search for WHO tags
results = self.note_manager.search_notes(case_id=self.case_id, question_tags=['WHO'])
self.assertEqual(len(results), 2)
# Search for WHERE tags
results = self.note_manager.search_notes(case_id=self.case_id, question_tags=['WHERE'])
self.assertEqual(len(results), 1)
def test_search_notes_combined_filters(self):
"""Test searching notes with multiple filters."""
self.note_manager.create_note(
self.case_id, 'Murder investigation note', 'Detective A', question_tags=['WHO']
)
self.note_manager.create_note(
self.case_id, 'Theft investigation note', 'Detective B', question_tags=['WHO']
)
self.note_manager.create_note(
self.case_id, 'Murder witness statement', 'Detective C', question_tags=['WHAT']
)
# Search for "murder" with WHO tag
results = self.note_manager.search_notes(
case_id=self.case_id,
search_term='murder',
question_tags=['WHO']
)
self.assertEqual(len(results), 1)
self.assertIn('Murder investigation', results[0]['content'])
def test_search_notes_all_cases(self):
"""Test searching notes across all cases."""
case_id2 = self.case_manager.create_case(
description='Second Case',
investigator_name='Detective Jones'
)
self.note_manager.create_note(self.case_id, 'Case 1 evidence', 'Detective A')
self.note_manager.create_note(case_id2, 'Case 2 evidence', 'Detective B')
# Search without case_id filter
results = self.note_manager.search_notes(search_term='evidence')
self.assertEqual(len(results), 2)
def test_investigator_reuse(self):
"""Test that investigators are reused across notes."""
note_id1 = self.note_manager.create_note(
self.case_id, 'Note 1', 'Detective Smith'
)
note_id2 = self.note_manager.create_note(
self.case_id, 'Note 2', 'Detective Smith'
)
# Both notes should reference the same investigator
note1 = self.note_manager.get_note(note_id1)
note2 = self.note_manager.get_note(note_id2)
assert note1 is not None and note2 is not None # Type narrowing for Pylance
self.assertEqual(note1['investigator'], note2['investigator'])
def test_question_definition_reuse(self):
"""Test that question definitions are reused within a case."""
note_id1 = self.note_manager.create_note(
self.case_id, 'Note 1', 'Detective A', question_tags=['WHO']
)
note_id2 = self.note_manager.create_note(
self.case_id, 'Note 2', 'Detective B', question_tags=['WHO']
)
# Both notes should use the same question definition
# Verify by checking the database directly
cursor = self.note_manager.cursor
cursor.execute("""
SELECT COUNT(DISTINCT question_id) as count
FROM question_definition
WHERE case_id = ? AND question_text = 'WHO'
""", (self.case_id,))
result = cursor.fetchone()
self.assertEqual(result['count'], 1)
def test_note_manager_with_default_db_path(self):
"""Test NoteManager uses config default when no path provided."""
with patch('forensictrails.core.note_manager.config') as mock_config:
mock_config.database_path = self.test_db_path
nm = NoteManager()
self.assertEqual(nm.db_path, self.test_db_path)
nm.conn.close()
class TestNoteManagerIntegration(unittest.TestCase):
"""Integration tests for NoteManager."""
def setUp(self):
"""Set up test fixtures."""
self.temp_dir = tempfile.mkdtemp()
self.test_db_path = os.path.join(self.temp_dir, 'integration.db')
self.schema_path = Path(__file__).parent.parent / 'src' / 'forensictrails' / 'db' / 'schema.sql'
create_fresh_database(self.test_db_path, self.schema_path)
self.case_manager = CaseManager(self.test_db_path)
self.note_manager = NoteManager(self.test_db_path)
self.case_id = self.case_manager.create_case(
description='Integration Test Case',
investigator_name='Detective Smith'
)
def tearDown(self):
"""Clean up test fixtures."""
if hasattr(self, 'note_manager') and hasattr(self.note_manager, 'conn'):
self.note_manager.conn.close()
if hasattr(self, 'case_manager') and hasattr(self.case_manager, 'conn'):
self.case_manager.conn.close()
if os.path.exists(self.test_db_path):
os.remove(self.test_db_path)
os.rmdir(self.temp_dir)
def test_full_note_lifecycle(self):
"""Test complete note lifecycle: create, update, tag, delete."""
# Create note
note_id = self.note_manager.create_note(
self.case_id,
'Initial observation',
'Detective Smith',
question_tags=['WHAT']
)
# Verify creation
note = self.note_manager.get_note(note_id)
assert note is not None # Type narrowing for Pylance
self.assertEqual(note['content'], 'Initial observation')
self.assertEqual(note['question_tags'], ['WHAT'])
# Update content
self.note_manager.update_note(note_id, 'Updated observation with more details')
note = self.note_manager.get_note(note_id)
assert note is not None # Type narrowing for Pylance
self.assertEqual(note['content'], 'Updated observation with more details')
# Update tags
self.note_manager.update_note_tags(note_id, ['WHAT', 'WHERE', 'WHEN'])
note = self.note_manager.get_note(note_id)
assert note is not None # Type narrowing for Pylance
self.assertEqual(len(note['question_tags']), 3)
# Verify history
history = self.note_manager.get_note_history(note_id)
self.assertEqual(len(history), 2)
# Delete note
self.note_manager.delete_note(note_id)
note = self.note_manager.get_note(note_id)
self.assertIsNone(note)
def test_case_with_multiple_notes_and_tags(self):
"""Test a case with multiple notes and complex tagging."""
# Create various notes with different tags
notes_data = [
('Suspect identified as John Doe', ['WHO']),
('Crime occurred at 123 Main St', ['WHERE']),
('Incident happened on Jan 15, 2025', ['WHEN']),
('Weapon used was a knife', ['WITH_WHAT', 'WHAT']),
('Motive appears to be robbery', ['WHY', 'WHAT']),
]
note_ids = []
for content, tags in notes_data:
note_id = self.note_manager.create_note(
self.case_id, content, 'Detective Smith', question_tags=tags
)
note_ids.append(note_id)
# Verify all notes created
all_notes = self.note_manager.get_notes(self.case_id)
self.assertEqual(len(all_notes), 5)
# Search by specific tags
who_notes = self.note_manager.search_notes(case_id=self.case_id, question_tags=['WHO'])
self.assertEqual(len(who_notes), 1)
what_notes = self.note_manager.search_notes(case_id=self.case_id, question_tags=['WHAT'])
# Should find notes with WHAT tag: 'Weapon used was a knife' and 'Motive appears to be robbery'
self.assertEqual(len(what_notes), 2)
def test_note_revision_immutability(self):
"""Test that note revisions are truly immutable."""
note_id = self.note_manager.create_note(
self.case_id, 'Version 1', 'Detective Smith'
)
# Create multiple revisions
for i in range(2, 6):
self.note_manager.update_note(note_id, f'Version {i}')
# Get full history
history = self.note_manager.get_note_history(note_id)
# Verify all revisions are preserved
self.assertEqual(len(history), 5)
# Verify each revision has correct content
for i, revision in enumerate(reversed(history)):
self.assertEqual(revision['revision_number'], i + 1)
self.assertEqual(revision['content'], f'Version {i + 1}')
# Verify timestamp exists
self.assertIsNotNone(revision['timestamp'])
if __name__ == '__main__':
unittest.main()