Skip to content

Commit

Permalink
dd
Browse files Browse the repository at this point in the history
  • Loading branch information
deanlee committed May 30, 2024
1 parent 31da0f3 commit 11bade6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 38 deletions.
64 changes: 28 additions & 36 deletions messaging/msgq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <fcntl.h>
#include <unistd.h>

const char *SEMAPHORE_NAME = "mysemphore";
constexpr const char *SEMAPHORE_NAME = "/op_init_lock_semphore";

SharedMemory::SharedMemory(const std::string &name, size_t size) :shm_name(name) {
const char* prefix = std::getenv("OPENPILOT_PREFIX");
Expand All @@ -33,10 +33,20 @@ SharedMemory::SharedMemory(const std::string &name, size_t size) :shm_name(name)
shm_ptr = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
assert(shm_ptr != MAP_FAILED);

initMutexCond();
}

SharedMemory::~SharedMemory() {
munmap(shm_ptr, shm_size);
::close(shm_fd);
}

void SharedMemory::initMutexCond() {
sem_t *sem = sem_open(SEMAPHORE_NAME, O_CREAT, 0644, 1);
assert(sem != SEM_FAILED);
sem_wait(sem); // Lock semaphore

// Initialize the header if it hasn't been initialized yet
header = (SharedMemoryHeader *)shm_ptr;
if (!header->initialized) {
pthread_mutexattr_t mutex_attr;
Expand All @@ -57,15 +67,10 @@ SharedMemory::SharedMemory(const std::string &name, size_t size) :shm_name(name)
header->initialized = true;
}

sem_post(sem); // Unlock semaphore
sem_post(sem);
sem_close(sem);
}

SharedMemory::~SharedMemory() {
munmap(shm_ptr, shm_size);
::close(shm_fd);
}

void SharedMemory::notifyAll() {
pthread_cond_broadcast(&(header->cond));
}
Expand Down Expand Up @@ -142,7 +147,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){

int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
q->shm = new SharedMemory(path, size + sizeof(msgq_header_t));
q->shm = std::make_unique<SharedMemory>(path, size + sizeof(msgq_header_t));
msgq_header_t *header = (msgq_header_t*)(q->shm->header + 1);

// Setup pointers to header segment
Expand All @@ -166,9 +171,7 @@ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
return 0;
}

void msgq_close_queue(msgq_queue_t *q){
delete q->shm;
}
void msgq_close_queue(msgq_queue_t *q) {}

void msgq_init_publisher(msgq_queue_t * q) {
//std::cout << "Starting publisher" << std::endl;
Expand Down Expand Up @@ -429,37 +432,26 @@ int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){
return msg->size;
}

int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){
int msgq_poll(msgq_pollitem_t *items, size_t nitems, int timeout) {
int num = 0;
// Check if messages ready
for (size_t i = 0; i < nitems; i++) {
items[i].revents = msgq_msg_ready(items[i].q);
if (items[i].revents) num++;
}

int ms = (timeout == -1) ? 100 : timeout;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
uint64_t ns = ts.tv_sec * 1000000000ULL + ts.tv_nsec + ms * 1000000;
ts.tv_sec = ns / 1000000000ULL;
ts.tv_nsec = ns % 1000000000ULL;

while (num == 0) {
if (!msgq_context_.shm.waitFor(&ts))
continue;

while (true) {
// Check if messages ready
for (size_t i = 0; i < nitems; i++) {
if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){
num += 1;
items[i].revents = 1;
}
for (size_t i = 0; i < nitems; ++i) {
items[i].revents = msgq_msg_ready(items[i].q);
if (items[i].revents) ++num;
}
if (num > 0 || timeout == 0) break;

// exit if we had a timeout and the sleep finished
if (timeout != -1){
break;
}
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
uint64_t ns = ts.tv_sec * 1000000000ULL + ts.tv_nsec + ms * 1000000;
ts.tv_sec = ns / 1000000000ULL;
ts.tv_nsec = ns % 1000000000ULL;

// Wait until messages are ready or timeout occurs
if (!msgq_context_.shm.waitFor(&ts) && timeout != -1) break;
}
return num;
}
Expand Down
5 changes: 3 additions & 2 deletions messaging/msgq.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstring>
#include <string>
#include <atomic>
#include <memory>

#define DEFAULT_SEGMENT_SIZE (10 * 1024 * 1024)
#define NUM_READERS 12
Expand All @@ -29,14 +30,14 @@ class SharedMemory {
SharedMemoryHeader *header;

private:
void initMutexCond();
std::string shm_name;
int shm_fd;

};

struct msgq_context {
msgq_context();
~msgq_context() {printf("msgq_context closed*************\n");}
SharedMemory shm;
SharedMemoryHeader *ctx;
};
Expand Down Expand Up @@ -69,7 +70,7 @@ struct msgq_queue_t {

bool read_conflate;
std::string endpoint;
SharedMemory *shm;
std::unique_ptr<SharedMemory> shm;
};

struct msgq_msg_t {
Expand Down

0 comments on commit 11bade6

Please sign in to comment.