diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeRequest.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeRequest.java index 6ed2cc7e587be..1e8700bcd4030 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeRequest.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeRequest.java @@ -40,6 +40,17 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(sourcesFinished); } + @Override + public TaskId getParentTask() { + // Exchange requests with `sourcesFinished=true` complete the remote sink and return without blocking. + // Masking the parent task allows these requests to bypass task cancellation, ensuring cleanup of the remote sink. + // TODO: Maybe add a separate action/request for closing exchange sinks? + if (sourcesFinished) { + return TaskId.EMPTY_TASK_ID; + } + return super.getParentTask(); + } + /** * True if the {@link ExchangeSourceHandler} has enough input. * The corresponding {@link ExchangeSinkHandler} can drain pages and finish itself. @@ -70,9 +81,9 @@ public int hashCode() { @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - if (parentTaskId.isSet() == false) { - assert false : "ExchangeRequest must have a parent task"; - throw new IllegalStateException("ExchangeRequest must have a parent task"); + if (sourcesFinished == false && parentTaskId.isSet() == false) { + assert false : "ExchangeRequest with sourcesFinished=false must have a parent task"; + throw new IllegalStateException("ExchangeRequest with sourcesFinished=false must have a parent task"); } return new CancellableTask(id, type, action, "", parentTaskId, headers) { @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index a943a90d02e87..00c68c4f48e86 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -314,28 +314,20 @@ static final class TransportRemoteSink implements RemoteSink { @Override public void fetchPageAsync(boolean allSourcesFinished, ActionListener listener) { if (allSourcesFinished) { - if (finished.compareAndSet(false, true)) { - doFetchPageAsync(true, listener); - } else { - // already finished or promised - listener.onResponse(new ExchangeResponse(blockFactory, null, true)); - } - } else { - // already finished - if (finished.get()) { - listener.onResponse(new ExchangeResponse(blockFactory, null, true)); - return; - } - doFetchPageAsync(false, ActionListener.wrap(r -> { - if (r.finished()) { - finished.set(true); - } - listener.onResponse(r); - }, e -> { - finished.set(true); - listener.onFailure(e); - })); + close(listener.map(unused -> new ExchangeResponse(blockFactory, null, true))); + return; + } + // already finished + if (finished.get()) { + listener.onResponse(new ExchangeResponse(blockFactory, null, true)); + return; } + doFetchPageAsync(false, ActionListener.wrap(r -> { + if (r.finished()) { + finished.set(true); + } + listener.onResponse(r); + }, e -> close(ActionListener.running(() -> listener.onFailure(e))))); } private void doFetchPageAsync(boolean allSourcesFinished, ActionListener listener) { @@ -361,6 +353,15 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener listener) { + if (finished.compareAndSet(false, true)) { + doFetchPageAsync(true, listener.delegateFailure((l, unused) -> l.onResponse(null))); + } else { + listener.onResponse(null); + } + } } // For testing diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java index 61b3386ce0274..375016a5d51d5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java @@ -224,8 +224,10 @@ void onSinkFailed(Exception e) { buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading if (finished == false) { finished = true; - outstandingSinks.finishInstance(); - completionListener.onFailure(e); + remoteSink.close(ActionListener.running(() -> { + outstandingSinks.finishInstance(); + completionListener.onFailure(e); + })); } } @@ -262,7 +264,7 @@ public void onFailure(Exception e) { failure.unwrapAndCollect(e); } buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading - sinkListener.onFailure(e); + remoteSink.close(ActionListener.running(() -> sinkListener.onFailure(e))); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/RemoteSink.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/RemoteSink.java index 7d81cd3f66600..aaa937ef17c0e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/RemoteSink.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/RemoteSink.java @@ -12,4 +12,14 @@ public interface RemoteSink { void fetchPageAsync(boolean allSourcesFinished, ActionListener listener); + + default void close(ActionListener listener) { + fetchPageAsync(true, listener.delegateFailure((l, r) -> { + try { + r.close(); + } finally { + l.onResponse(null); + } + })); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeRequestTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeRequestTests.java new file mode 100644 index 0000000000000..8a0891651a497 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeRequestTests.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.exchange; + +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class ExchangeRequestTests extends ESTestCase { + + public void testParentTask() { + ExchangeRequest r1 = new ExchangeRequest("1", true); + r1.setParentTask(new TaskId("node-1", 1)); + assertSame(TaskId.EMPTY_TASK_ID, r1.getParentTask()); + + ExchangeRequest r2 = new ExchangeRequest("1", false); + r2.setParentTask(new TaskId("node-2", 2)); + assertTrue(r2.getParentTask().isSet()); + assertThat(r2.getParentTask(), equalTo((new TaskId("node-2", 2)))); + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index 4178f02898d79..fc6c850ba187b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -491,7 +491,7 @@ public void testConcurrentWithTransportActions() { } } - public void testFailToRespondPage() { + public void testFailToRespondPage() throws Exception { Settings settings = Settings.builder().build(); MockTransportService node0 = newTransportService(); ExchangeService exchange0 = new ExchangeService(settings, threadPool, ESQL_TEST_EXECUTOR, blockFactory()); @@ -558,7 +558,9 @@ public void sendResponse(TransportResponse transportResponse) { Throwable cause = ExceptionsHelper.unwrap(err, IOException.class); assertNotNull(cause); assertThat(cause.getMessage(), equalTo("page is too large")); - sinkHandler.onFailure(new RuntimeException(cause)); + PlainActionFuture sinkCompletionFuture = new PlainActionFuture<>(); + sinkHandler.addCompletionListener(sinkCompletionFuture); + assertBusy(() -> assertTrue(sinkCompletionFuture.isDone())); expectThrows(Exception.class, () -> sourceCompletionFuture.actionGet(10, TimeUnit.SECONDS)); } }