Skip to content

Commit

Permalink
Merge pull request #107 from coroot/tcp_connection_time
Browse files Browse the repository at this point in the history
add the `container_net_tcp_connection_time_seconds_total` metric
  • Loading branch information
def authored Jul 9, 2024
2 parents 2d816cb + f214737 commit 05c1e51
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 50 deletions.
55 changes: 34 additions & 21 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ type PidFd struct {
Fd uint64
}

// ConnectionStats accumulates counters for outbound TCP connections.
// Container.connectsSuccessful holds one instance per AddrPair key, and
// Collect exports its fields as Prometheus counters.
type ConnectionStats struct {
// Count is incremented by onConnectionOpen for every connect that did not fail.
Count int64
// TotalTime is the running sum of the durations passed to onConnectionOpen
// for those connects; Collect reports it in seconds.
TotalTime time.Duration
// Retransmissions is incremented by onRetransmission for each retransmit
// matched to an active connection; Collect only emits it when it is > 0.
Retransmissions int64
}

type Container struct {
id ContainerID
cgroup *cgroup.Cgroup
Expand All @@ -110,12 +116,11 @@ type Container struct {
listens map[netaddr.IPPort]map[uint32]*ListenDetails
ipsByNs map[string][]netaddr.IP

connectsSuccessful map[AddrPair]int64 // dst:actual_dst -> count
connectsFailed map[netaddr.IPPort]int64 // dst -> count
connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
connectsSuccessful map[AddrPair]*ConnectionStats // dst:actual_dst -> count
connectsFailed map[netaddr.IPPort]int64 // dst -> count
connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
connectionsActive map[AddrPair]*ActiveConnection
connectionsByPidFd map[PidFd]*ActiveConnection
retransmits map[AddrPair]int64 // dst:actual_dst -> count

l7Stats L7Stats
dnsStats *L7Metrics
Expand Down Expand Up @@ -153,12 +158,11 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
listens: map[netaddr.IPPort]map[uint32]*ListenDetails{},
ipsByNs: map[string][]netaddr.IP{},

connectsSuccessful: map[AddrPair]int64{},
connectsSuccessful: map[AddrPair]*ConnectionStats{},
connectsFailed: map[netaddr.IPPort]int64{},
connectLastAttempt: map[netaddr.IPPort]time.Time{},
connectionsActive: map[AddrPair]*ActiveConnection{},
connectionsByPidFd: map[PidFd]*ActiveConnection{},
retransmits: map[AddrPair]int64{},
l7Stats: L7Stats{},
dnsStats: &L7Metrics{},

Expand Down Expand Up @@ -289,14 +293,15 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
}
}

for d, count := range c.connectsSuccessful {
ch <- counter(metrics.NetConnectsSuccessful, float64(count), d.src.String(), d.dst.String())
for d, stats := range c.connectsSuccessful {
ch <- counter(metrics.NetConnectionsSuccessful, float64(stats.Count), d.src.String(), d.dst.String())
ch <- counter(metrics.NetConnectionsTotalTime, stats.TotalTime.Seconds(), d.src.String(), d.dst.String())
if stats.Retransmissions > 0 {
ch <- counter(metrics.NetRetransmits, float64(stats.Retransmissions), d.src.String(), d.dst.String())
}
}
for dst, count := range c.connectsFailed {
ch <- counter(metrics.NetConnectsFailed, float64(count), dst.String())
}
for d, count := range c.retransmits {
ch <- counter(metrics.NetRetransmits, float64(count), d.src.String(), d.dst.String())
ch <- counter(metrics.NetConnectionsFailed, float64(count), dst.String())
}

connections := map[AddrPair]int{}
Expand Down Expand Up @@ -489,7 +494,7 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
}
}

