Skip to content

Commit

Permalink
propagate current redis cluster state to redisfailover crd
Browse files Browse the repository at this point in the history
  • Loading branch information
Furst Roman authored and Furst Roman committed Oct 6, 2023
1 parent 632aa3d commit 396a348
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 12 deletions.
2 changes: 2 additions & 0 deletions api/redisfailover/v1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const (
defaultExporterImage = "quay.io/oliver006/redis_exporter:v1.43.0"
defaultImage = "redis:6.2.6-alpine"
defaultRedisPort = 6379
HealthyState = "Healthy"
NotHealthyState = "NotHealthy"
)

var (
Expand Down
9 changes: 8 additions & 1 deletion api/redisfailover/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
type RedisFailover struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec RedisFailoverSpec `json:"spec"`
Spec RedisFailoverSpec `json:"spec"`
Status RedisFailoverStatus `json:"status,omitempty"`
}

// RedisFailoverSpec represents a Redis failover spec
Expand Down Expand Up @@ -198,3 +199,9 @@ type RedisFailoverList struct {

Items []RedisFailover `json:"items"`
}

// RedisFailoverStatus is the status section of a RedisFailover resource. It
// records the outcome of the operator's most recent health check.
type RedisFailoverStatus struct {
// State is the health verdict; the checker writes either HealthyState or
// NotHealthyState here.
State string `json:"state,omitempty"`
// LastChanged is an RFC 3339 timestamp that updateStatus stamps when State
// differs from the value seen at the start of the check.
LastChanged string `json:"lastChanged,omitempty"`
// Message is a short human-readable reason accompanying a NotHealthyState;
// it may be empty, as some failure paths set only State.
Message string `json:"message,omitempty"`
}
4 changes: 4 additions & 0 deletions api/redisfailover/v1/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (r *RedisFailover) Validate() error {
r.Spec.Sentinel.CustomConfig = defaultSentinelCustomConfig
}

r.Status = RedisFailoverStatus{
State: HealthyState,
}

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions api/redisfailover/v1/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func TestValidate(t *testing.T) {
},
BootstrapNode: test.expectedBootstrapNode,
},
Status: RedisFailoverStatus{
State: HealthyState,
LastChanged: "",
Message: "",
},
}
assert.Equal(expectedRF, rf)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12387,6 +12387,19 @@ spec:
type: array
type: object
type: object
status:
description: CRD status defined by redisfailover cluster state
properties:
state:
description: state of redis failover cluster
type: string
lastChanged:
description: timestamp of last state change
type: string
message:
description: message for current state if needed
type: string
type: object
required:
- spec
type: object
Expand Down
2 changes: 2 additions & 0 deletions mocks/operator/redisfailover/RedisFailover.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mocks/service/k8s/Services.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

130 changes: 121 additions & 9 deletions operator/redisfailover/checker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package redisfailover

import (
"context"
"errors"
"github.com/spotahome/redis-operator/service/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strconv"
"time"

Expand Down Expand Up @@ -85,6 +88,15 @@ func (r *RedisFailoverHandler) UpdateRedisesPods(rf *redisfailoverv1.RedisFailov
// CheckAndHeal runs verification checks to ensure the RedisFailover is in an expected and healthy state.
// If the checks do not match up to expectations, an attempt will be made to "heal" the RedisFailover into a healthy state.
func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) error {

oldState := rf.Status.State

rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.HealthyState,
}

defer updateStatus(r.k8sservice, rf, oldState)

if rf.Bootstrapping() {
return r.checkAndHealBootstrapMode(rf)
}
Expand All @@ -99,19 +111,33 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
// Sentinel knows the correct slave number

if !r.rfChecker.IsRedisRunning(rf) {
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.REDIS_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New("not all replicas running"))
errorMsg := "not all replicas running"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.REDIS_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New(errorMsg))
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Debugf("Number of redis mismatch, waiting for redis statefulset reconcile")
return nil
}

