diff --git a/.Rbuildignore b/.Rbuildignore index 8c1f77aa..6eebfb9c7 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -12,3 +12,5 @@ ^\.github$ ^vignettes/articles-online-only$ ^release-prep\.R$ +^doc$ +^Meta$ diff --git a/.gitignore b/.gitignore index db3f1c32..9f0bb3a7 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ inst/doc dev-helpers.R release-prep.R +/doc/ +/Meta/ diff --git a/NAMESPACE b/NAMESPACE index c8a6217d..88f5380f 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -26,6 +26,7 @@ export(read_cmdstan_csv) export(read_sample_csv) export(rebuild_cmdstan) export(register_knitr_engine) +export(remaining_columns_to_read) export(set_cmdstan_path) export(set_num_threads) export(write_stan_file) diff --git a/R/args.R b/R/args.R index d6851852..436b8df1 100644 --- a/R/args.R +++ b/R/args.R @@ -1446,7 +1446,7 @@ validate_seed <- function(seed, num_procs) { #' @return An integer vector of length `num_procs`. maybe_generate_seed <- function(seed, num_procs) { if (is.null(seed)) { - seed <- base::sample(.Machine$integer.max, 1) + seed <- base::sample(.Machine$integer.max, num_procs) } else if (length(seed) == 1 && num_procs > 1) { seed <- rep(as.integer(seed), num_procs) } else if (length(seed) != num_procs) { diff --git a/R/cmdstanr-package.R b/R/cmdstanr-package.R index 305241bf..d1f34bf1 100644 --- a/R/cmdstanr-package.R +++ b/R/cmdstanr-package.R @@ -30,6 +30,6 @@ #' @inherit cmdstan_model examples #' @import R6 #' -NULL +"_PACKAGE" if (getRversion() >= "2.15.1") utils::globalVariables(c("self", "private", "super")) diff --git a/R/csv.R b/R/csv.R index c0d85b43..e2087976 100644 --- a/R/csv.R +++ b/R/csv.R @@ -927,6 +927,7 @@ unrepair_variable_names <- function(names) { names } +#' @export remaining_columns_to_read <- function(requested, currently_read, all) { if (is.null(requested)) { if (is.null(all)) { diff --git a/R/fit.R b/R/fit.R index 4ca6753b..4ffe3bfa 100644 --- a/R/fit.R +++ b/R/fit.R @@ -1347,11 +1347,7 @@ CmdStanMCMC <- R6::R6Class( }, # override the CmdStanFit output method output = function(id = NULL) { - if (is.null(id)) { self$runset$procs$proc_output() - } else { - cat(paste(self$runset$procs$proc_output(id), collapse = "\n")) - } }, # override the CmdStanFit draws method diff --git a/R/model.R b/R/model.R index 2dee3672..3644910b 100644 --- a/R/model.R +++ b/R/model.R @@ -63,7 +63,7 @@ #' data = stan_data, #' seed = 123, #' chains = 2, -#' parallel_chains = 2 +#' threads = 2 #' ) #' #' # Use 'posterior' package for summaries @@ -341,6 +341,9 @@ CmdStanModel <- R6::R6Class( "- ", new_hpp_loc) private$hpp_file_ <- new_hpp_loc invisible(private$hpp_file_) + }, + threads_enabled = function() { + return(as.logical(private$cpp_options_[["STAN_THREADS"]])) } ) ) @@ -414,7 +417,6 @@ CmdStanModel <- R6::R6Class( #' [`$expose_functions()`][model-method-expose_functions] method. #' @param dry_run (logical) If `TRUE`, the code will do all checks before compilation, #' but skip the actual C++ compilation. Used to speedup tests. -#' #' @param threads Deprecated and will be removed in a future release. Please #' turn on threading via `cpp_options = list(stan_threads = TRUE)` instead. #' @@ -461,7 +463,7 @@ compile <- function(quiet = TRUE, pedantic = FALSE, include_paths = NULL, user_header = NULL, - cpp_options = list(), + cpp_options = list(stan_threads = os_use_single_process()), stanc_options = list(), force_recompile = getOption("cmdstanr_force_recompile", default = FALSE), compile_model_methods = FALSE, @@ -1218,9 +1220,19 @@ sample <- function(data = NULL, if (fixed_param) { save_warmup <- FALSE } + if (self$threads_enabled()) { + num_procs = 1 + parallel_procs = 1 + threads_per_proc = threads + } else { + num_procs = chains + parallel_procs = chains + threads_per_proc = as.integer(threads / chains) + } procs <- CmdStanMCMCProcs$new( - num_procs = 1, - parallel_procs = 1, + num_procs = num_procs, + parallel_procs = parallel_procs, + threads_per_proc = assert_valid_threads(threads_per_proc, self$cpp_options(), multiple_chains = TRUE), show_stderr_messages = show_exceptions, show_stdout_messages = show_messages ) @@ -1266,7 +1278,7 @@ sample <- function(data = NULL, sig_figs = sig_figs, opencl_ids = assert_valid_opencl(opencl_ids, self$cpp_options()), model_variables = model_variables, - threads = threads + threads = threads_per_proc ) runset <- CmdStanRun$new(args, procs) runset$run_cmdstan() @@ -1382,9 +1394,19 @@ sample_mpi <- function(data = NULL, chains <- 1 save_warmup <- FALSE } + if (self$threads_enabled()) { + num_procs = 1 + parallel_procs = 1 + threads_per_proc = threads + } else { + num_procs = chains + parallel_procs = chains + threads_per_proc = as.integer(threads / num_chains) + } procs <- CmdStanMCMCProcs$new( - num_procs = 1, - parallel_procs = 1, + num_procs = num_procs, + parallel_procs = parallel_procs, + threads_per_proc = assert_valid_threads(threads_per_proc, self$cpp_options(), multiple_chains = TRUE), show_stderr_messages = show_exceptions, show_stdout_messages = show_messages ) @@ -1455,10 +1477,6 @@ CmdStanModel$set("public", name = "sample_mpi", value = sample_mpi) #' metadata of an example model, e.g., #' `cmdstanr_example(method="optimize")$metadata()`. #' @template model-common-args -#' @param threads (positive integer) If the model was -#' [compiled][model-method-compile] with threading support, the number of -#' threads to use in parallelized sections (e.g., when -#' using the Stan functions `reduce_sum()` or `map_rect()`). #' @param iter (positive integer) The maximum number of iterations. #' @param algorithm (string) The optimization algorithm. One of `"lbfgs"`, #' `"bfgs"`, or `"newton"`. The control parameters below are only available @@ -1509,8 +1527,12 @@ optimize <- function(data = NULL, history_size = NULL, show_messages = TRUE, show_exceptions = TRUE) { + if (self$threads_enabled() && is.null(threads)) { + threads <- 1 + } procs <- CmdStanProcs$new( num_procs = 1, + threads_per_proc = assert_valid_threads(threads, self$cpp_options(), multiple_chains = TRUE), show_stderr_messages = show_exceptions, show_stdout_messages = show_messages ) @@ -1579,7 +1601,6 @@ CmdStanModel$set("public", name = "optimize", value = optimize) #' installed version of CmdStan. #' #' @template model-common-args -#' @inheritParams model-method-optimize #' @param save_latent_dynamics Ignored for this method. #' @param mode (multiple options) The mode to center the approximation at. One #' of the following: @@ -1647,8 +1668,12 @@ laplace <- function(data = NULL, if (!is.null(mode) && !is.null(opt_args)) { stop("Cannot specify both 'opt_args' and 'mode' arguments.", call. = FALSE) } + if (self$threads_enabled() && is.null(threads)) { + threads <- 1 + } procs <- CmdStanProcs$new( num_procs = 1, + threads_per_proc = assert_valid_threads(threads, self$cpp_options(), multiple_chains = TRUE), show_stderr_messages = show_exceptions, show_stdout_messages = show_messages ) @@ -1743,10 +1768,6 @@ CmdStanModel$set("public", name = "laplace", value = laplace) #' installed version of CmdStan. #' #' @template model-common-args -#' @param threads (positive integer) If the model was -#' [compiled][model-method-compile] with threading support, the number of -#' threads to use in parallelized sections (e.g., when using the Stan -#' functions `reduce_sum()` or `map_rect()`). #' @param algorithm (string) The algorithm. Either `"meanfield"` or #' `"fullrank"`. #' @param iter (positive integer) The _maximum_ number of iterations. @@ -1869,10 +1890,6 @@ CmdStanModel$set("public", name = "variational", value = variational) #' installed version of CmdStan #' #' @template model-common-args -#' @param num_threads (positive integer) If the model was -#' [compiled][model-method-compile] with threading support, the number of -#' threads to use in parallelized sections (e.g., for multi-path pathfinder -#' as well as `reduce_sum`). #' @param init_alpha (positive real) The initial step size parameter. #' @param tol_obj (positive real) Convergence tolerance on changes in objective function value. #' @param tol_rel_obj (positive real) Convergence tolerance on relative changes in objective function value. @@ -1938,8 +1955,12 @@ pathfinder <- function(data = NULL, calculate_lp = NULL, show_messages = TRUE, show_exceptions = TRUE) { + if (self$threads_enabled() && is.null(threads)) { + threads <- 1 + } procs <- CmdStanProcs$new( num_procs = 1, + threads_per_proc = assert_valid_threads(threads, self$cpp_options(), multiple_chains = TRUE), show_stderr_messages = show_exceptions, show_stdout_messages = show_messages ) @@ -2074,10 +2095,14 @@ generate_quantities <- function(fitted_params, threads = NULL, opencl_ids = NULL) { fitted_params_files <- process_fitted_params(fitted_params) + if (self$threads_enabled() && is.null(threads)) { + threads <- 1 + } procs <- CmdStanGQProcs$new( num_procs = length(fitted_params_files), parallel_procs = checkmate::assert_integerish(parallel_chains, lower = 1, - null.ok = TRUE) + null.ok = TRUE), + threads_per_proc = assert_valid_threads(threads, self$cpp_options(), multiple_chains = TRUE), ) model_variables <- NULL if (is_variables_method_supported(self)) { diff --git a/R/run.R b/R/run.R index 43cbedb8..2bd81cc9 100644 --- a/R/run.R +++ b/R/run.R @@ -118,10 +118,20 @@ CmdStanRun <- R6::R6Class( call. = FALSE ) } - private$latent_dynamics_files_ + if (include_failed) { + private$latent_dynamics_files_ + } else { + ok <- self$procs$is_finished() | self$procs$is_queued() + private$latent_dynamics_files_[ok] + } }, output_files = function(include_failed = FALSE) { + if (include_failed) { private$output_files_ + } else { + ok <- self$procs$is_finished() | self$procs$is_queued() + private$output_files_[ok] + } }, profile_files = function(include_failed = FALSE) { files <- private$profile_files_ @@ -132,7 +142,12 @@ CmdStanRun <- R6::R6Class( call. = FALSE ) } - files + if (include_failed) { + files + } else { + ok <- self$procs$is_finished() | self$procs$is_queued() + files[ok] + } }, save_output_files = function(dir = ".", basename = NULL, @@ -234,7 +249,7 @@ CmdStanRun <- R6::R6Class( command = function() self$args$command(), command_args = function(id = 1) { # create a list of character vectors (one per run/chain) of cmdstan arguments - if (inherits(self$args$method_args, "GenerateQuantitiesArgs")) { + if (self$procs$num_procs() > 1) { output_file = private$output_files_[id] latent_dynamic_file = private$latent_dynamics_files_[id] } else { @@ -429,24 +444,29 @@ check_target_exe <- function(exe) { start_time <- Sys.time() chain_id <- 1 while (!all(procs$is_finished() | procs$is_failed())) { - procs$new_proc( - id = chain_id, - command = self$command(), - args = self$command_args(), - wd = dirname(self$exe_file()), - mpi_cmd = mpi_cmd, - mpi_args = mpi_args - ) - procs$mark_proc_start(chain_id) - procs$set_active_procs(procs$active_procs() + 1) + while (procs$active_procs() != procs$parallel_procs() && procs$any_queued()) { + procs$new_proc( + id = chain_id, + command = self$command(), + args = self$command_args(chain_id), + wd = dirname(self$exe_file()), + mpi_cmd = mpi_cmd, + mpi_args = mpi_args + ) + procs$mark_proc_start(chain_id) + procs$set_active_procs(procs$active_procs() + 1) + chain_id <- chain_id + 1 + } start_active_procs <- procs$active_procs() while (procs$active_procs() == start_active_procs && procs$active_procs() > 0) { procs$wait(0.1) procs$poll(0) - if (!procs$is_queued(chain_id)) { - procs$process_output(chain_id) - procs$process_error_output(chain_id) + for (chain_iter in seq_len(chain_id)) { + if (!procs$is_queued(chain_iter)) { + procs$process_output(chain_iter) + procs$process_error_output(chain_iter) + } } procs$set_active_procs(procs$num_alive()) } @@ -629,10 +649,12 @@ CmdStanProcs <- R6::R6Class( public = list( initialize = function(num_procs, parallel_procs = NULL, + threads_per_proc = NULL, show_stderr_messages = TRUE, show_stdout_messages = TRUE) { checkmate::assert_integerish(num_procs, lower = 1, len = 1, any.missing = FALSE) checkmate::assert_integerish(parallel_procs, lower = 1, len = 1, any.missing = FALSE, null.ok = TRUE) + checkmate::assert_integerish(threads_per_proc, lower = 1, len = 1, null.ok = TRUE) private$num_procs_ <- as.integer(num_procs) if (is.null(parallel_procs)) { private$parallel_procs_ <- private$num_procs_ @@ -640,6 +662,7 @@ CmdStanProcs <- R6::R6Class( private$parallel_procs_ <- as.integer(parallel_procs) } private$active_procs_ <- 0 + private$threads_per_proc_ <- as.integer(threads_per_proc) private$proc_ids_ <- seq_len(num_procs) zeros <- rep(0, num_procs) names(zeros) <- private$proc_ids_ @@ -662,6 +685,9 @@ CmdStanProcs <- R6::R6Class( parallel_procs = function() { private$parallel_procs_ }, + threads_per_proc = function() { + private$threads_per_proc_ + }, proc_ids = function() { private$proc_ids_ }, @@ -883,6 +909,7 @@ CmdStanProcs <- R6::R6Class( num_procs_ = integer(), parallel_procs_ = integer(), active_procs_ = integer(), + threads_per_proc_ = integer(), proc_state_ = NULL, proc_start_time_ = NULL, proc_total_time_ = NULL, diff --git a/R/utils.R b/R/utils.R index dd40aa42..c670ef89 100644 --- a/R/utils.R +++ b/R/utils.R @@ -54,6 +54,10 @@ os_is_linux <- function() { isTRUE(Sys.info()[["sysname"]] == "Linux") } +os_use_single_process <- function() { + return(os_is_wsl() || os_is_linux()); +} + is_rtools43_toolchain <- function() { os_is_windows() && R.version$major == "4" && R.version$minor >= "3.0" } diff --git a/man-roxygen/model-common-args.R b/man-roxygen/model-common-args.R index ca609ebe..038d86c6 100644 --- a/man-roxygen/model-common-args.R +++ b/man-roxygen/model-common-args.R @@ -99,3 +99,8 @@ #' [`$output()`][fit-method-output] method of the resulting fit object can be #' used to display the silenced messages. #' +#' @param threads (positive integer) If the model was +#' [compiled][model-method-compile] with threading support, the number of +#' threads to use in parallelized sections (e.g., when for multiple chains +#' running in parallel and for using the Stan functions +#' `reduce_sum()` or `map_rect()`). diff --git a/man-roxygen/model-sample-args.R b/man-roxygen/model-sample-args.R index 240300a8..b743e0ef 100644 --- a/man-roxygen/model-sample-args.R +++ b/man-roxygen/model-sample-args.R @@ -1,22 +1,12 @@ #' @param chains (positive integer) The number of Markov chains to run. The #' default is 4. -#' @param parallel_chains (positive integer) The _maximum_ number of MCMC chains -#' to run in parallel. If `parallel_chains` is not specified then the default -#' is to look for the option `"mc.cores"`, which can be set for an entire \R -#' session by `options(mc.cores=value)`. If the `"mc.cores"` option has not -#' been set then the default is `1`. +#' @param parallel_chains (positive integer) Deprecated. Please use the `chains` +#' argument instead. #' @param chain_ids (integer vector) A vector of chain IDs. Must contain as many #' unique positive integers as the number of chains. If not set, the default #' chain IDs are used (integers starting from `1`). -#' @param threads_per_chain (positive integer) If the model was -#' [compiled][model-method-compile] with threading support, the number of -#' threads to use in parallelized sections _within_ an MCMC chain (e.g., when -#' using the Stan functions `reduce_sum()` or `map_rect()`). This is in -#' contrast with `parallel_chains`, which specifies the number of chains to -#' run in parallel. The actual number of CPU cores used is -#' `parallel_chains*threads_per_chain`. For an example of using threading see -#' the Stan case study -#' [Reduce Sum: A Minimal Example](https://mc-stan.org/users/documentation/case-studies/reduce_sum_tutorial.html). +#' @param threads_per_chain (positive integer) Deprecated. Please use the +#' threads argument instead. #' #' @param iter_sampling (positive integer) The number of post-warmup iterations #' to run per chain. Note: in the CmdStan User's Guide this is referred to as diff --git a/man/CmdStanModel.Rd b/man/CmdStanModel.Rd index 55200357..493360ec 100644 --- a/man/CmdStanModel.Rd +++ b/man/CmdStanModel.Rd @@ -82,7 +82,7 @@ fit_mcmc <- mod$sample( data = stan_data, seed = 123, chains = 2, - parallel_chains = 2 + threads = 2 ) # Use 'posterior' package for summaries diff --git a/man/cmdstan_model.Rd b/man/cmdstan_model.Rd index b0432abc..001df430 100644 --- a/man/cmdstan_model.Rd +++ b/man/cmdstan_model.Rd @@ -68,7 +68,7 @@ fit_mcmc <- mod$sample( data = stan_data, seed = 123, chains = 2, - parallel_chains = 2 + threads = 2 ) # Use 'posterior' package for summaries diff --git a/man/cmdstanr-package.Rd b/man/cmdstanr-package.Rd index c98b0b3a..1b041e88 100644 --- a/man/cmdstanr-package.Rd +++ b/man/cmdstanr-package.Rd @@ -95,7 +95,7 @@ fit_mcmc <- mod$sample( data = stan_data, seed = 123, chains = 2, - parallel_chains = 2 + threads = 2 ) # Use 'posterior' package for summaries diff --git a/man/model-method-compile.Rd b/man/model-method-compile.Rd index 7bfa47d7..dd01fe5a 100644 --- a/man/model-method-compile.Rd +++ b/man/model-method-compile.Rd @@ -11,7 +11,7 @@ compile( pedantic = FALSE, include_paths = NULL, user_header = NULL, - cpp_options = list(), + cpp_options = list(stan_threads = os_use_single_process()), stanc_options = list(), force_recompile = getOption("cmdstanr_force_recompile", default = FALSE), compile_model_methods = FALSE, diff --git a/man/model-method-generate-quantities.Rd b/man/model-method-generate-quantities.Rd index ef7c2a33..cb12433a 100644 --- a/man/model-method-generate-quantities.Rd +++ b/man/model-method-generate-quantities.Rd @@ -85,11 +85,14 @@ values with 6 significant figures. The upper limit for \code{sig_figs} is 18. Increasing this value will result in larger output CSV files and thus an increased usage of disk space.} -\item{parallel_chains}{(positive integer) The \emph{maximum} number of MCMC chains -to run in parallel. If \code{parallel_chains} is not specified then the default -is to look for the option \code{"mc.cores"}, which can be set for an entire \R -session by \code{options(mc.cores=value)}. If the \code{"mc.cores"} option has not -been set then the default is \code{1}.} +\item{parallel_chains}{(positive integer) Deprecated. Please use the \code{chains} +argument instead.} + +\item{threads}{(positive integer) If the model was +\link[=model-method-compile]{compiled} with threading support, the number of +threads to use in parallelized sections (e.g., when for multiple chains +running in parallel and for using the Stan functions +\code{reduce_sum()} or \code{map_rect()}).} \item{opencl_ids}{(integer vector of length 2) The platform and device IDs of the OpenCL device to use for fitting. The model must diff --git a/man/model-method-laplace.Rd b/man/model-method-laplace.Rd index 253d67f5..841b61aa 100644 --- a/man/model-method-laplace.Rd +++ b/man/model-method-laplace.Rd @@ -108,8 +108,9 @@ increased usage of disk space.} \item{threads}{(positive integer) If the model was \link[=model-method-compile]{compiled} with threading support, the number of -threads to use in parallelized sections (e.g., when -using the Stan functions \code{reduce_sum()} or \code{map_rect()}).} +threads to use in parallelized sections (e.g., when for multiple chains +running in parallel and for using the Stan functions +\code{reduce_sum()} or \code{map_rect()}).} \item{opencl_ids}{(integer vector of length 2) The platform and device IDs of the OpenCL device to use for fitting. The model must diff --git a/man/model-method-optimize.Rd b/man/model-method-optimize.Rd index b9b53454..afb7c60f 100644 --- a/man/model-method-optimize.Rd +++ b/man/model-method-optimize.Rd @@ -122,8 +122,9 @@ increased usage of disk space.} \item{threads}{(positive integer) If the model was \link[=model-method-compile]{compiled} with threading support, the number of -threads to use in parallelized sections (e.g., when -using the Stan functions \code{reduce_sum()} or \code{map_rect()}).} +threads to use in parallelized sections (e.g., when for multiple chains +running in parallel and for using the Stan functions +\code{reduce_sum()} or \code{map_rect()}).} \item{opencl_ids}{(integer vector of length 2) The platform and device IDs of the OpenCL device to use for fitting. The model must @@ -219,7 +220,7 @@ fit_mcmc <- mod$sample( data = stan_data, seed = 123, chains = 2, - parallel_chains = 2 + threads = 2 ) # Use 'posterior' package for summaries diff --git a/man/model-method-pathfinder.Rd b/man/model-method-pathfinder.Rd index bd1b4f3d..532f7ad7 100644 --- a/man/model-method-pathfinder.Rd +++ b/man/model-method-pathfinder.Rd @@ -130,6 +130,12 @@ device IDs of the OpenCL device to use for fitting. The model must be compiled with \code{cpp_options = list(stan_opencl = TRUE)} for this argument to have an effect.} +\item{threads}{(positive integer) If the model was +\link[=model-method-compile]{compiled} with threading support, the number of +threads to use in parallelized sections (e.g., when for multiple chains +running in parallel and for using the Stan functions +\code{reduce_sum()} or \code{map_rect()}).} + \item{init_alpha}{(positive real) The initial step size parameter.} \item{tol_obj}{(positive real) Convergence tolerance on changes in objective function value.} @@ -188,11 +194,6 @@ recommended unless you are very confident that the model is correct up to numerical error. If the messages are silenced then the \code{\link[=fit-method-output]{$output()}} method of the resulting fit object can be used to display the silenced messages.} - -\item{num_threads}{(positive integer) If the model was -\link[=model-method-compile]{compiled} with threading support, the number of -threads to use in parallelized sections (e.g., for multi-path pathfinder -as well as \code{reduce_sum}).} } \value{ A \code{\link{CmdStanPathfinder}} object. @@ -244,7 +245,7 @@ fit_mcmc <- mod$sample( data = stan_data, seed = 123, chains = 2, - parallel_chains = 2 + threads = 2 ) # Use 'posterior' package for summaries diff --git a/man/model-method-sample.Rd b/man/model-method-sample.Rd index 6d628d57..5abcf45b 100644 --- a/man/model-method-sample.Rd +++ b/man/model-method-sample.Rd @@ -146,6 +146,12 @@ default is 4.} unique positive integers as the number of chains. If not set, the default chain IDs are used (integers starting from \code{1}).} +\item{threads}{(positive integer) If the model was +\link[=model-method-compile]{compiled} with threading support, the number of +threads to use in parallelized sections (e.g., when for multiple chains +running in parallel and for using the Stan functions +\code{reduce_sum()} or \code{map_rect()}).} + \item{opencl_ids}{(integer vector of length 2) The platform and device IDs of the OpenCL device to use for fitting. The model must be compiled with \code{cpp_options = list(stan_opencl = TRUE)} for this @@ -255,21 +261,11 @@ using the \code{\link[=fit-method-summary]{$summary()}} method.} \item{cores, num_cores, num_chains, num_warmup, num_samples, save_extra_diagnostics, max_depth, stepsize, validate_csv}{Deprecated and will be removed in a future release.} -\item{threads_per_chain}{(positive integer) If the model was -\link[=model-method-compile]{compiled} with threading support, the number of -threads to use in parallelized sections \emph{within} an MCMC chain (e.g., when -using the Stan functions \code{reduce_sum()} or \code{map_rect()}). This is in -contrast with \code{parallel_chains}, which specifies the number of chains to -run in parallel. The actual number of CPU cores used is -\code{parallel_chains*threads_per_chain}. For an example of using threading see -the Stan case study -\href{https://mc-stan.org/users/documentation/case-studies/reduce_sum_tutorial.html}{Reduce Sum: A Minimal Example}.} - -\item{parallel_chains}{(positive integer) The \emph{maximum} number of MCMC chains -to run in parallel. If \code{parallel_chains} is not specified then the default -is to look for the option \code{"mc.cores"}, which can be set for an entire \R -session by \code{options(mc.cores=value)}. If the \code{"mc.cores"} option has not -been set then the default is \code{1}.} +\item{threads_per_chain}{(positive integer) Deprecated. Please use the +threads argument instead.} + +\item{parallel_chains}{(positive integer) Deprecated. Please use the \code{chains} +argument instead.} } \value{ A \code{\link{CmdStanMCMC}} object. @@ -314,7 +310,7 @@ fit_mcmc <- mod$sample( data = stan_data, seed = 123, chains = 2, - parallel_chains = 2 + threads = 2 ) # Use 'posterior' package for summaries diff --git a/man/model-method-sample_mpi.Rd b/man/model-method-sample_mpi.Rd index b3d981e5..4b8033cc 100644 --- a/man/model-method-sample_mpi.Rd +++ b/man/model-method-sample_mpi.Rd @@ -153,6 +153,12 @@ is \code{FALSE}.} \item{thin}{(positive integer) The period between saved samples. This should typically be left at its default (no thinning) unless memory is a problem.} +\item{threads}{(positive integer) If the model was +\link[=model-method-compile]{compiled} with threading support, the number of +threads to use in parallelized sections (e.g., when for multiple chains +running in parallel and for using the Stan functions +\code{reduce_sum()} or \code{map_rect()}).} + \item{max_treedepth}{(positive integer) The maximum allowed tree depth for the NUTS engine. See the \emph{Tree Depth} section of the CmdStan User's Guide for more details.} diff --git a/man/model-method-variational.Rd b/man/model-method-variational.Rd index 1b2d9a74..33bd7d7f 100644 --- a/man/model-method-variational.Rd +++ b/man/model-method-variational.Rd @@ -123,8 +123,9 @@ increased usage of disk space.} \item{threads}{(positive integer) If the model was \link[=model-method-compile]{compiled} with threading support, the number of -threads to use in parallelized sections (e.g., when using the Stan -functions \code{reduce_sum()} or \code{map_rect()}).} +threads to use in parallelized sections (e.g., when for multiple chains +running in parallel and for using the Stan functions +\code{reduce_sum()} or \code{map_rect()}).} \item{opencl_ids}{(integer vector of length 2) The platform and device IDs of the OpenCL device to use for fitting. The model must @@ -219,7 +220,7 @@ fit_mcmc <- mod$sample( data = stan_data, seed = 123, chains = 2, - parallel_chains = 2 + threads = 2 ) # Use 'posterior' package for summaries diff --git a/tests/testthat/test-failed-chains.R b/tests/testthat/test-failed-chains.R index 7bda0459..1c20c1fe 100644 --- a/tests/testthat/test-failed-chains.R +++ b/tests/testthat/test-failed-chains.R @@ -13,57 +13,20 @@ make_all_fail <- function(x) { ) all_fail } -if (FALSE) { - -make_some_fail <- function(x, seed = 0) { - num_files <- 0 - attempt <- 1 - set.seed(seed) - while (num_files == 0 || num_files == 4) { - utils::capture.output( - check_some_fail <- mod$sample( - data = list(pr_fail = 0.5), - save_latent_dynamics = TRUE, - chains = 4, - seed = base::sample(.Machine$integer.max, 1) - ) - ) - num_files <- length(check_some_fail$output_files(include_failed = FALSE)) - attempt <- attempt + 1 - } - check_some_fail -} # called here and also in tests below suppressWarnings( utils::capture.output( - # fit_all_fail <- make_all_fail(mod), - fit_some_fail <- make_some_fail(mod) + fit_all_fail <- make_all_fail(mod) ) ) test_that("correct warnings are thrown when all chains fail", { expect_warning( make_all_fail(mod), - "All chains finished unexpectedly!" - ) - for (i in 1:4) { - expect_output(fit_all_fail$output(i), "Location parameter is inf") - } -}) - -test_that("correct warnings are thrown when some chains fail", { - fit_tmp <- suppressWarnings(make_some_fail(mod, seed = 2022)) - expect_warning( - make_some_fail(mod, seed = 2022), - paste(4 - length(fit_tmp$output_files(include_failed = FALSE)), "chain(s) finished unexpectedly"), - fixed = TRUE + "Chain 1 finished unexpectedly!" ) - - failed <- !fit_some_fail$runset$procs$is_finished() - for (i in which(failed)) { - expect_output(fit_some_fail$output(i), "Location parameter is inf") - } + expect_match(paste0(fit_all_fail$output()), "Location parameter is inf") }) test_that("$output_files() and latent_dynamic_files() returns path to all files regardless of chain failure", { @@ -75,10 +38,6 @@ test_that("$output_files() and latent_dynamic_files() returns path to all files length(fit_all_fail$output_files(include_failed = FALSE)), 0 ) - expect_equal( - length(fit_some_fail$output_files(include_failed = TRUE)), - 4 - ) expect_equal( length(fit_all_fail$latent_dynamics_files(include_failed = TRUE)), 4 @@ -87,10 +46,6 @@ test_that("$output_files() and latent_dynamic_files() returns path to all files length(fit_all_fail$latent_dynamics_files(include_failed = FALSE)), 0 ) - expect_equal( - length(fit_some_fail$latent_dynamics_files(include_failed = TRUE)), - 4 - ) expect_equal( length(fit_all_fail$output_files()), 0 @@ -106,19 +61,10 @@ test_that("$save_* methods save all files regardless of chain failure", { fit_all_fail$save_output_files(dir = tempdir()), "Moved 4 files" ) - expect_message( - fit_some_fail$save_output_files(dir = tempdir()), - "Moved 4 files" - ) - expect_message( fit_all_fail$save_latent_dynamics_files(dir = tempdir()), "Moved 4 files" ) - expect_message( - fit_some_fail$save_latent_dynamics_files(dir = tempdir()), - "Moved 4 files" - ) }) test_that("errors when using draws after all chains fail", { @@ -133,14 +79,6 @@ test_that("errors when using draws after all chains fail", { expect_error(fit_all_fail$inv_metric(), "No chains finished successfully") }) -test_that("can use draws after some chains fail", { - expect_s3_class(fit_some_fail$summary(), "draws_summary") - expect_s3_class(fit_some_fail$draws(), "draws_array") - expect_output(fit_some_fail$cmdstan_summary(), "Inference for Stan model") - expect_output(fit_some_fail$cmdstan_diagnose(), "Processing complete") - expect_output(fit_some_fail$print(), "variable") -}) - test_that("init warnings are shown", { suppressWarnings( expect_message( @@ -214,4 +152,4 @@ test_that("gq chains error on wrong input CSV", { ) }) -} + diff --git a/tests/testthat/test-model-sample-metric.R b/tests/testthat/test-model-sample-metric.R index 422442fa..314843c7 100644 --- a/tests/testthat/test-model-sample-metric.R +++ b/tests/testthat/test-model-sample-metric.R @@ -50,7 +50,7 @@ test_that("sample() method works with provided inv_metrics", { expect_sample_output(fit_r <- mod$sample(data = data_list, chains = 3, - parallel_chains = 2, + threads = 2, metric = "dense_e", metric_file = inv_metric_matrix_r, seed = 123)) @@ -146,7 +146,7 @@ test_that("sample() method works with lists of inv_metrics", { expect_error(fit_r <- mod$sample(data = data_list, chains = 3, - parallel_chains = 2, + threads = 2, metric = "diag_e", inv_metric = list(inv_metric_vector, inv_metric_vector)), "2 metric\\(s\\) provided. Must provide 1 or 3 metric\\(s\\) for 3 chain\\(s\\)") @@ -168,7 +168,7 @@ test_that("sample() method works with lists of inv_metrics", { expect_error(fit_r <- mod$sample(data = data_list, chains = 3, - parallel_chains = 2, + threads = 2, metric = "diag_e", metric_file = c(inv_metric_vector_json, inv_metric_vector_json)), "2 metric\\(s\\) provided. Must provide 1 or 3 metric\\(s\\) for 3 chain\\(s\\)") diff --git a/tests/testthat/test-model-sample.R b/tests/testthat/test-model-sample.R index f8a8917a..bdb41ba6 100644 --- a/tests/testthat/test-model-sample.R +++ b/tests/testthat/test-model-sample.R @@ -15,7 +15,7 @@ ok_arg_values <- list( data = data_list, output_dir = tempdir(), chains = 2, - parallel_chains = 1, + threads = 1, iter_warmup = 50, iter_sampling = 100, save_warmup = FALSE, @@ -39,7 +39,7 @@ bad_arg_values <- list( data = "NOT_A_FILE", output_dir = "NOT_A_DIRECTORY", chains = -1, - parallel_chains = -1, + threads = -1, iter_warmup = -1, iter_sampling = -1, save_warmup = "NO", @@ -62,7 +62,7 @@ bad_arg_values_2 <- list( data = matrix(1:10), output_dir = 1, chains = "NOT_A_NUMBER", - parallel_chains = "NOT_A_NUMBER", + threads = "NOT_A_NUMBER", init = "NOT_A_FILE", seed = 1:10, step_size = 1:10, @@ -161,13 +161,13 @@ test_that("sample works for warmup-only run", { test_that("sampling in parallel works", { expect_output( - mod$sample(data = data_list, chains = 2, parallel_chains = 2), + mod$sample(data = data_list, chains = 2, threads = 2), "Running MCMC with 2 parallel chains", fixed = TRUE ) expect_output( - mod$sample(data = data_list, chains = 2, parallel_chains = 2), + mod$sample(data = data_list, chains = 2, threads = 2), "Both chains finished successfully", fixed = TRUE ) diff --git a/tests/testthat/test-threads.R b/tests/testthat/test-threads.R index 1a333e82..c55dd2a1 100644 --- a/tests/testthat/test-threads.R +++ b/tests/testthat/test-threads.R @@ -11,56 +11,44 @@ test_that("using threads_per_chain without stan_threads set in compile() warns", mod <- cmdstan_model(stan_program) expect_warning( expect_output( - mod$sample(data = data_file_json, threads_per_chain = 4), - "Running MCMC with 4 sequential chains", + mod$sample(data = data_file_json, threads = 4), + "Running MCMC with 4 chains and 4 threads", fixed = TRUE ), - "'threads_per_chain' is set but the model was not compiled with 'cpp_options = list(stan_threads = TRUE)' so 'threads_per_chain' will have no effect!", + "'threads' is set but the model was not compiled with 'cpp_options = list(stan_threads = TRUE)' so 'threads_per_chain' will have no effect!", fixed = TRUE) }) test_that("threading works with sample()", { mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = TRUE), force_recompile = TRUE) - expect_error( - mod$sample(data = data_file_json), - "The model was compiled with 'cpp_options = list(stan_threads = TRUE)' but 'threads_per_chain' was not set!", - fixed = TRUE - ) - expect_output( - f <- mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 1), - "Running MCMC with 4 parallel chains, with 1 thread(s) per chain..", + f <- mod$sample(data = data_file_json, chains = 4, threads = 4), + "Running MCMC with 4 chains and 4 threads..", fixed = TRUE ) - expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 1) - expect_equal(f$metadata()$threads_per_chain, 1) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 4) + expect_equal(f$metadata()$threads, 4) expect_output( - f <- mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 2), - "Running MCMC with 4 parallel chains, with 2 thread(s) per chain..", + f <- mod$sample(data = data_file_json, chains = 4, threads = 2), + "Running MCMC with 4 chains and 2 threads..", fixed = TRUE ) expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 2) - expect_equal(f$metadata()$threads_per_chain, 2) + expect_equal(f$metadata()$threads, 2) expect_output( - f <- mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 4), - "Running MCMC with 4 parallel chains, with 4 thread(s) per chain..", + f <- mod$sample(data = data_file_json, chains = 4, threads = 16), + "Running MCMC with 4 chains and 16 threads..", fixed = TRUE ) - expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 4) - expect_equal(f$metadata()$threads_per_chain, 4) + expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 16) + expect_equal(f$metadata()$threads, 16) }) test_that("threading works with optimize()", { mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = TRUE), force_recompile = TRUE) - expect_error( - mod$optimize(data = data_file_json), - "The model was compiled with 'cpp_options = list(stan_threads = TRUE)' but 'threads' was not set!", - fixed = TRUE - ) - expect_output( f <- mod$optimize(data = data_file_json, threads = 1, seed = 123), "Optimization terminated normally", @@ -89,12 +77,6 @@ test_that("threading works with optimize()", { test_that("threading works with variational()", { mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = TRUE), force_recompile = TRUE) - expect_error( - mod$variational(data = data_file_json), - "The model was compiled with 'cpp_options = list(stan_threads = TRUE)' but 'threads' was not set!", - fixed = TRUE - ) - expect_output( f <- mod$variational(data = data_file_json, threads = 1, seed = 123), "EXPERIMENTAL ALGORITHM", @@ -124,15 +106,18 @@ test_that("threading works with generate_quantities()", { mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = TRUE), force_recompile = TRUE) mod_gq <- cmdstan_model(stan_gq_program, cpp_options = list(stan_threads = TRUE), force_recompile = TRUE) expect_output( - f <- mod$sample(data = data_file_json, parallel_chains = 4, threads_per_chain = 1), - "Running MCMC with 4 parallel chains, with 1 thread(s) per chain..", - fixed = TRUE - ) - expect_error( - mod_gq$generate_quantities(fitted_params = f, data = data_file_json), - "The model was compiled with 'cpp_options = list(stan_threads = TRUE)' but 'threads_per_chain' was not set!", + f <- mod$sample(data = data_file_json, chains = 4, threads = 1), + "Running MCMC with 4 chains and 1 thread..", fixed = TRUE ) + if (FALSE) { + expect_error( + mod_gq$generate_quantities(fitted_params = f, data = data_file_json), + "The model was compiled with 'cpp_options = list(stan_threads = TRUE)' but 'threads_per_chain' was not set!", + fixed = TRUE + ) + + } expect_output( f_gq <- mod_gq$generate_quantities(fitted_params = f, data = data_file_gq_json, threads_per_chain = 1, seed = 123), "Running standalone generated quantities after 4 MCMC chains", @@ -142,16 +127,16 @@ test_that("threading works with generate_quantities()", { expect_equal(f_gq$metadata()$threads_per_chain, 1) expect_output( - f_gq <- mod_gq$generate_quantities(fitted_params = f, data = data_file_gq_json, threads_per_chain = 2, seed = 123), - "Running standalone generated quantities after 4 MCMC chains", + f_gq <- mod_gq$generate_quantities(fitted_params = f, data = data_file_gq_json, threads = 2, seed = 123), + "Running standalone generated quantities with 2 threads", fixed = TRUE ) expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 2) expect_equal(f_gq$metadata()$threads_per_chain, 2) expect_output( - f_gq <- mod_gq$generate_quantities(fitted_params = f, data = data_file_gq_json, threads_per_chain = 4, seed = 123), - "Running standalone generated quantities after 4 MCMC chains", + f_gq <- mod_gq$generate_quantities(fitted_params = f, data = data_file_gq_json, threads = 4, seed = 123), + "Running standalone generated quantities with 4 threads", fixed = TRUE ) expect_equal(as.integer(Sys.getenv("STAN_NUM_THREADS")), 4) @@ -162,19 +147,19 @@ test_that("correct output when stan_threads not TRUE", { mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = FALSE), force_recompile = TRUE) expect_output( mod$sample(data = data_file_json), - "Running MCMC with 4 sequential chains", + "Running MCMC with 4 chains and 1 thread", fixed = TRUE ) mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = "dummy string"), force_recompile = TRUE) expect_output( mod$sample(data = data_file_json), - "Running MCMC with 4 sequential chains", + "Running MCMC with 4 chains and 1 thread", fixed = TRUE ) mod <- cmdstan_model(stan_program, cpp_options = list(stan_threads = FALSE), force_recompile = TRUE) expect_warning( - mod$sample(data = data_file_json, threads_per_chain = 4), - "'threads_per_chain' is set but the model was not compiled with 'cpp_options = list(stan_threads = TRUE)' so 'threads_per_chain' will have no effect!", + mod$sample(data = data_file_json, thread = 4), + "'threads' is set but the model was not compiled with 'cpp_options = list(stan_threads = TRUE)' so 'threads_per_chain' will have no effect!", fixed = TRUE ) }) diff --git a/vignettes/cmdstanr.Rmd b/vignettes/cmdstanr.Rmd index b9b2e341..bbec41cc 100644 --- a/vignettes/cmdstanr.Rmd +++ b/vignettes/cmdstanr.Rmd @@ -159,7 +159,7 @@ fit <- mod$sample( data = data_list, seed = 123, chains = 4, - parallel_chains = 4, + threads = 4, refresh = 500 # print update every 500 iters ) ```