Merge pull request #745 from rcordovano/ensure_ingest_jobs_finish

Ensure ingest jobs finish
This commit is contained in:
Richard Cordovano 2014-06-09 12:25:54 -04:00
commit ad5462ef03
5 changed files with 176 additions and 191 deletions

View File

@ -1,72 +0,0 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2012-2014 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.sleuthkit.datamodel.Content;
final class DataSourceIngestTaskScheduler implements IngestTaskQueue {
private static final DataSourceIngestTaskScheduler instance = new DataSourceIngestTaskScheduler();
private final List<DataSourceIngestTask> tasks = new ArrayList<>(); // Guarded by this
private final LinkedBlockingQueue<DataSourceIngestTask> tasksQueue = new LinkedBlockingQueue<>();
static DataSourceIngestTaskScheduler getInstance() {
return instance;
}
private DataSourceIngestTaskScheduler() {
}
synchronized void scheduleTask(IngestJob job, Content dataSource) throws InterruptedException {
DataSourceIngestTask task = new DataSourceIngestTask(job, dataSource);
tasks.add(task);
try {
// Should not block, queue is (theoretically) unbounded.
tasksQueue.put(task);
} catch (InterruptedException ex) {
tasks.remove(task);
Logger.getLogger(DataSourceIngestTaskScheduler.class.getName()).log(Level.FINE, "Interruption of unexpected block on tasks queue", ex); //NON-NLS
throw ex;
}
}
@Override
public IngestTask getNextTask() throws InterruptedException {
return tasksQueue.take();
}
synchronized void notifyTaskCompleted(DataSourceIngestTask task) {
tasks.remove(task);
}
synchronized boolean hasIncompleteTasksForIngestJob(IngestJob job) {
long jobId = job.getId();
for (DataSourceIngestTask task : tasks) {
if (task.getIngestJob().getId() == jobId) {
return true;
}
}
return false;
}
}

View File

