diff --git a/commons-jms-api/src/main/java/co/com/bancolombia/commons/jms/api/MQRequestReplyBase.java b/commons-jms-api/src/main/java/co/com/bancolombia/commons/jms/api/MQRequestReplyBase.java index 9cf4b25..fd63235 100644 --- a/commons-jms-api/src/main/java/co/com/bancolombia/commons/jms/api/MQRequestReplyBase.java +++ b/commons-jms-api/src/main/java/co/com/bancolombia/commons/jms/api/MQRequestReplyBase.java @@ -1,5 +1,7 @@ package co.com.bancolombia.commons.jms.api; +import co.com.bancolombia.commons.jms.api.exceptions.InvalidUsageException; +import jakarta.jms.Destination; import reactor.core.publisher.Mono; import java.time.Duration; @@ -12,4 +14,12 @@ public interface MQRequestReplyBase { Mono requestReply(MQMessageCreator messageCreator); Mono requestReply(MQMessageCreator messageCreator, Duration timeout); + + default Mono requestReply(String message, Destination request, Destination reply, Duration timeout) { + return Mono.error(() -> new InvalidUsageException("This method is not supported")); + } // For fixed queues only + + default Mono requestReply(MQMessageCreator messageCreator, Destination request, Destination reply, Duration timeout) { + return Mono.error(() -> new InvalidUsageException("This method is not supported")); + } // For fixed queues only } diff --git a/commons-jms-api/src/main/java/co/com/bancolombia/commons/jms/api/exceptions/InvalidUsageException.java b/commons-jms-api/src/main/java/co/com/bancolombia/commons/jms/api/exceptions/InvalidUsageException.java new file mode 100644 index 0000000..c2c1728 --- /dev/null +++ b/commons-jms-api/src/main/java/co/com/bancolombia/commons/jms/api/exceptions/InvalidUsageException.java @@ -0,0 +1,8 @@ +package co.com.bancolombia.commons.jms.api.exceptions; + +public class InvalidUsageException extends RuntimeException { + + public InvalidUsageException(String message) { + super(message); + } +} diff --git a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplySelector.java b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplySelector.java index 8be02bb..dae464b 100644 --- a/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplySelector.java +++ b/commons-jms-mq/src/main/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplySelector.java @@ -68,6 +68,11 @@ public Mono requestReply(String message, Destination request, Destinati .flatMap(id -> listener.getMessageBySelector(selector.buildSelector(id), timeout.toMillis(), reply)); } + public Mono requestReply(MQMessageCreator messageCreator, Destination request, Destination reply, Duration timeout) { + return sender.send(request, messageCreator) + .flatMap(id -> listener.getMessageBySelector(selector.buildSelector(id), timeout.toMillis(), reply)); + } + private MQMessageCreator defaultCreator(String message) { return ctx -> { Message jmsMessage = ctx.createTextMessage(message); diff --git a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplyListenerTest.java b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplyListenerTest.java index 0918c71..00705c6 100644 --- a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplyListenerTest.java +++ b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplyListenerTest.java @@ -3,6 +3,7 @@ import co.com.bancolombia.commons.jms.api.MQMessageCreator; import co.com.bancolombia.commons.jms.api.MQMessageSender; import co.com.bancolombia.commons.jms.api.MQQueuesContainer; +import co.com.bancolombia.commons.jms.api.exceptions.InvalidUsageException; import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp; import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter; import jakarta.jms.Destination; @@ -100,4 +101,24 @@ void shouldNotFailWhenNoRelatedMessage() throws JMSException { // Assert } + @Test + void shouldFailWhenUsingFixedMethod() throws JMSException { + // Arrange + // Act + Mono flow = listener.requestReply(context1 -> message, destination, destination, Duration.ofSeconds(1)); + // Assert + StepVerifier.create(flow) + .verifyError(InvalidUsageException.class); + } + + @Test + void shouldFailWhenUsingFixedMethodStringMessage() throws JMSException { + // Arrange + // Act + Mono flow = listener.requestReply("message", destination, destination, Duration.ofSeconds(1)); + // Assert + StepVerifier.create(flow) + .verifyError(InvalidUsageException.class); + } + } diff --git a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplySelectorTest.java b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplySelectorTest.java index 73eb821..37ca45d 100644 --- a/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplySelectorTest.java +++ b/commons-jms-mq/src/test/java/co/com/bancolombia/commons/jms/mq/listeners/MQRequestReplySelectorTest.java @@ -37,6 +37,8 @@ class MQRequestReplySelectorTest { @Mock private Queue destination; @Mock + private Queue replyDestination; + @Mock private JMSContext context; @Mock private MQMessageSender sender; @@ -74,6 +76,32 @@ void shouldSendAndGetReplyFromFixed() { .verifyComplete(); } + @Test + void shouldSendAndGetReplyFromFixedWithSpecificQueuesFromStringMessage() { + // Arrange + when(sender.send(any(Destination.class), any(MQMessageCreator.class))).thenReturn(Mono.just("id")); + when(listener.getMessageBySelector(anyString(), anyLong(), any(Destination.class))).thenReturn(Mono.just(message)); + // Act + Mono reply = reqReply.requestReply("MyMessage", destination, replyDestination, Duration.ofSeconds(1)); + // Assert + StepVerifier.create(reply) + .assertNext(message1 -> assertEquals(message, message1)) + .verifyComplete(); + } + + @Test + void shouldSendAndGetReplyFromFixedWithSpecificQueues() { + // Arrange + when(sender.send(any(Destination.class), any(MQMessageCreator.class))).thenReturn(Mono.just("id")); + when(listener.getMessageBySelector(anyString(), anyLong(), any(Destination.class))).thenReturn(Mono.just(message)); + // Act + Mono reply = reqReply.requestReply(context -> message, destination, replyDestination, Duration.ofSeconds(1)); + // Assert + StepVerifier.create(reply) + .assertNext(message1 -> assertEquals(message, message1)) + .verifyComplete(); + } + @Test void shouldReplyWithTimeoutFromFixed() { // Arrange