diff --git a/CHANGELOG.md b/CHANGELOG.md index f32e0ef6cac4a..0cf852a3a2b50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix remove ingest processor handing ignore_missing parameter not correctly ([10089](https://github.com/opensearch-project/OpenSearch/pull/10089)) - Fix registration and initialization of multiple extensions ([10256](https://github.com/opensearch-project/OpenSearch/pull/10256)) - Fix circular dependency in Settings initialization ([10194](https://github.com/opensearch-project/OpenSearch/pull/10194)) +- Fix Segment Replication ShardLockObtainFailedException bug during index corruption ([10370](https://github.com/opensearch-project/OpenSearch/pull/10370)) ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 8e68a8bde39d5..1d93eecd6b245 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -197,9 +197,10 @@ protected IndexShard getIndexShard(String node, ShardId shardId, String indexNam protected IndexShard getIndexShard(String node, String indexName) { final Index index = resolveIndex(indexName); IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); - IndexService indexService = indicesService.indexServiceSafe(index); + IndexService indexService = indicesService.indexService(index); + assertNotNull(indexService); final Optional shardId = indexService.shardIds().stream().findFirst(); - return indexService.getShard(shardId.get()); + return shardId.map(indexService::getShard).orElse(null); } protected boolean segmentReplicationWithRemoteEnabled() { diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 33bc5a8f3afe6..81556cc270151 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -24,6 +24,7 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.get.GetResponse; @@ -58,6 +59,7 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; @@ -71,6 +73,7 @@ import org.opensearch.index.engine.NRTReplicationReaderManager; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; @@ -82,6 +85,7 @@ import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportService; import org.junit.Before; @@ -94,6 +98,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static java.util.Arrays.asList; @@ -1777,4 +1782,134 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException { } + public void testSendCorruptBytesToReplica() throws Exception { + // this test stubs transport calls specific to node-node replication. + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store.", + segmentReplicationWithRemoteEnabled() + ); + final String primaryNode = internalCluster().startDataOnlyNode(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.refresh_interval", -1) + .build() + ); + ensureYellow(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance( + TransportService.class, + primaryNode + )); + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + primaryTransportService.addSendBehavior( + internalCluster().getInstance(TransportService.class, replicaNode), + (connection, requestId, action, request, options) -> { + if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) { + FileChunkRequest req = (FileChunkRequest) request; + logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk()); + TransportRequest corrupt = new FileChunkRequest( + req.recoveryId(), + ((FileChunkRequest) request).requestSeqNo(), + ((FileChunkRequest) request).shardId(), + ((FileChunkRequest) request).metadata(), + ((FileChunkRequest) request).position(), + new BytesArray("test"), + false, + 0, + 0L + ); + connection.sendRequest(requestId, action, corrupt, options); + latch.countDown(); + } else { + connection.sendRequest(requestId, action, request, options); + } + } + ); + for (int i = 0; i < 100; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + final long originalRecoveryTime = getRecoveryStopTime(replicaNode); + assertNotEquals(originalRecoveryTime, 0); + refresh(INDEX_NAME); + latch.await(); + assertTrue(failed.get()); + waitForNewPeerRecovery(replicaNode, originalRecoveryTime); + // reset checkIndex to ensure our original shard doesn't throw + resetCheckIndexStatus(); + waitForSearchableDocs(100, primaryNode, replicaNode); + } + + public void testWipeSegmentBetweenSyncs() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(); + createIndex( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put("index.refresh_interval", -1) + .build() + ); + ensureYellow(INDEX_NAME); + final String replicaNode = internalCluster().startDataOnlyNode(); + ensureGreen(INDEX_NAME); + + for (int i = 0; i < 10; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + refresh(INDEX_NAME); + ensureGreen(INDEX_NAME); + final long originalRecoveryTime = getRecoveryStopTime(replicaNode); + + final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME); + waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode)); + indexShard.store().directory().deleteFile("_0.si"); + + for (int i = 11; i < 21; i++) { + client().prepareIndex(INDEX_NAME) + .setId(String.valueOf(i)) + .setSource(jsonBuilder().startObject().field("field", i).endObject()) + .get(); + } + refresh(INDEX_NAME); + waitForNewPeerRecovery(replicaNode, originalRecoveryTime); + resetCheckIndexStatus(); + waitForSearchableDocs(20, primaryNode, replicaNode); + } + + private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception { + assertBusy(() -> { + // assert we have a peer recovery after the original + final long time = getRecoveryStopTime(replicaNode); + assertNotEquals(time, 0); + assertNotEquals(originalRecoveryTime, time); + + }, 1, TimeUnit.MINUTES); + } + + private long getRecoveryStopTime(String nodeName) { + final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get(); + final List recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME); + logger.info("Recovery states {}", recoveryResponse); + for (RecoveryState recoveryState : recoveryStates) { + if (recoveryState.getTargetNode().getName().equals(nodeName)) { + return recoveryState.getTimer().stopTime(); + } + } + return 0L; + } } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index d5f4480ee02d4..0becd84b11915 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -416,6 +416,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { try { commitSegmentInfos(); } catch (IOException e) { + maybeFailEngine("flush", e); throw new FlushFailedEngineException(shardId, e); } finally { flushLock.unlock(); @@ -489,13 +490,29 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; latestSegmentInfos.changed(); } - commitSegmentInfos(latestSegmentInfos); - IOUtils.close(readerManager, translogManager, store::decRef); + try { + commitSegmentInfos(latestSegmentInfos); + } catch (IOException e) { + // mark the store corrupted unless we are closing as result of engine failure. + // in this case Engine#failShard will handle store corruption. + if (failEngineLock.isHeldByCurrentThread() == false && store.isMarkedCorrupted() == false) { + try { + store.markStoreCorrupted(e); + } catch (IOException ex) { + logger.warn("Unable to mark store corrupted", ex); + } + } + } + IOUtils.close(readerManager, translogManager); } catch (Exception e) { - logger.warn("failed to close engine", e); + logger.error("failed to close engine", e); } finally { - logger.debug("engine closed [{}]", reason); - closedLatch.countDown(); + try { + store.decRef(); + logger.debug("engine closed [{}]", reason); + } finally { + closedLatch.countDown(); + } } } } diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 285241ba89996..9f505121d8dfa 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -385,7 +385,13 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio */ public Map getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException { assert indexSettings.isSegRepEnabled(); - return loadMetadata(segmentInfos, directory, logger, true).fileMetadata; + failIfCorrupted(); + try { + return loadMetadata(segmentInfos, directory, logger, true).fileMetadata; + } catch (NoSuchFileException | CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { + markStoreCorrupted(ex); + throw ex; + } } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 5ae480b7d63a4..0eb6ce36fa63d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -18,7 +18,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.opensearch.OpenSearchCorruptionException; -import org.opensearch.OpenSearchException; import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; import org.opensearch.common.lucene.Lucene; @@ -261,9 +260,7 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are - // broken. We have to clean up this shard entirely, remove all files and bubble it up to the - // source shard since this index might be broken there as well? The Source can handle this and checks - // its content on disk if possible. + // broken. We have to clean up this shard entirely, remove all files and bubble it up. try { try { store.removeCorruptionMarker(); @@ -279,14 +276,14 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) // In this case the shard is closed at some point while updating the reader. // This can happen when the engine is closed in a separate thread. logger.warn("Shard is already closed, closing replication"); - } catch (OpenSearchException ex) { + } catch (CancellableThreads.ExecutionCancelledException ex) { /* Ignore closed replication target as it can happen due to index shard closed event in a separate thread. In such scenario, ignore the exception */ - assert cancellableThreads.isCancelled() : "Replication target closed but segment replication not cancelled"; + assert cancellableThreads.isCancelled() : "Replication target cancelled but cancellable threads not cancelled"; } catch (Exception ex) { - throw new OpenSearchCorruptionException(ex); + throw new ReplicationFailedException(ex); } finally { if (store != null) { store.decRef(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index ffc4ab86661db..46095adfe96b4 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.support.ChannelActionListener; @@ -28,6 +29,7 @@ import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.store.Store; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.ForceSyncRequest; @@ -46,6 +48,7 @@ import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportService; +import java.io.IOException; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -522,7 +525,7 @@ public void onResponse(Void o) { @Override public void onFailure(Exception e) { logger.debug("Replication failed {}", target.description()); - if (e instanceof OpenSearchCorruptionException) { + if (isStoreCorrupt(target) || e instanceof CorruptIndexException || e instanceof OpenSearchCorruptionException) { onGoingReplications.fail(replicationId, new ReplicationFailedException("Store corruption during replication", e), true); return; } @@ -531,6 +534,27 @@ public void onFailure(Exception e) { }); } + private boolean isStoreCorrupt(SegmentReplicationTarget target) { + // ensure target is not already closed. In that case + // we can assume the store is not corrupt and that the replication + // event completed successfully. + if (target.refCount() > 0) { + final Store store = target.store(); + if (store.tryIncRef()) { + try { + return store.isMarkedCorrupted(); + } catch (IOException ex) { + logger.warn("Unable to determine if store is corrupt", ex); + return false; + } finally { + store.decRef(); + } + } + } + // store already closed. + return false; + } + private class FileChunkTransportRequestHandler implements TransportRequestHandler { // How many bytes we've copied since we last called RateLimiter.pause diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 805271b05988c..5604c0701e515 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -574,6 +574,50 @@ public void testDecrefToZeroRemovesFile() throws IOException { } } + public void testCommitOnCloseThrowsException_decRefStore() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS); + List operations = generateHistoryOnReplica( + randomIntBetween(1, 10), + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + indexOperations(nrtEngine, operations); + // wipe the nrt directory initially so we can sync with primary. + cleanAndCopySegmentsFromPrimary(nrtEngine); + nrtEngineStore.directory().deleteFile("_0.si"); + assertEquals(2, nrtEngineStore.refCount()); + nrtEngine.close(); + assertEquals(1, nrtEngineStore.refCount()); + assertTrue(nrtEngineStore.isMarkedCorrupted()); + // store will throw when eventually closed, not handled here. + assertThrows(RuntimeException.class, nrtEngineStore::close); + } + + public void testFlushThrowsFlushFailedExceptionOnCorruption() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, INDEX_SETTINGS); + List operations = generateHistoryOnReplica( + randomIntBetween(1, 10), + randomBoolean(), + randomBoolean(), + randomBoolean() + ); + indexOperations(nrtEngine, operations); + // wipe the nrt directory initially so we can sync with primary. + cleanAndCopySegmentsFromPrimary(nrtEngine); + nrtEngineStore.directory().deleteFile("_0.si"); + assertThrows(FlushFailedEngineException.class, nrtEngine::flush); + assertTrue(nrtEngineStore.isMarkedCorrupted()); + // store will throw when eventually closed, not handled here. + assertThrows(RuntimeException.class, nrtEngineStore::close); + } + private void copySegments(Collection latestPrimaryFiles, Engine nrtEngine) throws IOException { final Store store = nrtEngine.store; final List replicaFiles = List.of(store.directory().listAll()); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 6deccde1afa48..6fe8163f53fd5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.cluster.ClusterState; @@ -557,7 +558,7 @@ public void testForceSegmentSyncHandlerWithFailure() throws Exception { ).txGet(); }); Throwable nestedException = finalizeException.getCause().getCause(); - assertTrue(nestedException instanceof IOException); + assertNotNull(ExceptionsHelper.unwrap(finalizeException, IOException.class)); assertTrue(nestedException.getMessage().contains("dummy failure")); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 1192796566dcf..0fae8443c5ce5 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -249,7 +249,7 @@ public void onFailure(Exception e) { }); } - public void testFailure_finalizeReplication_IOException() throws IOException { + public void testFailure_finalizeReplication_NonCorruptionException() throws IOException { IOException exception = new IOException("dummy failure"); SegmentReplicationSource segrepSource = new TestReplicationSource() { @@ -288,6 +288,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { + assertEquals(ReplicationFailedException.class, e.getClass()); assertEquals(exception, e.getCause()); segrepTarget.fail(new ReplicationFailedException(e), false); }