diff --git a/cosmos/operators/virtualenv.py b/cosmos/operators/virtualenv.py index c8b31f6af..ebc8bcf3b 100644 --- a/cosmos/operators/virtualenv.py +++ b/cosmos/operators/virtualenv.py @@ -28,8 +28,8 @@ ) if TYPE_CHECKING: - from airflow.utils.context import Context - + from airflow.utils.context import Context # pragma: no cover + from dbt.cli.main import dbtRunnerResult # pragma: no cover PY_INTERPRETER = "python3" LOCK_FILENAME = "cosmos_virtualenv.lock" @@ -77,49 +77,54 @@ def __init__( self.virtualenv_dir = virtualenv_dir self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary self.max_retries_lock = settings.virtualenv_max_retries_lock + self._py_bin: str | None = None super().__init__(**kwargs) if not self.py_requirements: self.log.error("Cosmos virtualenv operators require the `py_requirements` parameter") def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult: - # No virtualenv_dir set, so create a temporary virtualenv - if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary: - self.log.info("Creating temporary virtualenv") - with TemporaryDirectory(prefix="cosmos-venv") as tempdir: - self.virtualenv_dir = Path(tempdir) - py_bin = self._prepare_virtualenv() - dbt_bin = str(Path(py_bin).parent / "dbt") - command[0] = dbt_bin # type: ignore - subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command( - command=command, - env=env, - cwd=cwd, - output_encoding=self.output_encoding, - ) - return subprocess_result - - # Use a reusable virtualenv - self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists") - while not self._is_lock_available() and self.max_retries_lock: - logger.info("Waiting for virtualenv lock to be released") - time.sleep(1) - self.max_retries_lock -= 1 - - self.log.info(f"Acquiring the virtualenv lock") - self._acquire_venv_lock() - py_bin = self._prepare_virtualenv() - dbt_bin = str(Path(py_bin).parent / "dbt") - command[0] = dbt_bin # type: ignore + self.log.info("Trying to run the command:\n %s\nFrom %s", command, cwd) + if self._py_bin is not None: + self.log.info(f"Using Python binary from virtualenv: {self._py_bin}") + command[0] = str(Path(self._py_bin).parent / "dbt") subprocess_result = self.subprocess_hook.run_command( command=command, env=env, cwd=cwd, output_encoding=self.output_encoding, ) - self.log.info("Releasing virtualenv lock") - self._release_venv_lock() + self.log.info(subprocess_result.output) return subprocess_result + def run_command( + self, + cmd: list[str], + env: dict[str, str | bytes | os.PathLike[Any]], + context: Context, + ) -> FullOutputSubprocessResult | dbtRunnerResult: + # No virtualenv_dir set, so create a temporary virtualenv + if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary: + self.log.info("Creating temporary virtualenv") + with TemporaryDirectory(prefix="cosmos-venv") as tempdir: + self.virtualenv_dir = Path(tempdir) + self._py_bin = self._prepare_virtualenv() + return super().run_command(cmd, env, context) + + try: + self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists") + while not self._is_lock_available() and self.max_retries_lock: + logger.info("Waiting for virtualenv lock to be released") + time.sleep(1) + self.max_retries_lock -= 1 + + self.log.info("Acquiring the virtualenv lock") + self._acquire_venv_lock() + self._py_bin = self._prepare_virtualenv() + return super().run_command(cmd, env, context) + finally: + self.log.info("Releasing virtualenv lock") + self._release_venv_lock() + def clean_dir_if_temporary(self) -> None: """ Delete the virtualenv directory if it is temporary. diff --git a/tests/operators/test_virtualenv.py b/tests/operators/test_virtualenv.py index 0a7128626..f0e7c1a22 100644 --- a/tests/operators/test_virtualenv.py +++ b/tests/operators/test_virtualenv.py @@ -77,9 +77,22 @@ def test_run_command_without_virtualenv_dir( assert dbt_deps["command"][0] == dbt_cmd["command"][0] assert dbt_deps["command"][1] == "deps" assert dbt_cmd["command"][1] == "do-something" - assert mock_execute.call_count == 4 - - + assert mock_execute.call_count == 2 + virtualenv_call, pip_install_call = mock_execute.call_args_list + assert "python" in virtualenv_call[0][0][0] + assert virtualenv_call[0][0][1] == "-m" + assert virtualenv_call[0][0][2] == "virtualenv" + assert "pip" in pip_install_call[0][0][0] + assert pip_install_call[0][0][1] == "install" + cosmos_venv_dirs = [ + f for f in os.listdir("/tmp") if os.path.isdir(os.path.join("/tmp", f)) and f.startswith("cosmos-venv") + ] + assert len(cosmos_venv_dirs) == 0 + + +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available") +@patch("time.sleep") +@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock") @patch("airflow.utils.python_virtualenv.execute_in_subprocess") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes") @patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql") @@ -93,7 +106,12 @@ def test_run_command_with_virtualenv_dir( mock_store_compiled_sql, mock_calculate_openlineage_events_completes, mock_execute, + mock_release_venv_lock, + mock_sleep, + mock_is_lock_available, + caplog, ): + mock_is_lock_available.side_effect = [False, False, True] mock_get_connection.return_value = Connection( conn_id="fake_conn", conn_type="postgres", @@ -124,6 +142,12 @@ def test_run_command_with_virtualenv_dir( dbt_cmd = run_command_args[1].kwargs assert dbt_deps["command"][0] == "mock-venv/bin/dbt" assert dbt_cmd["command"][0] == "mock-venv/bin/dbt" + assert caplog.text.count("Waiting for virtualenv lock to be released") == 2 + assert mock_sleep.call_count == 2 + assert mock_is_lock_available.call_count == 3 + assert mock_release_venv_lock.call_count == 1 + cosmos_venv_dirs = [f for f in os.listdir() if f == "mock-venv"] + assert len(cosmos_venv_dirs) == 1 def test_virtualenv_operator_append_env_is_true_by_default(): @@ -184,13 +208,7 @@ def test_on_kill(mock_clean_dir_if_temporary): @patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.subprocess_hook") -@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock") -@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._prepare_virtualenv") -@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._acquire_venv_lock") -@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available", side_effect=[False, False, True]) -def test_run_subprocess_waits_for_lock( - mock_is_lock_available, mock_acquire, mock_prepare, mock_release, mock_subprocess_hook, tmpdir, caplog -): +def test_run_subprocess(mock_subprocess_hook, tmpdir, caplog): venv_operator = ConcreteDbtVirtualenvBaseOperator( profile_config=profile_config, project_dir="./dev/dags/dbt/jaffle_shop", @@ -199,7 +217,7 @@ def test_run_subprocess_waits_for_lock( virtualenv_dir=tmpdir, ) venv_operator.run_subprocess(["dbt", "run"], {}, "./dev/dags/dbt/jaffle_shop") - assert caplog.text.count("Waiting for virtualenv lock to be released") == 2 + assert len(mock_subprocess_hook.run_command.call_args_list) == 1 @patch("cosmos.operators.local.DbtLocalBaseOperator.execute", side_effect=ValueError)