
Commit

Merge branch 'master' into pq_ready_tasks
JinZhou5042 authored Oct 15, 2024
2 parents 8ecde57 + 96bbc71 commit 7c2f7bf
Showing 30 changed files with 448 additions and 113 deletions.
5 changes: 3 additions & 2 deletions batch_job/src/vine_factory.c
@@ -1168,8 +1168,9 @@ static void show_help(const char *cmd)
printf(" %-30s Enable debugging for this subsystem.\n", "-d,--debug=<subsystem>");
printf(" %-30s Send debugging to this file.\n", "-o,--debug-file=<file>");
printf(" %-30s Specify the size of the debug file.\n", "-O,--debug-file-size=<mb>");
printf(" %-30s Workers should use SSL to connect to managers. (Not needed if project names.)", "--ssl");
printf(" %-30s Workers should use SSL to connect to managers. (Not needed if project names.)\n", "--ssl");
printf(" %-30s SNI domain name if different from manager hostname. Implies --ssl.\n", "--tls-sni=<domain name>");
printf(" %-30s Set a custom factory name.\n", "--factory-name");

printf(" %-30s Show the version string.\n", "-v,--version");
printf(" %-30s Show this screen.\n", "-h,--help");
@@ -1198,7 +1199,7 @@ static void show_help(const char *cmd)
printf(" %-30s Alternate binary instead of vine_worker.\n", "--worker-binary=<file>");
printf(" %-30s Wrap factory with this command prefix.\n","--wrapper");
printf(" %-30s Add this input file needed by the wrapper.\n","--wrapper-input");
printf(" %-30s Run each worker inside this python environment.\n","--python-env=<file.tar.gz>");
printf(" %-30s Run each worker inside this poncho environment.\n","--poncho-env=<file.tar.gz>");

printf("\nOptions specific to batch systems:\n");
printf(" %-30s Generic batch system options.\n", "-B,--batch-options=<options>");
4 changes: 2 additions & 2 deletions doc/manuals/poncho/index.md
@@ -63,14 +63,14 @@ This will create `package.json` with contents similar to this:
Then to create a complete package from the specification:

```
poncho_package_create package.json
poncho_package_create package.json package.tar.gz
```

Once created, this package can be moved to another machine for execution.
Then, to run a program in the environment:

```
poncho_package_run -e package.tar.gz -- example.py
poncho_package_run -e package.tar.gz -- python example.py
```

## Specification File
8 changes: 5 additions & 3 deletions doc/manuals/taskvine/index.md
@@ -1928,8 +1928,8 @@ Consider now that the task requires 1 cores, 6GB of memory, and 27 GB of disk:

!!! note
If you want TaskVine to exactly allocate the resources you have
specified, use the `proportional-resources` and `proportional-whole-tasks`
parameters as shown [here](#tuning-specialized-execution-parameters). In
    specified, use `m.disable_proportional_resources()` (see also `proportional-whole-tasks`
    [here](#tuning-specialized-execution-parameters)). In
general, however, we have found that using proportions nicely adapts to the
underlying available resources, and leads to very few resource exhaustion
failures while still using worker resources efficiently.
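The proportional allocation described above can be condensed into a few lines of Python. This is a simplified model for intuition only — the real manager also rounds to whole tasks (`proportional-whole-tasks`) and respects category limits — and the function name and worker sizes are illustrative:

```python
# Simplified model of proportional resources: the largest requested
# fraction of any worker resource becomes the task's share, and
# unspecified resources are filled in at that same share.

def proportional_allocation(worker, requested):
    share = max(requested[r] / worker[r] for r in requested)
    return {r: requested.get(r, worker[r] * share) for r in worker}

worker = {"cores": 16, "memory": 64000, "disk": 128000}  # memory/disk in MB
task = {"cores": 4}  # only cores specified explicitly
alloc = proportional_allocation(worker, task)
# the task gets a 4/16 = 0.25 share: 16000 MB memory, 32000 MB disk
```

This is why a worker with 16 cores runs at most four such tasks at once even though each task asked for nothing beyond its cores.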
@@ -2535,10 +2535,10 @@ change.
| min-transfer-timeout | Set the minimum number of seconds to wait for files to be transferred to or from a worker. | 10 |
| monitor-interval | Maximum number of seconds between resource monitor measurements. If less than 1, use default. | 5 |
| prefer-dispatch | If 1, try to dispatch tasks even if there are retrieved tasks ready to be reported as done. | 0 |
| proportional-resources | If set to 0, do not assign resources proportionally to tasks. The default is to use proportions. (See [task resources](#task-resources).) | 1 |
| proportional-whole-tasks | Round up resource proportions such that only an integer number of tasks could fit in the worker. The default is to use proportions. (See [task resources](#task-resources).) | 1 |
| ramp-down-heuristic | If set to 1 and there are more workers than tasks waiting, then tasks are allocated all the free resources of a worker large enough to run them. If monitoring watchdog is not enabled, then this heuristic has no effect. | 0 |
| resource-submit-multiplier | Assume that workers have `resource x resources-submit-multiplier` available.<br> This overcommits resources at the worker, causing tasks to be sent to workers that cannot be immediately executed.<br>The extra tasks wait at the worker until resources become available. | 1 |
| sandbox-grow-factor | When task disk sandboxes are exhausted, increase the allocation using their measured value times this factor. Minimum is 1.1. | 2 |
| short-timeout | Set the minimum timeout in seconds when sending a brief message to a single worker. | 5 |
| temp-replica-count | Number of temp file replicas created across workers. | 0 |
| transfer-outlier-factor | Transfers that are this many times slower than the average will be terminated. | 10 |
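As a concrete reading of the `sandbox-grow-factor` row above, the following sketch (not the manager's actual code; the function name is ours) shows how a new sandbox allocation would be derived from the measured usage, with the documented 1.1 floor on the factor:

```python
def next_sandbox_allocation(measured_mb, grow_factor=2.0):
    # the tuning parameter defaults to 2; values below 1.1 are clamped up
    factor = max(grow_factor, 1.1)
    return measured_mb * factor
```

Under the default factor, a task whose sandbox measured 1000 MB at exhaustion would be retried with a 2000 MB allocation.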
@@ -2721,6 +2721,8 @@ The `compute` call above may receive the following keyword arguments:
| lazy\_transfer | Whether to bring each result back from the workers (False, default), or keep transient results at workers (True) |
| resources | A dictionary to specify [maximum resources](#task-resources), e.g. `{"cores": 1, "memory": 2000"}` |
| resources\_mode | [Automatic resource management](#automatic-resource-management) to use, e.g., "fixed", "max", or "max throughput"|
| task\_mode | Mode to execute individual tasks, e.g., "tasks" or "function-calls" (see [function calls](#serverless-computing)). |



### Further Information
5 changes: 5 additions & 0 deletions dttools/src/category.c
@@ -921,6 +921,11 @@ const struct rmsummary *category_task_min_resources(struct category *c, struct r
/* but don't go below the minimum defined for the category. */
rmsummary_merge_max(internal, c->min_allocation);

/* nor below the observed sandboxes if not in an auto mode */
if (c->allocation_mode == CATEGORY_ALLOCATION_MODE_FIXED && user && user->disk < 0) {
internal->disk = MAX(internal->disk, c->min_vine_sandbox);
}

return internal;
}

3 changes: 3 additions & 0 deletions dttools/src/category.h
@@ -104,6 +104,9 @@ struct category {
/* stats for taskvine */
struct vine_stats *vine_stats;

/* Max sandbox disk space observed, in MB. This is the minimum sandbox size needed if nothing else is known about the task.*/
int64_t min_vine_sandbox;

/* variables for makeflow */
/* Mappings between variable names defined in the makeflow file and their values. */
struct hash_table *mf_variables;
7 changes: 7 additions & 0 deletions dttools/src/macros.h
@@ -39,6 +39,13 @@ See the file COPYING for details.
#define TERABYTE TERA
#define PETABYTE PETA

#define BYTES_TO_STORAGE_UNIT(x, unit) (((double) x) / unit)
#define BYTES_TO_KILOBYTES(x) BYTES_TO_STORAGE_UNIT(x, KILOBYTE)
#define BYTES_TO_MEGABYTES(x) BYTES_TO_STORAGE_UNIT(x, MEGABYTE)
#define BYTES_TO_GIGABYTES(x) BYTES_TO_STORAGE_UNIT(x, GIGABYTE)
#define BYTES_TO_TERABYTES(x) BYTES_TO_STORAGE_UNIT(x, TERABYTE)
#define BYTES_TO_PETABYTES(x) BYTES_TO_STORAGE_UNIT(x, PETABYTE)

#define USECOND 1000000

#endif
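A Python mirror of the new `BYTES_TO_*` helpers, assuming the binary unit definitions (`KILO` = 1024 and so on) used elsewhere in cctools' macros.h; a sketch for intuition, not a drop-in:

```python
KILOBYTE = 1024
MEGABYTE = 1024 * KILOBYTE
GIGABYTE = 1024 * MEGABYTE

def bytes_to_storage_unit(x, unit):
    # same shape as BYTES_TO_STORAGE_UNIT(x, unit): a plain float division
    return float(x) / unit

def bytes_to_megabytes(x):
    return bytes_to_storage_unit(x, MEGABYTE)
```

The C macros cast to `double` first for the same reason: integer division would silently round a sub-unit size down to zero.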
10 changes: 7 additions & 3 deletions dttools/src/path_disk_size_info.c
@@ -23,15 +23,15 @@ struct DIR_with_name {
char *name;
};

int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files)
int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files, struct hash_table *exclude_paths)
{

struct stat info;
int result = stat(path, &info);
if (result == 0) {
if (S_ISDIR(info.st_mode)) {
struct path_disk_size_info *state = NULL;
result = path_disk_size_info_get_r(path, -1, &state);
result = path_disk_size_info_get_r(path, -1, &state, exclude_paths);

*measured_size = state->last_byte_size_complete;
*number_of_files = state->last_file_count_complete;
@@ -46,7 +46,7 @@ int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *n
return result;
}

int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state)
int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state, struct hash_table *exclude_paths)
{
int64_t start_time = time(0);
int result = 0;
@@ -115,6 +115,10 @@ int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_di
snprintf(composed_path, PATH_MAX, "%s/%s", tail->name, entry->d_name);
}

if (exclude_paths && hash_table_lookup(exclude_paths, composed_path)) {
continue;
}

if (lstat(composed_path, &file_info) < 0) {
if (errno == ENOENT) {
/* our DIR structure is stale, and a file went away. We simply do nothing. */
8 changes: 6 additions & 2 deletions dttools/src/path_disk_size_info.h
@@ -9,6 +9,7 @@ See the file COPYING for details.

#include "int_sizes.h"
#include "list.h"
#include "hash_table.h"

struct path_disk_size_info {
int complete_measurement;
@@ -29,9 +30,10 @@ Query disk space on the given directory.
@param path Directory to be measured.
@param *measured_size A pointer to an integer that will be filled with the total space in bytes.
@param *number_of_files A pointer to an integer that will be filled with the total number of files, directories, and symbolic links.
@param exclude_paths Hash table with strings of paths that should not be measured. Values of the hash table are ignored.
@return zero on success, -1 if an error is encountered (see errno).
*/
int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files);
int path_disk_size_info_get(const char *path, int64_t *measured_size, int64_t *number_of_files, struct hash_table *exclude_paths);

/** Get a (perhaps partial) disk usage on path, but measure by max_secs at a time.
If *state is NULL, start a new measurement, otherwise continue from
@@ -40,9 +42,11 @@ When the function returns, if *state->complete_measurement is 1, then the measur
@param path Directory to be measured.
@param max_secs Maximum number of seconds to spend in the measurement.
@param *state State of the measurement.
@param exclude_paths Hash table with strings of paths that should not be measured. Values of the hash table are ignored.
@return zero on success, -1 if an error is encountered (see errno).
*/
int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state);
int path_disk_size_info_get_r(const char *path, int64_t max_secs, struct path_disk_size_info **state, struct hash_table *exclude_paths);


void path_disk_size_info_delete_state(struct path_disk_size_info *state);

4 changes: 2 additions & 2 deletions dttools/src/rmonitor_poll.c
@@ -780,7 +780,7 @@ int rmonitor_get_wd_usage(struct rmonitor_wdir_info *d, int max_time_for_measure
{
/* We need a pointer to a pointer, which it is not possible from a struct. Use a dummy variable. */
struct path_disk_size_info *state = d->state;
int status = path_disk_size_info_get_r(d->path, max_time_for_measurement, &state);
int status = path_disk_size_info_get_r(d->path, max_time_for_measurement, &state, NULL);

d->state = state;

@@ -945,7 +945,7 @@ struct rmsummary *rmonitor_measure_host(char *path)
struct rmsummary *tr = rmsummary_create(-1);

if (path) {
path_disk_size_info_get(path, &total_disk, &file_count);
path_disk_size_info_get(path, &total_disk, &file_count, NULL);
tr->disk = ((double)total_disk) / ONE_MEGABYTE;
tr->total_files = file_count;
}
1 change: 1 addition & 0 deletions hi.txt
@@ -0,0 +1 @@
q
39 changes: 29 additions & 10 deletions poncho/src/poncho/package_create.py
@@ -179,6 +179,15 @@ def dict_to_env(
return output


def _write_pinned_files(poncho_spec, env_dir):
if "pinned" in poncho_spec:
if "conda" in poncho_spec:
with open(os.path.join(env_dir, "env", "conda-meta", "pinned"), "w") as f:
for p, v in poncho_spec["pinned"]["conda"].items():
assert isinstance(v, str)
f.write(f"{p}={v}\n")
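The `pinned` section that `_write_pinned_files` consumes maps conda package names to exact version strings. A sketch of how that translates into the environment's `conda-meta/pinned` file (the package names and versions below are made up for illustration):

```python
import os

spec = {
    "conda": {"dependencies": ["python=3.10"]},
    # hypothetical pins; any name -> exact-version-string pair works
    "pinned": {"conda": {"numpy": "1.26.4", "cloudpickle": "2.2.1"}},
}

def write_pinned(poncho_spec, env_dir):
    # mirrors _write_pinned_files: one "name=version" line per pin,
    # assuming env_dir/env/conda-meta already exists
    path = os.path.join(env_dir, "env", "conda-meta", "pinned")
    with open(path, "w") as f:
        for pkg, version in poncho_spec["pinned"]["conda"].items():
            f.write(f"{pkg}={version}\n")
```

Conda consults `conda-meta/pinned` on later solves, so writing it before the `env update` step keeps those packages at the requested versions.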


def pack_env_from_dict(
spec,
output,
@@ -210,11 +219,21 @@ def pack_env_from_dict(
http_data(spec, env_dir)

# create conda environment in temp directory
logger.info("creating environment directory...")
_run_conda_command(
env_dir,
needs_confirmation,
"create",
)

_write_pinned_files(spec, env_dir)

# update conda environment in temp directory from spec
logger.info("populating environment...")
_run_conda_command(
env_dir,
needs_confirmation,
"env create",
"env update",
"--file",
env_dir + "/conda_spec.yml",
)
@@ -265,15 +284,15 @@ def pack_env(

# else if spec is a file or from stdin
elif os.path.isfile(spec) or spec == "-":
f = open(spec, "r")
poncho_spec = json.load(f)
pack_env_from_dict(
poncho_spec,
output,
conda_executable,
download_micromamba,
ignore_editable_packages,
)
with open(spec, "r") as f:
poncho_spec = json.load(f)
pack_env_from_dict(
poncho_spec,
output,
conda_executable,
download_micromamba,
ignore_editable_packages,
)

# else pack from a conda environment name
# this thus assumes conda executable is in the current shell executable path
2 changes: 1 addition & 1 deletion poncho/src/poncho_package_create
@@ -5,7 +5,7 @@ if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Create a packed environment from a spec, a conda environment name, or a conda directory.')
parser.add_argument('spec', help='Read in a spec file, a conda environment name, a conda directory, or - for stdin.')
parser.add_argument('output', help='Write output from conda-pack to the given file.')
parser.add_argument('--conda-executable', action='store_true', help='Path to conda executable to use. Default are, in this order: mamba, $CONDA_EXE, conda')
parser.add_argument('--conda-executable', action='store', help='Path to conda executable to use. Defaults are, in this order: mamba, $CONDA_EXE, conda')

parser.add_argument('--no-micromamba', action='store_true', help='Do not try to download micromamba if a conda executable is not found.')
parser.add_argument('--ignore-editable-packages', action='store_true', help='Skip checks for editable packages.')
18 changes: 17 additions & 1 deletion taskvine/src/manager/taskvine.h
@@ -105,7 +105,8 @@ typedef enum {
VINE_RESULT_FIXED_LOCATION_MISSING = 10 << 3, /**< The task failed because no worker could satisfy the fixed
location input file requirements. */
VINE_RESULT_CANCELLED = 11 << 3, /**< The task was cancelled by the caller. */
VINE_RESULT_LIBRARY_EXIT = 12 << 3 /**< Task is a library that has terminated. **/
VINE_RESULT_LIBRARY_EXIT = 12 << 3, /**< Task is a library that has terminated. **/
VINE_RESULT_SANDBOX_EXHAUSTION = 13 << 3 /**< The task used more disk than the allowed sandbox. **/
} vine_result_t;

/** Select how to allocate resources for similar tasks with @ref vine_set_category_mode */
@@ -1088,6 +1089,21 @@ int vine_enable_peer_transfers(struct vine_manager *m);
/** Disable taskvine peer transfers to be scheduled by the manager **/
int vine_disable_peer_transfers(struct vine_manager *m);

/** When enabled, resources are assigned to tasks in proportion to the size
of the worker. If a resource is specified (e.g. with @ref vine_task_set_cores),
proportional resources never go below explicit specifications. This mode is most
useful when only some of the resources are explicitly specified, or
with automatic resource allocation. By default it is enabled.
@param m A manager object
**/
int vine_enable_proportional_resources(struct vine_manager *m);

/** Disable proportional resources. See @ref vine_enable_proportional_resources.
* Proportional resources are enabled by default.
@param m A manager object
**/
int vine_disable_proportional_resources(struct vine_manager *m);

/** Set the minimum task_id of future submitted tasks.
Further submitted tasks are guaranteed to have a task_id larger or equal to
minid. This function is useful to make task_ids consistent in a workflow that
14 changes: 14 additions & 0 deletions taskvine/src/manager/vine_file_replica_table.c
@@ -4,6 +4,8 @@ Copyright (C) 2022- The University of Notre Dame
See the file COPYING for details.
*/

#include <math.h>

#include "vine_file_replica_table.h"
#include "set.h"
#include "vine_blocklist.h"
@@ -25,6 +27,12 @@ int vine_file_replica_table_insert(struct vine_manager *m, struct vine_worker_in
w->inuse_cache += replica->size;
hash_table_insert(w->current_files, cachename, replica);

double prev_available = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache + replica->size));
if (prev_available >= m->current_max_worker->disk) {
/* the current worker may have been the one with the maximum available space, so we update it. */
m->current_max_worker->disk = w->resources->disk.total - ceil(BYTES_TO_MEGABYTES(w->inuse_cache));
}

struct set *workers = hash_table_lookup(m->file_worker_table, cachename);
if (!workers) {
workers = set_create(4);
@@ -44,6 +52,12 @@ struct vine_file_replica *vine_file_replica_table_remove(struct vine_manager *m,
w->inuse_cache -= replica->size;
}

double available = w->resources->disk.total - BYTES_TO_MEGABYTES(w->inuse_cache);
if (available > m->current_max_worker->disk) {
/* the current worker has more space than we knew before for all workers, so we update it. */
m->current_max_worker->disk = available;
}
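The bookkeeping in both hunks keeps `current_max_worker` fresh: a worker's available disk is its total minus the cache in use, converted from bytes to MB. A pure-Python sketch of the removal path (the class and function names are ours, not TaskVine's):

```python
MEGABYTE = 1024 * 1024  # assuming binary units, as in cctools

class Worker:
    def __init__(self, disk_total_mb):
        self.disk_total = disk_total_mb   # MB, from resources->disk.total
        self.inuse_cache = 0              # bytes held by cached files

    def available_disk(self):
        # w->resources->disk.total - BYTES_TO_MEGABYTES(w->inuse_cache)
        return self.disk_total - self.inuse_cache / MEGABYTE

def remove_replica(worker, replica_size_bytes, current_max_disk):
    """Free the replica's cache space; if this worker now has more free
    disk than the best known worker, raise the cached maximum."""
    worker.inuse_cache -= replica_size_bytes
    return max(current_max_disk, worker.available_disk())
```

Tracking the maximum incrementally this way avoids rescanning every worker on each file insert or removal.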

struct set *workers = hash_table_lookup(m->file_worker_table, cachename);

if (workers) {