improve locking

This commit is contained in:
Samuel H. Kenyon 2014-03-31 17:52:19 -04:00
parent 96404a16f4
commit c4cf08fb42

View File

@ -32,6 +32,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level; import java.util.logging.Level;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker; import javax.swing.SwingWorker;
import javax.swing.Timer; import javax.swing.Timer;
import org.netbeans.api.progress.aggregate.AggregateProgressFactory; import org.netbeans.api.progress.aggregate.AggregateProgressFactory;
@ -50,7 +51,7 @@ import org.sleuthkit.datamodel.BlackboardArtifact;
import org.sleuthkit.datamodel.BlackboardAttribute; import org.sleuthkit.datamodel.BlackboardAttribute;
/** /**
* Singleton keyword search manager * Singleton keyword search manager:
* Launches search threads for each job and performs commits, both on timed * Launches search threads for each job and performs commits, both on timed
* intervals. * intervals.
*/ */
@ -59,11 +60,12 @@ public final class SearchRunner {
private AtomicInteger messageID = new AtomicInteger(0); private AtomicInteger messageID = new AtomicInteger(0);
private static SearchRunner instance = null; private static SearchRunner instance = null;
private IngestServices services = IngestServices.getInstance(); private IngestServices services = IngestServices.getInstance();
private Ingester ingester = null; private Ingester ingester = null; //guarded by "this"
private boolean initialized = false; private boolean initialized = false;
private Map<Long, SearchJobInfo> jobs = new HashMap<>();
private Timer updateTimer; private Timer updateTimer;
private Map<Keyword, List<Long>> currentResults; private Map<Keyword, List<Long>> currentResults;
private Map<Long, SearchJobInfo> jobs = new HashMap<>(); //guarded by "this"
SearchRunner() { SearchRunner() {
ingester = Server.getIngester(); ingester = Server.getIngester();
@ -74,6 +76,10 @@ public final class SearchRunner {
initialized = true; initialized = true;
} }
/**
*
* @return the singleton object
*/
public static synchronized SearchRunner getInstance() { public static synchronized SearchRunner getInstance() {
if (instance == null) { if (instance == null) {
instance = new SearchRunner(); instance = new SearchRunner();
@ -81,6 +87,12 @@ public final class SearchRunner {
return instance; return instance;
} }
/**
*
* @param jobId
* @param dataSourceId
* @param keywordListNames
*/
public synchronized void startJob(long jobId, long dataSourceId, List<String> keywordListNames) { public synchronized void startJob(long jobId, long dataSourceId, List<String> keywordListNames) {
if (!jobs.containsKey(jobId)) { if (!jobs.containsKey(jobId)) {
SearchJobInfo jobData = new SearchJobInfo(jobId, dataSourceId, keywordListNames); SearchJobInfo jobData = new SearchJobInfo(jobId, dataSourceId, keywordListNames);
@ -94,6 +106,10 @@ public final class SearchRunner {
} }
} }
/**
*
* @param jobId
*/
public void endJob(long jobId) { public void endJob(long jobId) {
SearchJobInfo copy; SearchJobInfo copy;
synchronized(this) { synchronized(this) {
@ -104,6 +120,8 @@ public final class SearchRunner {
copy = new SearchJobInfo(job); copy = new SearchJobInfo(job);
jobs.remove(jobId); jobs.remove(jobId);
} }
commit();
doFinalSearch(copy); doFinalSearch(copy);
} }
@ -113,10 +131,12 @@ public final class SearchRunner {
private void commit() { private void commit() {
if (initialized) { if (initialized) {
logger.log(Level.INFO, "Commiting index"); logger.log(Level.INFO, "Commiting index");
ingester.commit(); synchronized(this) {
ingester.commit();
}
logger.log(Level.INFO, "Index comitted"); logger.log(Level.INFO, "Index comitted");
//signal a potential change in number of text_ingested files // Signal a potential change in number of text_ingested files
try { try {
final int numIndexedFiles = KeywordSearch.getServer().queryNumIndexedFiles(); final int numIndexedFiles = KeywordSearch.getServer().queryNumIndexedFiles();
KeywordSearch.fireNumIndexedFilesChange(null, new Integer(numIndexedFiles)); KeywordSearch.fireNumIndexedFilesChange(null, new Integer(numIndexedFiles));
@ -126,17 +146,19 @@ public final class SearchRunner {
} }
} }
/**
* Passes arg directly into a Searcher, so providing a copy is a good idea.
* @param job
*/
private void doFinalSearch(SearchJobInfo job) { private void doFinalSearch(SearchJobInfo job) {
//cancel searcher timer, ensure unwanted searcher does not start // Cancel timer to ensure unwanted searchers do not start before we
//before we start the final one // start the final one
if (updateTimer.isRunning()) { if (updateTimer.isRunning()) {
updateTimer.stop(); updateTimer.stop();
} }
logger.log(Level.INFO, "Running final index commit and search for jobid {0}", job.getJobId()); // Run one last search as there are probably some new files committed
commit(); logger.log(Level.INFO, "Running final search for jobid {0}", job.getJobId());
//run one last search as there are probably some new files committed
if (!job.getKeywordListNames().isEmpty()) { if (!job.getKeywordListNames().isEmpty()) {
SearchRunner.Searcher finalSearcher = new SearchRunner.Searcher(job, true); SearchRunner.Searcher finalSearcher = new SearchRunner.Searcher(job, true);
finalSearcher.execute(); finalSearcher.execute();
@ -145,23 +167,24 @@ public final class SearchRunner {
/** /**
* Timer triggered re-search for each job * Timer triggered re-search for each job (does a single index commit first)
* (does a single index commit first)
*/ */
private class UpdateTimerAction implements ActionListener { private class UpdateTimerAction implements ActionListener {
private final Logger logger = Logger.getLogger(SearchRunner.UpdateTimerAction.class.getName()); private final Logger logger = Logger.getLogger(SearchRunner.UpdateTimerAction.class.getName());
@Override @Override
public void actionPerformed(ActionEvent e) { public void actionPerformed(ActionEvent e) {
logger.log(Level.INFO, "Commiting index");
commit(); commit();
// Spawn a search thread for each job synchronized(SearchRunner.this) {
///@todo Don't spawn a new thread if a job still has the previous one running // Spawn a search thread for each job
logger.log(Level.INFO, "Launching searchers"); ///@todo Don't spawn a new thread if a job still has the previous one running
for(Entry<Long, SearchJobInfo> j : jobs.entrySet()) { logger.log(Level.INFO, "Launching searchers");
SearchJobInfo copy = new SearchJobInfo(j.getValue()); for(Entry<Long, SearchJobInfo> j : jobs.entrySet()) {
Searcher s = new Searcher(copy, true); SearchJobInfo copy = new SearchJobInfo(j.getValue());
Searcher s = new Searcher(copy, true);
s.execute();
}
} }
} }
} }
@ -539,12 +562,12 @@ public final class SearchRunner {
*/ */
private void finalizeSearcher() { private void finalizeSearcher() {
logger.log(Level.INFO, "Searcher finalizing"); logger.log(Level.INFO, "Searcher finalizing");
// SwingUtilities.invokeLater(new Runnable() { SwingUtilities.invokeLater(new Runnable() {
// @Override @Override
// public void run() { public void run() {
// progressGroup.finish(); progressGroup.finish();
// } }
// }); });
} }
//calculate new results but substracting results already obtained in this ingest //calculate new results but substracting results already obtained in this ingest