From 8324be2901efbb060b4beae5a2ca435df915079d Mon Sep 17 00:00:00 2001 From: Ivan Kripakov <108407979+ivan-kripakov-m10@users.noreply.github.com> Date: Wed, 5 Jun 2024 13:40:02 +0400 Subject: [PATCH] Minor improvement in RabbitMQ executor (#1500) * minor improvement in rabbit mq executor * fix flaky test --- .../bins/dkron-executor-rabbitmq/README.md | 39 ++++- .../bins/dkron-executor-rabbitmq/rabbitmq.go | 139 +++++++++++++----- dkron/server_lookup_test.go | 4 +- 3 files changed, 137 insertions(+), 45 deletions(-) diff --git a/builtin/bins/dkron-executor-rabbitmq/README.md b/builtin/bins/dkron-executor-rabbitmq/README.md index e866fbc3a..276f4d084 100644 --- a/builtin/bins/dkron-executor-rabbitmq/README.md +++ b/builtin/bins/dkron-executor-rabbitmq/README.md @@ -1,4 +1,26 @@ -### Create a job to send rabbitmq text Message +### RabbitMQ Executor + +#### Executor Configuration + +The options names are inherited from the [RabbitMQ Publishers](https://www.rabbitmq.com/docs/publishers) + +| Option | Required | Description | Default | +|-----------------------|----------|-------------------------------|------------| +| url | yes | RabbitMQ connection string | - | +| queue.name | yes | Queue name to send message to | - | +| queue.create | no | Create queue if not exists | false | +| queue.durable | no | Durable queue | false | +| queue.auto_delete | no | Auto delete queue | false | +| queue.exclusive | no | Exclusive queue | false | +| message.content_type | no | Message content type | text/plain | +| message.delivery_mode | no | Message delivery mode | 0 | +| message.messageId | no | Message id | "" | +| message.body | yes | Message body | - | +| message.base64Body | yes | Base64 encoded message body | - | + +#### Example + +```shell curl localhost:8080/v1/jobs -XPOST -d '{ "name": "job1", "schedule": "@every 10s", @@ -15,8 +37,17 @@ curl localhost:8080/v1/jobs -XPOST -d '{ "concurrency": "allow", "executor": "rabbitmq", "executor_config": { - "url": "amqp://guest:guest@localhost:5672/", - "text": "hello world!", - "queue": "test" + "url": "amqp://guest:guest@localhost:5672/", + "queue.name": "test", + "queue.create": "true", + "queue.durable": "true", + "queue.auto_delete": "false", + "queue.exclusive": "false", + "message.content_type": "application/json", + "message.delivery_mode": "2", + "message.messageId": "4373732772", + "message.body": "{\"key\":\"value\"}" } }' +``` + diff --git a/builtin/bins/dkron-executor-rabbitmq/rabbitmq.go b/builtin/bins/dkron-executor-rabbitmq/rabbitmq.go index ed38ce3b7..17d9263a4 100644 --- a/builtin/bins/dkron-executor-rabbitmq/rabbitmq.go +++ b/builtin/bins/dkron-executor-rabbitmq/rabbitmq.go @@ -3,6 +3,7 @@ package main import ( "encoding/base64" "errors" + "strconv" dkplugin "github.com/distribworks/dkron/v4/plugin" dktypes "github.com/distribworks/dkron/v4/types" @@ -17,9 +18,16 @@ type RabbitMQ struct { // "executor": "rabbitmq", // // "executor_config": { -// "url": "amqp://guest:guest@localhost:5672/", // rabbitmq server url -// "text": "hello world!", // or "base64" to send bytes as rabbitmq message -// "queue": "test", // +// "url": "amqp://guest:guest@localhost:5672/", +// "queue.name": "test", +// "queue.create": "true", +// "queue.durable": "true", +// "queue.auto_delete": "false", +// "queue.exclusive": "false", +// "message.content_type": "application/json", +// "message.delivery_mode": "2", +// "message.messageId": "4373732772", +// "message.body": "{\"key\":\"value\"}" // } func (r *RabbitMQ) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) { out, err := r.ExecuteImpl(args, cb) @@ -32,64 +40,117 @@ func (r *RabbitMQ) Execute(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelpe // ExecuteImpl do rabbitmq publish func (r *RabbitMQ) ExecuteImpl(args *dktypes.ExecuteRequest, cb dkplugin.StatusHelper) ([]byte, error) { + // validate config + cfg := args.Config + if cfg == nil { + return nil, errors.New("RabbitMQ config is empty") + } + + url := cfg["url"] + if url == "" { + return nil, errors.New("RabbitMQ url is empty") + } - if args.Config["url"] == "" { - return nil, errors.New("url is empty") + queueName := cfg["queue.name"] + if queueName == "" { + return nil, errors.New("RabbitMQ queue name is empty") } - if args.Config["queue"] == "" { - return nil, errors.New("queue is empty") + if cfg["message.body"] != "" && args.Config["message.base64"] != "" { + return nil, errors.New("RabbitMQ message.body and message.base64 are both set") } - // broker := "amqp://guest:guest@localhost:5672/" - broker := args.Config["url"] - conn, err := amqp.Dial(broker) + // establish connection + conn, err := amqp.Dial(url) if err != nil { return nil, err } - defer conn.Close() - + defer func(conn *amqp.Connection) { + err := conn.Close() + if err != nil { + // DO NOTHING + } + }(conn) ch, err := conn.Channel() if err != nil { return nil, err } - defer ch.Close() - - queue := args.Config["queue"] - q, err := ch.QueueDeclare( - queue, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + // DO NOTHING + } + }(ch) + + // create queue if necessary + if err := createQueueIfNecessary(args.Config, queueName, ch); err != nil { + return nil, err + } + + // publish message + if err = publish(cfg, ch); err != nil { return nil, err } + return nil, nil +} + +func createQueueIfNecessary(cfg map[string]string, queue string, ch *amqp.Channel) error { + if val, ok := cfg["queue.create"]; !ok || (ok && val == "false") { + return nil + } + + durable, _ := strconv.ParseBool(cfg["queue.durable"]) + autoDelete, _ := strconv.ParseBool(cfg["queue.auto_delete"]) + exclusive, _ := strconv.ParseBool(cfg["queue.exclusive"]) + + _, err := ch.QueueDeclare( + queue, + durable, + autoDelete, + exclusive, + false, + nil, + ) + + return err +} + +func publish(cfg map[string]string, ch *amqp.Channel) error { var body []byte - b64, ok := args.Config["base64"] + b64, ok := cfg["message.base64Body"] if ok { decoded, err := base64.StdEncoding.DecodeString(b64) if err != nil { - return nil, err + return err } body = decoded } else { - text := args.Config["text"] - body = []byte(text) + stringBody := cfg["message.body"] + body = []byte(stringBody) } - err = ch.Publish( - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "text/plain", - Body: body, - }) + + contentType := cfg["message.content_type"] + if contentType == "" { + contentType = "text/plain" + } + messageId := cfg["message.messageId"] + rawDeliveryMode := cfg["message.delivery_mode"] + if rawDeliveryMode == "" { + rawDeliveryMode = "0" + } + deliveryMode, err := strconv.ParseUint(rawDeliveryMode, 10, 8) if err != nil { - return nil, err + return err } - return nil, nil + return ch.Publish( + "", // exchange + cfg["queue.name"], // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: contentType, + Body: body, + MessageId: messageId, + DeliveryMode: uint8(deliveryMode), + }) } diff --git a/dkron/server_lookup_test.go b/dkron/server_lookup_test.go index 92a59aa6d..bea55123f 100644 --- a/dkron/server_lookup_test.go +++ b/dkron/server_lookup_test.go @@ -32,8 +32,8 @@ func TestAddServer(t *testing.T) { // assert servers := lookup.Servers() - expectedServers := []*ServerParts{server1, server2} - require.EqualValuesf(t, expectedServers, servers, "Expected %v but got %v", expectedServers, servers) + require.Containsf(t, servers, server1, "Expected %v to contain %+v", servers, server1) + require.Containsf(t, servers, server2, "Expected %v to contain %+v", servers, server2) got, err := lookup.ServerAddr(raft.ServerID(id1)) require.NoErrorf(t, err, "Unexpected error: %v", err)