Skip to content

Commit

Permalink
[Remote Store] Add support for Remote Translog Store stats in `_remot…
Browse files Browse the repository at this point in the history
…estore/stats/` API (opensearch-project#9263)

---------

Signed-off-by: Bhumika Saini <[email protected]>
Signed-off-by: Ashish Singh <[email protected]>
Signed-off-by: Kaushal Kumar <[email protected]>
  • Loading branch information
Bhumika Saini authored and kaushalmahi12 committed Sep 12, 2023
1 parent ca0e324 commit a70ddca
Show file tree
Hide file tree
Showing 32 changed files with 2,207 additions and 486 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Redefine telemetry context restoration and propagation ([#9617](https://github.com/opensearch-project/OpenSearch/pull/9617))
- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562))
- Added sampler based on `Blanket Probabilistic Sampling rate` and `Override for on demand` ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))
- [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
Expand Down Expand Up @@ -72,9 +73,14 @@ public void testStatsResponseFromAllNodes() {
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteSegmentTransferTracker.Stats stats = matches.get(0).getSegmentStats();
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);

RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats();
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

RemoteTranslogTransferTracker.Stats translogStats = matches.get(0).getTranslogStats();
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}

// Step 3 - Enable replicas on the existing indices and ensure that download
Expand All @@ -92,13 +98,20 @@ public void testStatsResponseFromAllNodes() {
for (RemoteStoreStats stat : matches) {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
RemoteSegmentTransferTracker.Stats stats = stat.getSegmentStats();
RemoteSegmentTransferTracker.Stats segmentStats = stat.getSegmentStats();
RemoteTranslogTransferTracker.Stats translogStats = stat.getTranslogStats();
if (routing.primary()) {
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
} else {
validateSegmentDownloadStats(stats);
assertEquals(0, stats.totalUploadsStarted);
validateSegmentDownloadStats(segmentStats);
assertEquals(0, segmentStats.totalUploadsStarted);

assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}
}
}
Expand All @@ -124,10 +137,15 @@ public void testStatsResponseAllShards() {
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertEquals(3, response.getSuccessfulShards());
assertTrue(response.getRemoteStoreStats() != null && response.getRemoteStoreStats().length == 3);

RemoteSegmentTransferTracker.Stats segmentStats = response.getRemoteStoreStats()[0].getSegmentStats();
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

RemoteTranslogTransferTracker.Stats translogStats = response.getRemoteStoreStats()[0].getTranslogStats();
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);

// Step 3 - Enable replicas on the existing indices and ensure that download
// stats are being populated as well
changeReplicaCountAndEnsureGreen(1);
Expand All @@ -138,12 +156,19 @@ public void testStatsResponseAllShards() {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
segmentStats = stat.getSegmentStats();
translogStats = stat.getTranslogStats();
if (routing.primary()) {
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
} else {
validateSegmentDownloadStats(segmentStats);
assertEquals(0, segmentStats.totalUploadsStarted);

assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}
}

