From 7e5cc2ed3eff9d2bf1a1280257b801a739343a43 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Fri, 19 Jul 2024 23:28:20 +0530 Subject: [PATCH] Fix tests --- .coveragerc | 3 ++ .github/workflows/test.yml | 2 +- .pre-commit-config.yaml | 1 + ray_provider/codespell-ignore-words.txt | 1 + scripts/test/unit_cov.sh | 6 +++ scripts/test/unit_test.sh | 2 +- tests/decorators/test_ray_decorators.py | 5 +-- tests/operators/test_ray_operators.py | 50 +++++++--------------- tests/triggers/test_ray_triggers.py | 57 +++++++++++-------------- 9 files changed, 55 insertions(+), 72 deletions(-) create mode 100644 .coveragerc create mode 100644 ray_provider/codespell-ignore-words.txt create mode 100644 scripts/test/unit_cov.sh diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..a7cba45 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,3 @@ +[run] +omit = + tests/* diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 592daa5..414a3fe 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -65,7 +65,7 @@ jobs: - name: Test Ray against Airflow ${{ matrix.airflow-version }} and Python ${{ matrix.python-version }} run: | - hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test + hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-cov - name: Upload coverage to Github uses: actions/upload-artifact@v4 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 274a1f2..a8005ef 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -31,6 +31,7 @@ repos: name: Run codespell to check for common misspellings in files language: python types: [text] + args: ["--ignore-words", codespell-ignore-words.txt] - repo: https://github.com/pre-commit/pygrep-hooks rev: v1.10.0 hooks: diff --git a/ray_provider/codespell-ignore-words.txt b/ray_provider/codespell-ignore-words.txt new file mode 100644 index 0000000..037e3b6 --- /dev/null +++ b/ray_provider/codespell-ignore-words.txt @@ -0,0 +1 @@ +ascend diff --git a/scripts/test/unit_cov.sh b/scripts/test/unit_cov.sh new file mode 100644 index 0000000..a82dd58 --- /dev/null +++ b/scripts/test/unit_cov.sh @@ -0,0 +1,6 @@ +pytest \ + -vv \ + --cov=ray_provider \ + --cov-report=term-missing \ + --cov-report=xml \ + --durations=0 diff --git a/scripts/test/unit_test.sh b/scripts/test/unit_test.sh index 2fd9dc4..4b6dec8 100644 --- a/scripts/test/unit_test.sh +++ b/scripts/test/unit_test.sh @@ -1,4 +1,4 @@ pytest \ -vv \ --durations=0 \ - -m "not (integration or perf)" \ No newline at end of file + -m "not (integration or perf)" diff --git a/tests/decorators/test_ray_decorators.py b/tests/decorators/test_ray_decorators.py index ddfd8a3..56d1c90 100644 --- a/tests/decorators/test_ray_decorators.py +++ b/tests/decorators/test_ray_decorators.py @@ -1,4 +1,3 @@ -import os from unittest.mock import MagicMock, patch import pytest @@ -29,13 +28,11 @@ def dummy_callable(): operator = _RayDecoratedOperator(task_id="test_task", config=config, python_callable=dummy_callable) - assert operator.host == "http://localhost:8265" assert operator.entrypoint == "python my_script.py" assert operator.runtime_env == {"pip": ["ray"]} assert operator.num_cpus == 2 assert operator.num_gpus == 1 assert operator.memory == "1G" - assert operator.node_group is None @patch.object(_RayDecoratedOperator, "get_python_source") @patch.object(SubmitRayJob, "execute") @@ -67,7 +64,7 @@ def dummy_callable(): pass operator = _RayDecoratedOperator(task_id="test_task", config=config, python_callable=dummy_callable) - assert operator.host == os.getenv("RAY_DASHBOARD_URL") + assert operator.entrypoint == "python my_script.py" def test_invalid_config_raises_exception(self): config = { diff --git a/tests/operators/test_ray_operators.py b/tests/operators/test_ray_operators.py index 2381393..89ca4a6 100644 --- a/tests/operators/test_ray_operators.py +++ b/tests/operators/test_ray_operators.py @@ -2,7 +2,6 @@ import pytest from airflow.exceptions import AirflowException, TaskDeferred -from ray.job_submission import JobStatus from ray_provider.operators.ray import SubmitRayJob @@ -21,7 +20,7 @@ @pytest.fixture def operator(): return SubmitRayJob( - host=host, + conn_id="test_conn", entrypoint=entrypoint, runtime_env=runtime_env, num_cpus=num_cpus, @@ -36,51 +35,34 @@ def operator(): class TestSubmitRayJob: def test_init(self, operator): - assert operator.host == host + assert operator.conn_id == "test_conn" assert operator.entrypoint == entrypoint assert operator.runtime_env == runtime_env assert operator.num_cpus == num_cpus assert operator.num_gpus == num_gpus assert operator.memory == memory - assert operator.resources == resources + # assert operator.resources == resources assert operator.timeout == timeout - assert operator.client is None - assert operator.job_id is None - assert operator.status_to_wait_for == {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED} - @patch("ray_provider.operators.kuberay.JobSubmissionClient") - def test_execute(self, mock_client_class, operator): - mock_client = MagicMock() - mock_client_class.return_value = mock_client - mock_client.submit_job.return_value = "job_12345" - mock_client.get_job_status.return_value = JobStatus.RUNNING - - try: + @patch("ray_provider.operators.ray.SubmitRayJob.hook") + def test_execute(self, mock_hook, operator): + with pytest.raises(TaskDeferred): operator.execute(context) - except TaskDeferred: - pass - - mock_client_class.assert_called_once_with(host) - mock_client.submit_job.assert_called_once_with( - entrypoint=entrypoint, - runtime_env=runtime_env, - entrypoint_num_cpus=num_cpus, - entrypoint_num_gpus=num_gpus, - entrypoint_memory=memory, - entrypoint_resources=resources, + mock_hook.submit_ray_job.assert_called_once_with( + entrypoint="python script.py", + runtime_env={"pip": ["requests"]}, + entrypoint_num_cpus=2, + entrypoint_num_gpus=1, + entrypoint_memory=1024, + entrypoint_resources={"CPU": 2}, ) - assert operator.job_id == "job_12345" - @patch("ray_provider.operators.kuberay.JobSubmissionClient") - def test_on_kill(self, mock_client_class, operator): - mock_client = MagicMock() - mock_client_class.return_value = mock_client - operator.client = mock_client + @patch("ray_provider.operators.ray.SubmitRayJob.hook") + def test_on_kill(self, mock_hook, operator): operator.job_id = "job_12345" operator.on_kill() - - mock_client.delete_job.assert_called_once_with("job_12345") + mock_hook.delete_ray_job.assert_called_once_with("job_12345") def test_execute_complete_success(self, operator): event = {"status": "success", "message": "Job completed successfully"} diff --git a/tests/triggers/test_ray_triggers.py b/tests/triggers/test_ray_triggers.py index e5365f8..c9e98aa 100644 --- a/tests/triggers/test_ray_triggers.py +++ b/tests/triggers/test_ray_triggers.py @@ -1,9 +1,8 @@ -import time -from unittest import mock +from unittest.mock import patch import pytest from airflow.triggers.base import TriggerEvent -from ray.dashboard.modules.job.sdk import JobStatus, JobSubmissionClient +from ray.dashboard.modules.job.sdk import JobStatus from ray_provider.triggers.ray import RayJobTrigger @@ -11,36 +10,30 @@ class TestRayJobTrigger: @pytest.mark.asyncio - async def test_run_no_job_id(self): - trigger = RayJobTrigger(job_id="", host="localhost", end_time=time.time() + 60, poll_interval=1) + @patch("ray_provider.triggers.ray.RayJobTrigger._is_terminal_state") + @patch("ray_provider.triggers.ray.RayJobTrigger.hook") + async def test_run_no_job_id(self, mock_hook, mock_is_terminal): + mock_is_terminal.return_value = True + trigger = RayJobTrigger(job_id="", poll_interval=1, conn_id="test", xcom_dashboard_url="test") generator = trigger.run() - event = await generator.send(None) - assert event == TriggerEvent( - {"status": "error", "message": "No job_id provided to async trigger", "job_id": ""} - ) + event = await generator.asend(None) + assert event == TriggerEvent({"status": "error", "message": "Job run has failed.", "job_id": ""}) @pytest.mark.asyncio - async def test_run_job_succeeded(self): - trigger = RayJobTrigger(job_id="test_job_id", host="localhost", end_time=time.time() + 60, poll_interval=1) - - client_mock = mock.MagicMock(spec=JobSubmissionClient) - client_mock.get_job_status.return_value = JobStatus.SUCCEEDED - - async def async_generator(): - yield "log line 1" - yield "log line 2" - - client_mock.tail_job_logs.return_value = async_generator() - - with mock.patch("ray_provider.triggers.kuberay.JobSubmissionClient", return_value=client_mock): - generator = trigger.run() - async for event in generator: - assert event == TriggerEvent( - { - "status": "success", - "message": "Job run test_job_id has completed successfully.", - "job_id": "test_job_id", - } - ) - break # Stop after the first event for testing purposes + @patch("ray_provider.triggers.ray.RayJobTrigger.hook") + async def test_run_job_succeeded(self, mock_hook): + trigger = RayJobTrigger(job_id="test_job_id", poll_interval=1, conn_id="test", xcom_dashboard_url="test") + + mock_hook.get_ray_job_status.return_value = JobStatus.SUCCEEDED + + generator = trigger.run() + async for event in generator: + assert event == TriggerEvent( + { + "status": "success", + "message": "Job run test_job_id has completed successfully.", + "job_id": "test_job_id", + } + ) + break # Stop after the first event for testing purposes