From 30cb21a7e1da835dd6313d7ad651ea5aa9191626 Mon Sep 17 00:00:00 2001 From: Billy Peake Date: Sat, 17 Apr 2021 01:58:25 -0700 Subject: [PATCH 1/2] added basic comparison benchmarks --- README.md | 31 +++ amqp/benchmarks_test.go | 286 ++++++++++++++++++++++++ amqp/channel.go | 22 +- amqp/channelHandlersBuilder.go | 2 +- amqp/channel_test.go | 2 +- amqp/config.go | 2 +- amqp/connection.go | 6 +- amqp/connection_test.go | 2 +- amqp/eventRelayNotifyFlow.go | 2 +- amqp/streadwayAliases.go | 5 + amqp/transportManager.go | 23 +- amqp/transportManagerHandlersBuilder.go | 2 +- amqp/transportManagerReconnect.go | 26 ++- amqptest/testUtils.go | 18 +- makefile | 4 + zdocs/source/_static/godoc.11.html | 188 +++++++++------- zdocs/source/_static/godoc.6.html | 8 +- zdocs/source/index.rst | 99 +------- zdocs/source/overview.rst | 126 +++++++++++ 19 files changed, 628 insertions(+), 226 deletions(-) create mode 100644 amqp/benchmarks_test.go create mode 100644 zdocs/source/overview.rst diff --git a/README.md b/README.md index 4863497..6a8ebd1 100644 --- a/README.md +++ b/README.md @@ -284,6 +284,37 @@ Current Limitations & Warnings days, and more battle-testing will be needed before this library is promoted to version 1.0. PR's are welcome for Bug Fixes, code coverage, or new features. +Benchmarks +---------- + +Because of Roger, Rabbit's middleware-driven design, some overhead is expected vs +streadway proper. However, initial benchmarks are promising, and show only minimal +impact. For most applications, the overhead cost is likely worth the cost for ease of +development and flexibility. + +Still, if absolute peak throughput is critical to an application, a less general and +more tailored approach may be warranted. + +Benchmarks can be found in `./amqp/benchmark_test.go`. + +Machine: Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz + + +| OPERATION | LIB | EXECUTIONS | NS/OP | COMPARISON +| -------------------|------|-------------|------------|------------ +| QueueInspect | sw | 2,838 | 812,594 | -- +| | rr | 2,470 | 813,269 | +0.1% +| Publish | sw | 7,4559 | 28,882 | -- +| | rr | 7,0665 | 30,031 | +4.0% +| Publish & Confirm | sw | 3,4528 | 59,703 | -- +| | rr | 3,5481 | 62,198 | +4.2% + + +The above numbers were calculated by running each benchmark 4 times, then taking the +fastest result for each library. + +The benchmarks were run with the following command: + Acknowledgements ---------------- diff --git a/amqp/benchmarks_test.go b/amqp/benchmarks_test.go new file mode 100644 index 0000000..2f5f39b --- /dev/null +++ b/amqp/benchmarks_test.go @@ -0,0 +1,286 @@ +package amqp_test + +import ( + "github.com/peake100/rogerRabbit-go/amqp" + "github.com/peake100/rogerRabbit-go/amqptest" + streadway "github.com/streadway/amqp" + "sync" + "testing" + "time" +) + +var msgBody = []byte("some message") + +func BenchmarkComparison_QueueInspect_Streadway(b *testing.B) { + channel := dialStreadway(b) + queue := setupQueue(b, channel) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := channel.QueueInspect(queue.Name) + if err != nil { + b.Fatalf("error getting queue info: %v", err) + } + } +} + +func BenchmarkComparison_QueueInspect_Roger(b *testing.B) { + channel := dialRoger(b) + queue := setupQueue(b, channel) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := channel.QueueInspect(queue.Name) + if err != nil { + b.Fatalf("error getting queue info: %v", err) + } + } +} + +func BenchmarkComparison_QueuePublish_Streadway(b *testing.B) { + channel := dialStreadway(b) + queue := setupQueue(b, channel) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := channel.Publish( + "", + queue.Name, + true, + false, + amqp.Publishing{ + Body: msgBody, + }, + ) + if err != nil { + b.Fatalf("error publishing message: %v", err) + } + } +} + +func BenchmarkComparison_QueuePublish_Roger(b *testing.B) { + channel := dialRoger(b) + queue := setupQueue(b, channel) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := channel.Publish( + "", + queue.Name, + true, + false, + amqp.Publishing{ + Body: msgBody, + }, + ) + if err != nil { + b.Fatalf("error publishing message: %v", err) + } + } +} + +func BenchmarkComparison_QueuePublishConfirm_Streadway(b *testing.B) { + channel := dialStreadway(b) + queue := setupQueue(b, channel) + + err := channel.Confirm(false) + if err != nil { + b.Fatalf("error putting channel into confrimation mode") + } + + confirmations := make(chan amqp.BasicConfirmation, 100) + channel.NotifyPublish(confirmations) + + done := new(sync.WaitGroup) + done.Add(2) + + errPublish := make(chan error, 1) + + b.ResetTimer() + go func() { + defer done.Done() + + for i := 0; i < b.N; i++ { + err := channel.Publish( + "", + queue.Name, + true, + false, + amqp.Publishing{ + Body: msgBody, + }, + ) + if err != nil { + b.Errorf("error publishing message: %v", err) + errPublish <- err + return + } + } + }() + + go func() { + defer done.Done() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + for i := 0; i < b.N; i++ { + timer.Reset(5 * time.Second) + select { + case confirm := <-confirmations: + if !confirm.Ack { + b.Errorf( + "publication nacked for tag %v", confirm.DeliveryTag, + ) + return + } + case <-errPublish: + b.Errorf("error publishing. aborting confirmations") + return + case <-timer.C: + b.Errorf("timeout on confirmation %v", b.N) + return + } + } + }() + + done.Wait() +} + +func BenchmarkComparison_QueuePublishConfirm_Roger(b *testing.B) { + channel := dialRoger(b) + queue := setupQueue(b, channel) + + err := channel.Confirm(false) + if err != nil { + b.Fatalf("error putting channel into confrimation mode") + } + + confirmations := make(chan amqp.Confirmation, 100) + channel.NotifyPublish(confirmations) + + done := new(sync.WaitGroup) + done.Add(2) + + errPublish := make(chan error, 1) + + b.ResetTimer() + go func() { + defer done.Done() + + for i := 0; i < b.N; i++ { + err := channel.Publish( + "", + queue.Name, + true, + false, + amqp.Publishing{ + Body: msgBody, + }, + ) + if err != nil { + b.Errorf("error publishing message: %v", err) + errPublish <- err + return + } + } + }() + + go func() { + defer done.Done() + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + for i := 0; i < b.N; i++ { + timer.Reset(5 * time.Second) + select { + case confirm := <-confirmations: + if !confirm.Ack { + b.Errorf( + "publication nacked for tag %v", confirm.DeliveryTag, + ) + return + } + case <-errPublish: + b.Errorf("error publishing. aborting confirmations") + return + case <-timer.C: + b.Errorf("timeout on confirmation %v", b.N) + return + } + } + }() + + done.Wait() +} + +// dialStreadway gets a streadway Connection +func dialStreadway(b *testing.B) *amqp.BasicChannel { + conn, err := streadway.Dial(amqptest.TestDialAddress) + if err != nil { + b.Fatalf("error dialing connection") + } + b.Cleanup(func() { + conn.Close() + }) + + channel, err := conn.Channel() + if err != nil { + b.Fatalf("error getting channel: %v", err) + } + return channel +} + +func dialRoger(b *testing.B) *amqp.Channel { + conn, err := amqp.Dial(amqptest.TestDialAddress) + if err != nil { + b.Fatalf("error dialing connection") + } + b.Cleanup(func() { + conn.Close() + }) + + channel, err := conn.Channel() + if err != nil { + b.Fatalf("error getting channel: %v", err) + } + return channel +} + +func setupQueue(b *testing.B, channel OrganizesQueues) amqp.Queue { + queue, err := channel.QueueDeclare( + "benchmark_queue_inspect", + false, + true, + false, + false, + nil, + ) + if err != nil { + b.Fatalf("error getting queue: %v", err) + } + + _, err = channel.QueuePurge(queue.Name, false) + if err != nil { + b.Fatalf("error purging queue: %v", err) + } + + // Delete the queue on the way out. + b.Cleanup(func() { + channel.QueueDelete(queue.Name, false, false, false) + }) + + return queue +} + +// publishesAndConfirms is used to run the publish anc confirm test. +type OrganizesQueues interface { + QueueDeclare( + name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table, + ) (queue amqp.Queue, err error) + QueuePurge(name string, noWait bool) (count int, err error) + QueueDelete( + name string, ifUnused, ifEmpty, noWait bool, + ) (count int, err error) +} diff --git a/amqp/channel.go b/amqp/channel.go index 812a729..84c23ed 100644 --- a/amqp/channel.go +++ b/amqp/channel.go @@ -65,7 +65,7 @@ func (channel *Channel) transportType() amqpmiddleware.TransportType { // streadway.Channel as a livesOnce interface func (channel *Channel) underlyingTransport() livesOnce { // Grab the lock and only release it once we have moved the pointer for the current - // channel into a variable. We don't want it switching out from under us as we + // channel into a variable. We don'tb want it switching out from under us as we // return. channel.underlyingChannelLock.Lock() defer channel.underlyingChannelLock.Unlock() @@ -104,7 +104,7 @@ func (channel *Channel) tryReconnect( } var result amqpmiddleware.ResultsChannelReconnect - result, err = channel.handlers.channelReconnect(channel.ctx, ags) + result, err = channel.handlers.channelReconnect(ctx, ags) if err != nil { return } @@ -246,7 +246,7 @@ func (channel *Channel) Flow(active bool) error { /* QueueDeclare declares a queue to hold messages and deliver to consumers. -Declaring creates a queue if it doesn't already exist, or ensures that an +Declaring creates a queue if it doesn'tb already exist, or ensures that an existing queue matches the same parameters. Every queue declared gets a default binding to the empty exchange "" which has @@ -895,7 +895,7 @@ Delivery.Ack on the returned delivery when you have fully processed this delivery. When autoAck is true, the server will automatically acknowledge this message so -you don't have to. But if you are unable to fully process this message before +you don'tb have to. But if you are unable to fully process this message before the channel or connection is closed, the message will not get requeued. --- @@ -1156,7 +1156,7 @@ func (channel *Channel) NotifyPublish( // and NotifyConfirmOrOrphaned. func notifyConfirmCloseConfirmChannels(tagChannels ...chan uint64) { mainLoop: - // Iterate over the channels and close them. We'll need to make sure we don't close + // Iterate over the channels and close them. We'll need to make sure we don'tb close // the same channel twice. for i, thisChannel := range tagChannels { // Whenever we get a new channel, compare it against all previously closed @@ -1401,7 +1401,7 @@ func (tester *ChannelTesting) ConnTest() *TransportTesting { blocks := int32(0) return &TransportTesting{ - t: tester.t, + tb: tester.tb, manager: &tester.channel.rogerConn.transportManager, blocks: &blocks, } @@ -1430,25 +1430,25 @@ func (tester *ChannelTesting) GetMiddlewareProvider( ) amqpmiddleware.ProvidesMiddleware { provider, ok := tester.channel.handlers.providers[id] if !ok { - tester.t.Errorf("no channel middleware provider %v", id) - tester.t.FailNow() + tester.tb.Errorf("no channel middleware provider %v", id) + tester.tb.FailNow() } return provider } // Test returns an object with methods for testing the Channel. -func (channel *Channel) Test(t *testing.T) *ChannelTesting { +func (channel *Channel) Test(tb testing.TB) *ChannelTesting { blocks := int32(0) chanTester := &ChannelTesting{ TransportTesting: &TransportTesting{ - t: t, + tb: tb, manager: &channel.transportManager, blocks: &blocks, }, channel: channel, } - t.Cleanup(chanTester.cleanup) + tb.Cleanup(chanTester.cleanup) return chanTester } diff --git a/amqp/channelHandlersBuilder.go b/amqp/channelHandlersBuilder.go index f78fd1b..7493683 100644 --- a/amqp/channelHandlersBuilder.go +++ b/amqp/channelHandlersBuilder.go @@ -449,7 +449,7 @@ func (builder channelHandlerBuilder) createConsume() amqpmiddleware.HandlerConsu noLocal: args.NoLocal, noWait: args.NoWait, args: args.Args, - // Make a buffered channel so we don't cause latency from waiting for queues + // Make a buffered channel so we don'tb cause latency from waiting for queues // to be ready callerDeliveryChan: make(chan datamodels.Delivery, 16), } diff --git a/amqp/channel_test.go b/amqp/channel_test.go index 88dafe4..2b8efb5 100644 --- a/amqp/channel_test.go +++ b/amqp/channel_test.go @@ -1792,7 +1792,7 @@ func (suite *ChannelMethodsSuite) Test0370_NotifyFlow() { flowEvents := make(chan bool, 2) suite.ChannelConsume().NotifyFlow(flowEvents) - // Check that we don't get flow notifications right off the bat + // Check that we don'tb get flow notifications right off the bat select { case <-flowEvents: suite.T().Error("got flow event") diff --git a/amqp/config.go b/amqp/config.go index f567c24..714a955 100644 --- a/amqp/config.go +++ b/amqp/config.go @@ -77,6 +77,6 @@ func DefaultConfig() Config { return Config{ Heartbeat: defaultHeartbeat, Locale: defaultLocale, - DefaultLoggerLevel: zerolog.InfoLevel, + DefaultLoggerLevel: zerolog.ErrorLevel, } } diff --git a/amqp/connection.go b/amqp/connection.go index d41b94b..77b9331 100644 --- a/amqp/connection.go +++ b/amqp/connection.go @@ -62,7 +62,7 @@ func (conn *Connection) transportType() amqpmiddleware.TransportType { // streadway.Connection as a livesOnce interface. func (conn *Connection) underlyingTransport() livesOnce { // Grab the lock and only release it once we have moved the pointer for the current - // connection into a variable. We don't want it switching out from under us as we + // connection into a variable. We don'tb want it switching out from under us as we // return. conn.underlyingConnLock.Lock() defer conn.underlyingConnLock.Unlock() @@ -183,12 +183,12 @@ func (conn *Connection) Channel() (*Channel, error) { // Test returns a ConnectionTesting object with a number of helper methods for testing // Connection objects. -func (conn *Connection) Test(t *testing.T) *ConnectionTesting { +func (conn *Connection) Test(tb testing.TB) *ConnectionTesting { blocks := int32(0) return &ConnectionTesting{ conn: conn, TransportTesting: TransportTesting{ - t: t, + tb: tb, manager: &conn.transportManager, blocks: &blocks, }, diff --git a/amqp/connection_test.go b/amqp/connection_test.go index a1afead..0537c13 100644 --- a/amqp/connection_test.go +++ b/amqp/connection_test.go @@ -394,7 +394,7 @@ func TestConnection_IsClosed(t *testing.T) { } func() { - // grab a lock on the livesOnce so we don't auto-reconnectMiddleware + // grab a lock on the livesOnce so we don'tb auto-reconnectMiddleware connTester.BlockReconnect() // release the lock to let the connection reconnectMiddleware diff --git a/amqp/eventRelayNotifyFlow.go b/amqp/eventRelayNotifyFlow.go index 2cfc75b..4d66d11 100644 --- a/amqp/eventRelayNotifyFlow.go +++ b/amqp/eventRelayNotifyFlow.go @@ -72,7 +72,7 @@ func (relay *notifyFlowRelay) RunRelayLeg(legNum int) (done bool) { } // Turn flow to false on broker disconnection if the roger channel has not been - // closed and the last notification sent was a ``true`` (we don't want to send two + // closed and the last notification sent was a ``true`` (we don'tb want to send two // false values in a row). if relay.ChannelCtx.Err() == nil && relay.lastEvent { relay.handler( diff --git a/amqp/streadwayAliases.go b/amqp/streadwayAliases.go index ab9b5bd..9718e79 100644 --- a/amqp/streadwayAliases.go +++ b/amqp/streadwayAliases.go @@ -88,6 +88,11 @@ type ( // type is desired BasicConnection = streadway.Connection + // BasicConfirmation is an alias to streadway/amqp.Confirmation, and is made + // available to avoid having to import both amqp packages if access to the base + // Confirmation type is desired + BasicConfirmation = streadway.Confirmation + // Blocking notifies the server's TCP flow control of the Connection. When a // server hits a memory or disk alarm it will block all connections until the // resources are reclaimed. Use NotifyBlock on the Connection to receive these diff --git a/amqp/transportManager.go b/amqp/transportManager.go index 2369a56..124678c 100644 --- a/amqp/transportManager.go +++ b/amqp/transportManager.go @@ -45,7 +45,7 @@ type reconnects interface { // TestReconnectSignaler allows us to block until a reconnection occurs during a test. type TestReconnectSignaler struct { // The test we are using. - t *testing.T + tb testing.TB // reconnectSignal will close when a reconnection occurs. reconnectSignal chan struct{} @@ -70,17 +70,16 @@ func (signaler *TestReconnectSignaler) WaitOnReconnect(ctx context.Context) { select { case <-signaler.reconnectSignal: case <-ctx.Done(): - signaler.t.Error( - "context cancelled before reconnection occurred: %w", ctx.Err(), + signaler.tb.Fatalf( + "context cancelled before reconnection occurred: %v", ctx.Err(), ) - signaler.t.FailNow() } } } // TransportTesting provides testing methods for testing Channel and Connection. type TransportTesting struct { - t *testing.T + tb testing.TB manager *transportManager // The number of times a connection has been blocked from being acquired. blocks *int32 @@ -133,7 +132,7 @@ func (tester *TransportTesting) SignalOnReconnect() *TestReconnectSignaler { }() signaler := &TestReconnectSignaler{ - t: tester.t, + tb: tester.tb, reconnectSignal: reconnected, original: tester.manager.transport.underlyingTransport(), manager: tester.manager, @@ -145,8 +144,8 @@ func (tester *TransportTesting) SignalOnReconnect() *TestReconnectSignaler { // DisconnectTransport closes the underlying livesOnce to force a reconnection. func (tester *TransportTesting) DisconnectTransport() { err := tester.manager.transport.underlyingTransport().Close() - if !assert.NoError(tester.t, err, "close underlying livesOnce") { - tester.t.FailNow() + if !assert.NoError(tester.tb, err, "close underlying livesOnce") { + tester.tb.FailNow() } } @@ -261,7 +260,7 @@ func isRepeatErr(err error) bool { return false } -// revive:disable:context-as-argument - we have two contexts here, they can't both be first. +// revive:disable:context-as-argument - we have two contexts here, they can'tb both be first. // retryOperationOnClosedSingle attempts a Connection or Channel channel method a single // time. @@ -288,7 +287,7 @@ func (manager *transportManager) retryOperationOnClosedSingle( // occur at the same time, but blocks the connection from being switched // out until the operations resolve. // - // We don't need to worry about lock contention, as once the livesOnce + // We don'tb need to worry about lock contention, as once the livesOnce // reconnection routine requests the lock, and new read acquisitions will // be blocked until the lock is acquired and released for write. manager.transportLock.RLock() @@ -333,7 +332,7 @@ func (manager *transportManager) retryOperationOnClosed( // // We'll give one immediate retry, but after that start increasing how long // we need to wait before re-attempting. - waitDur := 5 * time.Millisecond * time.Duration(attempt-1) + waitDur := time.Second / 2 * time.Duration(attempt-1) if waitDur > maxWait { waitDur = maxWait } @@ -498,7 +497,7 @@ func (manager *transportManager) IsClosed() bool { // Test methods for the livesOnce func (manager *transportManager) Test(t *testing.T) *TransportTesting { return &TransportTesting{ - t: t, + tb: t, manager: manager, } } diff --git a/amqp/transportManagerHandlersBuilder.go b/amqp/transportManagerHandlersBuilder.go index e134afc..124a197 100644 --- a/amqp/transportManagerHandlersBuilder.go +++ b/amqp/transportManagerHandlersBuilder.go @@ -192,7 +192,7 @@ func (builder transportHandlersBuilder) createClose() amqpmiddleware.HandlerClos manager.notificationSubscriberLock.Lock() defer manager.notificationSubscriberLock.Unlock() - // Close all disconnect and connect subscribers, then clear them. We don't + // Close all disconnect and connect subscribers, then clear them. We don'tb // need to grab the lock for this since the cancelled context will keep any new // subscribers from being added. for _, subscriber := range manager.notificationSubscribersDial { diff --git a/amqp/transportManagerReconnect.go b/amqp/transportManagerReconnect.go index cd6e531..4976f9d 100644 --- a/amqp/transportManagerReconnect.go +++ b/amqp/transportManagerReconnect.go @@ -4,13 +4,16 @@ import ( "context" streadway "github.com/streadway/amqp" "sync/atomic" + "time" ) // reconnectRedialOnce attempts to reconnect the livesOnce a single time. -func (manager *transportManager) reconnectRedialOnce(ctx context.Context) error { +func (manager *transportManager) reconnectRedialOnce(ctx context.Context, attempt int) error { + opCtx := context.WithValue(ctx, "opAttempt", attempt) + // Make the connection. err := manager.transport.tryReconnect( - ctx, atomic.LoadUint64(manager.reconnectCount), + opCtx, atomic.LoadUint64(manager.reconnectCount)+uint64(attempt), ) // Send a notification to all listeners subscribed to dial events. manager.sendDialNotifications(err) @@ -32,17 +35,30 @@ func (manager *transportManager) reconnectRedial( ctx context.Context, retry bool, ) error { // Endlessly redial the broker + attempt := 0 for { // Check to see if our context has been cancelled, and exit if so. if ctx.Err() != nil { return ctx.Err() } - err := manager.reconnectRedialOnce(ctx) + err := manager.reconnectRedialOnce(ctx, attempt) // If no error OR there is an error and retry is false return. if err == nil || (err != nil && !retry) { return err } + + // We don'tb want to saturate the connection with retries if we are having + // a hard time reconnecting. + // + // We'll give one immediate retry, but after that start increasing how long + // we need to wait before re-attempting. + waitDur := time.Second / 2 * time.Duration(attempt-1) + if waitDur > maxWait { + waitDur = maxWait + } + time.Sleep(waitDur) + attempt++ } } @@ -52,7 +68,7 @@ func (manager *transportManager) reconnectListenForClose(closeChan <-chan *strea // Wait for the current connection to close disconnectEvent := <-closeChan - // Lock access to the connection and don't unlock until we have reconnected. + // Lock access to the connection and don'tb unlock until we have reconnected. manager.transportLock.Lock() defer manager.transportLock.Unlock() @@ -73,7 +89,7 @@ func (manager *transportManager) reconnectListenForClose(closeChan <-chan *strea // closure. func (manager *transportManager) reconnect(ctx context.Context, retry bool) error { // This may be called directly by Dial methods. It's okay NOT to use the lock here - // since the caller won't be handed back the Connection or Channel until the initial + // since the caller won'tb be handed back the Connection or Channel until the initial // one is established. // // Once the first connection is established, reconnectListenForClose will grab diff --git a/amqptest/testUtils.go b/amqptest/testUtils.go index fe74b93..63d893e 100644 --- a/amqptest/testUtils.go +++ b/amqptest/testUtils.go @@ -5,7 +5,6 @@ package amqptest import ( "context" "github.com/peake100/rogerRabbit-go/amqp" - "github.com/stretchr/testify/assert" "testing" ) @@ -17,20 +16,21 @@ const ( // GetTestConnection creates a new connection to amqp://localhost:57018, where our // test broker will be listening. // -// t.FailNot() is called on any errors. -func GetTestConnection(t *testing.T) *amqp.Connection { - assert := assert.New(t) +// t.FailNow() is called on any errors. +func GetTestConnection(tb testing.TB) *amqp.Connection { conn, err := amqp.DialCtx(context.Background(), TestDialAddress) - if !assert.NoError(err, "dial connection") { - t.FailNow() + if err != nil { + tb.Errorf("error dialing broker: %v", err) + tb.FailNow() } - if !assert.NotNil(conn, "connection is not nil") { - t.FailNow() + if conn == nil { + tb.Errorf("connection is nil: %v", err) + tb.FailNow() } - t.Cleanup( + tb.Cleanup( func() { _ = conn.Close() }, diff --git a/makefile b/makefile index 058d464..18eb799 100644 --- a/makefile +++ b/makefile @@ -28,6 +28,10 @@ test: -python3 ./zdevelop/make_scripts/py_open_test_reports.py -docker stop rabbittest +.PHONY: bench +bench: + -go test -p 1 -count 4 -bench=. -run=Comparisons -benchtime=2s ./... + .PHONY: lint lint: -revive -config revive.toml ./... diff --git a/zdocs/source/_static/godoc.11.html b/zdocs/source/_static/godoc.11.html index b349f71..bd82bd7 100644 --- a/zdocs/source/_static/godoc.11.html +++ b/zdocs/source/_static/godoc.11.html @@ -145,6 +145,11 @@

