Preserve intermediate state of improved ingest framework

This commit is contained in:
Richard Cordovano 2014-05-01 09:41:01 -04:00
parent ae1793bfd3
commit a77d566a82
18 changed files with 329 additions and 329 deletions

View File

@ -131,7 +131,8 @@ public final class DirectoryTreeTopComponent extends TopComponent implements Dat
private void setListener() { private void setListener() {
Case.addPropertyChangeListener(this);// add this class to listen to any changes in the Case.java class Case.addPropertyChangeListener(this);// add this class to listen to any changes in the Case.java class
this.em.addPropertyChangeListener(this); this.em.addPropertyChangeListener(this);
IngestManager.addPropertyChangeListener(this); IngestManager.getInstance().addIngestJobEventListener(this);
IngestManager.getInstance().addIngestModuleEventListener(this);
} }
public void setDirectoryListingActive() { public void setDirectoryListingActive() {

View File

@ -60,13 +60,9 @@ IngestMessagePanel.sortByComboBox.model.priority=Priority
IngestMessagesToolbar.customizeButton.toolTipText=Ingest Messages IngestMessagesToolbar.customizeButton.toolTipText=Ingest Messages
IngestMessageTopComponent.initComponents.name=Ingest Inbox IngestMessageTopComponent.initComponents.name=Ingest Inbox
IngestManager.StartIngestJobsTask.run.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest job cancelled. IngestManager.StartIngestJobsTask.run.startupErr.dlgMsg=Unable to start up one or more ingest modules, ingest job cancelled.
IngestManager.StartIngestJobsTask.run.startupErr.dlgSolution=Please disable the failed modules or fix the errors and then restart ingest\ IngestManager.StartIngestJobsTask.run.startupErr.dlgSolution=Please disable the failed modules or fix the errors and then restart ingest \
by right clicking on the data source and selecting Run Ingest Modules. by right clicking on the data source and selecting Run Ingest Modules.
IngestManager.StartIngestJobsTask.run.startupErr.dlgErrorList=Errors\:\ IngestManager.StartIngestJobsTask.run.startupErr.dlgErrorList=Errors\: \
\ \
{0} {0}
IngestManager.StartIngestJobsTask.run.startupErr.dlgTitle=Ingest Failure IngestManager.StartIngestJobsTask.run.startupErr.dlgTitle=Ingest Failure
IngestManager.StartIngestJobsTask.run.progress.msg1=Data source ingest tasks for {0}
IngestManager.StartIngestJobsTask.run.progress.msg2=Data source ingest tasks for {0}
IngestManager.StartIngestJobsTask.run.progress.msg3=Data source ingest tasks for {0}
IngestManager.StartIngestJobsTask.run.progress.msg4=Data source ingest tasks for {0}

View File

@ -56,10 +56,6 @@ IngestMonitor.mgrErrMsg.lowDiskSpace.msg=\u30c7\u30a3\u30b9\u30af{0}\u306e\u30c7
IngestMonitor.mgrErrMsg.lowDiskSpace.title=\u30a4\u30f3\u30b8\u30a7\u30b9\u30c8\u304c\u4e2d\u6b62\u3055\u308c\u307e\u3057\u305f\u30fc{0}\u306e\u30c7\u30a3\u30b9\u30af\u9818\u57df\u4e0d\u8db3 IngestMonitor.mgrErrMsg.lowDiskSpace.title=\u30a4\u30f3\u30b8\u30a7\u30b9\u30c8\u304c\u4e2d\u6b62\u3055\u308c\u307e\u3057\u305f\u30fc{0}\u306e\u30c7\u30a3\u30b9\u30af\u9818\u57df\u4e0d\u8db3
IngestScheduler.DataSourceScheduler.toString.size=DataSourceQueue, \u30b5\u30a4\u30ba\uff1a IngestScheduler.DataSourceScheduler.toString.size=DataSourceQueue, \u30b5\u30a4\u30ba\uff1a
OpenIDE-Module-Name=\u30a4\u30f3\u30b8\u30a7\u30b9\u30c8 OpenIDE-Module-Name=\u30a4\u30f3\u30b8\u30a7\u30b9\u30c8
IngestManager.StartIngestJobsTask.run.progress.msg1={0}\u306e\u30c7\u30fc\u30bf\u30bd\u30fc\u30b9\u30a4\u30f3\u30b8\u30a7\u30b9\u30c8\u30bf\u30b9\u30af
IngestManager.StartIngestJobsTask.run.progress.msg2={0}\u306e\u30c7\u30fc\u30bf\u30bd\u30fc\u30b9\u30a4\u30f3\u30b8\u30a7\u30b9\u30c8\u30bf\u30b9\u30af
IngestManager.StartIngestJobsTask.run.progress.msg3={0}\u306e\u30c7\u30fc\u30bf\u30bd\u30fc\u30b9\u30a4\u30f3\u30b8\u30a7\u30b9\u30c8\u30bf\u30b9\u30af
IngestManager.StartIngestJobsTask.run.progress.msg4={0}\u306e\u30c7\u30fc\u30bf\u30bd\u30fc\u30b9\u30a4\u30f3\u30b8\u30a7\u30b9\u30c8\u30bf\u30b9\u30af
IngestManager.StartIngestJobsTask.run.startupErr.dlgErrorList=\u30a8\u30e9\u30fc\uff1a\ IngestManager.StartIngestJobsTask.run.startupErr.dlgErrorList=\u30a8\u30e9\u30fc\uff1a\
\ \
{0} {0}

View File

@ -18,16 +18,18 @@
*/ */
package org.sleuthkit.autopsy.ingest; package org.sleuthkit.autopsy.ingest;
import org.netbeans.api.progress.ProgressHandle;
/** /**
* Used by data source ingest modules to report progress. * Used by data source ingest modules to report progress.
*/ */
public class DataSourceIngestModuleProgress { public class DataSourceIngestModuleProgress {
private final IngestJob ingestJob; private final ProgressHandle progress;
private final String moduleDisplayName; private final String moduleDisplayName;
DataSourceIngestModuleProgress(IngestJob ingestJob, String moduleDisplayName) { DataSourceIngestModuleProgress(ProgressHandle progress, String moduleDisplayName) {
this.ingestJob = ingestJob; this.progress = progress;
this.moduleDisplayName = moduleDisplayName; this.moduleDisplayName = moduleDisplayName;
} }
@ -40,7 +42,7 @@ public class DataSourceIngestModuleProgress {
* data source. * data source.
*/ */
public void switchToDeterminate(int workUnits) { public void switchToDeterminate(int workUnits) {
ingestJob.getDataSourceTaskProgressBar().switchToDeterminate(workUnits); progress.switchToDeterminate(workUnits);
} }
/** /**
@ -48,7 +50,7 @@ public class DataSourceIngestModuleProgress {
* the total work units to process the data source is unknown. * the total work units to process the data source is unknown.
*/ */
public void switchToIndeterminate() { public void switchToIndeterminate() {
ingestJob.getDataSourceTaskProgressBar().switchToIndeterminate(); progress.switchToIndeterminate();
} }
/** /**
@ -58,6 +60,6 @@ public class DataSourceIngestModuleProgress {
* @param workUnits Number of work units performed so far by the module. * @param workUnits Number of work units performed so far by the module.
*/ */
public void progress(int workUnits) { public void progress(int workUnits) {
ingestJob.getDataSourceTaskProgressBar().progress(this.moduleDisplayName, workUnits); progress.progress(this.moduleDisplayName, workUnits);
} }
} }

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.netbeans.api.progress.ProgressHandle;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
/** /**
@ -30,12 +31,12 @@ import org.sleuthkit.datamodel.Content;
*/ */
final class DataSourceIngestPipeline { final class DataSourceIngestPipeline {
private final IngestJob job; private final IngestJobContext context;
private final List<IngestModuleTemplate> moduleTemplates; private final List<IngestModuleTemplate> moduleTemplates;
private List<DataSourceIngestModuleDecorator> modules = new ArrayList<>(); private List<DataSourceIngestModuleDecorator> modules = new ArrayList<>();
DataSourceIngestPipeline(IngestJob job, List<IngestModuleTemplate> moduleTemplates) { DataSourceIngestPipeline(IngestJobContext context, List<IngestModuleTemplate> moduleTemplates) {
this.job = job; this.context = context;
this.moduleTemplates = moduleTemplates; this.moduleTemplates = moduleTemplates;
} }
@ -50,7 +51,6 @@ final class DataSourceIngestPipeline {
for (IngestModuleTemplate template : moduleTemplates) { for (IngestModuleTemplate template : moduleTemplates) {
if (template.isDataSourceIngestModuleTemplate()) { if (template.isDataSourceIngestModuleTemplate()) {
DataSourceIngestModuleDecorator module = new DataSourceIngestModuleDecorator(template.createDataSourceIngestModule(), template.getModuleName()); DataSourceIngestModuleDecorator module = new DataSourceIngestModuleDecorator(template.createDataSourceIngestModule(), template.getModuleName());
IngestJobContext context = new IngestJobContext(job);
try { try {
module.startUp(context); module.startUp(context);
modulesByClass.put(module.getClassName(), module); modulesByClass.put(module.getClassName(), module);
@ -75,26 +75,26 @@ final class DataSourceIngestPipeline {
return errors; return errors;
} }
List<IngestModuleError> process() { List<IngestModuleError> process(Content dataSource, ProgressHandle progress) {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestModuleDecorator module : this.modules) { for (DataSourceIngestModuleDecorator module : this.modules) {
try { try {
module.process(job.getDataSource(), new DataSourceIngestModuleProgress(job, module.getDisplayName())); module.process(dataSource, new DataSourceIngestModuleProgress(progress, module.getDisplayName()));
} catch (Exception ex) { } catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex)); errors.add(new IngestModuleError(module.getDisplayName(), ex));
} }
if (job.isCancelled()) { if (context.isJobCancelled()) {
break; break;
} }
} }
return errors; return errors;
} }
List<IngestModuleError> shutDown(boolean ingestJobCancelled) { List<IngestModuleError> shutDown() {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestModuleDecorator module : this.modules) { for (DataSourceIngestModuleDecorator module : this.modules) {
try { try {
module.shutDown(ingestJobCancelled); module.shutDown(context.isJobCancelled());
} catch (Exception ex) { } catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex)); errors.add(new IngestModuleError(module.getDisplayName(), ex));
} }

View File

@ -38,7 +38,7 @@ final class DataSourceIngestTask {
return dataSource; return dataSource;
} }
void execute() { void execute() throws InterruptedException {
ingestJob.process(); ingestJob.process(dataSource);
} }
} }

View File

@ -32,26 +32,30 @@ final class DataSourceIngestTaskScheduler {
private DataSourceIngestTaskScheduler() { private DataSourceIngestTaskScheduler() {
} }
synchronized void addTask(DataSourceIngestTask task) throws InterruptedException { synchronized void addTask(DataSourceIngestTask task) {
task.getIngestJob().notifyTaskPending(); // The capacity of the tasks queue is not bounded, so the call
try { // to put() should not block except for normal synchronized access.
tasks.put(task); // Still, notify the job that the task has been added first so that
} // the take() of the task cannot occur before the notification.
catch (InterruptedException ex) { task.getIngestJob().notifyTaskAdded();
// RJCTOD: Need a safety notification to undo above
// If the thread executing this code is ever interrupted, it is
// because the number of ingest threads has been decreased while
// ingest jobs are running. This thread will exit in an orderly fashion,
// but the task still needs to be enqueued rather than lost.
while (true) {
try {
tasks.put(task);
break;
} catch (InterruptedException ex) {
// Reset the interrupted status of the thread so the orderly
// exit can occur in the intended place.
Thread.currentThread().interrupt();
}
} }
} }
DataSourceIngestTask getNextTask() throws InterruptedException { DataSourceIngestTask getNextTask() throws InterruptedException {
return tasks.take(); return tasks.take();
}
boolean hasTasksForIngestJob(long jobId) {
for (DataSourceIngestTask task : tasks) {
if (task.getIngestJobId() == jobId) {
return true;
}
}
return false;
} }
} }

View File

@ -30,12 +30,12 @@ import org.sleuthkit.datamodel.AbstractFile;
*/ */
final class FileIngestPipeline { final class FileIngestPipeline {
private final IngestJob job; private final IngestJobContext context;
private final List<IngestModuleTemplate> moduleTemplates; private final List<IngestModuleTemplate> moduleTemplates;
private List<FileIngestModuleDecorator> modules = new ArrayList<>(); private List<FileIngestModuleDecorator> modules = new ArrayList<>();
FileIngestPipeline(IngestJob task, List<IngestModuleTemplate> moduleTemplates) { FileIngestPipeline(IngestJobContext context, List<IngestModuleTemplate> moduleTemplates) {
this.job = task; this.context = context;
this.moduleTemplates = moduleTemplates; this.moduleTemplates = moduleTemplates;
} }
@ -50,7 +50,6 @@ final class FileIngestPipeline {
for (IngestModuleTemplate template : moduleTemplates) { for (IngestModuleTemplate template : moduleTemplates) {
if (template.isFileIngestModuleTemplate()) { if (template.isFileIngestModuleTemplate()) {
FileIngestModuleDecorator module = new FileIngestModuleDecorator(template.createFileIngestModule(), template.getModuleName()); FileIngestModuleDecorator module = new FileIngestModuleDecorator(template.createFileIngestModule(), template.getModuleName());
IngestJobContext context = new IngestJobContext(job);
try { try {
module.startUp(context); module.startUp(context);
modulesByClass.put(module.getClassName(), module); modulesByClass.put(module.getClassName(), module);
@ -83,22 +82,22 @@ final class FileIngestPipeline {
} catch (Exception ex) { } catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex)); errors.add(new IngestModuleError(module.getDisplayName(), ex));
} }
if (job.isCancelled()) { if (context.isJobCancelled()) {
break; break;
} }
} }
file.close(); file.close();
if (!job.isCancelled()) { if (!context.isJobCancelled()) {
IngestManager.fireFileIngestDone(file.getId()); IngestManager.getInstance().fireFileIngestDone(file.getId());
} }
return errors; return errors;
} }
List<IngestModuleError> shutDown(boolean ingestJobCancelled) { List<IngestModuleError> shutDown() {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
for (FileIngestModuleDecorator module : this.modules) { for (FileIngestModuleDecorator module : this.modules) {
try { try {
module.shutDown(ingestJobCancelled); module.shutDown(context.isJobCancelled());
} catch (Exception ex) { } catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex)); errors.add(new IngestModuleError(module.getDisplayName(), ex));
} }

View File

@ -19,17 +19,16 @@
package org.sleuthkit.autopsy.ingest; package org.sleuthkit.autopsy.ingest;
import java.util.Objects; import java.util.Objects;
import java.util.logging.Level;
import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.TskCoreException;
final class FileIngestTask { final class FileIngestTask {
final AbstractFile file;
private final IngestJob ingestJob;
FileIngestTask(AbstractFile file, IngestJob task) { private final IngestJob ingestJob;
this.file = file; private final AbstractFile file;
FileIngestTask(IngestJob task, AbstractFile file) {
this.ingestJob = task; this.ingestJob = task;
this.file = file;
} }
public IngestJob getIngestJob() { public IngestJob getIngestJob() {
@ -39,30 +38,11 @@ final class FileIngestTask {
public AbstractFile getFile() { public AbstractFile getFile() {
return file; return file;
} }
void execute(long threadId) { void execute() throws InterruptedException {
ingestJob.process(file); ingestJob.process(file);
} }
@Override
public String toString() { //RJCTODO: May not keep this
try {
return "ProcessTask{" + "file=" + file.getId() + ": " + file.getUniquePath() + "}"; // + ", dataSourceTask=" + dataSourceTask + '}';
} catch (TskCoreException ex) {
// RJCTODO
// FileIngestTaskScheduler.logger.log(Level.SEVERE, "Cound not get unique path of file in queue, ", ex); //NON-NLS
}
return "ProcessTask{" + "file=" + file.getId() + ": " + file.getName() + '}';
}
/**
* two process tasks are equal when the file/dir and modules are the
* same this enables are not to queue up the same file/dir, modules
* tuples into the root dir set
*
* @param obj
* @return
*/
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (obj == null) { if (obj == null) {
@ -71,13 +51,11 @@ final class FileIngestTask {
if (getClass() != obj.getClass()) { if (getClass() != obj.getClass()) {
return false; return false;
} }
final FileIngestTask other = (FileIngestTask) obj; FileIngestTask other = (FileIngestTask) obj;
if (this.file != other.file && (this.file == null || !this.file.equals(other.file))) { if (this.ingestJob != other.ingestJob && (this.ingestJob == null || !this.ingestJob.equals(other.ingestJob))) {
return false; return false;
} }
IngestJob thisTask = this.getIngestJob(); if (this.file != other.file && (this.file == null || !this.file.equals(other.file))) {
IngestJob otherTask = other.getIngestJob();
if (thisTask != otherTask && (thisTask == null || !thisTask.equals(otherTask))) {
return false; return false;
} }
return true; return true;
@ -86,8 +64,8 @@ final class FileIngestTask {
@Override @Override
public int hashCode() { public int hashCode() {
int hash = 5; int hash = 5;
hash = 47 * hash + Objects.hashCode(this.file);
hash = 47 * hash + Objects.hashCode(this.ingestJob); hash = 47 * hash + Objects.hashCode(this.ingestJob);
hash = 47 * hash + Objects.hashCode(this.file);
return hash; return hash;
} }
} }

View File

@ -43,23 +43,20 @@ import org.sleuthkit.datamodel.VirtualDirectory;
final class FileIngestTaskScheduler { final class FileIngestTaskScheduler {
private static final Logger logger = Logger.getLogger(FileIngestTaskScheduler.class.getName()); private static final Logger logger = Logger.getLogger(FileIngestTaskScheduler.class.getName());
private static FileIngestTaskScheduler instance; private static FileIngestTaskScheduler instance = new FileIngestTaskScheduler();
private final TreeSet<FileIngestTask> rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); private final TreeSet<FileIngestTask> rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator());
private final List<FileIngestTask> directoryTasks = new ArrayList<>(); private final List<FileIngestTask> directoryTasks = new ArrayList<>();
private final LinkedBlockingQueue<FileIngestTask> fileTasks = new LinkedBlockingQueue<>(); // Unlimited capacity private final LinkedBlockingQueue<FileIngestTask> fileTasks = new LinkedBlockingQueue<>();
private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue(); private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
static synchronized FileIngestTaskScheduler getInstance() { static FileIngestTaskScheduler getInstance() {
if (instance == null) {
instance = new FileIngestTaskScheduler();
}
return instance; return instance;
} }
private FileIngestTaskScheduler() { private FileIngestTaskScheduler() {
} }
synchronized void addTasks(IngestJob dataSourceTask, Content dataSource) { synchronized void addTasks(IngestJob job, Content dataSource) throws InterruptedException {
Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor()); Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
List<AbstractFile> firstLevelFiles = new ArrayList<>(); List<AbstractFile> firstLevelFiles = new ArrayList<>();
if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) { if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
@ -87,9 +84,9 @@ final class FileIngestTaskScheduler {
} }
} }
for (AbstractFile firstLevelFile : firstLevelFiles) { for (AbstractFile firstLevelFile : firstLevelFiles) {
FileIngestTask fileTask = new FileIngestTask(firstLevelFile, dataSourceTask); FileIngestTask fileTask = new FileIngestTask(job, firstLevelFile);
if (shouldEnqueueTask(fileTask)) { if (shouldEnqueueTask(fileTask)) {
rootDirectoryTasks.add(fileTask); addTaskToRootDirectoryQueue(fileTask);
} }
} }
@ -97,16 +94,9 @@ final class FileIngestTaskScheduler {
updateQueues(); updateQueues();
} }
synchronized void addTask(IngestJob ingestJob, AbstractFile file) { synchronized void addTask(FileIngestTask task) {
try { if (shouldEnqueueTask(task)) {
FileIngestTask fileTask = new FileIngestTask(file, ingestJob); addTaskToFileQueue(task);
if (shouldEnqueueTask(fileTask)) {
fileTask.getIngestJob().notifyTaskPending();
fileTasks.put(fileTask); // Queue has unlimited capacity, does not block.
}
} catch (InterruptedException ex) {
// RJCTODO: Perhaps this is the convenience method?
// RJCTODO: Need undo
} }
} }
@ -116,29 +106,7 @@ final class FileIngestTaskScheduler {
return task; return task;
} }
synchronized boolean hasTasksForJob(long ingestJobId) { private void updateQueues() throws InterruptedException {
for (FileIngestTask task : rootDirectoryTasks) {
if (task.getIngestJob().getJobId() == ingestJobId) {
return true;
}
}
for (FileIngestTask task : directoryTasks) {
if (task.getIngestJob().getJobId() == ingestJobId) {
return true;
}
}
for (FileIngestTask task : fileTasks) {
if (task.getIngestJob().getJobId() == ingestJobId) {
return true;
}
}
return false;
}
private void updateQueues() {
// we loop because we could have a directory that has all files // we loop because we could have a directory that has all files
// that do not get enqueued // that do not get enqueued
while (true) { while (true) {
@ -152,23 +120,15 @@ final class FileIngestTaskScheduler {
if (rootDirectoryTasks.isEmpty()) { if (rootDirectoryTasks.isEmpty()) {
return; return;
} }
FileIngestTask rootTask = this.rootDirectoryTasks.pollFirst(); addTaskToDirectoryQueue(rootDirectoryTasks.pollFirst(), false);
directoryTasks.add(rootTask);
} }
//pop and push AbstractFile directory children if any //pop and push AbstractFile directory children if any
//add the popped and its leaf children onto cur file list //add the popped and its leaf children onto cur file list
FileIngestTask parentTask = directoryTasks.remove(directoryTasks.size() - 1); FileIngestTask parentTask = directoryTasks.remove(directoryTasks.size() - 1);
final AbstractFile parentFile = parentTask.file; final AbstractFile parentFile = parentTask.getFile();
// add itself to the file list // add itself to the file list
if (shouldEnqueueTask(parentTask)) { if (shouldEnqueueTask(parentTask)) {
// RJCTODO addTaskToFileQueue(parentTask);
try {
parentTask.getIngestJob().notifyTaskPending();
fileTasks.put(parentTask);
} catch (InterruptedException ex) {
// RJCTODO: Maybe make a convenience method
// RJCTODO: Need undo
}
} }
// add its children to the file and directory lists // add its children to the file and directory lists
try { try {
@ -176,18 +136,11 @@ final class FileIngestTaskScheduler {
for (Content c : children) { for (Content c : children) {
if (c instanceof AbstractFile) { if (c instanceof AbstractFile) {
AbstractFile childFile = (AbstractFile) c; AbstractFile childFile = (AbstractFile) c;
FileIngestTask childTask = new FileIngestTask(childFile, parentTask.getIngestJob()); FileIngestTask childTask = new FileIngestTask(parentTask.getIngestJob(), childFile);
if (childFile.hasChildren()) { if (childFile.hasChildren()) {
this.directoryTasks.add(childTask); addTaskToDirectoryQueue(childTask, true);
} else if (shouldEnqueueTask(childTask)) { } else if (shouldEnqueueTask(childTask)) {
// RJCTODO addTaskToFileQueue(childTask);
try {
childTask.getIngestJob().notifyTaskPending();
fileTasks.put(childTask);
} catch (InterruptedException ex) {
// RJCTODO: Maybe make a convenience method
// RJCTODO: Need undo
}
} }
} }
} }
@ -197,10 +150,39 @@ final class FileIngestTaskScheduler {
} }
} }
synchronized void emptyQueues() { // RJCTODO: Perhaps clear all... private void addTaskToRootDirectoryQueue(FileIngestTask task) {
this.rootDirectoryTasks.clear(); directoryTasks.add(task);
this.directoryTasks.clear(); task.getIngestJob().notifyTaskAdded();
this.fileTasks.clear(); }
private void addTaskToDirectoryQueue(FileIngestTask task, boolean isNewTask) {
if (isNewTask) {
directoryTasks.add(task);
}
task.getIngestJob().notifyTaskAdded();
}
private void addTaskToFileQueue(FileIngestTask task) {
// The capacity of the file tasks queue is not bounded, so the call
// to put() should not block except for normal synchronized access.
// Still, notify the job that the task has been added first so that
// the take() of the task cannot occur before the notification.
task.getIngestJob().notifyTaskAdded();
// If the thread executing this code is ever interrupted, it is
// because the number of ingest threads has been decreased while
// ingest jobs are running. This thread will exit in an orderly fashion,
// but the task still needs to be enqueued rather than lost.
while (true) {
try {
fileTasks.put(task);
break;
} catch (InterruptedException ex) {
// Reset the interrupted status of the thread so the orderly
// exit can occur in the intended place.
Thread.currentThread().interrupt();
}
}
} }
/** /**
@ -211,7 +193,7 @@ final class FileIngestTaskScheduler {
* @return true if should be enqueued, false otherwise * @return true if should be enqueued, false otherwise
*/ */
private static boolean shouldEnqueueTask(final FileIngestTask processTask) { private static boolean shouldEnqueueTask(final FileIngestTask processTask) {
final AbstractFile aFile = processTask.file; final AbstractFile aFile = processTask.getFile();
//if it's unalloc file, skip if so scheduled //if it's unalloc file, skip if so scheduled
if (processTask.getIngestJob().shouldProcessUnallocatedSpace() == false && aFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) { if (processTask.getIngestJob().shouldProcessUnallocatedSpace() == false && aFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) {
return false; return false;
@ -320,10 +302,10 @@ final class FileIngestTaskScheduler {
@Override @Override
public int compare(FileIngestTask q1, FileIngestTask q2) { public int compare(FileIngestTask q1, FileIngestTask q2) {
AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(q1.file); AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(q1.getFile());
AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(q2.file); AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(q2.getFile());
if (p1 == p2) { if (p1 == p2) {
return (int) (q2.file.getId() - q1.file.getId()); return (int) (q2.getFile().getId() - q1.getFile().getId());
} else { } else {
return p2.ordinal() - p1.ordinal(); return p2.ordinal() - p1.ordinal();
} }

View File

@ -24,10 +24,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.netbeans.api.progress.ProgressHandle; import org.netbeans.api.progress.ProgressHandle;
import org.netbeans.api.progress.ProgressHandleFactory; import org.netbeans.api.progress.ProgressHandleFactory;
import org.openide.util.Cancellable; import org.openide.util.Cancellable;
import org.openide.util.NbBundle; import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
@ -37,33 +39,48 @@ import org.sleuthkit.datamodel.Content;
*/ */
final class IngestJob { final class IngestJob {
private static final Logger logger = Logger.getLogger(IngestManager.class.getName());
private static final AtomicLong nextIngestJobId = new AtomicLong(0L); private static final AtomicLong nextIngestJobId = new AtomicLong(0L);
private static final ConcurrentHashMap<Long, IngestJob> ingestJobs = new ConcurrentHashMap<>(); // Maps job ids to jobs. private static final ConcurrentHashMap<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
private final long jobId; private final long id;
private final Content dataSource; private final Content rootDataSource;
private final List<IngestModuleTemplate> ingestModuleTemplates; private final List<IngestModuleTemplate> ingestModuleTemplates;
private final boolean processUnallocatedSpace; private final boolean processUnallocatedSpace;
private final LinkedBlockingQueue<DataSourceIngestPipeline> dataSourceIngestPipelines = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<DataSourceIngestPipeline> dataSourceIngestPipelines = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelines = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelines = new LinkedBlockingQueue<>();
private final AtomicInteger tasksInProgress = new AtomicInteger(0); private final AtomicInteger tasksInProgress = new AtomicInteger(0);
private final AtomicLong processedFiles = new AtomicLong(0L); private final AtomicLong processedFiles = new AtomicLong(0L);
private final AtomicLong filesToIngestEstimate = new AtomicLong(0L);
private ProgressHandle dataSourceTasksProgress; private ProgressHandle dataSourceTasksProgress;
private ProgressHandle fileTasksProgress; private ProgressHandle fileTasksProgress;
private long filesToIngestEstimate = 0;
private volatile boolean cancelled; private volatile boolean cancelled;
static List<IngestModuleError> startIngestJob(Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) { // RJCTODO: return errors /**
* Creates an ingest job for a data source and starts up the ingest
* pipelines for the job.
*
* @param rootDataSource The data source to ingest.
* @param ingestModuleTemplates The ingest module templates to use to create
* the ingest pipelines for the job.
* @param processUnallocatedSpace Whether or not the job should include
* processing of unallocated space.
* @return A collection of ingest module startUp up errors, empty on
* success.
* @throws InterruptedException
*/
static List<IngestModuleError> startJob(Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException {
long jobId = nextIngestJobId.incrementAndGet(); long jobId = nextIngestJobId.incrementAndGet();
IngestJob ingestJob = new IngestJob(jobId, dataSource, ingestModuleTemplates, processUnallocatedSpace); IngestJob ingestJob = new IngestJob(jobId, dataSource, ingestModuleTemplates, processUnallocatedSpace);
List<IngestModuleError> errors = ingestJob.start(); List<IngestModuleError> errors = ingestJob.startUp();
if (errors.isEmpty()) { if (errors.isEmpty()) {
ingestJobs.put(jobId, ingestJob); ingestJobsById.put(jobId, ingestJob);
IngestManager.getInstance().fireIngestJobStarted(jobId);
} }
return errors; return errors;
} }
static boolean jobsAreRunning() { static boolean jobsAreRunning() {
for (IngestJob job : ingestJobs.values()) { for (IngestJob job : ingestJobsById.values()) {
if (!job.isCancelled()) { if (!job.isCancelled()) {
return true; return true;
} }
@ -71,79 +88,72 @@ final class IngestJob {
return false; return false;
} }
static void addFileToIngestJob(long ingestJobId, AbstractFile file) { // RJCTODO: Move back to IngestManager static void addFileToJob(long ingestJobId, AbstractFile file) { // RJCTODO: Just one at a time?
IngestJob job = ingestJobs.get(ingestJobId); IngestJob job = ingestJobsById.get(ingestJobId);
if (job != null) { if (job != null) {
FileIngestTaskScheduler.getInstance().addTask(job, file); // long adjustedFilesCount = job.filesToIngestEstimate.incrementAndGet(); // RJCTODO: Not the best name now?
// job.fileTasksProgress.switchToIndeterminate(); // RJCTODO: Comment this stuff
// job.fileTasksProgress.switchToDeterminate((int) adjustedFilesCount);
// job.fileTasksProgress.progress(job.processedFiles.intValue());
FileIngestTaskScheduler.getInstance().addTask(new FileIngestTask(job, file));
} }
} }
static void cancelAllIngestJobs() { static void cancelAllJobs() {
for (IngestJob job : ingestJobs.values()) { for (IngestJob job : ingestJobsById.values()) {
job.cancel(); job.cancel();
} }
} }
private IngestJob(long id, Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) { private IngestJob(long id, Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) {
this.jobId = id; this.id = id;
this.dataSource = dataSource; this.rootDataSource = dataSource;
this.ingestModuleTemplates = ingestModuleTemplates; this.ingestModuleTemplates = ingestModuleTemplates;
this.processUnallocatedSpace = processUnallocatedSpace; this.processUnallocatedSpace = processUnallocatedSpace;
this.cancelled = false; this.cancelled = false;
} }
long getJobId() { long getId() {
return jobId; return id;
}
Content getDataSource() {
return dataSource;
} }
boolean shouldProcessUnallocatedSpace() { boolean shouldProcessUnallocatedSpace() {
return processUnallocatedSpace; return processUnallocatedSpace;
} }
List<IngestModuleError> start() { List<IngestModuleError> startUp() throws InterruptedException {
List<IngestModuleError> errors = startUpIngestPipelines(); List<IngestModuleError> errors = startUpIngestPipelines();
if (errors.isEmpty()) { if (errors.isEmpty()) {
DataSourceIngestTaskScheduler.getInstance().addTask(new DataSourceIngestTask(this, dataSource));
FileIngestTaskScheduler.getInstance().addTasks(this, dataSource);
startDataSourceIngestProgressBar(); startDataSourceIngestProgressBar();
startFileIngestProgressBar(); startFileIngestProgressBar();
FileIngestTaskScheduler.getInstance().addTasks(this, rootDataSource); // RJCTODO: Think about this ordering "solution" for small images
DataSourceIngestTaskScheduler.getInstance().addTask(new DataSourceIngestTask(this, rootDataSource));
} }
return errors; return errors;
} }
private List<IngestModuleError> startUpIngestPipelines() { private List<IngestModuleError> startUpIngestPipelines() throws InterruptedException {
IngestJobContext context = new IngestJobContext(this);
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
int maxNumberOfPipelines = IngestManager.getMaxNumberOfDataSourceIngestThreads(); int maxNumberOfPipelines = IngestManager.getMaxNumberOfDataSourceIngestThreads();
for (int i = 0; i < maxNumberOfPipelines; ++i) { for (int i = 0; i < maxNumberOfPipelines; ++i) {
DataSourceIngestPipeline pipeline = new DataSourceIngestPipeline(this, ingestModuleTemplates); DataSourceIngestPipeline pipeline = new DataSourceIngestPipeline(context, ingestModuleTemplates);
errors.addAll(pipeline.startUp()); errors.addAll(pipeline.startUp());
try { dataSourceIngestPipelines.put(pipeline);
dataSourceIngestPipelines.put(pipeline); if (!errors.isEmpty()) {
} catch (InterruptedException ex) { // No need to accumulate presumably redundant errors.
// RJCTODO: log unexpected block and interrupt, or throw
}
if (errors.isEmpty()) {
// No need to accumulate presumably redundant erros.
break; break;
} }
} }
maxNumberOfPipelines = IngestManager.getMaxNumberOfFileIngestThreads(); maxNumberOfPipelines = IngestManager.getMaxNumberOfFileIngestThreads();
for (int i = 0; i < maxNumberOfPipelines; ++i) { for (int i = 0; i < maxNumberOfPipelines; ++i) {
FileIngestPipeline pipeline = new FileIngestPipeline(this, ingestModuleTemplates); FileIngestPipeline pipeline = new FileIngestPipeline(context, ingestModuleTemplates);
errors.addAll(pipeline.startUp()); errors.addAll(pipeline.startUp());
try { fileIngestPipelines.put(pipeline);
fileIngestPipelines.put(pipeline); if (!errors.isEmpty()) {
} catch (InterruptedException ex) { // No need to accumulate presumably redundant errors.
// RJCTODO: log unexpected block and interrupt, or throw
}
if (errors.isEmpty()) {
// No need to accumulate presumably redundant erros.
break; break;
} }
} }
@ -154,7 +164,7 @@ final class IngestJob {
private void startDataSourceIngestProgressBar() { private void startDataSourceIngestProgressBar() {
final String displayName = NbBundle.getMessage(this.getClass(), final String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.dataSourceIngest.displayName", "IngestJob.progress.dataSourceIngest.displayName",
dataSource.getName()); rootDataSource.getName());
dataSourceTasksProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { dataSourceTasksProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
@Override @Override
public boolean cancel() { public boolean cancel() {
@ -169,13 +179,13 @@ final class IngestJob {
} }
}); });
dataSourceTasksProgress.start(); dataSourceTasksProgress.start();
dataSourceTasksProgress.switchToIndeterminate(); // RJCTODO: check out the logic in the pipleine class dataSourceTasksProgress.switchToIndeterminate();
} }
private void startFileIngestProgressBar() { private void startFileIngestProgressBar() {
final String displayName = NbBundle.getMessage(this.getClass(), final String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.fileIngest.displayName", "IngestJob.progress.fileIngest.displayName",
dataSource.getName()); rootDataSource.getName());
fileTasksProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { fileTasksProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
@Override @Override
public boolean cancel() { public boolean cancel() {
@ -188,77 +198,78 @@ final class IngestJob {
return true; return true;
} }
}); });
filesToIngestEstimate = dataSource.accept(new GetFilesCountVisitor()); long initialFilesCount = rootDataSource.accept(new GetFilesCountVisitor());
filesToIngestEstimate.getAndAdd(initialFilesCount);
fileTasksProgress.start(); fileTasksProgress.start();
fileTasksProgress.switchToDeterminate((int) filesToIngestEstimate); fileTasksProgress.switchToDeterminate((int) initialFilesCount); // RJCTODO: This cast is troublesome, can use intValue
} }
/** /**
* Called by the ingest task schedulers when an ingest task for this ingest * Called by the ingest task schedulers when an ingest task is added to this
* job is added to the scheduler's task queue. * ingest job.
*/ */
void notifyTaskScheduled() { void notifyTaskAdded() {
// Increment the task counter when a task is scheduled so that there is
// a persistent record of the task's existence even after it is removed
// from the scheduler by an ingest thread. The task counter is used by
// the job to determine when it is done.
tasksInProgress.incrementAndGet(); tasksInProgress.incrementAndGet();
} }
/** void process(Content dataSource) throws InterruptedException {
* Called by the ingest schedulers as an "undo" operation for // If the job is not cancelled, complete the task, otherwise just flush
* notifyTaskScheduled(). // it. In either case, the task counter needs to be decremented and the
*/ // shut down check needs to occur.
void notifyTaskCompleted() { if (!isCancelled()) {
// Decrement the task counter when a task is discarded by a scheduler. List<IngestModuleError> errors = new ArrayList<>();
// The task counter is used by the job to determine when it is done. DataSourceIngestPipeline pipeline = dataSourceIngestPipelines.take();
tasksInProgress.decrementAndGet(); errors.addAll(pipeline.process(dataSource, dataSourceTasksProgress));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
dataSourceIngestPipelines.put(pipeline);
}
shutDownIfAllTasksCompleted();
} }
void process() throws InterruptedException { void process(AbstractFile file) throws InterruptedException {
// If the job is not cancelled, complete the task, otherwise just flush
// it. In either case, the task counter needs to be decremented and the
// shut down check needs to occur.
if (!isCancelled()) { if (!isCancelled()) {
try { List<IngestModuleError> errors = new ArrayList<>();
DataSourceIngestPipeline pipeline = dataSourceIngestPipelines.take(); FileIngestPipeline pipeline = fileIngestPipelines.take();
pipeline.process(); // RJCTODO: Pass data source through? // fileTasksProgress.progress(file.getName(), (int) processedFiles.incrementAndGet()); RJCTODO
dataSourceIngestPipelines.put(pipeline); errors.addAll(pipeline.process(file));
} catch (InterruptedException ex) { fileIngestPipelines.put(pipeline);
// RJCTODO: if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
} }
} }
ifCompletedShutDown(); shutDownIfAllTasksCompleted();
} }
void process(AbstractFile file) { private void shutDownIfAllTasksCompleted() {
if (!isCancelled()) {
try {
FileIngestPipeline pipeline = fileIngestPipelines.take();
fileTasksProgress.progress(file.getName(), (int) processedFiles.incrementAndGet());
pipeline.process(file);
fileIngestPipelines.put(pipeline);
} catch (InterruptedException ex) {
// RJCTODO: Log block and interrupt
}
}
ifCompletedShutDown();
}
void ifCompletedShutDown() {
if (tasksInProgress.decrementAndGet() == 0) { if (tasksInProgress.decrementAndGet() == 0) {
List<IngestModuleError> errors = new ArrayList<>();
while (!dataSourceIngestPipelines.isEmpty()) { while (!dataSourceIngestPipelines.isEmpty()) {
DataSourceIngestPipeline pipeline = dataSourceIngestPipelines.poll(); DataSourceIngestPipeline pipeline = dataSourceIngestPipelines.poll();
pipeline.shutDown(cancelled); errors.addAll(pipeline.shutDown());
} }
while (!fileIngestPipelines.isEmpty()) { while (!fileIngestPipelines.isEmpty()) {
FileIngestPipeline pipeline = fileIngestPipelines.poll(); FileIngestPipeline pipeline = fileIngestPipelines.poll();
pipeline.shutDown(cancelled); errors.addAll(pipeline.shutDown());
} }
ingestJobs.remove(jobId); fileTasksProgress.finish();
IngestManager.getInstance().fireIngestJobCompleted(jobId); dataSourceTasksProgress.finish();
ingestJobsById.remove(id);
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
IngestManager.getInstance().fireIngestJobCompleted(id);
} }
} }
ProgressHandle getDataSourceTaskProgressBar() { private void logIngestModuleErrors(List<IngestModuleError> errors) {
return dataSourceTasksProgress; // RJCTODO: Should just pass the progress handle or the object to the pipeline for (IngestModuleError error : errors) {
logger.log(Level.SEVERE, error.getModuleDisplayName() + " experienced an error", error.getModuleError());
}
} }
boolean isCancelled() { boolean isCancelled() {
@ -267,7 +278,7 @@ final class IngestJob {
void cancel() { void cancel() {
cancelled = true; cancelled = true;
fileTasksProgress.finish(); fileTasksProgress.finish(); // RJCTODO: What about the other progress bar?
IngestManager.getInstance().fireIngestJobCancelled(jobId); IngestManager.getInstance().fireIngestJobCancelled(id);
} }
} }

View File

@ -39,7 +39,7 @@ public final class IngestJobContext {
* @return The ingest job identifier. * @return The ingest job identifier.
*/ */
public long getJobId() { public long getJobId() {
return this.ingestJob.getJobId(); return this.ingestJob.getId();
} }
/** /**
@ -60,7 +60,7 @@ public final class IngestJobContext {
*/ */
public void addFiles(List<AbstractFile> files) { public void addFiles(List<AbstractFile> files) {
for (AbstractFile file : files) { for (AbstractFile file : files) {
IngestJob.addFileToIngestJob(ingestJob.getJobId(), file); IngestJob.addFileToJob(ingestJob.getId(), file);
} }
} }
} }

View File

@ -21,7 +21,9 @@ package org.sleuthkit.autopsy.ingest;
import java.beans.PropertyChangeListener; import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport; import java.beans.PropertyChangeSupport;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -51,14 +53,15 @@ public class IngestManager {
private static final Logger logger = Logger.getLogger(IngestManager.class.getName()); private static final Logger logger = Logger.getLogger(IngestManager.class.getName());
private static final Preferences userPreferences = NbPreferences.forModule(IngestManager.class); private static final Preferences userPreferences = NbPreferences.forModule(IngestManager.class);
private static final IngestManager instance = new IngestManager(); private static final IngestManager instance = new IngestManager();
private final PropertyChangeSupport pcs = new PropertyChangeSupport(IngestManager.class); private final PropertyChangeSupport ingestJobEventPublisher = new PropertyChangeSupport(IngestManager.class);
private final PropertyChangeSupport ingestModuleEventPublisher = new PropertyChangeSupport(IngestManager.class);
private final IngestMonitor ingestMonitor = new IngestMonitor(); private final IngestMonitor ingestMonitor = new IngestMonitor();
private final ExecutorService startIngestJobsThreadPool = Executors.newSingleThreadExecutor(); private final ExecutorService startIngestJobsThreadPool = Executors.newSingleThreadExecutor();
private final ConcurrentHashMap<Long, Future<?>> startIngestJobThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final ExecutorService dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(); private final ExecutorService dataSourceIngestThreadPool = Executors.newSingleThreadExecutor();
private final ConcurrentHashMap<Long, Future<?>> dataSourceIngestThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final ExecutorService fileIngestThreadPool = Executors.newFixedThreadPool(MAX_NUMBER_OF_FILE_INGEST_THREADS); private final ExecutorService fileIngestThreadPool = Executors.newFixedThreadPool(MAX_NUMBER_OF_FILE_INGEST_THREADS);
private final ExecutorService fireIngestJobEventsThreadPool = Executors.newSingleThreadExecutor(); private final ExecutorService fireIngestEventsThreadPool = Executors.newSingleThreadExecutor();
private final ConcurrentHashMap<Long, Future<Void>> startIngestJobThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final ConcurrentHashMap<Long, Future<?>> dataSourceIngestThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final ConcurrentHashMap<Long, Future<?>> fileIngestThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles. private final ConcurrentHashMap<Long, Future<?>> fileIngestThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final AtomicLong nextThreadId = new AtomicLong(0L); private final AtomicLong nextThreadId = new AtomicLong(0L);
private volatile IngestMessageTopComponent ingestMessageBox; private volatile IngestMessageTopComponent ingestMessageBox;
@ -154,7 +157,7 @@ public class IngestManager {
*/ */
private void startDataSourceIngestThread() { private void startDataSourceIngestThread() {
long threadId = nextThreadId.incrementAndGet(); long threadId = nextThreadId.incrementAndGet();
Future<?> handle = dataSourceIngestThreadPool.submit(new DataSourceIngestThread(threadId)); Future<?> handle = dataSourceIngestThreadPool.submit(new DataSourceIngestThread());
dataSourceIngestThreads.put(threadId, handle); dataSourceIngestThreads.put(threadId, handle);
} }
@ -164,7 +167,7 @@ public class IngestManager {
*/ */
private void startFileIngestThread() { private void startFileIngestThread() {
long threadId = nextThreadId.incrementAndGet(); long threadId = nextThreadId.incrementAndGet();
Future<?> handle = fileIngestThreadPool.submit(new FileIngestThread(threadId)); Future<?> handle = fileIngestThreadPool.submit(new FileIngestThread());
fileIngestThreads.put(threadId, handle); fileIngestThreads.put(threadId, handle);
} }
@ -200,35 +203,48 @@ public class IngestManager {
} }
public void cancelAllIngestJobs() { public void cancelAllIngestJobs() {
cancelStartIngestJobsTasks(); // Stop creating new ingest jobs.
IngestJob.cancelAllIngestJobs(); for (Future<Void> handle : startIngestJobThreads.values()) {
} handle.cancel(true);
try {
private void cancelStartIngestJobsTasks() { // Blocks until the job starting thread responds. The thread
for (Future<?> future : startIngestJobThreads.values()) { // removes itself from this collection, which does not disrupt
future.cancel(true); // this loop since the collection is a ConcurrentHashMap.
handle.get();
} catch (InterruptedException | ExecutionException ex) {
// This should never happen, something is awry, but everything
// should be o.k. anyway.
logger.log(Level.SEVERE, "Unexpected thread interrupt", ex);
}
} }
startIngestJobThreads.clear(); startIngestJobThreads.clear(); // Make sure.
// Cancel all the jobs already created. This will make the the ingest
// threads flush out any lingering ingest tasks without processing them.
IngestJob.cancelAllJobs();
} }
/** /**
* Ingest events. * Ingest events.
*/ */
public enum IngestEvent { // RJCTODO: Update comments if time permits public enum IngestEvent {
/** /**
* Property change event fired when an ingest job is started. The old * Property change event fired when an ingest job is started. The old
* and new values of the PropertyChangeEvent object are set to null. * value of the PropertyChangeEvent object is set to the ingest job id,
* and the new value is set to null.
*/ */
INGEST_JOB_STARTED, INGEST_JOB_STARTED,
/** /**
* Property change event fired when an ingest job is completed. The old * Property change event fired when an ingest job is completed. The old
* and new values of the PropertyChangeEvent object are set to null. * value of the PropertyChangeEvent object is set to the ingest job id,
* and the new value is set to null.
*/ */
INGEST_JOB_COMPLETED, INGEST_JOB_COMPLETED,
/** /**
* Property change event fired when an ingest job is canceled. The old * Property change event fired when an ingest job is canceled. The old
* and new values of the PropertyChangeEvent object are set to null. * value of the PropertyChangeEvent object is set to the ingest job id,
* and the new value is set to null.
*/ */
INGEST_JOB_CANCELLED, INGEST_JOB_CANCELLED,
/** /**
@ -253,21 +269,59 @@ public class IngestManager {
}; };
/** /**
* Add an ingest event property change listener. * Add an ingest job event property change listener.
* *
* @param listener The PropertyChangeListener to register. * @param listener The PropertyChangeListener to register.
*/ */
public void addPropertyChangeListener(final PropertyChangeListener listener) { public void addIngestJobEventListener(final PropertyChangeListener listener) {
pcs.addPropertyChangeListener(listener); ingestJobEventPublisher.addPropertyChangeListener(listener);
} }
/** /**
* Remove an ingest event property change listener. * Remove an ingest job event property change listener.
* *
* @param listener The PropertyChangeListener to unregister. * @param listener The PropertyChangeListener to unregister.
*/ */
public void removePropertyChangeListener(final PropertyChangeListener listener) { public void removeIngestJobEventListener(final PropertyChangeListener listener) {
pcs.removePropertyChangeListener(listener); ingestJobEventPublisher.removePropertyChangeListener(listener);
}
/**
* Add an ingest module event property change listener.
*
* @param listener The PropertyChangeListener to register.
*/
public void addIngestModuleEventListener(final PropertyChangeListener listener) {
ingestModuleEventPublisher.addPropertyChangeListener(listener);
}
/**
* Remove an ingest module event property change listener.
*
* @param listener The PropertyChangeListener to unregister.
*/
public void removeIngestModuleEventListener(final PropertyChangeListener listener) {
ingestModuleEventPublisher.removePropertyChangeListener(listener);
}
/**
* Add an ingest module event property change listener.
*
* @deprecated
* @param listener The PropertyChangeListener to register.
*/
public static void addPropertyChangeListener(final PropertyChangeListener listener) {
instance.ingestModuleEventPublisher.addPropertyChangeListener(listener);
}
/**
* Remove an ingest module event property change listener.
*
* @deprecated
* @param listener The PropertyChangeListener to unregister.
*/
public static void removePropertyChangeListener(final PropertyChangeListener listener) {
instance.ingestModuleEventPublisher.removePropertyChangeListener(listener);
} }
/** /**
@ -276,7 +330,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id. * @param ingestJobId The ingest job id.
*/ */
void fireIngestJobStarted(long ingestJobId) { void fireIngestJobStarted(long ingestJobId) {
fireIngestJobEventsThreadPool.submit(new FireIngestEventThread(IngestEvent.INGEST_JOB_STARTED, ingestJobId, null)); fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestEvent.INGEST_JOB_STARTED, ingestJobId, null));
} }
/** /**
@ -285,7 +339,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id. * @param ingestJobId The ingest job id.
*/ */
void fireIngestJobCompleted(long ingestJobId) { void fireIngestJobCompleted(long ingestJobId) {
fireIngestJobEventsThreadPool.submit(new FireIngestEventThread(IngestEvent.INGEST_JOB_COMPLETED, ingestJobId, null)); fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestEvent.INGEST_JOB_COMPLETED, ingestJobId, null));
} }
/** /**
@ -294,7 +348,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id. * @param ingestJobId The ingest job id.
*/ */
void fireIngestJobCancelled(long ingestJobId) { void fireIngestJobCancelled(long ingestJobId) {
fireIngestJobEventsThreadPool.submit(new FireIngestEventThread(IngestEvent.INGEST_JOB_CANCELLED, ingestJobId, null)); fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestEvent.INGEST_JOB_CANCELLED, ingestJobId, null));
} }
/** /**
@ -303,7 +357,7 @@ public class IngestManager {
* @param fileId The object id of file. * @param fileId The object id of file.
*/ */
void fireFileIngestDone(long fileId) { void fireFileIngestDone(long fileId) {
fireIngestJobEventsThreadPool.submit(new FireIngestEventThread(IngestEvent.FILE_DONE, fileId, null)); fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestEvent.FILE_DONE, fileId, null));
} }
/** /**
@ -312,7 +366,7 @@ public class IngestManager {
* @param moduleDataEvent A ModuleDataEvent with the details of the posting. * @param moduleDataEvent A ModuleDataEvent with the details of the posting.
*/ */
void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) { void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
fireIngestJobEventsThreadPool.submit(new FireIngestEventThread(IngestEvent.DATA, moduleDataEvent, null)); fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestEvent.DATA, moduleDataEvent, null));
} }
/** /**
@ -323,7 +377,7 @@ public class IngestManager {
* content. * content.
*/ */
void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) { void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
fireIngestJobEventsThreadPool.submit(new FireIngestEventThread(IngestEvent.CONTENT_CHANGED, moduleContentEvent, null)); fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestEvent.CONTENT_CHANGED, moduleContentEvent, null));
} }
/** /**
@ -341,7 +395,7 @@ public class IngestManager {
* Get the free disk space of the drive where to which ingest data is being * Get the free disk space of the drive where to which ingest data is being
* written, as reported by the ingest monitor. * written, as reported by the ingest monitor.
* *
* @return Free disk space, -1 if unknown // RJCTODO: What units? * @return Free disk space, -1 if unknown
*/ */
long getFreeDiskSpace() { long getFreeDiskSpace() {
if (ingestMonitor != null) { if (ingestMonitor != null) {
@ -352,10 +406,10 @@ public class IngestManager {
} }
/** /**
* A Runnable that creates ingest jobs and submits the initial data source * A Callable that creates ingest jobs and submits the initial data source
* and file ingest tasks to the task schedulers. * and file ingest tasks to the task schedulers.
*/ */
private class StartIngestJobsThread implements Runnable { private class StartIngestJobsThread implements Callable<Void> {
private final long threadId; private final long threadId;
private final List<Content> dataSources; private final List<Content> dataSources;
@ -371,7 +425,7 @@ public class IngestManager {
} }
@Override @Override
public void run() { public Void call() {
try { try {
final String displayName = NbBundle.getMessage(this.getClass(), final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName"); "IngestManager.StartIngestJobsTask.run.displayName");
@ -387,26 +441,25 @@ public class IngestManager {
return true; return true;
} }
}); });
progress.start(dataSources.size() * 2); progress.start(dataSources.size());
if (!ingestMonitor.isRunning()) { if (!ingestMonitor.isRunning()) {
ingestMonitor.start(); ingestMonitor.start();
} }
int workUnitsCompleted = 0; int dataSourceProcessed = 0;
for (Content dataSource : dataSources) { for (Content dataSource : dataSources) {
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
break; break;
} }
// Create an ingest job. // Start an ingest job for the data source.
List<IngestModuleError> errors = IngestJob.startIngestJob(dataSource, moduleTemplates, processUnallocatedSpace); List<IngestModuleError> errors = IngestJob.startJob(dataSource, moduleTemplates, processUnallocatedSpace);
if (!errors.isEmpty()) { if (!errors.isEmpty()) {
// Report the error to the user. // Report the errors to the user. They have already been logged.
StringBuilder moduleStartUpErrors = new StringBuilder(); StringBuilder moduleStartUpErrors = new StringBuilder();
for (IngestModuleError error : errors) { for (IngestModuleError error : errors) {
String moduleName = error.getModuleDisplayName(); String moduleName = error.getModuleDisplayName();
logger.log(Level.SEVERE, "The " + moduleName + " module failed to start up", error.getModuleError()); //NON-NLS
moduleStartUpErrors.append(moduleName); moduleStartUpErrors.append(moduleName);
moduleStartUpErrors.append(": "); moduleStartUpErrors.append(": ");
moduleStartUpErrors.append(error.getModuleError().getLocalizedMessage()); moduleStartUpErrors.append(error.getModuleError().getLocalizedMessage());
@ -427,40 +480,16 @@ public class IngestManager {
NbBundle.getMessage(this.getClass(), NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgTitle"), JOptionPane.ERROR_MESSAGE); "IngestManager.StartIngestJobsTask.run.startupErr.dlgTitle"), JOptionPane.ERROR_MESSAGE);
} }
progress.progress(++dataSourceProcessed);
fireIngestJobEventsThreadPool.submit(new FireIngestEventThread(IngestEvent.INGEST_JOB_STARTED));
// Queue a data source ingest task for the ingest job.
final String inputName = dataSource.getName();
progress.progress(
NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsThread.run.progress.msg1",
inputName), workUnitsCompleted);
DataSourceIngestTaskScheduler.getInstance().addTask(new DataSourceIngestTask(ingestJob, ingestJob.getDataSource()));
progress.progress(
NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsThread.run.progress.msg2",
inputName), ++workUnitsCompleted);
// Queue the file ingest tasks for the ingest job.
progress.progress(
NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsThread.run.progress.msg3",
inputName), workUnitsCompleted);
FileIngestTaskScheduler.getInstance().addTasks(ingestJob, ingestJob.getDataSource());
progress.progress(
NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsThread.run.progress.msg4",
inputName), ++workUnitsCompleted);
if (!Thread.currentThread().isInterrupted()) { if (!Thread.currentThread().isInterrupted()) {
break; break;
} }
} }
} catch (Exception ex) {
String message = String.format("StartIngestJobsTask (id=%d) caught exception", threadId); //NON-NLS
logger.log(Level.SEVERE, message, ex);
MessageNotifyUtil.Message.error(
NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.catchException.msg"));
} finally { } finally {
progress.finish(); progress.finish();
startIngestJobThreads.remove(threadId); startIngestJobThreads.remove(threadId);
return null;
} }
} }
} }
@ -512,16 +541,18 @@ public class IngestManager {
} }
/** /**
* A Runnable that fire ingest events to ingest manager property change * A Runnable that fires ingest events to ingest manager property change
* listeners. * listeners.
*/ */
private class FireIngestEventThread implements Runnable { private static class FireIngestEventThread implements Runnable {
private final PropertyChangeSupport publisher;
private final IngestEvent event; private final IngestEvent event;
private final Object oldValue; private final Object oldValue;
private final Object newValue; private final Object newValue;
FireIngestEventThread(IngestEvent event, Object oldValue, Object newValue) { FireIngestEventThread(PropertyChangeSupport publisher, IngestEvent event, Object oldValue, Object newValue) {
this.publisher = publisher;
this.event = event; this.event = event;
this.oldValue = oldValue; this.oldValue = oldValue;
this.newValue = newValue; this.newValue = newValue;
@ -530,10 +561,10 @@ public class IngestManager {
@Override @Override
public void run() { public void run() {
try { try {
pcs.firePropertyChange(event.toString(), oldValue, newValue); publisher.firePropertyChange(event.toString(), oldValue, newValue);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Ingest manager listener threw exception", e); //NON-NLS logger.log(Level.SEVERE, "Ingest manager listener threw exception", e); //NON-NLS
MessageNotifyUtil.Notify.show(NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr"), // RJCTODO: Oddly named strings MessageNotifyUtil.Notify.show(NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr"),
NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr.errListenToUpdates.msg"), NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr.errListenToUpdates.msg"),
MessageNotifyUtil.MessageType.ERROR); MessageNotifyUtil.MessageType.ERROR);
} }

View File

@ -95,7 +95,7 @@ public final class IngestServices {
* artifact data * artifact data
*/ */
public void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) { public void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) {
IngestManager.fireIngestModuleDataEvent(moduleDataEvent); IngestManager.getInstance().fireIngestModuleDataEvent(moduleDataEvent);
} }
/** /**
@ -107,7 +107,7 @@ public final class IngestServices {
* changed * changed
*/ */
public void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) { public void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) {
IngestManager.fireIngestModuleContentEvent(moduleContentEvent); IngestManager.getInstance().fireIngestModuleContentEvent(moduleContentEvent);
} }
/** /**

View File

@ -69,7 +69,7 @@ public final class HashLookupSettingsPanel extends IngestModuleGlobalSettingsPan
// Listen to the ingest modules to refresh the enabled/disabled state of // Listen to the ingest modules to refresh the enabled/disabled state of
// the components in sync with file ingest. // the components in sync with file ingest.
IngestManager.addPropertyChangeListener(new PropertyChangeListener() { IngestManager.getInstance().addIngestJobEventListener(new PropertyChangeListener() {
@Override @Override
public void propertyChange(PropertyChangeEvent evt) { public void propertyChange(PropertyChangeEvent evt) {
if (isIngestJobEvent(evt)) { if (isIngestJobEvent(evt)) {

View File

@ -126,7 +126,7 @@ class KeywordSearchEditListPanel extends javax.swing.JPanel implements ListSelec
setButtonStates(); setButtonStates();
IngestManager.addPropertyChangeListener(new PropertyChangeListener() { IngestManager.getInstance().addIngestJobEventListener(new PropertyChangeListener() {
@Override @Override
public void propertyChange(PropertyChangeEvent evt) { public void propertyChange(PropertyChangeEvent evt) {
String changed = evt.getPropertyName(); String changed = evt.getPropertyName();

View File

@ -118,7 +118,7 @@ class KeywordSearchListsViewerPanel extends AbstractKeywordSearchPerformer {
ingestRunning = IngestManager.getInstance().isIngestRunning(); ingestRunning = IngestManager.getInstance().isIngestRunning();
updateComponents(); updateComponents();
IngestManager.addPropertyChangeListener(new PropertyChangeListener() { IngestManager.getInstance().addIngestJobEventListener(new PropertyChangeListener() {
@Override @Override
public void propertyChange(PropertyChangeEvent evt) { public void propertyChange(PropertyChangeEvent evt) {
String changed = evt.getPropertyName(); String changed = evt.getPropertyName();

0
c:casesSamll2Againautopsy.db Executable file
View File