diff --git a/messaging/impl_msgq.cc b/messaging/impl_msgq.cc index 8f2c10a08..0b6165579 100644 --- a/messaging/impl_msgq.cc +++ b/messaging/impl_msgq.cc @@ -1,7 +1,6 @@ #include #include #include -#include #include #include @@ -9,13 +8,6 @@ #include "cereal/messaging/impl_msgq.h" -volatile sig_atomic_t msgq_do_exit = 0; - -void sig_handler(int signal) { - assert(signal == SIGINT || signal == SIGTERM); - msgq_do_exit = 1; -} - static bool service_exists(std::string path){ return services.count(path) > 0; } @@ -81,59 +73,27 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a Message * MSGQSubSocket::receive(bool non_blocking){ - msgq_do_exit = 0; - - void (*prev_handler_sigint)(int); - void (*prev_handler_sigterm)(int); - if (!non_blocking){ - prev_handler_sigint = std::signal(SIGINT, sig_handler); - prev_handler_sigterm = std::signal(SIGTERM, sig_handler); - } - msgq_msg_t msg; - - MSGQMessage *r = NULL; - int rc = msgq_msg_recv(&msg, q); - // Hack to implement blocking read with a poller. Don't use this - while (!non_blocking && rc == 0 && msgq_do_exit == 0){ - msgq_pollitem_t items[1]; - items[0].q = q; - - int t = (timeout != -1) ? timeout : 100; - - int n = msgq_poll(items, 1, t); - rc = msgq_msg_recv(&msg, q); - - // The poll indicated a message was ready, but the receive failed. Try again - if (n == 1 && rc == 0){ - continue; - } - - if (timeout != -1){ - break; + if (rc == 0 && !non_blocking) { + int ms = (timeout == -1) ? 100 : timeout; + struct timespec ts = {ms / 1000, (ms % 1000) * 1000 * 1000}; + while (rc == 0) { + int err = nanosleep(&ts, &ts); + rc = msgq_msg_recv(&msg, q); + if (err == 0) { + break; + } } } - - if (!non_blocking){ - std::signal(SIGINT, prev_handler_sigint); - std::signal(SIGTERM, prev_handler_sigterm); - } - - errno = msgq_do_exit ? EINTR : 0; - if (rc > 0){ - if (msgq_do_exit){ - msgq_msg_close(&msg); // Free unused message on exit - } else { - r = new MSGQMessage; - r->takeOwnership(msg.data, msg.size); - } + MSGQMessage *r = new MSGQMessage; + r->takeOwnership(msg.data, msg.size); + return r; } - - return (Message*)r; + return nullptr; } void MSGQSubSocket::setTimeout(int t){