diff --git a/changelog.html b/changelog.html
index d8a6b6ba5..b0ad3323f 100644
--- a/changelog.html
+++ b/changelog.html
@@ -46,6 +46,7 @@
3.0.1 -- (To be determined)
+ - [Issue #103] - Fix Cluster initialization race condition
- [Issue #102] - Remove unused code in ClusterListener
diff --git a/src/java/org/jivesoftware/openfire/plugin/util/cache/ClusterListener.java b/src/java/org/jivesoftware/openfire/plugin/util/cache/ClusterListener.java
index 5653795f8..ff3af48a4 100644
--- a/src/java/org/jivesoftware/openfire/plugin/util/cache/ClusterListener.java
+++ b/src/java/org/jivesoftware/openfire/plugin/util/cache/ClusterListener.java
@@ -38,6 +38,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -50,7 +51,8 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
private boolean seniorClusterMember = false;
- private final Cluster cluster;
+ private CompletableFuture clusterFuture = new CompletableFuture<>();
+ private Cluster cluster; // Must not be used until clusterFuture is done.
private final Map clusterNodesInfo = new ConcurrentHashMap<>();
/**
@@ -65,15 +67,24 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
private boolean clusterMember = false;
private boolean isSenior;
- ClusterListener(final Cluster cluster) {
-
+ synchronized void register(final Cluster cluster) {
+ logger.info("Registering with cluster. Executing any queued events to complete Openfire cluster formation.");
this.cluster = cluster;
+ this.clusterFuture.complete(cluster);
+
for (final Member member : cluster.getMembers()) {
clusterNodesInfo.put(ClusteredCacheFactory.getNodeID(member),
new HazelcastClusterNodeInfo(member, cluster.getClusterTime()));
}
}
+ synchronized void unregister() {
+ logger.info("Unregistering with cluster. Cancelling any events to that where queued for completing Openfire cluster formation.");
+ if (!this.clusterFuture.isDone()) {
+ this.clusterFuture.cancel(true);
+ }
+ }
+
@SuppressWarnings("unchecked")
private boolean isDone() {
return done;
@@ -137,6 +148,17 @@ private synchronized void leaveCluster() {
public void memberAdded(final MembershipEvent event) {
logger.info("Received a Hazelcast memberAdded event {}", event);
+ synchronized (this) {
+ if (!clusterFuture.isDone()) {
+ logger.info("Queue memberAdded event until after cluster has been established.");
+ clusterFuture = clusterFuture.thenApply(e -> {
+ memberAdded(event);
+ return e;
+ });
+ return;
+ }
+ }
+
final boolean wasSenior = isSenior;
isSenior = isSeniorClusterMember();
// local member only
@@ -210,6 +232,17 @@ private boolean waitForClusterCacheToBeInstalled() {
public void memberRemoved(final MembershipEvent event) {
logger.info("Received a Hazelcast memberRemoved event {}", event);
+ synchronized (this) {
+ if (!clusterFuture.isDone()) {
+ logger.info("Queue memberRemoved event until after cluster has been established.");
+ clusterFuture = clusterFuture.thenApply(e -> {
+ memberRemoved(event);
+ return e;
+ });
+ return;
+ }
+ }
+
isSenior = isSeniorClusterMember();
final NodeID nodeID = ClusteredCacheFactory.getNodeID(event.getMember());
@@ -242,6 +275,19 @@ public List getClusterNodesInfo() {
@Override
public void stateChanged(final LifecycleEvent event) {
+ logger.info("Received a Hazelcast stateChanged event {}", event);
+
+ synchronized (this) {
+ if (!clusterFuture.isDone()) {
+ logger.info("Queue LifecycleEvent event until after cluster has been established.");
+ clusterFuture = clusterFuture.thenApply(e -> {
+ stateChanged(event);
+ return e;
+ });
+ return;
+ }
+ }
+
if (event.getState().equals(LifecycleState.SHUTDOWN)) {
leaveCluster();
} else if (event.getState().equals(LifecycleState.STARTED)) {
diff --git a/src/java/org/jivesoftware/openfire/plugin/util/cache/ClusteredCacheFactory.java b/src/java/org/jivesoftware/openfire/plugin/util/cache/ClusteredCacheFactory.java
index 7b4c29cc5..3c37294c8 100644
--- a/src/java/org/jivesoftware/openfire/plugin/util/cache/ClusteredCacheFactory.java
+++ b/src/java/org/jivesoftware/openfire/plugin/util/cache/ClusteredCacheFactory.java
@@ -134,8 +134,6 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
private static HazelcastInstance hazelcast = null;
private static Cluster cluster = null;
private ClusterListener clusterListener;
- private UUID lifecycleListener;
- private UUID membershipListener;
/**
* Keeps that running state. Initial state is stopped.
@@ -165,38 +163,44 @@ public boolean startCluster() {
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
final ClassLoader loader = new ClusterClassLoader();
Thread.currentThread().setContextClassLoader(loader);
+
+ // Prepare for the cluster configuration.
+ final Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE.getValue());
+ final NetworkConfig networkConfig = config.getNetworkConfig();
+ if (!HAZELCAST_MEMCACHE_ENABLED.getValue()) {
+ networkConfig.setMemcacheProtocolConfig(new MemcacheProtocolConfig().setEnabled(false));
+ }
+ if (!HAZELCAST_REST_ENABLED.getValue()) {
+ networkConfig.setRestApiConfig(new RestApiConfig().setEnabled(false));
+ }
+ final MemberAttributeConfig memberAttributeConfig = config.getMemberAttributeConfig();
+ memberAttributeConfig.setAttribute(HazelcastClusterNodeInfo.HOST_NAME_ATTRIBUTE, XMPPServer.getInstance().getServerInfo().getHostname());
+ memberAttributeConfig.setAttribute(HazelcastClusterNodeInfo.NODE_ID_ATTRIBUTE, XMPPServer.getInstance().getNodeID().toString());
+ config.setInstanceName("openfire");
+ config.setClassLoader(loader);
+ if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED.getValue()) {
+ config.setProperty("hazelcast.jmx", "true");
+ config.setProperty("hazelcast.jmx.detailed", "true");
+ }
+
+ // Ensure that listeners are added before the cluster is initialized, to prevent missing events due to a race condition. See issue #103.
+ clusterListener = new ClusterListener();
+ config.addListenerConfig(new ListenerConfig(clusterListener));
+
int retry = 0;
do {
try {
- final Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE.getValue());
- final NetworkConfig networkConfig = config.getNetworkConfig();
- if (!HAZELCAST_MEMCACHE_ENABLED.getValue()) {
- networkConfig.setMemcacheProtocolConfig(new MemcacheProtocolConfig().setEnabled(false));
- }
- if (!HAZELCAST_REST_ENABLED.getValue()) {
- networkConfig.setRestApiConfig(new RestApiConfig().setEnabled(false));
- }
- final MemberAttributeConfig memberAttributeConfig = config.getMemberAttributeConfig();
- memberAttributeConfig.setAttribute(HazelcastClusterNodeInfo.HOST_NAME_ATTRIBUTE, XMPPServer.getInstance().getServerInfo().getHostname());
- memberAttributeConfig.setAttribute(HazelcastClusterNodeInfo.NODE_ID_ATTRIBUTE, XMPPServer.getInstance().getNodeID().toString());
- config.setInstanceName("openfire");
- config.setClassLoader(loader);
- if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED.getValue()) {
- config.setProperty("hazelcast.jmx", "true");
- config.setProperty("hazelcast.jmx.detailed", "true");
- }
hazelcast = Hazelcast.newHazelcastInstance(config);
cluster = hazelcast.getCluster();
state = State.started;
- // CacheFactory is now using clustered caches. We can add our listeners.
- clusterListener = new ClusterListener(cluster);
+ // CacheFactory is now using clustered caches.
+ clusterListener.register(cluster);
clusterListener.joinCluster();
- lifecycleListener = hazelcast.getLifecycleService().addLifecycleListener(clusterListener);
- membershipListener = cluster.addMembershipListener(clusterListener);
logger.info("Hazelcast clustering started");
break;
} catch (final Exception e) {
cluster = null;
+ clusterListener.unregister();
if (retry < CLUSTER_STARTUP_RETRY_COUNT.getValue()) {
logger.warn("Failed to start clustering (" + e.getMessage() + "); " +
"will retry in " + StringUtils.getFullElapsedTime(CLUSTER_STARTUP_RETRY_TIME.getValue()));
@@ -230,12 +234,9 @@ public void stopCluster() {
// cluster is shutdown so it can be copied in to the non-clustered, DefaultCache
fireLeftClusterAndWaitToComplete(Duration.ofSeconds(30));
// Stop the cluster
- hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
- cluster.removeMembershipListener(membershipListener);
Hazelcast.shutdownAll();
cluster = null;
- lifecycleListener = null;
- membershipListener = null;
+ clusterListener.unregister();
clusterListener = null;
// Reset packet router to use to deliver packets to remote cluster nodes