From 3b9b10ea2ae4488a858de5ef030e3edbc598be83 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Mon, 3 Nov 2014 23:56:12 -0500 Subject: [PATCH] Complete first version of multi-stage ingest --- .../DataSourceIngestModuleProgress.java | 2 +- .../sleuthkit/autopsy/ingest/IngestJob.java | 442 ++++++++++-------- .../autopsy/ingest/IngestJobContext.java | 18 +- .../autopsy/ingest/IngestManager.java | 79 ++-- .../ingest/IngestModuleFactoryLoader.java | 4 +- .../ingest/IngestPipelinesConfiguration.java | 10 +- .../autopsy/ingest/IngestTasksScheduler.java | 324 ++++++------- .../autopsy/ingest/PipelineConfig.xml | 1 + .../PhotoRecCarverIngestModuleFactory.java | 1 - 9 files changed, 442 insertions(+), 439 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleProgress.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleProgress.java index 1fa66aa07c..a972fd86a4 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleProgress.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleProgress.java @@ -67,7 +67,7 @@ public class DataSourceIngestModuleProgress { * @param message Message to display */ public void progress(String message) { - this.job.advanceDataSourceIngestProgressBar(message); // RJCTODO: Is this right? + this.job.advanceDataSourceIngestProgressBar(message); } /** diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index c050fd0a1c..fce16f5bb6 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -42,23 +42,35 @@ import org.sleuthkit.datamodel.Content; */ final class IngestJob { - private static final Logger logger = Logger.getLogger(IngestJob.class.getName()); - private static final IngestTasksScheduler ingestScheduler = IngestTasksScheduler.getInstance(); - - // These static fields are used for the creation and management of ingest - // jobs in progress. - private static volatile boolean jobCreationIsEnabled; - private static final AtomicLong nextIngestJobId = new AtomicLong(0L); - private static final ConcurrentHashMap ingestJobsById = new ConcurrentHashMap<>(); - - // An ingest job may have multiple stages. + /** + * An ingest job may have multiple stages. + */ private enum Stages { - FIRST, // High priority data source ingest modules plus file ingest modules - SECOND // Low priority data source ingest modules + /** + * High priority data source ingest modules and file ingest modules. + */ + FIRST, + /** + * Lower priority, usually long-running, data source ingest modules. + */ + SECOND }; - // These fields define the ingest job and the work it entails. + private static final Logger logger = Logger.getLogger(IngestJob.class.getName()); + private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance(); + + /** + * These static fields are used for the creation and management of ingest + * jobs in progress. + */ + private static volatile boolean jobCreationIsEnabled; + private static final AtomicLong nextJobId = new AtomicLong(0L); + private static final ConcurrentHashMap jobsById = new ConcurrentHashMap<>(); + + /** + * These fields define the ingest job and the work it entails. + */ private final long id; private final Content dataSource; private final boolean processUnallocatedSpace; @@ -68,10 +80,12 @@ final class IngestJob { private DataSourceIngestPipeline secondStageDataSourceIngestPipeline; private final LinkedBlockingQueue fileIngestPipelines; - // These fields are used to update the ingest progress UI components. The - // filesInProgress collection contains the names of the files that are in - // the file ingest pipelines and the two file counter fields are used to - // update the file ingest progress bar. + /** + * These fields are used to update ingest progress UI components for the + * job. The filesInProgress collection contains the names of the files that + * are in the file ingest pipelines and the two file counter fields are used + * to update the file ingest progress bar. + */ private ProgressHandle dataSourceIngestProgress; private final Object dataSourceIngestProgressLock; private final List filesInProgress; @@ -80,12 +94,16 @@ final class IngestJob { private ProgressHandle fileIngestProgress; private final Object fileIngestProgressLock; - // These fields support cancellation of either the currently running data - // source ingest module or the entire ingest job. + /** + * These fields support cancellation of either the currently running data + * source ingest module or the entire ingest job. + */ private volatile boolean currentDataSourceIngestModuleCancelled; private volatile boolean cancelled; - // This field is used for generating ingest job diagnostic data. + /** + * This field is used for generating ingest job diagnostic data. + */ private final long startTime; /** @@ -98,29 +116,27 @@ final class IngestJob { } /** - * Creates an ingest job for a data source. + * Starts an ingest job for a data source. * * @param dataSource The data source to ingest. * @param ingestModuleTemplates The ingest module templates to use to create * the ingest pipelines for the job. * @param processUnallocatedSpace Whether or not the job should include * processing of unallocated space. - * * @return A collection of ingest module start up errors, empty on success. - * - * @throws InterruptedException */ - static List startJob(Content dataSource, List ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException { + static List startJob(Content dataSource, List ingestModuleTemplates, boolean processUnallocatedSpace) { List errors = new ArrayList<>(); if (IngestJob.jobCreationIsEnabled) { - long jobId = nextIngestJobId.incrementAndGet(); + long jobId = nextJobId.incrementAndGet(); IngestJob job = new IngestJob(jobId, dataSource, processUnallocatedSpace); + IngestJob.jobsById.put(jobId, job); errors = job.start(ingestModuleTemplates); - if (errors.isEmpty() && (job.hasDataSourceIngestPipeline() || job.hasFileIngestPipeline())) { // RJCTODO: What about 2nd stage only? - ingestJobsById.put(jobId, job); + if (errors.isEmpty() && job.hasIngestPipeline()) { IngestManager.getInstance().fireIngestJobStarted(jobId); - IngestJob.ingestScheduler.scheduleIngestTasks(job); - logger.log(Level.INFO, "Ingest job {0} started", jobId); + IngestJob.logger.log(Level.INFO, "Ingest job {0} started", jobId); + } else { + IngestJob.jobsById.remove(jobId); } } return errors; @@ -132,27 +148,27 @@ final class IngestJob { * @return True or false. */ static boolean ingestJobsAreRunning() { - return !ingestJobsById.isEmpty(); + return !jobsById.isEmpty(); } /** - * RJCTODO + * Gets snapshots of the state of all running ingest jobs. * - * @return + * @return A list of ingest job state snapshots. */ static List getJobSnapshots() { List snapShots = new ArrayList<>(); - for (IngestJob job : IngestJob.ingestJobsById.values()) { + for (IngestJob job : IngestJob.jobsById.values()) { snapShots.add(job.getIngestJobSnapshot()); } return snapShots; } /** - * RJCTODO + * Cancels all running ingest jobs. */ static void cancelAllJobs() { - for (IngestJob job : ingestJobsById.values()) { + for (IngestJob job : jobsById.values()) { job.cancel(); } } @@ -165,7 +181,7 @@ final class IngestJob { * @param processUnallocatedSpace Whether or not unallocated space should be * processed during the ingest job. */ - IngestJob(long id, Content dataSource, boolean processUnallocatedSpace) { + private IngestJob(long id, Content dataSource, boolean processUnallocatedSpace) { this.id = id; this.dataSource = dataSource; this.processUnallocatedSpace = processUnallocatedSpace; @@ -178,9 +194,9 @@ final class IngestJob { } /** - * Gets the identifier assigned to the ingest job. + * Gets the identifier assigned to this job. * - * @return The ingest job identifier. + * @return The job identifier. */ long getId() { return this.id; @@ -206,51 +222,12 @@ final class IngestJob { } /** - * Starts up the ingest pipelines and ingest progress bars. - * - * @return A collection of ingest module startup errors, empty on success. - * @throws InterruptedException - */ - List start(List ingestModuleTemplates) throws InterruptedException { - this.createIngestPipelines(ingestModuleTemplates); - List errors = startUpIngestPipelines(); - if (errors.isEmpty()) { - if (!this.dataSourceIngestPipeline.isEmpty()) { - this.startDataSourceIngestProgressBar(); - } - if (!this.fileIngestPipelines.peek().isEmpty()) { - this.startFileIngestProgressBar(); - } - } - return errors; - } - - /** - * Checks to see if this job has a data source ingest pipeline. - * - * @return True or false. - */ - boolean hasDataSourceIngestPipeline() { - return (this.dataSourceIngestPipeline.isEmpty() == false); - } - - /** - * Checks to see if the job has a file ingest pipeline. - * - * @return True or false. - */ - boolean hasFileIngestPipeline() { - return (this.fileIngestPipelines.peek().isEmpty() == false); - } - - /** - * Passes the data source for this job through the data source ingest + * Passes the data source for this job through a data source ingest * pipeline. * * @param task A data source ingest task wrapping the data source. - * @throws InterruptedException */ - void process(DataSourceIngestTask task) throws InterruptedException { + void process(DataSourceIngestTask task) { try { if (!this.isCancelled() && !this.dataSourceIngestPipeline.isEmpty()) { List errors = new ArrayList<>(); @@ -268,9 +245,12 @@ final class IngestJob { } } } finally { - // No matter what happens, let the ingest scheduler know that this - // task is completed. - IngestJob.ingestScheduler.notifyTaskCompleted(task); + // No matter what happens, let the task scheduler know that this + // task is completed and check for job completion. + IngestJob.taskScheduler.notifyTaskCompleted(task); + if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) { + this.handleTasksCompleted(); + } } } @@ -284,11 +264,15 @@ final class IngestJob { void process(FileIngestTask task) throws InterruptedException { try { if (!this.isCancelled()) { - // Get a file ingest pipeline not currently in use by another - // file ingest thread. + /** + * Get a file ingest pipeline not currently in use by another + * file ingest thread. + */ FileIngestPipeline pipeline = this.fileIngestPipelines.take(); if (!pipeline.isEmpty()) { - // Get the file to process. + /** + * Get the file to process. + */ AbstractFile file = task.getFile(); // Update the file ingest progress bar. @@ -328,46 +312,28 @@ final class IngestJob { this.fileIngestPipelines.put(pipeline); } } finally { - // No matter what happens, let the ingest scheduler know that this - // task is completed. - IngestJob.ingestScheduler.notifyTaskCompleted(task); - } - } - - /** - * - * @param file - */ - void addFiles(List files) { - // RJCTODO: Add handling of lack of support for file ingest in second stage - for (AbstractFile file : files) { - try { - // RJCTODO: Deal with possible IllegalStateException; maybe don't need logging here - IngestJob.ingestScheduler.scheduleFileIngestTask(this, file); - } catch (InterruptedException ex) { - // Handle the unexpected interrupt here rather than make ingest - // module writers responsible for writing this exception handler. - // The interrupt flag of the thread is reset for detection by - // the thread task code. - Thread.currentThread().interrupt(); - IngestJob.logger.log(Level.SEVERE, "File task scheduling unexpectedly interrupted", ex); //NON-NLS + // No matter what happens, let the task scheduler know that this + // task is completed and check for job completion. + IngestJob.taskScheduler.notifyTaskCompleted(task); + if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) { + this.handleTasksCompleted(); } } } /** - * Allows the ingest tasks scheduler to notify this ingest job whenever all - * the scheduled tasks for this ingest job have been completed. + * Adds more files to an ingest job, i.e., derived or carved files. Not + * currently supported for the second stage of the job. + * + * @param files A list of files to add. */ - void notifyTasksCompleted() { - switch (this.stage) { - case FIRST: - this.finishFirstStage(); - this.startSecondStage(); - break; - case SECOND: - this.finish(); - break; + void addFiles(List files) { + if (IngestJob.Stages.FIRST == this.stage) { + for (AbstractFile file : files) { + IngestJob.taskScheduler.scheduleFileIngestTask(this, file); + } + } else { + IngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS } } @@ -432,7 +398,6 @@ final class IngestJob { } } - // RJCTODO: Is this right? /** * Updates the data source ingest progress bar display name. * @@ -527,8 +492,10 @@ final class IngestJob { this.cancelled = true; - // Tell the ingest scheduler to cancel all pending tasks. - IngestJob.ingestScheduler.cancelPendingTasksForIngestJob(this); + /** + * Tell the task scheduler to cancel all pending tasks. + */ + IngestJob.taskScheduler.cancelPendingTasksForIngestJob(this); } /** @@ -541,25 +508,13 @@ final class IngestJob { return this.cancelled; } - /** - * Get some basic performance statistics on this job. - * - * @return An ingest job statistics object. - */ - IngestJobSnapshot getIngestJobSnapshot() { - return new IngestJobSnapshot(); - } - /** * Creates the file and data source ingest pipelines. * * @param ingestModuleTemplates Ingest module templates to use to populate * the pipelines. - * @throws InterruptedException */ - private void createIngestPipelines(List ingestModuleTemplates) throws InterruptedException { - // RJCTODO: Improve variable names! - + private void createIngestPipelines(List ingestModuleTemplates) { // Make mappings of ingest module factory class names to templates. Map dataSourceModuleTemplates = new HashMap<>(); Map fileModuleTemplates = new HashMap<>(); @@ -595,9 +550,18 @@ final class IngestJob { this.dataSourceIngestPipeline = firstStageDataSourceIngestPipeline; // Construct the file ingest pipelines. - int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads(); - for (int i = 0; i < numberOfFileIngestThreads; ++i) { - this.fileIngestPipelines.put(new FileIngestPipeline(this, fileIngestModuleTemplates)); + try { + int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads(); + for (int i = 0; i < numberOfFileIngestThreads; ++i) { + this.fileIngestPipelines.put(new FileIngestPipeline(this, fileIngestModuleTemplates)); + } + } catch (InterruptedException ex) { + /** + * The current thread was interrupted while blocked on a full queue. + * Blocking should never happen here, but reset the interrupted flag + * rather than just swallowing the exception. + */ + Thread.currentThread().interrupt(); } } @@ -623,14 +587,121 @@ final class IngestJob { return templates; } + /** + * Starts up the ingest pipelines and ingest progress bars. + * + * @return A collection of ingest module startup errors, empty on success. + */ + private List start(List ingestModuleTemplates) { + this.createIngestPipelines(ingestModuleTemplates); + List errors = startUpIngestPipelines(); + if (errors.isEmpty()) { + if (this.hasFirstStageDataSourceIngestPipeline() || this.hasFileIngestPipeline()) { + // There is at least one first stage pipeline. + this.startFirstStage(); + } else if (this.hasSecondStageDataSourceIngestPipeline()) { + // There is no first stage pipeline, but there is a second stage + // ingest pipeline. + this.startSecondStage(); + } + } + return errors; + } + + /** + * Starts the first stage of the job. + */ + private void startFirstStage() { + this.stage = IngestJob.Stages.FIRST; + + /** + * Start one or both of the first stage progress bars. + */ + if (this.hasFirstStageDataSourceIngestPipeline()) { + this.startDataSourceIngestProgressBar(); + } + if (this.hasFileIngestPipeline()) { + this.startFileIngestProgressBar(); + } + + /** + * Schedule the first stage tasks. + */ + if (this.hasFirstStageDataSourceIngestPipeline() && this.hasFileIngestPipeline()) { + IngestJob.taskScheduler.scheduleIngestTasks(this); + } else if (this.hasFirstStageDataSourceIngestPipeline()) { + IngestJob.taskScheduler.scheduleDataSourceIngestTask(this); + } else { + IngestJob.taskScheduler.scheduleFileIngestTasks(this); + + /** + * No data source ingest task has been scheduled for this stage, and + * it is possible, if unlikely, that no file ingest tasks were + * actually scheduled since there are files that get filtered out by + * the tasks scheduler. In this special case, an ingest thread will + * never get to make the following check for this stage of the job. + */ + if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) { + this.handleTasksCompleted(); + } + } + } + + /** + * Starts the second stage of the ingest job. + */ + private void startSecondStage() { + this.stage = IngestJob.Stages.SECOND; + this.startDataSourceIngestProgressBar(); + this.dataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline; + IngestJob.taskScheduler.scheduleDataSourceIngestTask(this); + } + + /** + * Checks to see if this job has at least one ingest pipeline. + * + * @return True or false. + */ + private boolean hasIngestPipeline() { + return this.hasFirstStageDataSourceIngestPipeline() + || this.hasFileIngestPipeline() + || this.hasSecondStageDataSourceIngestPipeline(); + } + + /** + * Checks to see if this job has a first stage data source ingest pipeline. + * + * @return True or false. + */ + private boolean hasFirstStageDataSourceIngestPipeline() { + return (this.firstStageDataSourceIngestPipeline.isEmpty() == false); + } + + /** + * Checks to see if this job has a second stage data source ingest pipeline. + * + * @return True or false. + */ + private boolean hasSecondStageDataSourceIngestPipeline() { + return (this.secondStageDataSourceIngestPipeline.isEmpty() == false); + } + + /** + * Checks to see if the job has a file ingest pipeline. + * + * @return True or false. + */ + private boolean hasFileIngestPipeline() { + return (this.fileIngestPipelines.peek().isEmpty() == false); + } + /** * Starts up each of the file and data source ingest modules to collect * possible errors. * * @return A collection of ingest module startup errors, empty on success. - * @throws InterruptedException */ - private List startUpIngestPipelines() throws InterruptedException { + private List startUpIngestPipelines() { List errors = new ArrayList<>(); // Start up the first stage data source ingest pipeline. @@ -725,8 +796,23 @@ final class IngestJob { } /** - * Shuts down the file ingest pipelines and current progress bars, if any, - * for this job. + * Handles when all ingest tasks for this job are completed by finishing the + * current stage and possibly starting the next stage. + */ + private void handleTasksCompleted() { + switch (this.stage) { + case FIRST: + this.finishFirstStage(); + break; + case SECOND: + this.finish(); + break; + } + } + + /** + * Shuts down the first stage ingest pipelines and progress bars and starts + * the second stage, if appropriate. */ private void finishFirstStage() { // Shut down the file ingest pipelines. Note that no shut down is @@ -758,22 +844,12 @@ final class IngestJob { this.fileIngestProgress = null; } } - } - /** - * RJCTODO - */ - private void startSecondStage() { - this.stage = IngestJob.Stages.SECOND; - if (!this.cancelled && !this.secondStageDataSourceIngestPipeline.isEmpty()) { - this.dataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline; - this.startDataSourceIngestProgressBar(); - try { - IngestJob.ingestScheduler.scheduleDataSourceIngestTask(this); - } catch (InterruptedException ex) { - // RJCTODO: - this.finish(); - } + /** + * Start the second stage, if appropriate. + */ + if (!this.cancelled && this.hasSecondStageDataSourceIngestPipeline()) { + this.startSecondStage(); } else { this.finish(); } @@ -792,7 +868,7 @@ final class IngestJob { } } - IngestJob.ingestJobsById.remove(this.id); + IngestJob.jobsById.remove(this.id); if (!this.isCancelled()) { logger.log(Level.INFO, "Ingest job {0} completed", this.id); IngestManager.getInstance().fireIngestJobCompleted(this.id); @@ -821,6 +897,15 @@ final class IngestJob { this.currentDataSourceIngestModuleCancelled = true; } + /** + * Gets a snapshot of this jobs state and performance. + * + * @return An ingest job statistics object. + */ + private IngestJobSnapshot getIngestJobSnapshot() { + return new IngestJobSnapshot(); + } + /** * Stores basic diagnostic statistics for an ingest job. */ @@ -847,23 +932,13 @@ final class IngestJob { this.estimatedFilesToProcess = IngestJob.this.estimatedFilesToProcess; this.snapShotTime = new Date().getTime(); } - this.tasksSnapshot = IngestJob.ingestScheduler.getTasksSnapshotForJob(this.jobId); + this.tasksSnapshot = IngestJob.taskScheduler.getTasksSnapshotForJob(this.jobId); } - /** - * RJCTODO - * - * @return - */ long getJobId() { return this.jobId; } - /** - * RJCTODO - * - * @return - */ String getDataSource() { return dataSource; } @@ -916,47 +991,22 @@ final class IngestJob { return estimatedFilesToProcess; } - /** - * RJCTODO - * - * @return - */ long getRootQueueSize() { return this.tasksSnapshot.getRootQueueSize(); } - /** - * RJCTODO - * - * @return - */ long getDirQueueSize() { - return this.tasksSnapshot.getDirQueueSize(); + return this.tasksSnapshot.getDirectoryTasksQueueSize(); } - /** - * RJCTODO - * - * @return - */ long getFileQueueSize() { return this.tasksSnapshot.getFileQueueSize(); } - /** - * RJCTODO - * - * @return - */ long getDsQueueSize() { return this.tasksSnapshot.getDsQueueSize(); } - /** - * RJCTODO - * - * @return - */ long getRunningListSize() { return this.tasksSnapshot.getRunningListSize(); } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java index 6d81f9db79..6587a20d19 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java @@ -19,7 +19,6 @@ package org.sleuthkit.autopsy.ingest; import java.util.List; -import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.Content; @@ -29,7 +28,6 @@ import org.sleuthkit.datamodel.Content; */ public final class IngestJobContext { - private static final Logger logger = Logger.getLogger(IngestJobContext.class.getName()); private final IngestJob ingestJob; IngestJobContext(IngestJob ingestJob) { @@ -101,25 +99,25 @@ public final class IngestJobContext { } /** - * Adds one or more files to the files to be passed through the file ingest - * pipeline of the ingest job associated with this context. + * Adds one or more files, i.e., extracted or carved files, to the ingest + * job associated with this context. * - * @param files The files to be processed by the file ingest pipeline. + * @param files The files to be added. * @deprecated use addFilesToJob() instead */ @Deprecated public void scheduleFiles(List files) { this.addFilesToJob(files); } - + /** - * Adds one or more files to the files to be passed through the file ingest - * pipeline of the ingest job associated with this context. + * Adds one or more files, i.e., extracted or carved files, to the ingest + * job associated with this context. * - * @param files The files to be processed by the file ingest pipeline. + * @param files The files to be added. */ public void addFilesToJob(List files) { this.ingestJob.addFiles(files); } - + } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index b04f973d70..ccee8b482e 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -134,7 +134,7 @@ public class IngestManager { */ private void startDataSourceIngestTask() { long threadId = nextThreadId.incrementAndGet(); - dataSourceIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); + dataSourceIngestThreadPool.submit(new ExecuteIngestTasksRunnable(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); } @@ -144,7 +144,7 @@ public class IngestManager { */ private void startFileIngestTask() { long threadId = nextThreadId.incrementAndGet(); - fileIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue())); + fileIngestThreadPool.submit(new ExecuteIngestTasksRunnable(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue())); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); } @@ -154,7 +154,7 @@ public class IngestManager { } long taskId = nextThreadId.incrementAndGet(); - Future task = startIngestJobsThreadPool.submit(new StartIngestJobsTask(taskId, dataSources, moduleTemplates, processUnallocatedSpace)); + Future task = startIngestJobsThreadPool.submit(new StartIngestJobsCallable(taskId, dataSources, moduleTemplates, processUnallocatedSpace)); startIngestJobsTasks.put(taskId, task); } @@ -200,11 +200,11 @@ public class IngestManager { return IngestJob.ingestJobsAreRunning(); } - /** * Called each time a module in a data source pipeline starts + * * @param task - * @param ingestModuleDisplayName + * @param ingestModuleDisplayName */ void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) { ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource())); @@ -212,20 +212,22 @@ public class IngestManager { /** * Called each time a module in a file ingest pipeline starts + * * @param task - * @param ingestModuleDisplayName + * @param ingestModuleDisplayName */ void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) { IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId()); IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile()); ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap); - + incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); } /** * Called each time a data source ingest task completes - * @param task + * + * @param task */ void setIngestTaskProgressCompleted(DataSourceIngestTask task) { ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId())); @@ -233,7 +235,8 @@ public class IngestManager { /** * Called when a file ingest pipeline is complete for a given file - * @param task + * + * @param task */ void setIngestTaskProgressCompleted(FileIngestTask task) { IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId()); @@ -242,19 +245,21 @@ public class IngestManager { synchronized (processedFilesSnapshotLock) { processedFilesSnapshot.incrementProcessedFilesCount(); } - + incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); } - + /** - * Internal method to update the times associated with each module. + * Internal method to update the times associated with each module. + * * @param moduleName - * @param duration + * @param duration */ private void incrementModuleRunTime(String moduleName, Long duration) { - if (moduleName.equals("IDLE")) + if (moduleName.equals("IDLE")) { return; - + } + synchronized (ingestModuleRunTimes) { Long prevTimeL = ingestModuleRunTimes.get(moduleName); long prevTime = 0; @@ -262,12 +267,13 @@ public class IngestManager { prevTime = prevTimeL; } prevTime += duration; - ingestModuleRunTimes.put(moduleName, prevTime); + ingestModuleRunTimes.put(moduleName, prevTime); } } - + /** * Return the list of run times for each module + * * @return Map of module name to run time (in milliseconds) */ Map getModuleRunTimes() { @@ -279,13 +285,13 @@ public class IngestManager { /** * Get the stats on current state of each thread - * @return + * + * @return */ List getIngestThreadActivitySnapshots() { return new ArrayList<>(ingestThreadActivitySnapshots.values()); } - public void cancelAllIngestJobs() { // Stop creating new ingest jobs. for (Future handle : startIngestJobsTasks.values()) { @@ -418,7 +424,7 @@ public class IngestManager { * @param ingestJobId The ingest job id. */ void fireIngestJobStarted(long ingestJobId) { - fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null)); } /** @@ -427,7 +433,7 @@ public class IngestManager { * @param ingestJobId The ingest job id. */ void fireIngestJobCompleted(long ingestJobId) { - fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null)); } /** @@ -436,7 +442,7 @@ public class IngestManager { * @param ingestJobId The ingest job id. */ void fireIngestJobCancelled(long ingestJobId) { - fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null)); } /** @@ -445,7 +451,7 @@ public class IngestManager { * @param file The file that is completed. */ void fireFileIngestDone(AbstractFile file) { - fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file)); + fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file)); } /** @@ -454,7 +460,7 @@ public class IngestManager { * @param moduleDataEvent A ModuleDataEvent with the details of the posting. */ void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) { - fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null)); } /** @@ -465,7 +471,7 @@ public class IngestManager { * content. */ void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) { - fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null)); + fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null)); } /** @@ -509,7 +515,7 @@ public class IngestManager { /** * Creates ingest jobs. */ - private class StartIngestJobsTask implements Callable { + private final class StartIngestJobsCallable implements Callable { private final long threadId; private final List dataSources; @@ -517,7 +523,7 @@ public class IngestManager { private final boolean processUnallocatedSpace; private ProgressHandle progress; - StartIngestJobsTask(long threadId, List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { + StartIngestJobsCallable(long threadId, List dataSources, List moduleTemplates, boolean processUnallocatedSpace) { this.threadId = threadId; this.dataSources = dataSources; this.moduleTemplates = moduleTemplates; @@ -587,9 +593,6 @@ public class IngestManager { break; } } - } catch (InterruptedException ex) { - // Reset interrupted status. - Thread.currentThread().interrupt(); } catch (Exception ex) { logger.log(Level.SEVERE, "Failed to create ingest job", ex); //NON-NLS } finally { @@ -603,12 +606,12 @@ public class IngestManager { /** * A consumer for an ingest task queue. */ - private class ExecuteIngestTasksTask implements Runnable { + private final class ExecuteIngestTasksRunnable implements Runnable { private final long threadId; private final IngestTaskQueue tasks; - ExecuteIngestTasksTask(long threadId, IngestTaskQueue tasks) { + ExecuteIngestTasksRunnable(long threadId, IngestTaskQueue tasks) { this.threadId = threadId; this.tasks = tasks; } @@ -632,7 +635,7 @@ public class IngestManager { /** * Fires ingest events to ingest manager property change listeners. */ - private static class FireIngestEventTask implements Runnable { + private static final class FireIngestEventRunnable implements Runnable { private final PropertyChangeSupport publisher; private final IngestJobEvent jobEvent; @@ -640,7 +643,7 @@ public class IngestManager { private final Object oldValue; private final Object newValue; - FireIngestEventTask(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) { + FireIngestEventRunnable(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) { this.publisher = publisher; this.jobEvent = event; this.moduleEvent = null; @@ -648,7 +651,7 @@ public class IngestManager { this.newValue = newValue; } - FireIngestEventTask(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) { + FireIngestEventRunnable(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) { this.publisher = publisher; this.jobEvent = null; this.moduleEvent = event; @@ -695,9 +698,9 @@ public class IngestManager { startTime = new Date(); this.activity = activity; this.dataSourceName = dataSource.getName(); - this.fileName = ""; + this.fileName = ""; } - + // file ingest thread IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) { this.threadId = threadId; @@ -711,7 +714,7 @@ public class IngestManager { long getJobId() { return jobId; } - + long getThreadId() { return threadId; } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleFactoryLoader.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleFactoryLoader.java index e7c71bab23..862ba5bcf0 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleFactoryLoader.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestModuleFactoryLoader.java @@ -38,6 +38,7 @@ import org.sleuthkit.autopsy.modules.fileextmismatch.FileExtMismatchDetectorModu import org.sleuthkit.autopsy.modules.filetypeid.FileTypeIdModuleFactory; import org.sleuthkit.autopsy.modules.hashdatabase.HashLookupModuleFactory; import org.sleuthkit.autopsy.modules.interestingitems.InterestingItemsIngestModuleFactory; +import org.sleuthkit.autopsy.modules.photoreccarver.PhotoRecCarverIngestModuleFactory; import org.sleuthkit.autopsy.modules.sevenzip.ArchiveFileExtractorModuleFactory; import org.sleuthkit.autopsy.python.JythonModuleLoader; @@ -51,8 +52,6 @@ final class IngestModuleFactoryLoader { private static final String SAMPLE_EXECUTABLE_MODULE_FACTORY_CLASS_NAME = SampleExecutableIngestModuleFactory.class.getCanonicalName(); private static final ArrayList coreModuleOrdering = new ArrayList() { { - // RJCTODO: Find out wherer ot put the photorec carver - // The ordering of the core ingest module factories implemented // using Java is hard-coded. add("org.sleuthkit.autopsy.recentactivity.RecentActivityExtracterModuleFactory"); //NON-NLS @@ -66,6 +65,7 @@ final class IngestModuleFactoryLoader { add(E01VerifierModuleFactory.class.getCanonicalName()); add(AndroidModuleFactory.class.getCanonicalName()); add(InterestingItemsIngestModuleFactory.class.getCanonicalName()); + add(PhotoRecCarverIngestModuleFactory.class.getCanonicalName()); } }; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestPipelinesConfiguration.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestPipelinesConfiguration.java index f9fe24fb55..669797b64b 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestPipelinesConfiguration.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestPipelinesConfiguration.java @@ -44,9 +44,8 @@ final class IngestPipelinesConfiguration { private static final String PIPELINE_TYPE_ATTR = "type"; //NON-NLS private static final String STAGE_ONE_DATA_SOURCE_INGEST_PIPELINE_ELEM = "ImageAnalysisStageOne"; //NON-NLS private static final String STAGE_TWO_DATA_SOURCE_INGEST_PIPELINE_ELEM = "ImageAnalysisStageTwo"; //NON-NLS - private static final String FILE_INGEST_PIPELINE_TYPE = "FileAnalysis"; //NON-NLS + private static final String FILE_INGEST_PIPELINE_ELEM = "FileAnalysis"; //NON-NLS private static final String INGEST_MODULE_ELEM = "MODULE"; //NON-NLS - private static final String XML_MODULE_CLASS_NAME_ATTR = "location"; //NON-NLS private static IngestPipelinesConfiguration instance; @@ -54,10 +53,6 @@ final class IngestPipelinesConfiguration { private final List fileIngestPipelineConfig = new ArrayList<>(); private final List stageTwoDataSourceIngestPipelineConfig = new ArrayList<>(); - // RJCTODO: Bring this code back into use, use it in IngestJob to sort things - // into the now three pipelines. Other NBMs built on top of Autopsy that - // have custom pipeline config files can do a PlatformUtil.extractResourceToUserConfigDir() - // before this is called. /** * Gets the ingest pipelines configuration singleton. * @@ -140,7 +135,6 @@ final class IngestPipelinesConfiguration { // Parse the pipeline elements to populate the pipeline // configuration lists. - // RJCTODO: SHould check that each element is unique. Or could try the XSD bit. List pipelineConfig = null; for (int pipelineNum = 0; pipelineNum < numPipelines; ++pipelineNum) { Element pipelineElement = (Element) pipelineElements.item(pipelineNum); @@ -150,7 +144,7 @@ final class IngestPipelinesConfiguration { case STAGE_ONE_DATA_SOURCE_INGEST_PIPELINE_ELEM: pipelineConfig = this.stageOneDataSourceIngestPipelineConfig; break; - case FILE_INGEST_PIPELINE_TYPE: + case FILE_INGEST_PIPELINE_ELEM: pipelineConfig = this.fileIngestPipelineConfig; break; case STAGE_TWO_DATA_SOURCE_INGEST_PIPELINE_ELEM: diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java index c9c97d8ab7..980753a04d 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -47,39 +47,55 @@ final class IngestTasksScheduler { private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue(); private static IngestTasksScheduler instance; - // Scheduling of data source ingest tasks is accomplished by putting them - // in a FIFO queue to be consumed by the ingest threads. The pending data - // tasks queue is therefore wrapped in a "dispenser" that implements the - // IngestTaskQueue interface and is exposed via a getter method. + /** + * Scheduling of data source ingest tasks is accomplished by putting them in + * a FIFO queue to be consumed by the ingest threads, so the queue is + * wrapped in a "dispenser" that implements the IngestTaskQueue interface + * and is exposed via a getter method. + */ private final LinkedBlockingQueue pendingDataSourceTasks; private final DataSourceIngestTaskQueue dataSourceTasksDispenser; - // Scheduling of file ingest tasks is accomplished by "shuffling" them - // through a sequence of internal queues that allows for the interleaving of - // tasks from different ingest jobs based on priority. These scheduling - // queues are: - // 1. root directory tasks (priority queue) - // 2. directory tasks (FIFO queue) - // 3. pending file tasks (LIFO queue) - // Tasks in the pending file tasks queue are ready to be consumed by the - // ingest threads. The pending file tasks queue is therefore wrapped in a - // "dispenser" that implements the IngestTaskQueue interface and is exposed - // via a getter method. + /** + * Scheduling of file ingest tasks is accomplished by "shuffling" them + * through a sequence of internal queues that allows for the interleaving of + * tasks from different ingest jobs based on priority. These scheduling + * queues are: + * + * 1. Root directory tasks (priority queue) + * + * 2. Directory tasks (FIFO queue) + * + * 3. Pending file tasks (LIFO queue). + * + * The pending file tasks queue is LIFO to handle large numbers of files + * extracted from archive files. At least one image has been processed that + * had a folder full of archive files. The queue grew to have thousands of + * entries, as each successive archive file was expanded, so now extracted + * files get added to the front of the queue so that in such a scenario they + * would be processed before the expansion of the next archive file. + * + * Tasks in the pending file tasks queue are ready to be consumed by the + * ingest threads, so the queue is wrapped in a "dispenser" that implements + * the IngestTaskQueue interface and is exposed via a getter method. + */ private final TreeSet rootDirectoryTasks; private final List directoryTasks; private final BlockingDeque pendingFileTasks; private final FileIngestTaskQueue fileTasksDispenser; - // The ingest scheduler is responsible for notifying an ingest jobs whenever - // all of the ingest tasks currently associated with the job are complete. - // To make this possible, the ingest tasks scheduler needs to keep track not - // only of the tasks in its queues, but also of the tasks that have been - // handed out for processing by code running on the ingest manager's ingest - // threads. Therefore all ingest tasks are added to this list and are not - // removed when an ingest thread takes an ingest task. Instead, the ingest - // thread calls back into the scheduler when the task is completed, at - // which time the task will be removed from this list. - private final List tasksInProgressAndPending; + /** + * The ingest tasks scheduler allows ingest jobs to query it to see if there + * are any tasks in progress for the job. To make this possible, the ingest + * tasks scheduler needs to keep track not only of the tasks in its queues, + * but also of the tasks that have been handed out for processing by the + * ingest threads. Therefore all ingest tasks are added to this list when + * they are created and are not removed when an ingest thread takes an + * ingest task. Instead, the ingest thread calls back into the scheduler + * when the task is completed, at which time the task will be removed from + * this list. + */ + private final List tasksInProgress; /** * Gets the ingest tasks scheduler singleton. @@ -101,7 +117,7 @@ final class IngestTasksScheduler { this.directoryTasks = new ArrayList<>(); this.pendingFileTasks = new LinkedBlockingDeque<>(); this.fileTasksDispenser = new FileIngestTaskQueue(); - this.tasksInProgressAndPending = new ArrayList<>(); + this.tasksInProgress = new ArrayList<>(); } /** @@ -132,42 +148,33 @@ final class IngestTasksScheduler { * @throws InterruptedException if the calling thread is blocked due to a * full tasks queue and is interrupted. */ - synchronized void scheduleIngestTasks(IngestJob job) throws InterruptedException { - // The initial ingest scheduling for a job an an atomic operation. - // Otherwise, the data source task might be completed before the file - // tasks are created, resulting in a potential false positive when this - // task scheduler checks whether or not all the tasks for the job are - // completed. - if (job.hasDataSourceIngestPipeline()) { - scheduleDataSourceIngestTask(job); - } - if (job.hasFileIngestPipeline()) { - scheduleFileIngestTasks(job); - } + synchronized void scheduleIngestTasks(IngestJob job) { + // Scheduling of both a data source ingest task and file ingest tasks + // for a job must be an atomic operation. Otherwise, the data source + // task might be completed before the file tasks are scheduled, + // resulting in a potential false positive when another thread checks + // whether or not all the tasks for the job are completed. + this.scheduleDataSourceIngestTask(job); + this.scheduleFileIngestTasks(job); } /** * Schedules a data source ingest task for an ingest job. * * @param job The job for which the tasks are to be scheduled. - * @throws InterruptedException if the calling thread is blocked due to a - * full tasks queue and is interrupted. */ - synchronized void scheduleDataSourceIngestTask(IngestJob job) throws InterruptedException { - // Create a data source ingest task for the data source associated with - // the ingest job and add the task to the pending data source tasks - // queue. Data source tasks are scheduled on a first come, first served - // basis. + synchronized void scheduleDataSourceIngestTask(IngestJob job) { DataSourceIngestTask task = new DataSourceIngestTask(job); - this.tasksInProgressAndPending.add(task); + this.tasksInProgress.add(task); try { - // This call should not block because the queue is (theoretically) - // unbounded. this.pendingDataSourceTasks.put(task); } catch (InterruptedException ex) { - this.tasksInProgressAndPending.remove(task); - IngestTasksScheduler.logger.log(Level.SEVERE, "Interruption of unexpected block on pending data source tasks queue", ex); //NON-NLS - throw ex; + /** + * The current thread was interrupted while blocked on a full queue. + * Discard the task and reset the interrupted flag. + */ + this.tasksInProgress.remove(task); + Thread.currentThread().interrupt(); } } @@ -175,18 +182,15 @@ final class IngestTasksScheduler { * Schedules file ingest tasks for an ingest job. * * @param job The job for which the tasks are to be scheduled. - * @throws InterruptedException if the calling thread is blocked due to a - * full tasks queue and is interrupted. */ - synchronized void scheduleFileIngestTasks(IngestJob job) throws InterruptedException { + synchronized void scheduleFileIngestTasks(IngestJob job) { // Get the top level files for the data source associated with this job - // and add them to the root directories priority queue. The file tasks - // may be interleaved with file tasks from other jobs, based on priority. + // and add them to the root directories priority queue. List topLevelFiles = getTopLevelFiles(job.getDataSource()); for (AbstractFile firstLevelFile : topLevelFiles) { FileIngestTask task = new FileIngestTask(job, firstLevelFile); if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { - this.tasksInProgressAndPending.add(task); + this.tasksInProgress.add(task); this.rootDirectoryTasks.add(task); } } @@ -197,16 +201,12 @@ final class IngestTasksScheduler { * Schedules a file ingest task for an ingest job. * * @param job The job for which the tasks are to be scheduled. - * @param file The file associated with the task. - * @throws InterruptedException if the calling thread is blocked due to a - * full tasks queue and is interrupted. + * @param file The file to be associated with the task. */ - void scheduleFileIngestTask(IngestJob job, AbstractFile file) throws InterruptedException, IllegalStateException { + synchronized void scheduleFileIngestTask(IngestJob job, AbstractFile file) { FileIngestTask task = new FileIngestTask(job, file); if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { - // This synchronized method sends the file task directly to the - // pending file tasks queue. This is done to prioritize derived - // and carved files generated by a file ingest task in progress. + this.tasksInProgress.add(task); addToPendingFileTasksQueue(task); } } @@ -217,12 +217,24 @@ final class IngestTasksScheduler { * * @param task The completed task. */ - synchronized void notifyTaskCompleted(IngestTask task) throws InterruptedException { - tasksInProgressAndPending.remove(task); - IngestJob job = task.getIngestJob(); - if (this.tasksForJobAreCompleted(job)) { - job.notifyTasksCompleted(); + synchronized void notifyTaskCompleted(IngestTask task) { + tasksInProgress.remove(task); + } + + /** + * Queries the task scheduler to determine whether or not all current ingest + * tasks for an ingest job are completed. + * + * @param job The job for which the query is to be performed. + * @return True or false. + */ + synchronized boolean tasksForJobAreCompleted(IngestJob job) { + for (IngestTask task : tasksInProgress) { + if (task.getIngestJob().getId() == job.getId()) { + return false; + } } + return true; } /** @@ -234,25 +246,17 @@ final class IngestTasksScheduler { * @param job The job for which the tasks are to to canceled. */ synchronized void cancelPendingTasksForIngestJob(IngestJob job) { - // The scheduling queues are cleared of tasks for the job, and the tasks - // that are removed from the scheduling queues are also removed from the - // tasks in progress list. However, a tasks in progress check for the - // job may still return true since the tasks that have been taken by the - // ingest threads are still in the tasks in progress list. long jobId = job.getId(); this.removeTasksForJob(this.rootDirectoryTasks, jobId); this.removeTasksForJob(this.directoryTasks, jobId); this.removeTasksForJob(this.pendingFileTasks, jobId); this.removeTasksForJob(this.pendingDataSourceTasks, jobId); - if (this.tasksForJobAreCompleted(job)) { - job.notifyTasksCompleted(); - } } /** - * A helper that gets the top level files such as file system root - * directories, layout files and virtual directories for a data source. Used - * to create file tasks to put into the root directories queue. + * Gets the top level files such as file system root directories, layout + * files and virtual directories for a data source. Used to create file + * tasks to put into the root directories queue. * * @param dataSource The data source. * @return A list of top level files. @@ -290,14 +294,11 @@ final class IngestTasksScheduler { } /** - * A helper that "shuffles" the file task queues to ensure that there is at - * least one task in the pending file ingest tasks queue, as long as there - * are still file ingest tasks to be performed. - * - * @throws InterruptedException if the calling thread is blocked due to a - * full tasks queue and is interrupted. + * "Shuffles" the file task queues to ensure that there is at least one task + * in the pending file ingest tasks queue, as long as there are still file + * ingest tasks to be performed. */ - synchronized private void shuffleFileTaskQueues() throws InterruptedException, IllegalStateException { + synchronized private void shuffleFileTaskQueues() { // This is synchronized because it is called both by synchronized // methods of this ingest scheduler and an unsynchronized method of its // file tasks "dispenser". @@ -323,16 +324,13 @@ final class IngestTasksScheduler { } // Try to add the most recently added directory from the - // directory tasks queue to the pending file tasks queue. Note - // the removal of the task from the tasks in progress list. If - // the task is enqueued, it will be put back in the list by - // the addToPendingFileTasksQueue() helper. - boolean tasksEnqueuedForDirectory = false; + // directory tasks queue to the pending file tasks queue. FileIngestTask directoryTask = this.directoryTasks.remove(this.directoryTasks.size() - 1); - this.tasksInProgressAndPending.remove(directoryTask); + this.tasksInProgress.remove(directoryTask); if (shouldEnqueueFileTask(directoryTask)) { addToPendingFileTasksQueue(directoryTask); - tasksEnqueuedForDirectory = true; + } else { + this.tasksInProgress.remove(directoryTask); } // If the directory contains subdirectories or files, try to @@ -349,16 +347,15 @@ final class IngestTasksScheduler { // addition of the task to the tasks in progress // list. This is necessary because this is the // first appearance of this task in the queues. - this.tasksInProgressAndPending.add(childTask); + this.tasksInProgress.add(childTask); this.directoryTasks.add(childTask); - tasksEnqueuedForDirectory = true; } else if (shouldEnqueueFileTask(childTask)) { // Found a file, put the task directly into the // pending file tasks queue. The new task will // be put into the tasks in progress list by the // addToPendingFileTasksQueue() helper. + this.tasksInProgress.add(childTask); addToPendingFileTasksQueue(childTask); - tasksEnqueuedForDirectory = true; } } } @@ -366,24 +363,13 @@ final class IngestTasksScheduler { String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS logger.log(Level.SEVERE, errorMessage, ex); } - - // In the case where the directory task is not pushed into the - // the pending file tasks queue and has no children, check to - // see if the job is completed - the directory task might have - // been the last task for the job. - if (!tasksEnqueuedForDirectory) { - IngestJob job = directoryTask.getIngestJob(); - if (this.tasksForJobAreCompleted(job)) { - job.notifyTasksCompleted(); - } - } } } /** - * A helper method that examines the file associated with a file ingest task - * to determine whether or not the file should be processed and therefore - * the task should be enqueued. + * Examines the file associated with a file ingest task to determine whether + * or not the file should be processed and therefore whether or not the task + * should be enqueued. * * @param task The task to be scrutinized. * @return True or false. @@ -407,9 +393,6 @@ final class IngestTasksScheduler { // Skip the task if the file is one of a select group of special, large // NTFS or FAT file system files. - // the file is in the root directory, has a file name - // starting with $, containing : (not default attributes) - //with meta address < 32, i.e. some special large NTFS and FAT files if (file instanceof org.sleuthkit.datamodel.File) { final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file; @@ -452,50 +435,28 @@ final class IngestTasksScheduler { return true; } - // RJCTODO: Is this still necessary? There is code elsewhere to remove and - // re-add the task to the tasks in progress list. /** - * A helper method to safely add a file ingest task to the blocking pending - * tasks queue. + * Adds a file ingest task to the blocking pending tasks queue. * - * @param task - * @throws IllegalStateException + * @param task The task to add. */ - synchronized private void addToPendingFileTasksQueue(FileIngestTask task) throws IllegalStateException { - tasksInProgressAndPending.add(task); + synchronized private void addToPendingFileTasksQueue(FileIngestTask task) { try { - // The file is added to the front of the pending files queue because - // at least one image has been processed that had a folder full of - // archive files. The queue grew to have thousands of entries, so - // this (might) help with pushing those files through ingest. - this.pendingFileTasks.addFirst(task); - } catch (IllegalStateException ex) { - tasksInProgressAndPending.remove(task); - Logger.getLogger(IngestTasksScheduler.class.getName()).log(Level.SEVERE, "Pending file tasks queue is full", ex); //NON-NLS - throw ex; + this.pendingFileTasks.putFirst(task); + } catch (InterruptedException ex) { + /** + * The current thread was interrupted while blocked on a full queue. + * Discard the task and reset the interrupted flag. + */ + this.tasksInProgress.remove(task); + Thread.currentThread().interrupt(); } } /** - * Determines whether or not all current ingest tasks for an ingest job are - * completed. - * - * @param job The job for which the query is to be performed. - * @return True or false. - */ - private boolean tasksForJobAreCompleted(IngestJob job) { - for (IngestTask task : tasksInProgressAndPending) { - if (task.getIngestJob().getId() == job.getId()) { - return false; - } - } - return true; - } - - /** - * A helper that removes all of the ingest tasks associated with an ingest - * job from a tasks queue. The task is removed from the the tasks in - * progress list as well. + * Removes all of the ingest tasks associated with an ingest job from a + * tasks queue. The task is removed from the the tasks in progress list as + * well. * * @param taskQueue The queue from which to remove the tasks. * @param jobId The id of the job for which the tasks are to be removed. @@ -505,15 +466,14 @@ final class IngestTasksScheduler { while (iterator.hasNext()) { IngestTask task = iterator.next(); if (task.getIngestJob().getId() == jobId) { - this.tasksInProgressAndPending.remove(task); + this.tasksInProgress.remove(task); iterator.remove(); } } } /** - * A helper that counts the number of ingest tasks in a task queue for a - * given job. + * Counts the number of ingest tasks in a task queue for a given job. * * @param queue The queue for which to count tasks. * @param jobId The id of the job for which the tasks are to be counted. @@ -532,10 +492,11 @@ final class IngestTasksScheduler { } /** - * RJCTODO - * - * @param jobId - * @return + * Returns a snapshot of the states of the tasks in progress for an ingest + * job. + * + * @param jobId The identifier assigned to the job. + * @return */ synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) { return new IngestJobTasksSnapshot(jobId); @@ -684,9 +645,10 @@ final class IngestTasksScheduler { } /** - * A snapshot of ingest tasks data for an ingest job. + * A snapshot of ingest tasks data for an ingest job. */ class IngestJobTasksSnapshot { + private final long jobId; private final long rootQueueSize; private final long dirQueueSize; @@ -695,8 +657,9 @@ final class IngestTasksScheduler { private final long runningListSize; /** - * RJCTODO - * @param jobId + * Constructs a snapshot of ingest tasks data for an ingest job. + * + * @param jobId The identifier associated with the job. */ IngestJobTasksSnapshot(long jobId) { this.jobId = jobId; @@ -704,56 +667,51 @@ final class IngestTasksScheduler { this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryTasks, jobId); this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTasks, jobId); this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingDataSourceTasks, jobId); - this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgressAndPending, jobId) - fileQueueSize - dsQueueSize; + this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgress, jobId) - fileQueueSize - dsQueueSize; } - + /** - * RJCTODO - * @return + * Gets the identifier associated with the ingest job for which this + * snapshot was created. + * + * @return The ingest job identifier. */ long getJobId() { return jobId; } /** - * RJCTODO - * @return + * Gets the number of file ingest tasks associated with the job that are + * in the root directories queue. + * + * @return The tasks count. */ long getRootQueueSize() { return rootQueueSize; } /** - * RJCTODO - * @return + * Gets the number of file ingest tasks associated with the job that are + * in the root directories queue. + * + * @return The tasks count. */ - long getDirQueueSize() { + long getDirectoryTasksQueueSize() { return dirQueueSize; } - /** - * RJCTODO - * @return - */ long getFileQueueSize() { return fileQueueSize; } - /** - * RJCTODO - * @return - */ long getDsQueueSize() { return dsQueueSize; } - /** - * RJCTODO - * @return - */ long getRunningListSize() { return runningListSize; - } + } + } - + } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/PipelineConfig.xml b/Core/src/org/sleuthkit/autopsy/ingest/PipelineConfig.xml index 36de99011f..69f9f362c3 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/PipelineConfig.xml +++ b/Core/src/org/sleuthkit/autopsy/ingest/PipelineConfig.xml @@ -16,6 +16,7 @@ Contains only the core ingest modules that ship with Autopsy --> org.sleuthkit.autopsy.thunderbirdparser.EmailParserModuleFactory org.sleuthkit.autopsy.modules.fileextmismatch.FileExtMismatchDetectorModuleFactory org.sleuthkit.autopsy.modules.interestingitems.InterestingItemsIngestModuleFactory + org.sleuthkit.autopsy.modules.photoreccarver.PhotoRecCarverIngestModuleFactory diff --git a/Core/src/org/sleuthkit/autopsy/modules/photoreccarver/PhotoRecCarverIngestModuleFactory.java b/Core/src/org/sleuthkit/autopsy/modules/photoreccarver/PhotoRecCarverIngestModuleFactory.java index 915ca1433f..325c0e7613 100755 --- a/Core/src/org/sleuthkit/autopsy/modules/photoreccarver/PhotoRecCarverIngestModuleFactory.java +++ b/Core/src/org/sleuthkit/autopsy/modules/photoreccarver/PhotoRecCarverIngestModuleFactory.java @@ -25,7 +25,6 @@ import org.sleuthkit.autopsy.ingest.FileIngestModule; import org.sleuthkit.autopsy.ingest.IngestModuleFactory; import org.sleuthkit.autopsy.ingest.IngestModuleFactoryAdapter; import org.sleuthkit.autopsy.ingest.IngestModuleIngestJobSettings; -import org.sleuthkit.autopsy.ingest.IngestModuleIngestJobSettingsPanel; /** * A factory for creating instances of file ingest modules that carve unallocated space