From 54a82f63783faff8c5f58a3be44a23601d7f235c Mon Sep 17 00:00:00 2001 From: Travis Raines <571832+rainest@users.noreply.github.com> Date: Thu, 11 Apr 2024 00:57:49 -0700 Subject: [PATCH] feat: add new result channel (#76) Add a new diff.EntityAction type and child types. This struct describes an object and some action taken by the diff engine. Add a new ResultChan diff.EntityAction field to the diff.Syncer type, initialize it on Syncer Run(), and close it when Run() completes. Disable console output when the result channel is enabled. Together, these provide tooling for the Syncer to report its actions downstream, rather than sending text descriptions of them direct to stdout/stderr. --- pkg/diff/diff.go | 179 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 159 insertions(+), 20 deletions(-) diff --git a/pkg/diff/diff.go b/pkg/diff/diff.go index 8730e82..e54d5cd 100644 --- a/pkg/diff/diff.go +++ b/pkg/diff/diff.go @@ -19,6 +19,17 @@ import ( "github.com/kong/go-kong/kong" ) +// ------------------------------------------------------ +// Old types used by the direct output diff engine +// ------------------------------------------------------ + +// TODO https://github.com/Kong/go-database-reconciler/issues/22 Body is an any type field. It is set here +// but apparently never used. It only ever contains the "old"/"new" map with the old and new object from +// the event above. We use the event directly in generateDiffString, so it's not clear what its intended +// purpose was. We should probably do a breaking change to either remove it or change it to a more +// structured type. The latter makes sense if we want downstream to be able to calculate its own diffs +// from structs for whatever reason, e.g. to print a partial diff rather than the complete diff string. + type EntityState struct { Name string `json:"name"` Kind string `json:"kind"` @@ -45,6 +56,49 @@ type EntityChanges struct { Deleting []EntityState `json:"deleting"` } +// ------------------------------------------------------ +// New types used by the no output diff engine +// ------------------------------------------------------ + +// ReconcileAction is an action taken by the diff engine. +type ReconcileAction string + +const ( + // CreateAction is the ReconcileAction used when a target object did not exist in the current state and was created. + CreateAction = ReconcileAction("create") + // UpdateAction is the ReconcileAction used when a target object exists in the current state and was updated. + UpdateAction = ReconcileAction("update") + // DeleteAction is the ReconcileAction used when a current object exists in the target state and was deleted. + DeleteAction = ReconcileAction("delete") + + // eventBuffer is the number of events to buffer in the various syncer channels. + eventBuffer = 10 +) + +// Entity is an entity processed by the diff engine. +type Entity struct { + // Name is the name of the entity. + Name string `json:"name"` + // Kind is the type of entity. + Kind string `json:"kind"` + // Old is the original entity in the current state, if any. + Old any `json:"old,omitempty"` + // New is the new entity in the target state, if any. + New any `json:"new,omitempty"` +} + +// EntityAction describes an entity processed by the diff engine and the action taken on it. +type EntityAction struct { + // Action is the ReconcileAction taken on the entity. + Action ReconcileAction `json:"action"` + // Entity holds the processed entity. + Entity Entity `json:"entity"` + // Diff is diff string describing the modifications made to an entity. + Diff string `json:"-"` + // Error is the error encountered processing and entity, if any. + Error error `json:"error,omitempty"` +} + var errEnqueueFailed = errors.New("failed to queue event") func defaultBackOff() backoff.BackOff { @@ -68,9 +122,10 @@ type Syncer struct { processor crud.Registry postProcessor crud.Registry - eventChan chan crud.Event - errChan chan error - stopChan chan struct{} + eventChan chan crud.Event + errChan chan error + stopChan chan struct{} + resultChan chan EntityAction inFlightOps int32 @@ -90,6 +145,10 @@ type Syncer struct { includeLicenses bool isKonnect bool + + // enableEntityActions enables entity actions and disables direct output prints. If set to true, clients must + // consume the Syncer.resultChan channel or Syncer.Solve() will block. + enableEntityActions bool } type SyncerOpts struct { @@ -110,6 +169,10 @@ type SyncerOpts struct { CreatePrintln func(a ...interface{}) UpdatePrintln func(a ...interface{}) DeletePrintln func(a ...interface{}) + + // EnableEntityActions instructs the Syncer to send EntityActions to its resultChan. If enabled, clients must + // consume the Syncer.resultChan channel or Syncer.Solve() will block. + EnableEntityActions bool } // NewSyncer constructs a Syncer. @@ -131,6 +194,8 @@ func NewSyncer(opts SyncerOpts) (*Syncer, error) { deletePrintln: opts.DeletePrintln, includeLicenses: opts.IncludeLicenses, isKonnect: opts.IsKonnect, + + enableEntityActions: opts.EnableEntityActions, } if opts.IsKonnect { @@ -151,10 +216,16 @@ func NewSyncer(opts SyncerOpts) (*Syncer, error) { if err != nil { return nil, err } + s.resultChan = make(chan EntityAction, eventBuffer) return s, nil } +// GetResultChan returns the Syncer's result channel. +func (sc *Syncer) GetResultChan() chan EntityAction { + return sc.resultChan +} + func (sc *Syncer) init() error { opts := types.EntityOpts{ CurrentState: sc.currentState, @@ -337,14 +408,13 @@ func (sc *Syncer) wait() { } } -// Run starts a diff and invokes d for every diff. -func (sc *Syncer) Run(ctx context.Context, parallelism int, d Do) []error { +// Run starts a diff and invokes action for every diff. +func (sc *Syncer) Run(ctx context.Context, parallelism int, action Do) []error { if parallelism < 1 { return append([]error{}, fmt.Errorf("parallelism can not be negative")) } var wg sync.WaitGroup - const eventBuffer = 10 sc.eventChan = make(chan crud.Event, eventBuffer) sc.stopChan = make(chan struct{}) @@ -355,7 +425,7 @@ func (sc *Syncer) Run(ctx context.Context, parallelism int, d Do) []error { wg.Add(parallelism) for i := 0; i < parallelism; i++ { go func() { - err := sc.eventLoop(ctx, d) + err := sc.eventLoop(ctx, action) if err != nil { sc.errChan <- err } @@ -374,10 +444,11 @@ func (sc *Syncer) Run(ctx context.Context, parallelism int, d Do) []error { wg.Done() }() - // close the error chan once all done + // close the error and result chan once all done go func() { wg.Wait() close(sc.errChan) + close(sc.resultChan) }() var errs []error @@ -494,6 +565,9 @@ func generateDiffString(e crud.Event, isDelete bool, noMaskValues bool) (string, func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOut bool) (Stats, []error, EntityChanges, ) { + // TODO https://github.com/Kong/go-database-reconciler/issues/22 + // this can probably be extracted to clients (only deck uses it) by having clients count events through the result + // channel, rather than returning them from Solve. stats := Stats{ CreateOps: &utils.AtomicInt32Counter{}, UpdateOps: &utils.AtomicInt32Counter{}, @@ -516,6 +590,9 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu Deleting: []EntityState{}, } + // The length makes it confusing to read, but the code below _isn't being run here_, it's an anon func + // arg to Run(), which parallelizes it. However, because it's defined in Solve()'s scope, the output created above + // is available in aggregate and contains most of the content we need already. errs := sc.Run(ctx, parallelism, func(e crud.Event) (crud.Arg, error) { var err error var result crud.Arg @@ -526,32 +603,73 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu "new": e.Obj, } item := EntityState{ + // TODO https://github.com/Kong/go-database-reconciler/issues/22 this is the current (only) place Body is + // set in an EntityState. Body: objDiff, Name: c.Console(), Kind: string(e.Kind), } + + actionResult := EntityAction{ + Entity: Entity{ + Name: c.Console(), + Kind: string(e.Kind), + Old: e.OldObj, + New: e.Obj, + }, + } + switch e.Op { case crud.Create: - if isJSONOut { - output.Creating = append(output.Creating, item) + // TODO https://github.com/Kong/go-database-reconciler/issues/22 this currently supports either the entity + // actions channel or direct console outputs to allow a phased transition to the channel only. Existing console + // prints and JSON blob building will be moved to the deck client. + if sc.enableEntityActions { + actionResult.Action = CreateAction } else { - sc.createPrintln("creating", e.Kind, c.Console()) + if isJSONOut { + output.Creating = append(output.Creating, item) + } else { + sc.createPrintln("creating", e.Kind, c.Console()) + } } case crud.Update: diffString, err := generateDiffString(e, false, sc.noMaskValues) - if err != nil { - return nil, err - } - if isJSONOut { - output.Updating = append(output.Updating, item) + // TODO https://github.com/Kong/go-database-reconciler/issues/22 this currently supports either the entity + // actions channel or direct console outputs to allow a phased transition to the channel only. Existing console + // prints and JSON blob building will be moved to the deck client. + if sc.enableEntityActions { + actionResult.Action = UpdateAction + if err != nil { + actionResult.Error = err + select { + case sc.resultChan <- actionResult: + case <-ctx.Done(): + } + return nil, err + } } else { - sc.updatePrintln("updating", e.Kind, c.Console(), diffString) + if err != nil { + return nil, err + } + if isJSONOut { + output.Updating = append(output.Updating, item) + } else { + sc.updatePrintln("updating", e.Kind, c.Console(), diffString) + } } case crud.Delete: - if isJSONOut { - output.Deleting = append(output.Deleting, item) + // TODO https://github.com/Kong/go-database-reconciler/issues/22 this currently supports either the entity + // actions channel or direct console outputs to allow a phased transition to the channel only. Existing console + // prints and JSON blob building will be moved to the deck client. + if sc.enableEntityActions { + actionResult.Action = DeleteAction } else { - sc.deletePrintln("deleting", e.Kind, c.Console()) + if isJSONOut { + output.Deleting = append(output.Deleting, item) + } else { + sc.deletePrintln("deleting", e.Kind, c.Console()) + } } default: panic("unknown operation " + e.Op.String()) @@ -562,6 +680,17 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu // fire the request to Kong result, err = sc.processor.Do(ctx, e.Kind, e.Op, e) if err != nil { + // TODO https://github.com/Kong/go-database-reconciler/issues/22 this does not print, but is switched on + // sc.enableEntityActions because the existing behavior returns a result from the anon Run function. + // Refactoring should use only the channel and simplify the return, probably to just an error (all the other + // data will have been sent through the result channel). + if sc.enableEntityActions { + actionResult.Error = err + select { + case sc.resultChan <- actionResult: + case <-ctx.Done(): + } + } return nil, &crud.ActionError{ OperationType: e.Op, Kind: e.Kind, @@ -574,6 +703,16 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu // return the new obj as is but with timestamps zeroed out utils.ZeroOutTimestamps(e.Obj) utils.ZeroOutTimestamps(e.OldObj) + // TODO https://github.com/Kong/go-database-reconciler/issues/22 this does not print, but is switched on + // sc.enableEntityActions because the existing behavior returns a result from the anon Run function. + // Refactoring should use only the channel and simplify the return, probably to just an error (all the other + // data will have been sent through the result channel). + if sc.enableEntityActions { + select { + case sc.resultChan <- actionResult: + case <-ctx.Done(): + } + } result = e.Obj } // record operation in both: diff and sync commands