Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

propagate current redis cluster state to redisfailover crd #667

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/redisfailover/v1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const (
defaultExporterImage = "quay.io/oliver006/redis_exporter:v1.43.0"
defaultImage = "redis:6.2.6-alpine"
defaultRedisPort = 6379
HealthyState = "Healthy"
NotHealthyState = "NotHealthy"
)

var (
Expand Down
9 changes: 8 additions & 1 deletion api/redisfailover/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
type RedisFailover struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec RedisFailoverSpec `json:"spec"`
Spec RedisFailoverSpec `json:"spec"`
Status RedisFailoverStatus `json:"status,omitempty"`
}

// RedisFailoverSpec represents a Redis failover spec
Expand Down Expand Up @@ -198,3 +199,9 @@ type RedisFailoverList struct {

Items []RedisFailover `json:"items"`
}

// RedisFailoverStatus represents the status reported on a RedisFailover
// resource. All fields are plain strings and are omitted from the JSON
// encoding when empty.
type RedisFailoverStatus struct {
// State is the state of the redis failover cluster; the checker sets it to
// HealthyState or NotHealthyState.
State string `json:"state,omitempty"`
// LastChanged is the timestamp of the last state change; the checker writes
// it in time.RFC3339 format.
LastChanged string `json:"lastChanged,omitempty"`
// Message is an optional explanation for the current state, e.g. the reason
// the cluster is considered not healthy.
Message string `json:"message,omitempty"`
}
4 changes: 4 additions & 0 deletions api/redisfailover/v1/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (r *RedisFailover) Validate() error {
r.Spec.Sentinel.CustomConfig = defaultSentinelCustomConfig
}

r.Status = RedisFailoverStatus{
State: HealthyState,
}

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions api/redisfailover/v1/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func TestValidate(t *testing.T) {
},
BootstrapNode: test.expectedBootstrapNode,
},
Status: RedisFailoverStatus{
State: HealthyState,
LastChanged: "",
Message: "",
},
}
assert.Equal(expectedRF, rf)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12387,6 +12387,19 @@ spec:
type: array
type: object
type: object
status:
description: CRD status defined by redisfailover cluster state
properties:
state:
description: state of redis failover cluster
type: string
lastChanged:
description: timestamp of last state change
type: string
message:
description: message for current state if needed
type: string
type: object
required:
- spec
type: object
Expand Down
2 changes: 2 additions & 0 deletions mocks/operator/redisfailover/RedisFailover.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions mocks/service/k8s/Services.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

130 changes: 121 additions & 9 deletions operator/redisfailover/checker.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package redisfailover

import (
"context"
"errors"
"github.com/spotahome/redis-operator/service/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strconv"
"time"

Expand Down Expand Up @@ -85,6 +88,15 @@ func (r *RedisFailoverHandler) UpdateRedisesPods(rf *redisfailoverv1.RedisFailov
// CheckAndHeal runs verification checks to ensure the RedisFailover is in an expected and healthy state.
// If the checks do not match up to expectations, an attempt will be made to "heal" the RedisFailover into a healthy state.
func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) error {

oldState := rf.Status.State

rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.HealthyState,
}

defer updateStatus(r.k8sservice, rf, oldState)

if rf.Bootstrapping() {
return r.checkAndHealBootstrapMode(rf)
}
Expand All @@ -99,19 +111,33 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
// Sentinel knows the correct slave number

if !r.rfChecker.IsRedisRunning(rf) {
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.REDIS_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New("not all replicas running"))
errorMsg := "not all replicas running"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.REDIS_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New(errorMsg))
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Debugf("Number of redis mismatch, waiting for redis statefulset reconcile")
return nil
}

if !r.rfChecker.IsSentinelRunning(rf) {
setRedisCheckerMetrics(r.mClient, "sentinel", rf.Namespace, rf.Name, metrics.SENTINEL_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New("not all replicas running"))
errorMsg := "not all replicas running"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
setRedisCheckerMetrics(r.mClient, "sentinel", rf.Namespace, rf.Name, metrics.SENTINEL_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New(errorMsg))
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Debugf("Number of sentinel mismatch, waiting for sentinel deployment reconcile")
return nil
}

