Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to create empty local translog if remote translog is empty #10854

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand All @@ -36,7 +38,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRestoreIT extends BaseRemoteStoreRestoreIT {

/**
Expand Down Expand Up @@ -234,6 +236,77 @@ public void testRestoreFlowNoRedIndex() throws Exception {
verifyRestoredData(indexStats, INDEX_NAME);
}

public void testRestoreFlowWithForceAllocateNoOp() throws Exception {
prepareCluster(1, 3, INDEX_NAME, 0, 1);
Map<String, Long> indexStats = indexData(randomIntBetween(2, 3), randomBoolean(), INDEX_NAME);

assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(REFRESHED_OR_FLUSHED_OPERATIONS));

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);

// Do not delete data from remote translog. If data is present in remote translog, forceEmptyTranslog is ignored

logger.info("--> Restore with forceEmptyTranslog, should have no effect as remote translog has data");
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(true).forceAllocate(true),
PlainActionFuture.newFuture()
);
ensureGreen(INDEX_NAME);

ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
refresh(INDEX_NAME);
assertBusy(
() -> assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)),
30,
TimeUnit.SECONDS
);
}

public void testRestoreFlowWithForceAllocate() throws Exception {
prepareCluster(1, 3, INDEX_NAME, 0, 1);
Map<String, Long> indexStats = indexData(randomIntBetween(2, 3), randomBoolean(), INDEX_NAME);

assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(REFRESHED_OR_FLUSHED_OPERATIONS));

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);

// Delete data from remote translog so that forceEmptyTranslog can take effect
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

Path remoteTranslogMetadataPath = Path.of(String.valueOf(translogRepoPath), indexUUID, "/0/translog/metadata");
Path remoteTranslogDataPath = Path.of(String.valueOf(translogRepoPath), indexUUID, "/0/translog/data");
IOUtils.rm(remoteTranslogDataPath, remoteTranslogMetadataPath);

logger.info("--> Restore with forceEmptyTranslog, should turn the index green");
assertAcked(client().admin().indices().prepareClose(INDEX_NAME));
client().admin()
.cluster()
.restoreRemoteStore(
new RestoreRemoteStoreRequest().indices(INDEX_NAME).restoreAllShards(true).forceAllocate(true),
PlainActionFuture.newFuture()
);
ensureGreen(INDEX_NAME);

ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
refresh(INDEX_NAME);
assertBusy(
() -> assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(REFRESHED_OR_FLUSHED_OPERATIONS)),
30,
TimeUnit.SECONDS
);
}

/**
* Simulates refreshed data restored using Remote Segment Store
* and unrefreshed data restored using Remote Translog Store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.action.admin.cluster.remotestore.restore;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
Expand Down Expand Up @@ -37,6 +38,7 @@
private String[] indices = Strings.EMPTY_ARRAY;
private Boolean waitForCompletion = false;
private Boolean restoreAllShards = false;
private Boolean forceAllocate = false;

public RestoreRemoteStoreRequest() {}

Expand All @@ -45,6 +47,9 @@
indices = in.readStringArray();
waitForCompletion = in.readOptionalBoolean();
restoreAllShards = in.readOptionalBoolean();
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
forceAllocate = in.readOptionalBoolean();
}
}

@Override
Expand All @@ -53,6 +58,9 @@
out.writeStringArray(indices);
out.writeOptionalBoolean(waitForCompletion);
out.writeOptionalBoolean(restoreAllShards);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalBoolean(forceAllocate);
}
}

@Override
Expand Down Expand Up @@ -142,6 +150,27 @@
return restoreAllShards;
}

/**
* Set the value for forceAllocate, denoting whether to create empty store/translog if remote segment store/translog does not have any data.
*
* @param forceAllocate If true and if remote segment store/translog does not have any data, the operation will create empty store/translog on local
* If false, the operation will always try to fetch data from remote segment store/translog and will fail if either one is empty.
* @return this request
*/
public RestoreRemoteStoreRequest forceAllocate(boolean forceAllocate) {
this.forceAllocate = forceAllocate;
return this;

Check warning on line 162 in server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java#L161-L162

Added lines #L161 - L162 were not covered by tests
}

/**
* Returns forceAllocate setting
*
* @return true if the operation will create empty store/translog on local when remote segment store/translog is empty
*/
public boolean forceAllocate() {
return forceAllocate;

Check warning on line 171 in server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/remotestore/restore/RestoreRemoteStoreRequest.java#L171

Added line #L171 was not covered by tests
}

/**
* Parses restore definition
*
Expand Down Expand Up @@ -193,12 +222,13 @@
RestoreRemoteStoreRequest that = (RestoreRemoteStoreRequest) o;
return waitForCompletion == that.waitForCompletion
&& restoreAllShards == that.restoreAllShards
&& forceAllocate == that.forceAllocate
&& Arrays.equals(indices, that.indices);
}

@Override
public int hashCode() {
int result = Objects.hash(waitForCompletion, restoreAllShards);
int result = Objects.hash(waitForCompletion, restoreAllShards, forceAllocate);
result = 31 * result + Arrays.hashCode(indices);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,17 +419,28 @@
private final String restoreUUID;
private final IndexId index;
private final Version version;
private final boolean forceAllocate;

public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId) {
this(restoreUUID, version, indexId, false);
}

public RemoteStoreRecoverySource(String restoreUUID, Version version, IndexId indexId, boolean forceAllocate) {
this.restoreUUID = restoreUUID;
this.version = Objects.requireNonNull(version);
this.index = Objects.requireNonNull(indexId);
this.forceAllocate = forceAllocate;
}

RemoteStoreRecoverySource(StreamInput in) throws IOException {
restoreUUID = in.readString();
version = in.readVersion();
index = new IndexId(in);
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
forceAllocate = in.readBoolean();
} else {
forceAllocate = false;

Check warning on line 442 in server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java#L442

Added line #L442 was not covered by tests
}
}

public String restoreUUID() {
Expand All @@ -450,11 +461,18 @@
return version;
}

public boolean forceAllocate() {
return forceAllocate;

Check warning on line 465 in server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java#L465

Added line #L465 was not covered by tests
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
out.writeString(restoreUUID);
out.writeVersion(version);
index.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeBoolean(forceAllocate);
}
}

@Override
Expand All @@ -464,7 +482,10 @@

@Override
public void addAdditionalFields(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.field("version", version.toString()).field("index", index.getName()).field("restoreUUID", restoreUUID);
builder.field("version", version.toString())
.field("index", index.getName())
.field("restoreUUID", restoreUUID)
.field("forceAllocate", forceAllocate);

Check warning on line 488 in server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java#L485-L488

Added lines #L485 - L488 were not covered by tests
}

@Override
Expand All @@ -482,12 +503,15 @@
}

RemoteStoreRecoverySource that = (RemoteStoreRecoverySource) o;
return restoreUUID.equals(that.restoreUUID) && index.equals(that.index) && version.equals(that.version);
return restoreUUID.equals(that.restoreUUID)
&& index.equals(that.index)
&& version.equals(that.version)
&& forceAllocate == that.forceAllocate;
}

@Override
public int hashCode() {
return Objects.hash(restoreUUID, index, version);
return Objects.hash(restoreUUID, index, version, forceAllocate);

Check warning on line 514 in server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java#L514

Added line #L514 was not covered by tests
}

// TODO: This override should be removed/be updated to return "true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void start(
ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(),
lastKnownClusterUUID,
false,
false,
new String[] {}
);
clusterState = remoteRestoreResult.getClusterState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@

@Override
public ClusterState execute(ClusterState currentState) {
RemoteRestoreResult remoteRestoreResult = restore(currentState, null, request.restoreAllShards(), request.indices());
RemoteRestoreResult remoteRestoreResult = restore(

Check warning on line 102 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L102

Added line #L102 was not covered by tests
currentState,
null,
request.restoreAllShards(),
request.forceAllocate(),
request.indices()

Check warning on line 107 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L105-L107

Added lines #L105 - L107 were not covered by tests
);
restoreUUID = remoteRestoreResult.getRestoreUUID();
restoreInfo = remoteRestoreResult.getRestoreInfo();
return remoteRestoreResult.getClusterState();
Expand Down Expand Up @@ -135,6 +141,7 @@
ClusterState currentState,
@Nullable String restoreClusterUUID,
boolean restoreAllShards,
boolean forceAllocate,
String[] indexNames
) {
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap = new HashMap<>();
Expand Down Expand Up @@ -176,7 +183,7 @@
}
}
}
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata);
return executeRestore(currentState, indexMetadataMap, restoreAllShards, remoteMetadata, forceAllocate);

Check warning on line 186 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L186

Added line #L186 was not covered by tests
}

/**
Expand All @@ -190,7 +197,8 @@
ClusterState currentState,
Map<String, Tuple<Boolean, IndexMetadata>> indexMetadataMap,
boolean restoreAllShards,
Metadata remoteMetadata
Metadata remoteMetadata,
boolean forceAllocate
) {
final String restoreUUID = UUIDs.randomBase64UUID();
List<String> indicesToBeRestored = new ArrayList<>();
Expand Down Expand Up @@ -228,7 +236,8 @@
RecoverySource.RemoteStoreRecoverySource recoverySource = new RecoverySource.RemoteStoreRecoverySource(
restoreUUID,
updatedIndexMetadata.getCreationVersion(),
indexId
indexId,
forceAllocate
);

rtBuilder.addAsRemoteStoreRestore(updatedIndexMetadata, recoverySource, indexShardRoutingTableMap, restoreAllShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
Expand Down Expand Up @@ -406,7 +407,7 @@
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
bootstrap(indexShard, store);
} else {
bootstrapForSnapshot(indexShard, store);
bootstrapFromLastCommit(indexShard, store);

Check warning on line 410 in server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java#L410

Added line #L410 was not covered by tests
}
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
Expand Down Expand Up @@ -538,16 +539,24 @@
indexShard.syncSegmentsFromRemoteSegmentStore(true);
indexShard.syncTranslogFilesFromRemoteTranslog();

Path location = indexShard.shardPath().resolveTranslog();

// On index creation, the only segment file that is created is segments_N. We can safely discard this file
// as there is no data associated with this shard as part of segments.
if (store.directory().listAll().length <= 1) {
Path location = indexShard.shardPath().resolveTranslog();
boolean remoteSegmentEmpty = store.directory().listAll().length <= 1;
boolean remoteTranslogEmpty = Files.exists(location.resolve(CHECKPOINT_FILE_NAME)) == false;

if (remoteSegmentEmpty && remoteTranslogEmpty == false) {
Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
final Path translogFile = location.resolve(Translog.getFilename(checkpoint.getGeneration()));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
TranslogHeader translogHeader = TranslogHeader.read(translogFile, channel);
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion, translogHeader.getTranslogUUID());
}
} else if (remoteSegmentEmpty == false && remoteTranslogEmpty) {
if (((RecoverySource.RemoteStoreRecoverySource) indexShard.shardRouting.recoverySource()).forceAllocate()) {
bootstrapFromLastCommit(indexShard, store);

Check warning on line 558 in server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java#L558

Added line #L558 was not covered by tests
}
}

assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
Expand Down Expand Up @@ -686,7 +695,7 @@
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled() == false) {
bootstrap(indexShard, store);
} else {
bootstrapForSnapshot(indexShard, store);
bootstrapFromLastCommit(indexShard, store);

Check warning on line 698 in server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java#L698

Added line #L698 was not covered by tests
}
assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
writeEmptyRetentionLeasesFile(indexShard);
Expand Down Expand Up @@ -738,7 +747,7 @@
}
}

private void bootstrapForSnapshot(final IndexShard indexShard, final Store store) throws IOException {
private void bootstrapFromLastCommit(final IndexShard indexShard, final Store store) throws IOException {
store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
);
restoreRemoteStoreRequest.waitForCompletion(request.paramAsBoolean("wait_for_completion", false));
restoreRemoteStoreRequest.restoreAllShards(request.paramAsBoolean("restore_all_shards", false));
restoreRemoteStoreRequest.forceAllocate(request.paramAsBoolean("force_allocate", false));

Check warning on line 48 in server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestRestoreRemoteStoreAction.java#L48

Added line #L48 was not covered by tests
request.applyContentParser(p -> restoreRemoteStoreRequest.source(p.mapOrdered()));
return channel -> client.admin().cluster().restoreRemoteStore(restoreRemoteStoreRequest, new RestToXContentListener<>(channel));
}
Expand Down
Loading
Loading