fix: add cuda backend support for to_raggedtensor and from_raggedtensor functions #3263

Open · wants to merge 11 commits into main
8 changes: 8 additions & 0 deletions docs/reference/toctree.txt
@@ -39,6 +39,14 @@
     generated/ak.to_feather
     generated/ak.from_avro_file

+.. toctree::
+    :caption: Conversions for machine learning
+
+    generated/ak.from_raggedtensor
+    generated/ak.to_raggedtensor
+    generated/ak.from_torch
+    generated/ak.to_torch
+
 .. toctree::
     :caption: Converting to Pandas DataFrames

34 changes: 31 additions & 3 deletions src/awkward/operations/ak_from_raggedtensor.py
@@ -30,18 +30,25 @@ def from_raggedtensor(array):
 def _impl(array):
     try:
         # get the flat values
-        content = array.flat_values.numpy()
+        content = array.flat_values
     except AttributeError as err:
         raise TypeError(
             """only RaggedTensor can be converted to awkward array"""
         ) from err
-    # convert them to ak.contents right away
+
+    # handle gpu and cpu instances separately
+    device = content.backing_device
+
+    content = _tensor_to_np_or_cp(content, device)
+
+    # convert flat_values to ak.contents right away
     content = ak.contents.NumpyArray(content)

     # get the offsets
     offsets_arr = []
     for splits in array.nested_row_splits:
-        split = splits.numpy()
+        # handle gpu and cpu instances separately
+        split = _tensor_to_np_or_cp(splits, device)
         # convert to ak.index
         offset = ak.index.Index64(split)
         offsets_arr.append(offset)
@@ -55,6 +62,27 @@ def _impl(array):
     return ak.Array(_recursive_call(content, offsets_arr, 0))


+def _tensor_to_np_or_cp(array, device):
+    if device.endswith("GPU", 0, -2):
Member commented:

I had to check the documentation on str.endswith, but it seems that this is equivalent to

    if device[:-2].endswith("GPU"):

(though I think the latter is easier to understand because slicing is more well-known than the extra arguments of str.endswith).

However, are you assuming that the GPU number is one digit? That is, will the above code break for a computer with 10 GPUs?

If the format for the 15th GPU is "GPU-14" (zero-indexed), then maybe you want

Suggested change:
-    if device.endswith("GPU", 0, -2):
+    if device.split("-")[0] == "GPU":

(and if lowercase is possible, you can also add a .upper() in the chain).

But before you accept the suggestion above, is it really a hyphen? If there's only one GPU, would there be no hyphen? (Note that device.split("-")[0] is equal to device if there is no hyphen, so the same code may be fine.)
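
For reference, a quick check of the two spellings on device strings in TensorFlow's usual format (an illustrative aside, not part of the thread):

single = "/job:localhost/replica:0/task:0/device:GPU:0"
multi = "/job:localhost/replica:0/task:0/device:GPU:14"

# both forms strip the last two characters and test the remainder
print(single.endswith("GPU", 0, -2))  # True
print(single[:-2].endswith("GPU"))    # True

# both silently fail for a two-digit device index, because removing the last
# two characters of "...GPU:14" leaves "...GPU:", which does not end with "GPU"
print(multi.endswith("GPU", 0, -2))   # False
print(multi[:-2].endswith("GPU"))     # False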

@maxymnaumchyk (Collaborator, Author) commented on Oct 21, 2024:

Thanks for catching that, I hadn't thought about that case! If there's only one GPU, then the device looks like this:
/job:localhost/replica:0/task:0/device:GPU:0
So, I think if device.split(":")[-2].upper() == "GPU": will work for all cases.
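
For reference, checking that expression against the device strings discussed in this thread (an illustrative aside):

for device in [
    "/job:localhost/replica:0/task:0/device:CPU:0",
    "/job:localhost/replica:0/task:0/device:GPU:0",
    "/job:localhost/replica:0/task:0/device:GPU:14",
]:
    # the second-to-last colon-separated field is the device type
    print(device.split(":")[-2].upper() == "GPU")  # False, True, True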

Member commented:

It will, but it relies on TensorFlow never changing the text to end with "GPU" rather than "GPU:0". All of this is about trying to write something defensively, so that neither our incomplete knowledge of the upstream library (TensorFlow) nor possible changes in that upstream library would cause our code to break. By "break," I mean "do the wrong thing without an error message." Failing with an error message if TensorFlow changes would be fine.

Given that what we expect from TensorFlow is a string like

/job:localhost/replica:0/task:0/device:CPU:0

or

/job:localhost/replica:0/task:0/device:GPU:0

or

/job:localhost/replica:0/task:0/device:GPU:14

this would be a safe way to catch it:

import re

m = re.match(".*:(CPU|GPU):[0-9]+", device)
if m is None:
    raise NotImplementedError(f"TensorFlow device has an unexpected format: {device!r}")
if m.groups()[0] == "GPU":
    ...

It also expresses to the future maintainer (or code reviewer) what you know about what TensorFlow gives you. (The import needs to be in the import section.)
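
For reference, applying the defensive check above to the example device strings (an illustrative aside, not part of the patch):

import re

for device in [
    "/job:localhost/replica:0/task:0/device:CPU:0",
    "/job:localhost/replica:0/task:0/device:GPU:14",
    "TPU:0",  # a hypothetical unexpected format
]:
    m = re.match(".*:(CPU|GPU):[0-9]+", device)
    if m is None:
        # the real code would raise NotImplementedError here
        print(f"unexpected format: {device!r}")
    else:
        print(device, "->", m.groups()[0])  # prints "CPU" or "GPU"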

+        try:
+            import tensorflow as tf
+        except ImportError as err:
+            raise ImportError(
+                """to use ak.from_raggedtensor, you must install the 'tensorflow' package with:
+
+pip install tensorflow
+or
+conda install tensorflow"""
+            ) from err
+
+        from awkward._nplikes.cupy import Cupy
+
+        cp = Cupy.instance()
+        return cp.from_dlpack(tf.experimental.dlpack.to_dlpack(array))
+    else:
+        return array.numpy()
+
+
 def _recursive_call(content, offsets_arr, count):
     if count == len(offsets_arr) - 2:
         return ak.contents.ListOffsetArray(
56 changes: 49 additions & 7 deletions src/awkward/operations/ak_to_raggedtensor.py
@@ -4,9 +4,12 @@

 import awkward as ak
 from awkward._dispatch import high_level_function
+from awkward._nplikes.numpy_like import NumpyMetadata

 __all__ = ("to_raggedtensor",)

+np = NumpyMetadata.instance()
+

 @high_level_function()
 def to_raggedtensor(array):
@@ -45,14 +48,49 @@ def _impl(array):
     # also transforms a python list to awkward array
     array = ak.to_layout(array, allow_record=False)

-    if isinstance(array, ak.contents.numpyarray.NumpyArray):
-        return tf.RaggedTensor.from_row_splits(
-            values=array.data, row_splits=[0, array.__len__()]
-        )
-    else:
-        flat_values, nested_row_splits = _recursive_call(array, ())
-
-        return tf.RaggedTensor.from_nested_row_splits(flat_values, nested_row_splits)
+    # keep the same device
+    ak_device = ak.backend(array)
+    if ak_device not in ["cuda", "cpu"]:
+        raise ValueError("""Only 'cpu' and 'cuda' backend conversions are allowed""")
+
+    if ak_device == "cpu":
+        device = "CPU:0"
+    else:
+        id = _find_innermost_content(array).data.device.id
+        device = "GPU:" + str(id)
+
+    with tf.device(device):
+        if isinstance(array, ak.contents.numpyarray.NumpyArray):
+            values = array.data
+            # handle cupy separately
+            values = _convert_to_tensor_if_cupy(values)
+            return tf.RaggedTensor.from_row_splits(
+                values=values, row_splits=[0, array.__len__()]
+            )
+
+        else:
+            flat_values, nested_row_splits = _recursive_call(array, ())
+            return tf.RaggedTensor.from_nested_row_splits(
+                flat_values, nested_row_splits
+            )
+
+
+def _find_innermost_content(array):
+    if isinstance(array, ak.contents.numpyarray.NumpyArray):
+        return array
+    else:
+        return _find_innermost_content(array.content)
+
+
+def _convert_to_tensor_if_cupy(array):
+    if isinstance(array, np.ndarray):
+        return array
+    else:
+        # converts cupy directly to tensor,
+        # since `tf.RaggedTensor.from_nested_row_splits` can not work with Cupy arrays
+        import tensorflow as tf
+
+        return tf.experimental.dlpack.from_dlpack(array.toDlpack())


 def _recursive_call(layout, offsets_arr):
@@ -75,10 +113,14 @@ def _recursive_call(layout, offsets_arr):
             )

         # recursively gather all of the offsets of an array
-        offsets_arr += (layout.offsets.data,)
+        offset = layout.offsets.data
+        offset = _convert_to_tensor_if_cupy(offset)
+        offsets_arr += (offset,)

     except AttributeError:
         # at the last iteration form a ragged tensor from the
         # accumulated offsets and flattened values of the array
-        return layout.data, offsets_arr
+        data = layout.data
+        data = _convert_to_tensor_if_cupy(data)
+        return data, offsets_arr
     return _recursive_call(layout.content, offsets_arr)
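
For context, a minimal round trip through the two functions this PR touches: the CPU path works on any TensorFlow install, while the cuda path is what this PR adds and needs a CUDA GPU, cupy, and a GPU-enabled TensorFlow build. This is an illustrative sketch, not part of the patch:

import awkward as ak
import tensorflow as tf

# CPU round trip
rt = tf.ragged.constant([[1.1, 2.2], [3.3], []])
arr = ak.from_raggedtensor(rt)                   # awkward Array: [[1.1, 2.2], [3.3], []]
print(ak.to_raggedtensor(arr))                   # back to a tf.RaggedTensor

# cuda round trip (the case this PR adds); requires a CUDA GPU and cupy
arr_gpu = ak.to_backend(arr, "cuda")
rt_gpu = ak.to_raggedtensor(arr_gpu)             # RaggedTensor placed with tf.device("GPU:<id>")
print(rt_gpu.flat_values.device)                 # e.g. /job:localhost/.../device:GPU:0
print(ak.backend(ak.from_raggedtensor(rt_gpu)))  # "cuda"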