From 2f57b5b19fc894f0f448c5b706ff76e2c5f12829 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 23 Oct 2023 10:13:23 +0530 Subject: [PATCH 1/5] Sync translog to remote on primary activate Signed-off-by: Sachin Kale --- .../remotestore/RemoteRestoreSnapshotIT.java | 77 +++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 19 ++++- 2 files changed, 93 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 865b2d13f189e..394e86df6903c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -11,6 +11,7 @@ import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.delete.DeleteResponse; @@ -20,8 +21,13 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.index.Index; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.snapshots.SnapshotInfo; @@ -32,11 +38,15 @@ import org.junit.Before; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; @@ -345,6 +355,73 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 4); } + public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + + String indexName1 = "testindex1"; + String snapshotRepoName = "test-restore-snapshot-repo"; + String snapshotName1 = "test-restore-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true)); + + Settings indexSettings = getIndexSettings(1, 0).build(); + createIndex(indexName1, indexSettings); + + final int numDocsInIndex1 = randomIntBetween(20, 30); + indexDocuments(client(), indexName1, numDocsInIndex1); + flushAndRefresh(indexName1); + ensureGreen(indexName1); + + logger.info("--> snapshot"); + SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1))); + assertThat(snapshotInfo1.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards())); + assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS)); + + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get()); + assertFalse(indexExists(indexName1)); + + RestoreSnapshotResponse restoreSnapshotResponse1 = client().admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(false) + .setIndices(indexName1) + .get(); + + assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED); + ensureGreen(indexName1); + assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); + + // Clear the local data before stopping the node. + IndexShard indexShard = getIndexShard(primaryNodeName(indexName1), indexName1); + try (Stream files = Files.list(indexShard.shardPath().resolveTranslog())) { + IOUtils.deleteFilesIgnoringExceptions(files.collect(Collectors.toList())); + } + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(indexName1))); + + ensureRed(indexName1); + + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1).restoreAllShards(false), PlainActionFuture.newFuture()); + + ensureGreen(indexName1); + assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); + } + + protected IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexService(index); + assertNotNull(indexService); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return shardId.map(indexService::getShard).orElse(null); + } + public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); String primary = internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f990a3b56e856..7c187c5bd8143 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -640,7 +640,7 @@ public void updateShardState( if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) { // the cluster-manager started a recovering primary, activate primary mode. replicationTracker.activatePrimaryMode(getLocalCheckpoint()); - ensurePeerRecoveryRetentionLeasesExist(); + postActivatePrimaryMode(); } } else { assert currentRouting.primary() == false : "term is only increased as part of primary promotion"; @@ -711,8 +711,7 @@ public void updateShardState( // are brought up to date. checkpointPublisher.publish(this, getLatestReplicationCheckpoint()); } - - ensurePeerRecoveryRetentionLeasesExist(); + postActivatePrimaryMode(); /* * If this shard was serving as a replica shard when another shard was promoted to primary then * its Lucene index was reset during the primary term transition. In particular, the Lucene index @@ -3393,6 +3392,20 @@ assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingE synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex } + postActivatePrimaryMode(); + } + + private void postActivatePrimaryMode() { + if (indexSettings.isRemoteStoreEnabled()) { + // We make sure to upload translog (even if it does not contain any operations) to remote translog. + // This helps to get a consistent state in remote store where both remote segment store and remote + // translog contains data. + try { + getEngine().translogManager().syncTranslog(); + } catch (IOException e) { + throw new IllegalStateException("Failed to sync translog to remote from new primary", e); + } + } ensurePeerRecoveryRetentionLeasesExist(); } From 6a64edb7002a0ee90b1229f413544e2f23fac2b4 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 23 Oct 2023 13:03:11 +0530 Subject: [PATCH 2/5] Fix failing tests Signed-off-by: Sachin Kale --- .../org/opensearch/remotestore/RemoteRestoreSnapshotIT.java | 2 +- .../test/java/org/opensearch/index/shard/IndexShardTests.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 394e86df6903c..e8a38d8c70b15 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -396,7 +396,7 @@ public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, Exe ensureGreen(indexName1); assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); - // Clear the local data before stopping the node. + // Clear the local data before stopping the node. This will make sure that remote translog is empty. IndexShard indexShard = getIndexShard(primaryNodeName(indexName1), indexName1); try (Stream files = Files.list(indexShard.shardPath().resolveTranslog())) { IOUtils.deleteFilesIgnoringExceptions(files.collect(Collectors.toList())); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 9ef9bec01cb38..fa3cf7676f55c 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2745,6 +2745,7 @@ public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throw AllocationId.newRelocation(routing.allocationId()) ); IndexShardTestCase.updateRoutingEntry(indexShard, routing); + indexDoc(indexShard, "_doc", "0"); assertTrue(indexShard.isSyncNeeded()); try { indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {}); From e0a596a5576fe6c86e1ae63bafc08c0999231060 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 23 Oct 2023 13:22:47 +0530 Subject: [PATCH 3/5] Do not throw exception on failure Signed-off-by: Sachin Kale --- server/src/main/java/org/opensearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7c187c5bd8143..049a0cdc8a1d3 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3403,7 +3403,7 @@ private void postActivatePrimaryMode() { try { getEngine().translogManager().syncTranslog(); } catch (IOException e) { - throw new IllegalStateException("Failed to sync translog to remote from new primary", e); + logger.error("Failed to sync translog to remote from new primary", e); } } ensurePeerRecoveryRetentionLeasesExist(); From 7af013ea8d937abd09dedc6a9a064b09e7b2fb0d Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 23 Oct 2023 15:30:10 +0530 Subject: [PATCH 4/5] Fix failing test Signed-off-by: Sachin Kale --- .../org/opensearch/remotestore/RemoteStoreStatsIT.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 8ae25c6758195..6fdef34645098 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -561,21 +561,23 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce .getRemoteStoreStats(); Arrays.stream(remoteStoreStats).forEach(statObject -> { RemoteSegmentTransferTracker.Stats segmentStats = statObject.getSegmentStats(); + RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats(); if (statObject.getShardRouting().primary()) { assertTrue( segmentStats.totalUploadsSucceeded == 1 && segmentStats.totalUploadsStarted == segmentStats.totalUploadsSucceeded && segmentStats.totalUploadsFailed == 0 ); + // On primary shard creation, we upload to remote translog post primary mode activation. + // This changes upload stats to non-zero for primary shard. + assertNonZeroTranslogUploadStatsNoFailures(translogStats); } else { assertTrue( segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0 && segmentStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0 ); + assertZeroTranslogUploadStats(translogStats); } - - RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats(); - assertZeroTranslogUploadStats(translogStats); assertZeroTranslogDownloadStats(translogStats); }); }, 5, TimeUnit.SECONDS); From 27245ff93e87d0e464968fcfaa41893aef5b629d Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 23 Oct 2023 17:52:56 +0530 Subject: [PATCH 5/5] Address PR comments Signed-off-by: Sachin Kale --- .../remotestore/RemoteRestoreSnapshotIT.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index e8a38d8c70b15..9e0b2a66467de 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -357,8 +357,7 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException { internalCluster().startClusterManagerOnlyNode(); - internalCluster().startDataOnlyNode(); - internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNodes(2); String indexName1 = "testindex1"; String snapshotRepoName = "test-restore-snapshot-repo"; @@ -396,6 +395,24 @@ public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, Exe ensureGreen(indexName1); assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); + // Make sure remote translog is empty + String indexUUID = client().admin() + .indices() + .prepareGetSettings(indexName1) + .get() + .getSetting(indexName1, IndexMetadata.SETTING_INDEX_UUID); + + Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata"); + Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data"); + + try ( + Stream translogMetadata = Files.list(remoteTranslogMetadataPath); + Stream translogData = Files.list(remoteTranslogDataPath) + ) { + assertTrue(translogData.count() > 0); + assertTrue(translogMetadata.count() > 0); + } + // Clear the local data before stopping the node. This will make sure that remote translog is empty. IndexShard indexShard = getIndexShard(primaryNodeName(indexName1), indexName1); try (Stream files = Files.list(indexShard.shardPath().resolveTranslog())) {