Skip to content

Commit

Permalink
add IT for flow health
Browse files Browse the repository at this point in the history
  • Loading branch information
Nephery committed Oct 23, 2023
1 parent 25c7308 commit be47055
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package com.solace.spring.cloud.stream.binder;

import com.solace.spring.boot.autoconfigure.SolaceJavaAutoConfiguration;
import com.solace.spring.cloud.stream.binder.config.SolaceHealthIndicatorsConfiguration;
import com.solace.spring.cloud.stream.binder.health.contributors.BindingHealthContributor;
import com.solace.spring.cloud.stream.binder.health.contributors.BindingsHealthContributor;
import com.solace.spring.cloud.stream.binder.health.contributors.FlowsHealthContributor;
import com.solace.spring.cloud.stream.binder.health.indicators.FlowHealthIndicator;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
import com.solace.spring.cloud.stream.binder.properties.SolaceFlowHealthProperties;
import com.solace.spring.cloud.stream.binder.test.junit.extension.SpringCloudStreamExtension;
import com.solace.spring.cloud.stream.binder.test.spring.ConsumerInfrastructureUtil;
import com.solace.spring.cloud.stream.binder.test.spring.SpringCloudStreamContext;
import com.solace.spring.cloud.stream.binder.test.util.SolaceSpringCloudStreamAssertions;
import com.solace.spring.cloud.stream.binder.test.util.SolaceTestBinder;
import com.solace.test.integration.junit.jupiter.extension.PubSubPlusExtension;
import org.apache.commons.lang3.RandomStringUtils;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.cartesian.CartesianTest.Values;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.PollableSource;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

@SpringJUnitConfig(classes = {
SolaceHealthIndicatorsConfiguration.class,
SolaceJavaAutoConfiguration.class
}, initializers = ConfigDataApplicationContextInitializer.class)
@ExtendWith(PubSubPlusExtension.class)
@ExtendWith(SpringCloudStreamExtension.class)
public class SolaceBinderHealthIT {
private static final Logger logger = LoggerFactory.getLogger(SolaceBinderHealthIT.class);

@CartesianTest(name = "[{index}] channelType={0}, autoStart={1} concurrency={2}")
public <T> void testConsumerFlowHealth(
@Values(classes = {DirectChannel.class, PollableSource.class}) Class<T> channelType,
@Values(booleans = {true, false}) boolean autoStart,
@Values(ints = {1, 3}) int concurrency,
SpringCloudStreamContext context) throws Exception {
if (concurrency > 1 && channelType.equals(PollableSource.class)) {
return;
}

SolaceTestBinder binder = context.getBinder();

BindingsHealthContributor bindingsHealthContributor = new BindingsHealthContributor(new SolaceFlowHealthProperties());
binder.getBinder().setBindingsHealthContributor(bindingsHealthContributor);

ConsumerInfrastructureUtil<T> consumerInfrastructureUtil = context.createConsumerInfrastructureUtil(channelType);
T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties());

String destination0 = RandomStringUtils.randomAlphanumeric(10);

ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties = context.createConsumerProperties();
consumerProperties.populateBindingName(RandomStringUtils.randomAlphanumeric(10));
consumerProperties.setAutoStartup(autoStart);
consumerProperties.setConcurrency(concurrency);

Binding<T> consumerBinding = consumerInfrastructureUtil.createBinding(binder,
destination0, RandomStringUtils.randomAlphanumeric(10), moduleInputChannel, consumerProperties);

context.binderBindUnbindLatency();

if (!autoStart) {
assertThat(bindingsHealthContributor.iterator().hasNext()).isFalse();
logger.info("Starting binding...");
consumerBinding.start();
}

assertThat(bindingsHealthContributor)
.asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
.satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));

logger.info("Pausing binding...");
consumerBinding.pause();
assertThat(bindingsHealthContributor)
.asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
.satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));

logger.info("Stopping binding...");
consumerBinding.stop();
assertThat(bindingsHealthContributor.iterator().hasNext()).isFalse();

logger.info("Starting binding...");
consumerBinding.start();
assertThat(bindingsHealthContributor)
.asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
.satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));

logger.info("Resuming binding...");
consumerBinding.resume();
assertThat(bindingsHealthContributor)
.asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
.satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), concurrency, Status.UP));

consumerBinding.unbind();
}

@CartesianTest(name = "[{index}] channelType={0}")
public <T> void testConsumerFlowHealthNack(
@Values(classes = {DirectChannel.class, PollableSource.class}) Class<T> channelType,
SpringCloudStreamContext context,
TestInfo testInfo) throws Exception {
SolaceTestBinder binder = context.getBinder();

BindingsHealthContributor bindingsHealthContributor = Mockito.spy(new BindingsHealthContributor(
new SolaceFlowHealthProperties()));
binder.getBinder().setBindingsHealthContributor(bindingsHealthContributor);

ConsumerInfrastructureUtil<T> consumerInfrastructureUtil = context.createConsumerInfrastructureUtil(channelType);

DirectChannel moduleOutputChannel = context.createBindableChannel("output", new BindingProperties());
T moduleInputChannel = consumerInfrastructureUtil.createChannel("input", new BindingProperties());

String destination0 = RandomStringUtils.randomAlphanumeric(10);

ExtendedConsumerProperties<SolaceConsumerProperties> consumerProperties = context.createConsumerProperties();
consumerProperties.populateBindingName(RandomStringUtils.randomAlphanumeric(10));

Binding<MessageChannel> producerBinding = binder.bindProducer(
destination0, moduleOutputChannel, context.createProducerProperties(testInfo));
Binding<T> consumerBinding = consumerInfrastructureUtil.createBinding(binder,
destination0, RandomStringUtils.randomAlphanumeric(10), moduleInputChannel, consumerProperties);

context.binderBindUnbindLatency();

assertThat(bindingsHealthContributor)
.asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
.satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), 1, Status.UP));

