Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: set worker arguments only once #31

Merged
merged 1 commit into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ Rush = R6::R6Class("Rush",
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
#' @param supervise (`logical(1)`)\cr
#' Whether to kill the workers when the main R process is shut down.
#' @param wait_for_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
#' @param supervise (`logical(1)`)\cr
#' Whether to kill the workers when the main R process is shut down.
#' @param ... (`any`)\cr
#' Arguments passed to `worker_loop`.
start_workers = function(
Expand All @@ -218,6 +218,11 @@ Rush = R6::R6Class("Rush",
assert_flag(supervise)
r = self$connector

if (self$has_start_arguments) {
# too reduce the complexity, we don't allow to start workers with different arguments for now.
stop("Worker configuration is already set. Use `$add_workers()` to add additional workers or `$stop_workers()` to stop the workers and start with new arguments.")
}

# push worker config to redis
private$.push_worker_config(
globals = globals,
Expand All @@ -230,21 +235,54 @@ Rush = R6::R6Class("Rush",
...
)

lg$debug("Starting %i worker(s)", n_workers)

worker_ids = uuid::UUIDgenerate(n = n_workers)
self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
self$processes = set_names(map(worker_ids, function(worker_id) {
processx::process$new("Rscript",
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
self$network_id, worker_id, private$.hostname, self$config$url)),
supervise = supervise, stdout = "|", stderr = "|") # , stdout = "|", stderr = "|"
}), worker_ids))
}), worker_ids)

if (wait_for_workers) self$wait_for_workers(n_workers)

return(invisible(worker_ids))
},

#' @description
#' Restart workers.
#' Add local workers to the network.
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
#' @param wait_for_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
#' @param supervise (`logical(1)`)\cr
#' Whether to kill the workers when the main R process is shut down.
add_workers = function(n_workers, wait_for_workers = TRUE, supervise = TRUE) {
assert_count(n_workers)
assert_flag(wait_for_workers)
assert_flag(supervise)
r = self$connector
n_running_workers = self$n_running_workers

lg$debug("Starting %i worker(s)", n_workers)

worker_ids = uuid::UUIDgenerate(n = n_workers)
self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) {
processx::process$new("Rscript",
args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')",
self$network_id, worker_id, private$.hostname, self$config$url)),
supervise = TRUE, stdout = "|", stderr = "|")
}), worker_ids))

if (wait_for_workers) self$wait_for_workers(n_workers + n_running_workers)

return(invisible(worker_ids))
},

#' @description
#' Restart local workers.
#'
#' @param worker_ids (`character()`)\cr
#' Worker ids to be restarted.
Expand Down Expand Up @@ -281,6 +319,11 @@ Rush = R6::R6Class("Rush",
...
) {

if (self$has_start_arguments) {
# too reduce the complexity, we don't allow to start workers with different arguments for now.
lg$warn("Worker configuration is already set. Use `$stop_workers()` to stop the workers and start with new arguments.")
}

# push worker config to redis
private$.push_worker_config(
globals = globals,
Expand Down Expand Up @@ -1043,8 +1086,8 @@ Rush = R6::R6Class("Rush",
#' @description
#' Reads a single Redis hash and returns the values as a list named by the fields.
#'
#' @param keys (`character()`)\cr
#' Keys of the hashes.
#' @param key (`character(1)`)\cr
#' Key of the hash.
#' @param fields (`character()`)\cr
#' Fields to be read from the hashes.
#'
Expand Down Expand Up @@ -1276,6 +1319,14 @@ Rush = R6::R6Class("Rush",
r = self$connector
r$command(c("CONFIG", "SET", "save", str_collapse(rhs, sep = " ")))
private$.snapshot_schedule = rhs
},

#' @field has_start_arguments (`logical(1)`)\cr
#' Whether the start arguments for the workers are set.
has_start_arguments = function(rhs) {
assert_ro_binding(rhs)
r = self$connector
as.logical(r$EXISTS(private$.get_key("start_args")))
}
),

Expand Down
89 changes: 67 additions & 22 deletions man/Rush.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 3 additions & 37 deletions man/RushWorker.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkgdown/_pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ reference:
contents:
- get_hostname
- rush-package
- with_rng_state
Loading
Loading