Skip to content

Commit

Permalink
metrics: add requeue metric (#27)
Browse files Browse the repository at this point in the history
Track the number of items that have been requeued but not processed yet.
If this is value is > 0 for more than a few minutes then it indicates an
issue reconciling objects.
  • Loading branch information
ribbybibby authored May 19, 2021
1 parent 8d50263 commit 0a64554
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
13 changes: 13 additions & 0 deletions metrics/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ var (
},
[]string{"name"},
)
queueRequeued = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "semaphore_service_mirror_queue_requeued_items",
Help: "Items that have been requeued but not reconciled yet, by queue name",
},
[]string{"name"},
)
)

func init() {
Expand All @@ -66,10 +73,16 @@ func init() {
queueUnfinishedWork,
queueLongestRunningProcessor,
queueRetries,
queueRequeued,
)
workqueue.SetProvider(&workqueueProvider{})
}

// SetRequeued updates the number of requeued items
func SetRequeued(name string, val float64) {
queueRequeued.With(prometheus.Labels{"name": name}).Set(val)
}

// workqueueProvider implements workqueue.MetricsProvider
type workqueueProvider struct{}

Expand Down
42 changes: 39 additions & 3 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"github.com/utilitywarehouse/semaphore-service-mirror/log"
"github.com/utilitywarehouse/semaphore-service-mirror/metrics"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
Expand All @@ -15,6 +16,7 @@ type queue struct {
name string
reconcileFunc queueReconcileFunc
queue workqueue.RateLimitingInterface
requeued []string
}

// newQueue returns a new queue
Expand All @@ -39,7 +41,9 @@ func (q *queue) Add(obj interface{}) {

// Run processes items from the queue as they're added
func (q *queue) Run() {
q.updateMetrics()
for q.processItem() {
q.updateMetrics()
}
}

Expand All @@ -65,7 +69,7 @@ func (q *queue) processItem() bool {
"key", key.(string),
"err", err,
)
q.queue.Forget(key)
q.forget(key)
return true
}

Expand All @@ -83,7 +87,7 @@ func (q *queue) processItem() bool {
"name", name,
"err", err,
)
q.queue.AddRateLimited(key)
q.requeue(key)
log.Logger.Info(
"requeued item",
"queue", q.name,
Expand All @@ -97,8 +101,40 @@ func (q *queue) processItem() bool {
"namespace", namespace,
"name", name,
)
q.queue.Forget(key)
q.forget(key)
}

return true
}

func (q *queue) requeue(key interface{}) {
q.queue.AddRateLimited(key)
q.addRequeued(key.(string))
}

func (q *queue) forget(key interface{}) {
q.queue.Forget(key)
q.removeRequeued(key.(string))
}

func (q *queue) addRequeued(key string) {
for _, k := range q.requeued {
if k == key {
return
}
}
q.requeued = append(q.requeued, key)
}

func (q *queue) removeRequeued(key string) {
for i, k := range q.requeued {
if k == key {
q.requeued = append(q.requeued[:i], q.requeued[i+1:]...)
break
}
}
}

func (q *queue) updateMetrics() {
metrics.SetRequeued(q.name, float64(len(q.requeued)))
}

0 comments on commit 0a64554

Please sign in to comment.