From 3f7588248156a4e65c1d53fc986bf6aa428833e3 Mon Sep 17 00:00:00 2001 From: Sergey Maskalik Date: Sun, 23 Feb 2020 21:19:16 -0800 Subject: [PATCH] Implement message move limit (#9) --- README.MD | 21 +++++++++++++------- main.go | 58 ++++++++++++++++++++++++++++++++----------------------- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/README.MD b/README.MD index ef96f63..8efdf2a 100644 --- a/README.MD +++ b/README.MD @@ -10,14 +10,14 @@ Useful when you need to move deadletter queue messages back into the original qu ## Features -* Reliable delivery. Messages are only deleted from the source queue if they -were successfully enqueued to the destination. +* Reliable delivery. Messages are only deleted from the source queue if they were enqueued to the destination. * Messages are sent and received in batches for faster processing. * Progress indicator. * User friendly info and error messages. * Queue name resolution. For ease of use, you only need to provide a queue name and not the full `arn` address. * Message Attributes are copied over. * Support for FIFO queues. MessageGroupId and MessageDeduplicationId are copied over to the destination messages. +* Optional flag to limit the number of messages to move. ## Installation @@ -104,12 +104,14 @@ sqsmover --help usage: sqsmover --source=SOURCE --destination=DESTINATION [] Flags: - --help Show context-sensitive help (also try + -h, --help Show context-sensitive help (also try --help-long and --help-man). - -s, --source=SOURCE Source queue to move messages from - -d, --destination=DESTINATION Destination queue to move messages to - -r, --region="us-west-2" AWS Region for source and destination queues - -p, --profile="" Use a specific profile from your credential file. + -s, --source=SOURCE Source queue name to move messages from. + -d, --destination=DESTINATION Destination queue name to move messages to. + -r, --region="us-west-2" AWS region for source and destination queues. + -p, --profile="" Use a specific profile from AWS credentials file. + -l, --limit=0 Limits number of messages moved. No limit is set by default. + -v, --version Show application version. ``` Examples: @@ -130,3 +132,8 @@ Profile will default to `Default`, you can also override it with `--profile` fla sqsmover --source=my_source_queue_name --destination=my_destination_queuename --profile=user ``` +Limit number of moved messages to 10 +``` +sqsmover -s my_source_queue_name -d my_destination_queuename -l 10 +``` + diff --git a/main.go b/main.go index cc2043e..2d68854 100644 --- a/main.go +++ b/main.go @@ -25,13 +25,13 @@ var ( ) var ( - sourceQueue = kingpin.Flag("source", "Source queue to move messages from").Short('s').Required().String() - destinationQueue = kingpin.Flag("destination", "Destination queue to move messages to").Short('d').Required().String() - region = kingpin.Flag("region", "AWS Region for source and destination queues").Short('r').Default("us-west-2").String() - profile = kingpin.Flag("profile", "Use a specific profile from your credential file.").Short('p').Default("").String() + sourceQueue = kingpin.Flag("source", "Source queue name to move messages from.").Short('s').Required().String() + destinationQueue = kingpin.Flag("destination", "Destination queue name to move messages to.").Short('d').Required().String() + region = kingpin.Flag("region", "AWS region for source and destination queues.").Short('r').Default("us-west-2").String() + profile = kingpin.Flag("profile", "Use a specific profile from AWS credentials file.").Short('p').Default("").String() + limit = kingpin.Flag("limit", "Limits number of messages moved. No limit is set by default.").Short('l').Default("0").Int() ) - func main() { log.SetHandler(cli.Default) @@ -60,23 +60,23 @@ func main() { svc := sqs.New(sess) - err, sourceQueueUrl := resolveQueueUrl(svc, *sourceQueue) + sourceQueueUrl, err := resolveQueueUrl(svc, *sourceQueue) if err != nil { logAwsError("Failed to resolve source queue", err) return } - log.Info(color.New(color.FgCyan).Sprintf("Source queue url: %s", sourceQueueUrl)) + log.Info(color.New(color.FgCyan).Sprintf("Source queue URL: %s", sourceQueueUrl)) - err, destinationQueueUrl := resolveQueueUrl(svc, *destinationQueue) + destinationQueueUrl, err := resolveQueueUrl(svc, *destinationQueue) if err != nil { logAwsError("Failed to resolve destination queue", err) return } - log.Info(color.New(color.FgCyan).Sprintf("Destination queue url: %s", destinationQueueUrl)) + log.Info(color.New(color.FgCyan).Sprintf("Destination queue URL: %s", destinationQueueUrl)) queueAttributes, err := svc.GetQueueAttributes(&sqs.GetQueueAttributesInput{ QueueUrl: aws.String(sourceQueueUrl), @@ -85,29 +85,33 @@ func main() { numberOfMessages, _ := strconv.Atoi(*queueAttributes.Attributes["ApproximateNumberOfMessages"]) - log.Info(color.New(color.FgCyan).Sprintf("Approximate number of messages in the source queue: %s", - *queueAttributes.Attributes["ApproximateNumberOfMessages"])) + log.Info(color.New(color.FgCyan).Sprintf("Approximate number of messages in the source queue: %d", numberOfMessages)) if numberOfMessages == 0 { log.Info("Looks like nothing to move. Done.") return } + if *limit > 0 && numberOfMessages > *limit { + numberOfMessages = *limit + log.Info(color.New(color.FgCyan).Sprintf("Limit is set, will only move %d messages", numberOfMessages)) + } + moveMessages(sourceQueueUrl, destinationQueueUrl, svc, numberOfMessages) } -func resolveQueueUrl(svc *sqs.SQS, queueName string) (error, string) { +func resolveQueueUrl(svc *sqs.SQS, queueName string) (string, error) { params := &sqs.GetQueueUrlInput{ QueueName: aws.String(queueName), } resp, err := svc.GetQueueUrl(params) if err != nil { - return err, "" + return "", err } - return nil, *resp.QueueUrl + return *resp.QueueUrl, nil } func logAwsError(message string, err error) { @@ -153,7 +157,7 @@ func convertSuccessfulMessageToBatchRequestEntry(messages []*sqs.Message) []*sqs return result } -func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, numberOfMessages int) { +func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQS, totalMessages int) { params := &sqs.ReceiveMessageInput{ QueueUrl: aws.String(sourceQueueUrl), VisibilityTimeout: aws.Int64(2), @@ -171,7 +175,7 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ term.HideCursor() defer term.ShowCursor() - b := progress.NewInt(numberOfMessages) + b := progress.NewInt(totalMessages) b.Width = 40 b.StartDelimiter = color.New(color.FgCyan).Sprint("|") b.EndDelimiter = color.New(color.FgCyan).Sprint("|") @@ -186,9 +190,9 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ for { resp, err := svc.ReceiveMessage(params) - if len(resp.Messages) == 0 { + if len(resp.Messages) == 0 || messagesProcessed == totalMessages { fmt.Println() - log.Info(color.New(color.FgCyan).Sprintf("Done. Moved %s messages", strconv.Itoa(numberOfMessages))) + log.Info(color.New(color.FgCyan).Sprintf("Done. Moved %s messages", strconv.Itoa(totalMessages))) return } @@ -197,9 +201,15 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ return } + messagesToCopy := resp.Messages + + if len(resp.Messages)+messagesProcessed > totalMessages { + messagesToCopy = resp.Messages[0 : totalMessages-messagesProcessed] + } + batch := &sqs.SendMessageBatchInput{ QueueUrl: aws.String(destinationQueueUrl), - Entries: convertToEntries(resp.Messages), + Entries: convertToEntries(messagesToCopy), } sendResp, err := svc.SendMessageBatch(batch) @@ -214,9 +224,9 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ return } - if len(sendResp.Successful) == len(resp.Messages) { + if len(sendResp.Successful) == len(messagesToCopy) { deleteMessageBatch := &sqs.DeleteMessageBatchInput{ - Entries: convertSuccessfulMessageToBatchRequestEntry(resp.Messages), + Entries: convertSuccessfulMessageToBatchRequestEntry(messagesToCopy), QueueUrl: aws.String(sourceQueueUrl), } @@ -232,11 +242,11 @@ func moveMessages(sourceQueueUrl string, destinationQueueUrl string, svc *sqs.SQ return } - messagesProcessed += len(resp.Messages) + messagesProcessed += len(messagesToCopy) } // Increase the total if the approximation was under - avoids exception - if messagesProcessed > numberOfMessages { + if messagesProcessed > totalMessages { b.Total = float64(messagesProcessed) } @@ -257,4 +267,4 @@ func buildVersion(version, commit, date, builtBy string) string { result = fmt.Sprintf("%s\nbuilt by: %s", result, builtBy) } return result -} \ No newline at end of file +}