SubmitRayJob enhancements #50

Merged (18 commits) on Aug 29, 2024
CHANGELOG.rst (38 additions, 0 deletions)
@@ -1,6 +1,44 @@
CHANGELOG
=========

0.2.0 (2024-08-29)
------------------

by @venkatajagannath in #50

**Breaking changes**

- We removed the ``use_gpu`` input parameter from the ``SetupRayCluster`` and ``DeleteRayCluster`` operators. GPU drivers are installed automatically if GPU nodes are available
- Spelling correction in the ``SubmitRayJob`` operator: changed ``self.terminal_state`` to ``self.terminal_states``

**Enhancements to the SubmitRayJob operator**

Based on customer feedback, we learned that it would be a much easier UX to spin the cluster up and down in the background of a task. The user simply decorates their Python function with ``@ray.task`` and the decorator orchestrates the rest.

To enable this feature, we had to make changes to the code for the ``SetupRayCluster`` and ``DeleteRayCluster`` operators; sharing that code with the decorator helps us avoid duplication.

The following new input parameters were added to enable this change: ``ray_cluster_yaml``, ``kuberay_version``, ``update_if_exists``, and ``gpu_device_plugin_yaml``.
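
For illustration, a decorator config exercising the new parameters might look like the sketch below. All values are placeholders rather than defaults, and the connection ID is hypothetical:

.. code-block:: python

    from ray_provider.decorators.ray import ray


    @ray.task(
        config={
            "conn_id": "ray_k8s_conn",  # hypothetical Airflow connection ID
            "ray_cluster_yaml": "scripts/ray.yaml",  # spec of the cluster to spin up/down
            "kuberay_version": "1.0.0",  # placeholder KubeRay release
            "update_if_exists": True,  # reuse and update a cluster that already exists
            "gpu_device_plugin_yaml": "nvidia-device-plugin.yml",  # placeholder manifest
        }
    )
    def my_ray_job():
        import ray

        ray.init()
        print(ray.cluster_resources())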

**Added more example DAGs**

Earlier we had only 2 example DAGs; we now have 4, and we execute a different DAG for the integration test.

**Making the decorator more robust**

We made several changes to the decorator source code to make it more robust.

**Unit tests updated**

Added unit tests where necessary, removed obsolete ones, and updated others as required.

**Documentation improvements**

- Significant changes to the code samples section of the GitHub page to make it easier to navigate
- Added two additional code samples along with explanations
- Added a Getting Involved section to both the README and index.rst, with box formatting
- Some other minor changes


0.1.0 (2024-08-09)
------------------

README.md (30 additions, 129 deletions)
@@ -21,6 +21,7 @@ Benefits of using this provider include:
## Table of Contents
- [Quickstart](#quickstart)
- [Sample DAGs](#sample-dags)
- [Getting Involved](#getting-involved)
- [Changelog](#changelog)
- [Contributing Guide](#contributing-guide)

@@ -29,135 +30,35 @@ Check out the Getting Started guide in our [docs](https://astronomer.github.io/a

## Sample DAGs

```python
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG

from ray_provider.operators.ray import SetupRayCluster, SubmitRayJob, DeleteRayCluster

default_args = {
    "owner": "airflow",
    "start_date": datetime(2024, 3, 26),
    "retries": 1,
    "retry_delay": timedelta(minutes=0),
}


RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
FOLDER_PATH = Path(__file__).parent / "ray_scripts"

dag = DAG(
    "Setup_Teardown",
    default_args=default_args,
    description="Setup Ray cluster and submit a job",
    schedule=None,
)

setup_cluster = SetupRayCluster(
    task_id="SetupRayCluster",
    conn_id="ray_conn",
    ray_cluster_yaml=str(RAY_SPEC),
    use_gpu=False,
    update_if_exists=False,
    dag=dag,
)

submit_ray_job = SubmitRayJob(
    task_id="SubmitRayJob",
    conn_id="ray_conn",
    entrypoint="python script.py",
    runtime_env={"working_dir": str(FOLDER_PATH)},
    num_cpus=1,
    num_gpus=0,
    memory=0,
    resources={},
    fetch_logs=True,
    wait_for_completion=True,
    job_timeout_seconds=600,
    xcom_task_key="SetupRayCluster.dashboard",
    poll_interval=5,
    dag=dag,
)

delete_cluster = DeleteRayCluster(
    task_id="DeleteRayCluster",
    conn_id="ray_conn",
    ray_cluster_yaml=str(RAY_SPEC),
    use_gpu=False,
    dag=dag,
)

# Create ray cluster and submit ray job
setup_cluster.as_setup() >> submit_ray_job >> delete_cluster.as_teardown()
setup_cluster >> delete_cluster
```

```python
from datetime import datetime, timedelta
from pathlib import Path

from airflow.decorators import dag, task

from ray_provider.decorators.ray import ray

RAY_TASK_CONFIG = {
    "conn_id": "ray_conn",
    "runtime_env": {
        "working_dir": str(Path(__file__).parent / "ray_scripts"),  # cast to str for the Ray runtime_env
        "pip": ["numpy"],
    },
    "num_cpus": 1,
    "num_gpus": 0,
    "memory": 0,
    "poll_interval": 5,
}


@dag(
    dag_id="Ray_Taskflow_Example",
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(days=1),
    catchup=False,
    default_args={
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["ray", "example"],
)
def ray_taskflow_dag():

    @task
    def generate_data():
        import numpy as np

        return np.random.rand(100).tolist()

    @ray.task(config=RAY_TASK_CONFIG)
    def process_data_with_ray(data):
        import numpy as np
        import ray

        @ray.remote
        def square(x):
            return x**2

        ray.init()
        data = np.array(data)
        futures = [square.remote(x) for x in data]
        results = ray.get(futures)
        mean = np.mean(results)
        print(f"Mean of squared values: {mean}")
        return mean

    data = generate_data()
    process_data_with_ray(data)


ray_example_dag = ray_taskflow_dag()
```
### Example 1: Using @ray.task for job lifecycle
The example below showcases how to use the ``@ray.task`` decorator to manage the full lifecycle of a Ray cluster: setup, job execution, and teardown.

This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion.

https://github.com/astronomer/astro-provider-ray/blob/bd6d847818be08fae78bc1e4c9bf3334adb1d2ee/example_dags/ray_taskflow_example.py#L1-L57
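
For orientation, here is a condensed sketch of the pattern, adapted from the inline sample earlier in this README; the config values are illustrative and the linked DAG remains the authoritative version:

```python
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag

from ray_provider.decorators.ray import ray

CONFIG = {
    "conn_id": "ray_conn",  # illustrative connection ID
    "runtime_env": {"working_dir": str(Path(__file__).parent / "ray_scripts")},
    "num_cpus": 1,
    "poll_interval": 5,
}


@dag(dag_id="ray_decorator_sketch", start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def ray_decorator_sketch():
    @ray.task(config=CONFIG)
    def mean_of_squares(data):
        import ray  # the Ray library, distinct from the decorator module

        @ray.remote
        def square(x):
            return x**2

        ray.init()
        results = ray.get([square.remote(x) for x in data])
        return sum(results) / len(results)

    mean_of_squares([1.0, 2.0, 3.0])


sketch = ray_decorator_sketch()
```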

### Example 2: Using SetupRayCluster, SubmitRayJob & DeleteRayCluster
This example shows how to use separate operators for cluster setup, job submission, and teardown, providing more granular control over the process.

This approach allows for more complex workflows involving Ray clusters.

Key Points:

- Uses ``SetupRayCluster``, ``SubmitRayJob``, and ``DeleteRayCluster`` operators separately.
- Allows multiple jobs to be submitted to the same cluster before deletion.
- Demonstrates how to pass cluster information between tasks using XCom (see the sketch below).

This method is ideal for scenarios where you need fine-grained control over the cluster lifecycle, such as running multiple jobs on the same cluster or keeping the cluster alive for a certain period.

https://github.com/astronomer/astro-provider-ray/blob/bd6d847818be08fae78bc1e4c9bf3334adb1d2ee/example_dags/setup-teardown.py#L1-L44
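
A condensed sketch of the wiring, based on the inline sample near the top of this section but without the ``use_gpu`` flag removed in 0.2.0; paths and IDs are illustrative:

```python
from datetime import datetime
from pathlib import Path

from airflow import DAG

from ray_provider.operators.ray import SetupRayCluster, SubmitRayJob, DeleteRayCluster

RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"

with DAG("setup_teardown_sketch", start_date=datetime(2024, 1, 1), schedule=None):
    setup_cluster = SetupRayCluster(
        task_id="SetupRayCluster",
        conn_id="ray_conn",
        ray_cluster_yaml=str(RAY_SPEC),
    )
    submit_ray_job = SubmitRayJob(
        task_id="SubmitRayJob",
        conn_id="ray_conn",
        entrypoint="python script.py",
        runtime_env={"working_dir": str(Path(__file__).parent / "ray_scripts")},
        # Reads the dashboard URL that SetupRayCluster pushed to XCom
        xcom_task_key="SetupRayCluster.dashboard",
    )
    delete_cluster = DeleteRayCluster(
        task_id="DeleteRayCluster",
        conn_id="ray_conn",
        ray_cluster_yaml=str(RAY_SPEC),
    )

    setup_cluster.as_setup() >> submit_ray_job >> delete_cluster.as_teardown()
    setup_cluster >> delete_cluster
```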

## Getting Involved

| Platform | Purpose | Est. Response time |
|:---:|:---:|:---:|
| [Discussion Forum](https://github.com/astronomer/astro-provider-ray/discussions) | General inquiries and discussions | < 3 days |
| [GitHub Issues](https://github.com/astronomer/astro-provider-ray/issues) | Bug reports and feature requests | < 1-2 days |
| [Slack](https://join.slack.com/t/apache-airflow/shared_invite/zt-2nsw28cw1-Lw4qCS0fgme4UI_vWRrwEQ) | Quick questions and real-time chat | 12 hrs |

## Changelog
We follow [Semantic Versioning](https://semver.org/) for releases. Check [CHANGELOG.rst](https://github.com/astronomer/astro-provider-ray/blob/main/CHANGELOG.rst) for the latest changes.
docs/getting_started/code_samples.rst (58 additions, 19 deletions)
@@ -1,42 +1,81 @@
Code Samples
============

Index
-----
- `Example 1: Ray jobs on an existing cluster`_
- `Ray Cluster Sample Spec (YAML)`_
- `Example 2: Using @ray.task for job lifecycle`_
- `Example 3: Using SubmitRayJob operator for job lifecycle`_
- `Example 4: SetupRayCluster, SubmitRayJob & DeleteRayCluster`_

Example 1: Ray jobs on an existing cluster
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
If you already have a Ray cluster set up, you can use the ``SubmitRayJob`` operator or the ``@ray.task`` decorator to submit jobs directly.

In the example below (``ray_taskflow_example_existing_cluster.py``), the ``@ray.task`` decorator is used to define a task that will be executed on the Ray cluster:

.. important::
   **Set the Ray Dashboard URL connection parameter or RAY_ADDRESS on your Airflow worker to connect to your cluster.**

.. literalinclude:: ../../example_dags/ray_taskflow_example_existing_cluster.py
   :language: python
   :linenos:
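
One way to satisfy the dashboard-URL requirement from the note above is an environment variable on the Airflow worker; a minimal sketch, where the URL is a placeholder:

.. code-block:: python

    import os

    # Placeholder dashboard URL of an existing cluster; filling in the
    # "Ray dashboard URL" field of the Airflow connection achieves the same.
    os.environ["RAY_ADDRESS"] = "http://ray-dashboard.example.com:8265"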


Ray Cluster Sample Spec (YAML)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. important::
   ``spec.headGroupSpec.serviceType`` must be ``LoadBalancer`` to spin up a service that exposes your dashboard externally.

.. literalinclude:: ../../example_dags/scripts/ray.yaml
   :language: yaml

Save this file in a location accessible to your Airflow installation, and reference it in your DAG code.

Example 2: Using @ray.task for job lifecycle
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The example below showcases how to use the ``@ray.task`` decorator to manage the full lifecycle of a Ray cluster: setup, job execution, and teardown.

This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion.

.. literalinclude:: ../../example_dags/ray_taskflow_example.py
   :language: python
   :linenos:

Example 3: Using SubmitRayJob operator for job lifecycle
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This example demonstrates how to use the ``SubmitRayJob`` operator to manage the full lifecycle of a Ray cluster and job execution.

This operator provides a more declarative way to define your Ray job within an Airflow DAG.

.. literalinclude:: ../../example_dags/ray_single_operator.py
   :language: python
   :linenos:

.. note::
   Remember to adjust file paths, connection IDs, and other specifics according to your setup.

Example 4: SetupRayCluster, SubmitRayJob & DeleteRayCluster
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This example shows how to use separate operators for cluster setup, job submission, and teardown, providing more granular control over the process.

This approach allows for more complex workflows involving Ray clusters.

Key Points:

- Uses ``SetupRayCluster``, ``SubmitRayJob``, and ``DeleteRayCluster`` operators separately.
- Allows multiple jobs to be submitted to the same cluster before deletion (see the sketch after the included DAG below).
- Demonstrates how to pass cluster information between tasks using XCom.

This method is ideal for scenarios where you need fine-grained control over the cluster lifecycle, such as running multiple jobs on the same cluster or keeping the cluster alive for a certain period.

.. important::
   **The SubmitRayJob operator uses the xcom_task_key parameter "SetupRayCluster.dashboard" to retrieve the Ray dashboard URL. This URL, stored as an XCom variable by the SetupRayCluster task, is necessary for job submission.**

.. literalinclude:: ../../example_dags/setup-teardown.py
:language: python
:linenos:
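
A sketch of the multi-job case called out above; the task IDs and entrypoints are illustrative, and the operator arguments mirror the included DAG:

.. code-block:: python

    from ray_provider.operators.ray import SetupRayCluster, SubmitRayJob, DeleteRayCluster

    setup_cluster = SetupRayCluster(
        task_id="SetupRayCluster", conn_id="ray_conn", ray_cluster_yaml="scripts/ray.yaml"
    )
    # Both jobs reuse the same cluster and read its dashboard URL from XCom
    job_a = SubmitRayJob(
        task_id="job_a",
        conn_id="ray_conn",
        entrypoint="python job_a.py",
        runtime_env={},
        xcom_task_key="SetupRayCluster.dashboard",
    )
    job_b = SubmitRayJob(
        task_id="job_b",
        conn_id="ray_conn",
        entrypoint="python job_b.py",
        runtime_env={},
        xcom_task_key="SetupRayCluster.dashboard",
    )
    delete_cluster = DeleteRayCluster(
        task_id="DeleteRayCluster", conn_id="ray_conn", ray_cluster_yaml="scripts/ray.yaml"
    )

    setup_cluster.as_setup() >> [job_a, job_b] >> delete_cluster.as_teardown()
    setup_cluster >> delete_cluster
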
docs/getting_started/setup.rst (14 additions, 13 deletions)
@@ -12,7 +12,7 @@ Getting Started
See the `installing Helm <https://helm.sh/docs/intro/install/>`_ page for other options.

.. note::
   This step is only required if you do not have a Ray cluster and intend to spin one up using the provider.

**2. Install the python package:**

@@ -21,22 +21,23 @@ See the `installing Helm <https://helm.sh/docs/intro/install/>`_ page for other
pip install astro-provider-ray


**3. Setting up the Airflow connection**

.. image:: ../_static/connection.png
:align: center

- Setup/Teardown a Ray cluster on Kubernetes

  - **Connection ID**: e.g., "ray_k8s_conn"
  - **Connection Type**: "Ray"
  - **Kube config path** OR **Kube config content (JSON format)**: Kubeconfig of the Kubernetes cluster where the Ray cluster must be set up
  - **Namespace**: The K8s namespace where your cluster must be created. If not provided, "default" is used
  - **Optional fields**: Cluster context, Disable SSL, Disable TCP keepalive


- Use an existing Ray cluster

  - **Connection ID**: e.g., "ray_conn"
  - **Connection Type**: "Ray"
  - **Ray dashboard URL**: URL of the Ray dashboard
  - **Optional fields**: Cookies, Metadata, Headers, Verify SSL

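
For either connection type above, one option is to define the connection as a JSON-serialized Airflow connection in an environment variable. A sketch with placeholder values; the connection ID, type name, and URL are assumptions about your setup:

.. code-block:: python

    import json
    import os

    # Hypothetical existing-cluster connection pointing at the Ray dashboard.
    os.environ["AIRFLOW_CONN_RAY_CONN"] = json.dumps(
        {
            "conn_type": "ray",  # assumed type name registered by this provider
            "host": "http://ray-dashboard.example.com:8265",
        }
    )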