Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync translog to remote on primary activate (#10839) #10882

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.delete.DeleteResponse;
Expand All @@ -21,20 +22,31 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
Expand Down Expand Up @@ -359,6 +371,90 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {
assertDocsPresentInIndex(client, indexName1, numDocsInIndex1 + 2);
}

public void testRemoteRestoreIndexRestoredFromSnapshot() throws IOException, ExecutionException, InterruptedException {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(2);

String indexName1 = "testindex1";
String snapshotRepoName = "test-restore-snapshot-repo";
String snapshotName1 = "test-restore-snapshot1";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

createRepository(snapshotRepoName, "fs", getRepositorySettings(absolutePath1, true));

Settings indexSettings = getIndexSettings(1, 0).build();
createIndex(indexName1, indexSettings);

final int numDocsInIndex1 = randomIntBetween(20, 30);
indexDocuments(client(), indexName1, numDocsInIndex1);
flushAndRefresh(indexName1);
ensureGreen(indexName1);

logger.info("--> snapshot");
SnapshotInfo snapshotInfo1 = createSnapshot(snapshotRepoName, snapshotName1, new ArrayList<>(Arrays.asList(indexName1)));
assertThat(snapshotInfo1.successfulShards(), greaterThan(0));
assertThat(snapshotInfo1.successfulShards(), equalTo(snapshotInfo1.totalShards()));
assertThat(snapshotInfo1.state(), equalTo(SnapshotState.SUCCESS));

assertAcked(client().admin().indices().delete(new DeleteIndexRequest(indexName1)).get());
assertFalse(indexExists(indexName1));

RestoreSnapshotResponse restoreSnapshotResponse1 = client().admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepoName, snapshotName1)
.setWaitForCompletion(false)
.setIndices(indexName1)
.get();

assertEquals(restoreSnapshotResponse1.status(), RestStatus.ACCEPTED);
ensureGreen(indexName1);
assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);

// Make sure remote translog is empty
String indexUUID = client().admin()
.indices()
.prepareGetSettings(indexName1)
.get()
.getSetting(indexName1, IndexMetadata.SETTING_INDEX_UUID);

Path remoteTranslogMetadataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/metadata");
Path remoteTranslogDataPath = Path.of(String.valueOf(remoteRepoPath), indexUUID, "/0/translog/data");

try (
Stream<Path> translogMetadata = Files.list(remoteTranslogMetadataPath);
Stream<Path> translogData = Files.list(remoteTranslogDataPath)
) {
assertTrue(translogData.count() > 0);
assertTrue(translogMetadata.count() > 0);
}

// Clear the local data before stopping the node. This will make sure that remote translog is empty.
IndexShard indexShard = getIndexShard(primaryNodeName(indexName1), indexName1);
try (Stream<Path> files = Files.list(indexShard.shardPath().resolveTranslog())) {
IOUtils.deleteFilesIgnoringExceptions(files.collect(Collectors.toList()));
}
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(indexName1)));

ensureRed(indexName1);

client().admin()
.cluster()
.restoreRemoteStore(new RestoreRemoteStoreRequest().indices(indexName1).restoreAllShards(false), PlainActionFuture.newFuture());

ensureGreen(indexName1);
assertDocsPresentInIndex(client(), indexName1, numDocsInIndex1);
}

protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexService(index);
assertNotNull(indexService);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return shardId.map(indexService::getShard).orElse(null);
}

public void testRestoreShallowCopySnapshotWithDifferentRepo() throws IOException {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
String primary = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,21 +589,23 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
.getRemoteStoreStats();
Arrays.stream(remoteStoreStats).forEach(statObject -> {
RemoteSegmentTransferTracker.Stats segmentStats = statObject.getSegmentStats();
RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats();
if (statObject.getShardRouting().primary()) {
assertTrue(
segmentStats.totalUploadsSucceeded == 1
&& segmentStats.totalUploadsStarted == segmentStats.totalUploadsSucceeded
&& segmentStats.totalUploadsFailed == 0
);
// On primary shard creation, we upload to remote translog post primary mode activation.
// This changes upload stats to non-zero for primary shard.
assertNonZeroTranslogUploadStatsNoFailures(translogStats);
} else {
assertTrue(
segmentStats.directoryFileTransferTrackerStats.transferredBytesStarted == 0
&& segmentStats.directoryFileTransferTrackerStats.transferredBytesSucceeded == 0
);
assertZeroTranslogUploadStats(translogStats);
}

RemoteTranslogTransferTracker.Stats translogStats = statObject.getTranslogStats();
assertZeroTranslogUploadStats(translogStats);
assertZeroTranslogDownloadStats(translogStats);
});
}, 5, TimeUnit.SECONDS);
Expand Down
19 changes: 16 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ public void updateShardState(
if (currentRouting.initializing() && currentRouting.isRelocationTarget() == false && newRouting.active()) {
// the cluster-manager started a recovering primary, activate primary mode.
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
ensurePeerRecoveryRetentionLeasesExist();
postActivatePrimaryMode();
}
} else {
assert currentRouting.primary() == false : "term is only increased as part of primary promotion";
Expand Down Expand Up @@ -713,8 +713,7 @@ public void updateShardState(
// are brought up to date.
checkpointPublisher.publish(this, getLatestReplicationCheckpoint());
}

ensurePeerRecoveryRetentionLeasesExist();
postActivatePrimaryMode();
/*
* If this shard was serving as a replica shard when another shard was promoted to primary then
* its Lucene index was reset during the primary term transition. In particular, the Lucene index
Expand Down Expand Up @@ -3404,6 +3403,20 @@ assert getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingE
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
}
postActivatePrimaryMode();
}

private void postActivatePrimaryMode() {
if (indexSettings.isRemoteStoreEnabled()) {
// We make sure to upload translog (even if it does not contain any operations) to remote translog.
// This helps to get a consistent state in remote store where both remote segment store and remote
// translog contains data.
try {
getEngine().syncTranslog();
} catch (IOException e) {
logger.error("Failed to sync translog to remote from new primary", e);
}
}
ensurePeerRecoveryRetentionLeasesExist();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2683,6 +2683,7 @@ public void testRelocatedForRemoteTranslogBackedIndexWithAsyncDurability() throw
AllocationId.newRelocation(routing.allocationId())
);
IndexShardTestCase.updateRoutingEntry(indexShard, routing);
indexDoc(indexShard, "_doc", "0");
assertTrue(indexShard.isSyncNeeded());
try {
indexShard.relocated(routing.getTargetRelocatingShard().allocationId().getId(), primaryContext -> {}, () -> {});
Expand Down
Loading