Skip to content

Commit

Permalink
feat: remove delete buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Jun 7, 2024
1 parent 639385b commit 8c65ceb
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 40 deletions.
8 changes: 2 additions & 6 deletions db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,10 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du
return ci, nil
}

func SoftDeleteConfigItems(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}

func SoftDeleteConfigItem(ctx context.Context, id string) error {
return ctx.DB().
Model(&models.ConfigItem{}).
Where("id IN (?)", ids).
Where("id = ?", id).
Update("deleted_at", gorm.Expr("NOW()")).
Error
}
13 changes: 0 additions & 13 deletions scrapers/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,6 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube
}
}

_deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash())
if !ok {
return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
}
deletChan := _deleteCh.(chan string)

if len(deletChan) > 0 {
deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan))
if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs); err != nil {
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
}
}

return nil
},
}
Expand Down
10 changes: 2 additions & 8 deletions scrapers/kubernetes/events_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
"github.com/flanksource/config-db/utils/kube"
)

Expand All @@ -30,10 +31,6 @@ var (

// WatchEventBuffers stores a sync buffer per kubernetes config
WatchResourceBuffer = sync.Map{}

// DeleteResourceBuffer stores a buffer per kubernetes config
// that contains the ids of resources that have been deleted.
DeleteResourceBuffer = sync.Map{}
)

func getUnstructuredFromInformedObj(resource v1.KubernetesResourceToWatch, obj any) (*unstructured.Unstructured, error) {
Expand All @@ -59,9 +56,6 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
WatchResourceBuffer.Store(config.Hash(), buffer)

deleteBuffer := make(chan string, WatchResourceBufferSize)
DeleteResourceBuffer.Store(config.Hash(), deleteBuffer)

var err error
if config.Kubeconfig != nil {
ctx, _, err = applyKubeconfig(ctx, *config.Kubeconfig)
Expand All @@ -76,7 +70,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
}

for _, watchResource := range lo.Uniq(config.Watch) {
if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, deleteBuffer); err != nil {
if err := globalSharedInformerManager.Register(ctx, config.Kubeconfig.ValueStatic, watchResource, buffer, db.SoftDeleteConfigItem); err != nil {
return fmt.Errorf("failed to register informer: %w", err)
}
}
Expand Down
45 changes: 32 additions & 13 deletions scrapers/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/google/uuid"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/informers"
Expand All @@ -31,10 +34,12 @@ type SharedInformerManager struct {
cache map[string]map[string]*informerCacheData
}

func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteBuffer chan<- string) error {
type DeleteObjHandler func(ctx context.Context, id string) error

func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig string, watchResource v1.KubernetesResourceToWatch, buffer chan<- *unstructured.Unstructured, deleteHandler DeleteObjHandler) error {
apiVersion, kind := watchResource.ApiVersion, watchResource.Kind

informer, stopper, isNew := t.get(ctx, kubeconfig, apiVersion, kind)
informer, stopper, isNew := t.getOrCreate(ctx, kubeconfig, apiVersion, kind)
if informer == nil {
return fmt.Errorf("could not find informer for: apiVersion=%v kind=%v", apiVersion, kind)
}
Expand All @@ -45,7 +50,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return nil
}

ctx.Logger.V(0).Infof("registering shared informer for: %v", watchResource)
ctx.Logger.V(1).Infof("registering shared informer for: %v", watchResource)
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
u, err := getUnstructuredFromInformedObj(watchResource, obj)
Expand All @@ -54,7 +59,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return
}

logger.Infof("added: %s %s", u.GetKind(), u.GetName())
ctx.Logger.V(2).Infof("added: %s %s", u.GetKind(), u.GetName())
buffer <- u
},
UpdateFunc: func(oldObj any, newObj any) {
Expand All @@ -64,33 +69,36 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin
return
}

logger.Infof("updated: %s %s", u.GetKind(), u.GetName())
ctx.Logger.V(2).Infof("updated: %s %s", u.GetKind(), u.GetName())
buffer <- u
},
DeleteFunc: func(obj any) {
u, err := getUnstructuredFromInformedObj(watchResource, obj)
if err != nil {
logger.Errorf("failed to get unstructured from deleted object: %v", err)
logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(), "failed to get unstructured %v", err)
return
}

logger.Infof("deleted:%s %s", u.GetKind(), u.GetName())
deleteBuffer <- string(u.GetUID())
if err := deleteHandler(ctx.Context, string(u.GetUID())); err != nil {
logToJobHistory(ctx.DutyContext(), "DeleteK8sWatchResource", ctx.ScrapeConfig().GetPersistedID(),
"failed to delete (%s) %s/%s/%s resources: %v", u.GetUID(), u.GetKind(), u.GetNamespace(), u.GetName(), err)
return
}
},
})
if err != nil {
return fmt.Errorf("failed to add informent event handlers: %w", err)
return fmt.Errorf("failed to add informer event handlers: %w", err)
}

go func() {
informer.Run(stopper)
ctx.Logger.V(0).Infof("stopped shared informer for: %v", watchResource)
ctx.Logger.V(1).Infof("stopped shared informer for: %v", watchResource)
}()
return nil
}

// get returns an existing shared informer instance or creates & returns a new one.
func (t *SharedInformerManager) get(ctx api.ScrapeContext, kubeconfig, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) {
// getOrCreate returns an existing shared informer instance or creates & returns a new one.
func (t *SharedInformerManager) getOrCreate(ctx api.ScrapeContext, kubeconfig, apiVersion, kind string) (cache.SharedInformer, chan struct{}, bool) {
t.mu.Lock()
defer t.mu.Unlock()

Expand Down Expand Up @@ -133,7 +141,7 @@ func (t *SharedInformerManager) stop(ctx api.ScrapeContext, kubeconfig string, e
if informers, ok := t.cache[kubeconfig]; ok {
for key, cached := range informers {
if !lo.Contains(exception, key) {
ctx.Logger.V(0).Infof("stopping informer for %s", key)
ctx.Logger.V(1).Infof("stopping informer for %s", key)

cached.informer.IsStopped()
ctx.Gauge("kubernetes_active_shared_informers").Sub(1)
Expand Down Expand Up @@ -185,3 +193,14 @@ func getInformer(factory informers.SharedInformerFactory, apiVersion, kind strin

return nil
}

// logToJobHistory logs any failures in saving a playbook run to the job history.
func logToJobHistory(ctx context.Context, job string, scraperID *uuid.UUID, err string, args ...any) {
jobHistory := models.NewJobHistory(ctx.Logger, job, "", lo.FromPtr(scraperID).String())
jobHistory.Start()
jobHistory.AddErrorf(err, args...)

if err := jobHistory.End().Persist(ctx.DB()); err != nil {
logger.Errorf("error persisting job history: %v", err)
}
}

0 comments on commit 8c65ceb

Please sign in to comment.