Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing/benchmarks #46

Merged
merged 2 commits into from
Apr 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ Goals

The goals of the Roger, Rabbit package are as follows:

- **Offer a drop-in replacement for streadway/amqp**: APIs may be extended (adding
- **Offer a Drop-in Replacement for streadway/amqp**: APIs may be extended (adding
fields to `amqp.Config` or additional methods to `*amqp.Channel`, for instance) but
must not break existing code unless absolutely necessary.

- **Add as few additional error paths as possible**: Errors may be *extended* with
- **Add as few Additional Error Paths as Possible**: Errors may be *extended* with
additional information concerning disconnect scenarios, but new error type returns
from *Connection or *Channel should be an absolute last resort.
from `*Connection` or `*amqp.Channel` should be an absolute last resort.

- **Be Highly Extensible**: Roger, Rabbit seeks to offer a high degree of extensibility
via features like middleware, in an effort to reduce the balkanization of amqp client
Expand All @@ -271,19 +271,47 @@ The goals of the Roger, Rabbit package are as follows:
Current Limitations & Warnings
------------------------------

- **Performance**: Roger, Rabbit's implementation is handled primarily through
middlewares, and a *sync.RWMutex on transports that handles blocking methods on
reconnection events. This increases the overhead on each call, but allows for an
enormous amount of extensibility and robustness, but may be a limiting factor for
applications that need the absolute maximum throughput possible.
- **Performance**: Roger, Rabbit has not been extensively benchmarked against
`streadway/amqp`. To see preliminary benchmarks, take a look at the next section.

- **Transaction Support**: Roger, Rabbit does not currently support AMQP Transactions,
- **Transaction Support**: Roger, Rabbit does not currently support amqp Transactions,
as the author does not use them. Draft PR's with possible implementations are welcome!

- **Reliability**: While the author uses this library in production, it is still early
days, and more battle-testing will be needed before this library is promoted to
version 1.0. PR's are welcome for Bug Fixes, code coverage, or new features.

Benchmarks
----------

Because of Roger, Rabbit's middleware-driven design, some overhead is expected vs
streadway proper. However, initial benchmarks are promising, and show only minimal
impact. For most applications, the overhead cost is likely worth the cost for ease of
development and flexibility.

Still, if absolute peak throughput is critical to an application, a less general and
more tailored approach may be warranted.

Benchmarks can be found in `./amqp/benchmark_test.go`.

Machine: Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz


| OPERATION | LIB | EXECUTIONS | NS/OP | COMPARISON
| -------------------|------|-------------|------------|------------
| QueueInspect | sw | 2,838 | 812,594 | --
| | rr | 2,470 | 813,269 | +0.1%
| Publish | sw | 7,4559 | 28,882 | --
| | rr | 7,0665 | 30,031 | +4.0%
| Publish & Confirm | sw | 3,4528 | 59,703 | --
| | rr | 3,5481 | 62,198 | +4.2%


The above numbers were calculated by running each benchmark 4 times, then taking the
fastest result for each library.

The benchmarks were run with the following command:

Acknowledgements
----------------

Expand Down
286 changes: 286 additions & 0 deletions amqp/benchmarks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package amqp_test

import (
"github.com/peake100/rogerRabbit-go/amqp"
"github.com/peake100/rogerRabbit-go/amqptest"
streadway "github.com/streadway/amqp"
"sync"
"testing"
"time"
)

var msgBody = []byte("some message")

func BenchmarkComparison_QueueInspect_Streadway(b *testing.B) {
channel := dialStreadway(b)
queue := setupQueue(b, channel)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := channel.QueueInspect(queue.Name)
if err != nil {
b.Fatalf("error getting queue info: %v", err)
}
}
}

func BenchmarkComparison_QueueInspect_Roger(b *testing.B) {
channel := dialRoger(b)
queue := setupQueue(b, channel)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := channel.QueueInspect(queue.Name)
if err != nil {
b.Fatalf("error getting queue info: %v", err)
}
}
}

func BenchmarkComparison_QueuePublish_Streadway(b *testing.B) {
channel := dialStreadway(b)
queue := setupQueue(b, channel)

b.ResetTimer()

for i := 0; i < b.N; i++ {
err := channel.Publish(
"",
queue.Name,
true,
false,
amqp.Publishing{
Body: msgBody,
},
)
if err != nil {
b.Fatalf("error publishing message: %v", err)
}
}
}

