Fix deadlock potential in IngestJob/DataSourceIngestJob interactions

This commit is contained in:
Richard Cordovano 2015-02-03 19:21:20 -05:00
parent 462fcb5413
commit 1729e4ca5b

View File

@ -22,9 +22,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.sleuthkit.datamodel.Content;
@ -39,7 +40,8 @@ public final class IngestJob {
private static final AtomicLong nextId = new AtomicLong(0L);
private final long id;
private final Map<Long, DataSourceIngestJob> dataSourceJobs;
private boolean cancelled;
private final AtomicInteger incompleteJobsCount;
private volatile boolean cancelled;
/**
* Constructs an ingest job that runs a collection of data sources through a
@ -52,11 +54,12 @@ public final class IngestJob {
*/
IngestJob(Collection<Content> dataSources, IngestJobSettings settings, boolean runInteractively) {
this.id = IngestJob.nextId.getAndIncrement();
this.dataSourceJobs = new HashMap<>();
this.dataSourceJobs = new ConcurrentHashMap<>();
for (Content dataSource : dataSources) {
DataSourceIngestJob dataSourceIngestJob = new DataSourceIngestJob(this, dataSource, settings, runInteractively);
this.dataSourceJobs.put(dataSourceIngestJob.getId(), dataSourceIngestJob);
}
incompleteJobsCount = new AtomicInteger(dataSourceJobs.size());
}
/**
@ -74,7 +77,7 @@ public final class IngestJob {
*
* @return True or false.
*/
synchronized boolean hasIngestPipeline() {
boolean hasIngestPipeline() {
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
if (dataSourceJob.hasIngestPipeline()) {
return true;
@ -89,16 +92,29 @@ public final class IngestJob {
*
* @return A collection of ingest module start up errors, empty on success.
*/
synchronized List<IngestModuleError> start() {
List<IngestModuleError> start() {
List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
errors.addAll(dataSourceJob.start());
if (!errors.isEmpty()) {
// RJCTODO: Need to let sucessfully started data source ingest
// jobs know they should shut down.
break;
}
}
/**
* TODO: Need to let successfully started data source ingest jobs know
* they should shut down. This means that the start up of the ingest
* module pipelines and the submission of ingest tasks should be
* separated. This cancellation is just a stop gap; fortunately, if
* startup is going to fail, it will likely fail for the first child
* data source ingest job.
*/
if (!errors.isEmpty()) {
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
dataSourceJob.cancel();
}
}
return errors;
}
@ -107,7 +123,7 @@ public final class IngestJob {
*
* @return The snapshot.
*/
synchronized public ProgressSnapshot getSnapshot() {
public ProgressSnapshot getSnapshot() {
return new ProgressSnapshot();
}
@ -117,7 +133,7 @@ public final class IngestJob {
*
* @return A list of data source ingest job progress snapshots.
*/
synchronized List<DataSourceIngestJob.Snapshot> getDataSourceIngestJobSnapshots() {
List<DataSourceIngestJob.Snapshot> getDataSourceIngestJobSnapshots() {
List<DataSourceIngestJob.Snapshot> snapshots = new ArrayList<>();
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
snapshots.add(dataSourceJob.getSnapshot());
@ -131,7 +147,7 @@ public final class IngestJob {
* but there may be a delay before all of the ingest modules in the
* pipelines respond by stopping processing.
*/
synchronized public void cancel() {
public void cancel() {
for (DataSourceIngestJob job : this.dataSourceJobs.values()) {
job.cancel();
}
@ -144,7 +160,7 @@ public final class IngestJob {
*
* @return True or false.
*/
synchronized public boolean isCancelled() {
public boolean isCancelled() {
return this.cancelled;
}
@ -154,9 +170,8 @@ public final class IngestJob {
*
* @param dataSourceIngestJob A completed data source ingest job.
*/
synchronized void dataSourceJobFinished(DataSourceIngestJob dataSourceIngestJob) {
this.dataSourceJobs.remove(dataSourceIngestJob.getId());
if (this.dataSourceJobs.isEmpty()) {
void dataSourceJobFinished(DataSourceIngestJob dataSourceIngestJob) {
if (incompleteJobsCount.decrementAndGet() == 0) {
IngestManager.getInstance().finishIngestJob(this);
}
}