Index ▾

+
type BasicConfirmation
+ + + +
type BasicConnection
@@ -255,7 +260,7 @@

Index ▾

    func (channel *Channel) Reject(tag uint64, requeue bool) error
-
    func (channel *Channel) Test(t *testing.T) *ChannelTesting
+
    func (channel *Channel) Test(tb testing.TB) *ChannelTesting
    func (channel *Channel) Tx() error
@@ -478,7 +483,7 @@

Index ▾

    func (manager *Connection) NotifyDisconnect(receiver chan error) error
-
    func (conn *Connection) Test(t *testing.T) *ConnectionTesting
+
    func (conn *Connection) Test(tb testing.TB) *ConnectionTesting
@@ -888,6 +893,33 @@

type type BasicConfirmation + + + +

+

+BasicConfirmation is an alias to streadway/amqp.Confirmation, and is made +available to avoid having to import both amqp packages if access to the base +Confirmation type is desired +

+ +
type BasicConfirmation = streadway.Confirmation
+ + + + + + + + + + + + + + +

type BasicConnection @@ -915,7 +947,7 @@

type type Blocking +

type Blocking @@ -1118,7 +1150,7 @@

type

Example (Reconnect)

-

Channel channelReconnect examples. +

Channel reconnect examples.

@@ -1139,9 +1171,9 @@

