From f16ac4910043a56de235d8dc1383996b6ddd13ef Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 24 Oct 2023 15:22:19 -0700 Subject: [PATCH] Use Airflow task insdie dynamic workflow (#1912) Signed-off-by: Kevin Su --- .../flytekit-airflow/flytekitplugins/airflow/task.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/plugins/flytekit-airflow/flytekitplugins/airflow/task.py b/plugins/flytekit-airflow/flytekitplugins/airflow/task.py index 93df95b3bf..a25a46cf1e 100644 --- a/plugins/flytekit-airflow/flytekitplugins/airflow/task.py +++ b/plugins/flytekit-airflow/flytekitplugins/airflow/task.py @@ -1,3 +1,4 @@ +import logging import typing from dataclasses import dataclass from typing import Any, Dict, Optional, Type @@ -51,9 +52,12 @@ def _flyte_operator(*args, **kwargs): task instead. """ cls = args[0] - if FlyteContextManager.current_context().user_space_params.get_original_task: - # Return original task when running in the agent. - return object.__new__(cls) + try: + if FlyteContextManager.current_context().user_space_params.get_original_task: + # Return original task when running in the agent. + return object.__new__(cls) + except AssertionError: + logging.debug("failed to get the attribute GET_ORIGINAL_TASK from user space params") config = AirflowConfig(task_module=cls.__module__, task_name=cls.__name__, task_config=kwargs) t = AirflowTask(name=kwargs["task_id"], query_template="", task_config=config, original_new=cls.__new__) return t()