diff --git a/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestModule.java b/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestModule.java new file mode 100755 index 0000000000..6bb475b0ad --- /dev/null +++ b/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestModule.java @@ -0,0 +1,46 @@ +/* + * Autopsy Forensic Browser + * + * Copyright 2021-2021 Basis Technology Corp. + * Contact: carrier sleuthkit org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.sleuthkit.autopsy.ingest; + +import org.sleuthkit.datamodel.AnalysisResult; + +/** + * Interface that must be implemented by all ingest modules that process + * analysis results. + */ +public interface AnalysisResultIngestModule extends IngestModule { + + /** + * Processes an analysis result. + * + * IMPORTANT: In addition to returning ProcessResult.OK or + * ProcessResult.ERROR, modules should log all errors using methods provided + * by the org.sleuthkit.autopsy.coreutils.Logger class. Log messages should + * include the name and object ID of the data being processed. If an + * exception has been caught by the module, the exception should be sent to + * the Logger along with the log message so that a stack trace will appear + * in the application log. + * + * @param result The analysis result to process. + * + * @return A result code indicating success or failure of the processing. + */ + ProcessResult process(AnalysisResult result); + +} diff --git a/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestPipeline.java new file mode 100755 index 0000000000..92497922ba --- /dev/null +++ b/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestPipeline.java @@ -0,0 +1,93 @@ +/* + * Autopsy Forensic Browser + * + * Copyright 2021-2021 Basis Technology Corp. + * Contact: carrier sleuthkit org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.sleuthkit.autopsy.ingest; + +import java.util.List; +import java.util.Optional; +import org.sleuthkit.datamodel.AnalysisResult; + +/** + * A pipeline of analysis result ingest modules used to perform analysis result + * ingest tasks for an ingest job. + */ +public class AnalysisResultIngestPipeline extends IngestPipeline { + + /** + * Constructs a pipeline of analysis result ingest modules used to perform + * analysis result ingest tasks for an ingest job. + * + * @param ingestJobExecutor The ingest job executor for this pipeline. 
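To illustrate the contract described above, a hypothetical implementation of the new AnalysisResultIngestModule interface might look like the following minimal sketch. The class name and analysis step are illustrative only, the IngestModuleFactory registration that would normally create the module is omitted, and the error logging follows the interface javadoc (result name, object ID, and the caught exception).

package org.sleuthkit.autopsy.examples;

import java.util.logging.Level;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule;
import org.sleuthkit.autopsy.ingest.IngestJobContext;
import org.sleuthkit.datamodel.AnalysisResult;
import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.TskCoreException;

/**
 * Hypothetical analysis result ingest module (illustrative only).
 */
final class ExampleAnalysisResultIngestModule implements AnalysisResultIngestModule {

    private static final Logger logger = Logger.getLogger(ExampleAnalysisResultIngestModule.class.getName());

    @Override
    public void startUp(IngestJobContext context) throws IngestModuleException {
        // Per-ingest-job initialization would go here.
    }

    @Override
    public ProcessResult process(AnalysisResult result) {
        try {
            // Look up the content the result was posted against; the real
            // module-specific analysis of the result would go here.
            Content sourceContent = result.getParent();
            return ProcessResult.OK;
        } catch (TskCoreException ex) {
            // Log the name and object ID of the data being processed, plus the
            // exception, so that a stack trace appears in the application log.
            logger.log(Level.SEVERE, String.format("Failed to process analysis result '%s' (object ID = %d)", result.getName(), result.getId()), ex);
            return ProcessResult.ERROR;
        }
    }

    @Override
    public void shutDown() {
        // Per-ingest-job clean up would go here.
    }
}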
+ * @param moduleTemplates The ingest module templates to be used to + * construct the ingest modules for this pipeline. + * May be an empty list if this type of pipeline is + * not needed for the ingest job. + */ + AnalysisResultIngestPipeline(IngestJobExecutor ingestJobExecutor, List moduleTemplates) { + super(ingestJobExecutor, moduleTemplates); + } + + @Override + Optional> acceptModuleTemplate(IngestModuleTemplate template) { + Optional> module = Optional.empty(); + if (template.isAnalysisResultIngestModuleTemplate()) { + AnalysisResultIngestModule ingestModule = template.createAnalysisResultIngestModule(); + module = Optional.of(new AnalysisResultIngestPipelineModule(ingestModule, template.getModuleName())); + } + return module; + } + + @Override + void prepareForTask(AnalysisResultIngestTask task) throws IngestPipelineException { + } + + @Override + void cleanUpAfterTask(AnalysisResultIngestTask task) throws IngestPipelineException { + IngestManager.getInstance().setIngestTaskProgressCompleted(task); + } + + /** + * A decorator that adds ingest infrastructure operations to an analysis + * result ingest module. + */ + static final class AnalysisResultIngestPipelineModule extends IngestPipeline.PipelineModule { + + private final AnalysisResultIngestModule module; + + /** + * Constructs a decorator that adds ingest infrastructure operations to + * an analysis result ingest module. + * + * @param module The module. + * @param displayName The display name of the module. + */ + AnalysisResultIngestPipelineModule(AnalysisResultIngestModule module, String displayName) { + super(module, displayName); + this.module = module; + } + + @Override + void process(IngestJobExecutor ingestJobExecutor, AnalysisResultIngestTask task) throws IngestModule.IngestModuleException { + AnalysisResult result = task.getAnalysisResult(); + IngestManager.getInstance().setIngestTaskProgress(task, getDisplayName()); + module.process(result); + } + + } + +} diff --git a/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestTask.java b/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestTask.java new file mode 100755 index 0000000000..99f3412c24 --- /dev/null +++ b/Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestTask.java @@ -0,0 +1,59 @@ +/* + * Autopsy Forensic Browser + * + * Copyright 2021-2021 Basis Technology Corp. + * Contact: carrier sleuthkit org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.sleuthkit.autopsy.ingest; + +import org.sleuthkit.datamodel.AnalysisResult; + +/** + * An analysis result ingest task that will be executed by an ingest thread + * using a given ingest job executor. + */ +final class AnalysisResultIngestTask extends IngestTask { + + private final AnalysisResult analysisResult; + + /** + * Constructs an analysis result ingest task that will be executed by an + * ingest thread using a given ingest job executor. + * + * @param ingestJobExecutor The ingest job executor to use to execute the + * task. 
+ * @param analysisResult The analysis result to be processed. + */ + AnalysisResultIngestTask(IngestJobExecutor ingestJobExecutor, AnalysisResult analysisResult) { + super(analysisResult.getName(), ingestJobExecutor); + this.analysisResult = analysisResult; + } + + /** + * Gets the analysis result for this task. + * + * @return The analysis result. + */ + AnalysisResult getAnalysisResult() { + return analysisResult; + } + + @Override + void execute(long threadId) { + super.setThreadId(threadId); + getIngestJobExecutor().execute(this); + } + +} diff --git a/Core/src/org/sleuthkit/autopsy/ingest/Bundle.properties b/Core/src/org/sleuthkit/autopsy/ingest/Bundle.properties index cda1ec1503..4069a2c7f2 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/Bundle.properties +++ b/Core/src/org/sleuthkit/autopsy/ingest/Bundle.properties @@ -93,6 +93,7 @@ IngestJobTableModel.colName.rootQueued=Roots Queued IngestJobTableModel.colName.streamingQueued=Streamed Files Queued IngestJobTableModel.colName.dsQueued=DS Queued IngestJobTableModel.colName.artifactsQueued=Artifacts Queued +IngestJobTableModel.colName.resultsQueued=Results Queued ModuleTableModel.colName.module=Module ModuleTableModel.colName.duration=Duration IngestJobSettingsPanel.jButtonSelectAll.text=Select All diff --git a/Core/src/org/sleuthkit/autopsy/ingest/Bundle.properties-MERGED b/Core/src/org/sleuthkit/autopsy/ingest/Bundle.properties-MERGED index 11fbd0a9d8..9de7eedde0 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/Bundle.properties-MERGED +++ b/Core/src/org/sleuthkit/autopsy/ingest/Bundle.properties-MERGED @@ -1,5 +1,7 @@ CTL_RunIngestAction=Run Ingest FileIngestPipeline_SaveResults_Activity=Saving Results +# {0} - data source name +IngestJob_progress_analysisResultIngest_displayName=Analyzing analysis results from {0} IngestJobSettingsPanel.IngestModulesTableRenderer.info.message=A previous version of this ingest module has been run before on this data source. IngestJobSettingsPanel.IngestModulesTableRenderer.warning.message=This ingest module has been run before on this data source. IngestJobSettingsPanel.noPerRunSettings=The selected module has no per-run settings. @@ -109,6 +111,7 @@ IngestJobTableModel.colName.rootQueued=Roots Queued IngestJobTableModel.colName.streamingQueued=Streamed Files Queued IngestJobTableModel.colName.dsQueued=DS Queued IngestJobTableModel.colName.artifactsQueued=Artifacts Queued +IngestJobTableModel.colName.resultsQueued=Results Queued ModuleTableModel.colName.module=Module ModuleTableModel.colName.duration=Duration IngestJobSettingsPanel.jButtonSelectAll.text=Select All diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestModule.java b/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestModule.java index a37816d6be..f38a4a3ebf 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestModule.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestModule.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2021 Basis Technology Corp. + * Copyright 2021-2021 Basis Technology Corp. 
* Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestPipeline.java index 865b29cb59..0caedd3629 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataArtifactIngestPipeline.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2021 Basis Technology Corp. + * Copyright 2021-2021 Basis Technology Corp. * Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -58,6 +58,7 @@ final class DataArtifactIngestPipeline extends IngestPipeline sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -37,10 +37,10 @@ final class DataArtifactIngestTask extends IngestTask { * @param artifact The data artifact to be processed. */ DataArtifactIngestTask(IngestJobExecutor ingestJobExecutor, DataArtifact artifact) { - super(ingestJobExecutor); + super(artifact.getName(), ingestJobExecutor); this.artifact = artifact; } - + /** * Gets the data artifact for this task. * diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestTask.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestTask.java index ddb1a7b471..914924606d 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestTask.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestTask.java @@ -32,9 +32,9 @@ final class DataSourceIngestTask extends IngestTask { * task. */ DataSourceIngestTask(IngestJobExecutor ingestJobExecutor) { - super(ingestJobExecutor); + super(ingestJobExecutor.getDataSource().getName(), ingestJobExecutor); } - + @Override void execute(long threadId) { super.setThreadId(threadId); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestTask.java b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestTask.java index 63bf99dfc4..22d28cd39b 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestTask.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestTask.java @@ -41,7 +41,7 @@ final class FileIngestTask extends IngestTask { * @param file The file to be processed. */ FileIngestTask(IngestJobExecutor ingestJobPipeline, AbstractFile file) { - super(ingestJobPipeline); + super(file.getName(), ingestJobPipeline); this.file = file; fileId = file.getId(); } @@ -57,10 +57,10 @@ final class FileIngestTask extends IngestTask { * @param fileId The object ID of the file to be processed. */ FileIngestTask(IngestJobExecutor ingestJobPipeline, long fileId) { - super(ingestJobPipeline); + super("", ingestJobPipeline); this.fileId = fileId; } - + /** * Gets the object ID of the file for this task. 
* @@ -81,6 +81,9 @@ final class FileIngestTask extends IngestTask { synchronized AbstractFile getFile() throws TskCoreException { if (file == null) { file = Case.getCurrentCase().getSleuthkitCase().getAbstractFileById(fileId); + if (file != null) { + setContentName(file.getName()); + } } return file; } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 350096d626..e16a889086 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -27,6 +27,7 @@ import java.util.logging.Level; import org.openide.util.NbBundle; import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.datamodel.AbstractFile; +import org.sleuthkit.datamodel.AnalysisResult; import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.DataArtifact; @@ -166,6 +167,16 @@ public final class IngestJob { ingestModuleExecutor.addDataArtifacts(dataArtifacts); } + /** + * Adds one or more analysis results to this ingest job for processing by + * its analysis result ingest modules. + * + * @param results The analysis results. + */ + void addAnalysisResults(List results) { + ingestModuleExecutor.addAnalysisResults(results); + } + /** * Starts data source level analysis for this job if it is running in * streaming ingest mode. diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java index 47b4d9b601..4a15bf2e4f 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobExecutor.java @@ -19,6 +19,7 @@ package org.sleuthkit.autopsy.ingest; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.LinkedHashMap; @@ -27,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -43,7 +45,6 @@ import org.sleuthkit.autopsy.core.RuntimeProperties; import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.autopsy.coreutils.NetworkUtils; import org.sleuthkit.autopsy.coreutils.ThreadConfined; -import org.sleuthkit.autopsy.ingest.IngestTasksScheduler.IngestJobTasksSnapshot; import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.IngestJobInfo; @@ -54,12 +55,14 @@ import org.sleuthkit.datamodel.SleuthkitCase; import org.sleuthkit.datamodel.TskCoreException; import org.sleuthkit.autopsy.modules.interestingitems.FilesSet; import org.sleuthkit.autopsy.python.FactoryClassNameNormalizer; +import org.sleuthkit.datamodel.AnalysisResult; import org.sleuthkit.datamodel.DataArtifact; import org.sleuthkit.datamodel.DataSource; /** - * Manages the construction, start up, execution, and shut down of the ingest - * module pipelines for an ingest job. + * Executes an ingest job by orchestrating the construction, start up, running, + * and shut down of the ingest module pipelines that perform the ingest tasks + * for the job. 
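For context on where these results come from: an ingest module typically creates an analysis result on some content and posts it to the blackboard, and the ingest framework then routes the posted result to the running job via the addAnalysisResults() method added above. A rough sketch follows; the Sleuth Kit calls and signatures used here (Content.newAnalysisResult(...), Blackboard.postArtifact(...), the Score constants) are assumptions that should be checked against the datamodel version in use, not part of this change.

import java.util.Collections;
import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.autopsy.casemodule.NoCurrentCaseException;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.AnalysisResultAdded;
import org.sleuthkit.datamodel.Blackboard;
import org.sleuthkit.datamodel.BlackboardArtifact;
import org.sleuthkit.datamodel.Score;
import org.sleuthkit.datamodel.TskCoreException;

final class ExampleResultPoster {

    /*
     * Creates and posts an example analysis result on a file; posting is what
     * ultimately feeds the result into the analysis result ingest pipeline.
     */
    static void postExampleResult(AbstractFile file, String moduleName) throws TskCoreException, Blackboard.BlackboardException, NoCurrentCaseException {
        Blackboard blackboard = Case.getCurrentCaseThrows().getSleuthkitCase().getBlackboard();
        AnalysisResultAdded added = file.newAnalysisResult(
                BlackboardArtifact.Type.TSK_INTERESTING_FILE_HIT,
                new Score(Score.Significance.LIKELY_NOTABLE, Score.Priority.NORMAL),
                "Matched example rule",   // conclusion
                "Example rule set",       // configuration
                "Example justification",  // justification
                Collections.emptyList()); // attributes
        blackboard.postArtifact(added.getAnalysisResult(), moduleName);
    }
}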
*/ final class IngestJobExecutor { @@ -74,9 +77,9 @@ final class IngestJobExecutor { private static final Pattern JYTHON_MODULE_REGEX = Pattern.compile("org\\.python\\.proxies\\.(.+?)\\$(.+?)(\\$[0-9]*)?$"); /* - * These fields are the identity of this object: the parent ingest job, the - * user's ingest job settings, and the data source to be analyzed by the - * ingest module pipelines. Optionally, there is a set of files to be + * These fields are the identity of this object: the ingest job to be + * executed, the ingest job settings, and the data source to be analyzed by + * the ingest module pipelines. Optionally, there is a set of files to be * analyzed instead of analyzing all of the files in the data source. */ private final IngestJob ingestJob; @@ -89,87 +92,85 @@ final class IngestJobExecutor { * There are separate pipelines for high-priority and low priority data * source level ingest modules. These pipelines are run sequentially, not * simultaneously. - */ - private DataSourceIngestPipeline highPriorityDataSourceIngestPipeline; - private DataSourceIngestPipeline lowPriorityDataSourceIngestPipeline; - private volatile DataSourceIngestPipeline currentDataSourceIngestPipeline; - - /* + * * There are one or more identical file ingest module pipelines, based on * the number of file ingest threads in the ingest manager. References to * the file ingest pipelines are put into two collections, each with its own * purpose. A blocking queue allows file ingest threads to take and return - * file ingest pipelines as they work through the file ingest tasks for one - * or more ingest jobs. Having the same number of pipelines as threads - * ensures that a file ingest thread will never be idle as long as there are - * file ingest tasks still to do, regardless of the number of ingest jobs in + * file ingest pipeline copies, as they work through the file ingest tasks + * for the job. Having the same number of pipelines as file ingest threads + * ensures that a thread will never be idle, as long as there are file + * ingest tasks still to do, regardless of the number of ingest jobs in * progress. Additionally, a fixed list is used to cycle through the file - * ingest module pipelines to make ingest progress snapshots. + * ingest module pipelines when making ingest progress snapshots. + * + * There is at most one data artifact ingest module pipeline. + * + * There is at most one analysis result ingest module pipeline. */ + private DataSourceIngestPipeline highPriorityDataSourceIngestPipeline; + private DataSourceIngestPipeline lowPriorityDataSourceIngestPipeline; + private volatile DataSourceIngestPipeline currentDataSourceIngestPipeline; private final LinkedBlockingQueue fileIngestPipelinesQueue = new LinkedBlockingQueue<>(); private final List fileIngestPipelines = new ArrayList<>(); + private DataArtifactIngestPipeline dataArtifactIngestPipeline; + private AnalysisResultIngestPipeline analysisResultIngestPipeline; /* - * There is at most one data artifact ingest module pipeline. + * An ingest job transistion through several states during its execution. */ - private DataArtifactIngestPipeline artifactIngestPipeline; - - /* - * The construction, start up, execution, and shut down of the ingest module - * pipelines for an ingest job is done in stages. - */ - private static enum IngestJobStage { + private static enum IngestJobState { /* - * In this stage, the ingest module pipelines are constructed per the - * user's ingest job settings. 
This stage ends when all of the ingest - * module pipelines for the ingest job are ready to run. + * In this once-per-job state, the ingest module pipelines for the + * ingest job are constructed per the ingest job settings. This state + * ends when all of the ingest module pipelines for the ingest job are + * ready to run. */ - PIPELINES_START_UP, + PIPELINES_STARTING_UP, /* - * This stage is unique to a streaming mode ingest job. In this stage, + * This state is unique to a streaming mode ingest job. In this state, * file ingest module pipelines are analyzing files streamed to them via * addStreamedFiles(). If the ingest job is configured to have a data - * artifact ingest pipeline, that pipeline is also analyzing any data - * artifacts generated by the file ingest modules. This stage ends when - * addStreamedDataSource() is called. + * artifact and/or analysis result ingest pipeline, those pipelines are + * also analyzing any data artifacts and/or analysis results generated + * by the file ingest modules. The transition out of this state occurs + * when addStreamedDataSource() is called. */ STREAMED_FILE_ANALYSIS_ONLY, /* - * In this stage, file ingest module pipelines and/or a pipeline of + * In this state, file ingest module pipelines and/or a pipeline of * higher-priority data source level ingest modules are running. If the - * ingest job is configured to have a data artifact ingest pipeline, - * that pipeline is also analyzing any data artifacts generated by the - * file and/or data source level ingest modules. + * ingest job is configured to have a data artifact and/or analysis + * result ingest pipeline, those pipelines are also analyzing any data + * artifacts and/or analysis results generated by the file and/or data + * source level ingest modules. The transition out of this state occurs + * when all of the currently scheduled ingest tasks are completed. */ FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS, /** - * In this stage, a pipeline of lower-priority, usually long-running + * In this state, a pipeline of lower-priority, usually long-running * data source level ingest ingest modules is running. If the ingest job - * is configured to have a data artifact ingest pipeline, that pipeline - * is also analyzing any data artifacts generated by the data source - * level ingest modules. + * is configured to have a data artifact and/or analysis result ingest + * pipeline, those pipelines are also analyzing any data artifacts + * and/or analysis results generated by the data source level ingest + * modules. he transition out of this state occurs when all of the + * currently scheduled ingest tasks are completed. */ LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS, /** - * In this stage, The pipeline is shutting down its ingest modules. + * In this state, the ingest job executor is shutting down ingest + * modules pipelines, either during transitions between states in which + * analysis is performed, or at the end of the ingest job. */ - PIPELINES_SHUT_DOWN + PIPELINES_SHUTTING_DOWN }; + private final ReentrantReadWriteLock jobStateLock = new ReentrantReadWriteLock(); + private volatile IngestJobState jobState = IngestJobExecutor.IngestJobState.PIPELINES_STARTING_UP; /* - * The stage field is volatile to allow it to be read by multiple threads. - * So the stage transition lock is used not to guard the stage field, but to - * coordinate stage transitions. 
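The locking scheme that replaces this arrangement follows a standard read/write pattern: state transitions take the write lock of jobStateLock, while per-task execution paths take the read lock, so many ingest threads can process tasks concurrently but never overlap a state transition. A minimal, generic sketch of that pattern (field and state names are illustrative, not the executor's actual members):

import java.util.concurrent.locks.ReentrantReadWriteLock;

final class StateGuardSketch {

    private enum State { STARTING_UP, ANALYZING, SHUTTING_DOWN }

    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    // Volatile so the current state can still be read without a lock (e.g., for logging).
    private volatile State state = State.STARTING_UP;

    /** State transitions are exclusive: they take the write lock. */
    void transitionTo(State newState) {
        stateLock.writeLock().lock();
        try {
            state = newState;
        } finally {
            stateLock.writeLock().unlock();
        }
    }

    /** Task execution is shared: many worker threads may hold the read lock at once. */
    void executeTask(Runnable task) {
        stateLock.readLock().lock();
        try {
            if (state == State.ANALYZING) {
                task.run();
            }
        } finally {
            stateLock.readLock().unlock();
        }
    }
}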
- */ - private volatile IngestJobStage stage = IngestJobExecutor.IngestJobStage.PIPELINES_START_UP; - private final Object stageTransitionLock = new Object(); - - /* - * During each stage of the ingest job, this object interacts with the - * ingest task scheduler to create ingest tasks for analyzing the data - * source, files and data artifacts that are the subject of the ingest job. - * The scheduler queues the tasks for the ingest manager's ingest threads. - * The ingest tasks are the units of work for the ingest module pipelines. + * The ingest job executor interacts with the ingest task scheduler to + * create ingest tasks for job. The scheduler queues the ingest tasks for + * the ingest manager's ingest threads. */ private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance(); @@ -184,6 +185,14 @@ final class IngestJobExecutor { * cancellation means that there can be a variable length delay between a * cancellation request and its fulfillment. Analysis already completed at * the time that cancellation occurs is NOT discarded. + * + * Note that the DataSourceIngestModule interface does not currently have a + * cancel() API. As a consequence, cancelling an individual data source + * ingest module requires setting and then unsetting a cancellation flag. + * Because of this, there is no ironclad guarantee that the correct module + * will be cancelled. We are relying on the module being long-running to + * avoid a race condition between module cancellation and the transition of + * the execution of a data source level ingest task to another module. */ private volatile boolean currentDataSourceIngestModuleCancelled; private final List cancelledDataSourceIngestModules = new CopyOnWriteArrayList<>(); @@ -199,8 +208,8 @@ final class IngestJobExecutor { * A layer of abstraction to allow alternate representations of progress * could be used here, as it is in other places in the application (see * implementations and usage of - * org.sleuthkit.autopsy.progress.ProgressIndicator interface), to better - * decouple this object from the application's presentation layer. + * org.sleuthkit.autopsy.progress.ProgressIndicator interface). This would + * better decouple this object from the application's presentation layer. */ private final boolean usingNetBeansGUI; @ThreadConfined(type = ThreadConfined.ThreadType.AWT) @@ -213,6 +222,8 @@ final class IngestJobExecutor { private ProgressHandle fileIngestProgressBar; @ThreadConfined(type = ThreadConfined.ThreadType.AWT) private ProgressHandle artifactIngestProgressBar; + @ThreadConfined(type = ThreadConfined.ThreadType.AWT) + private ProgressHandle resultIngestProgressBar; /* * The ingest job details that are stored to the case database are tracked @@ -232,23 +243,26 @@ final class IngestJobExecutor { private final Set pausedIngestThreads = new HashSet<>(); /** - * Constructs an object that manages the construction, start up, execution, - * and shut down of the ingest module pipelines for an ingest job. + * Constructs an object that executes an ingest job by orchestrating the + * construction, start up, running, and shut down of the ingest module + * pipelines that perform the ingest tasks for the job. * * @param ingestJob The ingest job. - * @param dataSource The data source. + * @param dataSource The data source that is the subject of the ingest job. * @param files A subset of the files from the data source. If the list * is empty, ALL of the files in the data source are an * analyzed. 
* @param settings The ingest job settings. * - * @throws InterruptedException Exception thrown if the thread in which the - * pipeline is being created is interrupted. + * @throws InterruptedException The exception is thrown if the thread in + * which the pipeline is being created is + * interrupted. */ IngestJobExecutor(IngestJob ingestJob, Content dataSource, List files, IngestJobSettings settings) throws InterruptedException { if (!(dataSource instanceof DataSource)) { throw new IllegalArgumentException("Passed dataSource that does not implement the DataSource interface"); //NON-NLS } + // RJCTODO: Refactor so that only the job is passed in and the other params are obtained from the job. this.ingestJob = ingestJob; this.dataSource = (DataSource) dataSource; this.files = new ArrayList<>(); @@ -256,8 +270,13 @@ final class IngestJobExecutor { this.settings = settings; usingNetBeansGUI = RuntimeProperties.runningWithGUI(); createTime = new Date().getTime(); - stage = IngestJobStage.PIPELINES_START_UP; - createIngestModulePipelines(); + jobStateLock.writeLock().lock(); + try { + jobState = IngestJobState.PIPELINES_STARTING_UP; + createIngestModulePipelines(); + } finally { + jobStateLock.writeLock().unlock(); + } } /** @@ -349,6 +368,8 @@ final class IngestJobExecutor { Map jythonFileModuleTemplates = new LinkedHashMap<>(); Map javaArtifactModuleTemplates = new LinkedHashMap<>(); Map jythonArtifactModuleTemplates = new LinkedHashMap<>(); + Map javaResultModuleTemplates = new LinkedHashMap<>(); + Map jythonResultModuleTemplates = new LinkedHashMap<>(); for (IngestModuleTemplate template : enabledTemplates) { if (template.isDataSourceIngestModuleTemplate()) { addModuleTemplateToSortingMap(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, template); @@ -359,30 +380,37 @@ final class IngestJobExecutor { if (template.isDataArtifactIngestModuleTemplate()) { addModuleTemplateToSortingMap(javaArtifactModuleTemplates, jythonArtifactModuleTemplates, template); } + if (template.isAnalysisResultIngestModuleTemplate()) { + addModuleTemplateToSortingMap(javaResultModuleTemplates, jythonResultModuleTemplates, template); + } } /** * Take the module templates that have pipeline configuration entries * out of the buckets and add them to ingest module pipeline templates - * in the order prescribed by the pipeline configuration. + * in the order prescribed by the pipeline configuration. There is + * currently no pipeline configuration file support for data artifact or + * analysis result ingest module pipelines. */ IngestPipelinesConfiguration pipelineConfig = IngestPipelinesConfiguration.getInstance(); List firstStageDataSourcePipelineTemplate = createIngestPipelineTemplate(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageOneDataSourceIngestPipelineConfig()); List secondStageDataSourcePipelineTemplate = createIngestPipelineTemplate(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageTwoDataSourceIngestPipelineConfig()); List filePipelineTemplate = createIngestPipelineTemplate(javaFileModuleTemplates, jythonFileModuleTemplates, pipelineConfig.getFileIngestPipelineConfig()); List artifactPipelineTemplate = new ArrayList<>(); + List resultsPipelineTemplate = new ArrayList<>(); /** * Add any ingest module templates remaining in the buckets to the * appropriate ingest module pipeline templates. 
Data source level * ingest modules templates that were not listed in the pipeline * configuration are added to the first stage data source pipeline - * template, Java modules are added before Jython modules and Core + * template, Java modules are added before Jython modules, and Core * Autopsy modules are added before third party modules. */ addToIngestPipelineTemplate(firstStageDataSourcePipelineTemplate, javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates); addToIngestPipelineTemplate(filePipelineTemplate, javaFileModuleTemplates, jythonFileModuleTemplates); addToIngestPipelineTemplate(artifactPipelineTemplate, javaArtifactModuleTemplates, jythonArtifactModuleTemplates); + addToIngestPipelineTemplate(resultsPipelineTemplate, javaResultModuleTemplates, jythonResultModuleTemplates); /** * Construct the ingest module pipelines from the ingest module pipeline @@ -396,7 +424,8 @@ final class IngestJobExecutor { fileIngestPipelinesQueue.put(pipeline); fileIngestPipelines.add(pipeline); } - artifactIngestPipeline = new DataArtifactIngestPipeline(this, artifactPipelineTemplate); + dataArtifactIngestPipeline = new DataArtifactIngestPipeline(this, artifactPipelineTemplate); + analysisResultIngestPipeline = new AnalysisResultIngestPipeline(this, resultsPipelineTemplate); } /** @@ -472,32 +501,6 @@ final class IngestJobExecutor { return settings.getFileFilter(); } - /** - * Checks to see if there is at least one ingest module to run. - * - * @return True or false. - */ - boolean hasIngestModules() { - return hasFileIngestModules() - || hasHighPriorityDataSourceIngestModules() - || hasLowPriorityDataSourceIngestModules() - || hasDataArtifactIngestModules(); - } - - /** - * Checks to see if there is at least one data source level ingest module to - * run. - * - * @return True or false. - */ - boolean hasDataSourceIngestModules() { - if (stage == IngestJobStage.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS) { - return hasLowPriorityDataSourceIngestModules(); - } else { - return hasHighPriorityDataSourceIngestModules(); - } - } - /** * Checks to see if there is at least one high priority data source level * ingest module to run. @@ -523,7 +526,7 @@ final class IngestJobExecutor { * * @return True or false. */ - boolean hasFileIngestModules() { + private boolean hasFileIngestModules() { if (!fileIngestPipelines.isEmpty()) { return !fileIngestPipelines.get(0).isEmpty(); } @@ -536,13 +539,23 @@ final class IngestJobExecutor { * * @return True or false. */ - boolean hasDataArtifactIngestModules() { - return (artifactIngestPipeline.isEmpty() == false); + private boolean hasDataArtifactIngestModules() { + return (dataArtifactIngestPipeline.isEmpty() == false); + } + + /** + * Checks to see if there is at least one analysis result ingest module to + * run. + * + * @return True or false. + */ + private boolean hasAnalysisResultIngestModules() { + return (analysisResultIngestPipeline.isEmpty() == false); } /** * Determnines which ingest job stage to start in and starts up the ingest - * module pipelines. + * module pipelines for all of the stages. * * @return A collection of ingest module startup errors, empty on success. 
*/ @@ -550,7 +563,7 @@ final class IngestJobExecutor { List errors = startUpIngestModulePipelines(); if (errors.isEmpty()) { recordIngestJobStartUpInfo(); - if (hasHighPriorityDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) { + if (hasHighPriorityDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules() || hasAnalysisResultIngestModules()) { if (ingestJob.getIngestMode() == IngestJob.Mode.STREAMING) { startStreamingModeAnalysis(); } else { @@ -564,13 +577,12 @@ final class IngestJobExecutor { } /** - * Starts up the ingest module pipelines in this ingest. Note that all of - * the child pipelines are started so that any and all start up errors can - * be returned to the caller. It is important to capture all of the errors, - * because the ingest job will be automatically cancelled and the errors - * will be reported to the user so either the issues can be addressed or the - * modules that can't start up can be disabled before the ingest job is - * attempted again. + * Starts up the ingest module pipelines. Note that ALL of the pipelines are + * started, so that any and all start up errors can be returned to the + * caller. It is important to capture all of the errors, because the ingest + * job will be automatically cancelled, and the errors will be reported to + * the user. This allows the user to either address the issues, or to + * disable the modules that can't start up, and attempt the job again. * * @return A list of ingest module startup errors, empty on success. */ @@ -589,7 +601,8 @@ final class IngestJobExecutor { break; } } - errors.addAll(startUpIngestModulePipeline(artifactIngestPipeline)); + errors.addAll(startUpIngestModulePipeline(dataArtifactIngestPipeline)); + errors.addAll(startUpIngestModulePipeline(analysisResultIngestPipeline)); return errors; } @@ -665,64 +678,66 @@ final class IngestJobExecutor { /** * Starts analysis for a batch mode ingest job. For a batch mode job, all of * the files in the data source (excepting carved and derived files) have - * already been added to the case database by the data source processor and - * analysis starts in the file and high priority data source level analysis - * stage. + * already been added to the case database by the data source processor + * (DSP) and analysis starts in the file and high priority data source level + * analysis stage. */ private void startBatchModeAnalysis() { - synchronized (stageTransitionLock) { + jobStateLock.writeLock().lock(); + try { logInfoMessage("Starting ingest job in batch mode"); //NON-NLS - stage = IngestJobStage.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; + jobState = IngestJobState.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; + currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline; if (hasFileIngestModules()) { /* * Do an estimate of the total number of files to be analyzed. - * This number will be used to estimate of how many files remain - * to be analyzed as each file ingest task is completed. The - * numbers are estimates because file analysis can add carved - * files and/or derived files. + * This will be used to estimate of how many files remain to be + * analyzed as each file ingest task is completed. The numbers + * are estimates because analysis can add carved files and/or + * derived files to the job. */ if (files.isEmpty()) { /* - * Do a count of the files the data source processor has - * added to the case database. 
+ * Do a count of the files the data source processor (DSP) + * has added to the case database. */ estimatedFilesToProcess = dataSource.accept(new GetFilesCountVisitor()); + taskScheduler.scheduleFileIngestTasks(this, files); } else { /* - * Use the number of files in the specified subset of all of - * the files for the data source. + * Otherwise, this job is analyzing a user-specified subset + * of the files in the data source. */ estimatedFilesToProcess = files.size(); + taskScheduler.scheduleFileIngestTasks(this, Collections.emptyList()); } startFileIngestProgressBar(); } if (hasHighPriorityDataSourceIngestModules()) { + taskScheduler.scheduleDataSourceIngestTask(this); startDataSourceIngestProgressBar(); } if (hasDataArtifactIngestModules()) { - startArtifactIngestProgressBar(); + /* + * Note that even if there are no other ingest module pipelines, + * analysis of any data artifacts already in the case database + * will be performed. + */ + taskScheduler.scheduleDataArtifactIngestTasks(this); + startDataArtifactIngestProgressBar(); } - /* - * Make the high priority data source level ingest module pipeline - * the current data source level ingest module pipeline. - */ - currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline; - - /* - * Schedule ingest tasks. If only analyzing a subset of the files in - * the data source, the current assumption is that only file ingest - * tasks for those files need to be scheduled. Data artifact ingest - * tasks will be scheduled as data artifacts produced by the file - * analysis are posted to the blackboard. - */ - if (!files.isEmpty() && hasFileIngestModules()) { - taskScheduler.scheduleFileIngestTasks(this, files); - } else if (hasHighPriorityDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) { - taskScheduler.scheduleIngestTasks(this); + if (hasAnalysisResultIngestModules()) { + /* + * Note that even if there are no other ingest module pipelines, + * analysis of any analysis results already in the case database + * will be performed. + */ + taskScheduler.scheduleAnalysisResultIngestTasks(this); + startAnalysisResultIngestProgressBar(); } /* @@ -735,19 +750,24 @@ final class IngestJobExecutor { * a check here. */ checkForStageCompleted(); + + } finally { + jobStateLock.writeLock().unlock(); } } /** - * Starts analysis for a streaming mode ingest job. For a streaming mode - * job, a data source processor streams files to this ingest job executor as - * it adds the files to the case database, and file level analysis can begin - * before data source level analysis. + * Starts analysis for a streaming mode ingest job. Streaming mode is + * typically used to allow a data source processor (DSP) to streams file to + * this ingest job executor as it adds the files to the case database. This + * alternative to waiting until the DSP completes its processing allows file + * level analysis to begin before data source level analysis. */ private void startStreamingModeAnalysis() { - synchronized (stageTransitionLock) { + jobStateLock.writeLock().lock(); + try { logInfoMessage("Starting ingest job in streaming mode"); //NON-NLS - stage = IngestJobStage.STREAMED_FILE_ANALYSIS_ONLY; + jobState = IngestJobState.STREAMED_FILE_ANALYSIS_ONLY; if (hasFileIngestModules()) { /* @@ -758,10 +778,10 @@ final class IngestJobExecutor { * scheduled later, via addStreamedDataSource(). 
* * Note that because estimated files remaining to process still - * has its initial value of zero, the fle ingest progress bar + * has its initial value of zero, the file ingest progress bar * will start in the "indeterminate" state. A rough estimate of - * the files to processed will be computed later, when all of - * the files have been added to the case database, as signaled + * the files to be processed will be computed later, when all of + * the files have been added to the case database, as signalled * by a call to the addStreamedDataSource(). */ estimatedFilesToProcess = 0; @@ -769,35 +789,50 @@ final class IngestJobExecutor { } if (hasDataArtifactIngestModules()) { - startArtifactIngestProgressBar(); - /* - * Schedule artifact ingest tasks for any artifacts currently in - * the case database. This needs to be done before any files or - * the data source are streamed in to avoid analyzing the data - * artifacts added to the case database by those tasks twice. - * This constraint is implemented by restricting construction of - * a streaming mode IngestJob to - * IngestManager.openIngestStream(), which constructs and starts - * the job before returning the IngestStream. This means that - * the code in this method will run before addStreamedFiles() or - * addStreamedDataSource() can be called via the IngestStream. + * Start the data artifact progress bar and schedule ingest + * tasks for any data artifacts currently in the case database. + * This needs to be done BEFORE any files or the data source are + * streamed in to ensure that any data artifacts added to the + * case database by the file and data source ingest tasks are + * not analyzed twice. This works here because the ingest + * manager has not yet returned the ingest stream object that is + * used to call addStreamedFiles() and addStreamedDataSource(). */ + startDataArtifactIngestProgressBar(); taskScheduler.scheduleDataArtifactIngestTasks(this); } + + if (hasAnalysisResultIngestModules()) { + /* + * Start the analysis result progress bar and schedule ingest + * tasks for any analysis results currently in the case + * database. This needs to be done BEFORE any files or the data + * source are streamed in to ensure that any analysis results + * added to the case database by the file and data source ingest + * tasks are not analyzed twice. This works here because the + * ingest manager has not yet returned the ingest stream object + * that is used to call addStreamedFiles() and + * addStreamedDataSource(). + */ + startAnalysisResultIngestProgressBar(); + taskScheduler.scheduleAnalysisResultIngestTasks(this); + } + } finally { + jobStateLock.writeLock().unlock(); } } /** * Signals in streaming mode that all of the files have been added to the - * case database and streamed in, and the data source is now ready for - * analysis. + * case database and streamed in to this ingest job executor, and the data + * source is now ready for analysis. 
*/ void addStreamedDataSource() { - synchronized (stageTransitionLock) { + jobStateLock.writeLock().lock(); + try { logInfoMessage("Starting full first stage analysis in streaming mode"); //NON-NLS - stage = IngestJobExecutor.IngestJobStage.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; - currentDataSourceIngestPipeline = highPriorityDataSourceIngestPipeline; + jobState = IngestJobExecutor.IngestJobState.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; if (hasFileIngestModules()) { /* @@ -813,9 +848,9 @@ final class IngestJobExecutor { if (hasHighPriorityDataSourceIngestModules()) { /* * Start a data source level ingest progress bar in the lower - * right hand corner of the main application window. The file - * and data artifact ingest progress bars were already started - * in startStreamingModeAnalysis(). + * right hand corner of the main application window. The file, + * data artifact, and analysis result ingest progress bars were + * already started in startStreamingModeAnalysis(). */ startDataSourceIngestProgressBar(); @@ -834,6 +869,8 @@ final class IngestJobExecutor { */ checkForStageCompleted(); } + } finally { + jobStateLock.writeLock().unlock(); } } @@ -841,14 +878,17 @@ final class IngestJobExecutor { * Starts low priority data source analysis. */ private void startLowPriorityDataSourceAnalysis() { - synchronized (stageTransitionLock) { + jobStateLock.writeLock().lock(); + try { if (hasLowPriorityDataSourceIngestModules()) { logInfoMessage("Starting low priority data source analysis"); //NON-NLS - stage = IngestJobExecutor.IngestJobStage.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; - startDataSourceIngestProgressBar(); + jobState = IngestJobExecutor.IngestJobState.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS; currentDataSourceIngestPipeline = lowPriorityDataSourceIngestPipeline; + startDataSourceIngestProgressBar(); taskScheduler.scheduleDataSourceIngestTask(this); } + } finally { + jobStateLock.writeLock().unlock(); } } @@ -856,14 +896,13 @@ final class IngestJobExecutor { * Starts a NetBeans progress bar for data artifacts analysis in the lower * right hand corner of the main application window. The progress bar * provides the user with a task cancellation button. Pressing it cancels - * the ingest job. Analysis already completed at the time that cancellation - * occurs is NOT discarded. + * the entire ingest job. Analysis already completed at the time that + * cancellation occurs is NOT discarded. */ - private void startArtifactIngestProgressBar() { + private void startDataArtifactIngestProgressBar() { if (usingNetBeansGUI) { SwingUtilities.invokeLater(() -> { - String displayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataArtifactIngest.displayName", this.dataSource.getName()); - artifactIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() { + artifactIngestProgressBar = ProgressHandle.createHandle(NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataArtifactIngest.displayName", this.dataSource.getName()), new Cancellable() { @Override public boolean cancel() { new Thread(() -> { @@ -878,6 +917,35 @@ final class IngestJobExecutor { } } + /** + * Starts a NetBeans progress bar for analysis results analysis in the lower + * right hand corner of the main application window. The progress bar + * provides the user with a task cancellation button. Pressing it cancels + * the entire ingest job. Analysis already completed at the time that + * cancellation occurs is NOT discarded. 
+ */ + @NbBundle.Messages({ + "# {0} - data source name", + "IngestJob_progress_analysisResultIngest_displayName=Analyzing analysis results from {0}" + }) + private void startAnalysisResultIngestProgressBar() { + if (usingNetBeansGUI) { + SwingUtilities.invokeLater(() -> { + resultIngestProgressBar = ProgressHandle.createHandle(Bundle.IngestJob_progress_analysisResultIngest_displayName(dataSource.getName()), new Cancellable() { + @Override + public boolean cancel() { + new Thread(() -> { + IngestJobExecutor.this.cancel(IngestJob.CancellationReason.USER_CANCELLED); + }).start(); + return true; + } + }); + resultIngestProgressBar.start(); + resultIngestProgressBar.switchToIndeterminate(); + }); + } + } + /** * Starts a NetBeans progress bar for data source level analysis in the * lower right hand corner of the main application window. The progress bar @@ -889,8 +957,7 @@ final class IngestJobExecutor { private void startDataSourceIngestProgressBar() { if (usingNetBeansGUI) { SwingUtilities.invokeLater(() -> { - String displayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.initialDisplayName", dataSource.getName()); - dataSourceIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() { + dataSourceIngestProgressBar = ProgressHandle.createHandle(NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.initialDisplayName", dataSource.getName()), new Cancellable() { @Override public boolean cancel() { /* @@ -925,15 +992,14 @@ final class IngestJobExecutor { /** * Starts a NetBeans progress bar for file analysis in the lower right hand * corner of the main application window. The progress bar provides the user - * with a task cancellation button. Pressing it cancels the ingest job. - * Analysis already completed at the time that cancellation occurs is NOT - * discarded. + * with a task cancellation button. Pressing it cancels the entire ingest + * job. Analysis already completed at the time that cancellation occurs is + * NOT discarded. */ private void startFileIngestProgressBar() { if (usingNetBeansGUI) { SwingUtilities.invokeLater(() -> { - String displayName = NbBundle.getMessage(getClass(), "IngestJob.progress.fileIngest.displayName", dataSource.getName()); - fileIngestProgressBar = ProgressHandle.createHandle(displayName, new Cancellable() { + fileIngestProgressBar = ProgressHandle.createHandle(NbBundle.getMessage(getClass(), "IngestJob.progress.fileIngest.displayName", dataSource.getName()), new Cancellable() { @Override public boolean cancel() { new Thread(() -> { @@ -949,7 +1015,7 @@ final class IngestJobExecutor { } /** - * Finishes the first stage progress bars. + * Finishes the first stage ingest progress bars. */ private void finishFirstStageProgressBars() { if (usingNetBeansGUI) { @@ -968,7 +1034,7 @@ final class IngestJobExecutor { } /** - * Finishes all current progress bars. + * Finishes all of the ingest progress bars. */ private void finishAllProgressBars() { if (usingNetBeansGUI) { @@ -987,6 +1053,11 @@ final class IngestJobExecutor { artifactIngestProgressBar.finish(); artifactIngestProgressBar = null; } + + if (resultIngestProgressBar != null) { + resultIngestProgressBar.finish(); + resultIngestProgressBar = null; + } }); } } @@ -996,12 +1067,13 @@ final class IngestJobExecutor { * completed and does a stage transition if they are. 
*/ private void checkForStageCompleted() { - synchronized (stageTransitionLock) { - if (stage == IngestJobStage.STREAMED_FILE_ANALYSIS_ONLY) { + jobStateLock.writeLock().lock(); + try { + if (jobState == IngestJobState.STREAMED_FILE_ANALYSIS_ONLY) { return; } - if (taskScheduler.currentTasksAreCompleted(this)) { - switch (stage) { + if (taskScheduler.currentTasksAreCompleted(getIngestJobId())) { + switch (jobState) { case FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS: finishFileAndHighPriorityDataSrcAnalysis(); break; @@ -1010,6 +1082,8 @@ final class IngestJobExecutor { break; } } + } finally { + jobStateLock.writeLock().unlock(); } } @@ -1019,61 +1093,58 @@ final class IngestJobExecutor { * level analysis stage, if appropriate. */ private void finishFileAndHighPriorityDataSrcAnalysis() { - synchronized (stageTransitionLock) { - logInfoMessage("Finished file and high-priority data source analysis"); //NON-NLS - + jobStateLock.writeLock().lock(); + try { + jobState = IngestJobState.PIPELINES_SHUTTING_DOWN; shutDownIngestModulePipeline(currentDataSourceIngestPipeline); while (!fileIngestPipelinesQueue.isEmpty()) { FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll(); shutDownIngestModulePipeline(pipeline); } - finishFirstStageProgressBars(); + logInfoMessage("Finished file and high-priority data source analysis"); //NON-NLS if (!jobCancelled && hasLowPriorityDataSourceIngestModules()) { startLowPriorityDataSourceAnalysis(); } else { shutDown(); } + } finally { + jobStateLock.writeLock().unlock(); } } /** - * Shuts down the ingest module pipelines and ingest job progress - * indicators. + * Shuts down the ingest module pipelines and ingest job progress bars. */ private void shutDown() { - synchronized (stageTransitionLock) { - logInfoMessage("Finished all tasks"); //NON-NLS - stage = IngestJobExecutor.IngestJobStage.PIPELINES_SHUT_DOWN; - + jobStateLock.writeLock().lock(); + try { + logInfoMessage("Finished all ingest tasks"); //NON-NLS + jobState = IngestJobExecutor.IngestJobState.PIPELINES_SHUTTING_DOWN; shutDownIngestModulePipeline(currentDataSourceIngestPipeline); - shutDownIngestModulePipeline(artifactIngestPipeline); + shutDownIngestModulePipeline(dataArtifactIngestPipeline); + shutDownIngestModulePipeline(analysisResultIngestPipeline); + finishAllProgressBars(); - if (ingestJobInfo != null) { - if (jobCancelled) { - try { + try { + if (ingestJobInfo != null) { + if (jobCancelled) { ingestJobInfo.setIngestJobStatus(IngestJobStatusType.CANCELLED); - } catch (TskCoreException ex) { - logErrorMessage(Level.WARNING, "Failed to update ingest job status in case database", ex); - } - } else { - try { + } else { ingestJobInfo.setIngestJobStatus(IngestJobStatusType.COMPLETED); - } catch (TskCoreException ex) { - logErrorMessage(Level.WARNING, "Failed to update ingest job status in case database", ex); } - } - try { ingestJobInfo.setEndDateTime(new Date()); - } catch (TskCoreException ex) { - logErrorMessage(Level.WARNING, "Failed to set job end date in case database", ex); } + } catch (TskCoreException ex) { + logErrorMessage(Level.WARNING, "Failed to set job end date in case database", ex); } - } - ingestJob.notifyIngestPipelinesShutDown(); + ingestJob.notifyIngestPipelinesShutDown(); + } finally { + jobStateLock.writeLock().unlock(); + } } /** @@ -1095,9 +1166,11 @@ final class IngestJobExecutor { * Passes the data source for the ingest job through the currently active * data source level ingest module pipeline (high-priority or low-priority). 
* - * @param task A data source ingest task wrapping the data source. + * @param task A data source ingest task encapsulating the data source and + * the data source ingest pipeline. */ void execute(DataSourceIngestTask task) { + jobStateLock.readLock().lock(); try { if (!isCancelled()) { List errors = new ArrayList<>(); @@ -1108,6 +1181,7 @@ final class IngestJobExecutor { } } finally { taskScheduler.notifyTaskCompleted(task); + jobStateLock.readLock().unlock(); checkForStageCompleted(); } } @@ -1116,15 +1190,17 @@ final class IngestJobExecutor { * Passes a file from the data source for the ingest job through a file * ingest module pipeline. * - * @param task A file ingest task wrapping the file. + * @param task A file ingest task encapsulating the file and the file ingest + * pipeline. */ void execute(FileIngestTask task) { + jobStateLock.readLock().lock(); try { if (!isCancelled()) { FileIngestPipeline pipeline = fileIngestPipelinesQueue.take(); if (!pipeline.isEmpty()) { /* - * Get the file from the task. If the file was "streamed," + * Get the file from the task. If the file was streamed in, * the task may only have the file object ID, and a trip to * the case database will be required. */ @@ -1156,10 +1232,11 @@ final class IngestJobExecutor { fileIngestPipelinesQueue.put(pipeline); } } catch (InterruptedException ex) { - logger.log(Level.SEVERE, String.format("Unexpected interrupt of file ingest thread during execution of file ingest job (file object ID = %d, thread ID = %d)", task.getFileId(), task.getThreadId()), ex); + logger.log(Level.SEVERE, String.format("File ingest thread interrupted during execution of file ingest job (file object ID = %d, thread ID = %d)", task.getFileId(), task.getThreadId()), ex); Thread.currentThread().interrupt(); } finally { taskScheduler.notifyTaskCompleted(task); + jobStateLock.readLock().unlock(); checkForStageCompleted(); } } @@ -1168,90 +1245,131 @@ final class IngestJobExecutor { * Passes a data artifact from the data source for the ingest job through * the data artifact ingest module pipeline. * - * @param task A data artifact ingest task wrapping the data artifact. + * @param task A data artifact ingest task encapsulating the data artifact + * and the data artifact ingest pipeline. */ void execute(DataArtifactIngestTask task) { + jobStateLock.readLock().lock(); try { - if (!isCancelled() && !artifactIngestPipeline.isEmpty()) { + if (!isCancelled() && !dataArtifactIngestPipeline.isEmpty()) { List errors = new ArrayList<>(); - errors.addAll(artifactIngestPipeline.performTask(task)); + errors.addAll(dataArtifactIngestPipeline.performTask(task)); if (!errors.isEmpty()) { logIngestModuleErrors(errors); } } } finally { taskScheduler.notifyTaskCompleted(task); + jobStateLock.readLock().unlock(); checkForStageCompleted(); } } /** - * Adds some streamed files for analysis as part of a streaming mode ingest - * job. + * Passes an analyisis result from the data source for the ingest job + * through the analysis result ingest module pipeline. + * + * @param task An analysis result ingest task encapsulating the analysis + * result and the analysis result ingest pipeline. 
+ */ + void execute(AnalysisResultIngestTask task) { + jobStateLock.readLock().lock(); + try { + if (!isCancelled() && !analysisResultIngestPipeline.isEmpty()) { + List errors = new ArrayList<>(); + errors.addAll(analysisResultIngestPipeline.performTask(task)); + if (!errors.isEmpty()) { + logIngestModuleErrors(errors); + } + } + } finally { + taskScheduler.notifyTaskCompleted(task); + jobStateLock.readLock().unlock(); + checkForStageCompleted(); + } + } + + /** + * Streams in files for analysis as part of a streaming mode ingest job. * * @param fileObjIds The object IDs of the files. */ void addStreamedFiles(List fileObjIds) { - if (hasFileIngestModules()) { - if (stage.equals(IngestJobStage.STREAMED_FILE_ANALYSIS_ONLY)) { + if (!isCancelled() && hasFileIngestModules()) { + if (jobState.equals(IngestJobState.STREAMED_FILE_ANALYSIS_ONLY)) { IngestJobExecutor.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds); } else { - logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported"); + logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + jobState.toString() + " not supported"); } } } /** - * Adds additional files (e.g., extracted or carved files) for analysis. + * Adds additional files produced by ingest modules (e.g., extracted or + * carved files) for analysis. The intended clients of this method are + * ingest modules running code in an ingest thread that has not yet notified + * the ingest task scheduler that the the primary ingest task that is the + * source of the files is completed. This means that the new tasks will be + * scheduled BEFORE the primary task has been removed from the scheduler's + * running tasks list. * * @param files A list of the files to add. */ void addFiles(List files) { - if (stage.equals(IngestJobStage.STREAMED_FILE_ANALYSIS_ONLY) - || stage.equals(IngestJobStage.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS)) { - taskScheduler.fastTrackFileIngestTasks(this, files); - } else { - logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported"); + if (!isCancelled() && hasFileIngestModules()) { + if (jobState.equals(IngestJobState.STREAMED_FILE_ANALYSIS_ONLY) || jobState.equals(IngestJobState.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS)) { + taskScheduler.scheduleHighPriorityFileIngestTasks(this, files); + } else { + logErrorMessage(Level.SEVERE, "Adding files to job during stage " + jobState.toString() + " not supported"); + } } - - /** - * The intended clients of this method are ingest modules running code - * in an ingest thread that is holding a reference to a "primary" ingest - * task that was the source of the files, in which case a completion - * check would not be necessary, so this is a bit of defensive - * programming. - */ - checkForStageCompleted(); } /** - * Adds data artifacts for analysis. + * Adds data artifacts for analysis. The intended clients of this method are + * ingest modules running code in an ingest thread that has not yet notified + * the ingest task scheduler that the the primary ingest task that is the + * source of the data artifacts is completed. This means that the new tasks + * will be scheduled BEFORE the primary task has been removed from the + * scheduler's running tasks list. * - * @param artifacts + * @param artifacts The data artifacts. 
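The intended clients described above are modules such as embedded file extractors or carvers that produce new content while their own ingest task is still running. A minimal sketch of that call path from a hypothetical file ingest module (the extraction helper is a placeholder, and the derived file is assumed to have already been added to the case database):

import java.util.Collections;
import org.sleuthkit.autopsy.ingest.FileIngestModule;
import org.sleuthkit.autopsy.ingest.IngestJobContext;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.DerivedFile;

/**
 * Hypothetical file ingest module that extracts an embedded file and queues it
 * for analysis while its own file ingest task is still running.
 */
final class ExampleExtractorIngestModule implements FileIngestModule {

    private IngestJobContext context;

    @Override
    public void startUp(IngestJobContext context) throws IngestModuleException {
        this.context = context;
    }

    @Override
    public ProcessResult process(AbstractFile file) {
        // Extraction that adds a derived file to the case database would go
        // here; extractEmbeddedFile() is a hypothetical placeholder.
        DerivedFile derivedFile = extractEmbeddedFile(file);
        if (derivedFile != null) {
            // Schedules file ingest tasks for the new file before this module's
            // own task completes and is removed from the running tasks list.
            context.addFilesToJob(Collections.<AbstractFile>singletonList(derivedFile));
        }
        return ProcessResult.OK;
    }

    @Override
    public void shutDown() {
    }

    private DerivedFile extractEmbeddedFile(AbstractFile file) {
        return null; // placeholder for real extraction logic
    }
}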
*/ void addDataArtifacts(List artifacts) { - List artifactsToAnalyze = new ArrayList<>(artifacts); - if (stage.equals(IngestJobStage.STREAMED_FILE_ANALYSIS_ONLY) - || stage.equals(IngestJobStage.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS) - || stage.equals(IngestJobStage.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS)) { - taskScheduler.scheduleDataArtifactIngestTasks(this, artifactsToAnalyze); - } else { - logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported"); + if (!isCancelled() && hasDataArtifactIngestModules()) { + if (jobState.equals(IngestJobState.STREAMED_FILE_ANALYSIS_ONLY) || jobState.equals(IngestJobState.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS) || jobState.equals(IngestJobState.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS)) { + taskScheduler.scheduleDataArtifactIngestTasks(this, artifacts); + } else { + logErrorMessage(Level.SEVERE, "Attempt to add data artifacts to job during stage " + jobState.toString() + " not supported"); + } } + } - /** - * The intended clients of this method are ingest modules running code - * in an ingest thread that is holding a reference to a "primary" ingest - * task that was the source of the files, in which case a completion - * check would not be necessary, so this is a bit of defensive - * programming. - */ - checkForStageCompleted(); + /** + * Adds analysis results for analysis. The intended clients of this method + * are ingest modules running code in an ingest thread that has not yet + * notified the ingest task scheduler that the the primary ingest task that + * is the source of the analysis results is completed. This means that the + * new tasks will be scheduled BEFORE the primary task has been removed from + * the scheduler's running tasks list. + * + * @param results The analysis results. + */ + void addAnalysisResults(List results) { + if (!isCancelled() && hasAnalysisResultIngestModules()) { + if (jobState.equals(IngestJobState.STREAMED_FILE_ANALYSIS_ONLY) || jobState.equals(IngestJobState.FILE_AND_HIGH_PRIORITY_DATA_SRC_LEVEL_ANALYSIS) || jobState.equals(IngestJobState.LOW_PRIORITY_DATA_SRC_LEVEL_ANALYSIS)) { + taskScheduler.scheduleAnalysisResultIngestTasks(this, results); + } else { + logErrorMessage(Level.SEVERE, "Attempt to add analysis results to job during stage " + jobState.toString() + " not supported"); + } + } } /** * Updates the display name shown on the current data source level ingest - * progress bar for this job, if the job has not been cancelled. + * progress bar for this job, if the job has not been cancelled. This is + * intended to be called by data source level ingest modules and the display + * name should reference the ingest module name. * * @param displayName The new display name. */ @@ -1268,7 +1386,9 @@ final class IngestJobExecutor { /** * Switches the current data source level ingest progress bar to determinate * mode, if the job has not been cancelled. This should be called if the - * total work units to process the data source is known. + * total work units to process the data source is known. This is intended to + * be called by data source level ingest modules in conjunction with + * updateDataSourceIngestProgressBarDisplayName(). * * @param workUnits Total number of work units for the processing of the * data source. @@ -1287,6 +1407,8 @@ final class IngestJobExecutor { * Switches the current data source level ingest progress bar to * indeterminate mode, if the job has not been cancelled. 
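These progress-bar methods are surfaced to modules through DataSourceIngestModuleProgress; a sketch of the intended calling pattern, assuming the existing switchToDeterminate()/progress() helpers and the context's cancellation check:

    import org.sleuthkit.autopsy.ingest.DataSourceIngestModule;
    import org.sleuthkit.autopsy.ingest.DataSourceIngestModuleProgress;
    import org.sleuthkit.autopsy.ingest.IngestJobContext;
    import org.sleuthkit.datamodel.Content;

    public class ExampleDataSourceModule implements DataSourceIngestModule {

        private IngestJobContext context;

        @Override
        public void startUp(IngestJobContext context) throws IngestModuleException {
            this.context = context;
        }

        @Override
        public ProcessResult process(Content dataSource, DataSourceIngestModuleProgress progressBar) {
            int totalSteps = 100;                        // total work is known up front,
            progressBar.switchToDeterminate(totalSteps); // so use determinate mode
            for (int step = 1; step <= totalSteps; step++) {
                if (context.dataSourceIngestIsCancelled()) {
                    return ProcessResult.OK; // long-running modules should poll for cancellation
                }
                // ... analyze one slice of the data source ...
                progressBar.progress(step);
            }
            return ProcessResult.OK;
        }
    }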
This should be * called if the total work units to process the data source is unknown. + * This is intended to be called by data source level ingest modules in + * conjunction with updateDataSourceIngestProgressBarDisplayName(). */ void switchDataSourceIngestProgressBarToIndeterminate() { if (usingNetBeansGUI && !jobCancelled) { @@ -1301,7 +1423,9 @@ final class IngestJobExecutor { /** * Updates the current data source level ingest progress bar with the number * of work units performed, if in the determinate mode, and the job has not - * been cancelled. + * been cancelled. This is intended to be called by data source level ingest + * modules that have called + * switchDataSourceIngestProgressBarToDeterminate(). * * @param workUnits Number of work units performed. */ @@ -1423,23 +1547,25 @@ final class IngestJobExecutor { * Displays a "cancelling" message on all of the current ingest message * progress bars. */ - private void displayCancellingProgressMessage() { + private void displayCancellingProgressMessages() { if (usingNetBeansGUI) { SwingUtilities.invokeLater(() -> { if (dataSourceIngestProgressBar != null) { dataSourceIngestProgressBar.setDisplayName(NbBundle.getMessage(getClass(), "IngestJob.progress.dataSourceIngest.initialDisplayName", dataSource.getName())); dataSourceIngestProgressBar.progress(NbBundle.getMessage(getClass(), "IngestJob.progress.cancelling")); } - if (fileIngestProgressBar != null) { fileIngestProgressBar.setDisplayName(NbBundle.getMessage(getClass(), "IngestJob.progress.fileIngest.displayName", dataSource.getName())); fileIngestProgressBar.progress(NbBundle.getMessage(getClass(), "IngestJob.progress.cancelling")); } - if (artifactIngestProgressBar != null) { artifactIngestProgressBar.setDisplayName(NbBundle.getMessage(getClass(), "IngestJob.progress.dataArtifactIngest.displayName", dataSource.getName())); artifactIngestProgressBar.progress(NbBundle.getMessage(getClass(), "IngestJob.progress.cancelling")); } + if (resultIngestProgressBar != null) { + resultIngestProgressBar.setDisplayName(Bundle.IngestJob_progress_analysisResultIngest_displayName(dataSource.getName())); + resultIngestProgressBar.progress(NbBundle.getMessage(getClass(), "IngestJob.progress.cancelling")); + } }); } } @@ -1449,6 +1575,15 @@ final class IngestJobExecutor { * ingest in order to stop the currently executing data source level ingest * module is in effect for this job. * + * Note that the DataSourceIngestModule interface does not currently have a + * cancel() API. As a consequence, cancelling an individual data source + * ingest module requires setting and then unsetting the + * currentDataSourceIngestModuleCancelled flag. Because of this, there is no + * ironclad guarantee that the correct module will be cancelled. We are + * relying on the module being long-running to avoid a race condition + * between module cancellation and the transition of the execution of a data + * source level ingest task to another module. + * * @return True or false. */ boolean currentDataSourceIngestModuleIsCancelled() { @@ -1461,6 +1596,15 @@ final class IngestJobExecutor { * data source ingest progress bar is reset, if the job has not been * cancelled. * + * Note that the DataSourceIngestModule interface does not currently have a + * cancel() API. As a consequence, cancelling an individual data source + * ingest module requires setting and then unsetting the + * currentDataSourceIngestModuleCancelled flag. Because of this, there is no + * ironclad guarantee that the correct module will be cancelled. 
We are + * relying on the module being long-running to avoid a race condition + * between module cancellation and the transition of the execution of a data + * source level ingest task to another module. + * * @param moduleDisplayName The display name of the module that was stopped. */ void currentDataSourceIngestModuleCancellationCompleted(String moduleDisplayName) { @@ -1498,6 +1642,15 @@ final class IngestJobExecutor { /** * Requests a temporary cancellation of data source level ingest for this * job in order to stop the currently executing data source ingest module. + * + * Note that the DataSourceIngestModule interface does not currently have a + * cancel() API. As a consequence, cancelling an individual data source + * ingest module requires setting and then unsetting the + * currentDataSourceIngestModuleCancelled flag. Because of this, there is no + * ironclad guarantee that the correct module will be cancelled. We are + * relying on the module being long-running to avoid a race condition + * between module cancellation and the transition of the execution of a data + * source level ingest task to another module. */ void cancelCurrentDataSourceIngestModule() { currentDataSourceIngestModuleCancelled = true; @@ -1515,16 +1668,14 @@ final class IngestJobExecutor { void cancel(IngestJob.CancellationReason reason) { jobCancelled = true; cancellationReason = reason; - displayCancellingProgressMessage(); - IngestJobExecutor.taskScheduler.cancelPendingFileTasksForIngestJob(this); - + displayCancellingProgressMessages(); + IngestJobExecutor.taskScheduler.cancelPendingFileTasksForIngestJob(getIngestJobId()); synchronized (threadRegistrationLock) { for (Thread thread : pausedIngestThreads) { thread.interrupt(); } pausedIngestThreads.clear(); } - checkForStageCompleted(); } @@ -1633,7 +1784,7 @@ final class IngestJobExecutor { long processedFilesCount = 0; long estimatedFilesToProcessCount = 0; long snapShotTime = new Date().getTime(); - IngestJobTasksSnapshot tasksSnapshot = null; + IngestTasksScheduler.IngestTasksSnapshot tasksSnapshot = null; if (includeIngestTasksSnapshot) { processedFilesCount = processedFiles; estimatedFilesToProcessCount = estimatedFilesToProcess; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 0dc2597481..ea978112e0 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -70,6 +70,7 @@ import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent; import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent; import org.sleuthkit.autopsy.ingest.events.FileAnalyzedEvent; import org.sleuthkit.datamodel.AbstractFile; +import org.sleuthkit.datamodel.AnalysisResult; import org.sleuthkit.datamodel.Blackboard; import org.sleuthkit.datamodel.BlackboardArtifact; import org.sleuthkit.datamodel.Content; @@ -136,7 +137,8 @@ public class IngestManager implements IngestProgressSnapshotProvider { private final Map ingestJobsById = new HashMap<>(); private final ExecutorService dataSourceLevelIngestJobTasksExecutor; private final ExecutorService fileLevelIngestJobTasksExecutor; - private final ExecutorService resultIngestTasksExecutor; + private final ExecutorService dataArtifactIngestTasksExecutor; + private final ExecutorService analysisResultIngestTasksExecutor; private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS; private final IngestMonitor ingestMonitor = new IngestMonitor(); private final ServicesMonitor servicesMonitor = ServicesMonitor.getInstance(); @@ -169,21 +171,11 @@ public class IngestManager implements IngestProgressSnapshotProvider { * the processing of data sources by ingest modules. */ private IngestManager() { - /* - * Submit a single Runnable ingest manager task for processing data - * source level ingest job tasks to the data source level ingest job - * tasks executor. - */ dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS; long threadId = nextIngestManagerTaskId.incrementAndGet(); dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); - /* - * Submit a configurable number of Runnable ingest manager tasks for - * processing file level ingest job tasks to the file level ingest job - * tasks executor. - */ numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads(); fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS for (int i = 0; i < numberOfFileIngestThreads; ++i) { @@ -192,12 +184,15 @@ public class IngestManager implements IngestProgressSnapshotProvider { ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); } - resultIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-results-ingest-%d").build()); //NON-NLS; + dataArtifactIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-artifact-ingest-%d").build()); //NON-NLS; threadId = nextIngestManagerTaskId.incrementAndGet(); - resultIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getResultIngestTaskQueue())); - // RJCTODO - // ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); - // RJCTODO: Where is the shut down code? + dataArtifactIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataArtifactIngestTaskQueue())); + ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); + + analysisResultIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-analysis-result-ingest-%d").build()); //NON-NLS; + threadId = nextIngestManagerTaskId.incrementAndGet(); + analysisResultIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getAnalysisResultIngestTaskQueue())); + ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); } /** @@ -301,13 +296,16 @@ public class IngestManager implements IngestProgressSnapshotProvider { * job for possible analysis. 
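The constructor changes above follow one pattern per task type: a dedicated single-thread executor that loops over a blocking task queue. A generic, self-contained illustration of that consumer pattern (not the actual ExecuteIngestJobTasksTask) is:

    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public final class QueueConsumerSketch {

        private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        private final ExecutorService consumer = Executors.newSingleThreadExecutor(
                new ThreadFactoryBuilder().setNameFormat("example-task-consumer-%d").build());

        void start() {
            consumer.submit(() -> {
                try {
                    while (true) {
                        taskQueue.take().run(); // blocks until work is available
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and exit the loop
                }
            });
        }

        void submit(Runnable task) {
            taskQueue.add(task);
        }

        void stop() {
            consumer.shutdownNow(); // interrupts take(), ending the consumer loop
        }
    }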
*/ List newDataArtifacts = new ArrayList<>(); + List newAnalysisResults = new ArrayList<>(); Collection newArtifacts = tskEvent.getArtifacts(); for (BlackboardArtifact artifact : newArtifacts) { if (artifact instanceof DataArtifact) { newDataArtifacts.add((DataArtifact) artifact); + } else { + newAnalysisResults.add((AnalysisResult) artifact); } } - if (!newDataArtifacts.isEmpty()) { + if (!newDataArtifacts.isEmpty() || !newAnalysisResults.isEmpty()) { IngestJob ingestJob = null; Optional ingestJobId = tskEvent.getIngestJobId(); if (ingestJobId.isPresent()) { @@ -379,7 +377,12 @@ public class IngestManager implements IngestProgressSnapshotProvider { } } if (ingestJob != null) { - ingestJob.addDataArtifacts(newDataArtifacts); + if (!newDataArtifacts.isEmpty()) { + ingestJob.addDataArtifacts(newDataArtifacts); + } + if (!newAnalysisResults.isEmpty()) { + ingestJob.addAnalysisResults(newAnalysisResults); + } } } @@ -909,52 +912,25 @@ public class IngestManager implements IngestProgressSnapshotProvider { /** * Updates the ingest progress snapshot when a new ingest module starts - * working on a data source level ingest task. + * working on an ingest task. This includes incrementing the total run time + * for the PREVIOUS ingest module in the pipeline, which has now finished + * its processing for the task. * * @param task The data source ingest task. * @param currentModuleName The display name of the currently processing * module. */ - void setIngestTaskProgress(DataSourceIngestTask task, String currentModuleName) { + void setIngestTaskProgress(IngestTask task, String currentModuleName) { IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId()); - IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource()); + IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource(), task.getContentName()); ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap); - - /* - * Update the total run time for the PREVIOUS ingest module in the - * pipeline, which has now finished its processing for the task. - */ - incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); - } - - /** - * Updates the ingest progress snapshot when a new ingest module starts - * working on a file ingest task. - * - * @param task The file ingest task. - * @param currentModuleName The display name of the currently processing - * module. - */ - void setIngestTaskProgress(FileIngestTask task, String currentModuleName) { - IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId()); - IngestThreadActivitySnapshot newSnap; - try { - newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource(), task.getFile()); - } catch (TskCoreException ex) { - logger.log(Level.SEVERE, "Error getting file from file ingest task", ex); - newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource()); - } - ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap); - - /* - * Update the total run time for the PREVIOUS ingest module in the - * pipeline, which has now finished its processing for the task. 
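The event-handling change above relies on every posted BlackboardArtifact being either a DataArtifact or an AnalysisResult. A compact restatement of that partitioning, written with an explicit instanceof check on the second branch rather than an unconditional cast:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import org.sleuthkit.datamodel.AnalysisResult;
    import org.sleuthkit.datamodel.BlackboardArtifact;
    import org.sleuthkit.datamodel.DataArtifact;

    final class ArtifactPartitioner {

        static void partition(Collection<BlackboardArtifact> posted,
                List<DataArtifact> dataArtifacts, List<AnalysisResult> analysisResults) {
            for (BlackboardArtifact artifact : posted) {
                if (artifact instanceof DataArtifact) {
                    dataArtifacts.add((DataArtifact) artifact);
                } else if (artifact instanceof AnalysisResult) {
                    // Defensive check; avoids a ClassCastException if another subtype ever appears.
                    analysisResults.add((AnalysisResult) artifact);
                }
            }
        }
    }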
- */ - incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); + incrementModuleRunTime(prevSnap.getModuleDisplayName(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); } /** * Updates the ingest progress snapshot when an ingest task is completed. + * This includes incrementing the total run time for the ingest module, + * which is the LAST ingest module in the pipeline. * * @param task The ingest task. */ @@ -962,12 +938,7 @@ public class IngestManager implements IngestProgressSnapshotProvider { IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId()); IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId()); ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap); - - /* - * Update the total run time for the LAST ingest module in the pipeline, - * which has now finished its processing for the task. - */ - incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); + incrementModuleRunTime(prevSnap.getModuleDisplayName(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); } /** @@ -1163,8 +1134,7 @@ public class IngestManager implements IngestProgressSnapshotProvider { } /** - * A snapshot of the current activity of an ingest job task execution task - * running in an ingest thread. + * A snapshot of the current activity of an ingest thread. */ @Immutable public static final class IngestThreadActivitySnapshot implements Serializable { @@ -1173,107 +1143,111 @@ public class IngestManager implements IngestProgressSnapshotProvider { private final long threadId; private final Date startTime; - private final String activity; + private final String moduleDisplayName; private final String dataSourceName; private final String fileName; private final long jobId; /** - * A snapshot of the current activity of an idle ingest job task - * execution task running in an ingest thread. + * A snapshot of the current activity of an idle ingest thread. * - * @param threadId The ingest manager task/thread id for the - * task/thread. + * @param threadId The ID assigned to the thread by the ingest manager. */ IngestThreadActivitySnapshot(long threadId) { this.threadId = threadId; startTime = new Date(); - this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread"); + this.moduleDisplayName = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread"); this.dataSourceName = ""; this.fileName = ""; this.jobId = 0; } /** - * A snapshot of the current activity of an ingest job data source level - * task execution task running in an ingest thread. + * A snapshot of the current activity of an ingest thread executing a + * data source, data artifact, or analysis result ingest task. * - * @param threadId The ingest manager task/thread id for the - * task/thread. - * @param jobId The ingest job id. - * @param activity A short description of the current activity. - * @param dataSource The data source that is the subject of the task. + * @param threadId The ID assigned to the thread by the ingest + * manager. + * @param jobId The ID of the ingest job of which the + * current ingest task is a part. + * @param moduleDisplayName The display name of the ingest module + * currently working on the task. + * @param dataSource The data source that is the subject of the + * current ingest job. 
*/ - IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) { + IngestThreadActivitySnapshot(long threadId, long jobId, String moduleDisplayName, Content dataSource) { this.threadId = threadId; this.jobId = jobId; startTime = new Date(); - this.activity = activity; + this.moduleDisplayName = moduleDisplayName; this.dataSourceName = dataSource.getName(); this.fileName = ""; } /** - * A snapshot of the current activity of an ingest job file level task - * execution task running in an ingest thread. + * A snapshot of the current activity of an ingest thread executing a + * file ingest task. * - * @param threadId The ingest manager task/thread id for the - * task/thread. - * @param jobId The ingest job id. - * @param activity A short description of the current activity. - * @param dataSource The data source that is the source of the file that - * is the subject of the task. - * @param file The file that is the subject of the task. + * @param threadId The ID assigned to the thread by the ingest + * manager. + * @param jobId The ID of the ingest job of which the + * current ingest task is a part. + * @param moduleDisplayName The display name of the ingest module + * currently working on the task. + * @param dataSource The data source that is the subject of the + * current ingest job. + * @param file The name of the file that is the subject of + * the current ingest task. */ - IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) { + IngestThreadActivitySnapshot(long threadId, long jobId, String moduleDisplayName, Content dataSource, String fileName) { this.threadId = threadId; this.jobId = jobId; startTime = new Date(); - this.activity = activity; + this.moduleDisplayName = moduleDisplayName; this.dataSourceName = dataSource.getName(); - this.fileName = file.getName(); + this.fileName = fileName; } /** - * Gets the ingest job id. + * Gets the ID of the ingest job of which the current ingest task is a + * part. * - * @return The ingest job id. + * @return The ingest job ID. */ long getIngestJobId() { return jobId; } /** - * Gets the ingest manager task/thread id for the task/thread. + * Gets the thread ID assigned to the thread by the ingest manager. * - * @return The task/thread id. + * @return The thread ID. */ long getThreadId() { return threadId; } /** - * Gets the start date and time for the current activity. + * Gets the start date and time for the current ingest task. * * @return The start date and time. */ Date getStartTime() { - return startTime; + return new Date(startTime.getTime()); } /** - * Gets the THE short description of the current activity. + * Gets display name of the ingest module currently working on the task. * - * @return The short description of the current activity. + * @return The module display name. */ - String getActivity() { - return activity; + String getModuleDisplayName() { + return moduleDisplayName; } /** - * Gets the display name of the data source that is either the subject - * of the task or is the source of the file that is the subject of the - * task. + * Gets the display name of the data source that is the subject of the + * ingest job. * * @return The data source display name. */ @@ -1282,9 +1256,9 @@ public class IngestManager implements IngestProgressSnapshotProvider { } /** - * Gets the file, if any, that is the subject of the task. + * Gets the name of file that is the subject of the current ingest task. 
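The getStartTime() change above is a standard defensive copy: java.util.Date is mutable, so an immutable snapshot should never hand out its internal instance. In isolation the pattern is:

    import java.util.Date;

    final class SnapshotDateSketch {

        private final Date startTime = new Date();

        // Return a copy so callers cannot mutate the snapshot's internal state.
        Date getStartTime() {
            return new Date(startTime.getTime());
        }
    }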
* - * @return The fiel name, may be the empty string. + * @return The file name, may be the empty string. */ String getFileName() { return fileName; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleFactory.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleFactory.java index e473086d18..361f0a9626 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleFactory.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleFactory.java @@ -271,4 +271,43 @@ public interface IngestModuleFactory { throw new UnsupportedOperationException(); } + /** + * Queries the factory to determine if it is capable of creating analysis + * result ingest modules. + * + * @return True or false. + */ + default boolean isAnalysisResultIngestModuleFactory() { + return false; + } + + /** + * Creates an analysis result ingest module instance. + *

+ * Autopsy will generally use the factory to create several instances of each type + * of module for each ingest job it performs. Completing an ingest job + * entails processing a single data source (e.g., a disk image) and all of + * the files from the data source, including files extracted from archives + * and any unallocated space (made to look like a series of files). The data + * source is passed through one or more pipelines of data source ingest + * modules. The files are passed through one or more pipelines of file + * ingest modules. + *

+ * The ingest framework may use multiple threads to complete an ingest job, + * but it is guaranteed that there will be no more than one module instance + * per thread. However, if the module instances must share resources, the + * modules are responsible for synchronizing access to the shared resources + * and doing reference counting as required to release those resources + * correctly. Also, more than one ingest job may be in progress at any given + * time. This must also be taken into consideration when sharing resources + * between module instances. modules. + * + * @param settings The settings for the ingest job. + * + * @return A file ingest module instance. + */ + default AnalysisResultIngestModule createAnalysisResultIngestModule(IngestModuleIngestJobSettings settings) { + throw new UnsupportedOperationException(); + } + } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleTemplate.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleTemplate.java index 26285f6439..4aadb62929 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleTemplate.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleTemplate.java @@ -93,6 +93,14 @@ public final class IngestModuleTemplate { return moduleFactory.createDataArtifactIngestModule(settings); } + public boolean isAnalysisResultIngestModuleTemplate() { + return moduleFactory.isAnalysisResultIngestModuleFactory(); + } + + public AnalysisResultIngestModule createAnalysisResultIngestModule() { + return moduleFactory.createAnalysisResultIngestModule(settings); + } + public void setEnabled(boolean enabled) { this.enabled = enabled; } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java index b15f5723b5..4d2dd5bc1e 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java @@ -136,7 +136,7 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel { cellValue = snapshot.getThreadId(); break; case 1: - cellValue = snapshot.getActivity(); + cellValue = snapshot.getModuleDisplayName(); break; case 2: cellValue = snapshot.getDataSourceName(); @@ -179,7 +179,8 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel { NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.rootQueued"), NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.streamingQueued"), NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.dsQueued"), - NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.artifactsQueued")}; + NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.artifactsQueued"), + NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.resultsQueued")}; private List jobSnapshots; @@ -249,6 +250,9 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel { case 11: cellValue = snapShot.getArtifactTasksQueueSize(); break; + case 12: + cellValue = snapShot.getResultTasksQueueSize(); + break; default: cellValue = null; break; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTask.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTask.java index fcca0dc629..5a9181af86 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTask.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTask.java @@ -19,6 +19,7 @@ package org.sleuthkit.autopsy.ingest; import org.sleuthkit.datamodel.Content; +import org.sleuthkit.datamodel.TskCoreException; /** * An 
ingest task that will be executed by an ingest thread using a given ingest @@ -31,19 +32,47 @@ abstract class IngestTask { private final static long NOT_SET = Long.MIN_VALUE; private final IngestJobExecutor ingestJobExecutor; private long threadId; + private String contentName; /** * Constructs an ingest task that will be executed by an ingest thread using * a given ingest job executor. * - * @param ingestJobExecutor The ingest job executor to use to execute the + * @param contentName The name of the content that is the subject of + * this task. May be the empty string, in which + * case setContentName() can be called later. + * @param ingestJobExecutor The ingest job executor to use to execute this * task. */ - IngestTask(IngestJobExecutor ingestJobExecutor) { + IngestTask(String contentName, IngestJobExecutor ingestJobExecutor) { + this.contentName = contentName; this.ingestJobExecutor = ingestJobExecutor; threadId = NOT_SET; } + /** + * Gets the name of the content that is the subject content of the ingest + * task. + * + * @return The content name. + * + * @throws TskCoreException This exception is thrown if there is a problem + * providing the subject content. + */ + String getContentName() { + return contentName; + } + + /** + * Gets the name of the content that is the subject content of the ingest + * task. + * + * @param contentName + */ + void setContentName(String contentName) { + this.contentName = contentName; + } + /** * Gets the ingest job executor to use to execute this task. * diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java index 63d45ae3e0..d7609e63c7 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -21,7 +21,6 @@ package org.sleuthkit.autopsy.ingest; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.Deque; import java.util.Iterator; @@ -39,6 +38,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.sleuthkit.autopsy.casemodule.Case; import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.datamodel.AbstractFile; +import org.sleuthkit.datamodel.AnalysisResult; import org.sleuthkit.datamodel.Blackboard; import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.DataArtifact; @@ -67,6 +67,7 @@ final class IngestTasksScheduler { private final Queue streamedFileIngestTasksQueue; private final IngestTaskTrackingQueue fileIngestTasksQueue; private final IngestTaskTrackingQueue artifactIngestTasksQueue; + private final IngestTaskTrackingQueue resultIngestTasksQueue; /** * Gets the ingest tasks scheduler singleton that creates ingest tasks for @@ -92,6 +93,7 @@ final class IngestTasksScheduler { fileIngestTasksQueue = new IngestTaskTrackingQueue(); streamedFileIngestTasksQueue = new LinkedList<>(); artifactIngestTasksQueue = new IngestTaskTrackingQueue(); + resultIngestTasksQueue = new IngestTaskTrackingQueue(); } /** @@ -120,86 +122,66 @@ final class IngestTasksScheduler { * * @return The queue. */ - BlockingIngestTaskQueue getResultIngestTaskQueue() { + BlockingIngestTaskQueue getDataArtifactIngestTaskQueue() { return artifactIngestTasksQueue; } /** - * Schedules ingest tasks based on the types of ingest modules that the - * ingest pipeline that will exedute tasks has. 
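Putting the new IngestModuleFactory and IngestModuleTemplate hooks above together, a module writer would opt in roughly as follows; this is a hedged sketch with invented class names and a no-op module body, not a supported sample module:

    import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule;
    import org.sleuthkit.autopsy.ingest.IngestModuleFactory;
    import org.sleuthkit.autopsy.ingest.IngestModuleIngestJobSettings;
    import org.sleuthkit.datamodel.AnalysisResult;

    public class ExampleAnalysisResultModuleFactory implements IngestModuleFactory {

        @Override
        public String getModuleDisplayName() {
            return "Example Analysis Result Consumer";
        }

        @Override
        public String getModuleDescription() {
            return "Processes analysis results posted by other ingest modules.";
        }

        @Override
        public String getModuleVersionNumber() {
            return "1.0";
        }

        @Override
        public boolean isAnalysisResultIngestModuleFactory() {
            return true; // advertise the new capability to the ingest framework
        }

        @Override
        public AnalysisResultIngestModule createAnalysisResultIngestModule(IngestModuleIngestJobSettings settings) {
            return new ExampleAnalysisResultModule();
        }

        private static final class ExampleAnalysisResultModule implements AnalysisResultIngestModule {

            @Override
            public ProcessResult process(AnalysisResult result) {
                // A real module would log failures with the result's name and object ID
                // and return ProcessResult.ERROR when processing fails.
                return ProcessResult.OK;
            }
        }
    }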
Scheduling these tasks - * atomically means that it is valid to call currentTasksAreCompleted() - * immediately after calling this method. Note that the may cause some or - * even all of any file tasks to be discarded. + * Gets the analysis result ingest tasks queue. This queue is a blocking + * queue consumed by the ingest manager's analysis result ingest thread. * - * @param ingestPipeline The ingest pipeline that will execute the scheduled - * tasks. A reference to the pipeline is added to each - * task so that when the task is dequeued by an ingest - * thread the task can pass the target Content of the - * task to the pipeline for processing by the - * pipeline's ingest modules. + * @return The queue. */ - synchronized void scheduleIngestTasks(IngestJobExecutor ingestPipeline) { - if (!ingestPipeline.isCancelled()) { - if (ingestPipeline.hasDataSourceIngestModules()) { - scheduleDataSourceIngestTask(ingestPipeline); - } - if (ingestPipeline.hasFileIngestModules()) { - scheduleFileIngestTasks(ingestPipeline, Collections.emptyList()); - } - if (ingestPipeline.hasDataArtifactIngestModules()) { - scheduleDataArtifactIngestTasks(ingestPipeline); - } - } + BlockingIngestTaskQueue getAnalysisResultIngestTaskQueue() { + return resultIngestTasksQueue; } /** * Schedules a data source level ingest task for an ingest job. The data - * source is obtained from the ingest pipeline passed in. + * source is obtained from the ingest ingest job executor passed in. * - * @param ingestPipeline The ingest pipeline that will execute the scheduled - * task. A reference to the pipeline is added to the - * task so that when the task is dequeued by an ingest - * thread the task can pass the target Content of the - * task to the pipeline for processing by the - * pipeline's ingest modules. + * @param executor The ingest job executor that will execute the scheduled + * tasks. A reference to the executor is added to each task + * so that when the task is dequeued by an ingest thread, + * the task can pass its target item to the executor for + * processing by the executor's ingest module pipelines. */ - synchronized void scheduleDataSourceIngestTask(IngestJobExecutor ingestPipeline) { - if (!ingestPipeline.isCancelled()) { - DataSourceIngestTask task = new DataSourceIngestTask(ingestPipeline); + synchronized void scheduleDataSourceIngestTask(IngestJobExecutor executor) { + if (!executor.isCancelled()) { + DataSourceIngestTask task = new DataSourceIngestTask(executor); try { dataSourceIngestTasksQueue.putLast(task); } catch (InterruptedException ex) { - IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (pipelineId={%d)", ingestPipeline.getIngestJobId()), ex); + IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (ingest job ID={%d)", executor.getIngestJobId()), ex); Thread.currentThread().interrupt(); } } } /** - * Schedules file tasks for either all the files, or a given subset of the - * files, for a data source. The data source is obtained from the ingest - * pipeline passed in. + * Schedules file tasks for either all of the files, or a given subset of + * the files, for a data source. The data source is obtained from the ingest + * ingest job executor passed in. * - * @param ingestPipeline The ingest pipeline that will execute the scheduled - * tasks. 
A reference to the pipeline is added to each - * task so that when the task is dequeued by an ingest - * thread the task can pass the target Content of the - * task to the pipeline for processing by the - * pipeline's ingest modules. - * @param files A subset of the files from the data source; if - * empty, then all if the files from the data source - * are candidates for scheduling. + * @param executor The ingest job executor that will execute the scheduled + * tasks. A reference to the executor is added to each task + * so that when the task is dequeued by an ingest thread, + * the task can pass its target item to the executor for + * processing by the executor's ingest module pipelines. + * @param files A subset of the files from the data source; if empty, + * then all of the files from the data source are candidates + * for scheduling. */ - synchronized void scheduleFileIngestTasks(IngestJobExecutor ingestPipeline, Collection files) { - if (!ingestPipeline.isCancelled()) { + synchronized void scheduleFileIngestTasks(IngestJobExecutor executor, Collection files) { + if (!executor.isCancelled()) { Collection candidateFiles; if (files.isEmpty()) { - candidateFiles = getTopLevelFiles(ingestPipeline.getDataSource()); + candidateFiles = getTopLevelFiles(executor.getDataSource()); } else { candidateFiles = files; } for (AbstractFile file : candidateFiles) { - FileIngestTask task = new FileIngestTask(ingestPipeline, file); + FileIngestTask task = new FileIngestTask(executor, file); if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { topLevelFileIngestTasksQueue.add(task); } @@ -212,16 +194,15 @@ final class IngestTasksScheduler { * Schedules file tasks for a collection of "streamed" files for a streaming * ingest job. * - * @param ingestPipeline The ingest pipeline for the job. A reference to the - * pipeline is added to each task so that when the - * task is dequeued by an ingest thread and the task's - * execute() method is called, execute() can pass the - * target Content of the task to the pipeline for - * processing by the pipeline's ingest modules. - * @param files A list of file object IDs for the streamed files. + * @param executor The ingest job executor that will execute the scheduled + * tasks. A reference to the executor is added to each task + * so that when the task is dequeued by an ingest thread, + * the task can pass its target item to the executor for + * processing by the executor's ingest module pipelines. + * @param files A list of file object IDs for the streamed files. */ - synchronized void scheduleStreamedFileIngestTasks(IngestJobExecutor ingestPipeline, List fileIds) { - if (!ingestPipeline.isCancelled()) { + synchronized void scheduleStreamedFileIngestTasks(IngestJobExecutor executor, List fileIds) { + if (!executor.isCancelled()) { for (long id : fileIds) { /* * Create the file ingest task. Note that we do not do the @@ -230,7 +211,7 @@ final class IngestTasksScheduler { * file filter will be applied before the file task makes it to * the task queue consumed by the file ingest threads. */ - FileIngestTask task = new FileIngestTask(ingestPipeline, id); + FileIngestTask task = new FileIngestTask(executor, id); streamedFileIngestTasksQueue.add(task); } refillFileIngestTasksQueue(); @@ -244,16 +225,15 @@ final class IngestTasksScheduler { * be used to schedule files that are products of ingest module processing, * e.g., extracted files and carved files. * - * @param ingestPipeline The ingest pipeline for the job. 
A reference to the - * pipeline is added to each task so that when the - * task is dequeued by an ingest thread and the task's - * execute() method is called, execute() can pass the - * target Content of the task to the pipeline for - * processing by the pipeline's ingest modules. - * @param files The files. + * @param executor The ingest job executor that will execute the scheduled + * tasks. A reference to the executor is added to each task + * so that when the task is dequeued by an ingest thread, + * the task can pass its target item to the executor for + * processing by the executor's ingest module pipelines. + * @param files The files. */ - synchronized void fastTrackFileIngestTasks(IngestJobExecutor ingestPipeline, Collection files) { - if (!ingestPipeline.isCancelled()) { + synchronized void scheduleHighPriorityFileIngestTasks(IngestJobExecutor executor, Collection files) { + if (!executor.isCancelled()) { /* * Put the files directly into the queue for the file ingest * threads, if they pass the file filter for the job. The files are @@ -263,12 +243,12 @@ final class IngestTasksScheduler { * in progress. */ for (AbstractFile file : files) { - FileIngestTask fileTask = new FileIngestTask(ingestPipeline, file); + FileIngestTask fileTask = new FileIngestTask(executor, file); if (shouldEnqueueFileTask(fileTask)) { try { fileIngestTasksQueue.putFirst(fileTask); } catch (InterruptedException ex) { - DataSource dataSource = ingestPipeline.getDataSource(); + DataSource dataSource = executor.getDataSource(); logger.log(Level.WARNING, String.format("Interrupted while enqueuing file tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS Thread.currentThread().interrupt(); return; @@ -281,51 +261,73 @@ final class IngestTasksScheduler { /** * Schedules data artifact ingest tasks for any data artifacts that have * already been added to the case database for a data source. The data - * source is obtained from the ingest pipeline passed in. + * source is obtained from the ingest job executor passed in. * - * @param ingestPipeline The ingest pipeline for the job. A reference to the - * pipeline is added to each task so that when the - * task is dequeued by an ingest thread and the task's - * execute() method is called, execute() can pass the - * target Content of the task to the pipeline for - * processing by the pipeline's ingest modules. + * @param executor The ingest job executor that will execute the scheduled + * tasks. A reference to the executor is added to each task + * so that when the task is dequeued by an ingest thread, + * the task can pass its target item to the executor for + * processing by the executor's ingest module pipelines. 
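The rename to scheduleHighPriorityFileIngestTasks() above matches what the code does: derived files are placed at the head of the blocking deque consumed by the file ingest threads, ahead of the normal backlog. A minimal illustration of that head-versus-tail behavior:

    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;

    public class HighPriorityQueueSketch {

        public static void main(String[] args) throws InterruptedException {
            BlockingDeque<String> tasks = new LinkedBlockingDeque<>();
            tasks.putLast("ordinary file task");   // normal scheduling appends to the tail
            tasks.putFirst("derived file task");   // high-priority work jumps to the head
            System.out.println(tasks.takeFirst()); // prints "derived file task"
        }
    }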
*/ - synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor ingestPipeline) { - if (!ingestPipeline.isCancelled()) { + synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor executor) { + if (!executor.isCancelled()) { Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard(); try { - List artifacts = blackboard.getDataArtifacts(ingestPipeline.getDataSource().getId(), null); - scheduleDataArtifactIngestTasks(ingestPipeline, artifacts); + List artifacts = blackboard.getDataArtifacts(executor.getDataSource().getId(), null); + scheduleDataArtifactIngestTasks(executor, artifacts); } catch (TskCoreException ex) { - DataSource dataSource = ingestPipeline.getDataSource(); + DataSource dataSource = executor.getDataSource(); logger.log(Level.SEVERE, String.format("Failed to retrieve data artifacts for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS } } } + /** + * Schedules analysis result ingest tasks for any analysis result that have + * already been added to the case database for a data source. The data + * source is obtained from the ingest job executor passed in. + * + * @param executor The ingest job executor that will execute the scheduled + * tasks. A reference to the executor is added to each task + * so that when the task is dequeued by an ingest thread, + * the task can pass its target item to the executor for + * processing by the executor's ingest module pipelines. + */ + synchronized void scheduleAnalysisResultIngestTasks(IngestJobExecutor executor) { + if (!executor.isCancelled()) { + Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard(); + try { + List results = blackboard.getAnalysisResults(executor.getDataSource().getId(), null); + scheduleAnalysisResultIngestTasks(executor, results); + } catch (TskCoreException ex) { + DataSource dataSource = executor.getDataSource(); + logger.log(Level.SEVERE, String.format("Failed to retrieve analysis results for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS + } + } + } + /** * Schedules data artifact ingest tasks for an ingest job. This method is * intended to be used to schedule artifacts that are products of ingest * module processing. * - * @param ingestPipeline The ingest pipeline for the job. A reference to the - * pipeline is added to each task so that when the - * task is dequeued by an ingest thread and the task's - * execute() method is called, execute() can pass the - * target Content of the task to the pipeline for - * processing by the pipeline's ingest modules. - * @param artifacts A subset of the data artifacts from the data - * source; if empty, then all of the data artifacts - * from the data source will be scheduled. + * @param executor The ingest job executor that will execute the scheduled + * tasks. A reference to the executor is added to each task + * so that when the task is dequeued by an ingest thread, + * the task can pass its target item to the executor for + * processing by the executor's ingest module pipelines. + * @param artifacts A subset of the data artifacts from the data source; if + * empty, then all of the data artifacts from the data + * source will be scheduled. 
*/ - synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor ingestPipeline, List artifacts) { - if (!ingestPipeline.isCancelled()) { + synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor executor, List artifacts) { + if (!executor.isCancelled()) { for (DataArtifact artifact : artifacts) { - DataArtifactIngestTask task = new DataArtifactIngestTask(ingestPipeline, artifact); + DataArtifactIngestTask task = new DataArtifactIngestTask(executor, artifact); try { - this.artifactIngestTasksQueue.putLast(task); + artifactIngestTasksQueue.putLast(task); } catch (InterruptedException ex) { - DataSource dataSource = ingestPipeline.getDataSource(); + DataSource dataSource = executor.getDataSource(); logger.log(Level.WARNING, String.format("Interrupted while enqueuing data artifact tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS Thread.currentThread().interrupt(); break; @@ -334,6 +336,36 @@ final class IngestTasksScheduler { } } + /** + * Schedules data artifact ingest tasks for an ingest job. This method is + * intended to be used to schedule artifacts that are products of ingest + * module processing. + * + * @param executor The ingest job executor that will execute the scheduled + * tasks. A reference to the executor is added to each task + * so that when the task is dequeued by an ingest thread, + * the task can pass its target item to the executor for + * processing by the executor's ingest module pipelines. + * @param results A subset of the data artifacts from the data source; if + * empty, then all of the data artifacts from the data + * source will be scheduled. + */ + synchronized void scheduleAnalysisResultIngestTasks(IngestJobExecutor executor, List results) { + if (!executor.isCancelled()) { + for (AnalysisResult result : results) { + AnalysisResultIngestTask task = new AnalysisResultIngestTask(executor, result); + try { + resultIngestTasksQueue.putLast(task); + } catch (InterruptedException ex) { + DataSource dataSource = executor.getDataSource(); + logger.log(Level.WARNING, String.format("Interrupted while enqueuing analysis results tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS + Thread.currentThread().interrupt(); + break; + } + } + } + } + /** * Allows an ingest thread to notify this ingest task scheduler that a data * source level task has been completed. @@ -365,22 +397,32 @@ final class IngestTasksScheduler { artifactIngestTasksQueue.taskCompleted(task); } + /** + * Allows an ingest thread to notify this ingest task scheduler that a data + * artifact ingest task has been completed. + * + * @param task The completed task. + */ + synchronized void notifyTaskCompleted(AnalysisResultIngestTask task) { + resultIngestTasksQueue.taskCompleted(task); + } + /** * Queries the task scheduler to determine whether or not all of the ingest * tasks for an ingest job have been completed. * - * @param ingestPipeline The ingest pipeline for the job. + * @param executor The ingest job executor. * * @return True or false. 
*/ - synchronized boolean currentTasksAreCompleted(IngestJobExecutor ingestPipeline) { - long pipelineId = ingestPipeline.getIngestJobId(); - return !(dataSourceIngestTasksQueue.hasTasksForJob(pipelineId) - || hasTasksForJob(topLevelFileIngestTasksQueue, pipelineId) - || hasTasksForJob(batchedFileIngestTasksQueue, pipelineId) - || hasTasksForJob(streamedFileIngestTasksQueue, pipelineId) - || fileIngestTasksQueue.hasTasksForJob(pipelineId) - || artifactIngestTasksQueue.hasTasksForJob(pipelineId)); + synchronized boolean currentTasksAreCompleted(Long ingestJobId) { + return !(dataSourceIngestTasksQueue.hasTasksForJob(ingestJobId) + || hasTasksForJob(topLevelFileIngestTasksQueue, ingestJobId) + || hasTasksForJob(batchedFileIngestTasksQueue, ingestJobId) + || hasTasksForJob(streamedFileIngestTasksQueue, ingestJobId) + || fileIngestTasksQueue.hasTasksForJob(ingestJobId) + || artifactIngestTasksQueue.hasTasksForJob(ingestJobId) + || resultIngestTasksQueue.hasTasksForJob(ingestJobId)); } /** @@ -400,13 +442,12 @@ final class IngestTasksScheduler { * in the batch root file tasks queue and any directories in the batch root * children file tasks queue. * - * @param ingestJobPipeline The ingest pipeline for the job. + * @param ingestJobId The ingest job ID. */ - synchronized void cancelPendingFileTasksForIngestJob(IngestJobExecutor ingestJobPipeline) { - long jobId = ingestJobPipeline.getIngestJobId(); - removeTasksForJob(topLevelFileIngestTasksQueue, jobId); - removeTasksForJob(batchedFileIngestTasksQueue, jobId); - removeTasksForJob(streamedFileIngestTasksQueue, jobId); + synchronized void cancelPendingFileTasksForIngestJob(long ingestJobId) { + removeTasksForJob(topLevelFileIngestTasksQueue, ingestJobId); + removeTasksForJob(batchedFileIngestTasksQueue, ingestJobId); + removeTasksForJob(streamedFileIngestTasksQueue, ingestJobId); } /** @@ -421,7 +462,9 @@ final class IngestTasksScheduler { List topLevelFiles = new ArrayList<>(); Collection rootObjects = dataSource.accept(new GetRootDirectoryVisitor()); if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) { - // The data source is itself a file to be processed. + /* + * The data source is itself a file to be processed. + */ topLevelFiles.add((AbstractFile) dataSource); } else { for (AbstractFile root : rootObjects) { @@ -429,12 +472,17 @@ final class IngestTasksScheduler { try { children = root.getChildren(); if (children.isEmpty()) { - // Add the root object itself, it could be an unallocated - // space file, or a child of a volume or an image. + /* + * Add the root object itself, it could be an + * unallocated space file, or a child of a volume or an + * image. + */ topLevelFiles.add(root); } else { - // The root object is a file system root directory, get - // the files within it. + /* + * The root object is a file system root directory, get + * the files within it. 
+ */ for (Content child : children) { if (child instanceof AbstractFile) { topLevelFiles.add((AbstractFile) child); @@ -546,7 +594,8 @@ final class IngestTasksScheduler { AbstractFile file = null; try { file = nextTask.getFile(); - for (Content child : file.getChildren()) { + List children = file.getChildren(); + for (Content child : children) { if (child instanceof AbstractFile) { AbstractFile childFile = (AbstractFile) child; FileIngestTask childTask = new FileIngestTask(nextTask.getIngestJobExecutor(), childFile); @@ -586,8 +635,10 @@ final class IngestTasksScheduler { return false; } - // Skip the task if the file is actually the pseudo-file for the parent - // or current directory. + /* + * Skip the task if the file is actually the pseudo-file for the parent + * or current directory. + */ String fileName = file.getName(); if (fileName.equals(".") || fileName.equals("..")) { @@ -610,12 +661,16 @@ final class IngestTasksScheduler { return false; } - // Skip the task if the file is one of a select group of special, large - // NTFS or FAT file system files. + /* + * Skip the task if the file is one of a select group of special, large + * NTFS or FAT file system files. + */ if (file instanceof org.sleuthkit.datamodel.File) { final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file; - // Get the type of the file system, if any, that owns the file. + /* + * Get the type of the file system, if any, that owns the file. + */ TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP; try { FileSystem fs = f.getFileSystem(); @@ -626,12 +681,16 @@ final class IngestTasksScheduler { logger.log(Level.SEVERE, "Error querying file system for " + f, ex); //NON-NLS } - // If the file system is not NTFS or FAT, don't skip the file. + /* + * If the file system is not NTFS or FAT, don't skip the file. + */ if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) { return true; } - // Find out whether the file is in a root directory. + /* + * Find out whether the file is in a root directory. + */ boolean isInRootDir = false; try { AbstractFile parent = f.getParentDirectory(); @@ -644,9 +703,11 @@ final class IngestTasksScheduler { logger.log(Level.WARNING, "Error querying parent directory for" + f.getName(), ex); //NON-NLS } - // If the file is in the root directory of an NTFS or FAT file - // system, check its meta-address and check its name for the '$' - // character and a ':' character (not a default attribute). + /* + * If the file is in the root directory of an NTFS or FAT file + * system, check its meta-address and check its name for the '$' + * character and a ':' character (not a default attribute). + */ if (isInRootDir && f.getMetaAddr() < 32) { String name = f.getName(); if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) { @@ -695,14 +756,14 @@ final class IngestTasksScheduler { * Checks whether or not a collection of ingest tasks includes a task for a * given ingest job. * - * @param tasks The tasks. - * @param pipelineId The ID of the ingest pipeline for the job. + * @param tasks The tasks. + * @param ingestJobId The ingest job ID. * * @return True if there are no tasks for the job, false otherwise. 
*/ - synchronized private static boolean hasTasksForJob(Collection tasks, long pipelineId) { + synchronized private static boolean hasTasksForJob(Collection tasks, long ingestJobId) { for (IngestTask task : tasks) { - if (task.getIngestJobExecutor().getIngestJobId() == pipelineId) { + if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) { return true; } } @@ -713,14 +774,14 @@ final class IngestTasksScheduler { * Removes all of the ingest tasks associated with an ingest job from a * collection of tasks. * - * @param tasks The tasks. - * @param pipelineId The ID of the ingest pipeline for the job. + * @param tasks The tasks. + * @param ingestJobId The ingest job ID. */ - private static void removeTasksForJob(Collection tasks, long pipelineId) { + private static void removeTasksForJob(Collection tasks, long ingestJobId) { Iterator iterator = tasks.iterator(); while (iterator.hasNext()) { IngestTask task = iterator.next(); - if (task.getIngestJobExecutor().getIngestJobId() == pipelineId) { + if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) { iterator.remove(); } } @@ -730,15 +791,15 @@ final class IngestTasksScheduler { * Counts the number of ingest tasks in a collection of tasks for a given * ingest job. * - * @param tasks The tasks. - * @param pipelineId The ID of the ingest pipeline for the job. + * @param tasks The tasks. + * @param ingestJobId The ingest job ID. * * @return The count. */ - private static int countTasksForJob(Collection tasks, long pipelineId) { + private static int countTasksForJob(Collection tasks, long ingestJobId) { int count = 0; for (IngestTask task : tasks) { - if (task.getIngestJobExecutor().getIngestJobId() == pipelineId) { + if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) { count++; } } @@ -749,18 +810,22 @@ final class IngestTasksScheduler { * Returns a snapshot of the states of the tasks in progress for an ingest * job. * - * @param jobId The identifier assigned to the job. + * @param ingestJobId The ingest job ID. * - * @return + * @return The snaphot. 
*/ - synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) { - return new IngestJobTasksSnapshot(jobId, dataSourceIngestTasksQueue.countQueuedTasksForJob(jobId), - countTasksForJob(topLevelFileIngestTasksQueue, jobId), - countTasksForJob(batchedFileIngestTasksQueue, jobId), - fileIngestTasksQueue.countQueuedTasksForJob(jobId), - dataSourceIngestTasksQueue.countRunningTasksForJob(jobId) + fileIngestTasksQueue.countRunningTasksForJob(jobId) + artifactIngestTasksQueue.countRunningTasksForJob(jobId), - countTasksForJob(streamedFileIngestTasksQueue, jobId), - artifactIngestTasksQueue.countQueuedTasksForJob(jobId)); + synchronized IngestTasksSnapshot getTasksSnapshotForJob(long ingestJobId) { + return new IngestTasksSnapshot( + ingestJobId, + dataSourceIngestTasksQueue.countQueuedTasksForJob(ingestJobId), + countTasksForJob(topLevelFileIngestTasksQueue, ingestJobId), + countTasksForJob(batchedFileIngestTasksQueue, ingestJobId), + fileIngestTasksQueue.countQueuedTasksForJob(ingestJobId), + countTasksForJob(streamedFileIngestTasksQueue, ingestJobId), + artifactIngestTasksQueue.countQueuedTasksForJob(ingestJobId), + resultIngestTasksQueue.countQueuedTasksForJob(ingestJobId), + dataSourceIngestTasksQueue.countRunningTasksForJob(ingestJobId) + fileIngestTasksQueue.countRunningTasksForJob(ingestJobId) + artifactIngestTasksQueue.countRunningTasksForJob(ingestJobId) + resultIngestTasksQueue.countRunningTasksForJob(ingestJobId) + ); } /** @@ -771,20 +836,27 @@ final class IngestTasksScheduler { @Override public int compare(FileIngestTask q1, FileIngestTask q2) { - // In practice the case where one or both calls to getFile() fails - // should never occur since such tasks would not be added to the queue. + /* + * In practice the case where one or both calls to getFile() fails + * should never occur since such tasks would not be added to the + * queue. + */ AbstractFile file1 = null; AbstractFile file2 = null; try { file1 = q1.getFile(); } catch (TskCoreException ex) { - // Do nothing - the exception has been logged elsewhere + /* + * Do nothing - the exception has been logged elsewhere + */ } try { file2 = q2.getFile(); } catch (TskCoreException ex) { - // Do nothing - the exception has been logged elsewhere + /* + * Do nothing - the exception has been logged elsewhere + */ } if (file1 == null) { @@ -829,15 +901,11 @@ final class IngestTasksScheduler { static final List HIGH_PRI_PATHS = new ArrayList<>(); /* - * prioritize root directory folders based on the assumption that we + * Prioritize root directory folders based on the assumption that we * are looking for user content. Other types of investigations may * want different priorities. */ - static /* - * prioritize root directory folders based on the assumption that we - * are looking for user content. Other types of investigations may - * want different priorities. - */ { + static { // these files have no structure, so they go last //unalloc files are handled as virtual files in getPriority() //LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE)); @@ -1013,98 +1081,107 @@ final class IngestTasksScheduler { * Checks whether there are any ingest tasks are queued and/or running * for a given ingest job. * - * @param pipelineId The ID of the ingest pipeline for the job. + * @param ingestJobId The ingest job ID. * - * @return + * @return True or false. 
*/ - boolean hasTasksForJob(long pipelineId) { + boolean hasTasksForJob(long ingestJobId) { synchronized (this) { - return IngestTasksScheduler.hasTasksForJob(queuedTasks, pipelineId) || IngestTasksScheduler.hasTasksForJob(tasksInProgress, pipelineId); + return IngestTasksScheduler.hasTasksForJob(queuedTasks, ingestJobId) || IngestTasksScheduler.hasTasksForJob(tasksInProgress, ingestJobId); } } /** * Gets a count of the queued ingest tasks for a given ingest job. * - * @param pipelineId The ID of the ingest pipeline for the job. + * @param ingestJobId The ingest job ID. * - * @return + * @return The count. */ - int countQueuedTasksForJob(long pipelineId) { + int countQueuedTasksForJob(long ingestJobId) { synchronized (this) { - return IngestTasksScheduler.countTasksForJob(queuedTasks, pipelineId); + return IngestTasksScheduler.countTasksForJob(queuedTasks, ingestJobId); } } /** * Gets a count of the running ingest tasks for a given ingest job. * - * @param pipelineId The ID of the ingest pipeline for the job. + * @param ingestJobId The ingest job ID. * - * @return + * @return The count. */ - int countRunningTasksForJob(long pipelineId) { + int countRunningTasksForJob(long ingestJobId) { synchronized (this) { - return IngestTasksScheduler.countTasksForJob(tasksInProgress, pipelineId); + return IngestTasksScheduler.countTasksForJob(tasksInProgress, ingestJobId); } } } /** - * A snapshot of ingest tasks data for an ingest job. + * A snapshot of the sizes of the ingest task lists and queues for a given + * ingest job. */ - static final class IngestJobTasksSnapshot implements Serializable { + static final class IngestTasksSnapshot implements Serializable { private static final long serialVersionUID = 1L; - private final long jobId; - private final long dsQueueSize; + private final long ingestJobId; + private final long dataSourceQueueSize; private final long rootQueueSize; private final long dirQueueSize; private final long fileQueueSize; - private final long runningListSize; - private final long streamingQueueSize; + private final long inProgressListSize; + private final long streamedFileQueueSize; private final long artifactsQueueSize; + private final long resultsQueueSize; /** - * RJCTODO + * Constructs a snapshot of the sizes of the ingest task lists and + * queues for a given ingest job. * - * Constructs a snapshot of ingest tasks data for an ingest job. - * - * @param jobId The identifier associated with the job. - * @param dsQueueSize - * @param rootQueueSize - * @param dirQueueSize - * @param fileQueueSize - * @param runningListSize - * @param streamingQueueSize - * @param artifactsQueueSize + * @param ingestJobId The ingest job ID. + * @param dataSourceQueueSize The number of queued ingest tasks for + * data sources. + * @param rootQueueSize The number of queued ingest tasks for + * "root" file system objects. + * @param dirQueueSize The number of queued ingest tasks for + * directories. + * @param fileQueueSize The number of queued ingest tasks for + * files. + * @param inProgressListSize The number of ingest tasks in progress. + * @param streamedFileQueueSize The number of queued ingest tasks for + * streamed files. + * @param artifactsQueueSize The number of queued ingest tasks for + * data artifacts. + * @param resultsQueueSize The number of queued ingest tasks for + * analysis results. 
*/ - IngestJobTasksSnapshot(long jobId, long dsQueueSize, long rootQueueSize, long dirQueueSize, long fileQueueSize, - long runningListSize, long streamingQueueSize, long artifactsQueueSize) { - this.jobId = jobId; - this.dsQueueSize = dsQueueSize; + IngestTasksSnapshot(long ingestJobId, long dataSourceQueueSize, long rootQueueSize, long dirQueueSize, long fileQueueSize, long streamedFileQueueSize, long artifactsQueueSize, long resultsQueueSize, long inProgressListSize) { + this.ingestJobId = ingestJobId; + this.dataSourceQueueSize = dataSourceQueueSize; this.rootQueueSize = rootQueueSize; this.dirQueueSize = dirQueueSize; this.fileQueueSize = fileQueueSize; - this.runningListSize = runningListSize; - this.streamingQueueSize = streamingQueueSize; + this.streamedFileQueueSize = streamedFileQueueSize; this.artifactsQueueSize = artifactsQueueSize; + this.resultsQueueSize = resultsQueueSize; + this.inProgressListSize = inProgressListSize; } /** - * Gets the identifier associated with the ingest job for which this - * snapshot was created. + * Gets the ingest job ID of the ingest job for which this snapshot was + * created. * - * @return The ingest job identifier. + * @return The ingest job ID. */ - long getJobId() { - return jobId; + long getIngestJobId() { + return ingestJobId; } /** - * Gets the number of file ingest tasks associated with the job that are - * in the root directories queue. + * Gets the number of file ingest tasks for this job that are in the + * file system root objects queue. * * @return The tasks count. */ @@ -1113,35 +1190,75 @@ final class IngestTasksScheduler { } /** - * Gets the number of file ingest tasks associated with the job that are - * in the root directories queue. + * Gets the number of file ingest tasks for this job that are in the + * directories queue. * * @return The tasks count. */ - long getDirectoryTasksQueueSize() { + long getDirQueueSize() { return dirQueueSize; } + /** + * Gets the number of file ingest tasks for this job that are in the + * files queue. + * + * @return The tasks count. + */ long getFileQueueSize() { return fileQueueSize; } - long getStreamingQueueSize() { - return streamingQueueSize; + /** + * Gets the number of file ingest tasks for this job that are in the + * streamed files queue. + * + * @return The tasks count. + */ + long getStreamedFilesQueueSize() { + return streamedFileQueueSize; } - long getDsQueueSize() { - return dsQueueSize; + /** + * Gets the number of data source ingest tasks for this job that are in + * the data sources queue. + * + * @return The tasks count. + */ + long getDataSourceQueueSize() { + return dataSourceQueueSize; } - long getRunningListSize() { - return runningListSize; - } - + /** + * Gets the number of data artifact ingest tasks for this job that are + * in the data artifacts queue. + * + * @return The tasks count. + */ long getArtifactsQueueSize() { return artifactsQueueSize; } + /** + * Gets the number of analysis result ingest tasks for this job that are + * in the analysis results queue. + * + * @return The tasks count. + */ + long getResultsQueueSize() { + return resultsQueueSize; + } + + /** + * Gets the number of ingest tasks for this job that are in the tasks in + * progress list. + * + * @return The tasks count. 
+ */ + long getProgressListSize() { + return inProgressListSize; + } + } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/Snapshot.java b/Core/src/org/sleuthkit/autopsy/ingest/Snapshot.java index d67e3f2441..5ab6fda8bf 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/Snapshot.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/Snapshot.java @@ -39,7 +39,7 @@ public final class Snapshot implements Serializable { private final Date fileIngestStartTime; private final long processedFiles; private final long estimatedFilesToProcess; - private final IngestTasksScheduler.IngestJobTasksSnapshot tasksSnapshot; + private final IngestTasksScheduler.IngestTasksSnapshot tasksSnapshot; transient private final boolean jobCancelled; transient private final IngestJob.CancellationReason jobCancellationReason; transient private final List cancelledDataSourceModules; @@ -52,7 +52,7 @@ public final class Snapshot implements Serializable { boolean fileIngestRunning, Date fileIngestStartTime, boolean jobCancelled, IngestJob.CancellationReason cancellationReason, List cancelledModules, long processedFiles, long estimatedFilesToProcess, - long snapshotTime, IngestTasksScheduler.IngestJobTasksSnapshot tasksSnapshot) { + long snapshotTime, IngestTasksScheduler.IngestTasksSnapshot tasksSnapshot) { this.dataSource = dataSourceName; this.jobId = jobId; this.jobStartTime = jobStartTime; @@ -162,7 +162,7 @@ public final class Snapshot implements Serializable { if (null == this.tasksSnapshot) { return 0; } - return this.tasksSnapshot.getDirectoryTasksQueueSize(); + return this.tasksSnapshot.getDirQueueSize(); } long getFileQueueSize() { @@ -176,21 +176,21 @@ public final class Snapshot implements Serializable { if (null == this.tasksSnapshot) { return 0; } - return this.tasksSnapshot.getDsQueueSize(); + return this.tasksSnapshot.getDataSourceQueueSize(); } long getStreamingQueueSize() { if (null == this.tasksSnapshot) { return 0; } - return this.tasksSnapshot.getStreamingQueueSize(); + return this.tasksSnapshot.getStreamedFilesQueueSize(); } long getRunningListSize() { if (null == this.tasksSnapshot) { return 0; } - return this.tasksSnapshot.getRunningListSize(); + return this.tasksSnapshot.getProgressListSize(); } long getArtifactTasksQueueSize() { @@ -200,6 +200,13 @@ public final class Snapshot implements Serializable { return tasksSnapshot.getArtifactsQueueSize(); } + long getResultTasksQueueSize() { + if (tasksSnapshot == null) { + return 0; + } + return tasksSnapshot.getResultsQueueSize(); + } + boolean isCancelled() { return this.jobCancelled; } diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties-MERGED b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties-MERGED index 388f951276..1509902680 100755 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties-MERGED +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties-MERGED @@ -371,7 +371,6 @@ SolrSearchService.exceptionMessage.noCurrentSolrCore=IndexMetadata did not conta SolrSearchService.exceptionMessage.noIndexMetadata=Unable to create IndexMetaData from case directory: {0} # {0} - collection name SolrSearchService.exceptionMessage.unableToDeleteCollection=Unable to delete collection {0} -SolrSearchService.indexingError=Unable to index blackboard artifact. 
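For context, a consumer of the renamed snapshot class might aggregate the per-queue counts along the following lines. This is a hypothetical helper, not part of the patch; it uses only the getters introduced above plus the pre-existing getRootQueueSize() accessor, which this change leaves in place:

    // Hypothetical aggregation over an IngestTasksScheduler.IngestTasksSnapshot,
    // summing every queued-task count exposed by the snapshot.
    static long getTotalQueuedTasks(IngestTasksScheduler.IngestTasksSnapshot snapshot) {
        return snapshot.getDataSourceQueueSize()
                + snapshot.getRootQueueSize()
                + snapshot.getDirQueueSize()
                + snapshot.getFileQueueSize()
                + snapshot.getStreamedFilesQueueSize()
                + snapshot.getArtifactsQueueSize()
                + snapshot.getResultsQueueSize();
    }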
SolrSearchService.ServiceName=Solr Keyword Search Service SolrSearchService.DeleteDataSource.msg=Error Deleting Solr data for data source id {0} DropdownSingleTermSearchPanel.dataSourceCheckBox.text=Restrict search to the selected data sources: diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/IngestSearchRunner.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/IngestSearchRunner.java index bb2fbe189a..9cd33a8167 100755 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/IngestSearchRunner.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/IngestSearchRunner.java @@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; +import javax.annotation.concurrent.GuardedBy; import javax.swing.SwingUtilities; import javax.swing.SwingWorker; import org.netbeans.api.progress.ProgressHandle; @@ -52,34 +53,44 @@ import org.sleuthkit.autopsy.ingest.IngestMessage; import org.sleuthkit.autopsy.ingest.IngestServices; /** - * Singleton keyword search manager: Launches search threads for each job and - * performs commits, both on timed intervals. + * Performs periodic and final keyword searches for ingest jobs. Periodic + * searches are done in background tasks. This represents a careful working + * around of the contract for IngestModule.process(). Final searches are done + * synchronously in the calling thread, as required by the contract for + * IngestModule.shutDown(). */ final class IngestSearchRunner { private static final Logger logger = Logger.getLogger(IngestSearchRunner.class.getName()); private static IngestSearchRunner instance = null; - private IngestServices services = IngestServices.getInstance(); + private final IngestServices services = IngestServices.getInstance(); private Ingester ingester = null; private long currentUpdateIntervalMs; - private volatile boolean periodicSearchTaskRunning = false; - private Future jobProcessingTaskFuture; - private final ScheduledThreadPoolExecutor jobProcessingExecutor; + private volatile boolean periodicSearchTaskRunning; + private volatile Future periodicSearchTaskHandle; + private final ScheduledThreadPoolExecutor periodicSearchTaskExecutor; private static final int NUM_SEARCH_SCHEDULING_THREADS = 1; - private static final String SEARCH_SCHEDULER_THREAD_NAME = "periodic-search-scheduler-%d"; + private static final String SEARCH_SCHEDULER_THREAD_NAME = "periodic-search-scheduling-%d"; + private final Map jobs = new ConcurrentHashMap<>(); // Ingest job ID to search job info + private final boolean usingNetBeansGUI = RuntimeProperties.runningWithGUI(); - // maps a jobID to the search - private Map jobs = new ConcurrentHashMap<>(); - - IngestSearchRunner() { + /* + * Constructs a singleton object that performs periodic and final keyword + * searches for ingest jobs. Periodic searches are done in background tasks. + * This represents a careful working around of the contract for + * IngestModule.process(). Final searches are done synchronously in the + * calling thread, as required by the contract for IngestModule.shutDown(). 
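The scheduling approach described above (a one-shot task that re-schedules itself after each run, rather than a fixed-rate schedule) can be sketched in a self-contained form. The class and field names below are illustrative only; this is the general pattern, not the patch's code:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Illustrative self-rescheduling periodic task: each run decides the delay
    // for the next run, so a variable interval can be honored without
    // cancelling and rebuilding a fixed-rate schedule.
    class SelfReschedulingTask implements Runnable {

        private final ScheduledThreadPoolExecutor executor;
        private volatile long delayMs;

        SelfReschedulingTask(ScheduledThreadPoolExecutor executor, long initialDelayMs) {
            this.executor = executor;
            this.delayMs = initialDelayMs;
        }

        void start() {
            executor.schedule(this, delayMs, TimeUnit.MILLISECONDS);
        }

        @Override
        public void run() {
            // ... do the periodic work here, possibly adjusting delayMs based
            // on how long the work took ...
            executor.schedule(this, delayMs, TimeUnit.MILLISECONDS);
        }
    }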
+ */ + private IngestSearchRunner() { currentUpdateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000; ingester = Ingester.getDefault(); - jobProcessingExecutor = new ScheduledThreadPoolExecutor(NUM_SEARCH_SCHEDULING_THREADS, new ThreadFactoryBuilder().setNameFormat(SEARCH_SCHEDULER_THREAD_NAME).build()); + periodicSearchTaskExecutor = new ScheduledThreadPoolExecutor(NUM_SEARCH_SCHEDULING_THREADS, new ThreadFactoryBuilder().setNameFormat(SEARCH_SCHEDULER_THREAD_NAME).build()); } /** + * Gets the ingest search runner singleton. * - * @return the singleton object + * @return The ingest search runner. */ public static synchronized IngestSearchRunner getInstance() { if (instance == null) { @@ -89,72 +100,75 @@ final class IngestSearchRunner { } /** + * Starts the search job for an ingest job. * - * @param jobContext - * @param keywordListNames + * @param jobContext The ingest job context. + * @param keywordListNames The names of the keyword search lists for the + * ingest job. */ public synchronized void startJob(IngestJobContext jobContext, List keywordListNames) { long jobId = jobContext.getJobId(); if (jobs.containsKey(jobId) == false) { - logger.log(Level.INFO, "Adding job {0}", jobId); //NON-NLS SearchJobInfo jobData = new SearchJobInfo(jobContext, keywordListNames); jobs.put(jobId, jobData); } - // keep track of how many threads / module instances from this job have asked for this + /* + * Keep track of the number of keyword search file ingest modules that + * are doing analysis for the ingest job, i.e., that have called this + * method. This is needed by endJob(). + */ jobs.get(jobId).incrementModuleReferenceCount(); - // start the timer, if needed + /* + * Start a periodic search task in the periodic search task executor, + * if one is not already running. + */ if ((jobs.size() > 0) && (periodicSearchTaskRunning == false)) { - // reset the default periodic search frequency to the user setting - logger.log(Level.INFO, "Resetting periodic search time out to default value"); //NON-NLS currentUpdateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000; - jobProcessingTaskFuture = jobProcessingExecutor.schedule(new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS); + periodicSearchTaskHandle = periodicSearchTaskExecutor.schedule(new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS); periodicSearchTaskRunning = true; } } /** - * Perform normal finishing of searching for this job, including one last - * commit and search. Blocks until the final search is complete. + * Finishes a search job for an ingest job. * - * @param jobId + * @param jobId The ingest job ID. */ public synchronized void endJob(long jobId) { + /* + * Only complete the job if this is the last keyword search file ingest + * module doing analysis for this job. + */ SearchJobInfo job; - boolean readyForFinalSearch = false; job = jobs.get(jobId); if (job == null) { - return; + return; // RJCTODO: SEVERE } - - // Only do final search if this is the last module/thread in this job to call endJob() - if (job.decrementModuleReferenceCount() == 0) { + if (job.decrementModuleReferenceCount() != 0) { jobs.remove(jobId); - readyForFinalSearch = true; } - if (readyForFinalSearch) { - logger.log(Level.INFO, "Commiting search index before final search for search job {0}", job.getJobId()); //NON-NLS - commit(); - doFinalSearch(job); //this will block until it's done + /* + * Commit the index and do the final search. 
The final search is done in + * the ingest thread that calls shutDown() on the keyword search file ingest + * module, per the contract of IngestModule.shutDown(). + */ + logger.log(Level.INFO, "Committing search index before final search for search job {0}", job.getJobId()); //NON-NLS + commit(); + logger.log(Level.INFO, "Starting final search for search job {0}", job.getJobId()); //NON-NLS + doFinalSearch(job); + logger.log(Level.INFO, "Final search for search job {0} completed", job.getJobId()); //NON-NLS - // new jobs could have been added while we were doing final search - if (jobs.isEmpty()) { - // no more jobs left. stop the PeriodicSearchTask. - // A new one will be created for future jobs. - logger.log(Level.INFO, "No more search jobs. Stopping periodic search task"); //NON-NLS - periodicSearchTaskRunning = false; - jobProcessingTaskFuture.cancel(true); - } + if (jobs.isEmpty()) { + cancelPeriodicSearchSchedulingTask(); } } /** - * Immediate stop and removal of job from SearchRunner. Cancels the - * associated search worker if it's still running. + * Stops the search job for an ingest job. * - * @param jobId + * @param jobId The ingest job ID. */ public synchronized void stopJob(long jobId) { logger.log(Level.INFO, "Stopping search job {0}", jobId); //NON-NLS @@ -166,7 +180,10 @@ final class IngestSearchRunner { return; } - //stop currentSearcher + /* + * Request cancellation of the current keyword search, whether it is a + * periodic search or a final search. + */ IngestSearchRunner.Searcher currentSearcher = job.getCurrentSearcher(); if ((currentSearcher != null) && (!currentSearcher.isDone())) { logger.log(Level.INFO, "Cancelling search job {0}", jobId); //NON-NLS @@ -176,19 +193,16 @@ final class IngestSearchRunner { jobs.remove(jobId); if (jobs.isEmpty()) { - // no more jobs left. stop the PeriodicSearchTask. - // A new one will be created for future jobs. - logger.log(Level.INFO, "No more search jobs. Stopping periodic search task"); //NON-NLS - periodicSearchTaskRunning = false; - jobProcessingTaskFuture.cancel(true); + cancelPeriodicSearchSchedulingTask(); } } /** - * Add these lists to all of the jobs. Used when user wants to search for a - * list while ingest has already started. + * Adds the given keyword list names to the set of keyword lists to be + * searched by ALL keyword search jobs. This supports adding one or more + * keyword search lists to ingest jobs already in progress. * - * @param keywordListNames + * @param keywordListNames The names of the additional keyword lists. */ public synchronized void addKeywordListsToAllJobs(List keywordListNames) { for (String listName : keywordListNames) { @@ -200,155 +214,171 @@ final class IngestSearchRunner { } /** - * Commits index and notifies listeners of index update + * Commits the Solr index for the current case and publishes an event + * indicating the current number of indexed items (this is no longer just + * files). */ private void commit() { ingester.commit(); - // Signal a potential change in number of text_ingested files + /* + * Publish an event advertising the number of indexed items. Note that + * this is no longer the number of indexed files, since the text of many + * items in addition to files is indexed. 
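Stepping back to startJob()/endJob() above: the module reference counting they rely on (several keyword search file ingest module instances share one search job, and only the last one to finish should trigger the final search) reduces to a simple pattern. A minimal, self-contained sketch with illustrative names, not the patch's code:

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative reference counter: each module instance registers in
    // startUp() and releases in shutDown(); exactly one caller (the last to
    // release) sees true and performs the final, synchronous search.
    class SharedSearchJob {

        private final AtomicLong moduleReferenceCount = new AtomicLong(0);

        void register() {
            moduleReferenceCount.incrementAndGet();
        }

        boolean release() {
            return moduleReferenceCount.decrementAndGet() == 0;
        }
    }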
+ */ try { final int numIndexedFiles = KeywordSearch.getServer().queryNumIndexedFiles(); KeywordSearch.fireNumIndexedFilesChange(null, numIndexedFiles); } catch (NoOpenCoreException | KeywordSearchModuleException ex) { - logger.log(Level.SEVERE, "Error executing Solr query to check number of indexed files", ex); //NON-NLS + logger.log(Level.SEVERE, "Error executing Solr query for number of indexed files", ex); //NON-NLS } } /** - * A final search waits for any still-running workers, and then executes a - * new one and waits until that is done. + * Performs the final keyword search for an ingest job. The search is done + * synchronously, as required by the contract for IngestModule.shutDown(). * - * @param job + * @param job The keyword search job info. */ private void doFinalSearch(SearchJobInfo job) { - // Run one last search as there are probably some new files committed - logger.log(Level.INFO, "Starting final search for search job {0}", job.getJobId()); //NON-NLS if (!job.getKeywordListNames().isEmpty()) { try { - // In case this job still has a worker running, wait for it to finish - logger.log(Level.INFO, "Checking for previous search for search job {0} before executing final search", job.getJobId()); //NON-NLS + /* + * Wait for any periodic searches being done in a SwingWorker + * pool thread to finish. + */ job.waitForCurrentWorker(); - IngestSearchRunner.Searcher finalSearcher = new IngestSearchRunner.Searcher(job, true); - job.setCurrentSearcher(finalSearcher); //save the ref - logger.log(Level.INFO, "Kicking off final search for search job {0}", job.getJobId()); //NON-NLS - finalSearcher.execute(); //start thread - - // block until the search is complete - logger.log(Level.INFO, "Waiting for final search for search job {0}", job.getJobId()); //NON-NLS - finalSearcher.get(); - logger.log(Level.INFO, "Final search for search job {0} completed", job.getJobId()); //NON-NLS - + job.setCurrentSearcher(finalSearcher); + /* + * Do the final search synchronously on the current ingest + * thread, per the contract specified by IngestModule.shutDown(). + */ + finalSearcher.doInBackground(); } catch (InterruptedException | CancellationException ex) { logger.log(Level.INFO, "Final search for search job {0} interrupted or cancelled", job.getJobId()); //NON-NLS - } catch (ExecutionException ex) { + } catch (Exception ex) { logger.log(Level.SEVERE, String.format("Final search for search job %d failed", job.getJobId()), ex); //NON-NLS } } } /** - * Task to perform periodic searches for each job (does a single index - * commit first) + * Cancels the current periodic search scheduling task. */ - private final class PeriodicSearchTask implements Runnable { - - private final Logger logger = Logger.getLogger(IngestSearchRunner.PeriodicSearchTask.class.getName()); - - @Override - public void run() { - // If no jobs then cancel the task. If more job(s) come along, a new task will start up. 
- if (jobs.isEmpty() || jobProcessingTaskFuture.isCancelled()) { - logger.log(Level.INFO, "Exiting periodic search task"); //NON-NLS - periodicSearchTaskRunning = false; - return; - } - - commit(); - - logger.log(Level.INFO, "Starting periodic searches"); - final StopWatch stopWatch = new StopWatch(); - stopWatch.start(); - // NOTE: contents of "jobs" ConcurrentHashMap can be modified in stopJob() and endJob() while we are inside this loop - for (Iterator> iterator = jobs.entrySet().iterator(); iterator.hasNext();) { - SearchJobInfo job = iterator.next().getValue(); - - if (jobProcessingTaskFuture.isCancelled()) { - logger.log(Level.INFO, "Search has been cancelled. Exiting periodic search task."); //NON-NLS - periodicSearchTaskRunning = false; - return; - } - - // If no lists or the worker is already running then skip it - if (!job.getKeywordListNames().isEmpty() && !job.isWorkerRunning()) { - // Spawn a search thread for each job - logger.log(Level.INFO, "Executing periodic search for search job {0}", job.getJobId()); - Searcher searcher = new Searcher(job); // SwingWorker - job.setCurrentSearcher(searcher); //save the ref - searcher.execute(); //start thread - job.setWorkerRunning(true); - - try { - // wait for the searcher to finish - searcher.get(); - } catch (InterruptedException | ExecutionException ex) { - logger.log(Level.SEVERE, "Error performing keyword search: {0}", ex.getMessage()); //NON-NLS - services.postMessage(IngestMessage.createErrorMessage(KeywordSearchModuleFactory.getModuleName(), - NbBundle.getMessage(this.getClass(), - "SearchRunner.Searcher.done.err.msg"), ex.getMessage())); - }// catch and ignore if we were cancelled - catch (java.util.concurrent.CancellationException ex) { - } - } - } - stopWatch.stop(); - logger.log(Level.INFO, "All periodic searches cumulatively took {0} secs", stopWatch.getElapsedTimeSecs()); //NON-NLS - - // calculate "hold off" time - recalculateUpdateIntervalTime(stopWatch.getElapsedTimeSecs()); // ELDEBUG - - // schedule next PeriodicSearchTask - jobProcessingTaskFuture = jobProcessingExecutor.schedule(new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS); - - // exit this thread - return; - } - - private void recalculateUpdateIntervalTime(long lastSerchTimeSec) { - // If periodic search takes more than 1/4 of the current periodic search interval, then double the search interval - if (lastSerchTimeSec * 1000 < currentUpdateIntervalMs / 4) { - return; - } - // double the search interval - currentUpdateIntervalMs = currentUpdateIntervalMs * 2; - logger.log(Level.WARNING, "Last periodic search took {0} sec. Increasing search interval to {1} sec", new Object[]{lastSerchTimeSec, currentUpdateIntervalMs / 1000}); - return; + private synchronized void cancelPeriodicSearchSchedulingTask() { + if (periodicSearchTaskHandle != null) { + logger.log(Level.INFO, "No more search jobs, stopping periodic search scheduling"); //NON-NLS + periodicSearchTaskHandle.cancel(true); + periodicSearchTaskRunning = false; } } /** - * Data structure to keep track of keyword lists, current results, and - * search running status for each jobid + * Task that runs in ScheduledThreadPoolExecutor to periodically start and + * wait for keyword search tasks for each keyword search job in progress. + * The keyword search tasks for individual ingest jobs are implemented as + * SwingWorkers to support legacy APIs. 
+ */ + private final class PeriodicSearchTask implements Runnable { + + @Override + public void run() { + /* + * If there are no more jobs or this task has been cancelled, exit. + */ + if (jobs.isEmpty() || periodicSearchTaskHandle.isCancelled()) { + logger.log(Level.INFO, "Periodic search scheduling task has been cancelled, exiting"); //NON-NLS + periodicSearchTaskRunning = false; + return; + } + + /* + * Commit the Solr index for the current case before doing the + * searches. + */ + commit(); + + /* + * Do a keyword search for each ingest job in progress. When the + * searches are done, recalculate the "hold off" time between + * searches to prevent back-to-back periodic searches and schedule + * the next periodic search task. + */ + final StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + for (Iterator> iterator = jobs.entrySet().iterator(); iterator.hasNext();) { + SearchJobInfo job = iterator.next().getValue(); + + if (periodicSearchTaskHandle.isCancelled()) { + logger.log(Level.INFO, "Periodic search scheduling task has been cancelled, exiting"); //NON-NLS + periodicSearchTaskRunning = false; + return; + } + + if (!job.getKeywordListNames().isEmpty() && !job.isWorkerRunning()) { + logger.log(Level.INFO, "Starting periodic search for search job {0}", job.getJobId()); + Searcher searcher = new Searcher(job, false); + job.setCurrentSearcher(searcher); + searcher.execute(); + job.setWorkerRunning(true); + try { + searcher.get(); + } catch (InterruptedException | ExecutionException ex) { + logger.log(Level.SEVERE, String.format("Error performing keyword search for ingest job %d", job.getJobId()), ex); //NON-NLS + services.postMessage(IngestMessage.createErrorMessage( + KeywordSearchModuleFactory.getModuleName(), + NbBundle.getMessage(this.getClass(), "SearchRunner.Searcher.done.err.msg"), ex.getMessage())); + } catch (java.util.concurrent.CancellationException ex) { + logger.log(Level.SEVERE, String.format("Keyword search for ingest job %d cancelled", job.getJobId()), ex); //NON-NLS + } + } + } + stopWatch.stop(); + logger.log(Level.INFO, "Periodic searches for all ingest jobs cumulatively took {0} secs", stopWatch.getElapsedTimeSecs()); //NON-NLS + recalculateUpdateIntervalTime(stopWatch.getElapsedTimeSecs()); // ELDEBUG + periodicSearchTaskHandle = periodicSearchTaskExecutor.schedule(new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS); + } + + /** + * Sets the time interval between periodic keyword searches to avoid + * running back-to-back searches. If the most recent round of searches + * took longer than 1/4 of the current interval, doubles the interval. + * + * @param lastSerchTimeSec The time in seconds used to execute the most + * recent round of keyword searches. + */ + private void recalculateUpdateIntervalTime(long lastSerchTimeSec) { + if (lastSerchTimeSec * 1000 < currentUpdateIntervalMs / 4) { + return; + } + currentUpdateIntervalMs *= 2; + logger.log(Level.WARNING, "Last periodic search took {0} sec. Increasing search interval to {1} sec", new Object[]{lastSerchTimeSec, currentUpdateIntervalMs / 1000}); + } + } + + /** + * A data structure to keep track of the keyword lists, current results, and + * search running status for an ingest job. 
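Before moving on to SearchJobInfo, the back-off rule in recalculateUpdateIntervalTime() above can be checked with concrete numbers. A minimal sketch of the same arithmetic (the helper below is illustrative, not the class's actual code):

    // If the last round of searches took more than 1/4 of the current interval,
    // double the interval; otherwise leave it unchanged. For example, with a
    // 5-minute (300,000 ms) interval, a round that takes 80 seconds (80,000 ms)
    // exceeds the 75,000 ms threshold, so the interval grows to 10 minutes.
    static long nextIntervalMs(long currentIntervalMs, long lastSearchTimeSec) {
        if (lastSearchTimeSec * 1000 < currentIntervalMs / 4) {
            return currentIntervalMs;
        }
        return currentIntervalMs * 2;
    }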
*/ private class SearchJobInfo { private final IngestJobContext jobContext; private final long jobId; private final long dataSourceId; - // mutable state: private volatile boolean workerRunning; - private List keywordListNames; //guarded by SearchJobInfo.this - - // Map of keyword to the object ids that contain a hit - private Map> currentResults; //guarded by SearchJobInfo.this + @GuardedBy("this") + private final List keywordListNames; + @GuardedBy("this") + private final Map> currentResults; // Keyword to object IDs of items with hits private IngestSearchRunner.Searcher currentSearcher; - private AtomicLong moduleReferenceCount = new AtomicLong(0); - private final Object finalSearchLock = new Object(); //used for a condition wait + private final AtomicLong moduleReferenceCount = new AtomicLong(0); + private final Object finalSearchLock = new Object(); private SearchJobInfo(IngestJobContext jobContext, List keywordListNames) { this.jobContext = jobContext; - this.jobId = jobContext.getJobId(); - this.dataSourceId = jobContext.getDataSource().getId(); + jobId = jobContext.getJobId(); + dataSourceId = jobContext.getDataSource().getId(); this.keywordListNames = new ArrayList<>(keywordListNames); currentResults = new HashMap<>(); workerRunning = false; @@ -410,41 +440,40 @@ final class IngestSearchRunner { } /** - * In case this job still has a worker running, wait for it to finish + * Waits for the current search task to complete. * * @throws InterruptedException */ private void waitForCurrentWorker() throws InterruptedException { synchronized (finalSearchLock) { while (workerRunning) { - logger.log(Level.INFO, "Waiting for previous worker to finish"); //NON-NLS - finalSearchLock.wait(); //wait() releases the lock - logger.log(Level.INFO, "Notified previous worker finished"); //NON-NLS + logger.log(Level.INFO, String.format("Waiting for previous search task for job %d to finish", jobId)); //NON-NLS + finalSearchLock.wait(); + logger.log(Level.INFO, String.format("Notified previous search task for job %d to finish", jobId)); //NON-NLS } } } /** - * Unset workerRunning and wake up thread(s) waiting on finalSearchLock + * Signals any threads waiting on the current search task to complete. */ private void searchNotify() { synchronized (finalSearchLock) { - logger.log(Level.INFO, "Notifying after finishing search"); //NON-NLS workerRunning = false; finalSearchLock.notify(); } } } - /** - * Searcher responsible for searching the current index and writing results - * to blackboard and the inbox. Also, posts results to listeners as Ingest - * data events. Searches entire index, and keeps track of only new results - * to report and save. Runs as a background thread. + /* + * A SwingWorker responsible for searching the Solr index of the current + * case for the keywords for an ingest job. Keyword hit analysis results are + * created and posted to the blackboard and notifications are sent to the + * ingest inbox. 
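The waitForCurrentWorker()/searchNotify() pair above is a standard guarded wait on a dedicated lock object. A self-contained sketch of the same coordination, with illustrative names rather than the patch's:

    // Illustrative guarded wait: one thread blocks until another clears the
    // "running" flag and notifies on the shared lock.
    class WorkerGate {

        private final Object lock = new Object();
        private boolean workerRunning = false;

        void markRunning() {
            synchronized (lock) {
                workerRunning = true;
            }
        }

        void awaitWorker() throws InterruptedException {
            synchronized (lock) {
                while (workerRunning) {
                    lock.wait(); // releases the lock while waiting
                }
            }
        }

        void workerFinished() {
            synchronized (lock) {
                workerRunning = false;
                lock.notifyAll();
            }
        }
    }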
*/ private final class Searcher extends SwingWorker { - /** + /* * Searcher has private copies/snapshots of the lists and keywords */ private final SearchJobInfo job; @@ -452,31 +481,22 @@ final class IngestSearchRunner { private final List keywordListNames; // lists currently being searched private final List keywordLists; private final Map keywordToList; //keyword to list name mapping - private final boolean usingNetBeansGUI; @ThreadConfined(type = ThreadConfined.ThreadType.AWT) private ProgressHandle progressIndicator; private boolean finalRun = false; - Searcher(SearchJobInfo job) { + Searcher(SearchJobInfo job, boolean finalRun) { this.job = job; + this.finalRun = finalRun; keywordListNames = job.getKeywordListNames(); keywords = new ArrayList<>(); keywordToList = new HashMap<>(); keywordLists = new ArrayList<>(); - //keywords are populated as searcher runs - usingNetBeansGUI = RuntimeProperties.runningWithGUI(); - } - - Searcher(SearchJobInfo job, boolean finalRun) { - this(job); - this.finalRun = finalRun; } @Override @Messages("SearchRunner.query.exception.msg=Error performing query:") protected Object doInBackground() throws Exception { - final StopWatch stopWatch = new StopWatch(); - stopWatch.start(); try { if (usingNetBeansGUI) { /* @@ -498,7 +518,9 @@ final class IngestSearchRunner { progressIndicator = ProgressHandle.createHandle(displayName, new Cancellable() { @Override public boolean cancel() { - progressIndicator.setDisplayName(displayName + " " + NbBundle.getMessage(this.getClass(), "SearchRunner.doInBackGround.cancelMsg")); + if (progressIndicator != null) { + progressIndicator.setDisplayName(displayName + " " + NbBundle.getMessage(this.getClass(), "SearchRunner.doInBackGround.cancelMsg")); + } logger.log(Level.INFO, "Search cancelled by user"); //NON-NLS new Thread(() -> { IngestSearchRunner.Searcher.this.cancel(true); @@ -568,7 +590,7 @@ final class IngestSearchRunner { } } } catch (Exception ex) { - logger.log(Level.WARNING, "Error occurred during keyword search", ex); //NON-NLS + logger.log(Level.SEVERE, String.format("Error performing keyword search for ingest job %d", job.getJobId()), ex); //NON-NLS } finally { if (progressIndicator != null) { SwingUtilities.invokeLater(new Runnable() { @@ -579,8 +601,6 @@ final class IngestSearchRunner { } }); } - stopWatch.stop(); - logger.log(Level.INFO, "Searcher took {0} secs to run (final = {1})", new Object[]{stopWatch.getElapsedTimeSecs(), this.finalRun}); //NON-NLS // In case a thread is waiting on this worker to be done job.searchNotify(); } @@ -681,4 +701,5 @@ final class IngestSearchRunner { return newResults; } } + } diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KeywordSearchModuleFactory.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KeywordSearchModuleFactory.java index 17c760323a..c6b2c39f88 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KeywordSearchModuleFactory.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KeywordSearchModuleFactory.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2014 Basis Technology Corp. + * Copyright 2014-2021 Basis Technology Corp. 
* Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,7 +25,8 @@ import java.util.List; import org.openide.util.NbBundle; import org.openide.util.lookup.ServiceProvider; import org.sleuthkit.autopsy.coreutils.Version; -import org.sleuthkit.autopsy.ingest.IngestModuleFactoryAdapter; +import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule; +import org.sleuthkit.autopsy.ingest.DataArtifactIngestModule; import org.sleuthkit.autopsy.ingest.FileIngestModule; import org.sleuthkit.autopsy.ingest.IngestModuleFactory; import org.sleuthkit.autopsy.ingest.IngestModuleIngestJobSettings; @@ -37,7 +38,7 @@ import org.sleuthkit.autopsy.ingest.IngestModuleGlobalSettingsPanel; * searching. */ @ServiceProvider(service = IngestModuleFactory.class) -public class KeywordSearchModuleFactory extends IngestModuleFactoryAdapter { +public class KeywordSearchModuleFactory implements IngestModuleFactory { private static final HashSet defaultDisabledKeywordListNames = new HashSet<>(Arrays.asList("Phone Numbers", "IP Addresses", "URLs", "Credit Card Numbers")); //NON-NLS private KeywordSearchJobSettingsPanel jobSettingsPanel = null; @@ -121,4 +122,25 @@ public class KeywordSearchModuleFactory extends IngestModuleFactoryAdapter { } return new KeywordSearchIngestModule((KeywordSearchJobSettings) settings); } + + @Override + public boolean isDataArtifactIngestModuleFactory() { + return true; + } + + @Override + public DataArtifactIngestModule createDataArtifactIngestModule(IngestModuleIngestJobSettings settings) { + return new KwsDataArtifactIngestModule(); + } + + @Override + public boolean isAnalysisResultIngestModuleFactory() { + return true; + } + + @Override + public AnalysisResultIngestModule createAnalysisResultIngestModule(IngestModuleIngestJobSettings settings) { + return new KwsAnalysisResultIngestModule(); + } + } diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KwsAnalysisResultIngestModule.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KwsAnalysisResultIngestModule.java new file mode 100755 index 0000000000..b82c4d91fe --- /dev/null +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KwsAnalysisResultIngestModule.java @@ -0,0 +1,64 @@ +/* + * Autopsy Forensic Browser + * + * Copyright 2021-2021 Basis Technology Corp. + * Contact: carrier sleuthkit org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.sleuthkit.autopsy.keywordsearch; + +import java.util.logging.Level; +import org.openide.util.Lookup; +import org.sleuthkit.autopsy.coreutils.Logger; +import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule; +import org.sleuthkit.autopsy.ingest.IngestJobContext; +import org.sleuthkit.autopsy.ingest.IngestModule; +import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService; +import org.sleuthkit.datamodel.AnalysisResult; +import org.sleuthkit.datamodel.BlackboardArtifact; +import org.sleuthkit.datamodel.TskCoreException; + +/** + * An analysis result ingest module that indexes text for keyword search. All + * keyword searching of indexed text, whether from files, data artifacts, or + * analysis results, including the final keyword search of an ingest job, is + * done in the last instance of the companion keyword search file ingest module. + */ +public class KwsAnalysisResultIngestModule implements AnalysisResultIngestModule { + + private static final Logger LOGGER = Logger.getLogger(KeywordSearchIngestModule.class.getName()); + private static final int TSK_KEYWORD_HIT_TYPE_ID = BlackboardArtifact.Type.TSK_KEYWORD_HIT.getTypeID(); + private IngestJobContext context; + private KeywordSearchService searchService; + + @Override + public void startUp(IngestJobContext context) throws IngestModule.IngestModuleException { + this.context = context; + searchService = Lookup.getDefault().lookup(KeywordSearchService.class); + } + + @Override + public IngestModule.ProcessResult process(AnalysisResult result) { + try { + if (result.getType().getTypeID() != TSK_KEYWORD_HIT_TYPE_ID) { + searchService.index(result); + } + } catch (TskCoreException ex) { + LOGGER.log(Level.SEVERE, String.format("Error indexing analysis result '%s' (job ID=%d)", result, context.getJobId()), ex); //NON-NLS + return IngestModule.ProcessResult.ERROR; + } + return IngestModule.ProcessResult.OK; + } + +} diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KwsDataArtifactIngestModule.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KwsDataArtifactIngestModule.java new file mode 100755 index 0000000000..fe4cac8b4f --- /dev/null +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/KwsDataArtifactIngestModule.java @@ -0,0 +1,63 @@ +/* + * Autopsy Forensic Browser + * + * Copyright 2021-2021 Basis Technology Corp. + * Contact: carrier sleuthkit org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
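For comparison, a third-party module plugging into the same new extension point might look like the following minimal sketch. The module name and its behavior are illustrative only; the interface, startUp() signature, and ProcessResult values come from the patch:

    import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule;
    import org.sleuthkit.autopsy.ingest.IngestJobContext;
    import org.sleuthkit.autopsy.ingest.IngestModule;
    import org.sleuthkit.datamodel.AnalysisResult;

    // Illustrative minimal analysis result ingest module: it just counts the
    // results handed to it and always reports success.
    public class ExampleAnalysisResultIngestModule implements AnalysisResultIngestModule {

        private long resultCount;

        @Override
        public void startUp(IngestJobContext context) throws IngestModule.IngestModuleException {
            resultCount = 0;
        }

        @Override
        public IngestModule.ProcessResult process(AnalysisResult result) {
            resultCount++;
            return IngestModule.ProcessResult.OK;
        }
    }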
+ */ +package org.sleuthkit.autopsy.keywordsearch; + +import java.util.logging.Level; +import org.openide.util.Lookup; +import org.sleuthkit.autopsy.coreutils.Logger; +import org.sleuthkit.autopsy.ingest.DataArtifactIngestModule; +import org.sleuthkit.autopsy.ingest.IngestJobContext; +import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService; +import org.sleuthkit.datamodel.BlackboardArtifact; +import org.sleuthkit.datamodel.DataArtifact; +import org.sleuthkit.datamodel.TskCoreException; + +/** + * A data artifact ingest module that indexes text for keyword search. All + * keyword searching of indexed text, whether from files, data artifacts, or + * analysis results, including the final keyword search of an ingest job, is + * done in the last instance of the companion keyword search file ingest module. + */ +public class KwsDataArtifactIngestModule implements DataArtifactIngestModule { + + private static final Logger LOGGER = Logger.getLogger(KeywordSearchIngestModule.class.getName()); + private static final int TSK_ASSOCIATED_OBJECT_TYPE_ID = BlackboardArtifact.Type.TSK_ASSOCIATED_OBJECT.getTypeID(); + private IngestJobContext context; + private KeywordSearchService searchService; + + @Override + public void startUp(IngestJobContext context) throws IngestModuleException { + this.context = context; + searchService = Lookup.getDefault().lookup(KeywordSearchService.class); + } + + @Override + public ProcessResult process(DataArtifact artifact) { + try { + if (artifact.getType().getTypeID() != TSK_ASSOCIATED_OBJECT_TYPE_ID) { + searchService.index(artifact); + } + } catch (TskCoreException ex) { + LOGGER.log(Level.SEVERE, String.format("Error indexing data artifact '%s' (job ID=%d)", artifact, context.getJobId()), ex); //NON-NLS + return ProcessResult.ERROR; + } + return ProcessResult.OK; + } + +} diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/QueryResults.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/QueryResults.java index 313bad3e60..5492f768d8 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/QueryResults.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/QueryResults.java @@ -28,8 +28,6 @@ import java.util.logging.Level; import javax.swing.SwingUtilities; import javax.swing.SwingWorker; import org.apache.commons.lang.StringUtils; -import org.netbeans.api.progress.ProgressHandle; -import org.netbeans.api.progress.aggregate.ProgressContributor; import org.openide.util.NbBundle; import org.sleuthkit.autopsy.casemodule.Case; import org.sleuthkit.autopsy.casemodule.NoCurrentCaseException; @@ -53,8 +51,6 @@ import org.sleuthkit.datamodel.TskCoreException; * about the search hits to the ingest inbox, and publishing an event to notify * subscribers of the blackboard posts. */ - - class QueryResults { private static final Logger logger = Logger.getLogger(QueryResults.class.getName()); @@ -141,7 +137,7 @@ class QueryResults { * @param notifyInbox Whether or not to write a message to the ingest * messages inbox if there is a keyword hit in the text * exrtacted from the text source object. - * @param saveResults Flag whether to save search results as KWS artifacts. + * @param saveResults Whether or not to create keyword hit analysis results. * @param ingestJobId The numeric identifier of the ingest job within which * the artifacts are being created, may be null. 
*/ diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/SolrSearchService.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/SolrSearchService.java index e5c3f8ae03..5cb0d3ae20 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/SolrSearchService.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/SolrSearchService.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2015-2020 Basis Technology Corp. + * Copyright 2015-2021 Basis Technology Corp. * Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,7 +18,6 @@ */ package org.sleuthkit.autopsy.keywordsearch; -import com.google.common.eventbus.Subscribe; import java.io.File; import java.io.IOException; import java.io.Reader; @@ -36,14 +35,12 @@ import org.sleuthkit.autopsy.casemodule.Case; import org.sleuthkit.autopsy.casemodule.CaseMetadata; import org.sleuthkit.autopsy.coreutils.FileUtil; import org.sleuthkit.autopsy.coreutils.Logger; -import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil; import org.sleuthkit.autopsy.ingest.IngestManager; import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService; import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchServiceException; import org.sleuthkit.autopsy.progress.ProgressIndicator; import org.sleuthkit.autopsy.textextractors.TextExtractor; import org.sleuthkit.autopsy.textextractors.TextExtractorFactory; -import org.sleuthkit.datamodel.Blackboard; import org.sleuthkit.datamodel.BlackboardArtifact; import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.TskCoreException; @@ -66,13 +63,10 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService { /** * Indexes the given content for keyword search. * - * IMPORTANT: Currently, there are two correct uses for this code: - * - * 1) Indexing an artifact created during while either the file level ingest - * module pipeline or the first stage data source level ingest module - * pipeline of an ingest job is running. - * - * 2) Indexing a report. + * IMPORTANT: This indexes the given content, but does not execute a keyword + * search. For the text of the content to be searched, the indexing has to + * occur either in the context of an ingest job configured for keyword + * search, or in the context of an ad hoc keyword search. * * @param content The content to index. * @@ -80,19 +74,6 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService { */ @Override public void index(Content content) throws TskCoreException { - /* - * TODO (JIRA-1099): The following code has some issues that need to be - * resolved. For artifacts, it is assumed that the posting of artifacts - * is only occuring during an ingest job with an enabled keyword search - * ingest module handling index commits; it also assumes that the - * artifacts are only posted by modules in the either the file level - * ingest pipeline or the first stage data source level ingest pipeline, - * so that the artifacts will be searched during a periodic or final - * keyword search. It also assumes that the only other type of Content - * for which this API will be called are Reports generated at a time - * when doing a commit is required and desirable, i.e., in a context - * other than an ingest job. 
- */ if (content == null) { return; } @@ -152,7 +133,7 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService { if (host == null || host.isEmpty()) { throw new KeywordSearchServiceException(NbBundle.getMessage(SolrSearchService.class, "SolrConnectionCheck.MissingHostname")); //NON-NLS } - try { + try { KeywordSearch.getServer().connectToSolrServer(host, Integer.toString(port)); } catch (SolrServerException ex) { logger.log(Level.SEVERE, "Unable to connect to Solr server. Host: " + host + ", port: " + port, ex); @@ -232,7 +213,7 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService { logger.log(Level.WARNING, NbBundle.getMessage(SolrSearchService.class, "SolrSearchService.exceptionMessage.noCurrentSolrCore")); throw new KeywordSearchServiceException(NbBundle.getMessage(SolrSearchService.class, - "SolrSearchService.exceptionMessage.noCurrentSolrCore")); + "SolrSearchService.exceptionMessage.noCurrentSolrCore")); } // delete index(es) for this case @@ -411,28 +392,6 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService { } } - /** - * Event handler for ArtifactsPostedEvents from SleuthkitCase. - * - * @param event The ArtifactsPostedEvent to handle. - */ - @NbBundle.Messages("SolrSearchService.indexingError=Unable to index blackboard artifact.") - @Subscribe - void handleNewArtifacts(Blackboard.ArtifactsPostedEvent event) { - for (BlackboardArtifact artifact : event.getArtifacts()) { - if ((artifact.getArtifactTypeID() != BlackboardArtifact.ARTIFACT_TYPE.TSK_KEYWORD_HIT.getTypeID()) && // don't index KWH bc it's based on existing indexed text - (artifact.getArtifactTypeID() != BlackboardArtifact.ARTIFACT_TYPE.TSK_ASSOCIATED_OBJECT.getTypeID())){ //don't index AO bc it has only an artifact ID - no useful text - try { - index(artifact); - } catch (TskCoreException ex) { - //TODO: is this the right error handling? - logger.log(Level.SEVERE, "Unable to index blackboard artifact " + artifact.getArtifactID(), ex); //NON-NLS - MessageNotifyUtil.Notify.error(Bundle.SolrSearchService_indexingError(), artifact.getDisplayName()); - } - } - } - } - /** * Adds an artifact to the keyword search text index as a concantenation of * all of its attributes. diff --git a/branding/core/core.jar/org/netbeans/core/startup/Bundle.properties b/branding/core/core.jar/org/netbeans/core/startup/Bundle.properties index e9620c3d7a..87be1c8010 100644 --- a/branding/core/core.jar/org/netbeans/core/startup/Bundle.properties +++ b/branding/core/core.jar/org/netbeans/core/startup/Bundle.properties @@ -1,5 +1,5 @@ #Updated by build script -#Tue, 30 Nov 2021 17:19:50 -0500 +#Wed, 01 Dec 2021 12:53:03 -0500 LBL_splash_window_title=Starting Autopsy SPLASH_HEIGHT=314 SPLASH_WIDTH=538 diff --git a/branding/modules/org-netbeans-core-windows.jar/org/netbeans/core/windows/view/ui/Bundle.properties b/branding/modules/org-netbeans-core-windows.jar/org/netbeans/core/windows/view/ui/Bundle.properties index f591caf623..bfb787467d 100644 --- a/branding/modules/org-netbeans-core-windows.jar/org/netbeans/core/windows/view/ui/Bundle.properties +++ b/branding/modules/org-netbeans-core-windows.jar/org/netbeans/core/windows/view/ui/Bundle.properties @@ -1,4 +1,4 @@ #Updated by build script -#Tue, 30 Nov 2021 17:19:50 -0500 +#Wed, 01 Dec 2021 12:53:03 -0500 CTL_MainWindow_Title=Autopsy 4.19.2 CTL_MainWindow_Title_No_Project=Autopsy 4.19.2