From a77b784ea5eb786f74ce3be0b167e7e81cc5816c Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Tue, 10 Oct 2023 00:17:36 +0530 Subject: [PATCH] Fix shard failure on flush during upload failures for remote indexes Signed-off-by: Ashish Singh --- .../opensearch/OpenSearchServerException.java | 9 +++++ .../index/engine/InternalEngine.java | 4 +++ .../translog/InternalTranslogManager.java | 4 +++ .../transfer/TranslogTransferManager.java | 7 ++-- .../TranslogUploadFailedException.java | 34 +++++++++++++++++++ 5 files changed, 56 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TranslogUploadFailedException.java diff --git a/server/src/main/java/org/opensearch/OpenSearchServerException.java b/server/src/main/java/org/opensearch/OpenSearchServerException.java index 39c22e60f038a..fdef317c5a426 100644 --- a/server/src/main/java/org/opensearch/OpenSearchServerException.java +++ b/server/src/main/java/org/opensearch/OpenSearchServerException.java @@ -15,6 +15,7 @@ import static org.opensearch.OpenSearchException.OpenSearchExceptionHandleRegistry.registerExceptionHandle; import static org.opensearch.OpenSearchException.UNKNOWN_VERSION_ADDED; import static org.opensearch.Version.V_2_10_0; +import static org.opensearch.Version.V_2_11_0; import static org.opensearch.Version.V_2_1_0; import static org.opensearch.Version.V_2_4_0; import static org.opensearch.Version.V_2_5_0; @@ -1175,6 +1176,14 @@ public static void registerExceptions() { ) ); registerExceptionHandle(new OpenSearchExceptionHandle(CryptoRegistryException.class, CryptoRegistryException::new, 171, V_2_10_0)); + registerExceptionHandle( + new OpenSearchExceptionHandle( + org.opensearch.index.translog.transfer.TranslogUploadFailedException.class, + org.opensearch.index.translog.transfer.TranslogUploadFailedException::new, + 172, + V_2_11_0 + ) + ); registerExceptionHandle( new OpenSearchExceptionHandle( org.opensearch.cluster.block.IndexCreateBlockException.class, diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 3eeceff2253c1..656abde8862b1 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -112,6 +112,7 @@ import org.opensearch.index.translog.TranslogManager; import org.opensearch.index.translog.listener.CompositeTranslogEventListener; import org.opensearch.index.translog.listener.TranslogEventListener; +import org.opensearch.index.translog.transfer.TranslogUploadFailedException; import org.opensearch.search.suggest.completion.CompletionStats; import org.opensearch.threadpool.ThreadPool; @@ -1888,6 +1889,9 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { } translogManager.trimUnreferencedReaders(); + } catch (TranslogUploadFailedException e) { + // Do not fail engine as this is due to translog upload failure + throw new FlushFailedEngineException(shardId, e); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 4bc9a711894b7..a201b8ba0e66a 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -18,6 +18,7 @@ import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.translog.listener.TranslogEventListener; +import org.opensearch.index.translog.transfer.TranslogUploadFailedException; import java.io.Closeable; import java.io.IOException; @@ -88,6 +89,9 @@ public void rollTranslogGeneration() throws TranslogException { engineLifeCycleAware.ensureOpen(); translog.rollGeneration(); translog.trimUnreferencedReaders(); + } catch (TranslogUploadFailedException e) { + // Do not trigger the translogEventListener as it fails the Engine while this is only an issue with remote upload + throw e; } catch (AlreadyClosedException e) { translogEventListener.onFailure("translog roll generation failed", e); throw e; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index d988b8a6254ff..3056ffbe2aa95 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -176,7 +176,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L); remoteTranslogTransferTracker.addUploadBytesFailed(metadataBytesToUpload); // outer catch handles capturing stats on upload failure - throw exception; + throw new TranslogUploadFailedException(shardId, "Failed to upload " + tlogMetadata.getName(), exception); } remoteTranslogTransferTracker.addUploadTimeInMillis((System.nanoTime() - metadataUploadStartTime) / 1_000_000L); @@ -185,7 +185,10 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans translogTransferListener.onUploadComplete(transferSnapshot); return true; } else { - Exception ex = new IOException("Failed to upload " + exceptionList.size() + " files during transfer"); + Exception ex = new TranslogUploadFailedException( + shardId, + "Failed to upload " + exceptionList.size() + " files during transfer" + ); exceptionList.forEach(ex::addSuppressed); throw ex; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogUploadFailedException.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogUploadFailedException.java new file mode 100644 index 0000000000000..b5b205c2a81b6 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogUploadFailedException.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.translog.TranslogException; + +import java.io.IOException; + +/** + * Exception is thrown if there are any exceptions while uploading translog to remote store. + * @opensearch.internal + */ +public class TranslogUploadFailedException extends TranslogException { + + public TranslogUploadFailedException(ShardId shardId, String message) { + super(shardId, message); + } + + public TranslogUploadFailedException(ShardId shardId, String message, Throwable cause) { + super(shardId, message, cause); + } + + public TranslogUploadFailedException(StreamInput in) throws IOException { + super(in); + } +}