Skip to content

Commit

Permalink
removed retry limit. subagent will not restart if it exits with error…
Browse files Browse the repository at this point in the history
…s. When one subagent stops running, the remaining binaries will terminate as well. This is done through cancelling the parent context.
  • Loading branch information
XuechunHou committed Jan 23, 2025
1 parent 7125450 commit fbc10a5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 51 deletions.
42 changes: 15 additions & 27 deletions cmd/ops_agent_uap_plugin/service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,17 @@ const (
FluentBitRuntimeDirectory = "run/google-cloud-ops-agent-fluent-bit"
OtelRuntimeDirectory = "run/google-cloud-ops-agent-opentelemetry-collector"
DefaultPluginStateDirectory = "/var/lib/google-guest-agent/plugins/ops-agent-plugin"
RestartLimit = 10 // if a command crashes more than restartLimit times consecutively, the command will no longer be restarted.
)

// Package-level values describing systemd service names.
// NOTE(review): how these are consumed is not visible in this excerpt — confirm against callers.
var (
// AgentServiceNameRegex matches a run of word characters and hyphens
// followed by the literal suffix ".service" (for example "foo-bar.service").
// The pattern is unanchored, so it also matches inside a longer string.
AgentServiceNameRegex = regexp.MustCompile(`[\w-]+\.service`)
// AgentSystemdServiceNames lists three systemd unit names:
// google-cloud-ops-agent.service, stackdriver-agent.service and google-fluentd.service.
AgentSystemdServiceNames = []string{"google-cloud-ops-agent.service", "stackdriver-agent.service", "google-fluentd.service"}
)

// RestartCommandFunc defines a function type that always restarts a command until the command is terminated by signals or the retry is exhausted. This abstraction is introduced
// RestartCommandFunc defines a function type that keeps restarting a command until the command exits with an error. This abstraction is introduced
// primarily to facilitate testing by allowing the injection of mock
// implementations.
type RestartCommandFunc func(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, remainingRetry int, totalRetry int, wg *sync.WaitGroup)
type RestartCommandFunc func(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup)

// Apply applies the config sent or performs the work defined in the message.
// ApplyRequest is opaque to the agent and is expected to be a well-known contract
Expand Down Expand Up @@ -169,15 +168,15 @@ func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginBaseLoca
"-config", OpsAgentConfigLocationLinux,
)
wg.Add(1)
go restartCommand(ctx, cancel, runDiagnosticsCmd, runCommand, RestartLimit, RestartLimit, &wg)
go restartCommand(ctx, cancel, runDiagnosticsCmd, runCommand, &wg)

// Starting Otel
runOtelCmd := exec.CommandContext(ctx,
path.Join(pluginBaseLocation, OtelBinary),
"--config", path.Join(pluginBaseLocation, OtelRuntimeDirectory, "otel.yaml"),
)
wg.Add(1)
go restartCommand(ctx, cancel, runOtelCmd, runCommand, RestartLimit, RestartLimit, &wg)
go restartCommand(ctx, cancel, runOtelCmd, runCommand, &wg)

// Starting FluentBit
runFluentBitCmd := exec.CommandContext(ctx,
Expand All @@ -190,12 +189,12 @@ func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginBaseLoca
"--storage_path", path.Join(pluginBaseLocation, FluentBitStateDiectory, "buffers"),
)
wg.Add(1)
go restartCommand(ctx, cancel, runFluentBitCmd, runCommand, RestartLimit, RestartLimit, &wg)
go restartCommand(ctx, cancel, runFluentBitCmd, runCommand, &wg)

wg.Wait()
}

