Ingest task scheduler fixes

This commit is contained in:
Richard Cordovano 2018-03-27 15:18:52 -04:00
parent f07e95a12b
commit be78b35d26
3 changed files with 104 additions and 129 deletions

View File

@ -524,11 +524,7 @@ final class DataSourceIngestJob {
DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this); DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this);
} else { } else {
logger.log(Level.INFO, "Scheduling file level analysis tasks for {0} (jobId={1}), no first stage data source level analysis configured", new Object[]{dataSource.getName(), this.id}); //NON-NLS logger.log(Level.INFO, "Scheduling file level analysis tasks for {0} (jobId={1}), no first stage data source level analysis configured", new Object[]{dataSource.getName(), this.id}); //NON-NLS
if (this.files.isEmpty()) { DataSourceIngestJob.taskScheduler.fastTrackFileIngestTasks(this, this.files);
DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this);
} else {
DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this, this.files);
}
/** /**
* No data source ingest task has been scheduled for this stage, and * No data source ingest task has been scheduled for this stage, and
@ -840,7 +836,7 @@ final class DataSourceIngestJob {
*/ */
void addFiles(List<AbstractFile> files) { void addFiles(List<AbstractFile> files) {
if (DataSourceIngestJob.Stages.FIRST == this.stage) { if (DataSourceIngestJob.Stages.FIRST == this.stage) {
DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this, files); DataSourceIngestJob.taskScheduler.fastTrackFileIngestTasks(this, files);
} else { } else {
DataSourceIngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS DataSourceIngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS
} }

View File

@ -20,6 +20,7 @@ package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Deque; import java.util.Deque;
import java.util.Iterator; import java.util.Iterator;
@ -51,17 +52,17 @@ final class IngestTasksScheduler {
private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName()); private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
@GuardedBy("IngestTasksScheduler.this") @GuardedBy("IngestTasksScheduler.this")
private static IngestTasksScheduler instance; private static IngestTasksScheduler instance;
@GuardedBy("this") private final IngestTaskTrackingQueue dataSourceIngestThreadQueue;
private final IngestTaskTrackingQueue dataSourceTaskQueue;
@GuardedBy("this") @GuardedBy("this")
private final TreeSet<FileIngestTask> rootFileTaskQueue; private final TreeSet<FileIngestTask> rootFileTaskQueue;
@GuardedBy("this") @GuardedBy("this")
private final Deque<FileIngestTask> directoryFileTaskQueue; private final Deque<FileIngestTask> pendingFileTaskQueue;
@GuardedBy("this") private final IngestTaskTrackingQueue fileIngestThreadsQueue;
private final IngestTaskTrackingQueue fileTaskQueue;
/** /**
* Gets the ingest tasks scheduler singleton. * Gets the ingest tasks scheduler singleton that creates ingest tasks for
* data source ingest jobs, queueing the tasks in priority order for
* execution by the ingest manager's ingest threads.
*/ */
synchronized static IngestTasksScheduler getInstance() { synchronized static IngestTasksScheduler getInstance() {
if (IngestTasksScheduler.instance == null) { if (IngestTasksScheduler.instance == null) {
@ -71,39 +72,40 @@ final class IngestTasksScheduler {
} }
/** /**
* Constructs an ingest tasks scheduler. * Constructs an ingest tasks scheduler that creates ingest tasks for data
* source ingest jobs, queueing the tasks in priority order for execution by
* the ingest manager's ingest threads.
*/ */
private IngestTasksScheduler() { private IngestTasksScheduler() {
this.dataSourceTaskQueue = new IngestTaskTrackingQueue(); this.dataSourceIngestThreadQueue = new IngestTaskTrackingQueue();
this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator()); this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator());
this.directoryFileTaskQueue = new LinkedList<>(); this.pendingFileTaskQueue = new LinkedList<>();
this.fileTaskQueue = new IngestTaskTrackingQueue(); this.fileIngestThreadsQueue = new IngestTaskTrackingQueue();
} }
/** /**
* Gets the data source level ingest tasks queue. This queue is a blocking * Gets the data source level ingest tasks queue. This queue is a blocking
* queue intended for use by the ingest manager's data source ingest * queue used by the ingest manager's data source level ingest thread.
* threads.
* *
* @return The queue. * @return The queue.
*/ */
BlockingIngestTaskQueue getDataSourceIngestTaskQueue() { BlockingIngestTaskQueue getDataSourceIngestTaskQueue() {
return this.dataSourceTaskQueue; return this.dataSourceIngestThreadQueue;
} }
/** /**
* Gets the file level ingest tasks queue. This queue is a blocking queue * Gets the file level ingest tasks queue. This queue is a blocking queue
* intended for use by the ingest manager's file ingest threads. * used by the ingest manager's file level ingest threads.
* *
* @return The queue. * @return The queue.
*/ */
BlockingIngestTaskQueue getFileIngestTaskQueue() { BlockingIngestTaskQueue getFileIngestTaskQueue() {
return this.fileTaskQueue; return this.fileIngestThreadsQueue;
} }
/** /**
* Schedules a data source level ingest task and file level ingest tasks for * Schedules a data source level ingest task and zero to many file level
* a data source ingest job. * ingest tasks for a data source ingest job.
* *
* @param job The data source ingest job. * @param job The data source ingest job.
*/ */
@ -118,7 +120,7 @@ final class IngestTasksScheduler {
* job are completed. * job are completed.
*/ */
this.scheduleDataSourceIngestTask(job); this.scheduleDataSourceIngestTask(job);
this.scheduleFileIngestTasks(job); this.scheduleFileIngestTasks(job, Collections.emptyList());
} }
} }
@ -127,26 +129,34 @@ final class IngestTasksScheduler {
* *
* @param job The data source ingest job. * @param job The data source ingest job.
*/ */
synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) { synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) { // RJCTODO: Should this throw instead?
if (!job.isCancelled()) { if (!job.isCancelled()) {
DataSourceIngestTask task = new DataSourceIngestTask(job); DataSourceIngestTask task = new DataSourceIngestTask(job);
try { try {
this.dataSourceTaskQueue.putLast(task); this.dataSourceIngestThreadQueue.putLast(task);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling data source level ingest task (jobId={%d)", job.getId()), ex); IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId={%d)", job.getId()), ex);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
} }
/** /**
* Schedules file level ingest tasks for a data source ingest job. * Schedules file tasks for either all the files or a given subset of the
* files for a data source ingest job.
* *
* @param job The data source ingest job. * @param job The data source ingest job.
* @param files A subset of the files for the data source; if empty, then
* file tasks for all files in the data source are scheduled.
*/ */
synchronized void scheduleFileIngestTasks(DataSourceIngestJob job) { synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
if (!job.isCancelled()) { if (!job.isCancelled()) {
List<AbstractFile> candidateFiles = getTopLevelFiles(job.getDataSource()); Collection<AbstractFile> candidateFiles;
if (files.isEmpty()) {
candidateFiles = getTopLevelFiles(job.getDataSource());
} else {
candidateFiles = files;
}
for (AbstractFile file : candidateFiles) { for (AbstractFile file : candidateFiles) {
FileIngestTask task = new FileIngestTask(job, file); FileIngestTask task = new FileIngestTask(job, file);
if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
@ -158,60 +168,28 @@ final class IngestTasksScheduler {
} }
/** /**
* Schedules file level ingest tasks for a subset of the files for a data * Schedules file level ingest tasks for a given set of files for a data
* source ingest job. * source ingest job by adding them directly to the front of the file tasks
* queue for the ingest manager's file ingest threads.
* *
* @param job The data source ingest job. * @param job The data source ingest job.
* @param files A subset of the files for the data source. * @param files A set of files for the data source.
*/ */
synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) { synchronized void fastTrackFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
if (!job.isCancelled()) { if (!job.isCancelled()) {
/*
* Put the files directly into the queue for the file ingest
* threads, if they pass the file filter for the job. The files are
* added to the queue for the ingest threads BEFORE the other queued
* tasks because the use case for this method is scheduling new
* carved or derived files from a higher priority task that is
* already in progress.
*/
for (AbstractFile file : files) { for (AbstractFile file : files) {
/* FileIngestTask fileTask = new FileIngestTask(job, file);
* If the current file or directory has children, try to queue if (shouldEnqueueFileTask(fileTask)) {
* tasks for the children. Each child task will go into either
* the directory queue if it is a directory, or directly into
* the queue for the file ingest threads, if it passes the
* filter for the job. The child files are added to the queue
* for the ingest threads BEFORE the other queued tasks because
* the primary use case for this method is adding derived files
* from a higher priority task that preceded the tasks currently
* in the queue.
*/
try {
for (Content child : file.getChildren()) {
if (child instanceof AbstractFile) {
AbstractFile childFile = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(job, childFile);
if (childFile.hasChildren()) {
this.directoryFileTaskQueue.add(childTask);
} else if (shouldEnqueueFileTask(childTask)) {
try {
this.fileTaskQueue.putFirst(childTask);
} catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", job.getId()), ex);
Thread.currentThread().interrupt();
return;
}
}
}
}
} catch (TskCoreException ex) {
logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS
}
/*
* Put the file directly into the queue for the file ingest
* threads, if it passes the filter for the job. The file is
* added to the queue for the ingest threads BEFORE the other
* queued tasks because the primary use case for this method is
* adding derived files from a higher priority task that
* preceded the tasks currently in the queue.
*/
FileIngestTask task = new FileIngestTask(job, file);
if (shouldEnqueueFileTask(task)) {
try { try {
this.fileTaskQueue.putFirst(task); this.fileIngestThreadsQueue.putFirst(fileTask);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", job.getId()), ex); IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId={%d)", job.getId()), ex);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -219,7 +197,6 @@ final class IngestTasksScheduler {
} }
} }
} }
this.shuffleFileTaskQueues();
} }
} }
@ -230,7 +207,7 @@ final class IngestTasksScheduler {
* @param task The completed task. * @param task The completed task.
*/ */
synchronized void notifyTaskCompleted(DataSourceIngestTask task) { synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
this.dataSourceTaskQueue.taskCompleted(task); this.dataSourceIngestThreadQueue.taskCompleted(task);
} }
/** /**
@ -240,7 +217,7 @@ final class IngestTasksScheduler {
* @param task The completed task. * @param task The completed task.
*/ */
synchronized void notifyTaskCompleted(FileIngestTask task) { synchronized void notifyTaskCompleted(FileIngestTask task) {
this.fileTaskQueue.taskCompleted(task); this.fileIngestThreadsQueue.taskCompleted(task);
shuffleFileTaskQueues(); shuffleFileTaskQueues();
} }
@ -254,10 +231,10 @@ final class IngestTasksScheduler {
*/ */
synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) { synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) {
long jobId = job.getId(); long jobId = job.getId();
return !this.dataSourceTaskQueue.hasTasksForJob(jobId) return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId)
&& !hasTasksForJob(this.rootFileTaskQueue, jobId) || hasTasksForJob(this.rootFileTaskQueue, jobId)
&& !hasTasksForJob(this.directoryFileTaskQueue, jobId) || hasTasksForJob(this.pendingFileTaskQueue, jobId)
&& !this.fileTaskQueue.hasTasksForJob(jobId); || this.fileIngestThreadsQueue.hasTasksForJob(jobId));
} }
/** /**
@ -270,7 +247,7 @@ final class IngestTasksScheduler {
synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) { synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) {
long jobId = job.getId(); long jobId = job.getId();
IngestTasksScheduler.removeTasksForJob(this.rootFileTaskQueue, jobId); IngestTasksScheduler.removeTasksForJob(this.rootFileTaskQueue, jobId);
IngestTasksScheduler.removeTasksForJob(this.directoryFileTaskQueue, jobId); IngestTasksScheduler.removeTasksForJob(this.pendingFileTaskQueue, jobId);
} }
/** /**
@ -325,16 +302,18 @@ final class IngestTasksScheduler {
* root objects of the data sources that are being analyzed. For example, * root objects of the data sources that are being analyzed. For example,
* the root tasks for a disk image data source are typically the tasks for * the root tasks for a disk image data source are typically the tasks for
* the contents of the root directories of the file systems. This queue is a * the contents of the root directories of the file systems. This queue is a
* priority queue that attempts to ensure that user directory content is * priority queue that attempts to ensure that user content is analyzed
* analyzed before general file system content. It feeds into the directory * before general file system content. It feeds into the pending tasks
* tasks queue. * queue.
* *
* 2. The directory file tasks queue, which contains root file tasks * 2. The pending file tasks queue, which contains root file tasks shuffled
* shuffled out of the root tasks queue, plus directory tasks discovered in * out of the root tasks queue, plus tasks for files with children
* the descent from the root tasks to the final leaf tasks in the content * discovered in the descent from the root tasks to the final leaf tasks in
* trees that are being analyzed for the data source ingest jobs. This queue * the content trees that are being analyzed for the data source ingest
* is a FIFO queue. It feeds into the file tasks queue for the ingest * jobs. This queue is a FIFO queue that attempts to throttle the total
* manager's file ingest threads. * number of file tasks by deferring queueing tasks for the children of
* files until the queue for the file ingest threads is emptied. It feeds
* into the file tasks queue for the ingest manager's file ingest threads.
* *
* 3. The file tasks queue for the ingest manager's file ingest threads. * 3. The file tasks queue for the ingest manager's file ingest threads.
* This queue is a blocking deque that is FIFO during a shuffle to maintain * This queue is a blocking deque that is FIFO during a shuffle to maintain
@ -343,69 +322,69 @@ final class IngestTasksScheduler {
* files derived from prioritized files. * files derived from prioritized files.
*/ */
synchronized private void shuffleFileTaskQueues() { synchronized private void shuffleFileTaskQueues() {
while (this.fileTaskQueue.isEmpty()) { while (this.fileIngestThreadsQueue.isEmpty()) {
/* /*
* If the directory file task queue is empty, move the highest * If the pending file task queue is empty, move the highest
* priority root file task, if there is one, into it. If both the * priority root file task, if there is one, into it.
* root and the directory file task queues are empty, there is
* nothing left to shuffle, so exit.
*/ */
if (this.directoryFileTaskQueue.isEmpty()) { if (this.pendingFileTaskQueue.isEmpty()) {
if (!this.rootFileTaskQueue.isEmpty()) { final FileIngestTask rootTask = this.rootFileTaskQueue.pollFirst();
this.directoryFileTaskQueue.addLast(this.rootFileTaskQueue.pollFirst()); if (rootTask != null) {
} else { this.pendingFileTaskQueue.addLast(rootTask);
return;
} }
} }
/* /*
* Try to move the next task from the directory task queue into the * Try to move the next task from the pending task queue into the
* queue for the file ingest threads, if it passes the filter for * queue for the file ingest threads, if it passes the filter for
* the job. * the job.
*/ */
final FileIngestTask directoryTask = this.directoryFileTaskQueue.pollFirst(); final FileIngestTask pendingTask = this.pendingFileTaskQueue.pollFirst();
if (shouldEnqueueFileTask(directoryTask)) { if (pendingTask == null) {
return;
}
if (shouldEnqueueFileTask(pendingTask)) {
try { try {
/* /*
* The task is added to the queue for the ingest threads * The task is added to the queue for the ingest threads
* AFTER the higher priority tasks that preceded it. * AFTER the higher priority tasks that preceded it.
*/ */
this.fileTaskQueue.putLast(directoryTask); this.fileIngestThreadsQueue.putLast(pendingTask);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex); IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
break; return;
} }
} }
/* /*
* If the root or directory task that was just queued for the file * If the task that was just queued for the file ingest threads has
* ingest threads has children, try to queue tasks for the children. * children, try to queue tasks for the children. Each child task
* Each child task will go into either the directory queue if it is * will go into either the directory queue if it has children of its
* a directory, or into the queue for the file ingest threads, if it * own, or into the queue for the file ingest threads, if it passes
* passes the filter for the job. * the filter for the job.
*/ */
final AbstractFile directory = directoryTask.getFile(); final AbstractFile file = pendingTask.getFile();
try { try {
for (Content child : directory.getChildren()) { for (Content child : file.getChildren()) {
if (child instanceof AbstractFile) { if (child instanceof AbstractFile) {
AbstractFile childFile = (AbstractFile) child; AbstractFile childFile = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), childFile); FileIngestTask childTask = new FileIngestTask(pendingTask.getIngestJob(), childFile);
if (childFile.hasChildren()) { if (childFile.hasChildren()) {
this.directoryFileTaskQueue.add(childTask); this.pendingFileTaskQueue.add(childTask);
} else if (shouldEnqueueFileTask(childTask)) { } else if (shouldEnqueueFileTask(childTask)) {
try { try {
this.fileTaskQueue.putLast(childTask); this.fileIngestThreadsQueue.putLast(childTask);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, "Ingest cancelled while blocked on a full file ingest threads queue", ex); IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
break; return;
} }
} }
} }
} }
} catch (TskCoreException ex) { } catch (TskCoreException ex) {
logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", directory.getName(), directory.getId()), ex); //NON-NLS logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS
} }
} }
} }
@ -831,11 +810,11 @@ final class IngestTasksScheduler {
*/ */
IngestJobTasksSnapshot(long jobId) { IngestJobTasksSnapshot(long jobId) {
this.jobId = jobId; this.jobId = jobId;
this.dsQueueSize = IngestTasksScheduler.this.dataSourceTaskQueue.countQueuedTasksForJob(jobId); this.dsQueueSize = IngestTasksScheduler.this.dataSourceIngestThreadQueue.countQueuedTasksForJob(jobId);
this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTaskQueue, jobId); this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootFileTaskQueue, jobId);
this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryFileTaskQueue, jobId); this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTaskQueue, jobId);
this.fileQueueSize = IngestTasksScheduler.this.fileTaskQueue.countQueuedTasksForJob(jobId);; this.fileQueueSize = IngestTasksScheduler.this.fileIngestThreadsQueue.countQueuedTasksForJob(jobId);;
this.runningListSize = IngestTasksScheduler.this.dataSourceTaskQueue.countRunningTasksForJob(jobId) + IngestTasksScheduler.this.fileTaskQueue.countRunningTasksForJob(jobId); this.runningListSize = IngestTasksScheduler.this.dataSourceIngestThreadQueue.countRunningTasksForJob(jobId) + IngestTasksScheduler.this.fileIngestThreadsQueue.countRunningTasksForJob(jobId);
} }
/** /**

BIN
Core/test/filter_test1.img Executable file

Binary file not shown.