Skip to content

Commit

Permalink
feat: add new result channel (#76)
Browse files Browse the repository at this point in the history
Add a new diff.EntityAction type and child types. This struct describes
an object and some action taken by the diff engine.

Add a new ResultChan diff.EntityAction field to the diff.Syncer type,
initialize it on Syncer Run(), and close it when Run() completes.

Disable console output when the result channel is enabled.

Together, these provide tooling for the Syncer to report its actions
downstream, rather than sending text descriptions of them direct to
stdout/stderr.
  • Loading branch information
rainest authored Apr 11, 2024
1 parent 3bf528e commit 54a82f6
Showing 1 changed file with 159 additions and 20 deletions.
179 changes: 159 additions & 20 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ import (
"github.com/kong/go-kong/kong"
)

// ------------------------------------------------------
// Old types used by the direct output diff engine
// ------------------------------------------------------

// TODO https://github.com/Kong/go-database-reconciler/issues/22 Body is an any type field. It is set here
// but apparently never used. It only ever contains the "old"/"new" map with the old and new object from
// the event above. We use the event directly in generateDiffString, so it's not clear what its intended
// purpose was. We should probably do a breaking change to either remove it or change it to a more
// structured type. The latter makes sense if we want downstream to be able to calculate its own diffs
// from structs for whatever reason, e.g. to print a partial diff rather than the complete diff string.

type EntityState struct {
Name string `json:"name"`
Kind string `json:"kind"`
Expand All @@ -45,6 +56,49 @@ type EntityChanges struct {
Deleting []EntityState `json:"deleting"`
}

// ------------------------------------------------------
// New types used by the no output diff engine
// ------------------------------------------------------

// ReconcileAction is an action taken by the diff engine.
type ReconcileAction string

const (
// CreateAction is the ReconcileAction used when a target object did not exist in the current state and was created.
CreateAction = ReconcileAction("create")
// UpdateAction is the ReconcileAction used when a target object exists in the current state and was updated.
UpdateAction = ReconcileAction("update")
// DeleteAction is the ReconcileAction used when a current object exists in the target state and was deleted.
DeleteAction = ReconcileAction("delete")

// eventBuffer is the number of events to buffer in the various syncer channels.
eventBuffer = 10
)

// Entity is an entity processed by the diff engine.
type Entity struct {
// Name is the name of the entity.
Name string `json:"name"`
// Kind is the type of entity.
Kind string `json:"kind"`
// Old is the original entity in the current state, if any.
Old any `json:"old,omitempty"`
// New is the new entity in the target state, if any.
New any `json:"new,omitempty"`
}

// EntityAction describes an entity processed by the diff engine and the action taken on it.
type EntityAction struct {
// Action is the ReconcileAction taken on the entity.
Action ReconcileAction `json:"action"`
// Entity holds the processed entity.
Entity Entity `json:"entity"`
// Diff is diff string describing the modifications made to an entity.
Diff string `json:"-"`
// Error is the error encountered processing and entity, if any.
Error error `json:"error,omitempty"`
}

var errEnqueueFailed = errors.New("failed to queue event")

func defaultBackOff() backoff.BackOff {
Expand All @@ -68,9 +122,10 @@ type Syncer struct {
processor crud.Registry
postProcessor crud.Registry

eventChan chan crud.Event
errChan chan error
stopChan chan struct{}
eventChan chan crud.Event
errChan chan error
stopChan chan struct{}
resultChan chan EntityAction

inFlightOps int32

Expand All @@ -90,6 +145,10 @@ type Syncer struct {
includeLicenses bool

isKonnect bool

// enableEntityActions enables entity actions and disables direct output prints. If set to true, clients must
// consume the Syncer.resultChan channel or Syncer.Solve() will block.
enableEntityActions bool
}

type SyncerOpts struct {
Expand All @@ -110,6 +169,10 @@ type SyncerOpts struct {
CreatePrintln func(a ...interface{})
UpdatePrintln func(a ...interface{})
DeletePrintln func(a ...interface{})

// EnableEntityActions instructs the Syncer to send EntityActions to its resultChan. If enabled, clients must
// consume the Syncer.resultChan channel or Syncer.Solve() will block.
EnableEntityActions bool
}

// NewSyncer constructs a Syncer.
Expand All @@ -131,6 +194,8 @@ func NewSyncer(opts SyncerOpts) (*Syncer, error) {
deletePrintln: opts.DeletePrintln,
includeLicenses: opts.IncludeLicenses,
isKonnect: opts.IsKonnect,

enableEntityActions: opts.EnableEntityActions,
}

if opts.IsKonnect {
Expand All @@ -151,10 +216,16 @@ func NewSyncer(opts SyncerOpts) (*Syncer, error) {
if err != nil {
return nil, err
}
s.resultChan = make(chan EntityAction, eventBuffer)

return s, nil
}

// GetResultChan returns the Syncer's result channel.
func (sc *Syncer) GetResultChan() chan EntityAction {
return sc.resultChan
}

func (sc *Syncer) init() error {
opts := types.EntityOpts{
CurrentState: sc.currentState,
Expand Down Expand Up @@ -337,14 +408,13 @@ func (sc *Syncer) wait() {
}
}

// Run starts a diff and invokes d for every diff.
func (sc *Syncer) Run(ctx context.Context, parallelism int, d Do) []error {
// Run starts a diff and invokes action for every diff.
func (sc *Syncer) Run(ctx context.Context, parallelism int, action Do) []error {
if parallelism < 1 {
return append([]error{}, fmt.Errorf("parallelism can not be negative"))
}

var wg sync.WaitGroup
const eventBuffer = 10

sc.eventChan = make(chan crud.Event, eventBuffer)
sc.stopChan = make(chan struct{})
Expand All @@ -355,7 +425,7 @@ func (sc *Syncer) Run(ctx context.Context, parallelism int, d Do) []error {
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
go func() {
err := sc.eventLoop(ctx, d)
err := sc.eventLoop(ctx, action)
if err != nil {
sc.errChan <- err
}
Expand All @@ -374,10 +444,11 @@ func (sc *Syncer) Run(ctx context.Context, parallelism int, d Do) []error {
wg.Done()
}()

// close the error chan once all done
// close the error and result chan once all done
go func() {
wg.Wait()
close(sc.errChan)
close(sc.resultChan)
}()

var errs []error
Expand Down Expand Up @@ -494,6 +565,9 @@ func generateDiffString(e crud.Event, isDelete bool, noMaskValues bool) (string,
func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOut bool) (Stats,
[]error, EntityChanges,
) {
// TODO https://github.com/Kong/go-database-reconciler/issues/22
// this can probably be extracted to clients (only deck uses it) by having clients count events through the result
// channel, rather than returning them from Solve.
stats := Stats{
CreateOps: &utils.AtomicInt32Counter{},
UpdateOps: &utils.AtomicInt32Counter{},
Expand All @@ -516,6 +590,9 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu
Deleting: []EntityState{},
}

// The length makes it confusing to read, but the code below _isn't being run here_, it's an anon func
// arg to Run(), which parallelizes it. However, because it's defined in Solve()'s scope, the output created above
// is available in aggregate and contains most of the content we need already.
errs := sc.Run(ctx, parallelism, func(e crud.Event) (crud.Arg, error) {
var err error
var result crud.Arg
Expand All @@ -526,32 +603,73 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu
"new": e.Obj,
}
item := EntityState{
// TODO https://github.com/Kong/go-database-reconciler/issues/22 this is the current (only) place Body is
// set in an EntityState.
Body: objDiff,
Name: c.Console(),
Kind: string(e.Kind),
}

actionResult := EntityAction{
Entity: Entity{
Name: c.Console(),
Kind: string(e.Kind),
Old: e.OldObj,
New: e.Obj,
},
}

switch e.Op {
case crud.Create:
if isJSONOut {
output.Creating = append(output.Creating, item)
// TODO https://github.com/Kong/go-database-reconciler/issues/22 this currently supports either the entity
// actions channel or direct console outputs to allow a phased transition to the channel only. Existing console
// prints and JSON blob building will be moved to the deck client.
if sc.enableEntityActions {
actionResult.Action = CreateAction
} else {
sc.createPrintln("creating", e.Kind, c.Console())
if isJSONOut {
output.Creating = append(output.Creating, item)
} else {
sc.createPrintln("creating", e.Kind, c.Console())
}
}
case crud.Update:
diffString, err := generateDiffString(e, false, sc.noMaskValues)
if err != nil {
return nil, err
}
if isJSONOut {
output.Updating = append(output.Updating, item)
// TODO https://github.com/Kong/go-database-reconciler/issues/22 this currently supports either the entity
// actions channel or direct console outputs to allow a phased transition to the channel only. Existing console
// prints and JSON blob building will be moved to the deck client.
if sc.enableEntityActions {
actionResult.Action = UpdateAction
if err != nil {
actionResult.Error = err
select {
case sc.resultChan <- actionResult:
case <-ctx.Done():
}
return nil, err
}
} else {
sc.updatePrintln("updating", e.Kind, c.Console(), diffString)
if err != nil {
return nil, err
}
if isJSONOut {
output.Updating = append(output.Updating, item)
} else {
sc.updatePrintln("updating", e.Kind, c.Console(), diffString)
}
}
case crud.Delete:
if isJSONOut {
output.Deleting = append(output.Deleting, item)
// TODO https://github.com/Kong/go-database-reconciler/issues/22 this currently supports either the entity
// actions channel or direct console outputs to allow a phased transition to the channel only. Existing console
// prints and JSON blob building will be moved to the deck client.
if sc.enableEntityActions {
actionResult.Action = DeleteAction
} else {
sc.deletePrintln("deleting", e.Kind, c.Console())
if isJSONOut {
output.Deleting = append(output.Deleting, item)
} else {
sc.deletePrintln("deleting", e.Kind, c.Console())
}
}
default:
panic("unknown operation " + e.Op.String())
Expand All @@ -562,6 +680,17 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu
// fire the request to Kong
result, err = sc.processor.Do(ctx, e.Kind, e.Op, e)
if err != nil {
// TODO https://github.com/Kong/go-database-reconciler/issues/22 this does not print, but is switched on
// sc.enableEntityActions because the existing behavior returns a result from the anon Run function.
// Refactoring should use only the channel and simplify the return, probably to just an error (all the other
// data will have been sent through the result channel).
if sc.enableEntityActions {
actionResult.Error = err
select {
case sc.resultChan <- actionResult:
case <-ctx.Done():
}
}
return nil, &crud.ActionError{
OperationType: e.Op,
Kind: e.Kind,
Expand All @@ -574,6 +703,16 @@ func (sc *Syncer) Solve(ctx context.Context, parallelism int, dry bool, isJSONOu
// return the new obj as is but with timestamps zeroed out
utils.ZeroOutTimestamps(e.Obj)
utils.ZeroOutTimestamps(e.OldObj)
// TODO https://github.com/Kong/go-database-reconciler/issues/22 this does not print, but is switched on
// sc.enableEntityActions because the existing behavior returns a result from the anon Run function.
// Refactoring should use only the channel and simplify the return, probably to just an error (all the other
// data will have been sent through the result channel).
if sc.enableEntityActions {
select {
case sc.resultChan <- actionResult:
case <-ctx.Done():
}
}
result = e.Obj
}
// record operation in both: diff and sync commands
Expand Down

0 comments on commit 54a82f6

Please sign in to comment.