diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d027bf3853384..3ca8ce0d97c61 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1613,8 +1613,11 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } /** - * Compute and return the latest ReplicationCheckpoint for a particular shard. - * @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices + * return the most recently computed ReplicationCheckpoint for a particular shard. + * The checkpoint is updated inside a refresh listener and may lag behind the SegmentInfos on the reader. + * To guarantee the checkpoint is upto date with the latest on-reader infos, use `getLatestSegmentInfosAndCheckpoint` instead. + * + * @return {@link ReplicationCheckpoint} - The most recently computed ReplicationCheckpoint. */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { return replicationTracker.getLatestReplicationCheckpoint(); @@ -1633,34 +1636,12 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { public Tuple, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() { assert indexSettings.isSegRepEnabled(); - Tuple, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>( - new GatedCloseable<>(null, () -> {}), - getLatestReplicationCheckpoint() - ); - - if (getEngineOrNull() == null) { - return nullSegmentInfosEmptyCheckpoint; - } // do not close the snapshot - caller will close it. GatedCloseable snapshot = null; try { snapshot = getSegmentInfosSnapshot(); - if (snapshot.get() != null) { - SegmentInfos segmentInfos = snapshot.get(); - final Map metadataMap = store.getSegmentMetadataMap(segmentInfos); - return new Tuple<>( - snapshot, - new ReplicationCheckpoint( - this.shardId, - getOperationPrimaryTerm(), - segmentInfos.getGeneration(), - segmentInfos.getVersion(), - metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(), - getEngine().config().getCodec().getName(), - metadataMap - ) - ); - } + final SegmentInfos segmentInfos = snapshot.get(); + return new Tuple<>(snapshot, computeReplicationCheckpoint(segmentInfos)); } catch (IOException | AlreadyClosedException e) { logger.error("Error Fetching SegmentInfos and latest checkpoint", e); if (snapshot != null) { @@ -1671,7 +1652,39 @@ public Tuple, ReplicationCheckpoint> getLatestSegme } } } - return nullSegmentInfosEmptyCheckpoint; + return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint()); + } + + /** + * Compute the latest {@link ReplicationCheckpoint} from a SegmentInfos. + * This function fetches a metadata snapshot from the store that comes with an IO cost. + * We will reuse the existing stored checkpoint if it is at the same SI version. + * + * @param segmentInfos {@link SegmentInfos} infos to use to compute. + * @return {@link ReplicationCheckpoint} Checkpoint computed from the infos. + * @throws IOException When there is an error computing segment metadata from the store. + */ + ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) throws IOException { + if (segmentInfos == null) { + return ReplicationCheckpoint.empty(shardId); + } + final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint(); + if (latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion() + && latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()) { + return latestReplicationCheckpoint; + } + final Map metadataMap = store.getSegmentMetadataMap(segmentInfos); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + segmentInfos.getVersion(), + metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(), + getEngine().config().getCodec().getName(), + metadataMap + ); + logger.trace("Recomputed ReplicationCheckpoint for shard {}", checkpoint); + return checkpoint; } /** diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 577a55affd032..df999f77bee0e 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -181,7 +181,6 @@ private boolean syncSegments() { // in the remote store. return indexShard.state() != IndexShardState.STARTED || !(indexShard.getEngine() instanceof InternalEngine); } - ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); beforeSegmentsSync(); long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs(); long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); @@ -199,10 +198,7 @@ private boolean syncSegments() { try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); - assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: " - + segmentInfos.getGeneration() - + " does not match metadata generation: " - + checkpoint.getSegmentsGen(); + final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos); // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 5a13f57db2c87..51814283c5eb3 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -520,8 +520,8 @@ private Tuple mockIn if (counter.incrementAndGet() <= succeedOnAttempt) { throw new RuntimeException("Inducing failure in upload"); } - return indexShard.getLatestSegmentInfosAndCheckpoint(); - })).when(shard).getLatestSegmentInfosAndCheckpoint(); + return indexShard.getLatestReplicationCheckpoint(); + })).when(shard).computeReplicationCheckpoint(any()); doAnswer(invocation -> { if (Objects.nonNull(successLatch)) { diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 52f28aead533d..eab38bfe5c64d 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -925,6 +925,33 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception { } } + public void testReuseReplicationCheckpointWhenLatestInfosIsUnChanged() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) { + final IndexShard primaryShard = shards.getPrimary(); + shards.startAll(); + shards.indexDocs(10); + shards.refresh("test"); + replicateSegments(primaryShard, shards.getReplicas()); + shards.assertAllEqual(10); + final ReplicationCheckpoint latestReplicationCheckpoint = primaryShard.getLatestReplicationCheckpoint(); + try (GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { + assertEquals(latestReplicationCheckpoint, primaryShard.computeReplicationCheckpoint(segmentInfosSnapshot.get())); + } + final Tuple, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = primaryShard + .getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable closeable = latestSegmentInfosAndCheckpoint.v1()) { + assertEquals(latestReplicationCheckpoint, primaryShard.computeReplicationCheckpoint(closeable.get())); + } + } + } + + public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) { + final IndexShard primaryShard = shards.getPrimary(); + assertEquals(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard.computeReplicationCheckpoint(null)); + } + } + private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard) { final TransportService transportService = mock(TransportService.class); when(transportService.getThreadPool()).thenReturn(threadPool);