if !r.rfChecker.IsSentinelRunning(rf) {
setRedisCheckerMetrics(r.mClient, "sentinel", rf.Namespace, rf.Name, metrics.SENTINEL_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New("not all replicas running"))
errorMsg := "not all replicas running"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
setRedisCheckerMetrics(r.mClient, "sentinel", rf.Namespace, rf.Name, metrics.SENTINEL_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New(errorMsg))
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Debugf("Number of sentinel mismatch, waiting for sentinel deployment reconcile")
return nil
}

nMasters, err := r.rfChecker.GetNumberMasters(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get number of masters",
}
return err
}

Expand All @@ -125,7 +151,12 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
err = r.rfHealer.SetOldestAsMaster(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NO_MASTER, metrics.NOT_APPLICABLE, err)
if err != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf("Error in Setting oldest Pod as master")
errorMsg := "Error in Setting oldest Pod as master"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf(errorMsg)
return err
}
return nil
Expand All @@ -138,6 +169,10 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Warningf("Number of Masters running is 0")
maxUptime, err := r.rfChecker.GetMaxRedisPodTime(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get Redis POD time",
}
return err
}

Expand All @@ -150,13 +185,22 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
err2 := r.rfHealer.SetOldestAsMaster(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NO_MASTER, metrics.NOT_APPLICABLE, err2)
if err2 != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf("Error in Setting oldest Pod as master")
errorMsg := "Error in Setting oldest Pod as master"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf(errorMsg)
return err2
}
} else {
//sentinels are having a quorum to make a failover , but check if redis are not having local hostip (first boot) as master
status, err2 := r.rfChecker.CheckIfMasterLocalhost(rf)
if err2 != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to check if master localhost",
}
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf("CheckIfMasterLocalhost failed retry later")
return err2
} else if status {
Expand All @@ -165,7 +209,12 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
err3 := r.rfHealer.SetOldestAsMaster(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NO_MASTER, metrics.NOT_APPLICABLE, err3)
if err3 != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf("Error in Setting oldest Pod as master")
errorMsg := "Error in Setting oldest Pod as master"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf(errorMsg)
return err3
}

Expand All @@ -183,11 +232,20 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NUMBER_OF_MASTERS, metrics.NOT_APPLICABLE, nil)
default:
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NUMBER_OF_MASTERS, metrics.NOT_APPLICABLE, errors.New("multiple masters detected"))
return errors.New("more than one master, fix manually")
errorMsg := "more than one master, fix manually"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
return errors.New(errorMsg)
}

master, err := r.rfChecker.GetMasterIP(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get master IP",
}
return err
}

Expand All @@ -196,23 +254,38 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
if err != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Warningf("Slave not associated to master: %s", err.Error())
if err = r.rfHealer.SetMasterOnAll(master, rf); err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
}
return err
}
}

err = r.applyRedisCustomConfig(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.APPLY_REDIS_CONFIG, metrics.NOT_APPLICABLE, err)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to apply custom config",
}
return err
}

err = r.UpdateRedisesPods(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to update redis PODs",
}
return err
}

sentinels, err := r.rfChecker.GetSentinelsIPs(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get sentinels IPs",
}
return err
}

Expand All @@ -223,6 +296,9 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
if err != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Warningf("Fixing sentinel not monitoring expected master: %s", err.Error())
if err := r.rfHealer.NewSentinelMonitor(sip, master, rf); err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
}
return err
}
}
Expand All @@ -233,37 +309,62 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
func (r *RedisFailoverHandler) checkAndHealBootstrapMode(rf *redisfailoverv1.RedisFailover) error {

if !r.rfChecker.IsRedisRunning(rf) {
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.REDIS_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New("not all replicas running"))
errorMsg := "not all replicas running"
r.k8sservice.UpdateRedisFailoverStatus(context.Background(), rf.Namespace, rf, metav1.UpdateOptions{})
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.REDIS_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New(errorMsg))
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Debugf("Number of redis mismatch, waiting for redis statefulset reconcile")
return nil
}

