Skip to content

Commit

Permalink
Merge branch 'main' of github.com:elastic/elasticsearch into es-117916
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Dec 6, 2024
2 parents b057948 + ab6fcc4 commit 23a6059
Show file tree
Hide file tree
Showing 20 changed files with 180 additions and 61 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117657.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 117657
summary: Ignore cancellation exceptions
area: ES|QL
type: bug
issues: []
6 changes: 6 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ tests:
- class: org.elasticsearch.packaging.test.KeystoreManagementTests
method: test30KeystorePasswordFromFile
issue: https://github.com/elastic/elasticsearch/issues/118123
- class: org.elasticsearch.packaging.test.KeystoreManagementTests
method: test31WrongKeystorePasswordFromFile
issue: https://github.com/elastic/elasticsearch/issues/118123
- class: org.elasticsearch.packaging.test.ArchiveTests
method: test41AutoconfigurationNotTriggeredWhenNodeCannotContainData
issue: https://github.com/elastic/elasticsearch/issues/118110
Expand All @@ -260,6 +263,9 @@ tests:
- class: org.elasticsearch.xpack.remotecluster.CrossClusterEsqlRCS2UnavailableRemotesIT
method: testEsqlRcs2UnavailableRemoteScenarios
issue: https://github.com/elastic/elasticsearch/issues/117419
- class: org.elasticsearch.packaging.test.DebPreservationTests
method: test40RestartOnUpgrade
issue: https://github.com/elastic/elasticsearch/issues/118170

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,16 @@ public static String name(Expression e) {
return e instanceof NamedExpression ne ? ne.name() : e.sourceText();
}

