metrics revamp (#7)
Co-authored-by: Sasidharan Gopal <[email protected]>
Sasidharan3094 and Sasidharan-Gopal authored Dec 19, 2024
1 parent 1c07081 commit 2e29576
Showing 1 changed file with 93 additions and 90 deletions.
183 changes: 93 additions & 90 deletions metrics/metrics.go
@@ -17,11 +17,7 @@ const (
metricsGCIntervalMinutes = 5
)

func init() {
go removeStaleMetrics()
}

// variables for setting various indicator labels
// Constants for metric labels
const (
SUCCESS = "SUCCESS"
FAIL = "FAIL"
@@ -41,18 +37,21 @@ const (
MISC = "MISC_ERROR"
SENTINEL_NUMBER_IN_MEMORY_MISMATCH = "SENTINEL_NUMBER_IN_MEMORY_MISMATCH"
REDIS_SLAVES_NUMBER_IN_MEMORY_MISMATCH = "REDIS_SLAVES_NUMBER_IN_MEMORY_MISMATCH"
// redis connection related errors

// Redis connection related errors
WRONG_PASSWORD_USED = "WRONG_PASSWORD_USED"
NOAUTH = "AUTH_CREDENTIALS_NOT_PROVIDED"
NOPERM = "REDIS_USER_DOES_NOT_HAVE_PERMISSIONS"
IO_TIMEOUT = "CONNECTION_TIMEDOUT"
CONNECTION_REFUSED = "CONNECTION_REFUSED"

// Kubernetes related errors
K8S_FORBIDDEN_ERR = "USER_FORBIDDEN_TO_PERFORM_ACTION"
K8S_UNAUTH = "CLIENT_NOT_AUTHORISED"
K8S_MISC = "MISC_ERROR_CHECK_LOGS"
K8S_NOT_FOUND = "RESOURCE_NOT_FOUND"

// Operation types
KIND_REDIS = "REDIS"
KIND_SENTINEL = "SENTINEL"
APPLY_REDIS_CONFIG = "APPLY_REDIS_CONFIG"
@@ -71,14 +70,22 @@ const (
SLAVE_IS_READY = "CHECK_IF_SLAVE_IS_READY"
)

var ( // used for garbage collection of metrics
mutex sync.Mutex
recorders = []recorder{}
instanceMetricLastUpdated = map[string]time.Time{}
resourceMetricLastUpdated = map[string]time.Time{}
)
// MetricsTracker handles thread-safe tracking of metric updates
type MetricsTracker struct {
mu sync.RWMutex
resourceMetrics map[string]time.Time
instanceMetrics map[string]time.Time
}

// Instrumenter is the interface that will collect the metrics and has the ability to send/expose those metrics.
// NewMetricsTracker creates a new MetricsTracker
func NewMetricsTracker() *MetricsTracker {
return &MetricsTracker{
resourceMetrics: make(map[string]time.Time),
instanceMetrics: make(map[string]time.Time),
}
}

// Recorder interface defines methods for metric collection
type Recorder interface {
koopercontroller.MetricsRecorder

@@ -100,6 +107,7 @@
// recorder implements the Recorder interface so the metrics can be managed by Prometheus.
type recorder struct {
// Metrics fields.
metricsTracker *MetricsTracker
clusterOK *prometheus.GaugeVec // clusterOk is the status of a cluster
ensureResource *prometheus.CounterVec // number of successful "ensure" operations performed by the controller.
redisCheck *prometheus.CounterVec // indicates any error encountered in managed redis instance(s)
@@ -111,7 +119,8 @@ type recorder struct {

// NewRecorder returns a new Prometheus-backed Recorder.
func NewRecorder(namespace string, reg prometheus.Registerer) Recorder {
// Create metrics.
metricsTracker := NewMetricsTracker()

clusterOK := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: promControllerSubsystem,
@@ -157,7 +166,8 @@ func NewRecorder(namespace string, reg prometheus.Registerer) Recorder {
}, []string{"namespace", "kind", "name", "operation", "status", "err"})

// Create the instance.
r := recorder{
r := &recorder{
metricsTracker: metricsTracker,
clusterOK: clusterOK,
ensureResource: ensureResource,
redisCheck: redisCheck,
@@ -178,101 +188,73 @@ func NewRecorder(namespace string, reg prometheus.Registerer) Recorder {
r.k8sServiceOperations,
r.redisOperations,
)
recorders = append(recorders, r)

// Start the metrics garbage collector
go r.removeStaleMetrics()

return r
}

// SetClusterOK sets the cluster status to OK
func (r recorder) SetClusterOK(namespace string, name string) {
func (r *recorder) SetClusterOK(namespace string, name string) {
r.clusterOK.WithLabelValues(namespace, name).Set(1)
}

// SetClusterError sets the cluster status to Error
func (r recorder) SetClusterError(namespace string, name string) {
func (r *recorder) SetClusterError(namespace string, name string) {
r.clusterOK.WithLabelValues(namespace, name).Set(0)
}

// DeleteCluster removes the cluster status metric for the given cluster
func (r recorder) DeleteCluster(namespace string, name string) {
func (r *recorder) DeleteCluster(namespace string, name string) {
r.clusterOK.DeleteLabelValues(namespace, name)
}

func (r recorder) RecordEnsureOperation(objectNamespace string, objectName string, objectKind string, resourceName string, status string) {
func (r *recorder) RecordEnsureOperation(objectNamespace string, objectName string, objectKind string, resourceName string, status string) {
r.ensureResource.WithLabelValues(objectNamespace, objectName, objectKind, resourceName, status).Add(1)
updateResourceMetricLastUpdatedTracker(objectNamespace, objectKind, objectName)
r.metricsTracker.UpdateResourceMetric(objectNamespace, objectKind, objectName)
}

func (r recorder) RecordRedisCheck(namespace string, resource string, indicator /* aspect of redis that is unhealthy */ string, instance string, status string) {
func (r *recorder) RecordRedisCheck(namespace string, resource string, indicator /* aspect of redis that is unhealthy */ string, instance string, status string) {
r.redisCheck.WithLabelValues(namespace, resource, indicator, instance, status).Add(1)
updateResourceMetricLastUpdatedTracker(namespace, "redisfailover", resource)
r.metricsTracker.UpdateResourceMetric(namespace, "redisfailover", resource)
}

func (r recorder) RecordSentinelCheck(namespace string, resource string, indicator /* aspect of sentinel that is unhealthy */ string, instance string, status string) {
func (r *recorder) RecordSentinelCheck(namespace string, resource string, indicator /* aspect of sentinel that is unhealthy */ string, instance string, status string) {
r.sentinelCheck.WithLabelValues(namespace, resource, indicator, instance, status).Add(1)
updateResourceMetricLastUpdatedTracker(namespace, "redisfailover", resource)
r.metricsTracker.UpdateResourceMetric(namespace, "redisfailover", resource)
}

func (r recorder) RecordK8sOperation(namespace string, kind string, name string, operation string, status string, err string) {
func (r *recorder) RecordK8sOperation(namespace string, kind string, name string, operation string, status string, err string) {
r.k8sServiceOperations.WithLabelValues(namespace, kind, name, operation, status, err).Add(1)
updateResourceMetricLastUpdatedTracker(namespace, kind, name)
r.metricsTracker.UpdateResourceMetric(namespace, kind, name)
}

func (r recorder) RecordRedisOperation(kind /*redis/sentinel? */ string, IP string, operation string, status string, err string) {
func (r *recorder) RecordRedisOperation(kind /*redis/sentinel? */ string, IP string, operation string, status string, err string) {
r.redisOperations.WithLabelValues(kind, IP, operation, status, err).Add(1)
updateInstanceMetricLastUpdatedTracker(IP)
r.metricsTracker.UpdateInstanceMetric(IP)
}

func updateResourceMetricLastUpdatedTracker(namespace string, kind string, name string) {
mutex.Lock()
resourceMetricLastUpdated[fmt.Sprintf("%v/%v/%v", namespace, kind, name)] = time.Now()
mutex.Unlock()
// MetricsTracker methods
func (mt *MetricsTracker) UpdateResourceMetric(namespace, kind, name string) {
key := fmt.Sprintf("%v/%v/%v", namespace, kind, name)
mt.mu.Lock()
mt.resourceMetrics[key] = time.Now()
mt.mu.Unlock()
}

func updateInstanceMetricLastUpdatedTracker(IP string) {
mutex.Lock()
instanceMetricLastUpdated[IP] = time.Now()
mutex.Unlock()
func (mt *MetricsTracker) UpdateInstanceMetric(IP string) {
mt.mu.Lock()
mt.instanceMetrics[IP] = time.Now()
mt.mu.Unlock()
}

// Garbage collection
func removeStaleMetrics() {
// Runs every `metricsGCIntervalMinutes`. It keeps track of recently updated metrics,
// and every metric that has not been updated within `metricsGCIntervalMinutes` minutes gets deleted.
for {
metricsDeletedCount := 0
kubernetesResourceBasedLabels, customResourceBasedLabels, ipBasedLabels := getLabelsOfStaleMetrics()
for _, recorder := range recorders {
for _, label := range kubernetesResourceBasedLabels {
metricsDeletedCount += recorder.ensureResource.DeletePartialMatch(label)
metricsDeletedCount += recorder.k8sServiceOperations.DeletePartialMatch(label)
}
for _, label := range customResourceBasedLabels {
metricsDeletedCount += recorder.redisCheck.DeletePartialMatch(label)
metricsDeletedCount += recorder.sentinelCheck.DeletePartialMatch(label)
labelWithName := label
labelWithName["name"] = labelWithName["resource"]
delete(labelWithName, "resource")
metricsDeletedCount += recorder.clusterOK.DeletePartialMatch(label)
}
for _, label := range ipBasedLabels {
metricsDeletedCount += recorder.redisOperations.DeletePartialMatch(label)
}
}
log.Debugf("delete %v stale metrics", metricsDeletedCount)
time.Sleep(metricsGCIntervalMinutes * time.Minute)
}
}

func getLabelsOfStaleMetrics() (kubernetesResourceBasedLabels []prometheus.Labels, customResourceBasedLabels []prometheus.Labels, ipBasedLabels []prometheus.Labels) {
func (mt *MetricsTracker) GetStaleMetrics(gcInterval time.Duration) ([]prometheus.Labels, []prometheus.Labels, []prometheus.Labels) {
var kubernetesResourceBasedLabels, customResourceBasedLabels, ipBasedLabels []prometheus.Labels
staleTime := time.Now().Add(-gcInterval)

kubernetesResourceBasedLabels = []prometheus.Labels{}
customResourceBasedLabels = []prometheus.Labels{}
ipBasedLabels = []prometheus.Labels{}
mt.mu.Lock()
defer mt.mu.Unlock()

for key, value := range resourceMetricLastUpdated {
// if the key is stale
if value.Before(time.Now().Add(-metricsGCIntervalMinutes * time.Minute)) {
// extract keys and create labels
for key, lastUpdate := range mt.resourceMetrics {
if lastUpdate.Before(staleTime) {
ids := strings.Split(key, "/")
namespace := ids[0]
kind := ids[1]
@@ -290,27 +272,48 @@ func getLabelsOfStaleMetrics() (kubernetesResourceBasedLabels []prometheus.Label
"resource": resource,
},
)
// once we have created labels out of the contents of the key,
// it's no longer required, since it is known to be stale; remove it from the tracker.
mutex.Lock()
delete(resourceMetricLastUpdated, key)
mutex.Unlock()
delete(mt.resourceMetrics, key)
}
}
for IP, value := range instanceMetricLastUpdated {
if value.Before(time.Now().Add(-metricsGCIntervalMinutes * time.Minute)) {

for IP, lastUpdate := range mt.instanceMetrics {
if lastUpdate.Before(staleTime) {
ipBasedLabels = append(ipBasedLabels,
prometheus.Labels{
"IP": IP,
},
)
// once we have created labels out of the contents of the key,
// it's no longer required, since it is known to be stale; remove it from the tracker.
mutex.Lock()
delete(instanceMetricLastUpdated, IP)
mutex.Unlock()
delete(mt.instanceMetrics, IP)
}

}

return kubernetesResourceBasedLabels, customResourceBasedLabels, ipBasedLabels
}

// Garbage collection routine
func (r *recorder) removeStaleMetrics() {
for {
k, c, i := r.metricsTracker.GetStaleMetrics(metricsGCIntervalMinutes * time.Minute)
metricsDeletedCount := 0

for _, label := range k {
metricsDeletedCount += r.ensureResource.DeletePartialMatch(label)
metricsDeletedCount += r.k8sServiceOperations.DeletePartialMatch(label)
}
for _, label := range c {
metricsDeletedCount += r.redisCheck.DeletePartialMatch(label)
metricsDeletedCount += r.sentinelCheck.DeletePartialMatch(label)
labelWithName := prometheus.Labels{
"namespace": label["namespace"],
"name": label["resource"],
}
metricsDeletedCount += r.clusterOK.DeletePartialMatch(labelWithName)
}
for _, label := range i {
metricsDeletedCount += r.redisOperations.DeletePartialMatch(label)
}

log.Debugf("deleted %v stale metrics", metricsDeletedCount)
time.Sleep(metricsGCIntervalMinutes * time.Minute)
}
}
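
For reference, a minimal sketch of how a caller might wire up the revamped recorder. The import path, metric namespace, and label values below are illustrative assumptions, and it assumes the Recorder interface exposes the recording methods implemented on recorder above:

package main

import (
	"github.com/prometheus/client_golang/prometheus"

	"example.com/redis-operator/metrics" // hypothetical import path for the package in this diff
)

func main() {
	// Register the operator's collectors against a dedicated registry.
	reg := prometheus.NewRegistry()
	rec := metrics.NewRecorder("redis_operator", reg)

	// Each recording call increments the corresponding Prometheus vector and
	// refreshes the last-updated timestamp consulted by the GC goroutine.
	rec.RecordEnsureOperation("default", "myfailover", "redisfailover", "rfr-myfailover", metrics.SUCCESS)
	rec.RecordRedisOperation(metrics.KIND_REDIS, "10.0.0.12", metrics.APPLY_REDIS_CONFIG, metrics.SUCCESS, "")
	rec.SetClusterOK("default", "myfailover")
}

With this revamp, the GC goroutine is started per recorder inside NewRecorder rather than in a package-level init, so each recorder cleans up only the metrics it tracked.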

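The stale-metric bookkeeping can also be exercised on its own. A sketch under the same hypothetical import path, using an artificially short interval so the freshly touched entries count as stale:

package main

import (
	"fmt"
	"time"

	"example.com/redis-operator/metrics" // hypothetical import path
)

func main() {
	mt := metrics.NewMetricsTracker()

	// Touch one resource-scoped entry and one instance-scoped entry.
	mt.UpdateResourceMetric("default", "redisfailover", "myfailover")
	mt.UpdateInstanceMetric("10.0.0.12")

	// Entries not refreshed within the given interval are reported as stale and
	// dropped from the tracker; the GC loop feeds the returned label sets into
	// DeletePartialMatch on the matching metric vectors.
	time.Sleep(10 * time.Millisecond)
	k8sLabels, crLabels, ipLabels := mt.GetStaleMetrics(5 * time.Millisecond)
	fmt.Println(k8sLabels, crLabels, ipLabels)
}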