[Remote Store] Sync segments in refresh listener on refresh after commit #10830

Merged (3 commits) on Oct 23, 2023
@@ -15,6 +15,8 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FollowersChecker;
import org.opensearch.cluster.coordination.LeaderChecker;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
@@ -23,15 +25,20 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -44,12 +51,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

@Before
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
}

public void setup() {
internalCluster().startNodes(3);
}

public void testStatsResponseFromAllNodes() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
@@ -118,6 +130,7 @@ public void testStatsResponseFromAllNodes() {
}

public void testStatsResponseAllShards() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
@@ -175,6 +188,7 @@ public void testStatsResponseAllShards() {
}

public void testStatsResponseFromLocalNode() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
@@ -236,6 +250,7 @@ public void testStatsResponseFromLocalNode() {
}

public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Disable Refresh Interval for the index
@@ -325,6 +340,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
}

public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
setup();
// Scenario:
// - Create index with single primary and N-1 replica shards (N = no of data nodes)
// - Disable Refresh Interval for the index
@@ -416,6 +432,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
}

public void testStatsOnShardRelocation() {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Index documents
@@ -471,6 +488,7 @@ public void testStatsOnShardRelocation() {
}

public void testStatsOnShardUnassigned() throws IOException {
setup();
// Scenario:
// - Create index with single primary and two replica shard
// - Index documents
@@ -497,6 +515,7 @@ public void testStatsOnShardUnassigned() throws IOException {
}

public void testStatsOnRemoteStoreRestore() throws IOException {
setup();
// Creating an index with primary shard count == total nodes in cluster and 0 replicas
int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes();
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount));
@@ -544,6 +563,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException {
}

public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception {
setup();
// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);
@@ -581,6 +601,58 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
}, 5, TimeUnit.SECONDS);
}

public void testStatsCorrectnessOnFailover() {
Settings clusterSettings = Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(nodeSettings(0))
.build();
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(clusterSettings);
internalCluster().startDataOnlyNodes(2, clusterSettings);

// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);

// Index some docs and refresh
indexDocs();
refresh(INDEX_NAME);

String primaryNode = primaryNodeName(INDEX_NAME);
String replicaNode = replicaNodeName(INDEX_NAME);

// Start network disruption - primary node will be isolated
Set<String> nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new));
NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);
logger.info("--> network disruption is started");
networkDisruption.startDisrupting();
ensureStableCluster(2, clusterManagerNode);

RemoteStoreStatsResponse response = client(clusterManagerNode).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0");
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats();
assertEquals(0, segmentStats.refreshTimeLagMs);

networkDisruption.stopDisrupting();
internalCluster().clearDisruptionScheme();
ensureStableCluster(3, clusterManagerNode);
ensureGreen(INDEX_NAME);
logger.info("Test completed");
}
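A hedged, illustrative sketch of what the refreshTimeLagMs assertion above checks: the lag is, roughly, how far the last successful remote refresh trails the latest local refresh. Names and the computation below are simplified assumptions, not the tracker's actual implementation.

final class RefreshLagSketch {
    // Lag is the gap between the latest local refresh and the latest refresh
    // whose segments were successfully synced to the remote store.
    static long refreshTimeLagMs(long localRefreshTimeMs, long remoteRefreshTimeMs) {
        return Math.max(0, localRefreshTimeMs - remoteRefreshTimeMs);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // On a healthy primary the remote refresh keeps up with the local one,
        // which is why the test expects the stats API to report a lag of 0.
        System.out.println(refreshTimeLagMs(now, now)); // prints 0
    }
}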

private void indexDocs() {
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
@@ -232,6 +232,63 @@ public RemoteTranslogTransferTracker.Stats stats() {
);
}

@Override
public String toString() {
return "RemoteTranslogTransferStats{"
+ "lastSuccessfulUploadTimestamp="
+ lastSuccessfulUploadTimestamp.get()
+ ","
+ "totalUploadsStarted="
+ totalUploadsStarted.get()
+ ","
+ "totalUploadsSucceeded="
+ totalUploadsSucceeded.get()
+ ","
+ "totalUploadsFailed="
+ totalUploadsFailed.get()
+ ","
+ "uploadBytesStarted="
+ uploadBytesStarted.get()
+ ","
+ "uploadBytesFailed="
+ uploadBytesFailed.get()
+ ","
+ "totalUploadTimeInMillis="
+ totalUploadTimeInMillis.get()
+ ","
+ "uploadBytesMovingAverage="
+ uploadBytesMovingAverageReference.get().getAverage()
+ ","
+ "uploadBytesPerSecMovingAverage="
+ uploadBytesPerSecMovingAverageReference.get().getAverage()
+ ","
+ "uploadTimeMovingAverage="
+ uploadTimeMsMovingAverageReference.get().getAverage()
+ ","
+ "lastSuccessfulDownloadTimestamp="
+ lastSuccessfulDownloadTimestamp.get()
+ ","
+ "totalDownloadsSucceeded="
+ totalDownloadsSucceeded.get()
+ ","
+ "downloadBytesSucceeded="
+ downloadBytesSucceeded.get()
+ ","
+ "totalDownloadTimeInMillis="
+ totalDownloadTimeInMillis.get()
+ ","
+ "downloadBytesMovingAverage="
+ downloadBytesMovingAverageReference.get().getAverage()
+ ","
+ "downloadBytesPerSecMovingAverage="
+ downloadBytesPerSecMovingAverageReference.get().getAverage()
+ ","
+ "downloadTimeMovingAverage="
+ downloadTimeMsMovingAverageReference.get().getAverage()
+ ","
+ "}";
}

/**
* Represents the tracker's state as seen in the stats API.
*
@@ -4774,6 +4774,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
* @throws IOException if exception occurs while reading segments from remote store.
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
boolean syncSegmentSuccess = false;
long startTimeMs = System.currentTimeMillis();
assert indexSettings.isRemoteStoreEnabled();
logger.trace("Downloading segments from remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
@@ -4823,9 +4825,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
: "There should not be any segments file in the dir";
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
syncSegmentSuccess = true;
} catch (IOException e) {
throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e);
} finally {
logger.trace(
"syncSegmentsFromRemoteSegmentStore success={} elapsedTime={}",
syncSegmentSuccess,
(System.currentTimeMillis() - startTimeMs)
);
store.decRef();
remoteStore.decRef();
}
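The hunk above adds a success flag and a start timestamp so that the outcome and elapsed time are logged from the finally block even when the copy throws. A standalone sketch of that instrumentation pattern; downloadSegments() is a hypothetical placeholder, not an OpenSearch API:

import java.io.IOException;

final class TimedSyncSketch {
    void syncWithTiming() throws IOException {
        boolean syncSegmentSuccess = false;
        long startTimeMs = System.currentTimeMillis();
        try {
            downloadSegments(); // placeholder for copying segment files from the remote store
            syncSegmentSuccess = true;
        } finally {
            // Runs on both success and failure, mirroring the trace log added above.
            System.out.printf(
                "syncSegments success=%s elapsedTime=%dms%n",
                syncSegmentSuccess,
                System.currentTimeMillis() - startTimeMs
            );
        }
    }

    private void downloadSegments() throws IOException {
        // hypothetical stand-in for the actual remote-store download
    }
}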
@@ -123,14 +123,13 @@

@Override
protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
if (shouldSync(didRefresh)) {
// We have 2 separate methods to check whether a sync needs to be done. This is required since the boolean returned
// by isReadyForUpload is used to schedule refresh retries when the index shard or the primary mode is not yet in a
// completely ready state.
if (shouldSync(didRefresh) && isReadyForUpload()) {
segmentTracker.updateLocalRefreshTimeAndSeqNo();
try {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
logger.debug("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
}
initializeRemoteDirectoryOnTermUpdate();
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
Collection<String> localSegmentsPostRefresh = segmentInfosGatedCloseable.get().files(true);
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
@@ -160,20 +159,20 @@
}

private boolean shouldSync(boolean didRefresh) {
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader reference, but it
// is important to upload the zero state segments so that the restore does not break.
return this.primaryTerm != indexShard.getOperationPrimaryTerm()
// If the readers change, didRefresh is always true.
|| didRefresh
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty();
// The third condition exists for uploading the zero state segments where the refresh has not changed the reader
// reference, but it is important to upload the zero state segments so that the restore does not break.
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()
// When shouldSync is called the first time, the 1st condition on the primary term is true. But syncSegments then
// updates the primary term, so the same condition would not evaluate to true again.
// The check below ensures that, if there is a commit, it gets picked up by both the 1st and the 2nd shouldSync call.
|| isRefreshAfterCommitSafe();
}
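To make the comment in runAfterRefreshExactlyOnce above concrete: shouldSync decides whether an upload is warranted at all, while isReadyForUpload decides whether the shard can perform it now, and a negative answer from the latter is what drives a retry rather than dropping the refresh event. A hedged, simplified sketch of that control flow; the checks and scheduleRetry() are hypothetical placeholders, not the listener's actual code:

final class RefreshGatingSketch {
    // Hypothetical stand-ins; the real listener consults the primary term, the
    // reader refresh flag, the remote directory, and the replication tracker.
    private boolean shouldSync(boolean didRefresh) { return didRefresh; }
    private boolean isReadyForUpload() { return true; }

    // Called once per refresh, analogous to runAfterRefreshExactlyOnce.
    void afterRefresh(boolean didRefresh) {
        if (shouldSync(didRefresh) && isReadyForUpload()) {
            if (syncSegments() == false) {
                scheduleRetry(); // do not lose the refresh event if the sync did not complete
            }
        }
    }

    private boolean syncSegments() {
        if (isReadyForUpload() == false) {
            // e.g. shard not yet in primary mode after a remote-store restore
            return false;
        }
        // ... upload segments and metadata to the remote store ...
        return true;
    }

    private void scheduleRetry() {
        // placeholder for the listener's retry scheduling
    }
}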

private boolean syncSegments() {
if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) {
logger.debug(
"Skipped syncing segments with primaryMode={} indexShardState={}",
indexShard.getReplicationTracker().isPrimaryMode(),
indexShard.state()
);
if (isReadyForUpload() == false) {
// Following check is required to enable retry and make sure that we do not lose this refresh event
// When primary shard is restored from remote store, the recovery happens first followed by changing
// primaryMode to true. Due to this, the refresh that is triggered post replay of translog will not go through
@@ -323,6 +322,19 @@
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

/**
* Returns whether the current refresh has happened after a commit.
* @return true if this refresh happened on account of a commit; false otherwise, or if an exception occurs.
*/
private boolean isRefreshAfterCommitSafe() {
try {
return isRefreshAfterCommit();
} catch (Exception e) {
logger.info("Exception occurred in isRefreshAfterCommitSafe", e);

}
return false;

}

void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos segmentInfos, ReplicationCheckpoint replicationCheckpoint)
throws IOException {
final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint();
@@ -439,6 +451,48 @@
}
}

/**
* On primary term update, we (re)initialise the remote segment directory to reflect the latest metadata file that
* has been uploaded to the remote store successfully. This method also updates the segment tracker with the latest
* set of segment files uploaded to the remote store.
*/
private void initializeRemoteDirectoryOnTermUpdate() throws IOException {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
logger.trace("primaryTerm update from={} to={}", primaryTerm, indexShard.getOperationPrimaryTerm());
this.primaryTerm = indexShard.getOperationPrimaryTerm();
RemoteSegmentMetadata uploadedMetadata = this.remoteDirectory.init();

// During failover, the uploaded metadata would have names of files that have been uploaded to remote store.
// Here we update the tracker with latest remote uploaded files.
if (uploadedMetadata != null) {
segmentTracker.setLatestUploadedFiles(uploadedMetadata.getMetadata().keySet());
}
}
}

/**
* This checks for readiness of the index shard and primary mode. It is kept separate from shouldSync since the
* returned value of this method is used for scheduling retries in the syncSegments method.
* @return true iff primaryMode is true and index shard is not in closed state.
*/
private boolean isReadyForUpload() {
boolean isReady = indexShard.getReplicationTracker().isPrimaryMode() && indexShard.state() != IndexShardState.CLOSED;
if (isReady == false) {
StringBuilder sb = new StringBuilder("Skipped syncing segments with");
if (indexShard.getReplicationTracker() != null) {
sb.append(" primaryMode=").append(indexShard.getReplicationTracker().isPrimaryMode());
}
if (indexShard.state() != null) {
sb.append(" indexShardState=").append(indexShard.state());
}
if (indexShard.getEngineOrNull() != null) {
sb.append(" engineType=").append(indexShard.getEngine().getClass().getSimpleName());
}
logger.trace(sb.toString());
}
return isReady;
}

/**
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
*/