diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index de2042d62d..944a482471 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -26,7 +26,6 @@ import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -49,7 +48,6 @@ import org.sleuthkit.datamodel.AbstractFile; */ public class IngestManager { - private static final int DEFAULT_NUMBER_OF_DATA_SOURCE_INGEST_THREADS = 1; private static final int MIN_NUMBER_OF_FILE_INGEST_THREADS = 1; private static final int MAX_NUMBER_OF_FILE_INGEST_THREADS = 16; private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2; @@ -64,9 +62,7 @@ public class IngestManager { private final ExecutorService fileIngestThreadPool; private final ExecutorService fireIngestEventsThreadPool = Executors.newSingleThreadExecutor(); private final AtomicLong nextThreadId = new AtomicLong(0L); - private final ConcurrentHashMap> startIngestJobThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles. - private final ConcurrentHashMap> dataSourceIngestThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles. - private final ConcurrentHashMap> fileIngestThreads = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles. + private final ConcurrentHashMap> startIngestJobsTasks = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles. private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L); private final ConcurrentHashMap ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); // Maps ingest thread ids to progress ingestThreadActivitySnapshots. private final Object processedFilesSnapshotLock = new Object(); @@ -92,11 +88,13 @@ public class IngestManager { } /** - * Starts the ingest monitor and the data source ingest and file ingest - * threads. + * Starts the ingest monitor and submits task execution tasks (Callable + * objects) to the data source ingest and file ingest thread pools. The task + * execution tasks are simple consumers that will normally run as long as + * the application runs */ private IngestManager() { - startDataSourceIngestThread(); + startDataSourceIngestTask(); numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads(); if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) { @@ -105,7 +103,7 @@ public class IngestManager { } fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads); for (int i = 0; i < numberOfFileIngestThreads; ++i) { - startFileIngestThread(); + startFileIngestTask(); } } @@ -118,16 +116,6 @@ public class IngestManager { ingestMessageBox = IngestMessageTopComponent.findInstance(); } - /** - * Gets the number of data source ingest threads the ingest manager will - * use. - * - * @return The number of data source ingest threads. - */ - public int getNumberOfDataSourceIngestThreads() { - return DEFAULT_NUMBER_OF_DATA_SOURCE_INGEST_THREADS; - } - /** * Gets the number of file ingest threads the ingest manager will use. * @@ -138,43 +126,33 @@ public class IngestManager { } /** - * Submits a DataSourceIngestThread Runnable to the data source ingest + * Submits a ExecuteIngestTasksTask Callable to the data source ingest task * thread pool. */ - private void startDataSourceIngestThread() { + private void startDataSourceIngestTask() { long threadId = nextThreadId.incrementAndGet(); - Future handle = dataSourceIngestThreadPool.submit(new ExecuteIngestTasksThread(threadId, IngestScheduler.getInstance().getDataSourceIngestTaskQueue())); - dataSourceIngestThreads.put(threadId, handle); + dataSourceIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestScheduler.getInstance().getDataSourceIngestTaskQueue())); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); } /** - * Submits a DataSourceIngestThread Runnable to the data source ingest + * Submits a ExecuteIngestTasksTask Callable to the data source ingest * thread pool. */ - private void startFileIngestThread() { + private void startFileIngestTask() { long threadId = nextThreadId.incrementAndGet(); - Future handle = fileIngestThreadPool.submit(new ExecuteIngestTasksThread(threadId, IngestScheduler.getInstance().getFileIngestTaskQueue())); - fileIngestThreads.put(threadId, handle); + fileIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestScheduler.getInstance().getFileIngestTaskQueue())); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); } - /** - * Cancels a DataSourceIngestThread Runnable in the file ingest thread pool. - */ - private void cancelFileIngestThread(long threadId) { - Future handle = fileIngestThreads.remove(threadId); - handle.cancel(true); - } - synchronized void startIngestJobs(final List dataSources, final List moduleTemplates, boolean processUnallocatedSpace) { if (!isIngestRunning()) { clearIngestMessageBox(); } - + long taskId = nextThreadId.incrementAndGet(); - Future task = startIngestJobsThreadPool.submit(new StartIngestJobsThread(taskId, dataSources, moduleTemplates, processUnallocatedSpace)); - startIngestJobThreads.put(taskId, task); + Future task = startIngestJobsThreadPool.submit(new StartIngestJobsTask(taskId, dataSources, moduleTemplates, processUnallocatedSpace)); + startIngestJobsTasks.put(taskId, task); } private void subscribeToCaseEvents() { @@ -207,9 +185,9 @@ public class IngestManager { if (ingestMessageBox != null) { ingestMessageBox.clearMessages(); } - ingestErrorMessagePosts.set(0); + ingestErrorMessagePosts.set(0); } - + /** * Test if any ingest jobs are in progress. * @@ -226,7 +204,7 @@ public class IngestManager { void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) { ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), ingestModuleDisplayName, task.getDataSource(), task.getFile())); } - + void setIngestTaskProgressCompleted(DataSourceIngestTask task) { ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId())); } @@ -253,18 +231,8 @@ public class IngestManager { public void cancelAllIngestJobs() { // Stop creating new ingest jobs. - for (Future handle : startIngestJobThreads.values()) { + for (Future handle : startIngestJobsTasks.values()) { handle.cancel(true); - try { - // Blocks until the job starting thread responds. The thread - // removes itself from this collection, which does not disrupt - // this loop since the collection is a ConcurrentHashMap. - handle.get(); - } catch (InterruptedException | ExecutionException ex) { - // This should never happen, something is awry, but everything - // should be o.k. anyway. - logger.log(Level.SEVERE, "Unexpected thread interrupt", ex); //NON-NLS - } } // Cancel all the jobs already created. @@ -391,7 +359,7 @@ public class IngestManager { * @param ingestJobId The ingest job id. */ void fireIngestJobStarted(long ingestJobId) { - fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null)); } /** @@ -400,7 +368,7 @@ public class IngestManager { * @param ingestJobId The ingest job id. */ void fireIngestJobCompleted(long ingestJobId) { - fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null)); } /** @@ -409,7 +377,7 @@ public class IngestManager { * @param ingestJobId The ingest job id. */ void fireIngestJobCancelled(long ingestJobId) { - fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null)); } /** @@ -418,7 +386,7 @@ public class IngestManager { * @param fileId The object id of file. */ void fireFileIngestDone(long fileId) { - fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, fileId, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, fileId, null)); } /** @@ -427,7 +395,7 @@ public class IngestManager { * @param moduleDataEvent A ModuleDataEvent with the details of the posting. */ void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) { - fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null)); } /** @@ -438,7 +406,7 @@ public class IngestManager { * content. */ void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) { - fireIngestEventsThreadPool.submit(new FireIngestEventThread(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null)); } /** @@ -482,7 +450,7 @@ public class IngestManager { /** * Creates ingest jobs. */ - private class StartIngestJobsThread implements Callable { + private class StartIngestJobsTask implements Callable { private final long threadId; private final List dataSources; @@ -490,7 +458,7 @@ public class IngestManager { private final boolean processUnallocatedSpace; private ProgressHandle progress; - StartIngestJobsThread(long threadId, List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { + StartIngestJobsTask(long threadId, List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { this.threadId = threadId; this.dataSources = dataSources; this.moduleTemplates = moduleTemplates; @@ -510,7 +478,8 @@ public class IngestManager { "IngestManager.StartIngestJobsTask.run.cancelling", displayName)); } - cancelFileIngestThread(threadId); + Future handle = startIngestJobsTasks.remove(threadId); + handle.cancel(true); return true; } }); @@ -564,7 +533,7 @@ public class IngestManager { Thread.currentThread().interrupt(); } finally { progress.finish(); - startIngestJobThreads.remove(threadId); + startIngestJobsTasks.remove(threadId); } return null; } @@ -573,12 +542,12 @@ public class IngestManager { /** * A consumer for an ingest task queue. */ - private class ExecuteIngestTasksThread implements Runnable { + private class ExecuteIngestTasksTask implements Runnable { private final long threadId; private final IngestTaskQueue tasks; - ExecuteIngestTasksThread(long threadId, IngestTaskQueue tasks) { + ExecuteIngestTasksTask(long threadId, IngestTaskQueue tasks) { this.threadId = threadId; this.tasks = tasks; } @@ -602,7 +571,7 @@ public class IngestManager { /** * Fires ingest events to ingest manager property change listeners. */ - private static class FireIngestEventThread implements Runnable { + private static class FireIngestEventTask implements Runnable { private final PropertyChangeSupport publisher; private final IngestJobEvent jobEvent; @@ -610,7 +579,7 @@ public class IngestManager { private final Object oldValue; private final Object newValue; - FireIngestEventThread(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) { + FireIngestEventTask(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) { this.publisher = publisher; this.jobEvent = event; this.moduleEvent = null; @@ -618,7 +587,7 @@ public class IngestManager { this.newValue = newValue; } - FireIngestEventThread(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) { + FireIngestEventTask(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) { this.publisher = publisher; this.jobEvent = null; this.moduleEvent = event; diff --git a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Ingester.java b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Ingester.java index 2ae27c33e5..d6b2a3e724 100644 --- a/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Ingester.java +++ b/KeywordSearch/src/org/sleuthkit/autopsy/keywordsearch/Ingester.java @@ -186,12 +186,6 @@ class Ingester { * Visitor used to create param list to send to SOLR index. */ private class GetContentFieldsV extends ContentVisitor.Default> { - - private SleuthkitCase curCase = null; - - GetContentFieldsV() { - curCase = Case.getCurrentCase().getSleuthkitCase(); - } @Override protected Map defaultVisit(Content cntnt) { @@ -246,7 +240,7 @@ class Ingester { params.put(Server.Schema.ID.toString(), Long.toString(af.getId())); long dataSourceId = -1; try { - dataSourceId = curCase.getFileDataSource(af); + dataSourceId = Case.getCurrentCase().getSleuthkitCase().getFileDataSource(af); params.put(Server.Schema.IMAGE_ID.toString(), Long.toString(dataSourceId)); } catch (TskCoreException ex) { logger.log(Level.SEVERE, "Could not get data source id to properly index the file " + af.getId()); //NON-NLS