diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java index 2c6db6ae19a9a..98586b60dcc69 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBackpressureAndResiliencyIT.java @@ -11,6 +11,7 @@ import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.opensearch.action.admin.indices.flush.FlushResponse; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractAsyncTask; @@ -228,4 +229,32 @@ public void testSkipLoadGlobalCheckpointToReplicationTracker() { client().admin().cluster().prepareReroute().setRetryFailed(true).get(); ensureGreen(INDEX_NAME); } + + public void testFlushDuringRemoteUploadFailures() { + Path location = randomRepoPath().toAbsolutePath(); + String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE); + + logger.info("--> Indexing data"); + indexData(randomIntBetween(1, 2), true); + logger.info("--> Indexing succeeded"); + ensureGreen(INDEX_NAME); + + MockRepository translogRepo = (MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName) + .repository(TRANSLOG_REPOSITORY_NAME); + logger.info("--> Failing all remote store interaction"); + translogRepo.setRandomControlIOExceptionRate(1d); + + Exception ex = assertThrows(UncategorizedExecutionException.class, () -> indexSingleDoc()); + assertEquals("Failed execution", ex.getMessage()); + + FlushResponse flushResponse = client().admin().indices().prepareFlush(INDEX_NAME).setForce(true).execute().actionGet(); + assertEquals(1, flushResponse.getFailedShards()); + ensureGreen(INDEX_NAME); + + logger.info("--> Stop failing all remote store interactions"); + translogRepo.setRandomControlIOExceptionRate(0d); + flushResponse = client().admin().indices().prepareFlush(INDEX_NAME).setForce(true).execute().actionGet(); + assertEquals(1, flushResponse.getSuccessfulShards()); + assertEquals(0, flushResponse.getFailedShards()); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 90c40a27cae8f..8189399cee911 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1480,7 +1480,7 @@ public void trimTranslog() { /** * Rolls the tranlog generation and cleans unneeded. */ - public void rollTranslogGeneration() { + public void rollTranslogGeneration() throws IOException { final Engine engine = getEngine(); engine.rollTranslogGeneration(); } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 8c6bff591c1ec..c5c22c2b02365 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -18,6 +18,7 @@ import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.translog.listener.TranslogEventListener; +import org.opensearch.index.translog.transfer.TranslogUploadFailedException; import java.io.Closeable; import java.io.IOException; @@ -87,11 +88,14 @@ public InternalTranslogManager( * Rolls the translog generation and cleans unneeded. */ @Override - public void rollTranslogGeneration() throws TranslogException { + public void rollTranslogGeneration() throws TranslogException, IOException { try (ReleasableLock ignored = readLock.acquire()) { engineLifeCycleAware.ensureOpen(); translog.rollGeneration(); translog.trimUnreferencedReaders(); + } catch (TranslogUploadFailedException e) { + // Do not trigger the translogEventListener as it fails the Engine while this is only an issue with remote upload + throw e; } catch (AlreadyClosedException e) { translogEventListener.onFailure("translog roll generation failed", e); throw e; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index 303e84dc2b228..4568a3e3d578a 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -21,7 +21,7 @@ public interface TranslogManager { /** * Rolls the translog generation and cleans unneeded. */ - void rollTranslogGeneration() throws TranslogException; + void rollTranslogGeneration() throws TranslogException, IOException; /** * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive). diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index d988b8a6254ff..ece6f6d5a534f 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -176,7 +176,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L); remoteTranslogTransferTracker.addUploadBytesFailed(metadataBytesToUpload); // outer catch handles capturing stats on upload failure - throw exception; + throw new TranslogUploadFailedException("Failed to upload " + tlogMetadata.getName(), exception); } remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L); @@ -185,7 +185,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans translogTransferListener.onUploadComplete(transferSnapshot); return true; } else { - Exception ex = new IOException("Failed to upload " + exceptionList.size() + " files during transfer"); + Exception ex = new TranslogUploadFailedException("Failed to upload " + exceptionList.size() + " files during transfer"); exceptionList.forEach(ex::addSuppressed); throw ex; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogUploadFailedException.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogUploadFailedException.java new file mode 100644 index 0000000000000..4a9b10ec5a52e --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogUploadFailedException.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import java.io.IOException; + +/** + * Exception is thrown if there are any exceptions while uploading translog to remote store. + * @opensearch.internal + */ +public class TranslogUploadFailedException extends IOException { + + public TranslogUploadFailedException(String message) { + super(message); + } + + public TranslogUploadFailedException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index a03b8ac25a18c..293d3a18d120a 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -7286,7 +7286,11 @@ public void testMaxSeqNoInCommitUserData() throws Exception { engine.ensureOpen(); while (running.get() && assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration() < 500) { - engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower + try { + engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower + } catch (IOException e) { + fail("io exception not expected"); + } } }); rollTranslog.start(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java index b2310010620f7..42e0df2dc90c1 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java @@ -47,6 +47,7 @@ import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.index.translog.transfer.TranslogUploadFailedException; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -1113,7 +1114,7 @@ public void testSyncUpAlwaysFailure() throws IOException { try { translog.sync(); fail("io exception expected"); - } catch (IOException e) { + } catch (TranslogUploadFailedException e) { assertTrue("at least one operation pending", translog.syncNeeded()); } }