Partially implement multi-stage ingest

This commit is contained in:
Richard Cordovano 2014-11-02 17:51:41 -05:00
parent 7575670be7
commit 90a103455b
8 changed files with 1344 additions and 1008 deletions

View File

@ -61,7 +61,6 @@ import org.sleuthkit.datamodel.BlackboardArtifact.ARTIFACT_TYPE;
import org.sleuthkit.datamodel.BlackboardAttribute.ATTRIBUTE_TYPE;
import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.Image;
import org.sleuthkit.datamodel.TskCoreException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

View File

@ -67,7 +67,7 @@ public class DataSourceIngestModuleProgress {
* @param message Message to display
*/
public void progress(String message) {
this.job.advanceDataSourceIngestProgressBar(message);
this.job.advanceDataSourceIngestProgressBar(message); // RJCTODO: Is this right?
}
/**

View File

@ -21,7 +21,9 @@ package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import javax.swing.JOptionPane;
import org.netbeans.api.progress.ProgressHandle;
@ -39,13 +41,29 @@ import org.sleuthkit.datamodel.Content;
final class IngestJob {
private static final Logger logger = Logger.getLogger(IngestJob.class.getName());
private static final IngestScheduler ingestScheduler = IngestScheduler.getInstance();
private static final IngestTasksScheduler ingestScheduler = IngestTasksScheduler.getInstance();
// These static fields are used for the creation and management of ingest
// jobs in progress.
private static volatile boolean jobCreationIsEnabled;
private static final AtomicLong nextIngestJobId = new AtomicLong(0L);
private static final ConcurrentHashMap<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
// An ingest job may have multiple stages.
private enum Stages {
FIRST, // High priority data source ingest modules plus file ingest modules
SECOND // Low priority data source ingest modules
};
// These fields define the ingest job and the work it entails.
private final long id;
private final Content dataSource;
private final boolean processUnallocatedSpace;
private Stages stage;
private DataSourceIngestPipeline dataSourceIngestPipeline;
private DataSourceIngestPipeline firstStageDataSourceIngestPipeline;
private DataSourceIngestPipeline secondStageDataSourceIngestPipeline;
private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelines;
// These fields are used to update the ingest progress UI components. The
@ -68,6 +86,74 @@ final class IngestJob {
// This field is used for generating ingest job diagnostic data.
private final long startTime;
/**
* Enables and disables ingest job creation.
*
* @param enabled True or false.
*/
static void jobCreationEnabled(boolean enabled) {
IngestJob.jobCreationIsEnabled = enabled;
}
/**
* Creates an ingest job for a data source.
*
* @param dataSource The data source to ingest.
* @param ingestModuleTemplates The ingest module templates to use to create
* the ingest pipelines for the job.
* @param processUnallocatedSpace Whether or not the job should include
* processing of unallocated space.
*
* @return A collection of ingest module start up errors, empty on success.
*
* @throws InterruptedException
*/
static List<IngestModuleError> startJob(Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException {
List<IngestModuleError> errors = new ArrayList<>();
if (IngestJob.jobCreationIsEnabled) {
long jobId = nextIngestJobId.incrementAndGet();
IngestJob job = new IngestJob(jobId, dataSource, processUnallocatedSpace);
errors = job.start(ingestModuleTemplates);
if (errors.isEmpty() && (job.hasDataSourceIngestPipeline() || job.hasFileIngestPipeline())) {
ingestJobsById.put(jobId, job);
IngestManager.getInstance().fireIngestJobStarted(jobId);
IngestJob.ingestScheduler.scheduleIngestTasks(job);
logger.log(Level.INFO, "Ingest job {0} started", jobId);
}
}
return errors;
}
/**
* Queries whether or not ingest jobs are running.
*
* @return True or false.
*/
static boolean ingestJobsAreRunning() {
return !ingestJobsById.isEmpty();
}
/**
* RJCTODO
* @return
*/
static List<IngestJobSnapshot> getJobSnapshots() {
List<IngestJobSnapshot> snapShots = new ArrayList<>();
for (IngestJob job : IngestJob.ingestJobsById.values()) {
snapShots.add(job.getIngestJobSnapshot());
}
return snapShots;
}
/**
* RJCTODO
*/
static void cancelAllJobs() {
for (IngestJob job : ingestJobsById.values()) {
job.cancel();
}
}
/**
* Constructs an ingest job.
*
@ -80,6 +166,7 @@ final class IngestJob {
this.id = id;
this.dataSource = dataSource;
this.processUnallocatedSpace = processUnallocatedSpace;
this.stage = IngestJob.Stages.FIRST;
this.fileIngestPipelines = new LinkedBlockingQueue<>();
this.filesInProgress = new ArrayList<>();
this.dataSourceIngestProgressLock = new Object();
@ -122,19 +209,344 @@ final class IngestJob {
* @throws InterruptedException
*/
List<IngestModuleError> start(List<IngestModuleTemplate> ingestModuleTemplates) throws InterruptedException {
createIngestPipelines(ingestModuleTemplates);
this.createIngestPipelines(ingestModuleTemplates);
List<IngestModuleError> errors = startUpIngestPipelines();
if (errors.isEmpty()) {
if (!this.dataSourceIngestPipeline.isEmpty()) {
startDataSourceIngestProgressBar();
this.startDataSourceIngestProgressBar();
}
if (!this.fileIngestPipelines.peek().isEmpty()) {
startFileIngestProgressBar();
this.startFileIngestProgressBar();
}
}
return errors;
}
/**
* Checks to see if this job has a data source ingest pipeline.
*
* @return True or false.
*/
boolean hasDataSourceIngestPipeline() {
return (this.dataSourceIngestPipeline.isEmpty() == false);
}
/**
* Checks to see if the job has a file ingest pipeline.
*
* @return True or false.
*/
boolean hasFileIngestPipeline() {
return (this.fileIngestPipelines.peek().isEmpty() == false);
}
/**
* Passes the data source for this job through the data source ingest
* pipeline.
*
* @param task A data source ingest task wrapping the data source.
* @throws InterruptedException
*/
void process(DataSourceIngestTask task) throws InterruptedException {
try {
if (!this.isCancelled() && !this.dataSourceIngestPipeline.isEmpty()) {
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(this.dataSourceIngestPipeline.process(task));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
}
// Shut down the data source ingest progress bar right away.
synchronized (this.dataSourceIngestProgressLock) {
if (null != this.dataSourceIngestProgress) {
this.dataSourceIngestProgress.finish();
this.dataSourceIngestProgress = null;
}
}
} finally {
// No matter what happens, let the ingest scheduler know that this
// task is completed.
IngestJob.ingestScheduler.notifyTaskCompleted(task);
}
}
/**
* Passes the a file from the data source for this job through the file
* ingest pipeline.
*
* @param task A file ingest task.
* @throws InterruptedException
*/
void process(FileIngestTask task) throws InterruptedException {
try {
if (!this.isCancelled()) {
// Get a file ingest pipeline not currently in use by another
// file ingest thread.
FileIngestPipeline pipeline = this.fileIngestPipelines.take();
if (!pipeline.isEmpty()) {
// Get the file to process.
AbstractFile file = task.getFile();
// Update the file ingest progress bar.
synchronized (this.fileIngestProgressLock) {
++this.processedFiles;
if (this.processedFiles <= this.estimatedFilesToProcess) {
this.fileIngestProgress.progress(file.getName(), (int) this.processedFiles);
} else {
this.fileIngestProgress.progress(file.getName(), (int) this.estimatedFilesToProcess);
}
this.filesInProgress.add(file.getName());
}
// Run the file through the pipeline.
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(pipeline.process(task));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
// Update the file ingest progress bar again in case the
// file was being displayed.
if (!this.cancelled) {
synchronized (this.fileIngestProgressLock) {
this.filesInProgress.remove(file.getName());
if (this.filesInProgress.size() > 0) {
this.fileIngestProgress.progress(this.filesInProgress.get(0));
} else {
this.fileIngestProgress.progress("");
}
}
}
}
// Relinquish the pipeline so it can be reused by another file
// ingest thread.
this.fileIngestPipelines.put(pipeline);
}
} finally {
// No matter what happens, let the ingest scheduler know that this
// task is completed.
IngestJob.ingestScheduler.notifyTaskCompleted(task);
}
}
/**
*
* @param file
*/
void addFiles(List<AbstractFile> files) {
// RJCTODO: Add handling of lack of support for file ingest in second stage
for (AbstractFile file : files) {
try {
// RJCTODO: Deal with possible IllegalStateException; maybe don't need logging here
IngestJob.ingestScheduler.scheduleFileIngestTask(this, file);
} catch (InterruptedException ex) {
// Handle the unexpected interrupt here rather than make ingest
// module writers responsible for writing this exception handler.
// The interrupt flag of the thread is reset for detection by
// the thread task code.
Thread.currentThread().interrupt();
IngestJob.logger.log(Level.SEVERE, "File task scheduling unexpectedly interrupted", ex); //NON-NLS
}
}
}
/**
* Allows the ingest tasks scheduler to notify this ingest job whenever all
* the scheduled tasks for this ingest job have been completed.
*/
void notifyTasksCompleted() {
switch (this.stage) {
case FIRST:
this.finishFirstStage();
this.startSecondStage();
break;
case SECOND:
this.finish();
break;
}
}
/**
* Updates the display name of the data source ingest progress bar.
*
* @param displayName The new display name.
*/
void updateDataSourceIngestProgressBarDisplayName(String displayName) {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
this.dataSourceIngestProgress.setDisplayName(displayName);
}
}
}
/**
* Switches the data source progress bar to determinate mode. This should be
* called if the total work units to process the data source is known.
*
* @param workUnits Total number of work units for the processing of the
* data source.
*/
void switchDataSourceIngestProgressBarToDeterminate(int workUnits) {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
if (null != this.dataSourceIngestProgress) {
this.dataSourceIngestProgress.switchToDeterminate(workUnits);
}
}
}
}
/**
* Switches the data source ingest progress bar to indeterminate mode. This
* should be called if the total work units to process the data source is
* unknown.
*/
void switchDataSourceIngestProgressBarToIndeterminate() {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
if (null != this.dataSourceIngestProgress) {
this.dataSourceIngestProgress.switchToIndeterminate();
}
}
}
}
/**
* Updates the data source ingest progress bar with the number of work units
* performed, if in the determinate mode.
*
* @param workUnits Number of work units performed.
*/
void advanceDataSourceIngestProgressBar(int workUnits) {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
if (null != this.dataSourceIngestProgress) {
this.dataSourceIngestProgress.progress("", workUnits);
}
}
}
}
// RJCTODO: Is this right?
/**
* Updates the data source ingest progress bar display name.
*
* @param displayName The new display name.
*/
void advanceDataSourceIngestProgressBar(String displayName) {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
if (null != this.dataSourceIngestProgress) {
this.dataSourceIngestProgress.progress(displayName);
}
}
}
}
/**
* Updates the progress bar with the number of work units performed, if in
* the determinate mode.
*
* @param message Message to display in sub-title
* @param workUnits Number of work units performed.
*/
void advanceDataSourceIngestProgressBar(String message, int workUnits) {
if (!this.cancelled) {
synchronized (this.fileIngestProgressLock) {
this.dataSourceIngestProgress.progress(message, workUnits);
}
}
}
/**
* Determines whether or not a temporary cancellation of data source ingest
* in order to stop the currently executing data source ingest module is in
* effect.
*
* @return True or false.
*/
boolean currentDataSourceIngestModuleIsCancelled() {
return this.currentDataSourceIngestModuleCancelled;
}
/**
* Rescind a temporary cancellation of data source ingest in order to stop
* the currently executing data source ingest module.
*/
void currentDataSourceIngestModuleCancellationCompleted() {
this.currentDataSourceIngestModuleCancelled = false;
// A new progress bar must be created because the cancel button of the
// previously constructed component is disabled by NetBeans when the
// user selects the "OK" button of the cancellation confirmation dialog
// popped up by NetBeans when the progress bar cancel button was
// pressed.
synchronized (this.dataSourceIngestProgressLock) {
this.dataSourceIngestProgress.finish();
this.dataSourceIngestProgress = null;
this.startDataSourceIngestProgressBar();
}
}
/**
* Requests cancellation of ingest, i.e., a shutdown of the data source and
* file ingest pipelines.
*/
void cancel() {
// Put a cancellation message on data source ingest progress bar,
// if it is still running.
synchronized (this.dataSourceIngestProgressLock) {
if (dataSourceIngestProgress != null) {
final String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.dataSourceIngest.initialDisplayName",
dataSource.getName());
dataSourceIngestProgress.setDisplayName(
NbBundle.getMessage(this.getClass(),
"IngestJob.progress.cancelling",
displayName));
}
}
// Put a cancellation message on the file ingest progress bar,
// if it is still running.
synchronized (this.fileIngestProgressLock) {
if (this.fileIngestProgress != null) {
final String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.fileIngest.displayName",
this.dataSource.getName());
this.fileIngestProgress.setDisplayName(
NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling",
displayName));
}
}
this.cancelled = true;
// Tell the ingest scheduler to cancel all pending tasks.
IngestJob.ingestScheduler.cancelPendingTasksForIngestJob(this);
}
/**
* Queries whether or not cancellation of ingest i.e., a shutdown of the
* data source and file ingest pipelines, has been requested
*
* @return True or false.
*/
boolean isCancelled() {
return this.cancelled;
}
/**
* Get some basic performance statistics on this job.
*
* @return An ingest job statistics object.
*/
IngestJobSnapshot getIngestJobSnapshot() {
return new IngestJobSnapshot();
}
/**
* Creates the file and data source ingest pipelines.
*
@ -143,10 +555,28 @@ final class IngestJob {
* @throws InterruptedException
*/
private void createIngestPipelines(List<IngestModuleTemplate> ingestModuleTemplates) throws InterruptedException {
this.dataSourceIngestPipeline = new DataSourceIngestPipeline(this, ingestModuleTemplates);
// RJCTODO: Use config file
// Sort the ingest module templates as required for the pipelines.
List<IngestModuleTemplate> firstStageDataSourceModuleTemplates = new ArrayList<>();
List<IngestModuleTemplate> secondStageDataSourceModuleTemplates = new ArrayList<>();
List<IngestModuleTemplate> fileIngestModuleTemplates = new ArrayList<>();
for (IngestModuleTemplate template : ingestModuleTemplates) {
if (template.isDataSourceIngestModuleTemplate()) {
firstStageDataSourceModuleTemplates.add(template);
} else {
firstStageDataSourceModuleTemplates.add(template);
}
}
// Contruct the pipelines.
this.firstStageDataSourceIngestPipeline = new DataSourceIngestPipeline(this, firstStageDataSourceModuleTemplates);
this.secondStageDataSourceIngestPipeline = new DataSourceIngestPipeline(this, secondStageDataSourceModuleTemplates);
this.dataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
// Construct the file ingest pipelines.
int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads();
for (int i = 0; i < numberOfFileIngestThreads; ++i) {
this.fileIngestPipelines.put(new FileIngestPipeline(this, ingestModuleTemplates));
this.fileIngestPipelines.put(new FileIngestPipeline(this, fileIngestModuleTemplates));
}
}
@ -160,9 +590,12 @@ final class IngestJob {
private List<IngestModuleError> startUpIngestPipelines() throws InterruptedException {
List<IngestModuleError> errors = new ArrayList<>();
// Start up the data source ingest pipeline.
// Start up the first stage data source ingest pipeline.
errors.addAll(this.dataSourceIngestPipeline.startUp());
// Start up the second stage data source ingest pipeline.
errors.addAll(this.secondStageDataSourceIngestPipeline.startUp());
// Start up the file ingest pipelines (one per file ingest thread).
for (FileIngestPipeline pipeline : this.fileIngestPipelines) {
errors.addAll(pipeline.startUp());
@ -249,201 +682,10 @@ final class IngestJob {
}
/**
* Checks to see if this job has a data source ingest pipeline.
*
* @return True or false.
* Shuts down the file ingest pipelines and current progress bars, if any,
* for this job.
*/
boolean hasDataSourceIngestPipeline() {
return (this.dataSourceIngestPipeline.isEmpty() == false);
}
/**
* Checks to see if the job has a file ingest pipeline.
*
* @return True or false.
*/
boolean hasFileIngestPipeline() {
return (this.fileIngestPipelines.peek().isEmpty() == false);
}
/**
* Passes the data source for this job through the data source ingest
* pipeline.
*
* @param task A data source ingest task wrapping the data source.
* @throws InterruptedException
*/
void process(DataSourceIngestTask task) throws InterruptedException {
try {
if (!this.isCancelled() && !this.dataSourceIngestPipeline.isEmpty()) {
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(this.dataSourceIngestPipeline.process(task));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
}
// The single data source ingest task for this job is done, so shut
// down the data source ingest progress bar right away.
synchronized (this.dataSourceIngestProgressLock) {
if (null != this.dataSourceIngestProgress) {
this.dataSourceIngestProgress.finish();
this.dataSourceIngestProgress = null;
}
}
} finally {
// No matter what happens, let the ingest scheduler know that this
// task is completed.
IngestJob.ingestScheduler.notifyTaskCompleted(task);
}
}
/**
* Updates the display name of the data source ingest progress bar.
*
* @param displayName The new display name.
*/
void updateDataSourceIngestProgressBarDisplayName(String displayName) {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
this.dataSourceIngestProgress.setDisplayName(displayName);
}
}
}
/**
* Updates the data source progress bar and switches it to determinate mode.
*
* @param workUnits Total number of work units for the processing of the
* data source.
*/
void switchDataSourceIngestProgressBarToDeterminate(int workUnits) {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
this.dataSourceIngestProgress.switchToDeterminate(workUnits);
}
}
}
/**
* Switches the data source ingest progress bar to indeterminate mode. This
* should be called if the total work units to process the data source is
* unknown.
*/
void switchDataSourceIngestProgressBarToIndeterminate() {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
this.dataSourceIngestProgress.switchToIndeterminate();
}
}
}
/**
* Updates the data source ingest progress bar with the number of work units
* performed, if in the determinate mode.
*
* @param workUnits Number of work units performed.
*/
void advanceDataSourceIngestProgressBar(int workUnits) {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
this.dataSourceIngestProgress.progress("", workUnits);
}
}
}
/**
* Updates the data source ingest progress bar display name.
*
* @param displayName The new display name.
*/
void advanceDataSourceIngestProgressBar(String displayName) {
if (!this.cancelled) {
synchronized (this.dataSourceIngestProgressLock) {
this.dataSourceIngestProgress.progress(displayName);
}
}
}
/**
* Updates the progress bar with the number of work units performed, if in
* the determinate mode.
*
* @param message Message to display in sub-title
* @param workUnits Number of work units performed.
*/
void advanceDataSourceIngestProgressBar(String message, int workUnits) {
if (!this.cancelled) {
synchronized (this.fileIngestProgressLock) {
this.dataSourceIngestProgress.progress(message, workUnits);
}
}
}
/**
* Passes the a file from the data source for this job through the file
* ingest pipeline.
*
* @param task A file ingest task.
* @throws InterruptedException
*/
void process(FileIngestTask task) throws InterruptedException {
try {
if (!this.isCancelled()) {
// Get a file ingest pipeline not currently in use by another
// file ingest thread.
FileIngestPipeline pipeline = this.fileIngestPipelines.take();
if (!pipeline.isEmpty()) {
// Get the file to process.
AbstractFile file = task.getFile();
// Update the file ingest progress bar.
synchronized (this.fileIngestProgressLock) {
++this.processedFiles;
if (this.processedFiles <= this.estimatedFilesToProcess) {
this.fileIngestProgress.progress(file.getName(), (int) this.processedFiles);
} else {
this.fileIngestProgress.progress(file.getName(), (int) this.estimatedFilesToProcess);
}
this.filesInProgress.add(file.getName());
}
// Run the file through the pipeline.
List<IngestModuleError> errors = new ArrayList<>();
errors.addAll(pipeline.process(task));
if (!errors.isEmpty()) {
logIngestModuleErrors(errors);
}
// Update the file ingest progress bar again in case the
// file was being displayed.
if (!this.cancelled) {
synchronized (this.fileIngestProgressLock) {
this.filesInProgress.remove(file.getName());
if (this.filesInProgress.size() > 0) {
this.fileIngestProgress.progress(this.filesInProgress.get(0));
} else {
this.fileIngestProgress.progress("");
}
}
}
}
// Relinquish the pipeline so it can be reused by another file
// ingest thread.
this.fileIngestPipelines.put(pipeline);
}
} finally {
// No matter what happens, let the ingest scheduler know that this
// task is completed.
IngestJob.ingestScheduler.notifyTaskCompleted(task);
}
}
/**
* Shuts down the ingest pipelines and progress bars for this job.
*/
void finish() {
private void finishFirstStage() {
// Shut down the file ingest pipelines. Note that no shut down is
// required for the data source ingest pipeline because data source
// ingest modules do not have a shutdown() method.
@ -456,8 +698,8 @@ final class IngestJob {
logIngestModuleErrors(errors);
}
// Finish the data source ingest progress bar, if it hasn't already
// been finished.
// Finish the first stage data source ingest progress bar, if it hasn't
// already been finished.
synchronized (this.dataSourceIngestProgressLock) {
if (this.dataSourceIngestProgress != null) {
this.dataSourceIngestProgress.finish();
@ -475,6 +717,48 @@ final class IngestJob {
}
}
/**
* RJCTODO
*/
private void startSecondStage() {
this.stage = IngestJob.Stages.SECOND;
if (!this.cancelled && !this.secondStageDataSourceIngestPipeline.isEmpty()) {
this.dataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline;
this.startDataSourceIngestProgressBar();
try {
IngestJob.ingestScheduler.scheduleDataSourceIngestTask(this);
} catch (InterruptedException ex) {
// RJCTODO:
this.finish();
}
} else {
this.finish();
}
}
/**
* Shuts down the ingest pipelines and progress bars for this job.
*/
private void finish() {
// Finish the second stage data source ingest progress bar, if it hasn't
// already been finished.
synchronized (this.dataSourceIngestProgressLock) {
if (this.dataSourceIngestProgress != null) {
this.dataSourceIngestProgress.finish();
this.dataSourceIngestProgress = null;
}
}
IngestJob.ingestJobsById.remove(this.id);
if (!this.isCancelled()) {
logger.log(Level.INFO, "Ingest job {0} completed", this.id);
IngestManager.getInstance().fireIngestJobCompleted(this.id);
} else {
logger.log(Level.INFO, "Ingest job {0} cancelled", this.id);
IngestManager.getInstance().fireIngestJobCancelled(this.id);
}
}
/**
* Write ingest module errors to the log.
*
@ -494,116 +778,51 @@ final class IngestJob {
this.currentDataSourceIngestModuleCancelled = true;
}
/**
* Determines whether or not a temporary cancellation of data source ingest
* in order to stop the currently executing data source ingest module is in
* effect.
*
* @return True or false.
*/
boolean currentDataSourceIngestModuleIsCancelled() {
return this.currentDataSourceIngestModuleCancelled;
}
/**
* Rescind a temporary cancellation of data source ingest in order to stop
* the currently executing data source ingest module.
*/
void currentDataSourceIngestModuleCancellationCompleted() {
this.currentDataSourceIngestModuleCancelled = false;
// A new progress bar must be created because the cancel button of the
// previously constructed component is disabled by NetBeans when the
// user selects the "OK" button of the cancellation confirmation dialog
// popped up by NetBeans when the progress bar cancel button was
// pressed.
synchronized (this.dataSourceIngestProgressLock) {
this.dataSourceIngestProgress.finish();
this.dataSourceIngestProgress = null;
this.startDataSourceIngestProgressBar();
}
}
/**
* Requests cancellation of ingest, i.e., a shutdown of the data source and
* file ingest pipelines.
*/
void cancel() {
// Put a cancellation message on data source ingest progress bar,
// if it is still running.
synchronized (this.dataSourceIngestProgressLock) {
if (dataSourceIngestProgress != null) {
final String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.dataSourceIngest.initialDisplayName",
dataSource.getName());
dataSourceIngestProgress.setDisplayName(
NbBundle.getMessage(this.getClass(),
"IngestJob.progress.cancelling",
displayName));
}
}
// Put a cancellation message on the file ingest progress bar,
// if it is still running.
synchronized (this.fileIngestProgressLock) {
if (this.fileIngestProgress != null) {
final String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.fileIngest.displayName",
this.dataSource.getName());
this.fileIngestProgress.setDisplayName(
NbBundle.getMessage(this.getClass(), "IngestJob.progress.cancelling",
displayName));
}
}
this.cancelled = true;
// Tell the ingest scheduler to cancel all pending tasks.
IngestJob.ingestScheduler.cancelPendingTasksForIngestJob(this);
}
/**
* Queries whether or not cancellation of ingest i.e., a shutdown of the
* data source and file ingest pipelines, has been requested
*
* @return True or false.
*/
boolean isCancelled() {
return this.cancelled;
}
/**
* Get some basic performance statistics on this job.
*
* @return An ingest job statistics object.
*/
IngestJobStats getStats() {
return new IngestJobStats();
}
/**
* Stores basic diagnostic statistics for an ingest job.
*/
class IngestJobStats {
class IngestJobSnapshot {
private final long jobId;
private final String dataSource;
private final long startTime;
private final long processedFiles;
private final long estimatedFilesToProcess;
private final long snapShotTime;
private final IngestTasksScheduler.IngestJobTasksSnapshot tasksSnapshot;
/**
* Constructs an object to stores basic diagnostic statistics for an
* ingest job.
*/
IngestJobStats() {
IngestJobSnapshot() {
this.jobId = IngestJob.this.id;
this.dataSource = IngestJob.this.dataSource.getName();
this.startTime = IngestJob.this.startTime;
synchronized (IngestJob.this.fileIngestProgressLock) {
this.processedFiles = IngestJob.this.processedFiles;
this.estimatedFilesToProcess = IngestJob.this.estimatedFilesToProcess;
this.snapShotTime = new Date().getTime();
}
this.tasksSnapshot = IngestJob.ingestScheduler.getTasksSnapshotForJob(this.jobId);
}
/**
* RJCTODO
* @return
*/
long getJobId() {
return this.jobId;
}
/**
* RJCTODO
* @return
*/
String getDataSource() {
return dataSource;
}
/**
* Gets files per second throughput since job started.
*
@ -651,6 +870,47 @@ final class IngestJob {
long getFilesEstimated() {
return estimatedFilesToProcess;
}
/**
* RJCTODO
* @return
*/
long getRootQueueSize() {
return this.tasksSnapshot.getRootQueueSize();
}
/**
* RJCTODO
* @return
*/
long getDirQueueSize() {
return this.tasksSnapshot.getDirQueueSize();
}
/**
* RJCTODO
* @return
*/
long getFileQueueSize() {
return this.tasksSnapshot.getFileQueueSize();
}
/**
* RJCTODO
* @return
*/
long getDsQueueSize() {
return this.tasksSnapshot.getDsQueueSize();
}
/**
* RJCTODO
* @return
*/
long getRunningListSize() {
return this.tasksSnapshot.getRunningListSize();
}
}
}

View File

@ -19,7 +19,6 @@
package org.sleuthkit.autopsy.ingest;
import java.util.List;
import java.util.logging.Level;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content;
@ -31,7 +30,6 @@ import org.sleuthkit.datamodel.Content;
public final class IngestJobContext {
private static final Logger logger = Logger.getLogger(IngestJobContext.class.getName());
private static final IngestScheduler scheduler = IngestScheduler.getInstance();
private final IngestJob ingestJob;
IngestJobContext(IngestJob ingestJob) {
@ -107,20 +105,21 @@ public final class IngestJobContext {
* pipeline of the ingest job associated with this context.
*
* @param files The files to be processed by the file ingest pipeline.
* @deprecated use addFilesToJob() instead
*/
@Deprecated
public void scheduleFiles(List<AbstractFile> files) {
for (AbstractFile file : files) {
try {
IngestJobContext.scheduler.scheduleAdditionalFileIngestTask(this.ingestJob, file);
} catch (InterruptedException ex) {
// Handle the unexpected interrupt here rather than make ingest
// module writers responsible for writing this exception handler.
// The interrupt flag of the thread is reset for detection by
// the thread task code.
Thread.currentThread().interrupt();
IngestJobContext.logger.log(Level.SEVERE, "File task scheduling unexpectedly interrupted", ex); //NON-NLS
}
}
this.addFilesToJob(files);
}
/**
* Adds one or more files to the files to be passed through the file ingest
* pipeline of the ingest job associated with this context.
*
* @param files The files to be processed by the file ingest pipeline.
*/
public void addFilesToJob(List<AbstractFile> files) {
this.ingestJob.addFiles(files);
}
}

View File

@ -134,7 +134,7 @@ public class IngestManager {
*/
private void startDataSourceIngestTask() {
long threadId = nextThreadId.incrementAndGet();
dataSourceIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestScheduler.getInstance().getDataSourceIngestTaskQueue()));
dataSourceIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
@ -144,7 +144,7 @@ public class IngestManager {
*/
private void startFileIngestTask() {
long threadId = nextThreadId.incrementAndGet();
fileIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestScheduler.getInstance().getFileIngestTaskQueue()));
fileIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
@ -174,12 +174,12 @@ public class IngestManager {
}
void handleCaseOpened() {
IngestScheduler.getInstance().setEnabled(true);
IngestJob.jobCreationEnabled(true);
clearIngestMessageBox();
}
void handleCaseClosed() {
IngestScheduler.getInstance().setEnabled(false);
IngestJob.jobCreationEnabled(false);
cancelAllIngestJobs();
clearIngestMessageBox();
}
@ -197,7 +197,7 @@ public class IngestManager {
* @return True if any ingest jobs are in progress, false otherwise.
*/
public boolean isIngestRunning() {
return IngestScheduler.getInstance().ingestJobsAreRunning();
return IngestJob.ingestJobsAreRunning();
}
@ -293,7 +293,7 @@ public class IngestManager {
}
// Cancel all the jobs already created.
IngestScheduler.getInstance().cancelAllIngestJobs();
IngestJob.cancelAllJobs();
}
/**
@ -555,7 +555,7 @@ public class IngestManager {
}
// Start an ingest job for the data source.
List<IngestModuleError> errors = IngestScheduler.getInstance().startIngestJob(dataSource, moduleTemplates, processUnallocatedSpace);
List<IngestModuleError> errors = IngestJob.startJob(dataSource, moduleTemplates, processUnallocatedSpace);
if (!errors.isEmpty()) {
// Report the errors to the user. They have already been logged.
StringBuilder moduleStartUpErrors = new StringBuilder();

View File

@ -29,7 +29,6 @@ import javax.swing.table.AbstractTableModel;
import javax.swing.table.TableColumn;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.ingest.IngestScheduler.IngestJobSchedulerStats;
public class IngestProgressSnapshotPanel extends javax.swing.JPanel {
@ -161,20 +160,20 @@ public class IngestProgressSnapshotPanel extends javax.swing.JPanel {
private final String[] columnNames = {"Job ID",
"Data Source", "Start", "Num Processed", "Files/Sec", "In Progress", "Files Queued", "Dir Queued", "Root Queued", "DS Queued"};
private List<IngestJobSchedulerStats> schedStats;
private List<IngestJob.IngestJobSnapshot> jobSnapshots;
private IngestJobTableModel() {
refresh();
}
private void refresh() {
schedStats = IngestScheduler.getInstance().getJobStats();
jobSnapshots = IngestJob.getJobSnapshots();
fireTableDataChanged();
}
@Override
public int getRowCount() {
return schedStats.size();
return jobSnapshots.size();
}
@Override
@ -189,39 +188,39 @@ public class IngestProgressSnapshotPanel extends javax.swing.JPanel {
@Override
public Object getValueAt(int rowIndex, int columnIndex) {
IngestJobSchedulerStats schedStat = schedStats.get(rowIndex);
IngestJob.IngestJobSnapshot snapShot = jobSnapshots.get(rowIndex);
Object cellValue;
switch (columnIndex) {
case 0:
cellValue = schedStat.getJobId();
cellValue = snapShot.getJobId();
break;
case 1:
cellValue = schedStat.getDataSource();
cellValue = snapShot.getDataSource();
break;
case 2:
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
cellValue = dateFormat.format(new Date(schedStat.getIngestJobStats().getStartTime()));
cellValue = dateFormat.format(new Date(snapShot.getStartTime()));
break;
case 3:
cellValue = schedStat.getIngestJobStats().getFilesProcessed();
cellValue = snapShot.getFilesProcessed();
break;
case 4:
cellValue = schedStat.getIngestJobStats().getSpeed();
cellValue = snapShot.getSpeed();
break;
case 5:
cellValue = schedStat.getRunningListSize();
cellValue = snapShot.getRunningListSize();
break;
case 6:
cellValue = schedStat.getFileQueueSize();
cellValue = snapShot.getFileQueueSize();
break;
case 7:
cellValue = schedStat.getDirQueueSize();
cellValue = snapShot.getDirQueueSize();
break;
case 8:
cellValue = schedStat.getRootQueueSize();
cellValue = snapShot.getRootQueueSize();
break;
case 9:
cellValue = schedStat.getDsQueueSize();
cellValue = snapShot.getDsQueueSize();
break;
default:
cellValue = null;

View File

@ -1,678 +0,0 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2012-2014 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.ingest.IngestJob.IngestJobStats;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.File;
import org.sleuthkit.datamodel.FileSystem;
import org.sleuthkit.datamodel.TskCoreException;
import org.sleuthkit.datamodel.TskData;
/**
* Creates ingest jobs and their constituent ingest tasks, queuing the tasks in
* priority order for execution by the ingest manager's ingest threads.
*/
final class IngestScheduler {
private static final Logger logger = Logger.getLogger(IngestScheduler.class.getName());
private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
private static IngestScheduler instance = null;
private final AtomicLong nextIngestJobId = new AtomicLong(0L);
private final ConcurrentHashMap<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
private volatile boolean enabled = false;
private final DataSourceIngestTaskQueue dataSourceTaskDispenser = new DataSourceIngestTaskQueue();
private final FileIngestTaskQueue fileTaskDispenser = new FileIngestTaskQueue();
// The following five collections lie at the heart of the scheduler.
// The pending tasks queues are used to schedule tasks for an ingest job. If
// multiple jobs are scheduled, tasks from different jobs may become
// interleaved in these queues.
// FIFO queue for data source-level tasks.
private final LinkedBlockingQueue<DataSourceIngestTask> pendingDataSourceTasks = new LinkedBlockingQueue<>(); // Guarded by this
// File tasks are "shuffled"
// through root directory (priority queue), directory (LIFO), and file tasks
// queues (LIFO). If a file task makes it into the pending file tasks queue,
// it is consumed by the ingest threads.
private final TreeSet<FileIngestTask> pendingRootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator()); // Guarded by this
private final List<FileIngestTask> pendingDirectoryTasks = new ArrayList<>(); // Guarded by this
private final BlockingDeque<FileIngestTask> pendingFileTasks = new LinkedBlockingDeque<>(); // Not guarded
// The "tasks in progress" list has:
// - File and data source tasks that are running
// - File tasks that are in the pending file queue
// It is used to determine when a job is done. It has both pending and running
// tasks because we do not lock the 'pendingFileTasks' and a task needs to be in
// at least one of the pending or inprogress lists at all times before it is completed.
// files are added to this when the are added to pendingFilesTasks and removed when they complete
private final List<IngestTask> tasksInProgressAndPending = new ArrayList<>(); // Guarded by this
synchronized static IngestScheduler getInstance() {
if (instance == null) {
instance = new IngestScheduler();
}
return instance;
}
private IngestScheduler() {
}
void setEnabled(boolean enabled) {
this.enabled = enabled;
}
/**
* Creates an ingest job for a data source.
*
* @param dataSource The data source to ingest.
* @param ingestModuleTemplates The ingest module templates to use to create
* the ingest pipelines for the job.
* @param processUnallocatedSpace Whether or not the job should include
* processing of unallocated space.
*
* @return A collection of ingest module start up errors, empty on success.
*
* @throws InterruptedException
*/
List<IngestModuleError> startIngestJob(Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException {
List<IngestModuleError> errors = new ArrayList<>();
if (enabled) {
long jobId = nextIngestJobId.incrementAndGet();
IngestJob job = new IngestJob(jobId, dataSource, processUnallocatedSpace);
errors = job.start(ingestModuleTemplates);
if (errors.isEmpty() && (job.hasDataSourceIngestPipeline() || job.hasFileIngestPipeline())) {
ingestJobsById.put(jobId, job);
IngestManager.getInstance().fireIngestJobStarted(jobId);
scheduleIngestTasks(job);
logger.log(Level.INFO, "Ingest job {0} started", jobId);
}
}
return errors;
}
synchronized private void scheduleIngestTasks(IngestJob job) throws InterruptedException {
// This is synchronized to guard the task queues and make ingest
// scheduling for a job an an atomic operation. Otherwise, the data
// source task might be completed before the file tasks were scheduled,
// resulting in a false positive for a job completion check.
if (job.hasDataSourceIngestPipeline()) {
scheduleDataSourceIngestTask(job);
}
if (job.hasFileIngestPipeline()) {
scheduleFileIngestTasks(job);
}
}
synchronized private void scheduleDataSourceIngestTask(IngestJob job) throws InterruptedException {
DataSourceIngestTask task = new DataSourceIngestTask(job);
tasksInProgressAndPending.add(task);
try {
// Should not block, queue is (theoretically) unbounded.
pendingDataSourceTasks.put(task);
} catch (InterruptedException ex) {
tasksInProgressAndPending.remove(task);
Logger.getLogger(IngestScheduler.class.getName()).log(Level.SEVERE, "Interruption of unexpected block on pending data source tasks queue", ex); //NON-NLS
throw ex;
}
}
synchronized private void scheduleFileIngestTasks(IngestJob job) throws InterruptedException {
List<AbstractFile> topLevelFiles = getTopLevelFiles(job.getDataSource());
for (AbstractFile firstLevelFile : topLevelFiles) {
FileIngestTask task = new FileIngestTask(job, firstLevelFile);
if (shouldEnqueueFileTask(task)) {
pendingRootDirectoryTasks.add(task);
}
}
updatePendingFileTasksQueues();
}
private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
List<AbstractFile> topLevelFiles = new ArrayList<>();
Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
// The data source is itself a file to be processed.
topLevelFiles.add((AbstractFile) dataSource);
} else {
for (AbstractFile root : rootObjects) {
List<Content> children;
try {
children = root.getChildren();
if (children.isEmpty()) {
// Add the root object itself, it could be an unallocated
// space file, or a child of a volume or an image.
topLevelFiles.add(root);
} else {
// The root object is a file system root directory, get
// the files within it.
for (Content child : children) {
if (child instanceof AbstractFile) {
topLevelFiles.add((AbstractFile) child);
}
}
}
} catch (TskCoreException ex) {
logger.log(Level.WARNING, "Could not get children of root to enqueue: " + root.getId() + ": " + root.getName(), ex); //NON-NLS
}
}
}
return topLevelFiles;
}
synchronized private void updatePendingFileTasksQueues() throws InterruptedException {
// This is synchronized to guard the pending file tasks queues and make
// this an atomic operation.
if (enabled) {
while (true) {
// Loop until either the pending file tasks queue is NOT empty
// or the upstream queues that feed into it ARE empty.
if (pendingFileTasks.isEmpty() == false) {
return;
}
if (pendingDirectoryTasks.isEmpty()) {
if (pendingRootDirectoryTasks.isEmpty()) {
return;
}
pendingDirectoryTasks.add(pendingRootDirectoryTasks.pollFirst());
}
// Try to add the most recently added from the pending directory tasks queue to
// the pending file tasks queue.
boolean tasksEnqueuedForDirectory = false;
FileIngestTask directoryTask = pendingDirectoryTasks.remove(pendingDirectoryTasks.size() - 1);
if (shouldEnqueueFileTask(directoryTask)) {
addToPendingFileTasksQueue(directoryTask);
tasksEnqueuedForDirectory = true;
}
// If the directory contains subdirectories or files, try to
// enqueue tasks for them as well.
final AbstractFile directory = directoryTask.getFile();
try {
for (Content child : directory.getChildren()) {
if (child instanceof AbstractFile) {
AbstractFile file = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), file);
if (file.hasChildren()) {
// Found a subdirectory, put the task in the
// pending directory tasks queue.
pendingDirectoryTasks.add(childTask);
tasksEnqueuedForDirectory = true;
} else if (shouldEnqueueFileTask(childTask)) {
// Found a file, put the task directly into the
// pending file tasks queue.
addToPendingFileTasksQueue(childTask);
tasksEnqueuedForDirectory = true;
}
}
}
} catch (TskCoreException ex) {
String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS
logger.log(Level.SEVERE, errorMessage, ex);
}
// In the case where the directory task is not pushed into the
// the pending file tasks queue and has no children, check to
// see if the job is completed - the directory task might have
// been the last task for the job.
if (!tasksEnqueuedForDirectory) {
IngestJob job = directoryTask.getIngestJob();
if (ingestJobIsComplete(job)) {
finishIngestJob(job);
}
}
}
}
}
private static boolean shouldEnqueueFileTask(final FileIngestTask processTask) {
final AbstractFile aFile = processTask.getFile();
//if it's unalloc file, skip if so scheduled
if (processTask.getIngestJob().shouldProcessUnallocatedSpace() == false && aFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) {
return false;
}
String fileName = aFile.getName();
if (fileName.equals(".") || fileName.equals("..")) {
return false;
} else if (aFile instanceof org.sleuthkit.datamodel.File) {
final org.sleuthkit.datamodel.File f = (File) aFile;
//skip files in root dir, starting with $, containing : (not default attributes)
//with meta address < 32, i.e. some special large NTFS and FAT files
FileSystem fs = null;
try {
fs = f.getFileSystem();
} catch (TskCoreException ex) {
logger.log(Level.SEVERE, "Could not get FileSystem for " + f, ex); //NON-NLS
}
TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
if (fs != null) {
fsType = fs.getFsType();
}
if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
//not fat or ntfs, accept all files
return true;
}
boolean isInRootDir = false;
try {
isInRootDir = f.getParentDirectory().isRoot();
} catch (TskCoreException ex) {
logger.log(Level.WARNING, "Could not check if should enqueue the file: " + f.getName(), ex); //NON-NLS
}
if (isInRootDir && f.getMetaAddr() < 32) {
String name = f.getName();
if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) {
return false;
}
} else {
return true;
}
}
return true;
}
synchronized private void addToPendingFileTasksQueue(FileIngestTask task) throws IllegalStateException {
tasksInProgressAndPending.add(task);
try {
// Should not block, queue is (theoretically) unbounded.
/* add to top of list because we had one image that had a folder
* with
* lots of zip files. This queue had thousands of entries because
* it just kept on getting bigger and bigger. So focus on pushing
* out
* the ZIP file contents out of the queue to try to keep it small.
*/
pendingFileTasks.addFirst(task);
} catch (IllegalStateException ex) {
tasksInProgressAndPending.remove(task);
Logger.getLogger(IngestScheduler.class.getName()).log(Level.SEVERE, "Interruption of unexpected block on pending file tasks queue", ex); //NON-NLS
throw ex;
}
}
void scheduleAdditionalFileIngestTask(IngestJob job, AbstractFile file) throws InterruptedException {
if (enabled) {
FileIngestTask task = new FileIngestTask(job, file);
if (shouldEnqueueFileTask(task)) {
// Send the file task directly to file tasks queue, no need to
// update the pending root directory or pending directory tasks
// queues.
addToPendingFileTasksQueue(task);
}
}
}
IngestTaskQueue getDataSourceIngestTaskQueue() {
return dataSourceTaskDispenser;
}
IngestTaskQueue getFileIngestTaskQueue() {
return fileTaskDispenser;
}
void notifyTaskCompleted(IngestTask task) {
boolean jobIsCompleted;
IngestJob job = task.getIngestJob();
synchronized (this) {
tasksInProgressAndPending.remove(task);
jobIsCompleted = ingestJobIsComplete(job);
}
if (jobIsCompleted) {
// The lock does not need to be held for the job shut down.
finishIngestJob(job);
}
}
/**
* Queries whether or not ingest jobs are running.
*
* @return True or false.
*/
boolean ingestJobsAreRunning() {
return !ingestJobsById.isEmpty();
}
/**
* Clears the pending ingest task queues for an ingest job. If job is
* complete (no pending or in progress tasks) the job is finished up.
* Otherwise, the last worker thread with an in progress task will finish /
* clean up the job.
*
* @param job The job to cancel.
*/
synchronized void cancelPendingTasksForIngestJob(IngestJob job) {
long jobId = job.getId();
removeAllPendingTasksForJob(pendingRootDirectoryTasks, jobId);
removeAllPendingTasksForJob(pendingDirectoryTasks, jobId);
removeAllPendingTasksForJob(pendingFileTasks, jobId);
removeAllPendingTasksForJob(pendingDataSourceTasks, jobId);
if (ingestJobIsComplete(job)) {
finishIngestJob(job);
}
}
/**
* Return the number of tasks in the queue for the given job ID
*
* @param <T>
* @param queue
* @param jobId
*
* @return
*/
<T> int countJobsInCollection(Collection<T> queue, long jobId) {
Iterator<T> iterator = queue.iterator();
int count = 0;
while (iterator.hasNext()) {
IngestTask task = (IngestTask) iterator.next();
if (task.getIngestJob().getId() == jobId) {
count++;
}
}
return count;
}
synchronized private void removeAllPendingTasksForJob(Collection<? extends IngestTask> taskQueue, long jobId) {
Iterator<? extends IngestTask> iterator = taskQueue.iterator();
while (iterator.hasNext()) {
IngestTask task = iterator.next();
if (task.getIngestJob().getId() == jobId) {
tasksInProgressAndPending.remove(task);
iterator.remove();
}
}
}
void cancelAllIngestJobs() {
synchronized (this) {
removeAllPendingTasks(pendingRootDirectoryTasks);
removeAllPendingTasks(pendingDirectoryTasks);
removeAllPendingTasks(pendingFileTasks);
removeAllPendingTasks(pendingDataSourceTasks);
for (IngestJob job : ingestJobsById.values()) {
job.cancel();
if (ingestJobIsComplete(job)) {
finishIngestJob(job);
}
}
}
}
synchronized private <T> void removeAllPendingTasks(Collection<T> taskQueue) {
Iterator<T> iterator = taskQueue.iterator();
while (iterator.hasNext()) {
tasksInProgressAndPending.remove((IngestTask) iterator.next());
iterator.remove();
}
}
synchronized private boolean ingestJobIsComplete(IngestJob job) {
for (IngestTask task : tasksInProgressAndPending) {
if (task.getIngestJob().getId() == job.getId()) {
return false;
}
}
return true;
}
/**
* Called after all work is completed to free resources.
*
* @param job
*/
private void finishIngestJob(IngestJob job) {
job.finish();
long jobId = job.getId();
ingestJobsById.remove(jobId);
if (!job.isCancelled()) {
logger.log(Level.INFO, "Ingest job {0} completed", jobId);
IngestManager.getInstance().fireIngestJobCompleted(job.getId());
} else {
logger.log(Level.INFO, "Ingest job {0} cancelled", jobId);
IngestManager.getInstance().fireIngestJobCancelled(job.getId());
}
}
private static class RootDirectoryTaskComparator implements Comparator<FileIngestTask> {
@Override
public int compare(FileIngestTask q1, FileIngestTask q2) {
AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(q1.getFile());
AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(q2.getFile());
if (p1 == p2) {
return (int) (q2.getFile().getId() - q1.getFile().getId());
} else {
return p2.ordinal() - p1.ordinal();
}
}
private static class AbstractFilePriority {
enum Priority {
LAST, LOW, MEDIUM, HIGH
}
static final List<Pattern> LAST_PRI_PATHS = new ArrayList<>();
static final List<Pattern> LOW_PRI_PATHS = new ArrayList<>();
static final List<Pattern> MEDIUM_PRI_PATHS = new ArrayList<>();
static final List<Pattern> HIGH_PRI_PATHS = new ArrayList<>();
/* prioritize root directory folders based on the assumption that we
* are
* looking for user content. Other types of investigations may want
* different
* priorities. */
static /* prioritize root directory
* folders based on the assumption that we are
* looking for user content. Other types of investigations may want
* different
* priorities. */ {
// these files have no structure, so they go last
//unalloc files are handled as virtual files in getPriority()
//LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE));
//LAST_PRI_PATHS.schedule(Pattern.compile("^\\Unalloc", Pattern.CASE_INSENSITIVE));
LAST_PRI_PATHS.add(Pattern.compile("^pagefile", Pattern.CASE_INSENSITIVE));
LAST_PRI_PATHS.add(Pattern.compile("^hiberfil", Pattern.CASE_INSENSITIVE));
// orphan files are often corrupt and windows does not typically have
// user content, so put them towards the bottom
LOW_PRI_PATHS.add(Pattern.compile("^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
LOW_PRI_PATHS.add(Pattern.compile("^Windows", Pattern.CASE_INSENSITIVE));
// all other files go into the medium category too
MEDIUM_PRI_PATHS.add(Pattern.compile("^Program Files", Pattern.CASE_INSENSITIVE));
// user content is top priority
HIGH_PRI_PATHS.add(Pattern.compile("^Users", Pattern.CASE_INSENSITIVE));
HIGH_PRI_PATHS.add(Pattern.compile("^Documents and Settings", Pattern.CASE_INSENSITIVE));
HIGH_PRI_PATHS.add(Pattern.compile("^home", Pattern.CASE_INSENSITIVE));
HIGH_PRI_PATHS.add(Pattern.compile("^ProgramData", Pattern.CASE_INSENSITIVE));
}
/**
* Get the enabled priority for a given file.
*
* @param abstractFile
*
* @return
*/
static AbstractFilePriority.Priority getPriority(final AbstractFile abstractFile) {
if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
//quickly filter out unstructured content
//non-fs virtual files and dirs, such as representing unalloc space
return AbstractFilePriority.Priority.LAST;
}
//determine the fs files priority by name
final String path = abstractFile.getName();
if (path == null) {
return AbstractFilePriority.Priority.MEDIUM;
}
for (Pattern p : HIGH_PRI_PATHS) {
Matcher m = p.matcher(path);
if (m.find()) {
return AbstractFilePriority.Priority.HIGH;
}
}
for (Pattern p : MEDIUM_PRI_PATHS) {
Matcher m = p.matcher(path);
if (m.find()) {
return AbstractFilePriority.Priority.MEDIUM;
}
}
for (Pattern p : LOW_PRI_PATHS) {
Matcher m = p.matcher(path);
if (m.find()) {
return AbstractFilePriority.Priority.LOW;
}
}
for (Pattern p : LAST_PRI_PATHS) {
Matcher m = p.matcher(path);
if (m.find()) {
return AbstractFilePriority.Priority.LAST;
}
}
//default is medium
return AbstractFilePriority.Priority.MEDIUM;
}
}
}
private final class DataSourceIngestTaskQueue implements IngestTaskQueue {
@Override
public IngestTask getNextTask() throws InterruptedException {
return pendingDataSourceTasks.take();
}
}
private final class FileIngestTaskQueue implements IngestTaskQueue {
@Override
public IngestTask getNextTask() throws InterruptedException {
FileIngestTask task = pendingFileTasks.takeFirst();
updatePendingFileTasksQueues();
return task;
}
}
/**
* Stores basic stats for a given job
*/
class IngestJobSchedulerStats {
private final IngestJobStats ingestJobStats;
private final long jobId;
private final String dataSource;
private final long rootQueueSize;
private final long dirQueueSize;
private final long fileQueueSize;
private final long dsQueueSize;
private final long runningListSize;
IngestJobSchedulerStats(IngestJob job) {
ingestJobStats = job.getStats();
jobId = job.getId();
dataSource = job.getDataSource().getName();
rootQueueSize = countJobsInCollection(pendingRootDirectoryTasks, jobId);
dirQueueSize = countJobsInCollection(pendingDirectoryTasks, jobId);
fileQueueSize = countJobsInCollection(pendingFileTasks, jobId);
dsQueueSize = countJobsInCollection(pendingDataSourceTasks, jobId);
runningListSize = countJobsInCollection(tasksInProgressAndPending, jobId) - fileQueueSize - dsQueueSize;
}
protected long getJobId() {
return jobId;
}
protected String getDataSource() {
return dataSource;
}
protected long getRootQueueSize() {
return rootQueueSize;
}
protected long getDirQueueSize() {
return dirQueueSize;
}
protected long getFileQueueSize() {
return fileQueueSize;
}
protected long getDsQueueSize() {
return dsQueueSize;
}
protected long getRunningListSize() {
return runningListSize;
}
protected IngestJobStats getIngestJobStats() {
return ingestJobStats;
}
}
/**
* Get basic performance / stats on all running jobs
*
* @return
*/
synchronized List<IngestJobSchedulerStats> getJobStats() {
List<IngestJobSchedulerStats> stats = new ArrayList<>();
for (IngestJob job : Collections.list(ingestJobsById.elements())) {
stats.add(new IngestJobSchedulerStats(job));
}
return stats;
}
}

