Skip to content

Commit

Permalink
optimization: 支持开发接入事件 & 超时节点记录自删除 & Action 使用 Metaclass 优化定义
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Nov 9, 2023
1 parent 8129892 commit be59d71
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,15 @@
"""
import datetime
import json
import logging
import signal
import time
import logging

from django.conf import settings
from django.core.management import BaseCommand
from django.db import connections

from pipeline.contrib.node_timeout.models import TimeoutNodesRecord
from pipeline.contrib.node_timeout.settings import node_timeout_settings
from pipeline.contrib.node_timeout.tasks import dispatch_timeout_nodes
from pipeline.contrib.node_timeout.models import TimeoutNodesRecord

logger = logging.getLogger("root")

Expand All @@ -33,8 +31,8 @@ class Command(BaseCommand):

def handle(self, *args, **options):
signal.signal(signal.SIGTERM, self._graceful_exit)
redis_inst = settings.redis_inst
nodes_pool = settings.EXECUTING_NODE_POOL
redis_inst = node_timeout_settings.redis_inst
nodes_pool = node_timeout_settings.executing_pool
while not self.has_killed:
try:
start = time.time()
Expand Down
138 changes: 138 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import abc
import datetime
import json
import logging
import re
from typing import Dict, List, Optional, Union

from pipeline.contrib.node_timer_event.models import NodeTimerEventConfig
from pipeline.contrib.node_timer_event.settings import node_timer_event_settings
from pipeline.contrib.node_timer_event.types import TimerEvent, TimerEvents
from pipeline.contrib.node_timer_event.utils import parse_timer_defined

logger = logging.getLogger(__name__)

EVENT_KEY_PATTERN = re.compile(r".*node:(?P<node_id>.+):version:(?P<version>.+):index:(?P<index>\d+)")


class NodeTimerEventBaseAdapter(abc.ABC):

node_id: str = None
version: str = None
root_pipeline_id: Optional[str] = None
events: Optional[TimerEvents] = None
index__event_map: Optional[Dict[int, TimerEvent]] = None

def __init__(self, node_id: str, version: str):
self.node_id: str = node_id
self.version: str = version

def is_ready(self) -> bool:
"""适配器是否就绪"""
if not self.events:
return False
return True

def fetch_keys_to_be_rem(self) -> List[str]:
"""
获取需要被移除的事件 Key
:return:
"""
return [self.get_event_key(event) for event in self.events]

def get_event_key(self, event: TimerEvent) -> str:
"""
获取事件 Key
:param event:
:return:
"""

# zset 没有按字符串匹配模式批量删除 key 的支持,使用 key 的命名采用已检索的信息进行拼接
# 之前想把 loop 也维护进去,发觉移除操作非常麻烦,故采用 incr 的方式,单独维护每个事件事件的触发次数
key_prefix: str = f"{node_timer_event_settings.key_prefix}:node:{self.node_id}:version:{self.version}"
return f"{key_prefix}:index:{event['index']}"

@classmethod
def get_next_expired_time(cls, event: TimerEvent, start: Optional[datetime.datetime] = None) -> float:
"""
获取时间事件下一次过期时间
:param event: 事件详情
:param start: 开始时间,默认取 datetime.now()
:return:
"""
return parse_timer_defined(event["timer_type"], event["defined"], start=start or datetime.datetime.now())[
"timestamp"
]

def add_to_pool(self, redis_inst, event: TimerEvent):

key: str = self.get_event_key(event)

expired_time: float = self.get_next_expired_time(event)

# TODO 考虑 incr & zadd 合并,使用 lua 封装成原子操作
loop: int = int(redis_inst.incr(key, 1))
redis_inst.expire(key, node_timer_event_settings.max_expire_time)
if loop > event["repetitions"]:
logger.info(
"[add_to_pool] No need to add: node -> %s, version -> %s, loop -> %s, event -> %s",
self.node_id,
self.version,
loop,
event,
)
return

redis_inst.zadd(node_timer_event_settings.executing_pool, mapping={key: expired_time}, nx=True)

logger.info(
"[add_to_pool] add event to redis: "
"node_id -> %s, version -> %s, event -> %s, key -> %s, expired_time -> %s",
self.node_id,
self.version,
event,
key,
expired_time,
)

@classmethod
def parse_event_key(cls, key: str) -> Dict[str, Union[str, int]]:
match = EVENT_KEY_PATTERN.match(key)
if match:
key_info: Dict[str, Union[str, int]] = match.groupdict()
# to int
key_info["index"] = int(key_info["index"])

return key_info

else:
raise ValueError(f"invalid key -> {key}")


class NodeTimerEventAdapter(NodeTimerEventBaseAdapter):
def __init__(self, node_id: str, version: str):
super().__init__(node_id, version)

node_timer_event_config: NodeTimerEventConfig = NodeTimerEventConfig.objects.filter(
node_id=self.node_id
).first()

if not node_timer_event_config:
return

self.root_pipeline_id: str = node_timer_event_config.root_pipeline_id
self.events: TimerEvents = json.loads(node_timer_event_config.events)
self.index__event_map: Dict[int, TimerEvent] = {event["index"]: event for event in self.events}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
from typing import Any, Dict, List

from pipeline.contrib.node_timer_event.models import NodeTimerEventConfig
from pipeline.contrib.utils import ensure_return_pipeline_contrib_api_result
from pipeline.core.constants import PE


@ensure_return_pipeline_contrib_api_result
def apply_node_timer_event_configs(pipeline_tree: Dict[str, Any], configs: Dict[str, List[Dict[str, Any]]]):
"""
在 pipeline_tree 中应用节点计时器边界事件配置
Expand All @@ -29,9 +31,7 @@ def apply_node_timer_event_configs(pipeline_tree: Dict[str, Any], configs: Dict[
if act["type"] == PE.SubProcess:
apply_node_timer_event_configs(act[PE.pipeline], configs)
elif act["type"] == PE.ServiceActivity and act_id in configs:
if "events" not in act:
act["events"] = {}
act["events"]["timer_events"] = [
act.setdefault("events", {})["timer_events"] = [
{
"enable": config["enable"],
"timer_type": config["timer_type"],
Expand All @@ -43,6 +43,7 @@ def apply_node_timer_event_configs(pipeline_tree: Dict[str, Any], configs: Dict[
return new_pipeline_tree


@ensure_return_pipeline_contrib_api_result
def batch_create_node_timer_event_config(root_pipeline_id: str, pipeline_tree: Dict[str, Any]):
"""
批量创建节点时间事件配置
Expand Down
145 changes: 105 additions & 40 deletions runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,81 @@
"""
import logging
from abc import abstractmethod
from typing import Any, Dict, Tuple, Type
from typing import Any, Dict, Tuple

from pipeline.core.data.base import DataObject
from pipeline.eri.runtime import BambooDjangoRuntime

from bamboo_engine import api as bamboo_engine_api
from bamboo_engine.eri import ExecutionData

logger = logging.getLogger(__name__)


def register_action(action_name: str):
"""注册 Action"""
class ActionManager:
__hub = {}

def register(cls):
ActionFactory.add_action(action_name, cls)
return cls
@classmethod
def register_invocation_cls(cls, invocation_cls):
action_name = invocation_cls.Meta.action_name
existed_invocation_cls = cls.__hub.get(action_name)
if existed_invocation_cls:
raise RuntimeError(
"func register error, {}'s action_name {} conflict with {}".format(
existed_invocation_cls, action_name, invocation_cls
)
)

cls.__hub[action_name] = invocation_cls

return register
@classmethod
def clear(cls):
cls.__hub = {}

@classmethod
def get_action(cls, root_pipeline_id: str, node_id: str, version: str, action_name: str) -> "BaseAction":
"""
获取 Action 实例
:param root_pipeline_id: 根节点 ID
:param node_id: 节点 ID
:param version: 节点版本
:param action_name: Action 名称
:return:
"""
if action_name not in cls.__hub:
raise ValueError("{} not found".format(action_name))
return cls.__hub[action_name](root_pipeline_id, node_id, version)


class BaseAction:
class ActionMeta(type):
"""
Metaclass for FEEL function invocation
"""

def __new__(cls, name, bases, dct):
# ensure initialization is only performed for subclasses of Plugin
parents = [b for b in bases if isinstance(b, ActionMeta)]
if not parents:
return super().__new__(cls, name, bases, dct)

new_cls = super().__new__(cls, name, bases, dct)

# meta validation
meta_obj = getattr(new_cls, "Meta", None)
if not meta_obj:
raise AttributeError("Meta class is required")

action_name = getattr(meta_obj, "action_name", None)
if not action_name:
raise AttributeError("action_name is required in Meta")

# register func
ActionManager.register_invocation_cls(new_cls)

return new_cls


class BaseAction(metaclass=ActionMeta):
def __init__(self, root_pipeline_id: str, node_id: str, version: str):
self.root_pipeline_id = root_pipeline_id
self.node_id = node_id
Expand Down Expand Up @@ -60,42 +114,53 @@ def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool
raise NotImplementedError


class ActionFactory:
"""
节点处理器工厂
"""
class ExampleAction(BaseAction):
def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
logger.info("[Action] example do: data -> %s, parent_data -> %s", data, parent_data)
return True

_actions: Dict[str, Type["BaseAction"]] = {}
class Meta:
action_name = "example"

@classmethod
def add_action(cls, action_name: str, action_cls: Type["BaseAction"]):
"""
向工厂中注册 Action

:param action_cls: Action 类
:param action_name: Action 名称
"""
if not issubclass(action_cls, BaseAction):
raise TypeError("register action err: {} is not subclass of {}".format(action_cls, "BaseAction"))
cls._actions[action_name] = action_cls
class ForcedFailAction(BaseAction):

@classmethod
def get_action(cls, root_pipeline_id: str, node_id: str, version: str, action_name: str) -> BaseAction:
"""
获取 Action 实例
:param root_pipeline_id: 根节点 ID
:param node_id: 节点 ID
:param version: 节点版本
:param action_name: Action 名称
:return:
"""
if action_name not in cls._actions:
raise TypeError("{} not found".format(action_name))
return cls._actions[action_name](root_pipeline_id, node_id, version)
TIMEOUT_NODE_OPERATOR = "bamboo_engine"

def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
logger.info("[Action(bamboo_engine_forced_fail)] do: data -> %s, parent_data -> %s", data, parent_data)
result = bamboo_engine_api.forced_fail_activity(
runtime=BambooDjangoRuntime(),
node_id=self.node_id,
ex_data="forced fail by {}".format(self.TIMEOUT_NODE_OPERATOR),
send_post_set_state_signal=kwargs.get("send_post_set_state_signal", True),
)
return result.result

class Meta:
action_name = "bamboo_engine_forced_fail"


class ForcedFailAndSkipAction(BaseAction):

TIMEOUT_NODE_OPERATOR = "bamboo_engine"

@register_action("example")
class ExampleAction(BaseAction):
def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
logger.info("[Action] example do: data -> %s, parent_data -> %s", data, parent_data)
return True
logger.info("[Action(bamboo_engine_forced_fail_and_skip)] do: data -> %s, parent_data -> %s", data, parent_data)
result = bamboo_engine_api.forced_fail_activity(
runtime=BambooDjangoRuntime(),
node_id=self.node_id,
ex_data="forced fail by {}".format(self.TIMEOUT_NODE_OPERATOR),
send_post_set_state_signal=kwargs.get("send_post_set_state_signal", True),
)
if result.result:
result = bamboo_engine_api.skip_node(
runtime=BambooDjangoRuntime(),
node_id=self.node_id,
ex_data="forced skip by {}".format(self.TIMEOUT_NODE_OPERATOR),
send_post_set_state_signal=kwargs.get("send_post_set_state_signal", True),
)
return result.result

class Meta:
action_name = "bamboo_engine_forced_fail_and_skip"
Loading

0 comments on commit be59d71

Please sign in to comment.