Commit 02e3c00

metrics: execution duration histograms by status and removing now redundant instrumentation

patrickhuie19 committed Nov 21, 2024
1 parent 59bc051 commit 02e3c00
Showing 2 changed files with 84 additions and 53 deletions.
32 changes: 20 additions & 12 deletions core/services/workflows/engine.go
@@ -675,7 +675,6 @@ func (e *Engine) queueIfReady(state store.WorkflowExecution, step *step) {
 
 func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter, executionID string, status string) error {
 	l := e.logger.With(platform.KeyWorkflowExecutionID, executionID, "status", status)
-	metrics := e.metrics.with("status", status)
 
 	l.Info("finishing execution")
 
@@ -689,19 +688,28 @@ func (e *Engine) finishExecution(ctx context.Context, cma custmsg.MessageEmitter
 		return err
 	}
 
-	executionDuration := execState.FinishedAt.Sub(*execState.CreatedAt).Milliseconds()
-
 	e.stepUpdatesChMap.remove(executionID)
-	metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
-	metrics.updateWorkflowExecutionLatencyGauge(ctx, executionDuration)
-	metrics.incrementWorkflowExecutionFinishedCounter(ctx)
+
+	executionDuration := int64(execState.FinishedAt.Sub(*execState.CreatedAt).Seconds())
+	switch status {
+	case store.StatusCompleted:
+		e.metrics.updateWorkflowCompletedDurationHistogram(ctx, executionDuration)
+	case store.StatusCompletedEarlyExit:
+		e.metrics.updateWorkflowEarlyExitDurationHistogram(ctx, executionDuration)
+	case store.StatusErrored:
+		e.metrics.updateWorkflowErrorDurationHistogram(ctx, executionDuration)
+	case store.StatusTimeout:
+		// should expect the same values unless the timeout is adjusted.
+		// using histogram as it gives count of executions for free
+		e.metrics.updateWorkflowTimeoutDurationHistogram(ctx, executionDuration)
+	}
 
 	if executionDuration > fifteenMinutesMs {
-		logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d", executionDuration), l)
-		l.Warnf("execution duration exceeded 15 minutes: %d", executionDuration)
+		logCustMsg(ctx, cma, fmt.Sprintf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration), l)
+		l.Warnf("execution duration exceeded 15 minutes: %d (seconds)", executionDuration)
 	}
-	logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d", executionDuration), l)
-	l.Infof("execution duration: %d", executionDuration)
+	logCustMsg(ctx, cma, fmt.Sprintf("execution duration: %d (seconds)", executionDuration), l)
+	l.Infof("execution duration: %d (seconds)", executionDuration)
 	e.onExecutionFinished(executionID)
 	return nil
 }
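[Editor's sketch] A minimal, self-contained sketch of the duration computation above. One thing worth flagging: executionDuration is now measured in seconds, while the guard still compares it against a constant named fifteenMinutesMs. If that constant does hold milliseconds (its definition is outside this diff), a seconds value would only trip the warning roughly 1000x too late, so a unit-consistent comparison needs one conversion, as shown here. The constant's value is an assumption for illustration.

package main

import (
	"fmt"
	"time"
)

const fifteenMinutesMs = 15 * 60 * 1000 // assumed definition; the real constant is not shown in this diff

func main() {
	createdAt := time.Now().Add(-42 * time.Second)
	finishedAt := time.Now()

	// Same computation as the new code: truncate the elapsed time to whole seconds.
	executionDuration := int64(finishedAt.Sub(createdAt).Seconds())

	// Unit-consistent guard: convert the millisecond constant to seconds once.
	if executionDuration > fifteenMinutesMs/1000 {
		fmt.Printf("execution duration exceeded 15 minutes: %d (seconds)\n", executionDuration)
	}
}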
@@ -771,7 +779,6 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
 
 	// TODO ks-462 inputs
 	logCustMsg(ctx, cma, "executing step", l)
-	e.metrics.with(platform.KeyStepRef, msg.stepRef).incrementWorkflowStepStartedCounter(ctx)
 
 	stepCtx, cancel := context.WithTimeout(ctx, e.stepTimeoutDuration)
 	defer cancel()
@@ -796,7 +803,6 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
 		logCustMsg(ctx, cma, lmsg, l)
 		stepStatus = store.StatusCompleted
 	}
-	e.metrics.with(platform.KeyStepRef, msg.stepRef, "status", stepStatus).incrementWorkflowStepFinishedCounter(ctx)
 
 	stepState.Status = stepStatus
 	stepState.Outputs.Value = outputs
@@ -1062,6 +1068,7 @@ func (e *Engine) isWorkflowFullyProcessed(ctx context.Context, state store.Workf
 	return workflowProcessed, store.StatusCompleted, nil
 }
 
+// heartbeat runs by default every defaultHeartbeatCadence minutes
 func (e *Engine) heartbeat(ctx context.Context) {
 	defer e.wg.Done()
 
@@ -1075,6 +1082,7 @@ func (e *Engine) heartbeat(ctx context.Context) {
 			return
 		case <-ticker.C:
 			e.metrics.incrementEngineHeartbeatCounter(ctx)
+			e.metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
 			logCustMsg(ctx, e.cma, "engine heartbeat at: "+e.clock.Now().Format(time.RFC3339), e.logger)
 		}
 	}
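[Editor's sketch] The hunk above moves the running-workflows gauge refresh out of finishExecution and into the periodic heartbeat tick. For context, a simplified sketch of that loop's shape; the cadence value is an assumption (the diff's comment only names defaultHeartbeatCadence), and tick() stands in for the metric updates the engine performs.

package sketch

import (
	"context"
	"time"
)

const defaultHeartbeatCadence = 5 * time.Minute // assumed value; the real constant lives in engine.go

// heartbeatSketch mirrors the loop above: run until the context is canceled,
// and on each tick perform the periodic work (heartbeat counter increment,
// running-workflows gauge refresh, heartbeat log line).
func heartbeatSketch(ctx context.Context, tick func()) {
	ticker := time.NewTicker(defaultHeartbeatCadence)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			tick()
		}
	}
}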
105 changes: 64 additions & 41 deletions core/services/workflows/monitoring.go
@@ -9,7 +9,7 @@ import (
 	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
 	"github.com/smartcontractkit/chainlink-common/pkg/metrics"
 
-	localMonitoring "github.com/smartcontractkit/chainlink/v2/core/monitoring"
+	monutils "github.com/smartcontractkit/chainlink/v2/core/monitoring"
 )
 
 var registerTriggerFailureCounter metric.Int64Counter
@@ -19,13 +19,14 @@ var capabilityInvocationCounter metric.Int64Counter
 var capabilityFailureCounter metric.Int64Counter
 var workflowRegisteredCounter metric.Int64Counter
 var workflowUnregisteredCounter metric.Int64Counter
-var workflowExecutionFinishedCounter metric.Int64Counter
 var workflowExecutionLatencyGauge metric.Int64Gauge // ms
 var workflowStepErrorCounter metric.Int64Counter
-var workflowStepStartedCounter metric.Int64Counter
-var workflowStepFinishedCounter metric.Int64Counter
 var workflowInitializationCounter metric.Int64Counter
-var engineHeartbeatCounter metric.Int64UpDownCounter
+var engineHeartbeatCounter metric.Int64Counter
+var workflowCompletedDurationSeconds metric.Int64Histogram
+var workflowEarlyExitDurationSeconds metric.Int64Histogram
+var workflowErrorDurationSeconds metric.Int64Histogram
+var workflowTimeoutDurationSeconds metric.Int64Histogram
 
 func initMonitoringResources() (err error) {
 	registerTriggerFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_registertrigger_failures")
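[Editor's sketch] Two instrument choices are visible in this var block: the heartbeat moves from Int64UpDownCounter to Int64Counter (a heartbeat only ever increments, so a monotonic counter fits), and four Int64Histograms take over terminal-status accounting, since a histogram's count doubles as a per-status execution counter, which is why the separate finished counter could be removed. A compact sketch of the distinction; the meter here is illustrative, not the beholder.GetMeter() wiring used above.

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	ctx := context.Background()
	meter := otel.Meter("sketch") // no-op unless an SDK is installed; fine for a sketch

	// Monotonic event count: Add only ever moves the value up.
	beats, _ := meter.Int64Counter("platform_engine_heartbeat")
	beats.Add(ctx, 1)

	// Distribution: Record captures latency buckets, and the histogram's
	// total count gives executions per status "for free".
	dur, _ := meter.Int64Histogram(
		"platform_engine_workflow_completed_time_seconds",
		metric.WithUnit("seconds"))
	dur.Record(ctx, 42)
}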
@@ -63,39 +64,56 @@ func initMonitoringResources() (err error) {
 		return fmt.Errorf("failed to register workflow unregistered counter: %w", err)
 	}
 
-	workflowExecutionFinishedCounter, err = beholder.GetMeter().Int64Counter("platform_engine_execution_finished_count")
+	workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_time")
 	if err != nil {
-		return fmt.Errorf("failed to register workflow execution finished counter: %w", err)
+		return fmt.Errorf("failed to register workflow execution latency gauge: %w", err)
 	}
 
-	workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_time")
+	workflowInitializationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_initializations")
 	if err != nil {
-		return fmt.Errorf("failed to register workflow execution latency gauge: %w", err)
+		return fmt.Errorf("failed to register workflow initialization counter: %w", err)
 	}
 
-	workflowStepStartedCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_steps_started")
+	workflowStepErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_errors")
 	if err != nil {
-		return fmt.Errorf("failed to register workflow step started counter: %w", err)
+		return fmt.Errorf("failed to register workflow step error counter: %w", err)
 	}
 
-	workflowStepFinishedCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_steps_finished")
+	engineHeartbeatCounter, err = beholder.GetMeter().Int64Counter("platform_engine_heartbeat")
 	if err != nil {
-		return fmt.Errorf("failed to register workflow step finished counter: %w", err)
+		return fmt.Errorf("failed to register engine heartbeat counter: %w", err)
 	}
 
-	workflowInitializationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_initializations")
+	workflowCompletedDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_completed_time_seconds",
+		metric.WithDescription("Distribution of completed execution latencies"),
+		metric.WithUnit("seconds"))
 	if err != nil {
-		return fmt.Errorf("failed to register workflow initialization counter: %w", err)
+		return fmt.Errorf("failed to register completed duration histogram: %w", err)
 	}
 
-	workflowStepErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_errors")
+	workflowEarlyExitDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_earlyexit_time_seconds",
+		metric.WithDescription("Distribution of earlyexit execution latencies"),
+		metric.WithUnit("seconds"))
 	if err != nil {
-		return fmt.Errorf("failed to register workflow step error counter: %w", err)
+		return fmt.Errorf("failed to register early exit duration histogram: %w", err)
 	}
 
-	engineHeartbeatCounter, err = beholder.GetMeter().Int64UpDownCounter("platform_engine_heartbeat")
+	workflowErrorDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_error_time_seconds",
+		metric.WithDescription("Distribution of error execution latencies"),
+		metric.WithUnit("seconds"))
 	if err != nil {
-		return fmt.Errorf("failed to register engine heartbeat counter: %w", err)
+		return fmt.Errorf("failed to register error duration histogram: %w", err)
 	}
 
+	workflowTimeoutDurationSeconds, err = beholder.GetMeter().Int64Histogram(
+		"platform_engine_workflow_timeout_time_seconds",
+		metric.WithDescription("Distribution of timeout execution latencies"),
+		metric.WithUnit("seconds"))
+	if err != nil {
+		return fmt.Errorf("failed to register timeout duration histogram: %w", err)
+	}
 
 	return nil
@@ -112,71 +130,76 @@ func (c workflowsMetricLabeler) with(keyValues ...string) workflowsMetricLabeler
 }
 
 func (c workflowsMetricLabeler) incrementRegisterTriggerFailureCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementTriggerWorkflowStarterErrorCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	triggerWorkflowStarterErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) updateWorkflowExecutionLatencyGauge(ctx context.Context, val int64) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	workflowExecutionLatencyGauge.Record(ctx, val, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementTotalWorkflowStepErrorsCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	workflowStepErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) updateTotalWorkflowsGauge(ctx context.Context, val int64) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	workflowsRunningGauge.Record(ctx, val, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementCapabilityFailureCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	capabilityFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementWorkflowRegisteredCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	workflowRegisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
 func (c workflowsMetricLabeler) incrementWorkflowUnregisteredCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
 	workflowUnregisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
-func (c workflowsMetricLabeler) incrementWorkflowExecutionFinishedCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	workflowExecutionFinishedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+func (c workflowsMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	workflowInitializationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
 }
 
-func (c workflowsMetricLabeler) incrementWorkflowStepStartedCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	workflowStepStartedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+func (c workflowsMetricLabeler) updateWorkflowCompletedDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	workflowCompletedDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
 }
 
-func (c workflowsMetricLabeler) incrementWorkflowStepFinishedCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	workflowStepFinishedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+func (c workflowsMetricLabeler) updateWorkflowEarlyExitDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	workflowEarlyExitDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
 }
 
-func (c workflowsMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) {
-	otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
-	workflowInitializationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
+func (c workflowsMetricLabeler) updateWorkflowErrorDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	workflowErrorDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
 }
+
+func (c workflowsMetricLabeler) updateWorkflowTimeoutDurationHistogram(ctx context.Context, duration int64) {
+	otelLabels := monutils.KvMapToOtelAttributes(c.Labels)
+	workflowTimeoutDurationSeconds.Record(ctx, duration, metric.WithAttributes(otelLabels...))
+}
