Merge pull request #1439 from APriestman/pipelineShutdown

Prevent shutDown from being called on ingest pipelines that haven't s…
This commit is contained in:
Richard Cordovano 2015-07-19 13:42:07 -04:00
commit 2d4cd6dd69

View File

@ -386,11 +386,13 @@ final class DataSourceIngestJob {
// errors are likely redundant. // errors are likely redundant.
while (!this.fileIngestPipelinesQueue.isEmpty()) { while (!this.fileIngestPipelinesQueue.isEmpty()) {
pipeline = this.fileIngestPipelinesQueue.poll(); pipeline = this.fileIngestPipelinesQueue.poll();
if(pipeline.isRunning()){
List<IngestModuleError> shutDownErrors = pipeline.shutDown(); List<IngestModuleError> shutDownErrors = pipeline.shutDown();
if (!shutDownErrors.isEmpty()) { if (!shutDownErrors.isEmpty()) {
logIngestModuleErrors(shutDownErrors); logIngestModuleErrors(shutDownErrors);
} }
} }
}
break; break;
} }
} }
@ -565,8 +567,10 @@ final class DataSourceIngestJob {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
while (!this.fileIngestPipelinesQueue.isEmpty()) { while (!this.fileIngestPipelinesQueue.isEmpty()) {
FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll(); FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll();
if(pipeline.isRunning()){
errors.addAll(pipeline.shutDown()); errors.addAll(pipeline.shutDown());
} }
}
if (!errors.isEmpty()) { if (!errors.isEmpty()) {
logIngestModuleErrors(errors); logIngestModuleErrors(errors);
} }