Skip to content

Commit

Permalink
Make the limit in the List API call configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
rnishtala-sumo committed Oct 22, 2024
1 parent a57e19c commit f9a6ae6
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 9 deletions.
3 changes: 2 additions & 1 deletion pkg/processor/k8sprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func newFakeClient(
_ kube.InformerProvider,
_ kube.OwnerProvider,
_ string,
_ int,
_ time.Duration,
_ time.Duration,
) (kube.Client, error) {
Expand All @@ -65,7 +66,7 @@ func newFakeClient(
Rules: rules,
Filters: filters,
Associations: associations,
Informer: kube.NewFakeInformer(logger, cs, "", ls, fs),
Informer: kube.NewFakeInformer(logger, cs, "", ls, fs, 10),
StopCh: make(chan struct{}),
}, nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/processor/k8sprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Config struct {
// Exclude section allows to define names of pod that should be
// ignored while tagging.
Exclude ExcludeConfig `mapstructure:"exclude"`

// Limit is the page size for the list of pods to fetch from the API.
Limit int `mapstructure:"limit"`
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -234,6 +237,9 @@ type PodAssociationConfig struct {
// DefaultDelimiter is default value for Delimiter for ExtractConfig
const DefaultDelimiter string = ", "

// DefaultLimit is default value for Limit for Config
const DefaultLimit int = 200

// ExcludeConfig represent a list of Pods to exclude
type ExcludeConfig struct {
Pods []ExcludePodConfig `mapstructure:"pods"`
Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/k8sprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestLoadConfig(t *testing.T) {
assert.EqualValues(t,
&Config{
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
Limit: 200,
Extract: ExtractConfig{Delimiter: ", "},
},
p0,
Expand All @@ -53,6 +54,7 @@ func TestLoadConfig(t *testing.T) {
assert.EqualValues(t,
&Config{
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeKubeConfig},
Limit: 200,
Passthrough: false,
OwnerLookupEnabled: true,
Extract: ExtractConfig{
Expand Down
3 changes: 3 additions & 0 deletions pkg/processor/k8sprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewFactory() processor.Factory {
func createDefaultConfig() component.Config {
return &Config{
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
Limit: DefaultLimit,
Extract: ExtractConfig{
Delimiter: DefaultDelimiter,
},
Expand Down Expand Up @@ -210,6 +211,8 @@ func createProcessorOpts(cfg component.Config) []Option {

opts = append(opts, WithDelimiter(oCfg.Extract.Delimiter))

opts = append(opts, WithLimit(oCfg.Limit))

opts = append(opts, WithExcludes(oCfg.Exclude))

return opts
Expand Down
5 changes: 4 additions & 1 deletion pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type WatchClient struct {
stopCh chan struct{}
op OwnerAPI
delimiter string
limit int

// A map containing Pod related data, used to associate them with resources.
// Key can be either an IP address or Pod UID
Expand All @@ -67,6 +68,7 @@ func New(
newInformer InformerProvider,
newOwnerProviderFunc OwnerProvider,
delimiter string,
limit int,
deleteInterval time.Duration,
gracePeriod time.Duration,
) (Client, error) {
Expand All @@ -78,6 +80,7 @@ func New(
Exclude: exclude,
stopCh: make(chan struct{}),
delimiter: delimiter,
limit: limit,
Pods: map[PodIdentifier]*Pod{},
}
go c.deleteLoop(deleteInterval, gracePeriod)
Expand Down Expand Up @@ -122,7 +125,7 @@ func New(
fieldSelector = addNodeSelector(fieldSelector, filters.Node)
}

c.informer = newInformer(logger, c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
c.informer = newInformer(logger, c.kc, c.Filters.Namespace, labelSelector, fieldSelector, c.limit)
return c, err
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestDefaultClientset(t *testing.T) {
nil,
nil,
"",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand All @@ -138,6 +139,7 @@ func TestDefaultClientset(t *testing.T) {
nil,
nil,
"",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand All @@ -157,6 +159,7 @@ func TestBadFilters(t *testing.T) {
NewFakeInformer,
newFakeOwnerProvider,
"",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand Down Expand Up @@ -205,6 +208,7 @@ func TestConstructorErrors(t *testing.T) {
NewFakeInformer,
newFakeOwnerProvider,
"",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand Down Expand Up @@ -1176,6 +1180,7 @@ func Test_PodsGetAddedAndDeletedFromCache(t *testing.T) {
newSharedInformer,
newOwnerProvider,
"_",
10,
10*time.Millisecond,
10*time.Millisecond,
)
Expand Down Expand Up @@ -1293,6 +1298,7 @@ func newTestClientWithRulesAndFilters(t *testing.T, e ExtractionRules, f Filters
NewFakeInformer,
newFakeOwnerProvider,
"_",
10,
30*time.Second,
DefaultPodDeleteGracePeriod,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/processor/k8sprocessor/kube/fake_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewFakeInformer(
namespace string,
labelSelector labels.Selector,
fieldSelector fields.Selector,
limit int,
) cache.SharedInformer {
return &FakeInformer{
FakeController: &FakeController{},
Expand Down
15 changes: 11 additions & 4 deletions pkg/processor/k8sprocessor/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kube

import (
"context"
"time"

"go.uber.org/zap"
api_v1 "k8s.io/api/core/v1"
Expand All @@ -36,6 +37,7 @@ type InformerProvider func(
namespace string,
labelSelector labels.Selector,
fieldSelector fields.Selector,
limit int,
) cache.SharedInformer

func newSharedInformer(
Expand All @@ -44,10 +46,11 @@ func newSharedInformer(
namespace string,
ls labels.Selector,
fs fields.Selector,
limit int,
) cache.SharedInformer {
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: informerListFuncWithSelectors(logger, client, namespace, ls, fs),
ListFunc: informerListFuncWithSelectors(logger, client, namespace, ls, fs, limit),
WatchFunc: informerWatchFuncWithSelectors(client, namespace, ls, fs),
},
&api_v1.Pod{},
Expand All @@ -56,16 +59,20 @@ func newSharedInformer(
return informer
}

func informerListFuncWithSelectors(logger *zap.Logger, client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector) cache.ListFunc {
func informerListFuncWithSelectors(logger *zap.Logger, client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector, limit int) cache.ListFunc {
return func(opts metav1.ListOptions) (runtime.Object, error) {
podList := &api_v1.PodList{}
logger.Info("Limiting page size to:", zap.Int("limit", limit))
deadline := time.Now().Add(60 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
for {
opts.LabelSelector = ls.String()
opts.FieldSelector = fs.String()
opts.Limit = 200
opts.Limit = int64(limit)
// Unset resource version to get the latest data
opts.ResourceVersion = ""
pods, err := client.CoreV1().Pods(namespace).List(context.Background(), opts)
pods, err := client.CoreV1().Pods(namespace).List(ctx, opts)
if err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/processor/k8sprocessor/kube/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func Test_newSharedInformer(t *testing.T) {
require.NoError(t, err)
client, err := newFakeAPIClientset(k8sconfig.APIConfig{})
require.NoError(t, err)
informer := newSharedInformer(logger, client, "testns", labelSelector, fieldSelector)
informer := newSharedInformer(logger, client, "testns", labelSelector, fieldSelector, 10)
assert.NotNil(t, informer)
}

Expand All @@ -62,7 +62,7 @@ func Test_informerListFuncWithSelectors(t *testing.T) {
assert.NoError(t, err)
logger, err := zap.NewDevelopment()
assert.NoError(t, err)
listFunc := informerListFuncWithSelectors(logger, c, "test-ns", ls, fs)
listFunc := informerListFuncWithSelectors(logger, c, "test-ns", ls, fs, 10)
opts := metav1.ListOptions{}
obj, err := listFunc(opts)
assert.NoError(t, err)
Expand Down Expand Up @@ -102,7 +102,7 @@ func Test_fakeInformer(t *testing.T) {
assert.NoError(t, err)
logger, err := zap.NewDevelopment()
assert.NoError(t, err)
i := NewFakeInformer(logger, c, "ns", nil, nil)
i := NewFakeInformer(logger, c, "ns", nil, nil, 10)
_, err = i.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{}, time.Second)
assert.NoError(t, err)
i.HasSynced()
Expand Down
1 change: 1 addition & 0 deletions pkg/processor/k8sprocessor/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type ClientProvider func(
InformerProvider,
OwnerProvider,
string,
int,
time.Duration,
time.Duration,
) (Client, error)
Expand Down
8 changes: 8 additions & 0 deletions pkg/processor/k8sprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,14 @@ func WithDelimiter(delimiter string) Option {
}
}

// WithLimit sets the limit to use by kubernetesprocessor
func WithLimit(limit int) Option {
return func(p *kubernetesprocessor) error {
p.limit = limit
return nil
}
}

// WithExcludes allows specifying pods to exclude
func WithExcludes(excludeConfig ExcludeConfig) Option {
return func(p *kubernetesprocessor) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/k8sprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type kubernetesprocessor struct {
podAssociations []kube.Association
podIgnore kube.Excludes
delimiter string
limit int
}

func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kube.ClientProvider) error {
Expand All @@ -62,6 +63,7 @@ func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kub
nil,
nil,
kp.delimiter,
kp.limit,
30*time.Second,
kube.DefaultPodDeleteGracePeriod,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/processor/k8sprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func TestProcessorBadClientProvider(t *testing.T) {
_ kube.InformerProvider,
_ kube.OwnerProvider,
_ string,
_ int,
_ time.Duration,
_ time.Duration,
) (kube.Client, error) {
Expand Down

0 comments on commit f9a6ae6

Please sign in to comment.