From 5d2f9919fe5dd3cac9f098e2470646d18dff8735 Mon Sep 17 00:00:00 2001 From: Raj Nishtala Date: Mon, 21 Oct 2024 14:47:40 -0400 Subject: [PATCH] feat(k8s_tagger): Added pagination when fetching pods from the k8s API --- .changelog/1687.added.txt | 1 + pkg/processor/k8sprocessor/client_test.go | 4 +-- pkg/processor/k8sprocessor/kube/client.go | 2 +- .../k8sprocessor/kube/fake_informer.go | 2 ++ pkg/processor/k8sprocessor/kube/informer.go | 28 +++++++++++++++---- .../k8sprocessor/kube/informer_test.go | 13 +++++++-- 6 files changed, 39 insertions(+), 11 deletions(-) create mode 100644 .changelog/1687.added.txt diff --git a/.changelog/1687.added.txt b/.changelog/1687.added.txt new file mode 100644 index 0000000000..caacd9544b --- /dev/null +++ b/.changelog/1687.added.txt @@ -0,0 +1 @@ +feat(k8s_tagger): Added pagination when fetching pods from the k8s API \ No newline at end of file diff --git a/pkg/processor/k8sprocessor/client_test.go b/pkg/processor/k8sprocessor/client_test.go index 519f7592f2..f24d68a081 100644 --- a/pkg/processor/k8sprocessor/client_test.go +++ b/pkg/processor/k8sprocessor/client_test.go @@ -44,7 +44,7 @@ func selectors() (labels.Selector, fields.Selector) { // newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type func newFakeClient( - _ *zap.Logger, + logger *zap.Logger, apiCfg k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, @@ -65,7 +65,7 @@ func newFakeClient( Rules: rules, Filters: filters, Associations: associations, - Informer: kube.NewFakeInformer(cs, "", ls, fs), + Informer: kube.NewFakeInformer(logger, cs, "", ls, fs), StopCh: make(chan struct{}), }, nil } diff --git a/pkg/processor/k8sprocessor/kube/client.go b/pkg/processor/k8sprocessor/kube/client.go index 0fca7ee5a5..c3b4ec0cd0 100644 --- a/pkg/processor/k8sprocessor/kube/client.go +++ b/pkg/processor/k8sprocessor/kube/client.go @@ -122,7 +122,7 @@ func New( fieldSelector = addNodeSelector(fieldSelector, filters.Node) } - c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector) + c.informer = newInformer(logger, c.kc, c.Filters.Namespace, labelSelector, fieldSelector) return c, err } diff --git a/pkg/processor/k8sprocessor/kube/fake_informer.go b/pkg/processor/k8sprocessor/kube/fake_informer.go index da98475143..a35f0e0973 100644 --- a/pkg/processor/k8sprocessor/kube/fake_informer.go +++ b/pkg/processor/k8sprocessor/kube/fake_informer.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" @@ -33,6 +34,7 @@ type FakeInformer struct { } func NewFakeInformer( + logger *zap.Logger, _ kubernetes.Interface, namespace string, labelSelector labels.Selector, diff --git a/pkg/processor/k8sprocessor/kube/informer.go b/pkg/processor/k8sprocessor/kube/informer.go index 1ecfcd6526..99d27047c4 100644 --- a/pkg/processor/k8sprocessor/kube/informer.go +++ b/pkg/processor/k8sprocessor/kube/informer.go @@ -17,6 +17,7 @@ package kube import ( "context" + "go.uber.org/zap" api_v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -30,6 +31,7 @@ import ( // InformerProvider defines a function type that returns a new SharedInformer. It is used to // allow passing custom shared informers to the watch client. type InformerProvider func( + logger *zap.Logger, client kubernetes.Interface, namespace string, labelSelector labels.Selector, @@ -37,6 +39,7 @@ type InformerProvider func( ) cache.SharedInformer func newSharedInformer( + logger *zap.Logger, client kubernetes.Interface, namespace string, ls labels.Selector, @@ -44,7 +47,7 @@ func newSharedInformer( ) cache.SharedInformer { informer := cache.NewSharedInformer( &cache.ListWatch{ - ListFunc: informerListFuncWithSelectors(client, namespace, ls, fs), + ListFunc: informerListFuncWithSelectors(logger, client, namespace, ls, fs), WatchFunc: informerWatchFuncWithSelectors(client, namespace, ls, fs), }, &api_v1.Pod{}, @@ -53,11 +56,26 @@ func newSharedInformer( return informer } -func informerListFuncWithSelectors(client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector) cache.ListFunc { +func informerListFuncWithSelectors(logger *zap.Logger, client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector) cache.ListFunc { return func(opts metav1.ListOptions) (runtime.Object, error) { - opts.LabelSelector = ls.String() - opts.FieldSelector = fs.String() - return client.CoreV1().Pods(namespace).List(context.Background(), opts) + podList := &api_v1.PodList{} + for { + opts.LabelSelector = ls.String() + opts.FieldSelector = fs.String() + opts.Limit = 20 + opts.ResourceVersion = "" + pods, err := client.CoreV1().Pods(namespace).List(context.Background(), opts) + if err != nil { + return nil, err + } + podList.Items = append(podList.Items, pods.Items...) + if pods.Continue == "" { + break + } + opts.Continue = pods.Continue + logger.Debug("Fetching more pods", zap.String("continue", pods.Continue)) + } + return podList, nil } } diff --git a/pkg/processor/k8sprocessor/kube/informer_test.go b/pkg/processor/k8sprocessor/kube/informer_test.go index c3e51cc691..65f8532e17 100644 --- a/pkg/processor/k8sprocessor/kube/informer_test.go +++ b/pkg/processor/k8sprocessor/kube/informer_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" api_v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/selection" @@ -31,9 +32,11 @@ import ( func Test_newSharedInformer(t *testing.T) { labelSelector, fieldSelector, err := selectorsFromFilters(Filters{}) require.NoError(t, err) + logger, err := zap.NewDevelopment() + require.NoError(t, err) client, err := newFakeAPIClientset(k8sconfig.APIConfig{}) require.NoError(t, err) - informer := newSharedInformer(client, "testns", labelSelector, fieldSelector) + informer := newSharedInformer(logger, client, "testns", labelSelector, fieldSelector) assert.NotNil(t, informer) } @@ -57,7 +60,9 @@ func Test_informerListFuncWithSelectors(t *testing.T) { assert.NoError(t, err) c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) assert.NoError(t, err) - listFunc := informerListFuncWithSelectors(c, "test-ns", ls, fs) + logger, err := zap.NewDevelopment() + assert.NoError(t, err) + listFunc := informerListFuncWithSelectors(logger, c, "test-ns", ls, fs) opts := metav1.ListOptions{} obj, err := listFunc(opts) assert.NoError(t, err) @@ -95,7 +100,9 @@ func Test_fakeInformer(t *testing.T) { // nothing real to test here. just to make coverage happy c, err := newFakeAPIClientset(k8sconfig.APIConfig{}) assert.NoError(t, err) - i := NewFakeInformer(c, "ns", nil, nil) + logger, err := zap.NewDevelopment() + assert.NoError(t, err) + i := NewFakeInformer(logger, c, "ns", nil, nil) _, err = i.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{}, time.Second) assert.NoError(t, err) i.HasSynced()