Skip to content

Commit

Permalink
Merge pull request #9 from hrafzali/feature/state
Browse files Browse the repository at this point in the history
Added stateTable
  • Loading branch information
jpzk authored Feb 13, 2017
2 parents b3f6f8b + 6ed08ed commit 890eeee
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
15 changes: 15 additions & 0 deletions src/main/scala/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.madewithtea.mockedstreams

import collection.JavaConverters._
import java.util.{Properties, UUID}

import org.apache.kafka.common.serialization.Serde
Expand Down Expand Up @@ -70,6 +71,20 @@ object MockedStreams {
output[K,V](topic, key, value, size).toMap
}

def stateTable(name: String) = {
if (inputs.isEmpty)
throw new NoInputSpecified

val driver = stream
produce(driver)

val store = driver.getKeyValueStore(name)
val records = store.all()
val list = records.asScala.toList.map { record => (record.key, record.value) }
records.close()
list.toMap
}

// state store is temporarily created in ProcessorTopologyTestDriver
private def stream = {
val props = new Properties
Expand Down
9 changes: 6 additions & 3 deletions src/test/scala/MockedStreamsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
it should "assert correctly when processing multi input topology" in {
import Fixtures.Multi._

val output = MockedStreams()
val builder = MockedStreams()
.topology(topology1Output _)
.input(InputATopic, strings, ints, inputA)
.input(InputBTopic, strings, ints, inputB)
.stores(Seq(StoreName))
.output(OutputATopic, strings, ints, expectedA.size)

output shouldEqual expectedA
builder.output(OutputATopic, strings, ints, expectedA.size) shouldEqual expectedA
builder.stateTable(StoreName) shouldEqual inputA.toMap
}

it should "assert correctly when processing multi input output topology" in {
Expand All @@ -101,6 +101,9 @@ class MockedStreamsSpec extends FlatSpec with Matchers {

builder.output(OutputBTopic, strings, ints, expectedB.size)
.shouldEqual(expectedB)

builder.stateTable(StoreName)
.shouldEqual(inputA.toMap)
}

class LastInitializer extends Initializer[Integer] {
Expand Down

0 comments on commit 890eeee

Please sign in to comment.