Skip to content

Commit

Permalink
Merge pull request #79 from coroot/dns_tracking
Browse files Browse the repository at this point in the history
Add DNS protocol tracing
  • Loading branch information
def authored May 7, 2024
2 parents 58f0336 + 41714cd commit 827e918
Show file tree
Hide file tree
Showing 11 changed files with 276 additions and 38 deletions.
57 changes: 52 additions & 5 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ type Container struct {
connectionsByPidFd map[PidFd]*ActiveConnection
retransmits map[AddrPair]int64 // dst:actual_dst -> count

l7Stats L7Stats
l7Stats L7Stats
dnsStats *L7Metrics

oomKills int

Expand Down Expand Up @@ -157,6 +158,7 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
connectionsByPidFd: map[PidFd]*ActiveConnection{},
retransmits: map[AddrPair]int64{},
l7Stats: L7Stats{},
dnsStats: &L7Metrics{},

mounts: map[string]proc.MountInfo{},

Expand Down Expand Up @@ -340,7 +342,12 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
for appType := range appTypes {
ch <- gauge(metrics.ApplicationType, 1, appType)
}

if c.dnsStats.Requests != nil {
c.dnsStats.Requests.Collect(ch)
}
if c.dnsStats.Latency != nil {
c.dnsStats.Latency.Collect(ch)
}
c.l7Stats.collect(ch)

if !*flags.DisablePinger {
Expand Down Expand Up @@ -561,16 +568,55 @@ func (c *Container) onConnectionClose(srcDst AddrPair) bool {
return true
}

func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
// onDNSRequest records a traced DNS exchange in the container's DNS metrics
// and returns the IP -> FQDN pairs extracted from the payload.
//
// It returns nil when the status has no DNS representation or when the
// payload yields no request type. Otherwise the request counter (labelled by
// request type and status) is incremented, the latency histogram is updated
// for non-zero durations, and a non-nil (possibly empty) map is returned.
// Both collectors are created lazily on first use.
func (c *Container) onDNSRequest(r *l7.RequestData) map[netaddr.IP]string {
	status := r.Status.DNS()
	if status == "" {
		return nil
	}
	queryType, name, addrs := l7.ParseDns(r.Payload)
	if queryType == "" {
		return nil
	}

	// dnsStats is a pointer, so writes through this alias land on the container.
	stats := c.dnsStats

	if stats.Requests == nil {
		opts := L7Requests[l7.ProtocolDNS]
		stats.Requests = prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: opts.Name, Help: opts.Help},
			[]string{"request_type", "status"},
		)
	}
	// A lookup error leaves the counter nil; the request is then simply not counted.
	counter, _ := stats.Requests.GetMetricWithLabelValues(queryType, status)
	if counter != nil {
		counter.Inc()
	}

	if elapsed := r.Duration; elapsed != 0 {
		if stats.Latency == nil {
			opts := L7Latency[l7.ProtocolDNS]
			stats.Latency = prometheus.NewHistogram(prometheus.HistogramOpts{Name: opts.Name, Help: opts.Help})
		}
		stats.Latency.Observe(elapsed.Seconds())
	}

	resolved := map[netaddr.IP]string{}
	if name == "" {
		return resolved
	}
	for _, addr := range addrs {
		resolved[addr] = name
	}
	return resolved
}

func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) map[netaddr.IP]string {
c.lock.Lock()
defer c.lock.Unlock()

if r.Protocol == l7.ProtocolDNS {
return c.onDNSRequest(r)
}

conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
if conn == nil {
return
return nil
}
if timestamp != 0 && conn.Timestamp != timestamp {
return
return nil
}
stats := c.l7Stats.get(r.Protocol, conn.Dest, conn.ActualDest)
trace := tracing.NewTrace(string(c.id), conn.ActualDest)
Expand Down Expand Up @@ -625,6 +671,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
case l7.ProtocolDubbo2:
stats.observe(r.Status.String(), "", r.Duration)
}
return nil
}

