Merge pull request #21 from jpzk/release/1.4.0
Release/1.4.0
jpzk authored Oct 2, 2017
2 parents 0fb39dc + 12b45ae commit bd7fbf1
Showing 8 changed files with 96 additions and 21 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,12 @@
# Changelog

## Mocked Streams 1.4.0

* Build against Apache Kafka 0.11.0.1
* Added record order and multiple emissions by Svend Vanderveken
* Updated SBT to 1.0.2
* Added Svend Vanderveken to CONTRIBUTORS.md

## Mocked Streams 1.2.2

* Build against Apache Kafka 0.11.0.0
2 changes: 1 addition & 1 deletion CONTRIBUTORS.md
@@ -2,4 +2,4 @@

* Hamidreza Afzali
* Jendrik Poloczek

* Svend Vanderveken
25 changes: 23 additions & 2 deletions README.md
@@ -4,14 +4,15 @@
[![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)


Mocked Streams 1.3.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
Mocked Streams 1.4.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.3.0" % "test"
libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.4.0" % "test"

## Apache Kafka Compatibility

| Mocked Streams Version | Apache Kafka Version |
| ------------- |-------------|
| 1.4.0 | 0.11.0.1 |
| 1.3.0 | 0.11.0.0 |
| 1.2.1 | 0.10.2.1 |
| 1.2.0 | 0.10.2.0 |
@@ -49,6 +50,26 @@ It also allows you to have multiple input and output streams. If your topology u
mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)

## Record order and multiple emissions

The records provided to the mocked stream will be submitted to the topology during the test in the order in which they appear in the fixture. You can also submit records multiple times to the same topics, at various moments in your scenario.

This can be handy for validating whether or not your topology's behaviour depends on the order in which the records are received and processed.

In the example below, two records are first submitted to topic A, then three to topic B, and finally one more record to topic A.

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
.topology(topologyTables)
.input(InputATopic, strings, ints, firstInputForTopicA)
.input(InputBTopic, strings, ints, firstInputForTopicB)
.input(InputATopic, strings, ints, secondInputForTopicA)
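
Given the builder above, the expected result on the output topic OutputATopic can then be verified as usual via .output:

builder.output(OutputATopic, strings, ints, expectedOutput.size) shouldEqual expectedOutput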

## State Store

When you define your state stores via .stores(stores: Seq[String]) (available since 1.2), you can verify the state store content via the .stateTable(name: String) method:
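
For example, a minimal sketch (topologyWithStore is a placeholder for any topology that materialises a state store named StoreName; the other names follow the fixtures used in the tests):

val mstreams = MockedStreams()
    .topology(topologyWithStore)
    .input(InputATopic, strings, ints, inputA)
    .stores(Seq(StoreName))

mstreams.stateTable(StoreName) shouldEqual inputA.toMap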
4 changes: 2 additions & 2 deletions build.sbt
@@ -1,7 +1,7 @@

lazy val commonSettings = Seq(
organization := "com.madewithtea",
version := "1.3.0",
version := "1.4.0",
scalaVersion := "2.11.11",
crossScalaVersions := Seq("2.12.2","2.11.11"),
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
@@ -10,7 +10,7 @@ lazy val commonSettings = Seq(

val scalaTestVersion = "3.0.2"
val rocksDBVersion = "5.0.1"
val kafkaVersion = "0.11.0.0"
val kafkaVersion = "0.11.0.1"

lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=1.0.2
4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -1,2 +1,2 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
31 changes: 17 additions & 14 deletions src/main/scala/MockedStreams.scala
@@ -30,29 +30,35 @@ object MockedStreams {

def apply() = Builder()

case class Input(seq: Seq[(Array[Byte], Array[Byte])])
case class Record(topic: String, key: Array[Byte], value: Array[Byte])

case class Builder(topology: Option[(KStreamBuilder => Unit)] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: Map[String, Input] = Map()) {
inputs: List[Record] = List.empty) {

def config(configuration: Properties) = this.copy(configuration = configuration)

def topology(func: (KStreamBuilder => Unit)) = this.copy(topology = Some(func))

def stores(stores: Seq[String]) = this.copy(stateStores = stores)

def input[K, V](topic: String, key: Serde[K], value: Serde[V], seq: Seq[(K, V)]) = {
def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = {
val keySer = key.serializer
val valSer = value.serializer
val in = seq.map { case (k, v) => (keySer.serialize(topic, k), valSer.serialize(topic, v)) }
this.copy(inputs = inputs + (topic -> Input(in)))

val updatedRecords = newRecords.foldLeft(inputs) {
case (events, (k, v)) =>
val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v))
events :+ newRecord
}

this.copy(inputs = updatedRecords)
}

def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
if (size <= 0) throw new ExpectedOutputIsEmpty
withProcessedDriver { driver =>
withProcessedDriver { driver =>
(0 until size).flatMap { i =>
Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match {
case Some(record) => Some((record.key, record.value))
@@ -62,10 +68,10 @@
}
}

def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
output[K, V](topic, key, value, size).toMap

def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
val records = driver.getKeyValueStore(name).all()
val list = records.asScala.toList.map { record => (record.key, record.value) }
records.close()
@@ -98,11 +104,10 @@
new Driver(new StreamsConfig(props), builder)
}

private def produce(driver: Driver) = {
inputs.foreach { case (topic, input) =>
input.seq.foreach { case (key, value) =>
private def produce(driver: Driver): Unit = {
inputs.foreach{
case Record(topic, key, value) =>
driver.process(topic, key, value)
}
}
}

@@ -125,5 +130,3 @@
class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.")

}


43 changes: 43 additions & 0 deletions src/test/scala/MockedStreamsSpec.scala
@@ -118,6 +118,25 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
builder.stateTable(StoreName) shouldEqual inputA.toMap
}

it should "assert correctly when joining events sent to 2 Ktables in a specific order" in {
import Fixtures.Multi._

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
.topology(topologyTables)
.input(InputATopic, strings, ints, firstInputForTopicA)
.input(InputBTopic, strings, ints, firstInputForTopicB)
.input(InputATopic, strings, ints, secondInputForTopicA)

builder.output(OutputATopic, strings, ints, expectedOutput.size)
.shouldEqual(expectedOutput)
}

it should "assert correctly when processing windowed state output topology" in {
import Fixtures.Multi._

@@ -194,6 +213,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
val OutputATopic = "outputA"
val OutputBTopic = "outputB"
val StoreName = "store"
val Store2Name = "store2"

def topology1Output(builder: KStreamBuilder) = {
val streamA = builder.stream(strings, ints, InputATopic)
@@ -230,6 +250,29 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints)
.to(strings, ints, OutputBTopic)
}

def topologyTables(builder: KStreamBuilder) = {
val streamA = builder.stream(strings, ints, InputATopic)
val streamB = builder.stream(strings, ints, InputBTopic)

val tableA = streamA.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator,
ints,
StoreName)

val tableB = streamB.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator,
ints,
Store2Name)

val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner)

resultTable
.toStream
.to(strings, ints, OutputATopic)
}
}

}