Skip to content

Commit

Permalink
Merge pull request #205 from TencentBlueKing/develop
Browse files Browse the repository at this point in the history
feature: 事件&单节点执行&流程重演&流程回滚
  • Loading branch information
hanshuaikang authored Dec 6, 2023
2 parents ec4a2a9 + 0fd44da commit e7c990e
Show file tree
Hide file tree
Showing 92 changed files with 5,414 additions and 584 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: PR check

on:
pull_request:
branches: [ master ]
branches: [master, develop]

jobs:
engine-lint:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ bamboo-engine 是一个通用的流程引擎,他可以解析,执行,调度
- [重置某个异常节点的输出](./docs/user_guide/update_node_output.md)
- [设置](./docs/user_guide/settings.md)
- [增强包 - 节点超时功能](./docs/user_guide/node_timeout_introduction.md)
- [流程从指定位置开始](./docs/user_guide/start_the_pipeline_at_the_specified_location.md)


## 整体设计
Expand Down
2 changes: 1 addition & 1 deletion bamboo_engine/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
specific language governing permissions and limitations under the License.
"""

__version__ = "2.9.0"
__version__ = "2.10.0rc4"
181 changes: 180 additions & 1 deletion bamboo_engine/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

from bamboo_engine.utils.string import unique_id

from ..validator.connection import validate_graph_without_circle
from .flow.data import Data, Params
from .flow.event import ExecutableEndEvent


__all__ = ["build_tree"]

__skeleton = {
Expand Down Expand Up @@ -94,6 +94,185 @@ def build_tree(start_elem, id=None, data=None):
return tree


def _get_next_node(node, pipeline_tree):
    """
    Resolve the nodes that the outgoing flows of ``node`` lead to.

    :param node: node dict whose "outgoing" entry is a flow id, a list of flow ids, or ""
    :param pipeline_tree: tree dict providing "flows", "activities", "gateways" and "end_event"
    :return: list of target node dicts in outgoing order; a target that is neither an
             activity, a gateway nor the end event is silently skipped
    """
    flow_ids = node["outgoing"]

    # An empty string means the outgoing flow was dropped earlier by cycle removal.
    if flow_ids == "":
        return []

    # A single outgoing flow is stored as a bare id instead of a list.
    if not isinstance(flow_ids, list):
        flow_ids = [flow_ids]

    resolved = []
    for flow_id in flow_ids:
        target_id = pipeline_tree["flows"][flow_id]["target"]
        for section in ("activities", "gateways"):
            if target_id in pipeline_tree[section]:
                resolved.append(pipeline_tree[section][target_id])
                break
        else:
            # Not an activity or a gateway: the only remaining candidate is the end event.
            end_event = pipeline_tree["end_event"]
            if target_id == end_event["id"]:
                resolved.append(end_event)

    return resolved


def _get_all_nodes(pipeline_tree: dict, with_subprocess: bool = False) -> dict:
    """
    Flatten the nodes of ``pipeline_tree`` into one dict keyed by node id.

    Activities, gateways, the start event and the end event are all included.

    :param pipeline_tree: pipeline tree dict with "activities", "gateways",
                          "start_event" and "end_event"
    :param with_subprocess: when True, the nodes inside every "SubProcess" activity's
                            "pipeline" are collected recursively as well
    :return: dict mapping node id to node dict
    """
    nodes = dict(pipeline_tree["activities"])
    nodes.update(pipeline_tree["gateways"])
    for event_key in ("start_event", "end_event"):
        event = pipeline_tree[event_key]
        nodes[event["id"]] = event

    if not with_subprocess:
        return nodes

    for activity in pipeline_tree["activities"].values():
        if activity["type"] == "SubProcess":
            nodes.update(_get_all_nodes(activity["pipeline"], with_subprocess=True))
    return nodes


def _delete_flow_id_from_node_io(node, flow_id, io_type):
    """
    Remove one flow id from a node's "incoming" or "outgoing" entry, in place.

    :param node: node dict to modify
    :param flow_id: id of the flow to detach
    :param io_type: which entry to edit, "incoming" or "outgoing"
    :raises ValueError: when the entry is a list that does not contain ``flow_id``
    """
    current = node[io_type]

    # Entry stored as a bare flow id: blank it out.
    if current == flow_id:
        node[io_type] = ""
        return

    # Any other non-list entry is left untouched.
    if not isinstance(current, list):
        return

    if len(current) == 1 and current[0] == flow_id:
        # Removing the last flow: branching gateways keep a list, other nodes use "".
        is_branch_gateway = node["type"] in ("ExclusiveGateway", "ParallelGateway", "ConditionalParallelGateway")
        node[io_type] = [] if is_branch_gateway else ""
    else:
        del current[current.index(flow_id)]

    # Restore the original format: single-exit nodes store their one outgoing
    # flow as a bare id rather than a one-element list.
    remaining = node[io_type]
    if (
        io_type == "outgoing"
        and len(remaining) == 1
        and node["type"] in ("EmptyStartEvent", "ServiceActivity", "ConvergeGateway")
    ):
        node[io_type] = remaining[0]


def _acyclic(pipeline):
    """
    Reverse back edges in ``pipeline`` until the validator reports no cycle.

    The tree is modified in place: an "all_nodes" index is stored on it, every
    offending flow gets its "source" and "target" swapped, and the flow id is
    detached from the outgoing entry of its old source and the incoming entry
    of its old target.

    :param pipeline: pipeline tree dict (mutated)
    :return: None
    """
    pipeline["all_nodes"] = _get_all_nodes(pipeline, with_subprocess=True)

    # Index flows by "<source>.<target>" so a reported edge can be mapped back to its flow id.
    flow_id_by_endpoints = {}
    for flow_id, flow in pipeline["flows"].items():
        flow_id_by_endpoints["{}.{}".format(flow["source"], flow["target"])] = flow_id

    check = validate_graph_without_circle(pipeline)
    while not check["result"]:
        # The last two entries of error_data are taken as the edge that closes the cycle.
        source = check["error_data"][-2]
        target = check["error_data"][-1]
        flow_id = flow_id_by_endpoints["{}.{}".format(source, target)]

        reversed_flow = pipeline["flows"][flow_id]
        reversed_flow.update({"source": target, "target": source})

        _delete_flow_id_from_node_io(pipeline["all_nodes"][source], flow_id, "outgoing")
        _delete_flow_id_from_node_io(pipeline["all_nodes"][target], flow_id, "incoming")

        check = validate_graph_without_circle(pipeline)


def generate_pipeline_token(pipeline_tree):
    """
    Compute a token for every node reachable from the start event.

    The work is done on a deep copy, so cycle removal never touches the
    caller's ``pipeline_tree``.

    :param pipeline_tree: pipeline tree dict
    :return: dict mapping node id to token
    """
    working_tree = copy.deepcopy(pipeline_tree)
    # Break cycles first so the recursive walk below terminates.
    _acyclic(working_tree)

    start_event = working_tree["start_event"]
    root_token = unique_id("t")
    node_token_map = {}
    node_token_map[start_event["id"]] = root_token
    inject_pipeline_token(start_event, working_tree, node_token_map, root_token)
    return node_token_map


# 需要处理子流程的问题
def inject_pipeline_token(node, pipeline_tree, node_token_map, token):
    """
    Walk the tree from ``node`` and record a token for every node reached.

    Tokens are written into ``node_token_map`` (node id -> token):

    - a node following a ServiceActivity / EmptyStartEvent inherits ``token``;
    - every branch of an ExclusiveGateway keeps the gateway's ``token``;
    - every branch of a ParallelGateway / ConditionalParallelGateway gets a fresh token;
    - the node at which a gateway's branches stop (converge gateway or end event)
      and the node after a converge gateway receive the gateway's ``token``;
    - the start event of a SubProcess gets a fresh token and the inner tree is
      walked with it, then the walk continues after the SubProcess with ``token``.

    :param node: node dict to start from
    :param pipeline_tree: tree dict that ``node`` belongs to
    :param node_token_map: dict that is filled in place
    :param token: token carried along the current path
    :return: the ConvergeGateway or end event at which this path stopped; None for
             branching gateways, unknown node types, and paths that end at a node
             whose outgoing flow was removed by cycle removal
    """
    if node["type"] in ["ParallelGateway", "ExclusiveGateway", "ConditionalParallelGateway"]:
        is_parallel = node["type"] in ["ParallelGateway", "ConditionalParallelGateway"]
        target_nodes = {}
        for next_node in _get_next_node(node, pipeline_tree):
            # Exclusive branches share the gateway token; parallel branches each get their own.
            node_token = unique_id("t") if is_parallel else token
            node_token_map[next_node["id"]] = node_token

            # Follow the branch inward; it returns the converge gateway (or end event) it stops at.
            target_node = inject_pipeline_token(next_node, pipeline_tree, node_token_map, node_token)
            if target_node:
                target_nodes[target_node["id"]] = target_node

        for target_node in target_nodes.values():
            # The node where the branches stop takes the token of the opening gateway.
            node_token_map[target_node["id"]] = token

            # A branch may run straight into an end event, so there may be no converge gateway.
            if target_node["type"] in ["EmptyEndEvent", "ExecutableEndEvent"]:
                continue

            # Continue after the converge gateway, which has at most one outgoing flow.
            # Cycle removal may have dropped that flow; in that case there is nothing to follow.
            next_node_list = _get_next_node(target_node, pipeline_tree)
            if not next_node_list:
                continue
            next_node = next_node_list[0]
            node_token_map[next_node["id"]] = token
            inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)
        return None

    # A converge gateway ends the current branch; hand it back to the opening gateway.
    if node["type"] == "ConvergeGateway":
        return node

    # Plain nodes have a single outgoing flow: keep walking with the same token.
    if node["type"] in ["ServiceActivity", "EmptyStartEvent"]:
        next_node_list = _get_next_node(node, pipeline_tree)
        # The outgoing flow may have been dropped by cycle removal.
        if not next_node_list:
            return None
        next_node = next_node_list[0]
        node_token_map[next_node["id"]] = token
        return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)

    # End events terminate the walk.
    if node["type"] in ["EmptyEndEvent", "ExecutableEndEvent"]:
        return node

    if node["type"] == "SubProcess":
        # The inner pipeline is tokenised independently, starting from a fresh token.
        subprocess_pipeline_tree = node["pipeline"]
        subprocess_start_node = subprocess_pipeline_tree["start_event"]
        subprocess_start_node_token = unique_id("t")
        node_token_map[subprocess_start_node["id"]] = subprocess_start_node_token
        inject_pipeline_token(
            subprocess_start_node, subprocess_pipeline_tree, node_token_map, subprocess_start_node_token
        )

        # Same guard as for plain nodes: the outgoing flow may have been dropped by cycle removal.
        next_node_list = _get_next_node(node, pipeline_tree)
        if not next_node_list:
            return None
        next_node = next_node_list[0]
        node_token_map[next_node["id"]] = token
        return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)

    return None


def __update(tree, elem):
node_type = __node_type[elem.type()]
node = tree[node_type] if node_type == "end_event" else tree[node_type][elem.id]
Expand Down
2 changes: 2 additions & 0 deletions bamboo_engine/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ class Settings:
PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = default_expr_func

PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = ExclusiveGatewayStrategy.ONLY.value

PIPELINE_ENABLE_ROLLBACK = False
47 changes: 44 additions & 3 deletions bamboo_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
setup_gauge,
setup_histogram,
)
from .utils.constants import RuntimeSettings
from .utils.host import get_hostname
from .utils.string import get_lower_case_name

Expand Down Expand Up @@ -115,17 +116,22 @@ def run_pipeline(
cycle_tolerate = options.get("cycle_tolerate", False)
validator.validate_and_process_pipeline(pipeline, cycle_tolerate)

start_node_id = options.get("start_node_id", pipeline["start_event"]["id"])
# 如果起始位置不是开始节点,则需要进行额外校验
validator.validate_pipeline_start_node(pipeline, start_node_id)

self.runtime.pre_prepare_run_pipeline(
pipeline, root_pipeline_data, root_pipeline_context, subprocess_context, **options
)

process_id = self.runtime.prepare_run_pipeline(
pipeline, root_pipeline_data, root_pipeline_context, subprocess_context, **options
)

# execute from start event
self.runtime.execute(
process_id=process_id,
node_id=pipeline["start_event"]["id"],
node_id=start_node_id,
root_pipeline_id=pipeline["id"],
parent_pipeline_id=pipeline["id"],
)
Expand Down Expand Up @@ -912,7 +918,8 @@ def execute(
# 设置状态前检测
if node_state.name not in states.INVERTED_TRANSITION[states.RUNNING]:
logger.info(
"[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING for exist state", # noqa
"[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING "
"for exist state",
process_info.root_pipeline_id,
node_state.name,
)
Expand Down Expand Up @@ -1010,6 +1017,15 @@ def execute(
hook=HookType.NODE_FINISH,
node=node,
)
if node.type == NodeType.ServiceActivity and self.runtime.get_config(
RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value
):
self._set_snapshot(root_pipeline_id, node)
# 判断是否已经预约了回滚,如果已经预约,则kill掉当前的process,直接return
if node.reserve_rollback:
self.runtime.die(process_id)
self.runtime.start_rollback(root_pipeline_id, node_id)
return

# 进程是否要进入睡眠
if execute_result.should_sleep:
Expand Down Expand Up @@ -1177,7 +1193,9 @@ def schedule(
# only retry at multiple calback type
if schedule.type is not ScheduleType.MULTIPLE_CALLBACK:
logger.info(
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, will not retry to get lock", # noqa
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, "
"will not retry to get lock",
# noqa
root_pipeline_id,
schedule_id,
node_id,
Expand Down Expand Up @@ -1290,6 +1308,15 @@ def schedule(
node=node,
callback_data=callback_data,
)
if node.type == NodeType.ServiceActivity and self.runtime.get_config(
RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value
):
self._set_snapshot(root_pipeline_id, node)
# 判断是否已经预约了回滚,如果已经预约,启动回滚流程
if node.reserve_rollback:
self.runtime.start_rollback(root_pipeline_id, node_id)
return

self.runtime.execute(
process_id=process_id,
node_id=schedule_result.next_node_id,
Expand All @@ -1302,6 +1329,20 @@ def schedule(
time.time() - engine_post_schedule_start_at
)

def _set_snapshot(self, root_pipeline_id, node):
    """
    Store a snapshot of ``node`` through the runtime.

    The snapshot carries the node's code and version, its execution data
    inputs and outputs, and the values of the root pipeline's data inputs.

    :param root_pipeline_id: id of the root pipeline the node belongs to
    :param node: node object exposing ``id``, ``code`` and ``version``
    """
    node_inputs = self.runtime.get_execution_data_inputs(node.id)
    node_outputs = self.runtime.get_execution_data_outputs(node.id)

    # Unwrap each root pipeline data input to its plain value.
    context_values = {}
    for key, data_input in self.runtime.get_data_inputs(root_pipeline_id).items():
        context_values[key] = data_input.value

    snapshot = {
        "root_pipeline_id": root_pipeline_id,
        "node_id": node.id,
        "code": node.code,
        "version": node.version,
        "context_values": context_values,
        "inputs": node_inputs,
        "outputs": node_outputs,
    }
    self.runtime.set_node_snapshot(**snapshot)

def _add_history(
self,
node_id: str,
Expand Down
30 changes: 30 additions & 0 deletions bamboo_engine/eri/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,35 @@ def get_config(self, name):
"""


