Skip to content

Commit

Permalink
[Remote Store] Sync segments in refresh listener on refresh after com…
Browse files Browse the repository at this point in the history
…mit (opensearch-project#10830)

* [Remote Store] Sync segments in refresh listener on refresh after commit

Signed-off-by: Ashish Singh <[email protected]>

* Add Integration Tests

Signed-off-by: Ashish Singh <[email protected]>

* Add comments and java doc

Signed-off-by: Ashish Singh <[email protected]>

---------

Signed-off-by: Ashish Singh <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
ashking94 authored and shiv0408 committed Apr 25, 2024
1 parent ab6fd5a commit aa50acd
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FollowersChecker;
import org.opensearch.cluster.coordination.LeaderChecker;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
Expand All @@ -23,15 +25,20 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -44,12 +51,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

@Before
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

public void setup() {
internalCluster().startNodes(3);
}

public void testStatsResponseFromAllNodes() {
setup();

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -118,6 +130,7 @@ public void testStatsResponseFromAllNodes() {
}

public void testStatsResponseAllShards() {
setup();

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -175,6 +188,7 @@ public void testStatsResponseAllShards() {
}

public void testStatsResponseFromLocalNode() {
setup();

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -236,6 +250,7 @@ public void testStatsResponseFromLocalNode() {
}

public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Disable Refresh Interval for the index
Expand Down Expand Up @@ -325,6 +340,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
}

public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
setup();
// Scenario:
// - Create index with single primary and N-1 replica shards (N = no of data nodes)
// - Disable Refresh Interval for the index
Expand Down Expand Up @@ -416,6 +432,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
}

public void testStatsOnShardRelocation() {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Index documents
Expand Down Expand Up @@ -471,6 +488,7 @@ public void testStatsOnShardRelocation() {
}

public void testStatsOnShardUnassigned() throws IOException {
setup();
// Scenario:
// - Create index with single primary and two replica shard
// - Index documents
Expand All @@ -497,6 +515,7 @@ public void testStatsOnShardUnassigned() throws IOException {
}

public void testStatsOnRemoteStoreRestore() throws IOException {
setup();
// Creating an index with primary shard count == total nodes in cluster and 0 replicas
int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes();
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount));
Expand Down Expand Up @@ -544,6 +563,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException {
}

public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception {
setup();
// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -581,6 +601,58 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
}, 5, TimeUnit.SECONDS);
}

public void testStatsCorrectnessOnFailover() {
Settings clusterSettings = Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(nodeSettings(0))
.build();
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(clusterSettings);
internalCluster().startDataOnlyNodes(2, clusterSettings);

// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);

// Index some docs and refresh
indexDocs();
refresh(INDEX_NAME);

String primaryNode = primaryNodeName(INDEX_NAME);
String replicaNode = replicaNodeName(INDEX_NAME);

// Start network disruption - primary node will be isolated
Set<String> nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new));
NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);
logger.info("--> network disruption is started");
networkDisruption.startDisrupting();
ensureStableCluster(2, clusterManagerNode);

RemoteStoreStatsResponse response = client(clusterManagerNode).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0");
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats();
assertEquals(0, segmentStats.refreshTimeLagMs);

networkDisruption.stopDisrupting();
internalCluster().clearDisruptionScheme();
ensureStableCluster(3, clusterManagerNode);
ensureGreen(INDEX_NAME);
logger.info("Test completed");
}

private void indexDocs() {
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,63 @@ public RemoteTranslogTransferTracker.Stats stats() {
);
}

@Override
public String toString() {
return "RemoteTranslogTransferStats{"
+ "lastSuccessfulUploadTimestamp="
+ lastSuccessfulUploadTimestamp.get()
+ ","
+ "totalUploadsStarted="
+ totalUploadsStarted.get()
+ ","
+ "totalUploadsSucceeded="
+ totalUploadsSucceeded.get()
+ ","
+ "totalUploadsFailed="
+ totalUploadsFailed.get()
+ ","
+ "uploadBytesStarted="
+ uploadBytesStarted.get()
+ ","
+ "uploadBytesFailed="
+ uploadBytesFailed.get()
+ ","
+ "totalUploadTimeInMillis="
+ totalUploadTimeInMillis.get()
+ ","
+ "uploadBytesMovingAverage="
+ uploadBytesMovingAverageReference.get().getAverage()
+ ","
+ "uploadBytesPerSecMovingAverage="
+ uploadBytesPerSecMovingAverageReference.get().getAverage()
+ ","
+ "uploadTimeMovingAverage="
+ uploadTimeMsMovingAverageReference.get().getAverage()
+ ","
+ "lastSuccessfulDownloadTimestamp="
+ lastSuccessfulDownloadTimestamp.get()
+ ","
+ "totalDownloadsSucceeded="
+ totalDownloadsSucceeded.get()
+ ","
+ "downloadBytesSucceeded="
+ downloadBytesSucceeded.get()
+ ","
+ "totalDownloadTimeInMillis="
+ totalDownloadTimeInMillis.get()
+ ","
+ "downloadBytesMovingAverage="
+ downloadBytesMovingAverageReference.get().getAverage()
+ ","
+ "downloadBytesPerSecMovingAverage="
+ downloadBytesPerSecMovingAverageReference.get().getAverage()
+ ","
+ "downloadTimeMovingAverage="
+ downloadTimeMsMovingAverageReference.get().getAverage()
+ ","
+ "}";
}

