diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java index 689d8e7652..33474e8587 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleStatusHelper.java @@ -18,9 +18,7 @@ */ package org.sleuthkit.autopsy.ingest; -import javax.swing.SwingWorker; import org.netbeans.api.progress.ProgressHandle; -import org.sleuthkit.datamodel.Content; /** * Used by data source ingest modules to report progress and detect data source @@ -28,15 +26,11 @@ import org.sleuthkit.datamodel.Content; */ public class DataSourceIngestModuleStatusHelper { - private final SwingWorker worker; - private final ProgressHandle progress; - private final Content dataSource; + private final IngestJob ingestJob; private final String moduleDisplayName; - DataSourceIngestModuleStatusHelper(SwingWorker worker, ProgressHandle progress, Content dataSource, String moduleDisplayName) { - this.worker = worker; - this.progress = progress; - this.dataSource = dataSource; + DataSourceIngestModuleStatusHelper(IngestJob ingestJob, String moduleDisplayName) { + this.ingestJob = ingestJob; this.moduleDisplayName = moduleDisplayName; } @@ -48,7 +42,7 @@ public class DataSourceIngestModuleStatusHelper { * @return True if the task has been canceled, false otherwise. */ public boolean isIngestJobCancelled() { - return worker.isCancelled(); + return (ingestJob.isCancelled()); } /** @@ -60,9 +54,7 @@ public class DataSourceIngestModuleStatusHelper { * data source. */ public void switchToDeterminate(int workUnits) { - if (progress != null) { - progress.switchToDeterminate(workUnits); - } + ingestJob.getDataSourceTaskProgressBar().switchToDeterminate(workUnits); } /** @@ -70,9 +62,7 @@ public class DataSourceIngestModuleStatusHelper { * the total work units to process the data source is unknown. */ public void switchToIndeterminate() { - if (progress != null) { - progress.switchToIndeterminate(); - } + ingestJob.getDataSourceTaskProgressBar().switchToIndeterminate(); } /** @@ -82,8 +72,6 @@ public class DataSourceIngestModuleStatusHelper { * @param workUnits Number of work units performed so far by the module. */ public void progress(int workUnits) { - if (progress != null) { - progress.progress(this.moduleDisplayName, workUnits); - } + ingestJob.getDataSourceTaskProgressBar().progress(this.moduleDisplayName, workUnits); } } \ No newline at end of file diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java index 3b919391bc..bac11238b5 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java @@ -22,10 +22,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.logging.Level; -import javax.swing.SwingWorker; -import org.netbeans.api.progress.ProgressHandle; -import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.datamodel.Content; /** @@ -34,7 +30,6 @@ import org.sleuthkit.datamodel.Content; */ final class DataSourceIngestPipeline { - private static final Logger logger = Logger.getLogger(DataSourceIngestPipeline.class.getName()); private final IngestJob job; private final List moduleTemplates; private List modules = new ArrayList<>(); @@ -59,7 +54,6 @@ final class DataSourceIngestPipeline { try { module.startUp(context); modulesByClass.put(module.getClassName(), module); - IngestManager.fireModuleEvent(IngestManager.IngestEvent.STARTED.toString(), module.getDisplayName()); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } @@ -81,13 +75,11 @@ final class DataSourceIngestPipeline { return errors; } - List process(SwingWorker worker, ProgressHandle progress) { + List process() { List errors = new ArrayList<>(); - Content dataSource = this.job.getDataSource(); - logger.log(Level.INFO, "Processing data source {0}", dataSource.getName()); for (DataSourceIngestModuleDecorator module : this.modules) { try { - module.process(dataSource, new DataSourceIngestModuleStatusHelper(worker, progress, dataSource, module.getDisplayName())); + module.process(job.getDataSource(), new DataSourceIngestModuleStatusHelper(job, module.getDisplayName())); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } @@ -105,8 +97,6 @@ final class DataSourceIngestPipeline { module.shutDown(ingestJobCancelled); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); - } finally { - IngestManager.fireModuleEvent(IngestManager.IngestEvent.COMPLETED.toString(), module.getDisplayName()); } } return errors; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java index 61319f441e..e221b8b33b 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java @@ -22,10 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.logging.Level; -import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.datamodel.AbstractFile; -import org.sleuthkit.datamodel.Content; /** * A file ingest pipeline composed of a sequence of file ingest modules @@ -33,7 +30,6 @@ import org.sleuthkit.datamodel.Content; */ final class FileIngestPipeline { - private static final Logger logger = Logger.getLogger(FileIngestPipeline.class.getName()); private final IngestJob job; private final List moduleTemplates; private List modules = new ArrayList<>(); @@ -58,7 +54,6 @@ final class FileIngestPipeline { try { module.startUp(context); modulesByClass.put(module.getClassName(), module); - IngestManager.fireModuleEvent(IngestManager.IngestEvent.STARTED.toString(), template.getModuleName()); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); } @@ -93,7 +88,9 @@ final class FileIngestPipeline { } } file.close(); - IngestManager.fireFileDone(file.getId()); + if (!job.isCancelled()) { + IngestManager.fireFileDone(file.getId()); + } return errors; } @@ -104,8 +101,6 @@ final class FileIngestPipeline { module.shutDown(ingestJobCancelled); } catch (Exception ex) { errors.add(new IngestModuleError(module.getDisplayName(), ex)); - } finally { - IngestManager.fireModuleEvent(IngestManager.IngestEvent.COMPLETED.toString(), module.getDisplayName()); } } return errors; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 0a65bafcbb..af5e9b8f3a 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -46,7 +46,7 @@ final class IngestJob { private ProgressHandle fileTasksProgress; int totalEnqueuedFiles = 0; private int processedFiles = 0; - private boolean cancelled; + private volatile boolean cancelled; IngestJob(long id, Content dataSource, List ingestModuleTemplates, boolean processUnallocatedSpace) { this.id = id; @@ -85,7 +85,7 @@ final class IngestJob { "IngestJob.progress.cancelling", displayName)); } - IngestManager.getInstance().stopAll(); + IngestManager.getInstance().cancelIngestJobs(); return true; } }); @@ -104,7 +104,7 @@ final class IngestJob { NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", displayName)); } - IngestManager.getInstance().stopAll(); + IngestManager.getInstance().cancelIngestJobs(); return true; } }); @@ -159,13 +159,13 @@ final class IngestJob { synchronized List releaseIngestPipelinesForThread(long threadId) { List errors = new ArrayList<>(); - + DataSourceIngestPipeline dataSourceIngestPipeline = dataSourceIngestPipelines.get(threadId); if (dataSourceIngestPipeline != null) { errors.addAll(dataSourceIngestPipeline.shutDown(cancelled)); + dataSourceIngestPipelines.remove(threadId); } - dataSourceIngestPipelines.remove(threadId); - if (dataSourceIngestPipelines.isEmpty() && dataSourceTaskProgress != null) { + if (initialDataSourceIngestPipeline == null && dataSourceIngestPipelines.isEmpty() && dataSourceTaskProgress != null) { dataSourceTaskProgress.finish(); dataSourceTaskProgress = null; } @@ -173,9 +173,9 @@ final class IngestJob { FileIngestPipeline fileIngestPipeline = fileIngestPipelines.get(threadId); if (fileIngestPipeline != null) { errors.addAll(fileIngestPipeline.shutDown(cancelled)); + fileIngestPipelines.remove(threadId); } - fileIngestPipelines.remove(threadId); - if (fileIngestPipelines.isEmpty() && fileTasksProgress != null) { + if (initialFileIngestPipeline == null && fileIngestPipelines.isEmpty() && fileTasksProgress != null) { fileTasksProgress.finish(); fileTasksProgress = null; } @@ -184,14 +184,17 @@ final class IngestJob { } synchronized boolean areIngestPipelinesShutDown() { - return (dataSourceIngestPipelines.isEmpty() && fileIngestPipelines.isEmpty()); + return (initialDataSourceIngestPipeline == null + && dataSourceIngestPipelines.isEmpty() + && initialFileIngestPipeline == null + && fileIngestPipelines.isEmpty()); } synchronized ProgressHandle getDataSourceTaskProgressBar() { return this.dataSourceTaskProgress; } - synchronized void handleFileTaskStarted(IngestScheduler.FileScheduler.FileTask task) { + synchronized void updateFileTasksProgressBar(String currentFileName) { int newTotalEnqueuedFiles = fileScheduler.getFilesEnqueuedEst(); if (newTotalEnqueuedFiles > totalEnqueuedFiles) { totalEnqueuedFiles = newTotalEnqueuedFiles + 1; @@ -202,14 +205,23 @@ final class IngestJob { ++processedFiles; } - fileTasksProgress.progress(task.getFile().getName(), processedFiles); + fileTasksProgress.progress(currentFileName, processedFiles); } synchronized void cancel() { + if (initialDataSourceIngestPipeline != null) { + initialDataSourceIngestPipeline.shutDown(true); + initialDataSourceIngestPipeline = null; + } + if (initialFileIngestPipeline != null) { + initialFileIngestPipeline.shutDown(true); + initialFileIngestPipeline = null; + } + cancelled = true; } - synchronized boolean isCancelled() { + boolean isCancelled() { return cancelled; } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java index 7d0301ce5c..dc9fb58e3f 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java @@ -60,7 +60,7 @@ public final class IngestJobContext { */ public void addFiles(List files) { for (AbstractFile file : files) { - IngestManager.getInstance().scheduleFile(ingestJob.getId(), file); + IngestManager.getInstance().addFileToIngestJob(ingestJob.getId(), file); } } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobLauncher.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobLauncher.java index fbd9fb0db3..22125647d7 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobLauncher.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobLauncher.java @@ -232,7 +232,7 @@ public final class IngestJobLauncher { } if ((!enabledModuleTemplates.isEmpty()) && (dataSources != null) && (!dataSources.isEmpty())) { - IngestManager.getInstance().scheduleDataSourceTasks(dataSources, enabledModuleTemplates, ingestConfigPanel.getProcessUnallocSpace()); + IngestManager.getInstance().startIngestJobs(dataSources, enabledModuleTemplates, ingestConfigPanel.getProcessUnallocSpace()); } } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 64159e19f8..59f0bf7789 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -24,10 +24,13 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import org.openide.util.NbBundle; import org.sleuthkit.autopsy.coreutils.Logger; -import javax.swing.SwingWorker; import org.netbeans.api.progress.ProgressHandle; import org.netbeans.api.progress.ProgressHandleFactory; import org.openide.util.Cancellable; @@ -36,6 +39,7 @@ import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil; import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.Content; import java.util.prefs.Preferences; +import javax.swing.SwingWorker; /** * Manages the execution of ingest jobs. @@ -48,24 +52,96 @@ public class IngestManager { private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2; private static final Logger logger = Logger.getLogger(IngestManager.class.getName()); private static final PropertyChangeSupport pcs = new PropertyChangeSupport(IngestManager.class); + private static final Preferences userPreferences = NbPreferences.forModule(IngestManager.class); private static IngestManager instance; private final IngestScheduler scheduler = IngestScheduler.getInstance(); private final IngestMonitor ingestMonitor = new IngestMonitor(); - private final Preferences userPreferences = NbPreferences.forModule(this.getClass()); - private final HashMap ingestJobs = new HashMap<>(); - private TaskSchedulingWorker taskSchedulingWorker = null; - private DataSourceTaskWorker dataSourceTaskWorker = null; - private final List fileTaskWorkers = new ArrayList<>(); - private long nextDataSourceTaskId = 0; - private long nextThreadId = 0; + private final ExecutorService startIngestJobsExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService dataSourceIngestTasksExecutor = Executors.newSingleThreadExecutor(); + private final ExecutorService fileIngestTasksExecutor = Executors.newFixedThreadPool(MAX_NUMBER_OF_FILE_INGEST_THREADS); + private final HashMap ingestJobs = new HashMap<>(); // Maps job ids to jobs + private final HashMap> ingestTasks = new HashMap<>(); // Maps task ids to task cancellation handles + private AtomicLong ingestJobId = new AtomicLong(0L); + private AtomicLong ingestTaskId = new AtomicLong(0L); private volatile IngestUI ingestMessageBox; + /** + * Gets the IngestManager singleton, creating it if necessary. + * + * @returns The IngestManager singleton. + */ + public synchronized static IngestManager getInstance() { + if (instance == null) { + instance = new IngestManager(); + } + return instance; + } + + private IngestManager() { + } + + /** + * Finds the top component for the ingest messages in box. Called by the + * custom installer for this package once the window system is initialized. + */ + void initIngestMessageInbox() { + if (this.ingestMessageBox == null) { + this.ingestMessageBox = IngestMessageTopComponent.findInstance(); + } + } + + public synchronized static int getNumberOfFileIngestThreads() { + return userPreferences.getInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, DEFAULT_NUMBER_OF_FILE_INGEST_THREADS); + } + + public synchronized static void setNumberOfFileIngestThreads(int numberOfThreads) { + if (numberOfThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS + || numberOfThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS) { + numberOfThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; + } + + userPreferences.putInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, numberOfThreads); + } + + synchronized void startIngestJobs(final List dataSources, final List moduleTemplates, boolean processUnallocatedSpace) { + if (!isIngestRunning() && ingestMessageBox != null) { + ingestMessageBox.clearMessages(); + } + + long taskId = ingestTaskId.incrementAndGet(); + Future task = startIngestJobsExecutor.submit(new StartIngestJobsTask(taskId, dataSources, moduleTemplates, processUnallocatedSpace)); + ingestTasks.put(taskId, task); + + if (ingestMessageBox != null) { + ingestMessageBox.restoreMessages(); + } + } + + /** + * Test if any ingest jobs are in progress. + * + * @return True if any ingest jobs are in progress, false otherwise + */ + public boolean isIngestRunning() { + return (ingestJobs.isEmpty() == false); + } + + synchronized void addFileToIngestJob(long ingestJobId, AbstractFile file) { + IngestJob job = ingestJobs.get(ingestJobId); + if (job != null) { + scheduler.getFileScheduler().scheduleFile(job, file); + } + } + + synchronized void cancelIngestJobs() { + new IngestCancellationWorker().execute(); + } + /** * Ingest events. */ public enum IngestEvent { - // RJCTODO: Update comments /** * Event sent when an ingest module has been started. Second argument of * the property change is a string form of the module name and the third @@ -109,65 +185,20 @@ public class IngestManager { FILE_DONE, }; - private IngestManager() { - } - /** - * Returns reference to singleton instance. - * - * @returns Instance of class. - */ - synchronized public static IngestManager getInstance() { - if (instance == null) { - instance = new IngestManager(); - } - return instance; - } - - /** - * called by Installer in AWT thread once the Window System is ready - */ - void initIngestMessageInbox() { - if (this.ingestMessageBox == null) { - this.ingestMessageBox = IngestMessageTopComponent.findInstance(); - } - } - - synchronized private long getNextDataSourceTaskId() { - return ++this.nextDataSourceTaskId; - } - - synchronized private long getNextThreadId() { - return ++this.nextThreadId; - } - - public synchronized int getNumberOfFileIngestThreads() { - return userPreferences.getInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, DEFAULT_NUMBER_OF_FILE_INGEST_THREADS); - } - - public synchronized void setNumberOfFileIngestThreads(int numberOfThreads) { - if (numberOfThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS - || numberOfThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS) { - numberOfThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; - } - userPreferences.putInt(NUMBER_OF_FILE_INGEST_THREADS_KEY, numberOfThreads); - } - - /** - * Add property change listener to listen to ingest events as defined in - * IngestModuleEvent. + * Add property change listener to listen to ingest events. * * @param listener PropertyChangeListener to register */ - public static synchronized void addPropertyChangeListener(final PropertyChangeListener listener) { + public static void addPropertyChangeListener(final PropertyChangeListener listener) { pcs.addPropertyChangeListener(listener); } - public static synchronized void removePropertyChangeListener(final PropertyChangeListener listener) { + public static void removePropertyChangeListener(final PropertyChangeListener listener) { pcs.removePropertyChangeListener(listener); } - static synchronized void fireModuleEvent(String eventType, String moduleName) { + static void fireModuleEvent(String eventType, String moduleName) { try { pcs.firePropertyChange(eventType, moduleName, null); } catch (Exception e) { @@ -183,7 +214,7 @@ public class IngestManager { * * @param objId ID of file that is done */ - static synchronized void fireFileDone(long objId) { + static void fireFileDone(long objId) { try { pcs.firePropertyChange(IngestEvent.FILE_DONE.toString(), objId, null); } catch (Exception e) { @@ -200,7 +231,7 @@ public class IngestManager { * * @param moduleDataEvent */ - static synchronized void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) { + static void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) { try { pcs.firePropertyChange(IngestEvent.DATA.toString(), moduleDataEvent, null); } catch (Exception e) { @@ -217,7 +248,7 @@ public class IngestManager { * * @param moduleContentEvent */ - static synchronized void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) { + static void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) { try { pcs.firePropertyChange(IngestEvent.CONTENT_CHANGED.toString(), moduleContentEvent, null); } catch (Exception e) { @@ -228,218 +259,6 @@ public class IngestManager { } } - /** - * Multiple data-sources version of scheduleDataSource() method. Enqueues - * multiple sources inputs (Content objects) and associated modules at once - * - * @param modules modules to scheduleDataSource on every data source - * @param inputs input data sources to enqueue and scheduleDataSource the - * ingest modules on - */ - void scheduleDataSourceTasks(final List dataSources, final List moduleTemplates, boolean processUnallocatedSpace) { - if (!isIngestRunning() && ingestMessageBox != null) { - ingestMessageBox.clearMessages(); - } - - taskSchedulingWorker = new TaskSchedulingWorker(dataSources, moduleTemplates, processUnallocatedSpace); - taskSchedulingWorker.execute(); - - if (ingestMessageBox != null) { - ingestMessageBox.restoreMessages(); - } - } - - /** - * IngestManager entry point, enqueues data to be processed and starts new - * ingest as needed, or just enqueues data to an existing pipeline. - * - * Spawns background thread which enumerates all sorted files and executes - * chosen modules per file in a pre-determined order. Notifies modules when - * work is complete or should be interrupted using complete() and stop() - * calls. Does not block and can be called multiple times to enqueue more - * work to already running background ingest process. - * - * @param modules modules to scheduleDataSource on the data source input - * @param input input data source Content objects to scheduleDataSource the - * ingest modules on - */ - void scheduleDataSourceTask(final Content dataSource, final List moduleTemplates, boolean processUnallocatedSpace) { - List dataSources = new ArrayList<>(); - dataSources.add(dataSource); - scheduleDataSourceTasks(dataSources, moduleTemplates, processUnallocatedSpace); - } - - /** - * Schedule a file for ingest and add it to ongoing file ingest process on - * the same data source. Scheduler updates the current progress. - * - * The file to be added is usually a product of a currently ran ingest. Now - * we want to process this new file with the same ingest context. - * - * @param file file to be scheduled - * @param pipelineContext ingest context used to ingest parent of the file - * to be scheduled - */ - void scheduleFile(long ingestJobId, AbstractFile file) { - IngestJob job = this.ingestJobs.get(ingestJobId); - if (job == null) { - logger.log(Level.SEVERE, "Unable to map ingest job id (id = {0}) to an ingest job, failed to schedule file (id = {1})", new Object[]{ingestJobId, file.getId()}); - MessageNotifyUtil.Notify.show(NbBundle.getMessage(IngestManager.class, "IngestManager.moduleErr"), - "Unable to associate " + file.getName() + " with ingest job, file will not be processed by ingest nodules", - MessageNotifyUtil.MessageType.ERROR); - } - - scheduler.getFileScheduler().scheduleFile(job, file); - } - - private synchronized void startAll() { - // Make sure the ingest monitor is running. - if (!ingestMonitor.isRunning()) { - ingestMonitor.start(); - } - - // Make sure a data source task worker is running. - // TODO: There is a race condition here with SwingWorker.isDone(). - // The highly unlikely chance that no data source task worker will - // run for this job needs to be addressed. Fix by using a thread pool - // and converting the SwingWorkers to Runnables. - if (dataSourceTaskWorker == null || dataSourceTaskWorker.isDone()) { - dataSourceTaskWorker = new DataSourceTaskWorker(getNextThreadId()); - dataSourceTaskWorker.execute(); - } - - // Make sure the requested number of file task workers are running. - // TODO: There is a race condition here with SwingWorker.isDone(). - // The highly unlikely chance that no file task workers or the wrong - // number of file task workers will run for this job needs to be - // addressed. Fix by using a thread pool and converting the SwingWorkers - // to Runnables. - int workersRequested = getNumberOfFileIngestThreads(); - int workersRunning = 0; - for (FileTaskWorker worker : fileTaskWorkers) { - if (worker != null) { - if (worker.isDone()) { - if (workersRunning < workersRequested) { - worker = new FileTaskWorker(getNextThreadId()); - worker.execute(); - ++workersRunning; - } else { - worker = null; - } - } else { - ++workersRunning; - } - } else if (workersRunning < workersRequested) { - worker = new FileTaskWorker(getNextThreadId()); - worker.execute(); - ++workersRunning; - } - } - while (workersRunning < workersRequested - && fileTaskWorkers.size() < MAX_NUMBER_OF_FILE_INGEST_THREADS) { - FileTaskWorker worker = new FileTaskWorker(getNextThreadId()); - fileTaskWorkers.add(worker); - worker.execute(); - ++workersRunning; - } - } - - synchronized void reportThreadDone(long threadId) { - List completedJobs = new ArrayList<>(); - for (IngestJob job : ingestJobs.values()) { - job.releaseIngestPipelinesForThread(threadId); - if (job.areIngestPipelinesShutDown()) { - completedJobs.add(job.getId()); - } - } - - for (Long jobId : completedJobs) { - ingestJobs.remove(jobId); - } - } - - synchronized void stopAll() { - // First get the task scheduling worker to stop adding new tasks. - if (taskSchedulingWorker != null) { - taskSchedulingWorker.cancel(true); - while (!taskSchedulingWorker.isDone()) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - } - } - taskSchedulingWorker = null; - } - - // Now mark all of the ingest jobs as cancelled. This way the ingest - // modules will know they are being shut down due to cancellation when - // the cancelled ingest workers release their pipelines. - for (IngestJob job : ingestJobs.values()) { - job.cancel(); - } - - // Cancel the data source task worker. It will release its pipelines - // in its done() method and the pipelines will shut down their modules. - if (dataSourceTaskWorker != null) { - dataSourceTaskWorker.cancel(true); - while (!dataSourceTaskWorker.isDone()) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - } - } - dataSourceTaskWorker = null; - } - - // Cancel the file task workers. They will release their pipelines - // in their done() methods and the pipelines will shut down their - // modules. - for (FileTaskWorker worker : fileTaskWorkers) { - if (worker != null) { - worker.cancel(true); - while (!worker.isDone()) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - } - } - worker = null; - } - } - - // Jettision the remaining tasks. This will dispose of any tasks that - // the scheduling worker queued up before it was cancelled. - scheduler.getFileScheduler().empty(); - scheduler.getDataSourceScheduler().empty(); - } - - /** - * Test if any ingest modules are running - * - * @return true if any module is running, false otherwise - */ - public synchronized boolean isIngestRunning() { - // TODO: There is a race condition here with SwingWorker.isDone(). - // It probably needs to be addressed at a later date. If we replace the - // SwingWorkers with a thread pool and Runnables, one solution would be - // to check the ingest jobs list. - if (taskSchedulingWorker != null && !taskSchedulingWorker.isDone()) { - return true; - } - - if (dataSourceTaskWorker != null && !dataSourceTaskWorker.isDone()) { - return true; - } - - for (FileTaskWorker worker : fileTaskWorkers) { - if (worker != null && !worker.isDone()) { - return true; - } - } - - return false; - } - /** * Module publishes message using InegestManager handle Does not block. The * message gets enqueued in the GUI thread and displayed in a widget @@ -468,82 +287,217 @@ public class IngestManager { } } - private class TaskSchedulingWorker extends SwingWorker { + private synchronized void startIngestTasks() { + if (!ingestMonitor.isRunning()) { + ingestMonitor.start(); + } + long taskId = ingestTaskId.incrementAndGet(); + Future task = dataSourceIngestTasksExecutor.submit(new RunDataSourceIngestModulesTask(taskId)); + ingestTasks.put(taskId, task); + + int numberOfFileTasksRequested = getNumberOfFileIngestThreads(); + for (int i = 0; i < numberOfFileTasksRequested; ++i) { + taskId = ingestTaskId.incrementAndGet(); + task = fileIngestTasksExecutor.submit(new RunFileSourceIngestModulesTask(taskId)); + ingestTasks.put(taskId, task); + } + } + + private synchronized void stopIngestTasks() { + // First mark all of the ingest jobs as cancelled. This way the + // ingest modules will know they are being shut down due to + // cancellation when the cancelled run ingest module tasks release + // their pipelines. + for (IngestJob job : ingestJobs.values()) { + job.cancel(); + } + + // Cancel the run ingest module tasks, setting the state of the threads + // running them to interrupted. + for (Future task : ingestTasks.values()) { + task.cancel(true); + } + + // Jettision the remaining data source and file ingest tasks. + scheduler.getFileScheduler().empty(); + scheduler.getDataSourceScheduler().empty(); + } + + synchronized void reportStartIngestJobsTaskDone(long taskId) { + ingestTasks.remove(taskId); + } + + synchronized void reportRunIngestModulesTaskDone(long taskId) { + ingestTasks.remove(taskId); + + List completedJobs = new ArrayList<>(); + for (IngestJob job : ingestJobs.values()) { + job.releaseIngestPipelinesForThread(taskId); + if (job.areIngestPipelinesShutDown() == true) { + completedJobs.add(job.getId()); + } + } + + for (Long jobId : completedJobs) { + ingestJobs.remove(jobId); + } + } + + private class StartIngestJobsTask implements Runnable { + + private final long id; private final List dataSources; private final List moduleTemplates; private final boolean processUnallocatedSpace; private ProgressHandle progress; - TaskSchedulingWorker(List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { + StartIngestJobsTask(long taskId, List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { + this.id = taskId; this.dataSources = dataSources; this.moduleTemplates = moduleTemplates; this.processUnallocatedSpace = processUnallocatedSpace; } @Override - protected Object doInBackground() throws Exception { - // Set up a progress bar that can be used to cancel all of the - // ingest jobs currently being performed. - final String displayName = "Queueing ingest tasks"; - progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { - @Override - public boolean cancel() { - logger.log(Level.INFO, "Queueing ingest cancelled by user."); - if (progress != null) { - progress.setDisplayName(displayName + " (Cancelling...)"); - } - IngestManager.getInstance().stopAll(); - return true; - } - }); - - progress.start(2 * dataSources.size()); - int processed = 0; - for (Content dataSource : dataSources) { - if (isCancelled()) { - logger.log(Level.INFO, "Task scheduling thread cancelled"); - return null; - } - - final String inputName = dataSource.getName(); - IngestJob ingestJob = new IngestJob(IngestManager.this.getNextDataSourceTaskId(), dataSource, moduleTemplates, processUnallocatedSpace); - - List errors = ingestJob.startUpIngestPipelines(); - if (!errors.isEmpty()) { - StringBuilder failedModules = new StringBuilder(); - for (int i = 0; i < errors.size(); ++i) { - IngestModuleError error = errors.get(i); - String moduleName = error.getModuleDisplayName(); - logger.log(Level.SEVERE, "The " + moduleName + " module failed to start up", error.getModuleError()); - failedModules.append(moduleName); - if ((errors.size() > 1) && (i != (errors.size() - 1))) { - failedModules.append(","); + public void run() { + try { + final String displayName = "Queueing ingest tasks"; + progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { + @Override + public boolean cancel() { + if (progress != null) { + progress.setDisplayName(displayName + " (Cancelling...)"); } + IngestManager.getInstance().cancelIngestJobs(); + return true; + } + }); + + progress.start(2 * dataSources.size()); + int processed = 0; + for (Content dataSource : dataSources) { + if (Thread.currentThread().isInterrupted()) { + break; + } + + IngestJob ingestJob = new IngestJob(IngestManager.this.ingestJobId.incrementAndGet(), dataSource, moduleTemplates, processUnallocatedSpace); + List errors = ingestJob.startUpIngestPipelines(); + if (!errors.isEmpty()) { + StringBuilder failedModules = new StringBuilder(); + for (int i = 0; i < errors.size(); ++i) { + IngestModuleError error = errors.get(i); + String moduleName = error.getModuleDisplayName(); + logger.log(Level.SEVERE, "The " + moduleName + " module failed to start up", error.getModuleError()); + failedModules.append(moduleName); + if ((errors.size() > 1) && (i != (errors.size() - 1))) { + failedModules.append(","); + } + } + MessageNotifyUtil.Message.error( // RJCTODO: Fix this to show all errors, probably should specify data source name + "Failed to start the following ingest modules: " + failedModules.toString() + " .\n\n" + + "No ingest modules will be run. Please disable the module " + + "or fix the error and restart ingest by right clicking on " + + "the data source and selecting Run Ingest Modules.\n\n" + + "Error: " + errors.get(0).getModuleError().getMessage()); + ingestJob.cancel(); + break; + } + + // Save the ingest job for later cleanup of pipelines. + synchronized (IngestManager.this) { + ingestJobs.put(ingestJob.getId(), ingestJob); + } + + // Queue the data source ingest tasks for the ingest job. + final String inputName = dataSource.getName(); + progress.progress("DataSource Ingest" + " " + inputName, processed); + scheduler.getDataSourceScheduler().schedule(ingestJob); + progress.progress("DataSource Ingest" + " " + inputName, ++processed); + + // Queue the file ingest tasks for the ingest job. + progress.progress("File Ingest" + " " + inputName, processed); + scheduler.getFileScheduler().scheduleIngestOfFiles(ingestJob); + progress.progress("File Ingest" + " " + inputName, ++processed); + + if (!Thread.currentThread().isInterrupted()) { + startIngestTasks(); } - MessageNotifyUtil.Message.error( - "Failed to start the following ingest modules: " + failedModules.toString() + " .\n\n" - + "No ingest modules will be run. Please disable the module " - + "or fix the error and restart ingest by right clicking on " - + "the data source and selecting Run Ingest Modules.\n\n" - + "Error: " + errors.get(0).getModuleError().getMessage()); - return null; } - - // Save the ingest job for later cleanup of pipelines. - ingestJobs.put(ingestJob.getId(), ingestJob); - - // Queue the data source ingest tasks for the ingest job. - progress.progress("DataSource Ingest" + " " + inputName, processed); - scheduler.getDataSourceScheduler().schedule(ingestJob); - progress.progress("DataSource Ingest" + " " + inputName, ++processed); - - // Queue the file ingest tasks for the ingest job. - progress.progress("File Ingest" + " " + inputName, processed); - scheduler.getFileScheduler().scheduleIngestOfFiles(ingestJob); - progress.progress("File Ingest" + " " + inputName, ++processed); + } catch (Exception ex) { + String message = String.format("StartIngestJobsTask (id=%d) caught exception", id); + logger.log(Level.SEVERE, message, ex); + MessageNotifyUtil.Message.error("An error occurred while starting ingest. Results may only be partial"); + } finally { + progress.finish(); + reportStartIngestJobsTaskDone(id); } + } + } + private class RunDataSourceIngestModulesTask implements Runnable { + + private final long id; + + RunDataSourceIngestModulesTask(long taskId) { + id = taskId; + } + + @Override + public void run() { + try { + IngestScheduler.DataSourceScheduler scheduler = IngestScheduler.getInstance().getDataSourceScheduler(); + while (scheduler.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + break; + } + IngestJob job = scheduler.next(); + job.getDataSourceIngestPipelineForThread(id).process(); + } + } catch (Exception ex) { + String message = String.format("RunDataSourceIngestModulesTask (id=%d) caught exception", id); + logger.log(Level.SEVERE, message, ex); + } finally { + reportRunIngestModulesTaskDone(id); + } + } + } + + private class RunFileSourceIngestModulesTask implements Runnable { + + private final long id; + + RunFileSourceIngestModulesTask(long taskId) { + id = taskId; + } + + @Override + public void run() { + try { + IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler(); + while (fileScheduler.hasNext()) { + if (Thread.currentThread().isInterrupted()) { + break; + } + IngestScheduler.FileScheduler.FileTask task = fileScheduler.next(); + IngestJob job = task.getJob(); + job.updateFileTasksProgressBar(task.getFile().getName()); + job.getFileIngestPipelineForThread(id).process(task.getFile()); + } + } catch (Exception ex) { + String message = String.format("RunFileSourceIngestModulesTask (id=%d) caught exception", id); + logger.log(Level.SEVERE, message, ex); + } finally { + reportRunIngestModulesTaskDone(id); + } + } + } + + class IngestCancellationWorker extends SwingWorker { + + @Override + protected Void doInBackground() throws Exception { + stopIngestTasks(); return null; } @@ -552,105 +506,8 @@ public class IngestManager { try { super.get(); } catch (CancellationException | InterruptedException ex) { - // IngestManager.stopAll() will dispose of all tasks. } catch (Exception ex) { - logger.log(Level.SEVERE, "Error while scheduling ingest jobs", ex); - MessageNotifyUtil.Message.error("An error occurred while starting ingest. Results may only be partial"); - } finally { - if (!isCancelled()) { - startAll(); - } - progress.finish(); - } - } - } - - /** - * Performs data source ingest tasks for one or more ingest jobs on a worker - * thread. - */ - class DataSourceTaskWorker extends SwingWorker { - - private final long id; - - DataSourceTaskWorker(long threadId) { - this.id = threadId; - } - - @Override - protected Void doInBackground() throws Exception { - logger.log(Level.INFO, "Data source ingest thread (id={0}) started", this.id); - IngestScheduler.DataSourceScheduler scheduler = IngestScheduler.getInstance().getDataSourceScheduler(); - while (scheduler.hasNext()) { - if (isCancelled()) { - logger.log(Level.INFO, "Data source ingest thread (id={0}) cancelled", this.id); - return null; - } - IngestJob job = scheduler.next(); - DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(this.id); - pipeline.process(this, job.getDataSourceTaskProgressBar()); - } - logger.log(Level.INFO, "Data source ingest thread (id={0}) completed", this.id); - return null; - } - - @Override - protected void done() { - try { - super.get(); - } catch (CancellationException | InterruptedException e) { - logger.log(Level.INFO, "Data source ingest thread (id={0}) cancelled", this.id); - } catch (Exception ex) { - String message = String.format("Data source ingest thread (id=%d) experienced a fatal error", this.id); - logger.log(Level.SEVERE, message, ex); - } finally { - IngestManager.getInstance().reportThreadDone(this.id); - } - } - } - - /** - * Performs file ingest tasks for one or more ingest jobs on a worker - * thread. - */ - class FileTaskWorker extends SwingWorker { - - private final long id; - - FileTaskWorker(long threadId) { - this.id = threadId; - } - - @Override - protected Object doInBackground() throws Exception { - logger.log(Level.INFO, "File ingest thread (id={0}) started", this.id); - IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler(); - while (fileScheduler.hasNext()) { - if (isCancelled()) { - logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", this.id); - return null; - } - IngestScheduler.FileScheduler.FileTask task = fileScheduler.next(); - IngestJob job = task.getJob(); - FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(this.id); - job.handleFileTaskStarted(task); - pipeline.process(task.getFile()); - } - logger.log(Level.INFO, "File ingest thread (id={0}) completed", this.id); - return null; - } - - @Override - protected void done() { - try { - super.get(); - } catch (CancellationException | InterruptedException e) { - logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", this.id); - } catch (Exception ex) { - String message = String.format("File ingest thread {0} experienced a fatal error", this.id); - logger.log(Level.SEVERE, message, ex); - } finally { - IngestManager.getInstance().reportThreadDone(this.id); + logger.log(Level.SEVERE, "Error while cancelling ingest jobs", ex); } } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java index d8971c476e..b5feb6220c 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestMessageTopComponent.java @@ -224,7 +224,7 @@ import org.sleuthkit.datamodel.Content; manager = IngestManager.getInstance(); } try { - manager.stopAll(); + manager.cancelIngestJobs(); } finally { //clear inbox clearMessages(); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java index dde9704379..5c311363fd 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestMonitor.java @@ -163,9 +163,9 @@ public final class IngestMonitor { if (checkDiskSpace() == false) { //stop ingest if running final String diskPath = root.getAbsolutePath(); - MONITOR_LOGGER.log(Level.SEVERE, "Stopping ingest due to low disk space on disk " + diskPath); - logger.log(Level.SEVERE, "Stopping ingest due to low disk space on disk " + diskPath); - manager.stopAll(); + MONITOR_LOGGER.log(Level.SEVERE, "Stopping ingest due to low disk space on disk {0}", diskPath); + logger.log(Level.SEVERE, "Stopping ingest due to low disk space on disk {0}", diskPath); + manager.cancelIngestJobs(); IngestServices.getInstance().postMessage(IngestMessage.createManagerErrorMessage( NbBundle.getMessage(this.getClass(), "IngestMonitor.mgrErrMsg.lowDiskSpace.title", diskPath), NbBundle.getMessage(this.getClass(), "IngestMonitor.mgrErrMsg.lowDiskSpace.msg", diskPath)));