#!/usr/bin/python # -*- coding: utf_8 -*- # Autopsy Forensic Browser # # Copyright 2013 Basis Technology Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from tskdbdiff import TskDbDiff, TskDbDiffException import codecs import datetime import logging import os import re import shutil import socket import sqlite3 import subprocess import sys from sys import platform as _platform import time import traceback import xml from time import localtime, strftime from xml.dom.minidom import parse, parseString import smtplib import re import zipfile import zlib from regression_utils import * import shutil import ntpath # # Please read me... # # This is the regression testing Python script. # It uses an ant command to run build.xml for RegressionTest.java # # The code is cleanly sectioned and commented. # Please follow the current formatting. # It is a long and potentially confusing script. # # Variable, function, and class names are written in Python conventions: # this_is_a_variable this_is_a_function() ThisIsAClass # # # Data Definitions: # # pathto_X: A path to type X. 
# ConfigFile: An XML file formatted according to the template in myconfig.xml
# ParsedConfig: A dom object that represents a ConfigFile
# SQLCursor: A cursor received from a connection to an SQL database
# Nat: A Natural Number
# Image: An image
#

# Enumeration of database types used for the simplification of generating database paths
DBType = enum('OUTPUT', 'GOLD', 'BACKUP')
# Common filename of the output and gold databases (although they are in different directories)
DB_FILENAME = "autopsy.db"
# Backup database filename
BACKUP_DB_FILENAME = "autopsy_backup.db"
# TODO: Double check this purpose statement
# Folder name for gold standard database testing
AUTOPSY_TEST_CASE = "AutopsyTestCase"
# TODO: Double check this purpose statement
# The filename of the log to store error messages
COMMON_LOG = "AutopsyErrors.txt"

Day = 0


def usage():
    """Print the command-line usage summary for this script."""
    print("-f PATH single file")
    print("-r rebuild")
    print("-l PATH path to config file")
    print("-u Ignore unallocated space")
    print("-k Do not delete SOLR index")
    print("-v verbose mode")
    print("-e ARG Enable exception mode with given string")
    print("-h help")
    print("-fr Do not download new images each time")


#----------------------#
#        Main          #
#----------------------#
def main():
    """Parse the command-line arguments, create the configuration, and run the tests."""
    args = Args()
    parse_result = args.parse()
    # The arguments were given wrong:
    if not parse_result:
        return
    test_config = TestConfiguration(args)
    # Download images unless they asked not to
    if not args.fr:
        antin = ["ant"]
        antin.append("-f")
        antin.append(os.path.join("..", "..", "build.xml"))
        antin.append("test-download-imgs")
        if SYS is OS.CYGWIN:
            subprocess.call(antin)
        elif SYS is OS.WIN:
            # shell=True is required for ant.bat resolution on Windows
            theproc = subprocess.Popen(antin, shell=True, stdout=subprocess.PIPE)
            theproc.communicate()
    # Otherwise test away!
    TestRunner.run_tests(test_config)


