Use hipercow_resources when running tasks #63

Merged 16 commits on Jan 16, 2024
Changes from 8 commits
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,6 +1,6 @@
Package: hipercow
Title: High Performance Computing
Version: 0.2.13
Version: 0.2.14
Authors@R: c(person("Rich", "FitzJohn", role = c("aut", "cre"),
email = "[email protected]"),
person("Wes", "Hinsley", role = "aut"),
15 changes: 14 additions & 1 deletion R/resource.R
@@ -265,8 +265,21 @@ hipercow_resources_validate <- function(resources,
driver = NULL,
root = NULL) {

# A bit of a hack for now - handle NULL driver (which often happens
# in the tests) - and also ignore if more than one driver is
# configured, which is caught elsewhere; here we would return a
# "driver must be a scalar" error.

if (is.null(driver) || length(driver) > 1) {
return(TRUE)
}

cluster_info <- hipercow_cluster_info(driver, root)

if (is.null(resources$queue$computed)) {
resources$queue$computed <- cluster_info$default_queue
}

validate_cluster_cores(resources$cores$computed, cluster_info$max_cores)

validate_cluster_memory(
@@ -281,7 +294,7 @@ hipercow_resources_validate <- function(resources,
validate_cluster_requested_nodes(
resources$requested_nodes$computed, cluster_info$nodes)

TRUE
resources
Review comment (Member):

Hmm. By doing this (with the assignment of resources$queue$computed) we do open the door to the users sneaking a different queue through here if they are sufficiently motivated. Perhaps that's ok?

Reply (Contributor Author):

So the only thing that changes in the return value is that the queue is set to the default if it was NULL, which would be good to do somewhere. I prefer that to having the user explicitly specify the queue.

I wonder if, along with the discussion on when to turn "tonight" into a real time etc., we really do have two stages of validation here: the first for all the syntactic and theoretical checks, and the second at the last minute, when we translate the timings, fill in the queue with a default if it was left NULL, and flag the sort of failures we'd only see when we consider actually running the job on an actual cluster.

For conversation later!

}
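
The two-stage idea raised in the thread above could look roughly like the sketch below. This is only an illustration under stated assumptions: `validate_resources_static` and `validate_resources_cluster` are hypothetical names, not functions in hipercow, and the `cluster_info` values are copied from `windows_cluster_info` as it appears in this PR. The first stage needs no cluster; the second fills in the default queue and applies cluster-specific limits.

```r
# Hypothetical helpers for illustration; not part of hipercow.

# Stage 1: checks that need no knowledge of a cluster.
validate_resources_static <- function(resources) {
  cores <- resources$cores$computed
  if (!is.numeric(cores) || length(cores) != 1 || cores < 1) {
    stop("'cores' must be a single positive number")
  }
  invisible(resources)
}

# Stage 2: defaults and checks that depend on a specific cluster.
validate_resources_cluster <- function(resources, cluster_info) {
  if (is.null(resources$queue$computed)) {
    resources$queue$computed <- cluster_info$default_queue
  }
  if (!(resources$queue$computed %in% cluster_info$queues)) {
    stop(sprintf("Queue '%s' is not available", resources$queue$computed))
  }
  if (resources$cores$computed > cluster_info$max_cores) {
    stop("Too many cores requested")
  }
  resources
}

# Illustrative values, copied from windows_cluster_info in this PR.
cluster_info <- list(max_cores = 32,
                     queues = c("AllNodes", "Training"),
                     default_queue = "AllNodes")

# A minimal stand-in for the object returned by hipercow_resources().
resources <- list(cores = list(computed = 4),
                  queue = list(computed = NULL))

resources <- validate_resources_static(resources)
resources <- validate_resources_cluster(resources, cluster_info)
resources$queue$computed  # "AllNodes"
```

Because the second stage re-checks the queue against `cluster_info$queues`, a queue that was edited by hand after the first stage would still be rejected in this sketch, which may or may not be the behaviour wanted here.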


16 changes: 12 additions & 4 deletions R/submit.R
@@ -10,20 +10,28 @@
##'
##' @param ... Disallowed additional arguments, don't use.
##'
##' @param resources A list generated by `hipercow_resources` giving
##' the cluster resource requirements to run your task.
##'
##' @param driver The name of the driver to use, or you can leave
##' blank if only one is configured (this will be typical).
##'
##' @param root The hipercow root
##'
##' @export
task_submit <- function(id, ..., driver = NULL, root = NULL) {
task_submit <- function(id, ..., resources = NULL,
driver = NULL, root = NULL) {
if (...length() > 0) {
cli::cli_abort("Additional arguments to 'task_submit' not allowed")
}
root <- hipercow_root(root)

dat <- hipercow_driver_prepare(driver, root, environment())

if (!is.null(resources$hold_until$computed)) {
Review comment (Member):

I feel this could be done at validation, unless someone sits on their resources a long time.

Reply (Contributor Author):

I am in two minds about this. I wonder if people might create a couple of resource objects for their "test_job" and their "real_job" and their "massive_job_that_always_waits_til_midnight", and then have those as kind of ready-made resources to launch their jobs in the same way as last time, without having to think about it.

I am not sure whether this is terribly likely, and it might not be totally advisable: once in a while, available resources might change (but then they will need a new package version and will have to rerun their resource things anyway).

But also, I'm not sure doing the translation during submission causes any real problem, does it?

Reply (Member):

The only "problem" caused by doing it at submission is that there are then two places that do validation. In particular, the validation here must never fail or it will be quite annoying for the user. We can see how this goes in practice if you want. I expect this flow will need some adjustment when it comes in contact with users, no matter what we pick.

We might sketch out a flow of validation etc. as we complicate this too (environment variables and parallel configuration will all interact here).

if (resources$hold_until$computed %in% c("tonight", "midnight", "weekend")) {
resources$hold_until$computed <- special_time(resources$hold_until$computed)
}
}

n <- length(id)
if (n == 0) {
return(invisible())
@@ -34,7 +42,7 @@ task_submit <- function(id, ..., driver = NULL, root = NULL) {
}

for (i in id) {
dat$driver$submit(i, dat$config, root$path$root)
dat$driver$submit(i, resources, dat$config, root$path$root)
writeLines(dat$name, file.path(root$path$tasks, i, STATUS_SUBMITTED))
root$cache$driver[[i]] <- dat$name
if (n > 1) {
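
The argument for translating `hold_until` at submission rather than at validation can be illustrated with a small sketch. It assumes a user keeps one resources object and reuses it on different days. `resolve_tonight` and `resolve_hold_until` are hypothetical stand-ins (the PR's real helper is `special_time`), and only the "tonight" case is handled.

```r
# Hypothetical stand-in for special_time(); handles "tonight" only.
# Between 03:00 and 19:00 the task is held until 19:00 the same day,
# otherwise it may run immediately.
resolve_tonight <- function(now) {
  lt <- as.POSIXlt(now)
  if (lt$hour >= 3 && lt$hour < 19) {
    lt$hour <- 19L
    lt$min <- 0L
    lt$sec <- 0
  }
  as.POSIXct(lt)
}

# Returns a copy with the symbolic time replaced by a concrete one;
# the stored object keeps the symbolic value.
resolve_hold_until <- function(resources, now = Sys.time()) {
  if (identical(resources$hold_until$computed, "tonight")) {
    resources$hold_until$computed <- resolve_tonight(now)
  }
  resources
}

# One ready-made resources object, reused on two different days.
nightly <- list(hold_until = list(computed = "tonight"))
monday <- as.POSIXct("2024-01-15 10:00:00", tz = "UTC")
tuesday <- as.POSIXct("2024-01-16 10:00:00", tz = "UTC")

r1 <- resolve_hold_until(nightly, monday)
r2 <- resolve_hold_until(nightly, tuesday)
format(r1$hold_until$computed, "%Y-%m-%d %H:%M")  # "2024-01-15 19:00"
format(r2$hold_until$computed, "%Y-%m-%d %H:%M")  # "2024-01-16 19:00"
```

Had "tonight" been converted when the object was created on Monday, Tuesday's submission would carry a hold time already in the past. Resolving late avoids that, at the cost of a second place where resource handling happens, which is the trade-off discussed in the thread.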
42 changes: 32 additions & 10 deletions R/task-create.R
@@ -27,6 +27,9 @@
##' using your default configuration, or `FALSE` to prevent
##' submission. The default `NULL` will submit a task if a driver
##' is configured.
##'
##' @param resources A list generated by `hipercow_resources` giving
##' the cluster resource requirements to run your task.
##'
##' @inheritParams task_eval
##'
@@ -36,14 +39,18 @@
##' @export
task_create_explicit <- function(expr, export = NULL, envir = .GlobalEnv,
environment = "default", submit = NULL,
root = NULL) {
resources = NULL, root = NULL) {
root <- hipercow_root(root)
if (is.null(resources)) {
resources <- hipercow_resources()
}
resources <- hipercow_resources_validate(resources, names(root$config), root)
variables <- task_variables(export, envir, environment, root,
rlang::current_env())
path <- relative_workdir(root$path$root)
id <- task_create(root, "explicit", path, environment,
expr = expr, variables = variables)
task_submit_maybe(id, submit, root, rlang::current_env())
task_submit_maybe(id, submit, resources, root, rlang::current_env())
id
}

@@ -61,15 +68,20 @@ task_create_explicit <- function(expr, export = NULL, envir = .GlobalEnv,
##' @inherit task_create_explicit return
##' @export
task_create_expr <- function(expr, environment = "default", submit = NULL,
root = NULL) {
resources = NULL, root = NULL) {
root <- hipercow_root(root)
expr <- check_expression(rlang::enquo(expr))
if (is.null(resources)) {
resources <- hipercow_resources()
}
resources <- hipercow_resources_validate(resources, names(root$config), root)
variables <- task_variables(
all.vars(expr$value), expr$envir, environment, root, rlang::current_env())
path <- relative_workdir(root$path$root)
id <- task_create(root, "expression", path, environment,
expr = expr$value, variables = variables)
task_submit_maybe(id, submit, root, rlang::current_env())

task_submit_maybe(id, submit, resources, root, rlang::current_env())
id
}

@@ -99,7 +111,7 @@ task_create_expr <- function(expr, environment = "default", submit = NULL,
##' @export
task_create_script <- function(script, chdir = FALSE, echo = TRUE,
environment = "default", submit = NULL,
root = NULL) {
resources = NULL, root = NULL) {
root <- hipercow_root(root)
if (!file.exists(script)) {
cli::cli_abort("Script file '{script}' does not exist")
@@ -108,14 +120,18 @@ task_create_script <- function(script, chdir = FALSE, echo = TRUE,
cli::cli_abort(
"Script file '{script}' is not contained within hipercow root")
}
if (is.null(resources)) {
resources <- hipercow_resources()
}
resources <- hipercow_resources_validate(resources, names(root$config), root)
path <- relative_workdir(root$path$root)
script <- as.character(fs::path_rel(script, getwd()))
assert_scalar_logical(chdir, call = rlang::current_env())
ensure_environment_exists(environment, root, rlang::current_env())

id <- task_create(root, "script", path, environment,
script = script, chdir = chdir, echo = echo)
task_submit_maybe(id, submit, root, rlang::current_env())
task_submit_maybe(id, submit, resources, root, rlang::current_env())
id
}

@@ -142,8 +158,14 @@ task_create_script <- function(script, chdir = FALSE, echo = TRUE,
##'
##' @export
task_create_bulk_expr <- function(expr, data, environment = "default",
submit = NULL, root = NULL) {
submit = NULL, resources = NULL,
root = NULL) {
root <- hipercow_root(root)
if (is.null(resources)) {
resources <- hipercow_resources()
}
resources <- hipercow_resources_validate(resources, names(root$config), root)

if (!inherits(data, "data.frame")) {
cli::cli_abort("Expected 'data' to be a data.frame (or tbl, etc)")
}
@@ -176,7 +198,7 @@ task_create_bulk_expr <- function(expr, data, environment = "default",
task_create(root, "expression", path, environment,
expr = expr$value, variables = variables_i)
})
task_submit_maybe(id, submit, root, rlang::current_env())
task_submit_maybe(id, submit, resources, root, rlang::current_env())
id
}

@@ -255,7 +277,7 @@ task_variables <- function(names, envir, environment, root, call = NULL) {
}


task_submit_maybe <- function(id, submit, root, call) {
task_submit_maybe <- function(id, submit, resources, root, call) {
if (!is.null(submit)) {
## Could also allow character here soon.
assert_scalar_logical(submit, call = call)
@@ -276,7 +298,7 @@ task_submit_maybe <- function(id, submit, root, call) {
cli::cli_abort("Can't cope with more than one driver configured yet",
call = call)
}
task_submit(id, driver = driver, root = root)
task_submit(id, resources = resources, driver = driver, root = root)
TRUE
}

12 changes: 10 additions & 2 deletions R/task-retry.R
@@ -23,8 +23,16 @@
##'
##' @export
##' @return New identifiers for the retried tasks
task_retry <- function(id, submit = NULL, root = NULL) {
task_retry <- function(id, submit = NULL, resources = NULL, root = NULL) {
root <- hipercow_root(root)

# More thinking to do on what resources should be for a retry

if (is.null(resources)) {
resources <- hipercow_resources()
}
resources <- hipercow_resources_validate(resources, names(root$config), root)

id_real <- follow_retry_map(id, root)
status <- task_status(id_real, follow = FALSE, root)
err <- !(status %in% c("success", "failure", "cancelled"))
@@ -46,7 +54,7 @@ task_retry <- function(id, submit = NULL, root = NULL) {

update_retry_map(id_new, id_real, id_base, root)

task_submit_maybe(id_new, submit, root, rlang::current_env())
task_submit_maybe(id_new, submit, resources, root, rlang::current_env())

id_new
}
55 changes: 54 additions & 1 deletion R/util.R
@@ -282,4 +282,57 @@ duration_to_minutes <- function(period, name = "testing") {
index <- index + 1
}
minutes
}
}

format_datetime <- function(year, month, day, hour, minute, second) {
format(to_POSIXct(
sprintf("%s-%s-%s %s:%s:%s", year, month, day, hour, minute, second)),
"%Y-%m-%d %H:%M:%S")
}

to_POSIXct <- function(s) {
as.POSIXct(s, format = "%Y-%m-%d %H:%M:%S")
}


special_time <- function(name, now = Sys.time()) {
break_up <- function(dt) {
list(year = as.integer(format(dt, "%Y")),
month = as.integer(format(dt, "%m")),
day = as.integer(format(dt, "%d")),
weekday = format(dt, "%a"),
hour = as.integer(format(dt, "%H")),
minute = as.integer(format(dt, "%M")),
second = as.integer(format(dt, "%S")))
}

dt <- break_up(now)

if (name == "tonight") { # If between 7pm and 3am, run. Otherwise wait for 7pm
if ((dt$hour < 19) && (dt$hour >= 3)) {
dt$hour <- 19
dt$minute <- 0
dt$second <- 0
}

} else if (name == "midnight") { # Will allow up to 3am again/
if (dt$hour >= 3) {
date <- as.Date(now) + 1
dt <- break_up(as.POSIXlt(date))
}

} else if (name == "weekend") {
date <- as.Date(now)
days <- c("Sat", "Sun", "Mon", "Tue", "Wed", "Thu", "Fri")
current_day <- which(days == dt$weekday)
if (current_day >= 3) { # We'll allow launching on Sat/Sun
date <- date + (8 - current_day)
dt <- break_up(as.POSIXlt(date))
}
} else {
cli::cli_abort("Unrecognised special time {name}")
}

to_POSIXct(format_datetime(dt$year, dt$month, dt$day,
dt$hour, dt$minute, dt$second))
}
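
One detail of the "weekend" branch above is worth noting: it matches `format(dt, "%a")` against English abbreviations, and `%a` is locale-dependent, so in a non-English locale the lookup would probably find no match and the `if` would fail. A locale-independent variant could use the numeric `wday` field of `POSIXlt` instead. The sketch below is an assumption about how that might look, not code from the PR; `next_weekend_date` is a hypothetical helper that returns only the date part.

```r
# Hypothetical helper, not part of the PR: the date on which a
# "weekend" hold would release, without relying on day names.
next_weekend_date <- function(now = Sys.time()) {
  date <- as.Date(now)
  wday <- as.POSIXlt(date)$wday  # 0 = Sunday, ..., 6 = Saturday
  if (wday %in% c(0, 6)) {
    return(date)  # already the weekend, so allow launching now
  }
  date + (6 - wday)  # advance to the coming Saturday
}

format(next_weekend_date(as.Date("2024-01-16")))  # Tuesday -> "2024-01-20"
format(next_weekend_date(as.Date("2024-01-20")))  # Saturday -> "2024-01-20"
```

The offsets agree with the PR's table-based version: Monday moves forward five days and Friday one day, in both cases landing on Saturday.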
2 changes: 1 addition & 1 deletion drivers/windows/DESCRIPTION
@@ -1,6 +1,6 @@
Package: hipercow.windows
Title: DIDE HPC Support for Windows
Version: 0.2.13
Version: 0.2.14
Authors@R: c(person("Rich", "FitzJohn", role = c("aut", "cre"),
email = "[email protected]"),
person("Wes", "Hinsley", role = "aut"),
4 changes: 2 additions & 2 deletions drivers/windows/R/driver.R
@@ -14,15 +14,15 @@ hipercow_driver_windows <- function() {
}


windows_submit <- function(id, config, path_root) {
windows_submit <- function(id, resources, config, path_root) {
path_batch <- write_batch_task_run(id, config, path_root)

path_batch_dat <- prepare_path(path_batch, config$shares)
path_batch_unc <- windows_path_slashes(
file.path(path_batch_dat$path_remote, path_batch_dat$rel))

client <- get_web_client()
dide_id <- client$submit(path_batch_unc, id, config$template)
dide_id <- client$submit(path_batch_unc, id, resources)
path_dide_id <- file.path(dirname(path_batch), DIDE_ID)
writeLines(dide_id, path_dide_id)
}
1 change: 1 addition & 0 deletions drivers/windows/R/resource.R
@@ -6,6 +6,7 @@ windows_cluster_info <- function(config, path_root) {
max_cores = 32,
max_ram = 512,
queues = c("AllNodes", "Training"),
default_queue = "AllNodes",
Review comment (Member):

Or have the first element of queues be the default?

Reply (Contributor Author):

Hmm, I think I prefer saying default_queue explicitly rather than queues[1], but not strongly so.

nodes = sprintf("wpia-%003d", (1:70)[-c(41, 42, 49, 50)])
)
class(info) <- "windows_cluster_info"
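
The two options from the `default_queue` thread above are not mutually exclusive. As a sketch only, a lookup could prefer an explicit `default_queue` and fall back to the first entry of `queues` when none is given. `cluster_default_queue` is a hypothetical helper, not part of hipercow, and the example values are those shown in this diff.

```r
# Hypothetical helper: use an explicit default_queue when present,
# otherwise treat the first listed queue as the default.
cluster_default_queue <- function(info) {
  if (!is.null(info$default_queue)) {
    if (!(info$default_queue %in% info$queues)) {
      stop("default_queue must be one of 'queues'")
    }
    return(info$default_queue)
  }
  info$queues[[1]]
}

explicit <- list(queues = c("AllNodes", "Training"),
                 default_queue = "Training")
implicit <- list(queues = c("AllNodes", "Training"))

cluster_default_queue(explicit)  # "Training"
cluster_default_queue(implicit)  # "AllNodes"
```

An explicit field keeps the default independent of the order in which queues happen to be listed, which is the preference expressed in the reply.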