Revise CollaborationMonitor

Richard Cordovano 2015-06-05 13:49:34 -04:00
parent a16af5c208
commit 40520cf312
4 changed files with 192 additions and 128 deletions

View File

@@ -47,96 +47,82 @@ import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent;
 import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent;
 
 /**
- * Listens to local events and tracks them as collaboration tasks that can be
- * broadcast to collaborating nodes, receives event messages from collaborating
- * nodes and translates them into case activity indicators for the user, and
- * monitors the health of the services needed for collaboration so that users
- * can be informed if those service become unavailable.
+ * A collaboration monitor listens to local events and represents them as
+ * collaboration tasks that are broadcast to collaborating nodes, informs the
+ * user of collaboration tasks on other nodes using progress bars, and monitors
+ * the health of the key collaboration services.
  */
-// TODO: This class probably has too many responsibilities!
 final class CollaborationMonitor {
 
     private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events";
     private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT";
     private static final Set<String> CASE_EVENTS_OF_INTEREST = new HashSet<>(Arrays.asList(new String[]{Case.Events.ADDING_DATA_SOURCE.toString(), Case.Events.DATA_SOURCE_ADDED.toString()}));
-    private static final String HEARTBEAT_THREAD_NAME = "collab-monitor-heartbeat-%d";
-    private static final long MINUTES_BETWEEN_HEARTBEATS = 1;
-    private static final String STALE_TASKS_DETECTION_THREAD_NAME = "collab-monitor-stale-remote-tasks-detector-%d";
-    private static final long STALE_TASKS_DETECTION_INTERVAL_SECS = 60;
-    private static final String CRASH_DETECTION_THREAD_NAME = "collab-monitor-crash-detector-%d";
-    private static final long CRASH_DETECTION_INTERVAL_SECS = 60;
+    private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 1;
+    private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d";
+    private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
+    private static final long MAX_MISSED_HEARTBEATS = 5;
+    private static final long STALE_TASKS_DETECTION_INTERVAL_MINUTES = 2;
+    private static final long CRASH_DETECTION_INTERVAL_MINUTES = 5;
     private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
     private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
     private final String hostName;
     private final LocalTasksManager localTasksManager;
     private final RemoteTasksManager remoteTasksManager;
     private final AutopsyEventPublisher eventPublisher;
-    private final ScheduledThreadPoolExecutor heartbeatExecutor;
-    private final ScheduledThreadPoolExecutor staleTasksCheckExecutor;
-    private final ScheduledThreadPoolExecutor crashDetectionExecutor;
+    private final ScheduledThreadPoolExecutor periodicTasksExecutor;
 
     /**
-     * Constructs an object that listens to local events and tracks them as
-     * collaboration tasks that can be broadcast to collaborating nodes,
-     * receives event messages from collaborating nodes and translates them into
-     * case activity indicators for the user, and monitors the health of the
-     * services needed for collaboration so that users can be informed if those
-     * service become unavailable.
+     * Constructs a collaboration monitor that listens to local events and
+     * represents them as collaboration tasks that are broadcast to
+     * collaborating nodes, informs the user of collaboration tasks on other
+     * nodes using progress bars, and monitors the health of the key
+     * collaboration services.
      */
     CollaborationMonitor() throws CollaborationMonitorException {
-        /**
-         * Get the local host name so it can be used to identify the source of
-         * collaboration tasks broadcast by this node.
-         */
         hostName = getHostName();
 
         /**
-         * Create an Autopsy event publisher for the current case. This will be
-         * used to communicate with collaboration monitors on other Autopsy
-         * nodes working on the case.
+         * Create an event publisher that will be used to communicate with
+         * collaboration monitors on other nodes working on the case.
         */
         eventPublisher = new AutopsyEventPublisher();
         try {
-            eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, Case.getCurrentCase().getName()));
+            Case openedCase = Case.getCurrentCase();
+            String channelPrefix = openedCase.getTextIndexName();
+            eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, channelPrefix));
         } catch (AutopsyEventException ex) {
             throw new CollaborationMonitorException("Failed to initialize", ex);
         }
 
         /**
          * Create a remote tasks manager to track and display the progress of
-         * tasks other Autopsy nodes as reported by their collaboration
-         * monitors.
+         * remote tasks.
         */
         remoteTasksManager = new RemoteTasksManager();
         eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
 
         /**
-         * Create a local tasks manager to keep track of and broadcast the
-         * events happening on this Autopsy node to the collaboration monitors
-         * on other nodes.
+         * Create a local tasks manager to track and broadcast local tasks.
         */
         localTasksManager = new LocalTasksManager();
         IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
         Case.addEventSubscriber(CASE_EVENTS_OF_INTEREST, localTasksManager);
 
-        // RJCTODO: Do we really need three ScheduledThreadPoolExecutors?
-        // Perhaps one is enough....
         /**
-         * Start a periodic task on its own thread that will send heartbeat
-         * messages to the collaboration monitors on other nodes.
+         * Start periodic tasks that:
+         *
+         * 1. Send heartbeats to collaboration monitors on other nodes.<br>
+         * 2. Check for stale remote tasks.<br>
+         * 3. Check the availability of key collaboration services.<br>
         */
-        heartbeatExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(HEARTBEAT_THREAD_NAME).build());
-        heartbeatExecutor.scheduleAtFixedRate(new HeartbeatTask(), MINUTES_BETWEEN_HEARTBEATS, MINUTES_BETWEEN_HEARTBEATS, TimeUnit.MINUTES);
-
-        /**
-         * Start a periodic task on its own thread that will check for stale
-         * remote tasks.
-         */
-        staleTasksCheckExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(STALE_TASKS_DETECTION_THREAD_NAME).build());
-        staleTasksCheckExecutor.scheduleAtFixedRate(new StaleTaskDetectionTask(), STALE_TASKS_DETECTION_INTERVAL_SECS, STALE_TASKS_DETECTION_INTERVAL_SECS, TimeUnit.SECONDS);
-
-        /**
-         * Start a periodic task on its own thread that will report on the
-         * availability of key collaboration services.
-         */
-        crashDetectionExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(CRASH_DETECTION_THREAD_NAME).build());
-        crashDetectionExecutor.scheduleAtFixedRate(new CrashDetectionTask(), CRASH_DETECTION_INTERVAL_SECS, CRASH_DETECTION_INTERVAL_SECS, TimeUnit.SECONDS);
+        periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
+        periodicTasksExecutor.scheduleAtFixedRate(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
+        periodicTasksExecutor.scheduleAtFixedRate(new StaleTaskDetectionTask(), STALE_TASKS_DETECTION_INTERVAL_MINUTES, STALE_TASKS_DETECTION_INTERVAL_MINUTES, TimeUnit.MINUTES);
+        periodicTasksExecutor.scheduleAtFixedRate(new CrashDetectionTask(), CRASH_DETECTION_INTERVAL_MINUTES, CRASH_DETECTION_INTERVAL_MINUTES, TimeUnit.MINUTES);
     }
 
     /**
@@ -148,8 +134,7 @@ final class CollaborationMonitor {
         String name;
         try {
             name = java.net.InetAddress.getLocalHost().getHostName();
-        } catch (UnknownHostException ex) {
-            // RJCTODO: Log info message
+        } catch (UnknownHostException notUsed) {
             name = System.getenv("COMPUTERNAME");
         }
         return name;
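The revised catch clause keeps the COMPUTERNAME fallback and renames the unused exception variable. For reference, a minimal self-contained sketch of that lookup pattern; the enclosing method name and signature are assumptions, since the hunk only shows the body:

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    final class HostNameExample {

        /**
         * Resolves the local host name, falling back to the Windows
         * COMPUTERNAME environment variable when resolution fails.
         */
        static String getHostName() {
            String name;
            try {
                name = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException notUsed) {
                // May be null on non-Windows systems.
                name = System.getenv("COMPUTERNAME");
            }
            return name;
        }
    }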
@@ -159,28 +144,23 @@ final class CollaborationMonitor {
      * Shuts down this collaboration monitor.
      */
     void stop() {
-        stopExecutor(crashDetectionExecutor, CRASH_DETECTION_THREAD_NAME);
-        stopExecutor(heartbeatExecutor, HEARTBEAT_THREAD_NAME);
-        // RJCTODO: Shut down other stuff
+        if (null != periodicTasksExecutor) {
+            periodicTasksExecutor.shutdownNow();
+            try {
+                while (!periodicTasksExecutor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
+                    logger.log(Level.WARNING, "Waited at least {0} seconds for periodic tasks executor to shut down, continuing to wait", EXECUTOR_TERMINATION_WAIT_SECS); //NON-NLS
+                }
+            } catch (InterruptedException ex) {
+                logger.log(Level.SEVERE, "Unexpected interrupt while stopping periodic tasks executor", ex); //NON-NLS
+            }
+        }
+        if (null != eventPublisher) {
             eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
             eventPublisher.closeRemoteEventChannel();
         }
-
-    /**
-     * Gracefully shuts down a scheduled thread pool executor.
-     */
-    private static void stopExecutor(ScheduledThreadPoolExecutor executor, String name) {
-        if (null != executor) {
-            executor.shutdownNow();
-            try {
-                while (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
-                    logger.log(Level.WARNING, "Waited at least {0} seconds for {1} executor to shut down, continuing to wait", new Object[]{EXECUTOR_TERMINATION_WAIT_SECS, name}); //NON-NLS
-                }
-            } catch (InterruptedException ex) {
-                logger.log(Level.SEVERE, "Unexpected interrupt while stopping thread executor", ex); //NON-NLS
-                // RJCTODO: Reset?
-            }
-        }
+        // RJCTODO: Shut down other stuff?
     }
 
     /**
@@ -207,8 +187,8 @@ final class CollaborationMonitor {
         }
 
         /**
-         * Updates the collection of tasks this node is performing in response
-         * to receiving an event in the form of a property change.
+         * Translates events into updates of the collection of local tasks this
+         * node is broadcasting to other nodes.
          *
         * @param event A PropertyChangeEvent.
         */
@@ -227,53 +207,56 @@ final class CollaborationMonitor {
         }
 
         /**
-         * Adds a task for tracking adding the data source and publishes the
-         * updated local tasks list to any collaborating nodes.
+         * Adds an adding data source task to the collection of local tasks and
+         * publishes the updated collection to any collaborating nodes.
          *
-         * @param evt An adding data source event.
+         * @param event An adding data source event.
         */
         synchronized void addDataSourceAddTask(AddingDataSourceEvent event) {
             String status = String.format("%s adding data source", hostName); // RJCTODO: Bundle
             uuidsToAddDataSourceTasks.put(event.getDataSourceId().hashCode(), new Task(++nextTaskId, status));
-            eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks()));
+            eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
         }
 
         /**
-         * Removes the task for adding the data source and publishes the updated
-         * local tasks list to any collaborating nodes.
+         * Removes an adding data source task from the collection of local tasks
+         * and publishes the updated collection to any collaborating nodes.
          *
-         * @param evt A data source added event
+         * @param event A data source added event
         */
         synchronized void removeDataSourceAddTask(DataSourceAddedEvent event) {
             uuidsToAddDataSourceTasks.remove(event.getDataSourceId().hashCode());
-            eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks()));
+            eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
         }
 
         /**
-         * RJCTODO
+         * Adds a data source analysis task to the collection of local tasks and
+         * publishes the updated collection to any collaborating nodes.
          *
-         * @param evt
+         * @param event A data source analysis started event.
         */
         synchronized void addDataSourceAnalysisTask(DataSourceAnalysisStartedEvent event) {
             String status = String.format("%s analyzing %s", hostName, event.getDataSource().getName()); // RJCTODO: Bundle
             jobIdsTodataSourceAnalysisTasks.put(event.getDataSourceIngestJobId(), new Task(++nextTaskId, status));
-            eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks()));
+            eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
         }
 
         /**
-         * RJCTODO
+         * Removes a data source analysis task from the collection of local
+         * tasks and publishes the updated collection to any collaborating
+         * nodes.
          *
-         * @param evt
+         * @param event A data source analysis completed event.
         */
         synchronized void removeDataSourceAnalysisTask(DataSourceAnalysisCompletedEvent event) {
             jobIdsTodataSourceAnalysisTasks.remove(event.getDataSourceIngestJobId());
-            eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks()));
+            eventPublisher.publishRemotely(new CollaborationEvent(hostName, getCurrentTasks()));
         }
 
         /**
         * Gets the current local tasks.
          *
-         * @return A mapping of task IDs to tasks
+         * @return A mapping of task IDs to tasks, may be empty.
         */
         synchronized Map<Long, Task> getCurrentTasks() {
             Map<Long, Task> currentTasks = new HashMap<>();
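The hunk ends at the first line of getCurrentTasks(); the rest of the body is outside the diff. A purely illustrative completion, assuming the method simply merges the two task maps shown above into one map keyed by task id:

    // Illustrative only; the real body lies outside this hunk.
    synchronized Map<Long, Task> getCurrentTasks() {
        Map<Long, Task> currentTasks = new HashMap<>();
        uuidsToAddDataSourceTasks.values().stream().forEach((task) -> {
            currentTasks.put(task.getId(), task);
        });
        jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
            currentTasks.put(task.getId(), task);
        });
        return currentTasks;
    }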
@@ -288,18 +271,31 @@ final class CollaborationMonitor {
     }
 
     /**
-     * The remote tasks manager listens to events broadcast by collaboration
+     * Listens for collaboration event messages broadcast by collaboration
      * monitors on other nodes and translates them into remote tasks represented
-     * using progress bars.
+     * locally using progress bars. Note that all access to the remote tasks is
+     * synchronized since it may be accessed by a JMS thread running code in the
+     * Autopsy event publisher and by a thread running periodic checks for
+     * "stale" tasks.
      */
     private final class RemoteTasksManager implements PropertyChangeListener {
 
-        private Map<String, RemoteTasks> hostToTasks;
+        private final Map<String, RemoteTasks> hostsToTasks;
 
         /**
-         * RJCTODO
+         * Constructs an object that listens for collaboration event messages
+         * broadcast by collaboration monitors on other nodes and translates
+         * them into remote tasks represented locally using progress bars.
+         */
+        RemoteTasksManager() {
+            hostsToTasks = new HashMap<>();
+        }
+
+        /**
+         * Updates the remote tasks to reflect a collaboration event
+         * received from another node.
          *
-         * @param event
+         * @param event A collaboration event.
         */
         @Override
         public void propertyChange(PropertyChangeEvent event) {
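propertyChange() is cut off at its opening brace. Given the updateTasks(CollaborationEvent) method in the next hunk, a plausible body, offered purely as an illustration and not taken from the commit, would narrow the event type and delegate:

    // Illustrative only; the actual body is not part of this hunk.
    @Override
    public void propertyChange(PropertyChangeEvent event) {
        if (event instanceof CollaborationEvent) {
            updateTasks((CollaborationEvent) event);
        }
    }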
@@ -315,19 +311,19 @@ final class CollaborationMonitor {
         */
        synchronized void updateTasks(CollaborationEvent event) {
            // RJCTODO: This is a little hard to understand, consider some renaming
-            RemoteTasks tasks = hostToTasks.get(event.getHostName());
+            RemoteTasks tasks = hostsToTasks.get(event.getHostName());
            if (null != tasks) {
                tasks.update(event);
            } else {
-                hostToTasks.put(event.getHostName(), new RemoteTasks(event));
+                hostsToTasks.put(event.getHostName(), new RemoteTasks(event));
            }
        }
 
        /**
         * RJCTODO
         */
-        synchronized void finshStaleTasks() {
-            for (Iterator<Map.Entry<String, RemoteTasks>> it = hostToTasks.entrySet().iterator(); it.hasNext();) {
+        synchronized void finishStaleTasks() {
+            for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
                Map.Entry<String, RemoteTasks> entry = it.next();
                RemoteTasks tasks = entry.getValue();
                if (tasks.isStale()) {
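Both the stale-task sweep and the isStale() check are truncated by the hunk boundary. A self-contained sketch of the general pattern, using the field and method names from the hunks above but not the commit's actual code; it assumes isStale() compares the elapsed time since lastUpdateTime with MAX_MINUTES_WITHOUT_UPDATE, and that stale hosts are dropped through the iterator after their progress bars are finished:

    // General pattern only, not the commit's exact code.
    synchronized void finishStaleTasks() {
        for (Iterator<Map.Entry<String, RemoteTasks>> it = hostsToTasks.entrySet().iterator(); it.hasNext();) {
            Map.Entry<String, RemoteTasks> entry = it.next();
            RemoteTasks tasks = entry.getValue();
            if (tasks.isStale()) {
                // Finish every progress bar for the silent host, then drop it.
                tasks.finishAllTasks();
                it.remove();
            }
        }
    }

    // A plausible staleness check using java.time (an assumption; not shown in the diff).
    boolean isStale() {
        return Duration.between(lastUpdateTime, Instant.now()).compareTo(MAX_MINUTES_WITHOUT_UPDATE) > 0;
    }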
@@ -338,22 +334,24 @@ final class CollaborationMonitor {
        }
 
        /**
-         * A collection of progress bars for tasks on a collaborating node,
-         * obtained from a collaboration event and bundled with the time of the
-         * last update.
+         * A collection of progress bars for tasks on a collaborating node.
         */
        class RemoteTasks {
 
-            private final Duration MAX_MINUTES_WITHOUT_UPDATE = Duration.ofSeconds(MINUTES_BETWEEN_HEARTBEATS * 5);
+            private final Duration MAX_MINUTES_WITHOUT_UPDATE = Duration.ofSeconds(HEARTBEAT_INTERVAL_MINUTES * MAX_MISSED_HEARTBEATS);
            private Instant lastUpdateTime;
            private Map<Long, ProgressHandle> taskIdsToProgressBars;
 
            /**
-             * RJCTODO
+             * Construct a set of progress bars to represent remote tasks for a
+             * particular host.
             *
-             * @param event
+             * @param event A collaboration event.
             */
            RemoteTasks(CollaborationEvent event) {
-                /**
-                 * Set the initial value of the last update time stamp.
-                 */
                lastUpdateTime = Instant.now();
                event.getCurrentTasks().values().stream().forEach((task) -> {
                    ProgressHandle progress = ProgressHandleFactory.createHandle(event.getHostName());
@@ -371,21 +369,27 @@ final class CollaborationMonitor {
             */
            void update(CollaborationEvent event) {
                /**
-                 * Update the timestamp.
+                 * Update the last update timestamp.
                 */
                lastUpdateTime = Instant.now();
-                Map<Long, Task> currentTasks = event.getCurrentTasks();
                /**
                 * Create or update the progress bars for the current tasks of
                 * the node that published the event.
                 */
-                currentTasks.values().stream().forEach((task) -> {
+                Map<Long, Task> remoteTasks = event.getCurrentTasks();
+                remoteTasks.values().stream().forEach((task) -> {
                    ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
                    if (null != progress) {
+                        /**
+                         * Update the existing progress bar.
+                         */
                        progress.progress(task.getStatus());
                    } else {
+                        /**
+                         * A new task, create a progress bar.
+                         */
                        progress = ProgressHandleFactory.createHandle(event.getHostName());
                        progress.start();
                        progress.progress(task.getStatus());
@@ -398,13 +402,17 @@ final class CollaborationMonitor {
                 * it is finished. Remove the progress bars for finished tasks.
                 */
                for (Long id : taskIdsToProgressBars.keySet()) {
-                    if (!currentTasks.containsKey(id)) {
+                    if (!remoteTasks.containsKey(id)) {
                        ProgressHandle progress = taskIdsToProgressBars.remove(id);
                        progress.finish();
                    }
                }
            }
 
+            /**
+             * Unconditionally finishes the entire set of remote tasks. To be
+             * used when a host drops off unexpectedly.
+             */
            void finishAllTasks() {
                taskIdsToProgressBars.values().stream().forEach((progress) -> {
                    progress.finish();
@@ -440,7 +448,7 @@ final class CollaborationMonitor {
         */
        @Override
        public void run() {
-            eventPublisher.publish(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
+            eventPublisher.publishRemotely(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
        }
    }
@@ -454,7 +462,7 @@ final class CollaborationMonitor {
         */
        @Override
        public void run() {
-            remoteTasksManager.finshStaleTasks();
+            remoteTasksManager.finishStaleTasks();
        }
    }
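The main structural change in this file is the replacement of three single-purpose ScheduledThreadPoolExecutors with the one periodicTasksExecutor that drives HeartbeatTask, StaleTaskDetectionTask, and CrashDetectionTask. A standalone sketch of that pattern; the class name, task bodies, and intervals here are illustrative only:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PeriodicTasksExample {

        public static void main(String[] args) throws InterruptedException {
            // One small pool is enough for a handful of lightweight periodic jobs.
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
            executor.scheduleAtFixedRate(() -> System.out.println("heartbeat"), 1, 1, TimeUnit.MINUTES);
            executor.scheduleAtFixedRate(() -> System.out.println("stale remote task check"), 2, 2, TimeUnit.MINUTES);
            executor.scheduleAtFixedRate(() -> System.out.println("crash detection"), 5, 5, TimeUnit.MINUTES);

            // Shutdown mirrors the revised stop(): cancel the tasks, then wait briefly.
            executor.shutdownNow();
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                System.out.println("executor still shutting down");
            }
        }
    }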

View File

@@ -134,13 +134,7 @@ public final class AutopsyEventPublisher {
      */
     public void publish(AutopsyEvent event) {
         publishLocally(event);
-        if (null != remotePublisher) {
-            try {
-                remotePublisher.publish(event);
-            } catch (JMSException ex) {
-                logger.log(Level.SEVERE, String.format("Failed to publish %s event remotely", event.getPropertyName()), ex); //NON-NLS
-            }
-        }
+        publishRemotely(event);
     }
 
     /**
@@ -152,4 +146,19 @@ public final class AutopsyEventPublisher {
         localPublisher.publish(event);
     }
 
+    /**
+     * Publishes an event to other Autopsy nodes only.
+     *
+     * @param event The event to publish.
+     */
+    public void publishRemotely(AutopsyEvent event) {
+        if (null != remotePublisher) {
+            try {
+                remotePublisher.publish(event);
+            } catch (JMSException ex) {
+                logger.log(Level.SEVERE, String.format("Failed to publish %s event remotely", event.getPropertyName()), ex); //NON-NLS
+            }
+        }
+    }
 }
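With publishRemotely() factored out, publish() still delivers an event both locally and remotely, while callers that only need cross-node delivery, such as the collaboration monitor's task broadcasts in the first file, can call publishRemotely() directly. A small usage sketch; the helper method is hypothetical, and it assumes publishLocally() is part of the publisher's public API like the other two methods:

    // Hypothetical helper illustrating the three publishing paths.
    void broadcast(AutopsyEventPublisher publisher, AutopsyEvent event) {
        publisher.publish(event);         // local subscribers, plus remote nodes when a channel is open
        publisher.publishLocally(event);  // local subscribers only
        publisher.publishRemotely(event); // remote nodes only; a no-op when no remote channel is open
    }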

View File

@@ -116,10 +116,15 @@ public final class IngestJob {
          * startup is going to fail, it will likely fail for the first child
          * data source ingest job.
          */
-        if (!errors.isEmpty()) {
-            for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
+        if (errors.isEmpty()) {
+            IngestManager ingestManager = IngestManager.getInstance();
+            this.dataSourceJobs.values().stream().forEach((dataSourceJob) -> {
+                ingestManager.fireDataSourceAnalysisStarted(id, dataSourceJob.getId(), dataSourceJob.getDataSource());
+            });
+        } else {
+            this.dataSourceJobs.values().stream().forEach((dataSourceJob) -> {
                 dataSourceJob.cancel();
-            }
+            });
         }
         return errors;
@@ -151,9 +156,9 @@ public final class IngestJob {
      */
     List<DataSourceIngestJob.Snapshot> getDataSourceIngestJobSnapshots() {
         List<DataSourceIngestJob.Snapshot> snapshots = new ArrayList<>();
-        for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) {
+        this.dataSourceJobs.values().stream().forEach((dataSourceJob) -> {
             snapshots.add(dataSourceJob.getSnapshot(true));
-        }
+        });
         return snapshots;
     }
@@ -164,9 +169,11 @@ public final class IngestJob {
      * pipelines respond by stopping processing.
      */
     public void cancel() {
-        for (DataSourceIngestJob job : this.dataSourceJobs.values()) {
+        IngestManager ingestManager = IngestManager.getInstance();
+        this.dataSourceJobs.values().stream().forEach((job) -> {
             job.cancel();
-        }
+            ingestManager.fireDataSourceAnalysisCancelled(id, job.getId(), job.getDataSource());
+        });
         this.cancelled = true;
     }
@@ -184,11 +191,13 @@ public final class IngestJob {
      * Provides a callback for completed data source ingest jobs, allowing this
      * ingest job to notify the ingest manager when it is complete.
      *
-     * @param dataSourceIngestJob A completed data source ingest job.
+     * @param job A completed data source ingest job.
      */
-    void dataSourceJobFinished(DataSourceIngestJob dataSourceIngestJob) {
+    void dataSourceJobFinished(DataSourceIngestJob job) {
+        IngestManager ingestManager = IngestManager.getInstance();
+        ingestManager.fireDataSourceAnalysisCompleted(id, job.getId(), job.getDataSource());
         if (incompleteJobsCount.decrementAndGet() == 0) {
-            IngestManager.getInstance().finishIngestJob(this);
+            ingestManager.finishIngestJob(this);
         }
     }

View File

@@ -51,6 +51,8 @@ import org.sleuthkit.autopsy.events.AutopsyEventException;
 import org.sleuthkit.autopsy.events.AutopsyEventPublisher;
 import org.sleuthkit.autopsy.ingest.events.BlackboardPostEvent;
 import org.sleuthkit.autopsy.ingest.events.ContentChangedEvent;
+import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent;
+import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent;
 import org.sleuthkit.autopsy.ingest.events.FileAnalyzedEvent;
 import org.sleuthkit.datamodel.AbstractFile;
 import org.sleuthkit.datamodel.Content;
@@ -643,6 +645,42 @@ public class IngestManager {
         eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
     }
 
+    /**
+     * Fire an ingest event signifying analysis of a data source started.
+     *
+     * @param ingestJobId The ingest job id.
+     * @param dataSourceIngestJobId The data source ingest job id.
+     * @param dataSource The data source.
+     */
+    void fireDataSourceAnalysisStarted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
+        AutopsyEvent event = new DataSourceAnalysisStartedEvent(ingestJobId, dataSourceIngestJobId, dataSource);
+        eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
+    }
+
+    /**
+     * Fire an ingest event signifying analysis of a data source finished.
+     *
+     * @param ingestJobId The ingest job id.
+     * @param dataSourceIngestJobId The data source ingest job id.
+     * @param dataSource The data source.
+     */
+    void fireDataSourceAnalysisCompleted(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
+        AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.COMPLETED);
+        eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
+    }
+
+    /**
+     * Fire an ingest event signifying analysis of a data source was canceled.
+     *
+     * @param ingestJobId The ingest job id.
+     * @param dataSourceIngestJobId The data source ingest job id.
+     * @param dataSource The data source.
+     */
+    void fireDataSourceAnalysisCancelled(long ingestJobId, long dataSourceIngestJobId, Content dataSource) {
+        AutopsyEvent event = new DataSourceAnalysisCompletedEvent(ingestJobId, dataSourceIngestJobId, dataSource, DataSourceAnalysisCompletedEvent.Reason.CANCELLED);
+        eventPublishingExecutor.submit(new PublishEventTask(event, jobEventPublisher));
+    }
+
     /**
      * Fire an ingest event signifying the ingest of a file is completed.
      *
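The three fire methods added above are what let IngestJob notify ingest job event listeners, such as the CollaborationMonitor's LocalTasksManager in the first file, when data source analysis starts, completes, or is cancelled. A hedged sketch of a client observing these events through the existing listener mechanism; the listener class is hypothetical, and it assumes the completed event exposes the same getDataSource() accessor that the started event is shown to have in the CollaborationMonitor hunks:

    import java.beans.PropertyChangeEvent;
    import java.beans.PropertyChangeListener;
    import org.sleuthkit.autopsy.ingest.IngestManager;
    import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent;
    import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent;

    // Hypothetical client, not part of the commit.
    public class AnalysisEventLogger implements PropertyChangeListener {

        public static void register() {
            IngestManager.getInstance().addIngestJobEventListener(new AnalysisEventLogger());
        }

        @Override
        public void propertyChange(PropertyChangeEvent event) {
            if (event instanceof DataSourceAnalysisStartedEvent) {
                DataSourceAnalysisStartedEvent started = (DataSourceAnalysisStartedEvent) event;
                System.out.println("Analysis started for " + started.getDataSource().getName());
            } else if (event instanceof DataSourceAnalysisCompletedEvent) {
                DataSourceAnalysisCompletedEvent completed = (DataSourceAnalysisCompletedEvent) event;
                System.out.println("Analysis finished for " + completed.getDataSource().getName());
            }
        }
    }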