diff --git a/pkg/osquery/runtime/runner.go b/pkg/osquery/runtime/runner.go
index 6f0250794..d304e44a2 100644
--- a/pkg/osquery/runtime/runner.go
+++ b/pkg/osquery/runtime/runner.go
@@ -22,6 +22,7 @@ const (
 
 type Runner struct {
 	registrationIds []string                    // we expect to run one instance per registration ID
+	regIDLock       sync.Mutex                  // locks access to registrationIds
 	instances       map[string]*OsqueryInstance // maps registration ID to currently-running instance
 	instanceLock    sync.Mutex                  // locks access to `instances` to avoid e.g. restarting an instance that isn't running yet
 	slogger         *slog.Logger
@@ -54,9 +55,13 @@ func New(k types.Knapsack, serviceClient service.KolideService, opts ...OsqueryI
 
 func (r *Runner) Run() error {
 	for {
-		// if our instances ever exit unexpectedly, return immediately
-		if err := r.runRegisteredInstances(); err != nil {
-			return err
+		err := r.runRegisteredInstances()
+		if err != nil {
+			// log any errors but continue, in case we intend to reload
+			r.slogger.Log(context.TODO(), slog.LevelWarn,
+				"runRegisteredInstances terminated with error",
+				"err", err,
+			)
 		}
 
 		// if we're in a state that required re-running all registered instances,
@@ -66,8 +71,7 @@ func (r *Runner) Run() error {
 			continue
 		}
 
-		// otherwise, exit cleanly
-		return nil
+		return err
 	}
 }
 
@@ -82,7 +86,11 @@ func (r *Runner) runRegisteredInstances() error {
 	wg, ctx := errgroup.WithContext(context.Background())
 
 	// Start each worker for each instance
-	for _, registrationId := range r.registrationIds {
+	r.regIDLock.Lock()
+	regIDs := r.registrationIds
+	r.regIDLock.Unlock()
+
+	for _, registrationId := range regIDs {
 		id := registrationId
 		wg.Go(func() error {
 			if err := r.runInstance(id); err != nil {
@@ -327,7 +335,11 @@ func (r *Runner) Healthy() error {
 	defer r.instanceLock.Unlock()
 
 	healthcheckErrs := make([]error, 0)
-	for _, registrationId := range r.registrationIds {
+	r.regIDLock.Lock()
+	regIDs := r.registrationIds
+	r.regIDLock.Unlock()
+
+	for _, registrationId := range regIDs {
 		instance, ok := r.instances[registrationId]
 		if !ok {
 			healthcheckErrs = append(healthcheckErrs, fmt.Errorf("running instance does not exist for %s", registrationId))
@@ -350,8 +362,11 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus {
 	r.instanceLock.Lock()
 	defer r.instanceLock.Unlock()
 
+	r.regIDLock.Lock()
+	regIDs := r.registrationIds
+	r.regIDLock.Unlock()
 	instanceStatuses := make(map[string]types.InstanceStatus)
-	for _, registrationId := range r.registrationIds {
+	for _, registrationId := range regIDs {
 		instance, ok := r.instances[registrationId]
 		if !ok {
 			instanceStatuses[registrationId] = types.InstanceStatusNotStarted
@@ -373,7 +388,10 @@ func (r *Runner) InstanceStatuses() map[string]types.InstanceStatus {
 // and resets the runner instances for the new registrationIDs if required
 func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error {
 	slices.Sort(newRegistrationIDs)
+
+	r.regIDLock.Lock()
 	existingRegistrationIDs := r.registrationIds
+	r.regIDLock.Unlock()
 	slices.Sort(existingRegistrationIDs)
 
 	if slices.Equal(newRegistrationIDs, existingRegistrationIDs) {
@@ -391,7 +409,9 @@ func (r *Runner) UpdateRegistrationIDs(newRegistrationIDs []string) error {
 	)
 
 	// we know there are changes, safe to update the internal registrationIDs now
+	r.regIDLock.Lock()
 	r.registrationIds = newRegistrationIDs
+	r.regIDLock.Unlock()
 	// mark rerun as required so that we can safely shutdown all workers and have the changes
 	// picked back up from within the main Run function
 	r.rerunRequired.Store(true)
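
For context on the locking pattern this patch applies: every reader holds regIDLock only long enough to copy the slice header into a local variable, then iterates that snapshot outside the critical section. This is safe because the writer (UpdateRegistrationIDs) replaces the slice wholesale rather than mutating it in place, so a snapshot never observes a half-modified slice. Below is a minimal standalone sketch of that pattern; the names runner, snapshot, and update are hypothetical and are not from this patch.

	// Sketch of the snapshot-under-mutex pattern (assumed names, not launcher code).
	package main

	import (
		"fmt"
		"sync"
	)

	type runner struct {
		mu  sync.Mutex
		ids []string // guarded by mu; always replaced wholesale, never mutated in place
	}

	// snapshot copies the slice header under the lock; callers may iterate
	// the returned slice freely without holding mu.
	func (r *runner) snapshot() []string {
		r.mu.Lock()
		defer r.mu.Unlock()
		return r.ids
	}

	// update swaps in a new slice under the lock, mirroring how
	// UpdateRegistrationIDs assigns r.registrationIds = newRegistrationIDs.
	func (r *runner) update(newIDs []string) {
		r.mu.Lock()
		defer r.mu.Unlock()
		r.ids = newIDs
	}

	func main() {
		r := &runner{ids: []string{"default"}}

		var wg sync.WaitGroup
		wg.Add(2)

		go func() {
			defer wg.Done()
			// Iterate the snapshot; the lock is already released.
			for _, id := range r.snapshot() {
				fmt.Println("worker for", id)
			}
		}()

		go func() {
			defer wg.Done()
			// Concurrent wholesale replacement is safe under this scheme.
			r.update([]string{"default", "secondary"})
		}()

		wg.Wait()
	}

Note the trade-off in this sketch (and in the patch): a reader may iterate a snapshot that is stale by the time the loop finishes, which the runner tolerates because registration-ID changes are picked up on the next pass through Run via rerunRequired.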