Merge pull request #3579 from rcordovano/2103-ingest-task-sched-concurrency

2103 ingest concurrency bug fixes
Richard Cordovano 2018-03-19 21:56:37 -04:00 committed by GitHub
commit c988090e9b
3 changed files with 316 additions and 249 deletions

View File

@@ -518,7 +518,7 @@ final class DataSourceIngestJob {
*/
if (this.hasFirstStageDataSourceIngestPipeline() && this.hasFileIngestPipeline()) {
logger.log(Level.INFO, "Scheduling first stage data source and file level analysis tasks for {0} (jobId={1})", new Object[]{dataSource.getName(), this.id}); //NON-NLS
DataSourceIngestJob.taskScheduler.scheduleIngestTasks(this, this.files);
DataSourceIngestJob.taskScheduler.scheduleIngestTasks(this);
} else if (this.hasFirstStageDataSourceIngestPipeline()) {
logger.log(Level.INFO, "Scheduling first stage data source level analysis tasks for {0} (jobId={1}), no file level analysis configured", new Object[]{dataSource.getName(), this.id}); //NON-NLS
DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this);
@@ -827,7 +827,7 @@ final class DataSourceIngestJob {
}
/**
* Adds more files from the data source for this job to the job, i.e., adds
* Adds more files from the data source for this job to the job, e.g., adds
* extracted or carved files. Not currently supported for the second stage
* of the job.
*
@@ -835,9 +835,7 @@
*/
void addFiles(List<AbstractFile> files) {
if (DataSourceIngestJob.Stages.FIRST == this.stage) {
for (AbstractFile file : files) {
DataSourceIngestJob.taskScheduler.scheduleFastTrackedFileIngestTask(this, file);
}
DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this, files);
} else {
DataSourceIngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS
}
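The addFiles() change above routes a whole batch of derived files through a single scheduleFileIngestTasks() call instead of one fast-track call per file, which presumably lets the scheduler enqueue the group under a single lock acquisition. A minimal sketch of the difference, using a hypothetical Scheduler class rather than the Autopsy types:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class Scheduler {

    private final Deque<String> queue = new ArrayDeque<>();

    // Per-item scheduling: the monitor is released between items, so tasks
    // from another job can interleave with the batch.
    synchronized void scheduleOne(String file) {
        queue.addLast(file);
    }

    // Batch scheduling: the whole group is enqueued atomically with respect
    // to every other synchronized method of the scheduler.
    synchronized void scheduleAll(List<String> files) {
        files.forEach(queue::addLast);
    }
}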

View File

@@ -121,7 +121,7 @@ public class IngestManager {
private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
private final Map<Long, Future<Void>> startIngestJobFutures = new ConcurrentHashMap<>();
private final Map<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
private final Map<Long, IngestJob> ingestJobsById = new HashMap<>();
private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
private final ExecutorService fileLevelIngestJobTasksExecutor;
private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
@@ -399,13 +399,17 @@ public class IngestManager {
ingestMonitor.start();
}
synchronized (ingestJobsById) {
ingestJobsById.put(job.getId(), job);
}
errors = job.start();
if (errors.isEmpty()) {
this.fireIngestJobStarted(job.getId());
IngestManager.logger.log(Level.INFO, "Ingest job {0} started", job.getId()); //NON-NLS
} else {
synchronized (ingestJobsById) {
this.ingestJobsById.remove(job.getId());
}
for (IngestModuleError error : errors) {
logger.log(Level.SEVERE, String.format("Error starting %s ingest module for job %d", error.getModuleDisplayName(), job.getId()), error.getThrowable()); //NON-NLS
}
@@ -438,7 +442,9 @@
*/
void finishIngestJob(IngestJob job) {
long jobId = job.getId();
synchronized (ingestJobsById) {
ingestJobsById.remove(jobId);
}
if (!job.isCancelled()) {
IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); //NON-NLS
fireIngestJobCompleted(jobId);
@@ -455,8 +461,10 @@
* @return True or false.
*/
public boolean isIngestRunning() {
synchronized (ingestJobsById) {
return !ingestJobsById.isEmpty();
}
}
/**
* Cancels all ingest jobs in progress.
@@ -467,10 +475,12 @@
startIngestJobFutures.values().forEach((handle) -> {
handle.cancel(true);
});
synchronized (ingestJobsById) {
this.ingestJobsById.values().forEach((job) -> {
job.cancel(reason);
});
}
}
/**
* Adds an ingest job event property change listener.
@@ -770,9 +780,11 @@
*/
List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
List<DataSourceIngestJob.Snapshot> snapShots = new ArrayList<>();
synchronized (ingestJobsById) {
ingestJobsById.values().forEach((job) -> {
snapShots.addAll(job.getDataSourceIngestJobSnapshots());
});
}
return snapShots;
}
@@ -808,7 +820,9 @@
public Void call() {
try {
if (Thread.currentThread().isInterrupted()) {
synchronized (ingestJobsById) {
ingestJobsById.remove(job.getId());
}
return null;
}
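The IngestManager changes above swap a ConcurrentHashMap for a plain HashMap guarded by synchronized blocks. A ConcurrentHashMap only makes individual calls thread safe; sequences such as "register the job, then unregister it if startup fails" or "iterate over the jobs and cancel each one" are compound operations that need a single lock. A minimal sketch of the pattern, with hypothetical names (JobRegistry is not an Autopsy class):

import java.util.HashMap;
import java.util.Map;

final class JobRegistry {

    private final Map<Long, String> jobsById = new HashMap<>();

    boolean startJob(long id, String job) {
        synchronized (jobsById) {
            jobsById.put(id, job);          // register before starting
        }
        boolean started = doStart(job);     // slow work stays outside the lock
        if (!started) {
            synchronized (jobsById) {
                jobsById.remove(id);        // roll back the registration
            }
        }
        return started;
    }

    boolean isAnyJobRunning() {
        synchronized (jobsById) {
            return !jobsById.isEmpty();
        }
    }

    void cancelAll() {
        synchronized (jobsById) {
            jobsById.values().forEach(JobRegistry::cancel);   // safe iteration
        }
    }

    private static boolean doStart(String job) {
        return job != null;                 // stand-in for starting a real job
    }

    private static void cancel(String job) {
        // stand-in for cancelling a real job
    }
}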

View File

@@ -21,12 +21,13 @@ package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
@@ -40,64 +41,20 @@ import org.sleuthkit.datamodel.TskCoreException;
import org.sleuthkit.datamodel.TskData;
/**
* Creates ingest tasks for ingest jobs, queuing the tasks in priority order for
* execution by the ingest manager's ingest threads.
* Creates ingest tasks for data source ingest jobs, queueing the tasks in
* priority order for execution by the ingest manager's ingest threads.
*/
final class IngestTasksScheduler {
private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
private static IngestTasksScheduler instance;
/**
* Scheduling of data source ingest tasks is accomplished by putting them in
* a FIFO queue to be consumed by the ingest threads, so the queue is
* wrapped in a "dispenser" that implements the IngestTaskQueue interface
* and is exposed via a getter method.
*/
private final LinkedBlockingQueue<DataSourceIngestTask> pendingDataSourceTasks;
private final DataSourceIngestTaskQueue dataSourceTasksDispenser;
/**
* Scheduling of file ingest tasks is accomplished by "shuffling" them
* through a sequence of internal queues that allows for the interleaving of
* tasks from different ingest jobs based on priority. These scheduling
* queues are:
*
* 1. Root directory tasks (priority queue)
*
* 2. Directory tasks (FIFO queue)
*
* 3. Pending file tasks (LIFO queue).
*
* The pending file tasks queue is LIFO to handle large numbers of files
* extracted from archive files. At least one image has been processed that
* had a folder full of archive files. The queue grew to have thousands of
* entries, as each successive archive file was expanded, so now extracted
* files get added to the front of the queue so that in such a scenario they
* would be processed before the expansion of the next archive file.
*
* Tasks in the pending file tasks queue are ready to be consumed by the
* ingest threads, so the queue is wrapped in a "dispenser" that implements
* the IngestTaskQueue interface and is exposed via a getter method.
*/
private final TreeSet<FileIngestTask> rootDirectoryTasks;
private final List<FileIngestTask> directoryTasks;
private final BlockingDeque<FileIngestTask> pendingFileTasks;
private final FileIngestTaskQueue fileTasksDispenser;
/**
* The ingest tasks scheduler allows ingest jobs to query it to see if there
* are any tasks in progress for the job. To make this possible, the ingest
* tasks scheduler needs to keep track not only of the tasks in its queues,
* but also of the tasks that have been handed out for processing by the
* ingest threads. Therefore all ingest tasks are added to this list when
* they are created and are not removed when an ingest thread takes an
* ingest task. Instead, the ingest thread calls back into the scheduler
* when the task is completed, at which time the task will be removed from
* this list.
*/
private final Set<IngestTask> tasksInProgress;
private final DataSourceIngestTaskQueue dataSourceTaskQueueForIngestThreads;
private final List<DataSourceIngestTask> queuedAndRunningDataSourceTasks;
private final TreeSet<FileIngestTask> rootFileTaskQueue;
private final Deque<FileIngestTask> directoryFileTaskQueue;
private final FileIngestTaskQueue fileTaskQueueForIngestThreads;
private final List<FileIngestTask> queuedAndRunningFileTasks;
/**
* Gets the ingest tasks scheduler singleton.
@@ -113,52 +70,53 @@ final class IngestTasksScheduler {
* Constructs an ingest tasks scheduler.
*/
private IngestTasksScheduler() {
this.pendingDataSourceTasks = new LinkedBlockingQueue<>();
this.dataSourceTasksDispenser = new DataSourceIngestTaskQueue();
this.rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator());
this.directoryTasks = new ArrayList<>();
this.pendingFileTasks = new LinkedBlockingDeque<>();
this.fileTasksDispenser = new FileIngestTaskQueue();
this.tasksInProgress = new HashSet<>();
this.queuedAndRunningDataSourceTasks = new LinkedList<>();
this.dataSourceTaskQueueForIngestThreads = new DataSourceIngestTaskQueue();
this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator());
this.directoryFileTaskQueue = new LinkedList<>();
this.queuedAndRunningFileTasks = new LinkedList<>();
this.fileTaskQueueForIngestThreads = new FileIngestTaskQueue();
}
/**
* Gets this ingest task scheduler's implementation of the IngestTaskQueue
* interface for data source ingest tasks.
* Gets the data source level ingest tasks queue. This queue is a blocking
* queue intended for use by the ingest manager's data source ingest
* threads.
*
* @return The data source ingest tasks queue.
* @return The queue.
*/
IngestTaskQueue getDataSourceIngestTaskQueue() {
return this.dataSourceTasksDispenser;
return this.dataSourceTaskQueueForIngestThreads;
}
/**
* Gets this ingest task scheduler's implementation of the IngestTaskQueue
* interface for file ingest tasks.
* Gets the file level ingest tasks queue. This queue is a blocking queue
* intended for use by the ingest manager's file ingest threads.
*
* @return The file ingest tasks queue.
* @return The queue.
*/
IngestTaskQueue getFileIngestTaskQueue() {
return this.fileTasksDispenser;
return this.fileTaskQueueForIngestThreads;
}
/**
* Schedules a data source level ingest task and file level ingest tasks for
* an ingest job. Either all of the files in the data source or a given
* subset of the files will be scheduled.
* a data source ingest job.
*
* @param job The data source ingest job.
* @param files A subset of the files for the data source.
*/
synchronized void scheduleIngestTasks(DataSourceIngestJob job, List<AbstractFile> files) {
synchronized void scheduleIngestTasks(DataSourceIngestJob job) {
if (!job.isCancelled()) {
// Scheduling of both a data source ingest task and file ingest tasks
// for a job must be an atomic operation. Otherwise, the data source
// task might be completed before the file tasks are scheduled,
// resulting in a potential false positive when another thread checks
// whether or not all the tasks for the job are completed.
/*
* Scheduling of both the data source ingest task and the initial
* file ingest tasks for a job must be an atomic operation.
* Otherwise, the data source task might be completed before the
* file tasks are scheduled, resulting in a potential false positive
* when another thread checks whether or not all the tasks for the
* job are completed.
*/
this.scheduleDataSourceIngestTask(job);
this.scheduleFileIngestTasks(job, files);
this.scheduleFileIngestTasks(job);
}
}
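The comment in scheduleIngestTasks() above describes a completion-check race. A stripped-down illustration of why both task kinds must be recorded under one monitor, with hypothetical names (JobTaskTracker is not an Autopsy class):

import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

final class JobTaskTracker {

    private final Set<String> outstandingTasks = new HashSet<>();

    // Both task kinds are added under one lock, so no thread can observe a
    // state where the data source task has finished but the file tasks have
    // not been recorded yet.
    synchronized void scheduleAllTasks(String dataSourceTask, Collection<String> fileTasks) {
        outstandingTasks.add(dataSourceTask);
        outstandingTasks.addAll(fileTasks);
    }

    synchronized void taskCompleted(String task) {
        outstandingTasks.remove(task);
    }

    // Would report a false positive if it could run between two separate
    // scheduling calls, after the lone data source task completed.
    synchronized boolean allTasksCompleted() {
        return outstandingTasks.isEmpty();
    }
}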
@@ -170,41 +128,29 @@ final class IngestTasksScheduler {
synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) {
if (!job.isCancelled()) {
DataSourceIngestTask task = new DataSourceIngestTask(job);
this.tasksInProgress.add(task);
this.queuedAndRunningDataSourceTasks.add(task);
try {
this.pendingDataSourceTasks.put(task);
this.dataSourceTaskQueueForIngestThreads.add(task);
} catch (InterruptedException ex) {
/**
* The current thread was interrupted while blocked on a full
* queue. Discard the task and reset the interrupted flag.
*/
this.tasksInProgress.remove(task);
IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while a data source ingest thread was blocked on a full queue", ex);
this.queuedAndRunningDataSourceTasks.remove(task);
Thread.currentThread().interrupt();
}
}
}
/**
* Schedules file level ingest tasks for a data source ingest job. Either
* all of the files in the data source or a given subset of the files will
* be scheduled.
* Schedules file level ingest tasks for a data source ingest job.
*
* @param job The data source ingest job.
* @param files A subset of the files for the data source.
*/
synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, List<AbstractFile> files) {
synchronized void scheduleFileIngestTasks(DataSourceIngestJob job) {
if (!job.isCancelled()) {
List<AbstractFile> candidateFiles = new ArrayList<>();
if (files.isEmpty()) {
getTopLevelFiles(job.getDataSource(), candidateFiles);
} else {
candidateFiles.addAll(files);
}
for (AbstractFile firstLevelFile : candidateFiles) {
FileIngestTask task = new FileIngestTask(job, firstLevelFile);
List<AbstractFile> candidateFiles = getTopLevelFiles(job.getDataSource());
for (AbstractFile file : candidateFiles) {
FileIngestTask task = new FileIngestTask(job, file);
if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
this.tasksInProgress.add(task);
this.rootDirectoryTasks.add(task);
this.rootFileTaskQueue.add(task);
}
}
shuffleFileTaskQueues();
@@ -212,73 +158,119 @@ final class IngestTasksScheduler {
}
/**
* Schedules a file ingest task for a data source ingest job. The task that
* is created is added directly to the pending file tasks queues, i.e., it
* is "fast tracked."
* Schedules file level ingest tasks for a subset of the files for a data
* source ingest job.
*
* @param job The data source ingest job.
* @param file A file.
* @param files A subset of the files for the data source.
*/
synchronized void scheduleFastTrackedFileIngestTask(DataSourceIngestJob job, AbstractFile file) {
synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
if (!job.isCancelled()) {
List<FileIngestTask> newTasksForFileIngestThreads = new LinkedList<>();
for (AbstractFile file : files) {
/*
* Put the file directly into the queue for the file ingest
* threads, if it passes the filter for the job. The file is
* added to the queue for the ingest threads BEFORE the other
* queued tasks because the primary use case for this method is
* adding derived files from a higher priority task that
* preceded the tasks currently in the queue.
*/
FileIngestTask task = new FileIngestTask(job, file);
if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
this.tasksInProgress.add(task);
addToPendingFileTasksQueue(task);
if (shouldEnqueueFileTask(task)) {
newTasksForFileIngestThreads.add(task);
}
/*
* If the file or directory that was just queued has children,
* try to queue tasks for the children. Each child task will go
* into either the directory queue if it is a directory, or
* directly into the queue for the file ingest threads, if it
* passes the filter for the job.
*/
try {
for (Content child : file.getChildren()) {
if (child instanceof AbstractFile) {
AbstractFile childFile = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(job, childFile);
if (childFile.hasChildren()) {
this.directoryFileTaskQueue.add(childTask);
} else if (shouldEnqueueFileTask(childTask)) {
newTasksForFileIngestThreads.add(childTask);
}
}
}
} catch (TskCoreException ex) {
logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS
}
}
/*
* The files are added to the queue for the ingest threads BEFORE
* the other queued tasks because the primary use case for this
* method is adding derived files from a higher priority task that
* preceded the tasks currently in the queue.
*/
for (FileIngestTask newTask : newTasksForFileIngestThreads) {
try {
this.queuedAndRunningFileTasks.add(newTask);
this.fileTaskQueueForIngestThreads.addFirst(newTask);
} catch (InterruptedException ex) {
this.queuedAndRunningFileTasks.remove(newTask);
IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex);
Thread.currentThread().interrupt();
break;
}
}
}
}
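The "BEFORE the other queued tasks" comments above describe front-loading derived files onto the blocking deque that feeds the file ingest threads. A minimal sketch of that idea, with hypothetical names (FileTaskDeque is not an Autopsy class):

import java.util.concurrent.LinkedBlockingDeque;

final class FileTaskDeque {

    private final LinkedBlockingDeque<String> tasks = new LinkedBlockingDeque<>();

    // Normal scheduling: tasks join the back of the line.
    void enqueue(String task) throws InterruptedException {
        tasks.putLast(task);
    }

    // Derived files (extracted or carved) jump to the front so they are
    // analyzed near their parent instead of behind a long backlog.
    void fastTrack(String derivedTask) throws InterruptedException {
        tasks.putFirst(derivedTask);
    }

    // Consumers (the file ingest threads) always take from the front.
    String next() throws InterruptedException {
        return tasks.takeFirst();
    }
}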
/**
* Allows an ingest thread to notify this ingest task scheduler that a task
* has been completed.
* Allows an ingest thread to notify this ingest task scheduler that a data
* source level task has been completed.
*
* @param task The completed task.
*/
synchronized void notifyTaskCompleted(IngestTask task) {
tasksInProgress.remove(task);
synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
this.queuedAndRunningDataSourceTasks.remove(task);
}
/**
* Queries the task scheduler to determine whether or not all current ingest
* tasks for an ingest job are completed.
* Allows an ingest thread to notify this ingest task scheduler that a file
* level task has been completed.
*
* @param job The job for which the query is to be performed.
* @param task The completed task.
*/
synchronized void notifyTaskCompleted(FileIngestTask task) {
this.queuedAndRunningFileTasks.remove(task);
shuffleFileTaskQueues();
}
/**
* Queries the task scheduler to determine whether or not all of the ingest
* tasks for a data source ingest job have been completed.
*
* @param job The data source ingest job.
*
* @return True or false.
*/
synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) {
for (IngestTask task : tasksInProgress) {
if (task.getIngestJob().getId() == job.getId()) {
return false;
}
}
return true;
return !hasTasksForJob(this.queuedAndRunningDataSourceTasks, job)
&& !hasTasksForJob(this.rootFileTaskQueue, job)
&& !hasTasksForJob(this.directoryFileTaskQueue, job)
&& !hasTasksForJob(this.queuedAndRunningFileTasks, job);
}
/**
* Clears the "upstream" task scheduling queues for an ingest job, but does
* nothing about tasks that have already been shuffled into the concurrently
* accessed blocking queues shared with the ingest threads. Note that tasks
* in the "downstream" queues or already taken by the ingest threads will be
* flushed out when the ingest threads call back with their task completed
* notifications.
* Clears the "upstream" task scheduling queues for a data source ingest
* job, but does nothing about tasks that have already been moved into the
* queue that is consumed by the file ingest threads.
*
* @param job The job for which the tasks are to to canceled.
* @param job The data source ingest job.
*/
synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) {
/**
* This code should not flush the blocking queues that are concurrently
* accessed by the ingest threads. This is because the "lock striping"
* and "weakly consistent" iterators of these collections make it so
* that this code could have a different view of the queues than the
* ingest threads. It does clean out the directory level tasks before
* they are exploded into file tasks.
*/
long jobId = job.getId();
this.removeTasksForJob(this.rootDirectoryTasks, jobId);
this.removeTasksForJob(this.directoryTasks, jobId);
this.shuffleFileTaskQueues();
this.removeTasksForJob(this.rootFileTaskQueue, job);
this.removeTasksForJob(this.directoryFileTaskQueue, job);
}
/**
@@ -287,9 +279,11 @@ final class IngestTasksScheduler {
* tasks to put into the root directories queue.
*
* @param dataSource The data source.
* @param topLevelFiles The top level files are added to this list.
*
* @return The top level files.
*/
private static void getTopLevelFiles(Content dataSource, List<AbstractFile> topLevelFiles) {
private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
List<AbstractFile> topLevelFiles = new ArrayList<>();
Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
// The data source is itself a file to be processed.
@@ -317,74 +311,113 @@ final class IngestTasksScheduler {
}
}
}
return topLevelFiles;
}
/**
* "Shuffles" the file task queues to ensure that there is at least one task
* in the pending file ingest tasks queue, as long as there are still file
* ingest tasks to be performed.
* Schedules file ingest tasks for the ingest manager's file ingest threads
* by "shuffling" them through a sequence of three queues that allows for
* the interleaving of tasks from different data source ingest jobs based on
* priority. The sequence of queues is:
*
* 1. The root file tasks priority queue, which contains file tasks for the
* root objects of the data sources that are being analyzed. For example,
* the root tasks for a disk image data source are typically the tasks for
* the contents of the root directories of the file systems. This queue is a
* priority queue that attempts to ensure that user directory content is
* analyzed before general file system content. It feeds into the directory
* tasks queue.
*
* 2. The directory file tasks queue, which contains root file tasks
* shuffled out of the root tasks queue, plus directory tasks discovered in
* the descent from the root tasks to the final leaf tasks in the content
* trees that are being analyzed for the data source ingest jobs. This queue
* is a FIFO queue. It feeds into the file tasks queue for the ingest
* manager's file ingest threads.
*
* 3. The file tasks queue for the ingest manager's file ingest threads.
* This queue is a blocking deque that is FIFO during a shuffle to maintain
* task prioritization, but LIFO when adding derived files to it directly
* during ingest. The reason for the LIFO additions is to give priority to
* the derived files of priority files.
*
* There is a fourth collection of file tasks, a "tracking" list, that keeps
* track of the file tasks that are either in the tasks queue for the file
* ingest threads, or are in the process of being analyzed in a file ingest
* thread. This list is vital to the ingest task scheduler's ability to
* determine when all of the ingest tasks for a data source ingest job have
* been completed. It is also used to drive this shuffling algorithm -
* whenever this list is empty, the two "upstream" queues are "shuffled" to
* queue more tasks for the file ingest threads.
*/
synchronized private void shuffleFileTaskQueues() {
// This is synchronized because it is called both by synchronized
// methods of this ingest scheduler and an unsynchronized method of its
// file tasks "dispenser".
while (true) {
// Loop until either the pending file tasks queue is NOT empty
// or the upstream queues that feed into it ARE empty.
if (!this.pendingFileTasks.isEmpty()) {
// There are file tasks ready to be consumed, exit.
return;
}
if (this.directoryTasks.isEmpty()) {
if (this.rootDirectoryTasks.isEmpty()) {
// There are no root directory tasks to move into the
// directory queue, exit.
return;
List<FileIngestTask> newTasksForFileIngestThreads = new LinkedList<>();
while (this.queuedAndRunningFileTasks.isEmpty()) {
/*
* If the directory file task queue is empty, move the highest
* priority root file task, if there is one, into it. If both the
* root and the directory file task queues are empty, there is
* nothing left to shuffle, so exit.
*/
if (this.directoryFileTaskQueue.isEmpty()) {
if (!this.rootFileTaskQueue.isEmpty()) {
this.directoryFileTaskQueue.add(this.rootFileTaskQueue.pollFirst());
} else {
// Move the next root directory task into the
// directories queue. Note that the task was already
// added to the tasks in progress list when the task was
// created in scheduleFileIngestTasks().
this.directoryTasks.add(this.rootDirectoryTasks.pollFirst());
return;
}
}
// Try to add the most recently added directory from the
// directory tasks queue to the pending file tasks queue.
FileIngestTask directoryTask = this.directoryTasks.remove(this.directoryTasks.size() - 1);
/*
* Try to move the next task from the directory task queue into the
* queue for the file ingest threads, if it passes the filter for
* the job. The file is added to the queue for the ingest threads
* AFTER the higher priority tasks that preceded it.
*/
final FileIngestTask directoryTask = this.directoryFileTaskQueue.pollLast();
if (shouldEnqueueFileTask(directoryTask)) {
addToPendingFileTasksQueue(directoryTask);
} else {
this.tasksInProgress.remove(directoryTask);
newTasksForFileIngestThreads.add(directoryTask);
this.queuedAndRunningFileTasks.add(directoryTask);
}
// If the directory contains subdirectories or files, try to
// enqueue tasks for them as well.
/*
* If the directory (or root level file) that was just queued has
* children, try to queue tasks for the children. Each child task
* will go into either the directory queue if it is a directory, or
* into the queue for the file ingest threads, if it passes the
* filter for the job. The file is added to the queue for the ingest
* threads AFTER the higher priority tasks that preceded it.
*/
final AbstractFile directory = directoryTask.getFile();
try {
for (Content child : directory.getChildren()) {
if (child instanceof AbstractFile) {
AbstractFile file = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), file);
if (file.hasChildren()) {
// Found a subdirectory, put the task in the
// pending directory tasks queue. Note the
// addition of the task to the tasks in progress
// list. This is necessary because this is the
// first appearance of this task in the queues.
this.tasksInProgress.add(childTask);
this.directoryTasks.add(childTask);
AbstractFile childFile = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), childFile);
if (childFile.hasChildren()) {
this.directoryFileTaskQueue.add(childTask);
} else if (shouldEnqueueFileTask(childTask)) {
// Found a file, put the task directly into the
// pending file tasks queue.
this.tasksInProgress.add(childTask);
addToPendingFileTasksQueue(childTask);
newTasksForFileIngestThreads.add(childTask);
this.queuedAndRunningFileTasks.add(childTask);
}
}
}
} catch (TskCoreException ex) {
String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS
logger.log(Level.SEVERE, errorMessage, ex);
logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", directory.getName(), directory.getId()), ex); //NON-NLS
}
}
/*
* The files are added to the queue for the ingest threads AFTER the
* higher priority tasks that preceded them.
*/
for (FileIngestTask newTask : newTasksForFileIngestThreads) {
try {
this.fileTaskQueueForIngestThreads.addFirst(newTask);
} catch (InterruptedException ex) {
this.queuedAndRunningFileTasks.remove(newTask);
IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex);
Thread.currentThread().interrupt();
break;
}
}
}
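A greatly simplified sketch of the three-queue shuffle described in the comment above, using stand-in types: work items are plain strings, children() is a placeholder for querying the case database, and the per-job filtering and queued-and-running tracking list are omitted.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.LinkedBlockingDeque;

final class ShuffleSketch {

    private final PriorityQueue<String> rootQueue = new PriorityQueue<>();              // stage 1: prioritized roots
    private final Deque<String> directoryQueue = new ArrayDeque<>();                    // stage 2: FIFO directories
    private final LinkedBlockingDeque<String> workerQueue = new LinkedBlockingDeque<>(); // stage 3: taken by workers

    // Called whenever the worker queue runs dry: pull one root into the
    // directory queue if necessary, then expand one directory into tasks.
    synchronized void shuffle() throws InterruptedException {
        while (workerQueue.isEmpty()) {
            if (directoryQueue.isEmpty()) {
                if (rootQueue.isEmpty()) {
                    return;                        // nothing left to schedule
                }
                directoryQueue.add(rootQueue.poll());
            }
            String dir = directoryQueue.pollLast();
            workerQueue.putLast(dir);              // the directory itself gets a task
            for (String child : children(dir)) {
                if (isDirectory(child)) {
                    directoryQueue.add(child);     // descend into it on a later pass
                } else {
                    workerQueue.putLast(child);    // leaf file: ready for the workers
                }
            }
        }
    }

    private static List<String> children(String item) {
        return List.of();                          // placeholder for a database query
    }

    private static boolean isDirectory(String item) {
        return item.endsWith("/");                 // placeholder convention
    }
}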
@@ -463,44 +496,44 @@ final class IngestTasksScheduler {
}
/**
* Adds a file ingest task to the blocking pending tasks queue.
* Checks whether or not a collection of ingest tasks includes a task for a
* given data source ingest job.
*
* @param task The task to add.
* @param tasks The tasks.
* @param job The data source ingest job.
*
* @return True if the collection includes a task for the job, false otherwise.
*/
synchronized private void addToPendingFileTasksQueue(FileIngestTask task) {
try {
this.pendingFileTasks.putFirst(task);
} catch (InterruptedException ex) {
/**
* The current thread was interrupted while blocked on a full queue.
* Discard the task and reset the interrupted flag.
*/
this.tasksInProgress.remove(task);
Thread.currentThread().interrupt();
synchronized private boolean hasTasksForJob(Collection<? extends IngestTask> tasks, DataSourceIngestJob job) {
long jobId = job.getId();
for (IngestTask task : tasks) {
if (task.getIngestJob().getId() == jobId) {
return true;
}
}
return false;
}
/**
* Removes all of the ingest tasks associated with an ingest job from a
* tasks queue. The task is removed from the the tasks in progress list as
* well.
* Removes all of the ingest tasks associated with a data source ingest job
* from a tasks collection.
*
* @param taskQueue The queue from which to remove the tasks.
* @param jobId The id of the job for which the tasks are to be removed.
* @param tasks The collection from which to remove the tasks.
* @param job The data source ingest job.
*/
synchronized private void removeTasksForJob(Collection<? extends IngestTask> taskQueue, long jobId) {
Iterator<? extends IngestTask> iterator = taskQueue.iterator();
synchronized private void removeTasksForJob(Collection<? extends IngestTask> tasks, DataSourceIngestJob job) {
long jobId = job.getId();
Iterator<? extends IngestTask> iterator = tasks.iterator();
while (iterator.hasNext()) {
IngestTask task = iterator.next();
if (task.getIngestJob().getId() == jobId) {
this.tasksInProgress.remove(task);
iterator.remove();
}
}
}
/**
* Counts the number of ingest tasks in a task queue for a given job.
* Counts the number of ingest tasks in a tasks collection for a given job.
*
* @param queue The queue for which to count tasks.
* @param jobId The id of the job for which the tasks are to be counted.
@@ -511,7 +544,7 @@ final class IngestTasksScheduler {
Iterator<? extends IngestTask> iterator = queue.iterator();
int count = 0;
while (iterator.hasNext()) {
IngestTask task = (IngestTask) iterator.next();
IngestTask task = iterator.next();
if (task.getIngestJob().getId() == jobId) {
count++;
}
@@ -549,8 +582,15 @@ final class IngestTasksScheduler {
}
}
/**
* Used to prioritize file ingest tasks in the root tasks queue so that
* user content is processed first.
*/
private static class AbstractFilePriority {
private AbstractFilePriority() {
}
enum Priority {
LAST, LOW, MEDIUM, HIGH
@@ -642,28 +682,43 @@ final class IngestTasksScheduler {
}
/**
* Wraps access to pending data source ingest tasks in the interface
* required by the ingest threads.
* A blocking queue of data source ingest tasks for the ingest manager's
* data source ingest threads.
*/
private final class DataSourceIngestTaskQueue implements IngestTaskQueue {
private final BlockingQueue<DataSourceIngestTask> tasks = new LinkedBlockingQueue<>();
private void add(DataSourceIngestTask task) throws InterruptedException {
this.tasks.put(task);
}
@Override
public IngestTask getNextTask() throws InterruptedException {
return IngestTasksScheduler.this.pendingDataSourceTasks.take();
return tasks.take();
}
}
/**
* Wraps access to pending file ingest tasks in the interface required by
* the ingest threads.
* A blocking double-ended queue of file ingest tasks for the ingest
* manager's file ingest threads.
*/
private final class FileIngestTaskQueue implements IngestTaskQueue {
private final BlockingDeque<FileIngestTask> tasks = new LinkedBlockingDeque<>();
private void addFirst(FileIngestTask task) throws InterruptedException {
this.tasks.putFirst(task);
}
private void addLast(FileIngestTask task) throws InterruptedException {
this.tasks.putLast(task);
}
@Override
public IngestTask getNextTask() throws InterruptedException {
FileIngestTask task = IngestTasksScheduler.this.pendingFileTasks.takeFirst();
shuffleFileTaskQueues();
return task;
return tasks.takeFirst();
}
}
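The two queue wrappers above keep all priority and shuffle logic inside the scheduler; the ingest manager's worker threads only see a blocking take-style interface. A sketch of the consumer side, with hypothetical names (IngestWorkerSketch is not an Autopsy class):

import java.util.concurrent.BlockingQueue;

final class IngestWorkerSketch implements Runnable {

    private final BlockingQueue<Runnable> taskQueue;

    IngestWorkerSketch(BlockingQueue<Runnable> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = taskQueue.take();   // blocks until a task is available
                task.run();                         // analyze, then notify the scheduler
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();     // exit cleanly on cancellation
        }
    }

    static void startWorkers(int count, BlockingQueue<Runnable> queue) {
        for (int i = 0; i < count; i++) {
            new Thread(new IngestWorkerSketch(queue), "file-ingest-" + i).start();
        }
    }
}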
@@ -674,10 +729,10 @@ final class IngestTasksScheduler {
class IngestJobTasksSnapshot {
private final long jobId;
private final long dsQueueSize;
private final long rootQueueSize;
private final long dirQueueSize;
private final long fileQueueSize;
private final long dsQueueSize;
private final long runningListSize;
/**
@@ -687,11 +742,11 @@ final class IngestTasksScheduler {
*/
IngestJobTasksSnapshot(long jobId) {
this.jobId = jobId;
this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootDirectoryTasks, jobId);
this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryTasks, jobId);
this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTasks, jobId);
this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingDataSourceTasks, jobId);
this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgress, jobId);
this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTaskQueue, jobId);
this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryFileTaskQueue, jobId);
this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.fileTaskQueueForIngestThreads.tasks, jobId);
this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.dataSourceTaskQueueForIngestThreads.tasks, jobId);
this.runningListSize = countTasksForJob(IngestTasksScheduler.this.queuedAndRunningDataSourceTasks, jobId) + countTasksForJob(IngestTasksScheduler.this.queuedAndRunningFileTasks, jobId);
}
/**