Skip to content

Commit

Permalink
Add another test for non segrep.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Aug 1, 2023
1 parent 7d26193 commit bf9423e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ public void updateShardState(

replicationTracker.activatePrimaryMode(getLocalCheckpoint());

if (checkpointPublisher != null) {
if (indexSettings.isSegRepEnabled()) {
// force publish a checkpoint once in primary mode so that replicas not caught up to previous primary
// are brought up to date.
checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ public void testShardIdleWithNoReplicas() throws Exception {
*/
public void testPublishCheckpointOnPrimaryMode() throws IOException, InterruptedException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(p -> newShard(false, mock), false);
IndexShard shard = newStartedShard(p -> newShard(false, mock, settings), false);

final ShardRouting shardRouting = shard.routingEntry();
promoteReplica(
Expand Down Expand Up @@ -520,6 +520,38 @@ public void onFailure(Exception e) {
closeShards(shard);
}

public void testPublishCheckpointOnPrimaryMode_segrep_off() throws IOException, InterruptedException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build();
IndexShard shard = newStartedShard(p -> newShard(false, mock, settings), false);

final ShardRouting shardRouting = shard.routingEntry();
promoteReplica(
shard,
Collections.singleton(shardRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(shardRouting.shardId()).addShard(shardRouting).build()
);

final CountDownLatch latch = new CountDownLatch(1);
shard.acquirePrimaryOperationPermit(new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
releasable.close();
latch.countDown();
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
}, ThreadPool.Names.GENERIC, "");

latch.await();
// verify checkpoint is published
verify(mock, times(0)).publish(any(), any());
closeShards(shard);
}

public void testPublishCheckpointPostFailover() throws IOException {
final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class);
IndexShard shard = newStartedShard(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationListener;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -527,12 +528,18 @@ protected IndexShard newShard(
);
}

protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException {
final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();
return newShard(primary, checkpointPublisher, settings);
}

/**
* creates a new initializing shard. The shard will be put in its proper path under the
* current node id the shard is assigned to.
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*/
protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException {
protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPublisher checkpointPublisher, Settings settings)
throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 0);
final ShardRouting shardRouting = TestShardRouting.newShardRouting(
shardId,
Expand All @@ -545,10 +552,10 @@ protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPubli
ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId);

Settings indexSettings = Settings.builder()
.put(settings)
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT")
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000))
.put(Settings.EMPTY)
.build();
Expand Down

0 comments on commit bf9423e

Please sign in to comment.