From 741f7598a28fe09ff9faf956ba991a1bc67125fc Mon Sep 17 00:00:00 2001 From: Aditya Thebe <contact@adityathebe.com> Date: Tue, 11 Jun 2024 15:14:37 +0545 Subject: [PATCH] fix: add logs to ConsumeKubernetesWatchResourcesJob --- db/config.go | 8 ++++---- db/update.go | 4 ++++ scrapers/cron.go | 16 ++++++++++++---- scrapers/kubernetes/informers.go | 6 +++--- 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/db/config.go b/db/config.go index 8b15ced6..74fb304d 100644 --- a/db/config.go +++ b/db/config.go @@ -211,10 +211,10 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du return ci, nil } -func SoftDeleteConfigItems(ctx context.Context, ids ...string) error { - return ctx.DB(). +func SoftDeleteConfigItems(ctx context.Context, ids ...string) (int, error) { + tx := ctx.DB(). Model(&models.ConfigItem{}). Where("id IN ?", ids). - Update("deleted_at", gorm.Expr("NOW()")). - Error + Update("deleted_at", gorm.Expr("NOW()")) + return int(tx.RowsAffected), tx.Error } diff --git a/db/update.go b/db/update.go index 208d9b83..a12733c7 100644 --- a/db/update.go +++ b/db/update.go @@ -327,6 +327,10 @@ func SavePartialResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error } func saveResults(ctx api.ScrapeContext, isPartialResultSet bool, results []v1.ScrapeResult) error { + if len(results) == 0 { + return nil + } + startTime, err := GetCurrentDBTime(ctx) if err != nil { return fmt.Errorf("unable to get current db time: %w", err) diff --git a/scrapers/cron.go b/scrapers/cron.go index 12e326d0..4280283b 100644 --- a/scrapers/cron.go +++ b/scrapers/cron.go @@ -14,6 +14,7 @@ import ( "github.com/robfig/cron/v3" "github.com/samber/lo" "github.com/sethvargo/go-retry" + "go.opentelemetry.io/otel/attribute" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" @@ -321,13 +322,20 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube if !ok { return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash()) } - deletChan := _deleteCh.(chan string) + deleteChan := _deleteCh.(chan string) - if len(deletChan) > 0 { - deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan)) - if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...); err != nil { + if len(deleteChan) > 0 { + deletedResourcesIDs, _, _, _ := lo.Buffer(deleteChan, len(deleteChan)) + + total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...) + if err != nil { return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err) + } else if total != len(deletedResourcesIDs) { + ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourcesIDs)) + ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResourcesIDs), total) } + + ctx.History.SuccessCount += total } return nil diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index 83061d03..57e03b80 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -59,7 +59,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("added: %s %s", u.GetKind(), u.GetName()) + ctx.Logger.V(2).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) buffer <- u }, UpdateFunc: func(oldObj any, newObj any) { @@ -69,7 +69,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("updated: %s %s", u.GetKind(), u.GetName()) + ctx.Logger.V(2).Infof("updated: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) buffer <- u }, DeleteFunc: func(obj any) { @@ -79,7 +79,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, kubeconfig strin return } - ctx.Logger.V(2).Infof("deleted: %s %s", u.GetKind(), u.GetName()) + ctx.Logger.V(2).Infof("deleted: %s %s %s", u.GetUID(), u.GetKind(), u.GetName()) deleteBuffer <- string(u.GetUID()) }, })