Skip to content

Commit

Permalink
execd output (influxdata#7761)
Browse files Browse the repository at this point in the history
  • Loading branch information
ssoroka authored Jul 2, 2020
1 parent 1b1382c commit 0efcca3
Show file tree
Hide file tree
Showing 15 changed files with 539 additions and 75 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ For documentation on the latest development code see the [documentation index][d
* [ethtool](./plugins/inputs/ethtool)
* [eventhub_consumer](./plugins/inputs/eventhub_consumer) (Azure Event Hubs \& Azure IoT Hub)
* [exec](./plugins/inputs/exec) (generic executable plugin, support JSON, influx, graphite and nagios)
* [execd](./plugins/inputs/execd)
* [execd](./plugins/inputs/execd) (generic executable "daemon" processes)
* [fail2ban](./plugins/inputs/fail2ban)
* [fibaro](./plugins/inputs/fibaro)
* [file](./plugins/inputs/file)
Expand Down Expand Up @@ -368,6 +368,7 @@ For documentation on the latest development code see the [documentation index][d
* [dedup](/plugins/processors/dedup)
* [defaults](/plugins/processors/defaults)
* [enum](/plugins/processors/enum)
* [execd](/plugins/processors/execd)
* [filepath](/plugins/processors/filepath)
* [override](/plugins/processors/override)
* [parser](/plugins/processors/parser)
Expand Down Expand Up @@ -408,6 +409,7 @@ For documentation on the latest development code see the [documentation index][d
* [discard](./plugins/outputs/discard)
* [elasticsearch](./plugins/outputs/elasticsearch)
* [exec](./plugins/outputs/exec)
* [execd](./plugins/outputs/execd)
* [file](./plugins/outputs/file)
* [graphite](./plugins/outputs/graphite)
* [graylog](./plugins/outputs/graylog)
Expand Down
90 changes: 58 additions & 32 deletions internal/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os/exec"
"sync"
"sync/atomic"
"time"

"github.com/influxdata/telegraf"
Expand All @@ -24,6 +25,9 @@ type Process struct {
RestartDelay time.Duration
Log telegraf.Logger

name string
args []string
pid int32
cancel context.CancelFunc
mainLoopWg sync.WaitGroup
}
Expand All @@ -36,32 +40,19 @@ func New(command []string) (*Process, error) {

p := &Process{
RestartDelay: 5 * time.Second,
}
if len(command) > 1 {
p.Cmd = exec.Command(command[0], command[1:]...)
} else {
p.Cmd = exec.Command(command[0])
}
var err error
p.Stdin, err = p.Cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("error opening stdin pipe: %w", err)
name: command[0],
args: []string{},
}

p.Stdout, err = p.Cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("error opening stdout pipe: %w", err)
}

p.Stderr, err = p.Cmd.StderrPipe()
if err != nil {
return nil, fmt.Errorf("error opening stderr pipe: %w", err)
if len(command) > 1 {
p.args = command[1:]
}

return p, nil
}

// Start the process
// Start the process. A &Process can only be started once. It will restart itself
// as necessary.
func (p *Process) Start() error {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel
Expand All @@ -81,35 +72,54 @@ func (p *Process) Start() error {
return nil
}

// Stop is called when the process isn't needed anymore
func (p *Process) Stop() {
if p.cancel != nil {
// signal our intent to shutdown and not restart the process
p.cancel()
}
// close stdin so the app can shut down gracefully.
p.Stdin.Close()
p.mainLoopWg.Wait()
}

func (p *Process) cmdStart() error {
p.Log.Infof("Starting process: %s %s", p.Cmd.Path, p.Cmd.Args)
p.Cmd = exec.Command(p.name, p.args...)

var err error
p.Stdin, err = p.Cmd.StdinPipe()
if err != nil {
return fmt.Errorf("error opening stdin pipe: %w", err)
}

p.Stdout, err = p.Cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("error opening stdout pipe: %w", err)
}

p.Stderr, err = p.Cmd.StderrPipe()
if err != nil {
return fmt.Errorf("error opening stderr pipe: %w", err)
}

p.Log.Infof("Starting process: %s %s", p.name, p.args)

if err := p.Cmd.Start(); err != nil {
return fmt.Errorf("error starting process: %s", err)
}

atomic.StoreInt32(&p.pid, int32(p.Cmd.Process.Pid))
return nil
}

func (p *Process) Pid() int {
pid := atomic.LoadInt32(&p.pid)
return int(pid)
}

// cmdLoop watches an already running process, restarting it when appropriate.
func (p *Process) cmdLoop(ctx context.Context) error {
go func() {
<-ctx.Done()
if p.Stdin != nil {
p.Stdin.Close()
gracefulStop(p.Cmd, 5*time.Second)
}
}()

for {
err := p.cmdWait()
err := p.cmdWait(ctx)
if isQuitting(ctx) {
p.Log.Infof("Process %s shut down", p.Cmd.Path)
return nil
Expand All @@ -130,7 +140,8 @@ func (p *Process) cmdLoop(ctx context.Context) error {
}
}

func (p *Process) cmdWait() error {
// cmdWait waits for the process to finish.
func (p *Process) cmdWait(ctx context.Context) error {
var wg sync.WaitGroup

if p.ReadStdoutFn == nil {
Expand All @@ -140,6 +151,9 @@ func (p *Process) cmdWait() error {
p.ReadStderrFn = defaultReadPipe
}

processCtx, processCancel := context.WithCancel(context.Background())
defer processCancel()

wg.Add(1)
go func() {
p.ReadStdoutFn(p.Stdout)
Expand All @@ -152,8 +166,20 @@ func (p *Process) cmdWait() error {
wg.Done()
}()

wg.Add(1)
go func() {
select {
case <-ctx.Done():
gracefulStop(processCtx, p.Cmd, 5*time.Second)
case <-processCtx.Done():
}
wg.Done()
}()

err := p.Cmd.Wait()
processCancel()
wg.Wait()
return p.Cmd.Wait()
return err
}

func isQuitting(ctx context.Context) bool {
Expand Down
29 changes: 12 additions & 17 deletions internal/process/process_posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,21 @@
package process

import (
"context"
"os/exec"
"syscall"
"time"
)

func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Signal(syscall.SIGTERM)
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
}
})
func gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) {
select {
case <-time.After(timeout):
cmd.Process.Signal(syscall.SIGTERM)
case <-ctx.Done():
}
select {
case <-time.After(timeout):
cmd.Process.Kill()
case <-ctx.Done():
}
}
74 changes: 74 additions & 0 deletions internal/process/process_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// +build !windows

package process

import (
"bufio"
"flag"
"fmt"
"io"
"os"
"sync/atomic"
"syscall"
"testing"
"time"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/require"
)

// test that a restarting process resets pipes properly
func TestRestartingRebindsPipes(t *testing.T) {
exe, err := os.Executable()
require.NoError(t, err)

p, err := New([]string{exe, "-external"})
p.RestartDelay = 100 * time.Nanosecond
p.Log = testutil.Logger{}
require.NoError(t, err)

linesRead := int64(0)
p.ReadStdoutFn = func(r io.Reader) {
scanner := bufio.NewScanner(r)

for scanner.Scan() {
atomic.AddInt64(&linesRead, 1)
}
}

require.NoError(t, p.Start())

for atomic.LoadInt64(&linesRead) < 1 {
time.Sleep(1 * time.Millisecond)
}

syscall.Kill(p.Pid(), syscall.SIGKILL)

for atomic.LoadInt64(&linesRead) < 2 {
time.Sleep(1 * time.Millisecond)
}

p.Stop()
}

var external = flag.Bool("external", false,
"if true, run externalProcess instead of tests")

func TestMain(m *testing.M) {
flag.Parse()
if *external {
externalProcess()
os.Exit(0)
}
code := m.Run()
os.Exit(code)
}

// externalProcess is an external "misbehaving" process that won't exit
// cleanly.
func externalProcess() {
wait := make(chan int, 0)
fmt.Fprintln(os.Stdout, "started")
<-wait
os.Exit(2)
}
16 changes: 7 additions & 9 deletions internal/process/process_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
package process

import (
"context"
"os/exec"
"time"
)

func gracefulStop(cmd *exec.Cmd, timeout time.Duration) {
time.AfterFunc(timeout, func() {
if cmd.ProcessState == nil {
return
}
if !cmd.ProcessState.Exited() {
cmd.Process.Kill()
}
})
func gracefulStop(ctx context.Context, cmd *exec.Cmd, timeout time.Duration) {
select {
case <-time.After(timeout):
cmd.Process.Kill()
case <-ctx.Done():
}
}
Loading

0 comments on commit 0efcca3

Please sign in to comment.