Skip to content

Commit

Permalink
restrict update KV subscription by the deployment facility
Browse files Browse the repository at this point in the history
  • Loading branch information
DoctorVin authored and joelrebel committed Aug 25, 2023
1 parent f6248b6 commit 883c306
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 6 deletions.
11 changes: 10 additions & 1 deletion cmd/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

var (
replicaCount int
facility string
)

// install orchestrator command
Expand Down Expand Up @@ -68,6 +69,7 @@ var cmdOrchestrator = &cobra.Command{
orchestrator.WithStore(repository),
orchestrator.WithStreamBroker(streamBroker),
orchestrator.WithNotifier(notifier),
orchestrator.WithFacility(facility),
}

app.Logger.Info("configuring status KV support")
Expand All @@ -80,8 +82,15 @@ var cmdOrchestrator = &cobra.Command{

// install command flags
func init() {
cmdOrchestrator.PersistentFlags().IntVarP(&replicaCount, "replica-count", "r", 3,
pflags := cmdOrchestrator.PersistentFlags()
pflags.IntVarP(&replicaCount, "replica-count", "r", 3,
"the number of replicas to configure for the NATS status KV store")

pflags.StringVarP(&facility, "facility", "f", "", "a site-specific token to focus this orchestrator's activities")

if err := cmdOrchestrator.MarkPersistentFlagRequired("facility"); err != nil {
log.Fatal("marking facility as required:", err)
}

rootCmd.AddCommand(cmdOrchestrator)
}
8 changes: 8 additions & 0 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Orchestrator struct {
eventHandler *v1EventHandlers.Handler
replicaCount int
notifier notify.Sender
facility string
}

// Option type sets a parameter on the Orchestrator type.
Expand Down Expand Up @@ -92,6 +93,13 @@ func WithNotifier(s notify.Sender) Option {
}
}

// WithFacility sets a site-specific descriptor to focus the orchestrator's work.
func WithFacility(f string) Option {
return func(o *Orchestrator) {
o.facility = f
}
}

// New returns a new orchestrator service with the given options set.
func New(opts ...Option) *Orchestrator {
o := &Orchestrator{concurrency: concurrency, syncWG: &sync.WaitGroup{}}
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (o *Orchestrator) startUpdateListener(ctx context.Context) {

func (o *Orchestrator) statusKVListener(ctx context.Context) {
// start the watchers and return the associated channels
installWatcher, err := status.WatchFirmwareInstallStatus(ctx)
installWatcher, err := status.WatchFirmwareInstallStatus(ctx, o.facility)
if err != nil {
o.logger.WithError(err).Fatal("unable to watch install status KV")
}
Expand Down
9 changes: 5 additions & 4 deletions internal/status/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package status

import (
"context"
"fmt"
"time"

ptypes "github.com/metal-toolbox/conditionorc/pkg/types"
Expand Down Expand Up @@ -48,8 +49,8 @@ func ConnectToKVStores(s events.Stream, log *logrus.Logger, opts ...kv.Option) {

// WatchFirmwareInstallStatus specializes some generic NATS functionality, mainly to keep
// the callers cleaner of the NATS-specific details.
func WatchFirmwareInstallStatus(ctx context.Context) (nats.KeyWatcher, error) {
// we can restrict the keys we watch (e.g. by facility code) here by using
// the KV Watch function instead.
return firmwareInstallKV.WatchAll(nats.Context(ctx))
func WatchFirmwareInstallStatus(ctx context.Context, facility string) (nats.KeyWatcher, error) {
// format the facility as a NATS subject to use as a filter for relevant KVs
keyStr := fmt.Sprintf("%s.*", facility)
return firmwareInstallKV.Watch(keyStr, nats.Context(ctx))
}

0 comments on commit 883c306

Please sign in to comment.