diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 0aa3a06a108f1..1fb5c2052aded 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexRequest; @@ -26,6 +27,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.translog.Translog.Durability; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -120,6 +122,16 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Excep testPeerRecovery(randomIntBetween(2, 5), true); } + public void testPeerRecoveryWithLowActivityTimeout() throws Exception { + ClusterUpdateSettingsRequest req = new ClusterUpdateSettingsRequest().persistentSettings( + Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), "20kb") + .put(RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.getKey(), "1s") + ); + internalCluster().client().admin().cluster().updateSettings(req).get(); + testPeerRecovery(randomIntBetween(2, 5), true); + } + public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception { testPeerRecovery(1, false); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index bbb7d3cf5f30f..d476e8b7c9288 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4726,11 +4726,21 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { } /** - * Downloads segments from remote segment store. - * @param overrideLocal flag to override local segment files with those in remote store - * @throws IOException if exception occurs while reading segments from remote store + * Downloads segments from remote segment store + * @param overrideLocal flag to override local segment files with those in remote store. + * @throws IOException if exception occurs while reading segments from remote store. */ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException { + syncSegmentsFromRemoteSegmentStore(overrideLocal, () -> {}); + } + + /** + * Downloads segments from remote segment store along with updating the access time of the recovery target. + * @param overrideLocal flag to override local segment files with those in remote store. + * @param onFileSync runnable that updates the access time when run. + * @throws IOException if exception occurs while reading segments from remote store. + */ + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.trace("Downloading segments from remote segment store"); RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); @@ -4761,7 +4771,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE } else { storeDirectory = store.directory(); } - copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal); + copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync); if (remoteSegmentMetadata != null) { final SegmentInfos infosSnapshot = store.buildSegmentInfos( @@ -4821,7 +4831,8 @@ public void syncSegmentsFromGivenRemoteSegmentStore( sourceRemoteDirectory, remoteDirectory, uploadedSegments, - overrideLocal + overrideLocal, + () -> {} ); if (segmentsNFile != null) { try ( @@ -4854,7 +4865,8 @@ private String copySegmentFiles( RemoteSegmentStoreDirectory sourceRemoteDirectory, RemoteSegmentStoreDirectory targetRemoteDirectory, Map uploadedSegments, - boolean overrideLocal + boolean overrideLocal, + final Runnable onFileSync ) throws IOException { Set toDownloadSegments = new HashSet<>(); Set skippedSegments = new HashSet<>(); @@ -4883,9 +4895,7 @@ private String copySegmentFiles( if (toDownloadSegments.isEmpty() == false) { try { - final PlainActionFuture completionListener = PlainActionFuture.newFuture(); - downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, completionListener); - completionListener.actionGet(); + downloadSegments(storeDirectory, sourceRemoteDirectory, targetRemoteDirectory, toDownloadSegments, onFileSync); } catch (Exception e) { throw new IOException("Error occurred when downloading segments from remote store", e); } @@ -4903,22 +4913,25 @@ private void downloadSegments( RemoteSegmentStoreDirectory sourceRemoteDirectory, RemoteSegmentStoreDirectory targetRemoteDirectory, Set toDownloadSegments, - ActionListener completionListener + final Runnable onFileSync ) { - final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex(); + final PlainActionFuture completionListener = PlainActionFuture.newFuture(); final GroupedActionListener batchDownloadListener = new GroupedActionListener<>( ActionListener.map(completionListener, v -> null), toDownloadSegments.size() ); final ActionListener segmentsDownloadListener = ActionListener.map(batchDownloadListener, fileName -> { + onFileSync.run(); if (targetRemoteDirectory != null) { targetRemoteDirectory.copyFrom(storeDirectory, fileName, fileName, IOContext.DEFAULT); } return null; }); - toDownloadSegments.forEach(file -> sourceRemoteDirectory.copyTo(file, storeDirectory, indexPath, segmentsDownloadListener)); + final Path indexPath = store.shardPath() == null ? null : store.shardPath().resolveIndex(); + toDownloadSegments.forEach(file -> { sourceRemoteDirectory.copyTo(file, storeDirectory, indexPath, segmentsDownloadListener); }); + completionListener.actionGet(); } private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index f429d94f7f96c..c0211e1257c8e 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -536,7 +536,6 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco try { // Download segments from remote segment store indexShard.syncSegmentsFromRemoteSegmentStore(true); - indexShard.syncTranslogFilesFromRemoteTranslog(); // On index creation, the only segment file that is created is segments_N. We can safely discard this file diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 1ea32f4e355e3..65c3e976d17fe 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false); + indexShard.syncSegmentsFromRemoteSegmentStore(false, recoveryTarget::setLastAccessTime); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();