From 0dfe1ad0768fe67415b7f959df9c5f401e8f32d1 Mon Sep 17 00:00:00 2001 From: dan hamilton Date: Thu, 23 Aug 2018 11:42:13 +0100 Subject: [PATCH 1/8] add method to allow input with timestamps --- src/main/scala/MockedStreams.scala | 21 ++++++++++++++++-- src/test/scala/MockedStreamsSpec.scala | 30 ++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 0d1cb7a..41e78dc 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -57,7 +57,7 @@ object MockedStreams { val factory = new ConsumerRecordFactory[K, V](keySer, valSer) - val updatedRecords = newRecords.foldLeft(inputs) { + val updatedRecords: List[ConsumerRecord[Array[Byte], Array[Byte]]] = newRecords.foldLeft(inputs) { case (events, (k, v)) => val newRecord = factory.create(topic, k, v) events :+ newRecord @@ -66,6 +66,20 @@ object MockedStreams { this.copy(inputs = updatedRecords) } + def inputWithTimeStamps[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V, Long)]): Builder = { + val keySer = key.serializer + val valSer = value.serializer + val factory = new ConsumerRecordFactory[K, V](keySer, valSer) + + val updatedRecords: List[ConsumerRecord[Array[Byte], Array[Byte]]] = newRecords.foldLeft(inputs) { + case (events, (k, v, timestampMs)) => + val newRecord = factory.create(topic, k, v, timestampMs) + events :+ newRecord + } + this.copy(inputs = updatedRecords) + } + + def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty withProcessedDriver { driver => @@ -88,8 +102,11 @@ object MockedStreams { list.toMap } - def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, + def windowStateTable[K, V](name: String, + key: K, + timeFrom: Long = 0, timeTo: Long = Long.MaxValue) = withProcessedDriver { driver => + val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] val records = store.fetch(key, timeFrom, timeTo) val list = records.asScala.toList.map { record => (record.key, record.value) } diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 91d2612..74adbe5 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -174,6 +174,20 @@ class MockedStreamsSpec extends FlatSpec with Matchers { output shouldEqual expected } + + it should "accept consumer records with custom timestamps" in { + + import Fixtures.Multi._ + + val builder = MockedStreams() + .topology(topology1WindowOutput) + .inputWithTimeStamps(InputCTopic, strings, ints, inputCWithTimeStamps) + .stores(Seq(StoreName)) + + builder.windowStateTable(StoreName, "x") + .shouldEqual(expectedCWithTimeStamps.toMap) + } + class LastInitializer extends Initializer[Integer] { override def apply() = 0 } @@ -216,11 +230,27 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val inputA = Seq(("x", int(1)), ("y", int(2))) val inputB = Seq(("x", int(4)), ("y", int(3))) val inputC = Seq(("x", int(1)), ("x", int(1)), ("x", int(2)), ("y", int(1))) + + val inputCWithTimeStamps = Seq( + ("x", int(1), 1000L), + ("x", int(1), 1000L), + ("x", int(1), 1001L), + ("x", int(1), 1001L), + ("x", int(1), 1002L) + ) + val expectedA = Seq(("x", int(5)), ("y", int(5))) val expectedB = Seq(("x", int(3)), ("y", int(1))) + val expectedCx = Seq((1, 2), (2, 1)) val expectedCy = Seq((1, 1)) + val expectedCWithTimeStamps = Seq( + 1000 -> 2, + 1001 -> 2, + 1002 -> 1 + ) + val strings = Serdes.String() val ints = Serdes.Integer() val serdes = Consumed.`with`(strings, ints) From 85438e759ef018b1e05b9689e09e0d0cd5edb220 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 24 Aug 2018 21:46:26 +0200 Subject: [PATCH 2/8] avoid code duplication --- build.sbt | 2 +- src/main/scala/MockedStreams.scala | 46 +++++++++++++++----------- src/test/scala/MockedStreamsSpec.scala | 2 +- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/build.sbt b/build.sbt index 3c51c3d..85f90eb 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "2.0.0", + version := "2.1.0", scalaVersion := "2.12.6", crossScalaVersions := Seq("2.12.6","2.11.12"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 41e78dc..9836f40 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -17,6 +17,7 @@ package com.madewithtea.mockedstreams +import java.lang import java.util.{Properties, UUID} import org.apache.kafka.clients.consumer.ConsumerRecord @@ -51,34 +52,40 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = { + private def inputPriv[K, V](topic: String, key: Serde[K], value: Serde[V], + records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { val keySer = key.serializer val valSer = value.serializer - val factory = new ConsumerRecordFactory[K, V](keySer, valSer) - val updatedRecords: List[ConsumerRecord[Array[Byte], Array[Byte]]] = newRecords.foldLeft(inputs) { - case (events, (k, v)) => - val newRecord = factory.create(topic, k, v) + def foldWithoutTime(events: List[ConsumerRecord[Array[Byte], Array[Byte]]], kv: (K, V)) = { + val newRecord = factory.create(topic, kv._1, kv._2) + events :+ newRecord + } + + def foldTime(events: List[ConsumerRecord[Array[Byte], Array[Byte]]], kvt: (K, V, Long)) = kvt match { + case (k, v, t) => + val newRecord = factory.create(topic, k, v, t) events :+ newRecord } + val updatedRecords = records match { + case Left(withoutTime) => withoutTime.foldLeft(inputs)(foldWithoutTime) + case Right(withTime) => withTime.foldLeft(inputs)(foldTime) + } this.copy(inputs = updatedRecords) } - def inputWithTimeStamps[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V, Long)]): Builder = { - val keySer = key.serializer - val valSer = value.serializer - val factory = new ConsumerRecordFactory[K, V](keySer, valSer) - - val updatedRecords: List[ConsumerRecord[Array[Byte], Array[Byte]]] = newRecords.foldLeft(inputs) { - case (events, (k, v, timestampMs)) => - val newRecord = factory.create(topic, k, v, timestampMs) - events :+ newRecord - } - this.copy(inputs = updatedRecords) + def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = { + inputPriv(topic, key, value, Left(newRecords)) } + def inputWithTime[K, V](topic: String, + key: Serde[K], + value: Serde[V], + newRecords: Seq[(K, V, Long)]): Builder = { + inputPriv(topic, key, value, Right(newRecords)) + } def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty @@ -92,7 +99,7 @@ object MockedStreams { } } - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = output[K, V](topic, key, value, size).toMap def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => @@ -105,7 +112,7 @@ object MockedStreams { def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, - timeTo: Long = Long.MaxValue) = withProcessedDriver { driver => + timeTo: Long = Long.MaxValue): Map[lang.Long, V] = withProcessedDriver { driver => val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] val records = store.fetch(key, timeFrom, timeTo) @@ -127,7 +134,7 @@ object MockedStreams { inputs.foreach(driver.pipeInput) private def withProcessedDriver[T](f: Driver => T): T = { - if(inputs.isEmpty) + if (inputs.isEmpty) throw new NoInputSpecified val driver = stream @@ -143,4 +150,5 @@ object MockedStreams { class NoInputSpecified extends Exception("No input fixtures specified. Call input() method on builder.") class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") + } diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 74adbe5..ee8ddd1 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -181,7 +181,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val builder = MockedStreams() .topology(topology1WindowOutput) - .inputWithTimeStamps(InputCTopic, strings, ints, inputCWithTimeStamps) + .inputWithTime(InputCTopic, strings, ints, inputCWithTimeStamps) .stores(Seq(StoreName)) builder.windowStateTable(StoreName, "x") From 6be5a4187040dad85c36043575952a4aa35dd2fe Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 25 Aug 2018 01:09:55 +0200 Subject: [PATCH 3/8] refactored input method; avoid code duplication --- src/main/scala/MockedStreams.scala | 56 ++++++++++++------------------ 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 9836f40..79a8135 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -52,40 +52,12 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - private def inputPriv[K, V](topic: String, key: Serde[K], value: Serde[V], - records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { - val keySer = key.serializer - val valSer = value.serializer - val factory = new ConsumerRecordFactory[K, V](keySer, valSer) - - def foldWithoutTime(events: List[ConsumerRecord[Array[Byte], Array[Byte]]], kv: (K, V)) = { - val newRecord = factory.create(topic, kv._1, kv._2) - events :+ newRecord - } - - def foldTime(events: List[ConsumerRecord[Array[Byte], Array[Byte]]], kvt: (K, V, Long)) = kvt match { - case (k, v, t) => - val newRecord = factory.create(topic, k, v, t) - events :+ newRecord - } + def input[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V)]) = + _input(topic, key, value, Left(records)) - val updatedRecords = records match { - case Left(withoutTime) => withoutTime.foldLeft(inputs)(foldWithoutTime) - case Right(withTime) => withTime.foldLeft(inputs)(foldTime) - } - this.copy(inputs = updatedRecords) - } - - def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = { - inputPriv(topic, key, value, Left(newRecords)) - } + def inputWithTime[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V, Long)]) = + _input(topic, key, value, Right(records)) - def inputWithTime[K, V](topic: String, - key: Serde[K], - value: Serde[V], - newRecords: Seq[(K, V, Long)]): Builder = { - inputPriv(topic, key, value, Right(newRecords)) - } def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty @@ -102,7 +74,7 @@ object MockedStreams { def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = output[K, V](topic, key, value, size).toMap - def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => + def stateTable(name: String) = withProcessedDriver { driver => val records = driver.getKeyValueStore(name).all() val list = records.asScala.toList.map { record => (record.key, record.value) } records.close() @@ -121,6 +93,23 @@ object MockedStreams { list.toMap } + private def _input[K, V](topic: String, key: Serde[K], value: Serde[V], + records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { + val keySer = key.serializer + val valSer = value.serializer + val factory = new ConsumerRecordFactory[K, V](keySer, valSer) + + val updatedRecords = records match { + case Left(withoutTime) => withoutTime.foldLeft(inputs) { + case (events, (k, v)) => events :+ factory.create(topic, k, v) + } + case Right(withTime) => withTime.foldLeft(inputs) { + case (events, (k, v, timestamp)) => events :+ factory.create(topic, k, v, timestamp) + } + } + this.copy(inputs = updatedRecords) + } + // state store is temporarily created in ProcessorTopologyTestDriver private def stream = { val props = new Properties @@ -150,5 +139,4 @@ object MockedStreams { class NoInputSpecified extends Exception("No input fixtures specified. Call input() method on builder.") class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") - } From 8eaa68fd609f64d15faacf893ad1bb2afe9653c5 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 25 Aug 2018 01:20:38 +0200 Subject: [PATCH 4/8] removed useless import and type --- src/main/scala/MockedStreams.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 79a8135..6fc36b7 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -17,7 +17,6 @@ package com.madewithtea.mockedstreams -import java.lang import java.util.{Properties, UUID} import org.apache.kafka.clients.consumer.ConsumerRecord @@ -84,8 +83,7 @@ object MockedStreams { def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, - timeTo: Long = Long.MaxValue): Map[lang.Long, V] = withProcessedDriver { driver => - + timeTo: Long = Long.MaxValue) = withProcessedDriver { driver => val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] val records = store.fetch(key, timeFrom, timeTo) val list = records.asScala.toList.map { record => (record.key, record.value) } From 775a0916da46e5b7fa29da2e14182b571b88f541 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 25 Aug 2018 01:34:50 +0200 Subject: [PATCH 5/8] added documentation for inputWithTime; added new contributor; added changelog --- CHANGELOG.md | 6 ++++++ CONTRIBUTORS.md | 1 + README.md | 16 ++++++++++++++++ 3 files changed, 23 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 25515ac..4e80513 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Mocked Streams 2.1 + +* Added .inputWithTime to allow custom timestamps +* Thanks to Dan Hamilton for .inputWithTime implementation +* Added Dan Hamilton to CONTRIBUTORS.md + ## Mocked Streams 2.0 * Build against Apache Kafka 2.0 diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 2c677cf..4633266 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -5,3 +5,4 @@ * Svend Vanderveken * Daniel Wojda * Michal Dziemianko +* Dan Hamilton diff --git a/README.md b/README.md index cdee1f5..9cf14ff 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,22 @@ When you define your state stores via .stores(stores: Seq[String]) since 1.2 and mstreams.windowStateTable("store-name", "x") shouldEqual someMapX mstreams.windowStateTable("store-name", "y") shouldEqual someMapY +## Adding Timestamps + +With .input the input records timestamps are set to 0 default timestamp of 0. This e.g. prevents testing Join windows of Kafka streams as it cannot produce records with different timestamps. However, using .inputWithTime allows adding timestamps like in the following example: + + val inputA = Seq( + ("x", int(1), 1000L), + ("x", int(1), 1001L), + ("x", int(1), 1002L) + ) + + val builder = MockedStreams() + .topology(topology1WindowOutput) + .inputWithTime(InputCTopic, strings, ints, inputA) + .stores(Seq(StoreName)) + + ## Custom Streams Configuration Sometimes you need to pass a custom configuration to Kafka Streams: From 1a80b7bb1bc4a4e46cc9bf48a953ba1a090a853f Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 25 Aug 2018 01:36:33 +0200 Subject: [PATCH 6/8] changed version in README.md --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 9cf14ff..d2fdd18 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 2.0.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 2.1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.0.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.1.0" % "test" Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/mockafka) @@ -14,7 +14,8 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc | Mocked Streams Version | Apache Kafka Version | |------------- |-------------| -| 2.0.0 | 2.0.0.0 | +| 2.1.0 | 2.0.0.0 | + 2.0.0 | 2.0.0.0 | | 1.8.0 | 1.1.1.0 | | 1.7.0 | 1.1.0.0 | | 1.6.0 | 1.0.1.0 | From bc6374c5f776ce7434df1881cf97a8b691d6707f Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 25 Aug 2018 01:39:50 +0200 Subject: [PATCH 7/8] improved indention --- src/main/scala/MockedStreams.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 6fc36b7..cd2d366 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -92,7 +92,7 @@ object MockedStreams { } private def _input[K, V](topic: String, key: Serde[K], value: Serde[V], - records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { + records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { val keySer = key.serializer val valSer = value.serializer val factory = new ConsumerRecordFactory[K, V](keySer, valSer) @@ -137,4 +137,5 @@ object MockedStreams { class NoInputSpecified extends Exception("No input fixtures specified. Call input() method on builder.") class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") + } From a37223346dcd6c04ba9b329ea5e92ed246d27bf0 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 25 Aug 2018 01:41:47 +0200 Subject: [PATCH 8/8] small code formatting changes --- src/main/scala/MockedStreams.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index cd2d366..de93838 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -57,7 +57,6 @@ object MockedStreams { def inputWithTime[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V, Long)]) = _input(topic, key, value, Right(records)) - def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty withProcessedDriver { driver => @@ -121,8 +120,7 @@ object MockedStreams { inputs.foreach(driver.pipeInput) private def withProcessedDriver[T](f: Driver => T): T = { - if (inputs.isEmpty) - throw new NoInputSpecified + if (inputs.isEmpty) throw new NoInputSpecified val driver = stream produce(driver)