class TestRunner(object):
    """A collection of functions to run the regression tests."""

    def run_tests(test_config):
        """Run the tests specified by the main TestConfiguration.

        Executes the AutopsyIngest for each image and dispatches the results based on
        the mode (rebuild or testing).

        Args:
            test_config: the TestConfiguration driving this run.
        """
        # get list of test images to process
        test_data_list = [TestData(image, test_config) for image in test_config.images]

        Reports.html_add_images(test_config.html_log, test_config.images)

        # Test each image
        logres = []
        for test_data in test_data_list:
            Errors.clear_print_logs()
            # Without a gold standard (and not rebuilding) there is nothing to diff against.
            if not (test_config.args.rebuild or os.path.exists(test_data.gold_archive)):
                msg = "Gold standard doesn't exist, skipping image:"
                Errors.print_error(msg)
                Errors.print_error(test_data.gold_archive)
                continue
            # Analyze the given image
            TestRunner._run_autopsy_ingest(test_data)

            # Either copy the data or compare the data
            if test_config.args.rebuild:
                TestRunner.rebuild(test_data)
            else:
                logres.append(TestRunner._compare_results(test_data))

            test_data.printout = Errors.printout
            test_data.printerror = Errors.printerror
            # give solr process time to die.
            time.sleep(10)

        Reports.write_html_foot(test_config.html_log)

        # This code was causing errors with paths, so its disabled
        #if test_config.jenkins:
        #    copyErrorFiles(Errors.errors_out, test_config)

        # If any image failed, record the HTML log as an error artifact and exit non-zero.
        if not all([test_data.overall_passed for test_data in test_data_list]):
            # BUG FIX: the file was previously opened just to read back its own
            # .name attribute (leaking the handle on the error path); the path is
            # already known, so use it directly.
            Errors.add_errors_out(test_config.html_log)
            sys.exit(1)

    def _run_autopsy_ingest(test_data):
        """Run Autopsy ingest for the image in the given TestData.

        Also generates the necessary logs for rebuilding or diff.

        Args:
            test_data: the TestData to run the ingest on.
        """
        if image_type(test_data.image_file) == IMGTYPE.UNKNOWN:
            Errors.print_error("Error: Image type is unrecognized:")
            Errors.print_error(test_data.image_file + "\n")
            return

        logging.debug("--------------------")
        logging.debug(test_data.image_name)
        logging.debug("--------------------")
        TestRunner._run_ant(test_data)
        time.sleep(2)  # Give everything a second to process

        try:
            # Dump the database before we diff or use it for rebuild
            TskDbDiff.dump_output_db(test_data.get_db_path(DBType.OUTPUT),
                                     test_data.get_db_dump_path(DBType.OUTPUT),
                                     test_data.get_sorted_data_path(DBType.OUTPUT))
        except sqlite3.OperationalError as e:
            print("Ingest did not run properly.",
                  "Make sure no other instances of Autopsy are open and try again.")
            sys.exit(1)

        # merges logs into a single log for later diff / rebuild
        copy_logs(test_data)
        Logs.generate_log_data(test_data)

        TestRunner._handle_solr(test_data)
        TestRunner._handle_exception(test_data)

    #TODO: figure out return type of _compare_results(logres)
    def _compare_results(test_data):
        """Compare the results of the output to the gold standard.

        Args:
            test_data: the TestData

        Returns:
            logres?
        """
        # Unzip the gold file
        TestRunner._extract_gold(test_data)

        # Look for core exceptions
        # @@@ Should be moved to TestResultsDiffer, but it didn't know about logres -- need to look into that
        logres = Logs.search_common_log("TskCoreException", test_data)

        # Compare output with gold and display results
        TestResultsDiffer.run_diff(test_data)
        print("Html report passed: ", test_data.html_report_passed)
        print("Errors diff passed: ", test_data.errors_diff_passed)
        print("DB diff passed: ", test_data.db_diff_passed)

        # run time test only for the specific jenkins test
        # BUG FIX: the original had the same `if test_data.main_config.timing:`
        # test duplicated and nested inside itself; one check suffices.
        if test_data.main_config.timing:
            old_time_path = test_data.get_run_time_path()
            passed = TestResultsDiffer._run_time_diff(test_data, old_time_path)
            test_data.run_time_passed = passed
            print("Run time test passed: ", test_data.run_time_passed)
            test_data.overall_passed = (test_data.html_report_passed and
                                        test_data.errors_diff_passed and
                                        test_data.db_diff_passed and
                                        test_data.run_time_passed)
        # otherwise, do the usual
        else:
            test_data.overall_passed = (test_data.html_report_passed and
                                        test_data.errors_diff_passed and
                                        test_data.db_diff_passed)

        Reports.generate_reports(test_data)
        if not test_data.overall_passed:
            # Collect every *Diff.txt produced in the output directory as an error artifact.
            diffFiles = [f for f in os.listdir(test_data.output_path)
                         if os.path.isfile(os.path.join(test_data.output_path, f))]
            for f in diffFiles:
                if f.endswith("Diff.txt"):
                    Errors.add_errors_out(os.path.join(test_data.output_path, f))
            Errors.add_errors_out(test_data.common_log_path)
        return logres

    def _extract_gold(test_data):
        """Extract gold archive file to output/gold/tmp/

        Args:
            test_data: the TestData
        """
        extrctr = zipfile.ZipFile(test_data.gold_archive, 'r', compression=zipfile.ZIP_DEFLATED)
        extrctr.extractall(test_data.main_config.gold)
        # BUG FIX: was `extrctr.close` (attribute access, not a call) -- the
        # archive handle was never actually closed.
        extrctr.close()
        time.sleep(2)

    def _handle_solr(test_data):
        """Clean up SOLR index if not in keep mode (-k).

        Args:
            test_data: the TestData
        """
        if not test_data.main_config.args.keep:
            if clear_dir(test_data.solr_index):
                print_report([], "DELETE SOLR INDEX", "Solr index deleted.")
        else:
            print_report([], "KEEP SOLR INDEX", "Solr index has been kept.")

    def _handle_exception(test_data):
        """If running in exception mode, print exceptions to log.

        Args:
            test_data: the TestData
        """
        if test_data.main_config.args.exception:
            exceptions = search_logs(test_data.main_config.args.exception_string, test_data)
            okay = ("No warnings or exceptions found containing text '" +
                    test_data.main_config.args.exception_string + "'.")
            print_report(exceptions, "EXCEPTION", okay)

    def rebuild(test_data):
        """Rebuild the gold standard with the given TestData.

        Copies the test-generated database and html report files into the gold directory.

        Args:
            test_data: the TestData to rebuild the gold standard from.
        """
        test_config = test_data.main_config
        # Errors to print
        errors = []
        # Delete the current gold standards
        gold_dir = test_config.img_gold
        clear_dir(test_config.img_gold)
        tmpdir = make_path(gold_dir, test_data.image_name)
        dbinpth = test_data.get_db_path(DBType.OUTPUT)
        dboutpth = make_path(tmpdir, DB_FILENAME)
        dataoutpth = make_path(tmpdir, test_data.image_name + "SortedData.txt")
        dbdumpinpth = test_data.get_db_dump_path(DBType.OUTPUT)
        dbdumpoutpth = make_path(tmpdir, test_data.image_name + "DBDump.txt")
        if not os.path.exists(test_config.img_gold):
            os.makedirs(test_config.img_gold)
        if not os.path.exists(tmpdir):
            os.makedirs(tmpdir)
        try:
            shutil.copy(dbinpth, dboutpth)
            if file_exists(test_data.get_sorted_data_path(DBType.OUTPUT)):
                shutil.copy(test_data.get_sorted_data_path(DBType.OUTPUT), dataoutpth)
            shutil.copy(dbdumpinpth, dbdumpoutpth)
            error_pth = make_path(tmpdir, test_data.image_name + "SortedErrors.txt")
            shutil.copy(test_data.sorted_log, error_pth)
        except IOError as e:
            Errors.print_error(str(e))
            print(str(e))
            print(traceback.format_exc())
        # Rebuild the HTML report
        output_html_report_dir = test_data.get_html_report_path(DBType.OUTPUT)
        gold_html_report_dir = make_path(tmpdir, "Report")

        try:
            shutil.copytree(output_html_report_dir, gold_html_report_dir)
        except OSError as e:
            # BUG FIX: was `errors.append(e.error())` -- OSError has no
            # .error() method, which raised AttributeError inside the handler.
            errors.append(str(e))
        except Exception as e:
            errors.append("Error: Unknown fatal error when rebuilding the gold html report.")
            errors.append(str(e) + "\n")
            print(traceback.format_exc())

        # Zip the rebuilt gold standard from the directory above gold_dir so the
        # archive contains a top-level "tmp" folder, then remove the unzipped copy.
        oldcwd = os.getcwd()
        zpdir = gold_dir
        os.chdir(zpdir)
        os.chdir("..")
        img_gold = "tmp"
        img_archive = make_path(test_data.image_name + "-archive.zip")
        comprssr = zipfile.ZipFile(img_archive, 'w', compression=zipfile.ZIP_DEFLATED)
        TestRunner.zipdir(img_gold, comprssr)
        comprssr.close()
        os.chdir(oldcwd)
        del_dir(test_config.img_gold)
        okay = "Sucessfully rebuilt all gold standards."
        print_report(errors, "REBUILDING", okay)

    def zipdir(path, zip_file):
        """Recursively add every file under path to the given open ZipFile.

        Args:
            path: a pathto_Dir, the directory tree to archive
            zip_file: an open zipfile.ZipFile in write mode
        """
        # NOTE: parameter renamed from `zip` to avoid shadowing the builtin;
        # callers pass it positionally so the interface is unchanged.
        for root, dirs, files in os.walk(path):
            for file in files:
                zip_file.write(os.path.join(root, file))

    def _run_ant(test_data):
        """Construct and run the ant build command for the given TestData.

        Tests Autopsy by calling RegressionTest.java via the ant build file.

        Args:
            test_data: the TestData
        """
        test_config = test_data.main_config
        # Set up the directories
        if dir_exists(test_data.output_path):
            shutil.rmtree(test_data.output_path)
        os.makedirs(test_data.output_path)
        test_data.ant = ["ant"]
        test_data.ant.append("-v")
        test_data.ant.append("-f")
        # case.ant.append(case.build_path)
        test_data.ant.append(os.path.join("..", "..", "Testing", "build.xml"))
        test_data.ant.append("regression-test")
        test_data.ant.append("-l")
        test_data.ant.append(test_data.antlog_dir)
        test_data.ant.append("-Dimg_path=" + test_data.image_file)
        test_data.ant.append("-Dknown_bad_path=" + test_config.known_bad_path)
        test_data.ant.append("-Dkeyword_path=" + test_config.keyword_path)
        test_data.ant.append("-Dnsrl_path=" + test_config.nsrl_path)
        test_data.ant.append("-Dgold_path=" + test_config.gold)
        test_data.ant.append("-Dout_path=" + make_local_path(test_data.output_path))
        if test_config.jenkins:
            test_data.ant.append("-Ddiff_dir=" + test_config.diff_dir)
        test_data.ant.append("-Dignore_unalloc=" + "%s" % test_config.args.unallocated)
        test_data.ant.append("-Dtest.timeout=" + str(test_config.timeout))

        Errors.print_out("Ingesting Image:\n" + test_data.image_file + "\n")
        Errors.print_out("CMD: " + " ".join(test_data.ant))
        Errors.print_out("Starting test...\n")
        antoutpth = make_local_path(test_data.main_config.output_dir, "antRunOutput.txt")
        antout = open(antoutpth, "a")
        try:
            if SYS is OS.CYGWIN:
                subprocess.call(test_data.ant, stdout=subprocess.PIPE)
            elif SYS is OS.WIN:
                theproc = subprocess.Popen(test_data.ant, shell=True, stdout=subprocess.PIPE)
                theproc.communicate()
        finally:
            # ensure the ant output log is closed even if the subprocess call raises
            antout.close()


