Skip to content

Commit

Permalink
Merge branch 'release/0.2.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
HenrikBengtsson committed Apr 18, 2024
2 parents 86335e0 + 2c93986 commit b62fd77
Show file tree
Hide file tree
Showing 45 changed files with 1,431 additions and 203 deletions.
14 changes: 7 additions & 7 deletions .Rbuildignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
^[.]gitignore
^[.]github/
^[.]Rdump/
^[.]make/
^docs/
^pkgdown/
^incl/
^[.]github
^[.]make
^[.]local
^docs
^pkgdown
^incl
^Makefile
^CONTRIBUTING.md

^cran-comments.md
^.*\.Rproj$
^\.Rproj\.user$
26 changes: 15 additions & 11 deletions .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ jobs:
fail-fast: false
matrix:
config:
# - {os: windows-latest, r: 'devel' }
# - {os: windows-latest, r: 'release' }
# - {os: windows-latest, r: 'oldrel' }
# - {os: macOS-latest, r: 'devel' }
# - {os: macOS-latest, r: 'release' }
# - {os: macOS-latest, r: 'oldrel' }
# - {os: ubuntu-latest, r: 'devel' }
# - {os: ubuntu-latest, r: 'devel' , mirai: 'devel', label: "mirai & nanonext devel" }
- {os: windows-latest, r: 'devel' }
- {os: windows-latest, r: 'release' }
- {os: windows-latest, r: 'oldrel' }
- {os: macOS-latest, r: 'devel' }
- {os: macOS-latest, r: 'release' }
- {os: macOS-latest, r: 'oldrel' }
- {os: ubuntu-latest, r: 'devel' }
- {os: ubuntu-latest, r: 'devel' , mirai: 'devel', label: "mirai & nanonext devel" }
- {os: ubuntu-latest, r: 'release' }
# - {os: ubuntu-latest, r: 'oldrel' }
- {os: ubuntu-latest, r: 'oldrel' }
# - {os: ubuntu-latest, r: 'oldrel-1' }
# - {os: ubuntu-latest, r: 'oldrel-2' }
# - {os: ubuntu-latest, r: '3.6' }
- {os: ubuntu-latest, r: '3.6' }
# # - {os: ubuntu-latest, r: 'release' , language: ko, label: ko }
# - {os: ubuntu-latest, r: 'release' , language: zh_CN, label: zh_CN }
# # - {os: ubuntu-latest, r: 'release' , language: zh_TW, label: zh_TW }
Expand All @@ -51,11 +51,12 @@ jobs:
R_FUTURE_RNG_ONMISUSE: error

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- uses: r-lib/actions/setup-r@v2
with:
use-public-rspm: true
r-version: ${{ matrix.config.r }}

- uses: r-lib/actions/setup-r-dependencies@v2
with:
Expand Down Expand Up @@ -86,3 +87,6 @@ jobs:
shell: Rscript {0}

- uses: r-lib/actions/check-r-package@v2
with:
error-on: '"note"'

4 changes: 2 additions & 2 deletions .github/workflows/covr.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
_R_CHECK_CRAN_INCOMING_: false

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- uses: r-lib/actions/setup-pandoc@v2

Expand All @@ -42,7 +42,7 @@ jobs:
shell: Rscript {0}

- name: Cache R packages
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: ${{ env.R_LIBS_USER }}
key: ${{ runner.os }}-${{ hashFiles('.github/R-version') }}-1-${{ hashFiles('.github/depends.Rds') }}
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/future_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
R_FUTURE_RNG_ONMISUSE: error

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- uses: r-lib/actions/setup-pandoc@v2

Expand All @@ -48,7 +48,7 @@ jobs:
shell: Rscript {0}

