Additional work on CollaborationMonitor

This commit is contained in:
Richard Cordovano 2015-06-03 14:19:23 -04:00
parent 5e198ab098
commit 23fafe0f1f
3 changed files with 324 additions and 94 deletions

View File

@ -113,6 +113,13 @@ public class Case implements SleuthkitCase.ErrorObserver {
* no examiner set. * no examiner set.
*/ */
EXAMINER, EXAMINER,
/**
* Property name used for a property change event that indicates a new
* data source (image, local/logical file or local disk) is being added
* to the current case. The new value field of the property change event
* is the path of the data source.
*/
ADDING_DATA_SOURCE,
/** /**
* Property name that indicates a new data source (image, disk or local * Property name that indicates a new data source (image, disk or local
* file) has been added to the current case. The new value is the * file) has been added to the current case. The new value is the
@ -200,7 +207,6 @@ public class Case implements SleuthkitCase.ErrorObserver {
private final static String REPORTS_FOLDER = "Reports"; //NON-NLS private final static String REPORTS_FOLDER = "Reports"; //NON-NLS
private final static String TEMP_FOLDER = "Temp"; //NON-NLS private final static String TEMP_FOLDER = "Temp"; //NON-NLS
// we cache if the case has data in it yet since a few places ask for it and we dont' need to keep going to DB // we cache if the case has data in it yet since a few places ask for it and we dont' need to keep going to DB
private boolean hasData = false; private boolean hasData = false;
@ -1133,7 +1139,6 @@ public class Case implements SleuthkitCase.ErrorObserver {
* The methods below are used to manage the case directories (creating, * The methods below are used to manage the case directories (creating,
* checking, deleting, etc) * checking, deleting, etc)
*/ */
/** /**
* Create the case directory and its needed subfolders. * Create the case directory and its needed subfolders.
* *
@ -1162,8 +1167,8 @@ public class Case implements SleuthkitCase.ErrorObserver {
} }
// create the folders inside the case directory // create the folders inside the case directory
String hostClause=""; String hostClause = "";
if (caseType == CaseType.MULTI_USER_CASE) { if (caseType == CaseType.MULTI_USER_CASE) {
hostClause = File.separator + getLocalHostName(); hostClause = File.separator + getLocalHostName();
} }
@ -1192,7 +1197,7 @@ public class Case implements SleuthkitCase.ErrorObserver {
NbBundle.getMessage(Case.class, "Case.createCaseDir.exception.cantCreateReportsDir", NbBundle.getMessage(Case.class, "Case.createCaseDir.exception.cantCreateReportsDir",
modulesOutDir)); modulesOutDir));
} }
} catch (Exception e) { } catch (Exception e) {
throw new CaseActionException( throw new CaseActionException(
NbBundle.getMessage(Case.class, "Case.createCaseDir.exception.gen", caseDir), e); NbBundle.getMessage(Case.class, "Case.createCaseDir.exception.gen", caseDir), e);
@ -1379,7 +1384,7 @@ public class Case implements SleuthkitCase.ErrorObserver {
} }
return hasData; return hasData;
} }
/** /**
* Set the host name variable. Sometimes the network can be finicky, so the * Set the host name variable. Sometimes the network can be finicky, so the
* answer returned by getHostName() could throw an exception or be null. * answer returned by getHostName() could throw an exception or be null.

View File

@ -23,22 +23,30 @@ import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener; import java.beans.PropertyChangeListener;
import java.io.Serializable; import java.io.Serializable;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.netbeans.api.progress.ProgressHandle; import org.netbeans.api.progress.ProgressHandle;
import org.netbeans.api.progress.ProgressHandleFactory;
import org.openide.util.Exceptions;
import org.sleuthkit.autopsy.casemodule.events.DataSourceAddedEvent;
import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.events.AutopsyEvent; import org.sleuthkit.autopsy.events.AutopsyEvent;
import org.sleuthkit.autopsy.events.AutopsyEventException; import org.sleuthkit.autopsy.events.AutopsyEventException;
import org.sleuthkit.autopsy.events.AutopsyEventPublisher; import org.sleuthkit.autopsy.events.AutopsyEventPublisher;
import org.sleuthkit.autopsy.ingest.IngestManager; import org.sleuthkit.autopsy.ingest.IngestManager;
import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.TskCoreException;
/** /**
* A monitor needs to listen to local event messages so that it can create and * A monitor needs to listen to local event messages so that it can create and
@ -66,88 +74,43 @@ import org.sleuthkit.autopsy.ingest.IngestManager;
final class CollaborationMonitor { final class CollaborationMonitor {
private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events";
private static final String COLLABORATION_EVENT = "COLLABORATION_MONITOR_EVENT"; private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT";
private static final String HEARTBEAT_THREAD_NAME = "collab-monitor-heartbeat-%d"; private static final String HEARTBEAT_THREAD_NAME = "collab-monitor-heartbeat-%d";
private static final long HEARTBEAT_INTERVAL_SECS = 1; private static final long MINUTES_BETWEEN_HEARTBEATS = 1;
private static final String CRASH_DETECTION_THREAD_NAME = "collab-monitor-crash-detector-%d"; private static final String CRASH_DETECTION_THREAD_NAME = "collab-monitor-crash-detector-%d";
private static final long CRASH_DETECTION_INTERVAL_SECS = 60; private static final long CRASH_DETECTION_INTERVAL_SECS = 60;
private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30; private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName()); private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
private final String hostName; private final String hostName;
private final MonitorEventListener eventListener; private final LocalTasksManager localTasksManager;
private final RemoteTasksManager remoteTasksManager;
private final AutopsyEventPublisher eventPublisher; private final AutopsyEventPublisher eventPublisher;
private final ScheduledThreadPoolExecutor heartbeatExecutor; private final ScheduledThreadPoolExecutor heartbeatExecutor;
private final ScheduledThreadPoolExecutor crashDetectionExecutor; private final ScheduledThreadPoolExecutor crashDetectionExecutor;
private Map<String, Task> dataSourceAddTasks;
Object dataSourceAddTasksLock;
private Map<Long, Task> dataSourceIngestTasks;
Object dataSourceIngestTasksLock;
private Map<String, List<Task>> collaboratorTasks;
Object collaboratorTasksLock;
private CaseEventListener caseEventListener;
private IngestJobEventListener ingestEventListener;
/** /**
* RJCTODO * RJCTODO
*/ */
CollaborationMonitor() throws CollaborationMonitorException { CollaborationMonitor() throws CollaborationMonitorException {
hostName = getLocalHostName(); hostName = getLocalHostName();
eventListener = new MonitorEventListener(); remoteTasksManager = new RemoteTasksManager();
eventPublisher = new AutopsyEventPublisher(); eventPublisher = new AutopsyEventPublisher();
eventPublisher.addSubscriber(COLLABORATION_EVENT, eventListener); eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
try { try {
eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, Case.getCurrentCase().getName())); eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, Case.getCurrentCase().getName()));
} catch (AutopsyEventException ex) { } catch (AutopsyEventException ex) {
// RJCTODO: // RJCTODO:
throw new CollaborationMonitorException("Failed to initialize", ex); throw new CollaborationMonitorException("Failed to initialize", ex);
} }
heartbeatExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(HEARTBEAT_THREAD_NAME).build()); heartbeatExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(HEARTBEAT_THREAD_NAME).build());
heartbeatExecutor.scheduleAtFixedRate(new HeartbeatTask(), HEARTBEAT_INTERVAL_SECS, HEARTBEAT_INTERVAL_SECS, TimeUnit.SECONDS); heartbeatExecutor.scheduleAtFixedRate(new HeartbeatTask(), MINUTES_BETWEEN_HEARTBEATS, MINUTES_BETWEEN_HEARTBEATS, TimeUnit.MINUTES);
crashDetectionExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(CRASH_DETECTION_THREAD_NAME).build()); crashDetectionExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(CRASH_DETECTION_THREAD_NAME).build());
heartbeatExecutor.scheduleAtFixedRate(new CrashDetectionTask(), CRASH_DETECTION_INTERVAL_SECS, CRASH_DETECTION_INTERVAL_SECS, TimeUnit.SECONDS); heartbeatExecutor.scheduleAtFixedRate(new CrashDetectionTask(), CRASH_DETECTION_INTERVAL_SECS, CRASH_DETECTION_INTERVAL_SECS, TimeUnit.SECONDS);
IngestManager.getInstance().addIngestJobEventListener(ingestEventListener);
Case.addEventSubscriber("", caseEventListener); // RJCTODO
}
/** localTasksManager = new LocalTasksManager();
* RJCTODO IngestManager.getInstance().addIngestJobEventListener(localTasksManager);
*/ Case.addEventSubscriber("", localTasksManager); // RJCTODO: Fill in events of interest
void stop() {
stopExecutor(crashDetectionExecutor, CRASH_DETECTION_THREAD_NAME);
stopExecutor(heartbeatExecutor, HEARTBEAT_THREAD_NAME);
eventPublisher.removeSubscriber(COLLABORATION_EVENT, eventListener);
eventPublisher.closeRemoteEventChannel();
}
private void updateDataSourceTasks(AutopsyEvent event) {
}
/**
* RJCTODO
* @param event
*/
private void updateCollaboratorTasks(CollaborationEvent event) {
/**
* RJCTODO
*/
}
/**
* RJCTODO
*/
private static void stopExecutor(ScheduledThreadPoolExecutor executor, String name) {
if (null != executor) {
executor.shutdownNow();
try {
while (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
logger.log(Level.WARNING, "Waited at least thirty seconds for {0} executor to shut down, continuing to wait", name); //NON-NLS
}
} catch (InterruptedException ex) {
// RJCTODO:
}
}
} }
/** /**
@ -166,25 +129,96 @@ final class CollaborationMonitor {
return hostName; return hostName;
} }
private static final class Task implements Serializable { /**
*
*/
private void subscribeToRemoteEvents() {
private static final long serialVersionUID = 1L; }
long id;
String status; /**
*
*/
private void subscribeToLocalEvents() {
} }
/** /**
* RJCTODO * RJCTODO
*/ */
private final class CaseEventListener implements PropertyChangeListener { void stop() {
stopExecutor(crashDetectionExecutor, CRASH_DETECTION_THREAD_NAME);
stopExecutor(heartbeatExecutor, HEARTBEAT_THREAD_NAME);
eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager);
eventPublisher.closeRemoteEventChannel();
}
private void updateDataSourceTasks(AutopsyEvent event) {
}
/**
* RJCTODO
*
* @param event
*/
private void updateCollaboratorTasks(CollaborationEvent event) {
/** /**
* RJCTODO * RJCTODO
*/
}
/**
* RJCTODO
*/
private static void stopExecutor(ScheduledThreadPoolExecutor executor, String name) {
if (null != executor) {
executor.shutdownNow();
try {
while (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) {
logger.log(Level.WARNING, "Waited at least thirty seconds for {0} executor to shut down, continuing to wait", name); //NON-NLS
}
} catch (InterruptedException ex) {
// RJCTODO:
}
}
}
/**
* The local tasks manager listens to local events and tracks them as
* collaboration tasks that can be broadcast to collaborating nodes.
*/
private final class LocalTasksManager implements PropertyChangeListener {
private final AtomicLong nextTaskId;
private final Map<String, Task> pathsToAddDataSourceTasks;
private final Map<Long, Task> jobIdsTodataSourceAnalysisTasks;
/**
* Constructs a local tasks manager that listens to local events and
* tracks them as collaboration tasks that can be broadcast to
* collaborating nodes.
*/
LocalTasksManager() {
nextTaskId = new AtomicLong(0L);
pathsToAddDataSourceTasks = new HashMap<>();
jobIdsTodataSourceAnalysisTasks = new HashMap<>();
}
/**
* Updates the collection of tasks this node is performing in response
* to receiving an event in the form of a property change.
* *
* @param evt * @param evt A PropertyChangeEvent.
*/ */
@Override @Override
public void propertyChange(PropertyChangeEvent evt) { public void propertyChange(PropertyChangeEvent evt) {
String eventName = evt.getPropertyName();
if (eventName.equals(Case.Events.ADDING_DATA_SOURCE.toString())) {
addDataSourceAddTask(evt);
} else if (eventName.equals(Case.Events.DATA_SOURCE_ADDED.toString())) {
addDataSourceAddTask(evt);
}
/** /**
* RJCTODO: When local ADDING_DATA_SOURCE (new) event is received, * RJCTODO: When local ADDING_DATA_SOURCE (new) event is received,
* create a new task. When local DATA_SOURCE_ADDED event is * create a new task. When local DATA_SOURCE_ADDED event is
@ -192,22 +226,6 @@ final class CollaborationMonitor {
* have full image paths to match up with the Content objects on the * have full image paths to match up with the Content objects on the
* DATA_SOURCE_ADDED events. * DATA_SOURCE_ADDED events.
*/ */
}
}
/**
* RJCTODO
*/
private final class IngestJobEventListener implements PropertyChangeListener {
/**
* RJCTODO
*
* @param evt
*/
@Override
public void propertyChange(PropertyChangeEvent evt) {
/** /**
* RJCTODO: When local DATA_SOURCE_INGEST_STARTED event (new) is * RJCTODO: When local DATA_SOURCE_INGEST_STARTED event (new) is
* received, create new data source analysis tasks. When local * received, create new data source analysis tasks. When local
@ -218,38 +236,190 @@ final class CollaborationMonitor {
*/ */
} }
synchronized void addDataSourceAddTask(PropertyChangeEvent evt) {
String dataSourcePath = (String) evt.getNewValue();
String taskStatus = String.format("Adding data source %s", dataSourcePath); // RJCTODO: Bundle
// RJCTODO: This probably will not work, path needs to be sanitized
pathsToAddDataSourceTasks.put(dataSourcePath, new Task(nextTaskId.getAndIncrement(), taskStatus));
eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks()));
}
synchronized void removeDataSourceAddTask(PropertyChangeEvent evt) {
DataSourceAddedEvent event = (DataSourceAddedEvent) evt;
Content dataSource = event.getDataSource();
try {
pathsToAddDataSourceTasks.remove(dataSource.getUniquePath());
} catch (TskCoreException ex) {
// RJCTODO
}
eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks()));
}
synchronized void addDataSourceAnalysisTask(PropertyChangeEvent evt) {
eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks()));
}
synchronized void removeDataSourceAnalysisTask(PropertyChangeEvent evt) {
eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks()));
}
/**
* Gets the current local tasks.
*
* @return A mapping of task IDs to tasks
*/
synchronized Map<Long, Task> getCurrentTasks() {
Map<Long, Task> currentTasks = new HashMap<>();
pathsToAddDataSourceTasks.values().stream().forEach((task) -> {
currentTasks.put(task.getId(), task);
});
jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> {
currentTasks.put(task.getId(), task);
});
return currentTasks;
}
} }
/** /**
* RJCTODO * RJCTODO
*/ */
private final class MonitorEventListener implements PropertyChangeListener { private final class RemoteTasksManager implements PropertyChangeListener {
private Map<String, RemoteTasks> hostToTasks;
/** /**
* RJCTODO * RJCTODO
* *
* @param evt * @param event
*/ */
@Override @Override
public void propertyChange(PropertyChangeEvent evt) { public void propertyChange(PropertyChangeEvent event) {
if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) {
updateTasks((CollaborationEvent) event);
}
}
synchronized void updateTasks(CollaborationEvent event) {
RemoteTasks tasks = hostToTasks.get(event.getHostName());
if (null != tasks) {
// Update time stamp
} else {
hostToTasks.put(event.getHostName(), new RemoteTasks(event)); // Pass in task list
}
// Lookup the RemoteTasks for this host.
// If found
// update the time stamp
// for each key in task keyset
// if task id not in current list
// finish progress, discard task
// else
// update progress bar with new text
// update time stamp
// endif
// if none found
// add to task map for host
// else
}
/**
* A collection of progress bars for tasks on a collaborating node,
* obtained from a collaboration event and bundled with the time of the
* last update.
*/
class RemoteTasks {
private final Duration MAX_MINUTES_WITHOUT_UPDATE = Duration.ofSeconds(MINUTES_BETWEEN_HEARTBEATS * 5);
private Instant lastUpdateTime;
private Map<Long, ProgressHandle> taskIdsToProgressBars;
/** /**
* RJCTODO: Update tasks lists, progress bars * RJCTODO
*
* @param event
*/ */
RemoteTasks(CollaborationEvent event) {
lastUpdateTime = Instant.now();
event.getCurrentTasks().values().stream().forEach((task) -> {
ProgressHandle progress = ProgressHandleFactory.createHandle(event.getHostName());
progress.start();
progress.progress(task.getStatus());
taskIdsToProgressBars.put(task.getId(), progress);
});
}
/**
* Updates this remotes tasks collection.
*
* @param event A collaboration event from the collaborating node
* associated with these tasks.
*/
void update(CollaborationEvent event) {
/**
* Update the timestamp.
*/
lastUpdateTime = Instant.now();
Map<Long, Task> currentTasks = event.getCurrentTasks();
/**
* Create or update the progress bars for the current tasks of
* the node that published the event.
*/
currentTasks.values().stream().forEach((task) -> {
ProgressHandle progress = taskIdsToProgressBars.get(task.getId());
if (null != progress) {
progress.progress(task.getStatus());
} else {
progress = ProgressHandleFactory.createHandle(event.getHostName());
progress.start();
progress.progress(task.getStatus());
taskIdsToProgressBars.put(task.getId(), progress);
}
});
/**
* If a task is no longer in the task list from the remote node,
* it is finished. Remove the progress bars for finished tasks.
*/
for (Long id : taskIdsToProgressBars.keySet()) {
if (!currentTasks.containsKey(id)) {
ProgressHandle progress = taskIdsToProgressBars.remove(id);
progress.finish();
}
}
}
/**
* Determines whether or not the time since the last update of this
* remote tasks collection is greater than the maximum acceptable
* interval between updates.
*
* @return True or false.
*/
boolean isStale() {
return MAX_MINUTES_WITHOUT_UPDATE.compareTo(Duration.between(lastUpdateTime, Instant.now())) >= 0;
}
} }
} }
/** /**
* RJCTODO * A Runnable task that periodically publishes the local tasks in progress
* for the current case on this node, providing a heartbeat message for
* other nodes. The current local tasks are included in the heartbeat
* message so that nodes that have just joined the event channel know what
* this node is doing when they join the collaboration on the current case.
*/ */
private final class HeartbeatTask implements Runnable { private final class HeartbeatTask implements Runnable {
/** /**
* RJCTODO * Publish a heartbeat message.
*/ */
@Override @Override
public void run() { public void run() {
// eventPublisher.publish(new CollaborationEvent(hostName, EventType.HEARTBEAT)); eventPublisher.publish(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks()));
} }
} }
@ -273,14 +443,61 @@ final class CollaborationMonitor {
private static final class CollaborationEvent extends AutopsyEvent implements Serializable { private static final class CollaborationEvent extends AutopsyEvent implements Serializable {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
List<Task> currentTasks; private final String hostName;
private final Map<Long, Task> currentTasks;
/** /**
* RJCTODO * RJCTODO
* *
* @param hostName
* @param currentTasks
*/ */
CollaborationEvent(String hostName) { CollaborationEvent(String hostName, Map<Long, Task> currentTasks) {
super(COLLABORATION_EVENT, hostName, null); super(COLLABORATION_MONITOR_EVENT, null, null);
this.hostName = hostName;
this.currentTasks = currentTasks;
}
/**
* RJCTODO
*
* @return
*/
String getHostName() {
return hostName;
}
/**
* RJCTODO
*
* @return
*/
Map<Long, Task> getCurrentTasks() {
return currentTasks;
}
}
/**
* RJCTODO
*/
private static final class Task implements Serializable {
private static final long serialVersionUID = 1L;
private final long id;
private final String status;
Task(long id, String status) {
this.id = id;
this.status = status;
}
long getId() {
return id;
}
String getStatus() {
return status;
} }
} }

View File

@ -79,4 +79,12 @@ public final class DataSourceAddedEvent extends AutopsyEvent implements Serializ
} }
} }
/**
* Gets the data source that was added.
*
* @return The data source.
*/
public Content getDataSource() {
return (Content)getNewValue();
}
} }