diff --git a/scrapers/kubernetes/events_watch.go b/scrapers/kubernetes/events_watch.go index 1ef27498..647caa08 100644 --- a/scrapers/kubernetes/events_watch.go +++ b/scrapers/kubernetes/events_watch.go @@ -58,33 +58,26 @@ func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj a // WatchResources watches Kubernetes resources func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { - buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize)) WatchResourceBuffer.Store(config.Hash(), buffer) deleteBuffer := make(chan string, WatchResourceBufferSize) DeleteResourceBuffer.Store(config.Hash(), deleteBuffer) - var restConfig *rest.Config var err error if config.Kubeconfig != nil { - ctx, restConfig, err = applyKubeconfig(ctx, *config.Kubeconfig) + ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig) if err != nil { - return fmt.Errorf("failed to apply kube config") + return fmt.Errorf("failed to apply custom kube config(%s): %w", config.Kubeconfig, err) } } else { - restConfig, err = kube.DefaultRestConfig() + _, err = kube.DefaultRestConfig() if err != nil { - return fmt.Errorf("failed to apply kube config") + return fmt.Errorf("failed to apply default kube config: %w", err) } } - clientset, err := kube.ClientSetFromRestConfig(restConfig) - if err != nil { - return fmt.Errorf("failed to create kubernetes clientset from rest config: %w", err) - } - - factory := informers.NewSharedInformerFactory(clientset, 0) + factory := informers.NewSharedInformerFactory(ctx.Kubernetes(), 0) stopper := make(chan struct{}) defer close(stopper) @@ -133,8 +126,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error { go informer.Run(stopper) } - ctx.Counter("kubernetes_scraper_resource_watcher", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1) - ctx.Logger.V(1).Infof("waiting for informers") + ctx.Counter("kubernetes_scraper_resource_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1) wg.Wait() return nil @@ -198,7 +190,7 @@ func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error { } defer watcher.Stop() - ctx.Counter("kubernetes_scraper_event_watcher", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1) + ctx.Counter("kubernetes_scraper_event_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1) for watchEvent := range watcher.ResultChan() { var event v1.KubernetesEvent if err := event.FromObj(watchEvent.Object); err != nil { diff --git a/utils/kube/kube.go b/utils/kube/kube.go index 8a66eca3..6f6d75a3 100644 --- a/utils/kube/kube.go +++ b/utils/kube/kube.go @@ -14,9 +14,7 @@ limitations under the License. package kube import ( - "bytes" "context" - "encoding/gob" "fmt" "os" "path/filepath" @@ -42,34 +40,16 @@ import ( "k8s.io/client-go/util/homedir" ) -var ( - kubeClientCreatedCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "kube_client_from_rest_count", - Help: "The total number of times kubernetes clientset were created from rest configs", - }, - []string{}, - ) - - kubeClientCacheHitCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "kube_client_from_rest_count_cache_hit", - Help: "The total number of times kubernetes clientset were created from rest configs", - }, - []string{}, - ) - - kubeClientCreatErrorCount = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "kube_client_from_rest_error_count", - Help: "The total number of times kubernetes clientset were failed to be created from rest configs", - }, - []string{}, - ) +var kubeClientCreatedCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "kube_client_created_count", + Help: "The total number of times kubernetes clientset were created from kube config", + }, + []string{"cached"}, ) func init() { - prometheus.MustRegister(kubeClientCreatedCount, kubeClientCacheHitCount, kubeClientCreatErrorCount) + prometheus.MustRegister(kubeClientCreatedCount) } func getRestMapper(config *rest.Config) (meta.RESTMapper, error) { @@ -122,56 +102,44 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn return dc.Resource(mapping.Resource), nil } -var clientSetCache = cache.New(time.Hour*24, time.Hour*24) +var kubeCache = cache.New(time.Hour, time.Hour) -func ClientSetFromRestConfig(restConfig *rest.Config) (*kubernetes.Clientset, error) { - client, cached, err := clientSetFromRestConfigCached(restConfig) - if err != nil { - kubeClientCreatErrorCount.WithLabelValues().Inc() - return nil, err - } - - if cached { - kubeClientCacheHitCount.WithLabelValues().Inc() - } else { - kubeClientCreatedCount.WithLabelValues().Inc() - } - - return client, nil +type kubeCacheData struct { + Client kubernetes.Interface + Config *rest.Config } -func clientSetFromRestConfigCached(restConfig *rest.Config) (*kubernetes.Clientset, bool, error) { - // Using gob encoder because json encoder returned type error for transport wrapper func - var b bytes.Buffer - if err := gob.NewEncoder(&b).Encode(restConfig); err != nil { - return nil, false, fmt.Errorf("failed to gob encode restconfig: %w", err) - } - key := b.String() - - if val, ok := clientSetCache.Get(key); ok { - return val.(*kubernetes.Clientset), true, nil +func NewKubeClientWithConfigPath(kubeConfigPath string) (kubernetes.Interface, *rest.Config, error) { + key := fmt.Sprintf("kube-config-path-%s", kubeConfigPath) + if val, ok := kubeCache.Get(key); ok { + d := val.(*kubeCacheData) + kubeClientCreatedCount.WithLabelValues("true").Inc() + return d.Client, d.Config, nil } - client, err := kubernetes.NewForConfig(restConfig) + config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) if err != nil { - return nil, false, err + return fake.NewSimpleClientset(), nil, err } - clientSetCache.SetDefault(key, client) - - return client, false, nil -} -func NewKubeClientWithConfigPath(kubeConfigPath string) (kubernetes.Interface, *rest.Config, error) { - config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath) + client, err := kubernetes.NewForConfig(config) if err != nil { return fake.NewSimpleClientset(), nil, err } - client, err := ClientSetFromRestConfig(config) + kubeCache.SetDefault(key, &kubeCacheData{Client: client, Config: config}) + kubeClientCreatedCount.WithLabelValues("false").Inc() return client, config, err } func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Config, error) { + key := fmt.Sprintf("kube-config-%s", kubeConfig) + if val, ok := kubeCache.Get(key); ok { + kubeClientCreatedCount.WithLabelValues("true").Inc() + d := val.(*kubeCacheData) + return d.Client, d.Config, nil + } + getter := func() (*clientcmdapi.Config, error) { clientCfg, err := clientcmd.NewClientConfigFromBytes([]byte(kubeConfig)) if err != nil { @@ -191,7 +159,13 @@ func NewKubeClientWithConfig(kubeConfig string) (kubernetes.Interface, *rest.Con return fake.NewSimpleClientset(), nil, err } - client, err := ClientSetFromRestConfig(config) + client, err := kubernetes.NewForConfig(config) + if err != nil { + return fake.NewSimpleClientset(), nil, err + } + + kubeCache.SetDefault(key, &kubeCacheData{Client: client, Config: config}) + kubeClientCreatedCount.WithLabelValues("false").Inc() return client, config, err } @@ -222,13 +196,17 @@ func NewK8sClient() (kubernetes.Interface, *rest.Config, error) { } } - client, err := ClientSetFromRestConfig(restConfig) + client, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return fake.NewSimpleClientset(), nil, err + } + return client, restConfig, err } // GetClusterName ... func GetClusterName(config *rest.Config) string { - clientset, err := ClientSetFromRestConfig(config) + clientset, err := kubernetes.NewForConfig(config) if err != nil { return "" }