Skip to content

Commit

Permalink
fix: use event.type=deleted to know when a job & its pod is deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed May 20, 2024
1 parent 75da805 commit 6864e87
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 15 deletions.
3 changes: 2 additions & 1 deletion api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ func (e ExternalID) WhereClause(db *gorm.DB) *gorm.DB {
type ConfigDeleteReason string

var (
// DeletedReasonStale is used when a config item doesn't get updated on a scrape run.
// DeletedReasonStale is used when a config item doesn't get updated
// for a period of `staleItemAge`.
DeletedReasonStale ConfigDeleteReason = "STALE"

// DeletedReasonFromAttribute is used when a deletion field (& reason)
Expand Down
13 changes: 13 additions & 0 deletions db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/ohler55/ojg/oj"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

Expand Down Expand Up @@ -211,3 +212,15 @@ func FindConfigChangesByItemID(ctx api.ScrapeContext, configItemID string) ([]du

return ci, nil
}

func SoftDeleteConfigItems(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}

return ctx.DB().
Model(&models.ConfigItem{}).
Where("id IN (?)", ids).
Update("deleted_at", gorm.Expr("NOW()")).
Error
}
20 changes: 18 additions & 2 deletions scrapers/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,12 @@ func ConsumeKubernetesWatchEventsJobFunc(sc api.ScrapeContext, config v1.Kuberne
ID: fmt.Sprintf("%s/%s", sc.ScrapeConfig().Namespace, sc.ScrapeConfig().Name),
ResourceType: job.ResourceTypeScraper,
Fn: func(ctx job.JobRuntime) error {
ch, ok := kubernetes.WatchEventBuffers[config.Hash()]
_ch, ok := kubernetes.WatchEventBuffers.Load(config.Hash())
if !ok {
return fmt.Errorf("no watcher found for config (scrapeconfig: %s) %s", scrapeConfig.GetUID(), config.Hash())
}

ch := _ch.(chan v1.KubernetesEvent)
events, _, _, _ := lo.Buffer(ch, len(ch))

cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History)
Expand Down Expand Up @@ -273,10 +275,11 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube
ID: fmt.Sprintf("%s/%s", sc.ScrapeConfig().Namespace, sc.ScrapeConfig().Name),
ResourceType: job.ResourceTypeScraper,
Fn: func(ctx job.JobRuntime) error {
ch, ok := kubernetes.WatchResourceBuffer[config.Hash()]
_ch, ok := kubernetes.WatchResourceBuffer.Load(config.Hash())
if !ok {
return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
}
ch := _ch.(chan *unstructured.Unstructured)
objs, _, _, _ := lo.Buffer(ch, len(ch))

// NOTE: The resource watcher can return multiple objects for the same NEW resource.
Expand Down Expand Up @@ -308,6 +311,19 @@ func ConsumeKubernetesWatchResourcesJobFunc(sc api.ScrapeContext, config v1.Kube
}
}

_deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash())
if !ok {
return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
}
deletChan := _deleteCh.(chan string)

if len(deletChan) > 0 {
deletedResourcesIDs, _, _, _ := lo.Buffer(deletChan, len(deletChan))
if err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs); err != nil {
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
}
}

return nil
},
}
Expand Down
22 changes: 17 additions & 5 deletions scrapers/kubernetes/events_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubernetes
import (
"fmt"
"strings"
"sync"

"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/types"
Expand All @@ -24,18 +25,25 @@ var (
BufferSize = 5000

// WatchEventBuffers stores a sync buffer per kubernetes config
WatchEventBuffers = make(map[string]chan v1.KubernetesEvent)
WatchEventBuffers = sync.Map{}

WatchResourceBufferSize = 5000

// WatchEventBuffers stores a sync buffer per kubernetes config
WatchResourceBuffer = make(map[string]chan *unstructured.Unstructured)
WatchResourceBuffer = sync.Map{}

// DeleteResourceBuffer stores a buffer per kubernetes config
// that contains the ids of resources that have been deleted.
DeleteResourceBuffer = sync.Map{}
)

// WatchResources watches Kubernetes resources
func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
buffer := make(chan *unstructured.Unstructured, ctx.DutyContext().Properties().Int("kubernetes.watch.resources.bufferSize", WatchResourceBufferSize))
WatchResourceBuffer[config.Hash()] = buffer
WatchResourceBuffer.Store(config.Hash(), buffer)

deleteBuffer := make(chan string, WatchResourceBufferSize)
DeleteResourceBuffer.Store(config.Hash(), deleteBuffer)

var restConfig *rest.Config
var err error
Expand Down Expand Up @@ -70,7 +78,11 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
for watchEvent := range utils.MergeChannels(channels...) {
obj, ok := watchEvent.Object.(*unstructured.Unstructured)
if ok {
buffer <- obj
if watchEvent.Type == watch.Deleted {
deleteBuffer <- string(obj.GetUID())
} else {
buffer <- obj
}
}
}

Expand All @@ -81,7 +93,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) error {
// the referenced config items in batches.
func WatchEvents(ctx api.ScrapeContext, config v1.Kubernetes) error {
buffer := make(chan v1.KubernetesEvent, ctx.DutyContext().Properties().Int("kubernetes.watch.events.bufferSize", BufferSize))
WatchEventBuffers[config.Hash()] = buffer
WatchEventBuffers.Store(config.Hash(), buffer)

if config.Kubeconfig != nil {
var err error
Expand Down
8 changes: 1 addition & 7 deletions scrapers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,8 @@ func ExtractResults(ctx context.Context, config v1.Kubernetes, objs []*unstructu
resourceHealth = &health.HealthStatus{}
}

createdAt := obj.GetCreationTimestamp().Time
var deletedAt *time.Time
var deleteReason v1.ConfigDeleteReason
if !obj.GetDeletionTimestamp().IsZero() {
deletedAt = &obj.GetDeletionTimestamp().Time
deleteReason = v1.DeletedReasonFromAttribute
}

// Evicted Pods must be considered deleted
if obj.GetKind() == "Pod" {
if objStatus, ok := obj.Object["status"].(map[string]any); ok {
Expand Down Expand Up @@ -437,7 +431,7 @@ func ExtractResults(ctx context.Context, config v1.Kubernetes, objs []*unstructu
Health: models.Health(resourceHealth.Health),
Ready: resourceHealth.Ready,
Description: resourceHealth.Message,
CreatedAt: &createdAt,
CreatedAt: lo.ToPtr(obj.GetCreationTimestamp().Time),
DeletedAt: deletedAt,
DeleteReason: deleteReason,
Config: configObj,
Expand Down

0 comments on commit 6864e87

Please sign in to comment.