From 3757c427b8347dc52547d244b33ac6b3e4bcd22c Mon Sep 17 00:00:00 2001 From: Svend Vanderveken Date: Fri, 15 Sep 2017 15:41:34 +0200 Subject: [PATCH 1/3] Now allows to fine control the test events order - previously, the events for each mock topic were submitted all at once during the test, topic per topic - this commit ensure that MockStream records the order in which the test author invoked `input` in the test fixture makes sure they are submitted to the topology in that order --- CONTRIBUTORS.md | 2 +- README.md | 20 ++++++++++ build.sbt | 2 +- project/build.properties | 1 + project/plugins.sbt | 4 +- src/main/scala/MockedStreams.scala | 22 ++++++----- src/test/scala/MockedStreamsSpec.scala | 55 ++++++++++++++++++++++++++ 7 files changed, 92 insertions(+), 14 deletions(-) create mode 100644 project/build.properties diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index ebe177f..f37ac8d 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -2,4 +2,4 @@ * Hamidreza Afzali * Jendrik Poloczek - +* Svend Vanderveken diff --git a/README.md b/README.md index 3416c4c..f982685 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,26 @@ It also allows you to have multiple input and output streams. If your topology u mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA) mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB) +## Event order and multiple emissions + +The events provided to the mock stream will be submitted to the topology during the test in the order in which they appear in the fixture. You can also submit events multiple times to the same topics, at various moments in your scenario. + +This can be handy to validate that your topology behaviour is or is not dependent on the order in which the events are received and processed. + +In the example below, 2 events are first submitted to topic A, then 3 to topic B, then 1 more to topic A again. + + val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2))) + val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5))) + val secondInputForTopicA = Seq(("y", int(4))) + + val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9)) + + val builder = MockedStreams() + .topology(topologyTables) + .input(InputATopic, strings, ints, firstInputForTopicA) + .input(InputBTopic, strings, ints, firstInputForTopicB) + .input(InputATopic, strings, ints, secondInputForTopicA) + ## State Store When you define your state stores via .stores(stores: Seq[String]) since 1.2, you are able to verify the state store content via the .stateTable(name: String) method: diff --git a/build.sbt b/build.sbt index 22781d7..b2b85ba 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ lazy val commonSettings = Seq( val scalaTestVersion = "3.0.2" val rocksDBVersion = "5.0.1" -val kafkaVersion = "0.11.0.0" +val kafkaVersion = "0.11.0.1" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..94005e5 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.0.0 diff --git a/project/plugins.sbt b/project/plugins.sbt index 92096cd..b7a9e63 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,2 @@ -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 3238f26..0a68c66 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -30,12 +30,13 @@ object MockedStreams { def apply() = Builder() - case class Input(seq: Seq[(Array[Byte], Array[Byte])]) + case class Event(topic: String, key: Array[Byte], value: Array[Byte]) case class Builder(topology: Option[(KStreamBuilder => Unit)] = None, configuration: Properties = new Properties(), stateStores: Seq[String] = Seq(), - inputs: Map[String, Input] = Map()) { + inputs: List[Event] = List.empty) { + def config(configuration: Properties) = this.copy(configuration = configuration) @@ -43,11 +44,15 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], seq: Seq[(K, V)]) = { + def input[K, V](topic: String, key: Serde[K], value: Serde[V], newInput: Seq[(K, V)]) = { val keySer = key.serializer val valSer = value.serializer - val in = seq.map { case (k, v) => (keySer.serialize(topic, k), valSer.serialize(topic, v)) } - this.copy(inputs = inputs + (topic -> Input(in))) + + val updatedInputs = newInput.foldLeft(inputs) { + case (events, (k, v)) => Event(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) :: events + } + + this.copy(inputs = updatedInputs) } def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { @@ -99,10 +104,9 @@ object MockedStreams { } private def produce(driver: Driver) = { - inputs.foreach { case (topic, input) => - input.seq.foreach { case (key, value) => + inputs.reverse.foreach{ + case Event(topic, key, value) => driver.process(topic, key, value) - } } } @@ -125,5 +129,3 @@ object MockedStreams { class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") } - - diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 5833238..cb197db 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -118,6 +118,25 @@ class MockedStreamsSpec extends FlatSpec with Matchers { builder.stateTable(StoreName) shouldEqual inputA.toMap } + it should "assert correctly when joining events sent to 2 Ktables in a specific order" in { + import Fixtures.Multi._ + + val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2))) + val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5))) + val secondInputForTopicA = Seq(("y", int(4))) + + val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9)) + + val builder = MockedStreams() + .topology(topologyTables) + .input(InputATopic, strings, ints, firstInputForTopicA) + .input(InputBTopic, strings, ints, firstInputForTopicB) + .input(InputATopic, strings, ints, secondInputForTopicA) + + builder.output(OutputATopic, strings, ints, expectedOutput.size) + .shouldEqual(expectedOutput) + } + it should "assert correctly when processing windowed state output topology" in { import Fixtures.Multi._ @@ -194,6 +213,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val OutputATopic = "outputA" val OutputBTopic = "outputB" val StoreName = "store" + val Store2Name = "store2" def topology1Output(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputATopic) @@ -207,6 +227,18 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .to(strings, ints, OutputATopic) } + def topology1Outputbis(builder: KStreamBuilder) = { + val streamA = builder.stream(strings, ints, InputATopic) + val streamB = builder.stream(strings, ints, InputBTopic) + + val table = streamB.groupByKey(strings, ints).aggregate( + new LastInitializer, + new LastAggregator, ints, StoreName) + + streamA.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints) + .to(strings, ints, OutputATopic) + } + def topology1WindowOutput(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputCTopic) streamA.groupByKey(strings, ints).count( @@ -230,6 +262,29 @@ class MockedStreamsSpec extends FlatSpec with Matchers { streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints) .to(strings, ints, OutputBTopic) } + + def topologyTables(builder: KStreamBuilder) = { + val streamA = builder.stream(strings, ints, InputATopic) + val streamB = builder.stream(strings, ints, InputBTopic) + + val tableA = streamA.groupByKey(strings, ints).aggregate( + new LastInitializer, + new LastAggregator, + ints, + StoreName) + + val tableB = streamB.groupByKey(strings, ints).aggregate( + new LastInitializer, + new LastAggregator, + ints, + Store2Name) + + val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner) + + resultTable + .toStream + .to(strings, ints, OutputATopic) + } } } From e7aa8fc22cd93483af85cdd0ee63062ea1729561 Mon Sep 17 00:00:00 2001 From: Svend Vanderveken Date: Fri, 15 Sep 2017 16:23:27 +0200 Subject: [PATCH 2/3] removed my debug temp method --- src/test/scala/MockedStreamsSpec.scala | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index cb197db..e9af0e9 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -227,18 +227,6 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .to(strings, ints, OutputATopic) } - def topology1Outputbis(builder: KStreamBuilder) = { - val streamA = builder.stream(strings, ints, InputATopic) - val streamB = builder.stream(strings, ints, InputBTopic) - - val table = streamB.groupByKey(strings, ints).aggregate( - new LastInitializer, - new LastAggregator, ints, StoreName) - - streamA.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints) - .to(strings, ints, OutputATopic) - } - def topology1WindowOutput(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputCTopic) streamA.groupByKey(strings, ints).count( From 82fca24dc8fd042c45ab38834c8048dcdbf33601 Mon Sep 17 00:00:00 2001 From: Svend Vanderveken Date: Sun, 24 Sep 2017 09:27:33 +0200 Subject: [PATCH 3/3] some readability improvements + bump sbt to 1.0.2 --- project/build.properties | 2 +- src/main/scala/MockedStreams.scala | 25 +++++++++++++------------ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/project/build.properties b/project/build.properties index 94005e5..b7dd3cb 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.0.0 +sbt.version=1.0.2 diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 0a68c66..fcc0b94 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -30,13 +30,12 @@ object MockedStreams { def apply() = Builder() - case class Event(topic: String, key: Array[Byte], value: Array[Byte]) + case class Record(topic: String, key: Array[Byte], value: Array[Byte]) case class Builder(topology: Option[(KStreamBuilder => Unit)] = None, configuration: Properties = new Properties(), stateStores: Seq[String] = Seq(), - inputs: List[Event] = List.empty) { - + inputs: List[Record] = List.empty) { def config(configuration: Properties) = this.copy(configuration = configuration) @@ -44,20 +43,22 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], newInput: Seq[(K, V)]) = { + def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = { val keySer = key.serializer val valSer = value.serializer - val updatedInputs = newInput.foldLeft(inputs) { - case (events, (k, v)) => Event(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) :: events + val updatedRecords = newRecords.foldLeft(inputs) { + case (events, (k, v)) => + val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) + newRecord :: events } - this.copy(inputs = updatedInputs) + this.copy(inputs = updatedRecords) } def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty - withProcessedDriver { driver => + withProcessedDriver { driver => (0 until size).flatMap { i => Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { case Some(record) => Some((record.key, record.value)) @@ -67,10 +68,10 @@ object MockedStreams { } } - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = output[K, V](topic, key, value, size).toMap - def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => + def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => val records = driver.getKeyValueStore(name).all() val list = records.asScala.toList.map { record => (record.key, record.value) } records.close() @@ -103,9 +104,9 @@ object MockedStreams { new Driver(new StreamsConfig(props), builder) } - private def produce(driver: Driver) = { + private def produce(driver: Driver): Unit = { inputs.reverse.foreach{ - case Event(topic, key, value) => + case Record(topic, key, value) => driver.process(topic, key, value) } }