From 7f2db321c5d8878a93325dfebb6cfed3f9c11a28 Mon Sep 17 00:00:00 2001 From: Oliver Townsend Date: Mon, 22 Jan 2024 20:45:03 -0800 Subject: [PATCH] Add metric for num logs in buffer --- .../evmregistry/v21/logprovider/buffer.go | 4 ++++ .../evmregistry/v21/logprovider/provider.go | 18 ++++++++++++++++++ .../evmregistry/v21/prommetrics/metrics.go | 18 ++++++++++++++++++ 3 files changed, 40 insertions(+) create mode 100644 core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go index 9f11a1fca01..2857fcca712 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/buffer.go @@ -150,6 +150,8 @@ type logEventBuffer struct { blocks []fetchedBlock // latestBlock is the latest block number seen latestBlock int64 + // logsInBuffer is the number of logs currently in the buffer + logsInBuffer int64 } func newLogEventBuffer(lggr logger.Logger, size, numOfLogUpkeeps, fastExecLogsHigh int) *logEventBuffer { @@ -230,6 +232,7 @@ func (b *logEventBuffer) enqueue(id *big.Int, logs ...logpoller.Log) int { } if added > 0 { lggr.Debugw("Added logs to buffer", "addedLogs", added, "dropped", dropped, "latestBlock", latestBlock) + atomic.AddInt64(&b.logsInBuffer, int64(added-dropped)) } return added - dropped @@ -331,6 +334,7 @@ func (b *logEventBuffer) dequeueRange(start, end int64, upkeepLimit, totalLimit if len(results) > 0 { b.lggr.Debugw("Dequeued logs", "results", len(results), "start", start, "end", end) + atomic.AddInt64(&b.logsInBuffer, -int64(len(results))) } return results diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go index d1360faaf6d..45a47ec7e85 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/automation_utils_2_1" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/core" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/utils" ) @@ -102,6 +103,8 @@ type logEventProvider struct { opts LogTriggersOptions currentPartitionIdx uint64 + + servedLogs int64 } func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { @@ -142,6 +145,21 @@ func (p *logEventProvider) Start(context.Context) error { }) }) + p.threadCtrl.Go(func(ctx context.Context) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + p.lggr.Debugw("logs stats", "servedLogs", atomic.LoadInt64(&p.servedLogs), "logsInBuffer", atomic.LoadInt64(&p.buffer.logsInBuffer), "latestBlockSeen", p.buffer.latestBlockSeen()) + prommetrics.AutomationLogsInLogBuffer.Set(float64(atomic.LoadInt64(&p.buffer.logsInBuffer))) + case <-ctx.Done(): + return + + } + } + }) + return nil }) } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go new file mode 100644 index 00000000000..5ea40718923 --- /dev/null +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/prommetrics/metrics.go @@ -0,0 +1,18 @@ +package prommetrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// AutomationNamespace is the namespace for all Automation related metrics +const AutomationNamespace = "automation" + +// Automation metrics +var ( + AutomationLogsInLogBuffer = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: AutomationNamespace, + Name: "num_logs_in_log_buffer", + Help: "The total number of logs currently being stored in the log buffer", + }) +)