Complete first version of multi-stage ingest

Richard Cordovano 2014-11-03 23:56:12 -05:00
parent f598fd6614
commit 3b9b10ea2a
9 changed files with 442 additions and 439 deletions

View File

@ -67,7 +67,7 @@ public class DataSourceIngestModuleProgress {
* @param message Message to display
*/
public void progress(String message) {
this.job.advanceDataSourceIngestProgressBar(message); // RJCTODO: Is this right?
this.job.advanceDataSourceIngestProgressBar(message);
}
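As an editorial aside, here is a minimal sketch of how a data source ingest module's process() method might report progress through this class. Only the progress(String) call above comes from this file; the DataSourceIngestModule interface and its process() signature are assumed from the surrounding Autopsy ingest module API, and the rest of the module class is omitted.

    @Override
    public IngestModule.ProcessResult process(Content dataSource, DataSourceIngestModuleProgress progressBar) {
        // Tell the user which phase of the data source analysis is running.
        progressBar.progress("Examining " + dataSource.getName());
        // ... perform the module's analysis of the data source here ...
        return IngestModule.ProcessResult.OK;
    }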
/**

View File

@ -42,23 +42,35 @@ import org.sleuthkit.datamodel.Content;
*/
final class IngestJob {
private static final Logger logger = Logger.getLogger(IngestJob.class.getName());
private static final IngestTasksScheduler ingestScheduler = IngestTasksScheduler.getInstance();
// These static fields are used for the creation and management of ingest
// jobs in progress.
private static volatile boolean jobCreationIsEnabled;
private static final AtomicLong nextIngestJobId = new AtomicLong(0L);
private static final ConcurrentHashMap<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
// An ingest job may have multiple stages.
/**
* An ingest job may have multiple stages.
*/
private enum Stages {
FIRST, // High priority data source ingest modules plus file ingest modules
SECOND // Low priority data source ingest modules
/**
* High priority data source ingest modules and file ingest modules.
*/
FIRST,
/**
* Lower priority, usually long-running, data source ingest modules.
*/
SECOND
};
// These fields define the ingest job and the work it entails.
private static final Logger logger = Logger.getLogger(IngestJob.class.getName());
private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance();
/**
* These static fields are used for the creation and management of ingest
* jobs in progress.
*/
private static volatile boolean jobCreationIsEnabled;
private static final AtomicLong nextJobId = new AtomicLong(0L);
private static final ConcurrentHashMap<Long, IngestJob> jobsById = new ConcurrentHashMap<>();
/**
* These fields define the ingest job and the work it entails.
*/
private final long id;
private final Content dataSource;
private final boolean processUnallocatedSpace;
@ -68,10 +80,12 @@ final class IngestJob {
private DataSourceIngestPipeline secondStageDataSourceIngestPipeline;
private final LinkedBlockingQueue<FileIngestPipeline> fileIngestPipelines;
// These fields are used to update the ingest progress UI components. The
// filesInProgress collection contains the names of the files that are in
// the file ingest pipelines and the two file counter fields are used to
// update the file ingest progress bar.
/**
* These fields are used to update ingest progress UI components for the
* job. The filesInProgress collection contains the names of the files that
* are in the file ingest pipelines and the two file counter fields are used
* to update the file ingest progress bar.
*/
private ProgressHandle dataSourceIngestProgress;
private final Object dataSourceIngestProgressLock;
private final List<String> filesInProgress;
@ -80,12 +94,16 @@ final class IngestJob {
private ProgressHandle fileIngestProgress;
private final Object fileIngestProgressLock;
// These fields support cancellation of either the currently running data
// source ingest module or the entire ingest job.
/**
* These fields support cancellation of either the currently running data
* source ingest module or the entire ingest job.
*/
private volatile boolean currentDataSourceIngestModuleCancelled;
private volatile boolean cancelled;
// This field is used for generating ingest job diagnostic data.
/**
* This field is used for generating ingest job diagnostic data.
*/
private final long startTime;
/**
@ -98,29 +116,27 @@ final class IngestJob {
}
/**
* Creates an ingest job for a data source.
* Starts an ingest job for a data source.
*
* @param dataSource The data source to ingest.
* @param ingestModuleTemplates The ingest module templates to use to create
* the ingest pipelines for the job.
* @param processUnallocatedSpace Whether or not the job should include
* processing of unallocated space.
*
* @return A collection of ingest module start up errors, empty on success.
*
* @throws InterruptedException
*/
static List<IngestModuleError> startJob(Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) throws InterruptedException {
static List<IngestModuleError> startJob(Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) {
List<IngestModuleError> errors = new ArrayList<>();
if (IngestJob.jobCreationIsEnabled) {
long jobId = nextIngestJobId.incrementAndGet();
long jobId = nextJobId.incrementAndGet();
IngestJob job = new IngestJob(jobId, dataSource, processUnallocatedSpace);
IngestJob.jobsById.put(jobId, job);
errors = job.start(ingestModuleTemplates);
if (errors.isEmpty() && (job.hasDataSourceIngestPipeline() || job.hasFileIngestPipeline())) { // RJCTODO: What about 2nd stage only?
ingestJobsById.put(jobId, job);
if (errors.isEmpty() && job.hasIngestPipeline()) {
IngestManager.getInstance().fireIngestJobStarted(jobId);
IngestJob.ingestScheduler.scheduleIngestTasks(job);
logger.log(Level.INFO, "Ingest job {0} started", jobId);
IngestJob.logger.log(Level.INFO, "Ingest job {0} started", jobId);
} else {
IngestJob.jobsById.remove(jobId);
}
}
return errors;
@ -132,27 +148,27 @@ final class IngestJob {
* @return True or false.
*/
static boolean ingestJobsAreRunning() {
return !ingestJobsById.isEmpty();
return !jobsById.isEmpty();
}
/**
* RJCTODO
* Gets snapshots of the state of all running ingest jobs.
*
* @return
* @return A list of ingest job state snapshots.
*/
static List<IngestJobSnapshot> getJobSnapshots() {
List<IngestJobSnapshot> snapShots = new ArrayList<>();
for (IngestJob job : IngestJob.ingestJobsById.values()) {
for (IngestJob job : IngestJob.jobsById.values()) {
snapShots.add(job.getIngestJobSnapshot());
}
return snapShots;
}
/**
* RJCTODO
* Cancels all running ingest jobs.
*/
static void cancelAllJobs() {
for (IngestJob job : ingestJobsById.values()) {
for (IngestJob job : jobsById.values()) {
job.cancel();
}
}
@ -165,7 +181,7 @@ final class IngestJob {
* @param processUnallocatedSpace Whether or not unallocated space should be
* processed during the ingest job.
*/
IngestJob(long id, Content dataSource, boolean processUnallocatedSpace) {
private IngestJob(long id, Content dataSource, boolean processUnallocatedSpace) {
this.id = id;
this.dataSource = dataSource;
this.processUnallocatedSpace = processUnallocatedSpace;
@ -178,9 +194,9 @@ final class IngestJob {
}
/**
* Gets the identifier assigned to the ingest job.
* Gets the identifier assigned to this job.
*
* @return The ingest job identifier.
* @return The job identifier.
*/
long getId() {
return this.id;
@ -206,51 +222,12 @@ final class IngestJob {
}
/**
* Starts up the ingest pipelines and ingest progress bars.
*
* @return A collection of ingest module startup errors, empty on success.
* @throws InterruptedException
*/
List<IngestModuleError> start(List<IngestModuleTemplate> ingestModuleTemplates) throws InterruptedException {
this.createIngestPipelines(ingestModuleTemplates);
List<IngestModuleError> errors = startUpIngestPipelines();
if (errors.isEmpty()) {
if (!this.dataSourceIngestPipeline.isEmpty()) {
this.startDataSourceIngestProgressBar();
}
if (!this.fileIngestPipelines.peek().isEmpty()) {
this.startFileIngestProgressBar();
}
}
return errors;
}
/**
* Checks to see if this job has a data source ingest pipeline.
*
* @return True or false.
*/
boolean hasDataSourceIngestPipeline() {
return (this.dataSourceIngestPipeline.isEmpty() == false);
}
/**
* Checks to see if the job has a file ingest pipeline.
*
* @return True or false.
*/
boolean hasFileIngestPipeline() {
return (this.fileIngestPipelines.peek().isEmpty() == false);
}
/**
* Passes the data source for this job through the data source ingest
* Passes the data source for this job through a data source ingest
* pipeline.
*
* @param task A data source ingest task wrapping the data source.
* @throws InterruptedException
*/
void process(DataSourceIngestTask task) throws InterruptedException {
void process(DataSourceIngestTask task) {
try {
if (!this.isCancelled() && !this.dataSourceIngestPipeline.isEmpty()) {
List<IngestModuleError> errors = new ArrayList<>();
@ -268,9 +245,12 @@ final class IngestJob {
}
}
} finally {
// No matter what happens, let the ingest scheduler know that this
// task is completed.
IngestJob.ingestScheduler.notifyTaskCompleted(task);
// No matter what happens, let the task scheduler know that this
// task is completed and check for job completion.
IngestJob.taskScheduler.notifyTaskCompleted(task);
if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) {
this.handleTasksCompleted();
}
}
}
@ -284,11 +264,15 @@ final class IngestJob {
void process(FileIngestTask task) throws InterruptedException {
try {
if (!this.isCancelled()) {
// Get a file ingest pipeline not currently in use by another
// file ingest thread.
/**
* Get a file ingest pipeline not currently in use by another
* file ingest thread.
*/
FileIngestPipeline pipeline = this.fileIngestPipelines.take();
if (!pipeline.isEmpty()) {
// Get the file to process.
/**
* Get the file to process.
*/
AbstractFile file = task.getFile();
// Update the file ingest progress bar.
@ -328,46 +312,28 @@ final class IngestJob {
this.fileIngestPipelines.put(pipeline);
}
} finally {
// No matter what happens, let the ingest scheduler know that this
// task is completed.
IngestJob.ingestScheduler.notifyTaskCompleted(task);
}
}
/**
*
* @param file
*/
void addFiles(List<AbstractFile> files) {
// RJCTODO: Add handling of lack of support for file ingest in second stage
for (AbstractFile file : files) {
try {
// RJCTODO: Deal with possible IllegalStateException; maybe don't need logging here
IngestJob.ingestScheduler.scheduleFileIngestTask(this, file);
} catch (InterruptedException ex) {
// Handle the unexpected interrupt here rather than make ingest
// module writers responsible for writing this exception handler.
// The interrupt flag of the thread is reset for detection by
// the thread task code.
Thread.currentThread().interrupt();
IngestJob.logger.log(Level.SEVERE, "File task scheduling unexpectedly interrupted", ex); //NON-NLS
// No matter what happens, let the task scheduler know that this
// task is completed and check for job completion.
IngestJob.taskScheduler.notifyTaskCompleted(task);
if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) {
this.handleTasksCompleted();
}
}
}
/**
* Allows the ingest tasks scheduler to notify this ingest job whenever all
* the scheduled tasks for this ingest job have been completed.
* Adds more files to an ingest job, i.e., derived or carved files. Not
* currently supported for the second stage of the job.
*
* @param files A list of files to add.
*/
void notifyTasksCompleted() {
switch (this.stage) {
case FIRST:
this.finishFirstStage();
this.startSecondStage();
break;
case SECOND:
this.finish();
break;
void addFiles(List<AbstractFile> files) {
if (IngestJob.Stages.FIRST == this.stage) {
for (AbstractFile file : files) {
IngestJob.taskScheduler.scheduleFileIngestTask(this, file);
}
} else {
IngestJob.logger.log(Level.SEVERE, "Adding files during second stage not supported"); //NON-NLS
}
}
@ -432,7 +398,6 @@ final class IngestJob {
}
}
// RJCTODO: Is this right?
/**
* Updates the data source ingest progress bar display name.
*
@ -527,8 +492,10 @@ final class IngestJob {
this.cancelled = true;
// Tell the ingest scheduler to cancel all pending tasks.
IngestJob.ingestScheduler.cancelPendingTasksForIngestJob(this);
/**
* Tell the task scheduler to cancel all pending tasks.
*/
IngestJob.taskScheduler.cancelPendingTasksForIngestJob(this);
}
/**
@ -541,25 +508,13 @@ final class IngestJob {
return this.cancelled;
}
/**
* Get some basic performance statistics on this job.
*
* @return An ingest job statistics object.
*/
IngestJobSnapshot getIngestJobSnapshot() {
return new IngestJobSnapshot();
}
/**
* Creates the file and data source ingest pipelines.
*
* @param ingestModuleTemplates Ingest module templates to use to populate
* the pipelines.
* @throws InterruptedException
*/
private void createIngestPipelines(List<IngestModuleTemplate> ingestModuleTemplates) throws InterruptedException {
// RJCTODO: Improve variable names!
private void createIngestPipelines(List<IngestModuleTemplate> ingestModuleTemplates) {
// Make mappings of ingest module factory class names to templates.
Map<String, IngestModuleTemplate> dataSourceModuleTemplates = new HashMap<>();
Map<String, IngestModuleTemplate> fileModuleTemplates = new HashMap<>();
@ -595,9 +550,18 @@ final class IngestJob {
this.dataSourceIngestPipeline = firstStageDataSourceIngestPipeline;
// Construct the file ingest pipelines.
int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads();
for (int i = 0; i < numberOfFileIngestThreads; ++i) {
this.fileIngestPipelines.put(new FileIngestPipeline(this, fileIngestModuleTemplates));
try {
int numberOfFileIngestThreads = IngestManager.getInstance().getNumberOfFileIngestThreads();
for (int i = 0; i < numberOfFileIngestThreads; ++i) {
this.fileIngestPipelines.put(new FileIngestPipeline(this, fileIngestModuleTemplates));
}
} catch (InterruptedException ex) {
/**
* The current thread was interrupted while blocked on a full queue.
* Blocking should never happen here, but reset the interrupted flag
* rather than just swallowing the exception.
*/
Thread.currentThread().interrupt();
}
}
@ -623,14 +587,121 @@ final class IngestJob {
return templates;
}
/**
* Starts up the ingest pipelines and ingest progress bars.
*
* @return A collection of ingest module startup errors, empty on success.
*/
private List<IngestModuleError> start(List<IngestModuleTemplate> ingestModuleTemplates) {
this.createIngestPipelines(ingestModuleTemplates);
List<IngestModuleError> errors = startUpIngestPipelines();
if (errors.isEmpty()) {
if (this.hasFirstStageDataSourceIngestPipeline() || this.hasFileIngestPipeline()) {
// There is at least one first stage pipeline.
this.startFirstStage();
} else if (this.hasSecondStageDataSourceIngestPipeline()) {
// There is no first stage pipeline, but there is a second stage
// ingest pipeline.
this.startSecondStage();
}
}
return errors;
}
/**
* Starts the first stage of the job.
*/
private void startFirstStage() {
this.stage = IngestJob.Stages.FIRST;
/**
* Start one or both of the first stage progress bars.
*/
if (this.hasFirstStageDataSourceIngestPipeline()) {
this.startDataSourceIngestProgressBar();
}
if (this.hasFileIngestPipeline()) {
this.startFileIngestProgressBar();
}
/**
* Schedule the first stage tasks.
*/
if (this.hasFirstStageDataSourceIngestPipeline() && this.hasFileIngestPipeline()) {
IngestJob.taskScheduler.scheduleIngestTasks(this);
} else if (this.hasFirstStageDataSourceIngestPipeline()) {
IngestJob.taskScheduler.scheduleDataSourceIngestTask(this);
} else {
IngestJob.taskScheduler.scheduleFileIngestTasks(this);
/**
* No data source ingest task has been scheduled for this stage, and
* it is possible, if unlikely, that no file ingest tasks were
* actually scheduled since there are files that get filtered out by
* the tasks scheduler. In this special case, an ingest thread will
* never get to make the following check for this stage of the job.
*/
if (IngestJob.taskScheduler.tasksForJobAreCompleted(this)) {
this.handleTasksCompleted();
}
}
}
/**
* Starts the second stage of the ingest job.
*/
private void startSecondStage() {
this.stage = IngestJob.Stages.SECOND;
this.startDataSourceIngestProgressBar();
this.dataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline;
IngestJob.taskScheduler.scheduleDataSourceIngestTask(this);
}
/**
* Checks to see if this job has at least one ingest pipeline.
*
* @return True or false.
*/
private boolean hasIngestPipeline() {
return this.hasFirstStageDataSourceIngestPipeline()
|| this.hasFileIngestPipeline()
|| this.hasSecondStageDataSourceIngestPipeline();
}
/**
* Checks to see if this job has a first stage data source ingest pipeline.
*
* @return True or false.
*/
private boolean hasFirstStageDataSourceIngestPipeline() {
return (this.firstStageDataSourceIngestPipeline.isEmpty() == false);
}
/**
* Checks to see if this job has a second stage data source ingest pipeline.
*
* @return True or false.
*/
private boolean hasSecondStageDataSourceIngestPipeline() {
return (this.secondStageDataSourceIngestPipeline.isEmpty() == false);
}
/**
* Checks to see if the job has a file ingest pipeline.
*
* @return True or false.
*/
private boolean hasFileIngestPipeline() {
return (this.fileIngestPipelines.peek().isEmpty() == false);
}
/**
* Starts up each of the file and data source ingest modules to collect
* possible errors.
*
* @return A collection of ingest module startup errors, empty on success.
* @throws InterruptedException
*/
private List<IngestModuleError> startUpIngestPipelines() throws InterruptedException {
private List<IngestModuleError> startUpIngestPipelines() {
List<IngestModuleError> errors = new ArrayList<>();
// Start up the first stage data source ingest pipeline.
@ -725,8 +796,23 @@ final class IngestJob {
}
/**
* Shuts down the file ingest pipelines and current progress bars, if any,
* for this job.
* Handles when all ingest tasks for this job are completed by finishing the
* current stage and possibly starting the next stage.
*/
private void handleTasksCompleted() {
switch (this.stage) {
case FIRST:
this.finishFirstStage();
break;
case SECOND:
this.finish();
break;
}
}
/**
* Shuts down the first stage ingest pipelines and progress bars and starts
* the second stage, if appropriate.
*/
private void finishFirstStage() {
// Shut down the file ingest pipelines. Note that no shut down is
@ -758,22 +844,12 @@ final class IngestJob {
this.fileIngestProgress = null;
}
}
}
/**
* RJCTODO
*/
private void startSecondStage() {
this.stage = IngestJob.Stages.SECOND;
if (!this.cancelled && !this.secondStageDataSourceIngestPipeline.isEmpty()) {
this.dataSourceIngestPipeline = this.secondStageDataSourceIngestPipeline;
this.startDataSourceIngestProgressBar();
try {
IngestJob.ingestScheduler.scheduleDataSourceIngestTask(this);
} catch (InterruptedException ex) {
// RJCTODO:
this.finish();
}
/**
* Start the second stage, if appropriate.
*/
if (!this.cancelled && this.hasSecondStageDataSourceIngestPipeline()) {
this.startSecondStage();
} else {
this.finish();
}
@ -792,7 +868,7 @@ final class IngestJob {
}
}
IngestJob.ingestJobsById.remove(this.id);
IngestJob.jobsById.remove(this.id);
if (!this.isCancelled()) {
logger.log(Level.INFO, "Ingest job {0} completed", this.id);
IngestManager.getInstance().fireIngestJobCompleted(this.id);
@ -821,6 +897,15 @@ final class IngestJob {
this.currentDataSourceIngestModuleCancelled = true;
}
/**
     * Gets a snapshot of this job's state and performance.
*
* @return An ingest job statistics object.
*/
private IngestJobSnapshot getIngestJobSnapshot() {
return new IngestJobSnapshot();
}
/**
* Stores basic diagnostic statistics for an ingest job.
*/
@ -847,23 +932,13 @@ final class IngestJob {
this.estimatedFilesToProcess = IngestJob.this.estimatedFilesToProcess;
this.snapShotTime = new Date().getTime();
}
this.tasksSnapshot = IngestJob.ingestScheduler.getTasksSnapshotForJob(this.jobId);
this.tasksSnapshot = IngestJob.taskScheduler.getTasksSnapshotForJob(this.jobId);
}
/**
* RJCTODO
*
* @return
*/
long getJobId() {
return this.jobId;
}
/**
* RJCTODO
*
* @return
*/
String getDataSource() {
return dataSource;
}
@ -916,47 +991,22 @@ final class IngestJob {
return estimatedFilesToProcess;
}
/**
* RJCTODO
*
* @return
*/
long getRootQueueSize() {
return this.tasksSnapshot.getRootQueueSize();
}
/**
* RJCTODO
*
* @return
*/
long getDirQueueSize() {
return this.tasksSnapshot.getDirQueueSize();
return this.tasksSnapshot.getDirectoryTasksQueueSize();
}
/**
* RJCTODO
*
* @return
*/
long getFileQueueSize() {
return this.tasksSnapshot.getFileQueueSize();
}
/**
* RJCTODO
*
* @return
*/
long getDsQueueSize() {
return this.tasksSnapshot.getDsQueueSize();
}
/**
* RJCTODO
*
* @return
*/
long getRunningListSize() {
return this.tasksSnapshot.getRunningListSize();
}
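A brief sketch of how a caller in the same package, such as the ingest manager's job-starting task, might drive the static interface above; dataSources, moduleTemplates, and processUnallocatedSpace are assumed to be supplied by that caller, and error reporting is only indicated by a comment.

    for (Content dataSource : dataSources) {
        // Start a job for the data source; an empty error list means every
        // ingest module in the job's pipelines started up successfully.
        List<IngestModuleError> errors = IngestJob.startJob(dataSource, moduleTemplates, processUnallocatedSpace);
        if (!errors.isEmpty()) {
            // Report the module startup errors to the user and move on to the
            // next data source.
        }
    }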

View File

@ -19,7 +19,6 @@
package org.sleuthkit.autopsy.ingest;
import java.util.List;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content;
@ -29,7 +28,6 @@ import org.sleuthkit.datamodel.Content;
*/
public final class IngestJobContext {
private static final Logger logger = Logger.getLogger(IngestJobContext.class.getName());
private final IngestJob ingestJob;
IngestJobContext(IngestJob ingestJob) {
@ -101,25 +99,25 @@ public final class IngestJobContext {
}
/**
* Adds one or more files to the files to be passed through the file ingest
* pipeline of the ingest job associated with this context.
* Adds one or more files, i.e., extracted or carved files, to the ingest
* job associated with this context.
*
* @param files The files to be processed by the file ingest pipeline.
* @param files The files to be added.
* @deprecated use addFilesToJob() instead
*/
@Deprecated
public void scheduleFiles(List<AbstractFile> files) {
this.addFilesToJob(files);
}
/**
* Adds one or more files to the files to be passed through the file ingest
* pipeline of the ingest job associated with this context.
* Adds one or more files, i.e., extracted or carved files, to the ingest
* job associated with this context.
*
* @param files The files to be processed by the file ingest pipeline.
* @param files The files to be added.
*/
public void addFilesToJob(List<AbstractFile> files) {
this.ingestJob.addFiles(files);
}
}
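As a hedged illustration of how a module writer might use this context: inside a file ingest module's process() method, newly carved or derived files could be handed back to the job roughly as follows. The context field is assumed to have been saved from the module's startUp() call, and carveFilesFrom() is a hypothetical helper standing in for whatever extraction the module performs.

    // Hand the new files back to the ingest job so they are scheduled for the
    // file ingest pipelines (supported only during the first stage of the job).
    List<AbstractFile> derivedFiles = carveFilesFrom(file); // hypothetical helper
    if (!derivedFiles.isEmpty()) {
        context.addFilesToJob(derivedFiles);
    }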

View File

@ -134,7 +134,7 @@ public class IngestManager {
*/
private void startDataSourceIngestTask() {
long threadId = nextThreadId.incrementAndGet();
dataSourceIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
dataSourceIngestThreadPool.submit(new ExecuteIngestTasksRunnable(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
@ -144,7 +144,7 @@ public class IngestManager {
*/
private void startFileIngestTask() {
long threadId = nextThreadId.incrementAndGet();
fileIngestThreadPool.submit(new ExecuteIngestTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
fileIngestThreadPool.submit(new ExecuteIngestTasksRunnable(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
@ -154,7 +154,7 @@ public class IngestManager {
}
long taskId = nextThreadId.incrementAndGet();
Future<Void> task = startIngestJobsThreadPool.submit(new StartIngestJobsTask(taskId, dataSources, moduleTemplates, processUnallocatedSpace));
Future<Void> task = startIngestJobsThreadPool.submit(new StartIngestJobsCallable(taskId, dataSources, moduleTemplates, processUnallocatedSpace));
startIngestJobsTasks.put(taskId, task);
}
@ -200,11 +200,11 @@ public class IngestManager {
return IngestJob.ingestJobsAreRunning();
}
/**
* Called each time a module in a data source pipeline starts
*
* @param task
* @param ingestModuleDisplayName
* @param ingestModuleDisplayName
*/
void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource()));
@ -212,20 +212,22 @@ public class IngestManager {
/**
* Called each time a module in a file ingest pipeline starts
*
* @param task
* @param ingestModuleDisplayName
* @param ingestModuleDisplayName
*/
void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
}
/**
* Called each time a data source ingest task completes
* @param task
*
* @param task
*/
void setIngestTaskProgressCompleted(DataSourceIngestTask task) {
ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId()));
@ -233,7 +235,8 @@ public class IngestManager {
/**
* Called when a file ingest pipeline is complete for a given file
* @param task
*
* @param task
*/
void setIngestTaskProgressCompleted(FileIngestTask task) {
IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
@ -242,19 +245,21 @@ public class IngestManager {
synchronized (processedFilesSnapshotLock) {
processedFilesSnapshot.incrementProcessedFilesCount();
}
incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
}
/**
* Internal method to update the times associated with each module.
* Internal method to update the times associated with each module.
*
* @param moduleName
* @param duration
* @param duration
*/
private void incrementModuleRunTime(String moduleName, Long duration) {
if (moduleName.equals("IDLE"))
if (moduleName.equals("IDLE")) {
return;
}
synchronized (ingestModuleRunTimes) {
Long prevTimeL = ingestModuleRunTimes.get(moduleName);
long prevTime = 0;
@ -262,12 +267,13 @@ public class IngestManager {
prevTime = prevTimeL;
}
prevTime += duration;
ingestModuleRunTimes.put(moduleName, prevTime);
ingestModuleRunTimes.put(moduleName, prevTime);
}
}
/**
* Return the list of run times for each module
*
* @return Map of module name to run time (in milliseconds)
*/
Map<String, Long> getModuleRunTimes() {
@ -279,13 +285,13 @@ public class IngestManager {
/**
* Get the stats on current state of each thread
* @return
*
* @return
*/
List<IngestThreadActivitySnapshot> getIngestThreadActivitySnapshots() {
return new ArrayList<>(ingestThreadActivitySnapshots.values());
}
public void cancelAllIngestJobs() {
// Stop creating new ingest jobs.
for (Future<Void> handle : startIngestJobsTasks.values()) {
@ -418,7 +424,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id.
*/
void fireIngestJobStarted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null));
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null));
}
/**
@ -427,7 +433,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id.
*/
void fireIngestJobCompleted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null));
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null));
}
/**
@ -436,7 +442,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id.
*/
void fireIngestJobCancelled(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null));
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null));
}
/**
@ -445,7 +451,7 @@ public class IngestManager {
* @param file The file that is completed.
*/
void fireFileIngestDone(AbstractFile file) {
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file));
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file));
}
/**
@ -454,7 +460,7 @@ public class IngestManager {
* @param moduleDataEvent A ModuleDataEvent with the details of the posting.
*/
void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null));
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null));
}
/**
@ -465,7 +471,7 @@ public class IngestManager {
* content.
*/
void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
fireIngestEventsThreadPool.submit(new FireIngestEventTask(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null));
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null));
}
/**
@ -509,7 +515,7 @@ public class IngestManager {
/**
* Creates ingest jobs.
*/
private class StartIngestJobsTask implements Callable<Void> {
private final class StartIngestJobsCallable implements Callable<Void> {
private final long threadId;
private final List<Content> dataSources;
@ -517,7 +523,7 @@ public class IngestManager {
private final boolean processUnallocatedSpace;
private ProgressHandle progress;
StartIngestJobsTask(long threadId, List<Content> dataSources, List<IngestModuleTemplate> moduleTemplates, boolean processUnallocatedSpace) {
StartIngestJobsCallable(long threadId, List<Content> dataSources, List<IngestModuleTemplate> moduleTemplates, boolean processUnallocatedSpace) {
this.threadId = threadId;
this.dataSources = dataSources;
this.moduleTemplates = moduleTemplates;
@ -587,9 +593,6 @@ public class IngestManager {
break;
}
}
} catch (InterruptedException ex) {
// Reset interrupted status.
Thread.currentThread().interrupt();
} catch (Exception ex) {
logger.log(Level.SEVERE, "Failed to create ingest job", ex); //NON-NLS
} finally {
@ -603,12 +606,12 @@ public class IngestManager {
/**
* A consumer for an ingest task queue.
*/
private class ExecuteIngestTasksTask implements Runnable {
private final class ExecuteIngestTasksRunnable implements Runnable {
private final long threadId;
private final IngestTaskQueue tasks;
ExecuteIngestTasksTask(long threadId, IngestTaskQueue tasks) {
ExecuteIngestTasksRunnable(long threadId, IngestTaskQueue tasks) {
this.threadId = threadId;
this.tasks = tasks;
}
@ -632,7 +635,7 @@ public class IngestManager {
/**
* Fires ingest events to ingest manager property change listeners.
*/
private static class FireIngestEventTask implements Runnable {
private static final class FireIngestEventRunnable implements Runnable {
private final PropertyChangeSupport publisher;
private final IngestJobEvent jobEvent;
@ -640,7 +643,7 @@ public class IngestManager {
private final Object oldValue;
private final Object newValue;
FireIngestEventTask(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) {
FireIngestEventRunnable(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) {
this.publisher = publisher;
this.jobEvent = event;
this.moduleEvent = null;
@ -648,7 +651,7 @@ public class IngestManager {
this.newValue = newValue;
}
FireIngestEventTask(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) {
FireIngestEventRunnable(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) {
this.publisher = publisher;
this.jobEvent = null;
this.moduleEvent = event;
@ -695,9 +698,9 @@ public class IngestManager {
startTime = new Date();
this.activity = activity;
this.dataSourceName = dataSource.getName();
this.fileName = "";
this.fileName = "";
}
// file ingest thread
IngestThreadActivitySnapshot(long threadId, long jobId, String activity, Content dataSource, AbstractFile file) {
this.threadId = threadId;
@ -711,7 +714,7 @@ public class IngestManager {
long getJobId() {
return jobId;
}
long getThreadId() {
return threadId;
}
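The FireIngestEventRunnable pattern above decouples event delivery from the ingest threads: the thread that finishes a task only submits a small runnable, and listeners run later on a dedicated event-firing thread. A self-contained, JDK-only sketch of that pattern follows; all names are hypothetical and it is not code from this commit.

    import java.beans.PropertyChangeEvent;
    import java.beans.PropertyChangeListener;
    import java.beans.PropertyChangeSupport;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncEventPublisherDemo {

        private final PropertyChangeSupport publisher = new PropertyChangeSupport(this);
        private final ExecutorService fireEventsThreadPool = Executors.newSingleThreadExecutor();

        void addListener(PropertyChangeListener listener) {
            publisher.addPropertyChangeListener(listener);
        }

        // The calling thread only submits a small runnable; listeners run later
        // on the event-firing thread, so a slow listener cannot stall the caller.
        void fireJobStarted(final long jobId) {
            fireEventsThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    publisher.firePropertyChange("STARTED", jobId, null);
                }
            });
        }

        public static void main(String[] args) {
            AsyncEventPublisherDemo demo = new AsyncEventPublisherDemo();
            demo.addListener(new PropertyChangeListener() {
                @Override
                public void propertyChange(PropertyChangeEvent evt) {
                    System.out.println(evt.getPropertyName() + " for job " + evt.getOldValue());
                }
            });
            demo.fireJobStarted(42L);
            demo.fireEventsThreadPool.shutdown();
        }
    }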

View File

@ -38,6 +38,7 @@ import org.sleuthkit.autopsy.modules.fileextmismatch.FileExtMismatchDetectorModu
import org.sleuthkit.autopsy.modules.filetypeid.FileTypeIdModuleFactory;
import org.sleuthkit.autopsy.modules.hashdatabase.HashLookupModuleFactory;
import org.sleuthkit.autopsy.modules.interestingitems.InterestingItemsIngestModuleFactory;
import org.sleuthkit.autopsy.modules.photoreccarver.PhotoRecCarverIngestModuleFactory;
import org.sleuthkit.autopsy.modules.sevenzip.ArchiveFileExtractorModuleFactory;
import org.sleuthkit.autopsy.python.JythonModuleLoader;
@ -51,8 +52,6 @@ final class IngestModuleFactoryLoader {
private static final String SAMPLE_EXECUTABLE_MODULE_FACTORY_CLASS_NAME = SampleExecutableIngestModuleFactory.class.getCanonicalName();
private static final ArrayList<String> coreModuleOrdering = new ArrayList<String>() {
{
            // RJCTODO: Find out where to put the photorec carver
// The ordering of the core ingest module factories implemented
// using Java is hard-coded.
add("org.sleuthkit.autopsy.recentactivity.RecentActivityExtracterModuleFactory"); //NON-NLS
@ -66,6 +65,7 @@ final class IngestModuleFactoryLoader {
add(E01VerifierModuleFactory.class.getCanonicalName());
add(AndroidModuleFactory.class.getCanonicalName());
add(InterestingItemsIngestModuleFactory.class.getCanonicalName());
add(PhotoRecCarverIngestModuleFactory.class.getCanonicalName());
}
};

View File

@ -44,9 +44,8 @@ final class IngestPipelinesConfiguration {
private static final String PIPELINE_TYPE_ATTR = "type"; //NON-NLS
private static final String STAGE_ONE_DATA_SOURCE_INGEST_PIPELINE_ELEM = "ImageAnalysisStageOne"; //NON-NLS
private static final String STAGE_TWO_DATA_SOURCE_INGEST_PIPELINE_ELEM = "ImageAnalysisStageTwo"; //NON-NLS
private static final String FILE_INGEST_PIPELINE_TYPE = "FileAnalysis"; //NON-NLS
private static final String FILE_INGEST_PIPELINE_ELEM = "FileAnalysis"; //NON-NLS
private static final String INGEST_MODULE_ELEM = "MODULE"; //NON-NLS
private static final String XML_MODULE_CLASS_NAME_ATTR = "location"; //NON-NLS
private static IngestPipelinesConfiguration instance;
@ -54,10 +53,6 @@ final class IngestPipelinesConfiguration {
private final List<String> fileIngestPipelineConfig = new ArrayList<>();
private final List<String> stageTwoDataSourceIngestPipelineConfig = new ArrayList<>();
// RJCTODO: Bring this code back into use, use it in IngestJob to sort things
// into the now three pipelines. Other NBMs built on top of Autopsy that
// have custom pipeline config files can do a PlatformUtil.extractResourceToUserConfigDir()
// before this is called.
/**
* Gets the ingest pipelines configuration singleton.
*
@ -140,7 +135,6 @@ final class IngestPipelinesConfiguration {
// Parse the pipeline elements to populate the pipeline
// configuration lists.
            // RJCTODO: Should check that each element is unique. Or could try the XSD bit.
List<String> pipelineConfig = null;
for (int pipelineNum = 0; pipelineNum < numPipelines; ++pipelineNum) {
Element pipelineElement = (Element) pipelineElements.item(pipelineNum);
@ -150,7 +144,7 @@ final class IngestPipelinesConfiguration {
case STAGE_ONE_DATA_SOURCE_INGEST_PIPELINE_ELEM:
pipelineConfig = this.stageOneDataSourceIngestPipelineConfig;
break;
case FILE_INGEST_PIPELINE_TYPE:
case FILE_INGEST_PIPELINE_ELEM:
pipelineConfig = this.fileIngestPipelineConfig;
break;
case STAGE_TWO_DATA_SOURCE_INGEST_PIPELINE_ELEM:

View File

@ -47,39 +47,55 @@ final class IngestTasksScheduler {
private static final int FAT_NTFS_FLAGS = TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT12.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT16.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_FAT32.getValue() | TskData.TSK_FS_TYPE_ENUM.TSK_FS_TYPE_NTFS.getValue();
private static IngestTasksScheduler instance;
// Scheduling of data source ingest tasks is accomplished by putting them
// in a FIFO queue to be consumed by the ingest threads. The pending data
// tasks queue is therefore wrapped in a "dispenser" that implements the
// IngestTaskQueue interface and is exposed via a getter method.
/**
* Scheduling of data source ingest tasks is accomplished by putting them in
* a FIFO queue to be consumed by the ingest threads, so the queue is
* wrapped in a "dispenser" that implements the IngestTaskQueue interface
* and is exposed via a getter method.
*/
private final LinkedBlockingQueue<DataSourceIngestTask> pendingDataSourceTasks;
private final DataSourceIngestTaskQueue dataSourceTasksDispenser;
// Scheduling of file ingest tasks is accomplished by "shuffling" them
// through a sequence of internal queues that allows for the interleaving of
// tasks from different ingest jobs based on priority. These scheduling
// queues are:
// 1. root directory tasks (priority queue)
// 2. directory tasks (FIFO queue)
// 3. pending file tasks (LIFO queue)
// Tasks in the pending file tasks queue are ready to be consumed by the
// ingest threads. The pending file tasks queue is therefore wrapped in a
// "dispenser" that implements the IngestTaskQueue interface and is exposed
// via a getter method.
/**
* Scheduling of file ingest tasks is accomplished by "shuffling" them
* through a sequence of internal queues that allows for the interleaving of
* tasks from different ingest jobs based on priority. These scheduling
* queues are:
*
* 1. Root directory tasks (priority queue)
*
* 2. Directory tasks (FIFO queue)
*
* 3. Pending file tasks (LIFO queue).
*
* The pending file tasks queue is LIFO to handle large numbers of files
* extracted from archive files. At least one image has been processed that
* had a folder full of archive files. The queue grew to have thousands of
     * entries as each successive archive file was expanded, so extracted files
     * are now added to the front of the queue so that, in such a scenario, they
     * are processed before the next archive file is expanded.
*
* Tasks in the pending file tasks queue are ready to be consumed by the
* ingest threads, so the queue is wrapped in a "dispenser" that implements
* the IngestTaskQueue interface and is exposed via a getter method.
*/
private final TreeSet<FileIngestTask> rootDirectoryTasks;
private final List<FileIngestTask> directoryTasks;
private final BlockingDeque<FileIngestTask> pendingFileTasks;
private final FileIngestTaskQueue fileTasksDispenser;
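The LIFO behavior described in the comment above can be illustrated with a plain JDK deque; this is an editorial sketch using strings in place of FileIngestTask objects, not code from this commit.

    import java.util.concurrent.BlockingDeque;
    import java.util.concurrent.LinkedBlockingDeque;

    public class LifoPendingFileTasksDemo {

        public static void main(String[] args) throws InterruptedException {
            BlockingDeque<String> pendingFileTasks = new LinkedBlockingDeque<>();

            // Two archives discovered by ordinary directory traversal.
            pendingFileTasks.putFirst("archive1.zip");
            pendingFileTasks.putFirst("archive2.zip");

            // An ingest thread takes the most recently added task first.
            String next = pendingFileTasks.takeFirst();        // archive2.zip

            // Files extracted from that archive go to the front of the queue,
            // so they are processed before archive1.zip is even opened.
            pendingFileTasks.putFirst(next + "/report.doc");
            System.out.println(pendingFileTasks.takeFirst());  // archive2.zip/report.doc
            System.out.println(pendingFileTasks.takeFirst());  // archive1.zip
        }
    }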
// The ingest scheduler is responsible for notifying an ingest jobs whenever
// all of the ingest tasks currently associated with the job are complete.
// To make this possible, the ingest tasks scheduler needs to keep track not
// only of the tasks in its queues, but also of the tasks that have been
// handed out for processing by code running on the ingest manager's ingest
// threads. Therefore all ingest tasks are added to this list and are not
// removed when an ingest thread takes an ingest task. Instead, the ingest
// thread calls back into the scheduler when the task is completed, at
// which time the task will be removed from this list.
private final List<IngestTask> tasksInProgressAndPending;
/**
* The ingest tasks scheduler allows ingest jobs to query it to see if there
* are any tasks in progress for the job. To make this possible, the ingest
* tasks scheduler needs to keep track not only of the tasks in its queues,
* but also of the tasks that have been handed out for processing by the
* ingest threads. Therefore all ingest tasks are added to this list when
* they are created and are not removed when an ingest thread takes an
* ingest task. Instead, the ingest thread calls back into the scheduler
* when the task is completed, at which time the task will be removed from
* this list.
*/
private final List<IngestTask> tasksInProgress;
/**
* Gets the ingest tasks scheduler singleton.
@ -101,7 +117,7 @@ final class IngestTasksScheduler {
this.directoryTasks = new ArrayList<>();
this.pendingFileTasks = new LinkedBlockingDeque<>();
this.fileTasksDispenser = new FileIngestTaskQueue();
this.tasksInProgressAndPending = new ArrayList<>();
this.tasksInProgress = new ArrayList<>();
}
/**
@ -132,42 +148,33 @@ final class IngestTasksScheduler {
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
*/
synchronized void scheduleIngestTasks(IngestJob job) throws InterruptedException {
        // The initial ingest scheduling for a job is an atomic operation.
// Otherwise, the data source task might be completed before the file
// tasks are created, resulting in a potential false positive when this
// task scheduler checks whether or not all the tasks for the job are
// completed.
if (job.hasDataSourceIngestPipeline()) {
scheduleDataSourceIngestTask(job);
}
if (job.hasFileIngestPipeline()) {
scheduleFileIngestTasks(job);
}
synchronized void scheduleIngestTasks(IngestJob job) {
// Scheduling of both a data source ingest task and file ingest tasks
// for a job must be an atomic operation. Otherwise, the data source
// task might be completed before the file tasks are scheduled,
// resulting in a potential false positive when another thread checks
// whether or not all the tasks for the job are completed.
this.scheduleDataSourceIngestTask(job);
this.scheduleFileIngestTasks(job);
}
/**
* Schedules a data source ingest task for an ingest job.
*
* @param job The job for which the tasks are to be scheduled.
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
*/
synchronized void scheduleDataSourceIngestTask(IngestJob job) throws InterruptedException {
// Create a data source ingest task for the data source associated with
// the ingest job and add the task to the pending data source tasks
// queue. Data source tasks are scheduled on a first come, first served
// basis.
synchronized void scheduleDataSourceIngestTask(IngestJob job) {
DataSourceIngestTask task = new DataSourceIngestTask(job);
this.tasksInProgressAndPending.add(task);
this.tasksInProgress.add(task);
try {
// This call should not block because the queue is (theoretically)
// unbounded.
this.pendingDataSourceTasks.put(task);
} catch (InterruptedException ex) {
this.tasksInProgressAndPending.remove(task);
IngestTasksScheduler.logger.log(Level.SEVERE, "Interruption of unexpected block on pending data source tasks queue", ex); //NON-NLS
throw ex;
/**
* The current thread was interrupted while blocked on a full queue.
* Discard the task and reset the interrupted flag.
*/
this.tasksInProgress.remove(task);
Thread.currentThread().interrupt();
}
}
@ -175,18 +182,15 @@ final class IngestTasksScheduler {
* Schedules file ingest tasks for an ingest job.
*
* @param job The job for which the tasks are to be scheduled.
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
*/
synchronized void scheduleFileIngestTasks(IngestJob job) throws InterruptedException {
synchronized void scheduleFileIngestTasks(IngestJob job) {
// Get the top level files for the data source associated with this job
// and add them to the root directories priority queue. The file tasks
// may be interleaved with file tasks from other jobs, based on priority.
// and add them to the root directories priority queue.
List<AbstractFile> topLevelFiles = getTopLevelFiles(job.getDataSource());
for (AbstractFile firstLevelFile : topLevelFiles) {
FileIngestTask task = new FileIngestTask(job, firstLevelFile);
if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
this.tasksInProgressAndPending.add(task);
this.tasksInProgress.add(task);
this.rootDirectoryTasks.add(task);
}
}
@ -197,16 +201,12 @@ final class IngestTasksScheduler {
* Schedules a file ingest task for an ingest job.
*
* @param job The job for which the tasks are to be scheduled.
* @param file The file associated with the task.
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
* @param file The file to be associated with the task.
*/
void scheduleFileIngestTask(IngestJob job, AbstractFile file) throws InterruptedException, IllegalStateException {
synchronized void scheduleFileIngestTask(IngestJob job, AbstractFile file) {
FileIngestTask task = new FileIngestTask(job, file);
if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
// This synchronized method sends the file task directly to the
// pending file tasks queue. This is done to prioritize derived
// and carved files generated by a file ingest task in progress.
this.tasksInProgress.add(task);
addToPendingFileTasksQueue(task);
}
}
@ -217,12 +217,24 @@ final class IngestTasksScheduler {
*
* @param task The completed task.
*/
synchronized void notifyTaskCompleted(IngestTask task) throws InterruptedException {
tasksInProgressAndPending.remove(task);
IngestJob job = task.getIngestJob();
if (this.tasksForJobAreCompleted(job)) {
job.notifyTasksCompleted();
synchronized void notifyTaskCompleted(IngestTask task) {
tasksInProgress.remove(task);
}
/**
* Queries the task scheduler to determine whether or not all current ingest
* tasks for an ingest job are completed.
*
* @param job The job for which the query is to be performed.
* @return True or false.
*/
synchronized boolean tasksForJobAreCompleted(IngestJob job) {
for (IngestTask task : tasksInProgress) {
if (task.getIngestJob().getId() == job.getId()) {
return false;
}
}
return true;
}
/**
@ -234,25 +246,17 @@ final class IngestTasksScheduler {
* @param job The job for which the tasks are to to canceled.
*/
synchronized void cancelPendingTasksForIngestJob(IngestJob job) {
// The scheduling queues are cleared of tasks for the job, and the tasks
// that are removed from the scheduling queues are also removed from the
// tasks in progress list. However, a tasks in progress check for the
// job may still return true since the tasks that have been taken by the
// ingest threads are still in the tasks in progress list.
long jobId = job.getId();
this.removeTasksForJob(this.rootDirectoryTasks, jobId);
this.removeTasksForJob(this.directoryTasks, jobId);
this.removeTasksForJob(this.pendingFileTasks, jobId);
this.removeTasksForJob(this.pendingDataSourceTasks, jobId);
if (this.tasksForJobAreCompleted(job)) {
job.notifyTasksCompleted();
}
}
/**
* A helper that gets the top level files such as file system root
* directories, layout files and virtual directories for a data source. Used
* to create file tasks to put into the root directories queue.
* Gets the top level files such as file system root directories, layout
* files and virtual directories for a data source. Used to create file
* tasks to put into the root directories queue.
*
* @param dataSource The data source.
* @return A list of top level files.
@ -290,14 +294,11 @@ final class IngestTasksScheduler {
}
/**
* A helper that "shuffles" the file task queues to ensure that there is at
* least one task in the pending file ingest tasks queue, as long as there
* are still file ingest tasks to be performed.
*
* @throws InterruptedException if the calling thread is blocked due to a
* full tasks queue and is interrupted.
* "Shuffles" the file task queues to ensure that there is at least one task
* in the pending file ingest tasks queue, as long as there are still file
* ingest tasks to be performed.
*/
synchronized private void shuffleFileTaskQueues() throws InterruptedException, IllegalStateException {
synchronized private void shuffleFileTaskQueues() {
// This is synchronized because it is called both by synchronized
// methods of this ingest scheduler and an unsynchronized method of its
// file tasks "dispenser".
@ -323,16 +324,13 @@ final class IngestTasksScheduler {
}
// Try to add the most recently added directory from the
// directory tasks queue to the pending file tasks queue. Note
// the removal of the task from the tasks in progress list. If
// the task is enqueued, it will be put back in the list by
// the addToPendingFileTasksQueue() helper.
boolean tasksEnqueuedForDirectory = false;
// directory tasks queue to the pending file tasks queue.
FileIngestTask directoryTask = this.directoryTasks.remove(this.directoryTasks.size() - 1);
this.tasksInProgressAndPending.remove(directoryTask);
this.tasksInProgress.remove(directoryTask);
if (shouldEnqueueFileTask(directoryTask)) {
addToPendingFileTasksQueue(directoryTask);
tasksEnqueuedForDirectory = true;
} else {
this.tasksInProgress.remove(directoryTask);
}
// If the directory contains subdirectories or files, try to
@ -349,16 +347,15 @@ final class IngestTasksScheduler {
// addition of the task to the tasks in progress
// list. This is necessary because this is the
// first appearance of this task in the queues.
this.tasksInProgressAndPending.add(childTask);
this.tasksInProgress.add(childTask);
this.directoryTasks.add(childTask);
tasksEnqueuedForDirectory = true;
} else if (shouldEnqueueFileTask(childTask)) {
// Found a file, put the task directly into the
// pending file tasks queue. The new task will
// be put into the tasks in progress list by the
// addToPendingFileTasksQueue() helper.
this.tasksInProgress.add(childTask);
addToPendingFileTasksQueue(childTask);
tasksEnqueuedForDirectory = true;
}
}
}
@ -366,24 +363,13 @@ final class IngestTasksScheduler {
String errorMessage = String.format("An error occurred getting the children of %s", directory.getName()); //NON-NLS
logger.log(Level.SEVERE, errorMessage, ex);
}
// In the case where the directory task is not pushed into the
// the pending file tasks queue and has no children, check to
// see if the job is completed - the directory task might have
// been the last task for the job.
if (!tasksEnqueuedForDirectory) {
IngestJob job = directoryTask.getIngestJob();
if (this.tasksForJobAreCompleted(job)) {
job.notifyTasksCompleted();
}
}
}
}
/**
* A helper method that examines the file associated with a file ingest task
* to determine whether or not the file should be processed and therefore
* the task should be enqueued.
* Examines the file associated with a file ingest task to determine whether
* or not the file should be processed and therefore whether or not the task
* should be enqueued.
*
* @param task The task to be scrutinized.
* @return True or false.
@ -407,9 +393,6 @@ final class IngestTasksScheduler {
// Skip the task if the file is one of a select group of special, large
// NTFS or FAT file system files.
        // The file is in the root directory, has a file name starting with $
        // and containing : (not default attributes), and has a meta address
        // < 32, i.e., some special large NTFS and FAT files.
if (file instanceof org.sleuthkit.datamodel.File) {
final org.sleuthkit.datamodel.File f = (org.sleuthkit.datamodel.File) file;
@ -452,50 +435,28 @@ final class IngestTasksScheduler {
return true;
}
// RJCTODO: Is this still necessary? There is code elsewhere to remove and
// re-add the task to the tasks in progress list.
/**
* A helper method to safely add a file ingest task to the blocking pending
* tasks queue.
* Adds a file ingest task to the blocking pending tasks queue.
*
* @param task
* @throws IllegalStateException
* @param task The task to add.
*/
synchronized private void addToPendingFileTasksQueue(FileIngestTask task) throws IllegalStateException {
tasksInProgressAndPending.add(task);
synchronized private void addToPendingFileTasksQueue(FileIngestTask task) {
try {
// The file is added to the front of the pending files queue because
// at least one image has been processed that had a folder full of
// archive files. The queue grew to have thousands of entries, so
// this (might) help with pushing those files through ingest.
this.pendingFileTasks.addFirst(task);
} catch (IllegalStateException ex) {
tasksInProgressAndPending.remove(task);
Logger.getLogger(IngestTasksScheduler.class.getName()).log(Level.SEVERE, "Pending file tasks queue is full", ex); //NON-NLS
throw ex;
this.pendingFileTasks.putFirst(task);
} catch (InterruptedException ex) {
/**
* The current thread was interrupted while blocked on a full queue.
* Discard the task and reset the interrupted flag.
*/
this.tasksInProgress.remove(task);
Thread.currentThread().interrupt();
}
}
/**
* Determines whether or not all current ingest tasks for an ingest job are
* completed.
*
* @param job The job for which the query is to be performed.
* @return True or false.
*/
private boolean tasksForJobAreCompleted(IngestJob job) {
for (IngestTask task : tasksInProgressAndPending) {
if (task.getIngestJob().getId() == job.getId()) {
return false;
}
}
return true;
}
/**
* A helper that removes all of the ingest tasks associated with an ingest
     * job from a tasks queue. The tasks are removed from the tasks in
     * progress list as well.
     * Removes all of the ingest tasks associated with an ingest job from a
     * tasks queue. The tasks are removed from the tasks in progress list as
     * well.
*
* @param taskQueue The queue from which to remove the tasks.
* @param jobId The id of the job for which the tasks are to be removed.
@ -505,15 +466,14 @@ final class IngestTasksScheduler {
while (iterator.hasNext()) {
IngestTask task = iterator.next();
if (task.getIngestJob().getId() == jobId) {
this.tasksInProgressAndPending.remove(task);
this.tasksInProgress.remove(task);
iterator.remove();
}
}
}
/**
* A helper that counts the number of ingest tasks in a task queue for a
* given job.
* Counts the number of ingest tasks in a task queue for a given job.
*
* @param queue The queue for which to count tasks.
* @param jobId The id of the job for which the tasks are to be counted.
@ -532,10 +492,11 @@ final class IngestTasksScheduler {
}
/**
* RJCTODO
*
* @param jobId
* @return
* Returns a snapshot of the states of the tasks in progress for an ingest
* job.
*
* @param jobId The identifier assigned to the job.
     * @return A snapshot of the ingest tasks data for the job.
*/
synchronized IngestJobTasksSnapshot getTasksSnapshotForJob(long jobId) {
return new IngestJobTasksSnapshot(jobId);
@ -684,9 +645,10 @@ final class IngestTasksScheduler {
}
/**
* A snapshot of ingest tasks data for an ingest job.
* A snapshot of ingest tasks data for an ingest job.
*/
class IngestJobTasksSnapshot {
private final long jobId;
private final long rootQueueSize;
private final long dirQueueSize;
@ -695,8 +657,9 @@ final class IngestTasksScheduler {
private final long runningListSize;
/**
* RJCTODO
* @param jobId
* Constructs a snapshot of ingest tasks data for an ingest job.
*
* @param jobId The identifier associated with the job.
*/
IngestJobTasksSnapshot(long jobId) {
this.jobId = jobId;
@ -704,56 +667,51 @@ final class IngestTasksScheduler {
this.dirQueueSize = countTasksForJob(IngestTasksScheduler.this.directoryTasks, jobId);
this.fileQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingFileTasks, jobId);
this.dsQueueSize = countTasksForJob(IngestTasksScheduler.this.pendingDataSourceTasks, jobId);
this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgressAndPending, jobId) - fileQueueSize - dsQueueSize;
this.runningListSize = countTasksForJob(IngestTasksScheduler.this.tasksInProgress, jobId) - fileQueueSize - dsQueueSize;
}
/**
* RJCTODO
* @return
* Gets the identifier associated with the ingest job for which this
* snapshot was created.
*
* @return The ingest job identifier.
*/
long getJobId() {
return jobId;
}
/**
* RJCTODO
* @return
* Gets the number of file ingest tasks associated with the job that are
* in the root directories queue.
*
* @return The tasks count.
*/
long getRootQueueSize() {
return rootQueueSize;
}
/**
* RJCTODO
* @return
         * Gets the number of file ingest tasks associated with the job that are
         * in the directory tasks queue.
*
* @return The tasks count.
*/
long getDirQueueSize() {
long getDirectoryTasksQueueSize() {
return dirQueueSize;
}
/**
         * Gets the number of file ingest tasks associated with the job that are
         * in the pending file tasks queue.
         *
         * @return The tasks count.
*/
long getFileQueueSize() {
return fileQueueSize;
}
/**
         * Gets the number of data source ingest tasks associated with the job
         * that are in the pending data source tasks queue.
         *
         * @return The tasks count.
*/
long getDsQueueSize() {
return dsQueueSize;
}
/**
         * Gets the number of ingest tasks associated with the job that are
         * currently in the running tasks list.
         *
         * @return The tasks count.
*/
long getRunningListSize() {
return runningListSize;
}
}
}
}
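A hedged, self-contained sketch of the "dispenser" idea described in the comments at the top of this class: the scheduler owns the mutable pending-tasks queue and hands consumer threads only a narrow, take-only view of it. The TaskQueue interface and all names below are hypothetical stand-ins; the real IngestTaskQueue interface is not shown in this diff.

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class TaskDispenserDemo {

        /** Hypothetical stand-in for the IngestTaskQueue interface. */
        interface TaskQueue {
            String getNextTask() throws InterruptedException;
        }

        // The scheduler owns the mutable pending-tasks queue...
        private final BlockingQueue<String> pendingTasks = new LinkedBlockingQueue<>();

        // ...and exposes it to the ingest threads only through a take-only "dispenser".
        private final TaskQueue dispenser = new TaskQueue() {
            @Override
            public String getNextTask() throws InterruptedException {
                return pendingTasks.take();
            }
        };

        TaskQueue getTaskQueue() {
            return dispenser;
        }

        void schedule(String task) throws InterruptedException {
            pendingTasks.put(task);
        }

        public static void main(String[] args) throws InterruptedException {
            TaskDispenserDemo scheduler = new TaskDispenserDemo();
            scheduler.schedule("data source task for job 1");
            // A consumer can only take tasks; it cannot add to or reorder the queue.
            System.out.println(scheduler.getTaskQueue().getNextTask());
        }
    }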

View File

@ -16,6 +16,7 @@ Contains only the core ingest modules that ship with Autopsy -->
<MODULE>org.sleuthkit.autopsy.thunderbirdparser.EmailParserModuleFactory</MODULE>
<MODULE>org.sleuthkit.autopsy.modules.fileextmismatch.FileExtMismatchDetectorModuleFactory</MODULE>
<MODULE>org.sleuthkit.autopsy.modules.interestingitems.InterestingItemsIngestModuleFactory</MODULE>
<MODULE>org.sleuthkit.autopsy.modules.photoreccarver.PhotoRecCarverIngestModuleFactory</MODULE>
</PIPELINE>
<PIPELINE type="ImageAnalysisStageTwo">

View File

@ -25,7 +25,6 @@ import org.sleuthkit.autopsy.ingest.FileIngestModule;
import org.sleuthkit.autopsy.ingest.IngestModuleFactory;
import org.sleuthkit.autopsy.ingest.IngestModuleFactoryAdapter;
import org.sleuthkit.autopsy.ingest.IngestModuleIngestJobSettings;
import org.sleuthkit.autopsy.ingest.IngestModuleIngestJobSettingsPanel;
/**
* A factory for creating instances of file ingest modules that carve unallocated space