From 3757c427b8347dc52547d244b33ac6b3e4bcd22c Mon Sep 17 00:00:00 2001 From: Svend Vanderveken Date: Fri, 15 Sep 2017 15:41:34 +0200 Subject: [PATCH 1/7] Now allows to fine control the test events order - previously, the events for each mock topic were submitted all at once during the test, topic per topic - this commit ensure that MockStream records the order in which the test author invoked `input` in the test fixture makes sure they are submitted to the topology in that order --- CONTRIBUTORS.md | 2 +- README.md | 20 ++++++++++ build.sbt | 2 +- project/build.properties | 1 + project/plugins.sbt | 4 +- src/main/scala/MockedStreams.scala | 22 ++++++----- src/test/scala/MockedStreamsSpec.scala | 55 ++++++++++++++++++++++++++ 7 files changed, 92 insertions(+), 14 deletions(-) create mode 100644 project/build.properties diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index ebe177f..f37ac8d 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -2,4 +2,4 @@ * Hamidreza Afzali * Jendrik Poloczek - +* Svend Vanderveken diff --git a/README.md b/README.md index 3416c4c..f982685 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,26 @@ It also allows you to have multiple input and output streams. If your topology u mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA) mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB) +## Event order and multiple emissions + +The events provided to the mock stream will be submitted to the topology during the test in the order in which they appear in the fixture. You can also submit events multiple times to the same topics, at various moments in your scenario. + +This can be handy to validate that your topology behaviour is or is not dependent on the order in which the events are received and processed. + +In the example below, 2 events are first submitted to topic A, then 3 to topic B, then 1 more to topic A again. + + val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2))) + val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5))) + val secondInputForTopicA = Seq(("y", int(4))) + + val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9)) + + val builder = MockedStreams() + .topology(topologyTables) + .input(InputATopic, strings, ints, firstInputForTopicA) + .input(InputBTopic, strings, ints, firstInputForTopicB) + .input(InputATopic, strings, ints, secondInputForTopicA) + ## State Store When you define your state stores via .stores(stores: Seq[String]) since 1.2, you are able to verify the state store content via the .stateTable(name: String) method: diff --git a/build.sbt b/build.sbt index 22781d7..b2b85ba 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ lazy val commonSettings = Seq( val scalaTestVersion = "3.0.2" val rocksDBVersion = "5.0.1" -val kafkaVersion = "0.11.0.0" +val kafkaVersion = "0.11.0.1" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..94005e5 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.0.0 diff --git a/project/plugins.sbt b/project/plugins.sbt index 92096cd..b7a9e63 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,2 @@ -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 3238f26..0a68c66 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -30,12 +30,13 @@ object MockedStreams { def apply() = Builder() - case class Input(seq: Seq[(Array[Byte], Array[Byte])]) + case class Event(topic: String, key: Array[Byte], value: Array[Byte]) case class Builder(topology: Option[(KStreamBuilder => Unit)] = None, configuration: Properties = new Properties(), stateStores: Seq[String] = Seq(), - inputs: Map[String, Input] = Map()) { + inputs: List[Event] = List.empty) { + def config(configuration: Properties) = this.copy(configuration = configuration) @@ -43,11 +44,15 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], seq: Seq[(K, V)]) = { + def input[K, V](topic: String, key: Serde[K], value: Serde[V], newInput: Seq[(K, V)]) = { val keySer = key.serializer val valSer = value.serializer - val in = seq.map { case (k, v) => (keySer.serialize(topic, k), valSer.serialize(topic, v)) } - this.copy(inputs = inputs + (topic -> Input(in))) + + val updatedInputs = newInput.foldLeft(inputs) { + case (events, (k, v)) => Event(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) :: events + } + + this.copy(inputs = updatedInputs) } def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { @@ -99,10 +104,9 @@ object MockedStreams { } private def produce(driver: Driver) = { - inputs.foreach { case (topic, input) => - input.seq.foreach { case (key, value) => + inputs.reverse.foreach{ + case Event(topic, key, value) => driver.process(topic, key, value) - } } } @@ -125,5 +129,3 @@ object MockedStreams { class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") } - - diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 5833238..cb197db 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -118,6 +118,25 @@ class MockedStreamsSpec extends FlatSpec with Matchers { builder.stateTable(StoreName) shouldEqual inputA.toMap } + it should "assert correctly when joining events sent to 2 Ktables in a specific order" in { + import Fixtures.Multi._ + + val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2))) + val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5))) + val secondInputForTopicA = Seq(("y", int(4))) + + val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9)) + + val builder = MockedStreams() + .topology(topologyTables) + .input(InputATopic, strings, ints, firstInputForTopicA) + .input(InputBTopic, strings, ints, firstInputForTopicB) + .input(InputATopic, strings, ints, secondInputForTopicA) + + builder.output(OutputATopic, strings, ints, expectedOutput.size) + .shouldEqual(expectedOutput) + } + it should "assert correctly when processing windowed state output topology" in { import Fixtures.Multi._ @@ -194,6 +213,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val OutputATopic = "outputA" val OutputBTopic = "outputB" val StoreName = "store" + val Store2Name = "store2" def topology1Output(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputATopic) @@ -207,6 +227,18 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .to(strings, ints, OutputATopic) } + def topology1Outputbis(builder: KStreamBuilder) = { + val streamA = builder.stream(strings, ints, InputATopic) + val streamB = builder.stream(strings, ints, InputBTopic) + + val table = streamB.groupByKey(strings, ints).aggregate( + new LastInitializer, + new LastAggregator, ints, StoreName) + + streamA.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints) + .to(strings, ints, OutputATopic) + } + def topology1WindowOutput(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputCTopic) streamA.groupByKey(strings, ints).count( @@ -230,6 +262,29 @@ class MockedStreamsSpec extends FlatSpec with Matchers { streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints) .to(strings, ints, OutputBTopic) } + + def topologyTables(builder: KStreamBuilder) = { + val streamA = builder.stream(strings, ints, InputATopic) + val streamB = builder.stream(strings, ints, InputBTopic) + + val tableA = streamA.groupByKey(strings, ints).aggregate( + new LastInitializer, + new LastAggregator, + ints, + StoreName) + + val tableB = streamB.groupByKey(strings, ints).aggregate( + new LastInitializer, + new LastAggregator, + ints, + Store2Name) + + val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner) + + resultTable + .toStream + .to(strings, ints, OutputATopic) + } } } From e7aa8fc22cd93483af85cdd0ee63062ea1729561 Mon Sep 17 00:00:00 2001 From: Svend Vanderveken Date: Fri, 15 Sep 2017 16:23:27 +0200 Subject: [PATCH 2/7] removed my debug temp method --- src/test/scala/MockedStreamsSpec.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index cb197db..e9af0e9 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -227,18 +227,6 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .to(strings, ints, OutputATopic) } - def topology1Outputbis(builder: KStreamBuilder) = { - val streamA = builder.stream(strings, ints, InputATopic) - val streamB = builder.stream(strings, ints, InputBTopic) - - val table = streamB.groupByKey(strings, ints).aggregate( - new LastInitializer, - new LastAggregator, ints, StoreName) - - streamA.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints) - .to(strings, ints, OutputATopic) - } - def topology1WindowOutput(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputCTopic) streamA.groupByKey(strings, ints).count( From 82fca24dc8fd042c45ab38834c8048dcdbf33601 Mon Sep 17 00:00:00 2001 From: Svend Vanderveken Date: Sun, 24 Sep 2017 09:27:33 +0200 Subject: [PATCH 3/7] some readability improvements + bump sbt to 1.0.2 --- project/build.properties | 2 +- src/main/scala/MockedStreams.scala | 25 +++++++++++++------------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/project/build.properties b/project/build.properties index 94005e5..b7dd3cb 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.0.0 +sbt.version=1.0.2 diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 0a68c66..fcc0b94 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -30,13 +30,12 @@ object MockedStreams { def apply() = Builder() - case class Event(topic: String, key: Array[Byte], value: Array[Byte]) + case class Record(topic: String, key: Array[Byte], value: Array[Byte]) case class Builder(topology: Option[(KStreamBuilder => Unit)] = None, configuration: Properties = new Properties(), stateStores: Seq[String] = Seq(), - inputs: List[Event] = List.empty) { - + inputs: List[Record] = List.empty) { def config(configuration: Properties) = this.copy(configuration = configuration) @@ -44,20 +43,22 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], newInput: Seq[(K, V)]) = { + def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = { val keySer = key.serializer val valSer = value.serializer - val updatedInputs = newInput.foldLeft(inputs) { - case (events, (k, v)) => Event(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) :: events + val updatedRecords = newRecords.foldLeft(inputs) { + case (events, (k, v)) => + val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) + newRecord :: events } - this.copy(inputs = updatedInputs) + this.copy(inputs = updatedRecords) } def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty - withProcessedDriver { driver => + withProcessedDriver { driver => (0 until size).flatMap { i => Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { case Some(record) => Some((record.key, record.value)) @@ -67,10 +68,10 @@ object MockedStreams { } } - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = output[K, V](topic, key, value, size).toMap - def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => + def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => val records = driver.getKeyValueStore(name).all() val list = records.asScala.toList.map { record => (record.key, record.value) } records.close() @@ -103,9 +104,9 @@ object MockedStreams { new Driver(new StreamsConfig(props), builder) } - private def produce(driver: Driver) = { + private def produce(driver: Driver): Unit = { inputs.reverse.foreach{ - case Event(topic, key, value) => + case Record(topic, key, value) => driver.process(topic, key, value) } } From 2cfdd9bb40b33c8d01b91051354df1bdf0543a5e Mon Sep 17 00:00:00 2001 From: Jendrik Date: Wed, 27 Sep 2017 11:24:55 +0200 Subject: [PATCH 4/7] simplified: adding of records; replaying of records --- src/main/scala/MockedStreams.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index fcc0b94..dc14adf 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -50,7 +50,7 @@ object MockedStreams { val updatedRecords = newRecords.foldLeft(inputs) { case (events, (k, v)) => val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) - newRecord :: events + events :+ newRecord } this.copy(inputs = updatedRecords) @@ -105,7 +105,7 @@ object MockedStreams { } private def produce(driver: Driver): Unit = { - inputs.reverse.foreach{ + inputs.foreach{ case Record(topic, key, value) => driver.process(topic, key, value) } From bafad3cacdf5d6b510b70a3e36b7733770436f7f Mon Sep 17 00:00:00 2001 From: Jendrik Date: Wed, 27 Sep 2017 11:26:31 +0200 Subject: [PATCH 5/7] README: added new versions in readme --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index f982685..14a980d 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,15 @@ [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 1.3.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 1.4.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.3.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.4.0" % "test" ## Apache Kafka Compatibility | Mocked Streams Version | Apache Kafka Version | | ------------- |-------------| +| 1.4.0 | 0.11.0.1 | | 1.3.0 | 0.11.0.0 | | 1.2.1 | 0.10.2.1 | | 1.2.0 | 0.10.2.0 | From 5efe63f189a4d9096ff14891549d899a475ab22a Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Mon, 2 Oct 2017 13:15:01 +0200 Subject: [PATCH 6/7] Updated: README.md and CHANGELOG.md --- CHANGELOG.md | 7 +++++++ README.md | 8 ++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9e9739..4b86215 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## Mocked Streams 1.4.0 + +* Build against Apache Kafka 0.11.0.1 +* Added record order and multiple emissions by Svend Vanderveken +* Updated SBT to 1.0.2 +* Added Svend Vanderveken to CONTRIBUTORS.md + ## Mocked Streams 1.2.2 * Build against Apache Kafka 0.11.0.0 diff --git a/README.md b/README.md index 14a980d..c36203d 100644 --- a/README.md +++ b/README.md @@ -50,13 +50,13 @@ It also allows you to have multiple input and output streams. If your topology u mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA) mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB) -## Event order and multiple emissions +## Record order and multiple emissions -The events provided to the mock stream will be submitted to the topology during the test in the order in which they appear in the fixture. You can also submit events multiple times to the same topics, at various moments in your scenario. +The records provided to the mocked stream will be submitted to the topology during the test in the order in which they appear in the fixture. You can also submit records multiple times to the same topics, at various moments in your scenario. -This can be handy to validate that your topology behaviour is or is not dependent on the order in which the events are received and processed. +This can be handy to validate that your topology behaviour is or is not dependent on the order in which the records are received and processed. -In the example below, 2 events are first submitted to topic A, then 3 to topic B, then 1 more to topic A again. +In the example below, 2 records are first submitted to topic A, then 3 to topic B, then 1 more to topic A again. val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2))) val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5))) From 12b45aeb6cb52457d847b12c0908062730ecb98e Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Mon, 2 Oct 2017 13:17:50 +0200 Subject: [PATCH 7/7] changed: version bump to 1.4.0 in build.sbt --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index b2b85ba..0d13244 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "1.3.0", + version := "1.4.0", scalaVersion := "2.11.11", crossScalaVersions := Seq("2.12.2","2.11.11"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",