diff --git a/browser2timesketch.py b/browser2timesketch.py
index a1baa57..43d9b4b 100755
--- a/browser2timesketch.py
+++ b/browser2timesketch.py
@@ -1,16 +1,25 @@
 #!/usr/bin/env python3
 """
-Browser History to Timesketch CSV Converter - Enhanced Edition
+Browser History to Timesketch CSV Converter - Comprehensive Edition

-Converts ALL timestamped browser events to Timesketch-compatible CSV format.
-Includes: visits, downloads, bookmarks, annotations, engagement data, and more.
+Extracts ALL timestamped browser events to Timesketch-compatible CSV format.
+Browser-agnostic output with consistent event naming across all browsers.
+
+Supports:
+- Firefox/Gecko: visits, bookmarks, downloads, form history, annotations,
+  page metadata, input history, keywords, origins
+- Chrome/Chromium/Edge/Brave: visits, downloads, searches, autofill,
+  favicons, media history, site engagement
+- Safari/WebKit: visits, bookmarks, downloads, reading list, top sites
+
+Output format: Timesketch-compatible CSV with browser-agnostic event types
 """

 import sqlite3
 import csv
 import argparse
 import sys
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timezone
 from pathlib import Path
 from typing import Tuple, Optional, Dict, Any, List

@@ -232,6 +241,21 @@ def table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
     return cursor.fetchone() is not None


+def column_exists(conn: sqlite3.Connection, table_name: str, column_name: str) -> bool:
+    """Check if a column exists in a table."""
+    cursor = conn.cursor()
+    try:
+        cursor.execute(f"PRAGMA table_info({table_name})")
+        columns = {row[1] for row in cursor.fetchall()}
+        return column_name in columns
+    except sqlite3.Error:
+        return False
+
+
+# ============================================================================
+# CHROMIUM EXTRACTORS
+# ============================================================================
+
 def extract_chromium_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
     """Extract visit events from Chromium database with resolved foreign keys."""
     cursor = conn.cursor()
@@ -290,7 +314,8 @@ def extract_chromium_visits(conn: sqlite3.Connection, browser_name: str) -> List
             'datetime': iso_datetime,
             'timestamp_desc': 'Visit Time',
             'message': f"Visited: {title or '(No title)'}",
-            'data_type': f'{browser_name.lower()}:history:visit',
+            'data_type': 'browser:page:visit',
+            'browser': browser_name,
             'url': url or "",
             'title': title or "(No title)",
             'visit_type': transition_name,
@@ -386,7 +411,8 @@ def extract_chromium_downloads(conn: sqlite3.Connection, browser_name: str) -> L
             'datetime': start_iso,
             'timestamp_desc': 'Download Started',
             'message': f"Download started: {filename} ({mime_type or 'unknown type'})",
-            'data_type': f'{browser_name.lower()}:download:start',
+            'data_type': 'browser:download:start',
+            'browser': browser_name,
             'download_id': dl_id,
             'filename': filename,
             'file_path': target_path or "",
@@ -407,7 +433,8 @@ def extract_chromium_downloads(conn: sqlite3.Connection, browser_name: str) -> L
                 'datetime': end_iso,
                 'timestamp_desc': 'Download Completed',
                 'message': f"Download completed: {filename} ({received_bytes or 0} bytes in {duration_seconds:.1f}s)",
-                'data_type': f'{browser_name.lower()}:download:complete',
+                'data_type': 'browser:download:complete',
+                'browser': browser_name,
                 'download_id': dl_id,
                 'filename': filename,
                 'file_path': target_path or "",
@@ -416,6 +443,20 @@
                 'download_state': state_name,
                 'download_duration_seconds': duration_seconds
             })
+
+        # Last access event (if different from completion)
+        if last_access_time and last_access_time != end_time and last_access_time != start_time:
+            try:
+                access_us, access_iso = convert_chromium_timestamp(last_access_time)
+                rows.append({
+                    'timestamp': access_us,
+                    'datetime': access_iso,
+                    'timestamp_desc': 'File Accessed',
+                    'message': f"Downloaded file accessed: {filename}",
+                    'data_type': 'browser:download:accessed',
+                    'browser': browser_name,
+                    'download_id': dl_id,
+                    'filename': filename,
+                    'file_path': target_path or ""
+                })
+            except TimestampValidationError:
+                pass

     return rows

@@ -459,7 +500,8 @@ def extract_chromium_search_terms(conn: sqlite3.Connection, browser_name: str) -
             'datetime': iso_datetime,
             'timestamp_desc': 'Search Performed',
             'message': f"Search: {term}",
-            'data_type': f'{browser_name.lower()}:search:query',
+            'data_type': 'browser:search:query',
+            'browser': browser_name,
             'search_term': term,
             'normalized_search_term': normalized_term,
             'search_url': url or "",
@@ -469,6 +511,269 @@ def extract_chromium_search_terms(conn: sqlite3.Connection, browser_name: str) -
     return rows


+def extract_chromium_autofill(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract autofill/form data from Chromium database."""
+    if not table_exists(conn, 'autofill'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check which timestamp columns exist
+    has_created = column_exists(conn, 'autofill', 'date_created')
+    has_last_used = column_exists(conn, 'autofill', 'date_last_used')
+
+    if not (has_created or has_last_used):
+        return []
+
+    # Build query based on available columns
+    timestamp_cols = []
+    if has_created:
+        timestamp_cols.append('date_created')
+    if has_last_used:
+        timestamp_cols.append('date_last_used')
+
+    query = f"""
+        SELECT
+            name,
+            value,
+            {', '.join(timestamp_cols)},
+            count
+        FROM autofill
+        WHERE {' OR '.join([f'{col} > 0' for col in timestamp_cols])}
+        ORDER BY {timestamp_cols[0]}
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        name, value, *timestamps, count = row
+        value = value or ""  # guard against NULL values before len()
+        date_created = timestamps[0] if has_created else None
+        date_last_used = timestamps[1] if has_last_used and len(timestamps) > 1 else (timestamps[0] if not has_created else None)
+
+        # Form field created/first used
+        if date_created:
+            try:
+                created_us, created_iso = convert_chromium_timestamp(date_created)
+                rows.append({
+                    'timestamp': created_us,
+                    'datetime': created_iso,
+                    'timestamp_desc': 'Form Field First Used',
+                    'message': f"First use of form field: {name}",
+                    'data_type': 'browser:form:first_use',
+                    'browser': browser_name,
+                    'form_field_name': name,
+                    'form_field_value': value[:50] + '...' if len(value) > 50 else value,
+                    'total_uses': count or 0
+                })
+            except TimestampValidationError:
+                pass
+
+        # Form field last used (if different)
+        if date_last_used and date_last_used != date_created:
+            try:
+                last_us, last_iso = convert_chromium_timestamp(date_last_used)
+                rows.append({
+                    'timestamp': last_us,
+                    'datetime': last_iso,
+                    'timestamp_desc': 'Form Field Last Used',
+                    'message': f"Used form field: {name}",
+                    'data_type': 'browser:form:use',
+                    'browser': browser_name,
+                    'form_field_name': name,
+                    'form_field_value': value[:50] + '...' if len(value) > 50 else value,
+                    'total_uses': count or 0
+                })
+            except TimestampValidationError:
+                pass
+
+    return rows
+
+
+def extract_chromium_favicons(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract favicon mapping timestamps from Chromium database."""
+    if not table_exists(conn, 'icon_mapping'):
+        return []
+
+    # Check if last_updated column exists (not in all Chromium versions)
+    if not column_exists(conn, 'icon_mapping', 'last_updated'):
+        return []
+
+    cursor = conn.cursor()
+
+    query = """
+        SELECT
+            im.last_updated,
+            im.page_url,
+            f.url as favicon_url
+        FROM icon_mapping im
+        LEFT JOIN favicons f ON im.icon_id = f.id
+        WHERE im.last_updated > 0
+        ORDER BY im.last_updated
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        last_updated, page_url, favicon_url = row
+
+        try:
+            unix_microseconds, iso_datetime = convert_chromium_timestamp(last_updated)
+        except TimestampValidationError:
+            continue
+
+        rows.append({
+            'timestamp': unix_microseconds,
+            'datetime': iso_datetime,
+            'timestamp_desc': 'Favicon Updated',
+            'message': f"Updated favicon for: {page_url}",
+            'data_type': 'browser:favicon:update',
+            'browser': browser_name,
+            'page_url': page_url or "",
+            'favicon_url': favicon_url or ""
+        })
+
+    return rows
+
+
+def extract_chromium_media_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract media playback history from Chromium database."""
+    if not table_exists(conn, 'playback'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check available columns
+    has_last_updated = column_exists(conn, 'playback', 'last_updated_time_s')
+
+    if not has_last_updated:
+        return []
+
+    query = """
+        SELECT
+            p.url,
+            p.watch_time_s,
+            p.has_audio,
+            p.has_video,
+            p.last_updated_time_s
+        FROM playback p
+        WHERE p.last_updated_time_s > 0
+        ORDER BY p.last_updated_time_s
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        url, watch_time, has_audio, has_video, last_updated = row
+
+        # Convert Unix seconds to microseconds for consistency
+        unix_microseconds = int(last_updated * 1000000)
+
+        try:
+            validate_timestamp(unix_microseconds, "Chromium Media")
+            dt = datetime.fromtimestamp(last_updated, tz=timezone.utc)
+            iso_datetime = dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
+        except TimestampValidationError:
+            continue
+
+        media_type = []
+        if has_audio:
+            media_type.append("audio")
+        if has_video:
+            media_type.append("video")
+        media_type_str = "+".join(media_type) if media_type else "unknown"
+
+        rows.append({
+            'timestamp': unix_microseconds,
+            'datetime': iso_datetime,
+            'timestamp_desc': 'Media Playback',
+            'message': f"Played {media_type_str}: {url} ({watch_time:.1f}s)",
+            'data_type': 'browser:media:playback',
+            'browser': browser_name,
+            'media_url': url or "",
+            'watch_time_seconds': watch_time or 0,
+            'has_audio': bool(has_audio),
+            'has_video': bool(has_video)
+        })
+
+    return rows
+
+
+def extract_chromium_site_engagement(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract site engagement scores from Chromium database."""
+    if not table_exists(conn, 'site_engagement'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check for timestamp column
+    has_last_engagement = column_exists(conn, 'site_engagement', 'last_engagement_time')
+
+    if not has_last_engagement:
+        return []
+
+    query = """
+        SELECT
+            origin_url,
+            score,
+            last_engagement_time
+        FROM site_engagement
+        WHERE last_engagement_time > 0 AND score > 0
+        ORDER BY last_engagement_time
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        origin_url, score, last_engagement = row
+
+        # Convert internal timestamp format (typically Unix seconds)
+        unix_microseconds = int(last_engagement * 1000000)
+
+        try:
+            validate_timestamp(unix_microseconds, "Chromium Site Engagement")
+            dt = datetime.fromtimestamp(last_engagement, tz=timezone.utc)
+            iso_datetime = dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
+        except TimestampValidationError:
+            continue
+
+        rows.append({
+            'timestamp': unix_microseconds,
+            'datetime': iso_datetime,
+            'timestamp_desc': 'Site Engagement Updated',
+            'message': f"Site engagement: {origin_url} (score: {score:.1f})",
+            'data_type': 'browser:engagement:update',
+            'browser': browser_name,
+            'site_url': origin_url or "",
+            'engagement_score': score or 0
+        })
+
+    return rows
+
+
+# ============================================================================
+# GECKO/FIREFOX EXTRACTORS
+# ============================================================================
+
 def extract_gecko_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
     """Extract visit events from Gecko database with resolved foreign keys."""
     cursor = conn.cursor()
@@ -527,7 +832,8 @@ def extract_gecko_visits(conn: sqlite3.Connection, browser_name: str) -> List[Di
             'datetime': iso_datetime,
             'timestamp_desc': 'Visit Time',
             'message': message,
-            'data_type': f'{browser_name.lower()}:history:visit',
+            'data_type': 'browser:page:visit',
+            'browser': browser_name,
             'url': url or "",
             'title': title or "(No title)",
             'visit_type': visit_type_name,
@@ -612,7 +918,8 @@ def extract_gecko_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List
                 'datetime': added_iso,
                 'timestamp_desc': 'Bookmark Added',
                 'message': f"Bookmarked: {display_title}",
-                'data_type': f'{browser_name.lower()}:bookmark:added',
+                'data_type': 'browser:bookmark:added',
+                'browser': browser_name,
                 'bookmark_id': bm_id,
                 'bookmark_type': type_name,
                 'bookmark_title': display_title,
@@ -630,7 +937,8 @@ def extract_gecko_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List
                 'datetime': modified_iso,
                 'timestamp_desc': 'Bookmark Modified',
                 'message': f"Modified bookmark: {display_title}",
-                'data_type': f'{browser_name.lower()}:bookmark:modified',
+                'data_type': 'browser:bookmark:modified',
+                'browser': browser_name,
                 'bookmark_id': bm_id,
                 'bookmark_title': display_title,
                 'url': url or ""
@@ -641,6 +949,309 @@ def extract_gecko_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List
     return rows


+def extract_gecko_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract downloads from Gecko database (older Firefox versions)."""
+    if not table_exists(conn, 'moz_downloads'):
+        return []
+
+    cursor = conn.cursor()
+
+    query = """
+        SELECT
+            id,
+            name,
+            source,
+            target,
+            startTime,
+            endTime,
+            state,
+            referrer,
+            currBytes,
+            maxBytes,
+            mimeType
+        FROM moz_downloads
+        WHERE startTime IS NOT NULL
+        ORDER BY startTime
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    download_states = {
+        0: "Downloading",
+        1: "Complete",
+        2: "Failed",
+        3: "Cancelled",
+        4: "Paused"
+    }
+
+    rows = []
+    for row in results:
+        (dl_id, name, source, target, start_time, end_time, state,
+         referrer, curr_bytes, max_bytes, mime_type) = row
+
+        try:
+            start_us, start_iso = convert_gecko_timestamp(start_time)
+        except TimestampValidationError:
+            continue
+
+        state_name = download_states.get(state, f"Unknown({state})")
+
+        # Download start event
+        rows.append({
+            'timestamp': start_us,
+            'datetime': start_iso,
+            'timestamp_desc': 'Download Started',
+            'message': f"Download started: {name} ({mime_type or 'unknown type'})",
+            'data_type': 'browser:download:start',
+            'browser': browser_name,
+            'download_id': dl_id,
+            'filename': name or "",
+            'source_url': source or "",
+            'target_path': target or "",
+            'file_size_bytes': max_bytes or 0,
+            'mime_type': mime_type or "",
+            'download_state': state_name,
+            'referrer_url': referrer or ""
+        })
+
+        # Download complete event
+        if end_time and end_time != start_time:
+            try:
+                end_us, end_iso = convert_gecko_timestamp(end_time)
+                duration_seconds = (end_us - start_us) / 1000000
+                rows.append({
+                    'timestamp': end_us,
+                    'datetime': end_iso,
+                    'timestamp_desc': 'Download Completed',
+                    'message': f"Download completed: {name} ({curr_bytes or 0} bytes in {duration_seconds:.1f}s)",
+                    'data_type': 'browser:download:complete',
+                    'browser': browser_name,
+                    'download_id': dl_id,
+                    'filename': name or "",
+                    'file_size_bytes': curr_bytes or 0,
+                    'mime_type': mime_type or "",
+                    'download_state': state_name,
+                    'download_duration_seconds': duration_seconds
+                })
+            except TimestampValidationError:
+                pass
+
+    return rows
+
+
+def extract_gecko_form_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract form autofill history from Gecko database."""
+    if not table_exists(conn, 'moz_formhistory'):
+        return []
+
+    cursor = conn.cursor()
+
+    query = """
+        SELECT
+            id,
+            fieldname,
+            value,
+            timesUsed,
+            firstUsed,
+            lastUsed
+        FROM moz_formhistory
+        WHERE firstUsed IS NOT NULL
+        ORDER BY firstUsed
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        form_id, fieldname, value, times_used, first_used, last_used = row
+        value = value or ""  # guard against NULL values before len()
+
+        # First use event
+        if first_used:
+            try:
+                first_us, first_iso = convert_gecko_timestamp(first_used)
+                rows.append({
+                    'timestamp': first_us,
+                    'datetime': first_iso,
+                    'timestamp_desc': 'Form Field First Used',
+                    'message': f"First use of form field: {fieldname}",
+                    'data_type': 'browser:form:first_use',
+                    'browser': browser_name,
+                    'form_id': form_id,
+                    'form_field_name': fieldname,
+                    'form_field_value': value[:50] + '...' if len(value) > 50 else value,
+                    'total_uses': times_used or 0
+                })
+            except TimestampValidationError:
+                pass
+
+        # Last use event (if different)
+        if last_used and last_used != first_used:
+            try:
+                last_us, last_iso = convert_gecko_timestamp(last_used)
+                rows.append({
+                    'timestamp': last_us,
+                    'datetime': last_iso,
+                    'timestamp_desc': 'Form Field Last Used',
+                    'message': f"Used form field: {fieldname}",
+                    'data_type': 'browser:form:use',
+                    'browser': browser_name,
+                    'form_id': form_id,
+                    'form_field_name': fieldname,
+                    'form_field_value': value[:50] + '...' if len(value) > 50 else value,
+                    'total_uses': times_used or 0
+                })
+            except TimestampValidationError:
+                pass
+
+    return rows
+
+
+def extract_gecko_annotations(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract page and bookmark annotations from Gecko database."""
+    rows = []
+
+    # Page annotations
+    if table_exists(conn, 'moz_annos'):
+        cursor = conn.cursor()
+        query = """
+            SELECT
+                a.id,
+                a.place_id,
+                a.dateAdded,
+                a.lastModified,
+                n.name,
+                a.content,
+                p.url,
+                p.title
+            FROM moz_annos a
+            JOIN moz_anno_attributes n ON a.anno_attribute_id = n.id
+            JOIN moz_places p ON a.place_id = p.id
+            WHERE a.dateAdded IS NOT NULL
+            ORDER BY a.dateAdded
+        """
+
+        try:
+            cursor.execute(query)
+            results = cursor.fetchall()
+
+            for row in results:
+                anno_id, place_id, date_added, last_modified, name, content, url, title = row
+
+                # Annotation added
+                if date_added:
+                    try:
+                        added_us, added_iso = convert_gecko_timestamp(date_added)
+                        rows.append({
+                            'timestamp': added_us,
+                            'datetime': added_iso,
+                            'timestamp_desc': 'Page Annotation Added',
+                            'message': f"Added annotation '{name}' to: {title or url}",
+                            'data_type': 'browser:annotation:added',
+                            'browser': browser_name,
+                            'annotation_id': anno_id,
+                            'annotation_name': name,
+                            'annotation_content': content[:100] + '...' if content and len(content) > 100 else content,
+                            'url': url or "",
+                            'title': title or ""
+                        })
+                    except TimestampValidationError:
+                        pass
+
+                # Annotation modified (if different)
+                if last_modified and last_modified != date_added:
+                    try:
+                        modified_us, modified_iso = convert_gecko_timestamp(last_modified)
+                        rows.append({
+                            'timestamp': modified_us,
+                            'datetime': modified_iso,
+                            'timestamp_desc': 'Page Annotation Modified',
+                            'message': f"Modified annotation '{name}' on: {title or url}",
+                            'data_type': 'browser:annotation:modified',
+                            'browser': browser_name,
+                            'annotation_id': anno_id,
+                            'annotation_name': name,
+                            'url': url or ""
+                        })
+                    except TimestampValidationError:
+                        pass
+        except sqlite3.Error:
+            pass
+
+    # Bookmark annotations
+    if table_exists(conn, 'moz_items_annos'):
+        cursor = conn.cursor()
+        query = """
+            SELECT
+                ia.id,
+                ia.item_id,
+                ia.dateAdded,
+                ia.lastModified,
+                n.name,
+                ia.content,
+                b.title
+            FROM moz_items_annos ia
+            JOIN moz_anno_attributes n ON ia.anno_attribute_id = n.id
+            JOIN moz_bookmarks b ON ia.item_id = b.id
+            WHERE ia.dateAdded IS NOT NULL
+            ORDER BY ia.dateAdded
+        """
+
+        try:
+            cursor.execute(query)
+            results = cursor.fetchall()
+
+            for row in results:
+                anno_id, item_id, date_added, last_modified, name, content, title = row
+
+                # Annotation added
+                if date_added:
+                    try:
+                        added_us, added_iso = convert_gecko_timestamp(date_added)
+                        rows.append({
+                            'timestamp': added_us,
+                            'datetime': added_iso,
+                            'timestamp_desc': 'Bookmark Annotation Added',
+                            'message': f"Added annotation '{name}' to bookmark: {title}",
+                            'data_type': 'browser:annotation:added',
+                            'browser': browser_name,
+                            'annotation_id': anno_id,
+                            'annotation_name': name,
+                            'annotation_content': content[:100] + '...' if content and len(content) > 100 else content,
+                            'bookmark_title': title or ""
+                        })
+                    except TimestampValidationError:
+                        pass
+
+                # Annotation modified (if different)
+                if last_modified and last_modified != date_added:
+                    try:
+                        modified_us, modified_iso = convert_gecko_timestamp(last_modified)
+                        rows.append({
+                            'timestamp': modified_us,
+                            'datetime': modified_iso,
+                            'timestamp_desc': 'Bookmark Annotation Modified',
+                            'message': f"Modified annotation '{name}' on bookmark: {title}",
+                            'data_type': 'browser:annotation:modified',
+                            'browser': browser_name,
+                            'annotation_id': anno_id,
+                            'annotation_name': name
+                        })
+                    except TimestampValidationError:
+                        pass
+        except sqlite3.Error:
+            pass
+
+    return rows
+
+
 def extract_gecko_metadata(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
     """Extract page metadata/engagement events from Gecko database."""
     if not table_exists(conn, 'moz_places_metadata'):
@@ -694,7 +1305,8 @@ def extract_gecko_metadata(conn: sqlite3.Connection, browser_name: str) -> List[
             'datetime': created_iso,
             'timestamp_desc': 'Page Engagement',
             'message': f"Engaged with: {title or '(No title)'} ({view_seconds:.1f}s)",
-            'data_type': f'{browser_name.lower()}:page:engagement',
+            'data_type': 'browser:page:engagement',
+            'browser': browser_name,
             'url': url or "",
             'title': title or "(No title)",
             'total_view_time_seconds': view_seconds,
@@ -708,6 +1320,173 @@ def extract_gecko_metadata(conn: sqlite3.Connection, browser_name: str) -> List[
     return rows


+def extract_gecko_input_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract address bar input history from Gecko database."""
+    if not table_exists(conn, 'moz_inputhistory'):
+        return []
+
+    cursor = conn.cursor()
+
+    query = """
+        SELECT
+            ih.place_id,
+            ih.input,
+            ih.use_count,
+            p.url,
+            p.title,
+            p.last_visit_date
+        FROM moz_inputhistory ih
+        JOIN moz_places p ON ih.place_id = p.id
+        WHERE p.last_visit_date IS NOT NULL
+        ORDER BY p.last_visit_date
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        place_id, input_text, use_count, url, title, last_visit = row
+
+        try:
+            unix_microseconds, iso_datetime = convert_gecko_timestamp(last_visit)
+        except TimestampValidationError:
+            continue
+
+        rows.append({
+            'timestamp': unix_microseconds,
+            'datetime': iso_datetime,
+            'timestamp_desc': 'Address Bar Input',
+            'message': f"Typed in address bar: {input_text}",
+            'data_type': 'browser:addressbar:input',
+            'browser': browser_name,
+            'input_text': input_text or "",
+            'matched_url': url or "",
+            'matched_title': title or "",
+            'use_count': use_count or 0
+        })
+
+    return rows
+
+
+def extract_gecko_keywords(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract custom search keywords from Gecko database."""
+    if not table_exists(conn, 'moz_keywords'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check if dateAdded column exists (not in all Firefox versions)
+    if not column_exists(conn, 'moz_keywords', 'dateAdded'):
+        return []
+
+    query = """
+        SELECT
+            k.id,
+            k.keyword,
+            k.dateAdded,
+            p.url,
+            p.title
+        FROM moz_keywords k
+        LEFT JOIN moz_places p ON k.place_id = p.id
+        WHERE k.dateAdded IS NOT NULL
+        ORDER BY k.dateAdded
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        keyword_id, keyword, date_added, url, title = row
+
+        try:
+            added_us, added_iso = convert_gecko_timestamp(date_added)
+        except TimestampValidationError:
+            continue
+
+        rows.append({
+            'timestamp': added_us,
+            'datetime': added_iso,
+            'timestamp_desc': 'Keyword Added',
+            'message': f"Added search keyword: {keyword}",
+            'data_type': 'browser:keyword:added',
+            'browser': browser_name,
+            'keyword_id': keyword_id,
+            'keyword': keyword or "",
+            'search_url': url or "",
+            'title': title or ""
+        })
+
+    return rows
+
+
+def extract_gecko_origins(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract origin (domain) tracking data from Gecko database."""
+    if not table_exists(conn, 'moz_origins'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check for last_visit_date column
+    if not column_exists(conn, 'moz_origins', 'last_visit_date'):
+        return []
+
+    query = """
+        SELECT
+            id,
+            prefix,
+            host,
+            frecency,
+            last_visit_date
+        FROM moz_origins
+        WHERE last_visit_date IS NOT NULL
+        ORDER BY last_visit_date
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        origin_id, prefix, host, frecency, last_visit = row
+
+        try:
+            unix_microseconds, iso_datetime = convert_gecko_timestamp(last_visit)
+        except TimestampValidationError:
+            continue
+
+        full_origin = f"{prefix}{host}" if prefix else host
+
+        rows.append({
+            'timestamp': unix_microseconds,
+            'datetime': iso_datetime,
+            'timestamp_desc': 'Domain Visited',
+            'message': f"Visited domain: {full_origin}",
+            'data_type': 'browser:domain:visit',
+            'browser': browser_name,
+            'origin': full_origin or "",
+            'host': host or "",
+            'prefix': prefix or "",
+            'frecency_score': frecency or 0
+        })
+
+    return rows
+
+
+# ============================================================================
+# WEBKIT/SAFARI EXTRACTORS
+# ============================================================================
+
 def extract_webkit_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
     """Extract visit events from WebKit database with resolved redirect chains."""
     cursor = conn.cursor()
@@ -760,7 +1539,8 @@ def extract_webkit_visits(conn: sqlite3.Connection, browser_name: str) -> List[D
             'datetime': iso_datetime,
             'timestamp_desc': 'Visit Time',
             'message': message,
-            'data_type': f'{browser_name.lower()}:history:visit',
+            'data_type': 'browser:page:visit',
+            'browser': browser_name,
             'url': url or "",
             'title': display_title,
             'load_successful': bool(load_successful),
@@ -779,6 +1559,292 @@ def extract_webkit_visits(conn: sqlite3.Connection, browser_name: str) -> List[D
     return rows


+def extract_webkit_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract bookmarks from WebKit database."""
+    if not table_exists(conn, 'bookmarks'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check for date_added column
+    if not column_exists(conn, 'bookmarks', 'date_added'):
+        return []
+
+    query = """
+        SELECT
+            id,
+            title,
+            url,
+            date_added,
+            date_last_modified
+        FROM bookmarks
+        WHERE date_added > 0
+        ORDER BY date_added
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        bm_id, title, url, date_added, date_modified = row
+
+        # Bookmark added
+        if date_added:
+            try:
+                added_us, added_iso = convert_webkit_timestamp(date_added)
+                rows.append({
+                    'timestamp': added_us,
+                    'datetime': added_iso,
+                    'timestamp_desc': 'Bookmark Added',
+                    'message': f"Bookmarked: {title or url}",
+                    'data_type': 'browser:bookmark:added',
+                    'browser': browser_name,
+                    'bookmark_id': bm_id,
+                    'bookmark_title': title or "",
+                    'url': url or ""
+                })
+            except TimestampValidationError:
+                pass
+
+        # Bookmark modified (if different)
+        if date_modified and date_modified != date_added:
+            try:
+                modified_us, modified_iso = convert_webkit_timestamp(date_modified)
+                rows.append({
+                    'timestamp': modified_us,
+                    'datetime': modified_iso,
+                    'timestamp_desc': 'Bookmark Modified',
+                    'message': f"Modified bookmark: {title or url}",
+                    'data_type': 'browser:bookmark:modified',
+                    'browser': browser_name,
+                    'bookmark_id': bm_id,
+                    'bookmark_title': title or "",
+                    'url': url or ""
+                })
+            except TimestampValidationError:
+                pass
+
+    return rows
+
+
+def extract_webkit_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract downloads from WebKit database."""
+    if not table_exists(conn, 'downloads'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check for date_started column
+    if not column_exists(conn, 'downloads', 'date_started'):
+        return []
+
+    query = """
+        SELECT
+            id,
+            url,
+            path,
+            mime_type,
+            bytes_received,
+            total_bytes,
+            date_started,
+            date_finished
+        FROM downloads
+        WHERE date_started > 0
+        ORDER BY date_started
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        dl_id, url, path, mime_type, bytes_received, total_bytes, date_started, date_finished = row
+
+        filename = Path(path).name if path else "(unknown)"
+        started_us = None  # stays None if the start timestamp fails validation
+
+        # Download started
+        if date_started:
+            try:
+                started_us, started_iso = convert_webkit_timestamp(date_started)
+                rows.append({
+                    'timestamp': started_us,
+                    'datetime': started_iso,
+                    'timestamp_desc': 'Download Started',
+                    'message': f"Download started: {filename} ({mime_type or 'unknown type'})",
+                    'data_type': 'browser:download:start',
+                    'browser': browser_name,
+                    'download_id': dl_id,
+                    'filename': filename,
+                    'source_url': url or "",
+                    'file_path': path or "",
+                    'file_size_bytes': total_bytes or 0,
+                    'mime_type': mime_type or ""
+                })
+            except TimestampValidationError:
+                pass
+
+        # Download finished (if different)
+        if date_finished and date_finished != date_started:
+            try:
+                finished_us, finished_iso = convert_webkit_timestamp(date_finished)
+                duration_seconds = (finished_us - started_us) / 1000000 if started_us else 0
+                rows.append({
+                    'timestamp': finished_us,
+                    'datetime': finished_iso,
+                    'timestamp_desc': 'Download Completed',
+                    'message': f"Download completed: {filename} ({bytes_received or 0} bytes in {duration_seconds:.1f}s)",
+                    'data_type': 'browser:download:complete',
+                    'browser': browser_name,
+                    'download_id': dl_id,
+                    'filename': filename,
+                    'file_path': path or "",
+                    'file_size_bytes': bytes_received or 0,
+                    'mime_type': mime_type or "",
+                    'download_duration_seconds': duration_seconds
+                })
+            except TimestampValidationError:
+                pass
+
+    return rows
+
+
+def extract_webkit_reading_list(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract Reading List items from WebKit database."""
+    if not table_exists(conn, 'reading_list'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check for date_added column
+    if not column_exists(conn, 'reading_list', 'date_added'):
+        return []
+
+    query = """
+        SELECT
+            id,
+            title,
+            url,
+            date_added,
+            date_last_viewed
+        FROM reading_list
+        WHERE date_added > 0
+        ORDER BY date_added
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        item_id, title, url, date_added, date_viewed = row
+
+        # Reading list item added
+        if date_added:
+            try:
+                added_us, added_iso = convert_webkit_timestamp(date_added)
+                rows.append({
+                    'timestamp': added_us,
+                    'datetime': added_iso,
+                    'timestamp_desc': 'Reading List Item Added',
+                    'message': f"Added to Reading List: {title or url}",
+                    'data_type': 'browser:readinglist:added',
+                    'browser': browser_name,
+                    'reading_list_id': item_id,
+                    'title': title or "",
+                    'url': url or ""
+                })
+            except TimestampValidationError:
+                pass
+
+        # Reading list item viewed (if present)
+        if date_viewed and date_viewed > 0:
+            try:
+                viewed_us, viewed_iso = convert_webkit_timestamp(date_viewed)
+                rows.append({
+                    'timestamp': viewed_us,
+                    'datetime': viewed_iso,
+                    'timestamp_desc': 'Reading List Item Viewed',
+                    'message': f"Viewed Reading List item: {title or url}",
+                    'data_type': 'browser:readinglist:viewed',
+                    'browser': browser_name,
+                    'reading_list_id': item_id,
+                    'title': title or "",
+                    'url': url or ""
+                })
+            except TimestampValidationError:
+                pass
+
+    return rows
+
+
+def extract_webkit_top_sites(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
+    """Extract Top Sites data from WebKit database."""
+    if not table_exists(conn, 'top_sites'):
+        return []
+
+    cursor = conn.cursor()
+
+    # Check for last_visited column
+    if not column_exists(conn, 'top_sites', 'last_visited'):
+        return []
+
+    query = """
+        SELECT
+            id,
+            url,
+            title,
+            visit_count,
+            last_visited
+        FROM top_sites
+        WHERE last_visited > 0
+        ORDER BY last_visited
+    """
+
+    try:
+        cursor.execute(query)
+        results = cursor.fetchall()
+    except sqlite3.Error:
+        return []
+
+    rows = []
+    for row in results:
+        site_id, url, title, visit_count, last_visited = row
+
+        try:
+            unix_microseconds, iso_datetime = convert_webkit_timestamp(last_visited)
+        except TimestampValidationError:
+            continue
+
+        rows.append({
+            'timestamp': unix_microseconds,
+            'datetime': iso_datetime,
+            'timestamp_desc': 'Top Site Last Visited',
+            'message': f"Top site visited: {title or url}",
+            'data_type': 'browser:topsite:visit',
+            'browser': browser_name,
+            'site_id': site_id,
+            'url': url or "",
+            'title': title or "",
+            'visit_count': visit_count or 0
+        })
+
+    return rows
+
+
+# ============================================================================
+# MAIN EXTRACTION ORCHESTRATION
+# ============================================================================
+
 def extract_all_events(db_path: str, browser_type: str, browser_name: Optional[str] = None) -> Tuple[List[Dict[str, Any]], Dict[str, int]]:
     """
     Extract ALL timeline events from browser database.
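Reviewer note (not part of the patch): every extractor above funnels raw database values through convert_chromium_timestamp, convert_gecko_timestamp, or convert_webkit_timestamp, which are defined earlier in browser2timesketch.py and do not appear in this diff. For review context, here is a minimal sketch of what those helpers are assumed to do, based on their call sites: each returns (unix_microseconds, iso_8601_string); the real helpers also raise TimestampValidationError on out-of-range values, which is omitted here for brevity.

```python
# Hedged sketch of the three browser epoch conversions (assumed behavior).
from datetime import datetime, timezone
from typing import Tuple

CHROME_EPOCH_OFFSET_US = 11_644_473_600 * 1_000_000  # 1601-01-01 -> 1970-01-01
WEBKIT_EPOCH_OFFSET_S = 978_307_200                  # 2001-01-01 -> 1970-01-01

def _iso(unix_us: int) -> str:
    """Render Unix microseconds as the ISO 8601 form used in the CSV."""
    dt = datetime.fromtimestamp(unix_us / 1_000_000, tz=timezone.utc)
    return dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')

def convert_chromium_timestamp(value: int) -> Tuple[int, str]:
    # Chromium stores microseconds since 1601-01-01 (Windows FILETIME epoch).
    unix_us = int(value) - CHROME_EPOCH_OFFSET_US
    return unix_us, _iso(unix_us)

def convert_gecko_timestamp(value: int) -> Tuple[int, str]:
    # Gecko PRTime is already microseconds since the Unix epoch.
    unix_us = int(value)
    return unix_us, _iso(unix_us)

def convert_webkit_timestamp(value: float) -> Tuple[int, str]:
    # Safari uses Mac absolute time: seconds since 2001-01-01.
    unix_us = int((float(value) + WEBKIT_EPOCH_OFFSET_S) * 1_000_000)
    return unix_us, _iso(unix_us)
```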
@@ -794,48 +1860,55 @@ def extract_all_events(db_path: str, browser_type: str, browser_name: Optional[s
     event_counts = {}

     print(f"Extracting events from {browser_name} database...")
+    print("=" * 60)

     try:
         if browser_type == 'gecko':
-            # Firefox/Gecko
-            visits = extract_gecko_visits(conn, browser_name)
-            all_rows.extend(visits)
-            event_counts['visits'] = len(visits)
-            print(f"  ✓ Extracted {len(visits):,} visit events")
-
-            bookmarks = extract_gecko_bookmarks(conn, browser_name)
-            all_rows.extend(bookmarks)
-            event_counts['bookmarks'] = len(bookmarks)
-            print(f"  ✓ Extracted {len(bookmarks):,} bookmark events")
-
-            metadata = extract_gecko_metadata(conn, browser_name)
-            all_rows.extend(metadata)
-            event_counts['engagement'] = len(metadata)
-            print(f"  ✓ Extracted {len(metadata):,} page engagement events")
+            # Firefox/Gecko - extract all event types
+            extractors = [
+                ('Page Visits', extract_gecko_visits),
+                ('Bookmarks', extract_gecko_bookmarks),
+                ('Downloads', extract_gecko_downloads),
+                ('Form History', extract_gecko_form_history),
+                ('Annotations', extract_gecko_annotations),
+                ('Page Engagement', extract_gecko_metadata),
+                ('Address Bar Input', extract_gecko_input_history),
+                ('Search Keywords', extract_gecko_keywords),
+                ('Domain Tracking', extract_gecko_origins),
+            ]

         elif browser_type == 'chromium':
-            # Chromium
-            visits = extract_chromium_visits(conn, browser_name)
-            all_rows.extend(visits)
-            event_counts['visits'] = len(visits)
-            print(f"  ✓ Extracted {len(visits):,} visit events")
-
-            downloads = extract_chromium_downloads(conn, browser_name)
-            all_rows.extend(downloads)
-            event_counts['downloads'] = len(downloads)
-            print(f"  ✓ Extracted {len(downloads):,} download events")
-
-            searches = extract_chromium_search_terms(conn, browser_name)
-            all_rows.extend(searches)
-            event_counts['searches'] = len(searches)
-            print(f"  ✓ Extracted {len(searches):,} search query events")
+            # Chromium - extract all event types
+            extractors = [
+                ('Page Visits', extract_chromium_visits),
+                ('Downloads', extract_chromium_downloads),
+                ('Search Queries', extract_chromium_search_terms),
+                ('Form Autofill', extract_chromium_autofill),
+                ('Favicons', extract_chromium_favicons),
+                ('Media Playback', extract_chromium_media_history),
+                ('Site Engagement', extract_chromium_site_engagement),
+            ]

         elif browser_type == 'webkit':
-            # Safari/WebKit
-            visits = extract_webkit_visits(conn, browser_name)
-            all_rows.extend(visits)
-            event_counts['visits'] = len(visits)
-            print(f"  ✓ Extracted {len(visits):,} visit events")
+            # Safari/WebKit - extract all event types
+            extractors = [
+                ('Page Visits', extract_webkit_visits),
+                ('Bookmarks', extract_webkit_bookmarks),
+                ('Downloads', extract_webkit_downloads),
+                ('Reading List', extract_webkit_reading_list),
+                ('Top Sites', extract_webkit_top_sites),
+            ]
+
+        # Run all extractors
+        for name, extractor_func in extractors:
+            try:
+                events = extractor_func(conn, browser_name)
+                all_rows.extend(events)
+                event_counts[name] = len(events)
+                print(f"  ✓ {name:25} {len(events):>7,} events")
+            except Exception as e:
+                print(f"  ✗ {name:25} Error: {e}")
+                event_counts[name] = 0

     finally:
         conn.close()
@@ -843,6 +1916,8 @@ def extract_all_events(db_path: str, browser_type: str, browser_name: Optional[s
     # Sort all events by timestamp
     all_rows.sort(key=lambda x: x['timestamp'])

+    print("=" * 60)
+
     return all_rows, event_counts


@@ -869,16 +1944,37 @@ def generate_default_output_filename(browser_type: str, input_path: str) -> str

 def main() -> int:
     parser = argparse.ArgumentParser(
-        description='Convert browser events to Timesketch CSV format',
+        description='Convert ALL browser events to Timesketch CSV format',
         formatter_class=argparse.RawDescriptionHelpFormatter,
         epilog="""
 Extracts ALL timestamped events from browser databases:
-  - Page visits (with full metadata)
-  - Downloads (start and completion)
-  - Bookmarks (added and modified)
+
+CHROMIUM (Chrome, Edge, Brave, Opera, Vivaldi):
+  - Page visits (with metadata and navigation chains)
+  - Downloads (start, completion, access times)
   - Search queries
-  - Page engagement metrics (Firefox)
-  - And more!
+  - Form autofill data (first use, last use)
+  - Favicon updates
+  - Media playback history
+  - Site engagement scores
+
+GECKO/FIREFOX:
+  - Page visits (with metadata and navigation chains)
+  - Bookmarks (added, modified)
+  - Downloads
+  - Form history (first use, last use)
+  - Page and bookmark annotations
+  - Page engagement metrics (view time, scrolling, typing)
+  - Address bar input history
+  - Custom search keywords
+  - Domain-level tracking data
+
+WEBKIT/SAFARI:
+  - Page visits (with redirect chains)
+  - Bookmarks (added, modified)
+  - Downloads (start, completion)
+  - Reading List items (added, viewed)
+  - Top Sites tracking

 Browser types:
   gecko, firefox    - Gecko-based browsers (Firefox)
@@ -886,12 +1982,20 @@ Browser types:
   webkit, safari    - WebKit-based browsers (Safari)
   auto              - Auto-detect browser type (default)

+Output Format:
+  - Browser-agnostic Timesketch-compatible CSV
+  - Consistent event naming across all browsers
+  - All timestamps in Unix microseconds + ISO 8601
+
 Example usage:
   # Auto-detect and extract everything
-  python browser2timesketch_enhanced.py -i /path/to/History
+  python browser2timesketch.py -i /path/to/History

   # Specify browser and output
-  python browser2timesketch_enhanced.py -b firefox -i places.sqlite -o output.csv
+  python browser2timesketch.py -b firefox -i places.sqlite -o output.csv
+
+  # Custom browser name (e.g., for Brave, Edge)
+  python browser2timesketch.py --browser-name "Brave" -i History
 """
     )
@@ -915,7 +2019,7 @@ Example usage:

     parser.add_argument(
         '--browser-name',
-        help='Custom browser name for the data_type field'
+        help='Custom browser name for the browser field (e.g., "Brave", "Edge")'
     )

     args = parser.parse_args()
@@ -958,7 +2062,7 @@ Example usage:
         all_rows, event_counts = extract_all_events(args.input, browser_type, args.browser_name)

         if not all_rows:
-            print("No events found in database!")
+            print("\n❌ No events found in database!")
             return 1

         # Write to CSV
@@ -972,26 +2076,28 @@ Example usage:
         print(f"Total events: {len(all_rows):,}")
         print("\nEvent breakdown:")
         for event_type, count in sorted(event_counts.items()):
-            print(f"  - {event_type:20} {count:,}")
+            if count > 0:
+                print(f"  • {event_type:25} {count:>7,} events")

         print(f"\n✓ Output saved to: {output_csv}")
+        print("✓ Format: Browser-agnostic Timesketch CSV")
         print("=" * 60)

         return 0

     except DatabaseValidationError as e:
-        print(f"Database Validation Error: {e}", file=sys.stderr)
+        print(f"\n❌ Database Validation Error: {e}", file=sys.stderr)
         return 1
     except BrowserDetectionError as e:
-        print(f"Browser Detection Error: {e}", file=sys.stderr)
+        print(f"\n❌ Browser Detection Error: {e}", file=sys.stderr)
         return 1
     except sqlite3.Error as e:
-        print(f"Database Error: {e}", file=sys.stderr)
+        print(f"\n❌ Database Error: {e}", file=sys.stderr)
         return 1
     except KeyboardInterrupt:
-        print("\nOperation cancelled by user", file=sys.stderr)
+        print("\n\n⚠️ Operation cancelled by user", file=sys.stderr)
         return 130
     except Exception as e:
-        print(f"Unexpected Error: {e}", file=sys.stderr)
+        print(f"\n❌ Unexpected Error: {e}", file=sys.stderr)
         import traceback
         traceback.print_exc()
         return 1
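Reviewer note (not part of the patch): a quick way to verify a converted timeline before uploading it to Timesketch is to tally events per browser-agnostic data_type. This is a hedged usage sketch, not the tool's own test suite; the output filename firefox_timeline.csv is assumed from an invocation like `python browser2timesketch.py -b firefox -i places.sqlite -o firefox_timeline.csv`, and the column names come from the row dicts visible in the diff.

```python
# Count events per data_type in the generated Timesketch CSV (stdlib only).
import csv
from collections import Counter

counts: Counter = Counter()
with open('firefox_timeline.csv', newline='', encoding='utf-8') as fh:
    for row in csv.DictReader(fh):
        # 'data_type' is one of the browser-agnostic types introduced by
        # this patch, e.g. browser:page:visit or browser:download:start.
        counts[row['data_type']] += 1

for data_type, n in counts.most_common():
    print(f"{data_type:35} {n:>8,}")
```

Because the patch moves the browser identity out of data_type and into the separate browser column, these counts should line up with the per-extractor totals printed by extract_all_events, regardless of which browser produced the database.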