diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
index ab8f319e525c9..5f7064be14eb0 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/ImplicitConversions.scala
@@ -80,13 +80,17 @@ object ImplicitConversions {
implicit def groupedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Grouped[K, V] =
Grouped.`with`[K, V]
- implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
- valueSerde: Serde[V],
- otherValueSerde: Serde[VO]): Joined[K, V, VO] =
+ implicit def joinedFromKeyValueOtherSerde[K, V, VO](implicit
+ keySerde: Serde[K],
+ valueSerde: Serde[V],
+ otherValueSerde: Serde[VO]
+ ): Joined[K, V, VO] =
Joined.`with`[K, V, VO]
- implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K],
- valueSerde: Serde[V]): Materialized[K, V, S] =
+ implicit def materializedFromSerde[K, V, S <: StateStore](implicit
+ keySerde: Serde[K],
+ valueSerde: Serde[V]
+ ): Materialized[K, V, S] =
Materialized.`with`[K, V, S]
implicit def producedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K, V] =
@@ -95,8 +99,10 @@ object ImplicitConversions {
implicit def repartitionedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Repartitioned[K, V] =
Repartitioned.`with`[K, V]
- implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit keySerde: Serde[K],
- valueSerde: Serde[V],
- otherValueSerde: Serde[VO]): StreamJoined[K, V, VO] =
+ implicit def streamJoinFromKeyValueOtherSerde[K, V, VO](implicit
+ keySerde: Serde[K],
+ valueSerde: Serde[V],
+ otherValueSerde: Serde[VO]
+ ): StreamJoined[K, V, VO] =
StreamJoined.`with`[K, V, VO]
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
index a36fef83beb88..2e42090d13deb 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/Serdes.scala
@@ -58,8 +58,10 @@ object Serdes {
}
)
- def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
- deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+ def fromFn[T >: Null](
+ serializer: (String, T) => Array[Byte],
+ deserializer: (String, Array[Byte]) => Option[T]
+ ): Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 3eeffc484ba52..9430a511f71a4 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -120,8 +120,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* @see #table(String)
* @see `org.apache.kafka.streams.StreamsBuilder#table`
*/
- def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
- implicit consumed: Consumed[K, V]
+ def table[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit
+ consumed: Consumed[K, V]
): KTable[K, V] =
new KTable(inner.table[K, V](topic, consumed, materialized))
@@ -146,8 +146,8 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
* @return a `GlobalKTable` for the specified topic
* @see `org.apache.kafka.streams.StreamsBuilder#globalTable`
*/
- def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(
- implicit consumed: Consumed[K, V]
+ def globalTable[K, V](topic: String, materialized: Materialized[K, V, ByteArrayKeyValueStore])(implicit
+ consumed: Consumed[K, V]
): GlobalKTable[K, V] =
inner.globalTable(topic, consumed, materialized)
@@ -177,10 +177,12 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
"Use #addGlobalStore(StoreBuilder, String, Consumed, org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.",
"2.7.0"
)
- def addGlobalStore[K, V](storeBuilder: StoreBuilder[_ <: StateStore],
- topic: String,
- consumed: Consumed[K, V],
- stateUpdateSupplier: ProcessorSupplier[K, V]): StreamsBuilderJ =
+ def addGlobalStore[K, V](
+ storeBuilder: StoreBuilder[_ <: StateStore],
+ topic: String,
+ consumed: Consumed[K, V],
+ stateUpdateSupplier: ProcessorSupplier[K, V]
+ ): StreamsBuilderJ =
inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
index 102c2577902cd..c606c0096898c 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
@@ -110,7 +110,7 @@ class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {
def noDefaultBranch(): Map[String, KStream[K, V]] = toScalaMap(inner.noDefaultBranch())
private def toScalaMap(m: util.Map[String, kstream.KStream[K, V]]): collection.immutable.Map[String, KStream[K, V]] =
- m.asScala.map {
- case (name, kStreamJ) => (name, new KStream(kStreamJ))
+ m.asScala.map { case (name, kStreamJ) =>
+ (name, new KStream(kStreamJ))
}.toMap
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
index f4fe9fc5ca852..2bf58ca0e5670 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
@@ -43,8 +43,10 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
* @param aggregator a function that computes a new aggregate result
* @return a [[CogroupedKStream]]
*/
- def cogroup[VIn](groupedStream: KGroupedStream[KIn, VIn],
- aggregator: (KIn, VIn, VOut) => VOut): CogroupedKStream[KIn, VOut] =
+ def cogroup[VIn](
+ groupedStream: KGroupedStream[KIn, VIn],
+ aggregator: (KIn, VIn, VOut) => VOut
+ ): CogroupedKStream[KIn, VOut] =
new CogroupedKStream(inner.cogroup(groupedStream.inner, aggregator.asAggregator))
/**
@@ -58,8 +60,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
* (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate`
*/
- def aggregate(initializer: => VOut)(
- implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
+ def aggregate(initializer: => VOut)(implicit
+ materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, materialized))
/**
@@ -74,8 +76,8 @@ class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
* (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.CogroupedKStream#aggregate`
*/
- def aggregate(initializer: => VOut, named: Named)(
- implicit materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
+ def aggregate(initializer: => VOut, named: Named)(implicit
+ materialized: Materialized[KIn, VOut, ByteArrayKeyValueStore]
): KTable[KIn, VOut] = new KTable(inner.aggregate((() => initializer).asInitializer, named, materialized))
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
index a105ed65541a5..714df97c17595 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
@@ -61,8 +61,9 @@ object Consumed {
* @tparam V value type
* @return a new instance of [[Consumed]]
*/
- def `with`[K, V](timestampExtractor: TimestampExtractor)(implicit keySerde: Serde[K],
- valueSerde: Serde[V]): ConsumedJ[K, V] =
+ def `with`[K, V](
+ timestampExtractor: TimestampExtractor
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(timestampExtractor).withKeySerde(keySerde).withValueSerde(valueSerde)
/**
@@ -73,7 +74,8 @@ object Consumed {
* @param resetPolicy the offset reset policy to be used. If `null` the default reset policy from config will be used
* @return a new instance of [[Consumed]]
*/
- def `with`[K, V](resetPolicy: Topology.AutoOffsetReset)(implicit keySerde: Serde[K],
- valueSerde: Serde[V]): ConsumedJ[K, V] =
+ def `with`[K, V](
+ resetPolicy: Topology.AutoOffsetReset
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ConsumedJ[K, V] =
ConsumedJ.`with`(resetPolicy).withKeySerde(keySerde).withValueSerde(valueSerde)
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
index b6dbb0575a990..c614e1488f8c5 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
@@ -34,9 +34,11 @@ object Joined {
* @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
* @return new [[org.apache.kafka.streams.kstream.Joined]] instance with the provided serdes
*/
- def `with`[K, V, VO](implicit keySerde: Serde[K],
- valueSerde: Serde[V],
- otherValueSerde: Serde[VO]): JoinedJ[K, V, VO] =
+ def `with`[K, V, VO](implicit
+ keySerde: Serde[K],
+ valueSerde: Serde[V],
+ otherValueSerde: Serde[VO]
+ ): JoinedJ[K, V, VO] =
JoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 44a3e568d859e..60a9c572d16e7 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -111,8 +111,9 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#reduce`
*/
- def reduce(reducer: (V, V) => V,
- named: Named)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+ def reduce(reducer: (V, V) => V, named: Named)(implicit
+ materialized: Materialized[K, V, ByteArrayKeyValueStore]
+ ): KTable[K, V] =
new KTable(inner.reduce(reducer.asReducer, materialized))
/**
@@ -125,8 +126,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
*/
- def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
- implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+ def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized))
@@ -141,8 +142,8 @@ class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedStream#aggregate`
*/
- def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(
- implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+ def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(implicit
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, named, materialized))
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 292155dbff2e8..3d9e052a2f17c 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -76,8 +76,9 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
*/
- def reduce(adder: (V, V) => V,
- subtractor: (V, V) => V)(implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+ def reduce(adder: (V, V) => V, subtractor: (V, V) => V)(implicit
+ materialized: Materialized[K, V, ByteArrayKeyValueStore]
+ ): KTable[K, V] =
new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, materialized))
/**
@@ -92,8 +93,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#reduce`
*/
- def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(
- implicit materialized: Materialized[K, V, ByteArrayKeyValueStore]
+ def reduce(adder: (V, V) => V, subtractor: (V, V) => V, named: Named)(implicit
+ materialized: Materialized[K, V, ByteArrayKeyValueStore]
): KTable[K, V] =
new KTable(inner.reduce(adder.asReducer, subtractor.asReducer, named, materialized))
@@ -109,8 +110,8 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
*/
- def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
- implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+ def aggregate[VR](initializer: => VR)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
new KTable(
inner.aggregate((() => initializer).asInitializer, adder.asAggregator, subtractor.asAggregator, materialized)
@@ -129,14 +130,16 @@ class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.KGroupedTable#aggregate`
*/
- def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(
- implicit materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+ def aggregate[VR](initializer: => VR, named: Named)(adder: (K, V, VR) => VR, subtractor: (K, V, VR) => VR)(implicit
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]
): KTable[K, VR] =
new KTable(
- inner.aggregate((() => initializer).asInitializer,
- adder.asAggregator,
- subtractor.asAggregator,
- named,
- materialized)
+ inner.aggregate(
+ (() => initializer).asInitializer,
+ adder.asAggregator,
+ subtractor.asAggregator,
+ named,
+ materialized
+ )
)
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index d097b8590350b..dedb4246aaf02 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -558,8 +558,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transform`
*/
- def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
- stateStoreNames: String*): KStream[K1, V1] =
+ def transform[K1, V1](
+ transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
+ stateStoreNames: String*
+ ): KStream[K1, V1] =
new KStream(inner.transform(transformerSupplier, stateStoreNames: _*))
/**
@@ -578,9 +580,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transform`
*/
- def transform[K1, V1](transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
- named: Named,
- stateStoreNames: String*): KStream[K1, V1] =
+ def transform[K1, V1](
+ transformerSupplier: TransformerSupplier[K, V, KeyValue[K1, V1]],
+ named: Named,
+ stateStoreNames: String*
+ ): KStream[K1, V1] =
new KStream(inner.transform(transformerSupplier, named, stateStoreNames: _*))
/**
@@ -598,8 +602,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transform`
*/
- def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
- stateStoreNames: String*): KStream[K1, V1] =
+ def flatTransform[K1, V1](
+ transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
+ stateStoreNames: String*
+ ): KStream[K1, V1] =
new KStream(inner.flatTransform(transformerSupplier.asJava, stateStoreNames: _*))
/**
@@ -618,9 +624,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains more or less records with new key and value (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transform`
*/
- def flatTransform[K1, V1](transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
- named: Named,
- stateStoreNames: String*): KStream[K1, V1] =
+ def flatTransform[K1, V1](
+ transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
+ named: Named,
+ stateStoreNames: String*
+ ): KStream[K1, V1] =
new KStream(inner.flatTransform(transformerSupplier.asJava, named, stateStoreNames: _*))
/**
@@ -638,8 +646,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
- stateStoreNames: String*): KStream[K, VR] =
+ def flatTransformValues[VR](
+ valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
+ stateStoreNames: String*
+ ): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*))
/**
@@ -658,9 +668,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
- named: Named,
- stateStoreNames: String*): KStream[K, VR] =
+ def flatTransformValues[VR](
+ valueTransformerSupplier: ValueTransformerSupplier[V, Iterable[VR]],
+ named: Named,
+ stateStoreNames: String*
+ ): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*))
/**
@@ -678,8 +690,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
- stateStoreNames: String*): KStream[K, VR] =
+ def flatTransformValues[VR](
+ valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
+ stateStoreNames: String*
+ ): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, stateStoreNames: _*))
/**
@@ -698,9 +712,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def flatTransformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
- named: Named,
- stateStoreNames: String*): KStream[K, VR] =
+ def flatTransformValues[VR](
+ valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, Iterable[VR]],
+ named: Named,
+ stateStoreNames: String*
+ ): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, named, stateStoreNames: _*))
/**
@@ -717,8 +733,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
- stateStoreNames: String*): KStream[K, VR] =
+ def transformValues[VR](
+ valueTransformerSupplier: ValueTransformerSupplier[V, VR],
+ stateStoreNames: String*
+ ): KStream[K, VR] =
new KStream(inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*))
/**
@@ -736,9 +754,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def transformValues[VR](valueTransformerSupplier: ValueTransformerSupplier[V, VR],
- named: Named,
- stateStoreNames: String*): KStream[K, VR] =
+ def transformValues[VR](
+ valueTransformerSupplier: ValueTransformerSupplier[V, VR],
+ named: Named,
+ stateStoreNames: String*
+ ): KStream[K, VR] =
new KStream(inner.transformValues[VR](valueTransformerSupplier, named, stateStoreNames: _*))
/**
@@ -755,8 +775,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
- stateStoreNames: String*): KStream[K, VR] =
+ def transformValues[VR](
+ valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
+ stateStoreNames: String*
+ ): KStream[K, VR] =
new KStream(inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*))
/**
@@ -774,9 +796,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def transformValues[VR](valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
- named: Named,
- stateStoreNames: String*): KStream[K, VR] =
+ def transformValues[VR](
+ valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
+ named: Named,
+ stateStoreNames: String*
+ ): KStream[K, VR] =
new KStream(inner.transformValues[VR](valueTransformerSupplier, named, stateStoreNames: _*))
/**
@@ -792,8 +816,10 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#process`
*/
@deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.")
- def process(processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
- stateStoreNames: String*): Unit = {
+ def process(
+ processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
+ stateStoreNames: String*
+ ): Unit = {
val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier()
inner.process(processorSupplierJ, stateStoreNames: _*)
}
@@ -830,9 +856,11 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#process`
*/
@deprecated(since = "3.0", message = "Use process(ProcessorSupplier, String*) instead.")
- def process(processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
- named: Named,
- stateStoreNames: String*): Unit = {
+ def process(
+ processorSupplier: () => org.apache.kafka.streams.processor.Processor[K, V],
+ named: Named,
+ stateStoreNames: String*
+ ): Unit = {
val processorSupplierJ: org.apache.kafka.streams.processor.ProcessorSupplier[K, V] = () => processorSupplier()
inner.process(processorSupplierJ, named, stateStoreNames: _*)
}
@@ -1039,7 +1067,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
*/
def join[GK, GV, RV](globalKTable: GlobalKTable[GK, GV])(
keyValueMapper: (K, V) => GK,
- joiner: (V, GV) => RV,
+ joiner: (V, GV) => RV
): KStream[K, RV] =
new KStream(
inner.join[GK, GV, RV](
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 7aba69d50e722..892f39eac76dc 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -86,9 +86,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains only those records that satisfy the given predicate
* @see `org.apache.kafka.streams.kstream.KTable#filter`
*/
- def filter(predicate: (K, V) => Boolean,
- named: Named,
- materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+ def filter(
+ predicate: (K, V) => Boolean,
+ named: Named,
+ materialized: Materialized[K, V, ByteArrayKeyValueStore]
+ ): KTable[K, V] =
new KTable(inner.filter(predicate.asPredicate, named, materialized))
/**
@@ -138,9 +140,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains only those records that do not satisfy the given predicate
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
- def filterNot(predicate: (K, V) => Boolean,
- named: Named,
- materialized: Materialized[K, V, ByteArrayKeyValueStore]): KTable[K, V] =
+ def filterNot(
+ predicate: (K, V) => Boolean,
+ named: Named,
+ materialized: Materialized[K, V, ByteArrayKeyValueStore]
+ ): KTable[K, V] =
new KTable(inner.filterNot(predicate.asPredicate, named, materialized))
/**
@@ -198,9 +202,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
- def mapValues[VR](mapper: V => VR,
- named: Named,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+ def mapValues[VR](
+ mapper: V => VR,
+ named: Named,
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+ ): KTable[K, VR] =
new KTable(inner.mapValues[VR](mapper.asValueMapper, named, materialized))
/**
@@ -258,9 +264,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KTable#mapValues`
*/
- def mapValues[VR](mapper: (K, V) => VR,
- named: Named,
- materialized: Materialized[K, VR, ByteArrayKeyValueStore]): KTable[K, VR] =
+ def mapValues[VR](
+ mapper: (K, V) => VR,
+ named: Named,
+ materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+ ): KTable[K, VR] =
new KTable(inner.mapValues[VR](mapper.asValueMapperWithKey, named, materialized))
/**
@@ -337,8 +345,10 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
- stateStoreNames: String*): KTable[K, VR] =
+ def transformValues[VR](
+ valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+ stateStoreNames: String*
+ ): KTable[K, VR] =
new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, stateStoreNames: _*))
/**
@@ -364,9 +374,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
- named: Named,
- stateStoreNames: String*): KTable[K, VR] =
+ def transformValues[VR](
+ valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+ named: Named,
+ stateStoreNames: String*
+ ): KTable[K, VR] =
new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, named, stateStoreNames: _*))
/**
@@ -389,9 +401,11 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
- materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
- stateStoreNames: String*): KTable[K, VR] =
+ def transformValues[VR](
+ valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+ materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
+ stateStoreNames: String*
+ ): KTable[K, VR] =
new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, stateStoreNames: _*))
/**
@@ -415,10 +429,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KStream]] that contains records with unmodified key and new values (possibly of different type)
* @see `org.apache.kafka.streams.kstream.KStream#transformValues`
*/
- def transformValues[VR](valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
- materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
- named: Named,
- stateStoreNames: String*): KTable[K, VR] =
+ def transformValues[VR](
+ valueTransformerWithKeySupplier: ValueTransformerWithKeySupplier[K, V, VR],
+ materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]],
+ named: Named,
+ stateStoreNames: String*
+ ): KTable[K, VR] =
new KTable(inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, named, stateStoreNames: _*))
/**
@@ -619,10 +635,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
*/
- def join[VR, KO, VO](other: KTable[KO, VO],
- keyExtractor: Function[V, KO],
- joiner: ValueJoiner[V, VO, VR],
- materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
+ def join[VR, KO, VO](
+ other: KTable[KO, VO],
+ keyExtractor: Function[V, KO],
+ joiner: ValueJoiner[V, VO, VR],
+ materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+ ): KTable[K, VR] =
new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, materialized))
/**
@@ -638,11 +656,13 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
*/
- def join[VR, KO, VO](other: KTable[KO, VO],
- keyExtractor: Function[V, KO],
- joiner: ValueJoiner[V, VO, VR],
- named: Named,
- materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
+ def join[VR, KO, VO](
+ other: KTable[KO, VO],
+ keyExtractor: Function[V, KO],
+ joiner: ValueJoiner[V, VO, VR],
+ named: Named,
+ materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+ ): KTable[K, VR] =
new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
/**
@@ -657,10 +677,12 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
*/
- def leftJoin[VR, KO, VO](other: KTable[KO, VO],
- keyExtractor: Function[V, KO],
- joiner: ValueJoiner[V, VO, VR],
- materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
+ def leftJoin[VR, KO, VO](
+ other: KTable[KO, VO],
+ keyExtractor: Function[V, KO],
+ joiner: ValueJoiner[V, VO, VR],
+ materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+ ): KTable[K, VR] =
new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, materialized))
/**
@@ -676,11 +698,13 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @return a [[KTable]] that contains join-records for each key and values computed by the given joiner,
* one for each matched record-pair with the same key
*/
- def leftJoin[VR, KO, VO](other: KTable[KO, VO],
- keyExtractor: Function[V, KO],
- joiner: ValueJoiner[V, VO, VR],
- named: Named,
- materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]): KTable[K, VR] =
+ def leftJoin[VR, KO, VO](
+ other: KTable[KO, VO],
+ keyExtractor: Function[V, KO],
+ joiner: ValueJoiner[V, VO, VR],
+ named: Named,
+ materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+ ): KTable[K, VR] =
new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, joiner, named, materialized))
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
index eb126f043ccc8..421ac5afeb3ad 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
@@ -50,8 +50,9 @@ object Materialized {
* @param valueSerde the value serde to use.
* @return a new [[Materialized]] instance with the given storeName
*/
- def as[K, V, S <: StateStore](storeName: String)(implicit keySerde: Serde[K],
- valueSerde: Serde[V]): MaterializedJ[K, V, S] =
+ def as[K, V, S <: StateStore](
+ storeName: String
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, S] =
MaterializedJ.as(storeName).withKeySerde(keySerde).withValueSerde(valueSerde)
/**
@@ -68,8 +69,9 @@ object Materialized {
* @param valueSerde the value serde to use.
* @return a new [[Materialized]] instance with the given supplier
*/
- def as[K, V](supplier: WindowBytesStoreSupplier)(implicit keySerde: Serde[K],
- valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayWindowStore] =
+ def as[K, V](
+ supplier: WindowBytesStoreSupplier
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, ByteArrayWindowStore] =
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
/**
@@ -86,8 +88,9 @@ object Materialized {
* @param valueSerde the value serde to use.
* @return a new [[Materialized]] instance with the given supplier
*/
- def as[K, V](supplier: SessionBytesStoreSupplier)(implicit keySerde: Serde[K],
- valueSerde: Serde[V]): MaterializedJ[K, V, ByteArraySessionStore] =
+ def as[K, V](
+ supplier: SessionBytesStoreSupplier
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): MaterializedJ[K, V, ByteArraySessionStore] =
MaterializedJ.as(supplier).withKeySerde(keySerde).withValueSerde(valueSerde)
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
index 351e0a5b375dd..48f917875867b 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
@@ -53,7 +53,8 @@ object Produced {
* @see KStream#through(String, Produced)
* @see KStream#to(String, Produced)
*/
- def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K],
- valueSerde: Serde[V]): ProducedJ[K, V] =
+ def `with`[K, V](
+ partitioner: StreamPartitioner[K, V]
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): ProducedJ[K, V] =
ProducedJ.`with`(keySerde, valueSerde, partitioner)
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
index 06a0f4f4ced0b..5f33efa78aa4f 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
@@ -65,8 +65,9 @@ object Repartitioned {
* @return A new [[Repartitioned]] instance configured with keySerde, valueSerde, and partitioner
* @see KStream#repartition(Repartitioned)
*/
- def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K],
- valueSerde: Serde[V]): RepartitionedJ[K, V] =
+ def `with`[K, V](
+ partitioner: StreamPartitioner[K, V]
+ )(implicit keySerde: Serde[K], valueSerde: Serde[V]): RepartitionedJ[K, V] =
RepartitionedJ.`streamPartitioner`(partitioner).withKeySerde(keySerde).withValueSerde(valueSerde)
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
index e5c5823b6758d..1b20179d5d38d 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
@@ -40,8 +40,8 @@ class SessionWindowedCogroupedKStream[K, V](val inner: SessionWindowedCogroupedK
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream#aggregate`
*/
- def aggregate(initializer: => V, merger: (K, V, V) => V)(
- implicit materialized: Materialized[K, V, ByteArraySessionStore]
+ def aggregate(initializer: => V, merger: (K, V, V) => V)(implicit
+ materialized: Materialized[K, V, ByteArraySessionStore]
): KTable[Windowed[K], V] =
new KTable(inner.aggregate((() => initializer).asInitializer, merger.asMerger, materialized))
@@ -56,8 +56,8 @@ class SessionWindowedCogroupedKStream[K, V](val inner: SessionWindowedCogroupedK
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream#aggregate`
*/
- def aggregate(initializer: => V, merger: (K, V, V) => V, named: Named)(
- implicit materialized: Materialized[K, V, ByteArraySessionStore]
+ def aggregate(initializer: => V, merger: (K, V, V) => V, named: Named)(implicit
+ materialized: Materialized[K, V, ByteArraySessionStore]
): KTable[Windowed[K], V] =
new KTable(inner.aggregate((() => initializer).asInitializer, merger.asMerger, named, materialized))
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 0a20444d68197..3d6e157ecdced 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -49,8 +49,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
*/
- def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(
- implicit materialized: Materialized[K, VR, ByteArraySessionStore]
+ def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(implicit
+ materialized: Materialized[K, VR, ByteArraySessionStore]
): KTable[Windowed[K], VR] =
new KTable(
inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized)
@@ -68,8 +68,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#aggregate`
*/
- def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(
- implicit materialized: Materialized[K, VR, ByteArraySessionStore]
+ def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR, merger: (K, VR, VR) => VR)(implicit
+ materialized: Materialized[K, VR, ByteArraySessionStore]
): KTable[Windowed[K], VR] =
new KTable(
inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, named, materialized)
@@ -127,8 +127,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
*/
- def reduce(reducer: (V, V) => V)(
- implicit materialized: Materialized[K, V, ByteArraySessionStore]
+ def reduce(reducer: (V, V) => V)(implicit
+ materialized: Materialized[K, V, ByteArraySessionStore]
): KTable[Windowed[K], V] =
new KTable(inner.reduce(reducer.asReducer, materialized))
@@ -141,8 +141,8 @@ class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
* the latest (rolling) aggregate for each key within a window
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#reduce`
*/
- def reduce(reducer: (V, V) => V, named: Named)(
- implicit materialized: Materialized[K, V, ByteArraySessionStore]
+ def reduce(reducer: (V, V) => V, named: Named)(implicit
+ materialized: Materialized[K, V, ByteArraySessionStore]
): KTable[Windowed[K], V] =
new KTable(inner.reduce(reducer.asReducer, named, materialized))
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
index 15bda130f21f4..9caad638e4cd7 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
@@ -35,9 +35,11 @@ object StreamJoined {
* @param otherValueSerde the otherValue serde to use. If `null` the default value serde from config will be used
* @return new [[StreamJoined]] instance with the provided serdes
*/
- def `with`[K, V, VO](implicit keySerde: Serde[K],
- valueSerde: Serde[V],
- otherValueSerde: Serde[VO]): StreamJoinedJ[K, V, VO] =
+ def `with`[K, V, VO](implicit
+ keySerde: Serde[K],
+ valueSerde: Serde[V],
+ otherValueSerde: Serde[VO]
+ ): StreamJoinedJ[K, V, VO] =
StreamJoinedJ.`with`(keySerde, valueSerde, otherValueSerde)
/**
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
index e9962a63d81f4..ad24228ecc686 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
@@ -39,8 +39,8 @@ class TimeWindowedCogroupedKStream[K, V](val inner: TimeWindowedCogroupedKStream
* (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream#aggregate`
*/
- def aggregate(initializer: => V)(
- implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+ def aggregate(initializer: => V)(implicit
+ materialized: Materialized[K, V, ByteArrayWindowStore]
): KTable[Windowed[K], V] =
new KTable(inner.aggregate((() => initializer).asInitializer, materialized))
@@ -54,8 +54,8 @@ class TimeWindowedCogroupedKStream[K, V](val inner: TimeWindowedCogroupedKStream
* (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream#aggregate`
*/
- def aggregate(initializer: => V, named: Named)(
- implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+ def aggregate(initializer: => V, named: Named)(implicit
+ materialized: Materialized[K, V, ByteArrayWindowStore]
): KTable[Windowed[K], V] =
new KTable(inner.aggregate((() => initializer).asInitializer, named, materialized))
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index fdb137c83c247..4fcf227e03723 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -47,8 +47,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
*/
- def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(
- implicit materialized: Materialized[K, VR, ByteArrayWindowStore]
+ def aggregate[VR](initializer: => VR)(aggregator: (K, V, VR) => VR)(implicit
+ materialized: Materialized[K, VR, ByteArrayWindowStore]
): KTable[Windowed[K], VR] =
new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, materialized))
@@ -63,8 +63,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#aggregate`
*/
- def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(
- implicit materialized: Materialized[K, VR, ByteArrayWindowStore]
+ def aggregate[VR](initializer: => VR, named: Named)(aggregator: (K, V, VR) => VR)(implicit
+ materialized: Materialized[K, VR, ByteArrayWindowStore]
): KTable[Windowed[K], VR] =
new KTable(inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, named, materialized))
@@ -120,8 +120,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
*/
- def reduce(reducer: (V, V) => V)(
- implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+ def reduce(reducer: (V, V) => V)(implicit
+ materialized: Materialized[K, V, ByteArrayWindowStore]
): KTable[Windowed[K], V] =
new KTable(inner.reduce(reducer.asReducer, materialized))
@@ -135,8 +135,8 @@ class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
* latest (rolling) aggregate for each key
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream#reduce`
*/
- def reduce(reducer: (V, V) => V, named: Named)(
- implicit materialized: Materialized[K, V, ByteArrayWindowStore]
+ def reduce(reducer: (V, V) => V, named: Named)(implicit
+ materialized: Materialized[K, V, ByteArrayWindowStore]
): KTable[Windowed[K], V] =
new KTable(inner.reduce(reducer.asReducer, materialized))
}
diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
index c4e7537f924ff..0c72358c15f3e 100644
--- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
+++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
@@ -57,8 +57,10 @@ object Serdes extends LowPrioritySerdes {
}
)
- def fromFn[T >: Null](serializer: (String, T) => Array[Byte],
- deserializer: (String, Array[Byte]) => Option[T]): Serde[T] =
+ def fromFn[T >: Null](
+ serializer: (String, T) => Array[Byte],
+ deserializer: (String, Array[Byte]) => Option[T]
+ ): Serde[T] =
JSerdes.serdeFrom(
new Serializer[T] {
override def serialize(topic: String, data: T): Array[Byte] = serializer(topic, data)
@@ -75,13 +77,13 @@ object Serdes extends LowPrioritySerdes {
trait LowPrioritySerdes {
- implicit val nullSerde: Serde[Null] = {
+ implicit val nullSerde: Serde[Null] =
Serdes.fromFn[Null](
{ _: Null =>
null
- }, { _: Array[Byte] =>
+ },
+ { _: Array[Byte] =>
None
}
)
- }
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 56b1c29378e19..e9577bcf73c6b 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -54,7 +54,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val clicksPerRegion: KTable[String, Long] =
userClicksStream
- // Join the stream against the table.
+ // Join the stream against the table.
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
// Change the stream from -> to ->
@@ -98,7 +98,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ
val clicksPerRegion: KTable[String, Long] =
userClicksStream
- // Join the stream against the table.
+ // Join the stream against the table.
.leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks))
// Change the stream from -> to ->
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
index ce75f76984f84..325e26ce0d118 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/TopologyTest.scala
@@ -287,13 +287,12 @@ class TopologyTest {
val textLines = streamBuilder.stream[String, String](inputTopic)
val _: KTable[String, Long] = textLines
- .transform(
- () =>
- new Transformer[String, String, KeyValue[String, String]] {
- override def init(context: ProcessorContext): Unit = ()
- override def transform(key: String, value: String): KeyValue[String, String] =
- new KeyValue(key, value.toLowerCase)
- override def close(): Unit = ()
+ .transform(() =>
+ new Transformer[String, String, KeyValue[String, String]] {
+ override def init(context: ProcessorContext): Unit = ()
+ override def transform(key: String, value: String): KeyValue[String, String] =
+ new KeyValue(key, value.toLowerCase)
+ override def close(): Unit = ()
}
)
.groupBy((_, v) => v)
@@ -308,13 +307,12 @@ class TopologyTest {
val streamBuilder = new StreamsBuilderJ
val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopic)
- val lowered: KStreamJ[String, String] = textLines.transform(
- () =>
- new Transformer[String, String, KeyValue[String, String]] {
- override def init(context: ProcessorContext): Unit = ()
- override def transform(key: String, value: String): KeyValue[String, String] =
- new KeyValue(key, value.toLowerCase)
- override def close(): Unit = ()
+ val lowered: KStreamJ[String, String] = textLines.transform(() =>
+ new Transformer[String, String, KeyValue[String, String]] {
+ override def init(context: ProcessorContext): Unit = ()
+ override def transform(key: String, value: String): KeyValue[String, String] =
+ new KeyValue(key, value.toLowerCase)
+ override def close(): Unit = ()
}
)
@@ -378,16 +376,20 @@ class TopologyTest {
mappedStream
.filter((k: String, _: String) => k == "A")
- .join(stream2)((v1: String, v2: Int) => v1 + ":" + v2.toString,
- JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))(
+ .join(stream2)(
+ (v1: String, v2: Int) => v1 + ":" + v2.toString,
+ JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24))
+ )(
StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.intSerde)
)
.to(JOINED_TOPIC)
mappedStream
.filter((k: String, _: String) => k == "A")
- .join(stream3)((v1: String, v2: String) => v1 + ":" + v2.toString,
- JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24)))(
+ .join(stream3)(
+ (v1: String, v2: String) => v1 + ":" + v2.toString,
+ JoinWindows.of(Duration.ofMillis(5000)).grace(Duration.ofHours(24))
+ )(
StreamJoined.`with`(NewSerdes.stringSerde, NewSerdes.stringSerde, NewSerdes.stringSerde)
)
.to(JOINED_TOPIC)
@@ -457,10 +459,14 @@ class TopologyTest {
builder
}
- assertNotEquals(getTopologyScala.build(props).describe.toString,
- getTopologyScala.build(propsNoOptimization).describe.toString)
- assertEquals(getTopologyScala.build(propsNoOptimization).describe.toString,
- getTopologyJava.build(propsNoOptimization).describe.toString)
+ assertNotEquals(
+ getTopologyScala.build(props).describe.toString,
+ getTopologyScala.build(propsNoOptimization).describe.toString
+ )
+ assertEquals(
+ getTopologyScala.build(propsNoOptimization).describe.toString,
+ getTopologyJava.build(propsNoOptimization).describe.toString
+ )
assertEquals(getTopologyScala.build(props).describe.toString, getTopologyJava.build(props).describe.toString)
}
diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
index 5b2aa7626825d..984cb74a6e2fd 100644
--- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
+++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinScalaIntegrationTestBase.scala
@@ -100,36 +100,44 @@ class StreamToTableJoinScalaIntegrationTestBase extends StreamToTableJoinTestDat
p
}
- def produceNConsume(userClicksTopic: String,
- userRegionsTopic: String,
- outputTopic: String,
- waitTillRecordsReceived: Boolean = true): java.util.List[KeyValue[String, Long]] = {
+ def produceNConsume(
+ userClicksTopic: String,
+ userRegionsTopic: String,
+ outputTopic: String,
+ waitTillRecordsReceived: Boolean = true
+ ): java.util.List[KeyValue[String, Long]] = {
import _root_.scala.jdk.CollectionConverters._
// Publish user-region information.
val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig()
- IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic,
- userRegions.asJava,
- userRegionsProducerConfig,
- mockTime,
- false)
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ userRegionsTopic,
+ userRegions.asJava,
+ userRegionsProducerConfig,
+ mockTime,
+ false
+ )
// Publish user-click information.
val userClicksProducerConfig: Properties = getUserClicksProducerConfig()
- IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic,
- userClicks.asJava,
- userClicksProducerConfig,
- mockTime,
- false)
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ userClicksTopic,
+ userClicks.asJava,
+ userClicksProducerConfig,
+ mockTime,
+ false
+ )
if (waitTillRecordsReceived) {
// consume and verify result
val consumerConfig = getConsumerConfig()
- IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(consumerConfig,
- outputTopic,
- expectedClicksPerRegion.asJava)
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ consumerConfig,
+ outputTopic,
+ expectedClicksPerRegion.asJava
+ )
} else {
java.util.Collections.emptyList()
}