diff --git a/.gitignore b/.gitignore index 4021ae1..8d9d90d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,99 @@ -inst/doc -attic +# File created using '.gitignore Generator' for Visual Studio Code: https://bit.ly/vscode-gig +# Created by https://www.toptal.com/developers/gitignore/api/visualstudiocode,linux,r +# Edit at https://www.toptal.com/developers/gitignore?templates=visualstudiocode,linux,r + +### Linux ### +*~ + +# temporary files which can be created if a process still has a handle open of a deleted file +.fuse_hidden* + +# KDE directory preferences +.directory + +# Linux trash folder which might appear on any partition or disk +.Trash-* + +# .nfs files are created when an open file is removed but is still being accessed +.nfs* + +### R ### +# History files +.Rhistory +.Rapp.history + +# Session Data files +.RData +.RDataTmp + +# User-specific files +.Ruserdata + +# Example code in package build process +*-Ex.R + +# Output files from R CMD build +/*.tar.gz + +# Output files from R CMD check +/*.Rcheck/ + +# RStudio files +.Rproj.user/ + +# produced vignettes +vignettes/*.html +vignettes/*.pdf + +# OAuth2 token, see https://github.com/hadley/httr/releases/tag/v0.3 +.httr-oauth + +# knitr and R markdown default cache directories +*_cache/ +/cache/ + +# Temporary files created by R markdown +*.utf8.md +*.knit.md + +# R Environment Variables +.Renviron + +# pkgdown site +docs/ + +# translation temp files +po/*~ + +# RStudio Connect folder +rsconnect/ + +### R.Bookdown Stack ### +# R package: bookdown caching files +/*_files/ + +### VisualStudioCode ### +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json +!.vscode/*.code-snippets + +# Local History for Visual Studio Code +.history/ + +# Built Visual Studio Code Extensions +*.vsix + +### VisualStudioCode Patch ### +# Ignore all local history of files +.history +.ionide + +# End of https://www.toptal.com/developers/gitignore/api/visualstudiocode,linux,r + 
+# Custom rules (everything added below won't be overriden by 'Generate .gitignore File' if you use 'Update' option) + +.Rprofile +attic/ diff --git a/R/Rush.R b/R/Rush.R index cec672a..c486e7b 100644 --- a/R/Rush.R +++ b/R/Rush.R @@ -130,7 +130,6 @@ Rush = R6::R6Class("Rush", } self$connector = redux::hiredis(self$config) private$.hostname = get_hostname() - private$.pid_exists = choose_pid_exists() }, #' @description @@ -166,6 +165,8 @@ Rush = R6::R6Class("Rush", #' #' @param n_workers (`integer(1)`)\cr #' Number of workers to be started. + #' @param supervise (`logical(1)`)\cr + #' Whether to kill the workers when the main R process is shut down. #' @param wait_for_workers (`logical(1)`)\cr #' Whether to wait until all workers are available. #' @param ... (`any`)\cr @@ -179,12 +180,13 @@ Rush = R6::R6Class("Rush", heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - + supervise = TRUE, worker_loop = worker_loop_default, ... ) { n_workers = assert_count(n_workers %??% rush_env$n_workers) assert_flag(wait_for_workers) + assert_flag(supervise) # push worker config to redis private$.push_worker_config( @@ -202,7 +204,8 @@ Rush = R6::R6Class("Rush", self$processes = c(self$processes, set_names(map(worker_ids, function(worker_id) { processx::process$new("Rscript", args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')", - self$network_id, worker_id, private$.hostname, self$config$url))) + self$network_id, worker_id, private$.hostname, self$config$url)), + supervise = supervise) }), worker_ids)) if (wait_for_workers) self$wait_for_workers(n_workers) @@ -210,7 +213,26 @@ Rush = R6::R6Class("Rush", return(invisible(worker_ids)) }, + #' @description + #' Restart workers. + #' + #' @param worker_ids (`character()`)\cr + #' Worker ids to be restarted. 
+ restart_workers = function(worker_ids) { + assert_subset(unlist(worker_ids), self$worker_ids) + r = self$connector + lg$error("Restarting %i worker(s): %s", length(worker_ids), str_collapse(worker_ids)) + processes = set_names(map(worker_ids, function(worker_id) { + # restart worker + processx::process$new("Rscript", + args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')", + self$network_id, worker_id, private$.hostname, self$config$url))) + }), worker_ids) + self$processes = insert_named(self$processes, processes) + + return(invisible(worker_ids)) + }, #' @description #' Create script to start workers. @@ -290,30 +312,33 @@ Rush = R6::R6Class("Rush", } else if (type == "kill") { worker_info = self$worker_info[list(worker_ids), , on = "worker_id"] - # kill local - local_workers = worker_info[list("local"), , on = c("host"), nomatch = NULL] + local_workers = worker_info[list("local"), worker_id, on = c("host"), nomatch = NULL] + lg$debug("Killing %i local worker(s) %s", length(local_workers), as_short_string(local_workers)) - if (nrow(local_workers)) { - tools::pskill(local_workers$pid) - cmds = map(local_workers$worker_id, function(worker_id) { - c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("killed_worker_ids"), worker_id) - }) - r$pipeline(.commands = cmds) - } + # kill with processx + walk(local_workers, function(worker_id) { + killed = self$processes[[worker_id]]$kill() + if (!killed) lg$error("Failed to kill worker %s", worker_id) + }) + + # set worker state + cmds_local = map(local_workers, function(worker_id) { + c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("killed_worker_ids"), worker_id) + }) # kill remote - remote_workers = worker_info[list("remote"), worker_id, on = c("host"), nomatch = NULL] - - if (length(remote_workers)) { - # push kill signal to heartbeat - cmds = unlist(map(remote_workers, function(worker_id) { - list( - c("LPUSH", 
private$.get_worker_key("kill", worker_id), "TRUE"), - c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("killed_worker_ids"), worker_id)) - }), recursive = FALSE) - r$pipeline(.commands = cmds) - } + remote_workers = worker_info [list("remote"), worker_id, on = c("host"), nomatch = NULL] + lg$debug("Killing %i remote worker(s) %s", length(remote_workers), as_short_string(remote_workers)) + + # push kill signal to heartbeat process and set worker state + cmds_remote = unlist(map(remote_workers, function(worker_id) { + list( + c("LPUSH", private$.get_worker_key("kill", worker_id), "TRUE"), + c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("killed_worker_ids"), worker_id)) + }), recursive = FALSE) + + r$pipeline(.commands = c(cmds_local, cmds_remote)) } return(invisible(self)) @@ -326,12 +351,17 @@ Rush = R6::R6Class("Rush", #' Checking local workers on unix systems only takes a few microseconds per worker. #' But checking local workers on windows might be very slow. #' Workers with a heartbeat process are checked with the heartbeat. - detect_lost_workers = function() { + #' Lost tasks are marked as `"lost"`. + #' + #' @param restart (`logical(1)`)\cr + #' Whether to restart lost workers. 
+ detect_lost_workers = function(restart = FALSE) { + assert_flag(restart) r = self$connector # check workers with a heartbeat heartbeat_keys = r$SMEMBERS(private$.get_key("heartbeat_keys")) - if (length(heartbeat_keys)) { + lost_workers = if (length(heartbeat_keys)) { lg$debug("Checking %i worker(s) with heartbeat", length(heartbeat_keys)) running = as.logical(r$pipeline(.commands = map(heartbeat_keys, function(heartbeat_key) c("EXISTS", heartbeat_key)))) if (all(running)) return(invisible(self)) @@ -340,50 +370,55 @@ Rush = R6::R6Class("Rush", heartbeat_keys = heartbeat_keys[!running] lost_workers = self$worker_info[heartbeat == heartbeat_keys, worker_id] - # move worker ids to lost workers set and remove heartbeat keys - cmds = map(lost_workers, function(worker_id) c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("lost_worker_ids"), worker_id)) - r$pipeline(.commands = c(cmds, list(c("SREM", "heartbeat_keys", heartbeat_keys)))) - lg$error("Lost %i worker(s): %s", length(lost_workers), str_collapse(lost_workers)) + # set worker state + cmds = map(lost_workers, function(worker_id) { + c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("lost_worker_ids"), worker_id) + }) + + # remove heartbeat keys + cmds = c(cmds, list(c("SREM", "heartbeat_keys", heartbeat_keys))) + r$pipeline(.commands = cmds) + lost_workers } # check local workers without a heartbeat - local_pids = r$SMEMBERS(private$.get_key("local_pids")) - if (length(local_pids)) { - lg$debug("Checking %i worker(s) with process id", length(local_pids)) - running = map_lgl(local_pids, function(pid) private$.pid_exists(pid)) + local_workers = r$SMEMBERS(private$.get_key("local_workers")) + lost_workers = if (length(local_workers)) { + lg$debug("Checking %i worker(s) with process id", length(local_workers)) + running = map_lgl(local_workers, function(worker_id) self$processes[[worker_id]]$is_alive()) if (all(running)) return(invisible(self)) # search for associated worker ids 
- local_pids = local_pids[!running] - lost_workers = self$worker_info[pid == local_pids, worker_id] - - # move worker ids to lost workers set and remove pids - cmds = map(lost_workers, function(worker_id) c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("lost_worker_ids"), worker_id)) - r$pipeline(.commands = c(cmds, list(c("SREM", private$.get_key("local_pids"), local_pids)))) + lost_workers = local_workers[!running] lg$error("Lost %i worker(s): %s", length(lost_workers), str_collapse(lost_workers)) - } - return(invisible(self)) - }, + if (restart) { + self$restart_workers(unlist(lost_workers)) + lost_workers + } else { + # set worker state + cmds = map(lost_workers, function(worker_id) { + c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("lost_worker_ids"), worker_id) + }) - #' @description - #' Detect lost tasks. - #' Changes the state of tasks to `"lost"` if the worker crashed. - detect_lost_tasks = function() { - r = self$connector - if (!self$n_workers) return(invisible(self)) - self$detect_lost_workers() - lost_workers = self$lost_worker_ids + # remove local pids + cmds = c(cmds, list(c("SREM", private$.get_key("local_workers"), lost_workers))) + r$pipeline(.commands = cmds) + lost_workers + } + } + # mark lost tasks if (length(lost_workers)) { running_tasks = self$fetch_running_tasks(fields = "worker_extra") if (!nrow(running_tasks)) return(invisible(self)) - bin_state = redux::object_to_bin(list(state = "lost")) keys = running_tasks[lost_workers, keys, on = "worker_id"] + lg$error("Lost %i task(s): %s", length(keys), str_collapse(keys)) + cmds = unlist(map(keys, function(key) { list( - c("HSET", key, "state", list(bin_state)), + list("HSET", key, "state", failed_state), c("SREM", private$.get_key("running_tasks"), key), c("RPUSH", private$.get_key("failed_tasks"), key)) }), recursive = FALSE) @@ -416,7 +451,6 @@ Rush = R6::R6Class("Rush", r$DEL(private$.get_worker_key("kill", worker_id)) 
r$DEL(private$.get_worker_key("heartbeat", worker_id)) r$DEL(private$.get_worker_key("queued_tasks", worker_id)) - r$DEL(private$.get_worker_key("log", worker_id)) r$DEL(private$.get_worker_key("events", worker_id)) }) @@ -439,13 +473,12 @@ Rush = R6::R6Class("Rush", r$DEL(private$.get_key("lost_worker_ids")) r$DEL(private$.get_key("start_args")) r$DEL(private$.get_key("terminate_on_idle")) - r$DEL(private$.get_key("local_pids")) + r$DEL(private$.get_key("local_workers")) r$DEL(private$.get_key("heartbeat_keys")) # reset counters and caches private$.cached_results_dt = data.table() private$.cached_tasks_dt = data.table() - private$.cached_worker_info = data.table() private$.n_seen_results = 0 return(invisible(self)) @@ -751,15 +784,15 @@ Rush = R6::R6Class("Rush", #' #' @param keys (`character()`)\cr #' Keys of the tasks to wait for. - #' @param detect_lost_tasks (`logical(1)`)\cr + #' @param detect_lost_workers (`logical(1)`)\cr #' Whether to detect failed tasks. #' Comes with an overhead. - wait_for_tasks = function(keys, detect_lost_tasks = FALSE) { + wait_for_tasks = function(keys, detect_lost_workers = FALSE) { assert_character(keys, min.len = 1) - assert_flag(detect_lost_tasks) + assert_flag(detect_lost_workers) while (any(keys %nin% c(self$finished_tasks, self$failed_tasks)) && self$n_running_workers > 0) { - if (detect_lost_tasks) self$detect_lost_tasks() + if (detect_lost_workers) self$detect_lost_workers() Sys.sleep(0.01) } @@ -1007,8 +1040,7 @@ Rush = R6::R6Class("Rush", #' Contains information about the workers. 
worker_info = function(rhs) { assert_ro_binding(rhs) - if (nrow(private$.cached_worker_info) == self$n_workers) return(private$.cached_worker_info) - + if (!self$n_running_workers) return(data.table()) r = self$connector fields = c("worker_id", "pid", "host", "hostname", "heartbeat") @@ -1019,8 +1051,6 @@ Rush = R6::R6Class("Rush", # fix type worker_info[, pid := as.integer(pid)][] - # cache result - private$.cached_worker_info = worker_info worker_info }, @@ -1078,9 +1108,6 @@ Rush = R6::R6Class("Rush", .cached_tasks_list = list(), - # cache of the worker info which usually does not change after starting the workers - .cached_worker_info = data.table(), - # counter of the seen results for the latest results methods .n_seen_results = 0, diff --git a/R/RushWorker.R b/R/RushWorker.R index d62e618..538986b 100644 --- a/R/RushWorker.R +++ b/R/RushWorker.R @@ -107,7 +107,7 @@ RushWorker = R6::R6Class("RushWorker", if (!is.null(self$heartbeat)) { r$SADD(private$.get_key("heartbeat_keys"), private$.get_worker_key("heartbeat")) } else if (host == "local") { - r$SADD(private$.get_key("local_pids"), Sys.getpid()) + r$SADD(private$.get_key("local_workers"), self$worker_id) } # register worker info in @@ -205,29 +205,12 @@ RushWorker = R6::R6Class("RushWorker", return(invisible(self)) }, - #' @description - #' Write log message written with the `lgr` package to the database. - write_log = function() { - if (!is.null(self$lgr_buffer)) { - r = self$connector - tab = self$lgr_buffer$dt - if (nrow(tab)) { - bin_log = redux::object_to_bin(self$lgr_buffer$dt) - r$command(list("RPUSH", private$.get_worker_key("log"), bin_log)) - self$lgr_buffer$flush() - } - } - - return(invisible(self)) - }, - #' @description #' Mark the worker as terminated. #' Last step in the worker loop before the worker terminates. 
set_terminated = function() { r = self$connector lg$debug("Worker %s terminated", self$worker_id) - self$write_log() r$command(c("SMOVE", private$.get_key("running_worker_ids"), private$.get_key("terminated_worker_ids"), self$worker_id)) return(invisible(self)) } diff --git a/R/worker_loops.R b/R/worker_loops.R index b1064e8..ef3635a 100644 --- a/R/worker_loops.R +++ b/R/worker_loops.R @@ -30,7 +30,6 @@ worker_loop_default = function(fun, constants = NULL, rush) { } else { if (rush$terminated_on_idle) break } - rush$write_log() } return(NULL) diff --git a/man/Rush.Rd b/man/Rush.Rd index 47618f3..38936d6 100644 --- a/man/Rush.Rd +++ b/man/Rush.Rd @@ -210,11 +210,11 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho \item \href{#method-Rush-format}{\code{Rush$format()}} \item \href{#method-Rush-print}{\code{Rush$print()}} \item \href{#method-Rush-start_workers}{\code{Rush$start_workers()}} +\item \href{#method-Rush-restart_workers}{\code{Rush$restart_workers()}} \item \href{#method-Rush-create_worker_script}{\code{Rush$create_worker_script()}} \item \href{#method-Rush-wait_for_workers}{\code{Rush$wait_for_workers()}} \item \href{#method-Rush-stop_workers}{\code{Rush$stop_workers()}} \item \href{#method-Rush-detect_lost_workers}{\code{Rush$detect_lost_workers()}} -\item \href{#method-Rush-detect_lost_tasks}{\code{Rush$detect_lost_tasks()}} \item \href{#method-Rush-reset}{\code{Rush$reset()}} \item \href{#method-Rush-read_log}{\code{Rush$read_log()}} \item \href{#method-Rush-push_tasks}{\code{Rush$push_tasks()}} @@ -314,6 +314,7 @@ This function takes the arguments \code{fun} and optionally \code{constants} whi heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, + supervise = TRUE, worker_loop = worker_loop_default, ... 
)}\if{html}{\out{}} @@ -348,6 +349,9 @@ By default (\code{lgr_buffer_size = 0}), the log messages are directly saved in If \code{lgr_buffer_size > 0}, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.} +\item{\code{supervise}}{(\code{logical(1)})\cr +Whether to kill the workers when the main R process is shut down.} + \item{\code{worker_loop}}{(\code{function})\cr Loop run on the workers. Defaults to \link{worker_loop_default} which is called with \code{fun}. @@ -360,6 +364,24 @@ Arguments passed to \code{worker_loop}.} } } \if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-restart_workers}{}}} +\subsection{Method \code{restart_workers()}}{ +Restart workers. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$restart_workers(worker_ids)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{worker_ids}}{(\code{character()})\cr +Worker ids to be restarted.} +} +\if{html}{\out{
}} +} +} +\if{html}{\out{
}} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-Rush-create_worker_script}{}}} \subsection{Method \code{create_worker_script()}}{ @@ -465,21 +487,19 @@ Local workers without a heartbeat are checked by their process id. Checking local workers on unix systems only takes a few microseconds per worker. But checking local workers on windows might be very slow. Workers with a heartbeat process are checked with the heartbeat. +Lost tasks are marked as \code{"lost"}. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$detect_lost_workers()}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{Rush$detect_lost_workers(restart = FALSE)}\if{html}{\out{
}} } +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{restart}}{(\code{logical(1)})\cr +Whether to restart lost workers.} } -\if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-Rush-detect_lost_tasks}{}}} -\subsection{Method \code{detect_lost_tasks()}}{ -Detect lost tasks. -Changes the state of tasks to \code{"lost"} if the worker crashed. -\subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$detect_lost_tasks()}\if{html}{\out{
}} +\if{html}{\out{
}} } - } \if{html}{\out{
}} \if{html}{\out{}} @@ -687,7 +707,7 @@ Results. Fetch queued tasks from the data base. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$fetch_queued_tasks( - fields = c("xs", "xs_extra", "state"), + fields = c("xs", "xs_extra"), data_format = "data.table" )}\if{html}{\out{
}} } @@ -697,7 +717,7 @@ Fetch queued tasks from the data base. \describe{ \item{\code{fields}}{(\code{character()})\cr Fields to be read from the hashes. -Defaults to \code{c("xs", "xs_extra", "state")}.} +Defaults to \code{c("xs", "xs_extra")}.} \item{\code{data_format}}{(\code{character()})\cr Returned data format. @@ -718,7 +738,7 @@ Table of queued tasks. Fetch queued priority tasks from the data base. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$fetch_priority_tasks( - fields = c("xs", "xs_extra", "state"), + fields = c("xs", "xs_extra"), data_format = "data.table" )}\if{html}{\out{
}} } @@ -728,7 +748,7 @@ Fetch queued priority tasks from the data base. \describe{ \item{\code{fields}}{(\code{character()})\cr Fields to be read from the hashes. -Defaults to \code{c("xs", "xs_extra", "state")}.} +Defaults to \code{c("xs", "xs_extra")}.} \item{\code{data_format}}{(\code{character()})\cr Returned data format. @@ -749,7 +769,7 @@ Table of queued priority tasks. Fetch running tasks from the data base. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$fetch_running_tasks( - fields = c("xs", "xs_extra", "worker_extra", "state"), + fields = c("xs", "xs_extra", "worker_extra"), data_format = "data.table" )}\if{html}{\out{
}} } @@ -759,7 +779,7 @@ Fetch running tasks from the data base. \describe{ \item{\code{fields}}{(\code{character()})\cr Fields to be read from the hashes. -Defaults to \code{c("xs", "xs_extra", "worker_extra", "state")}.} +Defaults to \code{c("xs", "xs_extra", "worker_extra")}.} \item{\code{data_format}}{(\code{character()})\cr Returned data format. @@ -781,7 +801,7 @@ Fetch finished tasks from the data base. Finished tasks are cached. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$fetch_finished_tasks( - fields = c("xs", "xs_extra", "worker_extra", "ys", "ys_extra", "state"), + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra"), reset_cache = FALSE, data_format = "data.table" )}\if{html}{\out{
}} @@ -792,7 +812,7 @@ Finished tasks are cached. \describe{ \item{\code{fields}}{(\code{character()})\cr Fields to be read from the hashes. -Defaults to \code{c("xs", "xs_extra", "worker_extra", "ys", "ys_extra", "state")}.} +Defaults to \code{c("xs", "xs_extra", "worker_extra", "ys", "ys_extra")}.} \item{\code{reset_cache}}{(\code{logical(1)})\cr Whether to reset the cache.} @@ -817,7 +837,7 @@ Block process until a new finished task is available. Returns all finished tasks or \code{NULL} if no new task is available after \code{timeout} seconds. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$wait_for_finished_tasks( - fields = c("xs", "xs_extra", "worker_extra", "ys", "ys_extra", "state"), + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra"), timeout = Inf, data_format = "data.table" )}\if{html}{\out{
}} @@ -828,7 +848,7 @@ Returns all finished tasks or \code{NULL} if no new task is available after \cod \describe{ \item{\code{fields}}{(\code{character()})\cr Fields to be read from the hashes. -Defaults to \code{c("xs", "xs_extra", "worker_extra", "ys", "ys_extra", "state")}.} +Defaults to \code{c("xs", "xs_extra", "worker_extra", "ys", "ys_extra")}.} \item{\code{timeout}}{(\code{numeric(1)})\cr Time to wait for a result in seconds.} @@ -852,7 +872,7 @@ Table of finished tasks. Fetch failed tasks from the data base. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$fetch_failed_tasks( - fields = c("xs", "worker_extra", "condition", "state"), + fields = c("xs", "worker_extra", "condition"), data_format = "data.table" )}\if{html}{\out{
}}
}

