Skip to content

Commit

Permalink
success?
Browse files Browse the repository at this point in the history
Co-authored-by: Jos Bogan <[email protected]>
  • Loading branch information
andrewgee and JosBogan committed Jul 10, 2024
1 parent c575c41 commit 80055a4
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,37 +183,35 @@ class Fs2RabbitAmqpClient[F[_]: Async: Temporal](
): Resource[F, Unit] =
client.createChannel(connection).flatMap { implicit channel =>
implicit val decoder: EnvelopeDecoder[F, consume.Delivery] = deliveryDecoder(queueName)
Resource
.make(Ref.of[F, Set[UUID]](Set.empty))(set =>
repeatUntil(set.get.flatTap(ids => Async[F].blocking(println(s"$ids left"))))(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)
)
.flatMap(consumptionIds =>
Resource.eval(client.createAckerConsumer[consume.Delivery](model.QueueName(queueName.value))).flatMap { case (acker, consumer) =>
consumer
.evalMap(delivery =>
for {
uuid <- Async[F].delay(UUID.randomUUID())
_ <- consumptionIds.update(set => set + uuid)
_ <- consumptionIds.get.flatTap(ids => Async[F].blocking(println("set is: " + ids)))
res <- handler(delivery.payload).attempt
_ <- Async[F].blocking(println("past the handler!"))
tag = delivery.deliveryTag
_ <- consumptionIds.update(set => set - uuid)
_ <- consumptionIds.get.flatTap(ids => Async[F].blocking(println("set is: " + ids)))
result <- Async[F].fromEither(res)
} yield (result, tag)
Resource.eval(Ref.of[F, Set[UUID]](Set.empty)).flatMap { consumptionIds =>
Resource.eval(client.createAckerConsumer[consume.Delivery](model.QueueName(queueName.value))).flatMap { case (acker, consumer) =>
consumer
.evalMap(delivery =>
for {
uuid <- Async[F].delay(UUID.randomUUID())
_ <- consumptionIds.update(set => set + uuid)
res <- handler(delivery.payload).attempt
tag = delivery.deliveryTag
_ <- consumptionIds.update(set => set - uuid)
result <- Async[F].fromEither(res)
} yield (result, tag)
)
.evalMap {
case (consume.Ack, tag) => acker(model.AckResult.Ack(tag))
case (consume.DeadLetter, tag) => acker(model.AckResult.NAck(tag))
case (consume.RequeueImmediately, tag) => acker(model.AckResult.Reject(tag))
}
.compile
.drain
.background
.flatMap { _ =>
Resource.onFinalize(
repeatUntil(consumptionIds.get)(_.isEmpty)(shutdownRetry).timeout(shutdownTimeout)
)
.evalMap {
case (consume.Ack, tag) => acker(model.AckResult.Ack(tag))
case (consume.DeadLetter, tag) => acker(model.AckResult.NAck(tag))
case (consume.RequeueImmediately, tag) => acker(model.AckResult.Reject(tag))
}
.compile
.drain
.background
.map(_ => ())
}
)
}
.map(_ => ())
}
}
}

override def isConnectionOpen: F[Boolean] = Async[F].pure(connection.value.isOpen)
Expand Down
46 changes: 20 additions & 26 deletions it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.itv.bucky

import cats.effect.testing.scalatest.EffectTestSupport
import cats.effect.testing.scalatest.{AsyncIOSpec, EffectTestSupport}
import cats.effect.unsafe.IORuntime
import cats.effect.{IO, Resource}
import cats.effect.{Deferred, IO, Ref, Resource}
import com.itv.bucky.PayloadMarshaller.StringPayloadMarshaller
import com.itv.bucky.Unmarshaller.StringPayloadUnmarshaller
import com.itv.bucky._
Expand All @@ -22,11 +22,9 @@ import org.scalatest.matchers.should.Matchers._

import java.time.{Clock, Instant, LocalDateTime, ZoneOffset}
import java.util.UUID
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with EffectTestSupport with Eventually with IntegrationPatience {
class ShutdownTimeoutTest extends AsyncFunSuite with AsyncIOSpec with Eventually with IntegrationPatience {

case class TestFixture(
stubHandler: RecordingRequeueHandler[IO, Delivery],
Expand All @@ -35,9 +33,7 @@ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with Effect
publisher: Publisher[IO, PublishCommand]
)

implicit override val ioRuntime: IORuntime = packageIORuntime
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300))
val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds)
val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds)

def runTest[A](test: IO[A]): IO[A] = {
val rawConfig = ConfigFactory.load("bucky")
Expand All @@ -55,25 +51,23 @@ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with Effect
val queueName = QueueName(UUID.randomUUID().toString)
val declarations = List(Exchange(exchangeName).binding(routingKey -> queueName)) ++ requeue.requeueDeclarations(queueName, routingKey)

Fs2RabbitAmqpClient[IO](config)
.use { client =>
val handler = (_: Delivery) => IO.sleep(3.seconds).map(_ => Ack)
Resource
.eval(client.declare(declarations))
.flatMap(_ =>
for {
_ <- client.registerConsumer(queueName, handler)
} yield ()
)
.use { _ =>
val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey)
client.publisher().flatMap { publisher =>
publisher(pcb.toPublishCommand("a message")) *> IO
.sleep(3.seconds)
.flatMap(_ => test)
Deferred[IO, Boolean].flatMap { consumingMessage =>
Fs2RabbitAmqpClient[IO](config)
.use { client =>
val handler = (_: Delivery) => consumingMessage.complete(true) *> IO.sleep(3.seconds).as(Ack)
Resource
.eval(client.declare(declarations))
.flatMap(_ => client.registerConsumer(queueName, handler))
.use { _ =>
val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey)
client.publisher().flatMap { publisher =>
publisher(pcb.toPublishCommand("a message")) *>
consumingMessage.get
.flatMap(_ => test)
}
}
}
}
}
}
}

test("Should wait until a handler finishes executing before shutting down") {
Expand Down

0 comments on commit 80055a4

Please sign in to comment.