diff --git a/test/script/Emailer.py b/test/script/Emailer.py
index d1accc0e6a..5d12e6afa3 100644
--- a/test/script/Emailer.py
+++ b/test/script/Emailer.py
@@ -1,124 +1,49 @@
-import smtplib
-from email.mime.image import MIMEImage
-from email.mime.multipart import MIMEMultipart
-from email.mime.text import MIMEText
-from email.mime.base import MIMEBase
-from email import encoders
-import xml
-from time import localtime, strftime
-from xml.dom.minidom import parse, parseString
-import subprocess
-import sys
-import os
-
-def send_email(parsed, errorem, attachl, passFail):
- element = parsed.getElementsByTagName("email")
- if(len(element)<=0):
- return
- element = element[0]
- toval = element.getAttribute("value").encode().decode("utf_8")
- if(toval==None):
- return
- element = parsed.getElementsByTagName("mail_server")[0]
- serverval = element.getAttribute("value").encode().decode("utf_8")
- # Create the container (outer) email message.
- msg = MIMEMultipart()
- element = parsed.getElementsByTagName("subject")[0]
- subval = element.getAttribute("value").encode().decode("utf_8")
- if(passFail):
- msg['Subject'] = '[Test]Autopsy ' + subval + ' test passed.'
- else:
- msg['Subject'] = '[Test]Autopsy ' + subval + ' test failed.'
- # me == the sender's email address
- # family = the list of all recipients' email addresses
- msg['From'] = 'AutopsyTest'
- msg['To'] = toval
- msg.preamble = 'This is a test'
- container = MIMEText(errorem, 'plain')
- msg.attach(container)
- Build_email(msg, attachl)
- s = smtplib.SMTP(serverval)
- try:
- print('Sending Email')
- s.sendmail(msg['From'], msg['To'], msg.as_string())
- except Exception as e:
- print(str(e))
- s.quit()
-
-def Build_email(msg, attachl):
- for file in attachl:
- part = MIMEBase('application', "octet-stream")
- atach = open(file, "rb")
- attch = atach.read()
- noml = file.split("\\")
- nom = noml[len(noml)-1]
- part.set_payload(attch)
- encoders.encode_base64(part)
- part.add_header('Content-Disposition', 'attachment; filename="' + nom + '"')
- msg.attach(part)
-
-# Returns a Windows style path starting with the cwd and
-# ending with the list of directories given
-def make_local_path(*dirs):
- path = wgetcwd().decode("utf-8")
- for dir in dirs:
- path += ("\\" + str(dir))
- return path_fix(path)
-
-# Returns a Windows style path based only off the given directories
-def make_path(*dirs):
- path = dirs[0]
- for dir in dirs[1:]:
- path += ("\\" + str(dir))
- return path_fix(path)
-
-# Fix a standard os.path by making it Windows format
-def path_fix(path):
- return path.replace("/", "\\")
-
-# Gets the true current working directory instead of Cygwin's
-def wgetcwd():
- proc = subprocess.Popen(("cygpath", "-m", os.getcwd()), stdout=subprocess.PIPE)
- out,err = proc.communicate()
- tst = out.rstrip()
- if os.getcwd == tst:
- return os.getcwd
- else:
- proc = subprocess.Popen(("cygpath", "-m", os.getcwd()), stdout=subprocess.PIPE)
- out,err = proc.communicate()
- return out.rstrip()
-# Verifies a file's existance
-def file_exists(file):
- try:
- if os.path.exists(file):
- return os.path.isfile(file)
- except:
- return False
-
-# Verifies a directory's existance
-def dir_exists(dir):
- try:
- return os.path.exists(dir)
- except:
- return False
-
-
-
-# Returns the nth word in the given string or "" if n is out of bounds
-# n starts at 0 for the first word
-def get_word_at(string, n):
- words = string.split(" ")
- if len(words) >= n:
- return words[n]
- else:
- return ""
-
-# Returns true if the given file is one of the required input files
-# for ingest testing
-def required_input_file(name):
- if ((name == "notablehashes.txt-md5.idx") or
- (name == "notablekeywords.xml") or
- (name == "nsrl.txt-md5.idx")):
- return True
- else:
- return False
\ No newline at end of file
+import smtplib
+from email.mime.image import MIMEImage
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email.mime.base import MIMEBase
+from email import encoders
+import xml
+from xml.dom.minidom import parse, parseString
+
+def send_email(to, server, subj, body, attachments):
+ """Send an email with the given information.
+
+ Args:
+ to: a String, the email address to send the email to
+ server: a String, the mail server to send from
+ subj: a String, the subject line of the message
+ body: a String, the body of the message
+        attachments: a listof_pathto_File, the attachments to include
+ """
+ msg = MIMEMultipart()
+ msg['Subject'] = subj
+ # me == the sender's email address
+ # family = the list of all recipients' email addresses
+ msg['From'] = 'AutopsyTest'
+ msg['To'] = to
+ msg.preamble = 'This is a test'
+ container = MIMEText(body, 'plain')
+ msg.attach(container)
+ Build_email(msg, attachments)
+ s = smtplib.SMTP(server)
+ try:
+ print('Sending Email')
+ s.sendmail(msg['From'], msg['To'], msg.as_string())
+ except Exception as e:
+ print(str(e))
+ s.quit()
+
+def Build_email(msg, attachments):
+ for file in attachments:
+ part = MIMEBase('application', "octet-stream")
+ atach = open(file, "rb")
+ attch = atach.read()
+ noml = file.split("\\")
+ nom = noml[len(noml)-1]
+ part.set_payload(attch)
+ encoders.encode_base64(part)
+ part.add_header('Content-Disposition', 'attachment; filename="' + nom + '"')
+ msg.attach(part)
+
diff --git a/test/script/regression.py b/test/script/regression.py
index ddaade4ec1..93b68457f6 100644
--- a/test/script/regression.py
+++ b/test/script/regression.py
@@ -42,6 +42,7 @@ import zipfile
import zlib
import Emailer
import srcupdater
+from regression_utils import *
#
# Please read me...
@@ -56,13 +57,6 @@ import srcupdater
# Variable, function, and class names are written in Python conventions:
# this_is_a_variable this_is_a_function() ThisIsAClass
#
-# All variables that are needed throughout the script have been initialized
-# in a global class.
-# - Command line arguments are in Args (named args)
-# - Global Test Configuration is in TestConfiguration(named test_config)
-# - Queried information from the databases is in TskDbDiff (named database)
-# Feel free to add additional global classes or add to the existing ones,
-# but do not overwrite any existing variables as they are used frequently.
#
@@ -76,14 +70,6 @@ import srcupdater
# Image: An image
#
-#####
-# Enumeration definition (python 3.2 doesn't have enumerations, this is a common solution
-# that allows you to access a named enum in a Java-like style, i.e. Numbers.ONE)
-#####
-def enum(*seq, **named):
- enums = dict(zip(seq, range(len(seq))), **named)
- return type('Enum', (), enums)
-
# Enumeration of database types used for the simplification of generating database paths
DBType = enum('OUTPUT', 'GOLD', 'BACKUP')
@@ -102,103 +88,459 @@ AUTOPSY_TEST_CASE = "AutopsyTestCase"
COMMON_LOG = "AutopsyErrors.txt"
Day = 0
-#-------------------------------------------------------------#
-# Parses argv and stores booleans to match command line input #
-#-------------------------------------------------------------#
-class Args(object):
- """A container for command line options and arguments.
+
+#----------------------#
+# Main #
+#----------------------#
+def main():
+ """Parse the command-line arguments, create the configuration, and run the tests."""
+ args = Args()
+ parse_result = args.parse()
+ test_config = TestConfiguration(args)
+ # The arguments were given wrong:
+ if not parse_result:
+ return
+ if(not args.fr):
+ antin = ["ant"]
+ antin.append("-f")
+ antin.append(os.path.join("..","..","build.xml"))
+ antin.append("test-download-imgs")
+ if SYS is OS.CYGWIN:
+ subprocess.call(antin)
+ elif SYS is OS.WIN:
+ theproc = subprocess.Popen(antin, shell = True, stdout=subprocess.PIPE)
+ theproc.communicate()
+ # Otherwise test away!
+ TestRunner.run_tests(test_config)
+
+
+class TestRunner(object):
+ """A collection of functions to run the regression tests."""
+
+ def run_tests(test_config):
+ """Run the tests specified by the main TestConfiguration.
+
+ Executes the AutopsyIngest for each image and dispatches the results based on
+ the mode (rebuild or testing)
+ """
+ test_data_list = [ TestData(image, test_config) for image in test_config.images ]
+
+ Reports.html_add_images(test_config.html_log, test_config.images)
+
+ logres =[]
+ for test_data in test_data_list:
+ Errors.clear_print_logs()
+ Errors.set_testing_phase(test_data.image)
+ if not (test_config.args.rebuild or os.path.exists(test_data.gold_archive)):
+ msg = "Gold standard doesn't exist, skipping image:"
+ Errors.print_error(msg)
+ Errors.print_error(test_data.gold_archive)
+ continue
+ TestRunner._run_autopsy_ingest(test_data)
+
+ if test_config.args.rebuild:
+ TestRunner.rebuild(test_data)
+ else:
+ logres.append(TestRunner._run_test(test_data))
+ test_data.printout = Errors.printout
+ test_data.printerror = Errors.printerror
+
+ Reports.write_html_foot(test_config.html_log)
+ # TODO: move this elsewhere
+ if (len(logres)>0):
+ for lm in logres:
+ for ln in lm:
+ Errors.add_email_msg(ln)
+
+ # TODO: possibly worth putting this in a sub method
+ if all([ test_data.overall_passed for test_data in test_data_list ]):
+ Errors.add_email_msg("All images passed.\n")
+ else:
+ msg = "The following images failed:\n"
+ for test_data in test_data_list:
+ if not test_data.overall_passed:
+ msg += "\t" + test_data.image + "\n"
+ Errors.add_email_msg(msg)
+ html = open(test_config.html_log)
+ Errors.add_email_attachment(html.name)
+ html.close()
+
+ if test_config.email_enabled:
+ Emailer.send_email(test_config.mail_to, test_config.mail_server,
+ test_config.mail_subject, Errors.email_body, Errors.email_attachs)
+
+ def _run_autopsy_ingest(test_data):
+ """Run Autopsy ingest for the image in the given TestData.
+
+ Also generates the necessary logs for rebuilding or diff.
+
+ Args:
+ test_data: the TestData to run the ingest on.
+ """
+ if image_type(test_data.image_file) == IMGTYPE.UNKNOWN:
+ Errors.print_error("Error: Image type is unrecognized:")
+ Errors.print_error(test_data.image_file + "\n")
+ return
+
+ logging.debug("--------------------")
+ logging.debug(test_data.image_name)
+ logging.debug("--------------------")
+ TestRunner._run_ant(test_data)
+ time.sleep(2) # Give everything a second to process
+
+ # Dump the database before we diff or use it for rebuild
+ TskDbDiff.dump_output_db(test_data)
+
+ # merges logs into a single log for later diff / rebuild
+ copy_logs(test_data)
+ Logs.generate_log_data(test_data)
+
+ TestRunner._handle_solr(test_data)
+ TestRunner._handle_exception(test_data)
+
+ #TODO: figure out return type of _run_test (logres)
+ def _run_test(test_data):
+ """Compare the results of the output to the gold standard.
+
+ Args:
+ test_data: the TestData
+
+ Returns:
+ logres?
+ """
+ TestRunner._extract_gold(test_data)
+
+ # Look for core exceptions
+ # @@@ Should be moved to TestResultsDiffer, but it didn't know about logres -- need to look into that
+ logres = Logs.search_common_log("TskCoreException", test_data)
+
+ TestResultsDiffer.run_diff(test_data)
+ test_data.overall_passed = (test_data.html_report_passed and
+ test_data.errors_diff_passed and test_data.sorted_data_passed and
+ test_data.db_dump_passed and test_data.db_diff_results.passed)
+
+ Reports.generate_reports(test_data)
+ if(not test_data.overall_passed):
+ Errors.add_email_attachment(test_data.common_log_path)
+ return logres
+
+ def _extract_gold(test_data):
+ """Extract gold archive file to output/gold/tmp/
+
+ Args:
+ test_data: the TestData
+ """
+ extrctr = zipfile.ZipFile(test_data.gold_archive, 'r', compression=zipfile.ZIP_DEFLATED)
+ extrctr.extractall(test_data.main_config.gold)
+ extrctr.close
+ time.sleep(2)
+
+ def _handle_solr(test_data):
+ """Clean up SOLR index if in keep mode (-k).
+
+ Args:
+ test_data: the TestData
+ """
+ if not test_data.main_config.args.keep:
+ if clear_dir(test_data.solr_index):
+ print_report([], "DELETE SOLR INDEX", "Solr index deleted.")
+ else:
+ print_report([], "KEEP SOLR INDEX", "Solr index has been kept.")
+
+ def _handle_exception(test_data):
+ """If running in exception mode, print exceptions to log.
+
+ Args:
+ test_data: the TestData
+ """
+ if test_data.main_config.args.exception:
+ exceptions = search_logs(test_data.main_config.args.exception_string, test_data)
+ okay = ("No warnings or exceptions found containing text '" +
+ test_data.main_config.args.exception_string + "'.")
+ print_report(exceptions, "EXCEPTION", okay)
+
+ def rebuild(test_data):
+ """Rebuild the gold standard with the given TestData.
+
+ Copies the test-generated database and html report files into the gold directory.
+ """
+ test_config = test_data.main_config
+ # Errors to print
+ errors = []
+ # Delete the current gold standards
+ gold_dir = test_config.img_gold
+ clear_dir(test_config.img_gold)
+ tmpdir = make_path(gold_dir, test_data.image_name)
+ dbinpth = test_data.get_db_path(DBType.OUTPUT)
+ dboutpth = make_path(tmpdir, DB_FILENAME)
+ dataoutpth = make_path(tmpdir, test_data.image_name + "SortedData.txt")
+ dbdumpinpth = test_data.get_db_dump_path(DBType.OUTPUT)
+ dbdumpoutpth = make_path(tmpdir, test_data.image_name + "DBDump.txt")
+ if not os.path.exists(test_config.img_gold):
+ os.makedirs(test_config.img_gold)
+ if not os.path.exists(tmpdir):
+ os.makedirs(tmpdir)
+ try:
+ copy_file(dbinpth, dboutpth)
+ if file_exists(test_data.get_sorted_data_path(DBType.OUTPUT)):
+ copy_file(test_data.get_sorted_data_path(DBType.OUTPUT), dataoutpth)
+ copy_file(dbdumpinpth, dbdumpoutpth)
+ error_pth = make_path(tmpdir, test_data.image_name+"SortedErrors.txt")
+ copy_file(test_data.sorted_log, error_pth)
+ except Exception as e:
+ Errors.print_error(str(e))
+ print(str(e))
+ print(traceback.format_exc())
+ # Rebuild the HTML report
+ output_html_report_dir = test_data.get_html_report_path(DBType.OUTPUT)
+ gold_html_report_dir = make_path(tmpdir, "Report")
+
+ try:
+ copy_dir(output_html_report_dir, gold_html_report_dir)
+ except FileNotFoundException as e:
+ errors.append(e.error())
+ except Exception as e:
+ errors.append("Error: Unknown fatal error when rebuilding the gold html report.")
+ errors.append(str(e) + "\n")
+ print(traceback.format_exc())
+ oldcwd = os.getcwd()
+ zpdir = gold_dir
+ os.chdir(zpdir)
+ os.chdir("..")
+ img_gold = "tmp"
+ img_archive = make_path(test_data.image_name+"-archive.zip")
+ comprssr = zipfile.ZipFile(img_archive, 'w',compression=zipfile.ZIP_DEFLATED)
+ TestRunner.zipdir(img_gold, comprssr)
+ comprssr.close()
+ os.chdir(oldcwd)
+ del_dir(test_config.img_gold)
+ okay = "Sucessfully rebuilt all gold standards."
+ print_report(errors, "REBUILDING", okay)
+
+ def zipdir(path, zip):
+ for root, dirs, files in os.walk(path):
+ for file in files:
+ zip.write(os.path.join(root, file))
+
+ def _run_ant(test_data):
+ """Construct and run the ant build command for the given TestData.
+
+ Tests Autopsy by calling RegressionTest.java via the ant build file.
+
+ Args:
+ test_data: the TestData
+ """
+ test_config = test_data.main_config
+ # Set up the directories
+ if dir_exists(test_data.output_path):
+ shutil.rmtree(test_data.output_path)
+ os.makedirs(test_data.output_path)
+ test_data.ant = ["ant"]
+ test_data.ant.append("-v")
+ test_data.ant.append("-f")
+ # case.ant.append(case.build_path)
+ test_data.ant.append(os.path.join("..","..","Testing","build.xml"))
+ test_data.ant.append("regression-test")
+ test_data.ant.append("-l")
+ test_data.ant.append(test_data.antlog_dir)
+ test_data.ant.append("-Dimg_path=" + test_data.image_file)
+ test_data.ant.append("-Dknown_bad_path=" + test_config.known_bad_path)
+ test_data.ant.append("-Dkeyword_path=" + test_config.keyword_path)
+ test_data.ant.append("-Dnsrl_path=" + test_config.nsrl_path)
+ test_data.ant.append("-Dgold_path=" + test_config.gold)
+ test_data.ant.append("-Dout_path=" +
+ make_local_path(test_data.output_path))
+ test_data.ant.append("-Dignore_unalloc=" + "%s" % test_config.args.unallocated)
+ test_data.ant.append("-Dtest.timeout=" + str(test_config.timeout))
+
+ Errors.print_out("Ingesting Image:\n" + test_data.image_file + "\n")
+ Errors.print_out("CMD: " + " ".join(test_data.ant))
+ Errors.print_out("Starting test...\n")
+ antoutpth = make_local_path(test_data.main_config.output_dir, "antRunOutput.txt")
+ antout = open(antoutpth, "a")
+ if SYS is OS.CYGWIN:
+ subprocess.call(test_data.ant, stdout=subprocess.PIPE)
+ elif SYS is OS.WIN:
+ theproc = subprocess.Popen(test_data.ant, shell = True, stdout=subprocess.PIPE)
+ theproc.communicate()
+ antout.close()
+
+
+class TestData(object):
+ """Container for the input and output of a single image.
+
+ Represents data for the test of a single image, including path to the image,
+ database paths, etc.
Attributes:
- single: a boolean indicating whether to run in single file mode
- single_file: an Image to run the test on
- rebuild: a boolean indicating whether to run in rebuild mode
- list: a boolean indicating a config file was specified
- unallocated: a boolean indicating unallocated space should be ignored
- ignore: a boolean indicating the input directory should be ingnored
- keep: a boolean indicating whether to keep the SOLR index
- verbose: a boolean indicating whether verbose output should be printed
- exeception: a boolean indicating whether errors containing exception
- exception_string should be printed
- exception_sring: a String representing and exception name
- fr: a boolean indicating whether gold standard images will be downloaded
+ main_config: the global TestConfiguration
+ ant: a listof_String, the ant command for this TestData
+ image_file: a pathto_Image, the image for this TestData
+ image: a String, the image file's name
+ image_name: a String, the image file's name with a trailing (0)
+ output_path: pathto_Dir, the output directory for this TestData
+ autopsy_data_file: a pathto_File, the IMAGE_NAMEAutopsy_data.txt file
+ warning_log: a pathto_File, the AutopsyLogs.txt file
+ antlog_dir: a pathto_File, the antlog.txt file
+ test_dbdump: a pathto_File, the database dump, IMAGENAMEDump.txt
+ common_log_path: a pathto_File, the IMAGE_NAMECOMMON_LOG file
+ sorted_log: a pathto_File, the IMAGENAMESortedErrors.txt file
+ reports_dir: a pathto_Dir, the AutopsyTestCase/Reports folder
+ gold_data_dir: a pathto_Dir, the gold standard directory
+ gold_archive: a pathto_File, the gold standard archive
+ logs_dir: a pathto_Dir, the location where autopsy logs are stored
+        solr_index: a pathto_Dir, the location of the solr index
+ db_diff_results: a DiffResults, the results of the database comparison
+ html_report_passed: a boolean, did the HTML report diff pass?
+ errors_diff_passed: a boolean, did the error diff pass?
+ db_dump_passed: a boolean, did the db dump diff pass?
+ overall_passed: a boolean, did the test pass?
+ total_test_time: a String representation of the test duration
+ start_date: a String representation of this TestData's start date
+ end_date: a String representation of the TestData's end date
+ total_ingest_time: a String representation of the total ingest time
+ artifact_count: a Nat, the number of artifacts
+ artifact_fail: a Nat, the number of artifact failures
+ heap_space: a String representation of TODO
+ service_times: a String representation of TODO
+ autopsy_version: a String, the version of autopsy that was run
+ ingest_messages: a Nat, the number of ingest messages
+ indexed_files: a Nat, the number of files indexed during the ingest
+ indexed_chunks: a Nat, the number of chunks indexed during the ingest
+ printerror: a listof_String, the error messages printed during this TestData's test
+        printout: a listof_String, the messages printed during this TestData's test
"""
- def __init__(self):
- self.single = False
- self.single_file = ""
- self.rebuild = False
- self.list = False
- self.config_file = ""
- self.unallocated = False
- self.ignore = False
- self.keep = False
- self.verbose = False
- self.exception = False
- self.exception_string = ""
- self.fr = False
- def parse(self):
- global nxtproc
- nxtproc = []
- nxtproc.append("python3")
- nxtproc.append(sys.argv.pop(0))
- while sys.argv:
- arg = sys.argv.pop(0)
- nxtproc.append(arg)
- if(arg == "-f"):
- #try: @@@ Commented out until a more specific except statement is added
- arg = sys.argv.pop(0)
- print("Running on a single file:")
- print(Emailer.path_fix(arg) + "\n")
- self.single = True
- self.single_file = Emailer.path_fix(arg)
- #except:
- # print("Error: No single file given.\n")
- # return False
- elif(arg == "-r" or arg == "--rebuild"):
- print("Running in rebuild mode.\n")
- self.rebuild = True
- elif(arg == "-l" or arg == "--list"):
- try:
- arg = sys.argv.pop(0)
- nxtproc.append(arg)
- print("Running from configuration file:")
- print(arg + "\n")
- self.list = True
- self.config_file = arg
- except:
- print("Error: No configuration file given.\n")
- return False
- elif(arg == "-u" or arg == "--unallocated"):
- print("Ignoring unallocated space.\n")
- self.unallocated = True
- elif(arg == "-k" or arg == "--keep"):
- print("Keeping the Solr index.\n")
- self.keep = True
- elif(arg == "-v" or arg == "--verbose"):
- print("Running in verbose mode:")
- print("Printing all thrown exceptions.\n")
- self.verbose = True
- elif(arg == "-e" or arg == "--exception"):
- try:
- arg = sys.argv.pop(0)
- nxtproc.append(arg)
- print("Running in exception mode: ")
- print("Printing all exceptions with the string '" + arg + "'\n")
- self.exception = True
- self.exception_string = arg
- except:
- print("Error: No exception string given.")
- elif arg == "-h" or arg == "--help":
- print(usage())
- return False
- elif arg == "-fr" or arg == "--forcerun":
- print("Not downloading new images")
- self.fr = True
- else:
- print(usage())
- return False
- # Return the args were sucessfully parsed
- return True
+ def __init__(self, image, main_config):
+        """Init this TestData with its image and the test configuration.
+
+ Args:
+ image: the Image to be tested.
+ main_config: the global TestConfiguration.
+ """
+ # Configuration Data
+ self.main_config = main_config
+ self.ant = []
+ self.image_file = str(image)
+        # TODO: This 0 should be refactored out, but it will require rebuilding and changing of outputs.
+ self.image = get_image_name(self.image_file)
+ self.image_name = self.image + "(0)"
+ # Directory structure and files
+ self.output_path = make_path(self.main_config.output_dir, self.image_name)
+ self.autopsy_data_file = make_path(self.output_path, self.image_name + "Autopsy_data.txt")
+ self.warning_log = make_local_path(self.output_path, "AutopsyLogs.txt")
+ self.antlog_dir = make_local_path(self.output_path, "antlog.txt")
+ self.test_dbdump = make_path(self.output_path, self.image_name +
+ "DBDump.txt")
+ self.common_log_path = make_local_path(self.output_path, self.image_name + COMMON_LOG)
+ self.sorted_log = make_local_path(self.output_path, self.image_name + "SortedErrors.txt")
+ self.reports_dir = make_path(self.output_path, AUTOPSY_TEST_CASE, "Reports")
+ self.gold_data_dir = make_path(self.main_config.img_gold, self.image_name)
+ self.gold_archive = make_path(self.main_config.gold,
+ self.image_name + "-archive.zip")
+ self.logs_dir = make_path(self.output_path, "logs")
+ self.solr_index = make_path(self.output_path, AUTOPSY_TEST_CASE,
+ "ModuleOutput", "KeywordSearch")
+ # Results and Info
+ self.db_diff_results = None
+ self.html_report_passed = False
+ self.errors_diff_passed = False
+ self.sorted_data_passed = False
+ self.db_dump_passed = False
+ self.overall_passed = False
+ # Ingest info
+ self.total_test_time = ""
+ self.start_date = ""
+ self.end_date = ""
+ self.total_ingest_time = ""
+ self.artifact_count = 0
+ self.artifact_fail = 0
+ self.heap_space = ""
+ self.service_times = ""
+ self.autopsy_version = ""
+ self.ingest_messages = 0
+ self.indexed_files = 0
+ self.indexed_chunks = 0
+ # Error tracking
+ self.printerror = []
+ self.printout = []
+
+ def ant_to_string(self):
+ string = ""
+ for arg in self.ant:
+ string += (arg + " ")
+ return string
+
+ def get_db_path(self, db_type):
+ """Get the path to the database file that corresponds to the given DBType.
+
+ Args:
+ DBType: the DBType of the path to be generated.
+ """
+ if(db_type == DBType.GOLD):
+ db_path = make_path(self.gold_data_dir, DB_FILENAME)
+ elif(db_type == DBType.OUTPUT):
+ db_path = make_path(self.main_config.output_dir, self.image_name, AUTOPSY_TEST_CASE, DB_FILENAME)
+ else:
+ db_path = make_path(self.main_config.output_dir, self.image_name, AUTOPSY_TEST_CASE, BACKUP_DB_FILENAME)
+ return db_path
+
+ def get_html_report_path(self, html_type):
+ """Get the path to the HTML Report folder that corresponds to the given DBType.
+
+ Args:
+ DBType: the DBType of the path to be generated.
+ """
+ if(html_type == DBType.GOLD):
+ return make_path(self.gold_data_dir, "Report")
+ else:
+ # Autopsy creates an HTML report folder in the form AutopsyTestCase DATE-TIME
+ # It's impossible to get the exact time the folder was created, but the folder
+ # we are looking for is the only one in the self.reports_dir folder
+ html_path = ""
+ for fs in os.listdir(self.reports_dir):
+ html_path = make_path(self.reports_dir, fs)
+ if os.path.isdir(html_path):
+ break
+ return make_path(html_path, os.listdir(html_path)[0])
+
+ def get_sorted_data_path(self, file_type):
+ """Get the path to the SortedData file that corresponds to the given DBType.
+
+ Args:
+ file_type: the DBType of the path to be generated
+ """
+ return self._get_path_to_file(file_type, "SortedData.txt")
+
+ def get_sorted_errors_path(self, file_type):
+        """Get the path to the SortedErrors file that corresponds to the given
+ DBType.
+
+ Args:
+ file_type: the DBType of the path to be generated
+ """
+ return self._get_path_to_file(file_type, "SortedErrors.txt")
+
+ def get_db_dump_path(self, file_type):
+ """Get the path to the DBDump file that corresponds to the given DBType.
+
+ Args:
+ file_type: the DBType of the path to be generated
+ """
+ return self._get_path_to_file(file_type, "DBDump.txt")
+
+ def _get_path_to_file(self, file_type, file_name):
+ """Get the path to the specified file with the specified type.
+
+ Args:
+ file_type: the DBType of the path to be generated
+ file_name: a String, the filename of the path to be generated
+ """
+ full_filename = self.image_name + file_name
+ if(file_type == DBType.GOLD):
+ return make_path(self.gold_data_dir, full_filename)
+ else:
+ return make_path(self.output_path, full_filename)
class TestConfiguration(object):
@@ -239,115 +581,40 @@ class TestConfiguration(object):
self.args = args
# Paths:
self.output_dir = ""
- self.input_dir = Emailer.make_local_path("..","input")
- self.gold = Emailer.make_path("..", "output", "gold")
- self.img_gold = Emailer.make_path(self.gold, 'tmp')
+ self.input_dir = make_local_path("..","input")
+ self.gold = make_path("..", "output", "gold")
+ self.img_gold = make_path(self.gold, 'tmp')
# Logs:
self.csv = ""
self.global_csv = ""
self.html_log = ""
# Ant info:
- self.known_bad_path = ""
- self.keyword_path = ""
- self.nsrl_path = ""
- self.build_path = ""
- # test_config info
- self.autopsy_version = ""
- self.ingest_messages = 0
- self.indexed_files = 0
- self.indexed_chunks = 0
+ self.known_bad_path = make_path(self.input_dir, "notablehashes.txt-md5.idx")
+ self.keyword_path = make_path(self.input_dir, "notablekeywords.xml")
+ self.nsrl_path = make_path(self.input_dir, "nsrl.txt-md5.idx")
+ self.build_path = make_path("..", "build.xml")
# Infinite Testing info
timer = 0
self.images = []
+ # Email info
+ self.email_enabled = False
+ self.mail_server = ""
+ self.mail_to = ""
+ self.mail_subject = ""
# Set the timeout to something huge
# The entire tester should not timeout before this number in ms
# However it only seems to take about half this time
# And it's very buggy, so we're being careful
self.timeout = 24 * 60 * 60 * 1000 * 1000
- self.ant = []
- # Initialize Attributes
- self._init_logs()
- self._init_imgs()
- self._init_build_info()
-
-
- def ant_to_string(self):
- string = ""
- for arg in self.ant:
- string += (arg + " ")
- return string
-
- def reset(self):
- # Set the timeout to something huge
- # The entire tester should not timeout before this number in ms
- # However it only seems to take about half this time
- # And it's very buggy, so we're being careful
- self.timeout = 24 * 60 * 60 * 1000 * 1000
- self.ant = []
-
- def _init_imgs(self):
- """Initialize the list of images to run test on."""
- #Identify tests to run and populate test_config with list
- # If user wants to do a single file and a list (contradictory?)
- if self.args.single and self.args.list:
- msg = "Cannot run both from config file and on a single file."
- self._print_error(msg)
- return
- # If working from a configuration file
- if self.args.list:
- if not Emailer.file_exists(self.args.config_file):
- msg = "Configuration file does not exist at:" + self.args.config_file
- self._print_error(msg)
- return
- self._load_config_file(self.args.config_file)
- # Else if working on a single file
- elif self.args.single:
- if not Emailer.file_exists(self.args.single_file):
- msg = "Image file does not exist at: " + self.args.single_file
- self._print_error(msg)
- return
- test_config.images.append(self.args.single_file)
-
- # If user has not selected a single file, and does not want to ignore
- # the input directory, continue on to parsing ../input
- if (not self.args.single) and (not self.args.ignore) and (not self.args.list):
- self.args.config_file = "config.xml"
- if not Emailer.file_exists(self.args.config_file):
- msg = "Configuration file does not exist at: " + self.args.config_file
- self._print_error(msg)
- return
- self._load_config_file(self.args.config_file)
-
- def _init_logs(self):
- """Setup output folder, logs, and reporting infrastructure."""
- if(not Emailer.dir_exists(Emailer.make_path("..", "output", "results"))):
- os.makedirs(Emailer.make_path("..", "output", "results",))
- self.output_dir = Emailer.make_path("..", "output", "results", time.strftime("%Y.%m.%d-%H.%M.%S"))
- os.makedirs(self.output_dir)
- self.csv = Emailer.make_local_path(self.output_dir, "CSV.txt")
- self.html_log = Emailer.make_path(self.output_dir, "AutopsyTestCase.html")
- log_name = self.output_dir + "\\regression.log"
- logging.basicConfig(filename=log_name, level=logging.DEBUG)
-
- def _init_build_info(self):
- """Initializes paths that point to information necessary to run the AutopsyIngest."""
- global parsed
- if(self.args.list):
- build_elements = parsed.getElementsByTagName("build")
- if(len(build_elements) <= 0):
- build_path = Emailer.make_path("..", "build.xml")
- else:
- build_element = build_elements[0]
- build_path = build_element.getAttribute("value").encode().decode("utf_8")
- if(build_path == None):
- build_path = Emailer.make_path("..", "build.xml")
+ if not self.args.single:
+ self._load_config_file(self.args.config_file)
else:
- build_path = Emailer.make_path("..", "build.xml")
- self.build_path = build_path
- self.known_bad_path = Emailer.make_path(self.input_dir, "notablehashes.txt-md5.idx")
- self.keyword_path = Emailer.make_path(self.input_dir, "notablekeywords.xml")
- self.nsrl_path = Emailer.make_path(self.input_dir, "nsrl.txt-md5.idx")
+ self.images.append(self.args.single_file)
+ self._init_logs()
+ #self._init_imgs()
+ #self._init_build_info()
+
def _load_config_file(self, config_file):
"""Updates this TestConfiguration's attributes from the config file.
@@ -359,68 +626,96 @@ class TestConfiguration(object):
config_file: ConfigFile - the configuration file to load
"""
try:
- global parsed
- global errorem
- global attachl
count = 0
- parsed = parse(config_file)
+ parsed_config = parse(config_file)
logres = []
counts = {}
- if parsed.getElementsByTagName("indir"):
+ if parsed_config.getElementsByTagName("indir"):
self.input_dir = parsed.getElementsByTagName("indir")[0].getAttribute("value").encode().decode("utf_8")
- if parsed.getElementsByTagName("global_csv"):
+ if parsed_config.getElementsByTagName("global_csv"):
self.global_csv = parsed.getElementsByTagName("global_csv")[0].getAttribute("value").encode().decode("utf_8")
- self.global_csv = Emailer.make_local_path(self.global_csv)
- if parsed.getElementsByTagName("golddir"):
+ self.global_csv = make_local_path(self.global_csv)
+ if parsed_config.getElementsByTagName("golddir"):
self.gold = parsed.getElementsByTagName("golddir")[0].getAttribute("value").encode().decode("utf_8")
- self.img_gold = Emailer.make_path(self.gold, 'tmp')
+ self.img_gold = make_path(self.gold, 'tmp')
- # Generate the top navbar of the HTML for easy access to all images
- images = []
- for element in parsed.getElementsByTagName("image"):
- value = element.getAttribute("value").encode().decode("utf_8")
- print ("Image in Config File: " + value)
- if Emailer.file_exists(value):
- self.images.append(value)
- else:
- msg = "File: " + value + " doesn't exist"
- self._print_error(msg)
- image_count = len(images)
-
- # Sanity check to see if there are obvious gold images that we are not testing
- gold_count = 0
- for file in os.listdir(self.gold):
- if not(file == 'tmp'):
- gold_count+=1
-
- if (image_count > gold_count):
- print("******Alert: There are more input images than gold standards, some images will not be properly tested.\n")
- elif (image_count < gold_count):
- print("******Alert: There are more gold standards than input images, this will not check all gold Standards.\n")
+ self._init_imgs(parsed_config)
+ self._init_build_info(parsed_config)
+ self._init_email_info(parsed_config)
except Exception as e:
msg = "There was an error running with the configuration file.\n"
msg += "\t" + str(e)
- self._print_error(msg)
+ Errors.add_email_msg(msg)
logging.critical(traceback.format_exc())
print(traceback.format_exc())
- def _print_error(self, msg):
- """Append the given error message to the global error message and print the message to the screen.
+ def _init_logs(self):
+ """Setup output folder, logs, and reporting infrastructure."""
+ if(not dir_exists(make_path("..", "output", "results"))):
+ os.makedirs(make_path("..", "output", "results",))
+ self.output_dir = make_path("..", "output", "results", time.strftime("%Y.%m.%d-%H.%M.%S"))
+ os.makedirs(self.output_dir)
+ self.csv = make_local_path(self.output_dir, "CSV.txt")
+ self.html_log = make_path(self.output_dir, "AutopsyTestCase.html")
+ log_name = self.output_dir + "\\regression.log"
+ logging.basicConfig(filename=log_name, level=logging.DEBUG)
+
+ def _init_build_info(self, parsed_config):
+ """Initializes paths that point to information necessary to run the AutopsyIngest."""
+ build_elements = parsed_config.getElementsByTagName("build")
+ if build_elements:
+ build_element = build_elements[0]
+ build_path = build_element.getAttribute("value").encode().decode("utf_8")
+ self.build_path = build_path
+
+ def _init_imgs(self, parsed_config):
+ """Initialize the list of images to run tests on."""
+ for element in parsed_config.getElementsByTagName("image"):
+ value = element.getAttribute("value").encode().decode("utf_8")
+ print ("Image in Config File: " + value)
+ if file_exists(value):
+ self.images.append(value)
+ else:
+ msg = "File: " + value + " doesn't exist"
+ Errors.print_error(msg)
+ Errors.add_email_msg(msg)
+ image_count = len(self.images)
+
+ # Sanity check to see if there are obvious gold images that we are not testing
+ gold_count = 0
+ for file in os.listdir(self.gold):
+ if not(file == 'tmp'):
+ gold_count+=1
+
+ if (image_count > gold_count):
+ print("******Alert: There are more input images than gold standards, some images will not be properly tested.\n")
+ elif (image_count < gold_count):
+ print("******Alert: There are more gold standards than input images, this will not check all gold Standards.\n")
+
+ def _init_email_info(self, parsed_config):
+ """Initializes email information dictionary"""
+ email_elements = parsed_config.getElementsByTagName("email")
+ if email_elements:
+ mail_to = email_elements[0]
+ self.mail_to = mail_to.getAttribute("value").encode().decode("utf_8")
+ mail_server_elements = parsed_config.getElementsByTagName("mail_server")
+ if mail_server_elements:
+ mail_from = mail_server_elements[0]
+ self.mail_server = mail_from.getAttribute("value").encode().decode("utf_8")
+ subject_elements = parsed_config.getElementsByTagName("subject")
+ if subject_elements:
+ subject = subject_elements[0]
+ self.mail_subject = subject.getAttribute("value").encode().decode("utf_8")
+ if self.mail_server and self.mail_to:
+ self.email_enabled = True
+
- Args:
- msg: String - the error message to print
- """
- global errorem
- error_msg = "Configuration: " + msg
- print(error_msg)
- errorem += error_msg + "\n"
class TskDbDiff(object):
"""Represents the differences between the gold and output databases.
- Contains methods to compare two databases and internally
- store some of the results
+ Contains methods to compare two databases.
Attributes:
gold_artifacts:
@@ -431,15 +726,18 @@ class TskDbDiff(object):
autopsy_objects:
artifact_comparison:
attribute_comparision:
- test_data:
+ report_errors: a listof_listof_String, the error messages that will be
+ printed to screen in the run_diff method
+ passed: a boolean, did the diff pass?
autopsy_db_file:
gold_db_file:
"""
- def __init__(self, test_data):
+ def __init__(self, output_db_path, gold_db_path):
"""Constructor for TskDbDiff.
Args:
- test_data: TestData - the test data to compare
+ output_db_path: a pathto_File, the output database
+ gold_db_path: a pathto_File, the gold database
"""
self.gold_artifacts = []
self.autopsy_artifacts = []
@@ -449,9 +747,9 @@ class TskDbDiff(object):
self.autopsy_objects = 0
self.artifact_comparison = []
self.attribute_comparison = []
- self.test_data = test_data
- self.autopsy_db_file = self.test_data.get_db_path(DBType.OUTPUT)
- self.gold_db_file = self.test_data.get_db_path(DBType.GOLD)
+ self.report_errors = []
+ self.autopsy_db_file = output_db_path
+ self.gold_db_file = gold_db_path
def _get_artifacts(self, cursor):
"""Get a list of artifacts from the given SQLCursor.
@@ -495,16 +793,17 @@ class TskDbDiff(object):
return cursor.fetchone()[0]
def _compare_bb_artifacts(self):
- """Compares the blackboard artifact counts of two databases."""
+ """Compares the blackboard artifact counts of two databases.
+
+ Returns:
+ True if the artifacts are the same, False otherwise.
+ """
exceptions = []
+ passed = True
try:
- global failedbool
- global errorem
if self.gold_artifacts != self.autopsy_artifacts:
- failedbool = True
- global imgfail
- imgfail = True
- errorem += self.test_data.image + ":There was a difference in the number of artifacts.\n"
+ msg = "There was a difference in the number of artifacts.\n"
+ Errors.add_email_msg(msg)
rner = len(self.gold_artifacts)
for type_id in range(1, rner):
if self.gold_artifacts[type_id] != self.autopsy_artifacts[type_id]:
@@ -513,49 +812,59 @@ class TskDbDiff(object):
(self.gold_artifacts[type_id],
self.autopsy_artifacts[type_id]))
exceptions.append(error)
- return exceptions
+ passed = False
+ self.report_errors.append(exceptions)
+ return passed
except Exception as e:
- printerror(self.test_data, str(e))
+ Errors.print_error(str(e))
exceptions.append("Error: Unable to compare blackboard_artifacts.\n")
- return exceptions
+ self.report_errors.append(exceptions)
+ return False
def _compare_bb_attributes(self):
- """Compares the blackboard attribute counts of two databases."""
+ """Compares the blackboard attribute counts of two databases.
+
+ Updates this TskDbDiff's report_errors with the error messages from the
+ attribute diff.
+
+ Returns:
+ True if the attributes are the same, False otherwise.
+ """
exceptions = []
+ passed = True
try:
if self.gold_attributes != self.autopsy_attributes:
error = "Attribute counts do not match. "
error += str("Gold: %d, Test: %d" % (self.gold_attributes, self.autopsy_attributes))
exceptions.append(error)
- global failedbool
- global errorem
- failedbool = True
- global imgfail
- imgfail = True
- errorem += self.test_data.image + ":There was a difference in the number of attributes.\n"
- return exceptions
+ msg = "There was a difference in the number of attributes.\n"
+ Errors.add_email_msg(msg)
+ passed = False
+ self.report_errors.append(exceptions)
+ return passed
except Exception as e:
exceptions.append("Error: Unable to compare blackboard_attributes.\n")
- return exceptions
+ self.report_errors.append(exceptions)
+ return False
def _compare_tsk_objects(self):
"""Compares the TSK object counts of two databases."""
exceptions = []
+ passed = True
try:
if self.gold_objects != self.autopsy_objects:
error = "TSK Object counts do not match. "
error += str("Gold: %d, Test: %d" % (self.gold_objects, self.autopsy_objects))
exceptions.append(error)
- global failedbool
- global errorem
- failedbool = True
- global imgfail
- imgfail = True
- errorem += self.test_data.image + ":There was a difference between the tsk object counts.\n"
- return exceptions
+ msg ="There was a difference between the tsk object counts.\n"
+ Errors.add_email_msg(msg)
+ passed = False
+ self.report_errors.append(exceptions)
+ return passed
except Exception as e:
exceptions.append("Error: Unable to compare tsk_objects.\n")
- return exceptions
+ self.report_errors.append(exceptions)
+ return False
def _get_basic_counts(self, autopsy_cur, gold_cur):
"""Count the items necessary to compare the databases.
@@ -567,9 +876,6 @@ class TskDbDiff(object):
Args:
autopsy_cur: SQLCursor - the cursor for the output database
gold_cur: SQLCursor - the cursor for the gold database
-
- Returns:
-
"""
try:
# Objects
@@ -582,7 +888,7 @@ class TskDbDiff(object):
self.gold_attributes = self._count_attributes(gold_cur)
self.autopsy_attributes = self._count_attributes(autopsy_cur)
except Exception as e:
- printerror(self.test_data, "Way out:" + str(e))
+ Errors.print_error("Way out:" + str(e))
def run_diff(self):
"""Basic test between output and gold databases.
@@ -591,13 +897,13 @@ class TskDbDiff(object):
Note: SQLITE needs unix style pathing
"""
# Check to make sure both db files exist
- if not Emailer.file_exists(self.autopsy_db_file):
- printerror(self.test_data, "Error: TskDbDiff file does not exist at:")
- printerror(self.test_data, self.autopsy_db_file + "\n")
+ if not file_exists(self.autopsy_db_file):
+ Errors.print_error("Error: TskDbDiff file does not exist at:")
+ Errors.print_error(self.autopsy_db_file + "\n")
return
- if not Emailer.file_exists(self.gold_db_file):
- printerror(self.test_data, "Error: Gold database file does not exist at:")
- printerror(self.test_data, self.gold_db_file + "\n")
+ if not file_exists(self.gold_db_file):
+ Errors.print_error("Error: Gold database file does not exist at:")
+ Errors.print_error(self.gold_db_file + "\n")
return
# Get connections and cursors to output / gold databases
@@ -613,45 +919,45 @@ class TskDbDiff(object):
autopsy_con.close()
gold_con.close()
- exceptions = []
-
# Compare counts
- exceptions.append(self._compare_tsk_objects())
- exceptions.append(self._compare_bb_artifacts())
- exceptions.append(self._compare_bb_attributes())
+ objects_passed = self._compare_tsk_objects()
+ artifacts_passed = self._compare_bb_artifacts()
+ attributes_passed = self._compare_bb_attributes()
- self.artifact_comparison = exceptions[1]
- self.attribute_comparison = exceptions[2]
+ self.passed = objects_passed and artifacts_passed and attributes_passed
+
+ self.artifact_comparison = self.report_errors[1]
+ self.attribute_comparison = self.report_errors[2]
okay = "All counts match."
- print_report(self.test_data, exceptions[0], "COMPARE TSK OBJECTS", okay)
- print_report(self.test_data, exceptions[1], "COMPARE ARTIFACTS", okay)
- print_report(self.test_data, exceptions[2], "COMPARE ATTRIBUTES", okay)
+ print_report(self.report_errors[0], "COMPARE TSK OBJECTS", okay)
+ print_report(self.report_errors[1], "COMPARE ARTIFACTS", okay)
+ print_report(self.report_errors[2], "COMPARE ATTRIBUTES", okay)
return DiffResults(self)
- def _dump_output_db_bb(autopsy_con, autopsy_db_file, test_data):
- """Dumps sorted text results to the output location stored in test_data.
+ def _dump_output_db_bb(autopsy_con, db_file, data_file, sorted_data_file):
+ """Dumps sorted text results to the given output location.
Smart method that deals with a blackboard comparison to avoid issues
with different IDs based on when artifacts were created.
Args:
autopsy_con: a SQLConn to the autopsy database.
- autopsy_db_file: a pathto_File, the output database.
- test_data: the TestData that corresponds with this dump.
+ db_file: a pathto_File, the output database.
+ data_file: a pathto_File, the dump file to write to
+ sorted_data_file: a pathto_File, the sorted dump file to write to
"""
autopsy_cur2 = autopsy_con.cursor()
- global errorem
- global attachl
- global failedbool
# Get the list of all artifacts
# @@@ Could add a SORT by parent_path in here since that is how we are going to later sort it.
autopsy_cur2.execute("SELECT tsk_files.parent_path, tsk_files.name, blackboard_artifact_types.display_name, blackboard_artifacts.artifact_id FROM blackboard_artifact_types INNER JOIN blackboard_artifacts ON blackboard_artifact_types.artifact_type_id = blackboard_artifacts.artifact_type_id INNER JOIN tsk_files ON tsk_files.obj_id = blackboard_artifacts.obj_id")
- database_log = codecs.open(test_data.autopsy_data_file, "wb", "utf_8")
+ database_log = codecs.open(data_file, "wb", "utf_8")
rw = autopsy_cur2.fetchone()
appnd = False
counter = 0
+ artifact_count = 0
+ artifact_fail = 0
# Cycle through artifacts
try:
while (rw != None):
@@ -664,7 +970,7 @@ class TskDbDiff(object):
# Get attributes for this artifact
autopsy_cur1 = autopsy_con.cursor()
looptry = True
- test_data.artifact_count += 1
+ artifact_count += 1
try:
key = ""
key = str(rw[3])
@@ -672,14 +978,14 @@ class TskDbDiff(object):
autopsy_cur1.execute("SELECT blackboard_attributes.source, blackboard_attribute_types.display_name, blackboard_attributes.value_type, blackboard_attributes.value_text, blackboard_attributes.value_int32, blackboard_attributes.value_int64, blackboard_attributes.value_double FROM blackboard_attributes INNER JOIN blackboard_attribute_types ON blackboard_attributes.attribute_type_id = blackboard_attribute_types.attribute_type_id WHERE artifact_id =? ORDER BY blackboard_attributes.source, blackboard_attribute_types.display_name, blackboard_attributes.value_type, blackboard_attributes.value_text, blackboard_attributes.value_int32, blackboard_attributes.value_int64, blackboard_attributes.value_double", key)
attributes = autopsy_cur1.fetchall()
except Exception as e:
- printerror(test_data, str(e))
- printerror(test_data, str(rw[3]))
- print(test_data.image_name)
- errorem += test_data.image_name + ":Attributes in artifact id (in output DB)# " + str(rw[3]) + " encountered an error: " + str(e) +" .\n"
+ Errors.print_error(str(e))
+ Errors.print_error(str(rw[3]))
+ msg ="Attributes in artifact id (in output DB)# " + str(rw[3]) + " encountered an error: " + str(e) +" .\n"
+ Errors.add_email_msg(msg)
looptry = False
- print(test_data.artifact_fail)
- test_data.artifact_fail += 1
- print(test_data.artifact_fail)
+ print(artifact_fail)
+ artifact_fail += 1
+ print(artifact_fail)
database_log.write('Error Extracting Attributes');
# Print attributes
@@ -692,18 +998,18 @@ class TskDbDiff(object):
if(attr[x] != None):
numvals += 1
if(numvals > 1):
- errorem += test_data.image_name + ":There were too many values for attribute type: " + attr[1] + " for artifact with id #" + str(rw[3]) + ".\n"
- printerror(test_data, "There were too many values for attribute type: " + attr[1] + " for artifact with id #" + str(rw[3]) + " for image " + test_data.image_name + ".")
- failedbool = True
+ msg = "There were too many values for attribute type: " + attr[1] + " for artifact with id #" + str(rw[3]) + ".\n"
+ Errors.add_email_msg(msg)
+ Errors.print_error(msg)
if(not appnd):
- attachl.append(autopsy_db_file)
+ Errors.add_email_attachment(db_file)
appnd = True
if(not attr[0] == src):
- errorem += test_data.image_name + ":There were inconsistent sources for artifact with id #" + str(rw[3]) + ".\n"
- printerror(test_data, "There were inconsistent sources for artifact with id #" + str(rw[3]) + " for image " + test_data.image_name + ".")
- failedbool = True
+ msg ="There were inconsistent sources for artifact with id #" + str(rw[3]) + ".\n"
+ Errors.add_email_msg(msg)
+ Errors.print_error(msg)
if(not appnd):
- attachl.append(autopsy_db_file)
+ Errors.add_email_attachment(db_file)
appnd = True
try:
database_log.write('
(will skew other test results)