err := r.UpdateRedisesPods(rf)
if err != nil {
return err
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to update Redis PODs",
}
}
err = r.applyRedisCustomConfig(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.APPLY_REDIS_CONFIG, metrics.NOT_APPLICABLE, err)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to set Redis custom config",
}
return err
}

bootstrapSettings := rf.Spec.BootstrapNode
err = r.rfHealer.SetExternalMasterOnAll(bootstrapSettings.Host, bootstrapSettings.Port, rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.APPLY_EXTERNAL_MASTER, metrics.NOT_APPLICABLE, err)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to set external master to all",
}
return err
}

if rf.SentinelsAllowed() {
if !r.rfChecker.IsSentinelRunning(rf) {
setRedisCheckerMetrics(r.mClient, "sentinel", rf.Namespace, rf.Name, metrics.SENTINEL_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New("not all replicas running"))
errorMsg := "not all replicas running"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
r.k8sservice.UpdateRedisFailoverStatus(context.Background(), rf.Namespace, rf, metav1.UpdateOptions{})
setRedisCheckerMetrics(r.mClient, "sentinel", rf.Namespace, rf.Name, metrics.SENTINEL_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New(errorMsg))
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Debugf("Number of sentinel mismatch, waiting for sentinel deployment reconcile")
return nil
} else {
r.k8sservice.UpdateRedisFailoverStatus(context.Background(), rf.Namespace, rf, metav1.UpdateOptions{})
}

sentinels, err := r.rfChecker.GetSentinelsIPs(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get sentinels IPs",
}
return err
}
for _, sip := range sentinels {
Expand All @@ -272,6 +373,10 @@ func (r *RedisFailoverHandler) checkAndHealBootstrapMode(rf *redisfailoverv1.Red
if err != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Warningf("Fixing sentinel not monitoring expected master: %s", err.Error())
if err := r.rfHealer.NewSentinelMonitorWithPort(sip, bootstrapSettings.Host, bootstrapSettings.Port, rf); err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to check sentinel monitor",
}
return err
}
}
Expand Down Expand Up @@ -346,3 +451,10 @@ func setRedisCheckerMetrics(metricsClient metrics.Recorder, mode /* redis or sen
}
}
}

// updateStatus finalizes rf.Status after a check run and hands it to
// k8sservice for persistence.
//
// oldState is the State value rf carried before the check started. When the
// current State differs from it, LastChanged is stamped with the current time
// in RFC 3339 format; otherwise LastChanged is left as it is. The status
// update is always issued, using a background context and default update
// options. Any result of that call is not inspected here.
func updateStatus(k8sservice k8s.Services, rf *redisfailoverv1.RedisFailover, oldState string) {
	stateChanged := rf.Status.State != oldState
	if stateChanged {
		now := time.Now()
		rf.Status.LastChanged = now.Format(time.RFC3339)
	}

	ctx := context.Background()
	opts := metav1.UpdateOptions{}
	k8sservice.UpdateRedisFailoverStatus(ctx, rf.Namespace, rf, opts)
}
3 changes: 3 additions & 0 deletions operator/redisfailover/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redisfailover_test
import (
"errors"
"fmt"
v1 "github.com/spotahome/redis-operator/api/redisfailover/v1"
"testing"
"time"

Expand Down Expand Up @@ -420,8 +421,10 @@ func TestCheckAndHeal(t *testing.T) {

if expErr {
assert.Error(err)
assert.Equal(v1.NotHealthyState, rf.Status.State)
} else {
assert.NoError(err)
assert.Equal(v1.HealthyState, rf.Status.State)
}
mrfc.AssertExpectations(t)
mrfh.AssertExpectations(t)
Expand Down
Loading

0 comments on commit 396a348

Please sign in to comment.