7683 make one ingest job per data source

This commit is contained in:
Richard Cordovano 2021-07-28 14:47:53 -04:00
parent abd903260a
commit ffe10a3052
2 changed files with 18 additions and 12 deletions

View File

@ -220,15 +220,15 @@ public final class IngestJob {
/**
* Gets a snapshot of the progress of this ingest job.
*
* @param getIngestTasksSnapshot Whether or not to include an ingest tasks
* snapshot.
* @param includeIngestTasksSnapshot Whether or not to include ingest task
* stats in the snapshot.
*
* @return The snapshot, will be null if the job is not started yet.
*/
public ProgressSnapshot getSnapshot(boolean getIngestTasksSnapshot) {
public ProgressSnapshot getSnapshot(boolean includeIngestTasksSnapshot) {
ProgressSnapshot snapshot = null;
if (ingestJobPipeline != null) {
return new ProgressSnapshot(getIngestTasksSnapshot);
return new ProgressSnapshot(includeIngestTasksSnapshot);
}
return snapshot;
}
@ -241,7 +241,7 @@ public final class IngestJob {
Snapshot getDiagnosticStatsSnapshot() {
Snapshot snapshot = null;
if (ingestJobPipeline != null) {
snapshot = ingestJobPipeline.getSnapshot(true);
snapshot = ingestJobPipeline.getDiagnosticStatsSnapshot(true);
}
return snapshot;
}
@ -410,13 +410,16 @@ public final class IngestJob {
/**
* Constructs a snapshot of the progress of an ingest job.
*
* @param includeIngestTasksSnapshot Whether or not to include ingest
* task stats in the snapshot.
*/
private ProgressSnapshot(boolean getIngestTasksSnapshot) {
private ProgressSnapshot(boolean includeIngestTasksSnapshot) {
/*
* Note that the getSnapshot() will not construct a ProgressSnapshot
* if ingestJobPipeline is null.
*/
Snapshot snapshot = ingestJobPipeline.getSnapshot(getIngestTasksSnapshot);
Snapshot snapshot = ingestJobPipeline.getDiagnosticStatsSnapshot(includeIngestTasksSnapshot);
dataSourceProcessingSnapshot = new DataSourceProcessingSnapshot(snapshot);
jobCancellationRequested = IngestJob.this.isCancelled();
jobCancellationReason = IngestJob.this.getCancellationReason();

View File

@ -1516,11 +1516,14 @@ final class IngestJobPipeline {
}
/**
* Gets a snapshot of this ingest pipelines current state.
* Gets a snapshot of some basic diagnostic statistics for this ingest
* pipeline.
*
* @return An ingest job statistics object.
* @param includeIngestTasksSnapshot
*
* @return The snapshot.
*/
Snapshot getSnapshot(boolean getIngestTasksSnapshot) {
Snapshot getDiagnosticStatsSnapshot(boolean includeIngestTasksSnapshot) {
/**
* Determine whether file ingest is running at the time of this snapshot
* and determine the earliest file ingest level pipeline start time, if
@ -1542,7 +1545,7 @@ final class IngestJobPipeline {
long estimatedFilesToProcessCount = 0;
long snapShotTime = new Date().getTime();
IngestJobTasksSnapshot tasksSnapshot = null;
if (getIngestTasksSnapshot) {
if (includeIngestTasksSnapshot) {
synchronized (fileIngestProgressLock) {
processedFilesCount = this.processedFiles;
estimatedFilesToProcessCount = this.estimatedFilesToProcess;
@ -1551,7 +1554,7 @@ final class IngestJobPipeline {
tasksSnapshot = taskScheduler.getTasksSnapshotForJob(getIngestJobId());
}
return new Snapshot(dataSource.getName(),
return new Snapshot(dataSource.getName(),
getIngestJobId(), createTime,
getCurrentDataSourceIngestModule(),
fileIngestRunning, fileIngestStartTime,