Skip to content

Commit

Permalink
[Remote Store] Fix relocation failure due to transport receive timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Oct 19, 2023
1 parent 69f6f4e commit 62aeb2d
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -509,4 +509,27 @@ public void testRestoreSnapshotToIndexWithSameNameDifferentUUID() throws Excepti
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}

/**
 * Checks that {@code isSearchIdleSupported()} is {@code false} for the index created from
 * {@code remoteStoreIndexSettings}: first right after creation, then again after the replica count is raised to 1,
 * and finally for the shard obtained from the second data node.
 */
public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, InterruptedException {
    internalCluster().startClusterManagerOnlyNode();
    final String primaryNodeName = internalCluster().startDataOnlyNodes(1).get(0);

    // Phase 1: index created with remoteStoreIndexSettings(0) (presumably zero replicas - confirm against helper).
    createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
    ensureGreen(INDEX_NAME);
    final IndexShard shardOnFirstNode = getIndexShard(primaryNodeName);
    assertFalse(shardOnFirstNode.isSearchIdleSupported());

    // Phase 2: start a second data node and bump the replica count to 1.
    final String replicaNodeName = internalCluster().startDataOnlyNodes(1).get(0);
    final Settings.Builder oneReplicaSettings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1);
    assertAcked(client().admin().indices().prepareUpdateSettings(INDEX_NAME).setSettings(oneReplicaSettings));
    ensureGreen(INDEX_NAME);
    // The shard on the first node must still report search idle as unsupported.
    assertFalse(shardOnFirstNode.isSearchIdleSupported());

    // Phase 3: the shard living on the newly started node must report the same.
    final IndexShard shardOnSecondNode = getIndexShard(replicaNodeName);
    assertFalse(shardOnSecondNode.isSearchIdleSupported());
}
}
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,9 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.isRemoteStoreEnabled) {
logger.warn("Search idle is not supported for remote backed indices");
}
if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) {
logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4412,7 +4412,6 @@ public final boolean isSearchIdle() {
}

/**
*
* Returns true if this shard supports search idle.
* <p>
* Indices using Segment Replication will ignore search idle unless there are no replicas.
Expand All @@ -4421,6 +4420,11 @@ public final boolean isSearchIdle() {
* a new set of segments.
*/
public final boolean isSearchIdleSupported() {
    final boolean remoteTranslogEnabled = isRemoteTranslogEnabled();
    if (remoteTranslogEnabled == false) {
        // Not remote backed: search idle is supported unless segment replication is on and replicas exist.
        return indexSettings.isSegRepEnabled() == false || indexSettings.getNumberOfReplicas() == 0;
    }
    // Remote store backed index: search idle stays off so that the async refresh task continues to upload to
    // the remote store periodically.
    return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,11 @@ public String getTranslogUUID() {
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo(
localCheckpointOfLastCommit + 1
).translogFileGeneration;
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
long minRefSeqNo = translog instanceof RemoteFsTranslog
? ((RemoteFsTranslog) translog).getMinSeqNoToKeep()
: localCheckpointOfLastCommit + 1;
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minRefSeqNo).translogFileGeneration;
if (translog.sizeInBytesByMinGen(minReferencedTranslogGeneration) < flushThreshold) {
return false;
}
/*
Expand All @@ -454,7 +455,7 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
localCheckpointTrackerSupplier.get().getProcessedCheckpoint() + 1
).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
return minReferencedTranslogGeneration < translogGenerationOfNewCommit
|| localCheckpointTrackerSupplier.get().getProcessedCheckpoint() == localCheckpointTrackerSupplier.get().getMaxSeqNo();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,8 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
}
}
}

/**
 * Returns the current value of {@code minSeqNoToKeep}.
 * <p>
 * NOTE(review): presumably the lowest sequence number whose translog operations must still be retained;
 * confirm against the field's declaration and where it is updated.
 * <p>
 * Declared without an access modifier, i.e. visible only within this package.
 *
 * @return the value of {@code minSeqNoToKeep}
 */
long getMinSeqNoToKeep() {
return minSeqNoToKeep;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ private Tuple<RecoverySourceHandler, RemoteRecoveryTargetHandler> createRecovery
transportService,
request.targetNode(),
recoverySettings,
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime),
shard.isRemoteTranslogEnabled()
);
handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
return Tuple.tuple(handler, recoveryTarget);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
private final AtomicLong requestSeqNoGenerator = new AtomicLong(0);
private final RetryableTransportClient retryableTransportClient;
private final RemoteSegmentFileChunkWriter fileChunkWriter;
private final boolean remoteStoreEnabled;

public RemoteRecoveryTargetHandler(
long recoveryId,
ShardId shardId,
TransportService transportService,
DiscoveryNode targetNode,
RecoverySettings recoverySettings,
Consumer<Long> onSourceThrottle
Consumer<Long> onSourceThrottle,
boolean remoteStoreEnabled
) {
this.transportService = transportService;
// It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler
Expand Down Expand Up @@ -111,6 +113,7 @@ public RemoteRecoveryTargetHandler(
requestSeqNoGenerator,
onSourceThrottle
);
this.remoteStoreEnabled = remoteStoreEnabled;
}

public DiscoveryNode targetNode() {
Expand All @@ -129,7 +132,13 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener<Vo
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
if (remoteStoreEnabled) {
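// If remote store is enabled, during the prepare_translog phase, translog is also downloaded on the
// target host along with incremental segments download. This can take long enough to hit the transport
// receive timeout, so the request is sent with translogOpsRequestOptions instead of the default options.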
retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);
} else {
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
}

@Override
Expand Down

0 comments on commit 62aeb2d

Please sign in to comment.