From d014e50437dd93fa13b059f66f2b519e6f291968 Mon Sep 17 00:00:00 2001 From: Eugene Livis Date: Fri, 7 Aug 2020 16:40:25 -0400 Subject: [PATCH] Added periodic task --- .../autopsy/keywordsearch/Bundle.properties | 1 + .../keywordsearch/Bundle.properties-MERGED | 1 + .../autopsy/keywordsearch/Server.java | 112 ++++++++++++++---- .../keywordsearch/SolrSearchService.java | 1 - 4 files changed, 90 insertions(+), 25 deletions(-) diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties index 12a8ceed59..65e480a5dd 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties @@ -194,6 +194,7 @@ Server.request.exception.exception.msg=Could not issue Solr request Server.commit.exception.msg=Could not commit index Server.addDoc.exception.msg=Could not add document to index via update handler: {0} Server.addDoc.exception.msg2=Could not add document to index via update handler: {0} +Server.addDocBatch.exception.msg=Could not add batched documents to index Server.close.exception.msg=Cannot close Core Server.close.exception.msg2=Cannot close Core Server.solrServerNoPortException.msg=Indexing server could not bind to port {0}, port is not available, consider change the default {1} port. diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties-MERGED b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties-MERGED index 60feef8c8f..dab9b089f8 100755 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties-MERGED +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Bundle.properties-MERGED @@ -238,6 +238,7 @@ Server.request.exception.exception.msg=Could not issue Solr request Server.commit.exception.msg=Could not commit index Server.addDoc.exception.msg=Could not add document to index via update handler: {0} Server.addDoc.exception.msg2=Could not add document to index via update handler: {0} +Server.addDocBatch.exception.msg=Could not add document batch to index Server.close.exception.msg=Cannot close Core Server.close.exception.msg2=Cannot close Core Server.solrServerNoPortException.msg=Indexing server could not bind to port {0}, port is not available, consider change the default {1} port. diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java index 0b9d647898..4d702a0d23 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java @@ -18,6 +18,7 @@ */ package org.sleuthkit.autopsy.keywordsearch; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.awt.event.ActionEvent; import java.beans.PropertyChangeListener; import java.io.BufferedReader; @@ -42,6 +43,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Random; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; @@ -1864,12 +1866,16 @@ public class Server { // We use different Solr clients for different operations. HttpSolrClient is geared towards query performance. // ConcurrentUpdateSolrClient is geared towards batching solr documents for better indexing throughput. // CloudSolrClient is gaered towards SolrCloud deployments. These are only good for collection-specific operations. - private final HttpSolrClient queryClient; - private final SolrClient indexingClient; + private HttpSolrClient queryClient = null; + private SolrClient indexingClient = null; - public final int maxBufferSize; + public final int maxBufferSize; public final List buffer; private final Object bufferLock; + + private ScheduledThreadPoolExecutor periodicTasksExecutor = null; + private static final long PERIODIC_BATCH_SEND_INTERVAL_MINUTES = 10; + private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30; private Collection(String name, Case theCase, Index index) throws TimeoutException, InterruptedException, SolrServerException, IOException { this.name = name; @@ -1896,9 +1902,43 @@ public class Server { // document batching maxBufferSize = org.sleuthkit.autopsy.keywordsearch.UserPreferences.getDocumentsQueueSize(); - org.sleuthkit.autopsy.keywordsearch.UserPreferences.setDocumentsQueueSize(maxBufferSize); // ELTODO remove logger.log(Level.INFO, "Using Solr document queue size = {0}", maxBufferSize); //NON-NLS - buffer = new ArrayList<>(maxBufferSize * 2); // ELTODO + buffer = new ArrayList<>(maxBufferSize); + periodicTasksExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("periodic-batched-document-task-%d").build()); //NON-NLS + periodicTasksExecutor.scheduleWithFixedDelay(new SentBatchedDocumentsTask(), PERIODIC_BATCH_SEND_INTERVAL_MINUTES, PERIODIC_BATCH_SEND_INTERVAL_MINUTES, TimeUnit.MINUTES); + } + + /** + * A task that periodically sends batched documents to Solr. Batched documents + * get sent automatically as soon as the batching buffer is gets full. However, + * if the buffer is not full, we want to periodically send the batched documents + * so that users are able to see them in their keyword searches. + */ + private final class SentBatchedDocumentsTask implements Runnable { + + @Override + public void run() { + List clone; + synchronized (bufferLock) { + + if (buffer.isEmpty()) { + return; + } + + // Buffer is full. Make a clone and release the lock, so that we don't + // hold other ingest threads + clone = buffer.stream().collect(toList()); + buffer.clear(); + } + + try { + // send the cloned list to Solr + logger.log(Level.INFO, "Periodic batched document update"); //NON-NLS + sendBufferedDocs(clone); + } catch (KeywordSearchModuleException ex) { + logger.log(Level.SEVERE, "Periodic batched document update failed", ex); //NON-NLS + } + } } /** @@ -1940,11 +1980,14 @@ public class Server { private void commit() throws SolrServerException { - try { - sendBufferedDocs(buffer); - } catch (KeywordSearchModuleException ex) { - // ELTODO bundle message - throw new SolrServerException(NbBundle.getMessage(this.getClass(), "Server.commit.exception.msg"), ex); + synchronized (bufferLock) { + try { + // we do a manual commit after ingest is complete, so I + // think there is no need to clone the buffer + sendBufferedDocs(buffer); + } catch (KeywordSearchModuleException ex) { + throw new SolrServerException(NbBundle.getMessage(this.getClass(), "Server.commit.exception.msg"), ex); + } } try { @@ -1963,7 +2006,14 @@ public class Server { queryClient.deleteByQuery(deleteQuery); } - // ELTODO synchronization + /** + * Add a Solr document for indexing. Documents get batched instead of + * being immediately sent to Solr (unless batch size = 1). + * + * @param doc Solr document to be indexed. + * + * @throws KeywordSearchModuleException + */ void addDocument(SolrInputDocument doc) throws KeywordSearchModuleException { List clone; @@ -1974,37 +2024,38 @@ public class Server { return; } + // Buffer is full. Make a clone and release the lock, so that we don't + // hold other ingest threads clone = buffer.stream().collect(toList()); buffer.clear(); } - // buffer is full + // send the cloned list to Solr sendBufferedDocs(clone); } - // ELTODO synchronization - private synchronized void sendBufferedDocs(List docBuffer) throws KeywordSearchModuleException { + /** + * Send a list of buffered documents to Solr. + * + * @param docBuffer List of buffered Solr documents + * + * @throws KeywordSearchModuleException + */ + private void sendBufferedDocs(List docBuffer) throws KeywordSearchModuleException { if (docBuffer.isEmpty()) { return; } try { - // ELTODO indexingClient.add(doc); indexingClient.add(docBuffer); - } catch (SolrServerException | RemoteSolrException ex) { + } catch (SolrServerException | RemoteSolrException | IOException ex) { logger.log(Level.SEVERE, "Could not add buffered documents to index", ex); //NON-NLS - // ELTODO modify "Server.addDoc.exception.msg" throw new KeywordSearchModuleException( - NbBundle.getMessage(this.getClass(), "Server.addDoc.exception.msg", "ELTODO BLAH"), ex); //NON-NLS - } catch (IOException ex) { - logger.log(Level.SEVERE, "Could not add buffered documents to index", ex); //NON-NLS - // ELTODO modify "Server.addDoc.exception.msg" - throw new KeywordSearchModuleException( - NbBundle.getMessage(this.getClass(), "Server.addDoc.exception.msg2", "ELTODO BLAH"), ex); //NON-NLS + NbBundle.getMessage(this.getClass(), "Server.addDocBatch.exception.msg"), ex); //NON-NLS } finally { docBuffer.clear(); - } + } } /** @@ -2054,6 +2105,17 @@ public class Server { synchronized void close() throws KeywordSearchModuleException { try { + // stop the periodic batch update task. If the task is already running, + // allow it to finish. + periodicTasksExecutor.shutdown(); + try { + while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) { + logger.log(Level.WARNING, "Waited at least {0} seconds for periodic KWS task executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS + } + } catch (InterruptedException ex) { + logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic KWS task executor", ex); //NON-NLS + } + // We only unload cores for "single-user" cases. if (this.caseType == CaseType.MULTI_USER_CASE) { return; @@ -2069,7 +2131,9 @@ public class Server { } finally { try { queryClient.close(); + queryClient = null; indexingClient.close(); + indexingClient = null; } catch (IOException ex) { throw new KeywordSearchModuleException( NbBundle.getMessage(this.getClass(), "Server.close.exception.msg2"), ex); diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/SolrSearchService.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/SolrSearchService.java index 9770025f47..e660f04c80 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/SolrSearchService.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/SolrSearchService.java @@ -130,7 +130,6 @@ public class SolrSearchService implements KeywordSearchService, AutopsyService { throw new TskCoreException("Error indexing content", ex1); } } - // ELTODO WHY IS THIS HERE? ingester.commit(); } }