diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java index 608d7d9d02581..9641c013bf226 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureIT.java @@ -17,7 +17,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.repositories.RepositoriesService; import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; @@ -92,7 +92,7 @@ private void validateBackpressure( assertTrue(ex.getMessage().contains("rejected execution on primary shard")); assertTrue(ex.getMessage().contains(breachMode)); - RemoteRefreshSegmentTracker.Stats stats = stats(); + RemoteSegmentTransferTracker.Stats stats = stats(); assertTrue(stats.bytesLag > 0); assertTrue(stats.refreshTimeLagMs > 0); assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0); @@ -102,7 +102,7 @@ private void validateBackpressure( .setRandomControlIOExceptionRate(0d); assertBusy(() -> { - RemoteRefreshSegmentTracker.Stats finalStats = stats(); + RemoteSegmentTransferTracker.Stats finalStats = stats(); assertEquals(0, finalStats.bytesLag); assertEquals(0, finalStats.refreshTimeLagMs); assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber); @@ -115,11 +115,11 @@ private void validateBackpressure( deleteRepo(); } - private RemoteRefreshSegmentTracker.Stats stats() { + private RemoteSegmentTransferTracker.Stats stats() { String shardId = "0"; RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get(); final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); - List matches = Arrays.stream(response.getShards()) + List matches = Arrays.stream(response.getRemoteStoreStats()) .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) .collect(Collectors.toList()); assertEquals(1, matches.size()); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 56339581edfa4..c15ef8164a19f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -9,18 +9,33 @@ package org.opensearch.remotestore; import org.junit.Before; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterState; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.test.OpenSearchIntegTestCase; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.StreamSupport; +import java.util.stream.Stream; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3) public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase { @@ -52,14 +67,41 @@ public void testStatsResponseFromAllNodes() { for (String node : nodes) { RemoteStoreStatsResponse response = client(node).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get(); assertTrue(response.getSuccessfulShards() > 0); - assertTrue(response.getShards() != null && response.getShards().length != 0); + assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length != 0); final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); - List matches = Arrays.stream(response.getShards()) + List matches = Arrays.stream(response.getRemoteStoreStats()) .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) .collect(Collectors.toList()); assertEquals(1, matches.size()); - RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats(); - assertResponseStats(stats); + RemoteSegmentTransferTracker.Stats stats = matches.get(0).getStats(); + validateUploadStats(stats); + assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + } + + // Step 3 - Enable replicas on the existing indices and ensure that download + // stats are being populated as well + changeReplicaCountAndEnsureGreen(1); + for (String node : nodes) { + RemoteStoreStatsResponse response = client(node).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get(); + assertTrue(response.getSuccessfulShards() > 0); + assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length != 0); + final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId); + List matches = Arrays.stream(response.getRemoteStoreStats()) + .filter(stat -> indexShardId.equals(stat.getStats().shardId.toString())) + .collect(Collectors.toList()); + assertEquals(2, matches.size()); + for (RemoteStoreStats stat : matches) { + ShardRouting routing = stat.getShardRouting(); + validateShardRouting(routing); + RemoteSegmentTransferTracker.Stats stats = stat.getStats(); + if (routing.primary()) { + validateUploadStats(stats); + assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + } else { + validateDownloadStats(stats); + assertEquals(0, stats.totalUploadsStarted); + } + } } } @@ -84,10 +126,31 @@ public void testStatsResponseAllShards() { .cluster() .prepareRemoteStoreStats(INDEX_NAME, null); RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); - assertTrue(response.getSuccessfulShards() == 3); - assertTrue(response.getShards() != null && response.getShards().length == 3); - RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats(); - assertResponseStats(stats); + assertEquals(3, response.getSuccessfulShards()); + assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 3); + RemoteSegmentTransferTracker.Stats stats = response.getRemoteStoreStats()[0].getStats(); + validateUploadStats(stats); + assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + + // Step 3 - Enable replicas on the existing indices and ensure that download + // stats are being populated as well + changeReplicaCountAndEnsureGreen(1); + response = client(node).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, null).get(); + assertEquals(6, response.getSuccessfulShards()); + assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 6); + for (RemoteStoreStats stat : response.getRemoteStoreStats()) { + ShardRouting routing = stat.getShardRouting(); + validateShardRouting(routing); + stats = stat.getStats(); + if (routing.primary()) { + validateUploadStats(stats); + assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + } else { + validateDownloadStats(stats); + assertEquals(0, stats.totalUploadsStarted); + } + } + } public void testStatsResponseFromLocalNode() { @@ -112,29 +175,398 @@ public void testStatsResponseFromLocalNode() { .prepareRemoteStoreStats(INDEX_NAME, null); remoteStoreStatsRequestBuilder.setLocal(true); RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); - assertTrue(response.getSuccessfulShards() == 1); - assertTrue(response.getShards() != null && response.getShards().length == 1); - RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats(); - assertResponseStats(stats); + assertEquals(1, response.getSuccessfulShards()); + assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 1); + RemoteSegmentTransferTracker.Stats stats = response.getRemoteStoreStats()[0].getStats(); + validateUploadStats(stats); + assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + } + changeReplicaCountAndEnsureGreen(1); + for (String node : nodes) { + RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = client(node).admin() + .cluster() + .prepareRemoteStoreStats(INDEX_NAME, null); + remoteStoreStatsRequestBuilder.setLocal(true); + RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); + assertTrue(response.getSuccessfulShards() > 0); + assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length != 0); + for (RemoteStoreStats stat : response.getRemoteStoreStats()) { + ShardRouting routing = stat.getShardRouting(); + validateShardRouting(routing); + RemoteSegmentTransferTracker.Stats stats = stat.getStats(); + if (routing.primary()) { + validateUploadStats(stats); + assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted); + } else { + validateDownloadStats(stats); + assertEquals(0, stats.totalUploadsStarted); + } + } + } + } + + public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception { + // Scenario: + // - Create index with single primary and single replica shard + // - Disable Refresh Interval for the index + // - Index documents + // - Trigger refresh and flush + // - Assert that download stats == upload stats + // - Repeat this step for random times (between 5 and 10) + + // Create index with 1 pri and 1 replica and refresh interval disabled + createIndex( + INDEX_NAME, + Settings.builder().put(remoteStoreIndexSettings(1, 1)).put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build() + ); + ensureGreen(INDEX_NAME); + + // Manually invoke a refresh + refresh(INDEX_NAME); + + // Get zero state values + // Extract and assert zero state primary stats + RemoteStoreStatsResponse zeroStateResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + RemoteSegmentTransferTracker.Stats zeroStatePrimaryStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) + .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary()) + .collect(Collectors.toList()) + .get(0) + .getStats(); + assertTrue( + zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded + && zeroStatePrimaryStats.totalUploadsSucceeded == 1 + ); + assertTrue( + zeroStatePrimaryStats.uploadBytesStarted == zeroStatePrimaryStats.uploadBytesSucceeded + && zeroStatePrimaryStats.uploadBytesSucceeded > 0 + ); + assertTrue(zeroStatePrimaryStats.totalUploadsFailed == 0 && zeroStatePrimaryStats.uploadBytesFailed == 0); + + // Extract and assert zero state replica stats + RemoteSegmentTransferTracker.Stats zeroStateReplicaStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) + .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) + .collect(Collectors.toList()) + .get(0) + .getStats(); + assertTrue( + zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0 + && zeroStateReplicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 + ); + + // Index documents + for (int i = 1; i <= randomIntBetween(5, 10); i++) { + indexSingleDoc(INDEX_NAME); + // Running Flush & Refresh manually + flushAndRefresh(INDEX_NAME); + ensureGreen(INDEX_NAME); + + // Poll for RemoteStore Stats + assertBusy(() -> { + RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + // Iterate through the response and extract the relevant segment upload and download stats + List primaryStatsList = Arrays.stream(response.getRemoteStoreStats()) + .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary()) + .collect(Collectors.toList()); + assertEquals(1, primaryStatsList.size()); + List replicaStatsList = Arrays.stream(response.getRemoteStoreStats()) + .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) + .collect(Collectors.toList()); + assertEquals(1, replicaStatsList.size()); + RemoteSegmentTransferTracker.Stats primaryStats = primaryStatsList.get(0).getStats(); + RemoteSegmentTransferTracker.Stats replicaStats = replicaStatsList.get(0).getStats(); + // Assert Upload syncs - zero state uploads == download syncs + assertTrue(primaryStats.totalUploadsStarted > 0); + assertTrue(primaryStats.totalUploadsSucceeded > 0); + assertTrue( + replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0 + && primaryStats.uploadBytesStarted + - zeroStatePrimaryStats.uploadBytesStarted == replicaStats.directoryFileTransferTrackerStats.transferredBytesStarted + ); + assertTrue( + replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0 + && primaryStats.uploadBytesSucceeded + - zeroStatePrimaryStats.uploadBytesSucceeded == replicaStats.directoryFileTransferTrackerStats.transferredBytesSucceeded + ); + // Assert zero failures + assertEquals(0, primaryStats.uploadBytesFailed); + assertEquals(0, replicaStats.directoryFileTransferTrackerStats.transferredBytesFailed); + }, 60, TimeUnit.SECONDS); + } + } + + public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception { + // Scenario: + // - Create index with single primary and N-1 replica shards (N = no of data nodes) + // - Disable Refresh Interval for the index + // - Index documents + // - Trigger refresh and flush + // - Assert that download stats == upload stats + // - Repeat this step for random times (between 5 and 10) + + // Create index + int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(remoteStoreIndexSettings(dataNodeCount - 1, 1)) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1) + .build() + ); + ensureGreen(INDEX_NAME); + + // Manually invoke a refresh + refresh(INDEX_NAME); + + // Get zero state values + // Extract and assert zero state primary stats + RemoteStoreStatsResponse zeroStateResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + RemoteSegmentTransferTracker.Stats zeroStatePrimaryStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) + .filter(remoteStoreStats -> remoteStoreStats.getShardRouting().primary()) + .collect(Collectors.toList()) + .get(0) + .getStats(); + assertTrue( + zeroStatePrimaryStats.totalUploadsStarted == zeroStatePrimaryStats.totalUploadsSucceeded + && zeroStatePrimaryStats.totalUploadsSucceeded == 1 + ); + assertTrue( + zeroStatePrimaryStats.uploadBytesStarted == zeroStatePrimaryStats.uploadBytesSucceeded + && zeroStatePrimaryStats.uploadBytesSucceeded > 0 + ); + assertTrue(zeroStatePrimaryStats.totalUploadsFailed == 0 && zeroStatePrimaryStats.uploadBytesFailed == 0); + + // Extract and assert zero state replica stats + List zeroStateReplicaStats = Arrays.stream(zeroStateResponse.getRemoteStoreStats()) + .filter(remoteStoreStats -> !remoteStoreStats.getShardRouting().primary()) + .collect(Collectors.toList()); + zeroStateReplicaStats.forEach(stats -> { + assertTrue( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesStarted == 0 + && stats.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 + ); + }); + + int currentNodesInCluster = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); + for (int i = 0; i < randomIntBetween(5, 10); i++) { + indexSingleDoc(INDEX_NAME); + // Running Flush & Refresh manually + flushAndRefresh(INDEX_NAME); + + assertBusy(() -> { + RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + assertEquals(currentNodesInCluster, response.getSuccessfulShards()); + long uploadsStarted = 0, uploadsSucceeded = 0, uploadsFailed = 0; + long uploadBytesStarted = 0, uploadBytesSucceeded = 0, uploadBytesFailed = 0; + List downloadBytesStarted = new ArrayList<>(), downloadBytesSucceeded = new ArrayList<>(), downloadBytesFailed = + new ArrayList<>(); + + // Assert that stats for primary shard and replica shard set are equal + for (RemoteStoreStats eachStatsObject : response.getRemoteStoreStats()) { + RemoteSegmentTransferTracker.Stats stats = eachStatsObject.getStats(); + if (eachStatsObject.getShardRouting().primary()) { + uploadBytesStarted = stats.uploadBytesStarted; + uploadBytesSucceeded = stats.uploadBytesSucceeded; + uploadBytesFailed = stats.uploadBytesFailed; + } else { + downloadBytesStarted.add(stats.directoryFileTransferTrackerStats.transferredBytesStarted); + downloadBytesSucceeded.add(stats.directoryFileTransferTrackerStats.transferredBytesSucceeded); + downloadBytesFailed.add(stats.directoryFileTransferTrackerStats.transferredBytesFailed); + } + } + + assertEquals(0, uploadsFailed); + assertEquals(0, uploadBytesFailed); + for (int j = 0; j < response.getSuccessfulShards() - 1; j++) { + assertEquals(uploadBytesStarted - zeroStatePrimaryStats.uploadBytesStarted, (long) downloadBytesStarted.get(j)); + assertEquals(uploadBytesSucceeded - zeroStatePrimaryStats.uploadBytesSucceeded, (long) downloadBytesSucceeded.get(j)); + assertEquals(0, (long) downloadBytesFailed.get(j)); + } + }, 60, TimeUnit.SECONDS); } } + public void testStatsOnShardRelocation() { + // Scenario: + // - Create index with single primary and single replica shard + // - Index documents + // - Reroute replica shard to one of the remaining nodes + // - Assert that remote store stats reflects the new node ID + + // Create index + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1)); + ensureGreen(INDEX_NAME); + // Index docs + indexDocs(); + + // Fetch current set of nodes in the cluster + List currentNodesInCluster = getClusterState().nodes() + .getDataNodes() + .values() + .stream() + .map(DiscoveryNode::getId) + .collect(Collectors.toList()); + DiscoveryNode[] discoveryNodesForIndex = client().admin().cluster().prepareSearchShards(INDEX_NAME).get().getNodes(); + + // Fetch nodes with shard copies of the created index + List nodeIdsWithShardCopies = new ArrayList<>(); + Arrays.stream(discoveryNodesForIndex).forEach(eachNode -> nodeIdsWithShardCopies.add(eachNode.getId())); + + // Fetch nodes which does not have any copies of the index + List nodeIdsWithoutShardCopy = currentNodesInCluster.stream() + .filter(eachNode -> !nodeIdsWithShardCopies.contains(eachNode)) + .collect(Collectors.toList()); + assertEquals(1, nodeIdsWithoutShardCopy.size()); + + // Manually reroute shard to a node which does not have any shard copy at present + ShardRouting replicaShardRouting = getClusterState().routingTable() + .index(INDEX_NAME) + .shard(0) + .assignedShards() + .stream() + .filter(shard -> !shard.primary()) + .collect(Collectors.toList()) + .get(0); + String sourceNode = replicaShardRouting.currentNodeId(); + String destinationNode = nodeIdsWithoutShardCopy.get(0); + relocateShard(0, sourceNode, destinationNode); + RemoteStoreStats[] allShardsStats = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get().getRemoteStoreStats(); + RemoteStoreStats replicaShardStat = Arrays.stream(allShardsStats) + .filter(eachStat -> !eachStat.getShardRouting().primary()) + .collect(Collectors.toList()) + .get(0); + + // Assert that remote store stats reflect the new shard state + assertEquals(ShardRoutingState.STARTED, replicaShardStat.getShardRouting().state()); + assertEquals(destinationNode, replicaShardStat.getShardRouting().currentNodeId()); + } + + public void testStatsOnShardUnassigned() throws IOException { + // Scenario: + // - Create index with single primary and two replica shard + // - Index documents + // - Stop one data node + // - Assert: + // a. Total shard Count in the response object is equal to the previous node count + // b. Successful shard count in the response object is equal to the new node count + createIndex(INDEX_NAME, remoteStoreIndexSettings(2, 1)); + ensureGreen(INDEX_NAME); + indexDocs(); + int dataNodeCountBeforeStop = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); + internalCluster().stopRandomDataNode(); + RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + int dataNodeCountAfterStop = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); + assertEquals(dataNodeCountBeforeStop, response.getTotalShards()); + assertEquals(dataNodeCountAfterStop, response.getSuccessfulShards()); + } + + public void testStatsOnRemoteStoreRestore() throws IOException { + // Creating an index with primary shard count == total nodes in cluster and 0 replicas + int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount)); + ensureGreen(INDEX_NAME); + + // Index some docs to ensure segments being uploaded to remote store + indexDocs(); + refresh(INDEX_NAME); + + // Stop one data node to force the index into a red state + internalCluster().stopRandomDataNode(); + ensureRed(INDEX_NAME); + + // Start another data node to fulfil the previously launched capacity + internalCluster().startDataOnlyNode(); + + // Restore index from remote store + assertAcked(client().admin().indices().prepareClose(INDEX_NAME)); + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(true), PlainActionFuture.newFuture()); + + // Ensure that the index is green + ensureGreen(INDEX_NAME); + + // Index some more docs to force segment uploads to remote store + indexDocs(); + + RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + Arrays.stream(remoteStoreStatsResponse.getRemoteStoreStats()).forEach(statObject -> { + RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getStats(); + // Assert that we have both upload and download stats for the index + assertTrue( + segmentTracker.totalUploadsStarted > 0 && segmentTracker.totalUploadsSucceeded > 0 && segmentTracker.totalUploadsFailed == 0 + ); + assertTrue( + segmentTracker.directoryFileTransferTrackerStats.transferredBytesStarted > 0 + && segmentTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0 + ); + }); + } + + public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception { + // Create an index with one primary and one replica shard + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1)); + ensureGreen(INDEX_NAME); + refresh(INDEX_NAME); + + // Ensure that the index has 0 documents in it + assertEquals(0, client().admin().indices().prepareStats(INDEX_NAME).get().getTotal().docs.getCount()); + + // Assert that within 5 seconds the download and upload stats moves to a non-zero value + assertBusy(() -> { + RemoteStoreStats[] remoteStoreStats = client().admin() + .cluster() + .prepareRemoteStoreStats(INDEX_NAME, "0") + .get() + .getRemoteStoreStats(); + Arrays.stream(remoteStoreStats).forEach(statObject -> { + RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getStats(); + if (statObject.getShardRouting().primary()) { + assertTrue( + segmentTracker.totalUploadsSucceeded == 1 + && segmentTracker.totalUploadsStarted == segmentTracker.totalUploadsSucceeded + && segmentTracker.totalUploadsFailed == 0 + ); + } else { + assertTrue( + segmentTracker.directoryFileTransferTrackerStats.transferredBytesStarted == 0 + && segmentTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 + ); + } + }); + }, 5, TimeUnit.SECONDS); + } + private void indexDocs() { - // Indexing documents along with refreshes and flushes. for (int i = 0; i < randomIntBetween(5, 10); i++) { if (randomBoolean()) { flush(INDEX_NAME); } else { refresh(INDEX_NAME); } - int numberOfOperations = randomIntBetween(20, 50); + int numberOfOperations = randomIntBetween(10, 30); for (int j = 0; j < numberOfOperations; j++) { indexSingleDoc(INDEX_NAME); } } } - private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) { + private void changeReplicaCountAndEnsureGreen(int replicaCount) { + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) + ); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + } + + private void relocateShard(int shardId, String sourceNode, String destNode) { + assertAcked(client().admin().cluster().prepareReroute().add(new MoveAllocationCommand(INDEX_NAME, shardId, sourceNode, destNode))); + ensureGreen(INDEX_NAME); + } + + private void validateUploadStats(RemoteSegmentTransferTracker.Stats stats) { assertEquals(0, stats.refreshTimeLagMs); assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber); assertTrue(stats.uploadBytesStarted > 0); @@ -150,4 +582,32 @@ private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) { assertTrue(stats.uploadBytesPerSecMovingAverage > 0); assertTrue(stats.uploadTimeMovingAverage > 0); } + + private void validateDownloadStats(RemoteSegmentTransferTracker.Stats stats) { + assertTrue(stats.directoryFileTransferTrackerStats.lastTransferTimestampMs > 0); + assertTrue(stats.directoryFileTransferTrackerStats.transferredBytesStarted > 0); + assertTrue(stats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0); + assertEquals(stats.directoryFileTransferTrackerStats.transferredBytesFailed, 0); + assertTrue(stats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes > 0); + assertTrue(stats.directoryFileTransferTrackerStats.transferredBytesMovingAverage > 0); + assertTrue(stats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage > 0); + } + + // Validate if the shardRouting obtained from cluster state contains the exact same routing object + // parameters as obtained from the remote store stats API + private void validateShardRouting(ShardRouting routing) { + Stream currentRoutingTable = getClusterState().routingTable() + .getIndicesRouting() + .get(INDEX_NAME) + .shard(routing.id()) + .assignedShards() + .stream(); + assertTrue( + currentRoutingTable.anyMatch( + r -> (r.currentNodeId().equals(routing.currentNodeId()) + && r.state().equals(routing.state()) + && r.primary() == routing.primary()) + ) + ); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java index 5ac9c1cf5f74c..6b4c9a26ab19b 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java @@ -11,9 +11,10 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import java.io.IOException; @@ -24,72 +25,128 @@ */ public class RemoteStoreStats implements Writeable, ToXContentFragment { - private final RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats; + private final RemoteSegmentTransferTracker.Stats remoteSegmentShardStats; - public RemoteStoreStats(RemoteRefreshSegmentTracker.Stats remoteSegmentUploadShardStats) { - this.remoteSegmentUploadShardStats = remoteSegmentUploadShardStats; + private final ShardRouting shardRouting; + + public RemoteStoreStats(RemoteSegmentTransferTracker.Stats remoteSegmentUploadShardStats, ShardRouting shardRouting) { + this.remoteSegmentShardStats = remoteSegmentUploadShardStats; + this.shardRouting = shardRouting; } public RemoteStoreStats(StreamInput in) throws IOException { - remoteSegmentUploadShardStats = in.readOptionalWriteable(RemoteRefreshSegmentTracker.Stats::new); + this.remoteSegmentShardStats = in.readOptionalWriteable(RemoteSegmentTransferTracker.Stats::new); + this.shardRouting = new ShardRouting(in); + } + + public RemoteSegmentTransferTracker.Stats getStats() { + return remoteSegmentShardStats; } - public RemoteRefreshSegmentTracker.Stats getStats() { - return remoteSegmentUploadShardStats; + public ShardRouting getShardRouting() { + return shardRouting; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject() - .field(Fields.SHARD_ID, remoteSegmentUploadShardStats.shardId) - .field(Fields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.localRefreshClockTimeMs) - .field(Fields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentUploadShardStats.remoteRefreshClockTimeMs) - .field(Fields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentUploadShardStats.refreshTimeLagMs) - .field(Fields.REFRESH_LAG, remoteSegmentUploadShardStats.localRefreshNumber - remoteSegmentUploadShardStats.remoteRefreshNumber) - .field(Fields.BYTES_LAG, remoteSegmentUploadShardStats.bytesLag) - - .field(Fields.BACKPRESSURE_REJECTION_COUNT, remoteSegmentUploadShardStats.rejectionCount) - .field(Fields.CONSECUTIVE_FAILURE_COUNT, remoteSegmentUploadShardStats.consecutiveFailuresCount); - - builder.startObject(Fields.TOTAL_REMOTE_REFRESH); - builder.field(SubFields.STARTED, remoteSegmentUploadShardStats.totalUploadsStarted) - .field(SubFields.SUCCEEDED, remoteSegmentUploadShardStats.totalUploadsSucceeded) - .field(SubFields.FAILED, remoteSegmentUploadShardStats.totalUploadsFailed); + builder.startObject(); + buildShardRouting(builder); + builder.startObject(Fields.SEGMENT); + builder.startObject(SubFields.DOWNLOAD); + // Ensuring that we are not showing 0 metrics to the user + if (remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted != 0) { + buildDownloadStats(builder); + } + builder.endObject(); + builder.startObject(SubFields.UPLOAD); + // Ensuring that we are not showing 0 metrics to the user + if (remoteSegmentShardStats.totalUploadsStarted != 0) { + buildUploadStats(builder); + } builder.endObject(); - - builder.startObject(Fields.TOTAL_UPLOADS_IN_BYTES); - builder.field(SubFields.STARTED, remoteSegmentUploadShardStats.uploadBytesStarted) - .field(SubFields.SUCCEEDED, remoteSegmentUploadShardStats.uploadBytesSucceeded) - .field(SubFields.FAILED, remoteSegmentUploadShardStats.uploadBytesFailed); builder.endObject(); + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(remoteSegmentShardStats); + shardRouting.writeTo(out); + } - builder.startObject(Fields.REMOTE_REFRESH_SIZE_IN_BYTES); - builder.field(SubFields.LAST_SUCCESSFUL, remoteSegmentUploadShardStats.lastSuccessfulRemoteRefreshBytes); - builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadBytesMovingAverage); + private void buildUploadStats(XContentBuilder builder) throws IOException { + builder.field(UploadStatsFields.LOCAL_REFRESH_TIMESTAMP, remoteSegmentShardStats.localRefreshClockTimeMs) + .field(UploadStatsFields.REMOTE_REFRESH_TIMESTAMP, remoteSegmentShardStats.remoteRefreshClockTimeMs) + .field(UploadStatsFields.REFRESH_TIME_LAG_IN_MILLIS, remoteSegmentShardStats.refreshTimeLagMs) + .field(UploadStatsFields.REFRESH_LAG, remoteSegmentShardStats.localRefreshNumber - remoteSegmentShardStats.remoteRefreshNumber) + .field(UploadStatsFields.BYTES_LAG, remoteSegmentShardStats.bytesLag) + .field(UploadStatsFields.BACKPRESSURE_REJECTION_COUNT, remoteSegmentShardStats.rejectionCount) + .field(UploadStatsFields.CONSECUTIVE_FAILURE_COUNT, remoteSegmentShardStats.consecutiveFailuresCount); + builder.startObject(UploadStatsFields.TOTAL_SYNCS_TO_REMOTE) + .field(SubFields.STARTED, remoteSegmentShardStats.totalUploadsStarted) + .field(SubFields.SUCCEEDED, remoteSegmentShardStats.totalUploadsSucceeded) + .field(SubFields.FAILED, remoteSegmentShardStats.totalUploadsFailed); builder.endObject(); + builder.startObject(UploadStatsFields.TOTAL_UPLOADS_IN_BYTES) + .field(SubFields.STARTED, remoteSegmentShardStats.uploadBytesStarted) + .field(SubFields.SUCCEEDED, remoteSegmentShardStats.uploadBytesSucceeded) + .field(SubFields.FAILED, remoteSegmentShardStats.uploadBytesFailed); + builder.endObject(); + builder.startObject(UploadStatsFields.REMOTE_REFRESH_SIZE_IN_BYTES) + .field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.lastSuccessfulRemoteRefreshBytes) + .field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesMovingAverage); + builder.endObject(); + builder.startObject(UploadStatsFields.UPLOAD_LATENCY_IN_BYTES_PER_SEC) + .field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadBytesPerSecMovingAverage); + builder.endObject(); + builder.startObject(UploadStatsFields.REMOTE_REFRESH_LATENCY_IN_MILLIS) + .field(SubFields.MOVING_AVG, remoteSegmentShardStats.uploadTimeMovingAverage); + builder.endObject(); + } - builder.startObject(Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC); - builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadBytesPerSecMovingAverage); + private void buildDownloadStats(XContentBuilder builder) throws IOException { + builder.field( + DownloadStatsFields.LAST_SYNC_TIMESTAMP, + remoteSegmentShardStats.directoryFileTransferTrackerStats.lastTransferTimestampMs + ); + builder.startObject(DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES) + .field(SubFields.STARTED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesStarted) + .field(SubFields.SUCCEEDED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesSucceeded) + .field(SubFields.FAILED, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesFailed); builder.endObject(); - builder.startObject(Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS); - builder.field(SubFields.MOVING_AVG, remoteSegmentUploadShardStats.uploadTimeMovingAverage); + builder.startObject(DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES) + .field(SubFields.LAST_SUCCESSFUL, remoteSegmentShardStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes) + .field(SubFields.MOVING_AVG, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesMovingAverage); builder.endObject(); + builder.startObject(DownloadStatsFields.DOWNLOAD_SPEED_IN_BYTES_PER_SEC) + .field(SubFields.MOVING_AVG, remoteSegmentShardStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage); builder.endObject(); + } - return builder; + private void buildShardRouting(XContentBuilder builder) throws IOException { + builder.startObject(Fields.ROUTING); + builder.field(RoutingFields.STATE, shardRouting.state()); + builder.field(RoutingFields.PRIMARY, shardRouting.primary()); + builder.field(RoutingFields.NODE_ID, shardRouting.currentNodeId()); + builder.endObject(); } - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(remoteSegmentUploadShardStats); + static final class Fields { + static final String ROUTING = "routing"; + static final String SEGMENT = "segment"; + static final String TRANSLOG = "translog"; + } + + static final class RoutingFields { + static final String STATE = "state"; + static final String PRIMARY = "primary"; + static final String NODE_ID = "node"; } /** * Fields for remote store stats response */ - static final class Fields { - static final String SHARD_ID = "shard_id"; - + static final class UploadStatsFields { /** * Lag in terms of bytes b/w local and remote store */ @@ -128,7 +185,7 @@ static final class Fields { /** * Represents the number of remote refreshes */ - static final String TOTAL_REMOTE_REFRESH = "total_remote_refresh"; + static final String TOTAL_SYNCS_TO_REMOTE = "total_syncs_to_remote"; /** * Represents the total uploads to remote store in bytes @@ -151,21 +208,46 @@ static final class Fields { static final String REMOTE_REFRESH_LATENCY_IN_MILLIS = "remote_refresh_latency_in_millis"; } + static final class DownloadStatsFields { + /** + * Last successful sync from remote in milliseconds + */ + static final String LAST_SYNC_TIMESTAMP = "last_sync_timestamp"; + + /** + * Total bytes of segment files downloaded from the remote store for a specific shard + */ + static final String TOTAL_DOWNLOADS_IN_BYTES = "total_downloads_in_bytes"; + + /** + * Size of each segment file downloaded from the remote store + */ + static final String DOWNLOAD_SIZE_IN_BYTES = "download_size_in_bytes"; + + /** + * Speed (in bytes/sec) for segment file downloads + */ + static final String DOWNLOAD_SPEED_IN_BYTES_PER_SEC = "download_speed_in_bytes_per_sec"; + } + /** - * Reusable sub fields for {@link Fields} + * Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields} */ static final class SubFields { static final String STARTED = "started"; static final String SUCCEEDED = "succeeded"; static final String FAILED = "failed"; + static final String DOWNLOAD = "download"; + static final String UPLOAD = "upload"; + /** - * Moving avg over last N values stat for a {@link Fields} + * Moving avg over last N values stat */ static final String MOVING_AVG = "moving_avg"; /** - * Most recent successful attempt stat for a {@link Fields} + * Most recent successful attempt stat */ static final String LAST_SUCCESSFUL = "last_successful"; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponse.java index 20023e30a271e..f6613c1d2ac50 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponse.java @@ -17,7 +17,10 @@ import org.opensearch.core.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Remote Store stats response @@ -26,49 +29,71 @@ */ public class RemoteStoreStatsResponse extends BroadcastResponse { - private final RemoteStoreStats[] shards; + private final RemoteStoreStats[] remoteStoreStats; public RemoteStoreStatsResponse(StreamInput in) throws IOException { super(in); - shards = in.readArray(RemoteStoreStats::new, RemoteStoreStats[]::new); + remoteStoreStats = in.readArray(RemoteStoreStats::new, RemoteStoreStats[]::new); } public RemoteStoreStatsResponse( - RemoteStoreStats[] shards, + RemoteStoreStats[] remoteStoreStats, int totalShards, int successfulShards, int failedShards, List shardFailures ) { super(totalShards, successfulShards, failedShards, shardFailures); - this.shards = shards; + this.remoteStoreStats = remoteStoreStats; } - public RemoteStoreStats[] getShards() { - return this.shards; + public RemoteStoreStats[] getRemoteStoreStats() { + return this.remoteStoreStats; } - public RemoteStoreStats getAt(int position) { - return shards[position]; + public Map>> groupByIndexAndShards() { + Map>> indexWiseStats = new HashMap<>(); + for (RemoteStoreStats shardStat : remoteStoreStats) { + indexWiseStats.computeIfAbsent(shardStat.getShardRouting().getIndexName(), k -> new HashMap<>()) + .computeIfAbsent(shardStat.getShardRouting().getId(), k -> new ArrayList<>()) + .add(shardStat); + } + return indexWiseStats; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeArray(shards); + out.writeArray(remoteStoreStats); } @Override protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException { - builder.startArray("stats"); - for (RemoteStoreStats shard : shards) { - shard.toXContent(builder, params); + Map>> indexWiseStats = groupByIndexAndShards(); + builder.startObject(Fields.INDICES); + for (String indexName : indexWiseStats.keySet()) { + builder.startObject(indexName); + builder.startObject(Fields.SHARDS); + for (int shardId : indexWiseStats.get(indexName).keySet()) { + builder.startArray(Integer.toString(shardId)); + for (RemoteStoreStats shardStat : indexWiseStats.get(indexName).get(shardId)) { + shardStat.toXContent(builder, params); + } + builder.endArray(); + } + builder.endObject(); + builder.endObject(); } - builder.endArray(); + builder.endObject(); } @Override public String toString() { return Strings.toString(XContentType.JSON, this, true, false); } + + static final class Fields { + static final String SHARDS = "shards"; + static final String INDICES = "indices"; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java index 434abd1207f50..37835a5add3d6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsAction.java @@ -24,7 +24,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.index.IndexService; import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.indices.IndicesService; @@ -49,6 +49,7 @@ public class TransportRemoteStoreStatsAction extends TransportBroadcastByNodeAct RemoteStoreStats> { private final IndicesService indicesService; + private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; @Inject @@ -95,7 +96,6 @@ protected ShardsIterator shards(ClusterState clusterState, RemoteStoreStatsReque || (shardRouting.currentNodeId() == null || shardRouting.currentNodeId().equals(clusterState.getNodes().getLocalNodeId())) ) - .filter(ShardRouting::primary) .filter( shardRouting -> Boolean.parseBoolean( clusterState.getMetadata().index(shardRouting.index()).getSettings().get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED) @@ -153,11 +153,10 @@ protected RemoteStoreStats shardOperation(RemoteStoreStatsRequest request, Shard throw new ShardNotFoundException(indexShard.shardId()); } - RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker( + RemoteSegmentTransferTracker remoteSegmentTransferTracker = remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker( indexShard.shardId() ); - assert Objects.nonNull(remoteRefreshSegmentTracker); - - return new RemoteStoreStats(remoteRefreshSegmentTracker.stats()); + assert Objects.nonNull(remoteSegmentTransferTracker); + return new RemoteStoreStats(remoteSegmentTransferTracker.stats(), indexShard.routingEntry()); } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java index 3f1161f0c5e03..6f6364ac3b8a6 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureService.java @@ -26,7 +26,7 @@ import java.util.function.BiConsumer; /** - * Service used to validate if the incoming indexing request should be rejected based on the {@link RemoteRefreshSegmentTracker}. + * Service used to validate if the incoming indexing request should be rejected based on the {@link RemoteSegmentTransferTracker}. * * @opensearch.internal */ @@ -37,7 +37,7 @@ public class RemoteRefreshSegmentPressureService implements IndexEventListener { /** * Keeps map of remote-backed index shards and their corresponding backpressure tracker. */ - private final Map trackerMap = ConcurrentCollections.newConcurrentMap(); + private final Map trackerMap = ConcurrentCollections.newConcurrentMap(); /** * Remote refresh segment pressure settings which is used for creation of the backpressure tracker and as well as rejection. @@ -57,12 +57,12 @@ public RemoteRefreshSegmentPressureService(ClusterService clusterService, Settin } /** - * Get {@code RemoteRefreshSegmentTracker} only if the underlying Index has remote segments integration enabled. + * Get {@code RemoteSegmentTransferTracker} only if the underlying Index has remote segments integration enabled. * * @param shardId shard id * @return the tracker if index is remote-backed, else null. */ - public RemoteRefreshSegmentTracker getRemoteRefreshSegmentTracker(ShardId shardId) { + public RemoteSegmentTransferTracker getRemoteRefreshSegmentTracker(ShardId shardId) { return trackerMap.get(shardId); } @@ -74,8 +74,9 @@ public void afterIndexShardCreated(IndexShard indexShard) { ShardId shardId = indexShard.shardId(); trackerMap.put( shardId, - new RemoteRefreshSegmentTracker( + new RemoteSegmentTransferTracker( shardId, + indexShard.store().getDirectoryFileTransferTracker(), pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -86,8 +87,8 @@ public void afterIndexShardCreated(IndexShard indexShard) { @Override public void afterIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) { - RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = trackerMap.remove(shardId); - if (remoteRefreshSegmentTracker != null) { + RemoteSegmentTransferTracker remoteSegmentTransferTracker = trackerMap.remove(shardId); + if (remoteSegmentTransferTracker != null) { logger.trace("Deleted tracker for shardId={}", shardId); } } @@ -107,34 +108,34 @@ public boolean isSegmentsUploadBackpressureEnabled() { * @param shardId shardId for which the validation needs to be done. */ public void validateSegmentsUploadLag(ShardId shardId) { - RemoteRefreshSegmentTracker remoteRefreshSegmentTracker = getRemoteRefreshSegmentTracker(shardId); + RemoteSegmentTransferTracker remoteSegmentTransferTracker = getRemoteRefreshSegmentTracker(shardId); // condition 1 - This will be null for non-remote backed indexes // condition 2 - This will be zero if the remote store is - if (remoteRefreshSegmentTracker == null || remoteRefreshSegmentTracker.getRefreshSeqNoLag() == 0) { + if (remoteSegmentTransferTracker == null || remoteSegmentTransferTracker.getRefreshSeqNoLag() == 0) { return; } for (LagValidator lagValidator : lagValidators) { - if (lagValidator.validate(remoteRefreshSegmentTracker, shardId) == false) { - remoteRefreshSegmentTracker.incrementRejectionCount(lagValidator.name()); - throw new OpenSearchRejectedExecutionException(lagValidator.rejectionMessage(remoteRefreshSegmentTracker, shardId)); + if (lagValidator.validate(remoteSegmentTransferTracker, shardId) == false) { + remoteSegmentTransferTracker.incrementRejectionCount(lagValidator.name()); + throw new OpenSearchRejectedExecutionException(lagValidator.rejectionMessage(remoteSegmentTransferTracker, shardId)); } } } void updateUploadBytesMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadBytesMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadBytesMovingAverageWindowSize, updatedSize); } void updateUploadBytesPerSecMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadBytesPerSecMovingAverageWindowSize, updatedSize); } void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { - updateMovingAverageWindowSize(RemoteRefreshSegmentTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize); + updateMovingAverageWindowSize(RemoteSegmentTransferTracker::updateUploadTimeMsMovingAverageWindowSize, updatedSize); } - void updateMovingAverageWindowSize(BiConsumer biConsumer, int updatedSize) { + void updateMovingAverageWindowSize(BiConsumer biConsumer, int updatedSize) { trackerMap.values().forEach(tracker -> biConsumer.accept(tracker, updatedSize)); } @@ -158,7 +159,7 @@ private LagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { * @param shardId shard id of the {@code IndexShard} currently being validated. * @return true if successfully validated that lag is acceptable. */ - abstract boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId); + abstract boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId shardId); /** * Returns the name of the lag validator. @@ -167,7 +168,7 @@ private LagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) { */ abstract String name(); - abstract String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId); + abstract String rejectionMessage(RemoteSegmentTransferTracker pressureTracker, ShardId shardId); } /** @@ -184,7 +185,7 @@ private BytesLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) } @Override - public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + public boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) { if (pressureTracker.getRefreshSeqNoLag() <= 1) { return true; } @@ -198,7 +199,7 @@ public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId sha } @Override - public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + public String rejectionMessage(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) { double dynamicBytesLagThreshold = pressureTracker.getUploadBytesAverage() * pressureSettings.getBytesLagVarianceFactor(); return String.format( Locale.ROOT, @@ -230,7 +231,7 @@ private TimeLagValidator(RemoteRefreshSegmentPressureSettings pressureSettings) } @Override - public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + public boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) { if (pressureTracker.getRefreshSeqNoLag() <= 1) { return true; } @@ -244,7 +245,7 @@ public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId sha } @Override - public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + public String rejectionMessage(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) { double dynamicTimeLagThreshold = pressureTracker.getUploadTimeMsAverage() * pressureSettings.getUploadTimeLagVarianceFactor(); return String.format( Locale.ROOT, @@ -276,14 +277,14 @@ private ConsecutiveFailureValidator(RemoteRefreshSegmentPressureSettings pressur } @Override - public boolean validate(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + public boolean validate(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) { int failureStreakCount = pressureTracker.getConsecutiveFailureCount(); int minConsecutiveFailureThreshold = pressureSettings.getMinConsecutiveFailuresLimit(); return failureStreakCount <= minConsecutiveFailureThreshold; } @Override - public String rejectionMessage(RemoteRefreshSegmentTracker pressureTracker, ShardId shardId) { + public String rejectionMessage(RemoteSegmentTransferTracker pressureTracker, ShardId shardId) { return String.format( Locale.ROOT, "rejected execution on primary shard:%s due to remote segments lagging behind local segments." diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java similarity index 94% rename from server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java rename to server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 332b0d1698800..cd5d461584f0f 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteRefreshSegmentTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -15,6 +15,7 @@ import org.opensearch.common.util.Streak; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.store.DirectoryFileTransferTracker; import java.io.IOException; import java.util.HashMap; @@ -30,7 +31,7 @@ * * @opensearch.internal */ -public class RemoteRefreshSegmentTracker { +public class RemoteSegmentTransferTracker { /** * ShardId for which this instance tracks the remote segment upload metadata. @@ -169,8 +170,14 @@ public class RemoteRefreshSegmentTracker { private final Object uploadTimeMsMutex = new Object(); - public RemoteRefreshSegmentTracker( + /** + * {@link org.opensearch.index.store.Store.StoreDirectory} level file transfer tracker, used to show download stats + */ + private final DirectoryFileTransferTracker directoryFileTransferTracker; + + public RemoteSegmentTransferTracker( ShardId shardId, + DirectoryFileTransferTracker directoryFileTransferTracker, int uploadBytesMovingAverageWindowSize, int uploadBytesPerSecMovingAverageWindowSize, int uploadTimeMsMovingAverageWindowSize @@ -188,6 +195,7 @@ public RemoteRefreshSegmentTracker( uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize)); latestLocalFileNameLengthMap = new HashMap<>(); + this.directoryFileTransferTracker = directoryFileTransferTracker; } ShardId getShardId() { @@ -472,8 +480,12 @@ void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { } } - public RemoteRefreshSegmentTracker.Stats stats() { - return new RemoteRefreshSegmentTracker.Stats( + public DirectoryFileTransferTracker getDirectoryFileTransferTracker() { + return directoryFileTransferTracker; + } + + public RemoteSegmentTransferTracker.Stats stats() { + return new RemoteSegmentTransferTracker.Stats( shardId, localRefreshClockTimeMs, remoteRefreshClockTimeMs, @@ -492,7 +504,8 @@ public RemoteRefreshSegmentTracker.Stats stats() { uploadBytesMovingAverageReference.get().getAverage(), uploadBytesPerSecMovingAverageReference.get().getAverage(), uploadTimeMsMovingAverageReference.get().getAverage(), - getBytesLag() + getBytesLag(), + directoryFileTransferTracker.stats() ); } @@ -522,6 +535,7 @@ public static class Stats implements Writeable { public final double uploadBytesPerSecMovingAverage; public final double uploadTimeMovingAverage; public final long bytesLag; + public final DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats; public Stats( ShardId shardId, @@ -542,7 +556,8 @@ public Stats( double uploadBytesMovingAverage, double uploadBytesPerSecMovingAverage, double uploadTimeMovingAverage, - long bytesLag + long bytesLag, + DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats ) { this.shardId = shardId; this.localRefreshClockTimeMs = localRefreshClockTimeMs; @@ -563,6 +578,7 @@ public Stats( this.uploadBytesPerSecMovingAverage = uploadBytesPerSecMovingAverage; this.uploadTimeMovingAverage = uploadTimeMovingAverage; this.bytesLag = bytesLag; + this.directoryFileTransferTrackerStats = directoryFileTransferTrackerStats; } public Stats(StreamInput in) throws IOException { @@ -586,6 +602,7 @@ public Stats(StreamInput in) throws IOException { this.uploadBytesPerSecMovingAverage = in.readDouble(); this.uploadTimeMovingAverage = in.readDouble(); this.bytesLag = in.readLong(); + this.directoryFileTransferTrackerStats = in.readOptionalWriteable(DirectoryFileTransferTracker.Stats::new); } catch (IOException e) { throw e; } @@ -612,7 +629,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeDouble(uploadBytesPerSecMovingAverage); out.writeDouble(uploadTimeMovingAverage); out.writeLong(bytesLag); + out.writeOptionalWriteable(directoryFileTransferTrackerStats); } } - } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index a79747038b9ae..4c0a8167f72d0 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -28,7 +28,7 @@ import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; @@ -87,7 +87,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL private final IndexShard indexShard; private final Directory storeDirectory; private final RemoteSegmentStoreDirectory remoteDirectory; - private final RemoteRefreshSegmentTracker segmentTracker; + private final RemoteSegmentTransferTracker segmentTracker; private final Map localSegmentChecksumMap; private long primaryTerm; private volatile Iterator backoffDelayIterator; @@ -104,7 +104,7 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL public RemoteStoreRefreshListener( IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher, - RemoteRefreshSegmentTracker segmentTracker + RemoteSegmentTransferTracker segmentTracker ) { super(indexShard.getThreadPool()); logger = Loggers.getLogger(getClass(), indexShard.shardId()); diff --git a/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java b/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java new file mode 100644 index 0000000000000..7e0e231d7bad9 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java @@ -0,0 +1,195 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.opensearch.common.util.MovingAverage; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Tracks the amount of bytes transferred between two {@link org.apache.lucene.store.Directory} instances + * + * @opensearch.internal + */ +public class DirectoryFileTransferTracker { + /** + * Cumulative size of files (in bytes) attempted to be transferred over from the source {@link org.apache.lucene.store.Directory} + */ + private volatile long transferredBytesStarted; + + /** + * Cumulative size of files (in bytes) successfully transferred over from the source {@link org.apache.lucene.store.Directory} + */ + private volatile long transferredBytesFailed; + + /** + * Cumulative size of files (in bytes) failed in transfer over from the source {@link org.apache.lucene.store.Directory} + */ + private volatile long transferredBytesSucceeded; + + /** + * Time in milliseconds for the last successful transfer from the source {@link org.apache.lucene.store.Directory} + */ + private volatile long lastTransferTimestampMs; + + /** + * Provides moving average over the last N total size in bytes of files transferred from the source {@link org.apache.lucene.store.Directory}. + * N is window size + */ + private volatile MovingAverage transferredBytesMovingAverageReference; + + private volatile long lastSuccessfulTransferInBytes; + + /** + * Provides moving average over the last N transfer speed (in bytes/s) of segment files transferred from the source {@link org.apache.lucene.store.Directory}. + * N is window size + */ + private volatile MovingAverage transferredBytesPerSecMovingAverageReference; + + private final int DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE = 20; + + public long getTransferredBytesStarted() { + return transferredBytesStarted; + } + + public void addTransferredBytesStarted(long size) { + transferredBytesStarted += size; + } + + public long getTransferredBytesFailed() { + return transferredBytesFailed; + } + + public void addTransferredBytesFailed(long size) { + transferredBytesFailed += size; + } + + public long getTransferredBytesSucceeded() { + return transferredBytesSucceeded; + } + + public void addTransferredBytesSucceeded(long size, long startTimeInMs) { + transferredBytesSucceeded += size; + updateLastSuccessfulTransferSize(size); + long currentTimeInMs = System.currentTimeMillis(); + updateLastTransferTimestampMs(currentTimeInMs); + long timeTakenInMS = Math.max(1, currentTimeInMs - startTimeInMs); + addTransferredBytesPerSec((size * 1_000L) / timeTakenInMS); + } + + public boolean isTransferredBytesPerSecAverageReady() { + return transferredBytesPerSecMovingAverageReference.isReady(); + } + + public double getTransferredBytesPerSecAverage() { + return transferredBytesPerSecMovingAverageReference.getAverage(); + } + + // Visible for testing + public void addTransferredBytesPerSec(long bytesPerSec) { + this.transferredBytesPerSecMovingAverageReference.record(bytesPerSec); + } + + public boolean isTransferredBytesAverageReady() { + return transferredBytesMovingAverageReference.isReady(); + } + + public double getTransferredBytesAverage() { + return transferredBytesMovingAverageReference.getAverage(); + } + + // Visible for testing + public void updateLastSuccessfulTransferSize(long size) { + lastSuccessfulTransferInBytes = size; + this.transferredBytesMovingAverageReference.record(size); + } + + public long getLastTransferTimestampMs() { + return lastTransferTimestampMs; + } + + // Visible for testing + public void updateLastTransferTimestampMs(long downloadTimestampInMs) { + this.lastTransferTimestampMs = downloadTimestampInMs; + } + + public DirectoryFileTransferTracker() { + transferredBytesMovingAverageReference = new MovingAverage(DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE); + transferredBytesPerSecMovingAverageReference = new MovingAverage(DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE); + } + + public DirectoryFileTransferTracker.Stats stats() { + return new Stats( + transferredBytesStarted, + transferredBytesFailed, + transferredBytesSucceeded, + lastTransferTimestampMs, + transferredBytesMovingAverageReference.getAverage(), + lastSuccessfulTransferInBytes, + transferredBytesPerSecMovingAverageReference.getAverage() + ); + } + + /** + * Represents the tracker's stats presentable to an API. + * + * @opensearch.internal + */ + public static class Stats implements Writeable { + public final long transferredBytesStarted; + public final long transferredBytesFailed; + public final long transferredBytesSucceeded; + public final long lastTransferTimestampMs; + public final double transferredBytesMovingAverage; + public final long lastSuccessfulTransferInBytes; + public final double transferredBytesPerSecMovingAverage; + + public Stats( + long transferredBytesStarted, + long transferredBytesFailed, + long downloadBytesSucceeded, + long lastTransferTimestampMs, + double transferredBytesMovingAverage, + long lastSuccessfulTransferInBytes, + double transferredBytesPerSecMovingAverage + ) { + this.transferredBytesStarted = transferredBytesStarted; + this.transferredBytesFailed = transferredBytesFailed; + this.transferredBytesSucceeded = downloadBytesSucceeded; + this.lastTransferTimestampMs = lastTransferTimestampMs; + this.transferredBytesMovingAverage = transferredBytesMovingAverage; + this.lastSuccessfulTransferInBytes = lastSuccessfulTransferInBytes; + this.transferredBytesPerSecMovingAverage = transferredBytesPerSecMovingAverage; + } + + public Stats(StreamInput in) throws IOException { + this.transferredBytesStarted = in.readLong(); + this.transferredBytesFailed = in.readLong(); + this.transferredBytesSucceeded = in.readLong(); + this.lastTransferTimestampMs = in.readLong(); + this.transferredBytesMovingAverage = in.readDouble(); + this.lastSuccessfulTransferInBytes = in.readLong(); + this.transferredBytesPerSecMovingAverage = in.readDouble(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(transferredBytesStarted); + out.writeLong(transferredBytesFailed); + out.writeLong(transferredBytesSucceeded); + out.writeLong(lastTransferTimestampMs); + out.writeDouble(transferredBytesMovingAverage); + out.writeLong(lastSuccessfulTransferInBytes); + out.writeDouble(transferredBytesPerSecMovingAverage); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 8967100d4faf0..a67b87f58110c 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -963,18 +963,24 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l } } + public DirectoryFileTransferTracker getDirectoryFileTransferTracker() { + return directory.getDirectoryFileTransferTracker(); + } + /** * A store directory * * @opensearch.internal */ static final class StoreDirectory extends FilterDirectory { - private final Logger deletesLogger; + public final DirectoryFileTransferTracker directoryFileTransferTracker; + StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) { super(delegateDirectory); this.deletesLogger = deletesLogger; + this.directoryFileTransferTracker = new DirectoryFileTransferTracker(); } /** Estimate the cumulative size of all files in this directory in bytes. */ @@ -1012,6 +1018,52 @@ public Set getPendingDeletions() throws IOException { // to be removed once fixed in FilterDirectory. return unwrap(this).getPendingDeletions(); } + + public DirectoryFileTransferTracker getDirectoryFileTransferTracker() { + return directoryFileTransferTracker; + } + + @Override + public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException { + long fileSize = from.fileLength(src); + beforeDownload(fileSize); + boolean success = false; + try { + long startTime = System.currentTimeMillis(); + super.copyFrom(from, src, dest, context); + success = true; + afterDownload(fileSize, startTime); + } finally { + if (!success) { + downloadFailed(fileSize); + } + } + } + + /** + * Updates the amount of bytes attempted for download + */ + private void beforeDownload(long fileSize) { + directoryFileTransferTracker.addTransferredBytesStarted(fileSize); + } + + /** + * Updates + * - The amount of bytes that has been successfully downloaded from the source store + * - The last successful download completion timestamp + * - The last successfully downloaded file + * - Download speed (in bytes/sec) + */ + private void afterDownload(long fileSize, long startTimeInMs) { + directoryFileTransferTracker.addTransferredBytesSucceeded(fileSize, startTimeInMs); + } + + /** + * Updates the amount of bytes failed in download + */ + private void downloadFailed(long fileSize) { + directoryFileTransferTracker.addTransferredBytesFailed(fileSize); + } } /** diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java index a476b66719d3f..64dfda86c1af9 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsResponseTests.java @@ -10,10 +10,11 @@ import org.opensearch.core.action.support.DefaultShardOperationFailedException; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.core.index.shard.ShardId; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -23,7 +24,10 @@ import java.util.Map; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.compareStatsResponse; -import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerStats; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createShardRouting; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewPrimary; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewReplica; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForRemoteStoreRestoredPrimary; import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; public class RemoteStoreStatsResponseTests extends OpenSearchTestCase { @@ -43,11 +47,12 @@ public void tearDown() throws Exception { threadPool.shutdownNow(); } - public void testSerialization() throws Exception { - RemoteRefreshSegmentTracker.Stats pressureTrackerStats = createPressureTrackerStats(shardId); - RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerStats); + public void testSerializationForPrimary() throws Exception { + RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForNewPrimary(shardId); + ShardRouting primaryShardRouting = createShardRouting(shardId, true); + RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, primaryShardRouting); RemoteStoreStatsResponse statsResponse = new RemoteStoreStatsResponse( - new RemoteStoreStats[] { stats }, + new RemoteStoreStats[] { primaryShardStats }, 1, 1, 0, @@ -58,15 +63,96 @@ public void testSerialization() throws Exception { statsResponse.toXContent(builder, EMPTY_PARAMS); Map jsonResponseObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()) .v2(); + Map metadataShardsObject = (Map) jsonResponseObject.get("_shards"); + assertEquals(metadataShardsObject.get("total"), 1); + assertEquals(metadataShardsObject.get("successful"), 1); + assertEquals(metadataShardsObject.get("failed"), 0); + Map indicesObject = (Map) jsonResponseObject.get("indices"); + assertTrue(indicesObject.containsKey("index")); + Map shardsObject = (Map) ((Map) indicesObject.get("index")).get("shards"); + ArrayList> perShardNumberObject = (ArrayList>) shardsObject.get("0"); + assertEquals(perShardNumberObject.size(), 1); + Map perShardCopyObject = perShardNumberObject.get(0); + compareStatsResponse(perShardCopyObject, mockPrimaryTrackerStats, primaryShardRouting); + } + + public void testSerializationForBothPrimaryAndReplica() throws Exception { + RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForNewPrimary(shardId); + RemoteSegmentTransferTracker.Stats mockReplicaTrackerStats = createStatsForNewReplica(shardId); + ShardRouting primaryShardRouting = createShardRouting(shardId, true); + ShardRouting replicaShardRouting = createShardRouting(shardId, false); + RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, primaryShardRouting); + RemoteStoreStats replicaShardStats = new RemoteStoreStats(mockReplicaTrackerStats, replicaShardRouting); + RemoteStoreStatsResponse statsResponse = new RemoteStoreStatsResponse( + new RemoteStoreStats[] { primaryShardStats, replicaShardStats }, + 2, + 2, + 0, + new ArrayList() + ); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + statsResponse.toXContent(builder, EMPTY_PARAMS); + Map jsonResponseObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()) + .v2(); + Map metadataShardsObject = (Map) jsonResponseObject.get("_shards"); + assertEquals(2, metadataShardsObject.get("total")); + assertEquals(2, metadataShardsObject.get("successful")); + assertEquals(0, metadataShardsObject.get("failed")); + Map indicesObject = (Map) jsonResponseObject.get("indices"); + assertTrue(indicesObject.containsKey("index")); + Map shardsObject = (Map) ((Map) indicesObject.get("index")).get("shards"); + ArrayList> perShardNumberObject = (ArrayList>) shardsObject.get("0"); + assertEquals(2, perShardNumberObject.size()); + perShardNumberObject.forEach(shardObject -> { + boolean isPrimary = (boolean) ((Map) shardObject.get(RemoteStoreStats.Fields.ROUTING)).get( + RemoteStoreStats.RoutingFields.PRIMARY + ); + if (isPrimary) { + compareStatsResponse(shardObject, mockPrimaryTrackerStats, primaryShardRouting); + } else { + compareStatsResponse(shardObject, mockReplicaTrackerStats, replicaShardRouting); + } + }); + } - ArrayList> statsObjectArray = (ArrayList>) jsonResponseObject.get("stats"); - assertEquals(statsObjectArray.size(), 1); - Map statsObject = statsObjectArray.get(0); - Map shardsObject = (Map) jsonResponseObject.get("_shards"); + public void testSerializationForBothRemoteStoreRestoredPrimaryAndReplica() throws Exception { + RemoteSegmentTransferTracker.Stats mockPrimaryTrackerStats = createStatsForRemoteStoreRestoredPrimary(shardId); + RemoteSegmentTransferTracker.Stats mockReplicaTrackerStats = createStatsForNewReplica(shardId); + ShardRouting primaryShardRouting = createShardRouting(shardId, true); + ShardRouting replicaShardRouting = createShardRouting(shardId, false); + RemoteStoreStats primaryShardStats = new RemoteStoreStats(mockPrimaryTrackerStats, primaryShardRouting); + RemoteStoreStats replicaShardStats = new RemoteStoreStats(mockReplicaTrackerStats, replicaShardRouting); + RemoteStoreStatsResponse statsResponse = new RemoteStoreStatsResponse( + new RemoteStoreStats[] { primaryShardStats, replicaShardStats }, + 2, + 2, + 0, + new ArrayList() + ); - assertEquals(shardsObject.get("total"), 1); - assertEquals(shardsObject.get("successful"), 1); - assertEquals(shardsObject.get("failed"), 0); - compareStatsResponse(statsObject, pressureTrackerStats); + XContentBuilder builder = XContentFactory.jsonBuilder(); + statsResponse.toXContent(builder, EMPTY_PARAMS); + Map jsonResponseObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()) + .v2(); + Map metadataShardsObject = (Map) jsonResponseObject.get("_shards"); + assertEquals(2, metadataShardsObject.get("total")); + assertEquals(2, metadataShardsObject.get("successful")); + assertEquals(0, metadataShardsObject.get("failed")); + Map indicesObject = (Map) jsonResponseObject.get("indices"); + assertTrue(indicesObject.containsKey("index")); + Map shardsObject = (Map) ((Map) indicesObject.get("index")).get("shards"); + ArrayList> perShardNumberObject = (ArrayList>) shardsObject.get("0"); + assertEquals(2, perShardNumberObject.size()); + perShardNumberObject.forEach(shardObject -> { + boolean isPrimary = (boolean) ((Map) shardObject.get(RemoteStoreStats.Fields.ROUTING)).get( + RemoteStoreStats.RoutingFields.PRIMARY + ); + if (isPrimary) { + compareStatsResponse(shardObject, mockPrimaryTrackerStats, primaryShardRouting); + } else { + compareStatsResponse(shardObject, mockReplicaTrackerStats, replicaShardRouting); + } + }); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java index 747dc692b1d5d..0c081ee238e2d 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java @@ -8,80 +8,264 @@ package org.opensearch.action.admin.cluster.remotestore.stats; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.store.DirectoryFileTransferTracker; import java.util.Map; +import static org.junit.Assert.assertTrue; import static org.opensearch.test.OpenSearchTestCase.assertEquals; +import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; /** * Helper utilities for Remote Store stats tests */ public class RemoteStoreStatsTestHelper { - static RemoteRefreshSegmentTracker.Stats createPressureTrackerStats(ShardId shardId) { - return new RemoteRefreshSegmentTracker.Stats(shardId, 101, 102, 100, 3, 2, 10, 5, 5, 10, 5, 5, 3, 2, 5, 2, 3, 4, 9); + static RemoteSegmentTransferTracker.Stats createStatsForNewPrimary(ShardId shardId) { + return new RemoteSegmentTransferTracker.Stats( + shardId, + 101, + 102, + 100, + 0, + 10, + 2, + 10, + 5, + 5, + 0, + 0, + 0, + 5, + 5, + 5, + 0, + 0, + 0, + createZeroDirectoryFileTransferStats() + ); } - static void compareStatsResponse(Map statsObject, RemoteRefreshSegmentTracker.Stats pressureTrackerStats) { - assertEquals(statsObject.get(RemoteStoreStats.Fields.SHARD_ID), pressureTrackerStats.shardId.toString()); - assertEquals(statsObject.get(RemoteStoreStats.Fields.LOCAL_REFRESH_TIMESTAMP), (int) pressureTrackerStats.localRefreshClockTimeMs); - assertEquals( - statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_TIMESTAMP), - (int) pressureTrackerStats.remoteRefreshClockTimeMs - ); - assertEquals(statsObject.get(RemoteStoreStats.Fields.REFRESH_TIME_LAG_IN_MILLIS), (int) pressureTrackerStats.refreshTimeLagMs); - assertEquals( - statsObject.get(RemoteStoreStats.Fields.REFRESH_LAG), - (int) (pressureTrackerStats.localRefreshNumber - pressureTrackerStats.remoteRefreshNumber) + static RemoteSegmentTransferTracker.Stats createStatsForNewReplica(ShardId shardId) { + return new RemoteSegmentTransferTracker.Stats( + shardId, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + createSampleDirectoryFileTransferStats() ); - assertEquals(statsObject.get(RemoteStoreStats.Fields.BYTES_LAG), (int) pressureTrackerStats.bytesLag); + } - assertEquals(statsObject.get(RemoteStoreStats.Fields.BACKPRESSURE_REJECTION_COUNT), (int) pressureTrackerStats.rejectionCount); - assertEquals( - statsObject.get(RemoteStoreStats.Fields.CONSECUTIVE_FAILURE_COUNT), - (int) pressureTrackerStats.consecutiveFailuresCount + static RemoteSegmentTransferTracker.Stats createStatsForRemoteStoreRestoredPrimary(ShardId shardId) { + return new RemoteSegmentTransferTracker.Stats( + shardId, + 50, + 50, + 0, + 50, + 11, + 11, + 10, + 10, + 0, + 10, + 10, + 0, + 10, + 10, + 0, + 0, + 0, + 100, + createSampleDirectoryFileTransferStats() ); + } + static DirectoryFileTransferTracker.Stats createSampleDirectoryFileTransferStats() { + return new DirectoryFileTransferTracker.Stats(10, 0, 10, 12345, 5, 5, 5); + } + + static DirectoryFileTransferTracker.Stats createZeroDirectoryFileTransferStats() { + return new DirectoryFileTransferTracker.Stats(0, 0, 0, 0, 0, 0, 0); + } + + static ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { + return TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(4), isPrimary, ShardRoutingState.STARTED); + } + + static void compareStatsResponse( + Map statsObject, + RemoteSegmentTransferTracker.Stats statsTracker, + ShardRouting routing + ) { assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.STARTED), - (int) pressureTrackerStats.uploadBytesStarted - ); - assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.SUCCEEDED), - (int) pressureTrackerStats.uploadBytesSucceeded - ); - assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_UPLOADS_IN_BYTES)).get(RemoteStoreStats.SubFields.FAILED), - (int) pressureTrackerStats.uploadBytesFailed - ); - assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_SIZE_IN_BYTES)).get(RemoteStoreStats.SubFields.MOVING_AVG), - pressureTrackerStats.uploadBytesMovingAverage - ); - assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_SIZE_IN_BYTES)).get(RemoteStoreStats.SubFields.LAST_SUCCESSFUL), - (int) pressureTrackerStats.lastSuccessfulRemoteRefreshBytes - ); - assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)).get(RemoteStoreStats.SubFields.MOVING_AVG), - pressureTrackerStats.uploadBytesPerSecMovingAverage - ); - assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.STARTED), - (int) pressureTrackerStats.totalUploadsStarted - ); - assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.SUCCEEDED), - (int) pressureTrackerStats.totalUploadsSucceeded + ((Map) statsObject.get(RemoteStoreStats.Fields.ROUTING)).get(RemoteStoreStats.RoutingFields.NODE_ID), + routing.currentNodeId() ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.TOTAL_REMOTE_REFRESH)).get(RemoteStoreStats.SubFields.FAILED), - (int) pressureTrackerStats.totalUploadsFailed + ((Map) statsObject.get(RemoteStoreStats.Fields.ROUTING)).get(RemoteStoreStats.RoutingFields.STATE), + routing.state().toString() ); assertEquals( - ((Map) statsObject.get(RemoteStoreStats.Fields.REMOTE_REFRESH_LATENCY_IN_MILLIS)).get(RemoteStoreStats.SubFields.MOVING_AVG), - pressureTrackerStats.uploadTimeMovingAverage + ((Map) statsObject.get(RemoteStoreStats.Fields.ROUTING)).get(RemoteStoreStats.RoutingFields.PRIMARY), + routing.primary() ); + + Map segment = ((Map) statsObject.get(RemoteStoreStats.Fields.SEGMENT)); + Map segmentDownloads = ((Map) segment.get(RemoteStoreStats.SubFields.DOWNLOAD)); + Map segmentUploads = ((Map) segment.get(RemoteStoreStats.SubFields.UPLOAD)); + + if (statsTracker.directoryFileTransferTrackerStats.transferredBytesStarted != 0) { + assertEquals( + segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.LAST_SYNC_TIMESTAMP), + (int) statsTracker.directoryFileTransferTrackerStats.lastTransferTimestampMs + ); + assertEquals( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.STARTED + ), + (int) statsTracker.directoryFileTransferTrackerStats.transferredBytesStarted + ); + assertEquals( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.SUCCEEDED + ), + (int) statsTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded + ); + assertEquals( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.TOTAL_DOWNLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.FAILED + ), + (int) statsTracker.directoryFileTransferTrackerStats.transferredBytesFailed + ); + assertEquals( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES)).get( + RemoteStoreStats.SubFields.LAST_SUCCESSFUL + ), + (int) statsTracker.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes + ); + assertEquals( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_SIZE_IN_BYTES)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ), + statsTracker.directoryFileTransferTrackerStats.transferredBytesMovingAverage + ); + assertEquals( + ((Map) segmentDownloads.get(RemoteStoreStats.DownloadStatsFields.DOWNLOAD_SPEED_IN_BYTES_PER_SEC)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ), + statsTracker.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage + ); + } else { + assertTrue(segmentDownloads.isEmpty()); + } + + if (statsTracker.totalUploadsStarted != 0) { + assertEquals( + segmentUploads.get(RemoteStoreStats.UploadStatsFields.LOCAL_REFRESH_TIMESTAMP), + (int) statsTracker.localRefreshClockTimeMs + ); + assertEquals( + segmentUploads.get(RemoteStoreStats.UploadStatsFields.REMOTE_REFRESH_TIMESTAMP), + (int) statsTracker.remoteRefreshClockTimeMs + ); + assertEquals( + segmentUploads.get(RemoteStoreStats.UploadStatsFields.REFRESH_TIME_LAG_IN_MILLIS), + (int) statsTracker.refreshTimeLagMs + ); + assertEquals( + segmentUploads.get(RemoteStoreStats.UploadStatsFields.REFRESH_LAG), + (int) (statsTracker.localRefreshNumber - statsTracker.remoteRefreshNumber) + ); + assertEquals(segmentUploads.get(RemoteStoreStats.UploadStatsFields.BYTES_LAG), (int) statsTracker.bytesLag); + + assertEquals( + segmentUploads.get(RemoteStoreStats.UploadStatsFields.BACKPRESSURE_REJECTION_COUNT), + (int) statsTracker.rejectionCount + ); + assertEquals( + segmentUploads.get(RemoteStoreStats.UploadStatsFields.CONSECUTIVE_FAILURE_COUNT), + (int) statsTracker.consecutiveFailuresCount + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.STARTED + ), + (int) statsTracker.uploadBytesStarted + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.SUCCEEDED + ), + (int) statsTracker.uploadBytesSucceeded + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS_IN_BYTES)).get( + RemoteStoreStats.SubFields.FAILED + ), + (int) statsTracker.uploadBytesFailed + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.REMOTE_REFRESH_SIZE_IN_BYTES)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ), + statsTracker.uploadBytesMovingAverage + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.REMOTE_REFRESH_SIZE_IN_BYTES)).get( + RemoteStoreStats.SubFields.LAST_SUCCESSFUL + ), + (int) statsTracker.lastSuccessfulRemoteRefreshBytes + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.UPLOAD_LATENCY_IN_BYTES_PER_SEC)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ), + statsTracker.uploadBytesPerSecMovingAverage + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)).get( + RemoteStoreStats.SubFields.STARTED + ), + (int) statsTracker.totalUploadsStarted + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)).get( + RemoteStoreStats.SubFields.SUCCEEDED + ), + (int) statsTracker.totalUploadsSucceeded + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.TOTAL_SYNCS_TO_REMOTE)).get(RemoteStoreStats.SubFields.FAILED), + (int) statsTracker.totalUploadsFailed + ); + assertEquals( + ((Map) segmentUploads.get(RemoteStoreStats.UploadStatsFields.REMOTE_REFRESH_LATENCY_IN_MILLIS)).get( + RemoteStoreStats.SubFields.MOVING_AVG + ), + statsTracker.uploadTimeMovingAverage + ); + } else { + assertTrue(segmentUploads.isEmpty()); + } } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java index fc057b71b15f8..3597a5350e1fb 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTests.java @@ -9,12 +9,13 @@ package org.opensearch.action.admin.cluster.remotestore.stats; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.core.index.shard.ShardId; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -24,7 +25,10 @@ import java.util.Map; import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.compareStatsResponse; -import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createPressureTrackerStats; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewReplica; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createShardRouting; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForNewPrimary; +import static org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsTestHelper.createStatsForRemoteStoreRestoredPrimary; import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; public class RemoteStoreStatsTests extends OpenSearchTestCase { @@ -44,43 +48,175 @@ public void tearDown() throws Exception { threadPool.shutdownNow(); } - public void testXContentBuilder() throws IOException { - RemoteRefreshSegmentTracker.Stats pressureTrackerStats = createPressureTrackerStats(shardId); - RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerStats); + public void testXContentBuilderWithPrimaryShard() throws IOException { + RemoteSegmentTransferTracker.Stats uploadStats = createStatsForNewPrimary(shardId); + ShardRouting routing = createShardRouting(shardId, true); + RemoteStoreStats stats = new RemoteStoreStats(uploadStats, routing); XContentBuilder builder = XContentFactory.jsonBuilder(); stats.toXContent(builder, EMPTY_PARAMS); Map jsonObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); - compareStatsResponse(jsonObject, pressureTrackerStats); + compareStatsResponse(jsonObject, uploadStats, routing); } - public void testSerialization() throws Exception { - RemoteRefreshSegmentTracker.Stats pressureTrackerStats = createPressureTrackerStats(shardId); - RemoteStoreStats stats = new RemoteStoreStats(pressureTrackerStats); + public void testXContentBuilderWithReplicaShard() throws IOException { + RemoteSegmentTransferTracker.Stats downloadStats = createStatsForNewReplica(shardId); + ShardRouting routing = createShardRouting(shardId, false); + RemoteStoreStats stats = new RemoteStoreStats(downloadStats, routing); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + stats.toXContent(builder, EMPTY_PARAMS); + Map jsonObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + compareStatsResponse(jsonObject, downloadStats, routing); + } + + public void testXContentBuilderWithRemoteStoreRestoredShard() throws IOException { + RemoteSegmentTransferTracker.Stats remotestoreRestoredShardStats = createStatsForRemoteStoreRestoredPrimary(shardId); + ShardRouting routing = createShardRouting(shardId, true); + RemoteStoreStats stats = new RemoteStoreStats(remotestoreRestoredShardStats, routing); + + XContentBuilder builder = XContentFactory.jsonBuilder(); + stats.toXContent(builder, EMPTY_PARAMS); + Map jsonObject = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + compareStatsResponse(jsonObject, remotestoreRestoredShardStats, routing); + } + + public void testSerializationForPrimaryShard() throws Exception { + RemoteSegmentTransferTracker.Stats primaryShardStats = createStatsForNewPrimary(shardId); + RemoteStoreStats stats = new RemoteStoreStats(primaryShardStats, createShardRouting(shardId, true)); + try (BytesStreamOutput out = new BytesStreamOutput()) { + stats.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + RemoteSegmentTransferTracker.Stats deserializedStats = new RemoteStoreStats(in).getStats(); + assertEquals(stats.getStats().refreshTimeLagMs, deserializedStats.refreshTimeLagMs); + assertEquals(stats.getStats().localRefreshNumber, deserializedStats.localRefreshNumber); + assertEquals(stats.getStats().remoteRefreshNumber, deserializedStats.remoteRefreshNumber); + assertEquals(stats.getStats().uploadBytesStarted, deserializedStats.uploadBytesStarted); + assertEquals(stats.getStats().uploadBytesSucceeded, deserializedStats.uploadBytesSucceeded); + assertEquals(stats.getStats().uploadBytesFailed, deserializedStats.uploadBytesFailed); + assertEquals(stats.getStats().totalUploadsStarted, deserializedStats.totalUploadsStarted); + assertEquals(stats.getStats().totalUploadsFailed, deserializedStats.totalUploadsFailed); + assertEquals(stats.getStats().totalUploadsSucceeded, deserializedStats.totalUploadsSucceeded); + assertEquals(stats.getStats().rejectionCount, deserializedStats.rejectionCount); + assertEquals(stats.getStats().consecutiveFailuresCount, deserializedStats.consecutiveFailuresCount); + assertEquals(stats.getStats().uploadBytesMovingAverage, deserializedStats.uploadBytesMovingAverage, 0); + assertEquals(stats.getStats().uploadBytesPerSecMovingAverage, deserializedStats.uploadBytesPerSecMovingAverage, 0); + assertEquals(stats.getStats().uploadTimeMovingAverage, deserializedStats.uploadTimeMovingAverage, 0); + assertEquals(stats.getStats().bytesLag, deserializedStats.bytesLag); + assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.transferredBytesStarted); + assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.transferredBytesFailed); + assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.transferredBytesSucceeded); + assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes); + assertEquals(0, deserializedStats.directoryFileTransferTrackerStats.lastTransferTimestampMs); + } + } + } + + public void testSerializationForReplicaShard() throws Exception { + RemoteSegmentTransferTracker.Stats replicaShardStats = createStatsForNewReplica(shardId); + RemoteStoreStats stats = new RemoteStoreStats(replicaShardStats, createShardRouting(shardId, false)); try (BytesStreamOutput out = new BytesStreamOutput()) { stats.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { - RemoteStoreStats deserializedStats = new RemoteStoreStats(in); - assertEquals(deserializedStats.getStats().shardId.toString(), stats.getStats().shardId.toString()); - assertEquals(deserializedStats.getStats().refreshTimeLagMs, stats.getStats().refreshTimeLagMs); - assertEquals(deserializedStats.getStats().localRefreshNumber, stats.getStats().localRefreshNumber); - assertEquals(deserializedStats.getStats().remoteRefreshNumber, stats.getStats().remoteRefreshNumber); - assertEquals(deserializedStats.getStats().uploadBytesStarted, stats.getStats().uploadBytesStarted); - assertEquals(deserializedStats.getStats().uploadBytesSucceeded, stats.getStats().uploadBytesSucceeded); - assertEquals(deserializedStats.getStats().uploadBytesFailed, stats.getStats().uploadBytesFailed); - assertEquals(deserializedStats.getStats().totalUploadsStarted, stats.getStats().totalUploadsStarted); - assertEquals(deserializedStats.getStats().totalUploadsFailed, stats.getStats().totalUploadsFailed); - assertEquals(deserializedStats.getStats().totalUploadsSucceeded, stats.getStats().totalUploadsSucceeded); - assertEquals(deserializedStats.getStats().rejectionCount, stats.getStats().rejectionCount); - assertEquals(deserializedStats.getStats().consecutiveFailuresCount, stats.getStats().consecutiveFailuresCount); - assertEquals(deserializedStats.getStats().uploadBytesMovingAverage, stats.getStats().uploadBytesMovingAverage, 0); + RemoteSegmentTransferTracker.Stats deserializedStats = new RemoteStoreStats(in).getStats(); + assertEquals(0, deserializedStats.refreshTimeLagMs); + assertEquals(0, deserializedStats.localRefreshNumber); + assertEquals(0, deserializedStats.remoteRefreshNumber); + assertEquals(0, deserializedStats.uploadBytesStarted); + assertEquals(0, deserializedStats.uploadBytesSucceeded); + assertEquals(0, deserializedStats.uploadBytesFailed); + assertEquals(0, deserializedStats.totalUploadsStarted); + assertEquals(0, deserializedStats.totalUploadsFailed); + assertEquals(0, deserializedStats.totalUploadsSucceeded); + assertEquals(0, deserializedStats.rejectionCount); + assertEquals(0, deserializedStats.consecutiveFailuresCount); + assertEquals(0, deserializedStats.bytesLag); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesStarted, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesStarted + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesFailed, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesFailed + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesSucceeded + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes, + deserializedStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.lastTransferTimestampMs, + deserializedStats.directoryFileTransferTrackerStats.lastTransferTimestampMs + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, + 0 + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesMovingAverage, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesMovingAverage, + 0 + ); + } + } + } + + public void testSerializationForRemoteStoreRestoredPrimaryShard() throws Exception { + RemoteSegmentTransferTracker.Stats primaryShardStats = createStatsForRemoteStoreRestoredPrimary(shardId); + RemoteStoreStats stats = new RemoteStoreStats(primaryShardStats, createShardRouting(shardId, true)); + try (BytesStreamOutput out = new BytesStreamOutput()) { + stats.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + RemoteSegmentTransferTracker.Stats deserializedStats = new RemoteStoreStats(in).getStats(); + assertEquals(stats.getStats().refreshTimeLagMs, deserializedStats.refreshTimeLagMs); + assertEquals(stats.getStats().localRefreshNumber, deserializedStats.localRefreshNumber); + assertEquals(stats.getStats().remoteRefreshNumber, deserializedStats.remoteRefreshNumber); + assertEquals(stats.getStats().uploadBytesStarted, deserializedStats.uploadBytesStarted); + assertEquals(stats.getStats().uploadBytesSucceeded, deserializedStats.uploadBytesSucceeded); + assertEquals(stats.getStats().uploadBytesFailed, deserializedStats.uploadBytesFailed); + assertEquals(stats.getStats().totalUploadsStarted, deserializedStats.totalUploadsStarted); + assertEquals(stats.getStats().totalUploadsFailed, deserializedStats.totalUploadsFailed); + assertEquals(stats.getStats().totalUploadsSucceeded, deserializedStats.totalUploadsSucceeded); + assertEquals(stats.getStats().rejectionCount, deserializedStats.rejectionCount); + assertEquals(stats.getStats().consecutiveFailuresCount, deserializedStats.consecutiveFailuresCount); + assertEquals(stats.getStats().uploadBytesMovingAverage, deserializedStats.uploadBytesMovingAverage, 0); + assertEquals(stats.getStats().uploadBytesPerSecMovingAverage, deserializedStats.uploadBytesPerSecMovingAverage, 0); + assertEquals(stats.getStats().uploadTimeMovingAverage, deserializedStats.uploadTimeMovingAverage, 0); + assertEquals(stats.getStats().bytesLag, deserializedStats.bytesLag); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesStarted, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesStarted + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesFailed, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesFailed + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesSucceeded, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesSucceeded + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes, + deserializedStats.directoryFileTransferTrackerStats.lastSuccessfulTransferInBytes + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.lastTransferTimestampMs, + deserializedStats.directoryFileTransferTrackerStats.lastTransferTimestampMs + ); + assertEquals( + stats.getStats().directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, + 0 + ); assertEquals( - deserializedStats.getStats().uploadBytesPerSecMovingAverage, - stats.getStats().uploadBytesPerSecMovingAverage, + stats.getStats().directoryFileTransferTrackerStats.transferredBytesMovingAverage, + deserializedStats.directoryFileTransferTrackerStats.transferredBytesMovingAverage, 0 ); - assertEquals(deserializedStats.getStats().uploadTimeMovingAverage, stats.getStats().uploadTimeMovingAverage, 0); - assertEquals(deserializedStats.getStats().bytesLag, stats.getStats().bytesLag); } } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java index 5c17765903504..aa3e7ab1fb2c7 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/TransportRemoteStoreStatsActionTests.java @@ -29,7 +29,7 @@ import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; @@ -89,7 +89,7 @@ public void setUp() throws Exception { Collections.emptySet() ); - when(pressureService.getRemoteRefreshSegmentTracker(any())).thenReturn(mock(RemoteRefreshSegmentTracker.class)); + when(pressureService.getRemoteRefreshSegmentTracker(any())).thenReturn(mock(RemoteSegmentTransferTracker.class)); when(indicesService.indexService(INDEX)).thenReturn(indexService); when(indexService.getIndexSettings()).thenReturn(new IndexSettings(remoteStoreIndexMetadata, Settings.EMPTY)); statsAction = new TransportRemoteStoreStatsAction( @@ -111,7 +111,7 @@ public void tearDown() throws Exception { clusterService.close(); } - public void testOnlyPrimaryShards() throws Exception { + public void testAllShardCopies() throws Exception { FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); RoutingTable routingTable = RoutingTable.builder().addAsNew(remoteStoreIndexMetadata).build(); Metadata metadata = Metadata.builder().put(remoteStoreIndexMetadata, false).build(); @@ -128,7 +128,7 @@ public void testOnlyPrimaryShards() throws Exception { new String[] { INDEX.getName() } ); - assertEquals(shardsIterator.size(), 2); + assertEquals(shardsIterator.size(), 4); } public void testOnlyLocalShards() throws Exception { @@ -156,10 +156,10 @@ public void testOnlyLocalShards() throws Exception { remoteStoreStatsRequest.local(true); ShardsIterator shardsIterator = statsAction.shards(clusterService.state(), remoteStoreStatsRequest, concreteIndices); - assertEquals(shardsIterator.size(), 1); + assertEquals(shardsIterator.size(), 2); } - public void testOnlyRemoteStoreEnabledShards() throws Exception { + public void testOnlyRemoteStoreEnabledShardCopies() throws Exception { FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); Index NEW_INDEX = new Index("newIndex", "newUUID"); IndexMetadata indexMetadataWithoutRemoteStore = IndexMetadata.builder(NEW_INDEX.getName()) @@ -192,6 +192,6 @@ public void testOnlyRemoteStoreEnabledShards() throws Exception { new String[] { INDEX.getName() } ); - assertEquals(shardsIterator.size(), 2); + assertEquals(shardsIterator.size(), 4); } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java index 69f3456e825f8..8d444f5d10f26 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentPressureServiceTests.java @@ -17,6 +17,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.index.store.Store; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -100,7 +101,7 @@ public void testValidateSegmentUploadLag() { pressureService = new RemoteRefreshSegmentPressureService(clusterService, Settings.EMPTY); pressureService.afterIndexShardCreated(indexShard); - RemoteRefreshSegmentTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); + RemoteSegmentTransferTracker pressureTracker = pressureService.getRemoteRefreshSegmentTracker(shardId); pressureTracker.updateLocalRefreshSeqNo(6); // 1. time lag more than dynamic threshold @@ -156,10 +157,11 @@ private static IndexShard createIndexShard(ShardId shardId, boolean remoteStoreE .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, String.valueOf(remoteStoreEnabled)) .build(); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test_index", settings); + Store store = mock(Store.class); IndexShard indexShard = mock(IndexShard.class); when(indexShard.indexSettings()).thenReturn(indexSettings); when(indexShard.shardId()).thenReturn(shardId); + when(indexShard.store()).thenReturn(store); return indexShard; } - } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java similarity index 66% rename from server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java rename to server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java index badfeb0d67c05..208cea111f2e1 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteRefreshSegmentTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java @@ -14,6 +14,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.store.DirectoryFileTransferTracker; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -24,7 +25,7 @@ import static org.mockito.Mockito.mock; -public class RemoteRefreshSegmentTrackerTests extends OpenSearchTestCase { +public class RemoteSegmentTransferTrackerTests extends OpenSearchTestCase { private RemoteRefreshSegmentPressureSettings pressureSettings; @@ -34,7 +35,9 @@ public class RemoteRefreshSegmentTrackerTests extends OpenSearchTestCase { private ShardId shardId; - private RemoteRefreshSegmentTracker pressureTracker; + private RemoteSegmentTransferTracker pressureTracker; + + private DirectoryFileTransferTracker directoryFileTransferTracker; @Override public void setUp() throws Exception { @@ -51,6 +54,7 @@ public void setUp() throws Exception { mock(RemoteRefreshSegmentPressureService.class) ); shardId = new ShardId("index", "uuid", 0); + directoryFileTransferTracker = new DirectoryFileTransferTracker(); } @Override @@ -60,8 +64,9 @@ public void tearDown() throws Exception { } public void testGetShardId() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -70,8 +75,9 @@ public void testGetShardId() { } public void testUpdateLocalRefreshSeqNo() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -82,8 +88,9 @@ public void testUpdateLocalRefreshSeqNo() { } public void testUpdateRemoteRefreshSeqNo() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -94,8 +101,9 @@ public void testUpdateRemoteRefreshSeqNo() { } public void testUpdateLocalRefreshTimeMs() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -106,8 +114,9 @@ public void testUpdateLocalRefreshTimeMs() { } public void testUpdateRemoteRefreshTimeMs() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -117,9 +126,23 @@ public void testUpdateRemoteRefreshTimeMs() { assertEquals(refreshTimeMs, pressureTracker.getRemoteRefreshTimeMs()); } + public void testLastDownloadTimestampMs() { + pressureTracker = new RemoteSegmentTransferTracker( + shardId, + directoryFileTransferTracker, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long currentTimeInMs = System.currentTimeMillis(); + pressureTracker.getDirectoryFileTransferTracker().updateLastTransferTimestampMs(currentTimeInMs); + assertEquals(currentTimeInMs, pressureTracker.getDirectoryFileTransferTracker().getLastTransferTimestampMs()); + } + public void testComputeSeqNoLagOnUpdate() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -133,8 +156,9 @@ public void testComputeSeqNoLagOnUpdate() { } public void testComputeTimeLagOnUpdate() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -150,8 +174,9 @@ public void testComputeTimeLagOnUpdate() { } public void testAddUploadBytesStarted() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -165,8 +190,9 @@ public void testAddUploadBytesStarted() { } public void testAddUploadBytesFailed() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -180,8 +206,9 @@ public void testAddUploadBytesFailed() { } public void testAddUploadBytesSucceeded() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -194,9 +221,58 @@ public void testAddUploadBytesSucceeded() { assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getUploadBytesSucceeded()); } + public void testAddDownloadBytesStarted() { + pressureTracker = new RemoteSegmentTransferTracker( + shardId, + directoryFileTransferTracker, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long bytesToAdd = randomLongBetween(1000, 1000000); + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(bytesToAdd); + assertEquals(bytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesStarted()); + long moreBytesToAdd = randomLongBetween(1000, 10000); + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesStarted()); + } + + public void testAddDownloadBytesFailed() { + pressureTracker = new RemoteSegmentTransferTracker( + shardId, + directoryFileTransferTracker, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long bytesToAdd = randomLongBetween(1000, 1000000); + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(bytesToAdd); + assertEquals(bytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesFailed()); + long moreBytesToAdd = randomLongBetween(1000, 10000); + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(moreBytesToAdd); + assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesFailed()); + } + + public void testAddDownloadBytesSucceeded() { + pressureTracker = new RemoteSegmentTransferTracker( + shardId, + directoryFileTransferTracker, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long bytesToAdd = randomLongBetween(1000, 1000000); + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(bytesToAdd, System.currentTimeMillis()); + assertEquals(bytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesSucceeded()); + long moreBytesToAdd = randomLongBetween(1000, 10000); + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(moreBytesToAdd, System.currentTimeMillis()); + assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesSucceeded()); + } + public void testGetInflightUploadBytes() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -211,8 +287,9 @@ public void testGetInflightUploadBytes() { } public void testIncrementTotalUploadsStarted() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -224,8 +301,9 @@ public void testIncrementTotalUploadsStarted() { } public void testIncrementTotalUploadsFailed() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -237,8 +315,9 @@ public void testIncrementTotalUploadsFailed() { } public void testIncrementTotalUploadSucceeded() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -250,8 +329,9 @@ public void testIncrementTotalUploadSucceeded() { } public void testGetInflightUploads() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -267,8 +347,9 @@ public void testGetInflightUploads() { } public void testIncrementRejectionCount() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -280,8 +361,9 @@ public void testIncrementRejectionCount() { } public void testGetConsecutiveFailureCount() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -295,8 +377,9 @@ public void testGetConsecutiveFailureCount() { } public void testComputeBytesLag() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -324,8 +407,9 @@ public void testComputeBytesLag() { } public void testIsUploadBytesAverageReady() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -351,8 +435,9 @@ public void testIsUploadBytesAverageReady() { } public void testIsUploadBytesPerSecAverageReady() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -378,8 +463,9 @@ public void testIsUploadBytesPerSecAverageReady() { } public void testIsUploadTimeMsAverageReady() { - pressureTracker = new RemoteRefreshSegmentTracker( + pressureTracker = new RemoteSegmentTransferTracker( shardId, + directoryFileTransferTracker, pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -404,12 +490,68 @@ public void testIsUploadTimeMsAverageReady() { assertEquals((double) sum / 20, pressureTracker.getUploadTimeMsAverage(), 0.0d); } + public void testIsDownloadBytesAverageReady() { + pressureTracker = new RemoteSegmentTransferTracker( + shardId, + directoryFileTransferTracker, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); + + long sum = 0; + for (int i = 1; i < 20; i++) { + pressureTracker.getDirectoryFileTransferTracker().updateLastSuccessfulTransferSize(i); + sum += i; + assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); + assertEquals((double) sum / i, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); + } + + pressureTracker.getDirectoryFileTransferTracker().updateLastSuccessfulTransferSize(20); + sum += 20; + assertTrue(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); + assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); + + pressureTracker.getDirectoryFileTransferTracker().updateLastSuccessfulTransferSize(100); + sum = sum + 100 - 1; + assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); + } + + public void testIsDownloadBytesPerSecAverageReady() { + pressureTracker = new RemoteSegmentTransferTracker( + shardId, + directoryFileTransferTracker, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); + + long sum = 0; + for (int i = 1; i < 20; i++) { + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(i); + sum += i; + assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); + assertEquals((double) sum / i, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); + } + + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(20); + sum += 20; + assertTrue(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesPerSecAverageReady()); + assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); + + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(100); + sum = sum + 100 - 1; + assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); + } + /** - * Tests whether RemoteRefreshSegmentTracker.Stats object generated correctly from RemoteRefreshSegmentTracker. + * Tests whether RemoteSegmentTransferTracker.Stats object generated correctly from RemoteSegmentTransferTracker. * */ public void testStatsObjectCreation() { pressureTracker = constructTracker(); - RemoteRefreshSegmentTracker.Stats pressureTrackerStats = pressureTracker.stats(); + RemoteSegmentTransferTracker.Stats pressureTrackerStats = pressureTracker.stats(); assertEquals(pressureTracker.getShardId(), pressureTrackerStats.shardId); assertEquals(pressureTracker.getTimeMsLag(), (int) pressureTrackerStats.refreshTimeLagMs); assertEquals(pressureTracker.getLocalRefreshSeqNo(), (int) pressureTrackerStats.localRefreshNumber); @@ -429,16 +571,16 @@ public void testStatsObjectCreation() { } /** - * Tests whether RemoteRefreshSegmentTracker.Stats object serialize and deserialize is working fine. + * Tests whether RemoteSegmentTransferTracker.Stats object serialize and deserialize is working fine. * This comes into play during internode data transfer. * */ public void testStatsObjectCreationViaStream() throws IOException { pressureTracker = constructTracker(); - RemoteRefreshSegmentTracker.Stats pressureTrackerStats = pressureTracker.stats(); + RemoteSegmentTransferTracker.Stats pressureTrackerStats = pressureTracker.stats(); try (BytesStreamOutput out = new BytesStreamOutput()) { pressureTrackerStats.writeTo(out); try (StreamInput in = out.bytes().streamInput()) { - RemoteRefreshSegmentTracker.Stats deserializedStats = new RemoteRefreshSegmentTracker.Stats(in); + RemoteSegmentTransferTracker.Stats deserializedStats = new RemoteSegmentTransferTracker.Stats(in); assertEquals(deserializedStats.shardId, pressureTrackerStats.shardId); assertEquals((int) deserializedStats.refreshTimeLagMs, (int) pressureTrackerStats.refreshTimeLagMs); assertEquals((int) deserializedStats.localRefreshNumber, (int) pressureTrackerStats.localRefreshNumber); @@ -459,13 +601,26 @@ public void testStatsObjectCreationViaStream() throws IOException { assertEquals((int) deserializedStats.totalUploadsStarted, (int) pressureTrackerStats.totalUploadsStarted); assertEquals((int) deserializedStats.totalUploadsSucceeded, (int) pressureTrackerStats.totalUploadsSucceeded); assertEquals((int) deserializedStats.totalUploadsFailed, (int) pressureTrackerStats.totalUploadsFailed); + assertEquals( + (int) deserializedStats.directoryFileTransferTrackerStats.transferredBytesStarted, + (int) pressureTrackerStats.directoryFileTransferTrackerStats.transferredBytesStarted + ); + assertEquals( + (int) deserializedStats.directoryFileTransferTrackerStats.transferredBytesSucceeded, + (int) pressureTrackerStats.directoryFileTransferTrackerStats.transferredBytesSucceeded + ); + assertEquals( + (int) deserializedStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage, + (int) pressureTrackerStats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage + ); } } } - private RemoteRefreshSegmentTracker constructTracker() { - RemoteRefreshSegmentTracker segmentPressureTracker = new RemoteRefreshSegmentTracker( + private RemoteSegmentTransferTracker constructTracker() { + RemoteSegmentTransferTracker segmentPressureTracker = new RemoteSegmentTransferTracker( shardId, + new DirectoryFileTransferTracker(), pressureSettings.getUploadBytesMovingAverageWindowSize(), pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), pressureSettings.getUploadTimeMovingAverageWindowSize() @@ -475,6 +630,9 @@ private RemoteRefreshSegmentTracker constructTracker() { segmentPressureTracker.addUploadBytes(99); segmentPressureTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); segmentPressureTracker.incrementRejectionCount(); + segmentPressureTracker.getDirectoryFileTransferTracker().addTransferredBytesStarted(10); + segmentPressureTracker.getDirectoryFileTransferTracker().addTransferredBytesSucceeded(10, System.currentTimeMillis()); + segmentPressureTracker.getDirectoryFileTransferTracker().addTransferredBytesPerSec(5); return segmentPressureTracker; } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 7c119bfbbc573..f13f89c6e067c 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -26,7 +26,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; -import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; @@ -255,7 +255,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); RemoteRefreshSegmentPressureService pressureService = tuple.v2(); - RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 0); } @@ -276,7 +276,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); RemoteRefreshSegmentPressureService pressureService = tuple.v2(); - RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 1); } @@ -322,11 +322,11 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { assertBusy(() -> assertEquals(0, refreshCountLatch.getCount())); assertBusy(() -> assertEquals(0, successLatch.getCount())); RemoteRefreshSegmentPressureService pressureService = tuple.v2(); - RemoteRefreshSegmentTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteSegmentTransferTracker segmentTracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLagAndTotalUploadsFailed(segmentTracker, 2); } - private void assertNoLagAndTotalUploadsFailed(RemoteRefreshSegmentTracker segmentTracker, long totalUploadsFailed) throws Exception { + private void assertNoLagAndTotalUploadsFailed(RemoteSegmentTransferTracker segmentTracker, long totalUploadsFailed) throws Exception { assertBusy(() -> { assertEquals(0, segmentTracker.getBytesLag()); assertEquals(0, segmentTracker.getRefreshSeqNoLag()); @@ -339,7 +339,7 @@ public void testTrackerData() throws Exception { Tuple tuple = mockIndexShardWithRetryAndScheduleRefresh(1); RemoteStoreRefreshListener listener = tuple.v1(); RemoteRefreshSegmentPressureService pressureService = tuple.v2(); - RemoteRefreshSegmentTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); + RemoteSegmentTransferTracker tracker = pressureService.getRemoteRefreshSegmentTracker(indexShard.shardId()); assertNoLag(tracker); indexDocs(100, randomIntBetween(100, 200)); indexShard.refresh("test"); @@ -347,7 +347,7 @@ public void testTrackerData() throws Exception { assertBusy(() -> assertNoLag(tracker)); } - private void assertNoLag(RemoteRefreshSegmentTracker tracker) { + private void assertNoLag(RemoteSegmentTransferTracker tracker) { assertEquals(0, tracker.getRefreshSeqNoLag()); assertEquals(0, tracker.getBytesLag()); assertEquals(0, tracker.getTimeMsLag());