func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool) {
func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, timestamp uint64, failed bool, duration time.Duration) {
if common.PortFilter.ShouldBeSkipped(dst.Port()) {
return
}
Expand Down Expand Up @@ -521,7 +526,14 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
if failed {
c.connectsFailed[dst]++
} else {
c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
key := AddrPair{src: dst, dst: *actualDst}
stats := c.connectsSuccessful[key]
if stats == nil {
stats = &ConnectionStats{}
c.connectsSuccessful[key] = stats
}
stats.Count++
stats.TotalTime += duration
connection := &ActiveConnection{
Dest: dst,
ActualDest: *actualDst,
Expand Down Expand Up @@ -682,14 +694,20 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
return nil
}

func (c *Container) onRetransmit(srcDst AddrPair) bool {
func (c *Container) onRetransmission(srcDst AddrPair) bool {
c.lock.Lock()
defer c.lock.Unlock()
conn, ok := c.connectionsActive[srcDst]
if !ok {
return false
}
c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
key := AddrPair{src: srcDst.dst, dst: conn.ActualDest}
stats := c.connectsSuccessful[key]
if stats == nil {
stats = &ConnectionStats{}
c.connectsSuccessful[key] = stats
}
stats.Retransmissions++
return true
}

Expand Down Expand Up @@ -987,11 +1005,6 @@ func (c *Container) gc(now time.Time) {
delete(c.connectsSuccessful, d)
}
}
for d := range c.retransmits {
if d.src == dst {
delete(c.retransmits, d)
}
}
c.l7Stats.delete(dst)
}
}
Expand Down
26 changes: 14 additions & 12 deletions containers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ var metrics = struct {
DiskWriteOps *prometheus.Desc
DiskWriteBytes *prometheus.Desc

NetListenInfo *prometheus.Desc
NetConnectsSuccessful *prometheus.Desc
NetConnectsFailed *prometheus.Desc
NetConnectionsActive *prometheus.Desc
NetRetransmits *prometheus.Desc
NetLatency *prometheus.Desc
NetListenInfo *prometheus.Desc
NetConnectionsSuccessful *prometheus.Desc
NetConnectionsTotalTime *prometheus.Desc
NetConnectionsFailed *prometheus.Desc
NetConnectionsActive *prometheus.Desc
NetRetransmits *prometheus.Desc
NetLatency *prometheus.Desc

LogMessages *prometheus.Desc

Expand Down Expand Up @@ -70,12 +71,13 @@ var metrics = struct {
DiskWriteOps: metric("container_resources_disk_writes_total", "Total number of writes completed successfully by the container", "mount_point", "device", "volume"),
DiskWriteBytes: metric("container_resources_disk_written_bytes_total", "Total number of bytes written to the disk by the container", "mount_point", "device", "volume"),

NetListenInfo: metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
NetConnectsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
NetConnectsFailed: metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
NetConnectionsActive: metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
NetRetransmits: metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
NetLatency: metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),
NetListenInfo: metric("container_net_tcp_listen_info", "Listen address of the container", "listen_addr", "proxy"),
NetConnectionsSuccessful: metric("container_net_tcp_successful_connects_total", "Total number of successful TCP connects", "destination", "actual_destination"),
NetConnectionsTotalTime: metric("container_net_tcp_connection_time_seconds_total", "Time spent on TCP connections", "destination", "actual_destination"),
NetConnectionsFailed: metric("container_net_tcp_failed_connects_total", "Total number of failed TCP connects", "destination"),
NetConnectionsActive: metric("container_net_tcp_active_connections", "Number of active outbound connections used by the container", "destination", "actual_destination"),
NetRetransmits: metric("container_net_tcp_retransmits_total", "Total number of retransmitted TCP segments", "destination", "actual_destination"),
NetLatency: metric("container_net_latency_seconds", "Round-trip time between the container and a remote IP", "destination_ip"),

LogMessages: metric("container_log_messages_total", "Number of messages grouped by the automatically extracted repeated pattern", "source", "level", "pattern_hash", "sample"),

Expand Down
6 changes: 3 additions & 3 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,14 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {

case ebpftracer.EventTypeConnectionOpen:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false)
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, e.Timestamp, false, e.Duration)
c.attachTlsUprobes(r.tracer, e.Pid)
} else {
klog.Infoln("TCP connection from unknown container", e)
}
case ebpftracer.EventTypeConnectionError:
if c := r.getOrCreateContainer(e.Pid); c != nil {
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true)
c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, 0, true, e.Duration)
} else {
klog.Infoln("TCP connection error from unknown container", e)
}
Expand All @@ -256,7 +256,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
case ebpftracer.EventTypeTCPRetransmit:
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
for _, c := range r.containersById {
if c.onRetransmit(srcDst) {
if c.onRetransmission(srcDst) {
break
}
}
Expand Down
16 changes: 8 additions & 8 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

24 changes: 18 additions & 6 deletions ebpftracer/ebpf/tcp/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
struct tcp_event {
__u64 fd;
__u64 timestamp;
__u64 duration;
__u32 type;
__u32 pid;
__u16 sport;
Expand Down Expand Up @@ -53,12 +54,19 @@ struct sk_info {
__u64 fd;
__u32 pid;
};

// conn_info is the per-socket value stored in the conn_info map, keyed by the
// socket address (args.skaddr), by inet_sock_set_state. The entry is looked up
// and deleted again when the socket leaves BPF_TCP_SYN_SENT.
struct conn_info {
// File descriptor recorded for the connection; copied into the emitted event.
__u64 fd;
// bpf_ktime_get_ns() value taken when the entry was created; the connect
// duration is computed as the current bpf_ktime_get_ns() minus this value.
__u64 ts;
// PID recorded alongside the fd; copied into the emitted event.
__u32 pid;
};

struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(key_size, sizeof(void *));
__uint(value_size, sizeof(struct sk_info));
__uint(value_size, sizeof(struct conn_info));
__uint(max_entries, 10240);
} sk_info SEC(".maps");
} conn_info SEC(".maps");

struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
Expand Down Expand Up @@ -86,20 +94,22 @@ int inet_sock_set_state(void *ctx)
if (!fdp) {
return 0;
}
struct sk_info i = {};
struct conn_info i = {};
i.pid = pid;
i.ts = bpf_ktime_get_ns();
i.fd = *fdp;
bpf_map_delete_elem(&fd_by_pid_tgid, &id);
bpf_map_update_elem(&sk_info, &args.skaddr, &i, BPF_ANY);
bpf_map_update_elem(&conn_info, &args.skaddr, &i, BPF_ANY);
return 0;
}

__u64 fd = 0;
__u32 type = 0;
__u64 timestamp = 0;
__u64 duration = 0;
void *map = &tcp_connect_events;
if (args.oldstate == BPF_TCP_SYN_SENT) {
struct sk_info *i = bpf_map_lookup_elem(&sk_info, &args.skaddr);
struct conn_info *i = bpf_map_lookup_elem(&conn_info, &args.skaddr);
if (!i) {
return 0;
}
Expand All @@ -113,9 +123,10 @@ int inet_sock_set_state(void *ctx)
} else if (args.newstate == BPF_TCP_CLOSE) {
type = EVENT_TYPE_CONNECTION_ERROR;
}
duration = bpf_ktime_get_ns() - i->ts;
pid = i->pid;
fd = i->fd;
bpf_map_delete_elem(&sk_info, &args.skaddr);
bpf_map_delete_elem(&conn_info, &args.skaddr);
}
if (args.oldstate == BPF_TCP_ESTABLISHED && (args.newstate == BPF_TCP_FIN_WAIT1 || args.newstate == BPF_TCP_CLOSE_WAIT)) {
pid = 0;
Expand All @@ -136,6 +147,7 @@ int inet_sock_set_state(void *ctx)

struct tcp_event e = {};
e.type = type;
e.duration = duration;
e.timestamp = timestamp;
e.pid = pid;
e.sport = args.sport;
Expand Down
3 changes: 3 additions & 0 deletions ebpftracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Event struct {
DstAddr netaddr.IPPort
Fd uint64
Timestamp uint64
Duration time.Duration
L7Request *l7.RequestData
}

Expand Down Expand Up @@ -295,6 +296,7 @@ type procEvent struct {
type tcpEvent struct {
Fd uint64
Timestamp uint64
Duration uint64
Type EventType
Pid uint32
SPort uint16
Expand Down Expand Up @@ -388,6 +390,7 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
DstAddr: ipPort(v.DAddr, v.DPort),
Fd: v.Fd,
Timestamp: v.Timestamp,
Duration: time.Duration(v.Duration),
}
default:
continue
Expand Down

0 comments on commit 05c1e51

Please sign in to comment.