From 4a0112161228efa9597a03f83df1de8512bc3cef Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 13 Sep 2023 10:37:36 +0530 Subject: [PATCH] Always create empty translog on replica for remote store enabled index Signed-off-by: Sachin Kale --- .../opensearch/remotestore/RemoteStoreIT.java | 70 +++++++++++++++++++ .../opensearch/index/shard/IndexShard.java | 34 ++++++--- .../index/translog/TranslogHeader.java | 6 +- 3 files changed, 99 insertions(+), 11 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index bd019693f01ff..165aa1e8347ad 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexRequest; import org.opensearch.action.admin.indices.get.GetIndexResponse; @@ -33,16 +34,20 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP; import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.comparesEqualTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.oneOf; @@ -346,4 +351,69 @@ private void clearClusterBufferIntervalSetting(String clusterManagerName) { .setTransientSettings(Settings.builder().putNull(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey())) .get(); } + + public void testAnotherUUID() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + List dataNodes = internalCluster().startDataOnlyNodes(2); + + Path absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository("test-repo").setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + + logger.info("--> Create index and ingest 50 docs"); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); + indexBulk(INDEX_NAME, 50); + flushAndRefresh(INDEX_NAME); + + String originalIndexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + assertNotNull(originalIndexUUID); + assertNotEquals(IndexMetadata.INDEX_UUID_NA_VALUE, originalIndexUUID); + + ensureGreen(); + + logger.info("--> take a snapshot"); + client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setIndices(INDEX_NAME).setWaitForCompletion(true).get(); + + logger.info("--> wipe all indices"); + cluster().wipeIndices(INDEX_NAME); + + logger.info("--> Create index with the same name, different UUID"); + assertAcked( + prepareCreate(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + ensureGreen(TimeValue.timeValueSeconds(30), INDEX_NAME); + + String newIndexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + assertNotNull(newIndexUUID); + assertNotEquals(IndexMetadata.INDEX_UUID_NA_VALUE, newIndexUUID); + assertNotEquals(newIndexUUID, originalIndexUUID); + + logger.info("--> close index"); + client().admin().indices().prepareClose(INDEX_NAME).get(); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap") + .setWaitForCompletion(true) + .execute() + .actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + flushAndRefresh(INDEX_NAME); + + ensureGreen(INDEX_NAME); + assertBusy(() -> { + assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + }); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 71df5467ada7b..f14465a55a80e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -197,6 +197,7 @@ import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.NoSuchFileException; import java.util.ArrayList; @@ -2363,16 +2364,29 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) { syncSegmentsFromRemoteSegmentStore(false); } - if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { - if (syncFromRemote) { - syncRemoteTranslogAndUpdateGlobalCheckpoint(); - } else { - // we will enter this block when we do not want to recover from remote translog. - // currently only during snapshot restore, we are coming into this block. - // here, as while initiliazing remote translog we cannot skip downloading translog files, - // so before that step, we are deleting the translog files present in remote store. - deleteTranslogFilesFromRemoteTranslog(); - + if (indexSettings.isRemoteTranslogStoreEnabled()) { + if (shardRouting.primary()) { + if (syncFromRemote) { + syncRemoteTranslogAndUpdateGlobalCheckpoint(); + } else { + // we will enter this block when we do not want to recover from remote translog. + // currently only during snapshot restore, we are coming into this block. + // here, as while initiliazing remote translog we cannot skip downloading translog files, + // so before that step, we are deleting the translog files present in remote store. + deleteTranslogFilesFromRemoteTranslog(); + } + } else if (syncFromRemote) { + final SegmentInfos lastCommittedSegmentInfos = store().readLastCommittedSegmentsInfo(); + final String translogUUID = lastCommittedSegmentInfos.userData.get(TRANSLOG_UUID_KEY); + final long checkpoint = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + Translog.createEmptyTranslog( + shardPath().resolveTranslog(), + shardId(), + checkpoint, + getPendingPrimaryTerm(), + translogUUID, + FileChannel::open + ); } } // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java index 42bda11d75783..7b5be9505f27a 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogHeader.java @@ -147,7 +147,11 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil if (actualUUID.bytesEquals(expectedUUID) == false) { throw new TranslogCorruptedException( path.toString(), - "expected shard UUID " + expectedUUID + " but got: " + actualUUID + " this translog file belongs to a different translog" + "expected shard UUID " + + translogUUID + + " but got: " + + translogHeader.translogUUID + + " this translog file belongs to a different translog" ); } return translogHeader;