8157 tiered ingest

This commit is contained in:
Richard Cordovano 2021-12-09 16:02:30 -05:00
parent 5d9e77faa1
commit 7d939a2b89
12 changed files with 1300 additions and 1135 deletions

View File

@ -56,7 +56,7 @@ public class DataSourceIngestModuleProgress {
* @param workUnits Number of work units performed so far by the module.
*/
public void progress(int workUnits) {
ingestJobExecutor.advanceDataSourceIngestProgressBar("", workUnits);
ingestJobExecutor.updateDataSourceIngestProgressBar("", workUnits);
}
/**
@ -65,7 +65,7 @@ public class DataSourceIngestModuleProgress {
* @param message Message to display
*/
public void progress(String message) {
ingestJobExecutor.advanceDataSourceIngestProgressBar(message);
ingestJobExecutor.updateDataSourceIngestProgressBarText(message);
}
/**
@ -76,7 +76,7 @@ public class DataSourceIngestModuleProgress {
* @param workUnits Number of work units performed so far by the module.
*/
public void progress(String currentTask, int workUnits) {
ingestJobExecutor.advanceDataSourceIngestProgressBar(currentTask, workUnits);
ingestJobExecutor.updateDataSourceIngestProgressBar(currentTask, workUnits);
}
}

View File

