Merge pull request #2465 from rcordovano/open_case_resources

Tidy up coordination service classes
This commit is contained in:
Richard Cordovano 2017-01-12 16:06:11 -05:00 committed by GitHub
commit f197b47d21
6 changed files with 141 additions and 134 deletions

View File

@ -1011,7 +1011,7 @@ public class Case implements SleuthkitCase.ErrorObserver {
// The shared lock needs to be created on a special thread so it can be released
// from the same thread.
Future<Void> future = getCurrentCaseExecutor().submit(() -> {
currentCaseLock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, caseDir);
currentCaseLock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, caseDir);
if (null == currentCaseLock) {
throw new CaseActionException(NbBundle.getMessage(Case.class, "Case.exception.errorLocking", CaseMetadata.getFileExtension()));
}
@ -1022,7 +1022,7 @@ public class Case implements SleuthkitCase.ErrorObserver {
// The exclusive lock uses the unique case name.
// This lock does not need to be on a special thread since it will be released before
// leaving this method
exclusiveResourceLock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.RESOURCE,
exclusiveResourceLock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.RESOURCE,
dbName, 12, TimeUnit.HOURS);
if (null == exclusiveResourceLock) {
throw new CaseActionException(NbBundle.getMessage(Case.class, "Case.exception.errorLocking", CaseMetadata.getFileExtension()));
@ -1272,7 +1272,7 @@ public class Case implements SleuthkitCase.ErrorObserver {
// The shared lock needs to be created on a special thread so it can be released
// from the same thread.
Future<Void> future = getCurrentCaseExecutor().submit(() -> {
currentCaseLock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, metadata.getCaseDirectory());
currentCaseLock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, metadata.getCaseDirectory());
if (null == currentCaseLock) {
throw new CaseActionException(NbBundle.getMessage(Case.class, "Case.exception.errorLocking", CaseMetadata.getFileExtension()));
}
@ -1283,7 +1283,7 @@ public class Case implements SleuthkitCase.ErrorObserver {
// The exclusive lock uses the unique case name
// This lock does not need to be on a special thread since it will be released before
// leaving this method
exclusiveResourceLock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.RESOURCE,
exclusiveResourceLock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.RESOURCE,
metadata.getCaseDatabaseName(), 12, TimeUnit.HOURS);
if (null == exclusiveResourceLock) {
throw new CaseActionException(NbBundle.getMessage(Case.class, "Case.exception.errorLocking", CaseMetadata.getFileExtension()));

View File

@ -1,7 +1,7 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2015 Basis Technology Corp.
* Copyright 2011-2017 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -18,129 +18,89 @@
*/
package org.sleuthkit.autopsy.coordinationservice;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.sleuthkit.autopsy.core.UserPreferences;
import java.io.IOException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.sleuthkit.autopsy.core.UserPreferences;
/**
* A centralized service for maintaining configuration information and providing
* distributed synchronization using a shared hierarchical namespace of nodes.
* A coordination service for maintaining configuration information and
* providing distributed synchronization using a shared hierarchical namespace
* of nodes.
*
* TODO (JIRA 2205): Simple refactoring for general use.
*/
public final class CoordinationService {
/**
* Category nodes are the immediate children of the root node of a shared
* hierarchical namespace managed by the coordination service.
*/
public enum CategoryNode { // RJCTODO: Move this to CoordinationServiceNamespace
CASES("cases"),
MANIFESTS("manifests"),
CONFIG("config"),
RESOURCE("resource");
private final String displayName;
private CategoryNode(String displayName) {
this.displayName = displayName;
}
public String getDisplayName() {
return displayName;
}
}
/**
* Exception type thrown by the coordination service.
*/
public final static class CoordinationServiceException extends Exception {
private static final long serialVersionUID = 1L;
private CoordinationServiceException(String message) {
super(message);
}
private CoordinationServiceException(String message, Throwable cause) {
super(message, cause);
}
}
/**
* An opaque encapsulation of a lock for use in distributed synchronization.
* Instances are obtained by calling a get lock method and must be passed to
* a release lock method.
*/
public static class Lock implements AutoCloseable {
/**
* This implementation uses the Curator read/write lock. see
* http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html
*/
private final InterProcessMutex interProcessLock;
private final String nodePath;
private Lock(String nodePath, InterProcessMutex lock) {
this.nodePath = nodePath;
this.interProcessLock = lock;
}
public String getNodePath() {
return nodePath;
}
public void release() throws CoordinationServiceException {
try {
this.interProcessLock.release();
} catch (Exception ex) {
throw new CoordinationServiceException(String.format("Failed to release the lock on %s", nodePath), ex);
}
}
@Override
public void close() throws CoordinationServiceException {
release();
}
}
private static CuratorFramework curator = null;
private static final Map<String, CoordinationService> rootNodesToServices = new HashMap<>();
private final Map<String, String> categoryNodeToPath = new HashMap<>();
private static final int SESSION_TIMEOUT_MILLISECONDS = 300000;
private static final int CONNECTION_TIMEOUT_MILLISECONDS = 300000;
private static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 3000;
private static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS = 15000;
private static final int PORT_OFFSET = 1000;
private static final int PORT_OFFSET = 1000; // When run in Solr, ZooKeeper defaults to Solr port + 1000
private final Map<String, String> categoryNodeToPath = new HashMap<>();
/**
* Gets an instance of the centralized coordination service for a specific
* namespace.
* Determines if ZooKeeper is accessible with the current settings. Closes
* the connection prior to returning.
*
* @return true if a connection was achieved, false otherwise
*
* @throws InterruptedException
* @throws IOException
*/
private static boolean isZooKeeperAccessible() throws InterruptedException, IOException {
boolean result = false;
Object workerThreadWaitNotifyLock = new Object();
int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET;
String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort;
ZooKeeper zooKeeper = new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
(WatchedEvent event) -> {
synchronized (workerThreadWaitNotifyLock) {
workerThreadWaitNotifyLock.notify();
}
});
synchronized (workerThreadWaitNotifyLock) {
workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
}
ZooKeeper.States state = zooKeeper.getState();
if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
result = true;
}
zooKeeper.close();
return result;
}
/**
* Gets a coordination service for a specific namespace.
*
* @param rootNode The name of the root node that defines the namespace.
*
* @return The service for the namespace defined by the root node name.
* @return The coordination service.
*
* @throws CoordinationServiceException If an instaNce of the coordination
* @throws CoordinationServiceException If an instance of the coordination
* service cannot be created.
*/
public static synchronized CoordinationService getInstance(String rootNode) throws CoordinationServiceException {
public static synchronized CoordinationService getServiceForNamespace(String rootNode) throws CoordinationServiceException {
/*
* Connect to ZooKeeper via Curator.
*/
if (null == curator) {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// When run in Solr, ZooKeeper defaults to Solr port + 1000
int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET;
String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort;
curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy);
@ -157,7 +117,7 @@ public final class CoordinationService {
CoordinationService service;
try {
service = new CoordinationService(rootNode);
} catch (Exception ex) {
} catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) {
curator = null;
throw new CoordinationServiceException("Failed to create coordination service", ex);
}
@ -167,15 +127,18 @@ public final class CoordinationService {
}
/**
* Constructs an instance of the centralized coordination service for a
* specific namespace.
* Constructs an instance of the coordination service for a specific
* namespace.
*
* @param rootNodeName The name of the root node that defines the namespace.
*
* @throws Exception (calls Curator methods that throw Exception instead of
* more specific exceptions)
*/
private CoordinationService(String rootNodeName) throws Exception {
private CoordinationService(String rootNodeName) throws InterruptedException, IOException, KeeperException, CoordinationServiceException {
if (false == isZooKeeperAccessible()) {
throw new Exception("Unable to access ZooKeeper");
throw new CoordinationServiceException("Unable to access ZooKeeper");
}
String rootNode = rootNodeName;
@ -191,6 +154,8 @@ public final class CoordinationService {
if (ex.code() != KeeperException.Code.NODEEXISTS) {
throw ex;
}
} catch (Exception ex) {
throw new CoordinationServiceException("Curator experienced an error", ex);
}
categoryNodeToPath.put(node.getDisplayName(), nodePath);
}
@ -385,35 +350,77 @@ public final class CoordinationService {
}
/**
* Determines if ZooKeeper is accessible with the current settings. Closes
* the connection prior to returning.
*
* @return true if a connection was achieved, false otherwise
* Exception type thrown by the coordination service.
*/
private static boolean isZooKeeperAccessible() {
boolean result = false;
Object workerThreadWaitNotifyLock = new Object();
int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET;
String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort;
public final static class CoordinationServiceException extends Exception {
private static final long serialVersionUID = 1L;
private CoordinationServiceException(String message) {
super(message);
}
private CoordinationServiceException(String message, Throwable cause) {
super(message, cause);
}
}
/**
* An opaque encapsulation of a lock for use in distributed synchronization.
* Instances are obtained by calling a get lock method and must be passed to
* a release lock method.
*/
public static class Lock implements AutoCloseable {
/**
* This implementation uses the Curator read/write lock. see
* http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html
*/
private final InterProcessMutex interProcessLock;
private final String nodePath;
private Lock(String nodePath, InterProcessMutex lock) {
this.nodePath = nodePath;
this.interProcessLock = lock;
}
public String getNodePath() {
return nodePath;
}
public void release() throws CoordinationServiceException {
try {
ZooKeeper zooKeeper = new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS,
(WatchedEvent event) -> {
this.interProcessLock.release();
} catch (Exception ex) {
throw new CoordinationServiceException(String.format("Failed to release the lock on %s", nodePath), ex);
}
}
synchronized (workerThreadWaitNotifyLock) {
workerThreadWaitNotifyLock.notify();
@Override
public void close() throws CoordinationServiceException {
release();
}
});
synchronized (workerThreadWaitNotifyLock) {
workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS);
}
ZooKeeper.States state = zooKeeper.getState();
if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) {
result = true;
/**
* Category nodes are the immediate children of the root node of a shared
* hierarchical namespace managed by a coordination service.
*/
public enum CategoryNode {
CASES("cases"),
MANIFESTS("manifests"),
CONFIG("config"),
RESOURCE("resource");
private final String displayName;
private CategoryNode(String displayName) {
this.displayName = displayName;
}
zooKeeper.close();
} catch (InterruptedException | IOException ignored) {
public String getDisplayName() {
return displayName;
}
return result;
}
}

View File

@ -1,7 +1,7 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2015 Basis Technology Corp.
* Copyright 2016-2017 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
@ -19,7 +19,7 @@
package org.sleuthkit.autopsy.coordinationservice;
/**
* Namespace elements for auto ingest coordination service nodes.
* Root node for Autopsy coordination service namespace.
*/
public final class CoordinationServiceNamespace {
private static final String ROOT = "autopsy";

View File

@ -431,7 +431,7 @@ final class AutoIngestJobLogger {
* log file.
*/
private void log(MessageCategory category, String message) throws AutoIngestJobLoggerException, InterruptedException {
try (Lock lock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, getLogPath(caseDirectoryPath).toString(), LOCK_TIME_OUT, LOCK_TIME_OUT_UNIT)) {
try (Lock lock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, getLogPath(caseDirectoryPath).toString(), LOCK_TIME_OUT, LOCK_TIME_OUT_UNIT)) {
if (null != lock) {
File logFile = getLogPath(caseDirectoryPath).toFile();
try (PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(logFile, logFile.exists())), true)) {

View File

@ -232,7 +232,7 @@ public final class AutoIngestManager extends Observable implements PropertyChang
void startUp() throws AutoIngestManagerStartupException {
SYS_LOGGER.log(Level.INFO, "Auto ingest starting");
try {
coordinationService = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot());
coordinationService = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot());
} catch (CoordinationServiceException ex) {
throw new AutoIngestManagerStartupException("Failed to get coordination service", ex);
}

View File

@ -160,7 +160,7 @@ public class SharedConfiguration {
File remoteFolder = getSharedFolder();
try (Lock writeLock = CoordinationService.getInstance(LOCK_ROOT).tryGetExclusiveLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) {
try (Lock writeLock = CoordinationService.getServiceForNamespace(LOCK_ROOT).tryGetExclusiveLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) {
if (writeLock == null) {
logger.log(Level.INFO, String.format("Failed to lock %s - another node is currently uploading or downloading configuration", remoteFolder.getAbsolutePath()));
return SharedConfigResult.LOCKED;
@ -230,7 +230,7 @@ public class SharedConfiguration {
File remoteFolder = getSharedFolder();
try (Lock readLock = CoordinationService.getInstance(LOCK_ROOT).tryGetSharedLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) {
try (Lock readLock = CoordinationService.getServiceForNamespace(LOCK_ROOT).tryGetSharedLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) {
if (readLock == null) {
return SharedConfigResult.LOCKED;
}