Skip to content

Commit

Permalink
Adding annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
Kaushik-Vijayakumar-1 committed Mar 15, 2024
1 parent 632aa3d commit a8322a8
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 1 deletion.
14 changes: 14 additions & 0 deletions mocks/service/k8s/Services.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions operator/redisfailover/service/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,26 @@ func (r *RedisFailoverChecker) setSlaveLabelIfNecessary(namespace string, pod co
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisSlaveRoleLabel())
}

func (r *RedisFailoverChecker) setMasterAnnotationIfNecessary(namespace string, pod corev1.Pod) error {
for annotationKey, annotationValue := range pod.ObjectMeta.Annotations {
if annotationKey == clusterAutoscalerSafeToEvictAnnotationKey &&
annotationValue == clusterAutoscalerSafeToEvictAnnotationMaster {
return nil
}
}
return r.k8sService.UpdatePodAnnotations(namespace, pod.ObjectMeta.Name, generateRedisMasterAnnotations())
}

func (r *RedisFailoverChecker) setSlaveAnnotationIfNecessary(namespace string, pod corev1.Pod) error {
for annotationKey, annotationValue := range pod.ObjectMeta.Annotations {
if annotationKey == clusterAutoscalerSafeToEvictAnnotationKey &&
annotationValue == clusterAutoscalerSafeToEvictAnnotationSlave {
return nil
}
}
return r.k8sService.UpdatePodAnnotations(namespace, pod.ObjectMeta.Name, generateRedisSlaveAnnotations())
}

