Skip to content

Commit

Permalink
Add support to create empty local translog if remote translog is empty
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Oct 23, 2023
1 parent 08ffd35 commit 3f1d4a4
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
Expand Down Expand Up @@ -37,6 +38,7 @@ public class RestoreRemoteStoreRequest extends ClusterManagerNodeRequest<Restore
private String[] indices = Strings.EMPTY_ARRAY;
private Boolean waitForCompletion = false;
private Boolean restoreAllShards = false;
private Boolean forceEmptyTranslog = false;

public RestoreRemoteStoreRequest() {}

Expand All @@ -45,6 +47,9 @@ public RestoreRemoteStoreRequest(StreamInput in) throws IOException {
indices = in.readStringArray();
waitForCompletion = in.readOptionalBoolean();
restoreAllShards = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
forceEmptyTranslog = in.readOptionalBoolean();
}
}

@Override
Expand All @@ -53,6 +58,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(indices);
out.writeOptionalBoolean(waitForCompletion);
out.writeOptionalBoolean(restoreAllShards);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(forceEmptyTranslog);
}
}

@Override
Expand Down Expand Up @@ -142,6 +150,27 @@ public boolean restoreAllShards() {
return restoreAllShards;
}

/**
* Set the value for forceEmptyTranslog, denoting whether to create empty translog if remote translog does not have any data.
*
* @param forceEmptyTranslog If true and if remote translog does not have any data, the operation will create empty translog on local
* If false, the operation will always try to fetch data from remote translog and will fail if remote translog is empty.
* @return this request
*/
public RestoreRemoteStoreRequest forceEmptyTranslog(boolean forceEmptyTranslog) {
this.forceEmptyTranslog = forceEmptyTranslog;
return this;
}

/**
* Returns forceEmptyTranslog setting
*
* @return true if the operation will create empty translog on local when remote translog is empty
*/
public boolean forceEmptyTranslog() {
return forceEmptyTranslog;
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -193,12 +222,13 @@ public boolean equals(Object o) {
RestoreRemoteStoreRequest that = (RestoreRemoteStoreRequest) o;
return waitForCompletion == that.waitForCompletion
&& restoreAllShards == that.restoreAllShards
&& forceEmptyTranslog == that.forceEmptyTranslog
&& Arrays.equals(indices, that.indices);
}

@Override
public int hashCode() {
int result = Objects.hash(waitForCompletion, restoreAllShards);
int result = Objects.hash(waitForCompletion, restoreAllShards, forceEmptyTranslog);
result = 31 * result + Arrays.hashCode(indices);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,17 +419,28 @@ public static class RemoteStoreRecoverySource extends RecoverySource {
private final String restoreUUID;
private final IndexId index;
private final Version version;
private final boolean forceEmptyTranslog;

public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId) {
this(restoreUUID, version, indexId, false);
}

public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId, boolean forceEmptyTranslog) {
this.restoreUUID = restoreUUID;
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(indexId);
this.forceEmptyTranslog = forceEmptyTranslog;
}

RemoteStoreRecoverySource(StreamInput in) throws IOException {
restoreUUID = in.readString();
version = in.readVersion();
index = new IndexId(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
forceEmptyTranslog = in.readBoolean();
} else {
forceEmptyTranslog = false;
}
}

public String restoreUUID() {
Expand All @@ -450,11 +461,18 @@ public Version version() {
return version;
}

public boolean forceEmptyTranslog() {
return forceEmptyTranslog;
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
out.writeVersion(version);
index.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(forceEmptyTranslog);
}
}

@Override
Expand All @@ -464,7 +482,10 @@ public Type getType() {

@Override
public void addAdditionalFields(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("version", version.toString()).field("index", index.getName()).field("restoreUUID", restoreUUID);
builder.field("version", version.toString())
.field("index", index.getName())
.field("restoreUUID", restoreUUID)
.field("forceEmptyTranslog", forceEmptyTranslog);
}

@Override
Expand All @@ -482,12 +503,15 @@ public boolean equals(Object o) {
}

RemoteStoreRecoverySource that = (RemoteStoreRecoverySource) o;
return restoreUUID.equals(that.restoreUUID) && index.equals(that.index) && version.equals(that.version);
return restoreUUID.equals(that.restoreUUID)
&& index.equals(that.index)
&& version.equals(that.version)
&& forceEmptyTranslog == that.forceEmptyTranslog;
}

@Override
public int hashCode() {
return Objects.hash(restoreUUID, index, version);
return Objects.hash(restoreUUID, index, version, forceEmptyTranslog);
}

// TODO: This override should be removed/be updated to return "true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void start(
ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(),
lastKnownClusterUUID,
false,
false,
new String[] {}
);
clusterState = remoteRestoreResult.getClusterState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ public void restore(RestoreRemoteStoreRequest request, final ActionListener<Rest

@Override
public ClusterState execute(ClusterState currentState) {
RemoteRestoreResult remoteRestoreResult = restore(currentState, null, request.restoreAllShards(), request.indices());
RemoteRestoreResult remoteRestoreResult = restore(
currentState,
null,
request.restoreAllShards(),
request.forceEmptyTranslog(),
request.indices()
);
restoreUUID = remoteRestoreResult.getRestoreUUID();
restoreInfo = remoteRestoreResult.getRestoreInfo();
return remoteRestoreResult.getClusterState();
Expand Down Expand Up @@ -135,6 +141,7 @@ public RemoteRestoreResult restore(
ClusterState currentState,
@Nullable String restoreClusterUUID,
boolean restoreAllShards,
boolean forceEmptyTranslog,
String[] indexNames
) {
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap = new HashMap<>();
Expand Down Expand Up @@ -176,7 +183,7 @@ public RemoteRestoreResult restore(
}
}
}
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata);
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata, forceEmptyTranslog);
}

