Merge pull request #990 from rcordovano/ingest_api_for_automation

Restore batching of ingest jobs via IngestManager API
This commit is contained in:
Richard Cordovano 2014-12-09 00:00:44 -05:00
commit 35aded5ca5
5 changed files with 120 additions and 98 deletions

View File

@ -211,10 +211,7 @@ class AddImageWizardIngestConfigPanel implements WizardDescriptor.Panel<WizardDe
private void startIngest() { private void startIngest() {
if (!newContents.isEmpty() && readyToIngest && !ingested) { if (!newContents.isEmpty() && readyToIngest && !ingested) {
ingested = true; ingested = true;
IngestManager ingestManager = IngestManager.getInstance(); IngestManager.getInstance().startIngestJobs(newContents, ingestJobSettingsPanel.getSettings(), true);
for (Content content : newContents) {
ingestManager.startIngestJob(content, ingestJobSettingsPanel.getSettings(), true);
}
progressPanel.setStateFinished(); progressPanel.setStateFinished();
} }
} }

View File

@ -49,6 +49,7 @@ IngestMessageTopComponent.msgDlg.ingestRpt.text=Ingest Report
IngestMonitor.mgrErrMsg.lowDiskSpace.title=Ingest stopped - low disk space on {0} IngestMonitor.mgrErrMsg.lowDiskSpace.title=Ingest stopped - low disk space on {0}
IngestMonitor.mgrErrMsg.lowDiskSpace.msg=Stopping ingest due to low disk space on disk {0}. \nEnsure the Case drive has at least 1GB free space and restart ingest. IngestMonitor.mgrErrMsg.lowDiskSpace.msg=Stopping ingest due to low disk space on disk {0}. \nEnsure the Case drive has at least 1GB free space and restart ingest.
IngestManager.StartIngestJobsTask.run.displayName=Starting ingest IngestManager.StartIngestJobsTask.run.displayName=Starting ingest
IngestManager.StartIngestJobsTask.run.progressingDisplayName=Starting ingest of {0}
IngestManager.StartIngestJobsTask.run.cancelling={0} (Cancelling...) IngestManager.StartIngestJobsTask.run.cancelling={0} (Cancelling...)
IngestMessagePanel.sortByComboBox.model.time=Time IngestMessagePanel.sortByComboBox.model.time=Time
IngestMessagePanel.sortByComboBox.model.priority=Priority IngestMessagePanel.sortByComboBox.model.priority=Priority

View File

@ -38,8 +38,8 @@ public final class IngestJobConfigurator {
/** /**
* Constructs an ingest job launcher that creates and persists ingest job * Constructs an ingest job launcher that creates and persists ingest job
* settings for a particular context and launches ingest jobs that * settings for a particular context and launches ingest jobs that process
* process one or more data sources using the settings. * one or more data sources using the settings.
* *
* @param context The context identifier. * @param context The context identifier.
*/ */
@ -50,8 +50,8 @@ public final class IngestJobConfigurator {
} }
/** /**
* Gets any warnings generated when the persisted ingest job settings * Gets any warnings generated when the persisted ingest job settings for
* for the specified context are loaded or saved. * the specified context are loaded or saved.
* *
* @return A collection of warning messages, possibly empty. * @return A collection of warning messages, possibly empty.
*/ */
@ -87,9 +87,6 @@ public final class IngestJobConfigurator {
*/ */
@Deprecated @Deprecated
public void startIngestJobs(List<Content> dataSources) { public void startIngestJobs(List<Content> dataSources) {
IngestManager ingestManager = IngestManager.getInstance(); IngestManager.getInstance().startIngestJobs(dataSources, this.settings, true);
for (Content dataSource : dataSources) {
ingestManager.startIngestJob(dataSource, this.settings, true);
}
} }
} }

View File

