Skip to content

Commit

Permalink
Verify driver thread context in responding thread (elastic#101097)
Browse files Browse the repository at this point in the history
The test failed when the Driver completed too quickly, causing the 
verification process to occur in the test thread rather than the
responding thread. This change ensures that the responding thread
performs the verification by moving the path inside the listener.

Closes elastic#101095
  • Loading branch information
dnhatn authored Oct 19, 2023
1 parent a62572f commit 2e9ef62
Showing 1 changed file with 22 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
Expand All @@ -34,12 +32,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;

public class DriverTests extends ESTestCase {

public void testThreadContext() {
public void testThreadContext() throws Exception {
DriverContext driverContext = driverContext();
ThreadPool threadPool = threadPool();
try {
Expand All @@ -58,27 +58,28 @@ public Page getOutput() {
outPages.add(page);
}), () -> {});
ThreadContext threadContext = threadPool.getThreadContext();
SubscribableListener<Void> future = new SubscribableListener<>();
CountDownLatch latch = new CountDownLatch(1);
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
threadContext.putHeader("user", "user1");
Driver.start(threadContext, threadPool.executor("esql"), driver, between(1, 1000), future);
Driver.start(threadContext, threadPool.executor("esql"), driver, between(1, 1000), ActionListener.running(() -> {
try {
assertRunningWithRegularUser(threadPool);
assertThat(outPages, equalTo(inPages));
Map<String, Set<String>> actualResponseHeaders = new HashMap<>();
for (Map.Entry<String, List<String>> e : threadPool.getThreadContext().getResponseHeaders().entrySet()) {
actualResponseHeaders.put(e.getKey(), Sets.newHashSet(e.getValue()));
}
Map<String, Set<String>> expectedResponseHeaders = new HashMap<>(warning1.warnings);
for (Map.Entry<String, Set<String>> e : warning2.warnings.entrySet()) {
expectedResponseHeaders.merge(e.getKey(), e.getValue(), Sets::union);
}
assertThat(actualResponseHeaders, equalTo(expectedResponseHeaders));
} finally {
latch.countDown();
}
}));
}
future.addListener(ActionListener.running(() -> {
assertRunningWithRegularUser(threadPool);
assertThat(outPages, equalTo(inPages));
Map<String, Set<String>> actualResponseHeaders = new HashMap<>();
for (Map.Entry<String, List<String>> e : threadPool.getThreadContext().getResponseHeaders().entrySet()) {
actualResponseHeaders.put(e.getKey(), Sets.newHashSet(e.getValue()));
}
Map<String, Set<String>> expectedResponseHeaders = new HashMap<>(warning1.warnings);
for (Map.Entry<String, Set<String>> e : warning2.warnings.entrySet()) {
expectedResponseHeaders.merge(e.getKey(), e.getValue(), Sets::union);
}
assertThat(actualResponseHeaders, equalTo(expectedResponseHeaders));
}));
PlainActionFuture<Void> completion = new PlainActionFuture<>();
future.addListener(completion);
completion.actionGet(TimeValue.timeValueSeconds(30));
assertTrue(latch.await(30, TimeUnit.SECONDS));
} finally {
terminate(threadPool);
}
Expand Down

0 comments on commit 2e9ef62

Please sign in to comment.