Skip to content

Commit

Permalink
Add beholder metrics on workflow engine (#15238)
Browse files Browse the repository at this point in the history
* Adds metrics on workflow engine

* Adds trigger event metric

* Removes comment

* metrics: execution duration histograms by status and removing now redundant instrumentation

* adding step execution time histogram

* fixing data race for global instruments

* cleanup + fixing tests

* renaming vars somehow fixes broken test

* removing short circuit in workferForStepRequest if Vertex call fails

* nil guard if Vertex errs

* updating workflow.name to workflow.hexName and fixing err log

---------

Co-authored-by: patrickhuie19 <[email protected]>
  • Loading branch information
vyzaldysanchez and patrickhuie19 authored Nov 27, 2024
1 parent 7a8a079 commit 2902b91
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 67 deletions.
84 changes: 56 additions & 28 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,7 @@ func (e *Engine) Start(_ context.Context) error {
// create a new context, since the one passed in via Start is short-lived.
ctx, _ := e.stopCh.NewCtx()

// spin up monitoring resources
err := initMonitoringResources()
if err != nil {
return fmt.Errorf("could not initialize monitoring resources: %w", err)
}
e.metrics.incrementWorkflowInitializationCounter(ctx)

e.wg.Add(e.maxWorkerLimit)
for i := 0; i < e.maxWorkerLimit; i++ {
Expand Down Expand Up @@ -358,6 +354,7 @@ func (e *Engine) init(ctx context.Context) {

e.logger.Info("engine initialized")
logCustMsg(ctx, e.cma, "workflow registered", e.logger)
e.metrics.incrementWorkflowRegisteredCounter(ctx)
e.afterInit(true)
}

Expand Down Expand Up @@ -439,7 +436,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig
Metadata: capabilities.RequestMetadata{
WorkflowID: e.workflow.id,
WorkflowOwner: e.workflow.owner,
WorkflowName: e.workflow.name,
WorkflowName: e.workflow.hexName,
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
ReferenceID: t.Ref,
Expand Down Expand Up @@ -678,7 +675,6 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {

func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter, executionID string, status string) error {
l := e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status)
metrics := e.metrics.with("status", status)

l.Info("finishing execution")

Expand All @@ -692,18 +688,28 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter
return err
}

executionDuration := execState.FinishedAt.Sub(*execState.CreatedAt).Milliseconds()

e.stepUpdatesChMap.remove(executionID)
metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
metrics.updateWorkflowExecutionLatencyGauge(ctx, executionDuration)

executionDuration := int64(execState.FinishedAt.Sub(*execState.CreatedAt).Seconds())
switch status {
case store.StatusCompleted:
e.metrics.updateWorkflowCompletedDurationHistogram(ctx, executionDuration)
case store.StatusCompletedEarlyExit:
e.metrics.updateWorkflowEarlyExitDurationHistogram(ctx, executionDuration)
case store.StatusErrored:
e.metrics.updateWorkflowErrorDurationHistogram(ctx, executionDuration)
case store.StatusTimeout:
// should expect the same values unless the timeout is adjusted.
// using histogram as it gives count of executions for free
e.metrics.updateWorkflowTimeoutDurationHistogram(ctx, executionDuration)
}

if executionDuration > fifteenMinutesMs {
logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d", executionDuration), l)
l.Warnf("execution duration exceeded 15 minutes: %d", executionDuration)
logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration), l)
l.Warnf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration)
}
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d", executionDuration), l)
l.Infof("execution duration: %d", executionDuration)
logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l)
l.Infof("execution duration: %d (seconds)", executionDuration)
e.onExecutionFinished(executionID)
return nil
}
Expand Down Expand Up @@ -747,6 +753,7 @@ func (e *Engine) worker(ctx context.Context) {
if err != nil {
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
e.metrics.with(platform.KeyTriggerID, te.ID).incrementTriggerWorkflowStarterErrorCounter(ctx)
} else {
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Debug("execution started")
logCustMsg(ctx, cma, "execution started", e.logger)
Expand All @@ -770,10 +777,21 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
Ref: msg.stepRef,
}

// TODO ks-462 inputs
logCustMsg(ctx, cma, "executing step", l)

stepExecutionStartTime := time.Now()
inputs, outputs, err := e.executeStep(ctx, l, msg)
stepExecutionDuration := time.Since(stepExecutionStartTime).Seconds()

curStepID := "UNSET"
curStep, verr := e.workflow.Vertex(msg.stepRef)
if verr == nil {
curStepID = curStep.ID
} else {
l.Errorf("failed to resolve step in workflow; error %v", verr)
}
e.metrics.with(platform.KeyCapabilityID, curStepID).updateWorkflowStepDurationHistogram(ctx, int64(stepExecutionDuration))

var stepStatus string
switch {
case errors.Is(capabilities.ErrStopExecution, err):
Expand Down Expand Up @@ -850,7 +868,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name)
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.hexName)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down Expand Up @@ -894,16 +912,16 @@ func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *st

