From 85ab8d7c3d7b0ab1f7938e55d80325eecdfbc114 Mon Sep 17 00:00:00 2001
From: Richard Cordovano
Date: Thu, 3 Jun 2021 14:57:49 -0400
Subject: [PATCH] 7332 artifact pipeline work

---
 .../sleuthkit/autopsy/ingest/IngestJob.java   |   4 +-
 .../autopsy/ingest/IngestJobPipeline.java     | 248 ++++++++++--------
 2 files changed, 135 insertions(+), 117 deletions(-)

diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
index a82c3878b0..9793a57cf1 100644
--- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
+++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java
@@ -161,7 +161,7 @@ public final class IngestJob {
         }
         // Streaming ingest jobs will only have one data source
         IngestJobPipeline streamingIngestPipeline = ingestJobPipelines.values().iterator().next();
-        streamingIngestPipeline.addStreamingIngestFiles(fileObjIds);
+        streamingIngestPipeline.addStreamedFiles(fileObjIds);
     }
 
     /**
@@ -174,7 +174,7 @@ public final class IngestJob {
         }
         // Streaming ingest jobs will only have one data source
         IngestJobPipeline streamingIngestPipeline = ingestJobPipelines.values().iterator().next();
-        streamingIngestPipeline.notifyFileStreamingCompleted();
+        streamingIngestPipeline.notifyDataSourceReady();
     }
 
     /**
diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobPipeline.java
index d8084ceb9d..c0715c6658 100644
--- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobPipeline.java
+++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobPipeline.java
@@ -56,8 +56,8 @@ import org.sleuthkit.datamodel.DataSource;
 
 /**
  * A pipeline of ingest modules for analyzing one of the data sources in an
- * ingest job. The ingest modules are actually organized into child pipelines by
- * ingest module type and are run in stages.
+ * ingest job. The ingest modules are organized into child pipelines by ingest
+ * module type and are run in stages.
  */
 final class IngestJobPipeline {
 
@@ -101,10 +101,11 @@ final class IngestJobPipeline {
         INITIALIZATION,
         /*
          * The pipeline is running file ingest modules on files streamed to it
-         * by a data source processor. The data source has not been added to the
-         * pipeline yet.
+         * by a data source processor and data artifact ingest modules on
+         * artifacts generated by the analysis of the streamed files. The data
+         * source has not been added to the pipeline yet.
          */
-        FIRST_STAGE_FILES_ONLY,
+        FILE_STREAMING,
         /*
          * The pipeline is running the following three types of ingest modules:
          * higher priority data source level ingest modules, file ingest
@@ -203,26 +204,26 @@ final class IngestJobPipeline {
 
     /**
      * Constructs a pipeline of ingest modules for analyzing one of the data
-     * sources in an ingest job. The ingest modules are actually organized into
-     * child pipelines by ingest module type and are run in stages.
+     * sources in an ingest job. The ingest modules are organized into child
+     * pipelines by ingest module type and are run in stages.
      *
-     * @param job        The ingest job.
+     * @param parentJob  The ingest job.
      * @param dataSource The data source.
      * @param settings   The ingest job settings.
      *
      * @throws InterruptedException Exception thrown if the thread in which the
      *                              pipeline is being created is interrupted.
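+     *
+     * A rough usage sketch (hypothetical caller code, for illustration only;
+     * in practice these pipelines are created and managed by IngestJob, and
+     * the single-argument IngestJobSettings constructor shown here is an
+     * assumption):
+     *
+     * <pre>{@code
+     * IngestJobSettings settings = new IngestJobSettings("my-context");
+     * IngestJobPipeline pipeline = new IngestJobPipeline(job, dataSource, settings);
+     * List<IngestModuleError> errors = pipeline.startUp(); // schedules first stage tasks on success
+     * }</pre>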
      */
-    IngestJobPipeline(IngestJob job, Content dataSource, IngestJobSettings settings) throws InterruptedException {
-        this(job, dataSource, Collections.emptyList(), settings);
+    IngestJobPipeline(IngestJob parentJob, Content dataSource, IngestJobSettings settings) throws InterruptedException {
+        this(parentJob, dataSource, Collections.emptyList(), settings);
     }
 
     /**
      * Constructs a pipeline of ingest modules for analyzing one of the data
-     * sources in an ingest job. The ingest modules are actually organized into
-     * child pipelines by ingest module type and are run in stages.
+     * sources in an ingest job. The ingest modules are organized into child
+     * pipelines by ingest module type and are run in stages.
      *
-     * @param job        The ingest job.
+     * @param parentJob  The ingest job.
      * @param dataSource The data source.
      * @param files      A subset of the files from the data source. If the list
      *                   is empty, ALL of the files in the data source are
      *                   analyzed.
      * @param settings   The ingest job settings.
      *
      * @throws InterruptedException Exception thrown if the thread in which the
      *                              pipeline is being created is interrupted.
      */
-    IngestJobPipeline(IngestJob job, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) throws InterruptedException {
+    IngestJobPipeline(IngestJob parentJob, Content dataSource, List<AbstractFile> files, IngestJobSettings settings) throws InterruptedException {
         if (!(dataSource instanceof DataSource)) {
             throw new IllegalArgumentException("Passed dataSource that does not implement the DataSource interface"); //NON-NLS
         }
-        parentJob = job;
+        this.parentJob = parentJob;
         pipelineId = IngestJobPipeline.nextPipelineId.getAndIncrement();
         this.dataSource = (DataSource) dataSource;
         this.files = new ArrayList<>();
@@ -245,22 +246,21 @@ final class IngestJobPipeline {
         doUI = RuntimeProperties.runningWithGUI();
         createTime = new Date().getTime();
         stage = Stages.INITIALIZATION;
-        createIngestTaskPipelines();
+        createIngestModulePipelines();
     }
 
     /**
-     * Adds ingest module templates to an output list with core Autopsy modules
-     * first and third party modules next.
+     * Sorts ingest module templates so that core Autopsy ingest modules come
+     * before third party ingest modules and ingest modules implemented using
+     * Java come before ingest modules implemented using Jython.
      *
-     * @param orderedModules The list to populate.
-     * @param javaModules    A map of the input ingest module templates for
-     *                       modules implemented using Java, keyed by
-     *                       fully-qualified (canonical) class name.
-     * @param jythonModules  A map of the input ingest module templates for
-     *                       modules implemented using Jython, keyed by
-     *                       fully-qualified (canonical) class name.
+     * @param sortedModules The output list to hold the sorted modules.
+     * @param javaModules   The input ingest module templates for modules
+     *                      implemented using Java.
+     * @param jythonModules The ingest module templates for modules implemented
+     *                      using Jython.
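+     *
+     * For illustration, with hypothetical factory class names, if javaModules
+     * contains {org.sleuthkit.autopsy.FooFactory, com.example.BarFactory} and
+     * jythonModules contains {org.sleuthkit.autopsy.BazFactory}, the result
+     * appended to sortedModules is:
+     *
+     * <pre>{@code
+     * [FooFactory, BazFactory, BarFactory] // core modules first, Java before Jython
+     * }</pre>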
      */
-    private static void completePipeline(final List<IngestModuleTemplate> orderedModules, final Map<String, IngestModuleTemplate> javaModules, final Map<String, IngestModuleTemplate> jythonModules) {
+    private static void sortModuleTemplates(final List<IngestModuleTemplate> sortedModules, final Map<String, IngestModuleTemplate> javaModules, final Map<String, IngestModuleTemplate> jythonModules) {
         final List<IngestModuleTemplate> autopsyModules = new ArrayList<>();
         final List<IngestModuleTemplate> thirdPartyModules = new ArrayList<>();
         Stream.concat(javaModules.entrySet().stream(), jythonModules.entrySet().stream()).forEach((templateEntry) -> {
@@ -270,8 +270,8 @@ final class IngestJobPipeline {
                 thirdPartyModules.add(templateEntry.getValue());
             }
         });
-        orderedModules.addAll(autopsyModules);
-        orderedModules.addAll(thirdPartyModules);
+        sortedModules.addAll(autopsyModules);
+        sortedModules.addAll(thirdPartyModules);
     }
 
     /**
@@ -304,7 +304,7 @@ final class IngestJobPipeline {
      * @param jythonMapping Mapping for Jython ingest module templates.
      * @param template      The ingest module template.
      */
-    private static void addIngestModuleTemplateToMaps(Map<String, IngestModuleTemplate> mapping, Map<String, IngestModuleTemplate> jythonMapping, IngestModuleTemplate template) {
+    private static void addModuleTemplateToImplLangMap(Map<String, IngestModuleTemplate> mapping, Map<String, IngestModuleTemplate> jythonMapping, IngestModuleTemplate template) {
         String className = template.getModuleFactory().getClass().getCanonicalName();
         String jythonName = getModuleNameFromJythonClassName(className);
         if (jythonName != null) {
@@ -315,13 +315,12 @@ final class IngestJobPipeline {
     }
 
     /**
-     * Creates the child ingest task pipelines for this ingest pipeline.
+     * Creates the child ingest module pipelines for this ingest pipeline.
      *
      * @throws InterruptedException Exception thrown if the thread in which the
-     *                              task pipelines are being created is
-     *                              interrupted.
+     *                              pipeline is being created is interrupted.
      */
-    private void createIngestTaskPipelines() throws InterruptedException {
+    private void createIngestModulePipelines() throws InterruptedException {
         /*
         * Get the enabled ingest module templates from the ingest job settings.
         * An ingest module template combines an ingest module factory with job
@@ -352,13 +351,13 @@ final class IngestJobPipeline {
         Map<String, IngestModuleTemplate> jythonArtifactModuleTemplates = new LinkedHashMap<>();
         for (IngestModuleTemplate template : enabledTemplates) {
             if (template.isDataSourceIngestModuleTemplate()) {
-                addIngestModuleTemplateToMaps(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, template);
+                addModuleTemplateToImplLangMap(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, template);
             }
             if (template.isFileIngestModuleTemplate()) {
-                addIngestModuleTemplateToMaps(javaFileModuleTemplates, jythonFileModuleTemplates, template);
+                addModuleTemplateToImplLangMap(javaFileModuleTemplates, jythonFileModuleTemplates, template);
             }
             if (template.isDataArtifactIngestModuleTemplate()) {
-                addIngestModuleTemplateToMaps(javaArtifactModuleTemplates, jythonArtifactModuleTemplates, template);
+                addModuleTemplateToImplLangMap(javaArtifactModuleTemplates, jythonArtifactModuleTemplates, template);
             }
         }
 
@@ -370,9 +369,9 @@ final class IngestJobPipeline {
         * source level and file ingest module pipeline layouts.
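+         *
+         * For illustration (hypothetical entries), a pipeline configuration is
+         * simply an ordered list of ingest module factory class names, e.g.:
+         *
+         *   ["org.sleuthkit.autopsy.modules.filetypeid.FileTypeIdModuleFactory",
+         *    "com.example.CustomIngestModuleFactory"]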
         */
        IngestPipelinesConfiguration pipelineConfig = IngestPipelinesConfiguration.getInstance();
-        List<IngestModuleTemplate> firstStageDataSourceModuleTemplates = createPipelineFromConfigFile(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageOneDataSourceIngestPipelineConfig());
-        List<IngestModuleTemplate> secondStageDataSourceModuleTemplates = createPipelineFromConfigFile(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageTwoDataSourceIngestPipelineConfig());
-        List<IngestModuleTemplate> fileIngestModuleTemplates = createPipelineFromConfigFile(javaFileModuleTemplates, jythonFileModuleTemplates, pipelineConfig.getFileIngestPipelineConfig());
+        List<IngestModuleTemplate> firstStageDataSourceModuleTemplates = addConfiguredIngestModuleTemplates(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageOneDataSourceIngestPipelineConfig());
+        List<IngestModuleTemplate> secondStageDataSourceModuleTemplates = addConfiguredIngestModuleTemplates(javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates, pipelineConfig.getStageTwoDataSourceIngestPipelineConfig());
+        List<IngestModuleTemplate> fileIngestModuleTemplates = addConfiguredIngestModuleTemplates(javaFileModuleTemplates, jythonFileModuleTemplates, pipelineConfig.getFileIngestPipelineConfig());
         List<IngestModuleTemplate> artifactModuleTemplates = new ArrayList<>();
 
         /**
@@ -383,9 +382,9 @@ final class IngestJobPipeline {
         * modules, and Core Autopsy modules are added before third party
         * modules.
         */
-        completePipeline(firstStageDataSourceModuleTemplates, javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates);
-        completePipeline(fileIngestModuleTemplates, javaFileModuleTemplates, jythonFileModuleTemplates);
-        completePipeline(artifactModuleTemplates, javaArtifactModuleTemplates, jythonArtifactModuleTemplates);
+        sortModuleTemplates(firstStageDataSourceModuleTemplates, javaDataSourceModuleTemplates, jythonDataSourceModuleTemplates);
+        sortModuleTemplates(fileIngestModuleTemplates, javaFileModuleTemplates, jythonFileModuleTemplates);
+        sortModuleTemplates(artifactModuleTemplates, javaArtifactModuleTemplates, jythonArtifactModuleTemplates);
 
         /**
         * Construct the actual ingest task pipelines from the ordered lists.
@@ -422,7 +421,7 @@ final class IngestJobPipeline {
      * @return An ordered list of ingest module templates, i.e., an
      *         uninstantiated pipeline.
      */
-    private static List<IngestModuleTemplate> createPipelineFromConfigFile(Map<String, IngestModuleTemplate> javaIngestModuleTemplates, Map<String, IngestModuleTemplate> jythonIngestModuleTemplates, List<String> pipelineConfig) {
+    private static List<IngestModuleTemplate> addConfiguredIngestModuleTemplates(Map<String, IngestModuleTemplate> javaIngestModuleTemplates, Map<String, IngestModuleTemplate> jythonIngestModuleTemplates, List<String> pipelineConfig) {
         List<IngestModuleTemplate> templates = new ArrayList<>();
         for (String moduleClassName : pipelineConfig) {
             if (javaIngestModuleTemplates.containsKey(moduleClassName)) {
@@ -559,12 +558,12 @@ final class IngestJobPipeline {
      * @return A collection of ingest module startup errors, empty on success.
      */
     List<IngestModuleError> startUp() {
-        List<IngestModuleError> errors = startUpIngestTaskPipelines();
+        List<IngestModuleError> errors = startUpIngestModulePipelines();
         if (errors.isEmpty()) {
             recordIngestJobStartUpInfo();
             if (hasFirstStageDataSourceIngestModules() || hasFileIngestModules() || hasDataArtifactIngestModules()) {
                 if (parentJob.getIngestMode() == IngestJob.Mode.STREAMING) {
-                    startFirstStageFilesOnly();
+                    startFileStreaming();
                 } else {
                     startFirstStage();
                 }
@@ -626,7 +625,7 @@ final class IngestJobPipeline {
     }
 
     /**
-     * Starts up each of the child ingest task pipelines in this ingest
+     * Starts up each of the child ingest module pipelines in this ingest
      * pipeline.
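+     *
+     * The child pipelines are the two data source level ingest module
+     * pipelines (first and second stage), all of the file ingest module
+     * pipeline copies, and the data artifact ingest module pipeline.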
      *
      * Note that all of the child pipelines are started so that any and all
      * start up errors can be returned to the caller. It is important to capture
      * all of the errors, because the ingest job will be automatically cancelled
      * and the errors will be reported to the user so either the issues can be
      * addressed or the modules that can't start up can be disabled before the
      * ingest job is attempted again.
      *
      * @return A list of ingest module startup errors, empty on success.
      */
-    private List<IngestModuleError> startUpIngestTaskPipelines() {
+    private List<IngestModuleError> startUpIngestModulePipelines() {
         List<IngestModuleError> errors = new ArrayList<>();
-        errors.addAll(startUpIngestTaskPipeline(firstStageDataSourceIngestPipeline));
-        errors.addAll(startUpIngestTaskPipeline(secondStageDataSourceIngestPipeline));
+        errors.addAll(startUpIngestModulePipeline(firstStageDataSourceIngestPipeline));
+        errors.addAll(startUpIngestModulePipeline(secondStageDataSourceIngestPipeline));
         for (FileIngestPipeline pipeline : fileIngestPipelines) {
-            List<IngestModuleError> filePipelineErrors = startUpIngestTaskPipeline(pipeline);
+            List<IngestModuleError> filePipelineErrors = startUpIngestModulePipeline(pipeline);
             if (!filePipelineErrors.isEmpty()) {
                 /*
                  * If one file pipeline copy can't start up, assume that none of
                  * the other copies will be able to and return.
                  */
                 errors.addAll(filePipelineErrors);
                 break;
             }
         }
-        errors.addAll(startUpIngestTaskPipeline(artifactIngestPipeline));
+        errors.addAll(startUpIngestModulePipeline(artifactIngestPipeline));
         return errors;
     }
 
     /**
-     * Starts up an ingest task pipeline. If there are any start up errors, the
-     * pipeline is immediately shut down.
+     * Starts up an ingest module pipeline. If there are any start up errors,
+     * the pipeline is immediately shut down.
      *
      * @param pipeline The ingest task pipeline to start up.
      *
      * @return A list of ingest module startup errors, empty on success.
      */
-    private List<IngestModuleError> startUpIngestTaskPipeline(IngestTaskPipeline<?> pipeline) {
+    private List<IngestModuleError> startUpIngestModulePipeline(IngestTaskPipeline<?> pipeline) {
         List<IngestModuleError> startUpErrors = pipeline.startUp();
         if (!startUpErrors.isEmpty()) {
             List<IngestModuleError> shutDownErrors = pipeline.shutDown();
@@ -682,6 +681,9 @@ final class IngestJobPipeline {
      * already been added to the case database by the data source processor.
      */
     private void startFirstStage() {
+        logInfoMessage("Starting first stage analysis in batch mode"); //NON-NLS
+        stage = Stages.FIRST_STAGE;
+
         /*
         * Do a count of the files the data source processor has added to the
         * case database. This estimate will be used for ingest progress
@@ -710,17 +712,19 @@ final class IngestJobPipeline {
             }
         }
 
-        logInfoMessage("Starting first stage analysis in batch mode"); //NON-NLS
-        stage = Stages.FIRST_STAGE;
+        /*
+         * Make the first stage data source level ingest pipeline the current
+         * data source level pipeline.
+         */
         currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
 
-        /**
+        /*
         * Schedule the first stage ingest tasks and then immediately check for
         * stage completion. This is necessary because it is possible that zero
         * tasks will actually make it to task execution due to the file filter
         * or other ingest job settings. In that case, there will never be a
         * stage completion check in an ingest thread executing an ingest task,
-         * so such a job would run forever without the check here.
+         * so such a job would run forever without a check here.
         */
         if (!files.isEmpty() && hasFileIngestModules()) {
             taskScheduler.scheduleFileIngestTasks(this, files);
@@ -733,11 +737,13 @@ final class IngestJobPipeline {
     /**
      * Starts the first stage of this pipeline in streaming mode. In streaming
      * mode, the data source processor streams files into the pipeline as it
-     * adds them to the case database and only adds the data source to the
-     * pipeline after all of the files have been streamed in. See
-     * addStreamingIngestFiles() and addStreamingIngestDataSource().
+     * adds them to the case database and file level analysis can begin before
+     * data source level analysis.
      */
-    private void startFirstStageFilesOnly() {
+    private void startFileStreaming() {
+        logInfoMessage("Starting first stage analysis in streaming mode"); //NON-NLS
+        stage = Stages.FILE_STREAMING;
+
         if (doUI) {
             /*
             * If running with a GUI, start ingest progress bars in the lower
@@ -746,9 +752,9 @@ final class IngestJobPipeline {
             if (hasFileIngestModules()) {
                 /*
                 * Note that because estimated files remaining to process still
-                 * has its initial value of zero since files are still be added
-                 * to the case database, the progress bar will start in the
-                 * "indeterminate" state.
+                 * has its initial value of zero, the progress bar will start in
+                 * the "indeterminate" state. An estimate of the files to
+                 * process can be computed in notifyDataSourceReady().
                 */
                startFileIngestProgressBar();
             }
@@ -757,8 +763,6 @@ final class IngestJobPipeline {
             }
         }
 
-        logInfoMessage("Starting first stage analysis in streaming mode"); //NON-NLS
-        stage = Stages.FIRST_STAGE_FILES_ONLY;
         if (hasDataArtifactIngestModules()) {
             /*
             * Schedule artifact ingest tasks for any artifacts currently in the
@@ -772,10 +776,14 @@ final class IngestJobPipeline {
     }
 
     /**
-     * Start data source ingest. Used for streaming ingest when the data source
-     * is not ready when ingest starts.
+     * Notifies the ingest pipeline running in streaming mode that the data
+     * source is now ready for analysis.
      */
-    void notifyFileStreamingCompleted() {
+    void notifyDataSourceReady() {
+        logInfoMessage("Starting full first stage analysis in streaming mode"); //NON-NLS
+        stage = IngestJobPipeline.Stages.FIRST_STAGE;
+        currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
+
         /*
         * Do a count of the files the data source processor has added to the
        * case database. This estimate will be used for ingest progress
@@ -788,20 +796,13 @@ final class IngestJobPipeline {
             estimatedFilesToProcess = filesToProcess;
         }
 
-        /*
-         * If running with a GUI, start ingest progress bars in the lower right
-         * hand corner of the main application window.
-         */
         if (doUI) {
             if (hasFirstStageDataSourceIngestModules()) {
                 startDataSourceIngestProgressBar();
             }
         }
 
-        logInfoMessage("Adding the data source in streaming mode"); //NON-NLS
-        stage = IngestJobPipeline.Stages.FIRST_STAGE;
         currentDataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
-
         if (hasFirstStageDataSourceIngestModules()) {
             IngestJobPipeline.taskScheduler.scheduleDataSourceIngestTask(this);
         } else {
@@ -810,7 +811,7 @@ final class IngestJobPipeline {
             * all of the file level and artifact ingest tasks scheduled when
             * streaming began have already executed, there will never be a
             * stage completion check in an ingest thread executing an ingest
-             * task, so such a job would run forever without the check here.
+             * task, so such a job would run forever without a check here.
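+             * (startFirstStage() makes the same check at the end of first
+             * stage start up for batch mode jobs, for the same reason.)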
             */
            checkForStageCompleted();
        }
    }
 
     /**
@@ -821,11 +822,13 @@ final class IngestJobPipeline {
      */
     private void startSecondStage() {
         if (!cancelled && hasSecondStageDataSourceIngestModules()) {
+            logInfoMessage(String.format("Starting second stage ingest task pipelines for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), parentJob.getId())); //NON-NLS
+            stage = IngestJobPipeline.Stages.SECOND_STAGE;
+
             if (doUI) {
                 startDataSourceIngestProgressBar();
             }
-            logInfoMessage(String.format("Starting second stage ingest task pipelines for %s (objID=%d, jobID=%d)", dataSource.getName(), dataSource.getId(), parentJob.getId())); //NON-NLS
-            stage = IngestJobPipeline.Stages.SECOND_STAGE;
+
             currentDataSourceIngestPipeline = secondStageDataSourceIngestPipeline;
             taskScheduler.scheduleDataSourceIngestTask(this);
         }
@@ -917,7 +920,7 @@ final class IngestJobPipeline {
      * completed and does a stage transition if they are.
      */
     private void checkForStageCompleted() {
-        if (stage == Stages.FIRST_STAGE_FILES_ONLY) {
+        if (stage == Stages.FILE_STREAMING) {
             return;
         }
         if (taskScheduler.currentTasksAreCompleted(this)) {
@@ -939,14 +942,27 @@ final class IngestJobPipeline {
     private void finishFirstStage() {
         logInfoMessage("Finished first stage analysis"); //NON-NLS
 
-        shutDownIngestTaskPipeline(currentDataSourceIngestPipeline);
+        shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
         while (!fileIngestPipelinesQueue.isEmpty()) {
             FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll();
-            shutDownIngestTaskPipeline(pipeline);
+            shutDownIngestModulePipeline(pipeline);
         }
 
-        finishProgressBar(dataSourceIngestProgressBar, dataSourceIngestProgressLock);
-        finishProgressBar(fileIngestProgressBar, fileIngestProgressLock);
+        if (doUI) {
+            synchronized (dataSourceIngestProgressLock) {
+                if (dataSourceIngestProgressBar != null) {
+                    dataSourceIngestProgressBar.finish();
+                    dataSourceIngestProgressBar = null;
+                }
+            }
+
+            synchronized (fileIngestProgressLock) {
+                if (fileIngestProgressBar != null) {
+                    fileIngestProgressBar.finish();
+                    fileIngestProgressBar = null;
+                }
+            }
+        }
 
         if (!cancelled && hasSecondStageDataSourceIngestModules()) {
             startSecondStage();
@@ -956,19 +972,38 @@ final class IngestJobPipeline {
     }
 
     /**
-     * Shuts down the ingest pipelines and progress bars for this job.
+     * Shuts down the ingest module pipelines and progress bars for this job.
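+     * Note that the file ingest module pipeline copies are not shut down
+     * here; they are shut down in finishFirstStage(), since file analysis
+     * does not continue into the second stage.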
      */
     private void shutDown() {
         logInfoMessage("Finished all tasks"); //NON-NLS
         stage = IngestJobPipeline.Stages.FINALIZATION;
 
-        shutDownIngestTaskPipeline(currentDataSourceIngestPipeline);
-        shutDownIngestTaskPipeline(artifactIngestPipeline);
+        shutDownIngestModulePipeline(currentDataSourceIngestPipeline);
+        shutDownIngestModulePipeline(artifactIngestPipeline);
 
-        finishProgressBar(dataSourceIngestProgressBar, dataSourceIngestProgressLock);
-        finishProgressBar(fileIngestProgressBar, fileIngestProgressLock);
-        finishProgressBar(artifactIngestProgressBar, artifactIngestProgressLock);
+        if (doUI) {
+            synchronized (dataSourceIngestProgressLock) {
+                if (dataSourceIngestProgressBar != null) {
+                    dataSourceIngestProgressBar.finish();
+                    dataSourceIngestProgressBar = null;
+                }
+            }
+
+            synchronized (fileIngestProgressLock) {
+                if (fileIngestProgressBar != null) {
+                    fileIngestProgressBar.finish();
+                    fileIngestProgressBar = null;
+                }
+            }
+
+            synchronized (artifactIngestProgressLock) {
+                if (artifactIngestProgressBar != null) {
+                    artifactIngestProgressBar.finish();
+                    artifactIngestProgressBar = null;
+                }
+            }
+        }
 
         if (ingestJobInfo != null) {
             if (cancelled) {
                 try {
@@ -998,7 +1033,7 @@ final class IngestJobPipeline {
      *
      * @param pipeline The pipeline.
      */
-    private void shutDownIngestTaskPipeline(IngestTaskPipeline<?> pipeline) {
+    private void shutDownIngestModulePipeline(IngestTaskPipeline<?> pipeline) {
         if (pipeline.isRunning()) {
             List<IngestModuleError> errors = new ArrayList<>();
             errors.addAll(pipeline.shutDown());
             if (!errors.isEmpty()) {
                 logIngestModuleErrors(errors);
             }
         }
     }
 
-    /**
-     * Finishes a progress bar.
-     *
-     * @param progress The progress bar.
-     * @param lock     The lock that guards the progress bar.
-     */
-    private void finishProgressBar(ProgressHandle progress, Object lock) {
-        if (doUI) {
-            synchronized (lock) {
-                if (progress != null) {
-                    progress.finish();
-                    progress = null;
-                }
-            }
-        }
-    }
-
     /**
      * Passes the data source for the ingest job through the currently active
      * data source level ingest task pipeline (first stage or second stage data
@@ -1147,14 +1165,14 @@ final class IngestJobPipeline {
     }
 
     /**
-     * Adds some subset of the "streamed" files for a streaming ingest job to
-     * this pipeline after startUp() has been called.
+     * Adds some subset of the streamed files for a streaming mode ingest job to
+     * this pipeline.
      *
      * @param fileObjIds The object IDs of the files.
      */
-    void addStreamingIngestFiles(List<Long> fileObjIds) {
+    void addStreamedFiles(List<Long> fileObjIds) {
         if (hasFileIngestModules()) {
-            if (stage.equals(Stages.FIRST_STAGE_FILES_ONLY)) {
+            if (stage.equals(Stages.FILE_STREAMING)) {
                 IngestJobPipeline.taskScheduler.scheduleStreamedFileIngestTasks(this, fileObjIds);
             } else {
                 logErrorMessage(Level.SEVERE, "Adding streaming files to job during stage " + stage.toString() + " not supported");
@@ -1170,7 +1188,7 @@ final class IngestJobPipeline {
      * @param files A list of the files to add.
      */
     void addFiles(List<AbstractFile> files) {
-        if (stage.equals(Stages.FIRST_STAGE_FILES_ONLY)
+        if (stage.equals(Stages.FILE_STREAMING)
                 || stage.equals(Stages.FIRST_STAGE)) {
             taskScheduler.fastTrackFileIngestTasks(this, files);
         } else {
@@ -1195,7 +1213,7 @@ final class IngestJobPipeline {
      */
     void addDataArtifacts(List<DataArtifact> artifacts) {
         List<DataArtifact> artifactsToAnalyze = new ArrayList<>(artifacts);
-        if (stage.equals(Stages.FIRST_STAGE_FILES_ONLY)
+        if (stage.equals(Stages.FILE_STREAMING)
                 || stage.equals(Stages.FIRST_STAGE)
                 || stage.equals(Stages.SECOND_STAGE)) {
            taskScheduler.scheduleDataArtifactIngestTasks(this, artifactsToAnalyze);