From f5075a0993b93c68158757cd77f6dcff3f4a0fcf Mon Sep 17 00:00:00 2001 From: Michi Mutsuzaki Date: Mon, 21 Oct 2024 22:07:48 +0000 Subject: [PATCH] Centralize the logic to set node name Centralize the logic to set the NodeName fields for event to avoid code duplication. Initialize the nodeName variable once in the node package init() function. Signed-off-by: Michi Mutsuzaki --- pkg/eventcache/eventcache.go | 10 +++------ pkg/exporter/exporter_test.go | 2 ++ pkg/grpc/exec/exec.go | 15 ++++---------- pkg/grpc/process_manager.go | 5 ++--- pkg/grpc/process_manager_test.go | 11 ---------- pkg/grpc/test/test.go | 8 +------- pkg/grpc/tracing/tracing.go | 33 ++++++++++-------------------- pkg/process/process.go | 4 +--- pkg/ratelimit/ratelimit.go | 9 ++++---- pkg/reader/node/node.go | 35 ++++++++++++++++++++++++-------- pkg/reader/node/node_test.go | 35 ++++++++++++++++++++++++++++++++ 11 files changed, 91 insertions(+), 76 deletions(-) create mode 100644 pkg/reader/node/node_test.go diff --git a/pkg/eventcache/eventcache.go b/pkg/eventcache/eventcache.go index c086bbc0601..3d695ede59b 100644 --- a/pkg/eventcache/eventcache.go +++ b/pkg/eventcache/eventcache.go @@ -12,7 +12,6 @@ import ( "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/option" "github.com/cilium/tetragon/pkg/process" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/reader/notify" "github.com/cilium/tetragon/pkg/server" ) @@ -25,8 +24,7 @@ const ( ) var ( - cache *Cache - nodeName string + cache *Cache ) type CacheObj struct { @@ -145,9 +143,8 @@ func (ec *Cache) handleEvents() { if event.msg.Notify() { processedEvent := &tetragon.GetEventsResponse{ - Event: event.event.Encapsulate(), - NodeName: nodeName, - Time: ktime.ToProto(event.timestamp), + Event: event.event.Encapsulate(), + Time: ktime.ToProto(event.timestamp), } ec.server.NotifyListeners(event.msg, processedEvent) @@ -231,7 +228,6 @@ func NewWithTimer(s *server.Server, dur time.Duration) *Cache { server: s, dur: dur, } - nodeName = node.GetNodeNameForExport() go cache.loop() return cache } diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index a9518d5672b..383d8d1d4c8 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -16,6 +16,7 @@ import ( "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/encoder" "github.com/cilium/tetragon/pkg/ratelimit" + "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/rthooks" "github.com/cilium/tetragon/pkg/server" "github.com/stretchr/testify/assert" @@ -171,6 +172,7 @@ func Test_rateLimitExport(t *testing.T) { } defer os.Unsetenv(hubbleNodeNameEnv) } + node.SetNodeName() tests := []struct { name string diff --git a/pkg/grpc/exec/exec.go b/pkg/grpc/exec/exec.go index 1e24f86e1fe..b36b8ba65e7 100644 --- a/pkg/grpc/exec/exec.go +++ b/pkg/grpc/exec/exec.go @@ -18,16 +18,11 @@ import ( "github.com/cilium/tetragon/pkg/option" "github.com/cilium/tetragon/pkg/process" readerexec "github.com/cilium/tetragon/pkg/reader/exec" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/reader/notify" "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/wrapperspb" ) -var ( - nodeName = node.GetNodeNameForExport() -) - const ( ParentRefCnt = 0 ProcessRefCnt = 1 @@ -245,9 +240,8 @@ func (msg *MsgExecveEventUnix) HandleMessage() *tetragon.GetEventsResponse { if e := GetProcessExec(msg, true); e != nil { res = &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessExec{ProcessExec: e}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Unix.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessExec{ProcessExec: e}, + Time: ktime.ToProto(msg.Unix.Msg.Common.Ktime), } } return res @@ -468,9 +462,8 @@ func (msg *MsgExitEventUnix) HandleMessage() *tetragon.GetEventsResponse { e := GetProcessExit(msg) if e != nil { res = &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessExit{ProcessExit: e}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessExit{ProcessExit: e}, + Time: ktime.ToProto(msg.Common.Ktime), } } return res diff --git a/pkg/grpc/process_manager.go b/pkg/grpc/process_manager.go index 0474e0f320c..d789b71f772 100644 --- a/pkg/grpc/process_manager.go +++ b/pkg/grpc/process_manager.go @@ -22,8 +22,7 @@ import ( // ProcessManager maintains a cache of processes from tetragon exec events. type ProcessManager struct { - nodeName string - Server *server.Server + Server *server.Server // synchronize access to the listeners map. mux sync.Mutex listeners map[server.Listener]struct{} @@ -37,7 +36,6 @@ func NewProcessManager( hookRunner *rthooks.Runner, ) (*ProcessManager, error) { pm := &ProcessManager{ - nodeName: node.GetNodeNameForExport(), listeners: make(map[server.Listener]struct{}), } @@ -85,6 +83,7 @@ func (pm *ProcessManager) RemoveListener(listener server.Listener) { func (pm *ProcessManager) NotifyListener(original interface{}, processed *tetragon.GetEventsResponse) { pm.mux.Lock() defer pm.mux.Unlock() + node.SetCommonFields(processed) for l := range pm.listeners { l.Notify(processed) } diff --git a/pkg/grpc/process_manager_test.go b/pkg/grpc/process_manager_test.go index 06976e84bd3..59c344273e7 100644 --- a/pkg/grpc/process_manager_test.go +++ b/pkg/grpc/process_manager_test.go @@ -17,7 +17,6 @@ import ( "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/api/processapi" "github.com/cilium/tetragon/pkg/process" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/rthooks" "github.com/cilium/tetragon/pkg/watcher" "github.com/stretchr/testify/assert" @@ -212,16 +211,6 @@ func TestProcessManager_GetProcessExec(t *testing.T) { exec.GetProcessExec(pi, false).Process.BinaryProperties) } -func Test_getNodeNameForExport(t *testing.T) { - assert.NotEqual(t, "", node.GetNodeNameForExport()) // we should get the hostname here - assert.NoError(t, os.Setenv("NODE_NAME", "from-node-name")) - assert.Equal(t, "from-node-name", node.GetNodeNameForExport()) - assert.NoError(t, os.Setenv("HUBBLE_NODE_NAME", "from-hubble-node-name")) - assert.Equal(t, "from-hubble-node-name", node.GetNodeNameForExport()) - assert.NoError(t, os.Unsetenv("NODE_NAME")) - assert.NoError(t, os.Unsetenv("HUBBLE_NODE_NAME")) -} - func TestProcessManager_GetProcessID(t *testing.T) { assert.NoError(t, os.Setenv("NODE_NAME", "my-node")) diff --git a/pkg/grpc/test/test.go b/pkg/grpc/test/test.go index 8e554c61a3f..abd2e3b5ea6 100644 --- a/pkg/grpc/test/test.go +++ b/pkg/grpc/test/test.go @@ -11,14 +11,9 @@ import ( "github.com/cilium/tetragon/pkg/ktime" "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/process" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/reader/notify" ) -var ( - nodeName = node.GetNodeNameForExport() -) - type MsgTestEventUnix struct { Msg *testapi.MsgTestEvent } @@ -46,8 +41,7 @@ func (msg *MsgTestEventUnix) HandleMessage() *tetragon.GetEventsResponse { Arg2: msg.Msg.Arg2, Arg3: msg.Msg.Arg3, }}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Time: ktime.ToProto(msg.Msg.Common.Ktime), } default: logger.GetLogger().WithField("message", msg).Warn("HandleTestMessage: Unhandled event") diff --git a/pkg/grpc/tracing/tracing.go b/pkg/grpc/tracing/tracing.go index e0176435df7..d5aef59730c 100644 --- a/pkg/grpc/tracing/tracing.go +++ b/pkg/grpc/tracing/tracing.go @@ -24,17 +24,12 @@ import ( "github.com/cilium/tetragon/pkg/reader/bpf" "github.com/cilium/tetragon/pkg/reader/caps" "github.com/cilium/tetragon/pkg/reader/network" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/reader/notify" "github.com/cilium/tetragon/pkg/reader/path" "github.com/cilium/tetragon/pkg/tracingpolicy" "google.golang.org/protobuf/types/known/wrapperspb" ) -var ( - nodeName = node.GetNodeNameForExport() -) - func kprobeAction(act uint64) tetragon.KprobeAction { switch act { case tracingapi.ActionPost: @@ -576,9 +571,8 @@ func (msg *MsgGenericTracepointUnix) HandleMessage() *tetragon.GetEventsResponse } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessTracepoint{ProcessTracepoint: tetragonEvent}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessTracepoint{ProcessTracepoint: tetragonEvent}, + Time: ktime.ToProto(msg.Msg.Common.Ktime), } } @@ -624,9 +618,8 @@ func (msg *MsgGenericKprobeUnix) HandleMessage() *tetragon.GetEventsResponse { return nil } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessKprobe{ProcessKprobe: k}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessKprobe{ProcessKprobe: k}, + Time: ktime.ToProto(msg.Msg.Common.Ktime), } } @@ -715,8 +708,7 @@ func (msg *MsgProcessLoaderUnix) HandleMessage() *tetragon.GetEventsResponse { } LoaderMetricInc(LoaderResolvedImm) return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessLoader{ProcessLoader: k}, - NodeName: nodeName, + Event: &tetragon.GetEventsResponse_ProcessLoader{ProcessLoader: k}, } } @@ -823,9 +815,8 @@ func (msg *MsgGenericUprobeUnix) HandleMessage() *tetragon.GetEventsResponse { return nil } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessUprobe{ProcessUprobe: k}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessUprobe{ProcessUprobe: k}, + Time: ktime.ToProto(msg.Msg.Common.Ktime), } } @@ -867,9 +858,8 @@ func (msg *MsgGenericLsmUnix) HandleMessage() *tetragon.GetEventsResponse { return nil } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessLsm{ProcessLsm: k}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessLsm{ProcessLsm: k}, + Time: ktime.ToProto(msg.Msg.Common.Ktime), } } @@ -993,9 +983,8 @@ func (msg *MsgProcessThrottleUnix) HandleMessage() *tetragon.GetEventsResponse { Cgroup: msg.Cgroup, } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessThrottle{ProcessThrottle: event}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Ktime), + Event: &tetragon.GetEventsResponse_ProcessThrottle{ProcessThrottle: event}, + Time: ktime.ToProto(msg.Ktime), } } diff --git a/pkg/process/process.go b/pkg/process/process.go index 1f2c4610005..03321669846 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -65,7 +65,6 @@ type ProcessInternal struct { } var ( - nodeName string procCache *Cache k8s watcher.K8sResourceWatcher ) @@ -81,7 +80,6 @@ func InitCache(w watcher.K8sResourceWatcher, size int) error { FreeCache() } - nodeName = node.GetNodeNameForExport() k8s = w procCache, err = NewCache(size) if err != nil { @@ -261,7 +259,7 @@ func UpdateEventProcessTid(process *tetragon.Process, tid *uint32) { } func GetProcessID(pid uint32, ktime uint64) string { - return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%d:%d", nodeName, ktime, pid))) + return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%d:%d", node.GetNodeNameForExport(), ktime, pid))) } func GetExecID(proc *tetragonAPI.MsgProcess) string { diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go index 52c5512a4e2..ab8fc60a274 100644 --- a/pkg/ratelimit/ratelimit.go +++ b/pkg/ratelimit/ratelimit.go @@ -53,15 +53,16 @@ func (r *RateLimiter) reportRateLimitInfo(encoder encoder.EventEncoder) { case <-ticker.C: dropped := atomic.SwapUint64(&r.dropped, 0) if dropped > 0 { - err := encoder.Encode(&tetragon.GetEventsResponse{ + ev := tetragon.GetEventsResponse{ Event: &tetragon.GetEventsResponse_RateLimitInfo{ RateLimitInfo: &tetragon.RateLimitInfo{ NumberOfDroppedProcessEvents: dropped, }, }, - NodeName: node.GetNodeNameForExport(), - Time: timestamppb.New(time.Now()), - }) + Time: timestamppb.New(time.Now()), + } + node.SetCommonFields(&ev) + err := encoder.Encode(&ev) if err != nil { logger.GetLogger(). WithError(err). diff --git a/pkg/reader/node/node.go b/pkg/reader/node/node.go index 9db51ef61d4..2f5013f5724 100644 --- a/pkg/reader/node/node.go +++ b/pkg/reader/node/node.go @@ -6,18 +6,23 @@ package node import ( "os" + "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/logger" ) -// getNodeNameForExport returns node name string for JSON export. It uses NODE_NAME -// env variable by default, which is also used by k8s watcher to watch for local pods: -// -// https://github.com/cilium/tetragon/blob/a7be620c9fecdc2b693e3633506aca35d46cd3b2/pkg/grpc/watcher.go#L32 -// -// Set HUBBLE_NODE_NAME to override the node_name field for JSON export. -func GetNodeNameForExport() string { +var ( + nodeName string +) + +func init() { + SetNodeName() +} + +// SetNodeName initializes the nodeName variable. It's defined separately from +// init() so that it can be called from unit tests. +func SetNodeName() { var err error - nodeName := os.Getenv("HUBBLE_NODE_NAME") + nodeName = os.Getenv("HUBBLE_NODE_NAME") if nodeName == "" { nodeName = os.Getenv("NODE_NAME") } @@ -27,5 +32,19 @@ func GetNodeNameForExport() string { logger.GetLogger().WithError(err).Warn("failed to retrieve hostname") } } +} + +// GetNodeNameForExport returns node name string for JSON export. It uses NODE_NAME +// env variable by default, which is also used by k8s watcher to watch for local pods: +// +// https://github.com/cilium/tetragon/blob/a7be620c9fecdc2b693e3633506aca35d46cd3b2/pkg/grpc/watcher.go#L32 +// +// Set HUBBLE_NODE_NAME to override the node_name field for JSON export. +func GetNodeNameForExport() string { return nodeName } + +// SetCommonFields set fields that are common in all the events. +func SetCommonFields(ev *tetragon.GetEventsResponse) { + ev.NodeName = nodeName +} diff --git a/pkg/reader/node/node_test.go b/pkg/reader/node/node_test.go new file mode 100644 index 00000000000..128cb9bdf77 --- /dev/null +++ b/pkg/reader/node/node_test.go @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package node + +import ( + "os" + "testing" + + "github.com/cilium/tetragon/api/v1/tetragon" + "github.com/stretchr/testify/assert" +) + +func TestGetNodeNameForExport(t *testing.T) { + assert.NotEqual(t, "", GetNodeNameForExport()) // we should get the hostname here + assert.NoError(t, os.Setenv("NODE_NAME", "from-node-name")) + SetNodeName() + assert.Equal(t, "from-node-name", GetNodeNameForExport()) + assert.NoError(t, os.Setenv("HUBBLE_NODE_NAME", "from-hubble-node-name")) + SetNodeName() + assert.Equal(t, "from-hubble-node-name", GetNodeNameForExport()) + assert.NoError(t, os.Unsetenv("NODE_NAME")) + assert.NoError(t, os.Unsetenv("HUBBLE_NODE_NAME")) +} + +func TestSetCommonFields(t *testing.T) { + ev := tetragon.GetEventsResponse{} + assert.Empty(t, ev.NodeName) + nodeName := "my-node-name" + assert.NoError(t, os.Setenv("NODE_NAME", nodeName)) + SetNodeName() + SetCommonFields(&ev) + assert.Equal(t, nodeName, ev.GetNodeName()) + assert.NoError(t, os.Unsetenv("NODE_NAME")) +}