Skip to content

Commit

Permalink
minor: 新增api,返回当指定开始节点时,被跳过执行的节点列表
Browse files Browse the repository at this point in the history
  • Loading branch information
hanshuaikang committed Nov 9, 2023
1 parent e404cff commit b05b2c1
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 14 deletions.
44 changes: 43 additions & 1 deletion bamboo_engine/validator/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import copy

from bamboo_engine import exceptions
from bamboo_engine.eri import NodeType

from . import rules
from .connection import validate_graph_connection, validate_graph_without_circle
from .gateway import validate_gateways, validate_stream
from .utils import format_pipeline_tree_io_to_list, get_allowed_start_node_ids
from .utils import (
compute_pipeline_main_nodes,
compute_pipeline_skip_executed_map,
format_pipeline_tree_io_to_list,
get_nodes_dict,
)


def validate_pipeline_start_node(pipeline: dict, node_id: str):
Expand All @@ -30,6 +36,42 @@ def validate_pipeline_start_node(pipeline: dict, node_id: str):
raise exceptions.StartPositionInvalidException("this node_id is not allowed as a starting node")


def get_skipped_execute_node_ids(pipeline_tree, start_node_id, validate=True):
if validate and start_node_id not in get_allowed_start_node_ids(pipeline_tree):
raise Exception("the start_node_id is not legal, please check")
start_event_id = pipeline_tree["start_event"]["id"]
node_dict = get_nodes_dict(pipeline_tree)
# 流程的开始位置只允许出现在主干,子流程/并行网关内的节点不允许作为起始位置
will_skipped_nodes = compute_pipeline_skip_executed_map(start_event_id, node_dict, start_node_id)
return list(will_skipped_nodes)


def get_allowed_start_node_ids(pipeline_tree):
# 检查该流程是否已经经过汇聚网关填充
def check_converge_gateway():
gateways = pipeline_tree["gateways"]
if not gateways:
return True
# 经过填充的网关会有converge_gateway_id 字段
for gateway in gateways.values():
if (
gateway["type"] in ["ParallelGateway", "ConditionalParallelGateway"]
and "converge_gateway_id" not in gateway
):
return False

return True

if check_converge_gateway():
pipeline_tree = copy.deepcopy(pipeline_tree)
validate_gateways(pipeline_tree)
start_event_id = pipeline_tree["start_event"]["id"]
node_dict = get_nodes_dict(pipeline_tree)
# 流程的开始位置只允许出现在主干,子流程/并行网关内的节点不允许作为起始位置
allowed_start_node_ids = compute_pipeline_main_nodes(start_event_id, node_dict)
return allowed_start_node_ids


def validate_and_process_pipeline(pipeline: dict, cycle_tolerate=False):
for subproc in [act for act in pipeline["activities"].values() if act["type"] == NodeType.SubProcess.value]:
validate_and_process_pipeline(subproc["pipeline"], cycle_tolerate)
Expand Down
29 changes: 20 additions & 9 deletions bamboo_engine/validator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get_nodes_dict(data):
return nodes


