Skip to content

Commit

Permalink
Primary changes to support dual mode replication
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Mar 8, 2024
1 parent 9cb67fc commit c4fac3a
Show file tree
Hide file tree
Showing 20 changed files with 565 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.blobstore.BlobStoreTestUtil;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.DummyShardLock;
import org.opensearch.test.IndexSettingsModule;
Expand Down Expand Up @@ -714,7 +715,8 @@ public static final IndexShard newIndexShard(
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL,
nodeId,
null,
false
false,
BlobStoreTestUtil.mockClusterService()
);
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,24 @@

package org.opensearch.remotemigration;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;

public class MigrationBaseTestCase extends OpenSearchIntegTestCase {
Expand All @@ -22,9 +34,16 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {

protected Path segmentRepoPath;
protected Path translogRepoPath;

boolean addRemote = false;

private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5)
);

protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
Expand All @@ -47,4 +66,103 @@ protected Settings nodeSettings(int nodeOrdinal) {
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}

public void initDocRepToRemoteMigration() {
assertTrue(internalCluster().client().admin().cluster().prepareUpdateSettings().setPersistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")
.put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")
).get().isAcknowledged());
}

public BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
final IndexRequest request = client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.request();
bulkRequest.add(request);
}
return client().bulk(bulkRequest).actionGet();
}

public class AsyncIndexingService {
private AtomicBoolean finished = new AtomicBoolean();
private AtomicInteger numAutoGenDocs = new AtomicInteger();
private Thread indexingThread;
private String indexName;

AsyncIndexingService(String indexName) {
this(indexName, Integer.MAX_VALUE);
}

AsyncIndexingService(String indexName, int maxDocs) {
indexingThread = new Thread(() -> {
while (finished.get() == false && numAutoGenDocs.get() < maxDocs) {
IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex(indexName).setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
logger.info("Indexed {} docs here", numAutoGenDocs.get());
}
});
}

public void stopIndexing() throws InterruptedException {
finished.set(true);
indexingThread.join();
}

public int totalIndexedDocs() {
return numAutoGenDocs.get();
}

public void startIndexing() {
indexingThread.start();
}

public Thread getIndexingThread() {
return indexingThread;
}
}

