Skip to content

Commit

Permalink
Merge branch 'master' of github.com:jpzk/mockedstreams
Browse files Browse the repository at this point in the history
  • Loading branch information
jpzk committed Jan 15, 2019
2 parents b9014bc + 495f4eb commit 077a3ec
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 107 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## Mocked Streams 3.1

* Added support for Scala DSL in topology method
* Java DSL is deprecated in topology method
* Thanks Michal Dziemianko for migrating to Scala DSL

## Mocked Streams 2.2

* Added compatibility with Apache Kafka 2.1
Expand Down
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@
[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/8abac3d072e54fa3a13dc3da04754c7b)](https://www.codacy.com/app/jpzk/mockedstreams?utm_source=github.com&utm_medium=referral&utm_content=jpzk/mockedstreams&utm_campaign=Badge_Grade)
[![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)

Mocked Streams 3.1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

Mocked Streams 2.2.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.2.0" % "test"
libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.1.0" % "test"

Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/mockafka)

## Apache Kafka Compatibility

| Mocked Streams Version | Apache Kafka Version |
|------------- |-------------|
| 3.1.0 | 2.1.0.0 |
| 2.2.0 | 2.1.0.0 |
| 2.1.0 | 2.0.0.0 |
2.0.0 | 2.0.0.0 |
Expand All @@ -39,7 +39,7 @@ It wraps the [org.apache.kafka.streams.TopologyTestDriver](https://github.com/ap
val strings = Serdes.String()

MockedStreams()
.topology { builder => builder.stream(...) [...] }
.topology { builder => builder.stream(...) [...] } // Scala DSL
.input("topic-in", strings, strings, input)
.output("topic-out", strings, strings, exp.size) shouldEqual exp

Expand All @@ -50,7 +50,7 @@ It also allows you to have multiple input and output streams. If your topology u
import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
.topology { builder => builder.stream(...) [...] }
.topology { builder => builder.stream(...) [...] } // Scala DSL
.input("in-a", strings, ints, inputA)
.input("in-b", strings, ints, inputB)
.stores(Seq("store-name"))
Expand All @@ -73,7 +73,7 @@ In the example below, 2 records are first submitted to topic A, then 3 to topic
val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
.topology(topologyTables)
.topology(topologyTables) // Scala DSL
.input(InputATopic, strings, ints, firstInputForTopicA)
.input(InputBTopic, strings, ints, firstInputForTopicB)
.input(InputATopic, strings, ints, secondInputForTopicA)
Expand All @@ -85,7 +85,7 @@ When you define your state stores via .stores(stores: Seq[String]) since 1.2, yo
import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
.topology { builder => builder.stream(...) [...] }
.topology { builder => builder.stream(...) [...] } // Scala DSL
.input("in-a", strings, ints, inputA)
.input("in-b", strings, ints, inputB)
.stores(Seq("store-name"))
Expand All @@ -103,7 +103,7 @@ When you define your state stores via .stores(stores: Seq[String]) since 1.2 and
classOf[TimestampExtractors.CustomTimestampExtractor].getName)

val mstreams = MockedStreams()
.topology { builder => builder.stream(...) [...] }
.topology { builder => builder.stream(...) [...] } // Scala DSL
.input("in-a", strings, ints, inputA)
.stores(Seq("store-name"))
.config(props)
Expand Down Expand Up @@ -137,7 +137,7 @@ Sometimes you need to pass a custom configuration to Kafka Streams:
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)

val mstreams = MockedStreams()
.topology { builder => builder.stream(...) [...] }
.topology { builder => builder.stream(...) [...] } // Scala DSL
.config(props)
.input("in-a", strings, ints, inputA)
.input("in-b", strings, ints, inputB)
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

lazy val commonSettings = Seq(
organization := "com.madewithtea",
version := "2.2.0",
version := "3.1.0",
scalaVersion := "2.12.7",
crossScalaVersions := Seq("2.12.7", "2.11.12"),
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.state.ReadOnlyWindowStore
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology, TopologyTestDriver => Driver}
import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver => Driver}
import org.apache.kafka.streams.scala.StreamsBuilder

import scala.collection.JavaConverters._
import scala.collection.immutable
Expand Down
172 changes: 76 additions & 96 deletions src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

package com.madewithtea.mockedstreams

import java.lang
import java.time.{Duration, Instant}
import java.time.Instant

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.{Materialized, TimeWindows}
import org.apache.kafka.streams.processor.TimestampExtractor
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes.{Integer => intSerde, String => stringSerde}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable
import org.scalatest.{FlatSpec, Matchers}

