diff --git a/config/default.py b/config/default.py
index 20386f5bb9..2493b0f245 100644
--- a/config/default.py
+++ b/config/default.py
@@ -89,6 +89,7 @@
     "pipeline.contrib.periodic_task",
     "pipeline.contrib.external_plugins",
     "pipeline.contrib.engine_admin",
+    "pipeline.contrib.node_timer_event",
     "pipeline.django_signal_valve",
     "pipeline_plugins",
     "pipeline_plugins.components",
@@ -406,6 +407,9 @@ def _(s):
 
 PIPELINE_DATA_BACKEND_AUTO_EXPIRE = True
 
+# Set the node timer boundary event scan interval to 0.1 so that timer events are executed promptly
+PIPELINE_NODE_TIMER_EVENT_POOL_SCAN_INTERVAL = 0.1
+
 BAMBOO_PERIODIC_TASK_ROOT_PIPELINE_CONTEXT_PROVIER = "gcloud.taskflow3.context.root_pipeline_context_provider"
 BAMBOO_PERIODIC_TASK_SUBPROCESS_CONTEXT_PROVIER = "gcloud.taskflow3.context.subprocess_context_provider"
 
diff --git a/gcloud/core/apis/drf/viewsets/taskflow.py b/gcloud/core/apis/drf/viewsets/taskflow.py
index 58e9736a8d..9975fe42bc 100644
--- a/gcloud/core/apis/drf/viewsets/taskflow.py
+++ b/gcloud/core/apis/drf/viewsets/taskflow.py
@@ -22,6 +22,7 @@
 from django.utils.translation import ugettext_lazy as _
 from django_filters import FilterSet
 from drf_yasg.utils import swagger_auto_schema
+from pipeline.contrib.node_timer_event.api import batch_create_node_timer_event_config
 from pipeline.eri.models import State
 from pipeline.exceptions import PipelineException
 from pipeline.models import PipelineInstance, Snapshot
@@ -69,7 +70,8 @@
 from gcloud.iam_auth.utils import get_common_flow_allowed_actions_for_user, get_flow_allowed_actions_for_user
 from gcloud.taskflow3.domains.auto_retry import AutoRetryNodeStrategyCreator
 from gcloud.taskflow3.domains.dispatchers import TaskCommandDispatcher
-from gcloud.taskflow3.models import TaskConfig, TaskFlowInstance, TaskFlowRelation, TimeoutNodeConfig
+from gcloud.taskflow3.models import TaskConfig, TaskFlowInstance, TaskFlowRelation
+from gcloud.taskflow3.utils import convert_to_timer_events_config
 from gcloud.tasktmpl3.models import TaskTemplate
 from gcloud.utils import concurrent
 from gcloud.utils.strings import standardize_name, standardize_pipeline_node_name
@@ -441,6 +443,11 @@ def perform_create(self, serializer):
             "func_claim" if serializer.validated_data["flow_type"] == "common_func" else "execute_task"
         )
         serializer.validated_data["template_id"] = template.id
+
+        # convert timeout_config to timer_events
+        convert_to_timer_events_config(pipeline_instance.execution_data)
+        pipeline_instance.execution_snapshot.save()
+
         # create taskflow
         serializer.save()
         # crete auto retry strategy
@@ -449,9 +456,7 @@
         )
         arn_creator.batch_create_strategy(pipeline_instance.execution_data)
 
-        # create timeout config
-        TimeoutNodeConfig.objects.batch_create_node_timeout_config(
-            taskflow_id=serializer.instance.id,
+        batch_create_node_timer_event_config(
             root_pipeline_id=pipeline_instance.instance_id,
             pipeline_tree=pipeline_instance.execution_data,
         )
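
Editorial note: the viewset change above swaps the legacy `TimeoutNodeConfig.objects.batch_create_node_timeout_config` registration for the `node_timer_event` contrib API. The sketch below condenses the new creation-time steps into a hypothetical `register_timer_events` helper (not part of the patch); the attributes and call signatures are only the ones visible in the diff.

```python
# Hypothetical helper, not part of the patch: condenses the perform_create
# steps added above. Assumes a PipelineInstance-like object exposing
# execution_data, execution_snapshot and instance_id, as used in the diff.
from pipeline.contrib.node_timer_event.api import batch_create_node_timer_event_config

from gcloud.taskflow3.utils import convert_to_timer_events_config


def register_timer_events(pipeline_instance):
    # Rewrite legacy timeout_config blocks into events.timer_events so the
    # persisted tree matches what the timer-event scanner will act on.
    convert_to_timer_events_config(pipeline_instance.execution_data)
    pipeline_instance.execution_snapshot.save()

    # Register timer events for every node of the root pipeline.
    batch_create_node_timer_event_config(
        root_pipeline_id=pipeline_instance.instance_id,
        pipeline_tree=pipeline_instance.execution_data,
    )
```

The ordering mirrors the diff: the conversion runs before the snapshot is saved and before registration, so the persisted execution data and the registered timer events describe the same converted tree.
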
""" +import logging from abc import ABCMeta, abstractmethod +from pipeline.contrib.node_timer_event.handlers import BaseAction, register_action +from pipeline.core.data.base import DataObject + +from gcloud.taskflow3.models import TaskFlowInstance + +logger = logging.getLogger(__name__) + class NodeTimeoutStrategy(metaclass=ABCMeta): TIMEOUT_NODE_OPERATOR = "sops_system" @@ -35,6 +43,22 @@ def deal_with_timeout_node(self, task, node_id): return fail_result +@register_action("forced_fail") +class ForcedFailAction(BaseAction): + def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool: + logger.info("[Action(forced_fail)] do: data -> %s, parent_data -> %s", data, parent_data) + task_inst = TaskFlowInstance.objects.get(pk=parent_data.get_one_of_inputs("task_id")) + task_inst.nodes_action("forced_fail", self.node_id, "sops_system") + return True + + +@register_action("forced_fail_and_skip") +class ForcedFailAndSkipAction(BaseAction): + def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool: + logger.info("[Action(forced_fail_and_skip)] do: data -> %s, parent_data -> %s", data, parent_data) + return True + + node_timeout_handler = { "forced_fail": ForcedFailStrategy(), "forced_fail_and_skip": ForcedFailAndSkipStrategy(), diff --git a/gcloud/taskflow3/utils.py b/gcloud/taskflow3/utils.py index a65fd115fc..70e5dd44f8 100644 --- a/gcloud/taskflow3/utils.py +++ b/gcloud/taskflow3/utils.py @@ -12,12 +12,13 @@ """ import logging -from typing import Any, Dict, List, Optional import typing from collections import defaultdict +from typing import Any, Dict, List, Optional from django.apps import apps from django.utils.translation import ugettext_lazy as _ +from pipeline.contrib.node_timer_event.constants import TimerType from pipeline.core import constants as pipeline_constants from pipeline.engine import states as pipeline_states from pipeline.engine.utils import calculate_elapsed_time @@ -173,3 +174,29 @@ def parse_node_timeout_configs(pipeline_tree: dict) -> list: continue configs.append({"action": action, "node_id": act_id, "timeout": timeout_seconds}) return {"result": True, "data": configs, "message": ""} + + +def convert_to_timer_events_config(pipeline_tree: dict): + """将标准运维原来的节点超时配置,转为计时器边界事件,用于测试增强服务功能""" + for act_id, act in pipeline_tree[pipeline_constants.PE.activities].items(): + if act["type"] == pipeline_constants.PE.SubProcess: + convert_to_timer_events_config(act[pipeline_constants.PE.pipeline]) + + elif act["type"] == pipeline_constants.PE.ServiceActivity: + timeout_config = act.get("timeout_config", {}) + enable = timeout_config.get("enable") + if not enable: + continue + action = timeout_config.get("action") + timeout_seconds = timeout_config.get("seconds") + if action == "forced_fail_and_skip": + defined = f"R1000/PT{timeout_seconds}S" + timer_type = TimerType.TIME_CYCLE.value + else: + defined = f"PT{timeout_seconds}S" + timer_type = TimerType.TIME_DURATION.value + + act["events"] = {} + act["events"]["timer_events"] = [ + {"enable": True, "action": action, "timer_type": timer_type, "defined": defined} + ]