Skip to content

Commit

Permalink
Merge pull request #63 from mrc-ide/mrc-4929
Browse files Browse the repository at this point in the history
Use hipercow_resources when running tasks
  • Loading branch information
richfitz authored Jan 16, 2024
2 parents 524884c + 318e99f commit 8cf97c0
Show file tree
Hide file tree
Showing 30 changed files with 374 additions and 101 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: hipercow
Title: High Performance Computing
Version: 0.2.16
Version: 0.2.17
Authors@R: c(person("Rich", "FitzJohn", role = c("aut", "cre"),
email = "[email protected]"),
person("Wes", "Hinsley", role = "aut"),
Expand Down
24 changes: 23 additions & 1 deletion R/resource.R
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,22 @@ hipercow_resources_validate <- function(resources,
driver = NULL,
root = NULL) {

root <- hipercow_root(root)
# A bit of a hack for now - handle NULL driver (which often happens
# in the tests) - and also ignore if more than one driver is
# configured, which is caught elsewhere; here we would return a
# "driver must be a scalar" error.

if (is.null(driver) || length(driver) > 1) {
return(resources)
}

cluster_info <- hipercow_cluster_info(driver, root)

if (is.null(resources$queue$computed)) {
resources$queue$computed <- cluster_info$default_queue
}

validate_cluster_cores(resources$cores$computed, cluster_info$max_cores)

validate_cluster_memory(
Expand All @@ -281,7 +295,7 @@ hipercow_resources_validate <- function(resources,
validate_cluster_requested_nodes(
resources$requested_nodes$computed, cluster_info$nodes)

TRUE
resources
}


Expand Down Expand Up @@ -327,3 +341,11 @@ validate_cluster_requested_nodes <- function(nodes, cluster_nodes) {
}
}
}


as_hipercow_resources <- function(resources, root) {
if (is.null(resources)) {
resources <- hipercow_resources()
}
hipercow_resources_validate(resources, names(root$config), root)
}
16 changes: 12 additions & 4 deletions R/submit.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,28 @@
##'
##' @param ... Disallowed additional arguments, don't use.
##'
##' @param resources A list generated by `hipercow_resources` giving
##' the cluster resource requirements to run your task.
##'
##' @param driver The name of the driver to use, or you can leave
##' blank if only one is configured (this will be typical).
##'
##' @param root The hipercow root
##'
##' @export
task_submit <- function(id, ..., driver = NULL, root = NULL) {
task_submit <- function(id, ..., resources = NULL,
driver = NULL, root = NULL) {
if (...length() > 0) {
cli::cli_abort("Additional arguments to 'task_submit' not allowed")
}
root <- hipercow_root(root)

dat <- hipercow_driver_prepare(driver, root, environment())

if (!is.null(resources$hold_until$computed)) {
if (resources$hold_until$computed %in% c("tonight", "midnight", "weekend")) {
resources$hold_until$computed <- special_time(resources$hold_until$computed)
}
}

n <- length(id)
if (n == 0) {
return(invisible())
Expand All @@ -34,7 +42,7 @@ task_submit <- function(id, ..., driver = NULL, root = NULL) {
}

for (i in id) {
dat$driver$submit(i, dat$config, root$path$root)
dat$driver$submit(i, resources, dat$config, root$path$root)
writeLines(dat$name, file.path(root$path$tasks, i, STATUS_SUBMITTED))
root$cache$driver[[i]] <- dat$name
if (n > 1) {
Expand Down
30 changes: 20 additions & 10 deletions R/task-create.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
##' submission. The default `NULL` will submit a task if a driver
##' is configured.
##'
##' @param resources A list generated by `hipercow_resources` giving
##' the cluster resource requirements to run your task.
##'
##' @inheritParams task_eval
##'
##' @return A task id, a string of hex characters. Use this to
Expand All @@ -36,14 +39,15 @@
##' @export
task_create_explicit <- function(expr, export = NULL, envir = .GlobalEnv,
environment = "default", submit = NULL,
root = NULL) {
resources = NULL, root = NULL) {
root <- hipercow_root(root)
resources <- as_hipercow_resources(resources, root)
variables <- task_variables(export, envir, environment, root,
rlang::current_env())
path <- relative_workdir(root$path$root)
id <- task_create(root, "explicit", path, environment,
expr = expr, variables = variables)
task_submit_maybe(id, submit, root, rlang::current_env())
task_submit_maybe(id, submit, resources, root, rlang::current_env())
id
}

Expand All @@ -61,15 +65,17 @@ task_create_explicit <- function(expr, export = NULL, envir = .GlobalEnv,
##' @inherit task_create_explicit return
##' @export
task_create_expr <- function(expr, environment = "default", submit = NULL,
root = NULL) {
resources = NULL, root = NULL) {
root <- hipercow_root(root)
expr <- check_expression(rlang::enquo(expr))
resources <- as_hipercow_resources(resources, root)
variables <- task_variables(
all.vars(expr$value), expr$envir, environment, root, rlang::current_env())
path <- relative_workdir(root$path$root)
id <- task_create(root, "expression", path, environment,
expr = expr$value, variables = variables)
task_submit_maybe(id, submit, root, rlang::current_env())

task_submit_maybe(id, submit, resources, root, rlang::current_env())
id
}

Expand Down Expand Up @@ -99,7 +105,7 @@ task_create_expr <- function(expr, environment = "default", submit = NULL,
##' @export
task_create_script <- function(script, chdir = FALSE, echo = TRUE,
environment = "default", submit = NULL,
root = NULL) {
resources = NULL, root = NULL) {
root <- hipercow_root(root)
if (!file.exists(script)) {
cli::cli_abort("Script file '{script}' does not exist")
Expand All @@ -108,14 +114,15 @@ task_create_script <- function(script, chdir = FALSE, echo = TRUE,
cli::cli_abort(
"Script file '{script}' is not contained within hipercow root")
}
resources <- as_hipercow_resources(resources, root)
path <- relative_workdir(root$path$root)
script <- as.character(fs::path_rel(script, getwd()))
assert_scalar_logical(chdir, call = rlang::current_env())
ensure_environment_exists(environment, root, rlang::current_env())

id <- task_create(root, "script", path, environment,
script = script, chdir = chdir, echo = echo)
task_submit_maybe(id, submit, root, rlang::current_env())
task_submit_maybe(id, submit, resources, root, rlang::current_env())
id
}

Expand All @@ -142,8 +149,11 @@ task_create_script <- function(script, chdir = FALSE, echo = TRUE,
##'
##' @export
task_create_bulk_expr <- function(expr, data, environment = "default",
submit = NULL, root = NULL) {
submit = NULL, resources = NULL,
root = NULL) {
root <- hipercow_root(root)
resources <- as_hipercow_resources(resources, root)

if (!inherits(data, "data.frame")) {
cli::cli_abort("Expected 'data' to be a data.frame (or tbl, etc)")
}
Expand Down Expand Up @@ -176,7 +186,7 @@ task_create_bulk_expr <- function(expr, data, environment = "default",
task_create(root, "expression", path, environment,
expr = expr$value, variables = variables_i)
})
task_submit_maybe(id, submit, root, rlang::current_env())
task_submit_maybe(id, submit, resources, root, rlang::current_env())
id
}

Expand Down Expand Up @@ -255,7 +265,7 @@ task_variables <- function(names, envir, environment, root, call = NULL) {
}


task_submit_maybe <- function(id, submit, root, call) {
task_submit_maybe <- function(id, submit, resources, root, call) {
if (!is.null(submit)) {
## Could also allow character here soon.
assert_scalar_logical(submit, call = call)
Expand All @@ -276,7 +286,7 @@ task_submit_maybe <- function(id, submit, root, call) {
cli::cli_abort("Can't cope with more than one driver configured yet",
call = call)
}
task_submit(id, driver = driver, root = root)
task_submit(id, resources = resources, driver = driver, root = root)
TRUE
}

Expand Down
8 changes: 6 additions & 2 deletions R/task-retry.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
##'
##' @export
##' @return New identifiers for the retried tasks
task_retry <- function(id, submit = NULL, root = NULL) {
task_retry <- function(id, submit = NULL, resources = NULL, root = NULL) {
root <- hipercow_root(root)

# More thinking to do on what resources should be for a retry
resources <- as_hipercow_resources(resources, root)

id_real <- follow_retry_map(id, root)
status <- task_status(id_real, follow = FALSE, root)
err <- !(status %in% c("success", "failure", "cancelled"))
Expand All @@ -46,7 +50,7 @@ task_retry <- function(id, submit = NULL, root = NULL) {

update_retry_map(id_new, id_real, id_base, root)

task_submit_maybe(id_new, submit, root, rlang::current_env())
task_submit_maybe(id_new, submit, resources, root, rlang::current_env())

id_new
}
Expand Down
43 changes: 42 additions & 1 deletion R/util.R
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,45 @@ duration_to_minutes <- function(period, name = "testing") {
index <- index + 1
}
minutes
}
}

format_datetime <- function(year, month, day, hour, minute, second) {
format(to_POSIXct(
sprintf("%s-%s-%s %s:%s:%s", year, month, day, hour, minute, second)),
"%Y-%m-%d %H:%M:%S")
}

to_POSIXct <- function(s) {
as.POSIXct(s, format = "%Y-%m-%d %H:%M:%S")
}


special_time <- function(name, now = Sys.time()) {
dt <- unclass(as.POSIXlt(now))

if (name == "tonight") { # If between 7pm and 3am, run. Otherwise wait for 7pm
if ((dt$hour < 19) && (dt$hour >= 3)) {
dt$hour <- 19
dt$min <- 0
dt$sec <- 0
}

} else if (name == "midnight") { # Will allow up to 3am again/
if (dt$hour >= 3) {
date <- as.Date(now) + 1
dt <- unclass(as.POSIXlt(date))
}

} else if (name == "weekend") {
date <- as.Date(now)
if ((dt$wday < 6) && (dt$wday > 0)) { # We'll allow launching on Sat/Sun
date <- date + (6 - dt$wday)
dt <- unclass(as.POSIXlt(date))
}
} else {
cli::cli_abort("Unrecognised special time {name}")
}


to_POSIXct(format_datetime((1900 + dt$year), (1 + dt$mon), dt$mday,
dt$hour, dt$min, dt$sec))}
2 changes: 1 addition & 1 deletion drivers/windows/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: hipercow.windows
Title: DIDE HPC Support for Windows
Version: 0.2.16
Version: 0.2.17
Authors@R: c(person("Rich", "FitzJohn", role = c("aut", "cre"),
email = "[email protected]"),
person("Wes", "Hinsley", role = "aut"),
Expand Down
16 changes: 0 additions & 16 deletions drivers/windows/R/cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,6 @@ valid_clusters <- function() {
}


## TODO: this will move into an API call
valid_templates <- function(cluster) {
switch(
cluster,
"wpia-hn" = "AllNodes",
stop(sprintf("Invalid cluster '%s'", cluster)))
}


valid_cores <- function(cluster) {
switch(cluster,
"wpia-hn" = 32,
stop(sprintf("Invalid cluster '%s'", cluster)))
}


r_versions <- function() {
if (is.null(cache$r_versions)) {
cache$r_versions <- r_versions_fetch()
Expand Down
1 change: 0 additions & 1 deletion drivers/windows/R/config.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ windows_configure <- function(shares = NULL, r_version = NULL) {
stopifnot(fs::dir_exists(file.path(path, "hipercow")))
fs::dir_create(file.path(path, path_lib))
list(cluster = "wpia-hn",
template = "AllNodes",
shares = dide_cluster_paths(shares, path),
r_version = r_version,
path_lib = unix_path_slashes(path_lib))
Expand Down
4 changes: 2 additions & 2 deletions drivers/windows/R/driver.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ hipercow_driver_windows <- function() {
}


windows_submit <- function(id, config, path_root) {
windows_submit <- function(id, resources, config, path_root) {
path_batch <- write_batch_task_run(id, config, path_root)

path_batch_dat <- prepare_path(path_batch, config$shares)
path_batch_unc <- windows_path_slashes(
file.path(path_batch_dat$path_remote, path_batch_dat$rel))

client <- get_web_client()
dide_id <- client$submit(path_batch_unc, id, config$template)
dide_id <- client$submit(path_batch_unc, id, resources)
path_dide_id <- file.path(dirname(path_batch), DIDE_ID)
writeLines(dide_id, path_dide_id)
}
Expand Down
7 changes: 4 additions & 3 deletions drivers/windows/R/provision.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ windows_provision_run <- function(args, config, path_root) {
file.path(path_batch_dat$path_remote, path_batch_dat$rel))

client <- get_web_client()
template <- "BuildQueue"
dide_id <- client$submit(path_batch_unc, sprintf("conan:%s", id), template)

res <- hipercow::hipercow_resources()
res <- hipercow::hipercow_resources_validate(res, root = path_root)
res$queue <- list(original = "", computed = "BuildQueue")
dide_id <- client$submit(path_batch_unc, sprintf("conan:%s", id), res)
path_dide_id <- file.path(dirname(path_batch), DIDE_ID)
writeLines(dide_id, path_dide_id)

Expand Down
1 change: 1 addition & 0 deletions drivers/windows/R/resource.R
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ windows_cluster_info <- function(config, path_root) {
max_cores = 32,
max_ram = 512,
queues = c("AllNodes", "Training"),
default_queue = "AllNodes",
nodes = sprintf("wpia-%003d", (1:70)[-c(41, 42, 49, 50)])
)
class(info) <- "windows_cluster_info"
Expand Down
Loading

0 comments on commit 8cf97c0

Please sign in to comment.