SQS Consumer & Worker for Go
go get github.com/haijianyang/go-sqs-consumer
// create worker for the queue
worker := consumer.New(&consumer.Config{
Region: aws.String("region"),
QueueUrl: aws.String("url"),
}, nil)
// start worker for polling messages
worker.Start(func(message *sqs.Message) error {
fmt.Println("handle", message)
return nil
}
By default the consumer will look for AWS credentials in the places specified by the AWS SDK.
// consumer will create SQS client for worker
worker := consumer.New(&consumer.Config{
Region: aws.String("region"),
QueueUrl: aws.String("url"),
}, nil)
You can set a instance of the AWS SQS client when create worker.
// init AWS session
sess, err := session.NewSession(&aws.Config{
Region: aws.String("region")},
)
// init SQS client
svr = sqs.New(sess)
// set SQS client
worker := consumer.New(&consumer.Config{
Region: aws.String("region"),
QueueUrl: aws.String("url"),
}, svr)
worker.On(consumer.EventReceiveMessage, consumer.OnReceiveMessage(func(messages []*sqs.Message) {
fmt.Println("OnReceiveMessage", messages)
}))
worker.On(consumer.EventProcessMessage, consumer.OnProcessMessage(func(message *sqs.Message) {
fmt.Println("OnProcessMessage", message)
}))
worker.On(consumer.EventReceiveMessageError, consumer.OnReceiveMessageError(func(err error) {
fmt.Println("OnReceiveMessageError", err)
}))
func Handler(message *sqs.Message) error {
fmt.Println("handle", message)
return nil
}
worker := consumer.New(&consumer.Config{
Region: aws.String("region"),
QueueUrl: aws.String("url"),
}, nil)
// 1. do it yourself
go worker.Start(Handler)
go worker.Start(Handler)
...
// 2. automation
concurrency := 6
worker.Concurrent(Handler, concurrency)
Region
- String - The AWS region.AttributeNames
- []String - List of queue attributes to retrieve (i.e. ['All', 'ApproximateFirstReceiveTimestamp', 'ApproximateReceiveCount']).MaxNumberOfMessages
- Int64 - The maximum number of messages to return. Valid values: 1 to 10. Default: 1.MessageAttributeNames
- []String - The name of the message attribute.QueueUrl
- String - The URL of the Amazon SQS queue from which messages are received.ReceiveRequestAttemptId
- String - This parameter applies only to FIFO (first-in-first-out) queues.VisibilityTimeout
- Int64 - The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. Default: 30.WaitTimeSeconds
- Int64 - The duration (in seconds) for which the call waits for a message to arrive in the queue before returning.Idle
- Int - The number of worker to sleep.Sleep
- Int - The duration (in seconds) of worker sleep.
Creates a new SQS worker.
config
- Config - The consumer.Config, configuration for workersvr
- *sqs.SQS - The AWS SQS client
Start polling the queue for messages.
handler
- func(message *sqs.Message) error - To be called whenever a message is received
Register event listener.
event
- String - The worker events.callback
- Func - The worker event callback functions.
Event | Func | Description |
---|---|---|
EventReceiveMessage |
OnReceiveMessage func(messages []*sqs.Message) | Fired when messages is received from SQS queue. |
EventProcessMessage |
OnProcessMessage func(message *sqs.Message) | Fired when a message is successfully processed and removed from the queue. |
OnReceiveMessageError |
OnReceiveMessageError func(err error) | Fired when receiving messages from SQS queue fail. |