Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralize the logic to set node name #3024

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions pkg/eventcache/eventcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/cilium/tetragon/pkg/server"
)
Expand All @@ -25,8 +24,7 @@ const (
)

var (
cache *Cache
nodeName string
cache *Cache
)

type CacheObj struct {
Expand Down Expand Up @@ -145,9 +143,8 @@ func (ec *Cache) handleEvents() {

if event.msg.Notify() {
processedEvent := &tetragon.GetEventsResponse{
Event: event.event.Encapsulate(),
NodeName: nodeName,
Time: ktime.ToProto(event.timestamp),
Event: event.event.Encapsulate(),
Time: ktime.ToProto(event.timestamp),
}

ec.notifier.NotifyListener(event.msg, processedEvent)
Expand Down Expand Up @@ -231,7 +228,6 @@ func NewWithTimer(n server.Notifier, dur time.Duration) *Cache {
notifier: n,
dur: dur,
}
nodeName = node.GetNodeNameForExport()
go cache.loop()
return cache
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/encoder"
"github.com/cilium/tetragon/pkg/ratelimit"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/rthooks"
"github.com/cilium/tetragon/pkg/server"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -171,6 +172,7 @@ func Test_rateLimitExport(t *testing.T) {
}
defer os.Unsetenv(hubbleNodeNameEnv)
}
node.SetNodeName()

tests := []struct {
name string
Expand Down
15 changes: 4 additions & 11 deletions pkg/grpc/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@ import (
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/process"
readerexec "github.com/cilium/tetragon/pkg/reader/exec"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/wrapperspb"
)

var (
nodeName = node.GetNodeNameForExport()
)

const (
ParentRefCnt = 0
ProcessRefCnt = 1
Expand Down Expand Up @@ -245,9 +240,8 @@ func (msg *MsgExecveEventUnix) HandleMessage() *tetragon.GetEventsResponse {

if e := GetProcessExec(msg, true); e != nil {
res = &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessExec{ProcessExec: e},
NodeName: nodeName,
Time: ktime.ToProto(msg.Unix.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessExec{ProcessExec: e},
Time: ktime.ToProto(msg.Unix.Msg.Common.Ktime),
}
}
return res
Expand Down Expand Up @@ -468,9 +462,8 @@ func (msg *MsgExitEventUnix) HandleMessage() *tetragon.GetEventsResponse {
e := GetProcessExit(msg)
if e != nil {
res = &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessExit{ProcessExit: e},
NodeName: nodeName,
Time: ktime.ToProto(msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessExit{ProcessExit: e},
Time: ktime.ToProto(msg.Common.Ktime),
}
}
return res
Expand Down
5 changes: 2 additions & 3 deletions pkg/grpc/process_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (

// ProcessManager maintains a cache of processes from tetragon exec events.
type ProcessManager struct {
nodeName string
Server *server.Server
Server *server.Server
// synchronize access to the listeners map.
mux sync.Mutex
listeners map[server.Listener]struct{}
Expand All @@ -37,7 +36,6 @@ func NewProcessManager(
hookRunner *rthooks.Runner,
) (*ProcessManager, error) {
pm := &ProcessManager{
nodeName: node.GetNodeNameForExport(),
listeners: make(map[server.Listener]struct{}),
}

Expand Down Expand Up @@ -85,6 +83,7 @@ func (pm *ProcessManager) RemoveListener(listener server.Listener) {
func (pm *ProcessManager) NotifyListener(original interface{}, processed *tetragon.GetEventsResponse) {
pm.mux.Lock()
defer pm.mux.Unlock()
node.SetCommonFields(processed)
for l := range pm.listeners {
l.Notify(processed)
}
Expand Down
16 changes: 3 additions & 13 deletions pkg/grpc/process_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ import (
"testing"
"time"

"github.com/cilium/tetragon/pkg/grpc/exec"
"github.com/cilium/tetragon/pkg/option"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/api/processapi"
"github.com/cilium/tetragon/pkg/grpc/exec"
"github.com/cilium/tetragon/pkg/option"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/rthooks"
Expand Down Expand Up @@ -212,18 +211,9 @@ func TestProcessManager_GetProcessExec(t *testing.T) {
exec.GetProcessExec(pi, false).Process.BinaryProperties)
}

func Test_getNodeNameForExport(t *testing.T) {
assert.NotEqual(t, "", node.GetNodeNameForExport()) // we should get the hostname here
assert.NoError(t, os.Setenv("NODE_NAME", "from-node-name"))
assert.Equal(t, "from-node-name", node.GetNodeNameForExport())
assert.NoError(t, os.Setenv("HUBBLE_NODE_NAME", "from-hubble-node-name"))
assert.Equal(t, "from-hubble-node-name", node.GetNodeNameForExport())
assert.NoError(t, os.Unsetenv("NODE_NAME"))
assert.NoError(t, os.Unsetenv("HUBBLE_NODE_NAME"))
}

func TestProcessManager_GetProcessID(t *testing.T) {
assert.NoError(t, os.Setenv("NODE_NAME", "my-node"))
node.SetNodeName()

err := process.InitCache(watcher.NewFakeK8sWatcher([]interface{}{}), 10)
assert.NoError(t, err)
Expand Down
8 changes: 1 addition & 7 deletions pkg/grpc/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,9 @@ import (
"github.com/cilium/tetragon/pkg/ktime"
"github.com/cilium/tetragon/pkg/logger"
"github.com/cilium/tetragon/pkg/process"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/reader/notify"
)

var (
nodeName = node.GetNodeNameForExport()
)

type MsgTestEventUnix struct {
Msg *testapi.MsgTestEvent
}
Expand Down Expand Up @@ -46,8 +41,7 @@ func (msg *MsgTestEventUnix) HandleMessage() *tetragon.GetEventsResponse {
Arg2: msg.Msg.Arg2,
Arg3: msg.Msg.Arg3,
}},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
default:
logger.GetLogger().WithField("message", msg).Warn("HandleTestMessage: Unhandled event")
Expand Down
33 changes: 11 additions & 22 deletions pkg/grpc/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,12 @@ import (
"github.com/cilium/tetragon/pkg/reader/bpf"
"github.com/cilium/tetragon/pkg/reader/caps"
"github.com/cilium/tetragon/pkg/reader/network"
"github.com/cilium/tetragon/pkg/reader/node"
"github.com/cilium/tetragon/pkg/reader/notify"
"github.com/cilium/tetragon/pkg/reader/path"
"github.com/cilium/tetragon/pkg/tracingpolicy"
"google.golang.org/protobuf/types/known/wrapperspb"
)

var (
nodeName = node.GetNodeNameForExport()
)

func kprobeAction(act uint64) tetragon.KprobeAction {
switch act {
case tracingapi.ActionPost:
Expand Down Expand Up @@ -576,9 +571,8 @@ func (msg *MsgGenericTracepointUnix) HandleMessage() *tetragon.GetEventsResponse
}

return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessTracepoint{ProcessTracepoint: tetragonEvent},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessTracepoint{ProcessTracepoint: tetragonEvent},
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
}

Expand Down Expand Up @@ -624,9 +618,8 @@ func (msg *MsgGenericKprobeUnix) HandleMessage() *tetragon.GetEventsResponse {
return nil
}
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessKprobe{ProcessKprobe: k},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessKprobe{ProcessKprobe: k},
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
}

Expand Down Expand Up @@ -715,8 +708,7 @@ func (msg *MsgProcessLoaderUnix) HandleMessage() *tetragon.GetEventsResponse {
}
LoaderMetricInc(LoaderResolvedImm)
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessLoader{ProcessLoader: k},
NodeName: nodeName,
Event: &tetragon.GetEventsResponse_ProcessLoader{ProcessLoader: k},
}
}

Expand Down Expand Up @@ -823,9 +815,8 @@ func (msg *MsgGenericUprobeUnix) HandleMessage() *tetragon.GetEventsResponse {
return nil
}
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessUprobe{ProcessUprobe: k},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessUprobe{ProcessUprobe: k},
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
}

Expand Down Expand Up @@ -867,9 +858,8 @@ func (msg *MsgGenericLsmUnix) HandleMessage() *tetragon.GetEventsResponse {
return nil
}
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessLsm{ProcessLsm: k},
NodeName: nodeName,
Time: ktime.ToProto(msg.Msg.Common.Ktime),
Event: &tetragon.GetEventsResponse_ProcessLsm{ProcessLsm: k},
Time: ktime.ToProto(msg.Msg.Common.Ktime),
}
}

Expand Down Expand Up @@ -993,9 +983,8 @@ func (msg *MsgProcessThrottleUnix) HandleMessage() *tetragon.GetEventsResponse {
Cgroup: msg.Cgroup,
}
return &tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_ProcessThrottle{ProcessThrottle: event},
NodeName: nodeName,
Time: ktime.ToProto(msg.Ktime),
Event: &tetragon.GetEventsResponse_ProcessThrottle{ProcessThrottle: event},
Time: ktime.ToProto(msg.Ktime),
}
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type ProcessInternal struct {
}

var (
nodeName string
procCache *Cache
k8s watcher.K8sResourceWatcher
)
Expand All @@ -81,7 +80,6 @@ func InitCache(w watcher.K8sResourceWatcher, size int) error {
FreeCache()
}

nodeName = node.GetNodeNameForExport()
k8s = w
procCache, err = NewCache(size)
if err != nil {
Expand Down Expand Up @@ -261,7 +259,7 @@ func UpdateEventProcessTid(process *tetragon.Process, tid *uint32) {
}

func GetProcessID(pid uint32, ktime uint64) string {
return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%d:%d", nodeName, ktime, pid)))
return base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%d:%d", node.GetNodeNameForExport(), ktime, pid)))
}

func GetExecID(proc *tetragonAPI.MsgProcess) string {
Expand Down
9 changes: 5 additions & 4 deletions pkg/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ func (r *RateLimiter) reportRateLimitInfo(encoder encoder.EventEncoder) {
case <-ticker.C:
dropped := atomic.SwapUint64(&r.dropped, 0)
if dropped > 0 {
err := encoder.Encode(&tetragon.GetEventsResponse{
ev := tetragon.GetEventsResponse{
Event: &tetragon.GetEventsResponse_RateLimitInfo{
RateLimitInfo: &tetragon.RateLimitInfo{
NumberOfDroppedProcessEvents: dropped,
},
},
NodeName: node.GetNodeNameForExport(),
Time: timestamppb.New(time.Now()),
})
Time: timestamppb.New(time.Now()),
}
node.SetCommonFields(&ev)
err := encoder.Encode(&ev)
if err != nil {
logger.GetLogger().
WithError(err).
Expand Down
35 changes: 27 additions & 8 deletions pkg/reader/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ package node
import (
"os"

"github.com/cilium/tetragon/api/v1/tetragon"
"github.com/cilium/tetragon/pkg/logger"
)

// getNodeNameForExport returns node name string for JSON export. It uses NODE_NAME
// env variable by default, which is also used by k8s watcher to watch for local pods:
//
// https://github.com/cilium/tetragon/blob/a7be620c9fecdc2b693e3633506aca35d46cd3b2/pkg/grpc/watcher.go#L32
//
// Set HUBBLE_NODE_NAME to override the node_name field for JSON export.
func GetNodeNameForExport() string {
var (
nodeName string
)

func init() {
SetNodeName()
}

// SetNodeName initializes the nodeName variable. It's defined separately from
// init() so that it can be called from unit tests.
func SetNodeName() {
var err error
nodeName := os.Getenv("HUBBLE_NODE_NAME")
nodeName = os.Getenv("HUBBLE_NODE_NAME")
if nodeName == "" {
nodeName = os.Getenv("NODE_NAME")
}
Expand All @@ -27,5 +32,19 @@ func GetNodeNameForExport() string {
logger.GetLogger().WithError(err).Warn("failed to retrieve hostname")
}
}
}

// GetNodeNameForExport returns node name string for JSON export. It uses NODE_NAME
// env variable by default, which is also used by k8s watcher to watch for local pods:
//
// https://github.com/cilium/tetragon/blob/a7be620c9fecdc2b693e3633506aca35d46cd3b2/pkg/grpc/watcher.go#L32
//
// Set HUBBLE_NODE_NAME to override the node_name field for JSON export.
func GetNodeNameForExport() string {
return nodeName
}

// SetCommonFields set fields that are common in all the events.
func SetCommonFields(ev *tetragon.GetEventsResponse) {
ev.NodeName = nodeName
}
Loading
Loading