Skip to content

Commit

Permalink
Vc/catch abandoned events (#90)
Browse files Browse the repository at this point in the history
* add some more utility to the status module

* delete conditions in a final state if the db update succeeds

* refactor event update

* encapsulate one-time test setup

* addressed comments
  • Loading branch information
DoctorVin authored Sep 18, 2023
1 parent d6d0d14 commit 0ed62f6
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 72 deletions.
155 changes: 147 additions & 8 deletions internal/orchestrator/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@ import (
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"go.hollow.sh/toolbox/events/pkg/kv"
"go.hollow.sh/toolbox/events/registry"
)

var (
updOnce sync.Once
expectedDots = 1 // we expect keys for KV-based status updates to be facilityCode.conditionID
errKeyFormat = errors.New("malformed update key")
errConditionID = errors.New("bad condition uuid")
errInvalidState = errors.New("invalid condition state")
updOnce sync.Once
expectedDots = 1 // we expect keys for KV-based status updates to be facilityCode.conditionID
errKeyFormat = errors.New("malformed update key")
errConditionID = errors.New("bad condition uuid")
errInvalidState = errors.New("invalid condition state")
// staleEventThreshold is the minimum age of an event's last update before
// eventNeedsReconciliation will consider it; anything updated more recently
// is left alone.
staleEventThreshold = 30 * time.Minute
// reconcilerCadence is the period of the ticker that drives the reconciler
// loop in startReconciler.
reconcilerCadence = 10 * time.Minute
// failedByReconciler is the status payload getEventsToReconcile writes into an
// unfinished event when it marks that event Failed.
failedByReconciler = []byte(`{ "msg": "worker failed processing this event" }`)
)

func (o *Orchestrator) startUpdateMonitor(ctx context.Context) {
Expand Down Expand Up @@ -76,13 +80,16 @@ func (o *Orchestrator) kvStatusPublisher(ctx context.Context) {
"kind": string(evt.Kind),
})

if err := o.eventHandler.UpdateCondition(ctx, evt); err != nil {
le.WithError(err).Warn("updating condition")
if err := o.eventUpdate(ctx, evt); err != nil {
le.WithError(err).Warn("performing event update")
continue
}

if err := o.notifier.Send(evt); err != nil {
le.WithError(err).Warn("sending notification")
// notifications are advisory, so if we fail to notify we keep processing
}

}
}

Expand Down Expand Up @@ -117,6 +124,8 @@ func (o *Orchestrator) startConditionWatchers(ctx context.Context,
case <-ctx.Done():
o.logger.WithField("condition.kind", string(kind)).Info("stopping KV update listener")
keepRunning = false
//nolint:errcheck,gocritic
watcher.Stop()
case entry := <-watcher.Updates():
if entry == nil {
o.logger.WithField("condition.kind", string(kind)).Debug("nil KV update")
Expand Down Expand Up @@ -175,6 +184,7 @@ type controllerStatus struct {
Target string `json:"target"`
State string `json:"state"`
Status json.RawMessage `json:"status"`
WorkerID string `json:"worker"`
}

// eventUpdateFromKV converts the stored rivets.StatusValue (the value from the KV) to a
Expand Down Expand Up @@ -205,6 +215,11 @@ func eventUpdateFromKV(ctx context.Context, kve nats.KeyValueEntry,
return nil, errInvalidState
}

controllerID, err := registry.ControllerIDFromString(cs.WorkerID)
if err != nil {
return nil, errors.Wrap(err, "parsing worker id")
}

// extract traceID and spanID
traceID, _ := trace.TraceIDFromHex(cs.TraceID)
spanID, _ := trace.SpanIDFromHex(cs.SpanID)
Expand All @@ -231,8 +246,132 @@ func eventUpdateFromKV(ctx context.Context, kve nats.KeyValueEntry,
State: convState,
Status: cs.Status,
},
Kind: kind,
Kind: kind,
UpdatedAt: cs.UpdatedAt,
ControllerID: controllerID,
}

return updEvent, nil
}

// getEventsToReconcile walks the status KV entries of every configured condition
// definition (looked up by kind and o.facility) and returns the update events
// that eventNeedsReconciliation selects.
//
// A selected event that is not already in a complete state is rewritten in place
// to ptypes.Failed with the failedByReconciler status before it is returned.
// Lookup errors and entries that cannot be converted are logged and skipped;
// neither aborts the pass over the remaining kinds or entries.
//
// The returned slice is never nil.
func (o *Orchestrator) getEventsToReconcile(ctx context.Context) []*v1types.ConditionUpdateEvent {
	// collect all events across multiple condition definitions
	evts := []*v1types.ConditionUpdateEvent{}

	for _, def := range o.conditionDefs {
		kind := def.Kind

		entries, err := status.GetAllConditions(kind, o.facility)
		if err != nil {
			o.logger.WithError(err).WithField("condition.kind", string(kind)).
				Warn("reconciler error on condition lookup")
			// whatever came back alongside the error is not trustworthy, so
			// move on to the next kind instead of iterating over it
			continue
		}

		for _, kve := range entries {
			evt, err := eventUpdateFromKV(ctx, kve, kind)
			if err != nil {
				o.logger.WithError(err).WithFields(logrus.Fields{
					"condition.kind": string(kind),
					"kv.key":         kve.Key(),
				}).Warn("reconciler skipping malformed update")
				continue
			}

			if !o.eventNeedsReconciliation(evt) {
				continue
			}

			if !ptypes.ConditionStateIsComplete(evt.ConditionUpdate.State) {
				// we need to deal with this event, so mark it failed
				evt.ConditionUpdate.State = ptypes.Failed
				evt.ConditionUpdate.Status = failedByReconciler
			}

			evts = append(evts, evt)
		}
	}

	return evts
}

