Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create single virtualenv when DbtVirtualenvBaseOperator has virtualenv_dir=None and is_virtualenv_dir_temporary=True #1200

Merged
merged 13 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

if TYPE_CHECKING:
from airflow.utils.context import Context

from dbt.cli.main import dbtRunnerResult

Check warning on line 32 in cosmos/operators/virtualenv.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/virtualenv.py#L32

Added line #L32 was not covered by tests

PY_INTERPRETER = "python3"
LOCK_FILENAME = "cosmos_virtualenv.lock"
Expand Down Expand Up @@ -77,49 +77,54 @@
self.virtualenv_dir = virtualenv_dir
self.is_virtualenv_dir_temporary = is_virtualenv_dir_temporary
self.max_retries_lock = settings.virtualenv_max_retries_lock
self._py_bin: str | None = None
super().__init__(**kwargs)
if not self.py_requirements:
self.log.error("Cosmos virtualenv operators require the `py_requirements` parameter")

def run_subprocess(self, command: list[str], env: dict[str, str], cwd: str) -> FullOutputSubprocessResult:
# No virtualenv_dir set, so create a temporary virtualenv
if self.virtualenv_dir is None or self.is_virtualenv_dir_temporary:
self.log.info("Creating temporary virtualenv")
with TemporaryDirectory(prefix="cosmos-venv") as tempdir:
self.virtualenv_dir = Path(tempdir)
py_bin = self._prepare_virtualenv()
dbt_bin = str(Path(py_bin).parent / "dbt")
command[0] = dbt_bin # type: ignore
subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command(
command=command,
env=env,
cwd=cwd,
output_encoding=self.output_encoding,
)
return subprocess_result

# Use a reusable virtualenv
self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists")
while not self._is_lock_available() and self.max_retries_lock:
logger.info("Waiting for virtualenv lock to be released")
time.sleep(1)
self.max_retries_lock -= 1

self.log.info(f"Acquiring the virtualenv lock")
self._acquire_venv_lock()
py_bin = self._prepare_virtualenv()
dbt_bin = str(Path(py_bin).parent / "dbt")
command[0] = dbt_bin # type: ignore
self.log.info("Trying to run the command:\n %s\nFrom %s", command, cwd)
if self._py_bin is not None:
self.log.info(f"Using Python binary from virtualenv: {self._py_bin}")
command[0] = str(Path(self._py_bin).parent / "dbt")
subprocess_result = self.subprocess_hook.run_command(
command=command,
env=env,
cwd=cwd,
output_encoding=self.output_encoding,
)
self.log.info("Releasing virtualenv lock")
self._release_venv_lock()
self.log.info(subprocess_result.output)
return subprocess_result

def run_command(
    self,
    cmd: list[str],
    env: dict[str, str | bytes | os.PathLike[Any]],
    context: Context,
) -> FullOutputSubprocessResult | dbtRunnerResult:
    """
    Prepare the virtualenv once, then delegate the dbt command to the parent class.

    Two paths are taken depending on the operator configuration:

    * Temporary virtualenv: when ``virtualenv_dir`` is ``None`` or
      ``is_virtualenv_dir_temporary`` is truthy, a ``cosmos-venv``-prefixed
      ``TemporaryDirectory`` is created, ``virtualenv_dir`` is pointed at it,
      and the parent ``run_command`` is invoked while that directory exists.
    * Reusable virtualenv: otherwise the lock is polled once per second,
      then acquired, the virtualenv is prepared, and the parent
      ``run_command`` is invoked. The lock release is always attempted on
      the way out, including when an earlier step raised.

    In both paths the value returned by ``_prepare_virtualenv`` is stored in
    ``self._py_bin`` before delegating.

    :param cmd: The command to run, forwarded unchanged to the parent class.
    :param env: Environment variables, forwarded unchanged to the parent class.
    :param context: The Airflow task context, forwarded unchanged to the parent class.
    :return: Whatever the parent ``run_command`` returns.
    """
    wants_temporary_venv = self.virtualenv_dir is None or self.is_virtualenv_dir_temporary

    if wants_temporary_venv:
        self.log.info("Creating temporary virtualenv")
        # Returning from inside the ``with`` keeps the directory alive for the
        # whole parent call; it is removed when the context manager exits.
        with TemporaryDirectory(prefix="cosmos-venv") as scratch_dir:
            self.virtualenv_dir = Path(scratch_dir)
            self._py_bin = self._prepare_virtualenv()
            return super().run_command(cmd, env, context)

    try:
        self.log.info(f"Checking if the virtualenv lock {str(self._lock_file)} exists")
        # NOTE(review): the retry budget is stored on the instance and is
        # decremented in place, so it is not reset between calls.
        # NOTE(review): the coverage annotation on this change reported the
        # waiting loop as not covered by tests - confirm before relying on it.
        while True:
            if self._is_lock_available() or not self.max_retries_lock:
                break
            logger.info("Waiting for virtualenv lock to be released")
            time.sleep(1)
            self.max_retries_lock -= 1

        self.log.info("Acquiring the virtualenv lock")
        self._acquire_venv_lock()
        self._py_bin = self._prepare_virtualenv()
        return super().run_command(cmd, env, context)
    finally:
        self.log.info("Releasing virtualenv lock")
        self._release_venv_lock()