/**
Expand All @@ -190,7 +197,8 @@ private RemoteRestoreResult executeRestore(
ClusterState currentState,
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap,
boolean restoreAllShards,
Metadata remoteMetadata
Metadata remoteMetadata,
boolean forceEmptyTranslog
) {
final String restoreUUID = UUIDs.randomBase64UUID();
List<String> indicesToBeRestored = new ArrayList<>();
Expand Down Expand Up @@ -228,7 +236,8 @@ private RemoteRestoreResult executeRestore(
RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
indexId,
forceEmptyTranslog
);

rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards);
Expand Down
13 changes: 11 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
Expand Down Expand Up @@ -538,16 +539,24 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
indexShard.syncSegmentsFromRemoteSegmentStore(true);
indexShard.syncTranslogFilesFromRemoteTranslog();

Path location = indexShard.shardPath().resolveTranslog();

// On index creation, the only segment file that is created is segments_N. We can safely discard this file
// as there is no data associated with this shard as part of segments.
if (store.directory().listAll().length <= 1) {
Path location = indexShard.shardPath().resolveTranslog();
boolean remoteSegmentEmpty = store.directory().listAll().length <= 1;
boolean remoteTranslogEmpty = Files.exists(location.resolve(CHECKPOINT_FILE_NAME)) == false;

if (remoteSegmentEmpty && remoteTranslogEmpty == false) {
Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
final Path translogFile = location.resolve(Translog.getFilename(checkpoint.getGeneration()));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
TranslogHeader translogHeader = TranslogHeader.read(translogFile, channel);
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion, translogHeader.getTranslogUUID());
}
} else if (remoteSegmentEmpty == false && remoteTranslogEmpty) {
if (((RecoverySource.RemoteStoreRecoverySource) indexShard.shardRouting.recoverySource()).forceEmptyTranslog()) {
bootstrap(indexShard, store);
}
}

assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ public void testGatewayForRemoteState() throws IOException {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster")).thenReturn("test-cluster-uuid");
RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenReturn(
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), anyBoolean(), any())).thenReturn(
RemoteRestoreResult.build("test-cluster-uuid", null, ClusterState.EMPTY_STATE)
);
gateway = new MockGatewayMetaState(localNode, bigArrays, remoteClusterStateService, remoteStoreRestoreService);
Expand Down Expand Up @@ -832,7 +832,7 @@ public void testGatewayForRemoteStateForInitialBootstrap() throws IOException {
when(remoteClusterStateService.getLastKnownUUIDFromRemote(clusterName.value())).thenReturn(ClusterState.UNKNOWN_UUID);

final RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenReturn(
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), anyBoolean(), any())).thenReturn(
RemoteRestoreResult.build("test-cluster-uuid", null, ClusterState.EMPTY_STATE)
);
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
Expand Down Expand Up @@ -879,7 +879,7 @@ public void testGatewayForRemoteStateForNodeReplacement() throws IOException {
);

final RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenReturn(
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), anyBoolean(), any())).thenReturn(
RemoteRestoreResult.build("test-cluster-uuid", null, previousState)
);
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
Expand All @@ -893,7 +893,7 @@ public void testGatewayForRemoteStateForNodeReplacement() throws IOException {
final CoordinationState.PersistedState lucenePersistedState = gateway.getPersistedState();
PersistedState remotePersistedState = persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE);
verify(remoteClusterStateService).getLastKnownUUIDFromRemote(Mockito.any());
verify(remoteStoreRestoreService).restore(any(), any(), anyBoolean(), any());
verify(remoteStoreRestoreService).restore(any(), any(), anyBoolean(), anyBoolean(), any());
assertThat(remotePersistedState.getLastAcceptedState(), nullValue());
assertThat(lucenePersistedState.getLastAcceptedState().metadata(), equalTo(previousState.metadata()));
} finally {
Expand Down Expand Up @@ -969,7 +969,7 @@ public void testGatewayForRemoteStateForInitialBootstrapBlocksApplied() throws I
).nodes(DiscoveryNodes.EMPTY_NODES).build();

final RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenReturn(
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), anyBoolean(), any())).thenReturn(
RemoteRestoreResult.build("test-cluster-uuid", null, clusterState)
);
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
Expand All @@ -983,7 +983,13 @@ public void testGatewayForRemoteStateForInitialBootstrapBlocksApplied() throws I
PersistedState remotePersistedState = persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE);
PersistedState lucenePersistedState = persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL);
verify(remoteClusterStateService).getLastKnownUUIDFromRemote(clusterName.value()); // change this
verify(remoteStoreRestoreService).restore(any(ClusterState.class), any(String.class), anyBoolean(), any(String[].class));
verify(remoteStoreRestoreService).restore(
any(ClusterState.class),
any(String.class),
anyBoolean(),
anyBoolean(),
any(String[].class)
);
assertThat(remotePersistedState.getLastAcceptedState(), nullValue());
assertThat(
Metadata.isGlobalStateEquals(lucenePersistedState.getLastAcceptedState().metadata(), clusterState.metadata()),
Expand Down

0 comments on commit 3f1d4a4

Please sign in to comment.