Created ServicesMonitor and refactored CollaborationMonitor

This commit is contained in:
Eugene Livis 2015-07-02 14:42:04 -04:00
parent da3c3fc050
commit 7c1da35b33
2 changed files with 169 additions and 90 deletions

View File

@ -22,7 +22,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener; import java.beans.PropertyChangeListener;
import java.io.Serializable; import java.io.Serializable;
import java.net.URISyntaxException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
@ -36,45 +35,35 @@ import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.netbeans.api.progress.ProgressHandle; import org.netbeans.api.progress.ProgressHandle;
import org.netbeans.api.progress.ProgressHandleFactory; import org.netbeans.api.progress.ProgressHandleFactory;
import org.openide.util.NbBundle; import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.casemodule.events.AddingDataSourceEvent; import org.sleuthkit.autopsy.casemodule.events.AddingDataSourceEvent;
import org.sleuthkit.autopsy.casemodule.events.AddingDataSourceFailedEvent; import org.sleuthkit.autopsy.casemodule.events.AddingDataSourceFailedEvent;
import org.sleuthkit.autopsy.casemodule.events.DataSourceAddedEvent; import org.sleuthkit.autopsy.casemodule.events.DataSourceAddedEvent;
import org.sleuthkit.autopsy.core.UserPreferences;
import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
import org.sleuthkit.autopsy.events.AutopsyEvent; import org.sleuthkit.autopsy.events.AutopsyEvent;
import org.sleuthkit.autopsy.events.AutopsyEventException; import org.sleuthkit.autopsy.events.AutopsyEventException;
import org.sleuthkit.autopsy.events.AutopsyEventPublisher; import org.sleuthkit.autopsy.events.AutopsyEventPublisher;
import org.sleuthkit.autopsy.events.MessageServiceConnectionInfo;
import org.sleuthkit.autopsy.ingest.IngestManager; import org.sleuthkit.autopsy.ingest.IngestManager;
import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent; import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisCompletedEvent;
import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent; import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent;
import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService;
import org.sleuthkit.datamodel.CaseDbConnectionInfo;
/** /**
* A collaboration monitor listens to local events and translates them into * A collaboration monitor listens to local events and translates them into
* collaboration tasks that are broadcast to collaborating nodes, informs the * collaboration tasks that are broadcast to collaborating nodes and informs the
* user of collaboration tasks on other nodes using progress bars, and monitors * user of collaboration tasks on other nodes using progress bars.
* the health of key collaboration services.
*/ */
final class CollaborationMonitor { final class CollaborationMonitor {
private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events"; private static final String EVENT_CHANNEL_NAME = "%s-Collaboration-Monitor-Events";
private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT"; private static final String COLLABORATION_MONITOR_EVENT = "COLLABORATION_MONITOR_EVENT";
private static final Set<String> CASE_EVENTS_OF_INTEREST = new HashSet<>(Arrays.asList(new String[]{Case.Events.ADDING_DATA_SOURCE.toString(), Case.Events.DATA_SOURCE_ADDED.toString(), Case.Events.ADDING_DATA_SOURCE_FAILED.toString()})); private static final Set<String> CASE_EVENTS_OF_INTEREST = new HashSet<>(Arrays.asList(new String[]{Case.Events.ADDING_DATA_SOURCE.toString(), Case.Events.DATA_SOURCE_ADDED.toString(), Case.Events.ADDING_DATA_SOURCE_FAILED.toString()}));
private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 3; private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 2;
private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d"; private static final String PERIODIC_TASK_THREAD_NAME = "collab-monitor-periodic-tasks-%d";
private static final long HEARTBEAT_INTERVAL_MINUTES = 1; private static final long HEARTBEAT_INTERVAL_MINUTES = 1;
private static final long MAX_MISSED_HEARTBEATS = 5; private static final long MAX_MISSED_HEARTBEATS = 5;
private static final long STALE_TASKS_DETECTION_INTERVAL_MINUTES = 2; private static final long STALE_TASKS_DETECTION_INTERVAL_MINUTES = 2;
private static final long CRASH_DETECTION_INTERVAL_MINUTES = 2;
private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30; private static final long EXECUTOR_TERMINATION_WAIT_SECS = 30;
private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName()); private static final Logger logger = Logger.getLogger(CollaborationMonitor.class.getName());
private final String hostName; private final String hostName;
@ -129,12 +118,10 @@ final class CollaborationMonitor {
* *
* 1. Send heartbeats to collaboration monitors on other nodes.<br> * 1. Send heartbeats to collaboration monitors on other nodes.<br>
* 2. Check for stale remote tasks.<br> * 2. Check for stale remote tasks.<br>
* 3. Check the availability of key collaboration services.<br>
*/ */
periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build()); periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
periodicTasksExecutor.scheduleAtFixedRate(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES); periodicTasksExecutor.scheduleAtFixedRate(new HeartbeatTask(), HEARTBEAT_INTERVAL_MINUTES, HEARTBEAT_INTERVAL_MINUTES, TimeUnit.MINUTES);
periodicTasksExecutor.scheduleAtFixedRate(new StaleTaskDetectionTask(), STALE_TASKS_DETECTION_INTERVAL_MINUTES, STALE_TASKS_DETECTION_INTERVAL_MINUTES, TimeUnit.MINUTES); periodicTasksExecutor.scheduleAtFixedRate(new StaleTaskDetectionTask(), STALE_TASKS_DETECTION_INTERVAL_MINUTES, STALE_TASKS_DETECTION_INTERVAL_MINUTES, TimeUnit.MINUTES);
periodicTasksExecutor.scheduleAtFixedRate(new CrashDetectionTask(), CRASH_DETECTION_INTERVAL_MINUTES, CRASH_DETECTION_INTERVAL_MINUTES, TimeUnit.MINUTES);
} }
/** /**
@ -514,80 +501,6 @@ final class CollaborationMonitor {
} }
} }
/**
* A Runnable task that periodically checks the availability of
* collaboration resources (PostgreSQL server, Solr server, Active MQ
* message broker) and reports status to the user in case of a gap in
* service.
*/
private final static class CrashDetectionTask implements Runnable {
private static boolean dbServerIsRunning = true;
private static boolean solrServerIsRunning = true;
private static boolean messageServerIsRunning = true;
private static final Object lock = new Object();
/**
* Monitor the availability of collaboration resources
*/
@Override
public void run() {
synchronized (lock) {
CaseDbConnectionInfo dbInfo = UserPreferences.getDatabaseConnectionInfo();
if (dbInfo.canConnect()) {
if (!dbServerIsRunning) {
dbServerIsRunning = true;
logger.log(Level.INFO, "Connection to PostgreSQL server restored"); //NON-NLS
MessageNotifyUtil.Notify.info(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredDbService.notify.msg"));
}
} else {
if (dbServerIsRunning) {
dbServerIsRunning = false;
logger.log(Level.SEVERE, "Failed to connect to PostgreSQL server"); //NON-NLS
MessageNotifyUtil.Notify.error(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedDbService.notify.msg"));
}
}
KeywordSearchService kwsService = Case.getCurrentCase().getServices().getKeywordSearchService();
if (kwsService.canConnectToRemoteSolrServer()) {
if (!solrServerIsRunning) {
solrServerIsRunning = true;
logger.log(Level.INFO, "Connection to Solr server restored"); //NON-NLS
MessageNotifyUtil.Notify.info(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredSolrService.notify.msg"));
}
}
else {
if (solrServerIsRunning) {
solrServerIsRunning = false;
logger.log(Level.SEVERE, "Failed to connect to Solr server"); //NON-NLS
MessageNotifyUtil.Notify.error(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedSolrService.notify.msg"));
}
}
MessageServiceConnectionInfo msgInfo = UserPreferences.getMessageServiceConnectionInfo();
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(msgInfo.getUserName(), msgInfo.getPassword(), msgInfo.getURI());
Connection connection = connectionFactory.createConnection();
connection.start();
connection.close();
if (!messageServerIsRunning) {
messageServerIsRunning = true;
logger.log(Level.INFO, "Connection to ActiveMQ server restored"); //NON-NLS
MessageNotifyUtil.Notify.info(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredMessageService.notify.msg"));
}
} catch (URISyntaxException | JMSException ex) {
if (messageServerIsRunning) {
messageServerIsRunning = false;
logger.log(Level.SEVERE, "Failed to connect to ActiveMQ server", ex); //NON-NLS
MessageNotifyUtil.Notify.error(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedMessageService.notify.msg"));
}
}
}
}
}
/** /**
* An Autopsy event to be sent in event messages to the collaboration * An Autopsy event to be sent in event messages to the collaboration
* monitors on other Autopsy nodes. * monitors on other Autopsy nodes.

View File

@ -0,0 +1,166 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2013-2015 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sleuthkit.autopsy.core;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.URISyntaxException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
import org.sleuthkit.autopsy.events.AutopsyEventPublisher;
import org.sleuthkit.autopsy.events.MessageServiceConnectionInfo;
import org.sleuthkit.autopsy.keywordsearchservice.KeywordSearchService;
import org.sleuthkit.datamodel.CaseDbConnectionInfo;
/**
* This class periodically checks availability of collaboration resources
* (PostgreSQL server, Solr server, Active MQ message broker) and reports status
* to the user in case of a gap in service.
*/
public class ServicesMonitor {
private AutopsyEventPublisher eventPublisher;
private static final Logger logger = Logger.getLogger(ServicesMonitor.class.getName());
private static ServicesMonitor instance;
private final ScheduledThreadPoolExecutor periodicTasksExecutor;
private static final String PERIODIC_TASK_THREAD_NAME = "services-monitor-periodic-task-%d";
private static final int NUMBER_OF_PERIODIC_TASK_THREADS = 1;
private static final long CRASH_DETECTION_INTERVAL_MINUTES = 2;
/**
* List of services that are being monitored.
*/
public enum Service {
/**
* Property change event fired when ....TODO.... The old value of the
* PropertyChangeEvent object is set to the ingest job id, and the new
* value is set to null.
*/
REMOTE_CASE_DATABASE,
REMOTE_KEYWORD_SEARCH,
MESSAGING
};
enum ServiceStatus {
UP,
DOWN
};
public synchronized static ServicesMonitor getInstance() {
if (instance == null) {
instance = new ServicesMonitor();
}
return instance;
}
private ServicesMonitor() {
this.eventPublisher = new AutopsyEventPublisher();
/**
* Start periodic task that check the availability of key collaboration
* services.
*/
periodicTasksExecutor = new ScheduledThreadPoolExecutor(NUMBER_OF_PERIODIC_TASK_THREADS, new ThreadFactoryBuilder().setNameFormat(PERIODIC_TASK_THREAD_NAME).build());
periodicTasksExecutor.scheduleAtFixedRate(new CrashDetectionTask(), CRASH_DETECTION_INTERVAL_MINUTES, CRASH_DETECTION_INTERVAL_MINUTES, TimeUnit.MINUTES);
}
/**
* A Runnable task that periodically checks the availability of
* collaboration resources (PostgreSQL server, Solr server, Active MQ
* message broker) and reports status to the user in case of a gap in
* service.
*/
private final static class CrashDetectionTask implements Runnable {
private static boolean dbServerIsRunning = true;
private static boolean solrServerIsRunning = true;
private static boolean messageServerIsRunning = true;
private static final Object lock = new Object();
/**
* Monitor the availability of collaboration resources
*/
@Override
public void run() {
synchronized (lock) {
CaseDbConnectionInfo dbInfo = UserPreferences.getDatabaseConnectionInfo();
if (dbInfo.canConnect()) {
if (!dbServerIsRunning) {
dbServerIsRunning = true;
logger.log(Level.INFO, "Connection to PostgreSQL server restored"); //NON-NLS
//MessageNotifyUtil.Notify.info(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredDbService.notify.msg"));
}
} else {
if (dbServerIsRunning) {
dbServerIsRunning = false;
logger.log(Level.SEVERE, "Failed to connect to PostgreSQL server"); //NON-NLS
//MessageNotifyUtil.Notify.error(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedDbService.notify.msg"));
}
}
KeywordSearchService kwsService = Case.getCurrentCase().getServices().getKeywordSearchService();
if (kwsService.canConnectToRemoteSolrServer()) {
if (!solrServerIsRunning) {
solrServerIsRunning = true;
logger.log(Level.INFO, "Connection to Solr server restored"); //NON-NLS
//MessageNotifyUtil.Notify.info(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredSolrService.notify.msg"));
}
} else {
if (solrServerIsRunning) {
solrServerIsRunning = false;
logger.log(Level.SEVERE, "Failed to connect to Solr server"); //NON-NLS
//MessageNotifyUtil.Notify.error(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedSolrService.notify.msg"));
}
}
MessageServiceConnectionInfo msgInfo = UserPreferences.getMessageServiceConnectionInfo();
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(msgInfo.getUserName(), msgInfo.getPassword(), msgInfo.getURI());
Connection connection = connectionFactory.createConnection();
connection.start();
connection.close();
if (!messageServerIsRunning) {
messageServerIsRunning = true;
logger.log(Level.INFO, "Connection to ActiveMQ server restored"); //NON-NLS
//MessageNotifyUtil.Notify.info(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.restoredMessageService.notify.msg"));
}
} catch (URISyntaxException | JMSException ex) {
if (messageServerIsRunning) {
messageServerIsRunning = false;
logger.log(Level.SEVERE, "Failed to connect to ActiveMQ server", ex); //NON-NLS
//MessageNotifyUtil.Notify.error(NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedService.notify.title"), NbBundle.getMessage(CollaborationMonitor.class, "CollaborationMonitor.failedMessageService.notify.msg"));
}
}
}
}
}
}