def clean_dir_if_temporary(self) -> None:
"""
Delete the virtualenv directory if it is temporary.
Expand Down
29 changes: 20 additions & 9 deletions tests/operators/test_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,20 @@ def test_run_command_without_virtualenv_dir(
assert dbt_deps["command"][0] == dbt_cmd["command"][0]
assert dbt_deps["command"][1] == "deps"
assert dbt_cmd["command"][1] == "do-something"
assert mock_execute.call_count == 4
assert mock_execute.call_count == 2
virtualenv_call, pip_install_call = mock_execute.call_args_list
assert "python" in virtualenv_call[0][0][0]
assert virtualenv_call[0][0][1] == "-m"
assert virtualenv_call[0][0][2] == "virtualenv"
assert "pip" in pip_install_call[0][0][0]
assert pip_install_call[0][0][1] == "install"
cosmos_venv_dirs = [
f for f in os.listdir("/tmp") if os.path.isdir(os.path.join("/tmp", f)) and f.startswith("cosmos-venv")
]
assert len(cosmos_venv_dirs) == 0


@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock")
@patch("airflow.utils.python_virtualenv.execute_in_subprocess")
@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.calculate_openlineage_events_completes")
@patch("cosmos.operators.virtualenv.DbtLocalBaseOperator.store_compiled_sql")
Expand All @@ -93,6 +104,8 @@ def test_run_command_with_virtualenv_dir(
mock_store_compiled_sql,
mock_calculate_openlineage_events_completes,
mock_execute,
mock_release_venv_lock,
caplog,
):
mock_get_connection.return_value = Connection(
conn_id="fake_conn",
Expand Down Expand Up @@ -124,6 +137,10 @@ def test_run_command_with_virtualenv_dir(
dbt_cmd = run_command_args[1].kwargs
assert dbt_deps["command"][0] == "mock-venv/bin/dbt"
assert dbt_cmd["command"][0] == "mock-venv/bin/dbt"
caplog.text.count("Waiting for virtualenv lock to be released") == 2
assert mock_release_venv_lock.call_count == 1
cosmos_venv_dirs = [f for f in os.listdir() if f == "mock-venv"]
assert len(cosmos_venv_dirs) == 1


def test_virtualenv_operator_append_env_is_true_by_default():
Expand Down Expand Up @@ -184,13 +201,7 @@ def test_on_kill(mock_clean_dir_if_temporary):


@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator.subprocess_hook")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._release_venv_lock")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._prepare_virtualenv")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._acquire_venv_lock")
@patch("cosmos.operators.virtualenv.DbtVirtualenvBaseOperator._is_lock_available", side_effect=[False, False, True])
def test_run_subprocess_waits_for_lock(
mock_is_lock_available, mock_acquire, mock_prepare, mock_release, mock_subprocess_hook, tmpdir, caplog
):
def test_run_subprocess(mock_subprocess_hook, tmpdir, caplog):
venv_operator = ConcreteDbtVirtualenvBaseOperator(
profile_config=profile_config,
project_dir="./dev/dags/dbt/jaffle_shop",
Expand All @@ -199,7 +210,7 @@ def test_run_subprocess_waits_for_lock(
virtualenv_dir=tmpdir,
)
venv_operator.run_subprocess(["dbt", "run"], {}, "./dev/dags/dbt/jaffle_shop")
assert caplog.text.count("Waiting for virtualenv lock to be released") == 2
assert len(mock_subprocess_hook.run_command.call_args_list) == 1


@patch("cosmos.operators.local.DbtLocalBaseOperator.execute", side_effect=ValueError)
Expand Down
Loading