Fix IngestManager cancellation of all jobs, KeywordSearch.Ingest bug

This commit is contained in:
Richard Cordovano 2014-07-17 15:28:48 -04:00
parent e5026a108b
commit 30f305d40d
2 changed files with 37 additions and 74 deletions

View File

@ -26,7 +26,6 @@ import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -49,7 +48,6 @@ import org.sleuthkit.datamodel.AbstractFile;
*/
public class IngestManager {
private static final int DEFAULT_NUMBER_OF_DATA_SOURCE_INGEST_THREADS = 1;
private static final int MIN_NUMBER_OF_FILE_INGEST_THREADS = 1;
private static final int MAX_NUMBER_OF_FILE_INGEST_THREADS = 16;
private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2;
@ -64,9 +62,7 @@ public class IngestManager {
private final ExecutorService fileIngestThreadPool;
private final ExecutorService fireIngestEventsThreadPool = Executors.newSingleThreadExecutor();
private final AtomicLong nextThreadId = new AtomicLong(0L);
private final ConcurrentHashMap<Long, Future<Void>> startIngestJobThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final ConcurrentHashMap<Long, Future<?>> dataSourceIngestThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final ConcurrentHashMap<Long, Future<?>> fileIngestThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final ConcurrentHashMap<Long, Future<Void>> startIngestJobsTasks = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); // Maps ingest thread ids to progress ingestThreadActivitySnapshots.
private final Object processedFilesSnapshotLock = new Object();
@ -92,11 +88,13 @@ public class IngestManager {
}
/**
* Starts the ingest monitor and the data source ingest and file ingest
* threads.
* Starts the ingest monitor and submits task execution tasks (Callable
* objects) to the data source ingest and file ingest thread pools. The task
* execution tasks are simple consumers that will normally run as long as
* the application runs
*/
private IngestManager() {
startDataSourceIngestThread();
startDataSourceIngestTask();
numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads();
if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
@ -105,7 +103,7 @@ public class IngestManager {
}
fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads);
for (int i = 0; i < numberOfFileIngestThreads; ++i) {
startFileIngestThread();
startFileIngestTask();
}
}
@ -118,16 +116,6 @@ public class IngestManager {
ingestMessageBox = IngestMessageTopComponent.findInstance();
}
/**
* Gets the number of data source ingest threads the ingest manager will
* use.
*
* @return The number of data source ingest threads.
*/
public int getNumberOfDataSourceIngestThreads() {
return DEFAULT_NUMBER_OF_DATA_SOURCE_INGEST_THREADS;
}
/**
* Gets the number of file ingest threads the ingest manager will use.
*
@ -138,43 +126,33 @@ public class IngestManager {
}
/**
* Submits a DataSourceIngestThread Runnable to the data source ingest
* Submits a ExecuteIngestTasksTask Callable to the data source ingest task
* thread pool.
*/
private void startDataSourceIngestThread() {
private void startDataSourceIngestTask() {
long threadId = nextThreadId.incrementAndGet();
Future<?> handle = dataSourceIngestThreadPool.submit(new ExecuteIngestTasksThread(threadId, IngestScheduler.getInstance().getDataSourceIngestTaskQueue()));
dataSourceIngestThreads.put(threadId, handle);
dataSourceIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestScheduler.getInstance().getDataSourceIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
/**
* Submits a DataSourceIngestThread Runnable to the data source ingest
* Submits a ExecuteIngestTasksTask Callable to the data source ingest
* thread pool.
*/
private void startFileIngestThread() {
private void startFileIngestTask() {
long threadId = nextThreadId.incrementAndGet();
Future<?> handle = fileIngestThreadPool.submit(new ExecuteIngestTasksThread(threadId, IngestScheduler.getInstance().getFileIngestTaskQueue()));
fileIngestThreads.put(threadId, handle);
fileIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestScheduler.getInstance().getFileIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
/**
* Cancels a DataSourceIngestThread Runnable in the file ingest thread pool.
*/
private void cancelFileIngestThread(long threadId) {
Future<?> handle = fileIngestThreads.remove(threadId);
handle.cancel(true);
}
synchronized void startIngestJobs(final List<Content> dataSources, final List<IngestModuleTemplate> moduleTemplates, boolean processUnallocatedSpace) {
if (!isIngestRunning()) {
clearIngestMessageBox();
}
long taskId = nextThreadId.incrementAndGet();
Future<Void> task = startIngestJobsThreadPool.submit(new StartIngestJobsThread(taskId, dataSources, moduleTemplates, processUnallocatedSpace));
startIngestJobThreads.put(taskId, task);
Future<Void> task = startIngestJobsThreadPool.submit(new StartIngestJobsTask(taskId, dataSources, moduleTemplates, processUnallocatedSpace));
startIngestJobsTasks.put(taskId, task);
}
private void subscribeToCaseEvents() {
@ -253,18 +231,8 @@ public class IngestManager {
public void cancelAllIngestJobs() {
// Stop creating new ingest jobs.
for (Future<Void> handle : startIngestJobThreads.values()) {
for (Future<Void> handle : startIngestJobsTasks.values()) {
handle.cancel(true);
try {
// Blocks until the job starting thread responds. The thread
// removes itself from this collection, which does not disrupt
// this loop since the collection is a ConcurrentHashMap.
handle.get();
} catch (InterruptedException | ExecutionException ex) {
// This should never happen, something is awry, but everything
// should be o.k. anyway.
logger.log(Level.SEVERE, "Unexpected thread interrupt", ex); //NON-NLS
}
}
// Cancel all the jobs already created.
@ -391,7 +359,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id.
*/
void fireIngestJobStarted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null));
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null));
}
/**
@ -400,7 +368,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id.
*/
void fireIngestJobCompleted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null));
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null));
}
/**
@ -409,7 +377,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id.
*/
void fireIngestJobCancelled(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null));
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null));
}
/**
@ -418,7 +386,7 @@ public class IngestManager {
* @param fileId The object id of file.
*/
void fireFileIngestDone(long fileId) {
fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, fileId, null));
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, fileId, null));
}
/**
@ -427,7 +395,7 @@ public class IngestManager {
* @param moduleDataEvent A ModuleDataEvent with the details of the posting.
*/
void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null));
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null));
}
/**
@ -438,7 +406,7 @@ public class IngestManager {
* content.
*/
void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null));
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null));
}
/**
@ -482,7 +450,7 @@ public class IngestManager {
/**
* Creates ingest jobs.
*/
private class StartIngestJobsThread implements Callable<Void> {
private class StartIngestJobsTask implements Callable<Void> {
private final long threadId;
private final List<Content> dataSources;
@ -490,7 +458,7 @@ public class IngestManager {
private final boolean processUnallocatedSpace;
private ProgressHandle progress;
StartIngestJobsThread(long threadId, List<Content> dataSources, List<IngestModuleTemplate> moduleTemplates, boolean processUnallocatedSpace) {
StartIngestJobsTask(long threadId, List<Content> dataSources, List<IngestModuleTemplate> moduleTemplates, boolean processUnallocatedSpace) {
this.threadId = threadId;
this.dataSources = dataSources;
this.moduleTemplates = moduleTemplates;
@ -510,7 +478,8 @@ public class IngestManager {
"IngestManager.StartIngestJobsTask.run.cancelling",
displayName));
}
cancelFileIngestThread(threadId);
Future<?> handle = startIngestJobsTasks.remove(threadId);
handle.cancel(true);
return true;
}
});
@ -564,7 +533,7 @@ public class IngestManager {
Thread.currentThread().interrupt();
} finally {
progress.finish();
startIngestJobThreads.remove(threadId);
startIngestJobsTasks.remove(threadId);
}
return null;
}
@ -573,12 +542,12 @@ public class IngestManager {
/**
* A consumer for an ingest task queue.
*/
private class ExecuteIngestTasksThread implements Runnable {
private class ExecuteIngestTasksTask implements Runnable {
private final long threadId;
private final IngestTaskQueue tasks;
ExecuteIngestTasksThread(long threadId, IngestTaskQueue tasks) {
ExecuteIngestTasksTask(long threadId, IngestTaskQueue tasks) {
this.threadId = threadId;
this.tasks = tasks;
}
@ -602,7 +571,7 @@ public class IngestManager {
/**
* Fires ingest events to ingest manager property change listeners.
*/
private static class FireIngestEventThread implements Runnable {
private static class FireIngestEventTask implements Runnable {
private final PropertyChangeSupport publisher;
private final IngestJobEvent jobEvent;
@ -610,7 +579,7 @@ public class IngestManager {
private final Object oldValue;
private final Object newValue;
FireIngestEventThread(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) {
FireIngestEventTask(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) {
this.publisher = publisher;
this.jobEvent = event;
this.moduleEvent = null;
@ -618,7 +587,7 @@ public class IngestManager {
this.newValue = newValue;
}
FireIngestEventThread(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) {
FireIngestEventTask(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) {
this.publisher = publisher;
this.jobEvent = null;
this.moduleEvent = event;

View File

@ -187,12 +187,6 @@ class Ingester {
*/
private class GetContentFieldsV extends ContentVisitor.Default<Map<String, String>> {
private SleuthkitCase curCase = null;
GetContentFieldsV() {
curCase = Case.getCurrentCase().getSleuthkitCase();
}
@Override
protected Map<String, String> defaultVisit(Content cntnt) {
return new HashMap<String, String>();
@ -246,7 +240,7 @@ class Ingester {
params.put(Server.Schema.ID.toString(), Long.toString(af.getId()));
long dataSourceId = -1;
try {
dataSourceId = curCase.getFileDataSource(af);
dataSourceId = Case.getCurrentCase().getSleuthkitCase().getFileDataSource(af);
params.put(Server.Schema.IMAGE_ID.toString(), Long.toString(dataSourceId));
} catch (TskCoreException ex) {
logger.log(Level.SEVERE, "Could not get data source id to properly index the file " + af.getId()); //NON-NLS