diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java index 119d879801..655e9ed2a5 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java @@ -524,11 +524,7 @@ final class DataSourceIngestJob { DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this); } else { logger.log(Level.INFO, "Scheduling file level analysis tasks for {0} (jobId={1}), no first stage data source level analysis configured", new Object[]{dataSource.getName(), this.id}); //NON-NLS - if (this.files.isEmpty()) { - DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this); - } else { - DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this, this.files); - } + DataSourceIngestJob.taskScheduler.fastTrackFileIngestTasks(this, this.files); /** * No data source ingest task has been scheduled for this stage, and @@ -840,7 +836,7 @@ final class DataSourceIngestJob { */ void addFiles(List files) { if (DataSourceIngestJob.Stages.FIRST == this.stage) { - DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this, files); + DataSourceIngestJob.taskScheduler.fastTrackFileIngestTasks(this, files); } else { DataSourceIngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java index 8895e3a01b..01e529c705 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -20,6 +20,7 @@ package org.sleuthkit.autopsy.ingest; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.Deque; import java.util.Iterator; @@ -51,17 +52,17 @@ final class IngestTasksScheduler { private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName()); @GuardedBy("IngestTasksScheduler.this") private static IngestTasksScheduler instance; - @GuardedBy("this") - private final IngestTaskTrackingQueue dataSourceTaskQueue; + private final IngestTaskTrackingQueue dataSourceIngestThreadQueue; @GuardedBy("this") private final TreeSet rootFileTaskQueue; @GuardedBy("this") - private final Deque directoryFileTaskQueue; - @GuardedBy("this") - private final IngestTaskTrackingQueue fileTaskQueue; + private final Deque pendingFileTaskQueue; + private final IngestTaskTrackingQueue fileIngestThreadsQueue; /** - * Gets the ingest tasks scheduler singleton. + * Gets the ingest tasks scheduler singleton that creates ingest tasks for + * data source ingest jobs, queueing the tasks in priority order for + * execution by the ingest manager's ingest threads. */ synchronized static IngestTasksScheduler getInstance() { if (IngestTasksScheduler.instance == null) { @@ -71,39 +72,40 @@ final class IngestTasksScheduler { } /** - * Constructs an ingest tasks scheduler. + * Constructs an ingest tasks scheduler that creates ingest tasks for data + * source ingest jobs, queueing the tasks in priority order for execution by + * the ingest manager's ingest threads. */ private IngestTasksScheduler() { - this.dataSourceTaskQueue = new IngestTaskTrackingQueue(); + this.dataSourceIngestThreadQueue = new IngestTaskTrackingQueue(); this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator()); - this.directoryFileTaskQueue = new LinkedList<>(); - this.fileTaskQueue = new IngestTaskTrackingQueue(); + this.pendingFileTaskQueue = new LinkedList<>(); + this.fileIngestThreadsQueue = new IngestTaskTrackingQueue(); } /** * Gets the data source level ingest tasks queue. This queue is a blocking - * queue intended for use by the ingest manager's data source ingest - * threads. + * queue used by the ingest manager's data source level ingest thread. * * @return The queue. */ BlockingIngestTaskQueue getDataSourceIngestTaskQueue() { - return this.dataSourceTaskQueue; + return this.dataSourceIngestThreadQueue; } /** * Gets the file level ingest tasks queue. This queue is a blocking queue - * intended for use by the ingest manager's file ingest threads. + * used by the ingest manager's file level ingest threads. * * @return The queue. */ BlockingIngestTaskQueue getFileIngestTaskQueue() { - return this.fileTaskQueue; + return this.fileIngestThreadsQueue; } /** - * Schedules a data source level ingest task and file level ingest tasks for - * a data source ingest job. + * Schedules a data source level ingest task and zero to many file level + * ingest tasks for a data source ingest job. * * @param job The data source ingest job. */ @@ -118,7 +120,7 @@ final class IngestTasksScheduler { * job are completed. */ this.scheduleDataSourceIngestTask(job); - this.scheduleFileIngestTasks(job); + this.scheduleFileIngestTasks(job, Collections.emptyList()); } } @@ -127,26 +129,34 @@ final class IngestTasksScheduler { * * @param job The data source ingest job. */ - synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) { + synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) { // RJCTODO: Should this throw instead? if (!job.isCancelled()) { DataSourceIngestTask task = new DataSourceIngestTask(job); try { - this.dataSourceTaskQueue.putLast(task); + this.dataSourceIngestThreadQueue.putLast(task); } catch (InterruptedException ex) { - IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling data source level ingest task (jobId={%d)", job.getId()), ex); + IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId={%d)", job.getId()), ex); Thread.currentThread().interrupt(); } } } /** - * Schedules file level ingest tasks for a data source ingest job. + * Schedules file tasks for either all the files or a given subset of the + * files for a data source source ingest job. * - * @param job The data source ingest job. + * @param job The data source ingest job. + * @param files A subset of the files for the data source; if empty, then + * file tasks for all files in the data source are scheduled. */ - synchronized void scheduleFileIngestTasks(DataSourceIngestJob job) { + synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection files) { if (!job.isCancelled()) { - List candidateFiles = getTopLevelFiles(job.getDataSource()); + Collection candidateFiles; + if (files.isEmpty()) { + candidateFiles = getTopLevelFiles(job.getDataSource()); + } else { + candidateFiles = files; + } for (AbstractFile file : candidateFiles) { FileIngestTask task = new FileIngestTask(job, file); if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { @@ -158,60 +168,28 @@ final class IngestTasksScheduler { } /** - * Schedules file level ingest tasks for a subset of the files for a data - * source ingest job. + * Schedules file level ingest tasks for a given set of files for a data + * source ingest job by adding them directly to the front of the file tasks + * queue for the ingest manager's file ingest threads. * * @param job The data source ingest job. - * @param files A subset of the files for the data source. + * @param files A set of files for the data source. */ - synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection files) { + synchronized void fastTrackFileIngestTasks(DataSourceIngestJob job, Collection files) { if (!job.isCancelled()) { + /* + * Put the files directly into the queue for the file ingest + * threads, if they pass the file filter for the job. The files are + * added to the queue for the ingest threads BEFORE the other queued + * tasks because the use case for this method is scheduling new + * carved or derived files from a higher priority task that is + * already in progress. + */ for (AbstractFile file : files) { - /* - * If the current file or directory has children, try to queue - * tasks for the children. Each child task will go into either - * the directory queue if it is a directory, or directly into - * the queue for the file ingest threads, if it passes the - * filter for the job. The child files are added to the queue - * for the ingest threads BEFORE the other queued tasks because - * the primary use case for this method is adding derived files - * from a higher priority task that preceded the tasks currently - * in the queue. - */ - try { - for (Content child : file.getChildren()) { - if (child instanceof AbstractFile) { - AbstractFile childFile = (AbstractFile) child; - FileIngestTask childTask = new FileIngestTask(job, childFile); - if (childFile.hasChildren()) { - this.directoryFileTaskQueue.add(childTask); - } else if (shouldEnqueueFileTask(childTask)) { - try { - this.fileTaskQueue.putFirst(childTask); - } catch (InterruptedException ex) { - IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", job.getId()), ex); - Thread.currentThread().interrupt(); - return; - } - } - } - } - } catch (TskCoreException ex) { - logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS - } - - /* - * Put the file directly into the queue for the file ingest - * threads, if it passes the filter for the job. The file is - * added to the queue for the ingest threads BEFORE the other - * queued tasks because the primary use case for this method is - * adding derived files from a higher priority task that - * preceded the tasks currently in the queue. - */ - FileIngestTask task = new FileIngestTask(job, file); - if (shouldEnqueueFileTask(task)) { + FileIngestTask fileTask = new FileIngestTask(job, file); + if (shouldEnqueueFileTask(fileTask)) { try { - this.fileTaskQueue.putFirst(task); + this.fileIngestThreadsQueue.putFirst(fileTask); } catch (InterruptedException ex) { IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", job.getId()), ex); Thread.currentThread().interrupt(); @@ -219,7 +197,6 @@ final class IngestTasksScheduler { } } } - this.shuffleFileTaskQueues(); } } @@ -230,7 +207,7 @@ final class IngestTasksScheduler { * @param task The completed task. */ synchronized void notifyTaskCompleted(DataSourceIngestTask task) { - this.dataSourceTaskQueue.taskCompleted(task); + this.dataSourceIngestThreadQueue.taskCompleted(task); } /** @@ -240,7 +217,7 @@ final class IngestTasksScheduler { * @param task The completed task. */ synchronized void notifyTaskCompleted(FileIngestTask task) { - this.fileTaskQueue.taskCompleted(task); + this.fileIngestThreadsQueue.taskCompleted(task); shuffleFileTaskQueues(); } @@ -254,10 +231,10 @@ final class IngestTasksScheduler { */ synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) { long jobId = job.getId(); - return !this.dataSourceTaskQueue.hasTasksForJob(jobId) - && !hasTasksForJob(this.rootFileTaskQueue, jobId) - && !hasTasksForJob(this.directoryFileTaskQueue, jobId) - && !this.fileTaskQueue.hasTasksForJob(jobId); + return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId) + || hasTasksForJob(this.rootFileTaskQueue, jobId) + || hasTasksForJob(this.pendingFileTaskQueue, jobId) + || this.fileIngestThreadsQueue.hasTasksForJob(jobId)); } /** @@ -270,7 +247,7 @@ final class IngestTasksScheduler { synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) { long jobId = job.getId(); IngestTasksScheduler.removeTasksForJob(this.rootFileTaskQueue, jobId); - IngestTasksScheduler.removeTasksForJob(this.directoryFileTaskQueue, jobId); + IngestTasksScheduler.removeTasksForJob(this.pendingFileTaskQueue, jobId); } /** @@ -325,16 +302,18 @@ final class IngestTasksScheduler { * root objects of the data sources that are being analyzed. For example, * the root tasks for a disk image data source are typically the tasks for * the contents of the root directories of the file systems. This queue is a - * priority queue that attempts to ensure that user directory content is - * analyzed before general file system content. It feeds into the directory - * tasks queue. + * priority queue that attempts to ensure that user content is analyzed + * before general file system content. It feeds into the pending tasks + * queue. * - * 2. The directory file tasks queue, which contains root file tasks - * shuffled out of the root tasks queue, plus directory tasks discovered in - * the descent from the root tasks to the final leaf tasks in the content - * trees that are being analyzed for the data source ingest jobs. This queue - * is a FIFO queue. It feeds into the file tasks queue for the ingest - * manager's file ingest threads. + * 2. The pending file tasks queue, which contains root file tasks shuffled + * out of the root tasks queue, plus tasks for files with children + * discovered in the descent from the root tasks to the final leaf tasks in + * the content trees that are being analyzed for the data source ingest + * jobs. This queue is a FIFO queue that attempts to throttle the total + * number of file tasks by deferring queueing tasks for the children of + * files until the queue for the file ingest threads is emptied. It feeds + * into the file tasks queue for the ingest manager's file ingest threads. * * 3. The file tasks queue for the ingest manager's file ingest threads. * This queue is a blocking deque that is FIFO during a shuffle to maintain @@ -343,69 +322,69 @@ final class IngestTasksScheduler { * files derived from prioritized files. */ synchronized private void shuffleFileTaskQueues() { - while (this.fileTaskQueue.isEmpty()) { + while (this.fileIngestThreadsQueue.isEmpty()) { /* - * If the directory file task queue is empty, move the highest - * priority root file task, if there is one, into it. If both the - * root and the directory file task queues are empty, there is - * nothing left to shuffle, so exit. + * If the pending file task queue is empty, move the highest + * priority root file task, if there is one, into it. */ - if (this.directoryFileTaskQueue.isEmpty()) { - if (!this.rootFileTaskQueue.isEmpty()) { - this.directoryFileTaskQueue.addLast(this.rootFileTaskQueue.pollFirst()); - } else { - return; + if (this.pendingFileTaskQueue.isEmpty()) { + final FileIngestTask rootTask = this.rootFileTaskQueue.pollFirst(); + if (rootTask != null) { + this.pendingFileTaskQueue.addLast(rootTask); } } /* - * Try to move the next task from the directory task queue into the + * Try to move the next task from the pending task queue into the * queue for the file ingest threads, if it passes the filter for * the job. */ - final FileIngestTask directoryTask = this.directoryFileTaskQueue.pollFirst(); - if (shouldEnqueueFileTask(directoryTask)) { + final FileIngestTask pendingTask = this.pendingFileTaskQueue.pollFirst(); + if (pendingTask == null) { + return; + } + if (shouldEnqueueFileTask(pendingTask)) { try { /* * The task is added to the queue for the ingest threads * AFTER the higher priority tasks that preceded it. */ - this.fileTaskQueue.putLast(directoryTask); + this.fileIngestThreadsQueue.putLast(pendingTask); } catch (InterruptedException ex) { - IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex); Thread.currentThread().interrupt(); - break; + return; } } /* - * If the root or directory task that was just queued for the file - * ingest threads has children, try to queue tasks for the children. - * Each child task will go into either the directory queue if it is - * a directory, or into the queue for the file ingest threads, if it - * passes the filter for the job. + * If the task that was just queued for the file ingest threads has + * children, try to queue tasks for the children. Each child task + * will go into either the directory queue if it has children of its + * own, or into the queue for the file ingest threads, if it passes + * the filter for the job. */ - final AbstractFile directory = directoryTask.getFile(); + final AbstractFile file = pendingTask.getFile(); try { - for (Content child : directory.getChildren()) { + for (Content child : file.getChildren()) { if (child instanceof AbstractFile) { AbstractFile childFile = (AbstractFile) child; - FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), childFile); + FileIngestTask childTask = new FileIngestTask(pendingTask.getIngestJob(), childFile); if (childFile.hasChildren()) { - this.directoryFileTaskQueue.add(childTask); + this.pendingFileTaskQueue.add(childTask); } else if (shouldEnqueueFileTask(childTask)) { try { - this.fileTaskQueue.putLast(childTask); + this.fileIngestThreadsQueue.putLast(childTask); } catch (InterruptedException ex) { - IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex); Thread.currentThread().interrupt(); - break; + return; } } } } } catch (TskCoreException ex) { - logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", directory.getName(), directory.getId()), ex); //NON-NLS + logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS } } } @@ -831,11 +810,11 @@ final class IngestTasksScheduler { */ IngestJobTasksSnapshot(long jobId) { this.jobId = jobId; - this.dsQueueSize = IngestTasksScheduler.this.dataSourceTaskQueue.countQueuedTasksForJob(jobId); + this.dsQueueSize = IngestTasksScheduler.this.dataSourceIngestThreadQueue.countQueuedTasksForJob(jobId); this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTaskQueue, jobId); - this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryFileTaskQueue, jobId); - this.fileQueueSize = IngestTasksScheduler.this.fileTaskQueue.countQueuedTasksForJob(jobId);; - this.runningListSize = IngestTasksScheduler.this.dataSourceTaskQueue.countRunningTasksForJob(jobId) + IngestTasksScheduler.this.fileTaskQueue.countRunningTasksForJob(jobId); + this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTaskQueue, jobId); + this.fileQueueSize = IngestTasksScheduler.this.fileIngestThreadsQueue.countQueuedTasksForJob(jobId);; + this.runningListSize = IngestTasksScheduler.this.dataSourceIngestThreadQueue.countRunningTasksForJob(jobId) + IngestTasksScheduler.this.fileIngestThreadsQueue.countRunningTasksForJob(jobId); } /** diff --git a/Core/test/filter_test1.img b/Core/test/filter_test1.img new file mode 100755 index 0000000000..69b6e21b6c Binary files /dev/null and b/Core/test/filter_test1.img differ