Partial revision of IngestScheduler

This commit is contained in:
Richard Cordovano 2014-05-19 13:07:44 -04:00
parent f8d26589e0
commit b75adfa8f2
7 changed files with 221 additions and 115 deletions

View File

@ -33,37 +33,28 @@ import org.sleuthkit.datamodel.Content;
final class DataSourceIngestPipeline {
private final IngestJobContext context;
private final List<IngestModuleTemplate> moduleTemplates;
private List<DataSourceIngestModuleDecorator> modules = new ArrayList<>();
DataSourceIngestPipeline(IngestJobContext context, List<IngestModuleTemplate> moduleTemplates) {
this.context = context;
this.moduleTemplates = moduleTemplates;
}
List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>();
// Create an ingest module instance from each ingest module template
// that has an ingest module factory capable of making data source
// ingest modules. Map the module class names to the module instance
// ingest modules. Map the module class names to the module instances
// to allow the modules to be put in the sequence indicated by the
// ingest pipelines configuration.
Map<String, DataSourceIngestModuleDecorator> modulesByClass = new HashMap<>();
for (IngestModuleTemplate template : moduleTemplates) {
if (template.isDataSourceIngestModuleTemplate()) {
DataSourceIngestModuleDecorator module = new DataSourceIngestModuleDecorator(template.createDataSourceIngestModule(), template.getModuleName());
try {
module.startUp(context);
modulesByClass.put(module.getClassName(), module);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
}
}
// Establish the module sequence of the core ingest modules
// indicated by the ingest pipeline configuration, adding any
// additional modules found in the global lookup to the end of the
// pipeline in arbitrary order.
// Add the ingest modules to the pipeline in the order indicated by the
// data source ingest pipeline configuration, adding any additional
// modules found in the global lookup but not mentioned in the
// configuration to the end of the pipeline in arbitrary order.
List<String> pipelineConfig = IngestPipelinesConfiguration.getInstance().getDataSourceIngestPipelineConfig();
for (String moduleClassName : pipelineConfig) {
if (modulesByClass.containsKey(moduleClassName)) {
@ -73,6 +64,21 @@ final class DataSourceIngestPipeline {
for (DataSourceIngestModuleDecorator module : modulesByClass.values()) {
modules.add(module);
}
}
boolean isEmpty() {
return modules.isEmpty();
}
List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestModuleDecorator module : this.modules) {
try {
module.startUp(context);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
}
return errors;
}

View File

@ -0,0 +1,56 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2012-2014 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sleuthkit.autopsy.ingest;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.sleuthkit.datamodel.Content;
/**
 * Singleton that queues data source ingest tasks and records which ingest jobs
 * have a data source ingest task that has been queued but not yet reported as
 * completed.
 *
 * The set of job ids is guarded by this object's monitor. The task queue is a
 * LinkedBlockingQueue, so getNextTask() can block waiting for a task without
 * holding that monitor.
 */
final class DataSourceIngestTaskScheduler implements IngestTaskQueue {

    private static final DataSourceIngestTaskScheduler instance = new DataSourceIngestTaskScheduler();
    private final Set<Long> tasksInProgress = new HashSet<>(); // Guarded by this
    private final LinkedBlockingQueue<DataSourceIngestTask> dataSourceTasks = new LinkedBlockingQueue<>();

    /**
     * Gets the single instance of this scheduler.
     *
     * @return The scheduler singleton.
     */
    static DataSourceIngestTaskScheduler getInstance() {
        return instance;
    }

    /**
     * Private to enforce the singleton; use getInstance().
     */
    private DataSourceIngestTaskScheduler() {
    }

    /**
     * Queues a data source ingest task for an ingest job and marks the job as
     * having an incomplete data source ingest task.
     *
     * @param job        The ingest job the task belongs to.
     * @param dataSource The data source to be processed by the task.
     *
     * @throws InterruptedException If the calling thread is interrupted while
     *                              adding the task to the queue; in that case
     *                              the job is NOT marked as having an
     *                              incomplete task.
     */
    synchronized void addDataSourceTask(IngestJob job, Content dataSource) throws InterruptedException {
        // Read the id up front so that a bad job reference fails before
        // anything has been queued.
        final Long jobId = job.getId();

        // Enqueue first and record the id second. If put() (or the task
        // constructor) throws, no id is left behind in tasksInProgress, so
        // hasIncompleteTasks() cannot report a task that was never queued.
        // This ordering is safe because taskIsCompleted() needs this same
        // monitor: a worker that takes and finishes the task immediately
        // cannot remove the id until this method has returned.
        dataSourceTasks.put(new DataSourceIngestTask(job, dataSource));
        tasksInProgress.add(jobId);
    }

    /**
     * Gets the next queued data source ingest task, blocking until one is
     * available. Deliberately not synchronized, so a thread waiting here does
     * not hold this object's monitor.
     *
     * @return The next data source ingest task.
     *
     * @throws InterruptedException If the calling thread is interrupted while
     *                              waiting for a task.
     */
    @Override
    public IngestTask getNextTask() throws InterruptedException {
        return dataSourceTasks.take();
    }

    /**
     * Records that a data source ingest task is finished by clearing the
     * incomplete-task mark for the task's ingest job.
     *
     * NOTE(review): the mark is per job id, not per task, so this presumes a
     * job has at most one data source ingest task - confirm against callers.
     *
     * @param task The completed task.
     */
    synchronized void taskIsCompleted(DataSourceIngestTask task) {
        tasksInProgress.remove(task.getIngestJob().getId());
    }

    /**
     * Queries whether an ingest job has a data source ingest task that has
     * been queued but not yet reported as completed.
     *
     * @param job The ingest job.
     *
     * @return True if the job is marked as having an incomplete task, false
     *         otherwise.
     */
    synchronized boolean hasIncompleteTasks(IngestJob job) {
        return tasksInProgress.contains(job.getId());
    }
}

View File

@ -31,37 +31,28 @@ import org.sleuthkit.datamodel.AbstractFile;
final class FileIngestPipeline {
private final IngestJobContext context;
private final List<IngestModuleTemplate> moduleTemplates;
private List<FileIngestModuleDecorator> modules = new ArrayList<>();
FileIngestPipeline(IngestJobContext context, List<IngestModuleTemplate> moduleTemplates) {
this.context = context;
this.moduleTemplates = moduleTemplates;
}
List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>();
// Create an ingest module instance from each ingest module template
// that has an ingest module factory capable of making data source
// ingest modules. Map the module class names to the module instance
// to allow the modules to be put in the sequence indicated by the
// ingest pipelines configuration.
// that has an ingest module factory capable of making file ingest
// modules. Map the module class names to the module instances to allow
// the modules to be put in the sequence indicated by the ingest
// pipelines configuration.
Map<String, FileIngestModuleDecorator> modulesByClass = new HashMap<>();
for (IngestModuleTemplate template : moduleTemplates) {
if (template.isFileIngestModuleTemplate()) {
FileIngestModuleDecorator module = new FileIngestModuleDecorator(template.createFileIngestModule(), template.getModuleName());
try {
module.startUp(context);
modulesByClass.put(module.getClassName(), module);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
}
}
// Establish the module sequence of the core ingest modules
// indicated by the ingest pipeline configuration, adding any
// additional modules found in the global lookup to the end of the
// pipeline in arbitrary order.
// Add the ingest modules to the pipeline in the order indicated by the
// file ingest pipeline configuration, adding any additional modules
// found in the global lookup but not mentioned in the configuration to
// the end of the pipeline in arbitrary order.
List<String> pipelineConfig = IngestPipelinesConfiguration.getInstance().getFileIngestPipelineConfig();
for (String moduleClassName : pipelineConfig) {
if (modulesByClass.containsKey(moduleClassName)) {
@ -71,12 +62,27 @@ final class FileIngestPipeline {
for (FileIngestModuleDecorator module : modulesByClass.values()) {
modules.add(module);
}
}
boolean isEmpty() {
return modules.isEmpty();
}
List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>();
for (FileIngestModuleDecorator module : modules) {
try {
module.startUp(context);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
}
return errors;
}
List<IngestModuleError> process(AbstractFile file) {
List<IngestModuleError> errors = new ArrayList<>();
for (FileIngestModuleDecorator module : this.modules) {
for (FileIngestModuleDecorator module : modules) {
try {
module.process(file);
} catch (Exception ex) {
@ -95,7 +101,7 @@ final class FileIngestPipeline {
List<IngestModuleError> shutDown() {
List<IngestModuleError> errors = new ArrayList<>();
for (FileIngestModuleDecorator module : this.modules) {
for (FileIngestModuleDecorator module : modules) {
try {
module.shutDown();
} catch (Exception ex) {
@ -105,7 +111,7 @@ final class FileIngestPipeline {
return errors;
}
private static class FileIngestModuleDecorator implements FileIngestModule {
private static final class FileIngestModuleDecorator implements FileIngestModule {
private final FileIngestModule module;
private final String displayName;

View File

@ -46,8 +46,8 @@ final class IngestJob {
private long estimatedFilesToProcess = 0L; // Guarded by this
private long processedFiles = 0L; // Guarded by this
private DataSourceIngestPipeline dataSourceIngestPipeline;
private ProgressHandle dataSourceTasksProgress;
private ProgressHandle fileTasksProgress;
private ProgressHandle dataSourceIngestProgress;
private ProgressHandle fileIngestProgress;
private volatile boolean cancelled = false;
/**
@ -62,16 +62,19 @@ final class IngestJob {
* @throws InterruptedException
*/
static List<IngestModuleError> startIngestJob(Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException {
List<IngestModuleError> errors = new ArrayList<>();
long jobId = nextIngestJobId.incrementAndGet();
IngestJob job = new IngestJob(jobId, dataSource, ingestModuleTemplates, processUnallocatedSpace);
job.createIngestPipelines();
if (job.canBeStarted()) {
ingestJobsById.put(jobId, job);
List<IngestModuleError> errors = job.start();
errors = job.start();
if (errors.isEmpty()) {
IngestManager.getInstance().fireIngestJobStarted(jobId);
taskScheduler.addTasksForIngestJob(job, dataSource);
} else {
ingestJobsById.remove(jobId);
}
}
return errors;
}
@ -105,11 +108,36 @@ final class IngestJob {
return processUnallocatedSpace;
}
private void createIngestPipelines() throws InterruptedException {
IngestJobContext context = new IngestJobContext(this);
dataSourceIngestPipeline = new DataSourceIngestPipeline(context, ingestModuleTemplates);
int numberOfPipelines = IngestManager.getInstance().getNumberOfFileIngestThreads();
for (int i = 0; i < numberOfPipelines; ++i) {
fileIngestPipelines.put(new FileIngestPipeline(context, ingestModuleTemplates));
}
}
private boolean canBeStarted() {
if (!dataSourceIngestPipeline.isEmpty()) {
return true;
}
for (FileIngestPipeline pipeline : fileIngestPipelines) {
if (!pipeline.isEmpty()) {
return true;
}
}
return false;
}
private List<IngestModuleError> start() throws InterruptedException {
List<IngestModuleError> errors = startUpIngestPipelines();
if (errors.isEmpty()) {
startFileIngestProgressBar();
startDataSourceIngestProgressBar();
taskScheduler.addDataSourceTask(this, dataSource);
startFileIngestProgressBar();
if (!taskScheduler.addFileTasks(this, dataSource)) {
finishFileIngestProgressBar();
}
}
return errors;
}
@ -140,11 +168,11 @@ final class IngestJob {
final String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.dataSourceIngest.initialDisplayName",
dataSource.getName());
dataSourceTasksProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
dataSourceIngestProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
@Override
public boolean cancel() {
if (dataSourceTasksProgress != null) {
dataSourceTasksProgress.setDisplayName(
if (dataSourceIngestProgress != null) {
dataSourceIngestProgress.setDisplayName(
NbBundle.getMessage(this.getClass(),
"IngestJob.progress.cancelling",
displayName));
@ -153,19 +181,26 @@ final class IngestJob {
return true;
}
});
dataSourceTasksProgress.start();
dataSourceTasksProgress.switchToIndeterminate();
dataSourceIngestProgress.start();
dataSourceIngestProgress.switchToIndeterminate();
}
private synchronized void finishDataSourceIngestProgressBar() {
if (dataSourceIngestProgress != null) {
dataSourceIngestProgress.finish();
dataSourceIngestProgress = null;
}
}
private void startFileIngestProgressBar() {
final String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.fileIngest.displayName",
dataSource.getName());
fileTasksProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
fileIngestProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
@Override
public boolean cancel() {
if (fileTasksProgress != null) {
fileTasksProgress.setDisplayName(
if (fileIngestProgress != null) {
fileIngestProgress.setDisplayName(
NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling",
displayName));
}
@ -174,30 +209,33 @@ final class IngestJob {
}
});
estimatedFilesToProcess = dataSource.accept(new GetFilesCountVisitor());
fileTasksProgress.start();
fileTasksProgress.switchToDeterminate((int) estimatedFilesToProcess);
fileIngestProgress.start();
fileIngestProgress.switchToDeterminate((int) estimatedFilesToProcess);
}
private synchronized void finishFileIngestProgressBar() {
if (fileIngestProgress != null) {
fileIngestProgress.finish();
fileIngestProgress = null;
}
}
void process(DataSourceIngestTask task) throws InterruptedException {
if (!isCancelled()) {
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(dataSourceIngestPipeline.process(task.getDataSource(), dataSourceTasksProgress));
errors.addAll(dataSourceIngestPipeline.process(task.getDataSource(), dataSourceIngestProgress));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
} else {
taskScheduler.removeTasksForIngestJob(id);
taskScheduler.removeQueuedTasksForIngestJob(id);
}
// taskScheduler.taskIsCompleted(task);
// Because there is only one data source task per job, it is o.k. to
// call ProgressHandle.finish() now that the data source ingest modules
// are through using the progress bar via the DataSourceIngestModuleProgress wrapper.
// Calling ProgressHandle.finish() again in finish() will be harmless.
dataSourceTasksProgress.finish();
if (taskScheduler.isLastTaskForIngestJob(task)) {
finish();
}
dataSourceIngestProgress.finish();
// if (!taskScheduler.hasFileIngestTasksForIngestJob()) {
// finish();
// }
}
void process(FileIngestTask task) throws InterruptedException {
@ -206,9 +244,9 @@ final class IngestJob {
synchronized (this) {
++processedFiles;
if (processedFiles <= estimatedFilesToProcess) {
fileTasksProgress.progress(file.getName(), (int) processedFiles);
fileIngestProgress.progress(file.getName(), (int) processedFiles);
} else {
fileTasksProgress.progress(file.getName(), (int) estimatedFilesToProcess);
fileIngestProgress.progress(file.getName(), (int) estimatedFilesToProcess);
}
}
FileIngestPipeline pipeline = fileIngestPipelines.take();
@ -219,12 +257,16 @@ final class IngestJob {
logIngestModuleErrors(errors);
}
} else {
taskScheduler.removeTasksForIngestJob(id);
taskScheduler.removeQueuedTasksForIngestJob(id);
}
// taskScheduler.taskIsCompleted(task);
if (taskScheduler.isLastTaskForIngestJob(task)) {
finish();
}
// if (!taskScheduler.hasFileIngestTasksForIngestJob()) {
// fileIngestProgress.finish();
// if (!taskScheduler.hasDataSourceTasksForIngestJob()) {
// finish();
// }
// }
}
private void finish() {
@ -236,8 +278,6 @@ final class IngestJob {
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
dataSourceTasksProgress.finish();
fileTasksProgress.finish();
ingestJobsById.remove(id);
if (!isCancelled()) {
IngestManager.getInstance().fireIngestJobCompleted(id);

View File

@ -60,7 +60,7 @@ public final class IngestJobContext {
*/
public void addFiles(List<AbstractFile> files) {
for (AbstractFile file : files) {
IngestScheduler.getInstance().addFileTaskToIngestJob(ingestJob, file);
IngestScheduler.getInstance().addFileTask(ingestJob, file);
}
}
}

View File

@ -45,7 +45,8 @@ final class IngestScheduler {
private final TreeSet<FileIngestTask> rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); // Guarded by this
private final List<FileIngestTask> directoryTasks = new ArrayList<>(); // Guarded by this
private final LinkedBlockingQueue<FileIngestTask> fileTasks = new LinkedBlockingQueue<>(); // Guarded by this
private final List<IngestTask> tasksInProgress = new ArrayList<>(); // Guarded by this
private final List<IngestTask> dataSourceTasksInProgress = new ArrayList<>(); // Guarded by this
private final List<IngestTask> fileTasksInProgress = new ArrayList<>(); // Guarded by this
private final DataSourceIngestTaskQueue dataSourceTaskDispenser = new DataSourceIngestTaskQueue();
private final FileIngestTaskQueue fileTaskDispenser = new FileIngestTaskQueue();
@ -56,23 +57,17 @@ final class IngestScheduler {
private IngestScheduler() {
}
synchronized void addTasksForIngestJob(IngestJob job, Content dataSource) throws InterruptedException {
// Enqueue a data source ingest task for the data source.
DataSourceIngestTask task = new DataSourceIngestTask(job, dataSource);
try {
dataSourceTasks.put(task);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS
return;
void addDataSourceTask(IngestJob job, Content dataSource) throws InterruptedException {
dataSourceTasks.put(new DataSourceIngestTask(job, dataSource));
}
synchronized boolean addFileTasks(IngestJob job, Content dataSource) throws InterruptedException {
// Get the top level files of the data source.
Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
List<AbstractFile> toptLevelFiles = new ArrayList<>();
List<AbstractFile> topLevelFiles = new ArrayList<>();
if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
// The data source is itself a file.
toptLevelFiles.add((AbstractFile) dataSource);
topLevelFiles.add((AbstractFile) dataSource);
} else {
for (AbstractFile root : rootObjects) {
List<Content> children;
@ -81,13 +76,13 @@ final class IngestScheduler {
if (children.isEmpty()) {
// Add the root object itself, it could be an unallocated space
// file, or a child of a volume or an image.
toptLevelFiles.add(root);
topLevelFiles.add(root);
} else {
// The root object is a file system root directory, get
// the files within it.
for (Content child : children) {
if (child instanceof AbstractFile) {
toptLevelFiles.add((AbstractFile) child);
topLevelFiles.add((AbstractFile) child);
}
}
}
@ -97,18 +92,22 @@ final class IngestScheduler {
}
}
if (!topLevelFiles.isEmpty()) {
// Enqueue file ingest tasks for the top level files.
for (AbstractFile firstLevelFile : toptLevelFiles) {
for (AbstractFile firstLevelFile : topLevelFiles) {
FileIngestTask fileTask = new FileIngestTask(job, firstLevelFile);
if (shouldEnqueueFileTask(fileTask)) {
rootDirectoryTasks.add(fileTask);
}
}
updateFileTaskQueues(null);
updateFileTaskQueues();
return true;
} else {
return false;
}
}
void addFileTaskToIngestJob(IngestJob job, AbstractFile file) {
void addFileTask(IngestJob job, AbstractFile file) {
FileIngestTask task = new FileIngestTask(job, file);
if (shouldEnqueueFileTask(task)) {
try {
@ -120,7 +119,7 @@ final class IngestScheduler {
}
}
synchronized void removeTasksForIngestJob(long ingestJobId) {
synchronized void removeQueuedTasksForIngestJob(long ingestJobId) {
// Remove all tasks for this ingest job that are not in progress.
Iterator<FileIngestTask> fileTasksIterator = fileTasks.iterator();
while (fileTasksIterator.hasNext()) {
@ -148,11 +147,7 @@ final class IngestScheduler {
}
}
private synchronized void updateFileTaskQueues(FileIngestTask taskInProgress) throws InterruptedException {
if (taskInProgress != null) {
tasksInProgress.add(taskInProgress);
}
private synchronized void updateFileTaskQueues() throws InterruptedException {
// we loop because we could have a directory that has all files
// that do not get enqueued
while (true) {
@ -262,7 +257,7 @@ final class IngestScheduler {
return fileTaskDispenser;
}
synchronized boolean isLastTaskForIngestJob(IngestTask completedTask) {
synchronized boolean wasLastTaskForIngestJob(IngestTask completedTask) {
tasksInProgress.remove(completedTask);
IngestJob job = completedTask.getIngestJob();
long jobId = job.getId();
@ -393,17 +388,20 @@ final class IngestScheduler {
private class DataSourceIngestTaskQueue implements IngestTaskQueue {
@Override
public IngestTask getNextTask() throws InterruptedException {
return dataSourceTasks.take();
public IngestTask getNextTask() throws InterruptedException { // RJCTODO: Does this need to be synchronized?
DataSourceIngestTask task = dataSourceTasks.take();
dataSourceTasksInProgress.add(task);
return task;
}
}
private class FileIngestTaskQueue implements IngestTaskQueue {
@Override
public IngestTask getNextTask() throws InterruptedException {
public IngestTask getNextTask() throws InterruptedException { // RJCTODO: Does this need to be synchronized?
FileIngestTask task = fileTasks.take();
updateFileTaskQueues(task);
fileTasksInProgress.add(task);
updateFileTaskQueues();
return task;
}
}

View File

@ -18,6 +18,6 @@
*/
package org.sleuthkit.autopsy.ingest;
interface IngestTaskQueue {
interface IngestTaskQueue { // RJCTODO: Rename to IngestTaskScheduler
IngestTask getNextTask() throws InterruptedException;
}