batch processor update

This commit is contained in:
Greg DiCristofaro 2023-07-21 11:02:59 -04:00
parent eb7bbfd75c
commit 61bb6785fc

View File

@ -17,10 +17,13 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@ -53,13 +56,6 @@ public class BatchProcessor<T> {
batchingQueue.clear(); batchingQueue.clear();
} }
public synchronized void flush(boolean blockUntilFinished) throws InterruptedException {
asyncProcessBatch();
if (blockUntilFinished) {
lastProcessingFuture.wait(millisTimeout);
}
}
public synchronized void add(T item) throws InterruptedException { public synchronized void add(T item) throws InterruptedException {
batchingQueue.add(item); batchingQueue.add(item);
if (batchingQueue.size() >= batchSize) { if (batchingQueue.size() >= batchSize) {
@ -67,14 +63,29 @@ public class BatchProcessor<T> {
} }
} }
public synchronized void flush(boolean blockUntilFinished) throws InterruptedException {
asyncProcessBatch();
if (blockUntilFinished) {
waitCurrentFuture();
}
}
private synchronized void waitCurrentFuture() throws InterruptedException {
synchronized (lastProcessingFuture) {
if (!lastProcessingFuture.isDone()) {
try {
lastProcessingFuture.get(millisTimeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException ex) {
// ignore timeout
}
}
}
}
private synchronized void asyncProcessBatch() throws InterruptedException { private synchronized void asyncProcessBatch() throws InterruptedException {
if (!batchingQueue.isEmpty()) { if (!batchingQueue.isEmpty()) {
// wait for previous processing to finish // wait for previous processing to finish
synchronized (lastProcessingFuture) { waitCurrentFuture();
if (!lastProcessingFuture.isDone()) {
lastProcessingFuture.wait(millisTimeout);
}
}
// if 'andThen' doesn't run, clear the processing queue // if 'andThen' doesn't run, clear the processing queue
processingQueue.clear(); processingQueue.clear();