From 7657ced6d8a17540ea3fd1a95c39e1316c71ad43 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sun, 9 Apr 2023 21:18:47 +0530 Subject: [PATCH 01/12] add caching --- cmd/redisoperator/main.go | 2 +- cmd/utils/flags.go | 2 + operator/redisfailover/ensurer.go | 1 + operator/redisfailover/service/client.go | 1 + service/k8s/configmap.go | 36 ++- service/k8s/configmap_test.go | 2 +- service/k8s/deployment.go | 31 +- service/k8s/deployment_test.go | 2 +- service/k8s/k8s.go | 18 +- service/k8s/pod.go | 40 ++- service/k8s/pod_test.go | 2 +- service/k8s/poddisruptionbudget.go | 39 ++- service/k8s/poddisruptionbudget_test.go | 2 +- service/k8s/rbac.go | 95 +++++- service/k8s/rbac_test.go | 2 +- service/k8s/secret.go | 32 +- service/k8s/secret_test.go | 2 +- service/k8s/service.go | 35 +- service/k8s/service_test.go | 2 +- service/k8s/statefulset.go | 35 +- service/k8s/statefulset_test.go | 6 +- service/k8s/util.go | 391 +++++++++++++++++++++++ 22 files changed, 706 insertions(+), 72 deletions(-) diff --git a/cmd/redisoperator/main.go b/cmd/redisoperator/main.go index 4137161cc..fd77062a6 100644 --- a/cmd/redisoperator/main.go +++ b/cmd/redisoperator/main.go @@ -79,7 +79,7 @@ func (m *Main) Run() error { } // Create kubernetes service. 
- k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder) + k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder, m.flags.UseCache) // Create the redis clients redisClient := redis.New(metricsRecorder) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 578fe0e62..6ce8673c8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -19,6 +19,7 @@ type CMDFlags struct { K8sQueriesBurstable int Concurrency int LogLevel string + UseCache bool } // Init initializes and parse the flags @@ -35,6 +36,7 @@ func (c *CMDFlags) Init() { // reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89 flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events") flag.StringVar(&c.LogLevel, "log-level", "info", "set log level") + flag.BoolVar(&c.UseCache, "use-cache", false, "use cache stores to get k8s objects") // Parse flags flag.Parse() } diff --git a/operator/redisfailover/ensurer.go b/operator/redisfailover/ensurer.go index 23b609417..3d02c8ebe 100644 --- a/operator/redisfailover/ensurer.go +++ b/operator/redisfailover/ensurer.go @@ -9,6 +9,7 @@ import ( // Ensure is called to ensure all of the resources associated with a RedisFailover are created func (w *RedisFailoverHandler) Ensure(rf *redisfailoverv1.RedisFailover, labels map[string]string, or []metav1.OwnerReference, metricsClient metrics.Recorder) error { + if rf.Spec.Redis.Exporter.Enabled { if err := w.rfService.EnsureRedisService(rf, labels, or); err != nil { return err diff --git a/operator/redisfailover/service/client.go b/operator/redisfailover/service/client.go index 22c8d60ea..1ed7d548e 100644 --- a/operator/redisfailover/service/client.go +++ b/operator/redisfailover/service/client.go @@ -158,6 +158,7 @@ func (r *RedisFailoverKubeClient) EnsureNotPresentRedisService(rf *redisfailover namespace := rf.Namespace // If the service exists (no get error), delete it if _, err := 
r.K8SService.GetService(namespace, name); err == nil { + r.logger.Infof("deleting svc %v...", name) return r.K8SService.DeleteService(namespace, name) } return nil diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index 1b6fc1424..c8c35e3bb 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -2,11 +2,13 @@ package k8s import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -26,26 +28,48 @@ type ConfigMap interface { type ConfigMapService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewConfigMapService returns a new ConfigMap KubeService. -func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ConfigMapService { +func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *ConfigMapService { logger = logger.With("service", "k8s.configMap") + + // rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + var cmCacheStore *cache.Store + // if !useCache { + // cmCacheStore = ConfigMapCacheStoreFromKubeClient(rc) + // } return &ConfigMapService{ kubeClient: kubeClient, logger: logger, + cacheStore: cmCacheStore, metricsRecorder: metricsRecorder, } } func (p *ConfigMapService) GetConfigMap(namespace string, name string) (*corev1.ConfigMap, error) { - configMap, err := p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder) - if err != nil { - return nil, err + var cm *corev1.ConfigMap + var err error + var exists bool + if p.cacheStore != nil { + c := *p.cacheStore + var item interface{} + item, 
exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + cm = item.(*corev1.ConfigMap) + } + if !exists { + err = fmt.Errorf("configmap %v not found in namespace %v", name, namespace) + } + } else { + cm, err = p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } - return configMap, err + + recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder) + + return cm, err } func (p *ConfigMapService) CreateConfigMap(namespace string, configMap *corev1.ConfigMap) error { diff --git a/service/k8s/configmap_test.go b/service/k8s/configmap_test.go index da0701cf5..4efad90ca 100644 --- a/service/k8s/configmap_test.go +++ b/service/k8s/configmap_test.go @@ -104,7 +104,7 @@ func TestConfigMapServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewConfigMapService(mcli, log.Dummy, metrics.Dummy) + service := k8s.NewConfigMapService(mcli, log.Dummy, metrics.Dummy, false) err := service.CreateOrUpdateConfigMap(testns, test.configMap) if test.expErr { diff --git a/service/k8s/deployment.go b/service/k8s/deployment.go index 46d63bb93..08052a639 100644 --- a/service/k8s/deployment.go +++ b/service/k8s/deployment.go @@ -10,6 +10,8 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -30,26 +32,45 @@ type Deployment interface { type DeploymentService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewDeploymentService returns a new Deployment KubeService. 
-func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *DeploymentService { +func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *DeploymentService { logger = logger.With("service", "k8s.deployment") + rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + if useCache { + cacheStore = DeploymentCacheStoreFromKubeClient(rc) + } return &DeploymentService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } // GetDeployment will retrieve the requested deployment based on namespace and name func (d *DeploymentService) GetDeployment(namespace, name string) (*appsv1.Deployment, error) { - deployment, err := d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder) - if err != nil { - return nil, err + var deployment *appsv1.Deployment + var err error + var exists bool + if d.cacheStore != nil { + c := *d.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + deployment = item.(*appsv1.Deployment) + } + if !exists { + err = fmt.Errorf("deployment %v not found in namespace %v", name, namespace) + } + } else { + deployment, err = d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } + recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder) return deployment, err } diff --git a/service/k8s/deployment_test.go b/service/k8s/deployment_test.go index 14be68360..657e04f4c 100644 --- a/service/k8s/deployment_test.go +++ b/service/k8s/deployment_test.go @@ -104,7 +104,7 @@ func TestDeploymentServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewDeploymentService(mcli, 
log.Dummy, metrics.Dummy) + service := k8s.NewDeploymentService(mcli, log.Dummy, metrics.Dummy, false) err := service.CreateOrUpdateDeployment(testns, test.deployment) if test.expErr { diff --git a/service/k8s/k8s.go b/service/k8s/k8s.go index b6e68ae44..447fbffd0 100644 --- a/service/k8s/k8s.go +++ b/service/k8s/k8s.go @@ -35,16 +35,16 @@ type services struct { } // New returns a new Kubernetes service. -func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder) Services { +func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) Services { return &services{ - ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder), - Secret: NewSecretService(kubecli, logger, metricsRecorder), - Pod: NewPodService(kubecli, logger, metricsRecorder), - PodDisruptionBudget: NewPodDisruptionBudgetService(kubecli, logger, metricsRecorder), + ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder, useCache), + Secret: NewSecretService(kubecli, logger, metricsRecorder, useCache), + Pod: NewPodService(kubecli, logger, metricsRecorder, useCache), + PodDisruptionBudget: NewPodDisruptionBudgetService(kubecli, logger, metricsRecorder, useCache), RedisFailover: NewRedisFailoverService(crdcli, logger, metricsRecorder), - Service: NewServiceService(kubecli, logger, metricsRecorder), - RBAC: NewRBACService(kubecli, logger, metricsRecorder), - Deployment: NewDeploymentService(kubecli, logger, metricsRecorder), - StatefulSet: NewStatefulSetService(kubecli, logger, metricsRecorder), + Service: NewServiceService(kubecli, logger, metricsRecorder, useCache), + RBAC: NewRBACService(kubecli, logger, metricsRecorder, useCache), + Deployment: NewDeploymentService(kubecli, logger, metricsRecorder, useCache), + StatefulSet: NewStatefulSetService(kubecli, 
logger, metricsRecorder, useCache), } } diff --git a/service/k8s/pod.go b/service/k8s/pod.go index 0583302ce..bd57543b8 100644 --- a/service/k8s/pod.go +++ b/service/k8s/pod.go @@ -3,16 +3,18 @@ package k8s import ( "context" "encoding/json" + "fmt" "k8s.io/apimachinery/pkg/types" + "github.com/spotahome/redis-operator/log" + "github.com/spotahome/redis-operator/metrics" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - - "github.com/spotahome/redis-operator/log" - "github.com/spotahome/redis-operator/metrics" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) // Pod the ServiceAccount service that knows how to interact with k8s to manage them @@ -30,25 +32,47 @@ type Pod interface { type PodService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewPodService returns a new Pod KubeService. -func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodService { +func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *PodService { logger = logger.With("service", "k8s.pod") + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + var podCacheStore *cache.Store + if useCache { + podCacheStore = PodCacheStoreFromKubeClient(rc) + } + return &PodService{ kubeClient: kubeClient, logger: logger, + cacheStore: podCacheStore, metricsRecorder: metricsRecorder, } } func (p *PodService) GetPod(namespace string, name string) (*corev1.Pod, error) { - pod, err := p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder) - if err != nil { - return nil, err + var pod *corev1.Pod + var err error + var exists bool + if p.cacheStore != nil { + + c := *p.cacheStore + var item interface{} + item, 
exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + pod = item.(*corev1.Pod) + } + if !exists { + err = fmt.Errorf("pod %v not found in namespace %v", name, namespace) + } + } else { + pod, err = p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + } + recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder) return pod, err } diff --git a/service/k8s/pod_test.go b/service/k8s/pod_test.go index eaf4e7feb..77b757845 100644 --- a/service/k8s/pod_test.go +++ b/service/k8s/pod_test.go @@ -104,7 +104,7 @@ func TestPodServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewPodService(mcli, log.Dummy, metrics.Dummy) + service := k8s.NewPodService(mcli, log.Dummy, metrics.Dummy, false) err := service.CreateOrUpdatePod(testns, test.pod) if test.expErr { diff --git a/service/k8s/poddisruptionbudget.go b/service/k8s/poddisruptionbudget.go index 48350bc43..dc0bb7c95 100644 --- a/service/k8s/poddisruptionbudget.go +++ b/service/k8s/poddisruptionbudget.go @@ -2,11 +2,14 @@ package k8s import ( "context" + "fmt" policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -25,26 +28,50 @@ type PodDisruptionBudget interface { type PodDisruptionBudgetService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewPodDisruptionBudgetService returns a new PodDisruptionBudget KubeService. 
-func NewPodDisruptionBudgetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodDisruptionBudgetService { +func NewPodDisruptionBudgetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *PodDisruptionBudgetService { logger = logger.With("service", "k8s.podDisruptionBudget") + + rc := kubeClient.PolicyV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + if useCache { + cacheStore = PodDisruptionBudgetCacheStoreFromKubeClient(rc) + } + return &PodDisruptionBudgetService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } func (p *PodDisruptionBudgetService) GetPodDisruptionBudget(namespace string, name string) (*policyv1.PodDisruptionBudget, error) { - podDisruptionBudget, err := p.kubeClient.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "PodDisruptionBudget", name, "GET", err, p.metricsRecorder) - if err != nil { - return nil, err + var podDisruptionBudget *policyv1.PodDisruptionBudget + var err error + var exists bool + + if p.cacheStore != nil { + c := *p.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + podDisruptionBudget = item.(*policyv1.PodDisruptionBudget) + } + if !exists { + + err = fmt.Errorf("podDisruptionBudget %v not found in namespace %v", name, namespace) + } + } else { + podDisruptionBudget, err = p.kubeClient.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } - return podDisruptionBudget, nil + recordMetrics(namespace, "PodDisruptionBudget", name, "GET", err, p.metricsRecorder) + + return podDisruptionBudget, err } func (p *PodDisruptionBudgetService) CreatePodDisruptionBudget(namespace string, podDisruptionBudget *policyv1.PodDisruptionBudget) error { diff --git 
a/service/k8s/poddisruptionbudget_test.go b/service/k8s/poddisruptionbudget_test.go index c2427859d..8848a68a1 100644 --- a/service/k8s/poddisruptionbudget_test.go +++ b/service/k8s/poddisruptionbudget_test.go @@ -102,7 +102,7 @@ func TestPodDisruptionBudgetServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewPodDisruptionBudgetService(mcli, log.Dummy, metrics.Dummy) + service := k8s.NewPodDisruptionBudgetService(mcli, log.Dummy, metrics.Dummy, false) err := service.CreateOrUpdatePodDisruptionBudget(testns, test.podDisruptionBudget) if test.expErr { diff --git a/service/k8s/rbac.go b/service/k8s/rbac.go index a5534b445..fc6ce19ab 100644 --- a/service/k8s/rbac.go +++ b/service/k8s/rbac.go @@ -2,11 +2,14 @@ package k8s import ( "context" + "fmt" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -27,37 +30,107 @@ type RBAC interface { // NamespaceService is the Namespace service implementation using API calls to kubernetes. type RBACService struct { - kubeClient kubernetes.Interface - logger log.Logger - metricsRecorder metrics.Recorder + kubeClient kubernetes.Interface + logger log.Logger + roleCacheStore *cache.Store + roleBindingCacheStore *cache.Store + clusterRoleCacheStore *cache.Store + metricsRecorder metrics.Recorder } // NewRBACService returns a new RBAC KubeService. 
-func NewRBACService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *RBACService { +func NewRBACService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *RBACService { logger = logger.With("service", "k8s.rbac") + + rc := kubeClient.RbacV1().RESTClient().(*rest.RESTClient) + + var roleCacheStore *cache.Store + var roleBindingCacheStore *cache.Store + var clusterRoleCacheStore *cache.Store + + if useCache { + roleCacheStore = RoleCacheStoreFromKubeClient(rc) + roleBindingCacheStore = RoleBindingCacheStoreFromKubeClient(rc) + clusterRoleCacheStore = ClusterRoleCacheStoreFromKubeClient(rc) + } + return &RBACService{ - kubeClient: kubeClient, - logger: logger, - metricsRecorder: metricsRecorder, + kubeClient: kubeClient, + logger: logger, + roleCacheStore: roleCacheStore, + roleBindingCacheStore: roleBindingCacheStore, + clusterRoleCacheStore: clusterRoleCacheStore, + metricsRecorder: metricsRecorder, } } func (r *RBACService) GetClusterRole(name string) (*rbacv1.ClusterRole, error) { - clusterRole, err := r.kubeClient.RbacV1().ClusterRoles().Get(context.TODO(), name, metav1.GetOptions{}) + var clusterRole *rbacv1.ClusterRole + var err error + var exists bool + if r.clusterRoleCacheStore != nil { + + c := *r.clusterRoleCacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v", name)) + if exists && nil == err { + clusterRole = item.(*rbacv1.ClusterRole) + } + + if !exists { + err = fmt.Errorf("clusterRole %v not found", name) + } + + } else { + clusterRole, err = r.kubeClient.RbacV1().ClusterRoles().Get(context.TODO(), name, metav1.GetOptions{}) + } recordMetrics(metrics.NOT_APPLICABLE, "ClusterRole", name, "GET", err, r.metricsRecorder) return clusterRole, err + } func (r *RBACService) GetRole(namespace, name string) (*rbacv1.Role, error) { - role, err := r.kubeClient.RbacV1().Roles(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + var role 
*rbacv1.Role + var err error + var exists bool + if r.roleCacheStore != nil { + c := *r.roleCacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + role = item.(*rbacv1.Role) + } + if !exists { + err = fmt.Errorf("role %v not found in namespace %v", name, namespace) + } + + } else { + role, err = r.kubeClient.RbacV1().Roles(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + } + recordMetrics(namespace, "Role", name, "GET", err, r.metricsRecorder) return role, err } func (r *RBACService) GetRoleBinding(namespace, name string) (*rbacv1.RoleBinding, error) { - rolbinding, err := r.kubeClient.RbacV1().RoleBindings(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + var roleBinding *rbacv1.RoleBinding + var err error + var exists bool + if r.roleBindingCacheStore != nil { + c := *r.roleBindingCacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + roleBinding = item.(*rbacv1.RoleBinding) + } + if !exists { + err = fmt.Errorf("role binding %v not found in namespace %v", name, namespace) + } + } else { + roleBinding, err = r.kubeClient.RbacV1().RoleBindings(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + } recordMetrics(namespace, "RoleBinding", name, "GET", err, r.metricsRecorder) - return rolbinding, err + return roleBinding, err } func (r *RBACService) DeleteRole(namespace, name string) error { diff --git a/service/k8s/rbac_test.go b/service/k8s/rbac_test.go index d4f942adb..4304594f0 100644 --- a/service/k8s/rbac_test.go +++ b/service/k8s/rbac_test.go @@ -131,7 +131,7 @@ func TestRBACServiceGetCreateOrUpdateRoleBinding(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewRBACService(mcli, log.Dummy, metrics.Dummy) + service := k8s.NewRBACService(mcli, log.Dummy, metrics.Dummy, false) err := service.CreateOrUpdateRoleBinding(testns, test.rb) if test.expErr { diff 
--git a/service/k8s/secret.go b/service/k8s/secret.go index 0edd23dea..71543c049 100644 --- a/service/k8s/secret.go +++ b/service/k8s/secret.go @@ -2,12 +2,15 @@ package k8s import ( "context" + "fmt" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) // Secret interacts with k8s to get secrets @@ -19,26 +22,45 @@ type Secret interface { type SecretService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } -func NewSecretService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *SecretService { +func NewSecretService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *SecretService { logger = logger.With("service", "k8s.secret") + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + if useCache { + cacheStore = SecretCacheStoreFromKubeClient(rc) + } return &SecretService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } func (s *SecretService) GetSecret(namespace, name string) (*corev1.Secret, error) { + var secret *corev1.Secret + var err error + var exists bool + if s.cacheStore != nil { + c := *s.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + secret = item.(*corev1.Secret) + } + if !exists { + err = fmt.Errorf("secret %v not found in namespace %v", name, namespace) + } + } else { + secret, err = s.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + } - secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) recordMetrics(namespace, "Secret", name, 
"GET", err, s.metricsRecorder) - if err != nil { - return nil, err - } return secret, err } diff --git a/service/k8s/secret_test.go b/service/k8s/secret_test.go index f39dd0e71..ab0af96ca 100644 --- a/service/k8s/secret_test.go +++ b/service/k8s/secret_test.go @@ -47,7 +47,7 @@ func TestSecretServiceGet(t *testing.T) { assert.NoError(err) // test getting the secret - service := NewSecretService(mcli, log.Dummy, metrics.Dummy) + service := NewSecretService(mcli, log.Dummy, metrics.Dummy, false) ss, err := service.GetSecret(secret.ObjectMeta.Namespace, secret.ObjectMeta.Name) assert.NotNil(ss) assert.NoError(err) diff --git a/service/k8s/service.go b/service/k8s/service.go index 712cc4c0d..95f1a568f 100644 --- a/service/k8s/service.go +++ b/service/k8s/service.go @@ -2,11 +2,14 @@ package k8s import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -27,25 +30,47 @@ type Service interface { type ServiceService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewServiceService returns a new Service KubeService. 
-func NewServiceService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ServiceService { +func NewServiceService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *ServiceService { logger = logger.With("service", "k8s.service") + + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + + if useCache { + cacheStore = ServiceCacheStoreFromKubeClient(rc) + } + return &ServiceService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } func (s *ServiceService) GetService(namespace string, name string) (*corev1.Service, error) { - service, err := s.kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "Service", name, "GET", err, s.metricsRecorder) - if err != nil { - return nil, err + var service *corev1.Service + var err error + var exists bool + if s.cacheStore != nil { + c := *s.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + service = item.(*corev1.Service) + } + if !exists { + err = fmt.Errorf("svc %v/%v not found", namespace, name) + } + } else { + service, err = s.kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } + recordMetrics(namespace, "Service", name, "GET", err, s.metricsRecorder) return service, err } diff --git a/service/k8s/service_test.go b/service/k8s/service_test.go index 7b3678e7e..fd1aa838e 100644 --- a/service/k8s/service_test.go +++ b/service/k8s/service_test.go @@ -104,7 +104,7 @@ func TestServiceServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewServiceService(mcli, log.Dummy, metrics.Dummy) + service := k8s.NewServiceService(mcli, log.Dummy, metrics.Dummy, false) err := service.CreateOrUpdateService(testns, test.service) if test.expErr 
{ diff --git a/service/k8s/statefulset.go b/service/k8s/statefulset.go index 38cc95ff2..88f8e4aa0 100644 --- a/service/k8s/statefulset.go +++ b/service/k8s/statefulset.go @@ -15,6 +15,8 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -35,27 +37,48 @@ type StatefulSet interface { type StatefulSetService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewStatefulSetService returns a new StatefulSet KubeService. -func NewStatefulSetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *StatefulSetService { +func NewStatefulSetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *StatefulSetService { logger = logger.With("service", "k8s.statefulSet") + + rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + if useCache { + cacheStore = StatefulSetCacheStoreFromKubeClient(rc) + } return &StatefulSetService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } // GetStatefulSet will retrieve the requested statefulset based on namespace and name func (s *StatefulSetService) GetStatefulSet(namespace, name string) (*appsv1.StatefulSet, error) { - statefulSet, err := s.kubeClient.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "StatefulSet", name, "GET", err, s.metricsRecorder) - if err != nil { - return nil, err + var ss *appsv1.StatefulSet + var err error + var exists bool + if s.cacheStore != nil { + c := *s.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err 
{ + ss = item.(*appsv1.StatefulSet) + } + if !exists { + err = fmt.Errorf("statefulset %s not found in namespace %v", name, namespace) + } + } else { + ss, err = s.kubeClient.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } - return statefulSet, err + + recordMetrics(namespace, "StatefulSet", name, "GET", err, s.metricsRecorder) + return ss, err } // GetStatefulSetPods will give a list of pods that are managed by the statefulset diff --git a/service/k8s/statefulset_test.go b/service/k8s/statefulset_test.go index 1d6781767..139f4bea5 100644 --- a/service/k8s/statefulset_test.go +++ b/service/k8s/statefulset_test.go @@ -108,7 +108,7 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) + service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy, false) err := service.CreateOrUpdateStatefulSet(testns, test.statefulSet) if test.expErr { @@ -207,7 +207,7 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { pvcList.Items[0] = *action.(kubetesting.UpdateActionImpl).Object.(*v1.PersistentVolumeClaim) return true, action.(kubetesting.UpdateActionImpl).Object, nil }) - service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) + service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy, false) err := service.CreateOrUpdateStatefulSet(testns, afterSts) assert.NoError(err) assert.Equal(pvcList.Items[0].Spec.Resources, pvcList.Items[1].Spec.Resources) @@ -215,7 +215,7 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { panic("shouldn't call update") }) - service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) + service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy, false) err = service.CreateOrUpdateStatefulSet(testns, 
afterSts) assert.NoError(err) }) diff --git a/service/k8s/util.go b/service/k8s/util.go index 2cd9bbd73..00d7240f2 100644 --- a/service/k8s/util.go +++ b/service/k8s/util.go @@ -1,11 +1,23 @@ package k8s import ( + "context" "fmt" + "time" redisfailoverv1 "github.com/spotahome/redis-operator/api/redisfailover/v1" "github.com/spotahome/redis-operator/metrics" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) // GetRedisPassword retreives password from kubernetes secret or, if @@ -42,3 +54,382 @@ func recordMetrics(namespace string, kind string, object string, operation strin metricsRecorder.RecordK8sOperation(namespace, kind, object, operation, metrics.FAIL, metrics.K8S_MISC) } } + +// type cacheMeta struct { +// objTypeName string +// objType interface{} +// objListType interface{} +// listFunc func(metav1.ListOptions) (interface{}, error) +// watchFunc func(opts metav1.ListOptions) (watch.Interface, error) +// } +// +// var cacheBuilderData = []cacheMeta{ +// { +// objTypeName: "pods", +// objType: corev1.Pod{}, +// objListType: corev1.PodList{}, +// listFunc: func(opts metav1.ListOptions) (*corev1.PodList, error) { +// result := corev1.PodList{} +// err := rc.Get().Resource("pods").Do(context.Background()).Into(&result) +// return &result, err +// }, +// watchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { +// opts.Watch = true +// parameterCodec := runtime.NewParameterCodec(s) +// return rc.Get(). +// Resource("pods"). +// VersionedParams(&opts, parameterCodec). 
+// Watch(context.Background()) +// }, +// }, +// } + +func PodCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + corev1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("pods"). + VersionedParams(&opts, parameterCodec). + Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*corev1.PodList, error) { + result := corev1.PodList{} + err := rc.Get().Resource("pods").Do(context.Background()).Into(&result) + return &result, err + } + podCacheStore, podCacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &corev1.Pod{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go podCacheController.Run(wait.NeverStop) + + return &podCacheStore + +} + +func ServiceCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + corev1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("services"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*corev1.ServiceList, error) { + result := corev1.ServiceList{} + err := rc.Get().Resource("services").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &corev1.Service{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} + +func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + corev1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("configmaps"). + VersionedParams(&opts, parameterCodec). + Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*corev1.ConfigMapList, error) { + result := corev1.ConfigMapList{} + err := rc.Get().Resource("configmaps").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &corev1.ConfigMap{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} + +func DeploymentCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + appsv1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). 
+ Resource("deployments"). + VersionedParams(&opts, parameterCodec). + Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*appsv1.DeploymentList, error) { + result := appsv1.DeploymentList{} + err := rc.Get().Resource("deployments").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &appsv1.Deployment{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} + +func PodDisruptionBudgetCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + policyv1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("poddisruptionbudgets"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*policyv1.PodDisruptionBudgetList, error) { + result := policyv1.PodDisruptionBudgetList{} + err := rc.Get().Resource("poddisruptionbudgets").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &policyv1.PodDisruptionBudget{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} + +func RoleCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + rbacv1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("roles"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*rbacv1.RoleList, error) { + result := rbacv1.RoleList{} + err := rc.Get().Resource("roles").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &rbacv1.Role{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} + +func ClusterRoleCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + rbacv1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("clusterroles"). + VersionedParams(&opts, parameterCodec). + Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*rbacv1.ClusterRoleList, error) { + result := rbacv1.ClusterRoleList{} + err := rc.Get().Resource("clusterroles").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &rbacv1.ClusterRole{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} + +func RoleBindingCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + rbacv1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). 
+ Resource("rolebindings"). + VersionedParams(&opts, parameterCodec). + Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*rbacv1.RoleBindingList, error) { + result := rbacv1.RoleBindingList{} + err := rc.Get().Resource("rolebindings").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &rbacv1.RoleBinding{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} +func SecretCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + corev1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("secrets"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*corev1.SecretList, error) { + result := corev1.SecretList{} + err := rc.Get().Resource("secrets").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &corev1.Secret{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} + +func StatefulSetCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { + s := runtime.NewScheme() + appsv1.AddToScheme(s) + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("statefulsets"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*appsv1.StatefulSetList, error) { + result := appsv1.StatefulSetList{} + err := rc.Get().Resource("statefulsets").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &appsv1.StatefulSet{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore +} From 1a4cb2c967f489947c905e43755fbfdaa413e173 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sun, 9 Apr 2023 21:27:02 +0530 Subject: [PATCH 02/12] update configmap caching --- operator/redisfailover/ensurer.go | 1 - service/k8s/configmap.go | 9 +++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/operator/redisfailover/ensurer.go b/operator/redisfailover/ensurer.go index 3d02c8ebe..23b609417 100644 --- a/operator/redisfailover/ensurer.go +++ b/operator/redisfailover/ensurer.go @@ -9,7 +9,6 @@ import ( // Ensure is called to ensure all of the resources associated with a RedisFailover are created func (w *RedisFailoverHandler) Ensure(rf *redisfailoverv1.RedisFailover, labels map[string]string, or []metav1.OwnerReference, metricsClient metrics.Recorder) error { - if rf.Spec.Redis.Exporter.Enabled { if err := w.rfService.EnsureRedisService(rf, labels, or); err != nil { return err diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index c8c35e3bb..cf16b0b35 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -8,6 +8,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" @@ 
-36,11 +37,11 @@ type ConfigMapService struct { func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *ConfigMapService { logger = logger.With("service", "k8s.configMap") - // rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cmCacheStore *cache.Store - // if !useCache { - // cmCacheStore = ConfigMapCacheStoreFromKubeClient(rc) - // } + if !useCache { + cmCacheStore = ConfigMapCacheStoreFromKubeClient(rc) + } return &ConfigMapService{ kubeClient: kubeClient, logger: logger, From a6b43e6fe94d5b8b00cee8f866506387f2a05d58 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sun, 9 Apr 2023 21:46:58 +0530 Subject: [PATCH 03/12] handle errors from addToScheme --- service/k8s/configmap.go | 7 +- service/k8s/deployment.go | 6 +- service/k8s/pod.go | 6 +- service/k8s/poddisruptionbudget.go | 6 +- service/k8s/rbac.go | 16 +++- service/k8s/secret.go | 6 +- service/k8s/service.go | 6 +- service/k8s/statefulset.go | 6 +- service/k8s/util.go | 119 +++++++++++++++-------------- 9 files changed, 108 insertions(+), 70 deletions(-) diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index cf16b0b35..78d02769d 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -36,11 +36,14 @@ type ConfigMapService struct { // NewConfigMapService returns a new ConfigMap KubeService. 
func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *ConfigMapService { logger = logger.With("service", "k8s.configMap") - + var err error rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cmCacheStore *cache.Store if !useCache { - cmCacheStore = ConfigMapCacheStoreFromKubeClient(rc) + cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } } return &ConfigMapService{ kubeClient: kubeClient, diff --git a/service/k8s/deployment.go b/service/k8s/deployment.go index 08052a639..4b8705e6d 100644 --- a/service/k8s/deployment.go +++ b/service/k8s/deployment.go @@ -41,8 +41,12 @@ func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, me logger = logger.With("service", "k8s.deployment") rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store + var err error if useCache { - cacheStore = DeploymentCacheStoreFromKubeClient(rc) + cacheStore, err = DeploymentCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } } return &DeploymentService{ kubeClient: kubeClient, diff --git a/service/k8s/pod.go b/service/k8s/pod.go index bd57543b8..126aebad3 100644 --- a/service/k8s/pod.go +++ b/service/k8s/pod.go @@ -41,8 +41,12 @@ func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRe logger = logger.With("service", "k8s.pod") rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var podCacheStore *cache.Store + var err error if useCache { - podCacheStore = PodCacheStoreFromKubeClient(rc) + podCacheStore, err = PodCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } } return &PodService{ diff --git a/service/k8s/poddisruptionbudget.go b/service/k8s/poddisruptionbudget.go index dc0bb7c95..c89db246d 100644 --- a/service/k8s/poddisruptionbudget.go 
+++ b/service/k8s/poddisruptionbudget.go @@ -38,8 +38,12 @@ func NewPodDisruptionBudgetService(kubeClient kubernetes.Interface, logger log.L rc := kubeClient.PolicyV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store + var err error if useCache { - cacheStore = PodDisruptionBudgetCacheStoreFromKubeClient(rc) + cacheStore, err = PodDisruptionBudgetCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } } return &PodDisruptionBudgetService{ diff --git a/service/k8s/rbac.go b/service/k8s/rbac.go index fc6ce19ab..65df912e3 100644 --- a/service/k8s/rbac.go +++ b/service/k8s/rbac.go @@ -47,11 +47,21 @@ func NewRBACService(kubeClient kubernetes.Interface, logger log.Logger, metricsR var roleCacheStore *cache.Store var roleBindingCacheStore *cache.Store var clusterRoleCacheStore *cache.Store + var err error if useCache { - roleCacheStore = RoleCacheStoreFromKubeClient(rc) - roleBindingCacheStore = RoleBindingCacheStoreFromKubeClient(rc) - clusterRoleCacheStore = ClusterRoleCacheStoreFromKubeClient(rc) + roleCacheStore, err = RoleCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache for roles: %v", err) + } + roleBindingCacheStore, err = RoleBindingCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache for rolebinding: %v", err) + } + clusterRoleCacheStore, err = ClusterRoleCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache cluster role: %v", err) + } } return &RBACService{ diff --git a/service/k8s/secret.go b/service/k8s/secret.go index 71543c049..b7c3492c6 100644 --- a/service/k8s/secret.go +++ b/service/k8s/secret.go @@ -31,8 +31,12 @@ func NewSecretService(kubeClient kubernetes.Interface, logger log.Logger, metric logger = logger.With("service", "k8s.secret") rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store + var err error if useCache { - cacheStore = 
SecretCacheStoreFromKubeClient(rc) + cacheStore, err = SecretCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } } return &SecretService{ kubeClient: kubeClient, diff --git a/service/k8s/service.go b/service/k8s/service.go index 95f1a568f..17e23caa1 100644 --- a/service/k8s/service.go +++ b/service/k8s/service.go @@ -40,9 +40,13 @@ func NewServiceService(kubeClient kubernetes.Interface, logger log.Logger, metri rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store + var err error if useCache { - cacheStore = ServiceCacheStoreFromKubeClient(rc) + cacheStore, err = ServiceCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } } return &ServiceService{ diff --git a/service/k8s/statefulset.go b/service/k8s/statefulset.go index 88f8e4aa0..54f496ba7 100644 --- a/service/k8s/statefulset.go +++ b/service/k8s/statefulset.go @@ -47,8 +47,12 @@ func NewStatefulSetService(kubeClient kubernetes.Interface, logger log.Logger, m rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store + var err error if useCache { - cacheStore = StatefulSetCacheStoreFromKubeClient(rc) + cacheStore, err = StatefulSetCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } } return &StatefulSetService{ kubeClient: kubeClient, diff --git a/service/k8s/util.go b/service/k8s/util.go index 00d7240f2..2505d3a07 100644 --- a/service/k8s/util.go +++ b/service/k8s/util.go @@ -55,38 +55,12 @@ func recordMetrics(namespace string, kind string, object string, operation strin } } -// type cacheMeta struct { -// objTypeName string -// objType interface{} -// objListType interface{} -// listFunc func(metav1.ListOptions) (interface{}, error) -// watchFunc func(opts metav1.ListOptions) (watch.Interface, error) -// } -// -// var cacheBuilderData = []cacheMeta{ -// { -// objTypeName: "pods", -// objType: 
corev1.Pod{}, -// objListType: corev1.PodList{}, -// listFunc: func(opts metav1.ListOptions) (*corev1.PodList, error) { -// result := corev1.PodList{} -// err := rc.Get().Resource("pods").Do(context.Background()).Into(&result) -// return &result, err -// }, -// watchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { -// opts.Watch = true -// parameterCodec := runtime.NewParameterCodec(s) -// return rc.Get(). -// Resource("pods"). -// VersionedParams(&opts, parameterCodec). -// Watch(context.Background()) -// }, -// }, -// } - -func PodCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func PodCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - corev1.AddToScheme(s) + err := corev1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -116,13 +90,16 @@ func PodCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go podCacheController.Run(wait.NeverStop) - return &podCacheStore + return &podCacheStore, nil } -func ServiceCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func ServiceCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - corev1.AddToScheme(s) + err := corev1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -152,12 +129,15 @@ func ServiceCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, nil } -func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - corev1.AddToScheme(s) + err := corev1.AddToScheme(s) + if err != nil { + return 
nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -187,12 +167,15 @@ func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, nil } -func DeploymentCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func DeploymentCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - appsv1.AddToScheme(s) + err := appsv1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -222,12 +205,15 @@ func DeploymentCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, nil } -func PodDisruptionBudgetCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func PodDisruptionBudgetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - policyv1.AddToScheme(s) + err := policyv1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -257,12 +243,15 @@ func PodDisruptionBudgetCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Sto go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, nil } -func RoleCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func RoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - rbacv1.AddToScheme(s) + err := rbacv1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -292,12 +281,15 @@ 
func RoleCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, nil } -func ClusterRoleCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func ClusterRoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - rbacv1.AddToScheme(s) + err := rbacv1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -327,12 +319,15 @@ func ClusterRoleCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, err } -func RoleBindingCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func RoleBindingCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - rbacv1.AddToScheme(s) + err := rbacv1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -362,11 +357,14 @@ func RoleBindingCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, err } -func SecretCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func SecretCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - corev1.AddToScheme(s) + err := corev1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -396,12 +394,15 @@ func SecretCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, err } -func 
StatefulSetCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { +func StatefulSetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { s := runtime.NewScheme() - appsv1.AddToScheme(s) + err := appsv1.AddToScheme(s) + if nil != err { + return nil, err + } watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) @@ -431,5 +432,5 @@ func StatefulSetCacheStoreFromKubeClient(rc *rest.RESTClient) *cache.Store { go cacheController.Run(wait.NeverStop) - return &cacheStore + return &cacheStore, err } From 2198868c1b8d3d5a0fe56487eb10a7e922ead7e1 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sun, 9 Apr 2023 22:00:55 +0530 Subject: [PATCH 04/12] disable caching cms temporarily --- service/k8s/configmap.go | 17 ++++++++--------- test/integration/redisfailover/creation_test.go | 2 +- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index 78d02769d..aae49a5d3 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -8,7 +8,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" @@ -36,15 +35,15 @@ type ConfigMapService struct { // NewConfigMapService returns a new ConfigMap KubeService. 
func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *ConfigMapService { logger = logger.With("service", "k8s.configMap") - var err error - rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + // var err error + // rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cmCacheStore *cache.Store - if !useCache { - cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc) - if err != nil { - logger.Errorf("unable to initialize cache: %v", err) - } - } + // if !useCache { + // cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc) + // if err != nil { + // logger.Errorf("unable to initialize cache: %v", err) + // } + // } return &ConfigMapService{ kubeClient: kubeClient, logger: logger, diff --git a/test/integration/redisfailover/creation_test.go b/test/integration/redisfailover/creation_test.go index 5458634ba..c63af3f64 100644 --- a/test/integration/redisfailover/creation_test.go +++ b/test/integration/redisfailover/creation_test.go @@ -94,7 +94,7 @@ func TestRedisFailover(t *testing.T) { } // Create kubernetes service. 
- k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy) + k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy, true) // Prepare namespace prepErr := clients.prepareNS() From 6918a1e4796b22e983470389aa8c8ea54c68f442 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Sun, 9 Apr 2023 22:08:13 +0530 Subject: [PATCH 05/12] remove unwanted logs --- operator/redisfailover/service/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/operator/redisfailover/service/client.go b/operator/redisfailover/service/client.go index 1ed7d548e..22c8d60ea 100644 --- a/operator/redisfailover/service/client.go +++ b/operator/redisfailover/service/client.go @@ -158,7 +158,6 @@ func (r *RedisFailoverKubeClient) EnsureNotPresentRedisService(rf *redisfailover namespace := rf.Namespace // If the service exists (no get error), delete it if _, err := r.K8SService.GetService(namespace, name); err == nil { - r.logger.Infof("deleting svc %v...", name) return r.K8SService.DeleteService(namespace, name) } return nil From ea89119c60b72e704d1e1f20ac870ea0096d35f2 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Mon, 10 Apr 2023 11:51:29 +0530 Subject: [PATCH 06/12] account for usage of fake clients --- service/k8s/configmap.go | 19 ++++++++-------- service/k8s/pod.go | 1 + service/k8s/util.go | 47 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 56 insertions(+), 11 deletions(-) diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index aae49a5d3..081023dc1 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -8,10 +8,11 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" + "k8s.io/client-go/rest" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" + "k8s.io/client-go/tools/cache" ) // ConfigMap the ServiceAccount service that knows how to interact with 
k8s to manage them @@ -35,15 +36,15 @@ type ConfigMapService struct { // NewConfigMapService returns a new ConfigMap KubeService. func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *ConfigMapService { logger = logger.With("service", "k8s.configMap") - // var err error - // rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + var err error + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cmCacheStore *cache.Store - // if !useCache { - // cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc) - // if err != nil { - // logger.Errorf("unable to initialize cache: %v", err) - // } - // } + if !useCache { + cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } + } return &ConfigMapService{ kubeClient: kubeClient, logger: logger, diff --git a/service/k8s/pod.go b/service/k8s/pod.go index 126aebad3..279bffe64 100644 --- a/service/k8s/pod.go +++ b/service/k8s/pod.go @@ -40,6 +40,7 @@ type PodService struct { func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *PodService { logger = logger.With("service", "k8s.pod") rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + fmt.Printf("[POD]-- rest client interface: %v\n", rc) var podCacheStore *cache.Store var err error if useCache { diff --git a/service/k8s/util.go b/service/k8s/util.go index 2505d3a07..30cdde2b0 100644 --- a/service/k8s/util.go +++ b/service/k8s/util.go @@ -56,6 +56,10 @@ func recordMetrics(namespace string, kind string, object string, operation strin } func PodCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := corev1.AddToScheme(s) if nil != err { @@ 
-95,6 +99,10 @@ func PodCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { } func ServiceCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := corev1.AddToScheme(s) if nil != err { @@ -133,6 +141,10 @@ func ServiceCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) } func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := corev1.AddToScheme(s) if err != nil { @@ -142,13 +154,16 @@ func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error opts.Watch = true parameterCodec := runtime.NewParameterCodec(s) return rc.Get(). - Resource("configmaps"). + Resource("configmap"). VersionedParams(&opts, parameterCodec). 
Watch(context.Background()) } listFunc := func(opts metav1.ListOptions) (*corev1.ConfigMapList, error) { + fmt.Printf("cm lister calling...") + fmt.Printf("resr client: %v...", rc) result := corev1.ConfigMapList{} - err := rc.Get().Resource("configmaps").Do(context.Background()).Into(&result) + err := rc.Get().Resource("configmap").Do(context.Background()).Into(&result) + fmt.Printf("cm lister called; error found: %v\n", err) return &result, err } cacheStore, cacheController := cache.NewInformer( @@ -171,6 +186,10 @@ func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error } func DeploymentCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := appsv1.AddToScheme(s) if nil != err { @@ -209,6 +228,10 @@ func DeploymentCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, erro } func PodDisruptionBudgetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := policyv1.AddToScheme(s) if nil != err { @@ -247,6 +270,10 @@ func PodDisruptionBudgetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.St } func RoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := rbacv1.AddToScheme(s) if nil != err { @@ -285,6 +312,10 @@ func RoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { } func ClusterRoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually 
happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := rbacv1.AddToScheme(s) if nil != err { @@ -323,6 +354,10 @@ func ClusterRoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, err } func RoleBindingCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := rbacv1.AddToScheme(s) if nil != err { @@ -360,6 +395,10 @@ func RoleBindingCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, err return &cacheStore, err } func SecretCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := corev1.AddToScheme(s) if nil != err { @@ -398,6 +437,10 @@ func SecretCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { } func StatefulSetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } s := runtime.NewScheme() err := appsv1.AddToScheme(s) if nil != err { From fd4dac20e80c7c2af2881c1116b81e8847b5457f Mon Sep 17 00:00:00 2001 From: corneredrat Date: Wed, 12 Apr 2023 09:29:34 +0530 Subject: [PATCH 07/12] add possible improvements in future versions --- service/k8s/util.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/service/k8s/util.go b/service/k8s/util.go index 30cdde2b0..397301c8a 100644 --- a/service/k8s/util.go +++ b/service/k8s/util.go @@ -55,6 +55,8 @@ func recordMetrics(namespace string, kind string, object string, operation strin } } +// TODO: Update 
*CacheStoreFromKubeClient to be implemented via generics + func PodCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { if rc == nil { // this case usually happens during testing where dummy / fake clientsets are used From 14852791947672409a033c396a5eff76b71457dc Mon Sep 17 00:00:00 2001 From: corneredrat Date: Wed, 12 Apr 2023 11:24:56 +0530 Subject: [PATCH 08/12] [optionally] compute hashes of objects and apply only when there is a diff --- cmd/redisoperator/main.go | 2 +- cmd/utils/flags.go | 2 + service/k8s/configmap.go | 8 ++++ service/k8s/deployment.go | 9 +++++ service/k8s/hash_annotations.go | 62 ++++++++++++++++++++++++++++++ service/k8s/k8s.go | 11 +++++- service/k8s/poddisruptionbudget.go | 9 +++++ service/k8s/rbac.go | 17 ++++++++ service/k8s/service.go | 9 +++++ service/k8s/statefulset.go | 11 ++++++ 10 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 service/k8s/hash_annotations.go diff --git a/cmd/redisoperator/main.go b/cmd/redisoperator/main.go index 4137161cc..ba1232fb4 100644 --- a/cmd/redisoperator/main.go +++ b/cmd/redisoperator/main.go @@ -79,7 +79,7 @@ func (m *Main) Run() error { } // Create kubernetes service. 
- k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder) + k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder, m.flags.EnableObjectHashing) // Create the redis clients redisClient := redis.New(metricsRecorder) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 578fe0e62..e326728a1 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -19,6 +19,7 @@ type CMDFlags struct { K8sQueriesBurstable int Concurrency int LogLevel string + EnableObjectHashing bool } // Init initializes and parse the flags @@ -35,6 +36,7 @@ func (c *CMDFlags) Init() { // reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89 flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events") flag.StringVar(&c.LogLevel, "log-level", "info", "set log level") + flag.BoolVar(&c.EnableObjectHashing, "enable-hash", false, "Add hashed annotations to k8s objects, apply changes only when theres a diff.") // Parse flags flag.Parse() } diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index 1b6fc1424..7c4837f4f 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -76,6 +76,14 @@ func (p *ConfigMapService) CreateOrUpdateConfigMap(namespace string, configMap * return err } + if hashingEnabled() { + if !shouldUpdate(configMap, storedConfigMap) { + p.logger.Debugf("%v/%v configmap is upto date, no need to apply changes...", configMap.Namespace, configMap.Name) + return nil + } + p.logger.Debugf("%v/%v configmap has a different resource hash, updating the object...", configMap.Namespace, configMap.Name) + addHashAnnotation(configMap) + } // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. 
This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), diff --git a/service/k8s/deployment.go b/service/k8s/deployment.go index 46d63bb93..a33b47426 100644 --- a/service/k8s/deployment.go +++ b/service/k8s/deployment.go @@ -101,6 +101,15 @@ func (d *DeploymentService) CreateOrUpdateDeployment(namespace string, deploymen return err } + if hashingEnabled() { + if !shouldUpdate(deployment, storedDeployment) { + d.logger.Debugf("%v/%v deployment is upto date, no need to apply changes...", deployment.Namespace, deployment.Name) + return nil + } + d.logger.Debugf("%v/%v deployment has a different resource hash, updating the object...", deployment.Namespace, deployment.Name) + addHashAnnotation(deployment) + } + // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), diff --git a/service/k8s/hash_annotations.go b/service/k8s/hash_annotations.go new file mode 100644 index 000000000..f0ec132fc --- /dev/null +++ b/service/k8s/hash_annotations.go @@ -0,0 +1,62 @@ +package k8s + +import ( + "crypto/sha256" + "encoding/base64" + "hash" + + "github.com/davecgh/go-spew/spew" +) + +type Annotated interface { + GetAnnotations() map[string]string + SetAnnotations(annotations map[string]string) + GetName() string +} + +const resourceHashAnnotationKey = "databases.spotahome.com/resource-hash" + +// Create hash of a given object + +func addHashAnnotation(r Annotated) { + hash := deepHashString(r) + m := r.GetAnnotations() + if m == nil { + m = map[string]string{} + } + m[resourceHashAnnotationKey] = hash + r.SetAnnotations(m) +} + +func deepHashString(obj interface{}) string { + hasher := sha256.New() + deepHashObject(hasher, obj) + 
hashBytes := hasher.Sum([]byte{}) + b64Hash := base64.StdEncoding.EncodeToString(hashBytes) + return b64Hash +} + +// DeepHashObject writes specified object to hash using the spew library +// which follows pointers and prints actual values of the nested objects +// ensuring the hash does not change when a pointer changes. +func deepHashObject(hasher hash.Hash, objectToWrite interface{}) { + hasher.Reset() + printer := spew.ConfigState{ + Indent: " ", + SortKeys: true, + DisableMethods: true, + SpewKeys: true, + } + printer.Fprintf(hasher, "%#v", objectToWrite) +} + +func shouldUpdate(desired Annotated, stored Annotated) bool { + + storedHash, exists := stored.GetAnnotations()[resourceHashAnnotationKey] + if !exists { + return true + } + desiredHash := deepHashString(desired) + + return desiredHash != storedHash +} diff --git a/service/k8s/k8s.go b/service/k8s/k8s.go index b6e68ae44..f0c70842d 100644 --- a/service/k8s/k8s.go +++ b/service/k8s/k8s.go @@ -22,6 +22,14 @@ type Services interface { StatefulSet } +var ( + objectHashingEnabled bool +) + +func hashingEnabled() bool { + return objectHashingEnabled +} + type services struct { ConfigMap Secret @@ -35,7 +43,8 @@ type services struct { } // New returns a new Kubernetes service. 
-func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder) Services { +func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, enableHashing bool) Services { + objectHashingEnabled = enableHashing return &services{ ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder), Secret: NewSecretService(kubecli, logger, metricsRecorder), diff --git a/service/k8s/poddisruptionbudget.go b/service/k8s/poddisruptionbudget.go index 48350bc43..98b4f8763 100644 --- a/service/k8s/poddisruptionbudget.go +++ b/service/k8s/poddisruptionbudget.go @@ -77,6 +77,15 @@ func (p *PodDisruptionBudgetService) CreateOrUpdatePodDisruptionBudget(namespace return err } + if hashingEnabled() { + if !shouldUpdate(podDisruptionBudget, storedPodDisruptionBudget) { + p.logger.Debugf("%v/%v pdb is upto date, no need to apply changes...", podDisruptionBudget.Namespace, podDisruptionBudget.Name) + return nil + } + p.logger.Debugf("%v/%v pdb has a different resource hash, updating the object...", podDisruptionBudget.Namespace, podDisruptionBudget.Name) + addHashAnnotation(podDisruptionBudget) + } + // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. 
This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), diff --git a/service/k8s/rbac.go b/service/k8s/rbac.go index a5534b445..0120e9444 100644 --- a/service/k8s/rbac.go +++ b/service/k8s/rbac.go @@ -100,6 +100,14 @@ func (r *RBACService) CreateOrUpdateRole(namespace string, role *rbacv1.Role) er return err } + if hashingEnabled() { + if !shouldUpdate(role, storedRole) { + r.logger.Debugf("%v/%v role is upto date, no need to apply changes...", role.Namespace, role.Name) + return nil + } + r.logger.Debugf("%v/%v role has a different resource hash, updating the object...", role.Namespace, role.Name) + addHashAnnotation(role) + } // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), @@ -148,6 +156,15 @@ func (r *RBACService) CreateOrUpdateRoleBinding(namespace string, binding *rbacv return err } + if hashingEnabled() { + if !shouldUpdate(binding, storedBinding) { + r.logger.Debugf("%v/%v rolebinding is upto date, no need to apply changes...", binding.Namespace, binding.Name) + return nil + } + r.logger.Debugf("%v/%v rolebinding has a different resource hash, updating the object...", binding.Namespace, binding.Name) + addHashAnnotation(binding) + } + // Check if the role ref has changed, roleref updates are not allowed, if changed then delete and create again the role binding. 
// https://github.com/kubernetes/kubernetes/blob/0f0a5223dfc75337d03c9b80ae552ae8ef138eeb/pkg/apis/rbac/validation/validation.go#L157-L159 if storedBinding.RoleRef != binding.RoleRef { diff --git a/service/k8s/service.go b/service/k8s/service.go index 712cc4c0d..0959c9203 100644 --- a/service/k8s/service.go +++ b/service/k8s/service.go @@ -90,6 +90,15 @@ func (s *ServiceService) CreateOrUpdateService(namespace string, service *corev1 return err } + if hashingEnabled() { + if !shouldUpdate(service, storedService) { + s.logger.Debugf("%v/%v service is upto date, no need to apply changes...", service.Namespace, service.Name) + return nil + } + s.logger.Debugf("%v/%v service has a different resource hash, updating the object...", service.Namespace, service.Name) + addHashAnnotation(service) + } + // Already exists, need to Update. // Set the correct resource version to ensure we are on the latest version. This way the only valid // namespace is our spec(https://github.com/kubernetes/community/blob/master/contributors/devel/api-conventions.md#concurrency-control-and-consistency), diff --git a/service/k8s/statefulset.go b/service/k8s/statefulset.go index 38cc95ff2..4af1d00ac 100644 --- a/service/k8s/statefulset.go +++ b/service/k8s/statefulset.go @@ -97,6 +97,7 @@ func (s *StatefulSetService) UpdateStatefulSet(namespace string, statefulSet *ap // CreateOrUpdateStatefulSet will update the statefulset or create it if does not exist func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefulSet *appsv1.StatefulSet) error { storedStatefulSet, err := s.GetStatefulSet(namespace, statefulSet.Name) + if err != nil { // If no resource we need to create. 
if errors.IsNotFound(err) { @@ -171,6 +172,16 @@ func (s *StatefulSetService) CreateOrUpdateStatefulSet(namespace string, statefu // set stored.volumeClaimTemplates statefulSet.Spec.VolumeClaimTemplates = storedStatefulSet.Spec.VolumeClaimTemplates statefulSet.Annotations = util.MergeAnnotations(storedStatefulSet.Annotations, statefulSet.Annotations) + + if hashingEnabled() { + if !shouldUpdate(statefulSet, storedStatefulSet) { + s.logger.Debugf("%v/%v statefulset is upto date, no need to apply changes...", statefulSet.Namespace, statefulSet.Name) + return nil + } + s.logger.Debugf("%v/%v statefulset has a different resource hash, updating the object...", statefulSet.Namespace, statefulSet.Name) + addHashAnnotation(statefulSet) + } + return s.UpdateStatefulSet(namespace, statefulSet) } From ef0e26ed4444c2bb70f6c33456e73b4c1a5fee26 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Wed, 12 Apr 2023 11:26:01 +0530 Subject: [PATCH 09/12] add source material reference --- service/k8s/hash_annotations.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/service/k8s/hash_annotations.go b/service/k8s/hash_annotations.go index f0ec132fc..b3449d7ec 100644 --- a/service/k8s/hash_annotations.go +++ b/service/k8s/hash_annotations.go @@ -8,6 +8,8 @@ import ( "github.com/davecgh/go-spew/spew" ) +// taken from https://github.com/k8ssandra/cass-operator/blob/master/pkg/utils/hash_annotation.go + type Annotated interface { GetAnnotations() map[string]string SetAnnotations(annotations map[string]string) From c26ad45ca97183b420ca3df66edd3665bb12d666 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Wed, 12 Apr 2023 12:00:20 +0530 Subject: [PATCH 10/12] update use cache evaluation --- service/k8s/configmap.go | 4 ++-- service/k8s/configmap_test.go | 2 +- service/k8s/deployment.go | 4 ++-- service/k8s/deployment_test.go | 2 +- service/k8s/k8s.go | 27 ++++++++++++++++--------- service/k8s/pod.go | 4 ++-- service/k8s/pod_test.go | 2 +- service/k8s/poddisruptionbudget.go | 4 ++-- 
service/k8s/poddisruptionbudget_test.go | 2 +- service/k8s/rbac.go | 4 ++-- service/k8s/rbac_test.go | 2 +- service/k8s/secret.go | 4 ++-- service/k8s/service.go | 4 ++-- service/k8s/statefulset.go | 4 ++-- 14 files changed, 39 insertions(+), 30 deletions(-) diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index 081023dc1..066041e48 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -34,12 +34,12 @@ type ConfigMapService struct { } // NewConfigMapService returns a new ConfigMap KubeService. -func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *ConfigMapService { +func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ConfigMapService { logger = logger.With("service", "k8s.configMap") var err error rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cmCacheStore *cache.Store - if !useCache { + if ShouldUseCache() { cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc) if err != nil { logger.Errorf("unable to initialize cache: %v", err) diff --git a/service/k8s/configmap_test.go b/service/k8s/configmap_test.go index 4efad90ca..da0701cf5 100644 --- a/service/k8s/configmap_test.go +++ b/service/k8s/configmap_test.go @@ -104,7 +104,7 @@ func TestConfigMapServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewConfigMapService(mcli, log.Dummy, metrics.Dummy, false) + service := k8s.NewConfigMapService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdateConfigMap(testns, test.configMap) if test.expErr { diff --git a/service/k8s/deployment.go b/service/k8s/deployment.go index 4b8705e6d..0b0da975b 100644 --- a/service/k8s/deployment.go +++ b/service/k8s/deployment.go @@ -37,12 +37,12 @@ type DeploymentService struct { } // NewDeploymentService returns a new Deployment KubeService. 
-func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *DeploymentService { +func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *DeploymentService { logger = logger.With("service", "k8s.deployment") rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store var err error - if useCache { + if ShouldUseCache() { cacheStore, err = DeploymentCacheStoreFromKubeClient(rc) if err != nil { logger.Errorf("unable to initialize cache: %v", err) diff --git a/service/k8s/deployment_test.go b/service/k8s/deployment_test.go index 657e04f4c..14be68360 100644 --- a/service/k8s/deployment_test.go +++ b/service/k8s/deployment_test.go @@ -104,7 +104,7 @@ func TestDeploymentServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewDeploymentService(mcli, log.Dummy, metrics.Dummy, false) + service := k8s.NewDeploymentService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdateDeployment(testns, test.deployment) if test.expErr { diff --git a/service/k8s/k8s.go b/service/k8s/k8s.go index 447fbffd0..323338302 100644 --- a/service/k8s/k8s.go +++ b/service/k8s/k8s.go @@ -9,6 +9,14 @@ import ( "github.com/spotahome/redis-operator/metrics" ) +var ( + useCache bool +) + +func ShouldUseCache() bool { + return useCache +} + // Service is the K8s service entrypoint. type Services interface { ConfigMap @@ -35,16 +43,17 @@ type services struct { } // New returns a new Kubernetes service. 
-func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) Services { +func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, cacheEnabled bool) Services { + useCache = cacheEnabled return &services{ - ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder, useCache), - Secret: NewSecretService(kubecli, logger, metricsRecorder, useCache), - Pod: NewPodService(kubecli, logger, metricsRecorder, useCache), - PodDisruptionBudget: NewPodDisruptionBudgetService(kubecli, logger, metricsRecorder, useCache), + ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder), + Secret: NewSecretService(kubecli, logger, metricsRecorder), + Pod: NewPodService(kubecli, logger, metricsRecorder), + PodDisruptionBudget: NewPodDisruptionBudgetService(kubecli, logger, metricsRecorder), RedisFailover: NewRedisFailoverService(crdcli, logger, metricsRecorder), - Service: NewServiceService(kubecli, logger, metricsRecorder, useCache), - RBAC: NewRBACService(kubecli, logger, metricsRecorder, useCache), - Deployment: NewDeploymentService(kubecli, logger, metricsRecorder, useCache), - StatefulSet: NewStatefulSetService(kubecli, logger, metricsRecorder, useCache), + Service: NewServiceService(kubecli, logger, metricsRecorder), + RBAC: NewRBACService(kubecli, logger, metricsRecorder), + Deployment: NewDeploymentService(kubecli, logger, metricsRecorder), + StatefulSet: NewStatefulSetService(kubecli, logger, metricsRecorder), } } diff --git a/service/k8s/pod.go b/service/k8s/pod.go index 279bffe64..97e3b1672 100644 --- a/service/k8s/pod.go +++ b/service/k8s/pod.go @@ -37,13 +37,13 @@ type PodService struct { } // NewPodService returns a new Pod KubeService. 
-func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *PodService { +func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodService { logger = logger.With("service", "k8s.pod") rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) fmt.Printf("[POD]-- rest client interface: %v\n", rc) var podCacheStore *cache.Store var err error - if useCache { + if ShouldUseCache() { podCacheStore, err = PodCacheStoreFromKubeClient(rc) if err != nil { logger.Errorf("unable to initialize cache: %v", err) diff --git a/service/k8s/pod_test.go b/service/k8s/pod_test.go index 77b757845..eaf4e7feb 100644 --- a/service/k8s/pod_test.go +++ b/service/k8s/pod_test.go @@ -104,7 +104,7 @@ func TestPodServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewPodService(mcli, log.Dummy, metrics.Dummy, false) + service := k8s.NewPodService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdatePod(testns, test.pod) if test.expErr { diff --git a/service/k8s/poddisruptionbudget.go b/service/k8s/poddisruptionbudget.go index c89db246d..20eb56b06 100644 --- a/service/k8s/poddisruptionbudget.go +++ b/service/k8s/poddisruptionbudget.go @@ -33,13 +33,13 @@ type PodDisruptionBudgetService struct { } // NewPodDisruptionBudgetService returns a new PodDisruptionBudget KubeService. 
-func NewPodDisruptionBudgetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *PodDisruptionBudgetService { +func NewPodDisruptionBudgetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodDisruptionBudgetService { logger = logger.With("service", "k8s.podDisruptionBudget") rc := kubeClient.PolicyV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store var err error - if useCache { + if ShouldUseCache() { cacheStore, err = PodDisruptionBudgetCacheStoreFromKubeClient(rc) if err != nil { logger.Errorf("unable to initialize cache: %v", err) diff --git a/service/k8s/poddisruptionbudget_test.go b/service/k8s/poddisruptionbudget_test.go index 8848a68a1..c2427859d 100644 --- a/service/k8s/poddisruptionbudget_test.go +++ b/service/k8s/poddisruptionbudget_test.go @@ -102,7 +102,7 @@ func TestPodDisruptionBudgetServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewPodDisruptionBudgetService(mcli, log.Dummy, metrics.Dummy, false) + service := k8s.NewPodDisruptionBudgetService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdatePodDisruptionBudget(testns, test.podDisruptionBudget) if test.expErr { diff --git a/service/k8s/rbac.go b/service/k8s/rbac.go index 65df912e3..7b8820857 100644 --- a/service/k8s/rbac.go +++ b/service/k8s/rbac.go @@ -39,7 +39,7 @@ type RBACService struct { } // NewRBACService returns a new RBAC KubeService. 
-func NewRBACService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *RBACService { +func NewRBACService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *RBACService { logger = logger.With("service", "k8s.rbac") rc := kubeClient.RbacV1().RESTClient().(*rest.RESTClient) @@ -49,7 +49,7 @@ func NewRBACService(kubeClient kubernetes.Interface, logger log.Logger, metricsR var clusterRoleCacheStore *cache.Store var err error - if useCache { + if ShouldUseCache() { roleCacheStore, err = RoleCacheStoreFromKubeClient(rc) if err != nil { logger.Errorf("unable to initialize cache for roles: %v", err) diff --git a/service/k8s/rbac_test.go b/service/k8s/rbac_test.go index 4304594f0..d4f942adb 100644 --- a/service/k8s/rbac_test.go +++ b/service/k8s/rbac_test.go @@ -131,7 +131,7 @@ func TestRBACServiceGetCreateOrUpdateRoleBinding(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewRBACService(mcli, log.Dummy, metrics.Dummy, false) + service := k8s.NewRBACService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdateRoleBinding(testns, test.rb) if test.expErr { diff --git a/service/k8s/secret.go b/service/k8s/secret.go index b7c3492c6..6b540a51f 100644 --- a/service/k8s/secret.go +++ b/service/k8s/secret.go @@ -26,13 +26,13 @@ type SecretService struct { metricsRecorder metrics.Recorder } -func NewSecretService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *SecretService { +func NewSecretService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *SecretService { logger = logger.With("service", "k8s.secret") rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store var err error - if useCache { + if ShouldUseCache() { cacheStore, err = SecretCacheStoreFromKubeClient(rc) if err != nil { logger.Errorf("unable to initialize cache: %v", err) diff 
--git a/service/k8s/service.go b/service/k8s/service.go index 17e23caa1..d72b14181 100644 --- a/service/k8s/service.go +++ b/service/k8s/service.go @@ -35,14 +35,14 @@ type ServiceService struct { } // NewServiceService returns a new Service KubeService. -func NewServiceService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *ServiceService { +func NewServiceService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ServiceService { logger = logger.With("service", "k8s.service") rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store var err error - if useCache { + if ShouldUseCache() { cacheStore, err = ServiceCacheStoreFromKubeClient(rc) if err != nil { logger.Errorf("unable to initialize cache: %v", err) diff --git a/service/k8s/statefulset.go b/service/k8s/statefulset.go index 54f496ba7..08b79cdec 100644 --- a/service/k8s/statefulset.go +++ b/service/k8s/statefulset.go @@ -42,13 +42,13 @@ type StatefulSetService struct { } // NewStatefulSetService returns a new StatefulSet KubeService. 
-func NewStatefulSetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder, useCache bool) *StatefulSetService { +func NewStatefulSetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *StatefulSetService { logger = logger.With("service", "k8s.statefulSet") rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient) var cacheStore *cache.Store var err error - if useCache { + if ShouldUseCache() { cacheStore, err = StatefulSetCacheStoreFromKubeClient(rc) if err != nil { logger.Errorf("unable to initialize cache: %v", err) From 2fe13d8c232bf3ea80c77045833a61c6e2fcaf05 Mon Sep 17 00:00:00 2001 From: corneredrat Date: Wed, 12 Apr 2023 12:10:13 +0530 Subject: [PATCH 11/12] fix fn call in test case --- service/k8s/secret_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/k8s/secret_test.go b/service/k8s/secret_test.go index ab0af96ca..f39dd0e71 100644 --- a/service/k8s/secret_test.go +++ b/service/k8s/secret_test.go @@ -47,7 +47,7 @@ func TestSecretServiceGet(t *testing.T) { assert.NoError(err) // test getting the secret - service := NewSecretService(mcli, log.Dummy, metrics.Dummy, false) + service := NewSecretService(mcli, log.Dummy, metrics.Dummy) ss, err := service.GetSecret(secret.ObjectMeta.Namespace, secret.ObjectMeta.Name) assert.NotNil(ss) assert.NoError(err) From 84d2455cb0373ea148094035af972ced35260b2e Mon Sep 17 00:00:00 2001 From: corneredrat Date: Wed, 12 Apr 2023 12:13:24 +0530 Subject: [PATCH 12/12] fix fn call in test case --- service/k8s/service_test.go | 2 +- service/k8s/statefulset_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/service/k8s/service_test.go b/service/k8s/service_test.go index fd1aa838e..7b3678e7e 100644 --- a/service/k8s/service_test.go +++ b/service/k8s/service_test.go @@ -104,7 +104,7 @@ func TestServiceServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service 
:= k8s.NewServiceService(mcli, log.Dummy, metrics.Dummy, false) + service := k8s.NewServiceService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdateService(testns, test.service) if test.expErr { diff --git a/service/k8s/statefulset_test.go b/service/k8s/statefulset_test.go index 139f4bea5..1d6781767 100644 --- a/service/k8s/statefulset_test.go +++ b/service/k8s/statefulset_test.go @@ -108,7 +108,7 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { return true, nil, test.errorOnCreation }) - service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy, false) + service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdateStatefulSet(testns, test.statefulSet) if test.expErr { @@ -207,7 +207,7 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { pvcList.Items[0] = *action.(kubetesting.UpdateActionImpl).Object.(*v1.PersistentVolumeClaim) return true, action.(kubetesting.UpdateActionImpl).Object, nil }) - service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy, false) + service := k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) err := service.CreateOrUpdateStatefulSet(testns, afterSts) assert.NoError(err) assert.Equal(pvcList.Items[0].Spec.Resources, pvcList.Items[1].Spec.Resources) @@ -215,7 +215,7 @@ func TestStatefulSetServiceGetCreateOrUpdate(t *testing.T) { mcli.AddReactor("update", "persistentvolumeclaims", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) { panic("shouldn't call update") }) - service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy, false) + service = k8s.NewStatefulSetService(mcli, log.Dummy, metrics.Dummy) err = service.CreateOrUpdateStatefulSet(testns, afterSts) assert.NoError(err) })