diff --git a/cmd/redisoperator/main.go b/cmd/redisoperator/main.go index 4137161cc..fd77062a6 100644 --- a/cmd/redisoperator/main.go +++ b/cmd/redisoperator/main.go @@ -79,7 +79,7 @@ func (m *Main) Run() error { } // Create kubernetes service. - k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder) + k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder, m.flags.UseCache) // Create the redis clients redisClient := redis.New(metricsRecorder) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 578fe0e62..6ce8673c8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -19,6 +19,7 @@ type CMDFlags struct { K8sQueriesBurstable int Concurrency int LogLevel string + UseCache bool } // Init initializes and parse the flags @@ -35,6 +36,7 @@ func (c *CMDFlags) Init() { // reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89 flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events") flag.StringVar(&c.LogLevel, "log-level", "info", "set log level") + flag.BoolVar(&c.UseCache, "use-cache", false, "use cache stores to get k8s objects") // Parse flags flag.Parse() } diff --git a/service/k8s/configmap.go b/service/k8s/configmap.go index 1b6fc1424..066041e48 100644 --- a/service/k8s/configmap.go +++ b/service/k8s/configmap.go @@ -2,14 +2,17 @@ package k8s import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" + "k8s.io/client-go/tools/cache" ) // ConfigMap the ServiceAccount service that knows how to interact with k8s to manage them @@ -26,26 +29,51 @@ type ConfigMap interface { type ConfigMapService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder 
metrics.Recorder } // NewConfigMapService returns a new ConfigMap KubeService. func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ConfigMapService { logger = logger.With("service", "k8s.configMap") + var err error + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + var cmCacheStore *cache.Store + if ShouldUseCache() { + cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } + } return &ConfigMapService{ kubeClient: kubeClient, logger: logger, + cacheStore: cmCacheStore, metricsRecorder: metricsRecorder, } } func (p *ConfigMapService) GetConfigMap(namespace string, name string) (*corev1.ConfigMap, error) { - configMap, err := p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder) - if err != nil { - return nil, err + var cm *corev1.ConfigMap + var err error + var exists bool + if p.cacheStore != nil { + c := *p.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + cm = item.(*corev1.ConfigMap) + } + if !exists { + err = fmt.Errorf("configmap %v not found in namespace %v", name, namespace) + } + } else { + cm, err = p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } - return configMap, err + + recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder) + + return cm, err } func (p *ConfigMapService) CreateConfigMap(namespace string, configMap *corev1.ConfigMap) error { diff --git a/service/k8s/deployment.go b/service/k8s/deployment.go index 46d63bb93..0b0da975b 100644 --- a/service/k8s/deployment.go +++ b/service/k8s/deployment.go @@ -10,6 +10,8 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + 
"k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -30,26 +32,49 @@ type Deployment interface { type DeploymentService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewDeploymentService returns a new Deployment KubeService. func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *DeploymentService { logger = logger.With("service", "k8s.deployment") + rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + var err error + if ShouldUseCache() { + cacheStore, err = DeploymentCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } + } return &DeploymentService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } // GetDeployment will retrieve the requested deployment based on namespace and name func (d *DeploymentService) GetDeployment(namespace, name string) (*appsv1.Deployment, error) { - deployment, err := d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder) - if err != nil { - return nil, err + var deployment *appsv1.Deployment + var err error + var exists bool + if d.cacheStore != nil { + c := *d.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + deployment = item.(*appsv1.Deployment) + } + if !exists { + err = fmt.Errorf("deployment %v not found in namespace %v", name, namespace) + } + } else { + deployment, err = d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } + recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder) return deployment, err 
} diff --git a/service/k8s/k8s.go b/service/k8s/k8s.go index b6e68ae44..323338302 100644 --- a/service/k8s/k8s.go +++ b/service/k8s/k8s.go @@ -9,6 +9,14 @@ import ( "github.com/spotahome/redis-operator/metrics" ) +var ( + useCache bool +) + +func ShouldUseCache() bool { + return useCache +} + // Service is the K8s service entrypoint. type Services interface { ConfigMap @@ -35,7 +43,8 @@ type services struct { } // New returns a new Kubernetes service. -func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder) Services { +func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, cacheEnabled bool) Services { + useCache = cacheEnabled return &services{ ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder), Secret: NewSecretService(kubecli, logger, metricsRecorder), diff --git a/service/k8s/pod.go b/service/k8s/pod.go index 0583302ce..97e3b1672 100644 --- a/service/k8s/pod.go +++ b/service/k8s/pod.go @@ -3,16 +3,18 @@ package k8s import ( "context" "encoding/json" + "fmt" "k8s.io/apimachinery/pkg/types" + "github.com/spotahome/redis-operator/log" + "github.com/spotahome/redis-operator/metrics" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - - "github.com/spotahome/redis-operator/log" - "github.com/spotahome/redis-operator/metrics" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) // Pod the ServiceAccount service that knows how to interact with k8s to manage them @@ -30,25 +32,52 @@ type Pod interface { type PodService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewPodService returns a new Pod KubeService. 
func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodService { logger = logger.With("service", "k8s.pod") + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + fmt.Printf("[POD]-- rest client interface: %v\n", rc) + var podCacheStore *cache.Store + var err error + if ShouldUseCache() { + podCacheStore, err = PodCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } + } + return &PodService{ kubeClient: kubeClient, logger: logger, + cacheStore: podCacheStore, metricsRecorder: metricsRecorder, } } func (p *PodService) GetPod(namespace string, name string) (*corev1.Pod, error) { - pod, err := p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder) - if err != nil { - return nil, err + var pod *corev1.Pod + var err error + var exists bool + if p.cacheStore != nil { + + c := *p.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + pod = item.(*corev1.Pod) + } + if !exists { + err = fmt.Errorf("pod %v not found in namespace %v", name, namespace) + } + } else { + pod, err = p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + } + recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder) return pod, err } diff --git a/service/k8s/poddisruptionbudget.go b/service/k8s/poddisruptionbudget.go index 48350bc43..20eb56b06 100644 --- a/service/k8s/poddisruptionbudget.go +++ b/service/k8s/poddisruptionbudget.go @@ -2,11 +2,14 @@ package k8s import ( "context" + "fmt" policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" 
"github.com/spotahome/redis-operator/metrics" @@ -25,26 +28,54 @@ type PodDisruptionBudget interface { type PodDisruptionBudgetService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewPodDisruptionBudgetService returns a new PodDisruptionBudget KubeService. func NewPodDisruptionBudgetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodDisruptionBudgetService { logger = logger.With("service", "k8s.podDisruptionBudget") + + rc := kubeClient.PolicyV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + var err error + if ShouldUseCache() { + cacheStore, err = PodDisruptionBudgetCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } + } + return &PodDisruptionBudgetService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } func (p *PodDisruptionBudgetService) GetPodDisruptionBudget(namespace string, name string) (*policyv1.PodDisruptionBudget, error) { - podDisruptionBudget, err := p.kubeClient.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "PodDisruptionBudget", name, "GET", err, p.metricsRecorder) - if err != nil { - return nil, err + var podDisruptionBudget *policyv1.PodDisruptionBudget + var err error + var exists bool + + if p.cacheStore != nil { + c := *p.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + podDisruptionBudget = item.(*policyv1.PodDisruptionBudget) + } + if !exists { + + err = fmt.Errorf("podDisruptionBudget %v not found in namespace %v", name, namespace) + } + } else { + podDisruptionBudget, err = p.kubeClient.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } - return podDisruptionBudget, nil + 
recordMetrics(namespace, "PodDisruptionBudget", name, "GET", err, p.metricsRecorder) + + return podDisruptionBudget, err } func (p *PodDisruptionBudgetService) CreatePodDisruptionBudget(namespace string, podDisruptionBudget *policyv1.PodDisruptionBudget) error { diff --git a/service/k8s/rbac.go b/service/k8s/rbac.go index a5534b445..7b8820857 100644 --- a/service/k8s/rbac.go +++ b/service/k8s/rbac.go @@ -2,11 +2,14 @@ package k8s import ( "context" + "fmt" rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -27,37 +30,117 @@ type RBAC interface { // NamespaceService is the Namespace service implementation using API calls to kubernetes. type RBACService struct { - kubeClient kubernetes.Interface - logger log.Logger - metricsRecorder metrics.Recorder + kubeClient kubernetes.Interface + logger log.Logger + roleCacheStore *cache.Store + roleBindingCacheStore *cache.Store + clusterRoleCacheStore *cache.Store + metricsRecorder metrics.Recorder } // NewRBACService returns a new RBAC KubeService. 
func NewRBACService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *RBACService { logger = logger.With("service", "k8s.rbac") + + rc := kubeClient.RbacV1().RESTClient().(*rest.RESTClient) + + var roleCacheStore *cache.Store + var roleBindingCacheStore *cache.Store + var clusterRoleCacheStore *cache.Store + var err error + + if ShouldUseCache() { + roleCacheStore, err = RoleCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache for roles: %v", err) + } + roleBindingCacheStore, err = RoleBindingCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache for rolebinding: %v", err) + } + clusterRoleCacheStore, err = ClusterRoleCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache cluster role: %v", err) + } + } + return &RBACService{ - kubeClient: kubeClient, - logger: logger, - metricsRecorder: metricsRecorder, + kubeClient: kubeClient, + logger: logger, + roleCacheStore: roleCacheStore, + roleBindingCacheStore: roleBindingCacheStore, + clusterRoleCacheStore: clusterRoleCacheStore, + metricsRecorder: metricsRecorder, } } func (r *RBACService) GetClusterRole(name string) (*rbacv1.ClusterRole, error) { - clusterRole, err := r.kubeClient.RbacV1().ClusterRoles().Get(context.TODO(), name, metav1.GetOptions{}) + var clusterRole *rbacv1.ClusterRole + var err error + var exists bool + if r.clusterRoleCacheStore != nil { + + c := *r.clusterRoleCacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v", name)) + if exists && nil == err { + clusterRole = item.(*rbacv1.ClusterRole) + } + + if !exists { + err = fmt.Errorf("clusterRole %v not found", name) + } + + } else { + clusterRole, err = r.kubeClient.RbacV1().ClusterRoles().Get(context.TODO(), name, metav1.GetOptions{}) + } recordMetrics(metrics.NOT_APPLICABLE, "ClusterRole", name, "GET", err, r.metricsRecorder) return clusterRole, err + } func (r *RBACService) 
GetRole(namespace, name string) (*rbacv1.Role, error) { - role, err := r.kubeClient.RbacV1().Roles(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + var role *rbacv1.Role + var err error + var exists bool + if r.roleCacheStore != nil { + c := *r.roleCacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + role = item.(*rbacv1.Role) + } + if !exists { + err = fmt.Errorf("role %v not found in namespace %v", name, namespace) + } + + } else { + role, err = r.kubeClient.RbacV1().Roles(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + } + recordMetrics(namespace, "Role", name, "GET", err, r.metricsRecorder) return role, err } func (r *RBACService) GetRoleBinding(namespace, name string) (*rbacv1.RoleBinding, error) { - rolbinding, err := r.kubeClient.RbacV1().RoleBindings(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + var roleBinding *rbacv1.RoleBinding + var err error + var exists bool + if r.roleBindingCacheStore != nil { + c := *r.roleBindingCacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + roleBinding = item.(*rbacv1.RoleBinding) + } + if !exists { + err = fmt.Errorf("role binding %v not found in namespace %v", name, namespace) + } + } else { + roleBinding, err = r.kubeClient.RbacV1().RoleBindings(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + } recordMetrics(namespace, "RoleBinding", name, "GET", err, r.metricsRecorder) - return rolbinding, err + return roleBinding, err } func (r *RBACService) DeleteRole(namespace, name string) error { diff --git a/service/k8s/secret.go b/service/k8s/secret.go index 0edd23dea..6b540a51f 100644 --- a/service/k8s/secret.go +++ b/service/k8s/secret.go @@ -2,12 +2,15 @@ package k8s import ( "context" + "fmt" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" corev1 "k8s.io/api/core/v1" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) // Secret interacts with k8s to get secrets @@ -19,26 +22,49 @@ type Secret interface { type SecretService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } func NewSecretService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *SecretService { logger = logger.With("service", "k8s.secret") + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + var err error + if ShouldUseCache() { + cacheStore, err = SecretCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } + } return &SecretService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } func (s *SecretService) GetSecret(namespace, name string) (*corev1.Secret, error) { + var secret *corev1.Secret + var err error + var exists bool + if s.cacheStore != nil { + c := *s.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + secret = item.(*corev1.Secret) + } + if !exists { + err = fmt.Errorf("secret %v not found in namespace %v", name, namespace) + } + } else { + secret, err = s.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + } - secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) recordMetrics(namespace, "Secret", name, "GET", err, s.metricsRecorder) - if err != nil { - return nil, err - } return secret, err } diff --git a/service/k8s/service.go b/service/k8s/service.go index 712cc4c0d..d72b14181 100644 --- a/service/k8s/service.go +++ b/service/k8s/service.go @@ -2,11 +2,14 @@ package k8s import ( "context" + "fmt" corev1 "k8s.io/api/core/v1" 
"k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -27,25 +30,51 @@ type Service interface { type ServiceService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewServiceService returns a new Service KubeService. func NewServiceService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ServiceService { logger = logger.With("service", "k8s.service") + + rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + var err error + + if ShouldUseCache() { + cacheStore, err = ServiceCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } + } + return &ServiceService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } func (s *ServiceService) GetService(namespace string, name string) (*corev1.Service, error) { - service, err := s.kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "Service", name, "GET", err, s.metricsRecorder) - if err != nil { - return nil, err + var service *corev1.Service + var err error + var exists bool + if s.cacheStore != nil { + c := *s.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + service = item.(*corev1.Service) + } + if !exists { + err = fmt.Errorf("svc %v/%v not found", namespace, name) + } + } else { + service, err = s.kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } + recordMetrics(namespace, "Service", name, "GET", err, s.metricsRecorder) return service, err } diff --git a/service/k8s/statefulset.go 
b/service/k8s/statefulset.go index 38cc95ff2..08b79cdec 100644 --- a/service/k8s/statefulset.go +++ b/service/k8s/statefulset.go @@ -15,6 +15,8 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" "github.com/spotahome/redis-operator/log" "github.com/spotahome/redis-operator/metrics" @@ -35,27 +37,52 @@ type StatefulSet interface { type StatefulSetService struct { kubeClient kubernetes.Interface logger log.Logger + cacheStore *cache.Store metricsRecorder metrics.Recorder } // NewStatefulSetService returns a new StatefulSet KubeService. func NewStatefulSetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *StatefulSetService { logger = logger.With("service", "k8s.statefulSet") + + rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient) + var cacheStore *cache.Store + var err error + if ShouldUseCache() { + cacheStore, err = StatefulSetCacheStoreFromKubeClient(rc) + if err != nil { + logger.Errorf("unable to initialize cache: %v", err) + } + } return &StatefulSetService{ kubeClient: kubeClient, logger: logger, + cacheStore: cacheStore, metricsRecorder: metricsRecorder, } } // GetStatefulSet will retrieve the requested statefulset based on namespace and name func (s *StatefulSetService) GetStatefulSet(namespace, name string) (*appsv1.StatefulSet, error) { - statefulSet, err := s.kubeClient.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - recordMetrics(namespace, "StatefulSet", name, "GET", err, s.metricsRecorder) - if err != nil { - return nil, err + var ss *appsv1.StatefulSet + var err error + var exists bool + if s.cacheStore != nil { + c := *s.cacheStore + var item interface{} + item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name)) + if exists && nil == err { + ss = item.(*appsv1.StatefulSet) + } + if !exists { + err = fmt.Errorf("statefulset 
%s not found in namespace %v", name, namespace) + } + } else { + ss, err = s.kubeClient.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, metav1.GetOptions{}) } - return statefulSet, err + + recordMetrics(namespace, "StatefulSet", name, "GET", err, s.metricsRecorder) + return ss, err } // GetStatefulSetPods will give a list of pods that are managed by the statefulset diff --git a/service/k8s/util.go b/service/k8s/util.go index 2cd9bbd73..397301c8a 100644 --- a/service/k8s/util.go +++ b/service/k8s/util.go @@ -1,11 +1,23 @@ package k8s import ( + "context" "fmt" + "time" redisfailoverv1 "github.com/spotahome/redis-operator/api/redisfailover/v1" "github.com/spotahome/redis-operator/metrics" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" ) // GetRedisPassword retreives password from kubernetes secret or, if @@ -42,3 +54,428 @@ func recordMetrics(namespace string, kind string, object string, operation strin metricsRecorder.RecordK8sOperation(namespace, kind, object, operation, metrics.FAIL, metrics.K8S_MISC) } } + +// TODO: Update *CacheStoreFromKubeClient be implemented via generics + +func PodCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } + s := runtime.NewScheme() + err := corev1.AddToScheme(s) + if nil != err { + return nil, err + } + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("pods"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*corev1.PodList, error) { + result := corev1.PodList{} + err := rc.Get().Resource("pods").Do(context.Background()).Into(&result) + return &result, err + } + podCacheStore, podCacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &corev1.Pod{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go podCacheController.Run(wait.NeverStop) + + return &podCacheStore, nil + +} + +func ServiceCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } + s := runtime.NewScheme() + err := corev1.AddToScheme(s) + if nil != err { + return nil, err + } + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("services"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*corev1.ServiceList, error) { + result := corev1.ServiceList{} + err := rc.Get().Resource("services").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &corev1.Service{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore, nil +} + +func ConfigMapCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } + s := runtime.NewScheme() + err := corev1.AddToScheme(s) + if err != nil { + return nil, err + } + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("configmaps"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*corev1.ConfigMapList, error) { + fmt.Printf("cm lister calling...") + fmt.Printf("rest client: %v...", rc) + result := corev1.ConfigMapList{} + err := rc.Get().Resource("configmaps").Do(context.Background()).Into(&result) + fmt.Printf("cm lister called; error found: %v\n", err) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &corev1.ConfigMap{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore, nil +} + +func DeploymentCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } + s := runtime.NewScheme() + err := appsv1.AddToScheme(s) + if nil != err { + return nil, err + } + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("deployments"). + VersionedParams(&opts, parameterCodec). 
+ Watch(context.Background()) + } + listFunc := func(opts metav1.ListOptions) (*appsv1.DeploymentList, error) { + result := appsv1.DeploymentList{} + err := rc.Get().Resource("deployments").Do(context.Background()).Into(&result) + return &result, err + } + cacheStore, cacheController := cache.NewInformer( + &cache.ListWatch{ + ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) { + return listFunc(lo) + }, + WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { + return watchFunc(lo) + }, + }, + &appsv1.Deployment{}, + 0*time.Second, + cache.ResourceEventHandlerFuncs{}, + ) + + go cacheController.Run(wait.NeverStop) + + return &cacheStore, nil +} + +func PodDisruptionBudgetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) { + if rc == nil { + // this case usually happens during testing where dummy / fake clientsets are used + return nil, fmt.Errorf("rest client not initialized") + } + s := runtime.NewScheme() + err := policyv1.AddToScheme(s) + if nil != err { + return nil, err + } + watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true + parameterCodec := runtime.NewParameterCodec(s) + return rc.Get(). + Resource("poddisruptionbudgets"). + VersionedParams(&opts, parameterCodec). 
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*policyv1.PodDisruptionBudgetList, error) {
+		result := policyv1.PodDisruptionBudgetList{}
+		err := rc.Get().Resource("poddisruptionbudgets").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&policyv1.PodDisruptionBudget{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func RoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := rbacv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("roles").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*rbacv1.RoleList, error) {
+		result := rbacv1.RoleList{}
+		err := rc.Get().Resource("roles").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&rbacv1.Role{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func ClusterRoleCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := rbacv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("clusterroles").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*rbacv1.ClusterRoleList, error) {
+		result := rbacv1.ClusterRoleList{}
+		err := rc.Get().Resource("clusterroles").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&rbacv1.ClusterRole{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func RoleBindingCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := rbacv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("rolebindings").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*rbacv1.RoleBindingList, error) {
+		result := rbacv1.RoleBindingList{}
+		err := rc.Get().Resource("rolebindings").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&rbacv1.RoleBinding{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+func SecretCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := corev1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("secrets").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*corev1.SecretList, error) {
+		result := corev1.SecretList{}
+		err := rc.Get().Resource("secrets").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&corev1.Secret{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
+
+func StatefulSetCacheStoreFromKubeClient(rc *rest.RESTClient) (*cache.Store, error) {
+	if rc == nil {
+		// this case usually happens during testing where dummy / fake clientsets are used
+		return nil, fmt.Errorf("rest client not initialized")
+	}
+	s := runtime.NewScheme()
+	err := appsv1.AddToScheme(s)
+	if err != nil {
+		return nil, err
+	}
+	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
+		opts.Watch = true
+		parameterCodec := runtime.NewParameterCodec(s)
+		return rc.Get().
+			Resource("statefulsets").
+			VersionedParams(&opts, parameterCodec).
+			Watch(context.Background())
+	}
+	listFunc := func(opts metav1.ListOptions) (*appsv1.StatefulSetList, error) {
+		result := appsv1.StatefulSetList{}
+		err := rc.Get().Resource("statefulsets").Do(context.Background()).Into(&result)
+		return &result, err
+	}
+	cacheStore, cacheController := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(lo metav1.ListOptions) (result runtime.Object, err error) {
+				return listFunc(lo)
+			},
+			WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) {
+				return watchFunc(lo)
+			},
+		},
+		&appsv1.StatefulSet{},
+		0*time.Second,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	go cacheController.Run(wait.NeverStop)
+
+	return &cacheStore, nil
+}
diff --git a/test/integration/redisfailover/creation_test.go b/test/integration/redisfailover/creation_test.go
index 5458634ba..c63af3f64 100644
--- a/test/integration/redisfailover/creation_test.go
+++ b/test/integration/redisfailover/creation_test.go
@@ -94,7 +94,7 @@ func TestRedisFailover(t *testing.T) {
 	}
 
 	// Create kubernetes service.
-	k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy)
+	k8sservice := k8s.New(k8sClient, customClient, aeClientset, log.Dummy, metrics.Dummy, true)
 
 	// Prepare namespace
 	prepErr := clients.prepareNS()