From 85c2ffc97f0a8e52c379458a19523883463c0ff8 Mon Sep 17 00:00:00 2001
From: Marc Becker <33069354+be-marc@users.noreply.github.com>
Date: Tue, 6 Feb 2024 14:35:21 +0100
Subject: [PATCH] refactor: set worker arguments only once (#31)
---
R/Rush.R | 65 +++++++++++++++++++++++++---
man/Rush.Rd | 89 ++++++++++++++++++++++++++++----------
man/RushWorker.Rd | 40 ++---------------
pkgdown/_pkgdown.yml | 1 +
tests/testthat/test-Rush.R | 26 ++++++++++-
5 files changed, 154 insertions(+), 67 deletions(-)
diff --git a/R/Rush.R b/R/Rush.R
index c45a583..e9c82f2 100644
--- a/R/Rush.R
+++ b/R/Rush.R
@@ -194,10 +194,10 @@ Rush = R6::R6Class("Rush",
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
- #' @param supervise (`logical(1)`)\cr
- #' Whether to kill the workers when the main R process is shut down.
#' @param wait_for_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
+ #' @param supervise (`logical(1)`)\cr
+ #' Whether to kill the workers when the main R process is shut down.
#' @param ... (`any`)\cr
#' Arguments passed to `worker_loop`.
start_workers = function(
@@ -218,6 +218,11 @@ Rush = R6::R6Class("Rush",
assert_flag(supervise)
r = self$connector
+ if (self$has_start_arguments) {
+ # too reduce the complexity, we don't allow to start workers with different arguments for now.
+ stop("Worker configuration is already set. Use `$add_workers()` to add additional workers or `$stop_workers()` to stop the workers and start with new arguments.")
+ }
+
# push worker config to redis
private$.push_worker_config(
globals = globals,
@@ -230,13 +235,15 @@ Rush = R6::R6Class("Rush",
...
)
+ lg$debug("Starting %i worker(s)", n_workers)
+
worker_ids = uuid::UUIDgenerate(n = n_workers)
- self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
+ self$processes = set_names(map(worker_ids, function(worker_id) {
processx::process$new("Rscript",
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
self$network_id, worker_id, private$.hostname, self$config$url)),
supervise = supervise, stdout = "|", stderr = "|") # , stdout = "|", stderr = "|"
- }), worker_ids))
+ }), worker_ids)
if (wait_for_workers) self$wait_for_workers(n_workers)
@@ -244,7 +251,38 @@ Rush = R6::R6Class("Rush",
},
#' @description
- #' Restart workers.
+ #' Add local workers to the network.
+ #'
+ #' @param n_workers (`integer(1)`)\cr
+ #' Number of workers to be started.
+ #' @param wait_for_workers (`logical(1)`)\cr
+ #' Whether to wait until all workers are available.
+ #' @param supervise (`logical(1)`)\cr
+ #' Whether to kill the workers when the main R process is shut down.
+ add_workers = function(n_workers, wait_for_workers = TRUE, supervise = TRUE) {
+ assert_count(n_workers)
+ assert_flag(wait_for_workers)
+ assert_flag(supervise)
+ r = self$connector
+ n_running_workers = self$n_running_workers
+
+ lg$debug("Starting %i worker(s)", n_workers)
+
+ worker_ids = uuid::UUIDgenerate(n = n_workers)
+ self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
+ processx::process$new("Rscript",
+ args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
+ self$network_id, worker_id, private$.hostname, self$config$url)),
+ supervise = TRUE, stdout = "|", stderr = "|")
+ }), worker_ids))
+
+ if (wait_for_workers) self$wait_for_workers(n_workers + n_running_workers)
+
+ return(invisible(worker_ids))
+ },
+
+ #' @description
+ #' Restart local workers.
#'
#' @param worker_ids (`character()`)\cr
#' Worker ids to be restarted.
@@ -281,6 +319,11 @@ Rush = R6::R6Class("Rush",
...
) {
+ if (self$has_start_arguments) {
+ # too reduce the complexity, we don't allow to start workers with different arguments for now.
+ lg$warn("Worker configuration is already set. Use `$stop_workers()` to stop the workers and start with new arguments.")
+ }
+
# push worker config to redis
private$.push_worker_config(
globals = globals,
@@ -1043,8 +1086,8 @@ Rush = R6::R6Class("Rush",
#' @description
#' Reads a single Redis hash and returns the values as a list named by the fields.
#'
- #' @param keys (`character()`)\cr
- #' Keys of the hashes.
+ #' @param key (`character(1)`)\cr
+ #' Key of the hash.
#' @param fields (`character()`)\cr
#' Fields to be read from the hashes.
#'
@@ -1276,6 +1319,14 @@ Rush = R6::R6Class("Rush",
r = self$connector
r$command(c("CONFIG", "SET", "save", str_collapse(rhs, sep = " ")))
private$.snapshot_schedule = rhs
+ },
+
+ #' @field has_start_arguments (`logical(1)`)\cr
+ #' Whether the start arguments for the workers are set.
+ has_start_arguments = function(rhs) {
+ assert_ro_binding(rhs)
+ r = self$connector
+ as.logical(r$EXISTS(private$.get_key("start_args")))
}
),
diff --git a/man/Rush.Rd b/man/Rush.Rd
index 05596f6..e28abeb 100644
--- a/man/Rush.Rd
+++ b/man/Rush.Rd
@@ -209,6 +209,9 @@ For example, \code{c(60, 1000)} saves the data base every 60 seconds if there ar
Overwrites the redis configuration file.
Set to \code{NULL} to disable snapshots.
For more details see \href{https://redis.io/docs/management/persistence/#snapshotting}{redis.io}.}
+
+\item{\code{has_start_arguments}}{(\code{logical(1)})\cr
+Whether the start arguments for the workers are set.}
}
\if{html}{\out{}}
}
@@ -219,6 +222,7 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho
\item \href{#method-Rush-format}{\code{Rush$format()}}
\item \href{#method-Rush-print}{\code{Rush$print()}}
\item \href{#method-Rush-start_workers}{\code{Rush$start_workers()}}
+\item \href{#method-Rush-add_workers}{\code{Rush$add_workers()}}
\item \href{#method-Rush-restart_workers}{\code{Rush$restart_workers()}}
\item \href{#method-Rush-create_worker_script}{\code{Rush$create_worker_script()}}
\item \href{#method-Rush-wait_for_workers}{\code{Rush$wait_for_workers()}}
@@ -226,7 +230,6 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho
\item \href{#method-Rush-detect_lost_workers}{\code{Rush$detect_lost_workers()}}
\item \href{#method-Rush-reset}{\code{Rush$reset()}}
\item \href{#method-Rush-read_log}{\code{Rush$read_log()}}
-\item \href{#method-Rush-read_tasks}{\code{Rush$read_tasks()}}
\item \href{#method-Rush-push_tasks}{\code{Rush$push_tasks()}}
\item \href{#method-Rush-push_priority_tasks}{\code{Rush$push_priority_tasks()}}
\item \href{#method-Rush-push_failed}{\code{Rush$push_failed()}}
@@ -244,6 +247,7 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho
\item \href{#method-Rush-wait_for_tasks}{\code{Rush$wait_for_tasks()}}
\item \href{#method-Rush-write_hashes}{\code{Rush$write_hashes()}}
\item \href{#method-Rush-read_hashes}{\code{Rush$read_hashes()}}
+\item \href{#method-Rush-read_hash}{\code{Rush$read_hash()}}
\item \href{#method-Rush-is_running_task}{\code{Rush$is_running_task()}}
\item \href{#method-Rush-is_failed_task}{\code{Rush$is_failed_task()}}
\item \href{#method-Rush-clone}{\code{Rush$clone()}}
@@ -385,10 +389,34 @@ Arguments passed to \code{worker_loop}.}
}
}
\if{html}{\out{
}}
+\if{html}{\out{}}
+\if{latex}{\out{\hypertarget{method-Rush-add_workers}{}}}
+\subsection{Method \code{add_workers()}}{
+Add local workers to the network.
+\subsection{Usage}{
+\if{html}{\out{}}\preformatted{Rush$add_workers(n_workers, wait_for_workers = TRUE, supervise = TRUE)}\if{html}{\out{
}}
+}
+
+\subsection{Arguments}{
+\if{html}{\out{}}
+\describe{
+\item{\code{n_workers}}{(\code{integer(1)})\cr
+Number of workers to be started.}
+
+\item{\code{wait_for_workers}}{(\code{logical(1)})\cr
+Whether to wait until all workers are available.}
+
+\item{\code{supervise}}{(\code{logical(1)})\cr
+Whether to kill the workers when the main R process is shut down.}
+}
+\if{html}{\out{
}}
+}
+}
+\if{html}{\out{
}}
\if{html}{\out{}}
\if{latex}{\out{\hypertarget{method-Rush-restart_workers}{}}}
\subsection{Method \code{restart_workers()}}{
-Restart workers.
+Restart local workers.
\subsection{Usage}{
\if{html}{\out{}}\preformatted{Rush$restart_workers(worker_ids)}\if{html}{\out{
}}
}
@@ -565,18 +593,6 @@ If \code{NULL} all worker ids are used.}
}
\if{html}{\out{}}
}
-}
-\if{html}{\out{
}}
-\if{html}{\out{}}
-\if{latex}{\out{\hypertarget{method-Rush-read_tasks}{}}}
-\subsection{Method \code{read_tasks()}}{
-\subsection{Usage}{
-\if{html}{\out{}}\preformatted{Rush$read_tasks(
- keys,
- fields = c("status", "seed", "timeout", "max_retries", "n_retries")
-)}\if{html}{\out{
}}
-}
-
}
\if{html}{\out{
}}
\if{html}{\out{}}
@@ -697,6 +713,9 @@ Retry failed tasks.
\item{\code{keys}}{(\code{character()})\cr
Keys of the tasks to be retried.}
+\item{\code{ignore_max_retires}}{(\code{logical(1)})\cr
+Whether to ignore the maximum number of retries.}
+
\item{\code{next_seed}}{(\code{logical(1)})\cr
Whether to change the seed of the task.}
}
@@ -1094,17 +1113,14 @@ Keys of the hashes.
\if{html}{\out{}}
\if{latex}{\out{\hypertarget{method-Rush-read_hashes}{}}}
\subsection{Method \code{read_hashes()}}{
-Reads Redis hashes and combines the values of the fields into a list.
-The function reads the values of the \code{fields} in the hashes stored at \code{keys}.
-The values of a hash are deserialized and combined into a single list.
-
+Reads R Objects from Redis hashes.
+The function reads the field-value pairs of the hashes stored at \code{keys}.
+The values of a hash are deserialized and combined to a list.
+If \code{flatten} is \code{TRUE}, the values are flattened to a single list e.g. list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3)) becomes list(x1 = 1, x2 = 2, y = 3).
The reading functions combine the hashes to a table where the names of the inner lists are the column names.
For example, \verb{xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7))} becomes \code{data.table(x1 = c(1, 3), x2 = c(2, 4), y = c(3, 7))}.
-Vectors in list columns must be wrapped in lists.
-Otherwise, \verb{$read_values()} will expand the table by the length of the vectors.
-For example, \verb{xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = c("A", "B", "C"))) does not work. Pass }xs_extra = list(list(extra = list(c("A", "B", "C"))))` instead.
\subsection{Usage}{
-\if{html}{\out{}}\preformatted{Rush$read_hashes(keys, fields)}\if{html}{\out{
}}
+\if{html}{\out{}}\preformatted{Rush$read_hashes(keys, fields, flatten = TRUE)}\if{html}{\out{
}}
}
\subsection{Arguments}{
@@ -1113,6 +1129,35 @@ For example, \verb{xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra =
\item{\code{keys}}{(\code{character()})\cr
Keys of the hashes.}
+\item{\code{fields}}{(\code{character()})\cr
+Fields to be read from the hashes.}
+
+\item{\code{flatten}}{(\code{logical(1)})\cr
+Whether to flatten the list.}
+}
+\if{html}{\out{}}
+}
+\subsection{Returns}{
+(list of \code{list()})\cr
+The outer list contains one element for each key.
+The inner list is the combination of the lists stored at the different fields.
+}
+}
+\if{html}{\out{
}}
+\if{html}{\out{}}
+\if{latex}{\out{\hypertarget{method-Rush-read_hash}{}}}
+\subsection{Method \code{read_hash()}}{
+Reads a single Redis hash and returns the values as a list named by the fields.
+\subsection{Usage}{
+\if{html}{\out{}}\preformatted{Rush$read_hash(key, fields)}\if{html}{\out{
}}
+}
+
+\subsection{Arguments}{
+\if{html}{\out{}}
+\describe{
+\item{\code{key}}{(\code{character(1)})\cr
+Key of the hash.}
+
\item{\code{fields}}{(\code{character()})\cr
Fields to be read from the hashes.}
}
diff --git a/man/RushWorker.Rd b/man/RushWorker.Rd
index 87eff74..bc84965 100644
--- a/man/RushWorker.Rd
+++ b/man/RushWorker.Rd
@@ -47,7 +47,6 @@ Used in the worker loop to determine whether to continue.}
\item \href{#method-RushWorker-push_running_task}{\code{RushWorker$push_running_task()}}
\item \href{#method-RushWorker-pop_task}{\code{RushWorker$pop_task()}}
\item \href{#method-RushWorker-push_results}{\code{RushWorker$push_results()}}
-\item \href{#method-RushWorker-retry_task}{\code{RushWorker$retry_task()}}
\item \href{#method-RushWorker-set_terminated}{\code{RushWorker$set_terminated()}}
\item \href{#method-RushWorker-clone}{\code{RushWorker$clone()}}
}
@@ -55,6 +54,7 @@ Used in the worker loop to determine whether to continue.}
\if{html}{\out{
Inherited methods
+rush::Rush$add_workers()
rush::Rush$create_worker_script()
rush::Rush$detect_lost_workers()
rush::Rush$fetch_failed_tasks()
@@ -72,9 +72,9 @@ Used in the worker loop to determine whether to continue.}
rush::Rush$push_failed()
rush::Rush$push_priority_tasks()
rush::Rush$push_tasks()
+rush::Rush$read_hash()
rush::Rush$read_hashes()
rush::Rush$read_log()
-rush::Rush$read_tasks()
rush::Rush$reset()
rush::Rush$restart_workers()
rush::Rush$retry_tasks()
@@ -206,12 +206,7 @@ Fields to be returned.}
\subsection{Method \code{push_results()}}{
Pushes results to the data base.
\subsection{Usage}{
-\if{html}{\out{}}\preformatted{RushWorker$push_results(
- keys,
- yss = list(),
- extra = list(),
- conditions = list()
-)}\if{html}{\out{
}}
+\if{html}{\out{}}\preformatted{RushWorker$push_results(keys, yss, extra = NULL)}\if{html}{\out{
}}
}
\subsection{Arguments}{
@@ -225,35 +220,6 @@ List of lists of named results.}
\item{\code{extra}}{(named \code{list()})\cr
List of lists of additional information stored along with the results.}
-
-\item{\code{conditions}}{(named \code{list()})\cr
-List of lists of conditions.}
-
-\item{\code{state}}{(\code{character(1)})\cr
-Status of the tasks.
-If \code{"finished"} the tasks are moved to the finished tasks.
-If \code{"error"} the tasks are moved to the failed tasks.}
-}
-\if{html}{\out{
}}
-}
-}
-\if{html}{\out{
}}
-\if{html}{\out{}}
-\if{latex}{\out{\hypertarget{method-RushWorker-retry_task}{}}}
-\subsection{Method \code{retry_task()}}{
-Retry failed task.
-\subsection{Usage}{
-\if{html}{\out{}}\preformatted{RushWorker$retry_task(task, next_seed = FALSE)}\if{html}{\out{
}}
-}
-
-\subsection{Arguments}{
-\if{html}{\out{}}
-\describe{
-\item{\code{task}}{(\code{list()})\cr
-Task to be retried.}
-
-\item{\code{next_seed}}{(\code{logical(1)})\cr
-Whether to change the seed of the task.}
}
\if{html}{\out{
}}
}
diff --git a/pkgdown/_pkgdown.yml b/pkgdown/_pkgdown.yml
index 5e816ba..51f9506 100644
--- a/pkgdown/_pkgdown.yml
+++ b/pkgdown/_pkgdown.yml
@@ -58,3 +58,4 @@ reference:
contents:
- get_hostname
- rush-package
+ - with_rng_state
diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R
index 89cdf86..84c94dc 100644
--- a/tests/testthat/test-Rush.R
+++ b/tests/testthat/test-Rush.R
@@ -37,6 +37,9 @@ test_that("workers are started", {
expect_set_equal(rush$worker_ids, worker_ids)
expect_set_equal(rush$worker_states$state, "running")
+ expect_error(rush$start_workers(fun = fun, n_workers = 2, lgr_threshold = c(rush = "debug"), wait_for_workers = TRUE),
+ "Worker configuration is already set")
+
expect_rush_reset(rush)
})
@@ -68,7 +71,7 @@ test_that("additional workers are started", {
worker_ids = rush$start_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE)
expect_equal(rush$n_workers, 2)
- worker_ids_2 = rush$start_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE)
+ worker_ids_2 = rush$add_workers(n_workers = 2, wait_for_workers = TRUE)
rush$wait_for_workers(4)
expect_equal(rush$n_workers, 4)
@@ -124,6 +127,27 @@ test_that("globals are available on the worker", {
expect_rush_reset(rush)
})
+test_that("named globals are available on the worker", {
+ skip_on_cran()
+ skip_on_ci()
+
+ config = start_flush_redis()
+ rush = Rush$new(network_id = "test-rush", config = config)
+ fun = function(x1, x2, ...) list(y = z)
+ x <<- 33
+
+ rush$start_workers(fun = fun, n_workers = 2, globals = c(z = "x"), wait_for_workers = TRUE)
+
+ xss = list(list(x1 = 1, x2 = 2))
+ keys = rush$push_tasks(xss)
+ rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
+
+ expect_equal(rush$n_finished_tasks, 1)
+ expect_equal(rush$fetch_finished_tasks()$y, 33)
+
+ expect_rush_reset(rush)
+})
+
# start workers with script ----------------------------------------------------
test_that("worker can be started with script", {