Skip to content

Commit

Permalink
Testing/benchmarks (#46)
Browse files Browse the repository at this point in the history
* added basic comparison benchmarks

* tweaks
  • Loading branch information
peake100 authored Apr 17, 2021
1 parent 2591f8a commit 94686f4
Show file tree
Hide file tree
Showing 19 changed files with 634 additions and 235 deletions.
46 changes: 37 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ Goals

The goals of the Roger, Rabbit package are as follows:

- **Offer a drop-in replacement for streadway/amqp**: APIs may be extended (adding
- **Offer a Drop-in Replacement for streadway/amqp**: APIs may be extended (adding
fields to `amqp.Config` or additional methods to `*amqp.Channel`, for instance) but
must not break existing code unless absolutely necessary.

- **Add as few additional error paths as possible**: Errors may be *extended* with
- **Add as few Additional Error Paths as Possible**: Errors may be *extended* with
additional information concerning disconnect scenarios, but new error type returns
from *Connection or *Channel should be an absolute last resort.
from `*Connection` or `*amqp.Channel` should be an absolute last resort.

- **Be Highly Extensible**: Roger, Rabbit seeks to offer a high degree of extensibility
via features like middleware, in an effort to reduce the balkanization of amqp client
Expand All @@ -271,19 +271,47 @@ The goals of the Roger, Rabbit package are as follows:
Current Limitations & Warnings
------------------------------

- **Performance**: Roger, Rabbit's implementation is handled primarily through
middlewares, and a *sync.RWMutex on transports that handles blocking methods on
reconnection events. This increases the overhead on each call, but allows for an
enormous amount of extensibility and robustness, but may be a limiting factor for
applications that need the absolute maximum throughput possible.
- **Performance**: Roger, Rabbit has not been extensively benchmarked against
`streadway/amqp`. To see preliminary benchmarks, take a look at the next section.

- **Transaction Support**: Roger, Rabbit does not currently support AMQP Transactions,
- **Transaction Support**: Roger, Rabbit does not currently support amqp Transactions,
as the author does not use them. Draft PR's with possible implementations are welcome!

- **Reliability**: While the author uses this library in production, it is still early
days, and more battle-testing will be needed before this library is promoted to
version 1.0. PR's are welcome for Bug Fixes, code coverage, or new features.

Benchmarks
----------

Because of Roger, Rabbit's middleware-driven design, some overhead is expected vs
streadway proper. However, initial benchmarks are promising, and show only minimal
impact. For most applications, the overhead cost is likely worth the cost for ease of
development and flexibility.

Still, if absolute peak throughput is critical to an application, a less general and
more tailored approach may be warranted.

Benchmarks can be found in `./amqp/benchmark_test.go`.

Machine: Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz


| OPERATION | LIB | EXECUTIONS | NS/OP | COMPARISON
| -------------------|------|-------------|------------|------------
| QueueInspect | sw | 2,838 | 812,594 | --
| | rr | 2,470 | 813,269 | +0.1%
| Publish | sw | 7,4559 | 28,882 | --
| | rr | 7,0665 | 30,031 | +4.0%
| Publish & Confirm | sw | 3,4528 | 59,703 | --
| | rr | 3,5481 | 62,198 | +4.2%


The above numbers were calculated by running each benchmark 4 times, then taking the
fastest result for each library.

The benchmarks were run with the following command:

Acknowledgements
----------------

Expand Down
286 changes: 286 additions & 0 deletions amqp/benchmarks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package amqp_test

import (
"github.com/peake100/rogerRabbit-go/amqp"
"github.com/peake100/rogerRabbit-go/amqptest"
streadway "github.com/streadway/amqp"
"sync"
"testing"
"time"
)

var msgBody = []byte("some message")

func BenchmarkComparison_QueueInspect_Streadway(b *testing.B) {
channel := dialStreadway(b)
queue := setupQueue(b, channel)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := channel.QueueInspect(queue.Name)
if err != nil {
b.Fatalf("error getting queue info: %v", err)
}
}
}

func BenchmarkComparison_QueueInspect_Roger(b *testing.B) {
channel := dialRoger(b)
queue := setupQueue(b, channel)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := channel.QueueInspect(queue.Name)
if err != nil {
b.Fatalf("error getting queue info: %v", err)
}
}
}

func BenchmarkComparison_QueuePublish_Streadway(b *testing.B) {
channel := dialStreadway(b)
queue := setupQueue(b, channel)

b.ResetTimer()

for i := 0; i < b.N; i++ {
err := channel.Publish(
"",
queue.Name,
true,
false,
amqp.Publishing{
Body: msgBody,
},
)
if err != nil {
b.Fatalf("error publishing message: %v", err)
}
}
}

