Skip to content

Commit

Permalink
Add unit tests fix issues
Browse files Browse the repository at this point in the history
  • Loading branch information
juancgalvis committed Oct 27, 2023
1 parent 47aa8be commit d89bbe4
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

@Log4j2
public class MQAutoconfigurationSelectorListener {
public static int MAX_THREADS = 200;
public static long KEEP_ALIVE_SECONDS = 5L;
public static final int MAX_THREADS = 200;
public static final long KEEP_ALIVE_SECONDS = 5L;

@Bean
@ConditionalOnMissingBean(ExecutorService.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ void shouldHandleError() {
MQProperties properties = new MQProperties();
properties.setMaxRetries(10);
properties.setInitialRetryIntervalMillis(1000);
SelectorModeProvider provider = SelectorModeProvider.defaultSelector();
// Act
// Assert
assertThrows(MQInvalidListenerException.class,
() -> configurator.defaultMQMultiContextMessageSelectorListenerSync(null, config, healthListener,
properties, SelectorModeProvider.defaultSelector()));
properties, provider));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ protected String name() {

@Override
protected void disconnect() throws JMSException {
// do not disconnect to avoid another thread exceptions
}

@Override
protected MQContextMessageSelectorListenerSync connect() {
long handled = System.currentTimeMillis();
synchronized ("connectSelectorListener") {
synchronized (this) {
if (handled > lastSuccess.get()) {
log.info("Starting listener {}", getProcess());
JMSContext context = connectionFactory.createContext();
Expand Down Expand Up @@ -82,7 +83,7 @@ public Message getMessageBySelector(String selector, long timeout, Destination d
return getMessageBySelector(selector, timeout, destination, true);
}

public Message getMessageBySelector(String selector, long timeout, Destination destination, boolean retry) {
protected Message getMessageBySelector(String selector, long timeout, Destination destination, boolean retry) {
try {
return strategy.getMessageBySelector(selector, timeout, destination);
} catch (JMSRuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

@Log4j2
@AllArgsConstructor
Expand All @@ -22,84 +23,52 @@ public class MQMultiContextMessageSelectorListener implements MQMessageSelectorL
@Override
public Mono<Message> getMessage(String correlationId) {
return router.wait(correlationId)
.doOnSubscribe(s -> executorService.submit(() -> {
try {
Message message = listenerSync.getMessage(correlationId);
router.reply(correlationId, message);
} catch (Exception e) {
log.warn("Error getting message with correlationId: {}", correlationId, e);
router.error(correlationId, e);
}
}));
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId,
() -> listenerSync.getMessage(correlationId))));
}

@Override
public Mono<Message> getMessage(String correlationId, long timeout) {
return router.wait(correlationId, Duration.ofMillis(timeout))
.doOnSubscribe(s -> executorService.submit(() -> {
try {
Message message = listenerSync.getMessage(correlationId, timeout);
router.reply(correlationId, message);
} catch (Exception e) {
log.warn("Error getting message with correlationId: {}", correlationId, e);
router.error(correlationId, e);
}
}));
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId,
() -> listenerSync.getMessage(correlationId, timeout))));
}

@Override
public Mono<Message> getMessage(String correlationId, long timeout, Destination destination) {
return router.wait(correlationId, Duration.ofMillis(timeout))
.doOnSubscribe(s -> executorService.submit(() -> {
try {
Message message = listenerSync.getMessage(correlationId, timeout, destination);
router.reply(correlationId, message);
} catch (Exception e) {
log.warn("Error getting message with correlationId: {}", correlationId, e);
router.error(correlationId, e);
}
}));
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(correlationId,
() -> listenerSync.getMessage(correlationId, timeout, destination))));
}

@Override
public Mono<Message> getMessageBySelector(String selector) {
return router.wait(selector)
.doOnSubscribe(s -> executorService.submit(() -> {
try {
Message message = listenerSync.getMessageBySelector(selector);
router.reply(selector, message);
} catch (Exception e) {
log.warn("Error getting message with selector: {}", selector, e);
router.error(selector, e);
}
}));
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector,
() -> listenerSync.getMessageBySelector(selector))));
}

