diff --git a/test/script/regression.py b/test/script/regression.py
index 1b42f623bc..e898bec3e5 100644
--- a/test/script/regression.py
+++ b/test/script/regression.py
@@ -16,18 +16,22 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import sys
+import subprocess
+import os
+
+# import db_diff
+sys.path.insert(0, os.path.join(subprocess.getoutput("cygpath -u $TSK_HOME"), "db_diff"))
from tskdbdiff import TskDbDiff, TskDbDiffException, PGSettings
import codecs
import datetime
import logging
-import os
+
import re
import shutil
import socket
import sqlite3
-import subprocess
-import sys
from sys import platform as _platform
import time
import traceback
diff --git a/test/script/tskdbdiff.py b/test/script/tskdbdiff.py
deleted file mode 100644
index f0aae2c2a5..0000000000
--- a/test/script/tskdbdiff.py
+++ /dev/null
@@ -1,1200 +0,0 @@
-# Requires python3
-
-import re
-import sqlite3
-import subprocess
-import shutil
-import os
-import codecs
-import datetime
-import sys
-from typing import Callable, Dict, Union, List
-
-import psycopg2
-import psycopg2.extras
-import socket
-import csv
-
-class TskDbDiff(object):
- """Compares two TSK/Autospy SQLite databases.
-
- Attributes:
- gold_artifacts:
- autopsy_artifacts:
- gold_attributes:
- autopsy_attributes:
- gold_objects:
- autopsy_objects:
- artifact_comparison:
- attribute_comparision:
- report_errors: a listof_listof_String, the error messages that will be
- printed to screen in the run_diff method
- passed: a boolean, did the diff pass?
- autopsy_db_file:
- gold_db_file:
- """
- def __init__(self, output_db, gold_db, output_dir=None, gold_bb_dump=None, gold_dump=None, verbose=False, isMultiUser=False, pgSettings=None):
- """Constructor for TskDbDiff.
-
- Args:
- output_db_path: path to output database (non-gold standard)
- gold_db_path: path to gold database
- output_dir: (optional) Path to folder where generated files will be put.
- gold_bb_dump: (optional) path to file where the gold blackboard dump is located
- gold_dump: (optional) path to file where the gold non-blackboard dump is located
- verbose: (optional) a boolean, if true, diff results are sent to stdout.
- """
-
- self.output_db_file = output_db
- self.gold_db_file = gold_db
- self.output_dir = output_dir
- self.gold_bb_dump = gold_bb_dump
- self.gold_dump = gold_dump
- self._generate_gold_dump = False
- self._generate_gold_bb_dump = False
- self._bb_dump_diff = ""
- self._dump_diff = ""
- self._bb_dump = ""
- self._dump = ""
- self.verbose = verbose
- self.isMultiUser = isMultiUser
- self.pgSettings = pgSettings
-
- if self.isMultiUser and not self.pgSettings:
- print("Missing PostgreSQL database connection settings data.")
- sys.exit(1)
-
- if self.gold_bb_dump is None:
- self._generate_gold_bb_dump = True
- if self.gold_dump is None:
- self._generate_gold_dump = True
-
- def run_diff(self):
- """Compare the databases.
-
- Raises:
- TskDbDiffException: if an error occurs while diffing or dumping the database
- """
-
- self._init_diff()
- id_obj_path_table = -1
- # generate the gold database dumps if necessary
- if self._generate_gold_dump:
- id_obj_path_table = TskDbDiff._dump_output_db_nonbb(self.gold_db_file, self.gold_dump, self.isMultiUser, self.pgSettings)
- if self._generate_gold_bb_dump:
- TskDbDiff._dump_output_db_bb(self.gold_db_file, self.gold_bb_dump, self.isMultiUser, self.pgSettings, id_obj_path_table)
-
- # generate the output database dumps (both DB and BB)
- id_obj_path_table = TskDbDiff._dump_output_db_nonbb(self.output_db_file, self._dump, self.isMultiUser, self.pgSettings)
- TskDbDiff._dump_output_db_bb(self.output_db_file, self._bb_dump, self.isMultiUser, self.pgSettings, id_obj_path_table)
-
- # Compare non-BB
- dump_diff_pass = self._diff(self._dump, self.gold_dump, self._dump_diff)
-
- # Compare BB
- bb_dump_diff_pass = self._diff(self._bb_dump, self.gold_bb_dump, self._bb_dump_diff)
-
- self._cleanup_diff()
- return dump_diff_pass, bb_dump_diff_pass
-
-
- def _init_diff(self):
- """Set up the necessary files based on the arguments given at construction"""
- if self.output_dir is None:
- # No stored files
- self._bb_dump = TskDbDiff._get_tmp_file("BlackboardDump", ".txt")
- self._bb_dump_diff = TskDbDiff._get_tmp_file("BlackboardDump-Diff", ".txt")
- self._dump = TskDbDiff._get_tmp_file("DBDump", ".txt")
- self._dump_diff = TskDbDiff._get_tmp_file("DBDump-Diff", ".txt")
- else:
- self._bb_dump = os.path.join(self.output_dir, "BlackboardDump.txt")
- self._bb_dump_diff = os.path.join(self.output_dir, "BlackboardDump-Diff.txt")
- self._dump = os.path.join(self.output_dir, "DBDump.txt")
- self._dump_diff = os.path.join(self.output_dir, "DBDump-Diff.txt")
-
- # Sorting gold before comparing (sort behaves differently in different environments)
- new_bb = TskDbDiff._get_tmp_file("GoldBlackboardDump", ".txt")
- new_db = TskDbDiff._get_tmp_file("GoldDBDump", ".txt")
- if self.gold_bb_dump is not None:
- srtcmdlst = ["sort", self.gold_bb_dump, "-o", new_bb]
- subprocess.call(srtcmdlst)
- srtcmdlst = ["sort", self.gold_dump, "-o", new_db]
- subprocess.call(srtcmdlst)
- self.gold_bb_dump = new_bb
- self.gold_dump = new_db
-
-
- def _cleanup_diff(self):
- if self.output_dir is None:
- #cleanup temp files
- os.remove(self._dump)
- os.remove(self._bb_dump)
- if os.path.isfile(self._dump_diff):
- os.remove(self._dump_diff)
- if os.path.isfile(self._bb_dump_diff):
- os.remove(self._bb_dump_diff)
-
- if self.gold_bb_dump is None:
- os.remove(self.gold_bb_dump)
- os.remove(self.gold_dump)
-
-
- def _diff(self, output_file, gold_file, diff_path):
- """Compare two text files.
-
- Args:
- output_file: a pathto_File, the latest text file
- gold_file: a pathto_File, the gold text file
- diff_path: The file to write the differences to
- Returns False if different
- """
-
- if (not os.path.isfile(output_file)):
- return False
-
- if (not os.path.isfile(gold_file)):
- return False
-
- # It is faster to read the contents in and directly compare
- output_data = codecs.open(output_file, "r", "utf_8").read()
- gold_data = codecs.open(gold_file, "r", "utf_8").read()
- if (gold_data == output_data):
- return True
-
- # If they are different, invoke 'diff'
- diff_file = codecs.open(diff_path, "wb", "utf_8")
- # Gold needs to be passed in as 1st arg and output as 2nd
- dffcmdlst = ["diff", gold_file, output_file]
- subprocess.call(dffcmdlst, stdout = diff_file)
-
- # create file path for gold files inside output folder. In case of diff, both gold and current run files
- # are available in the report output folder. Prefix Gold- is added to the filename.
- gold_file_in_output_dir = os.path.join(os.path.dirname(output_file), "Gold-" + os.path.basename(output_file))
- shutil.copy(gold_file, gold_file_in_output_dir)
-
- return False
-
-
- @staticmethod
- def _get_associated_artifact_type(cur, artifact_id, isMultiUser):
- if isMultiUser:
- cur.execute(
- "SELECT tsk_files.parent_path, blackboard_artifact_types.display_name FROM blackboard_artifact_types INNER JOIN blackboard_artifacts ON blackboard_artifact_types.artifact_type_id = blackboard_artifacts.artifact_type_id INNER JOIN tsk_files ON tsk_files.obj_id = blackboard_artifacts.obj_id WHERE artifact_id=%s",
- [artifact_id])
- else:
- cur.execute(
- "SELECT tsk_files.parent_path, blackboard_artifact_types.display_name FROM blackboard_artifact_types INNER JOIN blackboard_artifacts ON blackboard_artifact_types.artifact_type_id = blackboard_artifacts.artifact_type_id INNER JOIN tsk_files ON tsk_files.obj_id = blackboard_artifacts.obj_id WHERE artifact_id=?",
- [artifact_id])
-
- info = cur.fetchone()
-
- return "File path: " + info[0] + " Artifact Type: " + info[1]
-
-
- @staticmethod
- def _dump_output_db_bb(db_file, bb_dump_file, isMultiUser, pgSettings, id_obj_path_table):
- """Dumps sorted text results to the given output location.
-
- Smart method that deals with a blackboard comparison to avoid issues
- with different IDs based on when artifacts were created.
-
- Args:
- db_file: a pathto_File, the output database.
- bb_dump_file: a pathto_File, the sorted dump file to write to
- """
-
- unsorted_dump = TskDbDiff._get_tmp_file("dump_data", ".txt")
- if isMultiUser:
- conn, unused_db = db_connect(db_file, isMultiUser, pgSettings)
- artifact_cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
- else: # Use Sqlite
- conn = sqlite3.connect(db_file)
- conn.text_factory = lambda x: x.decode("utf-8", "ignore")
- conn.row_factory = sqlite3.Row
- artifact_cursor = conn.cursor()
- # Get the list of all artifacts (along with type and associated file)
- # @@@ Could add a SORT by parent_path in here since that is how we are going to later sort it.
- artifact_cursor.execute("SELECT tsk_files.parent_path, tsk_files.name, blackboard_artifact_types.display_name, blackboard_artifacts.artifact_id FROM blackboard_artifact_types INNER JOIN blackboard_artifacts ON blackboard_artifact_types.artifact_type_id = blackboard_artifacts.artifact_type_id INNER JOIN tsk_files ON tsk_files.obj_id = blackboard_artifacts.obj_id")
- database_log = codecs.open(unsorted_dump, "wb", "utf_8")
- row = artifact_cursor.fetchone()
- appnd = False
- counter = 0
- artifact_count = 0
- artifact_fail = 0
-
- # Cycle through artifacts
- try:
- while (row != None):
-
- # File Name and artifact type
- # Remove parent object ID from Unalloc file name
- normalizedName = re.sub('^Unalloc_[0-9]+_', 'Unalloc_', row["name"])
- if(row["parent_path"] != None):
- database_log.write(row["parent_path"] + normalizedName + ' ')
- else:
- database_log.write(normalizedName + ' ')
-
- if isMultiUser:
- attribute_cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
- else:
- attribute_cursor = conn.cursor()
- looptry = True
- artifact_count += 1
- try:
- art_id = ""
- art_id = str(row["artifact_id"])
-
- # Get attributes for this artifact
- if isMultiUser:
- attribute_cursor.execute("SELECT blackboard_attributes.source, blackboard_attributes.attribute_type_id, blackboard_attribute_types.display_name, blackboard_attributes.value_type, blackboard_attributes.value_text, blackboard_attributes.value_int32, blackboard_attributes.value_int64, blackboard_attributes.value_double FROM blackboard_attributes INNER JOIN blackboard_attribute_types ON blackboard_attributes.attribute_type_id = blackboard_attribute_types.attribute_type_id WHERE artifact_id = %s ORDER BY blackboard_attributes.source, blackboard_attribute_types.display_name, blackboard_attributes.value_type, blackboard_attributes.value_text, blackboard_attributes.value_int32, blackboard_attributes.value_int64, blackboard_attributes.value_double", [art_id])
- else:
- attribute_cursor.execute("SELECT blackboard_attributes.source, blackboard_attributes.attribute_type_id, blackboard_attribute_types.display_name, blackboard_attributes.value_type, blackboard_attributes.value_text, blackboard_attributes.value_int32, blackboard_attributes.value_int64, blackboard_attributes.value_double FROM blackboard_attributes INNER JOIN blackboard_attribute_types ON blackboard_attributes.attribute_type_id = blackboard_attribute_types.attribute_type_id WHERE artifact_id =? ORDER BY blackboard_attributes.source, blackboard_attribute_types.display_name, blackboard_attributes.value_type, blackboard_attributes.value_text, blackboard_attributes.value_int32, blackboard_attributes.value_int64, blackboard_attributes.value_double", [art_id])
-
- attributes = attribute_cursor.fetchall()
-
- # Print attributes
- if (len(attributes) == 0):
- # @@@@ This should be
- database_log.write(' \n')
- row = artifact_cursor.fetchone()
- continue
-
- src = attributes[0][0]
- for attr in attributes:
- numvals = 0
- for x in range(3, 6):
- if(attr[x] != None):
- numvals += 1
- if(numvals > 1):
- msg = "There were too many values for attribute type: " + attr["display_name"] + " for artifact with id #" + str(row["artifact_id"]) + ".\n"
-
- if(not attr["source"] == src):
- msg = "There were inconsistent sources for artifact with id #" + str(row["artifact_id"]) + ".\n"
-
- try:
- if attr["value_type"] == 0:
- attr_value_as_string = str(attr["value_text"])
- elif attr["value_type"] == 1:
- attr_value_as_string = str(attr["value_int32"])
- elif attr["value_type"] == 2:
- attr_value_as_string = str(attr["value_int64"])
- if attr["attribute_type_id"] == 36 and id_obj_path_table != -1 and int(attr_value_as_string) > 0: #normalize positive TSK_PATH_IDs from being object id to a path if the obj_id_path_table was generated
- attr_value_as_string = id_obj_path_table[int(attr_value_as_string)]
- elif attr["value_type"] == 3:
- attr_value_as_string = "%20.10f" % float((attr["value_double"])) #use exact format from db schema to avoid python auto format double value to (0E-10) scientific style
- elif attr["value_type"] == 4:
- attr_value_as_string = "bytes"
- elif attr["value_type"] == 5:
- attr_value_as_string = str(attr["value_int64"])
- if attr["display_name"] == "Associated Artifact":
- attr_value_as_string = TskDbDiff._get_associated_artifact_type(attribute_cursor, attr_value_as_string, isMultiUser)
- patrn = re.compile("[\n\0\a\b\r\f]")
- attr_value_as_string = re.sub(patrn, ' ', attr_value_as_string)
- if attr["source"] == "Keyword Search" and attr["display_name"] == "Keyword Preview":
- attr_value_as_string = ""
- database_log.write('')
- except IOError as e:
- print("IO error")
- raise TskDbDiffException("Unexpected IO error while writing to database log." + str(e))
-
- except sqlite3.Error as e:
- msg = "Attributes in artifact id (in output DB)# " + str(row["artifact_id"]) + " encountered an error: " + str(e) +" .\n"
- print("Attributes in artifact id (in output DB)# ", str(row["artifact_id"]), " encountered an error: ", str(e))
- print()
- looptry = False
- artifact_fail += 1
- database_log.write('Error Extracting Attributes')
- database_log.close()
- raise TskDbDiffException(msg)
- finally:
- attribute_cursor.close()
-
-
- # @@@@ This should be
- database_log.write(' \n')
- row = artifact_cursor.fetchone()
-
- if(artifact_fail > 0):
- msg ="There were " + str(artifact_count) + " artifacts and " + str(artifact_fail) + " threw an exception while loading.\n"
- except Exception as e:
- raise TskDbDiffException("Unexpected error while dumping blackboard database: " + str(e))
- finally:
- database_log.close()
- artifact_cursor.close()
- conn.close()
-
- # Now sort the file
- srtcmdlst = ["sort", unsorted_dump, "-o", bb_dump_file]
- subprocess.call(srtcmdlst)
-
- @staticmethod
- def _dump_output_db_nonbb(db_file, dump_file, isMultiUser, pgSettings):
- """Dumps a database to a text file.
-
- Does not dump the artifact and attributes.
-
- Args:
- db_file: a pathto_File, the database file to dump
- dump_file: a pathto_File, the location to dump the non-blackboard database items
- """
-
- conn, backup_db_file = db_connect(db_file, isMultiUser, pgSettings)
- guid_utils = TskGuidUtils.create(conn)
-
- if isMultiUser:
- table_cols = get_pg_table_columns(conn)
- schema = get_pg_schema(db_file, pgSettings.username, pgSettings.password,
- pgSettings.pgHost, pgSettings.pgPort)
- else:
- table_cols = get_sqlite_table_columns(conn)
- schema = get_sqlite_schema(conn)
-
- with codecs.open(dump_file, "wb", "utf_8") as output_file:
- output_file.write(schema + "\n")
- for table, cols in sorted(table_cols.items(), key=lambda pr: pr[0]):
- normalizer = TABLE_NORMALIZATIONS[table] if table in TABLE_NORMALIZATIONS else None
- write_normalized(guid_utils, output_file, conn, table, cols, normalizer)
-
- # Now sort the file
- srtcmdlst = ["sort", dump_file, "-o", dump_file]
- subprocess.call(srtcmdlst)
-
- conn.close()
- # cleanup the backup
- # if backup_db_file:
- # os.remove(backup_db_file)
- return guid_utils.obj_id_guids
-
- @staticmethod
- def dump_output_db(db_file, dump_file, bb_dump_file, isMultiUser, pgSettings):
- """Dumps the given database to text files for later comparison.
-
- Args:
- db_file: a pathto_File, the database file to dump
- dump_file: a pathto_File, the location to dump the non-blackboard database items
- bb_dump_file: a pathto_File, the location to dump the blackboard database items
- """
- id_obj_path_table = TskDbDiff._dump_output_db_nonbb(db_file, dump_file, isMultiUser, pgSettings)
- TskDbDiff._dump_output_db_bb(db_file, bb_dump_file, isMultiUser, pgSettings, id_obj_path_table)
-
- @staticmethod
- def _get_tmp_file(base, ext):
- time = datetime.datetime.now().time().strftime("%H%M%f")
- return os.path.join(os.environ['TMP'], base + time + ext)
-
-
-class TskDbDiffException(Exception):
- pass
-
-class PGSettings(object):
- def __init__(self, pgHost=None, pgPort=5432, user=None, password=None):
- self.pgHost = pgHost
- self.pgPort = pgPort
- self.username = user
- self.password = password
-
- def get_pgHost(self):
- return self.pgHost
-
- def get_pgPort(self):
- return self.pgPort
-
- def get_username(self):
- return self.username
-
- def get_password(self):
- return self.password
-
-
-class TskGuidUtils:
- """
- This class provides guids for potentially volatile data.
- """
-
- @staticmethod
- def _get_guid_dict(db_conn, select_statement, delim="", normalizer: Union[Callable[[str], str], None] = None):
- """
- Retrieves a dictionary mapping the first item selected to a concatenation of the remaining values.
- Args:
- db_conn: The database connection.
- select_statement: The select statement.
- delim: The delimiter for how row data from index 1 to end shall be concatenated.
- normalizer: Means of normalizing the generated string or None.
-
- Returns: A dictionary mapping the key (the first item in the select statement) to a concatenation of the remaining values.
-
- """
- cursor = db_conn.cursor()
- cursor.execute(select_statement)
- ret_dict = {}
- for row in cursor:
- # concatenate value rows with delimiter filtering out any null values.
- value_str = delim.join([str(col) for col in filter(lambda col: col is not None, row[1:])])
- if normalizer:
- value_str = normalizer(value_str)
- ret_dict[row[0]] = value_str
-
- return ret_dict
-
- @staticmethod
- def create(db_conn):
- """
- Creates an instance of this class by querying for relevant guid data.
- Args:
- db_conn: The database connection.
-
- Returns: The instance of this class.
-
- """
- guid_files = TskGuidUtils._get_guid_dict(db_conn, "SELECT obj_id, parent_path, name FROM tsk_files",
- normalizer=normalize_file_path)
- guid_vs_parts = TskGuidUtils._get_guid_dict(db_conn, "SELECT obj_id, addr, start FROM tsk_vs_parts", "_")
- guid_vs_info = TskGuidUtils._get_guid_dict(db_conn, "SELECT obj_id, vs_type, img_offset FROM tsk_vs_info", "_")
- guid_fs_info = TskGuidUtils._get_guid_dict(db_conn, "SELECT obj_id, img_offset, fs_type FROM tsk_fs_info", "_")
- guid_image_names = TskGuidUtils._get_guid_dict(db_conn, "SELECT obj_id, name FROM tsk_image_names "
- "WHERE sequence=0",
- normalizer=get_filename)
- guid_os_accounts = TskGuidUtils._get_guid_dict(db_conn, "SELECT os_account_obj_id, addr FROM tsk_os_accounts")
- guid_reports = TskGuidUtils._get_guid_dict(db_conn, "SELECT obj_id, path FROM reports",
- normalizer=normalize_file_path)
-
- objid_artifacts = TskGuidUtils._get_guid_dict(db_conn,
- "SELECT blackboard_artifacts.artifact_obj_id, "
- "blackboard_artifact_types.type_name "
- "FROM blackboard_artifacts "
- "INNER JOIN blackboard_artifact_types "
- "ON blackboard_artifact_types.artifact_type_id = "
- "blackboard_artifacts.artifact_type_id")
-
- artifact_objid_artifacts = TskGuidUtils._get_guid_dict(db_conn,
- "SELECT blackboard_artifacts.artifact_id, "
- "blackboard_artifact_types.type_name "
- "FROM blackboard_artifacts "
- "INNER JOIN blackboard_artifact_types "
- "ON blackboard_artifact_types.artifact_type_id = "
- "blackboard_artifacts.artifact_type_id")
-
- cursor = db_conn.cursor()
- cursor.execute("SELECT obj_id, par_obj_id FROM tsk_objects")
- par_obj_objects = dict([(row[0], row[1]) for row in cursor])
-
- guid_artifacts = {}
- for k, v in objid_artifacts.items():
- if k in par_obj_objects:
- par_obj_id = par_obj_objects[k]
-
- # check for artifact parent in files, images, reports
- path = ''
- for artifact_parent_dict in [guid_files, guid_image_names, guid_reports]:
- if par_obj_id in artifact_parent_dict:
- path = artifact_parent_dict[par_obj_id]
- break
-
- guid_artifacts[k] = "/".join([path, v])
-
- return TskGuidUtils(
- # aggregate all the object id dictionaries together
- obj_id_guids={**guid_files, **guid_reports, **guid_os_accounts, **guid_vs_parts, **guid_vs_info,
- **guid_fs_info, **guid_fs_info, **guid_image_names, **guid_artifacts},
- artifact_types=artifact_objid_artifacts)
-
- artifact_types: Dict[int, str]
- obj_id_guids: Dict[int, any]
-
- def __init__(self, obj_id_guids: Dict[int, any], artifact_types: Dict[int, str]):
- """
- Main constructor.
- Args:
- obj_id_guids: A dictionary mapping object ids to their guids.
- artifact_types: A dictionary mapping artifact ids to their types.
- """
- self.artifact_types = artifact_types
- self.obj_id_guids = obj_id_guids
-
- def get_guid_for_objid(self, obj_id, omitted_value: Union[str, None] = 'Object ID Omitted'):
- """
- Returns the guid for the specified object id or returns omitted value if the object id is not found.
- Args:
- obj_id: The object id.
- omitted_value: The value if no object id mapping is found.
-
- Returns: The relevant guid or the omitted_value.
-
- """
- return self.obj_id_guids[obj_id] if obj_id in self.obj_id_guids else omitted_value
-
- def get_guid_for_file_objid(self, obj_id, omitted_value: Union[str, None] = 'Object ID Omitted'):
- # this method is just an alias for get_guid_for_objid
- return self.get_guid_for_objid(obj_id, omitted_value)
-
- def get_guid_for_accountid(self, account_id, omitted_value: Union[str, None] = 'Account ID Omitted'):
- # this method is just an alias for get_guid_for_objid
- return self.get_guid_for_objid(account_id, omitted_value)
-
- def get_guid_for_artifactid(self, artifact_id, omitted_value: Union[str, None] = 'Artifact ID Omitted'):
- """
- Returns the guid for the specified artifact id or returns omitted value if the artifact id is not found.
- Args:
- artifact_id: The artifact id.
- omitted_value: The value if no object id mapping is found.
-
- Returns: The relevant guid or the omitted_value.
- """
- return self.artifact_types[artifact_id] if artifact_id in self.artifact_types else omitted_value
-
-
-class NormalizeRow:
- """
- Given a dictionary representing a row (i.e. column name mapped to value), returns a normalized representation of
- that row such that the values should be less volatile from run to run.
- """
- row_masker: Callable[[TskGuidUtils, Dict[str, any]], Dict[str, any]]
-
- def __init__(self, row_masker: Callable[[TskGuidUtils, Dict[str, any]], Union[Dict[str, any], None]]):
- """
- Main constructor.
- Args:
- row_masker: The function to be called to mask the specified row.
- """
- self.row_masker = row_masker
-
- def normalize(self, guid_util: TskGuidUtils, row: Dict[str, any]) -> Union[Dict[str, any], None]:
- """
- Normalizes a row such that the values should be less volatile from run to run.
- Args:
- guid_util: The TskGuidUtils instance providing guids for volatile ids.
- row: The row values mapping column name to value.
-
- Returns: The normalized row or None if the row should be ignored.
-
- """
- return self.row_masker(guid_util, row)
-
-
-class NormalizeColumns(NormalizeRow):
- """
- Utility for normalizing specific column values of a row so they are not volatile values that will change from run
- to run.
- """
-
- @classmethod
- def _normalize_col_vals(cls,
- col_mask: Dict[str, Union[any, Callable[[TskGuidUtils, any], any]]],
- guid_util: TskGuidUtils,
- row: Dict[str, any]):
- """
- Normalizes column values for each column rule provided.
- Args:
- col_mask: A dictionary mapping columns to either the replacement value or a function to retrieve the
- replacement value given the TskGuidUtils instance and original value as arguments.
- guid_util: The TskGuidUtil used to provide guids for volatile values.
- row: The dictionary representing the row mapping column names to values.
-
- Returns: The new row representation.
-
- """
- row_copy = row.copy()
- for key, val in col_mask.items():
- # only replace values if present in row
- if key in row_copy:
- # if a column replacing function, call with original value
- if isinstance(val, Callable):
- row_copy[key] = val(guid_util, row[key])
- # otherwise, just replace with mask value
- else:
- row_copy[key] = val
-
- return row_copy
-
- def __init__(self, col_mask: Dict[str, Union[any, Callable[[any], any]]]):
- super().__init__(lambda guid_util, row: NormalizeColumns._normalize_col_vals(col_mask, guid_util, row))
-
-
-def get_path_segs(path: Union[str, None]) -> Union[List[str], None]:
- """
- Breaks a path string into its folders and filenames.
- Args:
- path: The path string or None.
-
- Returns: The path segments or None.
-
- """
- if path:
- # split on backslash or forward slash
- return list(filter(lambda x: len(x.strip()) > 0, [s for s in re.split(r"[\\/]", path)]))
- else:
- return None
-
-
-def get_filename(path: Union[str, None]) -> Union[str, None]:
- """
- Returns the last segment of a file path.
- Args:
- path: The path.
-
- Returns: The last segment of the path
-
- """
- path_segs = get_path_segs(path)
- if path_segs is not None and len(path_segs) > 0:
- return path_segs[-1]
- else:
- return None
-
-
-def index_of(lst, search_item) -> int:
- """
- Returns the index of the item in the list or -1.
- Args:
- lst: The list.
- search_item: The item to search for.
-
- Returns: The index in the list of the item or -1.
-
- """
- for idx, item in enumerate(lst):
- if item == search_item:
- return idx
-
- return -1
-
-
-def get_sql_insert_value(val) -> str:
- """
- Returns the value that would appear in a sql insert statement (i.e. string becomes 'string', None becomes NULL)
- Args:
- val: The original value.
-
- Returns: The sql insert equivalent value.
-
- """
- if val is None:
- return "NULL"
-
- if isinstance(val, str):
- escaped_val = val.replace('\n', '\\n').replace("'", "''")
- return f"'{escaped_val}'"
-
- return str(val)
-
-
-def get_sqlite_table_columns(conn) -> Dict[str, List[str]]:
- """
- Retrieves a dictionary mapping table names to a list of all the columns for that table
- where the columns are in ordinal value.
- Args:
- conn: The database connection.
-
- Returns: A dictionary of the form { table_name: [col_name1, col_name2...col_nameN] }
-
- """
- cur = conn.cursor()
- cur.execute("SELECT name FROM sqlite_master tables WHERE tables.type='table'")
- tables = list([table[0] for table in cur.fetchall()])
- cur.close()
-
- to_ret = {}
- for table in tables:
- cur = conn.cursor()
- cur.execute('SELECT name FROM pragma_table_info(?) ORDER BY cid', [table])
- to_ret[table] = list([col[0] for col in cur.fetchall()])
-
- return to_ret
-
-
-def get_pg_table_columns(conn) -> Dict[str, List[str]]:
- """
- Returns a dictionary mapping table names to the list of their columns in ordinal order.
- Args:
- conn: The pg database connection.
-
- Returns: The dictionary of tables mapped to a list of their ordinal-orderd column names.
- """
- cursor = conn.cursor()
- cursor.execute("""
- SELECT cols.table_name, cols.column_name
- FROM information_schema.columns cols
- WHERE cols.column_name IS NOT NULL
- AND cols.table_name IS NOT NULL
- AND cols.table_name IN (
- SELECT tables.tablename FROM pg_catalog.pg_tables tables
- WHERE LOWER(schemaname) = 'public'
- )
- ORDER by cols.table_name, cols.ordinal_position;
- """)
- mapping = {}
- for row in cursor:
- mapping.setdefault(row[0], []).append(row[1])
-
- cursor.close()
- return mapping
-
-
-def sanitize_schema(original: str) -> str:
- """
- Sanitizes sql script representing table/index creations.
- Args:
- original: The original sql schema creation script.
-
- Returns: The sanitized schema.
- """
- sanitized_lines = []
- dump_line = ''
- for line in original.splitlines():
- line = line.strip('\r\n ')
- lower_line = line.lower()
- # It's comment or alter statement or catalog entry or set idle entry or empty line
- if (not line or
- line.startswith('--') or
- lower_line.startswith('set') or
- " set default nextval" in lower_line or
- " owner to " in lower_line or
- " owned by " in lower_line or
- "pg_catalog" in lower_line or
- "idle_in_transaction_session_timeout" in lower_line):
- continue
-
- # if there is no white space or parenthesis delimiter, add a space
- if re.match(r'^.+?[^\s()]$', dump_line) and re.match(r'^[^\s()]', line):
- dump_line += ' '
-
- # append the line to the outputted line
- dump_line += line
-
- # if line ends with ';' then this will be one statement in diff
- if line.endswith(';'):
- sanitized_lines.append(dump_line)
- dump_line = ''
-
- if len(dump_line.strip()) > 0:
- sanitized_lines.append(dump_line)
-
- return "\n".join(sanitized_lines)
-
-
-def get_pg_schema(dbname: str, pg_username: str, pg_pword: str, pg_host: str, pg_port: Union[str, int]):
- """
- Gets the schema to be added to the dump text from the postgres database.
- Args:
- dbname: The name of the database.
- pg_username: The postgres user name.
- pg_pword: The postgres password.
- pg_host: The postgres host.
- pg_port: The postgres port.
-
- Returns: The normalized schema.
-
- """
- os.environ['PGPASSWORD'] = pg_pword
- pg_dump = ["pg_dump", "-U", pg_username, "-h", pg_host, "-p", str(pg_port),
- "--schema-only", "-d", dbname, "-t", "public.*"]
- output = subprocess.check_output(pg_dump)
- output_str = output.decode('UTF-8')
- return sanitize_schema(output_str)
-
-
-def get_sqlite_schema(db_conn):
- """
- Gets the schema to be added to the dump text from the sqlite database.
- Args:
- db_conn: The database connection.
-
- Returns: The normalized schema.
-
- """
- cursor = db_conn.cursor()
- query = "SELECT sql FROM sqlite_master " \
- "WHERE type IN ('table', 'index') AND sql IS NOT NULL " \
- "ORDER BY type DESC, tbl_name ASC"
-
- cursor.execute(query)
- schema = '\n'.join([str(row[0]) + ';' for row in cursor])
- return sanitize_schema(schema)
-
-
-def _mask_event_desc(desc: str) -> str:
- """
- Masks dynamic event descriptions of the form ":" so the artifact id is no longer
- present.
- Args:
- desc: The original description.
-
- Returns: The normalized description.
-
- """
-
- # Takes a string like "Shell Bags: 30840" and replaces with "ShellBags:"
- match = re.search(r"^\s*(.+?)\s*:\s*\d+\s*$", desc.strip())
- if match:
- return f"{match.group(1)}:"
-
- return desc
-
-
-def normalize_tsk_event_descriptions(guid_util: TskGuidUtils, row: Dict[str, any]) -> Dict[str, any]:
- """
- Normalizes event description rows masking possibly changing column values.
- Args:
- guid_util: Provides guids for ids that may change from run to run.
- row: A dictionary mapping column names to values.
-
- Returns: The normalized event description row.
- """
- row_copy = row.copy()
- # replace object ids with information that is deterministic
- row_copy['event_description_id'] = MASKED_ID
- row_copy['content_obj_id'] = guid_util.get_guid_for_file_objid(row['content_obj_id'])
- row_copy['artifact_id'] = guid_util.get_guid_for_artifactid(row['artifact_id']) \
- if row['artifact_id'] is not None else None
- row_copy['data_source_obj_id'] = guid_util.get_guid_for_file_objid(row['data_source_obj_id'])
-
- if row['full_description'] == row['med_description'] == row['short_description']:
- row_copy['full_description'] = _mask_event_desc(row['full_description'])
- row_copy['med_description'] = _mask_event_desc(row['med_description'])
- row_copy['short_description'] = _mask_event_desc(row['short_description'])
-
- return row_copy
-
-
-def normalize_ingest_jobs(guid_util: TskGuidUtils, row: Dict[str, any]) -> Dict[str, any]:
- """
- Normalizes ingest jobs table rows.
- Args:
- guid_util: Provides guids for ids that may change from run to run.
- row: A dictionary mapping column names to values.
-
- Returns: The normalized ingest job row.
-
- """
- row_copy = row.copy()
- row_copy['host_name'] = "{host_name}"
-
- start_time = row['start_date_time']
- end_time = row['end_date_time']
- if start_time <= end_time:
- row_copy['start_date_time'] = MASKED_TIME
- row_copy['end_date_time'] = MASKED_TIME
-
- return row_copy
-
-
-def normalize_unalloc_files(path_str: Union[str, None]) -> Union[str, None]:
- """
- Normalizes a path string removing timestamps from unalloc files.
- Args:
- path_str: The original path string.
-
- Returns: The path string where timestamps are removed from unalloc strings.
-
- """
- # takes a file name like "Unalloc_30580_7466496_2980941312" and removes the object id to become
- # "Unalloc_7466496_2980941312"
- return None if path_str is None else re.sub('Unalloc_[0-9]+_', 'Unalloc_', path_str)
-
-
-def normalize_regripper_files(path_str: Union[str, None]) -> Union[str, None]:
- """
- Normalizes a path string removing timestamps from regripper files.
- Args:
- path_str: The original path string.
-
- Returns: The path string where timestamps are removed from regripper paths.
-
- """
- # takes a file name like "regripper-12345-full" and removes the id to become "regripper-full"
- return None if path_str is None else re.sub(r'regripper-[0-9]+-full', 'regripper-full', path_str)
-
-
-def normalize_file_path(path_str: Union[str, None]) -> Union[str, None]:
- """
- Normalizes file paths removing or replacing pieces that will change from run to run (i.e. object id)
- Args:
- path_str: The original path string.
-
- Returns: The normalized path string
- """
- return normalize_unalloc_files(normalize_regripper_files(path_str))
-
-
-def normalize_tsk_files(guid_util: TskGuidUtils, row: Dict[str, any]) -> Dict[str, any]:
- """
- Normalizes files table rows.
- Args:
- guid_util: Provides guids for ids that may change from run to run.
- row: A dictionary mapping column names to values.
-
- Returns: The normalized files table row.
-
- """
- # Ignore TIFF size and hash if extracted from PDFs.
- # See JIRA-6951 for more details.
- row_copy = row.copy()
- if row['extension'] is not None and row['extension'].strip().lower() == 'tif' and \
- row['parent_path'] is not None and row['parent_path'].strip().lower().endswith('.pdf/'):
- row_copy['size'] = "SIZE_IGNORED"
- row_copy['md5'] = "MD5_IGNORED"
- row_copy['sha256'] = "SHA256_IGNORED"
-
- row_copy['data_source_obj_id'] = guid_util.get_guid_for_file_objid(row['data_source_obj_id'])
- row_copy['obj_id'] = MASKED_OBJ_ID
- row_copy['os_account_obj_id'] = 'MASKED_OS_ACCOUNT_OBJ_ID'
- row_copy['parent_path'] = normalize_file_path(row['parent_path'])
- row_copy['name'] = normalize_file_path(row['name'])
- return row_copy
-
-
-def normalize_tsk_files_path(guid_util: TskGuidUtils, row: Dict[str, any]) -> Dict[str, any]:
- """
- Normalizes file path table rows.
- Args:
- guid_util: Provides guids for ids that may change from run to run.
- row: A dictionary mapping column names to values.
-
- Returns: The normalized file path table row.
- """
- row_copy = row.copy()
- path = row['path']
- if path is not None:
- path_parts = get_path_segs(path)
- module_output_idx = index_of(path_parts, 'ModuleOutput')
- if module_output_idx >= 0:
- # remove everything up to and including ModuleOutput if ModuleOutput present
- path_parts = path_parts[module_output_idx:]
- if len(path_parts) > 2 and path_parts[1] == 'EFE':
- # for embedded file extractor, the next folder is the object id and should be omitted
- del path_parts[2]
-
- row_copy['path'] = os.path.join(*path_parts) if len(path_parts) > 0 else '/'
-
- row_copy['obj_id'] = guid_util.get_guid_for_file_objid(row['obj_id'])
- return row_copy
-
-
-def normalize_tsk_objects_path(guid_util: TskGuidUtils, objid: int,
- no_path_placeholder: Union[str, None]) -> Union[str, None]:
- """
- Returns a normalized path to be used in a tsk_objects table row.
- Args:
- guid_util: The utility for fetching guids.
- objid: The object id of the item.
- no_path_placeholder: text to return if no path value found.
-
- Returns: The 'no_path_placeholder' text if no path. Otherwise, the normalized path.
-
- """
- path = guid_util.get_guid_for_objid(objid, omitted_value=None)
-
- if path is None:
- return no_path_placeholder
- else:
- # remove host name (for multi-user) and dates/times from path for reports
- path_parts = get_path_segs(path)
- module_output_idx = index_of(path_parts, 'ModuleOutput')
- if module_output_idx >= 0:
- # remove everything up to and including ModuleOutput if ModuleOutput present
- path_parts = path_parts[module_output_idx:]
-
- if "BulkExtractor" in path_parts or "Smirk" in path_parts:
- # chop off the last folder (which contains a date/time)
- path_parts = path_parts[:-1]
-
- if path_parts and len(path_parts) >= 2:
- for idx in range(0, len(path_parts) - 1):
- if path_parts[idx].lower() == "reports" and \
- path_parts[idx + 1].lower().startswith("autopsytestcase html report"):
- path_parts = ["Reports", "AutopsyTestCase HTML Report"]
- break
-
- path = os.path.join(*path_parts) if len(path_parts) > 0 else '/'
-
- return path
-
-
-def normalize_tsk_objects(guid_util: TskGuidUtils, row: Dict[str, any]) -> Dict[str, any]:
- """
- Normalizes object table rows.
- Args:
- guid_util: Provides guids for ids that may change from run to run.
- row: A dictionary mapping column names to values.
-
- Returns: The normalized object table row.
- """
- row_copy = row.copy()
- row_copy['obj_id'] = None if row['obj_id'] is None else \
- normalize_tsk_objects_path(guid_util, row['obj_id'], MASKED_OBJ_ID)
-
- row_copy['par_obj_id'] = None if row['par_obj_id'] is None else \
- normalize_tsk_objects_path(guid_util, row['par_obj_id'], 'MASKED_PARENT_OBJ_ID')
-
- return row_copy
-
-
-MASKED_TIME = "MASKED_TIME"
-MASKED_OBJ_ID = "MASKED_OBJ_ID"
-MASKED_ID = "MASKED_ID"
-
-IGNORE_TABLE = "IGNORE_TABLE"
-
-TableNormalization = Union[IGNORE_TABLE, NormalizeRow]
-
-"""
-This dictionary maps tables where data should be specially handled to how they should be handled.
-"""
-TABLE_NORMALIZATIONS: Dict[str, TableNormalization] = {
- "blackboard_artifacts": IGNORE_TABLE,
- "blackboard_attributes": IGNORE_TABLE,
- "data_source_info": NormalizeColumns({
- "device_id": "{device id}",
- "added_date_time": "{dateTime}"
- }),
- "image_gallery_groups": NormalizeColumns({
- "group_id": MASKED_ID,
- "data_source_obj_id": lambda guid_util, col: guid_util.get_guid_for_objid(col, omitted_value=None),
- }),
- "image_gallery_groups_seen": IGNORE_TABLE,
- "ingest_jobs": NormalizeRow(normalize_ingest_jobs),
- "reports": NormalizeColumns({
- "obj_id": MASKED_OBJ_ID,
- "path": "AutopsyTestCase",
- "crtime": MASKED_TIME
- }),
- "tsk_aggregate_score": NormalizeColumns({
- "obj_id": lambda guid_util, col: guid_util.get_guid_for_objid(col, omitted_value="Object ID Omitted"),
- "data_source_obj_id": lambda guid_util, col: guid_util.get_guid_for_objid(col, omitted_value="Data Source Object ID Omitted"),
- }),
- "tsk_analysis_results": NormalizeColumns({
- "artifact_obj_id":
- lambda guid_util, col: guid_util.get_guid_for_objid(col, omitted_value="Artifact Object ID Omitted"),
- }),
- "tsk_data_artifacts": NormalizeColumns({
- "artifact_obj_id":
- lambda guid_util, col: guid_util.get_guid_for_file_objid(col, omitted_value="Artifact Object ID Omitted"),
- "os_account_obj_id":
- lambda guid_util, col: guid_util.get_guid_for_file_objid(col, omitted_value="Account Object ID Omitted"),
- }),
- "tsk_event_descriptions": NormalizeRow(normalize_tsk_event_descriptions),
- "tsk_events": NormalizeColumns({
- "event_id": "MASKED_EVENT_ID",
- "event_description_id": 'ID OMITTED'
- }),
- "tsk_examiners": NormalizeColumns({
- "login_name": "{examiner_name}"
- }),
- "tsk_files": NormalizeRow(normalize_tsk_files),
- "tsk_file_layout": NormalizeColumns({
- "obj_id": lambda guid_util, col: guid_util.get_guid_for_file_objid(col)
- }),
- "tsk_files_path": NormalizeRow(normalize_tsk_files_path),
- "tsk_image_names": NormalizeColumns({
- "name": lambda guid_util, col: get_filename(col)
- }),
- "tsk_objects": NormalizeRow(normalize_tsk_objects),
- "tsk_os_account_attributes": NormalizeColumns({
- "id": MASKED_ID,
- "os_account_obj_id": lambda guid_util, col: guid_util.get_guid_for_accountid(col),
- "source_obj_id": lambda guid_util, col: guid_util.get_guid_for_objid(col)
- }),
- "tsk_os_account_instances": NormalizeColumns({
- "id": MASKED_ID,
- "os_account_obj_id": lambda guid_util, col: guid_util.get_guid_for_accountid(col)
- }),
- "tsk_os_accounts": NormalizeColumns({
- "os_account_obj_id": MASKED_OBJ_ID
- }),
- "tsk_vs_parts": NormalizeColumns({
- "obj_id": MASKED_OBJ_ID
- })
-}
-
-
-def write_normalized(guid_utils: TskGuidUtils, output_file, db_conn, table: str, column_names: List[str],
- normalizer: Union[TableNormalization, None] = None):
- """
- Outputs rows of a file as their normalized values (where values should not change from run to run).
- Args:
- guid_utils: Provides guids to replace values that would potentially change from run to run.
- output_file: The file where the normalized dump will be written.
- db_conn: The database connection.
- table: The name of the table.
- column_names: The name of the columns in the table in ordinal order.
- normalizer: The normalizer (if any) to use so that data is properly normalized.
- """
- if normalizer == IGNORE_TABLE:
- return
-
- cursor = db_conn.cursor()
-
- joined_columns = ",".join([col for col in column_names])
- cursor.execute(f"SELECT {joined_columns} FROM {table}")
- for row in cursor:
- if len(row) != len(column_names):
- print(
- f"ERROR: in {table}, number of columns retrieved: {len(row)} but columns are"
- f" {len(column_names)} with {str(column_names)}")
- continue
-
- row_dict = {}
- for col_idx in range(0, len(column_names)):
- row_dict[column_names[col_idx]] = row[col_idx]
-
- if normalizer and isinstance(normalizer, NormalizeRow):
- row_masker: NormalizeRow = normalizer
- row_dict = row_masker.normalize(guid_utils, row_dict)
-
- if row_dict is not None:
- # show row as json-like value
- entries = []
- for column in column_names:
- dict_value = row_dict[column] if column in row_dict and row_dict[column] is not None else None
- value = get_sql_insert_value(dict_value)
- if value is not None:
- entries.append((column, value))
- insert_values = ", ".join([f"{pr[0]}: {pr[1]}" for pr in entries])
- insert_statement = f"{table}: {{{insert_values}}}\n"
- output_file.write(insert_statement)
-
-
-def db_connect(db_file, is_multi_user, pg_settings=None):
- if is_multi_user: # use PostgreSQL
- try:
- return psycopg2.connect("dbname=" + db_file + " user=" + pg_settings.username + " host=" +
- pg_settings.pgHost + " password=" + pg_settings.password), None
- except:
- print("Failed to connect to the database: " + db_file)
- else: # Sqlite
- # Make a copy that we can modify
- backup_db_file = TskDbDiff._get_tmp_file("tsk_backup_db", ".db")
- shutil.copy(db_file, backup_db_file)
- # We sometimes get situations with messed up permissions
- os.chmod(backup_db_file, 0o777)
- return sqlite3.connect(backup_db_file), backup_db_file
-
-
-def main():
- try:
- sys.argv.pop(0)
- output_db = sys.argv.pop(0)
- gold_db = sys.argv.pop(0)
- except:
- print("usage: tskdbdiff [OUTPUT DB PATH] [GOLD DB PATH]")
- sys.exit(1)
-
- db_diff = TskDbDiff(output_db, gold_db, output_dir=".")
- dump_passed, bb_dump_passed = db_diff.run_diff()
-
- if dump_passed and bb_dump_passed:
- print("Database comparison passed.")
- if not dump_passed:
- print("Non blackboard database comparison failed.")
- if not bb_dump_passed:
- print("Blackboard database comparison failed.")
-
- sys.exit(0)
-
-
-if __name__ == "__main__":
- if sys.hexversion < 0x03000000:
- print("Python 3 required")
- sys.exit(1)
-
- main()