Multithreaded keyword search service that indexes files and commits the index at a configurable interval (in seconds).

This commit is contained in:
adam-m 2012-02-07 15:48:50 -05:00
parent 3686752875
commit dfb0536dc9

View File

@ -18,11 +18,14 @@
*/ */
package org.sleuthkit.autopsy.keywordsearch; package org.sleuthkit.autopsy.keywordsearch;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.swing.SwingUtilities; import javax.swing.SwingUtilities;
import javax.swing.SwingWorker;
import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.SolrServerException;
import org.sleuthkit.autopsy.ingest.IngestManagerProxy; import org.sleuthkit.autopsy.ingest.IngestManagerProxy;
import org.sleuthkit.autopsy.ingest.IngestMessage; import org.sleuthkit.autopsy.ingest.IngestMessage;
@ -42,6 +45,14 @@ public final class KeywordSearchIngestService implements IngestServiceFsContent
private static final long MAX_STRING_EXTRACT_SIZE = 10 * (1 << 10) * (1 << 10); private static final long MAX_STRING_EXTRACT_SIZE = 10 * (1 << 10) * (1 << 10);
private static final long MAX_INDEX_SIZE = 200 * (1 << 10) * (1 << 10); private static final long MAX_INDEX_SIZE = 200 * (1 << 10) * (1 << 10);
private Ingester ingester; private Ingester ingester;
private volatile boolean commitIndex = false; //whether to commit index next time
private volatile boolean runTimer = false;
//threads
private final Object lock = new Object(); //queue synchronization
private Thread timer;
private SwingWorker searcher;
private Indexer indexer;
private static final String[] ingestibleExtensions = {"tar", "jar", "zip", "bzip2", private static final String[] ingestibleExtensions = {"tar", "jar", "zip", "bzip2",
"gz", "tgz", "doc", "xls", "ppt", "rtf", "pdf", "html", "xhtml", "txt", "gz", "tgz", "doc", "xls", "ppt", "rtf", "pdf", "html", "xhtml", "txt",
"bmp", "gif", "png", "jpeg", "tiff", "mp3", "aiff", "au", "midi", "wav", "bmp", "gif", "png", "jpeg", "tiff", "mp3", "aiff", "au", "midi", "wav",
@ -49,9 +60,9 @@ public final class KeywordSearchIngestService implements IngestServiceFsContent
public enum IngestStatus { public enum IngestStatus {
INGESTED, EXTRACTED_INGESTED, SKIPPED_EXTRACTION, INGESTED, EXTRACTED_INGESTED, SKIPPED_EXTRACTION,};
};
private Map<Long, IngestStatus> ingestStatus; private Map<Long, IngestStatus> ingestStatus;
private Map<String, List<FsContent>> reportedHits; //already reported hits
public static synchronized KeywordSearchIngestService getDefault() { public static synchronized KeywordSearchIngestService getDefault() {
if (instance == null) { if (instance == null) {
@ -62,75 +73,40 @@ public final class KeywordSearchIngestService implements IngestServiceFsContent
@Override @Override
public void process(FsContent fsContent) { public void process(FsContent fsContent) {
final long size = fsContent.getSize(); //enqueue(fsContent);
//logger.log(Level.INFO, "Processing fsContent: " + fsContent.getName()); if (commitIndex) {
if (!fsContent.isFile()) { logger.log(Level.INFO, "Commiting index");
return; commit();
} commitIndex = false;
indexChangeNotify();
if (size == 0 || size > MAX_INDEX_SIZE) {
ingestStatus.put(fsContent.getId(), IngestStatus.SKIPPED_EXTRACTION);
return;
}
boolean ingestible = false;
final String fileName = fsContent.getName();
for (String ext : ingestibleExtensions) {
if (fileName.endsWith(ext)) {
ingestible = true;
break;
}
}
if (ingestible == true) {
try {
//logger.log(Level.INFO, "indexing: " + fsContent.getName());
ingester.ingest(fsContent);
ingestStatus.put(fsContent.getId(), IngestStatus.INGESTED);
} catch (IngesterException e) {
ingestStatus.put(fsContent.getId(), IngestStatus.SKIPPED_EXTRACTION);
//try to extract strings
processNonIngestible(fsContent);
}
} else {
processNonIngestible(fsContent);
} }
indexer.indexFile(fsContent);
} }
private void processNonIngestible(FsContent fsContent) {
if (fsContent.getSize() < MAX_STRING_EXTRACT_SIZE) {
if (!extractAndIngest(fsContent)) {
logger.log(Level.INFO, "Failed to extract strings and ingest, file '" + fsContent.getName() + "' (id: " + fsContent.getId() + ").");
} else {
ingestStatus.put(fsContent.getId(), IngestStatus.EXTRACTED_INGESTED);
}
} else {
ingestStatus.put(fsContent.getId(), IngestStatus.SKIPPED_EXTRACTION);
}
}
@Override @Override
public void complete() { public void complete() {
//logger.log(Level.INFO, "complete()"); logger.log(Level.INFO, "complete()");
ingester.commit(); runTimer = false;
commit();
//signal a potential change in number of indexed files //signal a potential change in number of indexed files
try { indexChangeNotify();
final int numIndexedFiles = KeywordSearch.getServer().getCore().queryNumIndexedFiles();
SwingUtilities.invokeLater(new Runnable() {
@Override
public void run() {
KeywordSearch.changeSupport.firePropertyChange(KeywordSearch.NUM_FILES_CHANGE_EVT, null, new Integer(numIndexedFiles));
}
});
} catch (SolrServerException se) {
logger.log(Level.INFO, "Error executing Solr query to check number of indexed files: ", se);
}
managerProxy.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Complete")); managerProxy.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Complete"));
//manager.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Indexed files: " + ingestStat)); //postSummary();
}
@Override
public void stop() {
logger.log(Level.INFO, "stop()");
runTimer = false;
commit();
indexChangeNotify();
//postSummary(); //postSummary();
} }
@ -149,51 +125,36 @@ public final class KeywordSearchIngestService implements IngestServiceFsContent
ingestStatus = new HashMap<Long, IngestStatus>(); ingestStatus = new HashMap<Long, IngestStatus>();
reportedHits = new HashMap<String, List<FsContent>>();
indexer = new Indexer();
//final int commitIntervalMs = managerProxy.getUpdateFrequency() * 1000;
final int commitIntervalMs = 60 * 1000;
timer = new CommitTimer(commitIntervalMs);
runTimer = true;
timer.start();
managerProxy.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Started")); managerProxy.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Started"));
} }
@Override
public void stop() {
logger.log(Level.INFO, "stop()");
ingester.commit();
//signal a potential change in number of indexed files
try {
final int numIndexedFiles = KeywordSearch.getServer().getCore().queryNumIndexedFiles();
SwingUtilities.invokeLater(new Runnable() {
@Override
public void run() {
KeywordSearch.changeSupport.firePropertyChange(KeywordSearch.NUM_FILES_CHANGE_EVT, null, new Integer(numIndexedFiles));
}
});
} catch (SolrServerException se) {
logger.log(Level.INFO, "Error executing Solr query to check number of indexed files: ", se);
}
//postSummary();
}
@Override @Override
public ServiceType getType() { public ServiceType getType() {
return ServiceType.FsContent; return ServiceType.FsContent;
} }
@Override @Override
public void userConfigure() { public void userConfigure() {
} }
private boolean extractAndIngest(FsContent f) { private void commit() {
boolean success = false; synchronized(lock) {
FsContentStringStream fscs = new FsContentStringStream(f, FsContentStringStream.Encoding.ASCII); ingester.commit();
try {
fscs.convert();
ingester.ingest(fscs);
success = true;
} catch (TskException tskEx) {
logger.log(Level.INFO, "Problem extracting string from file: '" + f.getName() + "' (id: " + f.getId() + ").", tskEx);
} catch (IngesterException ingEx) {
logger.log(Level.INFO, "Ingester had a problem with extracted strings from file '" + f.getName() + "' (id: " + f.getId() + ").", ingEx);
} }
return success;
} }
private void postSummary() { private void postSummary() {
@ -219,4 +180,125 @@ public final class KeywordSearchIngestService implements IngestServiceFsContent
managerProxy.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Indexed strings: " + indexed_extr)); managerProxy.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Indexed strings: " + indexed_extr));
managerProxy.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Skipped files: " + skipped)); managerProxy.postMessage(IngestMessage.createMessage(++messageID, MessageType.INFO, this, "Skipped files: " + skipped));
} }
private void indexChangeNotify() {
    //signal a potential change in number of indexed files
    //by querying Solr and firing a property change event on the EDT
    try {
        final int numIndexedFiles = KeywordSearch.getServer().getCore().queryNumIndexedFiles();
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                // Fire on the EDT because listeners may update Swing components.
                // Integer.valueOf() instead of the boxing constructor new Integer(int),
                // which allocates needlessly and is deprecated in later JDKs.
                KeywordSearch.changeSupport.firePropertyChange(KeywordSearch.NUM_FILES_CHANGE_EVT, null, Integer.valueOf(numIndexedFiles));
            }
        });
    } catch (SolrServerException se) {
        // Best effort only: a failed count query should not abort ingest.
        logger.log(Level.INFO, "Error executing Solr query to check number of indexed files: ", se);
    }
}
//CommitTimer wakes up every interval ms
//and sets a flag for indexer to commit after indexing next file
private class CommitTimer extends Thread {

