Skip to content

Commit

Permalink
Adding Integration Tests (#27)
Browse files Browse the repository at this point in the history
* initial test

* path changed

* bug fix

* bug fix

* bug fix

* bug fix

* fix

* update

* bug fix

* bug fix

* update

* bug fix

* bug fix

* bug fix

* bug fix

* update

* update

* bug fix

* bug fix

* Bug fix

* bug fix

* added more cpus

* using minikube

* updated

* 14 cpus

* GKE cluster

* updated

* update

* update

* update

* update

* update

* update

* update

* update

* print roles

* update

* update

* test

* test

* update

* minor update

* bug fix

* more tests

* update

* bug fix

* update

* update

* try again

* update

* updates

* update

* try again

* node svc account set

* update

* update

* update

* update

* yaml files updated

* making ip or dns generic choice

* unit tests added

* minor updates
  • Loading branch information
venkatajagannath authored Aug 7, 2024
1 parent 3634b53 commit 6aa46cf
Show file tree
Hide file tree
Showing 14 changed files with 308 additions and 112 deletions.
80 changes: 74 additions & 6 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
name: test

on:
push:
branches: [ "main" ]
Expand Down Expand Up @@ -45,30 +44,99 @@ jobs:
- uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha || github.ref }}

- uses: actions/cache@v4
with:
path: |
~/.cache/pip
.nox
key: unit-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('ray_provider/__init__.py') }}

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Install packages and dependencies
run: |
python -m pip install hatch
hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
- name: Test Ray against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
run: |
hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-cov
- name: Upload coverage to Github
uses: actions/upload-artifact@v4
with:
name: coverage-unit-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
path: .coverage

# Integration tests: authenticate to GCP, provision a GKE cluster, run the
# pytest integration suite against it, and always tear the cluster down.
Run-Integration-Tests:
  needs: Authorize
  runs-on: ubuntu-latest
  strategy:
    matrix:
      python-version: ["3.11"]
      airflow-version: ["2.9"]
  permissions:
    contents: 'read'
    id-token: 'write'  # needed by google-github-actions/auth for token exchange
  steps:
    - uses: actions/checkout@v4
      with:
        # Test the PR head when triggered from a pull request, else the ref.
        ref: ${{ github.event.pull_request.head.sha || github.ref }}
    - uses: actions/cache@v4
      with:
        path: |
          ~/.cache/pip
          .nox
        key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('ray_provider/__init__.py') }}
    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v4
      with:
        python-version: ${{ matrix.python-version }}
    - name: Install packages and dependencies
      run: |
        python -m pip install hatch
        hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
    - id: 'auth'
      name: 'Authenticate to Google Cloud'
      uses: 'google-github-actions/auth@v1'
      with:
        credentials_json: '${{ secrets.GCP_SA_KEY }}'
        # BUG FIX: this value previously ended with an unbalanced trailing
        # single quote (service_account: ${{ ... }}'), producing a malformed
        # YAML scalar; the expression is now quoted symmetrically.
        service_account: '${{ secrets.SERVICE_ACCOUNT_EMAIL }}'
        create_credentials_file: true

    - name: 'Set up Cloud SDK'
      uses: 'google-github-actions/setup-gcloud@v1'

    - name: 'Set GCP Project ID'
      run: gcloud config set project ${{ secrets.PROJECT_ID }}

    - name: Create GKE cluster
      run: |
        gcloud container clusters create integration-test-cluster \
          --zone us-central1-a \
          --num-nodes 2 \
          --machine-type e2-standard-8 \
          --no-enable-autoupgrade \
          --no-enable-autorepair \
          --no-enable-ip-alias \
          --service-account=${{ secrets.SERVICE_ACCOUNT_EMAIL }}
    - name: Setup gcloud and get kubeconfig
      run: |
        gcloud components install gke-gcloud-auth-plugin
        gcloud container clusters get-credentials integration-test-cluster --zone us-central1-a
        kubectl config view --raw > kubeconfig.yaml
        echo "KUBECONFIG=${{ github.workspace }}/kubeconfig.yaml" >> $GITHUB_ENV
        echo "USE_GKE_GCLOUD_AUTH_PLUGIN=True" >> $GITHUB_ENV
    - name: Run integration tests
      run: |
        hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
    - name: Upload coverage to Github
      uses: actions/upload-artifact@v4
      with:
        name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
        path: .coverage
    - name: Delete GKE cluster
      # Always delete the cluster, even when earlier steps failed, so the
      # GCP project is not left paying for an orphaned cluster.
      if: always()
      run: |
        gcloud container clusters delete integration-test-cluster --zone us-central1-a --quiet