type // We can use the test method to return an testing object with some additional -// methods. ForceReconnect force-closes the underlying livesOnce, causing the -// robust connection to channelReconnect. +// We can use the Test method to return a testing harness with some additional +// methods. ForceReconnect force-closes the underlying streadway Channel, causing +// the robust Channel to reconnect. // // We'll use a dummy *testing.T object here. These methods are designed for tests // only and should not be used in production code. @@ -1191,7 +1223,7 @@

type func (*Channel) Ack +

func (*Channel) Ack @@ -1226,7 +1258,7 @@

See also Delivery.Ack

-

func (*Channel) Close +

func (*Channel) Close @@ -1242,7 +1274,7 @@

func (*Channel) func (*Channel) Confirm +

func (*Channel) Confirm @@ -1291,7 +1323,7 @@

func (*Channel) func (*Channel) Consume +

func (*Channel) Consume @@ -1459,7 +1491,7 @@

func (*Channel) // Range over the consume channel for delivery := range consume { - // Force-channelReconnect the channel after each delivery. + // Force-reconnect the channel after each delivery. consumeChannel.Test(new(testing.T)).ForceReconnect(context.Background()) // Tick down the consumeComplete WaitGroup @@ -1633,7 +1665,7 @@

func (*Channel) func (*Channel) ExchangeBind +

