Mirror of https://github.com/overcuriousity/autopsy-flatpak.git, synced 2025-07-06 21:00:22 +00:00

Merge pull request #7420 from rcordovano/7529-kws-data-artifact-module: 7529 Analysis result ingest modules, KWS artifact and results modules

Commit 25f0698951

Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestModule.java (new executable file, 46 lines)
@@ -0,0 +1,46 @@
+/*
+ * Autopsy Forensic Browser
+ *
+ * Copyright 2021-2021 Basis Technology Corp.
+ * Contact: carrier <at> sleuthkit <dot> org
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.sleuthkit.autopsy.ingest;
+
+import org.sleuthkit.datamodel.AnalysisResult;
+
+/**
+ * Interface that must be implemented by all ingest modules that process
+ * analysis results.
+ */
+public interface AnalysisResultIngestModule extends IngestModule {
+
+    /**
+     * Processes an analysis result.
+     *
+     * IMPORTANT: In addition to returning ProcessResult.OK or
+     * ProcessResult.ERROR, modules should log all errors using methods provided
+     * by the org.sleuthkit.autopsy.coreutils.Logger class. Log messages should
+     * include the name and object ID of the data being processed. If an
+     * exception has been caught by the module, the exception should be sent to
+     * the Logger along with the log message so that a stack trace will appear
+     * in the application log.
+     *
+     * @param result The analysis result to process.
+     *
+     * @return A result code indicating success or failure of the processing.
+     */
+    ProcessResult process(AnalysisResult result);

+}
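As an aside for module authors: a minimal implementation of this new interface might look like the sketch below. The class name and logic are hypothetical, not part of this commit; the sketch simply follows the logging contract described in the Javadoc above (log the name and object ID of the data being processed, and pass any caught exception to the Logger so a stack trace lands in the application log).

import java.util.logging.Level;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule;
import org.sleuthkit.datamodel.AnalysisResult;

// Hypothetical example module, not part of this commit.
final class SampleAnalysisResultIngestModule implements AnalysisResultIngestModule {

    private static final Logger logger = Logger.getLogger(SampleAnalysisResultIngestModule.class.getName());

    @Override
    public ProcessResult process(AnalysisResult result) {
        try {
            // Per the interface contract, include the name and object ID of
            // the data being processed in log messages.
            logger.log(Level.INFO, String.format("Processing analysis result %s (object ID = %d)", result.getName(), result.getId()));
            return ProcessResult.OK;
        } catch (Exception ex) {
            // Send the exception to the Logger so a stack trace appears in
            // the application log, then report the failure.
            logger.log(Level.SEVERE, String.format("Error processing analysis result (object ID = %d)", result.getId()), ex);
            return ProcessResult.ERROR;
        }
    }
}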
93
Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestPipeline.java
Executable file
93
Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestPipeline.java
Executable file
@ -0,0 +1,93 @@
+/*
+ * Autopsy Forensic Browser
+ *
+ * Copyright 2021-2021 Basis Technology Corp.
+ * Contact: carrier <at> sleuthkit <dot> org
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.sleuthkit.autopsy.ingest;
+
+import java.util.List;
+import java.util.Optional;
+import org.sleuthkit.datamodel.AnalysisResult;
+
+/**
+ * A pipeline of analysis result ingest modules used to perform analysis result
+ * ingest tasks for an ingest job.
+ */
+public class AnalysisResultIngestPipeline extends IngestPipeline<AnalysisResultIngestTask> {
+
+    /**
+     * Constructs a pipeline of analysis result ingest modules used to perform
+     * analysis result ingest tasks for an ingest job.
+     *
+     * @param ingestJobExecutor The ingest job executor for this pipeline.
+     * @param moduleTemplates   The ingest module templates to be used to
+     *                          construct the ingest modules for this pipeline.
+     *                          May be an empty list if this type of pipeline is
+     *                          not needed for the ingest job.
+     */
+    AnalysisResultIngestPipeline(IngestJobExecutor ingestJobExecutor, List<IngestModuleTemplate> moduleTemplates) {
+        super(ingestJobExecutor, moduleTemplates);
+    }
+
+    @Override
+    Optional<PipelineModule<AnalysisResultIngestTask>> acceptModuleTemplate(IngestModuleTemplate template) {
+        Optional<IngestPipeline.PipelineModule<AnalysisResultIngestTask>> module = Optional.empty();
+        if (template.isAnalysisResultIngestModuleTemplate()) {
+            AnalysisResultIngestModule ingestModule = template.createAnalysisResultIngestModule();
+            module = Optional.of(new AnalysisResultIngestPipelineModule(ingestModule, template.getModuleName()));
+        }
+        return module;
+    }
+
+    @Override
+    void prepareForTask(AnalysisResultIngestTask task) throws IngestPipelineException {
+    }
+
+    @Override
+    void cleanUpAfterTask(AnalysisResultIngestTask task) throws IngestPipelineException {
+        IngestManager.getInstance().setIngestTaskProgressCompleted(task);
+    }
+
+    /**
+     * A decorator that adds ingest infrastructure operations to an analysis
+     * result ingest module.
+     */
+    static final class AnalysisResultIngestPipelineModule extends IngestPipeline.PipelineModule<AnalysisResultIngestTask> {
+
+        private final AnalysisResultIngestModule module;
+
+        /**
+         * Constructs a decorator that adds ingest infrastructure operations to
+         * an analysis result ingest module.
+         *
+         * @param module      The module.
+         * @param displayName The display name of the module.
+         */
+        AnalysisResultIngestPipelineModule(AnalysisResultIngestModule module, String displayName) {
+            super(module, displayName);
+            this.module = module;
+        }
+
+        @Override
+        void process(IngestJobExecutor ingestJobExecutor, AnalysisResultIngestTask task) throws IngestModule.IngestModuleException {
+            AnalysisResult result = task.getAnalysisResult();
+            IngestManager.getInstance().setIngestTaskProgress(task, getDisplayName());
+            module.process(result);
+        }
+
+    }
+
+}
Core/src/org/sleuthkit/autopsy/ingest/AnalysisResultIngestTask.java (new executable file, 59 lines)

@@ -0,0 +1,59 @@
+/*
+ * Autopsy Forensic Browser
+ *
+ * Copyright 2021-2021 Basis Technology Corp.
+ * Contact: carrier <at> sleuthkit <dot> org
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.sleuthkit.autopsy.ingest;
+
+import org.sleuthkit.datamodel.AnalysisResult;
+
+/**
+ * An analysis result ingest task that will be executed by an ingest thread
+ * using a given ingest job executor.
+ */
+final class AnalysisResultIngestTask extends IngestTask {
+
+    private final AnalysisResult analysisResult;
+
+    /**
+     * Constructs an analysis result ingest task that will be executed by an
+     * ingest thread using a given ingest job executor.
+     *
+     * @param ingestJobExecutor The ingest job executor to use to execute the
+     *                          task.
+     * @param analysisResult    The analysis result to be processed.
+     */
+    AnalysisResultIngestTask(IngestJobExecutor ingestJobExecutor, AnalysisResult analysisResult) {
+        super(analysisResult.getName(), ingestJobExecutor);
+        this.analysisResult = analysisResult;
+    }
+
+    /**
+     * Gets the analysis result for this task.
+     *
+     * @return The analysis result.
+     */
+    AnalysisResult getAnalysisResult() {
+        return analysisResult;
+    }
+
+    @Override
+    void execute(long threadId) {
+        super.setThreadId(threadId);
+        getIngestJobExecutor().execute(this);
+    }
+
+}
@@ -93,6 +93,7 @@ IngestJobTableModel.colName.rootQueued=Roots Queued
 IngestJobTableModel.colName.streamingQueued=Streamed Files Queued
 IngestJobTableModel.colName.dsQueued=DS Queued
 IngestJobTableModel.colName.artifactsQueued=Artifacts Queued
+IngestJobTableModel.colName.resultsQueued=Results Queued
 ModuleTableModel.colName.module=Module
 ModuleTableModel.colName.duration=Duration
 IngestJobSettingsPanel.jButtonSelectAll.text=Select All
@@ -1,5 +1,7 @@
 CTL_RunIngestAction=Run Ingest
 FileIngestPipeline_SaveResults_Activity=Saving Results
+# {0} - data source name
+IngestJob_progress_analysisResultIngest_displayName=Analyzing analysis results from {0}
 IngestJobSettingsPanel.IngestModulesTableRenderer.info.message=A previous version of this ingest module has been run before on this data source.
 IngestJobSettingsPanel.IngestModulesTableRenderer.warning.message=This ingest module has been run before on this data source.
 IngestJobSettingsPanel.noPerRunSettings=The selected module has no per-run settings.
@@ -109,6 +111,7 @@ IngestJobTableModel.colName.rootQueued=Roots Queued
 IngestJobTableModel.colName.streamingQueued=Streamed Files Queued
 IngestJobTableModel.colName.dsQueued=DS Queued
 IngestJobTableModel.colName.artifactsQueued=Artifacts Queued
+IngestJobTableModel.colName.resultsQueued=Results Queued
 ModuleTableModel.colName.module=Module
 ModuleTableModel.colName.duration=Duration
 IngestJobSettingsPanel.jButtonSelectAll.text=Select All
@@ -1,7 +1,7 @@
 /*
  * Autopsy Forensic Browser
  *
- * Copyright 2021 Basis Technology Corp.
+ * Copyright 2021-2021 Basis Technology Corp.
  * Contact: carrier <at> sleuthkit <dot> org
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -1,7 +1,7 @@
 /*
  * Autopsy Forensic Browser
  *
- * Copyright 2021 Basis Technology Corp.
+ * Copyright 2021-2021 Basis Technology Corp.
  * Contact: carrier <at> sleuthkit <dot> org
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -58,6 +58,7 @@ final class DataArtifactIngestPipeline extends IngestPipeline<DataArtifactIngest

     @Override
     void cleanUpAfterTask(DataArtifactIngestTask task) throws IngestPipelineException {
+        IngestManager.getInstance().setIngestTaskProgressCompleted(task);
     }

     /**
@@ -83,6 +84,7 @@ final class DataArtifactIngestPipeline extends IngestPipeline<DataArtifactIngest
         @Override
         void process(IngestJobExecutor ingestJobExecutor, DataArtifactIngestTask task) throws IngestModuleException {
             DataArtifact artifact = task.getDataArtifact();
+            IngestManager.getInstance().setIngestTaskProgress(task, getDisplayName());
             module.process(artifact);
         }

@@ -1,7 +1,7 @@
 /*
  * Autopsy Forensic Browser
  *
- * Copyright 2021 Basis Technology Corp.
+ * Copyright 2021-2021 Basis Technology Corp.
  * Contact: carrier <at> sleuthkit <dot> org
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -37,7 +37,7 @@ final class DataArtifactIngestTask extends IngestTask {
      * @param artifact          The data artifact to be processed.
      */
     DataArtifactIngestTask(IngestJobExecutor ingestJobExecutor, DataArtifact artifact) {
-        super(ingestJobExecutor);
+        super(artifact.getName(), ingestJobExecutor);
         this.artifact = artifact;
     }

@@ -32,7 +32,7 @@ final class DataSourceIngestTask extends IngestTask {
      * task.
      */
     DataSourceIngestTask(IngestJobExecutor ingestJobExecutor) {
-        super(ingestJobExecutor);
+        super(ingestJobExecutor.getDataSource().getName(), ingestJobExecutor);
     }

     @Override
@@ -41,7 +41,7 @@ final class FileIngestTask extends IngestTask {
      * @param file              The file to be processed.
      */
     FileIngestTask(IngestJobExecutor ingestJobPipeline, AbstractFile file) {
-        super(ingestJobPipeline);
+        super(file.getName(), ingestJobPipeline);
         this.file = file;
         fileId = file.getId();
     }
@@ -57,7 +57,7 @@ final class FileIngestTask extends IngestTask {
      * @param fileId            The object ID of the file to be processed.
      */
     FileIngestTask(IngestJobExecutor ingestJobPipeline, long fileId) {
-        super(ingestJobPipeline);
+        super("", ingestJobPipeline);
         this.fileId = fileId;
     }

@@ -81,6 +81,9 @@ final class FileIngestTask extends IngestTask {
     synchronized AbstractFile getFile() throws TskCoreException {
         if (file == null) {
             file = Case.getCurrentCase().getSleuthkitCase().getAbstractFileById(fileId);
+            if (file != null) {
+                setContentName(file.getName());
+            }
         }
         return file;
     }
@@ -27,6 +27,7 @@ import java.util.logging.Level;
 import org.openide.util.NbBundle;
 import org.sleuthkit.autopsy.coreutils.Logger;
 import org.sleuthkit.datamodel.AbstractFile;
+import org.sleuthkit.datamodel.AnalysisResult;
 import org.sleuthkit.datamodel.Content;
 import org.sleuthkit.datamodel.DataArtifact;

@@ -166,6 +167,16 @@ public final class IngestJob {
         ingestModuleExecutor.addDataArtifacts(dataArtifacts);
     }

+    /**
+     * Adds one or more analysis results to this ingest job for processing by
+     * its analysis result ingest modules.
+     *
+     * @param results The analysis results.
+     */
+    void addAnalysisResults(List<AnalysisResult> results) {
+        ingestModuleExecutor.addAnalysisResults(results);
+    }
+
     /**
      * Starts data source level analysis for this job if it is running in
      * streaming ingest mode.
(File diff suppressed because it is too large.)
@@ -70,6 +70,7 @@ import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent;
 import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent;
 import org.sleuthkit.autopsy.ingest.events.FileAnalyzedEvent;
 import org.sleuthkit.datamodel.AbstractFile;
+import org.sleuthkit.datamodel.AnalysisResult;
 import org.sleuthkit.datamodel.Blackboard;
 import org.sleuthkit.datamodel.BlackboardArtifact;
 import org.sleuthkit.datamodel.Content;
@@ -136,7 +137,8 @@ public class IngestManager implements IngestProgressSnapshotProvider {
     private final Map<Long, IngestJob> ingestJobsById = new HashMap<>();
     private final ExecutorService dataSourceLevelIngestJobTasksExecutor;
     private final ExecutorService fileLevelIngestJobTasksExecutor;
-    private final ExecutorService resultIngestTasksExecutor;
+    private final ExecutorService dataArtifactIngestTasksExecutor;
+    private final ExecutorService analysisResultIngestTasksExecutor;
     private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
     private final IngestMonitor ingestMonitor = new IngestMonitor();
     private final ServicesMonitor servicesMonitor = ServicesMonitor.getInstance();
@@ -169,21 +171,11 @@
      * the processing of data sources by ingest modules.
      */
     private IngestManager() {
-        /*
-         * Submit a single Runnable ingest manager task for processing data
-         * source level ingest job tasks to the data source level ingest job
-         * tasks executor.
-         */
         dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
         long threadId = nextIngestManagerTaskId.incrementAndGet();
         dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
         ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));

-        /*
-         * Submit a configurable number of Runnable ingest manager tasks for
-         * processing file level ingest job tasks to the file level ingest job
-         * tasks executor.
-         */
         numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads();
         fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
         for (int i = 0; i < numberOfFileIngestThreads; ++i) {
@@ -192,12 +184,15 @@
             ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
         }

-        resultIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-results-ingest-%d").build()); //NON-NLS;
+        dataArtifactIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-artifact-ingest-%d").build()); //NON-NLS;
         threadId = nextIngestManagerTaskId.incrementAndGet();
-        resultIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getResultIngestTaskQueue()));
-        // RJCTODO
-        // ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
-        // RJCTODO: Where is the shut down code?
+        dataArtifactIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataArtifactIngestTaskQueue()));
+        ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
+
+        analysisResultIngestTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-analysis-result-ingest-%d").build()); //NON-NLS;
+        threadId = nextIngestManagerTaskId.incrementAndGet();
+        analysisResultIngestTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getAnalysisResultIngestTaskQueue()));
+        ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
     }

     /**
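Each of these executors follows the same pattern: a named single-thread executor runs one long-lived task that blocks on a queue and executes whatever ingest task arrives. The following self-contained sketch of that producer/consumer pattern uses hypothetical names, not the actual IngestManager or ExecuteIngestJobTasksTask classes:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Rough illustration only; assumes nothing beyond the JDK.
final class TaskQueueConsumerSketch {

    interface Task {
        void execute(long threadId);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
        long threadId = 1;
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        // The submitted Runnable loops forever, blocking on take() until a
        // task is available, like the long-lived task each executor above runs.
        consumer.submit(() -> {
            while (true) {
                try {
                    queue.take().execute(threadId);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        queue.put(id -> System.out.println("task ran on consumer thread " + id));
        TimeUnit.MILLISECONDS.sleep(100); // give the task time to run
        consumer.shutdownNow();           // interrupts take(), ending the loop
    }
}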
@@ -301,13 +296,16 @@
          * job for possible analysis.
          */
         List<DataArtifact> newDataArtifacts = new ArrayList<>();
+        List<AnalysisResult> newAnalysisResults = new ArrayList<>();
         Collection<BlackboardArtifact> newArtifacts = tskEvent.getArtifacts();
         for (BlackboardArtifact artifact : newArtifacts) {
             if (artifact instanceof DataArtifact) {
                 newDataArtifacts.add((DataArtifact) artifact);
+            } else {
+                newAnalysisResults.add((AnalysisResult) artifact);
             }
         }
-        if (!newDataArtifacts.isEmpty()) {
+        if (!newDataArtifacts.isEmpty() || !newAnalysisResults.isEmpty()) {
             IngestJob ingestJob = null;
             Optional<Long> ingestJobId = tskEvent.getIngestJobId();
             if (ingestJobId.isPresent()) {
@@ -379,8 +377,13 @@
             }
         }
         if (ingestJob != null) {
-            ingestJob.addDataArtifacts(newDataArtifacts);
+            if (!newDataArtifacts.isEmpty()) {
+                ingestJob.addDataArtifacts(newDataArtifacts);
+            }
+            if (!newAnalysisResults.isEmpty()) {
+                ingestJob.addAnalysisResults(newAnalysisResults);
+            }
         }
     }

     /*
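For context on how results reach this handler: ingest modules create artifacts and post them to the case blackboard, which fires the artifacts-posted event this code consumes. A hedged sketch of the posting side follows; the helper and artifact-creation details are illustrative only, and newer TSK versions also offer postArtifact overloads taking an ingest job ID, which is what tskEvent.getIngestJobId() above relies on.

import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Blackboard;
import org.sleuthkit.datamodel.BlackboardArtifact;

final class PostingSketch {

    // Hypothetical helper, not part of this commit.
    static void postKeywordHit(AbstractFile file, String moduleName) throws Exception {
        Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
        // TSK_KEYWORD_HIT is an analysis-result-category artifact type, so
        // the posted artifact is sorted into the newAnalysisResults list above.
        BlackboardArtifact artifact = file.newArtifact(BlackboardArtifact.ARTIFACT_TYPE.TSK_KEYWORD_HIT);
        // Posting publishes the artifacts-posted event consumed by the
        // IngestManager handler shown in this diff.
        blackboard.postArtifact(artifact, moduleName);
    }
}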
@@ -909,52 +912,25 @@

     /**
      * Updates the ingest progress snapshot when a new ingest module starts
-     * working on a data source level ingest task.
+     * working on an ingest task. This includes incrementing the total run time
+     * for the PREVIOUS ingest module in the pipeline, which has now finished
+     * its processing for the task.
      *
-     * @param task              The data source ingest task.
      * @param currentModuleName The display name of the currently processing
      *                          module.
      */
-    void setIngestTaskProgress(DataSourceIngestTask task, String currentModuleName) {
+    void setIngestTaskProgress(IngestTask task, String currentModuleName) {
         IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
-        IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource());
+        IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource(), task.getContentName());
         ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
-
-        /*
-         * Update the total run time for the PREVIOUS ingest module in the
-         * pipeline, which has now finished its processing for the task.
-         */
-        incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
-    }
-
-    /**
-     * Updates the ingest progress snapshot when a new ingest module starts
-     * working on a file ingest task.
-     *
-     * @param task              The file ingest task.
-     * @param currentModuleName The display name of the currently processing
-     *                          module.
-     */
-    void setIngestTaskProgress(FileIngestTask task, String currentModuleName) {
-        IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
-        IngestThreadActivitySnapshot newSnap;
-        try {
-            newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource(), task.getFile());
-        } catch (TskCoreException ex) {
-            logger.log(Level.SEVERE, "Error getting file from file ingest task", ex);
-            newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobExecutor().getIngestJobId(), currentModuleName, task.getDataSource());
-        }
-        ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
-
-        /*
-         * Update the total run time for the PREVIOUS ingest module in the
-         * pipeline, which has now finished its processing for the task.
-         */
-        incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
+        incrementModuleRunTime(prevSnap.getModuleDisplayName(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
     }

     /**
      * Updates the ingest progress snapshot when an ingest task is completed.
+     * This includes incrementing the total run time for the ingest module,
+     * which is the LAST ingest module in the pipeline.
      *
      * @param task The ingest task.
      */
@@ -962,12 +938,7 @@
         IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
         IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId());
         ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
-
-        /*
-         * Update the total run time for the LAST ingest module in the pipeline,
-         * which has now finished its processing for the task.
-         */
-        incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
+        incrementModuleRunTime(prevSnap.getModuleDisplayName(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
     }

     /**
@@ -1163,8 +1134,7 @@
     }

     /**
-     * A snapshot of the current activity of an ingest job task execution task
-     * running in an ingest thread.
+     * A snapshot of the current activity of an ingest thread.
      */
     @Immutable
     public static final class IngestThreadActivitySnapshot implements Serializable {
@@ -1173,107 +1143,111 @@

         private final long threadId;
         private final Date startTime;
-        private final String activity;
+        private final String moduleDisplayName;
         private final String dataSourceName;
         private final String fileName;
         private final long jobId;

         /**
-         * A snapshot of the current activity of an idle ingest job task
-         * execution task running in an ingest thread.
+         * A snapshot of the current activity of an idle ingest thread.
          *
-         * @param threadId The ingest manager task/thread id for the
-         *                 task/thread.
+         * @param threadId The ID assigned to the thread by the ingest manager.
         */
         IngestThreadActivitySnapshot(long threadId) {
             this.threadId = threadId;
             startTime = new Date();
-            this.activity = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
+            this.moduleDisplayName = NbBundle.getMessage(this.getClass(), "IngestManager.IngestThreadActivitySnapshot.idleThread");
             this.dataSourceName = "";
             this.fileName = "";
             this.jobId = 0;
         }

         /**
-         * A snapshot of the current activity of an ingest job data source level
-         * task execution task running in an ingest thread.
+         * A snapshot of the current activity of an ingest thread executing a
+         * data source, data artifact, or analysis result ingest task.
          *
-         * @param threadId   The ingest manager task/thread id for the
-         *                   task/thread.
-         * @param jobId      The ingest job id.
-         * @param activity   A short description of the current activity.
-         * @param dataSource The data source that is the subject of the task.
+         * @param threadId          The ID assigned to the thread by the ingest
+         *                          manager.
+         * @param jobId             The ID of the ingest job of which the
+         *                          current ingest task is a part.
+         * @param moduleDisplayName The display name of the ingest module
+         *                          currently working on the task.
+         * @param dataSource        The data source that is the subject of the
+         *                          current ingest job.
         */
-        IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource) {
+        IngestThreadActivitySnapshot(long threadId, long jobId, String moduleDisplayName, Content dataSource) {
             this.threadId = threadId;
             this.jobId = jobId;
             startTime = new Date();
-            this.activity = activity;
+            this.moduleDisplayName = moduleDisplayName;
             this.dataSourceName = dataSource.getName();
             this.fileName = "";
         }

         /**
-         * A snapshot of the current activity of an ingest job file level task
-         * execution task running in an ingest thread.
+         * A snapshot of the current activity of an ingest thread executing a
+         * file ingest task.
          *
-         * @param threadId   The ingest manager task/thread id for the
-         *                   task/thread.
-         * @param jobId      The ingest job id.
-         * @param activity   A short description of the current activity.
-         * @param dataSource The data source that is the source of the file that
-         *                   is the subject of the task.
-         * @param file       The file that is the subject of the task.
+         * @param threadId          The ID assigned to the thread by the ingest
+         *                          manager.
+         * @param jobId             The ID of the ingest job of which the
+         *                          current ingest task is a part.
+         * @param moduleDisplayName The display name of the ingest module
+         *                          currently working on the task.
+         * @param dataSource        The data source that is the subject of the
+         *                          current ingest job.
+         * @param file              The name of the file that is the subject of
+         *                          the current ingest task.
         */
-        IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
+        IngestThreadActivitySnapshot(long threadId, long jobId, String moduleDisplayName, Content dataSource, String fileName) {
             this.threadId = threadId;
             this.jobId = jobId;
             startTime = new Date();
-            this.activity = activity;
+            this.moduleDisplayName = moduleDisplayName;
             this.dataSourceName = dataSource.getName();
-            this.fileName = file.getName();
+            this.fileName = fileName;
         }

         /**
-         * Gets the ingest job id.
+         * Gets the ID of the ingest job of which the current ingest task is a
+         * part.
          *
-         * @return The ingest job id.
+         * @return The ingest job ID.
         */
         long getIngestJobId() {
             return jobId;
         }

         /**
-         * Gets the ingest manager task/thread id for the task/thread.
+         * Gets the thread ID assigned to the thread by the ingest manager.
          *
-         * @return The task/thread id.
+         * @return The thread ID.
         */
         long getThreadId() {
             return threadId;
         }

         /**
-         * Gets the start date and time for the current activity.
+         * Gets the start date and time for the current ingest task.
          *
          * @return The start date and time.
         */
         Date getStartTime() {
-            return startTime;
+            return new Date(startTime.getTime());
         }

         /**
-         * Gets the THE short description of the current activity.
+         * Gets display name of the ingest module currently working on the task.
          *
-         * @return The short description of the current activity.
+         * @return The module display name.
         */
-        String getActivity() {
-            return activity;
+        String getModuleDisplayName() {
+            return moduleDisplayName;
         }

         /**
-         * Gets the display name of the data source that is either the subject
-         * of the task or is the source of the file that is the subject of the
-         * task.
+         * Gets the display name of the data source that is the subject of the
+         * ingest job.
          *
          * @return The data source display name.
         */
@@ -1282,9 +1256,9 @@
         }

         /**
-         * Gets the file, if any, that is the subject of the task.
+         * Gets the name of file that is the subject of the current ingest task.
          *
-         * @return The fiel name, may be the empty string.
+         * @return The file name, may be the empty string.
         */
         String getFileName() {
             return fileName;
@@ -271,4 +271,43 @@ public interface IngestModuleFactory {
         throw new UnsupportedOperationException();
     }

+    /**
+     * Queries the factory to determine if it is capable of creating analysis
+     * result ingest modules.
+     *
+     * @return True or false.
+     */
+    default boolean isAnalysisResultIngestModuleFactory() {
+        return false;
+    }
+
+    /**
+     * Creates an analysis result ingest module instance.
+     * <p>
+     * Autopsy will generally use the factory to several instances of each type
+     * of module for each ingest job it performs. Completing an ingest job
+     * entails processing a single data source (e.g., a disk image) and all of
+     * the files from the data source, including files extracted from archives
+     * and any unallocated space (made to look like a series of files). The data
+     * source is passed through one or more pipelines of data source ingest
+     * modules. The files are passed through one or more pipelines of file
+     * ingest modules.
+     * <p>
+     * The ingest framework may use multiple threads to complete an ingest job,
+     * but it is guaranteed that there will be no more than one module instance
+     * per thread. However, if the module instances must share resources, the
+     * modules are responsible for synchronizing access to the shared resources
+     * and doing reference counting as required to release those resources
+     * correctly. Also, more than one ingest job may be in progress at any given
+     * time. This must also be taken into consideration when sharing resources
+     * between module instances. modules.
+     *
+     * @param settings The settings for the ingest job.
+     *
+     * @return A file ingest module instance.
+     */
+    default AnalysisResultIngestModule createAnalysisResultIngestModule(IngestModuleIngestJobSettings settings) {
+        throw new UnsupportedOperationException();
+    }
+
 }
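For module authors, wiring a new analysis result module into Autopsy means overriding these two new factory methods. A minimal, hypothetical sketch follows, reusing the SampleAnalysisResultIngestModule sketched earlier and assuming the IngestModuleFactoryAdapter convenience class; registration via the NetBeans @ServiceProvider annotation and the rest of the factory contract are omitted:

import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule;
import org.sleuthkit.autopsy.ingest.IngestModuleFactoryAdapter;
import org.sleuthkit.autopsy.ingest.IngestModuleIngestJobSettings;

// Hypothetical factory, not part of this commit.
public class SampleAnalysisResultModuleFactory extends IngestModuleFactoryAdapter {

    @Override
    public String getModuleDisplayName() {
        return "Sample Analysis Result Module";
    }

    @Override
    public String getModuleDescription() {
        return "Logs analysis results as they are posted.";
    }

    @Override
    public String getModuleVersionNumber() {
        return "1.0";
    }

    // Opt in to the new analysis result pipeline added by this PR.
    @Override
    public boolean isAnalysisResultIngestModuleFactory() {
        return true;
    }

    @Override
    public AnalysisResultIngestModule createAnalysisResultIngestModule(IngestModuleIngestJobSettings settings) {
        return new SampleAnalysisResultIngestModule();
    }
}

At job startup, IngestModuleTemplate (next hunk) calls isAnalysisResultIngestModuleFactory() and, if it returns true, createAnalysisResultIngestModule() to populate the AnalysisResultIngestPipeline shown above.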
@@ -93,6 +93,14 @@ public final class IngestModuleTemplate {
         return moduleFactory.createDataArtifactIngestModule(settings);
     }

+    public boolean isAnalysisResultIngestModuleTemplate() {
+        return moduleFactory.isAnalysisResultIngestModuleFactory();
+    }
+
+    public AnalysisResultIngestModule createAnalysisResultIngestModule() {
+        return moduleFactory.createAnalysisResultIngestModule(settings);
+    }
+
     public void setEnabled(boolean enabled) {
         this.enabled = enabled;
     }
@@ -136,7 +136,7 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
                 cellValue = snapshot.getThreadId();
                 break;
             case 1:
-                cellValue = snapshot.getActivity();
+                cellValue = snapshot.getModuleDisplayName();
                 break;
             case 2:
                 cellValue = snapshot.getDataSourceName();
@@ -179,7 +179,8 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
             NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.rootQueued"),
             NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.streamingQueued"),
             NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.dsQueued"),
-            NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.artifactsQueued")};
+            NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.artifactsQueued"),
+            NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.resultsQueued")};

         private List<Snapshot> jobSnapshots;

@@ -249,6 +250,9 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
                 case 11:
                     cellValue = snapShot.getArtifactTasksQueueSize();
                     break;
+                case 12:
+                    cellValue = snapShot.getResultTasksQueueSize();
+                    break;
                 default:
                     cellValue = null;
                     break;
@@ -19,6 +19,7 @@
 package org.sleuthkit.autopsy.ingest;

 import org.sleuthkit.datamodel.Content;
+import org.sleuthkit.datamodel.TskCoreException;

 /**
  * An ingest task that will be executed by an ingest thread using a given ingest
@@ -31,19 +32,47 @@ abstract class IngestTask {
     private final static long NOT_SET = Long.MIN_VALUE;
     private final IngestJobExecutor ingestJobExecutor;
     private long threadId;
+    private String contentName;

     /**
      * Constructs an ingest task that will be executed by an ingest thread using
      * a given ingest job executor.
      *
-     * @param ingestJobExecutor The ingest job executor to use to execute the
+     * @param contentName       The name of the content that is the subject of
+     *                          this task. May be the empty string, in which
+     *                          case setContentName() can be called later.
+     * @param ingestJobExecutor The ingest job executor to use to execute this
      *                          task.
     */
-    IngestTask(IngestJobExecutor ingestJobExecutor) {
+    IngestTask(String contentName, IngestJobExecutor ingestJobExecutor) {
+        this.contentName = contentName;
         this.ingestJobExecutor = ingestJobExecutor;
         threadId = NOT_SET;
     }

+    /**
+     * Gets the name of the content that is the subject content of the ingest
+     * task.
+     *
+     * @return The content name.
+     *
+     * @throws TskCoreException This exception is thrown if there is a problem
+     *                          providing the subject content.
+     */
+    String getContentName() {
+        return contentName;
+    }
+
+    /**
+     * Gets the name of the content that is the subject content of the ingest
+     * task.
+     *
+     * @param contentName
+     */
+    void setContentName(String contentName) {
+        this.contentName = contentName;
+    }
+
     /**
      * Gets the ingest job executor to use to execute this task.
      *
@@ -21,7 +21,6 @@ package org.sleuthkit.autopsy.ingest;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.Iterator;
@@ -39,6 +38,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.sleuthkit.autopsy.casemodule.Case;
 import org.sleuthkit.autopsy.coreutils.Logger;
 import org.sleuthkit.datamodel.AbstractFile;
+import org.sleuthkit.datamodel.AnalysisResult;
 import org.sleuthkit.datamodel.Blackboard;
 import org.sleuthkit.datamodel.Content;
 import org.sleuthkit.datamodel.DataArtifact;
@@ -67,6 +67,7 @@ final class IngestTasksScheduler {
     private final Queue<FileIngestTask> streamedFileIngestTasksQueue;
     private final IngestTaskTrackingQueue fileIngestTasksQueue;
     private final IngestTaskTrackingQueue artifactIngestTasksQueue;
+    private final IngestTaskTrackingQueue resultIngestTasksQueue;

     /**
      * Gets the ingest tasks scheduler singleton that creates ingest tasks for
@@ -92,6 +93,7 @@ final class IngestTasksScheduler {
         fileIngestTasksQueue = new IngestTaskTrackingQueue();
         streamedFileIngestTasksQueue = new LinkedList<>();
         artifactIngestTasksQueue = new IngestTaskTrackingQueue();
+        resultIngestTasksQueue = new IngestTaskTrackingQueue();
     }

     /**
@@ -120,86 +122,66 @@ final class IngestTasksScheduler {
      *
      * @return The queue.
      */
-    BlockingIngestTaskQueue getResultIngestTaskQueue() {
+    BlockingIngestTaskQueue getDataArtifactIngestTaskQueue() {
         return artifactIngestTasksQueue;
     }

     /**
-     * Schedules ingest tasks based on the types of ingest modules that the
-     * ingest pipeline that will exedute tasks has. Scheduling these tasks
-     * atomically means that it is valid to call currentTasksAreCompleted()
-     * immediately after calling this method. Note that the may cause some or
-     * even all of any file tasks to be discarded.
+     * Gets the analysis result ingest tasks queue. This queue is a blocking
+     * queue consumed by the ingest manager's analysis result ingest thread.
      *
-     * @param ingestPipeline The ingest pipeline that will execute the scheduled
-     *                       tasks. A reference to the pipeline is added to each
-     *                       task so that when the task is dequeued by an ingest
-     *                       thread the task can pass the target Content of the
-     *                       task to the pipeline for processing by the
-     *                       pipeline's ingest modules.
+     * @return The queue.
     */
-    synchronized void scheduleIngestTasks(IngestJobExecutor ingestPipeline) {
-        if (!ingestPipeline.isCancelled()) {
-            if (ingestPipeline.hasDataSourceIngestModules()) {
-                scheduleDataSourceIngestTask(ingestPipeline);
-            }
-            if (ingestPipeline.hasFileIngestModules()) {
-                scheduleFileIngestTasks(ingestPipeline, Collections.emptyList());
-            }
-            if (ingestPipeline.hasDataArtifactIngestModules()) {
-                scheduleDataArtifactIngestTasks(ingestPipeline);
-            }
-        }
+    BlockingIngestTaskQueue getAnalysisResultIngestTaskQueue() {
+        return resultIngestTasksQueue;
     }

     /**
      * Schedules a data source level ingest task for an ingest job. The data
-     * source is obtained from the ingest pipeline passed in.
+     * source is obtained from the ingest ingest job executor passed in.
      *
-     * @param ingestPipeline The ingest pipeline that will execute the scheduled
-     *                       task. A reference to the pipeline is added to the
-     *                       task so that when the task is dequeued by an ingest
-     *                       thread the task can pass the target Content of the
-     *                       task to the pipeline for processing by the
-     *                       pipeline's ingest modules.
+     * @param executor The ingest job executor that will execute the scheduled
+     *                 tasks. A reference to the executor is added to each task
+     *                 so that when the task is dequeued by an ingest thread,
+     *                 the task can pass its target item to the executor for
+     *                 processing by the executor's ingest module pipelines.
     */
-    synchronized void scheduleDataSourceIngestTask(IngestJobExecutor ingestPipeline) {
-        if (!ingestPipeline.isCancelled()) {
-            DataSourceIngestTask task = new DataSourceIngestTask(ingestPipeline);
+    synchronized void scheduleDataSourceIngestTask(IngestJobExecutor executor) {
+        if (!executor.isCancelled()) {
+            DataSourceIngestTask task = new DataSourceIngestTask(executor);
             try {
                 dataSourceIngestTasksQueue.putLast(task);
             } catch (InterruptedException ex) {
-                IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (pipelineId={%d)", ingestPipeline.getIngestJobId()), ex);
+                IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (ingest job ID={%d)", executor.getIngestJobId()), ex);
                 Thread.currentThread().interrupt();
             }
         }
     }

     /**
-     * Schedules file tasks for either all the files, or a given subset of the
-     * files, for a data source. The data source is obtained from the ingest
-     * pipeline passed in.
+     * Schedules file tasks for either all of the files, or a given subset of
+     * the files, for a data source. The data source is obtained from the ingest
+     * ingest job executor passed in.
      *
-     * @param ingestPipeline The ingest pipeline that will execute the scheduled
-     *                       tasks. A reference to the pipeline is added to each
-     *                       task so that when the task is dequeued by an ingest
-     *                       thread the task can pass the target Content of the
-     *                       task to the pipeline for processing by the
-     *                       pipeline's ingest modules.
-     * @param files          A subset of the files from the data source; if
-     *                       empty, then all if the files from the data source
-     *                       are candidates for scheduling.
+     * @param executor The ingest job executor that will execute the scheduled
+     *                 tasks. A reference to the executor is added to each task
+     *                 so that when the task is dequeued by an ingest thread,
+     *                 the task can pass its target item to the executor for
+     *                 processing by the executor's ingest module pipelines.
+     * @param files    A subset of the files from the data source; if empty,
+     *                 then all of the files from the data source are candidates
+     *                 for scheduling.
     */
-    synchronized void scheduleFileIngestTasks(IngestJobExecutor ingestPipeline, Collection<AbstractFile> files) {
-        if (!ingestPipeline.isCancelled()) {
+    synchronized void scheduleFileIngestTasks(IngestJobExecutor executor, Collection<AbstractFile> files) {
+        if (!executor.isCancelled()) {
             Collection<AbstractFile> candidateFiles;
             if (files.isEmpty()) {
-                candidateFiles = getTopLevelFiles(ingestPipeline.getDataSource());
+                candidateFiles = getTopLevelFiles(executor.getDataSource());
             } else {
                 candidateFiles = files;
             }
             for (AbstractFile file : candidateFiles) {
-                FileIngestTask task = new FileIngestTask(ingestPipeline, file);
+                FileIngestTask task = new FileIngestTask(executor, file);
                 if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
                     topLevelFileIngestTasksQueue.add(task);
                 }
@@ -212,16 +194,15 @@
      * Schedules file tasks for a collection of "streamed" files for a streaming
      * ingest job.
      *
-     * @param ingestPipeline The ingest pipeline for the job. A reference to the
-     *                       pipeline is added to each task so that when the
-     *                       task is dequeued by an ingest thread and the task's
-     *                       execute() method is called, execute() can pass the
-     *                       target Content of the task to the pipeline for
-     *                       processing by the pipeline's ingest modules.
+     * @param executor The ingest job executor that will execute the scheduled
+     *                 tasks. A reference to the executor is added to each task
+     *                 so that when the task is dequeued by an ingest thread,
+     *                 the task can pass its target item to the executor for
+     *                 processing by the executor's ingest module pipelines.
      * @param files    A list of file object IDs for the streamed files.
     */
-    synchronized void scheduleStreamedFileIngestTasks(IngestJobExecutor ingestPipeline, List<Long> fileIds) {
-        if (!ingestPipeline.isCancelled()) {
+    synchronized void scheduleStreamedFileIngestTasks(IngestJobExecutor executor, List<Long> fileIds) {
+        if (!executor.isCancelled()) {
             for (long id : fileIds) {
                 /*
                  * Create the file ingest task. Note that we do not do the
@@ -230,7 +211,7 @@
                  * file filter will be applied before the file task makes it to
                  * the task queue consumed by the file ingest threads.
                  */
-                FileIngestTask task = new FileIngestTask(ingestPipeline, id);
+                FileIngestTask task = new FileIngestTask(executor, id);
                 streamedFileIngestTasksQueue.add(task);
             }
             refillFileIngestTasksQueue();
@@ -244,16 +225,15 @@
      * be used to schedule files that are products of ingest module processing,
      * e.g., extracted files and carved files.
      *
-     * @param ingestPipeline The ingest pipeline for the job. A reference to the
-     *                       pipeline is added to each task so that when the
-     *                       task is dequeued by an ingest thread and the task's
-     *                       execute() method is called, execute() can pass the
-     *                       target Content of the task to the pipeline for
-     *                       processing by the pipeline's ingest modules.
+     * @param executor The ingest job executor that will execute the scheduled
+     *                 tasks. A reference to the executor is added to each task
+     *                 so that when the task is dequeued by an ingest thread,
+     *                 the task can pass its target item to the executor for
+     *                 processing by the executor's ingest module pipelines.
      * @param files    The files.
     */
-    synchronized void fastTrackFileIngestTasks(IngestJobExecutor ingestPipeline, Collection<AbstractFile> files) {
-        if (!ingestPipeline.isCancelled()) {
+    synchronized void scheduleHighPriorityFileIngestTasks(IngestJobExecutor executor, Collection<AbstractFile> files) {
+        if (!executor.isCancelled()) {
             /*
              * Put the files directly into the queue for the file ingest
              * threads, if they pass the file filter for the job. The files are
@@ -263,12 +243,12 @@
              * in progress.
              */
             for (AbstractFile file : files) {
-                FileIngestTask fileTask = new FileIngestTask(ingestPipeline, file);
+                FileIngestTask fileTask = new FileIngestTask(executor, file);
                 if (shouldEnqueueFileTask(fileTask)) {
                     try {
                         fileIngestTasksQueue.putFirst(fileTask);
                     } catch (InterruptedException ex) {
-                        DataSource dataSource = ingestPipeline.getDataSource();
+                        DataSource dataSource = executor.getDataSource();
                         logger.log(Level.WARNING, String.format("Interrupted while enqueuing file tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS
                         Thread.currentThread().interrupt();
                         return;
@@ -281,51 +261,73 @@
     /**
      * Schedules data artifact ingest tasks for any data artifacts that have
      * already been added to the case database for a data source. The data
-     * source is obtained from the ingest pipeline passed in.
+     * source is obtained from the ingest job executor passed in.
      *
-     * @param ingestPipeline The ingest pipeline for the job. A reference to the
-     *                       pipeline is added to each task so that when the
-     *                       task is dequeued by an ingest thread and the task's
-     *                       execute() method is called, execute() can pass the
-     *                       target Content of the task to the pipeline for
-     *                       processing by the pipeline's ingest modules.
+     * @param executor The ingest job executor that will execute the scheduled
+     *                 tasks. A reference to the executor is added to each task
+     *                 so that when the task is dequeued by an ingest thread,
+     *                 the task can pass its target item to the executor for
+     *                 processing by the executor's ingest module pipelines.
     */
-    synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor ingestPipeline) {
-        if (!ingestPipeline.isCancelled()) {
+    synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor executor) {
+        if (!executor.isCancelled()) {
             Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
             try {
-                List<DataArtifact> artifacts = blackboard.getDataArtifacts(ingestPipeline.getDataSource().getId(), null);
-                scheduleDataArtifactIngestTasks(ingestPipeline, artifacts);
+                List<DataArtifact> artifacts = blackboard.getDataArtifacts(executor.getDataSource().getId(), null);
+                scheduleDataArtifactIngestTasks(executor, artifacts);
             } catch (TskCoreException ex) {
-                DataSource dataSource = ingestPipeline.getDataSource();
+                DataSource dataSource = executor.getDataSource();
                 logger.log(Level.SEVERE, String.format("Failed to retrieve data artifacts for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS
             }
         }
     }

+    /**
+     * Schedules analysis result ingest tasks for any analysis result that have
+     * already been added to the case database for a data source. The data
+     * source is obtained from the ingest job executor passed in.
+     *
+     * @param executor The ingest job executor that will execute the scheduled
+     *                 tasks. A reference to the executor is added to each task
+     *                 so that when the task is dequeued by an ingest thread,
+     *                 the task can pass its target item to the executor for
+     *                 processing by the executor's ingest module pipelines.
+     */
+    synchronized void scheduleAnalysisResultIngestTasks(IngestJobExecutor executor) {
+        if (!executor.isCancelled()) {
+            Blackboard blackboard = Case.getCurrentCase().getSleuthkitCase().getBlackboard();
+            try {
+                List<AnalysisResult> results = blackboard.getAnalysisResults(executor.getDataSource().getId(), null);
+                scheduleAnalysisResultIngestTasks(executor, results);
+            } catch (TskCoreException ex) {
+                DataSource dataSource = executor.getDataSource();
+                logger.log(Level.SEVERE, String.format("Failed to retrieve analysis results for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS
+            }
+        }
+    }
+
     /**
      * Schedules data artifact ingest tasks for an ingest job. This method is
      * intended to be used to schedule artifacts that are products of ingest
      * module processing.
      *
-     * @param ingestPipeline The ingest pipeline for the job. A reference to the
-     *                       pipeline is added to each task so that when the
-     *                       task is dequeued by an ingest thread and the task's
-     *                       execute() method is called, execute() can pass the
-     *                       target Content of the task to the pipeline for
-     *                       processing by the pipeline's ingest modules.
-     * @param artifacts      A subset of the data artifacts from the data
-     *                       source; if empty, then all of the data artifacts
-     *                       from the data source will be scheduled.
+     * @param executor  The ingest job executor that will execute the scheduled
+     *                  tasks. A reference to the executor is added to each task
+     *                  so that when the task is dequeued by an ingest thread,
+     *                  the task can pass its target item to the executor for
+     *                  processing by the executor's ingest module pipelines.
+     * @param artifacts A subset of the data artifacts from the data source; if
+     *                  empty, then all of the data artifacts from the data
+     *                  source will be scheduled.
     */
-    synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor ingestPipeline, List<DataArtifact> artifacts) {
-        if (!ingestPipeline.isCancelled()) {
+    synchronized void scheduleDataArtifactIngestTasks(IngestJobExecutor executor, List<DataArtifact> artifacts) {
+        if (!executor.isCancelled()) {
             for (DataArtifact artifact : artifacts) {
-                DataArtifactIngestTask task = new DataArtifactIngestTask(ingestPipeline, artifact);
+                DataArtifactIngestTask task = new DataArtifactIngestTask(executor, artifact);
                 try {
-                    this.artifactIngestTasksQueue.putLast(task);
+                    artifactIngestTasksQueue.putLast(task);
                 } catch (InterruptedException ex) {
-                    DataSource dataSource = ingestPipeline.getDataSource();
+                    DataSource dataSource = executor.getDataSource();
                     logger.log(Level.WARNING, String.format("Interrupted while enqueuing data artifact tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS
                     Thread.currentThread().interrupt();
                     break;
@@ -334,6 +336,36 @@
         }
     }

+    /**
+     * Schedules data artifact ingest tasks for an ingest job. This method is
+     * intended to be used to schedule artifacts that are products of ingest
+     * module processing.
+     *
+     * @param executor The ingest job executor that will execute the scheduled
+     *                 tasks. A reference to the executor is added to each task
+     *                 so that when the task is dequeued by an ingest thread,
+     *                 the task can pass its target item to the executor for
+     *                 processing by the executor's ingest module pipelines.
+     * @param results  A subset of the data artifacts from the data source; if
+     *                 empty, then all of the data artifacts from the data
+     *                 source will be scheduled.
+     */
+    synchronized void scheduleAnalysisResultIngestTasks(IngestJobExecutor executor, List<AnalysisResult> results) {
+        if (!executor.isCancelled()) {
+            for (AnalysisResult result : results) {
+                AnalysisResultIngestTask task = new AnalysisResultIngestTask(executor, result);
+                try {
+                    resultIngestTasksQueue.putLast(task);
+                } catch (InterruptedException ex) {
+                    DataSource dataSource = executor.getDataSource();
+                    logger.log(Level.WARNING, String.format("Interrupted while enqueuing analysis results tasks for %s (data source object ID = %d)", dataSource.getName(), dataSource.getId()), ex); //NON-NLS
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+    }
+
     /**
      * Allows an ingest thread to notify this ingest task scheduler that a data
      * source level task has been completed.
@ -365,22 +397,32 @@ final class IngestTasksScheduler {
|
||||
artifactIngestTasksQueue.taskCompleted(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows an ingest thread to notify this ingest task scheduler that a data
|
||||
* artifact ingest task has been completed.
|
||||
*
|
||||
* @param task The completed task.
|
||||
*/
|
||||
synchronized void notifyTaskCompleted(AnalysisResultIngestTask task) {
|
||||
resultIngestTasksQueue.taskCompleted(task);
|
||||
}
|
||||
|
||||
/**
|
||||
* Queries the task scheduler to determine whether or not all of the ingest
|
||||
* tasks for an ingest job have been completed.
|
||||
*
|
||||
* @param ingestPipeline The ingest pipeline for the job.
|
||||
* @param executor The ingest job executor.
|
||||
*
|
||||
* @return True or false.
|
||||
*/
|
||||
synchronized boolean currentTasksAreCompleted(IngestJobExecutor ingestPipeline) {
|
||||
long pipelineId = ingestPipeline.getIngestJobId();
|
||||
return !(dataSourceIngestTasksQueue.hasTasksForJob(pipelineId)
|
||||
|| hasTasksForJob(topLevelFileIngestTasksQueue, pipelineId)
|
||||
|| hasTasksForJob(batchedFileIngestTasksQueue, pipelineId)
|
||||
|| hasTasksForJob(streamedFileIngestTasksQueue, pipelineId)
|
||||
|| fileIngestTasksQueue.hasTasksForJob(pipelineId)
|
||||
|| artifactIngestTasksQueue.hasTasksForJob(pipelineId));
|
||||
synchronized boolean currentTasksAreCompleted(Long ingestJobId) {
|
||||
return !(dataSourceIngestTasksQueue.hasTasksForJob(ingestJobId)
|
||||
|| hasTasksForJob(topLevelFileIngestTasksQueue, ingestJobId)
|
||||
|| hasTasksForJob(batchedFileIngestTasksQueue, ingestJobId)
|
||||
|| hasTasksForJob(streamedFileIngestTasksQueue, ingestJobId)
|
||||
|| fileIngestTasksQueue.hasTasksForJob(ingestJobId)
|
||||
|| artifactIngestTasksQueue.hasTasksForJob(ingestJobId)
|
||||
|| resultIngestTasksQueue.hasTasksForJob(ingestJobId));
|
||||
}
|
||||
|
||||
/**
@ -400,13 +442,12 @@ final class IngestTasksScheduler {
* in the batch root file tasks queue and any directories in the batch root
* children file tasks queue.
*
* @param ingestJobPipeline The ingest pipeline for the job.
* @param ingestJobId The ingest job ID.
*/
synchronized void cancelPendingFileTasksForIngestJob(IngestJobExecutor ingestJobPipeline) {
long jobId = ingestJobPipeline.getIngestJobId();
removeTasksForJob(topLevelFileIngestTasksQueue, jobId);
removeTasksForJob(batchedFileIngestTasksQueue, jobId);
removeTasksForJob(streamedFileIngestTasksQueue, jobId);
synchronized void cancelPendingFileTasksForIngestJob(long ingestJobId) {
removeTasksForJob(topLevelFileIngestTasksQueue, ingestJobId);
removeTasksForJob(batchedFileIngestTasksQueue, ingestJobId);
removeTasksForJob(streamedFileIngestTasksQueue, ingestJobId);
}

/**
@ -421,7 +462,9 @@ final class IngestTasksScheduler {
List<AbstractFile> topLevelFiles = new ArrayList<>();
Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
// The data source is itself a file to be processed.
/*
* The data source is itself a file to be processed.
*/
topLevelFiles.add((AbstractFile) dataSource);
} else {
for (AbstractFile root : rootObjects) {
@ -429,12 +472,17 @@ final class IngestTasksScheduler {
try {
children = root.getChildren();
if (children.isEmpty()) {
// Add the root object itself, it could be an unallocated
// space file, or a child of a volume or an image.
/*
* Add the root object itself, it could be an
* unallocated space file, or a child of a volume or an
* image.
*/
topLevelFiles.add(root);
} else {
// The root object is a file system root directory, get
// the files within it.
/*
* The root object is a file system root directory, get
* the files within it.
*/
for (Content child : children) {
if (child instanceof AbstractFile) {
topLevelFiles.add((AbstractFile) child);
@ -546,7 +594,8 @@ final class IngestTasksScheduler {
AbstractFile file = null;
try {
file = nextTask.getFile();
for (Content child : file.getChildren()) {
List<Content> children = file.getChildren();
for (Content child : children) {
if (child instanceof AbstractFile) {
AbstractFile childFile = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(nextTask.getIngestJobExecutor(), childFile);
@ -586,8 +635,10 @@ final class IngestTasksScheduler {
return false;
}

// Skip the task if the file is actually the pseudo-file for the parent
// or current directory.
/*
* Skip the task if the file is actually the pseudo-file for the parent
* or current directory.
*/
String fileName = file.getName();

if (fileName.equals(".") || fileName.equals("..")) {
@ -610,12 +661,16 @@ final class IngestTasksScheduler {
return false;
}

// Skip the task if the file is one of a select group of special, large
// NTFS or FAT file system files.
/*
* Skip the task if the file is one of a select group of special, large
* NTFS or FAT file system files.
*/
if (file instanceof org.sleuthkit.datamodel.File) {
final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file;

// Get the type of the file system, if any, that owns the file.
/*
* Get the type of the file system, if any, that owns the file.
*/
TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
try {
FileSystem fs = f.getFileSystem();
@ -626,12 +681,16 @@ final class IngestTasksScheduler {
logger.log(Level.SEVERE, "Error querying file system for " + f, ex); //NON-NLS
}

// If the file system is not NTFS or FAT, don't skip the file.
/*
* If the file system is not NTFS or FAT, don't skip the file.
*/
if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
return true;
}

// Find out whether the file is in a root directory.
/*
* Find out whether the file is in a root directory.
*/
boolean isInRootDir = false;
try {
AbstractFile parent = f.getParentDirectory();
@ -644,9 +703,11 @@ final class IngestTasksScheduler {
logger.log(Level.WARNING, "Error querying parent directory for " + f.getName(), ex); //NON-NLS
}

// If the file is in the root directory of an NTFS or FAT file
// system, check its meta-address and check its name for the '$'
// character and a ':' character (not a default attribute).
/*
* If the file is in the root directory of an NTFS or FAT file
* system, check its meta-address and check its name for the '$'
* character and a ':' character (not a default attribute).
*/
if (isInRootDir && f.getMetaAddr() < 32) {
String name = f.getName();
if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) {
@ -696,13 +757,13 @@ final class IngestTasksScheduler {
* given ingest job.
*
* @param tasks The tasks.
* @param pipelineId The ID of the ingest pipeline for the job.
* @param ingestJobId The ingest job ID.
*
* @return True if there are tasks for the job, false otherwise.
*/
synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, long pipelineId) {
synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, long ingestJobId) {
for (IngestTask task : tasks) {
if (task.getIngestJobExecutor().getIngestJobId() == pipelineId) {
if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) {
return true;
}
}
@ -714,13 +775,13 @@ final class IngestTasksScheduler {
* collection of tasks.
*
* @param tasks The tasks.
* @param pipelineId The ID of the ingest pipeline for the job.
* @param ingestJobId The ingest job ID.
*/
private static void removeTasksForJob(Collection<? extends IngestTask> tasks, long pipelineId) {
private static void removeTasksForJob(Collection<? extends IngestTask> tasks, long ingestJobId) {
Iterator<? extends IngestTask> iterator = tasks.iterator();
while (iterator.hasNext()) {
IngestTask task = iterator.next();
if (task.getIngestJobExecutor().getIngestJobId() == pipelineId) {
if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) {
iterator.remove();
}
}
@ -731,14 +792,14 @@ final class IngestTasksScheduler {
* ingest job.
*
* @param tasks The tasks.
* @param pipelineId The ID of the ingest pipeline for the job.
* @param ingestJobId The ingest job ID.
*
* @return The count.
*/
private static int countTasksForJob(Collection<? extends IngestTask> tasks, long pipelineId) {
private static int countTasksForJob(Collection<? extends IngestTask> tasks, long ingestJobId) {
int count = 0;
for (IngestTask task : tasks) {
if (task.getIngestJobExecutor().getIngestJobId() == pipelineId) {
if (task.getIngestJobExecutor().getIngestJobId() == ingestJobId) {
count++;
}
}
@ -749,18 +810,22 @@ final class IngestTasksScheduler {
* Returns a snapshot of the states of the tasks in progress for an ingest
* job.
*
* @param jobId The identifier assigned to the job.
* @param ingestJobId The ingest job ID.
*
* @return
* @return The snapshot.
*/
synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) {
return new IngestJobTasksSnapshot(jobId, dataSourceIngestTasksQueue.countQueuedTasksForJob(jobId),
countTasksForJob(topLevelFileIngestTasksQueue, jobId),
countTasksForJob(batchedFileIngestTasksQueue, jobId),
fileIngestTasksQueue.countQueuedTasksForJob(jobId),
dataSourceIngestTasksQueue.countRunningTasksForJob(jobId) + fileIngestTasksQueue.countRunningTasksForJob(jobId) + artifactIngestTasksQueue.countRunningTasksForJob(jobId),
countTasksForJob(streamedFileIngestTasksQueue, jobId),
artifactIngestTasksQueue.countQueuedTasksForJob(jobId));
synchronized IngestTasksSnapshot getTasksSnapshotForJob(long ingestJobId) {
return new IngestTasksSnapshot(
ingestJobId,
dataSourceIngestTasksQueue.countQueuedTasksForJob(ingestJobId),
countTasksForJob(topLevelFileIngestTasksQueue, ingestJobId),
countTasksForJob(batchedFileIngestTasksQueue, ingestJobId),
fileIngestTasksQueue.countQueuedTasksForJob(ingestJobId),
countTasksForJob(streamedFileIngestTasksQueue, ingestJobId),
artifactIngestTasksQueue.countQueuedTasksForJob(ingestJobId),
resultIngestTasksQueue.countQueuedTasksForJob(ingestJobId),
dataSourceIngestTasksQueue.countRunningTasksForJob(ingestJobId) + fileIngestTasksQueue.countRunningTasksForJob(ingestJobId) + artifactIngestTasksQueue.countRunningTasksForJob(ingestJobId) + resultIngestTasksQueue.countRunningTasksForJob(ingestJobId)
);
}

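As a usage note, the reordered snapshot is consumed through its getters. A hypothetical monitoring helper (not part of this commit; getTasksSnapshotForJob() is package-private, so the helper is assumed to live in org.sleuthkit.autopsy.ingest) could total the queues like this:

    // Hypothetical helper; uses only getters visible in this commit. The root
    // objects queue getter is elided by the diff above, so it is omitted here.
    static void logTaskCounts(IngestTasksScheduler scheduler, long ingestJobId) {
        IngestTasksScheduler.IngestTasksSnapshot snapshot = scheduler.getTasksSnapshotForJob(ingestJobId);
        long queued = snapshot.getDataSourceQueueSize()
                + snapshot.getDirQueueSize()
                + snapshot.getFileQueueSize()
                + snapshot.getStreamedFilesQueueSize()
                + snapshot.getArtifactsQueueSize()
                + snapshot.getResultsQueueSize();
        // getProgressListSize() counts tasks already handed to ingest threads.
        System.out.printf("Job %d: %d tasks queued, %d in progress%n",
                snapshot.getIngestJobId(), queued, snapshot.getProgressListSize());
    }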
/**
@ -771,20 +836,27 @@ final class IngestTasksScheduler {

@Override
public int compare(FileIngestTask q1, FileIngestTask q2) {
// In practice the case where one or both calls to getFile() fails
// should never occur since such tasks would not be added to the queue.
/*
* In practice the case where one or both calls to getFile() fails
* should never occur since such tasks would not be added to the
* queue.
*/
AbstractFile file1 = null;
AbstractFile file2 = null;
try {
file1 = q1.getFile();
} catch (TskCoreException ex) {
// Do nothing - the exception has been logged elsewhere
/*
* Do nothing - the exception has been logged elsewhere
*/
}

try {
file2 = q2.getFile();
} catch (TskCoreException ex) {
// Do nothing - the exception has been logged elsewhere
/*
* Do nothing - the exception has been logged elsewhere
*/
}

if (file1 == null) {
@ -829,15 +901,11 @@ final class IngestTasksScheduler {
static final List<Pattern> HIGH_PRI_PATHS = new ArrayList<>();

/*
* prioritize root directory folders based on the assumption that we
* Prioritize root directory folders based on the assumption that we
* are looking for user content. Other types of investigations may
* want different priorities.
*/
static /*
* prioritize root directory folders based on the assumption that we
* are looking for user content. Other types of investigations may
* want different priorities.
*/ {
static {
// these files have no structure, so they go last
//unalloc files are handled as virtual files in getPriority()
//LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE));
@ -1013,98 +1081,107 @@ final class IngestTasksScheduler {
* Checks whether there are any ingest tasks queued and/or running
* for a given ingest job.
*
* @param pipelineId The ID of the ingest pipeline for the job.
* @param ingestJobId The ingest job ID.
*
* @return
* @return True or false.
*/
boolean hasTasksForJob(long pipelineId) {
boolean hasTasksForJob(long ingestJobId) {
synchronized (this) {
return IngestTasksScheduler.hasTasksForJob(queuedTasks, pipelineId) || IngestTasksScheduler.hasTasksForJob(tasksInProgress, pipelineId);
return IngestTasksScheduler.hasTasksForJob(queuedTasks, ingestJobId) || IngestTasksScheduler.hasTasksForJob(tasksInProgress, ingestJobId);
}
}

/**
* Gets a count of the queued ingest tasks for a given ingest job.
*
* @param pipelineId The ID of the ingest pipeline for the job.
* @param ingestJobId The ingest job ID.
*
* @return
* @return The count.
*/
int countQueuedTasksForJob(long pipelineId) {
int countQueuedTasksForJob(long ingestJobId) {
synchronized (this) {
return IngestTasksScheduler.countTasksForJob(queuedTasks, pipelineId);
return IngestTasksScheduler.countTasksForJob(queuedTasks, ingestJobId);
}
}

/**
* Gets a count of the running ingest tasks for a given ingest job.
*
* @param pipelineId The ID of the ingest pipeline for the job.
* @param ingestJobId The ingest job ID.
*
* @return
* @return The count.
*/
int countRunningTasksForJob(long pipelineId) {
int countRunningTasksForJob(long ingestJobId) {
synchronized (this) {
return IngestTasksScheduler.countTasksForJob(tasksInProgress, pipelineId);
return IngestTasksScheduler.countTasksForJob(tasksInProgress, ingestJobId);
}
}

}

/**
* A snapshot of ingest tasks data for an ingest job.
* A snapshot of the sizes of the ingest task lists and queues for a given
* ingest job.
*/
static final class IngestJobTasksSnapshot implements Serializable {
static final class IngestTasksSnapshot implements Serializable {

private static final long serialVersionUID = 1L;
private final long jobId;
private final long dsQueueSize;
private final long ingestJobId;
private final long dataSourceQueueSize;
private final long rootQueueSize;
private final long dirQueueSize;
private final long fileQueueSize;
private final long runningListSize;
private final long streamingQueueSize;
private final long inProgressListSize;
private final long streamedFileQueueSize;
private final long artifactsQueueSize;
private final long resultsQueueSize;

/**
* RJCTODO
* Constructs a snapshot of the sizes of the ingest task lists and
* queues for a given ingest job.
*
* Constructs a snapshot of ingest tasks data for an ingest job.
*
* @param jobId The identifier associated with the job.
* @param dsQueueSize
* @param rootQueueSize
* @param dirQueueSize
* @param fileQueueSize
* @param runningListSize
* @param streamingQueueSize
* @param artifactsQueueSize
* @param ingestJobId The ingest job ID.
* @param dataSourceQueueSize The number of queued ingest tasks for
* data sources.
* @param rootQueueSize The number of queued ingest tasks for
* "root" file system objects.
* @param dirQueueSize The number of queued ingest tasks for
* directories.
* @param fileQueueSize The number of queued ingest tasks for
* files.
* @param streamedFileQueueSize The number of queued ingest tasks for
* streamed files.
* @param artifactsQueueSize The number of queued ingest tasks for
* data artifacts.
* @param resultsQueueSize The number of queued ingest tasks for
* analysis results.
* @param inProgressListSize The number of ingest tasks in progress.
*/
IngestJobTasksSnapshot(long jobId, long dsQueueSize, long rootQueueSize, long dirQueueSize, long fileQueueSize,
long runningListSize, long streamingQueueSize, long artifactsQueueSize) {
this.jobId = jobId;
this.dsQueueSize = dsQueueSize;
IngestTasksSnapshot(long ingestJobId, long dataSourceQueueSize, long rootQueueSize, long dirQueueSize, long fileQueueSize, long streamedFileQueueSize, long artifactsQueueSize, long resultsQueueSize, long inProgressListSize) {
this.ingestJobId = ingestJobId;
this.dataSourceQueueSize = dataSourceQueueSize;
this.rootQueueSize = rootQueueSize;
this.dirQueueSize = dirQueueSize;
this.fileQueueSize = fileQueueSize;
this.runningListSize = runningListSize;
this.streamingQueueSize = streamingQueueSize;
this.streamedFileQueueSize = streamedFileQueueSize;
this.artifactsQueueSize = artifactsQueueSize;
this.resultsQueueSize = resultsQueueSize;
this.inProgressListSize = inProgressListSize;
}

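One hazard worth noting: every parameter of the new constructor is a long, so the compiler cannot catch a transposed argument. A correct call site matching the new order would look like this sketch (values invented for illustration):

    IngestTasksScheduler.IngestTasksSnapshot snapshot = new IngestTasksScheduler.IngestTasksSnapshot(
            42L,   // ingestJobId
            1L,    // dataSourceQueueSize
            3L,    // rootQueueSize
            7L,    // dirQueueSize
            250L,  // fileQueueSize
            0L,    // streamedFileQueueSize
            12L,   // artifactsQueueSize
            4L,    // resultsQueueSize
            8L);   // inProgressListSize -- moved to the last position in this commit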
/**
* Gets the identifier associated with the ingest job for which this
* snapshot was created.
* Gets the ingest job ID of the ingest job for which this snapshot was
* created.
*
* @return The ingest job identifier.
* @return The ingest job ID.
*/
long getJobId() {
return jobId;
long getIngestJobId() {
return ingestJobId;
}

/**
* Gets the number of file ingest tasks associated with the job that are
* in the root directories queue.
* Gets the number of file ingest tasks for this job that are in the
* file system root objects queue.
*
* @return The tasks count.
*/
@ -1113,35 +1190,75 @@ final class IngestTasksScheduler {
}

/**
* Gets the number of file ingest tasks associated with the job that are
* in the root directories queue.
* Gets the number of file ingest tasks for this job that are in the
* directories queue.
*
* @return The tasks count.
*/
long getDirectoryTasksQueueSize() {
long getDirQueueSize() {
return dirQueueSize;
}

/**
* Gets the number of file ingest tasks for this job that are in the
* files queue.
*
* @return The tasks count.
*/
long getFileQueueSize() {
return fileQueueSize;
}

long getStreamingQueueSize() {
return streamingQueueSize;
/**
* Gets the number of file ingest tasks for this job that are in the
* streamed files queue.
*
* @return The tasks count.
*/
long getStreamedFilesQueueSize() {
return streamedFileQueueSize;
}

long getDsQueueSize() {
return dsQueueSize;
}

long getRunningListSize() {
return runningListSize;
/**
* Gets the number of data source ingest tasks for this job that are in
* the data sources queue.
*
* @return The tasks count.
*/
long getDataSourceQueueSize() {
return dataSourceQueueSize;
}

/**
* Gets the number of data artifact ingest tasks for this job that are
* in the data artifacts queue.
*
* @return The tasks count.
*/
long getArtifactsQueueSize() {
return artifactsQueueSize;
}

/**
* Gets the number of analysis result ingest tasks for this job that are
* in the analysis results queue.
*
* @return The tasks count.
*/
long getResultsQueueSize() {
return resultsQueueSize;
}

/**
* Gets the number of ingest tasks for this job that are in the tasks in
* progress list.
*
* @return The tasks count.
*/
long getProgressListSize() {
return inProgressListSize;
}

}

}

@ -39,7 +39,7 @@ public final class Snapshot implements Serializable {
private final Date fileIngestStartTime;
private final long processedFiles;
private final long estimatedFilesToProcess;
private final IngestTasksScheduler.IngestJobTasksSnapshot tasksSnapshot;
private final IngestTasksScheduler.IngestTasksSnapshot tasksSnapshot;
transient private final boolean jobCancelled;
transient private final IngestJob.CancellationReason jobCancellationReason;
transient private final List<String> cancelledDataSourceModules;
@ -52,7 +52,7 @@ public final class Snapshot implements Serializable {
boolean fileIngestRunning, Date fileIngestStartTime,
boolean jobCancelled, IngestJob.CancellationReason cancellationReason, List<String> cancelledModules,
long processedFiles, long estimatedFilesToProcess,
long snapshotTime, IngestTasksScheduler.IngestJobTasksSnapshot tasksSnapshot) {
long snapshotTime, IngestTasksScheduler.IngestTasksSnapshot tasksSnapshot) {
this.dataSource = dataSourceName;
this.jobId = jobId;
this.jobStartTime = jobStartTime;
@ -162,7 +162,7 @@ public final class Snapshot implements Serializable {
if (null == this.tasksSnapshot) {
return 0;
}
return this.tasksSnapshot.getDirectoryTasksQueueSize();
return this.tasksSnapshot.getDirQueueSize();
}

long getFileQueueSize() {
@ -176,21 +176,21 @@ public final class Snapshot implements Serializable {
if (null == this.tasksSnapshot) {
return 0;
}
return this.tasksSnapshot.getDsQueueSize();
return this.tasksSnapshot.getDataSourceQueueSize();
}

long getStreamingQueueSize() {
if (null == this.tasksSnapshot) {
return 0;
}
return this.tasksSnapshot.getStreamingQueueSize();
return this.tasksSnapshot.getStreamedFilesQueueSize();
}

long getRunningListSize() {
if (null == this.tasksSnapshot) {
return 0;
}
return this.tasksSnapshot.getRunningListSize();
return this.tasksSnapshot.getProgressListSize();
}

long getArtifactTasksQueueSize() {
@ -200,6 +200,13 @@ public final class Snapshot implements Serializable {
return tasksSnapshot.getArtifactsQueueSize();
}

long getResultTasksQueueSize() {
if (tasksSnapshot == null) {
return 0;
}
return tasksSnapshot.getResultsQueueSize();
}

boolean isCancelled() {
return this.jobCancelled;
}

@ -371,7 +371,6 @@ SolrSearchService.exceptionMessage.noCurrentSolrCore=IndexMetadata did not conta
SolrSearchService.exceptionMessage.noIndexMetadata=Unable to create IndexMetaData from case directory: {0}
# {0} - collection name
SolrSearchService.exceptionMessage.unableToDeleteCollection=Unable to delete collection {0}
SolrSearchService.indexingError=Unable to index blackboard artifact.
SolrSearchService.ServiceName=Solr Keyword Search Service
SolrSearchService.DeleteDataSource.msg=Error Deleting Solr data for data source id {0}
DropdownSingleTermSearchPanel.dataSourceCheckBox.text=Restrict search to the selected data sources:

@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import javax.annotation.concurrent.GuardedBy;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;
import org.netbeans.api.progress.ProgressHandle;
@ -52,34 +53,44 @@ import org.sleuthkit.autopsy.ingest.IngestMessage;
import org.sleuthkit.autopsy.ingest.IngestServices;

/**
* Singleton keyword search manager: Launches search threads for each job and
* performs commits, both on timed intervals.
* Performs periodic and final keyword searches for ingest jobs. Periodic
* searches are done in background tasks. This represents a careful working
* around of the contract for IngestModule.process(). Final searches are done
* synchronously in the calling thread, as required by the contract for
* IngestModule.shutDown().
*/
final class IngestSearchRunner {

private static final Logger logger = Logger.getLogger(IngestSearchRunner.class.getName());
private static IngestSearchRunner instance = null;
private IngestServices services = IngestServices.getInstance();
private final IngestServices services = IngestServices.getInstance();
private Ingester ingester = null;
private long currentUpdateIntervalMs;
private volatile boolean periodicSearchTaskRunning = false;
private Future<?> jobProcessingTaskFuture;
private final ScheduledThreadPoolExecutor jobProcessingExecutor;
private volatile boolean periodicSearchTaskRunning;
private volatile Future<?> periodicSearchTaskHandle;
private final ScheduledThreadPoolExecutor periodicSearchTaskExecutor;
private static final int NUM_SEARCH_SCHEDULING_THREADS = 1;
private static final String SEARCH_SCHEDULER_THREAD_NAME = "periodic-search-scheduler-%d";
private static final String SEARCH_SCHEDULER_THREAD_NAME = "periodic-search-scheduling-%d";
private final Map<Long, SearchJobInfo> jobs = new ConcurrentHashMap<>(); // Ingest job ID to search job info
private final boolean usingNetBeansGUI = RuntimeProperties.runningWithGUI();

// maps a jobID to the search
private Map<Long, SearchJobInfo> jobs = new ConcurrentHashMap<>();

IngestSearchRunner() {
/*
* Constructs a singleton object that performs periodic and final keyword
* searches for ingest jobs. Periodic searches are done in background tasks.
* This represents a careful working around of the contract for
* IngestModule.process(). Final searches are done synchronously in the
* calling thread, as required by the contract for IngestModule.shutDown().
*/
private IngestSearchRunner() {
currentUpdateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000;
ingester = Ingester.getDefault();
jobProcessingExecutor = new ScheduledThreadPoolExecutor(NUM_SEARCH_SCHEDULING_THREADS, new ThreadFactoryBuilder().setNameFormat(SEARCH_SCHEDULER_THREAD_NAME).build());
periodicSearchTaskExecutor = new ScheduledThreadPoolExecutor(NUM_SEARCH_SCHEDULING_THREADS, new ThreadFactoryBuilder().setNameFormat(SEARCH_SCHEDULER_THREAD_NAME).build());
}

/**
* Gets the ingest search runner singleton.
*
* @return the singleton object
* @return The ingest search runner.
*/
public static synchronized IngestSearchRunner getInstance() {
if (instance == null) {
@ -89,72 +100,75 @@ final class IngestSearchRunner {
}

/**
* Starts the search job for an ingest job.
*
* @param jobContext
* @param keywordListNames
* @param jobContext The ingest job context.
* @param keywordListNames The names of the keyword search lists for the
* ingest job.
*/
public synchronized void startJob(IngestJobContext jobContext, List<String> keywordListNames) {
long jobId = jobContext.getJobId();
if (jobs.containsKey(jobId) == false) {
logger.log(Level.INFO, "Adding job {0}", jobId); //NON-NLS
SearchJobInfo jobData = new SearchJobInfo(jobContext, keywordListNames);
jobs.put(jobId, jobData);
}

// keep track of how many threads / module instances from this job have asked for this
/*
* Keep track of the number of keyword search file ingest modules that
* are doing analysis for the ingest job, i.e., that have called this
* method. This is needed by endJob().
*/
jobs.get(jobId).incrementModuleReferenceCount();

// start the timer, if needed
/*
* Start a periodic search task in the periodic search task executor,
* if one is not already running.
*/
if ((jobs.size() > 0) && (periodicSearchTaskRunning == false)) {
// reset the default periodic search frequency to the user setting
logger.log(Level.INFO, "Resetting periodic search time out to default value"); //NON-NLS
currentUpdateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000;
jobProcessingTaskFuture = jobProcessingExecutor.schedule(new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS);
periodicSearchTaskHandle = periodicSearchTaskExecutor.schedule(new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS);
periodicSearchTaskRunning = true;
}
}

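For context, startJob() is invoked once per keyword search file ingest module instance during module startup; a simplified sketch of such a call site follows (the settings field and its accessor are assumptions about the companion module, not shown in this diff):

    // Sketch of a file ingest module registering with the runner in startUp();
    // the enabled keyword list names would come from the job settings.
    @Override
    public void startUp(IngestJobContext context) throws IngestModule.IngestModuleException {
        List<String> keywordListNames = settings.getNamesOfEnabledKeyWordLists(); // assumed accessor
        IngestSearchRunner.getInstance().startJob(context, keywordListNames);
    }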
/**
* Perform normal finishing of searching for this job, including one last
* commit and search. Blocks until the final search is complete.
* Finishes a search job for an ingest job.
*
* @param jobId
* @param jobId The ingest job ID.
*/
public synchronized void endJob(long jobId) {
/*
* Only complete the job if this is the last keyword search file ingest
* module doing analysis for this job.
*/
SearchJobInfo job;
boolean readyForFinalSearch = false;
job = jobs.get(jobId);
if (job == null) {
return;
return; // RJCTODO: SEVERE
}

// Only do final search if this is the last module/thread in this job to call endJob()
if (job.decrementModuleReferenceCount() == 0) {
if (job.decrementModuleReferenceCount() != 0) {
jobs.remove(jobId);
readyForFinalSearch = true;
}

if (readyForFinalSearch) {
/*
* Commit the index and do the final search. The final search is done in
* the ingest thread that calls shutDown() on the keyword search file ingest
* module, per the contract of IngestModule.shutDown().
*/
logger.log(Level.INFO, "Committing search index before final search for search job {0}", job.getJobId()); //NON-NLS
commit();
doFinalSearch(job); //this will block until it's done
logger.log(Level.INFO, "Starting final search for search job {0}", job.getJobId()); //NON-NLS
doFinalSearch(job);
logger.log(Level.INFO, "Final search for search job {0} completed", job.getJobId()); //NON-NLS

// new jobs could have been added while we were doing final search
if (jobs.isEmpty()) {
// no more jobs left. stop the PeriodicSearchTask.
// A new one will be created for future jobs.
logger.log(Level.INFO, "No more search jobs. Stopping periodic search task"); //NON-NLS
periodicSearchTaskRunning = false;
jobProcessingTaskFuture.cancel(true);
}
cancelPeriodicSearchSchedulingTask();
}
}

/**
* Immediate stop and removal of job from SearchRunner. Cancels the
* associated search worker if it's still running.
* Stops the search job for an ingest job.
*
* @param jobId
* @param jobId The ingest job ID.
*/
public synchronized void stopJob(long jobId) {
logger.log(Level.INFO, "Stopping search job {0}", jobId); //NON-NLS
@ -166,7 +180,10 @@ final class IngestSearchRunner {
return;
}

//stop currentSearcher
/*
* Request cancellation of the current keyword search, whether it is a
* periodic search or a final search.
*/
IngestSearchRunner.Searcher currentSearcher = job.getCurrentSearcher();
if ((currentSearcher != null) && (!currentSearcher.isDone())) {
logger.log(Level.INFO, "Cancelling search job {0}", jobId); //NON-NLS
@ -176,19 +193,16 @@ final class IngestSearchRunner {
jobs.remove(jobId);

if (jobs.isEmpty()) {
// no more jobs left. stop the PeriodicSearchTask.
// A new one will be created for future jobs.
logger.log(Level.INFO, "No more search jobs. Stopping periodic search task"); //NON-NLS
periodicSearchTaskRunning = false;
jobProcessingTaskFuture.cancel(true);
cancelPeriodicSearchSchedulingTask();
}
}

/**
* Add these lists to all of the jobs. Used when user wants to search for a
* list while ingest has already started.
* Adds the given keyword list names to the set of keyword lists to be
* searched by ALL keyword search jobs. This supports adding one or more
* keyword search lists to ingest jobs already in progress.
*
* @param keywordListNames
* @param keywordListNames The names of the additional keyword lists.
*/
public synchronized void addKeywordListsToAllJobs(List<String> keywordListNames) {
for (String listName : keywordListNames) {
@ -200,155 +214,171 @@ final class IngestSearchRunner {
}

/**
* Commits index and notifies listeners of index update
* Commits the Solr index for the current case and publishes an event
* indicating the current number of indexed items (this is no longer just
* files).
*/
private void commit() {
ingester.commit();

// Signal a potential change in number of text_ingested files
/*
* Publish an event advertising the number of indexed items. Note that
* this is no longer the number of indexed files, since the text of many
* items in addition to files is indexed.
*/
try {
final int numIndexedFiles = KeywordSearch.getServer().queryNumIndexedFiles();
KeywordSearch.fireNumIndexedFilesChange(null, numIndexedFiles);
} catch (NoOpenCoreException | KeywordSearchModuleException ex) {
logger.log(Level.SEVERE, "Error executing Solr query to check number of indexed files", ex); //NON-NLS
logger.log(Level.SEVERE, "Error executing Solr query for number of indexed files", ex); //NON-NLS
}
}

/**
* A final search waits for any still-running workers, and then executes a
* new one and waits until that is done.
* Performs the final keyword search for an ingest job. The search is done
* synchronously, as required by the contract for IngestModule.shutDown().
*
* @param job
* @param job The keyword search job info.
*/
private void doFinalSearch(SearchJobInfo job) {
// Run one last search as there are probably some new files committed
logger.log(Level.INFO, "Starting final search for search job {0}", job.getJobId()); //NON-NLS
if (!job.getKeywordListNames().isEmpty()) {
try {
// In case this job still has a worker running, wait for it to finish
logger.log(Level.INFO, "Checking for previous search for search job {0} before executing final search", job.getJobId()); //NON-NLS
/*
* Wait for any periodic searches being done in a SwingWorker
* pool thread to finish.
*/
job.waitForCurrentWorker();

IngestSearchRunner.Searcher finalSearcher = new IngestSearchRunner.Searcher(job, true);
job.setCurrentSearcher(finalSearcher); //save the ref
logger.log(Level.INFO, "Kicking off final search for search job {0}", job.getJobId()); //NON-NLS
finalSearcher.execute(); //start thread

// block until the search is complete
logger.log(Level.INFO, "Waiting for final search for search job {0}", job.getJobId()); //NON-NLS
finalSearcher.get();
logger.log(Level.INFO, "Final search for search job {0} completed", job.getJobId()); //NON-NLS

job.setCurrentSearcher(finalSearcher);
/*
* Do the final search synchronously on the current ingest
* thread, per the contract specified for IngestModule.shutDown().
*/
finalSearcher.doInBackground();
} catch (InterruptedException | CancellationException ex) {
logger.log(Level.INFO, "Final search for search job {0} interrupted or cancelled", job.getJobId()); //NON-NLS
} catch (ExecutionException ex) {
} catch (Exception ex) {
logger.log(Level.SEVERE, String.format("Final search for search job %d failed", job.getJobId()), ex); //NON-NLS
}
}
}

/**
* Task to perform periodic searches for each job (does a single index
* commit first)
* Cancels the current periodic search scheduling task.
*/
private final class PeriodicSearchTask implements Runnable {

private final Logger logger = Logger.getLogger(IngestSearchRunner.PeriodicSearchTask.class.getName());

@Override
public void run() {
// If no jobs then cancel the task. If more job(s) come along, a new task will start up.
if (jobs.isEmpty() || jobProcessingTaskFuture.isCancelled()) {
logger.log(Level.INFO, "Exiting periodic search task"); //NON-NLS
private synchronized void cancelPeriodicSearchSchedulingTask() {
if (periodicSearchTaskHandle != null) {
logger.log(Level.INFO, "No more search jobs, stopping periodic search scheduling"); //NON-NLS
periodicSearchTaskHandle.cancel(true);
periodicSearchTaskRunning = false;
return;
}

commit();

logger.log(Level.INFO, "Starting periodic searches");
final StopWatch stopWatch = new StopWatch();
stopWatch.start();
// NOTE: contents of "jobs" ConcurrentHashMap can be modified in stopJob() and endJob() while we are inside this loop
for (Iterator<Entry<Long, SearchJobInfo>> iterator = jobs.entrySet().iterator(); iterator.hasNext();) {
SearchJobInfo job = iterator.next().getValue();

if (jobProcessingTaskFuture.isCancelled()) {
logger.log(Level.INFO, "Search has been cancelled. Exiting periodic search task."); //NON-NLS
periodicSearchTaskRunning = false;
return;
}

// If no lists or the worker is already running then skip it
if (!job.getKeywordListNames().isEmpty() && !job.isWorkerRunning()) {
// Spawn a search thread for each job
logger.log(Level.INFO, "Executing periodic search for search job {0}", job.getJobId());
Searcher searcher = new Searcher(job); // SwingWorker
job.setCurrentSearcher(searcher); //save the ref
searcher.execute(); //start thread
job.setWorkerRunning(true);

try {
// wait for the searcher to finish
searcher.get();
} catch (InterruptedException | ExecutionException ex) {
logger.log(Level.SEVERE, "Error performing keyword search: {0}", ex.getMessage()); //NON-NLS
services.postMessage(IngestMessage.createErrorMessage(KeywordSearchModuleFactory.getModuleName(),
NbBundle.getMessage(this.getClass(),
"SearchRunner.Searcher.done.err.msg"), ex.getMessage()));
}// catch and ignore if we were cancelled
catch (java.util.concurrent.CancellationException ex) {
}
}
}
stopWatch.stop();
logger.log(Level.INFO, "All periodic searches cumulatively took {0} secs", stopWatch.getElapsedTimeSecs()); //NON-NLS

// calculate "hold off" time
recalculateUpdateIntervalTime(stopWatch.getElapsedTimeSecs()); // ELDEBUG

// schedule next PeriodicSearchTask
jobProcessingTaskFuture = jobProcessingExecutor.schedule(new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS);

// exit this thread
return;
}

private void recalculateUpdateIntervalTime(long lastSearchTimeSec) {
// If periodic search takes more than 1/4 of the current periodic search interval, then double the search interval
if (lastSearchTimeSec * 1000 < currentUpdateIntervalMs / 4) {
return;
}
// double the search interval
currentUpdateIntervalMs = currentUpdateIntervalMs * 2;
logger.log(Level.WARNING, "Last periodic search took {0} sec. Increasing search interval to {1} sec", new Object[]{lastSearchTimeSec, currentUpdateIntervalMs / 1000});
return;
}
}

/**
* Data structure to keep track of keyword lists, current results, and
* search running status for each jobid
* Task that runs in ScheduledThreadPoolExecutor to periodically start and
* wait for keyword search tasks for each keyword search job in progress.
* The keyword search tasks for individual ingest jobs are implemented as
* SwingWorkers to support legacy APIs.
*/
private final class PeriodicSearchTask implements Runnable {

@Override
public void run() {
/*
* If there are no more jobs or this task has been cancelled, exit.
*/
if (jobs.isEmpty() || periodicSearchTaskHandle.isCancelled()) {
logger.log(Level.INFO, "Periodic search scheduling task has been cancelled, exiting"); //NON-NLS
periodicSearchTaskRunning = false;
return;
}

/*
* Commit the Solr index for the current case before doing the
* searches.
*/
commit();

/*
* Do a keyword search for each ingest job in progress. When the
* searches are done, recalculate the "hold off" time between
* searches to prevent back-to-back periodic searches and schedule
* the next periodic search task.
*/
final StopWatch stopWatch = new StopWatch();
stopWatch.start();
for (Iterator<Entry<Long, SearchJobInfo>> iterator = jobs.entrySet().iterator(); iterator.hasNext();) {
SearchJobInfo job = iterator.next().getValue();

if (periodicSearchTaskHandle.isCancelled()) {
logger.log(Level.INFO, "Periodic search scheduling task has been cancelled, exiting"); //NON-NLS
periodicSearchTaskRunning = false;
return;
}

if (!job.getKeywordListNames().isEmpty() && !job.isWorkerRunning()) {
logger.log(Level.INFO, "Starting periodic search for search job {0}", job.getJobId());
Searcher searcher = new Searcher(job, false);
job.setCurrentSearcher(searcher);
searcher.execute();
job.setWorkerRunning(true);
try {
searcher.get();
} catch (InterruptedException | ExecutionException ex) {
logger.log(Level.SEVERE, String.format("Error performing keyword search for ingest job %d", job.getJobId()), ex); //NON-NLS
services.postMessage(IngestMessage.createErrorMessage(
KeywordSearchModuleFactory.getModuleName(),
NbBundle.getMessage(this.getClass(), "SearchRunner.Searcher.done.err.msg"), ex.getMessage()));
} catch (java.util.concurrent.CancellationException ex) {
logger.log(Level.SEVERE, String.format("Keyword search for ingest job %d cancelled", job.getJobId()), ex); //NON-NLS
}
}
}
stopWatch.stop();
logger.log(Level.INFO, "Periodic searches for all ingest jobs cumulatively took {0} secs", stopWatch.getElapsedTimeSecs()); //NON-NLS
recalculateUpdateIntervalTime(stopWatch.getElapsedTimeSecs()); // ELDEBUG
periodicSearchTaskHandle = periodicSearchTaskExecutor.schedule(new PeriodicSearchTask(), currentUpdateIntervalMs, MILLISECONDS);
}

/**
* Sets the time interval between periodic keyword searches to avoid
* running back-to-back searches. If the most recent round of searches
* took longer than 1/4 of the current interval, doubles the interval.
*
* @param lastSearchTimeSec The time in seconds used to execute the most
* recent round of keyword searches.
*/
private void recalculateUpdateIntervalTime(long lastSearchTimeSec) {
if (lastSearchTimeSec * 1000 < currentUpdateIntervalMs / 4) {
return;
}
currentUpdateIntervalMs *= 2;
logger.log(Level.WARNING, "Last periodic search took {0} sec. Increasing search interval to {1} sec", new Object[]{lastSearchTimeSec, currentUpdateIntervalMs / 1000});
}
}

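To make the backoff policy in recalculateUpdateIntervalTime() concrete: if the interval is 5 minutes (300,000 ms), a round of searches taking 80 seconds exceeds one quarter of the interval (75 seconds), so the interval doubles to 10 minutes. A standalone sketch of the same arithmetic, with invented values:

    // Standalone illustration of the interval-doubling policy above.
    long currentUpdateIntervalMs = 300_000L; // 5 minutes
    long lastSearchTimeSec = 80L;            // duration of the last search round
    if (lastSearchTimeSec * 1000 >= currentUpdateIntervalMs / 4) {
        currentUpdateIntervalMs *= 2;        // now 600,000 ms = 10 minutes
    }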
/**
* A data structure to keep track of the keyword lists, current results, and
* search running status for an ingest job.
*/
private class SearchJobInfo {

private final IngestJobContext jobContext;
private final long jobId;
private final long dataSourceId;
// mutable state:
private volatile boolean workerRunning;
private List<String> keywordListNames; //guarded by SearchJobInfo.this

// Map of keyword to the object ids that contain a hit
private Map<Keyword, Set<Long>> currentResults; //guarded by SearchJobInfo.this
@GuardedBy("this")
private final List<String> keywordListNames;
@GuardedBy("this")
private final Map<Keyword, Set<Long>> currentResults; // Keyword to object IDs of items with hits
private IngestSearchRunner.Searcher currentSearcher;
private AtomicLong moduleReferenceCount = new AtomicLong(0);
private final Object finalSearchLock = new Object(); //used for a condition wait
private final AtomicLong moduleReferenceCount = new AtomicLong(0);
private final Object finalSearchLock = new Object();

private SearchJobInfo(IngestJobContext jobContext, List<String> keywordListNames) {
this.jobContext = jobContext;
this.jobId = jobContext.getJobId();
this.dataSourceId = jobContext.getDataSource().getId();
jobId = jobContext.getJobId();
dataSourceId = jobContext.getDataSource().getId();
this.keywordListNames = new ArrayList<>(keywordListNames);
currentResults = new HashMap<>();
workerRunning = false;
@ -410,41 +440,40 @@ final class IngestSearchRunner {
}

/**
* In case this job still has a worker running, wait for it to finish
* Waits for the current search task to complete.
*
* @throws InterruptedException
*/
private void waitForCurrentWorker() throws InterruptedException {
synchronized (finalSearchLock) {
while (workerRunning) {
logger.log(Level.INFO, "Waiting for previous worker to finish"); //NON-NLS
finalSearchLock.wait(); //wait() releases the lock
logger.log(Level.INFO, "Notified previous worker finished"); //NON-NLS
logger.log(Level.INFO, String.format("Waiting for previous search task for job %d to finish", jobId)); //NON-NLS
finalSearchLock.wait();
logger.log(Level.INFO, String.format("Notified that previous search task for job %d finished", jobId)); //NON-NLS
}
}
}

/**
* Unset workerRunning and wake up thread(s) waiting on finalSearchLock
* Signals any threads waiting on the current search task to complete.
*/
private void searchNotify() {
synchronized (finalSearchLock) {
logger.log(Level.INFO, "Notifying after finishing search"); //NON-NLS
workerRunning = false;
finalSearchLock.notify();
}
}
}

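The waitForCurrentWorker()/searchNotify() pair above is the standard guarded-wait idiom; a distilled, generic sketch of that pattern follows (names invented for illustration):

    // Generic guard/notify idiom distilled from SearchJobInfo above.
    final class GuardedWaitExample {

        private final Object lock = new Object();
        private boolean busy = true;

        void await() throws InterruptedException {
            synchronized (lock) {
                while (busy) {   // loop guards against spurious wakeups
                    lock.wait(); // releases the lock while waiting
                }
            }
        }

        void finish() {
            synchronized (lock) {
                busy = false;    // change the guarded condition first
                lock.notify();   // then wake the waiting thread
            }
        }
    }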
/**
* Searcher responsible for searching the current index and writing results
* to blackboard and the inbox. Also, posts results to listeners as Ingest
* data events. Searches entire index, and keeps track of only new results
* to report and save. Runs as a background thread.
/*
* A SwingWorker responsible for searching the Solr index of the current
* case for the keywords for an ingest job. Keyword hit analysis results are
* created and posted to the blackboard and notifications are sent to the
* ingest inbox.
*/
private final class Searcher extends SwingWorker<Object, Void> {

/**
/*
* Searcher has private copies/snapshots of the lists and keywords
*/
private final SearchJobInfo job;
@ -452,31 +481,22 @@ final class IngestSearchRunner {
private final List<String> keywordListNames; // lists currently being searched
private final List<KeywordList> keywordLists;
private final Map<Keyword, KeywordList> keywordToList; //keyword to list name mapping
private final boolean usingNetBeansGUI;
@ThreadConfined(type = ThreadConfined.ThreadType.AWT)
private ProgressHandle progressIndicator;
private boolean finalRun = false;

Searcher(SearchJobInfo job) {
Searcher(SearchJobInfo job, boolean finalRun) {
this.job = job;
this.finalRun = finalRun;
keywordListNames = job.getKeywordListNames();
keywords = new ArrayList<>();
keywordToList = new HashMap<>();
keywordLists = new ArrayList<>();
//keywords are populated as searcher runs
usingNetBeansGUI = RuntimeProperties.runningWithGUI();
}

Searcher(SearchJobInfo job, boolean finalRun) {
this(job);
this.finalRun = finalRun;
}

@Override
@Messages("SearchRunner.query.exception.msg=Error performing query:")
protected Object doInBackground() throws Exception {
final StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
if (usingNetBeansGUI) {
/*
@ -498,7 +518,9 @@ final class IngestSearchRunner {
progressIndicator = ProgressHandle.createHandle(displayName, new Cancellable() {
@Override
public boolean cancel() {
if (progressIndicator != null) {
progressIndicator.setDisplayName(displayName + " " + NbBundle.getMessage(this.getClass(), "SearchRunner.doInBackGround.cancelMsg"));
}
logger.log(Level.INFO, "Search cancelled by user"); //NON-NLS
new Thread(() -> {
IngestSearchRunner.Searcher.this.cancel(true);
@ -568,7 +590,7 @@ final class IngestSearchRunner {
}
}
} catch (Exception ex) {
logger.log(Level.WARNING, "Error occurred during keyword search", ex); //NON-NLS
logger.log(Level.SEVERE, String.format("Error performing keyword search for ingest job %d", job.getJobId()), ex); //NON-NLS
} finally {
if (progressIndicator != null) {
SwingUtilities.invokeLater(new Runnable() {
@ -579,8 +601,6 @@ final class IngestSearchRunner {
}
});
}
stopWatch.stop();
logger.log(Level.INFO, "Searcher took {0} secs to run (final = {1})", new Object[]{stopWatch.getElapsedTimeSecs(), this.finalRun}); //NON-NLS
// In case a thread is waiting on this worker to be done
job.searchNotify();
}
@ -681,4 +701,5 @@ final class IngestSearchRunner {
return newResults;
}
}

}

@ -1,7 +1,7 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2014 Basis Technology Corp.
* Copyright 2014-2021 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -25,7 +25,8 @@ import java.util.List;
import org.openide.util.NbBundle;
import org.openide.util.lookup.ServiceProvider;
import org.sleuthkit.autopsy.coreutils.Version;
import org.sleuthkit.autopsy.ingest.IngestModuleFactoryAdapter;
import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule;
import org.sleuthkit.autopsy.ingest.DataArtifactIngestModule;
import org.sleuthkit.autopsy.ingest.FileIngestModule;
import org.sleuthkit.autopsy.ingest.IngestModuleFactory;
import org.sleuthkit.autopsy.ingest.IngestModuleIngestJobSettings;
@ -37,7 +38,7 @@ import org.sleuthkit.autopsy.ingest.IngestModuleGlobalSettingsPanel;
* searching.
*/
@ServiceProvider(service = IngestModuleFactory.class)
public class KeywordSearchModuleFactory extends IngestModuleFactoryAdapter {
public class KeywordSearchModuleFactory implements IngestModuleFactory {

private static final HashSet<String> defaultDisabledKeywordListNames = new HashSet<>(Arrays.asList("Phone Numbers", "IP Addresses", "URLs", "Credit Card Numbers")); //NON-NLS
private KeywordSearchJobSettingsPanel jobSettingsPanel = null;
@ -121,4 +122,25 @@ public class KeywordSearchModuleFactory extends IngestModuleFactoryAdapter {
}
return new KeywordSearchIngestModule((KeywordSearchJobSettings) settings);
}

@Override
public boolean isDataArtifactIngestModuleFactory() {
return true;
}

@Override
public DataArtifactIngestModule createDataArtifactIngestModule(IngestModuleIngestJobSettings settings) {
return new KwsDataArtifactIngestModule();
}

@Override
public boolean isAnalysisResultIngestModuleFactory() {
return true;
}

@Override
public AnalysisResultIngestModule createAnalysisResultIngestModule(IngestModuleIngestJobSettings settings) {
return new KwsAnalysisResultIngestModule();
}

}

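A consumer of this factory would be expected to gate module creation on the new capability flags; a hypothetical sketch (the surrounding framework code is assumed, not shown in this commit):

    // Hypothetical capability-gated module creation.
    IngestModuleFactory factory = new KeywordSearchModuleFactory();
    IngestModuleIngestJobSettings settings = factory.getDefaultIngestJobSettings();
    if (factory.isAnalysisResultIngestModuleFactory()) {
        AnalysisResultIngestModule module = factory.createAnalysisResultIngestModule(settings);
        // The ingest framework would call module.startUp(context) and then
        // module.process(result) for each analysis result ingest task.
    }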
@ -0,0 +1,64 @@
/*
 * Autopsy Forensic Browser
 *
 * Copyright 2021-2021 Basis Technology Corp.
 * Contact: carrier <at> sleuthkit <dot> org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.sleuthkit.autopsy.keywordsearch;

import java.util.logging.Level;
import org.openide.util.Lookup;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.ingest.AnalysisResultIngestModule;
import org.sleuthkit.autopsy.ingest.IngestJobContext;
import org.sleuthkit.autopsy.ingest.IngestModule;
import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService;
import org.sleuthkit.datamodel.AnalysisResult;
import org.sleuthkit.datamodel.BlackboardArtifact;
import org.sleuthkit.datamodel.TskCoreException;

/**
 * An analysis result ingest module that indexes text for keyword search. All
 * keyword searching of indexed text, whether from files, data artifacts, or
 * analysis results, including the final keyword search of an ingest job, is
 * done in the last instance of the companion keyword search file ingest module.
 */
public class KwsAnalysisResultIngestModule implements AnalysisResultIngestModule {

    private static final Logger LOGGER = Logger.getLogger(KeywordSearchIngestModule.class.getName());
    private static final int TSK_KEYWORD_HIT_TYPE_ID = BlackboardArtifact.Type.TSK_KEYWORD_HIT.getTypeID();
    private IngestJobContext context;
    private KeywordSearchService searchService;

    @Override
    public void startUp(IngestJobContext context) throws IngestModule.IngestModuleException {
        this.context = context;
        searchService = Lookup.getDefault().lookup(KeywordSearchService.class);
    }

    @Override
    public IngestModule.ProcessResult process(AnalysisResult result) {
        try {
            if (result.getType().getTypeID() != TSK_KEYWORD_HIT_TYPE_ID) {
                searchService.index(result);
            }
        } catch (TskCoreException ex) {
            LOGGER.log(Level.SEVERE, String.format("Error indexing analysis result '%s' (job ID=%d)", result, context.getJobId()), ex); //NON-NLS
            return IngestModule.ProcessResult.ERROR;
        }
        return IngestModule.ProcessResult.OK;
    }

}
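Review note: the guard in process() skips TSK_KEYWORD_HIT results because keyword hits are derived from text that is already in the index, so re-indexing them would add nothing searchable (the same reasoning appears in the handleNewArtifacts() comments removed from SolrSearchService below). One possible hardening of startUp(), a sketch only, not in this commit, reusing the fields and imports of the class above, would fail fast if the Lookup cannot resolve an indexing service:

    @Override
    public void startUp(IngestJobContext context) throws IngestModule.IngestModuleException {
        this.context = context;
        searchService = Lookup.getDefault().lookup(KeywordSearchService.class);
        if (searchService == null) {
            // Hypothetical check: the shipped module can assume the Solr-backed
            // service is present, since SolrSearchService is registered as a
            // service provider in the same application.
            throw new IngestModule.IngestModuleException("No KeywordSearchService implementation found"); //NON-NLS
        }
    }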
@ -0,0 +1,63 @@
/*
 * Autopsy Forensic Browser
 *
 * Copyright 2021-2021 Basis Technology Corp.
 * Contact: carrier <at> sleuthkit <dot> org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.sleuthkit.autopsy.keywordsearch;

import java.util.logging.Level;
import org.openide.util.Lookup;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.ingest.DataArtifactIngestModule;
import org.sleuthkit.autopsy.ingest.IngestJobContext;
import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService;
import org.sleuthkit.datamodel.BlackboardArtifact;
import org.sleuthkit.datamodel.DataArtifact;
import org.sleuthkit.datamodel.TskCoreException;

/**
 * A data artifact ingest module that indexes text for keyword search. All
 * keyword searching of indexed text, whether from files, data artifacts, or
 * analysis results, including the final keyword search of an ingest job, is
 * done in the last instance of the companion keyword search file ingest module.
 */
public class KwsDataArtifactIngestModule implements DataArtifactIngestModule {

    private static final Logger LOGGER = Logger.getLogger(KeywordSearchIngestModule.class.getName());
    private static final int TSK_ASSOCIATED_OBJECT_TYPE_ID = BlackboardArtifact.Type.TSK_ASSOCIATED_OBJECT.getTypeID();
    private IngestJobContext context;
    private KeywordSearchService searchService;

    @Override
    public void startUp(IngestJobContext context) throws IngestModuleException {
        this.context = context;
        searchService = Lookup.getDefault().lookup(KeywordSearchService.class);
    }

    @Override
    public ProcessResult process(DataArtifact artifact) {
        try {
            if (artifact.getType().getTypeID() != TSK_ASSOCIATED_OBJECT_TYPE_ID) {
                searchService.index(artifact);
            }
        } catch (TskCoreException ex) {
            LOGGER.log(Level.SEVERE, String.format("Error indexing data artifact '%s' (job ID=%d)", artifact, context.getJobId()), ex); //NON-NLS
            return ProcessResult.ERROR;
        }
        return ProcessResult.OK;
    }

}
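Review note: this module mirrors KwsAnalysisResultIngestModule almost line for line; only the excluded artifact type differs (TSK_ASSOCIATED_OBJECT artifacts carry little more than an artifact ID, so there is no useful text to index, as the removed SolrSearchService comment below notes). If the duplication grows, the shared logic could be factored into a helper along these lines; a sketch only, where KwsIndexingHelper and indexUnlessType() are hypothetical names, not part of this commit:

package org.sleuthkit.autopsy.keywordsearch; // placement is illustrative

import java.util.logging.Level;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.ingest.IngestModule;
import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService;
import org.sleuthkit.datamodel.BlackboardArtifact;
import org.sleuthkit.datamodel.TskCoreException;

/** Hypothetical helper that indexes any artifact except one excluded type. */
final class KwsIndexingHelper {

    private static final Logger LOGGER = Logger.getLogger(KwsIndexingHelper.class.getName());

    private KwsIndexingHelper() {
    }

    static IngestModule.ProcessResult indexUnlessType(KeywordSearchService searchService, BlackboardArtifact artifact, int excludedTypeId, long jobId) {
        try {
            // Same guard as both modules above: skip the excluded type, index the rest.
            if (artifact.getType().getTypeID() != excludedTypeId) {
                searchService.index(artifact);
            }
        } catch (TskCoreException ex) {
            LOGGER.log(Level.SEVERE, String.format("Error indexing artifact '%s' (job ID=%d)", artifact, jobId), ex); //NON-NLS
            return IngestModule.ProcessResult.ERROR;
        }
        return IngestModule.ProcessResult.OK;
    }
}

Each module's process() would then reduce to a one-line delegation passing its own excluded type ID.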
@ -28,8 +28,6 @@ import java.util.logging.Level;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;
import org.apache.commons.lang.StringUtils;
import org.netbeans.api.progress.ProgressHandle;
import org.netbeans.api.progress.aggregate.ProgressContributor;
import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.autopsy.casemodule.NoCurrentCaseException;
@ -53,8 +51,6 @@ import org.sleuthkit.datamodel.TskCoreException;
 * about the search hits to the ingest inbox, and publishing an event to notify
 * subscribers of the blackboard posts.
 */
class QueryResults {

    private static final Logger logger = Logger.getLogger(QueryResults.class.getName());
@ -141,7 +137,7 @@ class QueryResults {
     * @param notifyInbox Whether or not to write a message to the ingest
     *                    messages inbox if there is a keyword hit in the text
     *                    extracted from the text source object.
     * @param saveResults Flag whether to save search results as KWS artifacts.
     * @param saveResults Whether or not to create keyword hit analysis results.
     * @param ingestJobId The numeric identifier of the ingest job within which
     *                    the artifacts are being created, may be null.
     */
@ -1,7 +1,7 @@
/*
 * Autopsy Forensic Browser
 *
 * Copyright 2015-2020 Basis Technology Corp.
 * Copyright 2015-2021 Basis Technology Corp.
 * Contact: carrier <at> sleuthkit <dot> org
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
@ -18,7 +18,6 @@
 */
package org.sleuthkit.autopsy.keywordsearch;

import com.google.common.eventbus.Subscribe;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
@ -36,14 +35,12 @@ import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.autopsy.casemodule.CaseMetadata;
import org.sleuthkit.autopsy.coreutils.FileUtil;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
import org.sleuthkit.autopsy.ingest.IngestManager;
import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService;
import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchServiceException;
import org.sleuthkit.autopsy.progress.ProgressIndicator;
import org.sleuthkit.autopsy.textextractors.TextExtractor;
import org.sleuthkit.autopsy.textextractors.TextExtractorFactory;
import org.sleuthkit.datamodel.Blackboard;
import org.sleuthkit.datamodel.BlackboardArtifact;
import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.TskCoreException;
@ -66,13 +63,10 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService {
    /**
     * Indexes the given content for keyword search.
     *
     * IMPORTANT: Currently, there are two correct uses for this code:
     *
     * 1) Indexing an artifact created while either the file level ingest
     * module pipeline or the first stage data source level ingest module
     * pipeline of an ingest job is running.
     *
     * 2) Indexing a report.
     * IMPORTANT: This indexes the given content, but does not execute a keyword
     * search. For the text of the content to be searched, the indexing has to
     * occur either in the context of an ingest job configured for keyword
     * search, or in the context of an ad hoc keyword search.
     *
     * @param content The content to index.
     *
@ -80,19 +74,6 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService {
     */
    @Override
    public void index(Content content) throws TskCoreException {
        /*
         * TODO (JIRA-1099): The following code has some issues that need to be
         * resolved. For artifacts, it is assumed that the posting of artifacts
         * is only occurring during an ingest job with an enabled keyword search
         * ingest module handling index commits; it also assumes that the
         * artifacts are only posted by modules in either the file level ingest
         * pipeline or the first stage data source level ingest pipeline, so
         * that the artifacts will be searched during a periodic or final
         * keyword search. It also assumes that the only other type of Content
         * for which this API will be called are Reports generated at a time
         * when doing a commit is required and desirable, i.e., in a context
         * other than an ingest job.
         */
        if (content == null) {
            return;
        }
@ -411,28 +392,6 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService {
        }
    }

    /**
     * Event handler for ArtifactsPostedEvents from SleuthkitCase.
     *
     * @param event The ArtifactsPostedEvent to handle.
     */
    @NbBundle.Messages("SolrSearchService.indexingError=Unable to index blackboard artifact.")
    @Subscribe
    void handleNewArtifacts(Blackboard.ArtifactsPostedEvent event) {
        for (BlackboardArtifact artifact : event.getArtifacts()) {
            if ((artifact.getArtifactTypeID() != BlackboardArtifact.ARTIFACT_TYPE.TSK_KEYWORD_HIT.getTypeID()) && // don't index KWH bc it's based on existing indexed text
                    (artifact.getArtifactTypeID() != BlackboardArtifact.ARTIFACT_TYPE.TSK_ASSOCIATED_OBJECT.getTypeID())) { //don't index AO bc it has only an artifact ID - no useful text
                try {
                    index(artifact);
                } catch (TskCoreException ex) {
                    //TODO: is this the right error handling?
                    logger.log(Level.SEVERE, "Unable to index blackboard artifact " + artifact.getArtifactID(), ex); //NON-NLS
                    MessageNotifyUtil.Notify.error(Bundle.SolrSearchService_indexingError(), artifact.getDisplayName());
                }
            }
        }
    }

    /**
     * Adds an artifact to the keyword search text index as a concatenation of
     * all of its attributes.
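Review note: the removed handleNewArtifacts() method was a Guava EventBus subscriber; its indexing duty, including the TSK_KEYWORD_HIT and TSK_ASSOCIATED_OBJECT exclusions, now lives in the two new ingest modules above. For readers unfamiliar with the mechanism: dispatch is driven by the @Subscribe annotation and the parameter type, not the method name. A self-contained illustration with stand-in types (plain Guava, since registration with the real blackboard event bus happens inside the case database layer):

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class EventBusDemo {

    /** Stand-in for Blackboard.ArtifactsPostedEvent in this sketch. */
    static class DemoEvent {
        final String payload;

        DemoEvent(String payload) {
            this.payload = payload;
        }
    }

    static class Listener {
        @Subscribe
        public void handle(DemoEvent event) { // dispatched by parameter type, like handleNewArtifacts
            System.out.println("Got event: " + event.payload);
        }
    }

    public static void main(String[] args) {
        EventBus bus = new EventBus();
        bus.register(new Listener()); // analogous to registering the service with the blackboard's bus
        bus.post(new DemoEvent("artifacts posted")); // prints "Got event: artifacts posted"
    }
}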
@ -1,5 +1,5 @@
#Updated by build script
#Tue, 30 Nov 2021 17:19:50 -0500
#Wed, 01 Dec 2021 12:53:03 -0500
LBL_splash_window_title=Starting Autopsy
SPLASH_HEIGHT=314
SPLASH_WIDTH=538

@ -1,4 +1,4 @@
#Updated by build script
#Tue, 30 Nov 2021 17:19:50 -0500
#Wed, 01 Dec 2021 12:53:03 -0500
CTL_MainWindow_Title=Autopsy 4.19.2
CTL_MainWindow_Title_No_Project=Autopsy 4.19.2