    private final Logger logger = Logger.getLogger(CommitTimer.class.getName());
    // Sleep period in milliseconds between commit requests.
    private final int interval;

    CommitTimer(int interval) {
        this.interval = interval;
    }

    /**
     * Periodically sets the commitIndex flag so the indexer commits after
     * the next file. Runs until runTimer is cleared (see complete()/stop())
     * or the thread is interrupted.
     */
    @Override
    public void run() {
        while (runTimer) {
            try {
                Thread.sleep(interval);
                // Ask the indexer to commit once it finishes the current file.
                commitIndex = true;
                logger.log(Level.INFO, "CommitTimer awake");
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the interrupt status
                // and shut the timer down instead of looping forever.
                Thread.currentThread().interrupt();
                break;
            }
        }
        commitIndex = false;
    }
}
//Indexer thread that processes files in the queue
//commits when timer expires
//sleeps if nothing in the queue
private class Indexer {

    private final Logger logger = Logger.getLogger(Indexer.class.getName());

    /**
     * Extract ASCII strings from a file and ingest them into the index.
     * Used as a fallback when the file cannot be ingested directly.
     *
     * @param f file to extract strings from
     * @return true if both extraction and ingestion succeeded
     */
    private boolean extractAndIngest(FsContent f) {
        boolean success = false;
        FsContentStringStream fscs = new FsContentStringStream(f, FsContentStringStream.Encoding.ASCII);
        try {
            fscs.convert();
            ingester.ingest(fscs);
            success = true;
        } catch (TskException tskEx) {
            logger.log(Level.INFO, "Problem extracting string from file: '" + f.getName() + "' (id: " + f.getId() + ").", tskEx);
        } catch (IngesterException ingEx) {
            logger.log(Level.INFO, "Ingester had a problem with extracted strings from file '" + f.getName() + "' (id: " + f.getId() + ").", ingEx);
        }
        return success;
    }

