Skip to content

Commit

Permalink
dd
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed May 29, 2024
1 parent af40521 commit f454f7e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 28 deletions.
4 changes: 2 additions & 2 deletions SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ messaging_objects = env.SharedObject([
messaging = env.Library('messaging', messaging_objects)
Depends('messaging/impl_zmq.cc', services_h)

env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging, 'zmq', 'rt', 'pthread', common])
env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging, 'zmq', 'pthread', common])
Depends('messaging/bridge.cc', services_h)

messaging_python = envCython.Program('messaging/messaging_pyx.so', 'messaging/messaging_pyx.pyx', LIBS=envCython["LIBS"]+[messaging, "zmq", common, 'rt'])
messaging_python = envCython.Program('messaging/messaging_pyx.so', 'messaging/messaging_pyx.pyx', LIBS=envCython["LIBS"]+[messaging, "zmq", common])

# Build Vision IPC
vipc_sources = [
Expand Down
41 changes: 19 additions & 22 deletions messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
const char *SEMAPHORE_NAME = "mysemphore";

SharedMemory::SharedMemory(const std::string &name, size_t size) :shm_name(name) {
shm_size = size + sizeof(bool);
shm_size = sizeof(SharedMemoryHeader) + size;
std::string full_path = "/dev/shm/";
const char* prefix = std::getenv("OPENPILOT_PREFIX");
if (prefix) {
Expand All @@ -31,35 +31,26 @@ SharedMemory::SharedMemory(const std::string &name, size_t size) :shm_name(name)
assert(ret != -1);
shm_ptr = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
assert(shm_ptr != MAP_FAILED);
}

SharedMemory::~SharedMemory() {
munmap(shm_ptr, shm_size);
::close(shm_fd);
}

msgq_context::msgq_context() : shm("msgq_poll", sizeof(msgq_context_t)) {
// ctx = shm.getObject<msgq_context_t>();
sem_t *sem = sem_open(SEMAPHORE_NAME, O_CREAT, 0644, 1);
assert(sem != SEM_FAILED);
sem_wait(sem); // Lock semaphore
ctx = (msgq_context_t *)shm.shm_ptr;
bool *initialized = (bool *)((char*)shm.shm_ptr + shm.shm_size - sizeof(bool));
if (!(*initialized)) {
header = (SharedMemoryHeader *)shm_ptr;
if (!header->initialized) {
pthread_mutexattr_t mutex_attr;
pthread_condattr_t cond_attr;

pthread_mutexattr_init(&mutex_attr);
pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&(ctx->mutex), &mutex_attr);
pthread_mutex_init(&(header->mutex), &mutex_attr);

pthread_condattr_init(&cond_attr);
pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(&(ctx->cond), &cond_attr);
pthread_cond_init(&(header->cond), &cond_attr);

pthread_mutexattr_destroy(&mutex_attr);
pthread_condattr_destroy(&cond_attr);
*initialized = true;
header->initialized = true;
printf("not initialized\n");
} else {
printf("initialized\n");
Expand All @@ -68,6 +59,15 @@ msgq_context::msgq_context() : shm("msgq_poll", sizeof(msgq_context_t)) {
sem_close(sem);
}

SharedMemory::~SharedMemory() {
munmap(shm_ptr, shm_size);
::close(shm_fd);
}

msgq_context::msgq_context() : shm("msgq_poll", 0) {
ctx = (SharedMemoryHeader *)shm.shm_ptr;
}

msgq_context msgq_context_;

uint64_t msgq_get_uid(void){
Expand Down Expand Up @@ -126,7 +126,7 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
// printf("new queue %s\n", path);
q->shm = new SharedMemory(path, size + sizeof(msgq_header_t));
msgq_header_t *header = (msgq_header_t*)q->shm->shm_ptr;//<msgq_header_t>();
msgq_header_t *header = (msgq_header_t*)(q->shm->header + 1);

// Setup pointers to header segment
q->num_readers = reinterpret_cast<std::atomic<uint64_t>*>(&header->num_readers);
Expand Down Expand Up @@ -427,13 +427,10 @@ int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
int ms = (timeout == -1) ? 100 : timeout;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += ms / 1000;
ts.tv_nsec += (ms % 1000) * 1000000;
uint64_t ns = ts.tv_sec * 1000000000ULL + ts.tv_nsec + ms * 1000000;
ts.tv_sec = ns / 1000000000ULL;
ts.tv_nsec = ms % 1000000000ULL;

if (ts.tv_nsec >= 1000000000) {
ts.tv_sec += 1;
ts.tv_nsec -= 1000000000;
}
while (num == 0) {
pthread_mutex_lock(&(msgq_context_.ctx->mutex));
pthread_cond_timedwait(&(msgq_context_.ctx->cond), &(msgq_context_.ctx->mutex), &ts);
Expand Down
10 changes: 6 additions & 4 deletions messaging/msgq.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@
#define UNPACK64(higher, lower, input) do {uint64_t tmp = input; higher = tmp >> 32; lower = tmp & 0xFFFFFFFF;} while (0)
#define PACK64(output, higher, lower) output = ((uint64_t)higher << 32) | ((uint64_t)lower & 0xFFFFFFFF)

struct msgq_context_t {
struct SharedMemoryHeader {
pthread_mutex_t mutex;
pthread_cond_t cond;
bool initialized;
};

class SharedMemory {
public:
SharedMemory(const std::string &name, size_t size);
~SharedMemory();
void *shm_ptr;
size_t shm_size;
void *shm_ptr;
size_t shm_size;
SharedMemoryHeader *header;

private:
std::string shm_name;
Expand All @@ -33,7 +35,7 @@ class SharedMemory {
struct msgq_context {
msgq_context();
SharedMemory shm;
msgq_context_t *ctx;
SharedMemoryHeader *ctx;
};

extern msgq_context msgq_context_;
Expand Down

0 comments on commit f454f7e

Please sign in to comment.