diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 7f16eb3..096294e 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -1,5 +1,4 @@
 name: test
-
 on:
   push:
     branches: [ "main" ]
@@ -45,30 +44,99 @@ jobs:
       - uses: actions/checkout@v4
         with:
           ref: ${{ github.event.pull_request.head.sha || github.ref }}
-
       - uses: actions/cache@v4
         with:
           path: |
             ~/.cache/pip
             .nox
           key: unit-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('ray_provider/__init__.py') }}
-
       - name: Set up Python ${{ matrix.python-version }}
         uses: actions/setup-python@v4
         with:
           python-version: ${{ matrix.python-version }}
-
       - name: Install packages and dependencies
         run: |
           python -m pip install hatch
           hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
-
       - name: Test Ray against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }}
         run: |
           hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-cov
-
       - name: Upload coverage to Github
         uses: actions/upload-artifact@v4
         with:
           name: coverage-unit-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
           path: .coverage
+
+  Run-Integration-Tests:
+    needs: Authorize
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: ["3.11"]
+        airflow-version: ["2.9"]
+    permissions:
+      contents: 'read'
+      id-token: 'write'
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          ref: ${{ github.event.pull_request.head.sha || github.ref }}
+      - uses: actions/cache@v4
+        with:
+          path: |
+            ~/.cache/pip
+            .nox
+          key: integration-${{ runner.os }}-${{ matrix.python-version }}-${{ matrix.airflow-version }}-${{ hashFiles('pyproject.toml') }}-${{ hashFiles('ray_provider/__init__.py') }}
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v4
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Install packages and dependencies
+        run: |
+          python -m pip install hatch
+          hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
+      - id: 'auth'
+        name: 'Authenticate to Google Cloud'
+        uses: 'google-github-actions/auth@v1'
+        with:
+          credentials_json: '${{ secrets.GCP_SA_KEY }}'
+          service_account: '${{ secrets.SERVICE_ACCOUNT_EMAIL }}'
+          create_credentials_file: true
+
+      - name: 'Set up Cloud SDK'
+        uses: 'google-github-actions/setup-gcloud@v1'
+
+      - name: 'Set GCP Project ID'
+        run: gcloud config set project ${{ secrets.PROJECT_ID }}
+
+      - name: Create GKE cluster
+        run: |
+          gcloud container clusters create integration-test-cluster \
+            --zone us-central1-a \
+            --num-nodes 2 \
+            --machine-type e2-standard-8 \
+            --no-enable-autoupgrade \
+            --no-enable-autorepair \
+            --no-enable-ip-alias \
+            --service-account=${{ secrets.SERVICE_ACCOUNT_EMAIL }}
+
+      - name: Setup gcloud and get kubeconfig
+        run: |
+          gcloud components install gke-gcloud-auth-plugin
+          gcloud container clusters get-credentials integration-test-cluster --zone us-central1-a
+          kubectl config view --raw > kubeconfig.yaml
+          echo "KUBECONFIG=${{ github.workspace }}/kubeconfig.yaml" >> $GITHUB_ENV
+          echo "USE_GKE_GCLOUD_AUTH_PLUGIN=True" >> $GITHUB_ENV
+
+      - name: Run integration tests
+        run: |
+          hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-integration
+      - name: Upload coverage to Github
+        uses: actions/upload-artifact@v4
+        with:
+          name: coverage-integration-test-${{ matrix.python-version }}-${{ matrix.airflow-version }}
+          path: .coverage
+      - name: Delete GKE cluster
+        if: always()
+        run: |
+          gcloud container clusters delete integration-test-cluster --zone us-central1-a --quiet
diff --git a/example_dags/scripts/k8-gpu.yaml b/example_dags/scripts/k8-gpu.yaml
deleted file mode 100644
index c823f78..0000000
--- a/example_dags/scripts/k8-gpu.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-apiVersion: eksctl.io/v1alpha5
-kind: ClusterConfig
-
-metadata:
-  name: RayCluster
-  region: us-east-2
-
-managedNodeGroups:
-  - name: ng-1-gpu
-    instanceType: g4dn.2xlarge
-    desiredCapacity: 2
-    minSize: 1
-    maxSize: 3
-    ssh:
-      allow: false
diff --git a/example_dags/scripts/k8.yaml b/example_dags/scripts/k8.yaml
deleted file mode 100644
index fd68b5a..0000000
--- a/example_dags/scripts/k8.yaml
+++ /dev/null
@@ -1,15 +0,0 @@
-apiVersion: eksctl.io/v1alpha5
-kind: ClusterConfig
-
-metadata:
-  name: RayCluster
-  region: us-east-2
-
-managedNodeGroups:
-  - name: ng-1
-    instanceType: m5.2xlarge
-    desiredCapacity: 2
-    minSize: 1
-    maxSize: 3
-    ssh:
-      allow: false # This will use the default SSH key on your system (optional)
diff --git a/example_dags/scripts/ray-gpu.yaml b/example_dags/scripts/ray-gpu.yaml
index da7b596..c79860d 100644
--- a/example_dags/scripts/ray-gpu.yaml
+++ b/example_dags/scripts/ray-gpu.yaml
@@ -1,4 +1,4 @@
-apiVersion: ray.io/v1alpha1
+apiVersion: ray.io/v1
 kind: RayCluster
 metadata:
   name: raycluster-complete
