Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add beholder metrics on workflow engine #15238

Merged
merged 14 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func (e *Engine) Start(_ context.Context) error {
if err != nil {
return fmt.Errorf("could not initialize monitoring resources: %w", err)
}
e.metrics.incrementWorkflowInitializationCounter(ctx)

e.wg.Add(e.maxWorkerLimit)
for i := 0; i < e.maxWorkerLimit; i++ {
Expand Down Expand Up @@ -351,6 +352,7 @@ func (e *Engine) init(ctx context.Context) {

e.logger.Info("engine initialized")
logCustMsg(ctx, e.cma, "workflow registered", e.logger)
e.metrics.incrementWorkflowRegisteredCounter(ctx)
e.afterInit(true)
}

Expand Down Expand Up @@ -687,6 +689,7 @@ func (e *Engine) finishExecution(ctx context.Context, executionID string, status
e.stepUpdatesChMap.remove(executionID)
metrics.updateTotalWorkflowsGauge(ctx, e.stepUpdatesChMap.len())
metrics.updateWorkflowExecutionLatencyGauge(ctx, executionDuration)
metrics.incrementWorkflowExecutionFinishedCounter(ctx) // maybe just decide on `success` vs `failure` here based on status
e.onExecutionFinished(executionID)
return nil
}
Expand Down Expand Up @@ -730,6 +733,7 @@ func (e *Engine) worker(ctx context.Context) {
if err != nil {
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Errorf("failed to start execution: %v", err)
logCustMsg(ctx, cma, fmt.Sprintf("failed to start execution: %s", err), e.logger)
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
e.metrics.with(platform.KeyTriggerID, te.ID).incrementTriggerWorkflowStarterErrorCounter(ctx)
} else {
e.logger.With(platform.KeyWorkflowExecutionID, executionID).Debug("execution started")
logCustMsg(ctx, cma, "execution started", e.logger)
Expand All @@ -755,6 +759,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {

// TODO ks-462 inputs
logCustMsg(ctx, cma, "executing step", l)
e.metrics.with(platform.KeyStepRef, msg.stepRef).incrementWorkflowStepStartedCounter(ctx)

stepCtx, cancel := context.WithTimeout(ctx, e.stepTimeoutDuration)
defer cancel()
Expand All @@ -779,6 +784,7 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) {
logCustMsg(ctx, cma, lmsg, l)
stepStatus = store.StatusCompleted
}
e.metrics.with(platform.KeyStepRef, msg.stepRef, "status", stepStatus).incrementWorkflowStepFinishedCounter(ctx)
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved

stepState.Status = stepStatus
stepState.Outputs.Value = outputs
Expand Down Expand Up @@ -924,6 +930,7 @@ func (e *Engine) executeStep(ctx context.Context, lggr logger.Logger, msg stepRe
e.metrics.incrementCapabilityInvocationCounter(ctx)
output, err := step.capability.Execute(ctx, tr)
if err != nil {
e.metrics.with(platform.KeyStepRef, msg.stepRef, platform.KeyCapabilityID, step.ID).incrementCapabilityFailureCounter(ctx)
return inputsMap, nil, err
}

Expand Down Expand Up @@ -1121,6 +1128,7 @@ func (e *Engine) Close() error {
return err
}
logCustMsg(ctx, e.cma, "workflow unregistered", e.logger)
e.metrics.incrementWorkflowUnregisteredCounter(ctx)
return nil
})
}
Expand Down
88 changes: 88 additions & 0 deletions core/services/workflows/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@ import (
)

var registerTriggerFailureCounter metric.Int64Counter
var triggerWorkflowStarterErrorCounter metric.Int64Counter
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
var workflowsRunningGauge metric.Int64Gauge
var capabilityInvocationCounter metric.Int64Counter
var capabilityFailureCounter metric.Int64Counter
var workflowRegisteredCounter metric.Int64Counter
var workflowUnregisteredCounter metric.Int64Counter
var workflowExecutionFinishedCounter metric.Int64Counter
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
var workflowExecutionLatencyGauge metric.Int64Gauge // ms
var workflowStepErrorCounter metric.Int64Counter
vyzaldysanchez marked this conversation as resolved.
Show resolved Hide resolved
var workflowStepStartedCounter metric.Int64Counter
var workflowStepFinishedCounter metric.Int64Counter
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
var workflowInitializationCounter metric.Int64Counter
patrickhuie19 marked this conversation as resolved.
Show resolved Hide resolved
var engineHeartbeatCounter metric.Int64UpDownCounter

func initMonitoringResources() (err error) {
Expand All @@ -25,6 +33,11 @@ func initMonitoringResources() (err error) {
return fmt.Errorf("failed to register trigger failure counter: %w", err)
}

triggerWorkflowStarterErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_triggerworkflow_starter_errors")
if err != nil {
return fmt.Errorf("failed to register trigger workflow starter error counter: %w", err)
}

workflowsRunningGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_count")
if err != nil {
return fmt.Errorf("failed to register workflows running gauge: %w", err)
Expand All @@ -35,11 +48,46 @@ func initMonitoringResources() (err error) {
return fmt.Errorf("failed to register capability invocation counter: %w", err)
}

capabilityFailureCounter, err = beholder.GetMeter().Int64Counter("platform_engine_capabilities_failures")
if err != nil {
return fmt.Errorf("failed to register capability failure counter: %w", err)
}

workflowRegisteredCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_registered_count")
if err != nil {
return fmt.Errorf("failed to register workflow registered counter: %w", err)
}

workflowUnregisteredCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_unregistered_count")
if err != nil {
return fmt.Errorf("failed to register workflow unregistered counter: %w", err)
}

workflowExecutionFinishedCounter, err = beholder.GetMeter().Int64Counter("platform_engine_execution_finished_count")
if err != nil {
return fmt.Errorf("failed to register workflow execution finished counter: %w", err)
}

workflowExecutionLatencyGauge, err = beholder.GetMeter().Int64Gauge("platform_engine_workflow_time")
if err != nil {
return fmt.Errorf("failed to register workflow execution latency gauge: %w", err)
}

workflowStepStartedCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_steps_started")
if err != nil {
return fmt.Errorf("failed to register workflow step started counter: %w", err)
}

workflowStepFinishedCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_steps_finished")
if err != nil {
return fmt.Errorf("failed to register workflow step finished counter: %w", err)
}

workflowInitializationCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_initializations")
if err != nil {
return fmt.Errorf("failed to register workflow initialization counter: %w", err)
}

workflowStepErrorCounter, err = beholder.GetMeter().Int64Counter("platform_engine_workflow_errors")
if err != nil {
return fmt.Errorf("failed to register workflow step error counter: %w", err)
Expand Down Expand Up @@ -68,6 +116,11 @@ func (c workflowsMetricLabeler) incrementRegisterTriggerFailureCounter(ctx conte
registerTriggerFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementTriggerWorkflowStarterErrorCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
triggerWorkflowStarterErrorCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementCapabilityInvocationCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
capabilityInvocationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
Expand All @@ -92,3 +145,38 @@ func (c workflowsMetricLabeler) incrementEngineHeartbeatCounter(ctx context.Cont
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
engineHeartbeatCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementCapabilityFailureCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
capabilityFailureCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementWorkflowRegisteredCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
workflowRegisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementWorkflowUnregisteredCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
workflowUnregisteredCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementWorkflowExecutionFinishedCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
workflowExecutionFinishedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementWorkflowStepStartedCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
workflowStepStartedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementWorkflowStepFinishedCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
workflowStepFinishedCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c workflowsMetricLabeler) incrementWorkflowInitializationCounter(ctx context.Context) {
otelLabels := localMonitoring.KvMapToOtelAttributes(c.Labels)
workflowInitializationCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}
Loading