#!/usr/bin/env python3
"""
Browser History to Timesketch CSV Converter - Comprehensive Edition

Extracts ALL timestamped browser events to Timesketch-compatible CSV format.
Browser-agnostic output with consistent event naming across all browsers.

Supports:
- Firefox/Gecko: visits, bookmarks, downloads, form history, annotations,
  page metadata, input history, keywords, origins
- Chrome/Chromium/Edge/Brave: visits, downloads, searches, autofill,
  favicons, media history, site engagement
- Safari/WebKit: visits, bookmarks, downloads, reading list, top sites

Output format: Timesketch-compatible CSV with browser-agnostic event types
"""

import sqlite3
import csv
import argparse
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Tuple, Optional, Dict, Any, List


class BrowserDetectionError(Exception):
    """Raised when browser type cannot be detected"""
    pass


class DatabaseValidationError(Exception):
    """Raised when database validation fails"""
    pass


class TimestampValidationError(Exception):
    """Raised when timestamp validation fails"""
    pass


def validate_sqlite_database(db_path: str) -> None:
    """
    Validate that the file is a SQLite database and is accessible.

    Args:
        db_path: Path to database file

    Raises:
        DatabaseValidationError: If validation fails
    """
    path = Path(db_path)

    if not path.exists():
        raise DatabaseValidationError(f"Database file not found: {db_path}")

    if not path.is_file():
        raise DatabaseValidationError(f"Path is not a file: {db_path}")

    # Try to open as SQLite database.
    # OperationalError subclasses DatabaseError, so it must be caught first
    # or the lock/corruption message would never be reached.
    try:
        conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' LIMIT 1")
        conn.close()
    except sqlite3.OperationalError as e:
        raise DatabaseValidationError(f"Cannot access database (may be locked or corrupted): {db_path}. Error: {e}")
    except sqlite3.DatabaseError as e:
        raise DatabaseValidationError(f"Not a valid SQLite database: {db_path}. Error: {e}")


def detect_browser_type(db_path: str) -> str:
    """
    Auto-detect browser type by examining database schema.

    Args:
        db_path: Path to database file

    Returns:
        Detected browser type: 'gecko', 'chromium', or 'webkit'

    Raises:
        BrowserDetectionError: If browser type cannot be determined
    """
    try:
        conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
        cursor = conn.cursor()

        cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
        tables = {row[0] for row in cursor.fetchall()}

        conn.close()

        if 'moz_historyvisits' in tables and 'moz_places' in tables:
            return 'gecko'

        if 'visits' in tables and 'urls' in tables:
            return 'chromium'

        if 'history_visits' in tables and 'history_items' in tables:
            return 'webkit'

        raise BrowserDetectionError(
            f"Cannot determine browser type. Found tables: {', '.join(sorted(tables))}"
        )

    except sqlite3.Error as e:
        raise BrowserDetectionError(f"Error reading database schema: {e}")
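
# The table pairs above are engine signatures rather than product names:
# every Places database (Firefox and forks such as Tor Browser) has
# moz_places/moz_historyvisits, every Chromium History file has urls/visits,
# and Safari's History.db has history_items/history_visits. A sanity check
# against a synthetic fixture (illustrative sketch, not a real profile):
#
#   conn = sqlite3.connect('fixture.db')
#   conn.execute('CREATE TABLE urls (id INTEGER)')
#   conn.execute('CREATE TABLE visits (id INTEGER)')
#   conn.commit(); conn.close()
#   assert detect_browser_type('fixture.db') == 'chromium'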


def validate_timestamp(unix_microseconds: int, browser_type: str) -> None:
    """
    Validate that a timestamp is within reasonable bounds.

    Args:
        unix_microseconds: Timestamp in Unix microseconds
        browser_type: Browser type for error messages

    Raises:
        TimestampValidationError: If timestamp is unreasonable
    """
    if unix_microseconds <= 0:
        return

    timestamp_seconds = unix_microseconds / 1000000

    # Use timezone-aware bounds so the cutoffs do not drift with the local
    # timezone of the machine running the conversion.
    min_date = datetime(1990, 1, 1, tzinfo=timezone.utc)
    max_date = datetime(2040, 1, 1, tzinfo=timezone.utc)
    min_seconds = min_date.timestamp()
    max_seconds = max_date.timestamp()

    if timestamp_seconds < min_seconds:
        dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
        raise TimestampValidationError(
            f"Timestamp appears too old: {dt.strftime('%Y-%m-%d %H:%M:%S')} (before 1990). "
            f"This may indicate a timestamp conversion error for {browser_type}."
        )

    if timestamp_seconds > max_seconds:
        dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
        raise TimestampValidationError(
            f"Timestamp appears to be in the future: {dt.strftime('%Y-%m-%d %H:%M:%S')} (after 2040). "
            f"This may indicate a timestamp conversion error for {browser_type}."
        )
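
# Worked examples (illustrative values):
#   validate_timestamp(1609459200000000, "Gecko")  # 2021-01-01 UTC -> passes
#   validate_timestamp(1609459200, "Gecko")        # a seconds value passed as
#       # microseconds reads as 1970-01-01 00:26:49 UTC -> raises "too old",
#       # which is exactly the unit mixup this check is meant to catch.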


def convert_gecko_timestamp(gecko_timestamp: Optional[int]) -> Tuple[int, str]:
    """Convert Gecko/Firefox timestamp to Unix microseconds and ISO format."""
    if gecko_timestamp is None or gecko_timestamp == 0:
        return 0, ""

    validate_timestamp(gecko_timestamp, "Gecko/Firefox")

    timestamp_seconds = gecko_timestamp / 1000000
    dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
    return gecko_timestamp, dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
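
# Gecko (PRTime) is already microseconds since the Unix epoch, so no offset
# is applied. Example (illustrative value):
#   convert_gecko_timestamp(1609459200000000)
#   -> (1609459200000000, '2021-01-01T00:00:00+00:00')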


def convert_chromium_timestamp(chromium_timestamp: Optional[int]) -> Tuple[int, str]:
    """Convert Chromium timestamp to Unix microseconds and ISO format."""
    if chromium_timestamp is None or chromium_timestamp == 0:
        return 0, ""

    chromium_epoch_offset = 11644473600
    timestamp_seconds = (chromium_timestamp / 1000000) - chromium_epoch_offset
    unix_microseconds = int(timestamp_seconds * 1000000)

    validate_timestamp(unix_microseconds, "Chromium")

    dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
    return unix_microseconds, dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
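
# Chromium stores microseconds since 1601-01-01 UTC (the Windows FILETIME
# epoch); 11644473600 is the 1601 -> 1970 gap in seconds. Example
# (illustrative value):
#   convert_chromium_timestamp(13253932800000000)
#   -> (1609459200000000, '2021-01-01T00:00:00+00:00')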


def convert_webkit_timestamp(webkit_timestamp: Optional[float]) -> Tuple[int, str]:
    """Convert WebKit/Safari timestamp to Unix microseconds and ISO format."""
    if webkit_timestamp is None or webkit_timestamp == 0:
        return 0, ""

    webkit_epoch_offset = 978307200
    timestamp_seconds = webkit_timestamp + webkit_epoch_offset
    unix_microseconds = int(timestamp_seconds * 1000000)

    validate_timestamp(unix_microseconds, "WebKit/Safari")

    dt = datetime.fromtimestamp(timestamp_seconds, tz=timezone.utc)
    return unix_microseconds, dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
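
# Safari stores Core Data timestamps: float seconds since 2001-01-01 UTC;
# 978307200 is the 1970 -> 2001 gap in seconds. Example (illustrative value):
#   convert_webkit_timestamp(631152000.0)
#   -> (1609459200000000, '2021-01-01T00:00:00+00:00')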


def write_timesketch_csv(output_csv: str, rows: List[Dict[str, Any]]) -> None:
    """
    Write history data to Timesketch-compatible CSV format with dynamic fields.

    Args:
        output_csv: Path to output CSV file
        rows: List of row dictionaries to write
    """
    if not rows:
        return

    # Collect all unique fields from all rows
    all_fields = set()
    for row in rows:
        all_fields.update(row.keys())

    # Define standard field order (these come first)
    standard_fields = ['timestamp', 'datetime', 'timestamp_desc', 'message', 'data_type']

    # Build fieldnames list with standard fields first, then alphabetically sorted remainder
    fieldnames = [f for f in standard_fields if f in all_fields]
    remaining_fields = sorted(all_fields - set(standard_fields))
    fieldnames.extend(remaining_fields)

    with open(output_csv, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()

        for row in rows:
            writer.writerow(row)
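
# Because the header is the union of all per-row keys, rows lacking a field
# are written with empty cells. Example (illustrative rows):
#   rows = [{'timestamp': 1, 'datetime': 'd', 'message': 'a', 'url': 'u'},
#           {'timestamp': 2, 'datetime': 'd', 'message': 'b', 'filename': 'f'}]
#   -> header: timestamp,datetime,message,filename,url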


def connect_database_readonly(db_path: str) -> sqlite3.Connection:
    """Connect to database in read-only mode to avoid lock issues."""
    try:
        conn = sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)
        return conn
    except sqlite3.OperationalError as e:
        raise sqlite3.OperationalError(
            f"Cannot open database (it may be locked by the browser): {db_path}\n"
            f"Please close the browser and try again, or copy the database file.\n"
            f"Original error: {e}"
        )
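
# If closing the browser is not an option, copying the file (plus any -wal
# and -shm sidecars) and converting the copy usually works. A minimal
# sketch, assuming local paths:
#   import shutil
#   for suffix in ('', '-wal', '-shm'):
#       src = db_path + suffix
#       if Path(src).exists():
#           shutil.copy2(src, '/tmp/history_copy' + suffix)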


def table_exists(conn: sqlite3.Connection, table_name: str) -> bool:
    """Check if a table exists in the database."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table_name,)
    )
    return cursor.fetchone() is not None


def column_exists(conn: sqlite3.Connection, table_name: str, column_name: str) -> bool:
    """Check if a column exists in a table."""
    cursor = conn.cursor()
    try:
        cursor.execute(f"PRAGMA table_info({table_name})")
        columns = {row[1] for row in cursor.fetchall()}
        return column_name in columns
    except sqlite3.Error:
        return False
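
# Note: SQLite cannot bind identifiers, so the table name is interpolated
# into the PRAGMA above. That is safe here because every caller in this
# module passes a hard-coded table name, never user input. PRAGMA table_info
# rows are (cid, name, type, notnull, dflt_value, pk); row[1] is the name.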


# ============================================================================
# CHROMIUM EXTRACTORS
# ============================================================================

def extract_chromium_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract visit events from Chromium database with resolved foreign keys."""
    cursor = conn.cursor()

    query = """
        SELECT
            visits.visit_time,
            urls.url,
            urls.title,
            visits.transition,
            visits.visit_duration,
            urls.visit_count,
            urls.typed_count,
            visits.segment_id,
            visits.incremented_omnibox_typed_score,
            urls.hidden,
            from_urls.url as from_url,
            from_urls.title as from_title,
            opener_urls.url as opener_url,
            opener_urls.title as opener_title
        FROM visits
        JOIN urls ON visits.url = urls.id
        LEFT JOIN visits from_visits ON visits.from_visit = from_visits.id
        LEFT JOIN urls from_urls ON from_visits.url = from_urls.id
        LEFT JOIN visits opener_visits ON visits.opener_visit = opener_visits.id
        LEFT JOIN urls opener_urls ON opener_visits.url = opener_urls.id
        ORDER BY visits.visit_time
    """

    cursor.execute(query)
    results = cursor.fetchall()

    transition_types = {
        0: "Link", 1: "Typed", 2: "Auto_Bookmark", 3: "Auto_Subframe",
        4: "Manual_Subframe", 5: "Generated", 6: "Start_Page",
        7: "Form_Submit", 8: "Reload", 9: "Keyword", 10: "Keyword_Generated"
    }

    rows = []
    for row in results:
        (chromium_timestamp, url, title, transition, visit_duration,
         visit_count, typed_count, segment_id, incremented_typed, hidden,
         from_url, from_title, opener_url, opener_title) = row

        try:
            unix_microseconds, iso_datetime = convert_chromium_timestamp(chromium_timestamp)
        except TimestampValidationError:
            continue

        core_transition = transition & 0xFF
        transition_name = transition_types.get(core_transition, f"Unknown({core_transition})")

        # Build row with only useful fields
        row_data = {
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Visit Time',
            'message': f"Visited: {title or '(No title)'}",
            'data_type': 'browser:page:visit',
            'browser': browser_name,
            'url': url or "",
            'title': title or "(No title)",
            'visit_type': transition_name,
            'visit_duration_us': visit_duration or 0,
            'total_visits': visit_count or 0,
            'typed_count': typed_count or 0,
            'typed_in_omnibox': bool(incremented_typed),
            'hidden': bool(hidden)
        }

        # Add optional fields only if present
        if from_url:
            row_data['from_url'] = from_url
        if from_title:
            row_data['from_title'] = from_title

        if opener_url:
            row_data['opener_url'] = opener_url
        if opener_title:
            row_data['opener_title'] = opener_title

        # Add session ID only if non-zero
        if segment_id and segment_id != 0:
            row_data['session_id'] = segment_id

        rows.append(row_data)

    return rows
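
# The transition column packs a core type in the low byte and qualifier
# flags in the high bits (values from Chromium's page_transition_types.h).
# Example (illustrative): 0x30000001 is a typed navigation that both starts
# and ends a redirect chain (CHAIN_START 0x10000000 | CHAIN_END 0x20000000),
# so 0x30000001 & 0xFF == 1 -> "Typed". The qualifier bits are discarded here.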


def extract_chromium_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract download events from Chromium database."""
    if not table_exists(conn, 'downloads'):
        return []

    cursor = conn.cursor()

    query = """
        SELECT
            id,
            guid,
            current_path,
            target_path,
            start_time,
            received_bytes,
            total_bytes,
            state,
            danger_type,
            interrupt_reason,
            end_time,
            opened,
            last_access_time,
            referrer,
            tab_url,
            mime_type
        FROM downloads
        ORDER BY start_time
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    download_states = {
        0: "In Progress",
        1: "Complete",
        2: "Cancelled",
        3: "Interrupted",
        4: "Dangerous"
    }

    rows = []
    for row in results:
        (dl_id, guid, current_path, target_path, start_time, received_bytes,
         total_bytes, state, danger_type, interrupt_reason, end_time, opened,
         last_access_time, referrer, tab_url, mime_type) = row

        try:
            start_us, start_iso = convert_chromium_timestamp(start_time)
            end_us, end_iso = convert_chromium_timestamp(end_time) if end_time else (0, "")
            access_us, access_iso = convert_chromium_timestamp(last_access_time) if last_access_time else (0, "")
        except TimestampValidationError:
            continue

        state_name = download_states.get(state, f"Unknown({state})")
        filename = Path(target_path).name if target_path else "(unknown)"

        # Download start event
        rows.append({
            'timestamp': start_us,
            'datetime': start_iso,
            'timestamp_desc': 'Download Started',
            'message': f"Download started: {filename} ({mime_type or 'unknown type'})",
            'data_type': 'browser:download:start',
            'browser': browser_name,
            'download_id': dl_id,
            'filename': filename,
            'file_path': target_path or "",
            'file_size_bytes': total_bytes or 0,
            'mime_type': mime_type or "",
            'download_state': state_name,
            'referrer_url': referrer or "",
            'tab_url': tab_url or "",
            'dangerous': bool(danger_type),
            'interrupted': bool(interrupt_reason)
        })

        # Download complete event (if completed)
        if end_time and end_time != start_time:
            duration_seconds = (end_us - start_us) / 1000000
            rows.append({
                'timestamp': end_us,
                'datetime': end_iso,
                'timestamp_desc': 'Download Completed',
                'message': f"Download completed: {filename} ({received_bytes or 0} bytes in {duration_seconds:.1f}s)",
                'data_type': 'browser:download:complete',
                'browser': browser_name,
                'download_id': dl_id,
                'filename': filename,
                'file_path': target_path or "",
                'file_size_bytes': received_bytes or 0,
                'mime_type': mime_type or "",
                'download_state': state_name,
                'download_duration_seconds': duration_seconds
            })

        # Last access event (if different from completion)
        if last_access_time and last_access_time != end_time and last_access_time != start_time:
            rows.append({
                'timestamp': access_us,
                'datetime': access_iso,
                'timestamp_desc': 'File Accessed',
                'message': f"Downloaded file accessed: {filename}",
                'data_type': 'browser:download:accessed',
                'browser': browser_name,
                'download_id': dl_id,
                'filename': filename,
                'file_path': target_path or ""
            })

    return rows
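
# One downloads row therefore fans out into up to three timeline events,
# each with its own timestamp. For example, a file whose row records a start
# at 10:00, completion at 10:01 and a later open at 14:30 yields
# browser:download:start, browser:download:complete and
# browser:download:accessed events at those three times.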


def extract_chromium_search_terms(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract search terms from Chromium database."""
    if not table_exists(conn, 'keyword_search_terms'):
        return []

    cursor = conn.cursor()

    query = """
        SELECT
            kst.term,
            kst.normalized_term,
            u.url,
            u.title,
            u.last_visit_time
        FROM keyword_search_terms kst
        JOIN urls u ON kst.url_id = u.id
        ORDER BY u.last_visit_time
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        term, normalized_term, url, title, last_visit = row

        try:
            unix_microseconds, iso_datetime = convert_chromium_timestamp(last_visit)
        except TimestampValidationError:
            continue

        rows.append({
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Search Performed',
            'message': f"Search: {term}",
            'data_type': 'browser:search:query',
            'browser': browser_name,
            'search_term': term,
            'normalized_search_term': normalized_term,
            'search_url': url or "",
            'search_page_title': title or ""
        })

    return rows


def extract_chromium_autofill(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract autofill/form data from Chromium database."""
    if not table_exists(conn, 'autofill'):
        return []

    cursor = conn.cursor()

    # Check which timestamp columns exist
    has_created = column_exists(conn, 'autofill', 'date_created')
    has_last_used = column_exists(conn, 'autofill', 'date_last_used')

    if not (has_created or has_last_used):
        return []

    # Build query based on available columns
    timestamp_cols = []
    if has_created:
        timestamp_cols.append('date_created')
    if has_last_used:
        timestamp_cols.append('date_last_used')

    query = f"""
        SELECT
            name,
            value,
            {', '.join(timestamp_cols)},
            count
        FROM autofill
        WHERE {' OR '.join([f'{col} > 0' for col in timestamp_cols])}
        ORDER BY {timestamp_cols[0]}
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        name, value, *timestamps, count = row
        # Map the variable-length timestamp slice back onto the two
        # optional columns
        if has_created:
            date_created = timestamps[0]
            date_last_used = timestamps[1] if has_last_used else None
        else:
            date_created = None
            date_last_used = timestamps[0]

        # Form field created/first used
        if date_created:
            try:
                created_us, created_iso = convert_chromium_timestamp(date_created)
                rows.append({
                    'timestamp': created_us,
                    'datetime': created_iso,
                    'timestamp_desc': 'Form Field First Used',
                    'message': f"First use of form field: {name}",
                    'data_type': 'browser:form:first_use',
                    'browser': browser_name,
                    'form_field_name': name,
                    'form_field_value': value[:50] + '...' if len(value) > 50 else value,
                    'total_uses': count or 0
                })
            except TimestampValidationError:
                pass

        # Form field last used (if different)
        if date_last_used and date_last_used != date_created:
            try:
                last_us, last_iso = convert_chromium_timestamp(date_last_used)
                rows.append({
                    'timestamp': last_us,
                    'datetime': last_iso,
                    'timestamp_desc': 'Form Field Last Used',
                    'message': f"Used form field: {name}",
                    'data_type': 'browser:form:use',
                    'browser': browser_name,
                    'form_field_name': name,
                    'form_field_value': value[:50] + '...' if len(value) > 50 else value,
                    'total_uses': count or 0
                })
            except TimestampValidationError:
                pass

    return rows


def extract_chromium_favicons(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract favicon mapping timestamps from Chromium database."""
    if not table_exists(conn, 'icon_mapping'):
        return []

    # Check if last_updated column exists (not in all Chromium versions)
    if not column_exists(conn, 'icon_mapping', 'last_updated'):
        return []

    cursor = conn.cursor()

    query = """
        SELECT
            im.last_updated,
            im.page_url,
            f.url as favicon_url
        FROM icon_mapping im
        LEFT JOIN favicons f ON im.icon_id = f.id
        WHERE im.last_updated > 0
        ORDER BY im.last_updated
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        last_updated, page_url, favicon_url = row

        try:
            unix_microseconds, iso_datetime = convert_chromium_timestamp(last_updated)
        except TimestampValidationError:
            continue

        rows.append({
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Favicon Updated',
            'message': f"Updated favicon for: {page_url}",
            'data_type': 'browser:favicon:update',
            'browser': browser_name,
            'page_url': page_url or "",
            'favicon_url': favicon_url or ""
        })

    return rows


def extract_chromium_media_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract media playback history from Chromium database."""
    if not table_exists(conn, 'playback'):
        return []

    cursor = conn.cursor()

    # Check available columns
    has_last_updated = column_exists(conn, 'playback', 'last_updated_time_s')

    if not has_last_updated:
        return []

    query = """
        SELECT
            p.url,
            p.watch_time_s,
            p.has_audio,
            p.has_video,
            p.last_updated_time_s
        FROM playback p
        WHERE p.last_updated_time_s > 0
        ORDER BY p.last_updated_time_s
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        url, watch_time, has_audio, has_video, last_updated = row

        # Convert Unix seconds to microseconds for consistency
        unix_microseconds = int(last_updated * 1000000)

        try:
            validate_timestamp(unix_microseconds, "Chromium Media")
            dt = datetime.fromtimestamp(last_updated, tz=timezone.utc)
            iso_datetime = dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
        except TimestampValidationError:
            continue

        media_type = []
        if has_audio:
            media_type.append("audio")
        if has_video:
            media_type.append("video")
        media_type_str = "+".join(media_type) if media_type else "unknown"

        rows.append({
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Media Playback',
            # Guard against a NULL watch_time_s so the format spec cannot fail
            'message': f"Played {media_type_str}: {url} ({(watch_time or 0):.1f}s)",
            'data_type': 'browser:media:playback',
            'browser': browser_name,
            'media_url': url or "",
            'watch_time_seconds': watch_time or 0,
            'has_audio': bool(has_audio),
            'has_video': bool(has_video)
        })

    return rows


def extract_chromium_site_engagement(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract site engagement scores from Chromium database."""
    if not table_exists(conn, 'site_engagement'):
        return []

    cursor = conn.cursor()

    # Check for timestamp column
    has_last_engagement = column_exists(conn, 'site_engagement', 'last_engagement_time')

    if not has_last_engagement:
        return []

    query = """
        SELECT
            origin_url,
            score,
            last_engagement_time
        FROM site_engagement
        WHERE last_engagement_time > 0 AND score > 0
        ORDER BY last_engagement_time
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        origin_url, score, last_engagement = row

        # Convert internal timestamp format (typically Unix seconds)
        unix_microseconds = int(last_engagement * 1000000)

        try:
            validate_timestamp(unix_microseconds, "Chromium Site Engagement")
            dt = datetime.fromtimestamp(last_engagement, tz=timezone.utc)
            iso_datetime = dt.strftime('%Y-%m-%dT%H:%M:%S+00:00')
        except TimestampValidationError:
            continue

        rows.append({
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Site Engagement Updated',
            'message': f"Site engagement: {origin_url} (score: {score:.1f})",
            'data_type': 'browser:engagement:update',
            'browser': browser_name,
            'site_url': origin_url or "",
            'engagement_score': score or 0
        })

    return rows


# ============================================================================
# GECKO/FIREFOX EXTRACTORS
# ============================================================================

def extract_gecko_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract visit events from Gecko database with resolved foreign keys."""
    cursor = conn.cursor()

    query = """
        SELECT
            v.visit_date,
            p.url,
            p.title,
            p.description,
            v.visit_type,
            v.session,
            p.visit_count,
            p.typed,
            p.frecency,
            p.hidden,
            p.rev_host,
            prev_p.url as from_url,
            prev_p.title as from_title
        FROM moz_historyvisits v
        JOIN moz_places p ON v.place_id = p.id
        LEFT JOIN moz_historyvisits prev_v ON v.from_visit = prev_v.id
        LEFT JOIN moz_places prev_p ON prev_v.place_id = prev_p.id
        ORDER BY v.visit_date
    """

    cursor.execute(query)
    results = cursor.fetchall()

    visit_types = {
        1: "Link", 2: "Typed", 3: "Bookmark", 4: "Embed",
        5: "Redirect_Permanent", 6: "Redirect_Temporary",
        7: "Download", 8: "Framed_Link", 9: "Reload"
    }

    rows = []
    for row in results:
        (timestamp_us, url, title, description, visit_type_id,
         session, visit_count, typed, frecency, hidden, rev_host,
         from_url, from_title) = row

        try:
            unix_microseconds, iso_datetime = convert_gecko_timestamp(timestamp_us)
        except TimestampValidationError:
            continue

        visit_type_name = visit_types.get(visit_type_id, f"Unknown({visit_type_id})")

        message = f"Visited: {title or '(No title)'}"
        if description:
            message += f" - {description}"

        # Build row with only useful fields
        row_data = {
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Visit Time',
            'message': message,
            'data_type': 'browser:page:visit',
            'browser': browser_name,
            'url': url or "",
            'title': title or "(No title)",
            'visit_type': visit_type_name,
            'total_visit_count': visit_count or 0,
            'typed_count': typed or 0,
            'frecency_score': frecency,
            'hidden': bool(hidden),
            # rev_host stores the host reversed with a trailing period
            # (e.g. 'moc.elpmaxe.www.'), so reverse it and drop the dot
            'domain': rev_host[::-1].lstrip('.') if rev_host else ""
        }

        # Add optional fields only if present
        if description:
            row_data['description'] = description

        # Add navigation chain info if present
        if from_url:
            row_data['from_url'] = from_url
        if from_title:
            row_data['from_title'] = from_title

        # Add session ID only if non-zero
        if session and session != 0:
            row_data['session_id'] = session

        rows.append(row_data)

    return rows


def extract_gecko_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract bookmark events from Gecko database."""
    if not table_exists(conn, 'moz_bookmarks'):
        return []

    cursor = conn.cursor()

    query = """
        SELECT
            b.id,
            b.type,
            b.title,
            b.dateAdded,
            b.lastModified,
            p.url,
            p.title as page_title,
            b.parent,
            b.position
        FROM moz_bookmarks b
        LEFT JOIN moz_places p ON b.fk = p.id
        WHERE b.dateAdded IS NOT NULL
        ORDER BY b.dateAdded
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    bookmark_types = {
        1: "Bookmark",
        2: "Folder",
        3: "Separator"
    }

    rows = []
    for row in results:
        (bm_id, bm_type, title, date_added, last_modified,
         url, page_title, parent, position) = row

        try:
            added_us, added_iso = convert_gecko_timestamp(date_added)
        except TimestampValidationError:
            continue

        type_name = bookmark_types.get(bm_type, f"Unknown({bm_type})")
        display_title = title or page_title or "(No title)"

        # Bookmark added event
        rows.append({
            'timestamp': added_us,
            'datetime': added_iso,
            'timestamp_desc': 'Bookmark Added',
            'message': f"Bookmarked: {display_title}",
            'data_type': 'browser:bookmark:added',
            'browser': browser_name,
            'bookmark_id': bm_id,
            'bookmark_type': type_name,
            'bookmark_title': display_title,
            'url': url or "",
            'parent_folder_id': parent,
            'position': position
        })

        # Bookmark modified event (if different from added)
        if last_modified and last_modified != date_added:
            try:
                modified_us, modified_iso = convert_gecko_timestamp(last_modified)
                rows.append({
                    'timestamp': modified_us,
                    'datetime': modified_iso,
                    'timestamp_desc': 'Bookmark Modified',
                    'message': f"Modified bookmark: {display_title}",
                    'data_type': 'browser:bookmark:modified',
                    'browser': browser_name,
                    'bookmark_id': bm_id,
                    'bookmark_title': display_title,
                    'url': url or ""
                })
            except TimestampValidationError:
                pass

    return rows


def extract_gecko_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract downloads from Gecko database (older Firefox versions)."""
    if not table_exists(conn, 'moz_downloads'):
        return []

    cursor = conn.cursor()

    query = """
        SELECT
            id,
            name,
            source,
            target,
            startTime,
            endTime,
            state,
            referrer,
            currBytes,
            maxBytes,
            mimeType
        FROM moz_downloads
        WHERE startTime IS NOT NULL
        ORDER BY startTime
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    download_states = {
        0: "Downloading",
        1: "Complete",
        2: "Failed",
        3: "Cancelled",
        4: "Paused"
    }

    rows = []
    for row in results:
        (dl_id, name, source, target, start_time, end_time, state,
         referrer, curr_bytes, max_bytes, mime_type) = row

        try:
            start_us, start_iso = convert_gecko_timestamp(start_time)
        except TimestampValidationError:
            continue

        state_name = download_states.get(state, f"Unknown({state})")

        # Download start event
        rows.append({
            'timestamp': start_us,
            'datetime': start_iso,
            'timestamp_desc': 'Download Started',
            'message': f"Download started: {name} ({mime_type or 'unknown type'})",
            'data_type': 'browser:download:start',
            'browser': browser_name,
            'download_id': dl_id,
            'filename': name or "",
            'source_url': source or "",
            'target_path': target or "",
            'file_size_bytes': max_bytes or 0,
            'mime_type': mime_type or "",
            'download_state': state_name,
            'referrer_url': referrer or ""
        })

        # Download complete event
        if end_time and end_time != start_time:
            try:
                end_us, end_iso = convert_gecko_timestamp(end_time)
                duration_seconds = (end_us - start_us) / 1000000
                rows.append({
                    'timestamp': end_us,
                    'datetime': end_iso,
                    'timestamp_desc': 'Download Completed',
                    'message': f"Download completed: {name} ({curr_bytes or 0} bytes in {duration_seconds:.1f}s)",
                    'data_type': 'browser:download:complete',
                    'browser': browser_name,
                    'download_id': dl_id,
                    'filename': name or "",
                    'file_size_bytes': curr_bytes or 0,
                    'mime_type': mime_type or "",
                    'download_state': state_name,
                    'download_duration_seconds': duration_seconds
                })
            except TimestampValidationError:
                pass

    return rows


def extract_gecko_form_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract form autofill history from Gecko database."""
    if not table_exists(conn, 'moz_formhistory'):
        return []

    cursor = conn.cursor()

    query = """
        SELECT
            id,
            fieldname,
            value,
            timesUsed,
            firstUsed,
            lastUsed
        FROM moz_formhistory
        WHERE firstUsed IS NOT NULL
        ORDER BY firstUsed
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        form_id, fieldname, value, times_used, first_used, last_used = row

        # First use event
        if first_used:
            try:
                first_us, first_iso = convert_gecko_timestamp(first_used)
                rows.append({
                    'timestamp': first_us,
                    'datetime': first_iso,
                    'timestamp_desc': 'Form Field First Used',
                    'message': f"First use of form field: {fieldname}",
                    'data_type': 'browser:form:first_use',
                    'browser': browser_name,
                    'form_id': form_id,
                    'form_field_name': fieldname,
                    'form_field_value': value[:50] + '...' if len(value) > 50 else value,
                    'total_uses': times_used or 0
                })
            except TimestampValidationError:
                pass

        # Last use event (if different)
        if last_used and last_used != first_used:
            try:
                last_us, last_iso = convert_gecko_timestamp(last_used)
                rows.append({
                    'timestamp': last_us,
                    'datetime': last_iso,
                    'timestamp_desc': 'Form Field Last Used',
                    'message': f"Used form field: {fieldname}",
                    'data_type': 'browser:form:use',
                    'browser': browser_name,
                    'form_id': form_id,
                    'form_field_name': fieldname,
                    'form_field_value': value[:50] + '...' if len(value) > 50 else value,
                    'total_uses': times_used or 0
                })
            except TimestampValidationError:
                pass

    return rows


def extract_gecko_annotations(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract page and bookmark annotations from Gecko database."""
    rows = []

    # Page annotations
    if table_exists(conn, 'moz_annos'):
        cursor = conn.cursor()
        query = """
            SELECT
                a.id,
                a.place_id,
                a.dateAdded,
                a.lastModified,
                n.name,
                a.content,
                p.url,
                p.title
            FROM moz_annos a
            JOIN moz_anno_attributes n ON a.anno_attribute_id = n.id
            JOIN moz_places p ON a.place_id = p.id
            WHERE a.dateAdded IS NOT NULL
            ORDER BY a.dateAdded
        """

        try:
            cursor.execute(query)
            results = cursor.fetchall()

            for row in results:
                anno_id, place_id, date_added, last_modified, name, content, url, title = row

                # Annotation added
                if date_added:
                    try:
                        added_us, added_iso = convert_gecko_timestamp(date_added)
                        rows.append({
                            'timestamp': added_us,
                            'datetime': added_iso,
                            'timestamp_desc': 'Page Annotation Added',
                            'message': f"Added annotation '{name}' to: {title or url}",
                            'data_type': 'browser:annotation:added',
                            'browser': browser_name,
                            'annotation_id': anno_id,
                            'annotation_name': name,
                            'annotation_content': content[:100] + '...' if content and len(content) > 100 else content,
                            'url': url or "",
                            'title': title or ""
                        })
                    except TimestampValidationError:
                        pass

                # Annotation modified (if different)
                if last_modified and last_modified != date_added:
                    try:
                        modified_us, modified_iso = convert_gecko_timestamp(last_modified)
                        rows.append({
                            'timestamp': modified_us,
                            'datetime': modified_iso,
                            'timestamp_desc': 'Page Annotation Modified',
                            'message': f"Modified annotation '{name}' on: {title or url}",
                            'data_type': 'browser:annotation:modified',
                            'browser': browser_name,
                            'annotation_id': anno_id,
                            'annotation_name': name,
                            'url': url or ""
                        })
                    except TimestampValidationError:
                        pass
        except sqlite3.Error:
            pass

    # Bookmark annotations
    if table_exists(conn, 'moz_items_annos'):
        cursor = conn.cursor()
        query = """
            SELECT
                ia.id,
                ia.item_id,
                ia.dateAdded,
                ia.lastModified,
                n.name,
                ia.content,
                b.title
            FROM moz_items_annos ia
            JOIN moz_anno_attributes n ON ia.anno_attribute_id = n.id
            JOIN moz_bookmarks b ON ia.item_id = b.id
            WHERE ia.dateAdded IS NOT NULL
            ORDER BY ia.dateAdded
        """

        try:
            cursor.execute(query)
            results = cursor.fetchall()

            for row in results:
                anno_id, item_id, date_added, last_modified, name, content, title = row

                # Annotation added
                if date_added:
                    try:
                        added_us, added_iso = convert_gecko_timestamp(date_added)
                        rows.append({
                            'timestamp': added_us,
                            'datetime': added_iso,
                            'timestamp_desc': 'Bookmark Annotation Added',
                            'message': f"Added annotation '{name}' to bookmark: {title}",
                            'data_type': 'browser:annotation:added',
                            'browser': browser_name,
                            'annotation_id': anno_id,
                            'annotation_name': name,
                            'annotation_content': content[:100] + '...' if content and len(content) > 100 else content,
                            'bookmark_title': title or ""
                        })
                    except TimestampValidationError:
                        pass

                # Annotation modified (if different)
                if last_modified and last_modified != date_added:
                    try:
                        modified_us, modified_iso = convert_gecko_timestamp(last_modified)
                        rows.append({
                            'timestamp': modified_us,
                            'datetime': modified_iso,
                            'timestamp_desc': 'Bookmark Annotation Modified',
                            'message': f"Modified annotation '{name}' on bookmark: {title}",
                            'data_type': 'browser:annotation:modified',
                            'browser': browser_name,
                            'annotation_id': anno_id,
                            'annotation_name': name
                        })
                    except TimestampValidationError:
                        pass
        except sqlite3.Error:
            pass

    return rows


def extract_gecko_metadata(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract page metadata/engagement events from Gecko database."""
    if not table_exists(conn, 'moz_places_metadata'):
        return []

    cursor = conn.cursor()

    query = """
        SELECT
            m.place_id,
            m.created_at,
            m.updated_at,
            m.total_view_time,
            m.typing_time,
            m.key_presses,
            m.scrolling_time,
            m.scrolling_distance,
            m.document_type,
            p.url,
            p.title
        FROM moz_places_metadata m
        JOIN moz_places p ON m.place_id = p.id
        WHERE m.created_at > 0
        ORDER BY m.created_at
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        (place_id, created_at, updated_at, total_view_time, typing_time,
         key_presses, scrolling_time, scrolling_distance, document_type,
         url, title) = row

        # created_at is assumed here to be PRTime microseconds like other
        # Places timestamps; if a profile stores it in milliseconds instead,
        # such rows fail the pre-1990 sanity bound and are skipped.
        try:
            created_us, created_iso = convert_gecko_timestamp(created_at)
        except TimestampValidationError:
            continue

        # Convert microseconds to seconds for display
        view_seconds = (total_view_time or 0) / 1000000
        typing_seconds = (typing_time or 0) / 1000000
        scrolling_seconds = (scrolling_time or 0) / 1000000

        rows.append({
            'timestamp': created_us,
            'datetime': created_iso,
            'timestamp_desc': 'Page Engagement',
            'message': f"Engaged with: {title or '(No title)'} ({view_seconds:.1f}s)",
            'data_type': 'browser:page:engagement',
            'browser': browser_name,
            'url': url or "",
            'title': title or "(No title)",
            'total_view_time_seconds': view_seconds,
            'typing_time_seconds': typing_seconds,
            'key_presses': key_presses or 0,
            'scrolling_time_seconds': scrolling_seconds,
            'scrolling_distance': scrolling_distance or 0,
            'document_type': document_type
        })

    return rows


def extract_gecko_input_history(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract address bar input history from Gecko database."""
    if not table_exists(conn, 'moz_inputhistory'):
        return []

    cursor = conn.cursor()

    query = """
        SELECT
            ih.place_id,
            ih.input,
            ih.use_count,
            p.url,
            p.title,
            p.last_visit_date
        FROM moz_inputhistory ih
        JOIN moz_places p ON ih.place_id = p.id
        WHERE p.last_visit_date IS NOT NULL
        ORDER BY p.last_visit_date
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        place_id, input_text, use_count, url, title, last_visit = row

        try:
            unix_microseconds, iso_datetime = convert_gecko_timestamp(last_visit)
        except TimestampValidationError:
            continue

        rows.append({
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Address Bar Input',
            'message': f"Typed in address bar: {input_text}",
            'data_type': 'browser:addressbar:input',
            'browser': browser_name,
            'input_text': input_text or "",
            'matched_url': url or "",
            'matched_title': title or "",
            'use_count': use_count or 0
        })

    return rows


def extract_gecko_keywords(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract custom search keywords from Gecko database."""
    if not table_exists(conn, 'moz_keywords'):
        return []

    cursor = conn.cursor()

    # Check if dateAdded column exists (not in all Firefox versions)
    if not column_exists(conn, 'moz_keywords', 'dateAdded'):
        return []

    query = """
        SELECT
            k.id,
            k.keyword,
            k.dateAdded,
            p.url,
            p.title
        FROM moz_keywords k
        LEFT JOIN moz_places p ON k.place_id = p.id
        WHERE k.dateAdded IS NOT NULL
        ORDER BY k.dateAdded
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        keyword_id, keyword, date_added, url, title = row

        try:
            added_us, added_iso = convert_gecko_timestamp(date_added)
        except TimestampValidationError:
            continue

        rows.append({
            'timestamp': added_us,
            'datetime': added_iso,
            'timestamp_desc': 'Keyword Added',
            'message': f"Added search keyword: {keyword}",
            'data_type': 'browser:keyword:added',
            'browser': browser_name,
            'keyword_id': keyword_id,
            'keyword': keyword or "",
            'search_url': url or "",
            'title': title or ""
        })

    return rows


def extract_gecko_origins(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract origin (domain) tracking data from Gecko database."""
    if not table_exists(conn, 'moz_origins'):
        return []

    cursor = conn.cursor()

    # Check for last_visit_date column
    if not column_exists(conn, 'moz_origins', 'last_visit_date'):
        return []

    query = """
        SELECT
            id,
            prefix,
            host,
            frecency,
            last_visit_date
        FROM moz_origins
        WHERE last_visit_date IS NOT NULL
        ORDER BY last_visit_date
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        origin_id, prefix, host, frecency, last_visit = row

        try:
            unix_microseconds, iso_datetime = convert_gecko_timestamp(last_visit)
        except TimestampValidationError:
            continue

        full_origin = f"{prefix}{host}" if prefix else host

        rows.append({
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Domain Visited',
            'message': f"Visited domain: {full_origin}",
            'data_type': 'browser:domain:visit',
            'browser': browser_name,
            'origin': full_origin or "",
            'host': host or "",
            'prefix': prefix or "",
            'frecency_score': frecency or 0
        })

    return rows


# ============================================================================
# WEBKIT/SAFARI EXTRACTORS
# ============================================================================

def extract_webkit_visits(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract visit events from WebKit database with resolved redirect chains."""
    cursor = conn.cursor()

    query = """
        SELECT
            hv.visit_time,
            hi.url,
            hi.title,
            hv.title as visit_title,
            hv.load_successful,
            hv.http_non_get,
            hi.visit_count,
            redirect_src_items.url as redirect_source_url,
            redirect_dst_items.url as redirect_destination_url
        FROM history_visits hv
        JOIN history_items hi ON hv.history_item = hi.id
        LEFT JOIN history_visits redirect_src ON hv.redirect_source = redirect_src.id
        LEFT JOIN history_items redirect_src_items ON redirect_src.history_item = redirect_src_items.id
        LEFT JOIN history_visits redirect_dst ON hv.redirect_destination = redirect_dst.id
        LEFT JOIN history_items redirect_dst_items ON redirect_dst.history_item = redirect_dst_items.id
        ORDER BY hv.visit_time
    """

    cursor.execute(query)
    results = cursor.fetchall()

    rows = []
    for row in results:
        (webkit_timestamp, url, title, visit_title,
         load_successful, http_non_get, visit_count,
         redirect_source_url, redirect_destination_url) = row

        try:
            unix_microseconds, iso_datetime = convert_webkit_timestamp(webkit_timestamp)
        except TimestampValidationError:
            continue

        display_title = title or visit_title or "(No title)"

        message = f"Visited: {display_title}"
        if not load_successful:
            message += " [FAILED TO LOAD]"
        if http_non_get:
            message += " [POST/Form]"

        # Build row with only useful fields
        row_data = {
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Visit Time',
            'message': message,
            'data_type': 'browser:page:visit',
            'browser': browser_name,
            'url': url or "",
            'title': display_title,
            'load_successful': bool(load_successful),
            'http_post': bool(http_non_get),
            'total_visit_count': visit_count or 0
        }

        # Add redirect chain info if present
        if redirect_source_url:
            row_data['redirect_source_url'] = redirect_source_url
        if redirect_destination_url:
            row_data['redirect_destination_url'] = redirect_destination_url

        rows.append(row_data)

    return rows


def extract_webkit_bookmarks(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract bookmarks from WebKit database."""
    if not table_exists(conn, 'bookmarks'):
        return []

    cursor = conn.cursor()

    # Check for date_added column
    if not column_exists(conn, 'bookmarks', 'date_added'):
        return []

    query = """
        SELECT
            id,
            title,
            url,
            date_added,
            date_last_modified
        FROM bookmarks
        WHERE date_added > 0
        ORDER BY date_added
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        bm_id, title, url, date_added, date_modified = row

        # Bookmark added
        if date_added:
            try:
                added_us, added_iso = convert_webkit_timestamp(date_added)
                rows.append({
                    'timestamp': added_us,
                    'datetime': added_iso,
                    'timestamp_desc': 'Bookmark Added',
                    'message': f"Bookmarked: {title or url}",
                    'data_type': 'browser:bookmark:added',
                    'browser': browser_name,
                    'bookmark_id': bm_id,
                    'bookmark_title': title or "",
                    'url': url or ""
                })
            except TimestampValidationError:
                pass

        # Bookmark modified (if different)
        if date_modified and date_modified != date_added:
            try:
                modified_us, modified_iso = convert_webkit_timestamp(date_modified)
                rows.append({
                    'timestamp': modified_us,
                    'datetime': modified_iso,
                    'timestamp_desc': 'Bookmark Modified',
                    'message': f"Modified bookmark: {title or url}",
                    'data_type': 'browser:bookmark:modified',
                    'browser': browser_name,
                    'bookmark_id': bm_id,
                    'bookmark_title': title or "",
                    'url': url or ""
                })
            except TimestampValidationError:
                pass

    return rows


def extract_webkit_downloads(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract downloads from WebKit database."""
    if not table_exists(conn, 'downloads'):
        return []

    cursor = conn.cursor()

    # Check for date_started column
    if not column_exists(conn, 'downloads', 'date_started'):
        return []

    query = """
        SELECT
            id,
            url,
            path,
            mime_type,
            bytes_received,
            total_bytes,
            date_started,
            date_finished
        FROM downloads
        WHERE date_started > 0
        ORDER BY date_started
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        dl_id, url, path, mime_type, bytes_received, total_bytes, date_started, date_finished = row

        filename = Path(path).name if path else "(unknown)"
        # Stays None when the start timestamp is missing or invalid, so the
        # completion branch below never references an unbound name
        started_us = None

        # Download started
        if date_started:
            try:
                started_us, started_iso = convert_webkit_timestamp(date_started)
                rows.append({
                    'timestamp': started_us,
                    'datetime': started_iso,
                    'timestamp_desc': 'Download Started',
                    'message': f"Download started: {filename} ({mime_type or 'unknown type'})",
                    'data_type': 'browser:download:start',
                    'browser': browser_name,
                    'download_id': dl_id,
                    'filename': filename,
                    'source_url': url or "",
                    'file_path': path or "",
                    'file_size_bytes': total_bytes or 0,
                    'mime_type': mime_type or ""
                })
            except TimestampValidationError:
                pass

        # Download finished (if different)
        if date_finished and date_finished != date_started:
            try:
                finished_us, finished_iso = convert_webkit_timestamp(date_finished)
                duration_seconds = (finished_us - started_us) / 1000000 if started_us else 0
                rows.append({
                    'timestamp': finished_us,
                    'datetime': finished_iso,
                    'timestamp_desc': 'Download Completed',
                    'message': f"Download completed: {filename} ({bytes_received or 0} bytes in {duration_seconds:.1f}s)",
                    'data_type': 'browser:download:complete',
                    'browser': browser_name,
                    'download_id': dl_id,
                    'filename': filename,
                    'file_path': path or "",
                    'file_size_bytes': bytes_received or 0,
                    'mime_type': mime_type or "",
                    'download_duration_seconds': duration_seconds
                })
            except TimestampValidationError:
                pass

    return rows


def extract_webkit_reading_list(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract Reading List items from WebKit database."""
    if not table_exists(conn, 'reading_list'):
        return []

    cursor = conn.cursor()

    # Check for date_added column
    if not column_exists(conn, 'reading_list', 'date_added'):
        return []

    query = """
        SELECT
            id,
            title,
            url,
            date_added,
            date_last_viewed
        FROM reading_list
        WHERE date_added > 0
        ORDER BY date_added
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        item_id, title, url, date_added, date_viewed = row

        # Reading list item added
        if date_added:
            try:
                added_us, added_iso = convert_webkit_timestamp(date_added)
                rows.append({
                    'timestamp': added_us,
                    'datetime': added_iso,
                    'timestamp_desc': 'Reading List Item Added',
                    'message': f"Added to Reading List: {title or url}",
                    'data_type': 'browser:readinglist:added',
                    'browser': browser_name,
                    'reading_list_id': item_id,
                    'title': title or "",
                    'url': url or ""
                })
            except TimestampValidationError:
                pass

        # Reading list item viewed (if present)
        if date_viewed and date_viewed > 0:
            try:
                viewed_us, viewed_iso = convert_webkit_timestamp(date_viewed)
                rows.append({
                    'timestamp': viewed_us,
                    'datetime': viewed_iso,
                    'timestamp_desc': 'Reading List Item Viewed',
                    'message': f"Viewed Reading List item: {title or url}",
                    'data_type': 'browser:readinglist:viewed',
                    'browser': browser_name,
                    'reading_list_id': item_id,
                    'title': title or "",
                    'url': url or ""
                })
            except TimestampValidationError:
                pass

    return rows


def extract_webkit_top_sites(conn: sqlite3.Connection, browser_name: str) -> List[Dict[str, Any]]:
    """Extract Top Sites data from WebKit database."""
    if not table_exists(conn, 'top_sites'):
        return []

    cursor = conn.cursor()

    # Check for last_visited column
    if not column_exists(conn, 'top_sites', 'last_visited'):
        return []

    query = """
        SELECT
            id,
            url,
            title,
            visit_count,
            last_visited
        FROM top_sites
        WHERE last_visited > 0
        ORDER BY last_visited
    """

    try:
        cursor.execute(query)
        results = cursor.fetchall()
    except sqlite3.Error:
        return []

    rows = []
    for row in results:
        site_id, url, title, visit_count, last_visited = row

        try:
            unix_microseconds, iso_datetime = convert_webkit_timestamp(last_visited)
        except TimestampValidationError:
            continue

        rows.append({
            'timestamp': unix_microseconds,
            'datetime': iso_datetime,
            'timestamp_desc': 'Top Site Last Visited',
            'message': f"Top site visited: {title or url}",
            'data_type': 'browser:topsite:visit',
            'browser': browser_name,
            'site_id': site_id,
            'url': url or "",
            'title': title or "",
            'visit_count': visit_count or 0
        })

    return rows


# ============================================================================
# MAIN EXTRACTION ORCHESTRATION
# ============================================================================

def extract_all_events(db_path: str, browser_type: str, browser_name: Optional[str] = None) -> Tuple[List[Dict[str, Any]], Dict[str, int]]:
    """
    Extract ALL timeline events from browser database.

    Returns:
        Tuple of (all_rows, event_counts dictionary)
    """
    if browser_name is None:
        browser_name = {'gecko': 'Firefox', 'chromium': 'Chromium', 'webkit': 'Safari'}[browser_type]

    conn = connect_database_readonly(db_path)
    all_rows = []
    event_counts = {}

    print(f"Extracting events from {browser_name} database...")
    print("=" * 60)

    try:
        if browser_type == 'gecko':
            # Firefox/Gecko - extract all event types
            extractors = [
                ('Page Visits', extract_gecko_visits),
                ('Bookmarks', extract_gecko_bookmarks),
                ('Downloads', extract_gecko_downloads),
                ('Form History', extract_gecko_form_history),
                ('Annotations', extract_gecko_annotations),
                ('Page Engagement', extract_gecko_metadata),
                ('Address Bar Input', extract_gecko_input_history),
                ('Search Keywords', extract_gecko_keywords),
                ('Domain Tracking', extract_gecko_origins),
            ]

        elif browser_type == 'chromium':
            # Chromium - extract all event types
            extractors = [
                ('Page Visits', extract_chromium_visits),
                ('Downloads', extract_chromium_downloads),
                ('Search Queries', extract_chromium_search_terms),
                ('Form Autofill', extract_chromium_autofill),
                ('Favicons', extract_chromium_favicons),
                ('Media Playback', extract_chromium_media_history),
                ('Site Engagement', extract_chromium_site_engagement),
            ]

        elif browser_type == 'webkit':
            # Safari/WebKit - extract all event types
            extractors = [
                ('Page Visits', extract_webkit_visits),
                ('Bookmarks', extract_webkit_bookmarks),
                ('Downloads', extract_webkit_downloads),
                ('Reading List', extract_webkit_reading_list),
                ('Top Sites', extract_webkit_top_sites),
            ]

        else:
            # Guard against an unknown type so 'extractors' is never unbound
            raise ValueError(f"Unsupported browser type: {browser_type}")

        # Run all extractors
        for name, extractor_func in extractors:
            try:
                events = extractor_func(conn, browser_name)
                all_rows.extend(events)
                event_counts[name] = len(events)
                print(f"  ✓ {name:25} {len(events):>7,} events")
            except Exception as e:
                print(f"  ✗ {name:25} Error: {e}")
                event_counts[name] = 0

    finally:
        conn.close()

    # Sort all events by timestamp
    all_rows.sort(key=lambda x: x['timestamp'])

    print("=" * 60)

    return all_rows, event_counts
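
# Programmatic use mirrors the CLI (sketch, assuming a copied History file):
#   rows, counts = extract_all_events('/tmp/History', 'chromium', 'Brave')
#   write_timesketch_csv('brave_timeline.csv', rows)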


def generate_default_output_filename(browser_type: str, input_path: str) -> str:
    """Generate a sensible default output filename based on browser type and input."""
    path_lower = input_path.lower()

    browser_names = {
        'firefox': 'firefox', 'chrome': 'chrome', 'edge': 'edge',
        'brave': 'brave', 'opera': 'opera', 'vivaldi': 'vivaldi', 'safari': 'safari',
    }

    detected_name = None
    for name_key, name_value in browser_names.items():
        if name_key in path_lower:
            detected_name = name_value
            break

    if detected_name:
        return f"{detected_name}_timeline_timesketch.csv"
    else:
        return f"{browser_type}_timeline_timesketch.csv"
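
# Examples (illustrative paths):
#   generate_default_output_filename('chromium', '/cases/x/Chrome/History')
#   -> 'chrome_timeline_timesketch.csv'
#   generate_default_output_filename('chromium', '/cases/x/History')
#   -> 'chromium_timeline_timesketch.csv'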


def main() -> int:
    parser = argparse.ArgumentParser(
        description='Convert ALL browser events to Timesketch CSV format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Extracts ALL timestamped events from browser databases:

CHROMIUM (Chrome, Edge, Brave, Opera, Vivaldi):
  - Page visits (with metadata and navigation chains)
  - Downloads (start, completion, access times)
  - Search queries
  - Form autofill data (first use, last use)
  - Favicon updates
  - Media playback history
  - Site engagement scores

GECKO/FIREFOX:
  - Page visits (with metadata and navigation chains)
  - Bookmarks (added, modified)
  - Downloads
  - Form history (first use, last use)
  - Page and bookmark annotations
  - Page engagement metrics (view time, scrolling, typing)
  - Address bar input history
  - Custom search keywords
  - Domain-level tracking data

WEBKIT/SAFARI:
  - Page visits (with redirect chains)
  - Bookmarks (added, modified)
  - Downloads (start, completion)
  - Reading List items (added, viewed)
  - Top Sites tracking

Browser types:
  gecko, firefox  - Gecko-based browsers (Firefox)
  chromium        - Chromium-based browsers (Chrome, Edge, Brave, etc.)
  webkit, safari  - WebKit-based browsers (Safari)
  auto            - Auto-detect browser type (default)

Output Format:
  - Browser-agnostic Timesketch-compatible CSV
  - Consistent event naming across all browsers
  - All timestamps in Unix microseconds + ISO 8601

Example usage:
  # Auto-detect and extract everything
  python browser2timesketch_complete.py -i /path/to/History

  # Specify browser and output
  python browser2timesketch_complete.py -b firefox -i places.sqlite -o output.csv

  # Custom browser name (e.g., for Brave, Edge)
  python browser2timesketch_complete.py --browser-name "Brave" -i History
"""
    )

    parser.add_argument(
        '-b', '--browser',
        choices=['gecko', 'firefox', 'chromium', 'webkit', 'safari', 'auto'],
        default='auto',
        help='Browser engine type (default: auto-detect)'
    )

    parser.add_argument(
        '-i', '--input',
        required=True,
        help='Path to browser history database'
    )

    parser.add_argument(
        '-o', '--output',
        help='Output CSV file path (default: auto-generated)'
    )

    parser.add_argument(
        '--browser-name',
        help='Custom browser name for the browser field (e.g., "Brave", "Edge")'
    )

    args = parser.parse_args()

    try:
        # Validate database file
        print(f"Validating database: {args.input}")
        validate_sqlite_database(args.input)
        print("✓ Database is valid SQLite file\n")

        # Detect or validate browser type
        browser_type = args.browser.lower()

        if browser_type == 'auto':
            print("Auto-detecting browser type...")
            browser_type = detect_browser_type(args.input)
            print(f"✓ Detected browser type: {browser_type}\n")
        else:
            if browser_type == 'firefox':
                browser_type = 'gecko'
            elif browser_type == 'safari':
                browser_type = 'webkit'

            detected_type = detect_browser_type(args.input)
            if detected_type != browser_type:
                print(f"Warning: You specified '{args.browser}' but database appears to be '{detected_type}'",
                      file=sys.stderr)
                response = input("Continue anyway? [y/N]: ")
                if response.lower() != 'y':
                    return 1

        # Generate output filename if not provided
        if args.output:
            output_csv = args.output
        else:
            output_csv = generate_default_output_filename(browser_type, args.input)
            print(f"Using output filename: {output_csv}\n")

        # Extract all events
        all_rows, event_counts = extract_all_events(args.input, browser_type, args.browser_name)

        if not all_rows:
            print("\n❌ No events found in database!")
            return 1

        # Write to CSV
        print(f"\nWriting {len(all_rows):,} total events to CSV...")
        write_timesketch_csv(output_csv, all_rows)

        # Summary
        print("\n" + "=" * 60)
        print("EXTRACTION COMPLETE")
        print("=" * 60)
        print(f"Total events: {len(all_rows):,}")
        print("\nEvent breakdown:")
        for event_type, count in sorted(event_counts.items()):
            if count > 0:
                print(f"  • {event_type:25} {count:>7,} events")
        print(f"\n✓ Output saved to: {output_csv}")
        print("✓ Format: Browser-agnostic Timesketch CSV")
        print("=" * 60)

        return 0

    except DatabaseValidationError as e:
        print(f"\n❌ Database Validation Error: {e}", file=sys.stderr)
        return 1
    except BrowserDetectionError as e:
        print(f"\n❌ Browser Detection Error: {e}", file=sys.stderr)
        return 1
    except sqlite3.Error as e:
        print(f"\n❌ Database Error: {e}", file=sys.stderr)
        return 1
    except KeyboardInterrupt:
        print("\n\n⚠️ Operation cancelled by user", file=sys.stderr)
        return 130
    except Exception as e:
        print(f"\n❌ Unexpected Error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())