Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor: 新增api,返回当指定开始节点时,被跳过执行的节点列表 #194

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion bamboo_engine/validator/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,20 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import copy

from bamboo_engine import exceptions
from bamboo_engine.eri import NodeType

from . import rules
from .connection import validate_graph_connection, validate_graph_without_circle
from .gateway import validate_gateways, validate_stream
from .utils import format_pipeline_tree_io_to_list, get_allowed_start_node_ids
from .utils import (
compute_pipeline_main_nodes,
compute_pipeline_skip_executed_map,
format_pipeline_tree_io_to_list,
get_nodes_dict,
)


def validate_pipeline_start_node(pipeline: dict, node_id: str):
Expand All @@ -30,6 +36,47 @@ def validate_pipeline_start_node(pipeline: dict, node_id: str):
raise exceptions.StartPositionInvalidException("this node_id is not allowed as a starting node")


def get_skipped_execute_node_ids(pipeline_tree, start_node_id, validate=True):
if validate and start_node_id not in get_allowed_start_node_ids(pipeline_tree):
raise Exception("the start_node_id is not legal, please check")
start_event_id = pipeline_tree["start_event"]["id"]

# 如果开始节点 = start_node_id, 说明要从开始节点开始执行,此时没有任何节点被跳过
if start_node_id == start_event_id:
return []

node_dict = get_nodes_dict(pipeline_tree)
# 流程的开始位置只允许出现在主干,子流程/并行网关内的节点不允许作为起始位置
will_skipped_nodes = compute_pipeline_skip_executed_map(start_event_id, node_dict, start_node_id)
return list(will_skipped_nodes)


def get_allowed_start_node_ids(pipeline_tree):
# 检查该流程是否已经经过汇聚网关填充
def check_converge_gateway():
gateways = pipeline_tree["gateways"]
if not gateways:
return True
# 经过填充的网关会有converge_gateway_id 字段
for gateway in gateways.values():
if (
gateway["type"] in ["ParallelGateway", "ConditionalParallelGateway"]
and "converge_gateway_id" not in gateway
):
return False

return True

if check_converge_gateway():
pipeline_tree = copy.deepcopy(pipeline_tree)
validate_gateways(pipeline_tree)
start_event_id = pipeline_tree["start_event"]["id"]
node_dict = get_nodes_dict(pipeline_tree)
# 流程的开始位置只允许出现在主干,子流程/并行网关内的节点不允许作为起始位置
allowed_start_node_ids = compute_pipeline_main_nodes(start_event_id, node_dict)
return allowed_start_node_ids


def validate_and_process_pipeline(pipeline: dict, cycle_tolerate=False):
for subproc in [act for act in pipeline["activities"].values() if act["type"] == NodeType.SubProcess.value]:
validate_and_process_pipeline(subproc["pipeline"], cycle_tolerate)
Expand Down
29 changes: 20 additions & 9 deletions bamboo_engine/validator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get_nodes_dict(data):
return nodes


