Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce number of calls made to k8s api server #581

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion cmd/redisoperator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (m *Main) Run() error {
}

// Create kubernetes service.
k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder)
k8sservice := k8s.New(k8sClient, customClient, aeClientset, m.logger, metricsRecorder, m.flags.UseCache)

// Create the redis clients
redisClient := redis.New(metricsRecorder)
Expand Down
2 changes: 2 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type CMDFlags struct {
K8sQueriesBurstable int
Concurrency int
LogLevel string
UseCache bool
}

// Init initializes and parse the flags
Expand All @@ -35,6 +36,7 @@ func (c *CMDFlags) Init() {
// reference: https://github.com/spotahome/kooper/blob/master/controller/controller.go#L89
flag.IntVar(&c.Concurrency, "concurrency", 3, "Number of conccurent workers meant to process events")
flag.StringVar(&c.LogLevel, "log-level", "info", "set log level")
flag.BoolVar(&c.UseCache, "use-cache", false, "use cache stores to get k8s objects")
// Parse flags
flag.Parse()
}
Expand Down
38 changes: 33 additions & 5 deletions service/k8s/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package k8s

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
"k8s.io/client-go/tools/cache"
)

// ConfigMap the ServiceAccount service that knows how to interact with k8s to manage them
Expand All @@ -26,26 +29,51 @@ type ConfigMap interface {
type ConfigMapService struct {
kubeClient kubernetes.Interface
logger log.Logger
cacheStore *cache.Store
metricsRecorder metrics.Recorder
}

// NewConfigMapService returns a new ConfigMap KubeService.
func NewConfigMapService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *ConfigMapService {
logger = logger.With("service", "k8s.configMap")
var err error
rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient)
var cmCacheStore *cache.Store
if ShouldUseCache() {
cmCacheStore, err = ConfigMapCacheStoreFromKubeClient(rc)
if err != nil {
logger.Errorf("unable to initialize cache: %v", err)
}
}
return &ConfigMapService{
kubeClient: kubeClient,
logger: logger,
cacheStore: cmCacheStore,
metricsRecorder: metricsRecorder,
}
}

