Skip to content

Commit

Permalink
Store metadata only once for array jobs (#1042)
Browse files Browse the repository at this point in the history
* store metadata only once for array jobs

* remove todo comments

* fix typo

* add troubleshooting comment to clustertools/readme

* use metadata instead of meta_data everywhere

* update changelog
  • Loading branch information
philippotto authored Apr 19, 2024
1 parent f902584 commit be64527
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 23 deletions.
1 change: 1 addition & 0 deletions cluster_tools/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ For upgrade instructions, please check the respective *Breaking Changes* section
### Added

### Changed
- Array jobs are spawned faster now, because common metadata is not serialized for each subjob. The performance improvement is especially big when custom logging code is configured for array jobs. [#1042](https://github.com/scalableminds/webknossos-libs/pull/1042)
- Updated ruff to v0.4.0 [1047](https://github.com/scalableminds/webknossos-libs/pull/1047)

### Fixed
Expand Down
1 change: 1 addition & 0 deletions cluster_tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ If you would like to configure these limits independently, you can do so by sett
## Dev Setup

```
# See ./dockered-slurm/README.md for troubleshooting
cd dockered-slurm
docker-compose up -d
docker exec -it slurmctld bash
Expand Down
20 changes: 8 additions & 12 deletions cluster_tools/cluster_tools/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,14 @@ def worker(
custom_main_path = get_custom_main_path(workerid, executor)
with open(input_file_name, "rb") as f:
unpickled_tuple = pickling.load(f, custom_main_path)
if len(unpickled_tuple) == 4:
fun, args, kwargs, meta_data = unpickled_tuple
output_pickle_path = executor.format_outfile_name(
cfut_dir, workerid_with_idx
)
else:
assert len(unpickled_tuple) == 5, "Unexpected encoding"
fun, args, kwargs, meta_data, output_pickle_path = unpickled_tuple

if isinstance(fun, str):
with open(fun, "rb") as function_file:
fun = pickling.load(function_file, custom_main_path)
assert len(unpickled_tuple) == 4, "Unexpected encoding"
fun_and_metadata, args, kwargs, output_pickle_path = unpickled_tuple

if isinstance(fun_and_metadata, str):
with open(fun_and_metadata, "rb") as function_file:
fun, meta_data = pickling.load(function_file, custom_main_path)
else:
fun, meta_data = fun_and_metadata

setup_logging(meta_data, executor, cfut_dir)

Expand Down
30 changes: 19 additions & 11 deletions cluster_tools/cluster_tools/schedulers/cluster_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ def __init__(
partial(_handle_kill_through_weakref, ref(self), existing_sigint_handler),
)

self.meta_data = {}
self.metadata = {}
assert not (
"logging_config" in kwargs and "logging_setup_fn" in kwargs
), "Specify either logging_config OR logging_setup_fn but not both at once"
if "logging_config" in kwargs:
self.meta_data["logging_config"] = kwargs["logging_config"]
self.metadata["logging_config"] = kwargs["logging_config"]
if "logging_setup_fn" in kwargs:
self.meta_data["logging_setup_fn"] = kwargs["logging_setup_fn"]
self.metadata["logging_setup_fn"] = kwargs["logging_setup_fn"]

@classmethod
def as_completed(cls, futs: List["Future[_T]"]) -> Iterator["Future[_T]"]:
Expand Down Expand Up @@ -412,7 +412,7 @@ def submit( # type: ignore[override]

# Start the job.
serialized_function_info = pickling.dumps(
(__fn, args, kwargs, self.meta_data, output_pickle_path)
((__fn, self.metadata), args, kwargs, output_pickle_path)
)
with open(self.format_infile_name(self.cfut_dir, workerid), "wb") as f:
f.write(serialized_function_info)
Expand Down Expand Up @@ -451,9 +451,10 @@ def get_workerid_with_index(cls, workerid: str, index: Union[int, str]) -> str:
def get_jobid_with_index(cls, jobid: Union[str, int], index: int) -> str:
return f"{jobid}_{index}"

def get_function_pickle_path(self, workerid: str) -> str:
def get_function_and_metadata_pickle_path(self, workerid: str) -> str:
return self.format_infile_name(
self.cfut_dir, self.get_workerid_with_index(workerid, "function")
self.cfut_dir,
self.get_workerid_with_index(workerid, "function-and-metadata"),
)

@staticmethod
Expand Down Expand Up @@ -484,10 +485,12 @@ def map_to_futures(
futs_with_output_paths = []
workerid = random_string()

pickled_function_path = self.get_function_pickle_path(workerid)
self.files_to_clean_up.append(pickled_function_path)
with open(pickled_function_path, "wb") as file:
pickling.dump(fn, file)
pickled_function_and_metadata_path = self.get_function_and_metadata_pickle_path(
workerid
)
self.files_to_clean_up.append(pickled_function_and_metadata_path)
with open(pickled_function_and_metadata_path, "wb") as file:
pickling.dump((fn, self.metadata), file)
self.store_main_path_to_meta_file(workerid)

for index, arg in enumerate(args):
Expand All @@ -511,7 +514,12 @@ def map_to_futures(
os.unlink(preliminary_output_pickle_path)

serialized_function_info = pickling.dumps(
(pickled_function_path, [arg], {}, self.meta_data, output_pickle_path)
(
pickled_function_and_metadata_path,
[arg],
{},
output_pickle_path,
)
)
infile_name = self.format_infile_name(self.cfut_dir, workerid_with_index)

Expand Down

0 comments on commit be64527

Please sign in to comment.