Skip to content

Commit

Permalink
Merge pull request #2 from jpzk/release/1.0
Browse files Browse the repository at this point in the history
Release/1.0
  • Loading branch information
jpzk authored Oct 23, 2016
2 parents d036ce5 + 05eab88 commit eec06a7
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 32 deletions.
33 changes: 14 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
# Mocked Streams (preview)
[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![Coverage Status](https://coveralls.io/repos/github/jpzk/mockedstreams/badge.svg?branch=master)](https://coveralls.io/github/jpzk/mockedstreams?branch=master) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
# Mocked Streams

**Mocked Streams is a Kafka Streams testing library** for Kafka >= 0.10.1 (snapshot JARs included in /lib) which makes use of the ProcessorTopologyTestDriver, therefore **no Kafka brokers and Zookeeper needed** and tests can be run in parallel. It integrates well with any testing framework. The library will be published on the Maven repositories once the new Kafka version is released.
[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)

If you had your first hands-on Kafka Streams already, you might have noticed that there is no easy way to unit-test your topologies. Even though Kafka is documented quite well, it lacks, for now, of good documentation of how to test. In general, you can choose between running a full Kafka cluster including Zookeeper in your environment, running an embedded Kafka broker and Zookeeper, like in this Kafka Streams [integration test](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java), or you use mock producers and consumers for lightweight unit-tests of your topologies. While the first two approaches are the way to go for integration tests, we should also be able to unit-test our topologies in a simple way:
Mocked Streams 1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.0.0" % "test"

## Example

It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java) class, but adds more syntactic sugar to keep your test code simple:

import com.madewithtea.mockedstreams.MockedStreams

val input = Seq(("x", "v1"), ("y", "v2"))
val exp = Seq(("x", "V1"), ("y", "V2"))
Expand All @@ -14,15 +21,9 @@ If you had your first hands-on Kafka Streams already, you might have noticed tha
.input("topic-in", strings, strings, input)
.output("topic-out", strings, strings, exp.size) shouldEqual exp

### Mocked Streams

[Mocked Streams](https://github.com/jpzk/mockedstreams) is a library which allows you to do the latter without much boilerplate code and in your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java) class, but adds more syntactic sugar to keep your test code simple. The example above is testing the following topology:
It also allows you to have multiple input and output streams:

builder.stream(strings, strings, "topic-in")
.map((k, v) => new KeyValue(k, v.toUpperCase))
.to(strings, strings, "topic-out")

In the example, we have only one input and one output topic. However, we are able to define multiple inputs and multiple outputs. Assuming we have a topology [MultiInputOutputTopology](https://github.com/jpzk/mockedstreams/blob/master/src/test/scala/MockedStreamsSpec.scala) which consumes two input streams, does an aggregation with a local state, and sends records to two output topics, we can test it like this:
import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
.topology { builder => builder.stream(...) [...] }
Expand All @@ -32,10 +33,4 @@ In the example, we have only one input and one output topic. However, we are abl

mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)

Note, that the order .input(...) calls is very important: When calling .output(...) Mocked Streams produces to the Kafka input topics in the same order as specified. In the example, it would produce all messages to topic "out-a" first, then to "out-b". Each output call will start an isolated run, fetching from the specified output topic. For a better understanding I like to refer to the [tests](https://github.com/jpzk/mockedstreams/blob/master/src/test/scala/MockedStreamsSpec.scala) of Mocked Streams itself.

### Usage in the Next Release of Kafka

Personally, I work on next still unstable version 0.10.1 of Kafka. I was experiencing some issues back-porting Mocked Streams to the stable 0.10.0.1 release. Therefore, I decided to only support and distribute JARs for the next stable release. However, if you are interested in scratching your own itch, contribution & collaboration would be great! Unfortunately, for now, if you want to use it, you need to add Mocked Streams manually. But I will add it to the [Maven Repositories](https://mvnrepository.com/) when the next Kafka version is released!


64 changes: 54 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
lazy val commonSettings = Seq(
organization := "mwt.mockedstreams",
version := "0.0.1-SNAPSHOT",
organization := "com.madewithtea",
version := "1.0.0",
scalaVersion := "2.11.8",
description := "",
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
organizationHomepage := Some(url("https://www.madewithtea.com")),
parallelExecution in Test := false,
coverageEnabled := true,
scalacOptions := Seq("-Xexperimental"))

val log4jVersion = "1.2.17"
val slf4jVersion = "1.7.21"
val scalaTestVersion = "2.2.6"
val rocksDBVersion = "4.11.2"
val kafkaVersion = "0.10.1.0"

lazy val scalaTest = "org.scalatest" %% "scalatest" % "2.2.5"
lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % "4.9.0"
lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"org.apache.kafka" % "kafka-clients" % kafkaVersion classifier "test",
"org.apache.kafka" % "kafka-streams" % kafkaVersion,
"org.apache.kafka" % "kafka-streams" % kafkaVersion classifier "test",
"org.apache.kafka" %% "kafka" % kafkaVersion
)

lazy val logging = Seq("log4j" % "log4j" % log4jVersion,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion)
lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % rocksDBVersion % "test"
lazy val logging = Seq("log4j" % "log4j" % log4jVersion % "test",
"org.slf4j" % "slf4j-api" % slf4jVersion % "test",
"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test")

lazy val mockedstreams = (project in file(".")).
settings(commonSettings: _*).
Expand All @@ -27,8 +36,43 @@ lazy val mockedstreams = (project in file(".")).
libraryDependencies ++= Seq(
scalaTest,
rocksDB
) ++ logging
) ++ kafka ++ logging
)

publishTo := {
val nexus = "https://oss.sonatype.org/"
if (isSnapshot.value)
Some("snapshots" at nexus + "content/repositories/snapshots")
else
Some("releases" at nexus + "service/local/staging/deploy/maven2")
}

publishMavenStyle := true

publishArtifact in Test := false

pomIncludeRepository := { _ => false }

pomExtra := (
<url>https://www.madewithtea.com/pages/mocked-streams.html</url>
<licenses>
<license>
<name>Apache License Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<url>git@github.com:jpzk/mockedstreams.git</url>
<connection>scm:git:git@github.com:jpzk/mockedstreams.git</connection>
</scm>
<developers>
<developer>
<id>jpzk</id>
<name>Jendrik Poloczek</name>
<url>https://www.madewithtea.com</url>
</developer>
</developers>
)


Binary file removed lib/kafka-clients-0.10.1.0-test.jar
Binary file not shown.
Binary file removed lib/kafka-clients-0.10.1.0.jar
Binary file not shown.
Binary file removed lib/kafka-streams-0.10.1.0-test.jar
Binary file not shown.
Binary file removed lib/kafka-streams-0.10.1.0.jar
Binary file not shown.
Binary file removed lib/kafka_2.10-0.10.1.0.jar
Binary file not shown.
Binary file removed lib/kafka_2.11-0.10.1.0-test.jar
Binary file not shown.
Binary file removed lib/kafka_2.11-0.10.1.0.jar
Binary file not shown.
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.4.0")
4 changes: 2 additions & 2 deletions src/main/scala/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mwt.mockedstreams
package com.madewithtea.mockedstreams

import java.util.{Properties, UUID}

Expand Down Expand Up @@ -50,7 +50,7 @@ object MockedStreams {
def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
if (size <= 0)
throw new ExpectedOutputIsEmpty
if (inputs.size == 0)
if (inputs.isEmpty)
throw new NoInputSpecified

val driver = stream
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/MockedStreamsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package mwt.mockedstreams
package com.madewithtea.mockedstreams

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
Expand Down

0 comments on commit eec06a7

Please sign in to comment.