Merge pull request #20 from sv3nd/svend-event-order
Now allows fine-grained control of the test event order
jpzk authored Sep 26, 2017
2 parents 0fb39dc + 82fca24 commit cdbd13d
Showing 7 changed files with 85 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTORS.md
@@ -2,4 +2,4 @@

* Hamidreza Afzali
* Jendrik Poloczek

* Svend Vanderveken
20 changes: 20 additions & 0 deletions README.md
@@ -49,6 +49,26 @@ It also allows you to have multiple input and output streams. If your topology u
mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)

## Event order and multiple emissions

The events provided to the mock stream are submitted to the topology during the test in the order in which they appear in the fixture. You can also submit events to the same topic several times, at different points in your scenario.

This is handy for validating whether or not your topology's behaviour depends on the order in which events are received and processed.

In the example below, two events are first submitted to topic A, then three to topic B, then one more to topic A.

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
.topology(topologyTables)
.input(InputATopic, strings, ints, firstInputForTopicA)
.input(InputBTopic, strings, ints, firstInputForTopicB)
.input(InputATopic, strings, ints, secondInputForTopicA)
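
The merged output can then be asserted in submission order, exactly as the test added in this pull request does (OutputATopic, strings and ints come from the test fixtures):

builder.output(OutputATopic, strings, ints, expectedOutput.size)
  .shouldEqual(expectedOutput)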

## State Store

Since 1.2, when you define your state stores via .stores(stores: Seq[String]), you can verify the state store content via the .stateTable(name: String) method:
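
An illustrative sketch of that check, assuming the fixture names from this repository's test suite (topologyTables, InputATopic, strings, ints, inputA, StoreName and Store2Name) are in scope:

val builder = MockedStreams()
  .topology(topologyTables)
  .input(InputATopic, strings, ints, inputA)
  .stores(Seq(StoreName, Store2Name))

builder.stateTable(StoreName) shouldEqual inputA.toMap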
2 changes: 1 addition & 1 deletion build.sbt
@@ -10,7 +10,7 @@ lazy val commonSettings = Seq(

val scalaTestVersion = "3.0.2"
val rocksDBVersion = "5.0.1"
val kafkaVersion = "0.11.0.0"
val kafkaVersion = "0.11.0.1"

lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
1 change: 1 addition & 0 deletions project/build.properties
@@ -0,0 +1 @@
sbt.version=1.0.2
4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -1,2 +1,2 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
31 changes: 17 additions & 14 deletions src/main/scala/MockedStreams.scala
@@ -30,29 +30,35 @@ object MockedStreams {

def apply() = Builder()

case class Input(seq: Seq[(Array[Byte], Array[Byte])])
case class Record(topic: String, key: Array[Byte], value: Array[Byte])

case class Builder(topology: Option[(KStreamBuilder => Unit)] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: Map[String, Input] = Map()) {
inputs: List[Record] = List.empty) {

def config(configuration: Properties) = this.copy(configuration = configuration)

def topology(func: (KStreamBuilder => Unit)) = this.copy(topology = Some(func))

def stores(stores: Seq[String]) = this.copy(stateStores = stores)

def input[K, V](topic: String, key: Serde[K], value: Serde[V], seq: Seq[(K, V)]) = {
def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = {
val keySer = key.serializer
val valSer = value.serializer
val in = seq.map { case (k, v) => (keySer.serialize(topic, k), valSer.serialize(topic, v)) }
this.copy(inputs = inputs + (topic -> Input(in)))

val updatedRecords = newRecords.foldLeft(inputs) {
case (events, (k, v)) =>
val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v))
newRecord :: events
}

this.copy(inputs = updatedRecords)
}

def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
if (size <= 0) throw new ExpectedOutputIsEmpty
withProcessedDriver { driver =>
withProcessedDriver { driver =>
(0 until size).flatMap { i =>
Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match {
case Some(record) => Some((record.key, record.value))
@@ -62,10 +68,10 @@
}
}

def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
output[K, V](topic, key, value, size).toMap

def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
val records = driver.getKeyValueStore(name).all()
val list = records.asScala.toList.map { record => (record.key, record.value) }
records.close()
@@ -98,11 +104,10 @@
new Driver(new StreamsConfig(props), builder)
}

private def produce(driver: Driver) = {
inputs.foreach { case (topic, input) =>
input.seq.foreach { case (key, value) =>
private def produce(driver: Driver): Unit = {
inputs.reverse.foreach{
case Record(topic, key, value) =>
driver.process(topic, key, value)
}
}
}

@@ -125,5 +130,3 @@
class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.")

}


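The inputs field now accumulates every serialized record in one list by prepending, and produce replays them after a single reverse, so records reach the topology in exactly the order the .input(...) calls supplied them, across topics. A small standalone sketch of that prepend-then-reverse pattern (the names below are illustrative and not part of this diff):

case class Rec(topic: String, value: Int)

val calls = Seq(("topicA", Seq(1, 2)), ("topicB", Seq(4, 3, 5)), ("topicA", Seq(4)))

// Prepend each record as it arrives, mirroring the foldLeft in Builder.input.
val accumulated = calls.foldLeft(List.empty[Rec]) { case (acc, (topic, values)) =>
  values.foldLeft(acc)((soFar, v) => Rec(topic, v) :: soFar)
}

// A single reverse restores submission order, mirroring produce before driver.process.
assert(accumulated.reverse ==
  List(Rec("topicA", 1), Rec("topicA", 2), Rec("topicB", 4),
       Rec("topicB", 3), Rec("topicB", 5), Rec("topicA", 4)))
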
43 changes: 43 additions & 0 deletions src/test/scala/MockedStreamsSpec.scala
@@ -118,6 +118,25 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
builder.stateTable(StoreName) shouldEqual inputA.toMap
}

it should "assert correctly when joining events sent to 2 Ktables in a specific order" in {
import Fixtures.Multi._

val firstInputForTopicA = Seq(("x", int(1)), ("y", int(2)))
val firstInputForTopicB = Seq(("x", int(4)), ("y", int(3)), ("y", int(5)))
val secondInputForTopicA = Seq(("y", int(4)))

val expectedOutput = Seq(("x", 5), ("y", 5), ("y", 7), ("y", 9))

val builder = MockedStreams()
.topology(topologyTables)
.input(InputATopic, strings, ints, firstInputForTopicA)
.input(InputBTopic, strings, ints, firstInputForTopicB)
.input(InputATopic, strings, ints, secondInputForTopicA)

builder.output(OutputATopic, strings, ints, expectedOutput.size)
.shouldEqual(expectedOutput)
}

it should "assert correctly when processing windowed state output topology" in {
import Fixtures.Multi._

@@ -194,6 +213,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
val OutputATopic = "outputA"
val OutputBTopic = "outputB"
val StoreName = "store"
val Store2Name = "store2"

def topology1Output(builder: KStreamBuilder) = {
val streamA = builder.stream(strings, ints, InputATopic)
@@ -230,6 +250,29 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints)
.to(strings, ints, OutputBTopic)
}

def topologyTables(builder: KStreamBuilder) = {
val streamA = builder.stream(strings, ints, InputATopic)
val streamB = builder.stream(strings, ints, InputBTopic)

val tableA = streamA.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator,
ints,
StoreName)

val tableB = streamB.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator,
ints,
Store2Name)

val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner)

resultTable
.toStream
.to(strings, ints, OutputATopic)
}
}

}
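For reference, the expected output in the new KTable-join test follows directly from the submission order, assuming LastAggregator keeps the latest value per key and AddJoiner sums both sides (both are defined in the existing test fixtures, collapsed above):

* x→1 and y→2 arrive on topic A; topic B's table is still empty, so nothing is emitted.
* x→4 on topic B joins with x→1 and emits (x, 5).
* y→3 and then y→5 on topic B join with y→2, emitting (y, 5) and (y, 7).
* y→4 on topic A joins with the latest y→5 on topic B, emitting (y, 9).

Reordering the .input(...) calls changes this sequence, which is exactly the behaviour the new event-order control makes testable.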