class MockedStreamsSpec extends FlatSpec with Matchers {
Expand Down Expand Up @@ -67,7 +70,8 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
t.windowStateTable("window-state-table", 0)

an[NoInputSpecified] should be thrownBy
t.windowStateTable("window-state-table", 0, Instant.ofEpochMilli(Long.MinValue), Instant.ofEpochMilli(Long.MaxValue))
t.windowStateTable("window-state-table", 0,
Instant.ofEpochMilli(Long.MinValue), Instant.ofEpochMilli(Long.MaxValue))
}

it should "assert correctly when processing strings to uppercase" in {
Expand Down Expand Up @@ -126,9 +130,9 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
it should "assert correctly when joining events sent to 2 Ktables in a specific order" in {
import Fixtures.Multi._

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))
val firstInputForTopicA = Seq(("x", 1), ("y", 2))
val firstInputForTopicB = Seq(("x", 4), ("y", 3), ("y", 5))
val secondInputForTopicA = Seq(("y", 4))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

Expand Down Expand Up @@ -163,10 +167,12 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
builder.windowStateTable(StoreName, "y")
.shouldEqual(expectedCy.toMap)

builder.windowStateTable(StoreName, "x", Instant.ofEpochMilli(Long.MinValue), Instant.ofEpochMilli(Long.MaxValue))
builder.windowStateTable(StoreName, "x", Instant.ofEpochMilli(Long.MinValue),
Instant.ofEpochMilli(Long.MaxValue))
.shouldEqual(expectedCx.toMap)

builder.windowStateTable(StoreName, "y", Instant.ofEpochMilli(Long.MinValue), Instant.ofEpochMilli(Long.MaxValue))
builder.windowStateTable(StoreName, "y", Instant.ofEpochMilli(Long.MinValue),
Instant.ofEpochMilli(Long.MaxValue))
.shouldEqual(expectedCy.toMap)
}

Expand All @@ -187,7 +193,6 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
output shouldEqual expected
}


it should "accept consumer records with custom timestamps" in {

import Fixtures.Multi._
Expand All @@ -200,63 +205,51 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
builder.windowStateTable(StoreName, "x")
.shouldEqual(expectedCWithTimeStamps.toMap)

builder.windowStateTable(StoreName, "x", Instant.ofEpochMilli(Long.MinValue), Instant.ofEpochMilli(Long.MaxValue))
builder.windowStateTable(StoreName, "x", Instant.ofEpochMilli(Long.MinValue),
Instant.ofEpochMilli(Long.MaxValue))
.shouldEqual(expectedCWithTimeStamps.toMap)
}

class LastInitializer extends Initializer[Integer] {
override def apply() = 0
}

class LastAggregator extends Aggregator[String, Integer, Integer] {
override def apply(k: String, v: Integer, t: Integer): Integer = v
}

