Skip to content

Commit

Permalink
Merge pull request #84 from rollbar/pawel/set_items_per_minute
Browse files Browse the repository at this point in the history
added support for setting items per minute
  • Loading branch information
pawelsz-rb authored Jun 18, 2021
2 parents e2e8a88 + 66c2359 commit 987360b
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 95 deletions.
40 changes: 26 additions & 14 deletions async_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rollbar

import (
"sync"
"time"
)

// AsyncTransport is a concrete implementation of the Transport type which communicates with the
Expand Down Expand Up @@ -30,34 +31,45 @@ func NewAsyncTransport(token string, endpoint string, buffer int) *AsyncTranspor
Endpoint: endpoint,
RetryAttempts: DefaultRetryAttempts,
PrintPayloadOnError: true,
ItemsPerMinute: 0,
},
bodyChannel: make(chan payload, buffer),
Buffer: buffer,
}

go func() {
for p := range transport.bodyChannel {
canRetry, err := transport.post(p.body)
if err != nil {
if canRetry && p.retriesLeft > 0 {
p.retriesLeft -= 1
select {
case transport.bodyChannel <- p:
default:
// This can happen if the bodyChannel had an item added to it from another
// thread while we are processing such that the channel is now full. If we try
// to send the payload back to the channel without this select statement we
// could deadlock. Instead we consider this a retry failure.
elapsedTime := time.Now().Sub(transport.startTime).Seconds()
if elapsedTime < 0 || elapsedTime >= 60 {
transport.startTime = time.Now()
transport.perMinCounter = 0
}
if transport.shouldSend() {
canRetry, err := transport.post(p.body)
if err != nil {
if canRetry && p.retriesLeft > 0 {
p.retriesLeft -= 1
select {
case transport.bodyChannel <- p:
default:
// This can happen if the bodyChannel had an item added to it from another
// thread while we are processing such that the channel is now full. If we try
// to send the payload back to the channel without this select statement we
// could deadlock. Instead we consider this a retry failure.
if transport.PrintPayloadOnError {
writePayloadToStderr(transport.Logger, p.body)
}
transport.waitGroup.Done()
}
} else {
if transport.PrintPayloadOnError {
writePayloadToStderr(transport.Logger, p.body)
}
transport.waitGroup.Done()
}
} else {
if transport.PrintPayloadOnError {
writePayloadToStderr(transport.Logger, p.body)
}
transport.waitGroup.Done()
transport.perMinCounter++
}
} else {
transport.waitGroup.Done()
Expand Down
49 changes: 37 additions & 12 deletions async_transport_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package rollbar_test
package rollbar

import (
"github.com/rollbar/rollbar-go"
"testing"
)

func TestAsyncTransportSend(t *testing.T) {
transport := rollbar.NewAsyncTransport("", "", 1)
transport.SetLogger(&rollbar.SilentClientLogger{})
transport := NewAsyncTransport("", "", 1)
transport.SetLogger(&SilentClientLogger{})
body := map[string]interface{}{
"hello": "world",
}
Expand All @@ -19,8 +18,8 @@ func TestAsyncTransportSend(t *testing.T) {
}

func TestAsyncTransportSendFull(t *testing.T) {
transport := rollbar.NewAsyncTransport("", "", 1)
transport.SetLogger(&rollbar.SilentClientLogger{})
transport := NewAsyncTransport("", "", 1)
transport.SetLogger(&SilentClientLogger{})
body := map[string]interface{}{
"hello": "world",
}
Expand All @@ -31,20 +30,23 @@ func TestAsyncTransportSendFull(t *testing.T) {
t.Error("Expected to receive ErrBufferFull")
}
transport.Wait()
if transport.perMinCounter != 1 {
t.Error("shouldSend check failed")
}
}

func TestAsyncTransportClose(t *testing.T) {
transport := rollbar.NewAsyncTransport("", "", 1)
transport.SetLogger(&rollbar.SilentClientLogger{})
transport := NewAsyncTransport("", "", 1)
transport.SetLogger(&SilentClientLogger{})
result := transport.Close()
if result != nil {
t.Error("Close returned an unexpected error:", result)
}
}

func TestAsyncTransportSetToken(t *testing.T) {
transport := rollbar.NewAsyncTransport("", "", 1)
transport.SetLogger(&rollbar.SilentClientLogger{})
transport := NewAsyncTransport("", "", 1)
transport.SetLogger(&SilentClientLogger{})
token := "abc"
transport.SetToken(token)
if transport.Token != token {
Expand All @@ -53,11 +55,34 @@ func TestAsyncTransportSetToken(t *testing.T) {
}

func TestAsyncTransportSetEndpoint(t *testing.T) {
transport := rollbar.NewAsyncTransport("", "", 1)
transport.SetLogger(&rollbar.SilentClientLogger{})
transport := NewAsyncTransport("", "", 1)
transport.SetLogger(&SilentClientLogger{})
endpoint := "https://fake.com"
transport.SetEndpoint(endpoint)
if transport.Endpoint != endpoint {
t.Error("SetEndpoint failed")
}
}

func TestAsyncTransportNotSend(t *testing.T) {
transport := NewAsyncTransport("", "", 2)
transport.SetLogger(&SilentClientLogger{})
transport.SetItemsPerMinute(1)
if transport.ItemsPerMinute != 1 {
t.Error("SetItemsPerMinute failed")
}

body := map[string]interface{}{
"hello": "world",
}

transport.Send(body)
result := transport.Send(body)
if result != nil {
t.Error("Send returned an unexpected error:", result)
}
transport.Wait()
if transport.perMinCounter != 1 {
t.Error("shouldSend check failed")
}
}
21 changes: 21 additions & 0 deletions base_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package rollbar
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
)

type baseTransport struct {
Expand All @@ -24,8 +26,13 @@ type baseTransport struct {
// PrintPayloadOnError is whether or not to output the payload to the set logger or to stderr if
// an error occurs during transport to the Rollbar API.
PrintPayloadOnError bool
// ItemsPerMinute has the max number of items to send in a given minute
ItemsPerMinute int
// custom http client (http.DefaultClient used by default)
httpClient *http.Client

perMinCounter int
startTime time.Time
}

// SetToken updates the token to use for future API requests.
Expand All @@ -38,6 +45,11 @@ func (t *baseTransport) SetEndpoint(endpoint string) {
t.Endpoint = endpoint
}

// SetItemsPerMinute sets the max number of items to send in a given minute
func (t *baseTransport) SetItemsPerMinute(itemsPerMinute int) {
t.ItemsPerMinute = itemsPerMinute
}

// SetLogger updates the logger that this transport uses for reporting errors that occur while
// processing items.
func (t *baseTransport) SetLogger(logger ClientLogger) {
Expand Down Expand Up @@ -106,3 +118,12 @@ func (t *baseTransport) post(body map[string]interface{}) (bool, error) {

return false, nil
}

func (t *baseTransport) shouldSend() bool {
if t.ItemsPerMinute > 0 && t.perMinCounter >= t.ItemsPerMinute {
rollbarError(t.Logger, fmt.Sprintf("item per minute limit reached: %d occurences, "+
"ignoring errors until timeout", t.perMinCounter))
return false
}
return true
}
83 changes: 48 additions & 35 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ func (c *Client) SetRetryAttempts(retryAttempts int) {
c.Transport.SetRetryAttempts(retryAttempts)
}

// SetItemsPerMinute sets the max number of items to send in a given minute
func (c *Client) SetItemsPerMinute(itemsPerMinute int) {
c.configuration.itemsPerMinute = itemsPerMinute
c.Transport.SetItemsPerMinute(itemsPerMinute)
}

// SetPrintPayloadOnError sets whether or not to output the payload to the set logger or to
// stderr if an error occurs during transport to the Rollbar API. For example, if you hit
// your rate limit and we run out of retry attempts, then if this is true we will output the
Expand All @@ -258,6 +264,11 @@ func (c *Client) Token() string {
return c.configuration.token
}

// ItemsPerMinute is the currently set Rollbar items per minute
func (c *Client) ItemsPerMinute() int {
return c.configuration.itemsPerMinute
}

// Environment is the currently set environment underwhich all errors and
// messages will be submitted.
func (c *Client) Environment() string {
Expand Down Expand Up @@ -652,24 +663,25 @@ const (
)

type configuration struct {
enabled bool
token string
environment string
platform string
codeVersion string
serverHost string
serverRoot string
endpoint string
custom map[string]interface{}
fingerprint bool
scrubHeaders *regexp.Regexp
scrubFields *regexp.Regexp
checkIgnore func(string) bool
transform func(map[string]interface{})
unwrapper UnwrapperFunc
stackTracer StackTracerFunc
person Person
captureIp captureIp
enabled bool
token string
environment string
platform string
codeVersion string
serverHost string
serverRoot string
endpoint string
custom map[string]interface{}
fingerprint bool
scrubHeaders *regexp.Regexp
scrubFields *regexp.Regexp
checkIgnore func(string) bool
transform func(map[string]interface{})
unwrapper UnwrapperFunc
stackTracer StackTracerFunc
person Person
captureIp captureIp
itemsPerMinute int
}

func createConfiguration(token, environment, codeVersion, serverHost, serverRoot string) configuration {
Expand All @@ -678,23 +690,24 @@ func createConfiguration(token, environment, codeVersion, serverHost, serverRoot
hostname, _ = os.Hostname()
}
return configuration{
enabled: true,
token: token,
environment: environment,
platform: runtime.GOOS,
endpoint: "https://api.rollbar.com/api/1/item/",
scrubHeaders: regexp.MustCompile("Authorization"),
scrubFields: regexp.MustCompile("password|secret|token"),
codeVersion: codeVersion,
serverHost: hostname,
serverRoot: serverRoot,
fingerprint: false,
checkIgnore: func(_s string) bool { return false },
transform: func(_d map[string]interface{}) {},
unwrapper: DefaultUnwrapper,
stackTracer: DefaultStackTracer,
person: Person{},
captureIp: CaptureIpFull,
enabled: true,
token: token,
environment: environment,
platform: runtime.GOOS,
endpoint: "https://api.rollbar.com/api/1/item/",
scrubHeaders: regexp.MustCompile("Authorization"),
scrubFields: regexp.MustCompile("password|secret|token"),
codeVersion: codeVersion,
serverHost: hostname,
serverRoot: serverRoot,
fingerprint: false,
checkIgnore: func(_s string) bool { return false },
transform: func(_d map[string]interface{}) {},
unwrapper: DefaultUnwrapper,
stackTracer: DefaultStackTracer,
person: Person{},
captureIp: CaptureIpFull,
itemsPerMinute: 0,
}
}

Expand Down
7 changes: 6 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (t *TestTransport) SetLogger(_l rollbar.ClientLogger) {}
func (t *TestTransport) SetRetryAttempts(_r int) {}
func (t *TestTransport) SetPrintPayloadOnError(_p bool) {}
func (t *TestTransport) SetHTTPClient(_c *http.Client) {}
func (t *TestTransport) SetItemsPerMinute(_r int) {}
func (t *TestTransport) Send(body map[string]interface{}) error {
t.Body = body
return nil
Expand Down Expand Up @@ -425,6 +426,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) {
scrubHeaders := regexp.MustCompile("Foo")
scrubFields := regexp.MustCompile("squirrel|doggo")
captureIP := rollbar.CaptureIpNone
itemsPerMinute := 10

errorIfEqual(token, client.Token(), t)
errorIfEqual(environment, client.Environment(), t)
Expand All @@ -438,6 +440,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) {
errorIfEqual(scrubHeaders, client.ScrubHeaders(), t)
errorIfEqual(scrubHeaders, client.Telemetry.Network.ScrubHeaders, t)
errorIfEqual(scrubFields, client.ScrubFields(), t)
errorIfEqual(itemsPerMinute, client.ItemsPerMinute(), t)

if client.Fingerprint() {
t.Error("expected fingerprint to default to false")
Expand Down Expand Up @@ -468,6 +471,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) {
client.SetTelemetry()

client.SetEnabled(true)
client.SetItemsPerMinute(itemsPerMinute)

errorIfNotEqual(token, client.Token(), t)
errorIfNotEqual(environment, client.Environment(), t)
Expand All @@ -481,6 +485,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) {
errorIfNotEqual(scrubHeaders, client.ScrubHeaders(), t)
errorIfNotEqual(scrubHeaders, client.Telemetry.Network.ScrubHeaders, t)
errorIfNotEqual(scrubFields, client.ScrubFields(), t)
errorIfNotEqual(itemsPerMinute, client.ItemsPerMinute(), t)

if !client.Fingerprint() {
t.Error("expected fingerprint to default to false")
Expand Down Expand Up @@ -513,7 +518,7 @@ func testGettersAndSetters(client *rollbar.Client, t *testing.T) {
errorIfNotEqual(fingerprint, configuredOptions["fingerprint"].(bool), t)
errorIfNotEqual(scrubHeaders, configuredOptions["scrubHeaders"].(*regexp.Regexp), t)
errorIfNotEqual(scrubFields, configuredOptions["scrubFields"].(*regexp.Regexp), t)

errorIfNotEqual(itemsPerMinute, configuredOptions["itemsPerMinute"].(int), t)
} else {
t.Fail()
}
Expand Down
5 changes: 5 additions & 0 deletions rollbar.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func SetEndpoint(endpoint string) {
std.SetEndpoint(endpoint)
}

// SetItemsPerMinute sets the max number of items to send in a given minute
func SetItemsPerMinute(itemsPerMinute int) {
std.SetItemsPerMinute(itemsPerMinute)
}

// SetPlatform sets the platform on the managed Client instance.
// The platform is reported for all Rollbar items. The default is
// the running operating system (darwin, freebsd, linux, etc.) but it can
Expand Down
Loading

0 comments on commit 987360b

Please sign in to comment.