> META = Stream.of(new Object[][] {
+ {SolaceBinderHeaders.PARTITION_KEY, new SolaceBinderHeaderMeta<>(String.class, false, true, Scope.WIRE)},
{SolaceBinderHeaders.MESSAGE_VERSION, new SolaceBinderHeaderMeta<>(Integer.class, true, false, Scope.WIRE)},
{SolaceBinderHeaders.SERIALIZED_PAYLOAD, new SolaceBinderHeaderMeta<>(Boolean.class, false, false, Scope.WIRE)},
{SolaceBinderHeaders.SERIALIZED_HEADERS, new SolaceBinderHeaderMeta<>(String.class, false, false, Scope.WIRE)},
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java
index d0d07782..dcabbf44 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/messaging/SolaceBinderHeaders.java
@@ -24,6 +24,14 @@ public final class SolaceBinderHeaders {
*/
static final String PREFIX = SolaceHeaders.PREFIX + "scst_";
+ /**
+ * Acceptable Value Type: {@link String}
+ * Access: Write
+ *
+ * The partition key for PubSub+ partitioned queues.
+ */
+ public static final String PARTITION_KEY = PREFIX + "partitionKey";
+
/**
* Acceptable Value Type: {@link Integer}
* Access: Read
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/meter/SolaceMeterAccessor.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/meter/SolaceMeterAccessor.java
index b2f5ba8e..29a74159 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/meter/SolaceMeterAccessor.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/meter/SolaceMeterAccessor.java
@@ -2,6 +2,11 @@
import com.solacesystems.jcsmp.XMLMessage;
+/**
+ * Proxy class for the Solace binder to access meter components.
+ * Always use this instead of directly using meter components in Solace binder code.
+ * Allows for the Solace binder to still function correctly without micrometer on the classpath.
+ */
public class SolaceMeterAccessor {
private final SolaceMessageMeterBinder solaceMessageMeterBinder;
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceSessionHealthProperties.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceSessionHealthProperties.java
new file mode 100644
index 00000000..137c32ad
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/properties/SolaceSessionHealthProperties.java
@@ -0,0 +1,23 @@
+package com.solace.spring.cloud.stream.binder.properties;
+
+import jakarta.validation.constraints.Min;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+@Validated
+@ConfigurationProperties("solace.health-check.connection")
+public class SolaceSessionHealthProperties {
+ /**
+ * The number of session reconnect attempts until the health goes {@code DOWN}. This will happen regardless if
+ * the underlying session is actually still reconnecting. Setting this to {@code 0} will disable this feature.
+ * This feature operates independently of the PubSub+ session reconnect feature. Meaning that if PubSub+
+ * session reconnect is configured to retry less than the value given to this property, then this feature
+ * effectively does nothing.
+ */
+ @Min(0)
+ @Getter
+ @Setter
+ private long reconnectAttemptsUntilDown = 0;
+}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/FlowReceiverContainer.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/FlowReceiverContainer.java
index c5799860..1e87118a 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/FlowReceiverContainer.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/FlowReceiverContainer.java
@@ -1,9 +1,11 @@
package com.solace.spring.cloud.stream.binder.util;
+import com.solace.spring.cloud.stream.binder.health.handlers.SolaceFlowHealthEventHandler;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.EndpointProperties;
+import com.solacesystems.jcsmp.FlowEventHandler;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
@@ -69,7 +71,7 @@ public class FlowReceiverContainer {
private static final Log logger = LogFactory.getLog(FlowReceiverContainer.class);
private final XMLMessageMapper xmlMessageMapper = new XMLMessageMapper();
- private final SolaceFlowEventHandler eventHandler;
+ private FlowEventHandler eventHandler;
public FlowReceiverContainer(JCSMPSession session,
String queueName,
@@ -108,6 +110,9 @@ public UUID bind() throws JCSMPException {
.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT)
.setStartState(!isPaused.get());
FlowReceiver flowReceiver = session.createFlow(null, flowProperties, endpointProperties, eventHandler);
+ if (eventHandler != null && eventHandler instanceof SolaceFlowHealthEventHandler) {
+ ((SolaceFlowHealthEventHandler) eventHandler).setHealthStatusUp();
+ }
FlowReceiverReference newFlowReceiverReference = new FlowReceiverReference(flowReceiver);
flowReceiverAtomicReference.set(newFlowReceiverReference);
xmlMessageMapper.resetIgnoredProperties(id.toString());
@@ -685,6 +690,10 @@ public XMLMessageMapper getXMLMessageMapper() {
return xmlMessageMapper;
}
+ public void setEventHandler(FlowEventHandler eventHandler) {
+ this.eventHandler = eventHandler;
+ }
+
static class FlowReceiverReference {
private final UUID id = UUID.randomUUID();
private final FlowReceiver flowReceiver;
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceFlowEventHandler.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceFlowEventHandler.java
index 79e67a4f..86e4f664 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceFlowEventHandler.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceFlowEventHandler.java
@@ -8,24 +8,28 @@
public class SolaceFlowEventHandler implements FlowEventHandler {
- private static final Log logger = LogFactory.getLog(SolaceFlowEventHandler.class);
- private final XMLMessageMapper xmlMessageMapper;
- private final String flowReceiverContainerId;
+ private static final Log logger = LogFactory.getLog(SolaceFlowEventHandler.class);
+ private final XMLMessageMapper xmlMessageMapper;
+ private final String flowReceiverContainerId;
- public SolaceFlowEventHandler(XMLMessageMapper xmlMessageMapper, String flowReceiverContainerId) {
- this.xmlMessageMapper = xmlMessageMapper;
- this.flowReceiverContainerId = flowReceiverContainerId;
- }
+ public SolaceFlowEventHandler(XMLMessageMapper xmlMessageMapper, String flowReceiverContainerId) {
+ this.xmlMessageMapper = xmlMessageMapper;
+ this.flowReceiverContainerId = flowReceiverContainerId;
+ }
- @Override
- public void handleEvent(Object o, FlowEventArgs flowEventArgs) {
- if (flowEventArgs.getEvent() == FlowEvent.FLOW_RECONNECTED && xmlMessageMapper != null) {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Received flow event %s for flow receiver container %s. Will clear ignored properties.",
- flowEventArgs.getEvent().name(), flowReceiverContainerId));
- }
- xmlMessageMapper.resetIgnoredProperties(flowReceiverContainerId);
- }
- }
+ @Override
+ public void handleEvent(Object source, FlowEventArgs flowEventArgs) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("(%s): Received Solace Flow event [%s].", source, flowEventArgs));
+ }
-}
\ No newline at end of file
+ if (flowEventArgs.getEvent() == FlowEvent.FLOW_RECONNECTED && xmlMessageMapper != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(String.format("Received flow event %s for flow receiver container %s. Will clear ignored properties.",
+ flowEventArgs.getEvent().name(), flowReceiverContainerId));
+ }
+ xmlMessageMapper.resetIgnoredProperties(flowReceiverContainerId);
+ }
+ }
+
+}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapper.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapper.java
index 0d5d107f..469aea4e 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapper.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/main/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapper.java
@@ -7,7 +7,6 @@
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaderMeta;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders;
import com.solace.spring.cloud.stream.binder.messaging.SolaceHeaderMeta;
-import com.solace.spring.cloud.stream.binder.messaging.SolaceHeaders;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solacesystems.common.util.ByteArray;
import com.solacesystems.jcsmp.BytesMessage;
@@ -29,7 +28,6 @@
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
-import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -50,10 +48,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
public class XMLMessageMapper {
private static final Log logger = LogFactory.getLog(XMLMessageMapper.class);
@@ -308,6 +305,12 @@ SDTMap map(MessageHeaders headers, Collection excludedHeaders, boolean c
convertNonSerializableHeadersToString);
}
+ if (headers.containsKey(SolaceBinderHeaders.PARTITION_KEY)) {
+ rethrowableCall(metadata::putString, XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY,
+ this., String>rethrowableCall(headers::get,
+ SolaceBinderHeaders.PARTITION_KEY, String.class));
+ }
+
if (!serializedHeaders.isEmpty()) {
rethrowableCall(metadata::putString, SolaceBinderHeaders.SERIALIZED_HEADERS,
rethrowableCall(stringSetWriter::writeValueAsString, serializedHeaders));
@@ -406,6 +409,10 @@ private R rethrowableCall(ThrowingFunction function, T var) {
return function.apply(var);
}
+ private R rethrowableCall(ThrowingBiFunction function, T var0, U var1) {
+ return function.apply(var0, var1);
+ }
+
private void rethrowableCall(ThrowingBiConsumer consumer, T var0, U var1) {
consumer.accept(var0, var1);
}
@@ -441,6 +448,23 @@ default R apply(T t) {
R applyThrows(T t) throws Exception;
}
+ @FunctionalInterface
+ private interface ThrowingBiFunction extends BiFunction {
+
+ @Override
+ default R apply(T t, U u) {
+ try {
+ return applyThrows(t, u);
+ } catch (Exception e) {
+ SolaceMessageConversionException wrappedException = new SolaceMessageConversionException(e);
+ logger.warn(wrappedException);
+ throw wrappedException;
+ }
+ }
+
+ R applyThrows(T t, U u) throws Exception;
+ }
+
@FunctionalInterface
private interface ThrowingBiConsumer extends BiConsumer {
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/SolaceBinderHealthAccessorTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/SolaceBinderHealthAccessorTest.java
new file mode 100644
index 00000000..842299e5
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/SolaceBinderHealthAccessorTest.java
@@ -0,0 +1,103 @@
+package com.solace.spring.cloud.stream.binder.health;
+
+import com.solace.spring.cloud.stream.binder.health.contributors.BindingHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.BindingsHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.FlowsHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.SolaceBinderHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.indicators.FlowHealthIndicator;
+import com.solace.spring.cloud.stream.binder.health.indicators.SessionHealthIndicator;
+import com.solace.spring.cloud.stream.binder.util.FlowReceiverContainer;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.boot.actuate.health.NamedContributor;
+
+import java.util.UUID;
+import java.util.stream.StreamSupport;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@ExtendWith(MockitoExtension.class)
+public class SolaceBinderHealthAccessorTest {
+ @CartesianTest(name = "[{index}] bindingHealthContributorExists={0} flowHealthExists={1}")
+ public void testAddFlow(@CartesianTest.Values(booleans = {false, true}) boolean bindingHealthContributorExists,
+ @CartesianTest.Values(booleans = {false, true}) boolean flowHealthExists,
+ @Mock FlowReceiverContainer flowReceiverContainer) {
+ Mockito.when(flowReceiverContainer.getId()).thenReturn(UUID.randomUUID());
+ SolaceBinderHealthContributor healthContributor = new SolaceBinderHealthContributor(
+ new SessionHealthIndicator(),
+ new BindingsHealthContributor());
+ SolaceBinderHealthAccessor healthAccessor = new SolaceBinderHealthAccessor(healthContributor);
+
+ String bindingName = "binding-name";
+ int concurrencyIdx = 55;
+
+ if (bindingHealthContributorExists) {
+ FlowsHealthContributor flowsHealthContributor = new FlowsHealthContributor();
+ if (flowHealthExists) {
+ flowsHealthContributor.addFlowContributor("flow-" + concurrencyIdx, new FlowHealthIndicator());
+ }
+ healthContributor.getSolaceBindingsHealthContributor()
+ .addBindingContributor(bindingName, new BindingHealthContributor(flowsHealthContributor));
+ }
+
+ healthAccessor.addFlow(bindingName, concurrencyIdx, flowReceiverContainer);
+
+ assertThat(StreamSupport.stream(healthContributor.getSolaceBindingsHealthContributor().spliterator(), false))
+ .singleElement()
+ .satisfies(n -> assertThat(n.getName()).isEqualTo(bindingName))
+ .extracting(NamedContributor::getContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingHealthContributor.class))
+ .extracting(BindingHealthContributor::getFlowsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(FlowsHealthContributor.class))
+ .extracting(c -> StreamSupport.stream(c.spliterator(), false))
+ .asInstanceOf(InstanceOfAssertFactories.stream(NamedContributor.class))
+ .singleElement()
+ .satisfies(n -> assertThat(n.getName()).isEqualTo("flow-" + concurrencyIdx))
+ .extracting(NamedContributor::getContributor)
+ .isInstanceOf(FlowHealthIndicator.class);
+ }
+
+ @CartesianTest(name = "[{index}] bindingHealthContributorExists={0} flowHealthExists={1}")
+ public void testRemoveFlow(
+ @CartesianTest.Values(booleans = {false, true}) boolean bindingHealthContributorExists,
+ @CartesianTest.Values(booleans = {false, true}) boolean flowHealthExists) {
+ SolaceBinderHealthContributor healthContributor = new SolaceBinderHealthContributor(
+ new SessionHealthIndicator(),
+ new BindingsHealthContributor());
+ SolaceBinderHealthAccessor healthAccessor = new SolaceBinderHealthAccessor(healthContributor);
+
+ String bindingName = "binding-name";
+ int concurrencyIdx = 55;
+
+ if (bindingHealthContributorExists) {
+ FlowsHealthContributor flowsHealthContributor = new FlowsHealthContributor();
+ if (flowHealthExists) {
+ flowsHealthContributor.addFlowContributor("flow-" + concurrencyIdx, new FlowHealthIndicator());
+ }
+ healthContributor.getSolaceBindingsHealthContributor()
+ .addBindingContributor(bindingName, new BindingHealthContributor(flowsHealthContributor));
+ }
+
+ healthAccessor.removeFlow(bindingName, concurrencyIdx);
+
+ if (bindingHealthContributorExists) {
+ assertThat(StreamSupport.stream(healthContributor.getSolaceBindingsHealthContributor().spliterator(), false))
+ .singleElement()
+ .satisfies(n -> assertThat(n.getName()).isEqualTo(bindingName))
+ .extracting(NamedContributor::getContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingHealthContributor.class))
+ .extracting(BindingHealthContributor::getFlowsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(FlowsHealthContributor.class))
+ .extracting(c -> StreamSupport.stream(c.spliterator(), false))
+ .asInstanceOf(InstanceOfAssertFactories.stream(NamedContributor.class))
+ .isEmpty();
+ } else {
+ assertThat(StreamSupport.stream(healthContributor.getSolaceBindingsHealthContributor().spliterator(), false))
+ .isEmpty();
+ }
+ }
+}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/base/SolaceHealthIndicatorTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/base/SolaceHealthIndicatorTest.java
new file mode 100644
index 00000000..57c12b46
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/base/SolaceHealthIndicatorTest.java
@@ -0,0 +1,62 @@
+package com.solace.spring.cloud.stream.binder.health.base;
+
+import com.solacesystems.jcsmp.FlowEvent;
+import com.solacesystems.jcsmp.FlowEventArgs;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SolaceHealthIndicatorTest {
+
+ private SolaceHealthIndicator solaceHealthIndicator;
+
+ @BeforeEach
+ void setUp() {
+ this.solaceHealthIndicator = new SolaceHealthIndicator();
+ }
+
+ @Test
+ void healthUp() {
+ this.solaceHealthIndicator.healthUp();
+ assertEquals(this.solaceHealthIndicator.health(), Health.up().build());
+ }
+
+ @Test
+ void healthReconnecting() {
+ this.solaceHealthIndicator.healthReconnecting(null);
+ assertEquals(this.solaceHealthIndicator.health(), Health.status("RECONNECTING").build());
+ }
+
+ @Test
+ void healthDown() {
+ this.solaceHealthIndicator.healthDown(null);
+ assertEquals(this.solaceHealthIndicator.health(), Health.down().build());
+ }
+
+ @Test
+ void addFlowEventDetails() {
+ // as SessionEventArgs constructor has package level access modifier, we have to test with FlowEventArgs only
+ FlowEventArgs flowEventArgs = new FlowEventArgs(FlowEvent.FLOW_DOWN, "String_infoStr",
+ new Exception("Test Exception"), 500);
+ Health health = this.solaceHealthIndicator.addEventDetails(Health.down(),flowEventArgs).build();
+
+ assertEquals(health.getStatus(), Status.DOWN);
+ assertEquals(health.getDetails().get("error"), "java.lang.Exception: Test Exception");
+ assertEquals(health.getDetails().get("responseCode"), 500);
+ }
+
+ @Test
+ void getHealth() {
+ this.solaceHealthIndicator.setHealth(Health.up().build());
+ assertEquals(this.solaceHealthIndicator.health(), Health.up().build());
+ }
+
+ @Test
+ void setHealth() {
+ this.solaceHealthIndicator.setHealth(Health.down().build());
+ assertEquals(this.solaceHealthIndicator.health(), Health.down().build());
+ }
+}
\ No newline at end of file
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/FlowHealthIndicatorTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/FlowHealthIndicatorTest.java
new file mode 100644
index 00000000..4281ccba
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/FlowHealthIndicatorTest.java
@@ -0,0 +1,122 @@
+package com.solace.spring.cloud.stream.binder.health.indicators;
+
+import com.solacesystems.jcsmp.FlowEventArgs;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class FlowHealthIndicatorTest {
+
+ @Test
+ public void testInitialHealth() {
+ assertThat(new FlowHealthIndicator().health()).isNull();
+ }
+
+ @Test
+ public void testUp(SoftAssertions softly) {
+ FlowHealthIndicator healthIndicator = new FlowHealthIndicator();
+ healthIndicator.up();
+ Health health = healthIndicator.health();
+ softly.assertThat(health.getStatus()).isEqualTo(Status.UP);
+ softly.assertThat(health.getDetails()).isEmpty();
+ }
+
+ @Test
+ public void testDown(SoftAssertions softly) {
+ FlowHealthIndicator healthIndicator = new FlowHealthIndicator();
+ healthIndicator.down(null);
+ Health health = healthIndicator.health();
+ softly.assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ softly.assertThat(health.getDetails()).isEmpty();
+ }
+
+ @Test
+ public void testReconnecting(SoftAssertions softly) {
+ FlowHealthIndicator healthIndicator = new FlowHealthIndicator();
+ healthIndicator.reconnecting(null);
+ Health health = healthIndicator.health();
+ softly.assertThat(health.getStatus()).isEqualTo(new Status("RECONNECTING"));
+ softly.assertThat(health.getDetails()).isEmpty();
+ }
+
+ @CartesianTest(name = "[{index}] status={0} responseCode={1} info={2}")
+ public void testDetails(@CartesianTest.Values(strings = {"DOWN", "RECONNECTING", "UP"}) String status,
+ @CartesianTest.Values(booleans = {false, true}) boolean withException,
+ @CartesianTest.Values(ints = {-1, 0, 1}) int responseCode,
+ @CartesianTest.Values(strings = {"", "some-info"}) String info,
+ SoftAssertions softly) {
+ FlowHealthIndicator healthIndicator = new FlowHealthIndicator();
+ Exception healthException = withException ? new Exception("test") : null;
+ FlowEventArgs flowEventArgs = new FlowEventArgs(null, info, healthException, responseCode);
+ switch (status) {
+ case "DOWN":
+ healthIndicator.down(flowEventArgs);
+ break;
+ case "RECONNECTING":
+ healthIndicator.reconnecting(flowEventArgs);
+ break;
+ case "UP":
+ healthIndicator.up();
+ break;
+ default:
+ throw new IllegalArgumentException("Test error: No handling for status=" + status);
+ }
+ Health health = healthIndicator.health();
+
+ softly.assertThat(health.getStatus()).isEqualTo(new Status(status));
+
+ if (withException && !status.equals("UP")) {
+ softly.assertThat(health.getDetails())
+ .isNotEmpty()
+ .extractingByKey("error")
+ .isEqualTo(healthException.getClass().getName() + ": " + healthException.getMessage());
+ } else {
+ softly.assertThat(health.getDetails()).doesNotContainKey("error");
+ }
+
+ if (responseCode != 0 && !status.equals("UP")) {
+ softly.assertThat(health.getDetails())
+ .extractingByKey("responseCode")
+ .isEqualTo(responseCode);
+ } else {
+ softly.assertThat(health.getDetails()).doesNotContainKey("responseCode");
+ }
+
+ if (!info.isEmpty() && !status.equals("UP")) {
+ softly.assertThat(health.getDetails())
+ .extractingByKey("info")
+ .isEqualTo(info);
+ } else {
+ softly.assertThat(health.getDetails()).doesNotContainKey("info");
+ }
+ }
+
+ @ParameterizedTest(name = "[{index}] status={0}")
+ @ValueSource(strings = {"DOWN", "RECONNECTING", "UP"})
+ public void testWithoutDetails(String status, SoftAssertions softly) {
+ FlowHealthIndicator healthIndicator = new FlowHealthIndicator();
+ FlowEventArgs flowEventArgs = new FlowEventArgs(null, "some-info", new RuntimeException("test"), 1);
+ switch (status) {
+ case "DOWN":
+ healthIndicator.down(flowEventArgs);
+ break;
+ case "RECONNECTING":
+ healthIndicator.reconnecting(flowEventArgs);
+ break;
+ case "UP":
+ healthIndicator.up();
+ break;
+ default:
+ throw new IllegalArgumentException("Test error: No handling for status=" + status);
+ }
+ Health health = healthIndicator.getHealth(false);
+ softly.assertThat(health.getStatus()).isEqualTo(new Status(status));
+ softly.assertThat(health.getDetails()).isEmpty();
+ }
+}
\ No newline at end of file
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/SessionHealthIndicatorTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/SessionHealthIndicatorTest.java
new file mode 100644
index 00000000..525d364e
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/SessionHealthIndicatorTest.java
@@ -0,0 +1,171 @@
+package com.solace.spring.cloud.stream.binder.health.indicators;
+
+import com.solace.spring.cloud.stream.binder.properties.SolaceSessionHealthProperties;
+import com.solacesystems.jcsmp.SessionEventArgs;
+import com.solacesystems.jcsmp.impl.SessionEventArgsImpl;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SessionHealthIndicatorTest {
+ @Test
+ public void testInitialHealth() {
+ assertNull(new SessionHealthIndicator(new SolaceSessionHealthProperties()).health());
+ }
+
+ @Test
+ void testUp() {
+ SessionHealthIndicator healthIndicator = new SessionHealthIndicator(new SolaceSessionHealthProperties());
+ healthIndicator.up();
+ assertEquals(healthIndicator.health(), Health.up().build());
+ assertTrue(healthIndicator.getHealth(true).getDetails().isEmpty());
+ }
+
+ @Test
+ void testReconnecting() {
+ SessionHealthIndicator healthIndicator = new SessionHealthIndicator(new SolaceSessionHealthProperties());
+ healthIndicator.reconnecting(null);
+ assertEquals(healthIndicator.health().getStatus(), Health.status("RECONNECTING").build().getStatus());
+ assertTrue(healthIndicator.getHealth(true).getDetails().isEmpty());
+ }
+
+ @ParameterizedTest(name = "[{index}] reconnectAttemptsUntilDown={0}")
+ @ValueSource(longs = {1, 10})
+ void testReconnectingDownThresholdReached(long reconnectAttemptsUntilDown, SoftAssertions softly) {
+ SolaceSessionHealthProperties properties = new SolaceSessionHealthProperties();
+ properties.setReconnectAttemptsUntilDown(reconnectAttemptsUntilDown);
+ SessionHealthIndicator healthIndicator = new SessionHealthIndicator(properties);
+ for (int i = 0; i < reconnectAttemptsUntilDown; i++) {
+ healthIndicator.reconnecting(null);
+ softly.assertThat(healthIndicator.health()).satisfies(
+ h -> assertThat(h.getStatus()).isEqualTo(new Status("RECONNECTING")),
+ h -> assertThat(h.getDetails()).isEmpty());
+ }
+
+ for (int i = 0; i < 3; i++) {
+ healthIndicator.reconnecting(null);
+ softly.assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN);
+ }
+ }
+
+ @ParameterizedTest(name = "[{index}] resetStatus={0}")
+ @ValueSource(strings = {"DOWN", "UP"})
+ public void testReconnectingDownThresholdReset(String resetStatus, SoftAssertions softly) {
+ SolaceSessionHealthProperties properties = new SolaceSessionHealthProperties();
+ properties.setReconnectAttemptsUntilDown(1L);
+ SessionHealthIndicator healthIndicator = new SessionHealthIndicator(properties);
+
+ healthIndicator.reconnecting(null);
+ softly.assertThat(healthIndicator.health().getStatus()).isEqualTo(new Status("RECONNECTING"));
+ healthIndicator.reconnecting(null);
+ softly.assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN);
+
+ switch (resetStatus) {
+ case "DOWN":
+ healthIndicator.down(null);
+ break;
+ case "UP":
+ healthIndicator.up();
+ break;
+ default:
+ throw new IllegalArgumentException("Test error: No handling for status=" + resetStatus);
+ }
+
+ healthIndicator.reconnecting(null);
+ softly.assertThat(healthIndicator.health().getStatus()).isEqualTo(new Status("RECONNECTING"));
+ healthIndicator.reconnecting(null);
+ softly.assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN);
+ }
+
+ @Test
+ void testDown() {
+ SessionHealthIndicator healthIndicator = new SessionHealthIndicator(new SolaceSessionHealthProperties());
+ healthIndicator.down(null);
+ assertEquals(healthIndicator.health().getStatus(), Status.DOWN);
+ assertTrue(healthIndicator.getHealth(true).getDetails().isEmpty());
+ }
+
+ @CartesianTest(name = "[{index}] status={0} withException={1} responseCode={2} info={3}")
+ public void testDetails(@CartesianTest.Values(strings = {"DOWN", "RECONNECTING", "UP"}) String status,
+ @CartesianTest.Values(booleans = {false, true}) boolean withException,
+ @CartesianTest.Values(ints = {-1, 0, 1}) int responseCode,
+ @CartesianTest.Values(strings = {"", "some-info"}) String info,
+ SoftAssertions softly) {
+ SessionHealthIndicator healthIndicator = new SessionHealthIndicator(new SolaceSessionHealthProperties());
+ Exception healthException = withException ? new Exception("test") : null;
+ SessionEventArgs sessionEventArgs = new SessionEventArgsImpl(null, info, healthException, responseCode);
+ switch (status) {
+ case "DOWN":
+ healthIndicator.down(sessionEventArgs);
+ break;
+ case "RECONNECTING":
+ healthIndicator.reconnecting(sessionEventArgs);
+ break;
+ case "UP":
+ healthIndicator.up();
+ break;
+ default:
+ throw new IllegalArgumentException("Test error: No handling for status=" + status);
+ }
+ Health health = healthIndicator.health();
+
+ softly.assertThat(health.getStatus()).isEqualTo(new Status(status));
+
+ if (withException && !status.equals("UP")) {
+ softly.assertThat(health.getDetails())
+ .isNotEmpty()
+ .extractingByKey("error")
+ .isEqualTo(healthException.getClass().getName() + ": " + healthException.getMessage());
+ } else {
+ softly.assertThat(health.getDetails()).doesNotContainKey("error");
+ }
+
+ if (responseCode != 0 && !status.equals("UP")) {
+ softly.assertThat(health.getDetails())
+ .extractingByKey("responseCode")
+ .isEqualTo(responseCode);
+ } else {
+ softly.assertThat(health.getDetails()).doesNotContainKey("responseCode");
+ }
+
+ if (!info.isEmpty() && !status.equals("UP")) {
+ softly.assertThat(health.getDetails())
+ .extractingByKey("info")
+ .isEqualTo(info);
+ } else {
+ softly.assertThat(health.getDetails()).doesNotContainKey("info");
+ }
+ }
+
+ @ParameterizedTest(name = "[{index}] status={0}")
+ @ValueSource(strings = {"DOWN", "RECONNECTING", "UP"})
+ public void testWithoutDetails(String status, SoftAssertions softly) {
+ SessionHealthIndicator healthIndicator = new SessionHealthIndicator(new SolaceSessionHealthProperties());
+ SessionEventArgs sessionEventArgs = new SessionEventArgsImpl(null, "some-info", new RuntimeException("test"), 1);
+ switch (status) {
+ case "DOWN":
+ healthIndicator.down(sessionEventArgs);
+ break;
+ case "RECONNECTING":
+ healthIndicator.reconnecting(sessionEventArgs);
+ break;
+ case "UP":
+ healthIndicator.up();
+ break;
+ default:
+ throw new IllegalArgumentException("Test error: No handling for status=" + status);
+ }
+ Health health = healthIndicator.getHealth(false);
+ softly.assertThat(health.getStatus()).isEqualTo(new Status(status));
+ softly.assertThat(health.getDetails()).isEmpty();
+ }
+}
\ No newline at end of file
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java
index d0fc3ec1..ffe18e2e 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/JmsCompatibilityIT.java
@@ -169,6 +169,9 @@ public void testBinderHeaders(SoftAssertions softly) throws Exception {
for (Map.Entry> headerMeta : SolaceBinderHeaderMeta.META.entrySet()) {
if (!HeaderMeta.Scope.WIRE.equals(headerMeta.getValue().getScope())) continue;
String headerName = headerMeta.getKey();
+ if (headerName.equals(SolaceBinderHeaders.PARTITION_KEY)) {
+ headerName = XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY;
+ }
// Everything should be receivable as a String in JMS
softly.assertThat(msg.getStringProperty(headerName))
.withFailMessage("Expecting JMS property %s to not be null", headerName)
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java
index f4d55838..1a0044c5 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder-core/src/test/java/com/solace/spring/cloud/stream/binder/util/XMLMessageMapperTest.java
@@ -259,6 +259,12 @@ public void testMapSpringMessageToXMLMessage_WriteSolaceProperties() throws Exce
case SolaceHeaders.USER_DATA:
value = RandomStringUtils.randomAlphanumeric(10).getBytes();
break;
+ case SolaceBinderHeaders.PARTITION_KEY:
+ value = RandomStringUtils.randomAlphanumeric(10);
+ // This value is overwritten by binder-defined partition key header
+ messageBuilder.setHeader(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY,
+ RandomStringUtils.randomAlphanumeric(10));
+ break;
default:
value = null;
fail(String.format("no test for header %s", header.getKey()));
@@ -315,6 +321,10 @@ public void testMapSpringMessageToXMLMessage_WriteSolaceProperties() throws Exce
case SolaceHeaders.USER_DATA:
assertEquals(expectedValue, xmlMessage.getUserData());
break;
+ case SolaceBinderHeaders.PARTITION_KEY:
+ assertEquals(expectedValue, xmlMessage.getProperties()
+ .getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY));
+ break;
default:
fail(String.format("no test for header %s", header.getKey()));
}
@@ -366,7 +376,7 @@ public void testMapSpringMessageToXMLMessage_NonWriteableSolaceProperties() thro
assertFalse(xmlMessage.getRedelivered());
break;
case SolaceBinderHeaders.MESSAGE_VERSION:
- assertEquals(new Integer(XMLMessageMapper.MESSAGE_VERSION),
+ assertEquals(Integer.valueOf(XMLMessageMapper.MESSAGE_VERSION),
xmlMessage.getProperties().getInteger(header.getKey()));
break;
case SolaceBinderHeaders.SERIALIZED_HEADERS:
@@ -485,10 +495,21 @@ public void testFailMapSpringMessageToXMLMessage_InvalidHeaderType() {
xmlMessageMapper.map(testSpringMessage, null, false);
fail(String.format("Expected message mapping to fail for header %s", header.getKey()));
} catch (SolaceMessageConversionException e) {
- assertEquals(e.getMessage(), String.format(
- "Message %s has an invalid value type for header %s. Expected %s but received %s.",
- testSpringMessage.getHeaders().getId(), header.getKey(), header.getValue().getType(),
- Object.class));
+ if (header.getValue() instanceof SolaceHeaderMeta>) {
+ assertEquals(e.getMessage(), String.format(
+ "Message %s has an invalid value type for header %s. Expected %s but received %s.",
+ testSpringMessage.getHeaders().getId(), header.getKey(), header.getValue().getType(),
+ Object.class));
+ } else {
+ switch (header.getKey()) {
+ case SolaceBinderHeaders.PARTITION_KEY ->
+ Assertions.assertThat(e).rootCause()
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContainingAll(header.getKey(), header.getValue().getType().toString());
+ default ->
+ fail(String.format("no test for header %s", header.getKey()));
+ }
+ }
}
}
}
@@ -585,6 +606,7 @@ public void testMapProducerSpringMessageToXMLMessage_WithExcludedHeader_ShouldNo
case SolaceHeaders.CORRELATION_ID:
case SolaceHeaders.HTTP_CONTENT_ENCODING:
case SolaceHeaders.SENDER_ID:
+ case SolaceBinderHeaders.PARTITION_KEY:
value = RandomStringUtils.randomAlphanumeric(10);
break;
case SolaceHeaders.DMQ_ELIGIBLE:
@@ -668,16 +690,22 @@ public void testMapProducerSpringMessageToXMLMessage_WithExcludedHeader_ShouldNo
case SolaceHeaders.USER_DATA:
assertEquals(expectedValue, xmlMessage.getUserData());
break;
+ case SolaceBinderHeaders.PARTITION_KEY:
+ assertEquals(expectedValue, xmlMessage.getProperties()
+ .getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY));
+ break;
default:
fail(String.format("no test for header %s", header.getKey()));
}
}
- for (Map.Entry> binderHeaderMetaEntry : SolaceBinderHeaderMeta.META.entrySet()) {
- if (SolaceHeaderMeta.Scope.WIRE.equals(binderHeaderMetaEntry.getValue().getScope())) {
- assertNotNull(xmlMessage.getProperties().get(binderHeaderMetaEntry.getKey()));
- }
- }
+ Assertions.assertThat(SolaceBinderHeaderMeta.META
+ .entrySet()
+ .stream()
+ .filter(e -> SolaceHeaderMeta.Scope.WIRE.equals(e.getValue().getScope()))
+ .filter(e -> !e.getValue().isWritable()) // already tested earlier in this test
+ .map(Map.Entry::getKey))
+ .allSatisfy(h -> Assertions.assertThat(xmlMessage.getProperties().get(h)).isNotNull());
Mockito.verify(xmlMessageMapper).map(testSpringMessage, excludedHeaders, false);
}
@@ -1062,6 +1090,9 @@ public void testMapXMLMessageToSpringMessage_NonReadableSolaceProperties(boolean
case SolaceBinderHeaders.TARGET_DESTINATION_TYPE:
metadata.putString(header.getKey(), "topic");
break;
+ case SolaceBinderHeaders.PARTITION_KEY:
+ metadata.putString(header.getKey(), "partitionKey");
+ break;
default:
fail(String.format("no test for header %s", header.getKey()));
}
@@ -1303,6 +1334,17 @@ public void testMapXMLMessageToSpringMessage_WithListPayload(boolean batchMode)
}
}
+ @Test
+ public void testMapMessageHeadersToSDTMap_JMSXGroupID() throws Exception {
+ String jmsxGroupID = "partition-key-value";
+ SDTMap sdtMap = xmlMessageMapper.map(
+ new MessageHeaders(Collections.singletonMap(
+ XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, jmsxGroupID)),
+ Collections.emptyList(), false);
+ assertThat(sdtMap.keySet(), hasItem(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY));
+ assertEquals(jmsxGroupID, sdtMap.getString(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY));
+ }
+
@Test
public void testMapMessageHeadersToSDTMap_Serializable() throws Exception {
String key = "a";
@@ -1559,15 +1601,22 @@ private void validateXMLProperties(XMLMessage xmlMessage, Message> springMessa
.filter(h -> h.getValue().isReadable())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- for (Map.Entry header : expectedHeaders.entrySet()) {
- if (readWriteableSolaceHeaders.containsKey(header.getKey())) {
- Object value = readWriteableSolaceHeaders.get(header.getKey()).getReadAction().apply(xmlMessage);
- assertEquals(header.getValue(), value);
- } else if (!serializedHeaders.contains(header.getKey())) {
- assertThat(metadata.keySet(), hasItem(header.getKey()));
- assertEquals(header.getValue(), metadata.get(header.getKey()));
- }
- }
+ Assertions.assertThat(expectedHeaders)
+ .allSatisfy((headerKey, headerValue) -> {
+ if (readWriteableSolaceHeaders.containsKey(headerKey)) {
+ Object value = readWriteableSolaceHeaders.get(headerKey).getReadAction().apply(xmlMessage);
+ assertEquals(headerValue, value);
+ } else if (!serializedHeaders.contains(headerKey)) {
+ switch (headerKey) {
+ case SolaceBinderHeaders.PARTITION_KEY ->
+ headerKey = XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY;
+ case XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY ->
+ headerValue = expectedHeaders.getOrDefault(SolaceBinderHeaders.PARTITION_KEY, headerValue);
+ }
+ assertThat(metadata.keySet(), hasItem(headerKey));
+ assertEquals(headerValue, ((ThrowingFunction) metadata::get).apply(headerKey));
+ }
+ });
}
private void validateSpringHeaders(MessageHeaders messageHeaders, XMLMessage xmlMessage)
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/pom.xml b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/pom.xml
index 40c8bd5d..42415ddc 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/pom.xml
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/pom.xml
@@ -5,12 +5,12 @@
com.solace.spring.cloud
solace-spring-cloud-parent
- 3.0.1-SNAPSHOT
+ 3.1.0-SNAPSHOT
../../solace-spring-cloud-parent/pom.xml
spring-cloud-stream-binder-solace
- 4.0.1-SNAPSHOT
+ 4.1.0-SNAPSHOT
jar
Solace Spring Cloud Stream Binder
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java
index 0b5efa18..54693d37 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/SolaceMessageChannelBinder.java
@@ -1,5 +1,6 @@
package com.solace.spring.cloud.stream.binder;
+import com.solace.spring.cloud.stream.binder.health.SolaceBinderHealthAccessor;
import com.solace.spring.cloud.stream.binder.inbound.BatchCollector;
import com.solace.spring.cloud.stream.binder.inbound.JCSMPInboundChannelAdapter;
import com.solace.spring.cloud.stream.binder.inbound.JCSMPMessageSource;
@@ -33,6 +34,7 @@
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.support.ErrorMessageStrategy;
+import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@@ -56,10 +58,9 @@ public class SolaceMessageChannelBinder
private final String errorHandlerProducerKey = UUID.randomUUID().toString();
private SolaceMeterAccessor solaceMeterAccessor;
private SolaceExtendedBindingProperties extendedBindingProperties = new SolaceExtendedBindingProperties();
-
private final RetryableTaskService taskService = new RetryableTaskService();
-
private static final SolaceMessageHeaderErrorMessageStrategy errorMessageStrategy = new SolaceMessageHeaderErrorMessageStrategy();
+ @Nullable private SolaceBinderHealthAccessor solaceBinderHealthAccessor;
public SolaceMessageChannelBinder(JCSMPSession jcsmpSession, SolaceQueueProvisioner solaceQueueProvisioner) {
this(jcsmpSession, null, solaceQueueProvisioner);
@@ -120,6 +121,10 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
getConsumerEndpointProperties(properties),
solaceMeterAccessor);
+ if (solaceBinderHealthAccessor != null) {
+ adapter.setSolaceBinderHealthAccessor(solaceBinderHealthAccessor);
+ }
+
adapter.setRemoteStopFlag(consumersRemoteStopFlag);
adapter.setPostStart(getConsumerPostStart(solaceDestination, properties));
@@ -163,6 +168,10 @@ protected PolledConsumerResources createPolledConsumerResources(String name, Str
endpointProperties,
solaceMeterAccessor);
+ if (solaceBinderHealthAccessor != null) {
+ messageSource.setSolaceBinderHealthAccessor(solaceBinderHealthAccessor);
+ }
+
messageSource.setRemoteStopFlag(consumersRemoteStopFlag::get);
messageSource.setPostStart(getConsumerPostStart(solaceDestination, consumerProperties));
@@ -210,17 +219,6 @@ protected ErrorMessageStrategy getErrorMessageStrategy() {
return errorMessageStrategy;
}
- @Override
- protected String errorsBaseName(ConsumerDestination destination, String group,
- ExtendedConsumerProperties consumerProperties) {
- SolaceConsumerDestination solaceDestination = (SolaceConsumerDestination) destination;
- StringBuilder errorsBaseName = new StringBuilder(solaceDestination.getBindingDestinationName()).append('.');
- if (solaceDestination.isTemporary()) {
- errorsBaseName.append("anon").append('.');
- }
- return errorsBaseName.append(solaceDestination.getPhysicalGroupName()).append(".errors").toString();
- }
-
@Override
public SolaceConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);
@@ -249,6 +247,10 @@ public void setSolaceMeterAccessor(SolaceMeterAccessor solaceMeterAccessor) {
this.solaceMeterAccessor = solaceMeterAccessor;
}
+ public void setSolaceBinderHealthAccessor(@Nullable SolaceBinderHealthAccessor solaceBinderHealthAccessor) {
+ this.solaceBinderHealthAccessor = solaceBinderHealthAccessor;
+ }
+
/**
WORKAROUND (SOL-4272) ----------------------------------------------------------
Temporary endpoints are only provisioned when the consumer is created.
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceBinderHealthIndicatorConfiguration.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceBinderHealthIndicatorConfiguration.java
deleted file mode 100644
index 9046a73d..00000000
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceBinderHealthIndicatorConfiguration.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.solace.spring.cloud.stream.binder.config;
-
-import com.solace.spring.cloud.stream.binder.util.SolaceBinderHealthIndicator;
-import com.solace.spring.cloud.stream.binder.util.SolaceSessionEventHandler;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
-@ConditionalOnEnabledHealthIndicator("binders")
-public class SolaceBinderHealthIndicatorConfiguration {
-
- private static final Log logger = LogFactory.getLog(SolaceBinderHealthIndicatorConfiguration.class);
-
- @Bean
- public SolaceBinderHealthIndicator solaceBinderHealthIndicator() {
- if (logger.isDebugEnabled()) {
- logger.debug("Creating Solace Binder Health Indicator");
- }
- return new SolaceBinderHealthIndicator();
- }
-
- @Bean
- public SolaceSessionEventHandler solaceSessionEventHandler(SolaceBinderHealthIndicator solaceBinderHealthIndicator) {
- if (logger.isDebugEnabled()) {
- logger.debug("Creating Solace Session Event Handler");
- }
- return new SolaceSessionEventHandler(solaceBinderHealthIndicator);
- }
-
-}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceHealthIndicatorsConfiguration.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceHealthIndicatorsConfiguration.java
new file mode 100644
index 00000000..948ed4f3
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceHealthIndicatorsConfiguration.java
@@ -0,0 +1,50 @@
+package com.solace.spring.cloud.stream.binder.config;
+
+import com.solace.spring.cloud.stream.binder.health.SolaceBinderHealthAccessor;
+import com.solace.spring.cloud.stream.binder.health.contributors.BindingsHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.SolaceBinderHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.handlers.SolaceSessionEventHandler;
+import com.solace.spring.cloud.stream.binder.health.indicators.SessionHealthIndicator;
+import com.solace.spring.cloud.stream.binder.properties.SolaceSessionHealthProperties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
+@ConditionalOnEnabledHealthIndicator("binders")
+@EnableConfigurationProperties({SolaceSessionHealthProperties.class})
+public class SolaceHealthIndicatorsConfiguration {
+ private static final Log logger = LogFactory.getLog(SolaceHealthIndicatorsConfiguration.class);
+
+ @Bean
+ public SolaceBinderHealthAccessor solaceBinderHealthAccessor(
+ SolaceBinderHealthContributor solaceBinderHealthContributor) {
+ return new SolaceBinderHealthAccessor(solaceBinderHealthContributor);
+ }
+
+ @Bean
+ public SolaceBinderHealthContributor solaceBinderHealthContributor(
+ SolaceSessionHealthProperties solaceSessionHealthProperties) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Creating Solace Connection Health Indicators Hierarchy");
+ }
+ return new SolaceBinderHealthContributor(
+ new SessionHealthIndicator(solaceSessionHealthProperties),
+ new BindingsHealthContributor()
+ );
+ }
+
+ @Bean
+ public SolaceSessionEventHandler solaceSessionEventHandler(SolaceBinderHealthContributor healthContributor) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Creating Solace Session Event Handler for monitoring Health");
+ }
+ return new SolaceSessionEventHandler(healthContributor.getSolaceSessionHealthIndicator());
+ }
+
+}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceMessageChannelBinderConfiguration.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceMessageChannelBinderConfiguration.java
index c46c9799..08ac5cba 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceMessageChannelBinderConfiguration.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/config/SolaceMessageChannelBinderConfiguration.java
@@ -1,16 +1,18 @@
package com.solace.spring.cloud.stream.binder.config;
import com.solace.spring.cloud.stream.binder.SolaceMessageChannelBinder;
+import com.solace.spring.cloud.stream.binder.health.SolaceBinderHealthAccessor;
+import com.solace.spring.cloud.stream.binder.health.handlers.SolaceSessionEventHandler;
import com.solace.spring.cloud.stream.binder.meter.SolaceMeterAccessor;
import com.solace.spring.cloud.stream.binder.properties.SolaceExtendedBindingProperties;
import com.solace.spring.cloud.stream.binder.provisioning.SolaceQueueProvisioner;
-import com.solace.spring.cloud.stream.binder.util.SolaceSessionEventHandler;
import com.solacesystems.jcsmp.Context;
import com.solacesystems.jcsmp.ContextProperties;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
+import jakarta.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -19,11 +21,9 @@
import org.springframework.context.annotation.Import;
import org.springframework.lang.Nullable;
-import jakarta.annotation.PostConstruct;
-
@Configuration
-@Import(SolaceBinderHealthIndicatorConfiguration.class)
-@EnableConfigurationProperties({ SolaceExtendedBindingProperties.class })
+@Import(SolaceHealthIndicatorsConfiguration.class)
+@EnableConfigurationProperties({SolaceExtendedBindingProperties.class})
public class SolaceMessageChannelBinderConfiguration {
private final JCSMPProperties jcsmpProperties;
private final SolaceExtendedBindingProperties solaceExtendedBindingProperties;
@@ -35,11 +35,11 @@ public class SolaceMessageChannelBinderConfiguration {
private static final Log logger = LogFactory.getLog(SolaceMessageChannelBinderConfiguration.class);
public SolaceMessageChannelBinderConfiguration(JCSMPProperties jcsmpProperties,
- SolaceExtendedBindingProperties solaceExtendedBindingProperties,
- @Nullable SolaceSessionEventHandler solaceSessionEventHandler) {
+ SolaceExtendedBindingProperties solaceExtendedBindingProperties,
+ @Nullable SolaceSessionEventHandler eventHandler) {
this.jcsmpProperties = jcsmpProperties;
this.solaceExtendedBindingProperties = solaceExtendedBindingProperties;
- this.solaceSessionEventHandler = solaceSessionEventHandler;
+ this.solaceSessionEventHandler = eventHandler;
}
@PostConstruct
@@ -59,7 +59,11 @@ private void initSession() throws JCSMPException {
logger.info(String.format("Connecting JCSMP session %s", jcsmpSession.getSessionName()));
jcsmpSession.connect();
if (solaceSessionEventHandler != null) {
- solaceSessionEventHandler.connected();
+ // after setting the session health indicator status to UP,
+ // we should not be worried about setting its status to DOWN,
+ // as the call closing JCSMP session also delete the context
+ // and terminates the application
+ solaceSessionEventHandler.setSessionHealthUp();
}
} catch (Exception e) {
if (context != null) {
@@ -71,10 +75,14 @@ private void initSession() throws JCSMPException {
@Bean
SolaceMessageChannelBinder solaceMessageChannelBinder(SolaceQueueProvisioner solaceQueueProvisioner,
- @Nullable SolaceMeterAccessor solaceMeterAccessor) {
+ @Nullable SolaceBinderHealthAccessor solaceBinderHealthAccessor,
+ @Nullable SolaceMeterAccessor solaceMeterAccessor) {
SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession, context, solaceQueueProvisioner);
binder.setExtendedBindingProperties(solaceExtendedBindingProperties);
binder.setSolaceMeterAccessor(solaceMeterAccessor);
+ if (solaceBinderHealthAccessor != null) {
+ binder.setSolaceBinderHealthAccessor(solaceBinderHealthAccessor);
+ }
return binder;
}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceBinderHealthIndicator.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceBinderHealthIndicator.java
deleted file mode 100644
index c354663e..00000000
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceBinderHealthIndicator.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.solace.spring.cloud.stream.binder.util;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.springframework.boot.actuate.health.Health;
-import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.boot.actuate.health.Status;
-
-public class SolaceBinderHealthIndicator implements HealthIndicator {
-
- private static final String STATUS_RECONNECTING = "RECONNECTING";
- private static final String INFO = "info";
- private static final String RESPONSE_CODE = "responseCode";
-
- private volatile Health healthStatus;
-
- private static final Log logger = LogFactory.getLog(SolaceBinderHealthIndicator.class);
-
- public void up() {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Solace binder status is %s", Status.UP));
- }
- healthStatus = Health.up().build();
- }
-
- public void reconnecting() {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Solace binder status is %s", STATUS_RECONNECTING));
- }
- healthStatus = Health.status(STATUS_RECONNECTING).build();
- }
-
- public void down(Exception exception, int responseCode, String info) {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Solace binder status is %s", Status.DOWN));
- }
- Health.Builder builder = Health.down();
- if (exception != null) builder.withException(exception);
- if (responseCode != 0) builder.withDetail(RESPONSE_CODE, responseCode);
- if (info != null && !info.isEmpty()) builder.withDetail(INFO, info);
- healthStatus = builder.build();
- }
-
- @Override
- public Health health() {
- return healthStatus;
- }
-}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceSessionEventHandler.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceSessionEventHandler.java
deleted file mode 100644
index f9c08019..00000000
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/main/java/com/solace/spring/cloud/stream/binder/util/SolaceSessionEventHandler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.solace.spring.cloud.stream.binder.util;
-
-import com.solacesystems.jcsmp.SessionEvent;
-import com.solacesystems.jcsmp.SessionEventArgs;
-import com.solacesystems.jcsmp.SessionEventHandler;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.springframework.stereotype.Component;
-
-@Component
-public class SolaceSessionEventHandler implements SessionEventHandler {
-
- private final SolaceBinderHealthIndicator solaceBinderHealthIndicator;
- private static final Log logger = LogFactory.getLog(SolaceSessionEventHandler.class);
-
- public SolaceSessionEventHandler(SolaceBinderHealthIndicator solaceBinderHealthIndicator) {
- this.solaceBinderHealthIndicator = solaceBinderHealthIndicator;
- }
-
- @Override
- public void handleEvent(SessionEventArgs sessionEvent) {
- if (logger.isDebugEnabled()) {
- logger.debug(String.format("Received Solace session event %s.", sessionEvent));
- }
- if (sessionEvent.getEvent() == SessionEvent.DOWN_ERROR) {
- solaceBinderHealthIndicator.down(sessionEvent.getException(), sessionEvent.getResponseCode(), sessionEvent.getInfo());
- } else if (sessionEvent.getEvent() == SessionEvent.RECONNECTING) {
- solaceBinderHealthIndicator.reconnecting();
- } else if (sessionEvent.getEvent() == SessionEvent.RECONNECTED) {
- solaceBinderHealthIndicator.up();
- }
- }
-
- public void connected() {
- solaceBinderHealthIndicator.up();
- }
-}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderCustomErrorMessageHandlerIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderCustomErrorMessageHandlerIT.java
index 5d5c6331..125cd0c4 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderCustomErrorMessageHandlerIT.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderCustomErrorMessageHandlerIT.java
@@ -92,14 +92,15 @@ public void testConsumerOverrideErrorMessageHandler(
String destination0 = RandomStringUtils.randomAlphanumeric(10);
String group0 = RandomStringUtils.randomAlphanumeric(10);
- String errorDestination0 = destination0 + context.getDestinationNameDelimiter() + group0 +
- context.getDestinationNameDelimiter() + "errors";
+ String inputBindingName = RandomStringUtils.randomAlphanumeric(10);
+ String inputErrorChannelName = binder.getBinder().getBinderIdentity() + "." + inputBindingName + ".errors";
String vpnName = (String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME);
DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties());
T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties());
ExtendedConsumerProperties consumerProperties = context.createConsumerProperties();
+ consumerProperties.populateBindingName(inputBindingName);
consumerProperties.setBatchMode(batchMode);
consumerProperties.setMaxAttempts(maxAttempts);
@@ -112,7 +113,7 @@ public void testConsumerOverrideErrorMessageHandler(
// Need to create channel before so that the override actually works
final CountDownLatch errorLatch = new CountDownLatch(1);
- context.createChannel(errorDestination0, DirectChannel.class, msg -> {
+ context.createChannel(inputErrorChannelName, DirectChannel.class, msg -> {
logger.info("Got error message: {}", StaticMessageHeaderAccessor.getId(msg));
softly.assertThat(msg).satisfies(isValidConsumerErrorMessage(consumerProperties,
channelType.isAssignableFrom(PollableSource.class), true, messages));
@@ -169,15 +170,15 @@ public void testConsumerOverrideErrorMessageHandlerThrowException(
String destination0 = RandomStringUtils.randomAlphanumeric(10);
String group0 = RandomStringUtils.randomAlphanumeric(10);
- String errorDestination0 = destination0 + context.getDestinationNameDelimiter() + group0 +
- context.getDestinationNameDelimiter() + "errors";
+ String inputBindingName = RandomStringUtils.randomAlphanumeric(10);
+ String inputErrorChannelName = binder.getBinder().getBinderIdentity() + "." + inputBindingName + ".errors";
String vpnName = (String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME);
DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties());
T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties());
// Need to create channel before so that the override actually works
- context.createChannel(errorDestination0, DirectChannel.class, msg -> {
+ context.createChannel(inputErrorChannelName, DirectChannel.class, msg -> {
logger.info("Got error message: {}", StaticMessageHeaderAccessor.getId(msg));
throw new ConsumerInfrastructureUtil.ExpectedMessageHandlerException("test");
});
@@ -186,6 +187,7 @@ public void testConsumerOverrideErrorMessageHandlerThrowException(
destination0, moduleOutputChannel, context.createProducerProperties(testInfo));
ExtendedConsumerProperties consumerProperties = context.createConsumerProperties();
+ consumerProperties.populateBindingName(inputBindingName);
consumerProperties.setBatchMode(batchMode);
consumerProperties.setMaxAttempts(maxAttempts);
consumerProperties.getExtension().setAutoBindErrorQueue(true);
@@ -244,15 +246,15 @@ public void testConsumerOverrideErrorMessageHandlerThrowRequeueException(
String destination0 = RandomStringUtils.randomAlphanumeric(10);
String group0 = RandomStringUtils.randomAlphanumeric(10);
- String errorDestination0 = destination0 + context.getDestinationNameDelimiter() + group0 +
- context.getDestinationNameDelimiter() + "errors";
+ String inputBindingName = RandomStringUtils.randomAlphanumeric(10);
+ String inputErrorChannelName = binder.getBinder().getBinderIdentity() + "." + inputBindingName + ".errors";
String vpnName = (String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME);
DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties());
T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties());
// Need to create channel before so that the override actually works
- context.createChannel(errorDestination0, DirectChannel.class, msg -> {
+ context.createChannel(inputErrorChannelName, DirectChannel.class, msg -> {
logger.info("Got error message: {}", StaticMessageHeaderAccessor.getId(msg));
throw new RequeueCurrentMessageException("test");
});
@@ -261,6 +263,7 @@ public void testConsumerOverrideErrorMessageHandlerThrowRequeueException(
destination0, moduleOutputChannel, context.createProducerProperties(testInfo));
ExtendedConsumerProperties consumerProperties = context.createConsumerProperties();
+ consumerProperties.populateBindingName(inputBindingName);
consumerProperties.setBatchMode(batchMode);
consumerProperties.setMaxAttempts(maxAttempts);
consumerProperties.getExtension().setAutoBindErrorQueue(true);
@@ -321,8 +324,8 @@ public void testConsumerOverrideErrorMessageHandlerThrowExceptionAndStale(
String destination0 = RandomStringUtils.randomAlphanumeric(10);
String group0 = RandomStringUtils.randomAlphanumeric(10);
- String errorDestination0 = destination0 + context.getDestinationNameDelimiter() + group0 +
- context.getDestinationNameDelimiter() + "errors";
+ String inputBindingName = RandomStringUtils.randomAlphanumeric(10);
+ String inputErrorChannelName = binder.getBinder().getBinderIdentity() + "." + inputBindingName + ".errors";
String vpnName = (String) jcsmpSession.getProperty(JCSMPProperties.VPN_NAME);
DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties());
@@ -331,7 +334,7 @@ public void testConsumerOverrideErrorMessageHandlerThrowExceptionAndStale(
// Need to create channel before so that the override actually works
CountDownLatch continueLatch = new CountDownLatch(1);
CountDownLatch errorStartLatch = new CountDownLatch(1);
- context.createChannel(errorDestination0, DirectChannel.class, msg -> {
+ context.createChannel(inputErrorChannelName, DirectChannel.class, msg -> {
logger.info("Got error message: {}", StaticMessageHeaderAccessor.getId(msg));
errorStartLatch.countDown();
try {
@@ -347,6 +350,7 @@ public void testConsumerOverrideErrorMessageHandlerThrowExceptionAndStale(
destination0, moduleOutputChannel, context.createProducerProperties(testInfo));
ExtendedConsumerProperties consumerProperties = context.createConsumerProperties();
+ consumerProperties.populateBindingName(inputBindingName);
consumerProperties.setBatchMode(batchMode);
consumerProperties.setMaxAttempts(1);
consumerProperties.getExtension().setAutoBindErrorQueue(true);
@@ -436,19 +440,19 @@ public void testPublisherErrorMessageHandler(SpringCloudStreamContext context, S
SolaceTestBinder binder = context.getBinder();
String destination0 = RandomStringUtils.randomAlphanumeric(10);
- String errorDestination0 = binder.getBinder().getBinderIdentity() + context.getDestinationNameDelimiter() +
- destination0 + context.getDestinationNameDelimiter() + "errors";
+ String outputBindingName = RandomStringUtils.randomAlphanumeric(10);
+ String outputErrorChannelName = binder.getBinder().getBinderIdentity() + "." + outputBindingName + ".errors";
DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties());
ExtendedProducerProperties producerProperties = context.createProducerProperties(testInfo);
producerProperties.setErrorChannelEnabled(true);
- producerProperties.populateBindingName(destination0);
+ producerProperties.populateBindingName(outputBindingName);
Binding producerBinding = binder.bindProducer(destination0, moduleOutputChannel,
producerProperties);
final CountDownLatch errorLatch = new CountDownLatch(1);
- context.createChannel(errorDestination0, PublishSubscribeChannel.class, msg -> {
+ context.createChannel(outputErrorChannelName, PublishSubscribeChannel.class, msg -> {
logger.info("Got error message: " + msg);
softly.assertThat(msg).satisfies(isValidProducerErrorMessage(false));
errorLatch.countDown();
@@ -474,19 +478,19 @@ public void testPublisherAsyncErrorMessageHandler(JCSMPSession jcsmpSession,
SolaceTestBinder binder = context.getBinder();
String destination0 = RandomStringUtils.randomAlphanumeric(10);
- String errorDestination0 = binder.getBinder().getBinderIdentity() + context.getDestinationNameDelimiter() +
- destination0 + context.getDestinationNameDelimiter() + "errors";
+ String outputBindingName = RandomStringUtils.randomAlphanumeric(10);
+ String outputErrorChannelName = binder.getBinder().getBinderIdentity() + "." + outputBindingName + ".errors";
DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties());
ExtendedProducerProperties producerProperties = context.createProducerProperties(testInfo);
producerProperties.setErrorChannelEnabled(true);
- producerProperties.populateBindingName(destination0);
+ producerProperties.populateBindingName(outputBindingName);
Binding producerBinding = binder.bindProducer(destination0, moduleOutputChannel,
producerProperties);
final CountDownLatch errorLatch = new CountDownLatch(1);
- context.createChannel(errorDestination0, PublishSubscribeChannel.class, msg -> {
+ context.createChannel(outputErrorChannelName, PublishSubscribeChannel.class, msg -> {
logger.info("Got error message: " + msg);
softly.assertThat(msg).satisfies(isValidProducerErrorMessage(true));
errorLatch.countDown();
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderHealthIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderHealthIT.java
new file mode 100644
index 00000000..7f36ff7e
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderHealthIT.java
@@ -0,0 +1,273 @@
+package com.solace.spring.cloud.stream.binder;
+
+import com.solace.spring.boot.autoconfigure.SolaceJavaAutoConfiguration;
+import com.solace.spring.cloud.stream.binder.config.SolaceHealthIndicatorsConfiguration;
+import com.solace.spring.cloud.stream.binder.health.SolaceBinderHealthAccessor;
+import com.solace.spring.cloud.stream.binder.health.contributors.BindingHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.BindingsHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.FlowsHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.SolaceBinderHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.indicators.FlowHealthIndicator;
+import com.solace.spring.cloud.stream.binder.health.indicators.SessionHealthIndicator;
+import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
+import com.solace.spring.cloud.stream.binder.test.junit.extension.SpringCloudStreamExtension;
+import com.solace.spring.cloud.stream.binder.test.spring.ConsumerInfrastructureUtil;
+import com.solace.spring.cloud.stream.binder.test.spring.SpringCloudStreamContext;
+import com.solace.spring.cloud.stream.binder.test.util.SolaceSpringCloudStreamAssertions;
+import com.solace.spring.cloud.stream.binder.test.util.SolaceTestBinder;
+import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension;
+import com.solace.test.integration.semp.v2.SempV2Api;
+import com.solace.test.integration.semp.v2.config.model.ConfigMsgVpnQueue;
+import com.solacesystems.jcsmp.JCSMPProperties;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.assertj.core.api.InstanceOfAssertFactories;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.actuate.health.NamedContributor;
+import org.springframework.boot.actuate.health.Status;
+import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
+import org.springframework.cloud.stream.binder.Binding;
+import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
+import org.springframework.cloud.stream.binder.PollableSource;
+import org.springframework.cloud.stream.config.BindingProperties;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.support.MessageBuilder;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
+
+import static com.solace.spring.cloud.stream.binder.test.util.RetryableAssertions.retryAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SpringJUnitConfig(classes = {
+ SolaceHealthIndicatorsConfiguration.class,
+ SolaceJavaAutoConfiguration.class
+}, initializers = ConfigDataApplicationContextInitializer.class)
+@ExtendWith(PubSubPlusExtension.class)
+@ExtendWith(SpringCloudStreamExtension.class)
+public class SolaceBinderHealthIT {
+ private static final Logger logger = LoggerFactory.getLogger(SolaceBinderHealthIT.class);
+
+ @CartesianTest(name = "[{index}] channelType={0}, autoStart={1} concurrency={2}")
+ public void testConsumerFlowHealthProvisioning(
+ @Values(classes = {DirectChannel.class, PollableSource.class}) Class channelType,
+ @Values(booleans = {true, false}) boolean autoStart,
+ @Values(ints = {1, 3}) int concurrency,
+ SpringCloudStreamContext context) throws Exception {
+ if (concurrency > 1 && channelType.equals(PollableSource.class)) {
+ return;
+ }
+
+ SolaceTestBinder binder = context.getBinder();
+
+ BindingsHealthContributor bindingsHealthContributor = new BindingsHealthContributor();
+ binder.getBinder().setSolaceBinderHealthAccessor(new SolaceBinderHealthAccessor(
+ new SolaceBinderHealthContributor(new SessionHealthIndicator(), bindingsHealthContributor)));
+
+ ConsumerInfrastructureUtil consumerInfrastructureUtil = context.createConsumerInfrastructureUtil(channelType);
+ T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties());
+
+ String destination0 = RandomStringUtils.randomAlphanumeric(10);
+
+ ExtendedConsumerProperties consumerProperties = context.createConsumerProperties();
+ consumerProperties.populateBindingName(RandomStringUtils.randomAlphanumeric(10));
+ consumerProperties.setAutoStartup(autoStart);
+ consumerProperties.setConcurrency(concurrency);
+
+ Binding consumerBinding = consumerInfrastructureUtil.createBinding(binder,
+ destination0, RandomStringUtils.randomAlphanumeric(10), moduleInputChannel, consumerProperties);
+
+ context.binderBindUnbindLatency();
+
+ if (!autoStart) {
+ assertThat(bindingsHealthContributor.iterator().hasNext()).isFalse();
+ logger.info("Starting binding...");
+ consumerBinding.start();
+ }
+
+ assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));
+
+ logger.info("Pausing binding...");
+ consumerBinding.pause();
+ assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));
+
+ logger.info("Stopping binding...");
+ consumerBinding.stop();
+ assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .extracting(c -> c.getContributor(consumerProperties.getBindingName()))
+ .extracting(BindingHealthContributor::getFlowsHealthContributor)
+ .extracting(f -> StreamSupport.stream(f.spliterator(), false))
+ .asInstanceOf(InstanceOfAssertFactories.stream(NamedContributor.class))
+ .isEmpty();
+
+ logger.info("Starting binding...");
+ consumerBinding.start();
+ assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));
+
+ logger.info("Resuming binding...");
+ consumerBinding.resume();
+ assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));
+
+ consumerBinding.unbind();
+ }
+
+ @CartesianTest(name = "[{index}] channelType={0}, concurrency={1} healthStatus={2}")
+ public void testConsumerFlowHealthUnhealthy(
+ @Values(classes = {DirectChannel.class, PollableSource.class}) Class channelType,
+ @Values(ints = {1, 3}) int concurrency,
+ @Values(strings = {"DOWN", "RECONNECTING"}) String healthStatus,
+ SempV2Api sempV2Api,
+ SpringCloudStreamContext context) throws Exception {
+ if (concurrency > 1 && channelType.equals(PollableSource.class)) {
+ return;
+ }
+
+ SolaceTestBinder binder = context.getBinder();
+
+ BindingsHealthContributor bindingsHealthContributor = new BindingsHealthContributor();
+ binder.getBinder().setSolaceBinderHealthAccessor(new SolaceBinderHealthAccessor(
+ new SolaceBinderHealthContributor(new SessionHealthIndicator(), bindingsHealthContributor)));
+
+ ConsumerInfrastructureUtil consumerInfrastructureUtil = context.createConsumerInfrastructureUtil(channelType);
+ T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties());
+
+ String destination0 = RandomStringUtils.randomAlphanumeric(10);
+
+ ExtendedConsumerProperties consumerProperties = context.createConsumerProperties();
+ consumerProperties.populateBindingName(RandomStringUtils.randomAlphanumeric(10));
+ consumerProperties.setConcurrency(concurrency);
+
+ Binding consumerBinding = consumerInfrastructureUtil.createBinding(binder,
+ destination0, RandomStringUtils.randomAlphanumeric(10), moduleInputChannel, consumerProperties);
+
+ context.binderBindUnbindLatency();
+
+ assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));
+
+ String vpnName = (String) context.getJcsmpSession().getProperty(JCSMPProperties.VPN_NAME);
+ String queueName = binder.getConsumerQueueName(consumerBinding);
+ logger.info(String.format("Disabling egress for queue %s", queueName));
+ switch (healthStatus) {
+ case "DOWN" -> sempV2Api.config().deleteMsgVpnQueue(vpnName, queueName);
+ case "RECONNECTING" -> sempV2Api.config()
+ .updateMsgVpnQueue(vpnName, queueName, new ConfigMsgVpnQueue().egressEnabled(false), null);
+ default -> throw new IllegalArgumentException("No test for health status: " + healthStatus);
+ }
+
+ retryAssert(2, TimeUnit.MINUTES,
+ () -> assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(
+ consumerProperties.getBindingName(), concurrency, new Status(healthStatus))));
+
+ if (healthStatus.equals("RECONNECTING")) {
+ sempV2Api.config()
+ .updateMsgVpnQueue(vpnName, queueName, new ConfigMsgVpnQueue().egressEnabled(true), null);
+ retryAssert(2, TimeUnit.MINUTES,
+ () -> assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(
+ consumerProperties.getBindingName(), concurrency, Status.UP)));
+ }
+
+ consumerBinding.unbind();
+ }
+
+ @CartesianTest(name = "[{index}] channelType={0}")
+ public void testConsumerFlowHealthNack(
+ @Values(classes = {DirectChannel.class, PollableSource.class}) Class channelType,
+ SpringCloudStreamContext context,
+ TestInfo testInfo) throws Exception {
+ SolaceTestBinder binder = context.getBinder();
+
+ BindingsHealthContributor bindingsHealthContributor = Mockito.spy(new BindingsHealthContributor());
+ binder.getBinder().setSolaceBinderHealthAccessor(new SolaceBinderHealthAccessor(
+ new SolaceBinderHealthContributor(new SessionHealthIndicator(), bindingsHealthContributor)));
+
+ ConsumerInfrastructureUtil consumerInfrastructureUtil = context.createConsumerInfrastructureUtil(channelType);
+
+ DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties());
+ T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties());
+
+ String destination0 = RandomStringUtils.randomAlphanumeric(10);
+
+ ExtendedConsumerProperties consumerProperties = context.createConsumerProperties();
+ consumerProperties.populateBindingName(RandomStringUtils.randomAlphanumeric(10));
+
+ Binding producerBinding = binder.bindProducer(
+ destination0, moduleOutputChannel, context.createProducerProperties(testInfo));
+ Binding consumerBinding = consumerInfrastructureUtil.createBinding(binder,
+ destination0, RandomStringUtils.randomAlphanumeric(10), moduleInputChannel, consumerProperties);
+
+ context.binderBindUnbindLatency();
+
+ assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), 1, Status.UP));
+
+ String flowHealthId = "flow-0";
+ BindingHealthContributor bindingHealthContributor = (BindingHealthContributor) bindingsHealthContributor
+ .getContributor(consumerProperties.getBindingName());
+ FlowsHealthContributor flowsHealthContributor = bindingHealthContributor.getFlowsHealthContributor();
+
+ logger.info("Injecting Mockito spy into flow health indicator: {}", flowHealthId);
+ FlowHealthIndicator flowHealthIndicator = Mockito.spy((FlowHealthIndicator) (flowsHealthContributor
+ .getContributor(flowHealthId)));
+ flowsHealthContributor.removeFlowContributor(flowHealthId);
+ flowsHealthContributor.addFlowContributor(flowHealthId, flowHealthIndicator);
+
+ logger.info("Injecting Mockito spy into flows health indicator for binding: {}", consumerProperties.getBindingName());
+ flowsHealthContributor = Mockito.spy(flowsHealthContributor);
+ bindingsHealthContributor.removeBindingContributor(consumerProperties.getBindingName());
+ bindingsHealthContributor.addBindingContributor(consumerProperties.getBindingName(),
+ Mockito.spy(new BindingHealthContributor(flowsHealthContributor)));
+
+ // Clear invocations due to spy injection
+ // Real test begins now...
+ Mockito.clearInvocations(bindingsHealthContributor);
+
+ consumerInfrastructureUtil.sendAndSubscribe(moduleInputChannel, consumerProperties.getMaxAttempts(),
+ () -> moduleOutputChannel.send(MessageBuilder.withPayload(UUID.randomUUID().toString().getBytes())
+ .build()),
+ (msg, callback) -> {
+ callback.run();
+ throw new RuntimeException("Throwing expected exception!");
+ });
+
+ Mockito.verify(flowHealthIndicator, Mockito.never()
+ .description("Flow rebind should not have caused health to go down"))
+ .down(Mockito.any());
+ Mockito.verify(flowsHealthContributor, Mockito.never()
+ .description("Flow rebind should not have caused flow health indicator to be removed"))
+ .removeFlowContributor(Mockito.any());
+ Mockito.verify(bindingsHealthContributor, Mockito.never()
+ .description("Flow rebind should not have caused health component to be removed"))
+ .removeBindingContributor(Mockito.any());
+
+ assertThat(bindingsHealthContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), 1, Status.UP));
+
+ producerBinding.unbind();
+ consumerBinding.unbind();
+ }
+}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/config/SolaceBinderConfigIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/config/SolaceBinderConfigIT.java
index fb7fc9bf..f02f9199 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/config/SolaceBinderConfigIT.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/config/SolaceBinderConfigIT.java
@@ -56,7 +56,7 @@ public void testClientInfoProvider(JCSMPProperties jcsmpProperties, SempV2Api se
throws Exception {
MonitorMsgVpnClient client;
SolaceMessageChannelBinder solaceMessageChannelBinder = binderConfiguration.solaceMessageChannelBinder(
- binderConfiguration.provisioningProvider(), null);
+ binderConfiguration.provisioningProvider(), null, null);
try {
String vpnName = jcsmpProperties.getStringProperty(JCSMPProperties.VPN_NAME);
client = sempV2Api.monitor()
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/health/handlers/SolaceSessionEventHandlerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/health/handlers/SolaceSessionEventHandlerTest.java
new file mode 100644
index 00000000..a6dc9394
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/health/handlers/SolaceSessionEventHandlerTest.java
@@ -0,0 +1,40 @@
+package com.solace.spring.cloud.stream.binder.health.handlers;
+
+import com.solace.spring.cloud.stream.binder.health.indicators.SessionHealthIndicator;
+import com.solacesystems.jcsmp.SessionEvent;
+import com.solacesystems.jcsmp.SessionEventArgs;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class SolaceSessionEventHandlerTest {
+ @Test
+ public void testConnected(@Mock SessionHealthIndicator healthIndicator) {
+ SolaceSessionEventHandler sessionEventHandler = new SolaceSessionEventHandler(healthIndicator);
+ sessionEventHandler.setSessionHealthUp();
+ Mockito.verify(healthIndicator, Mockito.times(1)).up();
+ Mockito.verifyNoMoreInteractions(healthIndicator);
+ }
+
+ @ParameterizedTest
+ @EnumSource(SessionEvent.class)
+ public void testHandleEvent(SessionEvent event,
+ @Mock SessionEventArgs eventArgs,
+ @Mock SessionHealthIndicator healthIndicator) {
+ Mockito.when(eventArgs.getEvent()).thenReturn(event);
+
+ SolaceSessionEventHandler sessionEventHandler = new SolaceSessionEventHandler(healthIndicator);
+ sessionEventHandler.handleEvent(eventArgs);
+
+ switch (event) {
+ case DOWN_ERROR -> Mockito.verify(healthIndicator, Mockito.times(1)).down(eventArgs);
+ case RECONNECTING -> Mockito.verify(healthIndicator, Mockito.times(1)).reconnecting(eventArgs);
+ case RECONNECTED -> Mockito.verify(healthIndicator, Mockito.times(1)).up();
+ }
+ }
+}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/SolaceBinderHealthIndicatorTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/SolaceBinderHealthIndicatorTest.java
new file mode 100644
index 00000000..82e4756f
--- /dev/null
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/health/indicators/SolaceBinderHealthIndicatorTest.java
@@ -0,0 +1,43 @@
+package com.solace.spring.cloud.stream.binder.health.indicators;
+
+import com.solace.spring.cloud.stream.binder.properties.SolaceSessionHealthProperties;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+
+public class SolaceBinderHealthIndicatorTest {
+ private SessionHealthIndicator healthIndicator;
+
+ @BeforeEach
+ void BeforeEach(){
+ SolaceSessionHealthProperties solaceSessionHealthProperties = new SolaceSessionHealthProperties();
+ solaceSessionHealthProperties.setReconnectAttemptsUntilDown(10);
+ this.healthIndicator = new SessionHealthIndicator(solaceSessionHealthProperties);
+ }
+
+ @Test
+ public void testUp(SoftAssertions softly) {
+ healthIndicator.up();
+ Health health = healthIndicator.health();
+ softly.assertThat(health.getStatus()).isEqualTo(Status.UP);
+ softly.assertThat(health.getDetails()).isEmpty();
+ }
+
+ @Test
+ public void testDown(SoftAssertions softly) {
+ healthIndicator.down(null);
+ Health health = healthIndicator.health();
+ softly.assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ softly.assertThat(health.getDetails()).isEmpty();
+ }
+
+ @Test
+ public void testReconnecting(SoftAssertions softly) {
+ healthIndicator.reconnecting(null);
+ Health health = healthIndicator.health();
+ softly.assertThat(health.getStatus()).isEqualTo(new Status("RECONNECTING"));
+ softly.assertThat(health.getDetails()).isEmpty();
+ }
+}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/SolaceSpringCloudStreamAssertions.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/SolaceSpringCloudStreamAssertions.java
index 81bfde76..fcde2168 100644
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/SolaceSpringCloudStreamAssertions.java
+++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/SolaceSpringCloudStreamAssertions.java
@@ -1,5 +1,9 @@
package com.solace.spring.cloud.stream.binder.test.util;
+import com.solace.spring.cloud.stream.binder.health.contributors.BindingHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.BindingsHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.contributors.FlowsHealthContributor;
+import com.solace.spring.cloud.stream.binder.health.indicators.FlowHealthIndicator;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders;
import com.solace.spring.cloud.stream.binder.meter.SolaceMessageMeterBinder;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
@@ -16,6 +20,8 @@
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.ThrowingConsumer;
+import org.springframework.boot.actuate.health.NamedContributor;
+import org.springframework.boot.actuate.health.Status;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
@@ -31,6 +37,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE;
@@ -321,4 +330,36 @@ public static ThrowingConsumer isValidMessageSizeMeter(String nameTagValu
.isEqualTo(value)
);
}
+
+ public static ThrowingConsumer isSingleBindingHealthAvailable(String bindingName, int concurrency, Status status) {
+ return bindingsHealthContributor -> assertThat(StreamSupport.stream(bindingsHealthContributor.spliterator(), false))
+ .singleElement()
+ .satisfies(bindingContrib -> assertThat(bindingContrib.getName()).isEqualTo(bindingName))
+
+ .extracting(NamedContributor::getContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(BindingHealthContributor.class))
+ .satisfies(SolaceSpringCloudStreamAssertions.isBindingHealthAvailable(concurrency, status));
+ }
+
+ public static ThrowingConsumer isBindingHealthAvailable(int concurrency, Status status) {
+ return bindingHealthContributor -> assertThat(StreamSupport.stream(bindingHealthContributor.spliterator(), false))
+ .asInstanceOf(InstanceOfAssertFactories.list(NamedContributor.class))
+ .singleElement()
+
+ .satisfies(bindingContrib -> assertThat(bindingContrib.getName()).isEqualTo("flows"))
+ .extracting(NamedContributor::getContributor)
+ .asInstanceOf(InstanceOfAssertFactories.type(FlowsHealthContributor.class))
+
+ .extracting(flowsContrib -> StreamSupport.stream(flowsContrib.spliterator(), false))
+ .asInstanceOf(InstanceOfAssertFactories.stream(NamedContributor.class))
+ .satisfies(flowsContrib -> assertThat(flowsContrib.stream().map(NamedContributor::getName))
+ .containsExactlyElementsOf(IntStream.range(0, concurrency)
+ .mapToObj(i -> "flow-" + i).collect(Collectors.toSet())))
+
+
+ .extracting(NamedContributor::getContributor)
+ .asInstanceOf(InstanceOfAssertFactories.list(FlowHealthIndicator.class))
+ .extracting(flowIndicator -> flowIndicator.getHealth(false))
+ .allSatisfy(health -> assertThat(health.getStatus()).isEqualTo(status));
+ }
}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceBinderHealthIndicatorTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceBinderHealthIndicatorTest.java
deleted file mode 100644
index 3229a8a7..00000000
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceBinderHealthIndicatorTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.solace.spring.cloud.stream.binder.util;
-
-import org.assertj.core.api.SoftAssertions;
-import org.junit.jupiter.api.Test;
-import org.junitpioneer.jupiter.cartesian.CartesianTest;
-import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
-import org.springframework.boot.actuate.health.Health;
-import org.springframework.boot.actuate.health.Status;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-public class SolaceBinderHealthIndicatorTest {
- @Test
- public void testInitialHealth() {
- assertNull(new SolaceBinderHealthIndicator().health());
- }
-
- @Test
- public void testUp(SoftAssertions softly) {
- SolaceBinderHealthIndicator healthIndicator = new SolaceBinderHealthIndicator();
- healthIndicator.up();
- Health health = healthIndicator.health();
- softly.assertThat(health.getStatus()).isEqualTo(Status.UP);
- softly.assertThat(health.getDetails()).isEmpty();
- }
-
- @Test
- public void testDown(SoftAssertions softly) {
- SolaceBinderHealthIndicator healthIndicator = new SolaceBinderHealthIndicator();
- healthIndicator.down(null, 0, null);
- Health health = healthIndicator.health();
- softly.assertThat(health.getStatus()).isEqualTo(Status.DOWN);
- softly.assertThat(health.getDetails()).isEmpty();
- }
-
- @CartesianTest(name = "[{index}] responseCode={0} info={1}")
- public void testDownDetails(@Values(ints = {-1, 0, 1}) int responseCode,
- @Values(strings = {"", "some-info"}) String info,
- SoftAssertions softly) {
- SolaceBinderHealthIndicator healthIndicator = new SolaceBinderHealthIndicator();
- Exception healthException = new Exception("test");
- healthIndicator.down(healthException, responseCode, info);
- Health health = healthIndicator.health();
-
- softly.assertThat(health.getStatus()).isEqualTo(Status.DOWN);
- softly.assertThat(health.getDetails())
- .isNotEmpty()
- .hasEntrySatisfying("error", error -> assertThat(error)
- .isEqualTo(healthException.getClass().getName() + ": " + healthException.getMessage()));
-
- if (responseCode != 0) {
- softly.assertThat(health.getDetails())
- .hasEntrySatisfying("responseCode", r -> assertThat(r).isEqualTo(responseCode));
- } else {
- softly.assertThat(health.getDetails()).doesNotContainKey("responseCode");
- }
-
- if (!info.isEmpty()) {
- softly.assertThat(health.getDetails())
- .hasEntrySatisfying("info", i -> assertThat(i).isEqualTo(info));
- } else {
- softly.assertThat(health.getDetails()).doesNotContainKey("info");
- }
- }
-
- @Test
- public void testDownWithoutDetails(SoftAssertions softly) {
- SolaceBinderHealthIndicator healthIndicator = new SolaceBinderHealthIndicator();
- healthIndicator.down(new RuntimeException("test"), 1, "some-info");
- Health health = healthIndicator.getHealth(false);
- softly.assertThat(health.getStatus()).isEqualTo(Status.DOWN);
- softly.assertThat(health.getDetails()).isEmpty();
- }
-
- @Test
- public void testReconnecting(SoftAssertions softly) {
- SolaceBinderHealthIndicator healthIndicator = new SolaceBinderHealthIndicator();
- healthIndicator.reconnecting();
- Health health = healthIndicator.health();
- softly.assertThat(health.getStatus()).isEqualTo(new Status("RECONNECTING"));
- softly.assertThat(health.getDetails()).isEmpty();
- }
-}
diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceSessionEventHandlerTest.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceSessionEventHandlerTest.java
deleted file mode 100644
index 423a73db..00000000
--- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/util/SolaceSessionEventHandlerTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.solace.spring.cloud.stream.binder.util;
-
-import com.solacesystems.jcsmp.SessionEvent;
-import com.solacesystems.jcsmp.SessionEventArgs;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-@ExtendWith(MockitoExtension.class)
-public class SolaceSessionEventHandlerTest {
- @Test
- public void testConnected(@Mock SolaceBinderHealthIndicator healthIndicator) {
- SolaceSessionEventHandler sessionEventHandler = new SolaceSessionEventHandler(healthIndicator);
- sessionEventHandler.connected();
- Mockito.verify(healthIndicator, Mockito.times(1)).up();
- Mockito.verifyNoMoreInteractions(healthIndicator);
- }
-
- @ParameterizedTest
- @EnumSource(SessionEvent.class)
- public void testHandleEvent(SessionEvent event,
- @Mock SessionEventArgs eventArgs,
- @Mock SolaceBinderHealthIndicator healthIndicator) {
- Exception exception = new Exception("test");
- String info = "test=info";
- int responseCode = 0;
- Mockito.when(eventArgs.getEvent()).thenReturn(event);
- if (event.equals(SessionEvent.DOWN_ERROR)) {
- Mockito.when(eventArgs.getException()).thenReturn(exception);
- Mockito.when(eventArgs.getInfo()).thenReturn(info);
- Mockito.when(eventArgs.getResponseCode()).thenReturn(responseCode);
- }
-
- SolaceSessionEventHandler sessionEventHandler = new SolaceSessionEventHandler(healthIndicator);
- sessionEventHandler.handleEvent(eventArgs);
-
- switch (event) {
- case DOWN_ERROR:
- Mockito.verify(healthIndicator, Mockito.times(1))
- .down(exception, responseCode, info);
- break;
- case RECONNECTING:
- Mockito.verify(healthIndicator, Mockito.times(1)).reconnecting();
- break;
- case RECONNECTED:
- Mockito.verify(healthIndicator, Mockito.times(1)).up();
- break;
- default:
- Mockito.verifyNoInteractions(healthIndicator);
- }
-
- Mockito.verifyNoMoreInteractions(healthIndicator);
- }
-}