Remove the commit/search timer and Searcher logic from KeywordSearchIngestModule that is now in SearchRunner, and add calls to SearchRunner.

Samuel H. Kenyon 2014-04-01 18:21:00 -04:00
parent 3ebe81c6fa
commit bafbd19ade
2 changed files with 34 additions and 576 deletions
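
In short: the module no longer owns the commit/search timers or the background Searcher; it now drives SearchRunner's per-job lifecycle. Below is a minimal sketch of the new call pattern, using only the SearchRunner methods visible in this diff (startJob, endJob, stopJob); the surrounding class and method names are illustrative, not the actual ingest-module plumbing:

    // Sketch only: how an ingest module delegates to the shared SearchRunner.
    // jobId/dataSourceId wiring is simplified (the diff's @todo notes they
    // should eventually come from IngestJobContext).
    public final class KeywordSearchDelegationSketch {
        private long jobId;
        private long dataSourceId;

        void onFileProcessed(java.util.List<String> enabledListNames) {
            // Start searching if it hasn't started already; SearchRunner
            // now owns the periodic commit and search.
            SearchRunner.getInstance().startJob(jobId, dataSourceId, enabledListNames);
        }

        void onIngestComplete() {
            // Triggers the final commit and final search, then drops the job.
            SearchRunner.getInstance().endJob(jobId);
        }

        void onIngestStopped() {
            // Cancellation path: stop searching, commit what is indexed.
            SearchRunner.getInstance().stopJob(jobId);
        }
    }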

KeywordSearchIngestModule.java

@@ -98,20 +98,9 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
private static final Logger logger = Logger.getLogger(KeywordSearchIngestModule.class.getName());
private IngestServices services = IngestServices.getInstance();
private Ingester ingester = null;
private volatile boolean commitIndex = false; //whether to commit index next time
private volatile boolean runSearcher = false; //whether to run searcher next time
private Timer commitTimer;
private Timer searchTimer;
private Indexer indexer;
private Searcher currentSearcher;
private Searcher finalSearcher;
private volatile boolean searcherDone = true; //mark as done, until it's inited
private Map<Keyword, List<Long>> currentResults;
//only search images from current ingest, not images previously ingested/indexed
//accessed read-only by searcher thread
private Set<Long> curDataSourceIds;
private static final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); //use fairness policy
private static final Lock searcherLock = rwLock.writeLock();
private AtomicInteger messageID = new AtomicInteger(0);
private boolean processedFiles;
private SleuthkitCase caseHandle = null;
@@ -120,8 +109,9 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
private final KeywordSearchJobSettings settings;
private boolean initialized = false;
private Tika tikaFormatDetector;
private long jobId = 0; ///@todo where does jobID come from?
private long jobId = 0; ///@todo get from IngestJobContext
private long dataSourceId;
private enum IngestStatus {
TEXT_INGESTED, /// Text was extracted by knowing file type and text_ingested
@@ -213,25 +203,14 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
}
processedFiles = false;
searcherDone = true; //make sure to start the initial currentSearcher
//keeps track of all results per run not to repeat reporting the same hits
currentResults = new HashMap<>();
curDataSourceIds = new HashSet<>();
indexer = new Indexer();
final int updateIntervalMs = KeywordSearchSettings.getUpdateFrequency().getTime() * 60 * 1000;
logger.log(Level.INFO, "Using commit interval (ms): {0}", updateIntervalMs);
logger.log(Level.INFO, "Using searcher interval (ms): {0}", updateIntervalMs);
commitTimer = new Timer(updateIntervalMs, new CommitTimerAction());
searchTimer = new Timer(updateIntervalMs, new SearchTimerAction());
initialized = true;
commitTimer.start();
searchTimer.start();
}
@Override
@@ -245,8 +224,8 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
}
try {
//add data source id of the file to the set, keeping track of images being ingested
final long fileSourceId = caseHandle.getFileDataSource(abstractFile);
curDataSourceIds.add(fileSourceId);
///@todo this should come from IngestJobContext
dataSourceId = caseHandle.getFileDataSource(abstractFile);
} catch (TskCoreException ex) {
logger.log(Level.SEVERE, "Error getting image id of file processed by keyword search: " + abstractFile.getName(), ex);
@@ -265,12 +244,13 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
processedFiles = true;
//check if it's time to commit after previous processing
checkRunCommitSearch();
//index the file and content (if the content is supported)
indexer.indexFile(abstractFile, true);
// Start searching if it hasn't started already
List<String> keywordListNames = settings.getNamesOfEnabledKeyWordLists();
SearchRunner.getInstance().startJob(jobId, dataSourceId, keywordListNames);
return ProcessResult.OK;
}
@@ -288,34 +268,12 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
stop();
return;
}
//
// commitTimer.stop();
//
// //NOTE, we let the 1 before last searcher complete fully, and enqueue the last one
//
// //cancel searcher timer, ensure unwanted searcher does not start
// //before we start the final one
// if (searchTimer.isRunning()) {
// searchTimer.stop();
// }
// runSearcher = false;
//
// logger.log(Level.INFO, "Running final index commit and search");
// //final commit
// commit();
// Remove from the search list and trigger final commit and final search
SearchRunner.getInstance().endJob(jobId);
postIndexSummary();
//run one last search as there are probably some new files committed
// List<String> keywordLists = settings.getNamesOfEnabledKeyWordLists();
// if (!keywordLists.isEmpty() && processedFiles == true) {
// finalSearcher = new Searcher(keywordLists, true); //final searcher run
// finalSearcher.execute();
// }
//log number of files / chunks in index
//signal a potential change in number of text_ingested files
try {
@@ -326,8 +284,6 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
} catch (NoOpenCoreException | KeywordSearchModuleException ex) {
logger.log(Level.WARNING, "Error executing Solr query to check number of indexed files/chunks: ", ex);
}
//cleanup done in final searcher
}
/**
@@ -336,22 +292,8 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
private void stop() {
logger.log(Level.INFO, "stop()");
//stop timer
commitTimer.stop();
//stop currentSearcher
if (currentSearcher != null) {
currentSearcher.cancel(true);
}
//cancel searcher timer, ensure unwanted searcher does not start
if (searchTimer.isRunning()) {
searchTimer.stop();
}
runSearcher = false;
//commit uncommitted files, don't search again
commit();
SearchRunner.getInstance().stopJob(jobId);
cleanup();
}
@@ -360,15 +302,6 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
*/
private void cleanup() {
ingestStatus.clear();
currentResults.clear();
curDataSourceIds.clear();
currentSearcher = null;
//finalSearcher = null; //do not collect, might be finalizing
commitTimer.stop();
searchTimer.stop();
commitTimer = null;
//searchTimer = null; // do not collect, final searcher might still be running, in which case it throws an exception
textExtractors.clear();
textExtractors = null;
@@ -379,19 +312,6 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
initialized = false;
}
/**
* Commits index and notifies listeners of index update
*/
private void commit() {
if (initialized) {
logger.log(Level.INFO, "Commiting index");
ingester.commit();
logger.log(Level.INFO, "Index comitted");
//signal a potential change in number of text_ingested files
indexChangeNotify();
}
}
/**
* Posts inbox message with summary of text_ingested files
*/
@@ -423,7 +343,7 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
error_io++;
break;
default:
;
}
}
@@ -447,73 +367,6 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
}
}
/**
* Helper method to notify listeners on index update
*/
private void indexChangeNotify() {
//signal a potential change in number of text_ingested files
try {
final int numIndexedFiles = KeywordSearch.getServer().queryNumIndexedFiles();
KeywordSearch.fireNumIndexedFilesChange(null, new Integer(numIndexedFiles));
} catch (NoOpenCoreException | KeywordSearchModuleException ex) {
logger.log(Level.WARNING, "Error executing Solr query to check number of indexed files: ", ex);
}
}
/**
* Check if it is time to commit; if so, run the commit. Then run the search
* if the search timer flag is also set.
*/
void checkRunCommitSearch() {
if (commitIndex) {
logger.log(Level.INFO, "Commiting index");
commit();
commitIndex = false;
//after commit, check if time to run searcher
//NOTE commit/searcher timings don't need to align
//in worst case, we will run search next time after commit timer goes off, or at the end of ingest
if (searcherDone && runSearcher) {
//start search if previous not running
List<String> keywordLists = settings.getNamesOfEnabledKeyWordLists();
if (!keywordLists.isEmpty()) {
currentSearcher = new Searcher(keywordLists);
currentSearcher.execute();//searcher will stop timer and restart timer when done
}
}
}
}
/**
* CommitTimerAction, run by commitTimer. Sets a flag to indicate we are
* ready for commit.
*/
private class CommitTimerAction implements ActionListener {
private final Logger logger = Logger.getLogger(CommitTimerAction.class.getName());
@Override
public void actionPerformed(ActionEvent e) {
commitIndex = true;
logger.log(Level.INFO, "CommitTimer awake");
}
}
/**
* SearchTimerAction, run by searchTimer. Sets a flag to indicate we are
* ready to search.
*/
private class SearchTimerAction implements ActionListener {
private final Logger logger = Logger.getLogger(SearchTimerAction.class.getName());
@Override
public void actionPerformed(ActionEvent e) {
runSearcher = true;
logger.log(Level.INFO, "SearchTimer awake");
}
}
/**
* File indexer: processes and indexes known/allocated files,
* unknown/unallocated files, and directories accordingly.
@@ -695,420 +548,4 @@ public final class KeywordSearchIngestModule extends IngestModuleAdapter impleme
}
}
/**
* Searcher responsible for searching the current index and writing results
* to blackboard and the inbox. Also, posts results to listeners as Ingest
* data events. Searches entire index, and keeps track of only new results
* to report and save. Runs as a background thread.
*/
private final class Searcher extends SwingWorker<Object, Void> {
/**
* Searcher has private copies/snapshots of the lists and keywords
*/
private List<Keyword> keywords; //keywords to search
private List<String> keywordLists; // lists currently being searched
private Map<String, KeywordList> keywordToList; //keyword to list name mapping
private AggregateProgressHandle progressGroup;
private final Logger logger = Logger.getLogger(Searcher.class.getName());
private boolean finalRun = false;
Searcher(List<String> keywordLists) {
this.keywordLists = new ArrayList<>(keywordLists);
this.keywords = new ArrayList<>();
this.keywordToList = new HashMap<>();
//keywords are populated as searcher runs
}
Searcher(List<String> keywordLists, boolean finalRun) {
this(keywordLists);
this.finalRun = finalRun;
}
@Override
protected Object doInBackground() throws Exception {
if (finalRun) {
logger.log(Level.INFO, "Pending start of new (final) searcher");
} else {
logger.log(Level.INFO, "Pending start of new searcher");
}
final String displayName = NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.doInBackGround.displayName")
+ (finalRun ? (" - " + NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.doInBackGround.finalizeMsg")) : "");
progressGroup = AggregateProgressFactory.createSystemHandle(displayName + (" ("
+ NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.doInBackGround.pendingMsg") + ")"), null, new Cancellable() {
@Override
public boolean cancel() {
logger.log(Level.INFO, "Cancelling the searcher by user.");
if (progressGroup != null) {
progressGroup.setDisplayName(displayName + " (" + NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.doInBackGround.cancelMsg") + "...)");
}
return Searcher.this.cancel(true);
}
}, null);
updateKeywords();
ProgressContributor[] subProgresses = new ProgressContributor[keywords.size()];
int i = 0;
for (Keyword keywordQuery : keywords) {
subProgresses[i] =
AggregateProgressFactory.createProgressContributor(keywordQuery.getQuery());
progressGroup.addContributor(subProgresses[i]);
i++;
}
progressGroup.start();
//block to ensure previous searcher is completely done with doInBackground()
//even after previous searcher cancellation, we need to check this
searcherLock.lock();
final StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
logger.log(Level.INFO, "Started a new searcher");
progressGroup.setDisplayName(displayName);
//make sure other searchers are not spawned
searcherDone = false;
runSearcher = false;
if (searchTimer.isRunning()) {
searchTimer.stop();
}
int keywordsSearched = 0;
//updateKeywords();
for (Keyword keywordQuery : keywords) {
if (this.isCancelled()) {
logger.log(Level.INFO, "Cancel detected, bailing before new keyword processed: {0}", keywordQuery.getQuery());
return null;
}
final String queryStr = keywordQuery.getQuery();
final KeywordList list = keywordToList.get(queryStr);
final String listName = list.getName();
//new subProgress will be active after the initial query
//when we know number of hits to start() with
if (keywordsSearched > 0) {
subProgresses[keywordsSearched - 1].finish();
}
KeywordSearchQuery del = null;
boolean isRegex = !keywordQuery.isLiteral();
if (isRegex) {
del = new TermComponentQuery(keywordQuery);
} else {
del = new LuceneQuery(keywordQuery);
del.escape();
}
//limit search to currently ingested data sources
//set up a filter with 1 or more image ids OR'ed
final KeywordQueryFilter dataSourceFilter = new KeywordQueryFilter(KeywordQueryFilter.FilterType.DATA_SOURCE, curDataSourceIds);
del.addFilter(dataSourceFilter);
Map<String, List<ContentHit>> queryResult;
try {
queryResult = del.performQuery();
} catch (NoOpenCoreException ex) {
logger.log(Level.WARNING, "Error performing query: " + keywordQuery.getQuery(), ex);
//no reason to continue with next query if recovery failed
//or wait for recovery to kick in and run again later
//likely case has closed and threads are being interrupted
return null;
} catch (CancellationException e) {
logger.log(Level.INFO, "Cancel detected, bailing during keyword query: {0}", keywordQuery.getQuery());
return null;
} catch (Exception e) {
logger.log(Level.WARNING, "Error performing query: " + keywordQuery.getQuery(), e);
continue;
}
// calculate new results by subtracting results already obtained in this ingest
// this creates a map of each keyword to the list of unique files that have that hit.
Map<Keyword, List<ContentHit>> newResults = filterResults(queryResult, isRegex);
if (!newResults.isEmpty()) {
//write results to BB
//new artifacts created, to report to listeners
Collection<BlackboardArtifact> newArtifacts = new ArrayList<>();
//scale the progress bar to be more granular, with per-result sub-progress within each keyword
int totalUnits = newResults.size();
subProgresses[keywordsSearched].start(totalUnits);
int unitProgress = 0;
String queryDisplayStr = keywordQuery.getQuery();
if (queryDisplayStr.length() > 50) {
queryDisplayStr = queryDisplayStr.substring(0, 49) + "...";
}
subProgresses[keywordsSearched].progress(listName + ": " + queryDisplayStr, unitProgress);
/* cycle through the keywords returned -- only one unless it was a regexp */
for (final Keyword hitTerm : newResults.keySet()) {
//checking for cancellation between results
if (this.isCancelled()) {
logger.log(Level.INFO, "Cancel detected, bailing before new hit processed for query: {0}", keywordQuery.getQuery());
return null;
}
// update progress display
String hitDisplayStr = hitTerm.getQuery();
if (hitDisplayStr.length() > 50) {
hitDisplayStr = hitDisplayStr.substring(0, 49) + "...";
}
subProgresses[keywordsSearched].progress(listName + ": " + hitDisplayStr, unitProgress);
//subProgresses[keywordsSearched].progress(unitProgress);
// this returns the unique files in the set with the first chunk that has a hit
Map<AbstractFile, Integer> contentHitsFlattened = ContentHit.flattenResults(newResults.get(hitTerm));
for (final AbstractFile hitFile : contentHitsFlattened.keySet()) {
// get the snippet for the first hit in the file
String snippet;
final String snippetQuery = KeywordSearchUtil.escapeLuceneQuery(hitTerm.getQuery());
int chunkId = contentHitsFlattened.get(hitFile);
try {
snippet = LuceneQuery.querySnippet(snippetQuery, hitFile.getId(), chunkId, isRegex, true);
} catch (NoOpenCoreException e) {
logger.log(Level.WARNING, "Error querying snippet: " + snippetQuery, e);
//no reason to continue
return null;
} catch (Exception e) {
logger.log(Level.WARNING, "Error querying snippet: " + snippetQuery, e);
continue;
}
// write the blackboard artifact for this keyword in this file
KeywordWriteResult written = del.writeToBlackBoard(hitTerm.getQuery(), hitFile, snippet, listName);
if (written == null) {
logger.log(Level.WARNING, "BB artifact for keyword hit not written, file: {0}, hit: {1}", new Object[]{hitFile, hitTerm.toString()});
continue;
}
newArtifacts.add(written.getArtifact());
//generate an ingest inbox message for this keyword in this file
if (list.getIngestMessages()) {
StringBuilder subjectSb = new StringBuilder();
StringBuilder detailsSb = new StringBuilder();
//final int hitFiles = newResults.size();
if (!keywordQuery.isLiteral()) {
subjectSb.append(NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.regExpHitLbl"));
} else {
subjectSb.append(NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.kwHitLbl"));
}
//subjectSb.append("<");
String uniqueKey = null;
BlackboardAttribute attr = written.getAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_KEYWORD.getTypeID());
if (attr != null) {
final String keyword = attr.getValueString();
subjectSb.append(keyword);
uniqueKey = keyword.toLowerCase();
}
//subjectSb.append(">");
//String uniqueKey = queryStr;
//details
detailsSb.append("<table border='0' cellpadding='4' width='280'>");
//hit
detailsSb.append("<tr>");
detailsSb.append(NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.kwHitLThLbl"));
detailsSb.append("<td>").append(EscapeUtil.escapeHtml(attr.getValueString())).append("</td>");
detailsSb.append("</tr>");
//preview
attr = written.getAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_KEYWORD_PREVIEW.getTypeID());
if (attr != null) {
detailsSb.append("<tr>");
detailsSb.append(NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.previewThLbl"));
detailsSb.append("<td>").append(EscapeUtil.escapeHtml(attr.getValueString())).append("</td>");
detailsSb.append("</tr>");
}
//file
detailsSb.append("<tr>");
detailsSb.append(NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.fileThLbl"));
detailsSb.append("<td>").append(hitFile.getParentPath()).append(hitFile.getName()).append("</td>");
detailsSb.append("</tr>");
//list
attr = written.getAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_SET_NAME.getTypeID());
detailsSb.append("<tr>");
detailsSb.append(NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.listThLbl"));
detailsSb.append("<td>").append(attr.getValueString()).append("</td>");
detailsSb.append("</tr>");
//regex
if (!keywordQuery.isLiteral()) {
attr = written.getAttribute(BlackboardAttribute.ATTRIBUTE_TYPE.TSK_KEYWORD_REGEXP.getTypeID());
if (attr != null) {
detailsSb.append("<tr>");
detailsSb.append(NbBundle.getMessage(this.getClass(), "KeywordSearchIngestModule.regExThLbl"));
detailsSb.append("<td>").append(attr.getValueString()).append("</td>");
detailsSb.append("</tr>");
}
}
detailsSb.append("</table>");
services.postMessage(IngestMessage.createDataMessage(messageID.getAndIncrement(), KeywordSearchModuleFactory.getModuleName(), subjectSb.toString(), detailsSb.toString(), uniqueKey, written.getArtifact()));
}
} //for each file hit
++unitProgress;
}//for each hit term
//update artifact browser
if (!newArtifacts.isEmpty()) {
services.fireModuleDataEvent(new ModuleDataEvent(KeywordSearchModuleFactory.getModuleName(), ARTIFACT_TYPE.TSK_KEYWORD_HIT, newArtifacts));
}
} //if has results
//reset the status text before it goes away
subProgresses[keywordsSearched].progress("");
++keywordsSearched;
} //for each keyword
} //end try block
catch (Exception ex) {
logger.log(Level.WARNING, "searcher exception occurred", ex);
} finally {
try {
finalizeSearcher();
stopWatch.stop();
logger.log(Level.INFO, "Searcher took to run: {0} secs.", stopWatch.getElapsedTimeSecs());
} finally {
searcherLock.unlock();
}
}
return null;
}
@Override
protected void done() {
// call get to see if there were any errors
try {
get();
} catch (InterruptedException | ExecutionException e) {
logger.log(Level.SEVERE, "Error performing keyword search: " + e.getMessage());
services.postMessage(IngestMessage.createErrorMessage(messageID.getAndIncrement(), KeywordSearchModuleFactory.getModuleName(), "Error performing keyword search", e.getMessage()));
} // catch and ignore if we were cancelled
catch (java.util.concurrent.CancellationException ex) {
}
}
/**
* Sync-up the updated keywords from the currently used lists in the XML
*/
private void updateKeywords() {
KeywordSearchListsXML loader = KeywordSearchListsXML.getCurrent();
this.keywords.clear();
this.keywordToList.clear();
for (String name : this.keywordLists) {
KeywordList list = loader.getList(name);
for (Keyword k : list.getKeywords()) {
this.keywords.add(k);
this.keywordToList.put(k.getQuery(), list);
}
}
}
/**
* Performs the cleanup that needs to be done right AFTER doInBackground()
* returns, without relying on the done() method, which is not guaranteed to
* run after the background thread completes. REQUIRED: always call this
* method right before doInBackground() returns.
*/
private void finalizeSearcher() {
logger.log(Level.INFO, "Searcher finalizing");
SwingUtilities.invokeLater(new Runnable() {
@Override
public void run() {
progressGroup.finish();
}
});
searcherDone = true; //next currentSearcher can start
if (finalRun) {
//this is the final searcher
logger.log(Level.INFO, "The final searcher in this ingest done.");
//run module cleanup
cleanup();
} else {
//start counting time for a new searcher to start
//unless final searcher is pending
if (finalSearcher == null) {
//we need a new Timer object, because restarting the previous one will not fire the action
final int updateIntervalMs = KeywordSearchSettings.getUpdateFrequency().getTime() * 60 * 1000;
searchTimer = new Timer(updateIntervalMs, new SearchTimerAction());
searchTimer.start();
}
}
}
//calculate new results by subtracting results already obtained in this ingest
//update currentResults map with the new results
private Map<Keyword, List<ContentHit>> filterResults(Map<String, List<ContentHit>> queryResult, boolean isRegex) {
Map<Keyword, List<ContentHit>> newResults = new HashMap<>();
for (String termResult : queryResult.keySet()) {
List<ContentHit> queryTermResults = queryResult.get(termResult);
//translate to list of IDs that we keep track of
List<Long> queryTermResultsIDs = new ArrayList<>();
for (ContentHit ch : queryTermResults) {
queryTermResultsIDs.add(ch.getId());
}
Keyword termResultK = new Keyword(termResult, !isRegex);
List<Long> curTermResults = currentResults.get(termResultK);
if (curTermResults == null) {
currentResults.put(termResultK, queryTermResultsIDs);
newResults.put(termResultK, queryTermResults);
} else {
//some AbstractFile hits already exist for this keyword
for (ContentHit res : queryTermResults) {
if (!curTermResults.contains(res.getId())) {
//add to new results
List<ContentHit> newResultsFs = newResults.get(termResultK);
if (newResultsFs == null) {
newResultsFs = new ArrayList<>();
newResults.put(termResultK, newResultsFs);
}
newResultsFs.add(res);
curTermResults.add(res.getId());
}
}
}
}
return newResults;
}
}
}
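
For reference, the removed Searcher.filterResults() above implements per-ingest hit deduplication: each keyword maps to the object IDs already reported, and only hits with unseen IDs are passed on for blackboard/inbox reporting. A self-contained sketch of that pattern with simplified types (String keywords and Long object IDs stand in for Keyword and ContentHit; HitDeduper is an illustrative name, not Autopsy API):

    import java.util.*;

    // Standalone illustration of the dedup pattern in the removed
    // filterResults(): report a hit only the first time a given object ID
    // is seen for a given keyword during this ingest.
    class HitDeduper {
        private final Map<String, Set<Long>> seen = new HashMap<>();

        /** Returns only the hits not previously reported for this keyword. */
        List<Long> filterNew(String keyword, List<Long> hitObjectIds) {
            Set<Long> seenIds = seen.computeIfAbsent(keyword, k -> new HashSet<>());
            List<Long> fresh = new ArrayList<>();
            for (Long id : hitObjectIds) {
                if (seenIds.add(id)) { // add() returns false if already present
                    fresh.add(id);
                }
            }
            return fresh;
        }

        public static void main(String[] args) {
            HitDeduper d = new HitDeduper();
            System.out.println(d.filterNew("kw", Arrays.asList(1L, 2L))); // [1, 2]
            System.out.println(d.filterNew("kw", Arrays.asList(2L, 3L))); // [3]
        }
    }

The original additionally carries the full ContentHit objects through, so snippets and artifacts are written only for the new hits.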

SearchRunner.java

@@ -121,6 +121,26 @@ public final class SearchRunner {
doFinalSearch(job);
}
public void stopJob(long jobId) {
SearchJobInfo job;
synchronized(this) {
job = jobs.get(jobId);
if (job == null) {
return;
}
///@todo
//stop currentSearcher
// if (currentSearcher != null) {
// currentSearcher.cancel(true);
// }
jobs.remove(jobId);
}
commit();
}
/**
* Add this list to all of the jobs
* @param keywordListName
@@ -201,6 +221,7 @@ public final class SearchRunner {
// Spawn a search thread for each job
for(Entry<Long, SearchJobInfo> j : jobs.entrySet()) {
SearchJobInfo job = j.getValue();
// If no lists or the worker is already running then skip it
if (!job.getKeywordListNames().isEmpty() && !job.isWorkerRunning()) {
Searcher s = new Searcher(job);
s.execute();
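
One design note on the added stopJob(): the job lookup and removal happen inside synchronized(this), while the potentially slow index commit runs after the monitor is released, so a long Solr commit does not block other threads touching the job table. A simplified, self-contained sketch of that locking pattern (class and method names here are illustrative, not the Autopsy API):

    import java.util.HashMap;
    import java.util.Map;

    // Illustration of stopJob()'s lock discipline: mutate the shared job
    // table inside a short critical section, then do the slow I/O (the
    // commit) outside the lock.
    class JobTableSketch {
        private final Map<Long, Object> jobs = new HashMap<>();

        void stopJob(long jobId) {
            synchronized (this) {
                if (jobs.remove(jobId) == null) {
                    return; // unknown job: nothing to stop or commit
                }
            }
            commit(); // slow work, deliberately outside the monitor
        }

        private void commit() {
            System.out.println("committing index...");
        }
    }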