Skip to content

Commit

Permalink
Fix testConcurrentConnectsAndDisconnects (elastic#83098)
Browse files Browse the repository at this point in the history
As mentioned in elastic#77729 each call to `connectToNode` temporarily holds a
reference to the connection it just opened, so we must wait for all the
`connectToNode` calls to complete before we can assert that the node is
no longer connected. However, sometimes we run the connection validator
in the background, which means the temporary reference lives for longer
than the call to `connectToNode`.

This commit extends the tracking of the temporary reference's lifetime
into the validator. In fact, we don't need to track the calls to
`connectToNode` at all; it's enough to wait for every call to the
validator to complete.

Closes elastic#77728
  • Loading branch information
DaveCTurner authored Jan 26, 2022
1 parent 706281a commit af93c9a
Showing 1 changed file with 14 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
Expand Down Expand Up @@ -335,7 +333,6 @@ public void testConcurrentConnects() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/77728")
public void testConcurrentConnectsAndDisconnects() throws Exception {
final DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
doAnswer(invocationOnMock -> {
Expand All @@ -345,24 +342,27 @@ public void testConcurrentConnectsAndDisconnects() throws Exception {
return null;
}).when(transport).openConnection(eq(node), any(), anyActionListener());

final Semaphore validatorPermits = new Semaphore(Integer.MAX_VALUE);

final ConnectionManager.ConnectionValidator validator = (c, p, l) -> {
if (randomBoolean()) {
l.onResponse(null);
} else {
threadPool.generic().execute(() -> l.onResponse(null));
}
assertTrue(validatorPermits.tryAcquire());
threadPool.executor(randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME)).execute(() -> {
try {
l.onResponse(null);
} finally {
validatorPermits.release();
}
});
};

final Semaphore pendingConnections = new Semaphore(1000);
final Semaphore pendingConnections = new Semaphore(between(1, 1000));
final int threadCount = between(1, 10);
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
final ReadWriteLock connectCompletionLock = new ReentrantReadWriteLock();

final Runnable action = new Runnable() {
@Override
public void run() {
if (pendingConnections.tryAcquire()) {
assertTrue(connectCompletionLock.readLock().tryLock());
connectionManager.connectToNode(node, null, validator, new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
Expand All @@ -385,7 +385,6 @@ public void onFailure(Exception e) {
}
}
});
connectCompletionLock.readLock().unlock();
} else {
countDownLatch.countDown();
}
Expand All @@ -396,9 +395,9 @@ public void onFailure(Exception e) {
threadPool.generic().execute(action);
}

assertTrue(countDownLatch.await(10, TimeUnit.SECONDS));
assertTrue(connectCompletionLock.writeLock().tryLock(10, TimeUnit.SECONDS));
assertFalse(connectionManager.nodeConnected(node));
assertTrue("threads did not all complete", countDownLatch.await(10, TimeUnit.SECONDS));
assertTrue("validatorPermits not all released", validatorPermits.tryAcquire(Integer.MAX_VALUE, 10, TimeUnit.SECONDS));
assertFalse("node still connected", connectionManager.nodeConnected(node));
connectionManager.close();
}

Expand Down

0 comments on commit af93c9a

Please sign in to comment.