Skip to content

Commit

Permalink
feat: use shared informers to watch resource
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Jun 5, 2024
1 parent a4417f6 commit 4c1cd77
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 24 deletions.
125 changes: 104 additions & 21 deletions scrapers/kubernetes/events_watch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"encoding/json"
"fmt"
"strings"
"sync"
Expand All @@ -10,13 +11,13 @@ import (
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/utils"
"github.com/flanksource/config-db/utils/kube"
)

Expand All @@ -37,6 +38,24 @@ var (
DeleteResourceBuffer = sync.Map{}
)

func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) {
b, err := json.Marshal(obj)
if err != nil {
return nil, fmt.Errorf("failed to marshal: %v", err)
}

var m map[string]any
if err := json.Unmarshal(b, &m); err != nil {
return nil, fmt.Errorf("failed to unmarshal on add func: %v", err)
}

// The object returned by the informers do not have kind and apiversion set
m["kind"] = resource.Kind
m["apiVersion"] = resource.ApiVersion

return &unstructured.Unstructured{Object: m}, nil
}

// WatchResources watches Kubernetes resources
func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
Expand All @@ -59,30 +78,94 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
}
}

var channels []<-chan watch.Event
for _, k := range lo.Uniq(config.Watch) {
client, err := kube.GetClientByGroupVersionKind(restConfig, k.ApiVersion, k.Kind)
if err != nil {
return fmt.Errorf("failed to create client for kind(%s): %v", k, err)
}
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return fmt.Errorf("failed to create kubernetes clientset from rest config: %w", err)
}

watcher, err := client.Watch(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to create watcher for kind(%s): %v", k, err)
factory := informers.NewSharedInformerFactory(clientset, 0)
stopper := make(chan struct{})
defer close(stopper)

var wg sync.WaitGroup
for _, watchResource := range lo.Uniq(config.Watch) {
wg.Add(1)

informer := getInformer(factory, watchResource.ApiVersion, watchResource.Kind)
if informer == nil {
return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", watchResource.ApiVersion, watchResource.Kind)
}
defer watcher.Stop()

channels = append(channels, watcher.ResultChan())
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{

Check failure on line 99 in scrapers/kubernetes/events_watch.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `informer.AddEventHandler` is not checked (errcheck)
AddFunc: func(obj any) {
u, err := getUnstructuredFromInformedObj(watchResource, obj)
if err != nil {
logger.Errorf("failed to get unstructured from new object: %v", err)
return
}

buffer <- u
},
UpdateFunc: func(oldObj any, newObj any) {
u, err := getUnstructuredFromInformedObj(watchResource, newObj)
if err != nil {
logger.Errorf("failed to get unstructured from updated object: %v", err)
return
}

buffer <- u
},
DeleteFunc: func(obj any) {
u, err := getUnstructuredFromInformedObj(watchResource, obj)
if err != nil {
logger.Errorf("failed to get unstructured from deleted object: %v", err)
return
}

deleteBuffer <- string(u.GetUID())
},
})

go informer.Run(stopper)
}

for watchEvent := range utils.MergeChannels(channels...) {
obj, ok := watchEvent.Object.(*unstructured.Unstructured)
if ok {
if watchEvent.Type == watch.Deleted {
deleteBuffer <- string(obj.GetUID())
} else {
buffer <- obj
}
ctx.Logger.V(1).Infof("waiting for informers: %w", err)
wg.Wait()

return nil
}

func getInformer(factory informers.SharedInformerFactory, apiVersion, kind string) cache.SharedInformer {
// TODO: need to populate this

kind = strings.ToLower(kind)
switch strings.ToLower(apiVersion) {
case "v1":
switch kind {
case "pod":
return factory.Core().V1().Pods().Informer()
case "node":
return factory.Core().V1().Nodes().Informer()
}

case "apps/v1":
switch kind {
case "deployment":
return factory.Apps().V1().Deployments().Informer()
case "daemonset":
return factory.Apps().V1().DaemonSets().Informer()
case "replicaset":
return factory.Apps().V1().ReplicaSets().Informer()
case "statefulset":
return factory.Apps().V1().StatefulSets().Informer()
}

case "batch/v1":
switch kind {
case "cronjob":
return factory.Batch().V1().CronJobs().Informer()
case "job":
return factory.Batch().V1().Jobs().Informer()
}
}

Expand Down
4 changes: 2 additions & 2 deletions utils/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func getRestMapper(config *rest.Config) (meta.RESTMapper, error) {
return restmapper.NewDeferredDiscoveryRESTMapper(cache), nil
}

func getGroupVersion(apiVersion string) (string, string) {
func GetGroupVersion(apiVersion string) (string, string) {
split := strings.Split(apiVersion, "/")
if len(split) == 1 {
return "", apiVersion
Expand All @@ -73,7 +73,7 @@ func GetClientByGroupVersionKind(cfg *rest.Config, apiVersion, kind string) (dyn
return nil, err
}

group, version := getGroupVersion(apiVersion)
group, version := GetGroupVersion(apiVersion)
gvk, err := rm.KindFor(schema.GroupVersionResource{Group: group, Version: version, Resource: kind})
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion utils/kube/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestGetGroupVersion(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
group, version := getGroupVersion(tc.apiVersion)
group, version := GetGroupVersion(tc.apiVersion)
if group != tc.expectedGroup || version != tc.expectedVersion {
t.Errorf("getGroupVersion(%q) = %q, %q; expected %q, %q",
tc.apiVersion, group, version, tc.expectedGroup, tc.expectedVersion)
Expand Down

0 comments on commit 4c1cd77

Please sign in to comment.