From 7cc2ef5a3f9f02fd6d0c4756df663a8ccebfd2b5 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 11 Dec 2024 19:51:56 +0545 Subject: [PATCH] chore: use priority queue from commons --- go.mod | 11 ++------ go.sum | 18 ++---------- scrapers/incremental.go | 10 ++----- scrapers/kubernetes/informers.go | 40 +++++++++++++++++---------- scrapers/kubernetes/informers_test.go | 22 +++++++++++---- 5 files changed, 49 insertions(+), 52 deletions(-) diff --git a/go.mod b/go.mod index 20a11a7f..fb031f19 100644 --- a/go.mod +++ b/go.mod @@ -47,10 +47,9 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 github.com/aws/aws-sdk-go-v2/service/support v1.24.3 github.com/aws/smithy-go v1.22.1 - github.com/emirpasic/gods v1.18.1 github.com/evanphx/json-patch v5.7.0+incompatible github.com/flanksource/artifacts v1.0.14 - github.com/flanksource/commons v1.32.1 + github.com/flanksource/commons v1.34.0 github.com/flanksource/duty v1.0.769 github.com/flanksource/is-healthy v1.0.53 github.com/flanksource/ketall v1.1.7 @@ -117,10 +116,8 @@ require ( github.com/asecurityteam/rolling v2.0.4+incompatible // indirect github.com/aws/aws-sdk-go-v2/config v1.27.29 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.29 // indirect - github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect - github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cert-manager/cert-manager v1.9.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -129,6 +126,7 @@ require ( github.com/distribution/reference v0.5.0 // indirect github.com/eko/gocache/lib/v4 v4.1.6 // indirect github.com/eko/gocache/store/go_cache/v4 v4.2.2 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/emirpasic/gods/v2 v2.0.0-alpha // indirect github.com/evanphx/json-patch/v5 v5.7.0 // indirect github.com/exaring/otelpgx v0.6.2 // indirect @@ -162,7 +160,6 @@ require ( github.com/hashicorp/hcl/v2 v2.21.0 // indirect github.com/henvic/httpretty v0.1.3 // indirect github.com/hirochachacha/go-smb2 v1.1.0 // indirect - github.com/invopop/jsonschema v0.12.0 // indirect github.com/itchyny/gojq v0.12.16 // indirect github.com/itchyny/timefmt-go v0.1.6 // indirect github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect @@ -207,11 +204,7 @@ require ( github.com/vadimi/go-http-ntlm v1.0.3 // indirect github.com/vadimi/go-http-ntlm/v2 v2.4.1 // indirect github.com/vadimi/go-ntlm v1.2.1 // indirect - github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect - github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect - github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect - github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect diff --git a/go.sum b/go.sum index 6d9a0934..a5fa347f 100644 --- a/go.sum +++ b/go.sum @@ -833,8 +833,6 @@ github.com/aws/aws-sdk-go-v2/service/support v1.24.3 h1:Bbesu6YZvEYACyydELMwUTYY github.com/aws/aws-sdk-go-v2/service/support v1.24.3/go.mod h1:NvXUhACskXZ2tiXzECpC/97xKzyY7/Wcc1ug5rla7kY= github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= -github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= -github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -853,8 +851,6 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= -github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= -github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0= github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= @@ -972,8 +968,8 @@ github.com/fergusstrange/embedded-postgres v1.25.0 h1:sa+k2Ycrtz40eCRPOzI7Ry7Ttk github.com/fergusstrange/embedded-postgres v1.25.0/go.mod h1:t/MLs0h9ukYM6FSt99R7InCHs1nW0ordoVCcnzmpTYw= github.com/flanksource/artifacts v1.0.14 h1:Vv70bccsae0MwGaf/uSPp34J5V1/PyKfct9z5JYCTJU= github.com/flanksource/artifacts v1.0.14/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70= -github.com/flanksource/commons v1.32.1 h1:380cRHbbjGoxHO8DbiqTrBz7XYw/qUjDIwKDcCNZqmU= -github.com/flanksource/commons v1.32.1/go.mod h1:gO/d401JGFx10+6/9V+PXiGAAVUwxrcLgkke1tGyyNU= +github.com/flanksource/commons v1.34.0 h1:cT9VYWMJDE/KSoPa71UUr1pl764MWBVI1PmrhFSc7B8= +github.com/flanksource/commons v1.34.0/go.mod h1:gO/d401JGFx10+6/9V+PXiGAAVUwxrcLgkke1tGyyNU= github.com/flanksource/duty v1.0.769 h1:5EOBis382RhFLNkP2hVhYFAnXUdxRXm3GdCUgjYi+hY= github.com/flanksource/duty v1.0.769/go.mod h1:sZY2NytdenrkqXoMD6Gn2C8xH6dm5HsqOeE0p74Z2VE= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= @@ -1320,8 +1316,6 @@ github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+h github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI= -github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0= github.com/itchyny/gojq v0.12.13/go.mod h1:JzwzAqenfhrPUuwbmEz3nu3JQmFLlQTQMUcOdnu/Sf4= github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g= github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM= @@ -1766,16 +1760,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= -github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= -github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM= github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= -github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= -github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= -github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= -github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/xhit/go-str2duration v1.2.0/go.mod h1:3cPSlfZlUHVlneIVfePFWcJZsuwf+P1v2SRTV4cUmp4= github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo= diff --git a/scrapers/incremental.go b/scrapers/incremental.go index 78fa2cb7..0ea58d03 100644 --- a/scrapers/incremental.go +++ b/scrapers/incremental.go @@ -9,7 +9,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - pq "github.com/emirpasic/gods/queues/priorityqueue" + "github.com/flanksource/commons/collections" "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" "github.com/flanksource/config-db/db" @@ -32,7 +32,7 @@ func consumeKubernetesWatchJobKey(id string) string { // ConsumeKubernetesWatchJobFunc returns a job that consumes kubernetes objects received from shared informers // for the given config of the scrapeconfig. -func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, queue *pq.Queue) *job.Job { +func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, queue *collections.Queue[*kubernetes.QueueItem]) *job.Job { return &job.Job{ Name: "ConsumeKubernetesWatch", Context: sc.DutyContext().WithObject(sc.ScrapeConfig().ObjectMeta), @@ -64,7 +64,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q ) for { - val, more := queue.Dequeue() + queueItem, more := queue.Dequeue() if !more { break } @@ -75,10 +75,6 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q break } - queueItem, ok := val.(*kubernetes.QueueItem) - if !ok { - return fmt.Errorf("unexpected item in the priority queue: %T", val) - } obj := queueItem.Obj queuedTime[string(obj.GetUID())] = queueItem.Timestamp diff --git a/scrapers/kubernetes/informers.go b/scrapers/kubernetes/informers.go index b12dcbb5..e67c6339 100644 --- a/scrapers/kubernetes/informers.go +++ b/scrapers/kubernetes/informers.go @@ -7,7 +7,7 @@ import ( "sync" "time" - pq "github.com/emirpasic/gods/queues/priorityqueue" + "github.com/flanksource/commons/collections" "github.com/flanksource/commons/logger" "github.com/flanksource/config-db/api" v1 "github.com/flanksource/config-db/api/v1" @@ -34,10 +34,24 @@ var ( ) // WatchResources watches Kubernetes resources with shared informers -func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) (*pq.Queue, error) { - priorityQueue := pq.NewWith(pqComparator) +func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) (*collections.Queue[*QueueItem], error) { + priorityQueue, err := collections.NewQueue(collections.QueueOpts[*QueueItem]{ + Metrics: collections.MetricsOpts[*QueueItem]{ + Name: "shared_informer", + Labels: map[string]any{ + "scraper_id": ctx.ScraperID(), + }, + }, + Comparator: pqComparator, + Equals: queueItemIsEqual, + Dedupe: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to create queue: %w", err) + } + if loaded, ok := WatchQueue.LoadOrStore(config.Hash(), priorityQueue); ok { - priorityQueue = loaded.(*pq.Queue) + priorityQueue = loaded.(*collections.Queue[*QueueItem]) } if config.Kubeconfig != nil { @@ -86,7 +100,7 @@ type SharedInformerManager struct { type DeleteObjHandler func(ctx context.Context, id string) error -func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *pq.Queue) error { +func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *collections.Queue[*QueueItem]) error { registrationTime := time.Now() apiVersion, kind := watchResource.ApiVersion, watchResource.Kind @@ -385,21 +399,19 @@ func NewQueueItem(obj *unstructured.Unstructured, operation QueueItemOperation) } } -func pqComparator(a, b any) int { - if a == nil || b == nil { - return 0 - } - - qa := a.(*QueueItem) - qb := b.(*QueueItem) +func queueItemIsEqual(qa, qb *QueueItem) bool { + return qa.Obj.GetUID() == qb.Obj.GetUID() +} +func pqComparator(qa, qb *QueueItem) int { if qa.Obj.GetUID() == qb.Obj.GetUID() { resourceVersionA, ok, _ := unstructured.NestedString(qa.Obj.Object, "metadata", "resourceVersion") if ok { resourceVersionB, _, _ := unstructured.NestedString(qb.Obj.Object, "metadata", "resourceVersion") - // Because of the way we are deduping, we want the earlier version in front of the queue. - return strings.Compare(resourceVersionA, resourceVersionB) + // Because of the way we are deduping, we want the latest version in front of the queue. + // the later versions are discarded. + return strings.Compare(resourceVersionB, resourceVersionA) } } diff --git a/scrapers/kubernetes/informers_test.go b/scrapers/kubernetes/informers_test.go index 0ff0811e..ecc8b696 100644 --- a/scrapers/kubernetes/informers_test.go +++ b/scrapers/kubernetes/informers_test.go @@ -6,7 +6,8 @@ import ( "testing" "time" - "github.com/emirpasic/gods/queues/priorityqueue" + "github.com/flanksource/commons/collections" + "github.com/flanksource/commons/hash" "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -166,13 +167,23 @@ func TestPqComparator(t *testing.T) { Obj: getUnstructuredWithResourceVersion("Pod", "a", "2c6a2f24-0199-435d-83a6-bd3f6d18d06d", "4", now.Add(-1*time.Hour)), }, }, - expected: []string{"a-1", "a-2", "a-3", "a-4"}, + expected: []string{"a-4"}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - q := priorityqueue.NewWith(pqComparator) + q, err := collections.NewQueue(collections.QueueOpts[*QueueItem]{ + Metrics: collections.MetricsOpts[*QueueItem]{ + Name: fmt.Sprintf("m_%s", hash.Sha256Hex(tt.name)[:10]), + }, + Comparator: pqComparator, + Equals: queueItemIsEqual, + Dedupe: true, + }) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } for _, item := range tt.Items { q.Enqueue(&item) @@ -180,13 +191,11 @@ func TestPqComparator(t *testing.T) { var result []string for { - v, ok := q.Dequeue() + item, ok := q.Dequeue() if !ok { break } - item := v.(*QueueItem) - resourceVersion, ok, _ := unstructured.NestedString(item.Obj.Object, "metadata", "resourceVersion") if ok { result = append(result, fmt.Sprintf("%s-%s", item.Obj.GetName(), resourceVersion)) @@ -207,6 +216,7 @@ func getUnstructuredEvent(kind, name string, creationTimestamp, recreationTimest Object: map[string]any{ "kind": kind, "metadata": map[string]any{ + "uid": uuid.NewString(), "name": name, "creationTimestamp": creationTimestamp.Format(time.RFC3339), "managedFields": []any{