Skip to content

Commit

Permalink
CLEANUP: mirai_cluster() no longer accepts argument 'workers'; it wil…
Browse files Browse the repository at this point in the history
…l default to use whatever workers are already set up using mirai::daemons()
  • Loading branch information
HenrikBengtsson committed Apr 11, 2024
1 parent bdb7e76 commit 2b68853
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 132 deletions.
3 changes: 1 addition & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
Package: future.mirai
Version: 0.1.1-9010
Version: 0.1.1-9011
Depends:
future
Imports:
mirai (>= 0.12.1),
parallelly,
parallel,
utils
Suggests:
future.tests
Expand Down
4 changes: 0 additions & 4 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ importFrom(mirai,is_error_value)
importFrom(mirai,mirai)
importFrom(mirai,status)
importFrom(mirai,unresolved)
importFrom(parallel,parLapply)
importFrom(parallel,stopCluster)
importFrom(parallelly,availableCores)
importFrom(parallelly,availableWorkers)
importFrom(parallelly,makeClusterPSOCK)
importFrom(utils,capture.output)
importFrom(utils,packageVersion)
importFrom(utils,str)
18 changes: 1 addition & 17 deletions R/MiraiFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,8 @@ MiraiFuture <- function(expr = NULL,
daemons(n = 0L) ## reset is required
daemons(n = workers, dispatcher = dispatcher)
}
} else if (is.character(workers)) {
stop_if_not(length(workers) >= 1L, !anyNA(workers))
if (identical(dispatcher, "auto")) dispatcher <- TRUE

dd <- get_mirai_daemons()
if (is.data.frame(dd)) {
uris <- rownames(dd)
n <- length(uris)
} else {
n <- -1L
}
if (length(workers) != n) {
daemons(n = 0L) ## reset is required
daemons(n = length(workers), url = "ws://:0", dispatcher = dispatcher)
}
cluster <- launch_mirai_daemons(workers)
} else if (!is.null(workers)) {
stop("Argument 'workers' should be a numeric scalar or a character vector: ", mode(workers))
stop("Argument 'workers' should be a numeric scalar or NULL: ", mode(workers))
}

future <- structure(future, class = c("MiraiFuture", class(future)))
Expand Down
15 changes: 2 additions & 13 deletions R/mirai_cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,28 @@
#' @inheritParams MiraiFuture
#' @inheritParams future::cluster
#'
#' @param workers Specifies **mirai** workers to use as parallel workers.
#' If a numeric scalar, then this number of local **mirai** daemons will
#' be launched.
#' If a character vector, then this it specifies the hostnames of where
#' the **mirai** daemons will be launched.
#' If `NULL`, then any **mirai** daemons previously created by
#' [mirai::daemons()] will be used as parallel workers.
#'
#' @return An object of class [MiraiFuture].
#'
#' @example incl/mirai_cluster.R
#'
#' @importFrom parallelly availableWorkers
#' @export
mirai_cluster <- function(expr,
substitute = TRUE,
envir = parent.frame(),
...,
workers = availableWorkers()) {
...) {
if (substitute) expr <- substitute(expr)

future <- MiraiFuture(
expr = expr, substitute = FALSE,
envir = envir,
workers = workers,
workers = NULL,
...
)
if(!isTRUE(future[["lazy"]])) future <- run(future)
invisible(future)
}
class(mirai_cluster) <- c("mirai_cluster", "mirai", "multiprocess", "future", "function")
attr(mirai_cluster, "init") <- TRUE
attr(mirai_cluster, "tweakable") <- "workers"


#' @importFrom future tweak
Expand Down
66 changes: 0 additions & 66 deletions R/utils,mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,3 @@ get_mirai_daemons <- function() {
as.data.frame(res)

}

