ray client is not supported with ray data -> /tutorials/beginner/execution_engine.html only works with local cluster #218

Open
andnig opened this issue Dec 23, 2023 · 3 comments

Comments


andnig commented Dec 23, 2023

When running the "ray" example as provided in /tutorials/beginner/execution_engine.html, the example fails if a remote cluster is used.

This ray issue seems related: ray-project/ray#41333

Repro steps:

  1. Set up a Ray cluster that is not on localhost.
  2. Run the following:

import os
import pandas as pd
import ray
from fugue import transform

os.environ["RAY_ADDRESS"] = "ray://<ray-cluster>:10001"

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})

# schema: *, col3:int
def add_cols(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(col3=df["col1"] + df["col2"])

ray.init(ignore_reinit_error=True)
ray_df = transform(df, add_cols, engine="ray")
ray_df.show(5)

It would be nice if this were documented somewhere (or is there a fix for it?).

Error:

_0 _State.RUNNING -> _State.FAILED  Global node is not initialized.
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Cell In[4], line 3
      1 ray.init(ignore_reinit_error=True)
----> 3 ray_df = transform(df, add_cols, engine="ray")
      4 ray_df.show(5)

File /opt/conda/lib/python3.11/site-packages/fugue/workflow/api.py:174, in transform(df, using, schema, params, partition, callback, ignore_errors, persist, as_local, save_path, checkpoint, engine, engine_conf, as_fugue)
    171     else:
    172         tdf.save(save_path, fmt="parquet")
--> 174 dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
    175 if checkpoint:
    176     result = dag.yields["result"].result  # type:ignore

File /opt/conda/lib/python3.11/site-packages/fugue/workflow/workflow.py:1604, in FugueWorkflow.run(self, engine, conf, **kwargs)
   1602             if ctb is None:  # pragma: no cover
   1603                 raise
-> 1604             raise ex.with_traceback(ctb)
   1605         self._computed = True
   1606 return FugueWorkflowResult(self.yields)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:240, in RayExecutionEngine.to_df(self, df, schema)
    239 def to_df(self, df: Any, schema: Any = None) -> DataFrame:
--> 240     return self._to_ray_df(df, schema=schema)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:329, in RayExecutionEngine._to_ray_df(self, df, schema)
    327 def _to_ray_df(self, df: Any, schema: Any = None) -> RayDataFrame:
    328     # TODO: remove this in phase 2
--> 329     res = self._to_auto_df(df, schema)
    330     if not isinstance(res, RayDataFrame):
    331         return RayDataFrame(res)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/execution_engine.py:342, in RayExecutionEngine._to_auto_df(self, df, schema)
    337     assert_or_throw(
    338         schema is None,
    339         ValueError("schema must be None when df is a DataFrame"),
    340     )
    341     return df
--> 342 return RayDataFrame(df, schema)

File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:105, in RayDataFrame.__init__(self, df, schema, internal_schema)
    103 else:
    104     raise ValueError(f"{df} is incompatible with RayDataFrame")
--> 105 rdf, schema = self._apply_schema(rdf, schema, internal_schema)
    106 super().__init__(schema)
    107 self._native = rdf

File /opt/conda/lib/python3.11/site-packages/fugue_ray/dataframe.py:238, in RayDataFrame._apply_schema(self, rdf, schema, internal_schema)
    236 if internal_schema:
    237     return rdf, schema
--> 238 fmt, rdf = get_dataset_format(rdf)
    239 if fmt is None:  # empty
    240     schema = _input_schema(schema).assert_not_empty()

File /opt/conda/lib/python3.11/site-packages/fugue_ray/_utils/dataframe.py:32, in get_dataset_format(df)
     30 def get_dataset_format(df: rd.Dataset) -> Tuple[Optional[str], rd.Dataset]:
     31     df = materialize(df)
---> 32     if df.count() == 0:
     33         return None, df
     34     if ray.__version__ < "2.5.0":  # pragma: no cover

File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:2598, in Dataset.count(self)
   2595     return 0
   2597 # For parquet, we can return the count directly from metadata.
-> 2598 meta_count = self._meta_count()
   2599 if meta_count is not None:
   2600     return meta_count

File /opt/conda/lib/python3.11/site-packages/ray/data/dataset.py:5108, in Dataset._meta_count(self)
   5107 def _meta_count(self) -> Optional[int]:
-> 5108     return self._plan.meta_count()

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:496, in ExecutionPlan.meta_count(self)
    491     return None
    492 elif self._in_blocks is not None and self._snapshot_blocks is None:
    493     # If the plan only has input blocks, we execute it, so snapshot has output.
    494     # This applies to newly created dataset. For example, initial dataset
    495     # from read, and output datasets of Dataset.split().
--> 496     self.execute()
    497 # Snapshot is now guaranteed to be the final block or None.
    498 return self._get_num_rows_from_blocks_metadata(self._snapshot_blocks)

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/plan.py:628, in ExecutionPlan.execute(self, allow_clear_input_blocks, force_read, preserve_order)
    621 metrics_tag = create_dataset_tag(
    622     self._dataset_name, self._dataset_uuid
    623 )
    624 executor = StreamingExecutor(
    625     copy.deepcopy(context.execution_options),
    626     metrics_tag,
    627 )
--> 628 blocks = execute_to_legacy_block_list(
    629     executor,
    630     self,
    631     allow_clear_input_blocks=allow_clear_input_blocks,
    632     dataset_uuid=self._dataset_uuid,
    633     preserve_order=preserve_order,
    634 )
    635 stats = executor.get_stats()
    636 stats_summary_string = stats.to_summary().to_string(
    637     include_parent=False
    638 )

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py:125, in execute_to_legacy_block_list(executor, plan, allow_clear_input_blocks, dataset_uuid, preserve_order)
    107 """Execute a plan with the new executor and translate it into a legacy block list.
    108 
    109 Args:
   (...)
    117     The output as a legacy block list.
    118 """
    119 dag, stats = _get_execution_dag(
    120     executor,
    121     plan,
    122     allow_clear_input_blocks,
    123     preserve_order,
    124 )
--> 125 bundles = executor.execute(dag, initial_stats=stats)
    126 block_list = _bundles_to_block_list(bundles)
    127 # Set the stats UUID after execution finishes.

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/execution/streaming_executor.py:132, in StreamingExecutor.execute(self, dag, initial_stats)
    129     self._global_info = ProgressBar("Running", dag.num_outputs_total())
    131 self._output_node: OpState = self._topology[dag]
--> 132 StatsManager.register_dataset_to_stats_actor(
    133     self._dataset_tag,
    134     self._get_operator_tags(),
    135 )
    136 self.start()
    137 self._execution_started = True

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:531, in _StatsManager.register_dataset_to_stats_actor(self, dataset_tag, operator_tags)
    530 def register_dataset_to_stats_actor(self, dataset_tag, operator_tags):
--> 531     self._stats_actor().register_dataset.remote(dataset_tag, operator_tags)

File /opt/conda/lib/python3.11/site-packages/ray/data/_internal/stats.py:414, in _StatsManager._stats_actor(self, create_if_not_exists)
    412 def _stats_actor(self, create_if_not_exists=True) -> _StatsActor:
    413     if ray._private.worker._global_node is None:
--> 414         raise RuntimeError("Global node is not initialized.")
    415     current_cluster_id = ray._private.worker._global_node.cluster_id
    416     if (
    417         self._stats_actor_handle is None
    418         or self._stats_actor_cluster_id != current_cluster_id
    419     ):

RuntimeError: Global node is not initialized.

kvnkho (Collaborator) commented Jan 2, 2024

Hi @andnig ,

Sorry for the late response. Just got back from holidays. We test this on Anyscale and it is supposed to work on a remote cluster. Can you give me your versions of Ray and Fugue?


andnig commented Jan 7, 2024

Hey, no problem. I worked around the issue, so it was ultimately not a blocker for me.

fugue: 0.8.6
ray: 2.9.0

I think the issue might be that Ray Data does not support Ray Client. Ray Data seems to be used in the DataFrame example. (See here for more details: ray-project/ray#41333 (comment))
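
For context, a minimal sketch of why this would fail (an assumption based on the traceback, using the Ray 2.9 behavior reported above): a "ray://" address puts ray.init into Ray Client mode, so the driver process never starts a local Ray node, and Ray Data's stats code then trips the exact check shown at the bottom of the traceback.

import ray

# A "ray://" address makes ray.init connect via Ray Client: the driver
# only holds a client connection and does not start a local Ray node.
ray.init("ray://<ray-cluster>:10001")

print(ray.is_initialized())  # True: the client is connected

# There is no local node on the client side, which is what
# _StatsManager._stats_actor checks before raising
# "Global node is not initialized."
print(ray._private.worker._global_node is None)  # True under Ray Client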


HarryCaveMan commented Feb 7, 2024

As noted, this is separate from #41333 because that is only a workaround to address the fact that Ray Data does not support using an already connected client without wrapping your actor in a task. This issue is asking for Ray Client support, not for a workaround in its absence. Under the hood, Ray Data just uses cached RemoteFunction objects (i.e., what the @ray.remote decorator creates). The only real difference I see is that they invoke .remote() directly instead of using decorators. Is there no way to configure Ray Data to connect to an existing cluster's head node without scheduling it as a task through the client (essentially wrapping it in another RemoteFunction)?
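
For reference, a minimal sketch of the kind of workaround discussed above (not Ray Client support): wrap the Fugue transform in a @ray.remote task so Ray Data runs on a cluster worker, where a global node is initialized, rather than in the client process. This assumes fugue[ray] is installed on the cluster workers; run_transform is a hypothetical name.

import pandas as pd
import ray

ray.init("ray://<ray-cluster>:10001")  # Ray Client connection, as in the repro

@ray.remote
def run_transform(pdf: pd.DataFrame):
    # Runs on a cluster worker, where the global node is initialized,
    # so the Fugue Ray engine (and Ray Data underneath) can execute.
    from fugue import transform

    def add_cols(df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(col3=df["col1"] + df["col2"])

    # Pass the schema explicitly (instead of the "# schema:" comment
    # hint) since the function source may not survive serialization;
    # as_local=True materializes the result so it can be returned.
    return transform(pdf, add_cols, schema="*,col3:int",
                     engine="ray", as_local=True)

df = pd.DataFrame({"col1": [1, 2, 3, 4], "col2": [1, 2, 3, 4]})
print(ray.get(run_transform.remote(df)))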