@Override
public Mono<Message> getMessageBySelector(String selector, long timeout) {
return router.wait(selector, Duration.ofMillis(timeout))
.doOnSubscribe(s -> executorService.submit(() -> {
try {
Message message = listenerSync.getMessageBySelector(selector, timeout);
router.reply(selector, message);
} catch (Exception e) {
log.warn("Error getting message with selector: {}", selector, e);
router.error(selector, e);
}
}));
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector,
() -> listenerSync.getMessageBySelector(selector, timeout))));
}

@Override
public Mono<Message> getMessageBySelector(String selector, long timeout, Destination destination) {
return router.wait(selector, Duration.ofMillis(timeout))
.doOnSubscribe(s -> executorService.submit(() -> {
try {
Message message = listenerSync.getMessageBySelector(selector, timeout, destination);
router.reply(selector, message);
} catch (Exception e) {
log.warn("Error getting message with selector: {}", selector, e);
router.error(selector, e);
}
}));
.doOnSubscribe(s -> executorService.submit(() -> realGetMessageBySelector(selector,
() -> listenerSync.getMessageBySelector(selector, timeout, destination))));
}

private void realGetMessageBySelector(String selector, Supplier<Message> supplier) {
try {
Message message = supplier.get();
router.reply(selector, message);
} catch (Exception e) {
log.warn("Error getting message with selector: {}", selector, e);
router.error(selector, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Message getMessageBySelector(String selector, long timeout, Destination d
return getRandom().getMessageBySelector(selector, timeout, destination);
}

private MQMessageSelectorListenerSync getRandom() {
protected MQMessageSelectorListenerSync getRandom() {
int selectIndex = (int) (System.currentTimeMillis() % config.getConcurrency());
return adapterList.get(selectIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class MQListenerConfig {
@Builder.Default
private final int maxRetries = -1; //NOSONAR
@Builder.Default
private final MQQueueManagerSetter qmSetter = (ctx, queue) -> {
private final MQQueueManagerSetter qmSetter = (ctx, queueName) -> {
}; //NOSONAR

@Builder.Default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@
import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.MessageListener;
import jakarta.jms.Queue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -62,4 +62,17 @@ void shouldStartListener() {
// Assert
verify(consumer, times(1)).setMessageListener(listener);
}

@Test
void shouldDisconnectAndConnectOnException() throws JMSException {
// Arrange
when(connectionFactory.createContext()).thenReturn(context);
when(context.createQueue(anyString())).thenReturn(queue);
when(context.createConsumer(queue)).thenReturn(consumer);
contextListener.call();
// Act
contextListener.disconnect();
// Assert
verify(consumer, times(1)).close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package co.com.bancolombia.commons.jms.internal.listener;

import co.com.bancolombia.commons.jms.api.exceptions.MQHealthListener;
import co.com.bancolombia.commons.jms.internal.models.MQListenerConfig;
import co.com.bancolombia.commons.jms.utils.MQQueuesContainerImp;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import jakarta.jms.JMSRuntimeException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import jakarta.jms.TemporaryQueue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.concurrent.Executors;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class MQMultiConnectionListenerTest {
@Mock
private Session session;
@Mock
private MessageListener listener;
@Mock
private Connection connection;

@Mock
private ConnectionFactory connectionFactory;

@Mock
private MQHealthListener healthListener;

@Mock
private TemporaryQueue tmpQueue;

private MQMultiConnectionListener connectionListener;

@BeforeEach
void setup() throws JMSException {
when(connectionFactory.createConnection()).thenReturn(connection);
when(connection.createSession()).thenReturn(session);
when(session.createTemporaryQueue()).thenReturn(tmpQueue);
connectionListener = MQMultiConnectionListener.builder()
.listener(listener)
.connection(connection)
.connectionFactory(connectionFactory)
.container(new MQQueuesContainerImp())
.healthListener(healthListener)
.config(MQListenerConfig.builder().build())
.service(Executors.newCachedThreadPool())
.build();
}

@Test
void shouldStartListener() throws JMSException {
// Arrange
// Act
connectionListener.call();
// Assert
verify(connection, times(1)).start();
}

@Test
void shouldDisconnect() throws JMSException {
// Arrange
connectionListener.call();
// Act
connectionListener.disconnect();
// Assert
verify(connection, times(1)).close();
}


}
Loading

0 comments on commit d89bbe4

Please sign in to comment.