diff --git a/src/forensictrails/core/__init__.py b/src/forensictrails/core/__init__.py index e69de29..39e4907 100644 --- a/src/forensictrails/core/__init__.py +++ b/src/forensictrails/core/__init__.py @@ -0,0 +1,6 @@ +"""Core business logic modules for ForensicTrails.""" + +from .case_manager import CaseManager +from .note_manager import NoteManager + +__all__ = ['CaseManager', 'NoteManager'] diff --git a/src/forensictrails/core/case_manager.py b/src/forensictrails/core/case_manager.py index c74bd01..f9d1cbe 100644 --- a/src/forensictrails/core/case_manager.py +++ b/src/forensictrails/core/case_manager.py @@ -1,69 +1,208 @@ +"""Case management for forensic investigations. + +This module handles all operations related to investigation cases, +including creation, retrieval, updates, and lifecycle management. +""" + from ..utils.config import config from ..db.database import get_db_connection import logging from datetime import datetime, timezone -class CaseManager(): + +class CaseManager: + """Manager for forensic investigation cases.""" + def __init__(self, db_path=None): + """Initialize the CaseManager. + + Args: + db_path: Optional path to database file. Uses config default if None. + """ if db_path is None: db_path = config.database_path self.db_path = db_path self.conn = get_db_connection(self.db_path) self.cursor = self.conn.cursor() - logging.debug(f"Connected to database at {self.db_path}") + logging.debug(f"CaseManager connected to database at {self.db_path}") - def create_case(self, case_id, case_title, investigator, classification=None, summary=None): + def create_case(self, description, investigator_name, investigator_role=None, status='Open'): + """Create a new case. + + Args: + description: Case description text + investigator_name: Name of the investigator + investigator_role: Optional role of the investigator + status: Case status ('Open', 'Closed', 'Archived'), default 'Open' + + Returns: + case_id: The ID of the newly created case + """ with self.conn: + # Insert case self.cursor.execute(""" - INSERT INTO cases (case_id, case_title, investigator, classification, summary, status) - VALUES (?, ?, ?, ?, ?, 'active') - """, (case_id, case_title, investigator, classification, summary)) + INSERT INTO "case" (description, status) + VALUES (?, ?) + """, (description, status)) + + case_id = self.cursor.lastrowid logging.info(f"Created new case with ID: {case_id}") + # Handle investigator - find or create + investigator_id = self._get_or_create_investigator(investigator_name, investigator_role) + + # Link investigator to case + self.cursor.execute(""" + INSERT INTO investigator_case (investigator_id, case_id) + VALUES (?, ?) + """, (investigator_id, case_id)) + + return case_id + + + def _get_or_create_investigator(self, name, role=None): + """Get or create an investigator by name. + + Args: + name: Investigator name + role: Optional investigator role + + Returns: + Integer investigator ID + """ + self.cursor.execute("SELECT investigator_id FROM investigator WHERE name = ?", (name,)) + result = self.cursor.fetchone() + + if result: + return result['investigator_id'] + else: + self.cursor.execute(""" + INSERT INTO investigator (name, role) + VALUES (?, ?) + """, (name, role)) + return self.cursor.lastrowid + def get_case(self, case_id): + """Get a case by ID. + + Args: + case_id: Integer case ID + + Returns: + dict with case data including investigators, or None if not found + """ with self.conn: - self.cursor.execute("SELECT * FROM cases WHERE case_id = ?", (case_id,)) + self.cursor.execute(""" + SELECT c.case_id, c.description, c.status, c.created, c.modified, c.closed, + GROUP_CONCAT(i.name, ', ') as investigators + FROM "case" c + LEFT JOIN investigator_case ic ON c.case_id = ic.case_id + LEFT JOIN investigator i ON ic.investigator_id = i.investigator_id + WHERE c.case_id = ? + GROUP BY c.case_id + """, (case_id,)) + case = self.cursor.fetchone() + if case: - return dict(case) + return { + 'case_id': case['case_id'], + 'description': case['description'], + 'investigators': case['investigators'] or '', + 'status': case['status'], + 'created': case['created'], + 'modified': case['modified'], + 'closed': case['closed'] + } else: logging.warning(f"No case found with ID: {case_id}") return None def list_cases(self, status=None, search_term=None): + """List cases with optional filtering. + + Args: + status: Optional status filter ('Open', 'Closed', 'Archived') + search_term: Optional search term for description or investigators + + Returns: + list of case dictionaries + """ with self.conn: - query = "SELECT * FROM cases WHERE 1=1" + query = """ + SELECT c.case_id, c.description, c.status, c.created, c.modified, c.closed, + GROUP_CONCAT(i.name, ', ') as investigators + FROM "case" c + LEFT JOIN investigator_case ic ON c.case_id = ic.case_id + LEFT JOIN investigator i ON ic.investigator_id = i.investigator_id + WHERE 1=1 + """ params = [] - conditions = [] if status: - query += " AND status = ?" + query += " AND c.status = ?" params.append(status) + + query += " GROUP BY c.case_id" + if search_term: - query += " AND (case_id LIKE ? OR case_title LIKE ? OR investigator LIKE ?)" + query += " HAVING c.description LIKE ? OR investigators LIKE ?" like_term = f"%{search_term}%" - params.extend([like_term, like_term, like_term]) + params.extend([like_term, like_term]) - query += " ORDER BY created_at DESC" + query += " ORDER BY c.created DESC" self.cursor.execute(query, params) - return [dict(row) for row in self.cursor.fetchall()] + cases = [] + for row in self.cursor.fetchall(): + case_dict = { + 'case_id': row['case_id'], + 'description': row['description'], + 'investigators': row['investigators'] or '', + 'status': row['status'], + 'created': row['created'], + 'modified': row['modified'], + 'closed': row['closed'] + } + cases.append(case_dict) + + return cases - def update_case(self, case_id, **kwargs): - allowed_fields = ['case_title', 'investigator', 'classification', 'summary', 'status'] - updates = {k: v for k, v in kwargs.items() if k in allowed_fields} + def update_case(self, case_id, description=None, status=None): + """Update a case. + + Args: + case_id: Integer case ID + description: Optional new description + status: Optional new status ('Open', 'Closed', 'Archived') + + Returns: + bool indicating success + """ + updates = {} + if description is not None: + updates['description'] = description + if status is not None: + updates['status'] = status if not updates: logging.warning("No valid fields provided for update.") - return + return False - updates['modified_at'] = datetime.now(timezone.utc).isoformat() - set_clause = ", ".join([f"{k} = ?" for k in updates.keys()]) - values = list(updates.values()) + [case_id] + # Always update modified timestamp + set_parts = [] + values = [] + for k, v in updates.items(): + set_parts.append(f"{k} = ?") + values.append(v) + + set_parts.append("modified = CURRENT_TIMESTAMP") + set_clause = ", ".join(set_parts) + values.append(case_id) with self.conn: self.cursor.execute(f""" - UPDATE cases + UPDATE "case" SET {set_clause} WHERE case_id = ? """, values) @@ -71,69 +210,81 @@ class CaseManager(): return self.cursor.rowcount > 0 def close_case(self, case_id): - return self.update_case(case_id, status='closed') + """Close a case and set the closed timestamp. + + Args: + case_id: Integer case ID + + Returns: + bool indicating success + """ + with self.conn: + self.cursor.execute(""" + UPDATE "case" + SET status = 'Closed', closed = CURRENT_TIMESTAMP, modified = CURRENT_TIMESTAMP + WHERE case_id = ? + """, (case_id,)) + logging.info(f"Closed case with ID: {case_id}") + return self.cursor.rowcount > 0 def delete_case(self, case_id): + """Delete a case. + + Args: + case_id: Integer case ID + + Returns: + bool indicating success + """ with self.conn: - self.cursor.execute("DELETE FROM cases WHERE case_id = ?", (case_id,)) + self.cursor.execute("DELETE FROM \"case\" WHERE case_id = ?", (case_id,)) logging.info(f"Deleted case with ID: {case_id}") return self.cursor.rowcount > 0 def archive_case(self, case_id): - return self.update_case(case_id, status='archived') + """Archive a case. + + Args: + case_id: Integer case ID + + Returns: + bool indicating success + """ + return self.update_case(case_id, status='Archived') def export_case_db(self, case_id, export_path): + """Export a case and all related data to a separate SQLite file. + + Args: + case_id: Integer case ID + export_path: Path where to save the exported database + + Returns: + bool indicating success + + TODO: Implement export functionality + """ # TODO: Implement export functionality # should export a .sqlite file with only the data related to the specified case_id - pass + raise NotImplementedError("Case export functionality not yet implemented") def import_case_db(self, import_path): + """Import a case database and merge into the main database. + + Args: + import_path: Path to the database file to import + + Returns: + Integer case ID of the imported case + + TODO: Implement import functionality + """ # TODO: Implement import functionality # should import a .sqlite file and merge its data into the main database - pass + raise NotImplementedError("Case import functionality not yet implemented") - def create_note(self, case_id, content, investigator, question_tags=None): - """Create a new note for a case.""" - import hashlib - import uuid - import json - - note_id = str(uuid.uuid4()) - timestamp = datetime.now(timezone.utc).isoformat() - - # Create hash of content + timestamp for integrity - hash_content = f"{content}{timestamp}".encode('utf-8') - content_hash = hashlib.sha256(hash_content).hexdigest() - - # Convert tags to JSON - tags_json = json.dumps(question_tags) if question_tags else None - - with self.conn: - self.cursor.execute(""" - INSERT INTO notes (note_id, case_id, timestamp, content, investigator, question_tags, hash) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, (note_id, case_id, timestamp, content, investigator, tags_json, content_hash)) - logging.info(f"Created note {note_id} for case {case_id}") - return note_id - - def get_notes(self, case_id): - """Get all notes for a case, ordered by timestamp.""" - with self.conn: - self.cursor.execute(""" - SELECT * FROM notes - WHERE case_id = ? - ORDER BY timestamp DESC - """, (case_id,)) - return [dict(row) for row in self.cursor.fetchall()] - - def get_note(self, note_id): - """Get a specific note by ID.""" - with self.conn: - self.cursor.execute("SELECT * FROM notes WHERE note_id = ?", (note_id,)) - note = self.cursor.fetchone() - if note: - return dict(note) - else: - logging.warning(f"No note found with ID: {note_id}") - return None - \ No newline at end of file + def close(self): + """Close the database connection.""" + if self.conn: + self.conn.close() + logging.debug("CaseManager database connection closed") diff --git a/src/forensictrails/core/note_manager.py b/src/forensictrails/core/note_manager.py new file mode 100644 index 0000000..1500070 --- /dev/null +++ b/src/forensictrails/core/note_manager.py @@ -0,0 +1,414 @@ +"""Note management for forensic investigation cases. + +This module handles all operations related to investigation notes, +including creation, retrieval, updates, and revision history. +Notes are immutable by design - updates create new revisions. +""" + +from ..utils.config import config +from ..db.database import get_db_connection +import logging + + +class NoteManager: + """Manager for investigation notes with immutable revision history.""" + + def __init__(self, db_path=None): + """Initialize the NoteManager. + + Args: + db_path: Optional path to database file. Uses config default if None. + """ + if db_path is None: + db_path = config.database_path + self.db_path = db_path + self.conn = get_db_connection(self.db_path) + self.cursor = self.conn.cursor() + logging.debug(f"NoteManager connected to database at {self.db_path}") + + def _get_or_create_investigator(self, name, role=None): + """Get or create an investigator by name. + + Args: + name: Investigator name + role: Optional investigator role + + Returns: + Integer investigator ID + """ + self.cursor.execute("SELECT investigator_id FROM investigator WHERE name = ?", (name,)) + result = self.cursor.fetchone() + + if result: + return result['investigator_id'] + else: + self.cursor.execute(""" + INSERT INTO investigator (name, role) + VALUES (?, ?) + """, (name, role)) + return self.cursor.lastrowid + + def _get_or_create_question(self, case_id, question_text): + """Get or create a question definition for a case. + + Args: + case_id: Integer case ID + question_text: Question text (e.g., "WHO", "WHAT", etc.) + + Returns: + Integer question ID + """ + # Check if this question already exists for this case + self.cursor.execute(""" + SELECT question_id FROM question_definition + WHERE case_id = ? AND question_text = ? + """, (case_id, question_text)) + + result = self.cursor.fetchone() + if result: + return result['question_id'] + + # Create new question + # Determine display order (last + 1) + self.cursor.execute(""" + SELECT MAX(display_order) as max_order + FROM question_definition + WHERE case_id = ? + """, (case_id,)) + + max_order = self.cursor.fetchone()['max_order'] + next_order = (max_order or 0) + 1 + + self.cursor.execute(""" + INSERT INTO question_definition (case_id, question_text, display_order) + VALUES (?, ?, ?) + """, (case_id, question_text, next_order)) + + return self.cursor.lastrowid + + def create_note(self, case_id, content, investigator, question_tags=None): + """Create a new note for a case. + + Args: + case_id: Integer case ID + content: Note content text + investigator: Investigator name + question_tags: Optional list of question tags (e.g., ["WHO", "WHAT"]) + + Returns: + Integer note ID or None on error + """ + # Get or create investigator + investigator_id = self._get_or_create_investigator(investigator) + + with self.conn: + # Create the note container + self.cursor.execute(""" + INSERT INTO note (case_id, investigator_id) + VALUES (?, ?) + """, (case_id, investigator_id)) + + note_id = self.cursor.lastrowid + + # Create the first revision + self.cursor.execute(""" + INSERT INTO note_revision (note_id, content, revision_number) + VALUES (?, ?, 1) + """, (note_id, content)) + + # Handle question tags if provided + if question_tags: + for tag in question_tags: + # Find or create question definition for this case + question_id = self._get_or_create_question(case_id, tag) + + # Tag the note with the question + self.cursor.execute(""" + INSERT INTO note_question_tag (note_id, question_id) + VALUES (?, ?) + """, (note_id, question_id)) + + logging.info(f"Created note {note_id} for case {case_id}") + return note_id + + def get_note(self, note_id): + """Get a specific note by ID with its latest revision. + + Args: + note_id: Integer note ID + + Returns: + Note dictionary with keys: + - note_id: Integer note ID + - case_id: Integer case ID + - timestamp: Note creation timestamp + - content: Latest revision content + - investigator: Investigator name + - question_tags: List of question tag strings + Returns None if not found. + """ + with self.conn: + self.cursor.execute(""" + SELECT + n.note_id, + n.case_id, + n.created as timestamp, + i.name as investigator, + nr.content, + nr.revision_number, + GROUP_CONCAT(qd.question_text, '|') as question_tags + FROM note n + JOIN investigator i ON n.investigator_id = i.investigator_id + JOIN note_revision nr ON n.note_id = nr.note_id + LEFT JOIN note_question_tag nqt ON n.note_id = nqt.note_id + LEFT JOIN question_definition qd ON nqt.question_id = qd.question_id + WHERE n.note_id = ? + AND nr.revision_number = ( + SELECT MAX(revision_number) + FROM note_revision + WHERE note_id = n.note_id + ) + GROUP BY n.note_id + """, (note_id,)) + + row = self.cursor.fetchone() + if row: + note_dict = { + 'note_id': row['note_id'], + 'case_id': row['case_id'], + 'timestamp': row['timestamp'], + 'content': row['content'], + 'investigator': row['investigator'], + 'question_tags': row['question_tags'].split('|') if row['question_tags'] else [] + } + return note_dict + else: + logging.warning(f"No note found with ID: {note_id}") + return None + + def get_notes(self, case_id): + """Get all notes for a case with their latest revision. + + Args: + case_id: Integer case ID + + Returns: + List of note dictionaries (see get_note for dictionary structure) + """ + with self.conn: + self.cursor.execute(""" + SELECT + n.note_id, + n.case_id, + n.created as timestamp, + i.name as investigator, + nr.content, + nr.revision_number, + GROUP_CONCAT(qd.question_text, '|') as question_tags + FROM note n + JOIN investigator i ON n.investigator_id = i.investigator_id + JOIN note_revision nr ON n.note_id = nr.note_id + LEFT JOIN note_question_tag nqt ON n.note_id = nqt.note_id + LEFT JOIN question_definition qd ON nqt.question_id = qd.question_id + WHERE n.case_id = ? + AND nr.revision_number = ( + SELECT MAX(revision_number) + FROM note_revision + WHERE note_id = n.note_id + ) + GROUP BY n.note_id + ORDER BY n.created DESC + """, (case_id,)) + + notes = [] + for row in self.cursor.fetchall(): + note_dict = { + 'note_id': row['note_id'], + 'case_id': row['case_id'], + 'timestamp': row['timestamp'], + 'content': row['content'], + 'investigator': row['investigator'], + 'question_tags': row['question_tags'].split('|') if row['question_tags'] else [] + } + notes.append(note_dict) + + return notes + + def update_note(self, note_id, content): + """Update a note by creating a new revision (immutable history). + + Args: + note_id: Integer note ID + content: New content text + + Returns: + bool indicating success + """ + with self.conn: + # Get current max revision number + self.cursor.execute(""" + SELECT MAX(revision_number) as max_rev + FROM note_revision + WHERE note_id = ? + """, (note_id,)) + + result = self.cursor.fetchone() + max_rev = result['max_rev'] if result else 0 + next_rev = max_rev + 1 + + # Insert new revision + self.cursor.execute(""" + INSERT INTO note_revision (note_id, content, revision_number) + VALUES (?, ?, ?) + """, (note_id, content, next_rev)) + + logging.info(f"Created revision {next_rev} for note {note_id}") + return True + + def get_note_history(self, note_id): + """Get all revisions of a note (revision history). + + Args: + note_id: Integer note ID + + Returns: + List of revision dictionaries with keys: + - revision_id: Integer revision ID + - note_id: Integer note ID + - content: Revision content + - timestamp: Revision creation timestamp + - revision_number: Sequential revision number + Ordered by revision_number DESC (newest first). + """ + with self.conn: + self.cursor.execute(""" + SELECT revision_id, note_id, content, timestamp, revision_number + FROM note_revision + WHERE note_id = ? + ORDER BY revision_number DESC + """, (note_id,)) + + return [dict(row) for row in self.cursor.fetchall()] + + def update_note_tags(self, note_id, question_tags): + """Update the question tags for a note. + + Args: + note_id: Integer note ID + question_tags: List of question tag strings + + Returns: + bool indicating success + """ + with self.conn: + # First, get the case_id for this note + self.cursor.execute("SELECT case_id FROM note WHERE note_id = ?", (note_id,)) + result = self.cursor.fetchone() + if not result: + logging.warning(f"Cannot update tags: note {note_id} not found") + return False + + case_id = result['case_id'] + + # Remove existing tags + self.cursor.execute("DELETE FROM note_question_tag WHERE note_id = ?", (note_id,)) + + # Add new tags + if question_tags: + for tag in question_tags: + question_id = self._get_or_create_question(case_id, tag) + self.cursor.execute(""" + INSERT INTO note_question_tag (note_id, question_id) + VALUES (?, ?) + """, (note_id, question_id)) + + logging.info(f"Updated tags for note {note_id}") + return True + + def delete_note(self, note_id): + """Delete a note and all its revisions. + + Args: + note_id: Integer note ID + + Returns: + bool indicating success + """ + with self.conn: + self.cursor.execute("DELETE FROM note WHERE note_id = ?", (note_id,)) + logging.info(f"Deleted note with ID: {note_id}") + return self.cursor.rowcount > 0 + + def search_notes(self, case_id=None, search_term=None, question_tags=None): + """Search notes with optional filters. + + Args: + case_id: Optional case ID filter + search_term: Optional text to search in note content + question_tags: Optional list of question tags to filter by + + Returns: + List of note dictionaries matching the criteria + """ + with self.conn: + query = """ + SELECT + n.note_id, + n.case_id, + n.created as timestamp, + i.name as investigator, + nr.content, + nr.revision_number, + GROUP_CONCAT(qd.question_text, '|') as question_tags + FROM note n + JOIN investigator i ON n.investigator_id = i.investigator_id + JOIN note_revision nr ON n.note_id = nr.note_id + LEFT JOIN note_question_tag nqt ON n.note_id = nqt.note_id + LEFT JOIN question_definition qd ON nqt.question_id = qd.question_id + WHERE nr.revision_number = ( + SELECT MAX(revision_number) + FROM note_revision + WHERE note_id = n.note_id + ) + """ + params = [] + + if case_id is not None: + query += " AND n.case_id = ?" + params.append(case_id) + + query += " GROUP BY n.note_id" + + if search_term: + query += " HAVING nr.content LIKE ?" + params.append(f"%{search_term}%") + + query += " ORDER BY n.created DESC" + + self.cursor.execute(query, params) + + notes = [] + for row in self.cursor.fetchall(): + note_dict = { + 'note_id': row['note_id'], + 'case_id': row['case_id'], + 'timestamp': row['timestamp'], + 'content': row['content'], + 'investigator': row['investigator'], + 'question_tags': row['question_tags'].split('|') if row['question_tags'] else [] + } + + # Filter by question tags if specified + if question_tags: + if any(tag in note_dict['question_tags'] for tag in question_tags): + notes.append(note_dict) + else: + notes.append(note_dict) + + return notes + + def close(self): + """Close the database connection.""" + if self.conn: + self.conn.close() + logging.debug("NoteManager database connection closed") diff --git a/src/forensictrails/db/database.py b/src/forensictrails/db/database.py index 471d066..e21222e 100644 --- a/src/forensictrails/db/database.py +++ b/src/forensictrails/db/database.py @@ -6,10 +6,20 @@ import os from ..utils.config import config def get_db_connection(db_path=None): + """Get a connection to the SQLite database. + + Args: + db_path: Optional path to database file. Uses config default if None. + + Returns: + sqlite3.Connection with Row factory and foreign keys enabled + """ if db_path is None: db_path = config.database_path conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row + # Enable foreign key constraints (not enabled by default in SQLite) + conn.execute("PRAGMA foreign_keys = ON") return conn def validate_database_schema(db_path): @@ -23,8 +33,9 @@ def validate_database_schema(db_path): tables = {row[0] for row in cursor.fetchall()} required_tables = { - 'cases', 'notes', 'evidence', 'chain_of_custody', - 'attachments', 'question_entries', 'users', 'tasks' + 'case', 'investigator', 'note', 'note_revision', 'evidence', + 'question_definition', 'investigator_case', 'note_question_tag', + 'chain_of_custody' } # Check if all required tables exist diff --git a/src/forensictrails/db/schema.sql b/src/forensictrails/db/schema.sql index 3194222..c39986c 100644 --- a/src/forensictrails/db/schema.sql +++ b/src/forensictrails/db/schema.sql @@ -1,107 +1,130 @@ --- Cases table -CREATE TABLE cases ( - case_id TEXT PRIMARY KEY, - case_title TEXT NOT NULL, - date_opened TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - investigator TEXT NOT NULL, - classification TEXT, - summary TEXT, - status TEXT DEFAULT 'active', - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +-- ForensicTrails Database Schema +-- SQLite Version +-- Based on normalized ER Diagram with immutable note revisions + +PRAGMA foreign_keys = ON; + +-- --- +-- Table: "case" +-- Represents a forensic investigation case +-- --- +CREATE TABLE IF NOT EXISTS "case" ( + case_id INTEGER PRIMARY KEY AUTOINCREMENT, + description TEXT, + status TEXT CHECK(status IN ('Open', 'Closed', 'Archived')) NOT NULL DEFAULT 'Open', + created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + closed TIMESTAMP DEFAULT NULL ); --- Notes table (append-only, immutable) -CREATE TABLE notes ( - note_id TEXT PRIMARY KEY, - case_id TEXT NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, +-- --- +-- Table: investigator +-- Represents forensic investigators +-- --- +CREATE TABLE IF NOT EXISTS investigator ( + investigator_id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL, + role TEXT DEFAULT NULL +); + +-- --- +-- Table: note +-- Represents investigation notes (immutable containers) +-- --- +CREATE TABLE IF NOT EXISTS note ( + note_id INTEGER PRIMARY KEY AUTOINCREMENT, + case_id INTEGER NOT NULL, + investigator_id INTEGER NOT NULL, + created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (case_id) REFERENCES "case"(case_id) ON DELETE CASCADE, + FOREIGN KEY (investigator_id) REFERENCES investigator(investigator_id) +); + +CREATE INDEX IF NOT EXISTS idx_note_case ON note(case_id); +CREATE INDEX IF NOT EXISTS idx_note_created ON note(created); + +-- --- +-- Table: note_revision +-- Represents versions/edits of notes (immutable history) +-- --- +CREATE TABLE IF NOT EXISTS note_revision ( + revision_id INTEGER PRIMARY KEY AUTOINCREMENT, + note_id INTEGER NOT NULL, content TEXT NOT NULL, - investigator TEXT NOT NULL, - question_tags TEXT, -- JSON array: ["WHO", "WHAT", etc.] - hash TEXT NOT NULL, -- SHA256 of content + timestamp - FOREIGN KEY (case_id) REFERENCES cases(case_id) + timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + revision_number INTEGER NOT NULL, + FOREIGN KEY (note_id) REFERENCES note(note_id) ON DELETE CASCADE ); --- Evidence table -CREATE TABLE evidence ( - evidence_id TEXT PRIMARY KEY, - case_id TEXT, - description TEXT NOT NULL, - filename TEXT, - file_size INTEGER, - md5_hash TEXT, - sha256_hash TEXT, - source_origin TEXT, - received_date DATE, - received_by TEXT, - physical_location TEXT, - notes TEXT, - status TEXT DEFAULT 'Active', - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (case_id) REFERENCES cases(case_id) +CREATE INDEX IF NOT EXISTS idx_revision_note ON note_revision(note_id); + +-- --- +-- Table: evidence +-- Represents physical or digital evidence items +-- --- +CREATE TABLE IF NOT EXISTS evidence ( + evidence_id INTEGER PRIMARY KEY AUTOINCREMENT, + case_id INTEGER NOT NULL, + description TEXT, + sha256_hash TEXT DEFAULT NULL, + FOREIGN KEY (case_id) REFERENCES "case"(case_id) ON DELETE CASCADE ); --- Chain of Custody table -CREATE TABLE chain_of_custody ( - coc_id TEXT PRIMARY KEY, - evidence_id TEXT NOT NULL, +CREATE INDEX IF NOT EXISTS idx_evidence_case ON evidence(case_id); + +-- --- +-- Table: question_definition +-- Represents configurable investigation questions per case +-- --- +CREATE TABLE IF NOT EXISTS question_definition ( + question_id INTEGER PRIMARY KEY AUTOINCREMENT, + case_id INTEGER NOT NULL, + question_text TEXT NOT NULL, + display_order INTEGER NOT NULL DEFAULT 0, + FOREIGN KEY (case_id) REFERENCES "case"(case_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_question_case ON question_definition(case_id); + +-- --- +-- Junction Table: investigator_case +-- Many-to-many: Investigators can work on multiple cases +-- --- +CREATE TABLE IF NOT EXISTS investigator_case ( + investigator_id INTEGER NOT NULL, + case_id INTEGER NOT NULL, + PRIMARY KEY (investigator_id, case_id), + FOREIGN KEY (investigator_id) REFERENCES investigator(investigator_id) ON DELETE CASCADE, + FOREIGN KEY (case_id) REFERENCES "case"(case_id) ON DELETE CASCADE +); + +-- --- +-- Junction Table: note_question_tag +-- Many-to-many: Notes can answer multiple questions +-- --- +CREATE TABLE IF NOT EXISTS note_question_tag ( + note_id INTEGER NOT NULL, + question_id INTEGER NOT NULL, + tagged_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (note_id, question_id), + FOREIGN KEY (note_id) REFERENCES note(note_id) ON DELETE CASCADE, + FOREIGN KEY (question_id) REFERENCES question_definition(question_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_note_question_question ON note_question_tag(question_id); + +-- --- +-- Chain of Custody table (kept for compatibility) +-- --- +CREATE TABLE IF NOT EXISTS chain_of_custody ( + coc_id INTEGER PRIMARY KEY AUTOINCREMENT, + evidence_id INTEGER NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - action TEXT NOT NULL, -- 'received', 'transferred', 'accessed', 'archived' + action TEXT NOT NULL, from_person TEXT, to_person TEXT, location TEXT, purpose TEXT, - signature_hash TEXT, -- Digital signature if needed - FOREIGN KEY (evidence_id) REFERENCES evidence(evidence_id) + signature_hash TEXT, + FOREIGN KEY (evidence_id) REFERENCES evidence(evidence_id) ON DELETE CASCADE ); - --- Attachments table (screenshots, documents) -CREATE TABLE attachments ( - attachment_id TEXT PRIMARY KEY, - case_id TEXT NOT NULL, - note_id TEXT, -- Optional link to specific note - filename TEXT NOT NULL, - file_path TEXT NOT NULL, - file_hash TEXT NOT NULL, - mime_type TEXT, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (case_id) REFERENCES cases(case_id), - FOREIGN KEY (note_id) REFERENCES notes(note_id) -); - --- Investigation Questions tracking -CREATE TABLE question_entries ( - entry_id TEXT PRIMARY KEY, - case_id TEXT NOT NULL, - note_id TEXT NOT NULL, - question_type TEXT NOT NULL, -- WHO/WHAT/WHEN/WHERE/HOW/WHY/WITH_WHAT - entry_text TEXT NOT NULL, - timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (case_id) REFERENCES cases(case_id), - FOREIGN KEY (note_id) REFERENCES notes(note_id) -); - --- User settings (for multi-user) -CREATE TABLE users ( - user_id TEXT PRIMARY KEY, - username TEXT UNIQUE NOT NULL, - full_name TEXT NOT NULL, - role TEXT DEFAULT 'Investigator', -- Investigator/Manager/Admin - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -); - --- Optional: Task assignments (team mode) -CREATE TABLE tasks ( - task_id TEXT PRIMARY KEY, - case_id TEXT NOT NULL, - title TEXT NOT NULL, - description TEXT, - assigned_to TEXT, - assigned_by TEXT, - priority TEXT, - due_date DATE, - status TEXT DEFAULT 'Open', - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (case_id) REFERENCES cases(case_id) -); \ No newline at end of file diff --git a/tests/test_case_manager.py b/tests/test_case_manager.py index 1ee8892..57ea92a 100644 --- a/tests/test_case_manager.py +++ b/tests/test_case_manager.py @@ -6,6 +6,7 @@ import sqlite3 from pathlib import Path from unittest.mock import patch from datetime import datetime +import time # Add the src directory to the path import sys @@ -47,43 +48,53 @@ class TestCaseManager(unittest.TestCase): def test_create_case(self): """Test creating a new case.""" - self.case_manager.create_case( - case_id='CASE-001', - case_title='Test Case', - investigator='Detective Smith', - classification='Homicide', - summary='Test summary' + case_id = self.case_manager.create_case( + description='Test homicide investigation', + investigator_name='Detective Smith', + investigator_role='Lead Detective' ) + self.assertIsNotNone(case_id) + self.assertIsInstance(case_id, int) + # Verify case was created - case = self.case_manager.get_case('CASE-001') + case = self.case_manager.get_case(case_id) self.assertIsNotNone(case) - self.assertEqual(case['case_id'], 'CASE-001') - self.assertEqual(case['case_title'], 'Test Case') - self.assertEqual(case['investigator'], 'Detective Smith') - self.assertEqual(case['classification'], 'Homicide') - self.assertEqual(case['summary'], 'Test summary') - self.assertEqual(case['status'], 'active') + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['case_id'], case_id) + self.assertEqual(case['description'], 'Test homicide investigation') + self.assertEqual(case['investigators'], 'Detective Smith') + self.assertEqual(case['status'], 'Open') - def test_create_case_minimal_fields(self): - """Test creating a case with only required fields.""" - self.case_manager.create_case( - case_id='CASE-002', - case_title='Minimal Case', - investigator='Detective Jones' + def test_create_case_minimal(self): + """Test creating a case with minimal fields.""" + case_id = self.case_manager.create_case( + description='Minimal case', + investigator_name='Detective Jones' ) - case = self.case_manager.get_case('CASE-002') + case = self.case_manager.get_case(case_id) self.assertIsNotNone(case) - self.assertEqual(case['case_id'], 'CASE-002') - self.assertEqual(case['case_title'], 'Minimal Case') - self.assertEqual(case['investigator'], 'Detective Jones') - self.assertIsNone(case['classification']) - self.assertIsNone(case['summary']) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['description'], 'Minimal case') + self.assertEqual(case['investigators'], 'Detective Jones') + self.assertEqual(case['status'], 'Open') + + def test_create_case_with_status(self): + """Test creating a case with specific status.""" + case_id = self.case_manager.create_case( + description='Archived case', + investigator_name='Detective Brown', + status='Archived' + ) + + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['status'], 'Archived') def test_get_case_nonexistent(self): """Test getting a non-existent case returns None.""" - case = self.case_manager.get_case('NONEXISTENT') + case = self.case_manager.get_case(99999) self.assertIsNone(case) def test_list_cases_empty(self): @@ -94,184 +105,178 @@ class TestCaseManager(unittest.TestCase): def test_list_cases_multiple(self): """Test listing multiple cases.""" # Create multiple cases - self.case_manager.create_case('CASE-001', 'Case One', 'Detective A') - self.case_manager.create_case('CASE-002', 'Case Two', 'Detective B') - self.case_manager.create_case('CASE-003', 'Case Three', 'Detective C') + case_id1 = self.case_manager.create_case('Case One', 'Detective A') + case_id2 = self.case_manager.create_case('Case Two', 'Detective B') + case_id3 = self.case_manager.create_case('Case Three', 'Detective C') cases = self.case_manager.list_cases() self.assertEqual(len(cases), 3) case_ids = [c['case_id'] for c in cases] - self.assertIn('CASE-001', case_ids) - self.assertIn('CASE-002', case_ids) - self.assertIn('CASE-003', case_ids) + self.assertIn(case_id1, case_ids) + self.assertIn(case_id2, case_ids) + self.assertIn(case_id3, case_ids) def test_list_cases_filter_by_status(self): """Test listing cases filtered by status.""" - self.case_manager.create_case('CASE-001', 'Active Case', 'Detective A') - self.case_manager.create_case('CASE-002', 'Another Active', 'Detective B') + case_id1 = self.case_manager.create_case('Active Case', 'Detective A') + case_id2 = self.case_manager.create_case('Another Active', 'Detective B') # Close one case - self.case_manager.close_case('CASE-001') + self.case_manager.close_case(case_id1) - # List only active cases - active_cases = self.case_manager.list_cases(status='active') - self.assertEqual(len(active_cases), 1) - self.assertEqual(active_cases[0]['case_id'], 'CASE-002') + # List only Open cases + open_cases = self.case_manager.list_cases(status='Open') + self.assertEqual(len(open_cases), 1) + self.assertEqual(open_cases[0]['case_id'], case_id2) - # List only closed cases - closed_cases = self.case_manager.list_cases(status='closed') + # List only Closed cases + closed_cases = self.case_manager.list_cases(status='Closed') self.assertEqual(len(closed_cases), 1) - self.assertEqual(closed_cases[0]['case_id'], 'CASE-001') + self.assertEqual(closed_cases[0]['case_id'], case_id1) def test_list_cases_search_term(self): """Test listing cases with search term.""" - self.case_manager.create_case('CASE-001', 'Murder Investigation', 'Detective Smith') - self.case_manager.create_case('CASE-002', 'Theft Case', 'Detective Jones') - self.case_manager.create_case('CASE-003', 'Murder Trial', 'Detective Brown') + self.case_manager.create_case('Murder investigation', 'Detective Smith') + self.case_manager.create_case('Theft case', 'Detective Jones') + self.case_manager.create_case('Murder trial', 'Detective Brown') - # Search by title + # Search by description results = self.case_manager.list_cases(search_term='Murder') self.assertEqual(len(results), 2) # Search by investigator results = self.case_manager.list_cases(search_term='Smith') self.assertEqual(len(results), 1) - self.assertEqual(results[0]['case_id'], 'CASE-001') - - # Search by case ID - results = self.case_manager.list_cases(search_term='002') - self.assertEqual(len(results), 1) - self.assertEqual(results[0]['case_id'], 'CASE-002') def test_list_cases_combined_filters(self): """Test listing cases with combined status and search filters.""" - self.case_manager.create_case('CASE-001', 'Active Murder', 'Detective A') - self.case_manager.create_case('CASE-002', 'Active Theft', 'Detective B') - self.case_manager.create_case('CASE-003', 'Closed Murder', 'Detective C') + case_id1 = self.case_manager.create_case('Active Murder', 'Detective A') + case_id2 = self.case_manager.create_case('Active Theft', 'Detective B') + case_id3 = self.case_manager.create_case('Closed Murder', 'Detective C') - self.case_manager.close_case('CASE-003') + self.case_manager.close_case(case_id3) - # Search for "Murder" in active cases only - results = self.case_manager.list_cases(status='active', search_term='Murder') + # Search for "Murder" in Open cases only + results = self.case_manager.list_cases(status='Open', search_term='Murder') self.assertEqual(len(results), 1) - self.assertEqual(results[0]['case_id'], 'CASE-001') + self.assertEqual(results[0]['case_id'], case_id1) - def test_update_case(self): - """Test updating a case.""" - self.case_manager.create_case('CASE-001', 'Original Title', 'Detective A') + def test_update_case_description(self): + """Test updating a case description.""" + case_id = self.case_manager.create_case('Original description', 'Detective A') result = self.case_manager.update_case( - 'CASE-001', - case_title='Updated Title', - classification='Homicide', - summary='Updated summary' + case_id, + description='Updated description' ) self.assertTrue(result) - case = self.case_manager.get_case('CASE-001') - self.assertEqual(case['case_title'], 'Updated Title') - self.assertEqual(case['classification'], 'Homicide') - self.assertEqual(case['summary'], 'Updated summary') - self.assertEqual(case['investigator'], 'Detective A') # Unchanged + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['description'], 'Updated description') - def test_update_case_invalid_fields(self): - """Test updating case with invalid fields ignores them.""" - self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') + def test_update_case_status(self): + """Test updating a case status.""" + case_id = self.case_manager.create_case('Test case', 'Detective A') - result = self.case_manager.update_case( - 'CASE-001', - case_title='Updated Title', - invalid_field='Should be ignored' - ) - - # Should still work, just ignoring invalid field + result = self.case_manager.update_case(case_id, status='Archived') self.assertTrue(result) - case = self.case_manager.get_case('CASE-001') - self.assertEqual(case['case_title'], 'Updated Title') - self.assertNotIn('invalid_field', case) + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['status'], 'Archived') - def test_update_case_no_valid_fields(self): - """Test updating case with no valid fields.""" - self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') + def test_update_case_both_fields(self): + """Test updating both description and status.""" + case_id = self.case_manager.create_case('Test case', 'Detective A') result = self.case_manager.update_case( - 'CASE-001', - invalid_field='Should be ignored' + case_id, + description='Updated description', + status='Closed' ) + self.assertTrue(result) - # Should return None since no valid fields - self.assertIsNone(result) + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['description'], 'Updated description') + self.assertEqual(case['status'], 'Closed') - def test_update_case_sets_modified_at(self): - """Test that updating a case sets modified_at timestamp.""" - self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') + def test_update_case_no_fields(self): + """Test updating case with no fields returns False.""" + case_id = self.case_manager.create_case('Test case', 'Detective A') - case_before = self.case_manager.get_case('CASE-001') - created_at = case_before['created_at'] + result = self.case_manager.update_case(case_id) + self.assertFalse(result) + + def test_update_case_sets_modified_timestamp(self): + """Test that updating a case sets modified timestamp.""" + case_id = self.case_manager.create_case('Test case', 'Detective A') - # Small delay to ensure different timestamp - import time - time.sleep(0.01) + case_before = self.case_manager.get_case(case_id) + assert case_before is not None # Type narrowing for Pylance + created = case_before['created'] - self.case_manager.update_case('CASE-001', case_title='Updated') + time.sleep(1) # Ensure timestamp difference - case_after = self.case_manager.get_case('CASE-001') - modified_at = case_after['modified_at'] + self.case_manager.update_case(case_id, description='Updated') - # modified_at should be different from created_at - self.assertNotEqual(created_at, modified_at) + case_after = self.case_manager.get_case(case_id) + assert case_after is not None # Type narrowing for Pylance + modified = case_after['modified'] + + # Modified should be different from created + self.assertNotEqual(created, modified) def test_update_nonexistent_case(self): """Test updating a non-existent case returns False.""" - result = self.case_manager.update_case( - 'NONEXISTENT', - case_title='Updated Title' - ) - + result = self.case_manager.update_case(99999, description='Updated') self.assertFalse(result) def test_close_case(self): """Test closing a case.""" - self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') + case_id = self.case_manager.create_case('Test case', 'Detective A') - result = self.case_manager.close_case('CASE-001') + result = self.case_manager.close_case(case_id) self.assertTrue(result) - case = self.case_manager.get_case('CASE-001') - self.assertEqual(case['status'], 'closed') + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['status'], 'Closed') + self.assertIsNotNone(case['closed']) def test_archive_case(self): """Test archiving a case.""" - self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') + case_id = self.case_manager.create_case('Test case', 'Detective A') - result = self.case_manager.archive_case('CASE-001') + result = self.case_manager.archive_case(case_id) self.assertTrue(result) - case = self.case_manager.get_case('CASE-001') - self.assertEqual(case['status'], 'archived') + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['status'], 'Archived') def test_delete_case(self): """Test deleting a case.""" - self.case_manager.create_case('CASE-001', 'Test Case', 'Detective A') + case_id = self.case_manager.create_case('Test case', 'Detective A') # Verify case exists - case = self.case_manager.get_case('CASE-001') + case = self.case_manager.get_case(case_id) self.assertIsNotNone(case) # Delete case - result = self.case_manager.delete_case('CASE-001') + result = self.case_manager.delete_case(case_id) self.assertTrue(result) # Verify case is gone - case = self.case_manager.get_case('CASE-001') + case = self.case_manager.get_case(case_id) self.assertIsNone(case) def test_delete_nonexistent_case(self): """Test deleting a non-existent case returns False.""" - result = self.case_manager.delete_case('NONEXISTENT') + result = self.case_manager.delete_case(99999) self.assertFalse(result) def test_case_manager_with_default_db_path(self): @@ -282,6 +287,19 @@ class TestCaseManager(unittest.TestCase): cm = CaseManager() self.assertEqual(cm.db_path, self.test_db_path) cm.conn.close() + + def test_multiple_investigators_on_case(self): + """Test adding multiple investigators to a case.""" + case_id1 = self.case_manager.create_case('Case 1', 'Detective Smith') + case_id2 = self.case_manager.create_case('Case 2', 'Detective Smith') + + # Verify investigator is reused + case1 = self.case_manager.get_case(case_id1) + case2 = self.case_manager.get_case(case_id2) + assert case1 is not None and case2 is not None # Type narrowing for Pylance + + self.assertEqual(case1['investigators'], 'Detective Smith') + self.assertEqual(case2['investigators'], 'Detective Smith') class TestCaseManagerIntegration(unittest.TestCase): @@ -308,70 +326,69 @@ class TestCaseManagerIntegration(unittest.TestCase): def test_full_case_lifecycle(self): """Test complete case lifecycle: create, update, close, archive, delete.""" # Create case - self.case_manager.create_case( - 'CASE-001', - 'Investigation Case', - 'Detective Smith', - 'Robbery', - 'Initial summary' + case_id = self.case_manager.create_case( + 'Investigation case', + 'Detective Smith' ) # Verify creation - case = self.case_manager.get_case('CASE-001') - self.assertEqual(case['status'], 'active') + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['status'], 'Open') # Update case - self.case_manager.update_case( - 'CASE-001', - summary='Updated with new findings' - ) - case = self.case_manager.get_case('CASE-001') - self.assertEqual(case['summary'], 'Updated with new findings') + self.case_manager.update_case(case_id, description='Updated findings') + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['description'], 'Updated findings') # Close case - self.case_manager.close_case('CASE-001') - case = self.case_manager.get_case('CASE-001') - self.assertEqual(case['status'], 'closed') + self.case_manager.close_case(case_id) + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['status'], 'Closed') # Archive case - self.case_manager.archive_case('CASE-001') - case = self.case_manager.get_case('CASE-001') - self.assertEqual(case['status'], 'archived') + self.case_manager.archive_case(case_id) + case = self.case_manager.get_case(case_id) + assert case is not None # Type narrowing for Pylance + self.assertEqual(case['status'], 'Archived') # Delete case - self.case_manager.delete_case('CASE-001') - case = self.case_manager.get_case('CASE-001') + self.case_manager.delete_case(case_id) + case = self.case_manager.get_case(case_id) self.assertIsNone(case) def test_multiple_cases_workflow(self): """Test working with multiple cases simultaneously.""" # Create multiple cases + case_ids = [] for i in range(1, 6): - self.case_manager.create_case( - f'CASE-{i:03d}', + case_id = self.case_manager.create_case( f'Case {i}', f'Detective {chr(64+i)}' # Detective A, B, C, etc. ) + case_ids.append(case_id) # Verify all created all_cases = self.case_manager.list_cases() self.assertEqual(len(all_cases), 5) # Close some cases - self.case_manager.close_case('CASE-001') - self.case_manager.close_case('CASE-003') + self.case_manager.close_case(case_ids[0]) + self.case_manager.close_case(case_ids[2]) # Archive one - self.case_manager.archive_case('CASE-005') + self.case_manager.archive_case(case_ids[4]) # Check status distribution - active_cases = self.case_manager.list_cases(status='active') - closed_cases = self.case_manager.list_cases(status='closed') - archived_cases = self.case_manager.list_cases(status='archived') + open_cases = self.case_manager.list_cases(status='Open') + closed_cases = self.case_manager.list_cases(status='Closed') + archived_cases = self.case_manager.list_cases(status='Archived') - self.assertEqual(len(active_cases), 2) # 002, 004 - self.assertEqual(len(closed_cases), 2) # 001, 003 - self.assertEqual(len(archived_cases), 1) # 005 + self.assertEqual(len(open_cases), 2) # case_ids[1], case_ids[3] + self.assertEqual(len(closed_cases), 2) # case_ids[0], case_ids[2] + self.assertEqual(len(archived_cases), 1) # case_ids[4] if __name__ == '__main__': diff --git a/tests/test_database.py b/tests/test_database.py index f1e608b..b111f2a 100644 --- a/tests/test_database.py +++ b/tests/test_database.py @@ -78,15 +78,15 @@ class TestValidateDatabaseSchema(unittest.TestCase): # Create only some of the required tables cursor.execute(""" - CREATE TABLE cases ( - case_id TEXT PRIMARY KEY, - case_title TEXT NOT NULL + CREATE TABLE "case" ( + case_id INTEGER PRIMARY KEY, + description TEXT ) """) cursor.execute(""" - CREATE TABLE notes ( - note_id TEXT PRIMARY KEY, - case_id TEXT NOT NULL + CREATE TABLE note ( + note_id INTEGER PRIMARY KEY, + case_id INTEGER NOT NULL ) """) conn.commit() @@ -143,11 +143,13 @@ class TestCreateFreshDatabase(unittest.TestCase): conn.close() required_tables = { - 'cases', 'notes', 'evidence', 'chain_of_custody', - 'attachments', 'question_entries', 'users', 'tasks' + 'case', 'note', 'note_revision', 'investigator', + 'evidence', 'chain_of_custody', 'question_definition', + 'note_question_tag', 'investigator_case' } - self.assertEqual(tables, required_tables) + # sqlite_sequence is automatically created by SQLite for AUTOINCREMENT + self.assertTrue(required_tables.issubset(tables)) def test_create_fresh_database_returns_path(self): """Test that create_fresh_database returns the database path.""" @@ -170,10 +172,12 @@ class TestCreateFreshDatabase(unittest.TestCase): conn.close() required_tables = { - 'cases', 'notes', 'evidence', 'chain_of_custody', - 'attachments', 'question_entries', 'users', 'tasks' + 'case', 'note', 'note_revision', 'investigator', + 'evidence', 'chain_of_custody', 'question_definition', + 'note_question_tag', 'investigator_case' } - self.assertEqual(tables, required_tables) + # sqlite_sequence is automatically created by SQLite for AUTOINCREMENT + self.assertTrue(required_tables.issubset(tables)) class TestInitializeDatabase(unittest.TestCase): @@ -219,10 +223,11 @@ class TestInitializeDatabase(unittest.TestCase): conn = sqlite3.connect(self.test_db_path) cursor = conn.cursor() cursor.execute(""" - INSERT INTO cases (case_id, case_title, investigator) - VALUES ('TEST-001', 'Test Case', 'Test Investigator') + INSERT INTO "case" (description, status) + VALUES ('Test Case', 'Open') """) conn.commit() + case_id = cursor.lastrowid conn.close() # Initialize again @@ -231,12 +236,12 @@ class TestInitializeDatabase(unittest.TestCase): # Verify data still exists conn = sqlite3.connect(self.test_db_path) cursor = conn.cursor() - cursor.execute("SELECT case_id FROM cases WHERE case_id = 'TEST-001'") + cursor.execute("SELECT case_id FROM \"case\" WHERE case_id = ?", (case_id,)) result = cursor.fetchone() conn.close() self.assertIsNotNone(result) - self.assertEqual(result[0], 'TEST-001') + self.assertEqual(result[0], case_id) @patch('forensictrails.db.database.config') def test_initialize_database_recreates_invalid_database(self, mock_config): @@ -248,8 +253,8 @@ class TestInitializeDatabase(unittest.TestCase): # Create an incomplete database conn = sqlite3.connect(self.test_db_path) cursor = conn.cursor() - cursor.execute("CREATE TABLE cases (case_id TEXT PRIMARY KEY)") - cursor.execute("INSERT INTO cases VALUES ('TEST-001')") + cursor.execute("CREATE TABLE \"case\" (case_id INTEGER PRIMARY KEY)") + cursor.execute("INSERT INTO \"case\" (case_id) VALUES (999)") conn.commit() conn.close() @@ -261,7 +266,7 @@ class TestInitializeDatabase(unittest.TestCase): conn = sqlite3.connect(self.test_db_path) cursor = conn.cursor() - cursor.execute("SELECT case_id FROM cases WHERE case_id = 'TEST-001'") + cursor.execute("SELECT case_id FROM \"case\" WHERE case_id = 999") result = cursor.fetchone() conn.close() @@ -293,9 +298,9 @@ class TestShowDbSchema(unittest.TestCase): # Verify that logging.debug was called self.assertTrue(mock_logging.debug.called) - # Check that it was called for each table (8 tables + 1 header message) - # Should be at least 9 calls (header + 8 tables) - self.assertGreaterEqual(mock_logging.debug.call_count, 9) + # Check that it was called for each table (9 tables + 1 header message) + # Should be at least 10 calls (header + 9 tables) + self.assertGreaterEqual(mock_logging.debug.call_count, 10) def test_show_db_schema_doesnt_raise_exception(self): """Test that show_db_schema handles execution without raising exceptions.""" @@ -338,12 +343,13 @@ class TestDatabaseIntegration(unittest.TestCase): conn = get_db_connection(self.test_db_path) cursor = conn.cursor() cursor.execute(""" - INSERT INTO cases (case_id, case_title, investigator) - VALUES ('CASE-001', 'Murder Investigation', 'Detective Smith') + INSERT INTO "case" (description, status) + VALUES ('Murder Investigation', 'Open') """) + case_id = cursor.lastrowid cursor.execute(""" - INSERT INTO users (user_id, username, full_name) - VALUES ('USER-001', 'dsmith', 'Detective Smith') + INSERT INTO investigator (name, role) + VALUES ('Detective Smith', 'Lead Investigator') """) conn.commit() conn.close() @@ -351,15 +357,15 @@ class TestDatabaseIntegration(unittest.TestCase): # Step 3: Verify data exists conn = get_db_connection(self.test_db_path) cursor = conn.cursor() - cursor.execute("SELECT case_title FROM cases WHERE case_id = 'CASE-001'") + cursor.execute("SELECT description FROM \"case\" WHERE case_id = ?", (case_id,)) result = cursor.fetchone() - self.assertEqual(result['case_title'], 'Murder Investigation') + self.assertEqual(result['description'], 'Murder Investigation') conn.close() # Step 4: Corrupt database (remove a required table) conn = sqlite3.connect(self.test_db_path) cursor = conn.cursor() - cursor.execute("DROP TABLE users") + cursor.execute("DROP TABLE investigator") conn.commit() conn.close() @@ -373,7 +379,7 @@ class TestDatabaseIntegration(unittest.TestCase): self.assertTrue(validate_database_schema(self.test_db_path)) conn = get_db_connection(self.test_db_path) cursor = conn.cursor() - cursor.execute("SELECT case_id FROM cases WHERE case_id = 'CASE-001'") + cursor.execute("SELECT case_id FROM \"case\" WHERE case_id = ?", (case_id,)) result = cursor.fetchone() self.assertIsNone(result) conn.close() diff --git a/tests/test_note_manager.py b/tests/test_note_manager.py new file mode 100644 index 0000000..d3c3b8a --- /dev/null +++ b/tests/test_note_manager.py @@ -0,0 +1,524 @@ +"""Unit tests for the note_manager module.""" +import unittest +import tempfile +import os +import sqlite3 +from pathlib import Path +from unittest.mock import patch +from datetime import datetime +import time + +# Add the src directory to the path +import sys +sys.path.insert(0, str(Path(__file__).parent.parent / 'src')) + +from forensictrails.core.note_manager import NoteManager +from forensictrails.core.case_manager import CaseManager +from forensictrails.db.database import create_fresh_database + + +class TestNoteManager(unittest.TestCase): + """Test cases for NoteManager class.""" + + def setUp(self): + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + self.test_db_path = os.path.join(self.temp_dir, 'test.db') + self.schema_path = Path(__file__).parent.parent / 'src' / 'forensictrails' / 'db' / 'schema.sql' + + # Create a fresh database for testing + create_fresh_database(self.test_db_path, self.schema_path) + + # Create manager instances + self.case_manager = CaseManager(self.test_db_path) + self.note_manager = NoteManager(self.test_db_path) + + # Create a test case for notes + self.case_id = self.case_manager.create_case( + description='Test Case for Notes', + investigator_name='Detective Smith' + ) + + def tearDown(self): + """Clean up test fixtures.""" + if hasattr(self, 'note_manager') and hasattr(self.note_manager, 'conn'): + self.note_manager.conn.close() + if hasattr(self, 'case_manager') and hasattr(self.case_manager, 'conn'): + self.case_manager.conn.close() + + if os.path.exists(self.test_db_path): + os.remove(self.test_db_path) + os.rmdir(self.temp_dir) + + def test_note_manager_initialization(self): + """Test NoteManager initializes correctly.""" + self.assertEqual(self.note_manager.db_path, self.test_db_path) + self.assertIsNotNone(self.note_manager.conn) + self.assertIsNotNone(self.note_manager.cursor) + + def test_create_note_basic(self): + """Test creating a basic note.""" + note_id = self.note_manager.create_note( + case_id=self.case_id, + content='This is a test note.', + investigator='Detective Smith' + ) + + self.assertIsNotNone(note_id) + self.assertIsInstance(note_id, int) + + # Verify note was created + note = self.note_manager.get_note(note_id) + self.assertIsNotNone(note) + assert note is not None # Type narrowing for Pylance + self.assertEqual(note['note_id'], note_id) + self.assertEqual(note['case_id'], self.case_id) + self.assertEqual(note['content'], 'This is a test note.') + self.assertEqual(note['investigator'], 'Detective Smith') + self.assertEqual(note['question_tags'], []) + + def test_create_note_with_tags(self): + """Test creating a note with question tags.""" + note_id = self.note_manager.create_note( + case_id=self.case_id, + content='Who was at the scene?', + investigator='Detective Jones', + question_tags=['WHO', 'WHEN'] + ) + + note = self.note_manager.get_note(note_id) + self.assertIsNotNone(note) + assert note is not None # Type narrowing for Pylance + self.assertIn('WHO', note['question_tags']) + self.assertIn('WHEN', note['question_tags']) + + def test_get_note_nonexistent(self): + """Test getting a non-existent note returns None.""" + note = self.note_manager.get_note(99999) + self.assertIsNone(note) + + def test_get_notes_empty(self): + """Test getting notes for a case with no notes.""" + notes = self.note_manager.get_notes(self.case_id) + self.assertEqual(notes, []) + + def test_get_notes_multiple(self): + """Test getting multiple notes for a case.""" + # Create multiple notes + note_id1 = self.note_manager.create_note( + self.case_id, 'First note', 'Detective A' + ) + time.sleep(0.01) # Ensure different timestamps + note_id2 = self.note_manager.create_note( + self.case_id, 'Second note', 'Detective B' + ) + time.sleep(0.01) + note_id3 = self.note_manager.create_note( + self.case_id, 'Third note', 'Detective C' + ) + + notes = self.note_manager.get_notes(self.case_id) + + self.assertEqual(len(notes), 3) + note_ids = [n['note_id'] for n in notes] + self.assertIn(note_id1, note_ids) + self.assertIn(note_id2, note_ids) + self.assertIn(note_id3, note_ids) + + # Verify they're ordered by creation time (DESC) + # Check that timestamps are in descending order + timestamps = [n['timestamp'] for n in notes] + self.assertEqual(timestamps, sorted(timestamps, reverse=True)) + + def test_update_note_creates_revision(self): + """Test that updating a note creates a new revision.""" + note_id = self.note_manager.create_note( + self.case_id, 'Original content', 'Detective Smith' + ) + + # Update the note + result = self.note_manager.update_note(note_id, 'Updated content') + self.assertTrue(result) + + # Verify the note now shows updated content + note = self.note_manager.get_note(note_id) + assert note is not None # Type narrowing for Pylance + self.assertEqual(note['content'], 'Updated content') + + # Verify history exists + history = self.note_manager.get_note_history(note_id) + self.assertEqual(len(history), 2) + self.assertEqual(history[0]['content'], 'Updated content') # Newest first + self.assertEqual(history[0]['revision_number'], 2) + self.assertEqual(history[1]['content'], 'Original content') + self.assertEqual(history[1]['revision_number'], 1) + + def test_update_note_multiple_times(self): + """Test updating a note multiple times.""" + note_id = self.note_manager.create_note( + self.case_id, 'Version 1', 'Detective Smith' + ) + + self.note_manager.update_note(note_id, 'Version 2') + self.note_manager.update_note(note_id, 'Version 3') + self.note_manager.update_note(note_id, 'Version 4') + + # Verify latest version + note = self.note_manager.get_note(note_id) + assert note is not None # Type narrowing for Pylance + self.assertEqual(note['content'], 'Version 4') + + # Verify all revisions exist + history = self.note_manager.get_note_history(note_id) + self.assertEqual(len(history), 4) + for i, rev in enumerate(history): + self.assertEqual(rev['revision_number'], 4 - i) + + def test_get_note_history_empty(self): + """Test getting history for non-existent note.""" + history = self.note_manager.get_note_history(99999) + self.assertEqual(history, []) + + def test_update_note_tags(self): + """Test updating question tags for a note.""" + note_id = self.note_manager.create_note( + self.case_id, 'Test note', 'Detective Smith', + question_tags=['WHO'] + ) + + # Verify initial tags + note = self.note_manager.get_note(note_id) + assert note is not None # Type narrowing for Pylance + self.assertEqual(note['question_tags'], ['WHO']) + + # Update tags + result = self.note_manager.update_note_tags(note_id, ['WHAT', 'WHEN', 'WHERE']) + self.assertTrue(result) + + # Verify updated tags + note = self.note_manager.get_note(note_id) + assert note is not None # Type narrowing for Pylance + self.assertIn('WHAT', note['question_tags']) + self.assertIn('WHEN', note['question_tags']) + self.assertIn('WHERE', note['question_tags']) + self.assertNotIn('WHO', note['question_tags']) + + def test_update_note_tags_remove_all(self): + """Test removing all tags from a note.""" + note_id = self.note_manager.create_note( + self.case_id, 'Test note', 'Detective Smith', + question_tags=['WHO', 'WHAT'] + ) + + # Remove all tags + result = self.note_manager.update_note_tags(note_id, []) + self.assertTrue(result) + + # Verify no tags + note = self.note_manager.get_note(note_id) + assert note is not None # Type narrowing for Pylance + self.assertEqual(note['question_tags'], []) + + def test_update_note_tags_nonexistent_note(self): + """Test updating tags for non-existent note.""" + result = self.note_manager.update_note_tags(99999, ['WHO']) + self.assertFalse(result) + + def test_delete_note(self): + """Test deleting a note.""" + note_id = self.note_manager.create_note( + self.case_id, 'Test note', 'Detective Smith' + ) + + # Verify note exists + note = self.note_manager.get_note(note_id) + self.assertIsNotNone(note) + + # Delete note + result = self.note_manager.delete_note(note_id) + self.assertTrue(result) + + # Verify note is gone + note = self.note_manager.get_note(note_id) + self.assertIsNone(note) + + def test_delete_note_deletes_revisions(self): + """Test that deleting a note also deletes all revisions.""" + note_id = self.note_manager.create_note( + self.case_id, 'Original', 'Detective Smith' + ) + self.note_manager.update_note(note_id, 'Updated') + self.note_manager.update_note(note_id, 'Updated again') + + # Delete note + self.note_manager.delete_note(note_id) + + # Verify revisions are gone + history = self.note_manager.get_note_history(note_id) + self.assertEqual(history, []) + + def test_delete_nonexistent_note(self): + """Test deleting a non-existent note returns False.""" + result = self.note_manager.delete_note(99999) + self.assertFalse(result) + + def test_search_notes_by_case(self): + """Test searching notes by case ID.""" + # Create second case + case_id2 = self.case_manager.create_case( + description='Second Case', + investigator_name='Detective Jones' + ) + + # Create notes in both cases + self.note_manager.create_note(self.case_id, 'Case 1 Note 1', 'Detective A') + self.note_manager.create_note(self.case_id, 'Case 1 Note 2', 'Detective B') + self.note_manager.create_note(case_id2, 'Case 2 Note 1', 'Detective C') + + # Search by case + case1_notes = self.note_manager.search_notes(case_id=self.case_id) + case2_notes = self.note_manager.search_notes(case_id=case_id2) + + self.assertEqual(len(case1_notes), 2) + self.assertEqual(len(case2_notes), 1) + + def test_search_notes_by_content(self): + """Test searching notes by content text.""" + self.note_manager.create_note(self.case_id, 'This is about murder', 'Detective A') + self.note_manager.create_note(self.case_id, 'This is about theft', 'Detective B') + self.note_manager.create_note(self.case_id, 'Another murder note', 'Detective C') + + # Search for "murder" + results = self.note_manager.search_notes(case_id=self.case_id, search_term='murder') + self.assertEqual(len(results), 2) + + # Search for "theft" + results = self.note_manager.search_notes(case_id=self.case_id, search_term='theft') + self.assertEqual(len(results), 1) + + def test_search_notes_by_question_tags(self): + """Test searching notes by question tags.""" + self.note_manager.create_note( + self.case_id, 'Note 1', 'Detective A', question_tags=['WHO', 'WHAT'] + ) + self.note_manager.create_note( + self.case_id, 'Note 2', 'Detective B', question_tags=['WHEN', 'WHERE'] + ) + self.note_manager.create_note( + self.case_id, 'Note 3', 'Detective C', question_tags=['WHO', 'WHEN'] + ) + + # Search for WHO tags + results = self.note_manager.search_notes(case_id=self.case_id, question_tags=['WHO']) + self.assertEqual(len(results), 2) + + # Search for WHERE tags + results = self.note_manager.search_notes(case_id=self.case_id, question_tags=['WHERE']) + self.assertEqual(len(results), 1) + + def test_search_notes_combined_filters(self): + """Test searching notes with multiple filters.""" + self.note_manager.create_note( + self.case_id, 'Murder investigation note', 'Detective A', question_tags=['WHO'] + ) + self.note_manager.create_note( + self.case_id, 'Theft investigation note', 'Detective B', question_tags=['WHO'] + ) + self.note_manager.create_note( + self.case_id, 'Murder witness statement', 'Detective C', question_tags=['WHAT'] + ) + + # Search for "murder" with WHO tag + results = self.note_manager.search_notes( + case_id=self.case_id, + search_term='murder', + question_tags=['WHO'] + ) + self.assertEqual(len(results), 1) + self.assertIn('Murder investigation', results[0]['content']) + + def test_search_notes_all_cases(self): + """Test searching notes across all cases.""" + case_id2 = self.case_manager.create_case( + description='Second Case', + investigator_name='Detective Jones' + ) + + self.note_manager.create_note(self.case_id, 'Case 1 evidence', 'Detective A') + self.note_manager.create_note(case_id2, 'Case 2 evidence', 'Detective B') + + # Search without case_id filter + results = self.note_manager.search_notes(search_term='evidence') + self.assertEqual(len(results), 2) + + def test_investigator_reuse(self): + """Test that investigators are reused across notes.""" + note_id1 = self.note_manager.create_note( + self.case_id, 'Note 1', 'Detective Smith' + ) + note_id2 = self.note_manager.create_note( + self.case_id, 'Note 2', 'Detective Smith' + ) + + # Both notes should reference the same investigator + note1 = self.note_manager.get_note(note_id1) + note2 = self.note_manager.get_note(note_id2) + assert note1 is not None and note2 is not None # Type narrowing for Pylance + + self.assertEqual(note1['investigator'], note2['investigator']) + + def test_question_definition_reuse(self): + """Test that question definitions are reused within a case.""" + note_id1 = self.note_manager.create_note( + self.case_id, 'Note 1', 'Detective A', question_tags=['WHO'] + ) + note_id2 = self.note_manager.create_note( + self.case_id, 'Note 2', 'Detective B', question_tags=['WHO'] + ) + + # Both notes should use the same question definition + # Verify by checking the database directly + cursor = self.note_manager.cursor + cursor.execute(""" + SELECT COUNT(DISTINCT question_id) as count + FROM question_definition + WHERE case_id = ? AND question_text = 'WHO' + """, (self.case_id,)) + + result = cursor.fetchone() + self.assertEqual(result['count'], 1) + + def test_note_manager_with_default_db_path(self): + """Test NoteManager uses config default when no path provided.""" + with patch('forensictrails.core.note_manager.config') as mock_config: + mock_config.database_path = self.test_db_path + + nm = NoteManager() + self.assertEqual(nm.db_path, self.test_db_path) + nm.conn.close() + + +class TestNoteManagerIntegration(unittest.TestCase): + """Integration tests for NoteManager.""" + + def setUp(self): + """Set up test fixtures.""" + self.temp_dir = tempfile.mkdtemp() + self.test_db_path = os.path.join(self.temp_dir, 'integration.db') + self.schema_path = Path(__file__).parent.parent / 'src' / 'forensictrails' / 'db' / 'schema.sql' + + create_fresh_database(self.test_db_path, self.schema_path) + + self.case_manager = CaseManager(self.test_db_path) + self.note_manager = NoteManager(self.test_db_path) + + self.case_id = self.case_manager.create_case( + description='Integration Test Case', + investigator_name='Detective Smith' + ) + + def tearDown(self): + """Clean up test fixtures.""" + if hasattr(self, 'note_manager') and hasattr(self.note_manager, 'conn'): + self.note_manager.conn.close() + if hasattr(self, 'case_manager') and hasattr(self.case_manager, 'conn'): + self.case_manager.conn.close() + + if os.path.exists(self.test_db_path): + os.remove(self.test_db_path) + os.rmdir(self.temp_dir) + + def test_full_note_lifecycle(self): + """Test complete note lifecycle: create, update, tag, delete.""" + # Create note + note_id = self.note_manager.create_note( + self.case_id, + 'Initial observation', + 'Detective Smith', + question_tags=['WHAT'] + ) + + # Verify creation + note = self.note_manager.get_note(note_id) + assert note is not None # Type narrowing for Pylance + self.assertEqual(note['content'], 'Initial observation') + self.assertEqual(note['question_tags'], ['WHAT']) + + # Update content + self.note_manager.update_note(note_id, 'Updated observation with more details') + note = self.note_manager.get_note(note_id) + assert note is not None # Type narrowing for Pylance + self.assertEqual(note['content'], 'Updated observation with more details') + + # Update tags + self.note_manager.update_note_tags(note_id, ['WHAT', 'WHERE', 'WHEN']) + note = self.note_manager.get_note(note_id) + assert note is not None # Type narrowing for Pylance + self.assertEqual(len(note['question_tags']), 3) + + # Verify history + history = self.note_manager.get_note_history(note_id) + self.assertEqual(len(history), 2) + + # Delete note + self.note_manager.delete_note(note_id) + note = self.note_manager.get_note(note_id) + self.assertIsNone(note) + + def test_case_with_multiple_notes_and_tags(self): + """Test a case with multiple notes and complex tagging.""" + # Create various notes with different tags + notes_data = [ + ('Suspect identified as John Doe', ['WHO']), + ('Crime occurred at 123 Main St', ['WHERE']), + ('Incident happened on Jan 15, 2025', ['WHEN']), + ('Weapon used was a knife', ['WITH_WHAT', 'WHAT']), + ('Motive appears to be robbery', ['WHY', 'WHAT']), + ] + + note_ids = [] + for content, tags in notes_data: + note_id = self.note_manager.create_note( + self.case_id, content, 'Detective Smith', question_tags=tags + ) + note_ids.append(note_id) + + # Verify all notes created + all_notes = self.note_manager.get_notes(self.case_id) + self.assertEqual(len(all_notes), 5) + + # Search by specific tags + who_notes = self.note_manager.search_notes(case_id=self.case_id, question_tags=['WHO']) + self.assertEqual(len(who_notes), 1) + + what_notes = self.note_manager.search_notes(case_id=self.case_id, question_tags=['WHAT']) + # Should find notes with WHAT tag: 'Weapon used was a knife' and 'Motive appears to be robbery' + self.assertEqual(len(what_notes), 2) + + def test_note_revision_immutability(self): + """Test that note revisions are truly immutable.""" + note_id = self.note_manager.create_note( + self.case_id, 'Version 1', 'Detective Smith' + ) + + # Create multiple revisions + for i in range(2, 6): + self.note_manager.update_note(note_id, f'Version {i}') + + # Get full history + history = self.note_manager.get_note_history(note_id) + + # Verify all revisions are preserved + self.assertEqual(len(history), 5) + + # Verify each revision has correct content + for i, revision in enumerate(reversed(history)): + self.assertEqual(revision['revision_number'], i + 1) + self.assertEqual(revision['content'], f'Version {i + 1}') + + # Verify timestamp exists + self.assertIsNotNone(revision['timestamp']) + + +if __name__ == '__main__': + unittest.main()