7332 artifact pipeline work

This commit is contained in:
Richard Cordovano 2021-06-02 09:44:08 -04:00
parent d34a5860cf
commit 13d15d51ef
3 changed files with 101 additions and 139 deletions

View File

@ -174,7 +174,7 @@ public final class IngestJob {
}
// Streaming ingest jobs will only have one data source
IngestJobPipeline streamingIngestPipeline = ingestJobPipelines.values().iterator().next();
streamingIngestPipeline.addStreamingIngestDataSource();
streamingIngestPipeline.notifyFileStreamingCompleted();
}
/**

View File

@ -72,7 +72,7 @@ final class IngestJobPipeline {
/*
* A regular expression for identifying the proxy classes Jython generates
* for ingest module factories classes written using Python. For example:
* for ingest module factories written using Python. For example:
* org.python.proxies.GPX_Parser_Module$GPXParserFileIngestModuleFactory$14
*/
private static final Pattern JYTHON_MODULE_REGEX = Pattern.compile("org\\.python\\.proxies\\.(.+?)\\$(.+?)(\\$[0-9]*)?$");
@ -81,7 +81,7 @@ final class IngestJobPipeline {
* These fields define an ingest pipeline: the parent ingest job, a pipeline
* ID, the user's ingest job settings, and the data source to be analyzed.
* Optionally, there is a set of files to be analyzed, instead of analyzing
* ALL of the files in the data source.
* all of the files in the data source.
*
* The pipeline ID is used to associate the pipeline with its ingest tasks.
* The ingest job ID cannot be used for this purpose because the parent
@ -126,21 +126,19 @@ final class IngestJobPipeline {
*/
FINALIZATION
};
@GuardedBy("stageTransitionLock")
private Stages stage = IngestJobPipeline.Stages.INITIALIZATION;
private volatile Stages stage = IngestJobPipeline.Stages.INITIALIZATION;
private final Object stageTransitionLock = new Object();
/**
/*
* An ingest pipeline has separate data source level ingest task pipelines
* for the first and second stages. Longer running, lower priority modules
* belong in the second stage pipeline.
*/
private final Object dataSourceIngestPipelineLock = new Object();
private DataSourceIngestPipeline firstStageDataSourceIngestPipeline;
private DataSourceIngestPipeline secondStageDataSourceIngestPipeline;
private DataSourceIngestPipeline currentDataSourceIngestPipeline;
private volatile DataSourceIngestPipeline currentDataSourceIngestPipeline;
/**
/*
* An ingest pipeline has a collection of identical file ingest task
* pipelines, one for each file ingest thread in the ingest manager. The
* ingest threads take ingest task pipelines as they need them and return
@ -156,9 +154,9 @@ final class IngestJobPipeline {
*/
private DataArtifactIngestPipeline artifactIngestPipeline;
/**
/*
* An ingest pipeline supports cancellation of just its currently running
* data source level ingest task pipeline or cancellation of ALL of its
* data source level ingest task pipeline or cancellation of all of its
* child ingest task pipelines. Cancellation works by setting flags that are
* checked by the ingest task pipelines every time they transition from one
* module to another. Modules are also expected to check these flags (via
@ -174,15 +172,11 @@ final class IngestJobPipeline {
/*
* An ingest pipeline interacts with the ingest task scheduler to create and
* queue ingest tasks and to determine whether or not there are ingest tasks
* still to be executed so that the pipeline can transition through its
* stages. The ingest modules in the pipeline can schedule ingest tasks as
* well (via the ingest job context). For example, a file carving module can
* add carved files to the ingest job and most modules will add data
* artifacts to the ingest job.
* still to be executed.
*/
private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance();
/**
/*
* If running in a GUI, an ingest pipeline reports progress and allows a
* user to cancel either an individual data source level ingest module or
* all of its ingest tasks using progress bars in the lower right hand
@ -200,6 +194,11 @@ final class IngestJobPipeline {
private ProgressHandle fileIngestProgressBar;
private final Object artifactIngestProgressLock = new Object();
private ProgressHandle artifactIngestProgressBar;
/*
* Ingest pipeline details are tracked using this object and are recorded in
* the case database.
*/
private volatile IngestJobInfo ingestJobInfo;
/**
@ -269,10 +268,12 @@ final class IngestJobPipeline {
* first and third party modules next.
*
* @param orderedModules The list to populate.
* @param javaModules The input ingest module templates for modules
* implemented using Java.
* @param jythonModules The input ingest module templates for modules
* implemented using Jython.
* @param javaModules A map of the input ingest module templates for
* modules implemented using Java, keyed by
* fully-qualified (canonical) class name.
* @param jythonModules A map of the input ingest module templates for
* modules implemented using Jython, keyed by
* fully-qualified (canonical) class name.
*/
private static void completePipeline(final List<IngestModuleTemplate> orderedModules, final Map<String, IngestModuleTemplate> javaModules, final Map<String, IngestModuleTemplate> jythonModules) {
final List<IngestModuleTemplate> autopsyModules = new ArrayList<>();
@ -349,8 +350,8 @@ final class IngestJobPipeline {
* Sort the ingest module templates into buckets based on the module
* types the ingest module factory can create. A template may go into
* more than one bucket. The buckets are actually maps of ingest module
* factory class names to ingest module templates. The maps are used to
* go from an ingest module factory class name read from the pipeline
* factory class names to ingest module templates. These maps are used
* to go from an ingest module factory class name read from the pipeline
* configuration file to the corresponding ingest module template.
*
* There are actually two maps for each module type bucket. One map is
@ -430,7 +431,7 @@ final class IngestJobPipeline {
* module templates.
* @param pipelineConfig An ordered list of ingest module
* factory class names representing an
* ingest pipeline, read form the
* ingest pipeline, read from the
* pipeline configuration file.
*
* @return An ordered list of ingest module templates, i.e., an
@ -457,15 +458,6 @@ final class IngestJobPipeline {
return pipelineId;
}
/**
* Gets the parent ingest job of this ingest pipeline.
*
* @return The ingest job.
*/
IngestJob getIngestJob() {
return job;
}
/**
* Gets the ingest execution context name.
*
@ -484,16 +476,6 @@ final class IngestJobPipeline {
return dataSource;
}
/**
* Gets the subset of the files from the data source to be analyzed by this
* ingest pipeline.
*
* @return The files.
*/
List<AbstractFile> getFiles() {
return Collections.unmodifiableList(files);
}
/**
* Queries whether or not unallocated space should be processed by this
* ingest pipeline.
@ -523,32 +505,46 @@ final class IngestJobPipeline {
return hasFileIngestModules()
|| hasFirstStageDataSourceIngestModules()
|| hasSecondStageDataSourceIngestModules()
|| hasArtifactIngestModules();
|| hasDataArtifactIngestModules();
}
/**
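* Checks to see if this ingest pipeline has at least one data source level
* ingest module to run in its current stage: the second stage data source
* level modules if the pipeline is in its second stage, otherwise the first
* stage data source level modules.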
*
* @return True or false.
*/
boolean hasDataSourceIngestModules() {
if (stage == Stages.SECOND_STAGE) {
return hasSecondStageDataSourceIngestModules();
} else {
return hasFirstStageDataSourceIngestModules();
}
}
/**
* Checks to see if this ingest pipeline has at least one first stage data
* source level ingest modules.
* source level ingest module to run.
*
* @return True or false.
*/
boolean hasFirstStageDataSourceIngestModules() {
private boolean hasFirstStageDataSourceIngestModules() {
return (firstStageDataSourceIngestPipeline.isEmpty() == false);
}
/**
* Checks to see if this ingest pipeline has at least one second stage data
* source level ingest module.
* source level ingest module to run.
*
* @return True or false.
*/
boolean hasSecondStageDataSourceIngestModules() {
private boolean hasSecondStageDataSourceIngestModules() {
return (secondStageDataSourceIngestPipeline.isEmpty() == false);
}
/**
* Checks to see if this ingest pipeline has at least one file ingest
* module.
* Checks to see if this ingest pipeline has at least one file ingest module
* to run.
*
* @return True or false.
*/
@ -563,12 +559,12 @@ final class IngestJobPipeline {
}
/**
* Checks to see if this ingest pipeline has at least one artifact ingest
* module.
* Checks to see if this ingest pipeline has at least one data artifact
* ingest module to run.
*
* @return True or false.
*/
boolean hasArtifactIngestModules() {
boolean hasDataArtifactIngestModules() {
return (artifactIngestPipeline.isEmpty() == false);
}
@ -581,7 +577,7 @@ final class IngestJobPipeline {
List<IngestModuleError> errors = startUpIngestTaskPipelines();
if (errors.isEmpty()) {
recordIngestJobStartUpInfo();
if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasArtifactIngestModules()) {
if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) {
if (job.getIngestMode() == IngestJob.Mode.STREAMING) {
startFirstStageInStreamingMode();
} else {
@ -701,13 +697,12 @@ final class IngestJobPipeline {
* already been added to the case database by the data source processor.
*/
private void startFirstStage() {
/*
* Do a count of the files the data source processor has added to the
* case database. This estimate will be used for ingest progress
* snapshots and for the file ingest progress bar if running with a GUI.
*/
if (hasFileIngestModules()) {
/*
* Do a count of the files the data source processor has added to
* the case database. This estimate will be used for ingest progress
* snapshots and for the file ingest progress bar if running with a
* GUI.
*/
long filesToProcess = dataSource.accept(new GetFilesCountVisitor());;
synchronized (fileIngestProgressLock) {
estimatedFilesToProcess = filesToProcess;
@ -719,28 +714,21 @@ final class IngestJobPipeline {
* hand corner of the main application window.
*/
if (doUI) {
if (hasFirstStageDataSourceIngestModules()) {
startDataSourceIngestProgressBar();
}
if (hasFileIngestModules()) {
startFileIngestProgressBar();
}
if (hasArtifactIngestModules()) {
if (hasFirstStageDataSourceIngestModules()) {
startDataSourceIngestProgressBar();
}
if (hasDataArtifactIngestModules()) {
startArtifactIngestProgressBar();
}
}
/*
* Make the first stage data source level ingest pipeline the current
* data source level pipeline.
*/
synchronized (dataSourceIngestPipelineLock) {
currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
}
synchronized (stageTransitionLock) {
logInfoMessage("Starting first stage analysis in batch mode"); //NON-NLS
stage = Stages.FIRST_STAGE;
currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
/**
* Schedule the first stage ingest tasks and then immediately check
@ -751,7 +739,11 @@ final class IngestJobPipeline {
* executing an ingest task, so such a job would run forever without
* the check here.
*/
taskScheduler.scheduleIngestTasks(this);
if (!files.isEmpty() && hasFileIngestModules()) {
taskScheduler.scheduleFileIngestTasks(this, files);
} else if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) {
taskScheduler.scheduleIngestTasks(this);
}
checkForStageCompleted();
}
}
@ -764,31 +756,21 @@ final class IngestJobPipeline {
* addStreamingIngestFiles() and notifyFileStreamingCompleted().
*/
private void startFirstStageInStreamingMode() {
if (hasFileIngestModules()) {
synchronized (fileIngestProgressLock) {
/*
* Start with zero to signal an unknown value. This estimate
* will be used for ingest progress snapshots and for the file
* ingest progress bar if running with a GUI.
*/
estimatedFilesToProcess = 0;
}
}
/*
* If running with a GUI, start ingest progress bars in the lower right
* hand corner of the main application window.
*/
if (doUI) {
/*
* If running with a GUI, start ingest progress bars in the lower
* right hand corner of the main application window.
*/
if (hasFileIngestModules()) {
/*
* Note that because estimated files remaining to process has
* been set to zero, the progress bar will start in the
* Note that because estimated files remaining to process still
* has its initial value of zero since files are still being added
* to the case database, the progress bar will start in the
* "indeterminate" state.
*/
startFileIngestProgressBar();
}
if (hasArtifactIngestModules()) {
if (hasDataArtifactIngestModules()) {
startArtifactIngestProgressBar();
}
}
@ -796,7 +778,7 @@ final class IngestJobPipeline {
synchronized (stageTransitionLock) {
logInfoMessage("Starting first stage analysis in streaming mode"); //NON-NLS
stage = Stages.FIRST_STAGE_STREAMING;
if (hasArtifactIngestModules()) {
if (hasDataArtifactIngestModules()) {
/*
* Schedule artifact ingest tasks for any artifacts currently in
* the case database. This needs to be done before any files or
@ -813,7 +795,7 @@ final class IngestJobPipeline {
* Start data source ingest. Used for streaming ingest when the data source
* is not ready when ingest starts.
*/
void addStreamingIngestDataSource() {
void notifyFileStreamingCompleted() {
/*
* Do a count of the files the data source processor has added to the
* case database. This estimate will be used for ingest progress
@ -836,17 +818,11 @@ final class IngestJobPipeline {
}
}
/**
* Make the first stage data source level ingest pipeline the current
* data source level pipeline.
*/
synchronized (this.dataSourceIngestPipelineLock) {
this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline;
}
synchronized (stageTransitionLock) {
logInfoMessage("Adding the data source in streaming mode"); //NON-NLS
stage = IngestJobPipeline.Stages.FIRST_STAGE;
currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
if (hasFirstStageDataSourceIngestModules()) {
IngestJobPipeline.taskScheduler.scheduleDataSourceIngestTask(this);
} else {
@ -870,12 +846,10 @@ final class IngestJobPipeline {
if (doUI) {
startDataSourceIngestProgressBar();
}
synchronized (dataSourceIngestPipelineLock) {
currentDataSourceIngestPipeline = secondStageDataSourceIngestPipeline;
}
synchronized (stageTransitionLock) {
logInfoMessage(String.format("Starting second stage ingest task pipelines for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), job.getId())); //NON-NLS
stage = IngestJobPipeline.Stages.SECOND_STAGE;
currentDataSourceIngestPipeline = secondStageDataSourceIngestPipeline;
taskScheduler.scheduleDataSourceIngestTask(this);
}
}
@ -1087,29 +1061,13 @@ final class IngestJobPipeline {
*/
void execute(DataSourceIngestTask task) {
try {
synchronized (dataSourceIngestPipelineLock) {
if (!isCancelled() && !currentDataSourceIngestPipeline.isEmpty()) {
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(currentDataSourceIngestPipeline.executeTask(task));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
if (!isCancelled()) {
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(currentDataSourceIngestPipeline.executeTask(task));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
}
if (doUI) {
/**
* Shut down the data source ingest progress bar right away.
* Data source-level processing is finished for this stage.
*/
synchronized (dataSourceIngestProgressLock) {
if (dataSourceIngestProgressBar != null) {
dataSourceIngestProgressBar.finish();
dataSourceIngestProgressBar = null;
}
}
}
} finally {
taskScheduler.notifyTaskCompleted(task);
checkForStageCompleted();
@ -1425,7 +1383,7 @@ final class IngestJobPipeline {
* @return The currently running module, may be null.
*/
DataSourceIngestPipeline.DataSourcePipelineModule getCurrentDataSourceIngestModule() {
if (null != currentDataSourceIngestPipeline) {
if (currentDataSourceIngestPipeline != null) {
return (DataSourceIngestPipeline.DataSourcePipelineModule) currentDataSourceIngestPipeline.getCurrentlyRunningModule();
} else {
return null;

View File

@ -125,15 +125,11 @@ final class IngestTasksScheduler {
}
/**
* Schedules a data source level ingest task, plus ingest tasks for any
* files and artifacts associated with the data source that are currently in
* the case database. The data source is obtained from the ingest pipeline
* passed in.
*
* Scheduling these tasks atomically means that it is valid to call
* currentTasksAreCompleted() immediately afterwards. Also note that the
* file filter for the job is obtained from the ingest pipeline and its
* application may cause some or even all of the file tasks to be discarded.
* Schedules ingest tasks based on the types of ingest modules that the
* ingest pipeline that will execute the tasks has. Scheduling these tasks
* atomically means that it is valid to call currentTasksAreCompleted()
* immediately after calling this method. Note that the file filter for the
* job may cause some or even all of the file tasks to be discarded.
*
* @param ingestPipeline The ingest pipeline that will execute the scheduled
* tasks. A reference to the pipeline is added to each
@ -144,9 +140,15 @@ final class IngestTasksScheduler {
*/
synchronized void scheduleIngestTasks(IngestJobPipeline ingestPipeline) {
if (!ingestPipeline.isCancelled()) {
scheduleDataSourceIngestTask(ingestPipeline);
scheduleFileIngestTasks(ingestPipeline, Collections.emptyList());
scheduleDataArtifactIngestTasks(ingestPipeline);
if (ingestPipeline.hasDataSourceIngestModules()) {
scheduleDataSourceIngestTask(ingestPipeline);
}
if (ingestPipeline.hasFileIngestModules()) {
scheduleFileIngestTasks(ingestPipeline, Collections.emptyList());
}
if (ingestPipeline.hasDataArtifactIngestModules()) {
scheduleDataArtifactIngestTasks(ingestPipeline);
}
}
}
@ -312,7 +314,9 @@ final class IngestTasksScheduler {
* execute() method is called, execute() can pass the
* target Content of the task to the pipeline for
* processing by the pipeline's ingest modules.
* @param artifacts The artifacts.
* @param artifacts A subset of the data artifacts from the data
* source; if empty, then all of the data artifacts
* from the data source will be scheduled.
*/
synchronized void scheduleDataArtifactIngestTasks(IngestJobPipeline ingestPipeline, List<DataArtifact> artifacts) {
if (!ingestPipeline.isCancelled()) {