class RollbackMixin:
    """
    Runtime hooks required by the rollback feature.

    Every method is abstract; a concrete runtime is expected to implement them.
    """

    @abstractmethod
    def set_pipeline_token(self, pipeline_tree: dict):
        """
        Set the pipeline token.

        :param pipeline_tree: pipeline tree dict
        """
        # NOTE(review): presumably stores a node-id -> token mapping for the tree;
        # confirm against the concrete runtime.

    @abstractmethod
    def set_node_snapshot(
        self,
        root_pipeline_id: str,
        node_id: str,
        code: str,
        version: str,
        context_values: dict,
        inputs: dict,
        outputs: dict,
    ):
        """
        Create a snapshot of a node.

        :param root_pipeline_id: id of the root pipeline
        :param node_id: id of the node being snapshotted
        :param code: code of the node
        :param version: version of the node
        :param context_values: context values to store with the snapshot
        :param inputs: inputs of the node
        :param outputs: outputs of the node
        """

    @abstractmethod
    def start_rollback(self, root_pipeline_id: str, node_id: str):
        """
        Start a rollback.

        :param root_pipeline_id: id of the root pipeline
        :param node_id: id of the node the rollback is started for
        """


class EngineRuntimeInterface(
PluginManagerMixin,
EngineAPIHooksMixin,
Expand All @@ -1591,6 +1620,7 @@ class EngineRuntimeInterface(
ExecutionHistoryMixin,
InterruptMixin,
ConfigMixin,
RollbackMixin,
metaclass=ABCMeta,
):
@abstractmethod
Expand Down
Loading

0 comments on commit e7c990e

Please sign in to comment.