Skip to content

Commit

Permalink
Reworked publisher to return an F[Publisher[F, ???]]
Browse files Browse the repository at this point in the history
Co-authored-by: Jos Bogan <[email protected]>
  • Loading branch information
andrewgee and JosBogan committed Jul 5, 2024
1 parent b7949a9 commit f92eb0f
Show file tree
Hide file tree
Showing 24 changed files with 311 additions and 265 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class AmqpClientConnectionManager[F[_]: Async](
})
}

def publish(cmd: PublishCommand): F[Unit] =
def addConfirmListeningToPublisher(publisher: Publisher[F, PublishCommand]): Publisher[F, PublishCommand] = (cmd: PublishCommand) =>
for {
deliveryTag <- Ref.of[F, Option[Long]](None)
_ <- (for {
Expand All @@ -44,7 +44,7 @@ class AmqpClientConnectionManager[F[_]: Async](
nextPublishSeq <- Async[F].blocking(publishChannel.getNextPublishSeqNo)
_ <- deliveryTag.set(Some(nextPublishSeq))
_ <- pendingConfirmListener.pendingConfirmations.update(_ + (nextPublishSeq -> signal))
_ <- publisher()(cmd)
_ <- publisher(cmd)
} yield ()
}
_ <- signal.get.flatMap(maybeError => maybeError.traverse(Async[F].raiseError[Unit]))
Expand All @@ -62,27 +62,6 @@ class AmqpClientConnectionManager[F[_]: Async](
}
} yield ()

def publisher2(mandatory: Boolean): F[Publisher[F, PublishCommand]] = {
val publisher = client.createBasicPublisherWithListener[PublishCommand](
PublishingFlag(mandatory),
_ => Async[F].unit
)

publisher.map(f => (publishCommand: PublishCommand) => f(model.ExchangeName(publishCommand.exchange.value), model.RoutingKey(publishCommand.exchange.value), publishCommand))
}

def publisher(): Publisher[F, PublishCommand] = (publishCommand: PublishCommand) =>
for {
publisher <- client
.createPublisherWithListener[PublishCommand](
model.ExchangeName(publishCommand.exchange.value),
model.RoutingKey(publishCommand.routingKey.value),
PublishingFlag(publishCommand.mandatory),
_ => Async[F].unit
)
_ <- publisher(publishCommand)
} yield ()

}

private[bucky] object AmqpClientConnectionManager extends StrictLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,43 @@ import com.itv.bucky.backend.fs2rabbit.Fs2RabbitAmqpClient.deliveryDecoder
import com.itv.bucky.consume.DeliveryMode
import com.itv.bucky.decl.ExchangeType
import com.itv.bucky.publish.{ContentEncoding, ContentType, PublishCommand}
import com.itv.bucky.{AmqpClient, AmqpClientConfig, Envelope, ExchangeName, Handler, Payload, Publisher, QueueName, RoutingKey, consume, decl, publish}
import com.itv.bucky.{
AmqpClient,
AmqpClientConfig,
Envelope,
ExchangeName,
Handler,
Payload,
Publisher,
QueueName,
RoutingKey,
consume,
decl,
publish
}
import com.rabbitmq.client.LongString
import dev.profunktor.fs2rabbit.arguments.SafeArg
import dev.profunktor.fs2rabbit.config.Fs2RabbitConfig
import dev.profunktor.fs2rabbit.config.declaration._
import dev.profunktor.fs2rabbit.effects.{EnvelopeDecoder, MessageEncoder}
import dev.profunktor.fs2rabbit.interpreter.RabbitClient
import dev.profunktor.fs2rabbit.model
import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{ArrayVal, BooleanVal, ByteArrayVal, ByteVal, DecimalVal, DoubleVal, FloatVal, IntVal, LongVal, NullVal, ShortVal, StringVal, TableVal, TimestampVal}
import dev.profunktor.fs2rabbit.model.AmqpFieldValue.{
ArrayVal,
BooleanVal,
ByteArrayVal,
ByteVal,
DecimalVal,
DoubleVal,
FloatVal,
IntVal,
LongVal,
NullVal,
ShortVal,
StringVal,
TableVal,
TimestampVal
}
import dev.profunktor.fs2rabbit.model.{AMQPChannel, PublishingFlag, ShortString}
import scodec.bits.ByteVector

