diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index ebe177f..f37ac8d 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -2,4 +2,4 @@ * Hamidreza Afzali * Jendrik Poloczek - +* Svend Vanderveken diff --git a/README.md b/README.md index 3416c4c..f982685 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,26 @@ It also allows you to have multiple input and output streams. If your topology u mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA) mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB) +## Event order and multiple emissions + +The events provided to the mock stream will be submitted to the topology during the test in the order in which they appear in the fixture. You can also submit events multiple times to the same topics, at various moments in your scenario. + +This can be handy to validate whether or not your topology's behaviour depends on the order in which the events are received and processed. + +In the example below, 2 events are first submitted to topic A, then 3 to topic B, then 1 more to topic A again.
+ + val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2))) + val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5))) + val secondInputForTopicA = Seq(("y", int(4))) + + val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9)) + + val builder = MockedStreams() + .topology(topologyTables) + .input(InputATopic, strings, ints, firstInputForTopicA) + .input(InputBTopic, strings, ints, firstInputForTopicB) + .input(InputATopic, strings, ints, secondInputForTopicA) + ## State Store When you define your state stores via .stores(stores: Seq[String]) since 1.2, you are able to verify the state store content via the .stateTable(name: String) method: diff --git a/build.sbt b/build.sbt index 22781d7..b2b85ba 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ lazy val commonSettings = Seq( val scalaTestVersion = "3.0.2" val rocksDBVersion = "5.0.1" -val kafkaVersion = "0.11.0.0" +val kafkaVersion = "0.11.0.1" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..b7dd3cb --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.0.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index 92096cd..b7a9e63 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,2 @@ -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 3238f26..fcc0b94 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -30,12 +30,12 @@ object MockedStreams { def apply() = Builder() - case class Input(seq: Seq[(Array[Byte], Array[Byte])]) + case class Record(topic: String, key: Array[Byte], value: Array[Byte]) case class 
Builder(topology: Option[(KStreamBuilder => Unit)] = None, configuration: Properties = new Properties(), stateStores: Seq[String] = Seq(), - inputs: Map[String, Input] = Map()) { + inputs: List[Record] = List.empty) { def config(configuration: Properties) = this.copy(configuration = configuration) @@ -43,16 +43,22 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], seq: Seq[(K, V)]) = { + def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = { val keySer = key.serializer val valSer = value.serializer - val in = seq.map { case (k, v) => (keySer.serialize(topic, k), valSer.serialize(topic, v)) } - this.copy(inputs = inputs + (topic -> Input(in))) + + val updatedRecords = newRecords.foldLeft(inputs) { + case (events, (k, v)) => + val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) + newRecord :: events + } + + this.copy(inputs = updatedRecords) } def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty - withProcessedDriver { driver => + withProcessedDriver { driver => (0 until size).flatMap { i => Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { case Some(record) => Some((record.key, record.value)) @@ -62,10 +68,10 @@ object MockedStreams { } } - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = output[K, V](topic, key, value, size).toMap - def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => + def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => val records = driver.getKeyValueStore(name).all() val list = records.asScala.toList.map { record => (record.key, record.value) } records.close() @@ -98,11 
+104,10 @@ object MockedStreams { new Driver(new StreamsConfig(props), builder) } - private def produce(driver: Driver) = { - inputs.foreach { case (topic, input) => - input.seq.foreach { case (key, value) => + private def produce(driver: Driver): Unit = { + inputs.reverse.foreach{ + case Record(topic, key, value) => driver.process(topic, key, value) - } } } @@ -125,5 +130,3 @@ object MockedStreams { class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") } - - diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 5833238..e9af0e9 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -118,6 +118,25 @@ class MockedStreamsSpec extends FlatSpec with Matchers { builder.stateTable(StoreName) shouldEqual inputA.toMap } + it should "assert correctly when joining events sent to 2 Ktables in a specific order" in { + import Fixtures.Multi._ + + val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2))) + val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5))) + val secondInputForTopicA = Seq(("y", int(4))) + + val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9)) + + val builder = MockedStreams() + .topology(topologyTables) + .input(InputATopic, strings, ints, firstInputForTopicA) + .input(InputBTopic, strings, ints, firstInputForTopicB) + .input(InputATopic, strings, ints, secondInputForTopicA) + + builder.output(OutputATopic, strings, ints, expectedOutput.size) + .shouldEqual(expectedOutput) + } + it should "assert correctly when processing windowed state output topology" in { import Fixtures.Multi._ @@ -194,6 +213,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val OutputATopic = "outputA" val OutputBTopic = "outputB" val StoreName = "store" + val Store2Name = "store2" def topology1Output(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputATopic) @@ -230,6 +250,29 @@ class 
MockedStreamsSpec extends FlatSpec with Matchers { streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints) .to(strings, ints, OutputBTopic) } + + def topologyTables(builder: KStreamBuilder) = { + val streamA = builder.stream(strings, ints, InputATopic) + val streamB = builder.stream(strings, ints, InputBTopic) + + val tableA = streamA.groupByKey(strings, ints).aggregate( + new LastInitializer, + new LastAggregator, + ints, + StoreName) + + val tableB = streamB.groupByKey(strings, ints).aggregate( + new LastInitializer, + new LastAggregator, + ints, + Store2Name) + + val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner) + + resultTable + .toStream + .to(strings, ints, OutputATopic) + } } }