diff --git a/CHANGELOG.md b/CHANGELOG.md
index d9e9739..4b86215 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,12 @@
 # Changelog
 
+## Mocked Streams 1.4.0
+
+* Build against Apache Kafka 0.11.0.1
+* Added record order and multiple emissions (contributed by Svend Vanderveken)
+* Updated SBT to 1.0.2
+* Added Svend Vanderveken to CONTRIBUTORS.md
+
 ## Mocked Streams 1.2.2
 
 * Build against Apache Kafka 0.11.0.0
diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md
index ebe177f..f37ac8d 100644
--- a/CONTRIBUTORS.md
+++ b/CONTRIBUTORS.md
@@ -2,4 +2,4 @@
 
 * Hamidreza Afzali
 * Jendrik Poloczek
-
+* Svend Vanderveken
diff --git a/README.md b/README.md
index 3416c4c..c36203d 100644
--- a/README.md
+++ b/README.md
@@ -4,14 +4,15 @@
 
 [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams)
 
-Mocked Streams 1.3.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
+Mocked Streams 1.4.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 that lets you **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. You can use your favourite Scala testing framework, e.g. [ScalaTest](http://www.scalatest.org/) or [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is available on Maven Central, so you only need to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
 
-    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.3.0" % "test"
+    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.4.0" % "test"
 
 ## Apache Kafka Compatibility
 
 | Mocked Streams Version | Apache Kafka Version |
 | ------------- |-------------|
+| 1.4.0 | 0.11.0.1 |
 | 1.3.0 | 0.11.0.0 |
 | 1.2.1 | 0.10.2.1 |
 | 1.2.0 | 0.10.2.0 |
@@ -49,6 +50,30 @@ It also allows you to have multiple input and output streams. If your topology u
     mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
     mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)
 
+## Record order and multiple emissions
+
+The records provided to the mocked stream are submitted to the topology in the order in which they appear in the fixture. You can also submit records to the same topic several times, at different points in your scenario.
+
+This is handy for validating whether the behaviour of your topology depends on the order in which the records are received and processed.
+
+In the example below, two records are first submitted to topic A, then three to topic B, then one more to topic A again.
+
+    val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
+    val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
+    val secondInputForTopicA = Seq(("y", int(4)))
+
+    val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))
+
+    val builder = MockedStreams()
+      .topology(topologyTables)
+      .input(InputATopic, strings, ints, firstInputForTopicA)
+      .input(InputBTopic, strings, ints, firstInputForTopicB)
+      .input(InputATopic, strings, ints, secondInputForTopicA)
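+
+The output can then be read back and compared with the expected records, mirroring the new test case in MockedStreamsSpec; since topologyTables joins two last-value KTables by summing matching keys, every update on either side emits a new sum once both sides have seen the key:
+
+    builder.output(OutputATopic, strings, ints, expectedOutput.size) shouldEqual(expectedOutput)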
+
 ## State Store
 
 When you define your state stores via .stores(stores: Seq[String]) since 1.2, you are able to verify the state store content via the .stateTable(name: String) method:
diff --git a/build.sbt b/build.sbt
index 22781d7..0d13244 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,7 +1,7 @@
 lazy val commonSettings = Seq(
   organization := "com.madewithtea",
-  version := "1.3.0",
+  version := "1.4.0",
   scalaVersion := "2.11.11",
   crossScalaVersions := Seq("2.12.2","2.11.11"),
   description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
 
@@ -10,7 +10,7 @@ lazy val commonSettings = Seq(
 
 val scalaTestVersion = "3.0.2"
 val rocksDBVersion = "5.0.1"
-val kafkaVersion = "0.11.0.0"
+val kafkaVersion = "0.11.0.1"
 
 lazy val kafka = Seq(
   "org.apache.kafka" % "kafka-clients" % kafkaVersion,
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..b7dd3cb
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.0.2
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 92096cd..b7a9e63 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,2 +1,2 @@
-addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
+addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala
index 3238f26..dc14adf 100644
--- a/src/main/scala/MockedStreams.scala
+++ b/src/main/scala/MockedStreams.scala
@@ -30,12 +30,12 @@
 object MockedStreams {
 
   def apply() = Builder()
 
-  case class Input(seq: Seq[(Array[Byte], Array[Byte])])
+  case class Record(topic: String, key: Array[Byte], value: Array[Byte])
 
   case class Builder(topology: Option[(KStreamBuilder => Unit)] = None,
                      configuration: Properties = new Properties(),
                      stateStores: Seq[String] = Seq(),
-                     inputs: Map[String, Input] = Map()) {
+                     inputs: List[Record] = List.empty) {
 
     def config(configuration: Properties) = this.copy(configuration = configuration)
@@ -43,16 +43,22 @@
 
     def stores(stores: Seq[String]) = this.copy(stateStores = stores)
 
-    def input[K, V](topic: String, key: Serde[K], value: Serde[V], seq: Seq[(K, V)]) = {
+    def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = {
       val keySer = key.serializer
       val valSer = value.serializer
-      val in = seq.map { case (k, v) => (keySer.serialize(topic, k), valSer.serialize(topic, v)) }
-      this.copy(inputs = inputs + (topic -> Input(in)))
+
+      val updatedRecords = newRecords.foldLeft(inputs) {
+        case (events, (k, v)) =>
+          val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v))
+          events :+ newRecord
+      }
+
+      this.copy(inputs = updatedRecords)
     }
 
     def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
       if (size <= 0) throw new ExpectedOutputIsEmpty
-      withProcessedDriver { driver => 
+      withProcessedDriver { driver =>
         (0 until size).flatMap { i =>
           Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match {
             case Some(record) => Some((record.key, record.value))
@@ -62,10 +68,10 @@
       }
     }
 
-    def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = 
+    def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
      output[K, V](topic, key, value, size).toMap
 
-    def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => 
+    def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
       val records = driver.getKeyValueStore(name).all()
       val list = records.asScala.toList.map { record => (record.key, record.value) }
       records.close()
@@ -98,11 +104,10 @@
       new Driver(new StreamsConfig(props), builder)
     }
 
-    private def produce(driver: Driver) = {
-      inputs.foreach { case (topic, input) =>
-        input.seq.foreach { case (key, value) =>
+    private def produce(driver: Driver): Unit = {
+      inputs.foreach {
+        case Record(topic, key, value) =>
           driver.process(topic, key, value)
-        }
       }
     }
 
@@ -125,5 +130,3 @@
   class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.")
 
 }
-
-
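The heart of the MockedStreams.scala change is the switch from Map[String, Input] to List[Record]: the old map kept a single batch per topic, so a second .input call for the same topic replaced the first, and the interleaving across topics was lost entirely. The single append-only list preserves the global submission order. A self-contained sketch of that accumulation behaviour (stand-in names, not the library's API):

```scala
object RecordOrderSketch extends App {
  // Same shape as the new MockedStreams.Record case class.
  final case class Record(topic: String, key: Array[Byte], value: Array[Byte])

  def bytes(s: String): Array[Byte] = s.getBytes("UTF-8")

  // Emulates Builder.input: every call appends to one global list, so a
  // later batch for an earlier topic keeps its place in the sequence.
  def input(inputs: List[Record], topic: String, records: Seq[(String, String)]): List[Record] =
    records.foldLeft(inputs) { case (acc, (k, v)) =>
      acc :+ Record(topic, bytes(k), bytes(v))
    }

  val step1 = input(List.empty, "topicA", Seq("x" -> "1", "y" -> "2"))
  val step2 = input(step1, "topicB", Seq("x" -> "4"))
  val step3 = input(step2, "topicA", Seq("y" -> "4"))

  // Prints topicA, topicA, topicB, topicA: submission order survives.
  step3.foreach(r => println(r.topic))
}
```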
diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala
index 5833238..e9af0e9 100644
--- a/src/test/scala/MockedStreamsSpec.scala
+++ b/src/test/scala/MockedStreamsSpec.scala
@@ -118,6 +118,25 @@
     builder.stateTable(StoreName) shouldEqual inputA.toMap
   }
 
+  it should "assert correctly when joining events sent to two KTables in a specific order" in {
+    import Fixtures.Multi._
+
+    val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
+    val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
+    val secondInputForTopicA = Seq(("y", int(4)))
+
+    val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))
+
+    val builder = MockedStreams()
+      .topology(topologyTables)
+      .input(InputATopic, strings, ints, firstInputForTopicA)
+      .input(InputBTopic, strings, ints, firstInputForTopicB)
+      .input(InputATopic, strings, ints, secondInputForTopicA)
+
+    builder.output(OutputATopic, strings, ints, expectedOutput.size)
+      .shouldEqual(expectedOutput)
+  }
+
   it should "assert correctly when processing windowed state output topology" in {
     import Fixtures.Multi._
 
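The expected output of this new test follows from the fixtures defined further below: LastAggregator keeps the latest value per key, AddJoiner sums the two sides, and a KTable-KTable inner join emits on every update of either side once the key exists on both. A standalone sketch (illustrative only, not part of the change set) that replays the fixture and traces the emissions:

```scala
object JoinTraceSketch extends App {
  // In-memory stand-ins for the two KTables built by topologyTables.
  var tableA = Map.empty[String, Int] // last value per key from topic A
  var tableB = Map.empty[String, Int] // last value per key from topic B

  // An inner KTable-KTable join emits (key, joined value) on every
  // upsert of either side, once both sides hold a value for the key.
  def upsertA(key: String, value: Int): Option[(String, Int)] = {
    tableA += (key -> value)
    tableB.get(key).map(b => (key, value + b))
  }

  def upsertB(key: String, value: Int): Option[(String, Int)] = {
    tableB += (key -> value)
    tableA.get(key).map(a => (key, a + value))
  }

  // Replay the fixture in submission order.
  val emissions = Seq(
    upsertA("x", 1), // no emission: "x" not yet on the B side
    upsertA("y", 2), // no emission: "y" not yet on the B side
    upsertB("x", 4), // emits ("x", 5)
    upsertB("y", 3), // emits ("y", 5)
    upsertB("y", 5), // emits ("y", 7)
    upsertA("y", 4)  // emits ("y", 9)
  ).flatten

  println(emissions) // List((x,5), (y,5), (y,7), (y,9))
}
```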
@@ -194,6 +213,7 @@
     val OutputATopic = "outputA"
     val OutputBTopic = "outputB"
     val StoreName = "store"
+    val Store2Name = "store2"
 
     def topology1Output(builder: KStreamBuilder) = {
       val streamA = builder.stream(strings, ints, InputATopic)
@@ -230,6 +250,29 @@
       streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints)
         .to(strings, ints, OutputBTopic)
     }
+
+    def topologyTables(builder: KStreamBuilder) = {
+      val streamA = builder.stream(strings, ints, InputATopic)
+      val streamB = builder.stream(strings, ints, InputBTopic)
+
+      val tableA = streamA.groupByKey(strings, ints).aggregate(
+        new LastInitializer,
+        new LastAggregator,
+        ints,
+        StoreName)
+
+      val tableB = streamB.groupByKey(strings, ints).aggregate(
+        new LastInitializer,
+        new LastAggregator,
+        ints,
+        Store2Name)
+
+      val resultTable = tableA.join[Integer, Integer](tableB, new AddJoiner)
+
+      resultTable
+        .toStream
+        .to(strings, ints, OutputATopic)
+    }
 
   }
 }
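Since topologyTables materializes its two aggregations under StoreName and Store2Name, the final table contents could also be checked directly. A sketch of such a companion assertion (not part of the change set; it assumes the stores are registered via .stores, as in the existing state-store tests, and that LastAggregator keeps the latest value per key):

```scala
val checkable = MockedStreams()
  .topology(topologyTables)
  .stores(Seq(StoreName, Store2Name))
  .input(InputATopic, strings, ints, firstInputForTopicA)
  .input(InputBTopic, strings, ints, firstInputForTopicB)
  .input(InputATopic, strings, ints, secondInputForTopicA)

// Topic A saw x=1, then y=2 and later y=4; topic B saw x=4, y=3, y=5.
checkable.stateTable(StoreName) shouldEqual Map("x" -> 1, "y" -> 4)
checkable.stateTable(Store2Name) shouldEqual Map("x" -> 4, "y" -> 5)
```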