Refactored tasks and pipeline context data model to make it clearer and in line with the original goals

This commit is contained in:
Brian Carrier 2014-01-30 23:20:59 -05:00
parent 8a4a8003ca
commit 80e66d2b78
5 changed files with 134 additions and 135 deletions

View File

@ -23,29 +23,45 @@ import java.util.List;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
/** /**
* A task that will be scheduled. Contains the top-level data to analyze and the pipeline. * Represents a data source-level task to schedule and analyze.
* Children of the data will also be scheduled. * Children of the data will also be scheduled.
* *
* @param T type of Ingest Module / Pipeline (file or data source content) associated with this task * @param T type of Ingest Module / Pipeline (file or data source content) associated with this task
*/ */
class ScheduledTask<T extends IngestModuleAbstract> { class DataSourceTask<T extends IngestModuleAbstract> {
private Content input; private Content input;
private List<T> modules; private List<T> modules;
private boolean processUnallocated;
private PipelineContext<T> pipelineContext;
public ScheduledTask(Content input, List<T> modules) { public DataSourceTask(Content input, List<T> modules, boolean processUnallocated) {
this.input = input; this.input = input;
this.modules = modules; this.modules = modules;
this.processUnallocated = processUnallocated;
pipelineContext = new PipelineContext(this);
} }
public Content getContent() { public Content getContent() {
return input; return input;
} }
public PipelineContext<T> getPipelineContext() {
return pipelineContext;
}
public List<T> getModules() { public List<T> getModules() {
return modules; return modules;
} }
/**
* Returns whether unallocated space should be analyzed (and scheduled)
* @return True if pipeline should process unallocated space.
*/
boolean isProcessUnalloc() {
return processUnallocated;
}
// @@@ BC: I think this should go away.
void addModules(List<T> newModules) { void addModules(List<T> newModules) {
for (T newModule : newModules) { for (T newModule : newModules) {
if (!modules.contains(newModule)) { if (!modules.contains(newModule)) {
@ -75,7 +91,7 @@ class ScheduledTask<T extends IngestModuleAbstract> {
if (getClass() != obj.getClass()) { if (getClass() != obj.getClass()) {
return false; return false;
} }
final ScheduledTask<T> other = (ScheduledTask<T>) obj; final DataSourceTask<T> other = (DataSourceTask<T>) obj;
if (this.input != other.input && (this.input == null || !this.input.equals(other.input))) { if (this.input != other.input && (this.input == null || !this.input.equals(other.input))) {
return false; return false;
} }

View File

@ -68,7 +68,7 @@ import org.sleuthkit.datamodel.Content;
} }
Content getContent() { Content getContent() {
return pipelineContext.getScheduledTask().getContent(); return pipelineContext.getDataSourceTask().getContent();
} }
IngestModuleDataSource getModule() { IngestModuleDataSource getModule() {

View File

@ -41,7 +41,7 @@ import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
import org.sleuthkit.autopsy.coreutils.PlatformUtil; import org.sleuthkit.autopsy.coreutils.PlatformUtil;
import org.sleuthkit.autopsy.coreutils.StopWatch; import org.sleuthkit.autopsy.coreutils.StopWatch;
import org.sleuthkit.autopsy.ingest.IngestMessage.MessageType; import org.sleuthkit.autopsy.ingest.IngestMessage.MessageType;
import org.sleuthkit.autopsy.ingest.IngestScheduler.FileScheduler.ProcessTask; import org.sleuthkit.autopsy.ingest.IngestScheduler.FileScheduler.FileTask;
import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
@ -369,10 +369,10 @@ public class IngestManager {
} }
//dequeue //dequeue
// get next data source content and set of modules // get next data source content and set of modules
final ScheduledTask<IngestModuleDataSource> dataSourceTask = dataSourceScheduler.next(); final DataSourceTask<IngestModuleDataSource> dataSourceTask = dataSourceScheduler.next();
// check if each module for this data source content is already running // check if each module for this data source content is already running
for (IngestModuleDataSource taskModule : dataSourceTask.getModules()) { for (IngestModuleDataSource dataSourceTaskModule : dataSourceTask.getModules()) {
boolean alreadyRunning = false; boolean alreadyRunning = false;
for (IngestDataSourceThread worker : dataSourceIngesters) { for (IngestDataSourceThread worker : dataSourceIngesters) {
// ignore threads that are on different data sources // ignore threads that are on different data sources
@ -380,31 +380,32 @@ public class IngestManager {
continue; //check next worker continue; //check next worker
} }
//same data source, check module (by name, not id, since different instances) //same data source, check module (by name, not id, since different instances)
if (worker.getModule().getName().equals(taskModule.getName())) { if (worker.getModule().getName().equals(dataSourceTaskModule.getName())) {
alreadyRunning = true; alreadyRunning = true;
logger.log(Level.INFO, "Data Source Ingester <" + dataSourceTask.getContent() logger.log(Level.INFO, "Data Source Ingester <" + dataSourceTask.getContent()
+ ", " + taskModule.getName() + "> is already running"); + ", " + dataSourceTaskModule.getName() + "> is already running");
break; break;
} }
} }
//checked all workers //checked all workers
if (alreadyRunning == false) { if (alreadyRunning == false) {
logger.log(Level.INFO, "Starting new data source Ingester <" + dataSourceTask.getContent() logger.log(Level.INFO, "Starting new data source Ingester <" + dataSourceTask.getContent()
+ ", " + taskModule.getName() + ">"); + ", " + dataSourceTaskModule.getName() + ">");
//data source modules are now initialized per instance //data source modules are now initialized per instance
IngestModuleInit moduleInit = new IngestModuleInit(); IngestModuleInit moduleInit = new IngestModuleInit();
PipelineContext<IngestModuleDataSource> dataSourcepipelineContext = PipelineContext<IngestModuleDataSource> dataSourcepipelineContext =
new PipelineContext<IngestModuleDataSource>(dataSourceTask, getProcessUnallocSpace()); dataSourceTask.getPipelineContext();
final IngestDataSourceThread newDataSourceWorker = new IngestDataSourceThread(this, final IngestDataSourceThread newDataSourceWorker = new IngestDataSourceThread(this,
dataSourcepipelineContext, dataSourceTask.getContent(), taskModule, moduleInit); dataSourcepipelineContext, dataSourceTask.getContent(), dataSourceTaskModule, moduleInit);
try { try {
newDataSourceWorker.init(); newDataSourceWorker.init();
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "DataSource ingest module failed init(): " + taskModule.getName(), e); logger.log(Level.SEVERE, "DataSource ingest module failed init(): " + dataSourceTaskModule.getName(), e);
allInited = false; allInited = false;
failedModule = taskModule; failedModule = dataSourceTaskModule;
errorMessage = e.getMessage(); errorMessage = e.getMessage();
break; break;
} }
@ -1012,12 +1013,14 @@ public class IngestManager {
int totalEnqueuedFiles = fileScheduler.getFilesEnqueuedEst(); int totalEnqueuedFiles = fileScheduler.getFilesEnqueuedEst();
progress.switchToDeterminate(totalEnqueuedFiles); progress.switchToDeterminate(totalEnqueuedFiles);
int processedFiles = 0; int processedFiles = 0;
//process AbstractFiles queue //process AbstractFiles queue
while (fileScheduler.hasNext()) { while (fileScheduler.hasNext()) {
final ProcessTask fileTask = fileScheduler.next(); final FileTask fileTask = fileScheduler.next();
final PipelineContext<IngestModuleAbstractFile> filepipelineContext = fileTask.context; final DataSourceTask<IngestModuleAbstractFile> dataSourceTask = fileTask.getDataSourceTask();
final ScheduledTask<IngestModuleAbstractFile> fileIngestTask = filepipelineContext.getScheduledTask(); final PipelineContext<IngestModuleAbstractFile> filepipelineContext = dataSourceTask.getPipelineContext();
final AbstractFile fileToProcess = fileTask.file;
final AbstractFile fileToProcess = fileTask.getFile();
//clear return values from modules for last file //clear return values from modules for last file
synchronized (abstractFileModulesRetValues) { synchronized (abstractFileModulesRetValues) {
@ -1026,7 +1029,7 @@ public class IngestManager {
//logger.log(Level.INFO, "IngestManager: Processing: {0}", fileToProcess.getName()); //logger.log(Level.INFO, "IngestManager: Processing: {0}", fileToProcess.getName());
for (IngestModuleAbstractFile module : fileIngestTask.getModules()) { for (IngestModuleAbstractFile module : dataSourceTask.getModules()) {
//process the file with every file module //process the file with every file module
if (isCancelled()) { if (isCancelled()) {
logger.log(Level.INFO, "Terminating file ingest due to cancellation."); logger.log(Level.INFO, "Terminating file ingest due to cancellation.");
@ -1265,29 +1268,25 @@ public class IngestManager {
/* Schedule the data source-level ingest modules for this data source */ /* Schedule the data source-level ingest modules for this data source */
final ScheduledTask<IngestModuleDataSource> dataSourceTask = final DataSourceTask<IngestModuleDataSource> dataSourceTask =
new ScheduledTask<IngestModuleDataSource>(input, dataSourceMods); new DataSourceTask<IngestModuleDataSource>(input, dataSourceMods, getProcessUnallocSpace());
final boolean processUnalloc = getProcessUnallocSpace();
final PipelineContext<IngestModuleDataSource> dataSourcePipelineContext =
new PipelineContext<IngestModuleDataSource>(dataSourceTask, processUnalloc);
logger.log(Level.INFO, "Queing data source ingest task: " + dataSourceTask); logger.log(Level.INFO, "Queing data source ingest task: " + dataSourceTask);
progress.progress("DataSource Ingest" + " " + inputName, processed); progress.progress("DataSource Ingest" + " " + inputName, processed);
final IngestScheduler.DataSourceScheduler dataSourceScheduler = scheduler.getDataSourceScheduler(); final IngestScheduler.DataSourceScheduler dataSourceScheduler = scheduler.getDataSourceScheduler();
dataSourceScheduler.schedule(dataSourcePipelineContext); dataSourceScheduler.schedule(dataSourceTask);
progress.progress("DataSource Ingest" + " " + inputName, ++processed); progress.progress("DataSource Ingest" + " " + inputName, ++processed);
/* Schedule the file-level ingest modules for the children of the data source */ /* Schedule the file-level ingest modules for the children of the data source */
final ScheduledTask<IngestModuleAbstractFile> fTask = final DataSourceTask<IngestModuleAbstractFile> fTask =
new ScheduledTask(input, fileMods); new DataSourceTask(input, fileMods, getProcessUnallocSpace());
final PipelineContext<IngestModuleAbstractFile> filepipelineContext
= new PipelineContext<IngestModuleAbstractFile>(fTask, processUnalloc);
logger.log(Level.INFO, "Queing file ingest task: " + fTask); logger.log(Level.INFO, "Queing file ingest task: " + fTask);
progress.progress("File Ingest" + " " + inputName, processed); progress.progress("File Ingest" + " " + inputName, processed);
final IngestScheduler.FileScheduler fileScheduler = scheduler.getFileScheduler(); final IngestScheduler.FileScheduler fileScheduler = scheduler.getFileScheduler();
fileScheduler.schedule(filepipelineContext); fileScheduler.schedule(fTask);
progress.progress("File Ingest" + " " + inputName, ++processed); progress.progress("File Ingest" + " " + inputName, ++processed);
} //for data sources } //for data sources

View File

@ -32,7 +32,7 @@ import java.util.logging.Logger;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.sleuthkit.autopsy.casemodule.Case; import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.autopsy.ingest.IngestScheduler.FileScheduler.ProcessTask; import org.sleuthkit.autopsy.ingest.IngestScheduler.FileScheduler.FileTask;
import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.ContentVisitor; import org.sleuthkit.datamodel.ContentVisitor;
@ -97,18 +97,18 @@ class IngestScheduler {
* Enqueues files and modules, and sorts the files by priority. Maintains * Enqueues files and modules, and sorts the files by priority. Maintains
* only top level directories in memory (not all children files of the scheduled container content objects) * only top level directories in memory (not all children files of the scheduled container content objects)
* *
* getNext() will return next ProcessTask - tuple of (file, modules) * getNext() will return next FileTask - tuple of (file, modules)
* *
*/ */
static class FileScheduler implements Iterator<FileScheduler.ProcessTask> { static class FileScheduler implements Iterator<FileScheduler.FileTask> {
//root folders enqueued //root folders enqueued
private TreeSet<ProcessTask> rootProcessTasks; private TreeSet<FileTask> rootProcessTasks;
//stack of current dirs to be processed recursively //stack of current dirs to be processed recursively
private List<ProcessTask> curDirProcessTasks; private List<FileTask> curDirProcessTasks;
//list of files being processed in the currently processed directory //list of files being processed in the currently processed directory
private LinkedList<ProcessTask> curFileProcessTasks; //need to add to start and end quickly private LinkedList<FileTask> curFileProcessTasks; //need to add to start and end quickly
//estimated total files to be enqueued for currently scheduled content objects //estimated total files to be enqueued for currently scheduled content objects
private int filesEnqueuedEst; private int filesEnqueuedEst;
@ -135,15 +135,15 @@ class IngestScheduler {
public synchronized String toString() { public synchronized String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("\nRootDirs(sorted), size: ").append(rootProcessTasks.size()); sb.append("\nRootDirs(sorted), size: ").append(rootProcessTasks.size());
for (ProcessTask task : rootProcessTasks) { for (FileTask task : rootProcessTasks) {
sb.append(task.toString()).append(" "); sb.append(task.toString()).append(" ");
} }
sb.append("\nCurDirs(stack), size: ").append(curDirProcessTasks.size()); sb.append("\nCurDirs(stack), size: ").append(curDirProcessTasks.size());
for (ProcessTask task : curDirProcessTasks) { for (FileTask task : curDirProcessTasks) {
sb.append(task.toString()).append(" "); sb.append(task.toString()).append(" ");
} }
sb.append("\nCurFiles, size: ").append(curFileProcessTasks.size()); sb.append("\nCurFiles, size: ").append(curFileProcessTasks.size());
for (ProcessTask task : curFileProcessTasks) { for (FileTask task : curFileProcessTasks) {
sb.append(task.toString()).append(" "); sb.append(task.toString()).append(" ");
} }
return sb.toString(); return sb.toString();
@ -202,28 +202,35 @@ class IngestScheduler {
/** /**
* Task for a specific file to process. More specific than the * Task for a specific file to process. More specific than the
* higher-level ScheduledTask. * higher-level DataSourceTask.
*/ */
static class ProcessTask { static class FileTask {
private final AbstractFile file;
private final DataSourceTask dataSourceTask;
final AbstractFile file; public FileTask(AbstractFile file, DataSourceTask dataSourceTask) {
final PipelineContext<IngestModuleAbstractFile> context;
public ProcessTask(AbstractFile file, PipelineContext<IngestModuleAbstractFile> context) {
this.file = file; this.file = file;
this.context = context; this.dataSourceTask = dataSourceTask;
}
public DataSourceTask getDataSourceTask() {
return dataSourceTask;
}
public AbstractFile getFile() {
return file;
} }
@Override @Override
public String toString() { public String toString() {
try { try {
return "ProcessTask{" + "file=" + file.getId() + ": " return "ProcessTask{" + "file=" + file.getId() + ": "
+ file.getUniquePath() + "}"; // + ", scheduledTask=" + scheduledTask + '}'; + file.getUniquePath() + "}"; // + ", dataSourceTask=" + dataSourceTask + '}';
} catch (TskCoreException ex) { } catch (TskCoreException ex) {
logger.log(Level.SEVERE, "Cound not get unique path of file in queue, ", ex); logger.log(Level.SEVERE, "Cound not get unique path of file in queue, ", ex);
} }
return "ProcessTask{" + "file=" + file.getId() + ": " return "ProcessTask{" + "file=" + file.getId() + ": "
+ file.getName() + ", context=" + context + '}'; + file.getName() + '}';
} }
/** /**
@ -242,12 +249,12 @@ class IngestScheduler {
if (getClass() != obj.getClass()) { if (getClass() != obj.getClass()) {
return false; return false;
} }
final ProcessTask other = (ProcessTask) obj; final FileTask other = (FileTask) obj;
if (this.file != other.file && (this.file == null || !this.file.equals(other.file))) { if (this.file != other.file && (this.file == null || !this.file.equals(other.file))) {
return false; return false;
} }
ScheduledTask<IngestModuleAbstractFile> thisTask = this.context.getScheduledTask(); DataSourceTask<IngestModuleAbstractFile> thisTask = this.getDataSourceTask();
ScheduledTask<IngestModuleAbstractFile> otherTask = other.context.getScheduledTask(); DataSourceTask<IngestModuleAbstractFile> otherTask = other.getDataSourceTask();
if (thisTask != otherTask if (thisTask != otherTask
&& (thisTask == null || !thisTask.equals(otherTask))) { && (thisTask == null || !thisTask.equals(otherTask))) {
@ -256,12 +263,6 @@ class IngestScheduler {
return true; return true;
} }
//constructor that converts from enqueued process task in dir stack
//to enqueued processtask in file queue
ProcessTask(ProcessTask orig, AbstractFile childFile) {
this.file = childFile;;
this.context = orig.context;
}
/** /**
* Create 1 or more ProcessTasks for each root dir in the Content from * Create 1 or more ProcessTasks for each root dir in the Content from
@ -270,8 +271,7 @@ class IngestScheduler {
* @param context the original ingest context * @param context the original ingest context
* @return * @return
*/ */
private static List<ProcessTask> createFromScheduledTask(PipelineContext<IngestModuleAbstractFile> context) { private static List<FileTask> createFromScheduledTask(DataSourceTask<IngestModuleAbstractFile> scheduledTask) {
ScheduledTask<IngestModuleAbstractFile> scheduledTask = context.getScheduledTask();
final Content scheduledContent = scheduledTask.getContent(); final Content scheduledContent = scheduledTask.getContent();
Collection<AbstractFile> rootObjects = scheduledContent.accept(new GetRootDirVisitor()); Collection<AbstractFile> rootObjects = scheduledContent.accept(new GetRootDirVisitor());
List<AbstractFile> firstLevelFiles = new ArrayList<>(); List<AbstractFile> firstLevelFiles = new ArrayList<>();
@ -303,9 +303,9 @@ class IngestScheduler {
} }
} }
List<ProcessTask> processTasks = new ArrayList<>(); List<FileTask> processTasks = new ArrayList<>();
for (AbstractFile firstLevelFile : firstLevelFiles) { for (AbstractFile firstLevelFile : firstLevelFiles) {
ProcessTask newTask = new ProcessTask(firstLevelFile, context); FileTask newTask = new FileTask(firstLevelFile, scheduledTask);
if (shouldEnqueueTask(newTask)) { if (shouldEnqueueTask(newTask)) {
processTasks.add(newTask); processTasks.add(newTask);
} }
@ -320,13 +320,13 @@ class IngestScheduler {
* *
* @param task tasks similar to this one should be removed * @param task tasks similar to this one should be removed
*/ */
private void removeDupTasks(ScheduledTask task) { private void removeDupTasks(DataSourceTask task) {
final Content inputContent = task.getContent(); final Content inputContent = task.getContent();
//remove from root queue //remove from root queue
List<ProcessTask> toRemove = new ArrayList<>(); List<FileTask> toRemove = new ArrayList<>();
for (ProcessTask pt : rootProcessTasks) { for (FileTask pt : rootProcessTasks) {
if (pt.context.getScheduledTask().getContent().equals(inputContent)) { if (pt.getDataSourceTask().getContent().equals(inputContent)) {
toRemove.add(pt); toRemove.add(pt);
} }
} }
@ -334,8 +334,8 @@ class IngestScheduler {
//remove from dir stack //remove from dir stack
toRemove = new ArrayList<>(); toRemove = new ArrayList<>();
for (ProcessTask pt : curDirProcessTasks) { for (FileTask pt : curDirProcessTasks) {
if (pt.context.getScheduledTask().getContent().equals(inputContent)) { if (pt.getDataSourceTask().getContent().equals(inputContent)) {
toRemove.add(pt); toRemove.add(pt);
} }
} }
@ -343,8 +343,8 @@ class IngestScheduler {
//remove from file queue //remove from file queue
toRemove = new ArrayList<>(); toRemove = new ArrayList<>();
for (ProcessTask pt : curFileProcessTasks) { for (FileTask pt : curFileProcessTasks) {
if (pt.context.getScheduledTask().getContent().equals(inputContent)) { if (pt.getDataSourceTask().getContent().equals(inputContent)) {
toRemove.add(pt); toRemove.add(pt);
} }
} }
@ -366,14 +366,14 @@ class IngestScheduler {
* to schedule the parent origin content, with the modules, settings, etc. * to schedule the parent origin content, with the modules, settings, etc.
*/ */
synchronized void schedule(AbstractFile file, PipelineContext originalContext) { synchronized void schedule(AbstractFile file, PipelineContext originalContext) {
ScheduledTask originalTask = originalContext.getScheduledTask(); DataSourceTask originalTask = originalContext.getDataSourceTask();
//skip if task contains no modules //skip if task contains no modules
if (originalTask.getModules().isEmpty()) { if (originalTask.getModules().isEmpty()) {
return; return;
} }
ProcessTask fileTask = new ProcessTask(file, originalContext); FileTask fileTask = new FileTask(file, originalContext.getDataSourceTask());
if (shouldEnqueueTask(fileTask)) { if (shouldEnqueueTask(fileTask)) {
this.curFileProcessTasks.addFirst(fileTask); this.curFileProcessTasks.addFirst(fileTask);
++filesEnqueuedEst; ++filesEnqueuedEst;
@ -388,9 +388,7 @@ class IngestScheduler {
* *
* @param context context to schedule, with scheduled task containing content to process and modules * @param context context to schedule, with scheduled task containing content to process and modules
*/ */
synchronized void schedule(PipelineContext<IngestModuleAbstractFile> context) { synchronized void schedule(DataSourceTask<IngestModuleAbstractFile> task) {
final ScheduledTask task = context.getScheduledTask();
//skip if task contains no modules //skip if task contains no modules
if (task.getModules().isEmpty()) { if (task.getModules().isEmpty()) {
@ -409,7 +407,7 @@ class IngestScheduler {
//remove duplicate scheduled tasks still in queues for this content if enqueued previously //remove duplicate scheduled tasks still in queues for this content if enqueued previously
removeDupTasks(task); removeDupTasks(task);
List<ProcessTask> rootTasks = ProcessTask.createFromScheduledTask(context); List<FileTask> rootTasks = FileTask.createFromScheduledTask(task);
//adds and resorts the tasks //adds and resorts the tasks
this.rootProcessTasks.addAll(rootTasks); this.rootProcessTasks.addAll(rootTasks);
@ -432,13 +430,13 @@ class IngestScheduler {
} }
@Override @Override
public synchronized ProcessTask next() { public synchronized FileTask next() {
if (!hasNext()) { if (!hasNext()) {
throw new IllegalStateException("No next ProcessTask, check hasNext() first!"); throw new IllegalStateException("No next ProcessTask, check hasNext() first!");
} }
//dequeue the last in the list //dequeue the last in the list
final ProcessTask task = curFileProcessTasks.pollLast(); final FileTask task = curFileProcessTasks.pollLast();
filesDequeued++; filesDequeued++;
updateQueues(); updateQueues();
@ -465,13 +463,13 @@ class IngestScheduler {
if (rootProcessTasks.isEmpty()) { if (rootProcessTasks.isEmpty()) {
return; return;
} }
ProcessTask rootTask = this.rootProcessTasks.pollFirst(); FileTask rootTask = this.rootProcessTasks.pollFirst();
curDirProcessTasks.add(rootTask); curDirProcessTasks.add(rootTask);
} }
//pop and push AbstractFile directory children if any //pop and push AbstractFile directory children if any
//add the popped and its leaf children onto cur file list //add the popped and its leaf children onto cur file list
ProcessTask parentTask = curDirProcessTasks.remove(curDirProcessTasks.size() - 1); FileTask parentTask = curDirProcessTasks.remove(curDirProcessTasks.size() - 1);
final AbstractFile parentFile = parentTask.file; final AbstractFile parentFile = parentTask.file;
// add itself to the file list // add itself to the file list
@ -485,7 +483,7 @@ class IngestScheduler {
for (Content c : children) { for (Content c : children) {
if (c instanceof AbstractFile) { if (c instanceof AbstractFile) {
AbstractFile childFile = (AbstractFile) c; AbstractFile childFile = (AbstractFile) c;
ProcessTask childTask = new ProcessTask(parentTask, childFile); FileTask childTask = new FileTask(childFile, parentTask.getDataSourceTask());
if (childFile.hasChildren()) { if (childFile.hasChildren()) {
this.curDirProcessTasks.add(childTask); this.curDirProcessTasks.add(childTask);
@ -519,14 +517,14 @@ class IngestScheduler {
synchronized List<Content> getSourceContent() { synchronized List<Content> getSourceContent() {
final Set<Content> contentSet = new HashSet<Content>(); final Set<Content> contentSet = new HashSet<Content>();
for (ProcessTask task : rootProcessTasks) { for (FileTask task : rootProcessTasks) {
contentSet.add(task.context.getScheduledTask().getContent()); contentSet.add(task.getDataSourceTask().getContent());
} }
for (ProcessTask task : curDirProcessTasks) { for (FileTask task : curDirProcessTasks) {
contentSet.add(task.context.getScheduledTask().getContent()); contentSet.add(task.getDataSourceTask().getContent());
} }
for (ProcessTask task : curFileProcessTasks) { for (FileTask task : curFileProcessTasks) {
contentSet.add(task.context.getScheduledTask().getContent()); contentSet.add(task.getDataSourceTask().getContent());
} }
return new ArrayList<Content>(contentSet); return new ArrayList<Content>(contentSet);
@ -538,24 +536,27 @@ class IngestScheduler {
* @return true if it is in the queue. * @return true if it is in the queue.
*/ */
synchronized boolean hasModuleEnqueued(IngestModuleAbstractFile module) { synchronized boolean hasModuleEnqueued(IngestModuleAbstractFile module) {
for (ProcessTask task : rootProcessTasks) { for (FileTask task : rootProcessTasks) {
for (IngestModuleAbstractFile m : task.context.getScheduledTask().getModules()) { List<IngestModuleAbstractFile> modules = task.getDataSourceTask().getModules();
for (IngestModuleAbstractFile m : modules) {
if (m.getName().equals(module.getName())) { if (m.getName().equals(module.getName())) {
return true; return true;
} }
} }
} }
for (ProcessTask task : curDirProcessTasks) { for (FileTask task : curDirProcessTasks) {
for (IngestModuleAbstractFile m : task.context.getScheduledTask().getModules()) { List<IngestModuleAbstractFile> modules = task.getDataSourceTask().getModules();
for (IngestModuleAbstractFile m : modules) {
if (m.getName().equals(module.getName())) { if (m.getName().equals(module.getName())) {
return true; return true;
} }
} }
} }
for (ProcessTask task : curFileProcessTasks) { for (FileTask task : curFileProcessTasks) {
for (IngestModuleAbstractFile m : task.context.getScheduledTask().getModules()) { List<IngestModuleAbstractFile> modules = task.getDataSourceTask().getModules();
for (IngestModuleAbstractFile m : modules) {
if (m.getName().equals(module.getName())) { if (m.getName().equals(module.getName())) {
return true; return true;
} }
@ -578,11 +579,11 @@ class IngestScheduler {
* skipped * skipped
* @return true if should be enqueued, false otherwise * @return true if should be enqueued, false otherwise
*/ */
private static boolean shouldEnqueueTask(final ProcessTask processTask) { private static boolean shouldEnqueueTask(final FileTask processTask) {
final AbstractFile aFile = processTask.file; final AbstractFile aFile = processTask.file;
//if it's unalloc file, skip if so scheduled //if it's unalloc file, skip if so scheduled
if (processTask.context.isProcessUnalloc() == false if (processTask.getDataSourceTask().isProcessUnalloc() == false
&& aFile.getType().equals(TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS //unalloc files && aFile.getType().equals(TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS //unalloc files
)) { )) {
return false; return false;
@ -639,10 +640,10 @@ class IngestScheduler {
/** /**
* Root dir sorter * Root dir sorter
*/ */
private static class RootTaskComparator implements Comparator<ProcessTask> { private static class RootTaskComparator implements Comparator<FileTask> {
@Override @Override
public int compare(ProcessTask q1, ProcessTask q2) { public int compare(FileTask q1, FileTask q2) {
AbstractFilePriotity.Priority p1 = AbstractFilePriotity.getPriority(q1.file); AbstractFilePriotity.Priority p1 = AbstractFilePriotity.getPriority(q1.file);
AbstractFilePriotity.Priority p2 = AbstractFilePriotity.getPriority(q2.file); AbstractFilePriotity.Priority p2 = AbstractFilePriotity.getPriority(q2.file);
if (p1 == p2) { if (p1 == p2) {
@ -882,17 +883,15 @@ class IngestScheduler {
/** /**
* DataSourceScheduler ingest scheduler * DataSourceScheduler ingest scheduler
*/ */
static class DataSourceScheduler implements Iterator<ScheduledTask<IngestModuleDataSource>> { static class DataSourceScheduler implements Iterator<DataSourceTask<IngestModuleDataSource>> {
private LinkedList<ScheduledTask<IngestModuleDataSource>> tasks; private LinkedList<DataSourceTask<IngestModuleDataSource>> tasks;
DataSourceScheduler() { DataSourceScheduler() {
tasks = new LinkedList<ScheduledTask<IngestModuleDataSource>>(); tasks = new LinkedList<DataSourceTask<IngestModuleDataSource>>();
} }
synchronized void schedule(PipelineContext<IngestModuleDataSource> context) { synchronized void schedule(DataSourceTask<IngestModuleDataSource> task) {
ScheduledTask<IngestModuleDataSource> task = context.getScheduledTask();
//skip if task contains no modules //skip if task contains no modules
if (task.getModules().isEmpty()) { if (task.getModules().isEmpty()) {
@ -910,14 +909,18 @@ class IngestScheduler {
return; return;
} }
ScheduledTask<IngestModuleDataSource> existTask = null; // see if we already have a task for this data source
for (ScheduledTask<IngestModuleDataSource> curTask : tasks) { DataSourceTask<IngestModuleDataSource> existTask = null;
for (DataSourceTask<IngestModuleDataSource> curTask : tasks) {
if (curTask.getContent().equals(task.getContent())) { if (curTask.getContent().equals(task.getContent())) {
existTask = curTask; existTask = curTask;
break; break;
} }
} }
// add these modules to the existing task for the data source
// @@@ BC: I'm not sure I like this and it will probably break a more formal pipeline structure
// @@@ TODO: Verify that, if this is called midway through ingest, all of the already ingested files get scheduled with the new modules...
if (existTask != null) { if (existTask != null) {
//merge modules for the data source task //merge modules for the data source task
existTask.addModules(task.getModules()); existTask.addModules(task.getModules());
@ -928,12 +931,12 @@ class IngestScheduler {
} }
@Override @Override
public synchronized ScheduledTask<IngestModuleDataSource> next() throws IllegalStateException { public synchronized DataSourceTask<IngestModuleDataSource> next() throws IllegalStateException {
if (!hasNext()) { if (!hasNext()) {
throw new IllegalStateException("There is no data source tasks in the queue, check hasNext()"); throw new IllegalStateException("There is no data source tasks in the queue, check hasNext()");
} }
final ScheduledTask<IngestModuleDataSource> ret = tasks.pollFirst(); final DataSourceTask<IngestModuleDataSource> ret = tasks.pollFirst();
return ret; return ret;
} }
@ -944,7 +947,7 @@ class IngestScheduler {
*/ */
synchronized List<org.sleuthkit.datamodel.Content> getContents() { synchronized List<org.sleuthkit.datamodel.Content> getContents() {
List<org.sleuthkit.datamodel.Content> contents = new ArrayList<org.sleuthkit.datamodel.Content>(); List<org.sleuthkit.datamodel.Content> contents = new ArrayList<org.sleuthkit.datamodel.Content>();
for (ScheduledTask<IngestModuleDataSource> task : tasks) { for (DataSourceTask<IngestModuleDataSource> task : tasks) {
contents.add(task.getContent()); contents.add(task.getContent());
} }
return contents; return contents;
@ -972,7 +975,7 @@ class IngestScheduler {
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("DataSourceQueue, size: ").append(getCount()); sb.append("DataSourceQueue, size: ").append(getCount());
for (ScheduledTask<IngestModuleDataSource> task : tasks) { for (DataSourceTask<IngestModuleDataSource> task : tasks) {
sb.append(task.toString()).append(" "); sb.append(task.toString()).append(" ");
} }
return sb.toString(); return sb.toString();

View File

@ -20,7 +20,6 @@ package org.sleuthkit.autopsy.ingest;
import java.util.Objects; import java.util.Objects;
/** /**
* Stores information about a given pipeline, which is a series of modules. * Stores information about a given pipeline, which is a series of modules.
* This is passed into modules for their reference. * This is passed into modules for their reference.
@ -29,43 +28,30 @@ import java.util.Objects;
* *
*/ */
public class PipelineContext <T extends IngestModuleAbstract> { public class PipelineContext <T extends IngestModuleAbstract> {
private final ScheduledTask<T> task; private final DataSourceTask<T> task;
private final boolean processUnalloc;
PipelineContext(ScheduledTask<T> task, boolean processUnalloc) { PipelineContext(DataSourceTask<T> task) {
this.task = task; this.task = task;
this.processUnalloc = processUnalloc;
} }
/** /**
* Returns the currently scheduled task. * Returns the currently scheduled task.
* @return * @return
*/ */
ScheduledTask<T> getScheduledTask() { DataSourceTask<T> getDataSourceTask() {
return task; return task;
} }
/**
* Returns value of if unallocated space is going to be scheduled.
* @return True if pipeline is processing unallocated space.
*/
boolean isProcessUnalloc() {
return processUnalloc;
}
@Override @Override
public String toString() { public String toString() {
return "pipelineContext{" + "task=" + task + ", processUnalloc=" + processUnalloc + '}'; return "pipelineContext{" + "task=" + task + '}';
} }
@Override @Override
public int hashCode() { public int hashCode() {
int hash = 5; int hash = 5;
hash = 53 * hash + Objects.hashCode(this.task); hash = 53 * hash + Objects.hashCode(this.task);
hash = 53 * hash + (this.processUnalloc ? 1 : 0);
return hash; return hash;
} }
@ -82,12 +68,7 @@ public class PipelineContext <T extends IngestModuleAbstract> {
if (!Objects.equals(this.task, other.task)) { if (!Objects.equals(this.task, other.task)) {
return false; return false;
} }
if (this.processUnalloc != other.processUnalloc) {
return false;
}
return true; return true;
} }
} }