From e38e455c0a09263925a7d4af6b707dec2ff48220 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 23 Oct 2023 10:13:23 +0530 Subject: [PATCH] Sync translog to remote on primary activate Signed-off-by: Sachin Kale --- .../remotestore/RemoteRestoreSnapshotIT.java | 77 +++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 19 ++++- 2 files changed, 93 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 865b2d13f189e..394e86df6903c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -11,6 +11,7 @@ import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.delete.DeleteResponse; @@ -20,8 +21,13 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.index.Index; import org.opensearch.core.rest.RestStatus; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.snapshots.SnapshotInfo; @@ -32,11 +38,15 @@ import org.junit.Before; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; @@ -345,6 +355,73 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException { assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 4); } + public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); + + String indexName1 = "testindex1"; + String snapshotRepoName = "test-restore-snapshot-repo"; + String snapshotName1 = "test-restore-snapshot1"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true)); + + Settings indexSettings = getIndexSettings(1, 0).build(); + createIndex(indexName1, indexSettings); + + final int numDocsInIndex1 = randomIntBetween(20, 30); + indexDocuments(client(), indexName1, numDocsInIndex1); + flushAndRefresh(indexName1); + ensureGreen(indexName1); + + logger.info("--> snapshot"); + SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1))); + assertThat(snapshotInfo1.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards())); + assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS)); + + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get()); + assertFalse(indexExists(indexName1)); + + RestoreSnapshotResponse restoreSnapshotResponse1 = client().admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepoName, snapshotName1) + .setWaitForCompletion(false) + .setIndices(indexName1) + .get(); + + assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED); + ensureGreen(indexName1); + assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); + + // Clear the local data before stopping the node. + IndexShard indexShard = getIndexShard(primaryNodeName(indexName1), indexName1); + try (Stream files = Files.list(indexShard.shardPath().resolveTranslog())) { + IOUtils.deleteFilesIgnoringExceptions(files.collect(Collectors.toList())); + } + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(indexName1))); + + ensureRed(indexName1); + + client().admin() + .cluster() + .restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1).restoreAllShards(false), PlainActionFuture.newFuture()); + + ensureGreen(indexName1); + assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1); + } + + protected IndexShard getIndexShard(String node, String indexName) { + final Index index = resolveIndex(indexName); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexService(index); + assertNotNull(indexService); + final Optional shardId = indexService.shardIds().stream().findFirst(); + return shardId.map(indexService::getShard).orElse(null); + } + public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); String primary = internalCluster().startDataOnlyNode(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index f990a3b56e856..7c187c5bd8143 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -640,7 +640,7 @@ public void updateShardState( if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) { // the cluster-manager started a recovering primary, activate primary mode. replicationTracker.activatePrimaryMode(getLocalCheckpoint()); - ensurePeerRecoveryRetentionLeasesExist(); + postActivatePrimaryMode(); } } else { assert currentRouting.primary() == false : "term is only increased as part of primary promotion"; @@ -711,8 +711,7 @@ public void updateShardState( // are brought up to date. checkpointPublisher.publish(this, getLatestReplicationCheckpoint()); } - - ensurePeerRecoveryRetentionLeasesExist(); + postActivatePrimaryMode(); /* * If this shard was serving as a replica shard when another shard was promoted to primary then * its Lucene index was reset during the primary term transition. In particular, the Lucene index @@ -3393,6 +3392,20 @@ assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingE synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex } + postActivatePrimaryMode(); + } + + private void postActivatePrimaryMode() { + if (indexSettings.isRemoteStoreEnabled()) { + // We make sure to upload translog (even if it does not contain any operations) to remote translog. + // This helps to get a consistent state in remote store where both remote segment store and remote + // translog contains data. + try { + getEngine().translogManager().syncTranslog(); + } catch (IOException e) { + throw new IllegalStateException("Failed to sync translog to remote from new primary", e); + } + } ensurePeerRecoveryRetentionLeasesExist(); }