Skip to content

Commit

Permalink
Clean up libratemon_interp, remove unneeded ratemon_maps.bpf.c
Browse files Browse the repository at this point in the history
  • Loading branch information
ccanel committed May 1, 2024
1 parent 50eeedb commit c711254
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 77 deletions.
2 changes: 1 addition & 1 deletion ratemon/runtime/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ $(APPS): %: $(OUTPUT)/%.o $(LIBBPF_OBJ) | $(OUTPUT)
$(call msg,BINARY,$@)
$(CC) $(CFLAGS) $^ $(ALL_LDFLAGS) -lelf -lz -o $@

$(OUTPUT)/libratemon_interp.so: libratemon_interp.cpp $(OUTPUT)/ratemon_maps.skel.h ratemon.h | $(OUTPUT)
$(OUTPUT)/libratemon_interp.so: libratemon_interp.cpp ratemon.h | $(OUTPUT)
$(CXX) $(CXXFLAGS) -shared -fPIC -I$(OUTPUT) -I$(BOOST_INCLUDE) $< -ldl -L${BOOST_LIB} -lboost_thread -lbpf -o $@

$(INTERPS): %: $(OUTPUT)/%.so ;
Expand Down
133 changes: 59 additions & 74 deletions ratemon/runtime/libratemon_interp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,36 @@
#include <unistd.h>

#include <boost/thread.hpp>
#include <boost/unordered/concurrent_flat_map.hpp>
#include <cerrno>
#include <climits>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <utility>
#include <vector>

#include "ratemon.h"
#include "ratemon_maps.skel.h"

volatile unsigned int max_active_flows = 0;
volatile unsigned int epoch_us = 0;
volatile bool setup = false;
volatile bool skipped_first = false;

// Protects active_fds_queue and paused_fds_queue.
std::mutex lock;
std::queue<int> active_fds_queue;
std::queue<int> paused_fds_queue;
// Maps file descriptor to rm_flow struct.
std::unordered_map<int, struct rm_flow> fd_to_flow;

// BPF things.
struct ratemon_maps_bpf *skel = NULL;
// struct bpf_map *flow_to_rwnd = NULL;
// FD for the flow_to_rwnd map.
int flow_to_rwnd_fd = 0;

// Maps file descriptor to flow struct.
boost::unordered::concurrent_flat_map<int, struct rm_flow> fd_to_flow;

// Used to set entries in flow_to_rwnd.
int zero = 0;

// As an optimization, reuse the same tcp_cc_info struct and size.
union tcp_cc_info placeholder_cc_info;
socklen_t placeholder_cc_info_length = (socklen_t)sizeof(placeholder_cc_info);

// Look up the environment variable for max active flows.
unsigned int max_active_flows =
(unsigned int)atoi(getenv(RM_MAX_ACTIVE_FLOWS_KEY));

// Look up the environment variable for scheduling epoch.
unsigned int epoch_us = (unsigned int)atoi(getenv(RM_EPOCH_US_KEY));

