Merge pull request #6850 from rcordovano/7090-ingest-mgr-concurrent-modification-ex

7090 Fix concurrent mod ex in IngestManager
This commit is contained in:
Richard Cordovano 2021-03-31 15:30:12 -04:00 committed by GitHub
commit 4c32218780
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -125,7 +125,9 @@ public class IngestManager implements IngestProgressSnapshotProvider {
private final int numberOfFileIngestThreads;
private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
@GuardedBy("startIngestJobFutures")
private final Map<Long, Future<Void>> startIngestJobFutures = new ConcurrentHashMap<>();
@GuardedBy("ingestJobsById")
private final Map<Long, IngestJob> ingestJobsById = new HashMap<>();
private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
private final ExecutorService fileLevelIngestJobTasksExecutor;
@ -338,7 +340,9 @@ public class IngestManager implements IngestProgressSnapshotProvider {
if (job.hasIngestPipeline()) {
long taskId = nextIngestManagerTaskId.incrementAndGet();
Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
startIngestJobFutures.put(taskId, task);
synchronized (startIngestJobFutures) {
startIngestJobFutures.put(taskId, task);
}
}
}
}
@ -357,7 +361,9 @@ public class IngestManager implements IngestProgressSnapshotProvider {
if (job.hasIngestPipeline()) {
long taskId = nextIngestManagerTaskId.incrementAndGet();
Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
startIngestJobFutures.put(taskId, task);
synchronized (startIngestJobFutures) {
startIngestJobFutures.put(taskId, task);
}
}
}
}
@ -518,9 +524,11 @@ public class IngestManager implements IngestProgressSnapshotProvider {
* @param reason The cancellation reason.
*/
public void cancelAllIngestJobs(IngestJob.CancellationReason reason) {
startIngestJobFutures.values().forEach((handle) -> {
handle.cancel(true);
});
synchronized (startIngestJobFutures) {
startIngestJobFutures.values().forEach((handle) -> {
handle.cancel(true);
});
}
synchronized (ingestJobsById) {
this.ingestJobsById.values().forEach((job) -> {
job.cancel(reason);
@ -939,8 +947,10 @@ public class IngestManager implements IngestProgressSnapshotProvider {
if (progress != null) {
progress.setDisplayName(NbBundle.getMessage(this.getClass(), "IngestManager.StartIngestJobsTask.run.cancelling", displayName));
}
Future<?> handle = startIngestJobFutures.remove(threadId);
handle.cancel(true);
synchronized (startIngestJobFutures) {
Future<?> handle = startIngestJobFutures.remove(threadId);
handle.cancel(true);
}
return true;
}
});
@ -954,7 +964,9 @@ public class IngestManager implements IngestProgressSnapshotProvider {
if (null != progress) {
progress.finish();
}
startIngestJobFutures.remove(threadId);
synchronized (startIngestJobFutures) {
startIngestJobFutures.remove(threadId);
}
}
}