collate now supports minimal dimension + added more support in samples cacher + added interface validation utility (#349)

* multiprocessing fix and helper func

* path utilities and also Moshiko's change to solve hang on tests

* PR comments implemented

* path utils

* changed default deepdiff behavior to ignore NaNs in comparison, added keys(), items(), and values() to our NDict, and tried to better highlight the faulting op in pipeline op errors

* solved issues raised by static code analysis

* removed unreachable code in paths.py

* * Added "remove_extension" to path utils
  * Changed default deepdiff behavior to ignore NaNs in comparison
  * Added keys(), items(), and values() to our NDict (until now it returned empty iterables for those, which was incorrect)
  * Tried to better highlight the faulting op in pipeline op errors

* fixed a bug in head_1D_classifier

* added a lightweight mode of DatasetDefault that doesn't hold any sample_ids. fixed a typo in samplers.py and added a describe method to NDict

* fixing statically detected issues

* added simple function caching utility

* lightweight dataset default

* fixed static checkers

* fixed static code analysis related stuff

* code cleanup

* removed comments

* implemented PR comments

* added hints and better error messages for common mistakes when providing pipeline ops list

* linters etc.

* activation checkpointing

* ...

* activation checkpointing

* removed unneeded files

* added ability to ignore kwargs in function string descriptor building logic, and solved an issue in run_multiprocessed in the verbose=0 case

* ...

* added support for maxtasksperchild in run_multiprocessed

* static code analysis based fixes

* ...

* multiprocessing related improvements

* added a utility to get the number of available CPU cores

* added a simple but useful helper function to add a prefix to a file basename

* static code fixes

* fixed info message on num_available_cores

* num_available_cores now works correctly in distributed LSF/CCC settings as well

* static code checkers fixes

* ...

* added a common op for replacing entities

* static code checkers

* ...

* removed overly user-specific dir paths from the hash input string

* address tuple in collate

* shared memory utility that helps speed up processing significantly when enough RAM is available

* typo fix

* added a check for available total memory

* ...

* ...

* ...

* ...

* ...

* storing with full path

* storing with full path

* storing with full path

* collate now supports minimal dimension + added more support in samples cacher

* collate now supports minimal dimension + added more support in samples cacher

* PR comments

* ...

* added unit tests :)

* unit tests

* unit tests

* ...

* interface validator

* PR comments

* PR comments

* fixing tests

* fixing tests

---------

Co-authored-by: Yoel Shoshan <[email protected]>
Co-authored-by: Moshiko Raboh <[email protected]>
Co-authored-by: Michal Ozery-Flato <[email protected]>
Co-authored-by: [email protected] <[email protected]>
5 people authored May 16, 2024
1 parent 967e156 commit d0a7250
Showing 12 changed files with 460 additions and 23 deletions.
31 changes: 24 additions & 7 deletions fuse/data/datasets/caching/object_caching_handlers.py
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Any
import numpy as np
from fuse.utils.ndict import NDict
import torch
@@ -51,14 +51,31 @@ def _object_requires_hdf5_recurse(curr: NDict, str_base: str = "") -> List[str]:
# return None


def _object_requires_hdf5_single(obj, minimal_ndarray_size=100):  # type: ignore
    ans = isinstance(obj, np.ndarray) and (obj.size > minimal_ndarray_size)
_error_msg_torch_tensors = "You need to cast to numpy ndarray in the dynamic pipeline as it takes a lot of time pickling torch.Tensor"


def _object_requires_hdf5_single(obj, minimal_ndarray_size=100) -> bool:  # type: ignore
    if _valid_ndarray(obj, minimal_ndarray_size):
        return True

    if isinstance(obj, torch.Tensor):
        raise Exception(
            "You need to cast to tensor in the dynamic pipeline as it takes a lot of time pickling torch.Tensor"
        )
        raise Exception(_error_msg_torch_tensors)

    if isinstance(obj, (list, tuple)):
        if any(
            [_valid_ndarray(x, minimal_ndarray_size) for x in obj]
        ):  # _valid_ndarray(obj[0], minimal_ndarray_size):
            assert all(
                [isinstance(x, np.ndarray) for x in obj]
            ), f"first element in {type(obj)} is a numpy array but not all of the rest are! This is not supported."
            return True
        if any([torch.is_tensor(x) for x in obj]):
            raise Exception(_error_msg_torch_tensors)

    # if ans:
    #     print(f'found hdf5 requiring object! shape={obj.shape}, size={obj.size}')
    return ans
    return False


def _valid_ndarray(obj: Any, minimal_ndarray_size: int) -> bool:
    return isinstance(obj, np.ndarray) and (obj.size > minimal_ndarray_size)
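
A quick sketch of how this routing behaves (hypothetical calls, assuming the helpers above are in scope): large ndarrays, and homogeneous lists/tuples of them, are routed to HDF5; small arrays stay in the pickled part; torch tensors raise with the message above.

import numpy as np

_object_requires_hdf5_single(np.zeros(1000))                  # True: size > minimal_ndarray_size
_object_requires_hdf5_single(np.zeros(10))                    # False: small enough to pickle
_object_requires_hdf5_single([np.zeros(500), np.zeros(600)])  # True: list of large ndarrays
# _object_requires_hdf5_single(torch.zeros(1000))             # raises: cast to numpy first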
65 changes: 60 additions & 5 deletions fuse/data/datasets/caching/samples_cacher.py
@@ -12,7 +12,7 @@
Created on June 30, 2021
"""
from functools import partial
from typing import Hashable, List, Optional, Sequence, Union, Callable, Any, Tuple
from typing import Hashable, List, Optional, Sequence, Union, Callable, Any, Tuple, Dict

from fuse.data.pipelines.pipeline_default import PipelineDefault
from collections import OrderedDict
@@ -43,6 +43,7 @@
from fuse.data.datasets.sample_caching_audit import SampleCachingAudit
from fuse.data.utils.sample import get_initial_sample_id, set_initial_sample_id
from warnings import warn
import numpy as np


class SamplesCacher:
@@ -311,14 +312,18 @@ def load_sample(
        return sample_from_cache

    def _load_sample_using_pipeline(
        self, sample_id: Hashable, keys: Optional[Sequence[str]] = None
        self,
        sample_id: Hashable,
        keys: Optional[Sequence[str]] = None,  # FIXME: noticed that keys is ignored !
    ) -> NDict:
        sample_dict = create_initial_sample(sample_id)
        result_sample = self._pipeline(sample_dict)
        return result_sample

    def _load_sample_from_cache(
        self, sample_id: Hashable, keys: Optional[Sequence[str]] = None
        self,
        sample_id: Hashable,
        keys: Optional[Sequence[str]] = None,  # FIXME: noticed that keys is ignored !
    ) -> NDict:
        """
        TODO: add comments
@@ -332,6 +337,27 @@ def _load_sample_from_cache(
        loaded_sample = NDict(load_pickle(extension_less + ".pkl.gz"))
        if os.path.isfile(extension_less + ".hdf5"):
            loaded_sample_hdf5_part = load_hdf5(extension_less + ".hdf5")
            stored_ndarrays_list = [
                k
                for k in loaded_sample_hdf5_part.keys()
                if ("@RESERVED_LIST@" in k) or ("@RESERVED_TUPLE@" in k)
            ]  # in these cases the ndarrays were originally stored as a list or tuple of ndarrays
            for seq_key in stored_ndarrays_list:
                seq_key_clean = seq_key.replace("@RESERVED_LIST@", "").replace(
                    "@RESERVED_TUPLE@", ""
                )
                curr_sequence_length = loaded_sample_hdf5_part.pop(seq_key)[0]
                curr_seq = []
                for i in range(curr_sequence_length):
                    curr_seq.append(
                        loaded_sample_hdf5_part.pop(
                            seq_key_clean + f"@RESERVED_ELEM@{i}"
                        )
                    )
                if "@RESERVED_TUPLE@" in seq_key:
                    curr_seq = tuple(curr_seq)
                loaded_sample_hdf5_part[seq_key_clean] = curr_seq

            loaded_sample.merge(loaded_sample_hdf5_part)
        return loaded_sample

@@ -399,10 +425,17 @@ def _cache(self, orig_sample_id: Any) -> Any:
                    hdf5_filename = os.path.join(
                        write_dir, output_sample_hash + ".hdf5"
                    )
                    save_hdf5_safe(hdf5_filename, **requiring_hdf5_dict)

                    requiring_hdf5_dict_final = NDict()
                    for k, obj in requiring_hdf5_dict.items():
                        requiring_hdf5_dict_final.update(
                            _convert_to_sequence_for_hdf5_if_needed(k, obj)
                        )

                    save_hdf5_safe(hdf5_filename, **requiring_hdf5_dict_final)

                    # remove all hdf5 entries from the sample_dict that will be pickled
                    for k in requiring_hdf5_dict:
                    for k in requiring_hdf5_dict.keys():
                        _ = curr_sample.pop(k)

                    save_pickle_safe(
@@ -418,6 +451,28 @@ def _cache(self, orig_sample_id: Any) -> Any:
        return output_info


def _convert_to_sequence_for_hdf5_if_needed(key: str, data: Any) -> Dict:
    ans = {}
    if isinstance(data, (list, tuple)):
        if isinstance(data, list):
            ans[key + "@RESERVED_LIST@"] = np.array(
                [len(data)]
            )  # to mark that this case is a list, and its length
        elif isinstance(data, tuple):
            ans[key + "@RESERVED_TUPLE@"] = np.array(
                [len(data)]
            )  # to mark that this case is a tuple, and its length
        else:
            assert False  # should not get here

        for i, elem in enumerate(data):
            ans[key + f"@RESERVED_ELEM@{i}"] = elem
    else:
        ans = {key: data}

    return ans


def _get_available_write_location(
    cache_dirs: List[str], max_allowed_used_space: Optional[float] = None
) -> str:
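
To illustrate the new list/tuple handling end to end (a sketch; the key "data.input.img" is hypothetical): flattening produces a length marker plus one entry per element, and _load_sample_from_cache reverses this on load.

import numpy as np

arrs = [np.zeros((2, 2)), np.ones((3,))]
flat = _convert_to_sequence_for_hdf5_if_needed("data.input.img", arrs)
# flat now holds:
#   'data.input.img@RESERVED_LIST@'  -> np.array([2])  (marks a list and stores its length)
#   'data.input.img@RESERVED_ELEM@0' -> arrs[0]
#   'data.input.img@RESERVED_ELEM@1' -> arrs[1]
# on load, the @RESERVED_*@ keys are popped and reassembled into a list
# (or tuple, for @RESERVED_TUPLE@) under the clean key 'data.input.img'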
2 changes: 1 addition & 1 deletion fuse/data/datasets/dataset_default.py
@@ -53,7 +53,7 @@ def __init__(
            - An integer that describes only the size of the dataset. This is useful in massive datasets
                (for example 100M samples). In such case, multiple functionalities will not be supported, mainly -
                cacher, allow_uncached_sample_morphing and get_all_sample_ids
            - None. In this case, the dataset will not deal with sample ids. it is the user's respobsibility to handle
            - None. In this case, the dataset will not deal with sample ids. it is the user's responsibility to handle
                iterations w.r.t the length of the dataset, as well as the index passed to __getitem__
                this is useful for massive datasets, but when the sample ids are not expected to be running integers from 0 to a given length.
2 changes: 1 addition & 1 deletion fuse/data/datasets/sample_caching_audit.py
@@ -101,5 +101,5 @@ def audit(self, cached_sample: Any, fresh_sample: Any) -> None:
                "This is perfectly fine to do, just make sure you reset your cache after such change.\n"
                "Gladly, the Audit feature caught this stale cache state! :)\n"
                f"sample id in which this staleness was caught: {get_sample_id(fresh_sample)}\n"
                'NOTE: if small changes between the saved cached and the live-loaded/processed sample are ok for your use case, you can set a tolerance epsilon like this: audit_diff_kwargs={"math_epsilon":1e-9}'"""
                'NOTE: if small changes between the saved cached and the live-loaded/processed sample are ok for your use case, you can set a tolerance epsilon like this: audit_diff_kwargs=dict(math_epsilon=1e-9)'"""
        raise Exception(msg + f"diff = {diff}" + msg)
4 changes: 3 additions & 1 deletion fuse/data/ops/caching_tools.py
@@ -81,7 +81,9 @@ def value_to_string(val: Any, warn_on_types: Optional[Sequence] = None) -> str:


def convert_func_call_into_kwargs_only(
    func: Callable, *args: list, **kwargs: dict
    func: Callable,
    *args: list,
    **kwargs: dict,
) -> dict:
    """
    considers positional and kwargs (including their default values !)
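
As a sketch of the behavior the docstring describes (the example function and exact return value are assumptions, not taken from the diff): positional arguments are mapped to their parameter names and unspecified defaults are filled in, much like inspect.signature binding.

import inspect

def example(a: int, b: int = 2, c: int = 3) -> int:
    return a + b + c

# convert_func_call_into_kwargs_only(example, 1, c=30) is expected to behave
# like this inspect-based equivalent:
bound = inspect.signature(example).bind(1, c=30)
bound.apply_defaults()
print(dict(bound.arguments))  # {'a': 1, 'c': 30, 'b': 2}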
6 changes: 3 additions & 3 deletions fuse/data/pipelines/pipeline_default.py
@@ -21,8 +21,9 @@
from fuse.utils.misc.context import DummyContext
from fuse.utils.ndict import NDict
from fuse.utils.cpu_profiling.timer import Timer

import os
import copy
import threading


class PipelineDefault(OpReversibleBase):
@@ -137,8 +138,7 @@ def __call__(
        for sub_op_id, (op, op_kwargs) in zip(self._op_ids, self._ops_and_kwargs):
            if self._verbose:
                context = Timer(
                    f"Pipeline {self._name}: op {type(op).__name__}, op_id {sub_op_id}",
                    self._verbose,
                    f"PID={os.getpid()} thread={threading.get_ident()} Pipeline {self._name}: op {type(op).__name__}, op_id {sub_op_id}",
                )
            else:
                context = DummyContext()
26 changes: 22 additions & 4 deletions fuse/data/utils/collates.py
@@ -16,7 +16,7 @@
Created on June 30, 2021
"""
from typing import Any, Callable, Dict, List, Sequence, Optional
from typing import Any, Callable, Dict, List, Sequence, Optional, Tuple

import numpy as np
import torch
@@ -134,7 +134,7 @@ def _batch_dispatch(
            batch_dict[key] = self._special_handlers_keys[key](collected_values)
        elif isinstance(
            collected_values[0],
            (torch.Tensor, np.ndarray, float, int, str, bytes),  # , tuple),
            (torch.Tensor, np.ndarray, float, int, str, bytes),
        ):
            # batch with default PyTorch implementation
            batch_dict[key] = default_collate(collected_values)
@@ -150,12 +150,18 @@ def just_collect_to_list(values: List[Any]) -> List[Any]:

    @staticmethod
    def pad_all_tensors_to_same_size(
        values: List[torch.Tensor], pad_val: float = 0.0
        values: List[torch.Tensor],
        pad_val: float = 0.0,
        min_size_per_dim: Optional[Tuple] = None,
    ) -> torch.Tensor:
        """
        pad tensors and create a batch - the shape will be the max size per dim
        values: list of tensors - all should have the same number of dimensions
        pad_val: constant value for padding
        min_size_per_dim: defines, per dimension, the minimal size in the post-collated tensor (excluding the batch dimension, which you shouldn't provide)
            this can be useful to prevent OOM due to memory fragmentation
            for example, you can use min_size_per_dim = (1000, -1) to make sure that the final collated tensor's first dimension is at least 1000
        :return: torch.stack of padded tensors
        """

@@ -164,6 +170,7 @@ def pad_all_tensors_to_same_size(
            values[0], torch.Tensor
        ), f"Expecting just tensors, got {type(values[0])}"
        num_dims = len(values[0].shape)

        for value in values:
            assert isinstance(
                value, torch.Tensor
@@ -172,9 +179,20 @@ def pad_all_tensors_to_same_size(
                len(value.shape) == num_dims
            ), f"Expecting all tensors to have the same dim size, got {len(value.shape)} and {num_dims}"

        # get max per dim
        # get max per dim - this is the observed actual max from the batch
        max_per_dim = np.amax(np.stack([value.shape for value in values]), axis=0)

        if min_size_per_dim is not None:
            assert isinstance(min_size_per_dim, tuple)
            assert len(min_size_per_dim) == len(max_per_dim)
            assert all(
                [(x > 0) or (x == -1) for x in min_size_per_dim]
            ), "allowed values for elements in min_size_per_dim are only positive integer or -1"
            max_per_dim = [
                max(actual, minimal_requested) if minimal_requested != -1 else actual
                for (actual, minimal_requested) in zip(max_per_dim, min_size_per_dim)
            ]

        # pad
        def _pad_size(value: torch.Tensor, dim: int) -> List[int]:
            assert max_per_dim[dim] >= value.shape[dim]
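
For example (a sketch of the min_size_per_dim semantics; the tensor shapes here are made up, and the unit test below exercises the same path):

import torch
from fuse.data.utils.collates import CollateDefault

a, b = torch.ones((2, 5)), torch.ones((4, 3))
# observed max per dim is (4, 5); min_size_per_dim=(8, -1) lifts dim 0 to 8
# and leaves dim 1 at its observed max
batch = CollateDefault.pad_all_tensors_to_same_size([a, b], min_size_per_dim=(8, -1))
print(batch.shape)  # torch.Size([2, 8, 5])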
9 changes: 9 additions & 0 deletions fuse/data/utils/tests/test_collates.py
@@ -130,6 +130,15 @@ def test_pad_all_tensors_to_same_size_bs_3(self) -> None:
        values = CollateDefault.pad_all_tensors_to_same_size([a, b, c])
        self.assertListEqual(list(values.shape), [3, 3, 3, 3])

    def test_pad_all_tensors_to_same_size_with_min_size_per_dim(self) -> None:
        a = torch.ones((1, 2, 3))
        b = torch.ones((3, 2, 1))
        c = torch.ones((1, 3, 2))
        values = CollateDefault.pad_all_tensors_to_same_size(
            [a, b, c], min_size_per_dim=(100, 100, -1)
        )
        self.assertListEqual(list(values.shape), [3, 100, 100, 3])


if __name__ == "__main__":
    unittest.main()
2 changes: 1 addition & 1 deletion fuse/utils/file_io/path.py
@@ -67,4 +67,4 @@ def get_valid_filename(s: str) -> str:
    modifies an input string into a string that is valid as a filename in linux
    """
    s = str(s).strip().replace(" ", "_")
    return re.sub(r"(?u)[^-\w.]", "@", s)
    return re.sub(r"[\\:\"*?<>|]", "@", s)
61 changes: 61 additions & 0 deletions fuse/utils/file_io/tests/test_path.py
@@ -0,0 +1,61 @@
"""
(C) Copyright 2021 IBM Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import unittest

from fuse.utils.rand.seed import Seed
from fuse.utils.file_io.file_io import save_hdf5_safe, load_hdf5
import numpy as np
import tempfile
import os
from fuse.utils.file_io import path


class TestPath(unittest.TestCase):
    """
    Test path.py
    """

    def setUp(self) -> None:
        pass

    def test_path_1(self) -> None:

        ans = path.add_base_prefix("/a/b/c/de/fg/banana.phone", "hohoho@")
        self.assertEqual(ans, "/a/b/c/de/fg/hohoho@banana.phone")

        ans = path.change_extension("/a/b/c/de/fg/123.txt", "7zip")
        self.assertEqual(ans, "/a/b/c/de/fg/123.7zip")

        ans = path.change_extension("/a/b/c/de/fg/123.456.txt", "7zip")
        self.assertEqual(ans, "/a/b/c/de/fg/123.456.7zip")

        ans = path.get_extension("/a/b/c/de/fg/123.456.7zip")
        self.assertEqual(ans, ".7zip")

        ans = path.remove_extension("/a/b/c/de/fg/123.456.7zip")
        self.assertEqual(ans, "/a/b/c/de/fg/123.456")

        ans = path.get_valid_filename("test 1 2 3 he^^llo")
        self.assertEqual(ans, "test_1_2_3_he^^llo")

    def tearDown(self) -> None:
        pass


if __name__ == "__main__":
    unittest.main()
