Skip to content

Commit

Permalink
[fix][test] Improve the declare exchange test (#1150)
Browse files Browse the repository at this point in the history
(cherry picked from commit 21fb671)
  • Loading branch information
gaoran10 committed Mar 6, 2024
1 parent 3ddd411 commit fe08a7f
Showing 1 changed file with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

/**
Expand Down Expand Up @@ -121,24 +123,47 @@ private void doTestExchangeDeclaredWithEnumerationEquivalent(Channel channel)
for (BuiltinExchangeType exchangeType : BuiltinExchangeType.values()) {
channel.exchangeDeclare(NAME, exchangeType);
verifyEquivalent(NAME, exchangeType.getType(), false, false, null);
channel.exchangeDelete(NAME);
deleteExchangeWithRetry();

channel.exchangeDeclare(NAME, exchangeType, false);
verifyEquivalent(NAME, exchangeType.getType(), false, false, null);
channel.exchangeDelete(NAME);
deleteExchangeWithRetry();

channel.exchangeDeclare(NAME, exchangeType, false, false, null);
verifyEquivalent(NAME, exchangeType.getType(), false, false, null);
channel.exchangeDelete(NAME);
deleteExchangeWithRetry();

channel.exchangeDeclare(NAME, exchangeType, false, false, false, null);
verifyEquivalent(NAME, exchangeType.getType(), false, false, null);
channel.exchangeDelete(NAME);
deleteExchangeWithRetry();

channel.exchangeDeclareNoWait(NAME, exchangeType, false,
false, false, null);
// no check, this one is asynchronous
channel.exchangeDelete(NAME);
deleteExchangeWithRetry();
}
}

public void deleteExchangeWithRetry() throws IOException {
// the replicator cursor of exchange is created in async way,
// delete the exchange may fail due to non-empty directory error
//
// KeeperErrorCode = Directory not empty for
// /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test
//
// - /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test
// - /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test/__amqp_replicator__exchange_test
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> {
try {
channel.exchangeDelete(NAME);
} catch (Exception e) {
return false;
}
return true;
});
}

}

0 comments on commit fe08a7f

Please sign in to comment.