Skip to content

Commit

Permalink
Kafka sink
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Aug 22, 2023
1 parent e72809d commit c5072a0
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 8 deletions.
20 changes: 18 additions & 2 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ trait SourceOps[+T] { this: Source[T] =>

//

/** Invokes the given function for each received element. Blocks until the channel is done.
* @throws ChannelClosedException
* when there is an upstream error.
*/
def foreach(f: T => Unit): Unit =
repeatWhile {
receive() match
Expand All @@ -176,19 +180,31 @@ trait SourceOps[+T] { this: Source[T] =>
case t: T @unchecked => f(t); true
}

/** Accumulates all elements received from the channel into a list. Blocks until the channel is done.
* @throws ChannelClosedException
* when there is an upstream error.
*/
def toList: List[T] =
val b = List.newBuilder[T]
foreach(b += _)
b.result()

/** Passes each received element from this channel to the given sink. Blocks until the channel is done.
* @throws ChannelClosedException
* when there is an upstream error, or when the sink is closed.
*/
def pipeTo(sink: Sink[T]): Unit =
repeatWhile {
receive() match
case ChannelClosed.Done => sink.done(); false
case ChannelClosed.Error(r) => sink.error(r); false
case t: T @unchecked => sink.send(t).isValue
case e: ChannelClosed.Error => sink.error(e.reason); throw e.toThrowable
case t: T @unchecked => sink.send(t).orThrow; true
}

/** Receives all elements from the channel. Blocks until the channel is done.
* @throws ChannelClosedException
* when there is an upstream error.
*/
def drain(): Unit = foreach(_ => ())
}

Expand Down
9 changes: 4 additions & 5 deletions kafka/src/main/scala/ox/kafka/KafkaSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ import ox.channels.{Source, StageCapacity}
import scala.jdk.CollectionConverters.*
import scala.util.control.NonFatal

object KafkaSource {
object KafkaSource:
def subscribe[K, V](settings: ConsumerSettings[K, V], topic: String, otherTopics: String*)(using
StageCapacity,
Ox
): Source[ConsumerRecord[K, V]] =
subscribe(
new KafkaConsumer(settings.toProperties, settings.keyDeserializer, settings.valueDeserializer),
closeWhenDone = true,
closeWhenComplete = true,
topic,
otherTopics: _*
)

def subscribe[K, V](kafkaConsumer: KafkaConsumer[K, V], closeWhenDone: Boolean, topic: String, otherTopics: String*)(using
def subscribe[K, V](kafkaConsumer: KafkaConsumer[K, V], closeWhenComplete: Boolean, topic: String, otherTopics: String*)(using
StageCapacity,
Ox
): Source[ConsumerRecord[K, V]] =
Expand All @@ -32,8 +32,7 @@ object KafkaSource {
val records = kafkaConsumer.poll(java.time.Duration.ofMillis(100))
records.forEach(c.send)
catch case NonFatal(e) => c.error(e)
finally if closeWhenDone then kafkaConsumer.close()
finally if closeWhenComplete then kafkaConsumer.close()
}

c
}
28 changes: 28 additions & 0 deletions kafka/src/main/scala/ox/kafka/ProducerSettings.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ox.kafka

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{Serializer, StringSerializer}

import java.util.Properties

case class ProducerSettings[K, V](
bootstrapServers: List[String],
keySerializer: Serializer[K],
valueSerializer: Serializer[V],
otherProperties: Map[String, String]
):
def bootstrapServers(servers: String*): ProducerSettings[K, V] = copy(bootstrapServers = servers.toList)
def keySerializer[KK](serializer: Serializer[KK]): ProducerSettings[KK, V] = copy(keySerializer = serializer)
def valueSerializer[VV](serializer: Serializer[VV]): ProducerSettings[K, VV] = copy(valueSerializer = serializer)
def property(key: String, value: String): ProducerSettings[K, V] = copy(otherProperties = otherProperties + (key -> value))

def toProperties: Properties =
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.mkString(","))
otherProperties.foreach { case (key, value) => props.put(key, value) }
props

object ProducerSettings:
private val StringSerializerInstance = new StringSerializer
def default: ProducerSettings[String, String] =
ProducerSettings(DefaultBootstrapServers, StringSerializerInstance, StringSerializerInstance, Map.empty)
4 changes: 4 additions & 0 deletions kafka/src/main/scala/ox/kafka/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package ox

package object kafka:
private[kafka] val DefaultBootstrapServers = List("localhost:9092")
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package ox.kafka

import io.github.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.channels.*
import ox.kafka.ConsumerSettings.AutoOffsetReset.Earliest
import ox.scoped

class KafkaSourceTest extends AnyFlatSpec with Matchers with EmbeddedKafka with BeforeAndAfterAll {
class KafkaTest extends AnyFlatSpec with Matchers with EmbeddedKafka with BeforeAndAfterAll {

private var kafkaPort: Int = _

Expand Down Expand Up @@ -46,4 +48,22 @@ class KafkaSourceTest extends AnyFlatSpec with Matchers with EmbeddedKafka with
source.receive().orThrow.value() shouldBe "msg4"
}
}

it should "send messages to topics" in {
// given
val topic = "t2"

// when
scoped {
val settings = ProducerSettings.default.bootstrapServers(s"localhost:$kafkaPort")
Source
.fromIterable(List("a", "b", "c"))
.mapAsView(msg => ProducerRecord[String, String](topic, msg))
.pipeTo(KafkaSink.publish(settings))
}

// then
given Deserializer[String] = new StringDeserializer()
consumeNumberMessagesFrom[String](topic, 3) shouldBe List("a", "b", "c")
}
}

0 comments on commit c5072a0

Please sign in to comment.