Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to use mocks in tests #103

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 28 additions & 50 deletions executors/amqp_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package executors
import (
"encoding/json"
"errors"
"fmt"

log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
Expand All @@ -18,74 +19,51 @@ type AMQPVal struct {
ReplyTo string `json:"reply_to"`
}

// DoExecute : Connection to Rabbitmq and sending message into Exchange
func (val *AMQPVal) DoExecute(requestBody interface{}, prefix string) (interface{}, error) {
log.Debugf("%s AMQP Executor: Executing amqp %s body:%v", prefix, val.getName(), requestBody)

conn, err := amqp.Dial(val.ConnectionURL)
var amqpPublishFunc = func(amqpURL, exchange, key string, contentType string, body []byte) error {
conn, err := amqp.Dial(amqpURL)
if err != nil {
log.Errorf("%s AMQP Error: %s", prefix, err.Error())
return nil, err
return fmt.Errorf("error while dialing amqp connection: %w", err)
}

defer conn.Close()

ch, err := conn.Channel()
if err != nil {
log.Errorf("%s AMQP Error: %s", prefix, err.Error())
return nil, err
return fmt.Errorf("error while creating amqp channel: %w", err)
}

defer ch.Close()

if val.ExchangeName != "" {
return sendMessageToExchange(ch, val, requestBody, prefix)
} else if val.QueueName != "" {
return sendMessageToQueue(ch, val, requestBody, prefix)
} else {
return nil, errors.New("AMQP - queue/exchange name not specified")
err = ch.Publish(exchange, key, false, false, amqp.Publishing{ContentType: contentType, Body: body})
if err != nil {
return fmt.Errorf("error while publishing amqp message: %w", err)
}

return nil
}

func sendMessageToQueue(ch *amqp.Channel, val *AMQPVal, body interface{}, prefix string) (interface{}, error) {
bytes, err := json.Marshal(body)
if err != nil {
return nil, err
}
err = ch.Publish(
"",
val.QueueName,
false,
false,
amqp.Publishing{
ContentType: val.ContentType,
Body: bytes,
})
// DoExecute : Connection to Rabbitmq and sending message into Exchange
func (val *AMQPVal) DoExecute(requestBody interface{}, prefix string) (interface{}, error) {
log.Debugf("%s AMQP Executor: Executing amqp %s body:%v", prefix, val.getName(), requestBody)

body, err := json.Marshal(requestBody)
if err != nil {
return nil, err
return nil, fmt.Errorf("error while marshaling amqp request body: %w", err)
}
log.Debugf("%s AMQP Executor: pushed message successfully", prefix)
return nil, nil
}

func sendMessageToExchange(ch *amqp.Channel, val *AMQPVal, body interface{}, prefix string) (interface{}, error) {
bytes, err := json.Marshal(body)
if err != nil {
return nil, err
if val.ExchangeName != "" {
err = amqpPublishFunc(val.ConnectionURL, val.ExchangeName, val.RoutingKey, val.ContentType, body)
} else if val.QueueName != "" {
err = amqpPublishFunc(val.ConnectionURL, "", val.QueueName, val.ContentType, body)
} else {
err = errors.New("AMQP - queue/exchange name not specified")
}

err = ch.Publish(
val.ExchangeName,
val.RoutingKey,
false,
false,
amqp.Publishing{
ContentType: val.ContentType,
Body: bytes,
})
if err != nil {
return nil, err
if err == nil {
log.Debugf("%s AMQP Executor: pushed message successfully", prefix)
}
log.Debugf("%s AMQP Executor: pushed message successfully", prefix)
return nil, nil

return nil, err
}

func (val *AMQPVal) getName() string {
Expand Down
240 changes: 163 additions & 77 deletions executors/amqp_executor_test.go
Original file line number Diff line number Diff line change
@@ -1,89 +1,175 @@
package executors

import (
"encoding/json"
"fmt"
"testing"
"time"

"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqptest"
amqptestserver "github.com/NeowayLabs/wabbit/amqptest/server"
"github.com/stretchr/testify/assert"
)

const (
testConnectionURL = "amqp://localhost:5672/test"
testQueueName = "test_queue"
testExchangeName = "test_exchange"
testRoutingKey = "test_routing_key"
testContentType = "json/application"
)

var (
testMessageBody = map[string]interface{}{"somefield": "somevlaue"}
)

func amqpTestPublish(amqpURL, exchange, key string, contentType string, body []byte) error {
conn, err := amqptest.Dial(amqpURL)
if err != nil {
return fmt.Errorf("error while dialing amqp connection: %w", err)
}

defer conn.Close()

ch, err := conn.Channel()
if err != nil {
return fmt.Errorf("error while creating amqp channel: %w", err)
}

defer ch.Close()

err = ch.Publish(exchange, key, body, wabbit.Option{"contentType": contentType})
if err != nil {
return fmt.Errorf("error while publishing amqp message: %w", err)
}

return nil
}

func popDelivery(deliveryCh <-chan wabbit.Delivery) wabbit.Delivery {
select {
case d := <-deliveryCh:
return d
default:
return nil
}
}

func drainDeliveries(deliveryCh <-chan wabbit.Delivery) {
for {
select {
case <-deliveryCh:
default:
return
}
}
}

func TestAMQPVal_DoExecute(t *testing.T) {
type fields struct {
ConnectionURL string
QueueName string
ExchangeName string
RoutingKey string
ContentType string
amqpPublishFunc = amqpTestPublish

t.Run("should return error if connection fails", func(t *testing.T) {
val := AMQPVal{
ConnectionURL: testConnectionURL,
QueueName: testQueueName,
ExchangeName: "",
RoutingKey: "",
ContentType: testContentType,
}
_, err := val.DoExecute(testMessageBody, "")
assert.Error(t, err)
})

amqpTestServer := amqptestserver.NewServer(testConnectionURL)
amqpTestServer.Start()
defer amqpTestServer.Stop()

amqpConn, err := amqptest.Dial(testConnectionURL)
if err != nil {
t.Errorf("amqptest dial failed: %s", err)
return
}

defer amqpConn.Close()

amqpChannel, err := amqpConn.Channel()
if err != nil {
t.Errorf("amqptest channel creation failed: %s", err)
return
}
type args struct {
requestBody interface{}

defer amqpChannel.Close()

err = amqpChannel.ExchangeDeclare(testExchangeName, "topic", wabbit.Option{})
if err != nil {
t.Errorf("amqptest exchange declare failed: %s", err)
return
}

_, err = amqpChannel.QueueDeclare(testQueueName, wabbit.Option{})
if err != nil {
t.Errorf("amqptest queue declare failed: %s", err)
return
}
tests := []struct {
name string
fields fields
args args
want interface{}
wantErr bool
}{
{
name: "should send message to queue",
fields: fields{
ConnectionURL: "amqp://guest:guest@localhost:5672/",
QueueName: "test_queue",
ExchangeName: "",
RoutingKey: "test_queue",
ContentType: "text/plain",
},
args: args{
requestBody: "message body",
},
want: nil,
wantErr: false,
}, {
name: "should send message to exchange",
fields: fields{
ConnectionURL: "amqp://guest:guest@localhost:5672/",
QueueName: "",
ExchangeName: "test_exchange",
RoutingKey: "test_key",
ContentType: "text/plain",
},
args: args{
requestBody: "message body",
},
want: nil,
wantErr: false,
},
{
name: "should return error if connection fails",
fields: fields{
ConnectionURL: "amqp://test:test@localhost:5674/",
QueueName: "",
ExchangeName: "test_exchange",
RoutingKey: "test_key",
ContentType: "text/plain",
},
args: args{
requestBody: "message body",
},
want: nil,
wantErr: true,
},

err = amqpChannel.QueueBind(testQueueName, testRoutingKey, testExchangeName, wabbit.Option{})
if err != nil {
t.Errorf("amqptest queue bind failed: %s", err)
return
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
val := AMQPVal{
ConnectionURL: tt.fields.ConnectionURL,
QueueName: tt.fields.QueueName,
ExchangeName: tt.fields.ExchangeName,
RoutingKey: tt.fields.RoutingKey,
ContentType: tt.fields.ContentType,
}
got, err := val.DoExecute(tt.args.requestBody, "")
if (err != nil) != tt.wantErr {
t.Errorf("DoExecute() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != nil {
t.Errorf("DoExecute() got = %v, want %v", got, tt.want)
}
})

deliveryCh, err := amqpChannel.Consume(testQueueName, "", wabbit.Option{})
if err != nil {
t.Errorf("amqptest consume failed: %s", err)
return
}

t.Run("should send message to queue", func(t *testing.T) {
val := AMQPVal{
ConnectionURL: testConnectionURL,
QueueName: testQueueName,
ExchangeName: "",
RoutingKey: "",
ContentType: testContentType,
}
_, err := val.DoExecute(testMessageBody, "")
assert.NoError(t, err)

time.Sleep(time.Millisecond) // gives time for the delivery

d := popDelivery(deliveryCh)
if assert.NotNil(t, d) {
var messageBody interface{}
err = json.Unmarshal(d.Body(), &messageBody)
assert.NoError(t, err)
assert.Equal(t, testMessageBody, messageBody)
}
})

drainDeliveries(deliveryCh)

t.Run("should send message to exchange", func(t *testing.T) {
val := AMQPVal{
ConnectionURL: testConnectionURL,
QueueName: "",
ExchangeName: testExchangeName,
RoutingKey: testRoutingKey,
ContentType: testContentType,
}
_, err := val.DoExecute(testMessageBody, "")
assert.NoError(t, err)

time.Sleep(time.Millisecond) // gives time for the delivery

d := popDelivery(deliveryCh)
if assert.NotNil(t, d) {
var messageBody interface{}
err = json.Unmarshal(d.Body(), &messageBody)
assert.NoError(t, err)
assert.Equal(t, testMessageBody, messageBody)
}
})

drainDeliveries(deliveryCh)
}
Loading