diff --git a/SConscript b/SConscript index 4e1ed6ae2..726ac018a 100644 --- a/SConscript +++ b/SConscript @@ -37,10 +37,10 @@ messaging_objects = env.SharedObject([ messaging = env.Library('messaging', messaging_objects) Depends('messaging/impl_zmq.cc', services_h) -env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging, 'zmq', 'rt', 'pthread', common]) +env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging, 'zmq', 'pthread', common]) Depends('messaging/bridge.cc', services_h) -messaging_python = envCython.Program('messaging/messaging_pyx.so', 'messaging/messaging_pyx.pyx', LIBS=envCython["LIBS"]+[messaging, "zmq", common, 'rt']) +messaging_python = envCython.Program('messaging/messaging_pyx.so', 'messaging/messaging_pyx.pyx', LIBS=envCython["LIBS"]+[messaging, "zmq", common]) # Build Vision IPC vipc_sources = [ diff --git a/messaging/msgq.cc b/messaging/msgq.cc index 06afeed4f..229af4d4e 100644 --- a/messaging/msgq.cc +++ b/messaging/msgq.cc @@ -19,7 +19,7 @@ const char *SEMAPHORE_NAME = "mysemphore"; SharedMemory::SharedMemory(const std::string &name, size_t size) :shm_name(name) { - shm_size = size + sizeof(bool); + shm_size = sizeof(SharedMemoryHeader) + size; std::string full_path = "/dev/shm/"; const char* prefix = std::getenv("OPENPILOT_PREFIX"); if (prefix) { @@ -31,35 +31,26 @@ SharedMemory::SharedMemory(const std::string &name, size_t size) :shm_name(name) assert(ret != -1); shm_ptr = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); assert(shm_ptr != MAP_FAILED); -} -SharedMemory::~SharedMemory() { - munmap(shm_ptr, shm_size); - ::close(shm_fd); -} - -msgq_context::msgq_context() : shm("msgq_poll", sizeof(msgq_context_t)) { - // ctx = shm.getObject(); sem_t *sem = sem_open(SEMAPHORE_NAME, O_CREAT, 0644, 1); assert(sem != SEM_FAILED); sem_wait(sem); // Lock semaphore - ctx = (msgq_context_t *)shm.shm_ptr; - bool *initialized = (bool *)((char*)shm.shm_ptr + shm.shm_size - sizeof(bool)); - if (!(*initialized)) { + header = (SharedMemoryHeader *)shm_ptr; + if (!header->initialized) { pthread_mutexattr_t mutex_attr; pthread_condattr_t cond_attr; pthread_mutexattr_init(&mutex_attr); pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&(ctx->mutex), &mutex_attr); + pthread_mutex_init(&(header->mutex), &mutex_attr); pthread_condattr_init(&cond_attr); pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); - pthread_cond_init(&(ctx->cond), &cond_attr); + pthread_cond_init(&(header->cond), &cond_attr); pthread_mutexattr_destroy(&mutex_attr); pthread_condattr_destroy(&cond_attr); - *initialized = true; + header->initialized = true; printf("not initialized\n"); } else { printf("initialized\n"); @@ -68,6 +59,15 @@ msgq_context::msgq_context() : shm("msgq_poll", sizeof(msgq_context_t)) { sem_close(sem); } +SharedMemory::~SharedMemory() { + munmap(shm_ptr, shm_size); + ::close(shm_fd); +} + +msgq_context::msgq_context() : shm("msgq_poll", 0) { + ctx = (SharedMemoryHeader *)shm.shm_ptr; +} + msgq_context msgq_context_; uint64_t msgq_get_uid(void){ @@ -126,7 +126,7 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes // printf("new queue %s\n", path); q->shm = new SharedMemory(path, size + sizeof(msgq_header_t)); - msgq_header_t *header = (msgq_header_t*)q->shm->shm_ptr;//(); + msgq_header_t *header = (msgq_header_t*)(q->shm->header + 1); // Setup pointers to header segment q->num_readers = reinterpret_cast*>(&header->num_readers); @@ -427,13 +427,10 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ int ms = (timeout == -1) ? 100 : timeout; struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); - ts.tv_sec += ms / 1000; - ts.tv_nsec += (ms % 1000) * 1000000; + uint64_t ns = ts.tv_sec * 1000000000ULL + ts.tv_nsec + ms * 1000000; + ts.tv_sec = ns / 1000000000ULL; + ts.tv_nsec = ms % 1000000000ULL; - if (ts.tv_nsec >= 1000000000) { - ts.tv_sec += 1; - ts.tv_nsec -= 1000000000; - } while (num == 0) { pthread_mutex_lock(&(msgq_context_.ctx->mutex)); pthread_cond_timedwait(&(msgq_context_.ctx->cond), &(msgq_context_.ctx->mutex), &ts); diff --git a/messaging/msgq.h b/messaging/msgq.h index bca71e58b..d10bc5f4d 100644 --- a/messaging/msgq.h +++ b/messaging/msgq.h @@ -12,17 +12,19 @@ #define UNPACK64(higher, lower, input) do {uint64_t tmp = input; higher = tmp >> 32; lower = tmp & 0xFFFFFFFF;} while (0) #define PACK64(output, higher, lower) output = ((uint64_t)higher << 32) | ((uint64_t)lower & 0xFFFFFFFF) -struct msgq_context_t { +struct SharedMemoryHeader { pthread_mutex_t mutex; pthread_cond_t cond; + bool initialized; }; class SharedMemory { public: SharedMemory(const std::string &name, size_t size); ~SharedMemory(); - void *shm_ptr; - size_t shm_size; + void *shm_ptr; + size_t shm_size; + SharedMemoryHeader *header; private: std::string shm_name; @@ -33,7 +35,7 @@ class SharedMemory { struct msgq_context { msgq_context(); SharedMemory shm; - msgq_context_t *ctx; + SharedMemoryHeader *ctx; }; extern msgq_context msgq_context_;