Expand All @@ -31,9 +59,9 @@ import Fs2RabbitAmqpClient._
class Fs2RabbitAmqpClient[F[_]: Async](
client: RabbitClient[F],
connection: model.AMQPConnection,
// publishChannel: model.AMQPChannel,
publishChannel: model.AMQPChannel,
amqpClientConnectionManager: AmqpClientConnectionManager[F]
)(implicit amqpChannel: AMQPChannel) extends AmqpClient[F] {
) extends AmqpClient[F] {

override def declare(declarations: decl.Declaration*): F[Unit] = declare(declarations.toList)

Expand Down Expand Up @@ -63,70 +91,78 @@ class Fs2RabbitAmqpClient[F[_]: Async](
case decl.Fanout => model.ExchangeType.FanOut
}

// implicit val channel: model.AMQPChannel = publishChannel

declarations.toList
.sortBy {
case _: decl.Queue => 0
case _: decl.Exchange => 1
case _ => 2
}
.map {
case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) =>
client.declareExchange(
DeclarationExchangeConfig
.default(model.ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType))
.copy(
arguments = argumentsFromAnyRef(arguments),
durable = if (isDurable) Durable else NonDurable,
autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete,
internal = if (isInternal) Internal else NonInternal
)
) *>
bindings.traverse_ { binding =>
client.bindQueue(
model.QueueName(binding.queueName.value),
model.ExchangeName(binding.exchangeName.value),
model.RoutingKey(binding.routingKey.value),
model.QueueBindingArgs(argumentsFromAnyRef(binding.arguments))
)
}
case decl.Binding(exchangeName, queueName, routingKey, arguments) =>
client.bindQueue(
model.QueueName(queueName.value),
model.ExchangeName(exchangeName.value),
model.RoutingKey(routingKey.value),
model.QueueBindingArgs(argumentsFromAnyRef(arguments))
)
case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) =>
client.bindExchange(
model.ExchangeName(destinationExchangeName.value),
model.ExchangeName(sourceExchangeName.value),
model.RoutingKey(routingKey.value),
model.ExchangeBindingArgs(argumentsFromAnyRef(arguments))
)
case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) =>
client.declareQueue(
DeclarationQueueConfig
.default(model.QueueName(name.value))
.copy(
arguments = argumentsFromAnyRef(arguments),
durable = if (isDurable) Durable else NonDurable,
autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete,
exclusive = if (isExclusive) Exclusive else NonExclusive
)
)
}
.sequence_

}
client.createChannel(connection).use { implicit channel =>
declarations.toList
.sortBy {
case _: decl.Queue => 0
case _: decl.Exchange => 1
case _ => 2
}
.map {
case decl.Exchange(name, exchangeType, isDurable, shouldAutoDelete, isInternal, arguments, bindings) =>
client.declareExchange(
DeclarationExchangeConfig
.default(model.ExchangeName(name.value), exchangeTypeToFs2ExchangeType(exchangeType))
.copy(
arguments = argumentsFromAnyRef(arguments),
durable = if (isDurable) Durable else NonDurable,
autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete,
internal = if (isInternal) Internal else NonInternal
)
) *>
bindings.traverse_ { binding =>
client.bindQueue(
model.QueueName(binding.queueName.value),
model.ExchangeName(binding.exchangeName.value),
model.RoutingKey(binding.routingKey.value),
model.QueueBindingArgs(argumentsFromAnyRef(binding.arguments))
)
}
case decl.Binding(exchangeName, queueName, routingKey, arguments) =>
client.bindQueue(
model.QueueName(queueName.value),
model.ExchangeName(exchangeName.value),
model.RoutingKey(routingKey.value),
model.QueueBindingArgs(argumentsFromAnyRef(arguments))
)
case decl.ExchangeBinding(destinationExchangeName, sourceExchangeName, routingKey, arguments) =>
client.bindExchange(
model.ExchangeName(destinationExchangeName.value),
model.ExchangeName(sourceExchangeName.value),
model.RoutingKey(routingKey.value),
model.ExchangeBindingArgs(argumentsFromAnyRef(arguments))
)
case decl.Queue(name, isDurable, isExclusive, shouldAutoDelete, arguments) =>
client.declareQueue(
DeclarationQueueConfig
.default(model.QueueName(name.value))
.copy(
arguments = argumentsFromAnyRef(arguments),
durable = if (isDurable) Durable else NonDurable,
autoDelete = if (shouldAutoDelete) AutoDelete else NonAutoDelete,
exclusive = if (isExclusive) Exclusive else NonExclusive
)
)
}
.sequence_
}

private def publisher2(mandatory: Boolean): F[Publisher[F, publish.PublishCommand]] = {
amqpClientConnectionManager.publish(publishCommand)
}

