Skip to content

Commit

Permalink
Adding Gowrapper around coroutines (kolide#1988)
Browse files Browse the repository at this point in the history
  • Loading branch information
cesarfda authored Dec 17, 2024
1 parent 338c676 commit dbb618a
Show file tree
Hide file tree
Showing 14 changed files with 59 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ linters-settings:
msg: do not use logutil.Fatal so that launcher can shut down gracefully
- p: ^panic.*$
msg: do not use panic so that launcher can shut down gracefully
- p: ^go func.*$
msg: use gowrapper.Go() instead of raw goroutines for proper panic handling
sloglint:
kv-only: true
context: "all"
Expand Down
10 changes: 8 additions & 2 deletions cmd/launcher/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/kolide/launcher/ee/desktop/user/notify"
userserver "github.com/kolide/launcher/ee/desktop/user/server"
"github.com/kolide/launcher/ee/desktop/user/universallink"
"github.com/kolide/launcher/ee/gowrapper"
"github.com/kolide/launcher/pkg/authedclient"
"github.com/kolide/launcher/pkg/log/multislogger"
"github.com/kolide/launcher/pkg/rungroup"
Expand Down Expand Up @@ -185,15 +186,20 @@ func runDesktop(_ *multislogger.MultiSlogger, args []string) error {
}, func(err error) {})

// run run group
go func() {
gowrapper.Go(context.TODO(), slogger, func() {
// have to run this in a goroutine because menu needs the main thread
if err := runGroup.Run(); err != nil {
slogger.Log(context.TODO(), slog.LevelError,
"running run group",
"err", err,
)
}
}()
}, func(r any) {
slogger.Log(context.TODO(), slog.LevelError,
"exiting after runGroup panic",
"err", r,
)
})

// if desktop is not enabled at start up, wait for send on show desktop channel
if !*flDesktopEnabled {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/ee/gowrapper"
)

const (
Expand Down Expand Up @@ -79,7 +80,7 @@ func (r *RemoteRestartConsumer) Do(data io.Reader) error {

// Perform the restart by signaling actor shutdown, but delay slightly to give
// the actionqueue a chance to process all actions and store their statuses.
go func() {
gowrapper.Go(context.TODO(), r.slogger, func() {
r.slogger.Log(context.TODO(), slog.LevelInfo,
"received remote restart action for current launcher run ID -- signaling for restart shortly",
"action_run_id", restartAction.RunID,
Expand All @@ -100,7 +101,7 @@ func (r *RemoteRestartConsumer) Do(data io.Reader) error {
)
return
}
}()
}, func(err any) {})

return nil
}
Expand Down
5 changes: 3 additions & 2 deletions ee/debug/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/ee/consoleuser"
"github.com/kolide/launcher/ee/control"
"github.com/kolide/launcher/ee/gowrapper"
"github.com/kolide/launcher/pkg/launcher"
)

Expand Down Expand Up @@ -107,11 +108,11 @@ func (s *shipper) Write(p []byte) (n int, err error) {
// OTOH, if we started request in New() we would know sooner if we had a bad upload url ... :shrug:
s.uploadRequestStarted = true
s.uploadRequestWg.Add(1)
go func() {
gowrapper.Go(context.TODO(), s.knapsack.Slogger(), func() {
defer s.uploadRequestWg.Done()
// will close the body in the close function
s.uploadResponse, s.uploadRequestErr = http.DefaultClient.Do(s.uploadRequest) //nolint:bodyclose
}()
}, func(r any) {})

return s.writer.Write(p)
}
Expand Down
3 changes: 3 additions & 0 deletions ee/debug/shipper/shipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestShip(t *testing.T) { //nolint:paralleltest
k := typesMocks.NewKnapsack(t)
k.On("EnrollSecret").Return("")
k.On("EnrollSecretPath").Return("")
k.On("Slogger").Return(multislogger.NewNopLogger())
return k
},
assertion: assert.NoError,
Expand All @@ -47,6 +48,7 @@ func TestShip(t *testing.T) { //nolint:paralleltest
k := typesMocks.NewKnapsack(t)
k.On("EnrollSecret").Return("")
k.On("EnrollSecretPath").Return("")
k.On("Slogger").Return(multislogger.NewNopLogger())
return k
},
assertion: assert.NoError,
Expand All @@ -63,6 +65,7 @@ func TestShip(t *testing.T) { //nolint:paralleltest

k := typesMocks.NewKnapsack(t)
k.On("EnrollSecret").Return("enroll_secret_value")
k.On("Slogger").Return(multislogger.NewNopLogger())
return k
},
expectSignatureHeaders: true,
Expand Down
13 changes: 7 additions & 6 deletions ee/desktop/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/kolide/launcher/ee/desktop/user/client"
"github.com/kolide/launcher/ee/desktop/user/menu"
"github.com/kolide/launcher/ee/desktop/user/notify"
"github.com/kolide/launcher/ee/gowrapper"
"github.com/kolide/launcher/ee/presencedetection"
"github.com/kolide/launcher/ee/ui/assets"
"github.com/kolide/launcher/pkg/backoff"
Expand Down Expand Up @@ -184,14 +185,14 @@ func New(k types.Knapsack, messenger runnerserver.Messenger, opts ...desktopUser
}

