From fdd88d549d2c6d3de5e5ef3cc9c79dccdf40e394 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Thu, 12 Jan 2017 15:43:32 -0500 Subject: [PATCH 1/2] Tidy up CoordinationService --- .../sleuthkit/autopsy/casemodule/Case.java | 8 +- .../CoordinationService.java | 255 +++++++++--------- .../autoingest/AutoIngestJobLogger.java | 2 +- .../autoingest/AutoIngestManager.java | 2 +- .../configuration/SharedConfiguration.java | 4 +- 5 files changed, 139 insertions(+), 132 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/casemodule/Case.java b/Core/src/org/sleuthkit/autopsy/casemodule/Case.java index 04c110518f..6410a269ce 100644 --- a/Core/src/org/sleuthkit/autopsy/casemodule/Case.java +++ b/Core/src/org/sleuthkit/autopsy/casemodule/Case.java @@ -1011,7 +1011,7 @@ public class Case implements SleuthkitCase.ErrorObserver { // The shared lock needs to be created on a special thread so it can be released // from the same thread. Future future = getCurrentCaseExecutor().submit(() -> { - currentCaseLock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, caseDir); + currentCaseLock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, caseDir); if (null == currentCaseLock) { throw new CaseActionException(NbBundle.getMessage(Case.class, "Case.exception.errorLocking", CaseMetadata.getFileExtension())); } @@ -1022,7 +1022,7 @@ public class Case implements SleuthkitCase.ErrorObserver { // The exclusive lock uses the unique case name. // This lock does not need to be on a special thread since it will be released before // leaving this method - exclusiveResourceLock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.RESOURCE, + exclusiveResourceLock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.RESOURCE, dbName, 12, TimeUnit.HOURS); if (null == exclusiveResourceLock) { throw new CaseActionException(NbBundle.getMessage(Case.class, "Case.exception.errorLocking", CaseMetadata.getFileExtension())); @@ -1272,7 +1272,7 @@ public class Case implements SleuthkitCase.ErrorObserver { // The shared lock needs to be created on a special thread so it can be released // from the same thread. Future future = getCurrentCaseExecutor().submit(() -> { - currentCaseLock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, metadata.getCaseDirectory()); + currentCaseLock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetSharedLock(CoordinationService.CategoryNode.CASES, metadata.getCaseDirectory()); if (null == currentCaseLock) { throw new CaseActionException(NbBundle.getMessage(Case.class, "Case.exception.errorLocking", CaseMetadata.getFileExtension())); } @@ -1283,7 +1283,7 @@ public class Case implements SleuthkitCase.ErrorObserver { // The exclusive lock uses the unique case name // This lock does not need to be on a special thread since it will be released before // leaving this method - exclusiveResourceLock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.RESOURCE, + exclusiveResourceLock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.RESOURCE, metadata.getCaseDatabaseName(), 12, TimeUnit.HOURS); if (null == exclusiveResourceLock) { throw new CaseActionException(NbBundle.getMessage(Case.class, "Case.exception.errorLocking", CaseMetadata.getFileExtension())); diff --git a/Core/src/org/sleuthkit/autopsy/coordinationservice/CoordinationService.java b/Core/src/org/sleuthkit/autopsy/coordinationservice/CoordinationService.java index 39c5c25ba6..adc89d4afa 100644 --- a/Core/src/org/sleuthkit/autopsy/coordinationservice/CoordinationService.java +++ b/Core/src/org/sleuthkit/autopsy/coordinationservice/CoordinationService.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2015 Basis Technology Corp. + * Copyright 2011-2017 Basis Technology Corp. * Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -18,129 +18,89 @@ */ package org.sleuthkit.autopsy.coordinationservice; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.ZooDefs; import org.apache.curator.RetryPolicy; -import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; -import org.sleuthkit.autopsy.core.UserPreferences; -import java.io.IOException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.sleuthkit.autopsy.core.UserPreferences; /** - * A centralized service for maintaining configuration information and providing - * distributed synchronization using a shared hierarchical namespace of nodes. + * A coordination service for maintaining configuration information and + * providing distributed synchronization using a shared hierarchical namespace + * of nodes. + * + * TODO (JIRA 2205): Simple refactoring for general use. */ public final class CoordinationService { - /** - * Category nodes are the immediate children of the root node of a shared - * hierarchical namespace managed by the coordination service. - */ - public enum CategoryNode { // RJCTODO: Move this to CoordinationServiceNamespace - - CASES("cases"), - MANIFESTS("manifests"), - CONFIG("config"), - RESOURCE("resource"); - - private final String displayName; - - private CategoryNode(String displayName) { - this.displayName = displayName; - } - - public String getDisplayName() { - return displayName; - } - } - - /** - * Exception type thrown by the coordination service. - */ - public final static class CoordinationServiceException extends Exception { - - private static final long serialVersionUID = 1L; - - private CoordinationServiceException(String message) { - super(message); - } - - private CoordinationServiceException(String message, Throwable cause) { - super(message, cause); - } - } - - /** - * An opaque encapsulation of a lock for use in distributed synchronization. - * Instances are obtained by calling a get lock method and must be passed to - * a release lock method. - */ - public static class Lock implements AutoCloseable { - - /** - * This implementation uses the Curator read/write lock. see - * http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html - */ - private final InterProcessMutex interProcessLock; - private final String nodePath; - - private Lock(String nodePath, InterProcessMutex lock) { - this.nodePath = nodePath; - this.interProcessLock = lock; - } - - public String getNodePath() { - return nodePath; - } - - public void release() throws CoordinationServiceException { - try { - this.interProcessLock.release(); - } catch (Exception ex) { - throw new CoordinationServiceException(String.format("Failed to release the lock on %s", nodePath), ex); - } - } - - @Override - public void close() throws CoordinationServiceException { - release(); - } - } - private static CuratorFramework curator = null; private static final Map rootNodesToServices = new HashMap<>(); - private final Map categoryNodeToPath = new HashMap<>(); private static final int SESSION_TIMEOUT_MILLISECONDS = 300000; private static final int CONNECTION_TIMEOUT_MILLISECONDS = 300000; private static final int ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 3000; private static final int ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS = 15000; - private static final int PORT_OFFSET = 1000; + private static final int PORT_OFFSET = 1000; // When run in Solr, ZooKeeper defaults to Solr port + 1000 + private final Map categoryNodeToPath = new HashMap<>(); /** - * Gets an instance of the centralized coordination service for a specific - * namespace. + * Determines if ZooKeeper is accessible with the current settings. Closes + * the connection prior to returning. + * + * @return true if a connection was achieved, false otherwise + * + * @throws InterruptedException + * @throws IOException + */ + private static boolean isZooKeeperAccessible() throws InterruptedException, IOException { + boolean result = false; + Object workerThreadWaitNotifyLock = new Object(); + int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET; + String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort; + ZooKeeper zooKeeper = new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS, + (WatchedEvent event) -> { + synchronized (workerThreadWaitNotifyLock) { + workerThreadWaitNotifyLock.notify(); + } + }); + synchronized (workerThreadWaitNotifyLock) { + workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS); + } + ZooKeeper.States state = zooKeeper.getState(); + if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) { + result = true; + } + zooKeeper.close(); + return result; + } + + /** + * Gets a coordination service for a specific namespace. * * @param rootNode The name of the root node that defines the namespace. * - * @return The service for the namespace defined by the root node name. + * @return The coordination service. * - * @throws CoordinationServiceException If an instaNce of the coordination + * @throws CoordinationServiceException If an instance of the coordination * service cannot be created. */ - public static synchronized CoordinationService getInstance(String rootNode) throws CoordinationServiceException { + public static synchronized CoordinationService getServiceForNamespace(String rootNode) throws CoordinationServiceException { + /* + * Connect to ZooKeeper via Curator. + */ if (null == curator) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); - // When run in Solr, ZooKeeper defaults to Solr port + 1000 int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET; String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort; curator = CuratorFrameworkFactory.newClient(connectString, SESSION_TIMEOUT_MILLISECONDS, CONNECTION_TIMEOUT_MILLISECONDS, retryPolicy); @@ -157,7 +117,7 @@ public final class CoordinationService { CoordinationService service; try { service = new CoordinationService(rootNode); - } catch (Exception ex) { + } catch (IOException | InterruptedException | KeeperException | CoordinationServiceException ex) { curator = null; throw new CoordinationServiceException("Failed to create coordination service", ex); } @@ -167,15 +127,18 @@ public final class CoordinationService { } /** - * Constructs an instance of the centralized coordination service for a - * specific namespace. + * Constructs an instance of the coordination service for a specific + * namespace. * * @param rootNodeName The name of the root node that defines the namespace. + * + * @throws Exception (calls Curator methods that throw Exception instead of + * more specific exceptions) */ - private CoordinationService(String rootNodeName) throws Exception { + private CoordinationService(String rootNodeName) throws InterruptedException, IOException, KeeperException, CoordinationServiceException { if (false == isZooKeeperAccessible()) { - throw new Exception("Unable to access ZooKeeper"); + throw new CoordinationServiceException("Unable to access ZooKeeper"); } String rootNode = rootNodeName; @@ -191,6 +154,8 @@ public final class CoordinationService { if (ex.code() != KeeperException.Code.NODEEXISTS) { throw ex; } + } catch (Exception ex) { + throw new CoordinationServiceException("Curator experienced an error", ex); } categoryNodeToPath.put(node.getDisplayName(), nodePath); } @@ -385,35 +350,77 @@ public final class CoordinationService { } /** - * Determines if ZooKeeper is accessible with the current settings. Closes - * the connection prior to returning. - * - * @return true if a connection was achieved, false otherwise + * Exception type thrown by the coordination service. */ - private static boolean isZooKeeperAccessible() { - boolean result = false; - Object workerThreadWaitNotifyLock = new Object(); - int zooKeeperServerPort = Integer.valueOf(UserPreferences.getIndexingServerPort()) + PORT_OFFSET; - String connectString = UserPreferences.getIndexingServerHost() + ":" + zooKeeperServerPort; + public final static class CoordinationServiceException extends Exception { - try { - ZooKeeper zooKeeper = new ZooKeeper(connectString, ZOOKEEPER_SESSION_TIMEOUT_MILLIS, - (WatchedEvent event) -> { + private static final long serialVersionUID = 1L; - synchronized (workerThreadWaitNotifyLock) { - workerThreadWaitNotifyLock.notify(); - } - }); - synchronized (workerThreadWaitNotifyLock) { - workerThreadWaitNotifyLock.wait(ZOOKEEPER_CONNECTION_TIMEOUT_MILLIS); - } - ZooKeeper.States state = zooKeeper.getState(); - if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTEDREADONLY) { - result = true; - } - zooKeeper.close(); - } catch (InterruptedException | IOException ignored) { + private CoordinationServiceException(String message) { + super(message); + } + + private CoordinationServiceException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * An opaque encapsulation of a lock for use in distributed synchronization. + * Instances are obtained by calling a get lock method and must be passed to + * a release lock method. + */ + public static class Lock implements AutoCloseable { + + /** + * This implementation uses the Curator read/write lock. see + * http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html + */ + private final InterProcessMutex interProcessLock; + private final String nodePath; + + private Lock(String nodePath, InterProcessMutex lock) { + this.nodePath = nodePath; + this.interProcessLock = lock; + } + + public String getNodePath() { + return nodePath; + } + + public void release() throws CoordinationServiceException { + try { + this.interProcessLock.release(); + } catch (Exception ex) { + throw new CoordinationServiceException(String.format("Failed to release the lock on %s", nodePath), ex); + } + } + + @Override + public void close() throws CoordinationServiceException { + release(); + } + } + + /** + * Category nodes are the immediate children of the root node of a shared + * hierarchical namespace managed by a coordination service. + */ + public enum CategoryNode { + + CASES("cases"), + MANIFESTS("manifests"), + CONFIG("config"), + RESOURCE("resource"); + + private final String displayName; + + private CategoryNode(String displayName) { + this.displayName = displayName; + } + + public String getDisplayName() { + return displayName; } - return result; } } diff --git a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestJobLogger.java b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestJobLogger.java index 3165ad23c9..e9406950fd 100644 --- a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestJobLogger.java +++ b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestJobLogger.java @@ -431,7 +431,7 @@ final class AutoIngestJobLogger { * log file. */ private void log(MessageCategory category, String message) throws AutoIngestJobLoggerException, InterruptedException { - try (Lock lock = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, getLogPath(caseDirectoryPath).toString(), LOCK_TIME_OUT, LOCK_TIME_OUT_UNIT)) { + try (Lock lock = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()).tryGetExclusiveLock(CoordinationService.CategoryNode.CASES, getLogPath(caseDirectoryPath).toString(), LOCK_TIME_OUT, LOCK_TIME_OUT_UNIT)) { if (null != lock) { File logFile = getLogPath(caseDirectoryPath).toFile(); try (PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(logFile, logFile.exists())), true)) { diff --git a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestManager.java b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestManager.java index 867ae631e8..bb55281227 100644 --- a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestManager.java +++ b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestManager.java @@ -232,7 +232,7 @@ public final class AutoIngestManager extends Observable implements PropertyChang void startUp() throws AutoIngestManagerStartupException { SYS_LOGGER.log(Level.INFO, "Auto ingest starting"); try { - coordinationService = CoordinationService.getInstance(CoordinationServiceNamespace.getRoot()); + coordinationService = CoordinationService.getServiceForNamespace(CoordinationServiceNamespace.getRoot()); } catch (CoordinationServiceException ex) { throw new AutoIngestManagerStartupException("Failed to get coordination service", ex); } diff --git a/Experimental/src/org/sleuthkit/autopsy/experimental/configuration/SharedConfiguration.java b/Experimental/src/org/sleuthkit/autopsy/experimental/configuration/SharedConfiguration.java index 7599ed4648..653fc65807 100644 --- a/Experimental/src/org/sleuthkit/autopsy/experimental/configuration/SharedConfiguration.java +++ b/Experimental/src/org/sleuthkit/autopsy/experimental/configuration/SharedConfiguration.java @@ -160,7 +160,7 @@ public class SharedConfiguration { File remoteFolder = getSharedFolder(); - try (Lock writeLock = CoordinationService.getInstance(LOCK_ROOT).tryGetExclusiveLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) { + try (Lock writeLock = CoordinationService.getServiceForNamespace(LOCK_ROOT).tryGetExclusiveLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) { if (writeLock == null) { logger.log(Level.INFO, String.format("Failed to lock %s - another node is currently uploading or downloading configuration", remoteFolder.getAbsolutePath())); return SharedConfigResult.LOCKED; @@ -230,7 +230,7 @@ public class SharedConfiguration { File remoteFolder = getSharedFolder(); - try (Lock readLock = CoordinationService.getInstance(LOCK_ROOT).tryGetSharedLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) { + try (Lock readLock = CoordinationService.getServiceForNamespace(LOCK_ROOT).tryGetSharedLock(CoordinationService.CategoryNode.CONFIG, remoteFolder.getAbsolutePath(), 30, TimeUnit.MINUTES)) { if (readLock == null) { return SharedConfigResult.LOCKED; } From cf2e5811de0801043cebf456791c6d9ce65f2b58 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Thu, 12 Jan 2017 16:03:58 -0500 Subject: [PATCH 2/2] Tidy up CoordinationServiceNamespace --- .../coordinationservice/CoordinationServiceNamespace.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/coordinationservice/CoordinationServiceNamespace.java b/Core/src/org/sleuthkit/autopsy/coordinationservice/CoordinationServiceNamespace.java index e1f2a3df42..567dd38bc6 100644 --- a/Core/src/org/sleuthkit/autopsy/coordinationservice/CoordinationServiceNamespace.java +++ b/Core/src/org/sleuthkit/autopsy/coordinationservice/CoordinationServiceNamespace.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2015 Basis Technology Corp. + * Copyright 2016-2017 Basis Technology Corp. * Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -19,7 +19,7 @@ package org.sleuthkit.autopsy.coordinationservice; /** - * Namespace elements for auto ingest coordination service nodes. + * Root node for Autopsy coordination service namespace. */ public final class CoordinationServiceNamespace { private static final String ROOT = "autopsy";