Skip to content

Commit

Permalink
Add support for FIFO queues (#7)
Browse files Browse the repository at this point in the history
* Add support for FIFO queues

* Update readme for FIFO
  • Loading branch information
mercury2269 authored Feb 22, 2020
1 parent e6e564f commit 51d4b5e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
4 changes: 3 additions & 1 deletion README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ to move deadletter queue messages back into the original queue.
were successfully enqueued to the destination.
* Messages are sent and received in batches for faster processing.
* Progress indicator.
* Friendly output and error message.
* Queue name resolution. For ease of use, you only need to provide a queue name and not the full `arn` address.
* Message Attributes are copied over.
* Message Attributes are copied over.
* Support for FIFO queues. MessageGroupId and MessageDeduplicationId are copied over to the destination messages.

## Installation

Expand Down
15 changes: 14 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,21 @@ func logAwsError(message string, err error) {
func convertToEntries(messages []*sqs.Message) []*sqs.SendMessageBatchRequestEntry {
result := make([]*sqs.SendMessageBatchRequestEntry, len(messages))
for i, message := range messages {
result[i] = &sqs.SendMessageBatchRequestEntry{
requestEntry := &sqs.SendMessageBatchRequestEntry{
MessageBody: message.Body,
Id: message.MessageId,
MessageAttributes: message.MessageAttributes,
}

if messageGroupId, ok := message.Attributes[sqs.MessageSystemAttributeNameMessageGroupId]; ok {
requestEntry.MessageGroupId = messageGroupId
}

if messageDeduplicationId, ok := message.Attributes[sqs.MessageSystemAttributeNameMessageDeduplicationId]; ok {
requestEntry.MessageDeduplicationId = messageDeduplicationId
}

result[i] = requestEntry
}

return result
Expand All @@ -137,6 +147,9 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ
WaitTimeSeconds: aws.Int64(0),
MaxNumberOfMessages: aws.Int64(10),
MessageAttributeNames: []*string{aws.String(sqs.QueueAttributeNameAll)},
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameMessageGroupId),
aws.String(sqs.MessageSystemAttributeNameMessageDeduplicationId)},
}

log.Info(color.New(color.FgCyan).Sprintf("Starting to move messages..."))
Expand Down

0 comments on commit 51d4b5e

Please sign in to comment.