Skip to content

Commit

Permalink
New object: ErrChan for concurrent err handling
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Jun 2, 2016
1 parent 1aabd38 commit 2c448e2
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 54 deletions.
37 changes: 37 additions & 0 deletions internal/errchan/errchan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package errchan

import (
"fmt"
"strings"
)

type ErrChan struct {
C chan error
}

// New returns an error channel of max length 'n'
// errors can be sent to the ErrChan.C channel, and will be returned when
// ErrChan.Error() is called.
func New(n int) *ErrChan {
return &ErrChan{
C: make(chan error, n),
}
}

// Error closes the ErrChan.C channel and returns an error if there are any
// non-nil errors, otherwise returns nil.
func (e *ErrChan) Error() error {
close(e.C)

var out string
for err := range e.C {
if err != nil {
out += "[" + err.Error() + "], "
}
}

if out != "" {
return fmt.Errorf("Errors encountered: " + strings.TrimRight(out, ", "))
}
return nil
}
20 changes: 11 additions & 9 deletions plugins/inputs/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cloudwatch
import (
"fmt"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
internalaws "github.com/influxdata/telegraf/internal/config/aws"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/internal/limiter"
"github.com/influxdata/telegraf/plugins/inputs"
)
Expand Down Expand Up @@ -166,7 +168,7 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
}

metricCount := len(metrics)
var errChan = make(chan error, metricCount)
errChan := errchan.New(metricCount)

now := time.Now()

Expand All @@ -175,18 +177,18 @@ func (c *CloudWatch) Gather(acc telegraf.Accumulator) error {
// http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
lmtr := limiter.NewRateLimiter(10, time.Second)
defer lmtr.Stop()
var wg sync.WaitGroup
wg.Add(len(metrics))
for _, m := range metrics {
<-lmtr.C
go c.gatherMetric(acc, m, now, errChan)
go func(inm *cloudwatch.Metric) {
defer wg.Done()
c.gatherMetric(acc, inm, now, errChan.C)
}(m)
}
wg.Wait()

for i := 1; i <= metricCount; i++ {
err := <-errChan
if err != nil {
return err
}
}
return nil
return errChan.Error()
}

func init() {
Expand Down
19 changes: 4 additions & 15 deletions plugins/inputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package elasticsearch

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
jsonparser "github.com/influxdata/telegraf/plugins/parsers/json"
)
Expand Down Expand Up @@ -102,7 +101,7 @@ func (e *Elasticsearch) Description() string {
// Gather reads the stats from Elasticsearch and writes it to the
// Accumulator.
func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
errChan := make(chan error, len(e.Servers))
errChan := errchan.New(len(e.Servers))
var wg sync.WaitGroup
wg.Add(len(e.Servers))

Expand All @@ -116,7 +115,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
url = s + statsPath
}
if err := e.gatherNodeStats(url, acc); err != nil {
errChan <- err
errChan.C <- err
return
}
if e.ClusterHealth {
Expand All @@ -126,17 +125,7 @@ func (e *Elasticsearch) Gather(acc telegraf.Accumulator) error {
}

wg.Wait()
close(errChan)
// Get all errors and return them as one giant error
errStrings := []string{}
for err := range errChan {
errStrings = append(errStrings, err.Error())
}

if len(errStrings) == 0 {
return nil
}
return errors.New(strings.Join(errStrings, "\n"))
return errChan.Error()
}

func (e *Elasticsearch) gatherNodeStats(url string, acc telegraf.Accumulator) error {
Expand Down
15 changes: 4 additions & 11 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
"github.com/influxdata/telegraf/plugins/parsers/nagios"
Expand Down Expand Up @@ -182,23 +183,15 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
}
}

e.errChan = make(chan error, len(commands))
errChan := errchan.New(len(commands))
e.errChan = errChan.C

e.wg.Add(len(commands))
for _, command := range commands {
go e.ProcessCommand(command, acc)
}
e.wg.Wait()

select {
default:
close(e.errChan)
return nil
case err := <-e.errChan:
close(e.errChan)
return err
}

return errChan.Error()
}

func init() {
Expand Down
16 changes: 8 additions & 8 deletions plugins/inputs/haproxy/haproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package haproxy
import (
"encoding/csv"
"fmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
"io"
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
)

//CSV format: https://cbonte.github.io/haproxy-dconv/configuration-1.5.html#9.1
Expand Down Expand Up @@ -114,18 +115,17 @@ func (g *haproxy) Gather(acc telegraf.Accumulator) error {
}

var wg sync.WaitGroup
errChan := errchan.New(len(g.Servers))
wg.Add(len(g.Servers))
for _, server := range g.Servers {
wg.Add(1)
go func(serv string) {
defer wg.Done()
if err := g.gatherServer(serv, acc); err != nil {
log.Printf("HAProxy error gathering server: %s, %s", serv, err)
}
errChan.C <- g.gatherServer(serv, acc)
}(server)
}

wg.Wait()
return nil
return errChan.Error()
}

func (g *haproxy) gatherServerSocket(addr string, acc telegraf.Accumulator) error {
Expand Down
22 changes: 11 additions & 11 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal/errchan"
"github.com/influxdata/telegraf/plugins/inputs"
)

Expand Down Expand Up @@ -129,20 +131,18 @@ func (r *RabbitMQ) Gather(acc telegraf.Accumulator) error {
}
}

var errChan = make(chan error, len(gatherFunctions))

var wg sync.WaitGroup
wg.Add(len(gatherFunctions))
errChan := errchan.New(len(gatherFunctions))
for _, f := range gatherFunctions {
go f(r, acc, errChan)
go func(gf gatherFunc) {
defer wg.Done()
gf(r, acc, errChan.C)
}(f)
}
wg.Wait()

for i := 1; i <= len(gatherFunctions); i++ {
err := <-errChan
if err != nil {
return err
}
}

return nil
return errChan.Error()
}

func (r *RabbitMQ) requestJSON(u string, target interface{}) error {
Expand Down

0 comments on commit 2c448e2

Please sign in to comment.