class TestData(object):
    """Container for the input and output of a single image.

    Represents data for the test of a single image, including path to the image,
    database paths, etc.

    Attributes:
        main_config: the global TestConfiguration
        ant: a listof_String, the ant command for this TestData
        image_file: a pathto_Image, the image for this TestData
        image: a String, the image file's name
        image_name: a String, the image file's name with a trailing (0)
        output_path: pathto_Dir, the output directory for this TestData
        autopsy_data_file: a pathto_File, the IMAGE_NAMEAutopsy_data.txt file
        warning_log: a pathto_File, the AutopsyLogs.txt file
        antlog_dir: a pathto_File, the antlog.txt file
        test_dbdump: a pathto_File, the database dump, IMAGENAMEDump.txt
        common_log_path: a pathto_File, the IMAGE_NAMECOMMON_LOG file
        sorted_log: a pathto_File, the IMAGENAMESortedErrors.txt file
        reports_dir: a pathto_Dir, the AutopsyTestCase/Reports folder
        gold_data_dir: a pathto_Dir, the gold standard directory
        gold_archive: a pathto_File, the gold standard archive
        logs_dir: a pathto_Dir, the location where autopsy logs are stored
        solr_index: a pathto_Dir, the location of the solr index
        html_report_passed: a boolean, did the HTML report diff pass?
        errors_diff_passed: a boolean, did the error diff pass?
        db_diff_passed: a boolean, did the db diff pass?
        run_time_passed: a boolean, did the run time test pass?
        overall_passed: a boolean, did the test pass?
        total_test_time: a String representation of the test duration
        start_date: a String representation of this TestData's start date
        end_date: a String representation of the TestData's end date
        total_ingest_time: a String representation of the total ingest time
        artifact_count: a Nat, the number of artifacts
        artifact_fail: a Nat, the number of artifact failures
        heap_space: a String representation of TODO
        service_times: a String representation of TODO
        autopsy_version: a String, the version of autopsy that was run
        ingest_messages: a Nat, the number of ingest messages
        indexed_files: a Nat, the number of files indexed during the ingest
        indexed_chunks: a Nat, the number of chunks indexed during the ingest
        printerror: a listof_String, the error messages printed during this TestData's test
        printout: a listof_String, the messages printed during this TestData's test
    """

    def __init__(self, image, main_config):
        """Init this TestData with its image and the test configuration.

        Args:
            image: the Image to be tested.
            main_config: the global TestConfiguration.
        """
        # Configuration Data
        self.main_config = main_config
        self.ant = []
        self.image_file = str(image)
        # TODO: This 0 should be be refactored out, but it will require rebuilding and changing of outputs.
        self.image = get_image_name(self.image_file)
        self.image_name = self.image + "(0)"
        # Directory structure and files
        self.output_path = make_path(self.main_config.output_dir, self.image_name)
        self.autopsy_data_file = make_path(self.output_path, self.image_name + "Autopsy_data.txt")
        self.warning_log = make_local_path(self.output_path, "AutopsyLogs.txt")
        self.antlog_dir = make_local_path(self.output_path, "antlog.txt")
        self.test_dbdump = make_path(self.output_path, self.image_name + "DBDump.txt")
        self.common_log_path = make_local_path(self.output_path, self.image_name + COMMON_LOG)
        self.sorted_log = make_local_path(self.output_path, self.image_name + "SortedErrors.txt")
        self.reports_dir = make_path(self.output_path, AUTOPSY_TEST_CASE, "Reports")
        self.gold_data_dir = make_path(self.main_config.img_gold, self.image_name)
        self.gold_archive = make_path(self.main_config.gold, self.image_name + "-archive.zip")
        self.logs_dir = make_path(self.output_path, "logs")
        self.solr_index = make_path(self.output_path, AUTOPSY_TEST_CASE, "ModuleOutput", "KeywordSearch")
        # Results and Info
        self.html_report_passed = False
        self.errors_diff_passed = False
        self.db_diff_passed = False
        self.run_time_passed = False
        self.overall_passed = False
        # Ingest info
        self.total_test_time = ""
        self.start_date = ""
        self.end_date = ""
        self.total_ingest_time = ""
        self.artifact_count = 0
        self.artifact_fail = 0
        self.heap_space = ""
        self.service_times = ""
        self.autopsy_version = ""
        self.ingest_messages = 0
        self.indexed_files = 0
        self.indexed_chunks = 0
        # Error tracking
        self.printerror = []
        self.printout = []

    def ant_to_string(self):
        """Return the ant command as a single space-separated string."""
        string = ""
        for arg in self.ant:
            string += (arg + " ")
        return string

    def get_db_path(self, db_type):
        """Get the path to the database file that corresponds to the given DBType.

        Args:
            db_type: the DBType of the path to be generated.
        """
        if db_type == DBType.GOLD:
            db_path = make_path(self.gold_data_dir, DB_FILENAME)
        elif db_type == DBType.OUTPUT:
            db_path = make_path(self.main_config.output_dir, self.image_name, AUTOPSY_TEST_CASE, DB_FILENAME)
        else:
            db_path = make_path(self.main_config.output_dir, self.image_name, AUTOPSY_TEST_CASE, BACKUP_DB_FILENAME)
        return db_path

    def get_html_report_path(self, html_type):
        """Get the path to the HTML Report folder that corresponds to the given DBType.

        Args:
            html_type: the DBType of the path to be generated.
        """
        if html_type == DBType.GOLD:
            return make_path(self.gold_data_dir, "Report")
        else:
            # Autopsy creates an HTML report folder in the form AutopsyTestCase DATE-TIME
            # It's impossible to get the exact time the folder was created, but the folder
            # we are looking for is the only one in the self.reports_dir folder
            html_path = ""
            for fs in os.listdir(self.reports_dir):
                html_path = make_path(self.reports_dir, fs)
                if os.path.isdir(html_path):
                    break
            return make_path(html_path, os.listdir(html_path)[0])

    def get_sorted_data_path(self, file_type):
        """Get the path to the SortedData file that corresponds to the given DBType.

        Args:
            file_type: the DBType of the path to be generated
        """
        return self._get_path_to_file(file_type, "SortedData.txt")

    def get_sorted_errors_path(self, file_type):
        """Get the path to the SortedErrors file that corresponds to the given DBType.

        Args:
            file_type: the DBType of the path to be generated
        """
        return self._get_path_to_file(file_type, "SortedErrors.txt")

    def get_db_dump_path(self, file_type):
        """Get the path to the DBDump file that corresponds to the given DBType.

        Args:
            file_type: the DBType of the path to be generated
        """
        return self._get_path_to_file(file_type, "DBDump.txt")

    def get_run_time_path(self):
        """Get the path to the run time storage file."""
        return os.path.join("..", "input")

    def _get_path_to_file(self, file_type, file_name):
        """Get the path to the specified file with the specified type.

        Args:
            file_type: the DBType of the path to be generated
            file_name: a String, the filename of the path to be generated
        """
        full_filename = self.image_name + file_name
        if file_type == DBType.GOLD:
            return make_path(self.gold_data_dir, full_filename)
        else:
            return make_path(self.output_path, full_filename)


