diff --git a/.gitignore b/.gitignore
index 8f9609c..6c1dd92 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@ target
 .idea
 .bloop
 .metals
+project/metals.sbt
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d6e5a87..e2da5da 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,13 @@
 # Changelog
 
+## Mocked Streams 3.6
+
+* Changed internal code to use the new TopologyTestDriver input/output methods introduced in Kafka 2.4 (https://issues.apache.org/jira/browse/KAFKA-8233, https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements)
+* Replaced calls to Kafka Streams APIs deprecated as of 2.4.0 (https://issues.apache.org/jira/browse/KAFKA-7277)
+* Added support for specifying input timestamps as Long or Instant (https://issues.apache.org/jira/browse/KAFKA-7277)
+* Removed the expected-size parameter from the new .output and .outputTable methods
+* Deprecated the output methods with a size parameter (no longer needed)
+
 ## Mocked Streams 3.5
 
 * Added support for Apache 2.4.0
diff --git a/README.md b/README.md
index b723425..f3154dc 100644
--- a/README.md
+++ b/README.md
@@ -6,9 +6,9 @@ Documentation located at http://mockedstreams.madewithtea.com/
 
-Mocked Streams 3.5.2 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.12 and 2.13 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
+Mocked Streams 3.6.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.12 and 2.13 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework, e.g. [ScalaTest](http://www.scalatest.org/) or [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is available in the Maven Central Repository, so you only need to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
 
-    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.5.2" % "test"
+    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.6.0" % "test"
 
 Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/mockafka)
 
@@ -16,6 +16,7 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc
 
 | Mocked Streams Version | Apache Kafka Version |
 |------------- |-------------|
+| 3.6.0 | 2.4.1.0 |
 | 3.5.2 | 2.4.0.0 |
 | 3.5.1 | 2.4.0.0 |
 | 3.5.0 | 2.4.0.0 |
@@ -50,7 +51,7 @@ It wraps the [org.apache.kafka.streams.TopologyTestDriver](https://github.com/ap
     MockedStreams()
       .topology { builder => builder.stream(...) [...] } // Scala DSL
       .input("topic-in", strings, strings, input)
-      .output("topic-out", strings, strings, exp.size) shouldEqual exp
+      .output("topic-out", strings, strings) shouldEqual exp
 
 ## Multiple Input / Output Example and State
 
@@ -64,8 +65,8 @@ It also allows you to have multiple input and output streams. If your topology u
       .input("in-b", strings, ints, inputB)
       .stores(Seq("store-name"))
 
-    mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
-    mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)
+    mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
+    mstreams.output("out-b", strings, ints) shouldEqual(expectedB)
 
 ## Record order and multiple emissions
 
@@ -152,8 +153,8 @@ Sometimes you need to pass a custom configuration to Kafka Streams:
       .input("in-b", strings, ints, inputB)
       .stores(Seq("store-name"))
 
-    mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
-    mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)
+    mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
+    mstreams.output("out-b", strings, ints) shouldEqual(expectedB)
 
 ## Companies using Mocked Streams
 
diff --git a/build.sbt b/build.sbt
index 058952a..3161d60 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,16 +1,16 @@
 lazy val commonSettings = Seq(
   organization := "com.madewithtea",
-  version := "3.5.2",
+  version := "3.6.0",
   scalaVersion := "2.13.1",
   crossScalaVersions := List("2.12.11", "2.13.1"),
   description := "Topology Unit-Testing Library for Kafka Streams",
   organizationHomepage := Some(url("https://www.madewithtea.com")),
-  scalacOptions := Seq("-Xexperimental")
+  scalacOptions := Seq("-deprecation", "-feature")
 )
 
 val scalaTestVersion = "3.0.8"
-val rocksDBVersion = "5.18.3"
-val kafkaVersion = "2.4.0"
+val rocksDBVersion = "5.18.4"
+val kafkaVersion = "2.4.1"
 
 lazy val kafka = Seq(
   "org.apache.kafka" % "kafka-clients" % kafkaVersion,
@@ -24,6 +24,8 @@ lazy val kafka = Seq(
 lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
 lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % rocksDBVersion % "test"
 
+Global / useGpg := false
+
 lazy val mockedstreams = (project in file("."))
   .settings(commonSettings: _*)
   .settings(
@@ -84,6 +86,7 @@ micrositeTwitterCreator := "@madewithtea"
 micrositeCompilingDocsTool := WithMdoc
 micrositeShareOnSocial := true
 
+
 lazy val docs = project // new documentation project
   .in(file("ms-docs")) // important: it must not be docs/
   .dependsOn(mockedstreams)
diff --git a/docs/docs/compatibility.md b/docs/docs/compatibility.md
index 5bf8b60..f676cfe 100644
--- a/docs/docs/compatibility.md
+++ b/docs/docs/compatibility.md
@@ -9,6 +9,7 @@ Please use the corresponding Mocked Streams version to a concrete Apache Kafka v
 
 | Mocked Streams Version | Apache Kafka Version |
 |------------- |-------------|
+| 3.6.0 | 2.4.1.0 |
 | 3.5.2 | 2.4.0.0 |
 | 3.5.1 | 2.4.0.0 |
 | 3.5.0 | 2.4.0.0 |
diff --git a/docs/docs/custom.md b/docs/docs/custom.md
index 09f8781..896c687 100644
--- a/docs/docs/custom.md
+++ b/docs/docs/custom.md
@@ -19,6 +19,6 @@ Sometimes you need to pass a custom configuration to Kafka Streams:
       .input("in-b", strings, ints, inputB)
       .stores(Seq("store-name"))
 
-    mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
-    mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)
+    mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
+    mstreams.output("out-b", strings, ints) shouldEqual(expectedB)
\ No newline at end of file
diff --git a/docs/docs/index.md b/docs/docs/index.md
index 7863ec3..4b66d0a 100644
--- a/docs/docs/index.md
+++ b/docs/docs/index.md
@@ -16,6 +16,6 @@ It wraps the [org.apache.kafka.streams.TopologyTestDriver](https://github.com/ap
 
     MockedStreams()
       .topology { builder => builder.stream(...) [...] } // Scala DSL
       .input("topic-in", strings, strings, input)
-      .output("topic-out", strings, strings, exp.size) shouldEqual exp
+      .output("topic-out", strings, strings) shouldEqual exp
diff --git a/docs/docs/multiple.md b/docs/docs/multiple.md
index 579f3dc..5288c5e 100644
--- a/docs/docs/multiple.md
+++ b/docs/docs/multiple.md
@@ -15,5 +15,5 @@ It also allows you to have multiple input and output streams. If your topology u
       .input("in-b", strings, ints, inputB)
       .stores(Seq("store-name"))
 
-    mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
-    mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)
\ No newline at end of file
+    mstreams.output("out-a", strings, ints) shouldEqual(expectedA)
+    mstreams.output("out-b", strings, ints) shouldEqual(expectedB)
\ No newline at end of file
diff --git a/docs/index.md b/docs/index.md
index 03d47d3..cb274f1 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -11,9 +11,9 @@ technologies:
 
 [![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/8abac3d072e54fa3a13dc3da04754c7b)](https://www.codacy.com/app/jpzk/mockedstreams?utm_source=github.com&utm_medium=referral&utm_content=jpzk/mockedstreams&utm_campaign=Badge_Grade) [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams)
 
-Mocked Streams 3.5.2 is a library for Scala 2.12 and Scala 2.13 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
+Mocked Streams 3.6.0 is a library for Scala 2.12 and Scala 2.13 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework, e.g. [ScalaTest](http://www.scalatest.org/) or [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is available in the Maven Central Repository, so you only need to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
 
-    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.5.2" % "test"
+    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.6.0" % "test"
 
 ## Getting Started
 
@@ -28,7 +28,7 @@ It wraps the [org.apache.kafka.streams.TopologyTestDriver](https://github.com/ap
 
     MockedStreams()
       .topology { builder => builder.stream(...) [...] } // Scala DSL
       .input("topic-in", strings, strings, input)
-      .output("topic-out", strings, strings, exp.size) shouldEqual exp
+      .output("topic-out", strings, strings) shouldEqual exp
 
 ## Apache Kafka Compatibility
 
@@ -36,6 +36,7 @@ Please use the corresponding Mocked Streams version to a concrete Apache Kafka v
 
 | Mocked Streams Version | Apache Kafka Version |
 |------------- |-------------|
+| 3.6.0 | 2.4.1.0 |
 | 3.5.2 | 2.4.0.0 |
 | 3.5.1 | 2.4.0.0 |
 | 3.5.0 | 2.4.0.0 |
@@ -45,7 +46,7 @@ Please use the corresponding Mocked Streams version to a concrete Apache Kafka v
 | 3.1.0 | 2.1.0.0 |
 | 2.2.0 | 2.1.0.0 |
 | 2.1.0 | 2.0.0.0 |
-  2.0.0 | 2.0.0.0 |
+| 2.0.0 | 2.0.0.0 |
 | 1.8.0 | 1.1.1.0 |
 | 1.7.0 | 1.1.0.0 |
 | 1.6.0 | 1.0.1.0 |
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 89fde1e..e8bd468 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,5 +1,5 @@
 addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.0")
 addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0")
 addSbtPlugin("com.47deg" % "sbt-microsites" % "1.1.5")
 addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.1.5")
diff --git a/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala b/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala
index 9fb4cda..b2705c1 100644
--- a/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala
+++ b/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala
@@ -16,13 +16,13 @@
  */
 package com.madewithtea.mockedstreams
 
 import java.time.Instant
 import java.util.{Properties, UUID}
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.streams.scala.StreamsBuilder
 import org.apache.kafka.streams.state.ValueAndTimestamp
-import org.apache.kafka.streams.test.ConsumerRecordFactory
+import org.apache.kafka.streams.{TestInputTopic, TestOutputTopic}
 import org.apache.kafka.streams.{
   StreamsConfig,
@@ -30,36 +30,66 @@ import org.apache.kafka.streams.{
   TopologyTestDriver => Driver
 }
 
-import scala.collection.JavaConverters._
-import scala.collection.immutable
+import scala.jdk.CollectionConverters._
+import scala.language.implicitConversions
 
 object MockedStreams {
 
   def apply() = Builder()
 
+  sealed trait Input
+  case class Record(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]])
+      extends Input
+
+  implicit def recordsInstant[K, V](list: Seq[(K, V, Instant)]) =
+    RecordsInstant(list)
+  implicit def recordsLong[K, V](list: Seq[(K, V, Long)]) = RecordsLong(list)
+
+  case class RecordsInstant[K, V](seq: Seq[(K, V, Instant)])
+  case class RecordsLong[K, V](seq: Seq[(K, V, Long)])
+
   case class Builder(
-      topology: Option[() => Topology] = None,
       configuration: Properties = new Properties(),
-      stateStores: Seq[String] = Seq(),
-      inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty
+      driver: Option[Driver] = None,
+      stateStores: Seq[String] = Seq()
   ) {
 
     def config(configuration: Properties): Builder =
       this.copy(configuration = configuration)
 
     def topology(func: StreamsBuilder => Unit): Builder = {
-      val buildTopology = () => {
-        val builder = new StreamsBuilder()
-        func(builder)
-        builder.build()
-      }
-      this.copy(topology = Some(buildTopology))
+      val builder = new StreamsBuilder()
+      func(builder)
+      val topology = builder.build()
+      withTopology(() => topology)
     }
 
-    def withTopology(t: () => Topology): Builder = this.copy(topology = Some(t))
+    def withTopology(t: () => Topology): Builder = {
+      val props = new Properties
+      props.put(
+        StreamsConfig.APPLICATION_ID_CONFIG,
+        s"mocked-${UUID.randomUUID().toString}"
+      )
+      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+      configuration.asScala.foreach { case (k, v) => props.put(k, v) }
+
+      this.copy(
+        driver = Some(
+          new Driver(t(), props)
+        )
+      )
+    }
+
+    def withDriver[A](f: Driver => A) = driver match {
+      case Some(d) => f(d)
+      case None    => throw new TopologyNotSet
+    }
 
     def stores(stores: Seq[String]): Builder = this.copy(stateStores = stores)
 
+    /**
+      * @throws TopologyNotSet if called before setting topology
+      */
     def input[K, V](
         topic: String,
         key: Serde[K],
         value: Serde[V],
@@ -68,142 +98,159 @@ object MockedStreams {
     ): Builder =
       _input(topic, key, value, Left(records))
 
+    /**
+      * @throws TopologyNotSet if called before setting topology
+      */
+    def inputWithTime[K, V](
+        topic: String,
+        key: Serde[K],
+        value: Serde[V],
+        records: RecordsLong[K, V]
+    ): Builder = _input[K, V](topic, key, value, Right(records.seq))
+
+    /**
+      * @throws TopologyNotSet if called before setting topology
+      */
     def inputWithTime[K, V](
         topic: String,
         key: Serde[K],
         value: Serde[V],
-        records: Seq[(K, V, Long)]
+        records: RecordsInstant[K, V]
     ): Builder =
-      _input(topic, key, value, Right(records))
+      _input(topic, key, value, Right(records.seq.map {
+        case (k, v, t) => (k, v, t.toEpochMilli())
+      }))
 
+    /**
+      * @throws TopologyNotSet if called before setting topology
+      */
     def output[K, V](
         topic: String,
         key: Serde[K],
-        value: Serde[V],
-        size: Int
-    ): immutable.IndexedSeq[(K, V)] = {
-      if (size <= 0) throw new ExpectedOutputIsEmpty
-      withProcessedDriver { driver =>
-        (0 until size).flatMap { _ =>
-          Option(driver.readOutput(topic, key.deserializer, value.deserializer))
-            .map(r => (r.key, r.value))
+        value: Serde[V]
+    ): Seq[(K, V)] = withDriver { driver =>
+      val testTopic = driver
+        .createOutputTopic(topic, key.deserializer(), value.deserializer())
+      testTopic
+        .readRecordsToList()
+        .asScala
+        .map { tr =>
+          (tr.getKey(), tr.getValue())
         }
-      }
+        .toSeq
     }
 
+    /**
+      * @throws TopologyNotSet if called before setting topology
+      */
+    def outputTable[K, V](
+        topic: String,
+        key: Serde[K],
+        value: Serde[V]
+    ): Map[K, V] =
+      output[K, V](topic, key, value).toMap
+
+    /**
+      * @throws TopologyNotSet if called before setting topology
+      * @deprecated Use output without the size argument instead
+      */
+    @deprecated("Use output without the size argument", "3.6.0")
+    def output[K, V](
+        topic: String,
+        key: Serde[K],
+        value: Serde[V],
+        size: Int
+    ): Seq[(K, V)] = output(topic, key, value)
+
+    /**
+      * @throws TopologyNotSet if called before setting topology
+      * @deprecated Use outputTable without the size argument instead
+      */
+    @deprecated("Use outputTable without the size argument", "3.6.0")
     def outputTable[K, V](
         topic: String,
         key: Serde[K],
         value: Serde[V],
         size: Int
-    ): Map[K, V] =
-      output[K, V](topic, key, value, size).toMap
+    ): Map[K, V] = output(topic, key, value).toMap
 
-    def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver {
-      driver =>
-        val records = driver.getKeyValueStore(name).all()
-        val list = records.asScala.toList.map { record =>
-          (record.key, record.value)
-        }
-        records.close()
-        list.toMap
+    /**
+      * @throws TopologyNotSet if called before setting topology
+      */
+    def stateTable(name: String): Map[Nothing, Nothing] = withDriver { driver =>
+      val records = driver.getKeyValueStore(name).all()
+      val list = records.asScala.toList.map { record =>
+        (record.key, record.value)
+      }
+      records.close()
+      list.toMap
     }
 
     /**
-      * @throws lllegalArgumentException if duration is negative or can't be represented as long milliseconds
+      * @throws TopologyNotSet if called before setting topology
      */
     def windowStateTable[K, V](
         name: String,
         key: K,
         timeFrom: Long = 0,
         timeTo: Long = Long.MaxValue
-    ): Map[java.lang.Long, ValueAndTimestamp[V]] = {
+    ): Map[java.lang.Long, ValueAndTimestamp[V]] =
       windowStateTable[K, V](
         name,
         key,
         Instant.ofEpochMilli(timeFrom),
         Instant.ofEpochMilli(timeTo)
       )
-    }
 
     /**
-      * @throws IllegalArgumentException if duration is negative or can't be represented as long milliseconds
+      * @throws TopologyNotSet if called before setting topology
      */
     def windowStateTable[K, V](
         name: String,
         key: K,
         timeFrom: Instant,
         timeTo: Instant
-    ): Map[java.lang.Long, ValueAndTimestamp[V]] =
-      withProcessedDriver { driver =>
-        val store = driver.getTimestampedWindowStore[K, V](name)
-        val records = store.fetch(key, timeFrom, timeTo)
-        val list = records.asScala.toList.map { record =>
-          (record.key, record.value)
-        }
-        records.close()
-        list.toMap
+    ): Map[java.lang.Long, ValueAndTimestamp[V]] = withDriver { driver =>
+      val store = driver.getTimestampedWindowStore[K, V](name)
+      val records = store.fetch(key, timeFrom, timeTo)
+      val list = records.asScala.toList.map { record =>
+        (record.key, record.value)
       }
+      records.close()
+      list.toMap
+    }
 
     private def _input[K, V](
         topic: String,
         key: Serde[K],
         value: Serde[V],
         records: Either[Seq[(K, V)], Seq[(K, V, Long)]]
-    ) = {
-      val keySer = key.serializer
-      val valSer = value.serializer
-      val factory = new ConsumerRecordFactory[K, V](keySer, valSer)
-
-      val updatedRecords = records match {
+    ) = withDriver { driver =>
+      val testTopic =
+        driver.createInputTopic(topic, key.serializer, value.serializer)
+      records match {
         case Left(withoutTime) =>
-          withoutTime.foldLeft(inputs) {
-            case (events, (k, v)) => events :+ factory.create(topic, k, v)
+          withoutTime.foreach {
+            case (k, v) => testTopic.pipeInput(k, v)
           }
         case Right(withTime) =>
-          withTime.foldLeft(inputs) {
-            case (events, (k, v, timestamp)) =>
-              events :+ factory.create(topic, k, v, timestamp)
+          withTime.foreach {
+            case (k, v, timestamp) =>
+              testTopic.pipeInput(k, v, timestamp)
           }
       }
-      this.copy(inputs = updatedRecords)
-    }
-
-    // state store is temporarily created in ProcessorTopologyTestDriver
-    private def stream = {
-      val props = new Properties
-      props.put(
-        StreamsConfig.APPLICATION_ID_CONFIG,
-        s"mocked-${UUID.randomUUID().toString}"
-      )
-      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
-      configuration.asScala.foreach { case (k, v) => props.put(k, v) }
-      new Driver(topology.getOrElse(throw new NoTopologySpecified)(), props)
-    }
-
-    private def produce(driver: Driver): Unit =
-      inputs.foreach(driver.pipeInput)
-
-    private def withProcessedDriver[T](f: Driver => T): T = {
-      if (inputs.isEmpty) throw new NoInputSpecified
-
-      val driver = stream
-      produce(driver)
-      val result: T = f(driver)
-      driver.close()
-      result
+      this
     }
   }
 
+  class TopologyNotSet
+      extends IllegalArgumentException(
+        "Call a topology method before input, output, and state store methods. Changed in Mocked Streams >= 3.6.0"
+      )
+
   class NoTopologySpecified
-    extends Exception("No topology specified. Call topology() on builder.")
-
-  class NoInputSpecified
-    extends Exception(
-      "No input fixtures specified. Call input() method on builder."
+    extends IllegalArgumentException(
+      "No topology specified. Call topology() on builder."
     )
-
-  class ExpectedOutputIsEmpty
-    extends Exception("Output size needs to be greater than 0.")
-
 }
diff --git a/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala b/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala
index 82f9088..bbe9ae5 100644
--- a/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala
+++ b/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala
@@ -32,52 +32,38 @@
 import org.apache.kafka.streams.scala.StreamsBuilder
 import org.apache.kafka.streams.scala.kstream.KTable
 import org.apache.kafka.streams.state.ValueAndTimestamp
 import org.scalatest.{FlatSpec, Matchers}
+import java.time.Duration
+import org.apache.kafka.streams.processor.ProcessorContext
+import com.madewithtea.mockedstreams.MockedStreams.TopologyNotSet
 
 class MockedStreamsSpec extends FlatSpec with Matchers {
 
   import CustomEquality._
 
   behavior of "MockedStreams"
 
-  it should "throw exception when expected size in output methods is <= 0" in {
-    import Fixtures.Uppercase._
-    import MockedStreams.ExpectedOutputIsEmpty
-
-    val spec = MockedStreams()
-      .topology(topology)
-      .input(InputTopic, strings, strings, input)
-
-    Seq(-1, 0).foreach { size =>
-      an[ExpectedOutputIsEmpty] should be thrownBy
-        spec.output(OutputTopic, strings, strings, size)
-
-      an[ExpectedOutputIsEmpty] should be thrownBy
-        spec.outputTable(OutputTopic, strings, strings, size)
-    }
+  it should "throw exception when inputs specified before topology" in {
+    an[TopologyNotSet] should be thrownBy
+      MockedStreams().input("input", stringSerde, stringSerde, Seq())
   }
 
-  it should "throw exception when no input specified for all output and state methods" in {
-    import Fixtures.Uppercase._
-    import MockedStreams.NoInputSpecified
-
-    val t = MockedStreams().topology(topology)
-
-    an[NoInputSpecified] should be thrownBy
-      t.output(OutputTopic, strings, strings, expected.size)
-
-    an[NoInputSpecified] should be thrownBy
-      t.outputTable(OutputTopic, strings, strings, expected.size)
-
-    an[NoInputSpecified] should be thrownBy
-      t.stateTable("state-table")
+  it should "throw exception when outputs specified before topology" in {
+    an[TopologyNotSet] should be thrownBy
+      MockedStreams().output("output", stringSerde, stringSerde)
+  }
 
-    an[NoInputSpecified] should be thrownBy
-      t.windowStateTable("window-state-table", 0)
+  it should "throw exception when outputTable specified before topology" in {
+    an[TopologyNotSet] should be thrownBy
+      MockedStreams().outputTable("output", stringSerde, stringSerde)
+  }
 
-    an[NoInputSpecified] should be thrownBy
-      t.windowStateTable("window-state-table",
-        0,
-        Instant.ofEpochMilli(Long.MinValue),
-        Instant.ofEpochMilli(Long.MaxValue))
+  it should "throw exception when state store accessed before topology" in {
+    an[TopologyNotSet] should be thrownBy
+      MockedStreams().windowStateTable(
+        "table",
+        "key",
+        Instant.now(),
+        Instant.now().plusMillis(1)
+      )
   }
 
   it should "assert correctly when processing strings to uppercase" in {
@@ -86,7 +72,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
     val output = MockedStreams()
       .topology(topology)
       .input(InputTopic, strings, strings, input)
-      .output(OutputTopic, strings, strings, expected.size)
+      .output(OutputTopic, strings, strings)
 
     output shouldEqual expected
   }
@@ -97,7 +83,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
     val output = MockedStreams()
      .topology(topology)
       .input(InputTopic, strings, strings, input)
-      .outputTable(OutputTopic, strings, strings, expected.size)
+      .outputTable(OutputTopic, strings, strings)
 
     output shouldEqual expected.toMap
   }
@@ -111,7 +97,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
       .input(InputBTopic, strings, ints, inputB)
       .stores(Seq(StoreName))
 
-    builder.output(OutputATopic, strings, ints, expectedA.size) shouldEqual expectedA
+    builder.output(OutputATopic, strings, ints) shouldEqual expectedA
     builder.stateTable(StoreName) shouldEqual inputA.toMap
   }
 
@@ -125,11 +111,11 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
       .stores(Seq(StoreName))
 
     builder
-      .output(OutputATopic, strings, ints, expectedA.size)
+      .output(OutputATopic, strings, ints)
       .shouldEqual(expectedA)
 
     builder
-      .output(OutputBTopic, strings, ints, expectedB.size)
+      .output(OutputBTopic, strings, ints)
       .shouldEqual(expectedB)
 
     builder.stateTable(StoreName) shouldEqual inputA.toMap
@@ -151,7 +137,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
       .input(InputATopic, strings, ints, secondInputForTopicA)
 
     builder
-      .output(OutputATopic, strings, ints, expectedOutput.size)
+      .output(OutputATopic, strings, ints)
      .shouldEqual(expectedOutput)
   }
 
@@ -161,8 +147,10 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
     import Fixtures.Multi._
 
     val props = new Properties
-    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-      classOf[TimestampExtractors.CustomTimestampExtractor].getName)
+    props.put(
+      StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+      classOf[TimestampExtractors.CustomTimestampExtractor].getName
+    )
 
     val builder = MockedStreams()
       .topology(topology1WindowOutput)
@@ -177,21 +165,24 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
     builder
       .windowStateTable[String, Int](StoreName, "y")
       .shouldEqual(expectedCy.toMap)
-
+
     builder
-      .windowStateTable[String, Int](StoreName,
-        "y",
-        Instant.ofEpochMilli(0L),
-        Instant.ofEpochMilli(1L))
+      .windowStateTable[String, Int](
+        StoreName,
+        "y",
+        Instant.ofEpochMilli(0L),
+        Instant.ofEpochMilli(1L)
+      )
       .shouldEqual(expectedCy.toMap)
 
     builder
-      .windowStateTable[String, Int](StoreName,
-        "x",
-        Instant.ofEpochMilli(0L),
-        Instant.ofEpochMilli(1L))
-      .shouldEqual(expectedCx.toMap)
+      .windowStateTable[String, Int](
+        StoreName,
+        "x",
+        Instant.ofEpochMilli(0L),
+        Instant.ofEpochMilli(1L)
+      )
+      .shouldEqual(expectedCx.toMap)
   }
 
   it should "accept already built topology" in {
@@ -206,13 +197,12 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
     val output = MockedStreams()
      .withTopology(() => getTopology)
       .input(InputTopic, strings, strings, input)
-      .output(OutputTopic, strings, strings, expected.size)
+      .output(OutputTopic, strings, strings)
 
     output shouldEqual expected
   }
 
   it should "accept consumer records with custom timestamps" in {
-
     import Fixtures.Multi._
 
     val builder = MockedStreams()
@@ -225,10 +215,12 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
       .shouldEqual(expectedCWithTimeStamps.toMap)(valueAndTimestampEq[Int])
 
     builder
-      .windowStateTable[String, Long](StoreName,
-        "x",
-        Instant.ofEpochMilli(1000L),
-        Instant.ofEpochMilli(1002L))
+      .windowStateTable[String, Long](
+        StoreName,
+        "x",
+        Instant.ofEpochMilli(1000L),
+        Instant.ofEpochMilli(1002L)
+      )
       .shouldEqual(expectedCWithTimeStamps.toMap)
   }
 
@@ -275,8 +267,10 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
     val expectedA = Seq(("x", 5), ("y", 5))
     val expectedB = Seq(("x", 3), ("y", 1))
 
-    val expectedCx = Seq((1L, ValueAndTimestamp.make(2, 1L)),
-      (2L, ValueAndTimestamp.make(1, 2L)))
+    val expectedCx = Seq(
+      (1L, ValueAndTimestamp.make(2, 1L)),
+      (2L, ValueAndTimestamp.make(1, 2L))
+    )
 
     val expectedCy = Seq((1, 1))
 
     val expectedCWithTimeStamps = Seq(
@@ -316,7 +310,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
     def topology1WindowOutput(builder: StreamsBuilder) = {
       val streamA = builder.stream[String, Int](InputCTopic)
 
       streamA.groupByKey
-        .windowedBy(TimeWindows.of(1))
+        .windowedBy(TimeWindows.of(Duration.ofMillis(1)))
         .count()(Materialized.as(StoreName))
     }
 
@@ -364,8 +358,10 @@ object TimestampExtractors {
   class CustomTimestampExtractor extends TimestampExtractor {
-    override def extract(record: ConsumerRecord[AnyRef, AnyRef],
-                         previous: Long): Long = record.value match {
+    override def extract(
+        record: ConsumerRecord[AnyRef, AnyRef],
+        previous: Long
+    ): Long = record.value match {
       case value: Integer => value.toLong
      case _ => record.timestamp()
     }
@@ -375,9 +371,13 @@ object CustomEquality {
   import org.scalactic.Equality
 
-  implicit def valueAndTimestampEq[A]: Equality[Map[java.lang.Long, ValueAndTimestamp[A]]] =
-    new Equality[Map[java.lang.Long, ValueAndTimestamp[A]]] {
-      override def areEqual(a: Map[java.lang.Long, ValueAndTimestamp[A]], b: Any): Boolean = {
+  implicit def valueAndTimestampEq[A]
+      : Equality[Map[java.lang.Long, ValueAndTimestamp[A]]] =
+    new Equality[Map[java.lang.Long, ValueAndTimestamp[A]]] {
+      override def areEqual(
+          a: Map[java.lang.Long, ValueAndTimestamp[A]],
+          b: Any
+      ): Boolean = {
         true
       }
     }
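
Migration note: with the 2.4 TopologyTestDriver wrappers above, `.output` and `.outputTable` no longer take an expected-size argument; the sized overloads remain only as deprecated shims. A minimal before/after sketch, reusing the README's `strings`/`input`/`expected` fixture names (the topology body is elided as in the README examples):

    // Mocked Streams <= 3.5.x: caller had to state how many records to read
    MockedStreams()
      .topology { builder => builder.stream(...) [...] } // Scala DSL
      .input("topic-in", strings, strings, input)
      .output("topic-out", strings, strings, expected.size) shouldEqual expected

    // Mocked Streams 3.6.0: all piped records are read back, no size needed
    MockedStreams()
      .topology { builder => builder.stream(...) [...] } // Scala DSL
      .input("topic-in", strings, strings, input)
      .output("topic-out", strings, strings) shouldEqual expected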
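Because the driver is now created eagerly by `topology()`/`withTopology()`, call order matters: any `input`, `inputWithTime`, `output`, `outputTable`, `stateTable`, or `windowStateTable` call made before a topology is set throws `TopologyNotSet`. An illustrative snippet, again using the README's `strings` serde name:

    // throws MockedStreams.TopologyNotSet: no driver exists yet,
    // because no topology method was called first
    MockedStreams().input("topic-in", strings, strings, Seq(("x", "a")))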
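The new `inputWithTime` overloads accept per-record timestamps either as epoch milliseconds (`Long`) or as `java.time.Instant`, routed through the `RecordsLong`/`RecordsInstant` implicit conversions defined on the `MockedStreams` object. A self-contained sketch under assumed names — the topic names "in"/"out", the pass-through topology, and the record values are illustrative only:

    import java.time.Instant
    import org.apache.kafka.common.serialization.{Serde, Serdes}
    import org.apache.kafka.streams.scala.ImplicitConversions._
    import com.madewithtea.mockedstreams.MockedStreams

    implicit val stringSerde: Serde[String] = Serdes.String()

    // identity topology: records piped to "in" appear unchanged on "out"
    val mstreams = MockedStreams()
      .topology(builder => builder.stream[String, String]("in").to("out"))
      // timestamp as Long epoch milliseconds, via RecordsLong
      .inputWithTime("in", stringSerde, stringSerde, Seq(("x", "a", 1000L)))
      // timestamp as java.time.Instant, via RecordsInstant
      .inputWithTime("in", stringSerde, stringSerde,
        Seq(("y", "b", Instant.ofEpochMilli(2000L))))

    mstreams.output("out", stringSerde, stringSerde) // Seq(("x", "a"), ("y", "b"))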