From fb49621e4987e80f4dda8a7347730c4492bea7f6 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Mon, 22 May 2017 10:59:14 +0200 Subject: [PATCH 1/7] sbt: changed kafka version and mocked streams version --- build.sbt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 01452bb..488ebf0 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "1.2.0", + version := "1.2.1", scalaVersion := "2.11.8", crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", @@ -10,7 +10,7 @@ lazy val commonSettings = Seq( val scalaTestVersion = "3.0.1" val rocksDBVersion = "5.0.1" -val kafkaVersion = "0.10.2.0" +val kafkaVersion = "0.10.2.1" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, From d4e6a665bc535c172a463c36cfaa4a6074b4cf44 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Tue, 23 May 2017 08:52:56 +0200 Subject: [PATCH 2/7] added closing of driver after processing --- CHANGELOG.md | 5 ++++ src/main/scala/MockedStreams.scala | 45 +++++++++++++----------------- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b3215bb..6a8d811 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## Mocked Streams 1.2.1 + +* Build against Apache Kafka 0.10.2.1 +* Added calling of clean up method after driver run + ## Mocked Streams 1.2.0 * Build against Apache Kafka 0.10.2 diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index f119c85..4b5a6f4 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serde import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.kstream.KStreamBuilder import org.apache.kafka.streams.state.ReadOnlyWindowStore -import org.apache.kafka.test.ProcessorTopologyTestDriver +import org.apache.kafka.test.{ProcessorTopologyTestDriver => Driver} import scala.collection.JavaConverters._ @@ -50,12 +50,9 @@ object MockedStreams { this.copy(inputs = inputs + (topic -> Input(in))) } - def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = withValidInput { - if (size <= 0) throw new ExpectedOutputIsEmpty - - val driver = stream - produce(driver) - + def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { + if (size <= 0) throw new ExpectedOutputIsEmpty + withProcessedDriver { driver => (0 until size).flatMap { i => Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { case Some(record) => Some((record.key, record.value)) @@ -63,28 +60,20 @@ object MockedStreams { } } } - - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = { - output[K, V](topic, key, value, size).toMap } - def stateTable(name: String): Map[Nothing, Nothing] = withValidInput { - val driver = stream - produce(driver) + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + output[K, V](topic, key, value, size).toMap - val store = driver.getKeyValueStore(name) - val records = store.all() + def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => + val records = driver.getKeyValueStore(name).all() val list = records.asScala.toList.map { record => (record.key, record.value) } records.close() list.toMap } def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, - timeTo: Long = Long.MaxValue) = withValidInput { - - val driver = stream - produce(driver) - + timeTo: Long = Long.MaxValue) = withProcessedDriver { driver => val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] val records = store.fetch(key, timeFrom, timeTo) val list = records.asScala.toList.map { record => (record.key, record.value) } @@ -106,10 +95,10 @@ object MockedStreams { case _ => throw new NoTopologySpecified } - new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStores: _*) + new Driver(new StreamsConfig(props), builder, stateStores: _*) } - private def produce(driver: ProcessorTopologyTestDriver) = { + private def produce(driver: Driver) = { inputs.foreach { case (topic, input) => input.seq.foreach { case (key, value) => driver.process(topic, key, value) @@ -117,12 +106,16 @@ object MockedStreams { } } - private def withValidInput[T](f: => T): T = { - if (inputs.isEmpty) + private def withProcessedDriver[T](f: Driver => T): T = { + if(inputs.isEmpty) throw new NoInputSpecified - f - } + val driver = stream + produce(driver) + val result: T = f(driver) + driver.close + result + } } class NoTopologySpecified extends Exception("No topology specified. Call topology() on builder.") From eb62ff9ffbb8baefac9b372940e56ac0c4de4f37 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Tue, 23 May 2017 14:12:45 +0200 Subject: [PATCH 3/7] sbt: added scoverage and codecov --- .travis.yml | 4 ++++ README.md | 5 +++-- project/plugins.sbt | 1 + 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index c984b67..87f1aab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,4 +2,8 @@ language: scala scala: - 2.11.8 +script: + - sbt clean coverage test coverageReport +after_success: + - bash <(curl -s https://codecov.io/bash) -t "4b854b7d-2c0e-42b8-80dd-4d2bd4a60535" diff --git a/README.md b/README.md index c534f85..d1de627 100644 --- a/README.md +++ b/README.md @@ -3,14 +3,15 @@ [![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 1.2.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 1.2.1 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.1" % "test" ## Apache Kafka Compatibility | Mocked Streams Version | Apache Kafka Version | | ------------- |-------------| +| 1.2.1 | 0.10.2.1 | | 1.2.0 | 0.10.2.0 | | 1.1.0 | 0.10.1.1 | | 1.0.0 | 0.10.1.0 | diff --git a/project/plugins.sbt b/project/plugins.sbt index 26fd978..6d1194a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1 +1,2 @@ +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") From 0a0111fc11a2196e97e442e90582ed89298cd839 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Tue, 23 May 2017 14:32:54 +0200 Subject: [PATCH 4/7] Updated README.md; added coverage badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d1de627..e0297c5 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Mocked Streams -[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) +[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) Mocked Streams 1.2.1 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): From f4c8748e545aefb42128033930773d55684523b4 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Wed, 24 May 2017 09:47:24 +0200 Subject: [PATCH 5/7] sbt: updated ScalaTest dependency; improved changelog --- CHANGELOG.md | 2 ++ build.sbt | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a8d811..9423479 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ * Build against Apache Kafka 0.10.2.1 * Added calling of clean up method after driver run +* Updated ScalaTest version to 3.0.2 +* Added CodeCov and SCoverage coverage report ## Mocked Streams 1.2.0 diff --git a/build.sbt b/build.sbt index 488ebf0..9936a83 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ lazy val commonSettings = Seq( organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) -val scalaTestVersion = "3.0.1" +val scalaTestVersion = "3.0.2" val rocksDBVersion = "5.0.1" val kafkaVersion = "0.10.2.1" From c0b6cce1fd62a6dfa2aa29c5ae54b739fa0194ef Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Wed, 24 May 2017 10:07:45 +0200 Subject: [PATCH 6/7] sbt: updated scoverage to 1.5.0 to be compatible with 2.12 --- CHANGELOG.md | 1 + build.sbt | 4 ++-- project/plugins.sbt | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9423479..2cb3da4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * Build against Apache Kafka 0.10.2.1 * Added calling of clean up method after driver run * Updated ScalaTest version to 3.0.2 +* Updated Scala Versions * Added CodeCov and SCoverage coverage report ## Mocked Streams 1.2.0 diff --git a/build.sbt b/build.sbt index 9936a83..ee831d8 100644 --- a/build.sbt +++ b/build.sbt @@ -2,8 +2,8 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", version := "1.2.1", - scalaVersion := "2.11.8", - crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"), + scalaVersion := "2.11.11", + crossScalaVersions := Seq("2.12.2","2.11.11"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) diff --git a/project/plugins.sbt b/project/plugins.sbt index 6d1194a..92096cd 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,2 @@ -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") From fb416920c62a08524d3de33befd06b2b82d77fbd Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Wed, 24 May 2017 10:20:47 +0200 Subject: [PATCH 7/7] travis: set JDK to 8, compat. issue with ScalaTest 3.0.2 and JDK 7 --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.travis.yml b/.travis.yml index 87f1aab..24eb9d0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,6 +2,9 @@ language: scala scala: - 2.11.8 +jdk: + - oraclejdk8 + script: - sbt clean coverage test coverageReport