runner.runnerServer = rs
go func() {
gowrapper.Go(context.TODO(), runner.slogger, func() {
if err := runner.runnerServer.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
runner.slogger.Log(context.TODO(), slog.LevelError,
"running monitor server",
"err", err,
)
}
}()
}, func(err any) {})

if runtime.GOOS == "darwin" {
runner.osVersion, err = osversion()
Expand Down Expand Up @@ -309,10 +310,10 @@ func (r *DesktopUsersProcessesRunner) DetectPresence(reason string, interval tim
// killDesktopProcesses kills any existing desktop processes
func (r *DesktopUsersProcessesRunner) killDesktopProcesses(ctx context.Context) {
wgDone := make(chan struct{})
go func() {
gowrapper.Go(context.TODO(), r.slogger, func() {
defer close(wgDone)
r.procsWg.Wait()
}()
}, func(err any) {})

shutdownRequestCount := 0
for uid, proc := range r.uidProcs {
Expand Down Expand Up @@ -794,7 +795,7 @@ func (r *DesktopUsersProcessesRunner) addProcessTrackingRecordForUser(uid string
// The wait group is needed to prevent races.
func (r *DesktopUsersProcessesRunner) waitOnProcessAsync(uid string, proc *os.Process) {
r.procsWg.Add(1)
go func(username string, proc *os.Process) {
gowrapper.Go(context.TODO(), r.slogger.With("uid", uid, "pid", proc.Pid), func() {
defer r.procsWg.Done()
// waiting here gives the parent a chance to clean up
state, err := proc.Wait()
Expand All @@ -807,7 +808,7 @@ func (r *DesktopUsersProcessesRunner) waitOnProcessAsync(uid string, proc *os.Pr
"state", state,
)
}
}(uid, proc)
}, func(err any) {})
}

// determineExecutablePath returns DesktopUsersProcessesRunner.executablePath if it is set,
Expand Down
6 changes: 4 additions & 2 deletions ee/desktop/user/menu/menu_systray.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package menu

import (
"context"
"sync"

"github.com/kolide/launcher/ee/gowrapper"
"github.com/kolide/systray"
)

Expand Down Expand Up @@ -120,7 +122,7 @@ func (m *menu) makeActionHandler(item *systray.MenuItem, ap ActionPerformer) {
done := make(chan struct{})
doneChans = append(doneChans, done)

go func() {
gowrapper.Go(context.TODO(), m.slogger, func() {
for {
select {
case <-item.ClickedCh:
Expand All @@ -131,5 +133,5 @@ func (m *menu) makeActionHandler(item *systray.MenuItem, ap ActionPerformer) {
return
}
}
}()
}, func(r any) {})
}
7 changes: 4 additions & 3 deletions ee/localserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/kolide/krypto/pkg/echelper"
"github.com/kolide/launcher/ee/agent"
"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/ee/gowrapper"
"github.com/kolide/launcher/ee/presencedetection"
"github.com/kolide/launcher/pkg/osquery"
"github.com/kolide/launcher/pkg/traces"
Expand Down Expand Up @@ -253,7 +254,7 @@ func (ls *localServer) Start() error {
var ctx context.Context
ctx, ls.cancel = context.WithCancel(context.Background())

go func() {
gowrapper.Go(ctx, ls.slogger, func() {
var lastRun time.Time

ticker := time.NewTicker(pollInterval)
Expand All @@ -278,14 +279,14 @@ func (ls *localServer) Start() error {
}
}
}
}()
}, func(r any) {})

l, err := ls.startListener()
if err != nil {
return fmt.Errorf("starting listener: %w", err)
}

if ls.tlsCerts != nil && len(ls.tlsCerts) > 0 {
if len(ls.tlsCerts) > 0 {
ls.slogger.Log(ctx, slog.LevelDebug,
"using TLS",
)
Expand Down
8 changes: 5 additions & 3 deletions pkg/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"

"github.com/google/uuid"
"github.com/kolide/launcher/ee/gowrapper"
)

const debugPrefix = "/debug/"
Expand All @@ -42,13 +43,14 @@ func startDebugServer(addrPath string, slogger *slog.Logger) (*http.Server, erro
return nil, fmt.Errorf("opening socket: %w", err)
}

go func() {
gowrapper.Go(context.TODO(), slogger, func() {
if err := serv.Serve(listener); err != nil && err != http.ErrServerClosed {
slogger.Log(context.TODO(), slog.LevelInfo,
"debug server failed", "err", err,
"debug server failed",
"err", err,
)
}
}()
}, func(r any) {})

url := url.URL{
Scheme: "http",
Expand Down
7 changes: 5 additions & 2 deletions pkg/debug/signal_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"os"
"os/signal"
"syscall"

"github.com/kolide/launcher/ee/gowrapper"
)

const debugSignal = syscall.SIGUSR1
Expand All @@ -18,7 +20,7 @@ const debugSignal = syscall.SIGUSR1
func AttachDebugHandler(addrPath string, slogger *slog.Logger) {
sig := make(chan os.Signal, 1)
signal.Notify(sig, debugSignal)
go func() {
gowrapper.Go(context.TODO(), slogger, func() {
for {
// Start server on first signal
<-sig
Expand All @@ -45,5 +47,6 @@ func AttachDebugHandler(addrPath string, slogger *slog.Logger) {
"shutdown debug server",
)
}
}()
}, func(r any) {})

}
6 changes: 4 additions & 2 deletions pkg/log/logshipper/logshipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/kolide/launcher/ee/agent/flags/keys"
"github.com/kolide/launcher/ee/agent/storage"
"github.com/kolide/launcher/ee/agent/types"
"github.com/kolide/launcher/ee/gowrapper"
"github.com/kolide/launcher/pkg/sendbuffer"
slogmulti "github.com/samber/slog-multi"
)
Expand Down Expand Up @@ -114,9 +115,10 @@ func (ls *LogShipper) Ping() {
}

ls.isShippingStarted = true
go func() {
gowrapper.Go(context.TODO(), ls.knapsack.Slogger(), func() {
ls.startShippingChan <- struct{}{}
}()
}, func(r any) {})

}

func (ls *LogShipper) Run() error {
Expand Down
1 change: 1 addition & 0 deletions pkg/log/logshipper/logshipper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestLogShipper(t *testing.T) {
knapsack.On("RegisterChangeObserver", mock.Anything, keys.LogShippingLevel, keys.LogIngestServerURL)
knapsack.On("LogShippingLevel").Return("info").Times(5)
knapsack.On("CurrentRunningOsqueryVersion").Return("5.12.3")
knapsack.On("Slogger").Return(multislogger.NewNopLogger())

tokenStore := testKVStore(t, storage.TokenStore.String())
knapsack.On("TokenStore").Return(tokenStore)
Expand Down
4 changes: 2 additions & 2 deletions pkg/rungroup/rungroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (g *Group) Run() error {
interruptWait := semaphore.NewWeighted(numActors)
for _, a := range g.actors {
interruptWait.Acquire(context.Background(), 1)
go func(a rungroupActor) {
gowrapper.Go(context.TODO(), g.slogger, func() {
defer interruptWait.Release(1)
g.slogger.Log(context.TODO(), slog.LevelDebug,
"interrupting actor",
Expand All @@ -124,7 +124,7 @@ func (g *Group) Run() error {
"interrupt complete",
"actor", a.name,
)
}(a)
}, func(r any) {})
}

interruptCtx, interruptCancel := context.WithTimeout(context.Background(), InterruptTimeout)
Expand Down
10 changes: 8 additions & 2 deletions pkg/threadas/threadas.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
package threadas

import (
"context"
"fmt"
"log/slog"
"runtime"
"syscall"
"time"

"github.com/kolide/launcher/ee/gowrapper"
)

const (
Expand Down Expand Up @@ -75,7 +79,7 @@ func ThreadAs(fn func() error, timeout time.Duration, uid uint32, gid uint32) er
// sequence starting the child and our listener.
errChan := make(chan error, 1)

go func() {
gowrapper.Go(context.TODO(), slog.Default(), func() {
// Calling LockOSThread, without a subsequent Unlock,
// will cause the thread to terminate when the
// goroutine does. This seems simpler than resetting
Expand All @@ -95,7 +99,9 @@ func ThreadAs(fn func() error, timeout time.Duration, uid uint32, gid uint32) er
}

errChan <- fn()
}()
}, func(r any) {
errChan <- fmt.Errorf("thread permissions handler panic: %v", r)
})

select {
case err := <-errChan:
Expand Down

0 comments on commit dbb618a

Please sign in to comment.