diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestPipeline.java index 842088afce..bd256d05ed 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestPipeline.java @@ -26,7 +26,7 @@ import org.sleuthkit.datamodel.DataArtifact; * A pipeline of data artifact ingest modules used to execute data artifact * ingest tasks for an ingest job. */ -final class DataArtifactIngestPipeline extends IngestTaskPipeline { +final class DataArtifactIngestPipeline extends IngestPipeline { /** * Constructs a pipeline of data artifact ingest modules used to execute @@ -37,13 +37,13 @@ final class DataArtifactIngestPipeline extends IngestTaskPipeline moduleTemplates) { + DataArtifactIngestPipeline(IngestJobExecutor ingestJobPipeline, List moduleTemplates) { super(ingestJobPipeline, moduleTemplates); } @Override Optional> acceptModuleTemplate(IngestModuleTemplate template) { - Optional> module = Optional.empty(); + Optional> module = Optional.empty(); if (template.isDataArtifactIngestModuleTemplate()) { DataArtifactIngestModule ingestModule = template.createDataArtifactIngestModule(); module = Optional.of(new DataArtifactIngestPipelineModule(ingestModule, template.getModuleName())); @@ -52,18 +52,18 @@ final class DataArtifactIngestPipeline extends IngestTaskPipeline { + static final class DataArtifactIngestPipelineModule extends IngestPipeline.PipelineModule { private final DataArtifactIngestModule module; @@ -80,7 +80,7 @@ final class DataArtifactIngestPipeline extends IngestTaskPipeline { +final class DataSourceIngestPipeline extends IngestPipeline { private static final Logger logger = Logger.getLogger(DataSourceIngestPipeline.class.getName()); private static final IngestManager ingestManager = IngestManager.getInstance(); @@ -42,13 +42,13 @@ final class DataSourceIngestPipeline extends IngestTaskPipeline moduleTemplates) { + DataSourceIngestPipeline(IngestJobExecutor ingestJobPipeline, List moduleTemplates) { super(ingestJobPipeline, moduleTemplates); } @Override - Optional> acceptModuleTemplate(IngestModuleTemplate template) { - Optional> module = Optional.empty(); + Optional> acceptModuleTemplate(IngestModuleTemplate template) { + Optional> module = Optional.empty(); if (template.isDataSourceIngestModuleTemplate()) { DataSourceIngestModule ingestModule = template.createDataSourceIngestModule(); module = Optional.of(new DataSourcePipelineModule(ingestModule, template.getModuleName())); @@ -69,7 +69,7 @@ final class DataSourceIngestPipeline extends IngestTaskPipeline { + static final class DataSourcePipelineModule extends IngestPipeline.PipelineModule { private final DataSourceIngestModule module; @@ -83,7 +83,7 @@ final class DataSourceIngestPipeline extends IngestTaskPipeline { +final class FileIngestPipeline extends IngestPipeline { private static final int FILE_BATCH_SIZE = 500; private static final String SAVE_RESULTS_ACTIVITY = Bundle.FileIngestPipeline_SaveResults_Activity(); private static final Logger logger = Logger.getLogger(FileIngestPipeline.class.getName()); private static final IngestManager ingestManager = IngestManager.getInstance(); - private final IngestModulePipelines ingestJobPipeline; + private final IngestJobExecutor ingestJobPipeline; private final List fileBatch; /** @@ -56,15 +56,15 @@ final class FileIngestPipeline extends IngestTaskPipeline { * @param moduleTemplates The ingest module templates that define this * pipeline. */ - FileIngestPipeline(IngestModulePipelines ingestJobPipeline, List moduleTemplates) { + FileIngestPipeline(IngestJobExecutor ingestJobPipeline, List moduleTemplates) { super(ingestJobPipeline, moduleTemplates); this.ingestJobPipeline = ingestJobPipeline; fileBatch = new ArrayList<>(); } @Override - Optional> acceptModuleTemplate(IngestModuleTemplate template) { - Optional> module = Optional.empty(); + Optional> acceptModuleTemplate(IngestModuleTemplate template) { + Optional> module = Optional.empty(); if (template.isFileIngestModuleTemplate()) { FileIngestModule ingestModule = template.createFileIngestModule(); module = Optional.of(new FileIngestPipelineModule(ingestModule, template.getModuleName())); @@ -73,18 +73,18 @@ final class FileIngestPipeline extends IngestTaskPipeline { } @Override - void prepareForTask(FileIngestTask task) throws IngestTaskPipelineException { + void prepareForTask(FileIngestTask task) throws IngestPipelineException { } @Override - void cleanUpAfterTask(FileIngestTask task) throws IngestTaskPipelineException { + void cleanUpAfterTask(FileIngestTask task) throws IngestPipelineException { try { ingestManager.setIngestTaskProgress(task, SAVE_RESULTS_ACTIVITY); AbstractFile file = task.getFile(); file.close(); cacheFileForBatchUpdate(file); } catch (TskCoreException ex) { - throw new IngestTaskPipelineException(String.format("Failed to get file (file objId = %d)", task.getFileId()), ex); //NON-NLS + throw new IngestPipelineException(String.format("Failed to get file (file objId = %d)", task.getFileId()), ex); //NON-NLS } finally { ingestManager.setIngestTaskProgressCompleted(task); } @@ -96,7 +96,7 @@ final class FileIngestPipeline extends IngestTaskPipeline { Date start = new Date(); try { updateBatchedFiles(); - } catch (IngestTaskPipelineException ex) { + } catch (IngestPipelineException ex) { errors.add(new IngestModuleError(SAVE_RESULTS_ACTIVITY, ex)); } Date finish = new Date(); @@ -113,9 +113,9 @@ final class FileIngestPipeline extends IngestTaskPipeline { * * @param file The file. * - * @throws IngestTaskPipelineException if the case database update fails. + * @throws IngestPipelineException if the case database update fails. */ - private void cacheFileForBatchUpdate(AbstractFile file) throws IngestTaskPipelineException { + private void cacheFileForBatchUpdate(AbstractFile file) throws IngestPipelineException { /* * Only one file ingest thread at a time will try to access the file * cache. The synchronization here is to ensure visibility of the files @@ -134,9 +134,9 @@ final class FileIngestPipeline extends IngestTaskPipeline { * Updates the case database with new properties added to the files in the * cache by the ingest modules that processed them. * - * @throws IngestTaskPipelineException if the case database update fails. + * @throws IngestPipelineException if the case database update fails. */ - private void updateBatchedFiles() throws IngestTaskPipelineException { + private void updateBatchedFiles() throws IngestPipelineException { /* * Only one file ingest thread at a time will try to access the file * cache. The synchronization here is to ensure visibility of the files @@ -166,7 +166,7 @@ final class FileIngestPipeline extends IngestTaskPipeline { logger.log(Level.SEVERE, "Error rolling back transaction after failure to save updated properties for cached files from tasks", ex1); } } - throw new IngestTaskPipelineException("Failed to save updated properties for cached files from tasks", ex); //NON-NLS + throw new IngestPipelineException("Failed to save updated properties for cached files from tasks", ex); //NON-NLS } finally { fileBatch.clear(); } @@ -177,7 +177,7 @@ final class FileIngestPipeline extends IngestTaskPipeline { * A wrapper that adds ingest infrastructure operations to a file ingest * module. */ - static final class FileIngestPipelineModule extends IngestTaskPipeline.PipelineModule { + static final class FileIngestPipelineModule extends IngestPipeline.PipelineModule { private final FileIngestModule module; @@ -195,7 +195,7 @@ final class FileIngestPipeline extends IngestTaskPipeline { } @Override - void executeTask(IngestModulePipelines ingestJobPipeline, FileIngestTask task) throws IngestModuleException { + void process(IngestJobExecutor ingestJobPipeline, FileIngestTask task) throws IngestModuleException { AbstractFile file = null; try { file = task.getFile(); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestTask.java b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestTask.java index e411045be7..1e43cd1dae 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestTask.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestTask.java @@ -40,7 +40,7 @@ final class FileIngestTask extends IngestTask { * task. * @param file The file to be processed. */ - FileIngestTask(IngestModulePipelines ingestJobPipeline, AbstractFile file) { + FileIngestTask(IngestJobExecutor ingestJobPipeline, AbstractFile file) { super(ingestJobPipeline); this.file = file; fileId = file.getId(); @@ -56,7 +56,7 @@ final class FileIngestTask extends IngestTask { * task. * @param fileId The object ID of the file to be processed. */ - FileIngestTask(IngestModulePipelines ingestJobPipeline, long fileId) { + FileIngestTask(IngestJobExecutor ingestJobPipeline, long fileId) { super(ingestJobPipeline); this.fileId = fileId; } @@ -100,8 +100,8 @@ final class FileIngestTask extends IngestTask { return false; } FileIngestTask other = (FileIngestTask) obj; - IngestModulePipelines thisPipeline = getIngestJobPipeline(); - IngestModulePipelines otherPipeline = other.getIngestJobPipeline(); + IngestJobExecutor thisPipeline = getIngestJobPipeline(); + IngestJobExecutor otherPipeline = other.getIngestJobPipeline(); if (thisPipeline != otherPipeline && (thisPipeline == null || !thisPipeline.equals(otherPipeline))) { return false; } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 96a072569a..cd3c616320 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -73,7 +73,7 @@ public final class IngestJob { private final List files = new ArrayList<>(); private final Mode ingestMode; private final IngestJobSettings settings; - private volatile IngestModulePipelines ingestModulePipelines; + private volatile IngestJobExecutor ingestModulePipelines; private volatile CancellationReason cancellationReason; /** @@ -181,7 +181,7 @@ public final class IngestJob { return Collections.emptyList(); } - ingestModulePipelines = new IngestModulePipelines(this, dataSource, files, settings); + ingestModulePipelines = new IngestJobExecutor(this, dataSource, files, settings); List errors = new ArrayList<>(); errors.addAll(ingestModulePipelines.startUp()); if (errors.isEmpty()) { @@ -507,7 +507,7 @@ public final class IngestJob { */ public static class DataSourceIngestModuleHandle { - private final IngestModulePipelines ingestJobPipeline; + private final IngestJobExecutor ingestJobPipeline; private final DataSourceIngestPipeline.DataSourcePipelineModule module; private final boolean cancelled; @@ -520,7 +520,7 @@ public final class IngestJob { * source level ingest module. * @param module The data source level ingest module. */ - private DataSourceIngestModuleHandle(IngestModulePipelines ingestJobPipeline, DataSourceIngestPipeline.DataSourcePipelineModule module) { + private DataSourceIngestModuleHandle(IngestJobExecutor ingestJobPipeline, DataSourceIngestPipeline.DataSourcePipelineModule module) { this.ingestJobPipeline = ingestJobPipeline; this.module = module; this.cancelled = ingestJobPipeline.currentDataSourceIngestModuleIsCancelled(); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java index 0cf3981c3a..950e70084f 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java @@ -29,7 +29,7 @@ import org.sleuthkit.datamodel.DataArtifact; */ public final class IngestJobContext { - private final IngestModulePipelines ingestJobPipeline; + private final IngestJobExecutor ingestJobPipeline; /** * Constructs an ingest job context object that provides an ingest module @@ -37,7 +37,7 @@ public final class IngestJobContext { * * @param ingestJobPipeline The ingest pipeline for the job. */ - IngestJobContext(IngestModulePipelines ingestJobPipeline) { + IngestJobContext(IngestJobExecutor ingestJobPipeline) { this.ingestJobPipeline = ingestJobPipeline; } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestModulePipelines.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java similarity index 97% rename from Core/src/org/sleuthkit/autopsy/ingest/IngestModulePipelines.java rename to Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java index 4798f132c0..b25506cc30 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestModulePipelines.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java @@ -59,10 +59,10 @@ import org.sleuthkit.datamodel.DataSource; * Manages the construction, start up, execution, and shut down of the ingest * module pipelines for an ingest job. */ -final class IngestModulePipelines { +final class IngestJobExecutor { private static final String AUTOPSY_MODULE_PREFIX = "org.sleuthkit.autopsy"; - private static final Logger logger = Logger.getLogger(IngestModulePipelines.class.getName()); + private static final Logger logger = Logger.getLogger(IngestJobExecutor.class.getName()); /* * A regular expression for identifying the proxy classes Jython generates @@ -159,7 +159,7 @@ final class IngestModulePipelines { * So the stage transition lock is used not to guard the stage field, but to * coordinate stage transitions. */ - private volatile IngestJobStages stage = IngestModulePipelines.IngestJobStages.PIPELINES_START_UP; + private volatile IngestJobStages stage = IngestJobExecutor.IngestJobStages.PIPELINES_START_UP; private final Object stageTransitionLock = new Object(); /* @@ -238,7 +238,7 @@ final class IngestModulePipelines { * @throws InterruptedException Exception thrown if the thread in which the * pipeline is being created is interrupted. */ - IngestModulePipelines(IngestJob ingestJob, Content dataSource, List files, IngestJobSettings settings) throws InterruptedException { + IngestJobExecutor(IngestJob ingestJob, Content dataSource, List files, IngestJobSettings settings) throws InterruptedException { if (!(dataSource instanceof DataSource)) { throw new IllegalArgumentException("Passed dataSource that does not implement the DataSource interface"); //NON-NLS } @@ -594,7 +594,7 @@ final class IngestModulePipelines { * * @return A list of ingest module startup errors, empty on success. */ - private List startUpIngestModulePipeline(IngestTaskPipeline pipeline) { + private List startUpIngestModulePipeline(IngestPipeline pipeline) { List startUpErrors = pipeline.startUp(); if (!startUpErrors.isEmpty()) { List shutDownErrors = pipeline.shutDown(); @@ -776,7 +776,7 @@ final class IngestModulePipelines { void startStreamingModeDataSrcAnalysis() { synchronized (stageTransitionLock) { logInfoMessage("Starting full first stage analysis in streaming mode"); //NON-NLS - stage = IngestModulePipelines.IngestJobStages.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; + stage = IngestJobExecutor.IngestJobStages.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline; if (hasFileIngestModules()) { @@ -809,7 +809,7 @@ final class IngestModulePipelines { currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline; if (hasHighPriorityDataSourceIngestModules()) { - IngestModulePipelines.taskScheduler.scheduleDataSourceIngestTask(this); + IngestJobExecutor.taskScheduler.scheduleDataSourceIngestTask(this); } else { /* * If no data source level ingest task is scheduled at this time @@ -831,7 +831,7 @@ final class IngestModulePipelines { synchronized (stageTransitionLock) { if (hasLowPriorityDataSourceIngestModules()) { logInfoMessage(String.format("Starting low priority data source analysis for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), ingestJob.getId())); //NON-NLS - stage = IngestModulePipelines.IngestJobStages.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; + stage = IngestJobExecutor.IngestJobStages.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; if (usingNetBeansGUI) { startDataSourceIngestProgressBar(); @@ -857,7 +857,7 @@ final class IngestModulePipelines { artifactIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() { @Override public boolean cancel() { - IngestModulePipelines.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); + IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); return true; } }); @@ -891,12 +891,12 @@ final class IngestModulePipelines { * ingest job. */ DataSourceIngestCancellationPanel panel = new DataSourceIngestCancellationPanel(); - String dialogTitle = NbBundle.getMessage(IngestModulePipelines.this.getClass(), "IngestJob.cancellationDialog.title"); + String dialogTitle = NbBundle.getMessage(IngestJobExecutor.this.getClass(), "IngestJob.cancellationDialog.title"); JOptionPane.showConfirmDialog(WindowManager.getDefault().getMainWindow(), panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE); if (panel.cancelAllDataSourceIngestModules()) { - IngestModulePipelines.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); + IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); } else { - IngestModulePipelines.this.cancelCurrentDataSourceIngestModule(); + IngestJobExecutor.this.cancelCurrentDataSourceIngestModule(); } return true; } @@ -921,7 +921,7 @@ final class IngestModulePipelines { fileIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() { @Override public boolean cancel() { - IngestModulePipelines.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); + IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); return true; } }); @@ -998,7 +998,7 @@ final class IngestModulePipelines { private void shutDown() { synchronized (stageTransitionLock) { logInfoMessage("Finished all tasks"); //NON-NLS - stage = IngestModulePipelines.IngestJobStages.PIPELINES_SHUT_DOWN; + stage = IngestJobExecutor.IngestJobStages.PIPELINES_SHUT_DOWN; shutDownIngestModulePipeline(currentDataSourceIngestPipeline); shutDownIngestModulePipeline(artifactIngestPipeline); @@ -1056,7 +1056,7 @@ final class IngestModulePipelines { * * @param pipeline The pipeline. */ - private void shutDownIngestModulePipeline(IngestTaskPipeline pipeline) { + private void shutDownIngestModulePipeline(IngestPipeline pipeline) { if (pipeline.isRunning()) { List errors = new ArrayList<>(); errors.addAll(pipeline.shutDown()); @@ -1076,7 +1076,7 @@ final class IngestModulePipelines { try { if (!isCancelled()) { List errors = new ArrayList<>(); - errors.addAll(currentDataSourceIngestPipeline.executeTask(task)); + errors.addAll(currentDataSourceIngestPipeline.performTask(task)); if (!errors.isEmpty()) { logIngestModuleErrors(errors); } @@ -1130,7 +1130,7 @@ final class IngestModulePipelines { * Run the file through the modules in the pipeline. */ List errors = new ArrayList<>(); - errors.addAll(pipeline.executeTask(task)); + errors.addAll(pipeline.performTask(task)); if (!errors.isEmpty()) { logIngestModuleErrors(errors, file); } @@ -1171,7 +1171,7 @@ final class IngestModulePipelines { try { if (!isCancelled() && !artifactIngestPipeline.isEmpty()) { List errors = new ArrayList<>(); - errors.addAll(artifactIngestPipeline.executeTask(task)); + errors.addAll(artifactIngestPipeline.performTask(task)); if (!errors.isEmpty()) { logIngestModuleErrors(errors); } @@ -1191,7 +1191,7 @@ final class IngestModulePipelines { void addStreamedFiles(List fileObjIds) { if (hasFileIngestModules()) { if (stage.equals(IngestJobStages.STREAMED_FILE_ANALYSIS_ONLY)) { - IngestModulePipelines.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds); + IngestJobExecutor.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds); } else { logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported"); } @@ -1412,7 +1412,7 @@ final class IngestModulePipelines { void cancel(IngestJob.CancellationReason reason) { jobCancelled = true; cancellationReason = reason; - IngestModulePipelines.taskScheduler.cancelPendingFileTasksForIngestJob(this); + IngestJobExecutor.taskScheduler.cancelPendingFileTasksForIngestJob(this); if (usingNetBeansGUI) { synchronized (dataSourceIngestProgressLock) { diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTaskPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestPipeline.java similarity index 68% rename from Core/src/org/sleuthkit/autopsy/ingest/IngestTaskPipeline.java rename to Core/src/org/sleuthkit/autopsy/ingest/IngestPipeline.java index f12228b680..c7bbf03608 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTaskPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestPipeline.java @@ -33,21 +33,24 @@ import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil; /** - * An abstract superclass for pipelines of ingest modules that execute ingest - * tasks for an ingest job. Subclasses need to extend this class and to - * implement a specialization of the inner PipelineModule abstract superclass. + * An abstract superclass for pipelines of ingest modules that perform the + * ingest tasks that make up an ingest job. A pipeline performs a task by + * passing it sequentially to the process() method of each module in the + * pipeline. * - * NOTE ON MULTI-THREADING POLICY: This class is primarily designed for use - * by one thread at a time. There are a few status fields that are volatile to - * ensure visibility to threads making ingest progress snapshots, but methods - * such as startUp(), executeTask() and shutDown() are not synchronized. - * - * @param The ingest task type. + * @param The type of ingest tasks the pipeline performs. */ -abstract class IngestTaskPipeline { +abstract class IngestPipeline { - private static final Logger logger = Logger.getLogger(IngestTaskPipeline.class.getName()); - private final IngestModulePipelines ingestJobPipeline; + /* + * NOTE ON MULTI-THREADING POLICY: This class is primarily designed for use + * by one thread at a time. There are a few status fields that are volatile + * to ensure visibility to threads making ingest progress snapshots, but + * methods such as startUp(), performTask() and shutDown() are not + * synchronized. + */ + private static final Logger logger = Logger.getLogger(IngestPipeline.class.getName()); + private final IngestJobExecutor ingestJobExecutor; private final List moduleTemplates; private final List> modules; private volatile Date startTime; @@ -56,38 +59,34 @@ abstract class IngestTaskPipeline { /** * Constructs the superclass part of a pipeline of ingest modules that - * executes ingest tasks for an ingest job. + * performs ingest tasks for an ingest job. * - * @param ingestPipeline The parent ingest job pipeline for this ingest - * task pipeline. - * @param moduleTemplates The ingest module templates that define this - * ingest task pipeline. May be an empty list. + * @param ingestJobExecutor The ingest job executor that owns this pipeline. + * @param moduleTemplates The ingest module templates to be used to + * construct the ingest modules for this pipeline. + * May be an empty list if this type of pipeline is + * not needed for the ingest job. */ - IngestTaskPipeline(IngestModulePipelines ingestPipeline, List moduleTemplates) { - this.ingestJobPipeline = ingestPipeline; - /* - * The creation of ingest modules from the ingest module templates has - * been deliberately deferred to the startUp() method so that any and - * all errors in module construction or start up can be reported to the - * client code. - */ + IngestPipeline(IngestJobExecutor ingestJobExecutor, List moduleTemplates) { + this.ingestJobExecutor = ingestJobExecutor; this.moduleTemplates = moduleTemplates; modules = new ArrayList<>(); } /** - * Indicates whether or not there are any ingest modules in this ingest task + * Indicates whether or not there are any ingest modules in this ingest * pipeline. * - * @return True or false. + * @return True or false; always true before startUp() is called. */ boolean isEmpty() { return modules.isEmpty(); } /** - * Queries whether or not this ingest task pipeline is running, i.e., the - * startUp() method has been called and the shutDown() has not been called. + * Queries whether or not this ingest pipeline is running, i.e., the + * startUp() method has been called and the shutDown() method has not been + * called yet. * * @return True or false. */ @@ -96,8 +95,8 @@ abstract class IngestTaskPipeline { } /** - * Starts up this ingest task pipeline by calling the startUp() methods of - * the ingest modules in the pipeline. + * Starts up this ingest pipeline by calling the startUp() methods of the + * ingest modules in the pipeline. * * @return A list of ingest module start up errors, possibly empty. */ @@ -110,21 +109,19 @@ abstract class IngestTaskPipeline { * any and all errors in module construction or start up can be * reported to the client code. */ - createIngestModules(moduleTemplates); + createIngestModules(); errors.addAll(startUpIngestModules()); } else { - errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestTaskPipelineException("Pipeline already started"))); //NON-NLS + errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestPipelineException("Pipeline already started"))); //NON-NLS } return errors; } /** - * Creates the ingest modules for this ingest task pipeline from the given - * ingest module templates. - * - * @param moduleTemplates The ingest module templates. + * Creates the ingest modules for this ingest pipeline using its ingest + * module templates. */ - private void createIngestModules(List moduleTemplates) { + private void createIngestModules() { if (modules.isEmpty()) { for (IngestModuleTemplate template : moduleTemplates) { Optional> module = acceptModuleTemplate(template); @@ -137,8 +134,8 @@ abstract class IngestTaskPipeline { /** * Determines if one of the types of ingest modules that can be created from - * a given ingest module template should be added to this ingest task - * pipeline. If so, the ingest module is created and returned. + * a given ingest module template should be added to this ingest pipeline. + * If so, the ingest module is created and returned. * * @param template The ingest module template to be used or ignored, as * appropriate to the pipeline type. @@ -149,7 +146,7 @@ abstract class IngestTaskPipeline { abstract Optional> acceptModuleTemplate(IngestModuleTemplate template); /** - * Starts up the ingest modules in this ingest task pipeline. + * Starts up the ingest modules in this ingest pipeline. * * @return A list of ingest module start up errors, possibly empty. */ @@ -159,7 +156,7 @@ abstract class IngestTaskPipeline { running = true; for (PipelineModule module : modules) { try { - module.startUp(new IngestJobContext(ingestJobPipeline)); + module.startUp(new IngestJobContext(ingestJobExecutor)); } catch (Throwable ex) { /* * A catch-all exception firewall. Start up errors for all of @@ -174,10 +171,10 @@ abstract class IngestTaskPipeline { } /** - * Returns the start up time of this ingest task pipeline. + * Returns the start up time of this ingest pipeline. * - * @return The file processing start time, may be null if this pipeline has - * not been started yet. + * @return The start up time, may be null if this pipeline has not been + * started yet. */ Date getStartTime() { Date reportedStartTime = null; @@ -188,65 +185,66 @@ abstract class IngestTaskPipeline { } /** - * Executes an ingest task by calling the process() methods of the ingest - * modules in this ingest task pipeline. + * Performs an ingest task by sequentially calling the process() methods of + * the ingest modules in this ingest pipeline. * * @param task The task. * - * @return A list of ingest module task processing errors, possibly empty. + * @return A list of ingest module processing errors, possibly empty. */ - List executeTask(T task) { + List performTask(T task) { List errors = new ArrayList<>(); if (running) { - if (!ingestJobPipeline.isCancelled()) { + if (!ingestJobExecutor.isCancelled()) { pauseIfScheduled(); - if (ingestJobPipeline.isCancelled()) { + if (ingestJobExecutor.isCancelled()) { return errors; } try { prepareForTask(task); - } catch (IngestTaskPipelineException ex) { + } catch (IngestPipelineException ex) { errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS return errors; } for (PipelineModule module : modules) { pauseIfScheduled(); - if (ingestJobPipeline.isCancelled()) { + if (ingestJobExecutor.isCancelled()) { break; } try { currentModule = module; currentModule.setProcessingStartTime(); - module.executeTask(ingestJobPipeline, task); - } catch (Throwable ex) { + module.process(ingestJobExecutor, task); + } catch (Throwable ex) { // Catch-all exception firewall /* - * A catch-all exception firewall. Note that a runtime - * exception from a single module does not stop + * Note that an exception from a module does not stop * processing of the task by the other modules in the * pipeline. */ errors.add(new IngestModuleError(module.getDisplayName(), ex)); } - if (ingestJobPipeline.isCancelled()) { + if (ingestJobExecutor.isCancelled()) { break; } } } try { cleanUpAfterTask(task); - } catch (IngestTaskPipelineException ex) { + } catch (IngestPipelineException ex) { errors.add(new IngestModuleError("Ingest Task Pipeline", ex)); //NON-NLS } } else { - errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestTaskPipelineException("Pipeline not started or shut down"))); //NON-NLS + errors.add(new IngestModuleError("Ingest Task Pipeline", new IngestPipelineException("Pipeline not started or shut down"))); //NON-NLS } currentModule = null; return errors; } /** - * Pauses task execution if ingest has been configured to be paused weekly - * at a specified time for a specified duration. + * Pauses this pipeline if ingest has been configured to be paused weekly at + * a specified time, for a specified duration. A pipeline can only be paused + * between calls to module process() methods, i.e., the individual modules + * themselves cannot be paused in the middle of processing a task. */ private void pauseIfScheduled() { if (ScheduledIngestPauseSettings.getPauseEnabled() == true) { @@ -278,7 +276,7 @@ abstract class IngestTaskPipeline { */ LocalDateTime timeNow = LocalDateTime.now(); if ((timeNow.equals(pauseStart) || timeNow.isAfter(pauseStart)) && timeNow.isBefore(pauseEnd)) { - ingestJobPipeline.registerPausedIngestThread(Thread.currentThread()); + ingestJobExecutor.registerPausedIngestThread(Thread.currentThread()); try { long timeRemainingMillis = ChronoUnit.MILLIS.between(timeNow, pauseEnd); logger.log(Level.INFO, String.format("%s pausing at %s for ~%d minutes", Thread.currentThread().getName(), LocalDateTime.now(), TimeUnit.MILLISECONDS.toMinutes(timeRemainingMillis))); @@ -287,27 +285,27 @@ abstract class IngestTaskPipeline { } catch (InterruptedException notLogged) { logger.log(Level.INFO, String.format("%s resuming at %s due to sleep interrupt (ingest job canceled)", Thread.currentThread().getName(), LocalDateTime.now())); } finally { - ingestJobPipeline.unregisterPausedIngestThread(Thread.currentThread()); + ingestJobExecutor.unregisterPausedIngestThread(Thread.currentThread()); } } } } /** - * Does any task type specific preparation required before executing an + * Does any task-type-specific preparation required before performing an * ingest task. * * @param task The task. * - * @throws IngestTaskPipelineException Thrown if there is an error preparing - * to execute the task. + * @throws IngestPipelineException Thrown if there is an error preparing to + * perform the task. */ - abstract void prepareForTask(T task) throws IngestTaskPipelineException; + abstract void prepareForTask(T task) throws IngestPipelineException; /** * Gets the currently running ingest module. * - * @return The module, possibly null if no module is currently running. + * @return The module, possibly null, if no module is currently running. */ PipelineModule getCurrentlyRunningModule() { return currentModule; @@ -345,22 +343,22 @@ abstract class IngestTaskPipeline { } /** - * Does any task type specific clean up required after executing an ingest + * Does any task-type-specific clean up required after performing an ingest * task. * * @param task The task. * - * @throws IngestTaskPipelineException Thrown if there is an error cleaning - * up after performing the task. + * @throws IngestPipelineException Thrown if there is an error cleaning up + * after performing the task. */ - abstract void cleanUpAfterTask(T task) throws IngestTaskPipelineException; + abstract void cleanUpAfterTask(T task) throws IngestPipelineException; /** - * An abstract superclass for a decorator that adds ingest infrastructure - * operations to an ingest module. - * - * IMPORTANT: Subclasses of IngestTaskPipeline need to implement a - * specialization this class + * An abstract superclass for an ingest module decorator that adds ingest + * infrastructure operations to the process() method of an ingest module. + * Subclasses of IngestPipeline need to provide a concrete implementation of + * this class that provides the additional operations that the pipeline + * requires. */ static abstract class PipelineModule implements IngestModule { @@ -369,8 +367,9 @@ abstract class IngestTaskPipeline { private volatile Date processingStartTime; /** - * Constructs an instance of an abstract superclass for a decorator that - * adds ingest infrastructure operations to an ingest module. + * Constructs an instance of an abstract superclass for an ingest module + * decorator that adds ingest infrastructure operations to the process() + * method of an ingest module. * * @param module The ingest module to be wrapped. * @param displayName The display name for the module. @@ -378,7 +377,7 @@ abstract class IngestTaskPipeline { PipelineModule(IngestModule module, String displayName) { this.module = module; this.displayName = displayName; - this.processingStartTime = new Date(); + processingStartTime = new Date(); } /** @@ -410,8 +409,8 @@ abstract class IngestTaskPipeline { /** * Gets the the processing start time for the decorated module. * - * @return The start time, will be null if the module has not started - * processing the data source yet. + * @return The start time, not valid if setProcessingStartTime() has not + * been called first. */ Date getProcessingStartTime() { return new Date(processingStartTime.getTime()); @@ -423,17 +422,17 @@ abstract class IngestTaskPipeline { } /** - * Executes an ingest task using the process() method of the decorated + * Performs an ingest task using the process() method of the decorated * module. * - * @param ingestJobPipeline The ingest job pipeline that owns the ingest - * task pipeline this module belongs to. - * @param task The task to execute. + * @param ingestJobExecutor The ingest job executor that owns the ingest + * pipeline to which this module belongs. + * @param task The task to perform. * * @throws IngestModuleException Exception thrown if there is an error * performing the task. */ - abstract void executeTask(IngestModulePipelines ingestJobPipeline, T task) throws IngestModuleException; + abstract void process(IngestJobExecutor ingestJobExecutor, T task) throws IngestModuleException; @Override public void shutDown() { @@ -443,28 +442,28 @@ abstract class IngestTaskPipeline { } /** - * An exception thrown by an ingest task pipeline. + * An exception thrown by an ingest pipeline. */ - public static class IngestTaskPipelineException extends Exception { + static class IngestPipelineException extends Exception { private static final long serialVersionUID = 1L; /** - * Constructs an exception to be thrown by an ingest task pipeline. + * Constructs an exception to be thrown by an ingest pipeline. * * @param message The exception message. */ - public IngestTaskPipelineException(String message) { + IngestPipelineException(String message) { super(message); } /** - * Constructs an exception to be thrown by an ingest task pipeline. + * Constructs an exception to be thrown by an ingest pipeline. * * @param message The exception message. * @param cause The exception cause. */ - public IngestTaskPipelineException(String message, Throwable cause) { + IngestPipelineException(String message, Throwable cause) { super(message, cause); } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTask.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTask.java index 815ca8cc99..cb4f398c98 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTask.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTask.java @@ -29,7 +29,7 @@ import org.sleuthkit.datamodel.Content; abstract class IngestTask { private final static long NOT_SET = Long.MIN_VALUE; - private final IngestModulePipelines ingestJobPipeline; + private final IngestJobExecutor ingestJobPipeline; private long threadId; /** @@ -41,7 +41,7 @@ abstract class IngestTask { * @param ingestJobPipeline The ingest job pipeline to use to execute the * task. */ - IngestTask(IngestModulePipelines ingestJobPipeline) { + IngestTask(IngestJobExecutor ingestJobPipeline) { this.ingestJobPipeline = ingestJobPipeline; threadId = NOT_SET; } @@ -51,7 +51,7 @@ abstract class IngestTask { * * @return The ingest job pipeline. */ - IngestModulePipelines getIngestJobPipeline() { + IngestJobExecutor getIngestJobPipeline() { return ingestJobPipeline; } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java index 60bbdd0ab7..da3dc99456 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -138,7 +138,7 @@ final class IngestTasksScheduler { * task to the pipeline for processing by the * pipeline's ingest modules. */ - synchronized void scheduleIngestTasks(IngestModulePipelines ingestPipeline) { + synchronized void scheduleIngestTasks(IngestJobExecutor ingestPipeline) { if (!ingestPipeline.isCancelled()) { if (ingestPipeline.hasDataSourceIngestModules()) { scheduleDataSourceIngestTask(ingestPipeline); @@ -163,7 +163,7 @@ final class IngestTasksScheduler { * task to the pipeline for processing by the * pipeline's ingest modules. */ - synchronized void scheduleDataSourceIngestTask(IngestModulePipelines ingestPipeline) { + synchronized void scheduleDataSourceIngestTask(IngestJobExecutor ingestPipeline) { if (!ingestPipeline.isCancelled()) { DataSourceIngestTask task = new DataSourceIngestTask(ingestPipeline); try { @@ -190,7 +190,7 @@ final class IngestTasksScheduler { * empty, then all if the files from the data source * are candidates for scheduling. */ - synchronized void scheduleFileIngestTasks(IngestModulePipelines ingestPipeline, Collection files) { + synchronized void scheduleFileIngestTasks(IngestJobExecutor ingestPipeline, Collection files) { if (!ingestPipeline.isCancelled()) { Collection candidateFiles; if (files.isEmpty()) { @@ -220,7 +220,7 @@ final class IngestTasksScheduler { * processing by the pipeline's ingest modules. * @param files A list of file object IDs for the streamed files. */ - synchronized void scheduleStreamedFileIngestTasks(IngestModulePipelines ingestPipeline, List fileIds) { + synchronized void scheduleStreamedFileIngestTasks(IngestJobExecutor ingestPipeline, List fileIds) { if (!ingestPipeline.isCancelled()) { for (long id : fileIds) { /* @@ -252,7 +252,7 @@ final class IngestTasksScheduler { * processing by the pipeline's ingest modules. * @param files The files. */ - synchronized void fastTrackFileIngestTasks(IngestModulePipelines ingestPipeline, Collection files) { + synchronized void fastTrackFileIngestTasks(IngestJobExecutor ingestPipeline, Collection files) { if (!ingestPipeline.isCancelled()) { /* * Put the files directly into the queue for the file ingest @@ -290,7 +290,7 @@ final class IngestTasksScheduler { * target Content of the task to the pipeline for * processing by the pipeline's ingest modules. */ - synchronized void scheduleDataArtifactIngestTasks(IngestModulePipelines ingestPipeline) { + synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor ingestPipeline) { if (!ingestPipeline.isCancelled()) { Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard(); try { @@ -318,7 +318,7 @@ final class IngestTasksScheduler { * source; if empty, then all of the data artifacts * from the data source will be scheduled. */ - synchronized void scheduleDataArtifactIngestTasks(IngestModulePipelines ingestPipeline, List artifacts) { + synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor ingestPipeline, List artifacts) { if (!ingestPipeline.isCancelled()) { for (DataArtifact artifact : artifacts) { DataArtifactIngestTask task = new DataArtifactIngestTask(ingestPipeline, artifact); @@ -373,7 +373,7 @@ final class IngestTasksScheduler { * * @return True or false. */ - synchronized boolean currentTasksAreCompleted(IngestModulePipelines ingestPipeline) { + synchronized boolean currentTasksAreCompleted(IngestJobExecutor ingestPipeline) { long pipelineId = ingestPipeline.getIngestJobId(); return !(dataSourceIngestTasksQueue.hasTasksForJob(pipelineId) || hasTasksForJob(topLevelFileIngestTasksQueue, pipelineId) @@ -402,7 +402,7 @@ final class IngestTasksScheduler { * * @param ingestJobPipeline The ingest pipeline for the job. */ - synchronized void cancelPendingFileTasksForIngestJob(IngestModulePipelines ingestJobPipeline) { + synchronized void cancelPendingFileTasksForIngestJob(IngestJobExecutor ingestJobPipeline) { long jobId = ingestJobPipeline.getIngestJobId(); removeTasksForJob(topLevelFileIngestTasksQueue, jobId); removeTasksForJob(batchedFileIngestTasksQueue, jobId); diff --git a/RecentActivity/src/org/sleuthkit/autopsy/recentactivity/ExtractRegistry.java b/RecentActivity/src/org/sleuthkit/autopsy/recentactivity/ExtractRegistry.java index 83a07cf76c..fccd4e63f6 100644 --- a/RecentActivity/src/org/sleuthkit/autopsy/recentactivity/ExtractRegistry.java +++ b/RecentActivity/src/org/sleuthkit/autopsy/recentactivity/ExtractRegistry.java @@ -1820,7 +1820,6 @@ class ExtractRegistry extends Extract { */ void createShellBagArtifacts(AbstractFile regFile, List shellbags) throws TskCoreException { List artifacts = new ArrayList<>(); - List dataArtifacts = new ArrayList<>(); try { for (ShellBag bag : shellbags) { Collection attributes = new ArrayList<>(); @@ -1850,12 +1849,10 @@ class ExtractRegistry extends Extract { BlackboardArtifact artifact = createArtifactWithAttributes(getShellBagArtifact(), regFile, attributes); artifacts.add(artifact); - dataArtifacts.add((DataArtifact)artifact); } } finally { if(!context.dataSourceIngestIsCancelled()) { postArtifacts(artifacts); - context.addDataArtifactsToJob(dataArtifacts); } } }