Skip to content

Commit

Permalink
feat(multi-qmgr): Enable multi queue manager and reques reply for fix…
Browse files Browse the repository at this point in the history
…ed queues (#35)

* Upgrade annotations, enable multiple broker connections, encapsulate request reply for fixed queues with selector listener
  • Loading branch information
juancgalvis authored Feb 7, 2024
1 parent 0f2032a commit 986675e
Show file tree
Hide file tree
Showing 152 changed files with 2,790 additions and 1,547 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,5 @@ gradle-app.setting
*.hprof
*.log.*
FFDC
commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/internal/models/eda
examples/mq-reactive/src/main/java/co/com/bancolombia/sample/app/config/MQRegistryConfig.java
187 changes: 57 additions & 130 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ This library offers a performant setup for JMS Clients.
There are some scenarios covered by the library:

- Listen messages from a fixed queue.
- Send messages to a fixed queue.
- Listen messages from a temporary queue.
- Send messages to a temporary queue.
- Get messages with specific correlationId from a fixed queue.
- Send messages to a fixed and temporary queues.
- Request Reply pattern with automatic temporary queue.
- Request Reply pattern with automatic get message by selector from fixed queue.

## Compatibility

Expand All @@ -30,6 +28,8 @@ There are some scenarios covered by the library:
| 0.6.0 | 2.7.6 | JMS 2 javax |
| 1.0.1 | 3.0.6 | JMS 3 jakarta |
| 1.1.0 | 3.1.1 | JMS 3 jakarta |
| 1.4.1 | 3.2.1 | JMS 3 jakarta |
| 2.0.0 | 3.2.1 | JMS 3 jakarta |


### Limitations
Expand Down Expand Up @@ -85,87 +85,12 @@ graph TD

The amount of JMSContexts and JMSConsumers is related to the `concurrency` annotation attribute, it is based on JMS 2.0.

#### Listening an autogenerated temporary queue

To listen a temporary queue you should use the `tempQueueAlias` annotation attribute, the assigned value is the key that
you would be use to get the TemporaryQueue object when needed.

```java
// For an autogenerated temporary queue
@MQListener(tempQueueAlias = "any-custom-key")
public Mono<Void> processFromTemporaryQueue(Message message)throws JMSException{
String text=((TextMessage)message).getText();
return doSomething(text);
}
```

```java
// For an autogenerated temporary queue non reactive project
@MQListener(tempQueueAlias = "any-custom-key")
public void processFromTemporaryQueue(Message message)throws JMSException{
String text=((TextMessage)message).getText();
doSomething(text);
}
```

This sample will listen for an autogenerated temporary queue with key `any-custom-key`, the JMS objects structure will
be like this:

```mermaid
graph TD
X[ConnectionFactory] -->|create| A
A[Connection] -->|create| B(Session)
B --> |create| C(TemporaryQueue)
A -->|create| D(Session)
D -->|create| E(MessageConsumer: TemporaryQueue)
E -->|notifies|F[MessageListener]
A -->|create| G(Session)
G -->|create| H(MessageConsumer: TemporaryQueue)
H -->|notifies|F
A -->|create| I(Session)
I -->|create| J(MessageConsumer: TemporaryQueue)
J -->|notifies|F
```

The amount of Sessions and MessageConsumers is related to the `concurrency` annotation attribute, it is based on JMS
1.1.

#### Listening for a specific message

To listen for specific messages is enough with enable message listener selector with the next class annotation:

```java
@Configuration
@EnableMQSelectorMessageListener
public class AnyConfigurationComponentOrService {
}
```

This annotation will create an available bean that offers the ability to get an specific message from a queue using the correlationId attribute, there are two bean options:

For reactive projects will be `MQMessageSelectorListener` bean which has the next two methods:

```java
public interface MQMessageSelectorListener {
Mono<Message> getMessage(String correlationId);
Mono<Message> getMessage(String correlationId, long timeout, Destination destination);
}
```

For non-reactive projects will be `MQMessageSelectorListenerSync` bean which has the next two methods:

```java
public interface MQMessageSelectorListenerSync {
Message getMessage(String correlationId);
Message getMessage(String correlationId, long timeout, Destination destination);
}
```

The above beans can throw a `JMSRuntimeException` or a `ReceiveTimeoutException`.

## Sending messages

To send messages exists the `@EnableMQMessageSender` annotation which enables the producers auto-configuration.
To send messages exists the `@EnableMQGateway` annotation which enables the producers auto-configuration.

This configuration creates a JMS objects structure like this:

Expand Down Expand Up @@ -194,7 +119,7 @@ JMS 2.0.

@Component
@AllArgsConstructor
@EnableMQMessageSender
@EnableMQGateway(scanBasePackages = "co.com.bancolombia")
public class SampleMQMessageSender {
private final MQMessageSender sender;
// private final MQQueuesContainer container; // Inject it to reference a temporary queue
Expand All @@ -215,7 +140,7 @@ public class SampleMQMessageSender {

@Component
@AllArgsConstructor
@EnableMQMessageSender
@EnableMQGateway(scanBasePackages = "co.com.bancolombia")
public class SampleMQMessageSender {
private final MQMessageSenderSync sender;
// private final MQQueuesContainer container; // Inject it to reference a temporary queue
Expand All @@ -233,6 +158,16 @@ public class SampleMQMessageSender {
This sample shows how to send a message to a default destination queue, also shows how reference an autogenerated
temporary queue.

If you need to have another message sender, you can define it with the `@MQSender` annotation.
```java
@MQSender(connectionFactory = "domainB")
public interface XDomainSender extends MQMessageSender {
}
```

In this case we pass a connectionFactory bean called domainB, this configuration allow you to send messages to another
broker. Remind that a MQMessageSender can send messages to all queues in a QueueManager, so you only need to have one by Queue Manager.

#### Send message to another queue

```java
Expand Down Expand Up @@ -277,19 +212,14 @@ next interface signatures:
```

For example, you define an interface like the next, so it could be auto implemented by the library:
this [MyRequestReply](examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/drivenadapters/reqreply/MyRequestReply.java)
```java
public interface MyRequestReply {
Mono<Message> requestReply(String message);
}
```
this [MyRequestReplyTmp](examples/mq-reactive/src/main/java/co/com/bancolombia/sample/drivenadapters/reqreply/MyRequestReplyTmp.java)

To achieve the auto implementation, you should:

1. Annotate the application or a configuration bean with @EnableReqReply, optionally you can define the base package
1. Annotate the application or a configuration bean with @EnableMQGateway, optionally you can define the base package
```java
@SpringBootApplication(scanBasePackages = "co.com.bancolombia")
@EnableReqReply(scanBasePackages = "co.com.bancolombia")
@EnableMQGateway(scanBasePackages = "co.com.bancolombia")
public class MainApplication {
public static void main(String[] args) {
SpringApplication.run(MainApplication.class);
Expand All @@ -299,19 +229,18 @@ To achieve the auto implementation, you should:

2. Annotate the interface with @ReqReply, for example
```java
@ReqReply(requestQueue = "DEV.QUEUE.1", replyQueueTemp = "sample")
public interface MyRequestReply {
Mono<Message> requestReply(String message);
}
@ReqReply(requestQueue = "DEV.QUEUE.1") // in queue names you can use ${some.property.name} spring placeholder notation
public interface MyRequestReplyTmp extends MQRequestReply {
}
```

3. Now you can inject your interface in any spring component.
[MyRequestReplyAdapter](examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/drivenadapters/reqreply/MyRequestReplyAdapter.java)
[MyRequestReplyAdapter](examples/mq-reactive/src/main/java/co/com/bancolombia/sample/drivenadapters/reqreply/MyRequestReplyAdapter.java)
```java
@Component
@AllArgsConstructor
public class MyRequestReplyAdapter implements RequestGateway {
private final MyRequestReply requestReply;
private final MyRequestReplyTmp requestReply;
...
}
```
Expand All @@ -332,45 +261,20 @@ you can use a Request Reply pattern based on a fixed queue, you should consider
In this scenario you should not consider any setup. Following code snippet can show a basic implementation:

```java
public Mono<String> requestReply(String request) {
return sender.send(context -> context.createTextMessage(request))
.flatMap(listener::getMessage)
.map(this::extractResult);
}

@SneakyThrows
private String extractResult(Message message) {
return ((TextMessage) message).getText();
@ReqReply(requestQueue = "DEV.QUEUE.1", replyQueue = "DEV.QUEUE.2", queueType = FIXED)
public interface MyRequestReply extends MQRequestReply {
}
```
Then inject this interface to your adapter like with temporary queue

- Multiple Queue Manager or Clustering:
In this scenario you should guarantee that:
- the application that attends the request follow the replyTo header.
- set to `true` the property `commons.jms.input-queue-set-queue-manager` to identify and set the queue manager to the
response queue (this guarantees that the application that attends the request send the response to the specific
queue manager).
- Then the same like with a single Queue Manager

Following code snippet can show a basic implementation:

```java
public Mono<String> requestReply(String request) {
return sender.send(context -> {
Message message = context.createTextMessage(request);
message.setJMSReplyTo(container.get("RESPONSE.QUEUE.NAME"));
return message;
})
.flatMap(listener::getMessage)
.map(this::extractResult);
}

@SneakyThrows
private String extractResult(Message message) {
return ((TextMessage) message).getText();
}
```

In both scenarios you should use the `MQMessageSender`, the `MQMessageSelectorListener` and maybe the `MQQueuesContainer`.

## Setup

Expand All @@ -387,8 +291,6 @@ the next properties:
- **concurrency**: *Number of open connections to listening the queue*, applies for fixed and temporary queues.
- **connectionFactory**: *Name of a specific `ConnectionFactory` Bean*, used to create the connections for this
consumer.
- **tempQueueAlias**: *An arbitrary key or identifier for an autogenerated temporary queue*, por ejemplo `my-id`, use
only when listen for a temporary queue
- **queueCustomizer**: *Name of a specific `MQQueueCustomizer` Bean*, used to customize the listening queue properties
before start the consumers.

Expand All @@ -397,7 +299,6 @@ application.yaml of your application.

- `commons.jms.input-concurrency`: Equivalent to `concurrency` annotation property.
- `commons.jms.input-queue`: Equivalent to `value` annotation property.
- `commons.jms.input-queue-alias`: Equivalent to `tempQueueAlias` annotation property.
- `commons.jms.input-queue-set-queue-manager`: Enable it to set the resolved queue manager when needed.

### Sender properties
Expand All @@ -424,13 +325,22 @@ This library uses the default bean of kind `ConnectionFactory`, you can customiz
To customize sender you should override the default `MQMessageSenderSync` bean refers to
[Custom configurations](#Custom-configurations)

#### Multiple Broker

If you need multi-broker support you only should define the ConnectionFactory bean with a name
and then use this name on each annotation that you need.

This setting is available for:
- `@MQSender`
- `@MQListener`
- `@ReqReply`

### Custom configurations

You can define custom beans to change default behaviors:

- [`MQAutoconfiguration`](commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfiguration.java)
- [`MQAutoconfigurationSender`](commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSender.java)
- [`MQQueueManagerSetter`](commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/MQAutoconfigurationSelectorListener.java)
- [`MQAutoconfigurationSender`](commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/config/senders/MQAutoconfigurationSender.java)

You should create and register many fixed response queues for request reply, in this case you can override the
`MQQueueManagerSetter` as following:
Expand Down Expand Up @@ -461,6 +371,23 @@ You should create and register many fixed response queues for request reply, in

- [`MQUtils`](commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/utils/MQUtils.java)

## Migration

### From 1.x.x to 2.x.x

Change notes:
- `@MQListener` has removed support to listen a temporary queue, because `@ReqReply` use this behaviour by default.
- `@ReqReply` has added support to do a request reply pattern using fixed queues with get message by selector.

Actions:
- `@EnableMQSelectorMessageListener` has been removed, now you can use `@ReqReply` directly using `queueType` attribute
with value `FIXED`.
- `@EnableMQMessageSender` has been removed, now you should use `@EnableMQGateway`.
- `@EnableReqReply` has been removed, now you should use `@EnableMQGateway` passing the same `scanBasePackages` property.
- property `replyQueueTemp` has been renamed to `replyQueue` in `@ReqReply`.
- `commons.jms.input-queue-alias` has been removed now you only can set the alias with `replyQueue`.


# How can I help?

Review the issues, we hear new ideas. Read more [Contributing](./CONTRIBUTING.md)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package co.com.bancolombia.commons.jms.api;

import jakarta.jms.Destination;
import reactor.core.publisher.Mono;

public interface MQDomainMessageSender {
Mono<String> send(String domain, Destination destination, MQMessageCreator messageCreator);

Mono<String> send(String domain, MQMessageCreator messageCreator);

/**
* You can retrieve the MQMessageSender to avoid queries to Map
* @param domain Domain name or connectionFactory bean name
* @return MQMessageSender
*/
MQMessageSender forDomain(String domain);
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package co.com.bancolombia.commons.jms.api;

import reactor.core.publisher.Mono;

import jakarta.jms.Destination;
import reactor.core.publisher.Mono;

public interface MQMessageSender {
Mono<String> send(Destination destination, MQMessageCreator messageCreator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import jakarta.jms.JMSContext;
import jakarta.jms.Queue;

import java.util.function.BiConsumer;

public interface MQQueueManagerSetter extends BiConsumer<JMSContext, Queue> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package co.com.bancolombia.commons.jms.api;

import jakarta.jms.Message;
import reactor.core.publisher.Mono;

import jakarta.jms.Message;
import java.time.Duration;

public interface MQRequestReply {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package co.com.bancolombia.commons.jms.mq;

import co.com.bancolombia.commons.jms.mq.config.MQAutoconfigurationSender;
import co.com.bancolombia.commons.jms.mq.config.proxy.EnableReqReplyRegistrar;
import co.com.bancolombia.commons.jms.mq.config.MQAnnotationAutoconfiguration;
import co.com.bancolombia.commons.jms.mq.config.senders.MQAutoconfigurationSender;
import org.springframework.context.annotation.Import;
import org.springframework.core.annotation.AliasFor;

Expand All @@ -14,9 +14,8 @@
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({MQAutoconfigurationSender.class, EnableReqReplyRegistrar.class})
public @interface EnableReqReply {

@Import({MQAutoconfigurationSender.class, MQAnnotationAutoconfiguration.class})
public @interface EnableMQGateway {
@AliasFor("scanBasePackages")
String value() default "";

Expand Down

This file was deleted.

Loading

0 comments on commit 986675e

Please sign in to comment.