func (*Channel) ExchangeBind @@ -1689,7 +1721,7 @@

func (*Channel) func (*Channel) ExchangeDeclare +

func (*Channel) ExchangeDeclare @@ -1779,7 +1811,7 @@

func (*Channel) func (*Channel) ExchangeDeclarePassive +

func (*Channel) ExchangeDeclarePassive @@ -1800,7 +1832,7 @@

func (*Channel) func (*Channel) ExchangeDelete +

func (*Channel) ExchangeDelete @@ -1839,7 +1871,7 @@

func (*Channel) func (*Channel) ExchangeUnbind +

func (*Channel) ExchangeUnbind @@ -1868,7 +1900,7 @@

func (*Channel) func (*Channel) func (*Channel) Flow +

func (*Channel) Flow @@ -1915,7 +1947,7 @@

func (*Channel) func (*Channel) Get +

func (*Channel) Get @@ -1941,7 +1973,7 @@

func (*Channel) func (*Channel) func (*Channel) IsClosed +

func (*Channel) IsClosed @@ -1984,7 +2016,7 @@

func (*Channel) func (*Channel) Nack +

func (*Channel) Nack @@ -2016,7 +2048,7 @@

See also Delivery.Nack

-

func (*Channel) NotifyCancel +

func (*Channel) NotifyCancel @@ -2036,7 +2068,7 @@

