Skip to content

Commit

Permalink
fix: metrics updates
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Dec 6, 2024
1 parent 218ff33 commit d73fb20
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 25 deletions.
2 changes: 2 additions & 0 deletions api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,6 @@ var (
// DeletedReasonFromDeleteField is used when a deletion field (& reason)
// is picked up from the JSONPath expression provided in the scraper config.
DeletedReasonFromDeleteField ConfigDeleteReason = "FROM_DELETE_FIELD"

DeleteReasonEvent ConfigDeleteReason = "FROM_EVENT"
)
10 changes: 7 additions & 3 deletions db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/lib/pq"
"github.com/ohler55/ojg/oj"
"github.com/samber/lo"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

Expand Down Expand Up @@ -224,10 +223,15 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du
return ci, nil
}

func SoftDeleteConfigItems(ctx context.Context, ids ...string) (int, error) {
func SoftDeleteConfigItems(ctx context.Context, reason v1.ConfigDeleteReason, ids ...string) (int, error) {
tx := ctx.DB().
Model(&models.ConfigItem{}).
Where("id IN ?", ids).
Update("deleted_at", gorm.Expr("NOW()"))
UpdateColumns(
map[string]any{
"deleted_at": duty.Now(),
"delete_reason": reason,
},
)
return int(tx.RowsAffected), tx.Error
}
31 changes: 22 additions & 9 deletions scrapers/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/flanksource/config-db/scrapers/kubernetes"
"github.com/flanksource/config-db/utils/kube"
"github.com/flanksource/duty/job"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q

var (
objs []*unstructured.Unstructured
deletedObjects []string
deletedObjects []*unstructured.Unstructured
queuedTime = map[string]time.Time{}

seenObjects = map[string]struct{}{}
Expand All @@ -77,7 +78,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
queuedTime[string(obj.GetUID())] = queueItem.Timestamp

if queueItem.Operation == kubernetes.QueueItemOperationDelete {
deletedObjects = append(deletedObjects, string(obj.GetUID()))
deletedObjects = append(deletedObjects, obj)
continue
}

Expand Down Expand Up @@ -152,7 +153,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
}
}

func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs []*unstructured.Unstructured, deletedResourcesIDs []string) error {
func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs, deletedResources []*unstructured.Unstructured) error {
cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName()))
results, err := processObjects(cc, config, objs)
Expand All @@ -174,17 +175,29 @@ func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v
}
}

if len(deletedResourcesIDs) > 0 {
total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...)
if len(deletedResources) > 0 {
deletedResourceIDs := lo.Map(deletedResources, func(item *unstructured.Unstructured, _ int) string {
return string(item.GetUID())
})

total, err := db.SoftDeleteConfigItems(ctx.Context, v1.DeleteReasonEvent, deletedResourceIDs...)
if err != nil {
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
} else if total != len(deletedResourcesIDs) {
ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourcesIDs))
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResources), err)
} else if total != len(deletedResources) {
ctx.GetSpan().SetAttributes(attribute.StringSlice("deletedResourcesIDs", deletedResourceIDs))
if cc.PropertyOn(false, "log.missing") {
ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResourcesIDs), total)
ctx.Logger.Warnf("attempted to delete %d resources but only deleted %d", len(deletedResources), total)
}
}

for _, c := range deletedResources {
ctx.Counter("scraper_deleted",
"scraper_id", cc.ScraperID(),
"kind", kubernetes.ConfigTypePrefix+c.GetKind(),
"reason", string(v1.DeleteReasonEvent),
).Add(1)
}

ctx.History.SuccessCount += total
}

Expand Down
31 changes: 19 additions & 12 deletions scrapers/stale.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/flanksource/commons/duration"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db/models"
"github.com/flanksource/duty/context"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -40,22 +41,28 @@ func DeleteStaleConfigItems(ctx context.Context, staleTimeout string, scraperID
}

deleteQuery := `
UPDATE config_items
SET
deleted_at = NOW(),
delete_reason = ?
WHERE
((NOW() - last_scraped_time) > INTERVAL '1 SECOND' * ?) AND
deleted_at IS NULL AND
scraper_id = ?`

result := ctx.DB().Exec(deleteQuery, v1.DeletedReasonStale, staleDuration.Seconds(), scraperID)
UPDATE config_items
SET
deleted_at = NOW(),
delete_reason = ?
WHERE
((NOW() - last_scraped_time) > INTERVAL '1 SECOND' * ?) AND
deleted_at IS NULL AND
scraper_id = ?
RETURNING type`

var deletedConfigs []models.ConfigItem
result := ctx.DB().Raw(deleteQuery, v1.DeletedReasonStale, staleDuration.Seconds(), scraperID).Scan(&deletedConfigs)
if err := result.Error; err != nil {
return 0, fmt.Errorf("failed to delete stale config items: %w", err)
}

if result.RowsAffected > 0 {
ctx.Logger.V(3).Infof("Deleted %d stale config items", result.RowsAffected)
if len(deletedConfigs) > 0 {
ctx.Logger.V(3).Infof("deleted %d stale config items for scraper: %s", len(deletedConfigs), scraperID)
}

for _, c := range deletedConfigs {
ctx.Counter("scraper_deleted", "scraper_id", scraperID.String(), "kind", c.Type, "reason", string(v1.DeletedReasonStale)).Add(1)
}

return result.RowsAffected, nil
Expand Down
2 changes: 1 addition & 1 deletion utils/kube/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
v1 "github.com/flanksource/config-db/api/v1"
)

var fetchDelayBuckets = []float64{500, 1_000, 3_000, 5_000, 10_000, 20_000, 30_000, 60_000}
var fetchDelayBuckets = []float64{10, 50, 100, 500, 1_000, 5_000, 10_000, 30_000, 60_000}

func FetchInvolvedObjects(ctx api.ScrapeContext, iObjs []v1.InvolvedObject) ([]*unstructured.Unstructured, error) {
clientMap := map[schema.GroupVersionKind]dynamic.NamespaceableResourceInterface{}
Expand Down

0 comments on commit d73fb20

Please sign in to comment.