From 522d651670ac1a724518585a58d0b4100825c968 Mon Sep 17 00:00:00 2001 From: Eugene Livis Date: Tue, 1 Sep 2020 10:33:36 -0400 Subject: [PATCH 1/2] Solr4 batching --- .../autopsy/keywordsearch/Server.java | 142 +++++++++++++++++- .../keywordsearch/UserPreferences.java | 76 ++++++++++ 2 files changed, 217 insertions(+), 1 deletion(-) create mode 100755 KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/UserPreferences.java diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java index 05a02bf11b..b46a804baf 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java @@ -18,6 +18,7 @@ */ package org.sleuthkit.autopsy.keywordsearch; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.awt.event.ActionEvent; import java.beans.PropertyChangeListener; import java.io.BufferedReader; @@ -42,9 +43,11 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Random; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; +import static java.util.stream.Collectors.toList; import javax.swing.AbstractAction; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrRequest; @@ -1468,6 +1471,15 @@ public class Server { // the server to access a core needs to be built from a URL with the // core in it, and is only good for core-specific operations private final HttpSolrServer solrCore; + + private final int maxBufferSize; + private final List buffer; + private final Object bufferLock; + + private ScheduledThreadPoolExecutor periodicTasksExecutor = null; + private static final long PERIODIC_BATCH_SEND_INTERVAL_MINUTES = 10; + private static final int NUM_BATCH_UPDATE_RETRIES = 10; + private static final long SLEEP_BETWEEN_RETRIES_MS = 10000; // 10 seconds private final int QUERY_TIMEOUT_MILLISECONDS = 86400000; // 24 Hours = 86,400,000 Milliseconds @@ -1475,6 +1487,7 @@ public class Server { this.name = name; this.caseType = caseType; this.textIndex = index; + bufferLock = new Object(); this.solrCore = new HttpSolrServer(currentSolrServer.getBaseURL() + "/" + name); //NON-NLS @@ -1490,7 +1503,45 @@ public class Server { solrCore.setAllowCompression(true); solrCore.setParser(new XMLResponseParser()); // binary parser is used by default + // document batching + maxBufferSize = org.sleuthkit.autopsy.keywordsearch.UserPreferences.getDocumentsQueueSize(); + logger.log(Level.INFO, "Using Solr document queue size = {0}", maxBufferSize); //NON-NLS + buffer = new ArrayList<>(maxBufferSize); + periodicTasksExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("periodic-batched-document-task-%d").build()); //NON-NLS + periodicTasksExecutor.scheduleWithFixedDelay(new SendBatchedDocumentsTask(), PERIODIC_BATCH_SEND_INTERVAL_MINUTES, PERIODIC_BATCH_SEND_INTERVAL_MINUTES, TimeUnit.MINUTES); } + + /** + * A task that periodically sends batched documents to Solr. Batched documents + * get sent automatically as soon as the batching buffer is gets full. However, + * if the buffer is not full, we want to periodically send the batched documents + * so that users are able to see them in their keyword searches. + */ + private final class SendBatchedDocumentsTask implements Runnable { + + @Override + public void run() { + List clone; + synchronized (bufferLock) { + + if (buffer.isEmpty()) { + return; + } + + // Buffer is full. Make a clone and release the lock, so that we don't + // hold other ingest threads + clone = buffer.stream().collect(toList()); + buffer.clear(); + } + + try { + // send the cloned list to Solr + sendBufferedDocs(clone); + } catch (KeywordSearchModuleException ex) { + logger.log(Level.SEVERE, "Periodic batched document update failed", ex); //NON-NLS + } + } + } /** * Get the name of the core @@ -1531,6 +1582,20 @@ public class Server { } private void commit() throws SolrServerException { + List clone; + synchronized (bufferLock) { + // Make a clone and release the lock, so that we don't + // hold other ingest threads + clone = buffer.stream().collect(toList()); + buffer.clear(); + } + + try { + sendBufferedDocs(clone); + } catch (KeywordSearchModuleException ex) { + throw new SolrServerException(NbBundle.getMessage(this.getClass(), "Server.commit.exception.msg"), ex); + } + try { //commit and block solrCore.commit(true, true); @@ -1548,6 +1613,81 @@ public class Server { solrCore.deleteByQuery(deleteQuery); } + /** + * Add a Solr document for indexing. Documents get batched instead of + * being immediately sent to Solr (unless batch size = 1). + * + * @param doc Solr document to be indexed. + * + * @throws KeywordSearchModuleException + */ + void addDocument(SolrInputDocument doc) throws KeywordSearchModuleException { + + List clone; + synchronized (bufferLock) { + buffer.add(doc); + // buffer documents if the buffer is not full + if (buffer.size() < maxBufferSize) { + return; + } + + // Buffer is full. Make a clone and release the lock, so that we don't + // hold other ingest threads + clone = buffer.stream().collect(toList()); + buffer.clear(); + } + + // send the cloned list to Solr + sendBufferedDocs(clone); + } + + /** + * Send a list of buffered documents to Solr. + * + * @param docBuffer List of buffered Solr documents + * + * @throws KeywordSearchModuleException + */ + private void sendBufferedDocs(List docBuffer) throws KeywordSearchModuleException { + + if (docBuffer.isEmpty()) { + return; + } + + try { + boolean success = true; + for (int reTryAttempt = 0; reTryAttempt < NUM_BATCH_UPDATE_RETRIES; reTryAttempt++) { + try { + success = true; + solrCore.add(docBuffer); + } catch (Exception ex) { + success = false; + if (reTryAttempt < NUM_BATCH_UPDATE_RETRIES - 1) { + logger.log(Level.WARNING, "Unable to send document batch to Solr. Re-trying...", ex); //NON-NLS + try { + Thread.sleep(SLEEP_BETWEEN_RETRIES_MS); + } catch (InterruptedException ex1) { + throw new KeywordSearchModuleException( + NbBundle.getMessage(this.getClass(), "Server.addDocBatch.exception.msg"), ex1); //NON-NLS + } + } + } + if (success) { + if (reTryAttempt > 0) { + logger.log(Level.INFO, "Batch update suceeded after {0} re-try", reTryAttempt); //NON-NLS + } + return; + } + } + // if we are here, it means all re-try attempts failed + logger.log(Level.SEVERE, "Unable to send document batch to Solr. All re-try attempts failed!"); //NON-NLS + throw new KeywordSearchModuleException(NbBundle.getMessage(this.getClass(), "Server.addDocBatch.exception.msg")); //NON-NLS + } finally { + docBuffer.clear(); + } + } + + /* ELTODO void addDocument(SolrInputDocument doc) throws KeywordSearchModuleException { try { solrCore.add(doc); @@ -1557,7 +1697,7 @@ public class Server { throw new KeywordSearchModuleException( NbBundle.getMessage(this.getClass(), "Server.addDoc.exception.msg", doc.getField("id")), ex); //NON-NLS } - } + }*/ /** * get the text from the content field for the given file diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/UserPreferences.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/UserPreferences.java new file mode 100755 index 0000000000..11f7654f8c --- /dev/null +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/UserPreferences.java @@ -0,0 +1,76 @@ +/* + * Autopsy Forensic Browser + * + * Copyright 2020 Basis Technology Corp. + * Contact: carrier sleuthkit org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.sleuthkit.autopsy.keywordsearch; + +import java.util.prefs.BackingStoreException; +import java.util.prefs.PreferenceChangeListener; +import java.util.prefs.Preferences; +import org.openide.util.NbPreferences; + +/** + * Provides convenient access to a Preferences node for user preferences with + * default values. + */ +final class UserPreferences { + + private static final Preferences preferences = NbPreferences.forModule(UserPreferences.class); + private static final String INDEXING_DOC_QUEUE_SIZE = "IndexingDocumentQueueSize"; //NON-NLS + private static final int DEFAULT_INDEXING_DOC_QUEUE_SIZE = 30; //NON-NLS + + // Prevent instantiation. + private UserPreferences() { + } + + /** + * Reload all preferences from disk. This is only needed if the preferences + * file is being directly modified on disk while Autopsy is running. + * + * @throws BackingStoreException + */ + public static void reloadFromStorage() throws BackingStoreException { + preferences.sync(); + } + + /** + * Saves the current preferences to storage. This is only needed if the + * preferences files are going to be copied to another location while + * Autopsy is running. + * + * @throws BackingStoreException + */ + public static void saveToStorage() throws BackingStoreException { + preferences.flush(); + } + + public static void addChangeListener(PreferenceChangeListener listener) { + preferences.addPreferenceChangeListener(listener); + } + + public static void removeChangeListener(PreferenceChangeListener listener) { + preferences.removePreferenceChangeListener(listener); + } + + public static void setDocumentsQueueSize(int size) { + preferences.putInt(INDEXING_DOC_QUEUE_SIZE, size); + } + + public static int getDocumentsQueueSize() { + return preferences.getInt(INDEXING_DOC_QUEUE_SIZE, DEFAULT_INDEXING_DOC_QUEUE_SIZE); + } +} \ No newline at end of file From b92c3d7b4d2765e0e8fdf3d7e130a1fa99e1ee21 Mon Sep 17 00:00:00 2001 From: Eugene Livis Date: Tue, 1 Sep 2020 10:37:23 -0400 Subject: [PATCH 2/2] Solr4 batching --- .../sleuthkit/autopsy/keywordsearch/Server.java | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java index b46a804baf..b87f6b9cae 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Server.java @@ -1685,19 +1685,7 @@ public class Server { } finally { docBuffer.clear(); } - } - - /* ELTODO - void addDocument(SolrInputDocument doc) throws KeywordSearchModuleException { - try { - solrCore.add(doc); - } catch (Exception ex) { - // Solr throws a lot of unexpected exception types - logger.log(Level.SEVERE, "Could not add document to index via update handler: " + doc.getField("id"), ex); //NON-NLS - throw new KeywordSearchModuleException( - NbBundle.getMessage(this.getClass(), "Server.addDoc.exception.msg", doc.getField("id")), ex); //NON-NLS - } - }*/ + } /** * get the text from the content field for the given file