Skip to content

Commit

Permalink
[Backport 2.x] onShardResult and onShardFailure are executed on one s…
Browse files Browse the repository at this point in the history
…hard causes opensearch jvm crashed (#12612)

* onShardResult and onShardFailure are executed on one shard causes opensearch jvm crashed

Signed-off-by: kkewwei <[email protected]>

* unit test

Signed-off-by: kkewwei <[email protected]>

* spotlessJavaCheck

Signed-off-by: kkewwei <[email protected]>

* rename variable names

Signed-off-by: kkewwei <[email protected]>

* add changelog

Signed-off-by: kkewwei <[email protected]>

---------

Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei authored Mar 12, 2024
1 parent ac9147b commit 30b7a95
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix `terms` query on `float` field when `doc_values` are turned off by reverting back to `FloatPoint` from `FloatField` ([#12499](https://github.com/opensearch-project/OpenSearch/pull/12499))
- Fix get task API does not refresh resource stats ([#11531](https://github.com/opensearch-project/OpenSearch/pull/11531))
- Fix for deserilization bug in weighted round-robin metadata ([#11679](https://github.com/opensearch-project/OpenSearch/pull/11679))
- onShardResult and onShardFailure are executed on one shard causes opensearch jvm crashed ([#12158](https://github.com/opensearch-project/OpenSearch/pull/12158))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
Runnable r = () -> {
final Thread thread = Thread.currentThread();
try {
final SearchPhase phase = this;
executePhaseOnShard(shardIt, shard, new SearchActionListener<Result>(shard, shardIndex) {
@Override
public void innerOnResponse(Result result) {
Expand All @@ -299,7 +300,12 @@ public void innerOnResponse(Result result) {
@Override
public void onFailure(Exception t) {
try {
onShardFailure(shardIndex, shard, shardIt, t);
// It only happens when onPhaseDone() is called and executePhaseOnShard() fails hard with an exception.
if (totalOps.get() == expectedTotalOps) {
onPhaseFailure(phase, "The phase has failed", t);
} else {
onShardFailure(shardIndex, shard, shardIt, t);
}
} finally {
executeNext(pendingExecutions, thread);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
listener,
controlled,
false,
false,
expected,
new SearchShardIterator(null, null, Collections.emptyList(), null)
);
Expand All @@ -162,6 +163,7 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
ActionListener<SearchResponse> listener,
final boolean controlled,
final boolean failExecutePhaseOnShard,
final boolean catchExceptionWhenExecutePhaseOnShard,
final AtomicLong expected,
final SearchShardIterator... shards
) {
Expand Down Expand Up @@ -221,7 +223,15 @@ protected void executePhaseOnShard(
if (failExecutePhaseOnShard) {
listener.onFailure(new ShardNotFoundException(shardIt.shardId()));
} else {
listener.onResponse(new QuerySearchResult());
if (catchExceptionWhenExecutePhaseOnShard) {
try {
listener.onResponse(new QuerySearchResult());
} catch (Exception e) {
listener.onFailure(e);
}
} else {
listener.onResponse(new QuerySearchResult());
}
}
}

Expand Down Expand Up @@ -509,6 +519,7 @@ public void onFailure(Exception e) {
},
false,
true,
false,
new AtomicLong(),
shards
);
Expand Down Expand Up @@ -555,6 +566,7 @@ public void onFailure(Exception e) {
},
false,
false,
false,
new AtomicLong(),
shards
);
Expand All @@ -570,7 +582,7 @@ public void onFailure(Exception e) {
assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length));
}

public void testExecutePhaseOnShardFailure() throws InterruptedException {
private void innerTestExecutePhaseOnShardFailure(boolean catchExceptionWhenExecutePhaseOnShard) throws InterruptedException {
final Index index = new Index("test", UUID.randomUUID().toString());

final SearchShardIterator[] shards = IntStream.range(0, 2 + randomInt(3))
Expand Down Expand Up @@ -606,6 +618,7 @@ public void onFailure(Exception e) {
},
false,
false,
catchExceptionWhenExecutePhaseOnShard,
new AtomicLong(),
shards
);
Expand All @@ -621,6 +634,14 @@ public void onFailure(Exception e) {
assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length));
}

public void testExecutePhaseOnShardFailure() throws InterruptedException {
innerTestExecutePhaseOnShardFailure(false);
}

public void testExecutePhaseOnShardFailureAndThrowException() throws InterruptedException {
innerTestExecutePhaseOnShardFailure(true);
}

public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedException {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testListener = new SearchRequestStats(clusterSettings);
Expand Down

0 comments on commit 30b7a95

Please sign in to comment.