Skip to content

Commit

Permalink
use futex
Browse files Browse the repository at this point in the history
dd

dd

dd

dd

dd

dd

dd
  • Loading branch information
deanlee committed Jul 4, 2024
1 parent 74074d6 commit 4d275a3
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 41 deletions.
1 change: 1 addition & 0 deletions SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ msgq_objects = env.SharedObject([
'msgq/impl_zmq.cc',
'msgq/impl_msgq.cc',
'msgq/impl_fake.cc',
'msgq/futex.cc',
'msgq/msgq.cc',
])
msgq = env.Library('msgq', msgq_objects)
Expand Down
40 changes: 40 additions & 0 deletions msgq/futex.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#include "msgq/futex.h"

#include <cassert>
#include <syscall.h>
#include <fcntl.h>
// #include <stdlib.h>
#include <unistd.h>
#include <linux/futex.h>
#include <stdio.h>
#include <limits.h>
#include <sys/mman.h>

Futex::Futex(const std::string &path) {
auto fd = open(path.c_str(), O_RDWR | O_CREAT, 0664);
assert(fd >= 0);
int rc = ftruncate(fd, sizeof(int));
assert(rc >= 0);
int *mem = (int*)mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);
futex = (std::atomic<int>*)(mem);
}

Futex::~Futex() {
munmap(futex, sizeof(int));
}

void Futex::broadcast() {
*futex += 1;
syscall(SYS_futex, futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

int Futex::wait(int expected, struct timespec *timeout) {
if ((*futex) == expected) {
int ret = syscall(SYS_futex, futex, FUTEX_WAIT, expected, timeout, NULL, 0);
if (ret == -1 && errno == ETIMEDOUT) {
return -1; // Timeout occurred
}
}
return 0;
}
16 changes: 16 additions & 0 deletions msgq/futex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <atomic>
#include <string>

class Futex {
public:
Futex(const std::string &path);
~Futex();
void broadcast();
int wait(int expected, struct timespec *timeout);
inline int value() const { return futex->load(); }

private:
std::atomic<int> *futex = nullptr;
};
52 changes: 11 additions & 41 deletions msgq/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,20 @@
#include <cerrno>
#include <cmath>
#include <cstring>
#include <cstdint>
#include <chrono>
#include <algorithm>
#include <cstdlib>
#include <csignal>
#include <random>
#include <string>
#include <limits>

#include <poll.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <fcntl.h>
#include <unistd.h>

#include <stdio.h>

#include "msgq/msgq.h"
#include "msgq/futex.h"

void sigusr2_handler(int signal) {
assert(signal == SIGUSR2);
}
Futex g_futex("/dev/shm/msgq_futex");

uint64_t msgq_get_uid(void){
std::random_device rd("/dev/urandom");
Expand Down Expand Up @@ -85,7 +74,6 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
std::signal(SIGUSR2, sigusr2_handler);

std::string full_path = "/dev/shm/";
const char* prefix = std::getenv("OPENPILOT_PREFIX");
Expand Down Expand Up @@ -158,15 +146,6 @@ void msgq_init_publisher(msgq_queue_t * q) {
q->write_uid_local = uid;
}

static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR2);
#endif
}

void msgq_init_subscriber(msgq_queue_t * q) {
assert(q != NULL);
assert(q->num_readers != NULL);
Expand All @@ -185,14 +164,11 @@ void msgq_init_subscriber(msgq_queue_t * q) {

for (size_t i = 0; i < NUM_READERS; i++){
*q->read_valids[i] = false;

uint64_t old_uid = *q->read_uids[i];
*q->read_uids[i] = 0;

// Wake up reader in case they are in a poll
thread_signal(old_uid & 0xFFFFFFFF);
}

// Wake up reader
g_futex.broadcast();
continue;
}

Expand Down Expand Up @@ -293,10 +269,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
PACK64(*q->write_pointer, write_cycles, new_ptr);

// Notify readers
for (uint64_t i = 0; i < num_readers; i++){
uint64_t reader_uid = *q->read_uids[i];
thread_signal(reader_uid & 0xFFFFFFFF);
}
g_futex.broadcast();

return msg->size;
}
Expand Down Expand Up @@ -433,12 +406,14 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
struct timespec ts;
ts.tv_sec = ms / 1000;
ts.tv_nsec = (ms % 1000) * 1000 * 1000;


int last_value = 0;
while (num == 0) {
int ret;

ret = nanosleep(&ts, &ts);
int ret = g_futex.wait(last_value, &ts);
if (ret == -1) {
break;
}
last_value = g_futex.value();

// Check if messages ready
for (size_t i = 0; i < nitems; i++) {
Expand All @@ -447,11 +422,6 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
items[i].revents = 1;
}
}

// exit if we had a timeout and the sleep finished
if (timeout != -1 && ret == 0){
break;
}
}

return num;
Expand Down

0 comments on commit 4d275a3

Please sign in to comment.