diff --git a/CHANGELOG.md b/CHANGELOG.md index fcd1db31f3dc1..a6d2488caa010 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -142,6 +142,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Warn about deprecated and ignored index.mapper.dynamic index setting ([#11193](https://github.com/opensearch-project/OpenSearch/pull/11193)) - Fix `terms` query on `float` field when `doc_values` are turned off by reverting back to `FloatPoint` from `FloatField` ([#12499](https://github.com/opensearch-project/OpenSearch/pull/12499)) - Fix get task API does not refresh resource stats ([#11531](https://github.com/opensearch-project/OpenSearch/pull/11531)) +- onShardResult and onShardFailure are executed on one shard causes opensearch jvm crashed ([#12158](https://github.com/opensearch-project/OpenSearch/pull/12158)) ### Security diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index 9e1d065c96dd6..3b99ea9cc1cd2 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -286,6 +286,7 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator Runnable r = () -> { final Thread thread = Thread.currentThread(); try { + final SearchPhase phase = this; executePhaseOnShard(shardIt, shard, new SearchActionListener(shard, shardIndex) { @Override public void innerOnResponse(Result result) { @@ -299,7 +300,12 @@ public void innerOnResponse(Result result) { @Override public void onFailure(Exception t) { try { - onShardFailure(shardIndex, shard, shardIt, t); + // It only happens when onPhaseDone() is called and executePhaseOnShard() fails hard with an exception. + if (totalOps.get() == expectedTotalOps) { + onPhaseFailure(phase, "The phase has failed", t); + } else { + onShardFailure(shardIndex, shard, shardIt, t); + } } finally { executeNext(pendingExecutions, thread); } diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 601aa9dc1856e..3af7af114e96d 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -151,6 +151,7 @@ private AbstractSearchAsyncAction createAction( listener, controlled, false, + false, expected, new SearchShardIterator(null, null, Collections.emptyList(), null) ); @@ -162,6 +163,7 @@ private AbstractSearchAsyncAction createAction( ActionListener listener, final boolean controlled, final boolean failExecutePhaseOnShard, + final boolean catchExceptionWhenExecutePhaseOnShard, final AtomicLong expected, final SearchShardIterator... shards ) { @@ -221,7 +223,15 @@ protected void executePhaseOnShard( if (failExecutePhaseOnShard) { listener.onFailure(new ShardNotFoundException(shardIt.shardId())); } else { - listener.onResponse(new QuerySearchResult()); + if (catchExceptionWhenExecutePhaseOnShard) { + try { + listener.onResponse(new QuerySearchResult()); + } catch (Exception e) { + listener.onFailure(e); + } + } else { + listener.onResponse(new QuerySearchResult()); + } } } @@ -509,6 +519,7 @@ public void onFailure(Exception e) { }, false, true, + false, new AtomicLong(), shards ); @@ -555,6 +566,7 @@ public void onFailure(Exception e) { }, false, false, + false, new AtomicLong(), shards ); @@ -570,7 +582,7 @@ public void onFailure(Exception e) { assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); } - public void testExecutePhaseOnShardFailure() throws InterruptedException { + private void innerTestExecutePhaseOnShardFailure(boolean catchExceptionWhenExecutePhaseOnShard) throws InterruptedException { final Index index = new Index("test", UUID.randomUUID().toString()); final SearchShardIterator[] shards = IntStream.range(0, 2 + randomInt(3)) @@ -606,6 +618,7 @@ public void onFailure(Exception e) { }, false, false, + catchExceptionWhenExecutePhaseOnShard, new AtomicLong(), shards ); @@ -621,6 +634,14 @@ public void onFailure(Exception e) { assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length)); } + public void testExecutePhaseOnShardFailure() throws InterruptedException { + innerTestExecutePhaseOnShardFailure(false); + } + + public void testExecutePhaseOnShardFailureAndThrowException() throws InterruptedException { + innerTestExecutePhaseOnShardFailure(true); + } + public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException { ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); SearchRequestStats testListener = new SearchRequestStats(clusterSettings);