From c2a024f1f93badf9b9f1c769480640c32f8911bd Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Fri, 1 Nov 2024 14:42:38 +0000 Subject: [PATCH 01/11] persistent wait thread concept --- NEWS.md | 5 +++ src/init.c | 22 ++++++++++ src/thread.c | 121 ++++++++++++++++++++++++++++----------------------- 3 files changed, 93 insertions(+), 55 deletions(-) diff --git a/NEWS.md b/NEWS.md index 9f6019323..124df6217 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,7 +1,12 @@ # nanonext 1.3.0.9004 (development) +#### Behavioural Change + +* User interrupts while using `call_aio_()`, `collect_aio_()` or the equivalent `[]` method now cancel the 'aio' operation causing the Aio to return an 'errorValue 20 | Operation Canceled'. + #### Updates +* Performs interruptible 'aio' waits using a single dedicated thread, rather than launching new threads, for higher performance and efficiency. * Performance enhancements for 'ncurlAio' and 'recvAio' promises methods. * Updates bundled 'libnng' to v1.9.0 stable release. * The package has a shiny new hex logo. diff --git a/src/init.c b/src/init.c index 1d9f284f8..c1b866d9c 100644 --- a/src/init.c +++ b/src/init.c @@ -22,6 +22,15 @@ void (*eln2)(void (*)(void *), void *, double, int) = NULL; uint8_t special_bit = 0; +extern int nano_wait_thread_created; +extern int nano_wait_condition; +extern nng_thread *nano_wait_thr; +extern nng_mtx *nano_wait_mtx; +extern nng_cv *nano_wait_cv; +extern nng_mtx *nano_shared_mtx; +extern nng_cv *nano_shared_cv; +extern nng_aio *nano_shared_aio; + SEXP nano_AioSymbol; SEXP nano_ContextSymbol; SEXP nano_CvSymbol; @@ -211,5 +220,18 @@ void attribute_visible R_init_nanonext(DllInfo* dll) { // # nocov start void attribute_visible R_unload_nanonext(DllInfo *info) { ReleaseObjects(); + if (nano_wait_thread_created) { + if (nano_shared_aio != NULL) + nng_aio_stop(nano_shared_aio); + nng_mtx_lock(nano_wait_mtx); + nano_wait_condition = -1; + nng_cv_wake(nano_wait_cv); + nng_mtx_unlock(nano_wait_mtx); + nng_thread_destroy(nano_wait_thr); + nng_cv_free(nano_shared_cv); + nng_mtx_free(nano_shared_mtx); + nng_cv_free(nano_wait_cv); + nng_mtx_free(nano_wait_mtx); + } } // # nocov end diff --git a/src/thread.c b/src/thread.c index 8eb1654e1..461662449 100644 --- a/src/thread.c +++ b/src/thread.c @@ -22,6 +22,18 @@ // threads callable and messenger ---------------------------------------------- +int nano_wait_thread_created = 0; + +nng_thread *nano_wait_thr; +nng_mtx *nano_wait_mtx; +nng_cv *nano_wait_cv; +int nano_wait_condition = 0; + +nng_mtx *nano_shared_mtx; +nng_cv *nano_shared_cv; +static int nano_shared_condition = 0; +nng_aio *nano_shared_aio; + // # nocov start // tested interactively @@ -185,22 +197,6 @@ SEXP rnng_messenger_thread_create(SEXP args) { // threaded functions ---------------------------------------------------------- -static void thread_aio_finalizer(SEXP xptr) { - - if (NANO_PTR(xptr) == NULL) return; - nano_thread_aio *xp = (nano_thread_aio *) NANO_PTR(xptr); - nano_cv *ncv = xp->cv; - nng_mtx *mtx = ncv->mtx; - nng_cv *cv = ncv->cv; - nng_aio_stop(xp->aio); - nng_thread_destroy(xp->thr); - nng_cv_free(cv); - nng_mtx_free(mtx); - R_Free(ncv); - R_Free(xp); - -} - static void thread_duo_finalizer(SEXP xptr) { if (NANO_PTR(xptr) == NULL) return; @@ -221,17 +217,24 @@ static void thread_duo_finalizer(SEXP xptr) { static void rnng_wait_thread(void *args) { - nano_thread_aio *taio = (nano_thread_aio *) args; - nano_cv *ncv = taio->cv; - nng_mtx *mtx = ncv->mtx; - nng_cv *cv = ncv->cv; + while (1) { + nng_mtx_lock(nano_wait_mtx); + while (nano_wait_condition == 0) + nng_cv_wait(nano_wait_cv); + if (nano_wait_condition == -1) { + nng_mtx_unlock(nano_wait_mtx); + break; + } + nano_wait_condition = 0; + nng_mtx_unlock(nano_wait_mtx); - nng_aio_wait(taio->aio); + nng_aio_wait(nano_shared_aio); - nng_mtx_lock(mtx); - ncv->condition = 1; - nng_cv_wake(cv); - nng_mtx_unlock(mtx); + nng_mtx_lock(nano_shared_mtx); + nano_shared_condition = 1; + nng_cv_wake(nano_shared_cv); + nng_mtx_unlock(nano_shared_mtx); + } } @@ -272,6 +275,11 @@ static void thread_disp_finalizer(SEXP xptr) { } +static void check_interrupt(void * data) { + (void) data; + R_CheckUserInterrupt(); +} + SEXP rnng_wait_thread_create(SEXP x) { const SEXPTYPE typ = TYPEOF(x); @@ -281,51 +289,52 @@ SEXP rnng_wait_thread_create(SEXP x) { if (NANO_TAG(coreaio) != nano_AioSymbol) return x; - PROTECT(coreaio); nano_aio *aiop = (nano_aio *) NANO_PTR(coreaio); - nano_thread_aio *taio = R_Calloc(1, nano_thread_aio); - nano_cv *ncv = R_Calloc(1, nano_cv); - taio->aio = aiop->aio; - taio->cv = ncv; - nng_mtx *mtx; - nng_cv *cv; - int xc, signalled; - if ((xc = nng_mtx_alloc(&mtx))) - goto exitlevel1; + if (nano_wait_thread_created == 0) { + if ((xc = nng_mtx_alloc(&nano_wait_mtx))) + goto exitlevel1; - if ((xc = nng_cv_alloc(&cv, mtx))) - goto exitlevel2; + if ((xc = nng_cv_alloc(&nano_wait_cv, nano_wait_mtx))) + goto exitlevel2; - ncv->mtx = mtx; - ncv->cv = cv; + if ((xc = nng_mtx_alloc(&nano_shared_mtx))) + goto exitlevel3; - if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread, taio))) - goto exitlevel3; + if ((xc = nng_cv_alloc(&nano_shared_cv, nano_shared_mtx))) + goto exitlevel4; - SEXP xptr; - PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue)); - R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE); - R_MakeWeakRef(coreaio, xptr, R_NilValue, TRUE); - UNPROTECT(2); + if ((xc = nng_thread_create(&nano_wait_thr, rnng_wait_thread, NULL))) + goto exitlevel5; + + nano_wait_thread_created = 1; + } + + nng_mtx_lock(nano_wait_mtx); + nano_shared_aio = aiop->aio; + nano_wait_condition = 1; + nng_cv_wake(nano_wait_cv); + nng_mtx_unlock(nano_wait_mtx); nng_time time = nng_clock(); while (1) { time = time + 400; signalled = 1; - nng_mtx_lock(mtx); - while (ncv->condition == 0) { - if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) { + nng_mtx_lock(nano_shared_mtx); + while (nano_shared_condition == 0) { + if (nng_cv_until(nano_shared_cv, time) == NNG_ETIMEDOUT) { signalled = 0; break; } } - nng_mtx_unlock(mtx); + nano_shared_condition = 0; + nng_mtx_unlock(nano_shared_mtx); if (signalled) break; - R_CheckUserInterrupt(); + if (!R_ToplevelExec(check_interrupt, NULL)) + nng_aio_stop(nano_shared_aio); } switch (aiop->type) { @@ -348,13 +357,15 @@ SEXP rnng_wait_thread_create(SEXP x) { return x; + exitlevel5: + nng_cv_free(nano_shared_cv); + exitlevel4: + nng_mtx_free(nano_shared_mtx); exitlevel3: - nng_cv_free(cv); + nng_cv_free(nano_wait_cv); exitlevel2: - nng_mtx_free(mtx); + nng_mtx_free(nano_wait_mtx); exitlevel1: - R_Free(ncv); - R_Free(taio); ERROR_OUT(xc); } else if (typ == VECSXP) { From 5595075c21f3cc579386ee2423ffa65d2ea1398d Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Fri, 8 Nov 2024 10:44:50 +0000 Subject: [PATCH 02/11] introduce rnng_thread_shutdown --- R/utils.R | 1 + man/zzz.Rd | 1 + src/init.c | 17 +++-------------- src/nanonext.h | 3 ++- src/thread.c | 18 ++++++++++++++++++ tests/tests.R | 1 + 6 files changed, 26 insertions(+), 15 deletions(-) diff --git a/R/utils.R b/R/utils.R index 4b7ec274b..04e432b91 100644 --- a/R/utils.R +++ b/R/utils.R @@ -356,6 +356,7 @@ serial_config <- function(class, sfunc, ufunc, vec = FALSE) #' if (Sys.info()[["sysname"]] == "Linux") { #' rm(list = ls()) #' gc() +#' .Call(nanonext:::rnng_thread_shutdown) #' Sys.sleep(1L) #' .Call(nanonext:::rnng_fini) #' } diff --git a/man/zzz.Rd b/man/zzz.Rd index 987814551..b375ac147 100644 --- a/man/zzz.Rd +++ b/man/zzz.Rd @@ -14,6 +14,7 @@ to run the examples. if (Sys.info()[["sysname"]] == "Linux") { rm(list = ls()) gc() + .Call(nanonext:::rnng_thread_shutdown) Sys.sleep(1L) .Call(nanonext:::rnng_fini) } diff --git a/src/init.c b/src/init.c index c1b866d9c..9d34c8b38 100644 --- a/src/init.c +++ b/src/init.c @@ -193,8 +193,9 @@ static const R_CallMethodDef callMethods[] = { {"rnng_stream_listen", (DL_FUNC) &rnng_stream_listen, 3}, {"rnng_strerror", (DL_FUNC) &rnng_strerror, 1}, {"rnng_subscribe", (DL_FUNC) &rnng_subscribe, 3}, - {"rnng_traverse_precious", (DL_FUNC) &rnng_traverse_precious, 0}, + {"rnng_thread_shutdown", (DL_FUNC) &rnng_thread_shutdown, 0}, {"rnng_tls_config", (DL_FUNC) &rnng_tls_config, 4}, + {"rnng_traverse_precious", (DL_FUNC) &rnng_traverse_precious, 0}, {"rnng_unresolved", (DL_FUNC) &rnng_unresolved, 1}, {"rnng_unresolved2", (DL_FUNC) &rnng_unresolved2, 1}, {"rnng_url_parse", (DL_FUNC) &rnng_url_parse, 1}, @@ -220,18 +221,6 @@ void attribute_visible R_init_nanonext(DllInfo* dll) { // # nocov start void attribute_visible R_unload_nanonext(DllInfo *info) { ReleaseObjects(); - if (nano_wait_thread_created) { - if (nano_shared_aio != NULL) - nng_aio_stop(nano_shared_aio); - nng_mtx_lock(nano_wait_mtx); - nano_wait_condition = -1; - nng_cv_wake(nano_wait_cv); - nng_mtx_unlock(nano_wait_mtx); - nng_thread_destroy(nano_wait_thr); - nng_cv_free(nano_shared_cv); - nng_mtx_free(nano_shared_mtx); - nng_cv_free(nano_wait_cv); - nng_mtx_free(nano_wait_mtx); - } + rnng_thread_shutdown(); } // # nocov end diff --git a/src/nanonext.h b/src/nanonext.h index d8a076975..949341e65 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -364,8 +364,9 @@ SEXP rnng_stream_dial(SEXP, SEXP, SEXP); SEXP rnng_stream_listen(SEXP, SEXP, SEXP); SEXP rnng_strerror(SEXP); SEXP rnng_subscribe(SEXP, SEXP, SEXP); -SEXP rnng_traverse_precious(void); +SEXP rnng_thread_shutdown(void); SEXP rnng_tls_config(SEXP, SEXP, SEXP, SEXP); +SEXP rnng_traverse_precious(void); SEXP rnng_unresolved(SEXP); SEXP rnng_unresolved2(SEXP); SEXP rnng_url_parse(SEXP); diff --git a/src/thread.c b/src/thread.c index 461662449..e3c7da362 100644 --- a/src/thread.c +++ b/src/thread.c @@ -231,6 +231,7 @@ static void rnng_wait_thread(void *args) { nng_aio_wait(nano_shared_aio); nng_mtx_lock(nano_shared_mtx); + nano_shared_aio = NULL; nano_shared_condition = 1; nng_cv_wake(nano_shared_cv); nng_mtx_unlock(nano_shared_mtx); @@ -280,6 +281,23 @@ static void check_interrupt(void * data) { R_CheckUserInterrupt(); } +SEXP rnng_thread_shutdown(void) { + if (nano_wait_thread_created) { + if (nano_shared_aio != NULL) + nng_aio_stop(nano_shared_aio); + nng_mtx_lock(nano_wait_mtx); + nano_wait_condition = -1; + nng_cv_wake(nano_wait_cv); + nng_mtx_unlock(nano_wait_mtx); + nng_thread_destroy(nano_wait_thr); + nng_cv_free(nano_shared_cv); + nng_mtx_free(nano_shared_mtx); + nng_cv_free(nano_wait_cv); + nng_mtx_free(nano_wait_mtx); + } + return R_NilValue; +} + SEXP rnng_wait_thread_create(SEXP x) { const SEXPTYPE typ = TYPEOF(x); diff --git a/tests/tests.R b/tests/tests.R index fdbabdafb..b762efde1 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -666,6 +666,7 @@ test_type("integer", .Call(nanonext:::rnng_traverse_precious)) if (Sys.info()[["sysname"]] == "Linux") { rm(list = ls()) gc() + .Call(nanonext:::rnng_thread_shutdown) Sys.sleep(1L) .Call(nanonext:::rnng_fini) invisible() From 8f4d22d6e2d697be75e78413680c9d8b26a074c6 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Fri, 8 Nov 2024 11:44:53 +0000 Subject: [PATCH 03/11] greatly simplify implementation with single wait mutex/cv --- src/init.c | 8 +++----- src/thread.c | 58 +++++++++++++++++++--------------------------------- 2 files changed, 24 insertions(+), 42 deletions(-) diff --git a/src/init.c b/src/init.c index 9d34c8b38..1c719fff4 100644 --- a/src/init.c +++ b/src/init.c @@ -23,13 +23,11 @@ void (*eln2)(void (*)(void *), void *, double, int) = NULL; uint8_t special_bit = 0; extern int nano_wait_thread_created; -extern int nano_wait_condition; extern nng_thread *nano_wait_thr; +extern nng_aio *nano_shared_aio; extern nng_mtx *nano_wait_mtx; extern nng_cv *nano_wait_cv; -extern nng_mtx *nano_shared_mtx; -extern nng_cv *nano_shared_cv; -extern nng_aio *nano_shared_aio; +extern int nano_wait_condition; SEXP nano_AioSymbol; SEXP nano_ContextSymbol; @@ -220,7 +218,7 @@ void attribute_visible R_init_nanonext(DllInfo* dll) { // # nocov start void attribute_visible R_unload_nanonext(DllInfo *info) { - ReleaseObjects(); rnng_thread_shutdown(); + ReleaseObjects(); } // # nocov end diff --git a/src/thread.c b/src/thread.c index e3c7da362..22b3488d6 100644 --- a/src/thread.c +++ b/src/thread.c @@ -23,16 +23,17 @@ // threads callable and messenger ---------------------------------------------- int nano_wait_thread_created = 0; - nng_thread *nano_wait_thr; +nng_aio *nano_shared_aio; + nng_mtx *nano_wait_mtx; nng_cv *nano_wait_cv; int nano_wait_condition = 0; -nng_mtx *nano_shared_mtx; -nng_cv *nano_shared_cv; -static int nano_shared_condition = 0; -nng_aio *nano_shared_aio; +static void check_interrupt(void * data) { + (void) data; + R_CheckUserInterrupt(); +} // # nocov start // tested interactively @@ -225,16 +226,15 @@ static void rnng_wait_thread(void *args) { nng_mtx_unlock(nano_wait_mtx); break; } - nano_wait_condition = 0; nng_mtx_unlock(nano_wait_mtx); nng_aio_wait(nano_shared_aio); - nng_mtx_lock(nano_shared_mtx); + nng_mtx_lock(nano_wait_mtx); nano_shared_aio = NULL; - nano_shared_condition = 1; - nng_cv_wake(nano_shared_cv); - nng_mtx_unlock(nano_shared_mtx); + nano_wait_condition = 0; + nng_cv_wake(nano_wait_cv); + nng_mtx_unlock(nano_wait_mtx); } } @@ -276,11 +276,6 @@ static void thread_disp_finalizer(SEXP xptr) { } -static void check_interrupt(void * data) { - (void) data; - R_CheckUserInterrupt(); -} - SEXP rnng_thread_shutdown(void) { if (nano_wait_thread_created) { if (nano_shared_aio != NULL) @@ -290,8 +285,6 @@ SEXP rnng_thread_shutdown(void) { nng_cv_wake(nano_wait_cv); nng_mtx_unlock(nano_wait_mtx); nng_thread_destroy(nano_wait_thr); - nng_cv_free(nano_shared_cv); - nng_mtx_free(nano_shared_mtx); nng_cv_free(nano_wait_cv); nng_mtx_free(nano_wait_mtx); } @@ -311,29 +304,25 @@ SEXP rnng_wait_thread_create(SEXP x) { int xc, signalled; - if (nano_wait_thread_created == 0) { + if (!nano_wait_thread_created) { if ((xc = nng_mtx_alloc(&nano_wait_mtx))) goto exitlevel1; if ((xc = nng_cv_alloc(&nano_wait_cv, nano_wait_mtx))) goto exitlevel2; - if ((xc = nng_mtx_alloc(&nano_shared_mtx))) - goto exitlevel3; - - if ((xc = nng_cv_alloc(&nano_shared_cv, nano_shared_mtx))) - goto exitlevel4; - if ((xc = nng_thread_create(&nano_wait_thr, rnng_wait_thread, NULL))) - goto exitlevel5; + goto exitlevel3; nano_wait_thread_created = 1; } nng_mtx_lock(nano_wait_mtx); - nano_shared_aio = aiop->aio; - nano_wait_condition = 1; - nng_cv_wake(nano_wait_cv); + if (nano_wait_condition == 0) { + nano_shared_aio = aiop->aio; + nano_wait_condition = 1; + nng_cv_wake(nano_wait_cv); + } nng_mtx_unlock(nano_wait_mtx); nng_time time = nng_clock(); @@ -341,15 +330,14 @@ SEXP rnng_wait_thread_create(SEXP x) { while (1) { time = time + 400; signalled = 1; - nng_mtx_lock(nano_shared_mtx); - while (nano_shared_condition == 0) { - if (nng_cv_until(nano_shared_cv, time) == NNG_ETIMEDOUT) { + nng_mtx_lock(nano_wait_mtx); + while (nano_wait_condition == 1) { + if (nng_cv_until(nano_wait_cv, time) == NNG_ETIMEDOUT) { signalled = 0; break; } } - nano_shared_condition = 0; - nng_mtx_unlock(nano_shared_mtx); + nng_mtx_unlock(nano_wait_mtx); if (signalled) break; if (!R_ToplevelExec(check_interrupt, NULL)) nng_aio_stop(nano_shared_aio); @@ -375,10 +363,6 @@ SEXP rnng_wait_thread_create(SEXP x) { return x; - exitlevel5: - nng_cv_free(nano_shared_cv); - exitlevel4: - nng_mtx_free(nano_shared_mtx); exitlevel3: nng_cv_free(nano_wait_cv); exitlevel2: From fdabb171c98e7c1cdee6dce4c922fbfd7d068e66 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Fri, 8 Nov 2024 12:25:38 +0000 Subject: [PATCH 04/11] no longer cancel the aio on interrupt but on next wait if necessary to unblock the thread --- NEWS.md | 4 ---- src/thread.c | 32 ++++++++++++++++++-------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/NEWS.md b/NEWS.md index 124df6217..550107d87 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,9 +1,5 @@ # nanonext 1.3.0.9004 (development) -#### Behavioural Change - -* User interrupts while using `call_aio_()`, `collect_aio_()` or the equivalent `[]` method now cancel the 'aio' operation causing the Aio to return an 'errorValue 20 | Operation Canceled'. - #### Updates * Performs interruptible 'aio' waits using a single dedicated thread, rather than launching new threads, for higher performance and efficiency. diff --git a/src/thread.c b/src/thread.c index 22b3488d6..2273d6e79 100644 --- a/src/thread.c +++ b/src/thread.c @@ -30,11 +30,6 @@ nng_mtx *nano_wait_mtx; nng_cv *nano_wait_cv; int nano_wait_condition = 0; -static void check_interrupt(void * data) { - (void) data; - R_CheckUserInterrupt(); -} - // # nocov start // tested interactively @@ -317,13 +312,23 @@ SEXP rnng_wait_thread_create(SEXP x) { nano_wait_thread_created = 1; } - nng_mtx_lock(nano_wait_mtx); - if (nano_wait_condition == 0) { - nano_shared_aio = aiop->aio; - nano_wait_condition = 1; - nng_cv_wake(nano_wait_cv); - } - nng_mtx_unlock(nano_wait_mtx); + int restart; + do { + nng_mtx_lock(nano_wait_mtx); + if (nano_wait_condition == 0) { + nano_shared_aio = aiop->aio; + nano_wait_condition = 1; + nng_cv_wake(nano_wait_cv); + } else { + if (nano_shared_aio != aiop->aio) { + nng_aio_stop(nano_shared_aio); + while (nano_wait_condition == 1) + nng_cv_wait(nano_wait_cv); + } + } + restart = nano_wait_condition == 0; + nng_mtx_unlock(nano_wait_mtx); + } while (restart); nng_time time = nng_clock(); @@ -339,8 +344,7 @@ SEXP rnng_wait_thread_create(SEXP x) { } nng_mtx_unlock(nano_wait_mtx); if (signalled) break; - if (!R_ToplevelExec(check_interrupt, NULL)) - nng_aio_stop(nano_shared_aio); + R_CheckUserInterrupt(); } switch (aiop->type) { From 37110281be892a6066adbda43ed9ff39b44d2a1b Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Fri, 8 Nov 2024 14:07:38 +0000 Subject: [PATCH 05/11] preserve previous behaviour - launch new threads when required --- src/thread.c | 197 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 138 insertions(+), 59 deletions(-) diff --git a/src/thread.c b/src/thread.c index 2273d6e79..b972e2ba0 100644 --- a/src/thread.c +++ b/src/thread.c @@ -193,6 +193,22 @@ SEXP rnng_messenger_thread_create(SEXP args) { // threaded functions ---------------------------------------------------------- +static void thread_aio_finalizer(SEXP xptr) { + + if (NANO_PTR(xptr) == NULL) return; + nano_thread_aio *xp = (nano_thread_aio *) NANO_PTR(xptr); + nano_cv *ncv = xp->cv; + nng_mtx *mtx = ncv->mtx; + nng_cv *cv = ncv->cv; + nng_aio_stop(xp->aio); + nng_thread_destroy(xp->thr); + nng_cv_free(cv); + nng_mtx_free(mtx); + R_Free(ncv); + R_Free(xp); + +} + static void thread_duo_finalizer(SEXP xptr) { if (NANO_PTR(xptr) == NULL) return; @@ -211,29 +227,6 @@ static void thread_duo_finalizer(SEXP xptr) { } -static void rnng_wait_thread(void *args) { - - while (1) { - nng_mtx_lock(nano_wait_mtx); - while (nano_wait_condition == 0) - nng_cv_wait(nano_wait_cv); - if (nano_wait_condition == -1) { - nng_mtx_unlock(nano_wait_mtx); - break; - } - nng_mtx_unlock(nano_wait_mtx); - - nng_aio_wait(nano_shared_aio); - - nng_mtx_lock(nano_wait_mtx); - nano_shared_aio = NULL; - nano_wait_condition = 0; - nng_cv_wake(nano_wait_cv); - nng_mtx_unlock(nano_wait_mtx); - } - -} - static void thread_disp_finalizer(SEXP xptr) { if (NANO_PTR(xptr) == NULL) return; @@ -271,19 +264,43 @@ static void thread_disp_finalizer(SEXP xptr) { } -SEXP rnng_thread_shutdown(void) { - if (nano_wait_thread_created) { - if (nano_shared_aio != NULL) - nng_aio_stop(nano_shared_aio); +static void rnng_wait_thread(void *args) { + + while (1) { nng_mtx_lock(nano_wait_mtx); - nano_wait_condition = -1; + while (nano_wait_condition == 0) + nng_cv_wait(nano_wait_cv); + if (nano_wait_condition == -1) { + nng_mtx_unlock(nano_wait_mtx); + break; + } + nng_mtx_unlock(nano_wait_mtx); + + nng_aio_wait(nano_shared_aio); + + nng_mtx_lock(nano_wait_mtx); + nano_shared_aio = NULL; + nano_wait_condition = 0; nng_cv_wake(nano_wait_cv); nng_mtx_unlock(nano_wait_mtx); - nng_thread_destroy(nano_wait_thr); - nng_cv_free(nano_wait_cv); - nng_mtx_free(nano_wait_mtx); } - return R_NilValue; + +} + +static void rnng_wait_thread_single(void *args) { + + nano_thread_aio *taio = (nano_thread_aio *) args; + nano_cv *ncv = taio->cv; + nng_mtx *mtx = ncv->mtx; + nng_cv *cv = ncv->cv; + + nng_aio_wait(taio->aio); + + nng_mtx_lock(mtx); + ncv->condition = 1; + nng_cv_wake(cv); + nng_mtx_unlock(mtx); + } SEXP rnng_wait_thread_create(SEXP x) { @@ -312,39 +329,86 @@ SEXP rnng_wait_thread_create(SEXP x) { nano_wait_thread_created = 1; } - int restart; - do { - nng_mtx_lock(nano_wait_mtx); - if (nano_wait_condition == 0) { - nano_shared_aio = aiop->aio; - nano_wait_condition = 1; - nng_cv_wake(nano_wait_cv); - } else { - if (nano_shared_aio != aiop->aio) { - nng_aio_stop(nano_shared_aio); - while (nano_wait_condition == 1) - nng_cv_wait(nano_wait_cv); + int thread_required = 0; + nng_mtx_lock(nano_wait_mtx); + if (nano_wait_condition == 0) { + nano_shared_aio = aiop->aio; + nano_wait_condition = 1; + nng_cv_wake(nano_wait_cv); + } else if (nano_shared_aio != aiop->aio) { + thread_required = 1; + } + nng_mtx_unlock(nano_wait_mtx); + + if (thread_required) { + + PROTECT(coreaio); + nano_thread_aio *taio = R_Calloc(1, nano_thread_aio); + nano_cv *ncv = R_Calloc(1, nano_cv); + taio->aio = aiop->aio; + taio->cv = ncv; + nng_mtx *mtx; + nng_cv *cv; + + if ((xc = nng_mtx_alloc(&mtx))) + ERROR_OUT(xc); + + if ((xc = nng_cv_alloc(&cv, mtx))) { + nng_mtx_free(mtx); + ERROR_OUT(xc); + } + + ncv->mtx = mtx; + ncv->cv = cv; + + if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio))) { + nng_cv_free(cv); + nng_mtx_free(mtx); + ERROR_OUT(xc); + } + + SEXP xptr; + PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue)); + R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE); + R_MakeWeakRef(coreaio, xptr, R_NilValue, TRUE); + UNPROTECT(2); + + nng_time time = nng_clock(); + + while (1) { + time = time + 400; + signalled = 1; + nng_mtx_lock(mtx); + while (ncv->condition == 0) { + if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) { + signalled = 0; + break; + } } + nng_mtx_unlock(mtx); + if (signalled) break; + R_CheckUserInterrupt(); } - restart = nano_wait_condition == 0; - nng_mtx_unlock(nano_wait_mtx); - } while (restart); - nng_time time = nng_clock(); + } else { + + nng_time time = nng_clock(); - while (1) { - time = time + 400; - signalled = 1; - nng_mtx_lock(nano_wait_mtx); - while (nano_wait_condition == 1) { - if (nng_cv_until(nano_wait_cv, time) == NNG_ETIMEDOUT) { - signalled = 0; - break; + while (1) { + time = time + 400; + signalled = 1; + nng_mtx_lock(nano_wait_mtx); + while (nano_wait_condition == 1) { + if (nng_cv_until(nano_wait_cv, time) == NNG_ETIMEDOUT) { + signalled = 0; + break; + } } + nng_mtx_unlock(nano_wait_mtx); + if (signalled) break; + R_CheckUserInterrupt(); } - nng_mtx_unlock(nano_wait_mtx); - if (signalled) break; - R_CheckUserInterrupt(); + } switch (aiop->type) { @@ -387,6 +451,21 @@ SEXP rnng_wait_thread_create(SEXP x) { } +SEXP rnng_thread_shutdown(void) { + if (nano_wait_thread_created) { + if (nano_shared_aio != NULL) + nng_aio_stop(nano_shared_aio); + nng_mtx_lock(nano_wait_mtx); + nano_wait_condition = -1; + nng_cv_wake(nano_wait_cv); + nng_mtx_unlock(nano_wait_mtx); + nng_thread_destroy(nano_wait_thr); + nng_cv_free(nano_wait_cv); + nng_mtx_free(nano_wait_mtx); + } + return R_NilValue; +} + static void nano_record_pipe(nng_pipe p, nng_pipe_ev ev, void *arg) { nano_signal *signal = (nano_signal *) arg; From 062dd2b49f6e1139e326f9b618fd0f34af498cb4 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Fri, 8 Nov 2024 14:13:19 +0000 Subject: [PATCH 06/11] increment version and update NEWS --- DESCRIPTION | 2 +- NEWS.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 1386bf900..815d70e5f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: nanonext Type: Package Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library -Version: 1.3.0.9004 +Version: 1.3.0.9005 Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library implementing 'Scalability Protocols', a reliable, high-performance standard for common communications patterns including diff --git a/NEWS.md b/NEWS.md index 550107d87..621f95f07 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,8 +1,8 @@ -# nanonext 1.3.0.9004 (development) +# nanonext 1.3.0.9005 (development) #### Updates -* Performs interruptible 'aio' waits using a single dedicated thread, rather than launching new threads, for higher performance and efficiency. +* Performs interruptible 'aio' waits from a single dedicated thread, only launching new threads if required, for higher performance and efficiency. * Performance enhancements for 'ncurlAio' and 'recvAio' promises methods. * Updates bundled 'libnng' to v1.9.0 stable release. * The package has a shiny new hex logo. From e9fc6a11cc4e78f5e5a993895292eee9b75a2a61 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:41:23 +0000 Subject: [PATCH 07/11] refactor wait thread logic --- src/thread.c | 109 ++++++++++++++++++++++++++++----------------------- 1 file changed, 61 insertions(+), 48 deletions(-) diff --git a/src/thread.c b/src/thread.c index b972e2ba0..8efe4fb79 100644 --- a/src/thread.c +++ b/src/thread.c @@ -303,6 +303,63 @@ static void rnng_wait_thread_single(void *args) { } +void single_wait_thread_create(SEXP x) { + + nano_aio *aiop = (nano_aio *) NANO_PTR(x); + nano_thread_aio *taio = R_Calloc(1, nano_thread_aio); + nano_cv *ncv = R_Calloc(1, nano_cv); + taio->aio = aiop->aio; + taio->cv = ncv; + nng_mtx *mtx; + nng_cv *cv; + int xc, signalled; + + if ((xc = nng_mtx_alloc(&mtx))) + goto exitlevel1; + + if ((xc = nng_cv_alloc(&cv, mtx))) + goto exitlevel2; + + ncv->mtx = mtx; + ncv->cv = cv; + + if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio))) + goto exitlevel3; + + SEXP xptr; + PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue)); + R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE); + R_MakeWeakRef(x, xptr, R_NilValue, TRUE); + UNPROTECT(1); + + nng_time time = nng_clock(); + + while (1) { + time = time + 400; + signalled = 1; + nng_mtx_lock(mtx); + while (ncv->condition == 0) { + if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) { + signalled = 0; + break; + } + } + nng_mtx_unlock(mtx); + if (signalled) break; + R_CheckUserInterrupt(); + } + + return; + + exitlevel3: + nng_cv_free(cv); + exitlevel2: + nng_mtx_free(mtx); + exitlevel1: + ERROR_OUT(xc); + +} + SEXP rnng_wait_thread_create(SEXP x) { const SEXPTYPE typ = TYPEOF(x); @@ -335,60 +392,16 @@ SEXP rnng_wait_thread_create(SEXP x) { nano_shared_aio = aiop->aio; nano_wait_condition = 1; nng_cv_wake(nano_wait_cv); - } else if (nano_shared_aio != aiop->aio) { - thread_required = 1; + } else { + thread_required = nano_shared_aio != aiop->aio; } nng_mtx_unlock(nano_wait_mtx); if (thread_required) { PROTECT(coreaio); - nano_thread_aio *taio = R_Calloc(1, nano_thread_aio); - nano_cv *ncv = R_Calloc(1, nano_cv); - taio->aio = aiop->aio; - taio->cv = ncv; - nng_mtx *mtx; - nng_cv *cv; - - if ((xc = nng_mtx_alloc(&mtx))) - ERROR_OUT(xc); - - if ((xc = nng_cv_alloc(&cv, mtx))) { - nng_mtx_free(mtx); - ERROR_OUT(xc); - } - - ncv->mtx = mtx; - ncv->cv = cv; - - if ((xc = nng_thread_create(&taio->thr, rnng_wait_thread_single, taio))) { - nng_cv_free(cv); - nng_mtx_free(mtx); - ERROR_OUT(xc); - } - - SEXP xptr; - PROTECT(xptr = R_MakeExternalPtr(taio, R_NilValue, R_NilValue)); - R_RegisterCFinalizerEx(xptr, thread_aio_finalizer, TRUE); - R_MakeWeakRef(coreaio, xptr, R_NilValue, TRUE); - UNPROTECT(2); - - nng_time time = nng_clock(); - - while (1) { - time = time + 400; - signalled = 1; - nng_mtx_lock(mtx); - while (ncv->condition == 0) { - if (nng_cv_until(cv, time) == NNG_ETIMEDOUT) { - signalled = 0; - break; - } - } - nng_mtx_unlock(mtx); - if (signalled) break; - R_CheckUserInterrupt(); - } + single_wait_thread_create(coreaio); + UNPROTECT(1); } else { From 3981b4e648be38bea14c4a0a9811cca7b00a829c Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sat, 9 Nov 2024 21:06:59 +0000 Subject: [PATCH 08/11] qs2 concept --- DESCRIPTION | 2 ++ NAMESPACE | 1 + R/nanonext-package.R | 1 + R/sendrecv.R | 12 ++++++++++++ man/recv.Rd | 12 ++++++++++++ src/comms.c | 17 +++++++++++++++-- src/core.c | 5 ++++- src/init.c | 5 +++++ src/nanonext.h | 4 ++++ 9 files changed, 56 insertions(+), 3 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 815d70e5f..4f1776374 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -30,6 +30,8 @@ SystemRequirements: 'libnng' >= 1.6 and 'libmbedtls' >= 2.5, or 'cmake' and 'xz' to compile NNG and/or Mbed TLS included in package sources Depends: R (>= 3.6) +Imports: + qs2 Enhances: promises Suggests: diff --git a/NAMESPACE b/NAMESPACE index cfaba60ed..1b30d8ecb 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -108,4 +108,5 @@ export(until_) export(wait) export(wait_) export(write_cert) +importFrom(qs2,qs_serialize) useDynLib(nanonext, .registration = TRUE) diff --git a/R/nanonext-package.R b/R/nanonext-package.R index 9766b8665..e1af1c2f9 100644 --- a/R/nanonext-package.R +++ b/R/nanonext-package.R @@ -91,5 +91,6 @@ #' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID}) #' #' @useDynLib nanonext, .registration = TRUE +#' @importFrom qs2 qs_serialize #' "_PACKAGE" diff --git a/R/sendrecv.R b/R/sendrecv.R index 8ae53a5b3..3214ec82d 100644 --- a/R/sendrecv.R +++ b/R/sendrecv.R @@ -162,6 +162,18 @@ send <- function(con, data, mode = c("serial", "raw"), block = NULL) #' close(req) #' close(rep) #' +#' # using qs2 format (specify mode = 0L) +#' +#' s <- socket(listen = "inproc://qs2") +#' s1 <- socket(dial = "inproc://qs2") +#' q <- list(a = 1, b = "test", c = data.frame()) +#' send(s, q, mode = 0L) +#' r <- recv(s1, mode = 0L) +#' identical(q, r) +#' +#' close(s) +#' close(s1) +#' #' @export #' recv <- function(con, diff --git a/man/recv.Rd b/man/recv.Rd index f350122eb..b2857a8c5 100644 --- a/man/recv.Rd +++ b/man/recv.Rd @@ -99,6 +99,18 @@ recv(ctxp, mode = "double", block = 100) close(req) close(rep) +# using qs2 format (specify mode = 0L) + +s <- socket(listen = "inproc://qs2") +s1 <- socket(dial = "inproc://qs2") +q <- list(a = 1, b = "test", c = data.frame()) +send(s, q, mode = 0L) +r <- recv(s1, mode = 0L) +identical(q, r) + +close(s) +close(s1) + } \seealso{ \code{\link{recv_aio}} for asynchronous receive. diff --git a/src/comms.c b/src/comms.c index e0ecbc830..7f76db137 100644 --- a/src/comms.c +++ b/src/comms.c @@ -309,12 +309,22 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { const int flags = block == R_NilValue ? NNG_DURATION_DEFAULT : TYPEOF(block) == LGLSXP ? 0 : nano_integer(block); nano_buf buf; - int xc; + int xc, mod; const SEXP ptrtag = NANO_TAG(con); if (ptrtag == nano_SocketSymbol) { - nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con)); + mod = nano_encodes(mode); + switch (mod) { + case 2: + nano_encode(&buf, data); + break; + case 0: + buf.buf = (unsigned char *) qs2_serialize(data, &buf.cur, 1, 0, 1); + break; + default: + nano_serialize(&buf, data, NANO_PROT(con)); + } nng_socket *sock = (nng_socket *) NANO_PTR(con); if (flags <= 0) { @@ -421,6 +431,9 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { Rf_error("'con' is not a valid Socket, Context or Stream"); } + if (mod == 3) + free(buf.buf); + if (xc) return mk_error(xc); diff --git a/src/core.c b/src/core.c index 327985e41..9bf52fa18 100644 --- a/src/core.c +++ b/src/core.c @@ -557,9 +557,12 @@ SEXP nano_decode(unsigned char *buf, const size_t sz, const uint8_t mod, SEXP ho case 9: data = rawToChar(buf, sz); return data; + case 0: + data = qs2_deserialize((char *) buf, sz, 0, 1); + return data; default: data = nano_unserialize(buf, sz, hook); - return data; + return data; } memcpy(NANO_DATAPTR(data), buf, sz); diff --git a/src/init.c b/src/init.c index 1c719fff4..efe1bedac 100644 --- a/src/init.c +++ b/src/init.c @@ -20,6 +20,9 @@ void (*eln2)(void (*)(void *), void *, double, int) = NULL; +char *(*qs2_serialize)(SEXP, uint64_t *, const int, const bool, const int); +SEXP (*qs2_deserialize)(const char *, const uint64_t, const bool, const int); + uint8_t special_bit = 0; extern int nano_wait_thread_created; @@ -211,6 +214,8 @@ static const R_ExternalMethodDef externalMethods[] = { void attribute_visible R_init_nanonext(DllInfo* dll) { RegisterSymbols(); PreserveObjects(); + qs2_serialize = (char *(*)(SEXP, uint64_t *, const int, const bool, const int)) R_GetCCallable("qs2", "c_qs_serialize"); + qs2_deserialize = (SEXP (*)(const char *, const uint64_t, const bool, const int)) R_GetCCallable("qs2", "c_qs_deserialize"); R_registerRoutines(dll, NULL, callMethods, NULL, externalMethods); R_useDynamicSymbols(dll, FALSE); R_forceSymbols(dll, TRUE); diff --git a/src/nanonext.h b/src/nanonext.h index 949341e65..11bda94a7 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -228,6 +228,10 @@ typedef struct nano_buf_s { } nano_buf; extern void (*eln2)(void (*)(void *), void *, double, int); + +extern char *(*qs2_serialize)(SEXP, uint64_t *, const int, const bool, const int); +extern SEXP (*qs2_deserialize)(const char *, const uint64_t, const bool, const int); + extern uint8_t special_bit; extern SEXP nano_AioSymbol; From 348c5da956381f33bac879cb26a9c168e76a4528 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sat, 9 Nov 2024 21:25:47 +0000 Subject: [PATCH 09/11] correct free --- src/comms.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/comms.c b/src/comms.c index 7f76db137..e0eb9391f 100644 --- a/src/comms.c +++ b/src/comms.c @@ -357,6 +357,10 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { } + + if (mod == 0) + free(buf.buf); + } else if (ptrtag == nano_ContextSymbol) { nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con)); @@ -431,9 +435,6 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { Rf_error("'con' is not a valid Socket, Context or Stream"); } - if (mod == 3) - free(buf.buf); - if (xc) return mk_error(xc); From 2534d286babdaba020b039c58ce580944e569df0 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sat, 9 Nov 2024 21:30:25 +0000 Subject: [PATCH 10/11] add remote --- DESCRIPTION | 1 + 1 file changed, 1 insertion(+) diff --git a/DESCRIPTION b/DESCRIPTION index 4f1776374..c1c956bbc 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -40,3 +40,4 @@ Suggests: VignetteBuilder: litedown RoxygenNote: 7.3.2 Config/build/compilation-database: true +Remotes: qsbase/qs2 From 31e0d46d82c4170fbc63f2c30b4896932def0b37 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:37:50 +0000 Subject: [PATCH 11/11] update argument types --- src/comms.c | 2 +- src/core.c | 2 +- src/init.c | 8 ++++---- src/nanonext.h | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/comms.c b/src/comms.c index e0eb9391f..3dbc3bc25 100644 --- a/src/comms.c +++ b/src/comms.c @@ -320,7 +320,7 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { nano_encode(&buf, data); break; case 0: - buf.buf = (unsigned char *) qs2_serialize(data, &buf.cur, 1, 0, 1); + buf.buf = qs2_serialize(data, &buf.cur, 1, 0, 1); break; default: nano_serialize(&buf, data, NANO_PROT(con)); diff --git a/src/core.c b/src/core.c index 9bf52fa18..04f428c9c 100644 --- a/src/core.c +++ b/src/core.c @@ -558,7 +558,7 @@ SEXP nano_decode(unsigned char *buf, const size_t sz, const uint8_t mod, SEXP ho data = rawToChar(buf, sz); return data; case 0: - data = qs2_deserialize((char *) buf, sz, 0, 1); + data = qs2_deserialize(buf, sz, 0, 1); return data; default: data = nano_unserialize(buf, sz, hook); diff --git a/src/init.c b/src/init.c index efe1bedac..e2fa5f424 100644 --- a/src/init.c +++ b/src/init.c @@ -20,8 +20,8 @@ void (*eln2)(void (*)(void *), void *, double, int) = NULL; -char *(*qs2_serialize)(SEXP, uint64_t *, const int, const bool, const int); -SEXP (*qs2_deserialize)(const char *, const uint64_t, const bool, const int); +unsigned char *(*qs2_serialize)(SEXP, size_t *, const int, const bool, const int); +SEXP (*qs2_deserialize)(const unsigned char *, const size_t, const bool, const int); uint8_t special_bit = 0; @@ -214,8 +214,8 @@ static const R_ExternalMethodDef externalMethods[] = { void attribute_visible R_init_nanonext(DllInfo* dll) { RegisterSymbols(); PreserveObjects(); - qs2_serialize = (char *(*)(SEXP, uint64_t *, const int, const bool, const int)) R_GetCCallable("qs2", "c_qs_serialize"); - qs2_deserialize = (SEXP (*)(const char *, const uint64_t, const bool, const int)) R_GetCCallable("qs2", "c_qs_deserialize"); + qs2_serialize = (unsigned char *(*)(SEXP, size_t *, const int, const bool, const int)) R_GetCCallable("qs2", "c_qs_serialize"); + qs2_deserialize = (SEXP (*)(const unsigned char *, const size_t, const bool, const int)) R_GetCCallable("qs2", "c_qs_deserialize"); R_registerRoutines(dll, NULL, callMethods, NULL, externalMethods); R_useDynamicSymbols(dll, FALSE); R_forceSymbols(dll, TRUE); diff --git a/src/nanonext.h b/src/nanonext.h index 11bda94a7..1e051930b 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -229,8 +229,8 @@ typedef struct nano_buf_s { extern void (*eln2)(void (*)(void *), void *, double, int); -extern char *(*qs2_serialize)(SEXP, uint64_t *, const int, const bool, const int); -extern SEXP (*qs2_deserialize)(const char *, const uint64_t, const bool, const int); +extern unsigned char *(*qs2_serialize)(SEXP, size_t *, const int, const bool, const int); +extern SEXP (*qs2_deserialize)(const unsigned char *, const size_t, const bool, const int); extern uint8_t special_bit;