First cut at implementing the search back off

This commit is contained in:
Eugene Livis 2017-12-06 17:27:02 -05:00
parent b58c91b8bf
commit 2c7f929abd

View File

@ -18,6 +18,7 @@
*/
package org.sleuthkit.autopsy.keywordsearch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -30,6 +31,9 @@ import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import javax.swing.SwingUtilities;
@ -38,6 +42,7 @@ import org.netbeans.api.progress.aggregate.AggregateProgressFactory;
import org.netbeans.api.progress.aggregate.AggregateProgressHandle;
import org.netbeans.api.progress.aggregate.ProgressContributor;
import org.openide.util.Cancellable;
import org.openide.util.Exceptions;
import org.openide.util.NbBundle;
import org.openide.util.NbBundle.Messages;
import org.sleuthkit.autopsy.coreutils.Logger;
@ -58,14 +63,17 @@ final class SearchRunner {
private IngestServices services = IngestServices.getInstance();
private Ingester ingester = null;
private volatile boolean updateTimerRunning = false;
private Timer updateTimer;
private Future<?> jobProcessingTaskFuture;
private final ScheduledThreadPoolExecutor jobProcessingExecutor;
private static final int NUM_SEARCH_SCHEDULING_THREADS = 1;
private static final String SEARCH_SCHEDULER_THREAD_NAME = "periodic-search-scheduler-%d";
// maps a jobID to the search
private Map<Long, SearchJobInfo> jobs = new HashMap<>(); //guarded by "this"
SearchRunner() {
ingester = Ingester.getDefault();
updateTimer = new Timer(NbBundle.getMessage(this.getClass(), "SearchRunner.updateTimer.title.text"), true); // run as a daemon
jobProcessingExecutor = new ScheduledThreadPoolExecutor(NUM_SEARCH_SCHEDULING_THREADS, new ThreadFactoryBuilder().setNameFormat(SEARCH_SCHEDULER_THREAD_NAME).build());
}
/**
@ -98,7 +106,7 @@ final class SearchRunner {
// start the timer, if needed
if ((jobs.size() > 0) && (updateTimerRunning == false)) {
final long updateIntervalMs = ((long) KeywordSearchSettings.getUpdateFrequency().getTime()) * 60 * 1000;
updateTimer.scheduleAtFixedRate(new UpdateTimerTask(), updateIntervalMs, updateIntervalMs);
jobProcessingTaskFuture = jobProcessingExecutor.schedule(new PeriodicSearchTask(), updateIntervalMs, MILLISECONDS);
updateTimerRunning = true;
}
}
@ -129,6 +137,11 @@ final class SearchRunner {
logger.log(Level.INFO, "Commiting search index before final search for search job {0}", job.getJobId()); //NON-NLS
commit();
doFinalSearch(job); //this will block until it's done
// we are done with all the searches. stop the PeriodicSearchTask.
// A new one will be created for future jobs.
updateTimerRunning = false;
jobProcessingTaskFuture.cancel(true);
}
}
@ -156,6 +169,13 @@ final class SearchRunner {
}
jobs.remove(jobId);
if (jobs.isEmpty()) {
// no more jobs left. stop the PeriodicSearchTask.
// A new one will be created for future jobs.
updateTimerRunning = false;
jobProcessingTaskFuture.cancel(true);
}
}
}
@ -223,23 +243,24 @@ final class SearchRunner {
}
/**
* Timer triggered re-search for each job (does a single index commit first)
* Task triggered re-search for each job (does a single index commit first)
*/
private class UpdateTimerTask extends TimerTask {
private final class PeriodicSearchTask implements Runnable {
private final Logger logger = Logger.getLogger(SearchRunner.UpdateTimerTask.class.getName());
private final Logger logger = Logger.getLogger(SearchRunner.PeriodicSearchTask.class.getName());
@Override
public void run() {
// If no jobs then cancel the task. If more job(s) come along, a new task will start up.
if (jobs.isEmpty()) {
this.cancel(); //terminate this timer task
if (jobs.isEmpty() || jobProcessingTaskFuture.isCancelled()) {
updateTimerRunning = false;
return;
}
commit();
final StopWatch stopWatch = new StopWatch();
stopWatch.start();
synchronized (SearchRunner.this) {
// Spawn a search thread for each job
for (Entry<Long, SearchJobInfo> j : jobs.entrySet()) {
@ -247,13 +268,40 @@ final class SearchRunner {
// If no lists or the worker is already running then skip it
if (!job.getKeywordListNames().isEmpty() && !job.isWorkerRunning()) {
logger.log(Level.INFO, "Executing periodic search for search job {0}", job.getJobId());
Searcher searcher = new Searcher(job);
Searcher searcher = new Searcher(job); // SwingWorker
job.setCurrentSearcher(searcher); //save the ref
searcher.execute(); //start thread
job.setWorkerRunning(true);
try {
// wait for the searcher to finish
searcher.get();
} catch (InterruptedException | ExecutionException ex) {
logger.log(Level.SEVERE, "Error performing keyword search: {0}", ex.getMessage()); //NON-NLS
services.postMessage(IngestMessage.createErrorMessage(KeywordSearchModuleFactory.getModuleName(),
NbBundle.getMessage(this.getClass(),
"SearchRunner.Searcher.done.err.msg"), ex.getMessage()));
}// catch and ignore if we were cancelled
catch (java.util.concurrent.CancellationException ex) {
}
}
}
}
stopWatch.stop();
// calculate "hold off" time
final long timeToTextSleepMs = getTimeToNextSleep(stopWatch.getElapsedTimeSecs());
// schedule next PeriodicSearchTask
jobProcessingTaskFuture = jobProcessingExecutor.schedule(new PeriodicSearchTask(), timeToTextSleepMs, MILLISECONDS);
// exit this thread
return;
}
private long getTimeToNextSleep(long lastSerchTimeMs) {
return lastSerchTimeMs;
}
}
@ -522,23 +570,6 @@ final class SearchRunner {
return null;
}
@Override
protected void done() {
// call get to see if there were any errors
try {
logger.log(Level.INFO, "Searcher calling get() on itself in done()"); //NON-NLS
get();
logger.log(Level.INFO, "Searcher finished calling get() on itself in done()"); //NON-NLS
} catch (InterruptedException | ExecutionException e) {
logger.log(Level.SEVERE, "Error performing keyword search: " + e.getMessage()); //NON-NLS
services.postMessage(IngestMessage.createErrorMessage(KeywordSearchModuleFactory.getModuleName(),
NbBundle.getMessage(this.getClass(),
"SearchRunner.Searcher.done.err.msg"), e.getMessage()));
} // catch and ignore if we were cancelled
catch (java.util.concurrent.CancellationException ex) {
}
}
/**
* Sync-up the updated keywords from the currently used lists in the XML
*/