Added periodic task

Eugene Livis 2020-08-07 16:40:25 -04:00
parent 6307b57149
commit d014e50437
4 changed files with 90 additions and 25 deletions

View File

@ -194,6 +194,7 @@ Server.request.exception.exception.msg=Could not issue Solr request
Server.commit.exception.msg=Could not commit index
Server.addDoc.exception.msg=Could not add document to index via update handler: {0}
Server.addDoc.exception.msg2=Could not add document to index via update handler: {0}
Server.addDocBatch.exception.msg=Could not add batched documents to index
Server.close.exception.msg=Cannot close Core
Server.close.exception.msg2=Cannot close Core
Server.solrServerNoPortException.msg=Indexing server could not bind to port {0}. The port is not available; consider changing the default {1} port.

View File

@ -238,6 +238,7 @@ Server.request.exception.exception.msg=Could not issue Solr request
Server.commit.exception.msg=Could not commit index
Server.addDoc.exception.msg=Could not add document to index via update handler: {0}
Server.addDoc.exception.msg2=Could not add document to index via update handler: {0}
Server.addDocBatch.exception.msg=Could not add batched documents to index
Server.close.exception.msg=Cannot close Core
Server.close.exception.msg2=Cannot close Core
Server.solrServerNoPortException.msg=Indexing server could not bind to port {0}. The port is not available; consider changing the default {1} port.
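Note: the {0} and {1} tokens in these bundle messages are positional placeholders that NbBundle.getMessage() expands via java.text.MessageFormat at the call sites shown later in the diff. A minimal JDK-only sketch of how such a pattern expands (the port value and server name are illustrative, not taken from this commit):

import java.text.MessageFormat;

class MessageDemo {
    public static void main(String[] args) {
        String pattern = "Indexing server could not bind to port {0}. "
                + "The port is not available; consider changing the default {1} port.";
        // {0} and {1} are replaced by the arguments, in order.
        System.out.println(MessageFormat.format(pattern, "23232", "Solr"));
    }
}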

View File

@ -18,6 +18,7 @@
*/
package org.sleuthkit.autopsy.keywordsearch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.awt.event.ActionEvent;
import java.beans.PropertyChangeListener;
import java.io.BufferedReader;
@ -42,6 +43,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
@ -1864,12 +1866,16 @@ public class Server {
// We use different Solr clients for different operations. HttpSolrClient is geared towards query performance.
// ConcurrentUpdateSolrClient is geared towards batching solr documents for better indexing throughput.
// CloudSolrClient is geared towards SolrCloud deployments. These are only good for collection-specific operations.
private final HttpSolrClient queryClient;
private final SolrClient indexingClient;
private HttpSolrClient queryClient = null;
private SolrClient indexingClient = null;
public final int maxBufferSize;
public final List<SolrInputDocument> buffer;
private final Object bufferLock;
private ScheduledThreadPoolExecutor periodicTasksExecutor = null;
private static final long PERIODIC_BATCH_SEND_INTERVAL_MINUTES = 10;
private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
private Collection(String name, Case theCase, Index index) throws TimeoutException, InterruptedException, SolrServerException, IOException {
this.name = name;
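For context on the client comment above: both client types come from SolrJ's builder APIs. A hedged sketch of how such clients are typically constructed (the base URL and tuning values are illustrative, not taken from this commit):

import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;

class SolrClientFactory {

    // Geared towards query performance: one synchronous HTTP request per operation.
    static HttpSolrClient createQueryClient(String solrUrl) {
        return new HttpSolrClient.Builder(solrUrl).build();
    }

    // Geared towards indexing throughput: updates are queued internally and
    // streamed to Solr by background threads.
    static SolrClient createIndexingClient(String solrUrl) {
        return new ConcurrentUpdateSolrClient.Builder(solrUrl)
                .withQueueSize(100)   // illustrative queue size
                .withThreadCount(2)   // illustrative thread count
                .build();
    }
}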
@ -1896,9 +1902,43 @@ public class Server {
// document batching
maxBufferSize = org.sleuthkit.autopsy.keywordsearch.UserPreferences.getDocumentsQueueSize();
org.sleuthkit.autopsy.keywordsearch.UserPreferences.setDocumentsQueueSize(maxBufferSize); // ELTODO remove
logger.log(Level.INFO, "Using Solr document queue size = {0}", maxBufferSize); //NON-NLS
buffer = new ArrayList<>(maxBufferSize * 2); // ELTODO
buffer = new ArrayList<>(maxBufferSize);
periodicTasksExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("periodic-batched-document-task-%d").build()); //NON-NLS
periodicTasksExecutor.scheduleWithFixedDelay(new SentBatchedDocumentsTask(), PERIODIC_BATCH_SEND_INTERVAL_MINUTES, PERIODIC_BATCH_SEND_INTERVAL_MINUTES, TimeUnit.MINUTES);
}
/**
* A task that periodically sends batched documents to Solr. Batched documents
* get sent automatically as soon as the batching buffer gets full. However,
* if the buffer is not full, we still want to send the batched documents
* periodically so that users are able to see them in their keyword searches.
*/
private final class SentBatchedDocumentsTask implements Runnable {
@Override
public void run() {
List<SolrInputDocument> clone;
synchronized (bufferLock) {
if (buffer.isEmpty()) {
return;
}
// Buffer is not empty. Make a copy and release the lock, so that we don't
// block ingest threads while the documents are being sent
clone = buffer.stream().collect(toList());
buffer.clear();
}
try {
// send the cloned list to Solr
logger.log(Level.INFO, "Periodic batched document update"); //NON-NLS
sendBufferedDocs(clone);
} catch (KeywordSearchModuleException ex) {
logger.log(Level.SEVERE, "Periodic batched document update failed", ex); //NON-NLS
}
}
}
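The task above combines a single-threaded ScheduledThreadPoolExecutor with a copy-under-lock, send-outside-lock flush. A minimal self-contained sketch of the same pattern, with illustrative names (PeriodicFlusher, send) that are not part of this commit:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PeriodicFlusher {

    private final Object bufferLock = new Object();
    private final List<String> buffer = new ArrayList<>();
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
            new ThreadFactoryBuilder().setNameFormat("periodic-flush-%d").build());

    PeriodicFlusher() {
        // scheduleWithFixedDelay measures the delay from the end of one run
        // to the start of the next, as the commit's scheduling does.
        executor.scheduleWithFixedDelay(this::flush, 10, 10, TimeUnit.MINUTES);
    }

    private void flush() {
        List<String> copy;
        synchronized (bufferLock) {
            if (buffer.isEmpty()) {
                return;
            }
            // Copy and clear under the lock, then send outside it so that
            // producers are not blocked while the batch is in flight.
            copy = new ArrayList<>(buffer);
            buffer.clear();
        }
        send(copy);
    }

    private void send(List<String> batch) {
        // Deliver the batch downstream (e.g. to an indexing service).
    }
}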
/**
@ -1940,11 +1980,14 @@ public class Server {
private void commit() throws SolrServerException {
try {
sendBufferedDocs(buffer);
} catch (KeywordSearchModuleException ex) {
// ELTODO bundle message
throw new SolrServerException(NbBundle.getMessage(this.getClass(), "Server.commit.exception.msg"), ex);
synchronized (bufferLock) {
try {
// A manual commit is performed after ingest completes, so there is
// no need to clone the buffer here
sendBufferedDocs(buffer);
} catch (KeywordSearchModuleException ex) {
throw new SolrServerException(NbBundle.getMessage(this.getClass(), "Server.commit.exception.msg"), ex);
}
}
try {
@ -1963,7 +2006,14 @@ public class Server {
queryClient.deleteByQuery(deleteQuery);
}
// ELTODO synchronization
/**
* Add a Solr document for indexing. Documents get batched instead of
* being sent to Solr immediately (unless the batch size is 1).
*
* @param doc Solr document to be indexed.
*
* @throws KeywordSearchModuleException
*/
void addDocument(SolrInputDocument doc) throws KeywordSearchModuleException {
List<SolrInputDocument> clone;
@ -1974,37 +2024,38 @@ public class Server {
return;
}
// Buffer is full. Make a copy and release the lock, so that we don't
// block other ingest threads while the documents are being sent
clone = buffer.stream().collect(toList());
buffer.clear();
}
// buffer is full
// send the cloned list to Solr
sendBufferedDocs(clone);
}
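addDocument() above is the producer side of the same buffer: append under the lock and flush only when the buffer reaches capacity, again sending outside the lock. A compact sketch of that idiom (class and method names are illustrative, not the commit's):

import java.util.ArrayList;
import java.util.List;

class BatchingBuffer<T> {

    private final Object lock = new Object();
    private final List<T> buffer;
    private final int maxSize;

    BatchingBuffer(int maxSize) {
        this.maxSize = maxSize;
        this.buffer = new ArrayList<>(maxSize);
    }

    void add(T item) {
        List<T> batch;
        synchronized (lock) {
            buffer.add(item);
            if (buffer.size() < maxSize) {
                return; // buffer not full yet, nothing to send
            }
            // Copy and clear under the lock; send after releasing it so that
            // other producer threads are not blocked during the send.
            batch = new ArrayList<>(buffer);
            buffer.clear();
        }
        send(batch);
    }

    private void send(List<T> batch) {
        // Deliver the batch downstream.
    }
}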
// ELTODO synchronization
private synchronized void sendBufferedDocs(List<SolrInputDocument> docBuffer) throws KeywordSearchModuleException {
/**
* Send a list of buffered documents to Solr.
*
* @param docBuffer List of buffered Solr documents
*
* @throws KeywordSearchModuleException
*/
private void sendBufferedDocs(List<SolrInputDocument> docBuffer) throws KeywordSearchModuleException {
if (docBuffer.isEmpty()) {
return;
}
try {
// ELTODO indexingClient.add(doc);
indexingClient.add(docBuffer);
} catch (SolrServerException | RemoteSolrException ex) {
} catch (SolrServerException | RemoteSolrException | IOException ex) {
logger.log(Level.SEVERE, "Could not add buffered documents to index", ex); //NON-NLS
// ELTODO modify "Server.addDoc.exception.msg"
throw new KeywordSearchModuleException(
NbBundle.getMessage(this.getClass(), "Server.addDoc.exception.msg", "ELTODO BLAH"), ex); //NON-NLS
} catch (IOException ex) {
logger.log(Level.SEVERE, "Could not add buffered documents to index", ex); //NON-NLS
// ELTODO modify "Server.addDoc.exception.msg"
throw new KeywordSearchModuleException(
NbBundle.getMessage(this.getClass(), "Server.addDoc.exception.msg2", "ELTODO BLAH"), ex); //NON-NLS
NbBundle.getMessage(this.getClass(), "Server.addDocBatch.exception.msg"), ex); //NON-NLS
} finally {
docBuffer.clear();
}
}
}
/**
@ -2054,6 +2105,17 @@ public class Server {
synchronized void close() throws KeywordSearchModuleException {
try {
// stop the periodic batch update task. If the task is already running,
// allow it to finish.
periodicTasksExecutor.shutdown();
try {
while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
logger.log(Level.WARNING, "Waited at least {0} seconds for periodic KWS task executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS
}
} catch (InterruptedException ex) {
logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic KWS task executor", ex); //NON-NLS
}
// We only unload cores for "single-user" cases.
if (this.caseType == CaseType.MULTI_USER_CASE) {
return;
@ -2069,7 +2131,9 @@ public class Server {
} finally {
try {
queryClient.close();
queryClient = null;
indexingClient.close();
indexingClient = null;
} catch (IOException ex) {
throw new KeywordSearchModuleException(
NbBundle.getMessage(this.getClass(), "Server.close.exception.msg2"), ex);
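The close() change above follows the standard graceful-shutdown idiom for an executor: stop accepting new tasks, then repeatedly wait, logging a warning each round, until in-flight tasks finish. A minimal sketch of that idiom, with an illustrative wait constant and slightly stricter interrupt handling than the commit's (it restores the interrupt flag):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

class ExecutorShutdown {

    private static final Logger logger = Logger.getLogger(ExecutorShutdown.class.getName());
    private static final long WAIT_SECS = 30; // illustrative

    static void shutDownGracefully(ExecutorService executor) {
        executor.shutdown(); // reject new tasks; let running and queued tasks finish
        try {
            while (!executor.awaitTermination(WAIT_SECS, TimeUnit.SECONDS)) {
                logger.log(Level.WARNING, "Still waiting for executor to terminate"); //NON-NLS
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            executor.shutdownNow();
        }
    }
}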

View File

@ -130,7 +130,6 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService {
throw new TskCoreException("Error indexing content", ex1);
}
}
// ELTODO WHY IS THIS HERE?
ingester.commit();
}
}