From 7d2657ff70eb6b5c1064a95c726c7d1c1c2e1cd7 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Mon, 19 Mar 2018 10:27:19 -0400 Subject: [PATCH 1/3] Fix IngestTasksScheduler, IngestManager concurrency issues --- .../autopsy/ingest/DataSourceIngestJob.java | 8 +- .../autopsy/ingest/IngestManager.java | 38 +- .../autopsy/ingest/IngestTasksScheduler.java | 481 ++++++++++-------- 3 files changed, 289 insertions(+), 238 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java index a41acd1c58..ebdfb6146f 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java @@ -518,7 +518,7 @@ final class DataSourceIngestJob { */ if (this.hasFirstStageDataSourceIngestPipeline() && this.hasFileIngestPipeline()) { logger.log(Level.INFO, "Scheduling first stage data source and file level analysis tasks for {0} (jobId={1})", new Object[]{dataSource.getName(), this.id}); //NON-NLS - DataSourceIngestJob.taskScheduler.scheduleIngestTasks(this, this.files); + DataSourceIngestJob.taskScheduler.scheduleIngestTasks(this); } else if (this.hasFirstStageDataSourceIngestPipeline()) { logger.log(Level.INFO, "Scheduling first stage data source level analysis tasks for {0} (jobId={1}), no file level analysis configured", new Object[]{dataSource.getName(), this.id}); //NON-NLS DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this); @@ -827,7 +827,7 @@ final class DataSourceIngestJob { } /** - * Adds more files from the data source for this job to the job, i.e., adds + * Adds more files from the data source for this job to the job, e.g., adds * extracted or carved files. Not currently supported for the second stage * of the job. 
* @@ -835,9 +835,7 @@ final class DataSourceIngestJob { */ void addFiles(List files) { if (DataSourceIngestJob.Stages.FIRST == this.stage) { - for (AbstractFile file : files) { - DataSourceIngestJob.taskScheduler.scheduleFastTrackedFileIngestTask(this, file); - } + DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this, files); } else { DataSourceIngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 65d0162d6d..218626985a 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -121,7 +121,7 @@ public class IngestManager { private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L); private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS; private final Map> startIngestJobFutures = new ConcurrentHashMap<>(); - private final Map ingestJobsById = new ConcurrentHashMap<>(); + private final Map ingestJobsById = new HashMap<>(); private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS; private final ExecutorService fileLevelIngestJobTasksExecutor; private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS; @@ -399,13 +399,17 @@ public class IngestManager { ingestMonitor.start(); } - ingestJobsById.put(job.getId(), job); + synchronized (ingestJobsById) { + ingestJobsById.put(job.getId(), job); + } errors = job.start(); if (errors.isEmpty()) { this.fireIngestJobStarted(job.getId()); IngestManager.logger.log(Level.INFO, "Ingest job {0} 
started", job.getId()); //NON-NLS } else { - this.ingestJobsById.remove(job.getId()); + synchronized (ingestJobsById) { + this.ingestJobsById.remove(job.getId()); + } for (IngestModuleError error : errors) { logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS } @@ -438,7 +442,9 @@ public class IngestManager { */ void finishIngestJob(IngestJob job) { long jobId = job.getId(); - ingestJobsById.remove(jobId); + synchronized (ingestJobsById) { + ingestJobsById.remove(jobId); + } if (!job.isCancelled()) { IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS fireIngestJobCompleted(jobId); @@ -455,7 +461,9 @@ public class IngestManager { * @return True or false. */ public boolean isIngestRunning() { - return !ingestJobsById.isEmpty(); + synchronized (ingestJobsById) { + return !ingestJobsById.isEmpty(); + } } /** @@ -467,9 +475,11 @@ public class IngestManager { startIngestJobFutures.values().forEach((handle) -> { handle.cancel(true); }); - this.ingestJobsById.values().forEach((job) -> { - job.cancel(reason); - }); + synchronized (ingestJobsById) { + this.ingestJobsById.values().forEach((job) -> { + job.cancel(reason); + }); + } } /** @@ -770,9 +780,11 @@ public class IngestManager { */ List getIngestJobSnapshots() { List snapShots = new ArrayList<>(); - ingestJobsById.values().forEach((job) -> { - snapShots.addAll(job.getDataSourceIngestJobSnapshots()); - }); + synchronized (ingestJobsById) { + ingestJobsById.values().forEach((job) -> { + snapShots.addAll(job.getDataSourceIngestJobSnapshots()); + }); + } return snapShots; } @@ -808,7 +820,9 @@ public class IngestManager { public Void call() { try { if (Thread.currentThread().isInterrupted()) { - ingestJobsById.remove(job.getId()); + synchronized (ingestJobsById) { + ingestJobsById.remove(job.getId()); + } return null; } diff --git 
a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java index b9cee95687..7fea572eed 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -21,12 +21,13 @@ package org.sleuthkit.autopsy.ingest; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.HashSet; +import java.util.Deque; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; -import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; @@ -40,64 +41,20 @@ import org.sleuthkit.datamodel.TskCoreException; import org.sleuthkit.datamodel.TskData; /** - * Creates ingest tasks for ingest jobs, queuing the tasks in priority order for - * execution by the ingest manager's ingest threads. + * Creates ingest tasks for data source ingest jobs, queueing the tasks in + * priority order for execution by the ingest manager's ingest threads. 
*/ final class IngestTasksScheduler { - private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName()); private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue(); + private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName()); private static IngestTasksScheduler instance; - - /** - * Scheduling of data source ingest tasks is accomplished by putting them in - * a FIFO queue to be consumed by the ingest threads, so the queue is - * wrapped in a "dispenser" that implements the IngestTaskQueue interface - * and is exposed via a getter method. - */ - private final LinkedBlockingQueue pendingDataSourceTasks; - private final DataSourceIngestTaskQueue dataSourceTasksDispenser; - - /** - * Scheduling of file ingest tasks is accomplished by "shuffling" them - * through a sequence of internal queues that allows for the interleaving of - * tasks from different ingest jobs based on priority. These scheduling - * queues are: - * - * 1. Root directory tasks (priority queue) - * - * 2. Directory tasks (FIFO queue) - * - * 3. Pending file tasks (LIFO queue). - * - * The pending file tasks queue is LIFO to handle large numbers of files - * extracted from archive files. At least one image has been processed that - * had a folder full of archive files. The queue grew to have thousands of - * entries, as each successive archive file was expanded, so now extracted - * files get added to the front of the queue so that in such a scenario they - * would be processed before the expansion of the next archive file. 
- * - * Tasks in the pending file tasks queue are ready to be consumed by the - * ingest threads, so the queue is wrapped in a "dispenser" that implements - * the IngestTaskQueue interface and is exposed via a getter method. - */ - private final TreeSet rootDirectoryTasks; - private final List directoryTasks; - private final BlockingDeque pendingFileTasks; - private final FileIngestTaskQueue fileTasksDispenser; - - /** - * The ingest tasks scheduler allows ingest jobs to query it to see if there - * are any tasks in progress for the job. To make this possible, the ingest - * tasks scheduler needs to keep track not only of the tasks in its queues, - * but also of the tasks that have been handed out for processing by the - * ingest threads. Therefore all ingest tasks are added to this list when - * they are created and are not removed when an ingest thread takes an - * ingest task. Instead, the ingest thread calls back into the scheduler - * when the task is completed, at which time the task will be removed from - * this list. - */ - private final Set tasksInProgress; + private final List activeDataSourceTasks; + private final DataSourceIngestTaskQueue dataSourceTaskQueue; + private final TreeSet rootFileTasks; + private final Deque directoryFileTasks; + private final Deque activeFileTasks; + private final FileIngestTaskQueue fileTaskQueue; /** * Gets the ingest tasks scheduler singleton. @@ -113,52 +70,54 @@ final class IngestTasksScheduler { * Constructs an ingest tasks scheduler. 
*/ private IngestTasksScheduler() { - this.pendingDataSourceTasks = new LinkedBlockingQueue<>(); - this.dataSourceTasksDispenser = new DataSourceIngestTaskQueue(); - this.rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); - this.directoryTasks = new ArrayList<>(); - this.pendingFileTasks = new LinkedBlockingDeque<>(); - this.fileTasksDispenser = new FileIngestTaskQueue(); - this.tasksInProgress = new HashSet<>(); + this.activeDataSourceTasks = new ArrayList<>(); + this.dataSourceTaskQueue = new DataSourceIngestTaskQueue(); + this.rootFileTasks = new TreeSet<>(new RootDirectoryTaskComparator()); + this.directoryFileTasks = new LinkedList<>(); + this.activeFileTasks = new LinkedList<>(); + this.fileTaskQueue = new FileIngestTaskQueue(); } /** - * Gets this ingest task scheduler's implementation of the IngestTaskQueue - * interface for data source ingest tasks. + * Gets the data source level ingest tasks queue. The queue is a blocking + * queue intended for use by data source ingest threads. * - * @return The data source ingest tasks queue. + * @return The queue. */ IngestTaskQueue getDataSourceIngestTaskQueue() { - return this.dataSourceTasksDispenser; + return this.dataSourceTaskQueue; } /** - * Gets this ingest task scheduler's implementation of the IngestTaskQueue - * interface for file ingest tasks. + * Gets the file level ingest tasks queue for file ingest threads. The queue + * is a blocking queue intended for use by file ingest threads. * - * @return The file ingest tasks queue. + * @return The queue. */ IngestTaskQueue getFileIngestTaskQueue() { - return this.fileTasksDispenser; + return this.fileTaskQueue; } /** * Schedules a data source level ingest task and file level ingest tasks for - * an ingest job. Either all of the files in the data source or a given - * subset of the files will be scheduled. + * a data source ingest job. Either all of the files in the data source or a + * given subset of the files will be scheduled. 
* * @param job The data source ingest job. - * @param files A subset of the files for the data source. + * @param files A subset of the files for the data source, possibly empty. */ - synchronized void scheduleIngestTasks(DataSourceIngestJob job, List files) { + synchronized void scheduleIngestTasks(DataSourceIngestJob job) { if (!job.isCancelled()) { - // Scheduling of both a data source ingest task and file ingest tasks - // for a job must be an atomic operation. Otherwise, the data source - // task might be completed before the file tasks are scheduled, - // resulting in a potential false positive when another thread checks - // whether or not all the tasks for the job are completed. + /* + * Scheduling of both the data source ingest task and the initial + * file ingest tasks for a job must be an atomic operation. + * Otherwise, the data source task might be completed before the + * file tasks are scheduled, resulting in a potential false positive + * when another thread checks whether or not all the tasks for the + * job are completed. + */ this.scheduleDataSourceIngestTask(job); - this.scheduleFileIngestTasks(job, files); + this.scheduleFileIngestTasks(job); } } @@ -170,15 +129,12 @@ final class IngestTasksScheduler { synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) { if (!job.isCancelled()) { DataSourceIngestTask task = new DataSourceIngestTask(job); - this.tasksInProgress.add(task); + this.activeDataSourceTasks.add(task); try { - this.pendingDataSourceTasks.put(task); + this.dataSourceTaskQueue.add(task); } catch (InterruptedException ex) { - /** - * The current thread was interrupted while blocked on a full - * queue. Discard the task and reset the interrupted flag. 
- */ - this.tasksInProgress.remove(task); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while data source ingest thread blocked on a full queue", ex); + this.activeDataSourceTasks.remove(task); Thread.currentThread().interrupt(); } } @@ -190,21 +146,15 @@ final class IngestTasksScheduler { * be scheduled. * * @param job The data source ingest job. - * @param files A subset of the files for the data source. + * @param files A subset of the files for the data source, possibly empty. */ - synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, List files) { + synchronized void scheduleFileIngestTasks(DataSourceIngestJob job) { if (!job.isCancelled()) { - List candidateFiles = new ArrayList<>(); - if (files.isEmpty()) { - getTopLevelFiles(job.getDataSource(), candidateFiles); - } else { - candidateFiles.addAll(files); - } - for (AbstractFile firstLevelFile : candidateFiles) { - FileIngestTask task = new FileIngestTask(job, firstLevelFile); + List candidateFiles = getTopLevelFiles(job.getDataSource()); + for (AbstractFile file : candidateFiles) { + FileIngestTask task = new FileIngestTask(job, file); if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { - this.tasksInProgress.add(task); - this.rootDirectoryTasks.add(task); + this.rootFileTasks.add(task); } } shuffleFileTaskQueues(); @@ -212,73 +162,113 @@ final class IngestTasksScheduler { } /** - * Schedules a file ingest task for a data source ingest job. The task that - * is created is added directly to the pending file tasks queues, i.e., it - * is "fast tracked." + * Schedules file level ingest tasks for a subset of the files in a data + * source ingest job. * - * @param job The data source ingest job. - * @param file A file. + * @param job The data source ingest job. + * @param files A subset of the files for the data source. 
*/ - synchronized void scheduleFastTrackedFileIngestTask(DataSourceIngestJob job, AbstractFile file) { + synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection files) { if (!job.isCancelled()) { - FileIngestTask task = new FileIngestTask(job, file); - if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { - this.tasksInProgress.add(task); - addToPendingFileTasksQueue(task); + final Deque newTasksForIngestThreads = new LinkedList<>(); + for (AbstractFile file : files) { + /* + * The file will be added directly to the front of the queue for + * the ingest threads. + */ + FileIngestTask task = new FileIngestTask(job, file); + if (shouldEnqueueFileTask(task)) { + this.activeFileTasks.addLast(task); // RJCTODO: CHeck this in other method + newTasksForIngestThreads.addLast(task); + } + + /* + * Add the children of the file, if any, either to the front of + * the queue for the ingest threads, in front of the parent + * directory task, or to the end directory task queue. + */ + try { + for (Content child : file.getChildren()) { + if (child instanceof AbstractFile) { + AbstractFile childFile = (AbstractFile) child; + FileIngestTask childTask = new FileIngestTask(job, childFile); + if (childFile.hasChildren()) { + this.directoryFileTasks.add(childTask); + } else if (shouldEnqueueFileTask(childTask)) { + this.activeFileTasks.addLast(childTask); + newTasksForIngestThreads.addFirst(childTask); + } + } + } + } catch (TskCoreException ex) { + logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS + } + } + + /* + * Add the newly active tasks into the queue for the file ingest + * threads, AFTER the higher priority tasks that are already queued. 
+ */ + for (FileIngestTask newTask : newTasksForIngestThreads) { + try { + this.fileTaskQueue.tasks.putLast(newTask); + } catch (InterruptedException ex) { + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while data source ingest thread blocked on a full queue", ex); // RJCTODO: Should this propagate? Correct message + this.activeFileTasks.remove(newTask); + Thread.currentThread().interrupt(); + break; + } } } } /** - * Allows an ingest thread to notify this ingest task scheduler that a task - * has been completed. + * Allows an ingest thread to notify this ingest task scheduler that a data + * source level task has been completed. * * @param task The completed task. */ - synchronized void notifyTaskCompleted(IngestTask task) { - tasksInProgress.remove(task); + synchronized void notifyTaskCompleted(DataSourceIngestTask task) { + this.activeDataSourceTasks.remove(task); + shuffleFileTaskQueues(); } /** - * Queries the task scheduler to determine whether or not all current ingest - * tasks for an ingest job are completed. + * Allows an ingest thread to notify this ingest task scheduler that a file + * level task has been completed. * - * @param job The job for which the query is to be performed. + * @param task + */ + synchronized void notifyTaskCompleted(FileIngestTask task) { + this.activeFileTasks.remove(task); + shuffleFileTaskQueues(); + } + + /** + * Queries the task scheduler to determine whether or not all of the ingest + * tasks for an ingest job have been completed. + * + * @param job The data source ingest job * * @return True or false. 
*/ synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) { - for (IngestTask task : tasksInProgress) { - if (task.getIngestJob().getId() == job.getId()) { - return false; - } - } - return true; + return !hasTasksForJob(this.activeDataSourceTasks, job) + && !hasTasksForJob(this.rootFileTasks, job) + && !hasTasksForJob(this.directoryFileTasks, job) + && !hasTasksForJob(this.activeFileTasks, job); } /** - * Clears the "upstream" task scheduling queues for an ingest job, but does - * nothing about tasks that have already been shuffled into the concurrently - * accessed blocking queues shared with the ingest threads. Note that tasks - * in the "downstream" queues or already taken by the ingest threads will be - * flushed out when the ingest threads call back with their task completed - * notifications. + * Clears the "upstream" task scheduling queues for a data source ingest + * job, but does nothing about tasks that have already been activated, i.e., + * moved into the queue that is consumed by the file ingest threads. * - * @param job The job for which the tasks are to to canceled. + * @param job The data source ingest job */ synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) { - /** - * This code should not flush the blocking queues that are concurrently - * accessed by the ingest threads. This is because the "lock striping" - * and "weakly consistent" iterators of these collections make it so - * that this code could have a different view of the queues than the - * ingest threads. It does clean out the directory level tasks before - * they are exploded into file tasks. 
- */ - long jobId = job.getId(); - this.removeTasksForJob(this.rootDirectoryTasks, jobId); - this.removeTasksForJob(this.directoryTasks, jobId); - this.shuffleFileTaskQueues(); + this.removeTasksForJob(this.rootFileTasks, job); + this.removeTasksForJob(this.directoryFileTasks, job); } /** @@ -289,7 +279,8 @@ final class IngestTasksScheduler { * @param dataSource The data source. * @param topLevelFiles The top level files are added to this list. */ - private static void getTopLevelFiles(Content dataSource, List topLevelFiles) { + private static List getTopLevelFiles(Content dataSource) { + List topLevelFiles = new ArrayList<>(); Collection rootObjects = dataSource.accept(new GetRootDirectoryVisitor()); if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) { // The data source is itself a file to be processed. @@ -317,74 +308,108 @@ final class IngestTasksScheduler { } } } + return topLevelFiles; } /** - * "Shuffles" the file task queues to ensure that there is at least one task - * in the pending file ingest tasks queue, as long as there are still file - * ingest tasks to be performed. + * Intelligently queues file ingest tasks for the ingest manager's file + * ingest threads by "shuffling" them through a sequence of queues that + * allows for the interleaving of tasks from different data source ingest + * jobs based on priority. The sequence of queues is: + * + * 1. The root file tasks priority queue, which contains the tasks for the + * roots of data source content sub trees that are being analyzed for the + * data source ingest jobs. For example, typical root tasks for a disk image + * data source would be the tasks for the contents of the root directories + * of the file systems. This queue is a priority queue that attempts to + * ensure that user directory content is analyzed before general file system + * content. It feeds into the directory tasks queue. + * + * 2. 
The directory file tasks queue, which contains directory tasks + * discovered in the descent through the content sub trees that are being + * analyzed for the data source ingest jobs. It feeds into the active tasks + * queue. + * + * 3. The active file tasks queue, a queue of the file tasks that are either + * in the tasks queue for the file ingest threads or are in the process of + * being analyzed in a file ingest thread. + * + * 4. The file tasks queue for the ingest manager's file ingest threads. */ synchronized private void shuffleFileTaskQueues() { - // This is synchronized because it is called both by synchronized - // methods of this ingest scheduler and an unsynchronized method of its - // file tasks "dispenser". - while (true) { - // Loop until either the pending file tasks queue is NOT empty - // or the upstream queues that feed into it ARE empty. - if (!this.pendingFileTasks.isEmpty()) { - // There are file tasks ready to be consumed, exit. - return; - } - if (this.directoryTasks.isEmpty()) { - if (this.rootDirectoryTasks.isEmpty()) { - // There are no root directory tasks to move into the - // directory queue, exit. - return; + final Deque newTasksForIngestThreads = new LinkedList<>(); + while (this.activeFileTasks.isEmpty()) { + /* + * If the directory file task queue is empty, move the highest + * priority root file task, if there is one, into it. If both the + * root and the directory file task queuess are empty, there is + * nothing left to do. + */ + if (this.directoryFileTasks.isEmpty()) { + if (!this.rootFileTasks.isEmpty()) { + this.directoryFileTasks.add(this.rootFileTasks.pollFirst()); } else { - // Move the next root directory task into the - // directories queue. Note that the task was already - // added to the tasks in progress list when the task was - // created in scheduleFileIngestTasks(). 
- this.directoryTasks.add(this.rootDirectoryTasks.pollFirst()); + return; } } - // Try to add the most recently added directory from the - // directory tasks queue to the pending file tasks queue. - FileIngestTask directoryTask = this.directoryTasks.remove(this.directoryTasks.size() - 1); + /* + * Try to move the next task from the directory task queue into the + * active file tasks tracking queue, if it passes the filter for the + * job. + */ + final FileIngestTask directoryTask = this.directoryFileTasks.pollLast(); if (shouldEnqueueFileTask(directoryTask)) { - addToPendingFileTasksQueue(directoryTask); - } else { - this.tasksInProgress.remove(directoryTask); + this.activeFileTasks.addFirst(directoryTask); + newTasksForIngestThreads.addFirst(directoryTask); } - // If the directory contains subdirectories or files, try to - // enqueue tasks for them as well. + /* + * If the file or directory from the next that was just activated + * has children, try to queue tasks for the children. Each child + * will go into the directory task queue if it is a directory, or + * into the active file tasks tracking queue if it passes the filter + * for the job. + */ final AbstractFile directory = directoryTask.getFile(); try { for (Content child : directory.getChildren()) { if (child instanceof AbstractFile) { - AbstractFile file = (AbstractFile) child; - FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), file); - if (file.hasChildren()) { - // Found a subdirectory, put the task in the - // pending directory tasks queue. Note the - // addition of the task to the tasks in progress - // list. This is necessary because this is the - // first appearance of this task in the queues. 
- this.tasksInProgress.add(childTask); - this.directoryTasks.add(childTask); + AbstractFile childFile = (AbstractFile) child; + FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), childFile); + if (childFile.hasChildren()) { + this.directoryFileTasks.add(childTask); } else if (shouldEnqueueFileTask(childTask)) { - // Found a file, put the task directly into the - // pending file tasks queue. - this.tasksInProgress.add(childTask); - addToPendingFileTasksQueue(childTask); + this.activeFileTasks.add(directoryTask); + /* + * Queue the child file tasks for the ingest threads + * in front of parent directory tasks. + */ + newTasksForIngestThreads.addFirst(directoryTask); } } } } catch (TskCoreException ex) { - String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS - logger.log(Level.SEVERE, errorMessage, ex); + logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", directory.getName(), directory.getId()), ex); //NON-NLS + } + + /* + * Add the newly active tasks into the queue for the file ingest + * threads, AFTER the higher priority tasks that are already queued. + */ + for (FileIngestTask newTask : newTasksForIngestThreads) { + try { + this.fileTaskQueue.tasks.putLast(newTask); + } catch (InterruptedException ex) { + /** + * The current thread was interrupted while blocked on a + * full queue. Discard the task and reset the interrupted + * flag. + */ + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while data source ingest thread blocked on a full queue", ex); // RJCTODO: Should this propagate? Correct message + this.activeFileTasks.remove(newTask); + Thread.currentThread().interrupt(); + } } } } @@ -463,44 +488,44 @@ final class IngestTasksScheduler { } /** - * Adds a file ingest task to the blocking pending tasks queue. + * Checks whether or not a collection of ingest tasks includes a task for a + * given data source ingest job. 
* - * @param task The task to add. + * @param tasks The tasks. + * @param job The data source ingest job. + * + * @return True if there are no tasks for the job, false otherwise. */ - synchronized private void addToPendingFileTasksQueue(FileIngestTask task) { - try { - this.pendingFileTasks.putFirst(task); - } catch (InterruptedException ex) { - /** - * The current thread was interrupted while blocked on a full queue. - * Discard the task and reset the interrupted flag. - */ - this.tasksInProgress.remove(task); - Thread.currentThread().interrupt(); + synchronized private boolean hasTasksForJob(Collection tasks, DataSourceIngestJob job) { + long jobId = job.getId(); + for (IngestTask task : tasks) { + if (task.getIngestJob().getId() == jobId) { + return true; + } } + return false; } /** - * Removes all of the ingest tasks associated with an ingest job from a - * tasks queue. The task is removed from the the tasks in progress list as - * well. + * Removes all of the ingest tasks associated with a data source ingest job + * from a tasks collection. * - * @param taskQueue The queue from which to remove the tasks. - * @param jobId The id of the job for which the tasks are to be removed. + * @param tasks The collection from which to remove the tasks. + * @param job THe data source ingest job. */ - synchronized private void removeTasksForJob(Collection taskQueue, long jobId) { - Iterator iterator = taskQueue.iterator(); + synchronized private void removeTasksForJob(Collection tasks, DataSourceIngestJob job) { + long jobId = job.getId(); + Iterator iterator = tasks.iterator(); while (iterator.hasNext()) { IngestTask task = iterator.next(); if (task.getIngestJob().getId() == jobId) { - this.tasksInProgress.remove(task); iterator.remove(); } } } /** - * Counts the number of ingest tasks in a task queue for a given job. + * Counts the number of ingest tasks in a tasks collection for a given job. * * @param queue The queue for which to count tasks. 
* @param jobId The id of the job for which the tasks are to be counted. @@ -511,7 +536,7 @@ final class IngestTasksScheduler { Iterator iterator = queue.iterator(); int count = 0; while (iterator.hasNext()) { - IngestTask task = (IngestTask) iterator.next(); + IngestTask task = iterator.next(); if (task.getIngestJob().getId() == jobId) { count++; } @@ -549,8 +574,15 @@ final class IngestTasksScheduler { } } + /** + * Used to prioritize file ingest tasks in the root tasks queue so that + * user content is processed first. + */ private static class AbstractFilePriority { + private AbstractFilePriority() { + } + enum Priority { LAST, LOW, MEDIUM, HIGH @@ -647,10 +679,17 @@ final class IngestTasksScheduler { */ private final class DataSourceIngestTaskQueue implements IngestTaskQueue { + private final BlockingQueue tasks = new LinkedBlockingQueue<>(); + @Override public IngestTask getNextTask() throws InterruptedException { - return IngestTasksScheduler.this.pendingDataSourceTasks.take(); + return tasks.take(); } + + private void add(DataSourceIngestTask task) throws InterruptedException { + this.tasks.put(task); + } + } /** @@ -659,11 +698,11 @@ final class IngestTasksScheduler { */ private final class FileIngestTaskQueue implements IngestTaskQueue { + private final BlockingDeque tasks = new LinkedBlockingDeque<>(); + @Override public IngestTask getNextTask() throws InterruptedException { - FileIngestTask task = IngestTasksScheduler.this.pendingFileTasks.takeFirst(); - shuffleFileTaskQueues(); - return task; + return tasks.takeFirst(); } } @@ -674,10 +713,10 @@ final class IngestTasksScheduler { class IngestJobTasksSnapshot { private final long jobId; + private final long dsQueueSize; private final long rootQueueSize; private final long dirQueueSize; private final long fileQueueSize; - private final long dsQueueSize; private final long runningListSize; /** @@ -687,11 +726,11 @@ final class IngestTasksScheduler { */ IngestJobTasksSnapshot(long jobId) { this.jobId = 
jobId; - this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootDirectoryTasks, jobId); - this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryTasks, jobId); - this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTasks, jobId); - this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingDataSourceTasks, jobId); - this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgress, jobId); + this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTasks, jobId); + this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryFileTasks, jobId); + this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.fileTaskQueue.tasks, jobId); + this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.dataSourceTaskQueue.tasks, jobId); + this.runningListSize = countTasksForJob(IngestTasksScheduler.this.activeDataSourceTasks, jobId) + countTasksForJob(IngestTasksScheduler.this.activeFileTasks, jobId); } /** From 0d4a2315cd67659f48eed00a126b7ba64d2ba9ce Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Mon, 19 Mar 2018 19:09:36 -0400 Subject: [PATCH 2/3] Fix IngestTasksScheduler, IngestManager concurrency issues --- .../autopsy/ingest/IngestTasksScheduler.java | 295 +++++++++--------- 1 file changed, 155 insertions(+), 140 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java index 7fea572eed..2bc1769bc6 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -49,12 +49,12 @@ final class IngestTasksScheduler { private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue(); 
private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName()); private static IngestTasksScheduler instance; - private final List activeDataSourceTasks; - private final DataSourceIngestTaskQueue dataSourceTaskQueue; - private final TreeSet rootFileTasks; - private final Deque directoryFileTasks; - private final Deque activeFileTasks; - private final FileIngestTaskQueue fileTaskQueue; + private final DataSourceIngestTaskQueue dataSourceTaskQueueForIngestThreads; + private final List queuedAndRunningDataSourceTasks; + private final TreeSet rootFileTaskQueue; + private final Deque directoryFileTaskQueue; + private final FileIngestTaskQueue fileTaskQueueForIngestThreads; + private final List queuedAndRunningFileTasks; /** * Gets the ingest tasks scheduler singleton. @@ -70,41 +70,40 @@ final class IngestTasksScheduler { * Constructs an ingest tasks scheduler. */ private IngestTasksScheduler() { - this.activeDataSourceTasks = new ArrayList<>(); - this.dataSourceTaskQueue = new DataSourceIngestTaskQueue(); - this.rootFileTasks = new TreeSet<>(new RootDirectoryTaskComparator()); - this.directoryFileTasks = new LinkedList<>(); - this.activeFileTasks = new LinkedList<>(); - this.fileTaskQueue = new FileIngestTaskQueue(); + this.queuedAndRunningDataSourceTasks = new LinkedList<>(); + this.dataSourceTaskQueueForIngestThreads = new DataSourceIngestTaskQueue(); + this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator()); + this.directoryFileTaskQueue = new LinkedList<>(); + this.queuedAndRunningFileTasks = new LinkedList<>(); + this.fileTaskQueueForIngestThreads = new FileIngestTaskQueue(); } /** - * Gets the data source level ingest tasks queue. The queue is a blocking - * queue intended for use by data source ingest threads. + * Gets the data source level ingest tasks queue. This queue is a blocking + * queue intended for use by the ingest manager's data source ingest + * threads. * * @return The queue. 
*/ IngestTaskQueue getDataSourceIngestTaskQueue() { - return this.dataSourceTaskQueue; + return this.dataSourceTaskQueueForIngestThreads; } /** - * Gets the file level ingest tasks queue for file ingest threads. The queue - * is a blocking queue intended for use by file ingest threads. + * Gets the file level ingest tasks queue. This queue is a blocking queue + * intended for use by the ingest manager's file ingest threads. * * @return The queue. */ IngestTaskQueue getFileIngestTaskQueue() { - return this.fileTaskQueue; + return this.fileTaskQueueForIngestThreads; } /** * Schedules a data source level ingest task and file level ingest tasks for - * a data source ingest job. Either all of the files in the data source or a - * given subset of the files will be scheduled. + * a data source ingest job. * - * @param job The data source ingest job. - * @param files A subset of the files for the data source, possibly empty. + * @param job The data source ingest job. */ synchronized void scheduleIngestTasks(DataSourceIngestJob job) { if (!job.isCancelled()) { @@ -129,24 +128,21 @@ final class IngestTasksScheduler { synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) { if (!job.isCancelled()) { DataSourceIngestTask task = new DataSourceIngestTask(job); - this.activeDataSourceTasks.add(task); + this.queuedAndRunningDataSourceTasks.add(task); try { - this.dataSourceTaskQueue.add(task); + this.dataSourceTaskQueueForIngestThreads.add(task); } catch (InterruptedException ex) { - IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while data source ingest thread blocked on a full queue", ex); - this.activeDataSourceTasks.remove(task); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while a data source ingest thread was blocked on a full queue", ex); + this.queuedAndRunningDataSourceTasks.remove(task); Thread.currentThread().interrupt(); } } } /** - * Schedules file level ingest tasks for a data source ingest job. 
Either - * all of the files in the data source or a given subset of the files will - * be scheduled. + * Schedules file level ingest tasks for a data source ingest job. * - * @param job The data source ingest job. - * @param files A subset of the files for the data source, possibly empty. + * @param job The data source ingest job. */ synchronized void scheduleFileIngestTasks(DataSourceIngestJob job) { if (!job.isCancelled()) { @@ -154,7 +150,7 @@ final class IngestTasksScheduler { for (AbstractFile file : candidateFiles) { FileIngestTask task = new FileIngestTask(job, file); if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { - this.rootFileTasks.add(task); + this.rootFileTaskQueue.add(task); } } shuffleFileTaskQueues(); @@ -162,7 +158,7 @@ final class IngestTasksScheduler { } /** - * Schedules file level ingest tasks for a subset of the files in a data + * Schedules file level ingest tasks for a subset of the files for a data * source ingest job. * * @param job The data source ingest job. @@ -170,22 +166,27 @@ final class IngestTasksScheduler { */ synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection files) { if (!job.isCancelled()) { - final Deque newTasksForIngestThreads = new LinkedList<>(); + List newTasks = new LinkedList<>(); for (AbstractFile file : files) { /* - * The file will be added directly to the front of the queue for - * the ingest threads. + * Put the file directly into the queue for the file ingest + * threads, if it passes the filter for the job. The file is + * added to the queue for the ingest threads BEFORE the other + * queued tasks because the primary use case for this method is + * adding derived files from a higher priority task that + * preceded the tasks currently in the queue. 
*/ FileIngestTask task = new FileIngestTask(job, file); if (shouldEnqueueFileTask(task)) { - this.activeFileTasks.addLast(task); // RJCTODO: CHeck this in other method - newTasksForIngestThreads.addLast(task); + newTasks.add(task); } /* - * Add the children of the file, if any, either to the front of - * the queue for the ingest threads, in front of the parent - * directory task, or to the end directory task queue. + * If the file or directory that was just queued has children, + * try to queue tasks for the children. Each child task will go + * into either the directory queue if it is a directory, or + * directly into the queue for the file ingest threads, if it + * passes the filter for the job. */ try { for (Content child : file.getChildren()) { @@ -193,10 +194,9 @@ final class IngestTasksScheduler { AbstractFile childFile = (AbstractFile) child; FileIngestTask childTask = new FileIngestTask(job, childFile); if (childFile.hasChildren()) { - this.directoryFileTasks.add(childTask); + this.directoryFileTaskQueue.add(childTask); } else if (shouldEnqueueFileTask(childTask)) { - this.activeFileTasks.addLast(childTask); - newTasksForIngestThreads.addFirst(childTask); + newTasks.add(task); } } } @@ -206,15 +206,18 @@ final class IngestTasksScheduler { } /* - * Add the newly active tasks into the queue for the file ingest - * threads, AFTER the higher priority tasks that are already queued. + * The files are added to the queue for the ingest threads BEFORE + * the other queued tasks because the primary use case for this + * method is adding derived files from a higher priority task that + * preceded the tasks currently in the queue. 
*/ - for (FileIngestTask newTask : newTasksForIngestThreads) { + for (FileIngestTask newTask : newTasks) { try { - this.fileTaskQueue.tasks.putLast(newTask); + this.queuedAndRunningFileTasks.add(newTask); + this.fileTaskQueueForIngestThreads.addFirst(newTask); } catch (InterruptedException ex) { - IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while data source ingest thread blocked on a full queue", ex); // RJCTODO: Should this propagate? Correct message - this.activeFileTasks.remove(newTask); + this.queuedAndRunningFileTasks.remove(newTask); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex); Thread.currentThread().interrupt(); break; } @@ -229,46 +232,45 @@ final class IngestTasksScheduler { * @param task The completed task. */ synchronized void notifyTaskCompleted(DataSourceIngestTask task) { - this.activeDataSourceTasks.remove(task); - shuffleFileTaskQueues(); + this.queuedAndRunningDataSourceTasks.remove(task); } /** * Allows an ingest thread to notify this ingest task scheduler that a file * level task has been completed. * - * @param task + * @param task The completed task. */ synchronized void notifyTaskCompleted(FileIngestTask task) { - this.activeFileTasks.remove(task); + this.queuedAndRunningFileTasks.remove(task); shuffleFileTaskQueues(); } /** * Queries the task scheduler to determine whether or not all of the ingest - * tasks for an ingest job have been completed. + * tasks for a data source ingest job have been completed. * - * @param job The data source ingest job + * @param job The data source ingest job. * * @return True or false. 
*/ synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) { - return !hasTasksForJob(this.activeDataSourceTasks, job) - && !hasTasksForJob(this.rootFileTasks, job) - && !hasTasksForJob(this.directoryFileTasks, job) - && !hasTasksForJob(this.activeFileTasks, job); + return !hasTasksForJob(this.queuedAndRunningDataSourceTasks, job) + && !hasTasksForJob(this.rootFileTaskQueue, job) + && !hasTasksForJob(this.directoryFileTaskQueue, job) + && !hasTasksForJob(this.queuedAndRunningFileTasks, job); } /** * Clears the "upstream" task scheduling queues for a data source ingest - * job, but does nothing about tasks that have already been activated, i.e., - * moved into the queue that is consumed by the file ingest threads. + * job, but does nothing about tasks that have already been moved into the + * queue that is consumed by the file ingest threads. * - * @param job The data source ingest job + * @param job The data source ingest job. */ synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) { - this.removeTasksForJob(this.rootFileTasks, job); - this.removeTasksForJob(this.directoryFileTasks, job); + this.removeTasksForJob(this.rootFileTaskQueue, job); + this.removeTasksForJob(this.directoryFileTaskQueue, job); } /** @@ -276,8 +278,9 @@ final class IngestTasksScheduler { * files and virtual directories for a data source. Used to create file * tasks to put into the root directories queue. * - * @param dataSource The data source. - * @param topLevelFiles The top level files are added to this list. + * @param dataSource The data source. + * + * @return The top level files. 
*/ private static List getTopLevelFiles(Content dataSource) { List topLevelFiles = new ArrayList<>(); @@ -312,42 +315,53 @@ final class IngestTasksScheduler { } /** - * Intelligently queues file ingest tasks for the ingest manager's file - * ingest threads by "shuffling" them through a sequence of queues that - * allows for the interleaving of tasks from different data source ingest - * jobs based on priority. The sequence of queues is: + * Schedules file ingest tasks for the ingest manager's file ingest threads + * by "shuffling" them through a sequence of three queues that allows for + * the interleaving of tasks from different data source ingest jobs based on + * priority. The sequence of queues is: * - * 1. The root file tasks priority queue, which contains the tasks for the - * roots of data source content sub trees that are being analyzed for the - * data source ingest jobs. For example, typical root tasks for a disk image - * data source would be the tasks for the contents of the root directories - * of the file systems. This queue is a priority queue that attempts to - * ensure that user directory content is analyzed before general file system - * content. It feeds into the directory tasks queue. + * 1. The root file tasks priority queue, which contains file tasks for the + * root objects of the data sources that are being analyzed. For example, + * the root tasks for a disk image data source are typically the tasks for + * the contents of the root directories of the file systems. This queue is a + * priority queue that attempts to ensure that user directory content is + * analyzed before general file system content. It feeds into the directory + * tasks queue. * - * 2. The directory file tasks queue, which contains directory tasks - * discovered in the descent through the content sub trees that are being - * analyzed for the data source ingest jobs. It feeds into the active tasks - * queue. + * 2. 
The directory file tasks queue, which contains root file tasks + * shuffled out of the root tasks queue, plus directory tasks discovered in + * the descent from the root tasks to the final leaf tasks in the content + * trees that are being analyzed for the data source ingest jobs. This queue + * is a FIFO queue. It feeds into the file tasks queue for the ingest + * manager's file ingest threads. * - * 3. The active file tasks queue, a queue of the file tasks that are either - * in the tasks queue for the file ingest threads or are in the process of - * being analyzed in a file ingest thread. + * 3. The file tasks queue for the ingest manager's file ingest threads. + * This queue is a blocking deque that is FIFO during a shuffle to maintain + * task prioritization, but LIFO when adding derived files to it directly + * during ingest. The reason for the LIFO additions is to give priority + * to files derived from priority files. * - * 4. The file tasks queue for the ingest manager's file ingest threads. + * There is a fourth collection of file tasks, a "tracking" list, that keeps + * track of the file tasks that are either in the tasks queue for the file + * ingest threads, or are in the process of being analyzed in a file ingest + * thread. This list is vital to the ingest task scheduler's ability to + * determine when all of the ingest tasks for a data source ingest job have + * been completed. It is also used to drive this shuffling algorithm - + * whenever this list is empty, the two "upstream" queues are "shuffled" to + * queue more tasks for the file ingest threads. */ synchronized private void shuffleFileTaskQueues() { - final Deque newTasksForIngestThreads = new LinkedList<>(); - while (this.activeFileTasks.isEmpty()) { + List newTasks = new LinkedList<>(); + while (this.queuedAndRunningFileTasks.isEmpty()) { /* * If the directory file task queue is empty, move the highest * priority root file task, if there is one, into it.
If both the - * root and the directory file task queuess are empty, there is - * nothing left to do. + * root and the directory file task queues are empty, there is + * nothing left to shuffle, so exit. */ - if (this.directoryFileTasks.isEmpty()) { - if (!this.rootFileTasks.isEmpty()) { - this.directoryFileTasks.add(this.rootFileTasks.pollFirst()); + if (this.directoryFileTaskQueue.isEmpty()) { + if (!this.rootFileTaskQueue.isEmpty()) { + this.directoryFileTaskQueue.add(this.rootFileTaskQueue.pollFirst()); } else { return; } @@ -355,21 +369,22 @@ final class IngestTasksScheduler { /* * Try to move the next task from the directory task queue into the - * active file tasks tracking queue, if it passes the filter for the - * job. + * queue for the file ingest threads, if it passes the filter for + * the job. The file is added to the queue for the ingest threads + * AFTER the higher priority tasks that preceded it. */ - final FileIngestTask directoryTask = this.directoryFileTasks.pollLast(); + final FileIngestTask directoryTask = this.directoryFileTaskQueue.pollLast(); if (shouldEnqueueFileTask(directoryTask)) { - this.activeFileTasks.addFirst(directoryTask); - newTasksForIngestThreads.addFirst(directoryTask); + newTasks.add(directoryTask); } /* - * If the file or directory from the next that was just activated - * has children, try to queue tasks for the children. Each child - * will go into the directory task queue if it is a directory, or - * into the active file tasks tracking queue if it passes the filter - * for the job. + * If the directory (or root level file) that was just queued has + * children, try to queue tasks for the children. Each child task + * will go into either the directory queue if it is a directory, or + * into the queue for the file ingest threads, if it passes the + * filter for the job. The file is added to the queue for the ingest + * threads AFTER the higher priority tasks that preceded it. 
*/ final AbstractFile directory = directoryTask.getFile(); try { @@ -378,38 +393,30 @@ final class IngestTasksScheduler { AbstractFile childFile = (AbstractFile) child; FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), childFile); if (childFile.hasChildren()) { - this.directoryFileTasks.add(childTask); + this.directoryFileTaskQueue.add(childTask); } else if (shouldEnqueueFileTask(childTask)) { - this.activeFileTasks.add(directoryTask); - /* - * Queue the child file tasks for the ingest threads - * in front of parent directory tasks. - */ - newTasksForIngestThreads.addFirst(directoryTask); + newTasks.add(childTask); } } } } catch (TskCoreException ex) { logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", directory.getName(), directory.getId()), ex); //NON-NLS } + } - /* - * Add the newly active tasks into the queue for the file ingest - * threads, AFTER the higher priority tasks that are already queued. - */ - for (FileIngestTask newTask : newTasksForIngestThreads) { - try { - this.fileTaskQueue.tasks.putLast(newTask); - } catch (InterruptedException ex) { - /** - * The current thread was interrupted while blocked on a - * full queue. Discard the task and reset the interrupted - * flag. - */ - IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while data source ingest thread blocked on a full queue", ex); // RJCTODO: Should this propagate? Correct message - this.activeFileTasks.remove(newTask); - Thread.currentThread().interrupt(); - } + /* + * The files are added to the queue for the ingest threads AFTER the + * higher priority tasks that preceded them. 
+ */ + for (FileIngestTask newTask : newTasks) { + try { + this.queuedAndRunningFileTasks.add(newTask); + this.fileTaskQueueForIngestThreads.addFirst(newTask); + } catch (InterruptedException ex) { + this.queuedAndRunningFileTasks.remove(newTask); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex); + Thread.currentThread().interrupt(); + break; } } } @@ -674,32 +681,40 @@ final class IngestTasksScheduler { } /** - * Wraps access to pending data source ingest tasks in the interface - * required by the ingest threads. + * A blocking queue of data source ingest tasks for the ingest manager's + * data source ingest threads. */ private final class DataSourceIngestTaskQueue implements IngestTaskQueue { private final BlockingQueue tasks = new LinkedBlockingQueue<>(); + private void add(DataSourceIngestTask task) throws InterruptedException { + this.tasks.put(task); + } + @Override public IngestTask getNextTask() throws InterruptedException { return tasks.take(); } - private void add(DataSourceIngestTask task) throws InterruptedException { - this.tasks.put(task); - } - } /** - * Wraps access to pending file ingest tasks in the interface required by - * the ingest threads. + * A blocking, LIFO queue of file ingest tasks for the ingest manager's + * file ingest threads.
*/ private final class FileIngestTaskQueue implements IngestTaskQueue { private final BlockingDeque tasks = new LinkedBlockingDeque<>(); + private void addFirst(FileIngestTask task) throws InterruptedException { + this.tasks.putFirst(task); + } + + private void addLast(FileIngestTask task) throws InterruptedException { + this.tasks.putLast(task); + } + @Override public IngestTask getNextTask() throws InterruptedException { return tasks.takeFirst(); @@ -726,11 +741,11 @@ final class IngestTasksScheduler { */ IngestJobTasksSnapshot(long jobId) { this.jobId = jobId; - this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTasks, jobId); - this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryFileTasks, jobId); - this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.fileTaskQueue.tasks, jobId); - this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.dataSourceTaskQueue.tasks, jobId); - this.runningListSize = countTasksForJob(IngestTasksScheduler.this.activeDataSourceTasks, jobId) + countTasksForJob(IngestTasksScheduler.this.activeFileTasks, jobId); + this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTaskQueue, jobId); + this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryFileTaskQueue, jobId); + this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.fileTaskQueueForIngestThreads.tasks, jobId); + this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.dataSourceTaskQueueForIngestThreads.tasks, jobId); + this.runningListSize = countTasksForJob(IngestTasksScheduler.this.queuedAndRunningDataSourceTasks, jobId) + countTasksForJob(IngestTasksScheduler.this.queuedAndRunningFileTasks, jobId); } /** From 75d4b9ad5502f28be15e85f902c8cbbfb8fa1ac5 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Mon, 19 Mar 2018 19:46:01 -0400 Subject: [PATCH 3/3] Fix IngestTasksScheduler, IngestManager concurrency issues --- .../autopsy/ingest/IngestTasksScheduler.java | 19 
++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java index 2bc1769bc6..36b09035b1 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -166,7 +166,7 @@ final class IngestTasksScheduler { */ synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection files) { if (!job.isCancelled()) { - List newTasks = new LinkedList<>(); + List newTasksForFileIngestThreads = new LinkedList<>(); for (AbstractFile file : files) { /* * Put the file directly into the queue for the file ingest @@ -178,7 +178,7 @@ final class IngestTasksScheduler { */ FileIngestTask task = new FileIngestTask(job, file); if (shouldEnqueueFileTask(task)) { - newTasks.add(task); + newTasksForFileIngestThreads.add(task); } /* @@ -196,7 +196,7 @@ final class IngestTasksScheduler { if (childFile.hasChildren()) { this.directoryFileTaskQueue.add(childTask); } else if (shouldEnqueueFileTask(childTask)) { - newTasks.add(task); + newTasksForFileIngestThreads.add(task); } } } @@ -211,7 +211,7 @@ final class IngestTasksScheduler { * method is adding derived files from a higher priority task that * preceded the tasks currently in the queue. */ - for (FileIngestTask newTask : newTasks) { + for (FileIngestTask newTask : newTasksForFileIngestThreads) { try { this.queuedAndRunningFileTasks.add(newTask); this.fileTaskQueueForIngestThreads.addFirst(newTask); @@ -351,7 +351,7 @@ final class IngestTasksScheduler { * queue more tasks for the file ingest threads. 
*/ synchronized private void shuffleFileTaskQueues() { - List newTasks = new LinkedList<>(); + List newTasksForFileIngestThreads = new LinkedList<>(); while (this.queuedAndRunningFileTasks.isEmpty()) { /* * If the directory file task queue is empty, move the highest @@ -375,7 +375,8 @@ final class IngestTasksScheduler { */ final FileIngestTask directoryTask = this.directoryFileTaskQueue.pollLast(); if (shouldEnqueueFileTask(directoryTask)) { - newTasks.add(directoryTask); + newTasksForFileIngestThreads.add(directoryTask); + this.queuedAndRunningFileTasks.add(directoryTask); } /* @@ -395,7 +396,8 @@ final class IngestTasksScheduler { if (childFile.hasChildren()) { this.directoryFileTaskQueue.add(childTask); } else if (shouldEnqueueFileTask(childTask)) { - newTasks.add(childTask); + newTasksForFileIngestThreads.add(childTask); + this.queuedAndRunningFileTasks.add(childTask); } } } @@ -408,9 +410,8 @@ final class IngestTasksScheduler { * The files are added to the queue for the ingest threads AFTER the * higher priority tasks that preceded them. */ - for (FileIngestTask newTask : newTasks) { + for (FileIngestTask newTask : newTasksForFileIngestThreads) { try { - this.queuedAndRunningFileTasks.add(newTask); this.fileTaskQueueForIngestThreads.addFirst(newTask); } catch (InterruptedException ex) { this.queuedAndRunningFileTasks.remove(newTask);