Simplify CoordinationService API and make it thread-safe

This commit is contained in:
Richard Cordovano 2017-03-11 17:14:23 -05:00
parent de8dc8f425
commit 6c32991bc9
6 changed files with 57 additions and 67 deletions

View File

@ -72,6 +72,7 @@ import org.sleuthkit.autopsy.casemodule.events.DataSourceAddedEvent;
import org.sleuthkit.autopsy.casemodule.events.ReportAddedEvent;
import org.sleuthkit.autopsy.casemodule.services.Services;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService.CategoryNode;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService.CoordinationServiceException;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService.Lock;
import org.sleuthkit.autopsy.core.RuntimeProperties;
@ -672,7 +673,7 @@ public class Case {
* cannot be deleted if another node has it open.
*/
progressIndicator.start(Bundle.Case_progressMessage_acquiringLocks());
try (CoordinationService.Lock dirLock = CoordinationService.getServiceForNamespace(CoordinationService.getAppNamespaceRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, metadata.getCaseDirectory())) {
try (CoordinationService.Lock dirLock = CoordinationService.getInstance().tryGetExclusiveLock(CategoryNode.CASES, metadata.getCaseDirectory())) {
assert (null != dirLock);
/*
@ -944,7 +945,7 @@ public class Case {
@Messages({"Case.creationException.couldNotAcquireNameLock=Failed to get lock on case name"})
private static CoordinationService.Lock acquireExclusiveCaseNameLock(String caseName) throws CaseActionException {
try {
Lock lock = CoordinationService.getServiceForNamespace(CoordinationService.getAppNamespaceRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, caseName, NAME_LOCK_TIMOUT_HOURS, TimeUnit.HOURS);
Lock lock = CoordinationService.getInstance().tryGetExclusiveLock(CategoryNode.CASES, caseName, NAME_LOCK_TIMOUT_HOURS, TimeUnit.HOURS);
if (null == lock) {
throw new CaseActionException(Bundle.Case_creationException_couldNotAcquireNameLock());
}
@ -970,7 +971,7 @@ public class Case {
private static CoordinationService.Lock acquireExclusiveCaseResourcesLock(String caseName) throws CaseActionException {
try {
String resourcesNodeName = caseName + "_resources";
Lock lock = CoordinationService.getServiceForNamespace(CoordinationService.getAppNamespaceRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, resourcesNodeName, RESOURCE_LOCK_TIMOUT_HOURS, TimeUnit.HOURS);
Lock lock = CoordinationService.getInstance().tryGetExclusiveLock(CategoryNode.CASES, resourcesNodeName, RESOURCE_LOCK_TIMOUT_HOURS, TimeUnit.HOURS);
if (null == lock) {
throw new CaseActionException(Bundle.Case_creationException_couldNotAcquireResourcesLock());
}
@ -2336,7 +2337,7 @@ public class Case {
@Messages({"Case.creationException.couldNotAcquireDirLock=Failed to get lock on case directory."})
private void acquireSharedCaseDirLock(String caseDir) throws CaseActionException {
try {
caseDirLock = CoordinationService.getServiceForNamespace(CoordinationService.getAppNamespaceRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, caseDir, SHARED_DIR_LOCK_TIMOUT_HOURS, TimeUnit.HOURS);
caseDirLock = CoordinationService.getInstance().tryGetSharedLock(CategoryNode.CASES, caseDir, SHARED_DIR_LOCK_TIMOUT_HOURS, TimeUnit.HOURS);
if (null == caseDirLock) {
throw new CaseActionException(Bundle.Case_creationException_couldNotAcquireDirLock());
}

View File

@ -20,10 +20,12 @@ package org.sleuthkit.autopsy.coordinationservice;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@ -43,9 +45,8 @@ import org.sleuthkit.autopsy.core.UserPreferences;
* A coordination service for maintaining configuration information and
* providing distributed synchronization using a shared hierarchical namespace
* of nodes.
*
* TODO (JIRA 2205): Simple refactoring for general use.
*/
@ThreadSafe
public final class CoordinationService {
private static final int SESSION_TIMEOUT_MILLISECONDS = 300000;
@ -54,8 +55,10 @@ public final class CoordinationService {
private static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS = 15000;
private static final int PORT_OFFSET = 1000; // When run in Solr, ZooKeeper defaults to Solr port + 1000
private static final String DEFAULT_NAMESPACE_ROOT = "autopsy";
private static final Map<String, CoordinationService> rootNodesToServices = new HashMap<>();
private static CuratorFramework curator;
@GuardedBy("CoordinationService.class")
private static CoordinationService instance;
private final CuratorFramework curator;
@GuardedBy("categoryNodeToPath")
private final Map<String, String> categoryNodeToPath;
/**
@ -90,59 +93,31 @@ public final class CoordinationService {
}
/**
* Gets the name of the root node of the namespace for the application.
* Gets the coordination service for maintaining configuration information
* and providing distributed synchronization using a shared hierarchical
* namespace of nodes.
*
* @return The name of the root node for the application namespace.
* @return The corrdination service.
*
* @throws CoordinationServiceException
*/
public static String getAppNamespaceRoot() {
Collection<? extends AppCoordinationServiceNamespace> providers = Lookup.getDefault().lookupAll(AppCoordinationServiceNamespace.class);
Iterator<? extends AppCoordinationServiceNamespace> it = providers.iterator();
public synchronized static CoordinationService getInstance() throws CoordinationServiceException {
if (null == instance) {
String rootNode;
Collection<? extends CoordinationServiceNamespace> providers = Lookup.getDefault().lookupAll(CoordinationServiceNamespace.class);
Iterator<? extends CoordinationServiceNamespace> it = providers.iterator();
if (it.hasNext()) {
return it.next().getAppNamespaceRoot();
rootNode = it.next().getNamespaceRoot();
} else {
return DEFAULT_NAMESPACE_ROOT;
rootNode = DEFAULT_NAMESPACE_ROOT;
}
}
/**
* Gets a coordination service for a specific namespace.
*
* @param rootNode The name of the root node that defines the namespace.
*
* @return The coordination service.
*
* @throws CoordinationServiceException If an instance of the coordination
* service cannot be created.
*/
public static synchronized CoordinationService getServiceForNamespace(String rootNode) throws CoordinationServiceException {
/*
* Connect to ZooKeeper via Curator.
*/
if (null == curator) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET;
String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort;
curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
curator.start();
}
/*
* Get or create a coordination service for the namespace defined by the
* specified root node.
*/
if (rootNodesToServices.containsKey(rootNode)) {
return rootNodesToServices.get(rootNode);
} else {
CoordinationService service;
try {
service = new CoordinationService(rootNode);
instance = new CoordinationService(rootNode);
} catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) {
curator = null;
throw new CoordinationServiceException("Failed to create coordination service", ex);
}
rootNodesToServices.put(rootNode, service);
return service;
}
return instance;
}
/**
@ -160,12 +135,23 @@ public final class CoordinationService {
throw new CoordinationServiceException("Unable to access ZooKeeper");
}
/*
* Connect to ZooKeeper via Curator.
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET;
String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort;
curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
curator.start();
/*
* Create the top-level root and category nodes.
*/
String rootNode = rootNodeName;
if (!rootNode.startsWith("/")) {
rootNode = "/" + rootNode;
}
categoryNodeToPath = new HashMap<>();
categoryNodeToPath = new ConcurrentHashMap<>();
for (CategoryNode node : CategoryNode.values()) {
String nodePath = rootNode + "/" + node.getDisplayName();
try {

View File

@ -19,15 +19,18 @@
package org.sleuthkit.autopsy.coordinationservice;
/**
* Interface for providers of application-level coordination service namespaces.
* An interface that allows the root node of the coordination service namespace
* for the application to be specified at runtime. An application built on the
* Autopsy platform should provide at most one implementation of this interface
* (additional implementations are ignored).
*/
public interface AppCoordinationServiceNamespace {
public interface CoordinationServiceNamespace {
/**
* Gets the name of the root node of the namespace for the application.
* Gets the name of the root node of the coordination service namespace.
*
* @return The name of the root node for the application namespace.
* @return The name of the root node.
*/
public String getAppNamespaceRoot();
public String getNamespaceRoot();
}

View File

@ -436,7 +436,7 @@ final class AutoIngestJobLogger {
* log file.
*/
private void log(MessageCategory category, String message) throws AutoIngestJobLoggerException, InterruptedException {
try (Lock lock = CoordinationService.getServiceForNamespace(CoordinationService.getAppNamespaceRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, logLockName, LOCK_TIME_OUT, LOCK_TIME_OUT_UNIT)) {
try (Lock lock = CoordinationService.getInstance().tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, logLockName, LOCK_TIME_OUT, LOCK_TIME_OUT_UNIT)) {
if (null != lock) {
File logFile = getLogPath(caseDirectoryPath).toFile();
try (PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(logFile, logFile.exists())), true)) {

View File

@ -211,7 +211,7 @@ public final class AutoIngestManager extends Observable implements PropertyChang
void startUp() throws AutoIngestManagerStartupException {
SYS_LOGGER.log(Level.INFO, "Auto ingest starting");
try {
coordinationService = CoordinationService.getServiceForNamespace(CoordinationService.getAppNamespaceRoot());
coordinationService = CoordinationService.getInstance();
} catch (CoordinationServiceException ex) {
throw new AutoIngestManagerStartupException("Failed to get coordination service", ex);
}

View File

@ -50,6 +50,7 @@ import org.sleuthkit.autopsy.core.ServicesMonitor;
import org.sleuthkit.autopsy.modules.hashdatabase.HashDbManager.HashDb;
import org.sleuthkit.autopsy.experimental.configuration.AutoIngestSettingsPanel.UpdateConfigSwingWorker;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService.CategoryNode;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService.Lock;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService.CoordinationServiceException;
@ -86,7 +87,6 @@ public class SharedConfiguration {
private static final String PREFERENCES_FOLDER = "Preferences"; //NON-NLS
public static final String FILE_EXPORTER_FOLDER = "Automated File Exporter"; //NON-NLS
private static final String LOCK_ROOT = "/autopsy"; // NON-NLS
private static final String UPLOAD_IN_PROGRESS_FILE = "uploadInProgress"; // NON-NLS
private static final String moduleDirPath = PlatformUtil.getUserConfigDirectory();
private static final Logger logger = Logger.getLogger(SharedConfiguration.class.getName());
@ -160,7 +160,7 @@ public class SharedConfiguration {
File remoteFolder = getSharedFolder();
try (Lock writeLock = CoordinationService.getServiceForNamespace(LOCK_ROOT).tryGetExclusiveLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) {
try (Lock writeLock = CoordinationService.getInstance().tryGetExclusiveLock(CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) {
if (writeLock == null) {
logger.log(Level.INFO, String.format("Failed to lock %s - another node is currently uploading or downloading configuration", remoteFolder.getAbsolutePath()));
return SharedConfigResult.LOCKED;
@ -230,7 +230,7 @@ public class SharedConfiguration {
File remoteFolder = getSharedFolder();
try (Lock readLock = CoordinationService.getServiceForNamespace(LOCK_ROOT).tryGetSharedLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) {
try (Lock readLock = CoordinationService.getInstance().tryGetSharedLock(CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) {
if (readLock == null) {
return SharedConfigResult.LOCKED;
}