From b041ca6779b951b761e717a5e899d009bf125877 Mon Sep 17 00:00:00 2001 From: Oleg Smelov Date: Fri, 5 Jul 2024 13:20:53 +0400 Subject: [PATCH] 'connection manager receives messages when publishing is blocked' test simplified getMonitorName fixed --- .../th2/common/metrics/CommonMetrics.kt | 6 ++--- .../connection/TestConnectionManager.kt | 23 ++++++++----------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt b/src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt index 0d9bcb86..e076036a 100644 --- a/src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt +++ b/src/main/kotlin/com/exactpro/th2/common/metrics/CommonMetrics.kt @@ -52,11 +52,11 @@ fun registerLiveness(name: String) = LIVENESS_ARBITER.createMonitor(name) fun registerReadiness(name: String) = READINESS_ARBITER.createMonitor(name) @JvmOverloads -fun registerLiveness(obj: Any, suffix: String = "") = LIVENESS_ARBITER.createMonitor(getMonitorName(obj, suffix)) +fun registerLiveness(obj: Any, suffix: String = "") = LIVENESS_ARBITER.createMonitor(getMonitorName(obj, "liveness", suffix)) @JvmOverloads -fun registerReadiness(obj: Any, suffix: String = "") = READINESS_ARBITER.createMonitor(getMonitorName(obj, suffix)) +fun registerReadiness(obj: Any, suffix: String = "") = READINESS_ARBITER.createMonitor(getMonitorName(obj, "readiness", suffix)) -private fun getMonitorName(obj: Any, suffix: String) = "${obj::class.simpleName}_liveness_${obj.hashCode()}" + if (suffix.isEmpty()) "" else "_$suffix" +private fun getMonitorName(obj: Any, infix: String, suffix: String) = "${obj::class.simpleName}_${infix}_${obj.hashCode()}" + if (suffix.isEmpty()) "" else "_$suffix" @JvmField val LIVENESS_MONITOR = registerLiveness("user_liveness") diff --git a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt index 7d2dca7a..311db435 100644 --- a/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt +++ b/src/test/kotlin/com/exactpro/th2/common/schema/message/impl/rabbitmq/connection/TestConnectionManager.kt @@ -1204,7 +1204,6 @@ class TestConnectionManager { LOGGER.info { "Started with port ${rabbit.amqpPort}" } LOGGER.info { "Started with port ${rabbit.amqpPort}" } val messagesCount = 10 - val blockAfter = 3 val countDown = CountDownLatch(messagesCount) val messageSizeBytes = 7 createConnectionManager( @@ -1222,15 +1221,14 @@ class TestConnectionManager { ) ).use { manager -> repeat(messagesCount) { index -> - if (index == blockAfter) { - // blocks all publishers ( https://www.rabbitmq.com/docs/memory ) - rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0") - } - manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8)) LOGGER.info("Published $index") } + // blocks all publishers ( https://www.rabbitmq.com/docs/memory ) + rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0") + manager.basicPublish(exchange, routingKey, null, "Final message.".toByteArray(Charsets.UTF_8)) // this message initiates publishers blocking + val receivedMessages = linkedSetOf() LOGGER.info { "creating consumer" } @@ -1256,14 +1254,11 @@ class TestConnectionManager { subscribeFuture.cancel(true) } - Thread.sleep(20) // wait to receive messages sent before blocking - assertEquals(blockAfter.toLong(), messagesCount - countDown.count) - - // unblocks publishers - rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4") - - Thread.sleep(20) // wait to receive all messages - assertEquals(messagesCount.toLong(), messagesCount - countDown.count) + // delay receiving all messages + Awaitility.await("all messages received") + .pollInterval(10L, TimeUnit.MILLISECONDS) + .atMost(100L, TimeUnit.MILLISECONDS) + .until { countDown.count == 0L } } } }