From 0efcca3c3302dfa1813ccf0fa3f341deb8e1a937 Mon Sep 17 00:00:00 2001 From: Steven Soroka Date: Thu, 2 Jul 2020 11:59:29 -0400 Subject: [PATCH] execd output (#7761) --- README.md | 4 +- internal/process/process.go | 90 ++++++++----- internal/process/process_posix.go | 29 ++--- internal/process/process_test.go | 74 +++++++++++ internal/process/process_windows.go | 16 +-- plugins/inputs/execd/execd_test.go | 71 +++++++--- plugins/outputs/all/all.go | 1 + plugins/outputs/execd/README.md | 26 ++++ plugins/outputs/execd/examples/file/file.sh | 5 + .../outputs/execd/examples/file/telegraf.conf | 9 ++ .../execd/examples/redis/redis_influx.rb | 19 +++ .../execd/examples/redis/redis_json.rb | 21 +++ .../execd/examples/redis/telegraf.conf | 15 +++ plugins/outputs/execd/execd.go | 121 ++++++++++++++++++ plugins/outputs/execd/execd_test.go | 113 ++++++++++++++++ 15 files changed, 539 insertions(+), 75 deletions(-) create mode 100644 internal/process/process_test.go create mode 100644 plugins/outputs/execd/README.md create mode 100644 plugins/outputs/execd/examples/file/file.sh create mode 100644 plugins/outputs/execd/examples/file/telegraf.conf create mode 100644 plugins/outputs/execd/examples/redis/redis_influx.rb create mode 100644 plugins/outputs/execd/examples/redis/redis_json.rb create mode 100644 plugins/outputs/execd/examples/redis/telegraf.conf create mode 100644 plugins/outputs/execd/execd.go create mode 100644 plugins/outputs/execd/execd_test.go diff --git a/README.md b/README.md index b42135fa2596e..8efbff77d2b5f 100644 --- a/README.md +++ b/README.md @@ -188,7 +188,7 @@ For documentation on the latest development code see the [documentation index][d * [ethtool](./plugins/inputs/ethtool) * [eventhub_consumer](./plugins/inputs/eventhub_consumer) (Azure Event Hubs \& Azure IoT Hub) * [exec](./plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios) -* [execd](./plugins/inputs/execd) +* [execd](./plugins/inputs/execd) (generic executable "daemon" processes) * [fail2ban](./plugins/inputs/fail2ban) * [fibaro](./plugins/inputs/fibaro) * [file](./plugins/inputs/file) @@ -368,6 +368,7 @@ For documentation on the latest development code see the [documentation index][d * [dedup](/plugins/processors/dedup) * [defaults](/plugins/processors/defaults) * [enum](/plugins/processors/enum) +* [execd](/plugins/processors/execd) * [filepath](/plugins/processors/filepath) * [override](/plugins/processors/override) * [parser](/plugins/processors/parser) @@ -408,6 +409,7 @@ For documentation on the latest development code see the [documentation index][d * [discard](./plugins/outputs/discard) * [elasticsearch](./plugins/outputs/elasticsearch) * [exec](./plugins/outputs/exec) +* [execd](./plugins/outputs/execd) * [file](./plugins/outputs/file) * [graphite](./plugins/outputs/graphite) * [graylog](./plugins/outputs/graylog) diff --git a/internal/process/process.go b/internal/process/process.go index b7fd77b92d578..3f88aac57b317 100644 --- a/internal/process/process.go +++ b/internal/process/process.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os/exec" "sync" + "sync/atomic" "time" "github.com/influxdata/telegraf" @@ -24,6 +25,9 @@ type Process struct { RestartDelay time.Duration Log telegraf.Logger + name string + args []string + pid int32 cancel context.CancelFunc mainLoopWg sync.WaitGroup } @@ -36,32 +40,19 @@ func New(command []string) (*Process, error) { p := &Process{ RestartDelay: 5 * time.Second, - } - if len(command) > 1 { - p.Cmd = exec.Command(command[0], command[1:]...) - } else { - p.Cmd = exec.Command(command[0]) - } - var err error - p.Stdin, err = p.Cmd.StdinPipe() - if err != nil { - return nil, fmt.Errorf("error opening stdin pipe: %w", err) + name: command[0], + args: []string{}, } - p.Stdout, err = p.Cmd.StdoutPipe() - if err != nil { - return nil, fmt.Errorf("error opening stdout pipe: %w", err) - } - - p.Stderr, err = p.Cmd.StderrPipe() - if err != nil { - return nil, fmt.Errorf("error opening stderr pipe: %w", err) + if len(command) > 1 { + p.args = command[1:] } return p, nil } -// Start the process +// Start the process. A &Process can only be started once. It will restart itself +// as necessary. func (p *Process) Start() error { ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel @@ -81,35 +72,54 @@ func (p *Process) Start() error { return nil } +// Stop is called when the process isn't needed anymore func (p *Process) Stop() { if p.cancel != nil { + // signal our intent to shutdown and not restart the process p.cancel() } + // close stdin so the app can shut down gracefully. + p.Stdin.Close() p.mainLoopWg.Wait() } func (p *Process) cmdStart() error { - p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args) + p.Cmd = exec.Command(p.name, p.args...) + + var err error + p.Stdin, err = p.Cmd.StdinPipe() + if err != nil { + return fmt.Errorf("error opening stdin pipe: %w", err) + } + + p.Stdout, err = p.Cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("error opening stdout pipe: %w", err) + } + + p.Stderr, err = p.Cmd.StderrPipe() + if err != nil { + return fmt.Errorf("error opening stderr pipe: %w", err) + } + + p.Log.Infof("Starting process: %s %s", p.name, p.args) if err := p.Cmd.Start(); err != nil { return fmt.Errorf("error starting process: %s", err) } - + atomic.StoreInt32(&p.pid, int32(p.Cmd.Process.Pid)) return nil } +func (p *Process) Pid() int { + pid := atomic.LoadInt32(&p.pid) + return int(pid) +} + // cmdLoop watches an already running process, restarting it when appropriate. func (p *Process) cmdLoop(ctx context.Context) error { - go func() { - <-ctx.Done() - if p.Stdin != nil { - p.Stdin.Close() - gracefulStop(p.Cmd, 5*time.Second) - } - }() - for { - err := p.cmdWait() + err := p.cmdWait(ctx) if isQuitting(ctx) { p.Log.Infof("Process %s shut down", p.Cmd.Path) return nil @@ -130,7 +140,8 @@ func (p *Process) cmdLoop(ctx context.Context) error { } } -func (p *Process) cmdWait() error { +// cmdWait waits for the process to finish. +func (p *Process) cmdWait(ctx context.Context) error { var wg sync.WaitGroup if p.ReadStdoutFn == nil { @@ -140,6 +151,9 @@ func (p *Process) cmdWait() error { p.ReadStderrFn = defaultReadPipe } + processCtx, processCancel := context.WithCancel(context.Background()) + defer processCancel() + wg.Add(1) go func() { p.ReadStdoutFn(p.Stdout) @@ -152,8 +166,20 @@ func (p *Process) cmdWait() error { wg.Done() }() + wg.Add(1) + go func() { + select { + case <-ctx.Done(): + gracefulStop(processCtx, p.Cmd, 5*time.Second) + case <-processCtx.Done(): + } + wg.Done() + }() + + err := p.Cmd.Wait() + processCancel() wg.Wait() - return p.Cmd.Wait() + return err } func isQuitting(ctx context.Context) bool { diff --git a/internal/process/process_posix.go b/internal/process/process_posix.go index f459e00e2fa6c..7b42b7da13214 100644 --- a/internal/process/process_posix.go +++ b/internal/process/process_posix.go @@ -3,26 +3,21 @@ package process import ( + "context" "os/exec" "syscall" "time" ) -func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { - time.AfterFunc(timeout, func() { - if cmd.ProcessState == nil { - return - } - if !cmd.ProcessState.Exited() { - cmd.Process.Signal(syscall.SIGTERM) - time.AfterFunc(timeout, func() { - if cmd.ProcessState == nil { - return - } - if !cmd.ProcessState.Exited() { - cmd.Process.Kill() - } - }) - } - }) +func gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) { + select { + case <-time.After(timeout): + cmd.Process.Signal(syscall.SIGTERM) + case <-ctx.Done(): + } + select { + case <-time.After(timeout): + cmd.Process.Kill() + case <-ctx.Done(): + } } diff --git a/internal/process/process_test.go b/internal/process/process_test.go new file mode 100644 index 0000000000000..7a7c8c6f33fd6 --- /dev/null +++ b/internal/process/process_test.go @@ -0,0 +1,74 @@ +// +build !windows + +package process + +import ( + "bufio" + "flag" + "fmt" + "io" + "os" + "sync/atomic" + "syscall" + "testing" + "time" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +// test that a restarting process resets pipes properly +func TestRestartingRebindsPipes(t *testing.T) { + exe, err := os.Executable() + require.NoError(t, err) + + p, err := New([]string{exe, "-external"}) + p.RestartDelay = 100 * time.Nanosecond + p.Log = testutil.Logger{} + require.NoError(t, err) + + linesRead := int64(0) + p.ReadStdoutFn = func(r io.Reader) { + scanner := bufio.NewScanner(r) + + for scanner.Scan() { + atomic.AddInt64(&linesRead, 1) + } + } + + require.NoError(t, p.Start()) + + for atomic.LoadInt64(&linesRead) < 1 { + time.Sleep(1 * time.Millisecond) + } + + syscall.Kill(p.Pid(), syscall.SIGKILL) + + for atomic.LoadInt64(&linesRead) < 2 { + time.Sleep(1 * time.Millisecond) + } + + p.Stop() +} + +var external = flag.Bool("external", false, + "if true, run externalProcess instead of tests") + +func TestMain(m *testing.M) { + flag.Parse() + if *external { + externalProcess() + os.Exit(0) + } + code := m.Run() + os.Exit(code) +} + +// externalProcess is an external "misbehaving" process that won't exit +// cleanly. +func externalProcess() { + wait := make(chan int, 0) + fmt.Fprintln(os.Stdout, "started") + <-wait + os.Exit(2) +} diff --git a/internal/process/process_windows.go b/internal/process/process_windows.go index fc110841561f9..0995d52469b07 100644 --- a/internal/process/process_windows.go +++ b/internal/process/process_windows.go @@ -3,17 +3,15 @@ package process import ( + "context" "os/exec" "time" ) -func gracefulStop(cmd *exec.Cmd, timeout time.Duration) { - time.AfterFunc(timeout, func() { - if cmd.ProcessState == nil { - return - } - if !cmd.ProcessState.Exited() { - cmd.Process.Kill() - } - }) +func gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) { + select { + case <-time.After(timeout): + cmd.Process.Kill() + case <-ctx.Done(): + } } diff --git a/plugins/inputs/execd/execd_test.go b/plugins/inputs/execd/execd_test.go index ce046568c6dcf..2a26cfe56bcd6 100644 --- a/plugins/inputs/execd/execd_test.go +++ b/plugins/inputs/execd/execd_test.go @@ -3,18 +3,23 @@ package execd import ( + "bufio" + "flag" "fmt" + "os" "strings" "testing" "time" "github.com/influxdata/telegraf/agent" "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/models" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/plugins/serializers" "github.com/influxdata/telegraf" ) @@ -23,13 +28,16 @@ func TestExternalInputWorks(t *testing.T) { influxParser, err := parsers.NewInfluxParser() require.NoError(t, err) + exe, err := os.Executable() + require.NoError(t, err) + e := &Execd{ - Command: []string{shell(), fileShellScriptPath()}, + Command: []string{exe, "-counter"}, RestartDelay: config.Duration(5 * time.Second), parser: influxParser, Signal: "STDIN", + Log: testutil.Logger{}, } - e.Log = testutil.Logger{} metrics := make(chan telegraf.Metric, 10) defer close(metrics) @@ -43,12 +51,10 @@ func TestExternalInputWorks(t *testing.T) { e.Stop() - require.Equal(t, "counter_bash", m.Name()) + require.Equal(t, "counter", m.Name()) val, ok := m.GetField("count") require.True(t, ok) - require.Equal(t, float64(0), val) - // test that a later gather will not panic - e.Gather(acc) + require.EqualValues(t, 0, val) } func TestParsesLinesContainingNewline(t *testing.T) { @@ -60,13 +66,12 @@ func TestParsesLinesContainingNewline(t *testing.T) { acc := agent.NewAccumulator(&TestMetricMaker{}, metrics) e := &Execd{ - Command: []string{shell(), fileShellScriptPath()}, RestartDelay: config.Duration(5 * time.Second), parser: parser, Signal: "STDIN", acc: acc, + Log: testutil.Logger{}, } - e.Log = testutil.Logger{} cases := []struct { Name string @@ -109,14 +114,6 @@ func readChanWithTimeout(t *testing.T, metrics chan telegraf.Metric, timeout tim return nil } -func fileShellScriptPath() string { - return "./examples/count.sh" -} - -func shell() string { - return "sh" -} - type TestMetricMaker struct{} func (tm *TestMetricMaker) Name() string { @@ -134,3 +131,45 @@ func (tm *TestMetricMaker) MakeMetric(metric telegraf.Metric) telegraf.Metric { func (tm *TestMetricMaker) Log() telegraf.Logger { return models.NewLogger("TestPlugin", "test", "") } + +var counter = flag.Bool("counter", false, + "if true, act like line input program instead of test") + +func TestMain(m *testing.M) { + flag.Parse() + if *counter { + runCounterProgram() + os.Exit(0) + } + code := m.Run() + os.Exit(code) +} + +func runCounterProgram() { + i := 0 + serializer, err := serializers.NewInfluxSerializer() + if err != nil { + fmt.Fprintln(os.Stderr, "ERR InfluxSerializer failed to load") + os.Exit(1) + } + + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + metric, _ := metric.New("counter", + map[string]string{}, + map[string]interface{}{ + "count": i, + }, + time.Now(), + ) + i++ + + b, err := serializer.Serialize(metric) + if err != nil { + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + fmt.Fprint(os.Stdout, string(b)) + } + +} diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 7d37c2208b092..c7f28bdb91e19 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -12,6 +12,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/discard" _ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch" _ "github.com/influxdata/telegraf/plugins/outputs/exec" + _ "github.com/influxdata/telegraf/plugins/outputs/execd" _ "github.com/influxdata/telegraf/plugins/outputs/file" _ "github.com/influxdata/telegraf/plugins/outputs/graphite" _ "github.com/influxdata/telegraf/plugins/outputs/graylog" diff --git a/plugins/outputs/execd/README.md b/plugins/outputs/execd/README.md new file mode 100644 index 0000000000000..24613d5379422 --- /dev/null +++ b/plugins/outputs/execd/README.md @@ -0,0 +1,26 @@ +# Execd Output Plugin + +The `execd` plugin runs an external program as a daemon. + +### Configuration: + +```toml +[[outputs.execd]] + ## Program to run as daemon + command = ["my-telegraf-output", "--some-flag", "value"] + + ## Delay before the process is restarted after an unexpected termination + restart_delay = "10s" + + ## Data format to export. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +``` + +### Example + +see [examples][] + +[examples]: https://github.com/influxdata/telegraf/blob/master/plugins/outputs/execd/examples/ \ No newline at end of file diff --git a/plugins/outputs/execd/examples/file/file.sh b/plugins/outputs/execd/examples/file/file.sh new file mode 100644 index 0000000000000..4cdd0279b6d2a --- /dev/null +++ b/plugins/outputs/execd/examples/file/file.sh @@ -0,0 +1,5 @@ +# Usage: sh file.sh output_filename.ext +# reads from stdin and writes out to a file named on the command line. +while read line; do + echo "$line" >> $1 +done < /dev/stdin diff --git a/plugins/outputs/execd/examples/file/telegraf.conf b/plugins/outputs/execd/examples/file/telegraf.conf new file mode 100644 index 0000000000000..0259e95f5cb85 --- /dev/null +++ b/plugins/outputs/execd/examples/file/telegraf.conf @@ -0,0 +1,9 @@ +[agent] + interval = "1s" + +[[inputs.execd]] + command = ["ruby", "plugins/inputs/execd/examples/count.rb"] + +[[outputs.execd]] + command = ["sh", "plugins/outputs/execd/examples/file/file.sh"] + data_format = "json" diff --git a/plugins/outputs/execd/examples/redis/redis_influx.rb b/plugins/outputs/execd/examples/redis/redis_influx.rb new file mode 100644 index 0000000000000..9bac10e67b802 --- /dev/null +++ b/plugins/outputs/execd/examples/redis/redis_influx.rb @@ -0,0 +1,19 @@ +#!/usr/bin/env ruby +# +# An example of funneling metrics to Redis pub/sub. +# +# to run this, you may need to: +# gem install redis +# +require 'redis' + +r = Redis.new(host: "127.0.0.1", port: 6379, db: 1) + +loop do + # example input: "counter_ruby count=0 1591741648101185000" + line = STDIN.readline.chomp + + key = line.split(" ")[0] + key = key.split(",")[0] + r.publish(key, line) +end diff --git a/plugins/outputs/execd/examples/redis/redis_json.rb b/plugins/outputs/execd/examples/redis/redis_json.rb new file mode 100644 index 0000000000000..e0939634b6575 --- /dev/null +++ b/plugins/outputs/execd/examples/redis/redis_json.rb @@ -0,0 +1,21 @@ +#!/usr/bin/env ruby +# +# An example of funneling metrics to Redis pub/sub. +# +# to run this, you may need to: +# gem install redis +# +require 'redis' +require 'json' + +r = Redis.new(host: "127.0.0.1", port: 6379, db: 1) + +loop do + # example input: "{"fields":{"count":0},"name":"counter_ruby","tags":{"host":"localhost"},"timestamp":1586374982}" + line = STDIN.readline.chomp + + l = JSON.parse(line) + + key = l["name"] + r.publish(key, line) +end diff --git a/plugins/outputs/execd/examples/redis/telegraf.conf b/plugins/outputs/execd/examples/redis/telegraf.conf new file mode 100644 index 0000000000000..765930c7a9dc7 --- /dev/null +++ b/plugins/outputs/execd/examples/redis/telegraf.conf @@ -0,0 +1,15 @@ +[agent] + flush_interval = "1s" + interval = "1s" + +[[inputs.execd]] + command = ["ruby", "plugins/inputs/execd/examples/count.rb"] + signal = "none" + +[[outputs.execd]] + command = ["ruby", "plugins/outputs/execd/examples/redis/redis_influx.rb"] + data_format = "influx" + +# [[outputs.file]] +# files = ["stdout"] +# data_format = "influx" diff --git a/plugins/outputs/execd/execd.go b/plugins/outputs/execd/execd.go new file mode 100644 index 0000000000000..432fc71f6866e --- /dev/null +++ b/plugins/outputs/execd/execd.go @@ -0,0 +1,121 @@ +package execd + +import ( + "bufio" + "fmt" + "io" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/internal/process" + "github.com/influxdata/telegraf/plugins/outputs" + "github.com/influxdata/telegraf/plugins/serializers" +) + +const sampleConfig = ` + ## Program to run as daemon + command = ["my-telegraf-output", "--some-flag", "value"] + + ## Delay before the process is restarted after an unexpected termination + restart_delay = "10s" + + ## Data format to export. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md + data_format = "influx" +` + +type Execd struct { + Command []string `toml:"command"` + RestartDelay config.Duration `toml:"restart_delay"` + Log telegraf.Logger + + process *process.Process + serializer serializers.Serializer +} + +func (e *Execd) SampleConfig() string { + return sampleConfig +} + +func (e *Execd) Description() string { + return "Run executable as long-running output plugin" +} + +func (e *Execd) SetSerializer(s serializers.Serializer) { + e.serializer = s +} + +func (e *Execd) Init() error { + if len(e.Command) == 0 { + return fmt.Errorf("no command specified") + } + + var err error + + e.process, err = process.New(e.Command) + if err != nil { + return fmt.Errorf("error creating process %s: %w", e.Command, err) + } + e.process.Log = e.Log + e.process.RestartDelay = time.Duration(e.RestartDelay) + e.process.ReadStdoutFn = e.cmdReadOut + e.process.ReadStderrFn = e.cmdReadErr + + return nil +} + +func (e *Execd) Connect() error { + if err := e.process.Start(); err != nil { + return fmt.Errorf("failed to start process %s: %w", e.Command, err) + } + + return nil +} + +func (e *Execd) Close() error { + e.process.Stop() + return nil +} + +func (e *Execd) Write(metrics []telegraf.Metric) error { + for _, m := range metrics { + b, err := e.serializer.Serialize(m) + if err != nil { + return fmt.Errorf("error serializing metrics: %s", err) + } + + if _, err = e.process.Stdin.Write(b); err != nil { + return fmt.Errorf("error writing metrics %s", err) + } + } + return nil +} + +func (e *Execd) cmdReadErr(out io.Reader) { + scanner := bufio.NewScanner(out) + + for scanner.Scan() { + e.Log.Errorf("stderr: %s", scanner.Text()) + } + + if err := scanner.Err(); err != nil { + e.Log.Errorf("Error reading stderr: %s", err) + } +} + +func (e *Execd) cmdReadOut(out io.Reader) { + scanner := bufio.NewScanner(out) + + for scanner.Scan() { + e.Log.Info(scanner.Text()) + } +} + +func init() { + outputs.Add("execd", func() telegraf.Output { + return &Execd{} + }) +} diff --git a/plugins/outputs/execd/execd_test.go b/plugins/outputs/execd/execd_test.go new file mode 100644 index 0000000000000..46bde795ec2ed --- /dev/null +++ b/plugins/outputs/execd/execd_test.go @@ -0,0 +1,113 @@ +package execd + +import ( + "bufio" + "flag" + "fmt" + "io" + "os" + "strings" + "sync" + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/plugins/parsers/influx" + "github.com/influxdata/telegraf/plugins/serializers" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +var now = time.Date(2020, 6, 30, 16, 16, 0, 0, time.UTC) + +func TestExternalOutputWorks(t *testing.T) { + influxSerializer, err := serializers.NewInfluxSerializer() + require.NoError(t, err) + + exe, err := os.Executable() + require.NoError(t, err) + + e := &Execd{ + Command: []string{exe, "-testoutput"}, + RestartDelay: config.Duration(5 * time.Second), + serializer: influxSerializer, + Log: testutil.Logger{}, + } + + require.NoError(t, e.Init()) + + wg := &sync.WaitGroup{} + wg.Add(1) + e.process.ReadStderrFn = func(rstderr io.Reader) { + scanner := bufio.NewScanner(rstderr) + + for scanner.Scan() { + t.Errorf("stderr: %q", scanner.Text()) + } + + if err := scanner.Err(); err != nil { + if !strings.HasSuffix(err.Error(), "already closed") { + t.Errorf("error reading stderr: %v", err) + } + } + wg.Done() + } + + m, err := metric.New( + "cpu", + map[string]string{"name": "cpu1"}, + map[string]interface{}{"idle": 50, "sys": 30}, + now, + ) + require.NoError(t, err) + + require.NoError(t, e.Connect()) + require.NoError(t, e.Write([]telegraf.Metric{m})) + require.NoError(t, e.Close()) + wg.Wait() +} + +var testoutput = flag.Bool("testoutput", false, + "if true, act like line input program instead of test") + +func TestMain(m *testing.M) { + flag.Parse() + if *testoutput { + runOutputConsumerProgram() + os.Exit(0) + } + code := m.Run() + os.Exit(code) +} + +func runOutputConsumerProgram() { + parser := influx.NewStreamParser(os.Stdin) + + for { + metric, err := parser.Next() + if err != nil { + if err == influx.EOF { + return // stream ended + } + if parseErr, isParseError := err.(*influx.ParseError); isParseError { + fmt.Fprintf(os.Stderr, "parse ERR %v\n", parseErr) + os.Exit(1) + } + fmt.Fprintf(os.Stderr, "ERR %v\n", err) + os.Exit(1) + } + + expected := testutil.MustMetric("cpu", + map[string]string{"name": "cpu1"}, + map[string]interface{}{"idle": 50, "sys": 30}, + now, + ) + + if !testutil.MetricEqual(expected, metric) { + fmt.Fprintf(os.Stderr, "metric doesn't match expected\n") + os.Exit(1) + } + } +}