class AddJoiner extends ValueJoiner[Integer, Integer, Integer] {
override def apply(v1: Integer, v2: Integer): Integer = v1 + v2
}
object Fixtures {
object Operations {
val lastAggregator = (_: String, v: Int, _: Int) => v

class SubJoiner extends ValueJoiner[Integer, Integer, Integer] {
override def apply(v1: Integer, v2: Integer): Integer = v1 - v2
}
val addJoiner = (v1: Int, v2: Int) => v1 + v2

object Fixtures {
val subJoiner = (v1: Int, v2: Int) => v1 - v2
}

object Uppercase {
val input = Seq(("x", "v1"), ("y", "v2"))
val expected = Seq(("x", "V1"), ("y", "V2"))

val strings: Serde[String] = Serdes.String()
val serdes: Consumed[String, String] = Consumed.`with`(strings, strings)
val strings: Serde[String] = stringSerde

val InputTopic = "input"
val OutputTopic = "output"

def topology(builder: StreamsBuilder): Unit = {
builder.stream(InputTopic, serdes)
.map[String, String]((k, v) => new KeyValue(k, v.toUpperCase))
.to(OutputTopic, Produced.`with`(strings, strings))
def topology(builder: StreamsBuilder) = {
builder.stream[String, String](InputTopic)
.map((k, v) => (k, v.toUpperCase))
.to(OutputTopic)
}
}

object Multi {

def int(i: Int): Integer = Integer.valueOf(i)

val inputA = Seq(("x", int(1)), ("y", int(2)))
val inputB = Seq(("x", int(4)), ("y", int(3)))
val inputC = Seq(("x", int(1)), ("x", int(1)), ("x", int(2)), ("y", int(1)))
val inputA = Seq(("x", 1), ("y", 2))
val inputB = Seq(("x", 4), ("y", 3))
val inputC = Seq(("x", 1), ("x", 1), ("x", 2), ("y", 1))

val inputCWithTimeStamps = Seq(
("x", int(1), 1000L),
("x", int(1), 1000L),
("x", int(1), 1001L),
("x", int(1), 1001L),
("x", int(1), 1002L)
("x", 1, 1000L),
("x", 1, 1000L),
("x", 1, 1001L),
("x", 1, 1001L),
("x", 1, 1002L)
)

val expectedA = Seq(("x", int(5)), ("y", int(5)))
val expectedB = Seq(("x", int(3)), ("y", int(1)))
val expectedA = Seq(("x", 5), ("y", 5))
val expectedB = Seq(("x", 3), ("y", 1))

val expectedCx = Seq((1, 2), (2, 1))
val expectedCy = Seq((1, 1))
Expand All @@ -267,9 +260,8 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
1002 -> 1
)

val strings: Serde[String] = Serdes.String()
val ints: Serde[Integer] = Serdes.Integer()
val serdes: Consumed[String, Integer] = Consumed.`with`(strings, ints)
val strings: Serde[String] = stringSerde
val ints: Serde[Int] = intSerde

val InputATopic = "inputA"
val InputBTopic = "inputB"
Expand All @@ -279,72 +271,60 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
val StoreName = "store"
val Store2Name = "store2"

def topology1Output(builder: StreamsBuilder): Unit = {
val streamA = builder.stream(InputATopic, serdes)
val streamB = builder.stream(InputBTopic, serdes)
def topology1Output(builder: StreamsBuilder) = {
val streamA = builder.stream[String, Int](InputATopic)
val streamB = builder.stream[String, Int](InputBTopic)

val table = streamA.groupByKey(Grouped.`with`(strings, ints))
.aggregate(
new LastInitializer,
new LastAggregator,
Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints)
)
val table = streamA.groupByKey
.aggregate[Int](0)(Operations.lastAggregator)(
Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints)
)

streamB.leftJoin[Integer, Integer](table, new AddJoiner(), Joined.`with`(strings, ints, ints))
.to(OutputATopic, Produced.`with`(strings, ints))
streamB.leftJoin[Int, Int](table)(Operations.addJoiner)
.to(OutputATopic)
}

def topology1WindowOutput(builder: StreamsBuilder): KTable[Windowed[String], lang.Long] = {
val streamA = builder.stream(InputCTopic, serdes)
streamA.groupByKey(Grouped.`with`(strings, ints))
.windowedBy(TimeWindows.of(Duration.ofMillis(1)))
.count(Materialized.as(StoreName))
def topology1WindowOutput(builder: StreamsBuilder) = {
val streamA = builder.stream[String, Int](InputCTopic)
streamA.groupByKey
.windowedBy(TimeWindows.of(1))
.count()(Materialized.as(StoreName))
}

def topology2Output(builder: StreamsBuilder): Unit = {
val streamA = builder.stream(InputATopic, serdes)
val streamB = builder.stream(InputBTopic, serdes)
def topology2Output(builder: StreamsBuilder) = {
val streamA = builder.stream[String, Int](InputATopic)
val streamB = builder.stream[String, Int](InputBTopic)

val table = streamA.groupByKey(Grouped.`with`(strings, ints)).aggregate(
new LastInitializer,
new LastAggregator,
Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints))
val table = streamA.groupByKey
.aggregate(0)(Operations.lastAggregator)(
Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints)
)

streamB.leftJoin[Integer, Integer](table, new AddJoiner(), Joined.`with`(strings, ints, ints))
.to(OutputATopic, Produced.`with`(strings, ints))
streamB.join(table)(Operations.addJoiner)
.to(OutputATopic)

streamB.leftJoin[Integer, Integer](table, new SubJoiner(), Joined.`with`(strings, ints, ints))
.to(OutputBTopic, Produced.`with`(strings, ints))
streamB.leftJoin(table)(Operations.subJoiner)
.to(OutputBTopic)
}

def topologyTables(builder: StreamsBuilder): Unit = {
val streamA = builder.stream(InputATopic, serdes)
val streamB = builder.stream(InputBTopic, serdes)
def topologyTables(builder: StreamsBuilder) = {
val streamA = builder.stream[String, Int](InputATopic)
val streamB = builder.stream[String, Int](InputBTopic)

val tableA = streamA.groupByKey(Grouped.`with`(strings, ints))
.aggregate(
new LastInitializer,
new LastAggregator,
Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints)
)
val tableA: KTable[String, Int] = streamA.groupByKey
.aggregate[Int](0)(Operations.lastAggregator)

val tableB = streamB.groupByKey(Grouped.`with`(strings, ints))
.aggregate(
new LastInitializer,
new LastAggregator,
Materialized.as(Store2Name).withKeySerde(strings).withValueSerde(ints)
)
val tableB: KTable[String, Int] = streamB.groupByKey
.aggregate[Int](0)(Operations.lastAggregator)

val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner)
val resultTable: KTable[String, Int] = tableA.join[Int, Int](tableB)(Operations.addJoiner)

resultTable
.toStream
.to(OutputATopic, Produced.`with`(strings, ints))
.to(OutputATopic)
}
}

}

}

object TimestampExtractors {
Expand Down

0 comments on commit 077a3ec

Please sign in to comment.