Skip to content

Commit

Permalink
OpenSearch crashes on closed client connection before search reply when total ops higher compared to expected (opensearch-project#4143) (opensearch-project#4149)
Browse files Browse the repository at this point in the history

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Aug 5, 2022
1 parent b2b6c05 commit 4b0f2ec
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,16 @@ public void onFailure(Exception t) {
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
fork(() -> {
// It only happens when onPhaseDone() is called and executePhaseOnShard() fails hard with an exception.
// In this case calling onShardFailure() would overflow the operations counter, so the best we could do
// here is to fail the phase and move on to the next one.
if (totalOps.get() == expectedTotalOps) {
onPhaseFailure(this, "The phase has failed", e);
} else {
onShardFailure(shardIndex, shard, shardIt, e);
}
});
} finally {
executeNext(pendingExecutions, thread);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,57 @@ public void onFailure(Exception e) {
assertThat(searchResponse.getSuccessfulShards(), equalTo(shards.length));
}

/**
 * Runs an action over a randomized number of shard iterators with a listener whose first
 * {@code onFailure} callback throws a simulated exception, waits for the listener to signal
 * completion, and then checks that a search response can still be built from the action.
 *
 * @throws InterruptedException if the thread is interrupted while waiting on the latch
 */
public void testExecutePhaseOnShardFailure() throws InterruptedException {
    final Index testIndex = new Index("test", UUID.randomUUID().toString());

    // Build 2 + randomInt(3) shard iterators, one per shard id, each targeting nodes n1..n3.
    final int shardCount = 2 + randomInt(3);
    final SearchShardIterator[] shardIterators = new SearchShardIterator[shardCount];
    for (int shardNo = 0; shardNo < shardCount; shardNo++) {
        shardIterators[shardNo] = new SearchShardIterator(
            null,
            new ShardId(testIndex, shardNo),
            List.of("n1", "n2", "n3"),
            null,
            null,
            null
        );
    }

    // True until the first failure callback has been observed.
    final AtomicBoolean firstFailurePending = new AtomicBoolean(true);
    final CountDownLatch failureObserved = new CountDownLatch(1);

    SearchRequest request = new SearchRequest().allowPartialSearchResults(true);
    request.setMaxConcurrentShardRequests(5);

    final ArraySearchPhaseResults<SearchPhaseResult> phaseResults = new ArraySearchPhaseResults<>(shardIterators.length);

    final ActionListener<SearchResponse> failingListener = new ActionListener<SearchResponse>() {
        @Override
        public void onResponse(SearchResponse response) {}

        @Override
        public void onFailure(Exception e) {
            try {
                // Per the original author's note, this callback is reached via onPhaseFailure()
                // after onPhaseDone() fails (NPE). Only the first invocation checks the
                // exception type and throws; later invocations fall straight through.
                final boolean isFirstFailure = firstFailurePending.compareAndExchange(true, false);
                if (isFirstFailure == false) {
                    return;
                }
                assertThat(e, instanceOf(SearchPhaseExecutionException.class));
                throw new RuntimeException("Simulated exception");
            } finally {
                // Always release the waiting test thread, whether or not we threw above.
                executor.submit(() -> failureObserved.countDown());
            }
        }
    };

    AbstractSearchAsyncAction<SearchPhaseResult> asyncAction = createAction(
        request,
        phaseResults,
        failingListener,
        false,
        false,
        new AtomicLong(),
        shardIterators
    );
    asyncAction.run();
    assertTrue(failureObserved.await(1, TimeUnit.SECONDS));

    // The response built from an empty internal response must expose that response's parts
    // unchanged and report every shard as successful.
    InternalSearchResponse emptyInternal = InternalSearchResponse.empty();
    SearchResponse builtResponse = asyncAction.buildSearchResponse(emptyInternal, asyncAction.buildShardFailures(), null, null);
    assertSame(builtResponse.getAggregations(), emptyInternal.aggregations());
    assertSame(builtResponse.getSuggest(), emptyInternal.suggest());
    assertSame(builtResponse.getProfileResults(), emptyInternal.profile());
    assertSame(builtResponse.getHits(), emptyInternal.hits());
    assertThat(builtResponse.getSuccessfulShards(), equalTo(shardIterators.length));
}

private static final class PhaseResult extends SearchPhaseResult {
PhaseResult(ShardSearchContextId contextId) {
this.contextId = contextId;
Expand Down

0 comments on commit 4b0f2ec

Please sign in to comment.