diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java index a41acd1c58..ebdfb6146f 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestJob.java @@ -518,7 +518,7 @@ final class DataSourceIngestJob { */ if (this.hasFirstStageDataSourceIngestPipeline() && this.hasFileIngestPipeline()) { logger.log(Level.INFO, "Scheduling first stage data source and file level analysis tasks for {0} (jobId={1})", new Object[]{dataSource.getName(), this.id}); //NON-NLS - DataSourceIngestJob.taskScheduler.scheduleIngestTasks(this, this.files); + DataSourceIngestJob.taskScheduler.scheduleIngestTasks(this); } else if (this.hasFirstStageDataSourceIngestPipeline()) { logger.log(Level.INFO, "Scheduling first stage data source level analysis tasks for {0} (jobId={1}), no file level analysis configured", new Object[]{dataSource.getName(), this.id}); //NON-NLS DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this); @@ -827,7 +827,7 @@ final class DataSourceIngestJob { } /** - * Adds more files from the data source for this job to the job, i.e., adds + * Adds more files from the data source for this job to the job, e.g., adds * extracted or carved files. Not currently supported for the second stage * of the job. * @@ -835,9 +835,7 @@ final class DataSourceIngestJob { */ void addFiles(List files) { if (DataSourceIngestJob.Stages.FIRST == this.stage) { - for (AbstractFile file : files) { - DataSourceIngestJob.taskScheduler.scheduleFastTrackedFileIngestTask(this, file); - } + DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this, files); } else { DataSourceIngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 65d0162d6d..218626985a 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -121,7 +121,7 @@ public class IngestManager { private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L); private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS; private final Map> startIngestJobFutures = new ConcurrentHashMap<>(); - private final Map ingestJobsById = new ConcurrentHashMap<>(); + private final Map ingestJobsById = new HashMap<>(); private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS; private final ExecutorService fileLevelIngestJobTasksExecutor; private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS; @@ -399,13 +399,17 @@ public class IngestManager { ingestMonitor.start(); } - ingestJobsById.put(job.getId(), job); + synchronized (ingestJobsById) { + ingestJobsById.put(job.getId(), job); + } errors = job.start(); if (errors.isEmpty()) { this.fireIngestJobStarted(job.getId()); IngestManager.logger.log(Level.INFO, "Ingest job {0} started", job.getId()); //NON-NLS } else { - this.ingestJobsById.remove(job.getId()); + synchronized (ingestJobsById) { + this.ingestJobsById.remove(job.getId()); + } for (IngestModuleError error : errors) { logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS } @@ -438,7 +442,9 @@ public class IngestManager { */ void finishIngestJob(IngestJob job) { long jobId = job.getId(); - ingestJobsById.remove(jobId); + synchronized (ingestJobsById) { + ingestJobsById.remove(jobId); + } if (!job.isCancelled()) { IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS fireIngestJobCompleted(jobId); @@ -455,7 +461,9 @@ public class IngestManager { * @return True or false. */ public boolean isIngestRunning() { - return !ingestJobsById.isEmpty(); + synchronized (ingestJobsById) { + return !ingestJobsById.isEmpty(); + } } /** @@ -467,9 +475,11 @@ public class IngestManager { startIngestJobFutures.values().forEach((handle) -> { handle.cancel(true); }); - this.ingestJobsById.values().forEach((job) -> { - job.cancel(reason); - }); + synchronized (ingestJobsById) { + this.ingestJobsById.values().forEach((job) -> { + job.cancel(reason); + }); + } } /** @@ -770,9 +780,11 @@ public class IngestManager { */ List getIngestJobSnapshots() { List snapShots = new ArrayList<>(); - ingestJobsById.values().forEach((job) -> { - snapShots.addAll(job.getDataSourceIngestJobSnapshots()); - }); + synchronized (ingestJobsById) { + ingestJobsById.values().forEach((job) -> { + snapShots.addAll(job.getDataSourceIngestJobSnapshots()); + }); + } return snapShots; } @@ -808,7 +820,9 @@ public class IngestManager { public Void call() { try { if (Thread.currentThread().isInterrupted()) { - ingestJobsById.remove(job.getId()); + synchronized (ingestJobsById) { + ingestJobsById.remove(job.getId()); + } return null; } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java index b9cee95687..36b09035b1 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -21,12 +21,13 @@ package org.sleuthkit.autopsy.ingest; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; -import java.util.HashSet; +import java.util.Deque; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; -import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; @@ -40,64 +41,20 @@ import org.sleuthkit.datamodel.TskCoreException; import org.sleuthkit.datamodel.TskData; /** - * Creates ingest tasks for ingest jobs, queuing the tasks in priority order for - * execution by the ingest manager's ingest threads. + * Creates ingest tasks for data source ingest jobs, queueing the tasks in + * priority order for execution by the ingest manager's ingest threads. */ final class IngestTasksScheduler { - private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName()); private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue(); + private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName()); private static IngestTasksScheduler instance; - - /** - * Scheduling of data source ingest tasks is accomplished by putting them in - * a FIFO queue to be consumed by the ingest threads, so the queue is - * wrapped in a "dispenser" that implements the IngestTaskQueue interface - * and is exposed via a getter method. - */ - private final LinkedBlockingQueue pendingDataSourceTasks; - private final DataSourceIngestTaskQueue dataSourceTasksDispenser; - - /** - * Scheduling of file ingest tasks is accomplished by "shuffling" them - * through a sequence of internal queues that allows for the interleaving of - * tasks from different ingest jobs based on priority. These scheduling - * queues are: - * - * 1. Root directory tasks (priority queue) - * - * 2. Directory tasks (FIFO queue) - * - * 3. Pending file tasks (LIFO queue). - * - * The pending file tasks queue is LIFO to handle large numbers of files - * extracted from archive files. At least one image has been processed that - * had a folder full of archive files. The queue grew to have thousands of - * entries, as each successive archive file was expanded, so now extracted - * files get added to the front of the queue so that in such a scenario they - * would be processed before the expansion of the next archive file. - * - * Tasks in the pending file tasks queue are ready to be consumed by the - * ingest threads, so the queue is wrapped in a "dispenser" that implements - * the IngestTaskQueue interface and is exposed via a getter method. - */ - private final TreeSet rootDirectoryTasks; - private final List directoryTasks; - private final BlockingDeque pendingFileTasks; - private final FileIngestTaskQueue fileTasksDispenser; - - /** - * The ingest tasks scheduler allows ingest jobs to query it to see if there - * are any tasks in progress for the job. To make this possible, the ingest - * tasks scheduler needs to keep track not only of the tasks in its queues, - * but also of the tasks that have been handed out for processing by the - * ingest threads. Therefore all ingest tasks are added to this list when - * they are created and are not removed when an ingest thread takes an - * ingest task. Instead, the ingest thread calls back into the scheduler - * when the task is completed, at which time the task will be removed from - * this list. - */ - private final Set tasksInProgress; + private final DataSourceIngestTaskQueue dataSourceTaskQueueForIngestThreads; + private final List queuedAndRunningDataSourceTasks; + private final TreeSet rootFileTaskQueue; + private final Deque directoryFileTaskQueue; + private final FileIngestTaskQueue fileTaskQueueForIngestThreads; + private final List queuedAndRunningFileTasks; /** * Gets the ingest tasks scheduler singleton. @@ -113,52 +70,53 @@ final class IngestTasksScheduler { * Constructs an ingest tasks scheduler. */ private IngestTasksScheduler() { - this.pendingDataSourceTasks = new LinkedBlockingQueue<>(); - this.dataSourceTasksDispenser = new DataSourceIngestTaskQueue(); - this.rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); - this.directoryTasks = new ArrayList<>(); - this.pendingFileTasks = new LinkedBlockingDeque<>(); - this.fileTasksDispenser = new FileIngestTaskQueue(); - this.tasksInProgress = new HashSet<>(); + this.queuedAndRunningDataSourceTasks = new LinkedList<>(); + this.dataSourceTaskQueueForIngestThreads = new DataSourceIngestTaskQueue(); + this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator()); + this.directoryFileTaskQueue = new LinkedList<>(); + this.queuedAndRunningFileTasks = new LinkedList<>(); + this.fileTaskQueueForIngestThreads = new FileIngestTaskQueue(); } /** - * Gets this ingest task scheduler's implementation of the IngestTaskQueue - * interface for data source ingest tasks. + * Gets the data source level ingest tasks queue. This queue is a blocking + * queue intended for use by the ingest manager's data source ingest + * threads. * - * @return The data source ingest tasks queue. + * @return The queue. */ IngestTaskQueue getDataSourceIngestTaskQueue() { - return this.dataSourceTasksDispenser; + return this.dataSourceTaskQueueForIngestThreads; } /** - * Gets this ingest task scheduler's implementation of the IngestTaskQueue - * interface for file ingest tasks. + * Gets the file level ingest tasks queue. This queue is a blocking queue + * intended for use by the ingest manager's file ingest threads. * - * @return The file ingest tasks queue. + * @return The queue. */ IngestTaskQueue getFileIngestTaskQueue() { - return this.fileTasksDispenser; + return this.fileTaskQueueForIngestThreads; } /** * Schedules a data source level ingest task and file level ingest tasks for - * an ingest job. Either all of the files in the data source or a given - * subset of the files will be scheduled. + * a data source ingest job. * - * @param job The data source ingest job. - * @param files A subset of the files for the data source. + * @param job The data source ingest job. */ - synchronized void scheduleIngestTasks(DataSourceIngestJob job, List files) { + synchronized void scheduleIngestTasks(DataSourceIngestJob job) { if (!job.isCancelled()) { - // Scheduling of both a data source ingest task and file ingest tasks - // for a job must be an atomic operation. Otherwise, the data source - // task might be completed before the file tasks are scheduled, - // resulting in a potential false positive when another thread checks - // whether or not all the tasks for the job are completed. + /* + * Scheduling of both the data source ingest task and the initial + * file ingest tasks for a job must be an atomic operation. + * Otherwise, the data source task might be completed before the + * file tasks are scheduled, resulting in a potential false positive + * when another thread checks whether or not all the tasks for the + * job are completed. + */ this.scheduleDataSourceIngestTask(job); - this.scheduleFileIngestTasks(job, files); + this.scheduleFileIngestTasks(job); } } @@ -170,41 +128,29 @@ final class IngestTasksScheduler { synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) { if (!job.isCancelled()) { DataSourceIngestTask task = new DataSourceIngestTask(job); - this.tasksInProgress.add(task); + this.queuedAndRunningDataSourceTasks.add(task); try { - this.pendingDataSourceTasks.put(task); + this.dataSourceTaskQueueForIngestThreads.add(task); } catch (InterruptedException ex) { - /** - * The current thread was interrupted while blocked on a full - * queue. Discard the task and reset the interrupted flag. - */ - this.tasksInProgress.remove(task); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while a data source ingest thread was blocked on a full queue", ex); + this.queuedAndRunningDataSourceTasks.remove(task); Thread.currentThread().interrupt(); } } } /** - * Schedules file level ingest tasks for a data source ingest job. Either - * all of the files in the data source or a given subset of the files will - * be scheduled. + * Schedules file level ingest tasks for a data source ingest job. * - * @param job The data source ingest job. - * @param files A subset of the files for the data source. + * @param job The data source ingest job. */ - synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, List files) { + synchronized void scheduleFileIngestTasks(DataSourceIngestJob job) { if (!job.isCancelled()) { - List candidateFiles = new ArrayList<>(); - if (files.isEmpty()) { - getTopLevelFiles(job.getDataSource(), candidateFiles); - } else { - candidateFiles.addAll(files); - } - for (AbstractFile firstLevelFile : candidateFiles) { - FileIngestTask task = new FileIngestTask(job, firstLevelFile); + List candidateFiles = getTopLevelFiles(job.getDataSource()); + for (AbstractFile file : candidateFiles) { + FileIngestTask task = new FileIngestTask(job, file); if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { - this.tasksInProgress.add(task); - this.rootDirectoryTasks.add(task); + this.rootFileTaskQueue.add(task); } } shuffleFileTaskQueues(); @@ -212,73 +158,119 @@ final class IngestTasksScheduler { } /** - * Schedules a file ingest task for a data source ingest job. The task that - * is created is added directly to the pending file tasks queues, i.e., it - * is "fast tracked." + * Schedules file level ingest tasks for a subset of the files for a data + * source ingest job. * - * @param job The data source ingest job. - * @param file A file. + * @param job The data source ingest job. + * @param files A subset of the files for the data source. */ - synchronized void scheduleFastTrackedFileIngestTask(DataSourceIngestJob job, AbstractFile file) { + synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection files) { if (!job.isCancelled()) { - FileIngestTask task = new FileIngestTask(job, file); - if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { - this.tasksInProgress.add(task); - addToPendingFileTasksQueue(task); + List newTasksForFileIngestThreads = new LinkedList<>(); + for (AbstractFile file : files) { + /* + * Put the file directly into the queue for the file ingest + * threads, if it passes the filter for the job. The file is + * added to the queue for the ingest threads BEFORE the other + * queued tasks because the primary use case for this method is + * adding derived files from a higher priority task that + * preceded the tasks currently in the queue. + */ + FileIngestTask task = new FileIngestTask(job, file); + if (shouldEnqueueFileTask(task)) { + newTasksForFileIngestThreads.add(task); + } + + /* + * If the file or directory that was just queued has children, + * try to queue tasks for the children. Each child task will go + * into either the directory queue if it is a directory, or + * directly into the queue for the file ingest threads, if it + * passes the filter for the job. + */ + try { + for (Content child : file.getChildren()) { + if (child instanceof AbstractFile) { + AbstractFile childFile = (AbstractFile) child; + FileIngestTask childTask = new FileIngestTask(job, childFile); + if (childFile.hasChildren()) { + this.directoryFileTaskQueue.add(childTask); + } else if (shouldEnqueueFileTask(childTask)) { + newTasksForFileIngestThreads.add(task); + } + } + } + } catch (TskCoreException ex) { + logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS + } + } + + /* + * The files are added to the queue for the ingest threads BEFORE + * the other queued tasks because the primary use case for this + * method is adding derived files from a higher priority task that + * preceded the tasks currently in the queue. + */ + for (FileIngestTask newTask : newTasksForFileIngestThreads) { + try { + this.queuedAndRunningFileTasks.add(newTask); + this.fileTaskQueueForIngestThreads.addFirst(newTask); + } catch (InterruptedException ex) { + this.queuedAndRunningFileTasks.remove(newTask); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex); + Thread.currentThread().interrupt(); + break; + } } } } /** - * Allows an ingest thread to notify this ingest task scheduler that a task - * has been completed. + * Allows an ingest thread to notify this ingest task scheduler that a data + * source level task has been completed. * * @param task The completed task. */ - synchronized void notifyTaskCompleted(IngestTask task) { - tasksInProgress.remove(task); + synchronized void notifyTaskCompleted(DataSourceIngestTask task) { + this.queuedAndRunningDataSourceTasks.remove(task); } /** - * Queries the task scheduler to determine whether or not all current ingest - * tasks for an ingest job are completed. + * Allows an ingest thread to notify this ingest task scheduler that a file + * level task has been completed. * - * @param job The job for which the query is to be performed. + * @param task The completed task. + */ + synchronized void notifyTaskCompleted(FileIngestTask task) { + this.queuedAndRunningFileTasks.remove(task); + shuffleFileTaskQueues(); + } + + /** + * Queries the task scheduler to determine whether or not all of the ingest + * tasks for a data source ingest job have been completed. + * + * @param job The data source ingest job. * * @return True or false. */ synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) { - for (IngestTask task : tasksInProgress) { - if (task.getIngestJob().getId() == job.getId()) { - return false; - } - } - return true; + return !hasTasksForJob(this.queuedAndRunningDataSourceTasks, job) + && !hasTasksForJob(this.rootFileTaskQueue, job) + && !hasTasksForJob(this.directoryFileTaskQueue, job) + && !hasTasksForJob(this.queuedAndRunningFileTasks, job); } /** - * Clears the "upstream" task scheduling queues for an ingest job, but does - * nothing about tasks that have already been shuffled into the concurrently - * accessed blocking queues shared with the ingest threads. Note that tasks - * in the "downstream" queues or already taken by the ingest threads will be - * flushed out when the ingest threads call back with their task completed - * notifications. + * Clears the "upstream" task scheduling queues for a data source ingest + * job, but does nothing about tasks that have already been moved into the + * queue that is consumed by the file ingest threads. * - * @param job The job for which the tasks are to to canceled. + * @param job The data source ingest job. */ synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) { - /** - * This code should not flush the blocking queues that are concurrently - * accessed by the ingest threads. This is because the "lock striping" - * and "weakly consistent" iterators of these collections make it so - * that this code could have a different view of the queues than the - * ingest threads. It does clean out the directory level tasks before - * they are exploded into file tasks. - */ - long jobId = job.getId(); - this.removeTasksForJob(this.rootDirectoryTasks, jobId); - this.removeTasksForJob(this.directoryTasks, jobId); - this.shuffleFileTaskQueues(); + this.removeTasksForJob(this.rootFileTaskQueue, job); + this.removeTasksForJob(this.directoryFileTaskQueue, job); } /** @@ -286,10 +278,12 @@ final class IngestTasksScheduler { * files and virtual directories for a data source. Used to create file * tasks to put into the root directories queue. * - * @param dataSource The data source. - * @param topLevelFiles The top level files are added to this list. + * @param dataSource The data source. + * + * @return The top level files. */ - private static void getTopLevelFiles(Content dataSource, List topLevelFiles) { + private static List getTopLevelFiles(Content dataSource) { + List topLevelFiles = new ArrayList<>(); Collection rootObjects = dataSource.accept(new GetRootDirectoryVisitor()); if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) { // The data source is itself a file to be processed. @@ -317,74 +311,113 @@ final class IngestTasksScheduler { } } } + return topLevelFiles; } /** - * "Shuffles" the file task queues to ensure that there is at least one task - * in the pending file ingest tasks queue, as long as there are still file - * ingest tasks to be performed. + * Schedules file ingest tasks for the ingest manager's file ingest threads + * by "shuffling" them through a sequence of three queues that allows for + * the interleaving of tasks from different data source ingest jobs based on + * priority. The sequence of queues is: + * + * 1. The root file tasks priority queue, which contains file tasks for the + * root objects of the data sources that are being analyzed. For example, + * the root tasks for a disk image data source are typically the tasks for + * the contents of the root directories of the file systems. This queue is a + * priority queue that attempts to ensure that user directory content is + * analyzed before general file system content. It feeds into the directory + * tasks queue. + * + * 2. The directory file tasks queue, which contains root file tasks + * shuffled out of the root tasks queue, plus directory tasks discovered in + * the descent from the root tasks to the final leaf tasks in the content + * trees that are being analyzed for the data source ingest jobs. This queue + * is a FIFO queue. It feeds into the file tasks queue for the ingest + * manager's file ingest threads. + * + * 3. The file tasks queue for the ingest manager's file ingest threads. + * This queue is a blocking deque that is FIFO during a shuffle to maintain + * task prioritization, but LIFO when adding derived files to it directly + * during ingest. The reason for the LIFO additions is to give priority + * derived files of priority files. + * + * There is a fourth collection of file tasks, a "tracking" list, that keeps + * track of the file tasks that are either in the tasks queue for the file + * ingest threads, or are in the process of being analyzed in a file ingest + * thread. This queue is vital to the ingest task scheduler's ability to + * determine when all of the ingest tasks for a data source ingest job have + * been completed. It is also used to drive this shuffling algorithm - + * whenever this list is empty, the two "upstream" queues are "shuffled" to + * queue more tasks for the file ingest threads. */ synchronized private void shuffleFileTaskQueues() { - // This is synchronized because it is called both by synchronized - // methods of this ingest scheduler and an unsynchronized method of its - // file tasks "dispenser". - while (true) { - // Loop until either the pending file tasks queue is NOT empty - // or the upstream queues that feed into it ARE empty. - if (!this.pendingFileTasks.isEmpty()) { - // There are file tasks ready to be consumed, exit. - return; - } - if (this.directoryTasks.isEmpty()) { - if (this.rootDirectoryTasks.isEmpty()) { - // There are no root directory tasks to move into the - // directory queue, exit. - return; + List newTasksForFileIngestThreads = new LinkedList<>(); + while (this.queuedAndRunningFileTasks.isEmpty()) { + /* + * If the directory file task queue is empty, move the highest + * priority root file task, if there is one, into it. If both the + * root and the directory file task queues are empty, there is + * nothing left to shuffle, so exit. + */ + if (this.directoryFileTaskQueue.isEmpty()) { + if (!this.rootFileTaskQueue.isEmpty()) { + this.directoryFileTaskQueue.add(this.rootFileTaskQueue.pollFirst()); } else { - // Move the next root directory task into the - // directories queue. Note that the task was already - // added to the tasks in progress list when the task was - // created in scheduleFileIngestTasks(). - this.directoryTasks.add(this.rootDirectoryTasks.pollFirst()); + return; } } - // Try to add the most recently added directory from the - // directory tasks queue to the pending file tasks queue. - FileIngestTask directoryTask = this.directoryTasks.remove(this.directoryTasks.size() - 1); + /* + * Try to move the next task from the directory task queue into the + * queue for the file ingest threads, if it passes the filter for + * the job. The file is added to the queue for the ingest threads + * AFTER the higher priority tasks that preceded it. + */ + final FileIngestTask directoryTask = this.directoryFileTaskQueue.pollLast(); if (shouldEnqueueFileTask(directoryTask)) { - addToPendingFileTasksQueue(directoryTask); - } else { - this.tasksInProgress.remove(directoryTask); + newTasksForFileIngestThreads.add(directoryTask); + this.queuedAndRunningFileTasks.add(directoryTask); } - // If the directory contains subdirectories or files, try to - // enqueue tasks for them as well. + /* + * If the directory (or root level file) that was just queued has + * children, try to queue tasks for the children. Each child task + * will go into either the directory queue if it is a directory, or + * into the queue for the file ingest threads, if it passes the + * filter for the job. The file is added to the queue for the ingest + * threads AFTER the higher priority tasks that preceded it. + */ final AbstractFile directory = directoryTask.getFile(); try { for (Content child : directory.getChildren()) { if (child instanceof AbstractFile) { - AbstractFile file = (AbstractFile) child; - FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), file); - if (file.hasChildren()) { - // Found a subdirectory, put the task in the - // pending directory tasks queue. Note the - // addition of the task to the tasks in progress - // list. This is necessary because this is the - // first appearance of this task in the queues. - this.tasksInProgress.add(childTask); - this.directoryTasks.add(childTask); + AbstractFile childFile = (AbstractFile) child; + FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), childFile); + if (childFile.hasChildren()) { + this.directoryFileTaskQueue.add(childTask); } else if (shouldEnqueueFileTask(childTask)) { - // Found a file, put the task directly into the - // pending file tasks queue. - this.tasksInProgress.add(childTask); - addToPendingFileTasksQueue(childTask); + newTasksForFileIngestThreads.add(childTask); + this.queuedAndRunningFileTasks.add(childTask); } } } } catch (TskCoreException ex) { - String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS - logger.log(Level.SEVERE, errorMessage, ex); + logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", directory.getName(), directory.getId()), ex); //NON-NLS + } + } + + /* + * The files are added to the queue for the ingest threads AFTER the + * higher priority tasks that preceded them. + */ + for (FileIngestTask newTask : newTasksForFileIngestThreads) { + try { + this.fileTaskQueueForIngestThreads.addFirst(newTask); + } catch (InterruptedException ex) { + this.queuedAndRunningFileTasks.remove(newTask); + IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex); + Thread.currentThread().interrupt(); + break; } } } @@ -463,44 +496,44 @@ final class IngestTasksScheduler { } /** - * Adds a file ingest task to the blocking pending tasks queue. + * Checks whether or not a collection of ingest tasks includes a task for a + * given data source ingest job. * - * @param task The task to add. + * @param tasks The tasks. + * @param job The data source ingest job. + * + * @return True if there are no tasks for the job, false otherwise. */ - synchronized private void addToPendingFileTasksQueue(FileIngestTask task) { - try { - this.pendingFileTasks.putFirst(task); - } catch (InterruptedException ex) { - /** - * The current thread was interrupted while blocked on a full queue. - * Discard the task and reset the interrupted flag. - */ - this.tasksInProgress.remove(task); - Thread.currentThread().interrupt(); + synchronized private boolean hasTasksForJob(Collection tasks, DataSourceIngestJob job) { + long jobId = job.getId(); + for (IngestTask task : tasks) { + if (task.getIngestJob().getId() == jobId) { + return true; + } } + return false; } /** - * Removes all of the ingest tasks associated with an ingest job from a - * tasks queue. The task is removed from the the tasks in progress list as - * well. + * Removes all of the ingest tasks associated with a data source ingest job + * from a tasks collection. * - * @param taskQueue The queue from which to remove the tasks. - * @param jobId The id of the job for which the tasks are to be removed. + * @param tasks The collection from which to remove the tasks. + * @param job THe data source ingest job. */ - synchronized private void removeTasksForJob(Collection taskQueue, long jobId) { - Iterator iterator = taskQueue.iterator(); + synchronized private void removeTasksForJob(Collection tasks, DataSourceIngestJob job) { + long jobId = job.getId(); + Iterator iterator = tasks.iterator(); while (iterator.hasNext()) { IngestTask task = iterator.next(); if (task.getIngestJob().getId() == jobId) { - this.tasksInProgress.remove(task); iterator.remove(); } } } /** - * Counts the number of ingest tasks in a task queue for a given job. + * Counts the number of ingest tasks in a tasks collection for a given job. * * @param queue The queue for which to count tasks. * @param jobId The id of the job for which the tasks are to be counted. @@ -511,7 +544,7 @@ final class IngestTasksScheduler { Iterator iterator = queue.iterator(); int count = 0; while (iterator.hasNext()) { - IngestTask task = (IngestTask) iterator.next(); + IngestTask task = iterator.next(); if (task.getIngestJob().getId() == jobId) { count++; } @@ -549,8 +582,15 @@ final class IngestTasksScheduler { } } + /** + * Used to prioritize file ingest tasks in the root tasks queue so that + * user content is processed first. + */ private static class AbstractFilePriority { + private AbstractFilePriority() { + } + enum Priority { LAST, LOW, MEDIUM, HIGH @@ -642,28 +682,43 @@ final class IngestTasksScheduler { } /** - * Wraps access to pending data source ingest tasks in the interface - * required by the ingest threads. + * A blocking queue of data source ingest tasks for the ingest manager's + * data source ingest threads. */ private final class DataSourceIngestTaskQueue implements IngestTaskQueue { + private final BlockingQueue tasks = new LinkedBlockingQueue<>(); + + private void add(DataSourceIngestTask task) throws InterruptedException { + this.tasks.put(task); + } + @Override public IngestTask getNextTask() throws InterruptedException { - return IngestTasksScheduler.this.pendingDataSourceTasks.take(); + return tasks.take(); } + } /** - * Wraps access to pending file ingest tasks in the interface required by - * the ingest threads. + * A blocking, LIFO queue of data source ingest tasks for the ingest + * manager's data source ingest threads. */ private final class FileIngestTaskQueue implements IngestTaskQueue { + private final BlockingDeque tasks = new LinkedBlockingDeque<>(); + + private void addFirst(FileIngestTask task) throws InterruptedException { + this.tasks.putFirst(task); + } + + private void addLast(FileIngestTask task) throws InterruptedException { + this.tasks.putLast(task); + } + @Override public IngestTask getNextTask() throws InterruptedException { - FileIngestTask task = IngestTasksScheduler.this.pendingFileTasks.takeFirst(); - shuffleFileTaskQueues(); - return task; + return tasks.takeFirst(); } } @@ -674,10 +729,10 @@ final class IngestTasksScheduler { class IngestJobTasksSnapshot { private final long jobId; + private final long dsQueueSize; private final long rootQueueSize; private final long dirQueueSize; private final long fileQueueSize; - private final long dsQueueSize; private final long runningListSize; /** @@ -687,11 +742,11 @@ final class IngestTasksScheduler { */ IngestJobTasksSnapshot(long jobId) { this.jobId = jobId; - this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootDirectoryTasks, jobId); - this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryTasks, jobId); - this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTasks, jobId); - this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingDataSourceTasks, jobId); - this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgress, jobId); + this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTaskQueue, jobId); + this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryFileTaskQueue, jobId); + this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.fileTaskQueueForIngestThreads.tasks, jobId); + this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.dataSourceTaskQueueForIngestThreads.tasks, jobId); + this.runningListSize = countTasksForJob(IngestTasksScheduler.this.queuedAndRunningDataSourceTasks, jobId) + countTasksForJob(IngestTasksScheduler.this.queuedAndRunningFileTasks, jobId); } /**