// CheckAllSlavesFromMaster controlls that all slaves have the same master (the real one)
func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redisfailoverv1.RedisFailover) error {
rps, err := r.k8sService.GetStatefulSetPods(rf.Namespace, GetRedisName(rf))
Expand All @@ -121,11 +141,19 @@ func (r *RedisFailoverChecker) CheckAllSlavesFromMaster(master string, rf *redis
if err != nil {
return err
}
err = r.setMasterAnnotationIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
} else {
err = r.setSlaveLabelIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
err = r.setSlaveAnnotationIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
}

slave, err := r.redisClient.GetSlaveOf(rp.Status.PodIP, rport, password)
Expand Down
4 changes: 4 additions & 0 deletions operator/redisfailover/service/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestCheckAllSlavesFromMasterGetStatefulSetError(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(nil, errors.New(""))
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}

checker := rfservice.NewRedisFailoverChecker(ms, mr, log.DummyLogger{}, metrics.Dummy)
Expand Down Expand Up @@ -186,6 +187,7 @@ func TestCheckAllSlavesFromMasterGetSlaveOfError(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "", "0", "").Once().Return("", errors.New(""))

Expand Down Expand Up @@ -214,6 +216,7 @@ func TestCheckAllSlavesFromMasterDifferentMaster(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "0.0.0.0", "0", "").Once().Return("1.1.1.1", nil)

Expand Down Expand Up @@ -242,6 +245,7 @@ func TestCheckAllSlavesFromMaster(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
mr := &mRedisService.Client{}
mr.On("GetSlaveOf", "0.0.0.0", "0", "").Once().Return("1.1.1.1", nil)

Expand Down
12 changes: 12 additions & 0 deletions operator/redisfailover/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ func generateRedisSlaveRoleLabel() map[string]string {
}
}

func generateRedisMasterAnnotations() map[string]string {
return map[string]string{
clusterAutoscalerSafeToEvictAnnotationKey: clusterAutoscalerSafeToEvictAnnotationMaster,
}
}

func generateRedisSlaveAnnotations() map[string]string {
return map[string]string{
clusterAutoscalerSafeToEvictAnnotationKey: clusterAutoscalerSafeToEvictAnnotationSlave,
}
}

// EnsureSentinelService makes sure the sentinel service exists
func (r *RedisFailoverKubeClient) EnsureSentinelService(rf *redisfailoverv1.RedisFailover, labels map[string]string, ownerRefs []metav1.OwnerReference) error {
svc := generateSentinelService(rf, labels, ownerRefs)
Expand Down
4 changes: 4 additions & 0 deletions operator/redisfailover/service/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ const (
redisRoleLabelKey = "redisfailovers-role"
redisRoleLabelMaster = "master"
redisRoleLabelSlave = "slave"

clusterAutoscalerSafeToEvictAnnotationKey = "cluster-autoscaler.kubernetes.io~1safe-to-evict"
clusterAutoscalerSafeToEvictAnnotationMaster = "false"
clusterAutoscalerSafeToEvictAnnotationSlave = "true"
)
42 changes: 41 additions & 1 deletion operator/redisfailover/service/heal.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ func (r *RedisFailoverHealer) setSlaveLabelIfNecessary(namespace string, pod v1.
return r.k8sService.UpdatePodLabels(namespace, pod.ObjectMeta.Name, generateRedisSlaveRoleLabel())
}

func (r *RedisFailoverHealer) setMasterAnnotationIfNecessary(namespace string, pod v1.Pod) error {
for annotationKey, annotationValue := range pod.ObjectMeta.Annotations {
if annotationKey == clusterAutoscalerSafeToEvictAnnotationKey &&
annotationValue == clusterAutoscalerSafeToEvictAnnotationMaster {
return nil
}
}
return r.k8sService.UpdatePodAnnotations(namespace, pod.ObjectMeta.Name, generateRedisMasterAnnotations())
}

func (r *RedisFailoverHealer) setSlaveAnnotationIfNecessary(namespace string, pod v1.Pod) error {
for annotationKey, annotationValue := range pod.ObjectMeta.Annotations {
if annotationKey == clusterAutoscalerSafeToEvictAnnotationKey &&
annotationValue == clusterAutoscalerSafeToEvictAnnotationSlave {
return nil
}
}
return r.k8sService.UpdatePodAnnotations(namespace, pod.ObjectMeta.Name, generateRedisSlaveAnnotations())
}

func (r *RedisFailoverHealer) MakeMaster(ip string, rf *redisfailoverv1.RedisFailover) error {
password, err := k8s.GetRedisPassword(r.k8sService, rf)
if err != nil {
Expand All @@ -79,7 +99,14 @@ func (r *RedisFailoverHealer) MakeMaster(ip string, rf *redisfailoverv1.RedisFai
}
for _, rp := range rps.Items {
if rp.Status.PodIP == ip {
return r.setMasterLabelIfNecessary(rf.Namespace, rp)
err = r.setMasterLabelIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
err = r.setMasterAnnotationIfNecessary(rf.Namespace, rp)
if err != nil {
return err
}
}
}
return nil
Expand Down Expand Up @@ -121,6 +148,10 @@ func (r *RedisFailoverHealer) SetOldestAsMaster(rf *redisfailoverv1.RedisFailove
if err != nil {
return err
}
err = r.setMasterAnnotationIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}

newMasterIP = pod.Status.PodIP
} else {
Expand All @@ -133,6 +164,11 @@ func (r *RedisFailoverHealer) SetOldestAsMaster(rf *redisfailoverv1.RedisFailove
if err != nil {
return err
}

err = r.setSlaveAnnotationIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}
}
}
if newMasterIP == "" {
Expand Down Expand Up @@ -175,6 +211,10 @@ func (r *RedisFailoverHealer) SetMasterOnAll(masterIP string, rf *redisfailoverv
if err != nil {
return err
}
err = r.setSlaveAnnotationIfNecessary(rf.Namespace, pod)
if err != nil {
return err
}
}
}
return nil
Expand Down
8 changes: 8 additions & 0 deletions operator/redisfailover/service/heal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestSetOldestAsMasterNewMasterError(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
mr := &mRedisService.Client{}
mr.On("MakeMaster", "0.0.0.0", "0", "").Once().Return(errors.New(""))

Expand Down Expand Up @@ -62,6 +63,7 @@ func TestSetOldestAsMaster(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
mr := &mRedisService.Client{}
mr.On("MakeMaster", "0.0.0.0", "0", "").Once().Return(nil)

Expand Down Expand Up @@ -94,6 +96,7 @@ func TestSetOldestAsMasterMultiplePodsMakeSlaveOfError(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
mr := &mRedisService.Client{}
mr.On("MakeMaster", "0.0.0.0", "0", "").Once().Return(nil)
mr.On("MakeSlaveOfWithPort", "1.1.1.1", "0.0.0.0", "0", "").Once().Return(errors.New(""))
Expand Down Expand Up @@ -127,6 +130,7 @@ func TestSetOldestAsMasterMultiplePods(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
mr := &mRedisService.Client{}
mr.On("MakeMaster", "0.0.0.0", "0", "").Once().Return(nil)
mr.On("MakeSlaveOfWithPort", "1.1.1.1", "0.0.0.0", "0", "").Once().Return(nil)
Expand Down Expand Up @@ -170,6 +174,7 @@ func TestSetOldestAsMasterOrdering(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
mr := &mRedisService.Client{}
mr.On("MakeMaster", "1.1.1.1", "0", "").Once().Return(nil)
mr.On("MakeSlaveOfWithPort", "0.0.0.0", "1.1.1.1", "0", "").Once().Return(nil)
Expand Down Expand Up @@ -203,6 +208,7 @@ func TestSetMasterOnAllMakeMasterError(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Once().Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
mr := &mRedisService.Client{}
mr.On("IsMaster", "0.0.0.0", "0", "").Return(false, errors.New(""))
healer := rfservice.NewRedisFailoverHealer(ms, mr, log.DummyLogger{})
Expand Down Expand Up @@ -234,6 +240,7 @@ func TestSetMasterOnAllMakeSlaveOfError(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
mr := &mRedisService.Client{}
mr.On("IsMaster", "0.0.0.0", "0", "").Return(true, nil)
mr.On("MakeSlaveOfWithPort", "1.1.1.1", "0.0.0.0", "0", "").Once().Return(errors.New(""))
Expand Down Expand Up @@ -267,6 +274,7 @@ func TestSetMasterOnAll(t *testing.T) {
ms := &mK8SService.Services{}
ms.On("GetStatefulSetPods", namespace, rfservice.GetRedisName(rf)).Once().Return(pods, nil)
ms.On("UpdatePodLabels", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
ms.On("UpdatePodAnnotations", namespace, mock.AnythingOfType("string"), mock.Anything).Return(nil)
mr := &mRedisService.Client{}
mr.On("IsMaster", "0.0.0.0", "0", "").Return(true, nil)
mr.On("MakeSlaveOfWithPort", "1.1.1.1", "0.0.0.0", "0", "").Once().Return(nil)
Expand Down
23 changes: 23 additions & 0 deletions service/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Pod interface {
DeletePod(namespace string, name string) error
ListPods(namespace string) (*corev1.PodList, error)
UpdatePodLabels(namespace, podName string, labels map[string]string) error
UpdatePodAnnotations(namespace, podName string, annotations map[string]string) error
}

// PodService is the pod service implementation using API calls to kubernetes.
Expand Down Expand Up @@ -128,3 +129,25 @@ func (p *PodService) UpdatePodLabels(namespace, podName string, labels map[strin
}
return err
}

func (p *PodService) UpdatePodAnnotations(namespace, podName string, annotations map[string]string) error {
p.logger.Infof("Update pod annotation, namespace: %s, pod name: %s, annotations: %v", namespace, podName, annotations)

var payloads []interface{}
for annotationKey, annotationValue := range annotations {
payload := PatchStringValue{
Op: "replace",
Path: "/metadata/annotations/" + annotationKey,
Value: annotationValue,
}
payloads = append(payloads, payload)
}
payloadBytes, _ := json.Marshal(payloads)

_, err := p.kubeClient.CoreV1().Pods(namespace).Patch(context.TODO(), podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
recordMetrics(namespace, "Pod", podName, "PATCH", err, p.metricsRecorder)
if err != nil {
p.logger.Errorf("Update pod annotations failed, namespace: %s, pod name: %s, error: %v", namespace, podName, err)
}
return err
}

0 comments on commit a8322a8

Please sign in to comment.