func restartCommand(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, remainingRetry int, totalRetry int, wg *sync.WaitGroup) {
func restartCommand(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup) {
defer wg.Done()
if cmd == nil {
return
Expand All @@ -205,33 +204,23 @@ func restartCommand(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cm
log.Printf("cannot execute command: %s, because the context has been cancelled", cmd.Args)
return
}
if remainingRetry == 0 {
log.Printf("out of retries, command: %s is not restarted", cmd.Args)
cancel()
return
}

childCtx, childCtxCancel := context.WithCancel(ctx)
defer childCtxCancel()

retryCount := remainingRetry
cmd = exec.CommandContext(childCtx, cmd.Path, cmd.Args[1:]...)
output, err := runCommand(cmd)
cmdCopy := exec.CommandContext(childCtx, cmd.Path, cmd.Args[1:]...)
cmdCopy.Env = cmd.Env
output, err := runCommand(cmdCopy)
if err != nil {
// https://pkg.go.dev/os#ProcessState.ExitCode Don't restart if the command was terminated by signals.
if exiterr, ok := err.(*exec.ExitError); ok && exiterr.ProcessState.ExitCode() == -1 {
log.Printf("command: %s terminated by signals, not restarting.\nCommand output: %s\n Command error:%s", cmd.Args, string(output), err)
return
}
retryCount -= 1

log.Printf("command: %s exited with errors, not restarting.\nCommand output: %s\n Command error:%s", cmd.Args, string(output), err)
cancel() // cancels the parent context which also stops other Ops Agent sub-binaries from running.
return
} else {
log.Printf("command: %s exited successfully.\nCommand output: %s", cmd.Args, string(output))
retryCount = totalRetry
log.Printf("command: %s %s exited successfully.\nCommand output: %s", cmd.Path, cmd.Args, string(output))
}

wg.Add(1)
go restartCommand(ctx, cancel, cmd, runCommand, retryCount, totalRetry, wg)
go restartCommand(ctx, cancel, cmd, runCommand, wg)
}

// sigHandler handles SIGTERM, SIGINT etc signals. The function provided in the
Expand Down Expand Up @@ -263,9 +252,8 @@ func runCommand(cmd *exec.Cmd) (string, error) {
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Command %s failed, \ncommand output: %s\ncommand error: %s", cmd.Args, string(out), err)
return string(out), err
}
return string(out), nil
return string(out), err
}

func validateOpsAgentConfig(ctx context.Context, runCommand RunCommandFunc, pluginBaseLocation string) error {
Expand Down
43 changes: 19 additions & 24 deletions cmd/ops_agent_uap_plugin/service_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,17 @@ import (
)

func Test_runCommand(t *testing.T) {
cmd := exec.Command(os.Args[0], "-test.run=TestHelperProcess")
ctx := context.Background()
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestHelperProcess")
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
_, err := runCommand(cmd)
if err != nil {
t.Errorf("runCommand got unexpected error: %v", err)
}
}
func Test_runCommandFailure(t *testing.T) {
cmd := exec.Command(os.Args[0], "-test.run=TestHelperProcess")
ctx := context.Background()
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestHelperProcess")
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1", "GO_HELPER_FAILURE=1"}
if _, err := runCommand(cmd); err == nil {
t.Error("runCommand got nil error, want exec failure")
Expand Down Expand Up @@ -284,24 +286,21 @@ func TestGetStatus(t *testing.T) {
}
}

func Test_restartCommand_CancelContextWhenNoAttemptLeft(t *testing.T) {
func Test_restartCommand_CancelContextWhenCmdExitsWithErrors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cmd := exec.Command(os.Args[0], "-test.run=TestHelperProcess")
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestHelperProcess")
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1", "GO_HELPER_FAILURE=1"}
var wg sync.WaitGroup
wg.Add(1)
mockRunCommandFunc := func(cmd *exec.Cmd) (string, error) {
return "", nil
}
restartCommand(ctx, cancel, cmd, mockRunCommandFunc, 0, 10, &wg)
restartCommand(ctx, cancel, cmd, runCommand, &wg)
if ctx.Err() == nil {
t.Error("restartCommand() did not cancel context but should")
}
}

func Test_restartCommand_DoNotCancelContextWhenCmdTerminatedBySignals(t *testing.T) {
func Test_restartCommand_CancelContextWhenCmdTerminatedBySignals(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cmd := exec.Command(os.Args[0], "-test.run=TestHelperProcess")
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestHelperProcess")
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1", "GO_HELPER_KILL_BY_SIGNALS=1"}
var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -314,27 +313,23 @@ func Test_restartCommand_DoNotCancelContextWhenCmdTerminatedBySignals(t *testing
err := cmd.Wait()
return "", err
}
restartCommand(ctx, cancel, cmd, mockRunCommandFunc, 1, 10, &wg)
if ctx.Err() != nil {
t.Error("restartCommand() canceled the context but shouldn't")
restartCommand(ctx, cancel, cmd, mockRunCommandFunc, &wg)
if ctx.Err() == nil {
t.Error("restartCommand() didn't cancel the context but should")
}
}

func Test_runSubagents_TerminatesWhenSpawnedGoRoutinesReturn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
mockCmd := exec.Command(os.Args[0], "-test.run=TestHelperProcess")
mockCmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
mockCmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestHelperProcess")
mockCmd.Env = []string{"GO_WANT_HELPER_PROCESS=1", "GO_HELPER_FAILURE=1"}

mockRunCommandFunc := func(cmd *exec.Cmd) (string, error) {
output, err := cmd.CombinedOutput()
t.Logf("%s %s", output, err)
return string(output), err
}
mockRestartCommandFunc := func(ctx context.Context, cancel context.CancelFunc, _ *exec.Cmd, runCommand RunCommandFunc, _ int, totalRetry int, wg *sync.WaitGroup) {
restartCommand(ctx, cancel, mockCmd, runCommand, 0, totalRetry, wg)
mockRestartCommandFunc := func(ctx context.Context, cancel context.CancelFunc, _ *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup) {
restartCommand(ctx, cancel, mockCmd, runCommand, wg)
}
cancel() // child go routines return immediately, because the parent context has been cancelled.
// the test times out and fails if runSubagents does not return
runSubagents(ctx, cancel, "", mockRestartCommandFunc, mockRunCommandFunc)
runSubagents(ctx, cancel, "", mockRestartCommandFunc, runCommand)
}

// TestHelperProcess isn't a real test. It's used as a helper process to mock
Expand Down

0 comments on commit fbc10a5

Please sign in to comment.