From dbb618a6ec8a3c65eb3d86898140db5c94fffae5 Mon Sep 17 00:00:00 2001 From: Cesar Araujo <56365373+cesarfda@users.noreply.github.com> Date: Tue, 17 Dec 2024 09:56:45 -0800 Subject: [PATCH] Adding Gowrapper around coroutines (#1988) --- .golangci.yml | 2 ++ cmd/launcher/desktop.go | 10 ++++++++-- .../remoterestartconsumer/remoterestartconsumer.go | 5 +++-- ee/debug/shipper/shipper.go | 5 +++-- ee/debug/shipper/shipper_test.go | 3 +++ ee/desktop/runner/runner.go | 13 +++++++------ ee/desktop/user/menu/menu_systray.go | 6 ++++-- ee/localserver/server.go | 7 ++++--- pkg/debug/debug.go | 8 +++++--- pkg/debug/signal_debug.go | 7 +++++-- pkg/log/logshipper/logshipper.go | 6 ++++-- pkg/log/logshipper/logshipper_test.go | 1 + pkg/rungroup/rungroup.go | 4 ++-- pkg/threadas/threadas.go | 10 ++++++++-- 14 files changed, 59 insertions(+), 28 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 79dcc01f3..2e867cbd0 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -39,6 +39,8 @@ linters-settings: msg: do not use logutil.Fatal so that launcher can shut down gracefully - p: ^panic.*$ msg: do not use panic so that launcher can shut down gracefully + - p: ^go func.*$ + msg: use gowrapper.Go() instead of raw goroutines for proper panic handling sloglint: kv-only: true context: "all" diff --git a/cmd/launcher/desktop.go b/cmd/launcher/desktop.go index 9b193a0b2..b65ec4af5 100644 --- a/cmd/launcher/desktop.go +++ b/cmd/launcher/desktop.go @@ -19,6 +19,7 @@ import ( "github.com/kolide/launcher/ee/desktop/user/notify" userserver "github.com/kolide/launcher/ee/desktop/user/server" "github.com/kolide/launcher/ee/desktop/user/universallink" + "github.com/kolide/launcher/ee/gowrapper" "github.com/kolide/launcher/pkg/authedclient" "github.com/kolide/launcher/pkg/log/multislogger" "github.com/kolide/launcher/pkg/rungroup" @@ -185,7 +186,7 @@ func runDesktop(_ *multislogger.MultiSlogger, args []string) error { }, func(err error) {}) // run run group - go func() { + gowrapper.Go(context.TODO(), slogger, func() { // have to run this in a goroutine because menu needs the main thread if err := runGroup.Run(); err != nil { slogger.Log(context.TODO(), slog.LevelError, @@ -193,7 +194,12 @@ func runDesktop(_ *multislogger.MultiSlogger, args []string) error { "err", err, ) } - }() + }, func(r any) { + slogger.Log(context.TODO(), slog.LevelError, + "exiting after runGroup panic", + "err", r, + ) + }) // if desktop is not enabled at start up, wait for send on show desktop channel if !*flDesktopEnabled { diff --git a/ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go b/ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go index 46e21bb20..8b2f2c843 100644 --- a/ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go +++ b/ee/control/consumers/remoterestartconsumer/remoterestartconsumer.go @@ -10,6 +10,7 @@ import ( "time" "github.com/kolide/launcher/ee/agent/types" + "github.com/kolide/launcher/ee/gowrapper" ) const ( @@ -79,7 +80,7 @@ func (r *RemoteRestartConsumer) Do(data io.Reader) error { // Perform the restart by signaling actor shutdown, but delay slightly to give // the actionqueue a chance to process all actions and store their statuses. - go func() { + gowrapper.Go(context.TODO(), r.slogger, func() { r.slogger.Log(context.TODO(), slog.LevelInfo, "received remote restart action for current launcher run ID -- signaling for restart shortly", "action_run_id", restartAction.RunID, @@ -100,7 +101,7 @@ func (r *RemoteRestartConsumer) Do(data io.Reader) error { ) return } - }() + }, func(err any) {}) return nil } diff --git a/ee/debug/shipper/shipper.go b/ee/debug/shipper/shipper.go index 08b4c32fe..ea166b1c8 100644 --- a/ee/debug/shipper/shipper.go +++ b/ee/debug/shipper/shipper.go @@ -23,6 +23,7 @@ import ( "github.com/kolide/launcher/ee/agent/types" "github.com/kolide/launcher/ee/consoleuser" "github.com/kolide/launcher/ee/control" + "github.com/kolide/launcher/ee/gowrapper" "github.com/kolide/launcher/pkg/launcher" ) @@ -107,11 +108,11 @@ func (s *shipper) Write(p []byte) (n int, err error) { // OTOH, if we started request in New() we would know sooner if we had a bad upload url ... :shrug: s.uploadRequestStarted = true s.uploadRequestWg.Add(1) - go func() { + gowrapper.Go(context.TODO(), s.knapsack.Slogger(), func() { defer s.uploadRequestWg.Done() // will close the body in the close function s.uploadResponse, s.uploadRequestErr = http.DefaultClient.Do(s.uploadRequest) //nolint:bodyclose - }() + }, func(r any) {}) return s.writer.Write(p) } diff --git a/ee/debug/shipper/shipper_test.go b/ee/debug/shipper/shipper_test.go index 158625c60..28f35663b 100644 --- a/ee/debug/shipper/shipper_test.go +++ b/ee/debug/shipper/shipper_test.go @@ -33,6 +33,7 @@ func TestShip(t *testing.T) { //nolint:paralleltest k := typesMocks.NewKnapsack(t) k.On("EnrollSecret").Return("") k.On("EnrollSecretPath").Return("") + k.On("Slogger").Return(multislogger.NewNopLogger()) return k }, assertion: assert.NoError, @@ -47,6 +48,7 @@ func TestShip(t *testing.T) { //nolint:paralleltest k := typesMocks.NewKnapsack(t) k.On("EnrollSecret").Return("") k.On("EnrollSecretPath").Return("") + k.On("Slogger").Return(multislogger.NewNopLogger()) return k }, assertion: assert.NoError, @@ -63,6 +65,7 @@ func TestShip(t *testing.T) { //nolint:paralleltest k := typesMocks.NewKnapsack(t) k.On("EnrollSecret").Return("enroll_secret_value") + k.On("Slogger").Return(multislogger.NewNopLogger()) return k }, expectSignatureHeaders: true, diff --git a/ee/desktop/runner/runner.go b/ee/desktop/runner/runner.go index be6367f8c..d5bf62d7a 100644 --- a/ee/desktop/runner/runner.go +++ b/ee/desktop/runner/runner.go @@ -29,6 +29,7 @@ import ( "github.com/kolide/launcher/ee/desktop/user/client" "github.com/kolide/launcher/ee/desktop/user/menu" "github.com/kolide/launcher/ee/desktop/user/notify" + "github.com/kolide/launcher/ee/gowrapper" "github.com/kolide/launcher/ee/presencedetection" "github.com/kolide/launcher/ee/ui/assets" "github.com/kolide/launcher/pkg/backoff" @@ -184,14 +185,14 @@ func New(k types.Knapsack, messenger runnerserver.Messenger, opts ...desktopUser } runner.runnerServer = rs - go func() { + gowrapper.Go(context.TODO(), runner.slogger, func() { if err := runner.runnerServer.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) { runner.slogger.Log(context.TODO(), slog.LevelError, "running monitor server", "err", err, ) } - }() + }, func(err any) {}) if runtime.GOOS == "darwin" { runner.osVersion, err = osversion() @@ -309,10 +310,10 @@ func (r *DesktopUsersProcessesRunner) DetectPresence(reason string, interval tim // killDesktopProcesses kills any existing desktop processes func (r *DesktopUsersProcessesRunner) killDesktopProcesses(ctx context.Context) { wgDone := make(chan struct{}) - go func() { + gowrapper.Go(context.TODO(), r.slogger, func() { defer close(wgDone) r.procsWg.Wait() - }() + }, func(err any) {}) shutdownRequestCount := 0 for uid, proc := range r.uidProcs { @@ -794,7 +795,7 @@ func (r *DesktopUsersProcessesRunner) addProcessTrackingRecordForUser(uid string // The wait group is needed to prevent races. func (r *DesktopUsersProcessesRunner) waitOnProcessAsync(uid string, proc *os.Process) { r.procsWg.Add(1) - go func(username string, proc *os.Process) { + gowrapper.Go(context.TODO(), r.slogger.With("uid", uid, "pid", proc.Pid), func() { defer r.procsWg.Done() // waiting here gives the parent a chance to clean up state, err := proc.Wait() @@ -807,7 +808,7 @@ func (r *DesktopUsersProcessesRunner) waitOnProcessAsync(uid string, proc *os.Pr "state", state, ) } - }(uid, proc) + }, func(err any) {}) } // determineExecutablePath returns DesktopUsersProcessesRunner.executablePath if it is set, diff --git a/ee/desktop/user/menu/menu_systray.go b/ee/desktop/user/menu/menu_systray.go index c663378d7..27892b884 100644 --- a/ee/desktop/user/menu/menu_systray.go +++ b/ee/desktop/user/menu/menu_systray.go @@ -1,8 +1,10 @@ package menu import ( + "context" "sync" + "github.com/kolide/launcher/ee/gowrapper" "github.com/kolide/systray" ) @@ -120,7 +122,7 @@ func (m *menu) makeActionHandler(item *systray.MenuItem, ap ActionPerformer) { done := make(chan struct{}) doneChans = append(doneChans, done) - go func() { + gowrapper.Go(context.TODO(), m.slogger, func() { for { select { case <-item.ClickedCh: @@ -131,5 +133,5 @@ func (m *menu) makeActionHandler(item *systray.MenuItem, ap ActionPerformer) { return } } - }() + }, func(r any) {}) } diff --git a/ee/localserver/server.go b/ee/localserver/server.go index ce4f6a917..053c09de4 100644 --- a/ee/localserver/server.go +++ b/ee/localserver/server.go @@ -21,6 +21,7 @@ import ( "github.com/kolide/krypto/pkg/echelper" "github.com/kolide/launcher/ee/agent" "github.com/kolide/launcher/ee/agent/types" + "github.com/kolide/launcher/ee/gowrapper" "github.com/kolide/launcher/ee/presencedetection" "github.com/kolide/launcher/pkg/osquery" "github.com/kolide/launcher/pkg/traces" @@ -253,7 +254,7 @@ func (ls *localServer) Start() error { var ctx context.Context ctx, ls.cancel = context.WithCancel(context.Background()) - go func() { + gowrapper.Go(ctx, ls.slogger, func() { var lastRun time.Time ticker := time.NewTicker(pollInterval) @@ -278,14 +279,14 @@ func (ls *localServer) Start() error { } } } - }() + }, func(r any) {}) l, err := ls.startListener() if err != nil { return fmt.Errorf("starting listener: %w", err) } - if ls.tlsCerts != nil && len(ls.tlsCerts) > 0 { + if len(ls.tlsCerts) > 0 { ls.slogger.Log(ctx, slog.LevelDebug, "using TLS", ) diff --git a/pkg/debug/debug.go b/pkg/debug/debug.go index f47432f8a..0f1c884f4 100644 --- a/pkg/debug/debug.go +++ b/pkg/debug/debug.go @@ -17,6 +17,7 @@ import ( "strings" "github.com/google/uuid" + "github.com/kolide/launcher/ee/gowrapper" ) const debugPrefix = "/debug/" @@ -42,13 +43,14 @@ func startDebugServer(addrPath string, slogger *slog.Logger) (*http.Server, erro return nil, fmt.Errorf("opening socket: %w", err) } - go func() { + gowrapper.Go(context.TODO(), slogger, func() { if err := serv.Serve(listener); err != nil && err != http.ErrServerClosed { slogger.Log(context.TODO(), slog.LevelInfo, - "debug server failed", "err", err, + "debug server failed", + "err", err, ) } - }() + }, func(r any) {}) url := url.URL{ Scheme: "http", diff --git a/pkg/debug/signal_debug.go b/pkg/debug/signal_debug.go index d0d9b3088..c7efee6bb 100644 --- a/pkg/debug/signal_debug.go +++ b/pkg/debug/signal_debug.go @@ -9,6 +9,8 @@ import ( "os" "os/signal" "syscall" + + "github.com/kolide/launcher/ee/gowrapper" ) const debugSignal = syscall.SIGUSR1 @@ -18,7 +20,7 @@ const debugSignal = syscall.SIGUSR1 func AttachDebugHandler(addrPath string, slogger *slog.Logger) { sig := make(chan os.Signal, 1) signal.Notify(sig, debugSignal) - go func() { + gowrapper.Go(context.TODO(), slogger, func() { for { // Start server on first signal <-sig @@ -45,5 +47,6 @@ func AttachDebugHandler(addrPath string, slogger *slog.Logger) { "shutdown debug server", ) } - }() + }, func(r any) {}) + } diff --git a/pkg/log/logshipper/logshipper.go b/pkg/log/logshipper/logshipper.go index c5cf32da4..4977300dd 100644 --- a/pkg/log/logshipper/logshipper.go +++ b/pkg/log/logshipper/logshipper.go @@ -19,6 +19,7 @@ import ( "github.com/kolide/launcher/ee/agent/flags/keys" "github.com/kolide/launcher/ee/agent/storage" "github.com/kolide/launcher/ee/agent/types" + "github.com/kolide/launcher/ee/gowrapper" "github.com/kolide/launcher/pkg/sendbuffer" slogmulti "github.com/samber/slog-multi" ) @@ -114,9 +115,10 @@ func (ls *LogShipper) Ping() { } ls.isShippingStarted = true - go func() { + gowrapper.Go(context.TODO(), ls.knapsack.Slogger(), func() { ls.startShippingChan <- struct{}{} - }() + }, func(r any) {}) + } func (ls *LogShipper) Run() error { diff --git a/pkg/log/logshipper/logshipper_test.go b/pkg/log/logshipper/logshipper_test.go index 1ea96f67a..117baad5f 100644 --- a/pkg/log/logshipper/logshipper_test.go +++ b/pkg/log/logshipper/logshipper_test.go @@ -40,6 +40,7 @@ func TestLogShipper(t *testing.T) { knapsack.On("RegisterChangeObserver", mock.Anything, keys.LogShippingLevel, keys.LogIngestServerURL) knapsack.On("LogShippingLevel").Return("info").Times(5) knapsack.On("CurrentRunningOsqueryVersion").Return("5.12.3") + knapsack.On("Slogger").Return(multislogger.NewNopLogger()) tokenStore := testKVStore(t, storage.TokenStore.String()) knapsack.On("TokenStore").Return(tokenStore) diff --git a/pkg/rungroup/rungroup.go b/pkg/rungroup/rungroup.go index 6da0a58ff..3ee551ce6 100644 --- a/pkg/rungroup/rungroup.go +++ b/pkg/rungroup/rungroup.go @@ -113,7 +113,7 @@ func (g *Group) Run() error { interruptWait := semaphore.NewWeighted(numActors) for _, a := range g.actors { interruptWait.Acquire(context.Background(), 1) - go func(a rungroupActor) { + gowrapper.Go(context.TODO(), g.slogger, func() { defer interruptWait.Release(1) g.slogger.Log(context.TODO(), slog.LevelDebug, "interrupting actor", @@ -124,7 +124,7 @@ func (g *Group) Run() error { "interrupt complete", "actor", a.name, ) - }(a) + }, func(r any) {}) } interruptCtx, interruptCancel := context.WithTimeout(context.Background(), InterruptTimeout) diff --git a/pkg/threadas/threadas.go b/pkg/threadas/threadas.go index cecbf096c..d245068e8 100644 --- a/pkg/threadas/threadas.go +++ b/pkg/threadas/threadas.go @@ -21,10 +21,14 @@ package threadas import ( + "context" "fmt" + "log/slog" "runtime" "syscall" "time" + + "github.com/kolide/launcher/ee/gowrapper" ) const ( @@ -75,7 +79,7 @@ func ThreadAs(fn func() error, timeout time.Duration, uid uint32, gid uint32) er // sequence starting the child and our listener. errChan := make(chan error, 1) - go func() { + gowrapper.Go(context.TODO(), slog.Default(), func() { // Calling LockOSThread, without a subsequent Unlock, // will cause the thread to terminate when the // goroutine does. This seems simpler than resetting @@ -95,7 +99,9 @@ func ThreadAs(fn func() error, timeout time.Duration, uid uint32, gid uint32) er } errChan <- fn() - }() + }, func(r any) { + errChan <- fmt.Errorf("thread permissions handler panic: %v", r) + }) select { case err := <-errChan: