Skip to content

Commit

Permalink
Prevent race condition when cluster is being initialized (#104)
Browse files Browse the repository at this point in the history
A race condition can occur between the cluster start and cluster event listeners being registered with the cluster, causing the listeners to miss events that are sent before the listener registers.

In this commit, events are queued to be processed only after the cluster has initialized to overcome this race condition.

fixes #103: Prevent race condition on cluster start
  • Loading branch information
guusdk authored Nov 5, 2024
1 parent d6a77ee commit 612c803
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 30 deletions.
1 change: 1 addition & 0 deletions changelog.html
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ <h1>

<p><b>3.0.1</b> -- (To be determined)</p>
<ul>
<li>[<a href='https://github.com/igniterealtime/openfire-hazelcast-plugin/issues/103'>Issue #103</a>] - Fix Cluster initialization race condition</li>
<li>[<a href='https://github.com/igniterealtime/openfire-hazelcast-plugin/issues/102'>Issue #102</a>] - Remove unused code in ClusterListener</li>
</ul>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand All @@ -50,7 +51,8 @@ public class ClusterListener implements MembershipListener, LifecycleListener {

private boolean seniorClusterMember = false;

private final Cluster cluster;
private CompletableFuture<Cluster> clusterFuture = new CompletableFuture<>();
private Cluster cluster; // Must not be used until clusterFuture is done.
private final Map<NodeID, ClusterNodeInfo> clusterNodesInfo = new ConcurrentHashMap<>();

/**
Expand All @@ -65,15 +67,24 @@ public class ClusterListener implements MembershipListener, LifecycleListener {
private boolean clusterMember = false;
private boolean isSenior;

ClusterListener(final Cluster cluster) {

synchronized void register(final Cluster cluster) {
logger.info("Registering with cluster. Executing any queued events to complete Openfire cluster formation.");
this.cluster = cluster;
this.clusterFuture.complete(cluster);

for (final Member member : cluster.getMembers()) {
clusterNodesInfo.put(ClusteredCacheFactory.getNodeID(member),
new HazelcastClusterNodeInfo(member, cluster.getClusterTime()));
}
}

synchronized void unregister() {
logger.info("Unregistering with cluster. Cancelling any events to that where queued for completing Openfire cluster formation.");
if (!this.clusterFuture.isDone()) {
this.clusterFuture.cancel(true);
}
}

@SuppressWarnings("unchecked")
private boolean isDone() {
return done;
Expand Down Expand Up @@ -137,6 +148,17 @@ private synchronized void leaveCluster() {
public void memberAdded(final MembershipEvent event) {
logger.info("Received a Hazelcast memberAdded event {}", event);

synchronized (this) {
if (!clusterFuture.isDone()) {
logger.info("Queue memberAdded event until after cluster has been established.");
clusterFuture = clusterFuture.thenApply(e -> {
memberAdded(event);
return e;
});
return;
}
}

final boolean wasSenior = isSenior;
isSenior = isSeniorClusterMember();
// local member only
Expand Down Expand Up @@ -210,6 +232,17 @@ private boolean waitForClusterCacheToBeInstalled() {
public void memberRemoved(final MembershipEvent event) {
logger.info("Received a Hazelcast memberRemoved event {}", event);

synchronized (this) {
if (!clusterFuture.isDone()) {
logger.info("Queue memberRemoved event until after cluster has been established.");
clusterFuture = clusterFuture.thenApply(e -> {
memberRemoved(event);
return e;
});
return;
}
}

isSenior = isSeniorClusterMember();
final NodeID nodeID = ClusteredCacheFactory.getNodeID(event.getMember());

Expand Down Expand Up @@ -242,6 +275,19 @@ public List<ClusterNodeInfo> getClusterNodesInfo() {

@Override
public void stateChanged(final LifecycleEvent event) {
logger.info("Received a Hazelcast stateChanged event {}", event);

synchronized (this) {
if (!clusterFuture.isDone()) {
logger.info("Queue LifecycleEvent event until after cluster has been established.");
clusterFuture = clusterFuture.thenApply(e -> {
stateChanged(event);
return e;
});
return;
}
}

if (event.getState().equals(LifecycleState.SHUTDOWN)) {
leaveCluster();
} else if (event.getState().equals(LifecycleState.STARTED)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ public class ClusteredCacheFactory implements CacheFactoryStrategy {
private static HazelcastInstance hazelcast = null;
private static Cluster cluster = null;
private ClusterListener clusterListener;
private UUID lifecycleListener;
private UUID membershipListener;

/**
* Keeps that running state. Initial state is stopped.
Expand Down Expand Up @@ -165,38 +163,44 @@ public boolean startCluster() {
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
final ClassLoader loader = new ClusterClassLoader();
Thread.currentThread().setContextClassLoader(loader);

// Prepare for the cluster configuration.
final Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE.getValue());
final NetworkConfig networkConfig = config.getNetworkConfig();
if (!HAZELCAST_MEMCACHE_ENABLED.getValue()) {
networkConfig.setMemcacheProtocolConfig(new MemcacheProtocolConfig().setEnabled(false));
}
if (!HAZELCAST_REST_ENABLED.getValue()) {
networkConfig.setRestApiConfig(new RestApiConfig().setEnabled(false));
}
final MemberAttributeConfig memberAttributeConfig = config.getMemberAttributeConfig();
memberAttributeConfig.setAttribute(HazelcastClusterNodeInfo.HOST_NAME_ATTRIBUTE, XMPPServer.getInstance().getServerInfo().getHostname());
memberAttributeConfig.setAttribute(HazelcastClusterNodeInfo.NODE_ID_ATTRIBUTE, XMPPServer.getInstance().getNodeID().toString());
config.setInstanceName("openfire");
config.setClassLoader(loader);
if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED.getValue()) {
config.setProperty("hazelcast.jmx", "true");
config.setProperty("hazelcast.jmx.detailed", "true");
}

// Ensure that listeners are added before the cluster is initialized, to prevent missing events due to a race condition. See issue #103.
clusterListener = new ClusterListener();
config.addListenerConfig(new ListenerConfig(clusterListener));

int retry = 0;
do {
try {
final Config config = new ClasspathXmlConfig(HAZELCAST_CONFIG_FILE.getValue());
final NetworkConfig networkConfig = config.getNetworkConfig();
if (!HAZELCAST_MEMCACHE_ENABLED.getValue()) {
networkConfig.setMemcacheProtocolConfig(new MemcacheProtocolConfig().setEnabled(false));
}
if (!HAZELCAST_REST_ENABLED.getValue()) {
networkConfig.setRestApiConfig(new RestApiConfig().setEnabled(false));
}
final MemberAttributeConfig memberAttributeConfig = config.getMemberAttributeConfig();
memberAttributeConfig.setAttribute(HazelcastClusterNodeInfo.HOST_NAME_ATTRIBUTE, XMPPServer.getInstance().getServerInfo().getHostname());
memberAttributeConfig.setAttribute(HazelcastClusterNodeInfo.NODE_ID_ATTRIBUTE, XMPPServer.getInstance().getNodeID().toString());
config.setInstanceName("openfire");
config.setClassLoader(loader);
if (JMXManager.isEnabled() && HAZELCAST_JMX_ENABLED.getValue()) {
config.setProperty("hazelcast.jmx", "true");
config.setProperty("hazelcast.jmx.detailed", "true");
}
hazelcast = Hazelcast.newHazelcastInstance(config);
cluster = hazelcast.getCluster();
state = State.started;
// CacheFactory is now using clustered caches. We can add our listeners.
clusterListener = new ClusterListener(cluster);
// CacheFactory is now using clustered caches.
clusterListener.register(cluster);
clusterListener.joinCluster();
lifecycleListener = hazelcast.getLifecycleService().addLifecycleListener(clusterListener);
membershipListener = cluster.addMembershipListener(clusterListener);
logger.info("Hazelcast clustering started");
break;
} catch (final Exception e) {
cluster = null;
clusterListener.unregister();
if (retry < CLUSTER_STARTUP_RETRY_COUNT.getValue()) {
logger.warn("Failed to start clustering (" + e.getMessage() + "); " +
"will retry in " + StringUtils.getFullElapsedTime(CLUSTER_STARTUP_RETRY_TIME.getValue()));
Expand Down Expand Up @@ -230,12 +234,9 @@ public void stopCluster() {
// cluster is shutdown so it can be copied in to the non-clustered, DefaultCache
fireLeftClusterAndWaitToComplete(Duration.ofSeconds(30));
// Stop the cluster
hazelcast.getLifecycleService().removeLifecycleListener(lifecycleListener);
cluster.removeMembershipListener(membershipListener);
Hazelcast.shutdownAll();
cluster = null;
lifecycleListener = null;
membershipListener = null;
clusterListener.unregister();
clusterListener = null;

// Reset packet router to use to deliver packets to remote cluster nodes
Expand Down

0 comments on commit 612c803

Please sign in to comment.