From ca1db52d096ad86f482857e3e1ff72aaaa740af0 Mon Sep 17 00:00:00 2001
From: Richard Cordovano
Date: Fri, 16 May 2014 11:30:10 -0400
Subject: [PATCH 1/3] Add tasks flush to IngestScheduler for job cancellation

---
 .../sleuthkit/autopsy/ingest/IngestJob.java   |  7 ++--
 .../autopsy/ingest/IngestScheduler.java       | 40 ++++++++++++++++---
 2 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
index 76e6cf5590..667d20fa58 100644
--- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
+++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
@@ -117,7 +117,7 @@ final class IngestJob {
 
     private List<IngestModuleError> startUpIngestPipelines() throws InterruptedException {
         IngestJobContext context = new IngestJobContext(this);
-        
+
         dataSourceIngestPipeline = new DataSourceIngestPipeline(context, ingestModuleTemplates);
         List<IngestModuleError> errors = new ArrayList<>();
         errors.addAll(dataSourceIngestPipeline.startUp());
@@ -227,13 +227,13 @@ final class IngestJob {
             FileIngestPipeline pipeline = fileIngestPipelines.poll();
             errors.addAll(pipeline.shutDown());
         }
-        fileTasksProgress.finish();
         if (!errors.isEmpty()) {
             logIngestModuleErrors(errors);
         }
         ingestJobsById.remove(id);
         if (!cancelled) {
+            fileTasksProgress.finish();
             IngestManager.getInstance().fireIngestJobCompleted(id);
         }
     }
@@ -248,7 +248,8 @@ final class IngestJob {
         return cancelled;
     }
 
-    void cancel() {
+    private void cancel() {
+        taskScheduler.removeAllTasksForIngestJob(id);
         cancelled = true;
         fileTasksProgress.finish();
         dataSourceTasksProgress.finish();
diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
index 9b22267ff6..330ddb6c6a 100755
--- a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
+++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
@@ -21,6 +21,7 @@ package org.sleuthkit.autopsy.ingest;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.TreeSet;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -57,7 +58,7 @@ final class IngestScheduler {
 
     synchronized void scheduleTasksForIngestJob(IngestJob job, Content dataSource) throws InterruptedException {
         // Enqueue a data source ingest task for the data source.
-        // If the thread executing this code is interrupted, it is because the
+        // If the thread executing this code is interrupted, tasksInProgressIterator is because the
         // the number of ingest threads has been decreased while ingest jobs are
         // running. The calling thread will exit in an orderly fashion, but the
         // task still needs to be enqueued rather than lost, hence the loop.
@@ -85,12 +86,12 @@ final class IngestScheduler {
             try {
                 children = root.getChildren();
                 if (children.isEmpty()) {
-                    // Add the root object itself, it could be an unallocated space
+                    // Add the root object itself, tasksInProgressIterator could be an unallocated space
                     // file, or a child of a volume or an image.
                     toptLevelFiles.add(root);
                 } else {
                     // The root object is a file system root directory, get
-                    // the files within it.
+                    // the files within tasksInProgressIterator.
                     for (Content child : children) {
                         if (child instanceof AbstractFile) {
                             toptLevelFiles.add((AbstractFile) child);
@@ -121,6 +122,33 @@ final class IngestScheduler {
         }
     }
 
+    synchronized void removeAllTasksForIngestJob(long ingestJobId) {
+        Iterator<FileIngestTask> fileTasksIterator = fileTasks.iterator();
+        while (fileTasksIterator.hasNext()) {
+            if (fileTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                fileTasksIterator.remove();
+            }
+        }
+        Iterator<FileIngestTask> directoryTasksIterator = directoryTasks.iterator();
+        while (directoryTasksIterator.hasNext()) {
+            if (directoryTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                directoryTasksIterator.remove();
+            }
+        }
+        Iterator<FileIngestTask> rootDirectoryTasksIterator = rootDirectoryTasks.iterator();
+        while (rootDirectoryTasksIterator.hasNext()) {
+            if (rootDirectoryTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                rootDirectoryTasksIterator.remove();
+            }
+        }
+        Iterator<DataSourceIngestTask> dataSourceTasksIterator = dataSourceTasks.iterator();
+        while (dataSourceTasksIterator.hasNext()) {
+            if (dataSourceTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                dataSourceTasksIterator.remove();
+            }
+        }
+    }
+
     private synchronized void updateFileTaskQueues(FileIngestTask taskInProgress) throws InterruptedException {
         if (taskInProgress != null) {
             tasksInProgress.add(taskInProgress);
@@ -133,7 +161,7 @@ final class IngestScheduler {
         if (fileTasks.isEmpty() == false) {
             return;
         }
-        // fill in the directory queue if it is empty.
+        // fill in the directory queue if tasksInProgressIterator is empty.
         if (this.directoryTasks.isEmpty()) {
             // bail out if root is also empty -- we are done
             if (rootDirectoryTasks.isEmpty()) {
@@ -171,7 +199,7 @@ final class IngestScheduler {
     }
 
     private void addTaskToFileQueue(FileIngestTask task) {
-        // If the thread executing this code is interrupted, it is because the
+        // If the thread executing this code is interrupted, tasksInProgressIterator is because the
         // the number of ingest threads has been decreased while ingest jobs are
         // running. The calling thread will exit in an orderly fashion, but the
         // task still needs to be enqueued rather than lost.
@@ -189,7 +217,7 @@ final class IngestScheduler {
     private static boolean shouldEnqueueFileTask(final FileIngestTask processTask) {
         final AbstractFile aFile = processTask.getFile();
-        //if it's unalloc file, skip if so scheduled
+        //if tasksInProgressIterator's unalloc file, skip if so scheduled
         if (processTask.getIngestJob().shouldProcessUnallocatedSpace() == false
                 && aFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) {
             return false;
         }

From 5130353ef32cccfc9c1daff60bb7ffb6fbee2e77 Mon Sep 17 00:00:00 2001
From: Richard Cordovano
Date: Fri, 16 May 2014 12:54:01 -0400
Subject: [PATCH 2/3] Change ingest cancel

---
 .../sleuthkit/autopsy/ingest/IngestJob.java   | 26 +++++++++----------
 .../autopsy/ingest/IngestScheduler.java       | 27 -------------------
 2 files changed, 12 insertions(+), 41 deletions(-)

diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
index 667d20fa58..5023ff0cfb 100644
--- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
+++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
@@ -65,13 +65,12 @@ final class IngestJob {
         long jobId = nextIngestJobId.incrementAndGet();
         IngestJob job = new IngestJob(jobId, dataSource, ingestModuleTemplates, processUnallocatedSpace);
         ingestJobsById.put(jobId, job);
-        IngestManager.getInstance().fireIngestJobStarted(jobId);
         List<IngestModuleError> errors = job.start();
         if (errors.isEmpty()) {
+            IngestManager.getInstance().fireIngestJobStarted(jobId);
             taskScheduler.scheduleTasksForIngestJob(job, dataSource);
         } else {
             ingestJobsById.remove(jobId);
-            IngestManager.getInstance().fireIngestJobCancelled(jobId);
         }
         return errors;
     }
@@ -106,7 +105,7 @@ final class IngestJob {
         return processUnallocatedSpace;
     }
 
-    List<IngestModuleError> start() throws InterruptedException {
+    private List<IngestModuleError> start() throws InterruptedException {
         List<IngestModuleError> errors = startUpIngestPipelines();
         if (errors.isEmpty()) {
             startFileIngestProgressBar();
@@ -180,24 +179,26 @@ final class IngestJob {
     }
 
     void process(DataSourceIngestTask task) throws InterruptedException {
-        // If the job is not cancelled, complete the task, otherwise just flush
-        // it.
         if (!isCancelled()) {
             List<IngestModuleError> errors = new ArrayList<>();
             errors.addAll(dataSourceIngestPipeline.process(task.getDataSource(), dataSourceTasksProgress));
             if (!errors.isEmpty()) {
                 logIngestModuleErrors(errors);
             }
-            dataSourceTasksProgress.finish();
         }
+
+        // Because there is only one data source task per job, it is o.k. to
+        // call ProgressHandle.finish() now that the data source ingest modules
+        // are through using it via the DataSourceIngestModuleProgress wrapper.
+        // Calling ProgressHandle.finish() again in finish() will be harmless.
+        dataSourceTasksProgress.finish();
+
         if (taskScheduler.isLastTaskForIngestJob(task)) {
             finish();
         }
     }
 
     void process(FileIngestTask task) throws InterruptedException {
-        // If the job is not cancelled, complete the task, otherwise just flush
-        // it.
         if (!isCancelled()) {
             AbstractFile file = task.getFile();
             synchronized (this) {
@@ -230,10 +231,10 @@ final class IngestJob {
         if (!errors.isEmpty()) {
             logIngestModuleErrors(errors);
         }
-        
+        dataSourceTasksProgress.finish();
+        fileTasksProgress.finish();
         ingestJobsById.remove(id);
-        if (!cancelled) {
-            fileTasksProgress.finish();
+        if (!isCancelled()) {
             IngestManager.getInstance().fireIngestJobCompleted(id);
         }
     }
@@ -249,10 +250,7 @@ final class IngestJob {
     }
 
     private void cancel() {
-        taskScheduler.removeAllTasksForIngestJob(id);
         cancelled = true;
-        fileTasksProgress.finish();
-        dataSourceTasksProgress.finish();
         IngestManager.getInstance().fireIngestJobCancelled(id);
     }
 }
diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
index 330ddb6c6a..f202c3f2dc 100755
--- a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
+++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
@@ -121,33 +121,6 @@ final class IngestScheduler {
             addTaskToFileQueue(task);
         }
     }
-
-    synchronized void removeAllTasksForIngestJob(long ingestJobId) {
-        Iterator<FileIngestTask> fileTasksIterator = fileTasks.iterator();
-        while (fileTasksIterator.hasNext()) {
-            if (fileTasksIterator.next().getIngestJob().getId() == ingestJobId) {
-                fileTasksIterator.remove();
-            }
-        }
-        Iterator<FileIngestTask> directoryTasksIterator = directoryTasks.iterator();
-        while (directoryTasksIterator.hasNext()) {
-            if (directoryTasksIterator.next().getIngestJob().getId() == ingestJobId) {
-                directoryTasksIterator.remove();
-            }
-        }
-        Iterator<FileIngestTask> rootDirectoryTasksIterator = rootDirectoryTasks.iterator();
-        while (rootDirectoryTasksIterator.hasNext()) {
-            if (rootDirectoryTasksIterator.next().getIngestJob().getId() == ingestJobId) {
-                rootDirectoryTasksIterator.remove();
-            }
-        }
-        Iterator<DataSourceIngestTask> dataSourceTasksIterator = dataSourceTasks.iterator();
-        while (dataSourceTasksIterator.hasNext()) {
-            if (dataSourceTasksIterator.next().getIngestJob().getId() == ingestJobId) {
-                dataSourceTasksIterator.remove();
-            }
-        }
-    }
 
     private synchronized void updateFileTaskQueues(FileIngestTask taskInProgress) throws InterruptedException {
         if (taskInProgress != null) {

From f8d26589e0d9439e0b5f3d021c9a23faf4eadab3 Mon Sep 17 00:00:00 2001
From: Richard Cordovano
Date: Fri, 16 May 2014 15:41:17 -0400
Subject: [PATCH 3/3] Improve IngestJob/IngestScheduler interaction

---
 .../sleuthkit/autopsy/ingest/IngestJob.java   |  15 ++-
 .../autopsy/ingest/IngestScheduler.java       | 101 +++++++++++-------
 2 files changed, 71 insertions(+), 45 deletions(-)

diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
index 5023ff0cfb..ae2bd50bc1 100644
--- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
+++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
@@ -68,7 +68,7 @@ final class IngestJob {
         List<IngestModuleError> errors = job.start();
         if (errors.isEmpty()) {
             IngestManager.getInstance().fireIngestJobStarted(jobId);
-            taskScheduler.scheduleTasksForIngestJob(job, dataSource);
+            taskScheduler.addTasksForIngestJob(job, dataSource);
         } else {
             ingestJobsById.remove(jobId);
         }
@@ -185,14 +185,16 @@ final class IngestJob {
             if (!errors.isEmpty()) {
                 logIngestModuleErrors(errors);
             }
+        } else {
+            taskScheduler.removeTasksForIngestJob(id);
         }
-        
+
         // Because there is only one data source task per job, it is o.k. to
         // call ProgressHandle.finish() now that the data source ingest modules
-        // are through using it via the DataSourceIngestModuleProgress wrapper.
+        // are through using the progress bar via the DataSourceIngestModuleProgress wrapper.
         // Calling ProgressHandle.finish() again in finish() will be harmless.
-        dataSourceTasksProgress.finish();
-        
+        dataSourceTasksProgress.finish();
+
         if (taskScheduler.isLastTaskForIngestJob(task)) {
             finish();
         }
@@ -216,7 +218,10 @@ final class IngestJob {
             if (!errors.isEmpty()) {
                 logIngestModuleErrors(errors);
             }
+        } else {
+            taskScheduler.removeTasksForIngestJob(id);
         }
+
         if (taskScheduler.isLastTaskForIngestJob(task)) {
             finish();
         }
diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
index f202c3f2dc..9c003895d4 100755
--- a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
+++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java
@@ -56,22 +56,15 @@ final class IngestScheduler {
     private IngestScheduler() {
     }
 
-    synchronized void scheduleTasksForIngestJob(IngestJob job, Content dataSource) throws InterruptedException {
-        // Enqueue a data source ingest task for the data source.
-        // If the thread executing this code is interrupted, tasksInProgressIterator is because the
-        // the number of ingest threads has been decreased while ingest jobs are
-        // running. The calling thread will exit in an orderly fashion, but the
-        // task still needs to be enqueued rather than lost, hence the loop.
+    synchronized void addTasksForIngestJob(IngestJob job, Content dataSource) throws InterruptedException {
+        // Enqueue a data source ingest task for the data source.
         DataSourceIngestTask task = new DataSourceIngestTask(job, dataSource);
-        while (true) {
-            try {
-                dataSourceTasks.put(task);
-                break;
-            } catch (InterruptedException ex) {
-                // Reset the interrupted status of the thread so the orderly
-                // exit can occur in the intended place.
-                Thread.currentThread().interrupt();
-            }
+        try {
+            dataSourceTasks.put(task);
+        } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
+            return;
         }
 
         // Get the top level files of the data source.
@@ -86,12 +79,12 @@ final class IngestScheduler {
             try {
                 children = root.getChildren();
                 if (children.isEmpty()) {
-                    // Add the root object itself, tasksInProgressIterator could be an unallocated space
+                    // Add the root object itself, it could be an unallocated space
                     // file, or a child of a volume or an image.
                     toptLevelFiles.add(root);
                 } else {
                     // The root object is a file system root directory, get
-                    // the files within tasksInProgressIterator.
+                    // the files within it.
                     for (Content child : children) {
                         if (child instanceof AbstractFile) {
                             toptLevelFiles.add((AbstractFile) child);
@@ -118,10 +111,43 @@ final class IngestScheduler {
     void addFileTaskToIngestJob(IngestJob job, AbstractFile file) {
         FileIngestTask task = new FileIngestTask(job, file);
         if (shouldEnqueueFileTask(task)) {
-            addTaskToFileQueue(task);
+            try {
+                fileTasks.put(task);
+            } catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
+            }
         }
     }
-    
+
+    synchronized void removeTasksForIngestJob(long ingestJobId) {
+        // Remove all tasks for this ingest job that are not in progress.
+        Iterator<FileIngestTask> fileTasksIterator = fileTasks.iterator();
+        while (fileTasksIterator.hasNext()) {
+            if (fileTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                fileTasksIterator.remove();
+            }
+        }
+        Iterator<FileIngestTask> directoryTasksIterator = directoryTasks.iterator();
+        while (directoryTasksIterator.hasNext()) {
+            if (directoryTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                directoryTasksIterator.remove();
+            }
+        }
+        Iterator<FileIngestTask> rootDirectoryTasksIterator = rootDirectoryTasks.iterator();
+        while (rootDirectoryTasksIterator.hasNext()) {
+            if (rootDirectoryTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                rootDirectoryTasksIterator.remove();
+            }
+        }
+        Iterator<DataSourceIngestTask> dataSourceTasksIterator = dataSourceTasks.iterator();
+        while (dataSourceTasksIterator.hasNext()) {
+            if (dataSourceTasksIterator.next().getIngestJob().getId() == ingestJobId) {
+                dataSourceTasksIterator.remove();
+            }
+        }
+    }
+
     private synchronized void updateFileTaskQueues(FileIngestTask taskInProgress) throws InterruptedException {
         if (taskInProgress != null) {
             tasksInProgress.add(taskInProgress);
@@ -134,7 +160,7 @@ final class IngestScheduler {
         if (fileTasks.isEmpty() == false) {
             return;
         }
-        // fill in the directory queue if tasksInProgressIterator is empty.
+        // fill in the directory queue if it is empty.
        if (this.directoryTasks.isEmpty()) {
             // bail out if root is also empty -- we are done
             if (rootDirectoryTasks.isEmpty()) {
@@ -149,7 +175,13 @@ final class IngestScheduler {
             final AbstractFile parentFile = parentTask.getFile();
             // add itself to the file list
             if (shouldEnqueueFileTask(parentTask)) {
-                addTaskToFileQueue(parentTask);
+                try {
+                    fileTasks.put(parentTask);
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                    logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
+                    return;
+                }
             }
             // add its children to the file and directory lists
             try {
@@ -161,7 +193,13 @@ final class IngestScheduler {
                         if (childFile.hasChildren()) {
                             directoryTasks.add(childTask);
                         } else if (shouldEnqueueFileTask(childTask)) {
-                            addTaskToFileQueue(childTask);
+                            try {
+                                fileTasks.put(childTask);
+                            } catch (InterruptedException ex) {
+                                Thread.currentThread().interrupt();
+                                logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
+                                return;
+                            }
                         }
                     }
                 }
@@ -171,26 +209,9 @@ final class IngestScheduler {
         }
     }
 
-    private void addTaskToFileQueue(FileIngestTask task) {
-        // If the thread executing this code is interrupted, tasksInProgressIterator is because the
-        // the number of ingest threads has been decreased while ingest jobs are
-        // running. The calling thread will exit in an orderly fashion, but the
-        // task still needs to be enqueued rather than lost.
-        while (true) {
-            try {
-                fileTasks.put(task);
-                break;
-            } catch (InterruptedException ex) {
-                // Reset the interrupted status of the thread so the orderly
-                // exit can occur in the intended place.
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
     private static boolean shouldEnqueueFileTask(final FileIngestTask processTask) {
         final AbstractFile aFile = processTask.getFile();
-        //if tasksInProgressIterator's unalloc file, skip if so scheduled
+        //if it's unalloc file, skip if so scheduled
         if (processTask.getIngestJob().shouldProcessUnallocatedSpace() == false
                 && aFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) {
             return false;
         }
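Taken together, the series settles on this cancellation contract: IngestJob.cancel() only sets the flag and fires the cancellation event; an ingest thread that later pulls a task for a cancelled job skips the module pipelines, asks the scheduler to drop the job's still-queued tasks via removeTasksForIngestJob(), and still goes through the isLastTaskForIngestJob() check so that finish() eventually runs, because tasks already handed to worker threads are not removed. The sketch below is a simplified, self-contained model of that bookkeeping for illustration only; the class and method names are hypothetical and are not the Autopsy API.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

final class CancellationSketch {

    static final class Task {
        final long jobId;
        Task(long jobId) { this.jobId = jobId; }
    }

    private final Queue<Task> queuedTasks = new LinkedList<>();
    private final Map<Long, Integer> tasksInProgressByJob = new HashMap<>();

    // A worker thread claims a task; it is now "in progress" for its job.
    synchronized Task takeNextTask() {
        Task task = queuedTasks.poll();
        if (task != null) {
            tasksInProgressByJob.merge(task.jobId, 1, Integer::sum);
        }
        return task;
    }

    // Analogous to removeTasksForIngestJob(): drop queued work for the job,
    // but leave tasks already claimed by worker threads untouched.
    synchronized void removeQueuedTasksForJob(long jobId) {
        Iterator<Task> it = queuedTasks.iterator();
        while (it.hasNext()) {
            if (it.next().jobId == jobId) {
                it.remove();
            }
        }
    }

    // Analogous to isLastTaskForIngestJob(): true when this was the job's final
    // in-progress task and nothing for the job remains in the queue.
    synchronized boolean taskCompleted(Task task) {
        int remaining = tasksInProgressByJob.merge(task.jobId, -1, Integer::sum);
        if (remaining == 0) {
            tasksInProgressByJob.remove(task.jobId);
        }
        boolean queuedWorkLeft = queuedTasks.stream().anyMatch(t -> t.jobId == task.jobId);
        return remaining == 0 && !queuedWorkLeft;
    }
}
```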
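Patch 3 also replaces the enqueue retry loops from patch 1 with a single pattern: restore the thread's interrupt status, log at FINE level, and return without enqueueing. That trades the earlier guarantee that a task is never lost for prompt, orderly exit of an ingest thread that was interrupted because the ingest thread count was reduced. A minimal standalone illustration of the pattern (hypothetical names, not the Autopsy code):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

final class TaskQueueSketch {

    private static final Logger logger = Logger.getLogger(TaskQueueSketch.class.getName());
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    // Pattern adopted in patch 3: give up on scheduling this task, but restore
    // the interrupt flag (cleared when put() threw) so the calling thread can
    // notice the interruption and shut down cleanly.
    void addTask(Runnable task) {
        try {
            tasks.put(task);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            logger.log(Level.FINE, "Task scheduling interrupted", ex);
        }
    }
}
```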