From a5de8b4dc5184b46ca8358025a01b5107c747ffc Mon Sep 17 00:00:00 2001 From: anai-s <92357426+anai-s@users.noreply.github.com> Date: Mon, 11 Nov 2024 16:27:32 +0100 Subject: [PATCH] Add missing `DbtSourceGcpCloudRunJobOperator` in module `cosmos.operators.gcp_cloud_run_job` (#1290) This PR adds a missing attribute, `DbtSourceGcpCloudRunJobOperator`, in `cosmos.operators.gcp_cloud_run_job.py` to be able to use the source_rendering_behavior params in the `cosmos.config.RenderConfig` class with the new `GCP_CLOUD_RUN_JOB` execution mode (available in 1.7.0) . Closes: #1276 --------- Co-authored-by: anai-s --- cosmos/operators/gcp_cloud_run_job.py | 10 ++++++++++ tests/operators/test_gcp_cloud_run_job.py | 16 ++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/cosmos/operators/gcp_cloud_run_job.py b/cosmos/operators/gcp_cloud_run_job.py index 6b4ded49a..76570d56a 100644 --- a/cosmos/operators/gcp_cloud_run_job.py +++ b/cosmos/operators/gcp_cloud_run_job.py @@ -15,6 +15,7 @@ DbtRunOperationMixin, DbtSeedMixin, DbtSnapshotMixin, + DbtSourceMixin, DbtTestMixin, ) @@ -135,6 +136,15 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) +class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core source freshness command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator): """ Executes a dbt core run command. diff --git a/tests/operators/test_gcp_cloud_run_job.py b/tests/operators/test_gcp_cloud_run_job.py index 1582456d6..08b7ba999 100644 --- a/tests/operators/test_gcp_cloud_run_job.py +++ b/tests/operators/test_gcp_cloud_run_job.py @@ -16,6 +16,7 @@ DbtRunOperationGcpCloudRunJobOperator, DbtSeedGcpCloudRunJobOperator, DbtSnapshotGcpCloudRunJobOperator, + DbtSourceGcpCloudRunJobOperator, DbtTestGcpCloudRunJobOperator, ) @@ -171,12 +172,13 @@ def test_dbt_gcp_cloud_run_job_build_command(): "seed": DbtSeedGcpCloudRunJobOperator(**BASE_KWARGS), "build": DbtBuildGcpCloudRunJobOperator(**BASE_KWARGS), "snapshot": DbtSnapshotGcpCloudRunJobOperator(**BASE_KWARGS), + "source": DbtSourceGcpCloudRunJobOperator(**BASE_KWARGS), "run-operation": DbtRunOperationGcpCloudRunJobOperator(macro_name="some-macro", **BASE_KWARGS), } for command_name, command_operator in result_map.items(): command_operator.build_command(context=MagicMock(), cmd_flags=MagicMock()) - if command_name != "run-operation": + if command_name not in ("run-operation", "source"): assert command_operator.command == [ "dbt", command_name, @@ -185,7 +187,7 @@ def test_dbt_gcp_cloud_run_job_build_command(): "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", "--no-version-check", ] - else: + elif command_name == "run-operation": assert command_operator.command == [ "dbt", command_name, @@ -195,6 +197,16 @@ def test_dbt_gcp_cloud_run_job_build_command(): "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", "--no-version-check", ] + else: + assert command_operator.command == [ + "dbt", + command_name, + "freshness", + "--vars", + "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" + "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", + "--no-version-check", + ] @skip_on_empty_operator