@ -22,6 +22,7 @@ import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener; import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport; import java.beans.PropertyChangeSupport;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -46,8 +47,8 @@ import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
/** /**
* Manages the creation and execution of ingest jobs, i.e., processing of data * Manages the creation and execution of ingest jobs, i.e., the processing of
* sources by ingest modules. * data sources by ingest modules.
*/ */
public class IngestManager { public class IngestManager {
@ -65,7 +66,7 @@ public class IngestManager {
private final ExecutorService fileIngestThreadPool; private final ExecutorService fileIngestThreadPool;
private final ExecutorService fireIngestEventsThreadPool = Executors.newSingleThreadExecutor(); private final ExecutorService fireIngestEventsThreadPool = Executors.newSingleThreadExecutor();
private final AtomicLong nextThreadId = new AtomicLong(0L); private final AtomicLong nextThreadId = new AtomicLong(0L);
private final ConcurrentHashMap<Long, Future<Void>> startIngestJobsCallables = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles. private final ConcurrentHashMap<Long, Future<Void>> ingestJobStarters = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles.
private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L); private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); // Maps ingest thread ids to progress ingestThreadActivitySnapshots. private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); // Maps ingest thread ids to progress ingestThreadActivitySnapshots.
private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>();
@ -129,9 +130,10 @@ public class IngestManager {
}; };
/** /**
* Gets the ingest manager. * Gets the manager of the creation and execution of ingest jobs, i.e., the
* processing of data sources by ingest modules.
* *
* @return A singleton IngestManager object. * @return A singleton ingest manager object.
*/ */
public synchronized static IngestManager getInstance() { public synchronized static IngestManager getInstance() {
if (instance == null) { if (instance == null) {
@ -156,15 +158,14 @@ public class IngestManager {
} }
/** /**
* Starts an ingest job, i.e., processing by ingest modules, for a data * Starts an ingest job, i.e., processing by ingest modules, for each data
* source. * source in a collection of data sources.
* *
* @param dataSource The data source to be processed. * @param dataSources The data sources to be processed.
* @param settings The ingest job settings. * @param settings The ingest job settings.
* @param doStartupErrorsMsgBox Whether or not to display ingest module * @param doMessageBoxes Whether or not to display message boxes for errors.
* startup errors in a message box.
*/ */
public synchronized void startIngestJob(Content dataSource, IngestJobSettings settings, boolean doStartupErrorsMsgBox) { public synchronized void startIngestJobs(Collection<Content> dataSources, IngestJobSettings settings, boolean doMessageBoxes) {
if (!isIngestRunning()) { if (!isIngestRunning()) {
clearIngestMessageBox(); clearIngestMessageBox();
} }
@ -174,12 +175,12 @@ public class IngestManager {
} }
long taskId = nextThreadId.incrementAndGet(); long taskId = nextThreadId.incrementAndGet();
Future<Void> task = startIngestJobsThreadPool.submit(new StartIngestJobsCallable(taskId, dataSource, settings, doStartupErrorsMsgBox)); Future<Void> task = startIngestJobsThreadPool.submit(new IngestJobsStarter(taskId, dataSources, settings, doMessageBoxes));
startIngestJobsCallables.put(taskId, task); ingestJobStarters.put(taskId, task);
} }
/** /**
* Queries whether any ingest jobs are in progress. * Queries whether or not any ingest jobs are in progress.
* *
* @return True or false. * @return True or false.
*/ */
@ -192,7 +193,7 @@ public class IngestManager {
*/ */
public void cancelAllIngestJobs() { public void cancelAllIngestJobs() {
// Stop creating new ingest jobs. // Stop creating new ingest jobs.
for (Future<Void> handle : startIngestJobsCallables.values()) { for (Future<Void> handle : ingestJobStarters.values()) {
handle.cancel(true); handle.cancel(true);
} }
@ -250,7 +251,7 @@ public class IngestManager {
} }
/** /**
* Remove an ingest job and ingest module event property change listener. * Removes an ingest job and ingest module event property change listener.
* *
* @param listener The PropertyChangeListener to unregister. * @param listener The PropertyChangeListener to unregister.
* @deprecated Use removeIngestJobEventListener() and/or * @deprecated Use removeIngestJobEventListener() and/or
@ -263,10 +264,11 @@ public class IngestManager {
} }
/** /**
* Starts the ingest monitor and submits task execution tasks (Callable * Constructs a manager of the creation and execution of ingest jobs, i.e.,
* objects) to the data source ingest and file ingest thread pools. The task * the processing of data sources by ingest modules. The manager immediately
* execution tasks are simple consumers that will normally run as long as * submits ingest task executers (Callable objects) to the data source level
* the application runs * ingest and file level ingest thread pools. The ingest task executers are
* simple consumers that will normally run as long as the application runs.
*/ */
private IngestManager() { private IngestManager() {
startDataSourceIngestThread(); startDataSourceIngestThread();
@ -292,22 +294,22 @@ public class IngestManager {
} }
/** /**
* Submits a ExecuteIngestTasksTask Callable to the data source ingest task * Submits an ingest task executer Callable to the data source level ingest
* thread pool. * thread pool.
*/ */
private void startDataSourceIngestThread() { private void startDataSourceIngestThread() {
long threadId = nextThreadId.incrementAndGet(); long threadId = nextThreadId.incrementAndGet();
dataSourceIngestThreadPool.submit(new ExecuteIngestTasksRunnable(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); dataSourceIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
} }
/** /**
* Submits a ExecuteIngestTasksTask Callable to the data source ingest * Submits a ingest task executer Callable to the file level ingest thread
* thread pool. * pool.
*/ */
private void startFileIngestThread() { private void startFileIngestThread() {
long threadId = nextThreadId.incrementAndGet(); long threadId = nextThreadId.incrementAndGet();
fileIngestThreadPool.submit(new ExecuteIngestTasksRunnable(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue())); fileIngestThreadPool.submit(new IngestTaskExecuter(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
} }
@ -442,7 +444,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id. * @param ingestJobId The ingest job id.
*/ */
void fireIngestJobStarted(long ingestJobId) { void fireIngestJobStarted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null)); fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.STARTED, ingestJobId, null));
} }
/** /**
@ -451,7 +453,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id. * @param ingestJobId The ingest job id.
*/ */
void fireIngestJobCompleted(long ingestJobId) { void fireIngestJobCompleted(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null)); fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.COMPLETED, ingestJobId, null));
} }
/** /**
@ -460,7 +462,7 @@ public class IngestManager {
* @param ingestJobId The ingest job id. * @param ingestJobId The ingest job id.
*/ */
void fireIngestJobCancelled(long ingestJobId) { void fireIngestJobCancelled(long ingestJobId) {
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null)); fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestJobEventPublisher, IngestJobEvent.CANCELLED, ingestJobId, null));
} }
/** /**
@ -469,7 +471,7 @@ public class IngestManager {
* @param file The file that is completed. * @param file The file that is completed.
*/ */
void fireFileIngestDone(AbstractFile file) { void fireFileIngestDone(AbstractFile file) {
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file)); fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.FILE_DONE, file.getId(), file));
} }
/** /**
@ -478,7 +480,7 @@ public class IngestManager {
* @param moduleDataEvent A ModuleDataEvent with the details of the posting. * @param moduleDataEvent A ModuleDataEvent with the details of the posting.
*/ */
void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) { void fireIngestModuleDataEvent(ModuleDataEvent moduleDataEvent) {
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null)); fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.DATA_ADDED, moduleDataEvent, null));
} }
/** /**
@ -489,7 +491,7 @@ public class IngestManager {
* content. * content.
*/ */
void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) { void fireIngestModuleContentEvent(ModuleContentEvent moduleContentEvent) {
fireIngestEventsThreadPool.submit(new FireIngestEventRunnable(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null)); fireIngestEventsThreadPool.submit(new IngestEventPublisher(ingestModuleEventPublisher, IngestModuleEvent.CONTENT_CHANGED, moduleContentEvent, null));
} }
/** /**
@ -532,27 +534,31 @@ public class IngestManager {
/** /**
* Creates and starts an ingest job, i.e., processing by ingest modules, for * Creates and starts an ingest job, i.e., processing by ingest modules, for
* a data source. * each data source in a collection of data sources.
*/ */
private final class StartIngestJobsCallable implements Callable<Void> { private final class IngestJobsStarter implements Callable<Void> {
private final long threadId; private final long threadId;
private final Content dataSource; private final Collection<Content> dataSources;
private final IngestJobSettings settings; private final IngestJobSettings settings;
private final boolean doStartupErrorsMsgBox; private final boolean doStartupErrorsMsgBox;
private ProgressHandle progress; private ProgressHandle progress;
StartIngestJobsCallable(long threadId, Content dataSource, IngestJobSettings settings, boolean doStartupErrorsMsgBox) { IngestJobsStarter(long threadId, Collection<Content> dataSources, IngestJobSettings settings, boolean doMessageDialogs) {
this.threadId = threadId; this.threadId = threadId;
this.dataSource = dataSource; this.dataSources = dataSources;
this.settings = settings; this.settings = settings;
this.doStartupErrorsMsgBox = doStartupErrorsMsgBox; this.doStartupErrorsMsgBox = doMessageDialogs;
} }
@Override @Override
public Void call() { public Void call() {
try { try {
if (Thread.currentThread().isInterrupted()) { /**
* Bail out if there is nothing to do or cancellation has been
* requested.
*/
if (this.dataSources.isEmpty() || Thread.currentThread().isInterrupted()) {
return null; return null;
} }
@ -569,63 +575,89 @@ public class IngestManager {
"IngestManager.StartIngestJobsTask.run.cancelling", "IngestManager.StartIngestJobsTask.run.cancelling",
displayName)); displayName));
} }
Future<?> handle = startIngestJobsCallables.remove(threadId); Future<?> handle = ingestJobStarters.remove(threadId);
handle.cancel(true); handle.cancel(true);
return true; return true;
} }
}); });
progress.switchToIndeterminate(); progress.start(dataSources.size());
progress.start();
/** /**
* Create and start an ingest job for the data source. * Try to start the ingest jobs.
*/ */
List<IngestModuleError> errors = IngestJob.startJob(dataSource, this.settings); int workUnitsCompleted = 0;
if (!errors.isEmpty() && this.doStartupErrorsMsgBox) { for (Content dataSource : this.dataSources) {
// Report the errors to the user. They have already been logged.
StringBuilder moduleStartUpErrors = new StringBuilder();
for (IngestModuleError error : errors) {
String moduleName = error.getModuleDisplayName();
moduleStartUpErrors.append(moduleName);
moduleStartUpErrors.append(": ");
moduleStartUpErrors.append(error.getModuleError().getLocalizedMessage());
moduleStartUpErrors.append("\n");
}
StringBuilder notifyMessage = new StringBuilder();
notifyMessage.append(NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgMsg"));
notifyMessage.append("\n");
notifyMessage.append(NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgSolution"));
notifyMessage.append("\n");
notifyMessage.append(NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgErrorList",
moduleStartUpErrors.toString()));
notifyMessage.append("\n\n");
JOptionPane.showMessageDialog(null, notifyMessage.toString(),
NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgTitle"), JOptionPane.ERROR_MESSAGE);
}
} catch (Exception ex) {
logger.log(Level.SEVERE, "Failed to create ingest job", ex); //NON-NLS
} finally {
progress.finish();
startIngestJobsCallables.remove(threadId);
}
return null; /**
* Cancellation check.
*/
if (Thread.currentThread().isInterrupted()) {
return null;
}
/**
* Add a "subtitle" to the display name of the progress bar
* to indicate an ingest job is being started for this data
* source.
*/
String progressMessage = NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.progressingDisplayName",
dataSource.getName());
progress.progress(progressMessage);
/**
* Start an ingest job for this data source.
*/
List<IngestModuleError> errors = IngestJob.startJob(dataSource, this.settings);
if (!errors.isEmpty() && this.doStartupErrorsMsgBox) {
StringBuilder moduleStartUpErrors = new StringBuilder();
for (IngestModuleError error : errors) {
String moduleName = error.getModuleDisplayName();
moduleStartUpErrors.append(moduleName);
moduleStartUpErrors.append(": ");
moduleStartUpErrors.append(error.getModuleError().getLocalizedMessage());
moduleStartUpErrors.append("\n");
}
StringBuilder notifyMessage = new StringBuilder();
notifyMessage.append(NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgMsg"));
notifyMessage.append("\n");
notifyMessage.append(NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgSolution"));
notifyMessage.append("\n");
notifyMessage.append(NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgErrorList",
moduleStartUpErrors.toString()));
notifyMessage.append("\n\n");
JOptionPane.showMessageDialog(null, notifyMessage.toString(),
NbBundle.getMessage(this.getClass(),
"IngestManager.StartIngestJobsTask.run.startupErr.dlgTitle"), JOptionPane.ERROR_MESSAGE);
}
progress.progress(progressMessage, ++workUnitsCompleted);
}
return null;
} finally {
if (null != progress) {
progress.finish();
}
ingestJobStarters.remove(threadId);
}
} }
} }
/** /**
* A consumer for an ingest task queue. * A consumer for an ingest task queue.
*/ */
private final class ExecuteIngestTasksRunnable implements Runnable { private final class IngestTaskExecuter implements Runnable {
private final long threadId; private final long threadId;
private final IngestTaskQueue tasks; private final IngestTaskQueue tasks;
ExecuteIngestTasksRunnable(long threadId, IngestTaskQueue tasks) { IngestTaskExecuter(long threadId, IngestTaskQueue tasks) {
this.threadId = threadId; this.threadId = threadId;
this.tasks = tasks; this.tasks = tasks;
} }
@ -649,7 +681,7 @@ public class IngestManager {
/** /**
* Fires ingest events to ingest manager property change listeners. * Fires ingest events to ingest manager property change listeners.
*/ */
private static final class FireIngestEventRunnable implements Runnable { private static final class IngestEventPublisher implements Runnable {
private final PropertyChangeSupport publisher; private final PropertyChangeSupport publisher;
private final IngestJobEvent jobEvent; private final IngestJobEvent jobEvent;
@ -657,7 +689,7 @@ public class IngestManager {
private final Object oldValue; private final Object oldValue;
private final Object newValue; private final Object newValue;
FireIngestEventRunnable(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) { IngestEventPublisher(PropertyChangeSupport publisher, IngestJobEvent event, Object oldValue, Object newValue) {
this.publisher = publisher; this.publisher = publisher;
this.jobEvent = event; this.jobEvent = event;
this.moduleEvent = null; this.moduleEvent = null;
@ -665,7 +697,7 @@ public class IngestManager {
this.newValue = newValue; this.newValue = newValue;
} }
FireIngestEventRunnable(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) { IngestEventPublisher(PropertyChangeSupport publisher, IngestModuleEvent event, Object oldValue, Object newValue) {
this.publisher = publisher; this.publisher = publisher;
this.jobEvent = null; this.jobEvent = null;
this.moduleEvent = event; this.moduleEvent = event;

View File

@ -198,14 +198,9 @@ public final class RunIngestModulesDialog extends JDialog {
IngestJobSettings ingestJobSettings = this.ingestJobSettingsPanel.getSettings(); IngestJobSettings ingestJobSettings = this.ingestJobSettingsPanel.getSettings();
ingestJobSettings.save(); ingestJobSettings.save();
showWarnings(ingestJobSettings); showWarnings(ingestJobSettings);
if (startIngestJob) { if (startIngestJob) {
IngestManager ingestManager = IngestManager.getInstance(); IngestManager.getInstance().startIngestJobs(RunIngestModulesDialog.this.dataSources, ingestJobSettings, true);
for (Content dataSource : RunIngestModulesDialog.this.dataSources) {
ingestManager.startIngestJob(dataSource, ingestJobSettings, true);
}
} }
setVisible(false); setVisible(false);
dispose(); dispose();
} }