Check in new public IngestJob and IngestManager API stubs

This commit is contained in:
Richard Cordovano 2014-12-15 16:09:55 -05:00
parent 50d92767fe
commit 2de22db810
5 changed files with 179 additions and 84 deletions

View File

@ -920,7 +920,7 @@ final class DataSourceIngestJob {
* *
* @return The currently running module, may be null. * @return The currently running module, may be null.
*/ */
DataSourceIngestPipeline.DataSourceIngestModuleDecorator getCurrentDataSourceIngestModule() { DataSourceIngestPipeline.PipelineModule getCurrentDataSourceIngestModule() {
if (null != this.currentDataSourceIngestPipeline) { if (null != this.currentDataSourceIngestPipeline) {
return this.currentDataSourceIngestPipeline.getCurrentlyRunningModule(); return this.currentDataSourceIngestPipeline.getCurrentlyRunningModule();
} else { } else {

View File

@ -28,13 +28,15 @@ import org.sleuthkit.datamodel.Content;
* This class manages a sequence of data source level ingest modules. It starts * This class manages a sequence of data source level ingest modules. It starts
* the modules, runs data sources through them, and shuts them down when data * the modules, runs data sources through them, and shuts them down when data
* source level ingest is complete. * source level ingest is complete.
* <p>
* This class is not thread-safe.
*/ */
final class DataSourceIngestPipeline { final class DataSourceIngestPipeline {
private static final IngestManager ingestManager = IngestManager.getInstance(); private static final IngestManager ingestManager = IngestManager.getInstance();
private final DataSourceIngestJob job; private final DataSourceIngestJob job;
private final List<DataSourceIngestModuleDecorator> modules = new ArrayList<>(); private final List<PipelineModule> modules = new ArrayList<>();
private volatile DataSourceIngestModuleDecorator currentModule; private volatile PipelineModule currentModule;
/** /**
* Constructs an object that manages a sequence of data source level ingest * Constructs an object that manages a sequence of data source level ingest
@ -54,7 +56,7 @@ final class DataSourceIngestPipeline {
*/ */
for (IngestModuleTemplate template : moduleTemplates) { for (IngestModuleTemplate template : moduleTemplates) {
if (template.isDataSourceIngestModuleTemplate()) { if (template.isDataSourceIngestModuleTemplate()) {
DataSourceIngestModuleDecorator module = new DataSourceIngestModuleDecorator(template.createDataSourceIngestModule(), template.getModuleName()); PipelineModule module = new PipelineModule(template.createDataSourceIngestModule(), template.getModuleName());
modules.add(module); modules.add(module);
} }
} }
@ -76,7 +78,7 @@ final class DataSourceIngestPipeline {
*/ */
List<IngestModuleError> startUp() { List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestModuleDecorator module : modules) { for (PipelineModule module : modules) {
try { try {
module.startUp(new IngestJobContext(this.job)); module.startUp(new IngestJobContext(this.job));
} catch (Throwable ex) { // Catch-all exception firewall } catch (Throwable ex) { // Catch-all exception firewall
@ -96,7 +98,7 @@ final class DataSourceIngestPipeline {
List<IngestModuleError> process(DataSourceIngestTask task) { List<IngestModuleError> process(DataSourceIngestTask task) {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
Content dataSource = task.getDataSource(); Content dataSource = task.getDataSource();
for (DataSourceIngestModuleDecorator module : modules) { for (PipelineModule module : modules) {
try { try {
module.setStartTime(); module.setStartTime();
this.currentModule = module; this.currentModule = module;
@ -124,7 +126,7 @@ final class DataSourceIngestPipeline {
/** /**
* Gets the currently running module. * Gets the currently running module.
*/ */
DataSourceIngestModuleDecorator getCurrentlyRunningModule() { PipelineModule getCurrentlyRunningModule() {
return this.currentModule; return this.currentModule;
} }
@ -132,7 +134,7 @@ final class DataSourceIngestPipeline {
* This class decorates a data source level ingest module with a display * This class decorates a data source level ingest module with a display
* name and a start time. * name and a start time.
*/ */
static class DataSourceIngestModuleDecorator implements DataSourceIngestModule { static class PipelineModule implements DataSourceIngestModule {
private final DataSourceIngestModule module; private final DataSourceIngestModule module;
private final String displayName; private final String displayName;
@ -145,7 +147,7 @@ final class DataSourceIngestPipeline {
* @param module The data source level ingest module to be decorated. * @param module The data source level ingest module to be decorated.
* @param displayName The display name. * @param displayName The display name.
*/ */
DataSourceIngestModuleDecorator(DataSourceIngestModule module, String displayName) { PipelineModule(DataSourceIngestModule module, String displayName) {
this.module = module; this.module = module;
this.displayName = displayName; this.displayName = displayName;
this.startTime = new Date(); this.startTime = new Date();

View File

@ -27,13 +27,16 @@ import org.sleuthkit.datamodel.AbstractFile;
* This class manages a sequence of file level ingest modules. It starts the * This class manages a sequence of file level ingest modules. It starts the
* modules, runs files through them, and shuts them down when file level ingest * modules, runs files through them, and shuts them down when file level ingest
* is complete. * is complete.
* <p>
* This class is not thread-safe.
*/ */
final class FileIngestPipeline { final class FileIngestPipeline {
private static final IngestManager ingestManager = IngestManager.getInstance(); private static final IngestManager ingestManager = IngestManager.getInstance();
private final DataSourceIngestJob job; private final DataSourceIngestJob job;
private final List<FileIngestModuleDecorator> modules = new ArrayList<>(); private final List<PipelineModule> modules = new ArrayList<>();
private Date startTime; private Date startTime;
private boolean running;
/** /**
* Constructs an object that manages a sequence of file level ingest * Constructs an object that manages a sequence of file level ingest
@ -53,24 +56,15 @@ final class FileIngestPipeline {
*/ */
for (IngestModuleTemplate template : moduleTemplates) { for (IngestModuleTemplate template : moduleTemplates) {
if (template.isFileIngestModuleTemplate()) { if (template.isFileIngestModuleTemplate()) {
FileIngestModuleDecorator module = new FileIngestModuleDecorator(template.createFileIngestModule(), template.getModuleName()); PipelineModule module = new PipelineModule(template.createFileIngestModule(), template.getModuleName());
modules.add(module); modules.add(module);
} }
} }
} }
/** /**
* Returns the time when the pipeline began processing files. * Queries whether or not this pipeline has been configured with at least
* * one file level ingest module.
* @return The file processing start time, may be null.
*/
Date getProcessingStartTime() {
return this.startTime;
}
/**
* Queries whether or not the pipeline has been configured with at least one
* file level ingest module.
* *
* @return True or false. * @return True or false.
*/ */
@ -79,35 +73,54 @@ final class FileIngestPipeline {
} }
/** /**
* Start up all of the modules in the pipeline. * Starts up all of the modules in the pipeline.
* *
* @return List of errors or empty list if no errors * @return List of start up errors, possibly empty.
*/ */
List<IngestModuleError> startUp() { List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
for (FileIngestModuleDecorator module : this.modules) { if (this.running) {
throw new IllegalStateException("Attempt to start up a pipeline that is already running"); //NON-NLS
}
for (PipelineModule module : this.modules) {
try { try {
module.startUp(new IngestJobContext(this.job)); module.startUp(new IngestJobContext(this.job));
} catch (Throwable ex) { // Catch-all exception firewall } catch (Throwable ex) { // Catch-all exception firewall
errors.add(new IngestModuleError(module.getDisplayName(), ex)); errors.add(new IngestModuleError(module.getDisplayName(), ex));
} }
} }
this.running = true;
return errors; return errors;
} }
/**
* Returns the start up time of the pipeline.
*
* @return The file processing start time, may be null.
*/
Date getStartTime() {
return this.startTime;
}
/** /**
* Runs a file through the ingest modules in sequential order. * Runs a file through the ingest modules in sequential order.
* *
* @param task A file level ingest task containing a file to be processed. * @param task A file level ingest task containing a file to be processed.
* @return A list of ingest module errors, possible empty. * @return A list of processing errors, possible empty.
*/ */
List<IngestModuleError> process(FileIngestTask task) { List<IngestModuleError> process(FileIngestTask task) {
if (!this.running) {
throw new IllegalStateException("Attempt to process a file with pipeline that is not running"); //NON-NLS
}
if (null == this.startTime) { if (null == this.startTime) {
this.startTime = new Date(); this.startTime = new Date();
} }
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
AbstractFile file = task.getFile(); AbstractFile file = task.getFile();
for (FileIngestModuleDecorator module : this.modules) { for (PipelineModule module : this.modules) {
try { try {
FileIngestPipeline.ingestManager.setIngestTaskProgress(task, module.getDisplayName()); FileIngestPipeline.ingestManager.setIngestTaskProgress(task, module.getDisplayName());
module.process(file); module.process(file);
@ -126,9 +139,18 @@ final class FileIngestPipeline {
return errors; return errors;
} }
/**
* Shuts down all of the modules in the pipeline.
*
* @return A list of shut down errors, possibly empty.
*/
List<IngestModuleError> shutDown() { List<IngestModuleError> shutDown() {
if (!this.running) {
throw new IllegalStateException("Attempt to shut down a pipeline that is not running"); //NON-NLS
}
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
for (FileIngestModuleDecorator module : this.modules) { for (PipelineModule module : this.modules) {
try { try {
module.shutDown(); module.shutDown();
} catch (Throwable ex) { // Catch-all exception firewall } catch (Throwable ex) { // Catch-all exception firewall
@ -138,10 +160,19 @@ final class FileIngestPipeline {
return errors; return errors;
} }
/**
* Queries whether or not this file ingest level pipeline is running.
*
* @return True or false.
*/
boolean isRunning() {
return this.running;
}
/** /**
* This class decorates a file level ingest module with a display name. * This class decorates a file level ingest module with a display name.
*/ */
private static final class FileIngestModuleDecorator implements FileIngestModule { private static final class PipelineModule implements FileIngestModule {
private final FileIngestModule module; private final FileIngestModule module;
private final String displayName; private final String displayName;
@ -153,7 +184,7 @@ final class FileIngestPipeline {
* @param module The file level ingest module to be decorated. * @param module The file level ingest module to be decorated.
* @param displayName The display name. * @param displayName The display name.
*/ */
FileIngestModuleDecorator(FileIngestModule module, String displayName) { PipelineModule(FileIngestModule module, String displayName) {
this.module = module; this.module = module;
this.displayName = displayName; this.displayName = displayName;
} }
@ -168,7 +199,7 @@ final class FileIngestPipeline {
} }
/** /**
* Gets a module name suitable for display in a UI. * Gets display name of the decorated ingest module.
* *
* @return The display name. * @return The display name.
*/ */

View File

@ -68,32 +68,34 @@ public final class IngestJob {
* *
* @return The snapshot. * @return The snapshot.
*/ */
synchronized public Snapshot getSnapshot() { synchronized public ProgressSnapshot getSnapshot() {
// TODO: There are race conditions here that can be easily fixed by /**
// eliminating the child jobs per plan. * There are race conditions in the code that follows, but they are not
* important because this is just a coarse-grained status report. If
* stale data is returned in any single snapshot, it will be corrected
* in subsequent snapshots.
*/
DataSourceIngestModuleHandle moduleHandle = null; DataSourceIngestModuleHandle moduleHandle = null;
Date fileAnalysisStartTime = null; boolean fileIngestRunning = false;
Date fileIngestStartTime = null;
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
/** DataSourceIngestPipeline.PipelineModule module = dataSourceJob.getCurrentDataSourceIngestModule();
* Only one data source ingest module should be running at a time,
* so grab the first one. There is a race condition here.
*/
DataSourceIngestPipeline.DataSourceIngestModuleDecorator module = dataSourceJob.getCurrentDataSourceIngestModule();
if (null != module) { if (null != module) {
moduleHandle = new DataSourceIngestModuleHandle(dataSourceJob.getId(), module); moduleHandle = new DataSourceIngestModuleHandle(dataSourceJob.getId(), module);
} }
// RJCTODO: For each data source job, check for a running flag and
// get the oldest start data for the start dates, if any.
} }
return new Snapshot(moduleHandle, fileAnalysisStartTime, this.cancelled); return new ProgressSnapshot(moduleHandle, fileIngestRunning, fileIngestStartTime, this.cancelled);
} }
/** /**
* Gets snapshots of the state and performance of this ingest job's child * Gets snapshots of the state and performance of this ingest job's child
* data source ingest jobs. * data source ingest jobs.
* *
* @return A list of data source ingest job snapshots. * @return A list of data source ingest job progress snapshots.
*/ */
synchronized List<DataSourceIngestJob.Snapshot> getDetailedSnapshot() { synchronized List<DataSourceIngestJob.Snapshot> getDetailedSnapshot() {
List<DataSourceIngestJob.Snapshot> snapshots = new ArrayList<>(); List<DataSourceIngestJob.Snapshot> snapshots = new ArrayList<>();
@ -104,29 +106,34 @@ public final class IngestJob {
} }
/** /**
* Requests cancellation of a specific data source level ingest module.
* Returns immediately, but there may be a delay before the ingest module
* responds by stopping processing, if it is still running when the request
* is made.
* *
* @param module * @param module The handle of the data source ingest module to be canceled,
* which can obtained from a progress snapshot.
*/ */
synchronized public void cancelIngestModule(DataSourceIngestModuleHandle module) { synchronized public void cancelIngestModule(DataSourceIngestModuleHandle module) {
DataSourceIngestJob dataSourceJob = this.dataSourceJobs.get(module.dataSourceIngestJobId); DataSourceIngestJob dataSourceJob = this.dataSourceJobs.get(module.dataSourceIngestJobId);
// RJCTODO: check equality, etc. // RJCTODO: Pass through the cancellation request.
} }
/** /**
* Requests cancellation of this ingest job, i.e., a shutdown of its data * Requests cancellation of the data source level and file level ingest
* source level and file level ingest pipelines. * modules of this ingest job. Returns immediately, but there may be a delay
* before all of the ingest modules respond by stopping processing.
*/ */
synchronized public void cancel() { synchronized public void cancel() {
for (DataSourceIngestJob job : this.dataSourceJobs.values()) { for (DataSourceIngestJob job : this.dataSourceJobs.values()) {
job.cancel(); job.cancel();
} }
this.cancelled = true; // RJCTODO: make children push cancellation up this.cancelled = true;
} }
/** /**
* Queries whether or not cancellation of this ingest job, i.e., a shutdown * Queries whether or not cancellation of the data source level and file
* of its data source level and file level ingest pipelines, has been * level ingest modules of this ingest job has been requested.
* requested.
* *
* @return True or false. * @return True or false.
*/ */
@ -135,38 +142,72 @@ public final class IngestJob {
} }
/** /**
* A thread-safe snapshot of ingest job progress. * A snapshot of ingest job progress.
*/ */
public static final class Snapshot { public static final class ProgressSnapshot {
private final DataSourceIngestModuleHandle dataSourceModule; private final DataSourceIngestModuleHandle dataSourceModule;
private final Date fileAnalysisStartTime; private final boolean fileIngestRunning;
private final Date fileIngestStartTime;
private final boolean cancelled; private final boolean cancelled;
private Snapshot(DataSourceIngestModuleHandle dataSourceModule, Date fileAnalysisStartTime, boolean cancelled) { /**
* Constructs a snapshot of ingest job progress.
*
* @param dataSourceModule The currently running data source level
* ingest module, may be null
* @param fileIngestRunning Whether or not file ingest is currently
* running.
* @param fileIngestStartTime The start time of file level ingest, may
* be null
* @param cancelled Whether or not a cancellation request has been
* issued.
*/
private ProgressSnapshot(DataSourceIngestModuleHandle dataSourceModule, boolean fileIngestRunning, Date fileIngestStartTime, boolean cancelled) {
this.dataSourceModule = dataSourceModule; this.dataSourceModule = dataSourceModule;
this.fileAnalysisStartTime = fileAnalysisStartTime; this.fileIngestRunning = fileIngestRunning;
this.fileIngestStartTime = fileIngestStartTime;
this.cancelled = cancelled; this.cancelled = cancelled;
} }
/**
* Gets a handle to the currently running data source level ingest
* module at the time the snapshot is taken.
*
* @return The handle, may be null.
*/
public DataSourceIngestModuleHandle runningDataSourceIngestModule() { public DataSourceIngestModuleHandle runningDataSourceIngestModule() {
/** /**
* It is safe to hand out this reference because the object is * It is safe to hand out this reference because the object is
* immutable and the mutable * immutable.
* DataSourceIngestPipeline.DataSourceIngestModuleDecorator is
* hidden from public access.
*/ */
return this.dataSourceModule; return this.dataSourceModule;
} }
public boolean fileAnalysisIsRunning() { /**
return (null != this.fileAnalysisStartTime); * Queries whether or not file level ingest is running at the time the
* snapshot is taken.
*
* @return True or false.
*/
public boolean fileIngestIsRunning() {
return this.fileIngestRunning;
} }
public Date fileAnalysisStartTime() { /**
return new Date(this.fileAnalysisStartTime.getTime()); * Gets the time that file level ingest started.
*
* @return The start time, may be null.
*/
public Date fileIngestStartTime() {
return new Date(this.fileIngestStartTime.getTime());
} }
/**
* Queries whether or not a cancellation request has been issued.
*
* @return True or false.
*/
public boolean isCancelled() { public boolean isCancelled() {
return this.cancelled; return this.cancelled;
} }
@ -181,20 +222,36 @@ public final class IngestJob {
public static class DataSourceIngestModuleHandle { public static class DataSourceIngestModuleHandle {
private final long dataSourceIngestJobId; private final long dataSourceIngestJobId;
private final DataSourceIngestPipeline.DataSourceIngestModuleDecorator module; private final DataSourceIngestPipeline.PipelineModule module;
private DataSourceIngestModuleHandle(long dataSourceIngestJobId, DataSourceIngestPipeline.DataSourceIngestModuleDecorator module) { /**
* Constructs a handle to a data source level ingest module that can be
* used to get basic information about the module and to request
* cancellation of the module.
*/
private DataSourceIngestModuleHandle(long dataSourceIngestJobId, DataSourceIngestPipeline.PipelineModule module) {
this.dataSourceIngestJobId = dataSourceIngestJobId; this.dataSourceIngestJobId = dataSourceIngestJobId;
this.module = module; this.module = module;
} }
/**
* Gets the display name of the data source level ingest module
* associated with this handle.
*
* @return The display name.
*/
public String displayName() { public String displayName() {
return this.module.getDisplayName(); return this.module.getDisplayName();
} }
/**
* Returns the time the data source level ingest module associated with
* this handle began processing.
*
* @return The module start time.
*/
public Date startTime() { public Date startTime() {
// RJCTODO return this.module.getStartTime();
return new Date();
} }
} }

View File

@ -213,19 +213,15 @@ public class IngestManager {
} }
/** /**
* Starts an ingest job, i.e., processing by ingest modules, for each data * Starts an ingest job for a collection of data sources.
* source in a collection of data sources. Note that if the provide UI
* argument is set to true, it is assumed this method is being called on the
* EDT and a worker thread will be dispatched to start the job.
* *
* @param dataSources The data sources to be processed. * @param dataSources The data sources to be processed.
* @param settings The ingest job settings. * @param settings The ingest job settings.
* @param provideUI Whether or not to support user interaction, e.g., * @param doUI Whether or not to support user interaction, e.g., showing
* showing message boxes and reporting progress through the NetBeans * message boxes and reporting progress through the NetBeans Progress API.
* Progress API.
* @return The ingest job that was started * @return The ingest job that was started
*/ */
public synchronized void startIngestJobs(Collection<Content> dataSources, IngestJobSettings settings, boolean provideUI) { public synchronized void startIngestJobs(Collection<Content> dataSources, IngestJobSettings settings, boolean doUI) {
if (!isIngestRunning()) { if (!isIngestRunning()) {
clearIngestMessageBox(); clearIngestMessageBox();
} }
@ -234,9 +230,17 @@ public class IngestManager {
ingestMonitor.start(); ingestMonitor.start();
} }
long taskId = nextThreadId.incrementAndGet(); if (doUI) {
Future<Void> task = startIngestJobsThreadPool.submit(new IngestJobsStarter(taskId, dataSources, settings, provideUI)); /**
ingestJobStarters.put(taskId, task); * Assume request is from code running on the EDT and dispatch to a
* worker thread.
*/
long taskId = nextThreadId.incrementAndGet();
Future<Void> task = startIngestJobsThreadPool.submit(new IngestJobStarter(taskId, dataSources, settings, doUI));
ingestJobStarters.put(taskId, task);
} else {
this.startJob(dataSources, settings);
}
} }
/** /**
@ -245,6 +249,8 @@ public class IngestManager {
* @return True or false. * @return True or false.
*/ */
public boolean isIngestRunning() { public boolean isIngestRunning() {
// RJCTODO: This may return the wrong answer if an IngestJobStarter has
// been dispatched to the startIngestJobsThreadPool.
return !this.jobsById.isEmpty(); return !this.jobsById.isEmpty();
} }
@ -640,10 +646,9 @@ public class IngestManager {
} }
/** /**
* Creates and starts an ingest job, i.e., processing by ingest modules, for * Creates and starts an ingest job for a collection of data sources.
* each data source in a collection of data sources.
*/ */
private final class IngestJobsStarter implements Callable<Void> { private final class IngestJobStarter implements Callable<Void> {
private final long threadId; private final long threadId;
private final Collection<Content> dataSources; private final Collection<Content> dataSources;
@ -651,7 +656,7 @@ public class IngestManager {
private final boolean doStartupErrorsMsgBox; private final boolean doStartupErrorsMsgBox;
private ProgressHandle progress; private ProgressHandle progress;
IngestJobsStarter(long threadId, Collection<Content> dataSources, IngestJobSettings settings, boolean doMessageDialogs) { IngestJobStarter(long threadId, Collection<Content> dataSources, IngestJobSettings settings, boolean doMessageDialogs) {
this.threadId = threadId; this.threadId = threadId;
this.dataSources = dataSources; this.dataSources = dataSources;
this.settings = settings; this.settings = settings;