Skip to content

Commit

Permalink
include reactive reply router as utils class, remove issue (#9)
Browse files Browse the repository at this point in the history
* include reactive reply router as utils class, remove issue

* add parametrized class
  • Loading branch information
juancgalvis authored Jan 14, 2022
1 parent eed6f54 commit a42ae27
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package co.com.bancolombia.commons.jms.exceptions;

import javax.jms.JMSRuntimeException;

public class RelatedMessageNotFoundException extends JMSRuntimeException {
public RelatedMessageNotFoundException(String correlationId) {
super("Processor not found for correlationId: " + correlationId);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package co.com.bancolombia.commons.jms.security;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;

import javax.net.ssl.CertPathTrustManagerParameters;
Expand All @@ -16,6 +18,7 @@
import java.security.cert.X509CertSelector;
import java.util.EnumSet;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class SSLContextUtils {
public static final String TLS = "TLSv1.3";
public static final String JKS = "JKS";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package co.com.bancolombia.commons.jms.utils;

import co.com.bancolombia.commons.jms.exceptions.RelatedMessageNotFoundException;
import lombok.extern.log4j.Log4j2;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

@Log4j2
public class ReactiveReplyRouter<T> {
private final ConcurrentHashMap<String, Sinks.One<T>> processors = new ConcurrentHashMap<>();

public Mono<T> wait(String messageId) {
final Sinks.One<T> processor = Sinks.one();
processors.put(messageId, processor);
log.info("Waiting for: {}", messageId);
return processor.asMono();
}

public Mono<T> wait(String messageId, Duration timeout) {
return this.wait(messageId).timeout(timeout).doOnError(TimeoutException.class, e -> clean(messageId));
}

public void reply(String correlationID, T response) {
if (correlationID != null) {
log.info("Replying with id: {}", correlationID);
final Sinks.One<T> processor = processors.remove(correlationID);
if (processor == null) {
throw new RelatedMessageNotFoundException(correlationID);
} else {
processor.tryEmitValue(response);
}
}
}

public void clean(String correlationId) {
processors.remove(correlationId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package co.com.bancolombia.commons.jms.utils;

import co.com.bancolombia.commons.jms.exceptions.RelatedMessageNotFoundException;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertThrows;

class ReactiveReplyRouterTest {
private final ReactiveReplyRouter<String> router = new ReactiveReplyRouter<>();

@Test
void shouldReply() {
Mono<String> flow = router.wait("123");
router.reply("123", "result");
StepVerifier.create(flow).expectNext("result").verifyComplete();
}

@Test
void shouldFailNotFound() {
assertThrows(RelatedMessageNotFoundException.class, () -> router.reply("1234", "result"));
}

@Test
void shouldHandleTimeout() {
Mono<String> flow = router.wait("1234", Duration.ZERO);
router.reply(null, null);
StepVerifier.create(flow).expectError(TimeoutException.class).verify();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package co.com.bancolombia.jms.sample.app.config;

import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import co.com.bancolombia.jms.sample.domain.model.Result;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ibm.mq.spring.boot.MQConnectionFactoryCustomizer;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -29,4 +31,9 @@ public MQConnectionFactoryCustomizer cfCustomizer() {
}
};
}

@Bean
public ReactiveReplyRouter<Result> resultReactiveReplyRouter() {
return new ReactiveReplyRouter<>();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package co.com.bancolombia.jms.sample.domain.usecase;

import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import co.com.bancolombia.jms.sample.domain.model.Request;
import co.com.bancolombia.jms.sample.domain.model.RequestGateway;
import co.com.bancolombia.jms.sample.domain.model.Result;
Expand All @@ -14,13 +15,13 @@
@AllArgsConstructor
public class SampleUseCase {
private final RequestGateway gateway;
private final ReactiveReplyRouterUseCase replier;
private final ReactiveReplyRouter<Result> replier;

public Mono<Result> sendAndListen() {
return gateway.send(Request.builder()
.id(UUID.randomUUID().toString())
.createdAt(new Date().getTime())
.build())
.id(UUID.randomUUID().toString())
.createdAt(new Date().getTime())
.build())
.doOnSuccess(id -> log.info("Message sent: {}", id))
.flatMap(replier::wait);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package co.com.bancolombia.jms.sample.entrypoints;

import co.com.bancolombia.commons.jms.mq.MQListener;
import co.com.bancolombia.commons.jms.utils.ReactiveReplyRouter;
import co.com.bancolombia.jms.sample.domain.model.Request;
import co.com.bancolombia.jms.sample.domain.model.Result;
import co.com.bancolombia.jms.sample.domain.usecase.ReactiveReplyRouterUseCase;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
Expand All @@ -20,7 +20,7 @@
@Component
@AllArgsConstructor
public class MyMQListener {
private final ReactiveReplyRouterUseCase useCase;
private final ReactiveReplyRouter<Result> useCase;
private final ObjectMapper mapper;

@MQListener(maxRetries = "10")
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=0.1.0
version=0.1.1
springBootVersion=2.5.8
gradleVersionsVersion=0.28.0
mqJMSVersion=2.5.0
Expand Down

0 comments on commit a42ae27

Please sign in to comment.