Merge pull request #10 from jpzk/v1.2.0
V1.2.0
jpzk authored Feb 26, 2017
2 parents 890eeee + 1e14e51 commit 9d61458
Showing 6 changed files with 180 additions and 68 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG → CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
-# Change Log
+# Changelog

## Mocked Streams 1.2.0

* Build against Apache Kafka 0.10.2
* Added support for Scala 2.12.1
* Added .stateTable and .windowStateTable methods for retrieving the content of the state stores as a Map
* Added contributors file
* Removed dependencies on Log4j and Slf4j
* Updated RocksDB version to 5.0.1
* Updated ScalaTest version to 3.0.1
* Added more assertions in the test for input validation

## Mocked Streams 1.1.0

5 changes: 5 additions & 0 deletions CONTRIBUTORS.md
@@ -0,0 +1,5 @@
# Contributors

* Hamidreza Afzali
* Jendrik Poloczek

42 changes: 38 additions & 4 deletions README.md
@@ -3,14 +3,15 @@
[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)


-Mocked Streams 1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
+Mocked Streams 1.2.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

-libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.1.0" % "test"
+libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.0" % "test"

## Apache Kafka Compatibility

| Mocked Streams Version | Apache Kafka Version |
| ------------- |-------------|
| 1.2.0 | 0.10.2.0 |
| 1.1.0 | 0.10.1.1 |
| 1.0.0 | 0.10.1.0 |

@@ -32,7 +33,7 @@ It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github.

## Multiple Input / Output Example and State

-It also allows you to have multiple input and output streams. If your topology uses state stores you need to define them using .stores(Seq[String]):
+It also allows you to have multiple input and output streams. If your topology uses state stores you need to define them using .stores(stores: Seq[String]):

import com.madewithtea.mockedstreams.MockedStreams

@@ -44,7 +45,40 @@ It also allows you to have multiple input and output streams. If your topology u

mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)
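Pieced together, a runnable spec based on the snippet above could look like this (a sketch assuming ScalaTest; the serdes, topic names, fixtures, and the pass-through topology are illustrative, not the library's own example):

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.scalatest.{FlatSpec, Matchers}
import com.madewithtea.mockedstreams.MockedStreams

class PassThroughSpec extends FlatSpec with Matchers {

  // illustrative serdes and fixtures
  val strings: Serde[String] = Serdes.String()
  val ints: Serde[Integer] = Serdes.Integer()
  val inputA = Seq(("x", Int.box(1)), ("y", Int.box(2)))

  "the topology" should "forward records from in-a to out-a" in {
    val mstreams = MockedStreams()
      .topology { builder =>
        // hypothetical topology: copy the input topic to the output topic
        builder.stream(strings, ints, "in-a").to(strings, ints, "out-a")
      }
      .input("in-a", strings, ints, inputA)

    // a pass-through topology emits exactly the records it was fed
    mstreams.output("out-a", strings, ints, inputA.size) shouldEqual inputA
  }
}
```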


## State Store

Since 1.2, when you define your state stores via .stores(stores: Seq[String]), you can verify the state store content via the .stateTable(name: String) method:

import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
.topology { builder => builder.stream(...) [...] }
.input("in-a", strings, ints, inputA)
.input("in-b", strings, ints, inputB)
.stores(Seq("store-name"))

mstreams.stateTable("store-name") shouldEqual Map('a' -> 1)

## Window State Store

Since 1.2, when you define your state stores via .stores(stores: Seq[String]) and add a timestamp extractor to the config, you can verify the window state store content via the .windowStateTable(name: String, key: K) method:

import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
classOf[TimestampExtractors.CustomTimestampExtractor].getName)

val mstreams = MockedStreams()
.topology { builder => builder.stream(...) [...] }
.input("in-a", strings, ints, inputA)
.stores(Seq("store-name"))
.config(props)

mstreams.windowStateTable("store-name", "x") shouldEqual someMapX
mstreams.windowStateTable("store-name", "y") shouldEqual someMapY
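The config above references `TimestampExtractors.CustomTimestampExtractor` from the library's test sources; writing your own means implementing Kafka 0.10.2's two-argument `TimestampExtractor` interface (a sketch; the payload-based logic here is purely illustrative):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.processor.TimestampExtractor

class MyTimestampExtractor extends TimestampExtractor {
  // Illustrative: treat integer payloads as event timestamps,
  // fall back to the record's own timestamp otherwise.
  override def extract(record: ConsumerRecord[AnyRef, AnyRef], previousTimestamp: Long): Long =
    record.value match {
      case n: java.lang.Integer => n.longValue()
      case _ => record.timestamp()
    }
}
```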

## Custom Streams Configuration

Sometimes you need to pass a custom configuration to Kafka Streams:
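A minimal sketch of passing such a configuration via `.config` (the commit-interval key and its value are illustrative, and the topology placeholder follows the document's own shorthand):

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "0")

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] }
  .config(props)
  .input("in-a", strings, ints, inputA)
```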
18 changes: 7 additions & 11 deletions build.sbt
@@ -1,17 +1,16 @@

lazy val commonSettings = Seq(
organization := "com.madewithtea",
-version := "1.1.0",
+version := "1.2.0",
scalaVersion := "2.11.8",
-crossScalaVersions := Seq("2.12.0", "2.11.8"),
+crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"),
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
organizationHomepage := Some(url("https://www.madewithtea.com")),
scalacOptions := Seq("-Xexperimental"))

-val log4jVersion = "1.2.17"
-val slf4jVersion = "1.7.21"
-val scalaTestVersion = "2.2.6"
-val rocksDBVersion = "4.11.2"
-val kafkaVersion = "0.10.1.1"
+val scalaTestVersion = "3.0.1"
+val rocksDBVersion = "5.0.1"
+val kafkaVersion = "0.10.2.0"

lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
@@ -23,17 +22,14 @@ lazy val kafka = Seq(

lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % rocksDBVersion % "test"
-lazy val logging = Seq("log4j" % "log4j" % log4jVersion % "test",
-"org.slf4j" % "slf4j-api" % slf4jVersion % "test",
-"org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test")

lazy val mockedstreams = (project in file(".")).
settings(commonSettings: _*).
settings(
libraryDependencies ++= Seq(
scalaTest,
rocksDB
-) ++ kafka ++ logging
+) ++ kafka
)

publishTo := {
56 changes: 35 additions & 21 deletions src/main/scala/MockedStreams.scala
@@ -16,14 +16,16 @@
*/
package com.madewithtea.mockedstreams

-import collection.JavaConverters._
import java.util.{Properties, UUID}

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.state.ReadOnlyWindowStore
import org.apache.kafka.test.ProcessorTopologyTestDriver

+import scala.collection.JavaConverters._

object MockedStreams {

def apply() = Builder()
@@ -48,33 +50,25 @@
this.copy(inputs = inputs + (topic -> Input(in)))
}

-def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
-if (size <= 0)
-throw new ExpectedOutputIsEmpty
-if (inputs.isEmpty)
-throw new NoInputSpecified
+def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = withValidInput {
+if (size <= 0) throw new ExpectedOutputIsEmpty

-val driver = stream
-produce(driver)
+val driver = stream
+produce(driver)

-val keyDes = key.deserializer
-val valDes = value.deserializer
-(0 until size).flatMap { i =>
-Option(driver.readOutput(topic, keyDes, valDes)) match {
-case Some(record) => Some((record.key, record.value))
-case None => None
+(0 until size).flatMap { i =>
+Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match {
+case Some(record) => Some((record.key, record.value))
+case None => None
}
}
}
}

-def outputTable[K,V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
-output[K,V](topic, key, value, size).toMap
+def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = {
+output[K, V](topic, key, value, size).toMap
}

-def stateTable(name: String) = {
-if (inputs.isEmpty)
-throw new NoInputSpecified

+def stateTable(name: String): Map[Nothing, Nothing] = withValidInput {
val driver = stream
produce(driver)

@@ -85,6 +79,19 @@
list.toMap
}

def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0,
timeTo: Long = Long.MaxValue) = withValidInput {

val driver = stream
produce(driver)

val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]]
val records = store.fetch(key, timeFrom, timeTo)
val list = records.asScala.toList.map { record => (record.key, record.value) }
records.close()
list.toMap
}

// state store is temporarily created in ProcessorTopologyTestDriver
private def stream = {
val props = new Properties
@@ -109,6 +116,13 @@
}
}
}

private def withValidInput[T](f: => T): T = {
if (inputs.isEmpty)
throw new NoInputSpecified
f
}

}

class NoTopologySpecified extends Exception("No topology specified. Call topology() on builder.")
