Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use shared informers to watch resource #615

Merged
merged 7 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,10 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du
return ci, nil
}

func SoftDeleteConfigItems(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}

// SoftDeleteConfigItem marks a single config item as deleted by setting its
// deleted_at column to the current database time (NOW()); the row is kept for
// history. Returns any error reported by the underlying DB update.
func SoftDeleteConfigItem(ctx context.Context, id string) error {
	return ctx.DB().
		Model(&models.ConfigItem{}).
		Where("id = ?", id).
		Update("deleted_at", gorm.Expr("NOW()")).
		Error
}
40 changes: 4 additions & 36 deletions scrapers/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,28 +87,6 @@ func watchKubernetesEventsWithRetry(ctx api.ScrapeContext, config v1.Kubernetes)
}
}

// watchKubernetesResourcesWithRetry keeps the resource watcher for a cluster
// alive indefinitely. Each pass retries kubernetes.WatchResources with
// exponential backoff for up to a minute; when the retry window is exhausted
// it logs the terminal error and starts a fresh window.
func watchKubernetesResourcesWithRetry(ctx api.ScrapeContext, config v1.Kubernetes) {
	const (
		retryWindow = time.Minute // how long to keep retrying before we reset and retry again
		baseDelay   = time.Second
	)

	for {
		// One watch attempt; any failure is marked retryable so the backoff
		// policy keeps driving further attempts within the window.
		attempt := func(c gocontext.Context) error {
			scrapeCtx := c.(api.ScrapeContext)
			if err := kubernetes.WatchResources(scrapeCtx, config); err != nil {
				logger.Errorf("failed to watch resources: %v", err)
				return retry.RetryableError(err)
			}
			return nil
		}

		policy := retry.WithMaxDuration(retryWindow, retry.NewExponential(baseDelay))
		err := retry.Do(ctx, policy, attempt)

		logger.Errorf("failed to watch kubernetes resources. cluster=%s: %v", config.ClusterName, err)
	}
}

func SyncScrapeJob(sc api.ScrapeContext) error {
id := sc.ScrapeConfig().GetPersistedID().String()

Expand Down Expand Up @@ -175,7 +153,10 @@ func scheduleScraperJob(sc api.ScrapeContext) error {
}

go watchKubernetesEventsWithRetry(sc, config)
go watchKubernetesResourcesWithRetry(sc, config)

if err := kubernetes.WatchResources(sc, config); err != nil {
return fmt.Errorf("failed to watch kubernetes resources: %v", err)
}

eventsWatchJob := ConsumeKubernetesWatchEventsJobFunc(sc, config)
if err := eventsWatchJob.AddToScheduler(scrapeJobScheduler); err != nil {
Expand Down Expand Up @@ -315,19 +296,6 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube
}
}

_deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash())
if !ok {
return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
}
deletChan := _deleteCh.(chan string)

if len(deletChan) > 0 {
deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan))
if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs); err != nil {
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
}
}

return nil
},
}
Expand Down
73 changes: 36 additions & 37 deletions scrapers/kubernetes/events_watch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"encoding/json"
"fmt"
"strings"
"sync"
Expand All @@ -10,13 +11,12 @@ import (
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/utils"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/utils/kube"
)

Expand All @@ -31,61 +31,59 @@ var (

// WatchEventBuffers stores a sync buffer per kubernetes config
WatchResourceBuffer = sync.Map{}

// DeleteResourceBuffer stores a buffer per kubernetes config
// that contains the ids of resources that have been deleted.
DeleteResourceBuffer = sync.Map{}
)

// WatchResources watches Kubernetes resources
func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) {
b, err := json.Marshal(obj)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to marshal and unmarshall - It should be unstructured already ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we receive concrete types like core v1.Pod

if err != nil {
return nil, fmt.Errorf("failed to marshal: %v", err)
}

var m map[string]any
if err := json.Unmarshal(b, &m); err != nil {
return nil, fmt.Errorf("failed to unmarshal on add func: %v", err)
}

// The object returned by the informers do not have kind and apiversion set
m["kind"] = resource.Kind
m["apiVersion"] = resource.ApiVersion

return &unstructured.Unstructured{Object: m}, nil
}

// WatchResources watches Kubernetes resources with shared informers
func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
WatchResourceBuffer.Store(config.Hash(), buffer)

deleteBuffer := make(chan string, WatchResourceBufferSize)
DeleteResourceBuffer.Store(config.Hash(), deleteBuffer)

var restConfig *rest.Config
var err error
if config.Kubeconfig != nil {
ctx, restConfig, err = applyKubeconfig(ctx, *config.Kubeconfig)
ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig)
if err != nil {
return fmt.Errorf("failed to apply kube config")
return fmt.Errorf("failed to apply custom kube config(%s): %w", config.Kubeconfig, err)
}
} else {
restConfig, err = kube.DefaultRestConfig()
_, err = kube.DefaultRestConfig()
if err != nil {
return fmt.Errorf("failed to apply kube config")
return fmt.Errorf("failed to apply default kube config: %w", err)
}
}

var channels []<-chan watch.Event
for _, k := range lo.Uniq(config.Watch) {
client, err := kube.GetClientByGroupVersionKind(restConfig, k.ApiVersion, k.Kind)
if err != nil {
return fmt.Errorf("failed to create client for kind(%s): %v", k, err)
}

watcher, err := client.Watch(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to create watcher for kind(%s): %v", k, err)
for _, watchResource := range lo.Uniq(config.Watch) {
if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, db.SoftDeleteConfigItem); err != nil {
return fmt.Errorf("failed to register informer: %w", err)
}
defer watcher.Stop()

channels = append(channels, watcher.ResultChan())
}

for watchEvent := range utils.MergeChannels(channels...) {
obj, ok := watchEvent.Object.(*unstructured.Unstructured)
if ok {
if watchEvent.Type == watch.Deleted {
deleteBuffer <- string(obj.GetUID())
} else {
buffer <- obj
}
}
// Stop all the other active shared informers, if any, that were previously started
// but then removed from the config.
var existingWatches []string
for _, w := range config.Watch {
existingWatches = append(existingWatches, w.ApiVersion+w.Kind)
}
globalSharedInformerManager.stop(ctx, config.Kubeconfig.ValueStatic, existingWatches...)

ctx.Counter("kubernetes_scraper_resource_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
return nil
}

Expand All @@ -110,6 +108,7 @@ func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error {
}
defer watcher.Stop()

ctx.Counter("kubernetes_scraper_event_watcher", "scraper_id", lo.FromPtr(ctx.ScrapeConfig().GetPersistedID()).String()).Add(1)
for watchEvent := range watcher.ResultChan() {
var event v1.KubernetesEvent
if err := event.FromObj(watchEvent.Object); err != nil {
Expand Down
Loading
Loading