From 5f3595bba9bc8485be58fbc0f39e440cffd87bf5 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Mon, 14 Oct 2024 15:18:25 +1100 Subject: [PATCH] Add a callback for onConnectionClosed to MockTransportService (#114564) The callback is added to allow inserting additional behaviour such as delay when handling closed connection. --- .../test/transport/MockTransportService.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 57a6d1e09c52d..c4e1c6c7a0681 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -80,6 +80,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -104,6 +106,7 @@ public class MockTransportService extends TransportService { private final Map> openConnections = new HashMap<>(); private final List onStopListeners = new CopyOnWriteArrayList<>(); + private final AtomicReference> onConnectionClosedCallback = new AtomicReference<>(); public static class TestPlugin extends Plugin { @Override @@ -788,6 +791,19 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi })); } + public void setOnConnectionClosedCallback(Consumer callback) { + onConnectionClosedCallback.set(callback); + } + + @Override + public void onConnectionClosed(Transport.Connection connection) { + final Consumer callback = onConnectionClosedCallback.get(); + if (callback != null) { + callback.accept(connection); + } + super.onConnectionClosed(connection); + } + public void addOnStopListener(Runnable listener) { onStopListeners.add(listener); }