- name: Cache R packages
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: ${{ env.R_LIBS_USER }}
key: ${{ runner.os }}-${{ hashFiles('.github/R-version') }}-1-${{ hashFiles('.github/depends.Rds') }}
Expand Down Expand Up @@ -98,7 +98,7 @@ jobs:
- name: Upload check results
if: failure()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ runner.os }}-r${{ matrix.future.plan }}-results
path: check
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*[.]so
[.]Rhistory
[.]Rdump/
[.]local/
docs/
.Rproj.user
*.Rproj
15 changes: 8 additions & 7 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
Package: future.mirai
Version: 0.1.1
Version: 0.2.0
Depends:
future
Imports:
mirai (>= 0.9.1),
mirai (>= 0.12.1),
parallelly,
parallel,
utils
Suggests:
future.tests
Title: A Future API for Parallel Processing using 'mirai'
Description: A Future API implementation on top of the 'mirai' package.
future.tests,
future.apply,
listenv
Title: A 'Future' API for Parallel Processing using 'mirai'
Description: Implementation of the 'Future' API <doi:10.32614/RJ-2021-048> on top of the 'mirai' package. This allows you to process futures, as defined by the 'future' package, in parallel out of the box, on your local machine or across remote machines. Contrary to back-ends relying on the 'parallel' package (e.g. 'multisession') and socket connections, 'mirai_cluster' and 'mirai_multisession', provided here, can run more than 125 parallel R processes.
Authors@R: c(
person("Henrik", "Bengtsson",
role = c("aut", "cre", "cph"),
Expand All @@ -26,5 +27,5 @@ License: GPL (>= 3)
Encoding: UTF-8
URL: https://future.mirai.futureverse.org, https://github.com/HenrikBengtsson/future.mirai
BugReports: https://github.com/HenrikBengtsson/future.mirai/issues
RoxygenNote: 7.2.3
RoxygenNote: 7.3.1
Roxygen: list(markdown = TRUE)
5 changes: 1 addition & 4 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ importFrom(future,resolved)
importFrom(future,result)
importFrom(future,run)
importFrom(future,tweak)
importFrom(mirai,call_mirai_)
importFrom(mirai,daemons)
importFrom(mirai,is_error_value)
importFrom(mirai,mirai)
importFrom(mirai,status)
importFrom(mirai,unresolved)
importFrom(parallel,parLapply)
importFrom(parallel,stopCluster)
importFrom(parallelly,availableCores)
importFrom(parallelly,availableWorkers)
importFrom(parallelly,makeClusterPSOCK)
importFrom(utils,capture.output)
importFrom(utils,packageVersion)
importFrom(utils,str)
9 changes: 7 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
# Version 0.2.0

* First public release.


# Version 0.1.1

## Miscelleanous
## Miscellaneous

* Align code with **mirai** 0.9.1.


# Version 0.1.0

## Signficant Changes
## Significant Changes

* A working, proof-of-concept implementation.
53 changes: 30 additions & 23 deletions R/MiraiFuture-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ MiraiFuture <- function(expr = NULL,
packages = NULL,
lazy = FALSE,
workers = availableCores(),
dispatcher = "auto",
...)
{
if(isTRUE(substitute)) expr <- substitute(expr)

if (!identical(dispatcher, "auto")) {
stopifnot(is.logical(dispatcher), length(dispatcher) == 1L, !is.na(dispatcher))
}

## Record globals
if(!isTRUE(attr(globals, "already-done", exact = TRUE))) {
gp <- getGlobalsAndPackages(expr, envir = envir, persistent = FALSE, globals = globals)
Expand All @@ -41,35 +46,22 @@ MiraiFuture <- function(expr = NULL,

if (is.function(workers)) workers <- workers()
if (!is.null(workers)) stop_if_not(length(workers) >= 1)

cluster <- NULL
if (is.numeric(workers)) {
stop_if_not(length(workers) == 1L, !is.na(workers), workers >= 1)
if (identical(dispatcher, "auto")) dispatcher <- FALSE

## Do we need to change the number of mirai workers?
nworkers <- mirai_daemons_nworkers()
if (is.infinite(workers) && nworkers < +Inf) {
daemons(n = 0L)
} else if (workers != nworkers) {
daemons(n = 0L) ## reset is required
daemons(n = workers, dispatcher = TRUE)
}
} else if (is.character(workers)) {
stop_if_not(length(workers) >= 1L, !anyNA(workers))
dd <- get_mirai_daemons()
if (is.data.frame(dd)) {
uris <- rownames(dd)
n <- length(uris)
} else {
n <- -1L
}
if (length(workers) != n) {
daemons(n = 0L) ## reset is required
daemons(n = length(workers), url = "ws://:0", dispatcher = TRUE)
daemons(n = workers, dispatcher = dispatcher, resilience = FALSE)
}
cluster <- launch_mirai_daemons(workers)
} else if (!is.null(workers)) {
stop("Argument 'workers' should be a numeric scalar or a character vector: ", mode(workers))
stop("Argument 'workers' should be a numeric scalar or NULL: ", mode(workers))
}

future <- structure(future, class = c("MiraiFuture", class(future)))
Expand Down Expand Up @@ -133,7 +125,16 @@ run.MiraiFuture <- function(future, ...) {

expr <- getExpression(future)
globals <- future[["globals"]]
mirai <- mirai(expr, .args = globals)

## Sanity check
not_allowed <- intersect(names(globals), names(formals(mirai::mirai)))
if (length(not_allowed) > 0) {
stop(FutureError(sprintf("Detected global variables that clash with argument names of mirai::mirai(): %s", paste(sQuote(not_allowed), collapse = ", "))))
}

args = list(.expr = expr)
if (length(globals) > 0) args <- c(args, globals)
mirai <- do.call(mirai, args = args)
future[["mirai"]] <- mirai

future[["state"]] <- "running"
Expand All @@ -151,6 +152,7 @@ mirai_version <- local({
})

#' @importFrom future result
#' @importFrom mirai call_mirai_
#' @export
result.MiraiFuture <- function(future, ...) {
if(isTRUE(future[["state"]] == "finished")) {
Expand All @@ -164,11 +166,15 @@ result.MiraiFuture <- function(future, ...) {
}

mirai <- future[["mirai"]]
while (unresolved(mirai)) {
Sys.sleep(0.1)
result <- call_mirai_(mirai)$data

if (inherits(result, "errorValue")) {
label <- future$label
if (is.null(label)) label <- "<none>"
msg <- sprintf("Failed to retrieve results from %s (%s). The mirai framework reports on error value %s", class(future)[1], label, result)
stop(FutureError(msg))
}

result <- mirai$data

future[["result"]] <- result
future[["state"]] <- "finished"

Expand All @@ -184,7 +190,8 @@ mirai_daemons_nworkers <- function() {
if (is.data.frame(workers)) return(nrow(workers))

if (length(workers) != 1L) {
stop(FutureError(sprintf("Length of mirai::status()$daemons is not one: %d", length(workers))))
msg <- sprintf("Length of mirai::status()$daemons is not one: %d", length(workers))
stop(FutureError(msg))
}

if (workers == 0L) return(Inf)
Expand Down
7 changes: 2 additions & 5 deletions R/mirai_cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,24 @@
#'
#' @example incl/mirai_cluster.R
#'
#' @importFrom parallelly availableWorkers
#' @export
mirai_cluster <- function(expr,
substitute = TRUE,
envir = parent.frame(),
...,
workers = availableWorkers()) {
...) {
if (substitute) expr <- substitute(expr)

future <- MiraiFuture(
expr = expr, substitute = FALSE,
envir = envir,
workers = workers,
workers = NULL,
...
)
if(!isTRUE(future[["lazy"]])) future <- run(future)
invisible(future)
}
class(mirai_cluster) <- c("mirai_cluster", "mirai", "multiprocess", "future", "function")
attr(mirai_cluster, "init") <- TRUE
attr(mirai_cluster, "tweakable") <- "workers"


#' @importFrom future tweak
Expand Down
10 changes: 8 additions & 2 deletions R/nbrOfWorkers.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
nbrOfWorkers.mirai <- function(evaluator) {
res <- status()
workers <- res[["daemons"]]
if (!is.numeric(workers)) {
if (is.character(workers)) {
workers <- res[["connections"]]
stopifnot(is.numeric(workers))
} else if (!is.numeric(workers)) {
stop(FutureError(sprintf("Unknown type of mirai::daemons()$daemons: %s", typeof(workers))))
}

Expand All @@ -30,7 +33,10 @@ nbrOfWorkers.mirai <- function(evaluator) {
nbrOfFreeWorkers.mirai <- function(evaluator, background = FALSE, ...) {
res <- status()
workers <- res[["daemons"]]
if (!is.numeric(workers)) {
if (is.character(workers)) {
workers <- res[["connections"]]
stopifnot(is.numeric(workers))
} else if (!is.numeric(workers)) {
stop(FutureError(sprintf("Unknown type of mirai::daemons()$daemons: %s", typeof(workers))))
}

Expand Down
5 changes: 3 additions & 2 deletions R/package.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
#' TRUE
#' }
#'
#' @docType package
#' @aliases future.mirai-package
#' @name future.mirai
NULL
"_PACKAGE"


Loading

0 comments on commit b62fd77

Please sign in to comment.