Skip to content

Commit

Permalink
Merge pull request #57 from jpzk/release/3.4
Browse files Browse the repository at this point in the history
Release/3.4
  • Loading branch information
jpzk authored Aug 16, 2019
2 parents 929849f + 3292f19 commit a3e1a75
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 98 deletions.
1 change: 1 addition & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
version = "2.0.0-RC4"
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
language: scala
scala:
- 2.11.12
- 2.12.8

jdk:
- oraclejdk8
- openjdk11

script:
- sbt clean coverage test coverageReport
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## Mocked Streams 3.4

* Added support for Apache 2.3.0
* Dropped support for Scala 2.11

## Mocked Streams 3.3

* Added support for Apache Kafka 2.2.0
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@
[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/8abac3d072e54fa3a13dc3da04754c7b)](https://www.codacy.com/app/jpzk/mockedstreams?utm_source=github.com&utm_medium=referral&utm_content=jpzk/mockedstreams&utm_campaign=Badge_Grade)
[![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)

Mocked Streams 3.3.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
Mocked Streams 3.4.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.12.X which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.3.0" % "test"
libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.4.0" % "test"

Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/mockafka)

## Apache Kafka Compatibility

| Mocked Streams Version | Apache Kafka Version |
|------------- |-------------|
| 3.4.0 | 2.3.0.0 |
| 3.3.0 | 2.2.0.0 |
| 3.2.0 | 2.1.1.0 |
| 3.1.0 | 2.1.0.0 |
| 2.2.0 | 2.1.0.0 |
| 2.1.0 | 2.0.0.0 |
2.0.0 | 2.0.0.0 |
| 2.0.0 | 2.0.0.0 |
| 1.8.0 | 1.1.1.0 |
| 1.7.0 | 1.1.0.0 |
| 1.6.0 | 1.0.1.0 |
Expand All @@ -28,7 +29,7 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc
| 1.2.1 | 0.10.2.1 |
| 1.2.0 | 0.10.2.0 |
| 1.1.0 | 0.10.1.1 |
| 1.0.0 | 0.10.1.0 |
| 1.0.0 | 0.10.1.0 |

## Simple Example

Expand Down
31 changes: 21 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@

lazy val commonSettings = Seq(
organization := "com.madewithtea",
version := "3.3.0",
version := "3.4.0",
scalaVersion := "2.12.8",
crossScalaVersions := Seq("2.12.8", "2.11.12"),
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
description := "Topology Unit-Testing Library for Kafka Streams",
organizationHomepage := Some(url("https://www.madewithtea.com")),
scalacOptions := Seq("-Xexperimental"))

val scalaTestVersion = "3.0.5"
val rocksDBVersion = "5.17.2"
val kafkaVersion = "2.2.0"
val scalaTestVersion = "3.0.8"
val rocksDBVersion = "5.18.3"
val kafkaVersion = "2.3.0"

lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
Expand Down Expand Up @@ -56,10 +55,6 @@ pomExtra :=
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<url>git@github.com:jpzk/mockedstreams.git</url>
<connection>scm:git:git@github.com:jpzk/mockedstreams.git</connection>
</scm>
<developers>
<developer>
<id>jpzk</id>
Expand All @@ -69,3 +64,19 @@ pomExtra :=
</developers>


micrositeName := "Mocked Streams"
micrositeDescription := "Unit-Testing Topologies in Kafka Streams"
micrositeUrl := "http://mockedstreams.madewithtea.com"
micrositeGithubOwner := "jpzk"
micrositeGithubRepo := "mockedstreams"
micrositeAuthor := "Jendrik Poloczek"
micrositeTwitter := "@madewithtea"
micrositeTwitterCreator := "@madewithtea"
micrositeCompilingDocsTool := WithMdoc

lazy val docs = project // new documentation project
.in(file("ms-docs")) // important: it must not be docs/
.dependsOn(mockedstreams)
.enablePlugins(MdocPlugin)

enablePlugins(MicrositesPlugin)
3 changes: 3 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0")
addSbtPlugin("com.47deg" % "sbt-microsites" % "0.9.2")
addSbtPlugin("org.scalameta" % "sbt-mdoc" % "1.3.1" )
150 changes: 103 additions & 47 deletions src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.madewithtea.mockedstreams

import java.time.Instant
import java.util.{Properties, UUID}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.state.ReadOnlyWindowStore
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver => Driver}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.state.ValueAndTimestamp
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.{
StreamsConfig,
Topology,
TopologyTestDriver => Driver
}

