From c11b5a0fe070b77ce59b0ff8001ece362b692b42 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Tue, 16 Dec 2014 23:19:19 -0500 Subject: [PATCH] Continue reworking of ingest API --- .../AddImageWizardIngestConfigPanel.java | 2 +- .../autopsy/ingest/DataSourceIngestJob.java | 1269 +++++++++-------- .../ingest/DataSourceIngestPipeline.java | 52 +- .../autopsy/ingest/FileIngestPipeline.java | 42 +- .../sleuthkit/autopsy/ingest/IngestJob.java | 219 +-- .../autopsy/ingest/IngestJobConfigurator.java | 2 +- .../autopsy/ingest/IngestManager.java | 505 ++++--- .../autopsy/ingest/IngestModule.java | 6 + .../ingest/IngestProgressSnapshotPanel.java | 2 +- .../ingest/RunIngestModulesDialog.java | 2 +- 10 files changed, 1055 insertions(+), 1046 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/casemodule/AddImageWizardIngestConfigPanel.java b/Core/src/org/sleuthkit/autopsy/casemodule/AddImageWizardIngestConfigPanel.java index 052d46047c..7ac97c1ca2 100644 --- a/Core/src/org/sleuthkit/autopsy/casemodule/AddImageWizardIngestConfigPanel.java +++ b/Core/src/org/sleuthkit/autopsy/casemodule/AddImageWizardIngestConfigPanel.java @@ -211,7 +211,7 @@ class AddImageWizardIngestConfigPanel implements WizardDescriptor.Panel fileIngestPipelinesForThreads; - private final List fileIngestPipelines; + private final LinkedBlockingQueue fileIngestPipelinesQueue = new LinkedBlockingQueue<>(); + private final List fileIngestPipelines = new ArrayList<>(); /** - * An ingest job supports cancellation of either the currently running data - * source level ingest module or the entire ingest job. + * A data source ingest job supports cancellation of either the currently + * running data source level ingest module or the entire ingest job. + * + * TODO: The currentDataSourceIngestModuleCancelled field and all of the + * code concerned with it is a hack to avoid an API change. The next time an + * API change is legal, a cancel() method needs to be added to the + * IngestModule interface and this field should be removed. The "ingest job + * is canceled" queries should also be removed from the IngestJobContext + * class. */ private volatile boolean currentDataSourceIngestModuleCancelled; private volatile boolean cancelled; /** - * An ingest job uses the task scheduler singleton to create and queue the - * ingest tasks that make up the job. + * A data source ingest job uses the task scheduler singleton to create and + * queue the ingest tasks that make up the job. */ private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance(); - private final boolean doUI; + /** + * A data source ingest job can run interactively using NetBeans progress + * handles. + */ + private final boolean runInteractively; /** - * These fields are used to report data source level ingest progress for the - * ingest job. + * A data source ingest job uses these fields to report data source level + * ingest progress. */ - private final Object dataSourceIngestProgressLock; + private final Object dataSourceIngestProgressLock = new Object(); private ProgressHandle dataSourceIngestProgress; /** - * These fields are used to report file level ingest task progress for the - * ingest job. + * A data source ingest job uses these fields to report file level ingest + * progress. */ - private final Object fileIngestProgressLock; - private final List filesInProgress; + private final Object fileIngestProgressLock = new Object(); + private final List filesInProgress = new ArrayList<>(); private long estimatedFilesToProcess; private long processedFiles; private ProgressHandle fileIngestProgress; /** - * This field is used to record the creation of the ingest job. + * A data source ingest job uses this field to report its creation time. */ private final long createTime; /** - * Constructs an ingest job. + * Constructs an object that encapsulates a data source and the ingest + * module pipelines used to process it. * * @param parentJob The ingest job of which this data source ingest job is a * part. * @param dataSource The data source to be ingested. * @param settings The settings for the ingest job. - * @param doUI Whether or not to display progress bars. + * @param runInteractively Whether or not this job should use NetBeans + * progress handles. */ - DataSourceIngestJob(IngestJob parentJob, Content dataSource, IngestJobSettings settings, boolean doUI) { + DataSourceIngestJob(IngestJob parentJob, Content dataSource, IngestJobSettings settings, boolean runInteractively) { this.parentJob = parentJob; this.id = DataSourceIngestJob.nextJobId.getAndIncrement(); this.dataSource = dataSource; this.settings = settings; - this.doUI = doUI; - this.dataSourceIngestPipelineLock = new Object(); - this.fileIngestPipelinesForThreads = new LinkedBlockingQueue<>(); - this.fileIngestPipelines = new ArrayList<>(); - this.filesInProgress = new ArrayList<>(); - this.dataSourceIngestProgressLock = new Object(); - this.fileIngestProgressLock = new Object(); - this.stage = DataSourceIngestJob.Stages.INITIALIZATION; - this.stageCompletionCheckLock = new Object(); + this.runInteractively = runInteractively; this.createTime = new Date().getTime(); - } - - /** - * Gets the unique identifier of this job. - * - * @return The job identifier. - */ - long getId() { - return this.id; - } - - /** - * Gets the data source to be ingested by this job. - * - * @return A Content object representing the data source. - */ - Content getDataSource() { - return this.dataSource; - } - - /** - * Indicates whether or not unallocated space should be processed as part of - * this job. - * - * @return True or false. - */ - boolean shouldProcessUnallocatedSpace() { - return this.settings.getProcessUnallocatedSpace(); - } - - /** - * Passes the data source for this job through the currently active data - * source level ingest pipeline. - * - * @param task A data source ingest task wrapping the data source. - */ - void process(DataSourceIngestTask task) { - try { - synchronized (this.dataSourceIngestPipelineLock) { - if (!this.isCancelled() && !this.currentDataSourceIngestPipeline.isEmpty()) { - /** - * Run the data source through the pipeline. - */ - List errors = new ArrayList<>(); - errors.addAll(this.currentDataSourceIngestPipeline.process(task)); - if (!errors.isEmpty()) { - logIngestModuleErrors(errors); - } - } - } - - /** - * Shut down the data source ingest progress bar right away. Data - * source-level processing is finished for this stage. - */ - synchronized (this.dataSourceIngestProgressLock) { - if (null != this.dataSourceIngestProgress) { - this.dataSourceIngestProgress.finish(); - this.dataSourceIngestProgress = null; - } - } - } finally { - /** - * No matter what happens, do ingest task bookkeeping. - */ - DataSourceIngestJob.taskScheduler.notifyTaskCompleted(task); - this.checkForStageCompleted(); - } - } - - /** - * Passes a file from the data source for this job through the file level - * ingest pipeline. - * - * @param task A file ingest task. - * @throws InterruptedException if the thread executing this code is - * interrupted while blocked on taking from or putting to the file ingest - * pipelines collection. - */ - void process(FileIngestTask task) throws InterruptedException { - try { - if (!this.isCancelled()) { - /** - * Get a file ingest pipeline not currently in use by another - * file ingest thread. - */ - FileIngestPipeline pipeline = this.fileIngestPipelinesForThreads.take(); - if (!pipeline.isEmpty()) { - /** - * Get the file to process. - */ - AbstractFile file = task.getFile(); - - /** - * Update the file ingest progress bar. - */ - synchronized (this.fileIngestProgressLock) { - ++this.processedFiles; - if (this.processedFiles <= this.estimatedFilesToProcess) { - this.fileIngestProgress.progress(file.getName(), (int) this.processedFiles); - } else { - this.fileIngestProgress.progress(file.getName(), (int) this.estimatedFilesToProcess); - } - this.filesInProgress.add(file.getName()); - } - - /** - * Run the file through the pipeline. - */ - List errors = new ArrayList<>(); - errors.addAll(pipeline.process(task)); - if (!errors.isEmpty()) { - logIngestModuleErrors(errors); - } - - /** - * Update the file ingest progress bar again, in case the - * file was being displayed. - */ - if (!this.cancelled) { - synchronized (this.fileIngestProgressLock) { - this.filesInProgress.remove(file.getName()); - if (this.filesInProgress.size() > 0) { - this.fileIngestProgress.progress(this.filesInProgress.get(0)); - } else { - this.fileIngestProgress.progress(""); - } - } - } - } - - /** - * Relinquish the pipeline so it can be reused by another file - * ingest thread. - */ - this.fileIngestPipelinesForThreads.put(pipeline); - } - } finally { - /** - * No matter what happens, do ingest task bookkeeping. - */ - DataSourceIngestJob.taskScheduler.notifyTaskCompleted(task); - this.checkForStageCompleted(); - } - } - - /** - * Adds more files to an ingest job, i.e., extracted or carved files. Not - * currently supported for the second stage of the job. - * - * @param files A list of files to add. - */ - void addFiles(List files) { - /** - * Note: This implementation assumes that this is being called by an an - * ingest module running code on an ingest thread that is holding a - * reference to an ingest task, so no task completion check is done. - */ - if (DataSourceIngestJob.Stages.FIRST == this.stage) { - for (AbstractFile file : files) { - DataSourceIngestJob.taskScheduler.scheduleFileIngestTask(this, file); - } - } else { - DataSourceIngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS - } - - /** - * The intended clients of this method are ingest modules running code - * on an ingest thread that is holding a reference to an ingest task, in - * which case a task completion check would not be necessary. This is a - * bit of defensive programming. - */ - this.checkForStageCompleted(); - } - - /** - * Queries whether or not this job is running file ingest. - * - * @return True or false. - */ - boolean fileIngestIsRunning() { - return ((DataSourceIngestJob.Stages.FIRST == this.stage) && this.hasFileIngestPipeline()); - } - - /** - * Updates the display name of the data source level ingest progress bar. - * - * @param displayName The new display name. - */ - void updateDataSourceIngestProgressBarDisplayName(String displayName) { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - this.dataSourceIngestProgress.setDisplayName(displayName); - } - } - } - - /** - * Switches the data source level ingest progress bar to determinate mode. - * This should be called if the total work units to process the data source - * is known. - * - * @param workUnits Total number of work units for the processing of the - * data source. - */ - void switchDataSourceIngestProgressBarToDeterminate(int workUnits) { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - if (null != this.dataSourceIngestProgress) { - this.dataSourceIngestProgress.switchToDeterminate(workUnits); - } - } - } - } - - /** - * Switches the data source level ingest progress bar to indeterminate mode. - * This should be called if the total work units to process the data source - * is unknown. - */ - void switchDataSourceIngestProgressBarToIndeterminate() { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - if (null != this.dataSourceIngestProgress) { - this.dataSourceIngestProgress.switchToIndeterminate(); - } - } - } - } - - /** - * Updates the data source level ingest progress bar with the number of work - * units performed, if in the determinate mode. - * - * @param workUnits Number of work units performed. - */ - void advanceDataSourceIngestProgressBar(int workUnits) { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - if (null != this.dataSourceIngestProgress) { - this.dataSourceIngestProgress.progress("", workUnits); - } - } - } - } - - /** - * Updates the data source level ingest progress with a new task name, where - * the task name is the "subtitle" under the display name. - * - * @param currentTask The task name. - */ - void advanceDataSourceIngestProgressBar(String currentTask) { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - if (null != this.dataSourceIngestProgress) { - this.dataSourceIngestProgress.progress(currentTask); - } - } - } - } - - /** - * Updates the data source level ingest progress bar with a new task name - * and the number of work units performed, if in the determinate mode. The - * task name is the "subtitle" under the display name. - * - * @param currentTask The task name. - * @param workUnits Number of work units performed. - */ - void advanceDataSourceIngestProgressBar(String currentTask, int workUnits) { - if (!this.cancelled) { - synchronized (this.fileIngestProgressLock) { - this.dataSourceIngestProgress.progress(currentTask, workUnits); - } - } - } - - /** - * Queries whether or not a temporary cancellation of data source level - * ingest in order to stop the currently executing data source level ingest - * module is in effect. - * - * @return True or false. - */ - boolean currentDataSourceIngestModuleIsCancelled() { - return this.currentDataSourceIngestModuleCancelled; - } - - /** - * Rescind a temporary cancellation of data source level ingest that was - * used to stop a single data source level ingest module. - */ - void currentDataSourceIngestModuleCancellationCompleted() { - this.currentDataSourceIngestModuleCancelled = false; - - /** - * A new progress bar must be created because the cancel button of the - * previously constructed component is disabled by NetBeans when the - * user selects the "OK" button of the cancellation confirmation dialog - * popped up by NetBeans when the progress bar cancel button was - * pressed. - */ - synchronized (this.dataSourceIngestProgressLock) { - this.dataSourceIngestProgress.finish(); - this.dataSourceIngestProgress = null; - this.startDataSourceIngestProgressBar(); - } - } - - /** - * Requests cancellation of ingest, i.e., a shutdown of the data source - * level and file level ingest pipelines. - */ - void cancel() { - /** - * Put a cancellation message on data source level ingest progress bar, - * if it is still running. - */ - synchronized (this.dataSourceIngestProgressLock) { - if (dataSourceIngestProgress != null) { - final String displayName = NbBundle.getMessage(this.getClass(), - "IngestJob.progress.dataSourceIngest.initialDisplayName", - dataSource.getName()); - dataSourceIngestProgress.setDisplayName( - NbBundle.getMessage(this.getClass(), - "IngestJob.progress.cancelling", - displayName)); - } - } - - /** - * Put a cancellation message on the file level ingest progress bar, if - * it is still running. - */ - synchronized (this.fileIngestProgressLock) { - if (this.fileIngestProgress != null) { - final String displayName = NbBundle.getMessage(this.getClass(), - "IngestJob.progress.fileIngest.displayName", - this.dataSource.getName()); - this.fileIngestProgress.setDisplayName( - NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", - displayName)); - } - } - - this.cancelled = true; - - /** - * Tell the task scheduler to cancel all pending tasks, i.e., tasks not - * not being performed by an ingest thread. - */ - DataSourceIngestJob.taskScheduler.cancelPendingTasksForIngestJob(this); - this.checkForStageCompleted(); - } - - /** - * Queries whether or not cancellation of ingest i.e., a shutdown of the - * data source level and file level ingest pipelines, has been requested. - * - * @return True or false. - */ - boolean isCancelled() { - return this.cancelled; - } - - /** - * Starts up the ingest pipelines and ingest progress bars. - * - * @return A collection of ingest module startup errors, empty on success. - */ - List start() { - this.createIngestPipelines(settings.getEnabledIngestModuleTemplates()); - List errors = startUpIngestPipelines(); - if (errors.isEmpty()) { - if (this.hasFirstStageDataSourceIngestPipeline() || this.hasFileIngestPipeline()) { - this.startFirstStage(); - } else if (this.hasSecondStageDataSourceIngestPipeline()) { - this.startSecondStage(); - } - } - return errors; + this.createIngestPipelines(); } /** * Creates the file and data source ingest pipelines. - * - * @param ingestModuleTemplates Ingest module templates to use to populate - * the pipelines. */ - private void createIngestPipelines(List ingestModuleTemplates) { + private void createIngestPipelines() { + List ingestModuleTemplates = this.settings.getEnabledIngestModuleTemplates(); + /** * Make mappings of ingest module factory class names to templates. */ @@ -601,7 +226,7 @@ final class DataSourceIngestJob { int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads(); for (int i = 0; i < numberOfFileIngestThreads; ++i) { FileIngestPipeline pipeline = new FileIngestPipeline(this, fileIngestModuleTemplates); - this.fileIngestPipelinesForThreads.put(pipeline); + this.fileIngestPipelinesQueue.put(pipeline); this.fileIngestPipelines.add(pipeline); } } catch (InterruptedException ex) { @@ -615,16 +240,18 @@ final class DataSourceIngestJob { } /** - * Use an ordered list of ingest module factory class names to create an - * ordered output list of ingest module templates for an ingest pipeline. - * The ingest module templates are removed from the input collection as they - * are added to the output collection. + * Uses an input collection of ingest module templates and a pipeline + * configuration, i.e., an ordered list of ingest module factory class + * names, to create an ordered output list of ingest module templates for an + * ingest pipeline. The ingest module templates are removed from the input + * collection as they are added to the output collection. * * @param ingestModuleTemplates A mapping of ingest module factory class * names to ingest module templates. * @param pipelineConfig An ordered list of ingest module factory class * names representing an ingest pipeline. - * @return + * @return An ordered list of ingest module templates, i.e., an + * uninstantiated pipeline. */ private static List getConfiguredIngestModuleTemplates(Map ingestModuleTemplates, List pipelineConfig) { List templates = new ArrayList<>(); @@ -637,60 +264,31 @@ final class DataSourceIngestJob { } /** - * Starts the first stage of the job. + * Gets the identifier of this job. + * + * @return The job identifier. */ - private void startFirstStage() { - this.stage = DataSourceIngestJob.Stages.FIRST; - - /** - * Start one or both of the first stage ingest progress bars. - */ - if (this.hasFirstStageDataSourceIngestPipeline()) { - this.startDataSourceIngestProgressBar(); - } - if (this.hasFileIngestPipeline()) { - this.startFileIngestProgressBar(); - } - - /** - * Make the first stage data source level ingest pipeline the current - * data source level pipeline. - */ - synchronized (this.dataSourceIngestPipelineLock) { - this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline; - } - - /** - * Schedule the first stage tasks. - */ - if (this.hasFirstStageDataSourceIngestPipeline() && this.hasFileIngestPipeline()) { - DataSourceIngestJob.taskScheduler.scheduleIngestTasks(this); - } else if (this.hasFirstStageDataSourceIngestPipeline()) { - DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this); - } else { - DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this); - - /** - * No data source ingest task has been scheduled for this stage, and - * it is possible, if unlikely, that no file ingest tasks were - * actually scheduled since there are files that get filtered out by - * the tasks scheduler. In this special case, an ingest thread will - * never get to check for completion of this stage of the job. - */ - this.checkForStageCompleted(); - } + long getId() { + return this.id; } /** - * Starts the second stage of the ingest job. + * Gets the data source to be ingested by this job. + * + * @return A Content object representing the data source. */ - private void startSecondStage() { - this.stage = DataSourceIngestJob.Stages.SECOND; - this.startDataSourceIngestProgressBar(); - synchronized (this.dataSourceIngestPipelineLock) { - this.currentDataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline; - } - DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this); + Content getDataSource() { + return this.dataSource; + } + + /** + * Queries whether or not unallocated space should be processed as part of + * this job. + * + * @return True or false. + */ + boolean shouldProcessUnallocatedSpace() { + return this.settings.getProcessUnallocatedSpace(); } /** @@ -725,18 +323,37 @@ final class DataSourceIngestJob { } /** - * Checks to see if the job has a file level ingest pipeline. + * Checks to see if this job has a file level ingest pipeline. * * @return True or false. */ private boolean hasFileIngestPipeline() { - // RJCTODO: Consider going to the list instead...what if the pipelines are all checked out by threads? - return (this.fileIngestPipelinesForThreads.peek().isEmpty() == false); + if (!this.fileIngestPipelines.isEmpty()) { + return !this.fileIngestPipelines.get(0).isEmpty(); + } + return false; } /** - * Starts up each of the file and data source level ingest modules to - * collect possible errors. + * Starts up the ingest pipelines for this job. + * + * @return A collection of ingest module startup errors, empty on success. + */ + List start() { + List errors = startUpIngestPipelines(); + if (errors.isEmpty()) { + if (this.hasFirstStageDataSourceIngestPipeline() || this.hasFileIngestPipeline()) { + this.startFirstStage(); + } else if (this.hasSecondStageDataSourceIngestPipeline()) { + this.startSecondStage(); + } + } + return errors; + } + + /** + * Starts up each of the ingest pipelines for this job to collect any file + * and data source level ingest modules errors that might occur. * * @return A collection of ingest module startup errors, empty on success. */ @@ -750,7 +367,7 @@ final class DataSourceIngestJob { errors.addAll(this.secondStageDataSourceIngestPipeline.startUp()); // Start up the file ingest pipelines (one per file ingest thread). - for (FileIngestPipeline pipeline : this.fileIngestPipelinesForThreads) { + for (FileIngestPipeline pipeline : this.fileIngestPipelinesQueue) { errors.addAll(pipeline.startUp()); if (!errors.isEmpty()) { // If there are start up errors, the ingest job will not proceed @@ -761,14 +378,13 @@ final class DataSourceIngestJob { // pipeline. There is no need to complete starting up all of the // file ingest pipeline copies since any additional start up // errors are likely redundant. - while (!this.fileIngestPipelinesForThreads.isEmpty()) { - pipeline = this.fileIngestPipelinesForThreads.poll(); + while (!this.fileIngestPipelinesQueue.isEmpty()) { + pipeline = this.fileIngestPipelinesQueue.poll(); List shutDownErrors = pipeline.shutDown(); if (!shutDownErrors.isEmpty()) { logIngestModuleErrors(shutDownErrors); } } - this.fileIngestPipelines.clear(); break; } } @@ -778,66 +394,135 @@ final class DataSourceIngestJob { } /** - * Starts the data source level ingest progress bar. + * Starts the first stage of this job. + */ + private void startFirstStage() { + this.stage = DataSourceIngestJob.Stages.FIRST; + + if (this.hasFileIngestPipeline()) { + this.estimatedFilesToProcess = this.dataSource.accept(new GetFilesCountVisitor()); + } + + if (this.runInteractively) { + /** + * Start one or both of the first stage ingest progress bars. + */ + if (this.hasFirstStageDataSourceIngestPipeline()) { + this.startDataSourceIngestProgressBar(); + } + if (this.hasFileIngestPipeline()) { + this.startFileIngestProgressBar(); + } + } + + /** + * Make the first stage data source level ingest pipeline the current + * data source level pipeline. + */ + synchronized (this.dataSourceIngestPipelineLock) { + this.currentDataSourceIngestPipeline = this.firstStageDataSourceIngestPipeline; + } + + /** + * Schedule the first stage tasks. + */ + if (this.hasFirstStageDataSourceIngestPipeline() && this.hasFileIngestPipeline()) { + DataSourceIngestJob.taskScheduler.scheduleIngestTasks(this); + } else if (this.hasFirstStageDataSourceIngestPipeline()) { + DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this); + } else { + DataSourceIngestJob.taskScheduler.scheduleFileIngestTasks(this); + + /** + * No data source ingest task has been scheduled for this stage, and + * it is possible, if unlikely, that no file ingest tasks were + * actually scheduled since there are files that get filtered out by + * the tasks scheduler. In this special case, an ingest thread will + * never get to check for completion of this stage of the job, so do + * it now. + */ + this.checkForStageCompleted(); + } + } + + /** + * Starts the second stage of this ingest job. + */ + private void startSecondStage() { + this.stage = DataSourceIngestJob.Stages.SECOND; + if (this.runInteractively) { + this.startDataSourceIngestProgressBar(); + } + synchronized (this.dataSourceIngestPipelineLock) { + this.currentDataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline; + } + DataSourceIngestJob.taskScheduler.scheduleDataSourceIngestTask(this); + } + + /** + * Starts a data source level ingest progress bar for this job. */ private void startDataSourceIngestProgressBar() { - synchronized (this.dataSourceIngestProgressLock) { - String displayName = NbBundle.getMessage(this.getClass(), - "IngestJob.progress.dataSourceIngest.initialDisplayName", - this.dataSource.getName()); - this.dataSourceIngestProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { - @Override - public boolean cancel() { - // If this method is called, the user has already pressed - // the cancel button on the progress bar and the OK button - // of a cancelation confirmation dialog supplied by - // NetBeans. What remains to be done is to find out whether - // the user wants to cancel only the currently executing - // data source ingest module or the entire ingest job. - DataSourceIngestCancellationPanel panel = new DataSourceIngestCancellationPanel(); - String dialogTitle = NbBundle.getMessage(DataSourceIngestJob.this.getClass(), "IngestJob.cancellationDialog.title"); - JOptionPane.showConfirmDialog(null, panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE); - if (panel.cancelAllDataSourceIngestModules()) { - DataSourceIngestJob.this.cancel(); - } else { - DataSourceIngestJob.this.cancelCurrentDataSourceIngestModule(); + if (this.runInteractively) { + synchronized (this.dataSourceIngestProgressLock) { + String displayName = NbBundle.getMessage(this.getClass(), + "IngestJob.progress.dataSourceIngest.initialDisplayName", + this.dataSource.getName()); + this.dataSourceIngestProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { + @Override + public boolean cancel() { + // If this method is called, the user has already pressed + // the cancel button on the progress bar and the OK button + // of a cancelation confirmation dialog supplied by + // NetBeans. What remains to be done is to find out whether + // the user wants to cancel only the currently executing + // data source ingest module or the entire ingest job. + DataSourceIngestCancellationPanel panel = new DataSourceIngestCancellationPanel(); + String dialogTitle = NbBundle.getMessage(DataSourceIngestJob.this.getClass(), "IngestJob.cancellationDialog.title"); + JOptionPane.showConfirmDialog(null, panel, dialogTitle, JOptionPane.OK_OPTION, JOptionPane.PLAIN_MESSAGE); + if (panel.cancelAllDataSourceIngestModules()) { + DataSourceIngestJob.this.cancel(); + } else { + DataSourceIngestJob.this.cancelCurrentDataSourceIngestModule(); + } + return true; } - return true; - } - }); - this.dataSourceIngestProgress.start(); - this.dataSourceIngestProgress.switchToIndeterminate(); + }); + this.dataSourceIngestProgress.start(); + this.dataSourceIngestProgress.switchToIndeterminate(); + } } } /** - * Starts the file level ingest progress bar. + * Starts the file level ingest progress bar for this job. */ private void startFileIngestProgressBar() { - synchronized (this.fileIngestProgressLock) { - String displayName = NbBundle.getMessage(this.getClass(), - "IngestJob.progress.fileIngest.displayName", - this.dataSource.getName()); - this.fileIngestProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { - @Override - public boolean cancel() { - // If this method is called, the user has already pressed - // the cancel button on the progress bar and the OK button - // of a cancelation confirmation dialog supplied by - // NetBeans. - DataSourceIngestJob.this.cancel(); - return true; - } - }); - this.estimatedFilesToProcess = this.dataSource.accept(new GetFilesCountVisitor()); - this.fileIngestProgress.start(); - this.fileIngestProgress.switchToDeterminate((int) this.estimatedFilesToProcess); + if (this.runInteractively) { + synchronized (this.fileIngestProgressLock) { + String displayName = NbBundle.getMessage(this.getClass(), + "IngestJob.progress.fileIngest.displayName", + this.dataSource.getName()); + this.fileIngestProgress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { + @Override + public boolean cancel() { + // If this method is called, the user has already pressed + // the cancel button on the progress bar and the OK button + // of a cancelation confirmation dialog supplied by + // NetBeans. + DataSourceIngestJob.this.cancel(); + return true; + } + }); + this.fileIngestProgress.start(); + this.fileIngestProgress.switchToDeterminate((int) this.estimatedFilesToProcess); + } } } /** - * Checks to see if the ingest tasks for the current stage are completed and - * does a stage transition if they are. + * Checks to see if the ingest tasks for the current stage of this job are + * completed and does a stage transition if they are. */ private void checkForStageCompleted() { synchronized (this.stageCompletionCheckLock) { @@ -855,38 +540,39 @@ final class DataSourceIngestJob { } /** - * Shuts down the first stage ingest pipelines and progress bars and starts - * the second stage, if appropriate. + * Shuts down the first stage ingest pipelines and progress bars for this + * job and starts the second stage, if appropriate. */ private void finishFirstStage() { // Shut down the file ingest pipelines. Note that no shut down is // required for the data source ingest pipeline because data source // ingest modules do not have a shutdown() method. List errors = new ArrayList<>(); - while (!this.fileIngestPipelinesForThreads.isEmpty()) { - FileIngestPipeline pipeline = fileIngestPipelinesForThreads.poll(); + while (!this.fileIngestPipelinesQueue.isEmpty()) { + FileIngestPipeline pipeline = fileIngestPipelinesQueue.poll(); errors.addAll(pipeline.shutDown()); } if (!errors.isEmpty()) { logIngestModuleErrors(errors); } - this.fileIngestPipelines.clear(); - // Finish the first stage data source ingest progress bar, if it hasn't - // already been finished. - synchronized (this.dataSourceIngestProgressLock) { - if (this.dataSourceIngestProgress != null) { - this.dataSourceIngestProgress.finish(); - this.dataSourceIngestProgress = null; + if (this.runInteractively) { + // Finish the first stage data source ingest progress bar, if it hasn't + // already been finished. + synchronized (this.dataSourceIngestProgressLock) { + if (this.dataSourceIngestProgress != null) { + this.dataSourceIngestProgress.finish(); + this.dataSourceIngestProgress = null; + } } - } - // Finish the file ingest progress bar, if it hasn't already - // been finished. - synchronized (this.fileIngestProgressLock) { - if (this.fileIngestProgress != null) { - this.fileIngestProgress.finish(); - this.fileIngestProgress = null; + // Finish the file ingest progress bar, if it hasn't already + // been finished. + synchronized (this.fileIngestProgressLock) { + if (this.fileIngestProgress != null) { + this.fileIngestProgress.finish(); + this.fileIngestProgress = null; + } } } @@ -906,18 +592,353 @@ final class DataSourceIngestJob { private void finish() { this.stage = DataSourceIngestJob.Stages.FINALIZATION; - // Finish the second stage data source ingest progress bar, if it hasn't - // already been finished. - synchronized (this.dataSourceIngestProgressLock) { - if (this.dataSourceIngestProgress != null) { - this.dataSourceIngestProgress.finish(); - this.dataSourceIngestProgress = null; + if (this.runInteractively) { + // Finish the second stage data source ingest progress bar, if it hasn't + // already been finished. + synchronized (this.dataSourceIngestProgressLock) { + if (this.dataSourceIngestProgress != null) { + this.dataSourceIngestProgress.finish(); + this.dataSourceIngestProgress = null; + } } } this.parentJob.dataSourceJobFinished(this); } + /** + * Passes the data source for this job through the currently active data + * source level ingest pipeline. + * + * @param task A data source ingest task wrapping the data source. + */ + void process(DataSourceIngestTask task) { + try { + synchronized (this.dataSourceIngestPipelineLock) { + if (!this.isCancelled() && !this.currentDataSourceIngestPipeline.isEmpty()) { + List errors = new ArrayList<>(); + errors.addAll(this.currentDataSourceIngestPipeline.process(task)); + if (!errors.isEmpty()) { + logIngestModuleErrors(errors); + } + } + } + + if (this.runInteractively) { + /** + * Shut down the data source ingest progress bar right away. + * Data source-level processing is finished for this stage. + */ + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.finish(); + this.dataSourceIngestProgress = null; + } + } + } + + } finally { + DataSourceIngestJob.taskScheduler.notifyTaskCompleted(task); + this.checkForStageCompleted(); + } + } + + /** + * Passes a file from the data source for this job through the file level + * ingest pipeline. + * + * @param task A file ingest task. + * @throws InterruptedException if the thread executing this code is + * interrupted while blocked on taking from or putting to the file ingest + * pipelines collection. + */ + void process(FileIngestTask task) throws InterruptedException { + try { + if (!this.isCancelled()) { + FileIngestPipeline pipeline = this.fileIngestPipelinesQueue.take(); + if (!pipeline.isEmpty()) { + AbstractFile file = task.getFile(); + ++this.processedFiles; + + if (this.runInteractively) { + /** + * Update the file ingest progress bar. + */ + synchronized (this.fileIngestProgressLock) { + if (this.processedFiles <= this.estimatedFilesToProcess) { + this.fileIngestProgress.progress(file.getName(), (int) this.processedFiles); + } else { + this.fileIngestProgress.progress(file.getName(), (int) this.estimatedFilesToProcess); + } + this.filesInProgress.add(file.getName()); + } + } + + /** + * Run the file through the pipeline. + */ + List errors = new ArrayList<>(); + errors.addAll(pipeline.process(task)); + if (!errors.isEmpty()) { + logIngestModuleErrors(errors); + } + + if (this.runInteractively && !this.cancelled) { + synchronized (this.fileIngestProgressLock) { + /** + * Update the file ingest progress bar again, in + * case the file was being displayed. + */ + this.filesInProgress.remove(file.getName()); + if (this.filesInProgress.size() > 0) { + this.fileIngestProgress.progress(this.filesInProgress.get(0)); + } else { + this.fileIngestProgress.progress(""); + } + } + } + } + this.fileIngestPipelinesQueue.put(pipeline); + } + } finally { + DataSourceIngestJob.taskScheduler.notifyTaskCompleted(task); + this.checkForStageCompleted(); + } + } + + /** + * Adds more files from the data source for this job to the job, i.e., adds + * extracted or carved files. Not currently supported for the second stage + * of the job. + * + * @param files A list of the files to add. + */ + void addFiles(List files) { + if (DataSourceIngestJob.Stages.FIRST == this.stage) { // RJCTODO: Is this synchronized correctly + for (AbstractFile file : files) { + DataSourceIngestJob.taskScheduler.scheduleFileIngestTask(this, file); + } + } else { + DataSourceIngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS + } + + /** + * The intended clients of this method are ingest modules running code + * on an ingest thread that is holding a reference to an ingest task, in + * which case a completion check would not be necessary, so this is a + * bit of defensive programming. + */ + this.checkForStageCompleted(); + } + + /** + * Updates the display name shown on the current data source level ingest + * progress bar for this job. + * + * @param displayName The new display name. + */ + void updateDataSourceIngestProgressBarDisplayName(String displayName) { + if (this.runInteractively && !this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + this.dataSourceIngestProgress.setDisplayName(displayName); + } + } + } + + /** + * Switches the data source level ingest progress bar for this job to + * determinate mode. This should be called if the total work units to + * process the data source is known. + * + * @param workUnits Total number of work units for the processing of the + * data source. + */ + void switchDataSourceIngestProgressBarToDeterminate(int workUnits) { + if (this.runInteractively && !this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.switchToDeterminate(workUnits); + } + } + } + } + + /** + * Switches the data source level ingest progress bar for this job to + * indeterminate mode. This should be called if the total work units to + * process the data source is unknown. + */ + void switchDataSourceIngestProgressBarToIndeterminate() { + if (this.runInteractively && !this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.switchToIndeterminate(); + } + } + } + } + + /** + * Updates the data source level ingest progress bar for this job with the + * number of work units performed, if in the determinate mode. + * + * @param workUnits Number of work units performed. + */ + void advanceDataSourceIngestProgressBar(int workUnits) { + if (this.runInteractively && !this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.progress("", workUnits); + } + } + } + } + + /** + * Updates the data source level ingest progress for this job with a new + * task name, where the task name is the "subtitle" under the display name. + * + * @param currentTask The task name. + */ + void advanceDataSourceIngestProgressBar(String currentTask) { + if (this.runInteractively && !this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.progress(currentTask); + } + } + } + } + + /** + * Updates the data source level ingest progress bar for this with a new + * task name and the number of work units performed, if in the determinate + * mode. The task name is the "subtitle" under the display name. + * + * @param currentTask The task name. + * @param workUnits Number of work units performed. + */ + void advanceDataSourceIngestProgressBar(String currentTask, int workUnits) { + if (this.runInteractively && !this.cancelled) { + synchronized (this.fileIngestProgressLock) { + this.dataSourceIngestProgress.progress(currentTask, workUnits); + } + } + } + + /** + * Queries whether or not a temporary cancellation of data source level + * ingest in order to stop the currently executing data source level ingest + * module is in effect for this job. + * + * @return True or false. + */ + boolean currentDataSourceIngestModuleIsCancelled() { + return this.currentDataSourceIngestModuleCancelled; + } + + /** + * Rescind a temporary cancellation of data source level ingest that was + * used to stop a single data source level ingest module fro this job. + */ + void currentDataSourceIngestModuleCancellationCompleted() { + this.currentDataSourceIngestModuleCancelled = false; + + if (this.runInteractively) { + /** + * A new progress bar must be created because the cancel button of + * the previously constructed component is disabled by NetBeans when + * the user selects the "OK" button of the cancellation confirmation + * dialog popped up by NetBeans when the progress bar cancel button + * is pressed. + */ + synchronized (this.dataSourceIngestProgressLock) { + this.dataSourceIngestProgress.finish(); + this.dataSourceIngestProgress = null; + this.startDataSourceIngestProgressBar(); + } + } + } + + /** + * Gets the currently running data source level ingest module for this job. + * + * @return The currently running module, may be null. + */ + DataSourceIngestPipeline.PipelineModule getCurrentDataSourceIngestModule() { + if (null != this.currentDataSourceIngestPipeline) { + return this.currentDataSourceIngestPipeline.getCurrentlyRunningModule(); + } else { + return null; + } + } + + /** + * Requests a temporary cancellation of data source level ingest for this + * job in order to stop the currently executing data source ingest module. + */ + void cancelCurrentDataSourceIngestModule() { + this.currentDataSourceIngestModuleCancelled = true; + } + + /** + * Requests cancellation of ingest, i.e., a shutdown of the data source + * level and file level ingest pipelines. + */ + void cancel() { + if (this.runInteractively) { + /** + * Put a cancellation message on data source level ingest progress + * bar, if it is still running. + */ + synchronized (this.dataSourceIngestProgressLock) { + if (dataSourceIngestProgress != null) { + final String displayName = NbBundle.getMessage(this.getClass(), + "IngestJob.progress.dataSourceIngest.initialDisplayName", + dataSource.getName()); + dataSourceIngestProgress.setDisplayName( + NbBundle.getMessage(this.getClass(), + "IngestJob.progress.cancelling", + displayName)); + } + } + + /** + * Put a cancellation message on the file level ingest progress bar, + * if it is still running. + */ + synchronized (this.fileIngestProgressLock) { + if (this.fileIngestProgress != null) { + final String displayName = NbBundle.getMessage(this.getClass(), + "IngestJob.progress.fileIngest.displayName", + this.dataSource.getName()); + this.fileIngestProgress.setDisplayName( + NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", + displayName)); + } + } + } + + this.cancelled = true; + + /** + * Tell the task scheduler to cancel all pending tasks, i.e., tasks not + * not being performed by an ingest thread. + */ + DataSourceIngestJob.taskScheduler.cancelPendingTasksForIngestJob(this); + this.checkForStageCompleted(); + } + + /** + * Queries whether or not cancellation, i.e., a shutdown of the data source + * level and file level ingest pipelines for this job, has been requested. + * + * @return True or false. + */ + boolean isCancelled() { + return this.cancelled; + } + /** * Write ingest module errors to the log. * @@ -929,27 +950,6 @@ final class DataSourceIngestJob { } } - /** - * Gets the currently running data source level ingest module. - * - * @return The currently running module, may be null. - */ - DataSourceIngestPipeline.PipelineModule getCurrentDataSourceIngestModule() { - if (null != this.currentDataSourceIngestPipeline) { - return this.currentDataSourceIngestPipeline.getCurrentlyRunningModule(); - } else { - return null; - } - } - - /** - * Requests a temporary cancellation of data source level ingest in order to - * stop the currently executing data source ingest module. - */ - private void cancelCurrentDataSourceIngestModule() { - this.currentDataSourceIngestModuleCancelled = true; - } - /** * Gets a snapshot of this jobs state and performance. * @@ -960,50 +960,49 @@ final class DataSourceIngestJob { } /** - * Stores basic diagnostic statistics for an ingest job. + * Stores basic diagnostic statistics for a data source ingest job. */ class Snapshot { - private final long jobId; private final String dataSource; - private final long startTime; - private final DataSourceIngestPipeline.PipelineModule dataSourceLevelModule; - private final boolean fileIngestRunning; - private final Date fileIngestStartTime; + private final long jobId; + private final long jobStartTime; + private final long snapShotTime; + private final DataSourceIngestPipeline.PipelineModule dataSourceLevelIngestModule; + private boolean fileIngestRunning; + private Date fileIngestStartTime; private final long processedFiles; private final long estimatedFilesToProcess; - private final long snapShotTime; private final IngestTasksScheduler.IngestJobTasksSnapshot tasksSnapshot; /** - * Constructs an object to store basic diagnostic statistics for an - * ingest job. + * Constructs an object to store basic diagnostic statistics for a data + * source ingest job. */ Snapshot() { - this.jobId = DataSourceIngestJob.this.id; this.dataSource = DataSourceIngestJob.this.dataSource.getName(); - this.startTime = DataSourceIngestJob.this.createTime; + this.jobId = DataSourceIngestJob.this.id; + this.jobStartTime = DataSourceIngestJob.this.createTime; + this.dataSourceLevelIngestModule = DataSourceIngestJob.this.getCurrentDataSourceIngestModule(); /** - * Get a snapshot of data source level ingest. + * Determine whether file ingest is running at the time of this + * snapshot and determine the earliest file ingest level pipeline + * start time, if file ingest was started at all. */ - this.dataSourceLevelModule = DataSourceIngestJob.this.getCurrentDataSourceIngestModule(); - - /** - * Get a snapshot of file level ingest. - */ - boolean fileIngestIsRunning = false; - Date fileIngestStart = null; for (FileIngestPipeline pipeline : DataSourceIngestJob.this.fileIngestPipelines) { if (pipeline.isRunning()) { - fileIngestIsRunning = true; - if (null == fileIngestStart || pipeline.getStartTime().before(fileIngestStart)) { - fileIngestStart = pipeline.getStartTime(); - } + this.fileIngestRunning = true; + } + Date pipelineStartTime = pipeline.getStartTime(); + if (null != pipelineStartTime && (null == this.fileIngestStartTime || pipelineStartTime.before(this.fileIngestStartTime))) { + this.fileIngestStartTime = pipelineStartTime; } } - this.fileIngestRunning = fileIngestIsRunning; - this.fileIngestStartTime = fileIngestStart; + + /** + * Get processed file statistics. + */ synchronized (DataSourceIngestJob.this.fileIngestProgressLock) { this.processedFiles = DataSourceIngestJob.this.processedFiles; this.estimatedFilesToProcess = DataSourceIngestJob.this.estimatedFilesToProcess; @@ -1017,13 +1016,13 @@ final class DataSourceIngestJob { } /** - * Gets the identifier of the ingest job that is the subject of this - * snapshot. + * Gets time these statistics were collected. * - * @return The ingest job id. + * @return The statistics collection time as number of milliseconds + * since January 1, 1970, 00:00:00 GMT. */ - long getJobId() { - return this.jobId; + long getSnapshotTime() { + return snapShotTime; } /** @@ -1037,13 +1036,13 @@ final class DataSourceIngestJob { } /** - * Gets files per second throughput since the ingest job that is the - * subject of this snapshot started. + * Gets the identifier of the ingest job that is the subject of this + * snapshot. * - * @return Files processed per second (approximate). + * @return The ingest job id. */ - double getSpeed() { - return (double) processedFiles / ((snapShotTime - startTime) / 1000); + long getJobId() { + return this.jobId; } /** @@ -1052,18 +1051,30 @@ final class DataSourceIngestJob { * @return The start time as number of milliseconds since January 1, * 1970, 00:00:00 GMT. */ - long getStartTime() { - return startTime; + long getJobStartTime() { + return jobStartTime; + } + + DataSourceIngestPipeline.PipelineModule getDataSourceLevelIngestModule() { + return this.dataSourceLevelIngestModule; + } + + boolean fileIngestIsRunning() { + return this.fileIngestRunning; + } + + Date fileIngestStartTime() { + return this.fileIngestStartTime; } /** - * Gets time these statistics were collected. + * Gets files per second throughput since the ingest job that is the + * subject of this snapshot started. * - * @return The statistics collection time as number of milliseconds - * since January 1, 1970, 00:00:00 GMT. + * @return Files processed per second (approximate). */ - long getSnapshotTime() { - return snapShotTime; + double getSpeed() { + return (double) processedFiles / ((snapShotTime - jobStartTime) / 1000); } /** diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java index 6dcc3ce0bc..2cd1f65eb0 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestPipeline.java @@ -29,32 +29,27 @@ import org.sleuthkit.datamodel.Content; * source ingest job. It starts the modules, runs data sources through them, and * shuts them down when data source level ingest is complete. *

- * This class is not thread-safe. + * This class is thread-safe. */ final class DataSourceIngestPipeline { private static final IngestManager ingestManager = IngestManager.getInstance(); private final DataSourceIngestJob job; private final List modules = new ArrayList<>(); - private volatile PipelineModule currentModule; private boolean running; + private volatile PipelineModule currentModule; /** * Constructs an object that manages a sequence of data source level ingest * modules. It starts the modules, runs data sources through them, and shuts * them down when data source level ingest is complete. * - * @param job The data source ingest job to which this pipeline belongs. - * @param moduleTemplates The ingest module templates that define the - * pipeline. + * @param job The data source ingest job that owns this pipeline. + * @param moduleTemplates Templates for the creating the ingest modules that + * make up this pipeline. */ DataSourceIngestPipeline(DataSourceIngestJob job, List moduleTemplates) { this.job = job; - - /** - * Create a data source level ingest module instance from each ingest - * module template. - */ for (IngestModuleTemplate template : moduleTemplates) { if (template.isDataSourceIngestModuleTemplate()) { PipelineModule module = new PipelineModule(template.createDataSourceIngestModule(), template.getModuleName()); @@ -73,11 +68,11 @@ final class DataSourceIngestPipeline { } /** - * Starts up the ingest module in this pipeline. + * Starts up the ingest modules in this pipeline. * * @return A list of ingest module startup errors, possibly empty. */ - List startUp() { + synchronized List startUp() { if (this.running) { throw new IllegalStateException("Attempt to start up a pipeline that is already running"); //NON-NLS } @@ -100,7 +95,7 @@ final class DataSourceIngestPipeline { * be processed. * @return A list of processing errors, possible empty. */ - List process(DataSourceIngestTask task) { + synchronized List process(DataSourceIngestTask task) { if (!this.running) { throw new IllegalStateException("Attempt to process with pipeline that is not running"); //NON-NLS } @@ -109,14 +104,13 @@ final class DataSourceIngestPipeline { Content dataSource = task.getDataSource(); for (PipelineModule module : modules) { try { - module.setStartTime(); this.currentModule = module; String displayName = NbBundle.getMessage(this.getClass(), "IngestJob.progress.dataSourceIngest.displayName", module.getDisplayName(), dataSource.getName()); this.job.updateDataSourceIngestProgressBarDisplayName(displayName); this.job.switchDataSourceIngestProgressBarToIndeterminate(); - ingestManager.setIngestTaskProgress(task, module.getDisplayName()); + DataSourceIngestPipeline.ingestManager.setIngestTaskProgress(task, module.getDisplayName()); module.process(dataSource, new DataSourceIngestModuleProgress(this.job)); } catch (Throwable ex) { // Catch-all exception firewall errors.add(new IngestModuleError(module.getDisplayName(), ex)); @@ -135,7 +129,7 @@ final class DataSourceIngestPipeline { /** * Gets the currently running module. * - * @return The module, possibly null. + * @return The module, possibly null if no module is currently running. */ PipelineModule getCurrentlyRunningModule() { return this.currentModule; @@ -143,17 +137,17 @@ final class DataSourceIngestPipeline { /** * This class decorates a data source level ingest module with a display - * name and a start time. + * name and a processing start time. */ static class PipelineModule implements DataSourceIngestModule { private final DataSourceIngestModule module; private final String displayName; - private Date startTime; + private volatile Date processingStartTime; /** * Constructs an object that decorates a data source level ingest module - * with a display name and a running time. + * with a display name and a processing start time. * * @param module The data source level ingest module to be decorated. * @param displayName The display name. @@ -161,7 +155,7 @@ final class DataSourceIngestPipeline { PipelineModule(DataSourceIngestModule module, String displayName) { this.module = module; this.displayName = displayName; - this.startTime = new Date(); + this.processingStartTime = new Date(); } /** @@ -174,7 +168,7 @@ final class DataSourceIngestPipeline { } /** - * Gets a module name suitable for display in a UI. + * Gets the display of the decorated ingest module. * * @return The display name. */ @@ -182,21 +176,15 @@ final class DataSourceIngestPipeline { return this.displayName; } - /** - * Sets the start time to the current time. - */ - void setStartTime() { - this.startTime = new Date(); - } - /** * Gets the time the decorated ingest module started processing the data * source. * - * @return The start time. + * @return The start time, will be null if the module has not started + * processing the data source yet. */ - Date getStartTime() { - return this.startTime; + Date getProcessingStartTime() { + return this.processingStartTime; } /** @@ -212,7 +200,7 @@ final class DataSourceIngestPipeline { */ @Override public IngestModule.ProcessResult process(Content dataSource, DataSourceIngestModuleProgress statusHelper) { - this.startTime = new Date(); + this.processingStartTime = new Date(); return this.module.process(dataSource, statusHelper); } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java index f322913690..8a0b2325f4 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/FileIngestPipeline.java @@ -28,9 +28,7 @@ import org.sleuthkit.datamodel.AbstractFile; * ingest job. It starts the modules, runs files through them, and shuts them * down when file level ingest is complete. *

- * This class is not thread-safe, it is intended to be used by one file ingest - * thread at at time. However, the running flag is volatile since it may be read - * by another thread looking for a progress snapshot. + * This class is thread-safe. */ final class FileIngestPipeline { @@ -51,11 +49,6 @@ final class FileIngestPipeline { */ FileIngestPipeline(DataSourceIngestJob job, List moduleTemplates) { this.job = job; - - /** - * Create an ingest module instance from each file ingest module - * template and add it to the pipeline. - */ for (IngestModuleTemplate template : moduleTemplates) { if (template.isFileIngestModuleTemplate()) { PipelineModule module = new PipelineModule(template.createFileIngestModule(), template.getModuleName()); @@ -65,7 +58,7 @@ final class FileIngestPipeline { } /** - * Indicates whether or not there are any ingest modules in this pipeline. + * Queries whether or not there are any ingest modules in this pipeline. * * @return True or false. */ @@ -74,7 +67,7 @@ final class FileIngestPipeline { } /** - * Queries whether or not this file ingest level pipeline is running. + * Queries whether or not this pipeline is running. * * @return True or false. */ @@ -82,18 +75,28 @@ final class FileIngestPipeline { return this.running; } + /** + * Returns the start up time of this pipeline. + * + * @return The file processing start time, may be null if this pipeline has + * not been started yet. + */ + Date getStartTime() { + return this.startTime; + } + /** * Starts up all of the ingest modules in the pipeline. * * @return List of start up errors, possibly empty. */ - List startUp() { + synchronized List startUp() { if (this.running) { throw new IllegalStateException("Attempt to start up a pipeline that is already running"); //NON-NLS } this.running = true; this.startTime = new Date(); - + List errors = new ArrayList<>(); for (PipelineModule module : this.modules) { try { @@ -105,22 +108,13 @@ final class FileIngestPipeline { return errors; } - /** - * Returns the start up time of the pipeline. - * - * @return The file processing start time, may be null. - */ - Date getStartTime() { - return this.startTime; - } - /** * Runs a file through the ingest modules in sequential order. * * @param task A file level ingest task containing a file to be processed. * @return A list of processing errors, possible empty. */ - List process(FileIngestTask task) { + synchronized List process(FileIngestTask task) { if (!this.running) { throw new IllegalStateException("Attempt to process file with pipeline that is not running"); //NON-NLS } @@ -151,7 +145,7 @@ final class FileIngestPipeline { * * @return A list of shut down errors, possibly empty. */ - List shutDown() { + synchronized List shutDown() { if (!this.running) { throw new IllegalStateException("Attempt to shut down a pipeline that is not running"); //NON-NLS } @@ -198,7 +192,7 @@ final class FileIngestPipeline { } /** - * Gets display name of the decorated ingest module. + * Gets the display name of the decorated ingest module. * * @return The display name. */ diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 01ebfb0765..d6725b8220 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -30,6 +30,8 @@ import org.sleuthkit.datamodel.Content; /** * Runs a collection of data sources through a set of ingest modules specified * via ingest job settings. + *

+ * This class is thread-safe. */ public final class IngestJob { @@ -44,13 +46,14 @@ public final class IngestJob { * * @param dataSources The data sources to be ingested. * @param settings The ingest job settings. - * @param doUI Whether or not to do UI interactions. + * @param runInteractively Whether or not this job should use progress bars, + * message boxes for errors, etc. */ - IngestJob(Collection dataSources, IngestJobSettings settings, boolean doUI) { + IngestJob(Collection dataSources, IngestJobSettings settings, boolean runInteractively) { this.id = IngestJob.nextId.getAndIncrement(); this.dataSourceJobs = new HashMap<>(); for (Content dataSource : dataSources) { - DataSourceIngestJob dataSourceIngestJob = new DataSourceIngestJob(this, dataSource, settings, doUI); + DataSourceIngestJob dataSourceIngestJob = new DataSourceIngestJob(this, dataSource, settings, runInteractively); this.dataSourceJobs.put(dataSourceIngestJob.getId(), dataSourceIngestJob); } } @@ -65,40 +68,73 @@ public final class IngestJob { } /** - * Gets a snapshot of the state and performance of this ingest job. + * Checks to see if this ingest job has at least one ingest pipeline when + * its settings are applied. + * + * @return True or false. + */ + boolean hasIngestPipeline() { + for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { + if (dataSourceJob.hasIngestPipeline()) { + return true; + } + } + return false; + } + + /** + * Starts this ingest job by starting its ingest module pipelines and + * scheduling the ingest tasks that make up the job. + * + * @return A collection of ingest module start up errors, empty on success. + */ + synchronized List start() { + List errors = new ArrayList<>(); + for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { + errors.addAll(dataSourceJob.start()); + if (!errors.isEmpty()) { + // RJCTODO: Need to let sucessfully started data source ingest + // jobs know they should shut down. + break; + } + } + return errors; + } + + /** + * Gets a snapshot of the progress of this ingest job. * * @return The snapshot. */ synchronized public ProgressSnapshot getSnapshot() { - /** - * There are race conditions in the code that follows, but they are not - * important because this is just a coarse-grained status report. If - * stale data is returned in any single snapshot, it will be corrected - * in subsequent snapshots. - */ DataSourceIngestModuleHandle moduleHandle = null; - boolean fileIngestRunning = false; + boolean fileIngestIsRunning = false; Date fileIngestStartTime = null; - for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { - DataSourceIngestPipeline.PipelineModule module = dataSourceJob.getCurrentDataSourceIngestModule(); - if (null != module) { - moduleHandle = new DataSourceIngestModuleHandle(dataSourceJob.getId(), module); + for (DataSourceIngestJob.Snapshot snapshot : this.getDataSourceIngestJobSnapshots()) { + if (null != moduleHandle) { + DataSourceIngestPipeline.PipelineModule module = snapshot.getDataSourceLevelIngestModule(); + if (null != module) { + moduleHandle = new DataSourceIngestModuleHandle(this.dataSourceJobs.get(snapshot.getJobId()), module); + } + } + if (snapshot.fileIngestIsRunning()) { + fileIngestIsRunning = true; + } + Date childFileIngestStartTime = snapshot.fileIngestStartTime(); + if (null != childFileIngestStartTime && (null == fileIngestStartTime || childFileIngestStartTime.before(fileIngestStartTime))) { + fileIngestStartTime = childFileIngestStartTime; } - - // RJCTODO: For each data source job, check for a running flag and - // get the oldest start data for the start dates, if any. } - - return new ProgressSnapshot(moduleHandle, fileIngestRunning, fileIngestStartTime, this.cancelled); + return new ProgressSnapshot(moduleHandle, fileIngestIsRunning, fileIngestStartTime, this.cancelled); } /** - * Gets snapshots of the state and performance of this ingest job's child - * data source ingest jobs. + * Gets snapshots of the progress of each of this ingest job's child data + * source ingest jobs. * * @return A list of data source ingest job progress snapshots. */ - synchronized List getDetailedSnapshot() { // RJCTODO: Consider renaming + synchronized List getDataSourceIngestJobSnapshots() { List snapshots = new ArrayList<>(); for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { snapshots.add(dataSourceJob.getSnapshot()); @@ -107,21 +143,10 @@ public final class IngestJob { } /** - * Requests cancellation of a specific data source level ingest module. - * Returns immediately, but there may be a delay before the ingest module - * responds by stopping processing, if it is still running when the request - * is made. - * - * @param module The handle of the data source ingest module to be canceled, - * which can obtained from a progress snapshot. - */ - -// RJCTODO - - /** - * Requests cancellation of the data source level and file level ingest - * modules of this ingest job. Returns immediately, but there may be a delay - * before all of the ingest modules respond by stopping processing. + * Requests cancellation of this ingest job, which means discarding + * unfinished tasks and stopping the ingest pipelines. Returns immediately, + * but there may be a delay before all of the ingest modules in the + * pipelines respond by stopping processing. */ synchronized public void cancel() { for (DataSourceIngestJob job : this.dataSourceJobs.values()) { @@ -131,8 +156,8 @@ public final class IngestJob { } /** - * Queries whether or not cancellation of the data source level and file - * level ingest modules of this ingest job has been requested. + * Queries whether or not cancellation of this ingest job has been + * requested. * * @return True or false. */ @@ -141,7 +166,20 @@ public final class IngestJob { } /** - * A snapshot of ingest job progress. + * Provides a callback for completed data source ingest jobs, allowing this + * ingest job to notify the ingest manager when it is complete. + * + * @param dataSourceIngestJob A completed data source ingest job. + */ + synchronized void dataSourceJobFinished(DataSourceIngestJob dataSourceIngestJob) { + this.dataSourceJobs.remove(dataSourceIngestJob.getId()); + if (this.dataSourceJobs.isEmpty()) { + IngestManager.getInstance().finishIngestJob(this); + } + } + + /** + * A snapshot of the progress of an ingest job. */ public static final class ProgressSnapshot { @@ -154,11 +192,11 @@ public final class IngestJob { * Constructs a snapshot of ingest job progress. * * @param dataSourceModule The currently running data source level - * ingest module, may be null + * ingest module, may be null. * @param fileIngestRunning Whether or not file ingest is currently * running. * @param fileIngestStartTime The start time of file level ingest, may - * be null + * be null. * @param cancelled Whether or not a cancellation request has been * issued. */ @@ -171,21 +209,17 @@ public final class IngestJob { /** * Gets a handle to the currently running data source level ingest - * module at the time the snapshot is taken. + * module at the time the snapshot was taken. * * @return The handle, may be null. */ public DataSourceIngestModuleHandle runningDataSourceIngestModule() { - /** - * It is safe to hand out this reference because the object is - * immutable. - */ return this.dataSourceModule; } /** - * Queries whether or not file level ingest is running at the time the - * snapshot is taken. + * Queries whether or not file level ingest was running at the time the + * snapshot was taken. * * @return True or false. */ @@ -203,7 +237,8 @@ public final class IngestJob { } /** - * Queries whether or not a cancellation request has been issued. + * Queries whether or not a cancellation request had been issued at the + * time the snapshot was taken. * * @return True or false. */ @@ -215,21 +250,25 @@ public final class IngestJob { /** * A handle to a data source level ingest module that can be used to get - * basic information about the module and to request cancellation, i.e., - * shut down, of the module. + * basic information about the module and to request cancellation of the + * module. */ public static class DataSourceIngestModuleHandle { - private final long dataSourceIngestJobId; + private final DataSourceIngestJob job; private final DataSourceIngestPipeline.PipelineModule module; /** * Constructs a handle to a data source level ingest module that can be * used to get basic information about the module and to request * cancellation of the module. + * + * @param DataSourceIngestJob The data source ingest job that owns the + * data source level ingest module. + * @param module The data source level ingest module. */ - private DataSourceIngestModuleHandle(long dataSourceIngestJobId, DataSourceIngestPipeline.PipelineModule module) { - this.dataSourceIngestJobId = dataSourceIngestJobId; + private DataSourceIngestModuleHandle(DataSourceIngestJob job, DataSourceIngestPipeline.PipelineModule module) { + this.job = job; this.module = module; } @@ -247,60 +286,34 @@ public final class IngestJob { * Returns the time the data source level ingest module associated with * this handle began processing. * - * @return The module start time. + * @return The module processing start time. */ public Date startTime() { - return this.module.getStartTime(); + return this.module.getProcessingStartTime(); } - + + /** + * Requests cancellation of the ingest module associated with this + * handle. Returns immediately, but there may be a delay before the + * ingest module responds by stopping processing. + */ public void cancel() { - // RJCTODO: - } - - } - - /** - * Starts up the ingest pipelines and ingest progress bars for this job. - * - * @return A collection of ingest module start up errors, empty on success. - */ - List start() { - boolean hasIngestPipeline = false; - List errors = new ArrayList<>(); - for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { - errors.addAll(dataSourceJob.start()); - hasIngestPipeline = dataSourceJob.hasIngestPipeline(); - } - return errors; - } - - /** - * Checks to see if this ingest job has at least one ingest pipeline. - * - * @return True or false. - */ - boolean hasIngestPipeline() { - boolean hasIngestPipeline = false; - for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { - if (dataSourceJob.hasIngestPipeline()) { - hasIngestPipeline = true; - break; + /** + * TODO: Cancellation needs to be more precise. The long-term + * solution is to add a cancel() method to IngestModule and do away + * with the cancellation queries of IngestJobContext. However, until + * an API change is legal, a cancel() method can be added to the + * DataSourceIngestModuleAdapter and FileIngestModuleAdapter classes + * and an instanceof check can be used to call it, with this code as + * the default implementation and the fallback. All of the ingest + * modules participating in this workaround will need to consult the + * cancelled flag in the adapters. + */ + if (this.job.getCurrentDataSourceIngestModule() == this.module) { + this.job.cancelCurrentDataSourceIngestModule(); } } - return hasIngestPipeline; - } - /** - * Provides a callback for completed data source ingest jobs, allowing the - * ingest job to notify the ingest manager when it is complete. - * - * @param dataSourceIngestJob A completed data source ingest job. - */ - synchronized void dataSourceJobFinished(DataSourceIngestJob dataSourceIngestJob) { - this.dataSourceJobs.remove(dataSourceIngestJob.getId()); - if (this.dataSourceJobs.isEmpty()) { - IngestManager.getInstance().finishJob(this); - } } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobConfigurator.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobConfigurator.java index 63ed560b30..258e01c5ad 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobConfigurator.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobConfigurator.java @@ -87,6 +87,6 @@ public final class IngestJobConfigurator { */ @Deprecated public void startIngestJobs(List dataSources) { - IngestManager.getInstance().startIngestJob(dataSources, this.settings, true); + IngestManager.getInstance().queueIngestJob(dataSources, this.settings, true); } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index d1c56e3967..53be42ce12 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -56,15 +56,17 @@ public class IngestManager { private static IngestManager instance = null; /** - * The ingest manager maintains a mapping of ingest job IDs to ingest jobs. + * The ingest manager maintains a mapping of ingest job IDs to running + * ingest jobs. This, in combination with a mapping of thread IDs to + * Callable ingest job starters, determines whether or not ingest is + * running. */ - private final ConcurrentHashMap jobsById = new ConcurrentHashMap<>(); + private final HashMap jobsById = new HashMap<>(); /** * Each runnable/callable task the ingest manager submits to its thread * pools is given a unique thread/task ID. */ - // TODO: It is no longer necessary to have multiple thread pools. private final AtomicLong nextThreadId = new AtomicLong(0L); /** @@ -73,7 +75,7 @@ public class IngestManager { * ingest job starter is maintained to provide handles that can be used to * cancel the ingest job starter. */ - private final ConcurrentHashMap> ingestJobStarters = new ConcurrentHashMap<>(); + private final HashMap> ingestJobStarters = new HashMap<>(); private final ExecutorService startIngestJobsThreadPool = Executors.newSingleThreadExecutor(); /** @@ -83,11 +85,11 @@ public class IngestManager { * ingest task executers. There is a single data source level ingest thread * and a user configurable number of file level ingest threads. */ + private final ExecutorService dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(); private static final int MIN_NUMBER_OF_FILE_INGEST_THREADS = 1; private static final int MAX_NUMBER_OF_FILE_INGEST_THREADS = 16; private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2; - private int numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; - private final ExecutorService dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(); + private int numberOfFileIngestThreads; private final ExecutorService fileIngestThreadPool; /** @@ -202,6 +204,31 @@ public class IngestManager { return instance; } + /** + * Constructs a manager of the creation and execution of ingest jobs, i.e., + * the processing of data sources by ingest modules. The manager immediately + * submits ingest task executers (Callable objects) to the data source level + * ingest and file level ingest thread pools. The ingest task executers are + * simple consumers that will normally run as long as the application runs. + */ + private IngestManager() { + long threadId = nextThreadId.incrementAndGet(); + dataSourceIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); + ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); + + numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads(); + if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) { + numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; + UserPreferences.setNumberOfFileIngestThreads(numberOfFileIngestThreads); + } + fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads); + for (int i = 0; i < numberOfFileIngestThreads; ++i) { + threadId = nextThreadId.incrementAndGet(); + fileIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue())); + ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); + } + } + /** * Gets the number of file ingest threads the ingest manager will use to do * ingest jobs. @@ -212,17 +239,127 @@ public class IngestManager { return numberOfFileIngestThreads; } + private void subscribeToCaseEvents() { + Case.addPropertyChangeListener(new PropertyChangeListener() { + @Override + public void propertyChange(PropertyChangeEvent event) { + if (event.getPropertyName().equals(Case.Events.CURRENT_CASE.toString())) { + if (event.getNewValue() != null) { + handleCaseOpened(); + } else { + handleCaseClosed(); + } + } + } + }); + } + + void handleCaseOpened() { + this.jobCreationIsEnabled = true; + clearIngestMessageBox(); + } + + void handleCaseClosed() { + this.jobCreationIsEnabled = false; + cancelAllIngestJobs(); + clearIngestMessageBox(); + } + + /** + * Called by the custom installer for this package once the window system is + * initialized, allowing the ingest manager to get the top component used to + * display ingest messages. + */ + void initIngestMessageInbox() { + ingestMessageBox = IngestMessageTopComponent.findInstance(); + } + + /** + * Post a message to the ingest messages in box. + * + * @param message The message to be posted. + */ + // RJCTODO: Can I cut this off effectively? + void postIngestMessage(IngestMessage message) { + if (ingestMessageBox != null) { + if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) { + ingestMessageBox.displayMessage(message); + } else { + long errorPosts = ingestErrorMessagePosts.incrementAndGet(); + if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) { + ingestMessageBox.displayMessage(message); + } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) { + IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage( + NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"), + NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"), + NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS)); + ingestMessageBox.displayMessage(errorMessageLimitReachedMessage); + } + } + } + } + + private void clearIngestMessageBox() { + if (ingestMessageBox != null) { + ingestMessageBox.clearMessages(); + } + ingestErrorMessagePosts.set(0); + } + + /** + * Queues an ingest job that will process a collection of data sources. The + * job will be started on a worker thread. + * + * @param dataSources The data sources to process. + * @param settings The settings for the ingest job. + * @param runInteractively Whether or not this job should use progress bars, + * message boxes for errors, etc. + */ + public synchronized void queueIngestJob(Collection dataSources, IngestJobSettings settings, boolean runInteractively) { + if (this.jobCreationIsEnabled) { + IngestJob job = new IngestJob(dataSources, settings, runInteractively); + if (job.hasIngestPipeline()) { + long taskId = nextThreadId.incrementAndGet(); + Future task = startIngestJobsThreadPool.submit(new IngestJobStarter(taskId, job, runInteractively)); + ingestJobStarters.put(taskId, task); + } + } + } + + /** + * Starts an ingest job that will process a collection of data sources. + * + * @param dataSources The data sources to process. + * @param settings The settings for the ingest job. + * @param runInteractively Whether or not this job should use progress bars, + * message boxes for errors, etc. + * @return The ingest job that was started or null if the job could not be + * started. + */ + public synchronized IngestJob startIngestJob(Collection dataSources, IngestJobSettings settings, boolean runInteractively) { + IngestJob job = null; + if (this.jobCreationIsEnabled) { + job = new IngestJob(dataSources, settings, runInteractively); + if (job.hasIngestPipeline()) { + List errors = this.startIngestJob(job, runInteractively); + if (!errors.isEmpty()) { + job = null; + } + } + } + return job; + } + /** * Starts an ingest job for a collection of data sources. * - * @param dataSources The data sources to be processed. - * @param settings The ingest job settings. - * @param doUI Whether or not to support user interaction, e.g., showing - * message boxes and reporting progress through the NetBeans Progress API. - * @return The ingest job that was started + * @param dataSource The data sources to ingest. + * @param settings The settings for the job. + * @param runInteractively Whether or not to interact with the UI + * @return A collection of ingest module start up errors, empty on success. */ - public synchronized void startIngestJob(Collection dataSources, IngestJobSettings settings, boolean doUI) { - if (!isIngestRunning()) { + private List startIngestJob(IngestJob job, boolean runInteractively) { + if (runInteractively && jobsById.isEmpty()) { // RJCTODO: This is sort of broken clearIngestMessageBox(); } @@ -230,16 +367,25 @@ public class IngestManager { ingestMonitor.start(); } - if (doUI) { - /** - * Assume request is from code running on the EDT and dispatch to a - * worker thread. - */ - long taskId = nextThreadId.incrementAndGet(); - Future task = startIngestJobsThreadPool.submit(new IngestJobStarter(taskId, dataSources, settings, doUI)); - ingestJobStarters.put(taskId, task); + List errors = job.start(); + if (errors.isEmpty()) { + long jobId = job.getId(); + this.jobsById.put(jobId, job); + this.fireIngestJobStarted(jobId); + IngestManager.logger.log(Level.INFO, "Ingest job {0} started", jobId); + } + return errors; + } + + void finishIngestJob(IngestJob job) { + long jobId = job.getId(); + this.jobsById.remove(jobId); + if (!job.isCancelled()) { + IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); + this.fireIngestJobCompleted(jobId); } else { - this.startJob(dataSources, settings, doUI); + IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); + this.fireIngestJobCancelled(jobId); } } @@ -249,9 +395,7 @@ public class IngestManager { * @return True or false. */ public boolean isIngestRunning() { - // RJCTODO: This may return the wrong answer if an IngestJobStarter has - // been dispatched to the startIngestJobsThreadPool. - return !this.jobsById.isEmpty(); + return !this.jobsById.isEmpty() || !ingestJobStarters.values().isEmpty(); } /** @@ -332,123 +476,59 @@ public class IngestManager { } /** - * Constructs a manager of the creation and execution of ingest jobs, i.e., - * the processing of data sources by ingest modules. The manager immediately - * submits ingest task executers (Callable objects) to the data source level - * ingest and file level ingest thread pools. The ingest task executers are - * simple consumers that will normally run as long as the application runs. - */ - private IngestManager() { - startDataSourceIngestThread(); - - numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads(); - if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) { - numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; - UserPreferences.setNumberOfFileIngestThreads(numberOfFileIngestThreads); - } - fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads); - for (int i = 0; i < numberOfFileIngestThreads; ++i) { - startFileIngestThread(); - } - } - - /** - * Called by the custom installer for this package once the window system is - * initialized, allowing the ingest manager to get the top component used to - * display ingest messages. - */ - void initIngestMessageInbox() { - ingestMessageBox = IngestMessageTopComponent.findInstance(); - } - - /** - * Submits an ingest task executer Callable to the data source level ingest - * thread pool. - */ - private void startDataSourceIngestThread() { - long threadId = nextThreadId.incrementAndGet(); - dataSourceIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); - ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); - } - - /** - * Submits a ingest task executer Callable to the file level ingest thread - * pool. - */ - private void startFileIngestThread() { - long threadId = nextThreadId.incrementAndGet(); - fileIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue())); - ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); - } - - private void subscribeToCaseEvents() { - Case.addPropertyChangeListener(new PropertyChangeListener() { - @Override - public void propertyChange(PropertyChangeEvent event) { - if (event.getPropertyName().equals(Case.Events.CURRENT_CASE.toString())) { - if (event.getNewValue() != null) { - handleCaseOpened(); - } else { - handleCaseClosed(); - } - } - } - }); - } - - void handleCaseOpened() { - this.jobCreationIsEnabled = true; - clearIngestMessageBox(); - } - - void handleCaseClosed() { - this.jobCreationIsEnabled = false; - cancelAllIngestJobs(); - clearIngestMessageBox(); - } - - private void clearIngestMessageBox() { - if (ingestMessageBox != null) { - ingestMessageBox.clearMessages(); - } - ingestErrorMessagePosts.set(0); - } - - /** - * Starts an ingest job for a collection of data sources. + * Fire an ingest event signifying an ingest job started. * - * @param dataSource The data sources to ingest. - * @param settings The settings for the job. - * @param doUI Whether or not to interact with the UI - * @return A collection of ingest module start up errors, empty on success. + * @param ingestJobId The ingest job id. */ - private List startJob(Collection dataSources, IngestJobSettings settings, boolean doUI) { - List errors = new ArrayList<>(); - if (this.jobCreationIsEnabled) { - IngestJob job = new IngestJob(dataSources, settings, doUI); - long jobId = job.getId(); - this.jobsById.put(jobId, job); - errors = job.start(); - if (errors.isEmpty() && job.hasIngestPipeline()) { - this.fireIngestJobStarted(jobId); - IngestManager.logger.log(Level.INFO, "Ingest job {0} started", jobId); - } else { - this.jobsById.remove(jobId); - } - } - return errors; + void fireIngestJobStarted(long ingestJobId) { + fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null)); } - void finishJob(IngestJob job) { - long jobId = job.getId(); - this.jobsById.remove(jobId); - if (!job.isCancelled()) { - IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); - this.fireIngestJobCompleted(jobId); - } else { - IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); - this.fireIngestJobCancelled(jobId); - } + /** + * Fire an ingest event signifying an ingest job finished. + * + * @param ingestJobId The ingest job id. + */ + void fireIngestJobCompleted(long ingestJobId) { + fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null)); + } + + /** + * Fire an ingest event signifying an ingest job was canceled. + * + * @param ingestJobId The ingest job id. + */ + void fireIngestJobCancelled(long ingestJobId) { + fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null)); + } + + /** + * Fire an ingest event signifying the ingest of a file is completed. + * + * @param file The file that is completed. + */ + void fireFileIngestDone(AbstractFile file) { + fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file)); + } + + /** + * Fire an event signifying a blackboard post by an ingest module. + * + * @param moduleDataEvent A ModuleDataEvent with the details of the posting. + */ + void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) { + fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null)); + } + + /** + * Fire an event signifying discovery of additional content by an ingest + * module. + * + * @param moduleDataEvent A ModuleContentEvent with the details of the new + * content. + */ + void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) { + fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null)); } /** @@ -540,83 +620,16 @@ public class IngestManager { } /** - * Fire an ingest event signifying an ingest job started. + * Gets snapshots of the state of all running ingest jobs. * - * @param ingestJobId The ingest job id. + * @return A list of ingest job state snapshots. */ - void fireIngestJobStarted(long ingestJobId) { - fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null)); - } - - /** - * Fire an ingest event signifying an ingest job finished. - * - * @param ingestJobId The ingest job id. - */ - void fireIngestJobCompleted(long ingestJobId) { - fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null)); - } - - /** - * Fire an ingest event signifying an ingest job was canceled. - * - * @param ingestJobId The ingest job id. - */ - void fireIngestJobCancelled(long ingestJobId) { - fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null)); - } - - /** - * Fire an ingest event signifying the ingest of a file is completed. - * - * @param file The file that is completed. - */ - void fireFileIngestDone(AbstractFile file) { - fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file)); - } - - /** - * Fire an event signifying a blackboard post by an ingest module. - * - * @param moduleDataEvent A ModuleDataEvent with the details of the posting. - */ - void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) { - fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null)); - } - - /** - * Fire an event signifying discovery of additional content by an ingest - * module. - * - * @param moduleDataEvent A ModuleContentEvent with the details of the new - * content. - */ - void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) { - fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null)); - } - - /** - * Post a message to the ingest messages in box. - * - * @param message The message to be posted. - */ - void postIngestMessage(IngestMessage message) { - if (ingestMessageBox != null) { - if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) { - ingestMessageBox.displayMessage(message); - } else { - long errorPosts = ingestErrorMessagePosts.incrementAndGet(); - if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) { - ingestMessageBox.displayMessage(message); - } else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) { - IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage( - NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"), - NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"), - NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS)); - ingestMessageBox.displayMessage(errorMessageLimitReachedMessage); - } - } + List getIngestJobSnapshots() { + List snapShots = new ArrayList<>(); + for (IngestJob job : this.jobsById.values()) { + snapShots.addAll(job.getDataSourceIngestJobSnapshots()); } + return snapShots; } /** @@ -633,73 +646,56 @@ public class IngestManager { } } - /** - * Gets snapshots of the state of all running ingest jobs. - * - * @return A list of ingest job state snapshots. - */ - List getIngestJobSnapshots() { - List snapShots = new ArrayList<>(); - for (IngestJob job : this.jobsById.values()) { - snapShots.addAll(job.getDetailedSnapshot()); - } - return snapShots; - } - /** * Creates and starts an ingest job for a collection of data sources. */ private final class IngestJobStarter implements Callable { private final long threadId; - private final Collection dataSources; - private final IngestJobSettings settings; - private final boolean doStartupErrorsMsgBox; + private final IngestJob job; + private final boolean runInteractively; private ProgressHandle progress; - IngestJobStarter(long threadId, Collection dataSources, IngestJobSettings settings, boolean doMessageDialogs) { + IngestJobStarter(long threadId, IngestJob job, boolean runInteractively) { this.threadId = threadId; - this.dataSources = dataSources; - this.settings = settings; - this.doStartupErrorsMsgBox = doMessageDialogs; + this.job = job; + this.runInteractively = runInteractively; } @Override public Void call() { try { - /** - * Bail out if there is nothing to do or cancellation has been - * requested. - */ - if (this.dataSources.isEmpty() || Thread.currentThread().isInterrupted()) { + if (Thread.currentThread().isInterrupted()) { return null; } /** * Set up a progress bar. */ - final String displayName = NbBundle.getMessage(this.getClass(), - "IngestManager.StartIngestJobsTask.run.displayName"); - progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { - @Override - public boolean cancel() { - if (progress != null) { - progress.setDisplayName(NbBundle.getMessage(this.getClass(), - "IngestManager.StartIngestJobsTask.run.cancelling", - displayName)); + if (runInteractively) { + final String displayName = NbBundle.getMessage(this.getClass(), + "IngestManager.StartIngestJobsTask.run.displayName"); + this.progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() { + @Override + public boolean cancel() { + if (progress != null) { + progress.setDisplayName(NbBundle.getMessage(this.getClass(), + "IngestManager.StartIngestJobsTask.run.cancelling", + displayName)); + } + Future handle = ingestJobStarters.remove(threadId); + handle.cancel(true); + return true; } - Future handle = ingestJobStarters.remove(threadId); - handle.cancel(true); - return true; - } - }); - progress.start(); + }); + progress.start(); + } /** * Try to start the ingest job. */ - List errors = IngestManager.this.startJob(this.dataSources, this.settings, true); - if (!errors.isEmpty() && this.doStartupErrorsMsgBox) { + List errors = IngestManager.this.startIngestJob(job, runInteractively); + if (!errors.isEmpty() && this.runInteractively) { StringBuilder moduleStartUpErrors = new StringBuilder(); for (IngestModuleError error : errors) { String moduleName = error.getModuleDisplayName(); @@ -867,6 +863,7 @@ public class IngestManager { String getFileName() { return fileName; } + } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestModule.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestModule.java index 17bd6a0dbf..7485b71e4b 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestModule.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestModule.java @@ -81,4 +81,10 @@ public interface IngestModule { * @throws org.sleuthkit.autopsy.ingest.IngestModule.IngestModuleException */ void startUp(IngestJobContext context) throws IngestModuleException; + + /** + * TODO: The next time an API change is legal, add a cancel() method and + * remove the "ingest job is canceled" queries from the IngestJobContext + * class. + */ } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java index 0e9e97a96b..ce6f0e3c0a 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java @@ -202,7 +202,7 @@ public class IngestProgressSnapshotPanel extends javax.swing.JPanel { break; case 2: SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); - cellValue = dateFormat.format(new Date(snapShot.getStartTime())); + cellValue = dateFormat.format(new Date(snapShot.getJobStartTime())); break; case 3: cellValue = snapShot.getFilesProcessed(); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/RunIngestModulesDialog.java b/Core/src/org/sleuthkit/autopsy/ingest/RunIngestModulesDialog.java index b69e967ad9..9ed1f33489 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/RunIngestModulesDialog.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/RunIngestModulesDialog.java @@ -199,7 +199,7 @@ public final class RunIngestModulesDialog extends JDialog { ingestJobSettings.save(); showWarnings(ingestJobSettings); if (startIngestJob) { - IngestManager.getInstance().startIngestJob(RunIngestModulesDialog.this.dataSources, ingestJobSettings, true); + IngestManager.getInstance().queueIngestJob(RunIngestModulesDialog.this.dataSources, ingestJobSettings, true); } setVisible(false); dispose();