func (*Channel) func (*Channel) NotifyClose +

func (*Channel) NotifyClose @@ -2061,7 +2093,7 @@

func (*Channel) func (*Channel) NotifyConfirm +

func (*Channel) NotifyConfirm @@ -2090,7 +2122,7 @@

func (*Channel) func (*Channel) NotifyConfirmOrOrphaned +

func (*Channel) NotifyConfirmOrOrphaned @@ -2108,7 +2140,7 @@

func (*Channel) func (*Channel) NotifyDial +

func (*Channel) NotifyDial @@ -2127,7 +2159,7 @@

func (*Channel) func (*Channel) NotifyDisconnect +

func (*Channel) NotifyDisconnect @@ -2146,7 +2178,7 @@

func (*Channel) func (*Channel) NotifyFlow +

func (*Channel) NotifyFlow @@ -2207,7 +2239,7 @@

func (*Channel) func (*Channel) NotifyPublish +

func (*Channel) NotifyPublish @@ -2266,7 +2298,7 @@

func (*Channel) func (*Channel) NotifyReturn +

func (*Channel) NotifyReturn @@ -2295,7 +2327,7 @@

func (*Channel) func (*Channel) Publish +

func (*Channel) Publish @@ -2488,7 +2520,7 @@

func (*Channel) func (*Channel) Qos +

func (*Channel) Qos @@ -2542,7 +2574,7 @@

func (*Channel) func (*Channel) QueueBind +

func (*Channel) QueueBind @@ -2604,7 +2636,7 @@

func (*Channel) func (*Channel) QueueDeclare +

func (*Channel) QueueDeclare @@ -2614,7 +2646,7 @@

func (*Channel) Queue, err error)

QueueDeclare declares a queue to hold messages and deliver to consumers. -Declaring creates a queue if it doesn't already exist, or ensures that an +Declaring creates a queue if it doesn'tb already exist, or ensures that an existing queue matches the same parameters.

@@ -2772,7 +2804,7 @@

