Dataloader improvements #2806

Draft. Wants to merge 39 commits into base: main.

Commits (39)
c2e89b1  dataloader changed initial (activesoull, Mar 20, 2024)
8bc1311  Merge branch 'main' of github.com:activeloopai/deeplake into libdeepl… (activesoull, Mar 20, 2024)
34d6b34  Merge branch 'main' of github.com:activeloopai/deeplake into libdeepl… (activesoull, Mar 21, 2024)
170e1e1  shuffling fix (activesoull, Mar 21, 2024)
09206a6  test changes (activesoull, Mar 21, 2024)
f6a69d1  linter changes (activesoull, Mar 21, 2024)
0ca7b91  added dicom and nifti support (activesoull, Mar 22, 2024)
55c7458  handle decode method data if data tensors are object dtype case (activesoull, Mar 22, 2024)
e685347  Merge branch 'main' of github.com:activeloopai/deeplake into libdeepl… (activesoull, Mar 22, 2024)
a940dff  added missing arguments (activesoull, Mar 25, 2024)
a6662c2  bumb libdeeplake to 0.0.110 (activesoull, Mar 25, 2024)
060c39d  fix dataset slicing logic (activesoull, Mar 26, 2024)
3ae90ec  black v 24.3.0 fixes (activesoull, Mar 26, 2024)
99849fd  bumb libdeeplake to 0.0.111 (activesoull, Mar 26, 2024)
f6818f3  bumb libdeeplake to 0.0.112 (activesoull, Mar 27, 2024)
fe28310  Merge branch 'main' of github.com:activeloopai/deeplake into libdeepl… (activesoull, Mar 27, 2024)
3d9f8c6  tmp-ssh (activesoull, Mar 28, 2024)
129119d  revert (activesoull, Mar 29, 2024)
65dd060  windows fix (activesoull, Mar 29, 2024)
7e07ac0  debug (activesoull, Mar 29, 2024)
9b2932e  remove pin_memory_device (activesoull, Apr 1, 2024)
d4f6152  Merge branch 'main' into torch_dl (activesoull, Apr 1, 2024)
0d3da6a  bumb libdeeplake to 0.0.113 (activesoull, Apr 1, 2024)
ab4df01  ssh (activesoull, Apr 1, 2024)
cde1beb  merge with main (activesoull, Apr 1, 2024)
a5406a1  Bump (activesoull, Apr 1, 2024)
90a9046  Merge branch 'fix-mypy' into torch_dl (khustup2, Apr 1, 2024)
e1eab17  bumb libdeeplake to 0.0.115 (activesoull, Apr 2, 2024)
1cbfbf1  set endpoint (activesoull, Apr 2, 2024)
4a53cf1  set endpoint (activesoull, Apr 2, 2024)
d9857ef  bumb libdeeplake to 0.0.116 (activesoull, Apr 2, 2024)
e5f37a6  added MacOS environment variable exception (activesoull, Apr 10, 2024)
8ee120a  adjust with 3.9 changes (activesoull, Apr 18, 2024)
dac271c  Merge branch 'torch_dl' of github.com:activeloopai/deeplake into torc… (activesoull, Apr 18, 2024)
ea21124  set origin path too (activesoull, Apr 24, 2024)
0e567a4  Merge branch 'main' of github.com:activeloopai/deeplake into torch_dl (activesoull, Apr 24, 2024)
9cddcdb  save (activesoull, Jun 4, 2024)
d53f825  Merge branch 'main' of github.com:activeloopai/deeplake into torch_dl (activesoull, Jul 11, 2024)
aacfb5c  remove medical (activesoull, Jul 15, 2024)
50 changes: 48 additions & 2 deletions deeplake/core/dataset/indra_dataset_view.py
@@ -60,6 +60,40 @@ def __init__(
except:
pass

def __getstate__(self) -> Dict[str, Any]:
keys = [
"path",
"_read_only",
"group_index",
"storage",
"_token",
"verbose",
"enabled_tensors",
"index"
]

state = {k: getattr(self, k) for k in keys}
return state

def __setstate__(self, state):
from indra import api # type: ignore

d: Dict[str, Any] = {}
self.storage = state["storage"]
d["indra_ds"] = api.load_from_storage(self.storage.core)
d["group_index"] = state["group_index"]
d["enabled_tensors"] = state["enabled_tensors"]
d["verbose"] = state["verbose"]
d["_token"] = state["_token"]
self.__dict__.update(d)
self._view_base = None
self._view_entry = None
self._read_only = state["_read_only"]
self._locked_out = False
self._query_string = None
index = state["index"]
self.indra_ds = self[list(index.values[0].value)].indra_ds

@property
def meta(self):
return DatasetMeta()
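
Note on the new `__getstate__`/`__setstate__` pair: PyTorch DataLoader workers receive the dataset view by pickling it, and the native `indra_ds` handle cannot be pickled directly, so the view serializes only lightweight attributes and rebuilds the handle from storage on the other side. A minimal self-contained sketch of the same pattern, where `Connection` is a hypothetical stand-in for the native handle:

import pickle


class Connection:
    """Hypothetical stand-in for a non-picklable native handle."""

    def __init__(self, path):
        self.path = path


class View:
    def __init__(self, path):
        self.path = path
        self.conn = Connection(path)  # transient, rebuilt after unpickling

    def __getstate__(self):
        # Serialize only what is needed to rebuild the view.
        return {"path": self.path}

    def __setstate__(self, state):
        self.path = state["path"]
        # Recreate the transient handle instead of trying to unpickle it.
        self.conn = Connection(self.path)


restored = pickle.loads(pickle.dumps(View("s3://bucket/ds")))
assert restored.conn.path == "s3://bucket/ds"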
@@ -97,6 +131,10 @@ def commit_id(self) -> str:
def libdeeplake_dataset(self):
return self.indra_ds

@libdeeplake_dataset.setter
def libdeeplake_dataset(self, new_indra_ds):
self.indra_ds = new_indra_ds

def merge(self, *args, **kwargs):
raise InvalidOperationError(
"merge", "merge method cannot be called on a Dataset view."
@@ -188,23 +226,31 @@ def __getitem__(
)
for x in item
]
return IndraDatasetView(
ret = IndraDatasetView(
indra_ds=self.indra_ds,
enabled_tensors=enabled_tensors,
)
if hasattr(self, "_tql_query"):
ret._tql_query = self._tql_query
return ret
elif isinstance(item, tuple) and len(item) and isinstance(item[0], str):
ret = self
for x in item:
ret = self[x]
return ret
else:
return IndraDatasetView(
ret = IndraDatasetView(
indra_ds=self.indra_ds[item],
)
if hasattr(self, "_tql_query"):
ret._tql_query = self._tql_query
return ret
else:
raise InvalidKeyTypeError(item)

raise AttributeError(f"Dataset has no attribute - {item}")


def __getattr__(self, key):
try:
ret = self.__getitem__(key)
8 changes: 5 additions & 3 deletions deeplake/core/io.py
@@ -293,9 +293,11 @@ def __init__(
self.tensors = tensors
self.pad_tensors = pad_tensors
self.decode_method = decode_method
jpeg_png_compressed_tensors, json_tensors, list_tensors = check_tensors(
self.dataset, tensors, verbose
)
(
jpeg_png_compressed_tensors,
json_tensors,
list_tensors,
) = check_tensors(self.dataset, tensors, verbose)
(
raw_tensors,
pil_compressed_tensors,
14 changes: 11 additions & 3 deletions deeplake/enterprise/convert_to_libdeeplake.py
@@ -8,6 +8,7 @@
from deeplake.core.storage.azure import AzureProvider
from deeplake.util.remove_cache import get_base_storage
from deeplake.util.exceptions import EmptyTokenException
from deeplake.core.dataset.indra_dataset_view import IndraDatasetView

from deeplake.util.dataset import try_flushing # type: ignore
import importlib
@@ -65,6 +66,7 @@ def _get_indra_ds_from_azure_provider(
storage = IndraProvider(
path,
read_only=provider.read_only,
origin_path=provider.root,
token=token,
account_name=account_name,
account_key=account_key,
@@ -169,7 +171,7 @@ def dataset_to_libdeeplake(hub2_dataset: Dataset):
token = (
hub2_dataset.client.get_token()
if (hub2_dataset.token is None or hub2_dataset._token == "")
and hub2_dataset.client
and hasattr(hub2_dataset, "client") and hub2_dataset.client
else hub2_dataset.token
)
if token is None or token == "":
@@ -248,5 +250,11 @@ def dataset_to_libdeeplake(hub2_dataset: Dataset):
if slice_ != slice(None):
if isinstance(slice_, tuple):
slice_ = list(slice_)
libdeeplake_dataset = libdeeplake_dataset[slice_]
return libdeeplake_dataset
from deeplake.core.index import Index
try:
idx = Index(libdeeplake_dataset.indexes)
except Exception:
idx = Index(slice(0, len(libdeeplake_dataset)))
if isinstance(slice_, slice) or (list(slice_) != list(idx.values[0].value)):
libdeeplake_dataset = libdeeplake_dataset[slice_]
return libdeeplake_dataset
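
The new tail of `dataset_to_libdeeplake` re-applies a slice only when it would actually change the selection: explicit index lists identical to the dataset's current index are skipped. A small sketch restating the guard condition under that reading (function name and test values are illustrative, not from the PR):

def should_apply(slice_, current_indices):
    # Plain slices are always applied; explicit index lists only when
    # they differ from the indices the dataset already carries.
    if isinstance(slice_, slice):
        return True
    return list(slice_) != list(current_indices)


assert should_apply(slice(0, 10), [0, 1, 2])
assert should_apply([0, 2], [0, 1, 2])
assert not should_apply([0, 1, 2], [0, 1, 2])  # redundant re-index avoided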
65 changes: 42 additions & 23 deletions deeplake/enterprise/dataloader.py
@@ -1,9 +1,10 @@
from typing import Callable, Dict, List, Optional, Union
import deeplake
from deeplake.enterprise.convert_to_libdeeplake import dataset_to_libdeeplake

from deeplake.enterprise.dummy_dataloader import DummyDataloader # type: ignore
from deeplake.util.scheduling import create_fetching_schedule, find_primary_tensor
from deeplake.core.seed import DeeplakeRandom
from deeplake.util.exceptions import EmptyTensorError, MacOSEnvironmentError
from deeplake.enterprise.util import (
handle_mode,
raise_indra_installation_error,
@@ -22,6 +23,8 @@
from deeplake.util.dataset import map_tensor_keys
from functools import partial
import importlib
import os
import sys

try:
from torch.utils.data.dataloader import DataLoader, _InfiniteConstantSampler
@@ -113,6 +116,7 @@ def __init__(
_ignore_errors=False,
_verbose=False,
_offset=None,
_pin_memory=False,
**kwargs,
):
import_indra_loader()
@@ -139,6 +143,7 @@
self._ignore_errors = _ignore_errors
self._verbose = _verbose
self._offset = _offset
self._pin_memory = _pin_memory
for k, v in kwargs.items():
setattr(self, k, v)

@@ -345,13 +350,16 @@ def shuffle(self, shuffle: bool = True, buffer_size: int = 2048):
all_vars = self.__dict__.copy()
all_vars["_shuffle"] = shuffle
all_vars["_buffer_size"] = buffer_size
if shuffle:
schedule = create_fetching_schedule(
self._orig_dataset, self._primary_tensor_name
)
if schedule is not None:
ds = self._orig_dataset # type: ignore
all_vars["_orig_dataset"] = ds[schedule]

# TODO check the view dataset shuffle
# if shuffle:
# schedule = create_fetching_schedule(
# self._orig_dataset, self._primary_tensor_name
# )
# if schedule is not None:
# ds = self._orig_dataset # type: ignore
# all_vars["_orig_dataset"] = ds[schedule]

all_vars["_dataloader"] = None
return self.__class__(**all_vars)
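
With the schedule-based reordering commented out, `shuffle` now only records the flag and buffer size for the underlying loader. Usage is unchanged; a sketch, assuming `ds` is an already loaded Deep Lake dataset:

# Buffer-based shuffling: a larger buffer approximates a full shuffle
# more closely, at the cost of memory.
loader = ds.dataloader().shuffle(True, buffer_size=4096).pytorch(num_workers=2)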

@@ -483,6 +491,7 @@ def pytorch(
return_index: bool = True,
decode_method: Optional[Dict[str, str]] = None,
persistent_workers: bool = False,
pin_memory: bool = False,
):
"""Creates a PyTorch Dataloader on top of the ``DeepLakeDataLoader`` from the Deep Lake dataset. During iteration, the data from all tensors will be streamed on-the-fly from the storage location.
Understanding the parameters below is critical for achieving fast streaming for your use case.
@@ -498,6 +507,7 @@
distributed (bool): Used for DDP training. Distributes different sections of the dataset to different ranks. Defaults to ``False``.
return_index (bool): Used to identify whether the loader needs to return the sample index or not. Defaults to ``True``.
persistent_workers (bool): If ``True``, the data loader will not shutdown the worker processes after a dataset has been consumed once. Defaults to ``False``.
pin_memory (bool): If ``True``, the data loader will copy Tensors into device/CUDA pinned memory before returning them. Defaults to ``False``.
decode_method (Dict[str, str], Optional): A dictionary of decode methods for each tensor. Defaults to ``None``.


@@ -554,6 +564,7 @@
all_vars["_mode"] = mode
all_vars["_persistent_workers"] = persistent_workers
all_vars["_dataloader"] = None
all_vars["_pin_memory"] = pin_memory
if distributed:
all_vars["_world_size"] = torch.distributed.get_world_size()
return self.__class__(**all_vars)
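
A usage sketch for the new `pin_memory` flag, assuming `ds` is an already loaded Deep Lake dataset; pinning host memory lets the subsequent host-to-GPU copy run asynchronously:

loader = (
    ds.dataloader()
    .batch(32)
    .pytorch(
        num_workers=4,
        pin_memory=True,  # added in this PR, forwarded via LoaderMetaInfo
    )
)
for batch in loader:
    pass  # batch tensors arrive pinned, ready for .to("cuda", non_blocking=True)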
@@ -740,8 +751,7 @@ def __create_dummy_dataloader(

def __get_indra_dataloader(
self,
dataset,
indra_dataset,
deeplake_dataset,
tensors: Optional[List[str]] = None,
raw_tensors: Optional[List[str]] = None,
pil_compressed_tensors: Optional[List[str]] = None,
@@ -774,25 +784,25 @@ def __get_indra_dataloader(
json_tensors=json_tensors or [],
list_tensors=list_tensors or [],
)

loader_meta = LoaderMetaInfo(
context=self.multiprocessing_context,
distributed=self._distributed,
mode=self._mode,
upcast=self._mode == "pytorch"
and self.__is_upcast_needed(
dataset, tensors
deeplake_dataset, tensors
), # upcast to handle unsupported dtypes,
return_index=self._return_index,
verbose=self._verbose,
ignore_errors=self._ignore_errors,
prefetch_factor=self._prefetch_factor,
offset=self._offset,
primary_tensor=self._primary_tensor_name,
worker_init_fn=self.worker_init_fn,
pin_memory=self.pin_memory,
)

return INDRA_LOADER( # type: ignore [misc]
indra_dataset,
deeplake_dataset=deeplake_dataset,
batch_size=self._batch_size,
num_threads=num_threads,
shuffle=self._shuffle,
@@ -833,9 +843,11 @@ def __iter__(self):
dataset = self._orig_dataset
tensors = self._tensors or map_tensor_keys(dataset, None)

jpeg_png_compressed_tensors, json_tensors, list_tensors = check_tensors(
dataset, tensors
)
(
jpeg_png_compressed_tensors,
json_tensors,
list_tensors,
) = check_tensors(dataset, tensors)
(
raw_tensors,
pil_compressed_tensors,
@@ -855,6 +867,7 @@
self._fill_sample_info_tensors(
dataset, sample_info_tensors, json_tensors, list_tensors
)

tensors.extend(sample_info_tensors)
htype_dict, ndim_dict, tensor_info_dict = get_htype_ndim_tensor_info_dicts(
dataset, data_tensors, tensor_info_tensors
@@ -867,14 +880,8 @@
pil_compressed_tensors=pil_compressed_tensors,
)
else:
if not hasattr(self, "_indra_dataset"):
indra_dataset = dataset_to_libdeeplake(dataset)
else:
indra_dataset = self._indra_dataset

self._dataloader = self.__get_indra_dataloader(
dataset,
indra_dataset,
tensors=tensors,
raw_tensors=raw_tensors,
pil_compressed_tensors=pil_compressed_tensors,
Expand All @@ -887,11 +894,23 @@ def __iter__(self):

dataset_read(self._orig_dataset)

self._check_environment()
if self._iterator is not None:
self._iterator = iter(self._dataloader)

return self

def _check_environment(self):
if sys.platform == "darwin":
import multiprocessing as mp

if mp.get_start_method() == "fork":
env_vars = os.environ
no_proxy = env_vars.get("NO_PROXY", "")
init_check = env_vars.get("OBJC_DISABLE_INITIALIZE_FORK_SAFETY", "")
if no_proxy != "*" or init_check != "YES":
raise MacOSEnvironmentError
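
`_check_environment` guards against a known macOS fork-safety crash when the multiprocessing start method is "fork". The variables can also be set in-process before the loader spawns workers, which is equivalent to the shell exports the new `MacOSEnvironmentError` message suggests:

import os

# Equivalent to `export ...` in the terminal, but scoped to this process.
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
os.environ["NO_PROXY"] = "*"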

def __setattr__(self, attr, val):
if (
attr == "_iterator"
8 changes: 7 additions & 1 deletion deeplake/enterprise/libdeeplake_query.py
@@ -49,7 +49,13 @@ def query(dataset, query_string: str):
dsv = ds.query(query_string)
from deeplake.enterprise.convert_to_libdeeplake import INDRA_API

if not isinstance(dataset, IndraDatasetView) and INDRA_API.tql.parse(query_string).is_filter and len(dsv.indexes) < INDRA_DATASET_SAMPLES_THRESHOLD: # type: ignore
try:
    below_threshold = len(dsv.indexes) < INDRA_DATASET_SAMPLES_THRESHOLD
except Exception:
    below_threshold = False

if not isinstance(dataset, IndraDatasetView) and INDRA_API.tql.parse(query_string).is_filter and below_threshold:  # type: ignore
indexes = list(dsv.indexes)
return dataset.no_view_dataset[indexes]
else:
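
The threshold logic means a pure filter query whose result is small (fewer than `INDRA_DATASET_SAMPLES_THRESHOLD` samples) is materialized as plain indexes into the original dataset, while anything else stays a lazy view. A usage sketch, assuming `ds` is a loaded dataset with a `labels` tensor:

view = ds.query("SELECT * WHERE labels == 0")  # small filter result: index-backed view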
4 changes: 1 addition & 3 deletions deeplake/enterprise/test_pytorch.py
@@ -827,8 +827,6 @@ def test_batch_sampler_attribute(local_auth_ds):
@pytest.mark.slow
@pytest.mark.flaky
def test_pil_decode_method(local_auth_ds):
from indra.pytorch.exceptions import CollateExceptionWrapper # type: ignore

with local_auth_ds as ds:
ds.create_tensor("x", htype="image", sample_compression="jpeg")
ds.x.extend(np.random.randint(0, 255, (10, 10, 10, 3), np.uint8))
@@ -840,7 +838,7 @@ def test_pil_decode_method(local_auth_ds):
assert batch["x"].shape == (1, 10, 10, 3)

ptds = ds.dataloader().pytorch(decode_method={"x": "pil"})
with pytest.raises(CollateExceptionWrapper):
with pytest.raises(AttributeError):
for _ in ptds:
continue

8 changes: 7 additions & 1 deletion deeplake/integrations/pytorch/common.py
@@ -175,7 +175,13 @@ def validate_decode_method(
pil_compressed_tensors.append(tensor_name)
elif decode_method == "data":
data_tensors.append(tensor_name)
return raw_tensors, pil_compressed_tensors, json_tensors, list_tensors, data_tensors
return (
raw_tensors,
pil_compressed_tensors,
json_tensors,
list_tensors,
data_tensors,
)


def find_additional_tensors_and_info(dataset, data_tensors):
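
`validate_decode_method` buckets tensors by the requested decode style, including the "data" option this PR handles. A hedged usage sketch with hypothetical tensor names, assuming `ds` is a loaded dataset:

loader = ds.dataloader().pytorch(
    decode_method={
        "images": "pil",       # decode to PIL.Image objects
        "meta": "data",        # return the full data dictionary
        "payload": "tobytes",  # raw bytes, no decoding
    }
)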
11 changes: 11 additions & 0 deletions deeplake/util/exceptions.py
@@ -847,6 +847,17 @@ def __init__(self, org_id, creds_key):
)


class MacOSEnvironmentError(Exception):
def __init__(self):
message = (
"When using the multiprocessing mode 'fork' on MacOS, "
"you need to execute the following commands in the terminal:\n"
"export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES\n"
"export NO_PROXY=*"
)
super().__init__(message)


class UnableToReadFromUrlError(Exception):
def __init__(self, url, status_code):
super().__init__(f"Unable to read from url {url}. Status code: {status_code}")