inline void trigger_ack(int fd) {
// Do not store the output to check for errors since there is nothing we can
// do.
Expand All @@ -68,27 +59,18 @@ void thread_func() {
// Preallocate sufficient space.
new_active_fds.reserve(max_active_flows);

if (max_active_flows == 0 || epoch_us == 0) {
RM_PRINTF("ERROR when querying environment variables '%s' or '%s'\n",
RM_MAX_ACTIVE_FLOWS_KEY, RM_EPOCH_US_KEY);
return;
}

RM_PRINTF(
"libratemon_interp scheduling thread started, max flows=%u, epoch=%u "
"us\n",
max_active_flows, epoch_us);
RM_PRINTF("INFO: libratemon_interp scheduling thread started\n");

while (true) {
usleep(epoch_us);
// RM_PRINTF("Time to schedule\n");

// If setup has not been performed yet, then we cannot perform scheduling.
if (!setup) {
// RM_PRINTF("WARNING setup not completed, skipping scheduling\n");
usleep(10000);
continue;
}

usleep(epoch_us);

// If fewer than the max number of flows exist and they are all active, then
// there is no need for scheduling.
if (active_fds_queue.size() < max_active_flows &&
Expand All @@ -106,7 +88,7 @@ void thread_func() {
new_active_fds.size() < max_active_flows) {
// If we still know about this flow, then we can activate it.
int next_fd = paused_fds_queue.front();
if (fd_to_flow.visit(next_fd, [](const auto &) {})) {
if (fd_to_flow.contains(next_fd)) {
paused_fds_queue.pop();
new_active_fds.push_back(next_fd);
}
Expand All @@ -120,9 +102,7 @@ void thread_func() {
for (const auto &fd : new_active_fds) {
RM_PRINTF("%d ", fd);
active_fds_queue.push(fd);
fd_to_flow.visit(fd, [](const auto &p) {
bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
});
bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[fd]));
trigger_ack(fd);
}
RM_PRINTF("\n");
Expand All @@ -136,9 +116,7 @@ void thread_func() {
active_fds_queue.pop();
RM_PRINTF("%d ", fd);
paused_fds_queue.push(fd);
fd_to_flow.visit(fd, [](const auto &p) {
bpf_map_update_elem(flow_to_rwnd_fd, &p.second, &zero, BPF_ANY);
});
bpf_map_update_elem(flow_to_rwnd_fd, &(fd_to_flow[fd]), &zero, BPF_ANY);
// TODO: Do we need to send an ACK to immediately pause the flow?
trigger_ack(fd);
}
Expand All @@ -153,82 +131,91 @@ void thread_func() {

boost::thread t(thread_func);

bool read_env_uint(const char *key, volatile unsigned int *dest) {
char *val_str = getenv(key);
if (val_str == NULL) {
RM_PRINTF("ERROR: failed to query environment variable '%s'\n", key);
return false;
}
int val_int = atoi(val_str);
if (val_int <= 0) {
RM_PRINTF("ERROR: invalid value for '%s'=%d (must be > 0)\n", key, val_int);
return false;
}
*dest = (unsigned int)val_int;
return true;
}

// For some reason, C++ function name mangling does not prevent us from
// overriding accept(), so we do not need 'extern "C"'.
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
if (max_active_flows == 0 || epoch_us == 0) {
RM_PRINTF("ERROR when querying environment variables '%s' or '%s'\n",
RM_MAX_ACTIVE_FLOWS_KEY, RM_EPOCH_US_KEY);
return -1;
}

static int (*real_accept)(int, struct sockaddr *, socklen_t *) =
(int (*)(int, struct sockaddr *, socklen_t *))dlsym(RTLD_NEXT, "accept");
if (real_accept == NULL) {
RM_PRINTF("ERROR when querying dlsym for 'accept': %s\n", dlerror());
RM_PRINTF("ERROR: failed to query dlsym for 'accept': %s\n", dlerror());
return -1;
}
int new_fd = real_accept(sockfd, addr, addrlen);
if (new_fd == -1) {
RM_PRINTF("ERROR in real 'accept'\n");
RM_PRINTF("ERROR: real 'accept' failed\n");
return new_fd;
}
if (addr != NULL && addr->sa_family != AF_INET) {
RM_PRINTF("WARNING got 'accept' for non-AF_INET: sa_family=%u\n",
RM_PRINTF("WARNING: got 'accept' for non-AF_INET sa_family=%u\n",
addr->sa_family);
if (addr->sa_family == AF_INET6) {
RM_PRINTF("WARNING (continued) got 'accept' for AF_INET6!\n");
RM_PRINTF("WARNING: (continued) got 'accept' for AF_INET6!\n");
}
return new_fd;
}

// Perform BPF setup (only once for all flows in this process).
if (!setup) {
skel = ratemon_maps_bpf__open_and_load();
if (skel == NULL) {
RM_PRINTF("ERROR: failed to open/load 'ratemon_maps' BPF skeleton\n");
if (!read_env_uint(RM_MAX_ACTIVE_FLOWS_KEY, &max_active_flows)) {
return new_fd;
}
if (!read_env_uint(RM_EPOCH_US_KEY, &epoch_us)) {
return new_fd;
}

int pinned_map_fd = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH);

// int err = bpf_map__reuse_fd(skel->maps.flow_to_rwnd, pinned_map_fd);
// if (err) {
// RM_PRINTF("ERROR when reusing map FD\n");
// return new_fd;
// }

// flow_to_rwnd = skel->maps.flow_to_rwnd;
flow_to_rwnd_fd = pinned_map_fd;
RM_PRINTF("Successfully reused map FD\n");
int err = bpf_obj_get(RM_FLOW_TO_RWND_PIN_PATH);
if (err == -1) {
RM_PRINTF("ERROR: failed to get FD for 'flow_to_rwnd'\n");
return new_fd;
}
flow_to_rwnd_fd = err;
RM_PRINTF("INFO: successfully looked up 'flow_to_rwnd' FD\n");
setup = true;

RM_PRINTF("INFO: max_active_flows=%u epoch_us=%u\n", max_active_flows,
epoch_us);
}

// Hack for iperf3. The first flow is a control flow that should not be
// scheduled. Note that for this hack to work, libratemon_interp must be
// restarted between tests.
if (fd_to_flow.size() == 0 && !skipped_first) {
RM_PRINTF("WARNING skipping first flow\n");
RM_PRINTF("WARNING: skipping first flow\n");
skipped_first = true;
return new_fd;
}

// Set the CCA and make sure it was set correctly.
if (setsockopt(new_fd, SOL_TCP, TCP_CONGESTION, RM_BPF_CUBIC,
strlen(RM_BPF_CUBIC)) == -1) {
RM_PRINTF("ERROR in 'setsockopt' TCP_CONGESTION\n");
RM_PRINTF("ERROR: failed to 'setsockopt' TCP_CONGESTION\n");
return new_fd;
}
char retrieved_cca[32];
socklen_t retrieved_cca_len = sizeof(retrieved_cca);
if (getsockopt(new_fd, SOL_TCP, TCP_CONGESTION, retrieved_cca,
&retrieved_cca_len) == -1) {
RM_PRINTF("ERROR in 'getsockopt' TCP_CONGESTION\n");
RM_PRINTF("ERROR: failed to 'getsockopt' TCP_CONGESTION\n");
return new_fd;
}
if (strcmp(retrieved_cca, RM_BPF_CUBIC)) {
RM_PRINTF("ERROR when setting CCA to %s! Actual CCA is: %s\n", RM_BPF_CUBIC,
retrieved_cca);
RM_PRINTF("ERROR: failed to set CCA to %s! Actual CCA is: %s\n",
RM_BPF_CUBIC, retrieved_cca);
return new_fd;
}

Expand All @@ -239,15 +226,15 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
// Get the local IP and port.
if (getsockname(new_fd, (struct sockaddr *)&local_addr, &local_addr_len) ==
-1) {
RM_PRINTF("ERROR when calling 'getsockname'\n");
RM_PRINTF("ERROR: failed to call 'getsockname'\n");
return -1;
}
struct sockaddr_in remote_addr;
socklen_t remote_addr_len = sizeof(remote_addr);
// Get the peer's (i.e., the remote) IP and port.
if (getpeername(new_fd, (struct sockaddr *)&remote_addr, &remote_addr_len) ==
-1) {
RM_PRINTF("ERROR when calling 'getpeername'\n");
RM_PRINTF("ERROR: failed to call 'getpeername'\n");
return -1;
}
// Fill in the four-tuple.
Expand All @@ -257,7 +244,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
.remote_port = ntohs(remote_addr.sin_port)};
// RM_PRINTF("flow: %u:%u->%u:%u\n", flow.remote_addr, flow.remote_port,
// flow.local_addr, flow.local_port);
fd_to_flow.insert({new_fd, flow});
fd_to_flow[new_fd] = flow;

lock.lock();

Expand All @@ -274,7 +261,7 @@ int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {

lock.unlock();

RM_PRINTF("Successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd);
RM_PRINTF("INFO: successful 'accept' for FD=%d, got FD=%d\n", sockfd, new_fd);
return new_fd;
}

Expand All @@ -283,26 +270,24 @@ extern "C" {
int close(int sockfd) {
static int (*real_close)(int) = (int (*)(int))dlsym(RTLD_NEXT, "close");
if (real_close == NULL) {
RM_PRINTF("ERROR when querying dlsym for 'close': %s\n", dlerror());
RM_PRINTF("ERROR: failed to query dlsym for 'close': %s\n", dlerror());
return -1;
}
int ret = real_close(sockfd);
if (ret == -1) {
RM_PRINTF("ERROR in real 'close'\n");
RM_PRINTF("ERROR: real 'close' failed\n");
return ret;
}

// To get the flow struct for this FD, we must use visit() to look it up
// in the concurrent_flat_map. Obviously, do this before removing the FD
// from fd_to_flow.
fd_to_flow.visit(sockfd, [](const auto &p) {
bpf_map_delete_elem(flow_to_rwnd_fd, &p.second);
});
bpf_map_delete_elem(flow_to_rwnd_fd, &(fd_to_flow[sockfd]));
// Removing the FD from fd_to_flow triggers it to be (eventually) removed from
// scheduling.
fd_to_flow.erase(sockfd);

RM_PRINTF("Successful 'close' for FD=%d\n", sockfd);
RM_PRINTF("INFO: successful 'close' for FD=%d\n", sockfd);
return ret;
}
}
2 changes: 0 additions & 2 deletions ratemon/runtime/ratemon_maps.bpf.c

This file was deleted.

0 comments on commit c711254

Please sign in to comment.