Skip to content

Commit

Permalink
Change operators to log dbt commands output as opposed to recordi…
Browse files Browse the repository at this point in the history
…ng to XCom (#513)

All Cosmos operators were dumping unnecessary data to XCom.
Change the behavior to log the information as opposed to polluting XCom.

Closes: #304
  • Loading branch information
tatiana authored Sep 6, 2023
1 parent 6c3d673 commit c7a203a
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 76 deletions.
26 changes: 9 additions & 17 deletions cosmos/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def __init__(
def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}") # type: ignore[has-type]
return super().execute(context)
result = super().execute(context)
logger.info(result)

def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
# For the first round, we're going to assume that the command is dbt
Expand All @@ -52,6 +53,9 @@ def build_command(self, context: Context, cmd_flags: list[str] | None = None) ->
self.environment = {**env_vars, **self.environment} # type: ignore[has-type]
self.command = dbt_cmd

def execute(self, context: Context) -> None:
self.build_and_run_cmd(context=context)


class DbtLSDockerOperator(DbtDockerBaseOperator):
"""
Expand All @@ -64,9 +68,6 @@ def __init__(self, **kwargs: str) -> None:
super().__init__(**kwargs)
self.base_cmd = ["ls"]

def execute(self, context: Context) -> Any:
return self.build_and_run_cmd(context=context)


class DbtSeedDockerOperator(DbtDockerBaseOperator):
"""
Expand All @@ -89,9 +90,9 @@ def add_cmd_flags(self) -> list[str]:

return flags

def execute(self, context: Context) -> Any:
def execute(self, context: Context) -> None:
cmd_flags = self.add_cmd_flags()
return self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)
self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)


class DbtSnapshotDockerOperator(DbtDockerBaseOperator):
Expand All @@ -106,9 +107,6 @@ def __init__(self, **kwargs: str) -> None:
super().__init__(**kwargs)
self.base_cmd = ["snapshot"]

def execute(self, context: Context) -> Any:
return self.build_and_run_cmd(context=context)


class DbtRunDockerOperator(DbtDockerBaseOperator):
"""
Expand All @@ -122,9 +120,6 @@ def __init__(self, **kwargs: str) -> None:
super().__init__(**kwargs)
self.base_cmd = ["run"]

def execute(self, context: Context) -> Any:
return self.build_and_run_cmd(context=context)


class DbtTestDockerOperator(DbtDockerBaseOperator):
"""
Expand All @@ -139,9 +134,6 @@ def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwar
# as of now, on_warning_callback in docker executor does nothing
self.on_warning_callback = on_warning_callback

def execute(self, context: Context) -> Any:
return self.build_and_run_cmd(context=context)


class DbtRunOperationDockerOperator(DbtDockerBaseOperator):
"""
Expand All @@ -168,6 +160,6 @@ def add_cmd_flags(self) -> list[str]:
flags.append(yaml.dump(self.args))
return flags

def execute(self, context: Context) -> Any:
def execute(self, context: Context) -> None:
cmd_flags = self.add_cmd_flags()
return self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)
self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)
26 changes: 9 additions & 17 deletions cosmos/operators/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ def build_env_args(self, env: dict[str, str | bytes | PathLike[Any]]) -> None:
def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
self.build_kube_args(context, cmd_flags)
self.log.info(f"Running command: {self.arguments}")
return super().execute(context)
result = super().execute(context)
logger.info(result)

def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None) -> None:
# For the first round, we're going to assume that the command is dbt
Expand All @@ -71,6 +72,9 @@ def build_kube_args(self, context: Context, cmd_flags: list[str] | None = None)
self.build_env_args(env_vars)
self.arguments = dbt_cmd

def execute(self, context: Context) -> None:
self.build_and_run_cmd(context=context)


class DbtLSKubernetesOperator(DbtKubernetesBaseOperator):
"""
Expand All @@ -83,9 +87,6 @@ def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["ls"]

def execute(self, context: Context) -> Any:
return self.build_and_run_cmd(context=context)


class DbtSeedKubernetesOperator(DbtKubernetesBaseOperator):
"""
Expand All @@ -108,9 +109,9 @@ def add_cmd_flags(self) -> list[str]:

return flags

def execute(self, context: Context) -> Any:
def execute(self, context: Context) -> None:
cmd_flags = self.add_cmd_flags()
return self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)
self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)


class DbtSnapshotKubernetesOperator(DbtKubernetesBaseOperator):
Expand All @@ -125,9 +126,6 @@ def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["snapshot"]

def execute(self, context: Context) -> Any:
return self.build_and_run_cmd(context=context)


class DbtRunKubernetesOperator(DbtKubernetesBaseOperator):
"""
Expand All @@ -141,9 +139,6 @@ def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["run"]

