Skip to content

Commit

Permalink
Update checkpoint from remote nodes replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed May 30, 2024
1 parent d26cd46 commit 48325e3
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,11 @@ private Thread getIndexingThread() {
indexSingleDoc(indexName);
long currentDocCount = indexedDocs.incrementAndGet();
if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) {
logger.info("--> [iteration {}] flushing index", currentDocCount);
if (rarely()) {
logger.info("--> [iteration {}] flushing index", currentDocCount);
client().admin().indices().prepareFlush(indexName).get();
} else {
logger.info("--> [iteration {}] refreshing index", currentDocCount);
client().admin().indices().prepareRefresh(indexName).get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,11 @@

package org.opensearch.remotemigration;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
Expand Down Expand Up @@ -66,8 +63,8 @@ public void testRemotePrimaryRelocation() throws Exception {

AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);

AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
asyncIndexingService.startIndexing();
refresh("test");

// add remote node in mixed mode cluster
Expand Down Expand Up @@ -141,17 +138,19 @@ public void testRemotePrimaryRelocation() throws Exception {
logger.info("--> relocation from remote to remote complete");

finished.set(true);
indexingThread.join();
asyncIndexingService.stopIndexing();
refresh("test");
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test").setTrackTotalHits(true).get(),
asyncIndexingService.getIndexedDocs()
);
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test")
.setTrackTotalHits(true)// extra paranoia ;)
.setQuery(QueryBuilders.termQuery("auto", true))
.get(),
numAutoGenDocs.get()
asyncIndexingService.getIndexedDocs()
);

}

public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
Expand All @@ -165,9 +164,8 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");

AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = getIndexingThread(finished, numAutoGenDocs);
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
asyncIndexingService.startIndexing();

refresh("test");

Expand Down Expand Up @@ -209,27 +207,11 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception {
assertEquals(actionGet.getRelocatingShards(), 0);
assertEquals(docRepNode, primaryNodeName("test"));

finished.set(true);
indexingThread.join();
asyncIndexingService.stopIndexing();
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null))
.get();
}

private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
Thread indexingThread = new Thread(() -> {
while (finished.get() == false && numAutoGenDocs.get() < 10_000) {
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex("test").setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
}
});
indexingThread.start();
return indexingThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,27 @@

package org.opensearch.remotemigration;

import com.carrotsearch.randomizedtesting.generators.RandomNumbers;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)

public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase {

protected int maximumNumberOfShards() {
Expand Down Expand Up @@ -63,10 +58,8 @@ public void testReplicaRecovery() throws Exception {
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
String replicaNode = internalCluster().startNode();
ensureGreen("test");

AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = getThread(finished, numAutoGenDocs);
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test");
asyncIndexingService.startIndexing();

refresh("test");

Expand All @@ -78,12 +71,10 @@ public void testReplicaRecovery() throws Exception {
updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

String remoteNode2 = internalCluster().startNode();
internalCluster().startNode();
internalCluster().validateClusterFormed();

// identify the primary

Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode);
client().admin()
.cluster()
Expand All @@ -102,7 +93,6 @@ public void testReplicaRecovery() throws Exception {

assertEquals(0, clusterHealthResponse.getRelocatingShards());
logger.info("--> relocation of primary from docrep to remote complete");
Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));

logger.info("--> getting up the new replicas now to doc rep node as well as remote node ");
// Increase replica count to 3
Expand All @@ -129,52 +119,33 @@ public void testReplicaRecovery() throws Exception {
logger.info("--> replica is up now on another docrep now as well as remote node");

assertEquals(0, clusterHealthResponse.getRelocatingShards());
asyncIndexingService.stopIndexing();
refresh("test");

Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000));
// segrep lag should be zero
assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats("test")
.setDetailed(true)
.execute()
.actionGet();
SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0);
assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1);
perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0));
}, 20, TimeUnit.SECONDS);

// Stop replicas on docrep now.
// ToDo : Remove once we have dual replication enabled
client().admin()
.indices()
.updateSettings(
new UpdateSettingsRequest("test").settings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode)
.build()
)
)
.get();

finished.set(true);
indexingThread.join();
refresh("test");
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test").setTrackTotalHits(true).get(),
asyncIndexingService.getIndexedDocs()
);
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test")
.setTrackTotalHits(true)// extra paranoia ;)
.setQuery(QueryBuilders.termQuery("auto", true))
// .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2))
.get(),
numAutoGenDocs.get()
asyncIndexingService.getIndexedDocs()
);

}

private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) {
Thread indexingThread = new Thread(() -> {
while (finished.get() == false && numAutoGenDocs.get() < 100) {
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex("test").setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
logger.info("Indexed {} docs here", numAutoGenDocs.get());
}
});
indexingThread.start();
return indexingThread;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,6 @@ public boolean shouldSeedRemoteStore() {
public Function<String, Boolean> isShardOnRemoteEnabledNode = nodeId -> {
DiscoveryNode node = discoveryNodes.get(nodeId);
if (node != null) {
logger.trace("Node {} has remote_enabled as {}", nodeId, node.isRemoteStoreNode());
return node.isRemoteStoreNode();
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ private void logReplicationFailure(SegmentReplicationState state, ReplicationFai
protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) {
// Update replication checkpoint on source via transport call only supported for remote store integration. For node-
// node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call
if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) {
if (replicaShard.indexSettings().isAssignedOnRemoteNode() == false) {
return;
}
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn
// make sure the store is not released until we are done.
this.cancellableThreads = new CancellableThreads();
store.incRef();
if (indexShard.indexSettings().isRemoteStoreEnabled()) {
if (indexShard.indexSettings().isAssignedOnRemoteNode()) {
indexShard.remoteStore().incRef();
}
}
Expand Down Expand Up @@ -284,7 +284,7 @@ protected void closeInternal() {
try {
store.decRef();
} finally {
if (indexShard.indexSettings().isRemoteStoreEnabled()) {
if (indexShard.indexSettings().isAssignedOnRemoteNode()) {
indexShard.remoteStore().decRef();
}
}
Expand Down

0 comments on commit 48325e3

Please sign in to comment.