From 4484bac063b95559f1aeb0734b57991f8b0e6c0d Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Fri, 12 Jul 2024 12:48:00 +0530 Subject: [PATCH] Solace Direct Messaging Capability Removed Unused config fix failing test Fix failing tests Review comments and Documentation update Fixed failing test and updated documentation Direct Messaging: Publish failed listener to track failures and update health status Updated property names which are common to direct and persistent messaging Solace Direct Messaging capability --- .../quarkus-solace-extension-common.adoc | 20 ++ .../quarkus-solace-extension-incoming.adoc | 79 ++++- .../quarkus-solace-extension-outgoing.adoc | 22 +- docs/modules/ROOT/pages/index.adoc | 55 +++- .../quarkus/messaging/SolaceConnector.java | 65 +++- .../messaging/fault/SolaceErrorTopic.java | 35 +-- .../quarkus/messaging/fault/SolaceFail.java | 6 +- .../SolaceDirectMessageIncomingChannel.java | 241 +++++++++++++++ .../incoming/SolaceInboundMessage.java | 7 +- .../incoming/SolaceIncomingChannel.java | 19 +- .../SolaceDirectMessageOutgoingChannel.java | 267 ++++++++++++++++ .../outgoing/SolaceOutboundMetadata.java | 17 +- .../outgoing/SolaceOutgoingChannel.java | 4 + .../quarkus/messaging/SolaceConsumerTest.java | 24 +- .../SolaceDirectMessageConsumerTest.java | 292 ++++++++++++++++++ .../SolaceDirectMessagePublisherTest.java | 212 +++++++++++++ .../quarkus/messaging/SolaceOAuthTest.java | 2 +- .../messaging/SolaceProcessorTest.java | 2 +- .../health/SolaceConsumerHealthTest.java | 4 +- ...SolaceDirectMessageConsumerHealthTest.java | 137 ++++++++ ...olaceDirectMessagePublisherHealthTest.java | 117 +++++++ .../locals/LocalPropagationAckTest.java | 2 +- .../locals/LocalPropagationTest.java | 2 +- .../perf/EndToEndPerformanceTest.java | 16 +- .../perf/SolaceConsumerPerformanceTest.java | 2 +- ...eDirectMessageConsumerPerformanceTest.java | 86 ++++++ ...DirectMessagePublisherPerformanceTest.java | 201 ++++++++++++ .../tracing/TracingPropogationTest.java | 4 +- .../src/main/resources/application.properties | 4 +- 29 files changed, 1838 insertions(+), 106 deletions(-) create mode 100644 quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceDirectMessageIncomingChannel.java create mode 100644 quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceDirectMessageOutgoingChannel.java create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceDirectMessageConsumerTest.java create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceDirectMessagePublisherTest.java create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceDirectMessageConsumerHealthTest.java create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceDirectMessagePublisherHealthTest.java create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceDirectMessageConsumerPerformanceTest.java create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceDirectMessagePublisherPerformanceTest.java diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc index 4d2cbf0..854ae1d 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-common.adoc @@ -9,6 +9,26 @@ h|[[quarkus-solace_configuration_common]]link:#quarkus-solace_configuration_comm h|Type h|Default +a| [[quarkus-solace_quarkus.client.type]]`link:#quarkus-solace_quarkus.client.type[client.type]` + + +[.description] +-- +The type of client when establishing connection to Solace. Solace supports two types of client Direct and Persistent. + +Use Direct client where message loss can be tolerated. The publisher publishes the event and the broker doesn't send any acknowledgement back to publisher for guaranteed delivery. + +Use Persistent client where message loss cannot be tolerated. The publisher publishes the event and the broker sends an acknowledgement that message is guaranteed for delivery. + +// ifdef::add-copy-button-to-env-var[] +// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_METRICS_ENABLED+++[] +// endif::add-copy-button-to-env-var[] +// ifndef::add-copy-button-to-env-var[] +// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++` +// endif::add-copy-button-to-env-var[] +--|string +| `persistent` + a| [[quarkus-solace_quarkus.client.lazy.start]]`link:#quarkus-solace_quarkus.client.lazy.start[client.lazy.start]` diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc index 50a6b4c..e76c19b 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc @@ -9,6 +9,43 @@ h|[[quarkus-solace_configuration_incoming]]link:#quarkus-solace_configuration_in h|Type h|Default +a| [[quarkus-solace_quarkus.client.type.direct.back-pressure.strategy]]`link:#quarkus-solace_quarkus.client.type.direct.back-pressure.strategy[client.type.direct.back-pressure.strategy]` + + +[.description] +-- +The back-pressure strategy to be applied for direct message consumer. + +Supported values are `oldest`, `latest` & `elastic`. + +Refer to https://docs.solace.com/API/API-Developer-Guide-Java/Java-DM-Subscribe.htm#Configuring-Back-Pressure[Handling Back-Pressure When Subscribing to Direct Messages] + +// ifdef::add-copy-button-to-env-var[] +// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_METRICS_ENABLED+++[] +// endif::add-copy-button-to-env-var[] +// ifndef::add-copy-button-to-env-var[] +// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++` +// endif::add-copy-button-to-env-var[] +--|string +| `elastic` + +a| [[quarkus-solace_quarkus.client.type.direct.back-pressure.buffer-capacity]]`link:#quarkus-solace_quarkus.client.type.direct.back-pressure.buffer-capacity[client.type.direct.back-pressure.buffer-capacity]` + + +[.description] +-- + +It is possible for the client application to consume messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages. + +// ifdef::add-copy-button-to-env-var[] +// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_METRICS_ENABLED+++[] +// endif::add-copy-button-to-env-var[] +// ifndef::add-copy-button-to-env-var[] +// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++` +// endif::add-copy-button-to-env-var[] +--|int +| `1024` + a| [[quarkus-solace_quarkus.consumer.queue.name]]`link:#quarkus-solace_quarkus.consumer.queue.name[consumer.queue.name]` @@ -77,12 +114,16 @@ Whether to add configured subscriptions to queue. Will fail if permissions to co |`false` -a| [[quarkus-solace_quarkus.consumer.queue.subscriptions]]`link:#quarkus-solace_quarkus.consumer.queue.subscriptions[consumer.queue.subscriptions]` +a| [[quarkus-solace_quarkus.consumer.subscriptions]]`link:#quarkus-solace_quarkus.consumer.subscriptions[consumer.subscriptions]` [.description] -- -The comma separated list of subscriptions, the channel name if empty. This configuration is considered if `consumer.queue.add-additional-subscriptions` is set to true. +The comma separated list of subscriptions, the channel name if empty. + +If `client.type` is `persistent` this configuration is considered only if `consumer.queue.add-additional-subscriptions` is set to true. + +If `client.type` is `direct` this is used by default. // ifdef::add-copy-button-to-env-var[] // Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_DEVSERVICES_SHARED+++[] @@ -91,7 +132,7 @@ The comma separated list of subscriptions, the channel name if empty. This confi // Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_SHARED+++` // endif::add-copy-button-to-env-var[] --|string -| +| required icon:exclamation-circle[title=Configuration property is required] a| [[quarkus-solace_quarkus.consumer.queue.selector-query]]`link:#quarkus-solace_quarkus.consumer.queue.selector-query[quarkus.consumer.queue.selector-query]` @@ -158,16 +199,18 @@ The receiver replay replication group message id. // ifndef::add-copy-button-to-env-var[] // Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_CONTAINER_ENV+++` // endif::add-copy-button-to-env-var[] ---|`string` +--|string | -a| [[quarkus-solace_quarkus.consumer.queue.failure-strategy]]`link:#quarkus-solace_quarkus.consumer.queue.failure-strategy[consumer.queue.failure-strategy]` +a| [[quarkus-solace_quarkus.consumer.failure-strategy]]`link:#quarkus-solace_quarkus.consumer.failure-strategy[consumer.failure-strategy]` [.description] -- Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`. +Following are the failure strategies supported when `client.type` is `persistent`. + `ignore` - Mark the message as IGNORED, will continue processing with next message. `fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version. @@ -176,16 +219,22 @@ Specify the failure strategy to apply when a message consumed from Solace broker `error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue. +Following are the failure strategies supported when `client.type` is `direct`. + +`ignore` - Mark the message as IGNORED, will continue processing with next message. + +`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue. + // ifdef::add-copy-button-to-env-var[] // Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[] // endif::add-copy-button-to-env-var[] // ifndef::add-copy-button-to-env-var[] // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] ---|`string` +--|string | ignore -a| [[quarkus-solace_quarkus.consumer.queue.error.topic]]`link:#quarkus-solace_quarkus.consumer.queue.error.topic[consumer.queue.error.topic]` +a| [[quarkus-solace_quarkus.consumer.error.topic]]`link:#quarkus-solace_quarkus.consumer.error.topic[consumer.error.topic]` [.description] @@ -198,10 +247,10 @@ The error topic where message should be published in case of error. // ifndef::add-copy-button-to-env-var[] // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] ---|`string` +--|string | -a| [[quarkus-solace_quarkus.consumer.queue.error.message.dmq-eligible]]`link:#quarkus-solace_quarkus.consumer.queue.error.message.dmq-eligible[consumer.queue.error.message.dmq-eligible]` +a| [[quarkus-solace_quarkus.consumer.error.message.dmq-eligible]]`link:#quarkus-solace_quarkus.consumer.error.message.dmq-eligible[consumer.error.message.dmq-eligible]` [.description] @@ -214,10 +263,10 @@ Whether error message is eligible to move to dead message queue. // ifndef::add-copy-button-to-env-var[] // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] ---|`boolean` +--|boolean | `false` -a| [[quarkus-solace_quarkus.consumer.queue.error.message.ttl]]`link:#quarkus-solace_quarkus.consumer.queue.error.message.ttl[consumer.queue.error.message.ttl]` +a| [[quarkus-solace_quarkus.consumer.error.message.ttl]]`link:#quarkus-solace_quarkus.consumer.error.message.ttl[consumer.error.message.ttl]` [.description] @@ -230,10 +279,10 @@ TTL for Error message before moving to dead message queue. // ifndef::add-copy-button-to-env-var[] // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] ---|`long` +--|long | `null` -a| [[quarkus-solace_quarkus.consumer.queue.error.message.max-delivery-attempts]]`link:#quarkus-solace_quarkus.consumer.queue.error.message.max-delivery-attempts[consumer.queue.error.message.max-delivery-attempts]` +a| [[quarkus-solace_quarkus.consumer.error.message.max-delivery-attempts]]`link:#quarkus-solace_quarkus.consumer.error.message.max-delivery-attempts[consumer.error.message.max-delivery-attempts]` [.description] @@ -246,7 +295,7 @@ Maximum number of attempts to send a failed message to the error topic in case o // ifndef::add-copy-button-to-env-var[] // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] ---|`int` +--|int | `3` a| [[quarkus-solace_quarkus.consumer.queue.supports-nacks]]`link:#quarkus-solace_quarkus.consumer.queue.supports-nacks[consumer.queue.supports-nacks]` @@ -262,7 +311,7 @@ Whether to enable negative acknowledgments on failed messages. Nacks are support // ifndef::add-copy-button-to-env-var[] // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] ---|`boolean` +--|boolean | `false` |=== \ No newline at end of file diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc index 73ccbed..4590238 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc @@ -9,6 +9,24 @@ h|[[quarkus-solace_configuration_outgoing]]link:#quarkus-solace_configuration_ou h|Type h|Default +a| [[quarkus-solace_quarkus.client.type.direct.waitForPublishReceipt.timeout]]`link:#quarkus-solace_quarkus.client.type.direct.waitForPublishReceipt.timeout[client.type.direct.waitForPublishReceipt.timeout]` + + +[.description] +-- +In case of direct messaging broker will not send any acknowledgement for published messages. + +We can configure a timeout in milliseconds to check for any publish failures. If no failed event is received during this timeout published message is assumed to be successful. + +// ifdef::add-copy-button-to-env-var[] +// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_METRICS_ENABLED+++[] +// endif::add-copy-button-to-env-var[] +// ifndef::add-copy-button-to-env-var[] +// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++` +// endif::add-copy-button-to-env-var[] +--|long +| `30` + a| [[quarkus-solace_quarkus.producer.topic]]`link:#quarkus-solace_quarkus.producer.topic[producer.topic]` @@ -48,7 +66,9 @@ a| [[quarkus-solace_quarkus.producer.waitForPublishReceipt]]`link:#quarkus-solac [.description] -- -Whether the client waits to receive the publish receipt from Solace broker before acknowledging the message. +Whether the client waits to receive publish receipt from Solace broker before sending acknowledgment. This property is considered only when `client.type` is `persistent`. + +In case of `client.type` is `direct` publish receipt is not sent by broker and extension returns success acknowledgment by default. However, the extension will log if there is any failure when publishing the event. // ifdef::add-copy-button-to-env-var[] // Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE_DEVSERVICES_ENABLED+++[] diff --git a/docs/modules/ROOT/pages/index.adoc b/docs/modules/ROOT/pages/index.adoc index f224e78..7c07791 100644 --- a/docs/modules/ROOT/pages/index.adoc +++ b/docs/modules/ROOT/pages/index.adoc @@ -170,8 +170,6 @@ quarkus.solace.authentication.basic.password=test In similar way other authentication mechanisms can be enabled -CAUTION: In the current version we don't recommend to use OAuth as it in evolving phase. - [[configuring-quarkus-solace-messaging-connector]] == Configuring Quarkus Solace Messaging Connector @@ -184,7 +182,9 @@ Reactive Messaging framework supports different messaging backends it employs a * Channels are connected to message backends using connectors. Connectors are configured to map incoming messages to a specific channel (consumed by the application) and collect outgoing messages sent to a specific channel. Each connector is dedicated to a specific messaging technology. For example, the connector dealing with Solace is named `quarkus-solace`. -A minimal configuration for the Solace connector with an incoming channel looks like the following: +The extension supports two types of messaging client `direct` and `persistent`. By default `persistent` client is enabled. + +A minimal configuration for the Solace connector with an incoming channel and `persistent` client looks like the following: The following lines of configuration assumes that a exclusive queue is already provisioned on the broker [source,properties] @@ -210,7 +210,21 @@ quarkus.solace.authentication.basic.password=basic mp.messaging.incoming.temperatures.connector=quarkus-solace mp.messaging.incoming.temperatures.consumer.queue.missing-resource-creation-strategy=create-on-start mp.messaging.incoming.temperatures.consumer.queue.add-additional-subscriptions=true -mp.messaging.incoming.temperatures.consumer.queue.subscriptions=hello/foobar +mp.messaging.incoming.temperatures.consumer.subscriptions=hello/foobar +---- + +A minimal configuration for the Solace connector with an incoming channel and `direct` client looks like the following: + +[source,properties] +---- +quarkus.solace.host=tcp://localhost:55555 +quarkus.solace.vpn=default +quarkus.solace.authentication.basic.username=basic +quarkus.solace.authentication.basic.password=basic + +mp.messaging.incoming.temperatures.client.type=direct +mp.messaging.incoming.temperatures.connector=quarkus-solace +mp.messaging.incoming.temperatures.consumer.subscriptions=sensor/temperatures ---- 1. When running in dev mode or tests dev services will automatically start a Solace PubSub+ broker and if broker configuration details are not provided the extension automatically picks up the details of broker started by dev services. @@ -257,7 +271,8 @@ __SolaceInboundMessage__ This is a wrapper to incoming Inbound Message from Sola [source,java] ---- -import com.solace.messaging.receiver.InboundMessage;@ApplicationScoped +import com.solace.messaging.receiver.InboundMessage; +@ApplicationScoped public class TemperaturesConsumer { @Incoming("temperatures") public void consume(InboundMessage inboundMessage) { @@ -270,7 +285,9 @@ public class TemperaturesConsumer { [[acknowledgement-handling]] == Acknowledgment Handling -By default, acknowledgement strategy is set to client acknowledgement. This gives greater control over acknowledgement and ensures that messages are acknowledged only after successful processing. +By default, for `persistent` client acknowledgement strategy is set to client acknowledgement. This gives greater control over acknowledgement and ensures that messages are acknowledged only after successful processing. + +In case of `direct` client no acknowledgement is sent to the broker. [source,java] ---- @@ -291,7 +308,7 @@ public class TemperaturesConsumer { [[failure-strategies]] == Failure Strategies -If a message is nacked, a failure strategy is applied. Refer to <><>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension. +If a message is nacked, a failure strategy is applied. Refer to <><>. The default strategy is set to `ignore` and move on to next message. Following are the strategies supported by Quarkus Solace Messaging Connector extension. `ignore` - Mark the message as IGNORED, will continue processing with next message. It TTL and DMQ are configured on the queue message will be moved to DMQ once TTL is reached. If no DMQ is configured but TTL is set message will be lost. @@ -304,7 +321,7 @@ If a message is nacked, a failure strategy is applied. Refer to <> is enabled. + +This property is considered only when <> is set to `persistent`. The connector will wait for response from broker and will return success or failed acknowledgement. + +In case of <> is set to `direct` this property is ignored as broker will not send any response. By default, success acknowledgement is returned and any failures during publish are logged as exceptions. + == Producer Back-Pressure strategies Quarkus Solace Messaging connector provides three different strategies to handle back-pressure when publishing messages diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java index 289408f..449b90c 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java @@ -19,7 +19,9 @@ import org.eclipse.microprofile.reactive.messaging.spi.Connector; import com.solace.messaging.MessagingService; +import com.solace.quarkus.messaging.incoming.SolaceDirectMessageIncomingChannel; import com.solace.quarkus.messaging.incoming.SolaceIncomingChannel; +import com.solace.quarkus.messaging.outgoing.SolaceDirectMessageOutgoingChannel; import com.solace.quarkus.messaging.outgoing.SolaceOutgoingChannel; import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; @@ -34,25 +36,28 @@ @Connector(SolaceConnector.CONNECTOR_NAME) // TODO only persisted is implemented -//@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted") +@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persistent") @ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false") @ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true") @ConnectorAttribute(name = "client.tracing-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to enable tracing for incoming and outgoing messages", defaultValue = "false") @ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000") + +@ConnectorAttribute(name = "client.type.direct.back-pressure.strategy", type = "string", direction = INCOMING, description = "It is possible for the client application to consume messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "elastic") +@ConnectorAttribute(name = "client.type.direct.back-pressure.buffer-capacity", type = "int", direction = INCOMING, description = "It is possible for the client application to consume messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "1024") @ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver.") @ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver. Supported values `durable-exclusive`, `durable-non-exclusive`, `non-durable-exclusive`", defaultValue = "durable-exclusive") @ConnectorAttribute(name = "consumer.queue.missing-resource-creation-strategy", type = "string", direction = INCOMING, description = "Missing resource creation strategy", defaultValue = "do-not-create") @ConnectorAttribute(name = "consumer.queue.add-additional-subscriptions", type = "boolean", direction = INCOMING, description = "Whether to add configured subscriptions to queue. Will fail if permissions to configure subscriptions is not allowed on broker", defaultValue = "false") -@ConnectorAttribute(name = "consumer.queue.subscriptions", type = "string", direction = INCOMING, description = "The comma separated list of subscriptions, the channel name if empty") +@ConnectorAttribute(name = "consumer.subscriptions", type = "string", direction = INCOMING, description = "The comma separated list of subscriptions, the channel name if empty") @ConnectorAttribute(name = "consumer.queue.selector-query", type = "string", direction = INCOMING, description = "The receiver selector query") @ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy. Supported values all-messages, time-based, replication-group-message-id") @ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time") @ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id") -@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore") -@ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error") -@ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false") -@ConnectorAttribute(name = "consumer.queue.error.message.ttl", type = "long", direction = INCOMING, description = "TTL for Error message before moving to dead message queue.") -@ConnectorAttribute(name = "consumer.queue.error.message.max-delivery-attempts", type = "int", direction = INCOMING, description = "Maximum number of attempts to send a failed message to the error topic in case of failure. Each attempt will have a backoff interval of 1 second. When all delivery attempts have been exhausted, the failed message will be requeued on the queue for redelivery.", defaultValue = "3") +@ConnectorAttribute(name = "consumer.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore") +@ConnectorAttribute(name = "consumer.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error") +@ConnectorAttribute(name = "consumer.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false") +@ConnectorAttribute(name = "consumer.error.message.ttl", type = "long", direction = INCOMING, description = "TTL for Error message before moving to dead message queue.") +@ConnectorAttribute(name = "consumer.error.message.max-delivery-attempts", type = "int", direction = INCOMING, description = "Maximum number of attempts to send a failed message to the error topic in case of failure. Each attempt will have a backoff interval of 1 second. When all delivery attempts have been exhausted, the failed message will be requeued on the queue for redelivery.", defaultValue = "3") @ConnectorAttribute(name = "consumer.queue.supports-nacks", type = "boolean", direction = INCOMING, description = "Whether to enable negative acknowledgments on failed messages. Nacks are supported on event brokers 10.2.1 and later. If an event broker does not support Nacks, an exception is thrown", defaultValue = "false") @ConnectorAttribute(name = "producer.topic", type = "string", direction = OUTGOING, description = "The topic to publish messages, by default the channel name") @@ -75,12 +80,16 @@ public class SolaceConnector implements InboundConnector, OutboundConnector, Hea Vertx vertx; List incomingChannels = new CopyOnWriteArrayList<>(); + List directMessageIncomingChannels = new CopyOnWriteArrayList<>(); List outgoingChannels = new CopyOnWriteArrayList<>(); + List directMessageOutgoingChannels = new CopyOnWriteArrayList<>(); public void terminate( @Observes(notifyObserver = Reception.IF_EXISTS) @Priority(50) @BeforeDestroyed(ApplicationScoped.class) Object event) { incomingChannels.forEach(SolaceIncomingChannel::close); + directMessageIncomingChannels.forEach(SolaceDirectMessageIncomingChannel::close); outgoingChannels.forEach(SolaceOutgoingChannel::close); + directMessageOutgoingChannels.forEach(SolaceDirectMessageOutgoingChannel::close); } @PostConstruct @@ -91,17 +100,29 @@ void init() { @Override public Flow.Publisher> getPublisher(Config config) { var ic = new SolaceConnectorIncomingConfiguration(config); - SolaceIncomingChannel channel = new SolaceIncomingChannel(vertx, ic, solace); - incomingChannels.add(channel); - return channel.getStream(); + if (ic.getClientType().equals("direct")) { + SolaceDirectMessageIncomingChannel channel = new SolaceDirectMessageIncomingChannel(vertx, ic, solace); + directMessageIncomingChannels.add(channel); + return channel.getStream(); + } else { + SolaceIncomingChannel channel = new SolaceIncomingChannel(vertx, ic, solace); + incomingChannels.add(channel); + return channel.getStream(); + } } @Override public Flow.Subscriber> getSubscriber(Config config) { var oc = new SolaceConnectorOutgoingConfiguration(config); - SolaceOutgoingChannel channel = new SolaceOutgoingChannel(vertx, oc, solace); - outgoingChannels.add(channel); - return channel.getSubscriber(); + if (oc.getClientType().equals("direct")) { + SolaceDirectMessageOutgoingChannel channel = new SolaceDirectMessageOutgoingChannel(vertx, oc, solace); + directMessageOutgoingChannels.add(channel); + return channel.getSubscriber(); + } else { + SolaceOutgoingChannel channel = new SolaceOutgoingChannel(vertx, oc, solace); + outgoingChannels.add(channel); + return channel.getSubscriber(); + } } @Override @@ -110,9 +131,15 @@ public HealthReport getStartup() { for (SolaceIncomingChannel in : incomingChannels) { in.isStarted(builder); } + for (SolaceDirectMessageIncomingChannel in : directMessageIncomingChannels) { + in.isStarted(builder); + } for (SolaceOutgoingChannel sink : outgoingChannels) { sink.isStarted(builder); } + for (SolaceDirectMessageOutgoingChannel sink : directMessageOutgoingChannels) { + sink.isStarted(builder); + } return builder.build(); } @@ -122,9 +149,15 @@ public HealthReport getReadiness() { for (SolaceIncomingChannel in : incomingChannels) { in.isReady(builder); } + for (SolaceDirectMessageIncomingChannel in : directMessageIncomingChannels) { + in.isReady(builder); + } for (SolaceOutgoingChannel sink : outgoingChannels) { sink.isReady(builder); } + for (SolaceDirectMessageOutgoingChannel sink : directMessageOutgoingChannels) { + sink.isReady(builder); + } return builder.build(); } @@ -135,9 +168,15 @@ public HealthReport getLiveness() { for (SolaceIncomingChannel in : incomingChannels) { in.isAlive(builder); } + for (SolaceDirectMessageIncomingChannel in : directMessageIncomingChannels) { + in.isAlive(builder); + } for (SolaceOutgoingChannel out : outgoingChannels) { out.isAlive(builder); } + for (SolaceDirectMessageOutgoingChannel out : directMessageOutgoingChannels) { + out.isAlive(builder); + } return builder.build(); } } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/fault/SolaceErrorTopic.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/fault/SolaceErrorTopic.java index 76352b4..99f34e5 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/fault/SolaceErrorTopic.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/fault/SolaceErrorTopic.java @@ -14,35 +14,22 @@ public class SolaceErrorTopic implements SolaceFailureHandler { private final String channel; private final AcknowledgementSupport ackSupport; - private final MessagingService solace; private final SolaceErrorTopicPublisherHandler solaceErrorTopicPublisherHandler; - private long maxDeliveryAttempts; - private String errorTopic; - private boolean dmqEligible; - private Long timeToLive; + private final long maxDeliveryAttempts; + private final String errorTopic; + private final boolean dmqEligible; + private final Long timeToLive; - public SolaceErrorTopic(String channel, AcknowledgementSupport ackSupport, MessagingService solace) { + public SolaceErrorTopic(String channel, String errorTopic, boolean dmqEligible, Long timeToLive, long maxDeliveryAttempts, + AcknowledgementSupport ackSupport, MessagingService solace) { this.channel = channel; - this.ackSupport = ackSupport; - this.solace = solace; - this.solaceErrorTopicPublisherHandler = new SolaceErrorTopicPublisherHandler(solace); - } - - public void setMaxDeliveryAttempts(long maxDeliveryAttempts) { - this.maxDeliveryAttempts = maxDeliveryAttempts; - } - - public void setErrorTopic(String errorTopic) { this.errorTopic = errorTopic; - } - - public void setDmqEligible(boolean dmqEligible) { this.dmqEligible = dmqEligible; - } - - public void setTimeToLive(Long timeToLive) { this.timeToLive = timeToLive; + this.maxDeliveryAttempts = maxDeliveryAttempts; + this.ackSupport = ackSupport; + this.solaceErrorTopicPublisherHandler = new SolaceErrorTopicPublisherHandler(solace); } @Override @@ -54,7 +41,9 @@ public CompletionStage handle(SolaceInboundMessage msg, Throwable reaso SolaceLogging.log.messageSettled(channel, MessageAcknowledgementConfiguration.Outcome.ACCEPTED.toString().toLowerCase(), "Message is published to error topic and acknowledged on queue."); - ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED); + if (ackSupport != null) { + ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED); + } }) .replaceWithVoid() .onFailure().invoke(t -> SolaceLogging.log.unsuccessfulToTopic(errorTopic, channel, t)) diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/fault/SolaceFail.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/fault/SolaceFail.java index b0769e6..5615d1c 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/fault/SolaceFail.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/fault/SolaceFail.java @@ -34,7 +34,11 @@ public CompletionStage handle(SolaceInboundMessage msg, Throwable reaso SolaceLogging.log.messageSettled(channel, outcome.toString().toLowerCase(), reason.getMessage()); return Uni.createFrom().voidItem() - .invoke(() -> ackSupport.settle(msg.getMessage(), outcome)) + .invoke(() -> { + if (ackSupport != null) { + ackSupport.settle(msg.getMessage(), outcome); + } + }) .runSubscriptionOn(msg::runOnMessageContext) .subscribeAsCompletionStage(); } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceDirectMessageIncomingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceDirectMessageIncomingChannel.java new file mode 100644 index 0000000..2e2df71 --- /dev/null +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceDirectMessageIncomingChannel.java @@ -0,0 +1,241 @@ +package com.solace.quarkus.messaging.incoming; + +import static com.solace.quarkus.messaging.i18n.SolaceExceptions.ex; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import com.solace.messaging.DirectMessageReceiverBuilder; +import com.solace.messaging.MessagingService; +import com.solace.messaging.config.*; +import com.solace.messaging.receiver.DirectMessageReceiver; +import com.solace.messaging.receiver.InboundMessage; +import com.solace.messaging.resources.TopicSubscription; +import com.solace.quarkus.messaging.SolaceConnectorIncomingConfiguration; +import com.solace.quarkus.messaging.fault.*; +import com.solace.quarkus.messaging.i18n.SolaceLogging; +import com.solace.quarkus.messaging.tracing.SolaceOpenTelemetryInstrumenter; +import com.solace.quarkus.messaging.tracing.SolaceTrace; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.vertx.core.impl.VertxInternal; +import io.vertx.mutiny.core.Context; +import io.vertx.mutiny.core.Vertx; + +public class SolaceDirectMessageIncomingChannel + implements ReceiverActivationPassivationConfiguration.ReceiverStateChangeListener { + private final String channel; + private final Context context; + private final SolaceAckHandler ackHandler; + private final SolaceFailureHandler failureHandler; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicBoolean alive = new AtomicBoolean(true); + private final DirectMessageReceiver receiver; + private final Flow.Publisher> stream; + private final ExecutorService pollerThread; + private final boolean gracefulShutdown; + private final long gracefulShutdownWaitTimeout; + private final List failures = new ArrayList<>(); + private final SolaceOpenTelemetryInstrumenter solaceOpenTelemetryInstrumenter; + private final MessagingService solace; + + // Assuming we won't ever exceed the limit of an unsigned long... + private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier(); + + public SolaceDirectMessageIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) { + this.solace = solace; + this.channel = ic.getChannel(); + this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); + this.gracefulShutdown = ic.getClientGracefulShutdown(); + this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout(); + MessageAcknowledgementConfiguration.Outcome[] outcomes = new MessageAcknowledgementConfiguration.Outcome[] { + MessageAcknowledgementConfiguration.Outcome.ACCEPTED }; + + DirectMessageReceiverBuilder builder = solace.createDirectMessageReceiverBuilder(); + String subscriptions = ic.getConsumerSubscriptions().orElse(this.channel); + builder.withSubscriptions(Arrays.stream(subscriptions.split(",")) + .map(TopicSubscription::of) + .toArray(TopicSubscription[]::new)); + + switch (ic.getClientTypeDirectBackPressureStrategy()) { + case "oldest": + builder.onBackPressureDropLatest(ic.getClientTypeDirectBackPressureBufferCapacity()); + break; + case "latest": + builder.onBackPressureDropOldest(ic.getClientTypeDirectBackPressureBufferCapacity()); + break; + default: + builder.onBackPressureElastic(); + break; + } + + this.receiver = builder.build(); + + boolean lazyStart = ic.getClientLazyStart(); + this.ackHandler = null; + this.failureHandler = createFailureHandler(ic, solace); + + // TODO Here use a subscription receiver.receiveAsync with an internal queue + this.pollerThread = Executors.newSingleThreadExecutor(); + + Multi> incomingMulti = Multi.createBy().repeating() + .uni(() -> Uni.createFrom().item(this.receiver::receiveMessage) + .runSubscriptionOn(pollerThread)) + .until(__ -> closed.get()) + .emitOn(context::runOnContext) + .map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler, + unacknowledgedMessageTracker, this::reportFailure)); + + if (ic.getClientTracingEnabled()) { + solaceOpenTelemetryInstrumenter = SolaceOpenTelemetryInstrumenter.createForIncoming(); + incomingMulti = incomingMulti.map(message -> { + InboundMessage consumedMessage = message.getMetadata(SolaceInboundMetadata.class).get().getMessage(); + Map messageProperties = new HashMap<>(); + + messageProperties.put("messaging.solace.replication_group_message_id", + consumedMessage.getReplicationGroupMessageId().toString()); + messageProperties.put("messaging.solace.priority", Integer.toString(consumedMessage.getPriority())); + if (consumedMessage.getProperties().size() > 0) { + messageProperties.putAll(consumedMessage.getProperties()); + } + SolaceTrace solaceTrace = new SolaceTrace.Builder() + .withDestinationKind("queue") + .withTopic(consumedMessage.getDestinationName()) + .withMessageID(consumedMessage.getApplicationMessageId()) + .withCorrelationID(consumedMessage.getCorrelationId()) + .withPartitionKey( + consumedMessage + .hasProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY) + ? consumedMessage + .getProperty( + SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY) + : null) + .withPayloadSize(Long.valueOf(consumedMessage.getPayloadAsBytes().length)) + .withProperties(messageProperties) + .build(); + return solaceOpenTelemetryInstrumenter.traceIncoming(message, solaceTrace, true); + }); + } else { + solaceOpenTelemetryInstrumenter = null; + } + + this.stream = incomingMulti.plug(m -> lazyStart + ? m.onSubscription() + .call(() -> Uni.createFrom().completionStage(this.receiver.startAsync())) + : m) + .onItem().invoke(() -> alive.set(true)) + .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(this::reportFailure); + + if (!lazyStart) { + this.receiver.start(); + } + + } + + private synchronized void reportFailure(Throwable throwable) { + alive.set(false); + // Don't keep all the failures, there are only there for reporting. + if (failures.size() == 10) { + failures.remove(0); + } + failures.add(throwable); + } + + private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) { + String strategy = ic.getConsumerFailureStrategy(); + SolaceFailureHandler.Strategy actualStrategy = SolaceFailureHandler.Strategy.from(strategy); + switch (actualStrategy) { + case IGNORE: + return new SolaceIgnoreFailure(ic.getChannel()); + case ERROR_TOPIC: + if (ic.getConsumerErrorTopic().isEmpty()) { + throw ex.illegalArgumentInvalidFailureStrategy(strategy); + } + return new SolaceErrorTopic(ic.getChannel(), ic.getConsumerErrorTopic().get(), + ic.getConsumerErrorMessageDmqEligible(), ic.getConsumerErrorMessageTtl().orElse(null), + ic.getConsumerErrorMessageMaxDeliveryAttempts(), null, solace); + default: + throw ex.illegalArgumentInvalidFailureStrategy( + "Direct Consumer supports 'ignore' and 'error_topic' failure strategies. Please check your configured failure strategy :: " + + strategy); + } + + } + + public Flow.Publisher> getStream() { + return this.stream; + } + + public void waitForUnAcknowledgedMessages() { + try { + receiver.terminate(3000); + SolaceLogging.log.infof("Waiting for incoming channel %s messages to be acknowledged", channel); + if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { + SolaceLogging.log.infof("Timed out while waiting for the" + + " remaining messages to be acknowledged on channel %s.", channel); + } + } catch (InterruptedException e) { + SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel); + throw new RuntimeException(e); + } + } + + public void close() { + if (this.gracefulShutdown) { + waitForUnAcknowledgedMessages(); + } + closed.compareAndSet(false, true); + if (this.pollerThread != null) { + if (this.gracefulShutdown) { + this.pollerThread.shutdown(); + try { + this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + SolaceLogging.log.shutdownException(e.getMessage()); + throw new RuntimeException(e); + } + } else { + this.pollerThread.shutdownNow(); + } + } + if (receiver.isRunning()) { + receiver.terminate(3000); + } + } + + public void isStarted(HealthReport.HealthReportBuilder builder) { + builder.add(channel, solace.isConnected()); + } + + public void isReady(HealthReport.HealthReportBuilder builder) { + builder.add(channel, solace.isConnected() && receiver != null && receiver.isRunning()); + } + + public void isAlive(HealthReport.HealthReportBuilder builder) { + List reportedFailures; + if (!failures.isEmpty()) { + synchronized (this) { + reportedFailures = new ArrayList<>(failures); + } + builder.add(channel, solace.isConnected() && alive.get(), + reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining())); + } else { + builder.add(channel, solace.isConnected() && alive.get()); + } + } + + @Override + public void onStateChange(ReceiverState receiverState, ReceiverState receiverState1, long l) { + + } +} diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMessage.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMessage.java index 1bb277e..e01d694 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMessage.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMessage.java @@ -14,6 +14,7 @@ import com.solace.quarkus.messaging.i18n.SolaceLogging; import io.netty.handler.codec.http.HttpHeaderValues; +import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; import io.vertx.core.buffer.Buffer; @@ -97,7 +98,11 @@ public Function> getNack() { @Override public CompletionStage ack() { this.unacknowledgedMessageTracker.decrement(); - return ackHandler.handle(this); + if (this.ackHandler != null) { + return ackHandler.handle(this); + } + + return Uni.createFrom().voidItem().subscribeAsCompletionStage(); } @Override diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java index 7f0c685..5090c5d 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java @@ -88,7 +88,7 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i } }); if (ic.getConsumerQueueAddAdditionalSubscriptions()) { - String subscriptions = ic.getConsumerQueueSubscriptions().orElse(this.channel); + String subscriptions = ic.getConsumerSubscriptions().orElse(this.channel); builder.withSubscriptions(Arrays.stream(subscriptions.split(",")) .map(TopicSubscription::of) .toArray(TopicSubscription[]::new)); @@ -127,7 +127,7 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i messageProperties.put("messaging.solace.replication_group_message_id", consumedMessage.getReplicationGroupMessageId().toString()); messageProperties.put("messaging.solace.priority", Integer.toString(consumedMessage.getPriority())); - if (consumedMessage.getProperties().size() > 0) { + if (!consumedMessage.getProperties().isEmpty()) { messageProperties.putAll(consumedMessage.getProperties()); } SolaceTrace solaceTrace = new SolaceTrace.Builder() @@ -142,7 +142,7 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i .getProperty( SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY) : null) - .withPayloadSize(Long.valueOf(consumedMessage.getPayloadAsBytes().length)) + .withPayloadSize((long) consumedMessage.getPayloadAsBytes().length) .withProperties(messageProperties) .build(); return solaceOpenTelemetryInstrumenter.traceIncoming(message, solaceTrace, true); @@ -173,7 +173,7 @@ private synchronized void reportFailure(Throwable throwable) { } private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) { - String strategy = ic.getConsumerQueueFailureStrategy(); + String strategy = ic.getConsumerFailureStrategy(); SolaceFailureHandler.Strategy actualStrategy = SolaceFailureHandler.Strategy.from(strategy); switch (actualStrategy) { case IGNORE: @@ -183,15 +183,12 @@ private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfigu case DISCARD: return new SolaceDiscard(ic.getChannel(), receiver); case ERROR_TOPIC: - SolaceErrorTopic solaceErrorTopic = new SolaceErrorTopic(ic.getChannel(), receiver, solace); - if (ic.getConsumerQueueErrorTopic().isEmpty()) { + if (ic.getConsumerErrorTopic().isEmpty()) { throw ex.illegalArgumentInvalidFailureStrategy(strategy); } - solaceErrorTopic.setErrorTopic(ic.getConsumerQueueErrorTopic().get()); - solaceErrorTopic.setDmqEligible(ic.getConsumerQueueErrorMessageDmqEligible().booleanValue()); - solaceErrorTopic.setTimeToLive(ic.getConsumerQueueErrorMessageTtl().orElse(null)); - solaceErrorTopic.setMaxDeliveryAttempts(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts()); - return solaceErrorTopic; + return new SolaceErrorTopic(ic.getChannel(), ic.getConsumerErrorTopic().get(), + ic.getConsumerErrorMessageDmqEligible(), ic.getConsumerErrorMessageTtl().orElse(null), + ic.getConsumerErrorMessageMaxDeliveryAttempts(), receiver, solace); default: throw ex.illegalArgumentInvalidFailureStrategy(strategy); } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceDirectMessageOutgoingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceDirectMessageOutgoingChannel.java new file mode 100644 index 0000000..df25e77 --- /dev/null +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceDirectMessageOutgoingChannel.java @@ -0,0 +1,267 @@ +package com.solace.quarkus.messaging.outgoing; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import com.solace.messaging.DirectMessagePublisherBuilder; +import com.solace.messaging.MessagingService; +import com.solace.messaging.PubSubPlusClientException; +import com.solace.messaging.config.SolaceConstants; +import com.solace.messaging.config.SolaceProperties; +import com.solace.messaging.publisher.*; +import com.solace.messaging.resources.Topic; +import com.solace.quarkus.messaging.SolaceConnectorOutgoingConfiguration; +import com.solace.quarkus.messaging.i18n.SolaceLogging; +import com.solace.quarkus.messaging.tracing.SolaceOpenTelemetryInstrumenter; +import com.solace.quarkus.messaging.tracing.SolaceTrace; + +import io.netty.handler.codec.http.HttpHeaderValues; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; +import io.vertx.core.json.Json; +import io.vertx.mutiny.core.Vertx; + +public class SolaceDirectMessageOutgoingChannel + implements PublisherHealthCheck.PublisherReadinessListener { + + private final DirectMessagePublisher publisher; + private final String channel; + private final Flow.Subscriber> subscriber; + private final Topic topic; + private final SenderProcessor processor; + private final boolean gracefulShutdown; + private final long gracefulShutdownWaitTimeout; + private final AtomicBoolean alive = new AtomicBoolean(true); + private final List failures = new ArrayList<>(); + private final SolaceOpenTelemetryInstrumenter solaceOpenTelemetryInstrumenter; + private final MessagingService solace; + private volatile boolean isPublisherReady = true; + // Assuming we won't ever exceed the limit of an unsigned long... + private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); + + public SolaceDirectMessageOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration oc, MessagingService solace) { + this.solace = solace; + this.channel = oc.getChannel(); + DirectMessagePublisherBuilder builder = solace.createDirectMessagePublisherBuilder(); + switch (oc.getProducerBackPressureStrategy()) { + case "wait": + builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity()); + break; + case "reject": + builder.onBackPressureReject(oc.getProducerBackPressureBufferCapacity()); + break; + default: + builder.onBackPressureElastic(); + break; + } + this.gracefulShutdown = oc.getClientGracefulShutdown(); + this.gracefulShutdownWaitTimeout = oc.getClientGracefulShutdownWaitTimeout(); + this.publisher = builder.build(); + boolean lazyStart = oc.getClientLazyStart(); + this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel)); + if (oc.getClientTracingEnabled()) { + solaceOpenTelemetryInstrumenter = SolaceOpenTelemetryInstrumenter.createForOutgoing(); + } else { + solaceOpenTelemetryInstrumenter = null; + } + this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(), + m -> sendMessage(solace, m, oc.getClientTracingEnabled()).onFailure() + .invoke(this::reportFailure)); + this.subscriber = MultiUtils.via(processor, multi -> multi.plug( + m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m)); + if (!lazyStart) { + this.publisher.start(); + } + + this.publisher.setPublisherReadinessListener(() -> isPublisherReady = true); + this.publisher.setPublishFailureListener(failedPublishEvent -> { + SolaceLogging.log.error("Failed to publish direct message"); + reportFailure(failedPublishEvent.getException()); + }); + } + + private Uni sendMessage(MessagingService solace, Message m, boolean isTracingEnabled) { + + // TODO - Use isPublisherReady to check if publisher is in ready state before publishing. This is required when back-pressure is set to reject. We need to block this call till isPublisherReady is true + return publishMessage(publisher, m, solace.messageBuilder(), isTracingEnabled) + .onItem().transformToUni(receipt -> { + alive.set(true); + return Uni.createFrom().completionStage(m.getAck()); + }) + .onFailure().recoverWithUni(t -> { + reportFailure(t); + return Uni.createFrom().completionStage(m.nack(t)); + }); + } + + private synchronized void reportFailure(Throwable throwable) { + alive.set(false); + // Don't keep all the failures, there are only there for reporting. + if (failures.size() == 10) { + failures.remove(0); + } + failures.add(throwable); + } + + private Uni publishMessage(DirectMessagePublisher publisher, Message m, + OutboundMessageBuilder msgBuilder, boolean isTracingEnabled) { + publishedMessagesTracker.increment(); + AtomicReference topic = new AtomicReference<>(this.topic); + OutboundMessage outboundMessage; + m.getMetadata(SolaceOutboundMetadata.class).ifPresent(metadata -> { + if (metadata.getHttpContentHeaders() != null && !metadata.getHttpContentHeaders().isEmpty()) { + metadata.getHttpContentHeaders().forEach(msgBuilder::withHTTPContentHeader); + } + if (metadata.getProperties() != null && !metadata.getProperties().isEmpty()) { + metadata.getProperties().forEach(msgBuilder::withProperty); + } + if (metadata.getExpiration() != null) { + msgBuilder.withExpiration(metadata.getExpiration()); + } + if (metadata.getPriority() != null) { + msgBuilder.withPriority(metadata.getPriority()); + } + if (metadata.getSenderId() != null) { + msgBuilder.withSenderId(metadata.getSenderId()); + } + if (metadata.getApplicationMessageType() != null) { + msgBuilder.withApplicationMessageType(metadata.getApplicationMessageType()); + } + if (metadata.getTimeToLive() != null) { + msgBuilder.withTimeToLive(metadata.getTimeToLive()); + } + if (metadata.getApplicationMessageId() != null) { + msgBuilder.withApplicationMessageId(metadata.getApplicationMessageId()); + } + if (metadata.getClassOfService() != null) { + msgBuilder.withClassOfService(metadata.getClassOfService()); + } + if (metadata.getPartitionKey() != null) { + msgBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, + metadata.getPartitionKey()); + } + if (metadata.getCorrelationId() != null) { + msgBuilder.withProperty(SolaceProperties.MessageProperties.CORRELATION_ID, metadata.getCorrelationId()); + } + + if (metadata.getDynamicDestination() != null) { + topic.set(Topic.of(metadata.getDynamicDestination())); + } + }); + + Object payload = m.getPayload(); + if (payload instanceof OutboundMessage) { + outboundMessage = (OutboundMessage) payload; + } else if (payload instanceof String) { + outboundMessage = msgBuilder + .withHTTPContentHeader(HttpHeaderValues.TEXT_PLAIN.toString(), "") + .build((String) payload); + } else if (payload instanceof byte[]) { + outboundMessage = msgBuilder.build((byte[]) payload); + } else { + outboundMessage = msgBuilder + .withHTTPContentHeader(HttpHeaderValues.APPLICATION_JSON.toString(), "") + .build(Json.encode(payload)); + } + + if (isTracingEnabled) { + SolaceTrace solaceTrace = new SolaceTrace.Builder() + .withDestinationKind("topic") + .withTopic(topic.get().getName()) + .withMessageID(outboundMessage.getApplicationMessageId()) + .withCorrelationID(outboundMessage.getCorrelationId()) + .withPartitionKey( + outboundMessage + .hasProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY) + ? outboundMessage + .getProperty( + SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY) + : null) + .withPayloadSize(Long.valueOf(outboundMessage.getPayloadAsBytes().length)) + .withProperties(outboundMessage.getProperties()).build(); + solaceOpenTelemetryInstrumenter.traceOutgoing(m, solaceTrace); + } + + return Uni.createFrom(). emitter(e -> { + boolean exitExceptionally = false; + try { + if (isPublisherReady) { + publisher.publish(outboundMessage, topic.get()); + publishedMessagesTracker.decrement(); + e.complete(null); + } + } catch (PubSubPlusClientException.PublisherOverflowException publisherOverflowException) { + isPublisherReady = false; + exitExceptionally = true; + e.fail(publisherOverflowException); + } catch (Throwable t) { + e.fail(t); + } finally { + if (exitExceptionally) { + publisher.notifyWhenReady(); + } + } + }).invoke(() -> SolaceLogging.log.successfullyToTopic(channel, topic.get().getName())); + } + + public Flow.Subscriber> getSubscriber() { + return this.subscriber; + } + + public void waitForPublishedMessages() { + try { + SolaceLogging.log.infof("Waiting for outgoing channel %s messages to be published", channel); + if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { + SolaceLogging.log.infof("Timed out while waiting for the" + + " remaining messages to be acknowledged on channel %s.", channel); + } + } catch (InterruptedException e) { + SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel); + throw new RuntimeException(e); + } + } + + public void close() { + if (this.gracefulShutdown) { + waitForPublishedMessages(); + } + if (processor != null) { + processor.cancel(); + } + + publisher.terminate(5000); + } + + public void isStarted(HealthReport.HealthReportBuilder builder) { + builder.add(channel, solace.isConnected()); + } + + public void isReady(HealthReport.HealthReportBuilder builder) { + builder.add(channel, solace.isConnected() && this.publisher != null && this.publisher.isReady()); + } + + public void isAlive(HealthReport.HealthReportBuilder builder) { + List reportedFailures; + if (!failures.isEmpty()) { + synchronized (this) { + reportedFailures = new ArrayList<>(failures); + } + builder.add(channel, solace.isConnected() && alive.get(), + reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining())); + } else { + builder.add(channel, solace.isConnected() && alive.get()); + } + } + + @Override + public void ready() { + isPublisherReady = true; + } +} diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutboundMetadata.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutboundMetadata.java index bd23ef0..80648c6 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutboundMetadata.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutboundMetadata.java @@ -16,6 +16,7 @@ public class SolaceOutboundMetadata { private final String dynamicDestination; private final String partitionKey; + private final String correlationId; public static PubSubOutboundMetadataBuilder builder() { return new PubSubOutboundMetadataBuilder(); @@ -29,7 +30,7 @@ public SolaceOutboundMetadata(Map httpContentHeaders, String applicationMessageType, Long timeToLive, String applicationMessageId, - Integer classOfService, String dynamicDestination, String partitionKey) { + Integer classOfService, String dynamicDestination, String partitionKey, String correlationId) { this.httpContentHeaders = httpContentHeaders; this.expiration = expiration; this.priority = priority; @@ -41,6 +42,7 @@ public SolaceOutboundMetadata(Map httpContentHeaders, this.classOfService = classOfService; this.dynamicDestination = dynamicDestination; this.partitionKey = partitionKey; + this.correlationId = correlationId; } public Map getHttpContentHeaders() { @@ -87,6 +89,10 @@ public String getPartitionKey() { return partitionKey; } + public String getCorrelationId() { + return correlationId; + } + public static class PubSubOutboundMetadataBuilder { private Map httpContentHeaders; private Long expiration; @@ -100,6 +106,7 @@ public static class PubSubOutboundMetadataBuilder { private String dynamicDestination; private String partitionKey; + private String correlationId; public PubSubOutboundMetadataBuilder setHttpContentHeaders(Map httpContentHeader) { this.httpContentHeaders = httpContentHeaders; @@ -156,9 +163,15 @@ public PubSubOutboundMetadataBuilder setPartitionKey(String partitionKey) { return this; } + public PubSubOutboundMetadataBuilder setCorrelationId(String correlationId) { + this.correlationId = correlationId; + return this; + } + public SolaceOutboundMetadata createPubSubOutboundMetadata() { return new SolaceOutboundMetadata(httpContentHeaders, expiration, priority, senderId, properties, - applicationMessageType, timeToLive, applicationMessageId, classOfService, dynamicDestination, partitionKey); + applicationMessageType, timeToLive, applicationMessageId, classOfService, dynamicDestination, partitionKey, + correlationId); } } } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java index 84c0282..f4ecacc 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java @@ -14,6 +14,7 @@ import com.solace.messaging.PersistentMessagePublisherBuilder; import com.solace.messaging.PubSubPlusClientException; import com.solace.messaging.config.SolaceConstants; +import com.solace.messaging.config.SolaceProperties; import com.solace.messaging.publisher.OutboundMessage; import com.solace.messaging.publisher.OutboundMessageBuilder; import com.solace.messaging.publisher.PersistentMessagePublisher; @@ -164,6 +165,9 @@ private Uni publishMessage(PersistentMessagePublisher publisher, msgBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, metadata.getPartitionKey()); } + if (metadata.getCorrelationId() != null) { + msgBuilder.withProperty(SolaceProperties.MessageProperties.CORRELATION_ID, metadata.getCorrelationId()); + } if (metadata.getDynamicDestination() != null) { topic.set(Topic.of(metadata.getDynamicDestination())); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java index d5708b2..ad2b7c0 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java @@ -51,7 +51,7 @@ void consumer() { .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", "quarkus/integration/test/replay/messages"); + .with("mp.messaging.incoming.in.consumer.subscriptions", "quarkus/integration/test/replay/messages"); // Run app that consumes messages MyConsumer app = runApplication(config, MyConsumer.class); @@ -80,7 +80,7 @@ void consumerReplay() { .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", "quarkus/integration/test/replay/messages") + .with("mp.messaging.incoming.in.consumer.subscriptions", "quarkus/integration/test/replay/messages") .with("mp.messaging.incoming.in.consumer.queue.replay.strategy", "all-messages"); // Run app that consumes messages @@ -100,7 +100,7 @@ void consumerWithSelectorQuery() { .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") .with("mp.messaging.incoming.in.consumer.queue.selector-query", "id = '1'") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + .with("mp.messaging.incoming.in.consumer.subscriptions", topic); // Run app that consumes messages MyConsumer app = runApplication(config, MyConsumer.class); @@ -128,10 +128,10 @@ void consumerFailedProcessingPublishToErrorTopic() { .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") - .with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "error_topic") - .with("mp.messaging.incoming.in.consumer.queue.error.topic", + .with("mp.messaging.incoming.in.consumer.failure-strategy", "error_topic") + .with("mp.messaging.incoming.in.consumer.error.topic", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION) - .with("mp.messaging.incoming.in.consumer.queue.error.message.ttl", 1000) + .with("mp.messaging.incoming.in.consumer.error.message.ttl", 1000) .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME) .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive"); @@ -163,7 +163,7 @@ void consumerFailedProcessingMoveToDMQ() { .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") .with("mp.messaging.incoming.in.consumer.queue.supports-nacks", "true") - .with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "discard") + .with("mp.messaging.incoming.in.consumer.failure-strategy", "discard") .with("mp.messaging.incoming.dmq-in.connector", "quarkus-solace") .with("mp.messaging.incoming.dmq-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_DMQ_NAME) .with("mp.messaging.incoming.dmq-in.consumer.queue.type", "durable-exclusive"); @@ -266,10 +266,10 @@ void consumerPublishToErrorTopicPermissionException() { .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") - .with("mp.messaging.incoming.in.consumer.queue.failure-strategy", "error_topic") - .with("mp.messaging.incoming.in.consumer.queue.error.topic", + .with("mp.messaging.incoming.in.consumer.failure-strategy", "error_topic") + .with("mp.messaging.incoming.in.consumer.error.topic", "publish/deny") - .with("mp.messaging.incoming.in.consumer.queue.error.message.max-delivery-attempts", 0) + .with("mp.messaging.incoming.in.consumer.error.message.max-delivery-attempts", 0) .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME) .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive"); @@ -300,7 +300,7 @@ void consumerGracefulCloseTest() { .with("consumer.queue.name", queue) .with("consumer.queue.add-additional-subscriptions", true) .with("consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("consumer.queue.subscriptions", topic); + .with("consumer.subscriptions", topic); // Initialize incoming channel to consumes messages SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(), @@ -346,7 +346,7 @@ void consumerCreateMissingResourceAddSubscriptionPermissionException() { .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + .with("mp.messaging.incoming.in.consumer.subscriptions", topic); Exception exception = assertThrows(Exception.class, () -> { // Run app that consumes messages diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceDirectMessageConsumerTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceDirectMessageConsumerTest.java new file mode 100644 index 0000000..f714cd7 --- /dev/null +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceDirectMessageConsumerTest.java @@ -0,0 +1,292 @@ +package com.solace.quarkus.messaging; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.*; +import java.util.concurrent.*; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.awaitility.Durations; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import com.solace.messaging.config.SolaceConstants; +import com.solace.messaging.publisher.DirectMessagePublisher; +import com.solace.messaging.publisher.OutboundMessage; +import com.solace.messaging.publisher.OutboundMessageBuilder; +import com.solace.messaging.receiver.InboundMessage; +import com.solace.messaging.resources.Topic; +import com.solace.quarkus.messaging.base.SolaceContainer; +import com.solace.quarkus.messaging.base.WeldTestBase; +import com.solace.quarkus.messaging.incoming.SolaceDirectMessageIncomingChannel; +import com.solace.quarkus.messaging.incoming.SolaceInboundMessage; +import com.solace.quarkus.messaging.logging.SolaceTestAppender; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class SolaceDirectMessageConsumerTest extends WeldTestBase { + private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("com.solace.quarkus.messaging"); + private SolaceTestAppender solaceTestAppender = new SolaceTestAppender(); + + private SolaceDirectMessageConsumerTest() { + rootLogger.addAppender(solaceTestAppender); + } + + @Test + @Order(1) + void consumer() { + MapBasedConfig config = commonConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.client.type", "direct") + .with("mp.messaging.incoming.in.consumer.subscriptions", "quarkus/integration/test/replay/messages"); + + // Run app that consumes messages + MyConsumer app = runApplication(config, MyConsumer.class); + + // Produce messages + DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of("quarkus/integration/test/replay/messages"); + publisher.publish("1", tp); + publisher.publish("2", tp); + publisher.publish("3", tp); + publisher.publish("4", tp); + publisher.publish("5", tp); + + // Assert on published messages + await().untilAsserted(() -> assertThat(app.getReceived()).contains("1", "2", "3", "4", "5")); + } + + @Test + @Order(2) + void consumerFailedProcessingPublishToErrorTopic() { + MapBasedConfig config = commonConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.client.type", "direct") + .with("mp.messaging.incoming.in.consumer.subscriptions", + SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION) + .with("mp.messaging.incoming.in.consumer.failure-strategy", "error_topic") + .with("mp.messaging.incoming.in.consumer.error.topic", + SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION) + .with("mp.messaging.incoming.in.consumer.error.message.ttl", 1000) + .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") + .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME) + .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive"); + + // Run app that consumes messages + MyErrorQueueConsumer app = runApplication(config, MyErrorQueueConsumer.class); + + // Produce messages + DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION); + OutboundMessageBuilder messageBuilder = messagingService.messageBuilder(); + OutboundMessage outboundMessage = messageBuilder.build("1"); + publisher.publish(outboundMessage, tp); + + // Assert on published messages + await().untilAsserted(() -> assertThat(app.getReceived().size()).isEqualTo(0)); + await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(1)); + await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages()).contains("1")); + await().pollDelay(Durations.FIVE_SECONDS).until(() -> true); + } + + @Test + @Order(3) + void consumerPublishToErrorTopicPermissionException() { + MapBasedConfig config = commonConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.client.type", "direct") + .with("mp.messaging.incoming.in.consumer.subscriptions", + SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION) + .with("mp.messaging.incoming.in.consumer.failure-strategy", "error_topic") + .with("mp.messaging.incoming.in.consumer.error.topic", + "publish/deny") + .with("mp.messaging.incoming.in.consumer.error.message.max-delivery-attempts", 0) + .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") + .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME) + .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive"); + + // Run app that consumes messages + MyErrorQueueConsumer app = runApplication(config, MyErrorQueueConsumer.class); + // Produce messages + DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION); + OutboundMessageBuilder messageBuilder = messagingService.messageBuilder(); + OutboundMessage outboundMessage = messageBuilder.build("2"); + publisher.publish(outboundMessage, tp); + + await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0)); + // await().untilAsserted(() -> assertThat(inMemoryLogHandler.getRecords().stream().filter(record -> record.getMessage().contains("A exception occurred when publishing to topic")).count()).isEqualTo(4)); + await().untilAsserted(() -> assertThat(solaceTestAppender.getLog().stream() + .anyMatch(record -> record.getMessage().toString().contains("Publishing error message to topic"))) + .isEqualTo(true)); + } + + @Test + @Order(4) + void consumerGracefulCloseTest() { + MapBasedConfig config = new MapBasedConfig() + .with("channel-name", "in") + .with("client.type", "direct") + .with("consumer.subscriptions", topic); + + // Initialize incoming channel to consumes messages + SolaceDirectMessageIncomingChannel solaceIncomingChannel = new SolaceDirectMessageIncomingChannel(Vertx.vertx(), + new SolaceConnectorIncomingConfiguration(config), messagingService); + + CopyOnWriteArrayList list = new CopyOnWriteArrayList<>(); + CopyOnWriteArrayList ackedMessageList = new CopyOnWriteArrayList<>(); + + Flow.Publisher> stream = solaceIncomingChannel.getStream(); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + Multi.createFrom().publisher(stream).subscribe().with(message -> { + list.add(message); + executorService.schedule(() -> { + ackedMessageList.add(message); + CompletableFuture.runAsync(message::ack); + }, 1, TimeUnit.SECONDS); + }); + + // Produce messages + DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + publisher.publish("3", tp); + publisher.publish("4", tp); + publisher.publish("5", tp); + + await().until(() -> list.size() == 5); + // Assert on acknowledged messages + solaceIncomingChannel.close(); + await().atMost(2, TimeUnit.MINUTES).until(() -> ackedMessageList.size() == 5); + executorService.shutdown(); + } + + @ApplicationScoped + static class MyConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("in") + CompletionStage in(SolaceInboundMessage msg) { + received.add(msg.getMessage().getPayloadAsString()); + return msg.ack(); + } + + public List getReceived() { + return received; + } + } + + @ApplicationScoped + static class MyDMQConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + private List receivedDMQMessages = new CopyOnWriteArrayList<>(); + + @Incoming("in") + void in(String msg) { + received.add(msg); + } + + @Incoming("dmq-in") + void dmqin(InboundMessage msg) { + receivedDMQMessages.add(msg.getPayloadAsString()); + } + + public List getReceived() { + return received; + } + + public List getReceivedDMQMessages() { + return receivedDMQMessages; + } + } + + @ApplicationScoped + static class MyErrorQueueConsumer { + private final List received = new CopyOnWriteArrayList<>(); + private List receivedFailedMessages = new CopyOnWriteArrayList<>(); + + @Incoming("in") + void in(String msg) { + received.add(msg); + } + + @Incoming("error-in") + void errorin(InboundMessage msg) { + receivedFailedMessages.add(msg.getPayloadAsString()); + } + + public List getReceived() { + return received; + } + + public List getReceivedFailedMessages() { + return receivedFailedMessages; + } + } + + @ApplicationScoped + static class MyPartitionedQueueConsumer { + Map partitionMessages = new HashMap<>() { + { + put("Group-1", 0); + put("Group-2", 0); + put("Group-3", 0); + put("Group-4", 0); + } + }; + + @Incoming("consumer-1") + CompletionStage consumer1(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + @Incoming("consumer-2") + CompletionStage consumer2(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + @Incoming("consumer-3") + CompletionStage consumer3(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + @Incoming("consumer-4") + CompletionStage consumer4(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + private void updatePartitionMessages(SolaceInboundMessage msg) { + String partitionKey = msg.getMessage() + .getProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY); + int count = partitionMessages.get(partitionKey); + partitionMessages.put(partitionKey, (count + 1)); + } + + public Map getPartitionMessages() { + return partitionMessages; + } + } +} diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceDirectMessagePublisherTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceDirectMessagePublisherTest.java new file mode 100644 index 0000000..42f0485 --- /dev/null +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceDirectMessagePublisherTest.java @@ -0,0 +1,212 @@ +package com.solace.quarkus.messaging; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Flow; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Metadata; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.receiver.DirectMessageReceiver; +import com.solace.messaging.resources.TopicSubscription; +import com.solace.quarkus.messaging.base.WeldTestBase; +import com.solace.quarkus.messaging.outgoing.SolaceOutboundMetadata; +import com.solace.quarkus.messaging.outgoing.SolaceOutgoingChannel; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mutiny.core.Vertx; + +public class SolaceDirectMessagePublisherTest extends WeldTestBase { + + @Test + void publisher() { + MapBasedConfig config = commonConfig() + .with("mp.messaging.outgoing.out.client.type", "direct") + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + // Assert on published messages + await().untilAsserted(() -> assertThat(app.getAcked()).contains("1", "2", "3", "4", "5")); + // Assert on received messages + await().untilAsserted(() -> assertThat(expected).contains("1", "2", "3", "4", "5")); + } + + @Test + void publisherWithDynamicDestination() { + MapBasedConfig config = commonConfig() + .with("mp.messaging.outgoing.out.client.type", "direct") + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of("quarkus/integration/test/dynamic/topic/*")) + .build(); + receiver.receiveAsync(inboundMessage -> { + expected.add(inboundMessage.getDestinationName()); + }); + receiver.start(); + + // Run app that publish messages + MyDynamicDestinationApp app = runApplication(config, MyDynamicDestinationApp.class); + // Assert on published messages + await().untilAsserted(() -> assertThat(app.getAcked()).contains("1", "2", "3", "4", "5")); + // Assert on received messages + await().untilAsserted(() -> assertThat(expected).contains("quarkus/integration/test/dynamic/topic/1", + "quarkus/integration/test/dynamic/topic/2", "quarkus/integration/test/dynamic/topic/3", + "quarkus/integration/test/dynamic/topic/4", "quarkus/integration/test/dynamic/topic/5")); + } + + @Test + void publisherGracefulCloseTest() { + MapBasedConfig config = new MapBasedConfig() + .with("channel-name", "out") + .with("client.type", "direct") + .with("producer.topic", topic); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + SolaceOutgoingChannel solaceOutgoingChannel = new SolaceOutgoingChannel(Vertx.vertx(), + new SolaceConnectorOutgoingConfiguration(config), messagingService); + // Publish messages + Multi.createFrom().range(0, 10) + .map(Message::of) + .subscribe((Flow.Subscriber>) solaceOutgoingChannel.getSubscriber()); + + solaceOutgoingChannel.close(); + // Assert on received messages + await().untilAsserted(() -> assertThat(expected.size()).isEqualTo(10)); + + } + + // @Test + // void publisherWithBackPressureRejectWaitForPublisherReadiness() { + // MapBasedConfig config = new MapBasedConfig() + // .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + // .with("mp.messaging.outgoing.out.producer.topic", topic) + // .with("mp.messaging.outgoing.out.producer.back-pressure.buffer-capacity", 1); + // + // List expected = new CopyOnWriteArrayList<>(); + // + // // Start listening first + // PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + // .withSubscriptions(TopicSubscription.of("topic")) + // .build(Queue.nonDurableExclusiveQueue()); + // receiver.receiveAsync(inboundMessage -> { + // expected.add(inboundMessage.getPayloadAsString()); + // }); + // receiver.start(); + // + // // Run app that publish messages + // MyBackPressureRejectApp app = runApplication(config, MyBackPressureRejectApp.class); + // // Assert on published messages + // await().untilAsserted(() -> assertThat(app.getAcked()).contains("1", "2", "3", "4", "5")); + // } + + @ApplicationScoped + static class MyApp { + private final List acked = new CopyOnWriteArrayList<>(); + + @Outgoing("out") + Multi> out() { + + return Multi.createFrom().items("1", "2", "3", "4", "5") + .map(payload -> Message.of(payload).withAck(() -> { + acked.add(payload); + return CompletableFuture.completedFuture(null); + })); + } + + public List getAcked() { + return acked; + } + } + + // @ApplicationScoped + // static class MyBackPressureRejectApp { + // private final List acked = new CopyOnWriteArrayList<>(); + // public boolean waitForPublisherReadiness = false; + // @Channel("outgoing") + // MutinyEmitter foobar; + // + // void out() { + // List items = new ArrayList<>(); + // items.add("1"); + // items.add("2"); + // items.add("3"); + // items.add("4"); + // items.add("5"); + // items.forEach(payload -> { + // Message message = Message.of(payload).withAck(() -> { + // acked.add(payload); + // return CompletableFuture.completedFuture(null); + // }); + // if (waitForPublisherReadiness) { + // while (SolaceOutgoingChannel.isPublisherReady) { + // foobar.sendMessage(message); + // } + // } else { + // foobar.sendMessage(message); + // } + // }); + // } + // + // public List getAcked() { + // return acked; + // } + // } + + @ApplicationScoped + static class MyDynamicDestinationApp { + private final List acked = new CopyOnWriteArrayList<>(); + + @Outgoing("out") + Multi> out() { + return Multi.createFrom().items("1", "2", "3", "4", "5") + .map(payload -> { + SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + .setApplicationMessageId("test") + .setDynamicDestination("quarkus/integration/test/dynamic/topic/" + payload) + .createPubSubOutboundMetadata(); + Message message = Message.of(payload, Metadata.of(outboundMetadata)); + return message.withAck(() -> { + acked.add(payload); + return CompletableFuture.completedFuture(null); + }); + }); + } + + public List getAcked() { + return acked; + } + } +} diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceOAuthTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceOAuthTest.java index 63db077..960cd9a 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceOAuthTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceOAuthTest.java @@ -132,7 +132,7 @@ void oauthTest() throws IOException { .with("consumer.queue.name", "queue-" + UUID.randomUUID().getMostSignificantBits()) .with("consumer.queue.add-additional-subscriptions", true) .with("consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("consumer.queue.subscriptions", SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION); + .with("consumer.subscriptions", SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION); MessagingService messagingService = getMessagingService(); SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(), diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java index e9071e8..af6e0b5 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java @@ -32,7 +32,7 @@ void consumer() { .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceConsumerHealthTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceConsumerHealthTest.java index cebe907..e5ab904 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceConsumerHealthTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceConsumerHealthTest.java @@ -31,7 +31,7 @@ void solaceConsumerHealthCheck() { .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + .with("mp.messaging.incoming.in.consumer.subscriptions", topic); // Run app that consumes messages MyConsumer app = runApplication(config, MyConsumer.class); @@ -71,7 +71,7 @@ void solaceConsumerLivenessCheck() { .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + .with("mp.messaging.incoming.in.consumer.subscriptions", topic); // Run app that consumes messages MyErrorConsumer app = runApplication(config, MyErrorConsumer.class); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceDirectMessageConsumerHealthTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceDirectMessageConsumerHealthTest.java new file mode 100644 index 0000000..909d9d9 --- /dev/null +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceDirectMessageConsumerHealthTest.java @@ -0,0 +1,137 @@ +package com.solace.quarkus.messaging.health; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.publisher.DirectMessagePublisher; +import com.solace.messaging.receiver.InboundMessage; +import com.solace.messaging.resources.Topic; +import com.solace.quarkus.messaging.base.WeldTestBase; +import com.solace.quarkus.messaging.incoming.SolaceInboundMessage; + +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolaceDirectMessageConsumerHealthTest extends WeldTestBase { + + @Test + void solaceConsumerHealthCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.client.type", "direct") + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.subscriptions", topic); + + // Run app that consumes messages + MyConsumer app = runApplication(config, MyConsumer.class); + + await().until(() -> isStarted() && isReady()); + + // Produce messages + DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + publisher.publish("3", tp); + publisher.publish("4", tp); + publisher.publish("5", tp); + + await().until(() -> isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + + } + + @Test + void solaceConsumerLivenessCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.client.type", "direct") + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.subscriptions", topic); + + // Run app that consumes messages + MyErrorConsumer app = runApplication(config, MyErrorConsumer.class); + + await().until(() -> isStarted() && isReady()); + + // Produce messages + DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + + await().until(() -> isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + + publisher.publish("3", tp); + await().until(() -> { + HealthReport healthReport = getHealth().getLiveness(); + return (healthReport.isOk() == false && !healthReport.getChannels().get(0).getMessage().isEmpty()); + }); + + publisher.publish("4", tp); + publisher.publish("5", tp); + await().until(() -> getHealth().getLiveness().isOk() == true); + } + + @ApplicationScoped + static class MyConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("in") + void in(InboundMessage msg) { + received.add(msg.getPayloadAsString()); + } + + public List getReceived() { + return received; + } + } + + @ApplicationScoped + static class MyErrorConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("in") + CompletionStage in(SolaceInboundMessage msg) { + String payload = new String(msg.getPayload(), StandardCharsets.UTF_8); + if (payload.equals("3")) { + return msg.nack(new IllegalArgumentException("Nacking message with payload 3")); + } + + return msg.ack(); + } + } +} diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceDirectMessagePublisherHealthTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceDirectMessagePublisherHealthTest.java new file mode 100644 index 0000000..790cdde --- /dev/null +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolaceDirectMessagePublisherHealthTest.java @@ -0,0 +1,117 @@ +package com.solace.quarkus.messaging.health; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import com.solace.messaging.receiver.DirectMessageReceiver; +import com.solace.messaging.resources.TopicSubscription; +import com.solace.quarkus.messaging.base.WeldTestBase; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +public class SolaceDirectMessagePublisherHealthTest extends WeldTestBase { + @Test + @Order(2) + void publisherHealthCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.client.type", "direct") + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + + await().until(() -> isStarted() && isReady() && isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + } + + @Test + @Order(1) + void publisherLivenessCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.client.type", "direct") + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", "publish/deny"); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of("publish/deny")) + .build(); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + + await().untilAsserted(() -> assertThat(isStarted() && isReady() && !isAlive()).isTrue()); + + await().untilAsserted(() -> assertThat(!isAlive()).isTrue()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isFalse(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + assertThat(liveness.getChannels().get(0).getMessage()).isNotEmpty(); + } + + @ApplicationScoped + static class MyApp { + private final List acked = new CopyOnWriteArrayList<>(); + + @Outgoing("out") + Multi> out() { + return Multi.createFrom().items("1", "2", "3", "4", "5") + .map(payload -> Message.of(payload).withAck(() -> { + acked.add(payload); + return CompletableFuture.completedFuture(null); + })); + } + + public List getAcked() { + return acked; + } + } +} diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java index ea77aa9..5a8b065 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java @@ -30,7 +30,7 @@ public class LocalPropagationAckTest extends WeldTestBase { private MapBasedConfig dataconfig() { return commonConfig() .with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME) - .with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.data.consumer.subscriptions", topic) .with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.data.consumer.queue.missing-resource-creation-strategy", "create-on-start") .with("mp.messaging.incoming.data.consumer.queue.name", topic); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java index 3956aa7..4b287bd 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java @@ -41,7 +41,7 @@ public class LocalPropagationTest extends WeldTestBase { private MapBasedConfig dataconfig() { return commonConfig() .with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME) - .with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.data.consumer.subscriptions", topic) .with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.data.consumer.queue.missing-resource-creation-strategy", "create-on-start") .with("mp.messaging.incoming.data.consumer.queue.name", queue); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/EndToEndPerformanceTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/EndToEndPerformanceTest.java index 1eb67cf..efa8310 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/EndToEndPerformanceTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/EndToEndPerformanceTest.java @@ -42,7 +42,7 @@ public void endToEndPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic); @@ -81,7 +81,7 @@ public void endToEndPerformanceTesttWithBackPressureWaitAndNoWaitForPublishRecei .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic) .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); @@ -122,7 +122,7 @@ public void endToEndPerformanceTesttWithBackPressureElasticAndWaitForPublishRece .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic) .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic"); @@ -163,7 +163,7 @@ public void endToEndPerformanceTesttWithBackPressureElasticAndNoWaitForPublishRe .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic) .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic") @@ -219,7 +219,7 @@ public void endToEndBlockingProcessorPerformanceTesttWithBackPressureWaitAndWait .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic); @@ -258,7 +258,7 @@ public void endToEndBlockingProcessorPerformanceTesttWithBackPressureWaitAndNoWa .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic) .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); @@ -298,7 +298,7 @@ public void endToEndBlockingProcessorPerformanceTesttWithBackPressureElasticAndW .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic) .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic"); @@ -338,7 +338,7 @@ public void endToEndBlockingProcessorPerformanceTesttWithBackPressureElasticAndN .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", processedTopic) .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic") diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceConsumerPerformanceTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceConsumerPerformanceTest.java index 71276a1..665c694 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceConsumerPerformanceTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceConsumerPerformanceTest.java @@ -36,7 +36,7 @@ public void solaceConsumerPerformanceTest() { .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + .with("mp.messaging.incoming.in.consumer.subscriptions", topic); // Run app that consumes messages MyConsumer app = runApplication(config, MyConsumer.class); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceDirectMessageConsumerPerformanceTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceDirectMessageConsumerPerformanceTest.java new file mode 100644 index 0000000..48e6903 --- /dev/null +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceDirectMessageConsumerPerformanceTest.java @@ -0,0 +1,86 @@ +package com.solace.quarkus.messaging.perf; + +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.LongAdder; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.publisher.DirectMessagePublisher; +import com.solace.messaging.resources.Topic; +import com.solace.quarkus.messaging.base.WeldTestBase; +import com.solace.quarkus.messaging.incoming.SolaceInboundMessage; + +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolaceDirectMessageConsumerPerformanceTest extends WeldTestBase { + private static final int COUNT = 100000; + private static final int TIMEOUT_IN_SECONDS = 400; + + @Test + public void solaceConsumerPerformanceTest() { + // Produce messages + DirectMessagePublisher publisher = messagingService.createDirectMessagePublisherBuilder() + .build() + .start(); + + MapBasedConfig config = commonConfig() + .with("mp.messaging.incoming.in.client.type", "direct") + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.subscriptions", topic); + + // Run app that consumes messages + MyConsumer app = runApplication(config, MyConsumer.class); + + await().until(() -> isStarted() && isReady()); + + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> app.getCount() == COUNT); + long start = app.getStart(); + long end = System.currentTimeMillis(); + + System.out.println("Total time : " + (end - start) + " ms"); + + } + + @ApplicationScoped + static class MyConsumer { + private final List received = new CopyOnWriteArrayList<>(); + LongAdder count = new LongAdder(); + long start; + + @Incoming("in") + public CompletionStage in(SolaceInboundMessage msg) { + if (count.longValue() == 0L) { + start = System.currentTimeMillis(); + } + count.increment(); + return msg.ack(); + } + + public List getReceived() { + return received; + } + + public long getStart() { + return start; + } + + public long getCount() { + return count.longValue(); + } + } +} diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceDirectMessagePublisherPerformanceTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceDirectMessagePublisherPerformanceTest.java new file mode 100644 index 0000000..ec9009f --- /dev/null +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceDirectMessagePublisherPerformanceTest.java @@ -0,0 +1,201 @@ +package com.solace.quarkus.messaging.perf; + +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.LongAdder; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.receiver.DirectMessageReceiver; +import com.solace.messaging.resources.TopicSubscription; +import com.solace.quarkus.messaging.base.WeldTestBase; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolaceDirectMessagePublisherPerformanceTest extends WeldTestBase { + + private static final int COUNT = 100000; + private static final int TIMEOUT_IN_SECONDS = 400; + + // @Test + // void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() { + // MapBasedConfig config = commonConfig() + // .with("mp.messaging.outgoing.out.client.type", "direct") + // .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + // .with("mp.messaging.outgoing.out.producer.topic", topic) + // .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "wait"); + // + // List received = new CopyOnWriteArrayList<>(); + // + // // Start listening first + // DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + // .withSubscriptions(TopicSubscription.of(topic)) + // .build(); + // receiver.receiveAsync(inboundMessage -> { + // received.add(inboundMessage.getPayloadAsString()); + // }); + // receiver.start(); + // + // // Run app that publish messages + // MyApp app = runApplication(config, MyApp.class); + // long start = System.currentTimeMillis(); + // // app.run(); + // await() + // .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + // .until(() -> app.getCount() == COUNT); + // long end = System.currentTimeMillis(); + // + // await() + // .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + // .until(() -> received.size() == COUNT); + // + // System.out.println("Total time : " + (end - start) + " ms"); + // long duration = end - start; + // double speed = (COUNT * 1.0) / (duration / 1000.0); + // System.out.println(speed + " messages/ms"); + // } + + @Test + void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() { + MapBasedConfig config = commonConfig() + .with("mp.messaging.outgoing.out.client.type", "direct") + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic) + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "wait"); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening first + DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(); + receiver.receiveAsync(inboundMessage -> { + received.add(inboundMessage.getPayloadAsString()); + }); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + long start = System.currentTimeMillis(); + // app.run(); + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> app.getCount() == COUNT); + long end = System.currentTimeMillis(); + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + + System.out.println("Total time : " + (end - start) + " ms"); + long duration = end - start; + double speed = (COUNT * 1.0) / (duration / 1000.0); + System.out.println(speed + " messages/ms"); + } + + // @Test + // void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() { + // MapBasedConfig config = commonConfig() + // .with("mp.messaging.outgoing.out.client.type", "direct") + // .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + // .with("mp.messaging.outgoing.out.producer.topic", topic) + // .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic"); + // + // List received = new CopyOnWriteArrayList<>(); + // + // // Start listening first + // DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + // .withSubscriptions(TopicSubscription.of(topic)) + // .build(); + // receiver.receiveAsync(inboundMessage -> { + // received.add(inboundMessage.getPayloadAsString()); + // }); + // receiver.start(); + // + // // Run app that publish messages + // MyApp app = runApplication(config, MyApp.class); + // long start = System.currentTimeMillis(); + // // app.run(); + // await() + // .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + // .until(() -> app.getCount() == COUNT); + // long end = System.currentTimeMillis(); + // + // await() + // .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + // .until(() -> received.size() == COUNT); + // + // System.out.println("Total time : " + (end - start) + " ms"); + // long duration = end - start; + // double speed = (COUNT * 1.0) / (duration / 1000.0); + // System.out.println(speed + " messages/ms"); + // } + + @Test + void publisherPerformanceTestWithBackPressureElasticAndNoWaitForPublishReceipt() { + MapBasedConfig config = commonConfig() + .with("mp.messaging.outgoing.out.client.type", "direct") + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic") + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening first + DirectMessageReceiver receiver = messagingService.createDirectMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(); + receiver.receiveAsync(inboundMessage -> { + received.add(inboundMessage.getPayloadAsString()); + }); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + long start = System.currentTimeMillis(); + // app.run(); + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> app.getCount() == COUNT); + long end = System.currentTimeMillis(); + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + + System.out.println("Total time : " + (end - start) + " ms"); + long duration = end - start; + double speed = (COUNT * 1.0) / (duration / 1000.0); + System.out.println(speed + " messages/ms"); + } + + @ApplicationScoped + static class MyApp { + LongAdder count = new LongAdder(); + + @Outgoing("out") + Multi> out() { + + return Multi.createFrom().range(0, COUNT) + .map(payload -> Message.of(payload).withAck(() -> { + count.increment(); + return CompletableFuture.completedFuture(null); + })); + } + + public long getCount() { + return count.longValue(); + } + } +} diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java index c119aa7..2bdd31f 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java @@ -79,7 +79,7 @@ void consumer() { .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", "quarkus/integration/test/replay/messages"); + .with("mp.messaging.incoming.in.consumer.subscriptions", "quarkus/integration/test/replay/messages"); // Run app that consumes messages MyConsumer app = runApplication(config, MyConsumer.class); @@ -154,7 +154,7 @@ void processor() { .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.subscriptions", topic) .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.client.tracing-enabled", "true") .with("mp.messaging.outgoing.out.producer.topic", processedTopic); diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties index fa3c93a..77b2880 100644 --- a/samples/hello-connector-solace/src/main/resources/application.properties +++ b/samples/hello-connector-solace/src/main/resources/application.properties @@ -15,7 +15,7 @@ mp.messaging.incoming.hello-in.consumer.queue.name=queue.foobar mp.messaging.incoming.hello-in.consumer.queue.missing-resource-creation-strategy=create-on-start mp.messaging.incoming.hello-in.consumer.queue.type=durable-non-exclusive mp.messaging.incoming.hello-in.consumer.queue.add-additional-subscriptions=true -mp.messaging.incoming.hello-in.consumer.queue.subscriptions=hello/foobar +mp.messaging.incoming.hello-in.consumer.subscriptions=hello/foobar mp.messaging.incoming.hello-plain-message-in.connector=quarkus-solace mp.messaging.incoming.hello-plain-message-in.consumer.queue.supports-nacks=true @@ -31,7 +31,7 @@ mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=queue.dynamic.t mp.messaging.incoming.dynamic-destination-in.consumer.queue.missing-resource-creation-strategy=create-on-start mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive mp.messaging.incoming.dynamic-destination-in.consumer.queue.add-additional-subscriptions=true -mp.messaging.incoming.dynamic-destination-in.consumer.queue.subscriptions=test/topic/> +mp.messaging.incoming.dynamic-destination-in.consumer.subscriptions=test/topic/> mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace mp.messaging.outgoing.dynamic-destination-out.producer.topic=test/dynamic/topic