From dd46b25c1159f40871dd8d0de773874f9994f3f7 Mon Sep 17 00:00:00 2001 From: kkewwei Date: Sun, 4 Feb 2024 12:04:01 +0800 Subject: [PATCH] Fix OpenSearch JVM crash caused by onShardResult and onShardFailure both being executed for the same shard Signed-off-by: kkewwei --- .../action/search/AbstractSearchAsyncAction.java | 9 ++++++++- .../action/search/AbstractSearchAsyncActionTests.java | 6 +++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 9e1d065c96dd6..b5005088f432d 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -286,6 +286,7 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator Runnable r = () -> { final Thread thread = Thread.currentThread(); try { + final SearchPhase phase = this; executePhaseOnShard(shardIt, shard, new SearchActionListener(shard, shardIndex) { @Override public void innerOnResponse(Result result) { @@ -299,7 +300,13 @@ public void innerOnResponse(Result result) { @Override public void onFailure(Exception t) { try { - onShardFailure(shardIndex, shard, shardIt, t); + // It only happens when onPhaseDone() is called and executePhaseOnShard() fails hard with an exception. 
+ if (totalOps.get() == expectedTotalOps) { + onPhaseFailure(phase, "The phase has failed", t); + } else { + onShardFailure(shardIndex, shard, shardIt, t); + } + } finally { executeNext(pendingExecutions, thread); } diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 601aa9dc1856e..4ac49634c9968 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -221,7 +221,11 @@ protected void executePhaseOnShard( if (failExecutePhaseOnShard) { listener.onFailure(new ShardNotFoundException(shardIt.shardId())); } else { - listener.onResponse(new QuerySearchResult()); + try { + listener.onResponse(new QuerySearchResult()); + } catch (Exception e) { + listener.onFailure(e); + } } }