Skip to content

Commit

Permalink
Fix leftover exchange in ManyShardsIT (elastic#117309) (elastic#117441)
Browse files Browse the repository at this point in the history
In the ManyShardsIT#testRejection test, we intercept exchange requests
and fail them with EsRejectedExecutionException, verifying that we
return a 400 response instead of a 500.

The issue with the current test is that if the data-node request never
arrives (because the whole request was canceled after the exchange
request failed), the leftover exchange sink remains until it times out,
which takes 5 minutes by default. This change adjusts the test to use a
single data node and ensures that exchange requests are only failed after
the data-node request has arrived.

Closes elastic#112406
Closes elastic#112418
Closes elastic#112424
  • Loading branch information
dnhatn authored Nov 25, 2024
1 parent 72f626f commit 1cb94d9
Showing 1 changed file with 72 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.MockSearchService;
Expand All @@ -26,6 +30,7 @@
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.hamcrest.Matchers;
import org.junit.Before;
Expand Down Expand Up @@ -56,6 +61,18 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
return plugins;
}

/**
 * Returns the plugins declared by the parent class with {@code InternalExchangePlugin}
 * added to a copy of that collection.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final Collection<Class<? extends Plugin>> inherited = super.nodePlugins();
    return CollectionUtils.appendToCopy(inherited, InternalExchangePlugin.class);
}

/**
 * Builds the per-node settings for this test.
 *
 * <p>Starts from the settings produced by the parent class so that its defaults are
 * preserved, then sets {@link ExchangeService#INACTIVE_SINKS_INTERVAL_SETTING} to a
 * random value obtained from {@code between(3000, 5000)} milliseconds.
 *
 * @param nodeOrdinal   ordinal of the node being configured; forwarded to the parent
 * @param otherSettings additional settings supplied by the caller; forwarded to the parent
 * @return the parent's node settings with the inactive-sinks interval overridden
 */
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
    return Settings.builder()
        // Keep whatever the parent class configures; without this its settings are dropped.
        .put(super.nodeSettings(nodeOrdinal, otherSettings))
        .put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(3000, 5000)))
        .build();
}

@Before
public void setupIndices() {
int numIndices = between(10, 20);
Expand Down Expand Up @@ -113,32 +130,64 @@ public void testConcurrentQueries() throws Exception {
}

/**
 * Intercepts exchange requests, answers them with a simulated
 * {@code EsRejectedExecutionException("test queue")}, and asserts that the query
 * failure maps to {@code RestStatus.TOO_MANY_REQUESTS} with the message "test queue".
 *
 * NOTE(review): this span is rendered from a diff, so statements from the previous
 * implementation (the per-node loop) and from the new one (single data node) appear
 * side by side, and some lines are hidden behind the hunk marker further down.
 */
public void testRejection() throws Exception {
String[] nodes = internalCluster().getNodeNames();
for (String node : nodes) {
MockTransportService ts = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
ts.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
handler.messageReceived(request, new TransportChannel() {
@Override
public String getProfileName() {
return channel.getProfileName();
}

@Override
public void sendResponse(TransportResponse response) {
channel.sendResponse(new RemoteTransportException("simulated", new EsRejectedExecutionException("test queue")));
}

@Override
public void sendResponse(Exception exception) {
channel.sendResponse(exception);
}
}, task);
// Pick one data node at random and create a zero-replica index that is required to live on it.
DiscoveryNode dataNode = randomFrom(internalCluster().clusterService().state().nodes().getDataNodes().values());
String indexName = "single-node-index";
client().admin()
.indices()
.prepareCreate(indexName)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.require._name", dataNode.getName())
)
.setMapping("user", "type=keyword", "tags", "type=keyword")
.get();
// Index a single document and refresh immediately.
client().prepareIndex(indexName)
.setSource("user", "u1", "tags", "lucene")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();

MockTransportService ts = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getName());
// Released once the data-node request has been passed to its real handler.
CountDownLatch dataNodeRequestLatch = new CountDownLatch(1);
ts.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
handler.messageReceived(request, channel, task);
dataNodeRequestLatch.countDown();
});

ts.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
// Handle the exchange request on the generic executor; presumably so that the wait
// below does not block the thread that delivered the request -- TODO confirm.
ts.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
channel.sendResponse(e);
}

@Override
protected void doRun() throws Exception {
// Only fail the exchange after the data-node request has arrived (waits up to 30 seconds).
assertTrue(dataNodeRequestLatch.await(30, TimeUnit.SECONDS));
handler.messageReceived(request, new TransportChannel() {
@Override
public String getProfileName() {
return channel.getProfileName();
}

@Override
public void sendResponse(TransportResponse response) {
// Replace the successful response with a simulated rejection.
channel.sendResponse(new RemoteTransportException("simulated", new EsRejectedExecutionException("test queue")));
}

@Override
public void sendResponse(Exception exception) {
channel.sendResponse(exception);
}
}, task);
}
});
}
});

try {
AtomicReference<Exception> failure = new AtomicReference<>();
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("from test-* | stats count(user) by tags");
request.query("from single-node-index | stats count(user) by tags");
request.acceptedPragmaRisks(true);
request.pragmas(randomPragmas());
CountDownLatch queryLatch = new CountDownLatch(1);
Expand All @@ -151,9 +200,7 @@ public void sendResponse(Exception exception) {
// The simulated rejection must surface as TOO_MANY_REQUESTS with the original message.
assertThat(ExceptionsHelper.status(failure.get()), equalTo(RestStatus.TOO_MANY_REQUESTS));
assertThat(failure.get().getMessage(), equalTo("test queue"));
} finally {
for (String node : nodes) {
((MockTransportService) internalCluster().getInstance(TransportService.class, node)).clearAllRules();
}
// Remove the request-handling behaviors installed above.
ts.clearAllRules();
}
}

Expand Down

0 comments on commit 1cb94d9

Please sign in to comment.