View File

@ -0,0 +1,757 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2012-2014 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.FileSystem;
import org.sleuthkit.datamodel.TskCoreException;
import org.sleuthkit.datamodel.TskData;
/**
* Creates ingest tasks for ingest jobs, queuing the tasks in priority order for
* execution by the ingest manager's ingest threads.
*/
final class IngestTasksScheduler {
private static final Logger logger = Logger.getLogger(IngestTasksScheduler.class.getName());
private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
private static IngestTasksScheduler instance;
// Scheduling of data source ingest tasks is accomplished by putting them
// in a FIFO queue to be consumed by the ingest threads. The pending data
// tasks queue is therefore wrapped in a "dispenser" that implements the
// IngestTaskQueue interface and is exposed via a getter method.
private final LinkedBlockingQueue<DataSourceIngestTask> pendingDataSourceTasks;
private final DataSourceIngestTaskQueue dataSourceTasksDispenser;
// Scheduling of file ingest tasks is accomplished by "shuffling" them
// through a sequence of internal queues that allows for the interleaving of
// tasks from different ingest jobs based on priority. These scheduling
// queues are:
// 1. root directory tasks (priority queue)
// 2. directory tasks (FIFO queue)
// 3. pending file tasks (LIFO queue)
// Tasks in the pending file tasks queue are ready to be consumed by the
// ingest threads. The pending file tasks queue is therefore wrapped in a
// "dispenser" that implements the IngestTaskQueue interface and is exposed
// via a getter method.
private final TreeSet<FileIngestTask> rootDirectoryTasks;
private final List<FileIngestTask> directoryTasks;
private final BlockingDeque<FileIngestTask> pendingFileTasks;
private final FileIngestTaskQueue fileTasksDispenser;
// The ingest scheduler is responsible for notifying an ingest jobs whenever
// all of the ingest tasks currently associated with the job are complete.
// To make this possible, the ingest tasks scheduler needs to keep track not
// only of the tasks in its queues, but also of the tasks that have been
// handed out for processing by code running on the ingest manager's ingest
// threads. Therefore all ingest tasks are added to this list and are not
// removed when an ingest thread takes an ingest task. Instead, the ingest
// thread calls back into the scheduler when the task is completed, at
// which time the task will be removed from this list.
private final List<IngestTask> tasksInProgressAndPending;
/**
* Gets the ingest tasks scheduler singleton.
*/
synchronized static IngestTasksScheduler getInstance() {
if (IngestTasksScheduler.instance == null) {
IngestTasksScheduler.instance = new IngestTasksScheduler();
}
return IngestTasksScheduler.instance;
}
/**
* Constructs an ingest tasks scheduler.
*/
private IngestTasksScheduler() {
this.pendingDataSourceTasks = new LinkedBlockingQueue<>();
this.dataSourceTasksDispenser = new DataSourceIngestTaskQueue();
this.rootDirectoryTasks = new TreeSet<>(new RootDirectoryTaskComparator());
this.directoryTasks = new ArrayList<>();
this.pendingFileTasks = new LinkedBlockingDeque<>();
this.fileTasksDispenser = new FileIngestTaskQueue();
this.tasksInProgressAndPending = new ArrayList<>();
}
/**
* Gets this ingest task scheduler's implementation of the IngestTaskQueue
* interface for data source ingest tasks.
*
* @return The data source ingest tasks queue.
*/
IngestTaskQueue getDataSourceIngestTaskQueue() {
return this.dataSourceTasksDispenser;
}
/**
* Gets this ingest task scheduler's implementation of the IngestTaskQueue
* interface for file ingest tasks.
*
* @return The file ingest tasks queue.
*/
IngestTaskQueue getFileIngestTaskQueue() {
return this.fileTasksDispenser;
}
/**
* Schedules a data source ingest task and file ingest tasks for an ingest
* job.
*
* @param job The job for which the tasks are to be scheduled.
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
*/
synchronized void scheduleIngestTasks(IngestJob job) throws InterruptedException {
// The initial ingest scheduling for a job an an atomic operation.
// Otherwise, the data source task might be completed before the file
// tasks are created, resulting in a potential false positive when this
// task scheduler checks whether or not all the tasks for the job are
// completed.
if (job.hasDataSourceIngestPipeline()) {
scheduleDataSourceIngestTask(job);
}
if (job.hasFileIngestPipeline()) {
scheduleFileIngestTasks(job);
}
}
/**
* Schedules a data source ingest task for an ingest job.
*
* @param job The job for which the tasks are to be scheduled.
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
*/
synchronized void scheduleDataSourceIngestTask(IngestJob job) throws InterruptedException {
// Create a data source ingest task for the data source associated with
// the ingest job and add the task to the pending data source tasks
// queue. Data source tasks are scheduled on a first come, first served
// basis.
DataSourceIngestTask task = new DataSourceIngestTask(job);
this.tasksInProgressAndPending.add(task);
try {
// This call should not block because the queue is (theoretically)
// unbounded.
this.pendingDataSourceTasks.put(task);
} catch (InterruptedException ex) {
this.tasksInProgressAndPending.remove(task);
IngestTasksScheduler.logger.log(Level.SEVERE, "Interruption of unexpected block on pending data source tasks queue", ex); //NON-NLS
throw ex;
}
}
/**
* Schedules file ingest tasks for an ingest job.
*
* @param job The job for which the tasks are to be scheduled.
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
*/
synchronized void scheduleFileIngestTasks(IngestJob job) throws InterruptedException {
// Get the top level files for the data source associated with this job
// and add them to the root directories priority queue. The file tasks
// may be interleaved with file tasks from other jobs, based on priority.
List<AbstractFile> topLevelFiles = getTopLevelFiles(job.getDataSource());
for (AbstractFile firstLevelFile : topLevelFiles) {
FileIngestTask task = new FileIngestTask(job, firstLevelFile);
if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
this.tasksInProgressAndPending.add(task);
this.rootDirectoryTasks.add(task);
}
}
shuffleFileTaskQueues();
}
/**
* Schedules a file ingest task for an ingest job.
*
* @param job The job for which the tasks are to be scheduled.
* @param file The file associated with the task.
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
*/
void scheduleFileIngestTask(IngestJob job, AbstractFile file) throws InterruptedException, IllegalStateException {
FileIngestTask task = new FileIngestTask(job, file);
if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
// This synchronized method sends the file task directly to the
// pending file tasks queue. This is done to prioritize derived
// and carved files generated by a file ingest task in progress.
addToPendingFileTasksQueue(task);
}
}
/**
* Allows an ingest thread to notify this ingest task scheduler that a task
* has been completed.
*
* @param task The completed task.
*/
synchronized void notifyTaskCompleted(IngestTask task) throws InterruptedException {
tasksInProgressAndPending.remove(task);
IngestJob job = task.getIngestJob();
if (this.tasksForJobAreCompleted(job)) {
job.notifyTasksCompleted();
}
}
/**
* Clears the task scheduling queues for an ingest job, but does nothing
* about tasks that have already been taken by ingest threads. Those tasks
* will be flushed out when the ingest threads call back with their task
* completed notifications.
*
* @param job The job for which the tasks are to to canceled.
*/
synchronized void cancelPendingTasksForIngestJob(IngestJob job) {
// The scheduling queues are cleared of tasks for the job, and the tasks
// that are removed from the scheduling queues are also removed from the
// tasks in progress list. However, a tasks in progress check for the
// job may still return true since the tasks that have been taken by the
// ingest threads are still in the tasks in progress list.
long jobId = job.getId();
this.removeTasksForJob(this.rootDirectoryTasks, jobId);
this.removeTasksForJob(this.directoryTasks, jobId);
this.removeTasksForJob(this.pendingFileTasks, jobId);
this.removeTasksForJob(this.pendingDataSourceTasks, jobId);
if (this.tasksForJobAreCompleted(job)) {
job.notifyTasksCompleted();
}
}
/**
* A helper that gets the top level files such as file system root
* directories, layout files and virtual directories for a data source. Used
* to create file tasks to put into the root directories queue.
*
* @param dataSource The data source.
* @return A list of top level files.
*/
private static List<AbstractFile> getTopLevelFiles(Content dataSource) {
List<AbstractFile> topLevelFiles = new ArrayList<>();
Collection<AbstractFile> rootObjects = dataSource.accept(new GetRootDirectoryVisitor());
if (rootObjects.isEmpty() && dataSource instanceof AbstractFile) {
// The data source is itself a file to be processed.
topLevelFiles.add((AbstractFile) dataSource);
} else {
for (AbstractFile root : rootObjects) {
List<Content> children;
try {
children = root.getChildren();
if (children.isEmpty()) {
// Add the root object itself, it could be an unallocated
// space file, or a child of a volume or an image.
topLevelFiles.add(root);
} else {
// The root object is a file system root directory, get
// the files within it.
for (Content child : children) {
if (child instanceof AbstractFile) {
topLevelFiles.add((AbstractFile) child);
}
}
}
} catch (TskCoreException ex) {
logger.log(Level.WARNING, "Could not get children of root to enqueue: " + root.getId() + ": " + root.getName(), ex); //NON-NLS
}
}
}
return topLevelFiles;
}
/**
* A helper that "shuffles" the file task queues to ensure that there is at
* least one task in the pending file ingest tasks queue, as long as there
* are still file ingest tasks to be performed.
*
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
*/
synchronized private void shuffleFileTaskQueues() throws InterruptedException, IllegalStateException {
// This is synchronized because it is called both by synchronized
// methods of this ingest scheduler and an unsynchronized method of its
// file tasks "dispenser".
while (true) {
// Loop until either the pending file tasks queue is NOT empty
// or the upstream queues that feed into it ARE empty.
if (!this.pendingFileTasks.isEmpty()) {
// There are file tasks ready to be consumed, exit.
return;
}
if (this.directoryTasks.isEmpty()) {
if (this.rootDirectoryTasks.isEmpty()) {
// There are no root directory tasks to move into the
// directory queue, exit.
return;
} else {
// Move the next root directory task into the
// directories queue. Note that the task was already
// added to the tasks in progress list when the task was
// created in scheduleFileIngestTasks().
this.directoryTasks.add(this.rootDirectoryTasks.pollFirst());
}
}
// Try to add the most recently added directory from the
// directory tasks queue to the pending file tasks queue. Note
// the removal of the task from the tasks in progress list. If
// the task is enqueued, it will be put back in the list by
// the addToPendingFileTasksQueue() helper.
boolean tasksEnqueuedForDirectory = false;
FileIngestTask directoryTask = this.directoryTasks.remove(this.directoryTasks.size() - 1);
this.tasksInProgressAndPending.remove(directoryTask);
if (shouldEnqueueFileTask(directoryTask)) {
addToPendingFileTasksQueue(directoryTask);
tasksEnqueuedForDirectory = true;
}
// If the directory contains subdirectories or files, try to
// enqueue tasks for them as well.
final AbstractFile directory = directoryTask.getFile();
try {
for (Content child : directory.getChildren()) {
if (child instanceof AbstractFile) {
AbstractFile file = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(directoryTask.getIngestJob(), file);
if (file.hasChildren()) {
// Found a subdirectory, put the task in the
// pending directory tasks queue. Note the
// addition of the task to the tasks in progress
// list. This is necessary because this is the
// first appearance of this task in the queues.
this.tasksInProgressAndPending.add(childTask);
this.directoryTasks.add(childTask);
tasksEnqueuedForDirectory = true;
} else if (shouldEnqueueFileTask(childTask)) {
// Found a file, put the task directly into the
// pending file tasks queue. The new task will
// be put into the tasks in progress list by the
// addToPendingFileTasksQueue() helper.
addToPendingFileTasksQueue(childTask);
tasksEnqueuedForDirectory = true;
}
}
}
} catch (TskCoreException ex) {
String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS
logger.log(Level.SEVERE, errorMessage, ex);
}
// In the case where the directory task is not pushed into the
// the pending file tasks queue and has no children, check to
// see if the job is completed - the directory task might have
// been the last task for the job.
if (!tasksEnqueuedForDirectory) {
IngestJob job = directoryTask.getIngestJob();
if (this.tasksForJobAreCompleted(job)) {
job.notifyTasksCompleted();
}
}
}
}
/**
* A helper method that examines the file associated with a file ingest task
* to determine whether or not the file should be processed and therefore
* the task should be enqueued.
*
* @param task The task to be scrutinized.
* @return True or false.
*/
private static boolean shouldEnqueueFileTask(final FileIngestTask task) {
final AbstractFile file = task.getFile();
// Skip the task if the file is an unallocated space file and the
// process unallocated space flag is not set for this job.
if (!task.getIngestJob().shouldProcessUnallocatedSpace()
&& file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS)) {
return false;
}
// Skip the task if the file is actually the pseudo-file for the parent
// or current directory.
String fileName = file.getName();
if (fileName.equals(".") || fileName.equals("..")) {
return false;
}
// Skip the task if the file is one of a select group of special, large
// NTFS or FAT file system files.
// the file is in the root directory, has a file name
// starting with $, containing : (not default attributes)
//with meta address < 32, i.e. some special large NTFS and FAT files
if (file instanceof org.sleuthkit.datamodel.File) {
final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file;
// Get the type of the file system, if any, that owns the file.
TskData.TSK_FS_TYPE_ENUM fsType = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_UNSUPP;
try {
FileSystem fs = f.getFileSystem();
if (fs != null) {
fsType = fs.getFsType();
}
} catch (TskCoreException ex) {
logger.log(Level.SEVERE, "Error querying file system for " + f, ex); //NON-NLS
}
// If the file system is not NTFS or FAT, don't skip the file.
if ((fsType.getValue() & FAT_NTFS_FLAGS) == 0) {
return true;
}
// Find out whether the file is in a root directory.
boolean isInRootDir = false;
try {
AbstractFile parent = f.getParentDirectory();
isInRootDir = parent.isRoot();
} catch (TskCoreException ex) {
logger.log(Level.WARNING, "Error querying parent directory for" + f.getName(), ex); //NON-NLS
}
// If the file is in the root directory of an NTFS or FAT file
// system, check its meta-address and check its name for the '$'
// character and a ':' character (not a default attribute).
if (isInRootDir && f.getMetaAddr() < 32) {
String name = f.getName();
if (name.length() > 0 && name.charAt(0) == '$' && name.contains(":")) {
return false;
}
}
}
return true;
}
/**
* A helper method to safely add a file ingest task to the blocking pending
* tasks queue.
*
* @param task
* @throws IllegalStateException
*/
synchronized private void addToPendingFileTasksQueue(FileIngestTask task) throws IllegalStateException {
tasksInProgressAndPending.add(task);
try {
// The file is added to the front of the pending files queue because
// at least one image has been processed that had a folder full of
// archive files. The queue grew to have thousands of entries, so
// this (might) help with pushing those files through ingest.
this.pendingFileTasks.addFirst(task);
} catch (IllegalStateException ex) {
tasksInProgressAndPending.remove(task);
Logger.getLogger(IngestTasksScheduler.class.getName()).log(Level.SEVERE, "Pending file tasks queue is full", ex); //NON-NLS
throw ex;
}
}
/**
* Determines whether or not all current ingest tasks for an ingest job are
* completed.
*
* @param job The job for which the query is to be performed.
* @return True or false.
*/
private boolean tasksForJobAreCompleted(IngestJob job) {
for (IngestTask task : tasksInProgressAndPending) {
if (task.getIngestJob().getId() == job.getId()) {
return false;
}
}
return true;
}
/**
* A helper that removes all of the ingest tasks associated with an ingest
* job from a tasks queue. The task is removed from the the tasks in
* progress list as well.
*
* @param taskQueue The queue from which to remove the tasks.
* @param jobId The id of the job for which the tasks are to be removed.
*/
private void removeTasksForJob(Collection<? extends IngestTask> taskQueue, long jobId) {
Iterator<? extends IngestTask> iterator = taskQueue.iterator();
while (iterator.hasNext()) {
IngestTask task = iterator.next();
if (task.getIngestJob().getId() == jobId) {
this.tasksInProgressAndPending.remove(task);
iterator.remove();
}
}
}
/**
* A helper that counts the number of ingest tasks in a task queue for a
* given job.
*
* @param queue The queue for which to count tasks.
* @param jobId The id of the job for which the tasks are to be counted.
* @return The count.
*/
private static int countTasksForJob(Collection<? extends IngestTask> queue, long jobId) {
Iterator<? extends IngestTask> iterator = queue.iterator();
int count = 0;
while (iterator.hasNext()) {
IngestTask task = (IngestTask) iterator.next();
if (task.getIngestJob().getId() == jobId) {
count++;
}
}
return count;
}
/**
* RJCTODO
*
* @param jobId
* @return
*/
synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) {
return new IngestJobTasksSnapshot(jobId);
}
/**
* Prioritizes tasks for the root directories file ingest tasks queue (file
* system root directories, layout files and virtual directories).
*/
private static class RootDirectoryTaskComparator implements Comparator<FileIngestTask> {
@Override
public int compare(FileIngestTask q1, FileIngestTask q2) {
AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(q1.getFile());
AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(q2.getFile());
if (p1 == p2) {
return (int) (q2.getFile().getId() - q1.getFile().getId());
} else {
return p2.ordinal() - p1.ordinal();
}
}
private static class AbstractFilePriority {
enum Priority {
LAST, LOW, MEDIUM, HIGH
}
static final List<Pattern> LAST_PRI_PATHS = new ArrayList<>();
static final List<Pattern> LOW_PRI_PATHS = new ArrayList<>();
static final List<Pattern> MEDIUM_PRI_PATHS = new ArrayList<>();
static final List<Pattern> HIGH_PRI_PATHS = new ArrayList<>();
/* prioritize root directory folders based on the assumption that we
* are
* looking for user content. Other types of investigations may want
* different
* priorities. */
static /* prioritize root directory
* folders based on the assumption that we are
* looking for user content. Other types of investigations may want
* different
* priorities. */ {
// these files have no structure, so they go last
//unalloc files are handled as virtual files in getPriority()
//LAST_PRI_PATHS.schedule(Pattern.compile("^\\$Unalloc", Pattern.CASE_INSENSITIVE));
//LAST_PRI_PATHS.schedule(Pattern.compile("^\\Unalloc", Pattern.CASE_INSENSITIVE));
LAST_PRI_PATHS.add(Pattern.compile("^pagefile", Pattern.CASE_INSENSITIVE));
LAST_PRI_PATHS.add(Pattern.compile("^hiberfil", Pattern.CASE_INSENSITIVE));
// orphan files are often corrupt and windows does not typically have
// user content, so put them towards the bottom
LOW_PRI_PATHS.add(Pattern.compile("^\\$OrphanFiles", Pattern.CASE_INSENSITIVE));
LOW_PRI_PATHS.add(Pattern.compile("^Windows", Pattern.CASE_INSENSITIVE));
// all other files go into the medium category too
MEDIUM_PRI_PATHS.add(Pattern.compile("^Program Files", Pattern.CASE_INSENSITIVE));
// user content is top priority
HIGH_PRI_PATHS.add(Pattern.compile("^Users", Pattern.CASE_INSENSITIVE));
HIGH_PRI_PATHS.add(Pattern.compile("^Documents and Settings", Pattern.CASE_INSENSITIVE));
HIGH_PRI_PATHS.add(Pattern.compile("^home", Pattern.CASE_INSENSITIVE));
HIGH_PRI_PATHS.add(Pattern.compile("^ProgramData", Pattern.CASE_INSENSITIVE));
}
/**
* Get the enabled priority for a given file.
*
* @param abstractFile
*
* @return
*/
static AbstractFilePriority.Priority getPriority(final AbstractFile abstractFile) {
if (!abstractFile.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.FS)) {
//quickly filter out unstructured content
//non-fs virtual files and dirs, such as representing unalloc space
return AbstractFilePriority.Priority.LAST;
}
//determine the fs files priority by name
final String path = abstractFile.getName();
if (path == null) {
return AbstractFilePriority.Priority.MEDIUM;
}
for (Pattern p : HIGH_PRI_PATHS) {
Matcher m = p.matcher(path);
if (m.find()) {
return AbstractFilePriority.Priority.HIGH;
}
}
for (Pattern p : MEDIUM_PRI_PATHS) {
Matcher m = p.matcher(path);
if (m.find()) {
return AbstractFilePriority.Priority.MEDIUM;
}
}
for (Pattern p : LOW_PRI_PATHS) {
Matcher m = p.matcher(path);
if (m.find()) {
return AbstractFilePriority.Priority.LOW;
}
}
for (Pattern p : LAST_PRI_PATHS) {
Matcher m = p.matcher(path);
if (m.find()) {
return AbstractFilePriority.Priority.LAST;
}
}
//default is medium
return AbstractFilePriority.Priority.MEDIUM;
}
}
}
/**
* Wraps access to pending data source ingest tasks in the interface
* required by the ingest threads.
*/
private final class DataSourceIngestTaskQueue implements IngestTaskQueue {
/**
* @inheritDoc
*/
@Override
public IngestTask getNextTask() throws InterruptedException {
return IngestTasksScheduler.this.pendingDataSourceTasks.take();
}
}
/**
* Wraps access to pending file ingest tasks in the interface required by
* the ingest threads.
*/
private final class FileIngestTaskQueue implements IngestTaskQueue {
/**
* @inheritDoc
*/
@Override
public IngestTask getNextTask() throws InterruptedException {
FileIngestTask task = IngestTasksScheduler.this.pendingFileTasks.takeFirst();
shuffleFileTaskQueues();
return task;
}
}
/**
* A snapshot of ingest tasks data for an ingest job.
*/
class IngestJobTasksSnapshot {
private final long jobId;
private final long rootQueueSize;
private final long dirQueueSize;
private final long fileQueueSize;
private final long dsQueueSize;
private final long runningListSize;
/**
* RJCTODO
* @param jobId
*/
IngestJobTasksSnapshot(long jobId) {
this.jobId = jobId;
this.rootQueueSize = countTasksForJob(IngestTasksScheduler.this.rootDirectoryTasks, jobId);
this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryTasks, jobId);
this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTasks, jobId);
this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingDataSourceTasks, jobId);
this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgressAndPending, jobId) - fileQueueSize - dsQueueSize;
}
/**
* RJCTODO
* @return
*/
long getJobId() {
return jobId;
}
/**
* RJCTODO
* @return
*/
long getRootQueueSize() {
return rootQueueSize;
}
/**
* RJCTODO
* @return
*/
long getDirQueueSize() {
return dirQueueSize;
}
/**
* RJCTODO
* @return
*/
long getFileQueueSize() {
return fileQueueSize;
}
/**
* RJCTODO
* @return
*/
long getDsQueueSize() {
return dsQueueSize;
}
/**
* RJCTODO
* @return
*/
long getRunningListSize() {
return runningListSize;
}
}
}