def _compute_pipeline_main_nodes(node_id, node_dict):
def compute_pipeline_main_nodes(node_id, node_dict):
"""
计算流程中的主线节点,遇到并行网关/分支并行网关/子流程,则会跳过
最后计算出来主干分支所允许开始的节点范围
Expand All @@ -110,17 +110,28 @@ def _compute_pipeline_main_nodes(node_id, node_dict):
if node_type in ["EmptyStartEvent", "ServiceActivity", "ExclusiveGateway", "ConvergeGateway", "SubProcess"]:
next_nodes = node_detail.get("target", [])
for next_node_id in next_nodes:
nodes += _compute_pipeline_main_nodes(next_node_id, node_dict)
nodes += compute_pipeline_main_nodes(next_node_id, node_dict)
elif node_type in ["ParallelGateway", "ConditionalParallelGateway"]:
next_node_id = node_detail["converge_gateway_id"]
nodes += _compute_pipeline_main_nodes(next_node_id, node_dict)
nodes += compute_pipeline_main_nodes(next_node_id, node_dict)

return nodes


def get_allowed_start_node_ids(pipeline_tree):
start_event_id = pipeline_tree["start_event"]["id"]
node_dict = get_nodes_dict(pipeline_tree)
# 流程的开始位置只允许出现在主干,子流程/并行网关内的节点不允许作为起始位置
allowed_start_node_ids = _compute_pipeline_main_nodes(start_event_id, node_dict)
return allowed_start_node_ids
def compute_pipeline_skip_executed_map(node_id, node_dict, start_node_id):
nodes = [node_id]
if node_id == start_node_id:
return nodes
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved
node_detail = node_dict[node_id]
next_nodes = node_detail.get("target", [])
if node_detail["type"] in ["ExclusiveGateway"]:
for next_node_id in next_nodes:
node_ids = compute_pipeline_skip_executed_map(next_node_id, node_dict, start_node_id)
# 如果开始的位置在分支网关内,只处理该分支
if start_node_id in node_ids:
nodes += node_ids
else:
for next_node_id in next_nodes:
nodes += compute_pipeline_skip_executed_map(next_node_id, node_dict, start_node_id)

return set(nodes) - {start_node_id}
105 changes: 102 additions & 3 deletions docs/user_guide/start_the_pipeline_at_the_specified_location.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ from bamboo_engine import api
pipeline = {}
# 可以使用root_pipeline_context的方式补充缺失的上下文信息
api.run_pipeline(runtime=BambooDjangoRuntime(),
pipeline=pipeline,
start_node_id="xxxxx",
pipeline=pipeline,
start_node_id="xxxxx",
root_pipeline_context={})
```

Expand All @@ -28,4 +28,103 @@ start_node_id 的指定需要遵循如下规则:

下图红框内的节点表示允许作为起始位置的节点。

![run_pipeline.png](..%2Fassets%2Fimg%2Fstart_the_pipeline_at_the_specified_location%2Frun_pipeline.png)
![run_pipeline.png](..%2Fassets%2Fimg%2Fstart_the_pipeline_at_the_specified_location%2Frun_pipeline.png)

其他工具方法:

1. 获取某个流程所允许的回滚范围

```python

from bamboo_engine.builder import (
ConditionalParallelGateway,
ConvergeGateway,
EmptyEndEvent,
EmptyStartEvent,
ServiceActivity,
build_tree,
)

from bamboo_engine.validator.api import get_allowed_start_node_ids

start = EmptyStartEvent()
act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1")
cpg = ConditionalParallelGateway(
conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0", 2: "${act_1_output} >= 0"},
name="[act_2] or [act_3 and act_4]",
)
act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2")
act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3")
act_4 = ServiceActivity(component_code="pipe_example_component", name="act_4")
cg = ConvergeGateway()
end = EmptyEndEvent()
start.extend(act_1).extend(cpg).connect(act_2, act_3, act_4).to(cpg).converge(cg).extend(end)

pipeline = build_tree(start)
allowed_start_node_ids = get_allowed_start_node_ids(pipeline)
```

2. 检查某个节点是否可作为开始节点:

```python
from bamboo_engine.builder import (
ConditionalParallelGateway,
ConvergeGateway,
EmptyEndEvent,
EmptyStartEvent,
ServiceActivity,
build_tree,
)

from bamboo_engine.validator.api import validate_pipeline_start_node

start = EmptyStartEvent()
act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1")
cpg = ConditionalParallelGateway(
conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0", 2: "${act_1_output} >= 0"},
name="[act_2] or [act_3 and act_4]",
)
act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2")
act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3")
act_4 = ServiceActivity(component_code="pipe_example_component", name="act_4")
cg = ConvergeGateway()
end = EmptyEndEvent()
start.extend(act_1).extend(cpg).connect(act_2, act_3, act_4).to(cpg).converge(cg).extend(end)

pipeline = build_tree(start)
validate_pipeline_start_node(pipeline, act_2.id)
```

2.当开始节点为某个节点时,流程被跳过执行的节点列表:

```python
from bamboo_engine.builder import (
ConditionalParallelGateway,
ConvergeGateway,
EmptyEndEvent,
EmptyStartEvent,
ServiceActivity,
build_tree,
)

from bamboo_engine.validator.api import get_skipped_execute_node_ids

start = EmptyStartEvent()
act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1")
cpg = ConditionalParallelGateway(
conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0", 2: "${act_1_output} >= 0"},
name="[act_2] or [act_3 and act_4]",
)
act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2")
act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3")
act_4 = ServiceActivity(component_code="pipe_example_component", name="act_4")
cg = ConvergeGateway()
end = EmptyEndEvent()
start.extend(act_1).extend(cpg).connect(act_2, act_3, act_4).to(cpg).converge(cg).extend(end)

pipeline = build_tree(start)

# validate = True 将会校验节点合法性
skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, act_2.id, validate=True)

```
23 changes: 22 additions & 1 deletion tests/validator/test_validate_start_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
build_tree,
)
from bamboo_engine.exceptions import StartPositionInvalidException
from bamboo_engine.validator import (
from bamboo_engine.validator.api import (
get_allowed_start_node_ids,
get_skipped_execute_node_ids,
validate_pipeline_start_node,
)
from bamboo_engine.validator.gateway import validate_gateways
Expand All @@ -39,6 +40,10 @@ def test_get_allowed_start_node_ids_by_parallel_gateway():
assert len(allowed_start_node_ids) == 3
assert allowed_start_node_ids == [start.id, act_1.id, pg.id]

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, pg.id)
assert len(skipped_execute_node_ids) == 2
assert set(skipped_execute_node_ids) == {start.id, act_1.id}


def test_get_allowed_start_node_ids_by_exclusive_gateway():
start = EmptyStartEvent()
Expand All @@ -56,6 +61,18 @@ def test_get_allowed_start_node_ids_by_exclusive_gateway():
assert len(allowed_start_node_ids) == 5
assert allowed_start_node_ids == [start.id, act_1.id, eg.id, act_2.id, act_3.id]

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, eg.id)
assert len(skipped_execute_node_ids) == 2
assert set(skipped_execute_node_ids) == {start.id, act_1.id}

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, act_2.id)
assert len(skipped_execute_node_ids) == 3
assert set(skipped_execute_node_ids) == {start.id, act_1.id, eg.id}

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, act_3.id)
assert len(skipped_execute_node_ids) == 3
assert set(skipped_execute_node_ids) == {start.id, act_1.id, eg.id}


def test_get_allowed_start_node_ids_by_condition_parallel_gateway():
start = EmptyStartEvent()
Expand All @@ -78,6 +95,10 @@ def test_get_allowed_start_node_ids_by_condition_parallel_gateway():
assert len(allowed_start_node_ids) == 3
assert allowed_start_node_ids == [start.id, act_1.id, cpg.id]

skipped_execute_node_ids = get_skipped_execute_node_ids(pipeline, cpg.id)
assert len(skipped_execute_node_ids) == 2
assert set(skipped_execute_node_ids) == {start.id, act_1.id}


def test_get_allowed_start_node_ids_by_normal():
start = EmptyStartEvent()
Expand Down
Loading