String flowHealthId = "flow-0";
BindingHealthContributor bindingHealthContributor = (BindingHealthContributor) bindingsHealthContributor
.getContributor(consumerProperties.getBindingName());
FlowsHealthContributor flowsHealthContributor = bindingHealthContributor.getFlowsHealthContributor();

logger.info("Injecting Mockito spy into flow health indicator: {}", flowHealthId);
FlowHealthIndicator flowHealthIndicator = Mockito.spy((FlowHealthIndicator) (flowsHealthContributor
.getContributor(flowHealthId)));
flowsHealthContributor.removeFlowContributor(flowHealthId);
flowsHealthContributor.addFlowContributor(flowHealthId, flowHealthIndicator);

logger.info("Injecting Mockito spy into flows health indicator for binding: {}", consumerProperties.getBindingName());
flowsHealthContributor = Mockito.spy(flowsHealthContributor);
bindingsHealthContributor.removeBindingContributor(consumerProperties.getBindingName());
bindingsHealthContributor.addBindingContributor(consumerProperties.getBindingName(),
Mockito.spy(new BindingHealthContributor(flowsHealthContributor)));

// Clear invocations due to spy injection
// Real test begins now...
Mockito.clearInvocations(bindingsHealthContributor);

consumerInfrastructureUtil.sendAndSubscribe(moduleInputChannel, consumerProperties.getMaxAttempts(),
() -> moduleOutputChannel.send(MessageBuilder.withPayload(UUID.randomUUID().toString().getBytes())
.build()),
(msg, callback) -> {
callback.run();
throw new RuntimeException("Throwing expected exception!");
});

Mockito.verify(flowHealthIndicator, Mockito.never()
.description("Flow rebind should not have caused health to go down"))
.down(Mockito.any());
Mockito.verify(flowsHealthContributor, Mockito.never()
.description("Flow rebind should not have caused flow health indicator to be removed"))
.removeFlowContributor(Mockito.any());
Mockito.verify(bindingsHealthContributor, Mockito.never()
.description("Flow rebind should not have caused health component to be removed"))
.removeBindingContributor(Mockito.any());

assertThat(bindingsHealthContributor)
.asInstanceOf(InstanceOfAssertFactories.type(BindingsHealthContributor.class))
.satisfies(SolaceSpringCloudStreamAssertions.isSingleBindingHealthAvailable(consumerProperties.getBindingName(), 1, Status.UP));

producerBinding.unbind();
consumerBinding.unbind();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.solace.spring.cloud.stream.binder.test.util;

import com.solace.spring.cloud.stream.binder.health.contributors.BindingHealthContributor;
import com.solace.spring.cloud.stream.binder.health.contributors.BindingsHealthContributor;
import com.solace.spring.cloud.stream.binder.health.contributors.FlowsHealthContributor;
import com.solace.spring.cloud.stream.binder.health.indicators.FlowHealthIndicator;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders;
import com.solace.spring.cloud.stream.binder.meter.SolaceMessageMeterBinder;
import com.solace.spring.cloud.stream.binder.properties.SolaceConsumerProperties;
Expand All @@ -16,6 +20,8 @@
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.ThrowingConsumer;
import org.springframework.boot.actuate.health.NamedContributor;
import org.springframework.boot.actuate.health.Status;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
Expand All @@ -31,6 +37,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE;
Expand Down Expand Up @@ -321,4 +330,36 @@ public static ThrowingConsumer<Meter> isValidMessageSizeMeter(String nameTagValu
.isEqualTo(value)
);
}

public static ThrowingConsumer<BindingsHealthContributor> isSingleBindingHealthAvailable(String bindingName, int concurrency, Status status) {
return bindingsHealthContributor -> assertThat(StreamSupport.stream(bindingsHealthContributor.spliterator(), false))
.singleElement()
.satisfies(bindingContrib -> assertThat(bindingContrib.getName()).isEqualTo(bindingName))

.extracting(NamedContributor::getContributor)
.asInstanceOf(InstanceOfAssertFactories.type(BindingHealthContributor.class))
.satisfies(SolaceSpringCloudStreamAssertions.isBindingHealthAvailable(concurrency, status));
}

public static ThrowingConsumer<BindingHealthContributor> isBindingHealthAvailable(int concurrency, Status status) {
return bindingHealthContributor -> assertThat(StreamSupport.stream(bindingHealthContributor.spliterator(), false))
.asInstanceOf(InstanceOfAssertFactories.list(NamedContributor.class))
.singleElement()

.satisfies(bindingContrib -> assertThat(bindingContrib.getName()).isEqualTo("flows"))
.extracting(NamedContributor::getContributor)
.asInstanceOf(InstanceOfAssertFactories.type(FlowsHealthContributor.class))

.extracting(flowsContrib -> StreamSupport.stream(flowsContrib.spliterator(), false))
.asInstanceOf(InstanceOfAssertFactories.stream(NamedContributor.class))
.satisfies(flowsContrib -> assertThat(flowsContrib.stream().map(NamedContributor::getName))
.containsExactlyElementsOf(IntStream.range(0, concurrency)
.mapToObj(i -> "flow-" + i).collect(Collectors.toSet())))


.extracting(NamedContributor::getContributor)
.asInstanceOf(InstanceOfAssertFactories.list(FlowHealthIndicator.class))
.extracting(flowIndicator -> flowIndicator.getHealth(false))
.allSatisfy(health -> assertThat(health.getStatus()).isEqualTo(status));
}
}

0 comments on commit be47055

Please sign in to comment.