/**
* Represents the tracker's state as seen in the stats API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4774,6 +4774,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
* @throws IOException if exception occurs while reading segments from remote store.
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
boolean syncSegmentSuccess = false;
long startTimeMs = System.currentTimeMillis();
assert indexSettings.isRemoteStoreEnabled();
logger.trace("Downloading segments from remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
Expand Down Expand Up @@ -4823,9 +4825,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
} finally {
logger.trace(
"syncSegmentsFromRemoteSegmentStore success={} elapsedTime={}",
syncSegmentSuccess,
(System.currentTimeMillis() - startTimeMs)
);
store.decRef();
remoteStore.decRef();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,13 @@ public void beforeRefresh() throws IOException {}

@Override
protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
if (shouldSync(didRefresh)) {
// We have 2 separate methods to check if sync needs to be done or not. This is required since we use the return boolean
// from isReadyForUpload to schedule refresh retries as the index shard or the primary mode are not in complete
// ready state.
if (shouldSync(didRefresh) && isReadyForUpload()) {
segmentTracker.updateLocalRefreshTimeAndSeqNo();
try {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
}
initializeRemoteDirectoryOnTermUpdate();
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Expand Down Expand Up @@ -160,20 +159,20 @@ protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
}

private boolean shouldSync(boolean didRefresh) {
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it
// is important to upload the zero state segments so that the restore does not break.
return this.primaryTerm != indexShard.getOperationPrimaryTerm()
// If the readers change, didRefresh is always true.
|| didRefresh
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty();
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader
// reference, but it is important to upload the zero state segments so that the restore does not break.
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()
// When the shouldSync is called the first time, then 1st condition on primary term is true. But after that
// we update the primary term and the same condition would not evaluate to true again in syncSegments.
// Below check ensures that if there is commit, then that gets picked up by both 1st and 2nd shouldSync call.
|| isRefreshAfterCommitSafe();
}

private boolean syncSegments() {
if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) {
logger.debug(
"Skipped syncing segments with primaryMode={} indexShardState={}",
indexShard.getReplicationTracker().isPrimaryMode(),
indexShard.state()
);
if (isReadyForUpload() == false) {
// Following check is required to enable retry and make sure that we do not lose this refresh event
// When primary shard is restored from remote store, the recovery happens first followed by changing
// primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through
Expand Down Expand Up @@ -323,6 +322,19 @@ private boolean isRefreshAfterCommit() throws IOException {
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

/**
* Returns if the current refresh has happened after a commit.
* @return true if this refresh has happened on account of a commit. If otherwise or exception, returns false.
*/
private boolean isRefreshAfterCommitSafe() {
try {
return isRefreshAfterCommit();
} catch (Exception e) {
logger.info("Exception occurred in isRefreshAfterCommitSafe", e);
}
return false;
}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
throws IOException {
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
Expand Down Expand Up @@ -439,6 +451,48 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB
}
}

/**
* On primary term update, we (re)initialise the remote segment directory to reflect the latest metadata file that
* has been uploaded to remote store successfully. This method also updates the segment tracker about the latest
* uploaded segment files onto remote store.
*/
private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
this.primaryTerm = indexShard.getOperationPrimaryTerm();
RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init();

// During failover, the uploaded metadata would have names of files that have been uploaded to remote store.
// Here we update the tracker with latest remote uploaded files.
if (uploadedMetadata != null) {
segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet());
}
}
}

/**
* This checks for readiness of the index shard and primary mode. This has separated from shouldSync since we use the
* returned value of this method for scheduling retries in syncSegments method.
* @return true iff primaryMode is true and index shard is not in closed state.
*/
private boolean isReadyForUpload() {
boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED;
if (isReady == false) {
StringBuilder sb = new StringBuilder("Skipped syncing segments with");
if (indexShard.getReplicationTracker() != null) {
sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode());
}
if (indexShard.state() != null) {
sb.append(" indexShardState=").append(indexShard.state());
}
if (indexShard.getEngineOrNull() != null) {
sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName());
}
logger.trace(sb.toString());
}
return isReady;
}

/**
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
*/
Expand Down
Loading

0 comments on commit aa50acd

Please sign in to comment.