diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java index c3411f88da..e4865f540d 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java @@ -33,37 +33,28 @@ import org.sleuthkit.datamodel.Content; final class DataSourceIngestPipeline { private final IngestJobContext context; - private final List moduleTemplates; private List modules = new ArrayList<>(); DataSourceIngestPipeline(IngestJobContext context, List moduleTemplates) { this.context = context; - this.moduleTemplates = moduleTemplates; - } - List startUp() { - List errors = new ArrayList<>(); // Create an ingest module instance from each ingest module template // that has an ingest module factory capable of making data source - // ingest modules. Map the module class names to the module instance + // ingest modules. Map the module class names to the module instances // to allow the modules to be put in the sequence indicated by the // ingest pipelines configuration. Map modulesByClass = new HashMap<>(); for (IngestModuleTemplate template : moduleTemplates) { if (template.isDataSourceIngestModuleTemplate()) { DataSourceIngestModuleDecorator module = new DataSourceIngestModuleDecorator(template.createDataSourceIngestModule(), template.getModuleName()); - try { - module.startUp(context); - modulesByClass.put(module.getClassName(), module); - } catch (Exception ex) { - errors.add(new IngestModuleError(module.getDisplayName(), ex)); - } + modulesByClass.put(module.getClassName(), module); } } - // Establish the module sequence of the core ingest modules - // indicated by the ingest pipeline configuration, adding any - // additional modules found in the global lookup to the end of the - // pipeline in arbitrary order. 
+ + // Add the ingest modules to the pipeline in the order indicated by the + // data source ingest pipeline configuration, adding any additional + // modules found in the global lookup but not mentioned in the + // configuration to the end of the pipeline in arbitrary order. List pipelineConfig = IngestPipelinesConfiguration.getInstance().getDataSourceIngestPipelineConfig(); for (String moduleClassName : pipelineConfig) { if (modulesByClass.containsKey(moduleClassName)) { @@ -73,6 +64,21 @@ final class DataSourceIngestPipeline { for (DataSourceIngestModuleDecorator module : modulesByClass.values()) { modules.add(module); } + } + + boolean isEmpty() { + return modules.isEmpty(); + } + + List startUp() { + List errors = new ArrayList<>(); + for (DataSourceIngestModuleDecorator module : this.modules) { + try { + module.startUp(context); + } catch (Exception ex) { + errors.add(new IngestModuleError(module.getDisplayName(), ex)); + } + } return errors; } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestTaskScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestTaskScheduler.java new file mode 100755 index 0000000000..c092d982c6 --- /dev/null +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestTaskScheduler.java @@ -0,0 +1,56 @@ +/* + * Autopsy Forensic Browser + * + * Copyright 2012-2014 Basis Technology Corp. + * Contact: carrier sleuthkit org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.sleuthkit.autopsy.ingest; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import org.sleuthkit.datamodel.Content; + +final class DataSourceIngestTaskScheduler implements IngestTaskQueue{ + + private static final DataSourceIngestTaskScheduler instance = new DataSourceIngestTaskScheduler(); + private final Set tasksInProgress = new HashSet<>(); // Guarded by this + private final LinkedBlockingQueue dataSourceTasks = new LinkedBlockingQueue<>(); + + static DataSourceIngestTaskScheduler getInstance() { + return instance; + } + + private DataSourceIngestTaskScheduler() { + } + + synchronized void addDataSourceTask(IngestJob job, Content dataSource) throws InterruptedException { + tasksInProgress.add(job.getId()); + dataSourceTasks.put(new DataSourceIngestTask(job, dataSource)); + } + + @Override + public IngestTask getNextTask() throws InterruptedException { + return dataSourceTasks.take(); + } + + synchronized void taskIsCompleted(DataSourceIngestTask task) { + tasksInProgress.remove(task.getIngestJob().getId()); + } + + synchronized boolean hasIncompleteTasks(IngestJob job) { + return tasksInProgress.contains(job.getId()); + } +} diff --git a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java index f9715fdb3f..51990b722a 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java @@ -31,37 +31,28 @@ import org.sleuthkit.datamodel.AbstractFile; final class FileIngestPipeline { private final IngestJobContext context; - private final List moduleTemplates; private List modules = new ArrayList<>(); FileIngestPipeline(IngestJobContext context, List moduleTemplates) { this.context = context; - this.moduleTemplates = moduleTemplates; - } - List startUp() { - List errors = new ArrayList<>(); // Create an ingest module instance from each ingest module 
template - // that has an ingest module factory capable of making data source - // ingest modules. Map the module class names to the module instance - // to allow the modules to be put in the sequence indicated by the - // ingest pipelines configuration. + // that has an ingest module factory capable of making file ingest + // modules. Map the module class names to the module instances to allow + // the modules to be put in the sequence indicated by the ingest + // pipelines configuration. Map modulesByClass = new HashMap<>(); for (IngestModuleTemplate template : moduleTemplates) { if (template.isFileIngestModuleTemplate()) { FileIngestModuleDecorator module = new FileIngestModuleDecorator(template.createFileIngestModule(), template.getModuleName()); - try { - module.startUp(context); - modulesByClass.put(module.getClassName(), module); - } catch (Exception ex) { - errors.add(new IngestModuleError(module.getDisplayName(), ex)); - } + modulesByClass.put(module.getClassName(), module); } } - // Establish the module sequence of the core ingest modules - // indicated by the ingest pipeline configuration, adding any - // additional modules found in the global lookup to the end of the - // pipeline in arbitrary order. + + // Add the ingest modules to the pipeline in the order indicated by the + // file ingest pipeline configuration, adding any additional modules + // found in the global lookup but not mentioned in the configuration to + // the end of the pipeline in arbitrary order. 
List pipelineConfig = IngestPipelinesConfiguration.getInstance().getFileIngestPipelineConfig(); for (String moduleClassName : pipelineConfig) { if (modulesByClass.containsKey(moduleClassName)) { @@ -71,12 +62,27 @@ final class FileIngestPipeline { for (FileIngestModuleDecorator module : modulesByClass.values()) { modules.add(module); } + } + + boolean isEmpty() { + return modules.isEmpty(); + } + + List startUp() { + List errors = new ArrayList<>(); + for (FileIngestModuleDecorator module : modules) { + try { + module.startUp(context); + } catch (Exception ex) { + errors.add(new IngestModuleError(module.getDisplayName(), ex)); + } + } return errors; } List process(AbstractFile file) { List errors = new ArrayList<>(); - for (FileIngestModuleDecorator module : this.modules) { + for (FileIngestModuleDecorator module : modules) { try { module.process(file); } catch (Exception ex) { @@ -95,7 +101,7 @@ final class FileIngestPipeline { List shutDown() { List errors = new ArrayList<>(); - for (FileIngestModuleDecorator module : this.modules) { + for (FileIngestModuleDecorator module : modules) { try { module.shutDown(); } catch (Exception ex) { @@ -105,7 +111,7 @@ final class FileIngestPipeline { return errors; } - private static class FileIngestModuleDecorator implements FileIngestModule { + private static final class FileIngestModuleDecorator implements FileIngestModule { private final FileIngestModule module; private final String displayName; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index ae2bd50bc1..5ea4e74773 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -46,8 +46,8 @@ final class IngestJob { private long estimatedFilesToProcess = 0L; // Guarded by this private long processedFiles = 0L; // Guarded by this private DataSourceIngestPipeline dataSourceIngestPipeline; - private ProgressHandle 
dataSourceTasksProgress; - private ProgressHandle fileTasksProgress; + private ProgressHandle dataSourceIngestProgress; + private ProgressHandle fileIngestProgress; private volatile boolean cancelled = false; /** @@ -62,15 +62,18 @@ final class IngestJob { * @throws InterruptedException */ static List startIngestJob(Content dataSource, List ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException { + List errors = new ArrayList<>(); long jobId = nextIngestJobId.incrementAndGet(); IngestJob job = new IngestJob(jobId, dataSource, ingestModuleTemplates, processUnallocatedSpace); - ingestJobsById.put(jobId, job); - List errors = job.start(); - if (errors.isEmpty()) { - IngestManager.getInstance().fireIngestJobStarted(jobId); - taskScheduler.addTasksForIngestJob(job, dataSource); - } else { - ingestJobsById.remove(jobId); + job.createIngestPipelines(); + if (job.canBeStarted()) { + ingestJobsById.put(jobId, job); + errors = job.start(); + if (errors.isEmpty()) { + IngestManager.getInstance().fireIngestJobStarted(jobId); + } else { + ingestJobsById.remove(jobId); + } } return errors; } @@ -105,11 +108,36 @@ final class IngestJob { return processUnallocatedSpace; } + private void createIngestPipelines() throws InterruptedException { + IngestJobContext context = new IngestJobContext(this); + dataSourceIngestPipeline = new DataSourceIngestPipeline(context, ingestModuleTemplates); + int numberOfPipelines = IngestManager.getInstance().getNumberOfFileIngestThreads(); + for (int i = 0; i < numberOfPipelines; ++i) { + fileIngestPipelines.put(new FileIngestPipeline(context, ingestModuleTemplates)); + } + } + + private boolean canBeStarted() { + if (!dataSourceIngestPipeline.isEmpty()) { + return true; + } + for (FileIngestPipeline pipeline : fileIngestPipelines) { + if (!pipeline.isEmpty()) { + return true; + } + } + return false; + } + private List start() throws InterruptedException { List errors = startUpIngestPipelines(); if (errors.isEmpty()) { - 
startFileIngestProgressBar(); startDataSourceIngestProgressBar(); + taskScheduler.addDataSourceTask(this, dataSource); + startFileIngestProgressBar(); + if (!taskScheduler.addFileTasks(this, dataSource)) { + finishFileIngestProgressBar(); + } } return errors; } @@ -140,11 +168,11 @@ final class IngestJob { final String displayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.initialDisplayName", dataSource.getName()); - dataSourceTasksProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { + dataSourceIngestProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { @Override public boolean cancel() { - if (dataSourceTasksProgress != null) { - dataSourceTasksProgress.setDisplayName( + if (dataSourceIngestProgress != null) { + dataSourceIngestProgress.setDisplayName( NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", displayName)); @@ -153,19 +181,26 @@ final class IngestJob { return true; } }); - dataSourceTasksProgress.start(); - dataSourceTasksProgress.switchToIndeterminate(); + dataSourceIngestProgress.start(); + dataSourceIngestProgress.switchToIndeterminate(); + } + + private synchronized void finishDataSourceIngestProgressBar() { + if (dataSourceIngestProgress != null) { + dataSourceIngestProgress.finish(); + dataSourceIngestProgress = null; + } } private void startFileIngestProgressBar() { final String displayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.fileIngest.displayName", dataSource.getName()); - fileTasksProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { + fileIngestProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { @Override public boolean cancel() { - if (fileTasksProgress != null) { - fileTasksProgress.setDisplayName( + if (fileIngestProgress != null) { + fileIngestProgress.setDisplayName( NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", displayName)); } @@ 
-174,30 +209,33 @@ final class IngestJob { } }); estimatedFilesToProcess = dataSource.accept(new GetFilesCountVisitor()); - fileTasksProgress.start(); - fileTasksProgress.switchToDeterminate((int) estimatedFilesToProcess); + fileIngestProgress.start(); + fileIngestProgress.switchToDeterminate((int) estimatedFilesToProcess); + } + + private synchronized void finishFileIngestProgressBar() { + if (fileIngestProgress != null) { + fileIngestProgress.finish(); + fileIngestProgress = null; + } } void process(DataSourceIngestTask task) throws InterruptedException { if (!isCancelled()) { List errors = new ArrayList<>(); - errors.addAll(dataSourceIngestPipeline.process(task.getDataSource(), dataSourceTasksProgress)); + errors.addAll(dataSourceIngestPipeline.process(task.getDataSource(), dataSourceIngestProgress)); if (!errors.isEmpty()) { logIngestModuleErrors(errors); } } else { - taskScheduler.removeTasksForIngestJob(id); + taskScheduler.removeQueuedTasksForIngestJob(id); } + // taskScheduler.taskIsCompleted(task); - // Because there is only one data source task per job, it is o.k. to - // call ProgressHandle.finish() now that the data source ingest modules - // are through using the progress bar via the DataSourceIngestModuleProgress wrapper. - // Calling ProgressHandle.finish() again in finish() will be harmless. 
- dataSourceTasksProgress.finish(); - - if (taskScheduler.isLastTaskForIngestJob(task)) { - finish(); - } + dataSourceIngestProgress.finish(); + // if (!taskScheduler.hasFileIngestTasksForIngestJob()) { + // finish(); + // } } void process(FileIngestTask task) throws InterruptedException { @@ -206,9 +244,9 @@ final class IngestJob { synchronized (this) { ++processedFiles; if (processedFiles <= estimatedFilesToProcess) { - fileTasksProgress.progress(file.getName(), (int) processedFiles); + fileIngestProgress.progress(file.getName(), (int) processedFiles); } else { - fileTasksProgress.progress(file.getName(), (int) estimatedFilesToProcess); + fileIngestProgress.progress(file.getName(), (int) estimatedFilesToProcess); } } FileIngestPipeline pipeline = fileIngestPipelines.take(); @@ -219,12 +257,16 @@ final class IngestJob { logIngestModuleErrors(errors); } } else { - taskScheduler.removeTasksForIngestJob(id); + taskScheduler.removeQueuedTasksForIngestJob(id); } + // taskScheduler.taskIsCompleted(task); - if (taskScheduler.isLastTaskForIngestJob(task)) { - finish(); - } + // if (!taskScheduler.hasFileIngestTasksForIngestJob()) { + // fileIngestProgress.finish(); + // if (!taskScheduler.hasDataSourceTasksForIngestJob()) { + // finish(); + // } + // } } private void finish() { @@ -236,8 +278,6 @@ final class IngestJob { if (!errors.isEmpty()) { logIngestModuleErrors(errors); } - dataSourceTasksProgress.finish(); - fileTasksProgress.finish(); ingestJobsById.remove(id); if (!isCancelled()) { IngestManager.getInstance().fireIngestJobCompleted(id); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java index 030a878351..c941303bc7 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java @@ -60,7 +60,7 @@ public final class IngestJobContext { */ public void addFiles(List files) { for (AbstractFile file : files) { - 
IngestScheduler.getInstance().addFileTaskToIngestJob(ingestJob, file); + IngestScheduler.getInstance().addFileTask(ingestJob, file); } } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java index 9c003895d4..92a9476604 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java @@ -45,7 +45,8 @@ final class IngestScheduler { private final TreeSet rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); // Guarded by this private final List directoryTasks = new ArrayList<>(); // Guarded by this private final LinkedBlockingQueue fileTasks = new LinkedBlockingQueue<>(); // Guarded by this - private final List tasksInProgress = new ArrayList<>(); // Guarded by this + private final List dataSourceTasksInProgress = new ArrayList<>(); // Guarded by this + private final List fileTasksInProgress = new ArrayList<>(); // Guarded by this private final DataSourceIngestTaskQueue dataSourceTaskDispenser = new DataSourceIngestTaskQueue(); private final FileIngestTaskQueue fileTaskDispenser = new FileIngestTaskQueue(); @@ -56,23 +57,17 @@ final class IngestScheduler { private IngestScheduler() { } - synchronized void addTasksForIngestJob(IngestJob job, Content dataSource) throws InterruptedException { - // Enqueue a data source ingest task for the data source. 
- DataSourceIngestTask task = new DataSourceIngestTask(job, dataSource); - try { - dataSourceTasks.put(task); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - logger.log(Level.FINE, "Task scheduling for ingest job interrupted", ex); //NON-NLS - return; - } + void addDataSourceTask(IngestJob job, Content dataSource) throws InterruptedException { + dataSourceTasks.put(new DataSourceIngestTask(job, dataSource)); + } + synchronized boolean addFileTasks(IngestJob job, Content dataSource) throws InterruptedException { // Get the top level files of the data source. Collection rootObjects = dataSource.accept(new GetRootDirectoryVisitor()); - List toptLevelFiles = new ArrayList<>(); + List topLevelFiles = new ArrayList<>(); if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) { // The data source is itself a file. - toptLevelFiles.add((AbstractFile) dataSource); + topLevelFiles.add((AbstractFile) dataSource); } else { for (AbstractFile root : rootObjects) { List children; @@ -81,13 +76,13 @@ final class IngestScheduler { if (children.isEmpty()) { // Add the root object itself, it could be an unallocated space // file, or a child of a volume or an image. - toptLevelFiles.add(root); + topLevelFiles.add(root); } else { // The root object is a file system root directory, get // the files within it. for (Content child : children) { if (child instanceof AbstractFile) { - toptLevelFiles.add((AbstractFile) child); + topLevelFiles.add((AbstractFile) child); } } } @@ -97,18 +92,22 @@ final class IngestScheduler { } } - // Enqueue file ingest tasks for the top level files. - for (AbstractFile firstLevelFile : toptLevelFiles) { - FileIngestTask fileTask = new FileIngestTask(job, firstLevelFile); - if (shouldEnqueueFileTask(fileTask)) { - rootDirectoryTasks.add(fileTask); + if (!topLevelFiles.isEmpty()) { + // Enqueue file ingest tasks for the top level files. 
+ for (AbstractFile firstLevelFile : topLevelFiles) { + FileIngestTask fileTask = new FileIngestTask(job, firstLevelFile); + if (shouldEnqueueFileTask(fileTask)) { + rootDirectoryTasks.add(fileTask); + } } - } + updateFileTaskQueues(); + return true; + } else { + return false; + } + } - updateFileTaskQueues(null); - } - - void addFileTaskToIngestJob(IngestJob job, AbstractFile file) { + void addFileTask(IngestJob job, AbstractFile file) { FileIngestTask task = new FileIngestTask(job, file); if (shouldEnqueueFileTask(task)) { try { @@ -120,7 +119,7 @@ final class IngestScheduler { } } - synchronized void removeTasksForIngestJob(long ingestJobId) { + synchronized void removeQueuedTasksForIngestJob(long ingestJobId) { // Remove all tasks for this ingest job that are not in progress. Iterator fileTasksIterator = fileTasks.iterator(); while (fileTasksIterator.hasNext()) { @@ -148,11 +147,7 @@ final class IngestScheduler { } } - private synchronized void updateFileTaskQueues(FileIngestTask taskInProgress) throws InterruptedException { - if (taskInProgress != null) { - tasksInProgress.add(taskInProgress); - } - + private synchronized void updateFileTaskQueues() throws InterruptedException { // we loop because we could have a directory that has all files // that do not get enqueued while (true) { @@ -262,7 +257,7 @@ final class IngestScheduler { return fileTaskDispenser; } - synchronized boolean isLastTaskForIngestJob(IngestTask completedTask) { + synchronized boolean wasLastTaskForIngestJob(IngestTask completedTask) { tasksInProgress.remove(completedTask); IngestJob job = completedTask.getIngestJob(); long jobId = job.getId(); @@ -393,17 +388,20 @@ final class IngestScheduler { private class DataSourceIngestTaskQueue implements IngestTaskQueue { @Override - public IngestTask getNextTask() throws InterruptedException { - return dataSourceTasks.take(); + public IngestTask getNextTask() throws InterruptedException { // RJCTODO: Does this need to be synchronized? 
+ DataSourceIngestTask task = dataSourceTasks.take(); + dataSourceTasksInProgress.add(task); + return task; } } private class FileIngestTaskQueue implements IngestTaskQueue { @Override - public IngestTask getNextTask() throws InterruptedException { + public IngestTask getNextTask() throws InterruptedException { // RJCTODO: Does this need to be synchronized? FileIngestTask task = fileTasks.take(); - updateFileTaskQueues(task); + fileTasksInProgress.add(task); + updateFileTaskQueues(); return task; } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTaskQueue.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTaskQueue.java index d18f7047b7..3f2bfc513a 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTaskQueue.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTaskQueue.java @@ -18,6 +18,6 @@ */ package org.sleuthkit.autopsy.ingest; -interface IngestTaskQueue { +interface IngestTaskQueue { // RJCTODO: Rename to IngestTaskScheduler IngestTask getNextTask() throws InterruptedException; }