Skip to content

Commit

Permalink
dd
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed May 29, 2024
1 parent ec62744 commit 2d59387
Showing 1 changed file with 11 additions and 68 deletions.
79 changes: 11 additions & 68 deletions messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,6 @@
#include "cereal/services.h"
#include "cereal/messaging/impl_msgq.h"


volatile sig_atomic_t msgq_do_exit = 0;

void sig_handler(int signal) {
assert(signal == SIGINT || signal == SIGTERM);
msgq_do_exit = 1;
}

static bool service_exists(std::string path){
return services.count(path) > 0;
}


MSGQContext::MSGQContext() {
}

Expand Down Expand Up @@ -81,63 +68,28 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a

#include <unistd.h>
Message * MSGQSubSocket::receive(bool non_blocking){
// msgq_do_exit = 0;

// void (*prev_handler_sigint)(int);
// void (*prev_handler_sigterm)(int);
// if (!non_blocking){
// prev_handler_sigint = std::signal(SIGINT, sig_handler);
// prev_handler_sigterm = std::signal(SIGTERM, sig_handler);
// }
// Set up the signal set to block SIGINT
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
// sigaddset(&mask, SIGUSR2);
// sigaddset(&mask, SIGTERM);
sigaddset(&mask, SIGTERM);
sigprocmask(SIG_BLOCK, &mask, nullptr);
msgq_msg_t msg;

MSGQMessage *r = NULL;

int rc = msgq_msg_recv(&msg, q);

// Hack to implement blocking read with a poller. Don't use this
while (!non_blocking && rc == 0 && !msgq_do_exit){
msgq_pollitem_t items[1];
items[0].q = q;

while (!non_blocking) {
int t = (timeout != -1) ? timeout : 100;

// int n = msgq_poll(items, 1, t);
// struct timespec ts;
// ts.tv_sec = t / 1000;
// ts.tv_nsec = (t % 1000) * 1000 * 1000;
// int dd = nanosleep(&ts, &ts);
// printf("sleep %d %d\n", dd, errno);

// sigset_t pending;
// sigpending(&pending);
// if (sigismember(&pending, SIGINT)) {
// assert(0);
// }
// int signo = SIGINT;
struct timespec ts;
ts.tv_sec = t / 1000;
ts.tv_nsec = (t % 1000) * 1000 * 1000;
// nanosleep(&ts, &ts);

// struct timespec ts1 = {0, 0};
struct timespec ts;
ts.tv_sec = t / 1000;
ts.tv_nsec = (t % 1000) * 1000 * 1000;
siginfo_t siginfo;
int i = sigtimedwait(&mask, &siginfo, &ts);
if (i == SIGINT) {
msgq_do_exit = true;
// assert(0);
} else if (i == -1 && errno == EINTR) {
// assert(0);
if (i == SIGINT || i == SIGTERM) {
raise(i);
break;
}
printf("sigtimedwait %d\n", i);
rc = msgq_msg_recv(&msg, q);
rc = msgq_msg_recv(&msg, q);

// The poll indicated a message was ready, but the receive failed. Try again
if (rc == 0){
Expand All @@ -150,19 +102,10 @@ Message * MSGQSubSocket::receive(bool non_blocking){
}

sigprocmask(SIG_UNBLOCK, &mask, nullptr);
if (msgq_do_exit) {
// kill(getpid(), SIGINT);
raise(SIGINT);
}
if (rc > 0){
if (msgq_do_exit){
msgq_msg_close(&msg); // Free unused message on exit
} else {
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}

return (Message*)r;
}

Expand Down

0 comments on commit 2d59387

Please sign in to comment.