diff --git a/CHANGELOG b/CHANGELOG.md similarity index 52% rename from CHANGELOG rename to CHANGELOG.md index da271c4..b3215bb 100644 --- a/CHANGELOG +++ b/CHANGELOG.md @@ -1,4 +1,15 @@ -# Change Log +# Changelog + +## Mocked Streams 1.2.0 + +* Build against Apache Kafka 0.10.2 +* Added support for Scala 2.12.1 +* Added .stateTable and .windowStateTable method for retrieving the content of the state stores as Map +* Added contributors file +* Removed dependencies to Log4j and Slf4j +* Updated RocksDB version to 5.0.1 +* Updated ScalaTest version to 3.0.1 +* Added more assertions in the test for input validation ## Mocked Streams 1.1.0 diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md new file mode 100644 index 0000000..ebe177f --- /dev/null +++ b/CONTRIBUTORS.md @@ -0,0 +1,5 @@ +# Contributors + +* Hamidreza Afzali +* Jendrik Poloczek + diff --git a/README.md b/README.md index 1d38764..c534f85 100644 --- a/README.md +++ b/README.md @@ -3,14 +3,15 @@ [![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 1.2.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.1.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.0" % "test" ## Apache Kafka Compatibility | Mocked Streams Version | Apache Kafka Version | | ------------- |-------------| +| 1.2.0 | 0.10.2.0 | | 1.1.0 | 0.10.1.1 | | 1.0.0 | 0.10.1.0 | @@ -32,7 +33,7 @@ It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github. ## Multiple Input / Output Example and State -It also allows you to have multiple input and output streams. If your topology uses state stores you need to define them using .stores(Seq[String]): +It also allows you to have multiple input and output streams. If your topology uses state stores you need to define them using .stores(stores: Seq[String]): import com.madewithtea.mockedstreams.MockedStreams @@ -44,7 +45,40 @@ It also allows you to have multiple input and output streams. If your topology u mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA) mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB) - + +## State Store + +When you define your state stores via .stores(stores: Seq[String]) since 1.2, you are able to verify the state store content via the .stateTable(name: String) method: + + import com.madewithtea.mockedstreams.MockedStreams + + val mstreams = MockedStreams() + .topology { builder => builder.stream(...) [...] } + .input("in-a", strings, ints, inputA) + .input("in-b", strings, ints, inputB) + .stores(Seq("store-name")) + + mstreams.stateTable("store-name") shouldEqual Map('a' -> 1) + +## Window State Store + +When you define your state stores via .stores(stores: Seq[String]) since 1.2 and added the timestamp extractor to the config, you are able to verify the window state store content via the .windowStateTable(name: String, key: K) method: + + import com.madewithtea.mockedstreams.MockedStreams + + val props = new Properties + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + classOf[TimestampExtractors.CustomTimestampExtractor].getName) + + val mstreams = MockedStreams() + .topology { builder => builder.stream(...) [...] } + .input("in-a", strings, ints, inputA) + .stores(Seq("store-name")) + .config(props) + + mstreams.windowStateTable("store-name", "x") shouldEqual someMapX + mstreams.windowStateTable("store-name", "y") shouldEqual someMapY + ## Custom Streams Configuration Sometimes you need to pass a custom configuration to Kafka Streams: diff --git a/build.sbt b/build.sbt index 1c7c200..01452bb 100644 --- a/build.sbt +++ b/build.sbt @@ -1,17 +1,16 @@ + lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "1.1.0", + version := "1.2.0", scalaVersion := "2.11.8", - crossScalaVersions := Seq("2.12.0", "2.11.8"), + crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) -val log4jVersion = "1.2.17" -val slf4jVersion = "1.7.21" -val scalaTestVersion = "2.2.6" -val rocksDBVersion = "4.11.2" -val kafkaVersion = "0.10.1.1" +val scalaTestVersion = "3.0.1" +val rocksDBVersion = "5.0.1" +val kafkaVersion = "0.10.2.0" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, @@ -23,9 +22,6 @@ lazy val kafka = Seq( lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % "test" lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % rocksDBVersion % "test" -lazy val logging = Seq("log4j" % "log4j" % log4jVersion % "test", - "org.slf4j" % "slf4j-api" % slf4jVersion % "test", - "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test") lazy val mockedstreams = (project in file(".")). settings(commonSettings: _*). @@ -33,7 +29,7 @@ lazy val mockedstreams = (project in file(".")). libraryDependencies ++= Seq( scalaTest, rocksDB - ) ++ kafka ++ logging + ) ++ kafka ) publishTo := { diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 36f149b..f119c85 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -16,14 +16,16 @@ */ package com.madewithtea.mockedstreams -import collection.JavaConverters._ import java.util.{Properties, UUID} import org.apache.kafka.common.serialization.Serde import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.kstream.KStreamBuilder +import org.apache.kafka.streams.state.ReadOnlyWindowStore import org.apache.kafka.test.ProcessorTopologyTestDriver +import scala.collection.JavaConverters._ + object MockedStreams { def apply() = Builder() @@ -48,33 +50,25 @@ object MockedStreams { this.copy(inputs = inputs + (topic -> Input(in))) } - def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { - if (size <= 0) - throw new ExpectedOutputIsEmpty - if (inputs.isEmpty) - throw new NoInputSpecified + def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = withValidInput { + if (size <= 0) throw new ExpectedOutputIsEmpty - val driver = stream - produce(driver) + val driver = stream + produce(driver) - val keyDes = key.deserializer - val valDes = value.deserializer - (0 until size).flatMap { i => - Option(driver.readOutput(topic, keyDes, valDes)) match { - case Some(record) => Some((record.key, record.value)) - case None => None + (0 until size).flatMap { i => + Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { + case Some(record) => Some((record.key, record.value)) + case None => None + } } } - } - def outputTable[K,V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { - output[K,V](topic, key, value, size).toMap + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = { + output[K, V](topic, key, value, size).toMap } - def stateTable(name: String) = { - if (inputs.isEmpty) - throw new NoInputSpecified - + def stateTable(name: String): Map[Nothing, Nothing] = withValidInput { val driver = stream produce(driver) @@ -85,6 +79,19 @@ object MockedStreams { list.toMap } + def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, + timeTo: Long = Long.MaxValue) = withValidInput { + + val driver = stream + produce(driver) + + val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] + val records = store.fetch(key, timeFrom, timeTo) + val list = records.asScala.toList.map { record => (record.key, record.value) } + records.close() + list.toMap + } + // state store is temporarily created in ProcessorTopologyTestDriver private def stream = { val props = new Properties @@ -109,6 +116,13 @@ object MockedStreams { } } } + + private def withValidInput[T](f: => T): T = { + if (inputs.isEmpty) + throw new NoInputSpecified + f + } + } class NoTopologySpecified extends Exception("No topology specified. Call topology() on builder.") diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 5b98e97..5833238 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -16,47 +16,60 @@ */ package com.madewithtea.mockedstreams +import java.util.Properties + +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.KeyValue -import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KStreamBuilder, ValueJoiner} +import org.apache.kafka.streams.kstream._ +import org.apache.kafka.streams.processor.TimestampExtractor +import org.apache.kafka.streams.{KeyValue, StreamsConfig} import org.scalatest.{FlatSpec, Matchers} class MockedStreamsSpec extends FlatSpec with Matchers { behavior of "MockedStreams" - it should "throw exception when expected size is <= 0" in { + it should "throw exception when expected size in output methods is <= 0" in { import Fixtures.Uppercase._ import MockedStreams.ExpectedOutputIsEmpty - an[ExpectedOutputIsEmpty] should be thrownBy - MockedStreams() - .topology(topology _) - .input(InputTopic, strings, strings, input) - .output(OutputTopic, strings, strings, 0) - - an[ExpectedOutputIsEmpty] should be thrownBy - MockedStreams() - .topology(topology _) - .input(InputTopic, strings, strings, input) - .output(OutputTopic, strings, strings, -1) + val spec = MockedStreams() + .topology(topology) + .input(InputTopic, strings, strings, input) + + Seq(-1, 0).foreach { size => + an[ExpectedOutputIsEmpty] should be thrownBy + spec.output(OutputTopic, strings, strings, size) + + an[ExpectedOutputIsEmpty] should be thrownBy + spec.outputTable(OutputTopic, strings, strings, size) + } } - it should "throw exception when no input specified" in { + it should "throw exception when no input specified for all output and state methods" in { import Fixtures.Uppercase._ import MockedStreams.NoInputSpecified + val t = MockedStreams().topology(topology) + + an[NoInputSpecified] should be thrownBy + t.output(OutputTopic, strings, strings, expected.size) + an[NoInputSpecified] should be thrownBy - MockedStreams() - .topology(topology _) - .output(OutputTopic, strings, strings, expected.size) + t.outputTable(OutputTopic, strings, strings, expected.size) + + an[NoInputSpecified] should be thrownBy + t.stateTable("state-table") + + an[NoInputSpecified] should be thrownBy + t.windowStateTable("window-state-table", 0) } it should "assert correctly when processing strings to uppercase" in { import Fixtures.Uppercase._ val output = MockedStreams() - .topology(topology _) + .topology(topology) .input(InputTopic, strings, strings, input) .output(OutputTopic, strings, strings, expected.size) @@ -67,7 +80,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { import Fixtures.Uppercase._ val output = MockedStreams() - .topology(topology _) + .topology(topology) .input(InputTopic, strings, strings, input) .outputTable(OutputTopic, strings, strings, expected.size) @@ -78,7 +91,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { import Fixtures.Multi._ val builder = MockedStreams() - .topology(topology1Output _) + .topology(topology1Output) .input(InputATopic, strings, ints, inputA) .input(InputBTopic, strings, ints, inputB) .stores(Seq(StoreName)) @@ -91,7 +104,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { import Fixtures.Multi._ val builder = MockedStreams() - .topology(topology2Output _) + .topology(topology2Output) .input(InputATopic, strings, ints, inputA) .input(InputBTopic, strings, ints, inputB) .stores(Seq(StoreName)) @@ -102,8 +115,27 @@ class MockedStreamsSpec extends FlatSpec with Matchers { builder.output(OutputBTopic, strings, ints, expectedB.size) .shouldEqual(expectedB) - builder.stateTable(StoreName) - .shouldEqual(inputA.toMap) + builder.stateTable(StoreName) shouldEqual inputA.toMap + } + + it should "assert correctly when processing windowed state output topology" in { + import Fixtures.Multi._ + + val props = new Properties + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + classOf[TimestampExtractors.CustomTimestampExtractor].getName) + + val builder = MockedStreams() + .topology(topology1WindowOutput) + .input(InputCTopic, strings, ints, inputC) + .stores(Seq(StoreName)) + .config(props) + + builder.windowStateTable(StoreName, "x") + .shouldEqual(expectedCx.toMap) + + builder.windowStateTable(StoreName, "y") + .shouldEqual(expectedCy.toMap) } class LastInitializer extends Initializer[Integer] { @@ -111,15 +143,15 @@ class MockedStreamsSpec extends FlatSpec with Matchers { } class LastAggregator extends Aggregator[String, Integer, Integer] { - override def apply(k: String, v: Integer, t: Integer): Integer = v + override def apply(k: String, v: Integer, t: Integer) = v } class AddJoiner extends ValueJoiner[Integer, Integer, Integer] { - override def apply(v1: Integer, v2: Integer): Integer = v1 + v2 + override def apply(v1: Integer, v2: Integer) = v1 + v2 } class SubJoiner extends ValueJoiner[Integer, Integer, Integer] { - override def apply(v1: Integer, v2: Integer): Integer = v1 - v2 + override def apply(v1: Integer, v2: Integer) = v1 - v2 } object Fixtures { @@ -136,7 +168,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { def topology(builder: KStreamBuilder) = { builder.stream(strings, strings, InputTopic) - .map((k, v) => new KeyValue(k, v.toUpperCase)) + .map[String, String]((k, v) => new KeyValue(k, v.toUpperCase)) .to(strings, strings, OutputTopic) } } @@ -147,14 +179,18 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val inputA = Seq(("x", int(1)), ("y", int(2))) val inputB = Seq(("x", int(4)), ("y", int(3))) + val inputC = Seq(("x", int(1)), ("x", int(1)), ("x", int(2)), ("y", int(1))) val expectedA = Seq(("x", int(5)), ("y", int(5))) val expectedB = Seq(("x", int(3)), ("y", int(1))) + val expectedCx = Seq((1, 2), (2, 1)) + val expectedCy = Seq((1, 1)) val strings = Serdes.String() val ints = Serdes.Integer() val InputATopic = "inputA" val InputBTopic = "inputB" + val InputCTopic = "inputC" val OutputATopic = "outputA" val OutputBTopic = "outputB" val StoreName = "store" @@ -167,10 +203,17 @@ class MockedStreamsSpec extends FlatSpec with Matchers { new LastInitializer, new LastAggregator, ints, StoreName) - streamB.leftJoin(table, new AddJoiner(), strings, ints) + streamB.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints) .to(strings, ints, OutputATopic) } + def topology1WindowOutput(builder: KStreamBuilder) = { + val streamA = builder.stream(strings, ints, InputCTopic) + streamA.groupByKey(strings, ints).count( + TimeWindows.of(1), + StoreName) + } + def topology2Output(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputATopic) val streamB = builder.stream(strings, ints, InputBTopic) @@ -181,10 +224,10 @@ class MockedStreamsSpec extends FlatSpec with Matchers { ints, StoreName) - streamB.leftJoin(table, new AddJoiner(), strings, ints) + streamB.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints) .to(strings, ints, OutputATopic) - streamB.leftJoin(table, new SubJoiner(), strings, ints) + streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints) .to(strings, ints, OutputBTopic) } } @@ -193,4 +236,13 @@ class MockedStreamsSpec extends FlatSpec with Matchers { } +object TimestampExtractors { + class CustomTimestampExtractor extends TimestampExtractor { + override def extract(record: ConsumerRecord[AnyRef, AnyRef], previous: Long) = record.value match { + case value: Integer => value.toLong + case _ => record.timestamp() + } + } + +}