15 changes: 0 additions & 15 deletions example_dags/scripts/k8-gpu.yaml

This file was deleted.

15 changes: 0 additions & 15 deletions example_dags/scripts/k8.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions example_dags/scripts/ray-gpu.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
apiVersion: ray.io/v1alpha1
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: raycluster-complete
spec:
rayVersion: "2.10.0"
enableInTreeAutoscaling: true
headGroupSpec:
serviceType: ClusterIP
serviceType: LoadBalancer
rayStartParams:
dashboard-host: "0.0.0.0"
block: "true"
Expand Down
27 changes: 0 additions & 27 deletions example_dags/scripts/ray-service.yaml

This file was deleted.

11 changes: 2 additions & 9 deletions example_dags/scripts/ray.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
apiVersion: ray.io/v1alpha1
apiVersion: ray.io/v1
kind: RayCluster
metadata:
name: raycluster-complete
spec:
rayVersion: "2.10.0"
enableInTreeAutoscaling: true
headGroupSpec:
serviceType: ClusterIP
serviceType: LoadBalancer
rayStartParams:
dashboard-host: "0.0.0.0"
block: "true"
Expand All @@ -18,13 +18,6 @@ spec:
containers:
- name: ray-head
image: rayproject/ray-ml:latest
env: # Environment variables section starts here
- name: RAY_GRAFANA_IFRAME_HOST
value: "http://127.0.0.1:3000"
- name: RAY_GRAFANA_HOST
value: "http://prometheus-grafana.prometheus-system.svc:80"
- name: RAY_PROMETHEUS_HOST
value: "http://prometheus-kube-prometheus-prometheus.prometheus-system.svc:9090"
resources:
limits:
cpu: 4
Expand Down
62 changes: 62 additions & 0 deletions example_dags/setup-teardown.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Example DAG: create a Ray cluster, run one Ray job on it, then tear it down.

The setup/teardown pairing uses Airflow's ``as_setup()``/``as_teardown()``
markers so the cluster is deleted even when the job task fails.
"""

from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG

from ray_provider.operators.ray import DeleteRayCluster, SetupRayCluster, SubmitRayJob

default_args = {
    "owner": "airflow",
    "start_date": datetime(2024, 3, 26),
    "retries": 1,
    "retry_delay": timedelta(minutes=0),
}

# Cluster spec and the directory containing the job script, both resolved
# relative to this DAG file.
RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
FOLDER_PATH = Path(__file__).parent / "ray_scripts"

with DAG(
    "Setup_Teardown",
    default_args=default_args,
    description="Setup Ray cluster and submit a job",
    schedule=None,
) as dag:
    # Provision the Ray cluster from the YAML spec.
    setup_cluster = SetupRayCluster(
        task_id="SetupRayCluster",
        conn_id="ray_conn",
        ray_cluster_yaml=str(RAY_SPEC),
        use_gpu=False,
        update_if_exists=False,
    )

    # Submit the job, reading the dashboard URL pushed to XCom by the setup task.
    submit_ray_job = SubmitRayJob(
        task_id="SubmitRayJob",
        conn_id="ray_conn",
        entrypoint="python script.py",
        runtime_env={"working_dir": str(FOLDER_PATH)},
        num_cpus=1,
        num_gpus=0,
        memory=0,
        resources={},
        fetch_logs=True,
        wait_for_completion=True,
        job_timeout_seconds=600,
        xcom_task_key="SetupRayCluster.dashboard",
        poll_interval=5,
    )

    # Remove the cluster created above.
    delete_cluster = DeleteRayCluster(
        task_id="DeleteRayCluster",
        conn_id="ray_conn",
        ray_cluster_yaml=str(RAY_SPEC),
        use_gpu=False,
    )

# Create ray cluster and submit ray job
setup_cluster.as_setup() >> submit_ray_job >> delete_cluster.as_teardown()
setup_cluster >> delete_cluster
56 changes: 35 additions & 21 deletions ray_provider/hooks/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def __init__(
self.verify = self._get_field("verify") or False
self.ray_client_instance = None

self.namespace = self.get_namespace()
self.namespace = self.get_namespace() or self.DEFAULT_NAMESPACE
self.kubeconfig: str | None = None
self.in_cluster: bool | None = None
self.client_configuration = None
Expand Down Expand Up @@ -235,7 +235,6 @@ def _is_port_open(self, host: str, port: int) -> bool:
def _get_service(self, name: str, namespace: str) -> client.V1Service:
"""
Get the Kubernetes service.
:param name: The name of the service.
:param namespace: The namespace of the service.
:return: The Kubernetes service object.
Expand All @@ -250,16 +249,34 @@ def _get_service(self, name: str, namespace: str) -> client.V1Service:
def _get_load_balancer_details(self, service: client.V1Service) -> dict[str, Any] | None:
"""
Extract LoadBalancer details from the service.
:param service: The Kubernetes service object.
:return: A dictionary containing LoadBalancer details if available, None otherwise.
"""
if service.status.load_balancer.ingress:
ingress: client.V1LoadBalancerIngress = service.status.load_balancer.ingress[0]
ip_or_hostname: str | None = ingress.ip or ingress.hostname
if ip_or_hostname:
ip: str | None = ingress.ip
hostname: str | None = ingress.hostname
if ip or hostname:
ports: list[dict[str, Any]] = [{"name": port.name, "port": port.port} for port in service.spec.ports]
return {"ip_or_hostname": ip_or_hostname, "ports": ports}
return {"ip": ip, "hostname": hostname, "ports": ports}
return None

