From 6df838c64b40eed18fd91dcde0fa06f85c74ef27 Mon Sep 17 00:00:00 2001 From: Rebecca Mahany-Horton Date: Tue, 16 Jul 2024 14:15:00 -0400 Subject: [PATCH] Add new action type force_full_control_data_fetch (#1779) --- cmd/launcher/launcher.go | 3 +++ ee/control/control.go | 31 ++++++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/cmd/launcher/launcher.go b/cmd/launcher/launcher.go index e9f0f7fa1..ca4a3bdfb 100644 --- a/cmd/launcher/launcher.go +++ b/cmd/launcher/launcher.go @@ -30,6 +30,7 @@ import ( "github.com/kolide/launcher/ee/agent/storage" agentbbolt "github.com/kolide/launcher/ee/agent/storage/bbolt" "github.com/kolide/launcher/ee/agent/timemachine" + "github.com/kolide/launcher/ee/control" "github.com/kolide/launcher/ee/control/actionqueue" "github.com/kolide/launcher/ee/control/consumers/acceleratecontrolconsumer" "github.com/kolide/launcher/ee/control/consumers/flareconsumer" @@ -449,6 +450,8 @@ func runLauncher(ctx context.Context, cancel func(), multiSlogger, systemMultiSl actionsQueue.RegisterActor(uninstallconsumer.UninstallSubsystem, uninstallconsumer.New(k)) // register flare consumer actionsQueue.RegisterActor(flareconsumer.FlareSubsystem, flareconsumer.New(k)) + // register force full control data fetch consumer + actionsQueue.RegisterActor(control.ForceFullControlDataFetchAction, controlService) // create notification consumer notificationConsumer, err := notificationconsumer.NewNotifyConsumer( diff --git a/ee/control/control.go b/ee/control/control.go index 8e2d34f0f..f4ea325a6 100644 --- a/ee/control/control.go +++ b/ee/control/control.go @@ -16,6 +16,8 @@ import ( "golang.org/x/exp/slices" ) +const ForceFullControlDataFetchAction = "force_full_control_data_fetch" + // ControlService is the main object that manages the control service. It is responsible for fetching // and caching control data, and updating consumers and subscribers. type ControlService struct { @@ -27,6 +29,8 @@ type ControlService struct { requestTicker *time.Ticker fetcher dataProvider fetchMutex sync.Mutex + fetchFull bool + fetchFullMutex sync.Mutex store types.GetterSetter lastFetched map[string]string consumers map[string]consumer @@ -276,6 +280,14 @@ func (cs *ControlService) Fetch() error { return fmt.Errorf("decoding subsystems map: %w", err) } + fetchFull := false + cs.fetchFullMutex.Lock() + if cs.fetchFull { + fetchFull = true // fetch all subsystems during this Fetch + cs.fetchFull = false // reset for next Fetch + } + cs.fetchFullMutex.Unlock() + for subsystem, hash := range subsystems { lastHash, ok := cs.lastFetched[subsystem] if !ok && cs.store != nil { @@ -286,7 +298,7 @@ func (cs *ControlService) Fetch() error { } } - if hash == lastHash && !cs.knapsack.ForceControlSubsystems() { + if hash == lastHash && !cs.knapsack.ForceControlSubsystems() && !fetchFull { // The last fetched update is still fresh // Nothing to do, skip to the next subsystem continue @@ -382,3 +394,20 @@ func (cs *ControlService) update(subsystem string, reader io.Reader) error { return nil } + +// Do handles the force_full_control_data_fetch action. +func (cs *ControlService) Do(data io.Reader) error { + cs.slogger.Log(context.TODO(), slog.LevelDebug, + "received request to perform full fetch of all subsystems", + ) + + // We receive this request in the middle of a `Fetch`, so we can't call + // `Fetch` again immediately. Instead, set `fetchFull` so that the next + // call to `Fetch` will fetch all subsystems. + cs.fetchFullMutex.Lock() + defer cs.fetchFullMutex.Unlock() + cs.fetchFull = true + + // Treat this action as best-effort: try to do it once, no need to retry. + return nil +}