Latest compilable experimental SearchRunner.

Samuel H. Kenyon 2014-03-31 16:24:56 -04:00
parent a0372e9854
commit 6a78944e61
2 changed files with 132 additions and 57 deletions

View File: KeywordSearchIngestModule.java

@@ -119,7 +119,8 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
private final KeywordSearchJobSettings settings;
private boolean initialized = false;
private Tika tikaFormatDetector;
private long jobId = 0; ///@todo where does jobID come from?
private enum IngestStatus {
TEXT_INGESTED, /// Text was extracted by knowing file type and text_ingested
@@ -377,18 +378,18 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
initialized = false;
}
// /**
// * Commits index and notifies listeners of index update
// */
// private void commit() {
// if (initialized) {
// logger.log(Level.INFO, "Commiting index");
// ingester.commit();
// logger.log(Level.INFO, "Index comitted");
// //signal a potential change in number of text_ingested files
// indexChangeNotify();
// }
// }
/**
* Commits index and notifies listeners of index update
*/
private void commit() {
if (initialized) {
logger.log(Level.INFO, "Commiting index");
ingester.commit();
logger.log(Level.INFO, "Index comitted");
//signal a potential change in number of text_ingested files
indexChangeNotify();
}
}
/**
* Posts inbox message with summary of text_ingested files

View File: SearchRunner.java

@@ -20,17 +20,18 @@
package org.sleuthkit.autopsy.keywordsearch;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;
import javax.swing.Timer;
import org.netbeans.api.progress.aggregate.AggregateProgressFactory;
@@ -50,20 +51,26 @@ import org.sleuthkit.datamodel.BlackboardAttribute;
/**
* Singleton keyword search manager
* Launches search threads for each job and performs commits,
* both on timed intervals.
* Launches search threads for each job and performs commits, both on timed
* intervals.
*/
public final class SearchRunner {
private static final Logger logger = Logger.getLogger(SearchRunner.class.getName());
private AtomicInteger messageID = new AtomicInteger(0);
private static SearchRunner instance = null;
private IngestServices services = IngestServices.getInstance();
private Ingester ingester = null;
private boolean initialized = false;
private Map<Long, SearchJobInfo> jobs = new HashMap<>();
private Timer searchTimer;
private Timer updateTimer;
private Map<Keyword, List<Long>> currentResults = new HashMap<>(); ///@todo shared across searchers; initialized here so filterResults() cannot NPE
SearchRunner() {
ingester = Server.getIngester();
ingester = Server.getIngester();
final int updateIntervalMs = KeywordSearchSettings.getUpdateFrequency().getTime() * 60 * 1000;
updateTimer = new Timer(updateIntervalMs, new SearchRunner.UpdateTimerAction());
initialized = true;
}
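The constructor converts the user-configurable update frequency from minutes to milliseconds and hands it to a javax.swing.Timer. A self-contained sketch of that pattern follows; the five-minute interval is an assumed example value:

import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import javax.swing.Timer;

public class PeriodicCommitDemo {
    public static void main(String[] args) {
        // assumed example value; the real interval comes from KeywordSearchSettings.getUpdateFrequency()
        final int updateIntervalMs = 5 * 60 * 1000;
        Timer updateTimer = new Timer(updateIntervalMs, new ActionListener() {
            @Override
            public void actionPerformed(ActionEvent e) {
                // fires on the Swing EDT, like UpdateTimerAction.actionPerformed()
                System.out.println("commit index, then spawn a Searcher per job");
            }
        });
        updateTimer.start(); // SearchRunner starts it lazily in startJob()
    }
}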
@@ -77,22 +84,29 @@ public final class SearchRunner {
public synchronized void startJob(long jobId, long dataSourceId, List<String> keywordListNames) {
if (!jobs.containsKey(jobId)) {
SearchJobInfo jobData = new SearchJobInfo(jobId, dataSourceId, keywordListNames);
jobs.put(jobId, jobData);
jobs.put(jobId, jobData);
}
if (jobs.size() > 0) {
if (!updateTimer.isRunning()) {
updateTimer.start();
}
}
}
public void endJob(long jobId) {
SearchJobInfo job;
SearchJobInfo copy;
synchronized(this) {
job = jobs.get(jobId);
SearchJobInfo job = jobs.get(jobId);
if (job == null) {
return;
}
copy = new SearchJobInfo(job);
jobs.remove(jobId);
}
doFinalSearch(job);
doFinalSearch(copy);
}
/**
* Commits index and notifies listeners of index update
*/
@@ -115,29 +129,64 @@ public final class SearchRunner {
private void doFinalSearch(SearchJobInfo job) {
//cancel searcher timer, ensure unwanted searcher does not start
//before we start the final one
if (searchTimer.isRunning()) {
searchTimer.stop();
if (updateTimer.isRunning()) {
updateTimer.stop();
}
logger.log(Level.INFO, "Running final index commit and search for jobid {0}", jobId);
logger.log(Level.INFO, "Running final index commit and search for jobid {0}", job.getJobId());
commit();
//run one last search as there are probably some new files committed
if (!job.getKeywordListNames().isEmpty()) {
SearchRunner.Searcher finalSearcher = new SearchRunner.Searcher(job.getKeywordListNames(), true);
SearchRunner.Searcher finalSearcher = new SearchRunner.Searcher(job, true);
finalSearcher.execute();
}
}
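Searcher is a SwingWorker, so finalSearcher.execute() runs the query off the event thread and completion surfaces in done(). A minimal sketch of that execute()/get() flow, with the search body left as a placeholder:

import java.util.concurrent.ExecutionException;
import javax.swing.SwingWorker;

class FinalSearchWorker extends SwingWorker<Integer, Void> {
    @Override
    protected Integer doInBackground() {
        // placeholder for querying the index with the job's keyword lists
        return 0; // e.g., number of hits
    }

    @Override
    protected void done() {
        try {
            get(); // rethrows any exception thrown in doInBackground()
        } catch (InterruptedException | ExecutionException e) {
            // mirrors Searcher.done(): log and post an error message
        }
    }
}
// usage: new FinalSearchWorker().execute();  // like finalSearcher.execute()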
/**
* Timer-triggered re-search for each job
* (does a single index commit first)
*/
private class UpdateTimerAction implements ActionListener {
private final Logger logger = Logger.getLogger(SearchRunner.UpdateTimerAction.class.getName());
@Override
public void actionPerformed(ActionEvent e) {
logger.log(Level.INFO, "Commiting index");
commit();
// Spawn a search thread for each job
///@todo Don't spawn a new thread if a job still has the previous one running
logger.log(Level.INFO, "Launching searchers");
for (Entry<Long, SearchJobInfo> j : jobs.entrySet()) {
SearchJobInfo copy = new SearchJobInfo(j.getValue());
Searcher s = new Searcher(copy); //periodic pass, not the final run
s.execute(); //a SwingWorker does nothing until execute() is called
}
}
}
/**
* Simple data structure so we can keep track of keyword lists and data sources for each jobid
* Provides a copy constructor for defensive copying
*/
private class SearchJobInfo {
private long jobId;
private long dataSourceId;
private List<String> keywordListNames = new ArrayList<>();
private List<String> keywordListNames;
public SearchJobInfo(long jobId, long dataSourceId, List<String> keywordListNames) {
this.jobId = jobId;
this.dataSourceId = dataSourceId;
this.keywordListNames = keywordListNames;
this.keywordListNames = new ArrayList<>(keywordListNames);
}
public SearchJobInfo(SearchJobInfo src) {
this(src.jobId, src.dataSourceId, src.keywordListNames);
}
long getJobId() {
return jobId;
}
long getDataSourceId() {
@@ -148,10 +197,6 @@ public final class SearchRunner {
return keywordListNames;
}
}
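SearchJobInfo's copy constructor lets endJob() hand doFinalSearch() a snapshot that survives removal from the jobs map, and new ArrayList<>(keywordListNames) keeps a caller's later mutations from leaking into a running search. The same idiom in isolation (JobSnapshot is a hypothetical stand-in):

import java.util.ArrayList;
import java.util.List;

final class JobSnapshot {
    private final long jobId;
    private final List<String> keywordListNames;

    JobSnapshot(long jobId, List<String> keywordListNames) {
        this.jobId = jobId;
        // defensive copy: later changes to the caller's list are not seen here
        this.keywordListNames = new ArrayList<>(keywordListNames);
    }

    JobSnapshot(JobSnapshot src) { // copy constructor, as in SearchJobInfo
        this(src.jobId, src.keywordListNames);
    }

    List<String> getKeywordListNames() {
        // could also return an unmodifiable view for a stronger guarantee
        return keywordListNames;
    }
}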
private synchronized void setJobSearcherDone(long jobId, boolean flag) {
}
/**
* Searcher responsible for searching the current index and writing results
@@ -164,24 +209,24 @@
/**
* Searcher has private copies/snapshots of the lists and keywords
*/
private SearchJobInfo job;
private List<Keyword> keywords; //keywords to search
private List<String> keywordLists; // lists currently being searched
private Map<String, KeywordList> keywordToList; //keyword to list name mapping
private AggregateProgressHandle progressGroup;
private final Logger logger = Logger.getLogger(SearchRunner.Searcher.class.getName());
private boolean finalRun = false;
private long jobId;
Searcher(List<String> keywordLists, long jobId) {
this.keywordLists = new ArrayList<>(keywordLists);
this.jobId = jobId;
Searcher(SearchJobInfo job) {
this.job = job;
this.keywordLists = job.getKeywordListNames();
keywords = new ArrayList<>();
keywordToList = new HashMap<>();
//keywords are populated as searcher runs
}
Searcher(List<String> keywordLists, long jobId, boolean finalRun) {
this(keywordLists, jobId);
Searcher(SearchJobInfo job, boolean finalRun) {
this(job);
this.finalRun = finalRun;
}
@@ -228,18 +273,9 @@
try {
logger.log(Level.INFO, "Started a new searcher");
progressGroup.setDisplayName(displayName);
//make sure other searchers are not spawned
setJobSearcherDone(jobId, false);
//searcherDone = false;
//runSearcher = false;
if (searchTimer.isRunning()) {
searchTimer.stop();
}
int keywordsSearched = 0;
//updateKeywords();
for (Keyword keywordQuery : keywords) {
if (this.isCancelled()) {
logger.log(Level.INFO, "Cancel detected, bailing before new keyword processed: {0}", keywordQuery.getQuery());
@@ -269,7 +305,7 @@
//limit search to currently ingested data sources
//set up a filter with 1 or more image ids OR'ed
final KeywordQueryFilter dataSourceFilter = new KeywordQueryFilter(KeywordQueryFilter.FilterType.DATA_SOURCE, curDataSourceIds);
final KeywordQueryFilter dataSourceFilter = new KeywordQueryFilter(KeywordQueryFilter.FilterType.DATA_SOURCE, job.getDataSourceId());
del.addFilter(dataSourceFilter);
Map<String, List<ContentHit>> queryResult;
@@ -290,7 +326,7 @@
continue;
}
// calculate new results but subtracting results already obtained in this ingest
// calculate new results by subtracting results already obtained in this ingest
// this creates a map of each keyword to the list of unique files that have that hit.
Map<Keyword, List<ContentHit>> newResults = filterResults(queryResult, isRegex);
@@ -425,7 +461,7 @@
}
detailsSb.append("</table>");
services.postMessage(IngestMessage.createDataMessage(++messageID, KeywordSearchModuleFactory.getModuleName(), subjectSb.toString(), detailsSb.toString(), uniqueKey, written.getArtifact()));
services.postMessage(IngestMessage.createDataMessage(messageID.incrementAndGet(), KeywordSearchModuleFactory.getModuleName(), subjectSb.toString(), detailsSb.toString(), uniqueKey, written.getArtifact()));
}
} //for each file hit
@@ -469,7 +505,7 @@
get();
} catch (InterruptedException | ExecutionException e) {
logger.log(Level.SEVERE, "Error performing keyword search: " + e.getMessage());
services.postMessage(IngestMessage.createErrorMessage(++messageID, KeywordSearchModuleFactory.getModuleName(), "Error performing keyword search", e.getMessage()));
services.postMessage(IngestMessage.createErrorMessage(messageID.incrementAndGet(), KeywordSearchModuleFactory.getModuleName(), "Error performing keyword search", e.getMessage()));
} // catch and ignore if we were cancelled
catch (java.util.concurrent.CancellationException ex) {
}
@@ -509,10 +545,48 @@
// progressGroup.finish();
// }
// });
setJobSearcherDone(jobId, true);
}
//calculate new results by subtracting results already obtained in this ingest
//update currentResults map with the new results
private Map<Keyword, List<ContentHit>> filterResults(Map<String, List<ContentHit>> queryResult, boolean isRegex) {
Map<Keyword, List<ContentHit>> newResults = new HashMap<>();
for (String termResult : queryResult.keySet()) {
List<ContentHit> queryTermResults = queryResult.get(termResult);
//translate to list of IDs that we keep track of
List<Long> queryTermResultsIDs = new ArrayList<>();
for (ContentHit ch : queryTermResults) {
queryTermResultsIDs.add(ch.getId());
}
Keyword termResultK = new Keyword(termResult, !isRegex);
List<Long> curTermResults = currentResults.get(termResultK);
if (curTermResults == null) {
currentResults.put(termResultK, queryTermResultsIDs);
newResults.put(termResultK, queryTermResults);
} else {
//some AbstractFile hits already exist for this keyword
for (ContentHit res : queryTermResults) {
if (!curTermResults.contains(res.getId())) {
//add to new results
List<ContentHit> newResultsFs = newResults.get(termResultK);
if (newResultsFs == null) {
newResultsFs = new ArrayList<>();
newResults.put(termResultK, newResultsFs);
}
newResultsFs.add(res);
curTermResults.add(res.getId());
}
}
}
}
return newResults;
}
}
}
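One side change worth noting: messageID went from a plain int incremented with ++ to an AtomicInteger with incrementAndGet(), since several Searcher workers may now post ingest messages concurrently. A small JDK-only demonstration of the difference:

import java.util.concurrent.atomic.AtomicInteger;

public class MessageIdDemo {
    private static final AtomicInteger messageID = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        Runnable post = new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    messageID.incrementAndGet(); // atomic; ++ on a shared int is not
                }
            }
        };
        Thread a = new Thread(post);
        Thread b = new Thread(post);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(messageID.get()); // always 20000; with a plain int it could be less
    }
}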