Skip to content

Commit

Permalink
some readability improvements + bump sbt to 1.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
sv3ndk committed Sep 24, 2017
1 parent e7aa8fc commit 82fca24
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.0.0
sbt.version=1.0.2
25 changes: 13 additions & 12 deletions src/main/scala/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,35 @@ object MockedStreams {

def apply() = Builder()

case class Event(topic: String, key: Array[Byte], value: Array[Byte])
case class Record(topic: String, key: Array[Byte], value: Array[Byte])

case class Builder(topology: Option[(KStreamBuilder => Unit)] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: List[Event] = List.empty) {

inputs: List[Record] = List.empty) {

def config(configuration: Properties) = this.copy(configuration = configuration)

def topology(func: (KStreamBuilder => Unit)) = this.copy(topology = Some(func))

def stores(stores: Seq[String]) = this.copy(stateStores = stores)

def input[K, V](topic: String, key: Serde[K], value: Serde[V], newInput: Seq[(K, V)]) = {
def input[K, V](topic: String, key: Serde[K], value: Serde[V], newRecords: Seq[(K, V)]) = {
val keySer = key.serializer
val valSer = value.serializer

val updatedInputs = newInput.foldLeft(inputs) {
case (events, (k, v)) => Event(topic, keySer.serialize(topic, k), valSer.serialize(topic, v)) :: events
val updatedRecords = newRecords.foldLeft(inputs) {
case (events, (k, v)) =>
val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v))
newRecord :: events
}

this.copy(inputs = updatedInputs)
this.copy(inputs = updatedRecords)
}

def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = {
if (size <= 0) throw new ExpectedOutputIsEmpty
withProcessedDriver { driver =>
withProcessedDriver { driver =>
(0 until size).flatMap { i =>
Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match {
case Some(record) => Some((record.key, record.value))
Expand All @@ -67,10 +68,10 @@ object MockedStreams {
}
}

def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] =
output[K, V](topic, key, value, size).toMap

def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver =>
val records = driver.getKeyValueStore(name).all()
val list = records.asScala.toList.map { record => (record.key, record.value) }
records.close()
Expand Down Expand Up @@ -103,9 +104,9 @@ object MockedStreams {
new Driver(new StreamsConfig(props), builder)
}

private def produce(driver: Driver) = {
private def produce(driver: Driver): Unit = {
inputs.reverse.foreach{
case Event(topic, key, value) =>
case Record(topic, key, value) =>
driver.process(topic, key, value)
}
}
Expand Down

0 comments on commit 82fca24

Please sign in to comment.