Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ops Agent Plugin - Subagents Startup Logic #1864

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions builds/ops_agent_plugin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@

set -x -e
DESTDIR=$1
mkdir -p "$DESTDIR/opt/google-cloud-ops-agent/libexec"
go build -buildvcs=false -ldflags "-s -w" -o "$DESTDIR/opt/google-cloud-ops-agent/libexec/google_cloud_ops_agent_uap_plugin" \
mkdir -p "$DESTDIR/opt/google-cloud-ops-agent"
go build -buildvcs=false -ldflags "-s -w" -o "$DESTDIR/opt/google-cloud-ops-agent/plugin" \
github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_plugin
7 changes: 2 additions & 5 deletions cmd/ops_agent_uap_plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os/exec"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

pb "github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_plugin/google_guest_agent/plugin"
)
Expand Down Expand Up @@ -84,11 +85,7 @@ func main() {
// offered mean Guest Agent was successful in installing/launching the plugin
// & will manage the lifecycle (start, stop, or revision change) here onwards.
pb.RegisterGuestAgentPluginServer(server, ps)

ctx := context.Background()
ps.GetStatus(ctx, &pb.GetStatusRequest{})
ps.Start(ctx, &pb.StartRequest{})

reflection.Register(server)
if err := server.Serve(listener); err != nil {
fmt.Fprintf(os.Stderr, "Exiting, cannot continue serving: %v\n", err)
os.Exit(1)
Expand Down
153 changes: 138 additions & 15 deletions cmd/ops_agent_uap_plugin/service_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ import (
"context"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
"path"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"

"google.golang.org/grpc/status"
Expand All @@ -34,18 +38,28 @@ import (
const (
OpsAgentConfigLocationLinux = "/etc/google-cloud-ops-agent/config.yaml"
ConfGeneratorBinary = "libexec/google_cloud_ops_agent_engine"
DiagnosticsBinary = "libexec/google_cloud_ops_agent_diagnostics"
AgentWrapperBinary = "libexec/google_cloud_ops_agent_wrapper"
FluentbitBinary = "subagents/fluent-bit/bin/fluent-bit"
OtelBinary = "subagents/opentelemetry-collector/otelopscol"

LogsDirectory = "log/google-cloud-ops-agent"
FluentBitStateDiectory = "state/fluent-bit"
FluentBitRuntimeDirectory = "run/google-cloud-ops-agent-fluent-bit"
OtelRuntimeDirectory = "run/google-cloud-ops-agent-opentelemetry-collector"
DefaultPluginStateDirectory = "/var/lib/google-guest-agent/plugins/ops-agent-plugin"
DefaultPluginStateDirectory = "/var/lib/google-guest-agent/agent_state/plugins/ops-agent-plugin"
)

var (
// AgentServiceNameRegex matches a systemd unit name of the form
// "<name>.service", where <name> consists of word characters and hyphens.
AgentServiceNameRegex = regexp.MustCompile(`[\w-]+\.service`)
// AgentSystemdServiceNames lists the systemd units that Start() hands to
// findPreExistentAgents when looking for an existing Ops Agent installation
// or a conflicting legacy agent installation.
AgentSystemdServiceNames = []string{"google-cloud-ops-agent.service", "stackdriver-agent.service", "google-fluentd.service"}
)

// RestartCommandFunc is the signature of a function that supervises a single
// command. It receives the shared context together with that context's cancel
// function, the command to run, the RunCommandFunc used to execute it, and the
// WaitGroup tracking the supervising goroutine.
//
// restartCommand is the implementation that Start() passes in. The abstraction
// is introduced primarily to facilitate testing by allowing the injection of
// mock implementations.
type RestartCommandFunc func(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup)

// Apply applies the config sent or performs the work defined in the message.
// ApplyRequest is opaque to the agent and is expected to be well known contract
// between Plugin and the server itself. For e.g. service might want to update
Expand All @@ -67,12 +81,24 @@ func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest)
pContext, cancel := context.WithCancel(context.Background())
ps.cancel = cancel

pluginInstallaPath, err := os.Executable()
if err != nil {
log.Printf("Start() failed, because it cannot determine the plugin install location: %s", err)
return nil, status.Error(1, err.Error())
}
pluginInstallaPath, err = filepath.EvalSymlinks(pluginInstallaPath)
if err != nil {
log.Printf("Start() failed, because it cannot determine the plugin install location: %s", err)
status.Error(1, err.Error())
}
pluginInstallDir := filepath.Dir(pluginInstallaPath)

pluginStateDir := msg.GetConfig().GetStateDirectoryPath()
if pluginStateDir == "" {
pluginStateDir = DefaultPluginStateDirectory
}

// Find pre-existent ops agent installation, and conflicting legacy agent installation.
// Find existing ops agent installation, and conflicting legacy agent installation.
foundConflictingInstallations, err := findPreExistentAgents(pContext, ps.runCommand, AgentSystemdServiceNames)
if foundConflictingInstallations || err != nil {
ps.cancel()
Expand All @@ -82,21 +108,22 @@ func (ps *OpsAgentPluginServer) Start(ctx context.Context, msg *pb.StartRequest)
}

// Ops Agent config validation
if err := validateOpsAgentConfig(pContext, ps.runCommand, pluginStateDir); err != nil {
if err := validateOpsAgentConfig(pContext, pluginInstallDir, ps.runCommand); err != nil {
log.Printf("Start() failed: %s", err)
ps.cancel()
ps.cancel = nil
return nil, status.Errorf(1, "failed to validate Ops Agent config: %s", err)
}
// Subagent config generation
if err := generateSubagentConfigs(pContext, ps.runCommand, pluginStateDir); err != nil {
if err := generateSubagentConfigs(pContext, ps.runCommand, pluginInstallDir, pluginStateDir); err != nil {
log.Printf("Start() failed: %s", err)
ps.cancel()
ps.cancel = nil
return nil, status.Errorf(1, "failed to generate subagent configs: %s", err)
}

// Sub-agent startup functionality is not yet implemented and will be added.
// the diagnostics service and subagent startups
go runSubagents(pContext, cancel, pluginInstallDir, pluginStateDir, restartCommand, ps.runCommand)
return &pb.StartResponse{}, nil
}

Expand Down Expand Up @@ -130,6 +157,103 @@ func (ps *OpsAgentPluginServer) GetStatus(ctx context.Context, msg *pb.GetStatus
return &pb.Status{Code: 0, Results: []string{"The Ops Agent Plugin is running ok."}}, nil
}

// runSubagents launches the diagnostics service, the OpenTelemetry collector
// and Fluent Bit (through the agent wrapper), each supervised by its own
// restartCommand goroutine, and blocks until every supervisor has returned.
//
// Every supervisor derives its own child context from ctx, so one command
// crashing does not directly tear down the others. When a command exits with
// an error it is not restarted; instead the shared parent context is cancelled,
// which stops the remaining commands as well. A cancelled parent context is
// what lets GetStatus() report a non-healthy status, signaling UAP to Start()
// the plugin again.
//
// ctx: the parent context shared by all supervisors.
//
// cancel: the cancel function of ctx. It is invoked when a termination signal
// arrives and is handed to every supervisor so a failing command can stop the
// whole group.
//
// pluginInstallDirectory: directory that contains the plugin's binaries.
//
// pluginStateDirectory: directory that holds generated configs, logs and
// buffers.
//
// restartCommand, runCommand: injection points for the supervisor and the
// command executor.
func runSubagents(ctx context.Context, cancel context.CancelFunc, pluginInstallDirectory string, pluginStateDirectory string, restartCommand RestartCommandFunc, runCommand RunCommandFunc) {
	// Cancel the shared context as soon as a termination signal is received.
	sigHandler(ctx, func(os.Signal) { cancel() })

	// Launch order: diagnostics service, Otel, Fluent Bit.
	subagentCmds := []*exec.Cmd{
		exec.CommandContext(ctx,
			path.Join(pluginInstallDirectory, DiagnosticsBinary),
			"-config", OpsAgentConfigLocationLinux,
		),
		exec.CommandContext(ctx,
			path.Join(pluginInstallDirectory, OtelBinary),
			"--config", path.Join(pluginStateDirectory, OtelRuntimeDirectory, "otel.yaml"),
		),
		exec.CommandContext(ctx,
			path.Join(pluginInstallDirectory, AgentWrapperBinary),
			"-config_path", OpsAgentConfigLocationLinux,
			"-log_path", path.Join(pluginStateDirectory, LogsDirectory, "subagents/logging-module.log"),
			path.Join(pluginInstallDirectory, FluentbitBinary),
			"--config", path.Join(pluginStateDirectory, FluentBitRuntimeDirectory, "fluent_bit_main.conf"),
			"--parser", path.Join(pluginStateDirectory, FluentBitRuntimeDirectory, "fluent_bit_parser.conf"),
			"--storage_path", path.Join(pluginStateDirectory, FluentBitStateDiectory, "buffers"),
		),
	}

	var supervisors sync.WaitGroup
	for _, subagentCmd := range subagentCmds {
		supervisors.Add(1)
		go restartCommand(ctx, cancel, subagentCmd, runCommand, &supervisors)
	}
	supervisors.Wait()
}

// restartCommand runs a fresh copy of cmd through runCommand and, when that run
// finishes without an error, schedules itself again in a new goroutine so that
// the command is restarted. When the run returns an error the command is not
// restarted and cancel is invoked, which cancels the parent context and thereby
// stops the other Ops Agent sub-binaries sharing it.
//
// ctx: parent context; each run uses a child context derived from it. Nothing
// is executed if ctx is already cancelled.
//
// cancel: cancel function of the parent context.
//
// cmd: template of the command to run. It is never executed itself, because an
// exec.Cmd cannot be reused after it has run; a copy carrying the same path,
// arguments, environment and working directory is built for every run. A nil
// cmd is a no-op.
//
// runCommand: executes the command copy and returns its output.
//
// wg: the caller must have called wg.Add(1) for this invocation; Done is called
// on return. The counter is incremented again before a restart is scheduled, so
// wg.Wait() only returns once the whole restart chain has ended.
func restartCommand(ctx context.Context, cancel context.CancelFunc, cmd *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup) {
	defer wg.Done()
	if cmd == nil {
		return
	}
	if ctx.Err() != nil {
		// context has been cancelled
		log.Printf("cannot execute command: %s, because the context has been cancelled", cmd.Args)
		return
	}

	childCtx, childCtxCancel := context.WithCancel(ctx)
	defer childCtxCancel()

	// os/exec allows Args to be empty (Run then uses {Path}), so only strip
	// Args[0] when it is actually present instead of slicing unconditionally.
	var args []string
	if len(cmd.Args) > 1 {
		args = cmd.Args[1:]
	}
	cmdCopy := exec.CommandContext(childCtx, cmd.Path, args...)
	cmdCopy.Env = cmd.Env
	// Keep the working directory of the template so restarts run in the same place.
	cmdCopy.Dir = cmd.Dir

	output, err := runCommand(cmdCopy)
	if err != nil {
		log.Printf("command: %s exited with errors, not restarting.\nCommand output: %s\n Command error:%s", cmd.Args, output, err)
		cancel() // cancels the parent context which also stops other Ops Agent sub-binaries from running.
		return
	}
	log.Printf("command: %s %s exited successfully.\nCommand output: %s", cmd.Path, cmd.Args, output)

	// Register the next run before this invocation's deferred Done fires, so
	// the WaitGroup counter never drops to zero in between.
	wg.Add(1)
	go restartCommand(ctx, cancel, cmd, runCommand, wg)
}

// sigHandler watches for SIGTERM, SIGINT, SIGQUIT and SIGHUP in a background
// goroutine. The function provided in the cancel argument handles internal
// framework termination and the plugin interface notification of the "exiting"
// state; it is called at most once, with the first signal received.
//
// The goroutine ends after the first signal has been handled or as soon as ctx
// is done, whichever happens first. In both cases the signal registration is
// removed with signal.Stop, so nothing is left subscribed once the goroutine
// has exited. The channel is deliberately never closed: closing a channel that
// is still registered with signal.Notify makes the runtime panic with "send on
// closed channel" when the next signal is delivered.
func sigHandler(ctx context.Context, cancel func(sig os.Signal)) {
	// A buffer of one is required because signal delivery never blocks.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGHUP)
	go func() {
		// Deferred so that a signal arriving while cancel is still running
		// is absorbed by the channel buffer rather than taking the default
		// action in the middle of the shutdown.
		defer signal.Stop(sigChan)
		select {
		case sig := <-sigChan:
			log.Printf("Got signal: %d, leaving...", sig)
			cancel(sig)
		case <-ctx.Done():
		}
	}()
}

func runCommand(cmd *exec.Cmd) (string, error) {
if cmd == nil {
return "", nil
Expand All @@ -141,14 +265,13 @@ func runCommand(cmd *exec.Cmd) (string, error) {
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Command %s failed, \ncommand output: %s\ncommand error: %s", cmd.Args, string(out), err)
return string(out), err
}
return string(out), nil
return string(out), err
}

func validateOpsAgentConfig(ctx context.Context, runCommand RunCommandFunc, pluginBaseLocation string) error {
func validateOpsAgentConfig(ctx context.Context, pluginInstallDirectory string, runCommand RunCommandFunc) error {
configValidationCmd := exec.CommandContext(ctx,
path.Join(pluginBaseLocation, ConfGeneratorBinary),
path.Join(pluginInstallDirectory, ConfGeneratorBinary),
"-in", OpsAgentConfigLocationLinux,
)
if output, err := runCommand(configValidationCmd); err != nil {
Expand All @@ -157,14 +280,14 @@ func validateOpsAgentConfig(ctx context.Context, runCommand RunCommandFunc, plug
return nil
}

func generateSubagentConfigs(ctx context.Context, runCommand RunCommandFunc, pluginBaseLocation string) error {
confGeneratorBinaryFullPath := path.Join(pluginBaseLocation, ConfGeneratorBinary)
func generateSubagentConfigs(ctx context.Context, runCommand RunCommandFunc, pluginInstallDirectory string, pluginStateDirectory string) error {
confGeneratorBinaryFullPath := path.Join(pluginInstallDirectory, ConfGeneratorBinary)
otelConfigGenerationCmd := exec.CommandContext(ctx,
confGeneratorBinaryFullPath,
"-service", "otel",
"-in", OpsAgentConfigLocationLinux,
"-out", path.Join(pluginBaseLocation, OtelRuntimeDirectory),
"-logs", path.Join(pluginBaseLocation, LogsDirectory))
"-out", path.Join(pluginStateDirectory, OtelRuntimeDirectory),
"-logs", path.Join(pluginStateDirectory, LogsDirectory))

if output, err := runCommand(otelConfigGenerationCmd); err != nil {
return fmt.Errorf("failed to generate Otel config:\ncommand output: %s\ncommand error: %s", output, err)
Expand All @@ -174,8 +297,8 @@ func generateSubagentConfigs(ctx context.Context, runCommand RunCommandFunc, plu
confGeneratorBinaryFullPath,
"-service", "fluentbit",
"-in", OpsAgentConfigLocationLinux,
"-out", path.Join(pluginBaseLocation, FluentBitRuntimeDirectory),
"-logs", path.Join(pluginBaseLocation, LogsDirectory), "-state", path.Join(pluginBaseLocation, FluentBitStateDiectory))
"-out", path.Join(pluginStateDirectory, FluentBitRuntimeDirectory),
"-logs", path.Join(pluginStateDirectory, LogsDirectory), "-state", path.Join(pluginStateDirectory, FluentBitStateDiectory))

if output, err := runCommand(fluentBitConfigGenerationCmd); err != nil {
return fmt.Errorf("failed to generate Fluntbit config:\ncommand output: %s\ncommand error: %s", output, err)
Expand Down
61 changes: 57 additions & 4 deletions cmd/ops_agent_uap_plugin/service_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,26 @@ import (
"fmt"
"os"
"os/exec"
"sync"
"syscall"
"testing"
"time"

pb "github.com/GoogleCloudPlatform/ops-agent/cmd/ops_agent_uap_plugin/google_guest_agent/plugin"
)

func Test_runCommand(t *testing.T) {
cmd := exec.Command(os.Args[0], "-test.run=TestHelperProcess")
ctx := context.Background()
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestHelperProcess")
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1"}
_, err := runCommand(cmd)
if err != nil {
t.Errorf("runCommand got unexpected error: %v", err)
}
}
func Test_runCommandFailure(t *testing.T) {
cmd := exec.Command(os.Args[0], "-test.run=TestHelperProcess")
ctx := context.Background()
cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestHelperProcess")
cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1", "GO_HELPER_FAILURE=1"}
if _, err := runCommand(cmd); err == nil {
t.Error("runCommand got nil error, want exec failure")
Expand Down Expand Up @@ -110,7 +115,7 @@ func Test_validateOpsAgentConfig(t *testing.T) {
}

ctx := context.Background()
err := validateOpsAgentConfig(ctx, mockRunCommand, "")
err := validateOpsAgentConfig(ctx, "", mockRunCommand)
gotSuccess := (err == nil)
if gotSuccess != tc.wantSuccess {
t.Errorf("%s: validateOpsAgentConfig() failed to valide Ops Agent config: %v, want successful config validation: %v, error:%v", tc.name, gotSuccess, tc.wantSuccess, err)
Expand Down Expand Up @@ -148,7 +153,7 @@ func Test_generateSubagentConfigs(t *testing.T) {
}

ctx := context.Background()
err := generateSubagentConfigs(ctx, mockRunCommand, "")
err := generateSubagentConfigs(ctx, mockRunCommand, "", "")
gotSuccess := (err == nil)
if gotSuccess != tc.wantSuccess {
t.Errorf("%s: generateSubagentConfigs() failed to generate subagents configs: %v, want successful config validation: %v, error:%v", tc.name, gotSuccess, tc.wantSuccess, err)
Expand Down Expand Up @@ -281,6 +286,52 @@ func TestGetStatus(t *testing.T) {
}
}

// Test_restartCommand_CancelContextWhenCmdExitsWithErrors checks that
// restartCommand cancels the parent context when the supervised command exits
// with a non-zero status. The helper process is told to fail through the
// GO_HELPER_FAILURE environment variable.
func Test_restartCommand_CancelContextWhenCmdExitsWithErrors(t *testing.T) {
	parentCtx, cancelParent := context.WithCancel(context.Background())

	failingCmd := exec.CommandContext(parentCtx, os.Args[0], "-test.run=TestHelperProcess")
	failingCmd.Env = []string{"GO_WANT_HELPER_PROCESS=1", "GO_HELPER_FAILURE=1"}

	pending := new(sync.WaitGroup)
	pending.Add(1)
	restartCommand(parentCtx, cancelParent, failingCmd, runCommand, pending)

	if parentCtx.Err() != nil {
		return
	}
	t.Error("restartCommand() did not cancel context but should")
}

// Test_restartCommand_CancelContextWhenCmdTerminatedBySignals checks that
// restartCommand cancels the parent context when the supervised command is
// terminated by a signal. The helper process is started in its long-sleeping
// mode (GO_HELPER_KILL_BY_SIGNALS) and is then sent SIGABRT by the mock
// executor.
func Test_restartCommand_CancelContextWhenCmdTerminatedBySignals(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // releases the context and any helper still bound to it
	cmd := exec.CommandContext(ctx, os.Args[0], "-test.run=TestHelperProcess")
	cmd.Env = []string{"GO_WANT_HELPER_PROCESS=1", "GO_HELPER_KILL_BY_SIGNALS=1"}
	var wg sync.WaitGroup
	wg.Add(1)

	mockRunCommandFunc := func(cmd *exec.Cmd) (string, error) {
		if err := cmd.Start(); err != nil {
			t.Errorf("the command %s did not start successfully", cmd.Args)
			// cmd.Process is nil when Start fails, so there is nothing to signal.
			return "", err
		}
		if err := cmd.Process.Signal(syscall.SIGABRT); err != nil {
			t.Errorf("failed to send SIGABRT to the command %s: %v", cmd.Args, err)
			// Best effort: make sure Wait below does not block for the
			// helper's full sleep duration. The test has already failed.
			_ = cmd.Process.Kill()
		}
		err := cmd.Wait()
		return "", err
	}
	restartCommand(ctx, cancel, cmd, mockRunCommandFunc, &wg)
	if ctx.Err() == nil {
		t.Error("restartCommand() didn't cancel the context but should")
	}
}

// Test_runSubagents_TerminatesWhenSpawnedGoRoutinesReturn checks that
// runSubagents returns once all of the goroutines it spawned have returned.
// The parent context is cancelled up front, so every supervisor returns
// immediately without executing its command.
func Test_runSubagents_TerminatesWhenSpawnedGoRoutinesReturn(t *testing.T) {
	parentCtx, cancelParent := context.WithCancel(context.Background())

	helperCmd := exec.CommandContext(parentCtx, os.Args[0], "-test.run=TestHelperProcess")
	helperCmd.Env = []string{"GO_WANT_HELPER_PROCESS=1", "GO_HELPER_FAILURE=1"}

	// Replace whichever command runSubagents built with the helper command.
	superviseHelper := func(ctx context.Context, cancel context.CancelFunc, _ *exec.Cmd, runCommand RunCommandFunc, wg *sync.WaitGroup) {
		restartCommand(ctx, cancel, helperCmd, runCommand, wg)
	}

	// With the parent context already cancelled, the child goroutines return
	// immediately.
	cancelParent()

	// If runSubagents does not return, the test times out and fails.
	runSubagents(parentCtx, cancelParent, "", "", superviseHelper, runCommand)
}

// TestHelperProcess isn't a real test. It's used as a helper process to mock
// command executions.
func TestHelperProcess(t *testing.T) {
Expand All @@ -292,6 +343,8 @@ func TestHelperProcess(t *testing.T) {
switch {
case os.Getenv("GO_HELPER_FAILURE") == "1":
os.Exit(1)
case os.Getenv("GO_HELPER_KILL_BY_SIGNALS") == "1":
time.Sleep(1 * time.Minute)
default:
// A "successful" mock execution exits with a successful (zero) exit code.
os.Exit(0)
Expand Down
2 changes: 1 addition & 1 deletion cmd/ops_agent_uap_plugin/service_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// between Plugin and the server itself. For e.g. service might want to update
// plugin config to enable/disable feature here plugins can react to such requests.
func (ps *OpsAgentPluginServer) Apply(ctx context.Context, msg *pb.ApplyRequest) (*pb.ApplyResponse, error) {
return &pb.ApplyResponse{}, nil
panic("Apply method is not implemented on Windows yet")
}

// Start starts the plugin and initiates the plugin functionality.
Expand Down
Loading