diff --git a/Core/src/org/sleuthkit/autopsy/casemodule/Case.java b/Core/src/org/sleuthkit/autopsy/casemodule/Case.java index 33a2b00386..93ca97874e 100644 --- a/Core/src/org/sleuthkit/autopsy/casemodule/Case.java +++ b/Core/src/org/sleuthkit/autopsy/casemodule/Case.java @@ -113,6 +113,13 @@ public class Case implements SleuthkitCase.ErrorObserver { * no examiner set. */ EXAMINER, + /** + * Property name used for a property change event that indicates a new + * data source (image, local/logical file or local disk) is being added + * to the current case. The new value field of the property change event + * is the path of the data source. + */ + ADDING_DATA_SOURCE, /** * Property name that indicates a new data source (image, disk or local * file) has been added to the current case. The new value is the @@ -200,7 +207,6 @@ public class Case implements SleuthkitCase.ErrorObserver { private final static String REPORTS_FOLDER = "Reports"; //NON-NLS private final static String TEMP_FOLDER = "Temp"; //NON-NLS - // we cache if the case has data in it yet since a few places ask for it and we dont' need to keep going to DB private boolean hasData = false; @@ -1133,7 +1139,6 @@ public class Case implements SleuthkitCase.ErrorObserver { * The methods below are used to manage the case directories (creating, * checking, deleting, etc) */ - /** * Create the case directory and its needed subfolders. * @@ -1162,8 +1167,8 @@ public class Case implements SleuthkitCase.ErrorObserver { } // create the folders inside the case directory - String hostClause=""; - + String hostClause = ""; + if (caseType == CaseType.MULTI_USER_CASE) { hostClause = File.separator + getLocalHostName(); } @@ -1192,7 +1197,7 @@ public class Case implements SleuthkitCase.ErrorObserver { NbBundle.getMessage(Case.class, "Case.createCaseDir.exception.cantCreateReportsDir", modulesOutDir)); } - + } catch (Exception e) { throw new CaseActionException( NbBundle.getMessage(Case.class, "Case.createCaseDir.exception.gen", caseDir), e); @@ -1379,7 +1384,7 @@ public class Case implements SleuthkitCase.ErrorObserver { } return hasData; } - + /** * Set the host name variable. Sometimes the network can be finicky, so the * answer returned by getHostName() could throw an exception or be null. diff --git a/Core/src/org/sleuthkit/autopsy/casemodule/CollaborationMonitor.java b/Core/src/org/sleuthkit/autopsy/casemodule/CollaborationMonitor.java index 4ccbfab54e..ff99f609ae 100644 --- a/Core/src/org/sleuthkit/autopsy/casemodule/CollaborationMonitor.java +++ b/Core/src/org/sleuthkit/autopsy/casemodule/CollaborationMonitor.java @@ -23,22 +23,30 @@ import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeListener; import java.io.Serializable; import java.net.UnknownHostException; +import java.time.Duration; import java.time.Instant; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.stream.Collectors; import java.util.stream.Stream; import org.netbeans.api.progress.ProgressHandle; +import org.netbeans.api.progress.ProgressHandleFactory; +import org.openide.util.Exceptions; +import org.sleuthkit.autopsy.casemodule.events.DataSourceAddedEvent; import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.autopsy.events.AutopsyEvent; import org.sleuthkit.autopsy.events.AutopsyEventException; import org.sleuthkit.autopsy.events.AutopsyEventPublisher; import org.sleuthkit.autopsy.ingest.IngestManager; +import org.sleuthkit.datamodel.Content; +import org.sleuthkit.datamodel.TskCoreException; /** * A monitor needs to listen to local event messages so that it can create and @@ -66,88 +74,43 @@ import org.sleuthkit.autopsy.ingest.IngestManager; final class CollaborationMonitor { private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; - private static final String COLLABORATION_EVENT = "COLLABORATION_MONITOR_EVENT"; + private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT"; private static final String HEARTBEAT_THREAD_NAME = "collab-monitor-heartbeat-%d"; - private static final long HEARTBEAT_INTERVAL_SECS = 1; + private static final long MINUTES_BETWEEN_HEARTBEATS = 1; private static final String CRASH_DETECTION_THREAD_NAME = "collab-monitor-crash-detector-%d"; private static final long CRASH_DETECTION_INTERVAL_SECS = 60; private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30; private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName()); private final String hostName; - private final MonitorEventListener eventListener; + private final LocalTasksManager localTasksManager; + private final RemoteTasksManager remoteTasksManager; private final AutopsyEventPublisher eventPublisher; private final ScheduledThreadPoolExecutor heartbeatExecutor; private final ScheduledThreadPoolExecutor crashDetectionExecutor; - private Map dataSourceAddTasks; - Object dataSourceAddTasksLock; - private Map dataSourceIngestTasks; - Object dataSourceIngestTasksLock; - private Map> collaboratorTasks; - Object collaboratorTasksLock; - private CaseEventListener caseEventListener; - private IngestJobEventListener ingestEventListener; /** * RJCTODO */ CollaborationMonitor() throws CollaborationMonitorException { hostName = getLocalHostName(); - eventListener = new MonitorEventListener(); + remoteTasksManager = new RemoteTasksManager(); eventPublisher = new AutopsyEventPublisher(); - eventPublisher.addSubscriber(COLLABORATION_EVENT, eventListener); + eventPublisher.addSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager); try { eventPublisher.openRemoteEventChannel(String.format(EVENT_CHANNEL_NAME, Case.getCurrentCase().getName())); } catch (AutopsyEventException ex) { // RJCTODO: throw new CollaborationMonitorException("Failed to initialize", ex); } + heartbeatExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(HEARTBEAT_THREAD_NAME).build()); - heartbeatExecutor.scheduleAtFixedRate(new HeartbeatTask(), HEARTBEAT_INTERVAL_SECS, HEARTBEAT_INTERVAL_SECS, TimeUnit.SECONDS); + heartbeatExecutor.scheduleAtFixedRate(new HeartbeatTask(), MINUTES_BETWEEN_HEARTBEATS, MINUTES_BETWEEN_HEARTBEATS, TimeUnit.MINUTES); crashDetectionExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(CRASH_DETECTION_THREAD_NAME).build()); heartbeatExecutor.scheduleAtFixedRate(new CrashDetectionTask(), CRASH_DETECTION_INTERVAL_SECS, CRASH_DETECTION_INTERVAL_SECS, TimeUnit.SECONDS); - IngestManager.getInstance().addIngestJobEventListener(ingestEventListener); - Case.addEventSubscriber("", caseEventListener); // RJCTODO - } - /** - * RJCTODO - */ - void stop() { - stopExecutor(crashDetectionExecutor, CRASH_DETECTION_THREAD_NAME); - stopExecutor(heartbeatExecutor, HEARTBEAT_THREAD_NAME); - eventPublisher.removeSubscriber(COLLABORATION_EVENT, eventListener); - eventPublisher.closeRemoteEventChannel(); - } - - private void updateDataSourceTasks(AutopsyEvent event) { - - } - - - /** - * RJCTODO - * @param event - */ - private void updateCollaboratorTasks(CollaborationEvent event) { - /** - * RJCTODO - */ - } - - /** - * RJCTODO - */ - private static void stopExecutor(ScheduledThreadPoolExecutor executor, String name) { - if (null != executor) { - executor.shutdownNow(); - try { - while (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) { - logger.log(Level.WARNING, "Waited at least thirty seconds for {0} executor to shut down, continuing to wait", name); //NON-NLS - } - } catch (InterruptedException ex) { - // RJCTODO: - } - } + localTasksManager = new LocalTasksManager(); + IngestManager.getInstance().addIngestJobEventListener(localTasksManager); + Case.addEventSubscriber("", localTasksManager); // RJCTODO: Fill in events of interest } /** @@ -166,25 +129,96 @@ final class CollaborationMonitor { return hostName; } - private static final class Task implements Serializable { + /** + * + */ + private void subscribeToRemoteEvents() { - private static final long serialVersionUID = 1L; - long id; - String status; + } + + /** + * + */ + private void subscribeToLocalEvents() { } /** * RJCTODO */ - private final class CaseEventListener implements PropertyChangeListener { + void stop() { + stopExecutor(crashDetectionExecutor, CRASH_DETECTION_THREAD_NAME); + stopExecutor(heartbeatExecutor, HEARTBEAT_THREAD_NAME); + eventPublisher.removeSubscriber(COLLABORATION_MONITOR_EVENT, remoteTasksManager); + eventPublisher.closeRemoteEventChannel(); + } + private void updateDataSourceTasks(AutopsyEvent event) { + + } + + /** + * RJCTODO + * + * @param event + */ + private void updateCollaboratorTasks(CollaborationEvent event) { /** * RJCTODO + */ + } + + /** + * RJCTODO + */ + private static void stopExecutor(ScheduledThreadPoolExecutor executor, String name) { + if (null != executor) { + executor.shutdownNow(); + try { + while (!executor.awaitTermination(EXECUTOR_TERMINATION_WAIT_SECS, TimeUnit.SECONDS)) { + logger.log(Level.WARNING, "Waited at least thirty seconds for {0} executor to shut down, continuing to wait", name); //NON-NLS + } + } catch (InterruptedException ex) { + // RJCTODO: + } + } + } + + /** + * The local tasks manager listens to local events and tracks them as + * collaboration tasks that can be broadcast to collaborating nodes. + */ + private final class LocalTasksManager implements PropertyChangeListener { + + private final AtomicLong nextTaskId; + private final Map pathsToAddDataSourceTasks; + private final Map jobIdsTodataSourceAnalysisTasks; + + /** + * Constructs a local tasks manager that listens to local events and + * tracks them as collaboration tasks that can be broadcast to + * collaborating nodes. + */ + LocalTasksManager() { + nextTaskId = new AtomicLong(0L); + pathsToAddDataSourceTasks = new HashMap<>(); + jobIdsTodataSourceAnalysisTasks = new HashMap<>(); + } + + /** + * Updates the collection of tasks this node is performing in response + * to receiving an event in the form of a property change. * - * @param evt + * @param evt A PropertyChangeEvent. */ @Override public void propertyChange(PropertyChangeEvent evt) { + String eventName = evt.getPropertyName(); + if (eventName.equals(Case.Events.ADDING_DATA_SOURCE.toString())) { + addDataSourceAddTask(evt); + } else if (eventName.equals(Case.Events.DATA_SOURCE_ADDED.toString())) { + addDataSourceAddTask(evt); + } + /** * RJCTODO: When local ADDING_DATA_SOURCE (new) event is received, * create a new task. When local DATA_SOURCE_ADDED event is @@ -192,22 +226,6 @@ final class CollaborationMonitor { * have full image paths to match up with the Content objects on the * DATA_SOURCE_ADDED events. */ - } - - } - - /** - * RJCTODO - */ - private final class IngestJobEventListener implements PropertyChangeListener { - - /** - * RJCTODO - * - * @param evt - */ - @Override - public void propertyChange(PropertyChangeEvent evt) { /** * RJCTODO: When local DATA_SOURCE_INGEST_STARTED event (new) is * received, create new data source analysis tasks. When local @@ -218,38 +236,190 @@ final class CollaborationMonitor { */ } + synchronized void addDataSourceAddTask(PropertyChangeEvent evt) { + String dataSourcePath = (String) evt.getNewValue(); + String taskStatus = String.format("Adding data source %s", dataSourcePath); // RJCTODO: Bundle + // RJCTODO: This probably will not work, path needs to be sanitized + pathsToAddDataSourceTasks.put(dataSourcePath, new Task(nextTaskId.getAndIncrement(), taskStatus)); + eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks())); + } + + synchronized void removeDataSourceAddTask(PropertyChangeEvent evt) { + DataSourceAddedEvent event = (DataSourceAddedEvent) evt; + Content dataSource = event.getDataSource(); + try { + pathsToAddDataSourceTasks.remove(dataSource.getUniquePath()); + } catch (TskCoreException ex) { + // RJCTODO + } + eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks())); + } + + synchronized void addDataSourceAnalysisTask(PropertyChangeEvent evt) { + eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks())); + + } + + synchronized void removeDataSourceAnalysisTask(PropertyChangeEvent evt) { + eventPublisher.publish(new CollaborationEvent(hostName, getCurrentTasks())); + } + + /** + * Gets the current local tasks. + * + * @return A mapping of task IDs to tasks + */ + synchronized Map getCurrentTasks() { + Map currentTasks = new HashMap<>(); + pathsToAddDataSourceTasks.values().stream().forEach((task) -> { + currentTasks.put(task.getId(), task); + }); + jobIdsTodataSourceAnalysisTasks.values().stream().forEach((task) -> { + currentTasks.put(task.getId(), task); + }); + return currentTasks; + } } /** * RJCTODO */ - private final class MonitorEventListener implements PropertyChangeListener { + private final class RemoteTasksManager implements PropertyChangeListener { + + private Map hostToTasks; /** * RJCTODO * - * @param evt + * @param event */ @Override - public void propertyChange(PropertyChangeEvent evt) { + public void propertyChange(PropertyChangeEvent event) { + if (event.getPropertyName().equals(COLLABORATION_MONITOR_EVENT)) { + updateTasks((CollaborationEvent) event); + } + } + + synchronized void updateTasks(CollaborationEvent event) { + RemoteTasks tasks = hostToTasks.get(event.getHostName()); + if (null != tasks) { + // Update time stamp + } else { + hostToTasks.put(event.getHostName(), new RemoteTasks(event)); // Pass in task list + } + // Lookup the RemoteTasks for this host. + // If found + // update the time stamp + // for each key in task keyset + // if task id not in current list + // finish progress, discard task + // else + // update progress bar with new text + // update time stamp + // endif + + // if none found + // add to task map for host + // else + } + + /** + * A collection of progress bars for tasks on a collaborating node, + * obtained from a collaboration event and bundled with the time of the + * last update. + */ + class RemoteTasks { + + private final Duration MAX_MINUTES_WITHOUT_UPDATE = Duration.ofSeconds(MINUTES_BETWEEN_HEARTBEATS * 5); + private Instant lastUpdateTime; + private Map taskIdsToProgressBars; + /** - * RJCTODO: Update tasks lists, progress bars + * RJCTODO + * + * @param event */ + RemoteTasks(CollaborationEvent event) { + lastUpdateTime = Instant.now(); + event.getCurrentTasks().values().stream().forEach((task) -> { + ProgressHandle progress = ProgressHandleFactory.createHandle(event.getHostName()); + progress.start(); + progress.progress(task.getStatus()); + taskIdsToProgressBars.put(task.getId(), progress); + }); + } + + /** + * Updates this remotes tasks collection. + * + * @param event A collaboration event from the collaborating node + * associated with these tasks. + */ + void update(CollaborationEvent event) { + /** + * Update the timestamp. + */ + lastUpdateTime = Instant.now(); + + Map currentTasks = event.getCurrentTasks(); + + /** + * Create or update the progress bars for the current tasks of + * the node that published the event. + */ + currentTasks.values().stream().forEach((task) -> { + ProgressHandle progress = taskIdsToProgressBars.get(task.getId()); + if (null != progress) { + progress.progress(task.getStatus()); + } else { + progress = ProgressHandleFactory.createHandle(event.getHostName()); + progress.start(); + progress.progress(task.getStatus()); + taskIdsToProgressBars.put(task.getId(), progress); + } + }); + + /** + * If a task is no longer in the task list from the remote node, + * it is finished. Remove the progress bars for finished tasks. + */ + for (Long id : taskIdsToProgressBars.keySet()) { + if (!currentTasks.containsKey(id)) { + ProgressHandle progress = taskIdsToProgressBars.remove(id); + progress.finish(); + } + } + } + + /** + * Determines whether or not the time since the last update of this + * remote tasks collection is greater than the maximum acceptable + * interval between updates. + * + * @return True or false. + */ + boolean isStale() { + return MAX_MINUTES_WITHOUT_UPDATE.compareTo(Duration.between(lastUpdateTime, Instant.now())) >= 0; + } } } /** - * RJCTODO + * A Runnable task that periodically publishes the local tasks in progress + * for the current case on this node, providing a heartbeat message for + * other nodes. The current local tasks are included in the heartbeat + * message so that nodes that have just joined the event channel know what + * this node is doing when they join the collaboration on the current case. */ private final class HeartbeatTask implements Runnable { /** - * RJCTODO + * Publish a heartbeat message. */ @Override public void run() { -// eventPublisher.publish(new CollaborationEvent(hostName, EventType.HEARTBEAT)); + eventPublisher.publish(new CollaborationEvent(hostName, localTasksManager.getCurrentTasks())); } } @@ -273,14 +443,61 @@ final class CollaborationMonitor { private static final class CollaborationEvent extends AutopsyEvent implements Serializable { private static final long serialVersionUID = 1L; - List currentTasks; + private final String hostName; + private final Map currentTasks; /** * RJCTODO * + * @param hostName + * @param currentTasks */ - CollaborationEvent(String hostName) { - super(COLLABORATION_EVENT, hostName, null); + CollaborationEvent(String hostName, Map currentTasks) { + super(COLLABORATION_MONITOR_EVENT, null, null); + this.hostName = hostName; + this.currentTasks = currentTasks; + } + + /** + * RJCTODO + * + * @return + */ + String getHostName() { + return hostName; + } + + /** + * RJCTODO + * + * @return + */ + Map getCurrentTasks() { + return currentTasks; + } + + } + + /** + * RJCTODO + */ + private static final class Task implements Serializable { + + private static final long serialVersionUID = 1L; + private final long id; + private final String status; + + Task(long id, String status) { + this.id = id; + this.status = status; + } + + long getId() { + return id; + } + + String getStatus() { + return status; } } diff --git a/Core/src/org/sleuthkit/autopsy/casemodule/events/DataSourceAddedEvent.java b/Core/src/org/sleuthkit/autopsy/casemodule/events/DataSourceAddedEvent.java index 2d547c7526..0f5dff6a95 100644 --- a/Core/src/org/sleuthkit/autopsy/casemodule/events/DataSourceAddedEvent.java +++ b/Core/src/org/sleuthkit/autopsy/casemodule/events/DataSourceAddedEvent.java @@ -79,4 +79,12 @@ public final class DataSourceAddedEvent extends AutopsyEvent implements Serializ } } + /** + * Gets the data source that was added. + * + * @return The data source. + */ + public Content getDataSource() { + return (Content)getNewValue(); + } }