Spring Cloud Stream 2.0.0 Release Notes
- Polling Consumer
- Micrometer
Binder backed by Spring Integration https://github.com/spring-cloud/spring-cloud-stream/pull/1241
Given https://github.com/spring-cloud/spring-cloud-stream/commit/0463f7939d654138d955d9897ae2d27a82fea67f, the web dependency is no longer included, so users can switch manually between Tomcat and Netty simply by switching dependencies:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
and
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
- Actuator: optional, requires web (above) and
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
    <optional>true</optional>
</dependency>
. . .is now injectable https://github.com/spring-cloud/spring-cloud-stream/issues/858 - fill in
Stop: curl -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/stop/inOne
Start: curl -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/start/inOne
. . . where inOne is the channel name.
A GET request to http://localhost:8080/actuator/bindings/ displays the list of existing bindings.
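For applications that prefer to drive these endpoints from code rather than curl, the following is a minimal sketch that builds the same requests with the JDK's `java.net.http` API (Java 11+). The class name `BindingControlRequests` and its helper methods are illustrative names, not part of Spring Cloud Stream, and the base URL is simply the one used in the curl commands above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative helper (hypothetical name): builds, but does not send,
// the same requests as the curl commands above.
final class BindingControlRequests {

    // Assumption: the actuator is served from the URL used in the curl examples.
    static final String BASE_URL = "http://localhost:8080/actuator/bindings";

    // action is "stop" or "start"; bindingName is the channel name, e.g. "inOne".
    static HttpRequest changeState(String action, String bindingName) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/" + action + "/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // GET request that lists the existing bindings.
    static HttpRequest listBindings() {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/"))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        System.out.println(changeState("stop", "inOne").uri());
        System.out.println(listBindings().uri());
    }
}
```

A request built this way can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` once the application is running.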
- Default content type is set to application/json, which needs to be taken into consideration when migrating a 1.3 application and/or operating in mixed mode (i.e., 1.3 producer -> 2.0 consumer).
- Messages with textual payloads and a "contentType" of text/* or */json will be converted to Message<String> to maintain behavioral compatibility with the previous version of the framework. The message's payload will still be converted to the appropriate argument type by the argument resolvers (MessageConverters) if that argument is not a String (i.e., a POJO), essentially resulting in a secondary conversion that is not necessary in most cases. We are considering a flag to override this behavior and avoid the secondary conversion.
- [TODO: Add to the DOC] @StreamMessageConverter - to define custom MessageConverters used by argument resolvers. These are added to the top of the list of existing MessageConverters.
- List of MessageConverters configured by default (in order):
- TupleJsonMessageConverter - tbd
- ByteArrayMessageConverter - tbd
- ObjectStringMessageConverter - tbd
- JavaSerializationMessageConverter (DEPRECATED) - tbd
- KryoMessageConverter (DEPRECATED) - tbd
- JsonUnmarshallingMessageConverter - tbd
- The contentType acts as a hint to help select the appropriate MessageConverter. For example, if the payload is byte[] and the argument is Foo, which converter should be used?
- Add a note about the behavior of @Transformer around the header propagation bug (unless an SI fix is available before the release).
NOTE: explain the behavior change for typeless handlers such as: handle(Message<?> message) vs handle(Message<byte[]> message) OR handle(Object val) vs handle(byte[] val)
- There is still a consistency issue when dealing with JSON payloads and collections, for example:
public List<Employee<Person>> echo(List<Employee<Person>> value)
- Note about partitionKeyExtractorClass deprecation in favor of Spring-configured beans.
- [TODO: ensure error is thrown at init] partitionCount must be accompanied by 'partitionKeyExtractor'; otherwise it is an error.
TODO: 1.3 to 2.0 needs a migration pass for KryoConversion, see #1142
- When upgrading from 1.3 to 2.0, it is possible to override MessageConverters by simply configuring a bean annotated with @StreamMessageConverter for the targeted content type:
@Bean
@StreamMessageConverter
public AlwaysStringKryoMessageConverter kryoOverrideMessageConverter() {
    return new AlwaysStringKryoMessageConverter(MimeType.valueOf("application/x-java-object"));
}
This can also be used to consume messages with a MessageConverter of type "avro/bytes".
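The notes above say two things about converter selection: custom converters are added to the top of the list, and the contentType acts as a hint for picking a converter. The following is a simplified, self-contained model of that idea, under the assumption that the first converter producing a non-null result wins. All types and names here (`ConverterChainSketch`, `PayloadConverter`, `TEXT`, `UPPERCASE_TEXT`) are hypothetical stand-ins for illustration and are not Spring's `MessageConverter` API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Simplified stand-in for converter selection; these are NOT Spring's types.
final class ConverterChainSketch {

    // Hypothetical converter contract: return null when the converter does not apply.
    interface PayloadConverter {
        Object fromBytes(byte[] payload, String contentType, Class<?> targetType);
    }

    // Hypothetical "default" converter: handles text/* when a String is requested.
    static final PayloadConverter TEXT = (payload, contentType, targetType) ->
            contentType.startsWith("text/") && targetType == String.class
                    ? new String(payload, StandardCharsets.UTF_8)
                    : null;

    // Hypothetical custom converter, playing the role of a @StreamMessageConverter bean.
    static final PayloadConverter UPPERCASE_TEXT = (payload, contentType, targetType) ->
            contentType.equals("text/plain") && targetType == String.class
                    ? new String(payload, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT)
                    : null;

    // Custom converters go to the top of the list, ahead of the defaults.
    static List<PayloadConverter> chain(List<PayloadConverter> custom, List<PayloadConverter> defaults) {
        List<PayloadConverter> all = new ArrayList<>(custom);
        all.addAll(defaults);
        return all;
    }

    // Assumption of this sketch: the first converter returning a non-null result wins.
    static Object convert(List<PayloadConverter> chain, byte[] payload,
                          String contentType, Class<?> targetType) {
        for (PayloadConverter converter : chain) {
            Object result = converter.fromBytes(payload, contentType, targetType);
            if (result != null) {
                return result;
            }
        }
        return null; // no converter applied
    }
}
```

In this model, the same byte[] payload is handled by the custom converter when the content type is text/plain and by the default one for any other text/* type. That is why a custom converter declares the content type it targets.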
The behavior of the StreamListener annotation post processor (StreamListenerAnnotationBeanPostProcessor) is enhanced in 2.0 to address the needs of downstream implementations. This section primarily applies to changes at the framework level (i.e., a new Binder requires a different behavior from the post processor). In a normal context, users don't have to deal with these types of changes.
StreamListenerSetupMethodOrchestrator is an API hook that allows downstream binder implementations or applications to inject custom strategies to alter the default StreamListener adapter method invocations.
Primary motivations for the new API
The default StreamListenerAnnotationBeanPostProcessor behaves in a strict manner, enforcing various rules and validations. For example, it does not allow a SendTo annotation with multiple destinations, or multiple Output annotations, on a method annotated with StreamListener. There might be use cases in which a method needs to return a collection type or an array and then, based on some rules, send the data to multiple destinations through various bindings. If we rely on the default StreamListenerAnnotationBeanPostProcessor, it is not possible to get this behavior for StreamListener methods in a natural way. There is an extension mechanism already provided by the bean post processor to enhance the behavior, but it is not sufficient to satisfy this use case because the default validations still apply.
Contract for StreamListenerSetupMethodOrchestrator
Here is the contract of the StreamListenerSetupMethodOrchestrator interface
Implementation Details
On the inbound side, the interface provides a default implementation which is equivalent to the current existing behavior, with the exception that this method is now available to be overridden by a potential downstream implementation. The main change though is introduced with the following methods.
boolean supports(Method method)
void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean)
The supports method takes a method and checks whether the implementation can support orchestrating it. The orchestrateStreamListenerSetupMethod method allows an implementer to orchestrate the method by altering method invocation strategies. For instance, the implementation can allow multiple bindings/destinations on the outbound side, change the way the method is invoked, etc.
There is a default internal implementation provided by the framework as part of the StreamListenerAnnotationBeanPostProcessor. This implementation is not meant to be extended or used outside of this bean post processor. By default, this internal implementation supports any method annotated with StreamListener and is therefore used out of the box. However, if the user provides an implementation, either as part of a binder implementation or as a new type of target type adapter (such as the Kafka Streams target type), and registers it as a Spring bean in the ApplicationContext, that bean is checked to see whether the StreamListener method can be invoked using that implementation of StreamListenerSetupMethodOrchestrator.
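The selection described above can be modeled in a few lines. This is only a sketch under stated assumptions: the interface below is a local stand-in that mirrors the two method names from the text, not the framework's StreamListenerSetupMethodOrchestrator, and the precedence rule (user-registered orchestrators are consulted first, with the default as fallback) is one plausible reading of the paragraph above. `OrchestratorSelectionSketch`, `Handlers`, `ARRAY_FAN_OUT` and `DEFAULT` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.List;

// Simplified model of orchestrator selection; all types are local stand-ins.
final class OrchestratorSelectionSketch {

    // Stand-in mirroring the two methods named in the text.
    interface SetupMethodOrchestrator {
        boolean supports(Method method);
        void orchestrate(Method method, Object bean);
    }

    // Hypothetical listener methods: one plain, one returning an array.
    static class Handlers {
        String single(String in) { return in; }
        String[] fanOut(String in) { return new String[] {in, in}; }
    }

    // Models the internal default: supports every listener method.
    static final SetupMethodOrchestrator DEFAULT = new SetupMethodOrchestrator() {
        public boolean supports(Method method) { return true; }
        public void orchestrate(Method method, Object bean) { /* default wiring would go here */ }
    };

    // Models a custom orchestrator that only handles array-returning methods.
    static final SetupMethodOrchestrator ARRAY_FAN_OUT = new SetupMethodOrchestrator() {
        public boolean supports(Method method) { return method.getReturnType().isArray(); }
        public void orchestrate(Method method, Object bean) { /* custom wiring would go here */ }
    };

    // Returns the first registered orchestrator that supports the method,
    // falling back to the default one.
    static SetupMethodOrchestrator select(Method method,
                                          List<SetupMethodOrchestrator> registered,
                                          SetupMethodOrchestrator fallback) {
        for (SetupMethodOrchestrator candidate : registered) {
            if (candidate.supports(method)) {
                return candidate;
            }
        }
        return fallback;
    }

    // Looks up one of the sample handler methods by name.
    static Method method(String name) {
        try {
            return Handlers.class.getDeclaredMethod(name, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<SetupMethodOrchestrator> registered = List.of(ARRAY_FAN_OUT);
        System.out.println(select(method("fanOut"), registered, DEFAULT) == ARRAY_FAN_OUT);
        System.out.println(select(method("single"), registered, DEFAULT) == DEFAULT);
    }
}
```

The point of the design is that a custom orchestrator narrows its own scope through supports, so methods it does not claim keep the default behavior and its validations.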
- The binder will be called spring-cloud-stream-binder-kafka-streams starting with Elmhurst.RC1 (2.0.0.RC1 of the binder).
- Many classes that had KStream in their names are renamed to use KafkaStreams (details below).
- Many classes are moved from the public API to internal starting with 2.0.0.RC1 (details below).
- All the properties that required kstream will need kafka.streams starting from 2.0.0.RC1. For example, earlier they were spring.cloud.stream.kstream.binder.* or spring.cloud.stream.kstream.bindings.*, but now they are spring.cloud.stream.kafka.streams.binder.* and spring.cloud.stream.kafka.streams.bindings.*, respectively.
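When many property keys need updating, the prefix change can be applied mechanically. The following is a small sketch of such a rewrite; `KStreamPropertyMigration` is an illustrative helper, not something shipped with the binder, and the `someProperty` suffix used in the example is a placeholder rather than a real property name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper (hypothetical name): rewrites pre-2.0.0.RC1 property keys.
final class KStreamPropertyMigration {

    static final String OLD_PREFIX = "spring.cloud.stream.kstream.";
    static final String NEW_PREFIX = "spring.cloud.stream.kafka.streams.";

    // Replaces the old prefix with the new one; other keys are returned unchanged.
    static String migrateKey(String key) {
        return key.startsWith(OLD_PREFIX)
                ? NEW_PREFIX + key.substring(OLD_PREFIX.length())
                : key;
    }

    // Applies migrateKey to every entry, preserving iteration order and values.
    static Map<String, String> migrate(Map<String, String> properties) {
        Map<String, String> migrated = new LinkedHashMap<>();
        properties.forEach((key, value) -> migrated.put(migrateKey(key), value));
        return migrated;
    }

    public static void main(String[] args) {
        // "someProperty" is a placeholder suffix, not a real binder property.
        System.out.println(migrateKey("spring.cloud.stream.kstream.binder.someProperty"));
    }
}
```

Keys outside the old prefix, such as other spring.cloud.stream.* settings, pass through untouched, so the helper can be run over a whole property set.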
The following classes are renamed or removed starting with 2.0.0.RC1:
- Package org.springframework.cloud.stream.binder.kstream -> org.springframework.cloud.stream.binder.kafka.streams
- Package org.springframework.cloud.stream.binder.kstream.annotations -> org.springframework.cloud.stream.binder.kafka.streams.annotations
- KStreamProcessor -> KafkaStreamsProcessor
- KStreamBinderConfiguration -> KafkaStreamsBinderConfiguration
- KStreamBinderSupportAutoConfiguration -> KafkaStreamsBinderSupportAutoConfiguration
- KStreamApplicationSupportAutoConfiguration -> KafkaStreamsApplicationSupportAutoConfiguration
- KStreamApplicationSupportProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties
- KStreamBindingProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBindingProperties
- KStreamCommonProperties (Class removed)
- KStreamConsumerProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties
- KStreamProducerProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties
- KStreamExtendedBindingProperties -> org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties
- KStreamBinder (No longer available as a public class)
- KStreamBoundElementFactory (No longer available as a public class)
- KStreamListenerParameterAdapter (No longer available as a public class)
- KStreamStreamListenerResultAdapter (No longer available as a public class)
For all the properties available through the Kafka Streams binder, please refer to the reference docs.