From aa50acda71c94af90f41073ace636e8414d9bb1e Mon Sep 17 00:00:00 2001 From: Ashish Date: Mon, 23 Oct 2023 14:10:14 +0530 Subject: [PATCH] [Remote Store] Sync segments in refresh listener on refresh after commit (#10830) * [Remote Store] Sync segments in refresh listener on refresh after commit Signed-off-by: Ashish Singh * Add Integration Tests Signed-off-by: Ashish Singh * Add comments and java doc Signed-off-by: Ashish Singh --------- Signed-off-by: Ashish Singh Signed-off-by: Shivansh Arora --- .../remotestore/RemoteStoreStatsIT.java | 76 ++++++++++++++++- .../remote/RemoteTranslogTransferTracker.java | 57 +++++++++++++ .../opensearch/index/shard/IndexShard.java | 8 ++ .../shard/RemoteStoreRefreshListener.java | 84 +++++++++++++++---- .../index/translog/RemoteFsTranslog.java | 8 +- 5 files changed, 215 insertions(+), 18 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 8ae25c6758195..5e91176ed0473 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -15,6 +15,8 @@ import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.FollowersChecker; +import org.opensearch.cluster.coordination.LeaderChecker; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; @@ -23,15 +25,20 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.remote.RemoteSegmentTransferTracker; import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.Before; +import org.opensearch.test.disruption.NetworkDisruption; +import org.opensearch.test.transport.MockTransportService; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -44,12 +51,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "remote-store-test-idx-1"; - @Before + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockTransportService.TestPlugin.class); + } + public void setup() { internalCluster().startNodes(3); } public void testStatsResponseFromAllNodes() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -118,6 +130,7 @@ public void testStatsResponseFromAllNodes() { } public void testStatsResponseAllShards() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -175,6 +188,7 @@ public void testStatsResponseAllShards() { } public void testStatsResponseFromLocalNode() { + setup(); // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. @@ -236,6 +250,7 @@ public void testStatsResponseFromLocalNode() { } public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception { + setup(); // Scenario: // - Create index with single primary and single replica shard // - Disable Refresh Interval for the index @@ -325,6 +340,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce } public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception { + setup(); // Scenario: // - Create index with single primary and N-1 replica shards (N = no of data nodes) // - Disable Refresh Interval for the index @@ -416,6 +432,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr } public void testStatsOnShardRelocation() { + setup(); // Scenario: // - Create index with single primary and single replica shard // - Index documents @@ -471,6 +488,7 @@ public void testStatsOnShardRelocation() { } public void testStatsOnShardUnassigned() throws IOException { + setup(); // Scenario: // - Create index with single primary and two replica shard // - Index documents @@ -497,6 +515,7 @@ public void testStatsOnShardUnassigned() throws IOException { } public void testStatsOnRemoteStoreRestore() throws IOException { + setup(); // Creating an index with primary shard count == total nodes in cluster and 0 replicas int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes(); createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount)); @@ -544,6 +563,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException { } public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception { + setup(); // Create an index with one primary and one replica shard createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1)); ensureGreen(INDEX_NAME); @@ -581,6 +601,58 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce }, 5, TimeUnit.SECONDS); } + public void testStatsCorrectnessOnFailover() { + Settings clusterSettings = Settings.builder() + .put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms") + .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms") + .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms") + .put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms") + .put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) + .put(nodeSettings(0)) + .build(); + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(clusterSettings); + internalCluster().startDataOnlyNodes(2, clusterSettings); + + // Create an index with one primary and one replica shard + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1)); + ensureGreen(INDEX_NAME); + + // Index some docs and refresh + indexDocs(); + refresh(INDEX_NAME); + + String primaryNode = primaryNodeName(INDEX_NAME); + String replicaNode = replicaNodeName(INDEX_NAME); + + // Start network disruption - primary node will be isolated + Set nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new)); + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.DISCONNECT + ); + internalCluster().setDisruptionScheme(networkDisruption); + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + ensureStableCluster(2, clusterManagerNode); + + RemoteStoreStatsResponse response = client(clusterManagerNode).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get(); + final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0"); + List matches = Arrays.stream(response.getRemoteStoreStats()) + .filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString())) + .collect(Collectors.toList()); + assertEquals(1, matches.size()); + RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats(); + assertEquals(0, segmentStats.refreshTimeLagMs); + + networkDisruption.stopDisrupting(); + internalCluster().clearDisruptionScheme(); + ensureStableCluster(3, clusterManagerNode); + ensureGreen(INDEX_NAME); + logger.info("Test completed"); + } + private void indexDocs() { for (int i = 0; i < randomIntBetween(5, 10); i++) { if (randomBoolean()) { diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java index 1a9896540212e..4214a87049350 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteTranslogTransferTracker.java @@ -232,6 +232,63 @@ public RemoteTranslogTransferTracker.Stats stats() { ); } + @Override + public String toString() { + return "RemoteTranslogTransferStats{" + + "lastSuccessfulUploadTimestamp=" + + lastSuccessfulUploadTimestamp.get() + + "," + + "totalUploadsStarted=" + + totalUploadsStarted.get() + + "," + + "totalUploadsSucceeded=" + + totalUploadsSucceeded.get() + + "," + + "totalUploadsFailed=" + + totalUploadsFailed.get() + + "," + + "uploadBytesStarted=" + + uploadBytesStarted.get() + + "," + + "uploadBytesFailed=" + + uploadBytesFailed.get() + + "," + + "totalUploadTimeInMillis=" + + totalUploadTimeInMillis.get() + + "," + + "uploadBytesMovingAverage=" + + uploadBytesMovingAverageReference.get().getAverage() + + "," + + "uploadBytesPerSecMovingAverage=" + + uploadBytesPerSecMovingAverageReference.get().getAverage() + + "," + + "uploadTimeMovingAverage=" + + uploadTimeMsMovingAverageReference.get().getAverage() + + "," + + "lastSuccessfulDownloadTimestamp=" + + lastSuccessfulDownloadTimestamp.get() + + "," + + "totalDownloadsSucceeded=" + + totalDownloadsSucceeded.get() + + "," + + "downloadBytesSucceeded=" + + downloadBytesSucceeded.get() + + "," + + "totalDownloadTimeInMillis=" + + totalDownloadTimeInMillis.get() + + "," + + "downloadBytesMovingAverage=" + + downloadBytesMovingAverageReference.get().getAverage() + + "," + + "downloadBytesPerSecMovingAverage=" + + downloadBytesPerSecMovingAverageReference.get().getAverage() + + "," + + "downloadTimeMovingAverage=" + + downloadTimeMsMovingAverageReference.get().getAverage() + + "," + + "}"; + } + /** * Represents the tracker's state as seen in the stats API. * diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f990a3b56e856..fb4e9056153aa 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4774,6 +4774,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE * @throws IOException if exception occurs while reading segments from remote store. */ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException { + boolean syncSegmentSuccess = false; + long startTimeMs = System.currentTimeMillis(); assert indexSettings.isRemoteStoreEnabled(); logger.trace("Downloading segments from remote segment store"); RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); @@ -4823,9 +4825,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn : "There should not be any segments file in the dir"; store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } + syncSegmentSuccess = true; } catch (IOException e) { throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); } finally { + logger.trace( + "syncSegmentsFromRemoteSegmentStore success={} elapsedTime={}", + syncSegmentSuccess, + (System.currentTimeMillis() - startTimeMs) + ); store.decRef(); remoteStore.decRef(); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index c650edc31da8d..3e97b07abfb5d 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -123,14 +123,13 @@ public void beforeRefresh() throws IOException {} @Override protected void runAfterRefreshExactlyOnce(boolean didRefresh) { - if (shouldSync(didRefresh)) { + // We have 2 separate methods to check if sync needs to be done or not. This is required since we use the return boolean + // from isReadyForUpload to schedule refresh retries as the index shard or the primary mode are not in complete + // ready state. + if (shouldSync(didRefresh) && isReadyForUpload()) { segmentTracker.updateLocalRefreshTimeAndSeqNo(); try { - if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { - logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm()); - this.primaryTerm = indexShard.getOperationPrimaryTerm(); - this.remoteDirectory.init(); - } + initializeRemoteDirectoryOnTermUpdate(); try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { Collection localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true); updateLocalSizeMapAndTracker(localSegmentsPostRefresh); @@ -160,20 +159,20 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) { } private boolean shouldSync(boolean didRefresh) { - // The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it - // is important to upload the zero state segments so that the restore does not break. return this.primaryTerm != indexShard.getOperationPrimaryTerm() + // If the readers change, didRefresh is always true. || didRefresh - || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty(); + // The third condition exists for uploading the zero state segments where the refresh has not changed the reader + // reference, but it is important to upload the zero state segments so that the restore does not break. + || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty() + // When the shouldSync is called the first time, then 1st condition on primary term is true. But after that + // we update the primary term and the same condition would not evaluate to true again in syncSegments. + // Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call. + || isRefreshAfterCommitSafe(); } private boolean syncSegments() { - if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) { - logger.debug( - "Skipped syncing segments with primaryMode={} indexShardState={}", - indexShard.getReplicationTracker().isPrimaryMode(), - indexShard.state() - ); + if (isReadyForUpload() == false) { // Following check is required to enable retry and make sure that we do not lose this refresh event // When primary shard is restored from remote store, the recovery happens first followed by changing // primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through @@ -323,6 +322,19 @@ private boolean isRefreshAfterCommit() throws IOException { && !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName))); } + /** + * Returns if the current refresh has happened after a commit. + * @return true if this refresh has happened on account of a commit. If otherwise or exception, returns false. + */ + private boolean isRefreshAfterCommitSafe() { + try { + return isRefreshAfterCommit(); + } catch (Exception e) { + logger.info("Exception occurred in isRefreshAfterCommitSafe", e); + } + return false; + } + void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint) throws IOException { final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint(); @@ -439,6 +451,48 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB } } + /** + * On primary term update, we (re)initialise the remote segment directory to reflect the latest metadata file that + * has been uploaded to remote store successfully. This method also updates the segment tracker about the latest + * uploaded segment files onto remote store. + */ + private void initializeRemoteDirectoryOnTermUpdate() throws IOException { + if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { + logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm()); + this.primaryTerm = indexShard.getOperationPrimaryTerm(); + RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init(); + + // During failover, the uploaded metadata would have names of files that have been uploaded to remote store. + // Here we update the tracker with latest remote uploaded files. + if (uploadedMetadata != null) { + segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet()); + } + } + } + + /** + * This checks for readiness of the index shard and primary mode. This has separated from shouldSync since we use the + * returned value of this method for scheduling retries in syncSegments method. + * @return true iff primaryMode is true and index shard is not in closed state. + */ + private boolean isReadyForUpload() { + boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED; + if (isReady == false) { + StringBuilder sb = new StringBuilder("Skipped syncing segments with"); + if (indexShard.getReplicationTracker() != null) { + sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode()); + } + if (indexShard.state() != null) { + sb.append(" indexShardState=").append(indexShard.state()); + } + if (indexShard.getEngineOrNull() != null) { + sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); + } + logger.trace(sb.toString()); + } + return isReady; + } + /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events */ diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 2dd9b1a545d4a..db85a37b556fc 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -161,6 +161,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t remoteTranslogTransferTracker ); RemoteFsTranslog.download(translogTransferManager, location, logger); + logger.trace(remoteTranslogTransferTracker.toString()); } static void download(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException { @@ -173,15 +174,20 @@ static void download(TranslogTransferManager translogTransferManager, Path locat */ IOException ex = null; for (int i = 0; i <= DOWNLOAD_RETRIES; i++) { + boolean success = false; + long startTimeMs = System.currentTimeMillis(); try { downloadOnce(translogTransferManager, location, logger); + success = true; return; } catch (FileNotFoundException | NoSuchFileException e) { // continue till download retries ex = e; + } finally { + logger.trace("downloadOnce success={} timeElapsed={}", success, (System.currentTimeMillis() - startTimeMs)); } } - logger.debug("Exhausted all download retries during translog/checkpoint file download"); + logger.info("Exhausted all download retries during translog/checkpoint file download"); throw ex; }