Skip to content

Commit

Permalink
[Remote Publication] Add remote download stats (#15291) (#15570)
Browse files Browse the repository at this point in the history
---------
Signed-off-by: Arpit Bandejiya <[email protected]>

(cherry picked from commit b54e867)
  • Loading branch information
Arpit-Bandejiya authored Sep 2, 2024
1 parent 8a9c167 commit 834b7c1
Show file tree
Hide file tree
Showing 15 changed files with 412 additions and 119 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527))
- [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410))
- Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -40,6 +41,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteUploadStats.REMOTE_UPLOAD;
import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_TABLE;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -253,11 +255,13 @@ private void verifyIndexRoutingFilesDeletion(
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(discoveryStats.getClusterStateStats());
for (PersistedStateStats persistedStateStats : discoveryStats.getClusterStateStats().getPersistenceStats()) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemotePersistenceStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
if (Objects.equals(persistedStateStats.getStatsName(), REMOTE_UPLOAD)) {
Map<String, AtomicLong> extendedFields = persistedStateStats.getExtendedFields();
assertTrue(extendedFields.containsKey(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT));
long cleanupAttemptFailedCount = extendedFields.get(RemoteUploadStats.INDEX_ROUTING_FILES_CLEANUP_ATTEMPT_FAILED_COUNT)
.get();
assertEquals(0, cleanupAttemptFailedCount);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
Expand Down Expand Up @@ -155,6 +158,38 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

public void testRemotePublicationDownloadStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

assertDataNodeDownloadStats(nodesStatsResponseDataNode);

}

private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getSuccessCount() > 0);
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getFailedCount());
assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(1).getTotalTimeInMillis() > 0);
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,10 @@ public DiscoveryStats stats() {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
if (coordinationState.get().isRemotePublicationEnabled()) {
stats.add(publicationHandler.getFullDownloadStats());
stats.add(publicationHandler.getDiffDownloadStats());
}
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ public PublishClusterStateStats stats() {
);
}

public PersistedStateStats getFullDownloadStats() {
return remoteClusterStateService.getFullDownloadStats();
}

public PersistedStateStats getDiffDownloadStats() {
return remoteClusterStateService.getDiffDownloadStats();
}

private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportRequest request) throws IOException {
try (StreamInput in = CompressedStreamUtils.decompressBytes(request, namedWriteableRegistry)) {
ClusterState incomingState;
Expand Down Expand Up @@ -231,69 +239,78 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}

// package private for testing
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException, IllegalStateException {
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.trace(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
manifest,
lastSeen,
transportService.getLocalNode().getId()
try {
if (transportService.getLocalNode().equals(request.getSourceNode())) {
return acceptRemoteStateOnLocalNode(request);
}
// TODO Make cluster state download non-blocking: https://github.com/opensearch-project/OpenSearch/issues/14102
ClusterMetadataManifest manifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
request.getClusterUUID(),
request.getManifestFile()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
if (manifest == null) {
throw new IllegalStateException("Publication failed as manifest was not found for " + request);
}
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug(() -> "Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.debug(() -> "There is no diff in the manifest");
applyFullState = true;
} else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
logger.debug(() -> "Last cluster state not compatible with the diff");
applyFullState = true;
}

if (applyFullState == true) {
logger.debug(
() -> new ParameterizedMessage(
"Downloading full cluster state for term {}, version {}, stateUUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(
request.getClusterName(),
manifest,
transportService.getLocalNode().getId(),
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
logger.debug(
() -> new ParameterizedMessage(
"Downloading diff cluster state for term {}, version {}, previousUUID {}, current UUID {}",
manifest.getClusterTerm(),
manifest.getStateVersion(),
manifest.getDiffManifest().getFromStateUUID(),
manifest.getStateUUID()
)
);
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(
manifest,
lastSeen,
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
} catch (Exception e) {
if (applyFullState) {
remoteClusterStateService.fullDownloadFailed();
} else {
remoteClusterStateService.diffDownloadFailed();
}
throw e;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(

@Override
public PersistedStateStats getStats() {
return remoteClusterStateService.getStats();
return remoteClusterStateService.getUploadStats();
}

private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, ClusterState clusterState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public RemoteClusterStateCleanupManager(
RemoteRoutingTableService remoteRoutingTableService
) {
this.remoteClusterStateService = remoteClusterStateService;
this.remoteStateStats = remoteClusterStateService.getStats();
this.remoteStateStats = remoteClusterStateService.getRemoteStateStats();
ClusterSettings clusterSettings = clusterService.getClusterSettings();
this.clusterApplierService = clusterService.getClusterApplierService();
this.staleFileCleanupInterval = clusterSettings.get(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING);
Expand Down
Loading

0 comments on commit 834b7c1

Please sign in to comment.