Skip to content

Commit

Permalink
Time :(
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Gee <[email protected]> 🫥
  • Loading branch information
JosBogan committed Jul 10, 2024
1 parent 1d74b7c commit c575c41
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.itv.bucky.backend.fs2rabbit

import cats.data.Kleisli
import cats.effect.implicits.genSpawnOps
import cats.effect.implicits.{genSpawnOps, genTemporalOps}
import cats.effect.std.Dispatcher
import cats.effect.{Async, Resource}
import cats.effect.{Async, Ref, Resource}
import cats.implicits._
import com.itv.bucky
import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient.deliveryDecoder
Expand Down Expand Up @@ -50,13 +50,14 @@ import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{
import dev.profunktor.fs2rabbit.model.{AMQPChannel, PublishingFlag, ShortString}
import scodec.bits.ByteVector

import java.util.Date
import java.util.{Date, UUID}
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.language.higherKinds
import Fs2RabbitAmqpClient._
import cats.effect.kernel.Temporal

class Fs2RabbitAmqpClient[F[_]: Async](
class Fs2RabbitAmqpClient[F[_]: Async: Temporal](
client: RabbitClient[F],
connection: model.AMQPConnection,
publishChannel: model.AMQPChannel,
Expand Down Expand Up @@ -165,6 +166,13 @@ class Fs2RabbitAmqpClient[F[_]: Async](
}
.map(amqpClientConnectionManager.addConfirmListeningToPublisher)

private def repeatUntil[A](eval: F[A])(pred: A => Boolean)(sleep: FiniteDuration): F[Unit] =
for {
result <- eval
ended <- Async[F].pure(pred(result))
_ <- if (ended) Async[F].unit else Temporal[F].sleep(sleep) *> repeatUntil(eval)(pred)(sleep)
} yield ()

override def registerConsumer(
queueName: bucky.QueueName,
handler: Handler[F, consume.Delivery],
Expand All @@ -175,19 +183,37 @@ class Fs2RabbitAmqpClient[F[_]: Async](
): Resource[F, Unit] =
client.createChannel(connection).flatMap { implicit channel =>
implicit val decoder: EnvelopeDecoder[F, consume.Delivery] = deliveryDecoder(queueName)
Resource.eval(client.createAckerConsumer[consume.Delivery](model.QueueName(queueName.value))).flatMap { case (acker, consumer) =>
consumer
.evalMap(delivery => handler(delivery.payload).map(_ -> delivery.deliveryTag))
.evalMap {
case (consume.Ack, tag) => acker(model.AckResult.Ack(tag))
case (consume.DeadLetter, tag) => acker(model.AckResult.NAck(tag))
case (consume.RequeueImmediately, tag) => acker(model.AckResult.Reject(tag))
Resource
.make(Ref.of[F, Set[UUID]](Set.empty))(set =>
repeatUntil(set.get.flatTap(ids => Async[F].blocking(println(s"$ids left"))))(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)
)
.flatMap(consumptionIds =>
Resource.eval(client.createAckerConsumer[consume.Delivery](model.QueueName(queueName.value))).flatMap { case (acker, consumer) =>
consumer
.evalMap(delivery =>
for {
uuid <- Async[F].delay(UUID.randomUUID())
_ <- consumptionIds.update(set => set + uuid)
_ <- consumptionIds.get.flatTap(ids => Async[F].blocking(println("set is: " + ids)))
res <- handler(delivery.payload).attempt
_ <- Async[F].blocking(println("past the handler!"))
tag = delivery.deliveryTag
_ <- consumptionIds.update(set => set - uuid)
_ <- consumptionIds.get.flatTap(ids => Async[F].blocking(println("set is: " + ids)))
result <- Async[F].fromEither(res)
} yield (result, tag)
)
.evalMap {
case (consume.Ack, tag) => acker(model.AckResult.Ack(tag))
case (consume.DeadLetter, tag) => acker(model.AckResult.NAck(tag))
case (consume.RequeueImmediately, tag) => acker(model.AckResult.Reject(tag))
}
.compile
.drain
.background
.map(_ => ())
}
.compile
.drain
.background
.map(_ => ())
}
)
}

override def isConnectionOpen: F[Boolean] = Async[F].pure(connection.value.isOpen)
Expand Down
7 changes: 4 additions & 3 deletions it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with Effect

Fs2RabbitAmqpClient[IO](config)
.use { client =>
val handler = StubHandlers.recordingHandler[IO, Delivery]((_: Delivery) => IO.sleep(3.seconds).map(_ => Ack))
val handler = (_: Delivery) => IO.sleep(3.seconds).map(_ => Ack)
Resource
.eval(client.declare(declarations))
.flatMap(_ =>
Expand All @@ -68,14 +68,15 @@ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with Effect
.use { _ =>
val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey)
client.publisher().flatMap { publisher =>
publisher(pcb.toPublishCommand("a message"))
publisher(pcb.toPublishCommand("a message")) *> IO
.sleep(3.seconds)
.flatMap(_ => test)
}
}
}
}

test("Should wait until a handler finishes executing before shuttind down") {
test("Should wait until a handler finishes executing before shutting down") {
val clock = Clock.systemUTC()
val start = Instant.now(clock)
runTest[Instant](IO.delay(Instant.now())).map { result =>
Expand Down

0 comments on commit c575c41

Please sign in to comment.