Skip to content

Commit

Permalink
'connection manager receives messages when publishing is blocked' tes…
Browse files Browse the repository at this point in the history
…t simplified

getMonitorName fixed
  • Loading branch information
Oleg Smelov committed Jul 5, 2024
1 parent 5710005 commit b041ca6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ fun registerLiveness(name: String) = LIVENESS_ARBITER.createMonitor(name)
fun registerReadiness(name: String) = READINESS_ARBITER.createMonitor(name)

@JvmOverloads
fun registerLiveness(obj: Any, suffix: String = "") = LIVENESS_ARBITER.createMonitor(getMonitorName(obj, suffix))
fun registerLiveness(obj: Any, suffix: String = "") = LIVENESS_ARBITER.createMonitor(getMonitorName(obj, "liveness", suffix))
@JvmOverloads
fun registerReadiness(obj: Any, suffix: String = "") = READINESS_ARBITER.createMonitor(getMonitorName(obj, suffix))
fun registerReadiness(obj: Any, suffix: String = "") = READINESS_ARBITER.createMonitor(getMonitorName(obj, "readiness", suffix))

private fun getMonitorName(obj: Any, suffix: String) = "${obj::class.simpleName}_liveness_${obj.hashCode()}" + if (suffix.isEmpty()) "" else "_$suffix"
private fun getMonitorName(obj: Any, infix: String, suffix: String) = "${obj::class.simpleName}_${infix}_${obj.hashCode()}" + if (suffix.isEmpty()) "" else "_$suffix"

@JvmField
val LIVENESS_MONITOR = registerLiveness("user_liveness")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,6 @@ class TestConnectionManager {
LOGGER.info { "Started with port ${rabbit.amqpPort}" }
LOGGER.info { "Started with port ${rabbit.amqpPort}" }
val messagesCount = 10
val blockAfter = 3
val countDown = CountDownLatch(messagesCount)
val messageSizeBytes = 7
createConnectionManager(
Expand All @@ -1222,15 +1221,14 @@ class TestConnectionManager {
)
).use { manager ->
repeat(messagesCount) { index ->
if (index == blockAfter) {
// blocks all publishers ( https://www.rabbitmq.com/docs/memory )
rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0")
}

manager.basicPublish(exchange, routingKey, null, "Hello $index".toByteArray(Charsets.UTF_8))
LOGGER.info("Published $index")
}

// blocks all publishers ( https://www.rabbitmq.com/docs/memory )
rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0")
manager.basicPublish(exchange, routingKey, null, "Final message.".toByteArray(Charsets.UTF_8)) // this message initiates publishers blocking

val receivedMessages = linkedSetOf<String>()
LOGGER.info { "creating consumer" }

Expand All @@ -1256,14 +1254,11 @@ class TestConnectionManager {
subscribeFuture.cancel(true)
}

Thread.sleep(20) // wait to receive messages sent before blocking
assertEquals(blockAfter.toLong(), messagesCount - countDown.count)

// unblocks publishers
rabbit.executeInContainerWithLogging("rabbitmqctl", "set_vm_memory_high_watermark", "0.4")

Thread.sleep(20) // wait to receive all messages
assertEquals(messagesCount.toLong(), messagesCount - countDown.count)
// delay receiving all messages
Awaitility.await("all messages received")
.pollInterval(10L, TimeUnit.MILLISECONDS)
.atMost(100L, TimeUnit.MILLISECONDS)
.until { countDown.count == 0L }
}
}
}
Expand Down

0 comments on commit b041ca6

Please sign in to comment.