browser2timesketch/browser2timesketch.py
2025-10-16 13:54:25 +02:00

2107 lines
69 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Browser History to Timesketch CSV Converter - Comprehensive Edition
Extracts ALL timestamped browser events to Timesketch-compatible CSV format.
Browser-agnostic output with consistent event naming across all browsers.
Supports:
- Firefox/Gecko: visits, bookmarks, downloads, form history, annotations,
page metadata, input history, keywords, origins
- Chrome/Chromium/Edge/Brave: visits, downloads, searches, autofill,
favicons, media history, site engagement
- Safari/WebKit: visits, bookmarks, downloads, reading list, top sites
Output format: Timesketch-compatible CSV with browser-agnostic event types
"""
import sqlite3
import csv
import argparse
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Tuple, Optional, Dict, Any, List
class BrowserDetectionError(Exception):
"""Raised when browser type cannot be detected"""
pass
class DatabaseValidationError(Exception):
"""Raised when database validation fails"""
pass
class TimestampValidationError(Exception):
"""Raised when timestamp validation fails"""
pass
def validate_sqlite_database(db_path: str) -> None:
"""
Validate that the file is a SQLite database and is accessible.
Args:
db_path: Path to database file
Raises:
DatabaseValidationError: If validation fails
"""
path = Path(db_path)
if not path.exists():
raise DatabaseValidationError(f"Database file not found: {db_path}")
if not path.is_file():
raise DatabaseValidationError(f"Path is not a file: {db_path}")
# Try to open as SQLite database
try:
conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 1")
conn.close()
except sqlite3.DatabaseError as e:
raise DatabaseValidationError(f"Not a valid SQLite database: {db_path}. Error: {e}")
except sqlite3.OperationalError as e:
raise DatabaseValidationError(f"Cannot access database (may be locked or corrupted): {db_path}. Error: {e}")
def detect_browser_type(db_path: str) -> str:
"""
Auto-detect browser type by examining database schema.
Args:
db_path: Path to database file
Returns:
Detected browser type: 'gecko', 'chromium', or 'webkit'
Raises:
BrowserDetectionError: If browser type cannot be determined
"""
try:
conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in cursor.fetchall()}
conn.close()
if 'moz_historyvisits' in tables and 'moz_places' in tables:
return 'gecko'
if 'visits' in tables and 'urls' in tables:
return 'chromium'
if 'history_visits' in tables and 'history_items' in tables:
return 'webkit'
raise BrowserDetectionError(
f"Cannot determine browser type. Found tables: {', '.join(sorted(tables))}"
)
except sqlite3.Error as e:
raise BrowserDetectionError(f"Error reading database schema: {e}")
def validate_timestamp(unix_microseconds: int, browser_type: str) -> None:
"""
Validate that a timestamp is within reasonable bounds.
Args:
unix_microseconds: Timestamp in Unix microseconds
browser_type: Browser type for error messages
Raises:
TimestampValidationError: If timestamp is unreasonable
"""
if unix_microseconds <= 0:
return
timestamp_seconds = unix_microseconds / 1000000
min_date = datetime(1990, 1, 1)
max_date = datetime(2040, 1, 1)
min_seconds = min_date.timestamp()
max_seconds = max_date.timestamp()
if timestamp_seconds < min_seconds:
dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
raise TimestampValidationError(
f"Timestamp appears too old: {dt.strftime('%Y-%m-%d %H:%M:%S')} (before 1990). "
f"This may indicate a timestamp conversion error for {browser_type}."
)
if timestamp_seconds > max_seconds:
dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
raise TimestampValidationError(
f"Timestamp appears to be in the future: {dt.strftime('%Y-%m-%d %H:%M:%S')} (after 2040). "
f"This may indicate a timestamp conversion error for {browser_type}."
)
def convert_gecko_timestamp(gecko_timestamp: Optional[int]) -> Tuple[int, str]:
"""Convert Gecko/Firefox timestamp to Unix microseconds and ISO format."""
if gecko_timestamp is None or gecko_timestamp == 0:
return 0, ""
validate_timestamp(gecko_timestamp, "Gecko/Firefox")
timestamp_seconds = gecko_timestamp / 1000000
dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
return gecko_timestamp, dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
def convert_chromium_timestamp(chromium_timestamp: Optional[int]) -> Tuple[int, str]:
"""Convert Chromium timestamp to Unix microseconds and ISO format."""
if chromium_timestamp is None or chromium_timestamp == 0:
return 0, ""
chromium_epoch_offset = 11644473600
timestamp_seconds = (chromium_timestamp / 1000000) - chromium_epoch_offset
unix_microseconds = int(timestamp_seconds * 1000000)
validate_timestamp(unix_microseconds, "Chromium")
dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
return unix_microseconds, dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
def convert_webkit_timestamp(webkit_timestamp: Optional[float]) -> Tuple[int, str]:
"""Convert WebKit/Safari timestamp to Unix microseconds and ISO format."""
if webkit_timestamp is None or webkit_timestamp == 0:
return 0, ""
webkit_epoch_offset = 978307200
timestamp_seconds = webkit_timestamp + webkit_epoch_offset
unix_microseconds = int(timestamp_seconds * 1000000)
validate_timestamp(unix_microseconds, "WebKit/Safari")
dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
return unix_microseconds, dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
def write_timesketch_csv(output_csv: str, rows: List[Dict[str, Any]]) -> None:
"""
Write history data to Timesketch-compatible CSV format with dynamic fields.
Args:
output_csv: Path to output CSV file
rows: List of row dictionaries to write
"""
if not rows:
return
# Collect all unique fields from all rows
all_fields = set()
for row in rows:
all_fields.update(row.keys())
# Define standard field order (these come first)
standard_fields = ['timestamp', 'datetime', 'timestamp_desc', 'message', 'data_type']
# Build fieldnames list with standard fields first, then alphabetically sorted remainder
fieldnames = [f for f in standard_fields if f in all_fields]
remaining_fields = sorted(all_fields - set(standard_fields))
fieldnames.extend(remaining_fields)
with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
writer.writeheader()
for row in rows:
writer.writerow(row)
def connect_database_readonly(db_path: str) -> sqlite3.Connection:
"""Connect to database in read-only mode to avoid lock issues."""
try:
conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
return conn
except sqlite3.OperationalError as e:
raise sqlite3.OperationalError(
f"Cannot open database (it may be locked by the browser): {db_path}\n"
f"Please close the browser and try again, or copy the database file.\n"
f"Original error: {e}"
)
def table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
"""Check if a table exists in the database."""
cursor = conn.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
)
return cursor.fetchone() is not None
def column_exists(conn: sqlite3.Connection, table_name: str, column_name: str) -> bool:
"""Check if a column exists in a table."""
cursor = conn.cursor()
try:
cursor.execute(f"PRAGMA table_info({table_name})")
columns = {row[1] for row in cursor.fetchall()}
return column_name in columns
except sqlite3.Error:
return False
# ============================================================================
# CHROMIUM EXTRACTORS
# ============================================================================
def extract_chromium_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract visit events from Chromium database with resolved foreign keys."""
cursor = conn.cursor()
query = """
SELECT
visits.visit_time,
urls.url,
urls.title,
visits.transition,
visits.visit_duration,
urls.visit_count,
urls.typed_count,
visits.segment_id,
visits.incremented_omnibox_typed_score,
urls.hidden,
from_urls.url as from_url,
from_urls.title as from_title,
opener_urls.url as opener_url,
opener_urls.title as opener_title
FROM visits
JOIN urls ON visits.url = urls.id
LEFT JOIN visits from_visits ON visits.from_visit = from_visits.id
LEFT JOIN urls from_urls ON from_visits.url = from_urls.id
LEFT JOIN visits opener_visits ON visits.opener_visit = opener_visits.id
LEFT JOIN urls opener_urls ON opener_visits.url = opener_urls.id
ORDER BY visits.visit_time
"""
cursor.execute(query)
results = cursor.fetchall()
transition_types = {
0: "Link", 1: "Typed", 2: "Auto_Bookmark", 3: "Auto_Subframe",
4: "Manual_Subframe", 5: "Generated", 6: "Start_Page",
7: "Form_Submit", 8: "Reload", 9: "Keyword", 10: "Keyword_Generated"
}
rows = []
for row in results:
(chromium_timestamp, url, title, transition, visit_duration,
visit_count, typed_count, segment_id, incremented_typed, hidden,
from_url, from_title, opener_url, opener_title) = row
try:
unix_microseconds, iso_datetime = convert_chromium_timestamp(chromium_timestamp)
except TimestampValidationError:
continue
core_transition = transition & 0xFF
transition_name = transition_types.get(core_transition, f"Unknown({core_transition})")
# Build row with only useful fields
row_data = {
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Visit Time',
'message': f"Visited: {title or '(No title)'}",
'data_type': 'browser:page:visit',
'browser': browser_name,
'url': url or "",
'title': title or "(No title)",
'visit_type': transition_name,
'visit_duration_us': visit_duration or 0,
'total_visits': visit_count or 0,
'typed_count': typed_count or 0,
'typed_in_omnibox': bool(incremented_typed),
'hidden': bool(hidden)
}
# Add optional fields only if present
if from_url:
row_data['from_url'] = from_url
if from_title:
row_data['from_title'] = from_title
if opener_url:
row_data['opener_url'] = opener_url
if opener_title:
row_data['opener_title'] = opener_title
# Add session ID only if non-zero
if segment_id and segment_id != 0:
row_data['session_id'] = segment_id
rows.append(row_data)
return rows
def extract_chromium_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract download events from Chromium database."""
if not table_exists(conn, 'downloads'):
return []
cursor = conn.cursor()
query = """
SELECT
id,
guid,
current_path,
target_path,
start_time,
received_bytes,
total_bytes,
state,
danger_type,
interrupt_reason,
end_time,
opened,
last_access_time,
referrer,
tab_url,
mime_type
FROM downloads
ORDER BY start_time
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
download_states = {
0: "In Progress",
1: "Complete",
2: "Cancelled",
3: "Interrupted",
4: "Dangerous"
}
rows = []
for row in results:
(dl_id, guid, current_path, target_path, start_time, received_bytes,
total_bytes, state, danger_type, interrupt_reason, end_time, opened,
last_access_time, referrer, tab_url, mime_type) = row
try:
start_us, start_iso = convert_chromium_timestamp(start_time)
end_us, end_iso = convert_chromium_timestamp(end_time) if end_time else (0, "")
access_us, access_iso = convert_chromium_timestamp(last_access_time) if last_access_time else (0, "")
except TimestampValidationError:
continue
state_name = download_states.get(state, f"Unknown({state})")
filename = Path(target_path).name if target_path else "(unknown)"
# Download start event
rows.append({
'timestamp': start_us,
'datetime': start_iso,
'timestamp_desc': 'Download Started',
'message': f"Download started: {filename} ({mime_type or 'unknown type'})",
'data_type': 'browser:download:start',
'browser': browser_name,
'download_id': dl_id,
'filename': filename,
'file_path': target_path or "",
'file_size_bytes': total_bytes or 0,
'mime_type': mime_type or "",
'download_state': state_name,
'referrer_url': referrer or "",
'tab_url': tab_url or "",
'dangerous': bool(danger_type),
'interrupted': bool(interrupt_reason)
})
# Download complete event (if completed)
if end_time and end_time != start_time:
duration_seconds = (end_us - start_us) / 1000000
rows.append({
'timestamp': end_us,
'datetime': end_iso,
'timestamp_desc': 'Download Completed',
'message': f"Download completed: {filename} ({received_bytes or 0} bytes in {duration_seconds:.1f}s)",
'data_type': 'browser:download:complete',
'browser': browser_name,
'download_id': dl_id,
'filename': filename,
'file_path': target_path or "",
'file_size_bytes': received_bytes or 0,
'mime_type': mime_type or "",
'download_state': state_name,
'download_duration_seconds': duration_seconds
})
# Last access event (if different from completion)
if last_access_time and last_access_time != end_time and last_access_time != start_time:
rows.append({
'timestamp': access_us,
'datetime': access_iso,
'timestamp_desc': 'File Accessed',
'message': f"Downloaded file accessed: {filename}",
'data_type': 'browser:download:accessed',
'browser': browser_name,
'download_id': dl_id,
'filename': filename,
'file_path': target_path or ""
})
return rows
def extract_chromium_search_terms(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract search terms from Chromium database."""
if not table_exists(conn, 'keyword_search_terms'):
return []
cursor = conn.cursor()
query = """
SELECT
kst.term,
kst.normalized_term,
u.url,
u.title,
u.last_visit_time
FROM keyword_search_terms kst
JOIN urls u ON kst.url_id = u.id
ORDER BY u.last_visit_time
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
term, normalized_term, url, title, last_visit = row
try:
unix_microseconds, iso_datetime = convert_chromium_timestamp(last_visit)
except TimestampValidationError:
continue
rows.append({
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Search Performed',
'message': f"Search: {term}",
'data_type': 'browser:search:query',
'browser': browser_name,
'search_term': term,
'normalized_search_term': normalized_term,
'search_url': url or "",
'search_page_title': title or ""
})
return rows
def extract_chromium_autofill(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract autofill/form data from Chromium database."""
if not table_exists(conn, 'autofill'):
return []
cursor = conn.cursor()
# Check which timestamp columns exist
has_created = column_exists(conn, 'autofill', 'date_created')
has_last_used = column_exists(conn, 'autofill', 'date_last_used')
if not (has_created or has_last_used):
return []
# Build query based on available columns
timestamp_cols = []
if has_created:
timestamp_cols.append('date_created')
if has_last_used:
timestamp_cols.append('date_last_used')
query = f"""
SELECT
name,
value,
{', '.join(timestamp_cols)},
count
FROM autofill
WHERE {' OR '.join([f'{col} > 0' for col in timestamp_cols])}
ORDER BY {timestamp_cols[0]}
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
name, value, *timestamps, count = row
date_created = timestamps[0] if has_created else None
date_last_used = timestamps[1] if has_last_used and len(timestamps) > 1 else timestamps[0] if not has_created else None
# Form field created/first used
if date_created:
try:
created_us, created_iso = convert_chromium_timestamp(date_created)
rows.append({
'timestamp': created_us,
'datetime': created_iso,
'timestamp_desc': 'Form Field First Used',
'message': f"First use of form field: {name}",
'data_type': 'browser:form:first_use',
'browser': browser_name,
'form_field_name': name,
'form_field_value': value[:50] + '...' if len(value) > 50 else value,
'total_uses': count or 0
})
except TimestampValidationError:
pass
# Form field last used (if different)
if date_last_used and date_last_used != date_created:
try:
last_us, last_iso = convert_chromium_timestamp(date_last_used)
rows.append({
'timestamp': last_us,
'datetime': last_iso,
'timestamp_desc': 'Form Field Last Used',
'message': f"Used form field: {name}",
'data_type': 'browser:form:use',
'browser': browser_name,
'form_field_name': name,
'form_field_value': value[:50] + '...' if len(value) > 50 else value,
'total_uses': count or 0
})
except TimestampValidationError:
pass
return rows
def extract_chromium_favicons(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract favicon mapping timestamps from Chromium database."""
if not table_exists(conn, 'icon_mapping'):
return []
# Check if last_updated column exists (not in all Chromium versions)
if not column_exists(conn, 'icon_mapping', 'last_updated'):
return []
cursor = conn.cursor()
query = """
SELECT
im.last_updated,
im.page_url,
f.url as favicon_url
FROM icon_mapping im
LEFT JOIN favicons f ON im.icon_id = f.id
WHERE im.last_updated > 0
ORDER BY im.last_updated
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
last_updated, page_url, favicon_url = row
try:
unix_microseconds, iso_datetime = convert_chromium_timestamp(last_updated)
except TimestampValidationError:
continue
rows.append({
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Favicon Updated',
'message': f"Updated favicon for: {page_url}",
'data_type': 'browser:favicon:update',
'browser': browser_name,
'page_url': page_url or "",
'favicon_url': favicon_url or ""
})
return rows
def extract_chromium_media_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract media playback history from Chromium database."""
if not table_exists(conn, 'playback'):
return []
cursor = conn.cursor()
# Check available columns
has_last_updated = column_exists(conn, 'playback', 'last_updated_time_s')
if not has_last_updated:
return []
query = """
SELECT
p.url,
p.watch_time_s,
p.has_audio,
p.has_video,
p.last_updated_time_s
FROM playback p
WHERE p.last_updated_time_s > 0
ORDER BY p.last_updated_time_s
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
url, watch_time, has_audio, has_video, last_updated = row
# Convert Unix seconds to microseconds for consistency
unix_microseconds = int(last_updated * 1000000)
try:
validate_timestamp(unix_microseconds, "Chromium Media")
dt = datetime.fromtimestamp(last_updated, tz=timezone.utc)
iso_datetime = dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
except TimestampValidationError:
continue
media_type = []
if has_audio:
media_type.append("audio")
if has_video:
media_type.append("video")
media_type_str = "+".join(media_type) if media_type else "unknown"
rows.append({
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Media Playback',
'message': f"Played {media_type_str}: {url} ({watch_time:.1f}s)",
'data_type': 'browser:media:playback',
'browser': browser_name,
'media_url': url or "",
'watch_time_seconds': watch_time or 0,
'has_audio': bool(has_audio),
'has_video': bool(has_video)
})
return rows
def extract_chromium_site_engagement(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract site engagement scores from Chromium database."""
if not table_exists(conn, 'site_engagement'):
return []
cursor = conn.cursor()
# Check for timestamp column
has_last_engagement = column_exists(conn, 'site_engagement', 'last_engagement_time')
if not has_last_engagement:
return []
query = """
SELECT
origin_url,
score,
last_engagement_time
FROM site_engagement
WHERE last_engagement_time > 0 AND score > 0
ORDER BY last_engagement_time
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
origin_url, score, last_engagement = row
# Convert internal timestamp format (typically Unix seconds)
unix_microseconds = int(last_engagement * 1000000)
try:
validate_timestamp(unix_microseconds, "Chromium Site Engagement")
dt = datetime.fromtimestamp(last_engagement, tz=timezone.utc)
iso_datetime = dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
except TimestampValidationError:
continue
rows.append({
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Site Engagement Updated',
'message': f"Site engagement: {origin_url} (score: {score:.1f})",
'data_type': 'browser:engagement:update',
'browser': browser_name,
'site_url': origin_url or "",
'engagement_score': score or 0
})
return rows
# ============================================================================
# GECKO/FIREFOX EXTRACTORS
# ============================================================================
def extract_gecko_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract visit events from Gecko database with resolved foreign keys."""
cursor = conn.cursor()
query = """
SELECT
v.visit_date,
p.url,
p.title,
p.description,
v.visit_type,
v.session,
p.visit_count,
p.typed,
p.frecency,
p.hidden,
p.rev_host,
prev_p.url as from_url,
prev_p.title as from_title
FROM moz_historyvisits v
JOIN moz_places p ON v.place_id = p.id
LEFT JOIN moz_historyvisits prev_v ON v.from_visit = prev_v.id
LEFT JOIN moz_places prev_p ON prev_v.place_id = prev_p.id
ORDER BY v.visit_date
"""
cursor.execute(query)
results = cursor.fetchall()
visit_types = {
1: "Link", 2: "Typed", 3: "Bookmark", 4: "Embed",
5: "Redirect_Permanent", 6: "Redirect_Temporary",
7: "Download", 8: "Framed_Link", 9: "Reload"
}
rows = []
for row in results:
(timestamp_us, url, title, description, visit_type_id,
session, visit_count, typed, frecency, hidden, rev_host,
from_url, from_title) = row
try:
unix_microseconds, iso_datetime = convert_gecko_timestamp(timestamp_us)
except TimestampValidationError:
continue
visit_type_name = visit_types.get(visit_type_id, f"Unknown({visit_type_id})")
message = f"Visited: {title or '(No title)'}"
if description:
message += f" - {description}"
# Build row with only useful fields
row_data = {
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Visit Time',
'message': message,
'data_type': 'browser:page:visit',
'browser': browser_name,
'url': url or "",
'title': title or "(No title)",
'visit_type': visit_type_name,
'total_visit_count': visit_count or 0,
'typed_count': typed or 0,
'frecency_score': frecency,
'hidden': bool(hidden),
'domain': rev_host[::-1] if rev_host else ""
}
# Add optional fields only if present
if description:
row_data['description'] = description
# Add navigation chain info if present
if from_url:
row_data['from_url'] = from_url
if from_title:
row_data['from_title'] = from_title
# Add session ID only if non-zero
if session and session != 0:
row_data['session_id'] = session
rows.append(row_data)
return rows
def extract_gecko_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract bookmark events from Gecko database."""
if not table_exists(conn, 'moz_bookmarks'):
return []
cursor = conn.cursor()
query = """
SELECT
b.id,
b.type,
b.title,
b.dateAdded,
b.lastModified,
p.url,
p.title as page_title,
b.parent,
b.position
FROM moz_bookmarks b
LEFT JOIN moz_places p ON b.fk = p.id
WHERE b.dateAdded IS NOT NULL
ORDER BY b.dateAdded
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
bookmark_types = {
1: "Bookmark",
2: "Folder",
3: "Separator"
}
rows = []
for row in results:
(bm_id, bm_type, title, date_added, last_modified,
url, page_title, parent, position) = row
try:
added_us, added_iso = convert_gecko_timestamp(date_added)
except TimestampValidationError:
continue
type_name = bookmark_types.get(bm_type, f"Unknown({bm_type})")
display_title = title or page_title or "(No title)"
# Bookmark added event
rows.append({
'timestamp': added_us,
'datetime': added_iso,
'timestamp_desc': 'Bookmark Added',
'message': f"Bookmarked: {display_title}",
'data_type': 'browser:bookmark:added',
'browser': browser_name,
'bookmark_id': bm_id,
'bookmark_type': type_name,
'bookmark_title': display_title,
'url': url or "",
'parent_folder_id': parent,
'position': position
})
# Bookmark modified event (if different from added)
if last_modified and last_modified != date_added:
try:
modified_us, modified_iso = convert_gecko_timestamp(last_modified)
rows.append({
'timestamp': modified_us,
'datetime': modified_iso,
'timestamp_desc': 'Bookmark Modified',
'message': f"Modified bookmark: {display_title}",
'data_type': 'browser:bookmark:modified',
'browser': browser_name,
'bookmark_id': bm_id,
'bookmark_title': display_title,
'url': url or ""
})
except TimestampValidationError:
pass
return rows
def extract_gecko_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract downloads from Gecko database (older Firefox versions)."""
if not table_exists(conn, 'moz_downloads'):
return []
cursor = conn.cursor()
query = """
SELECT
id,
name,
source,
target,
startTime,
endTime,
state,
referrer,
currBytes,
maxBytes,
mimeType
FROM moz_downloads
WHERE startTime IS NOT NULL
ORDER BY startTime
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
download_states = {
0: "Downloading",
1: "Complete",
2: "Failed",
3: "Cancelled",
4: "Paused"
}
rows = []
for row in results:
(dl_id, name, source, target, start_time, end_time, state,
referrer, curr_bytes, max_bytes, mime_type) = row
try:
start_us, start_iso = convert_gecko_timestamp(start_time)
except TimestampValidationError:
continue
state_name = download_states.get(state, f"Unknown({state})")
# Download start event
rows.append({
'timestamp': start_us,
'datetime': start_iso,
'timestamp_desc': 'Download Started',
'message': f"Download started: {name} ({mime_type or 'unknown type'})",
'data_type': 'browser:download:start',
'browser': browser_name,
'download_id': dl_id,
'filename': name or "",
'source_url': source or "",
'target_path': target or "",
'file_size_bytes': max_bytes or 0,
'mime_type': mime_type or "",
'download_state': state_name,
'referrer_url': referrer or ""
})
# Download complete event
if end_time and end_time != start_time:
try:
end_us, end_iso = convert_gecko_timestamp(end_time)
duration_seconds = (end_us - start_us) / 1000000
rows.append({
'timestamp': end_us,
'datetime': end_iso,
'timestamp_desc': 'Download Completed',
'message': f"Download completed: {name} ({curr_bytes or 0} bytes in {duration_seconds:.1f}s)",
'data_type': 'browser:download:complete',
'browser': browser_name,
'download_id': dl_id,
'filename': name or "",
'file_size_bytes': curr_bytes or 0,
'mime_type': mime_type or "",
'download_state': state_name,
'download_duration_seconds': duration_seconds
})
except TimestampValidationError:
pass
return rows
def extract_gecko_form_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract form autofill history from Gecko database."""
if not table_exists(conn, 'moz_formhistory'):
return []
cursor = conn.cursor()
query = """
SELECT
id,
fieldname,
value,
timesUsed,
firstUsed,
lastUsed
FROM moz_formhistory
WHERE firstUsed IS NOT NULL
ORDER BY firstUsed
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
form_id, fieldname, value, times_used, first_used, last_used = row
# First use event
if first_used:
try:
first_us, first_iso = convert_gecko_timestamp(first_used)
rows.append({
'timestamp': first_us,
'datetime': first_iso,
'timestamp_desc': 'Form Field First Used',
'message': f"First use of form field: {fieldname}",
'data_type': 'browser:form:first_use',
'browser': browser_name,
'form_id': form_id,
'form_field_name': fieldname,
'form_field_value': value[:50] + '...' if len(value) > 50 else value,
'total_uses': times_used or 0
})
except TimestampValidationError:
pass
# Last use event (if different)
if last_used and last_used != first_used:
try:
last_us, last_iso = convert_gecko_timestamp(last_used)
rows.append({
'timestamp': last_us,
'datetime': last_iso,
'timestamp_desc': 'Form Field Last Used',
'message': f"Used form field: {fieldname}",
'data_type': 'browser:form:use',
'browser': browser_name,
'form_id': form_id,
'form_field_name': fieldname,
'form_field_value': value[:50] + '...' if len(value) > 50 else value,
'total_uses': times_used or 0
})
except TimestampValidationError:
pass
return rows
def extract_gecko_annotations(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract page and bookmark annotations from Gecko database."""
rows = []
# Page annotations
if table_exists(conn, 'moz_annos'):
cursor = conn.cursor()
query = """
SELECT
a.id,
a.place_id,
a.dateAdded,
a.lastModified,
n.name,
a.content,
p.url,
p.title
FROM moz_annos a
JOIN moz_anno_attributes n ON a.anno_attribute_id = n.id
JOIN moz_places p ON a.place_id = p.id
WHERE a.dateAdded IS NOT NULL
ORDER BY a.dateAdded
"""
try:
cursor.execute(query)
results = cursor.fetchall()
for row in results:
anno_id, place_id, date_added, last_modified, name, content, url, title = row
# Annotation added
if date_added:
try:
added_us, added_iso = convert_gecko_timestamp(date_added)
rows.append({
'timestamp': added_us,
'datetime': added_iso,
'timestamp_desc': 'Page Annotation Added',
'message': f"Added annotation '{name}' to: {title or url}",
'data_type': 'browser:annotation:added',
'browser': browser_name,
'annotation_id': anno_id,
'annotation_name': name,
'annotation_content': content[:100] + '...' if content and len(content) > 100 else content,
'url': url or "",
'title': title or ""
})
except TimestampValidationError:
pass
# Annotation modified (if different)
if last_modified and last_modified != date_added:
try:
modified_us, modified_iso = convert_gecko_timestamp(last_modified)
rows.append({
'timestamp': modified_us,
'datetime': modified_iso,
'timestamp_desc': 'Page Annotation Modified',
'message': f"Modified annotation '{name}' on: {title or url}",
'data_type': 'browser:annotation:modified',
'browser': browser_name,
'annotation_id': anno_id,
'annotation_name': name,
'url': url or ""
})
except TimestampValidationError:
pass
except sqlite3.Error:
pass
# Bookmark annotations
if table_exists(conn, 'moz_items_annos'):
cursor = conn.cursor()
query = """
SELECT
ia.id,
ia.item_id,
ia.dateAdded,
ia.lastModified,
n.name,
ia.content,
b.title
FROM moz_items_annos ia
JOIN moz_anno_attributes n ON ia.anno_attribute_id = n.id
JOIN moz_bookmarks b ON ia.item_id = b.id
WHERE ia.dateAdded IS NOT NULL
ORDER BY ia.dateAdded
"""
try:
cursor.execute(query)
results = cursor.fetchall()
for row in results:
anno_id, item_id, date_added, last_modified, name, content, title = row
# Annotation added
if date_added:
try:
added_us, added_iso = convert_gecko_timestamp(date_added)
rows.append({
'timestamp': added_us,
'datetime': added_iso,
'timestamp_desc': 'Bookmark Annotation Added',
'message': f"Added annotation '{name}' to bookmark: {title}",
'data_type': 'browser:annotation:added',
'browser': browser_name,
'annotation_id': anno_id,
'annotation_name': name,
'annotation_content': content[:100] + '...' if content and len(content) > 100 else content,
'bookmark_title': title or ""
})
except TimestampValidationError:
pass
# Annotation modified (if different)
if last_modified and last_modified != date_added:
try:
modified_us, modified_iso = convert_gecko_timestamp(last_modified)
rows.append({
'timestamp': modified_us,
'datetime': modified_iso,
'timestamp_desc': 'Bookmark Annotation Modified',
'message': f"Modified annotation '{name}' on bookmark: {title}",
'data_type': 'browser:annotation:modified',
'browser': browser_name,
'annotation_id': anno_id,
'annotation_name': name
})
except TimestampValidationError:
pass
except sqlite3.Error:
pass
return rows
def extract_gecko_metadata(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract page metadata/engagement events from Gecko database."""
if not table_exists(conn, 'moz_places_metadata'):
return []
cursor = conn.cursor()
query = """
SELECT
m.place_id,
m.created_at,
m.updated_at,
m.total_view_time,
m.typing_time,
m.key_presses,
m.scrolling_time,
m.scrolling_distance,
m.document_type,
p.url,
p.title
FROM moz_places_metadata m
JOIN moz_places p ON m.place_id = p.id
WHERE m.created_at > 0
ORDER BY m.created_at
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
(place_id, created_at, updated_at, total_view_time, typing_time,
key_presses, scrolling_time, scrolling_distance, document_type,
url, title) = row
try:
created_us, created_iso = convert_gecko_timestamp(created_at)
except TimestampValidationError:
continue
# Convert microseconds to seconds for display
view_seconds = (total_view_time or 0) / 1000000
typing_seconds = (typing_time or 0) / 1000000
scrolling_seconds = (scrolling_time or 0) / 1000000
rows.append({
'timestamp': created_us,
'datetime': created_iso,
'timestamp_desc': 'Page Engagement',
'message': f"Engaged with: {title or '(No title)'} ({view_seconds:.1f}s)",
'data_type': 'browser:page:engagement',
'browser': browser_name,
'url': url or "",
'title': title or "(No title)",
'total_view_time_seconds': view_seconds,
'typing_time_seconds': typing_seconds,
'key_presses': key_presses or 0,
'scrolling_time_seconds': scrolling_seconds,
'scrolling_distance': scrolling_distance or 0,
'document_type': document_type
})
return rows
def extract_gecko_input_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract address bar input history from Gecko database."""
if not table_exists(conn, 'moz_inputhistory'):
return []
cursor = conn.cursor()
query = """
SELECT
ih.place_id,
ih.input,
ih.use_count,
p.url,
p.title,
p.last_visit_date
FROM moz_inputhistory ih
JOIN moz_places p ON ih.place_id = p.id
WHERE p.last_visit_date IS NOT NULL
ORDER BY p.last_visit_date
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
place_id, input_text, use_count, url, title, last_visit = row
try:
unix_microseconds, iso_datetime = convert_gecko_timestamp(last_visit)
except TimestampValidationError:
continue
rows.append({
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Address Bar Input',
'message': f"Typed in address bar: {input_text}",
'data_type': 'browser:addressbar:input',
'browser': browser_name,
'input_text': input_text or "",
'matched_url': url or "",
'matched_title': title or "",
'use_count': use_count or 0
})
return rows
def extract_gecko_keywords(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract custom search keywords from Gecko database."""
if not table_exists(conn, 'moz_keywords'):
return []
cursor = conn.cursor()
# Check if dateAdded column exists (not in all Firefox versions)
if not column_exists(conn, 'moz_keywords', 'dateAdded'):
return []
query = """
SELECT
k.id,
k.keyword,
k.dateAdded,
p.url,
p.title
FROM moz_keywords k
LEFT JOIN moz_places p ON k.place_id = p.id
WHERE k.dateAdded IS NOT NULL
ORDER BY k.dateAdded
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
keyword_id, keyword, date_added, url, title = row
try:
added_us, added_iso = convert_gecko_timestamp(date_added)
except TimestampValidationError:
continue
rows.append({
'timestamp': added_us,
'datetime': added_iso,
'timestamp_desc': 'Keyword Added',
'message': f"Added search keyword: {keyword}",
'data_type': 'browser:keyword:added',
'browser': browser_name,
'keyword_id': keyword_id,
'keyword': keyword or "",
'search_url': url or "",
'title': title or ""
})
return rows
def extract_gecko_origins(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract origin (domain) tracking data from Gecko database."""
if not table_exists(conn, 'moz_origins'):
return []
cursor = conn.cursor()
# Check for last_visit_date column
if not column_exists(conn, 'moz_origins', 'last_visit_date'):
return []
query = """
SELECT
id,
prefix,
host,
frecency,
last_visit_date
FROM moz_origins
WHERE last_visit_date IS NOT NULL
ORDER BY last_visit_date
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
origin_id, prefix, host, frecency, last_visit = row
try:
unix_microseconds, iso_datetime = convert_gecko_timestamp(last_visit)
except TimestampValidationError:
continue
full_origin = f"{prefix}{host}" if prefix else host
rows.append({
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Domain Visited',
'message': f"Visited domain: {full_origin}",
'data_type': 'browser:domain:visit',
'browser': browser_name,
'origin': full_origin or "",
'host': host or "",
'prefix': prefix or "",
'frecency_score': frecency or 0
})
return rows
# ============================================================================
# WEBKIT/SAFARI EXTRACTORS
# ============================================================================
def extract_webkit_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract visit events from WebKit database with resolved redirect chains."""
cursor = conn.cursor()
query = """
SELECT
hv.visit_time,
hi.url,
hi.title,
hv.title as visit_title,
hv.load_successful,
hv.http_non_get,
hi.visit_count,
redirect_src_items.url as redirect_source_url,
redirect_dst_items.url as redirect_destination_url
FROM history_visits hv
JOIN history_items hi ON hv.history_item = hi.id
LEFT JOIN history_visits redirect_src ON hv.redirect_source = redirect_src.id
LEFT JOIN history_items redirect_src_items ON redirect_src.history_item = redirect_src_items.id
LEFT JOIN history_visits redirect_dst ON hv.redirect_destination = redirect_dst.id
LEFT JOIN history_items redirect_dst_items ON redirect_dst.history_item = redirect_dst_items.id
ORDER BY hv.visit_time
"""
cursor.execute(query)
results = cursor.fetchall()
rows = []
for row in results:
(webkit_timestamp, url, title, visit_title,
load_successful, http_non_get, visit_count,
redirect_source_url, redirect_destination_url) = row
try:
unix_microseconds, iso_datetime = convert_webkit_timestamp(webkit_timestamp)
except TimestampValidationError:
continue
display_title = title or visit_title or "(No title)"
message = f"Visited: {display_title}"
if not load_successful:
message += " [FAILED TO LOAD]"
if http_non_get:
message += " [POST/Form]"
# Build row with only useful fields
row_data = {
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Visit Time',
'message': message,
'data_type': 'browser:page:visit',
'browser': browser_name,
'url': url or "",
'title': display_title,
'load_successful': bool(load_successful),
'http_post': bool(http_non_get),
'total_visit_count': visit_count or 0
}
# Add redirect chain info if present
if redirect_source_url:
row_data['redirect_source_url'] = redirect_source_url
if redirect_destination_url:
row_data['redirect_destination_url'] = redirect_destination_url
rows.append(row_data)
return rows
def extract_webkit_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract bookmarks from WebKit database."""
if not table_exists(conn, 'bookmarks'):
return []
cursor = conn.cursor()
# Check for date_added column
if not column_exists(conn, 'bookmarks', 'date_added'):
return []
query = """
SELECT
id,
title,
url,
date_added,
date_last_modified
FROM bookmarks
WHERE date_added > 0
ORDER BY date_added
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
bm_id, title, url, date_added, date_modified = row
# Bookmark added
if date_added:
try:
added_us, added_iso = convert_webkit_timestamp(date_added)
rows.append({
'timestamp': added_us,
'datetime': added_iso,
'timestamp_desc': 'Bookmark Added',
'message': f"Bookmarked: {title or url}",
'data_type': 'browser:bookmark:added',
'browser': browser_name,
'bookmark_id': bm_id,
'bookmark_title': title or "",
'url': url or ""
})
except TimestampValidationError:
pass
# Bookmark modified (if different)
if date_modified and date_modified != date_added:
try:
modified_us, modified_iso = convert_webkit_timestamp(date_modified)
rows.append({
'timestamp': modified_us,
'datetime': modified_iso,
'timestamp_desc': 'Bookmark Modified',
'message': f"Modified bookmark: {title or url}",
'data_type': 'browser:bookmark:modified',
'browser': browser_name,
'bookmark_id': bm_id,
'bookmark_title': title or "",
'url': url or ""
})
except TimestampValidationError:
pass
return rows
def extract_webkit_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract downloads from WebKit database."""
if not table_exists(conn, 'downloads'):
return []
cursor = conn.cursor()
# Check for date_started column
if not column_exists(conn, 'downloads', 'date_started'):
return []
query = """
SELECT
id,
url,
path,
mime_type,
bytes_received,
total_bytes,
date_started,
date_finished
FROM downloads
WHERE date_started > 0
ORDER BY date_started
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
dl_id, url, path, mime_type, bytes_received, total_bytes, date_started, date_finished = row
filename = Path(path).name if path else "(unknown)"
# Download started
if date_started:
try:
started_us, started_iso = convert_webkit_timestamp(date_started)
rows.append({
'timestamp': started_us,
'datetime': started_iso,
'timestamp_desc': 'Download Started',
'message': f"Download started: {filename} ({mime_type or 'unknown type'})",
'data_type': 'browser:download:start',
'browser': browser_name,
'download_id': dl_id,
'filename': filename,
'source_url': url or "",
'file_path': path or "",
'file_size_bytes': total_bytes or 0,
'mime_type': mime_type or ""
})
except TimestampValidationError:
pass
# Download finished (if different)
if date_finished and date_finished != date_started:
try:
finished_us, finished_iso = convert_webkit_timestamp(date_finished)
duration_seconds = (finished_us - started_us) / 1000000 if date_started else 0
rows.append({
'timestamp': finished_us,
'datetime': finished_iso,
'timestamp_desc': 'Download Completed',
'message': f"Download completed: {filename} ({bytes_received or 0} bytes in {duration_seconds:.1f}s)",
'data_type': 'browser:download:complete',
'browser': browser_name,
'download_id': dl_id,
'filename': filename,
'file_path': path or "",
'file_size_bytes': bytes_received or 0,
'mime_type': mime_type or "",
'download_duration_seconds': duration_seconds
})
except TimestampValidationError:
pass
return rows
def extract_webkit_reading_list(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract Reading List items from WebKit database."""
if not table_exists(conn, 'reading_list'):
return []
cursor = conn.cursor()
# Check for date_added column
if not column_exists(conn, 'reading_list', 'date_added'):
return []
query = """
SELECT
id,
title,
url,
date_added,
date_last_viewed
FROM reading_list
WHERE date_added > 0
ORDER BY date_added
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
item_id, title, url, date_added, date_viewed = row
# Reading list item added
if date_added:
try:
added_us, added_iso = convert_webkit_timestamp(date_added)
rows.append({
'timestamp': added_us,
'datetime': added_iso,
'timestamp_desc': 'Reading List Item Added',
'message': f"Added to Reading List: {title or url}",
'data_type': 'browser:readinglist:added',
'browser': browser_name,
'reading_list_id': item_id,
'title': title or "",
'url': url or ""
})
except TimestampValidationError:
pass
# Reading list item viewed (if present)
if date_viewed and date_viewed > 0:
try:
viewed_us, viewed_iso = convert_webkit_timestamp(date_viewed)
rows.append({
'timestamp': viewed_us,
'datetime': viewed_iso,
'timestamp_desc': 'Reading List Item Viewed',
'message': f"Viewed Reading List item: {title or url}",
'data_type': 'browser:readinglist:viewed',
'browser': browser_name,
'reading_list_id': item_id,
'title': title or "",
'url': url or ""
})
except TimestampValidationError:
pass
return rows
def extract_webkit_top_sites(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
"""Extract Top Sites data from WebKit database."""
if not table_exists(conn, 'top_sites'):
return []
cursor = conn.cursor()
# Check for last_visited column
if not column_exists(conn, 'top_sites', 'last_visited'):
return []
query = """
SELECT
id,
url,
title,
visit_count,
last_visited
FROM top_sites
WHERE last_visited > 0
ORDER BY last_visited
"""
try:
cursor.execute(query)
results = cursor.fetchall()
except sqlite3.Error:
return []
rows = []
for row in results:
site_id, url, title, visit_count, last_visited = row
try:
unix_microseconds, iso_datetime = convert_webkit_timestamp(last_visited)
except TimestampValidationError:
continue
rows.append({
'timestamp': unix_microseconds,
'datetime': iso_datetime,
'timestamp_desc': 'Top Site Last Visited',
'message': f"Top site visited: {title or url}",
'data_type': 'browser:topsite:visit',
'browser': browser_name,
'site_id': site_id,
'url': url or "",
'title': title or "",
'visit_count': visit_count or 0
})
return rows
# ============================================================================
# MAIN EXTRACTION ORCHESTRATION
# ============================================================================
def extract_all_events(db_path: str, browser_type: str, browser_name: Optional[str] = None) -> Tuple[List[Dict[str, Any]], Dict[str, int]]:
"""
Extract ALL timeline events from browser database.
Returns:
Tuple of (all_rows, event_counts dictionary)
"""
if browser_name is None:
browser_name = {'gecko': 'Firefox', 'chromium': 'Chromium', 'webkit': 'Safari'}[browser_type]
conn = connect_database_readonly(db_path)
all_rows = []
event_counts = {}
print(f"Extracting events from {browser_name} database...")
print("=" * 60)
try:
if browser_type == 'gecko':
# Firefox/Gecko - extract all event types
extractors = [
('Page Visits', extract_gecko_visits),
('Bookmarks', extract_gecko_bookmarks),
('Downloads', extract_gecko_downloads),
('Form History', extract_gecko_form_history),
('Annotations', extract_gecko_annotations),
('Page Engagement', extract_gecko_metadata),
('Address Bar Input', extract_gecko_input_history),
('Search Keywords', extract_gecko_keywords),
('Domain Tracking', extract_gecko_origins),
]
elif browser_type == 'chromium':
# Chromium - extract all event types
extractors = [
('Page Visits', extract_chromium_visits),
('Downloads', extract_chromium_downloads),
('Search Queries', extract_chromium_search_terms),
('Form Autofill', extract_chromium_autofill),
('Favicons', extract_chromium_favicons),
('Media Playback', extract_chromium_media_history),
('Site Engagement', extract_chromium_site_engagement),
]
elif browser_type == 'webkit':
# Safari/WebKit - extract all event types
extractors = [
('Page Visits', extract_webkit_visits),
('Bookmarks', extract_webkit_bookmarks),
('Downloads', extract_webkit_downloads),
('Reading List', extract_webkit_reading_list),
('Top Sites', extract_webkit_top_sites),
]
# Run all extractors
for name, extractor_func in extractors:
try:
events = extractor_func(conn, browser_name)
all_rows.extend(events)
event_counts[name] = len(events)
print(f"{name:25} {len(events):>7,} events")
except Exception as e:
print(f"{name:25} Error: {e}")
event_counts[name] = 0
finally:
conn.close()
# Sort all events by timestamp
all_rows.sort(key=lambda x: x['timestamp'])
print("=" * 60)
return all_rows, event_counts
def generate_default_output_filename(browser_type: str, input_path: str) -> str:
"""Generate a sensible default output filename based on browser type and input."""
path_lower = input_path.lower()
browser_names = {
'firefox': 'firefox', 'chrome': 'chrome', 'edge': 'edge',
'brave': 'brave', 'opera': 'opera', 'vivaldi': 'vivaldi', 'safari': 'safari',
}
detected_name = None
for name_key, name_value in browser_names.items():
if name_key in path_lower:
detected_name = name_value
break
if detected_name:
return f"{detected_name}_timeline_timesketch.csv"
else:
return f"{browser_type}_timeline_timesketch.csv"
def main() -> int:
parser = argparse.ArgumentParser(
description='Convert ALL browser events to Timesketch CSV format',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Extracts ALL timestamped events from browser databases:
CHROMIUM (Chrome, Edge, Brave, Opera, Vivaldi):
- Page visits (with metadata and navigation chains)
- Downloads (start, completion, access times)
- Search queries
- Form autofill data (first use, last use)
- Favicon updates
- Media playback history
- Site engagement scores
GECKO/FIREFOX:
- Page visits (with metadata and navigation chains)
- Bookmarks (added, modified)
- Downloads
- Form history (first use, last use)
- Page and bookmark annotations
- Page engagement metrics (view time, scrolling, typing)
- Address bar input history
- Custom search keywords
- Domain-level tracking data
WEBKIT/SAFARI:
- Page visits (with redirect chains)
- Bookmarks (added, modified)
- Downloads (start, completion)
- Reading List items (added, viewed)
- Top Sites tracking
Browser types:
gecko, firefox - Gecko-based browsers (Firefox)
chromium - Chromium-based browsers (Chrome, Edge, Brave, etc.)
webkit, safari - WebKit-based browsers (Safari)
auto - Auto-detect browser type (default)
Output Format:
- Browser-agnostic Timesketch-compatible CSV
- Consistent event naming across all browsers
- All timestamps in Unix microseconds + ISO 8601
Example usage:
# Auto-detect and extract everything
python browser2timesketch_complete.py -i /path/to/History
# Specify browser and output
python browser2timesketch_complete.py -b firefox -i places.sqlite -o output.csv
# Custom browser name (e.g., for Brave, Edge)
python browser2timesketch_complete.py --browser-name "Brave" -i History
"""
)
parser.add_argument(
'-b', '--browser',
choices=['gecko', 'firefox', 'chromium', 'webkit', 'safari', 'auto'],
default='auto',
help='Browser engine type (default: auto-detect)'
)
parser.add_argument(
'-i', '--input',
required=True,
help='Path to browser history database'
)
parser.add_argument(
'-o', '--output',
help='Output CSV file path (default: auto-generated)'
)
parser.add_argument(
'--browser-name',
help='Custom browser name for the browser field (e.g., "Brave", "Edge")'
)
args = parser.parse_args()
try:
# Validate database file
print(f"Validating database: {args.input}")
validate_sqlite_database(args.input)
print("✓ Database is valid SQLite file\n")
# Detect or validate browser type
browser_type = args.browser.lower()
if browser_type == 'auto':
print("Auto-detecting browser type...")
browser_type = detect_browser_type(args.input)
print(f"✓ Detected browser type: {browser_type}\n")
else:
if browser_type == 'firefox':
browser_type = 'gecko'
elif browser_type == 'safari':
browser_type = 'webkit'
detected_type = detect_browser_type(args.input)
if detected_type != browser_type:
print(f"Warning: You specified '{args.browser}' but database appears to be '{detected_type}'",
file=sys.stderr)
response = input("Continue anyway? [y/N]: ")
if response.lower() != 'y':
return 1
# Generate output filename if not provided
if args.output:
output_csv = args.output
else:
output_csv = generate_default_output_filename(browser_type, args.input)
print(f"Using output filename: {output_csv}\n")
# Extract all events
all_rows, event_counts = extract_all_events(args.input, browser_type, args.browser_name)
if not all_rows:
print("\n❌ No events found in database!")
return 1
# Write to CSV
print(f"\nWriting {len(all_rows):,} total events to CSV...")
write_timesketch_csv(output_csv, all_rows)
# Summary
print("\n" + "=" * 60)
print("EXTRACTION COMPLETE")
print("=" * 60)
print(f"Total events: {len(all_rows):,}")
print("\nEvent breakdown:")
for event_type, count in sorted(event_counts.items()):
if count > 0:
print(f"{event_type:25} {count:>7,} events")
print(f"\n✓ Output saved to: {output_csv}")
print(f"✓ Format: Browser-agnostic Timesketch CSV")
print("=" * 60)
return 0
except DatabaseValidationError as e:
print(f"\n❌ Database Validation Error: {e}", file=sys.stderr)
return 1
except BrowserDetectionError as e:
print(f"\n❌ Browser Detection Error: {e}", file=sys.stderr)
return 1
except sqlite3.Error as e:
print(f"\n❌ Database Error: {e}", file=sys.stderr)
return 1
except KeyboardInterrupt:
print("\n\n⚠️ Operation cancelled by user", file=sys.stderr)
return 130
except Exception as e:
print(f"\n❌ Unexpected Error: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())