Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close message channel issue #3026

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@
* @author Chris Bono
* @author Artem Bilan
* @author Kotaro Matsumoto
* @author Omer Celik
*/
class BindingServiceTests {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void defaultGroup() throws Exception {
void defaultGroup() {
BindingServiceProperties properties = new BindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
Expand Down Expand Up @@ -175,7 +176,7 @@ void multipleConsumerBindings() {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void multipleConsumerBindingsFromIndexList() throws Exception {
void multipleConsumerBindingsFromIndexList() {
BindingServiceProperties properties = new BindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
Expand Down Expand Up @@ -236,7 +237,7 @@ void multipleConsumerBindingsFromIndexList() throws Exception {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void consumerBindingWhenMultiplexingIsEnabled() throws Exception {
void consumerBindingWhenMultiplexingIsEnabled() {
BindingServiceProperties properties = new BindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
Expand Down Expand Up @@ -282,7 +283,7 @@ void consumerBindingWhenMultiplexingIsEnabled() throws Exception {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void explicitGroup() throws Exception {
void explicitGroup() {
BindingServiceProperties properties = new BindingServiceProperties();
Map<String, BindingProperties> bindingProperties = new HashMap<>();
BindingProperties props = new BindingProperties();
Expand Down Expand Up @@ -528,7 +529,7 @@ void lateBindingProducer() throws Exception {

assertThat(service.getProducerBinding("output")).isSameAs(binding);

service.unbindProducers(outputChannelName);
service.unbindProducers(null, outputChannelName);
verify(binder, times(2)).bindProducer(eq("foo"), same(outputChannel),
any(ProducerProperties.class));
verify(delegate).unbind();
Expand All @@ -552,9 +553,8 @@ void bindingAutostartup() throws Exception {
assertThat(inputBinding.isRunning()).isFalse();
}

@SuppressWarnings("unchecked")
@Test
void bindingNameAsTopLevelProperty() throws Exception {
void bindingNameAsTopLevelProperty() {
ApplicationContext context = new SpringApplicationBuilder(BarConfiguration.class)
.web(WebApplicationType.NONE).run();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
*
* @author Oleg Zhurakousky
* @author Soby Chacko
* @author Omer Celik
*
*/
class StreamBridgeTests {
Expand Down Expand Up @@ -384,7 +385,7 @@ void withOutputContentTypeWildCardBindings() {

// See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream/issues/2805
@Test
void streamBridgeSendWithBinderNameAndCustomContentType() throws Exception {
void streamBridgeSendWithBinderNameAndCustomContentType() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(ConsumerConfiguration.class, EmptyConfigurationWithCustomConverters.class))
.web(WebApplicationType.NONE).run(
Expand Down Expand Up @@ -767,6 +768,45 @@ void dynamicDestinationDestroy() {
assertThat(bindingService.getProducerBindingNames().length).isEqualTo(0);
}

@Test
void dynamicDestinationWithBinderNameDestroy() {
BindingService bindingService;
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(InterceptorConfiguration.class))
.web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.stream.binders.kafka1.type=kafka",
"--spring.cloud.stream.binders.anotherKafka.type=kafka"
)) {
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("binding1", "kafka1", "Omer Celik");
bridge.send("binding2", "anotherKafka", "Omer Celik");

bindingService = context.getBean(BindingService.class);
}
assertThat(bindingService.getProducerBindingNames().length).isEqualTo(0);
}

@Test
void dynamicDestinationWithBinderNameDestroyForCacheSize() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
.getCompleteConfiguration(InterceptorConfiguration.class))
.web(WebApplicationType.NONE).run(
"--spring.jmx.enabled=false",
"--spring.cloud.stream.dynamic-destination-cache-size=1",
"--spring.cloud.stream.binders.kafka1.type=kafka",
"--spring.cloud.stream.binders.anotherKafka.type=kafka"
)) {
StreamBridge bridge = context.getBean(StreamBridge.class);
bridge.send("binding1", "kafka1", "Omer Celik");
bridge.send("binding2", "anotherKafka", "Omer Celik");

BindingService bindingService = context.getBean(BindingService.class);
assertThat(bindingService.getProducerBindingNames().length).isEqualTo(1);
assertThat(bindingService.getProducerBindingNames()[0]).isEqualTo("anotherKafka:binding2");
}
}

@Test
void withIntegrationFlowBecauseMarcinSaidSo() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(TestChannelBinderConfiguration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder;

/**
* Holds information about the binder and channel.
*
* @author Omer Celik
*/
public record BinderWrapper(Binder binder, String destinationName, String cacheKey) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void unbindOutputs(BindingService bindingService) {
for (Map.Entry<String, BoundTargetHolder> boundTargetHolderEntry : this.outputHolders
.entrySet()) {
if (boundTargetHolderEntry.getValue().bindable()) {
bindingService.unbindProducers(boundTargetHolderEntry.getKey());
bindingService.unbindProducers(null, boundTargetHolderEntry.getKey());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.springframework.beans.BeanUtils;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.BinderWrapper;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
Expand All @@ -50,6 +51,9 @@
import org.springframework.util.StringUtils;
import org.springframework.validation.DataBinder;

import static org.springframework.cloud.stream.utils.CacheKeyCreatorUtils.createChannelCacheKey;
import static org.springframework.cloud.stream.utils.CacheKeyCreatorUtils.getBinderNameIfNeeded;

/**
* Handles binding of input/output targets by delegating to an underlying {@link Binder}.
*
Expand Down Expand Up @@ -270,56 +274,52 @@ public <T> void reschedulePollableConsumerBinding(final T input,
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public <T> Binding<T> bindProducer(T output, String outputName, boolean cache, @Nullable Binder<T, ?, ProducerProperties> binder) {
String bindingTarget = this.bindingServiceProperties.getBindingDestination(outputName);
Class<?> outputClass = output.getClass();
if (output instanceof Advised advisedOutput) {
outputClass = Stream.of(advisedOutput.getProxiedInterfaces()).filter(c -> !c.getName().contains("org.springframework")).findFirst()
.orElse(outputClass);
}
if (binder == null) {
binder = (Binder<T, ?, ProducerProperties>) getBinder(outputName, outputClass);
}

public <T> Binding<T> bindProducer(T output, boolean cache, BinderWrapper binderWrapper) {
ProducerProperties producerProperties = this.bindingServiceProperties
.getProducerProperties(outputName);
if (binder instanceof ExtendedPropertiesBinder extendedPropertiesBinder) {
Object extension = extendedPropertiesBinder.getExtendedProducerProperties(outputName);
.getProducerProperties(binderWrapper.destinationName());
if (binderWrapper.binder() instanceof ExtendedPropertiesBinder extendedPropertiesBinder) {
Object extension = extendedPropertiesBinder.getExtendedProducerProperties(binderWrapper.destinationName());
ExtendedProducerProperties extendedProducerProperties = new ExtendedProducerProperties<>(
extension);
BeanUtils.copyProperties(producerProperties, extendedProducerProperties);

producerProperties = extendedProducerProperties;
}
producerProperties.populateBindingName(outputName);
producerProperties.populateBindingName(binderWrapper.destinationName());
validate(producerProperties);
Binding<T> binding = doBindProducer(output, bindingTarget, binder,
String bindingTarget = this.bindingServiceProperties.getBindingDestination(binderWrapper.destinationName());
Binding<T> binding = doBindProducer(output, bindingTarget, binderWrapper.binder(),
producerProperties);
// If the downstream binder modified the partition count in the extended producer properties
// based on the higher number of partitions provisioned on the target middleware, update that
// in the original producer properties.
ProducerProperties originalProducerProperties = this.bindingServiceProperties
.getProducerProperties(outputName);
.getProducerProperties(binderWrapper.destinationName());
if (originalProducerProperties.getPartitionCount() < producerProperties.getPartitionCount()) {
originalProducerProperties.setPartitionCount(producerProperties.getPartitionCount());
}
if (cache) {
this.producerBindings.put(outputName, binding);
this.producerBindings.put(binderWrapper.cacheKey(), binding);
}
return binding;
}

public <T> Binding<T> bindProducer(T output, String outputName, boolean cache) {
return this.bindProducer(output, outputName, cache, null);
Class<?> outputClass = output.getClass();
if (output instanceof Advised advisedOutput) {
outputClass = Stream.of(advisedOutput.getProxiedInterfaces()).filter(c -> !c.getName().contains("org.springframework")).findFirst()
.orElse(outputClass);
}
BinderWrapper binderWrapper = createBinderWrapper(null, outputName, outputClass);
return this.bindProducer(output, cache, binderWrapper);
}

public <T> Binding<T> bindProducer(T output, String outputName) {
return this.bindProducer(output, outputName, true);
}

@SuppressWarnings("rawtypes")
public Object getExtendedProducerProperties(Object output, String outputName) {
Binder binder = getBinder(outputName, output.getClass());
public Object getExtendedProducerProperties(Binder binder, String outputName) {
if (binder instanceof ExtendedPropertiesBinder extendedPropertiesBinder) {
return extendedPropertiesBinder.getExtendedProducerProperties(outputName);
}
Expand Down Expand Up @@ -398,16 +398,21 @@ else if (this.log.isWarnEnabled()) {
}
}

public void unbindProducers(String outputName) {
Binding<?> binding = this.producerBindings.remove(outputName);
public void unbindProducers(@Nullable String binderName, String outputName) {
String cacheKey = createChannelCacheKey(binderName, outputName, bindingServiceProperties);
unbindProducers(cacheKey);
}

public void unbindProducers(String cacheKey) {
Binding<?> binding = this.producerBindings.remove(cacheKey);

if (binding != null) {
binding.stop();
//then
binding.unbind();
}
else if (this.log.isWarnEnabled()) {
this.log.warn("Trying to unbind '" + outputName + "', but no binding found.");
this.log.warn("Trying to unbind '" + cacheKey + "', but no binding found.");
}
}

Expand Down Expand Up @@ -443,6 +448,14 @@ private void assertNotIllegalException(RuntimeException exception)
}
}

public BinderWrapper createBinderWrapper(@Nullable String binderName, String destinationName, Class<?> outputClass) {
binderName = getBinderNameIfNeeded(binderName, destinationName, bindingServiceProperties);
Binder binder = binderFactory.getBinder(binderName, outputClass);
String channelCacheKey = createChannelCacheKey(binderName, destinationName);
return new BinderWrapper(binder, destinationName, channelCacheKey);
}


public static class LateBinding<T> implements Binding<T> {

private volatile Binding<T> delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.BinderWrapper;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.DefaultPartitioningInterceptor;
Expand All @@ -68,6 +69,8 @@
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import static org.springframework.cloud.stream.utils.CacheKeyCreatorUtils.createChannelCacheKey;


/**
* A class which allows user to send data to an output binding.
Expand All @@ -85,6 +88,7 @@
* @author Soby Chacko
* @author Byungjun You
* @author Michał Rowicki
* @author Omer Celik
* @since 3.0.3
*
*/
Expand Down Expand Up @@ -128,7 +132,6 @@ public final class StreamBridge implements StreamOperations, SmartInitializingSi
* @param bindingServiceProperties instance of {@link BindingServiceProperties}
* @param applicationContext instance of {@link ConfigurableApplicationContext}
*/
@SuppressWarnings("serial")
StreamBridge(FunctionCatalog functionCatalog, BindingServiceProperties bindingServiceProperties,
ConfigurableApplicationContext applicationContext, @Nullable NewDestinationBindingCallback destinationBindingCallback) {
this.executorService = Executors.newCachedThreadPool();
Expand Down Expand Up @@ -268,13 +271,7 @@ public void afterSingletonsInstantiated() {
MessageChannel resolveDestination(String destinationName, ProducerProperties producerProperties, String binderName) {
lock.lock();
try {
MessageChannel messageChannel = null;
if (StringUtils.hasText(binderName)) {
messageChannel = this.channelCache.get(binderName + ":" + destinationName);
}
else {
messageChannel = this.channelCache.get(destinationName);
}
MessageChannel messageChannel = this.channelCache.get(createChannelCacheKey(binderName, destinationName, bindingServiceProperties));
if (messageChannel == null) {
if (this.applicationContext.containsBean(destinationName)) {
messageChannel = this.applicationContext.getBean(destinationName, MessageChannel.class);
Expand All @@ -291,28 +288,20 @@ MessageChannel resolveDestination(String destinationName, ProducerProperties pro
messageChannel = this.isAsync() ? new ExecutorChannel(this.executorService) : new DirectWithAttributesChannel();
((AbstractSubscribableChannel) messageChannel).setApplicationContext(applicationContext);
((AbstractSubscribableChannel) messageChannel).setComponentName(destinationName);

BinderWrapper binderWrapper = bindingService.createBinderWrapper(binderName, destinationName, messageChannel.getClass());
if (this.destinationBindingCallback != null) {
Object extendedProducerProperties = this.bindingService
.getExtendedProducerProperties(messageChannel, destinationName);
.getExtendedProducerProperties(binderWrapper.binder(), destinationName);
this.destinationBindingCallback.configure(destinationName, messageChannel,
producerProperties, extendedProducerProperties);
}

Binder binder = null;
if (StringUtils.hasText(binderName)) {
BinderFactory binderFactory = this.applicationContext.getBean(BinderFactory.class);
binder = binderFactory.getBinder(binderName, messageChannel.getClass());
}
addPartitioningInterceptorIfNeedBe(producerProperties, destinationName, (AbstractMessageChannel) messageChannel);
addGlobalChannelInterceptorProcessor((AbstractMessageChannel) messageChannel, destinationName);

this.bindingService.bindProducer(messageChannel, destinationName, true, binder);
if (StringUtils.hasText(binderName)) {
this.channelCache.put(binderName + ":" + destinationName, messageChannel);
}
else {
this.channelCache.put(destinationName, messageChannel);
}
this.bindingService.bindProducer(messageChannel, true, binderWrapper);
this.channelCache.put(binderWrapper.cacheKey(), messageChannel);
}
}

Expand Down
Loading
Loading