From af70d532839e610e10e06c9e2ceb54e384c58883 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Wed, 12 Jul 2023 02:59:34 +0530 Subject: [PATCH] Start replication checkpointTimers on primary before segments upload to remote store. (#8221) * Start replication timer before segments upload. Signed-off-by: Ankit Kala * Addressed comments Signed-off-by: Ankit Kala --------- Signed-off-by: Ankit Kala Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + .../java/org/opensearch/index/shard/IndexShard.java | 7 ++++--- .../index/shard/RemoteStoreRefreshListener.java | 3 ++- .../org/opensearch/index/shard/IndexShardTests.java | 2 ++ .../index/shard/RemoteStoreRefreshListenerTests.java | 11 ++++++++--- .../RemoteStoreReplicationSourceTests.java | 7 ++++++- 6 files changed, 23 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index da9460f820f96..686ce86463fbf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,6 +95,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020)) - Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038)) - Add API to initialize extensions ([#8029]()https://github.com/opensearch-project/OpenSearch/pull/8029) +- Start replication checkpointTimers on primary before segments upload to remote store. 
([#8221](https://github.com/opensearch-project/OpenSearch/pull/8221)) - Add distributed tracing framework ([#7543](https://github.com/opensearch-project/OpenSearch/issues/7543)) - Enable Point based optimization for custom comparators ([#8168](https://github.com/opensearch-project/OpenSearch/pull/8168)) - [Extensions] Support extension additional settings with extension REST initialization ([#8414](https://github.com/opensearch-project/OpenSearch/pull/8414)) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 154e1a4f22242..bc90c42983617 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3661,6 +3661,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro final List internalRefreshListener = new ArrayList<>(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); + if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { + internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); + } + if (isRemoteStoreEnabled()) { internalRefreshListener.add( new RemoteStoreRefreshListener( @@ -3672,9 +3676,6 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro ); } - if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { - internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); - } /** * With segment replication enabled for primary relocation, recover replica shard initially as read only and * change to a writeable engine during relocation handoff after a round of segment replication. 
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index e087bbb265727..2519a4ee48051 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -191,6 +191,8 @@ private synchronized void syncSegments(boolean isRetry) { if (indexShard.getReplicationTracker().isPrimaryMode() == false) { return; } + ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); + indexShard.onCheckpointPublished(checkpoint); beforeSegmentsSync(isRetry); long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs(); long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); @@ -214,7 +216,6 @@ private synchronized void syncSegments(boolean isRetry) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. 
- ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); Collection localSegmentsPostRefresh = segmentInfos.files(true); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 8832f6a558b7b..60644ebaf94a3 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2692,6 +2692,7 @@ public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedExc public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throws IOException { Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) @@ -4815,6 +4816,7 @@ public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOExce public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOException { Settings primarySettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index ed1438cbc3b08..782450286cb90 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ 
b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -58,12 +58,17 @@ public class RemoteStoreRefreshListenerTests extends IndexShardTestCase { public void setup(boolean primary, int numberOfDocs) throws IOException { indexShard = newStartedShard( primary, - Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(), new InternalEngineFactory() ); - indexDocs(1, numberOfDocs); - indexShard.refresh("test"); + if (primary) { + indexDocs(1, numberOfDocs); + indexShard.refresh("test"); + } clusterService = new ClusterService( Settings.EMPTY, diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java index 04f821a5fc48c..2209514e06438 100644 --- a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -21,6 +21,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; import java.util.Collections; @@ -44,9 +45,13 @@ public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelRepli @Override public void setUp() throws Exception { super.setUp(); + indexShard = newStartedShard( true, - Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + Settings.builder() + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(), new InternalEngineFactory() 
);