import scala.collection.JavaConverters._
import scala.collection.immutable
Expand All @@ -34,12 +37,15 @@ object MockedStreams {

def apply() = Builder()

case class Builder(topology: Option[() => Topology] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty) {
case class Builder(
topology: Option[() => Topology] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty
) {

def config(configuration: Properties): Builder = this.copy(configuration = configuration)
def config(configuration: Properties): Builder =
this.copy(configuration = configuration)

def topology(func: StreamsBuilder => Unit): Builder = {
val buildTopology = () => {
Expand All @@ -54,81 +60,126 @@ object MockedStreams {

def stores(stores: Seq[String]): Builder = this.copy(stateStores = stores)

def input[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V)]): Builder =
def input[K, V](
topic: String,
key: Serde[K],
value: Serde[V],
records: Seq[(K, V)]
): Builder =
_input(topic, key, value, Left(records))

def inputWithTime[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V, Long)]): Builder =
def inputWithTime[K, V](
topic: String,
key: Serde[K],
value: Serde[V],
records: Seq[(K, V, Long)]
): Builder =
_input(topic, key, value, Right(records))

def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): immutable.IndexedSeq[(K, V)] = {
def output[K, V](
topic: String,
key: Serde[K],
value: Serde[V],
size: Int
): immutable.IndexedSeq[(K, V)] = {
if (size <= 0) throw new ExpectedOutputIsEmpty
withProcessedDriver { driver =>
(0 until size).flatMap { _ =>
Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match {
case Some(record) => Some((record.key, record.value))
case None => None
case None => None
}
}
}
}

def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
def outputTable[K, V](
topic: String,
key: Serde[K],
value: Serde[V],
size: Int
): Map[K, V] =
output[K, V](topic, key, value, size).toMap

def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
val records = driver.getKeyValueStore(name).all()
val list = records.asScala.toList.map { record => (record.key, record.value) }
records.close()
list.toMap
def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver {
driver =>
val records = driver.getKeyValueStore(name).all()
val list = records.asScala.toList.map { record =>
(record.key, record.value)
}
records.close()
list.toMap
}

/**
* @throws IllegalArgumentException if duration is negative or can't be represented as long milliseconds
*/
def windowStateTable[K, V](name: String,
key: K,
timeFrom: Long = 0,
timeTo: Long = Long.MaxValue): Map[java.lang.Long, V] = {
windowStateTable[K, V](name, key, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo))
def windowStateTable[K, V](
name: String,
key: K,
timeFrom: Long = 0,
timeTo: Long = Long.MaxValue
): Map[java.lang.Long, ValueAndTimestamp[V]] = {
windowStateTable[K, V](
name,
key,
Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo)
)
}

/**
* @throws IllegalArgumentException if duration is negative or can't be represented as long milliseconds
*/
def windowStateTable[K, V](name: String,
key: K,
timeFrom: Instant,
timeTo: Instant): Map[java.lang.Long, V] = withProcessedDriver { driver =>
val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]]
val records = store.fetch(key, timeFrom, timeTo)
val list = records.asScala.toList.map { record => (record.key, record.value) }
records.close()
list.toMap
}
def windowStateTable[K, V](
name: String,
key: K,
timeFrom: Instant,
timeTo: Instant
): Map[java.lang.Long, ValueAndTimestamp[V]] =
withProcessedDriver { driver =>
val store = driver.getTimestampedWindowStore[K, V](name)
val records = store.fetch(key, timeFrom, timeTo)
val list = records.asScala.toList.map { record =>
(record.key, record.value)
}
records.close()
list.toMap
}

private def _input[K, V](topic: String, key: Serde[K], value: Serde[V],
records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = {
private def _input[K, V](
topic: String,
key: Serde[K],
value: Serde[V],
records: Either[Seq[(K, V)], Seq[(K, V, Long)]]
) = {
val keySer = key.serializer
val valSer = value.serializer
val factory = new ConsumerRecordFactory[K, V](keySer, valSer)

val updatedRecords = records match {
case Left(withoutTime) => withoutTime.foldLeft(inputs) {
case (events, (k, v)) => events :+ factory.create(topic, k, v)
}
case Right(withTime) => withTime.foldLeft(inputs) {
case (events, (k, v, timestamp)) => events :+ factory.create(topic, k, v, timestamp)
}
case Left(withoutTime) =>
withoutTime.foldLeft(inputs) {
case (events, (k, v)) => events :+ factory.create(topic, k, v)
}
case Right(withTime) =>
withTime.foldLeft(inputs) {
case (events, (k, v, timestamp)) =>
events :+ factory.create(topic, k, v, timestamp)
}
}
this.copy(inputs = updatedRecords)
}

// state store is temporarily created in ProcessorTopologyTestDriver
private def stream = {
val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, s"mocked-${UUID.randomUUID().toString}")
props.put(
StreamsConfig.APPLICATION_ID_CONFIG,
s"mocked-${UUID.randomUUID().toString}"
)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.putAll(configuration)
configuration.asScala.foreach { case (k, v) => props.put(k, v) }
new Driver(topology.getOrElse(throw new NoTopologySpecified)(), props)
}

Expand All @@ -146,10 +197,15 @@ object MockedStreams {
}
}

class NoTopologySpecified extends Exception("No topology specified. Call topology() on builder.")
class NoTopologySpecified
extends Exception("No topology specified. Call topology() on builder.")

class NoInputSpecified extends Exception("No input fixtures specified. Call input() method on builder.")
class NoInputSpecified
extends Exception(
"No input fixtures specified. Call input() method on builder."
)

class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.")
class ExpectedOutputIsEmpty
extends Exception("Output size needs to be greater than 0.")

}
Loading

0 comments on commit a3e1a75

Please sign in to comment.