Skip to content

Commit

Permalink
Always create empty translog on replica for remote store enabled index
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Sep 13, 2023
1 parent bf3d22a commit 4a01121
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
Expand All @@ -33,16 +34,20 @@
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;

Expand Down Expand Up @@ -346,4 +351,69 @@ private void clearClusterBufferIntervalSetting(String clusterManagerName) {
.setTransientSettings(Settings.builder().putNull(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey()))
.get();
}

public void testAnotherUUID() throws Exception {
internalCluster().startClusterManagerOnlyNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);

Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository("test-repo").setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);

logger.info("--> Create index and ingest 50 docs");
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));
indexBulk(INDEX_NAME, 50);
flushAndRefresh(INDEX_NAME);

String originalIndexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
assertNotNull(originalIndexUUID);
assertNotEquals(IndexMetadata.INDEX_UUID_NA_VALUE, originalIndexUUID);

ensureGreen();

logger.info("--> take a snapshot");
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setIndices(INDEX_NAME).setWaitForCompletion(true).get();

logger.info("--> wipe all indices");
cluster().wipeIndices(INDEX_NAME);

logger.info("--> Create index with the same name, different UUID");
assertAcked(
prepareCreate(INDEX_NAME).setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1))
);

ensureGreen(TimeValue.timeValueSeconds(30), INDEX_NAME);

String newIndexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
assertNotNull(newIndexUUID);
assertNotEquals(IndexMetadata.INDEX_UUID_NA_VALUE, newIndexUUID);
assertNotEquals(newIndexUUID, originalIndexUUID);

logger.info("--> close index");
client().admin().indices().prepareClose(INDEX_NAME).get();

logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.execute()
.actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));

flushAndRefresh(INDEX_NAME);

ensureGreen(INDEX_NAME);
assertBusy(() -> {
assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}
}
34 changes: 24 additions & 10 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
Expand Down Expand Up @@ -2363,16 +2364,29 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b
if (indexSettings.isRemoteStoreEnabled() && syncFromRemote) {
syncSegmentsFromRemoteSegmentStore(false);
}
if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initiliazing remote translog we cannot skip downloading translog files,
// so before that step, we are deleting the translog files present in remote store.
deleteTranslogFilesFromRemoteTranslog();

if (indexSettings.isRemoteTranslogStoreEnabled()) {
if (shardRouting.primary()) {
if (syncFromRemote) {
syncRemoteTranslogAndUpdateGlobalCheckpoint();
} else {
// we will enter this block when we do not want to recover from remote translog.
// currently only during snapshot restore, we are coming into this block.
// here, as while initiliazing remote translog we cannot skip downloading translog files,
// so before that step, we are deleting the translog files present in remote store.
deleteTranslogFilesFromRemoteTranslog();
}
} else if (syncFromRemote) {
final SegmentInfos lastCommittedSegmentInfos = store().readLastCommittedSegmentsInfo();
final String translogUUID = lastCommittedSegmentInfos.userData.get(TRANSLOG_UUID_KEY);
final long checkpoint = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
Translog.createEmptyTranslog(
shardPath().resolveTranslog(),
shardId(),
checkpoint,
getPendingPrimaryTerm(),
translogUUID,
FileChannel::open
);
}
}
// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil
if (actualUUID.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
path.toString(),
"expected shard UUID " + expectedUUID + " but got: " + actualUUID + " this translog file belongs to a different translog"
"expected shard UUID "
+ translogUUID
+ " but got: "
+ translogHeader.translogUUID
+ " this translog file belongs to a different translog"
);
}
return translogHeader;
Expand Down

0 comments on commit 4a01121

Please sign in to comment.