diff --git a/CHANGELOG.md b/CHANGELOG.md index 25515ac..4e80513 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## Mocked Streams 2.1 + +* Added .inputWithTime to allow custom timestamps +* Thanks to Dan Hamilton for .inputWithTime implementation +* Added Dan Hamilton to CONTRIBUTORS.md + ## Mocked Streams 2.0 * Build against Apache Kafka 2.0 diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 2c677cf..4633266 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -5,3 +5,4 @@ * Svend Vanderveken * Daniel Wojda * Michal Dziemianko +* Dan Hamilton diff --git a/README.md b/README.md index 8426e41..908d0bf 100644 --- a/README.md +++ b/README.md @@ -4,9 +4,9 @@ [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 2.0.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 2.1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.0.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.1.0" % "test" Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/mockafka) @@ -14,7 +14,8 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc | Mocked Streams Version | Apache Kafka Version | |------------- |-------------| -| 2.0.0 | 2.0.0.0 | +| 2.1.0 | 2.0.0.0 | + 2.0.0 | 2.0.0.0 | | 1.8.0 | 1.1.1.0 | | 1.7.0 | 1.1.0.0 | | 1.6.0 | 1.0.1.0 | @@ -110,6 +111,22 @@ When you define your state stores via .stores(stores: Seq[String]) since 1.2 and mstreams.windowStateTable("store-name", "x") shouldEqual someMapX mstreams.windowStateTable("store-name", "y") shouldEqual someMapY +## Adding Timestamps + +With .input the input records timestamps are set to 0 default timestamp of 0. This e.g. prevents testing Join windows of Kafka streams as it cannot produce records with different timestamps. However, using .inputWithTime allows adding timestamps like in the following example: + + val inputA = Seq( + ("x", int(1), 1000L), + ("x", int(1), 1001L), + ("x", int(1), 1002L) + ) + + val builder = MockedStreams() + .topology(topology1WindowOutput) + .inputWithTime(InputCTopic, strings, ints, inputA) + .stores(Seq(StoreName)) + + ## Custom Streams Configuration Sometimes you need to pass a custom configuration to Kafka Streams: diff --git a/build.sbt b/build.sbt index 3c51c3d..85f90eb 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "2.0.0", + version := "2.1.0", scalaVersion := "2.12.6", crossScalaVersions := Seq("2.12.6","2.11.12"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 0d1cb7a..de93838 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -51,20 +51,11 @@ object MockedStreams { def stores(stores: Seq[String]) = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = { - val keySer = key.serializer - val valSer = value.serializer - - val factory = new ConsumerRecordFactory[K, V](keySer, valSer) - - val updatedRecords = newRecords.foldLeft(inputs) { - case (events, (k, v)) => - val newRecord = factory.create(topic, k, v) - events :+ newRecord - } + def input[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V)]) = + _input(topic, key, value, Left(records)) - this.copy(inputs = updatedRecords) - } + def inputWithTime[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V, Long)]) = + _input(topic, key, value, Right(records)) def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { if (size <= 0) throw new ExpectedOutputIsEmpty @@ -78,17 +69,19 @@ object MockedStreams { } } - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = output[K, V](topic, key, value, size).toMap - def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => + def stateTable(name: String) = withProcessedDriver { driver => val records = driver.getKeyValueStore(name).all() val list = records.asScala.toList.map { record => (record.key, record.value) } records.close() list.toMap } - def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, + def windowStateTable[K, V](name: String, + key: K, + timeFrom: Long = 0, timeTo: Long = Long.MaxValue) = withProcessedDriver { driver => val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] val records = store.fetch(key, timeFrom, timeTo) @@ -97,6 +90,23 @@ object MockedStreams { list.toMap } + private def _input[K, V](topic: String, key: Serde[K], value: Serde[V], + records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { + val keySer = key.serializer + val valSer = value.serializer + val factory = new ConsumerRecordFactory[K, V](keySer, valSer) + + val updatedRecords = records match { + case Left(withoutTime) => withoutTime.foldLeft(inputs) { + case (events, (k, v)) => events :+ factory.create(topic, k, v) + } + case Right(withTime) => withTime.foldLeft(inputs) { + case (events, (k, v, timestamp)) => events :+ factory.create(topic, k, v, timestamp) + } + } + this.copy(inputs = updatedRecords) + } + // state store is temporarily created in ProcessorTopologyTestDriver private def stream = { val props = new Properties @@ -110,8 +120,7 @@ object MockedStreams { inputs.foreach(driver.pipeInput) private def withProcessedDriver[T](f: Driver => T): T = { - if(inputs.isEmpty) - throw new NoInputSpecified + if (inputs.isEmpty) throw new NoInputSpecified val driver = stream produce(driver) @@ -126,4 +135,5 @@ object MockedStreams { class NoInputSpecified extends Exception("No input fixtures specified. Call input() method on builder.") class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") + } diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 91d2612..ee8ddd1 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -174,6 +174,20 @@ class MockedStreamsSpec extends FlatSpec with Matchers { output shouldEqual expected } + + it should "accept consumer records with custom timestamps" in { + + import Fixtures.Multi._ + + val builder = MockedStreams() + .topology(topology1WindowOutput) + .inputWithTime(InputCTopic, strings, ints, inputCWithTimeStamps) + .stores(Seq(StoreName)) + + builder.windowStateTable(StoreName, "x") + .shouldEqual(expectedCWithTimeStamps.toMap) + } + class LastInitializer extends Initializer[Integer] { override def apply() = 0 } @@ -216,11 +230,27 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val inputA = Seq(("x", int(1)), ("y", int(2))) val inputB = Seq(("x", int(4)), ("y", int(3))) val inputC = Seq(("x", int(1)), ("x", int(1)), ("x", int(2)), ("y", int(1))) + + val inputCWithTimeStamps = Seq( + ("x", int(1), 1000L), + ("x", int(1), 1000L), + ("x", int(1), 1001L), + ("x", int(1), 1001L), + ("x", int(1), 1002L) + ) + val expectedA = Seq(("x", int(5)), ("y", int(5))) val expectedB = Seq(("x", int(3)), ("y", int(1))) + val expectedCx = Seq((1, 2), (2, 1)) val expectedCy = Seq((1, 1)) + val expectedCWithTimeStamps = Seq( + 1000 -> 2, + 1001 -> 2, + 1002 -> 1 + ) + val strings = Serdes.String() val ints = Serdes.Integer() val serdes = Consumed.`with`(strings, ints)