diff --git a/pkg/eventcache/eventcache.go b/pkg/eventcache/eventcache.go index c086bbc0601..3d695ede59b 100644 --- a/pkg/eventcache/eventcache.go +++ b/pkg/eventcache/eventcache.go @@ -12,7 +12,6 @@ import ( "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/option" "github.com/cilium/tetragon/pkg/process" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/reader/notify" "github.com/cilium/tetragon/pkg/server" ) @@ -25,8 +24,7 @@ const ( ) var ( - cache *Cache - nodeName string + cache *Cache ) type CacheObj struct { @@ -145,9 +143,8 @@ func (ec *Cache) handleEvents() { if event.msg.Notify() { processedEvent := &tetragon.GetEventsResponse{ - Event: event.event.Encapsulate(), - NodeName: nodeName, - Time: ktime.ToProto(event.timestamp), + Event: event.event.Encapsulate(), + Time: ktime.ToProto(event.timestamp), } ec.server.NotifyListeners(event.msg, processedEvent) @@ -231,7 +228,6 @@ func NewWithTimer(s *server.Server, dur time.Duration) *Cache { server: s, dur: dur, } - nodeName = node.GetNodeNameForExport() go cache.loop() return cache } diff --git a/pkg/exporter/exporter_test.go b/pkg/exporter/exporter_test.go index a9518d5672b..383d8d1d4c8 100644 --- a/pkg/exporter/exporter_test.go +++ b/pkg/exporter/exporter_test.go @@ -16,6 +16,7 @@ import ( "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/encoder" "github.com/cilium/tetragon/pkg/ratelimit" + "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/rthooks" "github.com/cilium/tetragon/pkg/server" "github.com/stretchr/testify/assert" @@ -171,6 +172,7 @@ func Test_rateLimitExport(t *testing.T) { } defer os.Unsetenv(hubbleNodeNameEnv) } + node.SetNodeName() tests := []struct { name string diff --git a/pkg/grpc/exec/exec.go b/pkg/grpc/exec/exec.go index 1e24f86e1fe..b36b8ba65e7 100644 --- a/pkg/grpc/exec/exec.go +++ b/pkg/grpc/exec/exec.go @@ -18,16 +18,11 @@ import ( "github.com/cilium/tetragon/pkg/option" "github.com/cilium/tetragon/pkg/process" readerexec "github.com/cilium/tetragon/pkg/reader/exec" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/reader/notify" "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/wrapperspb" ) -var ( - nodeName = node.GetNodeNameForExport() -) - const ( ParentRefCnt = 0 ProcessRefCnt = 1 @@ -245,9 +240,8 @@ func (msg *MsgExecveEventUnix) HandleMessage() *tetragon.GetEventsResponse { if e := GetProcessExec(msg, true); e != nil { res = &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessExec{ProcessExec: e}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Unix.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessExec{ProcessExec: e}, + Time: ktime.ToProto(msg.Unix.Msg.Common.Ktime), } } return res @@ -468,9 +462,8 @@ func (msg *MsgExitEventUnix) HandleMessage() *tetragon.GetEventsResponse { e := GetProcessExit(msg) if e != nil { res = &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessExit{ProcessExit: e}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessExit{ProcessExit: e}, + Time: ktime.ToProto(msg.Common.Ktime), } } return res diff --git a/pkg/grpc/process_manager.go b/pkg/grpc/process_manager.go index 0474e0f320c..d789b71f772 100644 --- a/pkg/grpc/process_manager.go +++ b/pkg/grpc/process_manager.go @@ -22,8 +22,7 @@ import ( // ProcessManager maintains a cache of processes from tetragon exec events. type ProcessManager struct { - nodeName string - Server *server.Server + Server *server.Server // synchronize access to the listeners map. mux sync.Mutex listeners map[server.Listener]struct{} @@ -37,7 +36,6 @@ func NewProcessManager( hookRunner *rthooks.Runner, ) (*ProcessManager, error) { pm := &ProcessManager{ - nodeName: node.GetNodeNameForExport(), listeners: make(map[server.Listener]struct{}), } @@ -85,6 +83,7 @@ func (pm *ProcessManager) RemoveListener(listener server.Listener) { func (pm *ProcessManager) NotifyListener(original interface{}, processed *tetragon.GetEventsResponse) { pm.mux.Lock() defer pm.mux.Unlock() + node.SetCommonFields(processed) for l := range pm.listeners { l.Notify(processed) } diff --git a/pkg/grpc/process_manager_test.go b/pkg/grpc/process_manager_test.go index 06976e84bd3..a7af237fb49 100644 --- a/pkg/grpc/process_manager_test.go +++ b/pkg/grpc/process_manager_test.go @@ -11,11 +11,10 @@ import ( "testing" "time" - "github.com/cilium/tetragon/pkg/grpc/exec" - "github.com/cilium/tetragon/pkg/option" - "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/api/processapi" + "github.com/cilium/tetragon/pkg/grpc/exec" + "github.com/cilium/tetragon/pkg/option" "github.com/cilium/tetragon/pkg/process" "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/rthooks" @@ -212,18 +211,9 @@ func TestProcessManager_GetProcessExec(t *testing.T) { exec.GetProcessExec(pi, false).Process.BinaryProperties) } -func Test_getNodeNameForExport(t *testing.T) { - assert.NotEqual(t, "", node.GetNodeNameForExport()) // we should get the hostname here - assert.NoError(t, os.Setenv("NODE_NAME", "from-node-name")) - assert.Equal(t, "from-node-name", node.GetNodeNameForExport()) - assert.NoError(t, os.Setenv("HUBBLE_NODE_NAME", "from-hubble-node-name")) - assert.Equal(t, "from-hubble-node-name", node.GetNodeNameForExport()) - assert.NoError(t, os.Unsetenv("NODE_NAME")) - assert.NoError(t, os.Unsetenv("HUBBLE_NODE_NAME")) -} - func TestProcessManager_GetProcessID(t *testing.T) { assert.NoError(t, os.Setenv("NODE_NAME", "my-node")) + node.SetNodeName() err := process.InitCache(watcher.NewFakeK8sWatcher([]interface{}{}), 10) assert.NoError(t, err) diff --git a/pkg/grpc/test/test.go b/pkg/grpc/test/test.go index 8e554c61a3f..abd2e3b5ea6 100644 --- a/pkg/grpc/test/test.go +++ b/pkg/grpc/test/test.go @@ -11,14 +11,9 @@ import ( "github.com/cilium/tetragon/pkg/ktime" "github.com/cilium/tetragon/pkg/logger" "github.com/cilium/tetragon/pkg/process" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/reader/notify" ) -var ( - nodeName = node.GetNodeNameForExport() -) - type MsgTestEventUnix struct { Msg *testapi.MsgTestEvent } @@ -46,8 +41,7 @@ func (msg *MsgTestEventUnix) HandleMessage() *tetragon.GetEventsResponse { Arg2: msg.Msg.Arg2, Arg3: msg.Msg.Arg3, }}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Time: ktime.ToProto(msg.Msg.Common.Ktime), } default: logger.GetLogger().WithField("message", msg).Warn("HandleTestMessage: Unhandled event") diff --git a/pkg/grpc/tracing/tracing.go b/pkg/grpc/tracing/tracing.go index e0176435df7..d5aef59730c 100644 --- a/pkg/grpc/tracing/tracing.go +++ b/pkg/grpc/tracing/tracing.go @@ -24,17 +24,12 @@ import ( "github.com/cilium/tetragon/pkg/reader/bpf" "github.com/cilium/tetragon/pkg/reader/caps" "github.com/cilium/tetragon/pkg/reader/network" - "github.com/cilium/tetragon/pkg/reader/node" "github.com/cilium/tetragon/pkg/reader/notify" "github.com/cilium/tetragon/pkg/reader/path" "github.com/cilium/tetragon/pkg/tracingpolicy" "google.golang.org/protobuf/types/known/wrapperspb" ) -var ( - nodeName = node.GetNodeNameForExport() -) - func kprobeAction(act uint64) tetragon.KprobeAction { switch act { case tracingapi.ActionPost: @@ -576,9 +571,8 @@ func (msg *MsgGenericTracepointUnix) HandleMessage() *tetragon.GetEventsResponse } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessTracepoint{ProcessTracepoint: tetragonEvent}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessTracepoint{ProcessTracepoint: tetragonEvent}, + Time: ktime.ToProto(msg.Msg.Common.Ktime), } } @@ -624,9 +618,8 @@ func (msg *MsgGenericKprobeUnix) HandleMessage() *tetragon.GetEventsResponse { return nil } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessKprobe{ProcessKprobe: k}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessKprobe{ProcessKprobe: k}, + Time: ktime.ToProto(msg.Msg.Common.Ktime), } } @@ -715,8 +708,7 @@ func (msg *MsgProcessLoaderUnix) HandleMessage() *tetragon.GetEventsResponse { } LoaderMetricInc(LoaderResolvedImm) return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessLoader{ProcessLoader: k}, - NodeName: nodeName, + Event: &tetragon.GetEventsResponse_ProcessLoader{ProcessLoader: k}, } } @@ -823,9 +815,8 @@ func (msg *MsgGenericUprobeUnix) HandleMessage() *tetragon.GetEventsResponse { return nil } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessUprobe{ProcessUprobe: k}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessUprobe{ProcessUprobe: k}, + Time: ktime.ToProto(msg.Msg.Common.Ktime), } } @@ -867,9 +858,8 @@ func (msg *MsgGenericLsmUnix) HandleMessage() *tetragon.GetEventsResponse { return nil } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessLsm{ProcessLsm: k}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Msg.Common.Ktime), + Event: &tetragon.GetEventsResponse_ProcessLsm{ProcessLsm: k}, + Time: ktime.ToProto(msg.Msg.Common.Ktime), } } @@ -993,9 +983,8 @@ func (msg *MsgProcessThrottleUnix) HandleMessage() *tetragon.GetEventsResponse { Cgroup: msg.Cgroup, } return &tetragon.GetEventsResponse{ - Event: &tetragon.GetEventsResponse_ProcessThrottle{ProcessThrottle: event}, - NodeName: nodeName, - Time: ktime.ToProto(msg.Ktime), + Event: &tetragon.GetEventsResponse_ProcessThrottle{ProcessThrottle: event}, + Time: ktime.ToProto(msg.Ktime), } } diff --git a/pkg/process/process.go b/pkg/process/process.go index 1f2c4610005..03321669846 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -65,7 +65,6 @@ type ProcessInternal struct { } var ( - nodeName string procCache *Cache k8s watcher.K8sResourceWatcher ) @@ -81,7 +80,6 @@ func InitCache(w watcher.K8sResourceWatcher, size int) error { FreeCache() } - nodeName = node.GetNodeNameForExport() k8s = w procCache, err = NewCache(size) if err != nil { @@ -261,7 +259,7 @@ func UpdateEventProcessTid(process *tetragon.Process, tid *uint32) { } func GetProcessID(pid uint32, ktime uint64) string { - return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%d:%d", nodeName, ktime, pid))) + return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%d:%d", node.GetNodeNameForExport(), ktime, pid))) } func GetExecID(proc *tetragonAPI.MsgProcess) string { diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go index 52c5512a4e2..ab8fc60a274 100644 --- a/pkg/ratelimit/ratelimit.go +++ b/pkg/ratelimit/ratelimit.go @@ -53,15 +53,16 @@ func (r *RateLimiter) reportRateLimitInfo(encoder encoder.EventEncoder) { case <-ticker.C: dropped := atomic.SwapUint64(&r.dropped, 0) if dropped > 0 { - err := encoder.Encode(&tetragon.GetEventsResponse{ + ev := tetragon.GetEventsResponse{ Event: &tetragon.GetEventsResponse_RateLimitInfo{ RateLimitInfo: &tetragon.RateLimitInfo{ NumberOfDroppedProcessEvents: dropped, }, }, - NodeName: node.GetNodeNameForExport(), - Time: timestamppb.New(time.Now()), - }) + Time: timestamppb.New(time.Now()), + } + node.SetCommonFields(&ev) + err := encoder.Encode(&ev) if err != nil { logger.GetLogger(). WithError(err). diff --git a/pkg/reader/node/node.go b/pkg/reader/node/node.go index 9db51ef61d4..2f5013f5724 100644 --- a/pkg/reader/node/node.go +++ b/pkg/reader/node/node.go @@ -6,18 +6,23 @@ package node import ( "os" + "github.com/cilium/tetragon/api/v1/tetragon" "github.com/cilium/tetragon/pkg/logger" ) -// getNodeNameForExport returns node name string for JSON export. It uses NODE_NAME -// env variable by default, which is also used by k8s watcher to watch for local pods: -// -// https://github.com/cilium/tetragon/blob/a7be620c9fecdc2b693e3633506aca35d46cd3b2/pkg/grpc/watcher.go#L32 -// -// Set HUBBLE_NODE_NAME to override the node_name field for JSON export. -func GetNodeNameForExport() string { +var ( + nodeName string +) + +func init() { + SetNodeName() +} + +// SetNodeName initializes the nodeName variable. It's defined separately from +// init() so that it can be called from unit tests. +func SetNodeName() { var err error - nodeName := os.Getenv("HUBBLE_NODE_NAME") + nodeName = os.Getenv("HUBBLE_NODE_NAME") if nodeName == "" { nodeName = os.Getenv("NODE_NAME") } @@ -27,5 +32,19 @@ func GetNodeNameForExport() string { logger.GetLogger().WithError(err).Warn("failed to retrieve hostname") } } +} + +// GetNodeNameForExport returns node name string for JSON export. It uses NODE_NAME +// env variable by default, which is also used by k8s watcher to watch for local pods: +// +// https://github.com/cilium/tetragon/blob/a7be620c9fecdc2b693e3633506aca35d46cd3b2/pkg/grpc/watcher.go#L32 +// +// Set HUBBLE_NODE_NAME to override the node_name field for JSON export. +func GetNodeNameForExport() string { return nodeName } + +// SetCommonFields set fields that are common in all the events. +func SetCommonFields(ev *tetragon.GetEventsResponse) { + ev.NodeName = nodeName +} diff --git a/pkg/reader/node/node_test.go b/pkg/reader/node/node_test.go new file mode 100644 index 00000000000..128cb9bdf77 --- /dev/null +++ b/pkg/reader/node/node_test.go @@ -0,0 +1,35 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package node + +import ( + "os" + "testing" + + "github.com/cilium/tetragon/api/v1/tetragon" + "github.com/stretchr/testify/assert" +) + +func TestGetNodeNameForExport(t *testing.T) { + assert.NotEqual(t, "", GetNodeNameForExport()) // we should get the hostname here + assert.NoError(t, os.Setenv("NODE_NAME", "from-node-name")) + SetNodeName() + assert.Equal(t, "from-node-name", GetNodeNameForExport()) + assert.NoError(t, os.Setenv("HUBBLE_NODE_NAME", "from-hubble-node-name")) + SetNodeName() + assert.Equal(t, "from-hubble-node-name", GetNodeNameForExport()) + assert.NoError(t, os.Unsetenv("NODE_NAME")) + assert.NoError(t, os.Unsetenv("HUBBLE_NODE_NAME")) +} + +func TestSetCommonFields(t *testing.T) { + ev := tetragon.GetEventsResponse{} + assert.Empty(t, ev.NodeName) + nodeName := "my-node-name" + assert.NoError(t, os.Setenv("NODE_NAME", nodeName)) + SetNodeName() + SetCommonFields(&ev) + assert.Equal(t, nodeName, ev.GetNodeName()) + assert.NoError(t, os.Unsetenv("NODE_NAME")) +}