diff --git a/channel.go b/channel.go index 825ac7874..dcd1eeb25 100644 --- a/channel.go +++ b/channel.go @@ -41,6 +41,10 @@ type ChannelType string // AnyChannelType is our empty channel type used when doing lookups without channel type assertions var AnyChannelType = ChannelType("") +func (ct ChannelType) String() string { + return string(ct) +} + // ChannelUUID is our typing of a channel's UUID type ChannelUUID struct { uuid.UUID diff --git a/config/courier.go b/config/courier.go index 6fd32fd89..11010e4ff 100644 --- a/config/courier.go +++ b/config/courier.go @@ -25,6 +25,9 @@ type Courier struct { MaxWorkers int `default:"32"` + LibratoUsername string `default:""` + LibratoToken string `default:""` + RapidproHandleURL string `default:"https://app.rapidpro.io/handlers/mage/handle_message"` RapidproToken string `default:"missing_rapidpro_token"` diff --git a/librato/librato.go b/librato/librato.go new file mode 100644 index 000000000..139555e90 --- /dev/null +++ b/librato/librato.go @@ -0,0 +1,152 @@ +package librato + +import ( + "bytes" + "encoding/json" + "net/http" + "strings" + "sync" + "time" + + "github.com/nyaruka/courier/utils" + "github.com/sirupsen/logrus" +) + +// Default is our default librato collector +var Default *Sender + +// NewSender creates a new librato Sender with the passed in parameters +func NewSender(waitGroup *sync.WaitGroup, username string, token string, source string, timeout time.Duration) *Sender { + return &Sender{ + waitGroup: waitGroup, + stop: make(chan bool), + + buffer: make(chan gauge, 1000), + username: username, + token: token, + source: source, + timeout: timeout, + } +} + +// AddGauge can be used to add a new gauge to be sent to librato +func (c *Sender) AddGauge(name string, value float64) { + // if no librato configured, return + if c == nil { + return + } + + // our buffer is full, log an error but continue + if len(c.buffer) >= cap(c.buffer) { + logrus.Error("unable to add new gauges, buffer full, you may want to increase your buffer size or decrease your timeout") + return + } + + c.buffer <- gauge{Name: strings.ToLower(name), Value: value, MeasureTime: time.Now().Unix()} +} + +// Start starts our librato sender, callers can use Stop to stop it +func (c *Sender) Start() { + if c == nil { + return + } + + go func() { + c.waitGroup.Add(1) + defer c.waitGroup.Done() + for { + select { + case <-c.stop: + for len(c.buffer) > 0 { + c.flush(250) + } + logrus.WithField("comp", "librato").Info("stopped") + return + + case <-time.After(c.timeout * time.Second): + for i := 0; i < 4; i++ { + c.flush(250) + } + } + } + }() +} + +func (c *Sender) flush(count int) { + if len(c.buffer) <= 0 { + return + } + + // build our payload + reqPayload := &payload{ + MeasureTime: time.Now().Unix(), + Source: c.source, + Gauges: make([]gauge, 0, len(c.buffer)), + } + + // read up to our count of gauges + for i := 0; i < count; i++ { + select { + case g := <-c.buffer: + reqPayload.Gauges = append(reqPayload.Gauges, g) + default: + break + } + } + + // send it off + encoded, err := json.Marshal(reqPayload) + if err != nil { + logrus.WithField("comp", "librato").WithError(err).Error("error encoding librato metrics") + return + } + + req, err := http.NewRequest("POST", "https://metrics-api.librato.com/v1/metrics", bytes.NewReader(encoded)) + if err != nil { + logrus.WithField("comp", "librato").WithError(err).Error("error sending librato metrics") + return + } + req.SetBasicAuth(c.username, c.token) + req.Header.Set("Content-Type", "application/json") + _, err = utils.MakeHTTPRequest(req) + + if err != nil { + logrus.WithField("comp", "librato").WithError(err).Error("error sending librato metrics") + return + } + + logrus.WithField("comp", "librato").WithField("body", string(encoded)).WithField("count", len(reqPayload.Gauges)).Debug("flushed to librato") +} + +// Stop stops our sender, callers can use the WaitGroup used during initialization to block for stop +func (c *Sender) Stop() { + if c == nil { + return + } + close(c.stop) +} + +type gauge struct { + Name string `json:"name"` + Value float64 `json:"value"` + MeasureTime int64 `json:"measure_time"` +} + +type payload struct { + MeasureTime int64 `json:"measure_time"` + Source string `json:"source"` + Gauges []gauge `json:"gauges"` +} + +// Sender is responsible for collecting gauges and sending them in batches to our librato server +type Sender struct { + waitGroup *sync.WaitGroup + stop chan bool + + buffer chan gauge + + username string + token string + source string + timeout time.Duration +} diff --git a/sender.go b/sender.go index 10b35c546..063e0c79c 100644 --- a/sender.go +++ b/sender.go @@ -1,8 +1,10 @@ package courier import ( + "fmt" "time" + "github.com/nyaruka/courier/librato" "github.com/sirupsen/logrus" ) @@ -171,11 +173,16 @@ func (w *Sender) Send() { } else { // send our message status, err = server.SendMsg(msg) + duration := time.Now().Sub(start) + secondDuration := float64(duration) / float64(time.Second) + if err != nil { status = backend.NewMsgStatusForID(msg.Channel(), msg.ID(), MsgErrored) - msgLog.WithError(err).WithField("elapsed", time.Now().Sub(start)).Error("msg errored") + msgLog.WithError(err).WithField("elapsed", duration).Error("msg errored") + librato.Default.AddGauge(fmt.Sprintf("courier.msg_send_error_%s", msg.Channel().ChannelType()), secondDuration) } else { - msgLog.WithField("elapsed", time.Now().Sub(start)).Info("msg sent") + msgLog.WithField("elapsed", duration).Info("msg sent") + librato.Default.AddGauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration) } } diff --git a/server.go b/server.go index db044fc95..42114eb48 100644 --- a/server.go +++ b/server.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "net/http/httputil" + "os" "sort" "strings" "time" @@ -15,6 +16,7 @@ import ( "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/nyaruka/courier/config" + "github.com/nyaruka/courier/librato" "github.com/nyaruka/courier/utils" "github.com/pressly/lg" "github.com/sirupsen/logrus" @@ -88,6 +90,13 @@ func (s *server) Start() error { // set our user agent, needs to happen before we do anything so we don't change have threading issues utils.HTTPUserAgent = fmt.Sprintf("Courier/%s", s.config.Version) + // configure librato if we have configuration options for it + host, _ := os.Hostname() + if s.config.LibratoUsername != "" { + librato.Default = librato.NewSender(s.waitGroup, s.config.LibratoUsername, s.config.LibratoToken, host, 5) + librato.Default.Start() + } + // start our backend err := s.backend.Start() if err != nil { @@ -161,6 +170,9 @@ func (s *server) Stop() error { s.stopped = true close(s.stopChan) + // stop our librato sender + librato.Default.Stop() + // wait for everything to stop s.waitGroup.Wait() @@ -265,15 +277,18 @@ func (s *server) channelUpdateStatusWrapper(handler ChannelHandler, handlerFunc logs := make([]*ChannelLog, 0, 1) statuses, err := handlerFunc(channel, ww, r) - elapsed := time.Now().Sub(start) + duration := time.Now().Sub(start) + secondDuration := float64(duration) / float64(time.Second) + + // create channel logs for each of our msgs or errors if err != nil { WriteError(ww, r, err) - logs = append(logs, NewChannelLog(channel, NilMsgID, r.Method, url, ww.Status(), err, string(request), response.String(), elapsed, start)) + logs = append(logs, NewChannelLog(channel, NilMsgID, r.Method, url, ww.Status(), err, string(request), response.String(), duration, start)) + librato.Default.AddGauge(fmt.Sprintf("courier.msg_status_error_%s", channel.ChannelType()), secondDuration) } - - // create channel logs for each of our msgs for _, status := range statuses { - logs = append(logs, NewChannelLog(channel, status.ID(), r.Method, url, ww.Status(), err, string(request), response.String(), elapsed, start)) + logs = append(logs, NewChannelLog(channel, status.ID(), r.Method, url, ww.Status(), err, string(request), response.String(), duration, start)) + librato.Default.AddGauge(fmt.Sprintf("courier.msg_status_%s", channel.ChannelType()), secondDuration) } // and write these out @@ -305,15 +320,18 @@ func (s *server) channelReceiveMsgWrapper(handler ChannelHandler, handlerFunc Ch logs := make([]*ChannelLog, 0, 1) msgs, err := handlerFunc(channel, ww, r) - elapsed := time.Now().Sub(start) + duration := time.Now().Sub(start) + secondDuration := float64(duration) / float64(time.Second) + + // create channel logs for each of our msgs or errors if err != nil { WriteError(ww, r, err) - logs = append(logs, NewChannelLog(channel, NilMsgID, r.Method, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), elapsed, start)) + logs = append(logs, NewChannelLog(channel, NilMsgID, r.Method, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), duration, start)) + librato.Default.AddGauge(fmt.Sprintf("courier.msg_receive_error_%s", channel.ChannelType()), secondDuration) } - - // create channel logs for each of our msgs for _, msg := range msgs { - logs = append(logs, NewChannelLog(channel, msg.ID(), r.Method, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), elapsed, start)) + logs = append(logs, NewChannelLog(channel, msg.ID(), r.Method, url, ww.Status(), err, string(request), prependHeaders(response.String(), ww.Status(), w), duration, start)) + librato.Default.AddGauge(fmt.Sprintf("courier.msg_receive_%s", channel.ChannelType()), secondDuration) } // and write these out