Skip to content

Commit

Permalink
Parallelization of the projection (#235)
Browse files Browse the repository at this point in the history
* Parallelize the projection (i.e., parallelize across the projected draws). A

related discussion may be found in issue #77.

* Introduce global option `projpred.nprjdraws_parallel`.

* Extend docs for parallelization.

* Improve the documentation for global option `projpred.nprjdraws_parallel` (and

adapt the code correspondingly).

* Increase the default for option `projpred.nprjdraws_parallel` from

100 to 200 to ensure a speed improvement even for `devtools::load_all()` instead
of `library(projpred)` (for `devtools::load_all()`, parallelization seems to be
a bit slower).

* Iterate over the objects in the `foreach()` loop themselves instead of their index. This should

avoid the export of those (large) objects.

* Use the `.export` argument in the `foreach()` call to handle exports manually. This could

avoid the unnecessary export of (large) objects which are unused.

* Use the `.noexport` argument in the `foreach()` call to handle exports manually. This could

avoid the unnecessary export of (large) objects which are unused.

* Check the most important (largest) objects which should not be exported.

* Set `.noexport` manually.

* Explain why we have the global option `projpred.nprjdraws_parallel`.

* Set `.export` manually (necessary for the `doFuture` package with `options(doFuture.foreach.export = ".export")`).

* Make the parallelization of the projection an optional feature.

* Add comments concerning the parallelization.

* Rename option `projpred.nprjdraws_parallel` to `projpred.prll_prj_trigger`.

* Throw errors if packages **foreach** and **iterators** are not installed.

* Add package **parallel** to `Suggests:`.

* Add package **doFuture** to `Suggests:` (only needed for the parallel tests, though, which are not run on CRAN by default).

* Add packages **future** and **future.callr** to `Suggests:`.

* `bootstrap()`: Remove argument `oobfun` which is not used within **projpred** (and makes future modifications of `bootstrap()` more complicated).

* `bootstrap()`: Replace `seq.int()` by `seq_len()`, as recommended in `?seq.int`.

* `bootstrap()`: Improve a comment.

* Parallelize `bootstrap()`.

* Extend the docs for the parallelized projection (in ``?`projpred-package` ``).

* Test the `"noclust"` setting (for projecting from a reference model) more often.

* Avoid a failing "`offsetnew` works" test due to numerical inaccuracies for extreme values.

* Add test "PRNG is not taking place where not expected".

* Simplify the test "PRNG is not taking place where not expected".

* Rename test
"PRNG is not taking place where not expected"
to
"non-clustered projection does not require a seed".

* Add tests for parallelization.

* Move the code for the sequential `bootstrap()` run from `setup.R` to `test_parallel.R`.

* Revert the parallelization of the bootstrap. The first reason is

that in order for the parallel test results to match the sequential ones
exactly, we would have to use doRNG::`%dorng%`() also in the sequential case of
`bootstrap()` and then, for the sequential results in the tests, not register
any **foreach** backend at all or use `registerDoSEQ()`. The second reason is
that, because of the parallelization overhead, the parallelized bootstrap didn't
provide a speed improvement, at least not up to the number of bootstrap samples
used by default (2000).

* Even if not on CRAN, `R CMD check` imposes a limit of 2 cores (at least by the defaults used in RStudio).
  • Loading branch information
fweber144 authored Oct 12, 2021
1 parent 58581eb commit ef3cfc2
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 37 deletions.
9 changes: 8 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ Suggests:
glmnet,
bayesplot (>= 1.5.0),
optimx,
posterior
posterior,
parallel,
foreach,
iterators,
doParallel,
future,
future.callr,
doFuture
LinkingTo: Rcpp, RcppArmadillo
LazyData: TRUE
Roxygen: list(markdown = TRUE)
Expand Down
60 changes: 51 additions & 9 deletions R/divergence_minimizers.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Divergence minimizers ---------------------------------------------------

# Needed to avoid a NOTE in `R CMD check`:
if (getRversion() >= package_version("2.15.1")) {
utils::globalVariables("formula_s")
utils::globalVariables("projpred_var_s")
utils::globalVariables("projpred_formula_no_random_s")
}

divmin <- function(formula, projpred_var, ...) {
trms_all <- extract_terms_response(formula)
has_grp <- length(trms_all$group_terms) > 0
Expand Down Expand Up @@ -29,15 +36,50 @@ divmin <- function(formula, projpred_var, ...) {
)
}

lapply(seq_along(formulas), function(s) {
sdivmin(
formula = formulas[[s]],
projpred_var = projpred_var[, s, drop = FALSE],
projpred_formula_no_random = projpred_formulas_no_random[[s]],
projpred_random = projpred_random,
...
)
})
if (length(formulas) < getOption("projpred.prll_prj_trigger", Inf)) {
# Sequential case. Actually, we could simply use ``%do_projpred%` <-
# foreach::`%do%`` here and then proceed as in the parallel case, but that
# would require adding more "hard" dependencies (because packages 'foreach'
# and 'iterators' would have to be moved from `Suggests:` to `Imports:`).
return(lapply(seq_along(formulas), function(s) {
sdivmin(
formula = formulas[[s]],
projpred_var = projpred_var[, s, drop = FALSE],
projpred_formula_no_random = projpred_formulas_no_random[[s]],
projpred_random = projpred_random,
...
)
}))
} else {
# Parallel case.
if (!requireNamespace("foreach", quietly = TRUE)) {
stop("Please install the 'foreach' package.")
}
if (!requireNamespace("iterators", quietly = TRUE)) {
stop("Please install the 'iterators' package.")
}
dot_args <- list(...)
`%do_projpred%` <- foreach::`%dopar%`
return(foreach::foreach(
formula_s = formulas,
projpred_var_s = iterators::iter(projpred_var, by = "column"),
projpred_formula_no_random_s = projpred_formulas_no_random,
.export = c("sdivmin", "projpred_random", "dot_args"),
.noexport = c(
"object", "p_sel", "p_pred", "search_path", "p_ref", "refmodel",
"formulas", "projpred_var", "projpred_formulas_no_random"
)
) %do_projpred% {
do.call(
sdivmin,
c(list(formula = formula_s,
projpred_var = projpred_var_s,
projpred_formula_no_random = projpred_formula_no_random_s,
projpred_random = projpred_random),
dot_args)
)
})
}
}