public static boolean isNull(Expression e) {
return e.dataType() == DataType.NULL || (e.foldable() && e.fold() == null);
/**
* Is this {@linkplain Expression} <strong>guaranteed</strong> to have
* only the {@code null} value. {@linkplain Expression}s that
* {@link Expression#fold()} to {@code null} <strong>may</strong>
* return {@code false} here, but should <strong>eventually</strong> be folded
* into a {@link Literal} containing {@code null} which will return
* {@code true} from here.
*/
public static boolean isGuaranteedNull(Expression e) {
return e.dataType() == DataType.NULL || (e instanceof Literal lit && lit.value() == null);
}

public static List<String> names(Collection<? extends Expression> e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.core.Releasable;

/**
* Similar to {@link org.elasticsearch.action.support.RefCountingListener},
* but prefers non-task-cancelled exceptions over task-cancelled ones as they are more useful for diagnosing issues.
* @see FailureCollector
*/
public final class EsqlRefCountingListener implements Releasable {
private final FailureCollector failureCollector;
private final RefCountingRunnable refs;

public EsqlRefCountingListener(ActionListener<Void> delegate) {
this.failureCollector = new FailureCollector();
this.refs = new RefCountingRunnable(() -> {
Exception error = failureCollector.getFailure();
if (error != null) {
delegate.onFailure(error);
} else {
delegate.onResponse(null);
}
});
}

public ActionListener<Void> acquire() {
return refs.acquireListener().delegateResponse((l, e) -> {
failureCollector.unwrapAndCollect(e);
l.onFailure(e);
});
}

@Override
public void close() {
refs.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.TransportException;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Semaphore;

/**
* {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine.
Expand All @@ -26,12 +25,11 @@
*/
public final class FailureCollector {
private final Queue<Exception> cancelledExceptions = ConcurrentCollections.newQueue();
private final AtomicInteger cancelledExceptionsCount = new AtomicInteger();
private final Semaphore cancelledExceptionsPermits;

private final Queue<Exception> nonCancelledExceptions = ConcurrentCollections.newQueue();
private final AtomicInteger nonCancelledExceptionsCount = new AtomicInteger();
private final Semaphore nonCancelledExceptionsPermits;

private final int maxExceptions;
private volatile boolean hasFailure = false;
private Exception finalFailure = null;

Expand All @@ -43,7 +41,8 @@ public FailureCollector(int maxExceptions) {
if (maxExceptions <= 0) {
throw new IllegalArgumentException("maxExceptions must be at least one");
}
this.maxExceptions = maxExceptions;
this.cancelledExceptionsPermits = new Semaphore(maxExceptions);
this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions);
}

private static Exception unwrapTransportException(TransportException te) {
Expand All @@ -60,13 +59,12 @@ private static Exception unwrapTransportException(TransportException te) {
public void unwrapAndCollect(Exception e) {
e = e instanceof TransportException te ? unwrapTransportException(te) : e;
if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) {
if (cancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) {
cancelledExceptions.add(e);
}
} else {
if (nonCancelledExceptionsCount.incrementAndGet() <= maxExceptions) {
nonCancelledExceptions.add(e);
}
} else if (nonCancelledExceptionsPermits.tryAcquire()) {
nonCancelledExceptions.add(e);
cancelledExceptions.clear();
}
hasFailure = true;
}
Expand Down Expand Up @@ -99,20 +97,22 @@ public Exception getFailure() {
private Exception buildFailure() {
assert hasFailure;
assert Thread.holdsLock(this);
int total = 0;
Exception first = null;
for (var exceptions : List.of(nonCancelledExceptions, cancelledExceptions)) {
for (Exception e : exceptions) {
if (first == null) {
first = e;
total++;
} else if (first != e) {
first.addSuppressed(e);
total++;
}
if (total >= maxExceptions) {
return first;
}
for (Exception e : nonCancelledExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
}
}
if (first != null) {
return first;
}
for (Exception e : cancelledExceptions) {
if (first == null) {
first = e;
} else if (first != e) {
first.addSuppressed(e);
}
}
assert first != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.compute.EsqlRefCountingListener;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.FailureCollector;
import org.elasticsearch.compute.operator.IsBlockedResult;
Expand Down Expand Up @@ -54,20 +55,20 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi
this.outstandingSinks = new PendingInstances(() -> buffer.finish(false));
this.outstandingSources = new PendingInstances(() -> buffer.finish(true));
buffer.addCompletionListener(ActionListener.running(() -> {
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener).delegateFailure((l, unused) -> {
final ActionListener<Void> listener = ActionListener.assertAtLeastOnce(completionListener);
try (RefCountingRunnable refs = new RefCountingRunnable(() -> {
final Exception e = failure.getFailure();
if (e != null) {
l.onFailure(e);
listener.onFailure(e);
} else {
l.onResponse(null);
listener.onResponse(null);
}
});
try (RefCountingListener refs = new RefCountingListener(listener)) {
})) {
for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) {
// Create an outstanding instance and then finish to complete the completionListener
// if we haven't registered any instances of exchange sinks or exchange sources before.
pending.trackNewInstance();
pending.completion.addListener(refs.acquire());
pending.completion.addListener(refs.acquireListener());
pending.finishInstance();
}
}
Expand Down Expand Up @@ -269,7 +270,7 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
try (RefCountingListener refs = new RefCountingListener(sinkListener)) {
try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) {
for (int i = 0; i < instances; i++) {
var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire());
fetcher.fetchPage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.breaker.CircuitBreaker;
Expand Down Expand Up @@ -86,6 +87,14 @@ public void testCollect() throws Exception {
assertNotNull(failure);
assertThat(failure, Matchers.in(nonCancelledExceptions));
assertThat(failure.getSuppressed().length, lessThan(maxExceptions));
assertTrue(
"cancellation exceptions must be ignored",
ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TaskCancelledException).isEmpty()
);
assertTrue(
"remote transport exception must be unwrapped",
ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TransportException).isEmpty()
);
}

public void testEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
Expand All @@ -30,7 +31,9 @@
import org.elasticsearch.geo.ShapeTestUtils;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
Expand Down Expand Up @@ -129,6 +132,8 @@
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.PATTERN;
import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.VALUE;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public final class EsqlTestUtils {

Expand Down Expand Up @@ -784,4 +789,17 @@ public static QueryParam paramAsIdentifier(String name, Object value) {
public static QueryParam paramAsPattern(String name, Object value) {
return new QueryParam(name, value, NULL, PATTERN);
}

/**
* Asserts that:
* 1. Cancellation exceptions are ignored when more relevant exceptions exist.
* 2. Transport exceptions are unwrapped, and the actual causes are reported to users.
*/
public static void assertEsqlFailure(Exception e) {
assertNotNull(e);
var cancellationFailure = ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof TaskCancelledException).orElse(null);
assertNull("cancellation exceptions must be ignored", cancellationFailure);
ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof RemoteTransportException)
.ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) {
return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
} catch (Exception e) {
logger.info("request failed", e);
EsqlTestUtils.assertEsqlFailure(e);
ensureBlocksReleased();
} finally {
setRequestCircuitBreakerLimit(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.esql.EsqlTestUtils;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -85,6 +86,7 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu
} catch (Exception e) {
logger.info("request failed", e);
ensureBlocksReleased();
EsqlTestUtils.assertEsqlFailure(e);
throw e;
} finally {
setRequestCircuitBreakerLimit(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.junit.Before;

Expand Down Expand Up @@ -338,7 +339,15 @@ private void assertCancelled(ActionFuture<EsqlQueryResponse> response) throws Ex
*/
assertThat(
cancelException.getMessage(),
in(List.of("test cancel", "task cancelled", "request cancelled test cancel", "parent task was cancelled [test cancel]"))
in(
List.of(
"test cancel",
"task cancelled",
"request cancelled test cancel",
"parent task was cancelled [test cancel]",
"cancelled on failure"
)
)
);
assertBusy(
() -> assertThat(
Expand Down Expand Up @@ -434,6 +443,7 @@ protected void doRun() throws Exception {
allowedFetching.countDown();
}
Exception failure = expectThrows(Exception.class, () -> future.actionGet().close());
EsqlTestUtils.assertEsqlFailure(failure);
assertThat(failure.getMessage(), containsString("failed to fetch pages"));
// If we proceed without waiting for pages, we might cancel the main request before starting the data-node request.
// As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.xpack.esql.EsqlTestUtils;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -111,6 +112,7 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
ensureBlocksReleased();
logger.info("--> failed to execute esql query with disruption; retrying...", e);
EsqlTestUtils.assertEsqlFailure(e);
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ public Expression replaceChildren(List<Expression> newChildren) {
public boolean foldable() {
// QL's In fold()s to null, if value() is null, but isn't foldable() unless all children are
// TODO: update this null check in QL too?
return Expressions.isNull(value)
return Expressions.isGuaranteedNull(value)
|| Expressions.foldable(children())
|| (Expressions.foldable(list) && list.stream().allMatch(Expressions::isNull));
|| (Expressions.foldable(list) && list.stream().allMatch(Expressions::isGuaranteedNull));
}

@Override
public Object fold() {
if (Expressions.isNull(value) || list.stream().allMatch(Expressions::isNull)) {
if (Expressions.isGuaranteedNull(value) || list.stream().allMatch(Expressions::isGuaranteedNull)) {
return null;
}
return super.fold();
Expand Down
Loading

0 comments on commit 23a6059

Please sign in to comment.