From 4387cb8168a9919e33e15d802261076b6c3d2d58 Mon Sep 17 00:00:00 2001 From: Ashish Date: Tue, 28 Nov 2023 11:21:22 +0530 Subject: [PATCH] [Remote Store] Handoff refreshes, translog uploads during relocation from old to new primary (#11330) --------- Signed-off-by: Ashish Singh --- .../remotestore/BaseRemoteStoreRestoreIT.java | 15 -- .../RemoteStoreBaseIntegTestCase.java | 31 +++ .../opensearch/remotestore/RemoteStoreIT.java | 221 ++++++++++++++++-- .../opensearch/index/engine/EngineConfig.java | 16 +- .../index/engine/EngineConfigFactory.java | 4 +- .../index/engine/InternalEngine.java | 2 +- .../index/engine/NRTReplicationEngine.java | 2 +- .../opensearch/index/engine/NoOpEngine.java | 2 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../shard/CheckpointRefreshListener.java | 2 +- .../opensearch/index/shard/IndexShard.java | 37 ++- ...> ReleasableRetryableRefreshListener.java} | 50 +++- .../shard/RemoteStoreRefreshListener.java | 13 +- .../translog/InternalTranslogFactory.java | 2 +- .../translog/InternalTranslogManager.java | 15 +- .../index/translog/LocalTranslog.java | 6 + .../index/translog/NoOpTranslogManager.java | 7 + ...emoteBlobStoreInternalTranslogFactory.java | 4 +- .../index/translog/RemoteFsTranslog.java | 79 +++++-- .../opensearch/index/translog/Translog.java | 5 + .../index/translog/TranslogFactory.java | 2 +- .../index/translog/TranslogManager.java | 6 + .../translog/WriteOnlyTranslogManager.java | 4 +- ...easableRetryableRefreshListenerTests.java} | 183 ++++++++++++--- .../RemoteStoreRefreshListenerTests.java | 14 +- .../index/translog/RemoteFsTranslogTests.java | 127 +++++++++- .../index/translog/TestTranslog.java | 12 + 27 files changed, 713 insertions(+), 150 deletions(-) rename server/src/main/java/org/opensearch/index/shard/{CloseableRetryableRefreshListener.java => ReleasableRetryableRefreshListener.java} (80%) rename server/src/test/java/org/opensearch/index/shard/{CloseableRetryableRefreshListenerTests.java => ReleasableRetryableRefreshListenerTests.java} (66%) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java index 99c5d7fb2bae7..d29dacb001434 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java @@ -8,9 +8,7 @@ package org.opensearch.remotestore; -import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.plugins.Plugin; import org.opensearch.test.transport.MockTransportService; @@ -20,7 +18,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; public class BaseRemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase { @@ -49,18 +46,6 @@ protected void restore(String... indices) { restore(randomBoolean(), indices); } - protected void restore(boolean restoreAllShards, String... indices) { - if (restoreAllShards) { - assertAcked(client().admin().indices().prepareClose(indices)); - } - client().admin() - .cluster() - .restoreRemoteStore( - new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), - PlainActionFuture.newFuture() - ); - } - protected void verifyRestoredData(Map indexStats, String indexName, boolean indexMoreData) throws Exception { ensureYellowAndNoInitializingShards(indexName); ensureGreen(indexName); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 8b4981a15433a..8c15ebd0505d9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -8,12 +8,16 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.WriteRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; @@ -23,9 +27,13 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.index.Index; import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -43,6 +51,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -51,6 +60,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final String REPOSITORY_NAME = "test-remote-store-repo"; @@ -380,4 +390,25 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { return filesExisting.get(); } + + protected IndexShard getIndexShard(String dataNode, String indexName) throws ExecutionException, InterruptedException { + String clusterManagerName = internalCluster().getClusterManagerName(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); + GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + String uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService indexService = indicesService.indexService(new Index(indexName, uuid)); + return indexService.getShard(0); + } + + protected void restore(boolean restoreAllShards, String... indices) { + if (restoreAllShards) { + assertAcked(client().admin().indices().prepareClose(indices)); + } + client().admin() + .cluster() + .restoreRemoteStore( + new RestoreRemoteStoreRequest().indices(indices).restoreAllShards(restoreAllShards), + PlainActionFuture.newFuture() + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 28294686d4370..e1997fea3433a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,30 +8,37 @@ package org.opensearch.remotestore; +import org.opensearch.OpenSearchException; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.admin.indices.get.GetIndexRequest; -import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor; -import org.opensearch.core.index.Index; -import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.translog.Translog.Durability; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.hamcrest.MatcherAssert; import java.io.IOException; @@ -42,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -180,7 +188,7 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception { .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata"); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); int lastNMetadataFilesToKeep = indexShard.getRecoverySettings().getMinRemoteSegmentMetadataFiles(); // Delete is async. assertBusy(() -> { @@ -265,7 +273,7 @@ public void testDefaultBufferInterval() throws ExecutionException, InterruptedEx ensureGreen(INDEX_NAME); assertClusterRemoteBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, dataNode); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); assertBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indexShard); @@ -298,7 +306,7 @@ public void testOverriddenBufferInterval() throws ExecutionException, Interrupte ensureYellowAndNoInitializingShards(INDEX_NAME); ensureGreen(INDEX_NAME); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); assertBufferInterval(bufferInterval, indexShard); @@ -414,7 +422,7 @@ private void testRestrictSettingFalse(boolean setRestrictFalse, Durability durab .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability) .build(); createIndex(INDEX_NAME, indexSettings); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertEquals(durability, indexShard.indexSettings().getTranslogDurability()); durability = randomFrom(Durability.values()); @@ -447,7 +455,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E // Case 2 - Test update index fails createIndex(INDEX_NAME); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertEquals(Durability.REQUEST, indexShard.indexSettings().getTranslogDurability()); exception = assertThrows( IllegalArgumentException.class, @@ -459,15 +467,6 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E assertEquals(expectedExceptionMsg, exception.getMessage()); } - private IndexShard getIndexShard(String dataNode) throws ExecutionException, InterruptedException { - String clusterManagerName = internalCluster().getClusterManagerName(); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); - GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); - String uuid = getIndexResponse.getSettings().get(INDEX_NAME).get(IndexMetadata.SETTING_INDEX_UUID); - IndexService indexService = indicesService.indexService(new Index(INDEX_NAME, uuid)); - return indexService.getShard(0); - } - private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) { IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval()); @@ -559,7 +558,7 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); ensureGreen(INDEX_NAME); - IndexShard indexShard = getIndexShard(primaryShardNode); + IndexShard indexShard = getIndexShard(primaryShardNode, INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0); @@ -572,7 +571,7 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte ensureGreen(INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); - indexShard = getIndexShard(replicaShardNode); + indexShard = getIndexShard(replicaShardNode, INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); } @@ -621,4 +620,186 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception { assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); }); } + + public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionException, InterruptedException { + // In this test, we trigger a force flush on existing primary while the primary mode on new primary has been + // activated. There was a bug in primary relocation of remote store enabled indexes where the new primary + // starts uploading translog and segments even before the cluster manager has started this shard. With this test, + // we check that we do not overwrite any file on remote store. Here we will also increase the replica count to + // check that there are no duplicate metadata files for translog or upload. + + internalCluster().startClusterManagerOnlyNode(); + String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + indexBulk(INDEX_NAME, randomIntBetween(5, 10)); + String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + + IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); + CountDownLatch flushLatch = new CountDownLatch(1); + + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { + flushLatch.countDown(); + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + + CountDownLatch flushDone = new CountDownLatch(1); + Thread flushThread = new Thread(() -> { + try { + flushLatch.await(2, TimeUnit.SECONDS); + oldPrimaryIndexShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + // newPrimaryTranslogRepo.setSleepSeconds(0); + } catch (IndexShardClosedException e) { + // this is fine + } catch (InterruptedException e) { + throw new AssertionError(e); + } finally { + flushDone.countDown(); + } + }); + flushThread.start(); + flushDone.await(5, TimeUnit.SECONDS); + flushThread.join(); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(5)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ) + .get(); + + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(5)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + } + + public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionException, InterruptedException, IOException { + // In this test, we fail the hand off during the primary relocation. This will undo the drainRefreshes and + // drainSync performed as part of relocation handoff (before performing the handoff transport action). + // We validate the same here by failing the peer recovery and ensuring we can index afterward as well. + + internalCluster().startClusterManagerOnlyNode(); + String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + int docs = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, docs); + flushAndRefresh(INDEX_NAME); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs); + String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + + IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); + CountDownLatch handOffLatch = new CountDownLatch(1); + + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { + handOffLatch.countDown(); + throw new OpenSearchException("failing recovery for test purposes"); + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + + handOffLatch.await(30, TimeUnit.SECONDS); + + assertTrue(oldPrimaryIndexShard.isStartedPrimary()); + assertEquals(oldPrimary, primaryNodeName(INDEX_NAME)); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs); + + SearchPhaseExecutionException ex = assertThrows( + SearchPhaseExecutionException.class, + () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() + ); + assertEquals("all shards failed", ex.getMessage()); + + int moreDocs = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, moreDocs); + flushAndRefresh(INDEX_NAME); + int uncommittedOps = randomIntBetween(5, 10); + indexBulk(INDEX_NAME, uncommittedOps); + assertHitCount(client(oldPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), docs + moreDocs); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME))); + + restore(true, INDEX_NAME); + ensureGreen(INDEX_NAME); + assertHitCount( + client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + docs + moreDocs + uncommittedOps + ); + + String newNode = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, newPrimary, newNode)) + .execute() + .actionGet(); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(10)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + + ex = assertThrows( + SearchPhaseExecutionException.class, + () -> client(newPrimary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get() + ); + assertEquals("all shards failed", ex.getMessage()); + assertHitCount( + client(newNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), + docs + moreDocs + uncommittedOps + ); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index af65e993fcf26..bf3e10d684c94 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -108,7 +108,7 @@ public final class EngineConfig { private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; private final boolean isReadOnlyReplica; - private final BooleanSupplier primaryModeSupplier; + private final BooleanSupplier startedPrimarySupplier; private final Comparator leafSorter; /** @@ -287,7 +287,7 @@ private EngineConfig(Builder builder) { this.primaryTermSupplier = builder.primaryTermSupplier; this.tombstoneDocSupplier = builder.tombstoneDocSupplier; this.isReadOnlyReplica = builder.isReadOnlyReplica; - this.primaryModeSupplier = builder.primaryModeSupplier; + this.startedPrimarySupplier = builder.startedPrimarySupplier; this.translogFactory = builder.translogFactory; this.leafSorter = builder.leafSorter; } @@ -495,11 +495,11 @@ public boolean isReadOnlyReplica() { } /** - * Returns the underlying primaryModeSupplier. + * Returns the underlying startedPrimarySupplier. * @return the primary mode supplier. */ - public BooleanSupplier getPrimaryModeSupplier() { - return primaryModeSupplier; + public BooleanSupplier getStartedPrimarySupplier() { + return startedPrimarySupplier; } /** @@ -577,7 +577,7 @@ public static class Builder { private TombstoneDocSupplier tombstoneDocSupplier; private TranslogDeletionPolicyFactory translogDeletionPolicyFactory; private boolean isReadOnlyReplica; - private BooleanSupplier primaryModeSupplier; + private BooleanSupplier startedPrimarySupplier; private TranslogFactory translogFactory = new InternalTranslogFactory(); Comparator leafSorter; @@ -701,8 +701,8 @@ public Builder readOnlyReplica(boolean isReadOnlyReplica) { return this; } - public Builder primaryModeSupplier(BooleanSupplier primaryModeSupplier) { - this.primaryModeSupplier = primaryModeSupplier; + public Builder startedPrimarySupplier(BooleanSupplier startedPrimarySupplier) { + this.startedPrimarySupplier = startedPrimarySupplier; return this; } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index 38eea92b6c757..77e2f1c55201d 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -152,7 +152,7 @@ public EngineConfig newEngineConfig( LongSupplier primaryTermSupplier, EngineConfig.TombstoneDocSupplier tombstoneDocSupplier, boolean isReadOnlyReplica, - BooleanSupplier primaryModeSupplier, + BooleanSupplier startedPrimarySupplier, TranslogFactory translogFactory, Comparator leafSorter ) { @@ -185,7 +185,7 @@ public EngineConfig newEngineConfig( .primaryTermSupplier(primaryTermSupplier) .tombstoneDocSupplier(tombstoneDocSupplier) .readOnlyReplica(isReadOnlyReplica) - .primaryModeSupplier(primaryModeSupplier) + .startedPrimarySupplier(startedPrimarySupplier) .translogFactory(translogFactory) .leafSorter(leafSorter) .build(); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 8e1627af274c5..650cb4688f9bf 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -292,7 +292,7 @@ public void onFailure(String reason, Exception ex) { new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId), this::ensureOpen, engineConfig.getTranslogFactory(), - engineConfig.getPrimaryModeSupplier() + engineConfig.getStartedPrimarySupplier() ); this.translogManager = translogManagerRef; this.softDeletesPolicy = newSoftDeletesPolicy(); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 3b55c4e648f1c..ed8dba2f8902d 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -125,7 +125,7 @@ public void onAfterTranslogSync() { }, this, engineConfig.getTranslogFactory(), - engineConfig.getPrimaryModeSupplier() + engineConfig.getStartedPrimarySupplier() ); this.translogManager = translogManagerRef; success = true; diff --git a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java index ca31d5518df47..9071b0e7a1eb3 100644 --- a/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NoOpEngine.java @@ -203,7 +203,7 @@ public void trimUnreferencedTranslogFiles() throws TranslogException { engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {}, - engineConfig.getPrimaryModeSupplier() + engineConfig.getStartedPrimarySupplier() ) ) { translog.trimUnreferencedReaders(); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 80cef214a08cd..7ff3145055df8 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -278,7 +278,7 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), seqNo -> {}, - config.getPrimaryModeSupplier() + config.getStartedPrimarySupplier() ) ) { return translog.stats(); diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index 85d744e58265f..675d60ec2b63d 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -21,7 +21,7 @@ * * @opensearch.internal */ -public class CheckpointRefreshListener extends CloseableRetryableRefreshListener { +public class CheckpointRefreshListener extends ReleasableRetryableRefreshListener { protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 405845d58f227..7f9e5f31d1976 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -851,6 +851,9 @@ public void relocated( final Runnable performSegRep ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; + // The below list of releasable ensures that if the relocation does not happen, we undo the activity of close and + // acquire all permits. This will ensure that the remote store uploads can still be done by the existing primary shard. + List releasablesOnHandoffFailures = new ArrayList<>(2); try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { forceRefreshes.close(); @@ -863,11 +866,15 @@ public void relocated( maybeSync(); } - // Ensures all in-flight remote store operations drain, before we perform the handoff. - internalRefreshListener.stream() - .filter(refreshListener -> refreshListener instanceof Closeable) - .map(refreshListener -> (Closeable) refreshListener) - .close(); + // Ensures all in-flight remote store refreshes drain, before we perform the performSegRep. + for (ReferenceManager.RefreshListener refreshListener : internalRefreshListener) { + if (refreshListener instanceof ReleasableRetryableRefreshListener) { + releasablesOnHandoffFailures.add(((ReleasableRetryableRefreshListener) refreshListener).drainRefreshes()); + } + } + + // Ensure all in-flight remote store translog upload drains, before we perform the performSegRep. + releasablesOnHandoffFailures.add(getEngine().translogManager().drainSync()); // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED @@ -902,6 +909,13 @@ public void relocated( // Fail primary relocation source and target shards. failShard("timed out waiting for relocation hand-off to complete", null); throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete"); + } catch (Exception ex) { + assert replicationTracker.isPrimaryMode(); + // If the primary mode is still true after the end of handoff attempt, it basically means that the relocation + // failed. The existing primary will continue to be the primary, so we need to allow the segments and translog + // upload to resume. + Releasables.close(releasablesOnHandoffFailures); + throw ex; } } @@ -3871,10 +3885,10 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro circuitBreakerService, globalCheckpointSupplier, replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), + this::getOperationPrimaryTerm, tombstoneDocSupplier(), isReadOnlyReplica, - replicationTracker::isPrimaryMode, + this::isStartedPrimary, translogFactorySupplier.apply(indexSettings, shardRouting), isTimeSeriesDescSortOptimizationEnabled() ? DataStream.TIMESERIES_LEAF_SORTER : null // DESC @timestamp default order for // timeseries @@ -3889,6 +3903,15 @@ public boolean isRemoteTranslogEnabled() { return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled(); } + /** + * This checks if we are in state to upload to remote store. Until the cluster-manager informs the shard through + * cluster state, the shard will not be in STARTED state. This method is used to prevent pre-emptive segment or + * translog uploads. + */ + public boolean isStartedPrimary() { + return getReplicationTracker().isPrimaryMode() && state() == IndexShardState.STARTED; + } + /** * @return true if segment reverse search optimization is enabled for time series based workload. */ diff --git a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java similarity index 80% rename from server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java rename to server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java index 3ee74e5267718..757275932c5f1 100644 --- a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java @@ -10,10 +10,11 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.search.ReferenceManager; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; -import java.io.Closeable; import java.io.IOException; import java.util.Objects; import java.util.concurrent.Semaphore; @@ -22,11 +23,11 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * RefreshListener that runs afterRefresh method if and only if there is a permit available. Once the listener - * is closed, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides + * RefreshListener that runs afterRefresh method if and only if there is a permit available. Once the {@code drainRefreshes()} + * is called, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides * necessary abstract methods to schedule retry. */ -public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener, Closeable { +public abstract class ReleasableRetryableRefreshListener implements ReferenceManager.RefreshListener { /** * Total permits = 1 ensures that there is only single instance of runAfterRefreshWithPermit that is running at a time. @@ -34,6 +35,8 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceMana */ private static final int TOTAL_PERMITS = 1; + private static final TimeValue DRAIN_TIMEOUT = TimeValue.timeValueMinutes(10); + private final AtomicBoolean closed = new AtomicBoolean(false); private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS); @@ -45,11 +48,11 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceMana */ private final AtomicBoolean retryScheduled = new AtomicBoolean(false); - public CloseableRetryableRefreshListener() { + public ReleasableRetryableRefreshListener() { this.threadPool = null; } - public CloseableRetryableRefreshListener(ThreadPool threadPool) { + public ReleasableRetryableRefreshListener(ThreadPool threadPool) { assert Objects.nonNull(threadPool); this.threadPool = threadPool; } @@ -184,23 +187,38 @@ private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) { */ protected abstract boolean performAfterRefreshWithPermit(boolean didRefresh); - @Override - public final void close() throws IOException { + public final Releasable drainRefreshes() { try { - if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) { + TimeValue timeout = getDrainTimeout(); + if (semaphore.tryAcquire(TOTAL_PERMITS, timeout.seconds(), TimeUnit.SECONDS)) { boolean result = closed.compareAndSet(false, true); assert result && semaphore.availablePermits() == 0; getLogger().info("All permits are acquired and refresh listener is closed"); + return Releasables.releaseOnce(() -> { + semaphore.release(TOTAL_PERMITS); + boolean wasClosed = closed.getAndSet(false); + assert semaphore.availablePermits() == TOTAL_PERMITS : "Available permits is " + semaphore.availablePermits(); + assert wasClosed : "RefreshListener is not closed before reopening it"; + getLogger().info("All permits are released and refresh listener is open"); + }); } else { - throw new TimeoutException("timeout while closing gated refresh listener"); + throw new TimeoutException("Timeout while acquiring all permits"); } } catch (InterruptedException | TimeoutException e) { - throw new RuntimeException("Failed to close the closeable retryable listener", e); + throw new RuntimeException("Failed to acquire all permits", e); } } protected abstract Logger getLogger(); + // Made available for unit testing purpose only + /** + * Returns the timeout which is used while draining refreshes. + */ + TimeValue getDrainTimeout() { + return DRAIN_TIMEOUT; + } + // Visible for testing /** * Returns if the retry is scheduled or not. @@ -210,4 +228,14 @@ public final void close() throws IOException { boolean getRetryScheduledStatus() { return retryScheduled.get(); } + + // Visible for testing + int availablePermits() { + return semaphore.availablePermits(); + } + + // Visible for testing + boolean isClosed() { + return closed.get(); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index b02d858ce8b39..d96a7e7c95ecf 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -54,7 +54,7 @@ * * @opensearch.internal */ -public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshListener { +public final class RemoteStoreRefreshListener extends ReleasableRetryableRefreshListener { private final Logger logger; @@ -484,11 +484,10 @@ private void initializeRemoteDirectoryOnTermUpdate() throws IOException { /** * This checks for readiness of the index shard and primary mode. This has separated from shouldSync since we use the * returned value of this method for scheduling retries in syncSegments method. - * @return true iff primaryMode is true and index shard is not in closed state. + * @return true iff the shard is a started with primary mode true or it is local or snapshot recovery. */ private boolean isReadyForUpload() { - boolean isReady = (indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED) - || isLocalOrSnapshotRecovery(); + boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecovery(); if (isReady == false) { StringBuilder sb = new StringBuilder("Skipped syncing segments with"); @@ -501,11 +500,11 @@ private boolean isReadyForUpload() { if (indexShard.getEngineOrNull() != null) { sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName()); } - if (isLocalOrSnapshotRecovery() == false) { + if (indexShard.recoveryState() != null) { sb.append(" recoverySourceType=").append(indexShard.recoveryState().getRecoverySource().getType()); sb.append(" primary=").append(indexShard.shardRouting.primary()); } - logger.trace(sb.toString()); + logger.info(sb.toString()); } return isReady; } @@ -514,8 +513,8 @@ private boolean isLocalOrSnapshotRecovery() { // In this case when the primary mode is false, we need to upload segments to Remote Store // This is required in case of snapshots/shrink/ split/clone where we need to durable persist // all segments to remote before completing the recovery to ensure durability. - return (indexShard.state() == IndexShardState.RECOVERING && indexShard.shardRouting.primary()) + && indexShard.recoveryState() != null && (indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS || indexShard.recoveryState().getRecoverySource().getType() == RecoverySource.Type.SNAPSHOT); } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java index d7be1250c0b5b..415d7dc4d1a9d 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java @@ -28,7 +28,7 @@ public Translog newTranslog( LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, LongConsumer persistedSequenceNumberConsumer, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { return new LocalTranslog( diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 4d0fc13d433c6..a22c538286a88 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -58,7 +59,7 @@ public InternalTranslogManager( TranslogEventListener translogEventListener, LifecycleAware engineLifeCycleAware, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { this.shardId = shardId; this.readLock = readLock; @@ -71,7 +72,7 @@ public InternalTranslogManager( if (tracker != null) { tracker.markSeqNoAsPersisted(seqNo); } - }, translogUUID, translogFactory, primaryModeSupplier); + }, translogUUID, translogFactory, startedPrimarySupplier); assert translog.getGeneration() != null; this.translog = translog; assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it"; @@ -301,10 +302,16 @@ public void setMinSeqNoToKeep(long seqNo) { translog.setMinSeqNoToKeep(seqNo); } + @Override public void onDelete() { translog.onDelete(); } + @Override + public Releasable drainSync() { + return translog.drainSync(); + } + @Override public Translog.TranslogGeneration getTranslogGeneration() { return translog.getGeneration(); @@ -362,7 +369,7 @@ protected Translog openTranslog( LongConsumer persistedSequenceNumberConsumer, String translogUUID, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { return translogFactory.newTranslog( translogConfig, @@ -371,7 +378,7 @@ protected Translog openTranslog( globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer, - primaryModeSupplier + startedPrimarySupplier ); } diff --git a/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java b/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java index 22dba3973cfc1..7664631e0ed07 100644 --- a/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/LocalTranslog.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -140,6 +141,11 @@ public TranslogStats stats() { } } + @Override + Releasable drainSync() { + return () -> {}; // noop + } + @Override public void close() throws IOException { assert Translog.calledFromOutsideOrViaTragedyClose() diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 3e6a8e69edfbb..b4aa7865570a6 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -8,6 +8,7 @@ package org.opensearch.index.translog; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.index.shard.ShardId; @@ -121,8 +122,14 @@ public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolea throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs"); } + @Override public void onDelete() {} + @Override + public Releasable drainSync() { + return () -> {}; + } + @Override public Translog.TranslogGeneration getTranslogGeneration() { return null; diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index 1e2cb388e690e..e100ffaabf13d 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -59,7 +59,7 @@ public Translog newTranslog( LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier, LongConsumer persistedSequenceNumberConsumer, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; @@ -73,7 +73,7 @@ public Translog newTranslog( persistedSequenceNumberConsumer, blobStoreRepository, threadPool, - primaryModeSupplier, + startedPrimarySupplier, remoteTranslogTransferTracker ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 8fb420e8fa1da..7b969a37e4aa6 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -9,8 +9,10 @@ package org.opensearch.index.translog; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SetOnce; import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.util.io.IOUtils; @@ -38,6 +40,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; @@ -53,10 +58,9 @@ public class RemoteFsTranslog extends Translog { private final Logger logger; - private final BlobStoreRepository blobStoreRepository; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; - private final BooleanSupplier primaryModeSupplier; + private final BooleanSupplier startedPrimarySupplier; private final RemoteTranslogTransferTracker remoteTranslogTransferTracker; private volatile long maxRemoteTranslogGenerationUploaded; @@ -75,6 +79,11 @@ public class RemoteFsTranslog extends Translog { // Semaphore used to allow only single remote generation to happen at a time private final Semaphore remoteGenerationDeletionPermits = new Semaphore(REMOTE_DELETION_PERMITS); + // These permits exist to allow any inflight background triggered upload. + private static final int SYNC_PERMIT = 1; + private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT); + private final AtomicBoolean pauseSync = new AtomicBoolean(false); + public RemoteFsTranslog( TranslogConfig config, String translogUUID, @@ -84,13 +93,12 @@ public RemoteFsTranslog( LongConsumer persistedSequenceNumberConsumer, BlobStoreRepository blobStoreRepository, ThreadPool threadPool, - BooleanSupplier primaryModeSupplier, + BooleanSupplier startedPrimarySupplier, RemoteTranslogTransferTracker remoteTranslogTransferTracker ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); logger = Loggers.getLogger(getClass(), shardId); - this.blobStoreRepository = blobStoreRepository; - this.primaryModeSupplier = primaryModeSupplier; + this.startedPrimarySupplier = startedPrimarySupplier; this.remoteTranslogTransferTracker = remoteTranslogTransferTracker; fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker); this.translogTransferManager = buildTranslogTransferManager( @@ -267,6 +275,16 @@ public void rollGeneration() throws IOException { } private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException { + // During primary relocation, both the old and new primary have engine created with RemoteFsTranslog and having + // ReplicationTracker.primaryMode() as true. However, before we perform the `internal:index/shard/replication/segments_sync` + // action which re-downloads the segments and translog on the new primary. We are ensuring 2 things here - + // 1. Using startedPrimarySupplier, we prevent the new primary to do pre-emptive syncs + // 2. Using syncPermits, we prevent syncs at the desired time during primary relocation. + if (startedPrimarySupplier.getAsBoolean() == false || syncPermit.tryAcquire(SYNC_PERMIT) == false) { + logger.debug("skipped uploading translog for {} {} syncPermits={}", primaryTerm, generation, syncPermit.availablePermits()); + // NO-OP + return false; + } long maxSeqNo = -1; try (Releasable ignored = writeLock.acquire()) { if (generation == null || generation == current.getGeneration()) { @@ -316,16 +334,6 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc } private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws IOException { - // During primary relocation (primary-primary peer recovery), both the old and the new primary have engine - // created with the RemoteFsTranslog. Both primaries are equipped to upload the translogs. The primary mode check - // below ensures that the real primary only is uploading. Before the primary mode is set as true for the new - // primary, the engine is reset to InternalEngine which also initialises the RemoteFsTranslog which in turns - // downloads all the translogs from remote store and does a flush before the relocation finishes. - if (primaryModeSupplier.getAsBoolean() == false) { - logger.debug("skipped uploading translog for {} {}", primaryTerm, generation); - // NO-OP - return true; - } logger.trace("uploading translog for {} {}", primaryTerm, generation); try ( TranslogCheckpointTransferSnapshot transferSnapshotProvider = new TranslogCheckpointTransferSnapshot.Builder( @@ -341,6 +349,8 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws transferSnapshotProvider, new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo) ); + } finally { + syncPermit.release(SYNC_PERMIT); } } @@ -423,11 +433,39 @@ protected void setMinSeqNoToKeep(long seqNo) { this.minSeqNoToKeep = seqNo; } + @Override + protected Releasable drainSync() { + try { + if (syncPermit.tryAcquire(SYNC_PERMIT, 1, TimeUnit.MINUTES)) { + boolean result = pauseSync.compareAndSet(false, true); + assert result && syncPermit.availablePermits() == 0; + logger.info("All inflight remote translog syncs finished and further syncs paused"); + return Releasables.releaseOnce(() -> { + syncPermit.release(SYNC_PERMIT); + boolean wasSyncPaused = pauseSync.getAndSet(false); + assert syncPermit.availablePermits() == SYNC_PERMIT : "Available permits is " + syncPermit.availablePermits(); + assert wasSyncPaused : "RemoteFsTranslog sync was not paused before re-enabling it"; + logger.info("Resumed remote translog sync back on relocation failure"); + }); + } else { + throw new TimeoutException("Timeout while acquiring all permits"); + } + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException("Failed to acquire all permits", e); + } + } + @Override public void trimUnreferencedReaders() throws IOException { // clean up local translog files and updates readers super.trimUnreferencedReaders(); + // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote + // store. + if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) { + return; + } + // Since remote generation deletion is async, this ensures that only one generation deletion happens at a time. // Remote generations involves 2 async operations - 1) Delete translog generation files 2) Delete metadata files // We try to acquire 2 permits and if we can not, we return from here itself. @@ -505,11 +543,7 @@ public static void cleanup(Repository repository, ShardId shardId, ThreadPool th } protected void onDelete() { - if (primaryModeSupplier.getAsBoolean() == false) { - logger.trace("skipped delete translog"); - // NO-OP - return; - } + ClusterService.assertClusterOrClusterManagerStateThread(); // clean up all remote translog files translogTransferManager.delete(); } @@ -570,4 +604,9 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) { return minSeqNoToKeep; } + + // Visible for testing + int availablePermits() { + return syncPermit.availablePermits(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 8b4662238ed25..9f877e87415dd 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1817,6 +1817,11 @@ protected void setMinSeqNoToKeep(long seqNo) {} protected void onDelete() {} + /** + * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. + */ + abstract Releasable drainSync(); + /** * deletes all files associated with a reader. package-private to be able to simulate node failures at this point */ diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java index a139b10f563b2..4300435093b5d 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogFactory.java @@ -32,6 +32,6 @@ Translog newTranslog( final LongSupplier globalCheckpointSupplier, final LongSupplier primaryTermSupplier, final LongConsumer persistedSequenceNumberConsumer, - final BooleanSupplier primaryModeSupplier + final BooleanSupplier startedPrimarySupplier ) throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 4279e0289c1dc..e1a0b7d1c1293 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.opensearch.common.annotation.PublicApi; +import org.opensearch.common.lease.Releasable; import java.io.IOException; import java.util.stream.Stream; @@ -135,5 +136,10 @@ public interface TranslogManager { */ void onDelete(); + /** + * Drains ongoing syncs to the underlying store. It returns a releasable which can be closed to resume the syncs back. + */ + Releasable drainSync(); + Translog.TranslogGeneration getTranslogGeneration(); } diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index adeeb213b2913..a7a524ad78e95 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -38,7 +38,7 @@ public WriteOnlyTranslogManager( TranslogEventListener translogEventListener, LifecycleAware engineLifecycleAware, TranslogFactory translogFactory, - BooleanSupplier primaryModeSupplier + BooleanSupplier startedPrimarySupplier ) throws IOException { super( translogConfig, @@ -52,7 +52,7 @@ public WriteOnlyTranslogManager( translogEventListener, engineLifecycleAware, translogFactory, - primaryModeSupplier + startedPrimarySupplier ); } diff --git a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java similarity index 66% rename from server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java rename to server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java index 01242063caa77..a0641c365a2a1 100644 --- a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReleasableRetryableRefreshListenerTests.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; @@ -26,9 +27,9 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class CloseableRetryableRefreshListenerTests extends OpenSearchTestCase { +public class ReleasableRetryableRefreshListenerTests extends OpenSearchTestCase { - private static final Logger logger = LogManager.getLogger(CloseableRetryableRefreshListenerTests.class); + private static final Logger logger = LogManager.getLogger(ReleasableRetryableRefreshListenerTests.class); private ThreadPool threadPool; @@ -43,7 +44,7 @@ public void init() { public void testPerformAfterRefresh() throws IOException { CountDownLatch countDownLatch = new CountDownLatch(2); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -66,7 +67,7 @@ protected Logger getLogger() { // Second invocation of afterRefresh method testRefreshListener.afterRefresh(true); assertEquals(0, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -75,7 +76,7 @@ protected Logger getLogger() { public void testCloseAfterRefresh() throws IOException { final int initialCount = randomIntBetween(10, 100); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -98,7 +99,7 @@ protected Logger getLogger() { assertEquals(initialCount - refreshCount, countDownLatch.getCount()); // Closing the refresh listener so that no further afterRefreshes are executed going forward - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); for (int i = 0; i < initialCount - refreshCount; i++) { testRefreshListener.afterRefresh(true); @@ -112,7 +113,7 @@ protected Logger getLogger() { public void testNoRetry() throws IOException { int initialCount = randomIntBetween(10, 100); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -129,9 +130,9 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 1, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); - testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -148,9 +149,9 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 2, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); - testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -172,9 +173,9 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 3, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); - testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -196,7 +197,7 @@ protected Logger getLogger() { }; testRefreshListener.afterRefresh(true); assertEquals(initialCount - 4, countDownLatch.getCount()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -205,7 +206,7 @@ protected Logger getLogger() { public void testRetry() throws Exception { int initialCount = randomIntBetween(10, 20); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -237,7 +238,7 @@ protected boolean isRetryEnabled() { }; testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); } /** @@ -246,7 +247,7 @@ protected boolean isRetryEnabled() { public void testCloseWithRetryPending() throws IOException { int initialCount = randomIntBetween(10, 20); final CountDownLatch countDownLatch = new CountDownLatch(initialCount); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { countDownLatch.countDown(); @@ -272,13 +273,14 @@ protected Logger getLogger() { } }; testRefreshListener.afterRefresh(randomBoolean()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); assertNotEquals(0, countDownLatch.getCount()); + assertRefreshListenerClosed(testRefreshListener); } public void testCloseWaitsForAcquiringAllPermits() throws Exception { final CountDownLatch countDownLatch = new CountDownLatch(1); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { try { @@ -307,13 +309,14 @@ protected Logger getLogger() { }); thread.start(); assertBusy(() -> assertEquals(0, countDownLatch.getCount())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); } public void testScheduleRetryAfterClose() throws Exception { // This tests that once the listener has been closed, even the retries would not be scheduled. final AtomicLong runCount = new AtomicLong(); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { try { @@ -358,8 +361,8 @@ protected TimeValue getNextRetryInterval() { Thread thread2 = new Thread(() -> { try { Thread.sleep(500); - testRefreshListener.close(); - } catch (IOException | InterruptedException e) { + testRefreshListener.drainRefreshes(); + } catch (InterruptedException e) { throw new AssertionError(e); } }); @@ -368,13 +371,14 @@ protected TimeValue getNextRetryInterval() { thread1.join(); thread2.join(); assertBusy(() -> assertEquals(1, runCount.get())); + assertRefreshListenerClosed(testRefreshListener); } public void testConcurrentScheduleRetry() throws Exception { // This tests that there can be only 1 retry that can be scheduled at a time. final AtomicLong runCount = new AtomicLong(); final AtomicInteger retryCount = new AtomicInteger(0); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(threadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { retryCount.incrementAndGet(); @@ -408,7 +412,8 @@ protected boolean isRetryEnabled() { testRefreshListener.afterRefresh(true); testRefreshListener.afterRefresh(true); assertBusy(() -> assertEquals(3, runCount.get())); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); } public void testExceptionDuringThreadPoolSchedule() throws Exception { @@ -417,7 +422,7 @@ public void testExceptionDuringThreadPoolSchedule() throws Exception { AtomicInteger runCount = new AtomicInteger(); ThreadPool mockThreadPool = mock(ThreadPool.class); when(mockThreadPool.schedule(any(), any(), any())).thenThrow(new RuntimeException()); - CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mockThreadPool) { + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mockThreadPool) { @Override protected boolean performAfterRefreshWithPermit(boolean didRefresh) { runCount.incrementAndGet(); @@ -450,7 +455,121 @@ protected boolean isRetryEnabled() { assertThrows(RuntimeException.class, () -> testRefreshListener.afterRefresh(true)); assertBusy(() -> assertFalse(testRefreshListener.getRetryScheduledStatus())); assertEquals(1, runCount.get()); - testRefreshListener.close(); + testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); + } + + public void testTimeoutDuringClose() throws Exception { + // This test checks the expected behaviour when the drainRefreshes times out. + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + try { + Thread.sleep(TimeValue.timeValueSeconds(2).millis()); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + TimeValue getDrainTimeout() { + return TimeValue.timeValueSeconds(1); + } + }; + Thread thread1 = new Thread(() -> { + try { + testRefreshListener.afterRefresh(true); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + thread1.start(); + assertBusy(() -> assertEquals(0, testRefreshListener.availablePermits())); + RuntimeException ex = assertThrows(RuntimeException.class, testRefreshListener::drainRefreshes); + assertEquals("Failed to acquire all permits", ex.getMessage()); + thread1.join(); + } + + public void testThreadInterruptDuringClose() throws Exception { + // This test checks the expected behaviour when the thread performing the drainRefresh is interrupted. + CountDownLatch latch = new CountDownLatch(2); + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + try { + Thread.sleep(TimeValue.timeValueSeconds(2).millis()); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + + @Override + TimeValue getDrainTimeout() { + return TimeValue.timeValueSeconds(2); + } + }; + Thread thread1 = new Thread(() -> { + try { + testRefreshListener.afterRefresh(true); + latch.countDown(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + Thread thread2 = new Thread(() -> { + RuntimeException ex = assertThrows(RuntimeException.class, testRefreshListener::drainRefreshes); + assertEquals("Failed to acquire all permits", ex.getMessage()); + latch.countDown(); + }); + thread1.start(); + assertBusy(() -> assertEquals(0, testRefreshListener.availablePermits())); + thread2.start(); + thread2.interrupt(); + thread1.join(); + thread2.join(); + assertEquals(0, latch.getCount()); + } + + public void testResumeRefreshesAfterDrainRefreshes() { + // This test checks the expected behaviour when the refresh listener is drained, but then refreshes are resumed again + // by closing the releasables acquired by calling the drainRefreshes method. + ReleasableRetryableRefreshListener testRefreshListener = new ReleasableRetryableRefreshListener(mock(ThreadPool.class)) { + @Override + protected boolean performAfterRefreshWithPermit(boolean didRefresh) { + return true; + } + + @Override + public void beforeRefresh() {} + + @Override + protected Logger getLogger() { + return logger; + } + }; + assertRefreshListenerOpen(testRefreshListener); + Releasable releasable = testRefreshListener.drainRefreshes(); + assertRefreshListenerClosed(testRefreshListener); + releasable.close(); + assertRefreshListenerOpen(testRefreshListener); } @After @@ -458,4 +577,14 @@ public void tearDown() throws Exception { super.tearDown(); terminate(threadPool); } + + private void assertRefreshListenerClosed(ReleasableRetryableRefreshListener testRefreshListener) { + assertTrue(testRefreshListener.isClosed()); + assertEquals(0, testRefreshListener.availablePermits()); + } + + private void assertRefreshListenerOpen(ReleasableRetryableRefreshListener testRefreshListener) { + assertFalse(testRefreshListener.isClosed()); + assertEquals(1, testRefreshListener.availablePermits()); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 74da9a3fff19c..811d6a722d0f6 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -320,7 +320,7 @@ public void testRefreshSuccessOnFirstAttempt() throws Exception { // This is the case of isRetry=false, shouldRetry=false // Succeed on 1st attempt int succeedOnAttempt = 1; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -341,7 +341,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { // This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false // Succeed on 2nd attempt int succeedOnAttempt = 2; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -365,7 +365,7 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception { public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception { int succeedOnAttempt = 1; int checkpointPublishSucceedOnAttempt = 2; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 6 as during a successful upload IndexShard.getEngine() is hit thrice and here we are running the flow twice @@ -387,7 +387,7 @@ public void testRefreshSuccessOnThirdAttempt() throws Exception { // This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true // Succeed on 3rd attempt int succeedOnAttempt = 3; - // We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation. + // We spy on IndexShard.isPrimaryStarted() to validate that we have tried running remote time as per the expectation. CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt); // We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload. // Value has been set as 3 as during a successful upload IndexShard.getEngine() is hit thrice and with mockito we are counting down @@ -479,6 +479,7 @@ private Tuple mockIn IndexShard shard = mock(IndexShard.class); Store store = mock(Store.class); when(shard.store()).thenReturn(store); + when(shard.state()).thenReturn(IndexShardState.STARTED); when(store.directory()).thenReturn(indexShard.store().directory()); // Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) @@ -500,13 +501,12 @@ private Tuple mockIn when(shard.getThreadPool()).thenReturn(threadPool); // Mock indexShard.getReplicationTracker().isPrimaryMode() - doAnswer(invocation -> { if (Objects.nonNull(refreshCountLatch)) { refreshCountLatch.countDown(); } - return indexShard.getReplicationTracker(); - }).when(shard).getReplicationTracker(); + return true; + }).when(shard).isStartedPrimary(); AtomicLong counter = new AtomicLong(); // Mock indexShard.getSegmentInfosSnapshot() diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index 3cb65610fab58..6bfab278993ed 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -25,6 +25,8 @@ import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lease.Releasables; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -125,7 +127,6 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { private ThreadPool threadPool; private final static String METADATA_DIR = "metadata"; private final static String DATA_DIR = "data"; - AtomicInteger writeCalls = new AtomicInteger(); BlobStoreRepository repository; @@ -133,6 +134,8 @@ public class RemoteFsTranslogTests extends OpenSearchTestCase { TestTranslog.FailSwitch fail; + TestTranslog.SlowDownWriteSwitch slowDown; + private LongConsumer getPersistedSeqNoConsumer() { return seqNo -> { final LongConsumer consumer = persistedSeqNoConsumer.get(); @@ -228,13 +231,15 @@ private BlobStoreRepository createRepository() { final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); fail = new TestTranslog.FailSwitch(); fail.failNever(); + slowDown = new TestTranslog.SlowDownWriteSwitch(); final FsRepository repository = new ThrowingBlobRepository( repositoryMetadata, createEnvironment(), xContentRegistry(), clusterService, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), - fail + fail, + slowDown ) { @Override protected void assertSnapshotOrGenericThread() { @@ -819,6 +824,79 @@ public void testMetadataFileDeletion() throws Exception { } } + public void testDrainSync() throws Exception { + // This test checks following scenarios - + // 1. During ongoing uploads, the available permits are 0. + // 2. During an upload, if drainSync is called, it will wait for it to acquire and available permits are 0. + // 3. After drainSync, if trimUnreferencedReaders is attempted, we do not delete from remote store. + // 4. After drainSync, if an upload is an attempted, we do not upload to remote store. + ArrayList ops = new ArrayList<>(); + assertEquals(0, translog.allUploaded().size()); + assertEquals(1, translog.readers.size()); + + addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(0), 0, primaryTerm.get(), new byte[] { 1 })); + assertEquals(4, translog.allUploaded().size()); + assertEquals(2, translog.readers.size()); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + + translog.setMinSeqNoToKeep(0); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + + // Case 1 - During ongoing uploads, the available permits are 0. + slowDown.setSleepSeconds(2); + CountDownLatch latch = new CountDownLatch(1); + Thread thread1 = new Thread(() -> { + try { + addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(1), 1, primaryTerm.get(), new byte[] { 1 })); + assertEquals(2, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + latch.countDown(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + thread1.start(); + assertBusy(() -> assertEquals(0, translog.availablePermits())); + // Case 2 - During an upload, if drainSync is called, it will wait for it to acquire and available permits are 0. + Releasable releasable = translog.drainSync(); + assertBusy(() -> assertEquals(0, latch.getCount())); + assertEquals(0, translog.availablePermits()); + slowDown.setSleepSeconds(0); + assertEquals(6, translog.allUploaded().size()); + assertEquals(2, translog.readers.size()); + Set mdFiles = blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)); + + // Case 3 - After drainSync, if trimUnreferencedReaders is attempted, we do not delete from remote store. + translog.setMinSeqNoToKeep(1); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + assertEquals(6, translog.allUploaded().size()); + assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); + + // Case 4 - After drainSync, if an upload is an attempted, we do not upload to remote store. + Translog.Location loc = addToTranslogAndListAndUpload( + translog, + ops, + new Translog.Index(String.valueOf(2), 2, primaryTerm.get(), new byte[] { 1 }) + ); + assertEquals(1, translog.readers.size()); + assertEquals(6, translog.allUploaded().size()); + assertEquals(mdFiles, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR))); + + // Refill the permits back + Releasables.close(releasable); + addToTranslogAndListAndUpload(translog, ops, new Translog.Index(String.valueOf(3), 3, primaryTerm.get(), new byte[] { 1 })); + assertEquals(2, translog.readers.size()); + assertEquals(8, translog.allUploaded().size()); + assertEquals(3, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + + translog.setMinSeqNoToKeep(3); + translog.trimUnreferencedReaders(); + assertEquals(1, translog.readers.size()); + assertBusy(() -> assertEquals(4, translog.allUploaded().size())); + assertBusy(() -> assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size())); + } + private BlobPath getTranslogDirectory() { return repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add(TRANSLOG); } @@ -1624,9 +1702,10 @@ public void testDownloadWithRetries() throws IOException { } public class ThrowingBlobRepository extends FsRepository { - private final Environment environment; - private TestTranslog.FailSwitch fail; + private final Environment environment; + private final TestTranslog.FailSwitch fail; + private final TestTranslog.SlowDownWriteSwitch slowDown; public ThrowingBlobRepository( RepositoryMetadata metadata, @@ -1634,33 +1713,43 @@ public ThrowingBlobRepository( NamedXContentRegistry namedXContentRegistry, ClusterService clusterService, RecoverySettings recoverySettings, - TestTranslog.FailSwitch fail + TestTranslog.FailSwitch fail, + TestTranslog.SlowDownWriteSwitch slowDown ) { super(metadata, environment, namedXContentRegistry, clusterService, recoverySettings); this.environment = environment; this.fail = fail; + this.slowDown = slowDown; } protected BlobStore createBlobStore() throws Exception { final String location = REPOSITORIES_LOCATION_SETTING.get(getMetadata().settings()); final Path locationFile = environment.resolveRepoFile(location); - return new ThrowingBlobStore(bufferSize, locationFile, isReadOnly(), fail); + return new ThrowingBlobStore(bufferSize, locationFile, isReadOnly(), fail, slowDown); } } private class ThrowingBlobStore extends FsBlobStore { - private TestTranslog.FailSwitch fail; + private final TestTranslog.FailSwitch fail; + private final TestTranslog.SlowDownWriteSwitch slowDown; - public ThrowingBlobStore(int bufferSizeInBytes, Path path, boolean readonly, TestTranslog.FailSwitch fail) throws IOException { + public ThrowingBlobStore( + int bufferSizeInBytes, + Path path, + boolean readonly, + TestTranslog.FailSwitch fail, + TestTranslog.SlowDownWriteSwitch slowDown + ) throws IOException { super(bufferSizeInBytes, path, readonly); this.fail = fail; + this.slowDown = slowDown; } @Override public BlobContainer blobContainer(BlobPath path) { try { - return new ThrowingBlobContainer(this, path, buildAndCreate(path), fail); + return new ThrowingBlobContainer(this, path, buildAndCreate(path), fail, slowDown); } catch (IOException ex) { throw new OpenSearchException("failed to create blob container", ex); } @@ -1670,17 +1759,33 @@ public BlobContainer blobContainer(BlobPath path) { private class ThrowingBlobContainer extends FsBlobContainer { private TestTranslog.FailSwitch fail; - - public ThrowingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path, TestTranslog.FailSwitch fail) { + private final TestTranslog.SlowDownWriteSwitch slowDown; + + public ThrowingBlobContainer( + FsBlobStore blobStore, + BlobPath blobPath, + Path path, + TestTranslog.FailSwitch fail, + TestTranslog.SlowDownWriteSwitch slowDown + ) { super(blobStore, blobPath, path); this.fail = fail; + this.slowDown = slowDown; } + @Override public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) throws IOException { if (fail.fail()) { throw new IOException("blob container throwing error"); } + if (slowDown.getSleepSeconds() > 0) { + try { + Thread.sleep(slowDown.getSleepSeconds() * 1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } super.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/server/src/test/java/org/opensearch/index/translog/TestTranslog.java b/server/src/test/java/org/opensearch/index/translog/TestTranslog.java index fd4be1d7a8635..01c8844b51b02 100644 --- a/server/src/test/java/org/opensearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/opensearch/index/translog/TestTranslog.java @@ -335,6 +335,18 @@ public void onceFailedFailAlways() { } } + static class SlowDownWriteSwitch { + private volatile int sleepSeconds; + + public void setSleepSeconds(int sleepSeconds) { + this.sleepSeconds = sleepSeconds; + } + + public int getSleepSeconds() { + return sleepSeconds; + } + } + static class SortedSnapshot implements Translog.Snapshot { private final Translog.Snapshot snapshot; private List operations = null;