Skip to content

Commit

Permalink
dd
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed Jul 2, 2024
1 parent 74074d6 commit 254e740
Showing 1 changed file with 115 additions and 22 deletions.
137 changes: 115 additions & 22 deletions msgq/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
#include <chrono>
#include <algorithm>
#include <cstdlib>
#include <csignal>
#include <random>
#include <string>
#include <limits>

// #include <stdatomic.h>
#include <poll.h>
#include <linux/futex.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
Expand All @@ -25,9 +25,112 @@

#include "msgq/msgq.h"

void sigusr2_handler(int signal) {
assert(signal == SIGUSR2);



#include <linux/futex.h>
#include <syscall.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <errno.h>
#include <sys/wait.h>
#include <time.h>
int futex_wait(int *futexp, int val, int timeout_ms) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += (timeout_ms / 1000);
ts.tv_nsec += (timeout_ms % 1000) * 1000000;
if (ts.tv_nsec >= 1000000000) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000;
}
return syscall(SYS_futex, futexp, FUTEX_WAIT, val, &ts, NULL, 0);
}

int futex_wake(int *futexp, int val) {
return syscall(SYS_futex, futexp, FUTEX_WAKE, val, NULL, NULL, 0);
}

struct Futex {
Futex() {
std::string full_path = "/dev/shm/openpilot_futex";
auto fd = open(full_path.c_str(), O_RDWR | O_CREAT, 0664);
if (fd < 0) {
std::cout << "Warning, could not open: " << full_path << std::endl;
assert(0);
}

int rc = ftruncate(fd, sizeof(int));
if (rc < 0){
close(fd);
assert(0);
}
futex = (int*)mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
close(fd);

if (futex == NULL){
assert(0);
}
}

void post() {
printf("begin post\n");
// while (1) {
__atomic_exchange_n(futex, 1, __ATOMIC_ACQUIRE);
int futex_rc = futex_wake(futex, 1);
if (futex_rc == -1) {
perror("futex wake");
exit(1);
}
// }
printf("posted %d\n", futex_rc);
}

void wait(int timeout_ms) {
while (1) {
printf("begin wait %d\n", timeout_ms);
int futex_rc = futex_wait(futex, 1, timeout_ms);
printf("wait fu %d\n", futex_rc);
if (futex_rc == -1) {
if (errno != EAGAIN) {
perror("futex");
exit(1);
}
} else if (futex_rc == 0) {
if (*futex == 1) {
// This is a real wakeup.
return;
}
} else {
abort();
}
}
}

// int acquire(int timeout_ms) {
// // Acquire the futex
// while (__atomic_exchange_n(futex, 1, __ATOMIC_ACQUIRE) != 0) {
// if (futex_wait(futex, 1, timeout_ms) == -1 && errno == ETIMEDOUT) {
// printf("timeout\n");
// return -1;
// }
// }
// return 0;
// }

~Futex() {
munmap(futex, sizeof(int));
}
int *futex = nullptr;
};

Futex g_futex;

uint64_t msgq_get_uid(void){
std::random_device rd("/dev/urandom");
Expand Down Expand Up @@ -85,7 +188,6 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
std::signal(SIGUSR2, sigusr2_handler);

std::string full_path = "/dev/shm/";
const char* prefix = std::getenv("OPENPILOT_PREFIX");
Expand Down Expand Up @@ -159,12 +261,7 @@ void msgq_init_publisher(msgq_queue_t * q) {
}

static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR2);
#endif
g_futex.post();
}

void msgq_init_subscriber(msgq_queue_t * q) {
Expand Down Expand Up @@ -430,28 +527,24 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
}

int ms = (timeout == -1) ? 100 : timeout;
struct timespec ts;
ts.tv_sec = ms / 1000;
ts.tv_nsec = (ms % 1000) * 1000 * 1000;


while (num == 0) {
int ret;

ret = nanosleep(&ts, &ts);
// int ret;

g_futex.wait(ms);
printf("has lock\n");
// Check if messages ready
for (size_t i = 0; i < nitems; i++) {
if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){
num += 1;
items[i].revents = 1;
}
}
// g_futex.post();

// exit if we had a timeout and the sleep finished
if (timeout != -1 && ret == 0){
break;
}
// if (timeout != -1 && ret == 0){
// break;
// }
}

return num;
Expand Down

0 comments on commit 254e740

Please sign in to comment.