From 13a176f413e3582f5b217b3298771639da16b11b Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Fri, 4 Apr 2014 09:43:28 -0400 Subject: [PATCH 1/6] Removed sleeps from IngestManager.stoAll(), added more cancel checks --- .../autopsy/ingest/IngestManager.java | 76 +++++++++---------- 1 file changed, 37 insertions(+), 39 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 64159e19f8..833575169a 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import org.openide.util.NbBundle; import org.sleuthkit.autopsy.coreutils.Logger; @@ -362,15 +363,9 @@ public class IngestManager { // First get the task scheduling worker to stop adding new tasks. if (taskSchedulingWorker != null) { taskSchedulingWorker.cancel(true); - while (!taskSchedulingWorker.isDone()) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - } - } taskSchedulingWorker = null; - } - + } + // Now mark all of the ingest jobs as cancelled. This way the ingest // modules will know they are being shut down due to cancellation when // the cancelled ingest workers release their pipelines. @@ -378,16 +373,15 @@ public class IngestManager { job.cancel(); } + // Jettision the remaining tasks. This will dispose of any tasks that + // the scheduling worker queued up before it was cancelled. + scheduler.getFileScheduler().empty(); + scheduler.getDataSourceScheduler().empty(); + // Cancel the data source task worker. It will release its pipelines // in its done() method and the pipelines will shut down their modules. if (dataSourceTaskWorker != null) { dataSourceTaskWorker.cancel(true); - while (!dataSourceTaskWorker.isDone()) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - } - } dataSourceTaskWorker = null; } @@ -397,26 +391,20 @@ public class IngestManager { for (FileTaskWorker worker : fileTaskWorkers) { if (worker != null) { worker.cancel(true); - while (!worker.isDone()) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - } - } worker = null; } } - // Jettision the remaining tasks. This will dispose of any tasks that - // the scheduling worker queued up before it was cancelled. + // Jettision the remaining tasks again to try to dispose of any tasks + // queued up task workers before they were cancelled. scheduler.getFileScheduler().empty(); scheduler.getDataSourceScheduler().empty(); } /** - * Test if any ingest modules are running + * Test if any ingest jobs are in progress. * - * @return true if any module is running, false otherwise + * @return True if any ingest jobs are in progress, false otherwise */ public synchronized boolean isIngestRunning() { // TODO: There is a race condition here with SwingWorker.isDone(). @@ -474,6 +462,7 @@ public class IngestManager { private final List moduleTemplates; private final boolean processUnallocatedSpace; private ProgressHandle progress; + private volatile boolean finished = false; TaskSchedulingWorker(List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { this.dataSources = dataSources; @@ -483,13 +472,10 @@ public class IngestManager { @Override protected Object doInBackground() throws Exception { - // Set up a progress bar that can be used to cancel all of the - // ingest jobs currently being performed. final String displayName = "Queueing ingest tasks"; progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { @Override public boolean cancel() { - logger.log(Level.INFO, "Queueing ingest cancelled by user."); if (progress != null) { progress.setDisplayName(displayName + " (Cancelling...)"); } @@ -502,10 +488,8 @@ public class IngestManager { int processed = 0; for (Content dataSource : dataSources) { if (isCancelled()) { - logger.log(Level.INFO, "Task scheduling thread cancelled"); return null; } - final String inputName = dataSource.getName(); IngestJob ingestJob = new IngestJob(IngestManager.this.getNextDataSourceTaskId(), dataSource, moduleTemplates, processUnallocatedSpace); @@ -521,7 +505,7 @@ public class IngestManager { failedModules.append(","); } } - MessageNotifyUtil.Message.error( + MessageNotifyUtil.Message.error( // RJCTODO: Fix this "Failed to start the following ingest modules: " + failedModules.toString() + " .\n\n" + "No ingest modules will be run. Please disable the module " + "or fix the error and restart ingest by right clicking on " @@ -552,7 +536,6 @@ public class IngestManager { try { super.get(); } catch (CancellationException | InterruptedException ex) { - // IngestManager.stopAll() will dispose of all tasks. } catch (Exception ex) { logger.log(Level.SEVERE, "Error while scheduling ingest jobs", ex); MessageNotifyUtil.Message.error("An error occurred while starting ingest. Results may only be partial"); @@ -561,8 +544,13 @@ public class IngestManager { startAll(); } progress.finish(); + finished = true; } } + + boolean isFinished() { + return finished; + } } /** @@ -572,6 +560,7 @@ public class IngestManager { class DataSourceTaskWorker extends SwingWorker { private final long id; + private volatile boolean finished = false; DataSourceTaskWorker(long threadId) { this.id = threadId; @@ -579,18 +568,18 @@ public class IngestManager { @Override protected Void doInBackground() throws Exception { - logger.log(Level.INFO, "Data source ingest thread (id={0}) started", this.id); IngestScheduler.DataSourceScheduler scheduler = IngestScheduler.getInstance().getDataSourceScheduler(); while (scheduler.hasNext()) { if (isCancelled()) { - logger.log(Level.INFO, "Data source ingest thread (id={0}) cancelled", this.id); return null; } IngestJob job = scheduler.next(); DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(this.id); pipeline.process(this, job.getDataSourceTaskProgressBar()); + if (isCancelled()) { + return null; + } } - logger.log(Level.INFO, "Data source ingest thread (id={0}) completed", this.id); return null; } @@ -599,14 +588,18 @@ public class IngestManager { try { super.get(); } catch (CancellationException | InterruptedException e) { - logger.log(Level.INFO, "Data source ingest thread (id={0}) cancelled", this.id); } catch (Exception ex) { String message = String.format("Data source ingest thread (id=%d) experienced a fatal error", this.id); logger.log(Level.SEVERE, message, ex); } finally { IngestManager.getInstance().reportThreadDone(this.id); + finished = true; } } + + boolean isFinished() { + return finished; + } } /** @@ -616,6 +609,7 @@ public class IngestManager { class FileTaskWorker extends SwingWorker { private final long id; + private volatile boolean finished = false; FileTaskWorker(long threadId) { this.id = threadId; @@ -623,11 +617,9 @@ public class IngestManager { @Override protected Object doInBackground() throws Exception { - logger.log(Level.INFO, "File ingest thread (id={0}) started", this.id); IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler(); while (fileScheduler.hasNext()) { if (isCancelled()) { - logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", this.id); return null; } IngestScheduler.FileScheduler.FileTask task = fileScheduler.next(); @@ -635,8 +627,10 @@ public class IngestManager { FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(this.id); job.handleFileTaskStarted(task); pipeline.process(task.getFile()); + if (isCancelled()) { + return null; + } } - logger.log(Level.INFO, "File ingest thread (id={0}) completed", this.id); return null; } @@ -645,13 +639,17 @@ public class IngestManager { try { super.get(); } catch (CancellationException | InterruptedException e) { - logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", this.id); } catch (Exception ex) { String message = String.format("File ingest thread {0} experienced a fatal error", this.id); logger.log(Level.SEVERE, message, ex); } finally { IngestManager.getInstance().reportThreadDone(this.id); + finished = true; } } + + boolean isFinished() { + return finished; + } } } From e5e1dbe07508d40320360b2ece7f927c8586668e Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Mon, 7 Apr 2014 09:50:45 -0400 Subject: [PATCH 2/6] Experimented with ingest job cancellation improvements --- .../DataSourceIngestModuleStatusHelper.java | 9 +- .../ingest/DataSourceIngestPipeline.java | 4 +- .../sleuthkit/autopsy/ingest/IngestJob.java | 4 +- .../autopsy/ingest/IngestJobContext.java | 2 +- .../autopsy/ingest/IngestJobLauncher.java | 2 +- .../autopsy/ingest/IngestManager.java | 587 ++++++++---------- .../ingest/IngestMessageTopComponent.java | 2 +- .../autopsy/ingest/IngestMonitor.java | 6 +- 8 files changed, 260 insertions(+), 356 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java index 689d8e7652..4779f59c63 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java @@ -20,7 +20,6 @@ package org.sleuthkit.autopsy.ingest; import javax.swing.SwingWorker; import org.netbeans.api.progress.ProgressHandle; -import org.sleuthkit.datamodel.Content; /** * Used by data source ingest modules to report progress and detect data source @@ -28,15 +27,11 @@ import org.sleuthkit.datamodel.Content; */ public class DataSourceIngestModuleStatusHelper { - private final SwingWorker worker; private final ProgressHandle progress; - private final Content dataSource; private final String moduleDisplayName; - DataSourceIngestModuleStatusHelper(SwingWorker worker, ProgressHandle progress, Content dataSource, String moduleDisplayName) { - this.worker = worker; + DataSourceIngestModuleStatusHelper(ProgressHandle progress, String moduleDisplayName) { this.progress = progress; - this.dataSource = dataSource; this.moduleDisplayName = moduleDisplayName; } @@ -48,7 +43,7 @@ public class DataSourceIngestModuleStatusHelper { * @return True if the task has been canceled, false otherwise. */ public boolean isIngestJobCancelled() { - return worker.isCancelled(); + return (Thread.currentThread().isInterrupted()); // RJCTODO: This is not right? Appears to be right... } /** diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java index 3b919391bc..8a6f07786d 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java @@ -81,13 +81,13 @@ final class DataSourceIngestPipeline { return errors; } - List process(SwingWorker worker, ProgressHandle progress) { + List process(ProgressHandle progress) { List errors = new ArrayList<>(); Content dataSource = this.job.getDataSource(); logger.log(Level.INFO, "Processing data source {0}", dataSource.getName()); for (DataSourceIngestModuleDecorator module : this.modules) { try { - module.process(dataSource, new DataSourceIngestModuleStatusHelper(worker, progress, dataSource, module.getDisplayName())); + module.process(dataSource, new DataSourceIngestModuleStatusHelper(progress, module.getDisplayName())); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 0a65bafcbb..80f75818d6 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -85,7 +85,7 @@ final class IngestJob { "IngestJob.progress.cancelling", displayName)); } - IngestManager.getInstance().stopAll(); + IngestManager.getInstance().cancelIngestTasks(); return true; } }); @@ -104,7 +104,7 @@ final class IngestJob { NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", displayName)); } - IngestManager.getInstance().stopAll(); + IngestManager.getInstance().cancelIngestTasks(); return true; } }); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java index 7d0301ce5c..dc9fb58e3f 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java @@ -60,7 +60,7 @@ public final class IngestJobContext { */ public void addFiles(List files) { for (AbstractFile file : files) { - IngestManager.getInstance().scheduleFile(ingestJob.getId(), file); + IngestManager.getInstance().addFileToIngestJob(ingestJob.getId(), file); } } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobLauncher.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobLauncher.java index fbd9fb0db3..22125647d7 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobLauncher.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobLauncher.java @@ -232,7 +232,7 @@ public final class IngestJobLauncher { } if ((!enabledModuleTemplates.isEmpty()) && (dataSources != null) && (!dataSources.isEmpty())) { - IngestManager.getInstance().scheduleDataSourceTasks(dataSources, enabledModuleTemplates, ingestConfigPanel.getProcessUnallocSpace()); + IngestManager.getInstance().startIngestJobs(dataSources, enabledModuleTemplates, ingestConfigPanel.getProcessUnallocSpace()); } } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 833575169a..44286f87dc 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -23,12 +23,14 @@ import java.beans.PropertyChangeSupport; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.CancellationException; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import org.openide.util.NbBundle; import org.sleuthkit.autopsy.coreutils.Logger; -import javax.swing.SwingWorker; import org.netbeans.api.progress.ProgressHandle; import org.netbeans.api.progress.ProgressHandleFactory; import org.openide.util.Cancellable; @@ -49,18 +51,73 @@ public class IngestManager { private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2; private static final Logger logger = Logger.getLogger(IngestManager.class.getName()); private static final PropertyChangeSupport pcs = new PropertyChangeSupport(IngestManager.class); + private static final Preferences userPreferences = NbPreferences.forModule(IngestManager.class); private static IngestManager instance; private final IngestScheduler scheduler = IngestScheduler.getInstance(); private final IngestMonitor ingestMonitor = new IngestMonitor(); - private final Preferences userPreferences = NbPreferences.forModule(this.getClass()); - private final HashMap ingestJobs = new HashMap<>(); - private TaskSchedulingWorker taskSchedulingWorker = null; - private DataSourceTaskWorker dataSourceTaskWorker = null; - private final List fileTaskWorkers = new ArrayList<>(); - private long nextDataSourceTaskId = 0; - private long nextThreadId = 0; + private ExecutorService startIngestJobsExecutor = null; + private ExecutorService dataSourceIngestTasksExecutor = null; + private ExecutorService fileIngestTasksExecutor = null; + private AtomicLong ingestJobId = new AtomicLong(0L); + private AtomicLong ingestTaskId = new AtomicLong(0L); + private final HashMap ingestJobs = new HashMap<>(); // Maps job ids to jobs + private final HashMap> ingestTasks = new HashMap<>(); // Maps task ids to Runnable tasks +// private TaskSchedulingWorker taskSchedulingWorker = null; private volatile IngestUI ingestMessageBox; + /** + * Gets the IngestManager singleton, creating it if necessary. + * + * @returns The IngestManager singleton. + */ + public synchronized static IngestManager getInstance() { + if (instance == null) { + instance = new IngestManager(); + } + return instance; + } + + private IngestManager() { + createThreadPools(); + } + + private synchronized void createThreadPools() { + if (startIngestJobsExecutor == null) { + startIngestJobsExecutor = Executors.newSingleThreadExecutor(); + } + if (dataSourceIngestTasksExecutor == null) { + dataSourceIngestTasksExecutor = Executors.newSingleThreadExecutor(); + } + if (fileIngestTasksExecutor == null) { + fileIngestTasksExecutor = Executors.newFixedThreadPool(MAX_NUMBER_OF_FILE_INGEST_THREADS); + } + } + + public synchronized static int getNumberOfFileIngestThreads() { + return userPreferences.getInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, DEFAULT_NUMBER_OF_FILE_INGEST_THREADS); + } + + public static void setNumberOfFileIngestThreads(int numberOfThreads) { + if (numberOfThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS + || numberOfThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS) { + numberOfThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; + } + + synchronized (IngestManager.class) { + userPreferences.putInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, numberOfThreads); + } + } + + /** + * Finds the ingest messages in box TopComponent. Called by the custom + * installer for this package once the window system is initialized. + */ + void initIngestMessageInbox() { + if (this.ingestMessageBox == null) { + this.ingestMessageBox = IngestMessageTopComponent.findInstance(); + } + } + /** * Ingest events. */ @@ -110,65 +167,20 @@ public class IngestManager { FILE_DONE, }; - private IngestManager() { - } - /** - * Returns reference to singleton instance. - * - * @returns Instance of class. - */ - synchronized public static IngestManager getInstance() { - if (instance == null) { - instance = new IngestManager(); - } - return instance; - } - - /** - * called by Installer in AWT thread once the Window System is ready - */ - void initIngestMessageInbox() { - if (this.ingestMessageBox == null) { - this.ingestMessageBox = IngestMessageTopComponent.findInstance(); - } - } - - synchronized private long getNextDataSourceTaskId() { - return ++this.nextDataSourceTaskId; - } - - synchronized private long getNextThreadId() { - return ++this.nextThreadId; - } - - public synchronized int getNumberOfFileIngestThreads() { - return userPreferences.getInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, DEFAULT_NUMBER_OF_FILE_INGEST_THREADS); - } - - public synchronized void setNumberOfFileIngestThreads(int numberOfThreads) { - if (numberOfThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS - || numberOfThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS) { - numberOfThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; - } - userPreferences.putInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, numberOfThreads); - } - - /** - * Add property change listener to listen to ingest events as defined in - * IngestModuleEvent. + * Add property change listener to listen to ingest events. * * @param listener PropertyChangeListener to register */ - public static synchronized void addPropertyChangeListener(final PropertyChangeListener listener) { + synchronized public static void addPropertyChangeListener(final PropertyChangeListener listener) { pcs.addPropertyChangeListener(listener); } - public static synchronized void removePropertyChangeListener(final PropertyChangeListener listener) { + synchronized public static void removePropertyChangeListener(final PropertyChangeListener listener) { pcs.removePropertyChangeListener(listener); } - static synchronized void fireModuleEvent(String eventType, String moduleName) { + synchronized static void fireModuleEvent(String eventType, String moduleName) { try { pcs.firePropertyChange(eventType, moduleName, null); } catch (Exception e) { @@ -184,7 +196,7 @@ public class IngestManager { * * @param objId ID of file that is done */ - static synchronized void fireFileDone(long objId) { + synchronized static void fireFileDone(long objId) { try { pcs.firePropertyChange(IngestEvent.FILE_DONE.toString(), objId, null); } catch (Exception e) { @@ -201,7 +213,7 @@ public class IngestManager { * * @param moduleDataEvent */ - static synchronized void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) { + synchronized static void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) { try { pcs.firePropertyChange(IngestEvent.DATA.toString(), moduleDataEvent, null); } catch (Exception e) { @@ -218,7 +230,7 @@ public class IngestManager { * * @param moduleContentEvent */ - static synchronized void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) { + synchronized static void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) { try { pcs.firePropertyChange(IngestEvent.CONTENT_CHANGED.toString(), moduleContentEvent, null); } catch (Exception e) { @@ -229,126 +241,63 @@ public class IngestManager { } } - /** - * Multiple data-sources version of scheduleDataSource() method. Enqueues - * multiple sources inputs (Content objects) and associated modules at once - * - * @param modules modules to scheduleDataSource on every data source - * @param inputs input data sources to enqueue and scheduleDataSource the - * ingest modules on - */ - void scheduleDataSourceTasks(final List dataSources, final List moduleTemplates, boolean processUnallocatedSpace) { + synchronized void startIngestJob(final Content dataSource, final List moduleTemplates, boolean processUnallocatedSpace) { + List dataSources = new ArrayList<>(); + dataSources.add(dataSource); + startIngestJobs(dataSources, moduleTemplates, processUnallocatedSpace); + } + + synchronized void startIngestJobs(final List dataSources, final List moduleTemplates, boolean processUnallocatedSpace) { if (!isIngestRunning() && ingestMessageBox != null) { ingestMessageBox.clearMessages(); } - taskSchedulingWorker = new TaskSchedulingWorker(dataSources, moduleTemplates, processUnallocatedSpace); - taskSchedulingWorker.execute(); + createThreadPools(); + + long taskId = ingestTaskId.incrementAndGet(); + Future dataSourceIngestTask = startIngestJobsExecutor.submit(new StartIngestJobsTask(taskId, dataSources, moduleTemplates, processUnallocatedSpace)); + ingestTasks.put(taskId, dataSourceIngestTask); if (ingestMessageBox != null) { ingestMessageBox.restoreMessages(); } } - /** - * IngestManager entry point, enqueues data to be processed and starts new - * ingest as needed, or just enqueues data to an existing pipeline. - * - * Spawns background thread which enumerates all sorted files and executes - * chosen modules per file in a pre-determined order. Notifies modules when - * work is complete or should be interrupted using complete() and stop() - * calls. Does not block and can be called multiple times to enqueue more - * work to already running background ingest process. - * - * @param modules modules to scheduleDataSource on the data source input - * @param input input data source Content objects to scheduleDataSource the - * ingest modules on - */ - void scheduleDataSourceTask(final Content dataSource, final List moduleTemplates, boolean processUnallocatedSpace) { - List dataSources = new ArrayList<>(); - dataSources.add(dataSource); - scheduleDataSourceTasks(dataSources, moduleTemplates, processUnallocatedSpace); - } - - /** - * Schedule a file for ingest and add it to ongoing file ingest process on - * the same data source. Scheduler updates the current progress. - * - * The file to be added is usually a product of a currently ran ingest. Now - * we want to process this new file with the same ingest context. - * - * @param file file to be scheduled - * @param pipelineContext ingest context used to ingest parent of the file - * to be scheduled - */ - void scheduleFile(long ingestJobId, AbstractFile file) { + synchronized void addFileToIngestJob(long ingestJobId, AbstractFile file) { IngestJob job = this.ingestJobs.get(ingestJobId); - if (job == null) { + if (job != null) { + scheduler.getFileScheduler().scheduleFile(job, file); + } else { logger.log(Level.SEVERE, "Unable to map ingest job id (id = {0}) to an ingest job, failed to schedule file (id = {1})", new Object[]{ingestJobId, file.getId()}); MessageNotifyUtil.Notify.show(NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr"), "Unable to associate " + file.getName() + " with ingest job, file will not be processed by ingest nodules", MessageNotifyUtil.MessageType.ERROR); } - - scheduler.getFileScheduler().scheduleFile(job, file); } - private synchronized void startAll() { - // Make sure the ingest monitor is running. + private synchronized void startIngestTasks() { if (!ingestMonitor.isRunning()) { ingestMonitor.start(); } + + long taskId = ingestTaskId.incrementAndGet(); + Future dataSourceIngestTask = dataSourceIngestTasksExecutor.submit(new RunDataSourceIngestModulesTask(taskId)); + ingestTasks.put(taskId, dataSourceIngestTask); - // Make sure a data source task worker is running. - // TODO: There is a race condition here with SwingWorker.isDone(). - // The highly unlikely chance that no data source task worker will - // run for this job needs to be addressed. Fix by using a thread pool - // and converting the SwingWorkers to Runnables. - if (dataSourceTaskWorker == null || dataSourceTaskWorker.isDone()) { - dataSourceTaskWorker = new DataSourceTaskWorker(getNextThreadId()); - dataSourceTaskWorker.execute(); - } - - // Make sure the requested number of file task workers are running. - // TODO: There is a race condition here with SwingWorker.isDone(). - // The highly unlikely chance that no file task workers or the wrong - // number of file task workers will run for this job needs to be - // addressed. Fix by using a thread pool and converting the SwingWorkers - // to Runnables. int workersRequested = getNumberOfFileIngestThreads(); - int workersRunning = 0; - for (FileTaskWorker worker : fileTaskWorkers) { - if (worker != null) { - if (worker.isDone()) { - if (workersRunning < workersRequested) { - worker = new FileTaskWorker(getNextThreadId()); - worker.execute(); - ++workersRunning; - } else { - worker = null; - } - } else { - ++workersRunning; - } - } else if (workersRunning < workersRequested) { - worker = new FileTaskWorker(getNextThreadId()); - worker.execute(); - ++workersRunning; - } - } - while (workersRunning < workersRequested - && fileTaskWorkers.size() < MAX_NUMBER_OF_FILE_INGEST_THREADS) { - FileTaskWorker worker = new FileTaskWorker(getNextThreadId()); - fileTaskWorkers.add(worker); - worker.execute(); - ++workersRunning; + for (int i = 0; i < workersRequested; ++i) { + taskId = ingestTaskId.incrementAndGet(); + Future fileIngestTask = fileIngestTasksExecutor.submit(new RunFileSourceIngestModulesTask(taskId)); + ingestTasks.put(taskId, fileIngestTask); } } - synchronized void reportThreadDone(long threadId) { + synchronized void reportIngestTaskDone(long taskId) { + ingestTasks.remove(taskId); + List completedJobs = new ArrayList<>(); for (IngestJob job : ingestJobs.values()) { - job.releaseIngestPipelinesForThread(threadId); + job.releaseIngestPipelinesForThread(taskId); if (job.areIngestPipelinesShutDown()) { completedJobs.add(job.getId()); } @@ -359,12 +308,11 @@ public class IngestManager { } } - synchronized void stopAll() { + synchronized void cancelIngestTasks() { +// synchronized void cancelIngestTasks(int waitTime, TimeUnit unit) { // RJCTODO // First get the task scheduling worker to stop adding new tasks. - if (taskSchedulingWorker != null) { - taskSchedulingWorker.cancel(true); - taskSchedulingWorker = null; - } +// boolean res = shutDownThreadPool(startIngestJobsExecutor, 1, TimeUnit.SECONDS); +// startIngestJobsExecutor = null; // Now mark all of the ingest jobs as cancelled. This way the ingest // modules will know they are being shut down due to cancellation when @@ -373,59 +321,58 @@ public class IngestManager { job.cancel(); } - // Jettision the remaining tasks. This will dispose of any tasks that + // Jettision the remaining data tasks. This will dispose of any tasks that // the scheduling worker queued up before it was cancelled. - scheduler.getFileScheduler().empty(); - scheduler.getDataSourceScheduler().empty(); - - // Cancel the data source task worker. It will release its pipelines - // in its done() method and the pipelines will shut down their modules. - if (dataSourceTaskWorker != null) { - dataSourceTaskWorker.cancel(true); - dataSourceTaskWorker = null; - } - - // Cancel the file task workers. They will release their pipelines - // in their done() methods and the pipelines will shut down their - // modules. - for (FileTaskWorker worker : fileTaskWorkers) { - if (worker != null) { - worker.cancel(true); - worker = null; - } +// scheduler.getFileScheduler().empty(); +// scheduler.getDataSourceScheduler().empty(); + + // Cancel all of the ingest module running tasks. + for (Future task : ingestTasks.values()) { + task.cancel(true); } +// res = shutDownThreadPool(dataSourceIngestTasksExecutor, 30, TimeUnit.SECONDS); +// dataSourceIngestTasksExecutor = null; +// res = shutDownThreadPool(fileIngestTasksExecutor, 30, TimeUnit.SECONDS); +// fileIngestTasksExecutor = null; + // Jettision the remaining tasks again to try to dispose of any tasks // queued up task workers before they were cancelled. - scheduler.getFileScheduler().empty(); - scheduler.getDataSourceScheduler().empty(); +// scheduler.getFileScheduler().empty(); +// scheduler.getDataSourceScheduler().empty(); } + // The following method implementation is adapted from: + // http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination(long, java.util.concurrent.TimeUnit) + private boolean shutDownThreadPool(ExecutorService pool, int waitTime, TimeUnit unit) { + boolean succeeded = true; + // Prevent submission of new tasks. + pool.shutdown(); + try { + // Wait a while for existing tasks to terminate. + if (!pool.awaitTermination(1, TimeUnit.SECONDS)) { + // Cancel currently executing tasks. + pool.shutdownNow(); + if (!pool.awaitTermination(waitTime, unit)) { + succeeded = false; + } + } + } catch (InterruptedException ex) { + // (Re-)Cancel if current thread also interrupted. + pool.shutdownNow(); + // Preserve interrupt status. + Thread.currentThread().interrupt(); + } + return succeeded; + } + /** * Test if any ingest jobs are in progress. * * @return True if any ingest jobs are in progress, false otherwise */ public synchronized boolean isIngestRunning() { - // TODO: There is a race condition here with SwingWorker.isDone(). - // It probably needs to be addressed at a later date. If we replace the - // SwingWorkers with a thread pool and Runnables, one solution would be - // to check the ingest jobs list. - if (taskSchedulingWorker != null && !taskSchedulingWorker.isDone()) { - return true; - } - - if (dataSourceTaskWorker != null && !dataSourceTaskWorker.isDone()) { - return true; - } - - for (FileTaskWorker worker : fileTaskWorkers) { - if (worker != null && !worker.isDone()) { - return true; - } - } - - return false; + return (ingestJobs.isEmpty() == false); } /** @@ -456,95 +403,89 @@ public class IngestManager { } } - private class TaskSchedulingWorker extends SwingWorker { + private class StartIngestJobsTask implements Runnable { + private final long id; private final List dataSources; private final List moduleTemplates; private final boolean processUnallocatedSpace; private ProgressHandle progress; private volatile boolean finished = false; - TaskSchedulingWorker(List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { + StartIngestJobsTask(long taskId, List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { + this.id = taskId; this.dataSources = dataSources; this.moduleTemplates = moduleTemplates; this.processUnallocatedSpace = processUnallocatedSpace; } @Override - protected Object doInBackground() throws Exception { - final String displayName = "Queueing ingest tasks"; - progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { - @Override - public boolean cancel() { - if (progress != null) { - progress.setDisplayName(displayName + " (Cancelling...)"); - } - IngestManager.getInstance().stopAll(); - return true; - } - }); - - progress.start(2 * dataSources.size()); - int processed = 0; - for (Content dataSource : dataSources) { - if (isCancelled()) { - return null; - } - final String inputName = dataSource.getName(); - IngestJob ingestJob = new IngestJob(IngestManager.this.getNextDataSourceTaskId(), dataSource, moduleTemplates, processUnallocatedSpace); - - List errors = ingestJob.startUpIngestPipelines(); - if (!errors.isEmpty()) { - StringBuilder failedModules = new StringBuilder(); - for (int i = 0; i < errors.size(); ++i) { - IngestModuleError error = errors.get(i); - String moduleName = error.getModuleDisplayName(); - logger.log(Level.SEVERE, "The " + moduleName + " module failed to start up", error.getModuleError()); - failedModules.append(moduleName); - if ((errors.size() > 1) && (i != (errors.size() - 1))) { - failedModules.append(","); + public void run() { + try { + final String displayName = "Queueing ingest tasks"; + progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { + @Override + public boolean cancel() { + if (progress != null) { + progress.setDisplayName(displayName + " (Cancelling...)"); } + IngestManager.getInstance().cancelIngestTasks(); + return true; } - MessageNotifyUtil.Message.error( // RJCTODO: Fix this - "Failed to start the following ingest modules: " + failedModules.toString() + " .\n\n" - + "No ingest modules will be run. Please disable the module " - + "or fix the error and restart ingest by right clicking on " - + "the data source and selecting Run Ingest Modules.\n\n" - + "Error: " + errors.get(0).getModuleError().getMessage()); - return null; + }); + + progress.start(2 * dataSources.size()); + int processed = 0; + for (Content dataSource : dataSources) { + if (Thread.currentThread().isInterrupted()) { + break; + } + + final String inputName = dataSource.getName(); + IngestJob ingestJob = new IngestJob(IngestManager.this.ingestJobId.incrementAndGet(), dataSource, moduleTemplates, processUnallocatedSpace); + List errors = ingestJob.startUpIngestPipelines(); + if (!errors.isEmpty()) { + StringBuilder failedModules = new StringBuilder(); + for (int i = 0; i < errors.size(); ++i) { + IngestModuleError error = errors.get(i); + String moduleName = error.getModuleDisplayName(); + logger.log(Level.SEVERE, "The " + moduleName + " module failed to start up", error.getModuleError()); + failedModules.append(moduleName); + if ((errors.size() > 1) && (i != (errors.size() - 1))) { + failedModules.append(","); + } + } + MessageNotifyUtil.Message.error( // RJCTODO: Fix this to show all errors + "Failed to start the following ingest modules: " + failedModules.toString() + " .\n\n" + + "No ingest modules will be run. Please disable the module " + + "or fix the error and restart ingest by right clicking on " + + "the data source and selecting Run Ingest Modules.\n\n" + + "Error: " + errors.get(0).getModuleError().getMessage()); + break; + } + + // Save the ingest job for later cleanup of pipelines. + ingestJobs.put(ingestJob.getId(), ingestJob); + + // Queue the data source ingest tasks for the ingest job. + progress.progress("DataSource Ingest" + " " + inputName, processed); + scheduler.getDataSourceScheduler().schedule(ingestJob); + progress.progress("DataSource Ingest" + " " + inputName, ++processed); + + // Queue the file ingest tasks for the ingest job. + progress.progress("File Ingest" + " " + inputName, processed); + scheduler.getFileScheduler().scheduleIngestOfFiles(ingestJob); + progress.progress("File Ingest" + " " + inputName, ++processed); } - // Save the ingest job for later cleanup of pipelines. - ingestJobs.put(ingestJob.getId(), ingestJob); - - // Queue the data source ingest tasks for the ingest job. - progress.progress("DataSource Ingest" + " " + inputName, processed); - scheduler.getDataSourceScheduler().schedule(ingestJob); - progress.progress("DataSource Ingest" + " " + inputName, ++processed); - - // Queue the file ingest tasks for the ingest job. - progress.progress("File Ingest" + " " + inputName, processed); - scheduler.getFileScheduler().scheduleIngestOfFiles(ingestJob); - progress.progress("File Ingest" + " " + inputName, ++processed); - } - - return null; - } - - @Override - protected void done() { - try { - super.get(); - } catch (CancellationException | InterruptedException ex) { + if (!Thread.currentThread().isInterrupted()) { + startIngestTasks(); + } } catch (Exception ex) { - logger.log(Level.SEVERE, "Error while scheduling ingest jobs", ex); - MessageNotifyUtil.Message.error("An error occurred while starting ingest. Results may only be partial"); + // RJCTODO: } finally { - if (!isCancelled()) { - startAll(); - } + // RJCTODO: Release progress.finish(); - finished = true; } } @@ -553,103 +494,71 @@ public class IngestManager { } } - /** - * Performs data source ingest tasks for one or more ingest jobs on a worker - * thread. - */ - class DataSourceTaskWorker extends SwingWorker { + private class RunDataSourceIngestModulesTask implements Runnable { private final long id; - private volatile boolean finished = false; - DataSourceTaskWorker(long threadId) { + RunDataSourceIngestModulesTask(long threadId) { this.id = threadId; } @Override - protected Void doInBackground() throws Exception { - IngestScheduler.DataSourceScheduler scheduler = IngestScheduler.getInstance().getDataSourceScheduler(); - while (scheduler.hasNext()) { - if (isCancelled()) { - return null; - } - IngestJob job = scheduler.next(); - DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(this.id); - pipeline.process(this, job.getDataSourceTaskProgressBar()); - if (isCancelled()) { - return null; - } - } - return null; - } - - @Override - protected void done() { + public void run() { try { - super.get(); - } catch (CancellationException | InterruptedException e) { + IngestScheduler.DataSourceScheduler scheduler = IngestScheduler.getInstance().getDataSourceScheduler(); + while (scheduler.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + break; + } + IngestJob job = scheduler.next(); + DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(this.id); + pipeline.process(job.getDataSourceTaskProgressBar()); + } } catch (Exception ex) { - String message = String.format("Data source ingest thread (id=%d) experienced a fatal error", this.id); + String message = String.format("Data source ingest thread (id=%d) caught exception", this.id); // RJCTODO logger.log(Level.SEVERE, message, ex); } finally { - IngestManager.getInstance().reportThreadDone(this.id); - finished = true; + IngestManager.getInstance().reportIngestTaskDone(this.id); } } - - boolean isFinished() { - return finished; - } } - /** - * Performs file ingest tasks for one or more ingest jobs on a worker - * thread. - */ - class FileTaskWorker extends SwingWorker { + private class RunFileSourceIngestModulesTask implements Runnable { private final long id; - private volatile boolean finished = false; - FileTaskWorker(long threadId) { - this.id = threadId; + RunFileSourceIngestModulesTask(long taskId) { + this.id = taskId; } @Override - protected Object doInBackground() throws Exception { - IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler(); - while (fileScheduler.hasNext()) { - if (isCancelled()) { - return null; - } - IngestScheduler.FileScheduler.FileTask task = fileScheduler.next(); - IngestJob job = task.getJob(); - FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(this.id); - job.handleFileTaskStarted(task); - pipeline.process(task.getFile()); - if (isCancelled()) { - return null; - } - } - return null; - } - - @Override - protected void done() { + public void run() { try { - super.get(); - } catch (CancellationException | InterruptedException e) { + IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler(); + while (fileScheduler.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + break; + } + IngestScheduler.FileScheduler.FileTask task = fileScheduler.next(); + IngestJob job = task.getJob(); + FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(this.id); + job.handleFileTaskStarted(task); + pipeline.process(task.getFile()); + } } catch (Exception ex) { - String message = String.format("File ingest thread {0} experienced a fatal error", this.id); + String message = String.format("Data source ingest thread (id=%d) caught exception", this.id); // RJCTODO logger.log(Level.SEVERE, message, ex); } finally { - IngestManager.getInstance().reportThreadDone(this.id); - finished = true; + IngestManager.getInstance().reportIngestTaskDone(this.id); } } - - boolean isFinished() { - return finished; - } } + + class IngestCancellationTask implements Runnable { + + @Override + public void run() { + // RJCTODO: Run + } + } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java index d8971c476e..f05de82635 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java @@ -224,7 +224,7 @@ import org.sleuthkit.datamodel.Content; manager = IngestManager.getInstance(); } try { - manager.stopAll(); + manager.cancelIngestTasks(); } finally { //clear inbox clearMessages(); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java index dde9704379..3ec81082dd 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java @@ -163,9 +163,9 @@ public final class IngestMonitor { if (checkDiskSpace() == false) { //stop ingest if running final String diskPath = root.getAbsolutePath(); - MONITOR_LOGGER.log(Level.SEVERE, "Stopping ingest due to low disk space on disk " + diskPath); - logger.log(Level.SEVERE, "Stopping ingest due to low disk space on disk " + diskPath); - manager.stopAll(); + MONITOR_LOGGER.log(Level.SEVERE, "Stopping ingest due to low disk space on disk {0}", diskPath); + logger.log(Level.SEVERE, "Stopping ingest due to low disk space on disk {0}", diskPath); + manager.cancelIngestTasks(); IngestServices.getInstance().postMessage(IngestMessage.createManagerErrorMessage( NbBundle.getMessage(this.getClass(), "IngestMonitor.mgrErrMsg.lowDiskSpace.title", diskPath), NbBundle.getMessage(this.getClass(), "IngestMonitor.mgrErrMsg.lowDiskSpace.msg", diskPath))); From f7f0c46697d5f5036eb9f66218e1166839e1b573 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Mon, 7 Apr 2014 21:20:22 -0400 Subject: [PATCH 3/6] Thread pool shutdown variation --- .../sleuthkit/autopsy/ingest/IngestJob.java | 4 +- .../autopsy/ingest/IngestManager.java | 388 +++++++++--------- .../ingest/IngestMessageTopComponent.java | 2 +- .../autopsy/ingest/IngestMonitor.java | 2 +- 4 files changed, 204 insertions(+), 192 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 80f75818d6..57e0f4e16a 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -85,7 +85,7 @@ final class IngestJob { "IngestJob.progress.cancelling", displayName)); } - IngestManager.getInstance().cancelIngestTasks(); + IngestManager.getInstance().cancelIngestJobs(); return true; } }); @@ -104,7 +104,7 @@ final class IngestJob { NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", displayName)); } - IngestManager.getInstance().cancelIngestTasks(); + IngestManager.getInstance().cancelIngestJobs(); return true; } }); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 44286f87dc..5f1e746675 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; @@ -39,6 +38,7 @@ import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil; import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.Content; import java.util.prefs.Preferences; +import javax.swing.SwingWorker; /** * Manages the execution of ingest jobs. @@ -53,6 +53,7 @@ public class IngestManager { private static final PropertyChangeSupport pcs = new PropertyChangeSupport(IngestManager.class); private static final Preferences userPreferences = NbPreferences.forModule(IngestManager.class); private static IngestManager instance; + private final IngestJobsManager jobsManager = new IngestJobsManager(); private final IngestScheduler scheduler = IngestScheduler.getInstance(); private final IngestMonitor ingestMonitor = new IngestMonitor(); private ExecutorService startIngestJobsExecutor = null; @@ -60,9 +61,6 @@ public class IngestManager { private ExecutorService fileIngestTasksExecutor = null; private AtomicLong ingestJobId = new AtomicLong(0L); private AtomicLong ingestTaskId = new AtomicLong(0L); - private final HashMap ingestJobs = new HashMap<>(); // Maps job ids to jobs - private final HashMap> ingestTasks = new HashMap<>(); // Maps task ids to Runnable tasks -// private TaskSchedulingWorker taskSchedulingWorker = null; private volatile IngestUI ingestMessageBox; /** @@ -81,33 +79,6 @@ public class IngestManager { createThreadPools(); } - private synchronized void createThreadPools() { - if (startIngestJobsExecutor == null) { - startIngestJobsExecutor = Executors.newSingleThreadExecutor(); - } - if (dataSourceIngestTasksExecutor == null) { - dataSourceIngestTasksExecutor = Executors.newSingleThreadExecutor(); - } - if (fileIngestTasksExecutor == null) { - fileIngestTasksExecutor = Executors.newFixedThreadPool(MAX_NUMBER_OF_FILE_INGEST_THREADS); - } - } - - public synchronized static int getNumberOfFileIngestThreads() { - return userPreferences.getInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, DEFAULT_NUMBER_OF_FILE_INGEST_THREADS); - } - - public static void setNumberOfFileIngestThreads(int numberOfThreads) { - if (numberOfThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS - || numberOfThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS) { - numberOfThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; - } - - synchronized (IngestManager.class) { - userPreferences.putInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, numberOfThreads); - } - } - /** * Finds the ingest messages in box TopComponent. Called by the custom * installer for this package once the window system is initialized. @@ -118,6 +89,53 @@ public class IngestManager { } } + public synchronized static int getNumberOfFileIngestThreads() { + return userPreferences.getInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, DEFAULT_NUMBER_OF_FILE_INGEST_THREADS); + } + + public synchronized static void setNumberOfFileIngestThreads(int numberOfThreads) { + if (numberOfThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS + || numberOfThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS) { + numberOfThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; + } + + userPreferences.putInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, numberOfThreads); + } + + synchronized void startIngestJobs(final List dataSources, final List moduleTemplates, boolean processUnallocatedSpace) { + if (!isIngestRunning() && ingestMessageBox != null) { + ingestMessageBox.clearMessages(); + } + + createThreadPools(); + startIngestJobsExecutor.submit(new StartIngestJobsTask(ingestTaskId.incrementAndGet(), dataSources, moduleTemplates, processUnallocatedSpace)); + + if (ingestMessageBox != null) { + ingestMessageBox.restoreMessages(); + } + } + + /** + * Test if any ingest jobs are in progress. + * + * @return True if any ingest jobs are in progress, false otherwise + */ + public boolean isIngestRunning() { + return jobsManager.hasJobs(); + } + + synchronized void addFileToIngestJob(long ingestJobId, AbstractFile file) { + if (!jobsManager.addFileToIngestJob(ingestJobId, file)) { + MessageNotifyUtil.Notify.show(NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr"), + "Unable to associate " + file.getName() + " with ingest job, file will not be processed by ingest nodules", + MessageNotifyUtil.MessageType.ERROR); + } + } + + synchronized void cancelIngestJobs() { + new IngestCancellationWorker(1, TimeUnit.SECONDS).execute(); + } + /** * Ingest events. */ @@ -172,15 +190,15 @@ public class IngestManager { * * @param listener PropertyChangeListener to register */ - synchronized public static void addPropertyChangeListener(final PropertyChangeListener listener) { + public static void addPropertyChangeListener(final PropertyChangeListener listener) { pcs.addPropertyChangeListener(listener); } - synchronized public static void removePropertyChangeListener(final PropertyChangeListener listener) { + public static void removePropertyChangeListener(final PropertyChangeListener listener) { pcs.removePropertyChangeListener(listener); } - synchronized static void fireModuleEvent(String eventType, String moduleName) { + static void fireModuleEvent(String eventType, String moduleName) { try { pcs.firePropertyChange(eventType, moduleName, null); } catch (Exception e) { @@ -196,7 +214,7 @@ public class IngestManager { * * @param objId ID of file that is done */ - synchronized static void fireFileDone(long objId) { + static void fireFileDone(long objId) { try { pcs.firePropertyChange(IngestEvent.FILE_DONE.toString(), objId, null); } catch (Exception e) { @@ -213,7 +231,7 @@ public class IngestManager { * * @param moduleDataEvent */ - synchronized static void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) { + static void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) { try { pcs.firePropertyChange(IngestEvent.DATA.toString(), moduleDataEvent, null); } catch (Exception e) { @@ -230,7 +248,7 @@ public class IngestManager { * * @param moduleContentEvent */ - synchronized static void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) { + static void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) { try { pcs.firePropertyChange(IngestEvent.CONTENT_CHANGED.toString(), moduleContentEvent, null); } catch (Exception e) { @@ -241,140 +259,6 @@ public class IngestManager { } } - synchronized void startIngestJob(final Content dataSource, final List moduleTemplates, boolean processUnallocatedSpace) { - List dataSources = new ArrayList<>(); - dataSources.add(dataSource); - startIngestJobs(dataSources, moduleTemplates, processUnallocatedSpace); - } - - synchronized void startIngestJobs(final List dataSources, final List moduleTemplates, boolean processUnallocatedSpace) { - if (!isIngestRunning() && ingestMessageBox != null) { - ingestMessageBox.clearMessages(); - } - - createThreadPools(); - - long taskId = ingestTaskId.incrementAndGet(); - Future dataSourceIngestTask = startIngestJobsExecutor.submit(new StartIngestJobsTask(taskId, dataSources, moduleTemplates, processUnallocatedSpace)); - ingestTasks.put(taskId, dataSourceIngestTask); - - if (ingestMessageBox != null) { - ingestMessageBox.restoreMessages(); - } - } - - synchronized void addFileToIngestJob(long ingestJobId, AbstractFile file) { - IngestJob job = this.ingestJobs.get(ingestJobId); - if (job != null) { - scheduler.getFileScheduler().scheduleFile(job, file); - } else { - logger.log(Level.SEVERE, "Unable to map ingest job id (id = {0}) to an ingest job, failed to schedule file (id = {1})", new Object[]{ingestJobId, file.getId()}); - MessageNotifyUtil.Notify.show(NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr"), - "Unable to associate " + file.getName() + " with ingest job, file will not be processed by ingest nodules", - MessageNotifyUtil.MessageType.ERROR); - } - } - - private synchronized void startIngestTasks() { - if (!ingestMonitor.isRunning()) { - ingestMonitor.start(); - } - - long taskId = ingestTaskId.incrementAndGet(); - Future dataSourceIngestTask = dataSourceIngestTasksExecutor.submit(new RunDataSourceIngestModulesTask(taskId)); - ingestTasks.put(taskId, dataSourceIngestTask); - - int workersRequested = getNumberOfFileIngestThreads(); - for (int i = 0; i < workersRequested; ++i) { - taskId = ingestTaskId.incrementAndGet(); - Future fileIngestTask = fileIngestTasksExecutor.submit(new RunFileSourceIngestModulesTask(taskId)); - ingestTasks.put(taskId, fileIngestTask); - } - } - - synchronized void reportIngestTaskDone(long taskId) { - ingestTasks.remove(taskId); - - List completedJobs = new ArrayList<>(); - for (IngestJob job : ingestJobs.values()) { - job.releaseIngestPipelinesForThread(taskId); - if (job.areIngestPipelinesShutDown()) { - completedJobs.add(job.getId()); - } - } - - for (Long jobId : completedJobs) { - ingestJobs.remove(jobId); - } - } - - synchronized void cancelIngestTasks() { -// synchronized void cancelIngestTasks(int waitTime, TimeUnit unit) { // RJCTODO - // First get the task scheduling worker to stop adding new tasks. -// boolean res = shutDownThreadPool(startIngestJobsExecutor, 1, TimeUnit.SECONDS); -// startIngestJobsExecutor = null; - - // Now mark all of the ingest jobs as cancelled. This way the ingest - // modules will know they are being shut down due to cancellation when - // the cancelled ingest workers release their pipelines. - for (IngestJob job : ingestJobs.values()) { - job.cancel(); - } - - // Jettision the remaining data tasks. This will dispose of any tasks that - // the scheduling worker queued up before it was cancelled. -// scheduler.getFileScheduler().empty(); -// scheduler.getDataSourceScheduler().empty(); - - // Cancel all of the ingest module running tasks. - for (Future task : ingestTasks.values()) { - task.cancel(true); - } - -// res = shutDownThreadPool(dataSourceIngestTasksExecutor, 30, TimeUnit.SECONDS); -// dataSourceIngestTasksExecutor = null; -// res = shutDownThreadPool(fileIngestTasksExecutor, 30, TimeUnit.SECONDS); -// fileIngestTasksExecutor = null; - - // Jettision the remaining tasks again to try to dispose of any tasks - // queued up task workers before they were cancelled. -// scheduler.getFileScheduler().empty(); -// scheduler.getDataSourceScheduler().empty(); - } - - // The following method implementation is adapted from: - // http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination(long, java.util.concurrent.TimeUnit) - private boolean shutDownThreadPool(ExecutorService pool, int waitTime, TimeUnit unit) { - boolean succeeded = true; - // Prevent submission of new tasks. - pool.shutdown(); - try { - // Wait a while for existing tasks to terminate. - if (!pool.awaitTermination(1, TimeUnit.SECONDS)) { - // Cancel currently executing tasks. - pool.shutdownNow(); - if (!pool.awaitTermination(waitTime, unit)) { - succeeded = false; - } - } - } catch (InterruptedException ex) { - // (Re-)Cancel if current thread also interrupted. - pool.shutdownNow(); - // Preserve interrupt status. - Thread.currentThread().interrupt(); - } - return succeeded; - } - - /** - * Test if any ingest jobs are in progress. - * - * @return True if any ingest jobs are in progress, false otherwise - */ - public synchronized boolean isIngestRunning() { - return (ingestJobs.isEmpty() == false); - } - /** * Module publishes message using InegestManager handle Does not block. The * message gets enqueued in the GUI thread and displayed in a widget @@ -403,6 +287,103 @@ public class IngestManager { } } + private synchronized void startIngestTasks() { + if (!ingestMonitor.isRunning()) { + ingestMonitor.start(); + } + + dataSourceIngestTasksExecutor.submit(new RunDataSourceIngestModulesTask(ingestTaskId.incrementAndGet())); + + int numberOfFileTasksRequested = getNumberOfFileIngestThreads(); + for (int i = 0; i < numberOfFileTasksRequested; ++i) { + fileIngestTasksExecutor.submit(new RunFileSourceIngestModulesTask(ingestTaskId.incrementAndGet())); + } + } + + private synchronized void createThreadPools() { + if (startIngestJobsExecutor == null) { + startIngestJobsExecutor = Executors.newSingleThreadExecutor(); + } + if (dataSourceIngestTasksExecutor == null) { + dataSourceIngestTasksExecutor = Executors.newSingleThreadExecutor(); + } + if (fileIngestTasksExecutor == null) { + fileIngestTasksExecutor = Executors.newFixedThreadPool(MAX_NUMBER_OF_FILE_INGEST_THREADS); + } + } + + private synchronized boolean shutDownThreadPools(int timeOut, TimeUnit timeOutUnits) { + boolean success = true; + if (!shutDownThreadPool(startIngestJobsExecutor, timeOut, timeOutUnits)) { + success = false; + } + if (!shutDownThreadPool(dataSourceIngestTasksExecutor, timeOut, timeOutUnits)) { + success = false; + } + if (!shutDownThreadPool(fileIngestTasksExecutor, timeOut, timeOutUnits)) { + success = false; + } + startIngestJobsExecutor = null; + dataSourceIngestTasksExecutor = null; + fileIngestTasksExecutor = null; + return success; + } + + private boolean shutDownThreadPool(ExecutorService pool, int waitTime, TimeUnit unit) { + try { + pool.shutdownNow(); + return pool.awaitTermination(waitTime, unit); + } catch (InterruptedException ex) { + pool.shutdownNow(); + Thread.currentThread().interrupt(); // Preserve interrupted status. + return false; + } + } + + private class IngestJobsManager { + + private final HashMap jobs = new HashMap<>(); // Maps job ids to jobs + + synchronized void addJob(IngestJob job) { + jobs.put(job.getId(), job); + } + + synchronized boolean hasJobs() { + return (jobs.isEmpty() == false); + } + + synchronized boolean addFileToIngestJob(long ingestJobId, AbstractFile file) { + IngestJob job = jobs.get(ingestJobId); + if (job != null) { + scheduler.getFileScheduler().scheduleFile(job, file); + return true; + } else { + logger.log(Level.SEVERE, "Unable to map ingest job id (id = {0}) to an ingest job, failed to add file (id = {1})", new Object[]{ingestJobId, file.getId()}); + return false; + } + } + + synchronized void cancelJobs() { + for (IngestJob job : jobs.values()) { + job.cancel(); + } + } + + synchronized void reportIngestTaskDone(long taskId) { + List completedJobs = new ArrayList<>(); + for (IngestJob job : jobs.values()) { + job.releaseIngestPipelinesForThread(taskId); + if (job.areIngestPipelinesShutDown() == true) { + completedJobs.add(job.getId()); + } + } + + for (Long jobId : completedJobs) { + jobs.remove(jobId); + } + } + } + private class StartIngestJobsTask implements Runnable { private final long id; @@ -429,7 +410,7 @@ public class IngestManager { if (progress != null) { progress.setDisplayName(displayName + " (Cancelling...)"); } - IngestManager.getInstance().cancelIngestTasks(); + IngestManager.getInstance().cancelIngestJobs(); return true; } }); @@ -465,7 +446,7 @@ public class IngestManager { } // Save the ingest job for later cleanup of pipelines. - ingestJobs.put(ingestJob.getId(), ingestJob); + jobsManager.addJob(ingestJob); // Queue the data source ingest tasks for the ingest job. progress.progress("DataSource Ingest" + " " + inputName, processed); @@ -482,9 +463,9 @@ public class IngestManager { startIngestTasks(); } } catch (Exception ex) { - // RJCTODO: + String message = String.format("StartIngestJobsTask (id=%d) caught exception", id); + logger.log(Level.SEVERE, message, ex); } finally { - // RJCTODO: Release progress.finish(); } } @@ -498,8 +479,8 @@ public class IngestManager { private final long id; - RunDataSourceIngestModulesTask(long threadId) { - this.id = threadId; + RunDataSourceIngestModulesTask(long taskId) { + id = taskId; } @Override @@ -511,14 +492,14 @@ public class IngestManager { break; } IngestJob job = scheduler.next(); - DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(this.id); + DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(id); pipeline.process(job.getDataSourceTaskProgressBar()); } } catch (Exception ex) { - String message = String.format("Data source ingest thread (id=%d) caught exception", this.id); // RJCTODO + String message = String.format("RunDataSourceIngestModulesTask (id=%d) caught exception", id); logger.log(Level.SEVERE, message, ex); } finally { - IngestManager.getInstance().reportIngestTaskDone(this.id); + jobsManager.reportIngestTaskDone(id); } } } @@ -528,7 +509,7 @@ public class IngestManager { private final long id; RunFileSourceIngestModulesTask(long taskId) { - this.id = taskId; + id = taskId; } @Override @@ -541,24 +522,55 @@ public class IngestManager { } IngestScheduler.FileScheduler.FileTask task = fileScheduler.next(); IngestJob job = task.getJob(); - FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(this.id); + FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(id); job.handleFileTaskStarted(task); pipeline.process(task.getFile()); } } catch (Exception ex) { - String message = String.format("Data source ingest thread (id=%d) caught exception", this.id); // RJCTODO + String message = String.format("RunFileSourceIngestModulesTask (id=%d) caught exception", id); logger.log(Level.SEVERE, message, ex); } finally { - IngestManager.getInstance().reportIngestTaskDone(this.id); + jobsManager.reportIngestTaskDone(id); } } } - class IngestCancellationTask implements Runnable { + class IngestCancellationWorker extends SwingWorker { + + private final int timeOut; + private final TimeUnit timeOutUnits; + + IngestCancellationWorker(int timeOut, TimeUnit timeOutUnits) { + this.timeOut = timeOut; + this.timeOutUnits = timeOutUnits; + } @Override - public void run() { - // RJCTODO: Run + protected Boolean doInBackground() throws Exception { + // First mark all of the ingest jobs as cancelled. This way the + // ingest modules will know they are being shut down due to + // cancellation when the cancelled run ingest module tasks release + // their pipelines. This also makes sure the lock on the jobs + // manager is released before the run ingest module tasks start + // releasing pipelines. + jobsManager.cancelJobs(); + + // Jettision the remaining data source and file ingest tasks. This + // will could break the the run ingest module tasks out of their + // loops even before the pools mark their threads as interrupted. + scheduler.getFileScheduler().empty(); + scheduler.getDataSourceScheduler().empty(); + + boolean success = shutDownThreadPools(timeOut, timeOutUnits); + + // Jettision data source and file ingest tasks again to try to + // dispose of any tasks that slipped into the queues. + scheduler.getFileScheduler().empty(); + scheduler.getDataSourceScheduler().empty(); + + return success; } - } + + // TODO: Add done() override to notify a listener of success or failure + } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java index f05de82635..b5feb6220c 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java @@ -224,7 +224,7 @@ import org.sleuthkit.datamodel.Content; manager = IngestManager.getInstance(); } try { - manager.cancelIngestTasks(); + manager.cancelIngestJobs(); } finally { //clear inbox clearMessages(); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java index 3ec81082dd..5c311363fd 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java @@ -165,7 +165,7 @@ public final class IngestMonitor { final String diskPath = root.getAbsolutePath(); MONITOR_LOGGER.log(Level.SEVERE, "Stopping ingest due to low disk space on disk {0}", diskPath); logger.log(Level.SEVERE, "Stopping ingest due to low disk space on disk {0}", diskPath); - manager.cancelIngestTasks(); + manager.cancelIngestJobs(); IngestServices.getInstance().postMessage(IngestMessage.createManagerErrorMessage( NbBundle.getMessage(this.getClass(), "IngestMonitor.mgrErrMsg.lowDiskSpace.title", diskPath), NbBundle.getMessage(this.getClass(), "IngestMonitor.mgrErrMsg.lowDiskSpace.msg", diskPath))); From 36c9044576f3b567c931763503d7fea3dcd1914a Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Tue, 8 Apr 2014 16:02:33 -0400 Subject: [PATCH 4/6] Fixed ingest job cancellation --- .../DataSourceIngestModuleStatusHelper.java | 21 +- .../ingest/DataSourceIngestPipeline.java | 19 +- .../autopsy/ingest/FileIngestPipeline.java | 11 +- .../sleuthkit/autopsy/ingest/IngestJob.java | 32 ++- .../autopsy/ingest/IngestManager.java | 220 +++++++----------- 5 files changed, 116 insertions(+), 187 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java index 4779f59c63..33474e8587 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java @@ -18,7 +18,6 @@ */ package org.sleuthkit.autopsy.ingest; -import javax.swing.SwingWorker; import org.netbeans.api.progress.ProgressHandle; /** @@ -27,11 +26,11 @@ import org.netbeans.api.progress.ProgressHandle; */ public class DataSourceIngestModuleStatusHelper { - private final ProgressHandle progress; + private final IngestJob ingestJob; private final String moduleDisplayName; - DataSourceIngestModuleStatusHelper(ProgressHandle progress, String moduleDisplayName) { - this.progress = progress; + DataSourceIngestModuleStatusHelper(IngestJob ingestJob, String moduleDisplayName) { + this.ingestJob = ingestJob; this.moduleDisplayName = moduleDisplayName; } @@ -43,7 +42,7 @@ public class DataSourceIngestModuleStatusHelper { * @return True if the task has been canceled, false otherwise. */ public boolean isIngestJobCancelled() { - return (Thread.currentThread().isInterrupted()); // RJCTODO: This is not right? Appears to be right... + return (ingestJob.isCancelled()); } /** @@ -55,9 +54,7 @@ public class DataSourceIngestModuleStatusHelper { * data source. */ public void switchToDeterminate(int workUnits) { - if (progress != null) { - progress.switchToDeterminate(workUnits); - } + ingestJob.getDataSourceTaskProgressBar().switchToDeterminate(workUnits); } /** @@ -65,9 +62,7 @@ public class DataSourceIngestModuleStatusHelper { * the total work units to process the data source is unknown. */ public void switchToIndeterminate() { - if (progress != null) { - progress.switchToIndeterminate(); - } + ingestJob.getDataSourceTaskProgressBar().switchToIndeterminate(); } /** @@ -77,8 +72,6 @@ public class DataSourceIngestModuleStatusHelper { * @param workUnits Number of work units performed so far by the module. */ public void progress(int workUnits) { - if (progress != null) { - progress.progress(this.moduleDisplayName, workUnits); - } + ingestJob.getDataSourceTaskProgressBar().progress(this.moduleDisplayName, workUnits); } } \ No newline at end of file diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java index 8a6f07786d..2a135d4c96 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java @@ -22,10 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.logging.Level; -import javax.swing.SwingWorker; import org.netbeans.api.progress.ProgressHandle; -import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.datamodel.Content; /** @@ -34,7 +31,6 @@ import org.sleuthkit.datamodel.Content; */ final class DataSourceIngestPipeline { - private static final Logger logger = Logger.getLogger(DataSourceIngestPipeline.class.getName()); private final IngestJob job; private final List moduleTemplates; private List modules = new ArrayList<>(); @@ -59,7 +55,6 @@ final class DataSourceIngestPipeline { try { module.startUp(context); modulesByClass.put(module.getClassName(), module); - IngestManager.fireModuleEvent(IngestManager.IngestEvent.STARTED.toString(), module.getDisplayName()); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } @@ -81,19 +76,14 @@ final class DataSourceIngestPipeline { return errors; } - List process(ProgressHandle progress) { + List process() { List errors = new ArrayList<>(); - Content dataSource = this.job.getDataSource(); - logger.log(Level.INFO, "Processing data source {0}", dataSource.getName()); for (DataSourceIngestModuleDecorator module : this.modules) { try { - module.process(dataSource, new DataSourceIngestModuleStatusHelper(progress, module.getDisplayName())); + module.process(job.getDataSource(), new DataSourceIngestModuleStatusHelper(job, module.getDisplayName())); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } - if (job.isCancelled()) { - break; - } } return errors; } @@ -105,8 +95,9 @@ final class DataSourceIngestPipeline { module.shutDown(ingestJobCancelled); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); - } finally { - IngestManager.fireModuleEvent(IngestManager.IngestEvent.COMPLETED.toString(), module.getDisplayName()); + } + if (job.isCancelled()) { + break; } } return errors; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java index 61319f441e..13137318e4 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java @@ -22,10 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.logging.Level; -import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.datamodel.AbstractFile; -import org.sleuthkit.datamodel.Content; /** * A file ingest pipeline composed of a sequence of file ingest modules @@ -33,7 +30,6 @@ import org.sleuthkit.datamodel.Content; */ final class FileIngestPipeline { - private static final Logger logger = Logger.getLogger(FileIngestPipeline.class.getName()); private final IngestJob job; private final List moduleTemplates; private List modules = new ArrayList<>(); @@ -58,7 +54,6 @@ final class FileIngestPipeline { try { module.startUp(context); modulesByClass.put(module.getClassName(), module); - IngestManager.fireModuleEvent(IngestManager.IngestEvent.STARTED.toString(), template.getModuleName()); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } @@ -93,7 +88,9 @@ final class FileIngestPipeline { } } file.close(); - IngestManager.fireFileDone(file.getId()); + if (job.isCancelled()) { + IngestManager.fireFileDone(file.getId()); + } return errors; } @@ -104,8 +101,6 @@ final class FileIngestPipeline { module.shutDown(ingestJobCancelled); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); - } finally { - IngestManager.fireModuleEvent(IngestManager.IngestEvent.COMPLETED.toString(), module.getDisplayName()); } } return errors; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 57e0f4e16a..af5e9b8f3a 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -46,7 +46,7 @@ final class IngestJob { private ProgressHandle fileTasksProgress; int totalEnqueuedFiles = 0; private int processedFiles = 0; - private boolean cancelled; + private volatile boolean cancelled; IngestJob(long id, Content dataSource, List ingestModuleTemplates, boolean processUnallocatedSpace) { this.id = id; @@ -159,13 +159,13 @@ final class IngestJob { synchronized List releaseIngestPipelinesForThread(long threadId) { List errors = new ArrayList<>(); - + DataSourceIngestPipeline dataSourceIngestPipeline = dataSourceIngestPipelines.get(threadId); if (dataSourceIngestPipeline != null) { errors.addAll(dataSourceIngestPipeline.shutDown(cancelled)); + dataSourceIngestPipelines.remove(threadId); } - dataSourceIngestPipelines.remove(threadId); - if (dataSourceIngestPipelines.isEmpty() && dataSourceTaskProgress != null) { + if (initialDataSourceIngestPipeline == null && dataSourceIngestPipelines.isEmpty() && dataSourceTaskProgress != null) { dataSourceTaskProgress.finish(); dataSourceTaskProgress = null; } @@ -173,9 +173,9 @@ final class IngestJob { FileIngestPipeline fileIngestPipeline = fileIngestPipelines.get(threadId); if (fileIngestPipeline != null) { errors.addAll(fileIngestPipeline.shutDown(cancelled)); + fileIngestPipelines.remove(threadId); } - fileIngestPipelines.remove(threadId); - if (fileIngestPipelines.isEmpty() && fileTasksProgress != null) { + if (initialFileIngestPipeline == null && fileIngestPipelines.isEmpty() && fileTasksProgress != null) { fileTasksProgress.finish(); fileTasksProgress = null; } @@ -184,14 +184,17 @@ final class IngestJob { } synchronized boolean areIngestPipelinesShutDown() { - return (dataSourceIngestPipelines.isEmpty() && fileIngestPipelines.isEmpty()); + return (initialDataSourceIngestPipeline == null + && dataSourceIngestPipelines.isEmpty() + && initialFileIngestPipeline == null + && fileIngestPipelines.isEmpty()); } synchronized ProgressHandle getDataSourceTaskProgressBar() { return this.dataSourceTaskProgress; } - synchronized void handleFileTaskStarted(IngestScheduler.FileScheduler.FileTask task) { + synchronized void updateFileTasksProgressBar(String currentFileName) { int newTotalEnqueuedFiles = fileScheduler.getFilesEnqueuedEst(); if (newTotalEnqueuedFiles > totalEnqueuedFiles) { totalEnqueuedFiles = newTotalEnqueuedFiles + 1; @@ -202,14 +205,23 @@ final class IngestJob { ++processedFiles; } - fileTasksProgress.progress(task.getFile().getName(), processedFiles); + fileTasksProgress.progress(currentFileName, processedFiles); } synchronized void cancel() { + if (initialDataSourceIngestPipeline != null) { + initialDataSourceIngestPipeline.shutDown(true); + initialDataSourceIngestPipeline = null; + } + if (initialFileIngestPipeline != null) { + initialFileIngestPipeline.shutDown(true); + initialFileIngestPipeline = null; + } + cancelled = true; } - synchronized boolean isCancelled() { + boolean isCancelled() { return cancelled; } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 5f1e746675..59f0bf7789 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -23,9 +23,10 @@ import java.beans.PropertyChangeSupport; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import org.openide.util.NbBundle; @@ -53,12 +54,13 @@ public class IngestManager { private static final PropertyChangeSupport pcs = new PropertyChangeSupport(IngestManager.class); private static final Preferences userPreferences = NbPreferences.forModule(IngestManager.class); private static IngestManager instance; - private final IngestJobsManager jobsManager = new IngestJobsManager(); private final IngestScheduler scheduler = IngestScheduler.getInstance(); private final IngestMonitor ingestMonitor = new IngestMonitor(); - private ExecutorService startIngestJobsExecutor = null; - private ExecutorService dataSourceIngestTasksExecutor = null; - private ExecutorService fileIngestTasksExecutor = null; + private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService dataSourceIngestTasksExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService fileIngestTasksExecutor = Executors.newFixedThreadPool(MAX_NUMBER_OF_FILE_INGEST_THREADS); + private final HashMap ingestJobs = new HashMap<>(); // Maps job ids to jobs + private final HashMap> ingestTasks = new HashMap<>(); // Maps task ids to task cancellation handles private AtomicLong ingestJobId = new AtomicLong(0L); private AtomicLong ingestTaskId = new AtomicLong(0L); private volatile IngestUI ingestMessageBox; @@ -76,12 +78,11 @@ public class IngestManager { } private IngestManager() { - createThreadPools(); } /** - * Finds the ingest messages in box TopComponent. Called by the custom - * installer for this package once the window system is initialized. + * Finds the top component for the ingest messages in box. Called by the + * custom installer for this package once the window system is initialized. */ void initIngestMessageInbox() { if (this.ingestMessageBox == null) { @@ -107,8 +108,9 @@ public class IngestManager { ingestMessageBox.clearMessages(); } - createThreadPools(); - startIngestJobsExecutor.submit(new StartIngestJobsTask(ingestTaskId.incrementAndGet(), dataSources, moduleTemplates, processUnallocatedSpace)); + long taskId = ingestTaskId.incrementAndGet(); + Future task = startIngestJobsExecutor.submit(new StartIngestJobsTask(taskId, dataSources, moduleTemplates, processUnallocatedSpace)); + ingestTasks.put(taskId, task); if (ingestMessageBox != null) { ingestMessageBox.restoreMessages(); @@ -121,19 +123,18 @@ public class IngestManager { * @return True if any ingest jobs are in progress, false otherwise */ public boolean isIngestRunning() { - return jobsManager.hasJobs(); + return (ingestJobs.isEmpty() == false); } synchronized void addFileToIngestJob(long ingestJobId, AbstractFile file) { - if (!jobsManager.addFileToIngestJob(ingestJobId, file)) { - MessageNotifyUtil.Notify.show(NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr"), - "Unable to associate " + file.getName() + " with ingest job, file will not be processed by ingest nodules", - MessageNotifyUtil.MessageType.ERROR); + IngestJob job = ingestJobs.get(ingestJobId); + if (job != null) { + scheduler.getFileScheduler().scheduleFile(job, file); } } synchronized void cancelIngestJobs() { - new IngestCancellationWorker(1, TimeUnit.SECONDS).execute(); + new IngestCancellationWorker().execute(); } /** @@ -141,7 +142,6 @@ public class IngestManager { */ public enum IngestEvent { - // RJCTODO: Update comments /** * Event sent when an ingest module has been started. Second argument of * the property change is a string form of the module name and the third @@ -292,95 +292,55 @@ public class IngestManager { ingestMonitor.start(); } - dataSourceIngestTasksExecutor.submit(new RunDataSourceIngestModulesTask(ingestTaskId.incrementAndGet())); - + long taskId = ingestTaskId.incrementAndGet(); + Future task = dataSourceIngestTasksExecutor.submit(new RunDataSourceIngestModulesTask(taskId)); + ingestTasks.put(taskId, task); + int numberOfFileTasksRequested = getNumberOfFileIngestThreads(); for (int i = 0; i < numberOfFileTasksRequested; ++i) { - fileIngestTasksExecutor.submit(new RunFileSourceIngestModulesTask(ingestTaskId.incrementAndGet())); + taskId = ingestTaskId.incrementAndGet(); + task = fileIngestTasksExecutor.submit(new RunFileSourceIngestModulesTask(taskId)); + ingestTasks.put(taskId, task); } } - private synchronized void createThreadPools() { - if (startIngestJobsExecutor == null) { - startIngestJobsExecutor = Executors.newSingleThreadExecutor(); + private synchronized void stopIngestTasks() { + // First mark all of the ingest jobs as cancelled. This way the + // ingest modules will know they are being shut down due to + // cancellation when the cancelled run ingest module tasks release + // their pipelines. + for (IngestJob job : ingestJobs.values()) { + job.cancel(); } - if (dataSourceIngestTasksExecutor == null) { - dataSourceIngestTasksExecutor = Executors.newSingleThreadExecutor(); - } - if (fileIngestTasksExecutor == null) { - fileIngestTasksExecutor = Executors.newFixedThreadPool(MAX_NUMBER_OF_FILE_INGEST_THREADS); + + // Cancel the run ingest module tasks, setting the state of the threads + // running them to interrupted. + for (Future task : ingestTasks.values()) { + task.cancel(true); } + + // Jettision the remaining data source and file ingest tasks. + scheduler.getFileScheduler().empty(); + scheduler.getDataSourceScheduler().empty(); } - private synchronized boolean shutDownThreadPools(int timeOut, TimeUnit timeOutUnits) { - boolean success = true; - if (!shutDownThreadPool(startIngestJobsExecutor, timeOut, timeOutUnits)) { - success = false; - } - if (!shutDownThreadPool(dataSourceIngestTasksExecutor, timeOut, timeOutUnits)) { - success = false; - } - if (!shutDownThreadPool(fileIngestTasksExecutor, timeOut, timeOutUnits)) { - success = false; - } - startIngestJobsExecutor = null; - dataSourceIngestTasksExecutor = null; - fileIngestTasksExecutor = null; - return success; + synchronized void reportStartIngestJobsTaskDone(long taskId) { + ingestTasks.remove(taskId); } - private boolean shutDownThreadPool(ExecutorService pool, int waitTime, TimeUnit unit) { - try { - pool.shutdownNow(); - return pool.awaitTermination(waitTime, unit); - } catch (InterruptedException ex) { - pool.shutdownNow(); - Thread.currentThread().interrupt(); // Preserve interrupted status. - return false; - } - } + synchronized void reportRunIngestModulesTaskDone(long taskId) { + ingestTasks.remove(taskId); - private class IngestJobsManager { - - private final HashMap jobs = new HashMap<>(); // Maps job ids to jobs - - synchronized void addJob(IngestJob job) { - jobs.put(job.getId(), job); - } - - synchronized boolean hasJobs() { - return (jobs.isEmpty() == false); - } - - synchronized boolean addFileToIngestJob(long ingestJobId, AbstractFile file) { - IngestJob job = jobs.get(ingestJobId); - if (job != null) { - scheduler.getFileScheduler().scheduleFile(job, file); - return true; - } else { - logger.log(Level.SEVERE, "Unable to map ingest job id (id = {0}) to an ingest job, failed to add file (id = {1})", new Object[]{ingestJobId, file.getId()}); - return false; + List completedJobs = new ArrayList<>(); + for (IngestJob job : ingestJobs.values()) { + job.releaseIngestPipelinesForThread(taskId); + if (job.areIngestPipelinesShutDown() == true) { + completedJobs.add(job.getId()); } } - synchronized void cancelJobs() { - for (IngestJob job : jobs.values()) { - job.cancel(); - } - } - - synchronized void reportIngestTaskDone(long taskId) { - List completedJobs = new ArrayList<>(); - for (IngestJob job : jobs.values()) { - job.releaseIngestPipelinesForThread(taskId); - if (job.areIngestPipelinesShutDown() == true) { - completedJobs.add(job.getId()); - } - } - - for (Long jobId : completedJobs) { - jobs.remove(jobId); - } + for (Long jobId : completedJobs) { + ingestJobs.remove(jobId); } } @@ -391,7 +351,6 @@ public class IngestManager { private final List moduleTemplates; private final boolean processUnallocatedSpace; private ProgressHandle progress; - private volatile boolean finished = false; StartIngestJobsTask(long taskId, List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { this.id = taskId; @@ -422,7 +381,6 @@ public class IngestManager { break; } - final String inputName = dataSource.getName(); IngestJob ingestJob = new IngestJob(IngestManager.this.ingestJobId.incrementAndGet(), dataSource, moduleTemplates, processUnallocatedSpace); List errors = ingestJob.startUpIngestPipelines(); if (!errors.isEmpty()) { @@ -436,19 +394,23 @@ public class IngestManager { failedModules.append(","); } } - MessageNotifyUtil.Message.error( // RJCTODO: Fix this to show all errors + MessageNotifyUtil.Message.error( // RJCTODO: Fix this to show all errors, probably should specify data source name "Failed to start the following ingest modules: " + failedModules.toString() + " .\n\n" + "No ingest modules will be run. Please disable the module " + "or fix the error and restart ingest by right clicking on " + "the data source and selecting Run Ingest Modules.\n\n" + "Error: " + errors.get(0).getModuleError().getMessage()); + ingestJob.cancel(); break; } // Save the ingest job for later cleanup of pipelines. - jobsManager.addJob(ingestJob); + synchronized (IngestManager.this) { + ingestJobs.put(ingestJob.getId(), ingestJob); + } // Queue the data source ingest tasks for the ingest job. + final String inputName = dataSource.getName(); progress.progress("DataSource Ingest" + " " + inputName, processed); scheduler.getDataSourceScheduler().schedule(ingestJob); progress.progress("DataSource Ingest" + " " + inputName, ++processed); @@ -457,22 +419,20 @@ public class IngestManager { progress.progress("File Ingest" + " " + inputName, processed); scheduler.getFileScheduler().scheduleIngestOfFiles(ingestJob); progress.progress("File Ingest" + " " + inputName, ++processed); - } - if (!Thread.currentThread().isInterrupted()) { - startIngestTasks(); + if (!Thread.currentThread().isInterrupted()) { + startIngestTasks(); + } } } catch (Exception ex) { String message = String.format("StartIngestJobsTask (id=%d) caught exception", id); logger.log(Level.SEVERE, message, ex); + MessageNotifyUtil.Message.error("An error occurred while starting ingest. Results may only be partial"); } finally { progress.finish(); + reportStartIngestJobsTaskDone(id); } } - - boolean isFinished() { - return finished; - } } private class RunDataSourceIngestModulesTask implements Runnable { @@ -492,14 +452,13 @@ public class IngestManager { break; } IngestJob job = scheduler.next(); - DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(id); - pipeline.process(job.getDataSourceTaskProgressBar()); + job.getDataSourceIngestPipelineForThread(id).process(); } } catch (Exception ex) { String message = String.format("RunDataSourceIngestModulesTask (id=%d) caught exception", id); logger.log(Level.SEVERE, message, ex); } finally { - jobsManager.reportIngestTaskDone(id); + reportRunIngestModulesTaskDone(id); } } } @@ -522,55 +481,34 @@ public class IngestManager { } IngestScheduler.FileScheduler.FileTask task = fileScheduler.next(); IngestJob job = task.getJob(); - FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(id); - job.handleFileTaskStarted(task); - pipeline.process(task.getFile()); + job.updateFileTasksProgressBar(task.getFile().getName()); + job.getFileIngestPipelineForThread(id).process(task.getFile()); } } catch (Exception ex) { String message = String.format("RunFileSourceIngestModulesTask (id=%d) caught exception", id); logger.log(Level.SEVERE, message, ex); } finally { - jobsManager.reportIngestTaskDone(id); + reportRunIngestModulesTaskDone(id); } } } - class IngestCancellationWorker extends SwingWorker { + class IngestCancellationWorker extends SwingWorker { - private final int timeOut; - private final TimeUnit timeOutUnits; - - IngestCancellationWorker(int timeOut, TimeUnit timeOutUnits) { - this.timeOut = timeOut; - this.timeOutUnits = timeOutUnits; + @Override + protected Void doInBackground() throws Exception { + stopIngestTasks(); + return null; } @Override - protected Boolean doInBackground() throws Exception { - // First mark all of the ingest jobs as cancelled. This way the - // ingest modules will know they are being shut down due to - // cancellation when the cancelled run ingest module tasks release - // their pipelines. This also makes sure the lock on the jobs - // manager is released before the run ingest module tasks start - // releasing pipelines. - jobsManager.cancelJobs(); - - // Jettision the remaining data source and file ingest tasks. This - // will could break the the run ingest module tasks out of their - // loops even before the pools mark their threads as interrupted. - scheduler.getFileScheduler().empty(); - scheduler.getDataSourceScheduler().empty(); - - boolean success = shutDownThreadPools(timeOut, timeOutUnits); - - // Jettision data source and file ingest tasks again to try to - // dispose of any tasks that slipped into the queues. - scheduler.getFileScheduler().empty(); - scheduler.getDataSourceScheduler().empty(); - - return success; + protected void done() { + try { + super.get(); + } catch (CancellationException | InterruptedException ex) { + } catch (Exception ex) { + logger.log(Level.SEVERE, "Error while cancelling ingest jobs", ex); + } } - - // TODO: Add done() override to notify a listener of success or failure } } From 9c928c17d5ff3fea1ec4e7c700aaf00257696f9e Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Tue, 8 Apr 2014 16:10:54 -0400 Subject: [PATCH 5/6] Fixed bug in DataSourceIngestPipeline --- .../sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java index 2a135d4c96..bac11238b5 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.netbeans.api.progress.ProgressHandle; import org.sleuthkit.datamodel.Content; /** @@ -84,6 +83,9 @@ final class DataSourceIngestPipeline { } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } + if (job.isCancelled()) { + break; + } } return errors; } @@ -96,9 +98,6 @@ final class DataSourceIngestPipeline { } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } - if (job.isCancelled()) { - break; - } } return errors; } From 181c3828b72567ab56bc6945f973daae6cd90275 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Tue, 8 Apr 2014 16:13:44 -0400 Subject: [PATCH 6/6] Fixed bug in FileIngestPipeline --- Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java index 13137318e4..e221b8b33b 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java @@ -88,7 +88,7 @@ final class FileIngestPipeline { } } file.close(); - if (job.isCancelled()) { + if (!job.isCancelled()) { IngestManager.fireFileDone(file.getId()); } return errors;