Skip to content

Commit

Permalink
fix(RequestReply): Add additional methods for request reply with mess…
Browse files Browse the repository at this point in the history
…age selector
  • Loading branch information
juancgalvis committed Nov 13, 2024
1 parent 6fd8f61 commit 251118c
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package co.com.bancolombia.commons.jms.api;

import co.com.bancolombia.commons.jms.api.exceptions.InvalidUsageException;
import jakarta.jms.Destination;
import reactor.core.publisher.Mono;

import java.time.Duration;
Expand All @@ -12,4 +14,12 @@ public interface MQRequestReplyBase<T> {
Mono<T> requestReply(MQMessageCreator messageCreator);

Mono<T> requestReply(MQMessageCreator messageCreator, Duration timeout);

default Mono<T> requestReply(String message, Destination request, Destination reply, Duration timeout) {
return Mono.error(() -> new InvalidUsageException("This method is not supported"));
} // For fixed queues only

default Mono<T> requestReply(MQMessageCreator messageCreator, Destination request, Destination reply, Duration timeout) {
return Mono.error(() -> new InvalidUsageException("This method is not supported"));
} // For fixed queues only
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package co.com.bancolombia.commons.jms.api.exceptions;

public class InvalidUsageException extends RuntimeException {

public InvalidUsageException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public Mono<Message> requestReply(String message, Destination request, Destinati
.flatMap(id -> listener.getMessageBySelector(selector.buildSelector(id), timeout.toMillis(), reply));
}

public Mono<Message> requestReply(MQMessageCreator messageCreator, Destination request, Destination reply, Duration timeout) {
return sender.send(request, messageCreator)
.flatMap(id -> listener.getMessageBySelector(selector.buildSelector(id), timeout.toMillis(), reply));
}

private MQMessageCreator defaultCreator(String message) {
return ctx -> {
Message jmsMessage = ctx.createTextMessage(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import co.com.bancolombia.commons.jms.api.MQMessageCreator;
import co.com.bancolombia.commons.jms.api.MQMessageSender;
import co.com.bancolombia.commons.jms.api.MQQueuesContainer;
import co.com.bancolombia.commons.jms.api.exceptions.InvalidUsageException;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import jakarta.jms.Destination;
Expand Down Expand Up @@ -100,4 +101,24 @@ void shouldNotFailWhenNoRelatedMessage() throws JMSException {
// Assert
}

@Test
void shouldFailWhenUsingFixedMethod() throws JMSException {
// Arrange
// Act
Mono<Message> flow = listener.requestReply(context1 -> message, destination, destination, Duration.ofSeconds(1));
// Assert
StepVerifier.create(flow)
.verifyError(InvalidUsageException.class);
}

@Test
void shouldFailWhenUsingFixedMethodStringMessage() throws JMSException {
// Arrange
// Act
Mono<Message> flow = listener.requestReply("message", destination, destination, Duration.ofSeconds(1));
// Assert
StepVerifier.create(flow)
.verifyError(InvalidUsageException.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class MQRequestReplySelectorTest {
@Mock
private Queue destination;
@Mock
private Queue replyDestination;
@Mock
private JMSContext context;
@Mock
private MQMessageSender sender;
Expand Down Expand Up @@ -74,6 +76,32 @@ void shouldSendAndGetReplyFromFixed() {
.verifyComplete();
}

@Test
void shouldSendAndGetReplyFromFixedWithSpecificQueuesFromStringMessage() {
// Arrange
when(sender.send(any(Destination.class), any(MQMessageCreator.class))).thenReturn(Mono.just("id"));
when(listener.getMessageBySelector(anyString(), anyLong(), any(Destination.class))).thenReturn(Mono.just(message));
// Act
Mono<Message> reply = reqReply.requestReply("MyMessage", destination, replyDestination, Duration.ofSeconds(1));
// Assert
StepVerifier.create(reply)
.assertNext(message1 -> assertEquals(message, message1))
.verifyComplete();
}

@Test
void shouldSendAndGetReplyFromFixedWithSpecificQueues() {
// Arrange
when(sender.send(any(Destination.class), any(MQMessageCreator.class))).thenReturn(Mono.just("id"));
when(listener.getMessageBySelector(anyString(), anyLong(), any(Destination.class))).thenReturn(Mono.just(message));
// Act
Mono<Message> reply = reqReply.requestReply(context -> message, destination, replyDestination, Duration.ofSeconds(1));
// Assert
StepVerifier.create(reply)
.assertNext(message1 -> assertEquals(message, message1))
.verifyComplete();
}

@Test
void shouldReplyWithTimeoutFromFixed() {
// Arrange
Expand Down

0 comments on commit 251118c

Please sign in to comment.