[WIP] Keepalive and idle timeout debugging
ccanel committed Jun 8, 2024
1 parent e7c6394 commit 96a4edf
Showing 5 changed files with 128 additions and 41 deletions.
5 changes: 5 additions & 0 deletions ratemon/runtime/c/Makefile
@@ -81,9 +81,11 @@ clean:
sudo rm -fv /sys/fs/bpf/flow_to_rwnd
sudo rm -fv /sys/fs/bpf/flow_to_win_scale
sudo rm -fv /sys/fs/bpf/flow_to_last_data_time_ns
sudo rm -fv /sys/fs/bpf/flow_to_keepalive
sudo rm -fv /sys/fs/bpf/tc/globals/flow_to_rwnd
sudo rm -fv /sys/fs/bpf/tc/globals/flow_to_win_scale
sudo rm -fv /sys/fs/bpf/tc/globals/flow_to_last_data_time_ns
sudo rm -fv /sys/fs/bpf/tc/globals/flow_to_keepalive

$(OUTPUT) $(OUTPUT)/libbpf $(BPFTOOL_OUTPUT):
$(call msg,MKDIR,$@)
@@ -153,15 +155,18 @@ attach_tc_and_run: ratemon_main $(OUTPUT)/ratemon_tc.bpf.o
sudo bpftool map pin name flow_to_rwnd /sys/fs/bpf/flow_to_rwnd
sudo bpftool map pin name flow_to_win_sca /sys/fs/bpf/flow_to_win_scale
sudo bpftool map pin name flow_to_last_da /sys/fs/bpf/flow_to_last_data_time_ns
sudo bpftool map pin name flow_to_keepali /sys/fs/bpf/flow_to_keepalive
sudo RM_CGROUP=$(RM_CGROUP) ./ratemon_main || true
for id in `sudo bpftool struct_ops list | cut -d":" -f1`; do sudo bpftool struct_ops unregister id $$id; done
sudo tc filter del dev $(RM_IFACE) egress
sudo rm -fv /sys/fs/bpf/flow_to_rwnd
sudo rm -fv /sys/fs/bpf/flow_to_win_scale
sudo rm -fv /sys/fs/bpf/flow_to_last_data_time_ns
sudo rm -fv /sys/fs/bpf/flow_to_keepalive
sudo rm -fv /sys/fs/bpf/tc/globals/flow_to_rwnd
sudo rm -fv /sys/fs/bpf/tc/globals/flow_to_win_scale
sudo rm -fv /sys/fs/bpf/tc/globals/flow_to_last_data_time_ns
sudo rm -fv /sys/fs/bpf/tc/globals/flow_to_keepalive
sudo tc qdisc del dev $(RM_IFACE) clsact

# delete failed targets
100 changes: 70 additions & 30 deletions ratemon/runtime/c/libratemon_interp.cpp
@@ -34,7 +34,8 @@

// Protects writes only to max_active_flows, epoch_us, idle_timeout_ns,
// monitor_port_start, monitor_port_end, flow_to_rwnd_fd, flow_to_win_scale_fd,
// oldact, and setup. Reads are unprotected.
// flow_to_last_data_time_fd, flow_to_keepalive_fd, oldact, and setup. Reads are
// unprotected.
std::mutex lock_setup;
// Whether setup has been performed.
bool setup_done = false;
@@ -48,6 +49,8 @@ int flow_to_rwnd_fd = 0;
int flow_to_win_scale_fd = 0;
// FD for the BPF map "flow_to_last_da" (short for "flow_to_last_data_time_ns").
int flow_to_last_data_time_fd = 0;
// FD for the BPF map "flow_to_keepali" (short for "flow_to_keepalive").
int flow_to_keepalive_fd = 0;
// Runs async timers for scheduling
boost::asio::io_service io;
// Periodically performs scheduling using timer_callback().
@@ -144,12 +147,14 @@ void timer_callback(const boost::system::error_code &error) {
}
// Check that relevant parameters have been set. Otherwise, revert to slow
// check mode.
if (max_active_flows == 0 || epoch_us == 0 || flow_to_rwnd_fd == 0 ||
flow_to_last_data_time_fd == 0) {
if (!max_active_flows || !epoch_us || !flow_to_rwnd_fd ||
!flow_to_last_data_time_fd || !flow_to_keepalive_fd) {
RM_PRINTF(
"ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, "
"flow_to_rwnd_fd=%d, or flow_to_last_data_time_fd=%d\n",
max_active_flows, epoch_us, flow_to_rwnd_fd, flow_to_last_data_time_fd);
"flow_to_rwnd_fd=%d, flow_to_last_data_time_fd=%d, or "
"flow_to_keepalive_fd=%d\n",
max_active_flows, epoch_us, flow_to_rwnd_fd, flow_to_last_data_time_fd,
flow_to_keepalive_fd);
if (timer.expires_from_now(one_sec)) {
RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
}
@@ -159,7 +164,8 @@ void timer_callback(const boost::system::error_code &error) {

// It is now safe to perform scheduling.
lock_scheduler.lock();
RM_PRINTF("INFO: performing scheduling\n");
RM_PRINTF("INFO: performing scheduling. active=%lu, paused=%lu\n",
active_fds_queue.size(), paused_fds_queue.size());

// Temporary variable for storing the front of active_fds_queue.
std::pair<int, boost::posix_time::ptime> a;
@@ -226,6 +232,9 @@ void timer_callback(const boost::system::error_code &error) {
// risk causing a drop in utilization by pausing it immediately.
if (idle_ns >= idle_timeout_ns) {
RM_PRINTF("INFO: Pausing FD=%d due to idle timeout\n", a.first);
// Remove the flow from flow_to_keepalive, signalling that it no
// longer has pending demand.
bpf_map_delete_elem(flow_to_keepalive_fd, &a.first);
paused_fds_queue.push(a.first);
pause_flow(a.first);
continue;
@@ -250,6 +259,9 @@ void timer_callback(const boost::system::error_code &error) {
to_pause.push_back(a.first);
}
}
// Put the flow back in the active queue. To reach this point, the flow must
// not be idle, and it is either not yet at its epoch time or it has passed
// its epoch time and will be extracted and paused later.
active_fds_queue.push(a);
}

@@ -259,13 +271,23 @@ void timer_callback(const boost::system::error_code &error) {
// as many entries in paused_fds_queue as needed.
unsigned long num_to_activate =
max_active_flows - active_fds_queue.size() + to_pause.size();
int dummy;
for (unsigned long i = 0; i < num_to_activate; ++i) {
// Loop until we find a paused flow that is valid (not closed).
while (!paused_fds_queue.empty()) {
unsigned long num_paused = paused_fds_queue.size();
for (unsigned long j = 0; j < num_paused; ++j) {
p = paused_fds_queue.front();
paused_fds_queue.pop();
// If this flow has been closed, then skip it.
if (!fd_to_flow.contains(p)) continue;
// If this flow is not in the flow_to_keepalive map (bpf_map_lookup_elem()
// returns negative error code when the flow is not found), then it has no
// pending data and should be skipped.
if (bpf_map_lookup_elem(flow_to_keepalive_fd, &p, &dummy)) {
// RM_PRINTF("INFO: skipping activating FD=%d, no pending data\n", p);
paused_fds_queue.push(p);
continue;
}
// Randomly jitter the epoch time by +/- 12.5%.
active_fds_queue.push(
{p,
@@ -291,6 +313,7 @@ void timer_callback(const boost::system::error_code &error) {
while (j < s) {
a = active_fds_queue.front();
active_fds_queue.pop();
++j;
if (a.first == to_pause[i]) {
// Pause this flow.
paused_fds_queue.push(a.first);
@@ -299,7 +322,6 @@ void timer_callback(const boost::system::error_code &error) {
}
// Examine the next flow in active_fds_queue.
active_fds_queue.push(a);
++j;
}
}

Expand All @@ -308,30 +330,32 @@ void timer_callback(const boost::system::error_code &error) {
// Cannot have more than the max number of active flows.
assert(active_fds_queue.size() <= max_active_flows);
// If there are no active flows, then there should also be no paused flows.
assert(!active_fds_queue.empty() || paused_fds_queue.empty());
// This is no longer strictly true: if none of the flows have pending data
// (i.e., none are in flow_to_keepalive), then they will all be paused.
// assert(!active_fds_queue.empty() || paused_fds_queue.empty());
#endif

// 5) Calculate when the next timer should expire.
boost::posix_time::time_duration when;
long next_epoch_us =
(active_fds_queue.front().second - now).total_microseconds();
if (active_fds_queue.empty()) {
// If there are no flows, revert to slow check mode.
when = one_sec;
RM_PRINTF("INFO: no flows remaining, reverting to slow check mode\n");
} else if (idle_timeout_ns == 0) {
when = one_sec;
} else if (!idle_timeout_ns) {
// If we are not using idle timeout mode...
when = active_fds_queue.front().second - now;
RM_PRINTF("INFO: scheduling timer for next epoch end\n");
} else {
RM_PRINTF("INFO: no idle timeout, scheduling timer for next epoch end\n");
when = boost::posix_time::microsec(next_epoch_us);
} else if (idle_timeout_us < next_epoch_us) {
// If we are using idle timeout mode...
long next_epoch_us =
(active_fds_queue.front().second - now).total_microseconds();
if (idle_timeout_us < next_epoch_us) {
RM_PRINTF("INFO: scheduling timer for next idle timeout\n");
when = boost::posix_time::microsec(idle_timeout_us);
} else {
RM_PRINTF("INFO: scheduling timer for next epoch end\n");
when = boost::posix_time::microsec(next_epoch_us);
}
RM_PRINTF("INFO: scheduling timer for next idle timeout\n");
when = boost::posix_time::microsec(idle_timeout_us);
} else {
RM_PRINTF(
"INFO: scheduling timer for next epoch end, sooner than idle "
"timeout\n");
when = boost::posix_time::microsec(next_epoch_us);
}

// 6) Start the next timer.
@@ -362,11 +386,13 @@ void thread_func() {
// Delete all flows from flow_to_rwnd and flow_to_win_scale.
lock_scheduler.lock();
for (const auto &p : fd_to_flow) {
if (flow_to_rwnd_fd != 0) bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
if (flow_to_win_scale_fd != 0)
if (flow_to_rwnd_fd) bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
if (flow_to_win_scale_fd)
bpf_map_delete_elem(flow_to_win_scale_fd, &p.second);
if (flow_to_last_data_time_fd != 0)
if (flow_to_last_data_time_fd)
bpf_map_delete_elem(flow_to_last_data_time_fd, &p.second);
if (flow_to_keepalive_fd)
bpf_map_delete_elem(flow_to_keepalive_fd, &p.second);
}
lock_scheduler.unlock();
RM_PRINTF("INFO: scheduler thread ended\n");
@@ -469,11 +495,23 @@ bool setup() {
RM_PRINTF(
"ERROR: failed to get FD for 'flow_to_last_data_time_ns' from path "
"'%s'\n",
RM_FLOW_TO_RWND_PIN_PATH);
RM_FLOW_TO_LAST_DATA_TIME_PIN_PATH);
return false;
}
flow_to_last_data_time_fd = err;

// Look up the FD for the flow_to_keepalive map. We do not need the
// BPF skeleton for this.
err = bpf_obj_get(RM_FLOW_TO_KEEPALIVE_PIN_PATH);
if (err == -1) {
RM_PRINTF(
"ERROR: failed to get FD for 'flow_to_keepalive' from path "
"'%s'\n",
RM_FLOW_TO_KEEPALIVE_PIN_PATH);
return false;
}
flow_to_keepalive_fd = err;

// Catch SIGINT to end the program.
struct sigaction action;
action.sa_handler = sigint_handler;
@@ -648,12 +686,14 @@ int close(int sockfd) {
lock_scheduler.lock();
if (fd_to_flow.contains(sockfd)) {
// Obviously, do this before removing the FD from fd_to_flow.
if (flow_to_rwnd_fd != 0)
if (flow_to_rwnd_fd)
bpf_map_delete_elem(flow_to_rwnd_fd, &fd_to_flow[sockfd]);
if (flow_to_win_scale_fd != 0)
if (flow_to_win_scale_fd)
bpf_map_delete_elem(flow_to_win_scale_fd, &fd_to_flow[sockfd]);
if (flow_to_last_data_time_fd != 0)
if (flow_to_last_data_time_fd)
bpf_map_delete_elem(flow_to_last_data_time_fd, &fd_to_flow[sockfd]);
if (flow_to_keepalive_fd)
bpf_map_delete_elem(flow_to_keepalive_fd, &fd_to_flow[sockfd]);
// Removing the FD from fd_to_flow triggers it to be (eventually) removed
// from scheduling.
unsigned long d = fd_to_flow.erase(sockfd);
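For reference, a minimal user-space sketch of the keepalive gate that timer_callback now applies: a flow counts as having pending demand only while it is present in the pinned flow_to_keepalive map (bpf_map_lookup_elem() returns 0), and bpf_map_delete_elem() clears that mark when the flow hits the idle timeout. Only the pin path macro and the rm_flow field names come from ratemon.h and ratemon_kprobe.bpf.c; the standalone main() and the example flow values are illustrative assumptions, not part of this commit.

// Sketch only: checks and clears the "pending demand" mark that the scheduler
// reads. Assumes ratemon.h defines RM_FLOW_TO_KEEPALIVE_PIN_PATH and struct
// rm_flow with the fields shown; the flow values below are made up.
#include <stdio.h>
#include <bpf/bpf.h>
#include "ratemon.h"

int main(void) {
  // Open the map pinned by the Makefile at /sys/fs/bpf/flow_to_keepalive.
  int fd = bpf_obj_get(RM_FLOW_TO_KEEPALIVE_PIN_PATH);
  if (fd < 0) {
    perror("bpf_obj_get");
    return 1;
  }
  // Hypothetical flow; in libratemon_interp.cpp the key comes from fd_to_flow.
  struct rm_flow flow = {.local_addr = 0x0a000001,
                         .remote_addr = 0x0a000002,
                         .local_port = 9000,
                         .remote_port = 50000};
  int dummy;
  if (bpf_map_lookup_elem(fd, &flow, &dummy)) {
    // Nonzero return: the flow is not in the map, i.e., no pending demand.
    printf("flow has no pending demand\n");
  } else {
    printf("flow has pending demand (keepalive seen)\n");
    // On idle timeout the scheduler clears the mark like this:
    bpf_map_delete_elem(fd, &flow);
  }
  return 0;
}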
1 change: 1 addition & 0 deletions ratemon/runtime/c/ratemon.h
@@ -22,6 +22,7 @@
#define RM_FLOW_TO_WIN_SCALE_PIN_PATH "/sys/fs/bpf/flow_to_win_scale"
#define RM_FLOW_TO_LAST_DATA_TIME_PIN_PATH \
"/sys/fs/bpf/flow_to_last_data_time_ns"
#define RM_FLOW_TO_KEEPALIVE_PIN_PATH "/sys/fs/bpf/flow_to_keepalive"
// Name of struct_ops CCA that flows must use to be woken up.
#define RM_BPF_CUBIC "bpf_cubic"

54 changes: 43 additions & 11 deletions ratemon/runtime/c/ratemon_kprobe.bpf.c
@@ -38,30 +38,60 @@ int BPF_KPROBE(tcp_rcv_established, struct sock *sk, struct sk_buff *skb) {
bpf_printk("ERROR: 'tcp_rcv_established' tp=%u", tp);
return 0;
}
// If this packet does not contain new data (e.g., it is a pure ACK or a
// retransmission), then we are not interested in it.
__be32 seq;

// Safely extract members from tcp_sock, tcphdr, and sk_buff.
// tcp_sock:
u32 rcv_nxt;
BPF_CORE_READ_INTO(&seq, th, seq);
BPF_CORE_READ_INTO(&rcv_nxt, tp, rcv_nxt);
if (bpf_ntohl(seq) < rcv_nxt) {
bpf_printk("ERROR: 'tcp_rcv_established' packet does not contain new data",
tp);
return 0;
}
// Build the flow struct.
// tcphdr
__be32 seq_;
BPF_CORE_READ_INTO(&seq_, th, seq);
u32 seq = bpf_ntohl(seq_);
u64 doff = BPF_CORE_READ_BITFIELD_PROBED(th, doff);
u64 syn = BPF_CORE_READ_BITFIELD_PROBED(th, syn);
u64 fin = BPF_CORE_READ_BITFIELD_PROBED(th, fin);
u64 rst = BPF_CORE_READ_BITFIELD_PROBED(th, rst);
// sk_buff:
unsigned int len;
__be32 skc_daddr;
__be32 skc_rcv_saddr;
__u16 skc_num;
__be16 skc_dport;
BPF_CORE_READ_INTO(&len, skb, len);
BPF_CORE_READ_INTO(&skc_daddr, sk, __sk_common.skc_daddr);
BPF_CORE_READ_INTO(&skc_rcv_saddr, sk, __sk_common.skc_rcv_saddr);
BPF_CORE_READ_INTO(&skc_num, sk, __sk_common.skc_num);
BPF_CORE_READ_INTO(&skc_dport, sk, __sk_common.skc_dport);

// Build the flow struct.
struct rm_flow flow = {.local_addr = bpf_ntohl(skc_rcv_saddr),
.remote_addr = bpf_ntohl(skc_daddr),
.local_port = skc_num,
.remote_port = bpf_ntohs(skc_dport)};

// Check for TCP keepalive. From Wireshark
// (https://www.wireshark.org/docs/wsug_html_chunked/ChAdvTCPAnalysis.html):
// A packet is a keepalive "...when the segment size is zero or one, the
// current sequence number is one byte less than the next expected sequence
// number, and none of SYN, FIN, or RST are set".
if ((len - (doff * 4) <= 1) && (seq == rcv_nxt - 1) && !syn && !fin && !rst) {
// This is a keepalive packet.
bpf_printk("INFO: 'tcp_rcv_established' found keepalive");
int one = 1;
if (bpf_map_update_elem(&flow_to_keepalive, &flow, &one, BPF_ANY)) {
bpf_printk(
"ERROR: 'tcp_rcv_established' error updating flow_to_keepalive");
}
}

// If this packet does not contain new data (e.g., it is a pure ACK or a
// retransmission), then we are not interested in it.
if (seq < rcv_nxt) {
bpf_printk("INFO: 'tcp_rcv_established' packet does not contain new data",
tp);
return 0;
}

// Check if we should record the last data time for this flow.
if (bpf_map_lookup_elem(&flow_to_last_data_time_ns, &flow) == NULL) {
// This flow is not in the map, so we are not supposed to track its last
@@ -72,7 +102,9 @@ int BPF_KPROBE(tcp_rcv_established, struct sock *sk, struct sk_buff *skb) {
unsigned long now_ns = bpf_ktime_get_ns();
if (bpf_map_update_elem(&flow_to_last_data_time_ns, &flow, &now_ns,
BPF_ANY)) {
bpf_printk("ERROR: 'tcp_rcv_established' error updating last data time");
bpf_printk(
"ERROR: 'tcp_rcv_established' error updating "
"flow_to_last_data_time_ns");
}
return 0;
}
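The keepalive check above only triggers when segments matching that pattern actually arrive on the monitored connection. As an illustration that is not part of this commit, one way to generate such probes while debugging is to enable TCP keepalives on the peer's socket; the timing values below are arbitrary.

// Illustration only: enable aggressive TCP keepalives on a connected socket so
// the kprobe's detector (payload <= 1 byte, seq == rcv_nxt - 1, no SYN/FIN/RST)
// has probes to observe. Not part of the ratemon runtime.
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

int enable_keepalive(int sockfd) {
  int on = 1;
  int idle_s = 5;      // start probing after 5 s of idleness
  int interval_s = 5;  // probe every 5 s
  int count = 3;       // give up after 3 unanswered probes
  if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)) ||
      setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPIDLE, &idle_s, sizeof(idle_s)) ||
      setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPINTVL, &interval_s,
                 sizeof(interval_s)) ||
      setsockopt(sockfd, IPPROTO_TCP, TCP_KEEPCNT, &count, sizeof(count)))
    return -1;  // any failing setsockopt() returns -1 with errno set
  return 0;
}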
9 changes: 9 additions & 0 deletions ratemon/runtime/c/ratemon_maps.h
@@ -41,4 +41,13 @@ struct {
__uint(pinning, LIBBPF_PIN_BY_NAME);
} flow_to_last_data_time_ns SEC(".maps");

// Flows in this map have received a recent keepalive and have not gone idle
// since, so they are considered to be active.
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, RM_MAX_FLOWS);
__type(key, struct rm_flow);
__type(value, int);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} flow_to_keepalive SEC(".maps");

#endif /* __RATEMON_MAPS_H */
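
A debugging sketch for the new map: walking the pinned flow_to_keepalive entries from user space with the standard libbpf iteration calls, assuming the rm_flow field names used in ratemon_kprobe.bpf.c. The same view is available from the shell via bpftool map dump pinned /sys/fs/bpf/flow_to_keepalive.

// Sketch only: dump every flow currently marked as having pending demand.
// Assumes RM_FLOW_TO_KEEPALIVE_PIN_PATH and struct rm_flow from ratemon.h.
#include <stdio.h>
#include <bpf/bpf.h>
#include "ratemon.h"

void dump_keepalive_map(void) {
  int fd = bpf_obj_get(RM_FLOW_TO_KEEPALIVE_PIN_PATH);
  if (fd < 0) {
    perror("bpf_obj_get");
    return;
  }
  struct rm_flow key, next_key;
  int value;
  // Passing NULL as the current key asks the kernel for the first key.
  int err = bpf_map_get_next_key(fd, NULL, &next_key);
  while (!err) {
    if (!bpf_map_lookup_elem(fd, &next_key, &value))
      printf("flow %u:%u -> %u:%u value=%d\n", next_key.local_addr,
             next_key.local_port, next_key.remote_addr, next_key.remote_port,
             value);
    key = next_key;
    err = bpf_map_get_next_key(fd, &key, &next_key);
  }
}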
