Skip to content

Commit

Permalink
added recovering in case of activity on a closed channel
Browse files Browse the repository at this point in the history
  • Loading branch information
pawelsz-rb committed Jul 21, 2022
1 parent 01fd25f commit abb9942
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 2 deletions.
41 changes: 39 additions & 2 deletions async_transport.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package rollbar

import (
"fmt"
"runtime"
"sync"
"time"
)
Expand All @@ -21,6 +23,16 @@ type payload struct {
retriesLeft int
}

func isClosed(ch chan payload) bool {
if len(ch) == 0 {
select {
case _, ok := <-ch:
return !ok
}
}
return false
}

// NewAsyncTransport builds an asynchronous transport which sends data to the Rollbar API at the
// specified endpoint using the given access token. The channel is limited to the size of the input
// buffer argument.
Expand All @@ -38,6 +50,18 @@ func NewAsyncTransport(token string, endpoint string, buffer int) *AsyncTranspor
}

go func() {
defer func() {
if r := recover(); r != nil {
pc, _, _, _ := runtime.Caller(4)
fnName := runtime.FuncForPC(pc).Name()
if isClosed(transport.bodyChannel) {
fmt.Println(fnName, "recover: channel is closed")
} else {
fmt.Println(fnName, "recovered:", r)
}
}
}()

for p := range transport.bodyChannel {
elapsedTime := time.Now().Sub(transport.startTime).Seconds()
if elapsedTime < 0 || elapsedTime >= 60 {
Expand Down Expand Up @@ -81,7 +105,20 @@ func NewAsyncTransport(token string, endpoint string, buffer int) *AsyncTranspor

// Send the body to Rollbar if the channel is not currently full.
// Returns ErrBufferFull if the underlying channel is full.
func (t *AsyncTransport) Send(body map[string]interface{}) error {
func (t *AsyncTransport) Send(body map[string]interface{}) (err error) {
defer func() {
if r := recover(); r != nil {
pc, _, _, _ := runtime.Caller(4)
fnName := runtime.FuncForPC(pc).Name()
if _, ok := err.(*ErrBufferFull); !ok && isClosed(t.bodyChannel) {
fmt.Println(fnName, "recover: channel is closed")
t.waitGroup.Done()
err = ErrChannelClosed{}
} else {
fmt.Println(fnName, "recovered:", r)
}
}
}()
if len(t.bodyChannel) < t.Buffer {
t.waitGroup.Add(1)
p := payload{
Expand All @@ -90,7 +127,7 @@ func (t *AsyncTransport) Send(body map[string]interface{}) error {
}
t.bodyChannel <- p
} else {
err := ErrBufferFull{}
err = ErrBufferFull{}
rollbarError(t.Logger, err.Error())
if t.PrintPayloadOnError {
writePayloadToStderr(t.Logger, body)
Expand Down
12 changes: 12 additions & 0 deletions async_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ func TestAsyncTransportSendFull(t *testing.T) {
}
}

func TestAsyncTransportSendRecover(t *testing.T) {
transport := NewAsyncTransport("", "", 1)
transport.SetLogger(&SilentClientLogger{})

transport.Close()
result := transport.Send(nil)
if result == nil {
t.Error("Expected to receive ErrChannelClosed")
}
transport.Wait()
}

func TestAsyncTransportClose(t *testing.T) {
transport := NewAsyncTransport("", "", 1)
transport.SetLogger(&SilentClientLogger{})
Expand Down
9 changes: 9 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,12 @@ type ErrBufferFull struct{}
func (e ErrBufferFull) Error() string {
return "buffer full, dropping error on the floor"
}

// ErrChannelClosed is an error which is returned when the asynchronous transport is used and the
// channel used for buffering items for sending to Rollbar is already closed
type ErrChannelClosed struct{}

// Error implements the error interface.
func (e ErrChannelClosed) Error() string {
return "channel is closed"
}

0 comments on commit abb9942

Please sign in to comment.