From 34c171808c9e126f01cfd097808d93c869ce143e Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Mon, 11 Sep 2023 21:52:44 +0100 Subject: [PATCH] Fix openlineage event emition --- cosmos/operators/local.py | 38 ++++++++++++++++++----------------- tests/operators/test_local.py | 7 +++++-- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index a4965dc63..60aeb71e8 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -4,6 +4,7 @@ import shutil import signal import tempfile +from attr import define from pathlib import Path from typing import Any, Callable, Literal, Sequence, TYPE_CHECKING @@ -59,6 +60,13 @@ logger.exception(error) is_openlineage_available = False + @define + class OperatorLineage: # type: ignore + inputs: list[str] = list() + outputs: list[str] = list() + run_facets: dict[str, str] = dict() + job_facets: dict[str, str] = dict() + class DbtLocalBaseOperator(DbtBaseOperator): """ @@ -305,7 +313,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset] DAG.bulk_write_to_db([self.dag], session=session) session.commit() - def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage | None: + def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage: """ Collect the input, output, job and run facets for this operator. It relies on the calculate_openlineage_events_completes having being called before. @@ -324,30 +332,24 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope openlineage_events_completes = self.openlineage_events_completes elif hasattr(task_instance, "openlineage_events_completes"): openlineage_events_completes = task_instance.openlineage_events_completes - if not openlineage_events_completes: - logger.warning("Unable to emit OpenLineage events since no events were created.") - return None + else: + logger.warning("Unable to emit OpenLineage events due to lack of data.") - if is_openlineage_available: + if openlineage_events_completes is not None: for completed in openlineage_events_completes: [inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore [outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore run_facets = {**run_facets, **completed.run.facets} job_facets = {**job_facets, **completed.job.facets} else: - logger.warning("Unable to emit OpenLineage events since the necessary dependencies are not installed.") - return None - - if inputs or outputs or run_facets or job_facets: - return OperatorLineage( - inputs=inputs, - outputs=outputs, - run_facets=run_facets, - job_facets=job_facets, - ) - else: - logger.warning("Unable to emit OpenLineage events since the OperatorLineage is not available.") - return None + logger.warning("Unable to emit OpenLineage events due to lack of dependencies or data.") + + return OperatorLineage( + inputs=inputs, + outputs=outputs, + run_facets=run_facets, + job_facets=job_facets, + ) def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None: dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags) diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py index 74baa2491..4a94d4d8a 100644 --- a/tests/operators/test_local.py +++ b/tests/operators/test_local.py @@ -209,8 +209,11 @@ def test_run_operator_emits_events_without_openlineage_events_completes(caplog): ) delattr(dbt_base_operator, "openlineage_events_completes") facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator) - assert facets is None - log = "Unable to emit OpenLineage events since no events were created." + assert facets.inputs == [] + assert facets.outputs == [] + assert facets.run_facets == {} + assert facets.job_facets == {} + log = "Unable to emit OpenLineage events due to lack of dependencies or data." assert log in caplog.text