Skip to content

Commit

Permalink
Fix bug where retries within RemoteStoreRefreshListener cause mismatc…
Browse files Browse the repository at this point in the history
…h between ReplicationCheckpoint and uploaded SegmentInfos.

Retries within RemoteStoreRefreshListener run outside of the refresh thread.  This means that concurrent refreshes
may occur during syncSegments execution updating the on-reader SegmentInfos.  A shard's latest ReplicationCheckpoint
is computed and set in a refresh listener, but it is not guaranteed the listener has run before the retry fetches the infos or checkpoint independently.
This fix ensures the listener recomputes the checkpoint while fetching the SegmentInfos. This change also
ensures that we only recompute the checkpoint when necessary because it comes with an IO cost to compute StoreFileMetadata.

Signed-off-by: Marc Handalian <[email protected]>

Update refresh listener to recompute checkpoint from latest infos snapshot.

Signed-off-by: Marc Handalian <[email protected]>

Fix broken test case by comparing segments gen

Signed-off-by: Marc Handalian <[email protected]>

spotless

Signed-off-by: Marc Handalian <[email protected]>

Fix RemoteStoreRefreshListener tests

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Oct 19, 2023
1 parent 69f6f4e commit 76fc4c7
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 34 deletions.
65 changes: 38 additions & 27 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1608,8 +1608,11 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Compute and return the latest ReplicationCheckpoint for a particular shard.
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
* return the most recently computed ReplicationCheckpoint for a particular shard.
* The checkpoint is updated inside a refresh listener and may lag behind the SegmentInfos on the reader.
* To guarantee the checkpoint is upto date with the latest on-reader infos, use `getLatestSegmentInfosAndCheckpoint` instead.
*
* @return {@link ReplicationCheckpoint} - The most recently computed ReplicationCheckpoint.
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return replicationTracker.getLatestReplicationCheckpoint();
Expand All @@ -1628,34 +1631,12 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
assert indexSettings.isSegRepEnabled();

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
getLatestReplicationCheckpoint()
);

if (getEngineOrNull() == null) {
return nullSegmentInfosEmptyCheckpoint;
}
// do not close the snapshot - caller will close it.
GatedCloseable<SegmentInfos> snapshot = null;
try {
snapshot = getSegmentInfosSnapshot();
if (snapshot.get() != null) {
SegmentInfos segmentInfos = snapshot.get();
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
metadataMap
)
);
}
final SegmentInfos segmentInfos = snapshot.get();
return new Tuple<>(snapshot, computeReplicationCheckpoint(segmentInfos));
} catch (IOException | AlreadyClosedException e) {
logger.error("Error Fetching SegmentInfos and latest checkpoint", e);
if (snapshot != null) {
Expand All @@ -1666,7 +1647,37 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
}
}
}
return nullSegmentInfosEmptyCheckpoint;
return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint());
}

/**
* Compute the latest {@link ReplicationCheckpoint} from a SegmentInfos.
* This function fetches a metadata snapshot from the store that comes with an IO cost.
* We will reuse the existing stored checkpoint if it is at the same SI version.
*
* @param segmentInfos {@link SegmentInfos} infos to use to compute.
* @return {@link ReplicationCheckpoint} Checkpoint computed from the infos.
* @throws IOException When there is an error computing segment metadata from the store.
*/
ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) throws IOException {
if (segmentInfos == null) {
return ReplicationCheckpoint.empty(shardId);
}
final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint();
if (latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion()
&& latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()) {
return latestReplicationCheckpoint;
}
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
return new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
metadataMap
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ private boolean syncSegments() {
// in the remote store.
return indexShard.state() != IndexShardState.STARTED || !(indexShard.getEngine() instanceof InternalEngine);
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
beforeSegmentsSync();
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -199,10 +198,7 @@ private boolean syncSegments() {

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: "
+ segmentInfos.getGeneration()
+ " does not match metadata generation: "
+ checkpoint.getSegmentsGen();
final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos);
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
if (counter.incrementAndGet() <= succeedOnAttempt) {
throw new RuntimeException("Inducing failure in upload");
}
return indexShard.getLatestSegmentInfosAndCheckpoint();
})).when(shard).getLatestSegmentInfosAndCheckpoint();
return indexShard.getLatestReplicationCheckpoint();
})).when(shard).computeReplicationCheckpoint(any());

doAnswer(invocation -> {
if (Objects.nonNull(successLatch)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,33 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception {
}
}

public void testReuseReplicationCheckpointWhenLatestInfosIsUnChanged() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
final IndexShard primaryShard = shards.getPrimary();
shards.startAll();
shards.indexDocs(10);
shards.refresh("test");
replicateSegments(primaryShard, shards.getReplicas());
shards.assertAllEqual(10);
final ReplicationCheckpoint latestReplicationCheckpoint = primaryShard.getLatestReplicationCheckpoint();
try (GatedCloseable<SegmentInfos> segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) {
assertEquals(latestReplicationCheckpoint, primaryShard.computeReplicationCheckpoint(segmentInfosSnapshot.get()));
}
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = primaryShard
.getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> closeable = latestSegmentInfosAndCheckpoint.v1()) {
assertEquals(latestReplicationCheckpoint, primaryShard.computeReplicationCheckpoint(closeable.get()));
}
}
}

public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
final IndexShard primaryShard = shards.getPrimary();
assertEquals(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard.computeReplicationCheckpoint(null));
}
}

private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard) {
final TransportService transportService = mock(TransportService.class);
when(transportService.getThreadPool()).thenReturn(threadPool);
Expand Down

0 comments on commit 76fc4c7

Please sign in to comment.