7673 ngest infrastructure documentation improvements

This commit is contained in:
Richard Cordovano 2021-10-19 14:54:07 -04:00
parent 40be596425
commit 0c1a44ca2f
14 changed files with 168 additions and 172 deletions

View File

@ -26,7 +26,7 @@ import org.sleuthkit.datamodel.DataArtifact;
* A pipeline of data artifact ingest modules used to execute data artifact * A pipeline of data artifact ingest modules used to execute data artifact
* ingest tasks for an ingest job. * ingest tasks for an ingest job.
*/ */
final class DataArtifactIngestPipeline extends IngestTaskPipeline<DataArtifactIngestTask> { final class DataArtifactIngestPipeline extends IngestPipeline<DataArtifactIngestTask> {
/** /**
* Constructs a pipeline of data artifact ingest modules used to execute * Constructs a pipeline of data artifact ingest modules used to execute
@ -37,13 +37,13 @@ final class DataArtifactIngestPipeline extends IngestTaskPipeline<DataArtifactIn
* @param moduleTemplates The ingest module templates that define this * @param moduleTemplates The ingest module templates that define this
* pipeline. May be an empty list. * pipeline. May be an empty list.
*/ */
DataArtifactIngestPipeline(IngestModulePipelines ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) { DataArtifactIngestPipeline(IngestJobExecutor ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) {
super(ingestJobPipeline, moduleTemplates); super(ingestJobPipeline, moduleTemplates);
} }
@Override @Override
Optional<PipelineModule<DataArtifactIngestTask>> acceptModuleTemplate(IngestModuleTemplate template) { Optional<PipelineModule<DataArtifactIngestTask>> acceptModuleTemplate(IngestModuleTemplate template) {
Optional<IngestTaskPipeline.PipelineModule<DataArtifactIngestTask>> module = Optional.empty(); Optional<IngestPipeline.PipelineModule<DataArtifactIngestTask>> module = Optional.empty();
if (template.isDataArtifactIngestModuleTemplate()) { if (template.isDataArtifactIngestModuleTemplate()) {
DataArtifactIngestModule ingestModule = template.createDataArtifactIngestModule(); DataArtifactIngestModule ingestModule = template.createDataArtifactIngestModule();
module = Optional.of(new DataArtifactIngestPipelineModule(ingestModule, template.getModuleName())); module = Optional.of(new DataArtifactIngestPipelineModule(ingestModule, template.getModuleName()));
@ -52,18 +52,18 @@ final class DataArtifactIngestPipeline extends IngestTaskPipeline<DataArtifactIn
} }
@Override @Override
void prepareForTask(DataArtifactIngestTask task) throws IngestTaskPipelineException { void prepareForTask(DataArtifactIngestTask task) throws IngestPipelineException {
} }
@Override @Override
void cleanUpAfterTask(DataArtifactIngestTask task) throws IngestTaskPipelineException { void cleanUpAfterTask(DataArtifactIngestTask task) throws IngestPipelineException {
} }
/** /**
* A decorator that adds ingest infrastructure operations to a data artifact * A decorator that adds ingest infrastructure operations to a data artifact
* ingest module. * ingest module.
*/ */
static final class DataArtifactIngestPipelineModule extends IngestTaskPipeline.PipelineModule<DataArtifactIngestTask> { static final class DataArtifactIngestPipelineModule extends IngestPipeline.PipelineModule<DataArtifactIngestTask> {
private final DataArtifactIngestModule module; private final DataArtifactIngestModule module;
@ -80,7 +80,7 @@ final class DataArtifactIngestPipeline extends IngestTaskPipeline<DataArtifactIn
} }
@Override @Override
void executeTask(IngestModulePipelines ingestJobPipeline, DataArtifactIngestTask task) throws IngestModuleException { void process(IngestJobExecutor ingestJobPipeline, DataArtifactIngestTask task) throws IngestModuleException {
DataArtifact artifact = task.getDataArtifact(); DataArtifact artifact = task.getDataArtifact();
module.process(artifact); module.process(artifact);
} }

View File

@ -36,7 +36,7 @@ final class DataArtifactIngestTask extends IngestTask {
* task. * task.
* @param artifact The data artifact to be processed. * @param artifact The data artifact to be processed.
*/ */
DataArtifactIngestTask(IngestModulePipelines ingestJobPipeline, DataArtifact artifact) { DataArtifactIngestTask(IngestJobExecutor ingestJobPipeline, DataArtifact artifact) {
super(ingestJobPipeline); super(ingestJobPipeline);
this.artifact = artifact; this.artifact = artifact;
} }

View File

@ -23,9 +23,9 @@ package org.sleuthkit.autopsy.ingest;
*/ */
public class DataSourceIngestModuleProgress { public class DataSourceIngestModuleProgress {
private final IngestModulePipelines ingestJobPipeline; private final IngestJobExecutor ingestJobPipeline;
DataSourceIngestModuleProgress(IngestModulePipelines pipeline) { DataSourceIngestModuleProgress(IngestJobExecutor pipeline) {
this.ingestJobPipeline = pipeline; this.ingestJobPipeline = pipeline;
} }

View File

@ -29,7 +29,7 @@ import org.sleuthkit.datamodel.Content;
* A pipeline of data source level ingest modules for executing data source * A pipeline of data source level ingest modules for executing data source
* level ingest tasks for an ingest job. * level ingest tasks for an ingest job.
*/ */
final class DataSourceIngestPipeline extends IngestTaskPipeline<DataSourceIngestTask> { final class DataSourceIngestPipeline extends IngestPipeline<DataSourceIngestTask> {
private static final Logger logger = Logger.getLogger(DataSourceIngestPipeline.class.getName()); private static final Logger logger = Logger.getLogger(DataSourceIngestPipeline.class.getName());
private static final IngestManager ingestManager = IngestManager.getInstance(); private static final IngestManager ingestManager = IngestManager.getInstance();
@ -42,13 +42,13 @@ final class DataSourceIngestPipeline extends IngestTaskPipeline<DataSourceIngest
* @param moduleTemplates The ingest module templates that define this * @param moduleTemplates The ingest module templates that define this
* pipeline. * pipeline.
*/ */
DataSourceIngestPipeline(IngestModulePipelines ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) { DataSourceIngestPipeline(IngestJobExecutor ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) {
super(ingestJobPipeline, moduleTemplates); super(ingestJobPipeline, moduleTemplates);
} }
@Override @Override
Optional<IngestTaskPipeline.PipelineModule<DataSourceIngestTask>> acceptModuleTemplate(IngestModuleTemplate template) { Optional<IngestPipeline.PipelineModule<DataSourceIngestTask>> acceptModuleTemplate(IngestModuleTemplate template) {
Optional<IngestTaskPipeline.PipelineModule<DataSourceIngestTask>> module = Optional.empty(); Optional<IngestPipeline.PipelineModule<DataSourceIngestTask>> module = Optional.empty();
if (template.isDataSourceIngestModuleTemplate()) { if (template.isDataSourceIngestModuleTemplate()) {
DataSourceIngestModule ingestModule = template.createDataSourceIngestModule(); DataSourceIngestModule ingestModule = template.createDataSourceIngestModule();
module = Optional.of(new DataSourcePipelineModule(ingestModule, template.getModuleName())); module = Optional.of(new DataSourcePipelineModule(ingestModule, template.getModuleName()));
@ -69,7 +69,7 @@ final class DataSourceIngestPipeline extends IngestTaskPipeline<DataSourceIngest
* A wrapper that adds ingest infrastructure operations to a data source * A wrapper that adds ingest infrastructure operations to a data source
* level ingest module. * level ingest module.
*/ */
static final class DataSourcePipelineModule extends IngestTaskPipeline.PipelineModule<DataSourceIngestTask> { static final class DataSourcePipelineModule extends IngestPipeline.PipelineModule<DataSourceIngestTask> {
private final DataSourceIngestModule module; private final DataSourceIngestModule module;
@ -83,7 +83,7 @@ final class DataSourceIngestPipeline extends IngestTaskPipeline<DataSourceIngest
} }
@Override @Override
void executeTask(IngestModulePipelines ingestJobPipeline, DataSourceIngestTask task) throws IngestModuleException { void process(IngestJobExecutor ingestJobPipeline, DataSourceIngestTask task) throws IngestModuleException {
Content dataSource = task.getDataSource(); Content dataSource = task.getDataSource();
String progressBarDisplayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.displayName", getDisplayName(), dataSource.getName()); String progressBarDisplayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.displayName", getDisplayName(), dataSource.getName());
ingestJobPipeline.updateDataSourceIngestProgressBarDisplayName(progressBarDisplayName); ingestJobPipeline.updateDataSourceIngestProgressBarDisplayName(progressBarDisplayName);

View File

@ -31,7 +31,7 @@ final class DataSourceIngestTask extends IngestTask {
* @param ingestJobPipeline The ingest job pipeline to use to execute the * @param ingestJobPipeline The ingest job pipeline to use to execute the
* task. * task.
*/ */
DataSourceIngestTask(IngestModulePipelines ingestJobPipeline) { DataSourceIngestTask(IngestJobExecutor ingestJobPipeline) {
super(ingestJobPipeline); super(ingestJobPipeline);
} }

View File

@ -39,13 +39,13 @@ import org.sleuthkit.datamodel.TskCoreException;
@NbBundle.Messages({ @NbBundle.Messages({
"FileIngestPipeline_SaveResults_Activity=Saving Results" "FileIngestPipeline_SaveResults_Activity=Saving Results"
}) })
final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> { final class FileIngestPipeline extends IngestPipeline<FileIngestTask> {
private static final int FILE_BATCH_SIZE = 500; private static final int FILE_BATCH_SIZE = 500;
private static final String SAVE_RESULTS_ACTIVITY = Bundle.FileIngestPipeline_SaveResults_Activity(); private static final String SAVE_RESULTS_ACTIVITY = Bundle.FileIngestPipeline_SaveResults_Activity();
private static final Logger logger = Logger.getLogger(FileIngestPipeline.class.getName()); private static final Logger logger = Logger.getLogger(FileIngestPipeline.class.getName());
private static final IngestManager ingestManager = IngestManager.getInstance(); private static final IngestManager ingestManager = IngestManager.getInstance();
private final IngestModulePipelines ingestJobPipeline; private final IngestJobExecutor ingestJobPipeline;
private final List<AbstractFile> fileBatch; private final List<AbstractFile> fileBatch;
/** /**
@ -56,15 +56,15 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
* @param moduleTemplates The ingest module templates that define this * @param moduleTemplates The ingest module templates that define this
* pipeline. * pipeline.
*/ */
FileIngestPipeline(IngestModulePipelines ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) { FileIngestPipeline(IngestJobExecutor ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) {
super(ingestJobPipeline, moduleTemplates); super(ingestJobPipeline, moduleTemplates);
this.ingestJobPipeline = ingestJobPipeline; this.ingestJobPipeline = ingestJobPipeline;
fileBatch = new ArrayList<>(); fileBatch = new ArrayList<>();
} }
@Override @Override
Optional<IngestTaskPipeline.PipelineModule<FileIngestTask>> acceptModuleTemplate(IngestModuleTemplate template) { Optional<IngestPipeline.PipelineModule<FileIngestTask>> acceptModuleTemplate(IngestModuleTemplate template) {
Optional<IngestTaskPipeline.PipelineModule<FileIngestTask>> module = Optional.empty(); Optional<IngestPipeline.PipelineModule<FileIngestTask>> module = Optional.empty();
if (template.isFileIngestModuleTemplate()) { if (template.isFileIngestModuleTemplate()) {
FileIngestModule ingestModule = template.createFileIngestModule(); FileIngestModule ingestModule = template.createFileIngestModule();
module = Optional.of(new FileIngestPipelineModule(ingestModule, template.getModuleName())); module = Optional.of(new FileIngestPipelineModule(ingestModule, template.getModuleName()));
@ -73,18 +73,18 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
} }
@Override @Override
void prepareForTask(FileIngestTask task) throws IngestTaskPipelineException { void prepareForTask(FileIngestTask task) throws IngestPipelineException {
} }
@Override @Override
void cleanUpAfterTask(FileIngestTask task) throws IngestTaskPipelineException { void cleanUpAfterTask(FileIngestTask task) throws IngestPipelineException {
try { try {
ingestManager.setIngestTaskProgress(task, SAVE_RESULTS_ACTIVITY); ingestManager.setIngestTaskProgress(task, SAVE_RESULTS_ACTIVITY);
AbstractFile file = task.getFile(); AbstractFile file = task.getFile();
file.close(); file.close();
cacheFileForBatchUpdate(file); cacheFileForBatchUpdate(file);
} catch (TskCoreException ex) { } catch (TskCoreException ex) {
throw new IngestTaskPipelineException(String.format("Failed to get file (file objId = %d)", task.getFileId()), ex); //NON-NLS throw new IngestPipelineException(String.format("Failed to get file (file objId = %d)", task.getFileId()), ex); //NON-NLS
} finally { } finally {
ingestManager.setIngestTaskProgressCompleted(task); ingestManager.setIngestTaskProgressCompleted(task);
} }
@ -96,7 +96,7 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
Date start = new Date(); Date start = new Date();
try { try {
updateBatchedFiles(); updateBatchedFiles();
} catch (IngestTaskPipelineException ex) { } catch (IngestPipelineException ex) {
errors.add(new IngestModuleError(SAVE_RESULTS_ACTIVITY, ex)); errors.add(new IngestModuleError(SAVE_RESULTS_ACTIVITY, ex));
} }
Date finish = new Date(); Date finish = new Date();
@ -113,9 +113,9 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
* *
* @param file The file. * @param file The file.
* *
* @throws IngestTaskPipelineException if the case database update fails. * @throws IngestPipelineException if the case database update fails.
*/ */
private void cacheFileForBatchUpdate(AbstractFile file) throws IngestTaskPipelineException { private void cacheFileForBatchUpdate(AbstractFile file) throws IngestPipelineException {
/* /*
* Only one file ingest thread at a time will try to access the file * Only one file ingest thread at a time will try to access the file
* cache. The synchronization here is to ensure visibility of the files * cache. The synchronization here is to ensure visibility of the files
@ -134,9 +134,9 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
* Updates the case database with new properties added to the files in the * Updates the case database with new properties added to the files in the
* cache by the ingest modules that processed them. * cache by the ingest modules that processed them.
* *
* @throws IngestTaskPipelineException if the case database update fails. * @throws IngestPipelineException if the case database update fails.
*/ */
private void updateBatchedFiles() throws IngestTaskPipelineException { private void updateBatchedFiles() throws IngestPipelineException {
/* /*
* Only one file ingest thread at a time will try to access the file * Only one file ingest thread at a time will try to access the file
* cache. The synchronization here is to ensure visibility of the files * cache. The synchronization here is to ensure visibility of the files
@ -166,7 +166,7 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
logger.log(Level.SEVERE, "Error rolling back transaction after failure to save updated properties for cached files from tasks", ex1); logger.log(Level.SEVERE, "Error rolling back transaction after failure to save updated properties for cached files from tasks", ex1);
} }
} }
throw new IngestTaskPipelineException("Failed to save updated properties for cached files from tasks", ex); //NON-NLS throw new IngestPipelineException("Failed to save updated properties for cached files from tasks", ex); //NON-NLS
} finally { } finally {
fileBatch.clear(); fileBatch.clear();
} }
@ -177,7 +177,7 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
* A wrapper that adds ingest infrastructure operations to a file ingest * A wrapper that adds ingest infrastructure operations to a file ingest
* module. * module.
*/ */
static final class FileIngestPipelineModule extends IngestTaskPipeline.PipelineModule<FileIngestTask> { static final class FileIngestPipelineModule extends IngestPipeline.PipelineModule<FileIngestTask> {
private final FileIngestModule module; private final FileIngestModule module;
@ -195,7 +195,7 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
} }
@Override @Override
void executeTask(IngestModulePipelines ingestJobPipeline, FileIngestTask task) throws IngestModuleException { void process(IngestJobExecutor ingestJobPipeline, FileIngestTask task) throws IngestModuleException {
AbstractFile file = null; AbstractFile file = null;
try { try {
file = task.getFile(); file = task.getFile();

View File

@ -40,7 +40,7 @@ final class FileIngestTask extends IngestTask {
* task. * task.
* @param file The file to be processed. * @param file The file to be processed.
*/ */
FileIngestTask(IngestModulePipelines ingestJobPipeline, AbstractFile file) { FileIngestTask(IngestJobExecutor ingestJobPipeline, AbstractFile file) {
super(ingestJobPipeline); super(ingestJobPipeline);
this.file = file; this.file = file;
fileId = file.getId(); fileId = file.getId();
@ -56,7 +56,7 @@ final class FileIngestTask extends IngestTask {
* task. * task.
* @param fileId The object ID of the file to be processed. * @param fileId The object ID of the file to be processed.
*/ */
FileIngestTask(IngestModulePipelines ingestJobPipeline, long fileId) { FileIngestTask(IngestJobExecutor ingestJobPipeline, long fileId) {
super(ingestJobPipeline); super(ingestJobPipeline);
this.fileId = fileId; this.fileId = fileId;
} }
@ -100,8 +100,8 @@ final class FileIngestTask extends IngestTask {
return false; return false;
} }
FileIngestTask other = (FileIngestTask) obj; FileIngestTask other = (FileIngestTask) obj;
IngestModulePipelines thisPipeline = getIngestJobPipeline(); IngestJobExecutor thisPipeline = getIngestJobPipeline();
IngestModulePipelines otherPipeline = other.getIngestJobPipeline(); IngestJobExecutor otherPipeline = other.getIngestJobPipeline();
if (thisPipeline != otherPipeline && (thisPipeline == null || !thisPipeline.equals(otherPipeline))) { if (thisPipeline != otherPipeline && (thisPipeline == null || !thisPipeline.equals(otherPipeline))) {
return false; return false;
} }

View File

@ -73,7 +73,7 @@ public final class IngestJob {
private final List<AbstractFile> files = new ArrayList<>(); private final List<AbstractFile> files = new ArrayList<>();
private final Mode ingestMode; private final Mode ingestMode;
private final IngestJobSettings settings; private final IngestJobSettings settings;
private volatile IngestModulePipelines ingestModulePipelines; private volatile IngestJobExecutor ingestModulePipelines;
private volatile CancellationReason cancellationReason; private volatile CancellationReason cancellationReason;
/** /**
@ -181,7 +181,7 @@ public final class IngestJob {
return Collections.emptyList(); return Collections.emptyList();
} }
ingestModulePipelines = new IngestModulePipelines(this, dataSource, files, settings); ingestModulePipelines = new IngestJobExecutor(this, dataSource, files, settings);
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(ingestModulePipelines.startUp()); errors.addAll(ingestModulePipelines.startUp());
if (errors.isEmpty()) { if (errors.isEmpty()) {
@ -507,7 +507,7 @@ public final class IngestJob {
*/ */
public static class DataSourceIngestModuleHandle { public static class DataSourceIngestModuleHandle {
private final IngestModulePipelines ingestJobPipeline; private final IngestJobExecutor ingestJobPipeline;
private final DataSourceIngestPipeline.DataSourcePipelineModule module; private final DataSourceIngestPipeline.DataSourcePipelineModule module;
private final boolean cancelled; private final boolean cancelled;
@ -520,7 +520,7 @@ public final class IngestJob {
* source level ingest module. * source level ingest module.
* @param module The data source level ingest module. * @param module The data source level ingest module.
*/ */
private DataSourceIngestModuleHandle(IngestModulePipelines ingestJobPipeline, DataSourceIngestPipeline.DataSourcePipelineModule module) { private DataSourceIngestModuleHandle(IngestJobExecutor ingestJobPipeline, DataSourceIngestPipeline.DataSourcePipelineModule module) {
this.ingestJobPipeline = ingestJobPipeline; this.ingestJobPipeline = ingestJobPipeline;
this.module = module; this.module = module;
this.cancelled = ingestJobPipeline.currentDataSourceIngestModuleIsCancelled(); this.cancelled = ingestJobPipeline.currentDataSourceIngestModuleIsCancelled();

View File

@ -29,7 +29,7 @@ import org.sleuthkit.datamodel.DataArtifact;
*/ */
public final class IngestJobContext { public final class IngestJobContext {
private final IngestModulePipelines ingestJobPipeline; private final IngestJobExecutor ingestJobPipeline;
/** /**
* Constructs an ingest job context object that provides an ingest module * Constructs an ingest job context object that provides an ingest module
@ -37,7 +37,7 @@ public final class IngestJobContext {
* *
* @param ingestJobPipeline The ingest pipeline for the job. * @param ingestJobPipeline The ingest pipeline for the job.
*/ */
IngestJobContext(IngestModulePipelines ingestJobPipeline) { IngestJobContext(IngestJobExecutor ingestJobPipeline) {
this.ingestJobPipeline = ingestJobPipeline; this.ingestJobPipeline = ingestJobPipeline;
} }

View File

@ -59,10 +59,10 @@ import org.sleuthkit.datamodel.DataSource;
* Manages the construction, start up, execution, and shut down of the ingest * Manages the construction, start up, execution, and shut down of the ingest
* module pipelines for an ingest job. * module pipelines for an ingest job.
*/ */
final class IngestModulePipelines { final class IngestJobExecutor {
private static final String AUTOPSY_MODULE_PREFIX = "org.sleuthkit.autopsy"; private static final String AUTOPSY_MODULE_PREFIX = "org.sleuthkit.autopsy";
private static final Logger logger = Logger.getLogger(IngestModulePipelines.class.getName()); private static final Logger logger = Logger.getLogger(IngestJobExecutor.class.getName());
/* /*
* A regular expression for identifying the proxy classes Jython generates * A regular expression for identifying the proxy classes Jython generates
@ -159,7 +159,7 @@ final class IngestModulePipelines {
* So the stage transition lock is used not to guard the stage field, but to * So the stage transition lock is used not to guard the stage field, but to
* coordinate stage transitions. * coordinate stage transitions.
*/ */
private volatile IngestJobStages stage = IngestModulePipelines.IngestJobStages.PIPELINES_START_UP; private volatile IngestJobStages stage = IngestJobExecutor.IngestJobStages.PIPELINES_START_UP;
private final Object stageTransitionLock = new Object(); private final Object stageTransitionLock = new Object();
/* /*
@ -238,7 +238,7 @@ final class IngestModulePipelines {
* @throws InterruptedException Exception thrown if the thread in which the * @throws InterruptedException Exception thrown if the thread in which the
* pipeline is being created is interrupted. * pipeline is being created is interrupted.
*/ */
IngestModulePipelines(IngestJob ingestJob, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) throws InterruptedException { IngestJobExecutor(IngestJob ingestJob, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) throws InterruptedException {
if (!(dataSource instanceof DataSource)) { if (!(dataSource instanceof DataSource)) {
throw new IllegalArgumentException("Passed dataSource that does not implement the DataSource interface"); //NON-NLS throw new IllegalArgumentException("Passed dataSource that does not implement the DataSource interface"); //NON-NLS
} }
@ -594,7 +594,7 @@ final class IngestModulePipelines {
* *
* @return A list of ingest module startup errors, empty on success. * @return A list of ingest module startup errors, empty on success.
*/ */
private List<IngestModuleError> startUpIngestModulePipeline(IngestTaskPipeline<?> pipeline) { private List<IngestModuleError> startUpIngestModulePipeline(IngestPipeline<?> pipeline) {
List<IngestModuleError> startUpErrors = pipeline.startUp(); List<IngestModuleError> startUpErrors = pipeline.startUp();
if (!startUpErrors.isEmpty()) { if (!startUpErrors.isEmpty()) {
List<IngestModuleError> shutDownErrors = pipeline.shutDown(); List<IngestModuleError> shutDownErrors = pipeline.shutDown();
@ -776,7 +776,7 @@ final class IngestModulePipelines {
void startStreamingModeDataSrcAnalysis() { void startStreamingModeDataSrcAnalysis() {
synchronized (stageTransitionLock) { synchronized (stageTransitionLock) {
logInfoMessage("Starting full first stage analysis in streaming mode"); //NON-NLS logInfoMessage("Starting full first stage analysis in streaming mode"); //NON-NLS
stage = IngestModulePipelines.IngestJobStages.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; stage = IngestJobExecutor.IngestJobStages.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS;
currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline; currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline;
if (hasFileIngestModules()) { if (hasFileIngestModules()) {
@ -809,7 +809,7 @@ final class IngestModulePipelines {
currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline; currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline;
if (hasHighPriorityDataSourceIngestModules()) { if (hasHighPriorityDataSourceIngestModules()) {
IngestModulePipelines.taskScheduler.scheduleDataSourceIngestTask(this); IngestJobExecutor.taskScheduler.scheduleDataSourceIngestTask(this);
} else { } else {
/* /*
* If no data source level ingest task is scheduled at this time * If no data source level ingest task is scheduled at this time
@ -831,7 +831,7 @@ final class IngestModulePipelines {
synchronized (stageTransitionLock) { synchronized (stageTransitionLock) {
if (hasLowPriorityDataSourceIngestModules()) { if (hasLowPriorityDataSourceIngestModules()) {
logInfoMessage(String.format("Starting low priority data source analysis for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), ingestJob.getId())); //NON-NLS logInfoMessage(String.format("Starting low priority data source analysis for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), ingestJob.getId())); //NON-NLS
stage = IngestModulePipelines.IngestJobStages.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; stage = IngestJobExecutor.IngestJobStages.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS;
if (usingNetBeansGUI) { if (usingNetBeansGUI) {
startDataSourceIngestProgressBar(); startDataSourceIngestProgressBar();
@ -857,7 +857,7 @@ final class IngestModulePipelines {
artifactIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() { artifactIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() {
@Override @Override
public boolean cancel() { public boolean cancel() {
IngestModulePipelines.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
return true; return true;
} }
}); });
@ -891,12 +891,12 @@ final class IngestModulePipelines {
* ingest job. * ingest job.
*/ */
DataSourceIngestCancellationPanel panel = new DataSourceIngestCancellationPanel(); DataSourceIngestCancellationPanel panel = new DataSourceIngestCancellationPanel();
String dialogTitle = NbBundle.getMessage(IngestModulePipelines.this.getClass(), "IngestJob.cancellationDialog.title"); String dialogTitle = NbBundle.getMessage(IngestJobExecutor.this.getClass(), "IngestJob.cancellationDialog.title");
JOptionPane.showConfirmDialog(WindowManager.getDefault().getMainWindow(), panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE); JOptionPane.showConfirmDialog(WindowManager.getDefault().getMainWindow(), panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE);
if (panel.cancelAllDataSourceIngestModules()) { if (panel.cancelAllDataSourceIngestModules()) {
IngestModulePipelines.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
} else { } else {
IngestModulePipelines.this.cancelCurrentDataSourceIngestModule(); IngestJobExecutor.this.cancelCurrentDataSourceIngestModule();
} }
return true; return true;
} }
@ -921,7 +921,7 @@ final class IngestModulePipelines {
fileIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() { fileIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() {
@Override @Override
public boolean cancel() { public boolean cancel() {
IngestModulePipelines.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED);
return true; return true;
} }
}); });
@ -998,7 +998,7 @@ final class IngestModulePipelines {
private void shutDown() { private void shutDown() {
synchronized (stageTransitionLock) { synchronized (stageTransitionLock) {
logInfoMessage("Finished all tasks"); //NON-NLS logInfoMessage("Finished all tasks"); //NON-NLS
stage = IngestModulePipelines.IngestJobStages.PIPELINES_SHUT_DOWN; stage = IngestJobExecutor.IngestJobStages.PIPELINES_SHUT_DOWN;
shutDownIngestModulePipeline(currentDataSourceIngestPipeline); shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
shutDownIngestModulePipeline(artifactIngestPipeline); shutDownIngestModulePipeline(artifactIngestPipeline);
@ -1056,7 +1056,7 @@ final class IngestModulePipelines {
* *
* @param pipeline The pipeline. * @param pipeline The pipeline.
*/ */
private <T extends IngestTask> void shutDownIngestModulePipeline(IngestTaskPipeline<T> pipeline) { private <T extends IngestTask> void shutDownIngestModulePipeline(IngestPipeline<T> pipeline) {
if (pipeline.isRunning()) { if (pipeline.isRunning()) {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(pipeline.shutDown()); errors.addAll(pipeline.shutDown());
@ -1076,7 +1076,7 @@ final class IngestModulePipelines {
try { try {
if (!isCancelled()) { if (!isCancelled()) {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(currentDataSourceIngestPipeline.executeTask(task)); errors.addAll(currentDataSourceIngestPipeline.performTask(task));
if (!errors.isEmpty()) { if (!errors.isEmpty()) {
logIngestModuleErrors(errors); logIngestModuleErrors(errors);
} }
@ -1130,7 +1130,7 @@ final class IngestModulePipelines {
* Run the file through the modules in the pipeline. * Run the file through the modules in the pipeline.
*/ */
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(pipeline.executeTask(task)); errors.addAll(pipeline.performTask(task));
if (!errors.isEmpty()) { if (!errors.isEmpty()) {
logIngestModuleErrors(errors, file); logIngestModuleErrors(errors, file);
} }
@ -1171,7 +1171,7 @@ final class IngestModulePipelines {
try { try {
if (!isCancelled() && !artifactIngestPipeline.isEmpty()) { if (!isCancelled() && !artifactIngestPipeline.isEmpty()) {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(artifactIngestPipeline.executeTask(task)); errors.addAll(artifactIngestPipeline.performTask(task));
if (!errors.isEmpty()) { if (!errors.isEmpty()) {
logIngestModuleErrors(errors); logIngestModuleErrors(errors);
} }
@ -1191,7 +1191,7 @@ final class IngestModulePipelines {
void addStreamedFiles(List<Long> fileObjIds) { void addStreamedFiles(List<Long> fileObjIds) {
if (hasFileIngestModules()) { if (hasFileIngestModules()) {
if (stage.equals(IngestJobStages.STREAMED_FILE_ANALYSIS_ONLY)) { if (stage.equals(IngestJobStages.STREAMED_FILE_ANALYSIS_ONLY)) {
IngestModulePipelines.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds); IngestJobExecutor.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds);
} else { } else {
logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported"); logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported");
} }
@ -1412,7 +1412,7 @@ final class IngestModulePipelines {
void cancel(IngestJob.CancellationReason reason) { void cancel(IngestJob.CancellationReason reason) {
jobCancelled = true; jobCancelled = true;
cancellationReason = reason; cancellationReason = reason;
IngestModulePipelines.taskScheduler.cancelPendingFileTasksForIngestJob(this); IngestJobExecutor.taskScheduler.cancelPendingFileTasksForIngestJob(this);
if (usingNetBeansGUI) { if (usingNetBeansGUI) {
synchronized (dataSourceIngestProgressLock) { synchronized (dataSourceIngestProgressLock) {

View File

@ -33,21 +33,24 @@ import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil; import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
/** /**
* An abstract superclass for pipelines of ingest modules that execute ingest * An abstract superclass for pipelines of ingest modules that perform the
* tasks for an ingest job. Subclasses need to extend this class and to * ingest tasks that make up an ingest job. A pipeline performs a task by
* implement a specialization of the inner PipelineModule abstract superclass. * passing it sequentially to the process() method of each module in the
* pipeline.
* *
* NOTE ON MULTI-THREADING POLICY: This class is primarily designed for use * @param <T> The type of ingest tasks the pipeline performs.
* by one thread at a time. There are a few status fields that are volatile to
* ensure visibility to threads making ingest progress snapshots, but methods
* such as startUp(), executeTask() and shutDown() are not synchronized.
*
* @param <T> The ingest task type.
*/ */
abstract class IngestTaskPipeline<T extends IngestTask> { abstract class IngestPipeline<T extends IngestTask> {
private static final Logger logger = Logger.getLogger(IngestTaskPipeline.class.getName()); /*
private final IngestModulePipelines ingestJobPipeline; * NOTE ON MULTI-THREADING POLICY: This class is primarily designed for use
* by one thread at a time. There are a few status fields that are volatile
* to ensure visibility to threads making ingest progress snapshots, but
* methods such as startUp(), performTask() and shutDown() are not
* synchronized.
*/
private static final Logger logger = Logger.getLogger(IngestPipeline.class.getName());
private final IngestJobExecutor ingestJobExecutor;
private final List<IngestModuleTemplate> moduleTemplates; private final List<IngestModuleTemplate> moduleTemplates;
private final List<PipelineModule<T>> modules; private final List<PipelineModule<T>> modules;
private volatile Date startTime; private volatile Date startTime;
@ -56,38 +59,34 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
/** /**
* Constructs the superclass part of a pipeline of ingest modules that * Constructs the superclass part of a pipeline of ingest modules that
* executes ingest tasks for an ingest job. * performs ingest tasks for an ingest job.
* *
* @param ingestPipeline The parent ingest job pipeline for this ingest * @param ingestJobExecutor The ingest job executor that owns this pipeline.
* task pipeline. * @param moduleTemplates The ingest module templates to be used to
* @param moduleTemplates The ingest module templates that define this * construct the ingest modules for this pipeline.
* ingest task pipeline. May be an empty list. * May be an empty list if this type of pipeline is
* not needed for the ingest job.
*/ */
IngestTaskPipeline(IngestModulePipelines ingestPipeline, List<IngestModuleTemplate> moduleTemplates) { IngestPipeline(IngestJobExecutor ingestJobExecutor, List<IngestModuleTemplate> moduleTemplates) {
this.ingestJobPipeline = ingestPipeline; this.ingestJobExecutor = ingestJobExecutor;
/*
* The creation of ingest modules from the ingest module templates has
* been deliberately deferred to the startUp() method so that any and
* all errors in module construction or start up can be reported to the
* client code.
*/
this.moduleTemplates = moduleTemplates; this.moduleTemplates = moduleTemplates;
modules = new ArrayList<>(); modules = new ArrayList<>();
} }
/** /**
* Indicates whether or not there are any ingest modules in this ingest task * Indicates whether or not there are any ingest modules in this ingest
* pipeline. * pipeline.
* *
* @return True or false. * @return True or false; always true before startUp() is called.
*/ */
boolean isEmpty() { boolean isEmpty() {
return modules.isEmpty(); return modules.isEmpty();
} }
/** /**
* Queries whether or not this ingest task pipeline is running, i.e., the * Queries whether or not this ingest pipeline is running, i.e., the
* startUp() method has been called and the shutDown() has not been called. * startUp() method has been called and the shutDown() method has not been
* called yet.
* *
* @return True or false. * @return True or false.
*/ */
@ -96,8 +95,8 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
} }
/** /**
* Starts up this ingest task pipeline by calling the startUp() methods of * Starts up this ingest pipeline by calling the startUp() methods of the
* the ingest modules in the pipeline. * ingest modules in the pipeline.
* *
* @return A list of ingest module start up errors, possibly empty. * @return A list of ingest module start up errors, possibly empty.
*/ */
@ -110,21 +109,19 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
* any and all errors in module construction or start up can be * any and all errors in module construction or start up can be
* reported to the client code. * reported to the client code.
*/ */
createIngestModules(moduleTemplates); createIngestModules();
errors.addAll(startUpIngestModules()); errors.addAll(startUpIngestModules());
} else { } else {
errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestTaskPipelineException("Pipeline already started"))); //NON-NLS errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestPipelineException("Pipeline already started"))); //NON-NLS
} }
return errors; return errors;
} }
/** /**
* Creates the ingest modules for this ingest task pipeline from the given * Creates the ingest modules for this ingest pipeline using its ingest
* ingest module templates. * module templates.
*
* @param moduleTemplates The ingest module templates.
*/ */
private void createIngestModules(List<IngestModuleTemplate> moduleTemplates) { private void createIngestModules() {
if (modules.isEmpty()) { if (modules.isEmpty()) {
for (IngestModuleTemplate template : moduleTemplates) { for (IngestModuleTemplate template : moduleTemplates) {
Optional<PipelineModule<T>> module = acceptModuleTemplate(template); Optional<PipelineModule<T>> module = acceptModuleTemplate(template);
@ -137,8 +134,8 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
/** /**
* Determines if one of the types of ingest modules that can be created from * Determines if one of the types of ingest modules that can be created from
* a given ingest module template should be added to this ingest task * a given ingest module template should be added to this ingest pipeline.
* pipeline. If so, the ingest module is created and returned. * If so, the ingest module is created and returned.
* *
* @param template The ingest module template to be used or ignored, as * @param template The ingest module template to be used or ignored, as
* appropriate to the pipeline type. * appropriate to the pipeline type.
@ -149,7 +146,7 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate template); abstract Optional<PipelineModule<T>> acceptModuleTemplate(IngestModuleTemplate template);
/** /**
* Starts up the ingest modules in this ingest task pipeline. * Starts up the ingest modules in this ingest pipeline.
* *
* @return A list of ingest module start up errors, possibly empty. * @return A list of ingest module start up errors, possibly empty.
*/ */
@ -159,7 +156,7 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
running = true; running = true;
for (PipelineModule<T> module : modules) { for (PipelineModule<T> module : modules) {
try { try {
module.startUp(new IngestJobContext(ingestJobPipeline)); module.startUp(new IngestJobContext(ingestJobExecutor));
} catch (Throwable ex) { } catch (Throwable ex) {
/* /*
* A catch-all exception firewall. Start up errors for all of * A catch-all exception firewall. Start up errors for all of
@ -174,10 +171,10 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
} }
/** /**
* Returns the start up time of this ingest task pipeline. * Returns the start up time of this ingest pipeline.
* *
* @return The file processing start time, may be null if this pipeline has * @return The start up time, may be null if this pipeline has not been
* not been started yet. * started yet.
*/ */
Date getStartTime() { Date getStartTime() {
Date reportedStartTime = null; Date reportedStartTime = null;
@ -188,65 +185,66 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
} }
/** /**
* Executes an ingest task by calling the process() methods of the ingest * Performs an ingest task by sequentially calling the process() methods of
* modules in this ingest task pipeline. * the ingest modules in this ingest pipeline.
* *
* @param task The task. * @param task The task.
* *
* @return A list of ingest module task processing errors, possibly empty. * @return A list of ingest module processing errors, possibly empty.
*/ */
List<IngestModuleError> executeTask(T task) { List<IngestModuleError> performTask(T task) {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
if (running) { if (running) {
if (!ingestJobPipeline.isCancelled()) { if (!ingestJobExecutor.isCancelled()) {
pauseIfScheduled(); pauseIfScheduled();
if (ingestJobPipeline.isCancelled()) { if (ingestJobExecutor.isCancelled()) {
return errors; return errors;
} }
try { try {
prepareForTask(task); prepareForTask(task);
} catch (IngestTaskPipelineException ex) { } catch (IngestPipelineException ex) {
errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
return errors; return errors;
} }
for (PipelineModule<T> module : modules) { for (PipelineModule<T> module : modules) {
pauseIfScheduled(); pauseIfScheduled();
if (ingestJobPipeline.isCancelled()) { if (ingestJobExecutor.isCancelled()) {
break; break;
} }
try { try {
currentModule = module; currentModule = module;
currentModule.setProcessingStartTime(); currentModule.setProcessingStartTime();
module.executeTask(ingestJobPipeline, task); module.process(ingestJobExecutor, task);
} catch (Throwable ex) { } catch (Throwable ex) { // Catch-all exception firewall
/* /*
* A catch-all exception firewall. Note that a runtime * Note that an exception from a module does not stop
* exception from a single module does not stop
* processing of the task by the other modules in the * processing of the task by the other modules in the
* pipeline. * pipeline.
*/ */
errors.add(new IngestModuleError(module.getDisplayName(), ex)); errors.add(new IngestModuleError(module.getDisplayName(), ex));
} }
if (ingestJobPipeline.isCancelled()) { if (ingestJobExecutor.isCancelled()) {
break; break;
} }
} }
} }
try { try {
cleanUpAfterTask(task); cleanUpAfterTask(task);
} catch (IngestTaskPipelineException ex) { } catch (IngestPipelineException ex) {
errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS
} }
} else { } else {
errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestTaskPipelineException("Pipeline not started or shut down"))); //NON-NLS errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestPipelineException("Pipeline not started or shut down"))); //NON-NLS
} }
currentModule = null; currentModule = null;
return errors; return errors;
} }
/** /**
* Pauses task execution if ingest has been configured to be paused weekly * Pauses this pipeline if ingest has been configured to be paused weekly at
* at a specified time for a specified duration. * a specified time, for a specified duration. A pipeline can only be paused
* between calls to module process() methods, i.e., the individual modules
* themselves cannot be paused in the middle of processing a task.
*/ */
private void pauseIfScheduled() { private void pauseIfScheduled() {
if (ScheduledIngestPauseSettings.getPauseEnabled() == true) { if (ScheduledIngestPauseSettings.getPauseEnabled() == true) {
@ -278,7 +276,7 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
*/ */
LocalDateTime timeNow = LocalDateTime.now(); LocalDateTime timeNow = LocalDateTime.now();
if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) { if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) {
ingestJobPipeline.registerPausedIngestThread(Thread.currentThread()); ingestJobExecutor.registerPausedIngestThread(Thread.currentThread());
try { try {
long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd); long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd);
logger.log(Level.INFO, String.format("%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis))); logger.log(Level.INFO, String.format("%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis)));
@ -287,27 +285,27 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
} catch (InterruptedException notLogged) { } catch (InterruptedException notLogged) {
logger.log(Level.INFO, String.format("%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now())); logger.log(Level.INFO, String.format("%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now()));
} finally { } finally {
ingestJobPipeline.unregisterPausedIngestThread(Thread.currentThread()); ingestJobExecutor.unregisterPausedIngestThread(Thread.currentThread());
} }
} }
} }
} }
/** /**
* Does any task type specific preparation required before executing an * Does any task-type-specific preparation required before performing an
* ingest task. * ingest task.
* *
* @param task The task. * @param task The task.
* *
* @throws IngestTaskPipelineException Thrown if there is an error preparing * @throws IngestPipelineException Thrown if there is an error preparing to
* to execute the task. * perform the task.
*/ */
abstract void prepareForTask(T task) throws IngestTaskPipelineException; abstract void prepareForTask(T task) throws IngestPipelineException;
/** /**
* Gets the currently running ingest module. * Gets the currently running ingest module.
* *
* @return The module, possibly null if no module is currently running. * @return The module, possibly null, if no module is currently running.
*/ */
PipelineModule<T> getCurrentlyRunningModule() { PipelineModule<T> getCurrentlyRunningModule() {
return currentModule; return currentModule;
@ -345,22 +343,22 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
} }
/** /**
* Does any task type specific clean up required after executing an ingest * Does any task-type-specific clean up required after performing an ingest
* task. * task.
* *
* @param task The task. * @param task The task.
* *
* @throws IngestTaskPipelineException Thrown if there is an error cleaning * @throws IngestPipelineException Thrown if there is an error cleaning up
* up after performing the task. * after performing the task.
*/ */
abstract void cleanUpAfterTask(T task) throws IngestTaskPipelineException; abstract void cleanUpAfterTask(T task) throws IngestPipelineException;
/** /**
* An abstract superclass for a decorator that adds ingest infrastructure * An abstract superclass for an ingest module decorator that adds ingest
* operations to an ingest module. * infrastructure operations to the process() method of an ingest module.
* * Subclasses of IngestPipeline need to provide a concrete implementation of
* IMPORTANT: Subclasses of IngestTaskPipeline need to implement a * this class that provides the additional operations that the pipeline
* specialization this class * requires.
*/ */
static abstract class PipelineModule<T extends IngestTask> implements IngestModule { static abstract class PipelineModule<T extends IngestTask> implements IngestModule {
@ -369,8 +367,9 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
private volatile Date processingStartTime; private volatile Date processingStartTime;
/** /**
* Constructs an instance of an abstract superclass for a decorator that * Constructs an instance of an abstract superclass for an ingest module
* adds ingest infrastructure operations to an ingest module. * decorator that adds ingest infrastructure operations to the process()
* method of an ingest module.
* *
* @param module The ingest module to be wrapped. * @param module The ingest module to be wrapped.
* @param displayName The display name for the module. * @param displayName The display name for the module.
@ -378,7 +377,7 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
PipelineModule(IngestModule module, String displayName) { PipelineModule(IngestModule module, String displayName) {
this.module = module; this.module = module;
this.displayName = displayName; this.displayName = displayName;
this.processingStartTime = new Date(); processingStartTime = new Date();
} }
/** /**
@ -410,8 +409,8 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
/** /**
* Gets the the processing start time for the decorated module. * Gets the the processing start time for the decorated module.
* *
* @return The start time, will be null if the module has not started * @return The start time, not valid if setProcessingStartTime() has not
* processing the data source yet. * been called first.
*/ */
Date getProcessingStartTime() { Date getProcessingStartTime() {
return new Date(processingStartTime.getTime()); return new Date(processingStartTime.getTime());
@ -423,17 +422,17 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
} }
/** /**
* Executes an ingest task using the process() method of the decorated * Performs an ingest task using the process() method of the decorated
* module. * module.
* *
* @param ingestJobPipeline The ingest job pipeline that owns the ingest * @param ingestJobExecutor The ingest job executor that owns the ingest
* task pipeline this module belongs to. * pipeline to which this module belongs.
* @param task The task to execute. * @param task The task to perform.
* *
* @throws IngestModuleException Exception thrown if there is an error * @throws IngestModuleException Exception thrown if there is an error
* performing the task. * performing the task.
*/ */
abstract void executeTask(IngestModulePipelines ingestJobPipeline, T task) throws IngestModuleException; abstract void process(IngestJobExecutor ingestJobExecutor, T task) throws IngestModuleException;
@Override @Override
public void shutDown() { public void shutDown() {
@ -443,28 +442,28 @@ abstract class IngestTaskPipeline<T extends IngestTask> {
} }
/** /**
* An exception thrown by an ingest task pipeline. * An exception thrown by an ingest pipeline.
*/ */
public static class IngestTaskPipelineException extends Exception { static class IngestPipelineException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
/** /**
* Constructs an exception to be thrown by an ingest task pipeline. * Constructs an exception to be thrown by an ingest pipeline.
* *
* @param message The exception message. * @param message The exception message.
*/ */
public IngestTaskPipelineException(String message) { IngestPipelineException(String message) {
super(message); super(message);
} }
/** /**
* Constructs an exception to be thrown by an ingest task pipeline. * Constructs an exception to be thrown by an ingest pipeline.
* *
* @param message The exception message. * @param message The exception message.
* @param cause The exception cause. * @param cause The exception cause.
*/ */
public IngestTaskPipelineException(String message, Throwable cause) { IngestPipelineException(String message, Throwable cause) {
super(message, cause); super(message, cause);
} }

View File

@ -29,7 +29,7 @@ import org.sleuthkit.datamodel.Content;
abstract class IngestTask { abstract class IngestTask {
private final static long NOT_SET = Long.MIN_VALUE; private final static long NOT_SET = Long.MIN_VALUE;
private final IngestModulePipelines ingestJobPipeline; private final IngestJobExecutor ingestJobPipeline;
private long threadId; private long threadId;
/** /**
@ -41,7 +41,7 @@ abstract class IngestTask {
* @param ingestJobPipeline The ingest job pipeline to use to execute the * @param ingestJobPipeline The ingest job pipeline to use to execute the
* task. * task.
*/ */
IngestTask(IngestModulePipelines ingestJobPipeline) { IngestTask(IngestJobExecutor ingestJobPipeline) {
this.ingestJobPipeline = ingestJobPipeline; this.ingestJobPipeline = ingestJobPipeline;
threadId = NOT_SET; threadId = NOT_SET;
} }
@ -51,7 +51,7 @@ abstract class IngestTask {
* *
* @return The ingest job pipeline. * @return The ingest job pipeline.
*/ */
IngestModulePipelines getIngestJobPipeline() { IngestJobExecutor getIngestJobPipeline() {
return ingestJobPipeline; return ingestJobPipeline;
} }

View File

@ -138,7 +138,7 @@ final class IngestTasksScheduler {
* task to the pipeline for processing by the * task to the pipeline for processing by the
* pipeline's ingest modules. * pipeline's ingest modules.
*/ */
synchronized void scheduleIngestTasks(IngestModulePipelines ingestPipeline) { synchronized void scheduleIngestTasks(IngestJobExecutor ingestPipeline) {
if (!ingestPipeline.isCancelled()) { if (!ingestPipeline.isCancelled()) {
if (ingestPipeline.hasDataSourceIngestModules()) { if (ingestPipeline.hasDataSourceIngestModules()) {
scheduleDataSourceIngestTask(ingestPipeline); scheduleDataSourceIngestTask(ingestPipeline);
@ -163,7 +163,7 @@ final class IngestTasksScheduler {
* task to the pipeline for processing by the * task to the pipeline for processing by the
* pipeline's ingest modules. * pipeline's ingest modules.
*/ */
synchronized void scheduleDataSourceIngestTask(IngestModulePipelines ingestPipeline) { synchronized void scheduleDataSourceIngestTask(IngestJobExecutor ingestPipeline) {
if (!ingestPipeline.isCancelled()) { if (!ingestPipeline.isCancelled()) {
DataSourceIngestTask task = new DataSourceIngestTask(ingestPipeline); DataSourceIngestTask task = new DataSourceIngestTask(ingestPipeline);
try { try {
@ -190,7 +190,7 @@ final class IngestTasksScheduler {
* empty, then all if the files from the data source * empty, then all if the files from the data source
* are candidates for scheduling. * are candidates for scheduling.
*/ */
synchronized void scheduleFileIngestTasks(IngestModulePipelines ingestPipeline, Collection<AbstractFile> files) { synchronized void scheduleFileIngestTasks(IngestJobExecutor ingestPipeline, Collection<AbstractFile> files) {
if (!ingestPipeline.isCancelled()) { if (!ingestPipeline.isCancelled()) {
Collection<AbstractFile> candidateFiles; Collection<AbstractFile> candidateFiles;
if (files.isEmpty()) { if (files.isEmpty()) {
@ -220,7 +220,7 @@ final class IngestTasksScheduler {
* processing by the pipeline's ingest modules. * processing by the pipeline's ingest modules.
* @param files A list of file object IDs for the streamed files. * @param files A list of file object IDs for the streamed files.
*/ */
synchronized void scheduleStreamedFileIngestTasks(IngestModulePipelines ingestPipeline, List<Long> fileIds) { synchronized void scheduleStreamedFileIngestTasks(IngestJobExecutor ingestPipeline, List<Long> fileIds) {
if (!ingestPipeline.isCancelled()) { if (!ingestPipeline.isCancelled()) {
for (long id : fileIds) { for (long id : fileIds) {
/* /*
@ -252,7 +252,7 @@ final class IngestTasksScheduler {
* processing by the pipeline's ingest modules. * processing by the pipeline's ingest modules.
* @param files The files. * @param files The files.
*/ */
synchronized void fastTrackFileIngestTasks(IngestModulePipelines ingestPipeline, Collection<AbstractFile> files) { synchronized void fastTrackFileIngestTasks(IngestJobExecutor ingestPipeline, Collection<AbstractFile> files) {
if (!ingestPipeline.isCancelled()) { if (!ingestPipeline.isCancelled()) {
/* /*
* Put the files directly into the queue for the file ingest * Put the files directly into the queue for the file ingest
@ -290,7 +290,7 @@ final class IngestTasksScheduler {
* target Content of the task to the pipeline for * target Content of the task to the pipeline for
* processing by the pipeline's ingest modules. * processing by the pipeline's ingest modules.
*/ */
synchronized void scheduleDataArtifactIngestTasks(IngestModulePipelines ingestPipeline) { synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor ingestPipeline) {
if (!ingestPipeline.isCancelled()) { if (!ingestPipeline.isCancelled()) {
Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard(); Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
try { try {
@ -318,7 +318,7 @@ final class IngestTasksScheduler {
* source; if empty, then all of the data artifacts * source; if empty, then all of the data artifacts
* from the data source will be scheduled. * from the data source will be scheduled.
*/ */
synchronized void scheduleDataArtifactIngestTasks(IngestModulePipelines ingestPipeline, List<DataArtifact> artifacts) { synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor ingestPipeline, List<DataArtifact> artifacts) {
if (!ingestPipeline.isCancelled()) { if (!ingestPipeline.isCancelled()) {
for (DataArtifact artifact : artifacts) { for (DataArtifact artifact : artifacts) {
DataArtifactIngestTask task = new DataArtifactIngestTask(ingestPipeline, artifact); DataArtifactIngestTask task = new DataArtifactIngestTask(ingestPipeline, artifact);
@ -373,7 +373,7 @@ final class IngestTasksScheduler {
* *
* @return True or false. * @return True or false.
*/ */
synchronized boolean currentTasksAreCompleted(IngestModulePipelines ingestPipeline) { synchronized boolean currentTasksAreCompleted(IngestJobExecutor ingestPipeline) {
long pipelineId = ingestPipeline.getIngestJobId(); long pipelineId = ingestPipeline.getIngestJobId();
return !(dataSourceIngestTasksQueue.hasTasksForJob(pipelineId) return !(dataSourceIngestTasksQueue.hasTasksForJob(pipelineId)
|| hasTasksForJob(topLevelFileIngestTasksQueue, pipelineId) || hasTasksForJob(topLevelFileIngestTasksQueue, pipelineId)
@ -402,7 +402,7 @@ final class IngestTasksScheduler {
* *
* @param ingestJobPipeline The ingest pipeline for the job. * @param ingestJobPipeline The ingest pipeline for the job.
*/ */
synchronized void cancelPendingFileTasksForIngestJob(IngestModulePipelines ingestJobPipeline) { synchronized void cancelPendingFileTasksForIngestJob(IngestJobExecutor ingestJobPipeline) {
long jobId = ingestJobPipeline.getIngestJobId(); long jobId = ingestJobPipeline.getIngestJobId();
removeTasksForJob(topLevelFileIngestTasksQueue, jobId); removeTasksForJob(topLevelFileIngestTasksQueue, jobId);
removeTasksForJob(batchedFileIngestTasksQueue, jobId); removeTasksForJob(batchedFileIngestTasksQueue, jobId);

View File

@ -1820,7 +1820,6 @@ class ExtractRegistry extends Extract {
*/ */
void createShellBagArtifacts(AbstractFile regFile, List<ShellBag> shellbags) throws TskCoreException { void createShellBagArtifacts(AbstractFile regFile, List<ShellBag> shellbags) throws TskCoreException {
List<BlackboardArtifact> artifacts = new ArrayList<>(); List<BlackboardArtifact> artifacts = new ArrayList<>();
List<DataArtifact> dataArtifacts = new ArrayList<>();
try { try {
for (ShellBag bag : shellbags) { for (ShellBag bag : shellbags) {
Collection<BlackboardAttribute> attributes = new ArrayList<>(); Collection<BlackboardAttribute> attributes = new ArrayList<>();
@ -1850,12 +1849,10 @@ class ExtractRegistry extends Extract {
BlackboardArtifact artifact = createArtifactWithAttributes(getShellBagArtifact(), regFile, attributes); BlackboardArtifact artifact = createArtifactWithAttributes(getShellBagArtifact(), regFile, attributes);
artifacts.add(artifact); artifacts.add(artifact);
dataArtifacts.add((DataArtifact)artifact);
} }
} finally { } finally {
if(!context.dataSourceIngestIsCancelled()) { if(!context.dataSourceIngestIsCancelled()) {
postArtifacts(artifacts); postArtifacts(artifacts);
context.addDataArtifactsToJob(dataArtifacts);
} }
} }
} }