fit_glm_ridge_callback <- function(formula, data, projpred_var = 0,
Expand Down
25 changes: 6 additions & 19 deletions R/misc.R
Original file line number Diff line number Diff line change
Expand Up @@ -53,38 +53,25 @@ auc <- function(x) {
return(sum(delta_fpr * tpr) + sum(delta_fpr * delta_tpr) / 2)
}

bootstrap <- function(x, fun = mean, b = 1000, oobfun = NULL, seed = NULL,
...) {
#
# bootstrap an arbitrary quantity fun that takes the sample x
# as the first input. other parameters to fun can be passed in as ...
# example: boostrap(x,mean)
#

# Bootstrap an arbitrary quantity `fun` that takes the sample `x` as the first
# input. Other arguments of `fun` can be passed by `...`. Example:
# `boostrap(x, mean)`.
bootstrap <- function(x, fun = mean, b = 1000, seed = NULL, ...) {
# set random seed but ensure the old RNG state is restored on exit
if (exists(".Random.seed")) {
rng_state_old <- .Random.seed
on.exit(assign(".Random.seed", rng_state_old, envir = .GlobalEnv))
}
set.seed(seed)

seq_x <- seq.int(NROW(x))
seq_x <- seq_len(NROW(x))
is_vector <- NCOL(x) == 1
bsstat <- rep(NA, b)
oobstat <- rep(NA, b)
for (i in 1:b) {
bsind <- sample(seq_x, replace = TRUE)
bsstat[i] <- fun(if (is_vector) x[bsind] else x[bsind, ], ...)
if (!is.null(oobfun)) {
oobind <- setdiff(seq_x, unique(bsind))
oobstat[i] <- oobfun(if (is_vector) x[oobind] else x[oobind, ], ...)
}
}
if (!is.null(oobfun)) {
return(list(bs = bsstat, oob = oobstat))
} else {
return(bsstat)
}
return(bsstat)
}

.is.wholenumber <- function(x) {
Expand Down
20 changes: 20 additions & 0 deletions R/projpred-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@
#' [brms::get_refmodel.brmsfit()]---also [brms::bernoulli()]), as well as
#' [poisson()].
#'
#' The projection of the reference model onto a submodel can be run on multiple
#' CPU cores in parallel (across the projected draws). This is powered by the
#' \pkg{foreach} package. Thus, you can use any parallel (or sequential) backend
#' compatible with \pkg{foreach}, e.g., the backends from packages
#' \pkg{doParallel}, \pkg{doMPI}, or \pkg{doFuture}. Using the global option
#' `projpred.prll_prj_trigger`, you can modify the number of projected draws
#' below which no parallelization is used (even if a parallel backend is
#' registered). Such a "trigger" threshold exists because of the computational
#' overhead of a parallelization which makes parallelization only useful for a
#' sufficiently large number of projected draws. By default, parallelization is
#' turned off, which can also be achieved by supplying `Inf` (or `NULL`) to
#' option `projpred.prll_prj_trigger`. Note that we cannot recommend
#' parallelizing the projection on Windows because in our experience, the
#' parallelization overhead is larger there, causing a parallel run to take
#' longer than a sequential run. Also note that the parallelization works well
#' for GLMs, but for GLMMs, GAMs, and GAMMs, the fitted model objects are quite
#' big, which---when running in parallel---may lead to an excessive memory usage
#' which in turn may crash the R session. Thus, we currently cannot recommend
#' the parallelization for GLMMs, GAMs, and GAMMs.
#'
#' See the vignettes
#' (\href{https://mc-stan.org/projpred/articles/quickstart.html}{quickstart-vignette}
#' and
Expand Down
20 changes: 20 additions & 0 deletions man/projpred-package.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 77 additions & 8 deletions tests/testthat/setup.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,68 @@ run_snaps <- identical(Sys.getenv("NOT_CRAN"), "true") &&
if (run_snaps) {
testthat_ed_max2 <- edition_get() <= 2
}
# Run parallel tests (see notes below)?:
run_prll <- identical(Sys.getenv("NOT_CRAN"), "true") &&
!identical(.Platform$OS.type, "windows")
if (run_prll) {
# Notes:
# * Currently, parallelization only works reliably for GLMs (because of
# memory issues for more complex models like GLMMs, GAMs and GAMMs).
# Therefore, we only test GLMs here.
# * Currently, parallelization on Windows takes longer than running
# sequentially. This makes parallelization impractical on Windows, so we
# don't run the tests on Windows by default.

ncores <- parallel::detectCores(logical = FALSE)
if (ncores == 1) {
warning("Deactivating the parallel tests because only a single worker ",
"could be detected.")
run_prll <- FALSE
}
# Do not run on more than 2 cores if requested so:
if (identical(Sys.getenv("_R_CHECK_LIMIT_CORES_"), "TRUE")) {
ncores <- min(ncores, 2L)
}
# Use the 'doParallel' package on all platforms except Windows. For Windows,
# the 'doFuture' package provides a faster alternative via the 'future.callr'
# package (which is still slower than a sequential run, though):
if (!identical(.Platform$OS.type, "windows")) {
if (!requireNamespace("doParallel", quietly = TRUE)) {
stop("Package \"doParallel\" is needed for these tests. Please ",
"install it.",
call. = FALSE)
}
dopar_backend <- "doParallel"
} else {
# This case (which should not be possible by default) is only included
# here to demonstrate how parallelization should be used on Windows (but
# currently, this makes no sense, as explained above).
if (!requireNamespace("doFuture", quietly = TRUE)) {
stop("Package \"doFuture\" is needed for these tests. Please ",
"install it.",
call. = FALSE)
}
dopar_backend <- "doFuture"
if (identical(.Platform$OS.type, "windows")) {
### Not used in this case because the 'future.callr' package provides a
### faster alternative on Windows (which is still slower than a sequential
### run, though):
# future_plan <- "multisession"
###
if (!requireNamespace("future.callr", quietly = TRUE)) {
stop("Package \"future.callr\" is needed for these tests. Please ",
"install it.",
call. = FALSE)
}
future_plan <- "callr"
} else {
# This case (which should not be possible by default) is only included
# here to demonstrate how other systems should be used with the 'doFuture'
# package.
future_plan <- "multicore"
}
}
}

source(testthat::test_path("helpers", "unlist_cust.R"), local = TRUE)
source(testthat::test_path("helpers", "testers.R"), local = TRUE)
Expand Down Expand Up @@ -664,7 +726,7 @@ if (run_cvvs) {
args_cvvs <- unlist_cust(args_cvvs)

# Use suppressWarnings() because of occasional warnings concerning Pareto k
# diagnostics: Additionally to suppressWarnings(), suppressMessages() could be
# diagnostics. Additionally to suppressWarnings(), suppressMessages() could be
# used here (because of the refits in K-fold CV):
cvvss <- suppressWarnings(lapply(args_cvvs, function(args_cvvs_i) {
do.call(cv_varsel, c(
Expand All @@ -684,6 +746,7 @@ tstsetups_prj_ref <- setNames(
value = TRUE, invert = TRUE)
)
args_prj <- lapply(tstsetups_prj_ref, function(tstsetup_ref) {
pkg_crr <- args_ref[[tstsetup_ref]]$pkg_nm
mod_crr <- args_ref[[tstsetup_ref]]$mod_nm
fam_crr <- args_ref[[tstsetup_ref]]$fam_nm
solterms <- nlist(empty = character(), solterms_x)
Expand All @@ -708,15 +771,21 @@ args_prj <- lapply(tstsetups_prj_ref, function(tstsetup_ref) {
solterms <- nlist(solterms_spcl)
}
lapply(setNames(nm = names(solterms)), function(solterms_nm_i) {
if (mod_crr == "glm" && fam_crr == "gauss" &&
solterms_nm_i == "solterms_x") {
if (pkg_crr == "rstanarm" && mod_crr == "glm" &&
fam_crr == "gauss" && solterms_nm_i == "solterms_x") {
ndr_ncl_pred <- ndr_ncl_pred_tst
} else if ((mod_crr == "glm" && fam_crr == "gauss" &&
solterms_nm_i == "empty") ||
(mod_crr == "glmm" && fam_crr == "binom")) {
ndr_ncl_pred <- ndr_ncl_pred_tst[c("clust", "clust1")]
} else if (pkg_crr == "rstanarm" && mod_crr == "glm" &&
fam_crr == "gauss" && solterms_nm_i == "empty") {
ndr_ncl_pred <- ndr_ncl_pred_tst[c("noclust", "clust", "clust1")]
} else if ((pkg_crr == "rstanarm" && mod_crr == "glmm" &&
fam_crr == "brnll" && solterms_nm_i == "solterms_xz") ||
(pkg_crr == "rstanarm" && mod_crr == "gam" &&
fam_crr == "binom" && solterms_nm_i == "solterms_xs") ||
(pkg_crr == "rstanarm" && mod_crr == "gamm" &&
fam_crr == "brnll" && solterms_nm_i == "solterms_xsz")) {
ndr_ncl_pred <- ndr_ncl_pred_tst[c("noclust", "clust")]
} else {
ndr_ncl_pred <- ndr_ncl_pred_tst["clust"]
ndr_ncl_pred <- ndr_ncl_pred_tst[c("clust")]
}
lapply(ndr_ncl_pred, function(ndr_ncl_pred_i) {
return(c(
Expand Down
92 changes: 92 additions & 0 deletions tests/testthat/test_parallel.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
context("parallel")

# Setup -------------------------------------------------------------------

if (run_prll) {
trigger_default <- options(projpred.prll_prj_trigger = 0L)

if (dopar_backend == "doParallel") {
doParallel::registerDoParallel(ncores)
} else if (dopar_backend == "doFuture") {
doFuture::registerDoFuture()
export_default <- options(doFuture.foreach.export = ".export")
if (future_plan == "callr") {
future::plan(future.callr::callr, workers = ncores)
} else if (future_plan == "multisession") {
future::plan(future::multisession, workers = ncores)
} else if (future_plan == "multicore") {
future::plan(future::multicore, workers = ncores)
} else {
stop("Unrecognized `future_plan`.")
}
} else {
stop("Unrecognized `dopar_backend`.")
}
stopifnot(identical(foreach::getDoParWorkers(), ncores))
}

# project() ---------------------------------------------------------------

test_that("project() in parallel gives the same results as sequentially", {
skip_if_not(run_prll)
tstsetups <- grep("\\.glm\\.", names(prjs), value = TRUE)
for (tstsetup in tstsetups) {
args_prj_i <- args_prj[[tstsetup]]
p_repr <- do.call(project, c(
list(object = refmods[[args_prj_i$tstsetup_ref]]),
excl_nonargs(args_prj_i)
))
expect_equal(p_repr, prjs[[tstsetup]], info = tstsetup)
}
})

# varsel() ----------------------------------------------------------------

test_that("varsel() in parallel gives the same results as sequentially", {
skip_if_not(run_prll)
skip_if_not(run_vs)
tstsetups <- grep("\\.glm\\.", names(vss), value = TRUE)
for (tstsetup in tstsetups) {
args_vs_i <- args_vs[[tstsetup]]
vs_repr <- do.call(varsel, c(
list(object = refmods[[args_vs_i$tstsetup_ref]]),
excl_nonargs(args_vs_i)
))
expect_equal(vs_repr, vss[[tstsetup]], info = tstsetup)
}
})

# cv_varsel() -------------------------------------------------------------

test_that("cv_varsel() in parallel gives the same results as sequentially", {
skip_if_not(run_prll)
skip_if_not(run_cvvs)
tstsetups <- grep("\\.glm\\.", names(cvvss), value = TRUE)
for (tstsetup in tstsetups) {
args_cvvs_i <- args_cvvs[[tstsetup]]
# Use suppressWarnings() because of occasional warnings concerning Pareto k
# diagnostics:
cvvs_repr <- suppressWarnings(do.call(cv_varsel, c(
list(object = refmods[[args_cvvs_i$tstsetup_ref]]),
excl_nonargs(args_cvvs_i)
)))
expect_equal(cvvs_repr, cvvss[[tstsetup]], info = tstsetup)
}
})

# Teardown ----------------------------------------------------------------

if (run_prll) {
if (dopar_backend == "doParallel") {
doParallel::stopImplicitCluster()
} else if (dopar_backend == "doFuture") {
future::plan(future::sequential)
options(doFuture.foreach.export = export_default$doFuture.foreach.export)
rm(export_default)
} else {
stop("Unrecognized `dopar_backend`.")
}

options(projpred.prll_prj_trigger = trigger_default$projpred.prll_prj_trigger)
rm(trigger_default)
}
Loading

0 comments on commit ef3cfc2

Please sign in to comment.