func BenchmarkComparison_QueuePublish_Roger(b *testing.B) {
channel := dialRoger(b)
queue := setupQueue(b, channel)

b.ResetTimer()

for i := 0; i < b.N; i++ {
err := channel.Publish(
"",
queue.Name,
true,
false,
amqp.Publishing{
Body: msgBody,
},
)
if err != nil {
b.Fatalf("error publishing message: %v", err)
}
}
}

func BenchmarkComparison_QueuePublishConfirm_Streadway(b *testing.B) {
channel := dialStreadway(b)
queue := setupQueue(b, channel)

err := channel.Confirm(false)
if err != nil {
b.Fatalf("error putting channel into confrimation mode")
}

confirmations := make(chan amqp.BasicConfirmation, 100)
channel.NotifyPublish(confirmations)

done := new(sync.WaitGroup)
done.Add(2)

errPublish := make(chan error, 1)

b.ResetTimer()
go func() {
defer done.Done()

for i := 0; i < b.N; i++ {
err := channel.Publish(
"",
queue.Name,
true,
false,
amqp.Publishing{
Body: msgBody,
},
)
if err != nil {
b.Errorf("error publishing message: %v", err)
errPublish <- err
return
}
}
}()

go func() {
defer done.Done()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

for i := 0; i < b.N; i++ {
timer.Reset(5 * time.Second)
select {
case confirm := <-confirmations:
if !confirm.Ack {
b.Errorf(
"publication nacked for tag %v", confirm.DeliveryTag,
)
return
}
case <-errPublish:
b.Errorf("error publishing. aborting confirmations")
return
case <-timer.C:
b.Errorf("timeout on confirmation %v", b.N)
return
}
}
}()

done.Wait()
}

func BenchmarkComparison_QueuePublishConfirm_Roger(b *testing.B) {
channel := dialRoger(b)
queue := setupQueue(b, channel)

err := channel.Confirm(false)
if err != nil {
b.Fatalf("error putting channel into confrimation mode")
}

confirmations := make(chan amqp.Confirmation, 100)
channel.NotifyPublish(confirmations)

done := new(sync.WaitGroup)
done.Add(2)

errPublish := make(chan error, 1)

b.ResetTimer()
go func() {
defer done.Done()

for i := 0; i < b.N; i++ {
err := channel.Publish(
"",
queue.Name,
true,
false,
amqp.Publishing{
Body: msgBody,
},
)
if err != nil {
b.Errorf("error publishing message: %v", err)
errPublish <- err
return
}
}
}()

go func() {
defer done.Done()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

for i := 0; i < b.N; i++ {
timer.Reset(5 * time.Second)
select {
case confirm := <-confirmations:
if !confirm.Ack {
b.Errorf(
"publication nacked for tag %v", confirm.DeliveryTag,
)
return
}
case <-errPublish:
b.Errorf("error publishing. aborting confirmations")
return
case <-timer.C:
b.Errorf("timeout on confirmation %v", b.N)
return
}
}
}()

done.Wait()
}

// dialStreadway gets a streadway Connection
func dialStreadway(b *testing.B) *amqp.BasicChannel {
conn, err := streadway.Dial(amqptest.TestDialAddress)
if err != nil {
b.Fatalf("error dialing connection")
}
b.Cleanup(func() {
conn.Close()
})

channel, err := conn.Channel()
if err != nil {
b.Fatalf("error getting channel: %v", err)
}
return channel
}

func dialRoger(b *testing.B) *amqp.Channel {
conn, err := amqp.Dial(amqptest.TestDialAddress)
if err != nil {
b.Fatalf("error dialing connection")
}
b.Cleanup(func() {
conn.Close()
})

channel, err := conn.Channel()
if err != nil {
b.Fatalf("error getting channel: %v", err)
}
return channel
}

func setupQueue(b *testing.B, channel OrganizesQueues) amqp.Queue {
queue, err := channel.QueueDeclare(
"benchmark_queue_inspect",
false,
true,
false,
false,
nil,
)
if err != nil {
b.Fatalf("error getting queue: %v", err)
}

_, err = channel.QueuePurge(queue.Name, false)
if err != nil {
b.Fatalf("error purging queue: %v", err)
}

// Delete the queue on the way out.
b.Cleanup(func() {
channel.QueueDelete(queue.Name, false, false, false)
})

return queue
}

// publishesAndConfirms is used to run the publish anc confirm test.
type OrganizesQueues interface {
QueueDeclare(
name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table,
) (queue amqp.Queue, err error)
QueuePurge(name string, noWait bool) (count int, err error)
QueueDelete(
name string, ifUnused, ifEmpty, noWait bool,
) (count int, err error)
}
Loading