Commit: initial

gowerc committed Oct 3, 2024
1 parent 06ca118 commit 85f40b0
Showing 14 changed files with 572 additions and 120 deletions.
5 changes: 3 additions & 2 deletions .lintr
@@ -1,4 +1,5 @@
-linters: with_defaults(
+linters: linters_with_defaults(
     line_length_linter(120),
-    object_name_linter = NULL
+    object_name_linter = NULL,
+    indentation_linter(indent = 4L)
 )
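As an aside, this tracks lintr 3.0.0, which deprecated `with_defaults()` in favour of `linters_with_defaults()`. A minimal sketch of exercising the updated config from the package root:

```
# Lints the whole package using the .lintr file above
# (default linters plus the declared overrides)
lintr::lint_package()
```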
1 change: 1 addition & 0 deletions NAMESPACE
@@ -58,6 +58,7 @@ export(has_class)
 export(impute)
 export(locf)
 export(longDataConstructor)
+export(make_rbmi_cluster)
 export(method_approxbayes)
 export(method_bayes)
 export(method_bmlmi)
107 changes: 100 additions & 7 deletions R/analyse.R
@@ -91,6 +91,58 @@
 #' @param delta A `data.frame` containing the delta transformation to be applied to the imputed
 #'   datasets prior to running `fun`. See details.
 #' @param ... Additional arguments passed onto `fun`.
+#' @param ncores The number of parallel processes to use when running this function. Can also be a
+#'   cluster object created by [`make_rbmi_cluster()`]. See the parallelisation section below.
+#'
+#' @section Parallelisation:
+#' To speed up the evaluation of `analyse()` you can use the `ncores` argument to enable parallelisation.
+#' Simply providing an integer will get rbmi to automatically spawn that many background processes
+#' to parallelise across. If you are using a custom analysis function then you need to ensure
+#' that any libraries or global objects required by your function are available in the
+#' sub-processes. To do this you need to use the [`make_rbmi_cluster()`] function, for example:
+#' ```
+#' my_custom_fun <- function(...) <some analysis code>
+#' cl <- make_rbmi_cluster(
+#'     4,
+#'     objects = list("my_custom_fun" = my_custom_fun),
+#'     packages = c("dplyr", "nlme")
+#' )
+#' analyse(
+#'     imputations = imputeObj,
+#'     fun = my_custom_fun,
+#'     ncores = cl
+#' )
+#' parallel::stopCluster(cl)
+#' ```
+#'
+#' Note that there is significant overhead both with setting up the sub-processes and with
+#' transferring data back-and-forth between the main process and the sub-processes. As such,
+#' parallelisation of the `analyse()` function tends only to be worth it when you have
+#' `> 2000` samples generated by [`draws()`]. Conversely, using parallelisation when your number
+#' of samples is smaller than this may lead to longer run times than just running it sequentially.
+#'
+#' Finally, if you are doing a tipping point analysis you can get a reasonable performance
+#' improvement by re-using the cluster between each call to `analyse()`, e.g.
+#' ```
+#' cl <- make_rbmi_cluster(4)
+#' ana_1 <- analyse(
+#'     imputations = imputeObj,
+#'     delta = delta_plan_1,
+#'     ncores = cl
+#' )
+#' ana_2 <- analyse(
+#'     imputations = imputeObj,
+#'     delta = delta_plan_2,
+#'     ncores = cl
+#' )
+#' ana_3 <- analyse(
+#'     imputations = imputeObj,
+#'     delta = delta_plan_3,
+#'     ncores = cl
+#' )
+#' parallel::stopCluster(cl)
+#' ```
+#'
 #' @examples
 #' \dontrun{
 #' vars <- set_vars(
@@ -119,7 +171,13 @@
 #' )
 #' }
 #' @export
-analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
+analyse <- function(
+    imputations,
+    fun = ancova,
+    delta = NULL,
+    ...,
+    ncores = 1
+) {
 
     validate(imputations)
 
@@ -152,14 +210,49 @@ analyse <- function(imputations, fun = ancova, delta = NULL, ...) {
         )
     }
 
-    results <- lapply(
-        imputations$imputations,
-        function(x, ...) {
-            dat2 <- extract_imputed_df(x, imputations$data, delta)
-            fun(dat2, ...)
+    # Mangle the name to avoid any conflicts with user-defined objects if running
+    # in a cluster
+    ..rbmi..analysis..fun <- fun
+    cl <- make_rbmi_cluster(
+        ncores,
+        objects = list(
+            "imputations" = imputations,
+            "delta" = delta,
+            "..rbmi..analysis..fun" = fun
+        )
+    )
+
+    # If the user provided the cluster object directly then do not close it on completion
+    if (!is(ncores, "cluster")) {
+        on.exit(
+            { if (!is.null(cl)) parallel::stopCluster(cl) },
+            add = TRUE,
+            after = FALSE
+        )
+    }
+
+    # Chunk up requests for a significant speed improvement when running in parallel
+    number_of_cores <- ifelse(is.null(cl), 1, length(cl))
+    indexes <- seq_along(imputations$imputations)
+    indexes_split <- split(indexes, (indexes %% number_of_cores) + 1)
+
+    results <- par_lapply(
+        cl,
+        function(indices, ...) {
+            inner_fun <- function(idx, ...) {
+                dat2 <- extract_imputed_df(imputations$imputations[[idx]], imputations$data, delta)
+                ..rbmi..analysis..fun(dat2, ...)
+            }
+            lapply(indices, inner_fun, ...)
+        },
+        indexes_split,
+        ...
+    ) |>
+        unlist(recursive = FALSE, use.names = FALSE)
+
+    # Reorder to match the original imputation order
+    results <- results[order(unlist(indexes_split, use.names = FALSE))]
+    names(results) <- NULL
 
     fun_name <- deparse(substitute(fun))
     if (length(fun_name) > 1) {
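A note on the chunking above: indices are dealt round-robin across workers, each chunk is processed with an inner `lapply()`, and the flattened results are put back into their original order by the final `order()` call. A standalone sketch of that round-trip (toy values, not rbmi code):

```
# Round-robin split of 10 imputation indices across 3 workers
indexes <- seq_len(10)
number_of_cores <- 3
indexes_split <- split(indexes, (indexes %% number_of_cores) + 1)
# -> $`1`: 3 6 9    $`2`: 1 4 7 10    $`3`: 2 5 8

# Process each chunk, then flatten one level (as analyse() does)
results <- lapply(indexes_split, function(chunk) lapply(chunk, sqrt))
flat <- unlist(results, recursive = FALSE, use.names = FALSE)

# Restore the original ordering and check the round-trip
flat <- flat[order(unlist(indexes_split, use.names = FALSE))]
stopifnot(identical(flat, lapply(indexes, sqrt)))
```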
34 changes: 23 additions & 11 deletions R/draws.R
@@ -26,7 +26,8 @@
 #'   [method_approxbayes()], [method_condmean()] or [method_bmlmi()].
 #'   It specifies the multiple imputation methodology to be used. See details.
 #' @param ncores A single numeric specifying the number of cores to use in creating the draws object.
-#'   Note that this parameter is ignored for [method_bayes()] (Default = 1).
+#'   Note that this parameter is ignored for [method_bayes()] (Default = 1). Can also be a cluster
+#'   object generated by [`make_rbmi_cluster()`].
 #' @param quiet Logical, if `TRUE` will suppress printing of progress information that is printed to
 #'   the console.

@@ -342,27 +343,40 @@ get_draws_mle <- function(
     }


-    cl <- get_cluster(ncores)
-    mmrm_sample <- encap_get_mmrm_sample(cl, longdata, method)
+    cl <- make_rbmi_cluster(ncores, objects = list("longdata" = longdata, "method" = method))
+
+    # If the user provided the cluster object directly then do not close it on completion
+    if (!is(ncores, "cluster")) {
+        on.exit(
+            { if (!is.null(cl)) parallel::stopCluster(cl) },
+            add = TRUE,
+            after = FALSE
+        )
+    }
+
+    # Encapsulate arguments into a single function on `ids` and handle parallelisation
+    par_get_mmrm_sample <- function(ids) {
+        par_lapply(
+            cl,
+            get_mmrm_sample,
+            ids,
+            longdata = longdata,
+            method = method
+        )
+    }
 
     samples <- list()
     n_failed_samples <- 0
     logger <- progressLogger$new(n_target_samples, quiet = quiet)
 
     while (length(samples) < n_target_samples) {
         ids <- sample_stack$pop(min(ncores, n_target_samples - length(samples)))
-        new_samples <- mmrm_sample(ids)
+        new_samples <- par_get_mmrm_sample(ids)
         isfailure <- vapply(new_samples, function(x) x$failed, logical(1))
         new_samples_keep <- new_samples[!isfailure]
         n_failed_samples <- n_failed_samples + sum(isfailure)
         if (n_failed_samples > failure_limit) {
-            if (!is.null(cl)) parallel::stopCluster(cl)
             if (!is.null(method$type)) {
                 if (method$type == "jackknife") {
                     ids_fail <- ids[isfailure][[1]]
@@ -378,8 +392,6 @@
         samples <- append(samples, new_samples_keep)
     }
 
-    if (!is.null(cl)) parallel::stopCluster(cl)
-
     assert_that(
         length(samples) == n_target_samples,
         msg = "Incorrect number of samples were produced"
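Both `analyse()` and `get_draws_mle()` now follow the same ownership rule: a cluster is only stopped by the function that created it, so a user-supplied cluster survives the call and can be reused. A minimal sketch of that convention in isolation (`with_owned_cluster` is a hypothetical helper, not rbmi code; `inherits()` is the base-R equivalent of the `is(ncores, "cluster")` check used above):

```
with_owned_cluster <- function(ncores, action) {
    cl <- make_rbmi_cluster(ncores)
    # Only tear down the cluster if this call created it; a user-supplied
    # cluster is the caller's responsibility to stop
    if (!inherits(ncores, "cluster")) {
        on.exit(
            if (!is.null(cl)) parallel::stopCluster(cl),
            add = TRUE,
            after = FALSE
        )
    }
    action(cl)
}
```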
140 changes: 87 additions & 53 deletions R/parallel.R
@@ -1,41 +1,102 @@

-#' Create cluster
+#' Create an rbmi ready cluster
 #'
-#' @param ncores Number of parallel processes to use
+#' @param ncores Number of parallel processes to use or an existing cluster to make use of
+#' @param objects A named list of objects to export into the sub-processes
+#' @param packages A character vector of libraries to load in the sub-processes
 #'
-#' If `ncores` is `1` this function will return NULL
-#' This function spawns a PSOCK cluster.
-#' Ensures that `rbmi` and `assert_that` have been loaded
-#' on the sub-processes
+#' This function is a wrapper around `parallel::makePSOCKcluster()` but takes
+#' care of configuring rbmi to be used in the sub-processes as well as loading
+#' user-defined objects and libraries and setting the seed for reproducibility.
 #'
-get_cluster <- function(ncores = 1) {
-    if (ncores == 1) {
+#' If `ncores` is `1` this function will return `NULL`.
+#'
+#' If `ncores` is a cluster created via `parallel::makeCluster()` then this function
+#' just takes care of inserting the relevant rbmi objects into the existing cluster.
+#'
+#' @examples
+#' \dontrun{
+#' # Basic usage
+#' make_rbmi_cluster(5)
+#'
+#' # User objects + libraries
+#' VALUE <- 5
+#' myfun <- function(x) {
+#'     x + day(VALUE) # From lubridate::day()
+#' }
+#' make_rbmi_cluster(5, list(VALUE = VALUE, myfun = myfun), c("lubridate"))
+#'
+#' # Using an already created cluster
+#' cl <- parallel::makeCluster(5)
+#' make_rbmi_cluster(cl)
+#' }
+#' @export
+make_rbmi_cluster <- function(ncores = 1, objects = NULL, packages = NULL) {
+
+    if (is.numeric(ncores) && ncores == 1) {
         return(NULL)
+    } else if (is.numeric(ncores)) {
+        cl <- parallel::makePSOCKcluster(ncores)
+    } else if (is(ncores, "cluster")) {
+        cl <- ncores
+    } else {
+        stop(sprintf(
+            "`ncores` has unsupported class of: %s",
+            paste(class(ncores), collapse = ", ")
+        ))
     }
 
-    cl <- parallel::makePSOCKcluster(
-        ncores
-    )
+    # Load user-defined objects into the global namespace
+    if (!is.null(objects) && length(objects)) {
+        export_env <- list2env(objects)
+        parallel::clusterExport(cl, names(objects), export_env)
+    }
 
-    devnull <- parallel::clusterEvalQ(cl, {
-        library(assertthat)
-    })
+    # Load user-defined packages
+    packages <- if (is.null(packages)) {
+        # TODO - can't remember why this is needed; need to look into it
+        "assertthat"
+    } else {
+        c(packages, "assertthat")
+    }
+    # Remove attempts to load rbmi as this will be covered later
+    packages <- grep("^rbmi$", packages, value = TRUE, invert = TRUE)
+    devnull <- parallel::clusterCall(
+        cl,
+        function(pkgs) lapply(pkgs, function(x) library(x, character.only = TRUE)),
+        as.list(packages)
+    )
+
+    # Ensure reproducibility
+    parallel::clusterSetRNGStream(cl, sample.int(1))
+
+    # If the user has previously configured rbmi sub-processes then early exit
+    exported_rbmi <- unlist(parallel::clusterEvalQ(cl, exists("..exported..parallel..rbmi")))
+    if (all(exported_rbmi)) {
+        return(cl)
+    }
+
+    # Ensure that exported and unexported objects are all directly accessible
+    # from the globalenv in the sub-processes
+    if (is_in_rbmi_development()) {
+        devnull <- parallel::clusterEvalQ(cl, pkgload::load_all())
+    } else {
+        devnull <- parallel::clusterEvalQ(
+            cl,
+            {
+                # Here we "export" both exported and non-exported functions
+                # from the package to the global environment of our sub-processes
+                .namespace <- getNamespace("rbmi")
+                for (.nsfun in ls(.namespace)) {
+                    assign(.nsfun, get(.nsfun, envir = .namespace))
+                }
+            }
+        )
+    }
+
+    # Set a variable to signify that rbmi has been configured
+    devnull <- parallel::clusterEvalQ(cl, {
+        ..exported..parallel..rbmi <- TRUE
+    })
 
     return(cl)
 }
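The namespace loop above is the interesting part: it makes internal (non-exported) rbmi functions directly callable on the workers. A generalised sketch of the same idea (`export_namespace_to_cluster` is a hypothetical helper, not part of rbmi; it assumes the package is installed on each worker):

```
export_namespace_to_cluster <- function(cl, pkg) {
    parallel::clusterCall(cl, function(p) {
        ns <- getNamespace(p)
        # Copy every object, exported or not, into the worker's global env
        for (nm in ls(ns)) {
            assign(nm, get(nm, envir = ns), envir = globalenv())
        }
        NULL
    }, pkg)
    invisible(cl)
}
```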

@@ -65,46 +126,19 @@ is_in_rbmi_development <- function() {



-#' Encapsulate get_mmrm_sample
-#'
-#' Function creates a new wrapper function around [get_mmrm_sample()]
-#' so that the arguments of [get_mmrm_sample()] are enclosed within
-#' the new function. This makes running parallel and single process
-#' calls to the function smoother. In particular this function takes care
-#' of exporting the arguments if required to parallel process in a cluster
-#'
-#' @seealso [get_cluster()] for more documentation on the function inputs
+#' Parallelise Lapply
 #'
-#' @param cl Either a cluster from [get_cluster()] or `NULL`
-#' @param longdata A longdata object from `longDataConstructor$new()`
-#' @param method A method object
-encap_get_mmrm_sample <- function(cl, longdata, method) {
-    fun <- function(ids) {
-        get_mmrm_sample(
-            ids = ids,
-            longdata = longdata,
-            method = method
-        )
-    }
-    lfun <- function(ids) {
-        lapply(ids, fun)
-    }
-
+#' Simple wrapper around `lapply` and [`parallel::clusterApplyLB`] to abstract away
+#' the logic of deciding which one to use
+#' @param cl Cluster created by [`parallel::makeCluster()`] or `NULL`
+#' @param fun Function to be run
+#' @param x Object to be looped over
+#' @param ... Extra arguments passed to `fun`
+par_lapply <- function(cl, fun, x, ...) {
     if (is.null(cl)) {
-        return(lfun)
-    }
-
-    parallel::clusterExport(
-        cl = cl,
-        varlist = c("longdata", "method"),
-        envir = environment()
-    )
-
-    lfun <- function(ids) {
-        parallel::clusterApplyLB(cl, ids, fun)
+        return(lapply(x, fun, ...))
+    } else {
+        return(parallel::clusterApplyLB(cl, x, fun, ...))
     }
-
-    return(lfun)
 }
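To make the dispatch concrete, a hypothetical usage sketch (the `slow_square` helper is illustrative, not rbmi code):

```
slow_square <- function(x, delay = 0) {
    Sys.sleep(delay)
    x^2
}

par_lapply(NULL, slow_square, 1:4)  # cl = NULL: falls back to lapply()

cl <- parallel::makeCluster(2)
par_lapply(cl, slow_square, 1:4)    # load-balanced across the two workers
parallel::stopCluster(cl)
```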

