diff --git a/docs/user_guide/node_timer_event_introduction.md b/docs/user_guide/node_timer_event_introduction.md index 7335da7..de23a3f 100644 --- a/docs/user_guide/node_timer_event_introduction.md +++ b/docs/user_guide/node_timer_event_introduction.md @@ -31,7 +31,7 @@ @register_action("example") class ExampleAction(BaseAction): def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool: - logger.info("[Action] example do: data -> %s, parent_data -> %", data, parent_data) + logger.info("[Action] example do: data -> %s, parent_data -> %s", data, parent_data) return True ``` @@ -91,6 +91,8 @@ PIPELINE_NODE_TIMER_EVENT_KEY_PREFIX = "bamboo:v1:node_timer_event" # Redis key PIPELINE_NODE_TIMER_EVENT_HANDLE_QUEUE = None # 节点计时器边界事件处理队列名称, 用于处理计时器边界事件, 需要 worker 接收该队列消息,默认为 None,即使用 celery 默认队列 PIPELINE_NODE_TIMER_EVENT_DISPATCH_QUEUE = None # 节点计时器边界事件分发队列名称, 用于记录计时器边界事件, 需要 worker 接收该队列消息,默认为 None,即使用 celery 默认队列 PIPELINE_NODE_TIMER_EVENT_EXECUTING_POOL = "bamboo:v1:node_timer_event:executing_node_pool" # 执行节点池名称,用于记录正在执行的节点,需要保证 Redis key 唯一,命名示例: {app_code}:{app_env}:{module}:executing_node_pool +PIPELINE_NODE_TIMER_EVENT_POOL_SCAN_INTERVAL = 1 # 节点池扫描间隔,间隔越小,边界事件触发时间越精准,相应的事件处理的 workload 负载也会提升,默认为 1 s +PIPELINE_NODE_TIMER_EVENT_MAX_EXPIRE_TIME = 60 * 60 * 24 * 15 # 最长过期时间,兜底删除 Redis 冗余数据,默认为 15 Days,请根据业务场景调整 ``` ## 使用样例 diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/apps.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/apps.py index b9d81a4..2f7e43e 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/apps.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/apps.py @@ -19,6 +19,9 @@ class NodeTimerEventConfig(AppConfig): verbose_name = "PipelineNodeTimerEvent" def ready(self): + from pipeline.contrib.node_timer_event.signals.handlers import ( # noqa + bamboo_engine_eri_node_state_handler, + ) from pipeline.contrib.node_timer_event.tasks import ( # noqa 
dispatch_expired_nodes, execute_node_timer_event_action, diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py index fab7ff2..b8520af 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/handlers.py @@ -97,5 +97,5 @@ def get_action(cls, root_pipeline_id: str, node_id: str, version: str, action_na @register_action("example") class ExampleAction(BaseAction): def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool: - logger.info("[Action] example do: data -> %s, parent_data -> %", data, parent_data) + logger.info("[Action] example do: data -> %s, parent_data -> %s", data, parent_data) return True diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/management/commands/start_node_timer_event_process.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/management/commands/start_node_timer_event_process.py index cbbabd3..f02e309 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/management/commands/start_node_timer_event_process.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/management/commands/start_node_timer_event_process.py @@ -16,7 +16,6 @@ import signal import time -from django.conf import settings from django.core.management import BaseCommand from django.db import connections from pipeline.contrib.node_timer_event.models import ExpiredNodesRecord @@ -32,8 +31,8 @@ class Command(BaseCommand): def handle(self, *args, **options): signal.signal(signal.SIGTERM, self._graceful_exit) - redis_inst = settings.redis_inst - nodes_pool = settings.EXECUTING_NODE_POOL + redis_inst = node_timer_event_settings.redis_inst + nodes_pool = node_timer_event_settings.executing_pool while not self.has_killed: try: start = time.time() @@ -42,7 +41,7 @@ def handle(self, *args, **options): 
logger.info(f"[node_timeout_process] time consuming: {end-start}") except Exception as e: logger.exception(e) - time.sleep(1) + time.sleep(node_timer_event_settings.pool_scan_interval) def _graceful_exit(self, *args): self.has_killed = True diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/models.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/models.py index 939d4fb..b0f2d4c 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/models.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/models.py @@ -77,7 +77,8 @@ def parse_node_timer_event_configs(self, pipeline_tree: Dict[str, Any]) -> Dict[ index += 1 - configs.append({"node_id": act_id, "events": treated_timer_events}) + if treated_timer_events: + configs.append({"node_id": act_id, "events": treated_timer_events}) return {"result": True, "data": configs, "message": ""} @@ -155,6 +156,7 @@ def add_to_pool(cls, redis_inst, node_id: str, version: str, event: Dict[str, An # TODO 考虑 incr & zadd 合并,使用 lua 封装成原子操作 loop: int = int(redis_inst.incr(key, 1)) + redis_inst.expire(key, node_timer_event_settings.max_expire_time) if loop > event["repetitions"]: logger.info( "[add_to_pool] No need to add: node -> %s, version -> %s, loop -> %s, event -> %s", @@ -173,6 +175,7 @@ def add_to_pool(cls, redis_inst, node_id: str, version: str, event: Dict[str, An node_id, version, event, + key, expired_time, ) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/settings.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/settings.py index 152b341..1c35cfb 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/settings.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/settings.py @@ -17,11 +17,19 @@ class NodeTimerEventSettngs: PREFIX = "PIPELINE_NODE_TIMER_EVENT" DEFAULT_SETTINGS = { + # # Redis key 前缀,用于记录正在执行的节点,命名示例: {app_code}:{app_env}:{module}:node_timer_event # v1 表示 node_timer_event 的版本,预留以隔离
"key_prefix": "bamboo:v1:node_timer_event", + # 节点计时器边界事件分发队列名称, 用于记录计时器边界事件, 需要 worker 接收该队列消息,默认为 None,即使用 celery 默认队列 "dispatch_queue": None, + # 节点计时器边界事件处理队列名称, 用于处理计时器边界事件, 需要 worker 接收该队列消息,默认为 None,即使用 celery 默认队列 "handle_queue": None, + # 执行节点池名称,用于记录正在执行的节点,需要保证 Redis key 唯一,命名示例: {app_code}:{app_env}:{module}:executing_node_pool "executing_pool": "bamboo:v1:node_timer_event:executing_node_pool", + # 节点池扫描间隔,间隔越小,边界事件触发时间越精准,相应的事件处理的 workload 负载也会提升 + "pool_scan_interval": 1, + # 最长过期时间,兜底删除 Redis 冗余数据,默认为 15 Days,请根据业务场景调整 + "max_expire_time": 60 * 60 * 24 * 15, } def __getattr__(self, item: str): diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py index 24234e4..c416046 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py @@ -30,6 +30,7 @@ def dispatch_expired_nodes(record_id: int): record: ExpiredNodesRecord = ExpiredNodesRecord.objects.get(id=record_id) node_keys: List[str] = json.loads(record.nodes) + logger.info("[dispatch_expired_nodes] record -> %s, nodes -> %s", record_id, node_keys) for node_key in node_keys: key_info: Dict[str, Union[str, int]] = NodeTimerEventConfig.parse_event_key(node_key) index: int = key_info["index"]