7332 artifact pipeline work

Commit: 85ab8d7c3d
Parent: b89c870629
Author: Richard Cordovano
Date: 2021-06-03 14:57:49 -04:00

2 changed files with 135 additions and 117 deletions

IngestJob.java

@@ -161,7 +161,7 @@ public final class IngestJob {
         }
         // Streaming ingest jobs will only have one data source
         IngestJobPipeline streamingIngestPipeline = ingestJobPipelines.values().iterator().next();
-        streamingIngestPipeline.addStreamingIngestFiles(fileObjIds);
+        streamingIngestPipeline.addStreamedFiles(fileObjIds);
     }

     /**
@@ -174,7 +174,7 @@ public final class IngestJob {
         }
         // Streaming ingest jobs will only have one data source
         IngestJobPipeline streamingIngestPipeline = ingestJobPipelines.values().iterator().next();
-        streamingIngestPipeline.notifyFileStreamingCompleted();
+        streamingIngestPipeline.notifyDataSourceReady();
     }

     /**
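Both hunks above are renames at the IngestJob level: a streaming data source processor pushes batches of file object IDs into the job's single pipeline with addStreamedFiles(), then calls notifyDataSourceReady() once the data source itself can be analyzed. A minimal sketch of that calling sequence; only the two renamed methods come from this commit, and the StreamingJob interface and batch source are illustrative stand-ins:

    import java.util.List;

    // Hypothetical driver for the streaming ingest API renamed above.
    final class StreamingIngestDriver {

        interface StreamingJob {
            void addStreamedFiles(List<Long> fileObjIds);
            void notifyDataSourceReady();
        }

        static void drive(StreamingJob job, Iterable<List<Long>> fileIdBatches) {
            // File ingest (and artifact ingest on its results) can start
            // while the data source processor is still adding files.
            for (List<Long> batch : fileIdBatches) {
                job.addStreamedFiles(batch);
            }
            // All files are streamed in; the pipeline may now leave
            // FILE_STREAMING and begin data source level analysis.
            job.notifyDataSourceReady();
        }
    }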

IngestJobPipeline.java

@@ -56,8 +56,8 @@ import org.sleuthkit.datamodel.DataSource;
 /**
  * A pipeline of ingest modules for analyzing one of the data sources in an
- * ingest job. The ingest modules are actually organized into child pipelines by
- * ingest module type and are run in stages.
+ * ingest job. The ingest modules are organized into child pipelines by ingest
+ * module type and are run in stages.
  */
 final class IngestJobPipeline {
@@ -101,10 +101,11 @@ final class IngestJobPipeline {
         INITIALIZATION,
         /*
          * The pipeline is running file ingest modules on files streamed to it
-         * by a data source processor. The data source has not been added to the
-         * pipeline yet.
+         * by a data source processor and data artifact ingest modules on
+         * artifacts generated by the analysis of the streamed files. The data
+         * source has not been added to the pipeline yet.
          */
-        FIRST_STAGE_FILES_ONLY,
+        FILE_STREAMING,
         /*
          * The pipeline is running the following three types of ingest modules:
          * higher priority data source level ingest modules, file ingest
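The rename from FIRST_STAGE_FILES_ONLY to FILE_STREAMING makes the stage enum read as a lifecycle. Restated as a sketch below; this is a summary of the transitions implied elsewhere in this diff, not code from the commit. Batch-mode jobs go straight from INITIALIZATION to FIRST_STAGE, and FIRST_STAGE only advances to SECOND_STAGE when second stage modules exist:

    // Illustrative restatement of the stage progression in this diff.
    enum Stage {
        INITIALIZATION, FILE_STREAMING, FIRST_STAGE, SECOND_STAGE, FINALIZATION;

        // Transitions for a streaming mode job; the comments name the
        // methods in this commit that set each stage.
        Stage next(boolean hasSecondStageModules) {
            switch (this) {
                case INITIALIZATION: return FILE_STREAMING;  // startFileStreaming()
                case FILE_STREAMING: return FIRST_STAGE;     // notifyDataSourceReady()
                case FIRST_STAGE:    return hasSecondStageModules
                                            ? SECOND_STAGE   // startSecondStage()
                                            : FINALIZATION;  // shutDown()
                default:             return FINALIZATION;    // shutDown()
            }
        }
    }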
@@ -203,26 +204,26 @@ final class IngestJobPipeline {
     /**
      * Constructs a pipeline of ingest modules for analyzing one of the data
-     * sources in an ingest job. The ingest modules are actually organized into
-     * child pipelines by ingest module type and are run in stages.
+     * sources in an ingest job. The ingest modules are organized into child
+     * pipelines by ingest module type and are run in stages.
      *
-     * @param job        The ingest job.
+     * @param parentJob  The ingest job.
      * @param dataSource The data source.
      * @param settings   The ingest job settings.
      *
      * @throws InterruptedException Exception thrown if the thread in which the
      *                              pipeline is being created is interrupted.
      */
-    IngestJobPipeline(IngestJob job, Content dataSource, IngestJobSettings settings) throws InterruptedException {
-        this(job, dataSource, Collections.emptyList(), settings);
+    IngestJobPipeline(IngestJob parentJob, Content dataSource, IngestJobSettings settings) throws InterruptedException {
+        this(parentJob, dataSource, Collections.emptyList(), settings);
     }

     /**
      * Constructs a pipeline of ingest modules for analyzing one of the data
-     * sources in an ingest job. The ingest modules are actually organized into
-     * child pipelines by ingest module type and are run in stages.
+     * sources in an ingest job. The ingest modules are organized into child
+     * pipelines by ingest module type and are run in stages.
      *
-     * @param job        The ingest job.
+     * @param parentJob  The ingest job.
      * @param dataSource The data source.
      * @param files      A subset of the files from the data source. If the list
      *                   is empty, ALL of the files in the data source are an
@@ -232,11 +233,11 @@ final class IngestJobPipeline {
      * @throws InterruptedException Exception thrown if the thread in which the
      *                              pipeline is being created is interrupted.
      */
-    IngestJobPipeline(IngestJob job, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) throws InterruptedException {
+    IngestJobPipeline(IngestJob parentJob, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) throws InterruptedException {
         if (!(dataSource instanceof DataSource)) {
             throw new IllegalArgumentException("Passed dataSource that does not implement the DataSource interface"); //NON-NLS
         }
-        parentJob = job;
+        this.parentJob = parentJob;
         pipelineId = IngestJobPipeline.nextPipelineId.getAndIncrement();
         this.dataSource = (DataSource) dataSource;
         this.files = new ArrayList<>();
@@ -245,22 +246,21 @@ final class IngestJobPipeline {
         doUI = RuntimeProperties.runningWithGUI();
         createTime = new Date().getTime();
         stage = Stages.INITIALIZATION;
-        createIngestTaskPipelines();
+        createIngestModulePipelines();
     }

     /**
-     * Adds ingest module templates to an output list with core Autopsy modules
-     * first and third party modules next.
+     * Sorts ingest module templates so that core Autopsy ingest modules come
+     * before third party ingest modules and ingest modules implemented using
+     * Java come before ingest modules implemented using Jython.
      *
-     * @param orderedModules The list to populate.
-     * @param javaModules    A map of the input ingest module templates for
-     *                       modules implemented using Java, keyed by
-     *                       fully-qualified (canonical) class name.
-     * @param jythonModules  A map of the input ingest module templates for
-     *                       modules implemented using Jython, keyed by
-     *                       fully-qualified (canonical) class name.
+     * @param sortedModules The output list to hold the sorted modules.
+     * @param javaModules   The input ingest module templates for modules
+     *                      implemented using Java.
+     * @param jythonModules The ingest module templates for modules implemented
+     *                      using Jython.
      */
-    private static void completePipeline(final List<IngestModuleTemplate> orderedModules, final Map<String, IngestModuleTemplate> javaModules, final Map<String, IngestModuleTemplate> jythonModules) {
+    private static void sortModuleTemplates(final List<IngestModuleTemplate> sortedModules, final Map<String, IngestModuleTemplate> javaModules, final Map<String, IngestModuleTemplate> jythonModules) {
         final List<IngestModuleTemplate> autopsyModules = new ArrayList<>();
         final List<IngestModuleTemplate> thirdPartyModules = new ArrayList<>();
         Stream.concat(javaModules.entrySet().stream(), jythonModules.entrySet().stream()).forEach((templateEntry) -> {
@@ -270,8 +270,8 @@ final class IngestJobPipeline {
                 thirdPartyModules.add(templateEntry.getValue());
             }
         });
-        orderedModules.addAll(autopsyModules);
-        orderedModules.addAll(thirdPartyModules);
+        sortedModules.addAll(autopsyModules);
+        sortedModules.addAll(thirdPartyModules);
     }

     /**
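A note on the ordering sortModuleTemplates() produces: Stream.concat() visits the Java map's entries before the Jython map's, and each entry is bucketed as core or third party, so the output is core Java, core Jython, third-party Java, third-party Jython. A self-contained re-creation with plain strings in place of IngestModuleTemplate; the package-prefix test for "core" is an assumption, since the actual check falls outside the hunks shown:

    import java.util.*;
    import java.util.stream.Stream;

    // Minimal re-creation of the sortModuleTemplates() ordering.
    public final class SortOrderDemo {
        public static void main(String[] args) {
            Map<String, String> javaModules = new LinkedHashMap<>();
            javaModules.put("org.sleuthkit.autopsy.HashLookupFactory", "core Java");
            javaModules.put("com.example.CarverFactory", "3rd party Java");
            Map<String, String> jythonModules = new LinkedHashMap<>();
            jythonModules.put("org.sleuthkit.autopsy.SampleJython", "core Jython");
            jythonModules.put("com.example.VendorJython", "3rd party Jython");

            List<String> autopsyModules = new ArrayList<>();
            List<String> thirdPartyModules = new ArrayList<>();
            Stream.concat(javaModules.entrySet().stream(), jythonModules.entrySet().stream())
                    .forEach(e -> {
                        if (e.getKey().startsWith("org.sleuthkit.autopsy")) { // assumed core test
                            autopsyModules.add(e.getValue());
                        } else {
                            thirdPartyModules.add(e.getValue());
                        }
                    });
            List<String> sorted = new ArrayList<>(autopsyModules);
            sorted.addAll(thirdPartyModules);
            // Prints: [core Java, core Jython, 3rd party Java, 3rd party Jython]
            System.out.println(sorted);
        }
    }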
@@ -304,7 +304,7 @@ final class IngestJobPipeline {
      * @param jythonMapping Mapping for Jython ingest module templates.
      * @param template      The ingest module template.
      */
-    private static void addIngestModuleTemplateToMaps(Map<String, IngestModuleTemplate> mapping, Map<String, IngestModuleTemplate> jythonMapping, IngestModuleTemplate template) {
+    private static void addModuleTemplateToImplLangMap(Map<String, IngestModuleTemplate> mapping, Map<String, IngestModuleTemplate> jythonMapping, IngestModuleTemplate template) {
         String className = template.getModuleFactory().getClass().getCanonicalName();
         String jythonName = getModuleNameFromJythonClassName(className);
         if (jythonName != null) {
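addModuleTemplateToImplLangMap() decides Java versus Jython by asking getModuleNameFromJythonClassName() for a module name; Jython-defined factories surface as generated proxy classes, so a null answer means a plain Java module. A sketch of that kind of check, assuming a proxy naming pattern like org.python.proxies.<module>$<class>$<n>; the exact regex in Autopsy may differ:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Sketch of the check getModuleNameFromJythonClassName() can perform.
    final class JythonNames {
        // Assumed Jython proxy class naming, e.g.
        // "org.python.proxies.mymodule$MyFactory$3".
        private static final Pattern JYTHON_PROXY
                = Pattern.compile("org\\.python\\.proxies\\.(.+?)\\$(.+?)(\\$[0-9]*)?$");

        static String moduleNameFromClassName(String canonicalName) {
            Matcher m = JYTHON_PROXY.matcher(canonicalName);
            return m.matches() ? m.group(2) : null; // null => Java module
        }
    }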
@@ -315,13 +315,12 @@ final class IngestJobPipeline {
     }

     /**
-     * Creates the child ingest task pipelines for this ingest pipeline.
+     * Creates the child ingest module pipelines for this ingest pipeline.
      *
      * @throws InterruptedException Exception thrown if the thread in which the
-     *                              task pipelines are being created is
-     *                              interrupted.
+     *                              pipeline is being created is interrupted.
      */
-    private void createIngestTaskPipelines() throws InterruptedException {
+    private void createIngestModulePipelines() throws InterruptedException {
         /*
          * Get the enabled ingest module templates from the ingest job settings.
          * An ingest module template combines an ingest module factory with job
@@ -352,13 +351,13 @@ final class IngestJobPipeline {
         Map<String, IngestModuleTemplate> jythonArtifactModuleTemplates = new LinkedHashMap<>();
         for (IngestModuleTemplate template : enabledTemplates) {
             if (template.isDataSourceIngestModuleTemplate()) {
-                addIngestModuleTemplateToMaps(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, template);
+                addModuleTemplateToImplLangMap(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, template);
             }
             if (template.isFileIngestModuleTemplate()) {
-                addIngestModuleTemplateToMaps(javaFileModuleTemplates, jythonFileModuleTemplates, template);
+                addModuleTemplateToImplLangMap(javaFileModuleTemplates, jythonFileModuleTemplates, template);
             }
             if (template.isDataArtifactIngestModuleTemplate()) {
-                addIngestModuleTemplateToMaps(javaArtifactModuleTemplates, jythonArtifactModuleTemplates, template);
+                addModuleTemplateToImplLangMap(javaArtifactModuleTemplates, jythonArtifactModuleTemplates, template);
             }
         }
@@ -370,9 +369,9 @@ final class IngestJobPipeline {
          * source level and file ingest module pipeline layouts.
          */
         IngestPipelinesConfiguration pipelineConfig = IngestPipelinesConfiguration.getInstance();
-        List<IngestModuleTemplate> firstStageDataSourceModuleTemplates = createPipelineFromConfigFile(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageOneDataSourceIngestPipelineConfig());
-        List<IngestModuleTemplate> secondStageDataSourceModuleTemplates = createPipelineFromConfigFile(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageTwoDataSourceIngestPipelineConfig());
-        List<IngestModuleTemplate> fileIngestModuleTemplates = createPipelineFromConfigFile(javaFileModuleTemplates, jythonFileModuleTemplates, pipelineConfig.getFileIngestPipelineConfig());
+        List<IngestModuleTemplate> firstStageDataSourceModuleTemplates = addConfiguredIngestModuleTemplates(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageOneDataSourceIngestPipelineConfig());
+        List<IngestModuleTemplate> secondStageDataSourceModuleTemplates = addConfiguredIngestModuleTemplates(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageTwoDataSourceIngestPipelineConfig());
+        List<IngestModuleTemplate> fileIngestModuleTemplates = addConfiguredIngestModuleTemplates(javaFileModuleTemplates, jythonFileModuleTemplates, pipelineConfig.getFileIngestPipelineConfig());
         List<IngestModuleTemplate> artifactModuleTemplates = new ArrayList<>();

         /**
@@ -383,9 +382,9 @@ final class IngestJobPipeline {
          * modules, and Core Autopsy modules are added before third party
          * modules.
          */
-        completePipeline(firstStageDataSourceModuleTemplates, javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates);
-        completePipeline(fileIngestModuleTemplates, javaFileModuleTemplates, jythonFileModuleTemplates);
-        completePipeline(artifactModuleTemplates, javaArtifactModuleTemplates, jythonArtifactModuleTemplates);
+        sortModuleTemplates(firstStageDataSourceModuleTemplates, javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates);
+        sortModuleTemplates(fileIngestModuleTemplates, javaFileModuleTemplates, jythonFileModuleTemplates);
+        sortModuleTemplates(artifactModuleTemplates, javaArtifactModuleTemplates, jythonArtifactModuleTemplates);

         /**
          * Construct the actual ingest task pipelines from the ordered lists.
@@ -422,7 +421,7 @@ final class IngestJobPipeline {
      * @return An ordered list of ingest module templates, i.e., an
      *         uninstantiated pipeline.
      */
-    private static List<IngestModuleTemplate> createPipelineFromConfigFile(Map<String, IngestModuleTemplate> javaIngestModuleTemplates, Map<String, IngestModuleTemplate> jythonIngestModuleTemplates, List<String> pipelineConfig) {
+    private static List<IngestModuleTemplate> addConfiguredIngestModuleTemplates(Map<String, IngestModuleTemplate> javaIngestModuleTemplates, Map<String, IngestModuleTemplate> jythonIngestModuleTemplates, List<String> pipelineConfig) {
         List<IngestModuleTemplate> templates = new ArrayList<>();
         for (String moduleClassName : pipelineConfig) {
             if (javaIngestModuleTemplates.containsKey(moduleClassName)) {
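addConfiguredIngestModuleTemplates() pairs with sortModuleTemplates(): templates named in the pipeline configuration file are pulled out of the input maps in config order, and whatever is left over is appended afterwards by sortModuleTemplates(). Only the start of the loop appears above; the Map.remove() step in this sketch is an assumption based on that division of labor:

    import java.util.*;

    // Sketch of the remove-and-collect pattern behind
    // addConfiguredIngestModuleTemplates().
    final class PipelineConfigDemo {
        static <T> List<T> takeConfigured(Map<String, T> javaTemplates,
                Map<String, T> jythonTemplates, List<String> pipelineConfig) {
            List<T> templates = new ArrayList<>();
            for (String moduleClassName : pipelineConfig) {
                if (javaTemplates.containsKey(moduleClassName)) {
                    templates.add(javaTemplates.remove(moduleClassName));
                } else if (jythonTemplates.containsKey(moduleClassName)) {
                    templates.add(jythonTemplates.remove(moduleClassName));
                }
            }
            return templates; // leftovers in the maps get sorted in later
        }
    }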
@@ -559,12 +558,12 @@ final class IngestJobPipeline {
      * @return A collection of ingest module startup errors, empty on success.
      */
     List<IngestModuleError> startUp() {
-        List<IngestModuleError> errors = startUpIngestTaskPipelines();
+        List<IngestModuleError> errors = startUpIngestModulePipelines();
         if (errors.isEmpty()) {
             recordIngestJobStartUpInfo();
             if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) {
                 if (parentJob.getIngestMode() == IngestJob.Mode.STREAMING) {
-                    startFirstStageFilesOnly();
+                    startFileStreaming();
                 } else {
                     startFirstStage();
                 }
@@ -626,7 +625,7 @@ final class IngestJobPipeline {
     }

     /**
-     * Starts up each of the child ingest task pipelines in this ingest
+     * Starts up each of the child ingest module pipelines in this ingest
      * pipeline.
      *
      * Note that all of the child pipelines are started so that any and all
@@ -638,12 +637,12 @@ final class IngestJobPipeline {
      *
      * @return A list of ingest module startup errors, empty on success.
      */
-    private List<IngestModuleError> startUpIngestTaskPipelines() {
+    private List<IngestModuleError> startUpIngestModulePipelines() {
         List<IngestModuleError> errors = new ArrayList<>();
-        errors.addAll(startUpIngestTaskPipeline(firstStageDataSourceIngestPipeline));
-        errors.addAll(startUpIngestTaskPipeline(secondStageDataSourceIngestPipeline));
+        errors.addAll(startUpIngestModulePipeline(firstStageDataSourceIngestPipeline));
+        errors.addAll(startUpIngestModulePipeline(secondStageDataSourceIngestPipeline));
         for (FileIngestPipeline pipeline : fileIngestPipelines) {
-            List<IngestModuleError> filePipelineErrors = startUpIngestTaskPipeline(pipeline);
+            List<IngestModuleError> filePipelineErrors = startUpIngestModulePipeline(pipeline);
             if (!filePipelineErrors.isEmpty()) {
                 /*
                  * If one file pipeline copy can't start up, assume that none of
@@ -653,19 +652,19 @@ final class IngestJobPipeline {
                 break;
             }
         }
-        errors.addAll(startUpIngestTaskPipeline(artifactIngestPipeline));
+        errors.addAll(startUpIngestModulePipeline(artifactIngestPipeline));
         return errors;
     }

     /**
-     * Starts up an ingest task pipeline. If there are any start up errors, the
-     * pipeline is immediately shut down.
+     * Starts up an ingest module pipeline. If there are any start up errors,
+     * the pipeline is immediately shut down.
      *
      * @param pipeline The ingest task pipeline to start up.
      *
      * @return A list of ingest module startup errors, empty on success.
      */
-    private List<IngestModuleError> startUpIngestTaskPipeline(IngestTaskPipeline<?> pipeline) {
+    private List<IngestModuleError> startUpIngestModulePipeline(IngestTaskPipeline<?> pipeline) {
         List<IngestModuleError> startUpErrors = pipeline.startUp();
         if (!startUpErrors.isEmpty()) {
             List<IngestModuleError> shutDownErrors = pipeline.shutDown();
@@ -682,6 +681,9 @@ final class IngestJobPipeline {
      * already been added to the case database by the data source processor.
      */
     private void startFirstStage() {
+        logInfoMessage("Starting first stage analysis in batch mode"); //NON-NLS
+        stage = Stages.FIRST_STAGE;
+
         /*
          * Do a count of the files the data source processor has added to the
          * case database. This estimate will be used for ingest progress
@@ -710,17 +712,19 @@ final class IngestJobPipeline {
             }
         }

-        logInfoMessage("Starting first stage analysis in batch mode"); //NON-NLS
-        stage = Stages.FIRST_STAGE;
+        /*
+         * Make the first stage data source level ingest pipeline the current
+         * data source level pipeline.
+         */
         currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;

-        /**
+        /*
          * Schedule the first stage ingest tasks and then immediately check for
          * stage completion. This is necessary because it is possible that zero
          * tasks will actually make it to task execution due to the file filter
          * or other ingest job settings. In that case, there will never be a
          * stage completion check in an ingest thread executing an ingest task,
-         * so such a job would run forever without the check here.
+         * so such a job would run forever without a check here.
          */
         if (!files.isEmpty() && hasFileIngestModules()) {
             taskScheduler.scheduleFileIngestTasks(this, files);
@@ -733,11 +737,13 @@ final class IngestJobPipeline {
     /**
      * Starts the first stage of this pipeline in streaming mode. In streaming
      * mode, the data source processor streams files into the pipeline as it
-     * adds them to the case database and only adds the data source to the
-     * pipeline after all of the files have been streamed in. See
-     * addStreamingIngestFiles() and addStreamingIngestDataSource().
+     * adds them to the case database and file level analysis can begin before
+     * data source level analysis.
      */
-    private void startFirstStageFilesOnly() {
+    private void startFileStreaming() {
+        logInfoMessage("Starting first stage analysis in streaming mode"); //NON-NLS
+        stage = Stages.FILE_STREAMING;
+
         if (doUI) {
             /*
              * If running with a GUI, start ingest progress bars in the lower
@@ -746,9 +752,9 @@ final class IngestJobPipeline {
             if (hasFileIngestModules()) {
                 /*
                  * Note that because estimated files remaining to process still
-                 * has its initial value of zero since files are still be added
-                 * to the case database, the progress bar will start in the
-                 * "indeterminate" state.
+                 * has its initial value of zero, the progress bar will start in
+                 * the "indeterminate" state. An estimate of the files to
+                 * process can be computed in
                  */
                 startFileIngestProgressBar();
             }
@@ -757,8 +763,6 @@ final class IngestJobPipeline {
             }
         }

-        logInfoMessage("Starting first stage analysis in streaming mode"); //NON-NLS
-        stage = Stages.FIRST_STAGE_FILES_ONLY;
         if (hasDataArtifactIngestModules()) {
             /*
              * Schedule artifact ingest tasks for any artifacts currently in the
@@ -772,10 +776,14 @@ final class IngestJobPipeline {
     }

     /**
-     * Start data source ingest. Used for streaming ingest when the data source
-     * is not ready when ingest starts.
+     * Notifies the ingest pipeline running in streaming mode that the data
+     * source is now ready for analysis.
      */
-    void notifyFileStreamingCompleted() {
+    void notifyDataSourceReady() {
+        logInfoMessage("Starting full first stage analysis in streaming mode"); //NON-NLS
+        stage = IngestJobPipeline.Stages.FIRST_STAGE;
+        currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
+
         /*
          * Do a count of the files the data source processor has added to the
          * case database. This estimate will be used for ingest progress
@@ -788,20 +796,13 @@ final class IngestJobPipeline {
             estimatedFilesToProcess = filesToProcess;
         }

-        /*
-         * If running with a GUI, start ingest progress bars in the lower right
-         * hand corner of the main application window.
-         */
         if (doUI) {
             if (hasFirstStageDataSourceIngestModules()) {
                 startDataSourceIngestProgressBar();
             }
         }

-        logInfoMessage("Adding the data source in streaming mode"); //NON-NLS
-        stage = IngestJobPipeline.Stages.FIRST_STAGE;
         currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
         if (hasFirstStageDataSourceIngestModules()) {
             IngestJobPipeline.taskScheduler.scheduleDataSourceIngestTask(this);
         } else {
@@ -810,7 +811,7 @@ final class IngestJobPipeline {
              * all of the file level and artifact ingest tasks scheduled when
              * streaming began have already executed, there will never be a
              * stage completion check in an ingest thread executing an ingest
-             * task, so such a job would run forever without the check here.
+             * task, so such a job would run forever without a check here.
              */
            checkForStageCompleted();
        }
@@ -821,11 +822,13 @@ final class IngestJobPipeline {
      */
     private void startSecondStage() {
         if (!cancelled && hasSecondStageDataSourceIngestModules()) {
+            logInfoMessage(String.format("Starting second stage ingest task pipelines for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), parentJob.getId())); //NON-NLS
+            stage = IngestJobPipeline.Stages.SECOND_STAGE;
             if (doUI) {
                 startDataSourceIngestProgressBar();
             }
-            logInfoMessage(String.format("Starting second stage ingest task pipelines for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), parentJob.getId())); //NON-NLS
-            stage = IngestJobPipeline.Stages.SECOND_STAGE;
             currentDataSourceIngestPipeline = secondStageDataSourceIngestPipeline;
             taskScheduler.scheduleDataSourceIngestTask(this);
         }
@@ -917,7 +920,7 @@ final class IngestJobPipeline {
      * completed and does a stage transition if they are.
      */
     private void checkForStageCompleted() {
-        if (stage == Stages.FIRST_STAGE_FILES_ONLY) {
+        if (stage == Stages.FILE_STREAMING) {
             return;
         }
         if (taskScheduler.currentTasksAreCompleted(this)) {
@@ -939,14 +942,27 @@ final class IngestJobPipeline {
     private void finishFirstStage() {
         logInfoMessage("Finished first stage analysis"); //NON-NLS

-        shutDownIngestTaskPipeline(currentDataSourceIngestPipeline);
+        shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
         while (!fileIngestPipelinesQueue.isEmpty()) {
             FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll();
-            shutDownIngestTaskPipeline(pipeline);
+            shutDownIngestModulePipeline(pipeline);
         }

-        finishProgressBar(dataSourceIngestProgressBar, dataSourceIngestProgressLock);
-        finishProgressBar(fileIngestProgressBar, fileIngestProgressLock);
+        if (doUI) {
+            synchronized (dataSourceIngestProgressLock) {
+                if (dataSourceIngestProgressBar != null) {
+                    dataSourceIngestProgressBar.finish();
+                    dataSourceIngestProgressBar = null;
+                }
+            }
+            synchronized (fileIngestProgressLock) {
+                if (fileIngestProgressBar != null) {
+                    fileIngestProgressBar.finish();
+                    fileIngestProgressBar = null;
+                }
+            }
+        }

         if (!cancelled && hasSecondStageDataSourceIngestModules()) {
             startSecondStage();
@@ -956,19 +972,38 @@ final class IngestJobPipeline {
     }

     /**
-     * Shuts down the ingest pipelines and progress bars for this job.
+     * Shuts down the ingest module pipelines and progress bars for this job.
      */
     private void shutDown() {
         logInfoMessage("Finished all tasks"); //NON-NLS
         stage = IngestJobPipeline.Stages.FINALIZATION;

-        shutDownIngestTaskPipeline(currentDataSourceIngestPipeline);
-        shutDownIngestTaskPipeline(artifactIngestPipeline);
+        shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
+        shutDownIngestModulePipeline(artifactIngestPipeline);

-        finishProgressBar(dataSourceIngestProgressBar, dataSourceIngestProgressLock);
-        finishProgressBar(fileIngestProgressBar, fileIngestProgressLock);
-        finishProgressBar(artifactIngestProgressBar, artifactIngestProgressLock);
+        if (doUI) {
+            synchronized (dataSourceIngestProgressLock) {
+                if (dataSourceIngestProgressBar != null) {
+                    dataSourceIngestProgressBar.finish();
+                    dataSourceIngestProgressBar = null;
+                }
+            }
+            synchronized (fileIngestProgressLock) {
+                if (fileIngestProgressBar != null) {
+                    fileIngestProgressBar.finish();
+                    fileIngestProgressBar = null;
+                }
+            }
+            synchronized (artifactIngestProgressLock) {
+                if (artifactIngestProgressBar != null) {
+                    artifactIngestProgressBar.finish();
+                    artifactIngestProgressBar = null;
+                }
+            }
+        }

         if (ingestJobInfo != null) {
             if (cancelled) {
                 try {
@@ -998,7 +1033,7 @@ final class IngestJobPipeline {
      *
      * @param pipeline The pipeline.
      */
-    private <T extends IngestTask> void shutDownIngestTaskPipeline(IngestTaskPipeline<T> pipeline) {
+    private <T extends IngestTask> void shutDownIngestModulePipeline(IngestTaskPipeline<T> pipeline) {
         if (pipeline.isRunning()) {
             List<IngestModuleError> errors = new ArrayList<>();
             errors.addAll(pipeline.shutDown());
@@ -1008,23 +1043,6 @@ final class IngestJobPipeline {
         }
     }

-    /**
-     * Finishes a progress bar.
-     *
-     * @param progress The progress bar.
-     * @param lock     The lock that guards the progress bar.
-     */
-    private void finishProgressBar(ProgressHandle progress, Object lock) {
-        if (doUI) {
-            synchronized (lock) {
-                if (progress != null) {
-                    progress.finish();
-                    progress = null;
-                }
-            }
-        }
-    }
-
     /**
      * Passes the data source for the ingest job through the currently active
      * data source level ingest task pipeline (first stage or second stage data
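The removal of finishProgressBar() is more than the inlining it looks like, and may be why this commit expands it in finishFirstStage() and shutDown(). Java passes object references by value, so the helper's progress = null only cleared its local parameter; the ProgressHandle fields themselves were never nulled, which the inlined replacements now do explicitly. A minimal demonstration of the language rule, with illustrative names:

    // Why the removed helper's "progress = null" had no effect on the
    // caller's field: the assignment reassigns the parameter, never the
    // field the argument came from.
    public final class PassByValueDemo {
        private static String field = "stale reference";

        private static void clear(String param) {
            param = null; // changes the local copy only
        }

        public static void main(String[] args) {
            clear(field);
            System.out.println(field); // still prints "stale reference"
        }
    }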
@@ -1147,14 +1165,14 @@ final class IngestJobPipeline {
     }

     /**
-     * Adds some subset of the "streamed" files for a streaming ingest job to
-     * this pipeline after startUp() has been called.
+     * Adds some subset of the streamed files for a streaming mode ingest job to
+     * this pipeline.
      *
      * @param fileObjIds The object IDs of the files.
      */
-    void addStreamingIngestFiles(List<Long> fileObjIds) {
+    void addStreamedFiles(List<Long> fileObjIds) {
         if (hasFileIngestModules()) {
-            if (stage.equals(Stages.FIRST_STAGE_FILES_ONLY)) {
+            if (stage.equals(Stages.FILE_STREAMING)) {
                 IngestJobPipeline.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds);
             } else {
                 logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported");
@@ -1170,7 +1188,7 @@ final class IngestJobPipeline {
      * @param files A list of the files to add.
      */
     void addFiles(List<AbstractFile> files) {
-        if (stage.equals(Stages.FIRST_STAGE_FILES_ONLY)
+        if (stage.equals(Stages.FILE_STREAMING)
                 || stage.equals(Stages.FIRST_STAGE)) {
             taskScheduler.fastTrackFileIngestTasks(this, files);
         } else {
@@ -1195,7 +1213,7 @@ final class IngestJobPipeline {
      */
     void addDataArtifacts(List<DataArtifact> artifacts) {
         List<DataArtifact> artifactsToAnalyze = new ArrayList<>(artifacts);
-        if (stage.equals(Stages.FIRST_STAGE_FILES_ONLY)
+        if (stage.equals(Stages.FILE_STREAMING)
                 || stage.equals(Stages.FIRST_STAGE)
                 || stage.equals(Stages.SECOND_STAGE)) {
             taskScheduler.scheduleDataArtifactIngestTasks(this, artifactsToAnalyze);