diff --git a/test/script/regression.py b/test/script/regression.py index 8074516425..ddaade4ec1 100644 --- a/test/script/regression.py +++ b/test/script/regression.py @@ -1,22 +1,22 @@ #!/usr/bin/python -# -*- coding: utf_8 -*- +# -*- coding: utf_8 -*- # Autopsy Forensic Browser - # + # # Copyright 2013 Basis Technology Corp. - # + # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at - # + # # http://www.apache.org/licenses/LICENSE-2.0 - # + # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - + import codecs import datetime import logging @@ -68,12 +68,13 @@ import srcupdater # Data Definitions: # -# Path: A path to a file or directory. +# pathto_X: A path to type X. # ConfigFile: An XML file formatted according to the template in myconfig.xml # ParsedConfig: A dom object that represents a ConfigFile # SQLCursor: A cursor recieved from a connection to an SQL database # Nat: A Natural Number - +# Image: An image +# ##### # Enumeration definition (python 3.2 doesn't have enumerations, this is a common solution @@ -86,17 +87,41 @@ def enum(*seq, **named): # Enumeration of database types used for the simplification of generating database paths DBType = enum('OUTPUT', 'GOLD', 'BACKUP') -# Common filename of the output and gold databases (although they are in different directories +# Common filename of the output and gold databases (although they are in different directories DB_FILENAME = "autopsy.db" # Backup database filename BACKUP_DB_FILENAME = "autopsy_backup.db" +# TODO: Double check this purpose statement +# Folder name for gold standard database testing +AUTOPSY_TEST_CASE = "AutopsyTestCase" + +# TODO: Double check this purpose statement +# The filename of the log to store error messages +COMMON_LOG = "AutopsyErrors.txt" + Day = 0 #-------------------------------------------------------------# # Parses argv and stores booleans to match command line input # #-------------------------------------------------------------# -class Args: +class Args(object): + """A container for command line options and arguments. + + Attributes: + single: a boolean indicating whether to run in single file mode + single_file: an Image to run the test on + rebuild: a boolean indicating whether to run in rebuild mode + list: a boolean indicating a config file was specified + unallocated: a boolean indicating unallocated space should be ignored + ignore: a boolean indicating the input directory should be ingnored + keep: a boolean indicating whether to keep the SOLR index + verbose: a boolean indicating whether verbose output should be printed + exeception: a boolean indicating whether errors containing exception + exception_string should be printed + exception_sring: a String representing and exception name + fr: a boolean indicating whether gold standard images will be downloaded + """ def __init__(self): self.single = False self.single_file = "" @@ -110,9 +135,9 @@ class Args: self.exception = False self.exception_string = "" self.fr = False - + def parse(self): - global nxtproc + global nxtproc nxtproc = [] nxtproc.append("python3") nxtproc.append(sys.argv.pop(0)) @@ -176,23 +201,49 @@ class Args: return True -class TestConfiguration: - """ +class TestConfiguration(object): + """Container for test configuration data. + The Master Test Configuration. Encapsulates consolidated high level input from config XML file and command-line arguments. + + Attributes: + args: an Args, the command line arguments + output_dir: a pathto_Dir, the output directory + input_dir: a pathto_Dir, the input directory + gold: a pathto_Dir, the gold directory + img_gold: a pathto_Dir, the temp directory where gold images are unzipped to + csv: a pathto_File, the local csv file + global_csv: a pathto_File, the global csv file + html_log: a pathto_File + known_bad_path: + keyword_path: + nsrl_path: + build_path: a pathto_File, the ant build file which runs the tests + autopsy_version: + ingest_messages: a Nat, number of ingest messages + indexed_files: a Nat, the number of indexed files + indexed_chunks: a Nat, the number of indexed chunks + timer: + images: a listof_Image, the images to be tested + timeout: a Nat, the amount of time before killing the test + ant: a listof_String, the ant command to run the tests """ + def __init__(self, args): - self.args = args # Args : the command line arguments + """Inits TestConfiguration and loads a config file if available. + + Args: + args: an Args, the command line arguments. + """ + self.args = args # Paths: - self.output_dir = "" # Path : the path to the output directory - self.input_dir = Emailer.make_local_path("..","input") # Path : the Path to the input directory - self.gold = Emailer.make_path("..", "output", "gold") # Path : the Path to the gold directory - self.img_gold = Emailer.make_path(self.gold, 'tmp') - self.common_log = "AutopsyErrors.txt" - self.test_db_file = "autopsy.db" - self.Img_Test_Folder = "AutopsyTestCase" + self.output_dir = "" + self.input_dir = Emailer.make_local_path("..","input") + self.gold = Emailer.make_path("..", "output", "gold") + self.img_gold = Emailer.make_path(self.gold, 'tmp') # Logs: - self.csv = "" + self.csv = "" self.global_csv = "" self.html_log = "" # Ant info: @@ -214,35 +265,18 @@ class TestConfiguration: # And it's very buggy, so we're being careful self.timeout = 24 * 60 * 60 * 1000 * 1000 self.ant = [] - + # Initialize Attributes self._init_logs() self._init_imgs() self._init_build_info() - def get_image_name(self, image_file): - path_end = image_file.rfind("/") - path_end2 = image_file.rfind("\\") - ext_start = image_file.rfind(".") - if(ext_start == -1): - name = image_file - if(path_end2 != -1): - name = image_file[path_end2+1:ext_start] - elif(ext_start == -1): - name = image_file[path_end+1:] - elif(path_end == -1): - name = image_file[:ext_start] - elif(path_end!=-1 and ext_start!=-1): - name = image_file[path_end+1:ext_start] - else: - name = image_file[path_end2+1:ext_start] - return name - + def ant_to_string(self): string = "" for arg in self.ant: string += (arg + " ") - return string + return string def reset(self): # Set the timeout to something huge @@ -251,11 +285,9 @@ class TestConfiguration: # And it's very buggy, so we're being careful self.timeout = 24 * 60 * 60 * 1000 * 1000 self.ant = [] - + def _init_imgs(self): - """ - Initialize the list of images to run test on. - """ + """Initialize the list of images to run test on.""" #Identify tests to run and populate test_config with list # If user wants to do a single file and a list (contradictory?) if self.args.single and self.args.list: @@ -286,11 +318,9 @@ class TestConfiguration: self._print_error(msg) return self._load_config_file(self.args.config_file) - + def _init_logs(self): - """ - Setup output folder, logs, and reporting infrastructure - """ + """Setup output folder, logs, and reporting infrastructure.""" if(not Emailer.dir_exists(Emailer.make_path("..", "output", "results"))): os.makedirs(Emailer.make_path("..", "output", "results",)) self.output_dir = Emailer.make_path("..", "output", "results", time.strftime("%Y.%m.%d-%H.%M.%S")) @@ -301,9 +331,7 @@ class TestConfiguration: logging.basicConfig(filename=log_name, level=logging.DEBUG) def _init_build_info(self): - """ - Initializes paths that point to information necessary to run the AutopsyIngest - """ + """Initializes paths that point to information necessary to run the AutopsyIngest.""" global parsed if(self.args.list): build_elements = parsed.getElementsByTagName("build") @@ -320,12 +348,15 @@ class TestConfiguration: self.known_bad_path = Emailer.make_path(self.input_dir, "notablehashes.txt-md5.idx") self.keyword_path = Emailer.make_path(self.input_dir, "notablekeywords.xml") self.nsrl_path = Emailer.make_path(self.input_dir, "nsrl.txt-md5.idx") - - # ConfigFile -> void + def _load_config_file(self, config_file): - """ + """Updates this TestConfiguration's attributes from the config file. + Initializes this TestConfiguration by iterating through the XML config file command-line argument. Populates self.images and optional email configuration + + Args: + config_file: ConfigFile - the configuration file to load """ try: global parsed @@ -343,7 +374,7 @@ class TestConfiguration: if parsed.getElementsByTagName("golddir"): self.gold = parsed.getElementsByTagName("golddir")[0].getAttribute("value").encode().decode("utf_8") self.img_gold = Emailer.make_path(self.gold, 'tmp') - + # Generate the top navbar of the HTML for easy access to all images images = [] for element in parsed.getElementsByTagName("image"): @@ -353,7 +384,7 @@ class TestConfiguration: self.images.append(value) else: msg = "File: " + value + " doesn't exist" - self._print_error(msg) + self._print_error(msg) image_count = len(images) # Sanity check to see if there are obvious gold images that we are not testing @@ -361,12 +392,12 @@ class TestConfiguration: for file in os.listdir(self.gold): if not(file == 'tmp'): gold_count+=1 - + if (image_count > gold_count): print("******Alert: There are more input images than gold standards, some images will not be properly tested.\n") elif (image_count < gold_count): print("******Alert: There are more gold standards than input images, this will not check all gold Standards.\n") - + except Exception as e: msg = "There was an error running with the configuration file.\n" msg += "\t" + str(e) @@ -374,23 +405,42 @@ class TestConfiguration: logging.critical(traceback.format_exc()) print(traceback.format_exc()) - # String -> void def _print_error(self, msg): - """ - Append the given error message to the global error message and print the message to the screen. + """Append the given error message to the global error message and print the message to the screen. + + Args: + msg: String - the error message to print """ global errorem error_msg = "Configuration: " + msg print(error_msg) errorem += error_msg + "\n" - -class TskDbDiff: - """ + +class TskDbDiff(object): + """Represents the differences between the gold and output databases. + Contains methods to compare two databases and internally store some of the results + + Attributes: + gold_artifacts: + autopsy_artifacts: + gold_attributes: + autopsy_attributes: + gold_objects: + autopsy_objects: + artifact_comparison: + attribute_comparision: + test_data: + autopsy_db_file: + gold_db_file: """ - # TskDbDiff(TestData) def __init__(self, test_data): + """Constructor for TskDbDiff. + + Args: + test_data: TestData - the test data to compare + """ self.gold_artifacts = [] self.autopsy_artifacts = [] self.gold_attributes = 0 @@ -400,54 +450,17 @@ class TskDbDiff: self.artifact_comparison = [] self.attribute_comparison = [] self.test_data = test_data - self.autopsy_db_file = self.test_data.getDBPath(DBType.OUTPUT) - self.gold_db_file = self.test_data.getDBPath(DBType.GOLD) - - def clear(self): - self.gold_artifacts = [] - self.autopsy_artifacts = [] - self.gold_attributes = 0 - self.autopsy_attributes = 0 - self.gold_objects = 0 - self.autopsy_objects = 0 - self.artifact_comparison = [] - self.attribute_comparison = [] - - - def get_artifacts_count(self): - total = 0 - for nums in self.autopsy_artifacts: - total += nums - return total - - def get_artifact_comparison(self): - if not self.artifact_comparison: - return "All counts matched" - else: - global failedbool - global errorem - failedbool = True - global imgfail - imgfail = True - return "; ".join(self.artifact_comparison) - - def get_attribute_comparison(self): - if not self.attribute_comparison: - return "All counts matched" - global failedbool - global errorem - failedbool = True - global imgfail - imgfail = True - list = [] - for error in self.attribute_comparison: - list.append(error) - return ";".join(list) - - # SQLCursor -> listof_Artifact - def _count_artifacts(self, cursor): - """ - Get a list of artifacts from the given SQLCursor + self.autopsy_db_file = self.test_data.get_db_path(DBType.OUTPUT) + self.gold_db_file = self.test_data.get_db_path(DBType.GOLD) + + def _get_artifacts(self, cursor): + """Get a list of artifacts from the given SQLCursor. + + Args: + cursor: SQLCursor - the cursor to execute on + + Returns: + listof_Artifact - the artifacts found by the query """ cursor.execute("SELECT COUNT(*) FROM blackboard_artifact_types") length = cursor.fetchone()[0] + 1 @@ -455,26 +468,34 @@ class TskDbDiff: for type_id in range(1, length): cursor.execute("SELECT COUNT(*) FROM blackboard_artifacts WHERE artifact_type_id=%d" % type_id) artifacts.append(cursor.fetchone()[0]) - return artifacts - - # SQLCursor -> Nat + return artifacts + def _count_attributes(self, cursor): - """ - Count the attributes from the given SQLCursor + """Count the attributes from the given SQLCursor. + + Args: + cursor: SQLCursor - the cursor to execute on + + Returns: + Nat - the number of attributes found by the query """ cursor.execute("SELECT COUNT(*) FROM blackboard_attributes") return cursor.fetchone()[0] - - # SQLCursor -> Nat + def _count_objects(self, cursor): + """Count the objects from the given SQLCursor. + + Args: + cursor: SQLCursor - the cursor to execute on + + Returns: + Nat - the number of objects found by the query """ - Count the objects from the given SQLCursor - """ cursor.execute("SELECT COUNT(*) FROM tsk_objects") return cursor.fetchone()[0] - - # Compares the blackboard artifact counts of two databases + def _compare_bb_artifacts(self): + """Compares the blackboard artifact counts of two databases.""" exceptions = [] try: global failedbool @@ -498,9 +519,8 @@ class TskDbDiff: exceptions.append("Error: Unable to compare blackboard_artifacts.\n") return exceptions - # Compares the blackboard atribute counts of two databases - # given the two database cursors def _compare_bb_attributes(self): + """Compares the blackboard attribute counts of two databases.""" exceptions = [] try: if self.gold_attributes != self.autopsy_attributes: @@ -518,9 +538,8 @@ class TskDbDiff: exceptions.append("Error: Unable to compare blackboard_attributes.\n") return exceptions - # Compares the tsk object counts of two databases - # given the two database cursors def _compare_tsk_objects(self): + """Compares the TSK object counts of two databases.""" exceptions = [] try: if self.gold_objects != self.autopsy_objects: @@ -537,28 +556,38 @@ class TskDbDiff: except Exception as e: exceptions.append("Error: Unable to compare tsk_objects.\n") return exceptions - - # SQLCursor x SQLCursor -> void + def _get_basic_counts(self, autopsy_cur, gold_cur): - """ - Get the counts of objects, artifacts, and attributes in the Gold and Ouput databases. + """Count the items necessary to compare the databases. + + Gets the counts of objects, artifacts, and attributes in the Gold + and Ouput databases and updates this TskDbDiff's attributes + accordingly + + Args: + autopsy_cur: SQLCursor - the cursor for the output database + gold_cur: SQLCursor - the cursor for the gold database + + Returns: + """ try: - # Objects + # Objects self.gold_objects = self._count_objects(gold_cur) self.autopsy_objects = self._count_objects(autopsy_cur) # Artifacts - self.gold_artifacts = self._count_artifacts(gold_cur) - self.autopsy_artifacts = self._count_artifacts(autopsy_cur) + self.gold_artifacts = self._get_artifacts(gold_cur) + self.autopsy_artifacts = self._get_artifacts(autopsy_cur) # Attributes self.gold_attributes = self._count_attributes(gold_cur) self.autopsy_attributes = self._count_attributes(autopsy_cur) except Exception as e: printerror(self.test_data, "Way out:" + str(e)) - def compare_basic_counts(self): - """ - Basic test between output and gold databases. Compares only counts of objects and blackboard items. + def run_diff(self): + """Basic test between output and gold databases. + + Compares only counts of objects and blackboard items. Note: SQLITE needs unix style pathing """ # Check to make sure both db files exist @@ -570,46 +599,54 @@ class TskDbDiff: printerror(self.test_data, "Error: Gold database file does not exist at:") printerror(self.test_data, self.gold_db_file + "\n") return - + # Get connections and cursors to output / gold databases autopsy_con = sqlite3.connect(self.autopsy_db_file) autopsy_cur = autopsy_con.cursor() gold_con = sqlite3.connect(self.gold_db_file) gold_cur = gold_con.cursor() - + # Get Counts of objects, artifacts, and attributes - self._get_basic_counts(autopsy_cur, gold_cur) - - # We're done with the databases, close up the connections + self._get_basic_counts(autopsy_cur, gold_cur) + + # We're done with the databases, close up the connections autopsy_con.close() gold_con.close() - + exceptions = [] - + # Compare counts exceptions.append(self._compare_tsk_objects()) exceptions.append(self._compare_bb_artifacts()) exceptions.append(self._compare_bb_attributes()) - + self.artifact_comparison = exceptions[1] self.attribute_comparison = exceptions[2] - + okay = "All counts match." print_report(self.test_data, exceptions[0], "COMPARE TSK OBJECTS", okay) print_report(self.test_data, exceptions[1], "COMPARE ARTIFACTS", okay) print_report(self.test_data, exceptions[2], "COMPARE ATTRIBUTES", okay) - - - # smart method that deals with blackboard comparison to avoid issues with different IDs based on when artifacts were created. - # Dumps sorted text results to output location stored in test_data. - # autopsy_db_file: Output database file + + return DiffResults(self) + def _dump_output_db_bb(autopsy_con, autopsy_db_file, test_data): + """Dumps sorted text results to the output location stored in test_data. + + Smart method that deals with a blackboard comparison to avoid issues + with different IDs based on when artifacts were created. + + Args: + autopsy_con: a SQLConn to the autopsy database. + autopsy_db_file: a pathto_File, the output database. + test_data: the TestData that corresponds with this dump. + """ autopsy_cur2 = autopsy_con.cursor() global errorem global attachl global failedbool # Get the list of all artifacts - # @@@ Could add a SORT by parent_path in here since that is how we are going to later sort it. + # @@@ Could add a SORT by parent_path in here since that is how we are going to later sort it. autopsy_cur2.execute("SELECT tsk_files.parent_path, tsk_files.name, blackboard_artifact_types.display_name, blackboard_artifacts.artifact_id FROM blackboard_artifact_types INNER JOIN blackboard_artifacts ON blackboard_artifact_types.artifact_type_id = blackboard_artifacts.artifact_type_id INNER JOIN tsk_files ON tsk_files.obj_id = blackboard_artifacts.obj_id") database_log = codecs.open(test_data.autopsy_data_file, "wb", "utf_8") rw = autopsy_cur2.fetchone() @@ -623,7 +660,7 @@ class TskDbDiff: database_log.write(rw[0] + rw[1] + ' ') else: database_log.write(rw[1] + ' ') - + # Get attributes for this artifact autopsy_cur1 = autopsy_con.cursor() looptry = True @@ -644,7 +681,7 @@ class TskDbDiff: test_data.artifact_fail += 1 print(test_data.artifact_fail) database_log.write('Error Extracting Attributes'); - + # Print attributes if(looptry == True): src = attributes[0][0] @@ -684,24 +721,30 @@ class TskDbDiff: database_log.write('" />') database_log.write(' \n') rw = autopsy_cur2.fetchone() - + # Now sort the file - srtcmdlst = ["sort", test_data.autopsy_data_file, "-o", test_data.sorted_data_file] + srtcmdlst = ["sort", test_data.autopsy_data_file, "-o", test_data.get_sorted_data_path(DBType.OUTPUT)] subprocess.call(srtcmdlst) print(test_data.artifact_fail) if(test_data.artifact_fail > 0): errorem += test_data.image_name + ":There were " + str(test_data.artifact_count) + " artifacts and " + str(test_data.artifact_fail) + " threw an exception while loading.\n" except Exception as e: printerror(test_data, 'outer exception: ' + str(e)) - - # Dumps a database (minus the artifact and attributes) to a text file. + def _dump_output_db_nonbb(test_data): + """Dumps a database to a text file. + + Does not dump the artifact and attributes. + + Args: + test_data: the TestData that corresponds with this dump. + """ # Make a copy of the DB - autopsy_db_file = test_data.getDBPath(DBType.OUTPUT) - backup_db_file = test_data.getDBPath(DBType.BACKUP) + autopsy_db_file = test_data.get_db_path(DBType.OUTPUT) + backup_db_file = test_data.get_db_path(DBType.BACKUP) copy_file(autopsy_db_file, backup_db_file) autopsy_con = sqlite3.connect(backup_db_file) - + # Delete the blackboard tables autopsy_con.execute("DROP TABLE blackboard_artifacts") autopsy_con.execute("DROP TABLE blackboard_attributes") @@ -716,11 +759,15 @@ class TskDbDiff: printerror(test_data, "dump_output_db_nonbb: Inner dump Exception:" + str(e)) except Exception as e: printerror(test_data, "dump_output_db_nonbb: Outer dump Exception:" + str(e)) - - - # Dumps the given database to text files for later comparison + + def dump_output_db(test_data): - autopsy_db_file = test_data.getDBPath(DBType.OUTPUT) + """Dumps the given database to text files for later comparison. + + Args: + test_data: the TestData that corresponds to this dump. + """ + autopsy_db_file = test_data.get_db_path(DBType.OUTPUT) autopsy_con = sqlite3.connect(autopsy_db_file) autopsy_cur = autopsy_con.cursor() # Try to query the databases. Ignore any exceptions, the function will @@ -728,62 +775,116 @@ class TskDbDiff: TskDbDiff._dump_output_db_bb(autopsy_con, autopsy_db_file, test_data) TskDbDiff._dump_output_db_nonbb(test_data) autopsy_con.close() - - + +class DiffResults(object): + """Container for the results of the database diff tests. + + Stores artifact, object, and attribute counts and comparisons generated by + TskDbDiff. + + Attributes: + gold_attrs: a Nat, the number of gold attributes + output_attrs: a Nat, the number of output attributes + gold_objs: a Nat, the number of gold objects + output_objs: a Nat, the number of output objects + artifact_comp: a listof_String, describing the differences + attribute_comp: a listof_String, describing the differences + """ + def __init__(self, tsk_diff): + """Inits a DiffResults + + Args: + tsk_diff: a TskDBDiff + """ + self.gold_attrs = tsk_diff.gold_attributes + self.output_attrs = tsk_diff.autopsy_attributes + self.gold_objs = tsk_diff.gold_objects + self.output_objs = tsk_diff.autopsy_objects + self.artifact_comp = tsk_diff.artifact_comparison + self.attribute_comp = tsk_diff.attribute_comparison + self.gold_artifacts = len(tsk_diff.gold_artifacts) + self.output_artifacts = len(tsk_diff.autopsy_artifacts) + + def get_artifact_comparison(self): + if not self.artifact_comp: + return "All counts matched" + else: + global failedbool + global errorem + failedbool = True + global imgfail + imgfail = True + return "; ".join(self.artifact_comp) + + def get_attribute_comparison(self): + if not self.attribute_comp: + return "All counts matched" + global failedbool + global errorem + failedbool = True + global imgfail + imgfail = True + list = [] + for error in self.attribute_comp: + list.append(error) + return ";".join(list) #-------------------------------------------------# # Functions relating to comparing outputs # -#-------------------------------------------------# -class TestResultsDiffer: - - # Compares results for a single test. Autopsy has already been run. - # test_data: TestData object - # databaseDiff: TskDbDiff object created based on test_data - def run_diff(test_data, databaseDiff): - try: - gold_path = test_config.gold - # Tmp location to extract ZIP file into - img_gold = Emailer.make_path(test_config.gold, "tmp", test_data.image_name) +#-------------------------------------------------# +class TestResultsDiffer(object): + """Compares results for a single test. + """ - # Open gold archive file - img_archive = Emailer.make_path("..", "output", "gold", test_data.image_name+"-archive.zip") - extrctr = zipfile.ZipFile(img_archive, 'r', compression=zipfile.ZIP_DEFLATED) - extrctr.extractall(gold_path) + def run_diff(test_data): + """Compares results for a single test. + + Args: + test_data: the TestData to use. + databaseDiff: TskDbDiff object created based off test_data + """ + try: + # Extract gold archive file to output/gold/tmp/ + extrctr = zipfile.ZipFile(test_data.gold_archive, 'r', compression=zipfile.ZIP_DEFLATED) + extrctr.extractall(test_data.main_config.gold) extrctr.close time.sleep(2) - # Lists of tests to run + # Lists of tests to run TestResultsDiffer._compare_errors(test_data) - + # Compare database count to gold - databaseDiff.compare_basic_counts() - + test_data.db_diff_results = TskDbDiff(test_data).run_diff() + # Compare smart blackboard results - TestResultsDiffer._compare_text(test_data.sorted_data_file, "SortedData", test_data) + TestResultsDiffer._compare_text(test_data.get_sorted_data_path(DBType.OUTPUT), "SortedData", test_data) # Compare the rest of the database (non-BB) TestResultsDiffer._compare_text(test_data.test_dbdump, "DBDump", test_data) # Compare html output TestResultsDiffer._compare_to_gold_html(test_data) - + # Clean up tmp folder - del_dir(img_gold) - + del_dir(test_data.gold_data_dir) + except Exception as e: printerror(test_data, "Tests failed due to an error, try rebuilding or creating gold standards.\n") printerror(test_data, str(e) + "\n") print(traceback.format_exc()) - - - # @@@ _compare_text could be made more generic with how it forms the paths (i.e. not add ".txt" in the method) and probably merged with + + + # TODO: _compare_text could be made more generic with how it forms the paths (i.e. not add ".txt" in the method) and probably merged with # compare_errors since they both do basic comparison of text files - - # Compares two text files - # output_file: output text file - # gold_file: gold text file - # test_data: Test being performed + def _compare_text(output_file, gold_file, test_data): + """Compare two text files. + + Args: + output_file: a pathto_File, the output text file + gold_file: a pathto_File, the input text file + test_data: the TestData of the test being performed + """ gold_dir = Emailer.make_path(test_config.img_gold, test_data.image_name, test_data.image_name + gold_file + ".txt") if(not Emailer.file_exists(output_file)): return @@ -793,8 +894,8 @@ class TestResultsDiffer: srtd_dat = srtd_data.read() if (not(gold_dat == srtd_dat)): diff_dir = Emailer.make_local_path(test_config.output_dir, test_data.image_name, test_data.image_name+gold_file+"-Diff.txt") - diff_file = codecs.open(diff_dir, "wb", "utf_8") - dffcmdlst = ["diff", test_data.sorted_data_file, gold_dir] + diff_file = codecs.open(diff_dir, "wb", "utf_8") + dffcmdlst = ["diff", test_data.get_sorted_data_path(DBType.OUTPUT), gold_dir] subprocess.call(dffcmdlst, stdout = diff_file) global attachl global errorem @@ -806,9 +907,13 @@ class TestResultsDiffer: global imgfail imgfail = True - # Compare merged error log files def _compare_errors(test_data): - gold_dir = Emailer.make_path(test_config.img_gold, test_data.image_name, test_data.image_name + "SortedErrors.txt") + """Compare merged error log files. + + Args: + test_data: the TestData of the test being performed + """ + gold_dir = test_data.get_sorted_errors_path(DBType.GOLD) common_log = codecs.open(test_data.sorted_log, "r", "utf_8") gold_log = codecs.open(gold_dir, "r", "utf_8") gold_dat = gold_log.read() @@ -816,7 +921,7 @@ class TestResultsDiffer: patrn = re.compile("\d") if (not((re.sub(patrn, 'd', gold_dat)) == (re.sub(patrn, 'd', common_dat)))): diff_dir = Emailer.make_local_path(test_config.output_dir, test_data.image_name, test_data.image_name+"AutopsyErrors-Diff.txt") - diff_file = open(diff_dir, "w") + diff_file = open(diff_dir, "w") dffcmdlst = ["diff", test_data.sorted_log, gold_dir] subprocess.call(dffcmdlst, stdout = diff_file) global attachl @@ -829,18 +934,21 @@ class TestResultsDiffer: failedbool = True global imgfail imgfail = True - - # Compare the html report file made by - # the regression test against the gold standard html report + def _compare_to_gold_html(test_data): + """Compare the output and gold html reports. + + Args: + test_data: the TestData of the test being performed. + """ gold_html_file = Emailer.make_path(test_config.img_gold, test_data.image_name, "Report", "index.html") htmlfolder = "" - for fs in os.listdir(Emailer.make_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "Reports")): - if os.path.isdir(Emailer.make_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "Reports", fs)): + for fs in os.listdir(Emailer.make_path(test_config.output_dir, test_data.image_name, AUTOPSY_TEST_CASE, "Reports")): + if os.path.isdir(Emailer.make_path(test_config.output_dir, test_data.image_name, AUTOPSY_TEST_CASE, "Reports", fs)): htmlfolder = fs - autopsy_html_path = Emailer.make_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "Reports", htmlfolder, "HTML Report") - - + autopsy_html_path = Emailer.make_path(test_config.output_dir, test_data.image_name, AUTOPSY_TEST_CASE, "Reports", htmlfolder, "HTML Report") + + try: autopsy_html_file = get_file_in_dir(autopsy_html_path, "index.html") if not Emailer.file_exists(gold_html_file): @@ -853,9 +961,9 @@ class TestResultsDiffer: return #Find all gold .html files belonging to this test_config ListGoldHTML = [] - for fs in os.listdir(Emailer.make_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "Reports", htmlfolder)): + for fs in os.listdir(Emailer.make_path(test_config.output_dir, test_data.image_name, AUTOPSY_TEST_CASE, "Reports", htmlfolder)): if(fs.endswith(".html")): - ListGoldHTML.append(Emailer.make_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "Reports", htmlfolder, fs)) + ListGoldHTML.append(Emailer.make_path(test_config.output_dir, test_data.image_name, AUTOPSY_TEST_CASE, "Reports", htmlfolder, fs)) #Find all new .html files belonging to this test_config ListNewHTML = [] if(os.path.exists(Emailer.make_path(test_config.img_gold, test_data.image_name))): @@ -868,7 +976,7 @@ class TestResultsDiffer: else: ListGoldHTML.sort() ListNewHTML.sort() - + total = {"Gold": 0, "New": 0} for x in range(0, len(ListGoldHTML)): count = TestResultsDiffer._compare_report_files(ListGoldHTML[x], ListNewHTML[x]) @@ -889,46 +997,103 @@ class TestResultsDiffer: printerror(test_data, "Error: Unknown fatal error comparing reports.") printerror(test_data, str(e) + "\n") logging.critical(traceback.format_exc()) - - # Compares file a to file b and any differences are returned - # Only works with report html files, as it searches for the first
    + def _compare_report_files(a_path, b_path): + """Compares the two specified report html files. + + Args: + a_path: a pathto_File, the first html report file + b_path: a pathto_File, the second html report file + + Returns: + a tuple of (Nat, Nat), which represent the length of each + unordered list in the html report files, or (0, 0) if the + lenghts are the same. + """ a_file = open(a_path) b_file = open(b_path) a = a_file.read() b = b_file.read() a = a[a.find("
      "):] b = b[b.find("
        "):] - + a_list = TestResultsDiffer._split(a, 50) b_list = TestResultsDiffer._split(b, 50) if not len(a_list) == len(b_list): ex = (len(a_list), len(b_list)) return ex - else: + else: return (0, 0) - + # Split a string into an array of string of the given size def _split(input, size): return [input[start:start+size] for start in range(0, len(input), size)] - -class TestData: - """ + +class TestData(object): + """Container for the input and output of a single image. + Represents data for the test of a single image, including path to the image, database paths, etc. + + Attributes: + main_config: the global TestConfiguration + image_file: a pathto_Image, the image for this TestData + image: a String, the image file's name + image_name: a String, the image file's name with a trailing (0) + output_path: pathto_Dir, the output directory for this TestData + autopsy_data_file: a pathto_File, the IMAGE_NAMEAutopsy_data.txt file + warning_log: a pathto_File, the AutopsyLogs.txt file + antlog_dir: a pathto_File, the antlog.txt file + test_dbdump: a pathto_File, the database dump, IMAGENAMEDump.txt + common_log_path: a pathto_File, the IMAGE_NAMECOMMON_LOG file + sorted_log: a pathto_File, the IMAGENAMESortedErrors.txt file + reports_dir: a pathto_Dir, the AutopsyTestCase/Reports folder + gold_data_dir: a pathto_Dir, the gold standard directory + gold_archive: a pathto_File, the gold standard archive + logs_dir: a pathto_Dir, the location where autopsy logs are stored + solr_index: a pathto_Dir, the locatino of the solr index + db_diff_results: a DiffResults, the results of the database comparison + total_test_time: a String representation of the test duration + start_date: a String representation of this TestData's start date + end_date: a String representation of the TestData's end date + total_ingest_time: a String representation of the total ingest time + artifact_count: a Nat, the number of artifacts + artifact_fail: a Nat, the number of artifact failures + heap_space: a String representation of TODO + service_times: a String representation of TODO + report_passed: a boolean, indicating if the reports passed + printerror: a listof_String, the error messages printed during this TestData's test + printout: a listof_String, the messages pritned during this TestData's test """ - def __init__(self, main_config): - self.main_config = main_config - self.image = "" - self.image_file = "" - self.image_name = "" - self.sorted_log = "" - self.warning_log = "" - self.autopsy_data_file = "" - self.sorted_data_file = "" - self.common_log_path = "" - self.antlog_dir = "" - self.test_dbdump = "" + + def __init__(self, image, main_config): + """Init this TestData with it's image and the test configuration. + + Args: + image: the Image to be tested. + main_config: the global TestConfiguration. + """ + self.main_config = main_config + self.image_file = str(image) + # TODO: This 0 should be be refactored out, but it will require rebuilding and changing of outputs. + self.image = get_image_name(self.image_file) + self.image_name = self.image + "(0)" + self.output_path = Emailer.make_path(self.main_config.output_dir, self.image_name) + self.autopsy_data_file = Emailer.make_path(self.output_path, self.image_name + "Autopsy_data.txt") + self.warning_log = Emailer.make_local_path(self.output_path, "AutopsyLogs.txt") + self.antlog_dir = Emailer.make_local_path(self.output_path, "antlog.txt") + self.test_dbdump = Emailer.make_path(self.output_path, self.image_name + + "DBDump.txt") + self.common_log_path = Emailer.make_local_path(self.output_path, self.image_name + COMMON_LOG) + self.sorted_log = Emailer.make_local_path(self.output_path, self.image_name + "SortedErrors.txt") + self.reports_dir = Emailer.make_path(self.output_path, AUTOPSY_TEST_CASE, "Reports") + self.gold_data_dir = Emailer.make_path(self.main_config.img_gold, self.image_name) + self.gold_archive = Emailer.make_path(self.main_config.gold, + self.image_name + "-archive.zip") + self.logs_dir = Emailer.make_path(self.output_path, "logs") + self.solr_index = Emailer.make_path(self.output_path, AUTOPSY_TEST_CASE, + "ModuleOutput", "KeywordSearch") + self.db_diff_results = None self.total_test_time = "" self.start_date = "" self.end_date = "" @@ -941,7 +1106,7 @@ class TestData: # Error tracking self.printerror = [] self.printout = [] - + def reset(self): self.image = "" self.image_file = "" @@ -949,7 +1114,6 @@ class TestData: self.sorted_log = "" self.warning_log = "" self.autopsy_data_file = "" - self.sorted_data_file = "" self.common_log_path = "" self.antlog_dir = "" self.test_dbdump = "" @@ -965,29 +1129,93 @@ class TestData: self.printerror = [] self.printout = [] - # DBType -> Path - def getDBPath(self, db_type): - """ - Return the path to the database file that corresponds to the given DBType for this TestData + def get_db_path(self, db_type): + """Get the path to the database file that corresponds to the given DBType. + + Args: + DBType: the DBType of the path to be generated. """ if(db_type == DBType.GOLD): - db_path = Emailer.make_path(self.main_config.img_gold, self.image_name, DB_FILENAME) + db_path = Emailer.make_path(self.gold_data_dir, DB_FILENAME) elif(db_type == DBType.OUTPUT): - db_path = Emailer.make_path(self.main_config.output_dir, self.image_name, self.main_config.Img_Test_Folder, DB_FILENAME) + db_path = Emailer.make_path(self.main_config.output_dir, self.image_name, AUTOPSY_TEST_CASE, DB_FILENAME) else: - db_path = Emailer.make_path(self.main_config.output_dir, self.image_name, self.main_config.Img_Test_Folder, BACKUP_DB_FILENAME) - return db_path + db_path = Emailer.make_path(self.main_config.output_dir, self.image_name, AUTOPSY_TEST_CASE, BACKUP_DB_FILENAME) + return db_path -class Reports: - def generate_reports(csv_path, database, test_data): - Reports._generate_html(database, test_data) - if test_config.global_csv: - Reports._generate_csv(test_config.global_csv, database, test_data) + def get_html_report_path(self, html_type): + """Get the path to the HTML Report folder that corresponds to the given DBType. + + Args: + DBType: the DBType of the path to be generated. + """ + if(html_type == DBType.GOLD): + return Emailer.make_path(self.gold_data_dir, "Report") else: - Reports._generate_csv(csv_path, database, test_data) - - # Generates the HTML log file - def _generate_html(database, test_data): + # Autopsy creates an HTML report folder in the form AutopsyTestCase DATE-TIME + # It's impossible to get the exact time the folder was created, but the folder + # we are looking for is the only one in the self.reports_dir folder + html_path = "" + for fs in os.listdir(self.reports_dir): + html_path = Emailer.make_path(self.reports_dir, fs) + if os.path.isdir(html_path): + break + return Emailer.make_path(html_path, "HTML Report") + + def get_sorted_data_path(self, file_type): + """Get the path to the SortedData file that corresponds to the given DBType. + + Args: + file_type: the DBType of the path to be generated + """ + return self._get_path_to_file(file_type, "SortedData.txt") + + def get_sorted_errors_path(self, file_type): + """Get the path to the SortedErrors file that correspodns to the given + DBType. + + Args: + file_type: the DBType of the path to be generated + """ + return self._get_path_to_file(file_type, "SortedErrors.txt") + + def get_db_dump_path(self, file_type): + """Get the path to the DBDump file that corresponds to the given DBType. + + Args: + file_type: the DBType of the path to be generated + """ + return self._get_path_to_file(file_type, "DBDump.txt") + + def _get_path_to_file(self, file_type, file_name): + """Get the path to the specified file with the specified type. + + Args: + file_type: the DBType of the path to be generated + file_name: a String, the filename of the path to be generated + """ + full_filename = self.image_name + file_name + if(file_type == DBType.GOLD): + return Emailer.make_path(self.gold_data_dir, full_filename) + else: + return Emailer.make_path(self.output_path, full_filename) + +class Reports(object): + def generate_reports(csv_path, test_data): + """Generate the reports for a single test + + Args: + csv_path: a pathto_File, the csv file + test_data: the TestData + """ + Reports._generate_html(test_data) + if test_config.global_csv: + Reports._generate_csv(test_config.global_csv, test_data) + else: + Reports._generate_csv(csv_path, test_data) + + def _generate_html(test_data): + """Generate the HTML log file.""" # If the file doesn't exist yet, this is the first test_config to run for # this test, so we need to make the start of the html log global imgfail @@ -1021,16 +1249,16 @@ class Reports: if "\n" in error: errors += "
        " errors += "" - + # Links to the logs logs = "
        \

        Logs

        \
        " - logs_path = Emailer.make_local_path(test_config.output_dir, test_data.image_name, "logs") + logs_path = test_data.logs_dir for file in os.listdir(logs_path): logs += "

        " + file + "

        " logs += "
        " - + # All the testing information info = "
        \

        Information

        \ @@ -1079,11 +1307,11 @@ class Reports:

        (will skew other test results)

        " info += "" + str(len(search_log_set("autopsy", "Stopping ingest due to low disk space on disk", test_data))) + "" info += "TSK Objects Count:" - info += "" + str(database.autopsy_objects) + "" + info += "" + str(test_data.db_diff_results.output_objs) + "" info += "Artifacts Count:" - info += "" + str(database.get_artifacts_count()) + "" + info += "" + str(test_data.db_diff_results.output_artifacts)+ "" info += "Attributes Count:" - info += "" + str(database.autopsy_attributes) + "" + info += "" + str(test_data.db_diff_results.output_attrs) + "" info += "\
        " # For all the general print statements in the test_config @@ -1097,7 +1325,7 @@ class Reports: if "\n" in out: output += "
        " output += "" - + html.write(title) html.write(errors) html.write(info) @@ -1110,8 +1338,8 @@ class Reports: printerror(test_data, str(e) + "\n") logging.critical(traceback.format_exc()) - # Writed the top of the HTML log file def write_html_head(): + """Write the top of the HTML log file.""" print(test_config.html_log) html = open(str(test_config.html_log), "a") head = "\ @@ -1137,15 +1365,19 @@ class Reports: html.write(head) html.close() - # Writed the bottom of the HTML log file def write_html_foot(): + """Write the bottom of the HTML log file.""" html = open(test_config.html_log, "a") head = "" html.write(head) html.close() - # Adds all the image names to the HTML log for easy access def html_add_images(full_image_names): + """Add all the image names to the HTML log. + + Args: + full_image_names: a listof_String, each representing an image name + """ # If the file doesn't exist yet, this is the first test_config to run for # this test, so we need to make the start of the html log if not Emailer.file_exists(test_config.html_log): @@ -1153,12 +1385,13 @@ class Reports: html = open(test_config.html_log, "a") links = [] for full_name in full_image_names: - name = test_config.get_image_name(full_name) + name = get_image_name(full_name) links.append("" + name + "") html.write("

        " + (" | ".join(links)) + "

        ") + html.close() - # Generate the CSV log file - def _generate_csv(csv_path, database, test_data): + def _generate_csv(csv_path, test_data): + """Generate the CSV log file""" try: # If the CSV file hasn't already been generated, this is the # first run, and we need to add the column names @@ -1166,7 +1399,7 @@ class Reports: Reports.csv_header(csv_path) # Now add on the fields to a new row csv = open(csv_path, "a") - + # Variables that need to be written vars = [] vars.append( test_data.image_file ) @@ -1190,12 +1423,12 @@ class Reports: vars.append( str(test_config.indexed_files) ) vars.append( str(test_config.indexed_chunks) ) vars.append( str(len(search_log_set("autopsy", "Stopping ingest due to low disk space on disk", test_data))) ) - vars.append( str(database.autopsy_objects) ) - vars.append( str(database.get_artifacts_count()) ) - vars.append( str(database.autopsy_attributes) ) - vars.append( Emailer.make_local_path("gold", test_data.image_name, test_config.test_db_file) ) - vars.append( database.get_artifact_comparison() ) - vars.append( database.get_attribute_comparison() ) + vars.append( str(test_data.db_diff_results.output_objs) ) + vars.append( str(test_data.db_diff_results.output_artifacts) ) + vars.append( str(test_data.db_diff_results.output_objs) ) + vars.append( Emailer.make_local_path("gold", test_data.image_name, DB_FILENAME) ) + vars.append( test_data.db_diff_results.get_artifact_comparison() ) + vars.append( test_data.db_diff_results.get_attribute_comparison() ) vars.append( Emailer.make_local_path("gold", test_data.image_name, "standard.html") ) vars.append( str(test_data.report_passed) ) vars.append( test_config.ant_to_string() ) @@ -1212,8 +1445,8 @@ class Reports: print(traceback.format_exc()) logging.critical(traceback.format_exc()) - # Generates the CSV header (column names) def csv_header(csv_path): + """Generate the CSV column names.""" csv = open(csv_path, "w") titles = [] titles.append("Image Path") @@ -1251,13 +1484,17 @@ class Reports: csv.write(output) csv.close() - # Returns the number of OutOfMemoryErrors and OutOfMemoryExceptions - # for a certain type of log def _get_num_memory_errors(type, test_data): - return (len(search_log_set(type, "OutOfMemoryError", test_data)) + + """Get the number of OutOfMemory errors and Exceptions. + + Args: + type: a String representing the type of log to check. + test_data: the TestData to examine. + """ + return (len(search_log_set(type, "OutOfMemoryError", test_data)) + len(search_log_set(type, "OutOfMemoryException", test_data))) -class Logs: +class Logs(object): def generate_log_data(test_data): Logs._generate_common_log(test_data) try: @@ -1275,7 +1512,7 @@ class Logs: # from each log file generated by Autopsy def _generate_common_log(test_data): try: - logs_path = Emailer.make_local_path(test_config.output_dir, test_data.image_name, "logs") + logs_path = test_data.logs_dir common_log = codecs.open(test_data.common_log_path, "w", "utf_8") warning_log = codecs.open(test_data.warning_log, "w", "utf_8") common_log.write("--------------------------------------------------\n") @@ -1307,17 +1544,17 @@ class Logs: printerror(test_data, traceback.format_exc()) logging.critical(traceback.format_exc()) - # Fill in the global test_config's variables that require the log files def _fill_test_config_data(test_data): + """Fill the global test config's variables that require the log files.""" try: # Open autopsy.log.0 - log_path = Emailer.make_path(test_config.output_dir, test_data.image_name, "logs", "autopsy.log.0") + log_path = Emailer.make_path(test_data.logs_dir, "autopsy.log.0") log = open(log_path) - + # Set the test_config starting time based off the first line of autopsy.log.0 # *** If logging time format ever changes this will break *** test_data.start_date = log.readline().split(" org.")[0] - + # Set the test_config ending time based off the "create" time (when the file was copied) test_data.end_date = time.ctime(os.path.getmtime(log_path)) except Exception as e: @@ -1334,21 +1571,21 @@ class Logs: try: # Set Autopsy version, heap space, ingest time, and service times - + version_line = search_logs("INFO: Application name: Autopsy, version:", test_data)[0] test_config.autopsy_version = Emailer.get_word_at(version_line, 5).rstrip(",") - + test_data.heap_space = search_logs("Heap memory usage:", test_data)[0].rstrip().split(": ")[1] - + ingest_line = search_logs("Ingest (including enqueue)", test_data)[0] test_data.total_ingest_time = Emailer.get_word_at(ingest_line, 6).rstrip() - + message_line = search_log_set("autopsy", "Ingest messages count:", test_data)[0] test_config.ingest_messages = int(message_line.rstrip().split(": ")[2]) - + files_line = search_log_set("autopsy", "Indexed files count:", test_data)[0] test_config.indexed_files = int(files_line.rstrip().split(": ")[2]) - + chunks_line = search_log_set("autopsy", "Indexed file chunks count:", test_data)[0] test_config.indexed_chunks = int(chunks_line.rstrip().split(": ")[2]) except Exception as e: @@ -1375,17 +1612,30 @@ class Logs: printerror(test_data, "Error: Unknown fatal error when finding service times.") printerror(test_data, str(e) + "\n") logging.critical(traceback.format_exc()) - - # Returns all the errors found in the common log in a list + def _report_all_errors(): + """Generate a list of all the errors found in the common log. + + Returns: + a listof_String, the errors found in the common log + """ try: return get_warnings() + get_exceptions() except Exception as e: printerror(test_data, "Error: Unknown fatal error when reporting all errors.") printerror(test_data, str(e) + "\n") logging.warning(traceback.format_exc()) - # Searches the common log for any instances of a specific string. + def search_common_log(string, test_data): + """Search the common log for any instances of a given string. + + Args: + string: the String to search for. + test_data: the TestData that holds the log to search. + + Returns: + a listof_String, all the lines that the string is found on + """ results = [] log = codecs.open(test_data.common_log_path, "r", "utf_8") for line in log: @@ -1411,11 +1661,18 @@ def image_type(image_file): return IMGTYPE.SPLIT else: return IMGTYPE.UNKNOWN - -# Search through all the known log files for a specific string. -# Returns a list of all lines with that string + def search_logs(string, test_data): - logs_path = Emailer.make_local_path(test_config.output_dir, test_data.image_name, "logs") + """Search through all the known log files for a given string. + + Args: + string: the String to search for. + test_data: the TestData that holds the logs to search. + + Returns: + a listof_String, the lines that contained the given String. + """ + logs_path = test_data.logs_dir results = [] for file in os.listdir(logs_path): log = codecs.open(Emailer.make_path(logs_path, file), "r", "utf_8") @@ -1424,11 +1681,19 @@ def search_logs(string, test_data): results.append(line) log.close() return results - -# Searches the given log for the given string -# Returns a list of all lines with that string + def search_log(log, string, test_data): - logs_path = Emailer.make_local_path(test_config.output_dir, test_data.image_name, "logs", log) + """Search the given log for any instances of a given string. + + Args: + log: a pathto_File, the log to search in + string: the String to search for. + test_data: the TestData that holds the log to search. + + Returns: + a listof_String, all the lines that the string is found on + """ + logs_path = Emailer.make_path(test_data.logs_dir, log) try: results = [] log = codecs.open(logs_path, "r", "utf_8") @@ -1444,7 +1709,17 @@ def search_log(log, string, test_data): # Search through all the the logs of the given type # Types include autopsy, tika, and solr def search_log_set(type, string, test_data): - logs_path = Emailer.make_local_path(test_config.output_dir, test_data.image_name, "logs") + """Search through all logs to the given type for the given string. + + Args: + type: the type of log to search in. + string: the String to search for. + test_data: the TestData containing the logs to search. + + Returns: + a listof_String, the lines on which the String was found. + """ + logs_path = test_data.logs_dir results = [] for file in os.listdir(logs_path): if type in file: @@ -1454,10 +1729,16 @@ def search_log_set(type, string, test_data): results.append(line) log.close() return results - -# Print a report for the given errors with the report name as name -# and if no errors are found, print the okay message + def print_report(test_data, errors, name, okay): + """Print a report with the specified information. + + Args: + test_data: the TestData to report about. + errors: a listof_String, the errors to report. + name: a String, the name of the report. + okay: the String to print when there are no errors. + """ if errors: printerror(test_data, "--------< " + name + " >----------") for error in errors: @@ -1481,10 +1762,33 @@ def printout(test_data, string): #----------------------------------# # Helper functions # #----------------------------------# -# Returns a list of all the exceptions listed in all the autopsy logs + +def get_image_name(image_file): + path_end = image_file.rfind("/") + path_end2 = image_file.rfind("\\") + ext_start = image_file.rfind(".") + if(ext_start == -1): + name = image_file + if(path_end2 != -1): + name = image_file[path_end2+1:ext_start] + elif(ext_start == -1): + name = image_file[path_end+1:] + elif(path_end == -1): + name = image_file[:ext_start] + elif(path_end!=-1 and ext_start!=-1): + name = image_file[path_end+1:ext_start] + else: + name = image_file[path_end2+1:ext_start] + return name + def get_exceptions(test_data): + """Get a list of the exceptions in the autopsy logs. + + Returns: + a listof_String, the exceptions found in the logs. + """ exceptions = [] - logs_path = Emailer.make_path(test_config.output_dir, test_data.image_name, "logs") + logs_path = test_data.logs_dir results = [] for file in os.listdir(logs_path): if "autopsy.log" in file: @@ -1496,9 +1800,13 @@ def get_exceptions(test_data): exceptions.append(line) log.close() return exceptions - -# Returns a list of all the warnings listed in the common log + def get_warnings(test_data): + """Get a list of the warnings listed in the common log. + + Returns: + listof_String, the warnings found. + """ warnings = [] common_log = codecs.open(test_data.warning_log, "r", "utf_8") for line in common_log: @@ -1510,13 +1818,14 @@ def get_warnings(test_data): def copy_logs(test_data): try: log_dir = os.path.join("..", "..", "Testing","build","test","qa-functional","work","userdir0","var","log") - shutil.copytree(log_dir, Emailer.make_local_path(test_config.output_dir, test_data.image_name, "logs")) + shutil.copytree(log_dir, test_data.logs_dir) except Exception as e: printerror(test_data,"Error: Failed to copy the logs.") printerror(test_data,str(e) + "\n") logging.warning(traceback.format_exc()) -# Clears all the files from a directory and remakes it + def clear_dir(dir): + """Clears all files from a directory and remakes it.""" try: if Emailer.dir_exists(dir): shutil.rmtree(dir) @@ -1537,24 +1846,26 @@ def del_dir(dir): printerror(test_data,"Error: Cannot delete the given directory:") printerror(test_data,dir + "\n") return False; -#Copies a given file from "ffrom" to "to" + def copy_file(ffrom, to): + """Copies a given file from "ffrom" to "to".""" try : shutil.copy(ffrom, to) except Exception as e: print(str(e)) print(traceback.format_exc()) -# Copies a directory file from "ffrom" to "to" def copy_dir(ffrom, to): + """Copies a directory file from "ffrom" to "to".""" try : if not os.path.isdir(ffrom): raise FileNotFoundException(ffrom) shutil.copytree(ffrom, to) except: raise FileNotFoundException(to) -# Returns the first file in the given directory with the given extension + def get_file_in_dir(dir, ext): + """Returns the first file in the given directory with the given extension.""" try: for file in os.listdir(dir): if file.endswith(ext): @@ -1563,9 +1874,9 @@ def get_file_in_dir(dir, ext): raise FileNotFoundException(dir) except: raise DirNotFoundException(dir) - + def find_file_in_dir(dir, name, ext): - try: + try: for file in os.listdir(dir): if file.startswith(name): if file.endswith(ext): @@ -1573,22 +1884,22 @@ def find_file_in_dir(dir, name, ext): raise FileNotFoundException(dir) except: raise DirNotFoundException(dir) - + def setDay(): global Day Day = int(strftime("%d", localtime())) - + def getLastDay(): return Day - + def getDay(): return int(strftime("%d", localtime())) - + def newDay(): return getLastDay() != getDay() -# Returns the args of the test script def usage(): + """Return the usage description of the test script.""" return """ Usage: ./regression.py [-f FILE] [OPTIONS] @@ -1597,12 +1908,12 @@ Usage: ./regression.py [-f FILE] [OPTIONS] When the -f flag is set, this script only tests a single given image. When the -l flag is set, the script looks for a configuration file, which may outsource to a new input directory and to individual images. - + Expected files: An NSRL database at: ../input/nsrl.txt-md5.idx A notable hash database at: ../input/notablehashes.txt-md5.idx A notable keyword file at: ../input/notablekeywords.xml - + Options: -r Rebuild the gold standards for the image(s) tested. -i Ignores the ../input directory and all files within it. @@ -1620,14 +1931,16 @@ Options: # versus unexpected and fatal exceptions # #------------------------------------------------------------# -# If a file cannot be found by one of the helper functions -# they will throw a FileNotFoundException unless the purpose -# is to return False class FileNotFoundException(Exception): + """ + If a file cannot be found by one of the helper functions, + they will throw a FileNotFoundException unless the purpose + is to return False. + """ def __init__(self, file): self.file = file self.strerror = "FileNotFoundException: " + file - + def print_error(self): printerror(test_data,"Error: File could not be found at:") printerror(test_data,self.file + "\n") @@ -1635,13 +1948,15 @@ class FileNotFoundException(Exception): error = "Error: File could not be found at:\n" + self.file + "\n" return error -# If a directory cannot be found by a helper function, -# it will throw this exception class DirNotFoundException(Exception): + """ + If a directory cannot be found by a helper function, + it will throw this exception + """ def __init__(self, dir): self.dir = dir self.strerror = "DirNotFoundException: " + dir - + def print_error(self): printerror(test_data, "Error: Directory could not be found at:") printerror(test_data, self.dir + "\n") @@ -1652,34 +1967,34 @@ class DirNotFoundException(Exception): ############################# # Main Testing Functions # ############################# -class TestRunner: +class TestRunner(object): def run_tests(): - """ + """Run the tests specified by the main TestConfiguration. + Executes the AutopsyIngest for each image and dispatches the results based on - the mode (rebuild or testing) + the mode (rebuild or testing) """ global parsed global errorem global failedbool global html global attachl - + test_data_list = TestRunner._generate_test_data() - + Reports.html_add_images(test_config.images) - + logres =[] - for test_data in test_data_list: + for test_data in test_data_list: TestRunner._run_autopsy_ingest(test_data) - + if test_config.args.rebuild: TestRunner.rebuild(test_data) else: logres.append(TestRunner._run_test(test_data)) - + Reports.write_html_foot() - html.close() if (len(logres)>0): failedbool = True imgfail = True @@ -1687,51 +2002,44 @@ class TestRunner: for lm in logres: for ln in lm: errorem += ln - html.close() if failedbool: passFail = False errorem += "The test output didn't match the gold standard.\n" errorem += "Autopsy test failed.\n" + html = open(test_config.html_log) attachl.insert(0, html.name) + html.close() else: errorem += "Autopsy test passed.\n" passFail = True attachl = [] - + # @@@ This fails here if we didn't parse an XML file try: Emailer.send_email(parsed, errorem, attachl, passFail) except NameError: printerror(test_data, "Could not send e-mail because of no XML file --maybe"); - - # void -> listof_TestData - def _generate_test_data(): - """ - Create a list of TestData objects, one for each image in the TestConfiguration object. - """ - test_data_list = [] - for img in test_config.images: - test_data = TestData(test_config) - test_data.image_file = str(img) - # TODO: This 0 should be be refactored out, but it will require rebuilding and changing of outputs. - test_data.image = test_config.get_image_name(test_data.image_file) - test_data.image_name = test_data.image + "(0)" - output_path = Emailer.make_path(test_config.output_dir, test_data.image_name) - test_data.autopsy_data_file = Emailer.make_path(output_path, test_data.image_name + "Autopsy_data.txt") - test_data.sorted_data_file = Emailer.make_path(output_path, "Sorted_Autopsy_data.txt") - test_data.warning_log = Emailer.make_local_path(output_path, "AutopsyLogs.txt") - test_data.antlog_dir = Emailer.make_local_path(output_path, "antlog.txt") - test_data.test_dbdump = Emailer.make_path(output_path, test_data.image_name + "Dump.txt") - test_data.common_log_path = Emailer.make_local_path(output_path, test_data.image_name + test_config.common_log) - test_data.sorted_log = Emailer.make_local_path(output_path, test_data.image_name + "SortedErrors.txt") - test_data_list.append(test_data) - return test_data_list - # TestData -> void - def _run_autopsy_ingest(test_data): + def _generate_test_data(): + """Create TestData objects for each image in the TestConfiguration object. + + Returns: + listof_TestData, the TestData object generated. """ - Run Autopsy ingest for the image in the given test data and - generate the necessary logs for rebuilding or diff. + return [ TestData(image, test_config) for image in test_config.images ] + #test_data_list = [] + #for img in test_config.images: + # test_data = TestData(test_config) + # test_data_list.append(test_data) + #return test_data_list + + def _run_autopsy_ingest(test_data): + """Run Autopsy ingest for the image in the given TestData. + + Also generates the necessary logs for rebuilding or diff. + + Args: + test_data: the TestData to run the ingest on. """ global parsed global imgfail @@ -1741,7 +2049,7 @@ class TestRunner: printerror(test_data, "Error: Image type is unrecognized:") printerror(test_data, test_data.image_file + "\n") return - + logging.debug("--------------------") logging.debug(test_data.image_name) logging.debug("--------------------") @@ -1757,84 +2065,78 @@ class TestRunner: TestRunner._handle_solr(test_data) TestRunner._handle_exception(test_data) - + #TODO: figure out return type of _run_test (logres) - # TestData -> ??? def _run_test(test_data): - """ - Run Autopsy ingest for the image in test_data and run necessary comparisons + """Compare the results of the output to the gold standard. + + Args: + test_data: the TestData + + Returns: + logres? """ # Look for core exceptions # @@@ Should be moved to TestResultsDiffer, but it didn't know about logres -- need to look into that logres = Logs.search_common_log("TskCoreException", test_data) - # @@@ We only need to create this here so that it can be passed into the - # Report because it stores results. Results should be stored somewhere else - # and then this can get pushed into only the diffing code. - databaseDiff = TskDbDiff(test_data) + TestResultsDiffer.run_diff(test_data) - # Now either diff or rebuild - if not test_config.args.rebuild: - TestResultsDiffer.run_diff(test_data, databaseDiff) - # If running in rebuild mode (-r) - else: - TestRunner.rebuild(test_data) - - # @@@ COnsider if we want to do this for a rebuild. + # @@@ COnsider if we want to do this for a rebuild. # Make the CSV log and the html log viewer - Reports.generate_reports(test_config.csv, databaseDiff, test_data) + Reports.generate_reports(test_config.csv, test_data) # Reset the test_config and return the tests sucessfully finished - clear_dir(Emailer.make_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "ModuleOutput", "keywordsearch")) if(failedbool): attachl.append(test_data.common_log_path) return logres - - # TestData -> void + def _handle_solr(test_data): - """ - Clean up SOLR index if in keep mode (-k) + """Clean up SOLR index if in keep mode (-k). + + Args: + test_data: the TestData """ if not test_config.args.keep: - solr_index = Emailer.make_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "ModuleOutput", "KeywordSearch") - if clear_dir(solr_index): + if clear_dir(test_data.solr_index): print_report(test_data, [], "DELETE SOLR INDEX", "Solr index deleted.") else: print_report(test_data, [], "KEEP SOLR INDEX", "Solr index has been kept.") - # TestData -> void def _handle_exception(test_data): - """ - If running in exception mode, print exceptions to log + """If running in exception mode, print exceptions to log. + + Args: + test_data: the TestData """ if test_config.args.exception: exceptions = search_logs(test_config.args.exception_string, test_data) okay = "No warnings or exceptions found containing text '" + test_config.args.exception_string + "'." print_report(test_data, exceptions, "EXCEPTION", okay) - - # Rebuilds the gold standards by copying the test-generated database - # and html report files into the gold directory. Autopsy has already been run + def rebuild(test_data): + """Rebuild the gold standard with the given TestData. + + Copies the test-generated database and html report files into the gold directory. + """ # Errors to print errors = [] # Delete the current gold standards gold_dir = test_config.img_gold clear_dir(test_config.img_gold) tmpdir = Emailer.make_path(gold_dir, test_data.image_name) - dbinpth = Emailer.make_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, test_case.test_db_file) - dboutpth = Emailer.make_path(tmpdir, test_config.test_db_file) + dbinpth = test_data.get_db_path(DBType.OUTPUT) + dboutpth = Emailer.make_path(tmpdir, DB_FILENAME) dataoutpth = Emailer.make_path(tmpdir, test_data.image_name + "SortedData.txt") - dbdumpinpth = test_data.test_dbdump + dbdumpinpth = test_data.get_db_dump_path(DBType.OUTPUT) dbdumpoutpth = Emailer.make_path(tmpdir, test_data.image_name + "DBDump.txt") if not os.path.exists(test_config.img_gold): os.makedirs(test_config.img_gold) - if not os.path.exists(gold_dir): - os.makedirs(gold_dir) if not os.path.exists(tmpdir): os.makedirs(tmpdir) try: copy_file(dbinpth, dboutpth) - if Emailer.file_exists(test_data.sorted_data_file): - copy_file(test_data.sorted_data_file, dataoutpth) + if Emailer.file_exists(test_data.get_sorted_data_path(DBType.OUTPUT)): + copy_file(test_data.get_sorted_data_path(DBType.OUTPUT), dataoutpth) copy_file(dbdumpinpth, dbdumpoutpth) error_pth = Emailer.make_path(tmpdir, test_data.image_name+"SortedErrors.txt") copy_file(test_data.sorted_log, error_pth) @@ -1843,22 +2145,13 @@ class TestRunner: print(str(e)) print(traceback.format_exc()) # Rebuild the HTML report - htmlfolder = "" - for fs in os.listdir(os.path.join(os.getcwd(),test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "Reports")): - if os.path.isdir(os.path.join(os.getcwd(), test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "Reports", fs)): - htmlfolder = fs - autopsy_html_path = Emailer.make_local_path(test_config.output_dir, test_data.image_name, test_config.Img_Test_Folder, "Reports", htmlfolder) - - html_path = Emailer.make_path(test_config.output_dir, test_data.image_name, - test_config.Img_Test_Folder, "Reports") + output_html_report_dir = test_data.get_html_report_path(DBType.OUTPUT) + gold_html_report_dir = Emailer.make_path(tmpdir, "Report") + try: - if not os.path.exists(Emailer.make_path(tmpdir, htmlfolder)): - os.makedirs(Emailer.make_path(tmpdir, htmlfolder)) - for file in os.listdir(autopsy_html_path): - html_to = Emailer.make_path(tmpdir, file.replace("HTML Report", "Report")) - copy_dir(get_file_in_dir(autopsy_html_path, file), html_to) + copy_dir(output_html_report_dir, gold_html_report_dir) except FileNotFoundException as e: - errors.append(e.error) + errors.append(e.error()) except Exception as e: errors.append("Error: Unknown fatal error when rebuilding the gold html report.") errors.append(str(e) + "\n") @@ -1882,9 +2175,14 @@ class TestRunner: for file in files: zip.write(os.path.join(root, file)) - # Tests Autopsy with RegressionTest.java by by running - # the build.xml file through ant def _run_ant(test_data): + """Construct and run the ant build command for the given TestData. + + Tests Autopsy by calling RegressionTest.java via the ant build file. + + Args: + test_data: the TestData + """ # Set up the directories test_config_path = os.path.join(test_config.output_dir, test_data.image_name) if Emailer.dir_exists(test_config_path): @@ -1902,11 +2200,12 @@ class TestRunner: test_config.ant.append("-Dknown_bad_path=" + test_config.known_bad_path) test_config.ant.append("-Dkeyword_path=" + test_config.keyword_path) test_config.ant.append("-Dnsrl_path=" + test_config.nsrl_path) - test_config.ant.append("-Dgold_path=" + Emailer.make_path(test_config.gold)) - test_config.ant.append("-Dout_path=" + Emailer.make_local_path(test_config.output_dir, test_data.image_name)) + test_config.ant.append("-Dgold_path=" + test_config.gold) + test_config.ant.append("-Dout_path=" + + Emailer.make_local_path(test_data.output_path)) test_config.ant.append("-Dignore_unalloc=" + "%s" % test_config.args.unallocated) test_config.ant.append("-Dtest.timeout=" + str(test_config.timeout)) - + printout(test_data, "Ingesting Image:\n" + test_data.image_file + "\n") printout(test_data, "CMD: " + " ".join(test_config.ant)) printout(test_data, "Starting test...\n") @@ -1918,7 +2217,7 @@ class TestRunner: theproc = subprocess.Popen(test_config.ant, shell = True, stdout=subprocess.PIPE) theproc.communicate() antout.close() - + #----------------------# # Main # #----------------------# @@ -1962,10 +2261,9 @@ def main(): class OS: LINUX, MAC, WIN, CYGWIN = range(4) - + if __name__ == "__main__": global SYS - print(DBType.OUTPUT) if _platform == "linux" or _platform == "linux2": SYS = OS.LINUX elif _platform == "darwin": @@ -1974,7 +2272,7 @@ if __name__ == "__main__": SYS = OS.WIN elif _platform == "cygwin": SYS = OS.CYGWIN - + if SYS is OS.WIN or SYS is OS.CYGWIN: main() else: