From 1448f12d23c714a176b090ab2cb956a9195d6e99 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 15 Jan 2025 12:53:49 -0800 Subject: [PATCH] Check for early termination in Driver (#118188) This change introduces support for periodically checking for early termination. This enables early exits in the following scenarios: 1. The query has accumulated sufficient data (e.g., reaching the LIMIT). 2. The query is stopped (either by users or due to failures). Other changes will be addressed in follow-up PRs. --- docs/changelog/118188.yaml | 5 + .../compute/operator/Driver.java | 124 +++++++++++------- .../compute/operator/DriverContext.java | 17 +++ .../DriverEarlyTerminationException.java | 19 +++ .../compute/operator/DriverTests.java | 45 +++++++ .../function/scalar/util/DelayEvaluator.java | 91 ------------- .../function/scalar/util/Delay.java | 44 +++++-- 7 files changed, 194 insertions(+), 151 deletions(-) create mode 100644 docs/changelog/118188.yaml create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverEarlyTerminationException.java delete mode 100644 x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/util/DelayEvaluator.java diff --git a/docs/changelog/118188.yaml b/docs/changelog/118188.yaml new file mode 100644 index 0000000000000..f24651231b7a0 --- /dev/null +++ b/docs/changelog/118188.yaml @@ -0,0 +1,5 @@ +pr: 118188 +summary: Check for early termination in Driver +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index ee0da6043663e..78572f55cd5eb 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -186,7 +186,13 @@ SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplie long nextStatus = startTime + statusNanos; int iter = 0; while (true) { - IsBlockedResult isBlocked = runSingleLoopIteration(); + IsBlockedResult isBlocked = Operator.NOT_BLOCKED; + try { + isBlocked = runSingleLoopIteration(); + } catch (DriverEarlyTerminationException unused) { + closeEarlyFinishedOperators(); + assert isFinished() : "not finished after early termination"; + } iter++; if (isBlocked.listener().isDone() == false) { updateStatus(nowSupplier.getAsLong() - startTime, iter, DriverStatus.Status.ASYNC, isBlocked.reason()); @@ -242,39 +248,59 @@ public void abort(Exception reason, ActionListener listener) { } private IsBlockedResult runSingleLoopIteration() { - ensureNotCancelled(); + driverContext.checkForEarlyTermination(); boolean movedPage = false; - if (activeOperators.isEmpty() == false && activeOperators.getLast().isFinished() == false) { - for (int i = 0; i < activeOperators.size() - 1; i++) { - Operator op = activeOperators.get(i); - Operator nextOp = activeOperators.get(i + 1); + for (int i = 0; i < activeOperators.size() - 1; i++) { + Operator op = activeOperators.get(i); + Operator nextOp = activeOperators.get(i + 1); - // skip blocked operator - if (op.isBlocked().listener().isDone() == false) { - continue; - } + // skip blocked operator + if (op.isBlocked().listener().isDone() == false) { + continue; + } - if (op.isFinished() == false && nextOp.needsInput()) { - Page page = op.getOutput(); - if (page == null) { - // No result, just move to the next iteration - } else if (page.getPositionCount() == 0) { - // Empty result, release any memory it holds immediately and move to the next iteration + if (op.isFinished() == false && nextOp.needsInput()) { + driverContext.checkForEarlyTermination(); + Page page = op.getOutput(); + if (page == null) { + // No result, just move to the next iteration + } else if (page.getPositionCount() == 0) { + // Empty result, release any memory it holds immediately and move to the next iteration + page.releaseBlocks(); + } else { + // Non-empty result from the previous operation, move it to the next operation + try { + driverContext.checkForEarlyTermination(); + } catch (DriverEarlyTerminationException | TaskCancelledException e) { page.releaseBlocks(); - } else { - // Non-empty result from the previous operation, move it to the next operation - nextOp.addInput(page); - movedPage = true; + throw e; } + nextOp.addInput(page); + movedPage = true; } + } - if (op.isFinished()) { - nextOp.finish(); - } + if (op.isFinished()) { + driverContext.checkForEarlyTermination(); + nextOp.finish(); } } + closeEarlyFinishedOperators(); + + if (movedPage == false) { + return oneOf( + activeOperators.stream() + .map(Operator::isBlocked) + .filter(laf -> laf.listener().isDone() == false) + .collect(Collectors.toList()) + ); + } + return Operator.NOT_BLOCKED; + } + + private void closeEarlyFinishedOperators() { for (int index = activeOperators.size() - 1; index >= 0; index--) { if (activeOperators.get(index).isFinished()) { /* @@ -300,16 +326,6 @@ private IsBlockedResult runSingleLoopIteration() { break; } } - - if (movedPage == false) { - return oneOf( - activeOperators.stream() - .map(Operator::isBlocked) - .filter(laf -> laf.listener().isDone() == false) - .collect(Collectors.toList()) - ); - } - return Operator.NOT_BLOCKED; } public void cancel(String reason) { @@ -318,13 +334,6 @@ public void cancel(String reason) { } } - private void ensureNotCancelled() { - String reason = cancelReason.get(); - if (reason != null) { - throw new TaskCancelledException(reason); - } - } - public static void start( ThreadContext threadContext, Executor executor, @@ -335,19 +344,36 @@ public static void start( driver.completionListener.addListener(listener); if (driver.started.compareAndSet(false, true)) { driver.updateStatus(0, 0, DriverStatus.Status.STARTING, "driver starting"); - // Register a listener to an exchange sink to handle early completion scenarios: - // 1. When the query accumulates sufficient data (e.g., reaching the LIMIT). - // 2. When users abort the query but want to retain the current result. - // This allows the Driver to finish early without waiting for the scheduled task. - if (driver.activeOperators.isEmpty() == false) { - if (driver.activeOperators.getLast() instanceof ExchangeSinkOperator sinkOperator) { - sinkOperator.addCompletionListener(ActionListener.running(driver.scheduler::runPendingTasks)); - } - } + initializeEarlyTerminationChecker(driver); schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, threadContext, executor, driver, driver.completionListener); } } + private static void initializeEarlyTerminationChecker(Driver driver) { + // Register a listener to an exchange sink to handle early completion scenarios: + // 1. When the query accumulates sufficient data (e.g., reaching the LIMIT). + // 2. When users abort the query but want to retain the current result. + // This allows the Driver to finish early without waiting for the scheduled task. + final AtomicBoolean earlyFinished = new AtomicBoolean(); + driver.driverContext.initializeEarlyTerminationChecker(() -> { + final String reason = driver.cancelReason.get(); + if (reason != null) { + throw new TaskCancelledException(reason); + } + if (earlyFinished.get()) { + throw new DriverEarlyTerminationException("Exchange sink is closed"); + } + }); + if (driver.activeOperators.isEmpty() == false) { + if (driver.activeOperators.getLast() instanceof ExchangeSinkOperator sinkOperator) { + sinkOperator.addCompletionListener(ActionListener.running(() -> { + earlyFinished.set(true); + driver.scheduler.runPendingTasks(); + })); + } + } + } + // Drains all active operators and closes them. private void drainAndCloseOperators(@Nullable Exception e) { Iterator itr = activeOperators.iterator(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverContext.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverContext.java index 843aa4aaaa881..1877f564677ba 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverContext.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverContext.java @@ -60,6 +60,8 @@ public class DriverContext { private final WarningsMode warningsMode; + private Runnable earlyTerminationChecker = () -> {}; + public DriverContext(BigArrays bigArrays, BlockFactory blockFactory) { this(bigArrays, blockFactory, WarningsMode.COLLECT); } @@ -175,6 +177,21 @@ public void removeAsyncAction() { asyncActions.removeInstance(); } + /** + * Checks if the Driver associated with this DriverContext has been cancelled or early terminated. + */ + public void checkForEarlyTermination() { + earlyTerminationChecker.run(); + } + + /** + * Initializes the early termination or cancellation checker for this DriverContext. + * This method should be called when associating this DriverContext with a driver. + */ + public void initializeEarlyTerminationChecker(Runnable checker) { + this.earlyTerminationChecker = checker; + } + /** * Evaluators should use this function to decide their warning behavior. * @return an appropriate {@link WarningsMode} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverEarlyTerminationException.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverEarlyTerminationException.java new file mode 100644 index 0000000000000..6f79a6341df7d --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverEarlyTerminationException.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.ElasticsearchException; + +/** + * An exception indicates that a compute should be terminated early as the downstream pipeline has enough or no long requires more data. + */ +public final class DriverEarlyTerminationException extends ElasticsearchException { + public DriverEarlyTerminationException(String message) { + super(message); + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index 88028bae368ec..b067c44a289b4 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.compute.data.BasicBlockTests; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.Page; @@ -40,6 +41,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.LongSupplier; @@ -280,6 +282,49 @@ public Page getOutput() { } } + public void testEarlyTermination() { + DriverContext driverContext = driverContext(); + ThreadPool threadPool = threadPool(); + try { + int positions = between(1000, 5000); + List inPages = randomList(1, 100, () -> { + var block = driverContext.blockFactory().newConstantIntBlockWith(randomInt(), positions); + return new Page(block); + }); + final var sourceOperator = new CannedSourceOperator(inPages.iterator()); + final int maxAllowedRows = between(1, 100); + final AtomicInteger processedRows = new AtomicInteger(0); + var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), positions, System::currentTimeMillis); + var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(), Function.identity()); + final var delayOperator = new EvalOperator(driverContext.blockFactory(), new EvalOperator.ExpressionEvaluator() { + @Override + public Block eval(Page page) { + for (int i = 0; i < page.getPositionCount(); i++) { + driverContext.checkForEarlyTermination(); + if (processedRows.incrementAndGet() >= maxAllowedRows) { + sinkHandler.fetchPageAsync(true, ActionListener.noop()); + } + } + return driverContext.blockFactory().newConstantBooleanBlockWith(true, page.getPositionCount()); + } + + @Override + public void close() { + + } + }); + Driver driver = new Driver(driverContext, sourceOperator, List.of(delayOperator), sinkOperator, () -> {}); + ThreadContext threadContext = threadPool.getThreadContext(); + PlainActionFuture future = new PlainActionFuture<>(); + + Driver.start(threadContext, threadPool.executor("esql"), driver, between(1, 1000), future); + future.actionGet(30, TimeUnit.SECONDS); + assertThat(processedRows.get(), equalTo(maxAllowedRows)); + } finally { + terminate(threadPool); + } + } + public void testResumeOnEarlyFinish() throws Exception { DriverContext driverContext = driverContext(); ThreadPool threadPool = threadPool(); diff --git a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/util/DelayEvaluator.java b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/util/DelayEvaluator.java deleted file mode 100644 index 0db714eceb285..0000000000000 --- a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/util/DelayEvaluator.java +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License -// 2.0; you may not use this file except in compliance with the Elastic License -// 2.0. -package org.elasticsearch.xpack.esql.expression.function.scalar.util; - -import java.lang.Override; -import java.lang.String; -import org.elasticsearch.compute.data.Block; -import org.elasticsearch.compute.data.BooleanVector; -import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.operator.DriverContext; -import org.elasticsearch.compute.operator.EvalOperator; -import org.elasticsearch.compute.operator.Warnings; -import org.elasticsearch.xpack.esql.core.tree.Source; - -/** - * {@link EvalOperator.ExpressionEvaluator} implementation for {@link Delay}. - * This class is generated. Do not edit it. - */ -public final class DelayEvaluator implements EvalOperator.ExpressionEvaluator { - private final Source source; - - private final long ms; - - private final DriverContext driverContext; - - private Warnings warnings; - - public DelayEvaluator(Source source, long ms, DriverContext driverContext) { - this.source = source; - this.ms = ms; - this.driverContext = driverContext; - } - - @Override - public Block eval(Page page) { - return eval(page.getPositionCount()).asBlock(); - } - - public BooleanVector eval(int positionCount) { - try(BooleanVector.FixedBuilder result = driverContext.blockFactory().newBooleanVectorFixedBuilder(positionCount)) { - position: for (int p = 0; p < positionCount; p++) { - result.appendBoolean(p, Delay.process(this.ms)); - } - return result.build(); - } - } - - @Override - public String toString() { - return "DelayEvaluator[" + "ms=" + ms + "]"; - } - - @Override - public void close() { - } - - private Warnings warnings() { - if (warnings == null) { - this.warnings = Warnings.createWarnings( - driverContext.warningsMode(), - source.source().getLineNumber(), - source.source().getColumnNumber(), - source.text() - ); - } - return warnings; - } - - static class Factory implements EvalOperator.ExpressionEvaluator.Factory { - private final Source source; - - private final long ms; - - public Factory(Source source, long ms) { - this.source = source; - this.ms = ms; - } - - @Override - public DelayEvaluator get(DriverContext context) { - return new DelayEvaluator(source, ms, context); - } - - @Override - public String toString() { - return "DelayEvaluator[" + "ms=" + ms + "]"; - } - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/util/Delay.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/util/Delay.java index 3b17133bf4974..712dff3024de9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/util/Delay.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/util/Delay.java @@ -10,8 +10,9 @@ import org.elasticsearch.Build; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.compute.ann.Evaluator; -import org.elasticsearch.compute.ann.Fixed; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FoldContext; @@ -102,21 +103,42 @@ private long msValue(FoldContext ctx) { @Override public ExpressionEvaluator.Factory toEvaluator(EvaluatorMapper.ToEvaluator toEvaluator) { - return new DelayEvaluator.Factory(source(), msValue(toEvaluator.foldCtx())); + return context -> new DelayEvaluator(context, msValue(toEvaluator.foldCtx())); } - @Evaluator - static boolean process(@Fixed long ms) { - // Only activate in snapshot builds - if (Build.current().isSnapshot()) { + static final class DelayEvaluator implements ExpressionEvaluator { + private final DriverContext driverContext; + private final long ms; + + DelayEvaluator(DriverContext driverContext, long ms) { + if (Build.current().isSnapshot() == false) { + throw new IllegalArgumentException("Delay function is only available in snapshot builds"); + } + this.driverContext = driverContext; + this.ms = ms; + } + + @Override + public Block eval(Page page) { + int positionCount = page.getPositionCount(); + for (int p = 0; p < positionCount; p++) { + delay(ms); + } + return driverContext.blockFactory().newConstantBooleanBlockWith(true, positionCount); + } + + private void delay(long ms) { try { + driverContext.checkForEarlyTermination(); Thread.sleep(ms); } catch (InterruptedException e) { - return true; + Thread.currentThread().interrupt(); } - } else { - throw new IllegalArgumentException("Delay function is only available in snapshot builds"); } - return true; + + @Override + public void close() { + + } } }