
Failing to deserialize a Dask Dataframe on a distributed cluster #8038

Closed
jacobgreen405 opened this issue Jul 26, 2023 · 3 comments
jacobgreen405 commented Jul 26, 2023

Background

I'm trying to load a CSV file on a SLURM cluster and convert it to Parquet, but every time I perform the conversion on a cluster I get errors on the worker side. This happens both in a Jupyter notebook and in a normal Python script. Miniconda3 and all of the code are stored in a directory shared between the controller and worker nodes of the cluster.

EDIT:
I've tried reading the CSV with LocalCluster() as well, and I still get the exact same error.

Example Code

EDIT: Here's a more reproducible example than my previous one. Running it produces the same error I originally got when reading data from cloud storage.

import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client, LocalCluster

# Instantiate Cluster & Client
cluster = LocalCluster()
client = Client(cluster)

# Put your inputs here
path = '/path/to/data.csv' # Path to write local data
size = 500 # Dimension to make square dataframe

# Write Random Array
array = da.random.random((size, size))
df = dd.from_array(array)
df.to_csv(path, header=None, index=False)

# Read CSV file
df = dd.read_csv(f'{path}/*')
df.compute() # <--------- Error occurs here

The following error report is generated when df.compute() runs:

2023-07-27 16:26:33,813 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 128, in unpackb
    ret = unpacker._unpack()
          ^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    ret.append(self._unpack(EX_CONSTRUCT))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 585, in _unpack
    key = self._unpack(EX_CONSTRUCT)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 602, in _unpack
    obj = obj.decode("utf_8", self._unicode_errors)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd8 in position 2: invalid continuation byte
2023-07-27 16:26:33,815 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/core.py", line 930, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/scheduler.py", line 5478, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/core.py", line 983, in handle_stream
    msgs = await comm.read()
           ^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/comm/tcp.py", line 254, in read
    msg = await from_frames(
          ^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/comm/utils.py", line 100, in from_frames
    res = _from_frames()
          ^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/comm/utils.py", line 83, in _from_frames
    return protocol.loads(
           ^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 128, in unpackb
    ret = unpacker._unpack()
          ^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    ret.append(self._unpack(EX_CONSTRUCT))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 585, in _unpack
    key = self._unpack(EX_CONSTRUCT)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 602, in _unpack
    obj = obj.decode("utf_8", self._unicode_errors)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd8 in position 2: invalid continuation byte
Task exception was never retrieved
future: <Task finished name='Task-3116' coro=<Server._handle_comm() done, defined at /home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/core.py:836> exception=UnicodeDecodeError('utf-8', b'\x1c^\xd8gg\xde?\xc9\x86\xc1\xee\t\xb0\xd0?>\xcc\x99L\r\xf4\xd5?\xe0Q\xc4', 2, 3, 'invalid continuation byte')>
Traceback (most recent call last):
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/core.py", line 930, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/scheduler.py", line 5478, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/core.py", line 983, in handle_stream
    msgs = await comm.read()
           ^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/comm/tcp.py", line 254, in read
    msg = await from_frames(
          ^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/comm/utils.py", line 100, in from_frames
    res = _from_frames()
          ^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/comm/utils.py", line 83, in _from_frames
    return protocol.loads(
           ^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/protocol/core.py", line 158, in loads
    return msgpack.loads(
           ^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 128, in unpackb
    ret = unpacker._unpack()
          ^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 565, in _unpack
    ret.append(self._unpack(EX_CONSTRUCT))
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 585, in _unpack
    key = self._unpack(EX_CONSTRUCT)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/jgreen/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/msgpack/fallback.py", line 602, in _unpack
    obj = obj.decode("utf_8", self._unicode_errors)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd8 in position 2: invalid continuation byte
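The UnicodeDecodeError comes from msgpack's pure-Python fallback handing raw binary frame data to str.decode. A minimal stdlib sketch of the same failure, using the first bytes of the payload shown in the "Task exception was never retrieved" line above:

```python
# Bytes copied from the exception payload in the traceback above.
frame = b'\x1c^\xd8gg\xde?\xc9\x86\xc1\xee\t\xb0\xd0?'

try:
    frame.decode("utf-8")
except UnicodeDecodeError as exc:
    # 0xd8 opens a two-byte UTF-8 sequence, but the following byte
    # (0x67, 'g') is not a valid continuation byte.
    print(exc)  # 'utf-8' codec can't decode byte 0xd8 in position 2: invalid continuation byte
```

In other words, binary frames that should have been left as bytes are being decoded as UTF-8 strings, which points at a protocol/version mismatch rather than at the CSV contents.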
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
Cell In[7], line 2
      1 df = dd.read_csv('/home/jgreen/tmpfile.csv/*')
----> 2 df.compute()

File ~/.miniconda3/envs/cloud-data/lib/python3.11/site-packages/distributed/client.py:2248, in Client._gather(self, futures, errors, direct, local_worker)
   2246     else:
   2247         raise exception.with_traceback(traceback)
-> 2248     raise exc
   2249 if errors == "skip":
   2250     bad_keys.add(key)

CancelledError: ('read-csv-7d20f2ab429c39661d2f17a81e993fc0', 0)

Additional Info

I have checked the version of Dask across my cluster with client.get_versions(check=True) (per the suggestion from @mrocklin in #2124), which outputs the following:

{'scheduler': {'host': {'python': '3.11.4.final.0',
   'python-bits': 64,
   'OS': 'Linux',
   'OS-release': '3.10.0-1160.88.1.el7.x86_64',
   'machine': 'x86_64',
   'processor': 'x86_64',
   'byteorder': 'little',
   'LC_ALL': 'None',
   'LANG': 'en_US.UTF-8'},
  'packages': {'python': '3.11.4.final.0',
   'dask': '2023.7.1',
   'distributed': '2023.7.1',
   'msgpack': '1.0.3',
   'cloudpickle': '2.2.1',
   'tornado': '6.2',
   'toolz': '0.12.0',
   'numpy': '1.25.0',
   'pandas': '1.5.3',
   'lz4': '4.3.2'}},
 'workers': {'tcp://127.0.0.1:33597': {'host': {'python': '3.11.4.final.0',
    'python-bits': 64,
    'OS': 'Linux',
    'OS-release': '3.10.0-1160.88.1.el7.x86_64',
    'machine': 'x86_64',
    'processor': 'x86_64',
    'byteorder': 'little',
    'LC_ALL': 'None',
    'LANG': 'en_US.UTF-8'},
   'packages': {'python': '3.11.4.final.0',
    'dask': '2023.7.1',
    'distributed': '2023.7.1',
    'msgpack': '1.0.3',
    'cloudpickle': '2.2.1',
    'tornado': '6.2',
    'toolz': '0.12.0',
    'numpy': '1.25.0',
    'pandas': '1.5.3',
    'lz4': '4.3.2'}},
  'tcp://127.0.0.1:44117': {'host': {'python': '3.11.4.final.0',
    'python-bits': 64,
    'OS': 'Linux',
    'OS-release': '3.10.0-1160.88.1.el7.x86_64',
    'machine': 'x86_64',
    'processor': 'x86_64',
    'byteorder': 'little',
    'LC_ALL': 'None',
    'LANG': 'en_US.UTF-8'},
   'packages': {'python': '3.11.4.final.0',
    'dask': '2023.7.1',
    'distributed': '2023.7.1',
    'msgpack': '1.0.3',
    'cloudpickle': '2.2.1',
    'tornado': '6.2',
    'toolz': '0.12.0',
    'numpy': '1.25.0',
    'pandas': '1.5.3',
    'lz4': '4.3.2'}}},
 'client': {'host': {'python': '3.11.4.final.0',
   'python-bits': 64,
   'OS': 'Linux',
   'OS-release': '3.10.0-1160.88.1.el7.x86_64',
   'machine': 'x86_64',
   'processor': 'x86_64',
   'byteorder': 'little',
   'LC_ALL': 'None',
   'LANG': 'en_US.UTF-8'},
  'packages': {'python': '3.11.4.final.0',
   'dask': '2023.7.1',
   'distributed': '2023.7.1',
   'msgpack': '1.0.3',
   'cloudpickle': '2.2.1',
   'tornado': '6.2',
   'toolz': '0.12.0',
   'numpy': '1.25.0',
   'pandas': '1.5.3',
   'lz4': '4.3.2'}}}

I have also confirmed that df.compute() works fine without dask.distributed (although I run out of memory, no deserialization errors are thrown).

Environment

  • Dask version: 2023.7.1
  • Python version: 3.11.4
  • Operating System: Linux
  • Install method: conda
@jacobgreen405 jacobgreen405 changed the title Failing to deserialize a Dask Dataframe on a distributed cluster (SLURMCluster using dask-jobqueue) Failing to deserialize a Dask Dataframe on a distributed cluster Jul 26, 2023
@jacobgreen405
Author

The issue was that conda installed an old version of msgpack-python (1.0.3) by default. I don't think this is isolated to my case, because I ran into the same issue on both my local machine and a Google VM.

Fixed by manually installing msgpack:

conda install -c conda-forge msgpack-python==1.0.5
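After reinstalling, the fix can be confirmed from the client side with the same get_versions(check=True) call used above, which flags any package mismatch between client, scheduler, and workers. A sketch using a small in-process cluster for illustration:

```python
from dask.distributed import Client, LocalCluster

# Small in-process cluster purely for illustration.
cluster = LocalCluster(n_workers=1, processes=False)
client = Client(cluster)

# check=True compares package versions across client, scheduler,
# and workers, and complains if they disagree.
versions = client.get_versions(check=True)
print(versions["client"]["packages"]["msgpack"])

client.close()
cluster.close()
```

Running this on the SLURM cluster (rather than a LocalCluster) would also catch the case where worker nodes resolve a different environment than the controller.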

@vivadiva81

It's June 2024, and I am unable to install/update msgpack-python to its current version (1.0.8). I am using the command
conda install -c conda-forge msgpack-python==1.0.8
and I also couldn't get conda install -c conda-forge msgpack-python==1.0.5 to work. It just hangs for hours.

@fjetter
Member

fjetter commented Jun 24, 2024

We are testing against msgpack-python==1.0.0 and the most recent version. If anybody can provide us with a reproducer, that would be helpful, and we would also adjust our pinning accordingly to avoid this problem.

I tested against the CSV reproducer above but could not reproduce the issue.
