
Commit

Update UTs
Signed-off-by: Bhumika Saini <[email protected]>
Bhumika Saini committed Oct 11, 2023
1 parent 9e1a497 commit 7a6470e
Showing 6 changed files with 52 additions and 8 deletions.
=== Changed file 1 of 6 ===
@@ -715,7 +715,6 @@ public static final IndexShard newIndexShard(
nodeId,
null,
null
-
);
}

=== Changed file 2 of 6 ===
@@ -135,6 +135,11 @@ public SegmentReplicationPressureService(
this.failStaleReplicaTask = new AsyncFailStaleReplicaTask(this);
}

+/** Only used for testing **/
+public SegmentReplicationStatsTracker getTracker() {
+    return tracker;
+}
+
// visible for testing
AsyncFailStaleReplicaTask getFailStaleReplicaTask() {
return failStaleReplicaTask;
=== Changed file 3 of 6 ===
@@ -267,7 +267,8 @@ public long getRejectionCount() {
return rejectionCount.get();
}

-void incrementRejectionCount() {
+/** public only for testing **/
+public void incrementRejectionCount() {
rejectionCount.incrementAndGet();
}

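Note (illustrative sketch, not part of this commit): with incrementRejectionCount() now public, tests outside the tracker's own package, such as the shard stats test changes further down, can seed the counter directly and read it back through the existing getter. Assuming a remote-store-enabled IndexShard named shard inside such a test:

    RemoteSegmentTransferTracker tracker = shard.getRemoteStoreStatsTrackerFactory()
        .getRemoteSegmentTransferTracker(shard.shardId());
    tracker.incrementRejectionCount();
    tracker.incrementRejectionCount();
    assertEquals(2, tracker.getRejectionCount());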
=== Changed file 4 of 6 ===
@@ -568,6 +568,11 @@ protected RemoteStoreStatsTrackerFactory getRemoteStoreStatsTrackerFactory() {
return remoteStoreStatsTrackerFactory;
}

+/** Only used for testing **/
+public SegmentReplicationPressureService getSegmentReplicationPressureService() {
+    return segmentReplicationPressureService;
+}
+
public String getNodeId() {
return translogConfig.getNodeId();
}
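Note (illustrative sketch, not part of this commit): this accessor, together with SegmentReplicationPressureService#getTracker() above, gives tests a path from an IndexShard to its segment replication rejection counter. Assuming a started primary shard named shard:

    SegmentReplicationStatsTracker tracker = shard.getSegmentReplicationPressureService().getTracker();
    tracker.incrementRejectionCount(shard.shardId());  // simulate one back-pressure rejection

The seeded count surfaces in shard-level stats as ReplicationStats.totalRejections, which is what the updated test below asserts.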
=== Changed file 5 of 6 ===
@@ -96,6 +96,8 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
+import org.opensearch.index.ReplicationStats;
+import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
@@ -1827,6 +1829,7 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException {
.getRemoteSegmentTransferTracker(shard.shardId);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = shard.getRemoteStoreStatsTrackerFactory()
.getRemoteTranslogTransferTracker(shard.shardId);
+populateSampleReplicationStats(shard);
populateSampleRemoteSegmentStats(remoteSegmentTransferTracker);
populateSampleRemoteTranslogStats(remoteTranslogTransferTracker);
ShardStats shardStats = new ShardStats(
@@ -1841,9 +1844,28 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException {
assertRemoteSegmentStats(remoteSegmentTransferTracker, remoteSegmentStats);
RemoteTranslogStats remoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats();
assertRemoteTranslogStats(remoteTranslogTransferTracker, remoteTranslogStats);
+ReplicationStats replicationStats = shardStats.getStats().getSegments().getReplicationStats();
+assertReplicationStats(shard, replicationStats);
closeShards(shard);
}

+private static void assertReplicationStats(IndexShard shard, ReplicationStats replicationStats) {
+    if (shard.isPrimaryMode()) {
+        assertEquals(5, replicationStats.totalRejections);
+    } else {
+        assertEquals(0, replicationStats.totalRejections);
+    }
+}
+
+private static void populateSampleReplicationStats(IndexShard shard) {
+    if (shard.isPrimaryMode()) {
+        SegmentReplicationStatsTracker tracker = shard.getSegmentReplicationPressureService().getTracker();
+        for (int i = 0; i < 5; i++) {
+            tracker.incrementRejectionCount(shard.shardId);
+        }
+    }
+}
+
public void testRefreshMetric() throws IOException {
IndexShard shard = newStartedShard();
// refresh on: finalize and end of recovery
@@ -4910,6 +4932,8 @@ private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker track
tracker.addUploadBytesStarted(30L);
tracker.addUploadBytesSucceeded(10L);
tracker.addUploadBytesFailed(10L);
+tracker.incrementRejectionCount();
+tracker.incrementRejectionCount();
}

private void populateSampleRemoteTranslogStats(RemoteTranslogTransferTracker tracker) {
@@ -4943,5 +4967,7 @@ private static void assertRemoteSegmentStats(
assertEquals(remoteSegmentTransferTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted());
assertEquals(remoteSegmentTransferTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded());
assertEquals(remoteSegmentTransferTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed());
+assertTrue(remoteSegmentStats.getTotalRejections() > 0);
+assertEquals(remoteSegmentTransferTracker.getRejectionCount(), remoteSegmentStats.getTotalRejections());
}
}
=== Changed file 6 of 6 ===
@@ -37,12 +37,15 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
+import org.junit.Assert;
+import org.mockito.Mockito;
import org.opensearch.ExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.replication.TransportReplicationAction;
+import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
@@ -85,6 +88,7 @@
import org.opensearch.env.TestEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MapperTestUtils;
+import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.VersionType;
import org.opensearch.index.cache.IndexCache;
import org.opensearch.index.cache.query.DisabledQueryCache;
@@ -157,7 +161,6 @@
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
-import org.junit.Assert;

import java.io.IOException;
import java.nio.file.Path;
@@ -179,10 +182,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

-import org.mockito.Mockito;
-
-import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
-import static org.opensearch.test.ClusterServiceUtils.createClusterService;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@@ -191,6 +190,8 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.opensearch.cluster.routing.TestShardRouting.newShardRouting;
+import static org.opensearch.test.ClusterServiceUtils.createClusterService;

/**
* A base class for unit tests that need to create and shutdown {@link IndexShard} instances easily,
@@ -643,6 +644,13 @@ protected IndexShard newShard(
);
Store remoteStore;
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = null;
+SegmentReplicationPressureService segmentReplicationPressureService = new SegmentReplicationPressureService(
+    Settings.EMPTY,
+    clusterService,
+    mock(IndicesService.class),
+    mock(ShardStateAction.class),
+    mock(ThreadPool.class)
+);
RepositoriesService mockRepoSvc = mock(RepositoriesService.class);

if (indexSettings.isRemoteStoreEnabled()) {
@@ -703,7 +711,7 @@ protected IndexShard newShard(
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
"dummy-node",
null,
-null
+segmentReplicationPressureService
);
indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER);
if (remoteStoreStatsTrackerFactory != null) {
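Note (illustrative sketch, not part of this commit): because newShard(...) now wires in a SegmentReplicationPressureService built from mocked collaborators instead of passing null, tests derived from this base class can reach the replication stats tracker without extra setup, for example:

    IndexShard shard = newStartedShard(true);
    assertNotNull(shard.getSegmentReplicationPressureService().getTracker());
    closeShards(shard);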
