From 0a64554ce75ddb7b2e154a98563e85cb06446e8b Mon Sep 17 00:00:00 2001 From: Rob Best Date: Wed, 19 May 2021 09:30:15 +0100 Subject: [PATCH] metrics: add requeue metric (#27) Track the number of items that have been requeued but not processed yet. If this is value is > 0 for more than a few minutes then it indicates an issue reconciling objects. --- metrics/queue.go | 13 +++++++++++++ queue.go | 42 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/metrics/queue.go b/metrics/queue.go index c8a7c5b..70548cc 100644 --- a/metrics/queue.go +++ b/metrics/queue.go @@ -55,6 +55,13 @@ var ( }, []string{"name"}, ) + queueRequeued = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "semaphore_service_mirror_queue_requeued_items", + Help: "Items that have been requeued but not reconciled yet, by queue name", + }, + []string{"name"}, + ) ) func init() { @@ -66,10 +73,16 @@ func init() { queueUnfinishedWork, queueLongestRunningProcessor, queueRetries, + queueRequeued, ) workqueue.SetProvider(&workqueueProvider{}) } +// SetRequeued updates the number of requeued items +func SetRequeued(name string, val float64) { + queueRequeued.With(prometheus.Labels{"name": name}).Set(val) +} + // workqueueProvider implements workqueue.MetricsProvider type workqueueProvider struct{} diff --git a/queue.go b/queue.go index 41a91f8..e6c4aaf 100644 --- a/queue.go +++ b/queue.go @@ -2,6 +2,7 @@ package main import ( "github.com/utilitywarehouse/semaphore-service-mirror/log" + "github.com/utilitywarehouse/semaphore-service-mirror/metrics" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) @@ -15,6 +16,7 @@ type queue struct { name string reconcileFunc queueReconcileFunc queue workqueue.RateLimitingInterface + requeued []string } // newQueue returns a new queue @@ -39,7 +41,9 @@ func (q *queue) Add(obj interface{}) { // Run processes items from the queue as they're added func (q *queue) Run() { + q.updateMetrics() for q.processItem() { + q.updateMetrics() } } @@ -65,7 +69,7 @@ func (q *queue) processItem() bool { "key", key.(string), "err", err, ) - q.queue.Forget(key) + q.forget(key) return true } @@ -83,7 +87,7 @@ func (q *queue) processItem() bool { "name", name, "err", err, ) - q.queue.AddRateLimited(key) + q.requeue(key) log.Logger.Info( "requeued item", "queue", q.name, @@ -97,8 +101,40 @@ func (q *queue) processItem() bool { "namespace", namespace, "name", name, ) - q.queue.Forget(key) + q.forget(key) } return true } + +func (q *queue) requeue(key interface{}) { + q.queue.AddRateLimited(key) + q.addRequeued(key.(string)) +} + +func (q *queue) forget(key interface{}) { + q.queue.Forget(key) + q.removeRequeued(key.(string)) +} + +func (q *queue) addRequeued(key string) { + for _, k := range q.requeued { + if k == key { + return + } + } + q.requeued = append(q.requeued, key) +} + +func (q *queue) removeRequeued(key string) { + for i, k := range q.requeued { + if k == key { + q.requeued = append(q.requeued[:i], q.requeued[i+1:]...) + break + } + } +} + +func (q *queue) updateMetrics() { + metrics.SetRequeued(q.name, float64(len(q.requeued))) +}