Skip to content

Commit

Permalink
Only watch metadata for ReplicaSets in K8s (#41100)
Browse files Browse the repository at this point in the history
* Bump github.com/elastic/elastic-agent-autodiscover to v0.9.0

* Only watch metadata for ReplicaSets in k8s autodiscovery

* Only watch metadata for ReplicaSets in add_kubernetes_metadata processor

* Fix linter warnings

* Merge changelog entries
  • Loading branch information
swiatekm authored Oct 14, 2024
1 parent 9fde7b0 commit ee780d2
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 41 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions.
- Update to Go 1.22.7. {pull}41018[41018]
- Replace Ubuntu 20.04 with 24.04 for Docker base images {issue}40743[40743] {pull}40942[40942]

- Reduce memory consumption of k8s autodiscovery and the add_kubernetes_metadata processor when Deployment metadata is enabled

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12575,11 +12575,11 @@ various licenses:

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-autodiscover
Version: v0.8.2
Version: v0.9.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.8.2/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.9.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ require (
github.com/dgraph-io/badger/v4 v4.2.1-0.20240828131336-2725dc8ed5c2
github.com/elastic/bayeux v1.0.5
github.com/elastic/ebpfevents v0.6.0
github.com/elastic/elastic-agent-autodiscover v0.8.2
github.com/elastic/elastic-agent-autodiscover v0.9.0
github.com/elastic/elastic-agent-libs v0.12.1
github.com/elastic/elastic-agent-system-metrics v0.11.1
github.com/elastic/go-elasticsearch/v8 v8.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr
github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY=
github.com/elastic/ebpfevents v0.6.0 h1:BrL3m7JFK7U6h2jkbk3xAWWs//IZnugCHEDds5u2v68=
github.com/elastic/ebpfevents v0.6.0/go.mod h1:ESG9gw7N+n5yCCMgdg1IIJENKWSmX7+X0Fi9GUs9nvU=
github.com/elastic/elastic-agent-autodiscover v0.8.2 h1:Fs2FhR33AMBPfm5/jz4drVzaEZaqOIHlDBvGtkUZdIk=
github.com/elastic/elastic-agent-autodiscover v0.8.2/go.mod h1:VZnU53EVaFTxR8Xf6YsLN8FHD5DKQzHSPlKax9/4w+o=
github.com/elastic/elastic-agent-autodiscover v0.9.0 h1:+iWIKh0u3e8I+CJa3FfWe9h0JojNasPgYIA47gpuuns=
github.com/elastic/elastic-agent-autodiscover v0.9.0/go.mod h1:5iUxLHhVdaGSWYTveSwfJEY4RqPXTG13LPiFoxcpFd4=
github.com/elastic/elastic-agent-client/v7 v7.15.0 h1:nDB7v8TBoNuD6IIzC3z7Q0y+7bMgXoT2DsHfolO2CHE=
github.com/elastic/elastic-agent-client/v7 v7.15.0/go.mod h1:6h+f9QdIr3GO2ODC0Y8+aEXRwzbA5W4eV4dd/67z7nI=
github.com/elastic/elastic-agent-libs v0.12.1 h1:5jkxMx15Bna8cq7/Sz/XUIVUXfNWiJ80iSk4ICQ7KJ0=
Expand Down
71 changes: 45 additions & 26 deletions libbeat/autodiscover/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/gofrs/uuid/v5"
k8s "k8s.io/client-go/kubernetes"

Expand Down Expand Up @@ -135,11 +137,23 @@ func NewPodEventer(uuid uuid.UUID, cfg *conf.C, client k8s.Interface, publish fu
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
logger.Errorf("Error creating metadata client due to error %+v", err)
}
replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
},
nil,
metadata.RemoveUnnecessaryReplicaSetData,
)
if err != nil {
logger.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
Expand Down Expand Up @@ -225,23 +239,26 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {
var kubeMeta, container mapstr.M

annotations := make(mapstr.M, 0)
rawMeta, ok := event["kubernetes"]
if ok {
kubeMeta = rawMeta.(mapstr.M)
// The builder base config can configure any of the field values of kubernetes if need be.
e["kubernetes"] = kubeMeta
if rawAnn, ok := kubeMeta["annotations"]; ok {
anns, _ := rawAnn.(mapstr.M)
if len(anns) != 0 {
annotations = anns.Clone()
rawMeta, found := event["kubernetes"]
if found {
kubeMetaMap, ok := rawMeta.(mapstr.M)
if ok {
kubeMeta = kubeMetaMap
// The builder base config can configure any of the field values of kubernetes if need be.
e["kubernetes"] = kubeMeta
if rawAnn, ok := kubeMeta["annotations"]; ok {
anns, _ := rawAnn.(mapstr.M)
if len(anns) != 0 {
annotations = anns.Clone()
}
}
}

// Look at all the namespace level default annotations and do a merge with priority going to the pod annotations.
if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok {
namespaceAnnotations, _ := rawNsAnn.(mapstr.M)
if len(namespaceAnnotations) != 0 {
annotations.DeepUpdateNoOverwrite(namespaceAnnotations)
// Look at all the namespace level default annotations and do a merge with priority going to the pod annotations.
if rawNsAnn, ok := kubeMeta["namespace_annotations"]; ok {
namespaceAnnotations, _ := rawNsAnn.(mapstr.M)
if len(namespaceAnnotations) != 0 {
annotations.DeepUpdateNoOverwrite(namespaceAnnotations)
}
}
}
}
Expand All @@ -255,12 +272,14 @@ func (p *pod) GenerateHints(event bus.Event) bus.Event {
e["ports"] = ports
}

if rawCont, ok := kubeMeta["container"]; ok {
container = rawCont.(mapstr.M)
// This would end up adding a runtime entry into the event. This would make sure
// that there is not an attempt to spin up a docker input for a rkt container and when a
// rkt input exists it would be natively supported.
e["container"] = container
if rawCont, found := kubeMeta["container"]; found {
if containerMap, ok := rawCont.(mapstr.M); ok {
container = containerMap
// This would end up adding a runtime entry into the event. This would make sure
// that there is not an attempt to spin up a docker input for a rkt container and when a
// rkt input exists it would be natively supported.
e["container"] = container
}
}

cname := utils.GetContainerName(container)
Expand Down
13 changes: 12 additions & 1 deletion libbeat/autodiscover/providers/kubernetes/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,19 @@ import (

func TestGenerateHints(t *testing.T) {
tests := []struct {
name string
event bus.Event
result bus.Event
}{
// Empty events should return empty hints
{
name: "empty",
event: bus.Event{},
result: bus.Event{},
},
// Only kubernetes payload must return only kubernetes as part of the hint
{
name: "only kubernetes",
event: bus.Event{
"kubernetes": mapstr.M{
"pod": mapstr.M{
Expand All @@ -71,6 +74,7 @@ func TestGenerateHints(t *testing.T) {
},
// Kubernetes payload with container info must be bubbled to top level
{
name: "kubernetes container info top level",
event: bus.Event{
"kubernetes": mapstr.M{
"container": mapstr.M{
Expand Down Expand Up @@ -102,6 +106,7 @@ func TestGenerateHints(t *testing.T) {
// not.to.include must not be part of hints
// period is annotated at both container and pod level. Container level value must be in hints
{
name: "multiple hints",
event: bus.Event{
"kubernetes": mapstr.M{
"annotations": getNestedAnnotations(mapstr.M{
Expand Down Expand Up @@ -163,6 +168,7 @@ func TestGenerateHints(t *testing.T) {
// Have one set of hints come from the pod and the other come from namespaces
// The resultant hints should have a combination of both
{
name: "hints from Pod and Namespace",
event: bus.Event{
"kubernetes": mapstr.M{
"annotations": getNestedAnnotations(mapstr.M{
Expand Down Expand Up @@ -227,6 +233,7 @@ func TestGenerateHints(t *testing.T) {
// Have one set of hints come from the pod and the same keys come from namespaces
// The resultant hints should honor only pods and not namespace.
{
name: "pod hints win over namespace",
event: bus.Event{
"kubernetes": mapstr.M{
"annotations": getNestedAnnotations(mapstr.M{
Expand Down Expand Up @@ -288,6 +295,7 @@ func TestGenerateHints(t *testing.T) {
// Have no hints on the pod and have namespace level defaults.
// The resultant hints should honor only namespace defaults.
{
name: "namespace defaults",
event: bus.Event{
"kubernetes": mapstr.M{
"namespace_annotations": getNestedAnnotations(mapstr.M{
Expand Down Expand Up @@ -339,7 +347,10 @@ func TestGenerateHints(t *testing.T) {
logger: logp.NewLogger("kubernetes.pod"),
}
for _, test := range tests {
assert.Equal(t, p.GenerateHints(test.event), test.result)
test := test
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.result, p.GenerateHints(test.event))
})
}
}

Expand Down
30 changes: 22 additions & 8 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"

k8sclient "k8s.io/client-go/kubernetes"

"github.com/elastic/elastic-agent-autodiscover/kubernetes"
Expand Down Expand Up @@ -235,11 +237,23 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {
// Deployment -> Replicaset -> Pod
// CronJob -> job -> Pod
if metaConf.Deployment {
replicaSetWatcher, err = kubernetes.NewNamedWatcher("resource_metadata_enricher_rs", client, &kubernetes.ReplicaSet{}, kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
}, nil)
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
k.log.Errorf("Error creating metadata client due to error %+v", err)
}
replicaSetWatcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
kubernetes.WatchOptions{
SyncTimeout: config.SyncPeriod,
Namespace: config.Namespace,
HonorReSyncs: true,
},
nil,
metadata.RemoveUnnecessaryReplicaSetData,
)
if err != nil {
k.log.Errorf("Error creating watcher for %T due to error %+v", &kubernetes.ReplicaSet{}, err)
}
Expand Down Expand Up @@ -268,15 +282,15 @@ func (k *kubernetesAnnotator) init(config kubeAnnotatorConfig, cfg *config.C) {

watcher.AddEventHandler(kubernetes.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
pod, _ := obj.(*kubernetes.Pod)
k.addPod(pod)
},
UpdateFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
pod, _ := obj.(*kubernetes.Pod)
k.updatePod(pod)
},
DeleteFunc: func(obj interface{}) {
pod := obj.(*kubernetes.Pod)
pod, _ := obj.(*kubernetes.Pod)
k.removePod(pod)
},
})
Expand Down

0 comments on commit ee780d2

Please sign in to comment.