diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderHealthIT.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderHealthIT.java new file mode 100644 index 00000000..cdeac9eb --- /dev/null +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/SolaceBinderHealthIT.java @@ -0,0 +1,193 @@ +package com.solace.spring.cloud.stream.binder; + +import com.solace.spring.boot.autoconfigure.SolaceJavaAutoConfiguration; +import com.solace.spring.cloud.stream.binder.config.SolaceHealthIndicatorsConfiguration; +import com.solace.spring.cloud.stream.binder.health.contributors.BindingHealthContributor; +import com.solace.spring.cloud.stream.binder.health.contributors.BindingsHealthContributor; +import com.solace.spring.cloud.stream.binder.health.contributors.FlowsHealthContributor; +import com.solace.spring.cloud.stream.binder.health.indicators.FlowHealthIndicator; +import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; +import com.solace.spring.cloud.stream.binder.properties.SolaceFlowHealthProperties; +import com.solace.spring.cloud.stream.binder.test.junit.extension.SpringCloudStreamExtension; +import com.solace.spring.cloud.stream.binder.test.spring.ConsumerInfrastructureUtil; +import com.solace.spring.cloud.stream.binder.test.spring.SpringCloudStreamContext; +import com.solace.spring.cloud.stream.binder.test.util.SolaceSpringCloudStreamAssertions; +import com.solace.spring.cloud.stream.binder.test.util.SolaceTestBinder; +import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension; +import org.apache.commons.lang3.RandomStringUtils; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.cartesian.CartesianTest; +import org.junitpioneer.jupiter.cartesian.CartesianTest.Values; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.actuate.health.Status; +import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; +import org.springframework.cloud.stream.binder.Binding; +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.PollableSource; +import org.springframework.cloud.stream.config.BindingProperties; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.MessageChannel; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +@SpringJUnitConfig(classes = { + SolaceHealthIndicatorsConfiguration.class, + SolaceJavaAutoConfiguration.class +}, initializers = ConfigDataApplicationContextInitializer.class) +@ExtendWith(PubSubPlusExtension.class) +@ExtendWith(SpringCloudStreamExtension.class) +public class SolaceBinderHealthIT { + private static final Logger logger = LoggerFactory.getLogger(SolaceBinderHealthIT.class); + + @CartesianTest(name = "[{index}] channelType={0}, autoStart={1} concurrency={2}") + public void testConsumerFlowHealth( + @Values(classes = {DirectChannel.class, PollableSource.class}) Class channelType, + @Values(booleans = {true, false}) boolean autoStart, + @Values(ints = {1, 3}) int concurrency, + SpringCloudStreamContext context) throws Exception { + if (concurrency > 1 && channelType.equals(PollableSource.class)) { + return; + } + + SolaceTestBinder binder = context.getBinder(); + + BindingsHealthContributor bindingsHealthContributor = new BindingsHealthContributor(new SolaceFlowHealthProperties()); + binder.getBinder().setBindingsHealthContributor(bindingsHealthContributor); + + ConsumerInfrastructureUtil consumerInfrastructureUtil = context.createConsumerInfrastructureUtil(channelType); + T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties()); + + String destination0 = RandomStringUtils.randomAlphanumeric(10); + + ExtendedConsumerProperties consumerProperties = context.createConsumerProperties(); + consumerProperties.populateBindingName(RandomStringUtils.randomAlphanumeric(10)); + consumerProperties.setAutoStartup(autoStart); + consumerProperties.setConcurrency(concurrency); + + Binding consumerBinding = consumerInfrastructureUtil.createBinding(binder, + destination0, RandomStringUtils.randomAlphanumeric(10), moduleInputChannel, consumerProperties); + + context.binderBindUnbindLatency(); + + if (!autoStart) { + assertThat(bindingsHealthContributor.iterator().hasNext()).isFalse(); + logger.info("Starting binding..."); + consumerBinding.start(); + } + + assertThat(bindingsHealthContributor) + .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class)) + .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP)); + + logger.info("Pausing binding..."); + consumerBinding.pause(); + assertThat(bindingsHealthContributor) + .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class)) + .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP)); + + logger.info("Stopping binding..."); + consumerBinding.stop(); + assertThat(bindingsHealthContributor.iterator().hasNext()).isFalse(); + + logger.info("Starting binding..."); + consumerBinding.start(); + assertThat(bindingsHealthContributor) + .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class)) + .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP)); + + logger.info("Resuming binding..."); + consumerBinding.resume(); + assertThat(bindingsHealthContributor) + .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class)) + .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP)); + + consumerBinding.unbind(); + } + + @CartesianTest(name = "[{index}] channelType={0}") + public void testConsumerFlowHealthNack( + @Values(classes = {DirectChannel.class, PollableSource.class}) Class channelType, + SpringCloudStreamContext context, + TestInfo testInfo) throws Exception { + SolaceTestBinder binder = context.getBinder(); + + BindingsHealthContributor bindingsHealthContributor = Mockito.spy(new BindingsHealthContributor( + new SolaceFlowHealthProperties())); + binder.getBinder().setBindingsHealthContributor(bindingsHealthContributor); + + ConsumerInfrastructureUtil consumerInfrastructureUtil = context.createConsumerInfrastructureUtil(channelType); + + DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties()); + T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties()); + + String destination0 = RandomStringUtils.randomAlphanumeric(10); + + ExtendedConsumerProperties consumerProperties = context.createConsumerProperties(); + consumerProperties.populateBindingName(RandomStringUtils.randomAlphanumeric(10)); + + Binding producerBinding = binder.bindProducer( + destination0, moduleOutputChannel, context.createProducerProperties(testInfo)); + Binding consumerBinding = consumerInfrastructureUtil.createBinding(binder, + destination0, RandomStringUtils.randomAlphanumeric(10), moduleInputChannel, consumerProperties); + + context.binderBindUnbindLatency(); + + assertThat(bindingsHealthContributor) + .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class)) + .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), 1, Status.UP)); + + String flowHealthId = "flow-0"; + BindingHealthContributor bindingHealthContributor = (BindingHealthContributor) bindingsHealthContributor + .getContributor(consumerProperties.getBindingName()); + FlowsHealthContributor flowsHealthContributor = bindingHealthContributor.getFlowsHealthContributor(); + + logger.info("Injecting Mockito spy into flow health indicator: {}", flowHealthId); + FlowHealthIndicator flowHealthIndicator = Mockito.spy((FlowHealthIndicator) (flowsHealthContributor + .getContributor(flowHealthId))); + flowsHealthContributor.removeFlowContributor(flowHealthId); + flowsHealthContributor.addFlowContributor(flowHealthId, flowHealthIndicator); + + logger.info("Injecting Mockito spy into flows health indicator for binding: {}", consumerProperties.getBindingName()); + flowsHealthContributor = Mockito.spy(flowsHealthContributor); + bindingsHealthContributor.removeBindingContributor(consumerProperties.getBindingName()); + bindingsHealthContributor.addBindingContributor(consumerProperties.getBindingName(), + Mockito.spy(new BindingHealthContributor(flowsHealthContributor))); + + // Clear invocations due to spy injection + // Real test begins now... + Mockito.clearInvocations(bindingsHealthContributor); + + consumerInfrastructureUtil.sendAndSubscribe(moduleInputChannel, consumerProperties.getMaxAttempts(), + () -> moduleOutputChannel.send(MessageBuilder.withPayload(UUID.randomUUID().toString().getBytes()) + .build()), + (msg, callback) -> { + callback.run(); + throw new RuntimeException("Throwing expected exception!"); + }); + + Mockito.verify(flowHealthIndicator, Mockito.never() + .description("Flow rebind should not have caused health to go down")) + .down(Mockito.any()); + Mockito.verify(flowsHealthContributor, Mockito.never() + .description("Flow rebind should not have caused flow health indicator to be removed")) + .removeFlowContributor(Mockito.any()); + Mockito.verify(bindingsHealthContributor, Mockito.never() + .description("Flow rebind should not have caused health component to be removed")) + .removeBindingContributor(Mockito.any()); + + assertThat(bindingsHealthContributor) + .asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class)) + .satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), 1, Status.UP)); + + producerBinding.unbind(); + consumerBinding.unbind(); + } +} diff --git a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/SolaceSpringCloudStreamAssertions.java b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/SolaceSpringCloudStreamAssertions.java index 81bfde76..fcde2168 100644 --- a/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/SolaceSpringCloudStreamAssertions.java +++ b/solace-spring-cloud-stream-binder/solace-spring-cloud-stream-binder/src/test/java/com/solace/spring/cloud/stream/binder/test/util/SolaceSpringCloudStreamAssertions.java @@ -1,5 +1,9 @@ package com.solace.spring.cloud.stream.binder.test.util; +import com.solace.spring.cloud.stream.binder.health.contributors.BindingHealthContributor; +import com.solace.spring.cloud.stream.binder.health.contributors.BindingsHealthContributor; +import com.solace.spring.cloud.stream.binder.health.contributors.FlowsHealthContributor; +import com.solace.spring.cloud.stream.binder.health.indicators.FlowHealthIndicator; import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders; import com.solace.spring.cloud.stream.binder.meter.SolaceMessageMeterBinder; import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties; @@ -16,6 +20,8 @@ import org.assertj.core.api.InstanceOfAssertFactories; import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.ThrowingConsumer; +import org.springframework.boot.actuate.health.NamedContributor; +import org.springframework.boot.actuate.health.Status; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.StaticMessageHeaderAccessor; @@ -31,6 +37,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE; @@ -321,4 +330,36 @@ public static ThrowingConsumer isValidMessageSizeMeter(String nameTagValu .isEqualTo(value) ); } + + public static ThrowingConsumer isSingleBindingHealthAvailable(String bindingName, int concurrency, Status status) { + return bindingsHealthContributor -> assertThat(StreamSupport.stream(bindingsHealthContributor.spliterator(), false)) + .singleElement() + .satisfies(bindingContrib -> assertThat(bindingContrib.getName()).isEqualTo(bindingName)) + + .extracting(NamedContributor::getContributor) + .asInstanceOf(InstanceOfAssertFactories.type(BindingHealthContributor.class)) + .satisfies(SolaceSpringCloudStreamAssertions.isBindingHealthAvailable(concurrency, status)); + } + + public static ThrowingConsumer isBindingHealthAvailable(int concurrency, Status status) { + return bindingHealthContributor -> assertThat(StreamSupport.stream(bindingHealthContributor.spliterator(), false)) + .asInstanceOf(InstanceOfAssertFactories.list(NamedContributor.class)) + .singleElement() + + .satisfies(bindingContrib -> assertThat(bindingContrib.getName()).isEqualTo("flows")) + .extracting(NamedContributor::getContributor) + .asInstanceOf(InstanceOfAssertFactories.type(FlowsHealthContributor.class)) + + .extracting(flowsContrib -> StreamSupport.stream(flowsContrib.spliterator(), false)) + .asInstanceOf(InstanceOfAssertFactories.stream(NamedContributor.class)) + .satisfies(flowsContrib -> assertThat(flowsContrib.stream().map(NamedContributor::getName)) + .containsExactlyElementsOf(IntStream.range(0, concurrency) + .mapToObj(i -> "flow-" + i).collect(Collectors.toSet()))) + + + .extracting(NamedContributor::getContributor) + .asInstanceOf(InstanceOfAssertFactories.list(FlowHealthIndicator.class)) + .extracting(flowIndicator -> flowIndicator.getHealth(false)) + .allSatisfy(health -> assertThat(health.getStatus()).isEqualTo(status)); + } }