public class SyncIndexingService {
private int maxDocs;
private int currentIndexedDocs;
private boolean forceStop;
private String indexName;

SyncIndexingService(String indexName) {
this(indexName, Integer.MAX_VALUE);
}

SyncIndexingService(String indexName, int maxDocs) {
this.indexName = indexName;
this.maxDocs = maxDocs;
this.forceStop = false;
}

public void forceStopIndexing() throws InterruptedException {
this.forceStop = true;
}

public int getCurrentIndexedDocs() {
return currentIndexedDocs;
}

public void startIndexing() {
while (currentIndexedDocs < maxDocs && forceStop == false) {
IndexResponse indexResponse = client().prepareIndex(indexName).setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete(indexName, "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex(indexName).setSource("auto", true).get();
currentIndexedDocs += 1;
logger.info("Indexed {} docs here", currentIndexedDocs);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,10 @@

import com.carrotsearch.randomizedtesting.generators.RandomNumbers;

import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
Expand All @@ -28,8 +25,6 @@
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -61,20 +56,8 @@ public void testMixedModeRelocation() throws Exception {
client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get();
ensureGreen("test");

AtomicInteger numAutoGenDocs = new AtomicInteger();
final AtomicBoolean finished = new AtomicBoolean(false);
Thread indexingThread = new Thread(() -> {
while (finished.get() == false && numAutoGenDocs.get() < 100) {
IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get();
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
DeleteResponse deleteResponse = client().prepareDelete("test", "id").get();
assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
client().prepareIndex("test").setSource("auto", true).get();
numAutoGenDocs.incrementAndGet();
logger.info("Indexed {} docs here", numAutoGenDocs.get());
}
});
indexingThread.start();
AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test", 100);
asyncIndexingService.startIndexing();

refresh("test");

Expand Down Expand Up @@ -126,16 +109,15 @@ public void testMixedModeRelocation() throws Exception {
.execute()
.actionGet();
}
finished.set(true);
indexingThread.join();
asyncIndexingService.stopIndexing();
refresh("test");
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get());
OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), asyncIndexingService.totalIndexedDocs());
OpenSearchAssertions.assertHitCount(
client().prepareSearch("test")
.setTrackTotalHits(true)// extra paranoia ;)
.setQuery(QueryBuilders.termQuery("auto", true))
.get(),
numAutoGenDocs.get()
asyncIndexingService.totalIndexedDocs()
);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ protected long primaryOperationSize(BulkShardRequest request) {

@Override
public ReplicationMode getReplicationMode(IndexShard indexShard) {
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() || indexShard.routingEntry().isAssignedToRemoteStoreNode()) {
return ReplicationMode.PRIMARY_TERM_VALIDATION;
}
return super.getReplicationMode(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.action.support.replication;

import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.core.action.ActionListener;

Expand Down Expand Up @@ -60,16 +61,21 @@ protected void performOnReplicaProxy(

@Override
ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) {

// If the current routing is the primary, then it does not need to be replicated
if (shardRouting.isSameAllocation(primaryRouting)) {
return ReplicationMode.NO_REPLICATION;
}

// Perform full replication during primary failover
if (primaryRouting.relocating() && shardRouting.isSameAllocation(primaryRouting.getTargetRelocatingShard())) {
return ReplicationMode.FULL_REPLICATION;
}

/*
Perform full replication if replica is hosted on a non-remote node.
Only applicable during remote migration
*/
if (shardRouting.isAssignedToRemoteStoreNode() == false) {
return ReplicationMode.FULL_REPLICATION;
}
return replicationModeOverride;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public void performOn(
* @return the overridden replication mode.
*/
public ReplicationMode getReplicationMode(IndexShard indexShard) {
if (indexShard.isRemoteTranslogEnabled()) {
if (indexShard.isRemoteTranslogEnabled() || indexShard.routingEntry().isAssignedToRemoteStoreNode()) {
return ReplicationMode.NO_REPLICATION;
}
return ReplicationMode.FULL_REPLICATION;
Expand Down Expand Up @@ -642,7 +642,7 @@ public void handleException(TransportException exp) {
primaryRequest.getPrimaryTerm(),
initialRetryBackoffBound,
retryTimeout,
indexShard.isRemoteTranslogEnabled()
indexShard.isRemoteTranslogEnabled() || indexShard.indexSettings().isRemoteNode()
? new ReplicationModeAwareProxy<>(getReplicationMode(indexShard), replicasProxy, termValidationProxy)
: new FanoutReplicationProxy<>(replicasProxy)
).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class ShardRouting implements Writeable, ToXContentObject {
private final long expectedShardSize;
@Nullable
private final ShardRouting targetRelocatingShard;
private boolean assignedToRemoteStoreNode;

/**
* A constructor to internally create shard routing instances, note, the internal flag should only be set to true
Expand Down Expand Up @@ -878,4 +879,12 @@ public boolean unassignedReasonIndexCreated() {
}
return false;
}

public boolean isAssignedToRemoteStoreNode() {
return assignedToRemoteStoreNode;
}

public void setAssignedToRemoteStoreNode(boolean assignedToRemoteStoreNode) {
this.assignedToRemoteStoreNode = assignedToRemoteStoreNode;
}
}
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,8 @@ public synchronized IndexShard createShard(
clusterRemoteTranslogBufferIntervalSupplier,
nodeEnv.nodeId(),
recoverySettings,
seedRemote
seedRemote,
clusterService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@

package org.opensearch.index.remote;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.node.remotestore.RemoteStoreNodeService;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;

/**
* Utils for remote store
*
Expand Down Expand Up @@ -101,4 +108,17 @@ public static void verifyNoMultipleWriters(List<String> mdFiles, Function<String
});
}


/**
* Helper method to check the values for the following cluster settings:
* - `remote_store.compatibility_mode` (should be `mixed`)
* - `migration.direction` (should NOT be `none`)
* Used as a source of truth to confirm if a remote store migration is in progress
* @param clusterService Current clusterService ref to fetch cluster settings
*/
public static boolean isMigrationDirectionSet(ClusterService clusterService) {
RemoteStoreNodeService.Direction migrationDirection = clusterService.getClusterSettings().get(MIGRATION_DIRECTION_SETTING);
RemoteStoreNodeService.CompatibilityMode currentCompatiblityMode = clusterService.getClusterSettings().get(REMOTE_STORE_COMPATIBILITY_MODE_SETTING);
return currentCompatiblityMode.equals(RemoteStoreNodeService.CompatibilityMode.MIXED) == true && migrationDirection.equals(RemoteStoreNodeService.Direction.NONE) == false;
}
}
Loading

0 comments on commit c4fac3a

Please sign in to comment.