override def publisher(): Publisher[F, publish.PublishCommand] = (publishCommand: PublishCommand) =>
amqpClientConnectionManager.publish(publishCommand)
override def publisher(mandatory: Boolean): F[Publisher[F, publish.PublishCommand]] =
client
.createBasicPublisherWithListener[PublishCommand](
PublishingFlag(mandatory),
_ => Async[F].unit // Mandatory returns ignored here, but are handled in AmqpClientConnectionManager
)(publishChannel, implicitly)
.map { publisher => (publishCommand: PublishCommand) =>
publisher(
model.ExchangeName(publishCommand.exchange.value),
model.RoutingKey(publishCommand.exchange.value),
publishCommand
)
}
.map(amqpClientConnectionManager.addConfirmListeningToPublisher)

override def registerConsumer(
queueName: bucky.QueueName,
Expand Down Expand Up @@ -185,7 +221,7 @@ object Fs2RabbitAmqpClient {
amqpChannel = publishChannel
)
)
} yield new Fs2RabbitAmqpClient(client, connection, publishChannel, amqpClientConnectionManager)(publishChannel)
} yield new Fs2RabbitAmqpClient(client, connection, publishChannel, amqpClientConnectionManager)
}

implicit def deliveryEncoder[F[_]: Async]: MessageEncoder[F, PublishCommand] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[bucky] case class AmqpClientConnectionManager[F[_]](
})
}

def publish(cmd: PublishCommand): F[Unit] =
def publish(cmd: PublishCommand, mandatory: Boolean): F[Unit] =
for {
deliveryTag <- Ref.of[F, Option[Long]](None)
_ <- (for {
Expand All @@ -41,7 +41,7 @@ private[bucky] case class AmqpClientConnectionManager[F[_]](
nextPublishSeq <- publishChannel.getNextPublishSeqNo
_ <- deliveryTag.set(Some(nextPublishSeq))
_ <- pendingConfirmListener.pendingConfirmations.update(_ + (nextPublishSeq -> signal))
_ <- publishChannel.publish(nextPublishSeq, cmd)
_ <- publishChannel.publish(nextPublishSeq, cmd, mandatory)
} yield ()
}
_ <- signal.get.ifM(F.unit, F.raiseError[Unit](new RuntimeException(s"Failed to publish msg: ${cmd}")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ trait Channel[F[_]] {
def addConfirmListener(listener: ConfirmListener): F[Unit]
def addReturnListener(listener: ReturnListener): F[Unit]
def getNextPublishSeqNo: F[Long]
def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit]
def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit]
def sendAction(action: ConsumeAction)(envelope: Envelope): F[Unit]
def declareExchange(exchange: Exchange): F[Unit]
def declareQueue(queue: Queue): F[Unit]
Expand Down Expand Up @@ -68,12 +68,12 @@ object Channel {
override def addReturnListener(listener: ReturnListener): F[Unit] = F.delay(channel.addReturnListener(listener))
override def getNextPublishSeqNo: F[Long] = F.delay(channel.getNextPublishSeqNo)

override def publish(sequenceNumber: Long, cmd: PublishCommand): F[Unit] =
override def publish(sequenceNumber: Long, cmd: PublishCommand, mandatory: Boolean): F[Unit] =
for {
_ <- F.delay(logger.debug("Publishing command with exchange:{} rk: {}.", cmd.exchange, cmd.routingKey))
_ <- F.blocking(
channel
.basicPublish(cmd.exchange.value, cmd.routingKey.value, cmd.mandatory, false, MessagePropertiesConverters(cmd.basicProperties), cmd.body.value)
.basicPublish(cmd.exchange.value, cmd.routingKey.value, mandatory, false, MessagePropertiesConverters(cmd.basicProperties), cmd.body.value)
)
_ <- F.delay(logger.info("Published message: {}", cmd))
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ object JavaBackendAmqpClient extends StrictLogging {
_ <- if (ended) F.unit else t.sleep(sleep) *> repeatUntil(eval)(pred)(sleep)
} yield ()

override def publisher(): Publisher[F, PublishCommand] = cmd => connectionManager.publish(cmd).evalOn(executionContext)
override def publisher(mandatory: Boolean = false): F[Publisher[F, PublishCommand]] = F.pure(
cmd => connectionManager.publish(cmd, mandatory).evalOn(executionContext)
)

override def registerConsumer(queueName: QueueName,
handler: Handler[F, Delivery],
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/com/itv/bucky/AmqpClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
trait AmqpClient[F[_]] {
def declare(declarations: Declaration*): F[Unit]
def declare(declarations: Iterable[Declaration]): F[Unit]
def publisher(): Publisher[F, PublishCommand]
def publisher(mandatory: Boolean = false): F[Publisher[F, PublishCommand]]
def registerConsumer(queueName: QueueName,
handler: Handler[F, Delivery],
exceptionalAction: ConsumeAction = DeadLetter,
Expand Down
Loading

0 comments on commit f92eb0f

Please sign in to comment.