diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 1581f11..36f149b 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -16,6 +16,7 @@ */ package com.madewithtea.mockedstreams +import collection.JavaConverters._ import java.util.{Properties, UUID} import org.apache.kafka.common.serialization.Serde @@ -70,6 +71,20 @@ object MockedStreams { output[K,V](topic, key, value, size).toMap } + def stateTable(name: String) = { + if (inputs.isEmpty) + throw new NoInputSpecified + + val driver = stream + produce(driver) + + val store = driver.getKeyValueStore(name) + val records = store.all() + val list = records.asScala.toList.map { record => (record.key, record.value) } + records.close() + list.toMap + } + // state store is temporarily created in ProcessorTopologyTestDriver private def stream = { val props = new Properties diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 977e50f..5b98e97 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -77,14 +77,14 @@ class MockedStreamsSpec extends FlatSpec with Matchers { it should "assert correctly when processing multi input topology" in { import Fixtures.Multi._ - val output = MockedStreams() + val builder = MockedStreams() .topology(topology1Output _) .input(InputATopic, strings, ints, inputA) .input(InputBTopic, strings, ints, inputB) .stores(Seq(StoreName)) - .output(OutputATopic, strings, ints, expectedA.size) - output shouldEqual expectedA + builder.output(OutputATopic, strings, ints, expectedA.size) shouldEqual expectedA + builder.stateTable(StoreName) shouldEqual inputA.toMap } it should "assert correctly when processing multi input output topology" in { @@ -101,6 +101,9 @@ class MockedStreamsSpec extends FlatSpec with Matchers { builder.output(OutputBTopic, strings, ints, expectedB.size) .shouldEqual(expectedB) + + builder.stateTable(StoreName) + .shouldEqual(inputA.toMap) } class LastInitializer extends Initializer[Integer] {