class TestConfiguration(object):
    """Container for test configuration data.

    The Master Test Configuration. Encapsulates consolidated high level input from
    config XML file and command-line arguments.

    Attributes:
        args: an Args, the command line arguments
        output_dir: a pathto_Dir, the output directory
        input_dir: a pathto_Dir, the input directory
        gold: a pathto_Dir, the gold directory
        img_gold: a pathto_Dir, the temp directory where gold images are unzipped to
        csv: a pathto_File, the local csv file
        global_csv: a pathto_File, the global csv file
        html_log: a pathto_File
        known_bad_path:
        keyword_path:
        nsrl_path:
        build_path: a pathto_File, the ant build file which runs the tests
        autopsy_version:
        ingest_messages: a Nat, number of ingest messages
        indexed_files: a Nat, the number of indexed files
        indexed_chunks: a Nat, the number of indexed chunks
        timer:
        images: a listof_Image, the images to be tested
        timeout: a Nat, the amount of time before killing the test
        ant: a listof_String, the ant command to run the tests
        jenkins: a boolean, is this test running through a Jenkins job?
        timing: are we doing a running time test?
    """

    def __init__(self, args):
        """Inits TestConfiguration and loads a config file if available.

        Args:
            args: an Args, the command line arguments.
        """
        self.args = args
        # Paths:
        self.output_dir = ""
        self.input_dir = make_local_path("..", "input")
        self.gold = make_path("..", "output", "gold")
        self.img_gold = make_path(self.gold, 'tmp')
        # Logs:
        self.csv = ""
        self.global_csv = ""
        self.html_log = ""
        # Ant info:
        self.known_bad_path = make_path(self.input_dir, "notablehashes.txt-md5.idx")
        self.keyword_path = make_path(self.input_dir, "notablekeywords.xml")
        self.nsrl_path = make_path(self.input_dir, "nsrl.txt-md5.idx")
        self.build_path = make_path("..", "build.xml")
        # Infinite Testing info
        # BUG FIX: was a dead local `timer = 0`; the class docstring documents a
        # `timer` attribute, so store it on the instance.
        self.timer = 0
        self.images = []
        self.jenkins = False
        self.timing = False
        # Set the timeout to something huge
        # The entire tester should not timeout before this number in ms
        # However it only seems to take about half this time
        # And it's very buggy, so we're being careful
        self.timeout = 24 * 60 * 60 * 1000 * 1000

        if not self.args.single:
            self._load_config_file(self.args.config_file)
        else:
            self.images.append(self.args.single_file)
        self._init_logs()
        #self._init_imgs()
        #self._init_build_info()

    def _load_config_file(self, config_file):
        """Updates this TestConfiguration's attributes from the config file.

        Initializes this TestConfiguration by iterating through the XML config file
        command-line argument. Populates self.images and optional email configuration.

        Args:
            config_file: ConfigFile - the configuration file to load
        """
        try:
            parsed_config = parse(config_file)
            if parsed_config.getElementsByTagName("indir"):
                self.input_dir = parsed_config.getElementsByTagName("indir")[0].getAttribute("value").encode().decode("utf_8")
            if parsed_config.getElementsByTagName("global_csv"):
                self.global_csv = parsed_config.getElementsByTagName("global_csv")[0].getAttribute("value").encode().decode("utf_8")
                self.global_csv = make_local_path(self.global_csv)
            if parsed_config.getElementsByTagName("golddir"):
                self.gold = parsed_config.getElementsByTagName("golddir")[0].getAttribute("value").encode().decode("utf_8")
                self.img_gold = make_path(self.gold, 'tmp')
            if parsed_config.getElementsByTagName("jenkins"):
                self.jenkins = True
                if parsed_config.getElementsByTagName("diffdir"):
                    self.diff_dir = parsed_config.getElementsByTagName("diffdir")[0].getAttribute("value").encode().decode("utf_8")
            else:
                self.jenkins = False
            if parsed_config.getElementsByTagName("timing"):
                self.timing = True
            else:
                self.timing = False

            self._init_imgs(parsed_config)
            self._init_build_info(parsed_config)

        except IOError as e:
            msg = "There was an error loading the configuration file.\n"
            msg += "\t" + str(e)
            # BUG FIX: `msg` was built but never emitted; log it with the traceback.
            logging.critical(msg)
            logging.critical(traceback.format_exc())
            print(traceback.format_exc())

    def _init_logs(self):
        """Setup output folder, logs, and reporting infrastructure."""
        if not dir_exists(make_path("..", "output", "results")):
            os.makedirs(make_path("..", "output", "results",))
        self.output_dir = make_path("..", "output", "results", time.strftime("%Y.%m.%d-%H.%M.%S"))
        os.makedirs(self.output_dir)
        self.csv = make_local_path(self.output_dir, "CSV.txt")
        self.html_log = make_path(self.output_dir, "AutopsyTestCase.html")
        # NOTE(review): backslash separator assumes a Windows host -- consistent
        # with the OS.WIN/OS.CYGWIN checks elsewhere in this script.
        log_name = self.output_dir + "\\regression.log"
        logging.basicConfig(filename=log_name, level=logging.DEBUG)

    def _init_build_info(self, parsed_config):
        """Initializes paths that point to information necessary to run the AutopsyIngest."""
        build_elements = parsed_config.getElementsByTagName("build")
        if build_elements:
            build_element = build_elements[0]
            build_path = build_element.getAttribute("value").encode().decode("utf_8")
            self.build_path = build_path

    def _init_imgs(self, parsed_config):
        """Initialize the list of images to run tests on."""
        for element in parsed_config.getElementsByTagName("image"):
            value = element.getAttribute("value").encode().decode("utf_8")
            print("Image in Config File: " + value)
            if file_exists(value):
                self.images.append(value)
            else:
                msg = "File: " + value + " doesn't exist"
                Errors.print_error(msg)
        image_count = len(self.images)

        # Sanity check to see if there are obvious gold images that we are not testing
        gold_count = 0
        for file in os.listdir(self.gold):
            if not(file == 'tmp'):
                gold_count += 1

        if (image_count > gold_count):
            print("******Alert: There are more input images than gold standards, some images will not be properly tested.\n")
        elif (image_count < gold_count):
            print("******Alert: There are more gold standards than input images, this will not check all gold Standards.\n")


