Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to create empty local translog if remote translog is empty #10854

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -234,6 +236,77 @@ public void testRestoreFlowNoRedIndex() throws Exception {
verifyRestoredData(indexStats, INDEX_NAME);
}

/**
 * Restores an index with {@code forceEmptyTranslog=true} while the remote translog still holds data.
 * The node hosting the primary is stopped, the red index is closed and restored, and the test then
 * expects the hit count to equal the {@code TOTAL_OPERATIONS} stat, i.e. the flag had no effect.
 */
public void testRestoreFlowWithForceEmptyTranslogNoOp() throws Exception {
    prepareCluster(1, 3, INDEX_NAME, 0, 1);
    Map<String, Long> indexStats = indexData(randomIntBetween(2, 3), randomBoolean(), INDEX_NAME);

    assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(REFRESHED_OR_FLUSHED_OPERATIONS));

    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
    ensureRed(INDEX_NAME);

    // Do not delete data from remote translog. If data is present in remote translog, forceEmptyTranslog is ignored

    logger.info("--> Restore with forceEmptyTranslog, should have no effect as remote translog has data");
    assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
    RestoreRemoteStoreRequest restoreRequest = new RestoreRemoteStoreRequest().indices(INDEX_NAME)
        .restoreAllShards(true)
        .forceEmptyTranslog(true);
    // The future is not awaited here; completion of the restore is observed through the health checks below.
    client().admin().cluster().restoreRemoteStore(restoreRequest, PlainActionFuture.newFuture());

    // One yellow -> green wait sequence is sufficient; an extra ensureGreen before it only repeated the same wait.
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    ensureGreen(INDEX_NAME);
    refresh(INDEX_NAME);
    assertBusy(
        () -> assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)),
        30,
        TimeUnit.SECONDS
    );
}

/**
 * Restores an index with {@code forceEmptyTranslog=true} after the remote translog has been wiped.
 * The node hosting the primary is stopped, the {@code <indexUUID>/0/translog/data} and
 * {@code <indexUUID>/0/translog/metadata} paths under {@code translogRepoPath} are removed, and the
 * closed index is restored. The test then expects the index to turn green and the hit count to equal
 * the {@code REFRESHED_OR_FLUSHED_OPERATIONS} stat.
 */
public void testRestoreFlowWithForceEmptyTranslog() throws Exception {
    prepareCluster(1, 3, INDEX_NAME, 0, 1);
    Map<String, Long> indexStats = indexData(randomIntBetween(2, 3), randomBoolean(), INDEX_NAME);

    assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(REFRESHED_OR_FLUSHED_OPERATIONS));

    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
    ensureRed(INDEX_NAME);

    // Delete data from remote translog so that forceEmptyTranslog can take effect
    String indexUUID = client().admin()
        .indices()
        .prepareGetSettings(INDEX_NAME)
        .get()
        .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

    Path remoteTranslogMetadataPath = Path.of(String.valueOf(translogRepoPath), indexUUID, "/0/translog/metadata");
    Path remoteTranslogDataPath = Path.of(String.valueOf(translogRepoPath), indexUUID, "/0/translog/data");
    IOUtils.rm(remoteTranslogDataPath, remoteTranslogMetadataPath);

    logger.info("--> Restore with forceEmptyTranslog, should turn the index green");
    assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
    RestoreRemoteStoreRequest restoreRequest = new RestoreRemoteStoreRequest().indices(INDEX_NAME)
        .restoreAllShards(true)
        .forceEmptyTranslog(true);
    // The future is not awaited here; completion of the restore is observed through the health checks below.
    client().admin().cluster().restoreRemoteStore(restoreRequest, PlainActionFuture.newFuture());

    // One yellow -> green wait sequence is sufficient; an extra ensureGreen before it only repeated the same wait.
    ensureYellowAndNoInitializingShards(INDEX_NAME);
    ensureGreen(INDEX_NAME);
    refresh(INDEX_NAME);
    assertBusy(
        () -> assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(REFRESHED_OR_FLUSHED_OPERATIONS)),
        30,
        TimeUnit.SECONDS
    );
}

/**
* Simulates refreshed data restored using Remote Segment Store
* and unrefreshed data restored using Remote Translog Store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
Expand Down Expand Up @@ -37,6 +38,7 @@ public class RestoreRemoteStoreRequest extends ClusterManagerNodeRequest<Restore
private String[] indices = Strings.EMPTY_ARRAY;
private Boolean waitForCompletion = false;
private Boolean restoreAllShards = false;
private Boolean forceEmptyTranslog = false;

public RestoreRemoteStoreRequest() {}

Expand All @@ -45,6 +47,9 @@ public RestoreRemoteStoreRequest(StreamInput in) throws IOException {
indices = in.readStringArray();
waitForCompletion = in.readOptionalBoolean();
restoreAllShards = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
forceEmptyTranslog = in.readOptionalBoolean();
}
}

@Override
Expand All @@ -53,6 +58,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
out.writeOptionalBoolean(waitForCompletion);
out.writeOptionalBoolean(restoreAllShards);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(forceEmptyTranslog);
}
}

@Override
Expand Down Expand Up @@ -142,6 +150,27 @@ public boolean restoreAllShards() {
return restoreAllShards;
}

/**
 * Controls whether an empty local translog may be created when the remote translog holds no data.
 *
 * @param forceEmptyTranslog {@code true} to create an empty translog locally if the remote translog is empty;
 *                           {@code false} to always fetch from the remote translog, failing the operation
 *                           when the remote translog is empty
 * @return this request, to allow call chaining
 */
