Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: 事件&单节点执行&流程重演&流程回滚 #205

Merged
merged 26 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
17c21ec
feature: 回滚功能支持 (#172)
hanshuaikang Oct 18, 2023
e97fbdd
minor: release bamboo-engine 2.9.0rc1
hanshuaikang Oct 18, 2023
a7facd6
minor: release bamboo-engine 3.28.0rc1
hanshuaikang Oct 18, 2023
6efaf05
feature: 流程支持从指定位置开始 (#180)
hanshuaikang Oct 20, 2023
bdd6a09
bugfix: 修复节点快照某些值无法正确序列化的问题 (#183)
hanshuaikang Oct 24, 2023
9f172ec
minor: release bamboo-engine 2.10.0rc1
hanshuaikang Oct 24, 2023
6af6d5e
minor: release bamboo-pipeline 3.29.0rc1
hanshuaikang Oct 24, 2023
afc1c28
bugfix: 修复环问题 && 构建token新增可执行结束节点判定 (#188)
hanshuaikang Oct 31, 2023
ce53e06
minor: release 2.10.0rc2
hanshuaikang Oct 31, 2023
e404cff
minor: release 3.29.0rc2
hanshuaikang Nov 1, 2023
b05b2c1
minor: 新增api,返回当指定开始节点时,被跳过执行的节点列表
hanshuaikang Nov 8, 2023
e1485c0
minor: code review
hanshuaikang Nov 9, 2023
75cf42b
minor: release 2.10.0rc3
hanshuaikang Nov 9, 2023
0af03b1
minor: release pipeline 3.29.0rc3
hanshuaikang Nov 9, 2023
a6c9c21
minor: 新增修改已有recursive_replace_id, 支持返回map
hanshuaikang Nov 16, 2023
fff2f73
test: 补充replace单元测试
hanshuaikang Nov 17, 2023
5a0f0d0
feature: 节点计时器边界事件支持 (closed #189) (#190)
ZhuoZhuoCrayon Nov 20, 2023
0aa6dbc
minor: 去除any模式下当前正在运行节点的限制
hanshuaikang Nov 20, 2023
9563da6
minor: release bamboo-pipeline 3.29.0rc4
hanshuaikang Nov 20, 2023
5755dcb
bugfix: 修复快照回滚时反序列化的问题
hanshuaikang Nov 20, 2023
8e75c59
minor: release bamboo-engine 2.10.0rc4
hanshuaikang Nov 20, 2023
14cd46f
minor: release bamboo-pipeline 3.29.0rc4
hanshuaikang Nov 20, 2023
fe41f9c
bugfix: 调整分支网关token分配算法,分支网关内与主流程token一致
hanshuaikang Nov 27, 2023
94efa96
feature: 单节点执行方案 (#203)
hanshuaikang Dec 5, 2023
eb72afe
minor: merge master
hanshuaikang Dec 5, 2023
0fd44da
Merge pull request #206 from hanshuaikang/minor/fix_conflict
hanshuaikang Dec 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: PR check

on:
pull_request:
branches: [ master ]
branches: [master, develop]

jobs:
engine-lint:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ bamboo-engine 是一个通用的流程引擎,他可以解析,执行,调度
- [重置某个异常节点的输出](./docs/user_guide/update_node_output.md)
- [设置](./docs/user_guide/settings.md)
- [增强包 - 节点超时功能](./docs/user_guide/node_timeout_introduction.md)
- [流程从指定位置开始](./docs/user_guide/start_the_pipeline_at_the_specified_location.md)


## 整体设计
Expand Down
2 changes: 1 addition & 1 deletion bamboo_engine/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
specific language governing permissions and limitations under the License.
"""

__version__ = "2.9.0"
__version__ = "2.10.0rc4"
181 changes: 180 additions & 1 deletion bamboo_engine/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

from bamboo_engine.utils.string import unique_id

from ..validator.connection import validate_graph_without_circle
from .flow.data import Data, Params
from .flow.event import ExecutableEndEvent


__all__ = ["build_tree"]

__skeleton = {
Expand Down Expand Up @@ -94,6 +94,185 @@
return tree


def _get_next_node(node, pipeline_tree):
"""
获取当前节点的下一个节点
"""

out_goings = node["outgoing"]

# 说明曾经去除过环,此时没有out_goings
if out_goings == "":
return []

# 当只有一个输出时,
if not isinstance(out_goings, list):
out_goings = [out_goings]

next_nodes = []
for out_going in out_goings:
target_id = pipeline_tree["flows"][out_going]["target"]
if target_id in pipeline_tree["activities"]:
next_nodes.append(pipeline_tree["activities"][target_id])
elif target_id in pipeline_tree["gateways"]:
next_nodes.append(pipeline_tree["gateways"][target_id])
elif target_id == pipeline_tree["end_event"]["id"]:
next_nodes.append(pipeline_tree["end_event"])

return next_nodes


def _get_all_nodes(pipeline_tree: dict, with_subprocess: bool = False) -> dict:
"""
获取 pipeline_tree 中所有 activity 的信息

:param pipeline_tree: pipeline web tree
:param with_subprocess: 是否是子流程的 tree
:return: 包含 pipeline_tree 中所有 activity 的字典(包括子流程的 acitivity)
"""
all_nodes = {}
all_nodes.update(pipeline_tree["activities"])
all_nodes.update(pipeline_tree["gateways"])
all_nodes.update(
{
pipeline_tree["start_event"]["id"]: pipeline_tree["start_event"],
pipeline_tree["end_event"]["id"]: pipeline_tree["end_event"],
}
)
if with_subprocess:
for act in pipeline_tree["activities"].values():
if act["type"] == "SubProcess":
all_nodes.update(_get_all_nodes(act["pipeline"], with_subprocess=True))
return all_nodes


def _delete_flow_id_from_node_io(node, flow_id, io_type):
"""
删除节点的某条连线,io_type(incoming or outgoing)
"""
if node[io_type] == flow_id:
node[io_type] = ""
elif isinstance(node[io_type], list):
if len(node[io_type]) == 1 and node[io_type][0] == flow_id:
node[io_type] = (

Check warning on line 157 in bamboo_engine/builder/builder.py

View check run for this annotation

Codecov / codecov/patch

bamboo_engine/builder/builder.py#L157

Added line #L157 was not covered by tests
"" if node["type"] not in ["ExclusiveGateway", "ParallelGateway", "ConditionalParallelGateway"] else []
)
else:
node[io_type].pop(node[io_type].index(flow_id))

# recover to original format
if (
len(node[io_type]) == 1
and io_type == "outgoing"
and node["type"] in ["EmptyStartEvent", "ServiceActivity", "ConvergeGateway"]
):
node[io_type] = node[io_type][0]

Check warning on line 169 in bamboo_engine/builder/builder.py

View check run for this annotation

Codecov / codecov/patch

bamboo_engine/builder/builder.py#L169

Added line #L169 was not covered by tests


def _acyclic(pipeline):
"""
@summary: 逆转反向边
@return:
"""

pipeline["all_nodes"] = _get_all_nodes(pipeline, with_subprocess=True)

deformed_flows = {
"{}.{}".format(flow["source"], flow["target"]): flow_id for flow_id, flow in pipeline["flows"].items()
}
while True:
no_circle = validate_graph_without_circle(pipeline)
if no_circle["result"]:
break
source = no_circle["error_data"][-2]
target = no_circle["error_data"][-1]
circle_flow_key = "{}.{}".format(source, target)
flow_id = deformed_flows[circle_flow_key]
pipeline["flows"][flow_id].update({"source": target, "target": source})

source_node = pipeline["all_nodes"][source]
_delete_flow_id_from_node_io(source_node, flow_id, "outgoing")

target_node = pipeline["all_nodes"][target]
_delete_flow_id_from_node_io(target_node, flow_id, "incoming")


def generate_pipeline_token(pipeline_tree):
tree = copy.deepcopy(pipeline_tree)
# 去环
_acyclic(tree)

start_node = tree["start_event"]
token = unique_id("t")
node_token_map = {start_node["id"]: token}
inject_pipeline_token(start_node, tree, node_token_map, token)
return node_token_map


# 需要处理子流程的问题
def inject_pipeline_token(node, pipeline_tree, node_token_map, token):
# 如果是网关
if node["type"] in ["ParallelGateway", "ExclusiveGateway", "ConditionalParallelGateway"]:
next_nodes = _get_next_node(node, pipeline_tree)
target_nodes = {}
for next_node in next_nodes:
# 分支网关各个分支token相同
node_token = token
node_token_map[next_node["id"]] = node_token
# 并行网关token不同
if node["type"] in ["ParallelGateway", "ConditionalParallelGateway"]:
node_token = unique_id("t")
node_token_map[next_node["id"]] = node_token

# 如果是并行网关,沿着路径向内搜索,最终遇到对应的汇聚网关会返回
target_node = inject_pipeline_token(next_node, pipeline_tree, node_token_map, node_token)
if target_node:
target_nodes[target_node["id"]] = target_node

for target_node in target_nodes.values():
# 汇聚网关可以直连结束节点,所以可能会存在找不到对应的汇聚网关的情况
if target_node["type"] in ["EmptyEndEvent", "ExecutableEndEvent"]:
node_token_map[target_node["id"]] = token
continue
# 汇聚网关的token等于对应的网关的token
node_token_map[target_node["id"]] = token
# 到汇聚网关之后,此时继续向下遍历
next_node = _get_next_node(target_node, pipeline_tree)[0]
# 汇聚网关只会有一个出度
node_token_map[next_node["id"]] = token
inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)

# 如果是汇聚网关,并且id等于converge_id,说明此时遍历在某个单元
if node["type"] == "ConvergeGateway":
return node

# 如果是普通的节点,说明只有一个出度,此时直接向下遍历就好
if node["type"] in ["ServiceActivity", "EmptyStartEvent"]:
next_node_list = _get_next_node(node, pipeline_tree)
# 此时有可能遇到一个去环的节点,该节点没有
if not next_node_list:
return
next_node = next_node_list[0]
node_token_map[next_node["id"]] = token
return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)

# 如果遇到结束节点,直接返回
if node["type"] in ["EmptyEndEvent", "ExecutableEndEvent"]:
return node

if node["type"] == "SubProcess":
subprocess_pipeline_tree = node["pipeline"]
subprocess_start_node = subprocess_pipeline_tree["start_event"]
subprocess_start_node_token = unique_id("t")
node_token_map[subprocess_start_node["id"]] = subprocess_start_node_token
inject_pipeline_token(
subprocess_start_node, subprocess_pipeline_tree, node_token_map, subprocess_start_node_token
)
next_node = _get_next_node(node, pipeline_tree)[0]
node_token_map[next_node["id"]] = token
return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)


def __update(tree, elem):
node_type = __node_type[elem.type()]
node = tree[node_type] if node_type == "end_event" else tree[node_type][elem.id]
Expand Down
2 changes: 2 additions & 0 deletions bamboo_engine/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ class Settings:
PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = default_expr_func

PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = ExclusiveGatewayStrategy.ONLY.value

PIPELINE_ENABLE_ROLLBACK = False
47 changes: 44 additions & 3 deletions bamboo_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
setup_gauge,
setup_histogram,
)
from .utils.constants import RuntimeSettings
from .utils.host import get_hostname
from .utils.string import get_lower_case_name

Expand Down Expand Up @@ -115,17 +116,22 @@
cycle_tolerate = options.get("cycle_tolerate", False)
validator.validate_and_process_pipeline(pipeline, cycle_tolerate)

start_node_id = options.get("start_node_id", pipeline["start_event"]["id"])
# 如果起始位置不是开始节点,则需要进行额外校验
validator.validate_pipeline_start_node(pipeline, start_node_id)

self.runtime.pre_prepare_run_pipeline(
pipeline, root_pipeline_data, root_pipeline_context, subprocess_context, **options
)

process_id = self.runtime.prepare_run_pipeline(
pipeline, root_pipeline_data, root_pipeline_context, subprocess_context, **options
)

# execute from start event
self.runtime.execute(
process_id=process_id,
node_id=pipeline["start_event"]["id"],
node_id=start_node_id,
root_pipeline_id=pipeline["id"],
parent_pipeline_id=pipeline["id"],
)
Expand Down Expand Up @@ -737,8 +743,8 @@
]:
# 保存数据
self.runtime.set_execution_data(node_id=node.id, data=service_data)
except Exception:
pass

Check warning on line 747 in bamboo_engine/engine.py

View check run for this annotation

Codecov / codecov/patch

bamboo_engine/engine.py#L746-L747

Added lines #L746 - L747 were not covered by tests

# engine event
@setup_gauge(ENGINE_RUNNING_PROCESSES)
Expand Down Expand Up @@ -912,7 +918,8 @@
# 设置状态前检测
if node_state.name not in states.INVERTED_TRANSITION[states.RUNNING]:
logger.info(
"[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING for exist state", # noqa
"[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING "
"for exist state",
process_info.root_pipeline_id,
node_state.name,
)
Expand Down Expand Up @@ -1010,6 +1017,15 @@
hook=HookType.NODE_FINISH,
node=node,
)
if node.type == NodeType.ServiceActivity and self.runtime.get_config(
RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value
):
self._set_snapshot(root_pipeline_id, node)
# 判断是否已经预约了回滚,如果已经预约,则kill掉当前的process,直接return
if node.reserve_rollback:
self.runtime.die(process_id)
self.runtime.start_rollback(root_pipeline_id, node_id)
return

# 进程是否要进入睡眠
if execute_result.should_sleep:
Expand Down Expand Up @@ -1177,7 +1193,9 @@
# only retry at multiple calback type
if schedule.type is not ScheduleType.MULTIPLE_CALLBACK:
logger.info(
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, will not retry to get lock", # noqa
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, "
"will not retry to get lock",
# noqa
root_pipeline_id,
schedule_id,
node_id,
Expand Down Expand Up @@ -1290,6 +1308,15 @@
node=node,
callback_data=callback_data,
)
if node.type == NodeType.ServiceActivity and self.runtime.get_config(
RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value
):
self._set_snapshot(root_pipeline_id, node)
# 判断是否已经预约了回滚,如果已经预约,启动回滚流程
if node.reserve_rollback:
self.runtime.start_rollback(root_pipeline_id, node_id)
return

self.runtime.execute(
process_id=process_id,
node_id=schedule_result.next_node_id,
Expand All @@ -1302,6 +1329,20 @@
time.time() - engine_post_schedule_start_at
)

def _set_snapshot(self, root_pipeline_id, node):
inputs = self.runtime.get_execution_data_inputs(node.id)
outputs = self.runtime.get_execution_data_outputs(node.id)
root_pipeline_input = {key: di.value for key, di in self.runtime.get_data_inputs(root_pipeline_id).items()}
self.runtime.set_node_snapshot(
root_pipeline_id=root_pipeline_id,
node_id=node.id,
code=node.code,
version=node.version,
context_values=root_pipeline_input,
inputs=inputs,
outputs=outputs,
)

def _add_history(
self,
node_id: str,
Expand Down
30 changes: 30 additions & 0 deletions bamboo_engine/eri/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,35 @@ def get_config(self, name):
"""


class RollbackMixin:
@abstractmethod
def set_pipeline_token(self, pipeline_tree: dict):
"""
设置pipeline token
"""

@abstractmethod
def set_node_snapshot(
self,
root_pipeline_id: str,
node_id: str,
code: str,
version: str,
context_values: dict,
inputs: dict,
outputs: dict,
):
"""
创建一份节点快照
"""

@abstractmethod
def start_rollback(self, root_pipeline_id: str, node_id: str):
"""
开始回滚
"""


class EngineRuntimeInterface(
PluginManagerMixin,
EngineAPIHooksMixin,
Expand All @@ -1591,6 +1620,7 @@ class EngineRuntimeInterface(
ExecutionHistoryMixin,
InterruptMixin,
ConfigMixin,
RollbackMixin,
metaclass=ABCMeta,
):
@abstractmethod
Expand Down
Loading
Loading