def _compute_pipeline_main_nodes(node_id, node_dict):
def compute_pipeline_main_nodes(node_id, node_dict):
"""
计算流程中的主线节点,遇到并行网关/分支并行网关/子流程,则会跳过
最后计算出来主干分支所允许开始的节点范围
Expand All @@ -110,17 +110,28 @@ def _compute_pipeline_main_nodes(node_id, node_dict):
if node_type in ["EmptyStartEvent", "ServiceActivity", "ExclusiveGateway", "ConvergeGateway", "SubProcess"]:
next_nodes = node_detail.get("target", [])
for next_node_id in next_nodes:
nodes += _compute_pipeline_main_nodes(next_node_id, node_dict)
nodes += compute_pipeline_main_nodes(next_node_id, node_dict)
elif node_type in ["ParallelGateway", "ConditionalParallelGateway"]:
next_node_id = node_detail["converge_gateway_id"]
nodes += _compute_pipeline_main_nodes(next_node_id, node_dict)
nodes += compute_pipeline_main_nodes(next_node_id, node_dict)

return nodes


def get_allowed_start_node_ids(pipeline_tree):
start_event_id = pipeline_tree["start_event"]["id"]
node_dict = get_nodes_dict(pipeline_tree)
# 流程的开始位置只允许出现在主干,子流程/并行网关内的节点不允许作为起始位置
allowed_start_node_ids = _compute_pipeline_main_nodes(start_event_id, node_dict)
return allowed_start_node_ids
def compute_pipeline_skip_executed_map(node_id, node_dict, start_node_id):
nodes = [node_id]
if node_id == start_node_id:
return nodes
node_detail = node_dict[node_id]
next_nodes = node_detail.get("target", [])
if node_detail["type"] in ["ExclusiveGateway"]:
for next_node_id in next_nodes:
node_ids = compute_pipeline_skip_executed_map(next_node_id, node_dict, start_node_id)
# 如果开始的位置在分支网关内,只处理该分支
if start_node_id in node_ids:
nodes += node_ids
else:
for next_node_id in next_nodes:
nodes += compute_pipeline_skip_executed_map(next_node_id, node_dict, start_node_id)

return set(nodes) - {start_node_id}
105 changes: 102 additions & 3 deletions docs/user_guide/start_the_pipeline_at_the_specified_location.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ from bamboo_engine import api
pipeline = {}
# 可以使用root_pipeline_context的方式补充缺失的上下文信息
api.run_pipeline(runtime=BambooDjangoRuntime(),
pipeline=pipeline,
start_node_id="xxxxx",
pipeline=pipeline,
start_node_id="xxxxx",
root_pipeline_context={})
```

Expand All @@ -28,4 +28,103 @@ start_node_id 的指定需要遵循如下规则:

下图红框内的节点表示允许作为起始位置的节点。

![run_pipeline.png](..%2Fassets%2Fimg%2Fstart_the_pipeline_at_the_specified_location%2Frun_pipeline.png)
![run_pipeline.png](..%2Fassets%2Fimg%2Fstart_the_pipeline_at_the_specified_location%2Frun_pipeline.png)

其他工具方法:

1. 获取某个流程所允许的回滚范围

```python

from bamboo_engine.builder import (
ConditionalParallelGateway,
ConvergeGateway,
EmptyEndEvent,
EmptyStartEvent,
ServiceActivity,
build_tree,
)

from bamboo_engine.validator.api import get_allowed_start_node_ids

start = EmptyStartEvent()
act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1")
cpg = ConditionalParallelGateway(
conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0", 2: "${act_1_output} >= 0"},
name="[act_2] or [act_3 and act_4]",
)
act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2")
act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3")
act_4 = ServiceActivity(component_code="pipe_example_component", name="act_4")
cg = ConvergeGateway()
end = EmptyEndEvent()
start.extend(act_1).extend(cpg).connect(act_2, act_3, act_4).to(cpg).converge(cg).extend(end)

pipeline = build_tree(start)
allowed_start_node_ids = get_allowed_start_node_ids(pipeline)
```

2. 检查某个节点是否可作为开始节点:

```python
from bamboo_engine.builder import (
ConditionalParallelGateway,
ConvergeGateway,
EmptyEndEvent,
EmptyStartEvent,
ServiceActivity,
build_tree,
)

from bamboo_engine.validator.api import validate_pipeline_start_node

start = EmptyStartEvent()
act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1")
cpg = ConditionalParallelGateway(
conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0", 2: "${act_1_output} >= 0"},
name="[act_2] or [act_3 and act_4]",
)
act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2")
act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3")
act_4 = ServiceActivity(component_code="pipe_example_component", name="act_4")
cg = ConvergeGateway()
end = EmptyEndEvent()
start.extend(act_1).extend(cpg).connect(act_2, act_3, act_4).to(cpg).converge(cg).extend(end)

pipeline = build_tree(start)
validate_pipeline_start_node(pipeline, act_2.id)
```

2.当开始节点为某个节点时,流程被跳过执行的节点列表:

```python
from bamboo_engine.builder import (
ConditionalParallelGateway,
ConvergeGateway,
EmptyEndEvent,
EmptyStartEvent,
ServiceActivity,
build_tree,
)

from bamboo_engine.validator.api import get_skipped_execute_node_ids

start = EmptyStartEvent()
act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1")
cpg = ConditionalParallelGateway(
conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0", 2: "${act_1_output} >= 0"},
name="[act_2] or [act_3 and act_4]",
)
act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2")
act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3")
act_4 = ServiceActivity(component_code="pipe_example_component", name="act_4")
cg = ConvergeGateway()
end = EmptyEndEvent()
start.extend(act_1).extend(cpg).connect(act_2, act_3, act_4).to(cpg).converge(cg).extend(end)

pipeline = build_tree(start)

# validate = True 将会校验节点合法性
skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, act_2.id, validate=True)

```
23 changes: 22 additions & 1 deletion tests/validator/test_validate_start_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
build_tree,
)
from bamboo_engine.exceptions import StartPositionInvalidException
from bamboo_engine.validator import (
from bamboo_engine.validator.api import (
get_allowed_start_node_ids,
get_skipped_execute_node_ids,
validate_pipeline_start_node,
)
from bamboo_engine.validator.gateway import validate_gateways
Expand All @@ -39,6 +40,10 @@ def test_get_allowed_start_node_ids_by_parallel_gateway():
assert len(allowed_start_node_ids) == 3
assert allowed_start_node_ids == [start.id, act_1.id, pg.id]

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, pg.id)
assert len(skipped_execute_node_ids) == 2
assert set(skipped_execute_node_ids) == {start.id, act_1.id}


def test_get_allowed_start_node_ids_by_exclusive_gateway():
start = EmptyStartEvent()
Expand All @@ -56,6 +61,18 @@ def test_get_allowed_start_node_ids_by_exclusive_gateway():
assert len(allowed_start_node_ids) == 5
assert allowed_start_node_ids == [start.id, act_1.id, eg.id, act_2.id, act_3.id]

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, eg.id)
assert len(skipped_execute_node_ids) == 2
assert set(skipped_execute_node_ids) == {start.id, act_1.id}

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, act_2.id)
assert len(skipped_execute_node_ids) == 3
assert set(skipped_execute_node_ids) == {start.id, act_1.id, eg.id}

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, act_3.id)
assert len(skipped_execute_node_ids) == 3
assert set(skipped_execute_node_ids) == {start.id, act_1.id, eg.id}


def test_get_allowed_start_node_ids_by_condition_parallel_gateway():
start = EmptyStartEvent()
Expand All @@ -78,6 +95,10 @@ def test_get_allowed_start_node_ids_by_condition_parallel_gateway():
assert len(allowed_start_node_ids) == 3
assert allowed_start_node_ids == [start.id, act_1.id, cpg.id]

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, cpg.id)
assert len(skipped_execute_node_ids) == 2
assert set(skipped_execute_node_ids) == {start.id, act_1.id}


def test_get_allowed_start_node_ids_by_normal():
start = EmptyStartEvent()
Expand Down

0 comments on commit b05b2c1

Please sign in to comment.