From 31dfd2bc45ed6f9f7ea78bfb3b0ec3780fb7fa88 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 26 Nov 2023 15:52:43 +0530 Subject: [PATCH] Add IT for multiple writer validation Signed-off-by: Ashish Singh --- .../RemoteStoreBaseIntegTestCase.java | 16 +++ .../opensearch/remotestore/RemoteStoreIT.java | 119 +++++++++++++++--- 2 files changed, 116 insertions(+), 19 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 8b4981a15433a..0c68b11ebff56 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -8,6 +8,8 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; @@ -23,9 +25,13 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.index.Index; import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -43,6 +49,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -380,4 +387,13 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { return filesExisting.get(); } + + protected IndexShard getIndexShard(String dataNode, String indexName) throws ExecutionException, InterruptedException { + String clusterManagerName = internalCluster().getClusterManagerName(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); + GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + String uuid = getIndexResponse.getSettings().get(indexName).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService indexService = indicesService.indexService(new Index(indexName, uuid)); + return indexService.getShard(0); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index b3b4f8e10fd31..983d4e336d2eb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -8,30 +8,34 @@ package org.opensearch.remotestore; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.admin.indices.get.GetIndexRequest; -import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor; -import org.opensearch.core.index.Index; -import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.index.translog.Translog.Durability; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.PeerRecoveryTargetService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.hamcrest.MatcherAssert; import java.nio.file.Path; @@ -40,6 +44,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -222,7 +227,7 @@ public void testDefaultBufferInterval() throws ExecutionException, InterruptedEx ensureGreen(INDEX_NAME); assertClusterRemoteBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, dataNode); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); assertBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indexShard); @@ -255,7 +260,7 @@ public void testOverriddenBufferInterval() throws ExecutionException, Interrupte ensureYellowAndNoInitializingShards(INDEX_NAME); ensureGreen(INDEX_NAME); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); assertBufferInterval(bufferInterval, indexShard); @@ -371,7 +376,7 @@ private void testRestrictSettingFalse(boolean setRestrictFalse, Durability durab .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), durability) .build(); createIndex(INDEX_NAME, indexSettings); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertEquals(durability, indexShard.indexSettings().getTranslogDurability()); durability = randomFrom(Durability.values()); @@ -404,7 +409,7 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E // Case 2 - Test update index fails createIndex(INDEX_NAME); - IndexShard indexShard = getIndexShard(dataNode); + IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME); assertEquals(Durability.REQUEST, indexShard.indexSettings().getTranslogDurability()); exception = assertThrows( IllegalArgumentException.class, @@ -416,15 +421,6 @@ public void testAsyncDurabilityThrowsExceptionWhenRestrictSettingTrue() throws E assertEquals(expectedExceptionMsg, exception.getMessage()); } - private IndexShard getIndexShard(String dataNode) throws ExecutionException, InterruptedException { - String clusterManagerName = internalCluster().getClusterManagerName(); - IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); - GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); - String uuid = getIndexResponse.getSettings().get(INDEX_NAME).get(IndexMetadata.SETTING_INDEX_UUID); - IndexService indexService = indicesService.indexService(new Index(INDEX_NAME, uuid)); - return indexService.getShard(0); - } - private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) { IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval()); @@ -516,7 +512,7 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); ensureGreen(INDEX_NAME); - IndexShard indexShard = getIndexShard(primaryShardNode); + IndexShard indexShard = getIndexShard(primaryShardNode, INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); String replicaShardNode = internalCluster().startDataOnlyNodes(1).get(0); @@ -529,7 +525,92 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte ensureGreen(INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); - indexShard = getIndexShard(replicaShardNode); + indexShard = getIndexShard(replicaShardNode, INDEX_NAME); assertFalse(indexShard.isSearchIdleSupported()); } + + public void testNoMultipleWriterDuringPrimaryRelocation() throws ExecutionException, InterruptedException { + // In this test, we trigger a force flush on existing primary while the primary mode on new primary has been + // activated. There was a bug in primary relocation of remote store enabled indexes where the new primary + // starts uploading translog and segments even before the cluster manager has started this shard. With this test, + // we check that we do not overwrite any file on remote store. Here we will also increase the replica count to + // check that there are no duplicate metadata files for translog or upload. + + internalCluster().startClusterManagerOnlyNode(); + String oldPrimary = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + indexBulk(INDEX_NAME, randomIntBetween(5, 10)); + String newPrimary = internalCluster().startDataOnlyNodes(1).get(0); + ensureStableCluster(3); + + IndexShard oldPrimaryIndexShard = getIndexShard(oldPrimary, INDEX_NAME); + CountDownLatch flushLatch = new CountDownLatch(1); + + MockTransportService mockTargetTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + oldPrimary + )); + mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT.equals(action)) { + flushLatch.countDown(); + } + connection.sendRequest(requestId, action, request, options); + }); + + logger.info("--> relocate the shard"); + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(INDEX_NAME, 0, oldPrimary, newPrimary)) + .execute() + .actionGet(); + + CountDownLatch flushDone = new CountDownLatch(1); + Thread flushThread = new Thread(() -> { + try { + flushLatch.await(2, TimeUnit.SECONDS); + oldPrimaryIndexShard.flush(new FlushRequest().waitIfOngoing(true).force(true)); + // newPrimaryTranslogRepo.setSleepSeconds(0); + } catch (IndexShardClosedException e) { + // this is fine + } catch (InterruptedException e) { + throw new AssertionError(e); + } finally { + flushDone.countDown(); + } + }); + flushThread.start(); + flushDone.await(5, TimeUnit.SECONDS); + flushThread.join(); + + ClusterHealthResponse clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(5)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + + client().admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ) + .get(); + + clusterHealthResponse = client().admin() + .cluster() + .prepareHealth() + .setWaitForStatus(ClusterHealthStatus.GREEN) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(TimeValue.timeValueSeconds(5)) + .execute() + .actionGet(); + assertFalse(clusterHealthResponse.isTimedOut()); + } }