Removed sleeps from IngestManager.stoAll(), added more cancel checks

This commit is contained in:
Richard Cordovano 2014-04-04 09:43:28 -04:00
parent f390695795
commit 13a176f413

View File

@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import org.openide.util.NbBundle; import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.autopsy.coreutils.Logger;
@ -362,12 +363,6 @@ public class IngestManager {
// First get the task scheduling worker to stop adding new tasks. // First get the task scheduling worker to stop adding new tasks.
if (taskSchedulingWorker != null) { if (taskSchedulingWorker != null) {
taskSchedulingWorker.cancel(true); taskSchedulingWorker.cancel(true);
while (!taskSchedulingWorker.isDone()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}
taskSchedulingWorker = null; taskSchedulingWorker = null;
} }
@ -378,16 +373,15 @@ public class IngestManager {
job.cancel(); job.cancel();
} }
// Jettision the remaining tasks. This will dispose of any tasks that
// the scheduling worker queued up before it was cancelled.
scheduler.getFileScheduler().empty();
scheduler.getDataSourceScheduler().empty();
// Cancel the data source task worker. It will release its pipelines // Cancel the data source task worker. It will release its pipelines
// in its done() method and the pipelines will shut down their modules. // in its done() method and the pipelines will shut down their modules.
if (dataSourceTaskWorker != null) { if (dataSourceTaskWorker != null) {
dataSourceTaskWorker.cancel(true); dataSourceTaskWorker.cancel(true);
while (!dataSourceTaskWorker.isDone()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}
dataSourceTaskWorker = null; dataSourceTaskWorker = null;
} }
@ -397,26 +391,20 @@ public class IngestManager {
for (FileTaskWorker worker : fileTaskWorkers) { for (FileTaskWorker worker : fileTaskWorkers) {
if (worker != null) { if (worker != null) {
worker.cancel(true); worker.cancel(true);
while (!worker.isDone()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
}
}
worker = null; worker = null;
} }
} }
// Jettision the remaining tasks. This will dispose of any tasks that // Jettision the remaining tasks again to try to dispose of any tasks
// the scheduling worker queued up before it was cancelled. // queued up task workers before they were cancelled.
scheduler.getFileScheduler().empty(); scheduler.getFileScheduler().empty();
scheduler.getDataSourceScheduler().empty(); scheduler.getDataSourceScheduler().empty();
} }
/** /**
* Test if any ingest modules are running * Test if any ingest jobs are in progress.
* *
* @return true if any module is running, false otherwise * @return True if any ingest jobs are in progress, false otherwise
*/ */
public synchronized boolean isIngestRunning() { public synchronized boolean isIngestRunning() {
// TODO: There is a race condition here with SwingWorker.isDone(). // TODO: There is a race condition here with SwingWorker.isDone().
@ -474,6 +462,7 @@ public class IngestManager {
private final List<IngestModuleTemplate> moduleTemplates; private final List<IngestModuleTemplate> moduleTemplates;
private final boolean processUnallocatedSpace; private final boolean processUnallocatedSpace;
private ProgressHandle progress; private ProgressHandle progress;
private volatile boolean finished = false;
TaskSchedulingWorker(List<Content> dataSources, List<IngestModuleTemplate> moduleTemplates, boolean processUnallocatedSpace) { TaskSchedulingWorker(List<Content> dataSources, List<IngestModuleTemplate> moduleTemplates, boolean processUnallocatedSpace) {
this.dataSources = dataSources; this.dataSources = dataSources;
@ -483,13 +472,10 @@ public class IngestManager {
@Override @Override
protected Object doInBackground() throws Exception { protected Object doInBackground() throws Exception {
// Set up a progress bar that can be used to cancel all of the
// ingest jobs currently being performed.
final String displayName = "Queueing ingest tasks"; final String displayName = "Queueing ingest tasks";
progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
@Override @Override
public boolean cancel() { public boolean cancel() {
logger.log(Level.INFO, "Queueing ingest cancelled by user.");
if (progress != null) { if (progress != null) {
progress.setDisplayName(displayName + " (Cancelling...)"); progress.setDisplayName(displayName + " (Cancelling...)");
} }
@ -502,10 +488,8 @@ public class IngestManager {
int processed = 0; int processed = 0;
for (Content dataSource : dataSources) { for (Content dataSource : dataSources) {
if (isCancelled()) { if (isCancelled()) {
logger.log(Level.INFO, "Task scheduling thread cancelled");
return null; return null;
} }
final String inputName = dataSource.getName(); final String inputName = dataSource.getName();
IngestJob ingestJob = new IngestJob(IngestManager.this.getNextDataSourceTaskId(), dataSource, moduleTemplates, processUnallocatedSpace); IngestJob ingestJob = new IngestJob(IngestManager.this.getNextDataSourceTaskId(), dataSource, moduleTemplates, processUnallocatedSpace);
@ -521,7 +505,7 @@ public class IngestManager {
failedModules.append(","); failedModules.append(",");
} }
} }
MessageNotifyUtil.Message.error( MessageNotifyUtil.Message.error( // RJCTODO: Fix this
"Failed to start the following ingest modules: " + failedModules.toString() + " .\n\n" "Failed to start the following ingest modules: " + failedModules.toString() + " .\n\n"
+ "No ingest modules will be run. Please disable the module " + "No ingest modules will be run. Please disable the module "
+ "or fix the error and restart ingest by right clicking on " + "or fix the error and restart ingest by right clicking on "
@ -552,7 +536,6 @@ public class IngestManager {
try { try {
super.get(); super.get();
} catch (CancellationException | InterruptedException ex) { } catch (CancellationException | InterruptedException ex) {
// IngestManager.stopAll() will dispose of all tasks.
} catch (Exception ex) { } catch (Exception ex) {
logger.log(Level.SEVERE, "Error while scheduling ingest jobs", ex); logger.log(Level.SEVERE, "Error while scheduling ingest jobs", ex);
MessageNotifyUtil.Message.error("An error occurred while starting ingest. Results may only be partial"); MessageNotifyUtil.Message.error("An error occurred while starting ingest. Results may only be partial");
@ -561,8 +544,13 @@ public class IngestManager {
startAll(); startAll();
} }
progress.finish(); progress.finish();
finished = true;
} }
} }
boolean isFinished() {
return finished;
}
} }
/** /**
@ -572,6 +560,7 @@ public class IngestManager {
class DataSourceTaskWorker extends SwingWorker<Object, Void> { class DataSourceTaskWorker extends SwingWorker<Object, Void> {
private final long id; private final long id;
private volatile boolean finished = false;
DataSourceTaskWorker(long threadId) { DataSourceTaskWorker(long threadId) {
this.id = threadId; this.id = threadId;
@ -579,18 +568,18 @@ public class IngestManager {
@Override @Override
protected Void doInBackground() throws Exception { protected Void doInBackground() throws Exception {
logger.log(Level.INFO, "Data source ingest thread (id={0}) started", this.id);
IngestScheduler.DataSourceScheduler scheduler = IngestScheduler.getInstance().getDataSourceScheduler(); IngestScheduler.DataSourceScheduler scheduler = IngestScheduler.getInstance().getDataSourceScheduler();
while (scheduler.hasNext()) { while (scheduler.hasNext()) {
if (isCancelled()) { if (isCancelled()) {
logger.log(Level.INFO, "Data source ingest thread (id={0}) cancelled", this.id);
return null; return null;
} }
IngestJob job = scheduler.next(); IngestJob job = scheduler.next();
DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(this.id); DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(this.id);
pipeline.process(this, job.getDataSourceTaskProgressBar()); pipeline.process(this, job.getDataSourceTaskProgressBar());
if (isCancelled()) {
return null;
}
} }
logger.log(Level.INFO, "Data source ingest thread (id={0}) completed", this.id);
return null; return null;
} }
@ -599,14 +588,18 @@ public class IngestManager {
try { try {
super.get(); super.get();
} catch (CancellationException | InterruptedException e) { } catch (CancellationException | InterruptedException e) {
logger.log(Level.INFO, "Data source ingest thread (id={0}) cancelled", this.id);
} catch (Exception ex) { } catch (Exception ex) {
String message = String.format("Data source ingest thread (id=%d) experienced a fatal error", this.id); String message = String.format("Data source ingest thread (id=%d) experienced a fatal error", this.id);
logger.log(Level.SEVERE, message, ex); logger.log(Level.SEVERE, message, ex);
} finally { } finally {
IngestManager.getInstance().reportThreadDone(this.id); IngestManager.getInstance().reportThreadDone(this.id);
finished = true;
} }
} }
boolean isFinished() {
return finished;
}
} }
/** /**
@ -616,6 +609,7 @@ public class IngestManager {
class FileTaskWorker extends SwingWorker<Object, Void> { class FileTaskWorker extends SwingWorker<Object, Void> {
private final long id; private final long id;
private volatile boolean finished = false;
FileTaskWorker(long threadId) { FileTaskWorker(long threadId) {
this.id = threadId; this.id = threadId;
@ -623,11 +617,9 @@ public class IngestManager {
@Override @Override
protected Object doInBackground() throws Exception { protected Object doInBackground() throws Exception {
logger.log(Level.INFO, "File ingest thread (id={0}) started", this.id);
IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler(); IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler();
while (fileScheduler.hasNext()) { while (fileScheduler.hasNext()) {
if (isCancelled()) { if (isCancelled()) {
logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", this.id);
return null; return null;
} }
IngestScheduler.FileScheduler.FileTask task = fileScheduler.next(); IngestScheduler.FileScheduler.FileTask task = fileScheduler.next();
@ -635,8 +627,10 @@ public class IngestManager {
FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(this.id); FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(this.id);
job.handleFileTaskStarted(task); job.handleFileTaskStarted(task);
pipeline.process(task.getFile()); pipeline.process(task.getFile());
if (isCancelled()) {
return null;
}
} }
logger.log(Level.INFO, "File ingest thread (id={0}) completed", this.id);
return null; return null;
} }
@ -645,13 +639,17 @@ public class IngestManager {
try { try {
super.get(); super.get();
} catch (CancellationException | InterruptedException e) { } catch (CancellationException | InterruptedException e) {
logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", this.id);
} catch (Exception ex) { } catch (Exception ex) {
String message = String.format("File ingest thread {0} experienced a fatal error", this.id); String message = String.format("File ingest thread {0} experienced a fatal error", this.id);
logger.log(Level.SEVERE, message, ex); logger.log(Level.SEVERE, message, ex);
} finally { } finally {
IngestManager.getInstance().reportThreadDone(this.id); IngestManager.getInstance().reportThreadDone(this.id);
finished = true;
} }
} }
boolean isFinished() {
return finished;
}
} }
} }