diff --git a/Core/src/org/sleuthkit/autopsy/examples/SampleExecutableDataSourceIngestModule.java b/Core/src/org/sleuthkit/autopsy/examples/SampleExecutableDataSourceIngestModule.java index 83641fbb93..98e8029932 100755 --- a/Core/src/org/sleuthkit/autopsy/examples/SampleExecutableDataSourceIngestModule.java +++ b/Core/src/org/sleuthkit/autopsy/examples/SampleExecutableDataSourceIngestModule.java @@ -61,7 +61,6 @@ import org.sleuthkit.datamodel.BlackboardArtifact.ARTIFACT_TYPE; import org.sleuthkit.datamodel.BlackboardAttribute.ATTRIBUTE_TYPE; import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Image; -import org.sleuthkit.datamodel.TskCoreException; import org.w3c.dom.Document; import org.w3c.dom.Element; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleProgress.java b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleProgress.java index a972fd86a4..1fa66aa07c 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleProgress.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/DataSourceIngestModuleProgress.java @@ -67,7 +67,7 @@ public class DataSourceIngestModuleProgress { * @param message Message to display */ public void progress(String message) { - this.job.advanceDataSourceIngestProgressBar(message); + this.job.advanceDataSourceIngestProgressBar(message); // RJCTODO: Is this right? } /** diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 65fb083a8a..c1fe37824f 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -21,7 +21,9 @@ package org.sleuthkit.autopsy.ingest; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import javax.swing.JOptionPane; import org.netbeans.api.progress.ProgressHandle; @@ -39,13 +41,29 @@ import org.sleuthkit.datamodel.Content; final class IngestJob { private static final Logger logger = Logger.getLogger(IngestJob.class.getName()); - private static final IngestScheduler ingestScheduler = IngestScheduler.getInstance(); + private static final IngestTasksScheduler ingestScheduler = IngestTasksScheduler.getInstance(); + + // These static fields are used for the creation and management of ingest + // jobs in progress. + private static volatile boolean jobCreationIsEnabled; + private static final AtomicLong nextIngestJobId = new AtomicLong(0L); + private static final ConcurrentHashMap ingestJobsById = new ConcurrentHashMap<>(); + + // An ingest job may have multiple stages. + private enum Stages { + + FIRST, // High priority data source ingest modules plus file ingest modules + SECOND // Low priority data source ingest modules + }; // These fields define the ingest job and the work it entails. private final long id; private final Content dataSource; private final boolean processUnallocatedSpace; + private Stages stage; private DataSourceIngestPipeline dataSourceIngestPipeline; + private DataSourceIngestPipeline firstStageDataSourceIngestPipeline; + private DataSourceIngestPipeline secondStageDataSourceIngestPipeline; private final LinkedBlockingQueue fileIngestPipelines; // These fields are used to update the ingest progress UI components. The @@ -68,6 +86,74 @@ final class IngestJob { // This field is used for generating ingest job diagnostic data. private final long startTime; + /** + * Enables and disables ingest job creation. + * + * @param enabled True or false. + */ + static void jobCreationEnabled(boolean enabled) { + IngestJob.jobCreationIsEnabled = enabled; + } + + /** + * Creates an ingest job for a data source. + * + * @param dataSource The data source to ingest. + * @param ingestModuleTemplates The ingest module templates to use to create + * the ingest pipelines for the job. + * @param processUnallocatedSpace Whether or not the job should include + * processing of unallocated space. + * + * @return A collection of ingest module start up errors, empty on success. + * + * @throws InterruptedException + */ + static List startJob(Content dataSource, List ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException { + List errors = new ArrayList<>(); + if (IngestJob.jobCreationIsEnabled) { + long jobId = nextIngestJobId.incrementAndGet(); + IngestJob job = new IngestJob(jobId, dataSource, processUnallocatedSpace); + errors = job.start(ingestModuleTemplates); + if (errors.isEmpty() && (job.hasDataSourceIngestPipeline() || job.hasFileIngestPipeline())) { + ingestJobsById.put(jobId, job); + IngestManager.getInstance().fireIngestJobStarted(jobId); + IngestJob.ingestScheduler.scheduleIngestTasks(job); + logger.log(Level.INFO, "Ingest job {0} started", jobId); + } + } + return errors; + } + + /** + * Queries whether or not ingest jobs are running. + * + * @return True or false. + */ + static boolean ingestJobsAreRunning() { + return !ingestJobsById.isEmpty(); + } + + /** + * RJCTODO + * @return + */ + static List getJobSnapshots() { + List snapShots = new ArrayList<>(); + for (IngestJob job : IngestJob.ingestJobsById.values()) { + snapShots.add(job.getIngestJobSnapshot()); + } + return snapShots; + } + + /** + * RJCTODO + */ + static void cancelAllJobs() { + for (IngestJob job : ingestJobsById.values()) { + job.cancel(); + } + } + /** * Constructs an ingest job. * @@ -80,6 +166,7 @@ final class IngestJob { this.id = id; this.dataSource = dataSource; this.processUnallocatedSpace = processUnallocatedSpace; + this.stage = IngestJob.Stages.FIRST; this.fileIngestPipelines = new LinkedBlockingQueue<>(); this.filesInProgress = new ArrayList<>(); this.dataSourceIngestProgressLock = new Object(); @@ -122,19 +209,344 @@ final class IngestJob { * @throws InterruptedException */ List start(List ingestModuleTemplates) throws InterruptedException { - createIngestPipelines(ingestModuleTemplates); + this.createIngestPipelines(ingestModuleTemplates); List errors = startUpIngestPipelines(); if (errors.isEmpty()) { if (!this.dataSourceIngestPipeline.isEmpty()) { - startDataSourceIngestProgressBar(); + this.startDataSourceIngestProgressBar(); } if (!this.fileIngestPipelines.peek().isEmpty()) { - startFileIngestProgressBar(); + this.startFileIngestProgressBar(); } } return errors; } + /** + * Checks to see if this job has a data source ingest pipeline. + * + * @return True or false. + */ + boolean hasDataSourceIngestPipeline() { + return (this.dataSourceIngestPipeline.isEmpty() == false); + } + + /** + * Checks to see if the job has a file ingest pipeline. + * + * @return True or false. + */ + boolean hasFileIngestPipeline() { + return (this.fileIngestPipelines.peek().isEmpty() == false); + } + + /** + * Passes the data source for this job through the data source ingest + * pipeline. + * + * @param task A data source ingest task wrapping the data source. + * @throws InterruptedException + */ + void process(DataSourceIngestTask task) throws InterruptedException { + try { + if (!this.isCancelled() && !this.dataSourceIngestPipeline.isEmpty()) { + List errors = new ArrayList<>(); + errors.addAll(this.dataSourceIngestPipeline.process(task)); + if (!errors.isEmpty()) { + logIngestModuleErrors(errors); + } + } + + // Shut down the data source ingest progress bar right away. + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.finish(); + this.dataSourceIngestProgress = null; + } + } + } finally { + // No matter what happens, let the ingest scheduler know that this + // task is completed. + IngestJob.ingestScheduler.notifyTaskCompleted(task); + } + } + + /** + * Passes the a file from the data source for this job through the file + * ingest pipeline. + * + * @param task A file ingest task. + * @throws InterruptedException + */ + void process(FileIngestTask task) throws InterruptedException { + try { + if (!this.isCancelled()) { + // Get a file ingest pipeline not currently in use by another + // file ingest thread. + FileIngestPipeline pipeline = this.fileIngestPipelines.take(); + if (!pipeline.isEmpty()) { + // Get the file to process. + AbstractFile file = task.getFile(); + + // Update the file ingest progress bar. + synchronized (this.fileIngestProgressLock) { + ++this.processedFiles; + if (this.processedFiles <= this.estimatedFilesToProcess) { + this.fileIngestProgress.progress(file.getName(), (int) this.processedFiles); + } else { + this.fileIngestProgress.progress(file.getName(), (int) this.estimatedFilesToProcess); + } + this.filesInProgress.add(file.getName()); + } + + // Run the file through the pipeline. + List errors = new ArrayList<>(); + errors.addAll(pipeline.process(task)); + if (!errors.isEmpty()) { + logIngestModuleErrors(errors); + } + + // Update the file ingest progress bar again in case the + // file was being displayed. + if (!this.cancelled) { + synchronized (this.fileIngestProgressLock) { + this.filesInProgress.remove(file.getName()); + if (this.filesInProgress.size() > 0) { + this.fileIngestProgress.progress(this.filesInProgress.get(0)); + } else { + this.fileIngestProgress.progress(""); + } + } + } + } + + // Relinquish the pipeline so it can be reused by another file + // ingest thread. + this.fileIngestPipelines.put(pipeline); + } + } finally { + // No matter what happens, let the ingest scheduler know that this + // task is completed. + IngestJob.ingestScheduler.notifyTaskCompleted(task); + } + } + + /** + * + * @param file + */ + void addFiles(List files) { + // RJCTODO: Add handling of lack of support for file ingest in second stage + for (AbstractFile file : files) { + try { + // RJCTODO: Deal with possible IllegalStateException; maybe don't need logging here + IngestJob.ingestScheduler.scheduleFileIngestTask(this, file); + } catch (InterruptedException ex) { + // Handle the unexpected interrupt here rather than make ingest + // module writers responsible for writing this exception handler. + // The interrupt flag of the thread is reset for detection by + // the thread task code. + Thread.currentThread().interrupt(); + IngestJob.logger.log(Level.SEVERE, "File task scheduling unexpectedly interrupted", ex); //NON-NLS + } + } + } + + /** + * Allows the ingest tasks scheduler to notify this ingest job whenever all + * the scheduled tasks for this ingest job have been completed. + */ + void notifyTasksCompleted() { + switch (this.stage) { + case FIRST: + this.finishFirstStage(); + this.startSecondStage(); + break; + case SECOND: + this.finish(); + break; + } + } + + /** + * Updates the display name of the data source ingest progress bar. + * + * @param displayName The new display name. + */ + void updateDataSourceIngestProgressBarDisplayName(String displayName) { + if (!this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + this.dataSourceIngestProgress.setDisplayName(displayName); + } + } + } + + /** + * Switches the data source progress bar to determinate mode. This should be + * called if the total work units to process the data source is known. + * + * @param workUnits Total number of work units for the processing of the + * data source. + */ + void switchDataSourceIngestProgressBarToDeterminate(int workUnits) { + if (!this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.switchToDeterminate(workUnits); + } + } + } + } + + /** + * Switches the data source ingest progress bar to indeterminate mode. This + * should be called if the total work units to process the data source is + * unknown. + */ + void switchDataSourceIngestProgressBarToIndeterminate() { + if (!this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.switchToIndeterminate(); + } + } + } + } + + /** + * Updates the data source ingest progress bar with the number of work units + * performed, if in the determinate mode. + * + * @param workUnits Number of work units performed. + */ + void advanceDataSourceIngestProgressBar(int workUnits) { + if (!this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.progress("", workUnits); + } + } + } + } + + // RJCTODO: Is this right? + /** + * Updates the data source ingest progress bar display name. + * + * @param displayName The new display name. + */ + void advanceDataSourceIngestProgressBar(String displayName) { + if (!this.cancelled) { + synchronized (this.dataSourceIngestProgressLock) { + if (null != this.dataSourceIngestProgress) { + this.dataSourceIngestProgress.progress(displayName); + } + } + } + } + + /** + * Updates the progress bar with the number of work units performed, if in + * the determinate mode. + * + * @param message Message to display in sub-title + * @param workUnits Number of work units performed. + */ + void advanceDataSourceIngestProgressBar(String message, int workUnits) { + if (!this.cancelled) { + synchronized (this.fileIngestProgressLock) { + this.dataSourceIngestProgress.progress(message, workUnits); + } + } + } + + /** + * Determines whether or not a temporary cancellation of data source ingest + * in order to stop the currently executing data source ingest module is in + * effect. + * + * @return True or false. + */ + boolean currentDataSourceIngestModuleIsCancelled() { + return this.currentDataSourceIngestModuleCancelled; + } + + /** + * Rescind a temporary cancellation of data source ingest in order to stop + * the currently executing data source ingest module. + */ + void currentDataSourceIngestModuleCancellationCompleted() { + this.currentDataSourceIngestModuleCancelled = false; + + // A new progress bar must be created because the cancel button of the + // previously constructed component is disabled by NetBeans when the + // user selects the "OK" button of the cancellation confirmation dialog + // popped up by NetBeans when the progress bar cancel button was + // pressed. + synchronized (this.dataSourceIngestProgressLock) { + this.dataSourceIngestProgress.finish(); + this.dataSourceIngestProgress = null; + this.startDataSourceIngestProgressBar(); + } + } + + /** + * Requests cancellation of ingest, i.e., a shutdown of the data source and + * file ingest pipelines. + */ + void cancel() { + // Put a cancellation message on data source ingest progress bar, + // if it is still running. + synchronized (this.dataSourceIngestProgressLock) { + if (dataSourceIngestProgress != null) { + final String displayName = NbBundle.getMessage(this.getClass(), + "IngestJob.progress.dataSourceIngest.initialDisplayName", + dataSource.getName()); + dataSourceIngestProgress.setDisplayName( + NbBundle.getMessage(this.getClass(), + "IngestJob.progress.cancelling", + displayName)); + } + } + + // Put a cancellation message on the file ingest progress bar, + // if it is still running. + synchronized (this.fileIngestProgressLock) { + if (this.fileIngestProgress != null) { + final String displayName = NbBundle.getMessage(this.getClass(), + "IngestJob.progress.fileIngest.displayName", + this.dataSource.getName()); + this.fileIngestProgress.setDisplayName( + NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", + displayName)); + } + } + + this.cancelled = true; + + // Tell the ingest scheduler to cancel all pending tasks. + IngestJob.ingestScheduler.cancelPendingTasksForIngestJob(this); + } + + /** + * Queries whether or not cancellation of ingest i.e., a shutdown of the + * data source and file ingest pipelines, has been requested + * + * @return True or false. + */ + boolean isCancelled() { + return this.cancelled; + } + + /** + * Get some basic performance statistics on this job. + * + * @return An ingest job statistics object. + */ + IngestJobSnapshot getIngestJobSnapshot() { + return new IngestJobSnapshot(); + } + /** * Creates the file and data source ingest pipelines. * @@ -143,10 +555,28 @@ final class IngestJob { * @throws InterruptedException */ private void createIngestPipelines(List ingestModuleTemplates) throws InterruptedException { - this.dataSourceIngestPipeline = new DataSourceIngestPipeline(this, ingestModuleTemplates); + // RJCTODO: Use config file + // Sort the ingest module templates as required for the pipelines. + List firstStageDataSourceModuleTemplates = new ArrayList<>(); + List secondStageDataSourceModuleTemplates = new ArrayList<>(); + List fileIngestModuleTemplates = new ArrayList<>(); + for (IngestModuleTemplate template : ingestModuleTemplates) { + if (template.isDataSourceIngestModuleTemplate()) { + firstStageDataSourceModuleTemplates.add(template); + } else { + firstStageDataSourceModuleTemplates.add(template); + } + } + + // Contruct the pipelines. + this.firstStageDataSourceIngestPipeline = new DataSourceIngestPipeline(this, firstStageDataSourceModuleTemplates); + this.secondStageDataSourceIngestPipeline = new DataSourceIngestPipeline(this, secondStageDataSourceModuleTemplates); + this.dataSourceIngestPipeline = firstStageDataSourceIngestPipeline; + + // Construct the file ingest pipelines. int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads(); for (int i = 0; i < numberOfFileIngestThreads; ++i) { - this.fileIngestPipelines.put(new FileIngestPipeline(this, ingestModuleTemplates)); + this.fileIngestPipelines.put(new FileIngestPipeline(this, fileIngestModuleTemplates)); } } @@ -160,9 +590,12 @@ final class IngestJob { private List startUpIngestPipelines() throws InterruptedException { List errors = new ArrayList<>(); - // Start up the data source ingest pipeline. + // Start up the first stage data source ingest pipeline. errors.addAll(this.dataSourceIngestPipeline.startUp()); + // Start up the second stage data source ingest pipeline. + errors.addAll(this.secondStageDataSourceIngestPipeline.startUp()); + // Start up the file ingest pipelines (one per file ingest thread). for (FileIngestPipeline pipeline : this.fileIngestPipelines) { errors.addAll(pipeline.startUp()); @@ -249,201 +682,10 @@ final class IngestJob { } /** - * Checks to see if this job has a data source ingest pipeline. - * - * @return True or false. + * Shuts down the file ingest pipelines and current progress bars, if any, + * for this job. */ - boolean hasDataSourceIngestPipeline() { - return (this.dataSourceIngestPipeline.isEmpty() == false); - } - - /** - * Checks to see if the job has a file ingest pipeline. - * - * @return True or false. - */ - boolean hasFileIngestPipeline() { - return (this.fileIngestPipelines.peek().isEmpty() == false); - } - - /** - * Passes the data source for this job through the data source ingest - * pipeline. - * - * @param task A data source ingest task wrapping the data source. - * @throws InterruptedException - */ - void process(DataSourceIngestTask task) throws InterruptedException { - try { - if (!this.isCancelled() && !this.dataSourceIngestPipeline.isEmpty()) { - List errors = new ArrayList<>(); - errors.addAll(this.dataSourceIngestPipeline.process(task)); - if (!errors.isEmpty()) { - logIngestModuleErrors(errors); - } - } - - // The single data source ingest task for this job is done, so shut - // down the data source ingest progress bar right away. - synchronized (this.dataSourceIngestProgressLock) { - if (null != this.dataSourceIngestProgress) { - this.dataSourceIngestProgress.finish(); - this.dataSourceIngestProgress = null; - } - } - } finally { - // No matter what happens, let the ingest scheduler know that this - // task is completed. - IngestJob.ingestScheduler.notifyTaskCompleted(task); - } - } - - /** - * Updates the display name of the data source ingest progress bar. - * - * @param displayName The new display name. - */ - void updateDataSourceIngestProgressBarDisplayName(String displayName) { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - this.dataSourceIngestProgress.setDisplayName(displayName); - } - } - } - - /** - * Updates the data source progress bar and switches it to determinate mode. - * - * @param workUnits Total number of work units for the processing of the - * data source. - */ - void switchDataSourceIngestProgressBarToDeterminate(int workUnits) { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - this.dataSourceIngestProgress.switchToDeterminate(workUnits); - } - } - } - - /** - * Switches the data source ingest progress bar to indeterminate mode. This - * should be called if the total work units to process the data source is - * unknown. - */ - void switchDataSourceIngestProgressBarToIndeterminate() { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - this.dataSourceIngestProgress.switchToIndeterminate(); - } - } - } - - /** - * Updates the data source ingest progress bar with the number of work units - * performed, if in the determinate mode. - * - * @param workUnits Number of work units performed. - */ - void advanceDataSourceIngestProgressBar(int workUnits) { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - this.dataSourceIngestProgress.progress("", workUnits); - } - } - } - - /** - * Updates the data source ingest progress bar display name. - * - * @param displayName The new display name. - */ - void advanceDataSourceIngestProgressBar(String displayName) { - if (!this.cancelled) { - synchronized (this.dataSourceIngestProgressLock) { - this.dataSourceIngestProgress.progress(displayName); - } - } - } - - /** - * Updates the progress bar with the number of work units performed, if in - * the determinate mode. - * - * @param message Message to display in sub-title - * @param workUnits Number of work units performed. - */ - void advanceDataSourceIngestProgressBar(String message, int workUnits) { - if (!this.cancelled) { - synchronized (this.fileIngestProgressLock) { - this.dataSourceIngestProgress.progress(message, workUnits); - } - } - } - - /** - * Passes the a file from the data source for this job through the file - * ingest pipeline. - * - * @param task A file ingest task. - * @throws InterruptedException - */ - void process(FileIngestTask task) throws InterruptedException { - try { - if (!this.isCancelled()) { - // Get a file ingest pipeline not currently in use by another - // file ingest thread. - FileIngestPipeline pipeline = this.fileIngestPipelines.take(); - if (!pipeline.isEmpty()) { - // Get the file to process. - AbstractFile file = task.getFile(); - - // Update the file ingest progress bar. - synchronized (this.fileIngestProgressLock) { - ++this.processedFiles; - if (this.processedFiles <= this.estimatedFilesToProcess) { - this.fileIngestProgress.progress(file.getName(), (int) this.processedFiles); - } else { - this.fileIngestProgress.progress(file.getName(), (int) this.estimatedFilesToProcess); - } - this.filesInProgress.add(file.getName()); - } - - // Run the file through the pipeline. - List errors = new ArrayList<>(); - errors.addAll(pipeline.process(task)); - if (!errors.isEmpty()) { - logIngestModuleErrors(errors); - } - - // Update the file ingest progress bar again in case the - // file was being displayed. - if (!this.cancelled) { - synchronized (this.fileIngestProgressLock) { - this.filesInProgress.remove(file.getName()); - if (this.filesInProgress.size() > 0) { - this.fileIngestProgress.progress(this.filesInProgress.get(0)); - } else { - this.fileIngestProgress.progress(""); - } - } - } - } - - // Relinquish the pipeline so it can be reused by another file - // ingest thread. - this.fileIngestPipelines.put(pipeline); - } - } finally { - // No matter what happens, let the ingest scheduler know that this - // task is completed. - IngestJob.ingestScheduler.notifyTaskCompleted(task); - } - } - - /** - * Shuts down the ingest pipelines and progress bars for this job. - */ - void finish() { + private void finishFirstStage() { // Shut down the file ingest pipelines. Note that no shut down is // required for the data source ingest pipeline because data source // ingest modules do not have a shutdown() method. @@ -456,8 +698,8 @@ final class IngestJob { logIngestModuleErrors(errors); } - // Finish the data source ingest progress bar, if it hasn't already - // been finished. + // Finish the first stage data source ingest progress bar, if it hasn't + // already been finished. synchronized (this.dataSourceIngestProgressLock) { if (this.dataSourceIngestProgress != null) { this.dataSourceIngestProgress.finish(); @@ -475,6 +717,48 @@ final class IngestJob { } } + /** + * RJCTODO + */ + private void startSecondStage() { + this.stage = IngestJob.Stages.SECOND; + if (!this.cancelled && !this.secondStageDataSourceIngestPipeline.isEmpty()) { + this.dataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline; + this.startDataSourceIngestProgressBar(); + try { + IngestJob.ingestScheduler.scheduleDataSourceIngestTask(this); + } catch (InterruptedException ex) { + // RJCTODO: + this.finish(); + } + } else { + this.finish(); + } + } + + /** + * Shuts down the ingest pipelines and progress bars for this job. + */ + private void finish() { + // Finish the second stage data source ingest progress bar, if it hasn't + // already been finished. + synchronized (this.dataSourceIngestProgressLock) { + if (this.dataSourceIngestProgress != null) { + this.dataSourceIngestProgress.finish(); + this.dataSourceIngestProgress = null; + } + } + + IngestJob.ingestJobsById.remove(this.id); + if (!this.isCancelled()) { + logger.log(Level.INFO, "Ingest job {0} completed", this.id); + IngestManager.getInstance().fireIngestJobCompleted(this.id); + } else { + logger.log(Level.INFO, "Ingest job {0} cancelled", this.id); + IngestManager.getInstance().fireIngestJobCancelled(this.id); + } + } + /** * Write ingest module errors to the log. * @@ -494,116 +778,51 @@ final class IngestJob { this.currentDataSourceIngestModuleCancelled = true; } - /** - * Determines whether or not a temporary cancellation of data source ingest - * in order to stop the currently executing data source ingest module is in - * effect. - * - * @return True or false. - */ - boolean currentDataSourceIngestModuleIsCancelled() { - return this.currentDataSourceIngestModuleCancelled; - } - - /** - * Rescind a temporary cancellation of data source ingest in order to stop - * the currently executing data source ingest module. - */ - void currentDataSourceIngestModuleCancellationCompleted() { - this.currentDataSourceIngestModuleCancelled = false; - - // A new progress bar must be created because the cancel button of the - // previously constructed component is disabled by NetBeans when the - // user selects the "OK" button of the cancellation confirmation dialog - // popped up by NetBeans when the progress bar cancel button was - // pressed. - synchronized (this.dataSourceIngestProgressLock) { - this.dataSourceIngestProgress.finish(); - this.dataSourceIngestProgress = null; - this.startDataSourceIngestProgressBar(); - } - } - - /** - * Requests cancellation of ingest, i.e., a shutdown of the data source and - * file ingest pipelines. - */ - void cancel() { - // Put a cancellation message on data source ingest progress bar, - // if it is still running. - synchronized (this.dataSourceIngestProgressLock) { - if (dataSourceIngestProgress != null) { - final String displayName = NbBundle.getMessage(this.getClass(), - "IngestJob.progress.dataSourceIngest.initialDisplayName", - dataSource.getName()); - dataSourceIngestProgress.setDisplayName( - NbBundle.getMessage(this.getClass(), - "IngestJob.progress.cancelling", - displayName)); - } - } - - // Put a cancellation message on the file ingest progress bar, - // if it is still running. - synchronized (this.fileIngestProgressLock) { - if (this.fileIngestProgress != null) { - final String displayName = NbBundle.getMessage(this.getClass(), - "IngestJob.progress.fileIngest.displayName", - this.dataSource.getName()); - this.fileIngestProgress.setDisplayName( - NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling", - displayName)); - } - } - - this.cancelled = true; - - // Tell the ingest scheduler to cancel all pending tasks. - IngestJob.ingestScheduler.cancelPendingTasksForIngestJob(this); - } - - /** - * Queries whether or not cancellation of ingest i.e., a shutdown of the - * data source and file ingest pipelines, has been requested - * - * @return True or false. - */ - boolean isCancelled() { - return this.cancelled; - } - - /** - * Get some basic performance statistics on this job. - * - * @return An ingest job statistics object. - */ - IngestJobStats getStats() { - return new IngestJobStats(); - } - /** * Stores basic diagnostic statistics for an ingest job. */ - class IngestJobStats { + class IngestJobSnapshot { + private final long jobId; + private final String dataSource; private final long startTime; private final long processedFiles; private final long estimatedFilesToProcess; private final long snapShotTime; + private final IngestTasksScheduler.IngestJobTasksSnapshot tasksSnapshot; /** * Constructs an object to stores basic diagnostic statistics for an * ingest job. */ - IngestJobStats() { + IngestJobSnapshot() { + this.jobId = IngestJob.this.id; + this.dataSource = IngestJob.this.dataSource.getName(); this.startTime = IngestJob.this.startTime; synchronized (IngestJob.this.fileIngestProgressLock) { this.processedFiles = IngestJob.this.processedFiles; this.estimatedFilesToProcess = IngestJob.this.estimatedFilesToProcess; this.snapShotTime = new Date().getTime(); } + this.tasksSnapshot = IngestJob.ingestScheduler.getTasksSnapshotForJob(this.jobId); } + /** + * RJCTODO + * @return + */ + long getJobId() { + return this.jobId; + } + + /** + * RJCTODO + * @return + */ + String getDataSource() { + return dataSource; + } + /** * Gets files per second throughput since job started. * @@ -651,6 +870,47 @@ final class IngestJob { long getFilesEstimated() { return estimatedFilesToProcess; } + + /** + * RJCTODO + * @return + */ + long getRootQueueSize() { + return this.tasksSnapshot.getRootQueueSize(); + } + + /** + * RJCTODO + * @return + */ + long getDirQueueSize() { + return this.tasksSnapshot.getDirQueueSize(); + } + + /** + * RJCTODO + * @return + */ + long getFileQueueSize() { + return this.tasksSnapshot.getFileQueueSize(); + } + + /** + * RJCTODO + * @return + */ + long getDsQueueSize() { + return this.tasksSnapshot.getDsQueueSize(); + } + + /** + * RJCTODO + * @return + */ + long getRunningListSize() { + return this.tasksSnapshot.getRunningListSize(); + } + } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java index dc0248ad11..6d81f9db79 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJobContext.java @@ -19,7 +19,6 @@ package org.sleuthkit.autopsy.ingest; import java.util.List; -import java.util.logging.Level; import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.Content; @@ -31,7 +30,6 @@ import org.sleuthkit.datamodel.Content; public final class IngestJobContext { private static final Logger logger = Logger.getLogger(IngestJobContext.class.getName()); - private static final IngestScheduler scheduler = IngestScheduler.getInstance(); private final IngestJob ingestJob; IngestJobContext(IngestJob ingestJob) { @@ -107,20 +105,21 @@ public final class IngestJobContext { * pipeline of the ingest job associated with this context. * * @param files The files to be processed by the file ingest pipeline. + * @deprecated use addFilesToJob() instead */ + @Deprecated public void scheduleFiles(List files) { - for (AbstractFile file : files) { - try { - IngestJobContext.scheduler.scheduleAdditionalFileIngestTask(this.ingestJob, file); - } catch (InterruptedException ex) { - // Handle the unexpected interrupt here rather than make ingest - // module writers responsible for writing this exception handler. - // The interrupt flag of the thread is reset for detection by - // the thread task code. - Thread.currentThread().interrupt(); - IngestJobContext.logger.log(Level.SEVERE, "File task scheduling unexpectedly interrupted", ex); //NON-NLS - } - } + this.addFilesToJob(files); + } + + /** + * Adds one or more files to the files to be passed through the file ingest + * pipeline of the ingest job associated with this context. + * + * @param files The files to be processed by the file ingest pipeline. + */ + public void addFilesToJob(List files) { + this.ingestJob.addFiles(files); } } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index a86b9ff78c..b04f973d70 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -134,7 +134,7 @@ public class IngestManager { */ private void startDataSourceIngestTask() { long threadId = nextThreadId.incrementAndGet(); - dataSourceIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestScheduler.getInstance().getDataSourceIngestTaskQueue())); + dataSourceIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); } @@ -144,7 +144,7 @@ public class IngestManager { */ private void startFileIngestTask() { long threadId = nextThreadId.incrementAndGet(); - fileIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestScheduler.getInstance().getFileIngestTaskQueue())); + fileIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue())); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); } @@ -174,12 +174,12 @@ public class IngestManager { } void handleCaseOpened() { - IngestScheduler.getInstance().setEnabled(true); + IngestJob.jobCreationEnabled(true); clearIngestMessageBox(); } void handleCaseClosed() { - IngestScheduler.getInstance().setEnabled(false); + IngestJob.jobCreationEnabled(false); cancelAllIngestJobs(); clearIngestMessageBox(); } @@ -197,7 +197,7 @@ public class IngestManager { * @return True if any ingest jobs are in progress, false otherwise. */ public boolean isIngestRunning() { - return IngestScheduler.getInstance().ingestJobsAreRunning(); + return IngestJob.ingestJobsAreRunning(); } @@ -293,7 +293,7 @@ public class IngestManager { } // Cancel all the jobs already created. - IngestScheduler.getInstance().cancelAllIngestJobs(); + IngestJob.cancelAllJobs(); } /** @@ -555,7 +555,7 @@ public class IngestManager { } // Start an ingest job for the data source. - List errors = IngestScheduler.getInstance().startIngestJob(dataSource, moduleTemplates, processUnallocatedSpace); + List errors = IngestJob.startJob(dataSource, moduleTemplates, processUnallocatedSpace); if (!errors.isEmpty()) { // Report the errors to the user. They have already been logged. StringBuilder moduleStartUpErrors = new StringBuilder(); diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java index dff5c2c856..61dd097f6f 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java @@ -29,7 +29,6 @@ import javax.swing.table.AbstractTableModel; import javax.swing.table.TableColumn; import org.apache.commons.lang3.time.DurationFormatUtils; import org.openide.util.NbBundle; -import org.sleuthkit.autopsy.ingest.IngestScheduler.IngestJobSchedulerStats; public class IngestProgressSnapshotPanel extends javax.swing.JPanel { @@ -161,20 +160,20 @@ public class IngestProgressSnapshotPanel extends javax.swing.JPanel { private final String[] columnNames = {"Job ID", "Data Source", "Start", "Num Processed", "Files/Sec", "In Progress", "Files Queued", "Dir Queued", "Root Queued", "DS Queued"}; - private List schedStats; + private List jobSnapshots; private IngestJobTableModel() { refresh(); } private void refresh() { - schedStats = IngestScheduler.getInstance().getJobStats(); + jobSnapshots = IngestJob.getJobSnapshots(); fireTableDataChanged(); } @Override public int getRowCount() { - return schedStats.size(); + return jobSnapshots.size(); } @Override @@ -189,39 +188,39 @@ public class IngestProgressSnapshotPanel extends javax.swing.JPanel { @Override public Object getValueAt(int rowIndex, int columnIndex) { - IngestJobSchedulerStats schedStat = schedStats.get(rowIndex); + IngestJob.IngestJobSnapshot snapShot = jobSnapshots.get(rowIndex); Object cellValue; switch (columnIndex) { case 0: - cellValue = schedStat.getJobId(); + cellValue = snapShot.getJobId(); break; case 1: - cellValue = schedStat.getDataSource(); + cellValue = snapShot.getDataSource(); break; case 2: SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); - cellValue = dateFormat.format(new Date(schedStat.getIngestJobStats().getStartTime())); + cellValue = dateFormat.format(new Date(snapShot.getStartTime())); break; case 3: - cellValue = schedStat.getIngestJobStats().getFilesProcessed(); + cellValue = snapShot.getFilesProcessed(); break; case 4: - cellValue = schedStat.getIngestJobStats().getSpeed(); + cellValue = snapShot.getSpeed(); break; case 5: - cellValue = schedStat.getRunningListSize(); + cellValue = snapShot.getRunningListSize(); break; case 6: - cellValue = schedStat.getFileQueueSize(); + cellValue = snapShot.getFileQueueSize(); break; case 7: - cellValue = schedStat.getDirQueueSize(); + cellValue = snapShot.getDirQueueSize(); break; case 8: - cellValue = schedStat.getRootQueueSize(); + cellValue = snapShot.getRootQueueSize(); break; case 9: - cellValue = schedStat.getDsQueueSize(); + cellValue = snapShot.getDsQueueSize(); break; default: cellValue = null; diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java deleted file mode 100755 index 4fef5eaad9..0000000000 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestScheduler.java +++ /dev/null @@ -1,678 +0,0 @@ -/* - * Autopsy Forensic Browser - * - * Copyright 2012-2014 Basis Technology Corp. - * Contact: carrier sleuthkit org - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.sleuthkit.autopsy.ingest; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.TreeSet; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.sleuthkit.autopsy.coreutils.Logger; -import org.sleuthkit.autopsy.ingest.IngestJob.IngestJobStats; -import org.sleuthkit.datamodel.AbstractFile; -import org.sleuthkit.datamodel.Content; -import org.sleuthkit.datamodel.File; -import org.sleuthkit.datamodel.FileSystem; -import org.sleuthkit.datamodel.TskCoreException; -import org.sleuthkit.datamodel.TskData; - -/** - * Creates ingest jobs and their constituent ingest tasks, queuing the tasks in - * priority order for execution by the ingest manager's ingest threads. - */ -final class IngestScheduler { - - private static final Logger logger = Logger.getLogger(IngestScheduler.class.getName()); - - private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue(); - - private static IngestScheduler instance = null; - - private final AtomicLong nextIngestJobId = new AtomicLong(0L); - - private final ConcurrentHashMap ingestJobsById = new ConcurrentHashMap<>(); - - private volatile boolean enabled = false; - - private final DataSourceIngestTaskQueue dataSourceTaskDispenser = new DataSourceIngestTaskQueue(); - - private final FileIngestTaskQueue fileTaskDispenser = new FileIngestTaskQueue(); - - // The following five collections lie at the heart of the scheduler. - // The pending tasks queues are used to schedule tasks for an ingest job. If - // multiple jobs are scheduled, tasks from different jobs may become - // interleaved in these queues. - // FIFO queue for data source-level tasks. - private final LinkedBlockingQueue pendingDataSourceTasks = new LinkedBlockingQueue<>(); // Guarded by this - - // File tasks are "shuffled" - // through root directory (priority queue), directory (LIFO), and file tasks - // queues (LIFO). If a file task makes it into the pending file tasks queue, - // it is consumed by the ingest threads. - private final TreeSet pendingRootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); // Guarded by this - - private final List pendingDirectoryTasks = new ArrayList<>(); // Guarded by this - - private final BlockingDeque pendingFileTasks = new LinkedBlockingDeque<>(); // Not guarded - - // The "tasks in progress" list has: - // - File and data source tasks that are running - // - File tasks that are in the pending file queue - // It is used to determine when a job is done. It has both pending and running - // tasks because we do not lock the 'pendingFileTasks' and a task needs to be in - // at least one of the pending or inprogress lists at all times before it is completed. - // files are added to this when the are added to pendingFilesTasks and removed when they complete - private final List tasksInProgressAndPending = new ArrayList<>(); // Guarded by this - - synchronized static IngestScheduler getInstance() { - if (instance == null) { - instance = new IngestScheduler(); - } - return instance; - } - - private IngestScheduler() { - } - - void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - /** - * Creates an ingest job for a data source. - * - * @param dataSource The data source to ingest. - * @param ingestModuleTemplates The ingest module templates to use to create - * the ingest pipelines for the job. - * @param processUnallocatedSpace Whether or not the job should include - * processing of unallocated space. - * - * @return A collection of ingest module start up errors, empty on success. - * - * @throws InterruptedException - */ - List startIngestJob(Content dataSource, List ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException { - List errors = new ArrayList<>(); - if (enabled) { - long jobId = nextIngestJobId.incrementAndGet(); - IngestJob job = new IngestJob(jobId, dataSource, processUnallocatedSpace); - errors = job.start(ingestModuleTemplates); - if (errors.isEmpty() && (job.hasDataSourceIngestPipeline() || job.hasFileIngestPipeline())) { - ingestJobsById.put(jobId, job); - IngestManager.getInstance().fireIngestJobStarted(jobId); - scheduleIngestTasks(job); - logger.log(Level.INFO, "Ingest job {0} started", jobId); - } - } - return errors; - } - - synchronized private void scheduleIngestTasks(IngestJob job) throws InterruptedException { - // This is synchronized to guard the task queues and make ingest - // scheduling for a job an an atomic operation. Otherwise, the data - // source task might be completed before the file tasks were scheduled, - // resulting in a false positive for a job completion check. - if (job.hasDataSourceIngestPipeline()) { - scheduleDataSourceIngestTask(job); - } - if (job.hasFileIngestPipeline()) { - scheduleFileIngestTasks(job); - } - } - - synchronized private void scheduleDataSourceIngestTask(IngestJob job) throws InterruptedException { - DataSourceIngestTask task = new DataSourceIngestTask(job); - tasksInProgressAndPending.add(task); - try { - // Should not block, queue is (theoretically) unbounded. - pendingDataSourceTasks.put(task); - } catch (InterruptedException ex) { - tasksInProgressAndPending.remove(task); - Logger.getLogger(IngestScheduler.class.getName()).log(Level.SEVERE, "Interruption of unexpected block on pending data source tasks queue", ex); //NON-NLS - throw ex; - } - } - - synchronized private void scheduleFileIngestTasks(IngestJob job) throws InterruptedException { - List topLevelFiles = getTopLevelFiles(job.getDataSource()); - for (AbstractFile firstLevelFile : topLevelFiles) { - FileIngestTask task = new FileIngestTask(job, firstLevelFile); - if (shouldEnqueueFileTask(task)) { - pendingRootDirectoryTasks.add(task); - } - } - updatePendingFileTasksQueues(); - } - - private static List getTopLevelFiles(Content dataSource) { - List topLevelFiles = new ArrayList<>(); - Collection rootObjects = dataSource.accept(new GetRootDirectoryVisitor()); - if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) { - // The data source is itself a file to be processed. - topLevelFiles.add((AbstractFile) dataSource); - } else { - for (AbstractFile root : rootObjects) { - List children; - try { - children = root.getChildren(); - if (children.isEmpty()) { - // Add the root object itself, it could be an unallocated - // space file, or a child of a volume or an image. - topLevelFiles.add(root); - } else { - // The root object is a file system root directory, get - // the files within it. - for (Content child : children) { - if (child instanceof AbstractFile) { - topLevelFiles.add((AbstractFile) child); - } - } - } - } catch (TskCoreException ex) { - logger.log(Level.WARNING, "Could not get children of root to enqueue: " + root.getId() + ": " + root.getName(), ex); //NON-NLS - } - } - } - return topLevelFiles; - } - - synchronized private void updatePendingFileTasksQueues() throws InterruptedException { - // This is synchronized to guard the pending file tasks queues and make - // this an atomic operation. - if (enabled) { - while (true) { - // Loop until either the pending file tasks queue is NOT empty - // or the upstream queues that feed into it ARE empty. - if (pendingFileTasks.isEmpty() == false) { - return; - } - if (pendingDirectoryTasks.isEmpty()) { - if (pendingRootDirectoryTasks.isEmpty()) { - return; - } - pendingDirectoryTasks.add(pendingRootDirectoryTasks.pollFirst()); - } - - // Try to add the most recently added from the pending directory tasks queue to - // the pending file tasks queue. - boolean tasksEnqueuedForDirectory = false; - FileIngestTask directoryTask = pendingDirectoryTasks.remove(pendingDirectoryTasks.size() - 1); - if (shouldEnqueueFileTask(directoryTask)) { - addToPendingFileTasksQueue(directoryTask); - tasksEnqueuedForDirectory = true; - } - - // If the directory contains subdirectories or files, try to - // enqueue tasks for them as well. - final AbstractFile directory = directoryTask.getFile(); - try { - for (Content child : directory.getChildren()) { - if (child instanceof AbstractFile) { - AbstractFile file = (AbstractFile) child; - FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), file); - if (file.hasChildren()) { - // Found a subdirectory, put the task in the - // pending directory tasks queue. - pendingDirectoryTasks.add(childTask); - tasksEnqueuedForDirectory = true; - } else if (shouldEnqueueFileTask(childTask)) { - // Found a file, put the task directly into the - // pending file tasks queue. - addToPendingFileTasksQueue(childTask); - tasksEnqueuedForDirectory = true; - } - } - } - } catch (TskCoreException ex) { - String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS - logger.log(Level.SEVERE, errorMessage, ex); - } - - // In the case where the directory task is not pushed into the - // the pending file tasks queue and has no children, check to - // see if the job is completed - the directory task might have - // been the last task for the job. - if (!tasksEnqueuedForDirectory) { - IngestJob job = directoryTask.getIngestJob(); - if (ingestJobIsComplete(job)) { - finishIngestJob(job); - } - } - } - } - } - - private static boolean shouldEnqueueFileTask(final FileIngestTask processTask) { - final AbstractFile aFile = processTask.getFile(); - //if it's unalloc file, skip if so scheduled - if (processTask.getIngestJob().shouldProcessUnallocatedSpace() == false && aFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) { - return false; - } - String fileName = aFile.getName(); - if (fileName.equals(".") || fileName.equals("..")) { - return false; - } else if (aFile instanceof org.sleuthkit.datamodel.File) { - final org.sleuthkit.datamodel.File f = (File) aFile; - //skip files in root dir, starting with $, containing : (not default attributes) - //with meta address < 32, i.e. some special large NTFS and FAT files - FileSystem fs = null; - try { - fs = f.getFileSystem(); - } catch (TskCoreException ex) { - logger.log(Level.SEVERE, "Could not get FileSystem for " + f, ex); //NON-NLS - } - TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP; - if (fs != null) { - fsType = fs.getFsType(); - } - if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) { - //not fat or ntfs, accept all files - return true; - } - boolean isInRootDir = false; - try { - isInRootDir = f.getParentDirectory().isRoot(); - } catch (TskCoreException ex) { - logger.log(Level.WARNING, "Could not check if should enqueue the file: " + f.getName(), ex); //NON-NLS - } - if (isInRootDir && f.getMetaAddr() < 32) { - String name = f.getName(); - if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) { - return false; - } - } else { - return true; - } - } - return true; - } - - synchronized private void addToPendingFileTasksQueue(FileIngestTask task) throws IllegalStateException { - tasksInProgressAndPending.add(task); - try { - // Should not block, queue is (theoretically) unbounded. - /* add to top of list because we had one image that had a folder - * with - * lots of zip files. This queue had thousands of entries because - * it just kept on getting bigger and bigger. So focus on pushing - * out - * the ZIP file contents out of the queue to try to keep it small. - */ - pendingFileTasks.addFirst(task); - } catch (IllegalStateException ex) { - tasksInProgressAndPending.remove(task); - Logger.getLogger(IngestScheduler.class.getName()).log(Level.SEVERE, "Interruption of unexpected block on pending file tasks queue", ex); //NON-NLS - throw ex; - } - } - - void scheduleAdditionalFileIngestTask(IngestJob job, AbstractFile file) throws InterruptedException { - if (enabled) { - FileIngestTask task = new FileIngestTask(job, file); - if (shouldEnqueueFileTask(task)) { - // Send the file task directly to file tasks queue, no need to - // update the pending root directory or pending directory tasks - // queues. - addToPendingFileTasksQueue(task); - } - } - } - - IngestTaskQueue getDataSourceIngestTaskQueue() { - return dataSourceTaskDispenser; - } - - IngestTaskQueue getFileIngestTaskQueue() { - return fileTaskDispenser; - } - - void notifyTaskCompleted(IngestTask task) { - boolean jobIsCompleted; - IngestJob job = task.getIngestJob(); - synchronized (this) { - tasksInProgressAndPending.remove(task); - jobIsCompleted = ingestJobIsComplete(job); - } - if (jobIsCompleted) { - // The lock does not need to be held for the job shut down. - finishIngestJob(job); - } - } - - /** - * Queries whether or not ingest jobs are running. - * - * @return True or false. - */ - boolean ingestJobsAreRunning() { - return !ingestJobsById.isEmpty(); - } - - /** - * Clears the pending ingest task queues for an ingest job. If job is - * complete (no pending or in progress tasks) the job is finished up. - * Otherwise, the last worker thread with an in progress task will finish / - * clean up the job. - * - * @param job The job to cancel. - */ - synchronized void cancelPendingTasksForIngestJob(IngestJob job) { - long jobId = job.getId(); - removeAllPendingTasksForJob(pendingRootDirectoryTasks, jobId); - removeAllPendingTasksForJob(pendingDirectoryTasks, jobId); - removeAllPendingTasksForJob(pendingFileTasks, jobId); - removeAllPendingTasksForJob(pendingDataSourceTasks, jobId); - if (ingestJobIsComplete(job)) { - finishIngestJob(job); - } - } - - /** - * Return the number of tasks in the queue for the given job ID - * - * @param - * @param queue - * @param jobId - * - * @return - */ - int countJobsInCollection(Collection queue, long jobId) { - Iterator iterator = queue.iterator(); - int count = 0; - while (iterator.hasNext()) { - IngestTask task = (IngestTask) iterator.next(); - if (task.getIngestJob().getId() == jobId) { - count++; - } - } - return count; - } - - synchronized private void removeAllPendingTasksForJob(Collection taskQueue, long jobId) { - Iterator iterator = taskQueue.iterator(); - while (iterator.hasNext()) { - IngestTask task = iterator.next(); - if (task.getIngestJob().getId() == jobId) { - tasksInProgressAndPending.remove(task); - iterator.remove(); - } - } - } - - void cancelAllIngestJobs() { - synchronized (this) { - removeAllPendingTasks(pendingRootDirectoryTasks); - removeAllPendingTasks(pendingDirectoryTasks); - removeAllPendingTasks(pendingFileTasks); - removeAllPendingTasks(pendingDataSourceTasks); - for (IngestJob job : ingestJobsById.values()) { - job.cancel(); - if (ingestJobIsComplete(job)) { - finishIngestJob(job); - } - } - } - } - - synchronized private void removeAllPendingTasks(Collection taskQueue) { - Iterator iterator = taskQueue.iterator(); - while (iterator.hasNext()) { - tasksInProgressAndPending.remove((IngestTask) iterator.next()); - iterator.remove(); - } - } - - synchronized private boolean ingestJobIsComplete(IngestJob job) { - for (IngestTask task : tasksInProgressAndPending) { - if (task.getIngestJob().getId() == job.getId()) { - return false; - } - } - return true; - } - - /** - * Called after all work is completed to free resources. - * - * @param job - */ - private void finishIngestJob(IngestJob job) { - job.finish(); - long jobId = job.getId(); - ingestJobsById.remove(jobId); - if (!job.isCancelled()) { - logger.log(Level.INFO, "Ingest job {0} completed", jobId); - IngestManager.getInstance().fireIngestJobCompleted(job.getId()); - } else { - logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); - IngestManager.getInstance().fireIngestJobCancelled(job.getId()); - } - } - - private static class RootDirectoryTaskComparator implements Comparator { - - @Override - public int compare(FileIngestTask q1, FileIngestTask q2) { - AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(q1.getFile()); - AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(q2.getFile()); - if (p1 == p2) { - return (int) (q2.getFile().getId() - q1.getFile().getId()); - } else { - return p2.ordinal() - p1.ordinal(); - } - } - - private static class AbstractFilePriority { - - enum Priority { - - LAST, LOW, MEDIUM, HIGH - } - - static final List LAST_PRI_PATHS = new ArrayList<>(); - - static final List LOW_PRI_PATHS = new ArrayList<>(); - - static final List MEDIUM_PRI_PATHS = new ArrayList<>(); - - static final List HIGH_PRI_PATHS = new ArrayList<>(); - /* prioritize root directory folders based on the assumption that we - * are - * looking for user content. Other types of investigations may want - * different - * priorities. */ - - static /* prioritize root directory - * folders based on the assumption that we are - * looking for user content. Other types of investigations may want - * different - * priorities. */ { - // these files have no structure, so they go last - //unalloc files are handled as virtual files in getPriority() - //LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE)); - //LAST_PRI_PATHS.schedule(Pattern.compile("^\\Unalloc", Pattern.CASE_INSENSITIVE)); - LAST_PRI_PATHS.add(Pattern.compile("^pagefile", Pattern.CASE_INSENSITIVE)); - LAST_PRI_PATHS.add(Pattern.compile("^hiberfil", Pattern.CASE_INSENSITIVE)); - // orphan files are often corrupt and windows does not typically have - // user content, so put them towards the bottom - LOW_PRI_PATHS.add(Pattern.compile("^\\$OrphanFiles", Pattern.CASE_INSENSITIVE)); - LOW_PRI_PATHS.add(Pattern.compile("^Windows", Pattern.CASE_INSENSITIVE)); - // all other files go into the medium category too - MEDIUM_PRI_PATHS.add(Pattern.compile("^Program Files", Pattern.CASE_INSENSITIVE)); - // user content is top priority - HIGH_PRI_PATHS.add(Pattern.compile("^Users", Pattern.CASE_INSENSITIVE)); - HIGH_PRI_PATHS.add(Pattern.compile("^Documents and Settings", Pattern.CASE_INSENSITIVE)); - HIGH_PRI_PATHS.add(Pattern.compile("^home", Pattern.CASE_INSENSITIVE)); - HIGH_PRI_PATHS.add(Pattern.compile("^ProgramData", Pattern.CASE_INSENSITIVE)); - } - - /** - * Get the enabled priority for a given file. - * - * @param abstractFile - * - * @return - */ - static AbstractFilePriority.Priority getPriority(final AbstractFile abstractFile) { - if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) { - //quickly filter out unstructured content - //non-fs virtual files and dirs, such as representing unalloc space - return AbstractFilePriority.Priority.LAST; - } - //determine the fs files priority by name - final String path = abstractFile.getName(); - if (path == null) { - return AbstractFilePriority.Priority.MEDIUM; - } - for (Pattern p : HIGH_PRI_PATHS) { - Matcher m = p.matcher(path); - if (m.find()) { - return AbstractFilePriority.Priority.HIGH; - } - } - for (Pattern p : MEDIUM_PRI_PATHS) { - Matcher m = p.matcher(path); - if (m.find()) { - return AbstractFilePriority.Priority.MEDIUM; - } - } - for (Pattern p : LOW_PRI_PATHS) { - Matcher m = p.matcher(path); - if (m.find()) { - return AbstractFilePriority.Priority.LOW; - } - } - for (Pattern p : LAST_PRI_PATHS) { - Matcher m = p.matcher(path); - if (m.find()) { - return AbstractFilePriority.Priority.LAST; - } - } - //default is medium - return AbstractFilePriority.Priority.MEDIUM; - } - } - } - - private final class DataSourceIngestTaskQueue implements IngestTaskQueue { - - @Override - public IngestTask getNextTask() throws InterruptedException { - return pendingDataSourceTasks.take(); - } - } - - private final class FileIngestTaskQueue implements IngestTaskQueue { - - @Override - public IngestTask getNextTask() throws InterruptedException { - FileIngestTask task = pendingFileTasks.takeFirst(); - updatePendingFileTasksQueues(); - return task; - } - } - - /** - * Stores basic stats for a given job - */ - class IngestJobSchedulerStats { - - private final IngestJobStats ingestJobStats; - - private final long jobId; - - private final String dataSource; - - private final long rootQueueSize; - - private final long dirQueueSize; - - private final long fileQueueSize; - - private final long dsQueueSize; - - private final long runningListSize; - - IngestJobSchedulerStats(IngestJob job) { - ingestJobStats = job.getStats(); - jobId = job.getId(); - dataSource = job.getDataSource().getName(); - rootQueueSize = countJobsInCollection(pendingRootDirectoryTasks, jobId); - dirQueueSize = countJobsInCollection(pendingDirectoryTasks, jobId); - fileQueueSize = countJobsInCollection(pendingFileTasks, jobId); - dsQueueSize = countJobsInCollection(pendingDataSourceTasks, jobId); - runningListSize = countJobsInCollection(tasksInProgressAndPending, jobId) - fileQueueSize - dsQueueSize; - } - - protected long getJobId() { - return jobId; - } - - protected String getDataSource() { - return dataSource; - } - - protected long getRootQueueSize() { - return rootQueueSize; - } - - protected long getDirQueueSize() { - return dirQueueSize; - } - - protected long getFileQueueSize() { - return fileQueueSize; - } - - protected long getDsQueueSize() { - return dsQueueSize; - } - - protected long getRunningListSize() { - return runningListSize; - } - - protected IngestJobStats getIngestJobStats() { - return ingestJobStats; - } - } - - /** - * Get basic performance / stats on all running jobs - * - * @return - */ - synchronized List getJobStats() { - List stats = new ArrayList<>(); - for (IngestJob job : Collections.list(ingestJobsById.elements())) { - stats.add(new IngestJobSchedulerStats(job)); - } - return stats; - } -} diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java new file mode 100755 index 0000000000..a9abef15f7 --- /dev/null +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestTasksScheduler.java @@ -0,0 +1,757 @@ +/* + * Autopsy Forensic Browser + * + * Copyright 2012-2014 Basis Technology Corp. + * Contact: carrier sleuthkit org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.sleuthkit.autopsy.ingest; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Level; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.sleuthkit.autopsy.coreutils.Logger; +import org.sleuthkit.datamodel.AbstractFile; +import org.sleuthkit.datamodel.Content; +import org.sleuthkit.datamodel.FileSystem; +import org.sleuthkit.datamodel.TskCoreException; +import org.sleuthkit.datamodel.TskData; + +/** + * Creates ingest tasks for ingest jobs, queuing the tasks in priority order for + * execution by the ingest manager's ingest threads. + */ +final class IngestTasksScheduler { + + private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName()); + private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue(); + private static IngestTasksScheduler instance; + + // Scheduling of data source ingest tasks is accomplished by putting them + // in a FIFO queue to be consumed by the ingest threads. The pending data + // tasks queue is therefore wrapped in a "dispenser" that implements the + // IngestTaskQueue interface and is exposed via a getter method. + private final LinkedBlockingQueue pendingDataSourceTasks; + private final DataSourceIngestTaskQueue dataSourceTasksDispenser; + + // Scheduling of file ingest tasks is accomplished by "shuffling" them + // through a sequence of internal queues that allows for the interleaving of + // tasks from different ingest jobs based on priority. These scheduling + // queues are: + // 1. root directory tasks (priority queue) + // 2. directory tasks (FIFO queue) + // 3. pending file tasks (LIFO queue) + // Tasks in the pending file tasks queue are ready to be consumed by the + // ingest threads. The pending file tasks queue is therefore wrapped in a + // "dispenser" that implements the IngestTaskQueue interface and is exposed + // via a getter method. + private final TreeSet rootDirectoryTasks; + private final List directoryTasks; + private final BlockingDeque pendingFileTasks; + private final FileIngestTaskQueue fileTasksDispenser; + + // The ingest scheduler is responsible for notifying an ingest jobs whenever + // all of the ingest tasks currently associated with the job are complete. + // To make this possible, the ingest tasks scheduler needs to keep track not + // only of the tasks in its queues, but also of the tasks that have been + // handed out for processing by code running on the ingest manager's ingest + // threads. Therefore all ingest tasks are added to this list and are not + // removed when an ingest thread takes an ingest task. Instead, the ingest + // thread calls back into the scheduler when the task is completed, at + // which time the task will be removed from this list. + private final List tasksInProgressAndPending; + + /** + * Gets the ingest tasks scheduler singleton. + */ + synchronized static IngestTasksScheduler getInstance() { + if (IngestTasksScheduler.instance == null) { + IngestTasksScheduler.instance = new IngestTasksScheduler(); + } + return IngestTasksScheduler.instance; + } + + /** + * Constructs an ingest tasks scheduler. + */ + private IngestTasksScheduler() { + this.pendingDataSourceTasks = new LinkedBlockingQueue<>(); + this.dataSourceTasksDispenser = new DataSourceIngestTaskQueue(); + this.rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); + this.directoryTasks = new ArrayList<>(); + this.pendingFileTasks = new LinkedBlockingDeque<>(); + this.fileTasksDispenser = new FileIngestTaskQueue(); + this.tasksInProgressAndPending = new ArrayList<>(); + } + + /** + * Gets this ingest task scheduler's implementation of the IngestTaskQueue + * interface for data source ingest tasks. + * + * @return The data source ingest tasks queue. + */ + IngestTaskQueue getDataSourceIngestTaskQueue() { + return this.dataSourceTasksDispenser; + } + + /** + * Gets this ingest task scheduler's implementation of the IngestTaskQueue + * interface for file ingest tasks. + * + * @return The file ingest tasks queue. + */ + IngestTaskQueue getFileIngestTaskQueue() { + return this.fileTasksDispenser; + } + + /** + * Schedules a data source ingest task and file ingest tasks for an ingest + * job. + * + * @param job The job for which the tasks are to be scheduled. + * @throws InterruptedException if the calling thread is blocked due to a + * full tasks queue and is interrupted. + */ + synchronized void scheduleIngestTasks(IngestJob job) throws InterruptedException { + // The initial ingest scheduling for a job an an atomic operation. + // Otherwise, the data source task might be completed before the file + // tasks are created, resulting in a potential false positive when this + // task scheduler checks whether or not all the tasks for the job are + // completed. + if (job.hasDataSourceIngestPipeline()) { + scheduleDataSourceIngestTask(job); + } + if (job.hasFileIngestPipeline()) { + scheduleFileIngestTasks(job); + } + } + + /** + * Schedules a data source ingest task for an ingest job. + * + * @param job The job for which the tasks are to be scheduled. + * @throws InterruptedException if the calling thread is blocked due to a + * full tasks queue and is interrupted. + */ + synchronized void scheduleDataSourceIngestTask(IngestJob job) throws InterruptedException { + // Create a data source ingest task for the data source associated with + // the ingest job and add the task to the pending data source tasks + // queue. Data source tasks are scheduled on a first come, first served + // basis. + DataSourceIngestTask task = new DataSourceIngestTask(job); + this.tasksInProgressAndPending.add(task); + try { + // This call should not block because the queue is (theoretically) + // unbounded. + this.pendingDataSourceTasks.put(task); + } catch (InterruptedException ex) { + this.tasksInProgressAndPending.remove(task); + IngestTasksScheduler.logger.log(Level.SEVERE, "Interruption of unexpected block on pending data source tasks queue", ex); //NON-NLS + throw ex; + } + } + + /** + * Schedules file ingest tasks for an ingest job. + * + * @param job The job for which the tasks are to be scheduled. + * @throws InterruptedException if the calling thread is blocked due to a + * full tasks queue and is interrupted. + */ + synchronized void scheduleFileIngestTasks(IngestJob job) throws InterruptedException { + // Get the top level files for the data source associated with this job + // and add them to the root directories priority queue. The file tasks + // may be interleaved with file tasks from other jobs, based on priority. + List topLevelFiles = getTopLevelFiles(job.getDataSource()); + for (AbstractFile firstLevelFile : topLevelFiles) { + FileIngestTask task = new FileIngestTask(job, firstLevelFile); + if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { + this.tasksInProgressAndPending.add(task); + this.rootDirectoryTasks.add(task); + } + } + shuffleFileTaskQueues(); + } + + /** + * Schedules a file ingest task for an ingest job. + * + * @param job The job for which the tasks are to be scheduled. + * @param file The file associated with the task. + * @throws InterruptedException if the calling thread is blocked due to a + * full tasks queue and is interrupted. + */ + void scheduleFileIngestTask(IngestJob job, AbstractFile file) throws InterruptedException, IllegalStateException { + FileIngestTask task = new FileIngestTask(job, file); + if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { + // This synchronized method sends the file task directly to the + // pending file tasks queue. This is done to prioritize derived + // and carved files generated by a file ingest task in progress. + addToPendingFileTasksQueue(task); + } + } + + /** + * Allows an ingest thread to notify this ingest task scheduler that a task + * has been completed. + * + * @param task The completed task. + */ + synchronized void notifyTaskCompleted(IngestTask task) throws InterruptedException { + tasksInProgressAndPending.remove(task); + IngestJob job = task.getIngestJob(); + if (this.tasksForJobAreCompleted(job)) { + job.notifyTasksCompleted(); + } + } + + /** + * Clears the task scheduling queues for an ingest job, but does nothing + * about tasks that have already been taken by ingest threads. Those tasks + * will be flushed out when the ingest threads call back with their task + * completed notifications. + * + * @param job The job for which the tasks are to to canceled. + */ + synchronized void cancelPendingTasksForIngestJob(IngestJob job) { + // The scheduling queues are cleared of tasks for the job, and the tasks + // that are removed from the scheduling queues are also removed from the + // tasks in progress list. However, a tasks in progress check for the + // job may still return true since the tasks that have been taken by the + // ingest threads are still in the tasks in progress list. + long jobId = job.getId(); + this.removeTasksForJob(this.rootDirectoryTasks, jobId); + this.removeTasksForJob(this.directoryTasks, jobId); + this.removeTasksForJob(this.pendingFileTasks, jobId); + this.removeTasksForJob(this.pendingDataSourceTasks, jobId); + if (this.tasksForJobAreCompleted(job)) { + job.notifyTasksCompleted(); + } + } + + /** + * A helper that gets the top level files such as file system root + * directories, layout files and virtual directories for a data source. Used + * to create file tasks to put into the root directories queue. + * + * @param dataSource The data source. + * @return A list of top level files. + */ + private static List getTopLevelFiles(Content dataSource) { + List topLevelFiles = new ArrayList<>(); + Collection rootObjects = dataSource.accept(new GetRootDirectoryVisitor()); + if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) { + // The data source is itself a file to be processed. + topLevelFiles.add((AbstractFile) dataSource); + } else { + for (AbstractFile root : rootObjects) { + List children; + try { + children = root.getChildren(); + if (children.isEmpty()) { + // Add the root object itself, it could be an unallocated + // space file, or a child of a volume or an image. + topLevelFiles.add(root); + } else { + // The root object is a file system root directory, get + // the files within it. + for (Content child : children) { + if (child instanceof AbstractFile) { + topLevelFiles.add((AbstractFile) child); + } + } + } + } catch (TskCoreException ex) { + logger.log(Level.WARNING, "Could not get children of root to enqueue: " + root.getId() + ": " + root.getName(), ex); //NON-NLS + } + } + } + return topLevelFiles; + } + + /** + * A helper that "shuffles" the file task queues to ensure that there is at + * least one task in the pending file ingest tasks queue, as long as there + * are still file ingest tasks to be performed. + * + * @throws InterruptedException if the calling thread is blocked due to a + * full tasks queue and is interrupted. + */ + synchronized private void shuffleFileTaskQueues() throws InterruptedException, IllegalStateException { + // This is synchronized because it is called both by synchronized + // methods of this ingest scheduler and an unsynchronized method of its + // file tasks "dispenser". + while (true) { + // Loop until either the pending file tasks queue is NOT empty + // or the upstream queues that feed into it ARE empty. + if (!this.pendingFileTasks.isEmpty()) { + // There are file tasks ready to be consumed, exit. + return; + } + if (this.directoryTasks.isEmpty()) { + if (this.rootDirectoryTasks.isEmpty()) { + // There are no root directory tasks to move into the + // directory queue, exit. + return; + } else { + // Move the next root directory task into the + // directories queue. Note that the task was already + // added to the tasks in progress list when the task was + // created in scheduleFileIngestTasks(). + this.directoryTasks.add(this.rootDirectoryTasks.pollFirst()); + } + } + + // Try to add the most recently added directory from the + // directory tasks queue to the pending file tasks queue. Note + // the removal of the task from the tasks in progress list. If + // the task is enqueued, it will be put back in the list by + // the addToPendingFileTasksQueue() helper. + boolean tasksEnqueuedForDirectory = false; + FileIngestTask directoryTask = this.directoryTasks.remove(this.directoryTasks.size() - 1); + this.tasksInProgressAndPending.remove(directoryTask); + if (shouldEnqueueFileTask(directoryTask)) { + addToPendingFileTasksQueue(directoryTask); + tasksEnqueuedForDirectory = true; + } + + // If the directory contains subdirectories or files, try to + // enqueue tasks for them as well. + final AbstractFile directory = directoryTask.getFile(); + try { + for (Content child : directory.getChildren()) { + if (child instanceof AbstractFile) { + AbstractFile file = (AbstractFile) child; + FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), file); + if (file.hasChildren()) { + // Found a subdirectory, put the task in the + // pending directory tasks queue. Note the + // addition of the task to the tasks in progress + // list. This is necessary because this is the + // first appearance of this task in the queues. + this.tasksInProgressAndPending.add(childTask); + this.directoryTasks.add(childTask); + tasksEnqueuedForDirectory = true; + } else if (shouldEnqueueFileTask(childTask)) { + // Found a file, put the task directly into the + // pending file tasks queue. The new task will + // be put into the tasks in progress list by the + // addToPendingFileTasksQueue() helper. + addToPendingFileTasksQueue(childTask); + tasksEnqueuedForDirectory = true; + } + } + } + } catch (TskCoreException ex) { + String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS + logger.log(Level.SEVERE, errorMessage, ex); + } + + // In the case where the directory task is not pushed into the + // the pending file tasks queue and has no children, check to + // see if the job is completed - the directory task might have + // been the last task for the job. + if (!tasksEnqueuedForDirectory) { + IngestJob job = directoryTask.getIngestJob(); + if (this.tasksForJobAreCompleted(job)) { + job.notifyTasksCompleted(); + } + } + } + } + + /** + * A helper method that examines the file associated with a file ingest task + * to determine whether or not the file should be processed and therefore + * the task should be enqueued. + * + * @param task The task to be scrutinized. + * @return True or false. + */ + private static boolean shouldEnqueueFileTask(final FileIngestTask task) { + final AbstractFile file = task.getFile(); + + // Skip the task if the file is an unallocated space file and the + // process unallocated space flag is not set for this job. + if (!task.getIngestJob().shouldProcessUnallocatedSpace() + && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) { + return false; + } + + // Skip the task if the file is actually the pseudo-file for the parent + // or current directory. + String fileName = file.getName(); + if (fileName.equals(".") || fileName.equals("..")) { + return false; + } + + // Skip the task if the file is one of a select group of special, large + // NTFS or FAT file system files. + // the file is in the root directory, has a file name + // starting with $, containing : (not default attributes) + //with meta address < 32, i.e. some special large NTFS and FAT files + if (file instanceof org.sleuthkit.datamodel.File) { + final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file; + + // Get the type of the file system, if any, that owns the file. + TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP; + try { + FileSystem fs = f.getFileSystem(); + if (fs != null) { + fsType = fs.getFsType(); + } + } catch (TskCoreException ex) { + logger.log(Level.SEVERE, "Error querying file system for " + f, ex); //NON-NLS + } + + // If the file system is not NTFS or FAT, don't skip the file. + if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) { + return true; + } + + // Find out whether the file is in a root directory. + boolean isInRootDir = false; + try { + AbstractFile parent = f.getParentDirectory(); + isInRootDir = parent.isRoot(); + } catch (TskCoreException ex) { + logger.log(Level.WARNING, "Error querying parent directory for" + f.getName(), ex); //NON-NLS + } + + // If the file is in the root directory of an NTFS or FAT file + // system, check its meta-address and check its name for the '$' + // character and a ':' character (not a default attribute). + if (isInRootDir && f.getMetaAddr() < 32) { + String name = f.getName(); + if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) { + return false; + } + } + } + + return true; + } + + /** + * A helper method to safely add a file ingest task to the blocking pending + * tasks queue. + * + * @param task + * @throws IllegalStateException + */ + synchronized private void addToPendingFileTasksQueue(FileIngestTask task) throws IllegalStateException { + tasksInProgressAndPending.add(task); + try { + // The file is added to the front of the pending files queue because + // at least one image has been processed that had a folder full of + // archive files. The queue grew to have thousands of entries, so + // this (might) help with pushing those files through ingest. + this.pendingFileTasks.addFirst(task); + } catch (IllegalStateException ex) { + tasksInProgressAndPending.remove(task); + Logger.getLogger(IngestTasksScheduler.class.getName()).log(Level.SEVERE, "Pending file tasks queue is full", ex); //NON-NLS + throw ex; + } + } + + /** + * Determines whether or not all current ingest tasks for an ingest job are + * completed. + * + * @param job The job for which the query is to be performed. + * @return True or false. + */ + private boolean tasksForJobAreCompleted(IngestJob job) { + for (IngestTask task : tasksInProgressAndPending) { + if (task.getIngestJob().getId() == job.getId()) { + return false; + } + } + return true; + } + + /** + * A helper that removes all of the ingest tasks associated with an ingest + * job from a tasks queue. The task is removed from the the tasks in + * progress list as well. + * + * @param taskQueue The queue from which to remove the tasks. + * @param jobId The id of the job for which the tasks are to be removed. + */ + private void removeTasksForJob(Collection taskQueue, long jobId) { + Iterator iterator = taskQueue.iterator(); + while (iterator.hasNext()) { + IngestTask task = iterator.next(); + if (task.getIngestJob().getId() == jobId) { + this.tasksInProgressAndPending.remove(task); + iterator.remove(); + } + } + } + + /** + * A helper that counts the number of ingest tasks in a task queue for a + * given job. + * + * @param queue The queue for which to count tasks. + * @param jobId The id of the job for which the tasks are to be counted. + * @return The count. + */ + private static int countTasksForJob(Collection queue, long jobId) { + Iterator iterator = queue.iterator(); + int count = 0; + while (iterator.hasNext()) { + IngestTask task = (IngestTask) iterator.next(); + if (task.getIngestJob().getId() == jobId) { + count++; + } + } + return count; + } + + /** + * RJCTODO + * + * @param jobId + * @return + */ + synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) { + return new IngestJobTasksSnapshot(jobId); + } + + /** + * Prioritizes tasks for the root directories file ingest tasks queue (file + * system root directories, layout files and virtual directories). + */ + private static class RootDirectoryTaskComparator implements Comparator { + + @Override + public int compare(FileIngestTask q1, FileIngestTask q2) { + AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(q1.getFile()); + AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(q2.getFile()); + if (p1 == p2) { + return (int) (q2.getFile().getId() - q1.getFile().getId()); + } else { + return p2.ordinal() - p1.ordinal(); + } + } + + private static class AbstractFilePriority { + + enum Priority { + + LAST, LOW, MEDIUM, HIGH + } + + static final List LAST_PRI_PATHS = new ArrayList<>(); + + static final List LOW_PRI_PATHS = new ArrayList<>(); + + static final List MEDIUM_PRI_PATHS = new ArrayList<>(); + + static final List HIGH_PRI_PATHS = new ArrayList<>(); + /* prioritize root directory folders based on the assumption that we + * are + * looking for user content. Other types of investigations may want + * different + * priorities. */ + + static /* prioritize root directory + * folders based on the assumption that we are + * looking for user content. Other types of investigations may want + * different + * priorities. */ { + // these files have no structure, so they go last + //unalloc files are handled as virtual files in getPriority() + //LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE)); + //LAST_PRI_PATHS.schedule(Pattern.compile("^\\Unalloc", Pattern.CASE_INSENSITIVE)); + LAST_PRI_PATHS.add(Pattern.compile("^pagefile", Pattern.CASE_INSENSITIVE)); + LAST_PRI_PATHS.add(Pattern.compile("^hiberfil", Pattern.CASE_INSENSITIVE)); + // orphan files are often corrupt and windows does not typically have + // user content, so put them towards the bottom + LOW_PRI_PATHS.add(Pattern.compile("^\\$OrphanFiles", Pattern.CASE_INSENSITIVE)); + LOW_PRI_PATHS.add(Pattern.compile("^Windows", Pattern.CASE_INSENSITIVE)); + // all other files go into the medium category too + MEDIUM_PRI_PATHS.add(Pattern.compile("^Program Files", Pattern.CASE_INSENSITIVE)); + // user content is top priority + HIGH_PRI_PATHS.add(Pattern.compile("^Users", Pattern.CASE_INSENSITIVE)); + HIGH_PRI_PATHS.add(Pattern.compile("^Documents and Settings", Pattern.CASE_INSENSITIVE)); + HIGH_PRI_PATHS.add(Pattern.compile("^home", Pattern.CASE_INSENSITIVE)); + HIGH_PRI_PATHS.add(Pattern.compile("^ProgramData", Pattern.CASE_INSENSITIVE)); + } + + /** + * Get the enabled priority for a given file. + * + * @param abstractFile + * + * @return + */ + static AbstractFilePriority.Priority getPriority(final AbstractFile abstractFile) { + if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) { + //quickly filter out unstructured content + //non-fs virtual files and dirs, such as representing unalloc space + return AbstractFilePriority.Priority.LAST; + } + //determine the fs files priority by name + final String path = abstractFile.getName(); + if (path == null) { + return AbstractFilePriority.Priority.MEDIUM; + } + for (Pattern p : HIGH_PRI_PATHS) { + Matcher m = p.matcher(path); + if (m.find()) { + return AbstractFilePriority.Priority.HIGH; + } + } + for (Pattern p : MEDIUM_PRI_PATHS) { + Matcher m = p.matcher(path); + if (m.find()) { + return AbstractFilePriority.Priority.MEDIUM; + } + } + for (Pattern p : LOW_PRI_PATHS) { + Matcher m = p.matcher(path); + if (m.find()) { + return AbstractFilePriority.Priority.LOW; + } + } + for (Pattern p : LAST_PRI_PATHS) { + Matcher m = p.matcher(path); + if (m.find()) { + return AbstractFilePriority.Priority.LAST; + } + } + //default is medium + return AbstractFilePriority.Priority.MEDIUM; + } + } + } + + /** + * Wraps access to pending data source ingest tasks in the interface + * required by the ingest threads. + */ + private final class DataSourceIngestTaskQueue implements IngestTaskQueue { + + /** + * @inheritDoc + */ + @Override + public IngestTask getNextTask() throws InterruptedException { + return IngestTasksScheduler.this.pendingDataSourceTasks.take(); + } + } + + /** + * Wraps access to pending file ingest tasks in the interface required by + * the ingest threads. + */ + private final class FileIngestTaskQueue implements IngestTaskQueue { + + /** + * @inheritDoc + */ + @Override + public IngestTask getNextTask() throws InterruptedException { + FileIngestTask task = IngestTasksScheduler.this.pendingFileTasks.takeFirst(); + shuffleFileTaskQueues(); + return task; + } + + } + + /** + * A snapshot of ingest tasks data for an ingest job. + */ + class IngestJobTasksSnapshot { + private final long jobId; + private final long rootQueueSize; + private final long dirQueueSize; + private final long fileQueueSize; + private final long dsQueueSize; + private final long runningListSize; + + /** + * RJCTODO + * @param jobId + */ + IngestJobTasksSnapshot(long jobId) { + this.jobId = jobId; + this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootDirectoryTasks, jobId); + this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryTasks, jobId); + this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTasks, jobId); + this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingDataSourceTasks, jobId); + this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgressAndPending, jobId) - fileQueueSize - dsQueueSize; + } + + /** + * RJCTODO + * @return + */ + long getJobId() { + return jobId; + } + + /** + * RJCTODO + * @return + */ + long getRootQueueSize() { + return rootQueueSize; + } + + /** + * RJCTODO + * @return + */ + long getDirQueueSize() { + return dirQueueSize; + } + + /** + * RJCTODO + * @return + */ + long getFileQueueSize() { + return fileQueueSize; + } + + /** + * RJCTODO + * @return + */ + long getDsQueueSize() { + return dsQueueSize; + } + + /** + * RJCTODO + * @return + */ + long getRunningListSize() { + return runningListSize; + } + } + +}