Skip to content

Commit

Permalink
Merge pull request #121 from coroot/fix_short_lived_connections_handling
Browse files Browse the repository at this point in the history
trace connection closure only through the `close` syscall
  • Loading branch information
def authored Aug 21, 2024
2 parents 3f56517 + f21f868 commit a90fb2d
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 68 deletions.
17 changes: 3 additions & 14 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,31 +596,20 @@ func (c *Container) getActualDestination(p *Process, src, dst netaddr.IPPort) (*
return nil, nil
}

func (c *Container) onConnectionClose(e ebpftracer.Event) bool {
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
func (c *Container) onConnectionClose(e ebpftracer.Event) {
c.lock.Lock()
conn, ok := c.connectionsActive[srcDst]
conn := c.connectionsByPidFd[PidFd{Pid: e.Pid, Fd: e.Fd}]
c.lock.Unlock()
if conn != nil {
if conn.Closed.IsZero() {
if e.Pid == 0 && e.Fd == 0 {
stats, err := c.registry.tracer.GetAndDeleteTCPConnection(conn.Pid, conn.Fd)
if err != nil {
klog.Warningln(c.id, conn.Pid, conn.Fd, conn.ActualDest, err)
} else {
c.lock.Lock()
c.updateConnectionTrafficStats(conn, stats.BytesSent, stats.BytesReceived)
c.lock.Unlock()
}
} else if e.TrafficStats != nil {
if e.TrafficStats != nil {
c.lock.Lock()
c.updateConnectionTrafficStats(conn, e.TrafficStats.BytesSent, e.TrafficStats.BytesReceived)
c.lock.Unlock()
}
conn.Closed = time.Now()
}
}
return ok
}

func (c *Container) updateTrafficStats(u *TrafficStatsUpdate) {
Expand Down
12 changes: 2 additions & 10 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,16 +262,8 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
klog.Infoln("TCP connection error from unknown container", e)
}
case ebpftracer.EventTypeConnectionClose:
if e.Pid != 0 && e.Fd != 0 {
if c := r.containersByPid[e.Pid]; c != nil {
c.onConnectionClose(e)
}
} else {
for _, c := range r.containersById {
if c.onConnectionClose(e) {
break
}
}
if c := r.containersByPid[e.Pid]; c != nil {
c.onConnectionClose(e)
}
case ebpftracer.EventTypeTCPRetransmit:
srcDst := AddrPair{src: e.SrcAddr, dst: e.DstAddr}
Expand Down
16 changes: 8 additions & 8 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

44 changes: 14 additions & 30 deletions ebpftracer/ebpf/tcp/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,7 @@ int inet_sock_set_state(void *ctx)
fd = cid->fd;
}
if (args.oldstate == BPF_TCP_ESTABLISHED && (args.newstate == BPF_TCP_FIN_WAIT1 || args.newstate == BPF_TCP_CLOSE_WAIT)) {
struct connection_id *cid = bpf_map_lookup_elem(&connection_id_by_socket, &args.skaddr);
if (cid) {
pid = cid->pid;
fd = cid->fd;
struct connection *conn = bpf_map_lookup_elem(&active_connections, cid);
if (conn) {
e.bytes_sent = conn->bytes_sent;
e.bytes_received = conn->bytes_received;
bpf_map_delete_elem(&active_connections, cid);
}
bpf_map_delete_elem(&connection_id_by_socket, &args.skaddr);
}
type = EVENT_TYPE_CONNECTION_CLOSE;
bpf_map_delete_elem(&connection_id_by_socket, &args.skaddr);
}
if (args.oldstate == BPF_TCP_CLOSE && args.newstate == BPF_TCP_LISTEN) {
type = EVENT_TYPE_LISTEN_OPEN;
Expand Down Expand Up @@ -225,24 +213,20 @@ int sys_enter_close(void *ctx) {
return 0;
}
__u64 id = bpf_get_current_pid_tgid();
bpf_map_update_elem(&fd_by_pid_tgid, &id, &args.fd, BPF_ANY);
return 0;
}

SEC("tracepoint/syscalls/sys_exit_close")
int sys_exit_close(struct trace_event_raw_sys_exit__stub* ctx) {
__u64 id = bpf_get_current_pid_tgid();
__u64 *fdp = bpf_map_lookup_elem(&fd_by_pid_tgid, &id);
if (!fdp) {
return 0;
}
struct connection_id cid = {};
cid.pid = id >> 32;
cid.fd = *fdp;
bpf_map_delete_elem(&active_connections, &cid);
bpf_map_delete_elem(&fd_by_pid_tgid, &id);
cid.fd = args.fd;
struct connection *conn = bpf_map_lookup_elem(&active_connections, &cid);
if (conn) {
struct tcp_event e = {};
e.type = EVENT_TYPE_CONNECTION_CLOSE;
e.pid = cid.pid;
e.fd = cid.fd;
e.bytes_sent = conn->bytes_sent;
e.bytes_received = conn->bytes_received;
e.timestamp = conn->timestamp;
bpf_perf_event_output(ctx, &tcp_connect_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
bpf_map_delete_elem(&active_connections, &cid);
}
return 0;
}



6 changes: 0 additions & 6 deletions ebpftracer/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,6 @@ func (t *Tracer) init(ch chan<- Event) error {
return nil
}

func (t *Tracer) GetAndDeleteTCPConnection(pid uint32, fd uint64) (*Connection, error) {
id := ConnectionId{FD: fd, PID: pid}
conn := &Connection{}
return conn, t.collection.Maps["active_connections"].LookupAndDelete(id, conn)
}

func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
return t.collection.Maps["active_connections"].Iterate()
}
Expand Down

0 comments on commit a90fb2d

Please sign in to comment.