func BenchmarkComparison_QueuePublish_Roger(b *testing.B) {
channel := dialRoger(b)
queue := setupQueue(b, channel)

b.ResetTimer()

for i := 0; i < b.N; i++ {
err := channel.Publish(
"",
queue.Name,
true,
false,
amqp.Publishing{
Body: msgBody,
},
)
if err != nil {
b.Fatalf("error publishing message: %v", err)
}
}
}

func BenchmarkComparison_QueuePublishConfirm_Streadway(b *testing.B) {
channel := dialStreadway(b)
queue := setupQueue(b, channel)

err := channel.Confirm(false)
if err != nil {
b.Fatalf("error putting channel into confrimation mode")
}

confirmations := make(chan amqp.BasicConfirmation, 100)
channel.NotifyPublish(confirmations)

done := new(sync.WaitGroup)
done.Add(2)

errPublish := make(chan error, 1)

b.ResetTimer()
go func() {
defer done.Done()

for i := 0; i < b.N; i++ {
err := channel.Publish(
"",
queue.Name,
true,
false,
amqp.Publishing{
Body: msgBody,
},
)
if err != nil {
b.Errorf("error publishing message: %v", err)
errPublish <- err
return
}
}
}()

go func() {
defer done.Done()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

for i := 0; i < b.N; i++ {
timer.Reset(5 * time.Second)
select {
case confirm := <-confirmations:
if !confirm.Ack {
b.Errorf(
"publication nacked for tag %v", confirm.DeliveryTag,
)
return
}
case <-errPublish:
b.Errorf("error publishing. aborting confirmations")
return
case <-timer.C:
b.Errorf("timeout on confirmation %v", b.N)
return
}
}
}()

done.Wait()
}

func BenchmarkComparison_QueuePublishConfirm_Roger(b *testing.B) {
channel := dialRoger(b)
queue := setupQueue(b, channel)

err := channel.Confirm(false)
if err != nil {
b.Fatalf("error putting channel into confrimation mode")
}

confirmations := make(chan amqp.Confirmation, 100)
channel.NotifyPublish(confirmations)

done := new(sync.WaitGroup)
done.Add(2)

errPublish := make(chan error, 1)

b.ResetTimer()
go func() {
defer done.Done()

for i := 0; i < b.N; i++ {
err := channel.Publish(
"",
queue.Name,
true,
false,
amqp.Publishing{
Body: msgBody,
},
)
if err != nil {
b.Errorf("error publishing message: %v", err)
errPublish <- err
return
}
}
}()

go func() {
defer done.Done()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()

for i := 0; i < b.N; i++ {
timer.Reset(5 * time.Second)
select {
case confirm := <-confirmations:
if !confirm.Ack {
b.Errorf(
"publication nacked for tag %v", confirm.DeliveryTag,
)
return
}
case <-errPublish:
b.Errorf("error publishing. aborting confirmations")
return
case <-timer.C:
b.Errorf("timeout on confirmation %v", b.N)
return
}
}
}()

done.Wait()
}

// dialStreadway gets a streadway Connection
func dialStreadway(b *testing.B) *amqp.BasicChannel {
conn, err := streadway.Dial(amqptest.TestDialAddress)
if err != nil {
b.Fatalf("error dialing connection")
}
b.Cleanup(func() {
conn.Close()
})

channel, err := conn.Channel()
if err != nil {
b.Fatalf("error getting channel: %v", err)
}
return channel
}

func dialRoger(b *testing.B) *amqp.Channel {
conn, err := amqp.Dial(amqptest.TestDialAddress)
if err != nil {
b.Fatalf("error dialing connection")
}
b.Cleanup(func() {
conn.Close()
})

channel, err := conn.Channel()
if err != nil {
b.Fatalf("error getting channel: %v", err)
}
return channel
}

func setupQueue(b *testing.B, channel OrganizesQueues) amqp.Queue {
queue, err := channel.QueueDeclare(
"benchmark_queue_inspect",
false,
true,
false,
false,
nil,
)
if err != nil {
b.Fatalf("error getting queue: %v", err)
}

_, err = channel.QueuePurge(queue.Name, false)
if err != nil {
b.Fatalf("error purging queue: %v", err)
}

// Delete the queue on the way out.
b.Cleanup(func() {
channel.QueueDelete(queue.Name, false, false, false)
})

return queue
}

// publishesAndConfirms is used to run the publish anc confirm test.
type OrganizesQueues interface {
QueueDeclare(
name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table,
) (queue amqp.Queue, err error)
QueuePurge(name string, noWait bool) (count int, err error)
QueueDelete(
name string, ifUnused, ifEmpty, noWait bool,
) (count int, err error)
}
Loading

0 comments on commit 94686f4

Please sign in to comment.