def execute(self, context: Context) -> Any:
return self.build_and_run_cmd(context=context)


class DbtTestKubernetesOperator(DbtKubernetesBaseOperator):
"""
Expand All @@ -158,9 +153,6 @@ def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwar
# as of now, on_warning_callback in kubernetes executor does nothing
self.on_warning_callback = on_warning_callback

def execute(self, context: Context) -> Any:
return self.build_and_run_cmd(context=context)


class DbtRunOperationKubernetesOperator(DbtKubernetesBaseOperator):
"""
Expand All @@ -187,6 +179,6 @@ def add_cmd_flags(self) -> list[str]:
flags.append(yaml.dump(self.args))
return flags

def execute(self, context: Context) -> Any:
def execute(self, context: Context) -> None:
cmd_flags = self.add_cmd_flags()
return self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)
self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)
47 changes: 7 additions & 40 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
)
from cosmos.dbt.parser.output import (
extract_log_issues,
parse_output,
)

logger = get_logger(__name__)
Expand Down Expand Up @@ -320,13 +319,14 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
job_facets=job_facets,
)

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> FullOutputSubprocessResult:
def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None:
dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags)
dbt_cmd = dbt_cmd or []
return self.run_command(cmd=dbt_cmd, env=env, context=context)
result = self.run_command(cmd=dbt_cmd, env=env, context=context)
logger.info(result.output)

def execute(self, context: Context) -> str: # type: ignore[return]
self.build_and_run_cmd(context=context).output
def execute(self, context: Context) -> None:
self.build_and_run_cmd(context=context)

def on_kill(self) -> None:
if self.cancel_query_on_kill:
Expand All @@ -348,10 +348,6 @@ def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["ls"]

def execute(self, context: Context) -> str:
result = self.build_and_run_cmd(context=context)
return result.output


class DbtSeedLocalOperator(DbtLocalBaseOperator):
"""
Expand All @@ -374,10 +370,9 @@ def add_cmd_flags(self) -> list[str]:

return flags

def execute(self, context: Context) -> str:
def execute(self, context: Context) -> None:
cmd_flags = self.add_cmd_flags()
result = self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)
return result.output
self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)


class DbtSnapshotLocalOperator(DbtLocalBaseOperator):
Expand All @@ -392,10 +387,6 @@ def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["snapshot"]

def execute(self, context: Context) -> str:
result = self.build_and_run_cmd(context=context)
return result.output


class DbtRunLocalOperator(DbtLocalBaseOperator):
"""
Expand All @@ -409,9 +400,6 @@ def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["run"]

def execute(self, context: Context) -> str: # type: ignore[return]
self.build_and_run_cmd(context=context)


class DbtTestLocalOperator(DbtLocalBaseOperator):
"""
Expand Down Expand Up @@ -462,18 +450,6 @@ def _handle_warnings(self, result: FullOutputSubprocessResult, context: Context)
if self.on_warning_callback:
self.on_warning_callback(warning_context)

def execute(self, context: Context) -> str:
result = self.build_and_run_cmd(context=context)

if not self._should_run_tests(result):
return result.output

warnings = parse_output(result, "WARN")
if warnings > 0:
self._handle_warnings(result, context)

return result.output


class DbtRunOperationLocalOperator(DbtLocalBaseOperator):
"""
Expand All @@ -500,11 +476,6 @@ def add_cmd_flags(self) -> list[str]:
flags.append(yaml.dump(self.args))
return flags

def execute(self, context: Context) -> str:
cmd_flags = self.add_cmd_flags()
result = self.build_and_run_cmd(context=context, cmd_flags=cmd_flags)
return result.output


class DbtDocsLocalOperator(DbtLocalBaseOperator):
"""
Expand All @@ -520,10 +491,6 @@ def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.base_cmd = ["docs", "generate"]

def execute(self, context: Context) -> str:
result = self.build_and_run_cmd(context=context)
return result.output


class DbtDocsS3LocalOperator(DbtDocsLocalOperator):
"""
Expand Down
4 changes: 2 additions & 2 deletions cosmos/operators/virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ def run_subprocess(self, *args: Any, command: list[str], **kwargs: Any) -> FullO
subprocess_result: FullOutputSubprocessResult = self.subprocess_hook.run_command(command, *args, **kwargs)
return subprocess_result

def execute(self, context: Context) -> str:
def execute(self, context: Context) -> None:
output = super().execute(context)
if self._venv_tmp_dir:
self._venv_tmp_dir.cleanup()
return output
logger.info(output)


class DbtLSVirtualenvOperator(DbtVirtualenvBaseOperator, DbtLSLocalOperator):
Expand Down

0 comments on commit c7a203a

Please sign in to comment.