Merge branch 'solr4_batching_6766' of github.com:eugene7646/autopsy into solr4_batching_6766

Author: Eugene Livis
Date:   2020-09-11 16:54:46 -04:00
Commit: f65bd5aa7e

2 changed files with 210 additions and 6 deletions

org/sleuthkit/autopsy/keywordsearch/Server.java

@@ -18,6 +18,7 @@
  */
 package org.sleuthkit.autopsy.keywordsearch;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.awt.event.ActionEvent;
 import java.beans.PropertyChangeListener;
 import java.io.BufferedReader;
@@ -42,9 +43,11 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.logging.Level;
+import static java.util.stream.Collectors.toList;
 import javax.swing.AbstractAction;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
@@ -1468,6 +1471,15 @@ public class Server {
         // the server to access a core needs to be built from a URL with the
         // core in it, and is only good for core-specific operations
         private final HttpSolrServer solrCore;
+
+        private final int maxBufferSize;
+        private final List<SolrInputDocument> buffer;
+        private final Object bufferLock;
+        private ScheduledThreadPoolExecutor periodicTasksExecutor = null;
+        private static final long PERIODIC_BATCH_SEND_INTERVAL_MINUTES = 10;
+        private static final int NUM_BATCH_UPDATE_RETRIES = 10;
+        private static final long SLEEP_BETWEEN_RETRIES_MS = 10000; // 10 seconds
+
         private final int QUERY_TIMEOUT_MILLISECONDS = 86400000; // 24 Hours = 86,400,000 Milliseconds
@@ -1475,6 +1487,7 @@ public class Server {
             this.name = name;
             this.caseType = caseType;
             this.textIndex = index;
+            bufferLock = new Object();
             this.solrCore = new HttpSolrServer(currentSolrServer.getBaseURL() + "/" + name); //NON-NLS
@@ -1490,7 +1503,45 @@ public class Server {
             solrCore.setAllowCompression(true);
             solrCore.setParser(new XMLResponseParser()); // binary parser is used by default

+            // document batching
+            maxBufferSize = org.sleuthkit.autopsy.keywordsearch.UserPreferences.getDocumentsQueueSize();
+            logger.log(Level.INFO, "Using Solr document queue size = {0}", maxBufferSize); //NON-NLS
+            buffer = new ArrayList<>(maxBufferSize);
+            periodicTasksExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("periodic-batched-document-task-%d").build()); //NON-NLS
+            periodicTasksExecutor.scheduleWithFixedDelay(new SendBatchedDocumentsTask(), PERIODIC_BATCH_SEND_INTERVAL_MINUTES, PERIODIC_BATCH_SEND_INTERVAL_MINUTES, TimeUnit.MINUTES);
         }

+        /**
+         * A task that periodically sends batched documents to Solr. Batched
+         * documents are sent automatically as soon as the batching buffer
+         * fills up. However, if the buffer is not full, we want to
+         * periodically send the batched documents anyway, so that users are
+         * able to see them in their keyword searches.
+         */
+        private final class SendBatchedDocumentsTask implements Runnable {
+
+            @Override
+            public void run() {
+                List<SolrInputDocument> clone;
+                synchronized (bufferLock) {
+                    if (buffer.isEmpty()) {
+                        return;
+                    }
+                    // Make a clone and release the lock, so that we don't
+                    // hold up other ingest threads
+                    clone = buffer.stream().collect(toList());
+                    buffer.clear();
+                }
+
+                try {
+                    // send the cloned list to Solr
+                    sendBufferedDocs(clone);
+                } catch (KeywordSearchModuleException ex) {
+                    logger.log(Level.SEVERE, "Periodic batched document update failed", ex); //NON-NLS
+                }
+            }
+        }
+
         /**
          * Get the name of the core
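The constructor change above wires the flush task to a single-threaded ScheduledThreadPoolExecutor. For readers unfamiliar with the idiom, here is a minimal, self-contained sketch of the same scheduling pattern; the class name BatchFlushDemo and the printed message are hypothetical, but ThreadFactoryBuilder, ScheduledThreadPoolExecutor, and scheduleWithFixedDelay are used exactly as in the diff.

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BatchFlushDemo {

    public static void main(String[] args) {
        // One worker thread with a recognizable name, which helps when
        // reading thread dumps from a busy ingest node.
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                new ThreadFactoryBuilder().setNameFormat("periodic-batched-document-task-%d").build());

        // scheduleWithFixedDelay measures the delay from the end of one run
        // to the start of the next, so a slow Solr flush can never overlap
        // with the following one.
        executor.scheduleWithFixedDelay(
                () -> System.out.println("flushing buffered documents..."),
                10, 10, TimeUnit.MINUTES);
    }
}

Using scheduleWithFixedDelay rather than scheduleAtFixedRate means a flush that takes longer than the interval simply pushes the next flush back instead of queueing overlapping runs.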
@@ -1531,6 +1582,20 @@ public class Server {
         }

         private void commit() throws SolrServerException {
+            List<SolrInputDocument> clone;
+            synchronized (bufferLock) {
+                // Make a clone and release the lock, so that we don't
+                // hold up other ingest threads
+                clone = buffer.stream().collect(toList());
+                buffer.clear();
+            }
+
+            try {
+                sendBufferedDocs(clone);
+            } catch (KeywordSearchModuleException ex) {
+                throw new SolrServerException(NbBundle.getMessage(this.getClass(), "Server.commit.exception.msg"), ex);
+            }
+
             try {
                 //commit and block
                 solrCore.commit(true, true);
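Both SendBatchedDocumentsTask.run() and commit() drain the buffer with the same idiom: copy and clear inside the synchronized block, then do the slow network call outside it, so Solr commits never see a half-flushed buffer. A minimal sketch of that pattern follows; the names are hypothetical, and a plain copy constructor stands in for buffer.stream().collect(toList()), which builds the same new list.

import java.util.ArrayList;
import java.util.List;

public class DrainUnderLockDemo {

    private final Object bufferLock = new Object();
    private final List<String> buffer = new ArrayList<>();

    void add(String doc) {
        synchronized (bufferLock) {
            buffer.add(doc);
        }
    }

    void flush() {
        List<String> batch;
        synchronized (bufferLock) {
            if (buffer.isEmpty()) {
                return;
            }
            // Copy and clear while holding the lock, so no document is
            // lost or sent twice...
            batch = new ArrayList<>(buffer);
            buffer.clear();
        }
        // ...but do the slow I/O outside the lock, so producer threads
        // are blocked only for the duration of the copy.
        System.out.println("sending " + batch.size() + " docs");
    }
}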
@@ -1548,14 +1613,77 @@ public class Server {
             solrCore.deleteByQuery(deleteQuery);
         }

+        /**
+         * Add a Solr document for indexing. Documents get batched instead of
+         * being immediately sent to Solr (unless batch size = 1).
+         *
+         * @param doc Solr document to be indexed.
+         *
+         * @throws KeywordSearchModuleException
+         */
         void addDocument(SolrInputDocument doc) throws KeywordSearchModuleException {
-            try {
-                solrCore.add(doc);
-            } catch (Exception ex) {
-                // Solr throws a lot of unexpected exception types
-                logger.log(Level.SEVERE, "Could not add document to index via update handler: " + doc.getField("id"), ex); //NON-NLS
-                throw new KeywordSearchModuleException(
-                        NbBundle.getMessage(this.getClass(), "Server.addDoc.exception.msg", doc.getField("id")), ex); //NON-NLS
-            }
+            List<SolrInputDocument> clone;
+            synchronized (bufferLock) {
+                buffer.add(doc);
+                // buffer documents if the buffer is not full
+                if (buffer.size() < maxBufferSize) {
+                    return;
+                }
+
+                // Buffer is full. Make a clone and release the lock, so that
+                // we don't hold up other ingest threads
+                clone = buffer.stream().collect(toList());
+                buffer.clear();
+            }
+
+            // send the cloned list to Solr
+            sendBufferedDocs(clone);
+        }
+
+        /**
+         * Send a list of buffered documents to Solr.
+         *
+         * @param docBuffer List of buffered Solr documents
+         *
+         * @throws KeywordSearchModuleException
+         */
+        private void sendBufferedDocs(List<SolrInputDocument> docBuffer) throws KeywordSearchModuleException {
+            if (docBuffer.isEmpty()) {
+                return;
+            }
+
+            try {
+                boolean success = true;
+                for (int reTryAttempt = 0; reTryAttempt < NUM_BATCH_UPDATE_RETRIES; reTryAttempt++) {
+                    try {
+                        success = true;
+                        solrCore.add(docBuffer);
+                    } catch (Exception ex) {
+                        success = false;
+                        if (reTryAttempt < NUM_BATCH_UPDATE_RETRIES - 1) {
+                            logger.log(Level.WARNING, "Unable to send document batch to Solr. Re-trying...", ex); //NON-NLS
+                            try {
+                                Thread.sleep(SLEEP_BETWEEN_RETRIES_MS);
+                            } catch (InterruptedException ex1) {
+                                throw new KeywordSearchModuleException(
+                                        NbBundle.getMessage(this.getClass(), "Server.addDocBatch.exception.msg"), ex1); //NON-NLS
+                            }
+                        }
+                    }
+
+                    if (success) {
+                        if (reTryAttempt > 0) {
+                            logger.log(Level.INFO, "Batch update succeeded after {0} retries", reTryAttempt); //NON-NLS
+                        }
+                        return;
+                    }
+                }
+
+                // if we are here, it means all retry attempts failed
+                logger.log(Level.SEVERE, "Unable to send document batch to Solr. All retry attempts failed!"); //NON-NLS
+                throw new KeywordSearchModuleException(NbBundle.getMessage(this.getClass(), "Server.addDocBatch.exception.msg")); //NON-NLS
+            } finally {
+                docBuffer.clear();
+            }
+        }
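sendBufferedDocs tracks a success flag across up to NUM_BATCH_UPDATE_RETRIES attempts, sleeping SLEEP_BETWEEN_RETRIES_MS between failures. Stripped of logging and resource-bundle lookups, the control flow is equivalent to this bounded-retry sketch; sendBatch is a hypothetical stand-in for solrCore.add(docBuffer).

import java.util.List;

public class BoundedRetryDemo {

    private static final int MAX_ATTEMPTS = 10;
    private static final long SLEEP_BETWEEN_RETRIES_MS = 10000; // 10 seconds

    // Hypothetical stand-in for solrCore.add(docBuffer).
    static void sendBatch(List<String> batch) throws Exception {
        throw new Exception("Solr is unreachable");
    }

    static void sendWithRetries(List<String> batch) throws Exception {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                sendBatch(batch);
                return; // success, stop retrying
            } catch (Exception ex) {
                if (attempt == MAX_ATTEMPTS - 1) {
                    throw ex; // last attempt failed, give up
                }
                Thread.sleep(SLEEP_BETWEEN_RETRIES_MS);
            }
        }
    }
}

Two consequences of the real method are worth noting: a batch that keeps failing costs at most nine 10-second sleeps (about 90 seconds) before the exception is thrown, and the finally block clears docBuffer whether or not the send succeeded, so an exhausted batch is dropped rather than retried forever.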

org/sleuthkit/autopsy/keywordsearch/UserPreferences.java

@@ -0,0 +1,76 @@
+/*
+ * Autopsy Forensic Browser
+ *
+ * Copyright 2020 Basis Technology Corp.
+ * Contact: carrier <at> sleuthkit <dot> org
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.sleuthkit.autopsy.keywordsearch;
+
+import java.util.prefs.BackingStoreException;
+import java.util.prefs.PreferenceChangeListener;
+import java.util.prefs.Preferences;
+import org.openide.util.NbPreferences;
+
+/**
+ * Provides convenient access to a Preferences node for user preferences with
+ * default values.
+ */
+final class UserPreferences {
+
+    private static final Preferences preferences = NbPreferences.forModule(UserPreferences.class);
+    private static final String INDEXING_DOC_QUEUE_SIZE = "IndexingDocumentQueueSize"; //NON-NLS
+    private static final int DEFAULT_INDEXING_DOC_QUEUE_SIZE = 30;
+
+    // Prevent instantiation.
+    private UserPreferences() {
+    }
+
+    /**
+     * Reload all preferences from disk. This is only needed if the preferences
+     * file is being directly modified on disk while Autopsy is running.
+     *
+     * @throws BackingStoreException
+     */
+    public static void reloadFromStorage() throws BackingStoreException {
+        preferences.sync();
+    }
+
+    /**
+     * Saves the current preferences to storage. This is only needed if the
+     * preferences files are going to be copied to another location while
+     * Autopsy is running.
+     *
+     * @throws BackingStoreException
+     */
+    public static void saveToStorage() throws BackingStoreException {
+        preferences.flush();
+    }
+
+    public static void addChangeListener(PreferenceChangeListener listener) {
+        preferences.addPreferenceChangeListener(listener);
+    }
+
+    public static void removeChangeListener(PreferenceChangeListener listener) {
+        preferences.removePreferenceChangeListener(listener);
+    }
+
+    public static void setDocumentsQueueSize(int size) {
+        preferences.putInt(INDEXING_DOC_QUEUE_SIZE, size);
+    }
+
+    public static int getDocumentsQueueSize() {
+        return preferences.getInt(INDEXING_DOC_QUEUE_SIZE, DEFAULT_INDEXING_DOC_QUEUE_SIZE);
+    }
+}
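A hypothetical caller inside the same package (for example, an options panel exposing this setting) would use the new class roughly like this; the demo class and listener body are illustrative only.

package org.sleuthkit.autopsy.keywordsearch;

final class QueueSizePreferenceDemo {

    static void demo() {
        // Returns 30 unless the user has overridden the default.
        int queueSize = UserPreferences.getDocumentsQueueSize();

        UserPreferences.setDocumentsQueueSize(50);

        // PreferenceChangeListener has a single abstract method, so a
        // lambda works here.
        UserPreferences.addChangeListener(evt -> {
            if ("IndexingDocumentQueueSize".equals(evt.getKey())) { //NON-NLS
                System.out.println("queue size is now " + evt.getNewValue());
            }
        });
    }
}

Note that Server.Core reads the value once in its constructor, so a changed queue size takes effect for the next core that is opened, not for cores already in use.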