def _check_load_balancer_readiness(self, lb_details: dict[str, Any]) -> str | None:
    """
    Check if the LoadBalancer is ready by testing port connectivity.

    Probes the IP first, then the hostname, for each exposed port and
    returns as soon as ANY single port answers on either address — it does
    not require every port to be open.

    :param lb_details: Dictionary containing LoadBalancer details; expects
        the keys "ip", "hostname" (either may be None) and "ports", a list
        of dicts each carrying a "port" entry.
    :return: The working address (IP or hostname) if ready, None otherwise.
    """
    ip: str | None = lb_details["ip"]
    hostname: str | None = lb_details["hostname"]

    for port_info in lb_details["ports"]:
        port = port_info["port"]
        # IP takes precedence over hostname when both are populated.
        if ip and self._is_port_open(ip, port):
            return ip
        if hostname and self._is_port_open(hostname, port):
            return hostname

    # No port reachable on either address — caller should retry later.
    return None

def wait_for_load_balancer(
Expand All @@ -271,7 +288,6 @@ def wait_for_load_balancer(
) -> dict[str, Any]:
"""
Wait for the LoadBalancer to be ready and return its details.
:param service_name: The name of the LoadBalancer service.
:param namespace: The namespace of the service.
:param max_retries: Maximum number of retries.
Expand All @@ -281,25 +297,23 @@ def wait_for_load_balancer(
"""
for attempt in range(1, max_retries + 1):
self.log.info(f"Attempt {attempt}: Checking LoadBalancer status...")

try:
service: client.V1Service = self._get_service(service_name, namespace)
lb_details: dict[str, Any] | None = self._get_load_balancer_details(service)

if lb_details:
hostname = lb_details["ip_or_hostname"]
all_ports_open = True
for port in lb_details["ports"]:
if not self._is_port_open(hostname, port["port"]):
all_ports_open = False
break

if all_ports_open:
self.log.info("All ports are open. LoadBalancer is ready.")
return lb_details
else:
self.log.info("Not all ports are open. Waiting...")
else:
if not lb_details:
self.log.info("LoadBalancer details not available yet.")
continue

working_address = self._check_load_balancer_readiness(lb_details)

if working_address:
self.log.info("LoadBalancer is ready.")
lb_details["working_address"] = working_address
return lb_details

self.log.info("LoadBalancer is not ready yet. Waiting...")

except AirflowException:
self.log.info("LoadBalancer service is not available yet...")
Expand Down
2 changes: 1 addition & 1 deletion ray_provider/operators/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _setup_load_balancer(self, name: str, namespace: str, context: Context) -> N

if lb_details:
self.log.info(lb_details)
dns = lb_details["ip_or_hostname"]
dns = lb_details["working_address"]
for port in lb_details["ports"]:
url = f"http://{dns}:{port['port']}"
context["task_instance"].xcom_push(key=port["name"], value=url)
Expand Down
8 changes: 8 additions & 0 deletions scripts/test/integration_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env bash
# Run the integration-marked tests (-m integration) with coverage reporting.
# FIX: added a shebang — without it, the interpreter used depends on how the
# caller invokes the script.
#
#   -vv                   verbose test names and diffs
#   --cov/--cov-report    coverage for ray_provider (terminal + xml artifact)
#   --durations=0         report the runtime of every test
#   -s                    do not capture stdout (live Ray/kubectl output)
#   --log-cli-level=DEBUG stream DEBUG-level logs to the console
pytest -vv \
    --cov=ray_provider \
    --cov-report=term-missing \
    --cov-report=xml \
    --durations=0 \
    -m integration \
    -s \
    --log-cli-level=DEBUG
Loading

0 comments on commit 6aa46cf

Please sign in to comment.