diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 8e7dff2d6b8..53ba5e3f82f 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -29,7 +29,6 @@ import ( "github.com/urfave/cli" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric" "go.uber.org/multierr" "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" @@ -54,6 +53,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/services/versioning" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/static" "github.com/smartcontractkit/chainlink/v2/core/store/migrate" @@ -101,35 +101,6 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme attributes = append(attributes, attribute.String(k, v)) } - // note: due to the OTEL specification, all histogram buckets - // must be defined when the beholder client is created - globalMetricViews := []metric.View{ - metric.NewView( - metric.Instrument{Name: "platform_engine_workflow_earlyexit_time_seconds"}, - metric.Stream{Aggregation: metric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{0, 1, 10, 100}, - }}, - ), - metric.NewView( - metric.Instrument{Name: "platform_engine_workflow_completed_time_seconds"}, - metric.Stream{Aggregation: metric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{0, 100, 1000, 10_000, 50_000, 100_0000, 500_000}, - }}, - ), - metric.NewView( - metric.Instrument{Name: "platform_engine_workflow_error_time_seconds"}, - metric.Stream{Aggregation: metric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{0, 20, 60, 120, 240}, - }}, - ), - metric.NewView( - metric.Instrument{Name: "platform_engine_workflow_step_time_seconds"}, - metric.Stream{Aggregation: metric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{0, 20, 60, 120, 240}, - }}, - ), - } - clientCfg := beholder.Config{ InsecureConnection: cfgTelemetry.InsecureConnection(), CACertFile: cfgTelemetry.CACertFile(), @@ -138,10 +109,13 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme TraceSampleRatio: cfgTelemetry.TraceSampleRatio(), EmitterBatchProcessor: cfgTelemetry.EmitterBatchProcessor(), EmitterExportTimeout: cfgTelemetry.EmitterExportTimeout(), - MetricViews: globalMetricViews, AuthPublicKeyHex: csaPubKeyHex, AuthHeaders: beholderAuthHeaders, } + // note: due to the OTEL specification, all histogram buckets + // must be defined when the beholder client is created + clientCfg.MetricViews = append(clientCfg.MetricViews, workflows.MetricViews()...) + if tracingCfg.Enabled { clientCfg.TraceSpanExporter, err = tracingCfg.NewSpanExporter() if err != nil { diff --git a/core/services/workflows/monitoring.go b/core/services/workflows/monitoring.go index 205ce529c28..8457dadeb60 100644 --- a/core/services/workflows/monitoring.go +++ b/core/services/workflows/monitoring.go @@ -5,6 +5,7 @@ import ( "fmt" "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/metrics" @@ -135,6 +136,37 @@ func initMonitoringResources() (em *engineMetrics, err error) { return em, nil } +// Note: due to the OTEL specification, all histogram buckets +// Must be defined when the beholder client is created +func MetricViews() []sdkmetric.View { + return []sdkmetric.View{ + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_engine_workflow_earlyexit_time_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 1, 10, 100}, + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_engine_workflow_completed_time_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 100, 1000, 10_000, 50_000, 100_0000, 500_000}, + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_engine_workflow_error_time_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 20, 60, 120, 240}, + }}, + ), + sdkmetric.NewView( + sdkmetric.Instrument{Name: "platform_engine_workflow_step_time_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 20, 60, 120, 240}, + }}, + ), + } +} + // workflowsMetricLabeler wraps monitoring.MetricsLabeler to provide workflow specific utilities // for monitoring resources type workflowsMetricLabeler struct {