From a42ae27d781673b6110caf90d20c844b3bef758f Mon Sep 17 00:00:00 2001 From: JuanC Galvis <8420868+juancgalvis@users.noreply.github.com> Date: Fri, 14 Jan 2022 10:50:43 -0500 Subject: [PATCH] include reactive reply router as utils class, remove issue (#9) * include reactive reply router as utils class, remove issue * add parametrized class --- .../RelatedMessageNotFoundException.java | 9 ++++ .../commons/jms/security/SSLContextUtils.java | 3 ++ .../jms/utils/ReactiveReplyRouter.java | 42 +++++++++++++++++++ .../jms/utils/ReactiveReplyRouterTest.java | 34 +++++++++++++++ .../jms/sample/app/config/Config.java | 7 ++++ .../usecase/ReactiveReplyRouterUseCase.java | 37 ---------------- .../sample/domain/usecase/SampleUseCase.java | 9 ++-- .../jms/sample/entrypoints/MyMQListener.java | 4 +- gradle.properties | 2 +- 9 files changed, 103 insertions(+), 44 deletions(-) create mode 100644 commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/exceptions/RelatedMessageNotFoundException.java create mode 100644 commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/utils/ReactiveReplyRouter.java create mode 100644 commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/utils/ReactiveReplyRouterTest.java delete mode 100644 examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/domain/usecase/ReactiveReplyRouterUseCase.java diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/exceptions/RelatedMessageNotFoundException.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/exceptions/RelatedMessageNotFoundException.java new file mode 100644 index 0000000..5765d5c --- /dev/null +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/exceptions/RelatedMessageNotFoundException.java @@ -0,0 +1,9 @@ +package co.com.bancolombia.commons.jms.exceptions; + +import javax.jms.JMSRuntimeException; + +public class RelatedMessageNotFoundException extends JMSRuntimeException { + public RelatedMessageNotFoundException(String correlationId) { + super("Processor not found for correlationId: " + correlationId); + } +} diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/security/SSLContextUtils.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/security/SSLContextUtils.java index 083533b..a9624fb 100644 --- a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/security/SSLContextUtils.java +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/security/SSLContextUtils.java @@ -1,5 +1,7 @@ package co.com.bancolombia.commons.jms.security; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; import lombok.SneakyThrows; import javax.net.ssl.CertPathTrustManagerParameters; @@ -16,6 +18,7 @@ import java.security.cert.X509CertSelector; import java.util.EnumSet; +@NoArgsConstructor(access = AccessLevel.PRIVATE) public class SSLContextUtils { public static final String TLS = "TLSv1.3"; public static final String JKS = "JKS"; diff --git a/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/utils/ReactiveReplyRouter.java b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/utils/ReactiveReplyRouter.java new file mode 100644 index 0000000..bc764be --- /dev/null +++ b/commons-jms-utils/src/main/java/co/com/bancolombia/commons/jms/utils/ReactiveReplyRouter.java @@ -0,0 +1,42 @@ +package co.com.bancolombia.commons.jms.utils; + +import co.com.bancolombia.commons.jms.exceptions.RelatedMessageNotFoundException; +import lombok.extern.log4j.Log4j2; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; + +@Log4j2 +public class ReactiveReplyRouter { + private final ConcurrentHashMap> processors = new ConcurrentHashMap<>(); + + public Mono wait(String messageId) { + final Sinks.One processor = Sinks.one(); + processors.put(messageId, processor); + log.info("Waiting for: {}", messageId); + return processor.asMono(); + } + + public Mono wait(String messageId, Duration timeout) { + return this.wait(messageId).timeout(timeout).doOnError(TimeoutException.class, e -> clean(messageId)); + } + + public void reply(String correlationID, T response) { + if (correlationID != null) { + log.info("Replying with id: {}", correlationID); + final Sinks.One processor = processors.remove(correlationID); + if (processor == null) { + throw new RelatedMessageNotFoundException(correlationID); + } else { + processor.tryEmitValue(response); + } + } + } + + public void clean(String correlationId) { + processors.remove(correlationId); + } +} diff --git a/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/utils/ReactiveReplyRouterTest.java b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/utils/ReactiveReplyRouterTest.java new file mode 100644 index 0000000..f06be5b --- /dev/null +++ b/commons-jms-utils/src/test/java/co/com/bancolombia/commons/jms/utils/ReactiveReplyRouterTest.java @@ -0,0 +1,34 @@ +package co.com.bancolombia.commons.jms.utils; + +import co.com.bancolombia.commons.jms.exceptions.RelatedMessageNotFoundException; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ReactiveReplyRouterTest { + private final ReactiveReplyRouter router = new ReactiveReplyRouter<>(); + + @Test + void shouldReply() { + Mono flow = router.wait("123"); + router.reply("123", "result"); + StepVerifier.create(flow).expectNext("result").verifyComplete(); + } + + @Test + void shouldFailNotFound() { + assertThrows(RelatedMessageNotFoundException.class, () -> router.reply("1234", "result")); + } + + @Test + void shouldHandleTimeout() { + Mono flow = router.wait("1234", Duration.ZERO); + router.reply(null, null); + StepVerifier.create(flow).expectError(TimeoutException.class).verify(); + } +} diff --git a/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/app/config/Config.java b/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/app/config/Config.java index 5eca88a..826a08c 100644 --- a/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/app/config/Config.java +++ b/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/app/config/Config.java @@ -1,5 +1,7 @@ package co.com.bancolombia.jms.sample.app.config; +import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter; +import co.com.bancolombia.jms.sample.domain.model.Result; import com.fasterxml.jackson.databind.ObjectMapper; import com.ibm.mq.spring.boot.MQConnectionFactoryCustomizer; import org.springframework.context.annotation.Bean; @@ -29,4 +31,9 @@ public MQConnectionFactoryCustomizer cfCustomizer() { } }; } + + @Bean + public ReactiveReplyRouter resultReactiveReplyRouter() { + return new ReactiveReplyRouter<>(); + } } diff --git a/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/domain/usecase/ReactiveReplyRouterUseCase.java b/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/domain/usecase/ReactiveReplyRouterUseCase.java deleted file mode 100644 index 8d4059b..0000000 --- a/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/domain/usecase/ReactiveReplyRouterUseCase.java +++ /dev/null @@ -1,37 +0,0 @@ -package co.com.bancolombia.jms.sample.domain.usecase; - -import co.com.bancolombia.jms.sample.domain.exceptions.RelatedMessageNotFoundException; -import co.com.bancolombia.jms.sample.domain.model.Result; -import lombok.extern.log4j.Log4j2; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; - -import java.util.concurrent.ConcurrentHashMap; - -@Log4j2 -public class ReactiveReplyRouterUseCase { - private final ConcurrentHashMap> processors = new ConcurrentHashMap<>(); - - public Mono wait(String correlationID) { - final Sinks.One processor = Sinks.one(); - processors.put(correlationID, processor); - log.info("Waiting for: {}", correlationID); - return processor.asMono(); - } - - public void reply(String correlationID, Result response) { - if (correlationID != null) { - log.info("Replying with id: {}", correlationID); - final Sinks.One processor = processors.remove(correlationID); - if (processor == null) { - throw new RelatedMessageNotFoundException(); - } else { - processor.tryEmitValue(response); - } - } - } - - public void clean(String correlationId) { - processors.remove(correlationId); - } -} diff --git a/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/domain/usecase/SampleUseCase.java b/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/domain/usecase/SampleUseCase.java index 2b4a136..e9a9a02 100644 --- a/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/domain/usecase/SampleUseCase.java +++ b/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/domain/usecase/SampleUseCase.java @@ -1,5 +1,6 @@ package co.com.bancolombia.jms.sample.domain.usecase; +import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter; import co.com.bancolombia.jms.sample.domain.model.Request; import co.com.bancolombia.jms.sample.domain.model.RequestGateway; import co.com.bancolombia.jms.sample.domain.model.Result; @@ -14,13 +15,13 @@ @AllArgsConstructor public class SampleUseCase { private final RequestGateway gateway; - private final ReactiveReplyRouterUseCase replier; + private final ReactiveReplyRouter replier; public Mono sendAndListen() { return gateway.send(Request.builder() - .id(UUID.randomUUID().toString()) - .createdAt(new Date().getTime()) - .build()) + .id(UUID.randomUUID().toString()) + .createdAt(new Date().getTime()) + .build()) .doOnSuccess(id -> log.info("Message sent: {}", id)) .flatMap(replier::wait); } diff --git a/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/entrypoints/MyMQListener.java b/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/entrypoints/MyMQListener.java index fc40cd2..72bc0cb 100644 --- a/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/entrypoints/MyMQListener.java +++ b/examples/mq-reactive/src/main/java/co/com/bancolombia/jms/sample/entrypoints/MyMQListener.java @@ -1,9 +1,9 @@ package co.com.bancolombia.jms.sample.entrypoints; import co.com.bancolombia.commons.jms.mq.MQListener; +import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter; import co.com.bancolombia.jms.sample.domain.model.Request; import co.com.bancolombia.jms.sample.domain.model.Result; -import co.com.bancolombia.jms.sample.domain.usecase.ReactiveReplyRouterUseCase; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.AllArgsConstructor; @@ -20,7 +20,7 @@ @Component @AllArgsConstructor public class MyMQListener { - private final ReactiveReplyRouterUseCase useCase; + private final ReactiveReplyRouter useCase; private final ObjectMapper mapper; @MQListener(maxRetries = "10") diff --git a/gradle.properties b/gradle.properties index f6fe150..76fc6d6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=0.1.0 +version=0.1.1 springBootVersion=2.5.8 gradleVersionsVersion=0.28.0 mqJMSVersion=2.5.0