\subsection{Arguments}{
\if{html}{\out{
\describe{
\item{\code{fields}}{(\code{character()})\cr
Fields to be read from the hashes.
-Defaults to \code{c("xs", "xs_extra", "worker_extra", "condition", "state")}.}
+Defaults to \code{c("xs", "worker_extra", "condition")}.}

\item{\code{data_format}}{(\code{character()})\cr
Returned data format.
@@ -883,7 +903,7 @@ Table of failed tasks.
Fetch all tasks from the data base.
\subsection{Usage}{
\if{html}{\out{
}}\preformatted{Rush$fetch_tasks( - fields = c("xs", "xs_extra", "worker_extra", "ys", "ys_extra", "condition", "state"), + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition", "state"), data_format = "data.table" )}\if{html}{\out{
}} } @@ -914,7 +934,7 @@ Table of all tasks. Wait until tasks are finished. The function also unblocks when no worker is running or all tasks failed. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$wait_for_tasks(keys, detect_lost_tasks = FALSE)}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{Rush$wait_for_tasks(keys, detect_lost_workers = FALSE)}\if{html}{\out{
}} } \subsection{Arguments}{ @@ -923,7 +943,7 @@ The function also unblocks when no worker is running or all tasks failed. \item{\code{keys}}{(\code{character()})\cr Keys of the tasks to wait for.} -\item{\code{detect_lost_tasks}}{(\code{logical(1)})\cr +\item{\code{detect_lost_workers}}{(\code{logical(1)})\cr Whether to detect failed tasks. Comes with an overhead.} } diff --git a/man/RushWorker.Rd b/man/RushWorker.Rd index b9f2664..19dc803 100644 --- a/man/RushWorker.Rd +++ b/man/RushWorker.Rd @@ -47,7 +47,6 @@ Used in the worker loop to determine whether to continue.} \item \href{#method-RushWorker-push_running_task}{\code{RushWorker$push_running_task()}} \item \href{#method-RushWorker-pop_task}{\code{RushWorker$pop_task()}} \item \href{#method-RushWorker-push_results}{\code{RushWorker$push_results()}} -\item \href{#method-RushWorker-write_log}{\code{RushWorker$write_log()}} \item \href{#method-RushWorker-set_terminated}{\code{RushWorker$set_terminated()}} \item \href{#method-RushWorker-clone}{\code{RushWorker$clone()}} } @@ -56,7 +55,6 @@ Used in the worker loop to determine whether to continue.}
Inherited methods