From f5d313993e9b990772a7a82d7bf065bba91c8961 Mon Sep 17 00:00:00 2001 From: Artur Zheludkov Date: Wed, 28 Aug 2024 13:05:10 -0400 Subject: [PATCH 1/3] removing pods one by one in case of pvc resize --- mocks/service/k8s/Services.go | 14 +++++++++++++ service/k8s/statefulset.go | 35 +++++++++++++++++++++++++++++++++ service/k8s/statefulset_test.go | 5 +++++ 3 files changed, 54 insertions(+) diff --git a/mocks/service/k8s/Services.go b/mocks/service/k8s/Services.go index 19175eacf..4dbcec790 100644 --- a/mocks/service/k8s/Services.go +++ b/mocks/service/k8s/Services.go @@ -393,6 +393,20 @@ func (_m *Services) DeleteStatefulSet(namespace string, name string) error { return r0 } +// DeleteStatefulSetPods provides a mock function with given fields: namespace, name +func (_m *Services) DeleteStatefulSetPods(namespace string, name string) error { + ret := _m.Called(namespace, name) + + var r0 error + if rf, ok := ret.Get(0).(func(string, string) error); ok { + r0 = rf(namespace, name) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // GetClusterRole provides a mock function with given fields: name func (_m *Services) GetClusterRole(name string) (*rbacv1.ClusterRole, error) { ret := _m.Called(name) diff --git a/service/k8s/statefulset.go b/service/k8s/statefulset.go index 38cc95ff2..f4413a816 100644 --- a/service/k8s/statefulset.go +++ b/service/k8s/statefulset.go @@ -29,6 +29,7 @@ type StatefulSet interface { CreateOrUpdateStatefulSet(namespace string, statefulSet *appsv1.StatefulSet) error DeleteStatefulSet(namespace string, name string) error ListStatefulSets(namespace string) (*appsv1.StatefulSetList, error) + DeleteStatefulSetPods(namespace string, name string) error } // StatefulSetService is the service account service implementation using API calls to kubernetes. @@ -161,7 +162,17 @@ func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefu annotations["storageCapacity"] = fmt.Sprintf("%d", stateCapacity) storedStatefulSet.Annotations = annotations if realUpdate { + s.logger.WithField("namespace", namespace).WithField("statefulSet", statefulSet.Name).Infof("resize statefulset pvcs from %d to %d Success", storedCapacity, stateCapacity) + + s.logger.WithField("namespace", namespace).WithField("statefulSet", statefulSet.Name).Infof("deleting statefulset pods in order to update pvc mount") + err := s.DeleteStatefulSetPods(namespace, storedStatefulSet.Name) + + if err != nil { + s.logger.WithField("namespace", namespace).WithField("statefulSet", statefulSet.Name).Warningf("deletion of sts pods failed:%s", err.Error()) + return err + } + } else { s.logger.WithField("namespace", namespace).WithField("pvc", rfName).Warningf("set annotations,resize nothing") } @@ -188,3 +199,27 @@ func (s *StatefulSetService) ListStatefulSets(namespace string) (*appsv1.Statefu recordMetrics(namespace, "StatefulSet", metrics.NOT_APPLICABLE, "LIST", err, s.metricsRecorder) return stsList, err } + +func (s *StatefulSetService) DeleteStatefulSetPods(namespace, name string) error { + + rps, err := s.GetStatefulSetPods(namespace, name) + + if err != nil { + return err + } + + var deleteErrors []string + for _, rp := range rps.Items { + err := s.kubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), rp.Name, metav1.DeleteOptions{}) + recordMetrics(namespace, "Pod", name, "DELETE", err, s.metricsRecorder) + if err != nil { + deleteErrors = append(deleteErrors, err.Error()) + } + } + + if len(deleteErrors) > 0 { + return fmt.Errorf("failed to delete some pods: %s", strings.Join(deleteErrors, "; ")) + } + + return nil +} diff --git a/service/k8s/statefulset_test.go b/service/k8s/statefulset_test.go index 1d6781767..d32122772 100644 --- a/service/k8s/statefulset_test.go +++ b/service/k8s/statefulset_test.go @@ -130,6 +130,11 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { ResourceVersion: "10", }, Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "sts": "sts", + }, + }, VolumeClaimTemplates: []v1.PersistentVolumeClaim{ { Spec: v1.PersistentVolumeClaimSpec{ From 1c48366ce309ba1ce45139f5adc4a74f196bf2c4 Mon Sep 17 00:00:00 2001 From: Artur Zheludkov Date: Thu, 29 Aug 2024 11:56:12 -0400 Subject: [PATCH 2/3] Adding tests --- service/k8s/statefulset_test.go | 79 ++++++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 6 deletions(-) diff --git a/service/k8s/statefulset_test.go b/service/k8s/statefulset_test.go index d32122772..353d2fd6c 100644 --- a/service/k8s/statefulset_test.go +++ b/service/k8s/statefulset_test.go @@ -20,10 +20,14 @@ import ( "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" "github.com/spotahome/redis-operator/service/k8s" + + corev1 "k8s.io/api/core/v1" ) var ( - statefulSetsGroup = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"} + statefulSetsGroup = schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "statefulsets"} + persistentVolumeClaimGroup = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "persistentvolumeclaims"} + podGroup = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} ) func newStatefulSetUpdateAction(ns string, statefulSet *appsv1.StatefulSet) kubetesting.UpdateActionImpl { @@ -38,6 +42,26 @@ func newStatefulSetCreateAction(ns string, statefulSet *appsv1.StatefulSet) kube return kubetesting.NewCreateAction(statefulSetsGroup, ns, statefulSet) } +func newPVCGetAction(pvc *corev1.PersistentVolumeClaim) kubetesting.CreateActionImpl { + return kubetesting.NewCreateAction(statefulSetsGroup, "", pvc) +} + +func newPVCUpdateAction(pvc *corev1.PersistentVolumeClaim) kubetesting.UpdateActionImpl { + return kubetesting.NewUpdateAction(persistentVolumeClaimGroup, "", pvc) +} + +func newPVCListAction(opts metav1.ListOptions) kubetesting.ListActionImpl { + return kubetesting.NewListAction(persistentVolumeClaimGroup, schema.GroupVersionKind{Group: "", Version: "v1", Kind: "PersistentVolumeClaim"}, "", opts) +} + +func newPodListAction(ns string, opts metav1.ListOptions) kubetesting.ListActionImpl { + return kubetesting.NewListAction(podGroup, schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}, ns, opts) +} + +func newPodDeleteAction(ns string, name string) kubetesting.DeleteActionImpl { + return kubetesting.NewDeleteAction(podGroup, ns, name) +} + func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { testStatefulSet := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ @@ -130,11 +154,7 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { ResourceVersion: "10", }, Spec: appsv1.StatefulSetSpec{ - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "sts": "sts", - }, - }, + Selector: &metav1.LabelSelector{}, VolumeClaimTemplates: []v1.PersistentVolumeClaim{ { Spec: v1.PersistentVolumeClaimSpec{ @@ -167,6 +187,17 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { }, }, } + + podList := &v1.PodList{ + Items: []v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Labels: map[string]string{}, + }, + }, + }, + } pvcList := &v1.PersistentVolumeClaimList{ Items: []v1.PersistentVolumeClaim{ { @@ -200,6 +231,18 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { }, } // Mock. + + expActions := []kubetesting.Action{ + newStatefulSetGetAction(testns, beforeSts.ObjectMeta.Name), + newPVCListAction(metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/component=redis,app.kubernetes.io/name=teststatefulSet1,app.kubernetes.io/part-of=redis-failover", + }), + newPVCUpdateAction(&pvcList.Items[0]), + newStatefulSetGetAction(testns, afterSts.ObjectMeta.Name), + newPodListAction(testns, metav1.ListOptions{}), + newPodDeleteAction(testns, podList.Items[0].ObjectMeta.Name), + newStatefulSetUpdateAction(testns, afterSts), + } mcli := &kubernetes.Clientset{} mcli.AddReactor("get", "statefulsets", func(action kubetesting.Action) (bool, runtime.Object, error) { return true, beforeSts, nil @@ -212,17 +255,41 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { pvcList.Items[0] = *action.(kubetesting.UpdateActionImpl).Object.(*v1.PersistentVolumeClaim) return true, action.(kubetesting.UpdateActionImpl).Object, nil }) + + mcli.AddReactor("list", "pods", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { + return true, podList, nil + }) + + mcli.AddReactor("delete", "pods", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { + return true, podList, nil + }) + service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdateStatefulSet(testns, afterSts) assert.NoError(err) assert.Equal(pvcList.Items[0].Spec.Resources, pvcList.Items[1].Spec.Resources) + assert.Equal(expActions, mcli.Actions()) + // should not call update + mcli = &kubernetes.Clientset{} + + // no deletion pods anymore, as pvc is already resized + expActions = []kubetesting.Action{ + newStatefulSetGetAction(testns, beforeSts.ObjectMeta.Name), + newPVCListAction(metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/component=redis,app.kubernetes.io/name=,app.kubernetes.io/part-of=redis-failover", + }), + newStatefulSetUpdateAction(testns, afterSts), + } + mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { panic("shouldn't call update") }) service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) err = service.CreateOrUpdateStatefulSet(testns, afterSts) assert.NoError(err) + + assert.Equal(expActions, mcli.Actions()) }) } } From 5142c9d9fadc75e5c4131fa354c0d3ebecf036d8 Mon Sep 17 00:00:00 2001 From: Artur Zheludkov Date: Thu, 29 Aug 2024 12:07:46 -0400 Subject: [PATCH 3/3] test refactoring --- service/k8s/statefulset_test.go | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/service/k8s/statefulset_test.go b/service/k8s/statefulset_test.go index 353d2fd6c..194e7ccb6 100644 --- a/service/k8s/statefulset_test.go +++ b/service/k8s/statefulset_test.go @@ -42,10 +42,6 @@ func newStatefulSetCreateAction(ns string, statefulSet *appsv1.StatefulSet) kube return kubetesting.NewCreateAction(statefulSetsGroup, ns, statefulSet) } -func newPVCGetAction(pvc *corev1.PersistentVolumeClaim) kubetesting.CreateActionImpl { - return kubetesting.NewCreateAction(statefulSetsGroup, "", pvc) -} - func newPVCUpdateAction(pvc *corev1.PersistentVolumeClaim) kubetesting.UpdateActionImpl { return kubetesting.NewUpdateAction(persistentVolumeClaimGroup, "", pvc) } @@ -231,12 +227,12 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { }, } // Mock. - + opts := metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/component=redis,app.kubernetes.io/name=teststatefulSet1,app.kubernetes.io/part-of=redis-failover", + } expActions := []kubetesting.Action{ newStatefulSetGetAction(testns, beforeSts.ObjectMeta.Name), - newPVCListAction(metav1.ListOptions{ - LabelSelector: "app.kubernetes.io/component=redis,app.kubernetes.io/name=teststatefulSet1,app.kubernetes.io/part-of=redis-failover", - }), + newPVCListAction(opts), newPVCUpdateAction(&pvcList.Items[0]), newStatefulSetGetAction(testns, afterSts.ObjectMeta.Name), newPodListAction(testns, metav1.ListOptions{}), @@ -272,13 +268,14 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { // should not call update mcli = &kubernetes.Clientset{} + mcli.AddReactor("get", "statefulsets", func(action kubetesting.Action) (bool, runtime.Object, error) { + return true, afterSts, nil + }) // no deletion pods anymore, as pvc is already resized expActions = []kubetesting.Action{ - newStatefulSetGetAction(testns, beforeSts.ObjectMeta.Name), - newPVCListAction(metav1.ListOptions{ - LabelSelector: "app.kubernetes.io/component=redis,app.kubernetes.io/name=,app.kubernetes.io/part-of=redis-failover", - }), + newStatefulSetGetAction(testns, afterSts.ObjectMeta.Name), + newPVCListAction(opts), newStatefulSetUpdateAction(testns, afterSts), } @@ -288,7 +285,6 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) err = service.CreateOrUpdateStatefulSet(testns, afterSts) assert.NoError(err) - assert.Equal(expActions, mcli.Actions()) }) }