// eventUpdate hands evt to the event handler's UpdateCondition and, when that
// succeeds and the event's state is complete, removes the matching condition
// entry via status.DeleteCondition.
//
// It returns a wrapped error if either step fails. The delete is attempted only
// after a successful update, and never for events that are not complete.
func (o *Orchestrator) eventUpdate(ctx context.Context, evt *v1types.ConditionUpdateEvent) error {
	updateErr := o.eventHandler.UpdateCondition(ctx, evt)
	if updateErr != nil {
		return errors.Wrap(updateErr, "updating condition")
	}

	// events that are not complete keep their KV entry
	if !ptypes.ConditionStateIsComplete(evt.ConditionUpdate.State) {
		return nil
	}

	conditionID := evt.ConditionUpdate.ConditionID.String()

	deleteErr := status.DeleteCondition(evt.Kind, o.facility, conditionID)
	if deleteErr != nil {
		return errors.Wrap(deleteErr, "deleting event KV data")
	}

	return nil
}

// eventNeedsReconciliation reports whether the reconciler should act on evt.
//
// The decision is made in this order:
//   - an event updated less than staleEventThreshold ago is left alone, since it
//     might still be actively worked;
//   - a stale event already in a complete state needs handling;
//   - a stale, unfinished event needs handling only when the registry lookup for
//     its controller fails with nats.ErrKeyNotFound.
//
// Any other registry outcome is logged (Debug on success, Warn on error) and the
// event is not reconciled.
func (o *Orchestrator) eventNeedsReconciliation(evt *v1types.ConditionUpdateEvent) bool {
	if time.Since(evt.UpdatedAt) < staleEventThreshold {
		return false
	}

	if ptypes.ConditionStateIsComplete(evt.ConditionUpdate.State) {
		return true
	}

	entry := o.logger.WithFields(logrus.Fields{
		"conditionID":    evt.ConditionUpdate.ConditionID.String(),
		"conditionState": string(evt.ConditionUpdate.State),
		"kind":           string(evt.Kind),
		"controllerID":   evt.ControllerID.String(),
	})

	lastContact, lookupErr := registry.LastContact(evt.ControllerID)

	switch {
	case errors.Is(lookupErr, nats.ErrKeyNotFound):
		// the registry has no record for this controller
		return true

	case lookupErr != nil:
		entry.WithError(lookupErr).Warn("error checking registry")

	default:
		elapsed := time.Since(lastContact)
		entry.WithFields(logrus.Fields{
			"last.update":    evt.UpdatedAt.String(),
			"worker.checkin": elapsed.String(),
		}).Debug("long running event caught by reconciler")
	}

	return false
}

// startReconciler registers one unit of work on wg and launches a goroutine that,
// on every reconcilerCadence tick, gathers the events returned by
// getEventsToReconcile, applies each through eventUpdate, and then sends a
// notification for it.
//
// A failed update is logged and the notification for that event is skipped; a
// failed notification is only logged. The goroutine stops its ticker, marks wg
// done, and exits once ctx is cancelled.
func (o *Orchestrator) startReconciler(ctx context.Context, wg *sync.WaitGroup) {
	wg.Add(1)

	go func() {
		defer wg.Done()

		tick := time.NewTicker(reconcilerCadence)
		defer tick.Stop()

		for {
			select {
			case <-ctx.Done():
				o.logger.Info("reconciler signaled to exit")
				return

			case <-tick.C:
				for _, stale := range o.getEventsToReconcile(ctx) {
					entry := o.logger.WithFields(logrus.Fields{
						"conditionID":    stale.ConditionUpdate.ConditionID.String(),
						"conditionState": string(stale.ConditionUpdate.State),
						"kind":           string(stale.Kind),
					})

					if updErr := o.eventUpdate(ctx, stale); updErr != nil {
						entry.WithError(updErr).Warn("reconciler event update")
						continue
					}

					if sendErr := o.notifier.Send(stale); sendErr != nil {
						entry.WithError(sendErr).Warn("reconciler event notification")
					}
				}
			}
		}
	}()
}
Loading

0 comments on commit 0ed62f6

Please sign in to comment.