Continue reworking of ingest API

This commit is contained in:
Richard Cordovano 2014-12-16 23:19:19 -05:00
parent d385d9e085
commit c11b5a0fe0
10 changed files with 1055 additions and 1046 deletions

View File

@ -211,7 +211,7 @@ class AddImageWizardIngestConfigPanel implements WizardDescriptor.Panel<WizardDe
private void startIngest() {
if (!newContents.isEmpty() && readyToIngest && !ingested) {
ingested = true;
IngestManager.getInstance().startIngestJob(newContents, ingestJobSettingsPanel.getSettings(), true);
IngestManager.getInstance().queueIngestJob(newContents, ingestJobSettingsPanel.getSettings(), true);
progressPanel.setStateFinished();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -29,32 +29,27 @@ import org.sleuthkit.datamodel.Content;
* source ingest job. It starts the modules, runs data sources through them, and
* shuts them down when data source level ingest is complete.
* <p>
* This class is not thread-safe.
* This class is thread-safe.
*/
final class DataSourceIngestPipeline {
private static final IngestManager ingestManager = IngestManager.getInstance();
private final DataSourceIngestJob job;
private final List<PipelineModule> modules = new ArrayList<>();
private volatile PipelineModule currentModule;
private boolean running;
private volatile PipelineModule currentModule;
/**
* Constructs an object that manages a sequence of data source level ingest
* modules. It starts the modules, runs data sources through them, and shuts
* them down when data source level ingest is complete.
*
* @param job The data source ingest job to which this pipeline belongs.
* @param moduleTemplates The ingest module templates that define the
* pipeline.
* @param job The data source ingest job that owns this pipeline.
* @param moduleTemplates Templates for the creating the ingest modules that
* make up this pipeline.
*/
DataSourceIngestPipeline(DataSourceIngestJob job, List<IngestModuleTemplate> moduleTemplates) {
this.job = job;
/**
* Create a data source level ingest module instance from each ingest
* module template.
*/
for (IngestModuleTemplate template : moduleTemplates) {
if (template.isDataSourceIngestModuleTemplate()) {
PipelineModule module = new PipelineModule(template.createDataSourceIngestModule(), template.getModuleName());
@ -73,11 +68,11 @@ final class DataSourceIngestPipeline {
}
/**
* Starts up the ingest module in this pipeline.
* Starts up the ingest modules in this pipeline.
*
* @return A list of ingest module startup errors, possibly empty.
*/
List<IngestModuleError> startUp() {
synchronized List<IngestModuleError> startUp() {
if (this.running) {
throw new IllegalStateException("Attempt to start up a pipeline that is already running"); //NON-NLS
}
@ -100,7 +95,7 @@ final class DataSourceIngestPipeline {
* be processed.
* @return A list of processing errors, possible empty.
*/
List<IngestModuleError> process(DataSourceIngestTask task) {
synchronized List<IngestModuleError> process(DataSourceIngestTask task) {
if (!this.running) {
throw new IllegalStateException("Attempt to process with pipeline that is not running"); //NON-NLS
}
@ -109,14 +104,13 @@ final class DataSourceIngestPipeline {
Content dataSource = task.getDataSource();
for (PipelineModule module : modules) {
try {
module.setStartTime();
this.currentModule = module;
String displayName = NbBundle.getMessage(this.getClass(),
"IngestJob.progress.dataSourceIngest.displayName",
module.getDisplayName(), dataSource.getName());
this.job.updateDataSourceIngestProgressBarDisplayName(displayName);
this.job.switchDataSourceIngestProgressBarToIndeterminate();
ingestManager.setIngestTaskProgress(task, module.getDisplayName());
DataSourceIngestPipeline.ingestManager.setIngestTaskProgress(task, module.getDisplayName());
module.process(dataSource, new DataSourceIngestModuleProgress(this.job));
} catch (Throwable ex) { // Catch-all exception firewall
errors.add(new IngestModuleError(module.getDisplayName(), ex));
@ -135,7 +129,7 @@ final class DataSourceIngestPipeline {
/**
* Gets the currently running module.
*
* @return The module, possibly null.
* @return The module, possibly null if no module is currently running.
*/
PipelineModule getCurrentlyRunningModule() {
return this.currentModule;
@ -143,17 +137,17 @@ final class DataSourceIngestPipeline {
/**
* This class decorates a data source level ingest module with a display
* name and a start time.
* name and a processing start time.
*/
static class PipelineModule implements DataSourceIngestModule {
private final DataSourceIngestModule module;
private final String displayName;
private Date startTime;
private volatile Date processingStartTime;
/**
* Constructs an object that decorates a data source level ingest module
* with a display name and a running time.
* with a display name and a processing start time.
*
* @param module The data source level ingest module to be decorated.
* @param displayName The display name.
@ -161,7 +155,7 @@ final class DataSourceIngestPipeline {
PipelineModule(DataSourceIngestModule module, String displayName) {
this.module = module;
this.displayName = displayName;
this.startTime = new Date();
this.processingStartTime = new Date();
}
/**
@ -174,7 +168,7 @@ final class DataSourceIngestPipeline {
}
/**
* Gets a module name suitable for display in a UI.
* Gets the display of the decorated ingest module.
*
* @return The display name.
*/
@ -182,21 +176,15 @@ final class DataSourceIngestPipeline {
return this.displayName;
}
/**
* Sets the start time to the current time.
*/
void setStartTime() {
this.startTime = new Date();
}
/**
* Gets the time the decorated ingest module started processing the data
* source.
*
* @return The start time.
* @return The start time, will be null if the module has not started
* processing the data source yet.
*/
Date getStartTime() {
return this.startTime;
Date getProcessingStartTime() {
return this.processingStartTime;
}
/**
@ -212,7 +200,7 @@ final class DataSourceIngestPipeline {
*/
@Override
public IngestModule.ProcessResult process(Content dataSource, DataSourceIngestModuleProgress statusHelper) {
this.startTime = new Date();
this.processingStartTime = new Date();
return this.module.process(dataSource, statusHelper);
}

View File

@ -28,9 +28,7 @@ import org.sleuthkit.datamodel.AbstractFile;
* ingest job. It starts the modules, runs files through them, and shuts them
* down when file level ingest is complete.
* <p>
* This class is not thread-safe, it is intended to be used by one file ingest
* thread at at time. However, the running flag is volatile since it may be read
* by another thread looking for a progress snapshot.
* This class is thread-safe.
*/
final class FileIngestPipeline {
@ -51,11 +49,6 @@ final class FileIngestPipeline {
*/
FileIngestPipeline(DataSourceIngestJob job, List<IngestModuleTemplate> moduleTemplates) {
this.job = job;
/**
* Create an ingest module instance from each file ingest module
* template and add it to the pipeline.
*/
for (IngestModuleTemplate template : moduleTemplates) {
if (template.isFileIngestModuleTemplate()) {
PipelineModule module = new PipelineModule(template.createFileIngestModule(), template.getModuleName());
@ -65,7 +58,7 @@ final class FileIngestPipeline {
}
/**
* Indicates whether or not there are any ingest modules in this pipeline.
* Queries whether or not there are any ingest modules in this pipeline.
*
* @return True or false.
*/
@ -74,7 +67,7 @@ final class FileIngestPipeline {
}
/**
* Queries whether or not this file ingest level pipeline is running.
* Queries whether or not this pipeline is running.
*
* @return True or false.
*/
@ -82,18 +75,28 @@ final class FileIngestPipeline {
return this.running;
}
/**
* Returns the start up time of this pipeline.
*
* @return The file processing start time, may be null if this pipeline has
* not been started yet.
*/
Date getStartTime() {
return this.startTime;
}
/**
* Starts up all of the ingest modules in the pipeline.
*
* @return List of start up errors, possibly empty.
*/
List<IngestModuleError> startUp() {
synchronized List<IngestModuleError> startUp() {
if (this.running) {
throw new IllegalStateException("Attempt to start up a pipeline that is already running"); //NON-NLS
}
this.running = true;
this.startTime = new Date();
List<IngestModuleError> errors = new ArrayList<>();
for (PipelineModule module : this.modules) {
try {
@ -105,22 +108,13 @@ final class FileIngestPipeline {
return errors;
}
/**
* Returns the start up time of the pipeline.
*
* @return The file processing start time, may be null.
*/
Date getStartTime() {
return this.startTime;
}
/**
* Runs a file through the ingest modules in sequential order.
*
* @param task A file level ingest task containing a file to be processed.
* @return A list of processing errors, possible empty.
*/
List<IngestModuleError> process(FileIngestTask task) {
synchronized List<IngestModuleError> process(FileIngestTask task) {
if (!this.running) {
throw new IllegalStateException("Attempt to process file with pipeline that is not running"); //NON-NLS
}
@ -151,7 +145,7 @@ final class FileIngestPipeline {
*
* @return A list of shut down errors, possibly empty.
*/
List<IngestModuleError> shutDown() {
synchronized List<IngestModuleError> shutDown() {
if (!this.running) {
throw new IllegalStateException("Attempt to shut down a pipeline that is not running"); //NON-NLS
}
@ -198,7 +192,7 @@ final class FileIngestPipeline {
}
/**
* Gets display name of the decorated ingest module.
* Gets the display name of the decorated ingest module.
*
* @return The display name.
*/

View File

@ -30,6 +30,8 @@ import org.sleuthkit.datamodel.Content;
/**
* Runs a collection of data sources through a set of ingest modules specified
* via ingest job settings.
* <p>
* This class is thread-safe.
*/
public final class IngestJob {
@ -44,13 +46,14 @@ public final class IngestJob {
*
* @param dataSources The data sources to be ingested.
* @param settings The ingest job settings.
* @param doUI Whether or not to do UI interactions.
* @param runInteractively Whether or not this job should use progress bars,
* message boxes for errors, etc.
*/
IngestJob(Collection<Content> dataSources, IngestJobSettings settings, boolean doUI) {
IngestJob(Collection<Content> dataSources, IngestJobSettings settings, boolean runInteractively) {
this.id = IngestJob.nextId.getAndIncrement();
this.dataSourceJobs = new HashMap<>();
for (Content dataSource : dataSources) {
DataSourceIngestJob dataSourceIngestJob = new DataSourceIngestJob(this, dataSource, settings, doUI);
DataSourceIngestJob dataSourceIngestJob = new DataSourceIngestJob(this, dataSource, settings, runInteractively);
this.dataSourceJobs.put(dataSourceIngestJob.getId(), dataSourceIngestJob);
}
}
@ -65,40 +68,73 @@ public final class IngestJob {
}
/**
* Gets a snapshot of the state and performance of this ingest job.
* Checks to see if this ingest job has at least one ingest pipeline when
* its settings are applied.
*
* @return True or false.
*/
boolean hasIngestPipeline() {
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
if (dataSourceJob.hasIngestPipeline()) {
return true;
}
}
return false;
}
/**
* Starts this ingest job by starting its ingest module pipelines and
* scheduling the ingest tasks that make up the job.
*
* @return A collection of ingest module start up errors, empty on success.
*/
synchronized List<IngestModuleError> start() {
List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
errors.addAll(dataSourceJob.start());
if (!errors.isEmpty()) {
// RJCTODO: Need to let sucessfully started data source ingest
// jobs know they should shut down.
break;
}
}
return errors;
}
/**
* Gets a snapshot of the progress of this ingest job.
*
* @return The snapshot.
*/
synchronized public ProgressSnapshot getSnapshot() {
/**
* There are race conditions in the code that follows, but they are not
* important because this is just a coarse-grained status report. If
* stale data is returned in any single snapshot, it will be corrected
* in subsequent snapshots.
*/
DataSourceIngestModuleHandle moduleHandle = null;
boolean fileIngestRunning = false;
boolean fileIngestIsRunning = false;
Date fileIngestStartTime = null;
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
DataSourceIngestPipeline.PipelineModule module = dataSourceJob.getCurrentDataSourceIngestModule();
if (null != module) {
moduleHandle = new DataSourceIngestModuleHandle(dataSourceJob.getId(), module);
for (DataSourceIngestJob.Snapshot snapshot : this.getDataSourceIngestJobSnapshots()) {
if (null != moduleHandle) {
DataSourceIngestPipeline.PipelineModule module = snapshot.getDataSourceLevelIngestModule();
if (null != module) {
moduleHandle = new DataSourceIngestModuleHandle(this.dataSourceJobs.get(snapshot.getJobId()), module);
}
}
if (snapshot.fileIngestIsRunning()) {
fileIngestIsRunning = true;
}
Date childFileIngestStartTime = snapshot.fileIngestStartTime();
if (null != childFileIngestStartTime && (null == fileIngestStartTime || childFileIngestStartTime.before(fileIngestStartTime))) {
fileIngestStartTime = childFileIngestStartTime;
}
// RJCTODO: For each data source job, check for a running flag and
// get the oldest start data for the start dates, if any.
}
return new ProgressSnapshot(moduleHandle, fileIngestRunning, fileIngestStartTime, this.cancelled);
return new ProgressSnapshot(moduleHandle, fileIngestIsRunning, fileIngestStartTime, this.cancelled);
}
/**
* Gets snapshots of the state and performance of this ingest job's child
* data source ingest jobs.
* Gets snapshots of the progress of each of this ingest job's child data
* source ingest jobs.
*
* @return A list of data source ingest job progress snapshots.
*/
synchronized List<DataSourceIngestJob.Snapshot> getDetailedSnapshot() { // RJCTODO: Consider renaming
synchronized List<DataSourceIngestJob.Snapshot> getDataSourceIngestJobSnapshots() {
List<DataSourceIngestJob.Snapshot> snapshots = new ArrayList<>();
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
snapshots.add(dataSourceJob.getSnapshot());
@ -107,21 +143,10 @@ public final class IngestJob {
}
/**
* Requests cancellation of a specific data source level ingest module.
* Returns immediately, but there may be a delay before the ingest module
* responds by stopping processing, if it is still running when the request
* is made.
*
* @param module The handle of the data source ingest module to be canceled,
* which can obtained from a progress snapshot.
*/
// RJCTODO
/**
* Requests cancellation of the data source level and file level ingest
* modules of this ingest job. Returns immediately, but there may be a delay
* before all of the ingest modules respond by stopping processing.
* Requests cancellation of this ingest job, which means discarding
* unfinished tasks and stopping the ingest pipelines. Returns immediately,
* but there may be a delay before all of the ingest modules in the
* pipelines respond by stopping processing.
*/
synchronized public void cancel() {
for (DataSourceIngestJob job : this.dataSourceJobs.values()) {
@ -131,8 +156,8 @@ public final class IngestJob {
}
/**
* Queries whether or not cancellation of the data source level and file
* level ingest modules of this ingest job has been requested.
* Queries whether or not cancellation of this ingest job has been
* requested.
*
* @return True or false.
*/
@ -141,7 +166,20 @@ public final class IngestJob {
}
/**
* A snapshot of ingest job progress.
* Provides a callback for completed data source ingest jobs, allowing this
* ingest job to notify the ingest manager when it is complete.
*
* @param dataSourceIngestJob A completed data source ingest job.
*/
synchronized void dataSourceJobFinished(DataSourceIngestJob dataSourceIngestJob) {
this.dataSourceJobs.remove(dataSourceIngestJob.getId());
if (this.dataSourceJobs.isEmpty()) {
IngestManager.getInstance().finishIngestJob(this);
}
}
/**
* A snapshot of the progress of an ingest job.
*/
public static final class ProgressSnapshot {
@ -154,11 +192,11 @@ public final class IngestJob {
* Constructs a snapshot of ingest job progress.
*
* @param dataSourceModule The currently running data source level
* ingest module, may be null
* ingest module, may be null.
* @param fileIngestRunning Whether or not file ingest is currently
* running.
* @param fileIngestStartTime The start time of file level ingest, may
* be null
* be null.
* @param cancelled Whether or not a cancellation request has been
* issued.
*/
@ -171,21 +209,17 @@ public final class IngestJob {
/**
* Gets a handle to the currently running data source level ingest
* module at the time the snapshot is taken.
* module at the time the snapshot was taken.
*
* @return The handle, may be null.
*/
public DataSourceIngestModuleHandle runningDataSourceIngestModule() {
/**
* It is safe to hand out this reference because the object is
* immutable.
*/
return this.dataSourceModule;
}
/**
* Queries whether or not file level ingest is running at the time the
* snapshot is taken.
* Queries whether or not file level ingest was running at the time the
* snapshot was taken.
*
* @return True or false.
*/
@ -203,7 +237,8 @@ public final class IngestJob {
}
/**
* Queries whether or not a cancellation request has been issued.
* Queries whether or not a cancellation request had been issued at the
* time the snapshot was taken.
*
* @return True or false.
*/
@ -215,21 +250,25 @@ public final class IngestJob {
/**
* A handle to a data source level ingest module that can be used to get
* basic information about the module and to request cancellation, i.e.,
* shut down, of the module.
* basic information about the module and to request cancellation of the
* module.
*/
public static class DataSourceIngestModuleHandle {
private final long dataSourceIngestJobId;
private final DataSourceIngestJob job;
private final DataSourceIngestPipeline.PipelineModule module;
/**
* Constructs a handle to a data source level ingest module that can be
* used to get basic information about the module and to request
* cancellation of the module.
*
* @param DataSourceIngestJob The data source ingest job that owns the
* data source level ingest module.
* @param module The data source level ingest module.
*/
private DataSourceIngestModuleHandle(long dataSourceIngestJobId, DataSourceIngestPipeline.PipelineModule module) {
this.dataSourceIngestJobId = dataSourceIngestJobId;
private DataSourceIngestModuleHandle(DataSourceIngestJob job, DataSourceIngestPipeline.PipelineModule module) {
this.job = job;
this.module = module;
}
@ -247,60 +286,34 @@ public final class IngestJob {
* Returns the time the data source level ingest module associated with
* this handle began processing.
*
* @return The module start time.
* @return The module processing start time.
*/
public Date startTime() {
return this.module.getStartTime();
return this.module.getProcessingStartTime();
}
/**
* Requests cancellation of the ingest module associated with this
* handle. Returns immediately, but there may be a delay before the
* ingest module responds by stopping processing.
*/
public void cancel() {
// RJCTODO:
}
}
/**
* Starts up the ingest pipelines and ingest progress bars for this job.
*
* @return A collection of ingest module start up errors, empty on success.
*/
List<IngestModuleError> start() {
boolean hasIngestPipeline = false;
List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
errors.addAll(dataSourceJob.start());
hasIngestPipeline = dataSourceJob.hasIngestPipeline();
}
return errors;
}
/**
* Checks to see if this ingest job has at least one ingest pipeline.
*
* @return True or false.
*/
boolean hasIngestPipeline() {
boolean hasIngestPipeline = false;
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
if (dataSourceJob.hasIngestPipeline()) {
hasIngestPipeline = true;
break;
/**
* TODO: Cancellation needs to be more precise. The long-term
* solution is to add a cancel() method to IngestModule and do away
* with the cancellation queries of IngestJobContext. However, until
* an API change is legal, a cancel() method can be added to the
* DataSourceIngestModuleAdapter and FileIngestModuleAdapter classes
* and an instanceof check can be used to call it, with this code as
* the default implementation and the fallback. All of the ingest
* modules participating in this workaround will need to consult the
* cancelled flag in the adapters.
*/
if (this.job.getCurrentDataSourceIngestModule() == this.module) {
this.job.cancelCurrentDataSourceIngestModule();
}
}
return hasIngestPipeline;
}
/**
* Provides a callback for completed data source ingest jobs, allowing the
* ingest job to notify the ingest manager when it is complete.
*
* @param dataSourceIngestJob A completed data source ingest job.
*/
synchronized void dataSourceJobFinished(DataSourceIngestJob dataSourceIngestJob) {
this.dataSourceJobs.remove(dataSourceIngestJob.getId());
if (this.dataSourceJobs.isEmpty()) {
IngestManager.getInstance().finishJob(this);
}
}
}

View File

@ -87,6 +87,6 @@ public final class IngestJobConfigurator {
*/
@Deprecated
public void startIngestJobs(List<Content> dataSources) {
IngestManager.getInstance().startIngestJob(dataSources, this.settings, true);
IngestManager.getInstance().queueIngestJob(dataSources, this.settings, true);
}
}

View File

@ -56,15 +56,17 @@ public class IngestManager {
private static IngestManager instance = null;
/**
* The ingest manager maintains a mapping of ingest job IDs to ingest jobs.
* The ingest manager maintains a mapping of ingest job IDs to running
* ingest jobs. This, in combination with a mapping of thread IDs to
* Callable ingest job starters, determines whether or not ingest is
* running.
*/
private final ConcurrentHashMap<Long, IngestJob> jobsById = new ConcurrentHashMap<>();
private final HashMap<Long, IngestJob> jobsById = new HashMap<>();
/**
* Each runnable/callable task the ingest manager submits to its thread
* pools is given a unique thread/task ID.
*/
// TODO: It is no longer necessary to have multiple thread pools.
private final AtomicLong nextThreadId = new AtomicLong(0L);
/**
@ -73,7 +75,7 @@ public class IngestManager {
* ingest job starter is maintained to provide handles that can be used to
* cancel the ingest job starter.
*/
private final ConcurrentHashMap<Long, Future<Void>> ingestJobStarters = new ConcurrentHashMap<>();
private final HashMap<Long, Future<Void>> ingestJobStarters = new HashMap<>();
private final ExecutorService startIngestJobsThreadPool = Executors.newSingleThreadExecutor();
/**
@ -83,11 +85,11 @@ public class IngestManager {
* ingest task executers. There is a single data source level ingest thread
* and a user configurable number of file level ingest threads.
*/
private final ExecutorService dataSourceIngestThreadPool = Executors.newSingleThreadExecutor();
private static final int MIN_NUMBER_OF_FILE_INGEST_THREADS = 1;
private static final int MAX_NUMBER_OF_FILE_INGEST_THREADS = 16;
private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2;
private int numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS;
private final ExecutorService dataSourceIngestThreadPool = Executors.newSingleThreadExecutor();
private int numberOfFileIngestThreads;
private final ExecutorService fileIngestThreadPool;
/**
@ -202,6 +204,31 @@ public class IngestManager {
return instance;
}
/**
* Constructs a manager of the creation and execution of ingest jobs, i.e.,
* the processing of data sources by ingest modules. The manager immediately
* submits ingest task executers (Callable objects) to the data source level
* ingest and file level ingest thread pools. The ingest task executers are
* simple consumers that will normally run as long as the application runs.
*/
private IngestManager() {
long threadId = nextThreadId.incrementAndGet();
dataSourceIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads();
if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS;
UserPreferences.setNumberOfFileIngestThreads(numberOfFileIngestThreads);
}
fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads);
for (int i = 0; i < numberOfFileIngestThreads; ++i) {
threadId = nextThreadId.incrementAndGet();
fileIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
}
/**
* Gets the number of file ingest threads the ingest manager will use to do
* ingest jobs.
@ -212,17 +239,127 @@ public class IngestManager {
return numberOfFileIngestThreads;
}
private void subscribeToCaseEvents() {
Case.addPropertyChangeListener(new PropertyChangeListener() {
@Override
public void propertyChange(PropertyChangeEvent event) {
if (event.getPropertyName().equals(Case.Events.CURRENT_CASE.toString())) {
if (event.getNewValue() != null) {
handleCaseOpened();
} else {
handleCaseClosed();
}
}
}
});
}
void handleCaseOpened() {
this.jobCreationIsEnabled = true;
clearIngestMessageBox();
}
void handleCaseClosed() {
this.jobCreationIsEnabled = false;
cancelAllIngestJobs();
clearIngestMessageBox();
}
/**
* Called by the custom installer for this package once the window system is
* initialized, allowing the ingest manager to get the top component used to
* display ingest messages.
*/
void initIngestMessageInbox() {
ingestMessageBox = IngestMessageTopComponent.findInstance();
}
/**
* Post a message to the ingest messages in box.
*
* @param message The message to be posted.
*/
// RJCTODO: Can I cut this off effectively?
void postIngestMessage(IngestMessage message) {
if (ingestMessageBox != null) {
if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
ingestMessageBox.displayMessage(message);
} else {
long errorPosts = ingestErrorMessagePosts.incrementAndGet();
if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
ingestMessageBox.displayMessage(message);
} else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
}
}
}
}
private void clearIngestMessageBox() {
if (ingestMessageBox != null) {
ingestMessageBox.clearMessages();
}
ingestErrorMessagePosts.set(0);
}
/**
* Queues an ingest job that will process a collection of data sources. The
* job will be started on a worker thread.
*
* @param dataSources The data sources to process.
* @param settings The settings for the ingest job.
* @param runInteractively Whether or not this job should use progress bars,
* message boxes for errors, etc.
*/
public synchronized void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings, boolean runInteractively) {
if (this.jobCreationIsEnabled) {
IngestJob job = new IngestJob(dataSources, settings, runInteractively);
if (job.hasIngestPipeline()) {
long taskId = nextThreadId.incrementAndGet();
Future<Void> task = startIngestJobsThreadPool.submit(new IngestJobStarter(taskId, job, runInteractively));
ingestJobStarters.put(taskId, task);
}
}
}
/**
* Starts an ingest job that will process a collection of data sources.
*
* @param dataSources The data sources to process.
* @param settings The settings for the ingest job.
* @param runInteractively Whether or not this job should use progress bars,
* message boxes for errors, etc.
* @return The ingest job that was started or null if the job could not be
* started.
*/
public synchronized IngestJob startIngestJob(Collection<Content> dataSources, IngestJobSettings settings, boolean runInteractively) {
IngestJob job = null;
if (this.jobCreationIsEnabled) {
job = new IngestJob(dataSources, settings, runInteractively);
if (job.hasIngestPipeline()) {
List<IngestModuleError> errors = this.startIngestJob(job, runInteractively);
if (!errors.isEmpty()) {
job = null;
}
}
}
return job;
}
/**
* Starts an ingest job for a collection of data sources.
*
* @param dataSources The data sources to be processed.
* @param settings The ingest job settings.
* @param doUI Whether or not to support user interaction, e.g., showing
* message boxes and reporting progress through the NetBeans Progress API.
* @return The ingest job that was started
* @param dataSource The data sources to ingest.
* @param settings The settings for the job.
* @param runInteractively Whether or not to interact with the UI
* @return A collection of ingest module start up errors, empty on success.
*/
public synchronized void startIngestJob(Collection<Content> dataSources, IngestJobSettings settings, boolean doUI) {
if (!isIngestRunning()) {
private List<IngestModuleError> startIngestJob(IngestJob job, boolean runInteractively) {
if (runInteractively && jobsById.isEmpty()) { // RJCTODO: This is sort of broken
clearIngestMessageBox();
}
@ -230,16 +367,25 @@ public class IngestManager {
ingestMonitor.start();
}
if (doUI) {
/**
* Assume request is from code running on the EDT and dispatch to a
* worker thread.
*/
long taskId = nextThreadId.incrementAndGet();
Future<Void> task = startIngestJobsThreadPool.submit(new IngestJobStarter(taskId, dataSources, settings, doUI));
ingestJobStarters.put(taskId, task);
List<IngestModuleError> errors = job.start();
if (errors.isEmpty()) {
long jobId = job.getId();
this.jobsById.put(jobId, job);
this.fireIngestJobStarted(jobId);
IngestManager.logger.log(Level.INFO, "Ingest job {0} started", jobId);
}
return errors;
}
void finishIngestJob(IngestJob job) {
long jobId = job.getId();
this.jobsById.remove(jobId);
if (!job.isCancelled()) {
IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId);
this.fireIngestJobCompleted(jobId);
} else {
this.startJob(dataSources, settings, doUI);
IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId);
this.fireIngestJobCancelled(jobId);
}
}
@ -249,9 +395,7 @@ public class IngestManager {
* @return True or false.
*/
public boolean isIngestRunning() {
// RJCTODO: This may return the wrong answer if an IngestJobStarter has
// been dispatched to the startIngestJobsThreadPool.
return !this.jobsById.isEmpty();
return !this.jobsById.isEmpty() || !ingestJobStarters.values().isEmpty();
}
/**
@ -332,123 +476,59 @@ public class IngestManager {
}
/**
* Constructs a manager of the creation and execution of ingest jobs, i.e.,
* the processing of data sources by ingest modules. The manager immediately
* submits ingest task executers (Callable objects) to the data source level
* ingest and file level ingest thread pools. The ingest task executers are
* simple consumers that will normally run as long as the application runs.
*/
private IngestManager() {
startDataSourceIngestThread();
numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads();
if ((numberOfFileIngestThreads < MIN_NUMBER_OF_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUMBER_OF_FILE_INGEST_THREADS)) {
numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS;
UserPreferences.setNumberOfFileIngestThreads(numberOfFileIngestThreads);
}
fileIngestThreadPool = Executors.newFixedThreadPool(numberOfFileIngestThreads);
for (int i = 0; i < numberOfFileIngestThreads; ++i) {
startFileIngestThread();
}
}
/**
* Called by the custom installer for this package once the window system is
* initialized, allowing the ingest manager to get the top component used to
* display ingest messages.
*/
void initIngestMessageInbox() {
ingestMessageBox = IngestMessageTopComponent.findInstance();
}
/**
* Submits an ingest task executer Callable to the data source level ingest
* thread pool.
*/
private void startDataSourceIngestThread() {
long threadId = nextThreadId.incrementAndGet();
dataSourceIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
/**
* Submits a ingest task executer Callable to the file level ingest thread
* pool.
*/
private void startFileIngestThread() {
long threadId = nextThreadId.incrementAndGet();
fileIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
}
private void subscribeToCaseEvents() {
Case.addPropertyChangeListener(new PropertyChangeListener() {
@Override
public void propertyChange(PropertyChangeEvent event) {
if (event.getPropertyName().equals(Case.Events.CURRENT_CASE.toString())) {
if (event.getNewValue() != null) {
handleCaseOpened();
} else {
handleCaseClosed();
}
}
}
});
}
void handleCaseOpened() {
this.jobCreationIsEnabled = true;
clearIngestMessageBox();
}
void handleCaseClosed() {
this.jobCreationIsEnabled = false;
cancelAllIngestJobs();
clearIngestMessageBox();
}
private void clearIngestMessageBox() {
if (ingestMessageBox != null) {
ingestMessageBox.clearMessages();
}
ingestErrorMessagePosts.set(0);
}
/**
* Starts an ingest job for a collection of data sources.
* Fire an ingest event signifying an ingest job started.
*
* @param dataSource The data sources to ingest.
* @param settings The settings for the job.
* @param doUI Whether or not to interact with the UI
* @return A collection of ingest module start up errors, empty on success.
* @param ingestJobId The ingest job id.
*/
private List<IngestModuleError> startJob(Collection<Content> dataSources, IngestJobSettings settings, boolean doUI) {
List<IngestModuleError> errors = new ArrayList<>();
if (this.jobCreationIsEnabled) {
IngestJob job = new IngestJob(dataSources, settings, doUI);
long jobId = job.getId();
this.jobsById.put(jobId, job);
errors = job.start();
if (errors.isEmpty() && job.hasIngestPipeline()) {
this.fireIngestJobStarted(jobId);
IngestManager.logger.log(Level.INFO, "Ingest job {0} started", jobId);
} else {
this.jobsById.remove(jobId);
}
}
return errors;
void fireIngestJobStarted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null));
}
void finishJob(IngestJob job) {
long jobId = job.getId();
this.jobsById.remove(jobId);
if (!job.isCancelled()) {
IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId);
this.fireIngestJobCompleted(jobId);
} else {
IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId);
this.fireIngestJobCancelled(jobId);
}
/**
* Fire an ingest event signifying an ingest job finished.
*
* @param ingestJobId The ingest job id.
*/
void fireIngestJobCompleted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null));
}
/**
* Fire an ingest event signifying an ingest job was canceled.
*
* @param ingestJobId The ingest job id.
*/
void fireIngestJobCancelled(long ingestJobId) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null));
}
/**
* Fire an ingest event signifying the ingest of a file is completed.
*
* @param file The file that is completed.
*/
void fireFileIngestDone(AbstractFile file) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file));
}
/**
* Fire an event signifying a blackboard post by an ingest module.
*
* @param moduleDataEvent A ModuleDataEvent with the details of the posting.
*/
void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null));
}
/**
* Fire an event signifying discovery of additional content by an ingest
* module.
*
* @param moduleDataEvent A ModuleContentEvent with the details of the new
* content.
*/
void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null));
}
/**
@ -540,83 +620,16 @@ public class IngestManager {
}
/**
* Fire an ingest event signifying an ingest job started.
* Gets snapshots of the state of all running ingest jobs.
*
* @param ingestJobId The ingest job id.
* @return A list of ingest job state snapshots.
*/
void fireIngestJobStarted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null));
}
/**
* Fire an ingest event signifying an ingest job finished.
*
* @param ingestJobId The ingest job id.
*/
void fireIngestJobCompleted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null));
}
/**
* Fire an ingest event signifying an ingest job was canceled.
*
* @param ingestJobId The ingest job id.
*/
void fireIngestJobCancelled(long ingestJobId) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null));
}
/**
* Fire an ingest event signifying the ingest of a file is completed.
*
* @param file The file that is completed.
*/
void fireFileIngestDone(AbstractFile file) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file));
}
/**
* Fire an event signifying a blackboard post by an ingest module.
*
* @param moduleDataEvent A ModuleDataEvent with the details of the posting.
*/
void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null));
}
/**
* Fire an event signifying discovery of additional content by an ingest
* module.
*
* @param moduleDataEvent A ModuleContentEvent with the details of the new
* content.
*/
void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null));
}
/**
* Post a message to the ingest messages in box.
*
* @param message The message to be posted.
*/
void postIngestMessage(IngestMessage message) {
if (ingestMessageBox != null) {
if (message.getMessageType() != IngestMessage.MessageType.ERROR && message.getMessageType() != IngestMessage.MessageType.WARNING) {
ingestMessageBox.displayMessage(message);
} else {
long errorPosts = ingestErrorMessagePosts.incrementAndGet();
if (errorPosts <= MAX_ERROR_MESSAGE_POSTS) {
ingestMessageBox.displayMessage(message);
} else if (errorPosts == MAX_ERROR_MESSAGE_POSTS + 1) {
IngestMessage errorMessageLimitReachedMessage = IngestMessage.createErrorMessage(
NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.title"),
NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.subject"),
NbBundle.getMessage(this.getClass(), "IngestManager.IngestMessage.ErrorMessageLimitReached.msg", MAX_ERROR_MESSAGE_POSTS));
ingestMessageBox.displayMessage(errorMessageLimitReachedMessage);
}
}
List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
List<DataSourceIngestJob.Snapshot> snapShots = new ArrayList<>();
for (IngestJob job : this.jobsById.values()) {
snapShots.addAll(job.getDataSourceIngestJobSnapshots());
}
return snapShots;
}
/**
@ -633,73 +646,56 @@ public class IngestManager {
}
}
/**
* Gets snapshots of the state of all running ingest jobs.
*
* @return A list of ingest job state snapshots.
*/
List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() {
List<DataSourceIngestJob.Snapshot> snapShots = new ArrayList<>();
for (IngestJob job : this.jobsById.values()) {
snapShots.addAll(job.getDetailedSnapshot());
}
return snapShots;
}
/**
* Creates and starts an ingest job for a collection of data sources.
*/
private final class IngestJobStarter implements Callable<Void> {
private final long threadId;
private final Collection<Content> dataSources;
private final IngestJobSettings settings;
private final boolean doStartupErrorsMsgBox;
private final IngestJob job;
private final boolean runInteractively;
private ProgressHandle progress;
IngestJobStarter(long threadId, Collection<Content> dataSources, IngestJobSettings settings, boolean doMessageDialogs) {
IngestJobStarter(long threadId, IngestJob job, boolean runInteractively) {
this.threadId = threadId;
this.dataSources = dataSources;
this.settings = settings;
this.doStartupErrorsMsgBox = doMessageDialogs;
this.job = job;
this.runInteractively = runInteractively;
}
@Override
public Void call() {
try {
/**
* Bail out if there is nothing to do or cancellation has been
* requested.
*/
if (this.dataSources.isEmpty() || Thread.currentThread().isInterrupted()) {
if (Thread.currentThread().isInterrupted()) {
return null;
}
/**
* Set up a progress bar.
*/
final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
@Override
public boolean cancel() {
if (progress != null) {
progress.setDisplayName(NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling",
displayName));
if (runInteractively) {
final String displayName = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.displayName");
this.progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
@Override
public boolean cancel() {
if (progress != null) {
progress.setDisplayName(NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.cancelling",
displayName));
}
Future<?> handle = ingestJobStarters.remove(threadId);
handle.cancel(true);
return true;
}
Future<?> handle = ingestJobStarters.remove(threadId);
handle.cancel(true);
return true;
}
});
progress.start();
});
progress.start();
}
/**
* Try to start the ingest job.
*/
List<IngestModuleError> errors = IngestManager.this.startJob(this.dataSources, this.settings, true);
if (!errors.isEmpty() && this.doStartupErrorsMsgBox) {
List<IngestModuleError> errors = IngestManager.this.startIngestJob(job, runInteractively);
if (!errors.isEmpty() && this.runInteractively) {
StringBuilder moduleStartUpErrors = new StringBuilder();
for (IngestModuleError error : errors) {
String moduleName = error.getModuleDisplayName();
@ -867,6 +863,7 @@ public class IngestManager {
String getFileName() {
return fileName;
}
}
}

View File

@ -81,4 +81,10 @@ public interface IngestModule {
* @throws org.sleuthkit.autopsy.ingest.IngestModule.IngestModuleException
*/
void startUp(IngestJobContext context) throws IngestModuleException;
/**
* TODO: The next time an API change is legal, add a cancel() method and
* remove the "ingest job is canceled" queries from the IngestJobContext
* class.
*/
}

View File

@ -202,7 +202,7 @@ public class IngestProgressSnapshotPanel extends javax.swing.JPanel {
break;
case 2:
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
cellValue = dateFormat.format(new Date(snapShot.getStartTime()));
cellValue = dateFormat.format(new Date(snapShot.getJobStartTime()));
break;
case 3:
cellValue = snapShot.getFilesProcessed();

View File

@ -199,7 +199,7 @@ public final class RunIngestModulesDialog extends JDialog {
ingestJobSettings.save();
showWarnings(ingestJobSettings);
if (startIngestJob) {
IngestManager.getInstance().startIngestJob(RunIngestModulesDialog.this.dataSources, ingestJobSettings, true);
IngestManager.getInstance().queueIngestJob(RunIngestModulesDialog.this.dataSources, ingestJobSettings, true);
}
setVisible(false);
dispose();