nMasters, err := r.rfChecker.GetNumberMasters(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get number of masters",
}
return err
}

Expand All @@ -125,7 +151,12 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
err = r.rfHealer.SetOldestAsMaster(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NO_MASTER, metrics.NOT_APPLICABLE, err)
if err != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf("Error in Setting oldest Pod as master")
errorMsg := "Error in Setting oldest Pod as master"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf(errorMsg)
return err
}
return nil
Expand All @@ -138,6 +169,10 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Warningf("Number of Masters running is 0")
maxUptime, err := r.rfChecker.GetMaxRedisPodTime(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get Redis POD time",
}
return err
}

Expand All @@ -150,13 +185,22 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
err2 := r.rfHealer.SetOldestAsMaster(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NO_MASTER, metrics.NOT_APPLICABLE, err2)
if err2 != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf("Error in Setting oldest Pod as master")
errorMsg := "Error in Setting oldest Pod as master"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf(errorMsg)
return err2
}
} else {
// Sentinels have a quorum to perform a failover, but check whether the redis instances still have the local host IP (first boot) as master
status, err2 := r.rfChecker.CheckIfMasterLocalhost(rf)
if err2 != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to check if master localhost",
}
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf("CheckIfMasterLocalhost failed retry later")
return err2
} else if status {
Expand All @@ -165,7 +209,12 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
err3 := r.rfHealer.SetOldestAsMaster(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NO_MASTER, metrics.NOT_APPLICABLE, err3)
if err3 != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf("Error in Setting oldest Pod as master")
errorMsg := "Error in Setting oldest Pod as master"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Errorf(errorMsg)
return err3
}

Expand All @@ -183,11 +232,20 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NUMBER_OF_MASTERS, metrics.NOT_APPLICABLE, nil)
default:
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.NUMBER_OF_MASTERS, metrics.NOT_APPLICABLE, errors.New("multiple masters detected"))
return errors.New("more than one master, fix manually")
errorMsg := "more than one master, fix manually"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
return errors.New(errorMsg)
}

master, err := r.rfChecker.GetMasterIP(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get master IP",
}
return err
}

Expand All @@ -196,23 +254,38 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
if err != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Warningf("Slave not associated to master: %s", err.Error())
if err = r.rfHealer.SetMasterOnAll(master, rf); err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
}
return err
}
}

err = r.applyRedisCustomConfig(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.APPLY_REDIS_CONFIG, metrics.NOT_APPLICABLE, err)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to apply custom config",
}
return err
}

err = r.UpdateRedisesPods(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to update redis PODs",
}
return err
}

sentinels, err := r.rfChecker.GetSentinelsIPs(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get sentinels IPs",
}
return err
}

Expand All @@ -223,6 +296,9 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
if err != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Warningf("Fixing sentinel not monitoring expected master: %s", err.Error())
if err := r.rfHealer.NewSentinelMonitor(sip, master, rf); err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
}
return err
}
}
Expand All @@ -233,37 +309,62 @@ func (r *RedisFailoverHandler) CheckAndHeal(rf *redisfailoverv1.RedisFailover) e
func (r *RedisFailoverHandler) checkAndHealBootstrapMode(rf *redisfailoverv1.RedisFailover) error {

if !r.rfChecker.IsRedisRunning(rf) {
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.REDIS_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New("not all replicas running"))
errorMsg := "not all replicas running"
r.k8sservice.UpdateRedisFailoverStatus(context.Background(), rf.Namespace, rf, metav1.PatchOptions{})
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.REDIS_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New(errorMsg))
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Debugf("Number of redis mismatch, waiting for redis statefulset reconcile")
return nil
}

err := r.UpdateRedisesPods(rf)
if err != nil {
return err
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to update Redis PODs",
}
}
err = r.applyRedisCustomConfig(rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.APPLY_REDIS_CONFIG, metrics.NOT_APPLICABLE, err)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to set Redis custom config",
}
return err
}