#' @importFrom parallelly makeClusterPSOCK
#' @importFrom parallel parLapply stopCluster
launch_mirai_daemons <- function(hostnames, ..., timeout = 60) {
online <- NULL ## To please R CMD check

stopifnot(is.character(hostnames), !anyNA(hostnames))
stopifnot(is.numeric(timeout), !is.na(timeout), is.finite(timeout), timeout > 0.0)

## Assert that mirai daemons have been configured
dd <- get_mirai_daemons()
stopifnot(is.data.frame(dd) || is.numeric(dd))

## Consider only non-connected daemons
dd <- subset(dd, online == 0L)

## Nothing to do?
if (nrow(dd) == 0L) return(NULL)

## Launch only a subset?
if (nrow(dd) > length(hostnames)) {
dd <- dd[seq_along(hostnames), ]
} else if (nrow(dd) < length(hostnames)) {
hostnames <- hostnames[seq_len(nrow(dd))]
}

uris <- rownames(dd)
stopifnot(is.character(uris), !anyNA(uris), anyDuplicated(uris) == 0L)

## FIXME: This assumes servers are launched on localhost, or that
## reverse tunnelling is used when setting up the PSOCK cluster
## /HB 2023-05-02
host_ip <- "127.0.0.1"
uris <- sub("//:", sprintf("//%s:", host_ip), uris)

## Launching parallel PSOCK workers
cl <- makeClusterPSOCK(hostnames, ..., autoStop = TRUE)

## Use them to launch mirai daemons to connect back to host
void <- parLapply(cl, uris, function(uri) {
code <- sprintf('mirai::daemon("%s")', uri)
bin <- file.path(R.home("bin"), "Rscript")
system2(bin, args = c("-e", shQuote(code)), wait = FALSE)
}, chunk.size = 1L)

## Wait for mirai servers to connect back
t0 <- Sys.time()
ready <- FALSE
while (!ready) {
dd2 <- get_mirai_daemons()
stopifnot(is.data.frame(dd))
dd2 <- dd2[rownames(dd), , drop = FALSE]
dd2 <- subset(dd2, online == 0L)
ready <- (nrow(dd2) == 0)
Sys.sleep(1.0)
dt <- Sys.time() - t0
if (dt > timeout) {
stopCluster(cl)
cl <- NULL
stop(sprintf("%d out of %d mirai servers did not connect back within %g seconds",
nrow(dd2), nrow(dd), timeout))
}
}

cl
} ## launch_mirai_daemons()
5 changes: 1 addition & 4 deletions incl/mirai_cluster.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
\dontshow{if (.Platform[["OS.type"]] != "windows" || interactive()) \{}
mirai::daemons(parallelly::availableCores(), dispatcher = FALSE)
plan(mirai_cluster, workers = NULL)
plan(mirai_cluster)

# A function that returns a future, note that N uses lexical scoping...
f <- \() future({4 * sum((runif(N) ^ 2 + runif(N) ^ 2) < 1) / N}, seed = TRUE)
Expand All @@ -13,5 +12,3 @@ print(pi_est)

plan(sequential)
invisible(mirai::daemons(0)) ## Shut down mirai workers
\dontshow{\}}

21 changes: 2 additions & 19 deletions man/mirai_cluster.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions tests/mirai_cluster.R
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
if (requireNamespace("future.tests")) {
mirai::daemons(0) ## Reset any daemons running

## FIXME: The following is disable on MS Windows, because it will
## result in a 'R CMD check' NOTE on "detritus in the temp directory".
## This happens whenever we use mirai::daemons(..., dispatcher = TRUE) [1],
## which is what mirai_cluster() uses.
## FIXME: The following is disabled on MS Windows, because it will
## result in a 'R CMD check' NOTE on "detritus in the temp directory" [1].
## This happens whenever we use mirai::daemons(..., dispatcher = TRUE).
## [1] https://github.com/shikokuchuo/mirai/discussions/105
if (.Platform[["OS.type"]] != "windows") {
future.tests::check("future.mirai::mirai_cluster", timeout = 30.0, exit_value = FALSE)
}
dispatcher <- (.Platform[["OS.type"]] != "windows")

mirai::daemons(2, dispatcher = dispatcher)
future.tests::check("future.mirai::mirai_cluster", timeout = 10.0, exit_value = FALSE)

mirai::daemons(0) ## Reset any daemons running
gc()
Expand Down

0 comments on commit 2b68853

Please sign in to comment.