Skip to content

Commit

Permalink
Use Airflow task insdie dynamic workflow (#1912)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Oct 24, 2023
1 parent c7c5d13 commit f16ac49
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions plugins/flytekit-airflow/flytekitplugins/airflow/task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import typing
from dataclasses import dataclass
from typing import Any, Dict, Optional, Type
Expand Down Expand Up @@ -51,9 +52,12 @@ def _flyte_operator(*args, **kwargs):
task instead.
"""
cls = args[0]
if FlyteContextManager.current_context().user_space_params.get_original_task:
# Return original task when running in the agent.
return object.__new__(cls)
try:
if FlyteContextManager.current_context().user_space_params.get_original_task:
# Return original task when running in the agent.
return object.__new__(cls)
except AssertionError:
logging.debug("failed to get the attribute GET_ORIGINAL_TASK from user space params")
config = AirflowConfig(task_module=cls.__module__, task_name=cls.__name__, task_config=kwargs)
t = AirflowTask(name=kwargs["task_id"], query_template="", task_config=config, original_new=cls.__new__)
return t()
Expand Down

0 comments on commit f16ac49

Please sign in to comment.