From fe64f5929d0991a9d6c5d21fa021867cf368af6f Mon Sep 17 00:00:00 2001
From: Christopher Canel
Date: Sat, 1 Jun 2024 20:32:20 +0000
Subject: [PATCH] [WIP] Idle timeout, cleanup

---
 ratemon/runtime/c/libratemon_interp.cpp | 258 +++++++++++++-----------
 1 file changed, 137 insertions(+), 121 deletions(-)

diff --git a/ratemon/runtime/c/libratemon_interp.cpp b/ratemon/runtime/c/libratemon_interp.cpp
index 3900518..583d627 100644
--- a/ratemon/runtime/c/libratemon_interp.cpp
+++ b/ratemon/runtime/c/libratemon_interp.cpp
@@ -31,7 +31,7 @@
 
 #include "ratemon.h"
 
-// Protects writes only to max_active_flows, epoch_us, idle_timeout_us,
+// Protects writes only to max_active_flows, epoch_us, idle_timeout_ns,
 // num_to_schedule, monitor_port_start, monitor_port_end, flow_to_rwnd_fd,
 // flow_to_win_scale_fd, oldact, and setup. Reads are unprotected.
 std::mutex lock_setup;
@@ -66,7 +66,7 @@ std::unordered_map<int, struct rm_flow> fd_to_flow;
 // parameter documentation.
 unsigned int max_active_flows = 5;
 unsigned int epoch_us = 10000;
-unsigned int idle_timeout_us = 1000;
+unsigned int idle_timeout_ns = 1000;
 unsigned int num_to_schedule = 1;
 unsigned short monitor_port_start = 9000;
 unsigned short monitor_port_end = 9999;
@@ -87,123 +87,159 @@ inline void trigger_ack(int fd) {
                &placeholder_cc_info_length);
 }
 
-// Call this when a flow should be paused. If there are waiting flows and
-// available capacity, then one will be activated. Flows are paused and
-// activated in round-robin order. Each flow is allowed to be active for
-// at most epoch_us microseconds.
+// Jitter the provided value by +/- 12.5%. Returns just the jitter.
+inline int jitter(int v) {
+  return std::experimental::randint(0, (int)std::roundl(v * 0.25)) -
+         (int)std::roundl(v * 0.125);
+}
+
+inline void activate_flow(int fd) {
+  bpf_map_delete_elem(flow_to_rwnd_fd, &fd_to_flow[fd]);
+  trigger_ack(fd);
+  RM_PRINTF("INFO: activated FD=%d\n", fd);
+}
+
+inline void pause_flow(int fd) {
+  // Pausing a flow means setting its RWND to 0 B.
+  bpf_map_update_elem(flow_to_rwnd_fd, &fd_to_flow[fd], &zero, BPF_ANY);
+  trigger_ack(fd);
+  RM_PRINTF("INFO: paused flow FD=%d\n", fd);
+}
+
+// Call this to check if scheduling should take place, and if so, perform it.
+// If there are waiting flows and available capacity, then one will be
+// activated. Flows are paused and activated in round-robin order. Each flow is
+// allowed to be active for at most epoch_us microseconds. Flows that have been
+// idle for longer than idle_timeout_ns will be paused.
 void timer_callback(const boost::system::error_code &error) {
   RM_PRINTF("INFO: in timer_callback\n");
 
   // 0. Perform validity checks.
   // If an error (such as a cancellation) triggered this callback, then abort
-  // immediately.
+  // immediately. Do not set another timer.
   if (error) {
     RM_PRINTF("ERROR: timer_callback error: %s\n", error.message().c_str());
     return;
   }
-  // If the program has been signalled to stop, then exit.
+  // If the program has been signalled to stop, then exit. Do not set another
+  // timer.
   if (!run) {
-    RM_PRINTF("INFO: exiting\n");
+    RM_PRINTF("INFO: program signalled to exit\n");
     return;
   }
   // If setup has not been performed yet, then we cannot perform scheduling.
+  // Otherwise, revert to slow check mode.
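+  // ("Slow check mode" means re-arming the timer with a one-second period and
+  // re-checking on each tick, rather than scheduling a precise wakeup for the
+  // next epoch event.)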
   if (!setup_done) {
+    RM_PRINTF("INFO: not set up\n");
     if (timer.expires_from_now(one_sec)) {
-      RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+      RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
     }
     timer.async_wait(&timer_callback);
-    RM_PRINTF("INFO: not set up (flag)\n");
     return;
   }
-  // At this point, max_active_flows, epoch_us, num_to_schedule,
-  // flow_to_rwnd_fd, and flow_to_last_data_time_fd should be set. If not,
-  // revert to slow check mode.
-  if (max_active_flows == 0 || epoch_us == 0 || idle_timeout_us ||
-      num_to_schedule == 0 || flow_to_rwnd_fd == 0 ||
-      flow_to_last_data_time_fd == 0) {
+  // Check that relevant parameters have been set. Otherwise, revert to slow
+  // check mode.
+  if (max_active_flows == 0 || epoch_us == 0 || num_to_schedule == 0 ||
+      flow_to_rwnd_fd == 0 || flow_to_last_data_time_fd == 0) {
     RM_PRINTF(
         "ERROR: cannot continue, invalid max_active_flows=%u, epoch_us=%u, "
-        "idle_timeout_us=%u, num_to_schedule=%u, flow_to_rwnd_fd=%d, or "
+        "num_to_schedule=%u, flow_to_rwnd_fd=%d, or "
         "flow_to_last_data_time_fd=%d\n",
-        max_active_flows, epoch_us, idle_timeout_us, num_to_schedule,
-        flow_to_rwnd_fd, flow_to_last_data_time_fd);
+        max_active_flows, epoch_us, num_to_schedule, flow_to_rwnd_fd,
+        flow_to_last_data_time_fd);
     if (timer.expires_from_now(one_sec)) {
-      RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+      RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
     }
     timer.async_wait(&timer_callback);
-    RM_PRINTF("INFO: not set up (params)\n");
     return;
   }
 
-  // It is safe to perform scheduling.
+  // It is now safe to perform scheduling.
   lock_scheduler.lock();
   RM_PRINTF("INFO: performing scheduling\n");
 
   // 1. If there are no flows, then revert to slow check mode.
-  if (active_fds_queue.empty() && paused_fds_queue.empty()) {
-    if (timer.expires_from_now(one_sec)) {
-      RM_PRINTF("ERROR: cancelled timer when should not have!\n");
-    }
-    timer.async_wait(&timer_callback);
-    lock_scheduler.unlock();
-    RM_PRINTF("INFO: no flows\n");
-    return;
-  }
+  // This case is handled below, when the next timer duration is calculated.
+  // Make sure that all of the following code still works even if there are no
+  // flows.
 
   // 2. Remove closed flows from both queues.
-  // active_fds_queue
-  for (unsigned long i = 0; i < active_fds_queue.size(); ++i) {
+  // active_fds_queue:
+  unsigned long s = active_fds_queue.size();
+  for (unsigned long i = 0; i < s; ++i) {
     auto a = active_fds_queue.front();
     active_fds_queue.pop();
-    if (fd_to_flow.contains(a.first)) active_fds_queue.push(a);
+    if (fd_to_flow.contains(a.first)) {
+      active_fds_queue.push(a);
+    } else {
+      RM_PRINTF(
+          "INFO: removed FD=%d from active queue because it is not in "
+          "fd_to_flow\n",
+          a.first);
+    }
   }
-  // paused_fds_queue
-  for (unsigned long i = 0; i < paused_fds_queue.size(); ++i) {
+  // paused_fds_queue:
+  s = paused_fds_queue.size();
+  for (unsigned long i = 0; i < s; ++i) {
     auto p = paused_fds_queue.front();
     paused_fds_queue.pop();
-    if (fd_to_flow.contains(p)) paused_fds_queue.push(p);
+    if (fd_to_flow.contains(p)) {
+      paused_fds_queue.push(p);
+    } else {
+      RM_PRINTF(
+          "INFO: removed FD=%d from paused queue because it is not in "
+          "fd_to_flow\n",
+          p);
+    }
+  }
+
+  RM_PRINTF("INFO: active_fds_queue.size()=%lu, paused_fds_queue.size()=%lu\n",
+            active_fds_queue.size(), paused_fds_queue.size());
+  for (unsigned long i = 0; i < active_fds_queue.size(); ++i) {
+    RM_PRINTF("INFO: active FD=%d\n", active_fds_queue.front().first);
+    active_fds_queue.push(active_fds_queue.front());
+    active_fds_queue.pop();
   }
 
   // 3. Idle timeout. Look through all active flows and pause any that have been
-  // idle for longer than idle_timeout_us. Only do this if there are paused
+  // idle for longer than idle_timeout_ns. Only do this if there are paused
   // flows.
   // Get the time in a form that we can compare to bpf_ktime_get_ns().
   struct timespec ts;
   clock_gettime(CLOCK_MONOTONIC, &ts);
+  unsigned long last_data_time_ns, idle_ns;
   unsigned long ktime_now_ns = ts.tv_sec * 1000000000ull + ts.tv_nsec;
-  if (idle_timeout_us > 0 && !active_fds_queue.empty()) {
-    for (unsigned long i = 0; i < active_fds_queue.size(); ++i) {
+  if (idle_timeout_ns > 0 && !paused_fds_queue.empty()) {
+    s = active_fds_queue.size();
+    for (unsigned long i = 0; i < s; ++i) {
       auto a = active_fds_queue.front();
       active_fds_queue.pop();
       // Look up last active time.
-      unsigned long last_data_time_ns;
       if (bpf_map_lookup_elem(flow_to_last_data_time_fd, &fd_to_flow[a.first],
                               &last_data_time_ns)) {
         active_fds_queue.push(a);
         continue;
       }
       // If the flow has been active within the idle timeout, then keep it.
-      if (ktime_now_ns - last_data_time_ns < (long)idle_timeout_us * 1000) {
+      idle_ns = ktime_now_ns - last_data_time_ns;
+      RM_PRINTF("INFO: FD=%d has been idle for %lu ns\n", a.first, idle_ns);
+      // idle_timeout_ns is already in ns, so no conversion is needed here.
+      if (idle_ns < (unsigned long)idle_timeout_ns) {
         active_fds_queue.push(a);
         continue;
       }
       // Otherwise, pause the flow.
       paused_fds_queue.push(a.first);
-      bpf_map_update_elem(flow_to_rwnd_fd, &fd_to_flow[a.first], &zero,
-                          BPF_ANY);
-      trigger_ack(a.first);
-      RM_PRINTF("INFO: paused FD=%d due to idle timeout\n", a.first);
+      pause_flow(a.first);
     }
   }
 
-  // 3. Calculate how many flows to activate.
-  boost::posix_time::ptime now =
-      boost::posix_time::microsec_clock::local_time();
+  // 4. Calculate how many flows to activate.
   // Start with the number of free slots.
   unsigned long num_to_activate = max_active_flows - active_fds_queue.size();
   // If the next flow should be scheduled now, then activate at least
   // num_to_schedule flows.
+  boost::posix_time::ptime now =
+      boost::posix_time::microsec_clock::local_time();
-  if (now > active_fds_queue.front().second) {
+  if (!active_fds_queue.empty() && now > active_fds_queue.front().second) {
     num_to_activate =
         std::max(num_to_activate, (unsigned long)num_to_schedule);
   }
@@ -214,74 +250,68 @@ void timer_callback(const boost::system::error_code &error) {
   boost::posix_time::ptime now_plus_epoch =
       now + boost::posix_time::microseconds(epoch_us);
   for (unsigned long i = 0; i < num_to_activate; ++i) {
-    int to_activate = paused_fds_queue.front();
+    // Defensive: stop early if there are no paused flows left to activate.
+    if (paused_fds_queue.empty()) break;
+    auto p = paused_fds_queue.front();
     paused_fds_queue.pop();
     // Randomly jitter the epoch time by +/- 12.5%.
-    int rand =
-        std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
-        (int)std::roundl(epoch_us * 0.125);
-    active_fds_queue.push(
-        {to_activate, now_plus_epoch + boost::posix_time::microseconds(rand)});
-    bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[to_activate]));
-    trigger_ack(to_activate);
-    RM_PRINTF("INFO: activated FD=%d\n", to_activate);
+    active_fds_queue.push({p, now_plus_epoch + boost::posix_time::microseconds(
+                                  jitter(epoch_us))});
+    activate_flow(p);
   }
 
-  // 4. Pause excessive flows.
+  // 5. Pause excessive flows.
   while (active_fds_queue.size() > max_active_flows) {
-    int to_pause = active_fds_queue.front().first;
+    auto a = active_fds_queue.front().first;
    active_fds_queue.pop();
     if (active_fds_queue.size() < max_active_flows &&
         paused_fds_queue.empty()) {
       // If there are fewer than the limit flows active and there are no
       // waiting flows, then schedule this flow again. Randomly jitter the epoch
       // time by +/- 12.5%.
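+      // (As with activation above, the jitter staggers epoch boundaries so
+      // that flows do not all come due at the same instant.)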
-      int rand =
-          std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
-          (int)std::roundl(epoch_us * 0.125);
-      boost::posix_time::ptime now_plus_epoch_plus_rand =
-          now_plus_epoch + boost::posix_time::microseconds(rand);
-      active_fds_queue.push({to_pause, now_plus_epoch_plus_rand});
-      RM_PRINTF("INFO: reactivated FD=%d\n", to_pause);
+      active_fds_queue.push(
+          {a,
+           now_plus_epoch + boost::posix_time::microseconds(jitter(epoch_us))});
+      RM_PRINTF("INFO: reactivated FD=%d\n", a);
     } else {
       // Pause this flow.
-      paused_fds_queue.push(to_pause);
-      bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[to_pause]), &zero,
-                          BPF_ANY);
-      trigger_ack(to_pause);
-      RM_PRINTF("INFO: paused FD=%d\n", to_pause);
+      paused_fds_queue.push(a);
+      pause_flow(a);
     }
   }
 
-  // 5. Check invariants.
+  // 6. Check invariants.
+#ifdef RM_VERBOSE
   // Cannot have more than the max number of active flows.
   assert(active_fds_queue.size() <= max_active_flows);
   // If there are no active flows, then there should also be no paused flows.
   assert(!active_fds_queue.empty() || paused_fds_queue.empty());
+#endif
 
-  // 6. Calculate when the next timer should expire.
+  // 7. Calculate when the next timer should expire.
   boost::posix_time::time_duration when;
   if (active_fds_queue.empty()) {
     // If there are no flows, revert to slow check mode.
-    RM_PRINTF("INFO: no flows remaining\n");
     when = one_sec;
-  } else if (idle_timeout_us == 0) {
+    RM_PRINTF("INFO: no flows remaining, reverting to slow check mode\n");
+  } else if (idle_timeout_ns == 0) {
+    // If we are not using idle timeout mode...
+    when = active_fds_queue.front().second - now;
+    RM_PRINTF("INFO: scheduling time for next epoch end\n");
+  } else {
     // If we are using idle timeout mode...
     when = boost::posix_time::microsec(
-        std::min((long)idle_timeout_us,
+        // Convert idle_timeout_ns to us to match total_microseconds().
+        std::min((long)(idle_timeout_ns / 1000),
                  (active_fds_queue.front().second - now).total_microseconds()));
-  } else {
-    // If we are not using idle timeout mode...
-    when = active_fds_queue.front().second - now;
+    RM_PRINTF("INFO: scheduling time for next epoch end or idle timeout\n");
   }
 
-  // 7. Start the next timer.
-  if (timer.expires_from_now(one_sec)) {
-    RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+  // 8. Start the next timer.
+  if (timer.expires_from_now(when)) {
+    RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
   }
   timer.async_wait(&timer_callback);
   lock_scheduler.unlock();
-  RM_PRINTF("INFO: sleeping until next event\n");
+  RM_PRINTF("INFO: sleeping until next event in %ld us\n",
+            when.total_microseconds());
   return;
 }
 
@@ -290,7 +320,7 @@ void thread_func() {
   // managing the async timers that perform scheduling.
   RM_PRINTF("INFO: scheduler thread started\n");
   if (timer.expires_from_now(one_sec)) {
-    RM_PRINTF("ERROR: cancelled timer when should not have!\n");
+    RM_PRINTF("ERROR: timer unexpectedly cancelled\n");
   }
   timer.async_wait(&timer_callback);
 
@@ -302,10 +332,8 @@ void thread_func() {
   lock_scheduler.lock();
   for (const auto &p : fd_to_flow) {
     if (flow_to_rwnd_fd != 0) bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
-
     if (flow_to_win_scale_fd != 0)
       bpf_map_delete_elem(flow_to_win_scale_fd, &p.second);
-
     if (flow_to_last_data_time_fd != 0)
       bpf_map_delete_elem(flow_to_last_data_time_fd, &p.second);
   }
@@ -315,7 +343,7 @@ void thread_func() {
   if (run) {
     RM_PRINTF(
-        "ERROR: scheduled thread ended before program was signalled to "
-        "stop!\n");
+        "ERROR: scheduler thread ended before program was signalled to "
+        "stop\n");
   }
 }
 
@@ -365,9 +393,10 @@ bool setup() {
   // Read environment variables with parameters.
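+  // Note that the idle timeout is still configured in microseconds (via
+  // RM_IDLE_TIMEOUT_US_KEY); it is converted to nanoseconds below.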
   if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) return false;
   if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) return false;
-  if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_us,
+  if (!read_env_uint(RM_IDLE_TIMEOUT_US_KEY, &idle_timeout_ns,
                      true /* allow_zero */))
     return false;
+  idle_timeout_ns *= 1000;  // Convert from us to ns.
   // Cannot schedule more than max_active_flows at once.
   if (!read_env_uint(RM_NUM_TO_SCHEDULE_KEY, &num_to_schedule) ||
       num_to_schedule > max_active_flows)
     return false;
@@ -392,7 +421,6 @@ bool setup() {
     return false;
   }
   flow_to_rwnd_fd = err;
-  RM_PRINTF("INFO: successfully looked up 'flow_to_rwnd' FD\n");
 
   // Look up the FD for the flow_to_win_scale map. We do not need the BPF
   // skeleton for this.
@@ -404,7 +432,6 @@ bool setup() {
     return false;
   }
   flow_to_win_scale_fd = err;
-  RM_PRINTF("INFO: successfully looked up 'flow_to_win_scale' FD\n");
 
   // Look up the FD for the flow_to_last_data_time_ns map. We do not need the
   // BPF skeleton for this.
@@ -417,7 +444,6 @@ bool setup() {
     return false;
   }
   flow_to_last_data_time_fd = err;
-  RM_PRINTF("INFO: successfully looked up 'flow_to_last_data_time_ns' FD\n");
 
   // Catch SIGINT to end the program.
   struct sigaction action;
@@ -431,13 +457,14 @@ bool setup() {
 
   RM_PRINTF(
       "INFO: setup complete! max_active_flows=%u, epoch_us=%u, "
-      "idle_timeout_us=%u, "
+      "idle_timeout_ns=%u, "
       "num_to_schedule=%u, monitor_port_start=%u, monitor_port_end=%u\n",
-      max_active_flows, epoch_us, idle_timeout_us, num_to_schedule,
+      max_active_flows, epoch_us, idle_timeout_ns, num_to_schedule,
       monitor_port_start, monitor_port_end);
   return true;
 }
 
+// Fill in the four-tuple for this socket.
 bool get_flow(int fd, struct rm_flow *flow) {
   // Determine the four-tuple, which we need to track because RWND tuning is
   // applied based on four-tuple.
@@ -464,8 +491,8 @@ bool get_flow(int fd, struct rm_flow *flow) {
   return true;
 }
 
+// Set the CCA for this socket and make sure it was set correctly.
 bool set_cca(int fd, const char *cca) {
-  // Set the CCA and make sure it was set correctly.
   if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, cca, strlen(cca)) == -1) {
     RM_PRINTF("ERROR: failed to 'setsockopt' TCP_CONGESTION\n");
     return false;
@@ -486,32 +513,27 @@ bool set_cca(int fd, const char *cca) {
 }
 
 // Perform initial scheduling for this flow.
-void initial_scheduling(int fd, struct rm_flow *flow) {
+void initial_scheduling(int fd) {
   // Should this flow be active or paused?
   if (active_fds_queue.size() < max_active_flows) {
     // Less than the max number of flows are active, so make this one active.
     boost::posix_time::ptime now =
         boost::posix_time::microsec_clock::local_time();
-    int rand =
-        std::experimental::randint(0, (int)std::roundl(epoch_us * 0.25)) -
-        (int)std::roundl(epoch_us * 0.125);
-    active_fds_queue.push({fd, now + boost::posix_time::microseconds(epoch_us) +
-                               boost::posix_time::microseconds(rand)});
+    active_fds_queue.push(
+        {fd, now + boost::posix_time::microseconds(epoch_us) +
+                 boost::posix_time::microseconds(jitter(epoch_us))});
     RM_PRINTF("INFO: allowing new flow FD=%d\n", fd);
     if (active_fds_queue.size() == 1) {
       if (timer.expires_from_now(active_fds_queue.front().second - now) != 1) {
-        RM_PRINTF("ERROR: should have cancelled 1 timer\n");
+        RM_PRINTF("ERROR: should have cancelled 1 timer\n");
      }
       timer.async_wait(&timer_callback);
       RM_PRINTF("INFO: first scheduling event\n");
     }
   } else {
     // The max number of flows are active already, so pause this one.
     paused_fds_queue.push(fd);
-    // Pausing a flow means retting its RWND to 0 B.
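+    // pause_flow() zeroes this flow's RWND in the BPF map and triggers an ACK
+    // to advertise the new window to the sender.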
-    bpf_map_update_elem(flow_to_rwnd_fd, flow, &zero, BPF_ANY);
-    RM_PRINTF("INFO: paused new flow FD=%d\n", fd);
+    pause_flow(fd);
   }
 }
 
@@ -533,9 +555,8 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
     RM_PRINTF("WARNING: got 'accept' for non-AF_INET sa_family=%u\n",
               addr->sa_family);
     if (addr->sa_family == AF_INET6) {
-      RM_PRINTF("WARNING: (continued) got 'accept' for AF_INET6!\n");
+      RM_PRINTF("WARNING: (continued) got 'accept' for AF_INET6\n");
     }
-
     return fd;
   }
 
@@ -570,7 +591,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
   if (!set_cca(fd, RM_BPF_CUBIC)) return fd;
   // Initial scheduling for this flow.
   lock_scheduler.lock();
-  initial_scheduling(fd, &flow);
+  initial_scheduling(fd);
   lock_scheduler.unlock();
   RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, fd);
   return fd;
@@ -585,35 +606,30 @@ int close(int sockfd) {
     return -1;
   }
   int ret = real_close(sockfd);
+  if (ret == -1) {
+    RM_PRINTF("ERROR: real 'close' failed\n");
+  } else {
+    RM_PRINTF("INFO: successful 'close' for FD=%d\n", sockfd);
+  }
 
   // Remove this FD from all data structures.
   lock_scheduler.lock();
   if (fd_to_flow.contains(sockfd)) {
     // Obviously, do this before removing the FD from fd_to_flow.
     if (flow_to_rwnd_fd != 0)
-      bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[sockfd]));
-
+      bpf_map_delete_elem(flow_to_rwnd_fd, &fd_to_flow[sockfd]);
     if (flow_to_win_scale_fd != 0)
-      bpf_map_delete_elem(flow_to_win_scale_fd, &(fd_to_flow[sockfd]));
-
+      bpf_map_delete_elem(flow_to_win_scale_fd, &fd_to_flow[sockfd]);
     if (flow_to_last_data_time_fd != 0)
-      bpf_map_delete_elem(flow_to_last_data_time_fd, &(fd_to_flow[sockfd]));
-
+      bpf_map_delete_elem(flow_to_last_data_time_fd, &fd_to_flow[sockfd]);
     // Removing the FD from fd_to_flow triggers it to be (eventually) removed
     // from scheduling.
-    fd_to_flow.erase(sockfd);
-    RM_PRINTF("INFO: removed FD=%d\n", sockfd);
+    auto d = fd_to_flow.erase(sockfd);
+    RM_PRINTF("INFO: removed FD=%d (%zu elements removed)\n", sockfd, d);
   } else {
     RM_PRINTF("INFO: ignoring 'close' for FD=%d, not in fd_to_flow\n", sockfd);
   }
-
   lock_scheduler.unlock();
-
-  if (ret == -1) {
-    RM_PRINTF("ERROR: real 'close' failed\n");
-    return ret;
-  }
-  RM_PRINTF("INFO: successful 'close' for FD=%d\n", sockfd);
   return ret;
 }
 }