Skip to content

Commit

Permalink
improve receive
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed May 27, 2024
1 parent 51cbf62 commit bfcf6d7
Showing 1 changed file with 13 additions and 53 deletions.
66 changes: 13 additions & 53 deletions messaging/impl_msgq.cc
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
#include <cassert>
#include <cstring>
#include <iostream>
#include <cstdlib>
#include <csignal>
#include <cerrno>

#include "cereal/services.h"
#include "cereal/messaging/impl_msgq.h"


volatile sig_atomic_t msgq_do_exit = 0;

void sig_handler(int signal) {
assert(signal == SIGINT || signal == SIGTERM);
msgq_do_exit = 1;
}

static bool service_exists(std::string path){
return services.count(path) > 0;
}
Expand Down Expand Up @@ -81,59 +73,27 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a


Message * MSGQSubSocket::receive(bool non_blocking){
msgq_do_exit = 0;

void (*prev_handler_sigint)(int);
void (*prev_handler_sigterm)(int);
if (!non_blocking){
prev_handler_sigint = std::signal(SIGINT, sig_handler);
prev_handler_sigterm = std::signal(SIGTERM, sig_handler);
}

msgq_msg_t msg;

MSGQMessage *r = NULL;

int rc = msgq_msg_recv(&msg, q);

// Hack to implement blocking read with a poller. Don't use this
while (!non_blocking && rc == 0 && msgq_do_exit == 0){
msgq_pollitem_t items[1];
items[0].q = q;

int t = (timeout != -1) ? timeout : 100;

int n = msgq_poll(items, 1, t);
rc = msgq_msg_recv(&msg, q);

// The poll indicated a message was ready, but the receive failed. Try again
if (n == 1 && rc == 0){
continue;
}

if (timeout != -1){
break;
if (rc == 0 && !non_blocking) {
int ms = (timeout == -1) ? 100 : timeout;
struct timespec ts = {ms / 1000, (ms % 1000) * 1000 * 1000};
while (rc == 0) {
int err = nanosleep(&ts, &ts);
rc = msgq_msg_recv(&msg, q);
if (err == 0) {
break;
}
}
}


if (!non_blocking){
std::signal(SIGINT, prev_handler_sigint);
std::signal(SIGTERM, prev_handler_sigterm);
}

errno = msgq_do_exit ? EINTR : 0;

if (rc > 0){
if (msgq_do_exit){
msgq_msg_close(&msg); // Free unused message on exit
} else {
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
MSGQMessage *r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
return r;
}

return (Message*)r;
return nullptr;
}

void MSGQSubSocket::setTimeout(int t){
Expand Down

0 comments on commit bfcf6d7

Please sign in to comment.