func (*Channel) func (*Channel) QueueDeclarePassive +

func (*Channel) QueueDeclarePassive @@ -2793,7 +2825,7 @@

func (*Channel) func (*Channel) QueueDelete +

func (*Channel) QueueDelete @@ -2835,7 +2867,7 @@

func (*Channel) func (*Channel) QueueInspect +

func (*Channel) QueueInspect @@ -2864,7 +2896,7 @@

func (*Channel) func (*Channel) QueuePurge +

func (*Channel) QueuePurge @@ -2890,7 +2922,7 @@

func (*Channel) func (*Channel) QueueUnbind +

func (*Channel) QueueUnbind @@ -2910,7 +2942,7 @@

func (*Channel) func (*Channel) Reject +

func (*Channel) Reject @@ -2942,12 +2974,12 @@

See also Delivery.Reject

-

func (*Channel) Test +

func (*Channel) Test

-
func (channel *Channel) Test(t *testing.T) *ChannelTesting
+
func (channel *Channel) Test(tb testing.TB) *ChannelTesting

Test returns an object with methods for testing the Channel.

@@ -2982,7 +3014,7 @@

func (*Channel) // test's *testing.T value. Here, we will just pass a dummy one. testHarness := channel.Test(new(testing.T)) -// We can use the test harness to force the channel to channelReconnect. If a reconnection +// We can use the test harness to force the channel to reconnect. If a reconnection // does not occur before the passed context expires, the test will be failed. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -3004,7 +3036,7 @@

func (*Channel) func (*Channel) Tx +

func (*Channel) Tx @@ -3046,7 +3078,7 @@

func (*Channel) func (*Channel) TxCommit +

func (*Channel) TxCommit @@ -3077,7 +3109,7 @@

func (*Channel) func (*Channel) TxRollback +

func (*Channel) TxRollback @@ -3847,7 +3879,7 @@

func (*ChannelMiddlewares) type ChannelTesting +

type ChannelTesting @@ -3875,7 +3907,7 @@

type func (*ChannelTesting) ConnTest +

func (*ChannelTesting) ConnTest @@ -3890,7 +3922,7 @@

func (*ChannelTesting) func (*ChannelTesting) GetMiddlewareProvider +

func (*ChannelTesting) GetMiddlewareProvider @@ -3908,7 +3940,7 @@

func (*ChannelTesting) func (*ChannelTesting) ReconnectionCount +

func (*ChannelTesting) ReconnectionCount @@ -3923,7 +3955,7 @@

func (*ChannelTesting) func (*ChannelTesting) UnderlyingChannel +

func (*ChannelTesting) UnderlyingChannel @@ -3938,7 +3970,7 @@

func (*ChannelTesting) func (*ChannelTesting) UnderlyingConnection +

func (*ChannelTesting) UnderlyingConnection @@ -4237,7 +4269,7 @@

func func (*Connection) Channel +

func (*Connection) Channel @@ -4261,7 +4293,7 @@

func (*Connection) func (*Connection) Close +

func (*Connection) Close @@ -4277,7 +4309,7 @@

func (*Connection) func (*Connection) IsClosed +

func (*Connection) IsClosed @@ -4302,7 +4334,7 @@

func (*Connection) func (*Connection) NotifyClose +

func (*Connection) NotifyClose @@ -4327,7 +4359,7 @@

func (*Connection) func (*Connection) NotifyDial +

func (*Connection) NotifyDial @@ -4346,7 +4378,7 @@

func (*Connection) func (*Connection) NotifyDisconnect +

func (*Connection) NotifyDisconnect @@ -4365,12 +4397,12 @@

func (*Connection) func (*Connection) Test +

func (*Connection) Test

-
func (conn *Connection) Test(t *testing.T) *ConnectionTesting
+
func (conn *Connection) Test(tb testing.TB) *ConnectionTesting

Test returns a ConnectionTesting object with a number of helper methods for testing Connection objects. @@ -4627,7 +4659,7 @@

func (*ConnectionTesting) type Decimal +

type Decimal @@ -4707,7 +4739,7 @@

type type Error +

type Error @@ -4733,7 +4765,7 @@

type type Publishing +

type Publishing @@ -4761,7 +4793,7 @@

type type Queue +

type Queue @@ -4787,7 +4819,7 @@

type type Return +

type Return @@ -4814,7 +4846,7 @@

type type Table +

type Table @@ -4869,7 +4901,7 @@

type type TestReconnectSignaler +

type TestReconnectSignaler @@ -4895,7 +4927,7 @@

type func (*TestReconnectSignaler) WaitOnReconnect +

func (*TestReconnectSignaler) WaitOnReconnect @@ -4917,7 +4949,7 @@

func (*TestReconnectSignaler) type TransportTesting +

type TransportTesting @@ -4943,7 +4975,7 @@

type func (*TransportTesting) BlockReconnect +

func (*TransportTesting) BlockReconnect @@ -4959,7 +4991,7 @@

func (*TransportTesting) func (*TransportTesting) DisconnectTransport +

func (*TransportTesting) DisconnectTransport @@ -4974,7 +5006,7 @@

func (*TransportTesting) func (*TransportTesting) ForceReconnect +

func (*TransportTesting) ForceReconnect @@ -4991,7 +5023,7 @@

func (*TransportTesting) func (*TransportTesting) SignalOnReconnect +

func (*TransportTesting) SignalOnReconnect @@ -5007,7 +5039,7 @@

func (*TransportTesting) func (*TransportTesting) TransportLock +

func (*TransportTesting) TransportLock @@ -5024,7 +5056,7 @@

func (*TransportTesting) func (*TransportTesting) UnblockReconnect +

func (*TransportTesting) UnblockReconnect diff --git a/zdocs/source/_static/godoc.6.html b/zdocs/source/_static/godoc.6.html index f494da9..629d573 100644 --- a/zdocs/source/_static/godoc.6.html +++ b/zdocs/source/_static/godoc.6.html @@ -117,7 +117,7 @@

Index ▾

-
func GetTestConnection(t *testing.T) *amqp.Connection
+
func GetTestConnection(tb testing.TB) *amqp.Connection
@@ -247,18 +247,18 @@

Constants

-

func GetTestConnection +

func GetTestConnection

-
func GetTestConnection(t *testing.T) *amqp.Connection
+
func GetTestConnection(tb testing.TB) *amqp.Connection

