diff --git a/README.md b/README.md
index 7733fa6..4f4561b 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,16 @@
-# Mocked Streams (preview)
-[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![Coverage Status](https://coveralls.io/repos/github/jpzk/mockedstreams/badge.svg?branch=master)](https://coveralls.io/github/jpzk/mockedstreams?branch=master) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
+# Mocked Streams
 
-**Mocked Streams is a Kafka Streams testing library** for Kafka >= 0.10.1 (snapshot JARs included in /lib) which makes use of the ProcessorTopologyTestDriver, therefore **no Kafka brokers and Zookeeper needed** and tests can be run in parallel. It integrates well with any testing framework. The library will be published on the Maven repositories once the new Kafka version is released.
+[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)
 
-If you had your first hands-on Kafka Streams already, you might have noticed that there is no easy way to unit-test your topologies. Even though Kafka is documented quite well, it lacks, for now, of good documentation of how to test. In general, you can choose between running a full Kafka cluster including Zookeeper in your environment, running an embedded Kafka broker and Zookeeper, like in this Kafka Streams [integration test](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java), or you use mock producers and consumers for lightweight unit-tests of your topologies. While the first two approaches are the way to go for integration tests, we should also be able to unit-test our topologies in a simple way:
+Mocked Streams 1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >= 0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework, e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is available on Maven Central, so you only have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
+
+    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.0.0" % "test"
+
+## Example
+
+It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java) class, but adds more syntactic sugar to keep your test code simple:
+
+    import com.madewithtea.mockedstreams.MockedStreams
 
     val input = Seq(("x", "v1"), ("y", "v2"))
     val exp = Seq(("x", "V1"), ("y", "V2"))
@@ -14,15 +21,9 @@ If you had your first hands-on Kafka Streams already, you might have noticed tha
       .input("topic-in", strings, strings, input)
       .output("topic-out", strings, strings, exp.size) shouldEqual exp
 
-### Mocked Streams
-
-[Mocked Streams](https://github.com/jpzk/mockedstreams) is a library which allows you to do the latter without much boilerplate code and in your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java) class, but adds more syntactic sugar to keep your test code simple. The example above is testing the following topology:
+It also allows you to have multiple input and output streams:
 
-    builder.stream(strings, strings, "topic-in")
-      .map((k, v) => new KeyValue(k, v.toUpperCase))
-      .to(strings, strings, "topic-out")
-
-In the example, we have only one input and one output topic. However, we are able to define multiple inputs and multiple outputs. Assuming we have a topology [MultiInputOutputTopology](https://github.com/jpzk/mockedstreams/blob/master/src/test/scala/MockedStreamsSpec.scala) which consumes two input streams, does an aggregation with a local state, and sends records to two output topics, we can test it like this:
+    import com.madewithtea.mockedstreams.MockedStreams
 
     val mstreams = MockedStreams()
      .topology { builder => builder.stream(...) [...] }
@@ -32,10 +33,4 @@ In the example, we have only one input and one output topic. However, we are abl
 
     mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
     mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)
-
-Note, that the order .input(...) calls is very important: When calling .output(...) Mocked Streams produces to the Kafka input topics in the same order as specified. In the example, it would produce all messages to topic "out-a" first, then to "out-b". Each output call will start an isolated run, fetching from the specified output topic. For a better understanding I like to refer to the [tests](https://github.com/jpzk/mockedstreams/blob/master/src/test/scala/MockedStreamsSpec.scala) of Mocked Streams itself.
-
-### Usage in the Next Release of Kafka
-
-Personally, I work on next still unstable version 0.10.1 of Kafka. I was experiencing some issues back-porting Mocked Streams to the stable 0.10.0.1 release. Therefore, I decided to only support and distribute JARs for the next stable release. However, if you are interested in scratching your own itch, contribution & collaboration would be great! Unfortunately, for now, if you want to use it, you need to add Mocked Streams manually. But I will add it to the [Maven Repositories](https://mvnrepository.com/) when the next Kafka version is released!
-
+
diff --git a/build.sbt b/build.sbt
index 9133b5b..cc8521f 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,22 +1,31 @@
 lazy val commonSettings = Seq(
-  organization := "mwt.mockedstreams",
-  version := "0.0.1-SNAPSHOT",
+  organization := "com.madewithtea",
+  version := "1.0.0",
   scalaVersion := "2.11.8",
-  description := "",
+  description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
   organizationHomepage := Some(url("https://www.madewithtea.com")),
-  parallelExecution in Test := false,
   coverageEnabled := true,
   scalacOptions := Seq("-Xexperimental"))
 
 val log4jVersion = "1.2.17"
 val slf4jVersion = "1.7.21"
+val scalaTestVersion = "2.2.6"
+val rocksDBVersion = "4.11.2"
+val kafkaVersion = "0.10.1.0"
 
-lazy val scalaTest = "org.scalatest" %% "scalatest" % "2.2.5"
-lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % "4.9.0"
+lazy val kafka = Seq(
+  "org.apache.kafka" % "kafka-clients" % kafkaVersion,
+  "org.apache.kafka" % "kafka-clients" % kafkaVersion classifier "test",
+  "org.apache.kafka" % "kafka-streams" % kafkaVersion,
+  "org.apache.kafka" % "kafka-streams" % kafkaVersion classifier "test",
+  "org.apache.kafka" %% "kafka" % kafkaVersion
+  )
 
-lazy val logging = Seq("log4j" % "log4j" % log4jVersion,
-  "org.slf4j" % "slf4j-api" % slf4jVersion,
-  "org.slf4j" % "slf4j-log4j12" % slf4jVersion)
+lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
+lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % rocksDBVersion % "test"
+lazy val logging = Seq("log4j" % "log4j" % log4jVersion % "test",
+  "org.slf4j" % "slf4j-api" % slf4jVersion % "test",
+  "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test")
 
 lazy val mockedstreams = (project in file(".")).
   settings(commonSettings: _*).
@@ -27,8 +36,43 @@ lazy val mockedstreams = (project in file(".")).
     libraryDependencies ++= Seq(
       scalaTest,
       rocksDB
-    ) ++ logging
+    ) ++ kafka ++ logging
   )
 
+publishTo := {
+  val nexus = "https://oss.sonatype.org/"
+  if (isSnapshot.value)
+    Some("snapshots" at nexus + "content/repositories/snapshots")
+  else
+    Some("releases" at nexus + "service/local/staging/deploy/maven2")
+}
+
+publishMavenStyle := true
+
+publishArtifact in Test := false
+
+pomIncludeRepository := { _ => false }
+
+pomExtra := (
+  <url>https://www.madewithtea.com/pages/mocked-streams.html</url>
+    <licenses>
+      <license>
+        <name>Apache License Version 2.0</name>
+        <url>http://www.apache.org/licenses/LICENSE-2.0</url>
+        <distribution>repo</distribution>
+      </license>
+    </licenses>
+    <scm>
+      <url>git@github.com:jpzk/mockedstreams.git</url>
+      <connection>scm:git:git@github.com:jpzk/mockedstreams.git</connection>
+    </scm>
+    <developers>
+      <developer>
+        <id>jpzk</id>
+        <name>Jendrik Poloczek</name>
+        <url>https://www.madewithtea.com</url>
+      </developer>
+    </developers>
+  )
diff --git a/lib/kafka-clients-0.10.1.0-test.jar b/lib/kafka-clients-0.10.1.0-test.jar
deleted file mode 100644
index f357c98..0000000
Binary files a/lib/kafka-clients-0.10.1.0-test.jar and /dev/null differ
diff --git a/lib/kafka-clients-0.10.1.0.jar b/lib/kafka-clients-0.10.1.0.jar
deleted file mode 100644
index 0335258..0000000
Binary files a/lib/kafka-clients-0.10.1.0.jar and /dev/null differ
diff --git a/lib/kafka-streams-0.10.1.0-test.jar b/lib/kafka-streams-0.10.1.0-test.jar
deleted file mode 100644
index 4d02262..0000000
Binary files a/lib/kafka-streams-0.10.1.0-test.jar and /dev/null differ
diff --git a/lib/kafka-streams-0.10.1.0.jar b/lib/kafka-streams-0.10.1.0.jar
deleted file mode 100644
index c34f52e..0000000
Binary files a/lib/kafka-streams-0.10.1.0.jar and /dev/null differ
diff --git a/lib/kafka_2.10-0.10.1.0.jar b/lib/kafka_2.10-0.10.1.0.jar
deleted file mode 100644
index e04b173..0000000
Binary files a/lib/kafka_2.10-0.10.1.0.jar and /dev/null differ
diff --git a/lib/kafka_2.11-0.10.1.0-test.jar b/lib/kafka_2.11-0.10.1.0-test.jar
deleted file mode 100644
index 8302a86..0000000
Binary files a/lib/kafka_2.11-0.10.1.0-test.jar and /dev/null differ
diff --git a/lib/kafka_2.11-0.10.1.0.jar b/lib/kafka_2.11-0.10.1.0.jar
deleted file mode 100644
index 5cca273..0000000
Binary files a/lib/kafka_2.11-0.10.1.0.jar and /dev/null differ
diff --git a/project/plugins.sbt b/project/plugins.sbt
index c389b55..5536e6f 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,2 +1,3 @@
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
 addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.4.0")
diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala
index b53f99e..1581f11 100644
--- a/src/main/scala/MockedStreams.scala
+++ b/src/main/scala/MockedStreams.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package mwt.mockedstreams
+package com.madewithtea.mockedstreams
 
 import java.util.{Properties, UUID}
 
@@ -50,7 +50,7 @@ object MockedStreams {
 
     def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
       if (size <= 0) throw new ExpectedOutputIsEmpty
-      if (inputs.size == 0)
+      if (inputs.isEmpty)
        throw new NoInputSpecified
 
      val driver = stream
diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala
index 74bf40e..977e50f 100644
--- a/src/test/scala/MockedStreamsSpec.scala
+++ b/src/test/scala/MockedStreamsSpec.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
 */
-package mwt.mockedstreams
+package com.madewithtea.mockedstreams
 
 import org.apache.kafka.common.serialization.Serdes
 import org.apache.kafka.streams.KeyValue
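
For reference, the README snippets in this change can be combined into one complete test. Below is a minimal sketch of a ScalaTest spec; it is not part of the diff. The `UppercaseSpec` name and the FlatSpec/Matchers wiring are illustrative assumptions, the MockedStreams calls mirror the new README example, and the topology body is the upper-casing example from the removed README text. Writing the topology with plain Scala lambdas relies on SAM conversion, which is why build.sbt keeps the `-Xexperimental` scalac option:

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.KeyValue
    import org.scalatest.{FlatSpec, Matchers}

    import com.madewithtea.mockedstreams.MockedStreams

    // Hypothetical spec name; the API calls follow the README example above.
    class UppercaseSpec extends FlatSpec with Matchers {

      val strings = Serdes.String()

      "the uppercase topology" should "upper-case all values" in {
        val input = Seq(("x", "v1"), ("y", "v2"))
        val exp = Seq(("x", "V1"), ("y", "V2"))

        val output = MockedStreams()
          .topology { builder =>
            // upper-casing topology taken from the removed README section
            builder.stream(strings, strings, "topic-in")
              .map((k, v) => new KeyValue(k, v.toUpperCase))
              .to(strings, strings, "topic-out")
          }
          .input("topic-in", strings, strings, input)      // feed records into the input topic
          .output("topic-out", strings, strings, exp.size) // read exp.size records back

        output shouldEqual exp
      }
    }

In a project that declares the `mockedstreams` test dependency shown in the README, this should run with a plain `sbt test`, without starting Kafka brokers or Zookeeper.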