From ee780d252892469931b887643c006714dff08b22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 14 Oct 2024 12:50:54 +0200 Subject: [PATCH] Only watch metadata for ReplicaSets in K8s (#41100) * Bump github.com/elastic/elastic-agent-autodiscover to v0.9.0 * Only watch metadata for ReplicaSets in k8s autodiscovery * Only watch metadata for ReplicaSets in add_kubernetes_metadata processor * Fix linter warnings * Merge changelog entries --- CHANGELOG.next.asciidoc | 2 +- NOTICE.txt | 4 +- go.mod | 2 +- go.sum | 4 +- .../autodiscover/providers/kubernetes/pod.go | 71 ++++++++++++------- .../providers/kubernetes/pod_test.go | 13 +++- .../add_kubernetes_metadata/kubernetes.go | 30 +++++--- 7 files changed, 85 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d3022fce790..2118be15ec8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -236,7 +236,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions. - Update to Go 1.22.7. {pull}41018[41018] - Replace Ubuntu 20.04 with 24.04 for Docker base images {issue}40743[40743] {pull}40942[40942] - +- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 2ea1ac2107c..bb5807f9a41 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -12575,11 +12575,11 @@ various licenses: -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-autodiscover -Version: v0.8.2 +Version: v0.9.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.8.2/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.9.0/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 03ea8323624..c643f16b1fa 100644 --- a/go.mod +++ b/go.mod @@ -188,7 +188,7 @@ require ( github.com/dgraph-io/badger/v4 v4.2.1-0.20240828131336-2725dc8ed5c2 github.com/elastic/bayeux v1.0.5 github.com/elastic/ebpfevents v0.6.0 - github.com/elastic/elastic-agent-autodiscover v0.8.2 + github.com/elastic/elastic-agent-autodiscover v0.9.0 github.com/elastic/elastic-agent-libs v0.12.1 github.com/elastic/elastic-agent-system-metrics v0.11.1 github.com/elastic/go-elasticsearch/v8 v8.14.0 diff --git a/go.sum b/go.sum index 63a740c46dd..4f561fa3d6e 100644 --- a/go.sum +++ b/go.sum @@ -352,8 +352,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY= github.com/elastic/ebpfevents v0.6.0 h1:BrL3m7JFK7U6h2jkbk3xAWWs//IZnugCHEDds5u2v68= github.com/elastic/ebpfevents v0.6.0/go.mod h1:ESG9gw7N+n5yCCMgdg1IIJENKWSmX7+X0Fi9GUs9nvU= -github.com/elastic/elastic-agent-autodiscover v0.8.2 h1:Fs2FhR33AMBPfm5/jz4drVzaEZaqOIHlDBvGtkUZdIk= -github.com/elastic/elastic-agent-autodiscover v0.8.2/go.mod h1:VZnU53EVaFTxR8Xf6YsLN8FHD5DKQzHSPlKax9/4w+o= +github.com/elastic/elastic-agent-autodiscover v0.9.0 h1:+iWIKh0u3e8I+CJa3FfWe9h0JojNasPgYIA47gpuuns= +github.com/elastic/elastic-agent-autodiscover v0.9.0/go.mod h1:5iUxLHhVdaGSWYTveSwfJEY4RqPXTG13LPiFoxcpFd4= github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE= github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI= github.com/elastic/elastic-agent-libs v0.12.1 h1:5jkxMx15Bna8cq7/Sz/XUIVUXfNWiJ80iSk4ICQ7KJ0= diff --git a/libbeat/autodiscover/providers/kubernetes/pod.go b/libbeat/autodiscover/providers/kubernetes/pod.go index be7179873ec..da018c6f6c2 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod.go +++ b/libbeat/autodiscover/providers/kubernetes/pod.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/gofrs/uuid/v5" k8s "k8s.io/client-go/kubernetes" @@ -135,11 +137,23 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu // Deployment -> Replicaset -> Pod // CronJob -> job -> Pod if metaConf.Deployment { - replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Namespace: config.Namespace, - HonorReSyncs: true, - }, nil) + metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + logger.Errorf("Error creating metadata client due to error %+v", err) + } + replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher( + "resource_metadata_enricher_rs", + client, + metadataClient, + schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}, + kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + HonorReSyncs: true, + }, + nil, + metadata.RemoveUnnecessaryReplicaSetData, + ) if err != nil { logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err) } @@ -225,23 +239,26 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event { var kubeMeta, container mapstr.M annotations := make(mapstr.M, 0) - rawMeta, ok := event["kubernetes"] - if ok { - kubeMeta = rawMeta.(mapstr.M) - // The builder base config can configure any of the field values of kubernetes if need be. - e["kubernetes"] = kubeMeta - if rawAnn, ok := kubeMeta["annotations"]; ok { - anns, _ := rawAnn.(mapstr.M) - if len(anns) != 0 { - annotations = anns.Clone() + rawMeta, found := event["kubernetes"] + if found { + kubeMetaMap, ok := rawMeta.(mapstr.M) + if ok { + kubeMeta = kubeMetaMap + // The builder base config can configure any of the field values of kubernetes if need be. + e["kubernetes"] = kubeMeta + if rawAnn, ok := kubeMeta["annotations"]; ok { + anns, _ := rawAnn.(mapstr.M) + if len(anns) != 0 { + annotations = anns.Clone() + } } - } - // Look at all the namespace level default annotations and do a merge with priority going to the pod annotations. - if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok { - namespaceAnnotations, _ := rawNsAnn.(mapstr.M) - if len(namespaceAnnotations) != 0 { - annotations.DeepUpdateNoOverwrite(namespaceAnnotations) + // Look at all the namespace level default annotations and do a merge with priority going to the pod annotations. + if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok { + namespaceAnnotations, _ := rawNsAnn.(mapstr.M) + if len(namespaceAnnotations) != 0 { + annotations.DeepUpdateNoOverwrite(namespaceAnnotations) + } } } } @@ -255,12 +272,14 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event { e["ports"] = ports } - if rawCont, ok := kubeMeta["container"]; ok { - container = rawCont.(mapstr.M) - // This would end up adding a runtime entry into the event. This would make sure - // that there is not an attempt to spin up a docker input for a rkt container and when a - // rkt input exists it would be natively supported. - e["container"] = container + if rawCont, found := kubeMeta["container"]; found { + if containerMap, ok := rawCont.(mapstr.M); ok { + container = containerMap + // This would end up adding a runtime entry into the event. This would make sure + // that there is not an attempt to spin up a docker input for a rkt container and when a + // rkt input exists it would be natively supported. + e["container"] = container + } } cname := utils.GetContainerName(container) diff --git a/libbeat/autodiscover/providers/kubernetes/pod_test.go b/libbeat/autodiscover/providers/kubernetes/pod_test.go index bb8731275b3..a9e2179cea1 100644 --- a/libbeat/autodiscover/providers/kubernetes/pod_test.go +++ b/libbeat/autodiscover/providers/kubernetes/pod_test.go @@ -44,16 +44,19 @@ import ( func TestGenerateHints(t *testing.T) { tests := []struct { + name string event bus.Event result bus.Event }{ // Empty events should return empty hints { + name: "empty", event: bus.Event{}, result: bus.Event{}, }, // Only kubernetes payload must return only kubernetes as part of the hint { + name: "only kubernetes", event: bus.Event{ "kubernetes": mapstr.M{ "pod": mapstr.M{ @@ -71,6 +74,7 @@ func TestGenerateHints(t *testing.T) { }, // Kubernetes payload with container info must be bubbled to top level { + name: "kubernetes container info top level", event: bus.Event{ "kubernetes": mapstr.M{ "container": mapstr.M{ @@ -102,6 +106,7 @@ func TestGenerateHints(t *testing.T) { // not.to.include must not be part of hints // period is annotated at both container and pod level. Container level value must be in hints { + name: "multiple hints", event: bus.Event{ "kubernetes": mapstr.M{ "annotations": getNestedAnnotations(mapstr.M{ @@ -163,6 +168,7 @@ func TestGenerateHints(t *testing.T) { // Have one set of hints come from the pod and the other come from namespaces // The resultant hints should have a combination of both { + name: "hints from Pod and Namespace", event: bus.Event{ "kubernetes": mapstr.M{ "annotations": getNestedAnnotations(mapstr.M{ @@ -227,6 +233,7 @@ func TestGenerateHints(t *testing.T) { // Have one set of hints come from the pod and the same keys come from namespaces // The resultant hints should honor only pods and not namespace. { + name: "pod hints win over namespace", event: bus.Event{ "kubernetes": mapstr.M{ "annotations": getNestedAnnotations(mapstr.M{ @@ -288,6 +295,7 @@ func TestGenerateHints(t *testing.T) { // Have no hints on the pod and have namespace level defaults. // The resultant hints should honor only namespace defaults. { + name: "namespace defaults", event: bus.Event{ "kubernetes": mapstr.M{ "namespace_annotations": getNestedAnnotations(mapstr.M{ @@ -339,7 +347,10 @@ func TestGenerateHints(t *testing.T) { logger: logp.NewLogger("kubernetes.pod"), } for _, test := range tests { - assert.Equal(t, p.GenerateHints(test.event), test.result) + test := test + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.result, p.GenerateHints(test.event)) + }) } } diff --git a/libbeat/processors/add_kubernetes_metadata/kubernetes.go b/libbeat/processors/add_kubernetes_metadata/kubernetes.go index c22875ccf3c..7bb1ddd0905 100644 --- a/libbeat/processors/add_kubernetes_metadata/kubernetes.go +++ b/libbeat/processors/add_kubernetes_metadata/kubernetes.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/runtime/schema" + k8sclient "k8s.io/client-go/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes" @@ -235,11 +237,23 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { // Deployment -> Replicaset -> Pod // CronJob -> job -> Pod if metaConf.Deployment { - replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{ - SyncTimeout: config.SyncPeriod, - Namespace: config.Namespace, - HonorReSyncs: true, - }, nil) + metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions) + if err != nil { + k.log.Errorf("Error creating metadata client due to error %+v", err) + } + replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher( + "resource_metadata_enricher_rs", + client, + metadataClient, + schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}, + kubernetes.WatchOptions{ + SyncTimeout: config.SyncPeriod, + Namespace: config.Namespace, + HonorReSyncs: true, + }, + nil, + metadata.RemoveUnnecessaryReplicaSetData, + ) if err != nil { k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err) } @@ -268,15 +282,15 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) { watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - pod := obj.(*kubernetes.Pod) + pod, _ := obj.(*kubernetes.Pod) k.addPod(pod) }, UpdateFunc: func(obj interface{}) { - pod := obj.(*kubernetes.Pod) + pod, _ := obj.(*kubernetes.Pod) k.updatePod(pod) }, DeleteFunc: func(obj interface{}) { - pod := obj.(*kubernetes.Pod) + pod, _ := obj.(*kubernetes.Pod) k.removePod(pod) }, })