diff --git a/SConscript b/SConscript index 147eb304..85c5dbfc 100644 --- a/SConscript +++ b/SConscript @@ -12,6 +12,7 @@ msgq_objects = env.SharedObject([ 'msgq/impl_zmq.cc', 'msgq/impl_msgq.cc', 'msgq/impl_fake.cc', + 'msgq/futex.cc', 'msgq/msgq.cc', ]) msgq = env.Library('msgq', msgq_objects) diff --git a/msgq/futex.cc b/msgq/futex.cc new file mode 100644 index 00000000..035b678d --- /dev/null +++ b/msgq/futex.cc @@ -0,0 +1,62 @@ +#include "msgq/futex.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +Futex::Futex(const std::string &path) { + auto fd = open(path.c_str(), O_RDWR | O_CREAT, 0664); + if (fd < 0) { + throw std::runtime_error("Failed to open file: " + path); + } + + if (ftruncate(fd, sizeof(uint32_t)) < 0) { + close(fd); + throw std::runtime_error("Failed to truncate file: " + path); + } + + int *mem = (int *)mmap(NULL, sizeof(uint32_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + close(fd); + if (mem == MAP_FAILED) { + throw std::runtime_error("Failed to mmap file: " + path); + } + + futex = reinterpret_cast *>(mem); +} + +Futex::~Futex() { + munmap(futex, sizeof(uint32_t)); +} + +void Futex::broadcast() { + // Increment the futex value to signal waiting threads + futex->fetch_add(1, std::memory_order_relaxed); + + // Wake up all threads waiting on the futex + syscall(SYS_futex, futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); +} + +bool Futex::wait(uint32_t expected, int timeout_ms) { + if (futex->load(std::memory_order_relaxed) != expected) { + return true; // Already not equal, no need to wait + } + + if (timeout_ms <= 0) { + return false; // Timeout immediately + } + + // Perform the futex wait syscall + struct timespec ts; + ts.tv_sec = timeout_ms / 1000; + ts.tv_nsec = (timeout_ms % 1000) * 1000 * 1000; + syscall(SYS_futex, futex, FUTEX_WAIT, expected, &ts, nullptr, 0); + + return futex->load(std::memory_order_relaxed) != expected; +} diff --git a/msgq/futex.h b/msgq/futex.h new file mode 100644 index 00000000..c961e481 --- /dev/null +++ b/msgq/futex.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include +#include + + +class Futex { +public: + Futex(const std::string &path); + ~Futex(); + void broadcast(); + bool wait(uint32_t expected, int timeout_ms); + inline uint32_t value() const { return futex->load(); } + +private: + std::atomic *futex = nullptr; +}; diff --git a/msgq/msgq.cc b/msgq/msgq.cc index fed2959b..d5c6bee4 100644 --- a/msgq/msgq.cc +++ b/msgq/msgq.cc @@ -3,31 +3,21 @@ #include #include #include -#include #include #include #include -#include #include #include #include - -#include -#include #include -#include -#include #include #include #include -#include - +#include "msgq/futex.h" #include "msgq/msgq.h" -void sigusr2_handler(int signal) { - assert(signal == SIGUSR2); -} +Futex g_futex("/dev/shm/msgq_futex"); uint64_t msgq_get_uid(void){ std::random_device rd("/dev/urandom"); @@ -85,7 +75,6 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes - std::signal(SIGUSR2, sigusr2_handler); std::string full_path = "/dev/shm/"; const char* prefix = std::getenv("OPENPILOT_PREFIX"); @@ -142,7 +131,6 @@ void msgq_close_queue(msgq_queue_t *q){ } } - void msgq_init_publisher(msgq_queue_t * q) { //std::cout << "Starting publisher" << std::endl; uint64_t uid = msgq_get_uid(); @@ -158,15 +146,6 @@ void msgq_init_publisher(msgq_queue_t * q) { q->write_uid_local = uid; } -static void thread_signal(uint32_t tid) { - #ifndef SYS_tkill - // TODO: this won't work for multithreaded programs - kill(tid, SIGUSR2); - #else - syscall(SYS_tkill, tid, SIGUSR2); - #endif -} - void msgq_init_subscriber(msgq_queue_t * q) { assert(q != NULL); assert(q->num_readers != NULL); @@ -185,14 +164,11 @@ void msgq_init_subscriber(msgq_queue_t * q) { for (size_t i = 0; i < NUM_READERS; i++){ *q->read_valids[i] = false; - - uint64_t old_uid = *q->read_uids[i]; *q->read_uids[i] = 0; - - // Wake up reader in case they are in a poll - thread_signal(old_uid & 0xFFFFFFFF); } + // Notify readers + g_futex.broadcast(); continue; } @@ -293,10 +269,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ PACK64(*q->write_pointer, write_cycles, new_ptr); // Notify readers - for (uint64_t i = 0; i < num_readers; i++){ - uint64_t reader_uid = *q->read_uids[i]; - thread_signal(reader_uid & 0xFFFFFFFF); - } + g_futex.broadcast(); return msg->size; } @@ -414,42 +387,31 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){ goto start; } - return msg->size; } - - -int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ +int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout) { int num = 0; + int timeout_ms = (timeout == -1) ? 100 : timeout; + uint32_t current_futex_value = 0; - // Check if messages ready - for (size_t i = 0; i < nitems; i++) { - items[i].revents = msgq_msg_ready(items[i].q); - if (items[i].revents) num++; - } - - int ms = (timeout == -1) ? 100 : timeout; - struct timespec ts; - ts.tv_sec = ms / 1000; - ts.tv_nsec = (ms % 1000) * 1000 * 1000; - - + auto start_time = std::chrono::high_resolution_clock::now(); while (num == 0) { - int ret; - - ret = nanosleep(&ts, &ts); + if (g_futex.wait(current_futex_value, timeout_ms)) { + current_futex_value = g_futex.value(); - // Check if messages ready - for (size_t i = 0; i < nitems; i++) { - if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){ - num += 1; - items[i].revents = 1; + // Check if messages ready + for (size_t i = 0; i < nitems; i++) { + items[i].revents = msgq_msg_ready(items[i].q); + if (items[i].revents) ++num; } } - // exit if we had a timeout and the sleep finished - if (timeout != -1 && ret == 0){ + // Update the remaining timeout + auto current_time = std::chrono::high_resolution_clock::now(); + timeout_ms -= std::chrono::duration_cast(current_time - start_time).count(); + start_time = current_time; + if (timeout_ms <= 0) { break; } }