    /**
     * Index a single file: skip non-files and empty/oversized files, attempt
     * direct ingestion for known extensions, otherwise fall back to string
     * extraction. Records the outcome per file id in ingestStatus.
     */
    private void indexFile(FsContent fsContent) {
        if (!fsContent.isFile()) {
            return;
        }
        final long size = fsContent.getSize();
        if (size == 0 || size > MAX_INDEX_SIZE) {
            ingestStatus.put(fsContent.getId(), IngestStatus.SKIPPED_EXTRACTION);
            return;
        }
        // Compare case-insensitively: the extension table is lower-case, and a
        // case-sensitive endsWith() would miss names like "REPORT.PDF".
        boolean ingestible = false;
        final String fileName = fsContent.getName().toLowerCase();
        for (String ext : ingestibleExtensions) {
            if (fileName.endsWith(ext)) {
                ingestible = true;
                break;
            }
        }
        if (ingestible) {
            try {
                ingester.ingest(fsContent);
                ingestStatus.put(fsContent.getId(), IngestStatus.INGESTED);
            } catch (IngesterException e) {
                ingestStatus.put(fsContent.getId(), IngestStatus.SKIPPED_EXTRACTION);
                // Direct ingestion failed; try the string-extraction fallback.
                processNonIngestible(fsContent);
            }
        } else {
            processNonIngestible(fsContent);
        }
    }

    // Fallback path: extract ASCII strings if the file is small enough,
    // otherwise record it as skipped.
    private void processNonIngestible(FsContent fsContent) {
        if (fsContent.getSize() < MAX_STRING_EXTRACT_SIZE) {
            if (!extractAndIngest(fsContent)) {
                logger.log(Level.INFO, "Failed to extract strings and ingest, file '" + fsContent.getName() + "' (id: " + fsContent.getId() + ").");
            } else {
                ingestStatus.put(fsContent.getId(), IngestStatus.EXTRACTED_INGESTED);
            }
        } else {
            ingestStatus.put(fsContent.getId(), IngestStatus.SKIPPED_EXTRACTION);
        }
    }
}
} }