Skip to content

Commit

Permalink
Revert "Integ Test for RemotePublication can be disabled by rolling r…
Browse files Browse the repository at this point in the history
…estart o…"

This reverts commit 2eb148c.
  • Loading branch information
shiv0408 authored Sep 5, 2024
1 parent 375dda3 commit 8b5e8a9
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationState;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.coordination.PublishClusterStateStats;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
Expand All @@ -30,27 +27,18 @@
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.fs.ReloadableFsRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS;
import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY;
import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL;
import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
Expand Down Expand Up @@ -89,7 +77,10 @@ public void setup() {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled).build();
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled)
.build();
}

@Override
Expand Down Expand Up @@ -229,121 +220,11 @@ public void testRemotePublicationDownloadStats() {
NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(DISCOVERY.metricName())
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);
}

public void testRemotePublicationDisabledByRollingRestart() throws Exception {
prepareCluster(3, 2, INDEX_NAME, 1, 2);
ensureStableCluster(5);
ensureGreen(INDEX_NAME);

Set<String> clusterManagers = internalCluster().getClusterManagerNames();
Set<String> restartedMasters = new HashSet<>();

for (String clusterManager : clusterManagers) {
internalCluster().restartNode(clusterManager, new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
restartedMasters.add(nodeName);
return Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, false).build();
}

@Override
public void doAfterNodes(int n, Client client) {
String activeCM = internalCluster().getClusterManagerName();
Set<String> followingCMs = clusterManagers.stream()
.filter(node -> !Objects.equals(node, activeCM))
.collect(Collectors.toSet());
boolean activeCMRestarted = restartedMasters.contains(activeCM);
NodesStatsResponse response = client().admin()
.cluster()
.prepareNodesStats(followingCMs.toArray(new String[0]))
.clear()
.addMetric(DISCOVERY.metricName())
.get();
// after master is flipped to restarted master, publication should happen on Transport
response.getNodes().forEach(nodeStats -> {
if (activeCMRestarted) {
PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats();
assertTrue(
stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0
);
assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
} else {
DiscoveryStats stats = nodeStats.getDiscoveryStats();
assertEquals(0, stats.getPublishStats().getFullClusterStateReceivedCount());
assertEquals(0, stats.getPublishStats().getCompatibleClusterStateDiffReceivedCount());
assertEquals(0, stats.getPublishStats().getIncompatibleClusterStateDiffReceivedCount());
}
});

NodesInfoResponse nodesInfoResponse = client().admin()
.cluster()
.prepareNodesInfo(activeCM)
.clear()
.addMetric(SETTINGS.metricName())
.get();
// if masterRestarted is true Publication Setting should be false, and vice versa
assertTrue(
REMOTE_PUBLICATION_EXPERIMENTAL_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()) != activeCMRestarted
);

followingCMs.forEach(node -> {
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node);
CoordinationState.PersistedState remoteState = registry.getPersistedState(
PersistedStateRegistry.PersistedStateType.REMOTE
);
if (activeCMRestarted) {
assertNull(remoteState.getLastAcceptedState());
// assertNull(remoteState.getLastAcceptedManifest());
} else {
ClusterState localState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.LOCAL)
.getLastAcceptedState();
ClusterState remotePersistedState = remoteState.getLastAcceptedState();
assertTrue(isGlobalStateEquals(localState.metadata(), remotePersistedState.metadata()));
assertEquals(localState.nodes(), remotePersistedState.nodes());
assertEquals(localState.routingTable(), remotePersistedState.routingTable());
assertEquals(localState.customs(), remotePersistedState.customs());
}
});
}
});

}
ensureGreen(INDEX_NAME);
ensureStableCluster(5);

String activeCM = internalCluster().getClusterManagerName();
Set<String> followingCMs = clusterManagers.stream().filter(node -> !Objects.equals(node, activeCM)).collect(Collectors.toSet());
NodesStatsResponse response = client().admin()
.cluster()
.prepareNodesStats(followingCMs.toArray(new String[0]))
.clear()
.addMetric(DISCOVERY.metricName())
.get();
response.getNodes().forEach(nodeStats -> {
PublishClusterStateStats stats = nodeStats.getDiscoveryStats().getPublishStats();
assertTrue(stats.getFullClusterStateReceivedCount() > 0 || stats.getCompatibleClusterStateDiffReceivedCount() > 0);
assertEquals(0, stats.getIncompatibleClusterStateDiffReceivedCount());
});
NodesInfoResponse nodesInfoResponse = client().admin()
.cluster()
.prepareNodesInfo(activeCM)
.clear()
.addMetric(SETTINGS.metricName())
.get();
// if masterRestarted is true Publication Setting should be false, and vice versa
assertFalse(REMOTE_PUBLICATION_EXPERIMENTAL_SETTING.get(nodesInfoResponse.getNodes().get(0).getSettings()));

followingCMs.forEach(node -> {
PersistedStateRegistry registry = internalCluster().getInstance(PersistedStateRegistry.class, node);
CoordinationState.PersistedState remoteState = registry.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
assertNull(remoteState.getLastAcceptedState());
// assertNull(remoteState.getLastAcceptedManifest());
});
}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2143,17 +2143,6 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
}
}

/**
* Returns the name of all the cluster managers in the cluster
*/
public Set<String> getClusterManagerNames() {
return nodes.entrySet()
.stream()
.filter(entry -> CLUSTER_MANAGER_NODE_PREDICATE.test(entry.getValue()))
.map(entry -> entry.getKey())
.collect(Collectors.toSet());
}

/**
* Returns the name of the current cluster-manager node in the cluster.
*/
Expand Down

0 comments on commit 8b5e8a9

Please sign in to comment.