diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationDisruptionIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationDisruptionIT.java new file mode 100644 index 0000000000000..66b26b5d25cfe --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationDisruptionIT.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportService; +import org.junit.Before; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; + +/** + * These tests simulate corruption cases during replication. They are skipped on WindowsFS simulation where file renaming + * can fail with an access denied IOException because deletion is not permitted. + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +@LuceneTestCase.SuppressFileSystems("WindowsFS") +public class SegmentReplicationDisruptionIT extends SegmentReplicationBaseIT { + @Before + private void setup() { + internalCluster().startClusterManagerOnlyNode(); + } + + public void testSendCorruptBytesToReplica() throws Exception { + final String primaryNode = internalCluster().startDataOnlyNode(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.refresh_interval", -1) + .build() + ); + ensureYellow(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + primaryTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replicaNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) { + FileChunkRequest req = (FileChunkRequest) request; + TransportRequest corrupt = new FileChunkRequest( + req.recoveryId(), + ((FileChunkRequest) request).requestSeqNo(), + ((FileChunkRequest) request).shardId(), + ((FileChunkRequest) request).metadata(), + ((FileChunkRequest) request).position(), + new BytesArray("test"), + false, + 0, + 0L + ); + connection.sendRequest(requestId, action, corrupt, options); + latch.countDown(); + } else { + connection.sendRequest(requestId, action, request, options); + } + } + ); + for (int i = 0; i < 100; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + final long originalRecoveryTime = getRecoveryStopTime(replicaNode); + assertNotEquals(originalRecoveryTime, 0); + refresh(INDEX_NAME); + latch.await(); + assertTrue(failed.get()); + waitForNewPeerRecovery(replicaNode, originalRecoveryTime); + // reset checkIndex to ensure our original shard doesn't throw + resetCheckIndexStatus(); + waitForSearchableDocs(100, primaryNode, replicaNode); + } + + public void testWipeSegmentBetweenSyncs() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.refresh_interval", -1) + .build() + ); + ensureYellow(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + refresh(INDEX_NAME); + ensureGreen(INDEX_NAME); + final long originalRecoveryTime = getRecoveryStopTime(replicaNode); + + final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); + waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode)); + indexShard.store().directory().deleteFile("_0.si"); + + for (int i = 11; i < 21; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + refresh(INDEX_NAME); + waitForNewPeerRecovery(replicaNode, originalRecoveryTime); + resetCheckIndexStatus(); + waitForSearchableDocs(20, primaryNode, replicaNode); + } + + private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception { + assertBusy(() -> { + // assert we have a peer recovery after the original + final long time = getRecoveryStopTime(replicaNode); + assertNotEquals(time, 0); + assertNotEquals(originalRecoveryTime, time); + + }, 1, TimeUnit.MINUTES); + } + + private long getRecoveryStopTime(String nodeName) { + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get(); + final List recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME); + for (RecoveryState recoveryState : recoveryStates) { + if (recoveryState.getTargetNode().getName().equals(nodeName)) { + return recoveryState.getTimer().stopTime(); + } + } + return 0L; + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 81556cc270151..f48df082a25dc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -24,7 +24,6 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; -import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.get.GetResponse; @@ -59,7 +58,6 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; @@ -73,7 +71,6 @@ import org.opensearch.index.engine.NRTReplicationReaderManager; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.FileChunkRequest; -import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; @@ -85,7 +82,6 @@ import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; -import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import org.junit.Before; @@ -98,7 +94,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -1781,135 +1776,4 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException { assertThat(response.getIndex(), equalTo(INDEX_NAME)); } - - public void testSendCorruptBytesToReplica() throws Exception { - // this test stubs transport calls specific to node-node replication. - assumeFalse( - "Skipping the test as its not compatible with segment replication with remote store.", - segmentReplicationWithRemoteEnabled() - ); - final String primaryNode = internalCluster().startDataOnlyNode(); - createIndex( - INDEX_NAME, - Settings.builder() - .put(indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .put("index.refresh_interval", -1) - .build() - ); - ensureYellow(INDEX_NAME); - final String replicaNode = internalCluster().startDataOnlyNode(); - ensureGreen(INDEX_NAME); - - MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance( - TransportService.class, - primaryNode - )); - CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean failed = new AtomicBoolean(false); - primaryTransportService.addSendBehavior( - internalCluster().getInstance(TransportService.class, replicaNode), - (connection, requestId, action, request, options) -> { - if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) { - FileChunkRequest req = (FileChunkRequest) request; - logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk()); - TransportRequest corrupt = new FileChunkRequest( - req.recoveryId(), - ((FileChunkRequest) request).requestSeqNo(), - ((FileChunkRequest) request).shardId(), - ((FileChunkRequest) request).metadata(), - ((FileChunkRequest) request).position(), - new BytesArray("test"), - false, - 0, - 0L - ); - connection.sendRequest(requestId, action, corrupt, options); - latch.countDown(); - } else { - connection.sendRequest(requestId, action, request, options); - } - } - ); - for (int i = 0; i < 100; i++) { - client().prepareIndex(INDEX_NAME) - .setId(String.valueOf(i)) - .setSource(jsonBuilder().startObject().field("field", i).endObject()) - .get(); - } - final long originalRecoveryTime = getRecoveryStopTime(replicaNode); - assertNotEquals(originalRecoveryTime, 0); - refresh(INDEX_NAME); - latch.await(); - assertTrue(failed.get()); - waitForNewPeerRecovery(replicaNode, originalRecoveryTime); - // reset checkIndex to ensure our original shard doesn't throw - resetCheckIndexStatus(); - waitForSearchableDocs(100, primaryNode, replicaNode); - } - - public void testWipeSegmentBetweenSyncs() throws Exception { - internalCluster().startClusterManagerOnlyNode(); - final String primaryNode = internalCluster().startDataOnlyNode(); - createIndex( - INDEX_NAME, - Settings.builder() - .put(indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .put("index.refresh_interval", -1) - .build() - ); - ensureYellow(INDEX_NAME); - final String replicaNode = internalCluster().startDataOnlyNode(); - ensureGreen(INDEX_NAME); - - for (int i = 0; i < 10; i++) { - client().prepareIndex(INDEX_NAME) - .setId(String.valueOf(i)) - .setSource(jsonBuilder().startObject().field("field", i).endObject()) - .get(); - } - refresh(INDEX_NAME); - ensureGreen(INDEX_NAME); - final long originalRecoveryTime = getRecoveryStopTime(replicaNode); - - final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); - waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode)); - indexShard.store().directory().deleteFile("_0.si"); - - for (int i = 11; i < 21; i++) { - client().prepareIndex(INDEX_NAME) - .setId(String.valueOf(i)) - .setSource(jsonBuilder().startObject().field("field", i).endObject()) - .get(); - } - refresh(INDEX_NAME); - waitForNewPeerRecovery(replicaNode, originalRecoveryTime); - resetCheckIndexStatus(); - waitForSearchableDocs(20, primaryNode, replicaNode); - } - - private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception { - assertBusy(() -> { - // assert we have a peer recovery after the original - final long time = getRecoveryStopTime(replicaNode); - assertNotEquals(time, 0); - assertNotEquals(originalRecoveryTime, time); - - }, 1, TimeUnit.MINUTES); - } - - private long getRecoveryStopTime(String nodeName) { - final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get(); - final List recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME); - logger.info("Recovery states {}", recoveryResponse); - for (RecoveryState recoveryState : recoveryStates) { - if (recoveryState.getTargetNode().getName().equals(nodeName)) { - return recoveryState.getTimer().stopTime(); - } - } - return 0L; - } }