func (p *ConfigMapService) GetConfigMap(namespace string, name string) (*corev1.ConfigMap, error) {
configMap, err := p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder)
if err != nil {
return nil, err
var cm *corev1.ConfigMap
var err error
var exists bool
if p.cacheStore != nil {
c := *p.cacheStore
var item interface{}
item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
if exists && nil == err {
cm = item.(*corev1.ConfigMap)
}
if !exists {
err = fmt.Errorf("configmap %v not found in namespace %v", name, namespace)
}
} else {
cm, err = p.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
return configMap, err

recordMetrics(namespace, "ConfigMap", name, "GET", err, p.metricsRecorder)

return cm, err
}

func (p *ConfigMapService) CreateConfigMap(namespace string, configMap *corev1.ConfigMap) error {
Expand Down
33 changes: 29 additions & 4 deletions service/k8s/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
Expand All @@ -30,26 +32,49 @@ type Deployment interface {
type DeploymentService struct {
kubeClient kubernetes.Interface
logger log.Logger
cacheStore *cache.Store
metricsRecorder metrics.Recorder
}

// NewDeploymentService returns a new Deployment KubeService.
func NewDeploymentService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *DeploymentService {
logger = logger.With("service", "k8s.deployment")
rc := kubeClient.AppsV1().RESTClient().(*rest.RESTClient)
var cacheStore *cache.Store
var err error
if ShouldUseCache() {
cacheStore, err = DeploymentCacheStoreFromKubeClient(rc)
if err != nil {
logger.Errorf("unable to initialize cache: %v", err)
}
}
return &DeploymentService{
kubeClient: kubeClient,
logger: logger,
cacheStore: cacheStore,
metricsRecorder: metricsRecorder,
}
}

// GetDeployment will retrieve the requested deployment based on namespace and name
func (d *DeploymentService) GetDeployment(namespace, name string) (*appsv1.Deployment, error) {
deployment, err := d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder)
if err != nil {
return nil, err
var deployment *appsv1.Deployment
var err error
var exists bool
if d.cacheStore != nil {
c := *d.cacheStore
var item interface{}
item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
if exists && nil == err {
deployment = item.(*appsv1.Deployment)
}
if !exists {
err = fmt.Errorf("deployment %v not found in namespace %v", name, namespace)
}
} else {
deployment, err = d.kubeClient.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
recordMetrics(namespace, "Deployment", name, "GET", err, d.metricsRecorder)
return deployment, err
}

Expand Down
11 changes: 10 additions & 1 deletion service/k8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ import (
"github.com/spotahome/redis-operator/metrics"
)

var (
useCache bool
)

func ShouldUseCache() bool {
return useCache
}

// Service is the K8s service entrypoint.
type Services interface {
ConfigMap
Expand All @@ -35,7 +43,8 @@ type services struct {
}

// New returns a new Kubernetes service.
func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder) Services {
func New(kubecli kubernetes.Interface, crdcli redisfailoverclientset.Interface, apiextcli apiextensionscli.Interface, logger log.Logger, metricsRecorder metrics.Recorder, cacheEnabled bool) Services {
useCache = cacheEnabled
return &services{
ConfigMap: NewConfigMapService(kubecli, logger, metricsRecorder),
Secret: NewSecretService(kubecli, logger, metricsRecorder),
Expand Down
43 changes: 36 additions & 7 deletions service/k8s/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@ package k8s
import (
"context"
"encoding/json"
"fmt"

"k8s.io/apimachinery/pkg/types"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

// Pod the ServiceAccount service that knows how to interact with k8s to manage them
Expand All @@ -30,25 +32,52 @@ type Pod interface {
type PodService struct {
kubeClient kubernetes.Interface
logger log.Logger
cacheStore *cache.Store
metricsRecorder metrics.Recorder
}

// NewPodService returns a new Pod KubeService.
func NewPodService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodService {
logger = logger.With("service", "k8s.pod")
rc := kubeClient.CoreV1().RESTClient().(*rest.RESTClient)
fmt.Printf("[POD]-- rest client interface: %v\n", rc)
var podCacheStore *cache.Store
var err error
if ShouldUseCache() {
podCacheStore, err = PodCacheStoreFromKubeClient(rc)
if err != nil {
logger.Errorf("unable to initialize cache: %v", err)
}
}

return &PodService{
kubeClient: kubeClient,
logger: logger,
cacheStore: podCacheStore,
metricsRecorder: metricsRecorder,
}
}

func (p *PodService) GetPod(namespace string, name string) (*corev1.Pod, error) {
pod, err := p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder)
if err != nil {
return nil, err
var pod *corev1.Pod
var err error
var exists bool
if p.cacheStore != nil {

c := *p.cacheStore
var item interface{}
item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
if exists && nil == err {
pod = item.(*corev1.Pod)
}
if !exists {
err = fmt.Errorf("pod %v not found in namespace %v", name, namespace)
}
} else {
pod, err = p.kubeClient.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})

}
recordMetrics(namespace, "Pod", name, "GET", err, p.metricsRecorder)
return pod, err
}

Expand Down
41 changes: 36 additions & 5 deletions service/k8s/poddisruptionbudget.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package k8s

import (
"context"
"fmt"

policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"github.com/spotahome/redis-operator/log"
"github.com/spotahome/redis-operator/metrics"
Expand All @@ -25,26 +28,54 @@ type PodDisruptionBudget interface {
type PodDisruptionBudgetService struct {
kubeClient kubernetes.Interface
logger log.Logger
cacheStore *cache.Store
metricsRecorder metrics.Recorder
}

// NewPodDisruptionBudgetService returns a new PodDisruptionBudget KubeService.
func NewPodDisruptionBudgetService(kubeClient kubernetes.Interface, logger log.Logger, metricsRecorder metrics.Recorder) *PodDisruptionBudgetService {
logger = logger.With("service", "k8s.podDisruptionBudget")

rc := kubeClient.PolicyV1().RESTClient().(*rest.RESTClient)
var cacheStore *cache.Store
var err error
if ShouldUseCache() {
cacheStore, err = PodDisruptionBudgetCacheStoreFromKubeClient(rc)
if err != nil {
logger.Errorf("unable to initialize cache: %v", err)
}
}

return &PodDisruptionBudgetService{
kubeClient: kubeClient,
logger: logger,
cacheStore: cacheStore,
metricsRecorder: metricsRecorder,
}
}

func (p *PodDisruptionBudgetService) GetPodDisruptionBudget(namespace string, name string) (*policyv1.PodDisruptionBudget, error) {
podDisruptionBudget, err := p.kubeClient.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
recordMetrics(namespace, "PodDisruptionBudget", name, "GET", err, p.metricsRecorder)
if err != nil {
return nil, err
var podDisruptionBudget *policyv1.PodDisruptionBudget
var err error
var exists bool

if p.cacheStore != nil {
c := *p.cacheStore
var item interface{}
item, exists, err = c.GetByKey(fmt.Sprintf("%v/%v", namespace, name))
if exists && nil == err {
podDisruptionBudget = item.(*policyv1.PodDisruptionBudget)
}
if !exists {

err = fmt.Errorf("podDisruptionBudget %v not found in namespace %v", name, namespace)
}
} else {
podDisruptionBudget, err = p.kubeClient.PolicyV1().PodDisruptionBudgets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}
return podDisruptionBudget, nil
recordMetrics(namespace, "PodDisruptionBudget", name, "GET", err, p.metricsRecorder)

return podDisruptionBudget, err
}

func (p *PodDisruptionBudgetService) CreatePodDisruptionBudget(namespace string, podDisruptionBudget *policyv1.PodDisruptionBudget) error {
Expand Down
Loading