Skip to content

Commit

Permalink
Add missing DbtSourceGcpCloudRunJobOperator in module `cosmos.opera…
Browse files Browse the repository at this point in the history
…tors.gcp_cloud_run_job` (astronomer#1290)

This PR adds a missing attribute, `DbtSourceGcpCloudRunJobOperator`, in
`cosmos.operators.gcp_cloud_run_job.py` to be able to use the
source_rendering_behavior params in the `cosmos.config.RenderConfig` class
with the new `GCP_CLOUD_RUN_JOB` execution mode (available in 1.7.0) .

Closes: astronomer#1276

---------

Co-authored-by: anai-s <[email protected]>
  • Loading branch information
anai-s and anai-s authored Nov 11, 2024
1 parent 1840c32 commit a5de8b4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
10 changes: 10 additions & 0 deletions cosmos/operators/gcp_cloud_run_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtSourceMixin,
DbtTestMixin,
)

Expand Down Expand Up @@ -135,6 +136,15 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtSourceGcpCloudRunJobOperator(DbtSourceMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core source freshness command.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core run command.
Expand Down
16 changes: 14 additions & 2 deletions tests/operators/test_gcp_cloud_run_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DbtRunOperationGcpCloudRunJobOperator,
DbtSeedGcpCloudRunJobOperator,
DbtSnapshotGcpCloudRunJobOperator,
DbtSourceGcpCloudRunJobOperator,
DbtTestGcpCloudRunJobOperator,
)

Expand Down Expand Up @@ -171,12 +172,13 @@ def test_dbt_gcp_cloud_run_job_build_command():
"seed": DbtSeedGcpCloudRunJobOperator(**BASE_KWARGS),
"build": DbtBuildGcpCloudRunJobOperator(**BASE_KWARGS),
"snapshot": DbtSnapshotGcpCloudRunJobOperator(**BASE_KWARGS),
"source": DbtSourceGcpCloudRunJobOperator(**BASE_KWARGS),
"run-operation": DbtRunOperationGcpCloudRunJobOperator(macro_name="some-macro", **BASE_KWARGS),
}

for command_name, command_operator in result_map.items():
command_operator.build_command(context=MagicMock(), cmd_flags=MagicMock())
if command_name != "run-operation":
if command_name not in ("run-operation", "source"):
assert command_operator.command == [
"dbt",
command_name,
Expand All @@ -185,7 +187,7 @@ def test_dbt_gcp_cloud_run_job_build_command():
"start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
"--no-version-check",
]
else:
elif command_name == "run-operation":
assert command_operator.command == [
"dbt",
command_name,
Expand All @@ -195,6 +197,16 @@ def test_dbt_gcp_cloud_run_job_build_command():
"start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
"--no-version-check",
]
else:
assert command_operator.command == [
"dbt",
command_name,
"freshness",
"--vars",
"end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
"start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
"--no-version-check",
]


@skip_on_empty_operator
Expand Down

0 comments on commit a5de8b4

Please sign in to comment.