From be59d71c63d42eb1c954211b42e292250a764dc3 Mon Sep 17 00:00:00 2001 From: crayon <873217631@qq.com> Date: Thu, 9 Nov 2023 21:05:51 +0800 Subject: [PATCH] =?UTF-8?q?optimization:=20=E6=94=AF=E6=8C=81=E5=BC=80?= =?UTF-8?q?=E5=8F=91=E6=8E=A5=E5=85=A5=E4=BA=8B=E4=BB=B6=20&=20=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E8=8A=82=E7=82=B9=E8=AE=B0=E5=BD=95=E8=87=AA=E5=88=A0?= =?UTF-8?q?=E9=99=A4=20&=20Action=20=E4=BD=BF=E7=94=A8=20Metaclass=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../commands/start_node_timeout_process.py | 10 +- .../contrib/node_timer_event/adapter.py | 138 +++++++++++++++++ .../pipeline/contrib/node_timer_event/api.py | 7 +- .../contrib/node_timer_event/handlers.py | 145 +++++++++++++----- .../contrib/node_timer_event/models.py | 78 +--------- .../contrib/node_timer_event/settings.py | 30 ++++ .../node_timer_event/signals/handlers.py | 30 ++-- .../contrib/node_timer_event/tasks.py | 59 ++++--- .../contrib/node_timer_event/types.py | 8 + .../tests/contrib/test_node_timer_event.py | 4 +- 10 files changed, 347 insertions(+), 162 deletions(-) create mode 100644 runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/adapter.py diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/management/commands/start_node_timeout_process.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/management/commands/start_node_timeout_process.py index 6ab1fdbb..3a5513a4 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/management/commands/start_node_timeout_process.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/management/commands/start_node_timeout_process.py @@ -12,17 +12,15 @@ """ import datetime import json +import logging import signal import time -import logging -from django.conf import settings from django.core.management import BaseCommand from django.db import connections - +from pipeline.contrib.node_timeout.models import TimeoutNodesRecord from pipeline.contrib.node_timeout.settings import node_timeout_settings from pipeline.contrib.node_timeout.tasks import dispatch_timeout_nodes -from pipeline.contrib.node_timeout.models import TimeoutNodesRecord logger = logging.getLogger("root") @@ -33,8 +31,8 @@ class Command(BaseCommand): def handle(self, *args, **options): signal.signal(signal.SIGTERM, self._graceful_exit) - redis_inst = settings.redis_inst - nodes_pool = settings.EXECUTING_NODE_POOL + redis_inst = node_timeout_settings.redis_inst + nodes_pool = node_timeout_settings.executing_pool while not self.has_killed: try: start = time.time() diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/adapter.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/adapter.py new file mode 100644 index 00000000..ca0d5683 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/adapter.py @@ -0,0 +1,138 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at
+http://opensource.org/licenses/MIT
+Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+specific language governing permissions and limitations under the License.
+"""
+
+import abc
+import datetime
+import json
+import logging
+import re
+from typing import Dict, List, Optional, Union
+
+from pipeline.contrib.node_timer_event.models import NodeTimerEventConfig
+from pipeline.contrib.node_timer_event.settings import node_timer_event_settings
+from pipeline.contrib.node_timer_event.types import TimerEvent, TimerEvents
+from pipeline.contrib.node_timer_event.utils import parse_timer_defined
+
+logger = logging.getLogger(__name__)
+
+EVENT_KEY_PATTERN = re.compile(r".*node:(?P<node_id>.+):version:(?P<version>.+):index:(?P<index>\d+)")
+
+
+class NodeTimerEventBaseAdapter(abc.ABC):
+
+    node_id: str = None
+    version: str = None
+    root_pipeline_id: Optional[str] = None
+    events: Optional[TimerEvents] = None
+    index__event_map: Optional[Dict[int, TimerEvent]] = None
+
+    def __init__(self, node_id: str, version: str):
+        self.node_id: str = node_id
+        self.version: str = version
+
+    def is_ready(self) -> bool:
+        """适配器是否就绪"""
+        if not self.events:
+            return False
+        return True
+
+    def fetch_keys_to_be_rem(self) -> List[str]:
+        """
+        获取需要被移除的事件 Key
+        :return:
+        """
+        return [self.get_event_key(event) for event in self.events]
+
+    def get_event_key(self, event: TimerEvent) -> str:
+        """
+        获取事件 Key
+        :param event:
+        :return:
+        """
+
+        # zset 没有按字符串匹配模式批量删除 key 的支持,使用 key 的命名采用已检索的信息进行拼接
+        # 之前想把 loop 也维护进去,发觉移除操作非常麻烦,故采用 incr 的方式,单独维护每个事件事件的触发次数
+        key_prefix: str = f"{node_timer_event_settings.key_prefix}:node:{self.node_id}:version:{self.version}"
+        return f"{key_prefix}:index:{event['index']}"
+
+    @classmethod
+    def get_next_expired_time(cls, event: TimerEvent, start: Optional[datetime.datetime] = None) -> float:
+        """
+        获取时间事件下一次过期时间
+        :param event: 事件详情
+        :param start: 开始时间,默认取 datetime.now()
+        :return:
+        """
+        return parse_timer_defined(event["timer_type"], event["defined"], start=start or datetime.datetime.now())[
+            "timestamp"
+        ]
+
+    def add_to_pool(self, redis_inst, event: TimerEvent):
+
+        key: str = self.get_event_key(event)
+
+        expired_time: float = self.get_next_expired_time(event)
+
+        # TODO 考虑 incr & zadd 合并,使用 lua 封装成原子操作
+        loop: int = int(redis_inst.incr(key, 1))
+        redis_inst.expire(key, node_timer_event_settings.max_expire_time)
+        if loop > event["repetitions"]:
+            logger.info(
+                "[add_to_pool] No need to add: node -> %s, version -> %s, loop -> %s, event -> %s",
+                self.node_id,
+                self.version,
+                loop,
+                event,
+            )
+            return
+
+        redis_inst.zadd(node_timer_event_settings.executing_pool, mapping={key: expired_time}, nx=True)
+
+        logger.info(
+            "[add_to_pool] add event to redis: "
+            "node_id -> %s, version -> %s, event -> %s, key -> %s, expired_time -> %s",
+            self.node_id,
+            self.version,
+            event,
+            key,
+            expired_time,
+        )
+
+    @classmethod
+    def parse_event_key(cls, key: str) -> Dict[str, Union[str, int]]:
+        match = EVENT_KEY_PATTERN.match(key)
+        if match:
+            key_info: Dict[str, Union[str, int]] = match.groupdict()
+            # to int
+            key_info["index"] = int(key_info["index"])
+
+            return key_info
+
+        else:
+            raise ValueError(f"invalid key -> {key}")
+
+
+class NodeTimerEventAdapter(NodeTimerEventBaseAdapter):
+    def __init__(self, node_id: str, version: str):
+        super().__init__(node_id, version)
+
+        
node_timer_event_config: NodeTimerEventConfig = NodeTimerEventConfig.objects.filter( + node_id=self.node_id + ).first() + + if not node_timer_event_config: + return + + self.root_pipeline_id: str = node_timer_event_config.root_pipeline_id + self.events: TimerEvents = json.loads(node_timer_event_config.events) + self.index__event_map: Dict[int, TimerEvent] = {event["index"]: event for event in self.events} diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/api.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/api.py index b497437e..26ce1e01 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/api.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/api.py @@ -14,9 +14,11 @@ from typing import Any, Dict, List from pipeline.contrib.node_timer_event.models import NodeTimerEventConfig +from pipeline.contrib.utils import ensure_return_pipeline_contrib_api_result from pipeline.core.constants import PE +@ensure_return_pipeline_contrib_api_result def apply_node_timer_event_configs(pipeline_tree: Dict[str, Any], configs: Dict[str, List[Dict[str, Any]]]): """ 在 pipeline_tree 中应用节点计时器边界事件配置 @@ -29,9 +31,7 @@ def apply_node_timer_event_configs(pipeline_tree: Dict[str, Any], configs: Dict[ if act["type"] == PE.SubProcess: apply_node_timer_event_configs(act[PE.pipeline], configs) elif act["type"] == PE.ServiceActivity and act_id in configs: - if "events" not in act: - act["events"] = {} - act["events"]["timer_events"] = [ + act.setdefault("events", {})["timer_events"] = [ { "enable": config["enable"], "timer_type": config["timer_type"], @@ -43,6 +43,7 @@ def apply_node_timer_event_configs(pipeline_tree: Dict[str, Any], configs: Dict[ return new_pipeline_tree +@ensure_return_pipeline_contrib_api_result def batch_create_node_timer_event_config(root_pipeline_id: str, pipeline_tree: Dict[str, Any]): """ 批量创建节点时间事件配置 diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py index b8520af2..1725b686 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py @@ -12,27 +12,81 @@ """ import logging from abc import abstractmethod -from typing import Any, Dict, Tuple, Type +from typing import Any, Dict, Tuple from pipeline.core.data.base import DataObject from pipeline.eri.runtime import BambooDjangoRuntime +from bamboo_engine import api as bamboo_engine_api from bamboo_engine.eri import ExecutionData logger = logging.getLogger(__name__) -def register_action(action_name: str): - """注册 Action""" +class ActionManager: + __hub = {} - def register(cls): - ActionFactory.add_action(action_name, cls) - return cls + @classmethod + def register_invocation_cls(cls, invocation_cls): + action_name = invocation_cls.Meta.action_name + existed_invocation_cls = cls.__hub.get(action_name) + if existed_invocation_cls: + raise RuntimeError( + "func register error, {}'s action_name {} conflict with {}".format( + existed_invocation_cls, action_name, invocation_cls + ) + ) + + cls.__hub[action_name] = invocation_cls - return register + @classmethod + def clear(cls): + cls.__hub = {} + + @classmethod + def get_action(cls, root_pipeline_id: str, node_id: str, version: str, action_name: str) -> "BaseAction": + """ + 获取 Action 实例 + :param root_pipeline_id: 根节点 ID + :param node_id: 节点 ID + :param version: 节点版本 + :param action_name: Action 名称 + :return: + """ + if action_name not in 
cls.__hub: + raise ValueError("{} not found".format(action_name)) + return cls.__hub[action_name](root_pipeline_id, node_id, version) -class BaseAction: +class ActionMeta(type): + """ + Metaclass for FEEL function invocation + """ + + def __new__(cls, name, bases, dct): + # ensure initialization is only performed for subclasses of Plugin + parents = [b for b in bases if isinstance(b, ActionMeta)] + if not parents: + return super().__new__(cls, name, bases, dct) + + new_cls = super().__new__(cls, name, bases, dct) + + # meta validation + meta_obj = getattr(new_cls, "Meta", None) + if not meta_obj: + raise AttributeError("Meta class is required") + + action_name = getattr(meta_obj, "action_name", None) + if not action_name: + raise AttributeError("action_name is required in Meta") + + # register func + ActionManager.register_invocation_cls(new_cls) + + return new_cls + + +class BaseAction(metaclass=ActionMeta): def __init__(self, root_pipeline_id: str, node_id: str, version: str): self.root_pipeline_id = root_pipeline_id self.node_id = node_id @@ -60,42 +114,53 @@ def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool raise NotImplementedError -class ActionFactory: - """ - 节点处理器工厂 - """ +class ExampleAction(BaseAction): + def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool: + logger.info("[Action] example do: data -> %s, parent_data -> %s", data, parent_data) + return True - _actions: Dict[str, Type["BaseAction"]] = {} + class Meta: + action_name = "example" - @classmethod - def add_action(cls, action_name: str, action_cls: Type["BaseAction"]): - """ - 向工厂中注册 Action - :param action_cls: Action 类 - :param action_name: Action 名称 - """ - if not issubclass(action_cls, BaseAction): - raise TypeError("register action err: {} is not subclass of {}".format(action_cls, "BaseAction")) - cls._actions[action_name] = action_cls +class ForcedFailAction(BaseAction): - @classmethod - def get_action(cls, root_pipeline_id: str, node_id: str, version: str, action_name: str) -> BaseAction: - """ - 获取 Action 实例 - :param root_pipeline_id: 根节点 ID - :param node_id: 节点 ID - :param version: 节点版本 - :param action_name: Action 名称 - :return: - """ - if action_name not in cls._actions: - raise TypeError("{} not found".format(action_name)) - return cls._actions[action_name](root_pipeline_id, node_id, version) + TIMEOUT_NODE_OPERATOR = "bamboo_engine" + def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool: + logger.info("[Action(bamboo_engine_forced_fail)] do: data -> %s, parent_data -> %s", data, parent_data) + result = bamboo_engine_api.forced_fail_activity( + runtime=BambooDjangoRuntime(), + node_id=self.node_id, + ex_data="forced fail by {}".format(self.TIMEOUT_NODE_OPERATOR), + send_post_set_state_signal=kwargs.get("send_post_set_state_signal", True), + ) + return result.result + + class Meta: + action_name = "bamboo_engine_forced_fail" + + +class ForcedFailAndSkipAction(BaseAction): + + TIMEOUT_NODE_OPERATOR = "bamboo_engine" -@register_action("example") -class ExampleAction(BaseAction): def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool: - logger.info("[Action] example do: data -> %s, parent_data -> %s", data, parent_data) - return True + logger.info("[Action(bamboo_engine_forced_fail_and_skip)] do: data -> %s, parent_data -> %s", data, parent_data) + result = bamboo_engine_api.forced_fail_activity( + runtime=BambooDjangoRuntime(), + node_id=self.node_id, + ex_data="forced fail by 
{}".format(self.TIMEOUT_NODE_OPERATOR), + send_post_set_state_signal=kwargs.get("send_post_set_state_signal", True), + ) + if result.result: + result = bamboo_engine_api.skip_node( + runtime=BambooDjangoRuntime(), + node_id=self.node_id, + ex_data="forced skip by {}".format(self.TIMEOUT_NODE_OPERATOR), + send_post_set_state_signal=kwargs.get("send_post_set_state_signal", True), + ) + return result.result + + class Meta: + action_name = "bamboo_engine_forced_fail_and_skip" diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/models.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/models.py index b0f2d4cf..0075ee9f 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/models.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/models.py @@ -10,16 +10,14 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -import datetime import json import logging import re import typing -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional from django.db import models from django.utils.translation import ugettext_lazy as _ -from pipeline.contrib.node_timer_event.settings import node_timer_event_settings from pipeline.contrib.node_timer_event.types import TimeDefined from pipeline.contrib.node_timer_event.utils import parse_timer_defined from pipeline.core.constants import PE @@ -94,7 +92,7 @@ def batch_create_node_timer_event_config(self, root_pipeline_id: str, pipeline_t ) return config_parse_result - configs: List[Dict[str, Any]] = config_parse_result["data"] or [] + configs: List[Dict[str, Any]] = config_parse_result["data"] config_objs: typing.List[NodeTimerEventConfig] = [ NodeTimerEventConfig( root_pipeline_id=root_pipeline_id, node_id=config["node_id"], events=json.dumps(config["events"]) @@ -120,78 +118,6 @@ class Meta: def get_events(self) -> List[Dict[str, Any]]: return json.loads(self.events) - def get_index__event_map(self) -> Dict[int, Dict[str, Any]]: - return {event["index"]: event for event in self.get_events()} - - @classmethod - def get_event_key(cls, node_id: str, version: str, event: Dict[str, Any]) -> str: - """ - 获取事件 Key - :param node_id: 节点 ID - :param version: State 版本 - :param event: 事件详情 - :return: - """ - # zset 没有按字符串匹配模式批量删除 key 的支持,使用 key 的命名采用已检索的信息进行拼接 - # 之前想把 loop 也维护进去,发觉移除操作非常麻烦,故采用 incr 的方式,单独维护每个事件事件的触发次数 - key_prefix: str = f"{node_timer_event_settings.key_prefix}:node:{node_id}:version:{version}" - return f"{key_prefix}:index:{event['index']}" - - @classmethod - def get_next_expired_time(cls, event: Dict[str, Any], start: Optional[datetime.datetime] = None) -> float: - """ - 获取时间事件下一次过期时间 - :param event: 事件详情 - :param start: 开始时间,默认取 datetime.now() - :return: - """ - return parse_timer_defined(event["timer_type"], event["defined"], start=start or datetime.datetime.now())[ - "timestamp" - ] - - @classmethod - def add_to_pool(cls, redis_inst, node_id: str, version: str, event: Dict[str, Any]): - key: str = cls.get_event_key(node_id, version, event) - expired_time: float = cls.get_next_expired_time(event) - - # TODO 考虑 incr & zadd 合并,使用 lua 封装成原子操作 - loop: int = int(redis_inst.incr(key, 1)) - redis_inst.expire(key, node_timer_event_settings.max_expire_time) - if loop > event["repetitions"]: - logger.info( - "[add_to_pool] No need to add: node -> %s, version -> %s, loop -> %s, event -> %s", - node_id, - version, - loop, - 
event, - ) - return - - redis_inst.zadd(node_timer_event_settings.executing_pool, mapping={key: expired_time}, nx=True) - - logger.info( - "[add_to_pool] add event to redis: " - "node_id -> %s, version -> %s, event -> %s, key -> %s, expired_time -> %s", - node_id, - version, - event, - key, - expired_time, - ) - - @classmethod - def parse_event_key(cls, key: str) -> Dict[str, Union[str, int]]: - match = EVENT_KEY_PATTERN.match(key) - if match: - key_info: Dict[str, Union[str, int]] = match.groupdict() - # to int - key_info["index"] = int(key_info["index"]) - - return key_info - - else: - raise ValueError(f"invalid key -> {key}") - class ExpiredNodesRecord(models.Model): id = models.BigAutoField(verbose_name="ID", primary_key=True) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/settings.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/settings.py index 1c35cfbc..4eac23ff 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/settings.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/settings.py @@ -11,7 +11,32 @@ specific language governing permissions and limitations under the License. """ +from importlib import import_module + from django.conf import settings +from pipeline.contrib.node_timer_event import types + + +def get_import_path(cls: types.T) -> str: + return f"{cls.__module__}.{cls.__name__}" + + +def import_string(dotted_path: str): + """ + Import a dotted module path and return the attribute/class designated by the + last name in the path. Raise ImportError if the import failed. + """ + try: + module_path, class_name = dotted_path.rsplit(".", 1) + except ValueError as err: + raise ImportError(f"{dotted_path} doesn't look like a module path") from err + + module = import_module(module_path) + + try: + return getattr(module, class_name) + except AttributeError as err: + raise ImportError(f'Module "{module_path}" does not define a "{class_name}" attribute/class') from err class NodeTimerEventSettngs: @@ -30,11 +55,16 @@ class NodeTimerEventSettngs: "pool_scan_interval": 1, # 最长过期时间,兜底删除 Redis 冗余数据,默认为 15 Days,请根据业务场景调整 "max_expire_time": 60 * 60 * 24 * 15, + # 边界事件处理适配器,默认为 `pipeline.contrib.node_timer_event.adapter.NodeTimerEventAdapter` + "adapter_class": "pipeline.contrib.node_timer_event.adapter.NodeTimerEventAdapter", } def __getattr__(self, item: str): if item == "redis_inst": return settings.redis_inst + if item == "adapter_class": + return import_string(getattr(settings, f"{self.PREFIX}_{item.upper()}", self.DEFAULT_SETTINGS.get(item))) + return getattr(settings, f"{self.PREFIX}_{item.upper()}", self.DEFAULT_SETTINGS.get(item)) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/signals/handlers.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/signals/handlers.py index eb117d91..4b3c4d56 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/signals/handlers.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/signals/handlers.py @@ -11,10 +11,10 @@ specific language governing permissions and limitations under the License. 
""" import logging -from typing import Any, Dict, List, Optional +from typing import List, Optional, Type from django.dispatch import receiver -from pipeline.contrib.node_timer_event.models import NodeTimerEventConfig +from pipeline.contrib.node_timer_event.adapter import NodeTimerEventBaseAdapter from pipeline.contrib.node_timer_event.settings import node_timer_event_settings from pipeline.eri.signals import post_set_state @@ -25,8 +25,7 @@ def _node_timer_event_info_update(redis_inst, to_state: str, node_id: str, version: str): - events: Optional[List[Dict[str, Any]]] = None - node_timer_event_config: Optional[NodeTimerEventConfig] = None + adapter: Optional[NodeTimerEventBaseAdapter] = None if to_state in [ bamboo_engine_states.RUNNING, @@ -34,10 +33,11 @@ def _node_timer_event_info_update(redis_inst, to_state: str, node_id: str, versi bamboo_engine_states.FINISHED, bamboo_engine_states.SUSPENDED, ]: - node_timer_event_config: Optional[NodeTimerEventConfig] = NodeTimerEventConfig.objects.filter( - node_id=node_id - ).first() - if not node_timer_event_config: + + adapter_class: Type[NodeTimerEventBaseAdapter] = node_timer_event_settings.adapter_class + adapter: NodeTimerEventBaseAdapter = adapter_class(node_id=node_id, version=version) + + if not adapter.is_ready(): logger.info( "[node_timer_event_info_update] node_timer_event_config not exist and skipped: " "node_id -> %s, version -> %s", @@ -45,29 +45,29 @@ def _node_timer_event_info_update(redis_inst, to_state: str, node_id: str, versi version, ) return - events = node_timer_event_config.get_events() + logger.info( "[node_timer_event_info_update] load node_timer_event_config: node_id -> %s, version -> %s, events -> %s", node_id, version, - events, + adapter.events, ) if to_state == bamboo_engine_states.RUNNING: # 遍历节点时间事件,丢进待调度节点池 - for event in events: - node_timer_event_config.add_to_pool(redis_inst, node_id, version, event) + for event in adapter.events: + adapter.add_to_pool(redis_inst, event) elif to_state in [bamboo_engine_states.FAILED, bamboo_engine_states.FINISHED, bamboo_engine_states.SUSPENDED]: - keys: List[str] = [node_timer_event_config.get_event_key(node_id, version, event) for event in events] + keys: List[str] = adapter.fetch_keys_to_be_rem() redis_inst.zrem(node_timer_event_settings.executing_pool, *keys) redis_inst.delete(*keys) logger.info( - "[node_timer_event_info_update] remove events from redis: " + "[node_timer_event_info_update] removed events from redis: " "node_id -> %s, version -> %s, events -> %s, keys -> %s", node_id, version, - events, + adapter.events, keys, ) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py index c4160467..44970a5a 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py @@ -12,14 +12,12 @@ """ import json import logging -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Type, Union from celery import task -from pipeline.contrib.node_timer_event.handlers import ActionFactory -from pipeline.contrib.node_timer_event.models import ( - ExpiredNodesRecord, - NodeTimerEventConfig, -) +from pipeline.contrib.node_timer_event.adapter import NodeTimerEventBaseAdapter +from pipeline.contrib.node_timer_event.handlers import ActionManager +from pipeline.contrib.node_timer_event.models import ExpiredNodesRecord from pipeline.contrib.node_timer_event.settings import 
node_timer_event_settings from pipeline.eri.models import Process, State @@ -31,11 +29,24 @@ def dispatch_expired_nodes(record_id: int): record: ExpiredNodesRecord = ExpiredNodesRecord.objects.get(id=record_id) node_keys: List[str] = json.loads(record.nodes) logger.info("[dispatch_expired_nodes] record -> %s, nodes -> %s", record_id, node_keys) + + adapter_class: Type[NodeTimerEventBaseAdapter] = node_timer_event_settings.adapter_class + for node_key in node_keys: - key_info: Dict[str, Union[str, int]] = NodeTimerEventConfig.parse_event_key(node_key) + try: + key_info: Dict[str, Union[str, int]] = adapter_class.parse_event_key(node_key) + except ValueError: + logger.warning( + "[dispatch_expired_nodes] failed to parse key, skipped: record -> %s, node_key -> %s", + record_id, + node_key, + ) + continue + index: int = key_info["index"] node_id: str = key_info["node_id"] version: str = key_info["version"] + if node_timer_event_settings.handle_queue is None: execute_node_timer_event_action.apply_async(kwargs={"node_id": node_id, "version": version, "index": index}) else: @@ -45,49 +56,55 @@ def dispatch_expired_nodes(record_id: int): routing_key=node_timer_event_settings.handle_queue, ) + logger.info("[dispatch_expired_nodes] dispatch finished: record -> %s, nodes -> %s", record_id, node_keys) + # 删除临时记录 + record.delete() + logger.info("[dispatch_expired_nodes] record deleted: record -> %s", record_id) + @task(ignore_result=True) def execute_node_timer_event_action(node_id: str, version: str, index: int): - node_timer_event_config: NodeTimerEventConfig = NodeTimerEventConfig.objects.filter(node_id=node_id).first() - - if node_timer_event_config is None or index not in node_timer_event_config.get_index__event_map(): + adapter_class: Type[NodeTimerEventBaseAdapter] = node_timer_event_settings.adapter_class + adapter: NodeTimerEventBaseAdapter = adapter_class(node_id=node_id, version=version) + if not adapter.is_ready() or (adapter.index__event_map and index not in adapter.index__event_map): message: str = ( f"[execute_node_timer_event_action] no timer config: " f"node_id -> {node_id}, version -> {version}, index -> {index}" ) - logger.info(message) + logger.exception(message) return {"result": False, "message": message, "data": None} - event: Dict[str, Any] = node_timer_event_config.get_index__event_map()[index] - root_pipeline_id: str = node_timer_event_config.root_pipeline_id + event: Dict[str, Any] = adapter.index__event_map[index] # 判断当前节点是否符合策略执行要求 is_process_current_node: bool = Process.objects.filter( - root_pipeline_id=root_pipeline_id, current_node_id=node_id + root_pipeline_id=adapter.root_pipeline_id, current_node_id=node_id ).exists() is_node_match = State.objects.filter(node_id=node_id, version=version).exists() if not (is_node_match and is_process_current_node): message = ( f"[execute_node_timer_event_action] node {node_id} with version {version} " - f"in pipeline {root_pipeline_id} has been passed." + f"in pipeline {adapter.root_pipeline_id} has been passed." 
) logger.error(message) return {"result": False, "message": message, "data": None} # 计算事件下一次触发事件并丢进待调度节点池 - NodeTimerEventConfig.add_to_pool(node_timer_event_settings.redis_inst, node_id, version, event) + adapter.add_to_pool(node_timer_event_settings.redis_inst, event) try: - is_success: bool = ActionFactory.get_action(root_pipeline_id, node_id, version, event["action"]).notify() + is_success: bool = ActionManager.get_action( + adapter.root_pipeline_id, node_id, version, event["action"] + ).notify() logger.info( - f"[execute_node_timer_event_action] node {node_id} with version {version} in pipeline {root_pipeline_id} " - f"action result is: {is_success}." + f"[execute_node_timer_event_action] node {node_id} with version {version} in pipeline " + f"{adapter.root_pipeline_id} action result is: {is_success}." ) return {"result": is_success, "data": None} except Exception as e: logger.exception( - f"[execute_node_timer_event_action] node {node_id} with version {version} in pipeline {root_pipeline_id} " - f"error is: {e}." + f"[execute_node_timer_event_action] node {node_id} with version {version} in pipeline " + f"{adapter.root_pipeline_id} error is: {e}." ) return {"result": False, "data": None, "message": str(e)} diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/types.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/types.py index 5621b084..092524ff 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/types.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/types.py @@ -14,4 +14,12 @@ import typing +T = typing.TypeVar("T") + TimeDefined = typing.Dict[str, typing.Any] + + +TimerEvent = typing.Dict[str, typing.Any] + + +TimerEvents = typing.List[TimerEvent] diff --git a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_node_timer_event.py b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_node_timer_event.py index a7d5a45b..93cc0f4f 100644 --- a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_node_timer_event.py +++ b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_node_timer_event.py @@ -162,6 +162,8 @@ def test_dispatch_expired_nodes(self): ] ) + self.assertFalse(ExpiredNodesRecord.objects.filter(id=1).exists()) + def execute_node_timeout_action_success_test_helper(self, index: int): NodeTimerEventConfig.objects.create( root_pipeline_id=self.root_pipeline_id, node_id=self.node_id, events=json.dumps(self.timer_events) @@ -175,7 +177,7 @@ def execute_node_timeout_action_success_test_helper(self, index: int): key: str = f"bamboo:v1:node_timer_event:node:{self.node_id}:version:{self.version}:index:{index}" with patch("pipeline.contrib.node_timer_event.handlers.BambooDjangoRuntime", self.mock_runtime): - with patch("pipeline.contrib.node_timer_event.models.node_timer_event_settings.redis_inst", redis_inst): + with patch("pipeline.contrib.node_timer_event.adapter.node_timer_event_settings.redis_inst", redis_inst): result = execute_node_timer_event_action(self.node_id, self.version, index=index) self.assertEqual(result["result"], True) self.runtime.get_execution_data.assert_called_once_with(self.node_id)
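
With the ActionMeta / ActionManager registration introduced above, a project using this module can plug in its own timer-event handling simply by subclassing BaseAction: the metaclass registers the subclass under Meta.action_name, and execute_node_timer_event_action later resolves it through ActionManager.get_action(..., event["action"]). A minimal sketch of such an extension, assuming a hypothetical action named "notify_by_webhook" (the class name, action name and logging-only body are illustrative and not part of this patch):

import logging

from pipeline.contrib.node_timer_event.handlers import BaseAction
from pipeline.core.data.base import DataObject

logger = logging.getLogger(__name__)


class WebhookNotifyAction(BaseAction):
    """Hypothetical custom action: report the timer event to an external system."""

    def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
        # self.root_pipeline_id / self.node_id / self.version are set by BaseAction.__init__
        logger.info(
            "[Action(notify_by_webhook)] node -> %s, version -> %s", self.node_id, self.version
        )
        # A real implementation would notify an external service here and
        # return False when the notification fails.
        return True

    class Meta:
        # ActionMeta registers this class with ActionManager under this name;
        # a timer event whose "action" field is "notify_by_webhook" will use it.
        action_name = "notify_by_webhook"

Importing the module that defines the subclass is enough to register it, so a timer event configured through apply_node_timer_event_configs only needs "action": "notify_by_webhook" to trigger it. In the same spirit, the new adapter_class setting (resolved via import_string) allows replacing NodeTimerEventAdapter with a custom NodeTimerEventBaseAdapter subclass when event data should come from somewhere other than NodeTimerEventConfig.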