Skip to content

Commit

Permalink
Fix implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Sep 12, 2023
1 parent 2afc643 commit dcbde93
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
28 changes: 18 additions & 10 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]
DAG.bulk_write_to_db([self.dag], session=session)
session.commit()

def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage:
def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> OperatorLineage | None:
"""
Collect the input, output, job and run facets for this operator.
It relies on the calculate_openlineage_events_completes having being called before.
Expand All @@ -324,22 +324,30 @@ def get_openlineage_facets_on_complete(self, task_instance: TaskInstance) -> Ope
openlineage_events_completes = self.openlineage_events_completes
elif hasattr(task_instance, "openlineage_events_completes"):
openlineage_events_completes = task_instance.openlineage_events_completes
if not openlineage_events_completes:
logger.warning("Unable to emit OpenLineage events since no events were created.")
return None

if is_openlineage_available and openlineage_events_completes:
if is_openlineage_available:
for completed in openlineage_events_completes:
[inputs.append(input_) for input_ in completed.inputs if input_ not in inputs] # type: ignore
[outputs.append(output) for output in completed.outputs if output not in outputs] # type: ignore
run_facets = {**run_facets, **completed.run.facets}
job_facets = {**job_facets, **completed.job.facets}
else:
logger.warning("Unable to emit OpenLineage events since dependencies are not installed.")

return OperatorLineage(
inputs=inputs,
outputs=outputs,
run_facets=run_facets,
job_facets=job_facets,
)
logger.warning("Unable to emit OpenLineage events since the necessary dependencies are not installed.")
return None

if inputs or outputs or run_facets or job_facets:
return OperatorLineage(
inputs=inputs,
outputs=outputs,
run_facets=run_facets,
job_facets=job_facets,
)
else:
logger.warning("Unable to emit OpenLineage events since the OperatorLineage is not available.")
return None

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> None:
dbt_cmd, env = self.build_cmd(context=context, cmd_flags=cmd_flags)
Expand Down
7 changes: 2 additions & 5 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,8 @@ def test_run_operator_emits_events_without_openlineage_events_completes(caplog):
)
delattr(dbt_base_operator, "openlineage_events_completes")
facets = dbt_base_operator.get_openlineage_facets_on_complete(dbt_base_operator)
assert facets.inputs == []
assert facets.outputs == []
assert facets.run_facets == {}
assert facets.job_facets == {}
log = "Unable to emit OpenLineage events since dependencies are not installed"
assert facets is None
log = "Unable to emit OpenLineage events since no events were created."
assert log in caplog.text


Expand Down

0 comments on commit dcbde93

Please sign in to comment.