public RestoreRemoteStoreRequest forceEmptyTranslog(boolean forceEmptyTranslog) {
    this.forceEmptyTranslog = forceEmptyTranslog;
    return this;
}

/**
 * Reports the current forceEmptyTranslog setting of this request.
 *
 * @return {@code true} if an empty local translog will be created when the remote translog is empty
 */
public boolean forceEmptyTranslog() {
    return forceEmptyTranslog;
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -193,12 +222,13 @@ public boolean equals(Object o) {
RestoreRemoteStoreRequest that = (RestoreRemoteStoreRequest) o;
return waitForCompletion == that.waitForCompletion
&& restoreAllShards == that.restoreAllShards
&& forceEmptyTranslog == that.forceEmptyTranslog
&& Arrays.equals(indices, that.indices);
}

/**
 * Hashes the three flags ({@code waitForCompletion}, {@code restoreAllShards}, {@code forceEmptyTranslog})
 * together with the contents of {@code indices}.
 */
@Override
public int hashCode() {
    // Only the declaration that includes forceEmptyTranslog is kept; the older two-field
    // declaration of the same local would clash with it.
    int result = Objects.hash(waitForCompletion, restoreAllShards, forceEmptyTranslog);
    result = 31 * result + Arrays.hashCode(indices);
    return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,17 +419,28 @@ public static class RemoteStoreRecoverySource extends RecoverySource {
private final String restoreUUID;
private final IndexId index;
private final Version version;
private final boolean forceEmptyTranslog;

/**
 * Creates a recovery source with {@code forceEmptyTranslog} set to {@code false}.
 *
 * @param restoreUUID identifier of the restore operation
 * @param version     version associated with this recovery source
 * @param indexId     index being restored
 */
public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId) {
    this(restoreUUID, version, indexId, false);
}

/**
 * Creates a recovery source for a remote store restore.
 *
 * @param restoreUUID        identifier of the restore operation
 * @param version            version associated with this recovery source; must not be {@code null}
 * @param indexId            index being restored; must not be {@code null}
 * @param forceEmptyTranslog value later exposed through {@link #forceEmptyTranslog()}
 */
public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId, boolean forceEmptyTranslog) {
    this.restoreUUID = restoreUUID;
    // Null checks run in the same order as before: version first, then indexId.
    this.version = Objects.requireNonNull(version);
    this.index = Objects.requireNonNull(indexId);
    this.forceEmptyTranslog = forceEmptyTranslog;
}

/**
 * Reads a recovery source from the stream: restoreUUID, version and index id, followed by the
 * forceEmptyTranslog flag when the stream version is on or after {@code Version.V_3_0_0}.
 * For older stream versions the flag is not read and defaults to {@code false}.
 */
RemoteStoreRecoverySource(StreamInput in) throws IOException {
    restoreUUID = in.readString();
    version = in.readVersion();
    index = new IndexId(in);
    // Short-circuit keeps the boolean unread for streams older than 3.0.0.
    forceEmptyTranslog = in.getVersion().onOrAfter(Version.V_3_0_0) && in.readBoolean();
}

public String restoreUUID() {
Expand All @@ -450,11 +461,18 @@ public Version version() {
return version;
}

/**
 * @return the forceEmptyTranslog flag this recovery source was created with
 */
public boolean forceEmptyTranslog() {
    return forceEmptyTranslog;
}

/**
 * Writes restoreUUID, version and index id to the stream, followed by the forceEmptyTranslog flag
 * when the stream version is on or after {@code Version.V_3_0_0}.
 */
@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
    out.writeString(restoreUUID);
    out.writeVersion(version);
    index.writeTo(out);
    if (out.getVersion().onOrAfter(Version.V_3_0_0) == false) {
        // Streams older than 3.0.0 do not carry the flag.
        return;
    }
    out.writeBoolean(forceEmptyTranslog);
}

@Override
Expand All @@ -464,7 +482,10 @@ public Type getType() {

/**
 * Adds the {@code version}, {@code index}, {@code restoreUUID} and {@code forceEmptyTranslog}
 * fields of this recovery source to the builder.
 */
@Override
public void addAdditionalFields(XContentBuilder builder, ToXContent.Params params) throws IOException {
    // Each field is written exactly once; the older three-field statement is dropped so that
    // version/index/restoreUUID are not emitted a second time.
    builder.field("version", version.toString())
        .field("index", index.getName())
        .field("restoreUUID", restoreUUID)
        .field("forceEmptyTranslog", forceEmptyTranslog);
}

@Override
Expand All @@ -482,12 +503,15 @@ public boolean equals(Object o) {
}

RemoteStoreRecoverySource that = (RemoteStoreRecoverySource) o;
return restoreUUID.equals(that.restoreUUID) && index.equals(that.index) && version.equals(that.version);
return restoreUUID.equals(that.restoreUUID)
&& index.equals(that.index)
&& version.equals(that.version)
&& forceEmptyTranslog == that.forceEmptyTranslog;
}

/**
 * Hashes {@code restoreUUID}, {@code index}, {@code version} and {@code forceEmptyTranslog}.
 */
@Override
public int hashCode() {
    // Single return that includes forceEmptyTranslog; an earlier three-field return ahead of it
    // would make this statement unreachable.
    return Objects.hash(restoreUUID, index, version, forceEmptyTranslog);
}

// TODO: This override should be removed/be updated to return "true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void start(
ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(),
lastKnownClusterUUID,
false,
false,
new String[] {}
);
clusterState = remoteRestoreResult.getClusterState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ public void restore(RestoreRemoteStoreRequest request, final ActionListener<Rest

@Override
public ClusterState execute(ClusterState currentState) {
RemoteRestoreResult remoteRestoreResult = restore(currentState, null, request.restoreAllShards(), request.indices());
RemoteRestoreResult remoteRestoreResult = restore(
currentState,
null,
request.restoreAllShards(),
request.forceEmptyTranslog(),
request.indices()
);
restoreUUID = remoteRestoreResult.getRestoreUUID();
restoreInfo = remoteRestoreResult.getRestoreInfo();
return remoteRestoreResult.getClusterState();
Expand Down Expand Up @@ -135,6 +141,7 @@ public RemoteRestoreResult restore(
ClusterState currentState,
@Nullable String restoreClusterUUID,
boolean restoreAllShards,
boolean forceEmptyTranslog,
String[] indexNames
) {
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap = new HashMap<>();
Expand Down Expand Up @@ -176,7 +183,7 @@ public RemoteRestoreResult restore(
}
}
}
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata);
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata, forceEmptyTranslog);
}