// executeStep executes the referenced capability within a step and returns the result.
func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRequest) (*values.Map, values.Value, error) {
step, err := e.workflow.Vertex(msg.stepRef)
curStep, err := e.workflow.Vertex(msg.stepRef)
if err != nil {
return nil, nil, err
}

var inputs any
if step.Inputs.OutputRef != "" {
inputs = step.Inputs.OutputRef
if curStep.Inputs.OutputRef != "" {
inputs = curStep.Inputs.OutputRef
} else {
inputs = step.Inputs.Mapping
inputs = curStep.Inputs.Mapping
}

i, err := exec.FindAndInterpolateAllKeys(inputs, msg.state)
Expand All @@ -916,7 +934,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
return nil, nil, err
}

config, err := e.configForStep(ctx, lggr, step)
config, err := e.configForStep(ctx, lggr, curStep)
if err != nil {
return nil, nil, err
}
Expand All @@ -942,7 +960,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
WorkflowID: msg.state.WorkflowID,
WorkflowExecutionID: msg.state.ExecutionID,
WorkflowOwner: e.workflow.owner,
WorkflowName: e.workflow.name,
WorkflowName: e.workflow.hexName,
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
ReferenceID: msg.stepRef,
Expand All @@ -952,9 +970,10 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
stepCtx, cancel := context.WithTimeout(ctx, stepTimeoutDuration)
defer cancel()

e.metrics.incrementCapabilityInvocationCounter(stepCtx)
output, err := step.capability.Execute(stepCtx, tr)
e.metrics.with(platform.KeyCapabilityID, curStep.ID).incrementCapabilityInvocationCounter(ctx)
output, err := curStep.capability.Execute(stepCtx, tr)
if err != nil {
e.metrics.with(platform.KeyStepRef, msg.stepRef, platform.KeyCapabilityID, curStep.ID).incrementCapabilityFailureCounter(ctx)
return inputsMap, nil, err
}

Expand All @@ -967,7 +986,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr
WorkflowID: e.workflow.id,
WorkflowDonID: e.localNode.WorkflowDON.ID,
WorkflowDonConfigVersion: e.localNode.WorkflowDON.ConfigVersion,
WorkflowName: e.workflow.name,
WorkflowName: e.workflow.hexName,
WorkflowOwner: e.workflow.owner,
ReferenceID: t.Ref,
},
Expand Down Expand Up @@ -1074,6 +1093,7 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf
return workflowProcessed, store.StatusCompleted, nil
}

// heartbeat runs by default every defaultHeartbeatCadence minutes
func (e *Engine) heartbeat(ctx context.Context) {
defer e.wg.Done()

Expand All @@ -1087,6 +1107,7 @@ func (e *Engine) heartbeat(ctx context.Context) {
return
case <-ticker.C:
e.metrics.incrementEngineHeartbeatCounter(ctx)
e.metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
}
}
Expand Down Expand Up @@ -1153,6 +1174,7 @@ func (e *Engine) Close() error {
return err
}
logCustMsg(ctx, e.cma, "workflow unregistered", e.logger)
e.metrics.incrementWorkflowUnregisteredCounter(ctx)
return nil
})
}
Expand Down Expand Up @@ -1249,6 +1271,12 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {
// - that the resulting graph is strongly connected (i.e. no disjointed subgraphs exist)
// - etc.

// spin up monitoring resources
em, err := initMonitoringResources()
if err != nil {
return nil, fmt.Errorf("could not initialize monitoring resources: %w", err)
}

cma := custmsg.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)
workflow, err := Parse(cfg.Workflow)
if err != nil {
Expand All @@ -1258,12 +1286,12 @@ func NewEngine(ctx context.Context, cfg Config) (engine *Engine, err error) {

workflow.id = cfg.WorkflowID
workflow.owner = cfg.WorkflowOwner
workflow.name = hex.EncodeToString([]byte(cfg.WorkflowName))
workflow.hexName = hex.EncodeToString([]byte(cfg.WorkflowName))

engine = &Engine{
cma: cma,
logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID),
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName)},
metrics: workflowsMetricLabeler{metrics.NewLabeler().With(platform.KeyWorkflowID, cfg.WorkflowID, platform.KeyWorkflowOwner, cfg.WorkflowOwner, platform.KeyWorkflowName, cfg.WorkflowName), *em},
registry: cfg.Registry,
workflow: workflow,
secretsFetcher: cfg.SecretsFetcher,
Expand Down
6 changes: 3 additions & 3 deletions core/services/workflows/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
// treated differently due to their nature of being the starting
// point of a workflow.
type workflow struct {
id string
owner string
name string
id string
owner string
hexName string
graph.Graph[string, *step]

triggers []*triggerCapability
Expand Down
Loading

0 comments on commit 2902b91

Please sign in to comment.