Skip to content

Commit

Permalink
allows wait thread to be re-launched after shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 11, 2024
1 parent 4c33e6f commit 8c3856d
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 56 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: nanonext
Type: Package
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.3.0.9006
Version: 1.3.0.9007
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
a socket library implementing 'Scalability Protocols', a reliable,
high-performance standard for common communications patterns including
Expand Down
2 changes: 1 addition & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# nanonext 1.3.0.9006 (development)
# nanonext 1.3.0.9007 (development)

#### Updates

Expand Down
110 changes: 56 additions & 54 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -284,60 +284,6 @@ void single_wait_thread_create(SEXP x) {

// # nocov end

static void thread_duo_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_thread_duo *xp = (nano_thread_duo *) NANO_PTR(xptr);
nano_cv *ncv = xp->cv;
if (ncv != NULL) {
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;
nng_mtx_lock(mtx);
ncv->condition = -1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);
}
nng_thread_destroy(xp->thr);
R_Free(xp);

}

static void thread_disp_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_thread_disp *xp = (nano_thread_disp *) NANO_PTR(xptr);
nano_cv *ncv = xp->cv;
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;
nng_mtx_lock(mtx);
ncv->condition = -1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);
if (xp->tls != NULL)
nng_tls_config_free(xp->tls);
nng_thread_destroy(xp->thr);
nng_url_free(xp->up);
for (int i = 0; i < xp->n; i++) {
nng_aio_free(xp->saio[i]->aio);
nng_aio_free(xp->raio[i]->aio);
nng_aio_free(xp->haio[i]->aio);
R_Free(xp->saio[i]);
R_Free(xp->raio[i]);
R_Free(xp->haio[i]);
R_Free(xp->url[i]);
}
R_Free(xp->saio);
R_Free(xp->raio);
R_Free(xp->haio);
R_Free(xp->url);
R_Free(xp->online);
nng_cv_free(ncv->cv);
nng_mtx_free(ncv->mtx);
R_Free(xp->cv);
R_Free(xp);

}

static void rnng_wait_thread(void *args) {

while (1) {
Expand Down Expand Up @@ -468,6 +414,7 @@ SEXP rnng_thread_shutdown(void) {
nng_thread_destroy(nano_wait_thr);
nng_cv_free(nano_wait_cv);
nng_mtx_free(nano_wait_mtx);
nano_wait_thr = NULL;
}
return R_NilValue;
}
Expand All @@ -487,6 +434,24 @@ static void nano_record_pipe(nng_pipe p, nng_pipe_ev ev, void *arg) {

}

static void thread_duo_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_thread_duo *xp = (nano_thread_duo *) NANO_PTR(xptr);
nano_cv *ncv = xp->cv;
if (ncv != NULL) {
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;
nng_mtx_lock(mtx);
ncv->condition = -1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);
}
nng_thread_destroy(xp->thr);
R_Free(xp);

}

static void rnng_signal_thread(void *args) {

nano_thread_duo *duo = (nano_thread_duo *) args;
Expand Down Expand Up @@ -574,6 +539,43 @@ SEXP rnng_signal_thread_create(SEXP cv, SEXP cv2) {

}


static void thread_disp_finalizer(SEXP xptr) {

if (NANO_PTR(xptr) == NULL) return;
nano_thread_disp *xp = (nano_thread_disp *) NANO_PTR(xptr);
nano_cv *ncv = xp->cv;
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;
nng_mtx_lock(mtx);
ncv->condition = -1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);
if (xp->tls != NULL)
nng_tls_config_free(xp->tls);
nng_thread_destroy(xp->thr);
nng_url_free(xp->up);
for (int i = 0; i < xp->n; i++) {
nng_aio_free(xp->saio[i]->aio);
nng_aio_free(xp->raio[i]->aio);
nng_aio_free(xp->haio[i]->aio);
R_Free(xp->saio[i]);
R_Free(xp->raio[i]);
R_Free(xp->haio[i]);
R_Free(xp->url[i]);
}
R_Free(xp->saio);
R_Free(xp->raio);
R_Free(xp->haio);
R_Free(xp->url);
R_Free(xp->online);
nng_cv_free(ncv->cv);
nng_mtx_free(ncv->mtx);
R_Free(xp->cv);
R_Free(xp);

}

static void rnng_dispatch_thread(void *args) {

nano_thread_disp *disp = (nano_thread_disp *) args;
Expand Down

0 comments on commit 8c3856d

Please sign in to comment.