@@ -6,7 +6,7 @@ spec:
   rayVersion: "2.10.0"
   enableInTreeAutoscaling: true
   headGroupSpec:
-    serviceType: ClusterIP
+    serviceType: LoadBalancer
     rayStartParams:
       dashboard-host: "0.0.0.0"
       block: "true"
diff --git a/example_dags/scripts/ray-service.yaml b/example_dags/scripts/ray-service.yaml
deleted file mode 100644
index 9e6490c..0000000
--- a/example_dags/scripts/ray-service.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
-apiVersion: v1
-kind: Service
-metadata:
-  name: ray-service
-  namespace: ray
-spec:
-  selector:
-    ray.io/cluster: raycluster-complete
-    ray.io/node-type: head
-  ports:
-    - name: dashboard
-      protocol: TCP
-      port: 8265
-      targetPort: 8265
-    - name: client
-      protocol: TCP
-      port: 10001
-      targetPort: 10001
-    - name: gcs
-      protocol: TCP
-      port: 6379
-      targetPort: 6379
-    - name: serve
-      protocol: TCP
-      port: 8000
-      targetPort: 8000
-  type: LoadBalancer
diff --git a/example_dags/scripts/ray.yaml b/example_dags/scripts/ray.yaml
index 01e1e33..1eb8e70 100644
--- a/example_dags/scripts/ray.yaml
+++ b/example_dags/scripts/ray.yaml
@@ -1,4 +1,4 @@
-apiVersion: ray.io/v1alpha1
+apiVersion: ray.io/v1
 kind: RayCluster
 metadata:
   name: raycluster-complete
@@ -6,7 +6,7 @@ spec:
   rayVersion: "2.10.0"
   enableInTreeAutoscaling: true
   headGroupSpec:
-    serviceType: ClusterIP
+    serviceType: LoadBalancer
     rayStartParams:
       dashboard-host: "0.0.0.0"
       block: "true"
@@ -18,13 +18,6 @@ spec:
         containers:
         - name: ray-head
           image: rayproject/ray-ml:latest
-          env: # Environment variables section starts here
-          - name: RAY_GRAFANA_IFRAME_HOST
-            value: "http://127.0.0.1:3000"
-          - name: RAY_GRAFANA_HOST
-            value: "http://prometheus-grafana.prometheus-system.svc:80"
-          - name: RAY_PROMETHEUS_HOST
-            value: "http://prometheus-kube-prometheus-prometheus.prometheus-system.svc:9090"
           resources:
             limits:
               cpu: 4
