diff --git a/.gitignore b/.gitignore index 0dbf2f2..0940497 100644 --- a/.gitignore +++ b/.gitignore @@ -168,3 +168,5 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ +History +firefox_places.sqlite diff --git a/browser2timesketch.py b/browser2timesketch.py index cd081b1..a1baa57 100755 --- a/browser2timesketch.py +++ b/browser2timesketch.py @@ -1,16 +1,16 @@ #!/usr/bin/env python3 """ -Browser History to Timesketch CSV Converter +Browser History to Timesketch CSV Converter - Enhanced Edition -Converts browser history from major browser engines to Timesketch-compatible CSV format. -Supports: Gecko (Firefox), Chromium (Chrome/Edge/Brave/etc.), WebKit (Safari) +Converts ALL timestamped browser events to Timesketch-compatible CSV format. +Includes: visits, downloads, bookmarks, annotations, engagement data, and more. """ import sqlite3 import csv import argparse import sys -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Tuple, Optional, Dict, Any, List @@ -52,7 +52,6 @@ def validate_sqlite_database(db_path: str) -> None: try: conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True) cursor = conn.cursor() - # Check if it's a valid SQLite database by querying sqlite_master cursor.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 1") conn.close() except sqlite3.DatabaseError as e: @@ -78,30 +77,22 @@ def detect_browser_type(db_path: str) -> str: conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True) cursor = conn.cursor() - # Get all table names cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = {row[0] for row in cursor.fetchall()} conn.close() - # Check for Gecko/Firefox tables if 'moz_historyvisits' in tables and 'moz_places' in tables: return 'gecko' - # Check for Chromium tables if 'visits' in tables and 'urls' in tables: return 'chromium' - # Check for WebKit/Safari tables if 'history_visits' in tables and 'history_items' in tables: return 'webkit' raise BrowserDetectionError( - f"Cannot determine browser type. Found tables: {', '.join(sorted(tables))}\n" - f"Expected one of:\n" - f" - Gecko/Firefox: moz_historyvisits, moz_places\n" - f" - Chromium: visits, urls\n" - f" - WebKit/Safari: history_visits, history_items" + f"Cannot determine browser type. Found tables: {', '.join(sorted(tables))}" ) except sqlite3.Error as e: @@ -120,12 +111,10 @@ def validate_timestamp(unix_microseconds: int, browser_type: str) -> None: TimestampValidationError: If timestamp is unreasonable """ if unix_microseconds <= 0: - return # Allow 0 for missing timestamps + return - # Convert to seconds for validation timestamp_seconds = unix_microseconds / 1000000 - # Check if timestamp is reasonable (between 1990 and 2040) min_date = datetime(1990, 1, 1) max_date = datetime(2040, 1, 1) min_seconds = min_date.timestamp() @@ -147,54 +136,26 @@ def validate_timestamp(unix_microseconds: int, browser_type: str) -> None: def convert_gecko_timestamp(gecko_timestamp: Optional[int]) -> Tuple[int, str]: - """ - Convert Gecko/Firefox timestamp (microseconds since Unix epoch) to ISO format. - Firefox stores timestamps as microseconds since 1970-01-01 00:00:00 UTC. - - Args: - gecko_timestamp: Gecko timestamp in microseconds - - Returns: - tuple: (microseconds, ISO formatted datetime string) - """ + """Convert Gecko/Firefox timestamp to Unix microseconds and ISO format.""" if gecko_timestamp is None or gecko_timestamp == 0: return 0, "" - # Validate validate_timestamp(gecko_timestamp, "Gecko/Firefox") - # Convert microseconds to seconds timestamp_seconds = gecko_timestamp / 1000000 dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc) return gecko_timestamp, dt.strftime('%Y-%m-%dT%H:%M:%S+00:00') def convert_chromium_timestamp(chromium_timestamp: Optional[int]) -> Tuple[int, str]: - """ - Convert Chromium timestamp to Unix microseconds and ISO format. - Chromium stores timestamps as microseconds since 1601-01-01 00:00:00 UTC (Windows epoch). - - Args: - chromium_timestamp: Chromium timestamp in microseconds since 1601 - - Returns: - tuple: (Unix microseconds, ISO formatted datetime string) - """ + """Convert Chromium timestamp to Unix microseconds and ISO format.""" if chromium_timestamp is None or chromium_timestamp == 0: return 0, "" - # Chromium epoch: January 1, 1601 - # Unix epoch: January 1, 1970 - # Difference: 11644473600 seconds chromium_epoch_offset = 11644473600 - - # Convert to Unix timestamp (seconds since 1970) timestamp_seconds = (chromium_timestamp / 1000000) - chromium_epoch_offset - - # Convert to Unix microseconds for Timesketch unix_microseconds = int(timestamp_seconds * 1000000) - # Validate validate_timestamp(unix_microseconds, "Chromium") dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc) @@ -202,48 +163,46 @@ def convert_chromium_timestamp(chromium_timestamp: Optional[int]) -> Tuple[int, def convert_webkit_timestamp(webkit_timestamp: Optional[float]) -> Tuple[int, str]: - """ - Convert WebKit/Safari timestamp to Unix microseconds and ISO format. - Safari stores timestamps as seconds (with decimal) since 2001-01-01 00:00:00 UTC (Cocoa/Core Data epoch). - - Args: - webkit_timestamp: WebKit timestamp in seconds since 2001 - - Returns: - tuple: (Unix microseconds, ISO formatted datetime string) - """ + """Convert WebKit/Safari timestamp to Unix microseconds and ISO format.""" if webkit_timestamp is None or webkit_timestamp == 0: return 0, "" - # WebKit/Cocoa epoch: January 1, 2001 - # Unix epoch: January 1, 1970 - # Difference: 978307200 seconds webkit_epoch_offset = 978307200 - - # Convert to Unix timestamp (seconds since 1970) timestamp_seconds = webkit_timestamp + webkit_epoch_offset - - # Convert to Unix microseconds for Timesketch unix_microseconds = int(timestamp_seconds * 1000000) - # Validate validate_timestamp(unix_microseconds, "WebKit/Safari") dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc) return unix_microseconds, dt.strftime('%Y-%m-%dT%H:%M:%S+00:00') -def write_timesketch_csv(output_csv: str, fieldnames: List[str], rows: List[Dict[str, Any]]) -> None: +def write_timesketch_csv(output_csv: str, rows: List[Dict[str, Any]]) -> None: """ - Write history data to Timesketch-compatible CSV format. + Write history data to Timesketch-compatible CSV format with dynamic fields. Args: output_csv: Path to output CSV file - fieldnames: List of CSV field names rows: List of row dictionaries to write """ + if not rows: + return + + # Collect all unique fields from all rows + all_fields = set() + for row in rows: + all_fields.update(row.keys()) + + # Define standard field order (these come first) + standard_fields = ['timestamp', 'datetime', 'timestamp_desc', 'message', 'data_type'] + + # Build fieldnames list with standard fields first, then alphabetically sorted remainder + fieldnames = [f for f in standard_fields if f in all_fields] + remaining_fields = sorted(all_fields - set(standard_fields)) + fieldnames.extend(remaining_fields) + with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore') writer.writeheader() for row in rows: @@ -251,85 +210,32 @@ def write_timesketch_csv(output_csv: str, fieldnames: List[str], rows: List[Dict def connect_database_readonly(db_path: str) -> sqlite3.Connection: - """ - Connect to database in read-only mode to avoid lock issues. - - Args: - db_path: Path to database file - - Returns: - SQLite connection object - - Raises: - sqlite3.OperationalError: If database is locked or inaccessible - """ + """Connect to database in read-only mode to avoid lock issues.""" try: - # Use URI with read-only mode to avoid locking issues conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True) return conn except sqlite3.OperationalError as e: raise sqlite3.OperationalError( f"Cannot open database (it may be locked by the browser): {db_path}\n" - f"Please close {db_path.split('/')[-2] if '/' in db_path else 'the browser'} " - f"and try again, or copy the database file to a temporary location.\n" + f"Please close the browser and try again, or copy the database file.\n" f"Original error: {e}" ) -def validate_browser_schema(conn: sqlite3.Connection, expected_tables: List[str], browser_name: str) -> None: - """ - Validate that required tables exist in the database. - - Args: - conn: Database connection - expected_tables: List of required table names - browser_name: Browser name for error messages - - Raises: - DatabaseValidationError: If required tables are missing - """ +def table_exists(conn: sqlite3.Connection, table_name: str) -> bool: + """Check if a table exists in the database.""" cursor = conn.cursor() - cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") - existing_tables = {row[0] for row in cursor.fetchall()} - - missing_tables = set(expected_tables) - existing_tables - if missing_tables: - raise DatabaseValidationError( - f"Database does not appear to be a valid {browser_name} history database.\n" - f"Missing required tables: {', '.join(missing_tables)}\n" - f"Found tables: {', '.join(sorted(existing_tables))}" - ) + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name=?", + (table_name,) + ) + return cursor.fetchone() is not None -def extract_chromium_history(db_path: str, output_csv: str, browser_name: Optional[str] = None) -> int: - """ - Extract browser history from Chromium-based browsers and convert to Timesketch CSV. - Works with all Chromium-based browsers: Chrome, Edge, Brave, Chromium, Opera, Vivaldi, etc. - - Args: - db_path: Path to Chromium History database - output_csv: Path to output CSV file - browser_name: Optional custom name for data_type field (default: "Chromium") - - Returns: - Number of entries processed - - Raises: - DatabaseValidationError: If database validation fails - sqlite3.Error: If database query fails - """ - if browser_name is None: - browser_name = "Chromium" - - # Connect to database - conn = connect_database_readonly(db_path) - - # Validate schema - validate_browser_schema(conn, ['visits', 'urls'], browser_name) - +def extract_chromium_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]: + """Extract visit events from Chromium database with resolved foreign keys.""" cursor = conn.cursor() - # Query to extract history visits with URL information query = """ SELECT visits.visit_time, @@ -339,340 +245,614 @@ def extract_chromium_history(db_path: str, output_csv: str, browser_name: Option visits.visit_duration, urls.visit_count, urls.typed_count, - urls.last_visit_time + visits.segment_id, + visits.incremented_omnibox_typed_score, + urls.hidden, + from_urls.url as from_url, + from_urls.title as from_title, + opener_urls.url as opener_url, + opener_urls.title as opener_title FROM visits JOIN urls ON visits.url = urls.id + LEFT JOIN visits from_visits ON visits.from_visit = from_visits.id + LEFT JOIN urls from_urls ON from_visits.url = from_urls.id + LEFT JOIN visits opener_visits ON visits.opener_visit = opener_visits.id + LEFT JOIN urls opener_urls ON opener_visits.url = opener_urls.id ORDER BY visits.visit_time """ - try: - cursor.execute(query) - results = cursor.fetchall() - except sqlite3.Error as e: - conn.close() - raise sqlite3.Error(f"Error querying {browser_name} history: {e}") + cursor.execute(query) + results = cursor.fetchall() - # Transition type mapping (Chromium transition types) transition_types = { - 0: "Link", - 1: "Typed", - 2: "Auto_Bookmark", - 3: "Auto_Subframe", - 4: "Manual_Subframe", - 5: "Generated", - 6: "Start_Page", - 7: "Form_Submit", - 8: "Reload", - 9: "Keyword", - 10: "Keyword_Generated" + 0: "Link", 1: "Typed", 2: "Auto_Bookmark", 3: "Auto_Subframe", + 4: "Manual_Subframe", 5: "Generated", 6: "Start_Page", + 7: "Form_Submit", 8: "Reload", 9: "Keyword", 10: "Keyword_Generated" } rows = [] - validation_errors = [] - - for idx, row in enumerate(results): - chromium_timestamp = row[0] - url = row[1] or "" - title = row[2] or "(No title)" - transition = row[3] - visit_duration = row[4] or 0 - visit_count = row[5] or 0 - typed_count = row[6] or 0 + for row in results: + (chromium_timestamp, url, title, transition, visit_duration, + visit_count, typed_count, segment_id, incremented_typed, hidden, + from_url, from_title, opener_url, opener_title) = row + + try: + unix_microseconds, iso_datetime = convert_chromium_timestamp(chromium_timestamp) + except TimestampValidationError: + continue - # Extract core transition type (lower 8 bits) core_transition = transition & 0xFF transition_name = transition_types.get(core_transition, f"Unknown({core_transition})") - # Convert timestamp - try: - unix_microseconds, iso_datetime = convert_chromium_timestamp(chromium_timestamp) - except TimestampValidationError as e: - validation_errors.append(f"Entry {idx + 1}: {e}") - if len(validation_errors) <= 3: # Only store first few errors - continue - else: - break # Too many errors, likely a systematic issue - - # Construct message - message = f"Visited: {title}" - - rows.append({ + # Build row with only useful fields + row_data = { 'timestamp': unix_microseconds, 'datetime': iso_datetime, 'timestamp_desc': 'Visit Time', - 'message': message, - 'url': url, - 'title': title, + 'message': f"Visited: {title or '(No title)'}", + 'data_type': f'{browser_name.lower()}:history:visit', + 'url': url or "", + 'title': title or "(No title)", 'visit_type': transition_name, - 'visit_duration_us': visit_duration, - 'total_visits': visit_count, - 'typed_count': typed_count, - 'data_type': f'{browser_name.lower()}:history:visit' - }) + 'visit_duration_us': visit_duration or 0, + 'total_visits': visit_count or 0, + 'typed_count': typed_count or 0, + 'typed_in_omnibox': bool(incremented_typed), + 'hidden': bool(hidden) + } + + # Add optional fields only if present + if from_url: + row_data['from_url'] = from_url + if from_title: + row_data['from_title'] = from_title + + if opener_url: + row_data['opener_url'] = opener_url + if opener_title: + row_data['opener_title'] = opener_title + + # Add session ID only if non-zero + if segment_id and segment_id != 0: + row_data['session_id'] = segment_id + + rows.append(row_data) - conn.close() - - # Report validation errors if any - if validation_errors: - print(f"Warning: Found {len(validation_errors)} timestamp validation errors:", file=sys.stderr) - for error in validation_errors[:3]: - print(f" {error}", file=sys.stderr) - if len(validation_errors) > 3: - print(f" ... and {len(validation_errors) - 3} more errors", file=sys.stderr) - print(f"Continuing with {len(rows)} valid entries...", file=sys.stderr) - - # Write CSV - fieldnames = [ - 'timestamp', 'datetime', 'timestamp_desc', 'message', - 'url', 'title', 'visit_type', 'visit_duration_us', - 'total_visits', 'typed_count', 'data_type' - ] - write_timesketch_csv(output_csv, fieldnames, rows) - - return len(rows) + return rows -def extract_gecko_history(db_path: str, output_csv: str, browser_name: Optional[str] = None) -> int: - """ - Extract browser history from Gecko-based browsers (Firefox) and convert to Timesketch CSV. - Works with Firefox and Firefox derivatives (Waterfox, LibreWolf, etc.) - - Args: - db_path: Path to Gecko places.sqlite database - output_csv: Path to output CSV file - browser_name: Optional custom name for data_type field (default: "Firefox") - - Returns: - Number of entries processed - - Raises: - DatabaseValidationError: If database validation fails - sqlite3.Error: If database query fails - """ - if browser_name is None: - browser_name = "Firefox" - - # Connect to database - conn = connect_database_readonly(db_path) - - # Validate schema - validate_browser_schema(conn, ['moz_historyvisits', 'moz_places'], browser_name) +def extract_chromium_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]: + """Extract download events from Chromium database.""" + if not table_exists(conn, 'downloads'): + return [] cursor = conn.cursor() - # Query to extract history visits with URL information query = """ SELECT - moz_historyvisits.visit_date as timestamp, - moz_places.url, - moz_places.title, - moz_places.description, - moz_historyvisits.visit_type, - moz_historyvisits.from_visit - FROM moz_historyvisits - JOIN moz_places ON moz_historyvisits.place_id = moz_places.id - ORDER BY moz_historyvisits.visit_date + id, + guid, + current_path, + target_path, + start_time, + received_bytes, + total_bytes, + state, + danger_type, + interrupt_reason, + end_time, + opened, + last_access_time, + referrer, + tab_url, + mime_type + FROM downloads + ORDER BY start_time """ try: cursor.execute(query) results = cursor.fetchall() - except sqlite3.Error as e: - conn.close() - raise sqlite3.Error(f"Error querying {browser_name} history: {e}") + except sqlite3.Error: + return [] - # Visit type mapping (Firefox visit types) - visit_types = { - 1: "Link", - 2: "Typed", - 3: "Bookmark", - 4: "Embed", - 5: "Redirect_Permanent", - 6: "Redirect_Temporary", - 7: "Download", - 8: "Framed_Link", - 9: "Reload" + download_states = { + 0: "In Progress", + 1: "Complete", + 2: "Cancelled", + 3: "Interrupted", + 4: "Dangerous" } rows = [] - validation_errors = [] - - for idx, row in enumerate(results): - timestamp_us = row[0] - url = row[1] or "" - title = row[2] or "(No title)" - description = row[3] or "" - visit_type_id = row[4] + for row in results: + (dl_id, guid, current_path, target_path, start_time, received_bytes, + total_bytes, state, danger_type, interrupt_reason, end_time, opened, + last_access_time, referrer, tab_url, mime_type) = row - visit_type_name = visit_types.get(visit_type_id, f"Unknown({visit_type_id})") - - # Convert timestamp try: - unix_microseconds, iso_datetime = convert_gecko_timestamp(timestamp_us) - except TimestampValidationError as e: - validation_errors.append(f"Entry {idx + 1}: {e}") - if len(validation_errors) <= 3: - continue - else: - break + start_us, start_iso = convert_chromium_timestamp(start_time) + end_us, end_iso = convert_chromium_timestamp(end_time) if end_time else (0, "") + access_us, access_iso = convert_chromium_timestamp(last_access_time) if last_access_time else (0, "") + except TimestampValidationError: + continue - # Construct message - message = f"Visited: {title}" - if description: - message += f" - {description}" + state_name = download_states.get(state, f"Unknown({state})") + filename = Path(target_path).name if target_path else "(unknown)" + # Download start event rows.append({ - 'timestamp': unix_microseconds, - 'datetime': iso_datetime, - 'timestamp_desc': 'Visit Time', - 'message': message, - 'url': url, - 'title': title, - 'visit_type': visit_type_name, - 'data_type': f'{browser_name.lower()}:history:visit' + 'timestamp': start_us, + 'datetime': start_iso, + 'timestamp_desc': 'Download Started', + 'message': f"Download started: {filename} ({mime_type or 'unknown type'})", + 'data_type': f'{browser_name.lower()}:download:start', + 'download_id': dl_id, + 'filename': filename, + 'file_path': target_path or "", + 'file_size_bytes': total_bytes or 0, + 'mime_type': mime_type or "", + 'download_state': state_name, + 'referrer_url': referrer or "", + 'tab_url': tab_url or "", + 'dangerous': bool(danger_type), + 'interrupted': bool(interrupt_reason) }) + + # Download complete event (if completed) + if end_time and end_time != start_time: + duration_seconds = (end_us - start_us) / 1000000 + rows.append({ + 'timestamp': end_us, + 'datetime': end_iso, + 'timestamp_desc': 'Download Completed', + 'message': f"Download completed: {filename} ({received_bytes or 0} bytes in {duration_seconds:.1f}s)", + 'data_type': f'{browser_name.lower()}:download:complete', + 'download_id': dl_id, + 'filename': filename, + 'file_path': target_path or "", + 'file_size_bytes': received_bytes or 0, + 'mime_type': mime_type or "", + 'download_state': state_name, + 'download_duration_seconds': duration_seconds + }) - conn.close() - - # Report validation errors if any - if validation_errors: - print(f"Warning: Found {len(validation_errors)} timestamp validation errors:", file=sys.stderr) - for error in validation_errors[:3]: - print(f" {error}", file=sys.stderr) - if len(validation_errors) > 3: - print(f" ... and {len(validation_errors) - 3} more errors", file=sys.stderr) - print(f"Continuing with {len(rows)} valid entries...", file=sys.stderr) - - # Write CSV - fieldnames = [ - 'timestamp', 'datetime', 'timestamp_desc', 'message', - 'url', 'title', 'visit_type', 'data_type' - ] - write_timesketch_csv(output_csv, fieldnames, rows) - - return len(rows) + return rows -def extract_webkit_history(db_path: str, output_csv: str, browser_name: Optional[str] = None) -> int: - """ - Extract browser history from WebKit-based browsers (Safari) and convert to Timesketch CSV. - - Args: - db_path: Path to Safari History.db database - output_csv: Path to output CSV file - browser_name: Optional custom name for data_type field (default: "Safari") - - Returns: - Number of entries processed - - Raises: - DatabaseValidationError: If database validation fails - sqlite3.Error: If database query fails - """ - if browser_name is None: - browser_name = "Safari" - - # Connect to database - conn = connect_database_readonly(db_path) - - # Validate schema - validate_browser_schema(conn, ['history_visits', 'history_items'], browser_name) +def extract_chromium_search_terms(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]: + """Extract search terms from Chromium database.""" + if not table_exists(conn, 'keyword_search_terms'): + return [] cursor = conn.cursor() - # Query to extract history visits with URL information query = """ SELECT - history_visits.visit_time, - history_items.url, - history_items.title, - history_visits.title as visit_title - FROM history_visits - JOIN history_items ON history_visits.history_item = history_items.id - ORDER BY history_visits.visit_time + kst.term, + kst.normalized_term, + u.url, + u.title, + u.last_visit_time + FROM keyword_search_terms kst + JOIN urls u ON kst.url_id = u.id + ORDER BY u.last_visit_time """ try: cursor.execute(query) results = cursor.fetchall() - except sqlite3.Error as e: - conn.close() - raise sqlite3.Error(f"Error querying {browser_name} history: {e}") + except sqlite3.Error: + return [] rows = [] - validation_errors = [] - - for idx, row in enumerate(results): - webkit_timestamp = row[0] - url = row[1] or "" - title = row[2] or row[3] or "(No title)" + for row in results: + term, normalized_term, url, title, last_visit = row - # Convert timestamp try: - unix_microseconds, iso_datetime = convert_webkit_timestamp(webkit_timestamp) - except TimestampValidationError as e: - validation_errors.append(f"Entry {idx + 1}: {e}") - if len(validation_errors) <= 3: - continue - else: - break - - # Construct message - message = f"Visited: {title}" + unix_microseconds, iso_datetime = convert_chromium_timestamp(last_visit) + except TimestampValidationError: + continue rows.append({ 'timestamp': unix_microseconds, 'datetime': iso_datetime, - 'timestamp_desc': 'Visit Time', - 'message': message, - 'url': url, - 'title': title, - 'data_type': f'{browser_name.lower()}:history:visit' + 'timestamp_desc': 'Search Performed', + 'message': f"Search: {term}", + 'data_type': f'{browser_name.lower()}:search:query', + 'search_term': term, + 'normalized_search_term': normalized_term, + 'search_url': url or "", + 'search_page_title': title or "" }) - conn.close() + return rows + + +def extract_gecko_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]: + """Extract visit events from Gecko database with resolved foreign keys.""" + cursor = conn.cursor() - # Report validation errors if any - if validation_errors: - print(f"Warning: Found {len(validation_errors)} timestamp validation errors:", file=sys.stderr) - for error in validation_errors[:3]: - print(f" {error}", file=sys.stderr) - if len(validation_errors) > 3: - print(f" ... and {len(validation_errors) - 3} more errors", file=sys.stderr) - print(f"Continuing with {len(rows)} valid entries...", file=sys.stderr) + query = """ + SELECT + v.visit_date, + p.url, + p.title, + p.description, + v.visit_type, + v.session, + p.visit_count, + p.typed, + p.frecency, + p.hidden, + p.rev_host, + prev_p.url as from_url, + prev_p.title as from_title + FROM moz_historyvisits v + JOIN moz_places p ON v.place_id = p.id + LEFT JOIN moz_historyvisits prev_v ON v.from_visit = prev_v.id + LEFT JOIN moz_places prev_p ON prev_v.place_id = prev_p.id + ORDER BY v.visit_date + """ - # Write CSV - fieldnames = [ - 'timestamp', 'datetime', 'timestamp_desc', 'message', - 'url', 'title', 'data_type' - ] - write_timesketch_csv(output_csv, fieldnames, rows) + cursor.execute(query) + results = cursor.fetchall() - return len(rows) + visit_types = { + 1: "Link", 2: "Typed", 3: "Bookmark", 4: "Embed", + 5: "Redirect_Permanent", 6: "Redirect_Temporary", + 7: "Download", 8: "Framed_Link", 9: "Reload" + } + + rows = [] + for row in results: + (timestamp_us, url, title, description, visit_type_id, + session, visit_count, typed, frecency, hidden, rev_host, + from_url, from_title) = row + + try: + unix_microseconds, iso_datetime = convert_gecko_timestamp(timestamp_us) + except TimestampValidationError: + continue + + visit_type_name = visit_types.get(visit_type_id, f"Unknown({visit_type_id})") + + message = f"Visited: {title or '(No title)'}" + if description: + message += f" - {description}" + + # Build row with only useful fields + row_data = { + 'timestamp': unix_microseconds, + 'datetime': iso_datetime, + 'timestamp_desc': 'Visit Time', + 'message': message, + 'data_type': f'{browser_name.lower()}:history:visit', + 'url': url or "", + 'title': title or "(No title)", + 'visit_type': visit_type_name, + 'total_visit_count': visit_count or 0, + 'typed_count': typed or 0, + 'frecency_score': frecency, + 'hidden': bool(hidden), + 'domain': rev_host[::-1] if rev_host else "" + } + + # Add optional fields only if present + if description: + row_data['description'] = description + + # Add navigation chain info if present + if from_url: + row_data['from_url'] = from_url + if from_title: + row_data['from_title'] = from_title + + # Add session ID only if non-zero + if session and session != 0: + row_data['session_id'] = session + + rows.append(row_data) + + return rows + + +def extract_gecko_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]: + """Extract bookmark events from Gecko database.""" + if not table_exists(conn, 'moz_bookmarks'): + return [] + + cursor = conn.cursor() + + query = """ + SELECT + b.id, + b.type, + b.title, + b.dateAdded, + b.lastModified, + p.url, + p.title as page_title, + b.parent, + b.position + FROM moz_bookmarks b + LEFT JOIN moz_places p ON b.fk = p.id + WHERE b.dateAdded IS NOT NULL + ORDER BY b.dateAdded + """ + + try: + cursor.execute(query) + results = cursor.fetchall() + except sqlite3.Error: + return [] + + bookmark_types = { + 1: "Bookmark", + 2: "Folder", + 3: "Separator" + } + + rows = [] + for row in results: + (bm_id, bm_type, title, date_added, last_modified, + url, page_title, parent, position) = row + + try: + added_us, added_iso = convert_gecko_timestamp(date_added) + except TimestampValidationError: + continue + + type_name = bookmark_types.get(bm_type, f"Unknown({bm_type})") + display_title = title or page_title or "(No title)" + + # Bookmark added event + rows.append({ + 'timestamp': added_us, + 'datetime': added_iso, + 'timestamp_desc': 'Bookmark Added', + 'message': f"Bookmarked: {display_title}", + 'data_type': f'{browser_name.lower()}:bookmark:added', + 'bookmark_id': bm_id, + 'bookmark_type': type_name, + 'bookmark_title': display_title, + 'url': url or "", + 'parent_folder_id': parent, + 'position': position + }) + + # Bookmark modified event (if different from added) + if last_modified and last_modified != date_added: + try: + modified_us, modified_iso = convert_gecko_timestamp(last_modified) + rows.append({ + 'timestamp': modified_us, + 'datetime': modified_iso, + 'timestamp_desc': 'Bookmark Modified', + 'message': f"Modified bookmark: {display_title}", + 'data_type': f'{browser_name.lower()}:bookmark:modified', + 'bookmark_id': bm_id, + 'bookmark_title': display_title, + 'url': url or "" + }) + except TimestampValidationError: + pass + + return rows + + +def extract_gecko_metadata(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]: + """Extract page metadata/engagement events from Gecko database.""" + if not table_exists(conn, 'moz_places_metadata'): + return [] + + cursor = conn.cursor() + + query = """ + SELECT + m.place_id, + m.created_at, + m.updated_at, + m.total_view_time, + m.typing_time, + m.key_presses, + m.scrolling_time, + m.scrolling_distance, + m.document_type, + p.url, + p.title + FROM moz_places_metadata m + JOIN moz_places p ON m.place_id = p.id + WHERE m.created_at > 0 + ORDER BY m.created_at + """ + + try: + cursor.execute(query) + results = cursor.fetchall() + except sqlite3.Error: + return [] + + rows = [] + for row in results: + (place_id, created_at, updated_at, total_view_time, typing_time, + key_presses, scrolling_time, scrolling_distance, document_type, + url, title) = row + + try: + created_us, created_iso = convert_gecko_timestamp(created_at) + except TimestampValidationError: + continue + + # Convert microseconds to seconds for display + view_seconds = (total_view_time or 0) / 1000000 + typing_seconds = (typing_time or 0) / 1000000 + scrolling_seconds = (scrolling_time or 0) / 1000000 + + rows.append({ + 'timestamp': created_us, + 'datetime': created_iso, + 'timestamp_desc': 'Page Engagement', + 'message': f"Engaged with: {title or '(No title)'} ({view_seconds:.1f}s)", + 'data_type': f'{browser_name.lower()}:page:engagement', + 'url': url or "", + 'title': title or "(No title)", + 'total_view_time_seconds': view_seconds, + 'typing_time_seconds': typing_seconds, + 'key_presses': key_presses or 0, + 'scrolling_time_seconds': scrolling_seconds, + 'scrolling_distance': scrolling_distance or 0, + 'document_type': document_type + }) + + return rows + + +def extract_webkit_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]: + """Extract visit events from WebKit database with resolved redirect chains.""" + cursor = conn.cursor() + + query = """ + SELECT + hv.visit_time, + hi.url, + hi.title, + hv.title as visit_title, + hv.load_successful, + hv.http_non_get, + hi.visit_count, + redirect_src_items.url as redirect_source_url, + redirect_dst_items.url as redirect_destination_url + FROM history_visits hv + JOIN history_items hi ON hv.history_item = hi.id + LEFT JOIN history_visits redirect_src ON hv.redirect_source = redirect_src.id + LEFT JOIN history_items redirect_src_items ON redirect_src.history_item = redirect_src_items.id + LEFT JOIN history_visits redirect_dst ON hv.redirect_destination = redirect_dst.id + LEFT JOIN history_items redirect_dst_items ON redirect_dst.history_item = redirect_dst_items.id + ORDER BY hv.visit_time + """ + + cursor.execute(query) + results = cursor.fetchall() + + rows = [] + for row in results: + (webkit_timestamp, url, title, visit_title, + load_successful, http_non_get, visit_count, + redirect_source_url, redirect_destination_url) = row + + try: + unix_microseconds, iso_datetime = convert_webkit_timestamp(webkit_timestamp) + except TimestampValidationError: + continue + + display_title = title or visit_title or "(No title)" + + message = f"Visited: {display_title}" + if not load_successful: + message += " [FAILED TO LOAD]" + if http_non_get: + message += " [POST/Form]" + + # Build row with only useful fields + row_data = { + 'timestamp': unix_microseconds, + 'datetime': iso_datetime, + 'timestamp_desc': 'Visit Time', + 'message': message, + 'data_type': f'{browser_name.lower()}:history:visit', + 'url': url or "", + 'title': display_title, + 'load_successful': bool(load_successful), + 'http_post': bool(http_non_get), + 'total_visit_count': visit_count or 0 + } + + # Add redirect chain info if present + if redirect_source_url: + row_data['redirect_source_url'] = redirect_source_url + if redirect_destination_url: + row_data['redirect_destination_url'] = redirect_destination_url + + rows.append(row_data) + + return rows + + +def extract_all_events(db_path: str, browser_type: str, browser_name: Optional[str] = None) -> Tuple[List[Dict[str, Any]], Dict[str, int]]: + """ + Extract ALL timeline events from browser database. + + Returns: + Tuple of (all_rows, event_counts dictionary) + """ + if browser_name is None: + browser_name = {'gecko': 'Firefox', 'chromium': 'Chromium', 'webkit': 'Safari'}[browser_type] + + conn = connect_database_readonly(db_path) + all_rows = [] + event_counts = {} + + print(f"Extracting events from {browser_name} database...") + + try: + if browser_type == 'gecko': + # Firefox/Gecko + visits = extract_gecko_visits(conn, browser_name) + all_rows.extend(visits) + event_counts['visits'] = len(visits) + print(f" ✓ Extracted {len(visits):,} visit events") + + bookmarks = extract_gecko_bookmarks(conn, browser_name) + all_rows.extend(bookmarks) + event_counts['bookmarks'] = len(bookmarks) + print(f" ✓ Extracted {len(bookmarks):,} bookmark events") + + metadata = extract_gecko_metadata(conn, browser_name) + all_rows.extend(metadata) + event_counts['engagement'] = len(metadata) + print(f" ✓ Extracted {len(metadata):,} page engagement events") + + elif browser_type == 'chromium': + # Chromium + visits = extract_chromium_visits(conn, browser_name) + all_rows.extend(visits) + event_counts['visits'] = len(visits) + print(f" ✓ Extracted {len(visits):,} visit events") + + downloads = extract_chromium_downloads(conn, browser_name) + all_rows.extend(downloads) + event_counts['downloads'] = len(downloads) + print(f" ✓ Extracted {len(downloads):,} download events") + + searches = extract_chromium_search_terms(conn, browser_name) + all_rows.extend(searches) + event_counts['searches'] = len(searches) + print(f" ✓ Extracted {len(searches):,} search query events") + + elif browser_type == 'webkit': + # Safari/WebKit + visits = extract_webkit_visits(conn, browser_name) + all_rows.extend(visits) + event_counts['visits'] = len(visits) + print(f" ✓ Extracted {len(visits):,} visit events") + + finally: + conn.close() + + # Sort all events by timestamp + all_rows.sort(key=lambda x: x['timestamp']) + + return all_rows, event_counts def generate_default_output_filename(browser_type: str, input_path: str) -> str: - """ - Generate a sensible default output filename based on browser type and input. - - Args: - browser_type: Browser type (gecko, chromium, webkit) - input_path: Input database path - - Returns: - Generated output filename - """ - # Extract browser name from path if possible + """Generate a sensible default output filename based on browser type and input.""" path_lower = input_path.lower() browser_names = { - 'firefox': 'firefox', - 'chrome': 'chrome', - 'edge': 'edge', - 'brave': 'brave', - 'opera': 'opera', - 'vivaldi': 'vivaldi', - 'safari': 'safari', + 'firefox': 'firefox', 'chrome': 'chrome', 'edge': 'edge', + 'brave': 'brave', 'opera': 'opera', 'vivaldi': 'vivaldi', 'safari': 'safari', } detected_name = None @@ -682,81 +862,36 @@ def generate_default_output_filename(browser_type: str, input_path: str) -> str: break if detected_name: - return f"{detected_name}_history_timesketch.csv" + return f"{detected_name}_timeline_timesketch.csv" else: - return f"{browser_type}_history_timesketch.csv" + return f"{browser_type}_timeline_timesketch.csv" def main() -> int: parser = argparse.ArgumentParser( - description='Convert browser history to Timesketch CSV format', + description='Convert browser events to Timesketch CSV format', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" -Browser Engine Types: - gecko, firefox - Gecko-based browsers (Firefox, Waterfox, LibreWolf, etc.) - chromium - Chromium-based browsers (Chrome, Edge, Brave, Opera, Vivaldi, etc.) +Extracts ALL timestamped events from browser databases: + - Page visits (with full metadata) + - Downloads (start and completion) + - Bookmarks (added and modified) + - Search queries + - Page engagement metrics (Firefox) + - And more! + +Browser types: + gecko, firefox - Gecko-based browsers (Firefox) + chromium - Chromium-based browsers (Chrome, Edge, Brave, etc.) webkit, safari - WebKit-based browsers (Safari) - auto - Auto-detect browser type from database schema - -All Chromium-based browsers (Chrome, Edge, Brave, Opera, Vivaldi) use identical database -schemas and can be processed with the "chromium" option. Use --browser-name to customize -the label in the output if needed. - -HOW TO FIND YOUR PROFILE PATH: - Firefox: - 1. Open Firefox and type: about:support - 2. Look for "Profile Folder" or "Profile Directory" - 3. Click "Open Folder" button or note the path - 4. The places.sqlite file is in this directory - - Chromium browsers (Chrome/Edge/Brave/etc.): - 1. Open browser and type: chrome://version/ - (or edge://version/, brave://version/, etc.) - 2. Look for "Profile Path" - this shows the full path - 3. The History file (no extension) is in this directory - - Safari: - Always at: ~/Library/Safari/History.db + auto - Auto-detect browser type (default) Example usage: - # Auto-detect browser type - python browser2timesketch.py -i ~/.mozilla/firefox/xyz.default/places.sqlite + # Auto-detect and extract everything + python browser2timesketch_enhanced.py -i /path/to/History - # Firefox with custom output - python browser2timesketch.py -b firefox -i ~/.mozilla/firefox/xyz.default/places.sqlite -o firefox.csv - - # Any Chromium browser (Chrome, Edge, Brave, etc.) - python browser2timesketch.py -b chromium -i ~/.config/google-chrome/Default/History -o output.csv - - # Chromium browser with custom label - python browser2timesketch.py -b chromium --browser-name "Brave" -i ~/.config/BraveSoftware/Brave-Browser/Default/History - - # Safari (macOS) - python browser2timesketch.py -b safari -i ~/Library/Safari/History.db - -Database Locations: - Gecko/Firefox: - Linux: ~/.mozilla/firefox//places.sqlite - macOS: ~/Library/Application Support/Firefox/Profiles//places.sqlite - Windows: %APPDATA%\\Mozilla\\Firefox\\Profiles\\\\places.sqlite - - Chromium (Chrome/Edge/Brave/Opera/Vivaldi): - Chrome Linux: ~/.config/google-chrome/Default/History - Chrome macOS: ~/Library/Application Support/Google/Chrome/Default/History - Chrome Windows: %LOCALAPPDATA%\\Google\\Chrome\\User Data\\Default\\History - - Edge Windows: %LOCALAPPDATA%\\Microsoft\\Edge\\User Data\\Default\\History - Edge macOS: ~/Library/Application Support/Microsoft Edge/Default/History - - Brave Linux: ~/.config/BraveSoftware/Brave-Browser/Default/History - Brave macOS: ~/Library/Application Support/BraveSoftware/Brave-Browser/Default/History - Brave Windows: %LOCALAPPDATA%\\BraveSoftware\\Brave-Browser\\User Data\\Default\\History - - WebKit/Safari: - macOS: ~/Library/Safari/History.db - -Note: The script uses read-only mode to avoid database lock issues, but closing -the browser is still recommended for best results. + # Specify browser and output + python browser2timesketch_enhanced.py -b firefox -i places.sqlite -o output.csv """ ) @@ -775,12 +910,12 @@ the browser is still recommended for best results. parser.add_argument( '-o', '--output', - help='Output CSV file path (default: auto-generated based on browser type)' + help='Output CSV file path (default: auto-generated)' ) parser.add_argument( '--browser-name', - help='Custom browser name for the data_type field (e.g., "Chrome", "Brave", "Edge")' + help='Custom browser name for the data_type field' ) args = parser.parse_args() @@ -789,7 +924,7 @@ the browser is still recommended for best results. # Validate database file print(f"Validating database: {args.input}") validate_sqlite_database(args.input) - print("✓ Database is valid SQLite file") + print("✓ Database is valid SQLite file\n") # Detect or validate browser type browser_type = args.browser.lower() @@ -797,15 +932,13 @@ the browser is still recommended for best results. if browser_type == 'auto': print("Auto-detecting browser type...") browser_type = detect_browser_type(args.input) - print(f"✓ Detected browser type: {browser_type}") + print(f"✓ Detected browser type: {browser_type}\n") else: - # Normalize aliases if browser_type == 'firefox': browser_type = 'gecko' elif browser_type == 'safari': browser_type = 'webkit' - # Validate that the database matches the specified type detected_type = detect_browser_type(args.input) if detected_type != browser_type: print(f"Warning: You specified '{args.browser}' but database appears to be '{detected_type}'", @@ -819,22 +952,30 @@ the browser is still recommended for best results. output_csv = args.output else: output_csv = generate_default_output_filename(browser_type, args.input) - print(f"Using output filename: {output_csv}") + print(f"Using output filename: {output_csv}\n") - # Extract history based on browser type - print(f"Extracting history from {browser_type} database...") + # Extract all events + all_rows, event_counts = extract_all_events(args.input, browser_type, args.browser_name) - if browser_type == 'gecko': - num_entries = extract_gecko_history(args.input, output_csv, args.browser_name) - elif browser_type == 'chromium': - num_entries = extract_chromium_history(args.input, output_csv, args.browser_name) - elif browser_type == 'webkit': - num_entries = extract_webkit_history(args.input, output_csv, args.browser_name) - else: - raise ValueError(f"Unknown browser type: {browser_type}") + if not all_rows: + print("No events found in database!") + return 1 + + # Write to CSV + print(f"\nWriting {len(all_rows):,} total events to CSV...") + write_timesketch_csv(output_csv, all_rows) + + # Summary + print("\n" + "=" * 60) + print("EXTRACTION COMPLETE") + print("=" * 60) + print(f"Total events: {len(all_rows):,}") + print("\nEvent breakdown:") + for event_type, count in sorted(event_counts.items()): + print(f" - {event_type:20} {count:,}") + print(f"\n✓ Output saved to: {output_csv}") + print("=" * 60) - print(f"\n✓ Successfully converted {num_entries} history entries") - print(f"✓ Output saved to: {output_csv}") return 0 except DatabaseValidationError as e: @@ -843,16 +984,9 @@ the browser is still recommended for best results. except BrowserDetectionError as e: print(f"Browser Detection Error: {e}", file=sys.stderr) return 1 - except TimestampValidationError as e: - print(f"Timestamp Validation Error: {e}", file=sys.stderr) - print("This indicates a systematic problem with timestamp conversion.", file=sys.stderr) - return 1 except sqlite3.Error as e: print(f"Database Error: {e}", file=sys.stderr) return 1 - except FileNotFoundError as e: - print(f"File Error: {e}", file=sys.stderr) - return 1 except KeyboardInterrupt: print("\nOperation cancelled by user", file=sys.stderr) return 130