Skip to content

Commit

Permalink
Simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Aug 24, 2023
1 parent 90c6df9 commit 5dd1b83
Showing 1 changed file with 8 additions and 38 deletions.
46 changes: 8 additions & 38 deletions kafka/src/main/scala/ox/kafka/KafkaStage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,7 @@ object KafkaStage:
mapPublish(settings.toProducer, closeWhenComplete = true)

def mapPublish(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Source[RecordMetadata] =
val c = StageCapacity.newChannel[RecordMetadata]
val exceptions = Channel[Exception](Int.MaxValue)
val metadata = Channel[(Long, RecordMetadata)](128)

// each produced message gets a sequence number; this is used to send the record metadata downstream in the
// same order, as the received producer records
val sendInSequence = SendInSequence(c)

fork {
try
repeatWhile {
select(exceptions.receiveClause, metadata.receiveClause, source.receiveOrDoneClause) match
case ChannelClosed.Error(r) => c.error(r); false
case ChannelClosed.Done => sendInSequence.drainFromThenDone(exceptions, metadata); false
case exceptions.Received(e) => c.error(e); false
case metadata.Received((s, m)) => sendInSequence.send(s, m); true
case source.Received(record) =>
val sequenceNo = sendInSequence.nextSequenceNo

try
producer.send(
record,
(m: RecordMetadata, e: Exception) => if e != null then exceptions.send(e) else metadata.send((sequenceNo, m))
)
true
catch
case e: Exception =>
c.error(e)
false
}
finally
if closeWhenComplete then
logger.debug("Closing the Kafka producer")
uninterruptible(producer.close())
}

c
source.mapAsView(r => SendPacket(List(r), Nil)).mapPublishAndCommit(producer, closeWhenComplete, commitOffsets = false)

extension [K, V](source: Source[SendPacket[K, V]])
/** For each packet, first all messages (producer records) are sent. Then, all messages up to the offsets of the consumer messages are
Expand All @@ -70,6 +34,12 @@ object KafkaStage:
* The producer that is used to send messages.
*/
def mapPublishAndCommit(producer: KafkaProducer[K, V], closeWhenComplete: Boolean)(using StageCapacity, Ox): Source[RecordMetadata] =
mapPublishAndCommit(producer, closeWhenComplete, commitOffsets = true)

private def mapPublishAndCommit(producer: KafkaProducer[K, V], closeWhenComplete: Boolean, commitOffsets: Boolean)(using
StageCapacity,
Ox
): Source[RecordMetadata] =
val c = StageCapacity.newChannel[RecordMetadata]
val exceptions = Channel[Exception](Int.MaxValue)
val metadata = Channel[(Long, RecordMetadata)](128)
Expand All @@ -82,7 +52,7 @@ object KafkaStage:
// starting a nested scope, so that the committer is interrupted when the main process ends
scoped {
// committer
fork(tapException(doCommit(toCommit))(c.error))
if commitOffsets then fork(tapException(doCommit(toCommit))(c.error))

repeatWhile {
select(exceptions.receiveClause, metadata.receiveClause, source.receiveOrDoneClause) match
Expand Down

0 comments on commit 5dd1b83

Please sign in to comment.