Merge pull request #725 from rcordovano/ingest_scheduler_job_cancel_enhancement

Ingest scheduler job cancel enhancement
Richard Cordovano 2014-05-16 15:42:59 -04:00
commit 35a2ffa896
2 changed files with 77 additions and 51 deletions

File: IngestJob.java

@@ -65,13 +65,12 @@ final class IngestJob {
         long jobId = nextIngestJobId.incrementAndGet();
         IngestJob job = new IngestJob(jobId, dataSource, ingestModuleTemplates, processUnallocatedSpace);
         ingestJobsById.put(jobId, job);
-        IngestManager.getInstance().fireIngestJobStarted(jobId);
         List<IngestModuleError> errors = job.start();
         if (errors.isEmpty()) {
-            taskScheduler.scheduleTasksForIngestJob(job, dataSource);
+            IngestManager.getInstance().fireIngestJobStarted(jobId);
+            taskScheduler.addTasksForIngestJob(job, dataSource);
         } else {
             ingestJobsById.remove(jobId);
-            IngestManager.getInstance().fireIngestJobCancelled(jobId);
         }
         return errors;
     }
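Note on the hunk above: the "started" event now fires only after the module pipelines start cleanly, and a failed start simply unregisters the job rather than firing a "cancelled" event. Below is a minimal, self-contained sketch of that ordering; Job, Events, and Scheduler are hypothetical stand-ins, not the Autopsy API.

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-ins; not the Autopsy classes.
    final class JobStartSketch {
        interface Job { List<String> start(); }          // returns start-up errors
        interface Events { void fireStarted(long jobId); }
        interface Scheduler { void addTasks(long jobId); }

        private static final AtomicLong nextJobId = new AtomicLong();
        private static final Map<Long, Job> jobsById = new ConcurrentHashMap<>();

        static List<String> startJob(Job job, Events events, Scheduler scheduler) {
            long jobId = nextJobId.incrementAndGet();
            jobsById.put(jobId, job);
            List<String> errors = job.start();
            if (errors.isEmpty()) {
                // Publish "started" only after the pipelines came up cleanly,
                // then hand the job's tasks to the scheduler.
                events.fireStarted(jobId);
                scheduler.addTasks(jobId);
            } else {
                // Failed start-up: just unregister; no "cancelled" event is fired
                // because the job never observably started.
                jobsById.remove(jobId);
            }
            return errors;
        }
    }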
@@ -106,7 +105,7 @@ final class IngestJob {
         return processUnallocatedSpace;
     }

-    List<IngestModuleError> start() throws InterruptedException {
+    private List<IngestModuleError> start() throws InterruptedException {
         List<IngestModuleError> errors = startUpIngestPipelines();
         if (errors.isEmpty()) {
             startFileIngestProgressBar();
@@ -180,24 +179,28 @@ final class IngestJob {
     }

     void process(DataSourceIngestTask task) throws InterruptedException {
+        // If the job is not cancelled, complete the task, otherwise just flush
+        // it.
         if (!isCancelled()) {
             List<IngestModuleError> errors = new ArrayList<>();
             errors.addAll(dataSourceIngestPipeline.process(task.getDataSource(), dataSourceTasksProgress));
             if (!errors.isEmpty()) {
                 logIngestModuleErrors(errors);
             }
-            dataSourceTasksProgress.finish();
+        } else {
+            taskScheduler.removeTasksForIngestJob(id);
         }
+        // Because there is only one data source task per job, it is o.k. to
+        // call ProgressHandle.finish() now that the data source ingest modules
+        // are through using the progress bar via the DataSourceIngestModuleProgress wrapper.
+        // Calling ProgressHandle.finish() again in finish() will be harmless.
+        dataSourceTasksProgress.finish();
         if (taskScheduler.isLastTaskForIngestJob(task)) {
             finish();
         }
     }

     void process(FileIngestTask task) throws InterruptedException {
+        // If the job is not cancelled, complete the task, otherwise just flush
+        // it.
         if (!isCancelled()) {
             AbstractFile file = task.getFile();
             synchronized (this) {
@@ -215,7 +218,10 @@ final class IngestJob {
             if (!errors.isEmpty()) {
                 logIngestModuleErrors(errors);
             }
+        } else {
+            taskScheduler.removeTasksForIngestJob(id);
         }
         if (taskScheduler.isLastTaskForIngestJob(task)) {
             finish();
         }
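Both process() hunks apply the same "complete the task or flush it" rule: a cancelled job no longer runs its queued tasks, it asks the scheduler to drop them, and the last task to come through still drives finish(). A rough worker-side sketch of that pattern follows; Task, TaskBook, and flushTasksFor are illustrative names, not the real Autopsy methods.

    // Hypothetical sketch of the "complete or flush" rule used by both process() methods.
    final class CancelAwareWorkerSketch {
        interface Task { long jobId(); void run(); }
        interface TaskBook {
            boolean isJobCancelled(long jobId);
            void flushTasksFor(long jobId);       // drop queued-but-unstarted tasks
            boolean isLastTaskForJob(Task task);
        }

        static void process(Task task, TaskBook book, Runnable finishJob) {
            // If the job is not cancelled, complete the task; otherwise just flush
            // the job's remaining queued tasks so the queues drain quickly.
            if (!book.isJobCancelled(task.jobId())) {
                task.run();
            } else {
                book.flushTasksFor(task.jobId());
            }
            // Either way, the last task observed for the job finalizes it.
            if (book.isLastTaskForJob(task)) {
                finishJob.run();
            }
        }
    }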
@@ -227,13 +233,13 @@ final class IngestJob {
             FileIngestPipeline pipeline = fileIngestPipelines.poll();
             errors.addAll(pipeline.shutDown());
         }
-        fileTasksProgress.finish();
         if (!errors.isEmpty()) {
             logIngestModuleErrors(errors);
         }
+        dataSourceTasksProgress.finish();
+        fileTasksProgress.finish();
         ingestJobsById.remove(id);
-        if (!cancelled) {
+        if (!isCancelled()) {
             IngestManager.getInstance().fireIngestJobCompleted(id);
         }
     }
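The diff leans on the in-code comment's claim that calling ProgressHandle.finish() a second time from finish() is harmless. If that guarantee were ever in doubt, an explicit once-only guard is a cheap alternative; a sketch under that assumption is below. GuardedProgress is hypothetical and not part of NetBeans or Autopsy.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical wrapper that makes "finish" explicitly idempotent.
    final class GuardedProgress {
        private final Runnable finishOnce;           // e.g. () -> progressHandle.finish()
        private final AtomicBoolean finished = new AtomicBoolean();

        GuardedProgress(Runnable finishOnce) {
            this.finishOnce = finishOnce;
        }

        void finish() {
            // compareAndSet ensures the underlying finish runs at most once,
            // no matter how many code paths call it.
            if (finished.compareAndSet(false, true)) {
                finishOnce.run();
            }
        }
    }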
@@ -248,10 +254,8 @@ final class IngestJob {
         return cancelled;
     }

-    void cancel() {
+    private void cancel() {
         cancelled = true;
-        fileTasksProgress.finish();
-        dataSourceTasksProgress.finish();
         IngestManager.getInstance().fireIngestJobCancelled(id);
     }
 }
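After this change cancel() only records the cancellation and publishes the cancelled event; progress-bar cleanup moves to the single finish()/process() path. A compressed sketch of that division of labor, with illustrative names rather than the Autopsy API:

    // Illustrative sketch: cancel() records intent and notifies listeners;
    // all cleanup happens on the normal completion path.
    final class CancelFlowSketch {
        interface Listeners { void fireCancelled(long jobId); void fireCompleted(long jobId); }

        private final long jobId;
        private final Listeners listeners;
        private volatile boolean cancelled;

        CancelFlowSketch(long jobId, Listeners listeners) {
            this.jobId = jobId;
            this.listeners = listeners;
        }

        void cancel() {
            cancelled = true;                 // observed by the worker threads
            listeners.fireCancelled(jobId);   // no progress-bar work here any more
        }

        void finish() {
            // Progress handles, queues, and registries are cleaned up here,
            // whether the job ran to completion or was cancelled part-way.
            if (!cancelled) {
                listeners.fireCompleted(jobId);
            }
        }
    }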

File: IngestScheduler.java

@@ -21,6 +21,7 @@ package org.sleuthkit.autopsy.ingest;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -55,22 +56,15 @@ final class IngestScheduler {
     private IngestScheduler() {
     }

-    synchronized void scheduleTasksForIngestJob(IngestJob job, Content dataSource) throws InterruptedException {
+    synchronized void addTasksForIngestJob(IngestJob job, Content dataSource) throws InterruptedException {
         // Enqueue a data source ingest task for the data source.
-        // If the thread executing this code is interrupted, it is because the
-        // the number of ingest threads has been decreased while ingest jobs are
-        // running. The calling thread will exit in an orderly fashion, but the
-        // task still needs to be enqueued rather than lost, hence the loop.
         DataSourceIngestTask task = new DataSourceIngestTask(job, dataSource);
-        while (true) {
-            try {
-                dataSourceTasks.put(task);
-                break;
-            } catch (InterruptedException ex) {
-                // Reset the interrupted status of the thread so the orderly
-                // exit can occur in the intended place.
-                Thread.currentThread().interrupt();
-            }
+        try {
+            dataSourceTasks.put(task);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
+            return;
         }

         // Get the top level files of the data source.
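The scheduler now treats an interrupt during BlockingQueue.put() as a signal to stop scheduling: it restores the thread's interrupt status, logs at FINE, and returns, replacing the old loop that kept retrying the put so the task would not be lost. A standalone sketch of that pattern follows; EnqueueSketch and its Runnable payload are placeholders for the Autopsy task types.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    // Placeholder types; the real code enqueues FileIngestTask/DataSourceIngestTask.
    final class EnqueueSketch {
        private static final Logger logger = Logger.getLogger(EnqueueSketch.class.getName());
        private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        void enqueue(Runnable task) {
            try {
                tasks.put(task);
            } catch (InterruptedException ex) {
                // Restore the interrupt status so callers further up the stack can
                // see it and exit in an orderly fashion, then stop scheduling.
                Thread.currentThread().interrupt();
                logger.log(Level.FINE, "Task scheduling interrupted", ex);
            }
        }
    }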
@@ -117,7 +111,40 @@ final class IngestScheduler {
     void addFileTaskToIngestJob(IngestJob job, AbstractFile file) {
         FileIngestTask task = new FileIngestTask(job, file);
         if (shouldEnqueueFileTask(task)) {
-            addTaskToFileQueue(task);
+            try {
+                fileTasks.put(task);
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
+            }
+        }
+    }
+
+    synchronized void removeTasksForIngestJob(long ingestJobId) {
+        // Remove all tasks for this ingest job that are not in progress.
+        Iterator<FileIngestTask> fileTasksIterator = fileTasks.iterator();
+        while (fileTasksIterator.hasNext()) {
+            if (fileTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                fileTasksIterator.remove();
+            }
+        }
+        Iterator<FileIngestTask> directoryTasksIterator = directoryTasks.iterator();
+        while (directoryTasksIterator.hasNext()) {
+            if (directoryTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                directoryTasksIterator.remove();
+            }
+        }
+        Iterator<FileIngestTask> rootDirectoryTasksIterator = rootDirectoryTasks.iterator();
+        while (rootDirectoryTasksIterator.hasNext()) {
+            if (rootDirectoryTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                rootDirectoryTasksIterator.remove();
+            }
+        }
+        Iterator<DataSourceIngestTask> dataSourceTasksIterator = dataSourceTasks.iterator();
+        while (dataSourceTasksIterator.hasNext()) {
+            if (dataSourceTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                dataSourceTasksIterator.remove();
+            }
         }
     }
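removeTasksForIngestJob walks each queue with an explicit Iterator so it can drop pending tasks that belong to the cancelled job while leaving in-progress work alone. On Java 8 or later the same sweep could be written with Collection.removeIf; a sketch under that assumption, with placeholder task types, is shown below.

    import java.util.Collection;
    import java.util.concurrent.LinkedBlockingQueue;

    // Placeholder sketch; Task stands in for FileIngestTask/DataSourceIngestTask.
    final class RemoveByJobSketch {
        interface Task { long jobId(); }

        private final Collection<Task> fileTasks = new LinkedBlockingQueue<>();
        private final Collection<Task> dataSourceTasks = new LinkedBlockingQueue<>();

        // Equivalent to the iterator loops above: discard every queued task that
        // belongs to the given job. Tasks already handed to a worker are unaffected.
        synchronized void removeTasksForJob(long jobId) {
            fileTasks.removeIf(task -> task.jobId() == jobId);
            dataSourceTasks.removeIf(task -> task.jobId() == jobId);
        }
    }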
@@ -148,7 +175,13 @@ final class IngestScheduler {
             final AbstractFile parentFile = parentTask.getFile();
             // add itself to the file list
             if (shouldEnqueueFileTask(parentTask)) {
-                addTaskToFileQueue(parentTask);
+                try {
+                    fileTasks.put(parentTask);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
+                    return;
+                }
             }
             // add its children to the file and directory lists
             try {
@@ -160,7 +193,13 @@ final class IngestScheduler {
                     if (childFile.hasChildren()) {
                         directoryTasks.add(childTask);
                     } else if (shouldEnqueueFileTask(childTask)) {
-                        addTaskToFileQueue(childTask);
+                        try {
+                            fileTasks.put(childTask);
+                        } catch (InterruptedException ex) {
+                            Thread.currentThread().interrupt();
+                            logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
+                            return;
+                        }
                     }
                 }
             }
@@ -170,23 +209,6 @@ final class IngestScheduler {
         }
     }

-    private void addTaskToFileQueue(FileIngestTask task) {
-        // If the thread executing this code is interrupted, it is because the
-        // the number of ingest threads has been decreased while ingest jobs are
-        // running. The calling thread will exit in an orderly fashion, but the
-        // task still needs to be enqueued rather than lost.
-        while (true) {
-            try {
-                fileTasks.put(task);
-                break;
-            } catch (InterruptedException ex) {
-                // Reset the interrupted status of the thread so the orderly
-                // exit can occur in the intended place.
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
     private static boolean shouldEnqueueFileTask(final FileIngestTask processTask) {
         final AbstractFile aFile = processTask.getFile();
         //if it's unalloc file, skip if so scheduled