Skip to content

Commit

Permalink
Minor improvement in RabbitMQ executor (#1500)
Browse files Browse the repository at this point in the history
* minor improvement in rabbit mq executor

* fix flaky test
  • Loading branch information
ivan-kripakov-m10 authored Jun 5, 2024
1 parent 5ffe0ec commit 8324be2
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 45 deletions.
39 changes: 35 additions & 4 deletions builtin/bins/dkron-executor-rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
### Create a job to send rabbitmq text Message
### RabbitMQ Executor

#### Executor Configuration

The options names are inherited from the [RabbitMQ Publishers](https://www.rabbitmq.com/docs/publishers)

| Option | Required | Description | Default |
|-----------------------|----------|-------------------------------|------------|
| url | yes | RabbitMQ connection string | - |
| queue.name | yes | Queue name to send message to | - |
| queue.create | no | Create queue if not exists | false |
| queue.durable | no | Durable queue | false |
| queue.auto_delete | no | Auto delete queue | false |
| queue.exclusive | no | Exclusive queue | false |
| message.content_type | no | Message content type | text/plain |
| message.delivery_mode | no | Message delivery mode | 0 |
| message.messageId | no | Message id | "" |
| message.body | yes | Message body | - |
| message.base64Body | yes | Base64 encoded message body | - |

#### Example

```shell
curl localhost:8080/v1/jobs -XPOST -d '{
"name": "job1",
"schedule": "@every 10s",
Expand All @@ -15,8 +37,17 @@ curl localhost:8080/v1/jobs -XPOST -d '{
"concurrency": "allow",
"executor": "rabbitmq",
"executor_config": {
"url": "amqp://guest:guest@localhost:5672/",
"text": "hello world!",
"queue": "test"
"url": "amqp://guest:guest@localhost:5672/",
"queue.name": "test",
"queue.create": "true",
"queue.durable": "true",
"queue.auto_delete": "false",
"queue.exclusive": "false",
"message.content_type": "application/json",
"message.delivery_mode": "2",
"message.messageId": "4373732772",
"message.body": "{\"key\":\"value\"}"
}
}'
```

139 changes: 100 additions & 39 deletions builtin/bins/dkron-executor-rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"encoding/base64"
"errors"
"strconv"

dkplugin "github.com/distribworks/dkron/v4/plugin"
dktypes "github.com/distribworks/dkron/v4/types"
Expand All @@ -17,9 +18,16 @@ type RabbitMQ struct {
// "executor": "rabbitmq",
//
// "executor_config": {
// "url": "amqp://guest:guest@localhost:5672/", // rabbitmq server url
// "text": "hello world!", // or "base64" to send bytes as rabbitmq message
// "queue": "test", //
// "url": "amqp://guest:guest@localhost:5672/",
// "queue.name": "test",
// "queue.create": "true",
// "queue.durable": "true",
// "queue.auto_delete": "false",
// "queue.exclusive": "false",
// "message.content_type": "application/json",
// "message.delivery_mode": "2",
// "message.messageId": "4373732772",
// "message.body": "{\"key\":\"value\"}"
// }
func (r *RabbitMQ) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) {
out, err := r.ExecuteImpl(args, cb)
Expand All @@ -32,64 +40,117 @@ func (r *RabbitMQ) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelpe

// ExecuteImpl do rabbitmq publish
func (r *RabbitMQ) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) ([]byte, error) {
// validate config
cfg := args.Config
if cfg == nil {
return nil, errors.New("RabbitMQ config is empty")
}

url := cfg["url"]
if url == "" {
return nil, errors.New("RabbitMQ url is empty")
}

if args.Config["url"] == "" {
return nil, errors.New("url is empty")
queueName := cfg["queue.name"]
if queueName == "" {
return nil, errors.New("RabbitMQ queue name is empty")
}

if args.Config["queue"] == "" {
return nil, errors.New("queue is empty")
if cfg["message.body"] != "" && args.Config["message.base64"] != "" {
return nil, errors.New("RabbitMQ message.body and message.base64 are both set")
}

// broker := "amqp://guest:guest@localhost:5672/"
broker := args.Config["url"]
conn, err := amqp.Dial(broker)
// establish connection
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}
defer conn.Close()

defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
// DO NOTHING
}
}(conn)
ch, err := conn.Channel()
if err != nil {
return nil, err
}
defer ch.Close()

queue := args.Config["queue"]
q, err := ch.QueueDeclare(
queue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
// DO NOTHING
}
}(ch)

// create queue if necessary
if err := createQueueIfNecessary(args.Config, queueName, ch); err != nil {
return nil, err
}

// publish message
if err = publish(cfg, ch); err != nil {
return nil, err
}
return nil, nil
}

func createQueueIfNecessary(cfg map[string]string, queue string, ch *amqp.Channel) error {
if val, ok := cfg["queue.create"]; !ok || (ok && val == "false") {
return nil
}

durable, _ := strconv.ParseBool(cfg["queue.durable"])
autoDelete, _ := strconv.ParseBool(cfg["queue.auto_delete"])
exclusive, _ := strconv.ParseBool(cfg["queue.exclusive"])

_, err := ch.QueueDeclare(
queue,
durable,
autoDelete,
exclusive,
false,
nil,
)

return err
}

func publish(cfg map[string]string, ch *amqp.Channel) error {
var body []byte
b64, ok := args.Config["base64"]
b64, ok := cfg["message.base64Body"]
if ok {
decoded, err := base64.StdEncoding.DecodeString(b64)
if err != nil {
return nil, err
return err
}
body = decoded
} else {
text := args.Config["text"]
body = []byte(text)
stringBody := cfg["message.body"]
body = []byte(stringBody)
}
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: body,
})

contentType := cfg["message.content_type"]
if contentType == "" {
contentType = "text/plain"
}
messageId := cfg["message.messageId"]
rawDeliveryMode := cfg["message.delivery_mode"]
if rawDeliveryMode == "" {
rawDeliveryMode = "0"
}
deliveryMode, err := strconv.ParseUint(rawDeliveryMode, 10, 8)
if err != nil {
return nil, err
return err
}
return nil, nil
return ch.Publish(
"", // exchange
cfg["queue.name"], // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: contentType,
Body: body,
MessageId: messageId,
DeliveryMode: uint8(deliveryMode),
})
}
4 changes: 2 additions & 2 deletions dkron/server_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func TestAddServer(t *testing.T) {

// assert
servers := lookup.Servers()
expectedServers := []*ServerParts{server1, server2}
require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers)
require.Containsf(t, servers, server1, "Expected %v to contain %+v", servers, server1)
require.Containsf(t, servers, server2, "Expected %v to contain %+v", servers, server2)

got, err := lookup.ServerAddr(raft.ServerID(id1))
require.NoErrorf(t, err, "Unexpected error: %v", err)
Expand Down

0 comments on commit 8324be2

Please sign in to comment.