Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some producer ops using the new send operation with batches #825

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions core/src/main/scala/com/banno/kafka/producer/ProducerOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.banno.kafka.producer

import cats.*
import cats.data.NonEmptyList
import cats.syntax.all.*
import fs2.*
import org.apache.kafka.common.*
Expand All @@ -40,12 +41,99 @@ case class ProducerOps[F[_], K, V](producer: ProducerApi[F, K, V]) {
)(implicit F: Applicative[F]): F[G[RecordMetadata]] =
records.traverse(producer.sendAsync)

/** Sends all of the records to the producer (synchronously), so the producer
* may batch them. After all records are sent, asynchronously waits for all
* acks. Returns the write metadatas, in order. This is the only batch write
* operation that allows the producer to perform its own batching, while
* semantically blocking until all writes have succeeded. It maximizes
* concurrency and producer batching, and also simplicity of usage. Fails if
* any individual send or ack fails.
*/
def sendBatch[G[_]: Traverse](
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth requiring that G have a NonEmptyTraverse? I think that'd allow the constraint on F below to be a FlatMap.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL NonEmptyTraverse, had no idea. That does indeed work for sendBatch, the problem is down below there is no NonEmptyTraverse[Chunk] in fs2. Maybe we could impl one, not sure what that takes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we ditch the chunk-related stuff. It seemed cool, but no idea if it's useful in practice.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chunk does have a toNel: Option[NonEmptyList[O]] that could be useful?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

590ffc2 shows one option: a parallel series of NonEmpty operations.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or 3253892 shows one way to use Chunk with NonEmptyTraverse.

records: G[ProducerRecord[K, V]]
)(implicit F: Monad[F]): F[G[RecordMetadata]] = {
val sends: G[F[F[RecordMetadata]]] = records.map(producer.send)
for {
acks <- sends.sequence
rms <- acks.sequence
} yield rms
}

def sendBatchNonEmpty[G[_]: NonEmptyTraverse](
records: G[ProducerRecord[K, V]]
)(implicit F: FlatMap[F]): F[G[RecordMetadata]] = {
val sends: G[F[F[RecordMetadata]]] = records.map(producer.send)
for {
acks <- sends.nonEmptySequence
rms <- acks.nonEmptySequence
} yield rms
}

def sendBatchWithCallbacks[G[_]: Traverse](
records: G[ProducerRecord[K, V]],
onSend: ProducerRecord[K, V] => F[Unit],
)(implicit F: Monad[F]): F[G[RecordMetadata]] = {
val sends: G[F[F[RecordMetadata]]] =
records.map(r => producer.send(r) <* onSend(r))
for {
acks <- sends.sequence
rms <- acks.sequence
} yield rms
}

def sendBatchWithCallbacks[G[_]: NonEmptyTraverse](
records: G[ProducerRecord[K, V]],
onSend: ProducerRecord[K, V] => F[Unit],
)(implicit F: FlatMap[F]): F[G[RecordMetadata]] = {
val sends: G[F[F[RecordMetadata]]] =
records.map(r => producer.send(r) <* onSend(r))
for {
acks <- sends.nonEmptySequence
rms <- acks.nonEmptySequence
} yield rms
}

def pipeSync: Pipe[F, ProducerRecord[K, V], RecordMetadata] =
_.evalMap(producer.sendSync)

def pipeAsync: Pipe[F, ProducerRecord[K, V], RecordMetadata] =
_.evalMap(producer.sendAsync)

def pipeSendBatch[G[_]: Traverse](implicit
F: Monad[F]
): Pipe[F, G[ProducerRecord[K, V]], G[RecordMetadata]] =
_.evalMap(sendBatch[G])

def pipeSendBatchNonEmpty[G[_]: NonEmptyTraverse](implicit
F: FlatMap[F]
): Pipe[F, G[ProducerRecord[K, V]], G[RecordMetadata]] =
_.evalMap(sendBatchNonEmpty[G])

/** Uses the stream's chunks as batches of records to send to the producer. */
def pipeSendBatchChunks(implicit
F: Monad[F]
): Pipe[F, ProducerRecord[K, V], RecordMetadata] =
s =>
pipeSendBatch[Chunk](Traverse[Chunk], F)(s.chunks).flatMap(Stream.chunk)

def pipeSendBatchChunksNonEmpty(implicit
F: FlatMap[F]
): Pipe[F, ProducerRecord[K, V], RecordMetadata] =
s =>
pipeSendBatchNonEmpty[NonEmptyList](NonEmptyTraverse[NonEmptyList], F)(
s.chunks.map(_.toNel).unNone
).flatMap(nel => Stream.emits(nel.toList))

/** Calls chunkN on the input stream, to create chunks of size `n`, and sends
* those chunks as batches to the producer.
*/
def pipeSendBatchChunkN(n: Int, allowFewer: Boolean = true)(implicit
F: Monad[F]
): Pipe[F, ProducerRecord[K, V], RecordMetadata] =
s =>
pipeSendBatch[Chunk](Traverse[Chunk], F)(s.chunkN(n, allowFewer))
.flatMap(Stream.chunk)

def sink: Pipe[F, ProducerRecord[K, V], Unit] =
_.evalMap(producer.sendAndForget)

Expand All @@ -55,6 +143,26 @@ case class ProducerOps[F[_], K, V](producer: ProducerApi[F, K, V]) {
def sinkAsync: Pipe[F, ProducerRecord[K, V], Unit] =
pipeAsync.apply(_).void

def sinkSendBatch[G[_]: Traverse](implicit
F: Monad[F]
): Pipe[F, G[ProducerRecord[K, V]], Unit] =
pipeSendBatch.apply(_).void

def sinkSendBatch[G[_]: NonEmptyTraverse](implicit
F: FlatMap[F]
): Pipe[F, G[ProducerRecord[K, V]], Unit] =
pipeSendBatchNonEmpty.apply(_).void

def sinkSendBatchChunks(implicit
F: Monad[F]
): Pipe[F, ProducerRecord[K, V], Unit] =
pipeSendBatchChunks.apply(_).void

def sinkSendBatchChunkN(n: Int, allowFewer: Boolean = true)(implicit
F: Monad[F]
): Pipe[F, ProducerRecord[K, V], Unit] =
pipeSendBatchChunkN(n, allowFewer).apply(_).void

def transaction[G[_]: Foldable](
records: G[ProducerRecord[K, V]]
)(implicit F: MonadError[F, Throwable]): F[Unit] =
Expand Down