Skip to content

Commit

Permalink
feat(k8s_tagger): Added pagination when fetching pods from the k8s API
Browse files Browse the repository at this point in the history
  • Loading branch information
rnishtala-sumo committed Oct 21, 2024
1 parent 8c4f441 commit 5d2f991
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 11 deletions.
1 change: 1 addition & 0 deletions .changelog/1687.added.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat(k8s_tagger): Added pagination when fetching pods from the k8s API
4 changes: 2 additions & 2 deletions pkg/processor/k8sprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func selectors() (labels.Selector, fields.Selector) {

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(
_ *zap.Logger,
logger *zap.Logger,
apiCfg k8sconfig.APIConfig,
rules kube.ExtractionRules,
filters kube.Filters,
Expand All @@ -65,7 +65,7 @@ func newFakeClient(
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
Informer: kube.NewFakeInformer(logger, cs, "", ls, fs),
StopCh: make(chan struct{}),
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func New(
fieldSelector = addNodeSelector(fieldSelector, filters.Node)
}

c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
c.informer = newInformer(logger, c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
return c, err
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/k8sprocessor/kube/fake_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync"
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
Expand All @@ -33,6 +34,7 @@ type FakeInformer struct {
}

func NewFakeInformer(
logger *zap.Logger,
_ kubernetes.Interface,
namespace string,
labelSelector labels.Selector,
Expand Down
28 changes: 23 additions & 5 deletions pkg/processor/k8sprocessor/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kube
import (
"context"

"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -30,21 +31,23 @@ import (
// InformerProvider defines a function type that returns a new SharedInformer. It is used to
// allow passing custom shared informers to the watch client.
type InformerProvider func(
logger *zap.Logger,
client kubernetes.Interface,
namespace string,
labelSelector labels.Selector,
fieldSelector fields.Selector,
) cache.SharedInformer

func newSharedInformer(
logger *zap.Logger,
client kubernetes.Interface,
namespace string,
ls labels.Selector,
fs fields.Selector,
) cache.SharedInformer {
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: informerListFuncWithSelectors(client, namespace, ls, fs),
ListFunc: informerListFuncWithSelectors(logger, client, namespace, ls, fs),
WatchFunc: informerWatchFuncWithSelectors(client, namespace, ls, fs),
},
&api_v1.Pod{},
Expand All @@ -53,11 +56,26 @@ func newSharedInformer(
return informer
}

func informerListFuncWithSelectors(client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector) cache.ListFunc {
func informerListFuncWithSelectors(logger *zap.Logger, client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector) cache.ListFunc {
return func(opts metav1.ListOptions) (runtime.Object, error) {
opts.LabelSelector = ls.String()
opts.FieldSelector = fs.String()
return client.CoreV1().Pods(namespace).List(context.Background(), opts)
podList := &api_v1.PodList{}
for {
opts.LabelSelector = ls.String()
opts.FieldSelector = fs.String()
opts.Limit = 20
opts.ResourceVersion = ""
pods, err := client.CoreV1().Pods(namespace).List(context.Background(), opts)
if err != nil {
return nil, err
}
podList.Items = append(podList.Items, pods.Items...)
if pods.Continue == "" {
break
}
opts.Continue = pods.Continue
logger.Debug("Fetching more pods", zap.String("continue", pods.Continue))
}
return podList, nil
}

}
Expand Down
13 changes: 10 additions & 3 deletions pkg/processor/k8sprocessor/kube/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/selection"
Expand All @@ -31,9 +32,11 @@ import (
func Test_newSharedInformer(t *testing.T) {
labelSelector, fieldSelector, err := selectorsFromFilters(Filters{})
require.NoError(t, err)
logger, err := zap.NewDevelopment()
require.NoError(t, err)
client, err := newFakeAPIClientset(k8sconfig.APIConfig{})
require.NoError(t, err)
informer := newSharedInformer(client, "testns", labelSelector, fieldSelector)
informer := newSharedInformer(logger, client, "testns", labelSelector, fieldSelector)
assert.NotNil(t, informer)
}

Expand All @@ -57,7 +60,9 @@ func Test_informerListFuncWithSelectors(t *testing.T) {
assert.NoError(t, err)
c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
assert.NoError(t, err)
listFunc := informerListFuncWithSelectors(c, "test-ns", ls, fs)
logger, err := zap.NewDevelopment()
assert.NoError(t, err)
listFunc := informerListFuncWithSelectors(logger, c, "test-ns", ls, fs)
opts := metav1.ListOptions{}
obj, err := listFunc(opts)
assert.NoError(t, err)
Expand Down Expand Up @@ -95,7 +100,9 @@ func Test_fakeInformer(t *testing.T) {
// nothing real to test here. just to make coverage happy
c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
assert.NoError(t, err)
i := NewFakeInformer(c, "ns", nil, nil)
logger, err := zap.NewDevelopment()
assert.NoError(t, err)
i := NewFakeInformer(logger, c, "ns", nil, nil)
_, err = i.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{}, time.Second)
assert.NoError(t, err)
i.HasSynced()
Expand Down

0 comments on commit 5d2f991

Please sign in to comment.