GetTestConnection creates a new connection to amqp://localhost:57018, where our test broker will be listening.

-t.FailNot() is called on any errors. +t.FailNow() is called on any errors.

diff --git a/zdocs/source/index.rst b/zdocs/source/index.rst index 87f5c1d..f079a00 100644 --- a/zdocs/source/index.rst +++ b/zdocs/source/index.rst @@ -12,6 +12,7 @@ Roger, Rabbit is broken into two packages: :maxdepth: 2 :caption: Contents: + ./overview.rst ./amqp.rst ./roger.rst @@ -180,104 +181,6 @@ Demo // Message Published and Confirmed! // Message Published and Confirmed! -Motivations ------------ - -`streadway/amqp`_, the official rabbitMQ driver for go is an excellent library with a -great API, but limited scope. By design, It offers a full implementation of the AMQP -spec, but comes with very few quality-of-life featured beyond that. From it's -documentation: - -.. code-block:: text - - Goals - - Provide a functional interface that closely represents the AMQP 0.9.1 model - targeted to RabbitMQ as a server. This includes the minimum necessary to - interact the semantics of the protocol. - - Things not intended to be supported: - - Auto reconnect and re-synchronization of client and server topologies. - - Reconnection would require understanding the error paths when the topology - cannot be declared on reconnect. This would require a new set of types and code - paths that are best suited at the call-site of this package. AMQP has a dynamic - topology that needs all peers to agree. If this doesn't happen, the behavior - is undefined. Instead of producing a possible interface with undefined - behavior, this package is designed to be simple for the caller to implement the - necessary connection-time topology declaration so that reconnection is trivial - and encapsulated in the caller's application code. - -Without a supplied way to handle reconnections, `bespoke `_ -`solutions `_ -`abound `_. - -Most of these solutions are overly-fitted to a specific problem (consumer vs producer or -involve domain-specific logic), that is prone to data races (can you spot them in the -first link?), cumbersome to inject into a production code (do we abort the business -logic on an error or try to recover in-place?), and bugs (each solution has its own -redial bugs rather than finding them in a single lib where fixes can benefit everyone -and community code coverage is high). - -Nome of this is meant to disparage the above solutions, they likely work great in the -code they were created for, but they point to a need that is not being filled by the -official driver. The nature of the default `*Channel` API encourages solutions that -are ill-suited to stateless handlers OR require you to handle retries every place you -must interact with the broker. Such implementation details can be annoying when writing -higher-level business logic and can lead to either unnecessary error returns, bespoke -solutions in every project, or messy calling code at sites which need to interact with -an AMQP broker. - -Roger, Rabbit is inspired by `aio-pika's `_ -`robust connections and channels `_, -which abstract away connection management with an identical API to their non-robust -connection and channel API's. - -.. note:: - - This library is not meant to supplant `streadway/amqp`_ (Roger, Rabbit is built on - top of it!), but an extension with quality of life features. Roger, Rabbit would not - be possible without the amazing groundwork laid down by `streadway/amqp`_. - -Goals ------ - -The goals of the Roger, Rabbit package are as follows: - -- **Offer a drop-in replacement for rogerRabbit/amqp**: APIs may be extended (adding - fields to `amqp.Config` or additional methods to `*amqp.Channel`, for instance) but - must not break existing code unless absolutely necessary. - -- **Add as few additional error paths as possible**: Errors may be *extended* with - additional information concerning disconnect scenarios, but new error type returns - from `*Connection` or `*amqp.Channel` should be an absolute last resort. - -- **Be Highly Extensible**: Roger, Rabbit seeks to offer a high degree of extensibility - via features like middleware, in an effort to reduce the balkanization of amqp client - solutions. - -Current Limitations & Warnings ------------------------------- - -- **Performance**: Roger, Rabbit's implementation is handled primarily through - middleware and a `*sync.RWMutex` on transports that handles blocking methods on - reconnection events. This increases the overhead allows for an enormous amount of - extensibility and robustness, but may be a limiting factor for applications that need - the absolute maximum throughput possible. - -- **Transaction Support**: Roger, Rabbit does not currently support amqp Transactions, - as the author does not use them. Draft PR's with possible implementations are welcome! - -- **Reliability**: While the author uses this library in production, it is still early - days, and more battle-testing will be needed before this library is promoted to - version 1.0. PR's are welcome for Bug Fixes, code coverage, or new features. - -.. note:: - - Currently, there are not side-by-side benchmarks for Roger, Rabbit and - `streadway/amqp`_, but PR's to add them are welcome! - API Documentation ----------------- diff --git a/zdocs/source/overview.rst b/zdocs/source/overview.rst new file mode 100644 index 0000000..316f64f --- /dev/null +++ b/zdocs/source/overview.rst @@ -0,0 +1,126 @@ +Project +======= + +Motivations +----------- + +`streadway/amqp`_, the official rabbitMQ driver for go is an excellent library with a +great API, but limited scope. By design, It offers a full implementation of the AMQP +spec, but comes with very few quality-of-life featured beyond that. From it's +documentation: + +.. code-block:: text + + Goals + + Provide a functional interface that closely represents the AMQP 0.9.1 model + targeted to RabbitMQ as a server. This includes the minimum necessary to + interact the semantics of the protocol. + + Things not intended to be supported: + + Auto reconnect and re-synchronization of client and server topologies. + + Reconnection would require understanding the error paths when the topology + cannot be declared on reconnect. This would require a new set of types and code + paths that are best suited at the call-site of this package. AMQP has a dynamic + topology that needs all peers to agree. If this doesn't happen, the behavior + is undefined. Instead of producing a possible interface with undefined + behavior, this package is designed to be simple for the caller to implement the + necessary connection-time topology declaration so that reconnection is trivial + and encapsulated in the caller's application code. + +Without a supplied way to handle reconnections, `bespoke `_ +`solutions `_ +`abound `_. + +Most of these solutions are overly-fitted to a specific problem (consumer vs producer or +involve domain-specific logic), that is prone to data races (can you spot them in the +first link?), cumbersome to inject into a production code (do we abort the business +logic on an error or try to recover in-place?), and bugs (each solution has its own +redial bugs rather than finding them in a single lib where fixes can benefit everyone +and community code coverage is high). + +Nome of this is meant to disparage the above solutions, they likely work great in the +code they were created for, but they point to a need that is not being filled by the +official driver. The nature of the default `*Channel` API encourages solutions that +are ill-suited to stateless handlers OR require you to handle retries every place you +must interact with the broker. Such implementation details can be annoying when writing +higher-level business logic and can lead to either unnecessary error returns, bespoke +solutions in every project, or messy calling code at sites which need to interact with +an AMQP broker. + +Roger, Rabbit is inspired by `aio-pika's `_ +`robust connections and channels `_, +which abstract away connection management with an identical API to their non-robust +connection and channel API's. + +.. note:: + + This library is not meant to supplant `streadway/amqp`_ (Roger, Rabbit is built on + top of it!), but an extension with quality of life features. Roger, Rabbit would not + be possible without the amazing groundwork laid down by `streadway/amqp`_. + +Goals +----- + +The goals of the Roger, Rabbit package are as follows: + +- **Offer a drop-in replacement for rogerRabbit/amqp**: APIs may be extended (adding + fields to `amqp.Config` or additional methods to `*amqp.Channel`, for instance) but + must not break existing code unless absolutely necessary. + +- **Add as few additional error paths as possible**: Errors may be *extended* with + additional information concerning disconnect scenarios, but new error type returns + from `*Connection` or `*amqp.Channel` should be an absolute last resort. + +- **Be Highly Extensible**: Roger, Rabbit seeks to offer a high degree of extensibility + via features like middleware, in an effort to reduce the balkanization of amqp client + solutions. + +Current Limitations & Warnings +------------------------------ + +- **Performance**: Roger, Rabbit has not been extensively benchmarked against + `streadway/amqp`_. To see preliminary benchmarks, take a look at the next section. + +- **Transaction Support**: Roger, Rabbit does not currently support amqp Transactions, + as the author does not use them. Draft PR's with possible implementations are welcome! + +- **Reliability**: While the author uses this library in production, it is still early + days, and more battle-testing will be needed before this library is promoted to + version 1.0. PR's are welcome for Bug Fixes, code coverage, or new features. + +Benchmarks +---------- + +Because of Roger, Rabbit's middleware-driven design, some overhead is expected vs +streadway proper. However, initial benchmarks are promising, and show only minimal +impact. For most applications, the overhead cost is likely worth the cost for ease of +development and flexibility. + +Still, if absolute peak throughput is critical to an application, a less general and +more tailored approach may be warranted. + +Benchmarks can be found in `./amqp/benchmark_test.go`. + +.. table:: Machine: Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz || Calculated by taking the fastest of four runs. + + ================= ==== =========== ========== =========== + OPERATION LIB EXECUTIONS NS/OP COMPARISON + ================= ==== =========== ========== =========== + QueueInspect sw 2,838 812,594 -- + -- rr 2,470 813,269 +0.1% + Publish sw 7,4559 28,882 -- + -- rr 7,0665 30,031 +4.0% + Publish & Confirm sw 3,4528 59,703 -- + -- rr 3,5481 62,198 +4.2% + ================= ==== =========== ========== =========== + +Run with the following command: + +.. code-block:: shell + + go test -p 1 -count 4 -bench=. -run=Comparisons -benchtime=2s ./... + +.. _streadway/amqp: https://github.com/streadway/amqp \ No newline at end of file From 9d6803e650bd0d01362ff751ad1e73d5c88febcd Mon Sep 17 00:00:00 2001 From: Billy Peake Date: Sat, 17 Apr 2021 02:00:53 -0700 Subject: [PATCH 2/2] tweaks --- README.md | 15 ++++++--------- zdocs/source/overview.rst | 4 ++-- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 6a8ebd1..8d1a47e 100644 --- a/README.md +++ b/README.md @@ -256,13 +256,13 @@ Goals The goals of the Roger, Rabbit package are as follows: -- **Offer a drop-in replacement for streadway/amqp**: APIs may be extended (adding +- **Offer a Drop-in Replacement for streadway/amqp**: APIs may be extended (adding fields to `amqp.Config` or additional methods to `*amqp.Channel`, for instance) but must not break existing code unless absolutely necessary. -- **Add as few additional error paths as possible**: Errors may be *extended* with +- **Add as few Additional Error Paths as Possible**: Errors may be *extended* with additional information concerning disconnect scenarios, but new error type returns - from *Connection or *Channel should be an absolute last resort. + from `*Connection` or `*amqp.Channel` should be an absolute last resort. - **Be Highly Extensible**: Roger, Rabbit seeks to offer a high degree of extensibility via features like middleware, in an effort to reduce the balkanization of amqp client @@ -271,13 +271,10 @@ The goals of the Roger, Rabbit package are as follows: Current Limitations & Warnings ------------------------------ -- **Performance**: Roger, Rabbit's implementation is handled primarily through - middlewares, and a *sync.RWMutex on transports that handles blocking methods on - reconnection events. This increases the overhead on each call, but allows for an - enormous amount of extensibility and robustness, but may be a limiting factor for - applications that need the absolute maximum throughput possible. +- **Performance**: Roger, Rabbit has not been extensively benchmarked against + `streadway/amqp`. To see preliminary benchmarks, take a look at the next section. -- **Transaction Support**: Roger, Rabbit does not currently support AMQP Transactions, +- **Transaction Support**: Roger, Rabbit does not currently support amqp Transactions, as the author does not use them. Draft PR's with possible implementations are welcome! - **Reliability**: While the author uses this library in production, it is still early diff --git a/zdocs/source/overview.rst b/zdocs/source/overview.rst index 316f64f..5552894 100644 --- a/zdocs/source/overview.rst +++ b/zdocs/source/overview.rst @@ -66,11 +66,11 @@ Goals The goals of the Roger, Rabbit package are as follows: -- **Offer a drop-in replacement for rogerRabbit/amqp**: APIs may be extended (adding +- **Offer a Drop-in Replacement for streadway/amqp**: APIs may be extended (adding fields to `amqp.Config` or additional methods to `*amqp.Channel`, for instance) but must not break existing code unless absolutely necessary. -- **Add as few additional error paths as possible**: Errors may be *extended* with +- **Add as few Additional Error Paths as Possible**: Errors may be *extended* with additional information concerning disconnect scenarios, but new error type returns from `*Connection` or `*amqp.Channel` should be an absolute last resort.