Skip to content

Commit

Permalink
fix: 场景测试修复
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Oct 31, 2023
1 parent b1d0121 commit 88afa33
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 7 deletions.
4 changes: 3 additions & 1 deletion docs/user_guide/node_timer_event_introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@register_action("example")
class ExampleAction(BaseAction):
def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
        logger.info("[Action] example do: data -> %s, parent_data -> %s", data, parent_data)
return True

```
Expand Down Expand Up @@ -91,6 +91,8 @@ PIPELINE_NODE_TIMER_EVENT_KEY_PREFIX = "bamboo:v1:node_timer_event" # Redis key
PIPELINE_NODE_TIMER_EVENT_HANDLE_QUEUE = None # 节点计时器边界事件处理队列名称, 用于处理计时器边界事件, 需要 worker 接收该队列消息,默认为 None,即使用 celery 默认队列
PIPELINE_NODE_TIMER_EVENT_DISPATCH_QUEUE = None # 节点计时器边界事件分发队列名称, 用于记录计时器边界事件, 需要 worker 接收该队列消息,默认为 None,即使用 celery 默认队列
PIPELINE_NODE_TIMER_EVENT_EXECUTING_POOL = "bamboo:v1:node_timer_event:executing_node_pool" # 执行节点池名称,用于记录正在执行的节点,需要保证 Redis key 唯一,命名示例: {app_code}:{app_env}:{module}:executing_node_pool
PIPELINE_NODE_TIMER_EVENT_POOL_SCAN_INTERVAL = 1 # 节点池扫描间隔,间隔越小,边界事件触发时间越精准,相应的事件处理的 workload 负载也会提升,默认为 1 s
PIPELINE_NODE_TIMER_EVENT_MAX_EXPIRE_TIME = 60 * 60 * 24 * 15 # 最长过期时间,兜底删除 Redis 冗余数据,默认为 15 Days,请根据业务场景调整
```

## 使用样例
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ class NodeTimerEventConfig(AppConfig):
verbose_name = "PipelineNodeTimerEvent"

def ready(self):
from pipeline.contrib.node_timer_event.signals.handlers import ( # noqa
bamboo_engine_eri_node_state_handler,
)
from pipeline.contrib.node_timer_event.tasks import ( # noqa
dispatch_expired_nodes,
execute_node_timer_event_action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,5 @@ def get_action(cls, root_pipeline_id: str, node_id: str, version: str, action_na
@register_action("example")
class ExampleAction(BaseAction):
    """Example node-timer-event action: log the inputs and report success.

    Registered under the key ``"example"`` so the engine can dispatch
    timer boundary events to it by name.
    """

    def do(self, data: DataObject, parent_data: DataObject, *args, **kwargs) -> bool:
        # Both placeholders must be %s — a bare trailing "%" is an invalid
        # printf-style conversion and raises when logging formats the message.
        logger.info("[Action] example do: data -> %s, parent_data -> %s", data, parent_data)
        # Always succeeds; real actions return False to signal failure.
        return True
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import signal
import time

from django.conf import settings
from django.core.management import BaseCommand
from django.db import connections
from pipeline.contrib.node_timer_event.models import ExpiredNodesRecord
Expand All @@ -32,8 +31,8 @@ class Command(BaseCommand):

def handle(self, *args, **options):
signal.signal(signal.SIGTERM, self._graceful_exit)
redis_inst = settings.redis_inst
nodes_pool = settings.EXECUTING_NODE_POOL
redis_inst = node_timer_event_settings.redis_inst
nodes_pool = node_timer_event_settings.executing_pool
while not self.has_killed:
try:
start = time.time()
Expand All @@ -42,7 +41,7 @@ def handle(self, *args, **options):
logger.info(f"[node_timeout_process] time consuming: {end-start}")
except Exception as e:
logger.exception(e)
time.sleep(1)
time.sleep(node_timer_event_settings.pool_scan_interval)

def _graceful_exit(self, *args):
self.has_killed = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def parse_node_timer_event_configs(self, pipeline_tree: Dict[str, Any]) -> Dict[

index += 1

configs.append({"node_id": act_id, "events": treated_timer_events})
if treated_timer_events:
configs.append({"node_id": act_id, "events": treated_timer_events})

return {"result": True, "data": configs, "message": ""}

Expand Down Expand Up @@ -155,6 +156,7 @@ def add_to_pool(cls, redis_inst, node_id: str, version: str, event: Dict[str, An

# TODO 考虑 incr & zadd 合并,使用 lua 封装成原子操作
loop: int = int(redis_inst.incr(key, 1))
redis_inst.expire(key, node_timer_event_settings.max_expire_time)
if loop > event["repetitions"]:
logger.info(
"[add_to_pool] No need to add: node -> %s, version -> %s, loop -> %s, event -> %s",
Expand All @@ -173,6 +175,7 @@ def add_to_pool(cls, redis_inst, node_id: str, version: str, event: Dict[str, An
node_id,
version,
event,
key,
expired_time,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@
class NodeTimerEventSettngs:
PREFIX = "PIPELINE_NODE_TIMER_EVENT"
DEFAULT_SETTINGS = {
        # Redis key 前缀,命名示例: {app_code}:{app_env}:{module}:node_timer_event
# v1 表示 node_timer_event 的版本,预留以隔离
"key_prefix": "bamboo:v1:node_timer_event",
        # 节点计时器边界事件分发队列名称, 用于记录计时器边界事件, 需要 worker 接收该队列消息,默认为 None,即使用 celery 默认队列
        "dispatch_queue": None,
        # 节点计时器边界事件处理队列名称, 用于处理计时器边界事件, 需要 worker 接收该队列消息,默认为 None,即使用 celery 默认队列
        "handle_queue": None,
# 执行节点池名称,用于记录正在执行的节点,需要保证 Redis key 唯一,命名示例: {app_code}:{app_env}:{module}:executing_node_pool
"executing_pool": "bamboo:v1:node_timer_event:executing_node_pool",
# 节点池扫描间隔,间隔越小,边界事件触发时间越精准,相应的事件处理的 workload 负载也会提升
"pool_scan_interval": 1,
# 最长过期时间,兜底删除 Redis 冗余数据,默认为 15 Days,请根据业务场景调整
"max_expire_time": 60 * 60 * 24 * 15,
}

def __getattr__(self, item: str):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
def dispatch_expired_nodes(record_id: int):
record: ExpiredNodesRecord = ExpiredNodesRecord.objects.get(id=record_id)
node_keys: List[str] = json.loads(record.nodes)
logger.info("[dispatch_expired_nodes] record -> %s, nodes -> %s", record_id, node_keys)
for node_key in node_keys:
key_info: Dict[str, Union[str, int]] = NodeTimerEventConfig.parse_event_key(node_key)
index: int = key_info["index"]
Expand Down

0 comments on commit 88afa33

Please sign in to comment.