From 06e2502119c114c3d704e855d850d25450fc656c Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Thu, 6 Nov 2014 19:17:43 -0500 Subject: [PATCH] Fix thread safety issue in multi-stage ingest job --- .../ingest/DataSourceIngestPipeline.java | 1 + .../sleuthkit/autopsy/ingest/IngestJob.java | 262 +++++++++++------- 2 files changed, 156 insertions(+), 107 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java index b603166981..702e3b63b4 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java @@ -71,6 +71,7 @@ final class DataSourceIngestPipeline { "IngestJob.progress.dataSourceIngest.displayName", module.getDisplayName(), dataSource.getName()); this.job.updateDataSourceIngestProgressBarDisplayName(displayName); + this.job.switchDataSourceIngestProgressBarToIndeterminate(); ingestManager.setIngestTaskProgress(task, module.getDisplayName()); module.process(dataSource, new DataSourceIngestModuleProgress(this.job)); } catch (Exception ex) { // Catch-all exception firewall diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index be6566f846..8569bab9b7 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -42,26 +42,12 @@ import org.sleuthkit.datamodel.Content; */ final class IngestJob { - /** - * An ingest job may have multiple stages. - */ - private enum Stages { - - /** - * Setting up for processing. - */ - INITIALIZATION, - /** - * High priority data source ingest modules and file ingest modules. - */ - FIRST, - /** - * Lower priority, usually long-running, data source ingest modules. 
- */ - SECOND - }; - private static final Logger logger = Logger.getLogger(IngestJob.class.getName()); + + /** + * The task scheduler singleton is responsible for creating and scheduling + * the ingest tasks that make up ingest jobs. + */ private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance(); /** @@ -73,33 +59,66 @@ final class IngestJob { private static final ConcurrentHashMap jobsById = new ConcurrentHashMap<>(); /** - * These fields define the ingest job and the work it entails. Note that - * there is a collection for multiple copies of the file ingest pipeline, - * one for each file ingest thread. + * These fields define the ingest job, including its ingest pipelines. Note + * that there is a collection of multiple copies of the file ingest + * pipeline, one for each file ingest thread. */ private final long id; private final Content dataSource; private final boolean processUnallocatedSpace; - private Stages stage; + private final Object dataSourceIngestPipelineLock; private DataSourceIngestPipeline firstStageDataSourceIngestPipeline; private DataSourceIngestPipeline secondStageDataSourceIngestPipeline; private DataSourceIngestPipeline currentDataSourceIngestPipeline; private final LinkedBlockingQueue fileIngestPipelines; /** - * These fields are used to update ingest progress bars for the job. + * An ingest runs in stages. + */ + private static enum Stages { + + /** + * Setting up for processing. + */ + INITIALIZATION, + /** + * Running high priority data source level ingest modules and file level + * ingest modules. + */ + FIRST, + /** + * Running lower priority, usually long-running, data source level + * ingest modules. + */ + SECOND, + /** + * Cleaning up. + */ + FINALIZATION + }; + private Stages stage; + private final Object stageCompletionCheckLock; + + /** + * These fields are used to provide data source level task progress bars for + * the job. 
*/ - private ProgressHandle dataSourceIngestProgress; private final Object dataSourceIngestProgressLock; + private ProgressHandle dataSourceIngestProgress; + + /** + * These fields are used to provide file level ingest task progress bars for + * the job. + */ + private final Object fileIngestProgressLock; private final List filesInProgress; private long estimatedFilesToProcess; private long processedFiles; private ProgressHandle fileIngestProgress; - private final Object fileIngestProgressLock; /** * These fields support cancellation of either the currently running data - * source ingest module or the entire ingest job. + * source level ingest module or the entire ingest job. */ private volatile boolean currentDataSourceIngestModuleCancelled; private volatile boolean cancelled; @@ -188,11 +207,13 @@ final class IngestJob { this.id = id; this.dataSource = dataSource; this.processUnallocatedSpace = processUnallocatedSpace; - this.stage = IngestJob.Stages.INITIALIZATION; + this.dataSourceIngestPipelineLock = new Object(); this.fileIngestPipelines = new LinkedBlockingQueue<>(); this.filesInProgress = new ArrayList<>(); this.dataSourceIngestProgressLock = new Object(); this.fileIngestProgressLock = new Object(); + this.stage = IngestJob.Stages.INITIALIZATION; + this.stageCompletionCheckLock = new Object(); this.startTime = new Date().getTime(); } @@ -208,15 +229,15 @@ final class IngestJob { /** * Gets the data source to be ingested by this job. * - * @return A reference to a Content object representing the data source. + * @return A Content object representing the data source. */ Content getDataSource() { return this.dataSource; } /** - * Queries whether or not unallocated space should be processed as part of - * this job. + * Gets whether or not unallocated space should be processed as part of this + * job. * * @return True or false. 
*/ @@ -226,20 +247,22 @@ final class IngestJob { /** * Passes the data source for this job through the currently active data - * source ingest pipeline. + * source level ingest pipeline. * * @param task A data source ingest task wrapping the data source. */ void process(DataSourceIngestTask task) { try { - if (!this.isCancelled() && !this.currentDataSourceIngestPipeline.isEmpty()) { - /** - * Run the data source through the pipeline. - */ - List errors = new ArrayList<>(); - errors.addAll(this.currentDataSourceIngestPipeline.process(task)); - if (!errors.isEmpty()) { - logIngestModuleErrors(errors); + synchronized (this.dataSourceIngestPipelineLock) { + if (!this.isCancelled() && !this.currentDataSourceIngestPipeline.isEmpty()) { + /** + * Run the data source through the pipeline. + */ + List errors = new ArrayList<>(); + errors.addAll(this.currentDataSourceIngestPipeline.process(task)); + if (!errors.isEmpty()) { + logIngestModuleErrors(errors); + } } } @@ -254,14 +277,17 @@ final class IngestJob { } } } finally { + /** + * No matter what happens, do ingest task bookkeeping. + */ IngestJob.taskScheduler.notifyTaskCompleted(task); - this.checkForCurrentTasksCompleted(); + this.checkForStageCompleted(); } } /** - * Passes a file from the data source for this job through the file ingest - * pipeline. + * Passes a file from the data source for this job through the file level + * ingest pipeline. * * @param task A file ingest task. * @throws InterruptedException if the thread executing this code is @@ -327,18 +353,26 @@ final class IngestJob { this.fileIngestPipelines.put(pipeline); } } finally { + /** + * No matter what happens, do ingest task bookkeeping. + */ IngestJob.taskScheduler.notifyTaskCompleted(task); - this.checkForCurrentTasksCompleted(); + this.checkForStageCompleted(); } } /** - * Adds more files to an ingest job, i.e., derived or carved files. Not + * Adds more files to an ingest job, i.e., extracted or carved files. 
Not * currently supported for the second stage of the job. * * @param files A list of files to add. */ void addFiles(List files) { + /** + * Note: This implementation assumes that this is being called by an + * ingest module running code on an ingest thread that is holding a + * reference to an ingest task, so no task completion check is done. + */ if (IngestJob.Stages.FIRST == this.stage) { for (AbstractFile file : files) { IngestJob.taskScheduler.scheduleFileIngestTask(this, file); @@ -346,10 +380,18 @@ final class IngestJob { } else { IngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS } + + /** + * The intended clients of this method are ingest modules running code + * on an ingest thread that is holding a reference to an ingest task, in + * which case a task completion check would not be necessary. This is a + * bit of defensive programming. + */ + this.checkForStageCompleted(); } /** - * Updates the display name of the data source ingest progress bar. + * Updates the display name of the data source level ingest progress bar. * * @param displayName The new display name. */ @@ -362,8 +404,9 @@ final class IngestJob { } /** - * Switches the data source progress bar to determinate mode. This should be - * called if the total work units to process the data source is known. + * Switches the data source level ingest progress bar to determinate mode. + * This should be called if the total work units to process the data source + * is known. * * @param workUnits Total number of work units for the processing of the * data source. @@ -379,9 +422,9 @@ final class IngestJob { } /** - * Switches the data source ingest progress bar to indeterminate mode. This - * should be called if the total work units to process the data source is - * unknown. + * Switches the data source level ingest progress bar to indeterminate mode. + * This should be called if the total work units to process the data source + * is unknown. 
*/ void switchDataSourceIngestProgressBarToIndeterminate() { if (!this.cancelled) { @@ -394,8 +437,8 @@ final class IngestJob { } /** - * Updates the data source ingest progress bar with the number of work units - * performed, if in the determinate mode. + * Updates the data source level ingest progress bar with the number of work + * units performed, if in the determinate mode. * * @param workUnits Number of work units performed. */ @@ -410,7 +453,8 @@ final class IngestJob { } /** - * Updates the data source ingest progress with a new task name. + * Updates the data source level ingest progress with a new task name, where + * the task name is the "subtitle" under the display name. * * @param currentTask The task name. */ @@ -425,8 +469,9 @@ final class IngestJob { } /** - * Updates the progress bar with a new task name and the number of work - * units performed, if in the determinate mode. + * Updates the data source level ingest progress bar with a new task name + * and the number of work units performed, if in the determinate mode. The + * task name is the "subtitle" under the display name. * * @param currentTask The task name. * @param workUnits Number of work units performed. @@ -440,9 +485,9 @@ final class IngestJob { } /** - * Determines whether or not a temporary cancellation of data source ingest - * in order to stop the currently executing data source ingest module is in - * effect. + * Queries whether or not a temporary cancellation of data source level + * ingest in order to stop the currently executing data source level ingest + * module is in effect. * * @return True or false. */ @@ -451,8 +496,8 @@ final class IngestJob { } /** - * Rescind a temporary cancellation of data source ingest used to stop the - * currently executing data source ingest module. + * Rescind a temporary cancellation of data source level ingest that was + * used to stop a single data source level ingest module. 
*/ void currentDataSourceIngestModuleCancellationCompleted() { this.currentDataSourceIngestModuleCancelled = false; @@ -472,13 +517,13 @@ final class IngestJob { } /** - * Requests cancellation of ingest, i.e., a shutdown of the data source and - * file ingest pipelines. + * Requests cancellation of ingest, i.e., a shutdown of the data source + * level and file level ingest pipelines. */ void cancel() { /** - * Put a cancellation message on data source ingest progress bar, if it - * is still running. + * Put a cancellation message on data source level ingest progress bar, + * if it is still running. */ synchronized (this.dataSourceIngestProgressLock) { if (dataSourceIngestProgress != null) { @@ -493,8 +538,8 @@ final class IngestJob { } /** - * Put a cancellation message on the file ingest progress bar, if it is - * still running. + * Put a cancellation message on the file level ingest progress bar, if + * it is still running. */ synchronized (this.fileIngestProgressLock) { if (this.fileIngestProgress != null) { @@ -514,12 +559,12 @@ final class IngestJob { * not being performed by an ingest thread. */ IngestJob.taskScheduler.cancelPendingTasksForIngestJob(this); - this.checkForCurrentTasksCompleted(); + this.checkForStageCompleted(); } /** * Queries whether or not cancellation of ingest i.e., a shutdown of the - * data source and file ingest pipelines, has been requested + * data source level and file level ingest pipelines, has been requested. * * @return True or false. */ @@ -571,9 +616,9 @@ final class IngestJob { * ordered lists of ingest module templates for each ingest pipeline. 
*/ IngestPipelinesConfiguration pipelineConfigs = IngestPipelinesConfiguration.getInstance(); - List firstStageDataSourceModuleTemplates = this.getConfiguredIngestModuleTemplates(dataSourceModuleTemplates, pipelineConfigs.getStageOneDataSourceIngestPipelineConfig()); - List fileIngestModuleTemplates = this.getConfiguredIngestModuleTemplates(fileModuleTemplates, pipelineConfigs.getFileIngestPipelineConfig()); - List secondStageDataSourceModuleTemplates = this.getConfiguredIngestModuleTemplates(dataSourceModuleTemplates, pipelineConfigs.getStageTwoDataSourceIngestPipelineConfig()); + List firstStageDataSourceModuleTemplates = IngestJob.getConfiguredIngestModuleTemplates(dataSourceModuleTemplates, pipelineConfigs.getStageOneDataSourceIngestPipelineConfig()); + List fileIngestModuleTemplates = IngestJob.getConfiguredIngestModuleTemplates(fileModuleTemplates, pipelineConfigs.getFileIngestPipelineConfig()); + List secondStageDataSourceModuleTemplates = IngestJob.getConfiguredIngestModuleTemplates(dataSourceModuleTemplates, pipelineConfigs.getStageTwoDataSourceIngestPipelineConfig()); /** * Add any module templates that were not specified in the pipelines @@ -623,7 +668,7 @@ final class IngestJob { * names representing an ingest pipeline. * @return */ - List getConfiguredIngestModuleTemplates(Map ingestModuleTemplates, List pipelineConfig) { + private static List getConfiguredIngestModuleTemplates(Map ingestModuleTemplates, List pipelineConfig) { List templates = new ArrayList<>(); for (String moduleClassName : pipelineConfig) { if (ingestModuleTemplates.containsKey(moduleClassName)) { @@ -640,7 +685,7 @@ final class IngestJob { this.stage = IngestJob.Stages.FIRST; /** - * Start one or both of the first stage progress bars. + * Start one or both of the first stage ingest progress bars. 
*/ if (this.hasFirstStageDataSourceIngestPipeline()) { this.startDataSourceIngestProgressBar(); @@ -650,10 +695,12 @@ final class IngestJob { } /** - * Make the first stage data source pipeline the current data source - * pipeline. + * Make the first stage data source level ingest pipeline the current + * data source level pipeline. */ - this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline; + synchronized (this.dataSourceIngestPipelineLock) { + this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline; + } /** * Schedule the first stage tasks. @@ -672,7 +719,7 @@ final class IngestJob { * the tasks scheduler. In this special case, an ingest thread will * never to check for completion of this stage of the job. */ - this.checkForCurrentTasksCompleted(); + this.checkForStageCompleted(); } } @@ -682,7 +729,9 @@ final class IngestJob { private void startSecondStage() { this.stage = IngestJob.Stages.SECOND; this.startDataSourceIngestProgressBar(); - this.currentDataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline; + synchronized (this.dataSourceIngestPipelineLock) { + this.currentDataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline; + } IngestJob.taskScheduler.scheduleDataSourceIngestTask(this); } @@ -698,7 +747,8 @@ final class IngestJob { } /** - * Checks to see if this job has a first stage data source ingest pipeline. + * Checks to see if this job has a first stage data source level ingest + * pipeline. * * @return True or false. */ @@ -707,7 +757,8 @@ final class IngestJob { } /** - * Checks to see if this job has a second stage data source ingest pipeline. + * Checks to see if this job has a second stage data source level ingest + * pipeline. * * @return True or false. */ @@ -716,7 +767,7 @@ final class IngestJob { } /** - * Checks to see if the job has a file ingest pipeline. + * Checks to see if the job has a file level ingest pipeline. * * @return True or false. 
*/ @@ -725,8 +776,8 @@ final class IngestJob { } /** - * Starts up each of the file and data source ingest modules to collect - * possible errors. + * Starts up each of the file and data source level ingest modules to + * collect possible errors. * * @return A collection of ingest module startup errors, empty on success. */ @@ -767,7 +818,7 @@ final class IngestJob { } /** - * Starts the data source ingest progress bar. + * Starts the data source level ingest progress bar. */ private void startDataSourceIngestProgressBar() { synchronized (this.dataSourceIngestProgressLock) { @@ -800,7 +851,7 @@ final class IngestJob { } /** - * Starts the file ingest progress bar. + * Starts the file level ingest progress bar. */ private void startFileIngestProgressBar() { synchronized (this.fileIngestProgressLock) { @@ -826,26 +877,20 @@ final class IngestJob { /** * Checks to see if the ingest tasks for the current stage are completed and - * invokes a handler if they are. + * does a stage transition if they are. */ - private void checkForCurrentTasksCompleted() { - if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) { - this.handleTasksCompleted(); - } - } - - /** - * Handles when all ingest tasks for this job are completed by finishing the - * current stage and possibly starting the next stage. - */ - private void handleTasksCompleted() { - switch (this.stage) { - case FIRST: - this.finishFirstStage(); - break; - case SECOND: - this.finish(); - break; + private void checkForStageCompleted() { + synchronized (this.stageCompletionCheckLock) { + if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) { + switch (this.stage) { + case FIRST: + this.finishFirstStage(); + break; + case SECOND: + this.finish(); + break; + } + } } } @@ -898,6 +943,8 @@ final class IngestJob { * Shuts down the ingest pipelines and progress bars for this job. 
*/ private void finish() { + this.stage = IngestJob.Stages.FINALIZATION; + // Finish the second stage data source ingest progress bar, if it hasn't // already been finished. synchronized (this.dataSourceIngestProgressLock) { @@ -929,8 +976,8 @@ final class IngestJob { } /** - * Requests a temporary cancellation of data source ingest in order to stop - * the currently executing data source ingest module. + * Requests a temporary cancellation of data source level ingest in order to + * stop the currently executing data source ingest module. */ private void cancelCurrentDataSourceIngestModule() { this.currentDataSourceIngestModuleCancelled = true; @@ -943,6 +990,7 @@ final class IngestJob { */ private IngestJobSnapshot getSnapshot() { return new IngestJobSnapshot(); + } /**