/**
Expand All @@ -190,7 +197,8 @@ private RemoteRestoreResult executeRestore(
ClusterState currentState,
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap,
boolean restoreAllShards,
Metadata remoteMetadata
Metadata remoteMetadata,
boolean forceEmptyTranslog
) {
final String restoreUUID = UUIDs.randomBase64UUID();
List<String> indicesToBeRestored = new ArrayList<>();
Expand Down Expand Up @@ -228,7 +236,8 @@ private RemoteRestoreResult executeRestore(
RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
indexId,
forceEmptyTranslog
);

rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
Expand Down Expand Up @@ -406,7 +407,7 @@ void recoverFromSnapshotAndRemoteStore(
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
bootstrap(indexShard, store);
} else {
bootstrapForSnapshot(indexShard, store);
bootstrapFromLastCommit(indexShard, store);
}
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
Expand Down Expand Up @@ -538,16 +539,24 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
indexShard.syncSegmentsFromRemoteSegmentStore(true);
indexShard.syncTranslogFilesFromRemoteTranslog();

Path location = indexShard.shardPath().resolveTranslog();

// On index creation, the only segment file that is created is segments_N. We can safely discard this file
// as there is no data associated with this shard as part of segments.
if (store.directory().listAll().length <= 1) {
Path location = indexShard.shardPath().resolveTranslog();
boolean remoteSegmentEmpty = store.directory().listAll().length <= 1;
boolean remoteTranslogEmpty = Files.exists(location.resolve(CHECKPOINT_FILE_NAME)) == false;

if (remoteSegmentEmpty && remoteTranslogEmpty == false) {
Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
final Path translogFile = location.resolve(Translog.getFilename(checkpoint.getGeneration()));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
TranslogHeader translogHeader = TranslogHeader.read(translogFile, channel);
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion, translogHeader.getTranslogUUID());
}
} else if (remoteSegmentEmpty == false && remoteTranslogEmpty) {
if (((RecoverySource.RemoteStoreRecoverySource) indexShard.shardRouting.recoverySource()).forceEmptyTranslog()) {
bootstrapFromLastCommit(indexShard, store);
}
}

assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
Expand Down Expand Up @@ -686,7 +695,7 @@ private void restore(
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
bootstrap(indexShard, store);
} else {
bootstrapForSnapshot(indexShard, store);
bootstrapFromLastCommit(indexShard, store);
}
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
Expand Down Expand Up @@ -738,7 +747,7 @@ private void restore(
}
}

private void bootstrapForSnapshot(final IndexShard indexShard, final Store store) throws IOException {
private void bootstrapFromLastCommit(final IndexShard indexShard, final Store store) throws IOException {
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
);
restoreRemoteStoreRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));
restoreRemoteStoreRequest.restoreAllShards(request.paramAsBoolean("restore_all_shards", false));
restoreRemoteStoreRequest.forceEmptyTranslog(request.paramAsBoolean("force_empty_translog", false));
request.applyContentParser(p -> restoreRemoteStoreRequest.source(p.mapOrdered()));
return channel -> client.admin().cluster().restoreRemoteStore(restoreRemoteStoreRequest, new RestToXContentListener<>(channel));
}
Expand Down
Loading
Loading