From 75c2e3c4e6ac20cd42a72705d48a8546c74e79be Mon Sep 17 00:00:00 2001 From: ran Date: Wed, 6 Mar 2024 15:50:32 +0800 Subject: [PATCH] [fix][test] Improve the declare exchange test (#1150) (cherry picked from commit 21fb6712548da3bab096e326e8e3fba3851357d1) --- .../functional/ExchangeDeclareTest.java | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java index c46b2fd4..57658254 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java @@ -26,7 +26,9 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.awaitility.Awaitility; import org.testng.annotations.Test; /** @@ -121,24 +123,47 @@ private void doTestExchangeDeclaredWithEnumerationEquivalent(Channel channel) for (BuiltinExchangeType exchangeType : BuiltinExchangeType.values()) { channel.exchangeDeclare(NAME, exchangeType); verifyEquivalent(NAME, exchangeType.getType(), false, false, null); - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); channel.exchangeDeclare(NAME, exchangeType, false); verifyEquivalent(NAME, exchangeType.getType(), false, false, null); - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); channel.exchangeDeclare(NAME, exchangeType, false, false, null); verifyEquivalent(NAME, exchangeType.getType(), false, false, null); - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); channel.exchangeDeclare(NAME, exchangeType, false, false, false, null); verifyEquivalent(NAME, exchangeType.getType(), false, false, null); - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); channel.exchangeDeclareNoWait(NAME, exchangeType, false, false, false, null); // no check, this one is asynchronous - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); } } + + public void deleteExchangeWithRetry() throws IOException { + // the replicator cursor of exchange is created in async way, + // delete the exchange may fail due to non-empty directory error + // + // KeeperErrorCode = Directory not empty for + // /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test + // + // - /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test + // - /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test/__amqp_replicator__exchange_test + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> { + try { + channel.exchangeDelete(NAME); + } catch (Exception e) { + return false; + } + return true; + }); + } + }