#-------------------------------------------------#
#     Functions relating to comparing outputs     #
#-------------------------------------------------#
class TestResultsDiffer(object):
    """Compares results for a single test."""

    def run_diff(test_data):
        """Compares results for a single test.

        Args:
            test_data: the TestData to use.
            databaseDiff: TskDbDiff object created based off test_data
        """
        try:
            output_db = test_data.get_db_path(DBType.OUTPUT)
            gold_db = test_data.get_db_path(DBType.GOLD)
            output_dir = test_data.output_path
            gold_bb_dump = test_data.get_sorted_data_path(DBType.GOLD)
            gold_dump = test_data.get_db_dump_path(DBType.GOLD)
            test_data.db_diff_passed = all(TskDbDiff(output_db, gold_db, output_dir=output_dir,
                                                     gold_bb_dump=gold_bb_dump,
                                                     gold_dump=gold_dump).run_diff())

            # Compare Exceptions
            # replace is a function that replaces strings of digits with 'd'
            # this is needed so dates and times will not cause the diff to fail
            replace = lambda file: re.sub(re.compile(r"\d"), "d", file)
            output_errors = test_data.get_sorted_errors_path(DBType.OUTPUT)
            gold_errors = test_data.get_sorted_errors_path(DBType.GOLD)
            passed = TestResultsDiffer._compare_text(output_errors, gold_errors, replace)
            test_data.errors_diff_passed = passed

            # Compare html output
            gold_report_path = test_data.get_html_report_path(DBType.GOLD)
            output_report_path = test_data.get_html_report_path(DBType.OUTPUT)
            passed = TestResultsDiffer._html_report_diff(gold_report_path, output_report_path)
            test_data.html_report_passed = passed

            # Clean up tmp folder
            del_dir(test_data.gold_data_dir)

        except sqlite3.OperationalError as e:
            Errors.print_error("Tests failed while running the diff:\n")
            Errors.print_error(str(e))
        except TskDbDiffException as e:
            Errors.print_error(str(e))
        except Exception as e:
            Errors.print_error("Tests failed due to an error, try rebuilding or creating gold standards.\n")
            Errors.print_error(str(e) + "\n")
            print(traceback.format_exc())

    def _compare_text(output_file, gold_file, process=None):
        """Compare two text files.

        Args:
            output_file: a pathto_File, the output text file
            gold_file: a pathto_File, the input text file
            process: (optional) a function of String -> String that will be
            called on each input file before the diff, if specified.

        Returns:
            True if the files match (after optional processing), False otherwise.
        """
        if not file_exists(output_file):
            return False
        # Context managers close the handles; the originals leaked them.
        with codecs.open(output_file, "r", "utf_8") as f:
            output_data = f.read()
        with codecs.open(gold_file, "r", "utf_8") as f:
            gold_data = f.read()

        if process is not None:
            output_data = process(output_data)
            gold_data = process(gold_data)

        if gold_data != output_data:
            # Write a diff of the two files next to the cwd for later inspection.
            diff_path = os.path.splitext(os.path.basename(output_file))[0]
            diff_path += "-Diff.txt"
            with codecs.open(diff_path, "wb", "utf_8") as diff_file:
                dffcmdlst = ["diff", output_file, gold_file]
                subprocess.call(dffcmdlst, stdout=diff_file)
            Errors.add_errors_out(diff_path)
            return False
        else:
            return True

    def _html_report_diff(gold_report_path, output_report_path):
        """Compare the output and gold html reports.

        Args:
            gold_report_path: a pathto_Dir, the gold HTML report directory
            output_report_path: a pathto_Dir, the output HTML report directory

        Returns:
            true, if the reports match, false otherwise.
        """
        try:
            gold_html_files = get_files_by_ext(gold_report_path, ".html")
            output_html_files = get_files_by_ext(output_report_path, ".html")

            #ensure both reports have the same number of files and are in the same order
            if len(gold_html_files) != len(output_html_files):
                msg = "The reports did not have the same number or files."
                msg += "One of the reports may have been corrupted."
                Errors.print_error(msg)
            else:
                gold_html_files.sort()
                output_html_files.sort()

                total = {"Gold": 0, "New": 0}
                for gold, output in zip(gold_html_files, output_html_files):
                    count = TestResultsDiffer._compare_report_files(gold, output)
                    total["Gold"] += count[0]
                    total["New"] += count[1]

                okay = "The test report matches the gold report."
                errors = ["Gold report had " + str(total["Gold"]) + " errors",
                          "New report had " + str(total["New"]) + " errors."]
                print_report(errors, "REPORT COMPARISON", okay)

                if total["Gold"] == total["New"]:
                    return True
                else:
                    Errors.print_error("The reports did not match each other.\n " +
                                       errors[0] + " and the " + errors[1])
                    return False
        except OSError as e:
            # BUG FIX: was `e.print_error()` -- OSError has no such method.
            Errors.print_error(str(e))
            return False
        except Exception as e:
            Errors.print_error("Error: Unknown fatal error comparing reports.")
            Errors.print_error(str(e) + "\n")
            logging.critical(traceback.format_exc())
            return False