Skip to content

Commit

Permalink
feat(k8s_tagger): Added pagination when fetching pods from the k8s Li…
Browse files Browse the repository at this point in the history
…st API (#1689)

* feat(k8s_tagger): Added pagination when fetching pods from the k8s API

* Make the limit in the List API call configurable

* doc(k8s_tagger): Add the limit field to the readme
  • Loading branch information
rnishtala-sumo authored Oct 22, 2024
1 parent 8c4f441 commit f05c06a
Show file tree
Hide file tree
Showing 16 changed files with 87 additions and 11 deletions.
1 change: 1 addition & 0 deletions .changelog/1689.added.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat(k8s_tagger): Added pagination when fetching pods from the k8s API
4 changes: 4 additions & 0 deletions pkg/processor/k8sprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ it with the in-memory data. If a match is found, the cached metadata is added to
```yaml
processors:
k8s_tagger:
# Limit page size when fetching pods from the k8s API
# default: 200
limit: 300

# List of exclusion rules. For now it's possible to specify
# a list of pod name regexes whose records should not be enriched with metadata.
# default: []
Expand Down
5 changes: 3 additions & 2 deletions pkg/processor/k8sprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func selectors() (labels.Selector, fields.Selector) {

// newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
func newFakeClient(
_ *zap.Logger,
logger *zap.Logger,
apiCfg k8sconfig.APIConfig,
rules kube.ExtractionRules,
filters kube.Filters,
Expand All @@ -54,6 +54,7 @@ func newFakeClient(
_ kube.InformerProvider,
_ kube.OwnerProvider,
_ string,
_ int,
_ time.Duration,
_ time.Duration,
) (kube.Client, error) {
Expand All @@ -65,7 +66,7 @@ func newFakeClient(
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(cs, "", ls, fs),
Informer: kube.NewFakeInformer(logger, cs, "", ls, fs, 10),
StopCh: make(chan struct{}),
}, nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/processor/k8sprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Config struct {
// Exclude section defines the names of pods that should be
// ignored while tagging.
Exclude ExcludeConfig `mapstructure:"exclude"`

// Limit is the page size for the list of pods to fetch from the API.
Limit int `mapstructure:"limit"`
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -234,6 +237,9 @@ type PodAssociationConfig struct {
// DefaultDelimiter is the default value of Delimiter in ExtractConfig.
const DefaultDelimiter string = ", "

// DefaultLimit is the default value of Limit in Config, i.e. the page size
// used when listing pods from the API if none is configured.
const DefaultLimit int = 200

// ExcludeConfig represent a list of Pods to exclude
type ExcludeConfig struct {
Pods []ExcludePodConfig `mapstructure:"pods"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/k8sprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestLoadConfig(t *testing.T) {
assert.EqualValues(t,
&Config{
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
Limit: 200,
Extract: ExtractConfig{Delimiter: ", "},
},
p0,
Expand All @@ -53,6 +54,7 @@ func TestLoadConfig(t *testing.T) {
assert.EqualValues(t,
&Config{
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeKubeConfig},
Limit: 200,
Passthrough: false,
OwnerLookupEnabled: true,
Extract: ExtractConfig{
Expand Down
3 changes: 3 additions & 0 deletions pkg/processor/k8sprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewFactory() processor.Factory {
func createDefaultConfig() component.Config {
return &Config{
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
Limit: DefaultLimit,
Extract: ExtractConfig{
Delimiter: DefaultDelimiter,
},
Expand Down Expand Up @@ -210,6 +211,8 @@ func createProcessorOpts(cfg component.Config) []Option {

opts = append(opts, WithDelimiter(oCfg.Extract.Delimiter))

opts = append(opts, WithLimit(oCfg.Limit))

opts = append(opts, WithExcludes(oCfg.Exclude))

return opts
Expand Down
5 changes: 4 additions & 1 deletion pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type WatchClient struct {
stopCh chan struct{}
op OwnerAPI
delimiter string
limit int

// A map containing Pod related data, used to associate them with resources.
// Key can be either an IP address or Pod UID
Expand All @@ -67,6 +68,7 @@ func New(
newInformer InformerProvider,
newOwnerProviderFunc OwnerProvider,
delimiter string,
limit int,
deleteInterval time.Duration,
gracePeriod time.Duration,
) (Client, error) {
Expand All @@ -78,6 +80,7 @@ func New(
Exclude: exclude,
stopCh: make(chan struct{}),
delimiter: delimiter,
limit: limit,
Pods: map[PodIdentifier]*Pod{},
}
go c.deleteLoop(deleteInterval, gracePeriod)
Expand Down Expand Up @@ -122,7 +125,7 @@ func New(
fieldSelector = addNodeSelector(fieldSelector, filters.Node)
}

c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
c.informer = newInformer(logger, c.kc, c.Filters.Namespace, labelSelector, fieldSelector, c.limit)
return c, err
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestDefaultClientset(t *testing.T) {
nil,
nil,
"",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand All @@ -138,6 +139,7 @@ func TestDefaultClientset(t *testing.T) {
nil,
nil,
"",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand All @@ -157,6 +159,7 @@ func TestBadFilters(t *testing.T) {
NewFakeInformer,
newFakeOwnerProvider,
"",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand Down Expand Up @@ -205,6 +208,7 @@ func TestConstructorErrors(t *testing.T) {
NewFakeInformer,
newFakeOwnerProvider,
"",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand Down Expand Up @@ -1176,6 +1180,7 @@ func Test_PodsGetAddedAndDeletedFromCache(t *testing.T) {
newSharedInformer,
newOwnerProvider,
"_",
10,
10*time.Millisecond,
10*time.Millisecond,
)
Expand Down Expand Up @@ -1293,6 +1298,7 @@ func newTestClientWithRulesAndFilters(t *testing.T, e ExtractionRules, f Filters
NewFakeInformer,
newFakeOwnerProvider,
"_",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand Down
3 changes: 3 additions & 0 deletions pkg/processor/k8sprocessor/kube/fake_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sync"
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
Expand All @@ -33,10 +34,12 @@ type FakeInformer struct {
}

func NewFakeInformer(
logger *zap.Logger,
_ kubernetes.Interface,
namespace string,
labelSelector labels.Selector,
fieldSelector fields.Selector,
limit int,
) cache.SharedInformer {
return &FakeInformer{
FakeController: &FakeController{},
Expand Down
36 changes: 31 additions & 5 deletions pkg/processor/k8sprocessor/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package kube

import (
"context"
"time"

"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand All @@ -30,21 +32,25 @@ import (
// InformerProvider defines a function type that returns a new SharedInformer. It is used to
// allow passing custom shared informers to the watch client.
//
// A provider receives the logger, the Kubernetes client, the namespace, the
// label and field selectors, and limit. In newSharedInformer the selectors
// filter the pods that are listed and watched in that namespace, and limit is
// the page size used when listing pods from the API.
type InformerProvider func(
logger *zap.Logger,
client kubernetes.Interface,
namespace string,
labelSelector labels.Selector,
fieldSelector fields.Selector,
limit int,
) cache.SharedInformer

func newSharedInformer(
logger *zap.Logger,
client kubernetes.Interface,
namespace string,
ls labels.Selector,
fs fields.Selector,
limit int,
) cache.SharedInformer {
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: informerListFuncWithSelectors(client, namespace, ls, fs),
ListFunc: informerListFuncWithSelectors(logger, client, namespace, ls, fs, limit),
WatchFunc: informerWatchFuncWithSelectors(client, namespace, ls, fs),
},
&api_v1.Pod{},
Expand All @@ -53,11 +59,30 @@ func newSharedInformer(
return informer
}

func informerListFuncWithSelectors(client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector) cache.ListFunc {
func informerListFuncWithSelectors(logger *zap.Logger, client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector, limit int) cache.ListFunc {
return func(opts metav1.ListOptions) (runtime.Object, error) {
opts.LabelSelector = ls.String()
opts.FieldSelector = fs.String()
return client.CoreV1().Pods(namespace).List(context.Background(), opts)
podList := &api_v1.PodList{}
logger.Info("Limiting page size to:", zap.Int("limit", limit))
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60*time.Second))
defer cancel()
for {
opts.LabelSelector = ls.String()
opts.FieldSelector = fs.String()
opts.Limit = int64(limit)
// Unset resource version to get the latest data
opts.ResourceVersion = ""
pods, err := client.CoreV1().Pods(namespace).List(ctx, opts)
if err != nil {
return nil, err
}
podList.Items = append(podList.Items, pods.Items...)
if pods.Continue == "" {
break
}
opts.Continue = pods.Continue
logger.Debug("Fetching more pods", zap.String("continue", pods.Continue))
}
return podList, nil
}

}
Expand All @@ -66,6 +91,7 @@ func informerWatchFuncWithSelectors(client kubernetes.Interface, namespace strin
return func(opts metav1.ListOptions) (watch.Interface, error) {
opts.LabelSelector = ls.String()
opts.FieldSelector = fs.String()
opts.ResourceVersion = ""
return client.CoreV1().Pods(namespace).Watch(context.Background(), opts)
}
}
13 changes: 10 additions & 3 deletions pkg/processor/k8sprocessor/kube/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/selection"
Expand All @@ -31,9 +32,11 @@ import (
func Test_newSharedInformer(t *testing.T) {
labelSelector, fieldSelector, err := selectorsFromFilters(Filters{})
require.NoError(t, err)
logger, err := zap.NewDevelopment()
require.NoError(t, err)
client, err := newFakeAPIClientset(k8sconfig.APIConfig{})
require.NoError(t, err)
informer := newSharedInformer(client, "testns", labelSelector, fieldSelector)
informer := newSharedInformer(logger, client, "testns", labelSelector, fieldSelector, 10)
assert.NotNil(t, informer)
}

Expand All @@ -57,7 +60,9 @@ func Test_informerListFuncWithSelectors(t *testing.T) {
assert.NoError(t, err)
c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
assert.NoError(t, err)
listFunc := informerListFuncWithSelectors(c, "test-ns", ls, fs)
logger, err := zap.NewDevelopment()
assert.NoError(t, err)
listFunc := informerListFuncWithSelectors(logger, c, "test-ns", ls, fs, 10)
opts := metav1.ListOptions{}
obj, err := listFunc(opts)
assert.NoError(t, err)
Expand Down Expand Up @@ -95,7 +100,9 @@ func Test_fakeInformer(t *testing.T) {
// nothing real to test here. just to make coverage happy
c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
assert.NoError(t, err)
i := NewFakeInformer(c, "ns", nil, nil)
logger, err := zap.NewDevelopment()
assert.NoError(t, err)
i := NewFakeInformer(logger, c, "ns", nil, nil, 10)
_, err = i.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{}, time.Second)
assert.NoError(t, err)
i.HasSynced()
Expand Down
1 change: 1 addition & 0 deletions pkg/processor/k8sprocessor/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type ClientProvider func(
InformerProvider,
OwnerProvider,
string,
int,
time.Duration,
time.Duration,
) (Client, error)
Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/k8sprocessor/kube/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func newOwnerProvider(
informers.WithTweakListOptions(func(opts *meta_v1.ListOptions) {
opts.LabelSelector = labelSelector.String()
opts.FieldSelector = fieldSelector.String()
// Unset resource version to get the latest data
opts.ResourceVersion = ""
}))

ownerCache.addNamespaceInformer(factory)
Expand Down
8 changes: 8 additions & 0 deletions pkg/processor/k8sprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,14 @@ func WithDelimiter(delimiter string) Option {
}
}

// WithLimit returns an Option that stores limit on the processor. The
// processor hands this value to the kube client, where it is the page size
// for listing pods from the API.
func WithLimit(limit int) Option {
	setLimit := func(kp *kubernetesprocessor) error {
		kp.limit = limit
		return nil
	}
	return setLimit
}

// WithExcludes allows specifying pods to exclude
func WithExcludes(excludeConfig ExcludeConfig) Option {
return func(p *kubernetesprocessor) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/k8sprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type kubernetesprocessor struct {
podAssociations []kube.Association
podIgnore kube.Excludes
delimiter string
limit int
}

func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kube.ClientProvider) error {
Expand All @@ -62,6 +63,7 @@ func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kub
nil,
nil,
kp.delimiter,
kp.limit,
30*time.Second,
kube.DefaultPodDeleteGracePeriod,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/processor/k8sprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func TestProcessorBadClientProvider(t *testing.T) {
_ kube.InformerProvider,
_ kube.OwnerProvider,
_ string,
_ int,
_ time.Duration,
_ time.Duration,
) (kube.Client, error) {
Expand Down

0 comments on commit f05c06a

Please sign in to comment.