bootstrapSettings := rf.Spec.BootstrapNode
err = r.rfHealer.SetExternalMasterOnAll(bootstrapSettings.Host, bootstrapSettings.Port, rf)
setRedisCheckerMetrics(r.mClient, "redis", rf.Namespace, rf.Name, metrics.APPLY_EXTERNAL_MASTER, metrics.NOT_APPLICABLE, err)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to set external master to all",
}
return err
}

if rf.SentinelsAllowed() {
if !r.rfChecker.IsSentinelRunning(rf) {
setRedisCheckerMetrics(r.mClient, "sentinel", rf.Namespace, rf.Name, metrics.SENTINEL_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New("not all replicas running"))
errorMsg := "not all replicas running"
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: errorMsg,
}
r.k8sservice.UpdateRedisFailoverStatus(context.Background(), rf.Namespace, rf, metav1.PatchOptions{})
setRedisCheckerMetrics(r.mClient, "sentinel", rf.Namespace, rf.Name, metrics.SENTINEL_REPLICA_MISMATCH, metrics.NOT_APPLICABLE, errors.New(errorMsg))
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Debugf("Number of sentinel mismatch, waiting for sentinel deployment reconcile")
return nil
} else {
r.k8sservice.UpdateRedisFailoverStatus(context.Background(), rf.Namespace, rf, metav1.PatchOptions{})
}

sentinels, err := r.rfChecker.GetSentinelsIPs(rf)
if err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to get sentinels IPs",
}
return err
}
for _, sip := range sentinels {
Expand All @@ -272,6 +373,10 @@ func (r *RedisFailoverHandler) checkAndHealBootstrapMode(rf *redisfailoverv1.Red
if err != nil {
r.logger.WithField("redisfailover", rf.ObjectMeta.Name).WithField("namespace", rf.ObjectMeta.Namespace).Warningf("Fixing sentinel not monitoring expected master: %s", err.Error())
if err := r.rfHealer.NewSentinelMonitorWithPort(sip, bootstrapSettings.Host, bootstrapSettings.Port, rf); err != nil {
rf.Status = redisfailoverv1.RedisFailoverStatus{
State: redisfailoverv1.NotHealthyState,
Message: "unable to check sentinel monitor",
}
return err
}
}
Expand Down Expand Up @@ -346,3 +451,10 @@ func setRedisCheckerMetrics(metricsClient metrics.Recorder, mode /* redis or sen
}
}
}

// updateStatus pushes the status currently held in rf to the API through
// k8sservice. oldState is the state rf had before the current check ran; when
// the state now stored in rf differs from it, LastChanged is stamped with the
// current time in RFC3339 format before the update is sent. When the state is
// unchanged, LastChanged is left exactly as the caller set it.
func updateStatus(k8sservice k8s.Services, rf *redisfailoverv1.RedisFailover, oldState string) {
	if stateChanged := rf.Status.State != oldState; stateChanged {
		now := time.Now()
		rf.Status.LastChanged = now.Format(time.RFC3339)
	}

	// The update is issued unconditionally, so message-only changes are
	// propagated as well.
	ctx := context.Background()
	opts := metav1.PatchOptions{}
	k8sservice.UpdateRedisFailoverStatus(ctx, rf.Namespace, rf, opts)
}
3 changes: 3 additions & 0 deletions operator/redisfailover/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redisfailover_test
import (
"errors"
"fmt"
v1 "github.com/spotahome/redis-operator/api/redisfailover/v1"
"testing"
"time"

Expand Down Expand Up @@ -420,8 +421,10 @@ func TestCheckAndHeal(t *testing.T) {

if expErr {
assert.Error(err)
assert.Equal(v1.NotHealthyState, rf.Status.State)
} else {
assert.NoError(err)
assert.Equal(v1.HealthyState, rf.Status.State)
}
mrfc.AssertExpectations(t)
mrfh.AssertExpectations(t)
Expand Down
Loading