Fix IngestTasksScheduler, IngestManager concurrency issues

This commit is contained in:
Richard Cordovano 2018-03-19 19:46:01 -04:00
parent 0d4a2315cd
commit 75d4b9ad55

View File

@ -166,7 +166,7 @@ final class IngestTasksScheduler {
*/ */
synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) { synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) {
if (!job.isCancelled()) { if (!job.isCancelled()) {
List<FileIngestTask> newTasks = new LinkedList<>(); List<FileIngestTask> newTasksForFileIngestThreads = new LinkedList<>();
for (AbstractFile file : files) { for (AbstractFile file : files) {
/* /*
* Put the file directly into the queue for the file ingest * Put the file directly into the queue for the file ingest
@ -178,7 +178,7 @@ final class IngestTasksScheduler {
*/ */
FileIngestTask task = new FileIngestTask(job, file); FileIngestTask task = new FileIngestTask(job, file);
if (shouldEnqueueFileTask(task)) { if (shouldEnqueueFileTask(task)) {
newTasks.add(task); newTasksForFileIngestThreads.add(task);
} }
/* /*
@ -196,7 +196,7 @@ final class IngestTasksScheduler {
if (childFile.hasChildren()) { if (childFile.hasChildren()) {
this.directoryFileTaskQueue.add(childTask); this.directoryFileTaskQueue.add(childTask);
} else if (shouldEnqueueFileTask(childTask)) { } else if (shouldEnqueueFileTask(childTask)) {
newTasks.add(task); newTasksForFileIngestThreads.add(task);
} }
} }
} }
@ -211,7 +211,7 @@ final class IngestTasksScheduler {
* method is adding derived files from a higher priority task that * method is adding derived files from a higher priority task that
* preceded the tasks currently in the queue. * preceded the tasks currently in the queue.
*/ */
for (FileIngestTask newTask : newTasks) { for (FileIngestTask newTask : newTasksForFileIngestThreads) {
try { try {
this.queuedAndRunningFileTasks.add(newTask); this.queuedAndRunningFileTasks.add(newTask);
this.fileTaskQueueForIngestThreads.addFirst(newTask); this.fileTaskQueueForIngestThreads.addFirst(newTask);
@ -351,7 +351,7 @@ final class IngestTasksScheduler {
* queue more tasks for the file ingest threads. * queue more tasks for the file ingest threads.
*/ */
synchronized private void shuffleFileTaskQueues() { synchronized private void shuffleFileTaskQueues() {
List<FileIngestTask> newTasks = new LinkedList<>(); List<FileIngestTask> newTasksForFileIngestThreads = new LinkedList<>();
while (this.queuedAndRunningFileTasks.isEmpty()) { while (this.queuedAndRunningFileTasks.isEmpty()) {
/* /*
* If the directory file task queue is empty, move the highest * If the directory file task queue is empty, move the highest
@ -375,7 +375,8 @@ final class IngestTasksScheduler {
*/ */
final FileIngestTask directoryTask = this.directoryFileTaskQueue.pollLast(); final FileIngestTask directoryTask = this.directoryFileTaskQueue.pollLast();
if (shouldEnqueueFileTask(directoryTask)) { if (shouldEnqueueFileTask(directoryTask)) {
newTasks.add(directoryTask); newTasksForFileIngestThreads.add(directoryTask);
this.queuedAndRunningFileTasks.add(directoryTask);
} }
/* /*
@ -395,7 +396,8 @@ final class IngestTasksScheduler {
if (childFile.hasChildren()) { if (childFile.hasChildren()) {
this.directoryFileTaskQueue.add(childTask); this.directoryFileTaskQueue.add(childTask);
} else if (shouldEnqueueFileTask(childTask)) { } else if (shouldEnqueueFileTask(childTask)) {
newTasks.add(childTask); newTasksForFileIngestThreads.add(childTask);
this.queuedAndRunningFileTasks.add(childTask);
} }
} }
} }
@ -408,9 +410,8 @@ final class IngestTasksScheduler {
* The files are added to the queue for the ingest threads AFTER the * The files are added to the queue for the ingest threads AFTER the
* higher priority tasks that preceded them. * higher priority tasks that preceded them.
*/ */
for (FileIngestTask newTask : newTasks) { for (FileIngestTask newTask : newTasksForFileIngestThreads) {
try { try {
this.queuedAndRunningFileTasks.add(newTask);
this.fileTaskQueueForIngestThreads.addFirst(newTask); this.fileTaskQueueForIngestThreads.addFirst(newTask);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
this.queuedAndRunningFileTasks.remove(newTask); this.queuedAndRunningFileTasks.remove(newTask);