diff --git a/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/AmqpClientConnectionManager.scala b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/AmqpClientConnectionManager.scala index c55e72a..4768d79 100644 --- a/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/AmqpClientConnectionManager.scala +++ b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/AmqpClientConnectionManager.scala @@ -34,7 +34,7 @@ class AmqpClientConnectionManager[F[_]: Async]( }) } - def publish(cmd: PublishCommand): F[Unit] = + def addConfirmListeningToPublisher(publisher: Publisher[F, PublishCommand]): Publisher[F, PublishCommand] = (cmd: PublishCommand) => for { deliveryTag <- Ref.of[F, Option[Long]](None) _ <- (for { @@ -44,7 +44,7 @@ class AmqpClientConnectionManager[F[_]: Async]( nextPublishSeq <- Async[F].blocking(publishChannel.getNextPublishSeqNo) _ <- deliveryTag.set(Some(nextPublishSeq)) _ <- pendingConfirmListener.pendingConfirmations.update(_ + (nextPublishSeq -> signal)) - _ <- publisher()(cmd) + _ <- publisher(cmd) } yield () } _ <- signal.get.flatMap(maybeError => maybeError.traverse(Async[F].raiseError[Unit])) @@ -62,27 +62,6 @@ class AmqpClientConnectionManager[F[_]: Async]( } } yield () - def publisher2(mandatory: Boolean): F[Publisher[F, PublishCommand]] = { - val publisher = client.createBasicPublisherWithListener[PublishCommand]( - PublishingFlag(mandatory), - _ => Async[F].unit - ) - - publisher.map(f => (publishCommand: PublishCommand) => f(model.ExchangeName(publishCommand.exchange.value), model.RoutingKey(publishCommand.exchange.value), publishCommand)) - } - - def publisher(): Publisher[F, PublishCommand] = (publishCommand: PublishCommand) => - for { - publisher <- client - .createPublisherWithListener[PublishCommand]( - model.ExchangeName(publishCommand.exchange.value), - model.RoutingKey(publishCommand.routingKey.value), - PublishingFlag(publishCommand.mandatory), - _ => Async[F].unit - ) - _ <- publisher(publishCommand) - } yield () - } private[bucky] object AmqpClientConnectionManager extends StrictLogging { diff --git a/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala index 2568922..245120f 100644 --- a/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala +++ b/backendFs2Rabbit/src/main/scala/com/itv/bucky/backend/fs2rabbit/Fs2RabbitAmqpClient.scala @@ -10,7 +10,20 @@ import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient.deliveryDecoder import com.itv.bucky.consume.DeliveryMode import com.itv.bucky.decl.ExchangeType import com.itv.bucky.publish.{ContentEncoding, ContentType, PublishCommand} -import com.itv.bucky.{AmqpClient, AmqpClientConfig, Envelope, ExchangeName, Handler, Payload, Publisher, QueueName, RoutingKey, consume, decl, publish} +import com.itv.bucky.{ + AmqpClient, + AmqpClientConfig, + Envelope, + ExchangeName, + Handler, + Payload, + Publisher, + QueueName, + RoutingKey, + consume, + decl, + publish +} import com.rabbitmq.client.LongString import dev.profunktor.fs2rabbit.arguments.SafeArg import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig @@ -18,7 +31,22 @@ import dev.profunktor.fs2rabbit.config.declaration._ import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder} import dev.profunktor.fs2rabbit.interpreter.RabbitClient import dev.profunktor.fs2rabbit.model -import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{ArrayVal, BooleanVal, ByteArrayVal, ByteVal, DecimalVal, DoubleVal, FloatVal, IntVal, LongVal, NullVal, ShortVal, StringVal, TableVal, TimestampVal} +import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{ + ArrayVal, + BooleanVal, + ByteArrayVal, + ByteVal, + DecimalVal, + DoubleVal, + FloatVal, + IntVal, + LongVal, + NullVal, + ShortVal, + StringVal, + TableVal, + TimestampVal +} import dev.profunktor.fs2rabbit.model.{AMQPChannel, PublishingFlag, ShortString} import scodec.bits.ByteVector @@ -31,9 +59,9 @@ import Fs2RabbitAmqpClient._ class Fs2RabbitAmqpClient[F[_]: Async]( client: RabbitClient[F], connection: model.AMQPConnection, -// publishChannel: model.AMQPChannel, + publishChannel: model.AMQPChannel, amqpClientConnectionManager: AmqpClientConnectionManager[F] -)(implicit amqpChannel: AMQPChannel) extends AmqpClient[F] { +) extends AmqpClient[F] { override def declare(declarations: decl.Declaration*): F[Unit] = declare(declarations.toList) @@ -63,70 +91,78 @@ class Fs2RabbitAmqpClient[F[_]: Async]( case decl.Fanout => model.ExchangeType.FanOut } -// implicit val channel: model.AMQPChannel = publishChannel - - declarations.toList - .sortBy { - case _: decl.Queue => 0 - case _: decl.Exchange => 1 - case _ => 2 - } - .map { - case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) => - client.declareExchange( - DeclarationExchangeConfig - .default(model.ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType)) - .copy( - arguments = argumentsFromAnyRef(arguments), - durable = if (isDurable) Durable else NonDurable, - autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete, - internal = if (isInternal) Internal else NonInternal - ) - ) *> - bindings.traverse_ { binding => - client.bindQueue( - model.QueueName(binding.queueName.value), - model.ExchangeName(binding.exchangeName.value), - model.RoutingKey(binding.routingKey.value), - model.QueueBindingArgs(argumentsFromAnyRef(binding.arguments)) - ) - } - case decl.Binding(exchangeName, queueName, routingKey, arguments) => - client.bindQueue( - model.QueueName(queueName.value), - model.ExchangeName(exchangeName.value), - model.RoutingKey(routingKey.value), - model.QueueBindingArgs(argumentsFromAnyRef(arguments)) - ) - case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) => - client.bindExchange( - model.ExchangeName(destinationExchangeName.value), - model.ExchangeName(sourceExchangeName.value), - model.RoutingKey(routingKey.value), - model.ExchangeBindingArgs(argumentsFromAnyRef(arguments)) - ) - case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) => - client.declareQueue( - DeclarationQueueConfig - .default(model.QueueName(name.value)) - .copy( - arguments = argumentsFromAnyRef(arguments), - durable = if (isDurable) Durable else NonDurable, - autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete, - exclusive = if (isExclusive) Exclusive else NonExclusive - ) - ) - } - .sequence_ - - } + client.createChannel(connection).use { implicit channel => + declarations.toList + .sortBy { + case _: decl.Queue => 0 + case _: decl.Exchange => 1 + case _ => 2 + } + .map { + case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) => + client.declareExchange( + DeclarationExchangeConfig + .default(model.ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType)) + .copy( + arguments = argumentsFromAnyRef(arguments), + durable = if (isDurable) Durable else NonDurable, + autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete, + internal = if (isInternal) Internal else NonInternal + ) + ) *> + bindings.traverse_ { binding => + client.bindQueue( + model.QueueName(binding.queueName.value), + model.ExchangeName(binding.exchangeName.value), + model.RoutingKey(binding.routingKey.value), + model.QueueBindingArgs(argumentsFromAnyRef(binding.arguments)) + ) + } + case decl.Binding(exchangeName, queueName, routingKey, arguments) => + client.bindQueue( + model.QueueName(queueName.value), + model.ExchangeName(exchangeName.value), + model.RoutingKey(routingKey.value), + model.QueueBindingArgs(argumentsFromAnyRef(arguments)) + ) + case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) => + client.bindExchange( + model.ExchangeName(destinationExchangeName.value), + model.ExchangeName(sourceExchangeName.value), + model.RoutingKey(routingKey.value), + model.ExchangeBindingArgs(argumentsFromAnyRef(arguments)) + ) + case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) => + client.declareQueue( + DeclarationQueueConfig + .default(model.QueueName(name.value)) + .copy( + arguments = argumentsFromAnyRef(arguments), + durable = if (isDurable) Durable else NonDurable, + autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete, + exclusive = if (isExclusive) Exclusive else NonExclusive + ) + ) + } + .sequence_ + } - private def publisher2(mandatory: Boolean): F[Publisher[F, publish.PublishCommand]] = { - amqpClientConnectionManager.publish(publishCommand) } - override def publisher(): Publisher[F, publish.PublishCommand] = (publishCommand: PublishCommand) => - amqpClientConnectionManager.publish(publishCommand) + override def publisher(mandatory: Boolean): F[Publisher[F, publish.PublishCommand]] = + client + .createBasicPublisherWithListener[PublishCommand]( + PublishingFlag(mandatory), + _ => Async[F].unit // Mandatory returns ignored here, but are handled in AmqpClientConnectionManager + )(publishChannel, implicitly) + .map { publisher => (publishCommand: PublishCommand) => + publisher( + model.ExchangeName(publishCommand.exchange.value), + model.RoutingKey(publishCommand.exchange.value), + publishCommand + ) + } + .map(amqpClientConnectionManager.addConfirmListeningToPublisher) override def registerConsumer( queueName: bucky.QueueName, @@ -185,7 +221,7 @@ object Fs2RabbitAmqpClient { amqpChannel = publishChannel ) ) - } yield new Fs2RabbitAmqpClient(client, connection, publishChannel, amqpClientConnectionManager)(publishChannel) + } yield new Fs2RabbitAmqpClient(client, connection, publishChannel, amqpClientConnectionManager) } implicit def deliveryEncoder[F[_]: Async]: MessageEncoder[F, PublishCommand] = diff --git a/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/AmqpClientConnectionManager.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/AmqpClientConnectionManager.scala index b166c56..e015982 100644 --- a/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/AmqpClientConnectionManager.scala +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/AmqpClientConnectionManager.scala @@ -31,7 +31,7 @@ private[bucky] case class AmqpClientConnectionManager[F[_]]( }) } - def publish(cmd: PublishCommand): F[Unit] = + def publish(cmd: PublishCommand, mandatory: Boolean): F[Unit] = for { deliveryTag <- Ref.of[F, Option[Long]](None) _ <- (for { @@ -41,7 +41,7 @@ private[bucky] case class AmqpClientConnectionManager[F[_]]( nextPublishSeq <- publishChannel.getNextPublishSeqNo _ <- deliveryTag.set(Some(nextPublishSeq)) _ <- pendingConfirmListener.pendingConfirmations.update(_ + (nextPublishSeq -> signal)) - _ <- publishChannel.publish(nextPublishSeq, cmd) + _ <- publishChannel.publish(nextPublishSeq, cmd, mandatory) } yield () } _ <- signal.get.ifM(F.unit, F.raiseError[Unit](new RuntimeException(s"Failed to publish msg: ${cmd}"))) diff --git a/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/Channel.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/Channel.scala index 6442428..a1e95d1 100644 --- a/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/Channel.scala +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/Channel.scala @@ -25,7 +25,7 @@ trait Channel[F[_]] { def addConfirmListener(listener: ConfirmListener): F[Unit] def addReturnListener(listener: ReturnListener): F[Unit] def getNextPublishSeqNo: F[Long] - def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] + def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] def sendAction(action: ConsumeAction)(envelope: Envelope): F[Unit] def declareExchange(exchange: Exchange): F[Unit] def declareQueue(queue: Queue): F[Unit] @@ -68,12 +68,12 @@ object Channel { override def addReturnListener(listener: ReturnListener): F[Unit] = F.delay(channel.addReturnListener(listener)) override def getNextPublishSeqNo: F[Long] = F.delay(channel.getNextPublishSeqNo) - override def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] = + override def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] = for { _ <- F.delay(logger.debug("Publishing command with exchange:{} rk: {}.", cmd.exchange, cmd.routingKey)) _ <- F.blocking( channel - .basicPublish(cmd.exchange.value, cmd.routingKey.value, cmd.mandatory, false, MessagePropertiesConverters(cmd.basicProperties), cmd.body.value) + .basicPublish(cmd.exchange.value, cmd.routingKey.value, mandatory, false, MessagePropertiesConverters(cmd.basicProperties), cmd.body.value) ) _ <- F.delay(logger.info("Published message: {}", cmd)) } yield () diff --git a/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/JavaBackendAmqpClient.scala b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/JavaBackendAmqpClient.scala index 7491255..d11afd9 100644 --- a/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/JavaBackendAmqpClient.scala +++ b/backendJavaAmqp/src/main/scala/com/itv/bucky/backend/javaamqp/JavaBackendAmqpClient.scala @@ -142,7 +142,9 @@ object JavaBackendAmqpClient extends StrictLogging { _ <- if (ended) F.unit else t.sleep(sleep) *> repeatUntil(eval)(pred)(sleep) } yield () - override def publisher(): Publisher[F, PublishCommand] = cmd => connectionManager.publish(cmd).evalOn(executionContext) + override def publisher(mandatory: Boolean = false): F[Publisher[F, PublishCommand]] = F.pure( + cmd => connectionManager.publish(cmd, mandatory).evalOn(executionContext) + ) override def registerConsumer(queueName: QueueName, handler: Handler[F, Delivery], diff --git a/core/src/main/scala/com/itv/bucky/AmqpClient.scala b/core/src/main/scala/com/itv/bucky/AmqpClient.scala index 2e75147..c73e91c 100644 --- a/core/src/main/scala/com/itv/bucky/AmqpClient.scala +++ b/core/src/main/scala/com/itv/bucky/AmqpClient.scala @@ -11,7 +11,7 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} trait AmqpClient[F[_]] { def declare(declarations: Declaration*): F[Unit] def declare(declarations: Iterable[Declaration]): F[Unit] - def publisher(): Publisher[F, PublishCommand] + def publisher(mandatory: Boolean = false): F[Publisher[F, PublishCommand]] def registerConsumer(queueName: QueueName, handler: Handler[F, Delivery], exceptionalAction: ConsumeAction = DeadLetter, diff --git a/core/src/main/scala/com/itv/bucky/LoggingAmqpClient.scala b/core/src/main/scala/com/itv/bucky/LoggingAmqpClient.scala index a96cba7..7b6d5c0 100644 --- a/core/src/main/scala/com/itv/bucky/LoggingAmqpClient.scala +++ b/core/src/main/scala/com/itv/bucky/LoggingAmqpClient.scala @@ -16,26 +16,32 @@ object LoggingAmqpClient extends StrictLogging { private[bucky] def logSuccessfullPublishMessage[F[_]](charset: Charset, cmd: PublishCommand)(implicit F: Sync[F]): F[Unit] = F.delay( - logger.info("Successfully published message with rk:'{}', exchange:{} and message:'{}'", - cmd.routingKey.value, - cmd.exchange.value, - new String(cmd.body.value, charset)) + logger.info( + "Successfully published message with rk:'{}', exchange:{} and message:'{}'", + cmd.routingKey.value, + cmd.exchange.value, + new String(cmd.body.value, charset) + ) ) private[bucky] def logFailedPublishMessage[F[_]](t: Throwable, charset: Charset, cmd: PublishCommand)(implicit F: Sync[F]): F[Unit] = F.delay( - logger.error("Failed to publish message with rk:'{}', exchange:'{}' and message:'{}'", - cmd.routingKey.value, - cmd.exchange.value, - new String(cmd.body.value, charset), - t) + logger.error( + "Failed to publish message with rk:'{}', exchange:'{}' and message:'{}'", + cmd.routingKey.value, + cmd.exchange.value, + new String(cmd.body.value, charset), + t + ) ) - private[bucky] def logFailedHandler[F[_]](charset: Charset, - queueName: QueueName, - exceptionalAction: ConsumeAction, - delivery: Delivery, - t: Throwable)(implicit F: Sync[F]): F[Unit] = F.delay { + private[bucky] def logFailedHandler[F[_]]( + charset: Charset, + queueName: QueueName, + exceptionalAction: ConsumeAction, + delivery: Delivery, + t: Throwable + )(implicit F: Sync[F]): F[Unit] = F.delay { logger.error( s"Failed to execute handler for message with rk '{}' on queue '{}' and exchange '{}'. Will return '{}'. message: '{}', headers:'{}'", delivery.envelope.routingKey.value, @@ -48,8 +54,9 @@ object LoggingAmqpClient extends StrictLogging { ) } - private[bucky] def logSuccessfulHandler[F[_]](charset: Charset, queueName: QueueName, delivery: Delivery, ca: ConsumeAction)( - implicit F: Sync[F]): F[Unit] = F.delay { + private[bucky] def logSuccessfulHandler[F[_]](charset: Charset, queueName: QueueName, delivery: Delivery, ca: ConsumeAction)(implicit + F: Sync[F] + ): F[Unit] = F.delay { logger.info( "Executed handler for message with rk:'{}' on queue:'{}' and exchange '{}'. Will return '{}'. message: '{}'", delivery.envelope.routingKey.value, @@ -65,30 +72,31 @@ object LoggingAmqpClient extends StrictLogging { override def declare(declarations: decl.Declaration*): F[Unit] = amqpClient.declare(declarations) override def declare(declarations: Iterable[decl.Declaration]): F[Unit] = amqpClient.declare(declarations) - override def publisher(): Publisher[F, PublishCommand] = { - val originalPublisher = amqpClient.publisher() - cmd: PublishCommand => - { - (for { - result <- originalPublisher(cmd).attempt - _ <- result.fold[F[Unit]](logFailedPublishMessage(_, charset, cmd), _ => logSuccessfullPublishMessage(charset, cmd)) - } yield result).rethrow - } - } + override def publisher(mandatory: Boolean): F[Publisher[F, PublishCommand]] = + amqpClient.publisher().map { originalPublisher => + cmd: PublishCommand => + (for { + result <- originalPublisher(cmd).attempt + _ <- result.fold[F[Unit]](logFailedPublishMessage(_, charset, cmd), _ => logSuccessfullPublishMessage(charset, cmd)) + } yield result).rethrow + } - override def registerConsumer(queueName: QueueName, - handler: Handler[F, Delivery], - exceptionalAction: ConsumeAction, - prefetchCount: Int, - shutdownTimeout: FiniteDuration = 1.minutes, - shutdownRetry: FiniteDuration = 500.millis): Resource[F, Unit] = { - val newHandler = (delivery: Delivery) => { + override def registerConsumer( + queueName: QueueName, + handler: Handler[F, Delivery], + exceptionalAction: ConsumeAction, + prefetchCount: Int, + shutdownTimeout: FiniteDuration = 1.minutes, + shutdownRetry: FiniteDuration = 500.millis + ): Resource[F, Unit] = { + val newHandler = (delivery: Delivery) => (for { result <- handler(delivery).attempt - _ <- result.fold(logFailedHandler(charset, queueName, exceptionalAction, delivery, _), - logSuccessfulHandler(charset, queueName, delivery, _)) + _ <- result.fold( + logFailedHandler(charset, queueName, exceptionalAction, delivery, _), + logSuccessfulHandler(charset, queueName, delivery, _) + ) } yield result).rethrow - } amqpClient.registerConsumer(queueName, newHandler, exceptionalAction, prefetchCount) } diff --git a/core/src/main/scala/com/itv/bucky/package.scala b/core/src/main/scala/com/itv/bucky/package.scala index 84944ed..c281956 100644 --- a/core/src/main/scala/com/itv/bucky/package.scala +++ b/core/src/main/scala/com/itv/bucky/package.scala @@ -1,8 +1,8 @@ package com.itv import java.nio.charset.{Charset, StandardCharsets} - import cats.effect.{Resource, Sync} +import cats.implicits.toFunctorOps import cats.{Applicative, ApplicativeError} import com.itv.bucky.Unmarshaller.toDeliveryUnmarshaller import com.itv.bucky.consume._ @@ -100,18 +100,18 @@ package object bucky { } - implicit class PublisherSugar[F[_]](amqpClient: AmqpClient[F]) { + implicit class PublisherSugar[F[_]: Applicative](amqpClient: AmqpClient[F]) { - def publisherOf[T](implicit publishCommandBuilder: PublishCommandBuilder[T]): Publisher[F, T] = { - val basePublisher = amqpClient.publisher() - value: T => - { + def publisherOf[T](implicit publishCommandBuilder: PublishCommandBuilder[T]): F[Publisher[F, T]] = { + amqpClient.publisher().map { basePublisher => + value: T => { val command = publishCommandBuilder.toPublishCommand(value) basePublisher.apply(command) } + } } - def publisherOf[T](exchangeName: ExchangeName, routingKey: RoutingKey)(implicit marshaller: PayloadMarshaller[T]): Publisher[F, T] = { + def publisherOf[T](exchangeName: ExchangeName, routingKey: RoutingKey)(implicit marshaller: PayloadMarshaller[T]): F[Publisher[F, T]] = { val pcb = PublishCommandBuilder .publishCommandBuilder(marshaller) @@ -121,7 +121,7 @@ package object bucky { } def publisherWithHeadersOf[T](exchangeName: ExchangeName, - routingKey: RoutingKey)(implicit F: Sync[F], marshaller: PayloadMarshaller[T]): PublisherWithHeaders[F, T] = { + routingKey: RoutingKey)(implicit F: Sync[F], marshaller: PayloadMarshaller[T]): F[PublisherWithHeaders[F, T]] = { val pcb = PublishCommandBuilder .publishCommandBuilder(marshaller) @@ -130,16 +130,17 @@ package object bucky { publisherWithHeadersOf[T](pcb) } - def publisherWithHeadersOf[T](commandBuilder: PublishCommandBuilder[T])(implicit F: Sync[F]): PublisherWithHeaders[F, T] = - (message: T, headers: Map[String, AnyRef]) => - F.flatMap(F.delay { - val command = commandBuilder.toPublishCommand(message) - - command.copy(basicProperties = headers.foldLeft(command.basicProperties) { - case (props, (headerName, headerValue)) => props.withHeader(headerName -> headerValue) - }) - })(amqpClient.publisher()) - + def publisherWithHeadersOf[T](commandBuilder: PublishCommandBuilder[T])(implicit F: Sync[F]): F[PublisherWithHeaders[F, T]] = + amqpClient.publisher().map { publisher => + (message: T, headers: Map[String, AnyRef]) => + F.flatMap(F.delay { + val command = commandBuilder.toPublishCommand(message) + + command.copy(basicProperties = headers.foldLeft(command.basicProperties) { + case (props, (headerName, headerValue)) => props.withHeader(headerName -> headerValue) + }) + })(publisher) + } } implicit class DeclareSugar[F[_]](amqpClient: AmqpClient[F])(implicit a: Applicative[F]) { diff --git a/core/src/main/scala/com/itv/bucky/pattern/requeue/package.scala b/core/src/main/scala/com/itv/bucky/pattern/requeue/package.scala index 40a73e1..f935b77 100644 --- a/core/src/main/scala/com/itv/bucky/pattern/requeue/package.scala +++ b/core/src/main/scala/com/itv/bucky/pattern/requeue/package.scala @@ -59,10 +59,11 @@ package object requeue { onRequeueExpiryAction: Delivery => F[ConsumeAction] = (_: Delivery) => F.point[ConsumeAction](DeadLetter), prefetchCount: Int = defaultPreFetchCount): Resource[F, Unit] = { val requeueExchange = ExchangeName(s"${queueName.value}.requeue") - val requeuePublish = amqpClient.publisher() - amqpClient.registerConsumer(queueName, - RequeueTransformer(requeuePublish, requeueExchange, requeuePolicy, onHandlerException, onRequeueExpiryAction)(handler), - prefetchCount = prefetchCount) + Resource.eval(amqpClient.publisher()).flatMap { requeuePublish => + amqpClient.registerConsumer(queueName, + RequeueTransformer(requeuePublish, requeueExchange, requeuePolicy, onHandlerException, onRequeueExpiryAction)(handler), + prefetchCount = prefetchCount) + } } } diff --git a/core/src/main/scala/com/itv/bucky/publish/PublishCommandBuilder.scala b/core/src/main/scala/com/itv/bucky/publish/PublishCommandBuilder.scala index bf4d028..6b48d3d 100644 --- a/core/src/main/scala/com/itv/bucky/publish/PublishCommandBuilder.scala +++ b/core/src/main/scala/com/itv/bucky/publish/PublishCommandBuilder.scala @@ -60,19 +60,14 @@ object PublishCommandBuilder { case class Builder[T](exchange: ExchangeName, routingKey: RoutingKey, properties: Option[MessageProperties], - marshaller: PayloadMarshaller[T], - mandatory: Boolean = false) + marshaller: PayloadMarshaller[T]) extends PublishCommandBuilder[T] { override def toPublishCommand(t: T): PublishCommand = - PublishCommand(exchange, routingKey, properties.fold(MessageProperties.persistentBasic)(identity), marshaller(t), mandatory) + PublishCommand(exchange, routingKey, properties.fold(MessageProperties.persistentBasic)(identity), marshaller(t)) def using(basicProperties: MessageProperties): Builder[T] = copy(properties = Some(basicProperties)) - - def usingMandatory(mandatory: Boolean): Builder[T] = - copy(mandatory = mandatory) - } } diff --git a/core/src/main/scala/com/itv/bucky/publish/package.scala b/core/src/main/scala/com/itv/bucky/publish/package.scala index d5c461b..e8f085e 100644 --- a/core/src/main/scala/com/itv/bucky/publish/package.scala +++ b/core/src/main/scala/com/itv/bucky/publish/package.scala @@ -61,7 +61,7 @@ package object publish { priority = Some(0) ) } - case class PublishCommand(exchange: ExchangeName, routingKey: RoutingKey, basicProperties: MessageProperties, body: Payload, mandatory: Boolean = false) { + case class PublishCommand(exchange: ExchangeName, routingKey: RoutingKey, basicProperties: MessageProperties, body: Payload) { def description = s"${exchange.value}:${routingKey.value} $body" } } diff --git a/core/src/main/scala/com/itv/bucky/wiring/Wiring.scala b/core/src/main/scala/com/itv/bucky/wiring/Wiring.scala index f6cb90c..0813ce5 100644 --- a/core/src/main/scala/com/itv/bucky/wiring/Wiring.scala +++ b/core/src/main/scala/com/itv/bucky/wiring/Wiring.scala @@ -14,7 +14,6 @@ import cats.effect.implicits._ import com.itv.bucky.consume.{ConsumeAction, DeadLetter, RequeueConsumeAction} import com.itv.bucky.publish.PublishCommandBuilder - final case class WiringName(value: String) extends AnyVal class Wiring[T]( @@ -81,8 +80,9 @@ class Wiring[T]( s"requeuePolicy=$requeuePolicy" ) ) - _ <- client.declare(publisherDeclarations) - } yield client.publisherOf(publisherBuilder) + _ <- client.declare(publisherDeclarations) + publisher <- client.publisherOf(publisherBuilder) + } yield publisher def publisherWithHeaders[F[_]](client: AmqpClient[F])(implicit F: Sync[F]): F[PublisherWithHeaders[F, T]] = for { @@ -96,8 +96,9 @@ class Wiring[T]( s"requeuePolicy=$requeuePolicy" ) } - _ <- client.declare(publisherDeclarations) - } yield client.publisherWithHeadersOf(publisherBuilder) + _ <- client.declare(publisherDeclarations) + publisher <- client.publisherWithHeadersOf(publisherBuilder) + } yield publisher def registerConsumer[F[_]](client: AmqpClient[F])(handleMessage: T => F[ConsumeAction])(implicit F: Sync[F]): Resource[F, Unit] = { val runDeclarations = diff --git a/example/src/main/scala/com/itv/bucky/example/circe/CirceMarshalledPublisher.scala b/example/src/main/scala/com/itv/bucky/example/circe/CirceMarshalledPublisher.scala index ac09f71..ca5fe5a 100644 --- a/example/src/main/scala/com/itv/bucky/example/circe/CirceMarshalledPublisher.scala +++ b/example/src/main/scala/com/itv/bucky/example/circe/CirceMarshalledPublisher.scala @@ -31,9 +31,9 @@ object CirceMarshalledPublisher extends IOApp { override def run(args: List[String]): IO[ExitCode] = JavaBackendAmqpClient[IO](amqpClientConfig).use { client => for { - _ <- client.declare(Declarations.all) - publisher = client.publisherOf[Person](Declarations.exchange.name, Declarations.routingKey) - _ <- publisher(Person("bob", 22)) + _ <- client.declare(Declarations.all) + publisher <- client.publisherOf[Person](Declarations.exchange.name, Declarations.routingKey) + _ <- publisher(Person("bob", 22)) } yield ExitCode.Success } diff --git a/example/src/main/scala/com/itv/bucky/example/marshalling/MarshallingPublisher.scala b/example/src/main/scala/com/itv/bucky/example/marshalling/MarshallingPublisher.scala index cc59482..b1a9e51 100644 --- a/example/src/main/scala/com/itv/bucky/example/marshalling/MarshallingPublisher.scala +++ b/example/src/main/scala/com/itv/bucky/example/marshalling/MarshallingPublisher.scala @@ -40,7 +40,7 @@ object MarshallingPublisher extends IOApp { JavaBackendAmqpClient[IO](amqpClientConfig).use { client => for { _ <- client.declare(Seq(Declarations.exchange)) - publisher = client.publisherOf[Person] + publisher <- client.publisherOf[Person] _ <- publisher(Person("Bob", 67)) } yield ExitCode.Success } diff --git a/it/src/test/scala/com/itv/bucky/HammerTest.scala b/it/src/test/scala/com/itv/bucky/HammerTest.scala index 9dd29a0..a517bfd 100644 --- a/it/src/test/scala/com/itv/bucky/HammerTest.scala +++ b/it/src/test/scala/com/itv/bucky/HammerTest.scala @@ -23,7 +23,14 @@ import java.util.UUID import scala.collection.immutable.TreeSet import scala.concurrent.duration._ -class HammerTest extends AsyncFunSuite with IntegrationSpec with EffectTestSupport with Eventually with IntegrationPatience with StrictLogging with Matchers { +class HammerTest + extends AsyncFunSuite + with IntegrationSpec + with EffectTestSupport + with Eventually + with IntegrationPatience + with StrictLogging + with Matchers { implicit override val ioRuntime: IORuntime = packageIORuntime case class TestFixture(stubHandler: RecordingHandler[IO, String], publisher: Publisher[IO, String], client: AmqpClient[IO]) @@ -55,9 +62,11 @@ class HammerTest extends AsyncFunSuite with IntegrationSpec with EffectTestSuppo Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumerOf(queueName, handler)) handlerResource.use { _ => - val pub = client.publisherOf[String](exchangeName, routingKey) - val fixture = TestFixture(handler, pub, client) - test(fixture) + client.publisherOf[String](exchangeName, routingKey).flatMap { pub => + val fixture = TestFixture(handler, pub, client) + + test(fixture) + } } } } @@ -107,9 +116,6 @@ class HammerTest extends AsyncFunSuite with IntegrationSpec with EffectTestSuppo val order: Ref[IO, List[String]] = Ref.of[IO, List[String]](List.empty).unsafeRunSync() - val fastPublisher = testFixture.client.publisherOf[String](exchange, fastRk) - val slowPublisher = testFixture.client.publisherOf[String](exchange, slowRk) - val fastHandler = new RecordingHandler[IO, String]((v1: String) => for { _ <- order.update(_ :+ "fast") @@ -134,15 +140,19 @@ class HammerTest extends AsyncFunSuite with IntegrationSpec with EffectTestSuppo val handlersResource = Resource.eval(testFixture.client.declare(declarations)).flatMap(_ => handlers) - handlersResource.use { _ => - for { - _ <- slowPublisher("slow one") - _ <- IO.sleep(1.second) - _ <- fastPublisher("fast one") - } yield eventually { - val messages = order.get.unsafeRunSync() - messages should have size 2 - messages shouldBe List("fast", "slow") + testFixture.client.publisherOf[String](exchange, fastRk).flatMap { fastPublisher => + testFixture.client.publisherOf[String](exchange, slowRk).flatMap { slowPublisher => + handlersResource.use { _ => + for { + _ <- slowPublisher("slow one") + _ <- IO.sleep(1.second) + _ <- fastPublisher("fast one") + } yield eventually { + val messages = order.get.unsafeRunSync() + messages should have size 2 + messages shouldBe List("fast", "slow") + } + } } } } diff --git a/it/src/test/scala/com/itv/bucky/PublishIntegrationTest.scala b/it/src/test/scala/com/itv/bucky/PublishIntegrationTest.scala index decb663..4a28786 100644 --- a/it/src/test/scala/com/itv/bucky/PublishIntegrationTest.scala +++ b/it/src/test/scala/com/itv/bucky/PublishIntegrationTest.scala @@ -27,9 +27,9 @@ class PublishIntegrationTest extends AnyFunSuite with IntegrationSpec with Event val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) test("publisher should error if mandatory is and there is no routing") { - withTestFixture{ + withTestFixture(mandatory = true){ case (builder, publisher) => - publisher(builder.usingMandatory(true).toPublishCommand("Where am I going?")).attempt.map(res => { + publisher(builder.toPublishCommand("Where am I going?")).attempt.map(res => { println("result: " + res) res.isLeft shouldBe true } @@ -37,14 +37,14 @@ class PublishIntegrationTest extends AnyFunSuite with IntegrationSpec with Event } } test("publisher should publish if mandatory is false and there is no routing") { - withTestFixture { + withTestFixture(mandatory = false) { case (builder, publisher) => - publisher(builder.usingMandatory(false).toPublishCommand("But seriously though, where am I going?")).attempt.map(_.isRight shouldBe true) + publisher(builder.toPublishCommand("But seriously though, where am I going?")).attempt.map(_.isRight shouldBe true) } } - def withTestFixture(test: (PublishCommandBuilder.Builder[String], Publisher[IO, PublishCommand]) => IO[Unit]): Unit = { + def withTestFixture(mandatory: Boolean)(test: (PublishCommandBuilder.Builder[String], Publisher[IO, PublishCommand]) => IO[Unit]): Unit = { val rawConfig = ConfigFactory.load("bucky") val config = AmqpClientConfig(rawConfig.getString("rmq.host"), @@ -65,8 +65,9 @@ class PublishIntegrationTest extends AnyFunSuite with IntegrationSpec with Event ) .use { _ => val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) - val pub = client.publisher() - test(pcb, pub) + client.publisher(mandatory).flatMap { pub => + test(pcb, pub) + } } } .unsafeRunSync() diff --git a/it/src/test/scala/com/itv/bucky/RequeueIntegrationTest.scala b/it/src/test/scala/com/itv/bucky/RequeueIntegrationTest.scala index 34c369e..87edb49 100644 --- a/it/src/test/scala/com/itv/bucky/RequeueIntegrationTest.scala +++ b/it/src/test/scala/com/itv/bucky/RequeueIntegrationTest.scala @@ -35,8 +35,8 @@ class RequeueIntegrationTest extends AsyncFunSuite with IntegrationSpec with Eff ) implicit override val ioRuntime: IORuntime = packageIORuntime - implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) - val requeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) + val requeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) def withTestFixture(test: TestFixture => IO[Unit]): IO[Unit] = { val rawConfig = ConfigFactory.load("bucky") @@ -72,10 +72,11 @@ class RequeueIntegrationTest extends AsyncFunSuite with IntegrationSpec with Eff } yield () ) .use { _ => - val pub = client.publisher() - val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) - val fixture = TestFixture(handler, dlqHandler, pcb, pub) - test(fixture) + client.publisher().flatMap { pub => + val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) + val fixture = TestFixture(handler, dlqHandler, pcb, pub) + test(fixture) + } } } } diff --git a/it/src/test/scala/com/itv/bucky/RequeueWithExpiryActionIntegrationTest.scala b/it/src/test/scala/com/itv/bucky/RequeueWithExpiryActionIntegrationTest.scala index 418e6ca..8097044 100644 --- a/it/src/test/scala/com/itv/bucky/RequeueWithExpiryActionIntegrationTest.scala +++ b/it/src/test/scala/com/itv/bucky/RequeueWithExpiryActionIntegrationTest.scala @@ -27,7 +27,12 @@ import scala.collection.mutable.ListBuffer import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -class RequeueWithExpiryActionIntegrationTest extends AsyncFunSuite with IntegrationSpec with EffectTestSupport with Eventually with IntegrationPatience { +class RequeueWithExpiryActionIntegrationTest + extends AsyncFunSuite + with IntegrationSpec + with EffectTestSupport + with Eventually + with IntegrationPatience { case class TestFixture( stubHandler: RecordingRequeueHandler[IO, String], @@ -37,8 +42,8 @@ class RequeueWithExpiryActionIntegrationTest extends AsyncFunSuite with Integrat ) implicit override val ioRuntime: IORuntime = packageIORuntime - implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) - val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) + val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) def withTestFixture[F[_]](onRequeueExpiryAction: String => IO[ConsumeAction], handlerAction: String => IO[Unit] = _ => IO.unit)( test: TestFixture => IO[Unit] @@ -83,10 +88,11 @@ class RequeueWithExpiryActionIntegrationTest extends AsyncFunSuite with Integrat } yield () ) .use { _ => - val pub = client.publisher() - val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) - val fixture = TestFixture(handler, dlqHandler, pcb, pub) - test(fixture) + client.publisher().flatMap { pub => + val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) + val fixture = TestFixture(handler, dlqHandler, pcb, pub) + test(fixture) + } } } } diff --git a/it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala b/it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala index 568dd7b..03554b3 100644 --- a/it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala +++ b/it/src/test/scala/com/itv/bucky/ShutdownTimeoutTest.scala @@ -29,15 +29,15 @@ import scala.concurrent.duration._ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with EffectTestSupport with Eventually with IntegrationPatience { case class TestFixture( - stubHandler: RecordingRequeueHandler[IO, Delivery], - dlqHandler: RecordingHandler[IO, Delivery], - publishCommandBuilder: PublishCommandBuilder.Builder[String], - publisher: Publisher[IO, PublishCommand] - ) + stubHandler: RecordingRequeueHandler[IO, Delivery], + dlqHandler: RecordingHandler[IO, Delivery], + publishCommandBuilder: PublishCommandBuilder.Builder[String], + publisher: Publisher[IO, PublishCommand] + ) implicit override val ioRuntime: IORuntime = packageIORuntime - implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) - val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) + implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(300)) + val requeuePolicy: RequeuePolicy = RequeuePolicy(maximumProcessAttempts = 5, requeueAfter = 2.seconds) def runTest[A](test: IO[A]): IO[A] = { val rawConfig = ConfigFactory.load("bucky") @@ -67,7 +67,10 @@ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with Effect ) .use { _ => val pcb = publishCommandBuilder[String](implicitly).using(exchangeName).using(routingKey) - client.publisher()(pcb.toPublishCommand("a message")).flatMap(_ => test) + client.publisher().flatMap { publisher => + publisher(pcb.toPublishCommand("a message")) + .flatMap(_ => test) + } } } } @@ -83,4 +86,4 @@ class ShutdownTimeoutTest extends AsyncFunSuite with IntegrationSpec with Effect (result.toEpochMilli - after.toEpochMilli) < 3000 shouldBe true } } -} \ No newline at end of file +} diff --git a/test/src/main/scala/com/itv/bucky/test/package.scala b/test/src/main/scala/com/itv/bucky/test/package.scala index df964b3..0581a28 100644 --- a/test/src/main/scala/com/itv/bucky/test/package.scala +++ b/test/src/main/scala/com/itv/bucky/test/package.scala @@ -37,7 +37,7 @@ package object test { def publishNoAck[F[_]](implicit F: Async[F], t: Temporal[F]): StubChannel[F] = new StubChannel[F]() { - override def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] = F.delay { + override def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] = F.delay { pubSeqLock.synchronized { publishSeq = sequenceNumber + 1 } diff --git a/test/src/main/scala/com/itv/bucky/test/stubs/StubChannel.scala b/test/src/main/scala/com/itv/bucky/test/stubs/StubChannel.scala index 59d66c2..e58c6f7 100644 --- a/test/src/main/scala/com/itv/bucky/test/stubs/StubChannel.scala +++ b/test/src/main/scala/com/itv/bucky/test/stubs/StubChannel.scala @@ -135,7 +135,7 @@ abstract class StubChannel[F[_]](implicit F: Async[F]) extends Channel[F] with S .map(_.queueName) .toList) - override def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] = { + override def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] = { val queues = lookupQueues(cmd) val subscribedHandlers = handlers.view .filterKeys(queues.contains) diff --git a/test/src/test/scala/com/itv/bucky/test/PublishConsumeTest.scala b/test/src/test/scala/com/itv/bucky/test/PublishConsumeTest.scala index 030ddbb..e66a38a 100644 --- a/test/src/test/scala/com/itv/bucky/test/PublishConsumeTest.scala +++ b/test/src/test/scala/com/itv/bucky/test/PublishConsumeTest.scala @@ -44,7 +44,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -66,7 +66,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -91,8 +91,8 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder.using(rkRouted).toPublishCommand(message)) - _ <- client.publisher()(commandBuilder.using(rkUnrouted).toPublishCommand(message)) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder.using(rkRouted).toPublishCommand(message))) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder.using(rkUnrouted).toPublishCommand(message))) } yield handler.receivedMessages should have size 1 } } @@ -117,8 +117,8 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder.using(rkRouted).toPublishCommand(message)) - _ <- client.publisher()(commandBuilder.using(rkUnrouted).toPublishCommand(message)) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder.using(rkRouted).toPublishCommand(message))) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder.using(rkUnrouted).toPublishCommand(message))) } yield handler.receivedMessages should have size 1 } } @@ -148,7 +148,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -179,7 +179,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -217,9 +217,9 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(message1) + _ <- client.publisher().flatMap(publisher => publisher(message1)) firstCount = handler.receivedMessages.size - _ <- client.publisher()(message2) + _ <- client.publisher().flatMap(publisher => publisher(message2)) secondCount = handler.receivedMessages.size } yield { firstCount shouldBe 0 @@ -257,7 +257,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - _ <- client.publisher()(commandBuilder) + _ <- client.publisher().flatMap(publisher => publisher(commandBuilder)) } yield handler.receivedMessages should have size 1 } } @@ -280,9 +280,9 @@ class PublishConsumeTest val headers: Map[String, AnyRef] = Map("foo" -> "bar") Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => - val publisher = new PublisherSugar(client).publisherWithHeadersOf(commandBuilder) for { - _ <- publisher(message, headers) + publisher <- new PublisherSugar(client).publisherWithHeadersOf(commandBuilder) + _ <- publisher(message, headers) } yield { handler.receivedMessages should have size 1 handler.receivedMessages.head.properties.headers shouldBe headers @@ -307,7 +307,7 @@ class PublishConsumeTest Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => for { - publishResult <- client.publisher()(commandBuilder).attempt + publishResult <- client.publisher().flatMap(publisher => publisher(commandBuilder)).attempt } yield { publishResult.left.value shouldBe a[TimeoutException] handler.receivedMessages should have size 0 @@ -331,9 +331,9 @@ class PublishConsumeTest val declarations = List(Queue(queue), Exchange(exchange).binding((rk, queue))) Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => - val publisher = client.publisherOf[String] for { - _ <- publisher(message) + publisher <- client.publisherOf[String] + _ <- publisher(message) } yield handler.receivedMessages should have size 1 } } @@ -351,9 +351,9 @@ class PublishConsumeTest val declarations = List(Queue(queue), Exchange(exchange).binding((rk, queue))) Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumer(queue, handler)).use { _ => - val publisher = client.publisherOf[String](exchange, rk) for { - _ <- publisher(message) + publisher <- client.publisherOf[String](exchange, rk) + _ <- publisher(message) } yield handler.receivedMessages should have size 1 } } @@ -388,9 +388,9 @@ class PublishConsumeTest } yield () ) .use { _ => - val publisher = client.publisherOf[String](exchange, rk) for { - _ <- publisher("hello") + publisher <- client.publisherOf[String](exchange, rk) + _ <- publisher("hello") } yield requeueHandler.receivedMessages should have size 1 } } diff --git a/test/src/test/scala/com/itv/bucky/test/PublisherTest.scala b/test/src/test/scala/com/itv/bucky/test/PublisherTest.scala index 843e2d5..f35acab 100644 --- a/test/src/test/scala/com/itv/bucky/test/PublisherTest.scala +++ b/test/src/test/scala/com/itv/bucky/test/PublisherTest.scala @@ -41,7 +41,7 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues runAmqpTestIO(client(channel, Config.empty(10.seconds))) { client => for { pubSeq <- IO(channel.publishSeq) - future <- IO(client.publisher()(commandBuilder).unsafeToFuture()) + future <- IO(client.publisher().flatMap(publisher => publisher(commandBuilder)).unsafeToFuture()) _ <- IO.sleep(3.seconds) _ <- IO.sleep(3.seconds) isCompleted1 <- IO(future.isCompleted) @@ -56,7 +56,7 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues runAmqpTest(client(channel, Config.empty(10.seconds))) { client => for { pubSeq <- IO(channel.publishSeq) - future <- IO(client.publisher()(commandBuilder).unsafeToFuture()) + future <- IO(client.publisher().flatMap(publisher => publisher(commandBuilder)).unsafeToFuture()) properties = new BasicProperties _ = properties.builder().build() _ <- IO(channel.returnListeners.foreach(_.handleReturn(400, "reply", exchange.value, rk.value, properties, message.getBytes))) @@ -72,7 +72,7 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues val channel = StubChannels.publishNoAck[IO] runAmqpTestIO(client(channel, Config.empty(1.second))) { client => for { - result <- client.publisher()(commandBuilder).attempt + result <- client.publisher().flatMap(publisher => publisher(commandBuilder)).attempt listeners <- IO(channel.confirmListeners.map(_.asInstanceOf[PendingConfirmListener[IO]])) pendingConf <- listeners.toList.map(_.pendingConfirmations.get).sequence } yield { @@ -89,9 +89,9 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues val channel = StubChannels.publishNoAck[IO] runAmqpTestIO(client(channel, Config.empty(30.seconds))) { client => for { - fiber1 <- client.publisher()(commandBuilder).start - fiber2 <- client.publisher()(commandBuilder).start - fiber3 <- client.publisher()(commandBuilder).start + fiber1 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber2 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber3 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start _ <- IO.sleep(5.seconds) _ <- IO(channel.confirmListeners.foreach(_.handleAck(2, true))) outcome1 <- fiber1.join @@ -109,9 +109,9 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues val channel = StubChannels.publishNoAck[IO] runAmqpTestIO(client(channel, Config.empty(30.seconds))) { client => for { - fiber1 <- client.publisher()(commandBuilder).start - fiber2 <- client.publisher()(commandBuilder).start - fiber3 <- client.publisher()(commandBuilder).start + fiber1 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber2 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber3 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start _ <- IO.sleep(5.seconds) _ <- IO(channel.confirmListeners.foreach(_.handleNack(2, true))) outcome1 <- fiber1.join @@ -128,12 +128,12 @@ class PublisherTest extends AnyFunSuite with IOAmqpClientTest with EitherValues test("Multiple messages can be published and some can be acked and some can be Nacked.") { val channel = StubChannels.publishNoAck[IO] runAmqpTestIO(client(channel, Config.empty(10.seconds))) { client => - val pub: IO[Unit] = client.publisher()(commandBuilder) + val pub: IO[Unit] = client.publisher().flatMap(publisher => publisher(commandBuilder)) for { - fiber1 <- client.publisher()(commandBuilder).start - fiber2 <- client.publisher()(commandBuilder).start - fiber3 <- client.publisher()(commandBuilder).start - fiber4 <- client.publisher()(commandBuilder).start + fiber1 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber2 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber3 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start + fiber4 <- client.publisher().flatMap(publisher => publisher(commandBuilder)).start _ <- IO.sleep(3.seconds) _ <- IO.sleep(3.seconds) _ <- IO(channel.confirmListeners.foreach(_.handleNack(0, false))) diff --git a/test/src/test/scala/com/itv/bucky/test/StubTest.scala b/test/src/test/scala/com/itv/bucky/test/StubTest.scala index 3f72264..69a4c33 100644 --- a/test/src/test/scala/com/itv/bucky/test/StubTest.scala +++ b/test/src/test/scala/com/itv/bucky/test/StubTest.scala @@ -23,7 +23,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { val consumer = StubHandlers.ackHandler[IO, String] Resource.eval(client.declare(declarations)).flatMap(_ => client.registerConsumerOf(queue, consumer)).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) _ <- (1 to 10).toList.map(_ => publisher(message)).sequence } yield { all(consumer.receivedMessages) should be(message) @@ -35,10 +35,11 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { test("Should not suffer from deadlock") { runAmqpTestAllAck { client => - val publisher = client.publisherOf[String](ExchangeName("x"), RoutingKey("y")) val handler = new Handler[IO, String] { override def apply(delivery: String): IO[consume.ConsumeAction] = - publisher("publish from handler").map(_ => Ack) + client.publisherOf[String](ExchangeName("x"), RoutingKey("y")).flatMap { publisher => + publisher("publish from handler").map(_ => Ack) + } } Resource @@ -47,8 +48,9 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { client.registerConsumerOf(queue, handler) } .use { _ => - val publisher = client.publisherOf[String](exchange, rk) - publisher(message) + client.publisherOf[String](exchange, rk).flatMap { publisher => + publisher(message) + } } } } @@ -66,7 +68,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { } yield ()) .use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) _ <- publisher(message) } yield stubPubslisher.recordedMessages shouldBe List(message) } @@ -84,7 +86,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue2, consumer2) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) _ <- (1 to 10).toList.map(_ => publisher(message)).sequence } yield { all(consumer1.receivedMessages) should be(message) @@ -104,7 +106,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue, consumer) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -123,7 +125,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue, consumer) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -142,7 +144,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue, consumer) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -161,7 +163,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -179,7 +181,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { _ <- client.registerConsumerOf(queue, consumer) } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message) @@ -199,7 +201,7 @@ class StubTest extends AnyFunSuite with IOAmqpClientTest { } yield ()).use { _ => for { - publisher <- IO(client.publisherOf[String](exchange, rk)) + publisher <- client.publisherOf[String](exchange, rk) publishRes <- publisher(message).attempt } yield { consumer.receivedMessages shouldBe List(message)