diff --git a/docs/changelog/117657.yaml b/docs/changelog/117657.yaml new file mode 100644 index 0000000000000..0a72e9dabe9e8 --- /dev/null +++ b/docs/changelog/117657.yaml @@ -0,0 +1,5 @@ +pr: 117657 +summary: Ignore cancellation exceptions +area: ES|QL +type: bug +issues: [] diff --git a/muted-tests.yml b/muted-tests.yml index fb2fea908ef9e..8f1030279efc9 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -248,6 +248,9 @@ tests: - class: org.elasticsearch.packaging.test.KeystoreManagementTests method: test30KeystorePasswordFromFile issue: https://github.com/elastic/elasticsearch/issues/118123 +- class: org.elasticsearch.packaging.test.KeystoreManagementTests + method: test31WrongKeystorePasswordFromFile + issue: https://github.com/elastic/elasticsearch/issues/118123 - class: org.elasticsearch.packaging.test.ArchiveTests method: test41AutoconfigurationNotTriggeredWhenNodeCannotContainData issue: https://github.com/elastic/elasticsearch/issues/118110 @@ -260,6 +263,9 @@ tests: - class: org.elasticsearch.xpack.remotecluster.CrossClusterEsqlRCS2UnavailableRemotesIT method: testEsqlRcs2UnavailableRemoteScenarios issue: https://github.com/elastic/elasticsearch/issues/117419 +- class: org.elasticsearch.packaging.test.DebPreservationTests + method: test40RestartOnUpgrade + issue: https://github.com/elastic/elasticsearch/issues/118170 # Examples: # diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java index 8baffbf887e47..4e4338aad3704 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java @@ -132,8 +132,16 @@ public static String name(Expression e) { return e instanceof NamedExpression ne ? ne.name() : e.sourceText(); } - public static boolean isNull(Expression e) { - return e.dataType() == DataType.NULL || (e.foldable() && e.fold() == null); + /** + * Is this {@linkplain Expression} guaranteed to have + * only the {@code null} value. {@linkplain Expression}s that + * {@link Expression#fold()} to {@code null} may + * return {@code false} here, but should eventually be folded + * into a {@link Literal} containing {@code null} which will return + * {@code true} from here. + */ + public static boolean isGuaranteedNull(Expression e) { + return e.dataType() == DataType.NULL || (e instanceof Literal lit && lit.value() == null); } public static List names(Collection e) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java new file mode 100644 index 0000000000000..69df0fb8ceff1 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/EsqlRefCountingListener.java @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.RefCountingRunnable; +import org.elasticsearch.compute.operator.FailureCollector; +import org.elasticsearch.core.Releasable; + +/** + * Similar to {@link org.elasticsearch.action.support.RefCountingListener}, + * but prefers non-task-cancelled exceptions over task-cancelled ones as they are more useful for diagnosing issues. + * @see FailureCollector + */ +public final class EsqlRefCountingListener implements Releasable { + private final FailureCollector failureCollector; + private final RefCountingRunnable refs; + + public EsqlRefCountingListener(ActionListener delegate) { + this.failureCollector = new FailureCollector(); + this.refs = new RefCountingRunnable(() -> { + Exception error = failureCollector.getFailure(); + if (error != null) { + delegate.onFailure(error); + } else { + delegate.onResponse(null); + } + }); + } + + public ActionListener acquire() { + return refs.acquireListener().delegateResponse((l, e) -> { + failureCollector.unwrapAndCollect(e); + l.onFailure(e); + }); + } + + @Override + public void close() { + refs.close(); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java index 943ba4dc1f4fa..337075edbdcf6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java @@ -13,9 +13,8 @@ import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.transport.TransportException; -import java.util.List; import java.util.Queue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.Semaphore; /** * {@code FailureCollector} is responsible for collecting exceptions that occur in the compute engine. @@ -26,12 +25,11 @@ */ public final class FailureCollector { private final Queue cancelledExceptions = ConcurrentCollections.newQueue(); - private final AtomicInteger cancelledExceptionsCount = new AtomicInteger(); + private final Semaphore cancelledExceptionsPermits; private final Queue nonCancelledExceptions = ConcurrentCollections.newQueue(); - private final AtomicInteger nonCancelledExceptionsCount = new AtomicInteger(); + private final Semaphore nonCancelledExceptionsPermits; - private final int maxExceptions; private volatile boolean hasFailure = false; private Exception finalFailure = null; @@ -43,7 +41,8 @@ public FailureCollector(int maxExceptions) { if (maxExceptions <= 0) { throw new IllegalArgumentException("maxExceptions must be at least one"); } - this.maxExceptions = maxExceptions; + this.cancelledExceptionsPermits = new Semaphore(maxExceptions); + this.nonCancelledExceptionsPermits = new Semaphore(maxExceptions); } private static Exception unwrapTransportException(TransportException te) { @@ -60,13 +59,12 @@ private static Exception unwrapTransportException(TransportException te) { public void unwrapAndCollect(Exception e) { e = e instanceof TransportException te ? unwrapTransportException(te) : e; if (ExceptionsHelper.unwrap(e, TaskCancelledException.class) != null) { - if (cancelledExceptionsCount.incrementAndGet() <= maxExceptions) { + if (nonCancelledExceptions.isEmpty() && cancelledExceptionsPermits.tryAcquire()) { cancelledExceptions.add(e); } - } else { - if (nonCancelledExceptionsCount.incrementAndGet() <= maxExceptions) { - nonCancelledExceptions.add(e); - } + } else if (nonCancelledExceptionsPermits.tryAcquire()) { + nonCancelledExceptions.add(e); + cancelledExceptions.clear(); } hasFailure = true; } @@ -99,20 +97,22 @@ public Exception getFailure() { private Exception buildFailure() { assert hasFailure; assert Thread.holdsLock(this); - int total = 0; Exception first = null; - for (var exceptions : List.of(nonCancelledExceptions, cancelledExceptions)) { - for (Exception e : exceptions) { - if (first == null) { - first = e; - total++; - } else if (first != e) { - first.addSuppressed(e); - total++; - } - if (total >= maxExceptions) { - return first; - } + for (Exception e : nonCancelledExceptions) { + if (first == null) { + first = e; + } else if (first != e) { + first.addSuppressed(e); + } + } + if (first != null) { + return first; + } + for (Exception e : cancelledExceptions) { + if (first == null) { + first = e; + } else if (first != e) { + first.addSuppressed(e); } } assert first != null; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java index 375016a5d51d5..b53ddea3da587 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java @@ -9,9 +9,10 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.RefCountingListener; +import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.compute.EsqlRefCountingListener; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.compute.operator.IsBlockedResult; @@ -54,20 +55,20 @@ public ExchangeSourceHandler(int maxBufferSize, Executor fetchExecutor, ActionLi this.outstandingSinks = new PendingInstances(() -> buffer.finish(false)); this.outstandingSources = new PendingInstances(() -> buffer.finish(true)); buffer.addCompletionListener(ActionListener.running(() -> { - final ActionListener listener = ActionListener.assertAtLeastOnce(completionListener).delegateFailure((l, unused) -> { + final ActionListener listener = ActionListener.assertAtLeastOnce(completionListener); + try (RefCountingRunnable refs = new RefCountingRunnable(() -> { final Exception e = failure.getFailure(); if (e != null) { - l.onFailure(e); + listener.onFailure(e); } else { - l.onResponse(null); + listener.onResponse(null); } - }); - try (RefCountingListener refs = new RefCountingListener(listener)) { + })) { for (PendingInstances pending : List.of(outstandingSinks, outstandingSources)) { // Create an outstanding instance and then finish to complete the completionListener // if we haven't registered any instances of exchange sinks or exchange sources before. pending.trackNewInstance(); - pending.completion.addListener(refs.acquire()); + pending.completion.addListener(refs.acquireListener()); pending.finishInstance(); } } @@ -269,7 +270,7 @@ public void onFailure(Exception e) { @Override protected void doRun() { - try (RefCountingListener refs = new RefCountingListener(sinkListener)) { + try (EsqlRefCountingListener refs = new EsqlRefCountingListener(sinkListener)) { for (int i = 0; i < instances; i++) { var fetcher = new RemoteSinkFetcher(remoteSink, failFast, refs.acquire()); fetcher.fetchPage(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java index 637cbe8892b3e..5fec82b32ddac 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/FailureCollectorTests.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -86,6 +87,14 @@ public void testCollect() throws Exception { assertNotNull(failure); assertThat(failure, Matchers.in(nonCancelledExceptions)); assertThat(failure.getSuppressed().length, lessThan(maxExceptions)); + assertTrue( + "cancellation exceptions must be ignored", + ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TaskCancelledException).isEmpty() + ); + assertTrue( + "remote transport exception must be unwrapped", + ExceptionsHelper.unwrapCausesAndSuppressed(failure, t -> t instanceof TransportException).isEmpty() + ); } public void testEmpty() { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index ec9af33dd6690..5535e801b1b0c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -10,6 +10,7 @@ import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Strings; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.NoopCircuitBreaker; @@ -30,7 +31,9 @@ import org.elasticsearch.geo.ShapeTestUtils; import org.elasticsearch.index.IndexMode; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.analysis.EnrichResolution; @@ -129,6 +132,8 @@ import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.PATTERN; import static org.elasticsearch.xpack.esql.parser.ParserUtils.ParamClassification.VALUE; import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; public final class EsqlTestUtils { @@ -784,4 +789,17 @@ public static QueryParam paramAsIdentifier(String name, Object value) { public static QueryParam paramAsPattern(String name, Object value) { return new QueryParam(name, value, NULL, PATTERN); } + + /** + * Asserts that: + * 1. Cancellation exceptions are ignored when more relevant exceptions exist. + * 2. Transport exceptions are unwrapped, and the actual causes are reported to users. + */ + public static void assertEsqlFailure(Exception e) { + assertNotNull(e); + var cancellationFailure = ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof TaskCancelledException).orElse(null); + assertNull("cancellation exceptions must be ignored", cancellationFailure); + ExceptionsHelper.unwrapCausesAndSuppressed(e, t -> t instanceof RemoteTransportException) + .ifPresent(transportFailure -> assertNull("remote transport exception must be unwrapped", transportFailure.getCause())); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java index dab99a0f719dd..c4da0bf32ef96 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java @@ -143,6 +143,7 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) { return client.execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES); } catch (Exception e) { logger.info("request failed", e); + EsqlTestUtils.assertEsqlFailure(e); ensureBlocksReleased(); } finally { setRequestCircuitBreakerLimit(null); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java index 37833d8aed2d3..ec7ee8b61c2d5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import java.util.ArrayList; import java.util.Collection; @@ -85,6 +86,7 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu } catch (Exception e) { logger.info("request failed", e); ensureBlocksReleased(); + EsqlTestUtils.assertEsqlFailure(e); throw e; } finally { setRequestCircuitBreakerLimit(null); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index 1939f81353c0e..abd4f6b49d7b4 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.junit.Before; @@ -338,7 +339,15 @@ private void assertCancelled(ActionFuture response) throws Ex */ assertThat( cancelException.getMessage(), - in(List.of("test cancel", "task cancelled", "request cancelled test cancel", "parent task was cancelled [test cancel]")) + in( + List.of( + "test cancel", + "task cancelled", + "request cancelled test cancel", + "parent task was cancelled [test cancel]", + "cancelled on failure" + ) + ) ); assertBusy( () -> assertThat( @@ -434,6 +443,7 @@ protected void doRun() throws Exception { allowedFetching.countDown(); } Exception failure = expectThrows(Exception.class, () -> future.actionGet().close()); + EsqlTestUtils.assertEsqlFailure(failure); assertThat(failure.getMessage(), containsString("failed to fetch pages")); // If we proceed without waiting for pages, we might cancel the main request before starting the data-node request. // As a result, the exchange sinks on data-nodes won't be removed until the inactive_timeout elapses, which is diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java index e9eada5def0dc..72a60a6b6b928 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportSettings; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import java.util.ArrayList; import java.util.Collection; @@ -111,6 +112,7 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) { assertTrue("request must be failed or completed after clearing disruption", future.isDone()); ensureBlocksReleased(); logger.info("--> failed to execute esql query with disruption; retrying...", e); + EsqlTestUtils.assertEsqlFailure(e); return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/In.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/In.java index eda6aadccc86a..f6c23304c189b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/In.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/predicate/operator/comparison/In.java @@ -151,14 +151,14 @@ public Expression replaceChildren(List newChildren) { public boolean foldable() { // QL's In fold()s to null, if value() is null, but isn't foldable() unless all children are // TODO: update this null check in QL too? - return Expressions.isNull(value) + return Expressions.isGuaranteedNull(value) || Expressions.foldable(children()) - || (Expressions.foldable(list) && list.stream().allMatch(Expressions::isNull)); + || (Expressions.foldable(list) && list.stream().allMatch(Expressions::isGuaranteedNull)); } @Override public Object fold() { - if (Expressions.isNull(value) || list.stream().allMatch(Expressions::isNull)) { + if (Expressions.isGuaranteedNull(value) || list.stream().allMatch(Expressions::isGuaranteedNull)) { return null; } return super.fold(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNull.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNull.java index 638fa1b8db456..4f97bf60bd863 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNull.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNull.java @@ -30,7 +30,7 @@ public Expression rule(Expression e) { // perform this early to prevent the rule from converting the null filter into nullifying the whole expression // P.S. this could be done inside the Aggregate but this place better centralizes the logic if (e instanceof AggregateFunction agg) { - if (Expressions.isNull(agg.filter())) { + if (Expressions.isGuaranteedNull(agg.filter())) { return agg.withFilter(Literal.of(agg.filter(), false)); } } @@ -38,13 +38,13 @@ public Expression rule(Expression e) { if (result != e) { return result; } else if (e instanceof In in) { - if (Expressions.isNull(in.value())) { + if (Expressions.isGuaranteedNull(in.value())) { return Literal.of(in, null); } } else if (e instanceof Alias == false && e.nullable() == Nullability.TRUE && e instanceof Categorize == false - && Expressions.anyMatch(e.children(), Expressions::isNull)) { + && Expressions.anyMatch(e.children(), Expressions::isGuaranteedNull)) { return Literal.of(e, null); } return e; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneFilters.java index b6f7ac9e464f4..00698d009ea23 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneFilters.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneFilters.java @@ -29,7 +29,7 @@ protected LogicalPlan rule(Filter filter) { if (TRUE.equals(condition)) { return filter.child(); } - if (FALSE.equals(condition) || Expressions.isNull(condition)) { + if (FALSE.equals(condition) || Expressions.isGuaranteedNull(condition)) { return PruneEmptyPlans.skipPlan(filter); } } @@ -42,8 +42,8 @@ protected LogicalPlan rule(Filter filter) { private static Expression foldBinaryLogic(BinaryLogic binaryLogic) { if (binaryLogic instanceof Or or) { - boolean nullLeft = Expressions.isNull(or.left()); - boolean nullRight = Expressions.isNull(or.right()); + boolean nullLeft = Expressions.isGuaranteedNull(or.left()); + boolean nullRight = Expressions.isGuaranteedNull(or.right()); if (nullLeft && nullRight) { return new Literal(binaryLogic.source(), null, DataType.NULL); } @@ -55,7 +55,7 @@ private static Expression foldBinaryLogic(BinaryLogic binaryLogic) { } } if (binaryLogic instanceof And and) { - if (Expressions.isNull(and.left()) || Expressions.isNull(and.right())) { + if (Expressions.isGuaranteedNull(and.left()) || Expressions.isGuaranteedNull(and.right())) { return new Literal(binaryLogic.source(), null, DataType.NULL); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SplitInWithFoldableValue.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SplitInWithFoldableValue.java index 930b485dbd374..9e9ae6a9a559d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SplitInWithFoldableValue.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/SplitInWithFoldableValue.java @@ -30,7 +30,7 @@ public Expression rule(In in) { List foldables = new ArrayList<>(in.list().size()); List nonFoldables = new ArrayList<>(in.list().size()); in.list().forEach(e -> { - if (e.foldable() && Expressions.isNull(e) == false) { // keep `null`s, needed for the 3VL + if (e.foldable() && Expressions.isGuaranteedNull(e) == false) { // keep `null`s, needed for the 3VL foldables.add(e); } else { nonFoldables.add(e); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 8d041ffbdf0e4..8bd23230fcde7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -9,8 +9,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.RefCountingListener; +import org.elasticsearch.compute.EsqlRefCountingListener; import org.elasticsearch.compute.operator.DriverProfile; -import org.elasticsearch.compute.operator.FailureCollector; import org.elasticsearch.compute.operator.ResponseHeadersCollector; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -39,8 +39,7 @@ final class ComputeListener implements Releasable { private static final Logger LOGGER = LogManager.getLogger(ComputeService.class); - private final RefCountingListener refs; - private final FailureCollector failureCollector = new FailureCollector(); + private final EsqlRefCountingListener refs; private final AtomicBoolean cancelled = new AtomicBoolean(); private final CancellableTask task; private final TransportService transportService; @@ -105,7 +104,7 @@ private ComputeListener( : "clusterAlias and executionInfo must both be null or both non-null"; // listener that executes after all the sub-listeners refs (created via acquireCompute) have completed - this.refs = new RefCountingListener(1, ActionListener.wrap(ignored -> { + this.refs = new EsqlRefCountingListener(delegate.delegateFailure((l, ignored) -> { responseHeaders.finish(); ComputeResponse result; @@ -131,7 +130,7 @@ private ComputeListener( } } delegate.onResponse(result); - }, e -> delegate.onFailure(failureCollector.getFailure()))); + })); } private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) { @@ -191,7 +190,6 @@ private boolean isCCSListener(String computeClusterAlias) { */ ActionListener acquireAvoid() { return refs.acquire().delegateResponse((l, e) -> { - failureCollector.unwrapAndCollect(e); try { if (cancelled.compareAndSet(false, true)) { LOGGER.debug("cancelling ESQL task {} on failure", task); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index ed037d24139f8..9b59b98a7cdc2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -16,11 +16,11 @@ import org.elasticsearch.action.search.SearchShardsRequest; import org.elasticsearch.action.search.SearchShardsResponse; import org.elasticsearch.action.support.ChannelActionListener; -import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.EsqlRefCountingListener; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.Driver; @@ -375,7 +375,7 @@ private void startComputeOnDataNodes( var lookupListener = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink()); // SearchShards API can_match is done in lookupDataNodes lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodeResult -> { - try (RefCountingListener refs = new RefCountingListener(lookupListener)) { + try (EsqlRefCountingListener refs = new EsqlRefCountingListener(lookupListener)) { // update ExecutionInfo with shard counts (total and skipped) executionInfo.swapCluster( clusterAlias, @@ -436,7 +436,7 @@ private void startComputeOnRemoteClusters( ) { var queryPragmas = configuration.pragmas(); var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink()); - try (RefCountingListener refs = new RefCountingListener(linkExchangeListeners)) { + try (EsqlRefCountingListener refs = new EsqlRefCountingListener(linkExchangeListeners)) { for (RemoteCluster cluster : clusters) { final var childSessionId = newChildSession(sessionId); ExchangeService.openExchange( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java index b76781f76f4af..c2a26845d4e88 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java @@ -4820,7 +4820,7 @@ private static boolean oneLeaveIsNull(Expression e) { e.forEachUp(node -> { if (node.children().size() == 0) { - result.set(result.get() || Expressions.isNull(node)); + result.set(result.get() || Expressions.isGuaranteedNull(node)); } }); diff --git a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/SyntheticSourceLicenseServiceTests.java b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/SyntheticSourceLicenseServiceTests.java index 90a13b16c028e..0eb0d21ff2e78 100644 --- a/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/SyntheticSourceLicenseServiceTests.java +++ b/x-pack/plugin/logsdb/src/test/java/org/elasticsearch/xpack/logsdb/SyntheticSourceLicenseServiceTests.java @@ -41,6 +41,7 @@ public void setup() throws Exception { public void testLicenseAllowsSyntheticSource() { MockLicenseState licenseState = MockLicenseState.createMock(); + when(licenseState.copyCurrentLicenseState()).thenReturn(licenseState); when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE))).thenReturn(true); licenseService.setLicenseState(licenseState); licenseService.setLicenseService(mockLicenseService); @@ -53,6 +54,7 @@ public void testLicenseAllowsSyntheticSource() { public void testLicenseAllowsSyntheticSourceTemplateValidation() { MockLicenseState licenseState = MockLicenseState.createMock(); + when(licenseState.copyCurrentLicenseState()).thenReturn(licenseState); when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE))).thenReturn(true); licenseService.setLicenseState(licenseState); licenseService.setLicenseService(mockLicenseService); @@ -65,6 +67,7 @@ public void testLicenseAllowsSyntheticSourceTemplateValidation() { public void testDefaultDisallow() { MockLicenseState licenseState = MockLicenseState.createMock(); + when(licenseState.copyCurrentLicenseState()).thenReturn(licenseState); when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE))).thenReturn(false); licenseService.setLicenseState(licenseState); licenseService.setLicenseService(mockLicenseService); @@ -77,6 +80,7 @@ public void testDefaultDisallow() { public void testFallback() { MockLicenseState licenseState = MockLicenseState.createMock(); + when(licenseState.copyCurrentLicenseState()).thenReturn(licenseState); when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE))).thenReturn(true); licenseService.setLicenseState(licenseState); licenseService.setLicenseService(mockLicenseService); @@ -95,6 +99,7 @@ public void testGoldOrPlatinumLicense() throws Exception { when(mockLicenseService.getLicense()).thenReturn(license); MockLicenseState licenseState = MockLicenseState.createMock(); + when(licenseState.copyCurrentLicenseState()).thenReturn(licenseState); when(licenseState.getOperationMode()).thenReturn(license.operationMode()); when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE_LEGACY))).thenReturn(true); licenseService.setLicenseState(licenseState); @@ -103,6 +108,8 @@ public void testGoldOrPlatinumLicense() throws Exception { "legacy licensed usage is allowed, so not fallback to stored source", licenseService.fallbackToStoredSource(false, true) ); + Mockito.verify(licenseState, Mockito.times(1)).isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE)); + Mockito.verify(licenseState, Mockito.times(1)).isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE_LEGACY)); Mockito.verify(licenseState, Mockito.times(1)).featureUsed(any()); } @@ -112,6 +119,7 @@ public void testGoldOrPlatinumLicenseLegacyLicenseNotAllowed() throws Exception when(mockLicenseService.getLicense()).thenReturn(license); MockLicenseState licenseState = MockLicenseState.createMock(); + when(licenseState.copyCurrentLicenseState()).thenReturn(licenseState); when(licenseState.getOperationMode()).thenReturn(license.operationMode()); when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE))).thenReturn(false); licenseService.setLicenseState(licenseState); @@ -125,14 +133,16 @@ public void testGoldOrPlatinumLicenseLegacyLicenseNotAllowed() throws Exception } public void testGoldOrPlatinumLicenseBeyondCutoffDate() throws Exception { - long start = LocalDateTime.of(2025, 1, 1, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli(); + long start = LocalDateTime.of(2025, 2, 5, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli(); License license = createGoldOrPlatinumLicense(start); mockLicenseService = mock(LicenseService.class); when(mockLicenseService.getLicense()).thenReturn(license); MockLicenseState licenseState = MockLicenseState.createMock(); + when(licenseState.copyCurrentLicenseState()).thenReturn(licenseState); when(licenseState.getOperationMode()).thenReturn(license.operationMode()); when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE))).thenReturn(false); + when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE_LEGACY))).thenReturn(true); licenseService.setLicenseState(licenseState); licenseService.setLicenseService(mockLicenseService); assertTrue("beyond cutoff date, so fallback to stored source", licenseService.fallbackToStoredSource(false, true)); @@ -143,19 +153,21 @@ public void testGoldOrPlatinumLicenseBeyondCutoffDate() throws Exception { public void testGoldOrPlatinumLicenseCustomCutoffDate() throws Exception { licenseService = new SyntheticSourceLicenseService(Settings.EMPTY, "2025-01-02T00:00"); - long start = LocalDateTime.of(2025, 1, 1, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli(); + long start = LocalDateTime.of(2025, 1, 3, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli(); License license = createGoldOrPlatinumLicense(start); mockLicenseService = mock(LicenseService.class); when(mockLicenseService.getLicense()).thenReturn(license); MockLicenseState licenseState = MockLicenseState.createMock(); + when(licenseState.copyCurrentLicenseState()).thenReturn(licenseState); when(licenseState.getOperationMode()).thenReturn(license.operationMode()); + when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE))).thenReturn(false); when(licenseState.isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE_LEGACY))).thenReturn(true); licenseService.setLicenseState(licenseState); licenseService.setLicenseService(mockLicenseService); - assertFalse("custom cutoff date, so fallback to stored source", licenseService.fallbackToStoredSource(false, true)); - Mockito.verify(licenseState, Mockito.times(1)).featureUsed(any()); - Mockito.verify(licenseState, Mockito.times(1)).isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE_LEGACY)); + assertTrue("custom cutoff date, so fallback to stored source", licenseService.fallbackToStoredSource(false, true)); + Mockito.verify(licenseState, Mockito.times(1)).isAllowed(same(SyntheticSourceLicenseService.SYNTHETIC_SOURCE_FEATURE)); + Mockito.verify(licenseState, Mockito.never()).featureUsed(any()); } static License createEnterpriseLicense() throws Exception {