Skip to content

Commit

Permalink
MessageRouterContext
Browse files Browse the repository at this point in the history
tests updates
  • Loading branch information
Oleg Smelov committed Jul 8, 2024
1 parent e0f6600 commit f80b343
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ public abstract class AbstractCommonFactory implements AutoCloseable {
private final Class<? extends MessageRouter<EventBatch>> eventBatchRouterClass;
private final Class<? extends GrpcRouter> grpcRouterClass;
private final Class<? extends NotificationRouter<EventBatch>> notificationEventBatchRouterClass;
private final LazyProvider<ConnectionManager> rabbitMqConnectionManager =
lazyAutocloseable("connection-manager", this::createRabbitMQConnectionManager);
private final LazyProvider<ConnectionManager> rabbitMqPublishConnectionManager =
lazyAutocloseable("publish-connection-manager", this::createRabbitMQConnectionManager);
private final LazyProvider<ConnectionManager> rabbitMqConsumeConnectionManager =
lazyAutocloseable("consume-connection-manager", this::createRabbitMQConnectionManager);
private final LazyProvider<MessageRouterContext> routerContext =
lazy("router-context", this::createMessageRouterContext);
private final LazyProvider<MessageRouter<MessageBatch>> messageRouterParsedBatch =
Expand Down Expand Up @@ -648,7 +650,8 @@ private MessageRouterContext createMessageRouterContext() {
@NotNull
private MessageRouterContext createRouterContext(MessageRouterMonitor contextMonitor) {
return new DefaultMessageRouterContext(
getRabbitMqConnectionManager(),
getRabbitMqPublishConnectionManager(),
getRabbitMqConsumeConnectionManager(),
contextMonitor,
getMessageRouterConfiguration(),
getBoxConfiguration()
Expand All @@ -663,8 +666,12 @@ protected ConnectionManager createRabbitMQConnectionManager() {
return new ConnectionManager(getBoxConfiguration().getBoxName(), getRabbitMqConfiguration(), getConnectionManagerConfiguration());
}

protected ConnectionManager getRabbitMqConnectionManager() {
return rabbitMqConnectionManager.get();
protected ConnectionManager getRabbitMqPublishConnectionManager() {
return rabbitMqPublishConnectionManager.get();
}

protected ConnectionManager getRabbitMqConsumeConnectionManager() {
return rabbitMqConsumeConnectionManager.get();
}

public MessageID.Builder newMessageIDBuilder() {
Expand Down Expand Up @@ -700,9 +707,15 @@ public void close() {
}

try {
rabbitMqConnectionManager.close();
rabbitMqPublishConnectionManager.close();
} catch (Exception e) {
LOGGER.error("Failed to close RabbitMQ publish connection", e);
}

try {
rabbitMqConsumeConnectionManager.close();
} catch (Exception e) {
LOGGER.error("Failed to close RabbitMQ connection", e);
LOGGER.error("Failed to close RabbitMQ consume connection", e);
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface MessageRouter<T> extends AutoCloseable {
default void init(@NotNull ConnectionManager connectionManager, @NotNull MessageRouterConfiguration configuration) {
Objects.requireNonNull(connectionManager, "Connection owner can not be null");
Objects.requireNonNull(configuration, "Configuration cannot be null");
init(new DefaultMessageRouterContext(connectionManager, MessageRouterMonitor.DEFAULT_MONITOR, configuration, new BoxConfiguration()));
init(new DefaultMessageRouterContext(connectionManager, connectionManager, MessageRouterMonitor.DEFAULT_MONITOR, configuration, new BoxConfiguration()));
}

default void init(@NotNull MessageRouterContext context, @NotNull MessageRouter<MessageGroupBatch> groupBatchRouter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfigu
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager

interface MessageRouterContext {
val connectionManager: ConnectionManager
val publishConnectionManager: ConnectionManager
val consumeConnectionManager: ConnectionManager
val routerMonitor: MessageRouterMonitor
val configuration: MessageRouterConfiguration
val boxConfiguration: BoxConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import com.exactpro.th2.common.schema.message.configuration.MessageRouterConfigu
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager

class DefaultMessageRouterContext(
override val connectionManager: ConnectionManager,
override val publishConnectionManager: ConnectionManager,
override val consumeConnectionManager: ConnectionManager,
override val routerMonitor: MessageRouterMonitor,
override val configuration: MessageRouterConfiguration,
override val boxConfiguration: BoxConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ abstract class AbstractRabbitRouter<T> : MessageRouter<T> {
get() = context.configuration

protected val publishConnectionManager: ConnectionManager
get() = context.connectionManager
get() = context.publishConnectionManager

protected val consumeConnectionManager: ConnectionManager
get() = context.connectionManager
get() = context.consumeConnectionManager

private val boxConfiguration: BoxConfiguration
get() = context.boxConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ class NotificationEventBatchRouter : NotificationRouter<EventBatch> {

override fun init(context: MessageRouterContext) {
sender = NotificationEventBatchSender(
context.connectionManager,
context.publishConnectionManager,
context.configuration.globalNotification.exchange
)
queue = context.connectionManager.queueExclusiveDeclareAndBind(
queue = context.consumeConnectionManager.queueExclusiveDeclareAndBind(
context.configuration.globalNotification.exchange
)
subscriber = NotificationEventBatchSubscriber(context.connectionManager, queue)
subscriber = NotificationEventBatchSubscriber(context.consumeConnectionManager, queue)
}

override fun send(message: EventBatch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,43 +61,46 @@ class AbstractRabbitRouterIntegrationTest {
rabbitMQContainer.start()
K_LOGGER.info { "Started with port ${rabbitMQContainer.amqpPort}, rest ${rabbitMQContainer.httpUrl} ${rabbitMQContainer.adminUsername} ${rabbitMQContainer.adminPassword} " }

createConnectionManager(rabbitMQContainer).use { firstManager ->
createRouter(firstManager).use { firstRouter ->
val messageA = "test-message-a"
val messageB = "test-message-b"
val messageC = "test-message-c"
val messageD = "test-message-d"

val queue = ArrayBlockingQueue<Delivery>(4)

firstRouter.send(messageA)
firstRouter.send(messageB)
firstRouter.send(messageC)
firstRouter.send(messageD)

connectAndCheck(
rabbitMQContainer, queue, listOf(
Expectation(messageA, false, ManualAckDeliveryCallback.Confirmation::confirm),
Expectation(messageB, false, ManualAckDeliveryCallback.Confirmation::reject),
Expectation(messageC, false) { },
Expectation(messageD, false) { },
createConnectionManager(rabbitMQContainer).use { publishManager ->
createConnectionManager(rabbitMQContainer).use { consumeManager ->

createRouter(publishManager, consumeManager).use { firstRouter ->
val messageA = "test-message-a"
val messageB = "test-message-b"
val messageC = "test-message-c"
val messageD = "test-message-d"

val queue = ArrayBlockingQueue<Delivery>(4)

firstRouter.send(messageA)
firstRouter.send(messageB)
firstRouter.send(messageC)
firstRouter.send(messageD)

connectAndCheck(
rabbitMQContainer, queue, listOf(
Expectation(messageA, false, ManualAckDeliveryCallback.Confirmation::confirm),
Expectation(messageB, false, ManualAckDeliveryCallback.Confirmation::reject),
Expectation(messageC, false) { },
Expectation(messageD, false) { },
)
)
)

connectAndCheck(
rabbitMQContainer, queue, listOf(
Expectation(messageC, true, ManualAckDeliveryCallback.Confirmation::confirm),
Expectation(messageD, true) { },
connectAndCheck(
rabbitMQContainer, queue, listOf(
Expectation(messageC, true, ManualAckDeliveryCallback.Confirmation::confirm),
Expectation(messageD, true) { },
)
)
)

connectAndCheck(
rabbitMQContainer, queue, listOf(
Expectation(messageD, true, ManualAckDeliveryCallback.Confirmation::reject),
connectAndCheck(
rabbitMQContainer, queue, listOf(
Expectation(messageD, true, ManualAckDeliveryCallback.Confirmation::reject),
)
)
)

connectAndCheck(rabbitMQContainer, queue, emptyList())
connectAndCheck(rabbitMQContainer, queue, emptyList())
}
}
}
}
Expand All @@ -108,48 +111,50 @@ class AbstractRabbitRouterIntegrationTest {
queue: ArrayBlockingQueue<Delivery>,
expectations: List<Expectation>,
) {
createConnectionManager(rabbitMQContainer).use { manager ->
createRouter(manager).use { router ->
val monitor = router.subscribeWithManualAck({ deliveryMetadata, message, confirmation ->
queue.put(
Delivery(
message,
deliveryMetadata.isRedelivered,
confirmation
createConnectionManager(rabbitMQContainer).use { publishManager ->
createConnectionManager(rabbitMQContainer).use { consumeManager ->
createRouter(publishManager, consumeManager).use { router ->
val monitor = router.subscribeWithManualAck({ deliveryMetadata, message, confirmation ->
queue.put(
Delivery(
message,
deliveryMetadata.isRedelivered,
confirmation
)
)
)
})

try {
expectations.forEach { expectation ->
val delivery = assertNotNull(queue.poll(1, TimeUnit.SECONDS))
assertEquals(expectation.message, delivery.message, "Message")
assertEquals(expectation.redelivery, delivery.redelivery, "Redelivery flag")
expectation.action.invoke(delivery.confirmation)
})

try {
expectations.forEach { expectation ->
val delivery = assertNotNull(queue.poll(1, TimeUnit.SECONDS))
assertEquals(expectation.message, delivery.message, "Message")
assertEquals(expectation.redelivery, delivery.redelivery, "Redelivery flag")
expectation.action.invoke(delivery.confirmation)
}

assertNull(queue.poll(1, TimeUnit.SECONDS))
} finally {
monitor.unsubscribe()
}

assertNull(queue.poll(1, TimeUnit.SECONDS))
} finally {
monitor.unsubscribe()
}
}

createRouter(manager).use { router ->
val monitor = router.subscribeWithManualAck({ deliveryMetadata, message, confirmation ->
queue.put(
Delivery(
message,
deliveryMetadata.isRedelivered,
confirmation
createRouter(publishManager, consumeManager).use { router ->
val monitor = router.subscribeWithManualAck({ deliveryMetadata, message, confirmation ->
queue.put(
Delivery(
message,
deliveryMetadata.isRedelivered,
confirmation
)
)
)
})
})

try {
// RabbitMQ doesn't resend messages after resubscribe using the same connection and channel
assertNull(queue.poll(1, TimeUnit.SECONDS))
} finally {
monitor.unsubscribe()
try {
// RabbitMQ doesn't resend messages after resubscribe using the same connection and channel
assertNull(queue.poll(1, TimeUnit.SECONDS))
} finally {
monitor.unsubscribe()
}
}
}
}
Expand All @@ -175,14 +180,15 @@ class AbstractRabbitRouterIntegrationTest {
),
)

private fun createRouter(connectionManager: ConnectionManager) = RabbitCustomRouter(
private fun createRouter(publishConnectionManager: ConnectionManager, consumeConnectionManager: ConnectionManager) = RabbitCustomRouter(
"test-custom-tag",
arrayOf("test-label"),
TestMessageConverter()
).apply {
init(
DefaultMessageRouterContext(
connectionManager,
publishConnectionManager,
consumeConnectionManager,
mock { },
MessageRouterConfiguration(
mapOf(
Expand Down
Loading

0 comments on commit f80b343

Please sign in to comment.