Skip to content

Commit

Permalink
fix: cache kubeconfig->clientset creation
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Jun 5, 2024
1 parent 6838940 commit 89c6579
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 79 deletions.
22 changes: 7 additions & 15 deletions scrapers/kubernetes/events_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,26 @@ func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj a

// WatchResources watches Kubernetes resources
func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {

buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
WatchResourceBuffer.Store(config.Hash(), buffer)

deleteBuffer := make(chan string, WatchResourceBufferSize)
DeleteResourceBuffer.Store(config.Hash(), deleteBuffer)

var restConfig *rest.Config
var err error
if config.Kubeconfig != nil {
ctx, restConfig, err = applyKubeconfig(ctx, *config.Kubeconfig)
ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig)
if err != nil {
return fmt.Errorf("failed to apply kube config")
return fmt.Errorf("failed to apply custom kube config(%s): %w", config.Kubeconfig, err)
}
} else {
restConfig, err = kube.DefaultRestConfig()
_, err = kube.DefaultRestConfig()
if err != nil {
return fmt.Errorf("failed to apply kube config")
return fmt.Errorf("failed to apply default kube config: %w", err)
}
}

clientset, err := kube.ClientSetFromRestConfig(restConfig)
if err != nil {
return fmt.Errorf("failed to create kubernetes clientset from rest config: %w", err)
}

factory := informers.NewSharedInformerFactory(clientset, 0)
factory := informers.NewSharedInformerFactory(ctx.Kubernetes(), 0)
stopper := make(chan struct{})
defer close(stopper)

Expand Down Expand Up @@ -133,8 +126,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
go informer.Run(stopper)
}

ctx.Counter("kubernetes_scraper_resource_watcher", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
ctx.Logger.V(1).Infof("waiting for informers")
ctx.Counter("kubernetes_scraper_resource_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
wg.Wait()

return nil
Expand Down Expand Up @@ -198,7 +190,7 @@ func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error {
}
defer watcher.Stop()

ctx.Counter("kubernetes_scraper_event_watcher", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
ctx.Counter("kubernetes_scraper_event_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
for watchEvent := range watcher.ResultChan() {
var event v1.KubernetesEvent
if err := event.FromObj(watchEvent.Object); err != nil {
Expand Down
106 changes: 42 additions & 64 deletions utils/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ limitations under the License.
package kube

import (
"bytes"
"context"
"encoding/gob"
"fmt"
"os"
"path/filepath"
Expand All @@ -42,34 +40,16 @@ import (
"k8s.io/client-go/util/homedir"
)

var (
kubeClientCreatedCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kube_client_from_rest_count",
Help: "The total number of times kubernetes clientset were created from rest configs",
},
[]string{},
)

kubeClientCacheHitCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kube_client_from_rest_count_cache_hit",
Help: "The total number of times kubernetes clientset were created from rest configs",
},
[]string{},
)

kubeClientCreatErrorCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kube_client_from_rest_error_count",
Help: "The total number of times kubernetes clientset were failed to be created from rest configs",
},
[]string{},
)
var kubeClientCreatedCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "kube_client_created_count",
Help: "The total number of times kubernetes clientset were created from kube config",
},
[]string{"cached"},
)

func init() {
prometheus.MustRegister(kubeClientCreatedCount, kubeClientCacheHitCount, kubeClientCreatErrorCount)
prometheus.MustRegister(kubeClientCreatedCount)
}

func getRestMapper(config *rest.Config) (meta.RESTMapper, error) {
Expand Down Expand Up @@ -122,56 +102,44 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn
return dc.Resource(mapping.Resource), nil
}

var clientSetCache = cache.New(time.Hour*24, time.Hour*24)
var kubeCache = cache.New(time.Hour, time.Hour)

func ClientSetFromRestConfig(restConfig *rest.Config) (*kubernetes.Clientset, error) {
client, cached, err := clientSetFromRestConfigCached(restConfig)
if err != nil {
kubeClientCreatErrorCount.WithLabelValues().Inc()
return nil, err
}

if cached {
kubeClientCacheHitCount.WithLabelValues().Inc()
} else {
kubeClientCreatedCount.WithLabelValues().Inc()
}

return client, nil
type kubeCacheData struct {
Client kubernetes.Interface
Config *rest.Config
}

func clientSetFromRestConfigCached(restConfig *rest.Config) (*kubernetes.Clientset, bool, error) {
// Using gob encoder because json encoder returned type error for transport wrapper func
var b bytes.Buffer
if err := gob.NewEncoder(&b).Encode(restConfig); err != nil {
return nil, false, fmt.Errorf("failed to gob encode restconfig: %w", err)
}
key := b.String()

if val, ok := clientSetCache.Get(key); ok {
return val.(*kubernetes.Clientset), true, nil
func NewKubeClientWithConfigPath(kubeConfigPath string) (kubernetes.Interface, *rest.Config, error) {
key := fmt.Sprintf("kube-config-path-%s", kubeConfigPath)
if val, ok := kubeCache.Get(key); ok {
d := val.(*kubeCacheData)
kubeClientCreatedCount.WithLabelValues("true").Inc()
return d.Client, d.Config, nil
}

client, err := kubernetes.NewForConfig(restConfig)
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
return nil, false, err
return fake.NewSimpleClientset(), nil, err
}
clientSetCache.SetDefault(key, client)

return client, false, nil
}

func NewKubeClientWithConfigPath(kubeConfigPath string) (kubernetes.Interface, *rest.Config, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
client, err := kubernetes.NewForConfig(config)
if err != nil {
return fake.NewSimpleClientset(), nil, err
}

client, err := ClientSetFromRestConfig(config)
kubeCache.SetDefault(key, &kubeCacheData{Client: client, Config: config})
kubeClientCreatedCount.WithLabelValues("false").Inc()
return client, config, err
}

func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Config, error) {
key := fmt.Sprintf("kube-config-%s", kubeConfig)
if val, ok := kubeCache.Get(key); ok {
kubeClientCreatedCount.WithLabelValues("true").Inc()
d := val.(*kubeCacheData)
return d.Client, d.Config, nil
}

getter := func() (*clientcmdapi.Config, error) {
clientCfg, err := clientcmd.NewClientConfigFromBytes([]byte(kubeConfig))
if err != nil {
Expand All @@ -191,7 +159,13 @@ func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Con
return fake.NewSimpleClientset(), nil, err
}

client, err := ClientSetFromRestConfig(config)
client, err := kubernetes.NewForConfig(config)
if err != nil {
return fake.NewSimpleClientset(), nil, err
}

kubeCache.SetDefault(key, &kubeCacheData{Client: client, Config: config})
kubeClientCreatedCount.WithLabelValues("false").Inc()
return client, config, err
}

Expand Down Expand Up @@ -222,13 +196,17 @@ func NewK8sClient() (kubernetes.Interface, *rest.Config, error) {
}
}

client, err := ClientSetFromRestConfig(restConfig)
client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return fake.NewSimpleClientset(), nil, err
}

return client, restConfig, err
}

// GetClusterName ...
func GetClusterName(config *rest.Config) string {
clientset, err := ClientSetFromRestConfig(config)
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return ""
}
Expand Down

0 comments on commit 89c6579

Please sign in to comment.