diff --git a/.travis.yml b/.travis.yml index c984b67..24eb9d0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,4 +2,11 @@ language: scala scala: - 2.11.8 +jdk: + - oraclejdk8 +script: + - sbt clean coverage test coverageReport + +after_success: + - bash <(curl -s https://codecov.io/bash) -t "4b854b7d-2c0e-42b8-80dd-4d2bd4a60535" diff --git a/CHANGELOG.md b/CHANGELOG.md index b3215bb..2cb3da4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## Mocked Streams 1.2.1 + +* Build against Apache Kafka 0.10.2.1 +* Added calling of clean up method after driver run +* Updated ScalaTest version to 3.0.2 +* Updated Scala Versions +* Added CodeCov and SCoverage coverage report + ## Mocked Streams 1.2.0 * Build against Apache Kafka 0.10.2 diff --git a/README.md b/README.md index c534f85..e0297c5 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,17 @@ # Mocked Streams -[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) +[![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 1.2.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 1.2.1 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.1" % "test" ## Apache Kafka Compatibility | Mocked Streams Version | Apache Kafka Version | | ------------- |-------------| +| 1.2.1 | 0.10.2.1 | | 1.2.0 | 0.10.2.0 | | 1.1.0 | 0.10.1.1 | | 1.0.0 | 0.10.1.0 | diff --git a/build.sbt b/build.sbt index 01452bb..ee831d8 100644 --- a/build.sbt +++ b/build.sbt @@ -1,16 +1,16 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "1.2.0", - scalaVersion := "2.11.8", - crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"), + version := "1.2.1", + scalaVersion := "2.11.11", + crossScalaVersions := Seq("2.12.2","2.11.11"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) -val scalaTestVersion = "3.0.1" +val scalaTestVersion = "3.0.2" val rocksDBVersion = "5.0.1" -val kafkaVersion = "0.10.2.0" +val kafkaVersion = "0.10.2.1" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, diff --git a/project/plugins.sbt b/project/plugins.sbt index 26fd978..92096cd 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1 +1,2 @@ +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index f119c85..4b5a6f4 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serde import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.kstream.KStreamBuilder import org.apache.kafka.streams.state.ReadOnlyWindowStore -import org.apache.kafka.test.ProcessorTopologyTestDriver +import org.apache.kafka.test.{ProcessorTopologyTestDriver => Driver} import scala.collection.JavaConverters._ @@ -50,12 +50,9 @@ object MockedStreams { this.copy(inputs = inputs + (topic -> Input(in))) } - def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = withValidInput { - if (size <= 0) throw new ExpectedOutputIsEmpty - - val driver = stream - produce(driver) - + def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { + if (size <= 0) throw new ExpectedOutputIsEmpty + withProcessedDriver { driver => (0 until size).flatMap { i => Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { case Some(record) => Some((record.key, record.value)) @@ -63,28 +60,20 @@ object MockedStreams { } } } - - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = { - output[K, V](topic, key, value, size).toMap } - def stateTable(name: String): Map[Nothing, Nothing] = withValidInput { - val driver = stream - produce(driver) + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + output[K, V](topic, key, value, size).toMap - val store = driver.getKeyValueStore(name) - val records = store.all() + def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => + val records = driver.getKeyValueStore(name).all() val list = records.asScala.toList.map { record => (record.key, record.value) } records.close() list.toMap } def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, - timeTo: Long = Long.MaxValue) = withValidInput { - - val driver = stream - produce(driver) - + timeTo: Long = Long.MaxValue) = withProcessedDriver { driver => val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] val records = store.fetch(key, timeFrom, timeTo) val list = records.asScala.toList.map { record => (record.key, record.value) } @@ -106,10 +95,10 @@ object MockedStreams { case _ => throw new NoTopologySpecified } - new ProcessorTopologyTestDriver(new StreamsConfig(props), builder, stateStores: _*) + new Driver(new StreamsConfig(props), builder, stateStores: _*) } - private def produce(driver: ProcessorTopologyTestDriver) = { + private def produce(driver: Driver) = { inputs.foreach { case (topic, input) => input.seq.foreach { case (key, value) => driver.process(topic, key, value) @@ -117,12 +106,16 @@ object MockedStreams { } } - private def withValidInput[T](f: => T): T = { - if (inputs.isEmpty) + private def withProcessedDriver[T](f: Driver => T): T = { + if(inputs.isEmpty) throw new NoInputSpecified - f - } + val driver = stream + produce(driver) + val result: T = f(driver) + driver.close + result + } } class NoTopologySpecified extends Exception("No topology specified. Call topology() on builder.")