Skip to content

Commit

Permalink
Merge pull request #14 from jpzk/release/1.2.1
Browse files Browse the repository at this point in the history
Release/1.2.1
  • Loading branch information
jpzk authored May 24, 2017
2 parents 9d61458 + fb41692 commit ebb2df7
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 34 deletions.
7 changes: 7 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,11 @@ language: scala
scala:
- 2.11.8

jdk:
- oraclejdk8

script:
- sbt clean coverage test coverageReport

after_success:
- bash <(curl -s https://codecov.io/bash) -t "4b854b7d-2c0e-42b8-80dd-4d2bd4a60535"
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## Mocked Streams 1.2.1

* Build against Apache Kafka 0.10.2.1
* Added calling of clean up method after driver run
* Updated ScalaTest version to 3.0.2
* Updated Scala Versions
* Added CodeCov and SCoverage coverage report

## Mocked Streams 1.2.0

* Build against Apache Kafka 0.10.2
Expand Down
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# Mocked Streams

[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)
[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)


Mocked Streams 1.2.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
Mocked Streams 1.2.1 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.0" % "test"
libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.1" % "test"

## Apache Kafka Compatibility

| Mocked Streams Version | Apache Kafka Version |
| ------------- |-------------|
| 1.2.1 | 0.10.2.1 |
| 1.2.0 | 0.10.2.0 |
| 1.1.0 | 0.10.1.1 |
| 1.0.0 | 0.10.1.0 |
Expand Down
10 changes: 5 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@

lazy val commonSettings = Seq(
organization := "com.madewithtea",
version := "1.2.0",
scalaVersion := "2.11.8",
crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"),
version := "1.2.1",
scalaVersion := "2.11.11",
crossScalaVersions := Seq("2.12.2","2.11.11"),
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
organizationHomepage := Some(url("https://www.madewithtea.com")),
scalacOptions := Seq("-Xexperimental"))

val scalaTestVersion = "3.0.1"
val scalaTestVersion = "3.0.2"
val rocksDBVersion = "5.0.1"
val kafkaVersion = "0.10.2.0"
val kafkaVersion = "0.10.2.1"

lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
Expand Down
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
45 changes: 19 additions & 26 deletions src/main/scala/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.state.ReadOnlyWindowStore
import org.apache.kafka.test.ProcessorTopologyTestDriver
import org.apache.kafka.test.{ProcessorTopologyTestDriver => Driver}

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -50,41 +50,30 @@ object MockedStreams {
this.copy(inputs = inputs + (topic -> Input(in)))
}

def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = withValidInput {
if (size <= 0) throw new ExpectedOutputIsEmpty

val driver = stream
produce(driver)

def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
if (size <= 0) throw new ExpectedOutputIsEmpty
withProcessedDriver { driver =>
(0 until size).flatMap { i =>
Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match {
case Some(record) => Some((record.key, record.value))
case None => None
}
}
}

def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = {
output[K, V](topic, key, value, size).toMap
}

def stateTable(name: String): Map[Nothing, Nothing] = withValidInput {
val driver = stream
produce(driver)
def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
output[K, V](topic, key, value, size).toMap

val store = driver.getKeyValueStore(name)
val records = store.all()
def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
val records = driver.getKeyValueStore(name).all()
val list = records.asScala.toList.map { record => (record.key, record.value) }
records.close()
list.toMap
}

def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0,
timeTo: Long = Long.MaxValue) = withValidInput {

val driver = stream
produce(driver)

timeTo: Long = Long.MaxValue) = withProcessedDriver { driver =>
val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]]
val records = store.fetch(key, timeFrom, timeTo)
val list = records.asScala.toList.map { record => (record.key, record.value) }
Expand All @@ -106,23 +95,27 @@ object MockedStreams {
case _ => throw new NoTopologySpecified
}

new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStores: _*)
new Driver(new StreamsConfig(props), builder, stateStores: _*)
}

private def produce(driver: ProcessorTopologyTestDriver) = {
private def produce(driver: Driver) = {
inputs.foreach { case (topic, input) =>
input.seq.foreach { case (key, value) =>
driver.process(topic, key, value)
}
}
}

private def withValidInput[T](f: => T): T = {
if (inputs.isEmpty)
private def withProcessedDriver[T](f: Driver => T): T = {
if(inputs.isEmpty)
throw new NoInputSpecified
f
}

val driver = stream
produce(driver)
val result: T = f(driver)
driver.close
result
}
}

class NoTopologySpecified extends Exception("No topology specified. Call topology() on builder.")
Expand Down

0 comments on commit ebb2df7

Please sign in to comment.