@ -33,16 +33,16 @@ import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
/** /**
* InjestJobs store all settings and data associated with the user selecting a * IngestJob encapsulates the settings, ingest module pipelines, and progress
* datasource and running a set of ingest modules on it. * bars that are used to process a data source when a user chooses to run a set
* of ingest modules on the data source.
*/ */
final class IngestJob { final class IngestJob {
private static final Logger logger = Logger.getLogger(IngestManager.class.getName()); private static final Logger logger = Logger.getLogger(IngestManager.class.getName());
private static final AtomicLong nextIngestJobId = new AtomicLong(0L); private static final AtomicLong nextIngestJobId = new AtomicLong(0L);
private static final ConcurrentHashMap<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
private static final DataSourceIngestTaskScheduler dataSourceTaskScheduler = DataSourceIngestTaskScheduler.getInstance(); private static final IngestTaskScheduler ingestTaskScheduler = IngestTaskScheduler.getInstance();
private static final FileIngestTaskScheduler fileTaskScheduler = FileIngestTaskScheduler.getInstance();
private final long id; private final long id;
private final Content dataSource; private final Content dataSource;
private final boolean processUnallocatedSpace; private final boolean processUnallocatedSpace;
@ -108,7 +108,9 @@ final class IngestJob {
/** /**
* Create the file and data source pipelines. * Create the file and data source pipelines.
* @param ingestModuleTemplates *
* @param ingestModuleTemplates Ingest module templates to use to populate
* the pipelines.
* @throws InterruptedException * @throws InterruptedException
*/ */
private void createIngestPipelines(List<IngestModuleTemplate> ingestModuleTemplates) throws InterruptedException { private void createIngestPipelines(List<IngestModuleTemplate> ingestModuleTemplates) throws InterruptedException {
@ -120,6 +122,12 @@ final class IngestJob {
} }
} }
/**
* Check the data source and file ingest pipeline queues to see if at least
* one pipeline exists.
*
* @return True or false.
*/
private boolean hasNonEmptyPipeline() { private boolean hasNonEmptyPipeline() {
if (dataSourceIngestPipeline.isEmpty() && fileIngestPipelines.peek().isEmpty()) { if (dataSourceIngestPipeline.isEmpty() && fileIngestPipelines.peek().isEmpty()) {
return false; return false;
@ -128,35 +136,29 @@ final class IngestJob {
} }
/** /**
* Start both the data source and file ingest pipelines * Start both the data source and file ingest pipelines.
* @return *
* @return A collection of ingest module start up errors, empty on success.
* @throws InterruptedException * @throws InterruptedException
*/ */
private List<IngestModuleError> start() throws InterruptedException { private List<IngestModuleError> start() throws InterruptedException {
List<IngestModuleError> errors = startUpIngestPipelines(); List<IngestModuleError> errors = startUpIngestPipelines();
if (errors.isEmpty()) { if (errors.isEmpty()) {
// Start the progress bars before scheduling the tasks to make sure
// the progress bar will be available as soon as the task begin to
// be processed.
if (!dataSourceIngestPipeline.isEmpty()) { if (!dataSourceIngestPipeline.isEmpty()) {
// Start the data source ingest progress bar before scheduling the
// data source task to make sure the progress bar will be available
// as soon as the task begins to be processed.
startDataSourceIngestProgressBar(); startDataSourceIngestProgressBar();
dataSourceTaskScheduler.scheduleTask(this, dataSource); ingestTaskScheduler.scheduleDataSourceIngestTask(this, dataSource);
} }
if (!fileIngestPipelines.peek().isEmpty()) { if (!fileIngestPipelines.peek().isEmpty()) {
// Start the file ingest progress bar before scheduling the file
// ingest tasks to make sure the progress bar will be available
// as soon as the tasks begin to be processed.
startFileIngestProgressBar(); startFileIngestProgressBar();
if (!fileTaskScheduler.tryScheduleTasks(this, dataSource)) { ingestTaskScheduler.scheduleFileIngestTasks(this, dataSource);
fileIngestProgress.finish();
}
} }
} }
return errors; return errors;
} }
/** /**
* Startup each of the file and data source ingest modules to collect * Startup each of the file and data source ingest modules to collect
* possible errors. * possible errors.
@ -228,10 +230,9 @@ final class IngestJob {
logIngestModuleErrors(errors); logIngestModuleErrors(errors);
} }
} }
dataSourceTaskScheduler.notifyTaskCompleted(task); ingestTaskScheduler.notifyTaskCompleted(task);
dataSourceIngestProgress.finish(); dataSourceIngestProgress.finish();
if (!ingestTaskScheduler.hasIncompleteTasksForIngestJob(this)) {
if (!fileTaskScheduler.hasIncompleteTasksForIngestJob(this)) {
finish(); finish();
} }
} }
@ -239,6 +240,7 @@ final class IngestJob {
void process(FileIngestTask task) throws InterruptedException { void process(FileIngestTask task) throws InterruptedException {
if (!isCancelled()) { if (!isCancelled()) {
AbstractFile file = task.getFile(); AbstractFile file = task.getFile();
if (file != null) {
synchronized (this) { synchronized (this) {
++processedFiles; ++processedFiles;
if (processedFiles <= estimatedFilesToProcess) { if (processedFiles <= estimatedFilesToProcess) {
@ -255,9 +257,14 @@ final class IngestJob {
logIngestModuleErrors(errors); logIngestModuleErrors(errors);
} }
} }
fileTaskScheduler.notifyTaskCompleted(task); }
ingestTaskScheduler.notifyTaskCompleted(task);
if (!ingestTaskScheduler.hasIncompleteTasksForIngestJob(this)) {
finish();
}
}
if (!fileTaskScheduler.hasIncompleteTasksForIngestJob(this)) { private void finish() {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
while (!fileIngestPipelines.isEmpty()) { while (!fileIngestPipelines.isEmpty()) {
FileIngestPipeline pipeline = fileIngestPipelines.poll(); FileIngestPipeline pipeline = fileIngestPipelines.poll();
@ -267,13 +274,6 @@ final class IngestJob {
logIngestModuleErrors(errors); logIngestModuleErrors(errors);
} }
fileIngestProgress.finish(); fileIngestProgress.finish();
if (!dataSourceTaskScheduler.hasIncompleteTasksForIngestJob(this)) {
finish();
}
}
}
private void finish() {
ingestJobsById.remove(id); ingestJobsById.remove(id);
if (!isCancelled()) { if (!isCancelled()) {
IngestManager.getInstance().fireIngestJobCompleted(id); IngestManager.getInstance().fireIngestJobCompleted(id);

View File

@ -63,7 +63,7 @@ public final class IngestJobContext {
public void scheduleFiles(List<AbstractFile> files) { public void scheduleFiles(List<AbstractFile> files) {
for (AbstractFile file : files) { for (AbstractFile file : files) {
try { try {
FileIngestTaskScheduler.getInstance().scheduleTask(ingestJob, file); IngestTaskScheduler.getInstance().scheduleFileIngestTask(ingestJob, file);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
// Ultimately, this method is called by ingest task execution // Ultimately, this method is called by ingest task execution
// threads running ingest module code. Handle the unexpected // threads running ingest module code. Handle the unexpected

View File

@ -124,7 +124,7 @@ public class IngestManager {
*/ */
private void startDataSourceIngestThread() { private void startDataSourceIngestThread() {
long threadId = nextThreadId.incrementAndGet(); long threadId = nextThreadId.incrementAndGet();
Future<?> handle = dataSourceIngestThreadPool.submit(new ExecuteIngestTasksThread(DataSourceIngestTaskScheduler.getInstance())); Future<?> handle = dataSourceIngestThreadPool.submit(new ExecuteIngestTasksThread(IngestTaskScheduler.getInstance().getDataSourceIngestTaskQueue()));
dataSourceIngestThreads.put(threadId, handle); dataSourceIngestThreads.put(threadId, handle);
} }
@ -134,7 +134,7 @@ public class IngestManager {
*/ */
private void startFileIngestThread() { private void startFileIngestThread() {
long threadId = nextThreadId.incrementAndGet(); long threadId = nextThreadId.incrementAndGet();
Future<?> handle = fileIngestThreadPool.submit(new ExecuteIngestTasksThread(FileIngestTaskScheduler.getInstance())); Future<?> handle = fileIngestThreadPool.submit(new ExecuteIngestTasksThread(IngestTaskScheduler.getInstance().getFileIngestTaskQueue()));
fileIngestThreads.put(threadId, handle); fileIngestThreads.put(threadId, handle);
} }

View File

@ -35,24 +35,40 @@ import org.sleuthkit.datamodel.FileSystem;
import org.sleuthkit.datamodel.TskCoreException; import org.sleuthkit.datamodel.TskCoreException;
import org.sleuthkit.datamodel.TskData; import org.sleuthkit.datamodel.TskData;
final class FileIngestTaskScheduler implements IngestTaskQueue { final class IngestTaskScheduler {
private static final FileIngestTaskScheduler instance = new FileIngestTaskScheduler(); private static final IngestTaskScheduler instance = new IngestTaskScheduler();
private static final Logger logger = Logger.getLogger(FileIngestTaskScheduler.class.getName()); private static final Logger logger = Logger.getLogger(IngestTaskScheduler.class.getName());
private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue(); private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
private final List<IngestTask> fileTasks = new ArrayList<>(); // Guarded by this private final LinkedBlockingQueue<DataSourceIngestTask> dataSourceTasks = new LinkedBlockingQueue<>();
private final TreeSet<FileIngestTask> rootDirectoryTasksQueue = new TreeSet<>(new RootDirectoryTaskComparator()); // Guarded by this private final TreeSet<FileIngestTask> rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); // Guarded by this
private final List<FileIngestTask> directoryTasksQueue = new ArrayList<>(); // Guarded by this private final List<FileIngestTask> directoryTasks = new ArrayList<>(); // Guarded by this
private final LinkedBlockingQueue<FileIngestTask> fileTasksQueue = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<FileIngestTask> fileTasks = new LinkedBlockingQueue<>();
private final List<IngestTask> tasksInProgress = new ArrayList<>(); // Guarded by this
private final DataSourceIngestTaskQueue dataSourceTaskDispenser = new DataSourceIngestTaskQueue();
private final FileIngestTaskQueue fileTaskDispenser = new FileIngestTaskQueue();
static FileIngestTaskScheduler getInstance() { static IngestTaskScheduler getInstance() {
return instance; return instance;
} }
private FileIngestTaskScheduler() { private IngestTaskScheduler() {
} }
boolean tryScheduleTasks(IngestJob job, Content dataSource) throws InterruptedException { synchronized void scheduleDataSourceIngestTask(IngestJob job, Content dataSource) throws InterruptedException {
DataSourceIngestTask task = new DataSourceIngestTask(job, dataSource);
tasksInProgress.add(task);
try {
// Should not block, queue is (theoretically) unbounded.
dataSourceTasks.put(task);
} catch (InterruptedException ex) {
tasksInProgress.remove(task);
Logger.getLogger(IngestTaskScheduler.class.getName()).log(Level.FINE, "Interruption of unexpected block on tasks queue", ex); //NON-NLS
throw ex;
}
}
void scheduleFileIngestTasks(IngestJob job, Content dataSource) throws InterruptedException {
// Get the top level files of the data source. // Get the top level files of the data source.
Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor()); Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
List<AbstractFile> topLevelFiles = new ArrayList<>(); List<AbstractFile> topLevelFiles = new ArrayList<>();
@ -86,79 +102,84 @@ final class FileIngestTaskScheduler implements IngestTaskQueue {
// Try to enqueue file ingest tasks for the top level files. // Try to enqueue file ingest tasks for the top level files.
for (AbstractFile firstLevelFile : topLevelFiles) { for (AbstractFile firstLevelFile : topLevelFiles) {
FileIngestTask fileTask = new FileIngestTask(job, firstLevelFile); FileIngestTask fileTask = new FileIngestTask(job, firstLevelFile);
if (shouldEnqueueTask(fileTask)) { if (shouldEnqueueFileTask(fileTask)) {
synchronized (this) { synchronized (this) {
rootDirectoryTasksQueue.add(fileTask); rootDirectoryTasks.add(fileTask);
} }
} }
} }
updateTaskQueues(); updateFileTaskQueues();
return !fileTasks.isEmpty();
} }
void scheduleTask(IngestJob job, AbstractFile file) throws InterruptedException { void scheduleFileIngestTask(IngestJob job, AbstractFile file) throws InterruptedException {
FileIngestTask task = new FileIngestTask(job, file); FileIngestTask task = new FileIngestTask(job, file);
if (shouldEnqueueTask(task)) { if (shouldEnqueueFileTask(task)) {
// Direct to file tasks queue, no need to update root directory or // Direct to file tasks queue, no need to update root directory or
// directory tasks queues. // directory tasks queues.
enqueueFileTask(task); enqueueFileTask(task);
} }
} }
@Override private synchronized void updateFileTaskQueues() throws InterruptedException {
public IngestTask getNextTask() throws InterruptedException { // Loop until at least one task is added to the file tasks queue or the
FileIngestTask task = fileTasksQueue.take(); // directory task queues are empty.
updateTaskQueues(); while (true) {
return task; // First check for tasks in the file queue. If this queue is not
// empty, the update is done.
if (fileTasks.isEmpty() == false) {
return;
} }
private synchronized void updateTaskQueues() throws InterruptedException { // If the directory tasks queue is empty, move the next root
// we loop because we could have a directory that has all files // directory task, if any, into it. If both directory task queues
// that do not get enqueued // are empty and the file tasks queue is empty, the update is done.
while (true) { if (directoryTasks.isEmpty()) {
// There are files in the queue, we're done if (rootDirectoryTasks.isEmpty()) {
if (fileTasksQueue.isEmpty() == false) {
return; return;
} }
// fill in the directory queue if it is empty. directoryTasks.add(rootDirectoryTasks.pollFirst());
if (this.directoryTasksQueue.isEmpty()) {
// bail out if root is also empty -- we are done
if (rootDirectoryTasksQueue.isEmpty()) {
return;
} }
FileIngestTask rootTask = rootDirectoryTasksQueue.pollFirst();
directoryTasksQueue.add(rootTask); // Try to move a task from the directory queue to the file tasks
// queue. If the directory contains directories or files, try to
// enqueue them as well. Note that it is absolutely necesssary to
// add at least one task to the file queue for every root directory
// that was enqueued, since scheduleFileIngestTasks() returned
// true for the associated job and the job is expecting to execute
// at least one task before it calls itself done.
boolean fileTaskEnqueued = false;
FileIngestTask directoryTask = directoryTasks.remove(directoryTasks.size() - 1);
if (shouldEnqueueFileTask(directoryTask)) {
enqueueFileTask(directoryTask);
fileTaskEnqueued = true;
} }
//pop and push AbstractFile directory children if any final AbstractFile directory = directoryTask.getFile();
//add the popped and its leaf children onto cur file list
FileIngestTask parentTask = directoryTasksQueue.remove(directoryTasksQueue.size() - 1);
final AbstractFile parentFile = parentTask.getFile();
// add itself to the file list
if (shouldEnqueueTask(parentTask)) {
enqueueFileTask(parentTask);
}
// add its children to the file and directory lists
try { try {
List<Content> children = parentFile.getChildren(); List<Content> children = directory.getChildren();
for (Content c : children) { for (Content child : children) {
if (c instanceof AbstractFile) { if (child instanceof AbstractFile) {
AbstractFile childFile = (AbstractFile) c; AbstractFile file = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(parentTask.getIngestJob(), childFile); FileIngestTask fileTask = new FileIngestTask(directoryTask.getIngestJob(), file);
if (childFile.hasChildren()) { if (file.hasChildren()) {
directoryTasksQueue.add(childTask); directoryTasks.add(fileTask);
} else if (shouldEnqueueTask(childTask)) { fileTaskEnqueued = true;
enqueueFileTask(childTask); } else if (shouldEnqueueFileTask(fileTask)) {
enqueueFileTask(fileTask);
fileTaskEnqueued = true;
} }
} }
} }
} catch (TskCoreException ex) { } catch (TskCoreException ex) {
logger.log(Level.SEVERE, "Could not get children of file and update file queues: " + parentFile.getName(), ex); //NON-NLS String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS
logger.log(Level.SEVERE, errorMessage, ex);
}
if (!fileTaskEnqueued) {
enqueueFileTask(new FileIngestTask(directoryTask.getIngestJob(), null));
} }
} }
} }
private static boolean shouldEnqueueTask(final FileIngestTask processTask) { private static boolean shouldEnqueueFileTask(final FileIngestTask processTask) {
final AbstractFile aFile = processTask.getFile(); final AbstractFile aFile = processTask.getFile();
//if it's unalloc file, skip if so scheduled //if it's unalloc file, skip if so scheduled
if (processTask.getIngestJob().shouldProcessUnallocatedSpace() == false && aFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) { if (processTask.getIngestJob().shouldProcessUnallocatedSpace() == false && aFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) {
@ -204,34 +225,44 @@ final class FileIngestTaskScheduler implements IngestTaskQueue {
} }
private synchronized void enqueueFileTask(FileIngestTask task) throws InterruptedException { private synchronized void enqueueFileTask(FileIngestTask task) throws InterruptedException {
fileTasks.add(task); tasksInProgress.add(task);
try { try {
// Should not block, queue is (theoretically) unbounded. // Should not block, queue is (theoretically) unbounded.
fileTasksQueue.put(task); fileTasks.put(task);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
fileTasks.remove(task); tasksInProgress.remove(task);
Logger.getLogger(DataSourceIngestTaskScheduler.class.getName()).log(Level.FINE, "Interruption of unexpected block on tasks queue", ex); //NON-NLS Logger.getLogger(IngestTaskScheduler.class.getName()).log(Level.FINE, "Interruption of unexpected block on tasks queue", ex); //NON-NLS
throw ex; throw ex;
} }
} }
synchronized void notifyTaskCompleted(FileIngestTask task) { synchronized void notifyTaskCompleted(IngestTask task) {
fileTasks.remove(task); tasksInProgress.remove(task);
} }
synchronized boolean hasIncompleteTasksForIngestJob(IngestJob job) { synchronized boolean hasIncompleteTasksForIngestJob(IngestJob job) {
long jobId = job.getId(); long jobId = job.getId();
for (IngestTask task : tasksInProgress) {
if (task.getIngestJob().getId() == jobId) {
return true;
}
}
for (IngestTask task : fileTasks) { for (IngestTask task : fileTasks) {
if (task.getIngestJob().getId() == jobId) { if (task.getIngestJob().getId() == jobId) {
return true; return true;
} }
} }
for (FileIngestTask task : directoryTasksQueue) { for (IngestTask task : directoryTasks) {
if (task.getIngestJob().getId() == jobId) { if (task.getIngestJob().getId() == jobId) {
return true; return true;
} }
} }
for (FileIngestTask task : rootDirectoryTasksQueue) { for (IngestTask task : rootDirectoryTasks) {
if (task.getIngestJob().getId() == jobId) {
return true;
}
}
for (IngestTask task : dataSourceTasks) {
if (task.getIngestJob().getId() == jobId) { if (task.getIngestJob().getId() == jobId) {
return true; return true;
} }
@ -334,4 +365,30 @@ final class FileIngestTaskScheduler implements IngestTaskQueue {
} }
} }
} }
IngestTaskQueue getDataSourceIngestTaskQueue() {
return this.dataSourceTaskDispenser;
}
IngestTaskQueue getFileIngestTaskQueue() {
return this.fileTaskDispenser;
}
private final class DataSourceIngestTaskQueue implements IngestTaskQueue {
@Override
public IngestTask getNextTask() throws InterruptedException {
return dataSourceTasks.take();
}
}
private final class FileIngestTaskQueue implements IngestTaskQueue {
@Override
public IngestTask getNextTask() throws InterruptedException {
FileIngestTask task = fileTasks.take();
updateFileTaskQueues();
return task;
}
}
} }