@ -88,7 +88,7 @@ final class DataSourceIngestPipeline extends IngestPipeline<DataSourceIngestTask
void process(IngestJobExecutor ingestJobExecutor, DataSourceIngestTask task) throws IngestModuleException {
Content dataSource = task.getDataSource();
String progressBarDisplayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.displayName", getDisplayName(), dataSource.getName());
ingestJobExecutor.updateDataSourceIngestProgressBarDisplayName(progressBarDisplayName);
ingestJobExecutor.changeDataSourceIngestProgressBarTitle(progressBarDisplayName);
ingestJobExecutor.switchDataSourceIngestProgressBarToIndeterminate();
ingestManager.setIngestTaskProgress(task, getDisplayName());
logger.log(Level.INFO, "{0} analysis of {1} starting", new Object[]{getDisplayName(), dataSource.getName()}); //NON-NLS

View File

@ -25,11 +25,14 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.openide.util.NbBundle;
import org.python.google.common.collect.ImmutableList;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.AnalysisResult;
import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.DataArtifact;
import org.sleuthkit.datamodel.DataSource;
import sun.security.ec.point.ProjectivePoint;
/**
* Analyzes a data sources using a set of ingest modules specified via ingest
@ -71,7 +74,7 @@ public final class IngestJob {
private static final Logger logger = Logger.getLogger(IngestJob.class.getName());
private final static AtomicLong nextId = new AtomicLong(0L);
private final long id;
private final Content dataSource;
private final DataSource dataSource;
private final List<AbstractFile> files = new ArrayList<>();
private final Mode ingestMode;
private final IngestJobSettings settings;
@ -103,8 +106,11 @@ public final class IngestJob {
* @param settings The ingest job settings.
*/
IngestJob(Content dataSource, Mode ingestMode, IngestJobSettings settings) {
if (!(dataSource instanceof DataSource)) { // RJCTODO: Push this to ingest manager?
throw new IllegalArgumentException("dataSource argument does not implement the DataSource interface"); //NON-NLS
}
id = IngestJob.nextId.getAndIncrement();
this.dataSource = dataSource;
this.dataSource = (DataSource) dataSource;
this.settings = settings;
this.ingestMode = ingestMode;
cancellationReason = CancellationReason.NOT_CANCELLED;
@ -125,10 +131,30 @@ public final class IngestJob {
*
* @return The data source.
*/
Content getDataSource() {
DataSource getDataSource() {
return dataSource;
}
/**
* Gets the subset of files from the data source to be analyzed for this
* job.
*
* @return The subset of files or an empty list if all the files in the data
* source should be analyzed.
*/
List<AbstractFile> getFiles() {
return ImmutableList.copyOf(files);
}
/**
* Gets the ingest job settings.
*
* @return The settings.
*/
IngestJobSettings getSettings() {
return settings;
}
/**
* Checks to see if this ingest job has at least one non-empty ingest module
* pipeline.
@ -198,6 +224,10 @@ public final class IngestJob {
* scheduling the ingest tasks that make up the job.
*
* @return A collection of ingest module start up errors, empty on success.
*
* @throws InterruptedException The exception is thrown if the current
* thread is interrupted during the start up
* process.
*/
synchronized List<IngestModuleError> start() throws InterruptedException {
if (ingestModuleExecutor != null) {
@ -205,7 +235,7 @@ public final class IngestJob {
return Collections.emptyList();
}
ingestModuleExecutor = new IngestJobExecutor(this, dataSource, files, settings);
ingestModuleExecutor = new IngestJobExecutor(this);
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(ingestModuleExecutor.startUp());
if (errors.isEmpty()) {
@ -255,10 +285,10 @@ public final class IngestJob {
*
* @return The snapshot, will be null if the job is not started yet.
*/
Snapshot getDiagnosticStatsSnapshot() {
Snapshot snapshot = null;
IngestJobProgressSnapshot getDiagnosticStatsSnapshot() {
IngestJobProgressSnapshot snapshot = null;
if (ingestModuleExecutor != null) {
snapshot = ingestModuleExecutor.getDiagnosticStatsSnapshot(true);
snapshot = ingestModuleExecutor.getIngestJobProgressSnapshot(true);
}
return snapshot;
}
@ -353,7 +383,7 @@ public final class IngestJob {
*/
public final class DataSourceProcessingSnapshot {
private final Snapshot snapshot;
private final IngestJobProgressSnapshot snapshot;
/**
* Constructs a snapshot of some basic diagnostic statistics for an
@ -362,7 +392,7 @@ public final class IngestJob {
* of multiple data sources, each of which had its own basic
* diagnostic statistics snapshot.
*/
private DataSourceProcessingSnapshot(Snapshot snapshot) {
private DataSourceProcessingSnapshot(IngestJobProgressSnapshot snapshot) {
this.snapshot = snapshot;
}
@ -445,7 +475,7 @@ public final class IngestJob {
* stats part of the snapshot.
*/
private ProgressSnapshot(boolean includeIngestTasksSnapshot) {
Snapshot snapshot = ingestModuleExecutor.getDiagnosticStatsSnapshot(includeIngestTasksSnapshot);
IngestJobProgressSnapshot snapshot = ingestModuleExecutor.getIngestJobProgressSnapshot(includeIngestTasksSnapshot);
dataSourceProcessingSnapshot = new DataSourceProcessingSnapshot(snapshot);
jobCancellationRequested = IngestJob.this.isCancelled();
jobCancellationReason = IngestJob.this.getCancellationReason();

File diff suppressed because it is too large Load Diff

View File

@ -24,9 +24,9 @@ import java.util.Date;
import java.util.List;
/**
* Stores basic diagnostic statistics for an ingest job.
* A snapshot of the progress of an ingest job.
*/
public final class Snapshot implements Serializable {
public final class IngestJobProgressSnapshot implements Serializable {
private static final long serialVersionUID = 1L;
@ -45,10 +45,9 @@ public final class Snapshot implements Serializable {
transient private final List<String> cancelledDataSourceModules;
/**
* Constructs an object to store basic diagnostic statistics for an ingest
* job.
* Constructs a snapshot of the progress of an ingest job.
*/
Snapshot(String dataSourceName, long jobId, long jobStartTime, DataSourceIngestPipeline.DataSourcePipelineModule dataSourceIngestModule,
IngestJobProgressSnapshot(String dataSourceName, long jobId, long jobStartTime, DataSourceIngestPipeline.DataSourcePipelineModule dataSourceIngestModule,
boolean fileIngestRunning, Date fileIngestStartTime,
boolean jobCancelled, IngestJob.CancellationReason cancellationReason, List<String> cancelledModules,
long processedFiles, long estimatedFilesToProcess,
@ -71,7 +70,7 @@ public final class Snapshot implements Serializable {
}
/**
* Gets time these statistics were collected.
* Gets the time this snapshot was taken.
*
* @return The statistics collection time as number of milliseconds since
* January 1, 1970, 00:00:00 GMT.
@ -81,18 +80,16 @@ public final class Snapshot implements Serializable {
}
/**
* Gets the name of the data source associated with the ingest job that is
* the subject of this snapshot.
* Gets the name of the data source for the ingest job.
*
* @return A data source name string.
* @return The data source name.
*/
String getDataSource() {
return dataSource;
}
/**
* Gets the identifier of the ingest job that is the subject of this
* snapshot.
* Gets the identifier of the ingest job.
*
* @return The ingest job id.
*/
@ -110,68 +107,73 @@ public final class Snapshot implements Serializable {
return jobStartTime;
}
/**
* Gets a handle to the currently running data source level ingest module at
* the time this snapshot was taken.
*
* @return The data source ingest module handle, may be null.
*/
DataSourceIngestPipeline.DataSourcePipelineModule getDataSourceLevelIngestModule() {
return this.dataSourceLevelIngestModule;
}
/**
* Gets whether or not file level analysis was in progress at the time this
* snapshot was taken.
*
* @return True or false.
*/
boolean getFileIngestIsRunning() {
return this.fileIngestRunning;
}
/**
* Gets the time that file level analysis was started.
*
* @return The start time.
*/
// RJCTODO: How is this affected by ingest module tiers?
Date getFileIngestStartTime() {
    // Hand back a fresh Date built from the stored start time's millisecond
    // value, so the caller gets a copy rather than the stored instance.
    final long startTimeMillis = fileIngestStartTime.getTime();
    return new Date(startTimeMillis);
}
/**
* Gets files per second throughput since the ingest job that is the subject
* of this snapshot started.
* Gets files per second throughput since the ingest job started.
*
* @return Files processed per second (approximate).
*/
double getSpeed() {
// RJCTODO: How is this affected by ingest module tiers?
double getFilesProcessedPerSec() {
    /*
     * Work in fractional seconds. Dividing the elapsed milliseconds by 1000
     * using integer arithmetic truncates to zero during the first second of
     * the job, which turns the throughput into Infinity (or NaN when no
     * files have been processed yet), and it discards up to a second of
     * elapsed time afterwards.
     */
    double elapsedSecs = (snapShotTime - jobStartTime) / 1000.0;
    if (elapsedSecs <= 0) {
        // No measurable time has elapsed, so there is no meaningful rate.
        return 0.0;
    }
    return processedFiles / elapsedSecs;
}
/**
* Gets the number of files processed for the job so far.
* Gets the total number of files processed so far.
*
* @return The number of processed files.
*/
// RJCTODO: How is this affected by ingest module tiers?
long getFilesProcessed() {
return processedFiles;
}
/**
* Gets an estimate of the files that still need to be processed for this
* job.
* Gets an estimate of the total number of files that need to be processed.
*
* @return The estimate.
*/
// RJCTODO: How is this affected by ingest module tiers?
long getFilesEstimated() {
return estimatedFilesToProcess;
}
long getRootQueueSize() {
if (null == this.tasksSnapshot) {
return 0;
}
return this.tasksSnapshot.getRootQueueSize();
}
long getDirQueueSize() {
if (null == this.tasksSnapshot) {
return 0;
}
return this.tasksSnapshot.getDirQueueSize();
}
long getFileQueueSize() {
if (null == this.tasksSnapshot) {
return 0;
}
return this.tasksSnapshot.getFileQueueSize();
}
/**
* Gets the number of data source level ingest tasks for the ingest job that
* are currently in the data source ingest thread queue of the ingest tasks
* scheduler.
*
* @return The number of data source ingest tasks.
*/
long getDsQueueSize() {
if (null == this.tasksSnapshot) {
return 0;
@ -179,6 +181,38 @@ public final class Snapshot implements Serializable {
return this.tasksSnapshot.getDataSourceQueueSize();
}
/**
* Gets the number of file ingest tasks for the ingest job that are
* currently in the root level queue of the ingest tasks scheduler.
*
* @return The number of file ingest tasks.
*/
long getRootQueueSize() {
    // A missing tasks snapshot is reported as a queue size of zero.
    return (tasksSnapshot != null) ? tasksSnapshot.getRootQueueSize() : 0;
}
/**
* Gets the number of file ingest tasks for the ingest job that are
* currently in the directory level queue of the ingest tasks scheduler.
*
* @return The number of file ingest tasks.
*/
long getDirQueueSize() {
    // A missing tasks snapshot is reported as a queue size of zero.
    return (tasksSnapshot != null) ? tasksSnapshot.getDirQueueSize() : 0;
}
/**
* Gets the number of file ingest tasks for the ingest job that are
* currently in the streamed files queue of the ingest tasks scheduler.
*
* @return The number of file ingest tasks.
*/
long getStreamingQueueSize() {
if (null == this.tasksSnapshot) {
return 0;
@ -186,6 +220,53 @@ public final class Snapshot implements Serializable {
return this.tasksSnapshot.getStreamedFilesQueueSize();
}
/**
* Gets the number of file ingest tasks for the ingest job that are
* currently in the file ingest threads queue of the ingest tasks scheduler.
*
* @return The number of file ingest tasks.
*/
long getFileQueueSize() {
    // A missing tasks snapshot is reported as a queue size of zero.
    return (tasksSnapshot != null) ? tasksSnapshot.getFileQueueSize() : 0;
}
/**
* Gets the number of data artifact ingest tasks for the ingest job that are
* currently in the data artifact ingest thread queue of the ingest tasks
* scheduler.
*
* @return The number of data artifact ingest tasks.
*/
long getDataArtifactTasksQueueSize() {
    // A missing tasks snapshot is reported as a queue size of zero.
    return (tasksSnapshot != null) ? tasksSnapshot.getArtifactsQueueSize() : 0;
}
/**
* Gets the number of analysis result ingest tasks for the ingest job that
* are currently in the analysis result ingest thread queue of the ingest
* tasks scheduler.
*
* @return The number of analysis result ingest tasks.
*/
long getAnalysisResultTasksQueueSize() {
    // A missing tasks snapshot is reported as a queue size of zero.
    return (tasksSnapshot != null) ? tasksSnapshot.getResultsQueueSize() : 0;
}
/**
* Gets the number of ingest tasks for the ingest job that are currently in
* the tasks in progress list of the ingest tasks scheduler.
*
* @return The number of ingest tasks.
*/
long getRunningListSize() {
if (null == this.tasksSnapshot) {
return 0;
@ -193,26 +274,17 @@ public final class Snapshot implements Serializable {
return this.tasksSnapshot.getProgressListSize();
}
long getArtifactTasksQueueSize() {
if (tasksSnapshot == null) {
return 0;
}
return tasksSnapshot.getArtifactsQueueSize();
}
long getResultTasksQueueSize() {
if (tasksSnapshot == null) {
return 0;
}
return tasksSnapshot.getResultsQueueSize();
}
/**
* Gets whether or not the job has been cancelled.
*
* @return True or false.
*/
boolean isCancelled() {
return this.jobCancelled;
}
/**
* Gets the reason this job was cancelled.
* Gets the reason the job was cancelled.
*
* @return The cancellation reason, may be not cancelled.
*/
@ -222,7 +294,7 @@ public final class Snapshot implements Serializable {
/**
* Gets a list of the display names of any canceled data source level ingest
* modules
* modules.
*
* @return A list of canceled data source level ingest module display names,
* possibly empty.

View File

@ -993,11 +993,11 @@ public class IngestManager implements IngestProgressSnapshotProvider {
* @return A list of ingest job state snapshots.
*/
@Override
public List<Snapshot> getIngestJobSnapshots() {
List<Snapshot> snapShots = new ArrayList<>();
public List<IngestJobProgressSnapshot> getIngestJobSnapshots() {
List<IngestJobProgressSnapshot> snapShots = new ArrayList<>();
synchronized (ingestJobsById) {
ingestJobsById.values().forEach((job) -> {
Snapshot snapshot = job.getDiagnosticStatsSnapshot();
IngestJobProgressSnapshot snapshot = job.getDiagnosticStatsSnapshot();
if (snapshot != null) {
snapShots.add(snapshot);
}

View File

@ -0,0 +1,176 @@
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import org.python.google.common.collect.ImmutableList;
/**
* A set of ingest module pipelines grouped into a tier for concurrent analysis
* during an ingest job.
*/
class IngestModuleTier {

    private DataSourceIngestPipeline dataSourceIngestPipeline;
    private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelinesQueue = new LinkedBlockingQueue<>();
    private final List<FileIngestPipeline> fileIngestPipelines = new ArrayList<>();
    private DataArtifactIngestPipeline dataArtifactIngestPipeline;
    private AnalysisResultIngestPipeline analysisResultIngestPipeline;

    /**
     * Stores the data source ingest pipeline to be used by this tier.
     *
     * @param pipeline The data source ingest pipeline.
     */
    void setDataSourceIngestPipeline(DataSourceIngestPipeline pipeline) {
        dataSourceIngestPipeline = pipeline;
    }

    /**
     * Reports whether this tier has a data source ingest pipeline that is not
     * empty.
     *
     * @return True if a non-empty data source ingest pipeline has been set,
     *         false otherwise.
     */
    boolean hasDataSourceIngestModules() {
        if (dataSourceIngestPipeline == null) {
            return false;
        }
        return !dataSourceIngestPipeline.isEmpty();
    }

    /**
     * Gets the data source ingest pipeline of this tier.
     *
     * @return An Optional holding the pipeline, empty if none has been set.
     */
    Optional<DataSourceIngestPipeline> getDataSourceIngestPipeline() {
        return Optional.ofNullable(dataSourceIngestPipeline);
    }

    /**
     * Adds file ingest pipelines to this tier. The pipelines are expected to
     * be identical copies, one per file ingest thread of the ingest manager.
     * Every pipeline is recorded in the list of all pipelines of the tier and
     * is also queued so that it can be handed out by
     * takeFileIngestPipeline().
     *
     * @param pipelines The file ingest pipeline copies.
     *
     * @throws InterruptedException If the current thread is interrupted while
     *                              queueing the pipelines.
     */
    void setsFileIngestPipelines(List<FileIngestPipeline> pipelines) throws InterruptedException {
        fileIngestPipelines.addAll(pipelines);
        for (FileIngestPipeline availablePipeline : pipelines) {
            fileIngestPipelinesQueue.put(availablePipeline);
        }
    }

    /**
     * Reports whether this tier has file ingest pipelines that are not empty.
     * Only the first pipeline copy is inspected.
     *
     * @return True if there is at least one pipeline copy and the first copy
     *         is not empty, false otherwise.
     */
    boolean hasFileIngestModules() {
        if (fileIngestPipelines.isEmpty()) {
            return false;
        }
        return !fileIngestPipelines.get(0).isEmpty();
    }

    /**
     * Gets every file ingest pipeline copy that has been added to this tier,
     * regardless of whether or not the copy is currently checked out.
     *
     * @return An immutable copy of the list of pipelines, possibly empty.
     */
    List<FileIngestPipeline> getFileIngestPipelines() {
        return ImmutableList.copyOf(fileIngestPipelines);
    }

    /**
     * Checks out a file ingest pipeline copy from the queue of available
     * copies, waiting for one to be returned if the queue is empty.
     *
     * @return The file ingest pipeline.
     *
     * @throws InterruptedException If the current thread is interrupted while
     *                              waiting for an available pipeline.
     */
    FileIngestPipeline takeFileIngestPipeline() throws InterruptedException {
        return fileIngestPipelinesQueue.take();
    }

    /**
     * Puts a file ingest pipeline copy back into the queue of available
     * copies.
     *
     * @param pipeline The file ingest pipeline being returned.
     *
     * @throws InterruptedException If the current thread is interrupted while
     *                              queueing the pipeline.
     */
    void returnFileIngestPipeleine(FileIngestPipeline pipeline) throws InterruptedException {
        fileIngestPipelinesQueue.put(pipeline);
    }

    /**
     * Stores the data artifact ingest pipeline to be used by this tier.
     *
     * @param pipeline The data artifact ingest pipeline.
     */
    void setDataArtifactIngestPipeline(DataArtifactIngestPipeline pipeline) {
        dataArtifactIngestPipeline = pipeline;
    }

    /**
     * Reports whether this tier has a data artifact ingest pipeline that is
     * not empty.
     *
     * @return True if a non-empty data artifact ingest pipeline has been set,
     *         false otherwise.
     */
    boolean hasDataArtifactIngestModules() {
        if (dataArtifactIngestPipeline == null) {
            return false;
        }
        return !dataArtifactIngestPipeline.isEmpty();
    }

    /**
     * Gets the data artifact ingest pipeline of this tier.
     *
     * @return An Optional holding the pipeline, empty if none has been set.
     */
    Optional<DataArtifactIngestPipeline> getDataArtifactIngestPipeline() {
        return Optional.ofNullable(dataArtifactIngestPipeline);
    }

    /**
     * Stores the analysis result ingest pipeline to be used by this tier.
     *
     * @param pipeline The analysis result ingest pipeline.
     */
    void setAnalysisResultIngestPipeline(AnalysisResultIngestPipeline pipeline) {
        analysisResultIngestPipeline = pipeline;
    }

    /**
     * Reports whether this tier has an analysis result ingest pipeline that
     * is not empty.
     *
     * @return True if a non-empty analysis result ingest pipeline has been
     *         set, false otherwise.
     */
    boolean hasAnalysisResultIngestModules() {
        if (analysisResultIngestPipeline == null) {
            return false;
        }
        return !analysisResultIngestPipeline.isEmpty();
    }

    /**
     * Gets the analysis result ingest pipeline of this tier.
     *
     * @return An Optional holding the pipeline, empty if none has been set.
     */
    Optional<AnalysisResultIngestPipeline> getAnalysisResultIngestPipeline() {
        return Optional.ofNullable(analysisResultIngestPipeline);
    }

}

View File

@ -0,0 +1,232 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2021-2021 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
/**
* A utility that builds the ingest module tiers needed to execute an ingest
* job.
*/
class IngestModuleTierBuilder {

    private static final String AUTOPSY_MODULE_PREFIX = "org.sleuthkit.autopsy";
    private static final Pattern JYTHON_MODULE_REGEX = Pattern.compile("org\\.python\\.proxies\\.(.+?)\\$(.+?)(\\$[0-9]*)?$");

    /**
     * Builds the ingest module tiers needed to execute an ingest job. Two
     * tiers are built: the first holds the first stage data source, file,
     * data artifact and analysis result ingest pipelines, and the second
     * holds only the second stage data source ingest pipeline.
     *
     * @param settings The ingest job settings.
     * @param executor The ingest job executor.
     *
     * @return The ingest module tiers, in execution order.
     *
     * @throws InterruptedException The exception is thrown if the current
     *                              thread is interrupted while blocked during
     *                              the building process.
     */
    static List<IngestModuleTier> buildIngestModuleTiers(IngestJobSettings settings, IngestJobExecutor executor) throws InterruptedException {
        /*
         * Get the enabled ingest module templates from the ingest job
         * settings.
         */
        List<IngestModuleTemplate> enabledTemplates = settings.getEnabledIngestModuleTemplates();

        /*
         * Sort the ingest module templates into buckets based on the module
         * types the template can be used to create. A template may go into
         * more than one bucket. Each bucket actually consists of two
         * collections: one for Java modules and one for Jython modules.
         * LinkedHashMaps are used so that the order of the enabled templates
         * is retained within each bucket.
         */
        Map<String, IngestModuleTemplate> javaDataSourceModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> jythonDataSourceModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> javaFileModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> jythonFileModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> javaArtifactModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> jythonArtifactModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> javaResultModuleTemplates = new LinkedHashMap<>();
        Map<String, IngestModuleTemplate> jythonResultModuleTemplates = new LinkedHashMap<>();
        for (IngestModuleTemplate template : enabledTemplates) {
            if (template.isDataSourceIngestModuleTemplate()) {
                addModuleTemplateToSortingMap(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, template);
            }
            if (template.isFileIngestModuleTemplate()) {
                addModuleTemplateToSortingMap(javaFileModuleTemplates, jythonFileModuleTemplates, template);
            }
            if (template.isDataArtifactIngestModuleTemplate()) {
                addModuleTemplateToSortingMap(javaArtifactModuleTemplates, jythonArtifactModuleTemplates, template);
            }
            if (template.isAnalysisResultIngestModuleTemplate()) {
                addModuleTemplateToSortingMap(javaResultModuleTemplates, jythonResultModuleTemplates, template);
            }
        }

        /*
         * Take the module templates that have pipeline configuration entries
         * out of the buckets, and add them to ingest module pipeline
         * templates in the order prescribed by the pipeline configuration.
         * There is currently no pipeline configuration file support for data
         * artifact or analysis result ingest module pipelines.
         */
        IngestPipelinesConfiguration pipelineConfig = IngestPipelinesConfiguration.getInstance();
        List<IngestModuleTemplate> firstStageDataSourcePipelineTemplate = createIngestPipelineTemplate(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageOneDataSourceIngestPipelineConfig());
        List<IngestModuleTemplate> secondStageDataSourcePipelineTemplate = createIngestPipelineTemplate(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageTwoDataSourceIngestPipelineConfig());
        List<IngestModuleTemplate> filePipelineTemplate = createIngestPipelineTemplate(javaFileModuleTemplates, jythonFileModuleTemplates, pipelineConfig.getFileIngestPipelineConfig());
        List<IngestModuleTemplate> artifactPipelineTemplate = new ArrayList<>();
        List<IngestModuleTemplate> resultsPipelineTemplate = new ArrayList<>();

        /*
         * Add any ingest module templates remaining in the buckets to the
         * appropriate ingest module pipeline templates. Data source level
         * ingest module templates that were not listed in the pipeline
         * configuration are added to the first stage data source pipeline
         * template. Core Autopsy modules are added before third party
         * modules and, within each of those groups, Java modules are added
         * before Jython modules.
         */
        addToIngestPipelineTemplate(firstStageDataSourcePipelineTemplate, javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates);
        addToIngestPipelineTemplate(filePipelineTemplate, javaFileModuleTemplates, jythonFileModuleTemplates);
        addToIngestPipelineTemplate(artifactPipelineTemplate, javaArtifactModuleTemplates, jythonArtifactModuleTemplates);
        addToIngestPipelineTemplate(resultsPipelineTemplate, javaResultModuleTemplates, jythonResultModuleTemplates);

        /*
         * Construct the ingest module pipelines from the ingest module
         * pipeline templates and populate the ingest module tiers. One copy
         * of the file ingest pipeline is created per file ingest thread.
         */
        List<IngestModuleTier> moduleTiers = new ArrayList<>();
        IngestModuleTier firstTier = new IngestModuleTier();
        int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads();
        List<FileIngestPipeline> fileIngestPipelines = new ArrayList<>();
        for (int i = 0; i < numberOfFileIngestThreads; ++i) {
            fileIngestPipelines.add(new FileIngestPipeline(executor, filePipelineTemplate));
        }
        firstTier.setsFileIngestPipelines(fileIngestPipelines);
        firstTier.setDataSourceIngestPipeline(new DataSourceIngestPipeline(executor, firstStageDataSourcePipelineTemplate));
        firstTier.setDataArtifactIngestPipeline(new DataArtifactIngestPipeline(executor, artifactPipelineTemplate));
        firstTier.setAnalysisResultIngestPipeline(new AnalysisResultIngestPipeline(executor, resultsPipelineTemplate));
        moduleTiers.add(firstTier);

        IngestModuleTier secondTier = new IngestModuleTier();
        secondTier.setDataSourceIngestPipeline(new DataSourceIngestPipeline(executor, secondStageDataSourcePipelineTemplate));
        moduleTiers.add(secondTier);

        return moduleTiers;
    }

    /**
     * Adds an ingest module template to one of two mappings of ingest module
     * factory class names to module templates. One mapping is for ingest
     * modules implemented using Java, and the other is for ingest modules
     * implemented using Jython.
     *
     * @param mapping       Mapping for Java ingest module templates.
     * @param jythonMapping Mapping for Jython ingest module templates.
     * @param template      The ingest module template.
     */
    private static void addModuleTemplateToSortingMap(Map<String, IngestModuleTemplate> mapping, Map<String, IngestModuleTemplate> jythonMapping, IngestModuleTemplate template) {
        Class<?> factoryClass = template.getModuleFactory().getClass();
        String className = factoryClass.getCanonicalName();
        if (className == null) {
            /*
             * Anonymous and local classes have no canonical name. Fall back
             * to the binary name so that such a factory is still sorted
             * instead of causing a NullPointerException in the regex match.
             */
            className = factoryClass.getName();
        }
        String jythonName = getModuleNameFromJythonClassName(className);
        if (jythonName != null) {
            jythonMapping.put(jythonName, template);
        } else {
            mapping.put(className, template);
        }
    }

    /**
     * Extracts a module class name from a Jython module proxy class name. For
     * example, a Jython class name such as
     * "org.python.proxies.GPX_Parser_Module$GPXParserFileIngestModuleFactory$14"
     * will be parsed to return
     * "GPX_Parser_Module.GPXParserFileIngestModuleFactory".
     *
     * @param className The canonical class name.
     *
     * @return The module class name extracted from the Jython proxy class
     *         name, or null if the class name is not a Jython proxy class
     *         name.
     */
    private static String getModuleNameFromJythonClassName(String className) {
        Matcher m = JYTHON_MODULE_REGEX.matcher(className);
        if (m.find()) {
            return String.format("%s.%s", m.group(1), m.group(2)); //NON-NLS
        } else {
            return null;
        }
    }

    /**
     * Creates an ingest module pipeline template that can be used to construct
     * an ingest module pipeline. Every template that is placed in the pipeline
     * template is removed from the input mapping it was found in.
     *
     * @param javaIngestModuleTemplates   Ingest module templates for ingest
     *                                    modules implemented using Java.
     * @param jythonIngestModuleTemplates Ingest module templates for ingest
     *                                    modules implemented using Jython.
     * @param pipelineConfig              An ordered list of the ingest modules
     *                                    that belong in the ingest pipeline for
     *                                    which the template is being created.
     *
     * @return An ordered list of ingest module templates, i.e., a template for
     *         creating ingest module pipelines.
     */
    private static List<IngestModuleTemplate> createIngestPipelineTemplate(Map<String, IngestModuleTemplate> javaIngestModuleTemplates, Map<String, IngestModuleTemplate> jythonIngestModuleTemplates, List<String> pipelineConfig) {
        List<IngestModuleTemplate> pipelineTemplate = new ArrayList<>();
        for (String moduleClassName : pipelineConfig) {
            if (javaIngestModuleTemplates.containsKey(moduleClassName)) {
                pipelineTemplate.add(javaIngestModuleTemplates.remove(moduleClassName));
            } else if (jythonIngestModuleTemplates.containsKey(moduleClassName)) {
                pipelineTemplate.add(jythonIngestModuleTemplates.remove(moduleClassName));
            }
        }
        return pipelineTemplate;
    }

    /**
     * Appends ingest module templates to a pipeline template so that core
     * Autopsy ingest modules come before third party ingest modules and,
     * within each of those two groups, ingest modules implemented using Java
     * come before ingest modules implemented using Jython. A module is
     * treated as a core Autopsy module if its mapping key starts with the
     * Autopsy package prefix.
     *
     * @param sortedModules The output list to which the sorted modules are
     *                      appended.
     * @param javaModules   The input ingest module templates for modules
     *                      implemented using Java.
     * @param jythonModules The ingest module templates for modules implemented
     *                      using Jython.
     */
    private static void addToIngestPipelineTemplate(final List<IngestModuleTemplate> sortedModules, final Map<String, IngestModuleTemplate> javaModules, final Map<String, IngestModuleTemplate> jythonModules) {
        final List<IngestModuleTemplate> autopsyModules = new ArrayList<>();
        final List<IngestModuleTemplate> thirdPartyModules = new ArrayList<>();
        Stream.concat(javaModules.entrySet().stream(), jythonModules.entrySet().stream()).forEach((templateEntry) -> {
            if (templateEntry.getKey().startsWith(AUTOPSY_MODULE_PREFIX)) {
                autopsyModules.add(templateEntry.getValue());
            } else {
                thirdPartyModules.add(templateEntry.getValue());
            }
        });
        sortedModules.addAll(autopsyModules);
        sortedModules.addAll(thirdPartyModules);
    }

    /**
     * Private constructor to prevent instantiation of this utility class.
     */
    private IngestModuleTierBuilder() {
    }

}

View File

@ -182,7 +182,7 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.artifactsQueued"),
NbBundle.getMessage(this.getClass(), "IngestJobTableModel.colName.resultsQueued")};
private List<Snapshot> jobSnapshots;
private List<IngestJobProgressSnapshot> jobSnapshots;
private IngestJobTableModel() {
refresh();
@ -210,7 +210,7 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
@Override
public Object getValueAt(int rowIndex, int columnIndex) {
Snapshot snapShot = jobSnapshots.get(rowIndex);
IngestJobProgressSnapshot snapShot = jobSnapshots.get(rowIndex);
Object cellValue;
switch (columnIndex) {
case 0:
@ -227,7 +227,7 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
cellValue = snapShot.getFilesProcessed();
break;
case 4:
cellValue = snapShot.getSpeed();
cellValue = snapShot.getFilesProcessedPerSec();
break;
case 5:
cellValue = snapShot.getRunningListSize();
@ -248,10 +248,10 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
cellValue = snapShot.getDsQueueSize();
break;
case 11:
cellValue = snapShot.getArtifactTasksQueueSize();
cellValue = snapShot.getDataArtifactTasksQueueSize();
break;
case 12:
cellValue = snapShot.getResultTasksQueueSize();
cellValue = snapShot.getAnalysisResultTasksQueueSize();
break;
default:
cellValue = null;

View File

@ -38,7 +38,7 @@ public interface IngestProgressSnapshotProvider {
*
* @return A list of ingest job snapshots.
*/
List<Snapshot> getIngestJobSnapshots();
List<IngestJobProgressSnapshot> getIngestJobSnapshots();
/**
* Gets the cumulative run times for the ingest module.

View File

@ -33,7 +33,7 @@ import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe;
import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessor;
import org.sleuthkit.autopsy.coreutils.NetworkUtils;
import org.sleuthkit.autopsy.ingest.Snapshot;
import org.sleuthkit.autopsy.ingest.IngestJobProgressSnapshot;
import org.sleuthkit.autopsy.ingest.IngestJob;
import org.sleuthkit.autopsy.ingest.IngestManager.IngestThreadActivitySnapshot;
import org.sleuthkit.autopsy.ingest.IngestProgressSnapshotProvider;
@ -98,7 +98,7 @@ final class AutoIngestJob implements Comparable<AutoIngestJob>, IngestProgressSn
* Version 3 fields.
*/
private List<IngestThreadActivitySnapshot> ingestThreadsSnapshot;
private List<Snapshot> ingestJobsSnapshot;
private List<IngestJobProgressSnapshot> ingestJobsSnapshot;
private Map<String, Long> moduleRunTimesSnapshot;
/*
@ -409,7 +409,7 @@ final class AutoIngestJob implements Comparable<AutoIngestJob>, IngestProgressSn
*
* @param snapshot
*/
synchronized void setIngestJobsSnapshot(List<Snapshot> snapshot) {
synchronized void setIngestJobsSnapshot(List<IngestJobProgressSnapshot> snapshot) {
this.ingestJobsSnapshot = snapshot;
}
@ -643,7 +643,7 @@ final class AutoIngestJob implements Comparable<AutoIngestJob>, IngestProgressSn
}
@Override
public List<Snapshot> getIngestJobSnapshots() {
public List<IngestJobProgressSnapshot> getIngestJobSnapshots() {
return this.ingestJobsSnapshot;
}

View File

@ -37,7 +37,7 @@ import org.sleuthkit.autopsy.datamodel.NodeProperty;
import org.sleuthkit.autopsy.experimental.autoingest.AutoIngestJob.Stage;
import org.sleuthkit.autopsy.guiutils.DurationCellRenderer;
import org.sleuthkit.autopsy.guiutils.StatusIconCellRenderer;
import org.sleuthkit.autopsy.ingest.Snapshot;
import org.sleuthkit.autopsy.ingest.IngestJobProgressSnapshot;
/**
* A node which represents all AutoIngestJobs of a given AutoIngestJobStatus.
@ -98,7 +98,7 @@ final class AutoIngestJobsNode extends AbstractNode {
* they can be changed by events in other threads which
*/
private final Stage jobStage;
private final List<Snapshot> jobSnapshot;
private final List<IngestJobProgressSnapshot> jobSnapshot;
private final Integer jobPriority;
private final Boolean ocrFlag;