Fix thread safety issue in multi-stage ingest job

This commit is contained in:
Richard Cordovano 2014-11-06 19:17:43 -05:00
parent edaef4fdbc
commit 06e2502119
2 changed files with 156 additions and 107 deletions

View File

@ -71,6 +71,7 @@ final class DataSourceIngestPipeline {
"IngestJob.progress.dataSourceIngest.displayName",
module.getDisplayName(), dataSource.getName());
this.job.updateDataSourceIngestProgressBarDisplayName(displayName);
this.job.switchDataSourceIngestProgressBarToIndeterminate();
ingestManager.setIngestTaskProgress(task, module.getDisplayName());
module.process(dataSource, new DataSourceIngestModuleProgress(this.job));
} catch (Exception ex) { // Catch-all exception firewall

View File

@ -42,26 +42,12 @@ import org.sleuthkit.datamodel.Content;
*/
final class IngestJob {
/**
* An ingest job may have multiple stages.
*/
private enum Stages {
/**
* Setting up for processing.
*/
INITIALIZATION,
/**
* High priority data source ingest modules and file ingest modules.
*/
FIRST,
/**
* Lower priority, usually long-running, data source ingest modules.
*/
SECOND
};
private static final Logger logger = Logger.getLogger(IngestJob.class.getName());
/**
* The task scheduler singleton is responsible for creating and scheduling
* the ingest tasks that make up ingest jobs.
*/
private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance();
/**
@ -73,33 +59,66 @@ final class IngestJob {
private static final ConcurrentHashMap<Long, IngestJob> jobsById = new ConcurrentHashMap<>();
/**
* These fields define the ingest job and the work it entails. Note that
* there is a collection for multiple copies of the file ingest pipeline,
* one for each file ingest thread.
* These fields define the ingest job, including its ingest pipelines. Note
* that there is a collection of multiple copies of the file ingest
* pipeline, one for each file ingest thread.
*/
private final long id;
private final Content dataSource;
private final boolean processUnallocatedSpace;
private Stages stage;
private final Object dataSourceIngestPipelineLock;
private DataSourceIngestPipeline firstStageDataSourceIngestPipeline;
private DataSourceIngestPipeline secondStageDataSourceIngestPipeline;
private DataSourceIngestPipeline currentDataSourceIngestPipeline;
private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelines;
/**
* These fields are used to update ingest progress bars for the job.
* An ingest runs in stages.
*/
private static enum Stages {
/**
* Setting up for processing.
*/
INITIALIZATION,
/**
* Running high priority data source level ingest modules and file level
* ingest modules.
*/
FIRST,
/**
* Running lower priority, usually long-running, data source level
* ingest modules.
*/
SECOND,
/**
* Cleaning up.
*/
FINALIZATION
};
private Stages stage;
private final Object stageCompletionCheckLock;
/**
* These fields are used to provide data source level task progress bars for
* the job.
*/
private ProgressHandle dataSourceIngestProgress;
private final Object dataSourceIngestProgressLock;
private ProgressHandle dataSourceIngestProgress;
/**
* These fields are used to provide file level ingest task progress bars for
* the job.
*/
private final Object fileIngestProgressLock;
private final List<String> filesInProgress;
private long estimatedFilesToProcess;
private long processedFiles;
private ProgressHandle fileIngestProgress;
private final Object fileIngestProgressLock;
/**
* These fields support cancellation of either the currently running data
* source ingest module or the entire ingest job.
* source level ingest module or the entire ingest job.
*/
private volatile boolean currentDataSourceIngestModuleCancelled;
private volatile boolean cancelled;
@ -188,11 +207,13 @@ final class IngestJob {
this.id = id;
this.dataSource = dataSource;
this.processUnallocatedSpace = processUnallocatedSpace;
this.stage = IngestJob.Stages.INITIALIZATION;
this.dataSourceIngestPipelineLock = new Object();
this.fileIngestPipelines = new LinkedBlockingQueue<>();
this.filesInProgress = new ArrayList<>();
this.dataSourceIngestProgressLock = new Object();
this.fileIngestProgressLock = new Object();
this.stage = IngestJob.Stages.INITIALIZATION;
this.stageCompletionCheckLock = new Object();
this.startTime = new Date().getTime();
}
@ -208,15 +229,15 @@ final class IngestJob {
/**
* Gets the data source to be ingested by this job.
*
* @return A reference to a Content object representing the data source.
* @return A Content object representing the data source.
*/
Content getDataSource() {
return this.dataSource;
}
/**
* Queries whether or not unallocated space should be processed as part of
* this job.
* Gets whether or not unallocated space should be processed as part of this
* job.
*
* @return True or false.
*/
@ -226,20 +247,22 @@ final class IngestJob {
/**
* Passes the data source for this job through the currently active data
* source ingest pipeline.
* source level ingest pipeline.
*
* @param task A data source ingest task wrapping the data source.
*/
void process(DataSourceIngestTask task) {
try {
if (!this.isCancelled() && !this.currentDataSourceIngestPipeline.isEmpty()) {
/**
* Run the data source through the pipeline.
*/
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(this.currentDataSourceIngestPipeline.process(task));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
synchronized (this.dataSourceIngestPipelineLock) {
if (!this.isCancelled() && !this.currentDataSourceIngestPipeline.isEmpty()) {
/**
* Run the data source through the pipeline.
*/
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(this.currentDataSourceIngestPipeline.process(task));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
}
}
@ -254,14 +277,17 @@ final class IngestJob {
}
}
} finally {
/**
* No matter what happens, do ingest task bookkeeping.
*/
IngestJob.taskScheduler.notifyTaskCompleted(task);
this.checkForCurrentTasksCompleted();
this.checkForStageCompleted();
}
}
/**
* Passes a file from the data source for this job through the file ingest
* pipeline.
* Passes a file from the data source for this job through the file level
* ingest pipeline.
*
* @param task A file ingest task.
* @throws InterruptedException if the thread executing this code is
@ -327,18 +353,26 @@ final class IngestJob {
this.fileIngestPipelines.put(pipeline);
}
} finally {
/**
* No matter what happens, do ingest task bookkeeping.
*/
IngestJob.taskScheduler.notifyTaskCompleted(task);
this.checkForCurrentTasksCompleted();
this.checkForStageCompleted();
}
}
/**
* Adds more files to an ingest job, i.e., derived or carved files. Not
* Adds more files to an ingest job, i.e., extracted or carved files. Not
* currently supported for the second stage of the job.
*
* @param files A list of files to add.
*/
void addFiles(List<AbstractFile> files) {
/**
* Note: This implementation assumes that this is being called by an an
* ingest module running code on an ingest thread that is holding a
* reference to an ingest task, so no task completion check is done.
*/
if (IngestJob.Stages.FIRST == this.stage) {
for (AbstractFile file : files) {
IngestJob.taskScheduler.scheduleFileIngestTask(this, file);
@ -346,10 +380,18 @@ final class IngestJob {
} else {
IngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS
}
/**
* The intended clients of this method are ingest modules running code
* on an ingest thread that is holding a reference to an ingest task, in
* which case a task completion check would not be necessary. This is a
* bit of defensive programming.
*/
this.checkForStageCompleted();
}
/**
* Updates the display name of the data source ingest progress bar.
* Updates the display name of the data source level ingest progress bar.
*
* @param displayName The new display name.
*/
@ -362,8 +404,9 @@ final class IngestJob {
}
/**
* Switches the data source progress bar to determinate mode. This should be
* called if the total work units to process the data source is known.
* Switches the data source level ingest progress bar to determinate mode.
* This should be called if the total work units to process the data source
* is known.
*
* @param workUnits Total number of work units for the processing of the
* data source.
@ -379,9 +422,9 @@ final class IngestJob {
}
/**
* Switches the data source ingest progress bar to indeterminate mode. This
* should be called if the total work units to process the data source is
* unknown.
* Switches the data source level ingest progress bar to indeterminate mode.
* This should be called if the total work units to process the data source
* is unknown.
*/
void switchDataSourceIngestProgressBarToIndeterminate() {
if (!this.cancelled) {
@ -394,8 +437,8 @@ final class IngestJob {
}
/**
* Updates the data source ingest progress bar with the number of work units
* performed, if in the determinate mode.
* Updates the data source level ingest progress bar with the number of work
* units performed, if in the determinate mode.
*
* @param workUnits Number of work units performed.
*/
@ -410,7 +453,8 @@ final class IngestJob {
}
/**
* Updates the data source ingest progress with a new task name.
* Updates the data source level ingest progress with a new task name, where
* the task name is the "subtitle" under the display name.
*
* @param currentTask The task name.
*/
@ -425,8 +469,9 @@ final class IngestJob {
}
/**
* Updates the progress bar with a new task name and the number of work
* units performed, if in the determinate mode.
* Updates the data source level ingest progress bar with a new task name
* and the number of work units performed, if in the determinate mode. The
* task name is the "subtitle" under the display name.
*
* @param currentTask The task name.
* @param workUnits Number of work units performed.
@ -440,9 +485,9 @@ final class IngestJob {
}
/**
* Determines whether or not a temporary cancellation of data source ingest
* in order to stop the currently executing data source ingest module is in
* effect.
* Queries whether or not a temporary cancellation of data source level
* ingest in order to stop the currently executing data source level ingest
* module is in effect.
*
* @return True or false.
*/
@ -451,8 +496,8 @@ final class IngestJob {
}
/**
* Rescind a temporary cancellation of data source ingest used to stop the
* currently executing data source ingest module.
* Rescind a temporary cancellation of data source level ingest that was
* used to stop a single data source level ingest module.
*/
void currentDataSourceIngestModuleCancellationCompleted() {
this.currentDataSourceIngestModuleCancelled = false;
@ -472,13 +517,13 @@ final class IngestJob {
}
/**
* Requests cancellation of ingest, i.e., a shutdown of the data source and
* file ingest pipelines.
* Requests cancellation of ingest, i.e., a shutdown of the data source
* level and file level ingest pipelines.
*/
void cancel() {
/**
* Put a cancellation message on data source ingest progress bar, if it
* is still running.
* Put a cancellation message on data source level ingest progress bar,
* if it is still running.
*/
synchronized (this.dataSourceIngestProgressLock) {
if (dataSourceIngestProgress != null) {
@ -493,8 +538,8 @@ final class IngestJob {
}
/**
* Put a cancellation message on the file ingest progress bar, if it is
* still running.
* Put a cancellation message on the file level ingest progress bar, if
* it is still running.
*/
synchronized (this.fileIngestProgressLock) {
if (this.fileIngestProgress != null) {
@ -514,12 +559,12 @@ final class IngestJob {
* not being performed by an ingest thread.
*/
IngestJob.taskScheduler.cancelPendingTasksForIngestJob(this);
this.checkForCurrentTasksCompleted();
this.checkForStageCompleted();
}
/**
* Queries whether or not cancellation of ingest i.e., a shutdown of the
* data source and file ingest pipelines, has been requested
* data source level and file level ingest pipelines, has been requested.
*
* @return True or false.
*/
@ -571,9 +616,9 @@ final class IngestJob {
* ordered lists of ingest module templates for each ingest pipeline.
*/
IngestPipelinesConfiguration pipelineConfigs = IngestPipelinesConfiguration.getInstance();
List<IngestModuleTemplate> firstStageDataSourceModuleTemplates = this.getConfiguredIngestModuleTemplates(dataSourceModuleTemplates, pipelineConfigs.getStageOneDataSourceIngestPipelineConfig());
List<IngestModuleTemplate> fileIngestModuleTemplates = this.getConfiguredIngestModuleTemplates(fileModuleTemplates, pipelineConfigs.getFileIngestPipelineConfig());
List<IngestModuleTemplate> secondStageDataSourceModuleTemplates = this.getConfiguredIngestModuleTemplates(dataSourceModuleTemplates, pipelineConfigs.getStageTwoDataSourceIngestPipelineConfig());
List<IngestModuleTemplate> firstStageDataSourceModuleTemplates = IngestJob.getConfiguredIngestModuleTemplates(dataSourceModuleTemplates, pipelineConfigs.getStageOneDataSourceIngestPipelineConfig());
List<IngestModuleTemplate> fileIngestModuleTemplates = IngestJob.getConfiguredIngestModuleTemplates(fileModuleTemplates, pipelineConfigs.getFileIngestPipelineConfig());
List<IngestModuleTemplate> secondStageDataSourceModuleTemplates = IngestJob.getConfiguredIngestModuleTemplates(dataSourceModuleTemplates, pipelineConfigs.getStageTwoDataSourceIngestPipelineConfig());
/**
* Add any module templates that were not specified in the pipelines
@ -623,7 +668,7 @@ final class IngestJob {
* names representing an ingest pipeline.
* @return
*/
List<IngestModuleTemplate> getConfiguredIngestModuleTemplates(Map<String, IngestModuleTemplate> ingestModuleTemplates, List<String> pipelineConfig) {
private static List<IngestModuleTemplate> getConfiguredIngestModuleTemplates(Map<String, IngestModuleTemplate> ingestModuleTemplates, List<String> pipelineConfig) {
List<IngestModuleTemplate> templates = new ArrayList<>();
for (String moduleClassName : pipelineConfig) {
if (ingestModuleTemplates.containsKey(moduleClassName)) {
@ -640,7 +685,7 @@ final class IngestJob {
this.stage = IngestJob.Stages.FIRST;
/**
* Start one or both of the first stage progress bars.
* Start one or both of the first stage ingest progress bars.
*/
if (this.hasFirstStageDataSourceIngestPipeline()) {
this.startDataSourceIngestProgressBar();
@ -650,10 +695,12 @@ final class IngestJob {
}
/**
* Make the first stage data source pipeline the current data source
* pipeline.
* Make the first stage data source level ingest pipeline the current
* data source level pipeline.
*/
this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline;
synchronized (this.dataSourceIngestPipelineLock) {
this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline;
}
/**
* Schedule the first stage tasks.
@ -672,7 +719,7 @@ final class IngestJob {
* the tasks scheduler. In this special case, an ingest thread will
* never to check for completion of this stage of the job.
*/
this.checkForCurrentTasksCompleted();
this.checkForStageCompleted();
}
}
@ -682,7 +729,9 @@ final class IngestJob {
private void startSecondStage() {
this.stage = IngestJob.Stages.SECOND;
this.startDataSourceIngestProgressBar();
this.currentDataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline;
synchronized (this.dataSourceIngestPipelineLock) {
this.currentDataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline;
}
IngestJob.taskScheduler.scheduleDataSourceIngestTask(this);
}
@ -698,7 +747,8 @@ final class IngestJob {
}
/**
* Checks to see if this job has a first stage data source ingest pipeline.
* Checks to see if this job has a first stage data source level ingest
* pipeline.
*
* @return True or false.
*/
@ -707,7 +757,8 @@ final class IngestJob {
}
/**
* Checks to see if this job has a second stage data source ingest pipeline.
* Checks to see if this job has a second stage data source level ingest
* pipeline.
*
* @return True or false.
*/
@ -716,7 +767,7 @@ final class IngestJob {
}
/**
* Checks to see if the job has a file ingest pipeline.
* Checks to see if the job has a file level ingest pipeline.
*
* @return True or false.
*/
@ -725,8 +776,8 @@ final class IngestJob {
}
/**
* Starts up each of the file and data source ingest modules to collect
* possible errors.
* Starts up each of the file and data source level ingest modules to
* collect possible errors.
*
* @return A collection of ingest module startup errors, empty on success.
*/
@ -767,7 +818,7 @@ final class IngestJob {
}
/**
* Starts the data source ingest progress bar.
* Starts the data source level ingest progress bar.
*/
private void startDataSourceIngestProgressBar() {
synchronized (this.dataSourceIngestProgressLock) {
@ -800,7 +851,7 @@ final class IngestJob {
}
/**
* Starts the file ingest progress bar.
* Starts the file level ingest progress bar.
*/
private void startFileIngestProgressBar() {
synchronized (this.fileIngestProgressLock) {
@ -826,26 +877,20 @@ final class IngestJob {
/**
* Checks to see if the ingest tasks for the current stage are completed and
* invokes a handler if they are.
* does a stage transition if they are.
*/
private void checkForCurrentTasksCompleted() {
if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) {
this.handleTasksCompleted();
}
}
/**
* Handles when all ingest tasks for this job are completed by finishing the
* current stage and possibly starting the next stage.
*/
private void handleTasksCompleted() {
switch (this.stage) {
case FIRST:
this.finishFirstStage();
break;
case SECOND:
this.finish();
break;
private void checkForStageCompleted() {
synchronized (this.stageCompletionCheckLock) {
if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) {
switch (this.stage) {
case FIRST:
this.finishFirstStage();
break;
case SECOND:
this.finish();
break;
}
}
}
}
@ -898,6 +943,8 @@ final class IngestJob {
* Shuts down the ingest pipelines and progress bars for this job.
*/
private void finish() {
this.stage = IngestJob.Stages.FINALIZATION;
// Finish the second stage data source ingest progress bar, if it hasn't
// already been finished.
synchronized (this.dataSourceIngestProgressLock) {
@ -929,8 +976,8 @@ final class IngestJob {
}
/**
* Requests a temporary cancellation of data source ingest in order to stop
* the currently executing data source ingest module.
* Requests a temporary cancellation of data source level ingest in order to
* stop the currently executing data source ingest module.
*/
private void cancelCurrentDataSourceIngestModule() {
this.currentDataSourceIngestModuleCancelled = true;
@ -943,6 +990,7 @@ final class IngestJob {
*/
private IngestJobSnapshot getSnapshot() {
return new IngestJobSnapshot();
}
/**