Skip to content

Commit

Permalink
Add support for SQS message with custom message attributes (#412)
Browse files Browse the repository at this point in the history
* Add support for SQS message with custom message attributes

* Extend MessageAttributes with map of custom SQS message attributes instead of creating new types of SqsMessage and RefinedMessage

* Create separate methods with messageAttributes to send messages

Co-authored-by: Dmytro Melnychuk <[email protected]>
  • Loading branch information
meldmy and dmytro-melnychuk authored Mar 5, 2022
1 parent cc6aff0 commit 3c32cf8
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ final case class RefinedMessageWithAttributes[F[_]](
receiptHandle: RefinedReceiptHandle[F],
attributes: MessageAttributes
) extends BaseSqsMessage[F, VisibilityTimeout]
with WithAttributes
with WithAttributes
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import software.amazon.awssdk.regions.Region

import scala.jdk.CollectionConverters._
import eu.timepit.refined.api.Refined
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue

/** A version of `SimpleSqsClient` that has refined versions of method parameters. */
trait RefinedSqsClient[F[_]] {
Expand Down Expand Up @@ -39,18 +40,45 @@ trait RefinedSqsClient[F[_]] {
* @return
* The message ID string of the sent message.
*/
def sendMessage(queueUrl: String, messageBody: String): F[String]
def sendMessage(
queueUrl: String,
messageBody: String
): F[String]

/** Send a message with attributes to an SQS queue. Message delay is determined by the queue settings.
* @return
* The message ID string of the sent message.
*/
def sendMessage(
queueUrl: String,
messageBody: String,
messageAttributes: Map[String, MessageAttributeValue]
): F[String]

/** Sends a message to an SQS queue. Allows specifying the seconds to delay the message.
* @return
* The message ID string of the sent message.
*/
def sendMessage(queueUrl: String, messageBody: String, delaySeconds: Int Refined DelaySeconds): F[String]
def sendMessage(
queueUrl: String,
messageBody: String,
delaySeconds: Int Refined DelaySeconds
): F[String]

/** Sends a message with attributes to an SQS queue. Allows specifying the seconds to delay the message.
* @return
* The message ID string of the sent message.
*/
def sendMessage(
queueUrl: String,
messageBody: String,
delaySeconds: Int Refined DelaySeconds,
messageAttributes: Map[String, MessageAttributeValue]
): F[String]
}

object RefinedSqsClient {
def apply[F[_]: Sync](pureClient: PureSqsClient[F]) =
def apply[F[_]: Sync](pureClient: PureSqsClient[F]): RefinedSqsClient[F] =
new RefinedSqsClient[F] {
private val simpleClient = SimpleSqsClient(pureClient)
def streamMessages(
Expand Down Expand Up @@ -78,7 +106,10 @@ object RefinedSqsClient {
settings.waitTimeSeconds.value
)
.map { m =>
val attributes = MessageAttributes.fromMap(m.attributes().asScala.toMap)
val attributes = MessageAttributes.fromMap(
m.attributes().asScala.toMap,
m.messageAttributes().asScala.toMap
)
RefinedMessageWithAttributes(
m.body,
RefinedReceiptHandle(m.receiptHandle, queueUrl, this),
Expand All @@ -103,28 +134,33 @@ object RefinedSqsClient {
def sendMessage(queueUrl: String, messageBody: String): F[String] =
simpleClient.sendMessage(queueUrl, messageBody)

def sendMessage(queueUrl: String, messageBody: String, messageAttributes: Map[String, MessageAttributeValue]): F[String] =
simpleClient.sendMessage(queueUrl, messageBody, messageAttributes)

def sendMessage(queueUrl: String, messageBody: String, delaySeconds: Int Refined DelaySeconds): F[String] =
simpleClient.sendMessage(queueUrl, messageBody, delaySeconds.value)

def sendMessage(queueUrl: String, messageBody: String, delaySeconds: Int Refined DelaySeconds, messageAttributes: Map[String, MessageAttributeValue]): F[String] =
simpleClient.sendMessage(queueUrl, messageBody, delaySeconds.value, messageAttributes)
}

/** Constructs a `RefinedSqsClient` using an underlying synchronous client backend.
*
* @param awsRegion
* @param region
* The AWS region you are operating in.
* @return
* A `RefinedSqsClient` instance using a synchronous backend.
*/
def sync[F[_]: Sync](region: Region) =
def sync[F[_]: Sync](region: Region): Resource[F, RefinedSqsClient[F]] =
PureSqsClient.sync[F](region).map(apply[F])

/** Constructs a `RefinedSqsClient` using an underlying asynchronous client backend.
*
* @param awsRegion
* @param region
* The AWS region you are operating in.
* @return
* A `RefinedSqsClient` instance using an asynchronous backend.
*/
def async[F[_]: Async](region: Region) =
def async[F[_]: Async](region: Region): Resource[F, RefinedSqsClient[F]] =
PureSqsClient.async[F](region).map(apply[F])
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.rewardsnetwork.pureaws.sqs

import com.rewardsnetwork.pureaws.compat.Conversions._
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName
import software.amazon.awssdk.services.sqs.model.{MessageAttributeValue, MessageSystemAttributeName}

final case class MessageAttributes(
approximateReceiveCount: Option[Int],
Expand All @@ -10,11 +10,12 @@ final case class MessageAttributes(
messageGroupId: Option[String],
senderId: Option[String],
sentTimestampEpochMillis: Option[Long],
sequenceNumber: Option[Long]
sequenceNumber: Option[Long],
other: Map[String, MessageAttributeValue]
)

object MessageAttributes {
def fromMap(m: Map[MessageSystemAttributeName, String]): MessageAttributes = {
def fromMap(m: Map[MessageSystemAttributeName, String], other: Map[String, MessageAttributeValue]): MessageAttributes = {
import MessageSystemAttributeName._
val approxReceiveCount = m.get(APPROXIMATE_RECEIVE_COUNT).flatMap(toIntOption)
val approxFirstReceiveTimestamp = m.get(APPROXIMATE_FIRST_RECEIVE_TIMESTAMP).flatMap(toLongOption)
Expand All @@ -31,7 +32,8 @@ object MessageAttributes {
groupId,
senderId,
sentTimestamp,
sequenceNumber
sequenceNumber,
other
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,43 @@ trait SimpleSqsClient[F[_]] {
* @return
* The message ID string of the sent message.
*/
def sendMessage(queueUrl: String, messageBody: String): F[String]
def sendMessage(
queueUrl: String,
messageBody: String
): F[String]

/** Send a message with attributes to an SQS queue. Message delay is determined by the queue settings.
* @return
* The message ID string of the sent message.
*/
def sendMessage(
queueUrl: String,
messageBody: String,
messageAttributes: Map[String, MessageAttributeValue]
): F[String]

/** Sends a message to an SQS queue. Allows specifying the seconds to delay the message (valid values between 0 and
* 900).
/** Send a message to an SQS queue. Allows specifying the seconds to delay the message (valid values
* between 0 and 900).
* @return
* The message ID string of the sent message.
*/
def sendMessage(queueUrl: String, messageBody: String, delaySeconds: Int): F[String]
def sendMessage(
queueUrl: String,
messageBody: String,
delaySeconds: Int
): F[String]

/** Send a message with attributes to an SQS queue. Allows specifying the seconds to delay the message (valid values
* between 0 and 900).
* @return
* The message ID string of the sent message.
*/
def sendMessage(
queueUrl: String,
messageBody: String,
delaySeconds: Int,
messageAttributes: Map[String, MessageAttributeValue]
): F[String]
}

object SimpleSqsClient {
Expand All @@ -79,7 +107,7 @@ object SimpleSqsClient {
) // TODO: make the chunk size configurable
}

def apply[F[_]: Sync](client: PureSqsClient[F]) =
def apply[F[_]: Sync](client: PureSqsClient[F]): SimpleSqsClient[F] =
new SimpleSqsClient[F] {

def streamMessages(
Expand All @@ -104,7 +132,10 @@ object SimpleSqsClient {
settings.waitTimeSeconds,
receiveAttrs = true
).map { m =>
val attributes = MessageAttributes.fromMap(m.attributes().asScala.toMap)
val attributes = MessageAttributes.fromMap(
m.attributes().asScala.toMap,
m.messageAttributes().asScala.toMap
)
SqsMessageWithAttributes(m.body, ReceiptHandle(m.receiptHandle, queueUrl, this), attributes)
}
}
Expand All @@ -127,18 +158,48 @@ object SimpleSqsClient {
client.deleteMessage(req).void
}

def sendMessage(queueUrl: String, messageBody: String): F[String] = {
def sendMessage(
queueUrl: String,
messageBody: String
): F[String] = sendMessage(queueUrl, messageBody, Map.empty[String, MessageAttributeValue])

def sendMessage(
queueUrl: String,
messageBody: String,
messageAttributes: Map[String, MessageAttributeValue]
): F[String] = {
val req =
SendMessageRequest.builder.queueUrl(queueUrl).messageBody(messageBody).build()
SendMessageRequest.builder
.queueUrl(queueUrl)
.messageBody(messageBody)
.messageAttributes(messageAttributes.asJava)
.build()
client.sendMessage(req).map(_.messageId)
}

def sendMessage(queueUrl: String, messageBody: String, delaySeconds: Int): F[String] = {
def sendMessage(
queueUrl: String,
messageBody: String,
delaySeconds: Int,
): F[String] = {
sendMessage(queueUrl, messageBody, delaySeconds, Map.empty[String, MessageAttributeValue])
}

def sendMessage(
queueUrl: String,
messageBody: String,
delaySeconds: Int,
messageAttributes: Map[String, MessageAttributeValue]
): F[String] = {
val req =
SendMessageRequest.builder.queueUrl(queueUrl).messageBody(messageBody).delaySeconds(delaySeconds).build()
SendMessageRequest.builder
.queueUrl(queueUrl)
.messageBody(messageBody)
.messageAttributes(messageAttributes.asJava)
.delaySeconds(delaySeconds)
.build()
client.sendMessage(req).map(_.messageId)
}

}

/** Constructs a `SimpleSqsClient` using an underlying synchronous client backend.
Expand Down

0 comments on commit 3c32cf8

Please sign in to comment.