diff --git a/.github/workflows/r-cmd-check.yml b/.github/workflows/r-cmd-check.yml
index 14883cc..0ac9afa 100644
--- a/.github/workflows/r-cmd-check.yml
+++ b/.github/workflows/r-cmd-check.yml
@@ -57,3 +57,5 @@ jobs:
limit-access-to-actor: true
- uses: r-lib/actions/check-r-package@v2
+ with:
+ args: 'c("--no-manual")' # "--as-cran" prevents to start external processes
diff --git a/NAMESPACE b/NAMESPACE
index 7708194..479e082 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -9,6 +9,7 @@ export(assert_rush_workers)
export(assert_rushs)
export(get_hostname)
export(heartbeat)
+export(remove_rush_plan)
export(rsh)
export(rush_available)
export(rush_config)
diff --git a/R/Rush.R b/R/Rush.R
index 700659f..ef32b44 100644
--- a/R/Rush.R
+++ b/R/Rush.R
@@ -327,7 +327,7 @@ Rush = R6::R6Class("Rush",
wait_for_workers = function(n, timeout = Inf) {
assert_count(n)
assert_number(timeout)
- timeout = if (is.finite(timeout)) timeout else rush_config()$start_worker_timeout
+ timeout = if (is.finite(timeout)) timeout else rush_config()$start_worker_timeout %??% Inf
start_time = Sys.time()
while(self$n_workers < n) {
diff --git a/R/rush_plan.R b/R/rush_plan.R
index 35c2a4b..f2d32f5 100644
--- a/R/rush_plan.R
+++ b/R/rush_plan.R
@@ -9,6 +9,8 @@
#' If `NULL`, the `REDIS_URL` environment variable is parsed.
#' If `REDIS_URL` is not set, a default configuration is used.
#' See [redux::redis_config] for details.
+#' @param start_worker_timeout (`numeric(1)`)\cr
+#' The time in seconds to wait for a worker to start.
#'
#' @template param_n_workers
#' @template param_lgr_thresholds
@@ -55,6 +57,17 @@ rush_config = function() {
start_worker_timeout = rush_env$start_worker_timeout)
}
+#' @title Remove Rush Plan
+#'
+#' @description
+#' Removes the rush plan that was set by [rush_plan()].
+#'
+#' @export
+remove_rush_plan = function() {
+ rm(list = ls(envir = rush_env), envir = rush_env)
+ invisible(NULL)
+}
+
#' @title Rush Available
#'
#' @description
diff --git a/man/Rush.Rd b/man/Rush.Rd
index 1ca0882..0daabda 100644
--- a/man/Rush.Rd
+++ b/man/Rush.Rd
@@ -346,6 +346,7 @@ This function takes the arguments \code{fun} and optionally \code{constants} whi
\if{html}{\out{
}}\preformatted{Rush$start_workers(
n_workers = NULL,
wait_for_workers = TRUE,
+ timeout = Inf,
globals = NULL,
packages = NULL,
heartbeat_period = NULL,
@@ -367,6 +368,9 @@ Number of workers to be started.}
\item{\code{wait_for_workers}}{(\code{logical(1)})\cr
Whether to wait until all workers are available.}
+\item{\code{timeout}}{(\code{numeric(1)})\cr
+Timeout to wait for workers in seconds.}
+
\item{\code{globals}}{(\code{character()})\cr
Global variables to be loaded to the workers global environment.}
@@ -481,7 +485,7 @@ Arguments passed to \code{worker_loop}.}
\subsection{Method \code{wait_for_workers()}}{
Wait until \code{n} workers are available.
\subsection{Usage}{
-\if{html}{\out{
}}\preformatted{Rush$wait_for_workers(n)}\if{html}{\out{
}}
+\if{html}{\out{
}}\preformatted{Rush$wait_for_workers(n, timeout = Inf)}\if{html}{\out{
}}
}
\subsection{Arguments}{
@@ -489,6 +493,10 @@ Wait until \code{n} workers are available.
\describe{
\item{\code{n}}{(\code{integer(1)})\cr
Number of workers to wait for.}
+
+\item{\code{timeout}}{(\code{numeric(1)})\cr
+Timeout in seconds.
+Default is \code{Inf}.}
}
\if{html}{\out{
}}
}
diff --git a/man/remove_rush_plan.Rd b/man/remove_rush_plan.Rd
new file mode 100644
index 0000000..db94a73
--- /dev/null
+++ b/man/remove_rush_plan.Rd
@@ -0,0 +1,11 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/rush_plan.R
+\name{remove_rush_plan}
+\alias{remove_rush_plan}
+\title{Remove Rush Plan}
+\usage{
+remove_rush_plan()
+}
+\description{
+Removes the rush plan that was set by \code{\link[=rush_plan]{rush_plan()}}.
+}
diff --git a/man/rush_plan.Rd b/man/rush_plan.Rd
index 2b94e57..e8e3fcf 100644
--- a/man/rush_plan.Rd
+++ b/man/rush_plan.Rd
@@ -8,7 +8,8 @@ rush_plan(
n_workers,
config = NULL,
lgr_thresholds = NULL,
- large_objects_path = NULL
+ large_objects_path = NULL,
+ start_worker_timeout = Inf
)
}
\arguments{
@@ -26,6 +27,9 @@ Logger threshold on the workers e.g. \code{c(rush = "debug")}.}
\item{large_objects_path}{(\code{character(1)})\cr
The path to the directory where large objects are stored.}
+
+\item{start_worker_timeout}{(\code{numeric(1)})\cr
+The time in seconds to wait for a worker to start.}
}
\description{
Stores the number of workers and Redis configuration options (\link[redux:redis_config]{redux::redis_config}) for \link{Rush}.
diff --git a/pkgdown/_pkgdown.yml b/pkgdown/_pkgdown.yml
index 2e2c70e..d67f5d7 100644
--- a/pkgdown/_pkgdown.yml
+++ b/pkgdown/_pkgdown.yml
@@ -47,6 +47,7 @@ reference:
- rush_plan
- rush_available
- rush_config
+ - remove_rush_plan
- title: Worker Loop
contents:
- start_worker
diff --git a/tests/testthat/_snaps/Rush.md b/tests/testthat/_snaps/Rush.md
deleted file mode 100644
index c2214ac..0000000
--- a/tests/testthat/_snaps/Rush.md
+++ /dev/null
@@ -1,11 +0,0 @@
-# worker can be started with script
-
- Code
- rush$create_worker_script(fun = fun)
- Output
- DEBUG (500): [rush] Pushing worker config to Redis
- DEBUG (500): [rush] Serializing worker configuration to 2384528 bytes
- INFO (400): [rush] Start worker with:
- INFO (400): [rush] Rscript -e 'rush::start_worker(network_id = 'test-rush', hostname = 'host', url = 'redis://127.0.0.1:6379')'
- INFO (400): [rush] See ?rush::start_worker for more details.
-
diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R
index 93a7e56..9c7b6cc 100644
--- a/tests/testthat/helper.R
+++ b/tests/testthat/helper.R
@@ -12,10 +12,9 @@ expect_rush_task = function(task) {
}
expect_rush_reset = function(rush, type = "kill") {
+ remove_rush_plan()
processes = rush$processes
rush$reset(type = type)
expect_list(rush$connector$command(c("KEYS", "*")), len = 0)
walk(processes, function(p) p$kill())
}
-
-lg$set_threshold("debug")
diff --git a/tests/testthat/setup.R b/tests/testthat/setup.R
new file mode 100644
index 0000000..af02590
--- /dev/null
+++ b/tests/testthat/setup.R
@@ -0,0 +1,13 @@
+old_opts = options(
+ warnPartialMatchArgs = TRUE,
+ warnPartialMatchAttr = TRUE,
+ warnPartialMatchDollar = TRUE
+)
+
+# https://github.com/HenrikBengtsson/Wishlist-for-R/issues/88
+old_opts = lapply(old_opts, function(x) if (is.null(x)) FALSE else x)
+
+lg_rush = lgr::get_logger("rush")
+old_threshold_rush = lg_rush$threshold
+lg_rush$set_threshold(0)
+
diff --git a/tests/testthat/teardown.R b/tests/testthat/teardown.R
new file mode 100644
index 0000000..de41d02
--- /dev/null
+++ b/tests/testthat/teardown.R
@@ -0,0 +1,2 @@
+options(old_opts)
+lg_rush$set_threshold(old_threshold_rush)
diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R
index 650d188..1476c56 100644
--- a/tests/testthat/test-Rush.R
+++ b/tests/testthat/test-Rush.R
@@ -2,7 +2,6 @@
test_that("constructing a rush controller works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -14,7 +13,6 @@ test_that("constructing a rush controller works", {
test_that("workers are started", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -22,7 +20,7 @@ test_that("workers are started", {
expect_data_table(rush$worker_info, nrows = 0)
- worker_ids = rush$start_workers(fun = fun, n_workers = 2, lgr_threshold = c(rush = "debug"), wait_for_workers = TRUE)
+ worker_ids = rush$start_workers(fun = fun, n_workers = 2, lgr_thresholds = c(rush = "debug"), wait_for_workers = TRUE)
expect_equal(rush$n_workers, 2)
# check fields
@@ -42,7 +40,6 @@ test_that("workers are started", {
test_that("workers are started with a heartbeat", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -59,7 +56,6 @@ test_that("workers are started with a heartbeat", {
test_that("additional workers are started", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -86,7 +82,6 @@ test_that("additional workers are started", {
test_that("packages are available on the worker", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -105,7 +100,6 @@ test_that("packages are available on the worker", {
test_that("globals are available on the worker", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -126,7 +120,6 @@ test_that("globals are available on the worker", {
test_that("named globals are available on the worker", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -149,16 +142,14 @@ test_that("named globals are available on the worker", {
test_that("worker can be started with script", {
skip_on_cran()
- skip_on_ci()
- skip_if(TRUE)
set.seed(1) # make log messages reproducible
root_logger = lgr::get_logger("root")
- old_fmt = root_logger$appenders$cons$layout$fmt
- root_logger$appenders$cons$layout$set_fmt("%L (%n): %m")
+ old_fmt = root_logger$appenders$console$layout$fmt
+ root_logger$appenders$console$layout$set_fmt("%L (%n): %m")
on.exit({
- root_logger$appenders$cons$layout$set_fmt(old_fmt)
+ root_logger$appenders$console$layout$set_fmt(old_fmt)
})
config = start_flush_redis()
@@ -172,7 +163,6 @@ test_that("worker can be started with script", {
test_that("a remote worker is started", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
fun = function(x1, x2, ...) list(y = x1 + x2)
@@ -189,7 +179,6 @@ test_that("a remote worker is started", {
test_that("a worker is terminated", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -220,7 +209,6 @@ test_that("a worker is terminated", {
test_that("a local worker is killed", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -251,7 +239,6 @@ test_that("a local worker is killed", {
test_that("a remote worker is killed via the heartbeat", {
skip_on_cran()
- skip_on_ci()
skip_on_os("windows")
config = start_flush_redis()
@@ -287,7 +274,6 @@ test_that("a remote worker is killed via the heartbeat", {
test_that("reading and writing a hash works with flatten", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -315,7 +301,6 @@ test_that("reading and writing a hash works with flatten", {
test_that("reading and writing a hash works without flatten", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -343,7 +328,6 @@ test_that("reading and writing a hash works without flatten", {
test_that("reading and writing hashes works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -383,7 +367,6 @@ test_that("reading and writing hashes works", {
test_that("writing hashes to specific keys works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -406,7 +389,6 @@ test_that("writing hashes to specific keys works", {
test_that("writing list columns works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -438,7 +420,6 @@ test_that("writing list columns works", {
test_that("evaluating a task works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -477,7 +458,6 @@ test_that("evaluating a task works", {
test_that("evaluating tasks works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -516,7 +496,6 @@ test_that("evaluating tasks works", {
test_that("caching results works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -553,8 +532,7 @@ test_that("caching results works", {
# segfault detection -----------------------------------------------------------
test_that("a segfault on a local worker is detected", {
- skip_on_cran()
- skip_on_ci()
+
skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
@@ -575,8 +553,7 @@ test_that("a segfault on a local worker is detected", {
})
test_that("a segfault on a worker is detected via the heartbeat", {
- skip_on_cran()
- skip_on_ci()
+
skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
@@ -602,7 +579,6 @@ test_that("a segfault on a worker is detected via the heartbeat", {
test_that("a simple error is catched", {
skip_on_cran()
- skip_on_ci()
skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
@@ -650,7 +626,6 @@ test_that("a simple error is catched", {
test_that("a lost task is detected", {
skip_on_cran()
- skip_on_ci()
skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
@@ -698,7 +673,6 @@ test_that("a lost task is detected", {
test_that("a lost task is detected when waiting", {
skip_on_cran()
- skip_on_ci()
skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
@@ -745,7 +719,6 @@ test_that("a lost task is detected when waiting", {
test_that("restarting a worker works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -767,8 +740,7 @@ test_that("restarting a worker works", {
test_that("restarting a worker kills the worker", {
skip_on_cran()
- skip_on_ci()
- skip_on_windows()
+ skip_on_os("windows")
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -780,6 +752,9 @@ test_that("restarting a worker kills the worker", {
expect_true(tools::pskill(pid, signal = 0))
rush$restart_workers(worker_ids = worker_id)
+
+ Sys.sleep(1)
+
expect_false(pid == rush$worker_info$pid)
expect_false(tools::pskill(pid, signal = 0))
@@ -790,7 +765,6 @@ test_that("restarting a worker kills the worker", {
test_that("blocking on new results works", {
skip_on_cran()
- skip_on_ci()
skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
@@ -812,7 +786,6 @@ test_that("blocking on new results works", {
test_that("wait for tasks works when a task gets lost", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -835,7 +808,6 @@ test_that("wait for tasks works when a task gets lost", {
test_that("saving lgr logs works", {
skip_on_cran()
- skip_on_ci()
skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
@@ -867,7 +839,6 @@ test_that("saving lgr logs works", {
test_that("snapshot option works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -887,7 +858,6 @@ test_that("snapshot option works", {
test_that("terminating workers on idle works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -906,7 +876,6 @@ test_that("terminating workers on idle works", {
test_that("constants works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -926,7 +895,6 @@ test_that("constants works", {
test_that("network without controller works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -962,7 +930,6 @@ test_that("network without controller works", {
test_that("seeds are generated from regular rng seed", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config, seed = 123)
@@ -978,7 +945,6 @@ test_that("seeds are generated from regular rng seed", {
test_that("seed are generated from L'Ecuyer-CMRG seed", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config, seed = c(10407L, 1801422725L, -2057975723L, 1156894209L, 1595475487L, 210384600L, -1655729657L))
@@ -994,7 +960,6 @@ test_that("seed are generated from L'Ecuyer-CMRG seed", {
test_that("seed is set correctly on two workers", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config, seed = 123)
@@ -1020,9 +985,13 @@ test_that("seed is set correctly on two workers", {
test_that("printing logs with redis appender works", {
skip_on_cran()
- skip_on_ci()
skip_if(TRUE) # does not work in testthat on environment
+ lg_rush = lgr::get_logger("rush")
+ old_threshold_rush = lg_rush$threshold
+ on.exit(lg_rush$set_threshold(old_threshold_rush))
+ lg_rush$set_threshold("info")
+
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config, seed = 123)
fun = function(x1, x2, ...) {
@@ -1050,8 +1019,7 @@ test_that("printing logs with redis appender works", {
})
test_that("redis info works", {
- skip_on_cran()
- skip_on_ci()
+
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -1062,7 +1030,6 @@ test_that("redis info works", {
test_that("evaluating a task works", {
skip_on_cran()
- skip_on_ci()
skip_if(TRUE) # takes too long
config = start_flush_redis()
diff --git a/tests/testthat/test-RushWorker.R b/tests/testthat/test-RushWorker.R
index 926d03f..21543af 100644
--- a/tests/testthat/test-RushWorker.R
+++ b/tests/testthat/test-RushWorker.R
@@ -1,6 +1,5 @@
test_that("constructing a rush worker works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -24,7 +23,6 @@ test_that("constructing a rush worker works", {
test_that("active bindings work after construction", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -49,7 +47,6 @@ test_that("active bindings work after construction", {
test_that("a worker is registered", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -70,7 +67,6 @@ test_that("a worker is registered", {
test_that("a worker is terminated", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -85,7 +81,6 @@ test_that("a worker is terminated", {
test_that("a heartbeat is started", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", heartbeat_period = 3)
@@ -99,7 +94,6 @@ test_that("a heartbeat is started", {
test_that("pushing a task to the queue works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -139,7 +133,6 @@ test_that("pushing a task to the queue works", {
test_that("pushing a task with extras to the queue works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -182,7 +175,6 @@ test_that("pushing a task with extras to the queue works", {
test_that("pushing tasks to the queue works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -223,7 +215,6 @@ test_that("pushing tasks to the queue works", {
test_that("pushing tasks with extras to the queue works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -267,7 +258,6 @@ test_that("pushing tasks with extras to the queue works", {
test_that("popping a task from the queue works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -310,7 +300,6 @@ test_that("popping a task from the queue works", {
test_that("popping a task with seed, max_retries and timeout works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -318,7 +307,7 @@ test_that("popping a task with seed, max_retries and timeout works", {
seed = 123456
max_retries = 2
timeout = 1
- rush$push_tasks(xss, seed = list(seed), max_retries = max_retries, timeout = timeout)
+ rush$push_tasks(xss, seeds = list(seed), max_retries = max_retries, timeouts = timeout)
# check task
task = rush$pop_task(fields = c("xs", "seed", "max_retries", "timeout"))
@@ -359,7 +348,6 @@ test_that("popping a task with seed, max_retries and timeout works", {
test_that("pushing a finished task works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -401,7 +389,6 @@ test_that("pushing a finished task works", {
test_that("pushing a failed tasks works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -409,7 +396,7 @@ test_that("pushing a failed tasks works", {
rush$push_tasks(xss)
task = rush$pop_task()
- rush$push_failed(task$key, condition = list(list(message = "error")))
+ rush$push_failed(task$key, conditions = list(list(message = "error")))
# check task count
expect_equal(rush$n_tasks, 1)
@@ -443,7 +430,11 @@ test_that("pushing a failed tasks works", {
test_that("retry a failed task works", {
skip_on_cran()
- skip_on_ci()
+
+ lg_rush = lgr::get_logger("rush")
+ old_threshold_rush = lg_rush$threshold
+ on.exit(lg_rush$set_threshold(old_threshold_rush))
+ lg_rush$set_threshold("info")
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -453,7 +444,7 @@ test_that("retry a failed task works", {
expect_output(rush$retry_tasks(keys), "Not all task")
- rush$push_failed(task$key, condition = list(list(message = "error")))
+ rush$push_failed(task$key, conditions = list(list(message = "error")))
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_failed_tasks, 1)
@@ -470,17 +461,16 @@ test_that("retry a failed task works", {
test_that("retry a failed task works and setting a new seed works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
xss = list(list(x1 = 1, x2 = 2))
seed = c(10407L, 1280795612L, -169270483L, -442010614L, -603558397L, -222347416L, 1489374793L)
- keys = rush$push_tasks(xss, seed = list(seed))
+ keys = rush$push_tasks(xss, seeds = list(seed))
task = rush$pop_task(fields = c("xs", "seed"))
expect_equal(task$seed, seed)
- rush$push_failed(task$key, condition = list(list(message = "error")))
+ rush$push_failed(task$key, conditions = list(list(message = "error")))
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_failed_tasks, 1)
@@ -495,8 +485,10 @@ test_that("retry a failed task works and setting a new seed works", {
})
test_that("retry a failed task works with a maximum of retries", {
- skip_on_cran()
- skip_on_ci()
+ lg_rush = lgr::get_logger("rush")
+ old_threshold_rush = lg_rush$threshold
+ on.exit(lg_rush$set_threshold(old_threshold_rush))
+ lg_rush$set_threshold("info")
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -508,7 +500,7 @@ test_that("retry a failed task works with a maximum of retries", {
expect_null(task$n_retries)
expect_output(rush$retry_tasks(keys), "Not all task")
- rush$push_failed(task$key, condition = list(list(message = "error")))
+ rush$push_failed(task$key, conditions = list(list(message = "error")))
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_failed_tasks, 1)
@@ -524,7 +516,7 @@ test_that("retry a failed task works with a maximum of retries", {
expect_false(rush$is_failed_task(task$key))
task = rush$pop_task()
- rush$push_failed(task$key, condition = list(list(message = "error")))
+ rush$push_failed(task$key, conditions = list(list(message = "error")))
expect_output(rush$retry_tasks(keys), "reached the maximum number of retries")
rush$retry_tasks(keys, ignore_max_retries = TRUE)
@@ -540,7 +532,11 @@ test_that("retry a failed task works with a maximum of retries", {
test_that("retry failed tasks works", {
skip_on_cran()
- skip_on_ci()
+
+ lg_rush = lgr::get_logger("rush")
+ old_threshold_rush = lg_rush$threshold
+ on.exit(lg_rush$set_threshold(old_threshold_rush))
+ lg_rush$set_threshold("info")
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -552,7 +548,7 @@ test_that("retry failed tasks works", {
expect_output(rush$retry_tasks(keys), "Not all task")
- rush$push_failed(keys, condition = list(list(message = "error")))
+ rush$push_failed(keys, conditions = list(list(message = "error")))
expect_equal(rush$n_queued_tasks, 0)
expect_equal(rush$n_failed_tasks, 2)
@@ -569,7 +565,6 @@ test_that("retry failed tasks works", {
test_that("moving and fetching tasks works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -618,7 +613,7 @@ test_that("moving and fetching tasks works", {
# push failed task
task = rush$pop_task()
- rush$push_failed(task$key, condition = list(list(message = "error")))
+ rush$push_failed(task$key, conditions = list(list(message = "error")))
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 1)
expect_character(queued_tasks$keys, unique = TRUE)
@@ -645,7 +640,6 @@ test_that("moving and fetching tasks works", {
test_that("fetching as list works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -692,7 +686,7 @@ test_that("fetching as list works", {
# push failed task
task = rush$pop_task()
- rush$push_failed(task$key, condition = list(list(message = "error")))
+ rush$push_failed(task$key, conditions = list(list(message = "error")))
failed_tasks = rush$fetch_failed_tasks(data_format = "list")
expect_list(failed_tasks, len = 1)
expect_names(names(failed_tasks), identical.to = task$key)
@@ -702,7 +696,6 @@ test_that("fetching as list works", {
test_that("fetch task with states works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123)
@@ -747,7 +740,7 @@ test_that("fetch task with states works", {
xss = list(list(x1 = 2, x2 = 2))
rush$push_tasks(xss)
task_2 = rush$pop_task()
- rush$push_failed(task_2$key, condition = list(list(message = "error")))
+ rush$push_failed(task_2$key, conditions = list(list(message = "error")))
tab = rush$fetch_tasks_with_state()
expect_data_table(tab, nrows = 2)
expect_equal(tab$state, c("finished", "failed"))
@@ -760,7 +753,6 @@ test_that("fetch task with states works", {
test_that("latest results are fetched", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -803,7 +795,6 @@ test_that("latest results are fetched", {
test_that("priority queues work", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -857,7 +848,6 @@ test_that("priority queues work", {
test_that("redirecting to shared queue works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -888,7 +878,6 @@ test_that("redirecting to shared queue works", {
test_that("mixing priority queue and shared queue works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -910,7 +899,7 @@ test_that("mixing priority queue and shared queue works", {
test_that("saving logs with redis appender works", {
skip_on_cran()
- skip_on_ci()
+
appenders = lgr::get_logger("root")$appenders
on.exit({
@@ -945,7 +934,7 @@ test_that("saving logs with redis appender works", {
test_that("settings the buffer size in redis appender works", {
skip_on_cran()
- skip_on_ci()
+
appenders = lgr::get_logger("root")$appenders
on.exit({
@@ -977,7 +966,6 @@ test_that("settings the buffer size in redis appender works", {
test_that("pushing tasks and terminating worker works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -998,7 +986,6 @@ test_that("pushing tasks and terminating worker works", {
test_that("terminate on idle works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
@@ -1018,7 +1005,6 @@ test_that("terminate on idle works", {
test_that("popping a task with seed from the queue works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123)
@@ -1036,7 +1022,6 @@ test_that("popping a task with seed from the queue works", {
test_that("task in states works", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123)
@@ -1076,7 +1061,7 @@ test_that("task in states works", {
xss = list(list(x1 = 2, x2 = 2))
keys = rush$push_tasks(xss)
task_2 = rush$pop_task()
- rush$push_failed(task_2$key, condition = list(list(message = "error")))
+ rush$push_failed(task_2$key, conditions = list(list(message = "error")))
keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed"))
expect_list(keys_list, len = 4)
expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed"))
diff --git a/tests/testthat/test-rush_plan.R b/tests/testthat/test-rush_plan.R
index a315ce5..f9ac09d 100644
--- a/tests/testthat/test-rush_plan.R
+++ b/tests/testthat/test-rush_plan.R
@@ -1,7 +1,4 @@
test_that("rush_plan family works", {
- skip_on_cran()
- skip_on_ci()
-
expect_false(rush_available())
config = redis_config()
rush_plan(n_workers = 2, config)
@@ -13,16 +10,12 @@ test_that("rush_plan family works", {
})
test_that("rush_plan throws and error if redis is not available", {
- skip_on_cran()
- skip_on_ci()
-
config = redis_config(url = "redis://localhost:1234")
expect_error(rush_plan(n_workers = 2, config), "Can't connect to Redis")
})
test_that("start workers", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush_plan(n_workers = 2, config)
@@ -40,7 +33,11 @@ test_that("start workers", {
test_that("set threshold", {
skip_on_cran()
- skip_on_ci()
+
+ lg_rush = lgr::get_logger("rush")
+ old_threshold_rush = lg_rush$threshold
+ on.exit(lg_rush$set_threshold(old_threshold_rush))
+ lg_rush$set_threshold("debug")
config = start_flush_redis()
rush_plan(n_workers = 2, config, lgr_thresholds = c(rush = "debug"))
@@ -57,7 +54,6 @@ test_that("set threshold", {
test_that("set start worker timeout", {
skip_on_cran()
- skip_on_ci()
config = start_flush_redis()
rush_plan(n_workers = 2, config, start_worker_timeout = -Inf)
@@ -67,4 +63,6 @@ test_that("set start worker timeout", {
rush = rsh("test-rush")
fun = function(x1, x2, ...) list(y = x1 + x2)
expect_error(rush$start_workers(fun = fun), "Timeout waiting")
+
+ expect_rush_reset(rush)
})
diff --git a/tests/testthat/test-worker_loops.R b/tests/testthat/test-worker_loops.R
index 954b1cd..1a0a2e3 100644
--- a/tests/testthat/test-worker_loops.R
+++ b/tests/testthat/test-worker_loops.R
@@ -1,6 +1,8 @@
# default ----------------------------------------------------------------------
test_that("worker_loop_default works", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
xss = list(list(x1 = 1, x2 = 2))
@@ -14,6 +16,8 @@ test_that("worker_loop_default works", {
})
test_that("worker_loop_default works with failed task", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
xss = list(list(x1 = 1, x2 = 2))
@@ -28,6 +32,8 @@ test_that("worker_loop_default works with failed task", {
})
test_that("worker_loop_default retries failed task", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
xss = list(list(x1 = 1, x2 = 2))
@@ -42,6 +48,8 @@ test_that("worker_loop_default retries failed task", {
})
test_that("worker_loop_default sets seed is set correctly", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123456)
xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2))
@@ -58,6 +66,8 @@ test_that("worker_loop_default sets seed is set correctly", {
# callr ------------------------------------------------------------------------
test_that("worker_loop_callr works", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
xss = list(list(x1 = 1, x2 = 2))
@@ -71,6 +81,8 @@ test_that("worker_loop_callr works", {
})
test_that("worker_loop_callr works with failed task", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
xss = list(list(x1 = 1, x2 = 2))
@@ -85,6 +97,8 @@ test_that("worker_loop_callr works with failed task", {
})
test_that("worker_loop_callr works with lost task", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
xss = list(list(x1 = 1, x2 = 2))
@@ -99,6 +113,8 @@ test_that("worker_loop_callr works with lost task", {
})
test_that("worker_loop_callr works with timeout", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
xss = list(list(x1 = 1, x2 = 2))
@@ -113,6 +129,8 @@ test_that("worker_loop_callr works with timeout", {
})
test_that("worker_loop_callr sets seed correctly", {
+ skip_on_cran()
+
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123456)
xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2))