diff --git a/example_dags/setup-teardown.py b/example_dags/setup-teardown.py
new file mode 100644
index 0000000..0185c1e
--- /dev/null
+++ b/example_dags/setup-teardown.py
@@ -0,0 +1,62 @@
+from datetime import datetime, timedelta
+from pathlib import Path
+
+from airflow import DAG
+
+from ray_provider.operators.ray import DeleteRayCluster, SetupRayCluster, SubmitRayJob
+
+default_args = {
+    "owner": "airflow",
+    "start_date": datetime(2024, 3, 26),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=0),
+}
+
+
+RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
+FOLDER_PATH = Path(__file__).parent / "ray_scripts"
+
+dag = DAG(
+    "Setup_Teardown",
+    default_args=default_args,
+    description="Setup Ray cluster and submit a job",
+    schedule=None,
+)
+
+setup_cluster = SetupRayCluster(
+    task_id="SetupRayCluster",
+    conn_id="ray_conn",
+    ray_cluster_yaml=str(RAY_SPEC),
+    use_gpu=False,
+    update_if_exists=False,
+    dag=dag,
+)
+
+submit_ray_job = SubmitRayJob(
+    task_id="SubmitRayJob",
+    conn_id="ray_conn",
+    entrypoint="python script.py",
+    runtime_env={"working_dir": str(FOLDER_PATH)},
+    num_cpus=1,
+    num_gpus=0,
+    memory=0,
+    resources={},
+    fetch_logs=True,
+    wait_for_completion=True,
+    job_timeout_seconds=600,
+    xcom_task_key="SetupRayCluster.dashboard",
+    poll_interval=5,
+    dag=dag,
+)
+
+delete_cluster = DeleteRayCluster(
+    task_id="DeleteRayCluster",
+    conn_id="ray_conn",
+    ray_cluster_yaml=str(RAY_SPEC),
+    use_gpu=False,
+    dag=dag,
+)
+
+# Create ray cluster and submit ray job
+setup_cluster.as_setup() >> submit_ray_job >> delete_cluster.as_teardown()
+setup_cluster >> delete_cluster
diff --git a/ray_provider/hooks/ray.py b/ray_provider/hooks/ray.py
index 87becf3..ab4b73d 100644
--- a/ray_provider/hooks/ray.py
+++ b/ray_provider/hooks/ray.py
@@ -79,7 +79,7 @@ def __init__(
         self.verify = self._get_field("verify") or False
 
         self.ray_client_instance = None
-        self.namespace = self.get_namespace()
+        self.namespace = self.get_namespace() or self.DEFAULT_NAMESPACE
         self.kubeconfig: str | None = None
         self.in_cluster: bool | None = None
         self.client_configuration = None
@@ -235,7 +235,6 @@ def _is_port_open(self, host: str, port: int) -> bool:
     def _get_service(self, name: str, namespace: str) -> client.V1Service:
         """
         Get the Kubernetes service.
-
         :param name: The name of the service.
         :param namespace: The namespace of the service.
         :return: The Kubernetes service object.
@@ -250,16 +249,34 @@ def _get_service(self, name: str, namespace: str) -> client.V1Service:
     def _get_load_balancer_details(self, service: client.V1Service) -> dict[str, Any] | None:
         """
         Extract LoadBalancer details from the service.
-
         :param service: The Kubernetes service object.
         :return: A dictionary containing LoadBalancer details if available, None otherwise.
         """
         if service.status.load_balancer.ingress:
             ingress: client.V1LoadBalancerIngress = service.status.load_balancer.ingress[0]
-            ip_or_hostname: str | None = ingress.ip or ingress.hostname
-            if ip_or_hostname:
+            ip: str | None = ingress.ip
+            hostname: str | None = ingress.hostname
+            if ip or hostname:
                 ports: list[dict[str, Any]] = [{"name": port.name, "port": port.port} for port in service.spec.ports]
-                return {"ip_or_hostname": ip_or_hostname, "ports": ports}
+                return {"ip": ip, "hostname": hostname, "ports": ports}
+        return None
+
+    def _check_load_balancer_readiness(self, lb_details: dict[str, Any]) -> str | None:
+        """
+        Check if the LoadBalancer is ready by testing port connectivity.
+        :param lb_details: Dictionary containing LoadBalancer details.
+        :return: The working address (IP or hostname) if ready, None otherwise.
+        """
+        ip: str | None = lb_details["ip"]
+        hostname: str | None = lb_details["hostname"]
+
+        for port_info in lb_details["ports"]:
+            port = port_info["port"]
+            if ip and self._is_port_open(ip, port):
+                return ip
+            if hostname and self._is_port_open(hostname, port):
+                return hostname
+
         return None
 
     def wait_for_load_balancer(
@@ -271,7 +288,6 @@ def wait_for_load_balancer(
     ) -> dict[str, Any]:
         """
         Wait for the LoadBalancer to be ready and return its details.
-
         :param service_name: The name of the LoadBalancer service.
         :param namespace: The namespace of the service.
         :param max_retries: Maximum number of retries.
@@ -281,25 +297,23 @@ def wait_for_load_balancer(
         """
         for attempt in range(1, max_retries + 1):
             self.log.info(f"Attempt {attempt}: Checking LoadBalancer status...")
+
             try:
                 service: client.V1Service = self._get_service(service_name, namespace)
                 lb_details: dict[str, Any] | None = self._get_load_balancer_details(service)
 
-                if lb_details:
-                    hostname = lb_details["ip_or_hostname"]
-                    all_ports_open = True
-                    for port in lb_details["ports"]:
-                        if not self._is_port_open(hostname, port["port"]):
-                            all_ports_open = False
-                            break
-
-                    if all_ports_open:
-                        self.log.info("All ports are open. LoadBalancer is ready.")
-                        return lb_details
-                    else:
-                        self.log.info("Not all ports are open. Waiting...")
-                else:
+                if not lb_details:
                     self.log.info("LoadBalancer details not available yet.")
+                    continue
+
+                working_address = self._check_load_balancer_readiness(lb_details)
+
+                if working_address:
+                    self.log.info("LoadBalancer is ready.")
+                    lb_details["working_address"] = working_address
+                    return lb_details
+
+                self.log.info("LoadBalancer is not ready yet. Waiting...")
 
             except AirflowException:
                 self.log.info("LoadBalancer service is not available yet...")
diff --git a/ray_provider/operators/ray.py b/ray_provider/operators/ray.py
index 71574f4..a3cb622 100644
--- a/ray_provider/operators/ray.py
+++ b/ray_provider/operators/ray.py
@@ -103,7 +103,7 @@ def _setup_load_balancer(self, name: str, namespace: str, context: Context) -> N
 
         if lb_details:
             self.log.info(lb_details)
-            dns = lb_details["ip_or_hostname"]
+            dns = lb_details["working_address"]
             for port in lb_details["ports"]:
                 url = f"http://{dns}:{port['port']}"
                 context["task_instance"].xcom_push(key=port["name"], value=url)
diff --git a/scripts/test/integration_test.sh b/scripts/test/integration_test.sh
new file mode 100644
index 0000000..c213a81
--- /dev/null
+++ b/scripts/test/integration_test.sh
@@ -0,0 +1,8 @@
+pytest -vv \
+    --cov=ray_provider \
+    --cov-report=term-missing \
+    --cov-report=xml \
+    --durations=0 \
+    -m integration \
+    -s \
+    --log-cli-level=DEBUG
diff --git a/scripts/test/unit_cov.sh b/scripts/test/unit_cov.sh
index a82dd58..56287f4 100644
--- a/scripts/test/unit_cov.sh
+++ b/scripts/test/unit_cov.sh
@@ -3,4 +3,6 @@ pytest \
     --cov=ray_provider \
     --cov-report=term-missing \
     --cov-report=xml \
-    --durations=0
+    --durations=0 \
+    -m "not (integration or perf)" \
+    --ignore=tests/test_dag_example.py
diff --git a/tests/hooks/test_ray_hooks.py b/tests/hooks/test_ray_hooks.py
index b717343..535bdee 100644
--- a/tests/hooks/test_ray_hooks.py
+++ b/tests/hooks/test_ray_hooks.py
@@ -232,7 +232,7 @@ def test_get_load_balancer_details_with_ingress(self, ray_hook):
 
         lb_details = ray_hook._get_load_balancer_details(mock_service)
 
-        assert lb_details == {"ip_or_hostname": "192.168.1.1", "ports": [{"name": "http", "port": 80}]}
+        assert lb_details == {"ip": "192.168.1.1", "hostname": None, "ports": [{"name": "http", "port": 80}]}
 
     def test_get_load_balancer_details_with_hostname(self, ray_hook):
         mock_service = Mock(spec=client.V1Service)
@@ -248,7 +248,7 @@ def test_get_load_balancer_details_with_hostname(self, ray_hook):
 
         lb_details = ray_hook._get_load_balancer_details(mock_service)
 
-        assert lb_details == {"ip_or_hostname": "example.com", "ports": [{"name": "https", "port": 443}]}
+        assert lb_details == {"hostname": "example.com", "ip": None, "ports": [{"name": "https", "port": 443}]}
 
     def test_get_load_balancer_details_no_ingress(self, ray_hook):
         mock_service = Mock(spec=client.V1Service)
@@ -332,29 +332,32 @@ def test_uninstall_kuberay_operator(self, mock_subprocess_run, mock_kubernetes_i
 
     @patch("ray_provider.hooks.ray.RayHook._get_service")
     @patch("ray_provider.hooks.ray.RayHook._get_load_balancer_details")
-    @patch("ray_provider.hooks.ray.RayHook._is_port_open")
-    def test_wait_for_load_balancer_success(self, mock_is_port_open, mock_get_lb_details, mock_get_service, ray_hook):
-        # Mock the service
+    @patch("ray_provider.hooks.ray.RayHook._check_load_balancer_readiness")
+    def test_wait_for_load_balancer_success(
+        self, mock_check_readiness, mock_get_lb_details, mock_get_service, ray_hook
+    ):
         mock_service = Mock(spec=client.V1Service)
         mock_get_service.return_value = mock_service
 
-        # Mock the load balancer details
         mock_get_lb_details.return_value = {
-            "ip_or_hostname": "test-lb.example.com",
+            "hostname": "test-lb.example.com",
+            "ip": None,
             "ports": [{"name": "http", "port": 80}, {"name": "https", "port": 443}],
         }
 
-        # Mock the port check to return True (ports are open)
-        mock_is_port_open.return_value = True
+        mock_check_readiness.return_value = "test-lb.example.com"
 
-        # Call the method
         result = ray_hook.wait_for_load_balancer("test-service", namespace="default", max_retries=1, retry_interval=1)
 
-        # Assertions
-        assert result == mock_get_lb_details.return_value
+        assert result == {
+            "hostname": "test-lb.example.com",
+            "ip": None,
+            "ports": [{"name": "http", "port": 80}, {"name": "https", "port": 443}],
+            "working_address": "test-lb.example.com",
+        }
         mock_get_service.assert_called_once_with("test-service", "default")
         mock_get_lb_details.assert_called_once_with(mock_service)
-        assert mock_is_port_open.call_count == 2  # Called for both ports
+        mock_check_readiness.assert_called_once()
 
     @patch("ray_provider.hooks.ray.RayHook._get_service")
     @patch("ray_provider.hooks.ray.RayHook._get_load_balancer_details")
@@ -366,7 +369,8 @@ def test_wait_for_load_balancer_timeout(self, mock_is_port_open, mock_get_lb_det
 
         # Mock the load balancer details
         mock_get_lb_details.return_value = {
-            "ip_or_hostname": "test-lb.example.com",
+            "hostname": "test-lb.example.com",
+            "ip": None,
             "ports": [{"name": "http", "port": 80}],
         }
 
@@ -390,6 +394,42 @@ def test_wait_for_load_balancer_service_not_found(self, mock_get_service, ray_ho
 
         assert "LoadBalancer did not become ready after 1 attempts" in str(exc_info.value)
 
+    @patch("ray_provider.hooks.ray.RayHook._is_port_open")
+    def test_check_load_balancer_readiness_ip(self, mock_is_port_open, ray_hook):
+        mock_is_port_open.return_value = True
+        lb_details = {"ip": "192.168.1.1", "hostname": None, "ports": [{"name": "http", "port": 80}]}
+
+        result = ray_hook._check_load_balancer_readiness(lb_details)
+
+        assert result == "192.168.1.1"
+        mock_is_port_open.assert_called_once_with("192.168.1.1", 80)
+
+    @patch("ray_provider.hooks.ray.RayHook._is_port_open")
+    def test_check_load_balancer_readiness_hostname(self, mock_is_port_open, ray_hook):
+        mock_is_port_open.side_effect = [False, True]
+        lb_details = {
+            "ip": "192.168.1.1",
+            "hostname": "example.com",
+            "ports": [{"name": "http", "port": 80}, {"name": "https", "port": 443}],
+        }
+
+        result = ray_hook._check_load_balancer_readiness(lb_details)
+
+        assert result == "example.com"
+        mock_is_port_open.assert_any_call("192.168.1.1", 80)
+        mock_is_port_open.assert_any_call("example.com", 80)
+
+    @patch("ray_provider.hooks.ray.RayHook._is_port_open")
+    def test_check_load_balancer_readiness_not_ready(self, mock_is_port_open, ray_hook):
+        mock_is_port_open.return_value = False
+        lb_details = {"ip": "192.168.1.1", "hostname": "example.com", "ports": [{"name": "http", "port": 80}]}
+
+        result = ray_hook._check_load_balancer_readiness(lb_details)
+
+        assert result is None
+        mock_is_port_open.assert_any_call("192.168.1.1", 80)
+        mock_is_port_open.assert_any_call("example.com", 80)
+
     @patch("ray_provider.hooks.ray.KubernetesHook.get_connection")
     @patch("ray_provider.hooks.ray.KubernetesHook.__init__")
     @patch("ray_provider.hooks.ray.client.AppsV1Api.read_namespaced_daemon_set")
diff --git a/tests/operators/test_ray_operators.py b/tests/operators/test_ray_operators.py
index 910237f..90176de 100644
--- a/tests/operators/test_ray_operators.py
+++ b/tests/operators/test_ray_operators.py
@@ -109,7 +109,7 @@ def test_setup_gpu_driver(self, mock_hook, operator):
 
     def test_setup_load_balancer(self, mock_hook, operator):
         mock_hook.wait_for_load_balancer.return_value = {
-            "ip_or_hostname": "test.example.com",
+            "working_address": "test.example.com",
             "ports": [{"name": "dashboard", "port": 8265}],
         }
         context = {"task_instance": Mock()}
diff --git a/tests/test_dag_example.py b/tests/test_dag_example.py
new file mode 100644
index 0000000..6348205
--- /dev/null
+++ b/tests/test_dag_example.py
@@ -0,0 +1,66 @@
+import os
+from pathlib import Path
+
+import pytest
+from airflow.models import Connection, DagBag
+from airflow.utils.db import create_default_connections
+from airflow.utils.session import create_session
+
+# Correctly construct the example DAGs directory path
+EXAMPLE_DAGS_DIR = Path(__file__).parent.parent / "example_dags"
+print(f"EXAMPLE_DAGS_DIR: {EXAMPLE_DAGS_DIR}")
+
+
+def get_dags(dag_folder=None):
+    dag_bag = (
+        DagBag(dag_folder=str(dag_folder), include_examples=False) if dag_folder else DagBag(include_examples=False)
+    )
+
+    def strip_path_prefix(path):
+        return os.path.relpath(path, os.environ.get("AIRFLOW_HOME", ""))
+
+    dags_info = [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()]
+    for dag_id, dag, fileloc in dags_info:
+        print(f"DAG ID: {dag_id}, File Location: {fileloc}")
+    return dags_info
+
+
+@pytest.fixture(scope="module")
+def setup_airflow_db():
+    os.system("airflow db init")
+    conn_id = "ray_conn"
+    # Explicitly create the tables if necessary
+    create_default_connections()
+
+    with create_session() as session:
+        conn_exists = session.query(Connection).filter(Connection.conn_id == conn_id).first()
+        if conn_exists:
+            session.delete(conn_exists)
+            session.commit()
+        conn = Connection(
+            conn_id=conn_id,
+            conn_type="ray",
+            extra={
+                "kube_config_path": os.environ.get("KUBECONFIG"),
+                "namespace": "ray",
+                "cluster_context": None,  # Set to None as we don't know how to get cluster context for a kind cluster
+            },
+        )
+        session.add(conn)
+        session.commit()
+
+
+dags = get_dags(EXAMPLE_DAGS_DIR)
+print(f"Discovered DAGs: {dags}")
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("dag_id,dag,fileloc", dags, ids=[x[2] for x in dags])
+def test_dag_runs(setup_airflow_db, dag_id, dag, fileloc):
+    print(f"Testing DAG: {dag_id}, located at: {fileloc}")
+    assert dag is not None, f"DAG {dag_id} not found!"
+
+    try:
+        dag.test()
+    except Exception as e:
+        pytest.fail(f"Error running DAG {dag_id}: {e}")