diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 467b4f5370c0e..5c9944afb2fde 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -715,7 +715,6 @@ public static final IndexShard newIndexShard( nodeId, null, null - ); } diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 4284daf9ffef4..da049e237d79f 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -135,6 +135,11 @@ public SegmentReplicationPressureService( this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this); } + /** Only used for testing **/ + public SegmentReplicationStatsTracker getTracker() { + return tracker; + } + // visible for testing AsyncFailStaleReplicaTask getFailStaleReplicaTask() { return failStaleReplicaTask; diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 05081180bb179..2a703f17aa953 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -267,7 +267,8 @@ public long getRejectionCount() { return rejectionCount.get(); } - void incrementRejectionCount() { + /** public only for testing **/ + public void incrementRejectionCount() { rejectionCount.incrementAndGet(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index a56c0b42817f7..2972a8748bfee 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -568,6 +568,11 @@ protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() { return remoteStoreStatsTrackerFactory; } + /** Only used for testing **/ + public SegmentReplicationPressureService getSegmentReplicationPressureService() { + return segmentReplicationPressureService; + } + public String getNodeId() { return translogConfig.getNodeId(); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index b2eb41828a4df..794f884b4b8d8 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -96,6 +96,8 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; +import org.opensearch.index.ReplicationStats; +import org.opensearch.index.SegmentReplicationStatsTracker; import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.CommitStats; import org.opensearch.index.engine.DocIdSeqNoAndSource; @@ -1827,6 +1829,7 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { .getRemoteSegmentTransferTracker(shard.shardId); RemoteTranslogTransferTracker remoteTranslogTransferTracker = shard.getRemoteStoreStatsTrackerFactory() .getRemoteTranslogTransferTracker(shard.shardId); + populateSampleReplicationStats(shard); populateSampleRemoteSegmentStats(remoteSegmentTransferTracker); populateSampleRemoteTranslogStats(remoteTranslogTransferTracker); ShardStats shardStats = new ShardStats( @@ -1841,9 +1844,28 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { assertRemoteSegmentStats(remoteSegmentTransferTracker, remoteSegmentStats); RemoteTranslogStats remoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats(); assertRemoteTranslogStats(remoteTranslogTransferTracker, remoteTranslogStats); + ReplicationStats replicationStats = shardStats.getStats().getSegments().getReplicationStats(); + assertReplicationStats(shard, replicationStats); closeShards(shard); } + private static void assertReplicationStats(IndexShard shard, ReplicationStats replicationStats) { + if (shard.isPrimaryMode()) { + assertEquals(5, replicationStats.totalRejections); + } else { + assertEquals(0, replicationStats.totalRejections); + } + } + + private static void populateSampleReplicationStats(IndexShard shard) { + if (shard.isPrimaryMode()) { + for (int i = 0; i < 5; i++) { + SegmentReplicationStatsTracker tracker = shard.getSegmentReplicationPressureService().getTracker(); + tracker.incrementRejectionCount(shard.shardId); + } + } + } + public void testRefreshMetric() throws IOException { IndexShard shard = newStartedShard(); // refresh on: finalize and end of recovery @@ -4910,6 +4932,8 @@ private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker track tracker.addUploadBytesStarted(30L); tracker.addUploadBytesSucceeded(10L); tracker.addUploadBytesFailed(10L); + tracker.incrementRejectionCount(); + tracker.incrementRejectionCount(); } private void populateSampleRemoteTranslogStats(RemoteTranslogTransferTracker tracker) { @@ -4943,5 +4967,7 @@ private static void assertRemoteSegmentStats( assertEquals(remoteSegmentTransferTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted()); assertEquals(remoteSegmentTransferTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded()); assertEquals(remoteSegmentTransferTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed()); + assertTrue(remoteSegmentStats.getTotalRejections() > 0); + assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections()); } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 6ebb3ad13cd48..078e06cadb521 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -37,12 +37,15 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.junit.Assert; +import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.replication.TransportReplicationAction; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; @@ -85,6 +88,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.MapperTestUtils; +import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; import org.opensearch.index.cache.query.DisabledQueryCache; @@ -157,7 +161,6 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import org.junit.Assert; import java.io.IOException; import java.nio.file.Path; @@ -179,10 +182,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import org.mockito.Mockito; - -import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -191,6 +190,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; /** * A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily, @@ -643,6 +644,13 @@ protected IndexShard newShard( ); Store remoteStore; RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = null; + SegmentReplicationPressureService segmentReplicationPressureService = new SegmentReplicationPressureService( + Settings.EMPTY, + clusterService, + mock(IndicesService.class), + mock(ShardStateAction.class), + mock(ThreadPool.class) + ); RepositoriesService mockRepoSvc = mock(RepositoriesService.class); if (indexSettings.isRemoteStoreEnabled()) { @@ -703,7 +711,7 @@ protected IndexShard newShard( () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, "dummy-node", null, - null + segmentReplicationPressureService ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) {