func (c *Container) onRetransmit(srcDst AddrPair) bool {
Expand Down
4 changes: 4 additions & 0 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var metrics = struct {
JvmGCTime *prometheus.Desc
JvmSafepointTime *prometheus.Desc
JvmSafepointSyncTime *prometheus.Desc
Ip2Fqdn *prometheus.Desc
}{
ContainerInfo: metric("container_info", "Meta information about the container", "image"),

Expand Down Expand Up @@ -86,6 +87,7 @@ var metrics = struct {
JvmGCTime: metric("container_jvm_gc_time_seconds", "Time spent in the given JVM garbage collector in seconds", "jvm", "gc"),
JvmSafepointTime: metric("container_jvm_safepoint_time_seconds", "Time the application has been stopped for safepoint operations in seconds", "jvm"),
JvmSafepointSyncTime: metric("container_jvm_safepoint_sync_time_seconds", "Time spent getting to safepoints in seconds", "jvm"),
Ip2Fqdn: metric("ip_to_fqdn", "Mapping IP addresses to FQDNs based on DNS requests initiated by containers", "ip", "fqdn"),
}

var (
Expand All @@ -101,6 +103,7 @@ var (
l7.ProtocolRabbitmq: {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
l7.ProtocolNats: {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_total", Help: "Total number of outbound DUBBO requests"},
l7.ProtocolDNS: {Name: "container_dns_requests_total", Help: "Total number of outbound DNS requests"},
}
L7Latency = map[l7.Protocol]prometheus.HistogramOpts{
l7.ProtocolHTTP: {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
Expand All @@ -112,6 +115,7 @@ var (
l7.ProtocolKafka: {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
l7.ProtocolDubbo2: {Name: "container_dubbo_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DUBBO request"},
l7.ProtocolDNS: {Name: "container_dns_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound DNS request"},
}
)

Expand Down
41 changes: 37 additions & 4 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"time"

"github.com/coroot/coroot-node-agent/cgroup"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/coroot/coroot-node-agent/proc"
"github.com/prometheus/client_golang/prometheus"
"github.com/vishvananda/netns"
"inet.af/netaddr"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -42,6 +44,8 @@ type Registry struct {
containersById map[ContainerID]*Container
containersByCgroupId map[string]*Container
containersByPid map[uint32]*Container
ip2fqdn map[netaddr.IP]string
ip2fqdnLock sync.Mutex

processInfoCh chan<- ProcessInfo
}
Expand Down Expand Up @@ -97,12 +101,15 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
containersById: map[ContainerID]*Container{},
containersByCgroupId: map[string]*Container{},
containersByPid: map[uint32]*Container{},
ip2fqdn: map[netaddr.IP]string{},

processInfoCh: processInfoCh,

tracer: ebpftracer.NewTracer(kernelVersion, *flags.DisableL7Tracing),
}

if err = reg.Register(r); err != nil {
return nil, err
}
go r.handleEvents(r.events)
if err = r.tracer.Run(r.events); err != nil {
close(r.events)
Expand All @@ -112,6 +119,18 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string, processInfoCh
return r, nil
}

// Describe implements prometheus.Collector. The only descriptor the registry
// announces is the IP-to-FQDN mapping metric.
func (r *Registry) Describe(ch chan<- *prometheus.Desc) {
	desc := metrics.Ip2Fqdn
	ch <- desc
}

// Collect implements prometheus.Collector. It emits one gauge with value 1
// per entry of the ip2fqdn map, labelled with the IP and its FQDN. The
// ip2fqdn lock is held for the whole iteration, including the channel sends.
func (r *Registry) Collect(ch chan<- prometheus.Metric) {
	r.ip2fqdnLock.Lock()
	defer r.ip2fqdnLock.Unlock()

	for addr, name := range r.ip2fqdn {
		sample := gauge(metrics.Ip2Fqdn, 1, addr.String(), name)
		ch <- sample
	}
}

func (r *Registry) Close() {
r.tracer.Close()
close(r.events)
Expand All @@ -137,11 +156,14 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
c.onProcessExit(pid, false)
}
}

activeIPs := map[netaddr.IP]struct{}{}
for id, c := range r.containersById {
if !c.Dead(now) {
continue
}
for dst := range c.connectLastAttempt {
activeIPs[dst.IP()] = struct{}{}
}
klog.Infoln("deleting dead container:", id)
for cg, cc := range r.containersByCgroupId {
if cc == c {
Expand All @@ -159,7 +181,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
delete(r.containersById, id)
c.Close()
}

r.ip2fqdnLock.Lock()
for ip := range r.ip2fqdn {
if _, ok := activeIPs[ip]; !ok {
delete(r.ip2fqdn, ip)
}
}
r.ip2fqdnLock.Unlock()
case e, more := <-ch:
if !more {
return
Expand Down Expand Up @@ -237,7 +265,12 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
continue
}
if c := r.containersByPid[e.Pid]; c != nil {
c.onL7Request(e.Pid, e.Fd, e.Timestamp, e.L7Request)
ip2fqdn := c.onL7Request(e.Pid, e.Fd, e.Timestamp, e.L7Request)
r.ip2fqdnLock.Lock()
for ip, fqdn := range ip2fqdn {
r.ip2fqdn[ip] = fqdn
}
r.ip2fqdnLock.Unlock()
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

58 changes: 58 additions & 0 deletions ebpftracer/ebpf/l7/dns.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Bit masks applied to the two flag bytes of struct dns_header below.
//
// DNS_QR_RESPONSE: top bit of bits0. is_dns_request rejects packets with it
// set; is_dns_response requires it.
#define DNS_QR_RESPONSE 0b10000000
// DNS_OPCODE: bits 3..6 of bits0. Both classifiers reject a non-zero value.
#define DNS_OPCODE 0b01111000
// DNS_Z: upper four bits of bits1. is_dns_response requires at least one of
// them to be set.
// NOTE(review): the name suggests the reserved Z bit, but the mask spans four
// bits (presumably RA/Z/AD/CD) - confirm the intended check.
#define DNS_Z 0b11110000
// DNS_RCODE: lower four bits of bits1; is_dns_response reports them as status.
#define DNS_RCODE 0b00001111

// Leading fields of a DNS message as read from the start of the traced buffer.
struct dns_header {
// Copied verbatim (no byte-order conversion) into the stream id by the classifiers.
__s16 id;
// First flag byte; tested with DNS_QR_RESPONSE and DNS_OPCODE.
__u8 bits0;
// Second flag byte; tested with DNS_Z and DNS_RCODE.
__u8 bits1;
// Question count; converted with bpf_ntohs before it is compared.
__s16 qdcount;
};

// Returns 1 and stores the message id in *stream_id when the buffer starts
// with a header that has the response bit clear, a zero opcode and exactly
// one question; returns 0 otherwise (including when the buffer is shorter
// than the header).
static __always_inline
int is_dns_request(char *buf, __u64 buf_size, __s16 *stream_id) {
    struct dns_header hdr = {};
    if (buf_size < sizeof(hdr)) {
        return 0;
    }
    bpf_read(buf, hdr);
    // Neither the response bit nor any opcode bit may be set.
    if (hdr.bits0 & (DNS_QR_RESPONSE | DNS_OPCODE)) {
        return 0;
    }
    __s16 questions = bpf_ntohs(hdr.qdcount);
    if (questions != 1) {
        return 0;
    }
    *stream_id = hdr.id;
    return 1;
}

// Returns 1 when the buffer starts with a header that has the response bit
// set, a zero opcode, at least one DNS_Z bit set in the second flag byte and
// exactly one question. On success *status receives the DNS_RCODE bits and
// *stream_id the message id. Returns 0 otherwise (including when the buffer
// is shorter than the header).
static __always_inline
int is_dns_response(char *buf, __u64 buf_size, __s16 *stream_id, __u32 *status) {
    struct dns_header hdr = {};
    if (buf_size < sizeof(hdr)) {
        return 0;
    }
    bpf_read(buf, hdr);
    // Response bit must be set while every opcode bit stays clear.
    if ((hdr.bits0 & (DNS_QR_RESPONSE | DNS_OPCODE)) != DNS_QR_RESPONSE) {
        return 0;
    }
    if ((hdr.bits1 & DNS_Z) == 0) {
        return 0;
    }
    __s16 questions = bpf_ntohs(hdr.qdcount);
    if (questions != 1) {
        return 0;
    }
    *status = hdr.bits1 & DNS_RCODE;
    *stream_id = hdr.id;
    return 1;
}
43 changes: 42 additions & 1 deletion ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#define PROTOCOL_NATS 10
#define PROTOCOL_HTTP2 11
#define PROTOCOL_DUBBO2 12
#define PROTOCOL_DNS 13

#define STATUS_UNKNOWN 0
#define STATUS_OK 200
Expand Down Expand Up @@ -51,6 +52,7 @@
#include "nats.c"
#include "http2.c"
#include "dubbo2.c"
#include "dns.c"

struct l7_event {
__u64 fd;
Expand Down Expand Up @@ -155,6 +157,9 @@ struct user_msghdr {
int msg_namelen;
struct iovec *msg_iov;
__u64 msg_iovlen;
void *msg_control;
__u64 msg_controllen;
__u32 msg_flags;
};

static inline __attribute__((__always_inline__))
Expand Down Expand Up @@ -315,6 +320,8 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
return 0;
} else if (is_dubbo2_request(payload, size)) {
req->protocol = PROTOCOL_DUBBO2;
} else if (is_dns_request(payload, size, &k.stream_id)) {
req->protocol = PROTOCOL_DNS;
}

if (req->protocol == PROTOCOL_UNKNOWN) {
Expand Down Expand Up @@ -405,7 +412,18 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
int response = 0;
if (!req) {
if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
if (is_dns_response(payload, ret, &k.stream_id, &e->status)) {
req = bpf_map_lookup_elem(&active_l7_requests, &k);
if (!req) {
return 0;
}
e->protocol = PROTOCOL_DNS;
e->duration = bpf_ktime_get_ns() - req->ns;
e->payload_size = ret;
COPY_PAYLOAD(e->payload, ret, payload);
send_event(ctx, e, k.pid, k.fd);
return 0;
} else if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
req = bpf_map_lookup_elem(&active_l7_requests, &k);
if (!req) {
return 0;
Expand Down Expand Up @@ -493,6 +511,29 @@ int sys_enter_sendmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
return trace_enter_write(ctx, ctx->fd, 0, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
}

// One element of the message vector read by sys_enter_sendmmsg: a
// user_msghdr followed by a 32-bit length field.
// NOTE(review): presumably mirrors the userspace struct mmsghdr layout - confirm.
struct mmsghdr {
struct user_msghdr msg_hdr;
__u32 msg_len;
};

// Tracepoint handler for sendmmsg entry. Reads at most the first two mmsghdr
// entries starting at ctx->buf and passes each entry's iovec array and count
// to trace_enter_write. Always returns 0.
// NOTE(review): ctx->buf / ctx->size are presumably the msgvec pointer and
// vlen arguments of sendmmsg - confirm against the tracepoint layout.
SEC("tracepoint/syscalls/sys_enter_sendmmsg")
int sys_enter_sendmmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
// Byte offset of the current entry within the message vector.
__u64 offset = 0;
// Fixed two-iteration bound; the loop is explicitly unrolled.
#pragma unroll
for (int i = 0; i <= 1; i++) {
// Stop once every entry the caller supplied has been visited.
if (i >= ctx->size) {
break;
}
struct mmsghdr h = {};
// A failed read ends the handler; later entries are not inspected.
if (bpf_probe_read(&h , sizeof(h), (void *)(ctx->buf + offset))) {
return 0;
}
offset += sizeof(h);
// Same call shape as the sendmsg handler: iovec pointer as buf, size 0, iovec count last.
trace_enter_write(ctx, ctx->fd, 0, (char*)h.msg_hdr.msg_iov, 0, h.msg_hdr.msg_iovlen);
}
return 0;
}

SEC("tracepoint/syscalls/sys_enter_sendto")
int sys_enter_sendto(struct trace_event_raw_sys_enter_rw__stub* ctx) {
return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size, 0);
Expand Down
Loading

0 comments on commit 827e918

Please sign in to comment.