Skip to content

Commit

Permalink
By pass cancellation when closing sinks (elastic#117797)
Browse files Browse the repository at this point in the history
> **java.lang.AssertionError: Leftover exchanges ExchangeService{sinks=[veZSyrPATq2Sg83dtgK3Jg:700/3]} on node node_s4**

I looked into the test failure described in 
elastic#117253. The reason we
don't clean up the exchange sink quickly is that, once a failure occurs,
we cancel the request along with all its child requests. These exchange
sinks will be cleaned up only after they become inactive, which by
default takes 5 minutes.

We could override the `esql.exchange.sink_inactive_interval` setting in 
the test to remove these exchange sinks faster. However, I think we
should allow exchange requests that close exchange sinks to bypass
cancellation, enabling quicker resource cleanup than the default
inactive interval.

Closes elastic#117253
  • Loading branch information
dnhatn authored Dec 3, 2024
1 parent af7d3f9 commit 4a9f632
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(sourcesFinished);
}

@Override
public TaskId getParentTask() {
// Exchange requests with `sourcesFinished=true` complete the remote sink and return without blocking.
// Masking the parent task allows these requests to bypass task cancellation, ensuring cleanup of the remote sink.
// TODO: Maybe add a separate action/request for closing exchange sinks?
if (sourcesFinished) {
return TaskId.EMPTY_TASK_ID;
}
return super.getParentTask();
}

/**
* True if the {@link ExchangeSourceHandler} has enough input.
* The corresponding {@link ExchangeSinkHandler} can drain pages and finish itself.
Expand Down Expand Up @@ -70,9 +81,9 @@ public int hashCode() {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
if (parentTaskId.isSet() == false) {
assert false : "ExchangeRequest must have a parent task";
throw new IllegalStateException("ExchangeRequest must have a parent task");
if (sourcesFinished == false && parentTaskId.isSet() == false) {
assert false : "ExchangeRequest with sourcesFinished=false must have a parent task";
throw new IllegalStateException("ExchangeRequest with sourcesFinished=false must have a parent task");
}
return new CancellableTask(id, type, action, "", parentTaskId, headers) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,28 +314,20 @@ static final class TransportRemoteSink implements RemoteSink {
@Override
public void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
if (allSourcesFinished) {
if (finished.compareAndSet(false, true)) {
doFetchPageAsync(true, listener);
} else {
// already finished or promised
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
}
} else {
// already finished
if (finished.get()) {
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
return;
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
finished.set(true);
}
listener.onResponse(r);
}, e -> {
finished.set(true);
listener.onFailure(e);
}));
close(listener.map(unused -> new ExchangeResponse(blockFactory, null, true)));
return;
}
// already finished
if (finished.get()) {
listener.onResponse(new ExchangeResponse(blockFactory, null, true));
return;
}
doFetchPageAsync(false, ActionListener.wrap(r -> {
if (r.finished()) {
finished.set(true);
}
listener.onResponse(r);
}, e -> close(ActionListener.running(() -> listener.onFailure(e)))));
}

private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener) {
Expand All @@ -361,6 +353,15 @@ private void doFetchPageAsync(boolean allSourcesFinished, ActionListener<Exchang
}, responseExecutor)
);
}

@Override
public void close(ActionListener<Void> listener) {
if (finished.compareAndSet(false, true)) {
doFetchPageAsync(true, listener.delegateFailure((l, unused) -> l.onResponse(null)));
} else {
listener.onResponse(null);
}
}
}

// For testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,10 @@ void onSinkFailed(Exception e) {
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
if (finished == false) {
finished = true;
outstandingSinks.finishInstance();
completionListener.onFailure(e);
remoteSink.close(ActionListener.running(() -> {
outstandingSinks.finishInstance();
completionListener.onFailure(e);
}));
}
}

Expand Down Expand Up @@ -262,7 +264,7 @@ public void onFailure(Exception e) {
failure.unwrapAndCollect(e);
}
buffer.waitForReading().listener().onResponse(null); // resume the Driver if it is being blocked on reading
sinkListener.onFailure(e);
remoteSink.close(ActionListener.running(() -> sinkListener.onFailure(e)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,14 @@
public interface RemoteSink {

void fetchPageAsync(boolean allSourcesFinished, ActionListener<ExchangeResponse> listener);

default void close(ActionListener<Void> listener) {
fetchPageAsync(true, listener.delegateFailure((l, r) -> {
try {
r.close();
} finally {
l.onResponse(null);
}
}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.Matchers.equalTo;

public class ExchangeRequestTests extends ESTestCase {

public void testParentTask() {
ExchangeRequest r1 = new ExchangeRequest("1", true);
r1.setParentTask(new TaskId("node-1", 1));
assertSame(TaskId.EMPTY_TASK_ID, r1.getParentTask());

ExchangeRequest r2 = new ExchangeRequest("1", false);
r2.setParentTask(new TaskId("node-2", 2));
assertTrue(r2.getParentTask().isSet());
assertThat(r2.getParentTask(), equalTo((new TaskId("node-2", 2))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ public void testConcurrentWithTransportActions() {
}
}

public void testFailToRespondPage() {
public void testFailToRespondPage() throws Exception {
Settings settings = Settings.builder().build();
MockTransportService node0 = newTransportService();
ExchangeService exchange0 = new ExchangeService(settings, threadPool, ESQL_TEST_EXECUTOR, blockFactory());
Expand Down Expand Up @@ -558,7 +558,9 @@ public void sendResponse(TransportResponse transportResponse) {
Throwable cause = ExceptionsHelper.unwrap(err, IOException.class);
assertNotNull(cause);
assertThat(cause.getMessage(), equalTo("page is too large"));
sinkHandler.onFailure(new RuntimeException(cause));
PlainActionFuture<Void> sinkCompletionFuture = new PlainActionFuture<>();
sinkHandler.addCompletionListener(sinkCompletionFuture);
assertBusy(() -> assertTrue(sinkCompletionFuture.isDone()));
expectThrows(Exception.class, () -> sourceCompletionFuture.actionGet(10, TimeUnit.SECONDS));
}
}
Expand Down

0 comments on commit 4a9f632

Please sign in to comment.