Expand Down Expand Up @@ -174,6 +199,10 @@ public void testStatsResponseFromLocalNode() {
RemoteSegmentTransferTracker.Stats segmentStats = response.getRemoteStoreStats()[0].getSegmentStats();
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

RemoteTranslogTransferTracker.Stats translogStats = response.getRemoteStoreStats()[0].getTranslogStats();
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}
changeReplicaCountAndEnsureGreen(1);
for (String node : nodes) {
Expand All @@ -187,13 +216,20 @@ public void testStatsResponseFromLocalNode() {
for (RemoteStoreStats stat : response.getRemoteStoreStats()) {
ShardRouting routing = stat.getShardRouting();
validateShardRouting(routing);
RemoteSegmentTransferTracker.Stats stats = stat.getSegmentStats();
RemoteSegmentTransferTracker.Stats segmentStats = stat.getSegmentStats();
RemoteTranslogTransferTracker.Stats translogStats = stat.getTranslogStats();
if (routing.primary()) {
validateSegmentUploadStats(stats);
assertEquals(0, stats.directoryFileTransferTrackerStats.transferredBytesStarted);
validateSegmentUploadStats(segmentStats);
assertEquals(0, segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted);

assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertZeroTranslogDownloadStats(translogStats);
} else {
validateSegmentDownloadStats(stats);
assertEquals(0, stats.totalUploadsStarted);
validateSegmentDownloadStats(segmentStats);
assertEquals(0, segmentStats.totalUploadsStarted);

assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
}
}
}
Expand Down Expand Up @@ -491,15 +527,19 @@ public void testStatsOnRemoteStoreRestore() throws IOException {

RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
Arrays.stream(remoteStoreStatsResponse.getRemoteStoreStats()).forEach(statObject -> {
RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getSegmentStats();
RemoteSegmentTransferTracker.Stats segmentStats = statObject.getSegmentStats();
// Assert that we have both upload and download stats for the index
assertTrue(
segmentTracker.totalUploadsStarted > 0 && segmentTracker.totalUploadsSucceeded > 0 && segmentTracker.totalUploadsFailed == 0
segmentStats.totalUploadsStarted > 0 && segmentStats.totalUploadsSucceeded > 0 && segmentStats.totalUploadsFailed == 0
);
assertTrue(
segmentTracker.directoryFileTransferTrackerStats.transferredBytesStarted > 0
&& segmentTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0
segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted > 0
&& segmentStats.directoryFileTransferTrackerStats.transferredBytesSucceeded > 0
);

RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats();
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
assertNonZeroTranslogDownloadStats(translogStats);
});
}

Expand All @@ -520,19 +560,23 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
.get()
.getRemoteStoreStats();
Arrays.stream(remoteStoreStats).forEach(statObject -> {
RemoteSegmentTransferTracker.Stats segmentTracker = statObject.getSegmentStats();
RemoteSegmentTransferTracker.Stats segmentStats = statObject.getSegmentStats();
if (statObject.getShardRouting().primary()) {
assertTrue(
segmentTracker.totalUploadsSucceeded == 1
&& segmentTracker.totalUploadsStarted == segmentTracker.totalUploadsSucceeded
&& segmentTracker.totalUploadsFailed == 0
segmentStats.totalUploadsSucceeded == 1
&& segmentStats.totalUploadsStarted == segmentStats.totalUploadsSucceeded
&& segmentStats.totalUploadsFailed == 0
);
} else {
assertTrue(
segmentTracker.directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& segmentTracker.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& segmentStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
);
}

RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats();
assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
});
}, 5, TimeUnit.SECONDS);
}
Expand All @@ -545,9 +589,7 @@ private void indexDocs() {
refresh(INDEX_NAME);
}
int numberOfOperations = randomIntBetween(10, 30);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc(INDEX_NAME);
}
indexBulk(INDEX_NAME, numberOfOperations);
}
}

Expand Down Expand Up @@ -594,6 +636,43 @@ private void validateSegmentDownloadStats(RemoteSegmentTransferTracker.Stats sta
assertTrue(stats.directoryFileTransferTrackerStats.transferredBytesPerSecMovingAverage > 0);
}

private void assertNonZeroTranslogUploadStatsNoFailures(RemoteTranslogTransferTracker.Stats stats) {
assertTrue(stats.uploadBytesStarted > 0);
assertTrue(stats.totalUploadsStarted > 0);
assertEquals(0, stats.uploadBytesFailed);
assertEquals(0, stats.totalUploadsFailed);
assertTrue(stats.uploadBytesSucceeded > 0);
assertTrue(stats.totalUploadsSucceeded > 0);
assertTrue(stats.totalUploadTimeInMillis > 0);
assertTrue(stats.lastSuccessfulUploadTimestamp > 0);
}

private void assertZeroTranslogUploadStats(RemoteTranslogTransferTracker.Stats stats) {
assertEquals(0, stats.uploadBytesStarted);
assertEquals(0, stats.totalUploadsStarted);
assertEquals(0, stats.uploadBytesFailed);
assertEquals(0, stats.totalUploadsFailed);
assertEquals(0, stats.uploadBytesSucceeded);
assertEquals(0, stats.totalUploadsSucceeded);
assertEquals(0, stats.totalUploadTimeInMillis);
assertEquals(0, stats.lastSuccessfulUploadTimestamp);
}

private void assertNonZeroTranslogDownloadStats(RemoteTranslogTransferTracker.Stats stats) {
assertTrue(stats.downloadBytesSucceeded > 0);
assertTrue(stats.totalDownloadsSucceeded > 0);
// TODO: Need to simulate a delay for this assertion to avoid flakiness
// assertTrue(stats.totalDownloadTimeInMillis > 0);
assertTrue(stats.lastSuccessfulDownloadTimestamp > 0);
}

private void assertZeroTranslogDownloadStats(RemoteTranslogTransferTracker.Stats stats) {
assertEquals(0, stats.downloadBytesSucceeded);
assertEquals(0, stats.totalDownloadsSucceeded);
assertEquals(0, stats.totalDownloadTimeInMillis);
assertEquals(0, stats.lastSuccessfulDownloadTimestamp);
}

// Validate if the shardRouting obtained from cluster state contains the exact same routing object
// parameters as obtained from the remote store stats API
private void validateShardRouting(ShardRouting routing) {
Expand Down
Loading

0 comments on commit a70ddca

Please sign in to comment.