Skip to content

Commit

Permalink
Centralize the logic to set node name
Browse files Browse the repository at this point in the history
Centralize the logic to set the NodeName fields for event to avoid code
duplication. Initialize the nodeName variable once in the node package
init() function.

Signed-off-by: Michi Mutsuzaki <[email protected]>
  • Loading branch information
michi-covalent committed Oct 22, 2024
1 parent e3de28e commit 8ec47ac
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 78 deletions.
10 changes: 3 additions & 7 deletions pkg/eventcache/eventcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/cilium/tetragon/pkg/server"
)
Expand All @@ -25,8 +24,7 @@ const (
)

var (
cache *Cache
nodeName string
cache *Cache
)

type CacheObj struct {
Expand Down Expand Up @@ -145,9 +143,8 @@ func (ec *Cache) handleEvents() {

if event.msg.Notify() {
processedEvent := &tetragon.GetEventsResponse{
Event: event.event.Encapsulate(),
NodeName: nodeName,
Time: ktime.ToProto(event.timestamp),
Event: event.event.Encapsulate(),
Time: ktime.ToProto(event.timestamp),
}

ec.server.NotifyListeners(event.msg, processedEvent)
Expand Down Expand Up @@ -231,7 +228,6 @@ func NewWithTimer(s *server.Server, dur time.Duration) *Cache {
server: s,
dur: dur,
}
nodeName = node.GetNodeNameForExport()
go cache.loop()
return cache
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/encoder"
"github.com/cilium/tetragon/pkg/ratelimit"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/rthooks"
"github.com/cilium/tetragon/pkg/server"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -171,6 +172,7 @@ func Test_rateLimitExport(t *testing.T) {
}
defer os.Unsetenv(hubbleNodeNameEnv)
}
node.SetNodeName()

tests := []struct {
name string
Expand Down
15 changes: 4 additions & 11 deletions pkg/grpc/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@ import (
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/process"
readerexec "github.com/cilium/tetragon/pkg/reader/exec"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/wrapperspb"
)

var (
nodeName = node.GetNodeNameForExport()
)

const (
ParentRefCnt = 0
ProcessRefCnt = 1
Expand Down Expand Up @@ -245,9 +240,8 @@ func (msg *MsgExecveEventUnix) HandleMessage() *tetragon.GetEventsResponse {

if e := GetProcessExec(msg, true); e != nil {
res = &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessExec{ProcessExec: e},
NodeName: nodeName,
Time: ktime.ToProto(msg.Unix.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessExec{ProcessExec: e},
Time: ktime.ToProto(msg.Unix.Msg.Common.Ktime),
}
}
return res
Expand Down Expand Up @@ -468,9 +462,8 @@ func (msg *MsgExitEventUnix) HandleMessage() *tetragon.GetEventsResponse {
e := GetProcessExit(msg)
if e != nil {
res = &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessExit{ProcessExit: e},
NodeName: nodeName,
Time: ktime.ToProto(msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessExit{ProcessExit: e},
Time: ktime.ToProto(msg.Common.Ktime),
}
}
return res
Expand Down
5 changes: 2 additions & 3 deletions pkg/grpc/process_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (

// ProcessManager maintains a cache of processes from tetragon exec events.
type ProcessManager struct {
nodeName string
Server *server.Server
Server *server.Server
// synchronize access to the listeners map.
mux sync.Mutex
listeners map[server.Listener]struct{}
Expand All @@ -37,7 +36,6 @@ func NewProcessManager(
hookRunner *rthooks.Runner,
) (*ProcessManager, error) {
pm := &ProcessManager{
nodeName: node.GetNodeNameForExport(),
listeners: make(map[server.Listener]struct{}),
}

Expand Down Expand Up @@ -85,6 +83,7 @@ func (pm *ProcessManager) RemoveListener(listener server.Listener) {
func (pm *ProcessManager) NotifyListener(original interface{}, processed *tetragon.GetEventsResponse) {
pm.mux.Lock()
defer pm.mux.Unlock()
node.SetCommonFields(processed)
for l := range pm.listeners {
l.Notify(processed)
}
Expand Down
16 changes: 3 additions & 13 deletions pkg/grpc/process_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import (
"testing"
"time"

"github.com/cilium/tetragon/pkg/grpc/exec"
"github.com/cilium/tetragon/pkg/option"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/api/processapi"
"github.com/cilium/tetragon/pkg/grpc/exec"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/rthooks"
Expand Down Expand Up @@ -212,18 +211,9 @@ func TestProcessManager_GetProcessExec(t *testing.T) {
exec.GetProcessExec(pi, false).Process.BinaryProperties)
}

func Test_getNodeNameForExport(t *testing.T) {
assert.NotEqual(t, "", node.GetNodeNameForExport()) // we should get the hostname here
assert.NoError(t, os.Setenv("NODE_NAME", "from-node-name"))
assert.Equal(t, "from-node-name", node.GetNodeNameForExport())
assert.NoError(t, os.Setenv("HUBBLE_NODE_NAME", "from-hubble-node-name"))
assert.Equal(t, "from-hubble-node-name", node.GetNodeNameForExport())
assert.NoError(t, os.Unsetenv("NODE_NAME"))
assert.NoError(t, os.Unsetenv("HUBBLE_NODE_NAME"))
}

func TestProcessManager_GetProcessID(t *testing.T) {
assert.NoError(t, os.Setenv("NODE_NAME", "my-node"))
node.SetNodeName()

err := process.InitCache(watcher.NewFakeK8sWatcher([]interface{}{}), 10)
assert.NoError(t, err)
Expand Down
8 changes: 1 addition & 7 deletions pkg/grpc/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,9 @@ import (
"github.com/cilium/tetragon/pkg/ktime"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/reader/notify"
)

var (
nodeName = node.GetNodeNameForExport()
)

type MsgTestEventUnix struct {
Msg *testapi.MsgTestEvent
}
Expand Down Expand Up @@ -46,8 +41,7 @@ func (msg *MsgTestEventUnix) HandleMessage() *tetragon.GetEventsResponse {
Arg2: msg.Msg.Arg2,
Arg3: msg.Msg.Arg3,
}},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
default:
logger.GetLogger().WithField("message", msg).Warn("HandleTestMessage: Unhandled event")
Expand Down
33 changes: 11 additions & 22 deletions pkg/grpc/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@ import (
"github.com/cilium/tetragon/pkg/reader/bpf"
"github.com/cilium/tetragon/pkg/reader/caps"
"github.com/cilium/tetragon/pkg/reader/network"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/cilium/tetragon/pkg/reader/path"
"github.com/cilium/tetragon/pkg/tracingpolicy"
"google.golang.org/protobuf/types/known/wrapperspb"
)

var (
nodeName = node.GetNodeNameForExport()
)

func kprobeAction(act uint64) tetragon.KprobeAction {
switch act {
case tracingapi.ActionPost:
Expand Down Expand Up @@ -576,9 +571,8 @@ func (msg *MsgGenericTracepointUnix) HandleMessage() *tetragon.GetEventsResponse
}

return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessTracepoint{ProcessTracepoint: tetragonEvent},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessTracepoint{ProcessTracepoint: tetragonEvent},
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
}

Expand Down Expand Up @@ -624,9 +618,8 @@ func (msg *MsgGenericKprobeUnix) HandleMessage() *tetragon.GetEventsResponse {
return nil
}
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessKprobe{ProcessKprobe: k},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessKprobe{ProcessKprobe: k},
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
}

Expand Down Expand Up @@ -715,8 +708,7 @@ func (msg *MsgProcessLoaderUnix) HandleMessage() *tetragon.GetEventsResponse {
}
LoaderMetricInc(LoaderResolvedImm)
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessLoader{ProcessLoader: k},
NodeName: nodeName,
Event: &tetragon.GetEventsResponse_ProcessLoader{ProcessLoader: k},
}
}

Expand Down Expand Up @@ -823,9 +815,8 @@ func (msg *MsgGenericUprobeUnix) HandleMessage() *tetragon.GetEventsResponse {
return nil
}
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessUprobe{ProcessUprobe: k},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessUprobe{ProcessUprobe: k},
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
}

Expand Down Expand Up @@ -867,9 +858,8 @@ func (msg *MsgGenericLsmUnix) HandleMessage() *tetragon.GetEventsResponse {
return nil
}
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessLsm{ProcessLsm: k},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessLsm{ProcessLsm: k},
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
}

Expand Down Expand Up @@ -993,9 +983,8 @@ func (msg *MsgProcessThrottleUnix) HandleMessage() *tetragon.GetEventsResponse {
Cgroup: msg.Cgroup,
}
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessThrottle{ProcessThrottle: event},
NodeName: nodeName,
Time: ktime.ToProto(msg.Ktime),
Event: &tetragon.GetEventsResponse_ProcessThrottle{ProcessThrottle: event},
Time: ktime.ToProto(msg.Ktime),
}
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type ProcessInternal struct {
}

var (
nodeName string
procCache *Cache
k8s watcher.K8sResourceWatcher
)
Expand All @@ -81,7 +80,6 @@ func InitCache(w watcher.K8sResourceWatcher, size int) error {
FreeCache()
}

nodeName = node.GetNodeNameForExport()
k8s = w
procCache, err = NewCache(size)
if err != nil {
Expand Down Expand Up @@ -261,7 +259,7 @@ func UpdateEventProcessTid(process *tetragon.Process, tid *uint32) {
}

func GetProcessID(pid uint32, ktime uint64) string {
return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%d:%d", nodeName, ktime, pid)))
return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%d:%d", node.GetNodeNameForExport(), ktime, pid)))
}

func GetExecID(proc *tetragonAPI.MsgProcess) string {
Expand Down
9 changes: 5 additions & 4 deletions pkg/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ func (r *RateLimiter) reportRateLimitInfo(encoder encoder.EventEncoder) {
case <-ticker.C:
dropped := atomic.SwapUint64(&r.dropped, 0)
if dropped > 0 {
err := encoder.Encode(&tetragon.GetEventsResponse{
ev := tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_RateLimitInfo{
RateLimitInfo: &tetragon.RateLimitInfo{
NumberOfDroppedProcessEvents: dropped,
},
},
NodeName: node.GetNodeNameForExport(),
Time: timestamppb.New(time.Now()),
})
Time: timestamppb.New(time.Now()),
}
node.SetCommonFields(&ev)
err := encoder.Encode(&ev)
if err != nil {
logger.GetLogger().
WithError(err).
Expand Down
35 changes: 27 additions & 8 deletions pkg/reader/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ package node
import (
"os"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/logger"
)

// getNodeNameForExport returns node name string for JSON export. It uses NODE_NAME
// env variable by default, which is also used by k8s watcher to watch for local pods:
//
// https://github.com/cilium/tetragon/blob/a7be620c9fecdc2b693e3633506aca35d46cd3b2/pkg/grpc/watcher.go#L32
//
// Set HUBBLE_NODE_NAME to override the node_name field for JSON export.
func GetNodeNameForExport() string {
var (
nodeName string
)

func init() {
SetNodeName()
}

// SetNodeName initializes the nodeName variable. It's defined separately from
// init() so that it can be called from unit tests.
func SetNodeName() {
var err error
nodeName := os.Getenv("HUBBLE_NODE_NAME")
nodeName = os.Getenv("HUBBLE_NODE_NAME")
if nodeName == "" {
nodeName = os.Getenv("NODE_NAME")
}
Expand All @@ -27,5 +32,19 @@ func GetNodeNameForExport() string {
logger.GetLogger().WithError(err).Warn("failed to retrieve hostname")
}
}
}

// GetNodeNameForExport returns node name string for JSON export. It uses NODE_NAME
// env variable by default, which is also used by k8s watcher to watch for local pods:
//
// https://github.com/cilium/tetragon/blob/a7be620c9fecdc2b693e3633506aca35d46cd3b2/pkg/grpc/watcher.go#L32
//
// Set HUBBLE_NODE_NAME to override the node_name field for JSON export.
func GetNodeNameForExport() string {
return nodeName
}

// SetCommonFields set fields that are common in all the events.
func SetCommonFields(ev *tetragon.GetEventsResponse) {
ev.NodeName = nodeName
}
Loading

0 comments on commit 8ec47ac

Please sign in to comment.