# Gateway types that fan out into several branches.
_BRANCH_GATEWAY_TYPES = ("ParallelGateway", "ExclusiveGateway", "ConditionalParallelGateway")
# Parallel-style gateways: every branch receives its own token.
_PARALLEL_GATEWAY_TYPES = ("ParallelGateway", "ConditionalParallelGateway")
# Node types that store a single outgoing flow as a bare string instead of a list.
_SINGLE_OUTGOING_TYPES = ("EmptyStartEvent", "ServiceActivity", "ConvergeGateway")


def _get_next_node(node, pipeline_tree):
    """
    Return the nodes directly reachable from ``node`` through its outgoing flows.

    :param node: node dict of the pipeline tree; ``node["outgoing"]`` is a flow id or a list of flow ids
    :param pipeline_tree: tree dict providing ``flows``, ``activities``, ``gateways`` and ``end_event``
    :return: list of successor node dicts in the order of the outgoing flows; targets that are neither
             an activity, a gateway nor the end event are skipped
    """
    out_goings = node["outgoing"]

    # A node with a single exit stores the flow id as a bare value. An empty value means the node
    # has no outgoing flow left (``_delete_flow_id_from_node_io`` writes "" in that case), so it
    # must not be looked up in ``flows``.
    if not isinstance(out_goings, list):
        out_goings = [out_goings] if out_goings else []

    activities = pipeline_tree["activities"]
    gateways = pipeline_tree["gateways"]
    end_event = pipeline_tree["end_event"]

    next_nodes = []
    for out_going in out_goings:
        target_id = pipeline_tree["flows"][out_going]["target"]
        if target_id in activities:
            next_nodes.append(activities[target_id])
        elif target_id in gateways:
            next_nodes.append(gateways[target_id])
        elif target_id == end_event["id"]:
            next_nodes.append(end_event)

    return next_nodes


def _get_all_nodes(pipeline_tree: dict, with_subprocess: bool = False) -> dict:
    """
    Collect every node of ``pipeline_tree`` into one ``node id -> node`` mapping.

    The mapping contains the activities, the gateways, the start event and the end event.

    :param pipeline_tree: pipeline web tree
    :param with_subprocess: when True, the nodes of every ``SubProcess`` activity's nested
                            ``pipeline`` are collected recursively as well
    :return: dict mapping node id to node dict
    """
    start_event = pipeline_tree["start_event"]
    end_event = pipeline_tree["end_event"]

    all_nodes = {}
    all_nodes.update(pipeline_tree["activities"])
    all_nodes.update(pipeline_tree["gateways"])
    all_nodes[start_event["id"]] = start_event
    all_nodes[end_event["id"]] = end_event

    if with_subprocess:
        for act in pipeline_tree["activities"].values():
            if act["type"] == "SubProcess":
                all_nodes.update(_get_all_nodes(act["pipeline"], with_subprocess=True))
    return all_nodes


def _delete_flow_id_from_node_io(node, flow_id, io_type):
    """
    Remove ``flow_id`` from ``node[io_type]`` in place.

    :param node: node dict to modify
    :param flow_id: id of the flow to detach
    :param io_type: ``"incoming"`` or ``"outgoing"``
    :raises ValueError: if ``node[io_type]`` is a list that does not contain ``flow_id``
    """
    current = node[io_type]

    if current == flow_id:
        node[io_type] = ""
        return

    if not isinstance(current, list):
        return

    if len(current) == 1 and current[0] == flow_id:
        # Branch gateways keep the list form even when empty; every other node type uses "".
        node[io_type] = [] if node["type"] in _BRANCH_GATEWAY_TYPES else ""
    else:
        current.remove(flow_id)

    # Recover the original format: these node types store a single outgoing flow as a bare id.
    if len(node[io_type]) == 1 and io_type == "outgoing" and node["type"] in _SINGLE_OUTGOING_TYPES:
        node[io_type] = node[io_type][0]


def _acyclic(pipeline):
    """
    Make ``pipeline`` acyclic in place by reversing the flows reported as closing a cycle.

    Adds an ``all_nodes`` entry to ``pipeline`` (see ``_get_all_nodes``) and keeps reversing one
    flow per iteration until ``validate_graph_without_circle`` reports success. The reversed flow
    is also detached from the ``outgoing`` of its old source and the ``incoming`` of its old target.

    :param pipeline: pipeline tree dict, modified in place
    """
    pipeline["all_nodes"] = _get_all_nodes(pipeline, with_subprocess=True)

    # "<source>.<target>" -> flow id, so a flow can be found from the two nodes it connects.
    deformed_flows = {
        "{}.{}".format(flow["source"], flow["target"]): flow_id for flow_id, flow in pipeline["flows"].items()
    }
    while True:
        no_circle = validate_graph_without_circle(pipeline)
        if no_circle["result"]:
            break

        # The last two entries of ``error_data`` are used as source and target of the flow to reverse.
        source = no_circle["error_data"][-2]
        target = no_circle["error_data"][-1]
        flow_id = deformed_flows["{}.{}".format(source, target)]
        pipeline["flows"][flow_id].update({"source": target, "target": source})

        _delete_flow_id_from_node_io(pipeline["all_nodes"][source], flow_id, "outgoing")
        _delete_flow_id_from_node_io(pipeline["all_nodes"][target], flow_id, "incoming")


def generate_pipeline_token(pipeline_tree):
    """
    Compute a token for the nodes of ``pipeline_tree``.

    The tree is deep-copied and made acyclic first, so the caller's tree is left untouched.

    :param pipeline_tree: pipeline tree dict
    :return: dict mapping node id to token
    """
    tree = copy.deepcopy(pipeline_tree)
    # Remove cycles so the recursive walk below terminates.
    _acyclic(tree)

    start_node = tree["start_event"]
    token = unique_id("t")
    node_token_map = {start_node["id"]: token}
    inject_pipeline_token(start_node, tree, node_token_map, token)
    return node_token_map


def _inject_into_successor(node, pipeline_tree, node_token_map, token):
    """Give ``token`` to the first successor of ``node`` and continue the walk from there."""
    next_nodes = _get_next_node(node, pipeline_tree)
    if not next_nodes:
        # Nothing left to walk, e.g. the only outgoing flow was removed while breaking a cycle.
        return None
    next_node = next_nodes[0]
    node_token_map[next_node["id"]] = token
    return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)


def inject_pipeline_token(node, pipeline_tree, node_token_map, token):
    """
    Walk the tree from ``node`` and record a token for every node reached in ``node_token_map``.

    Rules implemented here:

    - plain nodes (``ServiceActivity``, ``EmptyStartEvent``) pass their token to their successor;
    - the branches of an ``ExclusiveGateway`` share one new token, each branch of a
      ``ParallelGateway`` / ``ConditionalParallelGateway`` gets its own new token;
    - the node returned by the last walked branch (a converge gateway or the end event) gets the
      token of the branching gateway, and for a converge gateway the walk continues behind it
      with that token;
    - a ``SubProcess`` gets a fresh token for its nested pipeline, then the walk continues in the
      outer pipeline with the current token.

    :param node: node dict the walk starts from
    :param pipeline_tree: tree dict that ``node`` belongs to
    :param node_token_map: dict of node id -> token, filled in place
    :param token: token of ``node``
    :return: the ``ConvergeGateway`` or ``EmptyEndEvent`` node at which this walk stopped, or None
    """
    node_type = node["type"]

    if node_type in _BRANCH_GATEWAY_TYPES:
        is_parallel = node_type in _PARALLEL_GATEWAY_TYPES
        branch_token = unique_id("t")
        target_node = None
        for next_node in _get_next_node(node, pipeline_tree):
            if is_parallel:
                branch_token = unique_id("t")
            node_token_map[next_node["id"]] = branch_token

            # Follow the branch inwards; it returns once it meets a converge gateway or the end event.
            target_node = inject_pipeline_token(next_node, pipeline_tree, node_token_map, branch_token)

        if target_node is None:
            return None

        # The node closing the branches carries the token of the gateway that opened them.
        node_token_map[target_node["id"]] = token

        # A branch may run straight into the end event, in which case there is no converge gateway.
        if target_node["type"] == "EmptyEndEvent":
            return None

        # A converge gateway has a single exit; keep walking behind it.
        return _inject_into_successor(target_node, pipeline_tree, node_token_map, token)

    # Converge gateways and the end event stop the walk and are handed back to the caller.
    if node_type in ("ConvergeGateway", "EmptyEndEvent"):
        return node

    if node_type in ("ServiceActivity", "EmptyStartEvent"):
        return _inject_into_successor(node, pipeline_tree, node_token_map, token)

    if node_type == "SubProcess":
        subprocess_pipeline_tree = node["pipeline"]
        subprocess_start_node = subprocess_pipeline_tree["start_event"]
        subprocess_start_node_token = unique_id("t")
        node_token_map[subprocess_start_node["id"]] = subprocess_start_node_token
        inject_pipeline_token(
            subprocess_start_node, subprocess_pipeline_tree, node_token_map, subprocess_start_node_token
        )
        return _inject_into_successor(node, pipeline_tree, node_token_map, token)

    return None
__node_type[elem.type()] node = tree[node_type] if node_type == "end_event" else tree[node_type][elem.id] diff --git a/bamboo_engine/config.py b/bamboo_engine/config.py index bebf2d21..f4a02cc5 100644 --- a/bamboo_engine/config.py +++ b/bamboo_engine/config.py @@ -70,3 +70,5 @@ class Settings: PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = default_expr_func PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = ExclusiveGatewayStrategy.ONLY.value + + PIPELINE_ENABLE_ROLLBACK = False diff --git a/bamboo_engine/engine.py b/bamboo_engine/engine.py index 6f929705..c7d5ed7b 100644 --- a/bamboo_engine/engine.py +++ b/bamboo_engine/engine.py @@ -57,6 +57,7 @@ setup_gauge, setup_histogram, ) +from .utils.constants import RuntimeSettings from .utils.host import get_hostname from .utils.string import get_lower_case_name @@ -122,6 +123,7 @@ def run_pipeline( process_id = self.runtime.prepare_run_pipeline( pipeline, root_pipeline_data, root_pipeline_context, subprocess_context, **options ) + # execute from start event self.runtime.execute( process_id=process_id, @@ -912,7 +914,8 @@ def execute( # 设置状态前检测 if node_state.name not in states.INVERTED_TRANSITION[states.RUNNING]: logger.info( - "[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING for exist state", # noqa + "[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING " + "for exist state", process_info.root_pipeline_id, node_state.name, ) @@ -1010,6 +1013,15 @@ def execute( hook=HookType.NODE_FINISH, node=node, ) + if node.type == NodeType.ServiceActivity and self.runtime.get_config( + RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value + ): + self._set_snapshot(root_pipeline_id, node) + # 判断是否已经预约了回滚,如果已经预约,则kill掉当前的process,直接return + if node.reserve_rollback: + self.runtime.die(process_id) + self.runtime.start_rollback(root_pipeline_id, node_id) + return # 进程是否要进入睡眠 if execute_result.should_sleep: @@ -1177,7 +1189,9 @@ def schedule( # only retry at multiple calback type if schedule.type is not 
ScheduleType.MULTIPLE_CALLBACK: logger.info( - "root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, will not retry to get lock", # noqa + "root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, " + "will not retry to get lock", + # noqa root_pipeline_id, schedule_id, node_id, @@ -1290,6 +1304,15 @@ def schedule( node=node, callback_data=callback_data, ) + if node.type == NodeType.ServiceActivity and self.runtime.get_config( + RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value + ): + self._set_snapshot(root_pipeline_id, node) + # 判断是否已经预约了回滚,如果已经预约,启动回滚流程 + if node.reserve_rollback: + self.runtime.start_rollback(root_pipeline_id, node_id) + return + self.runtime.execute( process_id=process_id, node_id=schedule_result.next_node_id, @@ -1302,6 +1325,20 @@ def schedule( time.time() - engine_post_schedule_start_at ) + def _set_snapshot(self, root_pipeline_id, node): + inputs = self.runtime.get_execution_data_inputs(node.id) + outputs = self.runtime.get_execution_data_outputs(node.id) + root_pipeline_input = {key: di.value for key, di in self.runtime.get_data_inputs(root_pipeline_id).items()} + self.runtime.set_node_snapshot( + root_pipeline_id=root_pipeline_id, + node_id=node.id, + code=node.code, + version=node.version, + context_values=root_pipeline_input, + inputs=inputs, + outputs=outputs, + ) + def _add_history( self, node_id: str, diff --git a/bamboo_engine/eri/interfaces.py b/bamboo_engine/eri/interfaces.py index 99c722db..8ac6fb23 100644 --- a/bamboo_engine/eri/interfaces.py +++ b/bamboo_engine/eri/interfaces.py @@ -1577,6 +1577,35 @@ def get_config(self, name): """ +class RollbackMixin: + @abstractmethod + def set_pipeline_token(self, pipeline_tree: dict): + """ + 设置pipeline token + """ + + @abstractmethod + def set_node_snapshot( + self, + root_pipeline_id: str, + node_id: str, + code: str, + version: str, + context_values: dict, + inputs: dict, + outputs: dict, + ): + """ + 创建一份节点快照 + """ + + @abstractmethod 
+ def start_rollback(self, root_pipeline_id: str, node_id: str): + """ + 开始回滚 + """ + + class EngineRuntimeInterface( PluginManagerMixin, EngineAPIHooksMixin, @@ -1591,6 +1620,7 @@ class EngineRuntimeInterface( ExecutionHistoryMixin, InterruptMixin, ConfigMixin, + RollbackMixin, metaclass=ABCMeta, ): @abstractmethod diff --git a/bamboo_engine/eri/models/node.py b/bamboo_engine/eri/models/node.py index 05c7ce21..ba34862a 100644 --- a/bamboo_engine/eri/models/node.py +++ b/bamboo_engine/eri/models/node.py @@ -12,7 +12,7 @@ """ from enum import Enum -from typing import List, Dict +from typing import Dict, List from bamboo_engine.utils.object import Representable @@ -49,7 +49,8 @@ def __init__( parent_pipeline_id: str, can_skip: bool = True, can_retry: bool = True, - name: str = None + name: str = None, + reserve_rollback: bool = False, ): """ @@ -82,6 +83,7 @@ def __init__( self.can_skip = can_skip self.can_retry = can_retry self.name = name + self.reserve_rollback = reserve_rollback class EmptyStartEvent(Node): diff --git a/bamboo_engine/states.py b/bamboo_engine/states.py index 6a4a460d..319960b1 100644 --- a/bamboo_engine/states.py +++ b/bamboo_engine/states.py @@ -29,6 +29,9 @@ class StateType(Enum): FINISHED = "FINISHED" FAILED = "FAILED" REVOKED = "REVOKED" + ROLLING_BACK = "ROLLING_BACK" + ROLL_BACK_SUCCESS = "ROLL_BACK_SUCCESS" + ROLL_BACK_FAILED = "ROLL_BACK_FAILED" CREATED = StateType.CREATED.value @@ -39,8 +42,11 @@ class StateType(Enum): FINISHED = StateType.FINISHED.value FAILED = StateType.FAILED.value REVOKED = StateType.REVOKED.value +ROLLING_BACK = StateType.ROLLING_BACK.value +ROLL_BACK_SUCCESS = StateType.ROLL_BACK_SUCCESS.value +ROLL_BACK_FAILED = StateType.ROLL_BACK_FAILED.value -ALL_STATES = frozenset([READY, RUNNING, SUSPENDED, BLOCKED, FINISHED, FAILED, REVOKED]) +ALL_STATES = frozenset([READY, RUNNING, SUSPENDED, BLOCKED, FINISHED, FAILED, REVOKED, ROLLING_BACK]) ARCHIVED_STATES = frozenset([FINISHED, FAILED, REVOKED]) SLEEP_STATES = 
frozenset([SUSPENDED, REVOKED]) @@ -51,18 +57,20 @@ class StateType(Enum): TRANSITION = ConstantDict( { READY: frozenset([RUNNING, SUSPENDED]), - RUNNING: frozenset([FINISHED, FAILED, REVOKED, SUSPENDED]), - SUSPENDED: frozenset([READY, REVOKED, RUNNING]), + RUNNING: frozenset([FINISHED, FAILED, REVOKED, SUSPENDED, ROLLING_BACK]), + SUSPENDED: frozenset([READY, REVOKED, RUNNING, ROLLING_BACK]), BLOCKED: frozenset([]), - FINISHED: frozenset([RUNNING, FAILED]), + FINISHED: frozenset([RUNNING, FAILED, ROLLING_BACK]), FAILED: frozenset([READY, FINISHED]), REVOKED: frozenset([]), + ROLLING_BACK: frozenset([ROLL_BACK_SUCCESS, ROLL_BACK_FAILED]), + ROLL_BACK_SUCCESS: frozenset([READY, FINISHED]), + ROLL_BACK_FAILED: frozenset([READY, FINISHED, ROLLING_BACK]), } ) def can_transit(from_state, to_state): - if from_state in TRANSITION: if to_state in TRANSITION[from_state]: return True diff --git a/bamboo_engine/utils/constants.py b/bamboo_engine/utils/constants.py index 1eb0f7df..5f458433 100644 --- a/bamboo_engine/utils/constants.py +++ b/bamboo_engine/utils/constants.py @@ -35,9 +35,11 @@ class ExclusiveGatewayStrategy(Enum): class RuntimeSettings(Enum): PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = "PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC" PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = "PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY" + PIPELINE_ENABLE_ROLLBACK = "PIPELINE_ENABLE_ROLLBACK" RUNTIME_ALLOWED_CONFIG = [ RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC.value, RuntimeSettings.PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY.value, + RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value, ] diff --git a/bamboo_engine/utils/graph.py b/bamboo_engine/utils/graph.py index c3232c25..df162a6f 100644 --- a/bamboo_engine/utils/graph.py +++ b/bamboo_engine/utils/graph.py @@ -60,6 +60,55 @@ def get_cycle(self): return [] +class RollbackGraph(Graph): + def __init__(self, nodes=None, flows=None): + self.nodes = nodes or [] + self.flows = flows or [] + super().__init__(self.nodes, self.flows) + self.edges = 
self.build_edges() + self.path = [] + self.last_visited_node = "" + self.graph = {node: [] for node in self.nodes} + for flow in self.flows: + self.graph[flow[0]].append(flow[1]) + + def build_edges(self): + edges = {} + for flow in self.flows: + edges.setdefault(flow[0], set()).add(flow[1]) + return edges + + def add_node(self, node): + if node not in self.nodes: + self.nodes.append(node) + + def add_edge(self, source, target): + self.flows.append([source, target]) + self.edges.setdefault(source, set()).add(target) + + def next(self, node): + return self.edges.get(node, {}) + + def reverse(self): + graph = RollbackGraph() + graph.nodes = self.nodes + for flow in self.flows: + graph.add_edge(flow[1], flow[0]) + + return graph + + def in_degrees(self): + ingress = {node: 0 for node in self.nodes} + for node, targets in self.edges.items(): + for target in targets: + ingress[target] += 1 + + return ingress + + def as_dict(self): + return {"nodes": self.nodes, "flows": self.flows} + + if __name__ == "__main__": graph1 = Graph([1, 2, 3, 4], [[1, 2], [2, 3], [3, 4]]) assert not graph1.has_cycle() diff --git a/docs/assets/img/rollback/rollback.png b/docs/assets/img/rollback/rollback.png index 2029187e..537ef21b 100644 Binary files a/docs/assets/img/rollback/rollback.png and b/docs/assets/img/rollback/rollback.png differ diff --git a/docs/user_guide/rollback.md b/docs/user_guide/rollback.md index a4e066e5..2c08268b 100644 --- a/docs/user_guide/rollback.md +++ b/docs/user_guide/rollback.md @@ -1,39 +1,116 @@ ### 功能介绍: -节点回退允许流程回退到某个特定的节点重新开始。**该回退会删除目标节点之后所有已执行过的节点信息和数据,让流程表现的是第一次执行的样子。** -需要注意的流程的回退并不是无限制的,需要满足以下条件的节点才允许回退。 -- 只能回退运行中的流程 -- 子流程暂时不支持回退 -- 目标节点的状态必须为已完成状态 -- 并行网关内的节点不允许回退。并行网关包含条件并行网关和并行网关 -- 网关节点不允许回退 -- 条件网关只允许条件网关内已经运行的节点允许回退,未执行到的分支不允许回退。 +## 回滚配置 -节点在回退前会强制失败掉当前运行的节点,只有流程中没有正在运行的节点时才会开始执行回退逻辑。 -节点回退的过程无法中断,因为中断导致的回退失败可能会导致无法通过再次回退重试成功 +### 开启配置项 -针对如下图的流程,蓝色框所标注的节点是允许回退的节点。 +在开始体验回滚之前,我们需要在django_settings 中开启以下的配置: 
-![rollback.png](..%2Fassets%2Fimg%2Frollback%2Frollback.png) +```python +PIPELINE_ENABLE_ROLLBACK = True +ROLLBACK_QUEUE = "default_test" +PIPELINE_ENABLE_AUTO_EXECUTE_WHEN_ROLL_BACKED = False +``` -### 使用事例: +其中: +- PIPELINE_ENABLE_ROLLBACK 表示开启回滚 +- ROLLBACK_QUEUE: 表示回滚所使用的队列 +- PIPELINE_ENABLE_AUTO_EXECUTE_WHEN_ROLL_BACKED: 是否开启回滚后自动开始,开启时,回滚到目标节点将会自动开始,未开启时流程回到目标节点将会暂停。 -查询可以回退的节点列表: +### Install App +之后需要在 `INSTALLED_APPS` 增加配置: ```python -from pipeline.contrib.rollback import api +INSTALLED_APPS += ( + "pipeline.contrib.rollback", +) +``` -# 获取该pipeline允许回滚的节点列表 -result = api.get_allowed_rollback_node_id_list(pipeline_id) -node_ids = result.data +### 执行 migrate 操作 +```bash +python manage.py migrate rollback ``` -节点的回退使用非常简单,只需要指定pipeline_id和node_id即可,如下所示: -```python +之后回滚的一切便已经就绪了。 + +## 回滚的边界条件 + +### Token 模式: + + +现阶段的回滚的行为受到严格限制,在TOKEN 模式下,回滚将不能随意的指向某个节点。流程回滚时将沿着原路径依次回滚(如果存在子流程,则先回滚子流程,再继续主流程的回滚)。 +在流程回滚时, 节点的状态机如下: + +![rollback.png](..%2Fassets%2Fimg%2Frollback%2Frollback.png) + +以下是回滚的各项边界条件: + +#### 任务状态 + +只有处于 `RUNNING` 和 `ROLL_BACK_FAILED` 状态的任务才允许回滚。当任务处于结束,暂停时,将不允许回滚。 + +#### 任务节点 + +在 token 模式下,**回滚的起始和目标节点的token须保持一致**。同时不允许同 token下 **存在正在运行的节点**。 + +##### 回滚的起点 + +- 回滚开始的节点的状态只支持`FINISHED` 或 `FAILED`, 正在运行的节点将不允许回滚。 +- 回滚开始的节点必须是流程当前正在运行的节点之一,也就是流程的末端节点。 + +##### 回滚的目标节点 + +- 回滚的目标节点只支持`FINISHED` 状态,回滚的目标节点只支持任务类型的节点,网关节点不支持回滚。 + +#### 回滚预约 +- 回滚预约只能预约为RUNNING状态的节点,当该节点结束时,将自动开始回滚。 +- **一个流程同时只能有一个预约回滚的任务** + + +### ANY 模式: + +当为ANY 模式的回滚时,流程可以从任何地方开始,回滚到之前的任意节点上去,此时流程**将不会按照路径调用回滚(不会调用节点的rollback方法)**,而是直接回到目标节点,并删除回滚路径上已经执行过的节点信息,从目标位置开始。 + +#### 任务状态 + +只有处于 `RUNNING` 的任务才允许回滚。当任务处于结束,暂停时,将不允许回滚。 + +#### 任务节点 + +在 any 模式下,回滚的边界条件将少得多,由于 any 状态下的回滚将直接回到目标节点并开始,类似于任务的任意节点跳转。 + +在 any 模式下,回滚开始前**不允许当前流程存在处于 running 状态的节点。** + +##### 回滚的起点 + +- 回滚开始的节点必须是流程当前正在运行的节点,也就是流程的末端节点。 +- 回滚开始的节点的状态只支持`FINISHED` 或 `FAILED`, 正在运行的节点将不再允许回滚。 + +##### 回滚的目标节点 + +- 回滚的目标节点只支持`FINISHED` 状态,回滚的目标节点只支持任务节点类型。 + +#### 回滚预约 +-
回滚预约只能预约为running状态的节点,当该节点结束时,将自动开始回滚。 +- **一个流程同时只能有一个预约回滚的任务** + +#### 回滚的使用: + +``` python from pipeline.contrib.rollback import api -result = api.rollback(pipeline_id, node_id) -if result.result: - pass -``` \ No newline at end of file +# 节点回滚,其中mode 有 TOKEN 和 ANY 两种模式可选 +api.rollback(root_pipeline_id, start_node_id, target_node_id, mode="TOKEN") + +# 回滚预约 +api.reserve_rollback(root_pipeline_id, start_node_id, target_node_id, mode="TOKEN") + +# 取消回滚预约 +api.cancel_reserved_rollback(root_pipeline_id, start_node_id, target_node_id, mode="TOKEN") + + +# 获取本次回滚支持的范围 +api.get_allowed_rollback_node_id_list(root_pipeline_id, start_node_id, mode="TOKEN") + +``` diff --git a/runtime/bamboo-pipeline/pipeline/conf/default_settings.py b/runtime/bamboo-pipeline/pipeline/conf/default_settings.py index 437cfe9d..5f52c1a6 100644 --- a/runtime/bamboo-pipeline/pipeline/conf/default_settings.py +++ b/runtime/bamboo-pipeline/pipeline/conf/default_settings.py @@ -104,3 +104,5 @@ # 是否开启PIPELINE HOOKS 事件通知 ENABLE_PIPELINE_EVENT_SIGNALS = getattr(settings, "ENABLE_PIPELINE_EVENT_SIGNALS", False) + +ROLLBACK_QUEUE = getattr(settings, "ROLLBACK_QUEUE", "rollback") diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py index d123c1a7..f8deac76 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py @@ -10,23 +10,40 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
@ensure_return_pipeline_contrib_api_result
def rollback(
    root_pipeline_id: str, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None, mode: str = TOKEN
):
    """
    Roll a pipeline back from ``start_node_id`` to ``target_node_id``.

    :param root_pipeline_id: id of the root pipeline
    :param start_node_id: id of the node the rollback starts from
    :param target_node_id: id of the node the rollback ends at
    :param skip_rollback_nodes: accepted by the signature but not forwarded to the dispatcher here
    :param mode: rollback mode handed to ``RollbackDispatcher``; defaults to ``TOKEN``
    :return: this function itself returns nothing
             NOTE(review): the decorator presumably wraps the outcome in the contrib API result - confirm
    """
    RollbackDispatcher(root_pipeline_id, mode).rollback(start_node_id, target_node_id)


@ensure_return_pipeline_contrib_api_result
def reserve_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN):
    """
    Reserve a rollback from ``start_node_id`` to ``target_node_id``.

    :param root_pipeline_id: id of the root pipeline
    :param start_node_id: id of the node the reserved rollback starts from
    :param target_node_id: id of the node the reserved rollback ends at
    :param mode: rollback mode handed to ``RollbackDispatcher``; defaults to ``TOKEN``
    """
    RollbackDispatcher(root_pipeline_id, mode).reserve_rollback(start_node_id, target_node_id)


@ensure_return_pipeline_contrib_api_result
def cancel_reserved_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN):
    """
    Cancel a rollback previously reserved for ``start_node_id`` -> ``target_node_id``.

    :param root_pipeline_id: id of the root pipeline
    :param start_node_id: start node id of the reserved rollback
    :param target_node_id: target node id of the reserved rollback
    :param mode: rollback mode handed to ``RollbackDispatcher``; defaults to ``TOKEN``
    """
    RollbackDispatcher(root_pipeline_id, mode).cancel_reserved_rollback(start_node_id, target_node_id)


@ensure_return_pipeline_contrib_api_result
def retry_rollback_failed_node(root_pipeline_id: str, node_id: str, retry_data: dict = None, mode: str = TOKEN):
    """
    Retry a node whose rollback failed.

    :param root_pipeline_id: id of the root pipeline
    :param node_id: id of the node to retry
    :param retry_data: optional data forwarded unchanged to the dispatcher
    :param mode: rollback mode handed to ``RollbackDispatcher``; defaults to ``TOKEN``
    """
    RollbackDispatcher(root_pipeline_id, mode).retry_rollback_failed_node(node_id, retry_data)


@ensure_return_pipeline_contrib_api_result
def get_allowed_rollback_node_id_list(root_pipeline_id: str, start_node_id: str, mode: str = TOKEN):
    """
    Return the node ids a rollback starting at ``start_node_id`` may target.

    :param root_pipeline_id: id of the root pipeline
    :param start_node_id: id of the node the rollback would start from
    :param mode: rollback mode handed to ``RollbackDispatcher``; defaults to ``TOKEN``
    :return: whatever ``RollbackDispatcher.get_allowed_rollback_node_id_list`` returns
    """
    return RollbackDispatcher(root_pipeline_id, mode).get_allowed_rollback_node_id_list(start_node_id)
class CycleHandler:
    """
    Remove the cycles of a node map so that it can be traversed as an acyclic graph.

    The handler works on a deep copy, so the node map passed in is never modified.
    """

    def __init__(self, node_map):
        """
        :param node_map: dict of node id -> node detail; every detail has a ``targets`` dict whose
                         values are the ids of the successor nodes
        """
        self.node_map = copy.deepcopy(node_map)

    def get_nodes_and_edges(self):
        """
        Derive the node list and the edge list from ``node_map``.

        :return: tuple ``(nodes, edges)``; every edge is a ``[source_id, target_id]`` list
        """
        nodes = list(self.node_map)
        edges = []
        for node, value in self.node_map.items():
            # Targets missing from node_map are dropped (original note: branches that were not executed).
            edges.extend([node, target] for target in value["targets"].values() if target in self.node_map)
        return nodes, edges

    def has_cycle(self, nodes, edges) -> list:
        """
        Look for a cycle in the graph described by ``nodes`` and ``edges``.

        :return: the result of ``RollbackGraph.get_cycle``; ``remove_cycle`` treats a falsy result
                 as "no cycle" and otherwise reads its last two entries as an edge
        """
        return RollbackGraph(nodes, edges).get_cycle()

    def delete_edge(self, source, target):
        """
        Delete every ``source -> target`` entry from the targets of ``source``.

        :param source: id of the node the edge starts at
        :param target: id of the node the edge points to
        """
        targets = self.node_map[source]["targets"]
        # Collect the keys first: a dict must not change size while it is being iterated.
        for key in [key for key, val in targets.items() if val == target]:
            del targets[key]

    def remove_cycle(self):
        """
        Delete one edge per detected cycle until no cycle is left.

        :return: the cycle-free node map (the handler's own deep copy)
        """
        while True:
            nodes, edges = self.get_nodes_and_edges()
            cycles = self.has_cycle(nodes, edges)
            if not cycles:
                break
            # The last two entries of the reported path are deleted as the edge source -> target.
            self.delete_edge(cycles[-2], cycles[-1])
        return self.node_map


class RollbackGraphHandler:
    """Build the graph that describes the order in which nodes are rolled back."""

    def __init__(self, node_map, start_id, target_id):
        """
        :param node_map: dict of node id -> node detail
        :param start_id: id of the node the rollback starts from
        :param target_id: id of the node the rollback ends at
        """
        self.graph = RollbackGraph()
        self.start_id = start_id
        self.target_id = target_id
        self.graph.add_node(start_id)
        self.graph.add_node(target_id)
        # Work on a cycle-free copy of the node map.
        self.node_map = CycleHandler(node_map).remove_cycle()
        # Nodes that are not ServiceActivity (e.g. gateways, subprocesses): they are not part of
        # the rollback graph but are reported separately to the caller.
        self.others_nodes = []

    def build(self, node_id, source_id=None):
        """
        Recursively add ``source_id -> node_id`` edges, walking the targets of each node.

        :param node_id: id of the node visited in this call
        :param source_id: id of the last ServiceActivity visited before this node
        :return: the source id to use for whatever follows this node, or None when the walk stopped
                 (unknown node, or the rollback start node was reached)
        """
        node_detail = self.node_map.get(node_id)
        if node_detail is None:
            return None
        node_type = node_detail["type"]

        if node_type == PE.ServiceActivity:
            next_node_id = node_detail.get("id")
            self.graph.add_node(next_node_id)
            if source_id and source_id != next_node_id:
                self.graph.add_edge(source_id, next_node_id)

            # The walk ends at the node the rollback starts from.
            if node_id == self.start_id:
                return None
            source_id = next_node_id
            targets = node_detail.get("targets", {}).values()
        else:
            self.others_nodes.append(node_id)
            if node_type == PE.SubProcess:
                # Walk the nested pipeline first; its result becomes the source for what follows.
                source_id = self.build(node_detail["start_event_id"], source_id)
                targets = node_detail.get("targets", {}).values()
            elif node_type == PE.ExclusiveGateway:
                # Only follow the branches that are present in node_map.
                targets = [target for target in node_detail.get("targets", {}).values() if target in self.node_map]
            else:
                targets = node_detail.get("targets", {}).values()

        # Every target must start from the same source, so keep it aside while source_id is overwritten.
        temporary_source_id = source_id
        for target in targets:
            source_id = self.build(target, temporary_source_id)

        return source_id

    def build_rollback_graph(self):
        """
        Build the graph from the rollback target towards the rollback start, then reverse it.

        ``END_FLAG`` is linked in front of the target node and ``START_FLAG`` behind the start node
        before the walk, so after the reversal the graph runs from ``START_FLAG`` to ``END_FLAG``.

        :return: tuple ``(reversed graph, others_nodes)``
        """
        self.graph.add_node(constants.END_FLAG)
        self.graph.add_edge(constants.END_FLAG, self.target_id)
        self.graph.add_node(constants.START_FLAG)
        self.graph.add_edge(self.start_id, constants.START_FLAG)
        self.build(self.target_id, self.target_id)

        return self.graph.reverse(), self.others_nodes
class RollbackValidator:
    """Checks that must pass before a rollback is started or reserved; each raises ``RollBackException``."""

    @staticmethod
    def _load_tokens(root_pipeline_id):
        """
        Load the ``node id -> token`` mapping stored for a pipeline.

        :raises RollBackException: if no ``RollbackToken`` row exists for the pipeline
        """
        try:
            rollback_token = RollbackToken.objects.get(root_pipeline_id=root_pipeline_id)
        except RollbackToken.DoesNotExist:
            raise RollBackException(
                "rollback failed: pipeline token not exist, pipeline_id={}".format(root_pipeline_id)
            )
        return json.loads(rollback_token.token)

    @staticmethod
    def _node_ids_with_token(tokens, token):
        """Return the ids of all nodes in ``tokens`` (node id -> token) that carry ``token``."""
        return [node_id for node_id, node_token in tokens.items() if node_token == token]

    @staticmethod
    def validate_pipeline(root_pipeline_id):
        """
        Ensure the pipeline has a state and that this state allows a rollback.

        :raises RollBackException: if the state is missing or is neither RUNNING nor ROLL_BACK_FAILED
        """
        pipeline_state = State.objects.filter(node_id=root_pipeline_id).first()
        if not pipeline_state:
            raise RollBackException(
                "rollback failed: pipeline state not exist, pipeline_id={}".format(root_pipeline_id)
            )

        if pipeline_state.name not in [states.RUNNING, states.ROLL_BACK_FAILED]:
            raise RollBackException(
                "rollback failed: the task of non-running state is not allowed to roll back, "
                "pipeline_id={}, state={}".format(root_pipeline_id, pipeline_state.name)
            )

    @staticmethod
    def validate_node(node_id, allow_failed=False):
        """
        Ensure ``node_id`` exists, has an allowed type and is in an allowed state.

        :param node_id: id of the node to check
        :param allow_failed: when True, FAILED is accepted in addition to FINISHED
        :raises RollBackException: if any of the checks fails
        """
        node = Node.objects.filter(node_id=node_id).first()
        if node is None:
            raise RollBackException("rollback failed: node not exist, node={}".format(node_id))

        node_detail = json.loads(node.detail)
        # EmptyStartEvent is accepted as well, although the message only mentions ServiceActivity.
        if node_detail["type"] not in [PE.ServiceActivity, PE.EmptyStartEvent]:
            raise RollBackException("rollback failed: only allows rollback to ServiceActivity type nodes")

        target_node_state = State.objects.filter(node_id=node_id).first()
        if target_node_state is None:
            raise RollBackException("rollback failed: node state not exist, node={}".format(node_id))

        allow_states = [states.FINISHED, states.FAILED] if allow_failed else [states.FINISHED]
        if target_node_state.name not in allow_states:
            raise RollBackException(
                "rollback failed: only allows rollback to finished node, allowed states {}".format(allow_states)
            )

    @staticmethod
    def validate_token(root_pipeline_id, start_node_id, target_node_id):
        """
        Ensure the start node and the target node carry the same token.

        :raises RollBackException: if the pipeline has no tokens, a node has no token, or the tokens differ
        """
        tokens = RollbackValidator._load_tokens(root_pipeline_id)

        start_node_token = tokens.get(start_node_id)
        target_node_token = tokens.get(target_node_id)

        if start_node_token is None or target_node_token is None:
            raise RollBackException("rollback failed: token not found, pipeline_id={}".format(root_pipeline_id))

        if start_node_token != target_node_token:
            raise RollBackException(
                "rollback failed: start node token must equal target node, pipeline_id={}".format(root_pipeline_id)
            )

    @staticmethod
    def validate_node_state_by_token_mode(root_pipeline_id, start_node_id):
        """
        Token mode: no node sharing the token of ``start_node_id`` may be running.

        :raises RollBackException: if the tokens or the start node token are missing, or a node
                                   with the same token is in RUNNING state
        """
        tokens = RollbackValidator._load_tokens(root_pipeline_id)
        start_token = tokens.get(start_node_id)
        if start_token is None:
            raise RollBackException("rollback failed: can't find the node token, node_id={}".format(start_node_id))

        # The candidates come from the token mapping; iterating the still-empty result list
        # (as the code did before) made this check a no-op.
        node_id_list = RollbackValidator._node_ids_with_token(tokens, start_token)

        if State.objects.filter(node_id__in=node_id_list, name=states.RUNNING).exists():
            raise RollBackException(
                "rollback failed: there is currently the same node that the same token is running, node_id={}".format(
                    start_node_id
                )
            )

    @staticmethod
    def validate_node_state_by_any_mode(root_pipeline_id):
        """
        Any mode: no node of the pipeline (the root pipeline itself excluded) may be running.

        :raises RollBackException: if a node is in RUNNING state
        """
        running_nodes = State.objects.filter(root_id=root_pipeline_id, name=states.RUNNING).exclude(
            node_id=root_pipeline_id
        )
        if running_nodes.exists():
            raise RollBackException("rollback failed: there is currently the some node is running")

    @staticmethod
    def validate_start_node_id(root_pipeline_id, start_node_id):
        """
        Ensure the rollback starts at a node some process of the pipeline currently points to.

        :raises RollBackException: if no process of the pipeline has ``start_node_id`` as current node
        """
        if not Process.objects.filter(root_pipeline_id=root_pipeline_id, current_node_id=start_node_id).exists():
            raise RollBackException("rollback failed: The node to be rolled back must be the current node!")
start_node_id): """ - # 不需要遍历整颗树,获取到现在已经执行成功的所有列表 + 回滚的开始节点必须是流程的末尾节点 + """ + if not Process.objects.filter(root_pipeline_id=root_pipeline_id, current_node_id=start_node_id).exists(): + raise RollBackException("rollback failed: The node to be rolled back must be the current node!") + + +class BaseRollbackHandler: + mode = None + + def __init__(self, root_pipeline_id): + self.root_pipeline_id = root_pipeline_id + self.runtime = BambooDjangoRuntime() + # 检查pipeline 回滚的合法性 + RollbackValidator.validate_pipeline(root_pipeline_id) + + def get_allowed_rollback_node_id_list(self, start_node_id): + return [] + + def _get_allowed_rollback_node_map(self): + # 不需要遍历整颗树,获取到现在已经执行成功和失败节点的所有列表 finished_node_id_list = ( - State.objects.filter(root_id=self.root_pipeline_id, name=states.FINISHED) + State.objects.filter(root_id=self.root_pipeline_id, name__in=[states.FINISHED, states.FAILED]) .exclude(node_id=self.root_pipeline_id) .values_list("node_id", flat=True) ) + node_detail_list = Node.objects.filter(node_id__in=finished_node_id_list) + # 获取node_id 到 node_detail的映射 + return {n.node_id: json.loads(n.detail) for n in node_detail_list} + + def _reserve(self, start_node_id, target_node_id, reserve_rollback=True): + # 节点预约 需要在 Node 里面 插入 reserve_rollback = True, 为 True的节点执行完将暂停 + RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id) + RollbackValidator.validate_node(target_node_id) + node = Node.objects.filter(node_id=start_node_id).first() + if node is None: + raise RollBackException("reserve rollback failed, the node is not exists, node_id={}".format(start_node_id)) + + state = State.objects.filter(node_id=start_node_id).first() + if state is None: + raise RollBackException( + "reserve rollback failed, the node state is not exists, node_id={}".format(start_node_id) + ) + + # 不在执行中的节点不允许预约 + if state.name != states.RUNNING: + raise RollBackException( + "reserve rollback failed, the node state is not Running, current state={}, node_id={}".format( + 
state.name, start_node_id + ) + ) - # 获取到除pipeline节点之外第一个被创建的节点,此时是开始节点 + with transaction.atomic(): + if reserve_rollback: + # 一个流程只能同时拥有一个预约任务 + if RollbackPlan.objects.filter(root_pipeline_id=self.root_pipeline_id, is_expired=False).exists(): + raise RollBackException( + "reserve rollback failed, the rollbackPlan, current state={}, node_id={}".format( + state.name, start_node_id + ) + ) + RollbackPlan.objects.create( + root_pipeline_id=self.root_pipeline_id, + start_node_id=start_node_id, + target_node_id=target_node_id, + mode=self.mode, + ) + else: + # 取消回滚,删除所有的任务 + RollbackPlan.objects.filter(root_pipeline_id=self.root_pipeline_id, start_node_id=start_node_id).update( + is_expired=True + ) + + node_detail = json.loads(node.detail) + node_detail["reserve_rollback"] = reserve_rollback + node.detail = json.dumps(node_detail) + node.save() + + def reserve_rollback(self, start_node_id, target_node_id): + """ + 预约回滚 + """ + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) + self._reserve(start_node_id, target_node_id) + + def cancel_reserved_rollback(self, start_node_id, target_node_id): + """ + 取消预约回滚 + """ + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) + self._reserve(start_node_id, target_node_id, reserve_rollback=False) + + +class AnyRollbackHandler(BaseRollbackHandler): + mode = ANY + + def get_allowed_rollback_node_id_list(self, start_node_id): + node_map = self._get_allowed_rollback_node_map() start_node_state = ( State.objects.filter(root_id=self.root_pipeline_id) .exclude(node_id=self.root_pipeline_id) .order_by("created_time") .first() ) + target_node_id = start_node_state.node_id + rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) + graph, _ = rollback_graph.build_rollback_graph() - # 获取到所有当前已经运行完节点的详情 - node_detail_list = Node.objects.filter(node_id__in=finished_node_id_list) - # 获取node_id 到 node_detail的映射 - node_map 
= {n.node_id: json.loads(n.detail) for n in node_detail_list} + return list(set(graph.nodes) - {constants.START_FLAG, constants.END_FLAG, start_node_id}) - # 计算当前允许跳过的合法的节点 - validate_nodes_list = self._compute_validate_nodes(start_node_state.node_id, node_map) + def retry_rollback_failed_node(self, node_id, retry_data): + """ """ + raise RollBackException("rollback failed: when mode is any, not support retry") - return validate_nodes_list + def reserve_rollback(self, start_node_id, target_node_id): + """ + 预约回滚 + """ + self._reserve(start_node_id, target_node_id) - def rollback(self): - pipeline_state = State.objects.filter(node_id=self.root_pipeline_id).first() - if not pipeline_state: + def cancel_reserved_rollback(self, start_node_id, target_node_id): + """ + 取消预约回滚 + """ + self._reserve(start_node_id, target_node_id, reserve_rollback=False) + + def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): + RollbackValidator.validate_node_state_by_any_mode(self.root_pipeline_id) + # 回滚的开始节点运行失败的情况 + RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id) + RollbackValidator.validate_node(start_node_id, allow_failed=True) + RollbackValidator.validate_node(target_node_id) + + node_map = self._get_allowed_rollback_node_map() + rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) + + graph, other_nodes = rollback_graph.build_rollback_graph() + node_access_record = {node: 0 for node in graph.nodes} + + rollback_snapshot = RollbackSnapshot.objects.create( + root_pipeline_id=self.root_pipeline_id, + graph=json.dumps(graph.as_dict()), + node_access_record=json.dumps(node_access_record), + start_node_id=start_node_id, + target_node_id=target_node_id, + other_nodes=json.dumps(other_nodes), + skip_rollback_nodes=json.dumps([]), + ) + + any_rollback.apply_async( + kwargs={"snapshot_id": rollback_snapshot.id}, + queue=ROLLBACK_QUEUE, + ) + + +class 
TokenRollbackHandler(BaseRollbackHandler): + mode = TOKEN + + def get_allowed_rollback_node_id_list(self, start_node_id): + """ + 获取允许回滚的节点范围 + 规则:token 一致的节点允许回滚 + """ + try: + rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id) + except RollbackToken.DoesNotExist: raise RollBackException( - "rollback failed: pipeline state not exist, pipeline_id={}".format(self.root_pipeline_id) + "rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id) ) + node_map = self._get_allowed_rollback_node_map() + service_activity_node_list = [ + node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity + ] - if pipeline_state.name != states.RUNNING: + tokens = json.loads(rollback_token.token) + start_token = tokens.get(start_node_id) + if not start_token: + return [] + + nodes = [] + for node_id, token in tokens.items(): + if start_token == token and node_id != start_node_id and node_id in service_activity_node_list: + nodes.append(node_id) + + return nodes + + def retry_rollback_failed_node(self, node_id, retry_data): + """ + 重试回滚失败的节点 + """ + pipeline_state = State.objects.filter(node_id=self.root_pipeline_id).first() + if pipeline_state.name != states.ROLL_BACK_FAILED: raise RollBackException( - "rollback failed: the task of non-running state is not allowed to roll back, pipeline_id={}".format( - self.root_pipeline_id - ) + "rollback failed: only retry the failed pipeline, current_status={}".format(pipeline_state.name) + ) + node_state = State.objects.filter(node_id=node_id).first() + if node_state.name != states.ROLL_BACK_FAILED: + raise RollBackException( + "rollback failed: only retry the failed node, current_status={}".format(node_state.name) ) - node = Node.objects.filter(node_id=self.node_id).first() - if node is None: - raise RollBackException("rollback failed: node not exist, node={}".format(self.node_id)) + # 获取镜像 + try: + rollback_snapshot = 
RollbackSnapshot.objects.get(root_pipeline_id=self.root_pipeline_id, is_expired=False) + except RollbackSnapshot.DoesNotExist: + raise RollBackException("rollback failed: the rollback snapshot is not exists, please check") + except RollbackSnapshot.MultipleObjectsReturned: + raise RollBackException("rollback failed: found multi not expired rollback snapshot, please check") + + # 重置pipeline的状态为回滚中 + self.runtime.set_state( + node_id=self.root_pipeline_id, + to_state=states.ROLLING_BACK, + ) - node_detail = json.loads(node.detail) - if node_detail["type"] not in [PE.ServiceActivity, PE.EmptyStartEvent]: - raise RollBackException("rollback failed: only allows rollback to ServiceActivity type nodes") + # 驱动这个任务 + token_rollback.apply_async( + kwargs={ + "snapshot_id": rollback_snapshot.id, + "node_id": node_id, + "retry": True, + "retry_data": retry_data, + }, + queue=ROLLBACK_QUEUE, + ) - target_node_state = State.objects.filter(node_id=self.node_id).first() + def _node_state_is_failed(self, node_id): + """ + 判断该节点是不是失败的状态 + """ + node_state = State.objects.filter(node_id=node_id).first() + if node_state.name == states.FAILED: + return True + return False + + def _get_failed_skip_node_id_list(self, node_id_list): + failed_skip_node_id_list = State.objects.filter( + Q(Q(skip=True) | Q(error_ignored=True)) & Q(node_id__in=node_id_list) + ).values_list("node_id", flat=True) + return failed_skip_node_id_list + + def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): + + if skip_rollback_nodes is None: + skip_rollback_nodes = [] + + # 相同token回滚时,不允许有正在运行的节点 + RollbackValidator.validate_node_state_by_token_mode(self.root_pipeline_id, start_node_id) + # 回滚的开始节点运行失败的情况 + RollbackValidator.validate_node(start_node_id, allow_failed=True) + RollbackValidator.validate_node(target_node_id) + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) + + # 如果开始节点是失败的情况,则跳过该节点的回滚操作 + if self._node_state_is_failed(start_node_id): + 
skip_rollback_nodes.append(start_node_id) + + node_map = self._get_allowed_rollback_node_map() + rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) + + runtime = BambooDjangoRuntime() + + graph, other_nodes = rollback_graph.build_rollback_graph() + node_access_record = {node: 0 for node in graph.nodes} + + # 所有失败并跳过的节点不再参与回滚 + failed_skip_node_id_list = self._get_failed_skip_node_id_list(node_map.keys()) + skip_rollback_nodes.extend(list(failed_skip_node_id_list)) + + rollback_snapshot = RollbackSnapshot.objects.create( + root_pipeline_id=self.root_pipeline_id, + graph=json.dumps(graph.as_dict()), + node_access_record=json.dumps(node_access_record), + start_node_id=start_node_id, + target_node_id=target_node_id, + other_nodes=json.dumps(other_nodes), + skip_rollback_nodes=json.dumps(skip_rollback_nodes), + ) - if target_node_state is None: - raise RollBackException("rollback failed: node state not exist, node={}".format(self.node_id)) + runtime.set_state( + node_id=self.root_pipeline_id, + to_state=states.ROLLING_BACK, + ) + # 驱动这个任务 + token_rollback.apply_async( + kwargs={ + "snapshot_id": rollback_snapshot.id, + "node_id": constants.START_FLAG, + "retry": False, + "retry_data": None, + }, + queue=ROLLBACK_QUEUE, + ) - if target_node_state.name != states.FINISHED: - raise RollBackException("rollback failed: only allows rollback to finished node") - validate_nodes_list = self.get_allowed_rollback_node_id_list() +class RollbackDispatcher: + def __init__(self, root_pipeline_id, mode): + if mode == ANY: + self.handler = AnyRollbackHandler(root_pipeline_id) + elif mode == TOKEN: + self.handler = TokenRollbackHandler(root_pipeline_id) + else: + raise RollBackException("rollback failed: not support this mode, please check") - if self.node_id not in validate_nodes_list: - raise RollBackException("rollback failed: node is not allow to rollback, node={}".format(self.node_id)) + def rollback(self, start_node_id: str, 
target_node_id: str, skip_rollback_nodes: list = None): + self.handler.rollback(start_node_id, target_node_id, skip_rollback_nodes) - with transaction.atomic(): - try: - self._clean_engine_data(target_node_state) - except Exception as e: - raise RollBackException("rollback failed: clean engine data error, error={}".format(str(e))) - - try: - # 将当前住进程的正在运行的节点指向目标ID - main_process = Process.objects.get(root_pipeline_id=self.root_pipeline_id, parent_id=-1) - main_process.current_node_id = self.node_id - main_process.save() - - # 重置该节点的状态信息 - self.runtime.set_state( - node_id=self.node_id, - to_state=states.READY, - is_retry=True, - refresh_version=True, - clear_started_time=True, - clear_archived_time=True, - ) - process_info = self.runtime.get_process_info(main_process.id) - self.runtime.execute( - process_id=process_info.process_id, - node_id=self.node_id, - root_pipeline_id=process_info.root_pipeline_id, - parent_pipeline_id=process_info.top_pipeline_id, - ) - except Exception as e: - raise RollBackException("rollback failed: rollback to node error, error={}".format(str(e))) + def reserve_rollback(self, start_node_id: str, target_node_id: str): + self.handler.reserve_rollback(start_node_id, target_node_id) + + def retry_rollback_failed_node(self, node_id: str, retry_data: dict = None): + self.handler.retry_rollback_failed_node(node_id, retry_data) + + def cancel_reserved_rollback(self, start_node_id: str, target_node_id: str): + self.handler.cancel_reserved_rollback(start_node_id, target_node_id) + + def get_allowed_rollback_node_id_list(self, start_node_id: str): + return self.handler.get_allowed_rollback_node_id_list(start_node_id) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0001_initial.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0001_initial.py new file mode 100644 index 00000000..fd0a18d2 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0001_initial.py @@ -0,0 +1,65 @@ +# 
Generated by Django 3.2.18 on 2023-10-16 07:12 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="RollbackNodeSnapshot", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("root_pipeline_id", models.CharField(db_index=True, max_length=64, verbose_name="root pipeline id")), + ("node_id", models.CharField(db_index=True, max_length=64, verbose_name="node_id")), + ("code", models.CharField(max_length=64, verbose_name="node_code")), + ("version", models.CharField(max_length=33, verbose_name="version")), + ("inputs", models.TextField(verbose_name="node inputs")), + ("outputs", models.TextField(verbose_name="node outputs")), + ("context_values", models.TextField(verbose_name="pipeline context values")), + ("rolled_back", models.BooleanField(default=False, verbose_name="whether the node rolls back")), + ], + ), + migrations.CreateModel( + name="RollbackPlan", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("root_pipeline_id", models.CharField(db_index=True, max_length=64, verbose_name="root pipeline id")), + ("start_node_id", models.CharField(db_index=True, max_length=64, verbose_name="start node id")), + ("target_node_id", models.CharField(db_index=True, max_length=64, verbose_name="target_node_id")), + ("mode", models.CharField(default="TOKEN", max_length=32, verbose_name="rollback mode")), + ("is_expired", models.BooleanField(default=False, verbose_name="is expired")), + ], + ), + migrations.CreateModel( + name="RollbackSnapshot", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("root_pipeline_id", models.CharField(db_index=True, max_length=64, verbose_name="root pipeline id")), + ("graph", models.TextField(verbose_name="rollback graph")), + 
("node_access_record", models.TextField(verbose_name="node access record")), + ("skip_rollback_nodes", models.TextField(verbose_name="skip rollback nodes")), + ("other_nodes", models.TextField(verbose_name="other nodes")), + ("start_node_id", models.CharField(db_index=True, max_length=64, verbose_name="start node id")), + ("target_node_id", models.CharField(db_index=True, max_length=64, verbose_name="target_node_id")), + ("is_expired", models.BooleanField(db_index=True, default=False, verbose_name="is expired")), + ], + ), + migrations.CreateModel( + name="RollbackToken", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("root_pipeline_id", models.CharField(db_index=True, max_length=64, verbose_name="root pipeline id")), + ("token", models.TextField(verbose_name="token map")), + ( + "is_deleted", + models.BooleanField( + db_index=True, default=False, help_text="is deleted", verbose_name="is deleted" + ), + ), + ], + ), + ] diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/__init__.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py new file mode 100644 index 00000000..b482f0a6 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py @@ -0,0 +1,52 @@ +# -*- coding: utf-8 -*- +from django.db import models +from django.utils.translation import ugettext_lazy as _ +from pipeline.contrib.rollback.constants import TOKEN + + +class RollbackToken(models.Model): + """ + 回滚配置token信息 + """ + + root_pipeline_id = models.CharField(verbose_name="root pipeline id", max_length=64, db_index=True) + token = models.TextField(_("token map"), null=False) + is_deleted = models.BooleanField(_("is deleted"), default=False, help_text=_("is deleted"), db_index=True) + + +class 
RollbackSnapshot(models.Model): + """ + 节点执行的快照信息 + """ + + root_pipeline_id = models.CharField(verbose_name="root pipeline id", max_length=64, db_index=True) + graph = models.TextField(verbose_name="rollback graph", null=False) + node_access_record = models.TextField(verbose_name="node access record") + skip_rollback_nodes = models.TextField(verbose_name="skip rollback nodes") + other_nodes = models.TextField(verbose_name="other nodes") + start_node_id = models.CharField(verbose_name="start node id", max_length=64, db_index=True) + target_node_id = models.CharField(verbose_name="target_node_id", max_length=64, db_index=True) + is_expired = models.BooleanField(verbose_name="is expired", default=False, db_index=True) + + +class RollbackNodeSnapshot(models.Model): + """ + 节点快照 + """ + + root_pipeline_id = models.CharField(verbose_name="root pipeline id", max_length=64, db_index=True) + node_id = models.CharField(verbose_name="node_id", max_length=64, db_index=True) + code = models.CharField(verbose_name="node_code", max_length=64) + version = models.CharField(verbose_name=_("version"), null=False, max_length=33) + inputs = models.TextField(verbose_name=_("node inputs")) + outputs = models.TextField(verbose_name=_("node outputs")) + context_values = models.TextField(verbose_name=_("pipeline context values")) + rolled_back = models.BooleanField(_("whether the node rolls back"), default=False) + + +class RollbackPlan(models.Model): + root_pipeline_id = models.CharField(verbose_name="root pipeline id", max_length=64, db_index=True) + start_node_id = models.CharField(verbose_name="start node id", max_length=64, db_index=True) + target_node_id = models.CharField(verbose_name="target_node_id", max_length=64, db_index=True) + mode = models.CharField(verbose_name="rollback mode", max_length=32, default=TOKEN) + is_expired = models.BooleanField(verbose_name="is expired", default=False) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py 
b/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py new file mode 100644 index 00000000..cabd7c4b --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py @@ -0,0 +1,301 @@ +# -*- coding: utf-8 -*- +import json +import logging + +from celery import task +from django.conf import settings +from django.db import transaction +from pipeline.conf.default_settings import ROLLBACK_QUEUE +from pipeline.contrib.rollback import constants +from pipeline.contrib.rollback.models import ( + RollbackNodeSnapshot, + RollbackPlan, + RollbackSnapshot, +) +from pipeline.eri.models import CallbackData +from pipeline.eri.models import ExecutionData as DBExecutionData +from pipeline.eri.models import ( + ExecutionHistory, + LogEntry, + Node, + Process, + Schedule, + State, +) +from pipeline.eri.runtime import BambooDjangoRuntime + +from bamboo_engine import states +from bamboo_engine.eri import ExecutionData +from bamboo_engine.utils.graph import RollbackGraph + +logger = logging.getLogger("celery") + + +class RollbackCleaner: + def __init__(self, snapshot): + self.snapshot = snapshot + + def _clear_node_reserve_flag(self, node_id): + node = Node.objects.get(node_id=node_id) + node_detail = json.loads(node.detail) + node_detail["reserve_rollback"] = False + node.detail = json.dumps(node_detail) + node.save() + + def clear_data(self): + # 节点快照需要全部删除,可能会有下一次的回滚 + RollbackNodeSnapshot.objects.filter(root_pipeline_id=self.snapshot.root_pipeline_id).delete() + # 回滚快照需要置为已过期 + RollbackSnapshot.objects.filter(root_pipeline_id=self.snapshot.root_pipeline_id).update(is_expired=True) + # 预约计划需要修改为已过期 + RollbackPlan.objects.filter( + root_pipeline_id=self.snapshot.root_pipeline_id, start_node_id=self.snapshot.start_node_id + ).update(is_expired=True) + # 节点的预约信息需要清理掉 + self._clear_node_reserve_flag(self.snapshot.start_node_id) + # 需要删除该节点的进程信息/非主进程的,防止网关再分支处回滚时,仍然有正在运行的process得不到清理 + Process.objects.filter( + root_pipeline_id=self.snapshot.root_pipeline_id, 
current_node_id=self.snapshot.start_node_id + ).exclude(parent_id=-1).delete() + + graph = json.loads(self.snapshot.graph) + need_clean_node_id_list = graph["nodes"] + json.loads(self.snapshot.other_nodes) + # 清理状态信息 + State.objects.filter(root_id=self.snapshot.root_pipeline_id, node_id__in=need_clean_node_id_list).delete() + # 清理Schedule 信息 + Schedule.objects.filter(node_id__in=need_clean_node_id_list).delete() + # 清理日志信息 + LogEntry.objects.filter(node_id__in=need_clean_node_id_list).delete() + ExecutionHistory.objects.filter(node_id__in=need_clean_node_id_list).delete() + DBExecutionData.objects.filter(node_id__in=need_clean_node_id_list).delete() + CallbackData.objects.filter(node_id__in=need_clean_node_id_list).delete() + + +class TokenRollbackTaskHandler: + def __init__(self, snapshot_id, node_id, retry, retry_data): + self.snapshot_id = snapshot_id + self.node_id = node_id + self.retry_data = retry_data + self.retry = retry + self.runtime = BambooDjangoRuntime() + + def set_state(self, node_id, state): + logger.info("[TokenRollbackTaskHandler][set_state] set node_id state to {}".format(state)) + # 开始和结束节点直接跳过回滚 + if node_id in [constants.END_FLAG, constants.START_FLAG]: + return + self.runtime.set_state( + node_id=node_id, + to_state=state, + ) + + def execute_rollback(self): + """ + 执行回滚的操作 + """ + if self.node_id in [constants.END_FLAG, constants.START_FLAG]: + return True + + # 获取节点快照,可能会有多多份快照,需要多次回滚 + node_snapshots = RollbackNodeSnapshot.objects.filter(node_id=self.node_id, rolled_back=False).order_by("-id") + for node_snapshot in node_snapshots: + service = self.runtime.get_service(code=node_snapshot.code, version=node_snapshot.version) + data = ExecutionData(inputs=json.loads(node_snapshot.inputs), outputs=json.loads(node_snapshot.outputs)) + parent_data = ExecutionData(inputs=json.loads(node_snapshot.context_values), outputs={}) + result = service.service.rollback(data, parent_data, self.retry_data) + node_snapshot.rolled_back = True + 
node_snapshot.save() + if not result: + return False + + return True + + def start_pipeline(self, root_pipeline_id, target_node_id): + """ + 启动pipeline + """ + # 将当前住进程的正在运行的节点指向目标ID + main_process = Process.objects.get(root_pipeline_id=root_pipeline_id, parent_id=-1) + main_process.current_node_id = target_node_id + main_process.save() + + # 重置该节点的状态信息 + self.runtime.set_state( + node_id=target_node_id, + to_state=states.READY, + is_retry=True, + refresh_version=True, + clear_started_time=True, + clear_archived_time=True, + ) + + process_info = self.runtime.get_process_info(main_process.id) + # 设置pipeline的状态 + self.runtime.set_state( + node_id=root_pipeline_id, + to_state=states.READY, + ) + + # 如果开启了流程自动回滚,则会开启到目标节点之后自动开始 + if getattr(settings, "PIPELINE_ENABLE_AUTO_EXECUTE_WHEN_ROLL_BACKED", True): + self.runtime.set_state( + node_id=root_pipeline_id, + to_state=states.RUNNING, + ) + else: + # 流程设置为暂停状态,需要用户点击才可以继续开始 + self.runtime.set_state( + node_id=root_pipeline_id, + to_state=states.SUSPENDED, + ) + + self.runtime.execute( + process_id=process_info.process_id, + node_id=target_node_id, + root_pipeline_id=process_info.root_pipeline_id, + parent_pipeline_id=process_info.top_pipeline_id, + ) + + def rollback(self): + with transaction.atomic(): + rollback_snapshot = RollbackSnapshot.objects.select_for_update().get(id=self.snapshot_id, is_expired=False) + node_access_record = json.loads(rollback_snapshot.node_access_record) + # 只有非重试状态下才需要记录访问 + if not self.retry: + node_access_record[self.node_id] += 1 + rollback_snapshot.node_access_record = json.dumps(node_access_record) + rollback_snapshot.save() + + graph = json.loads(rollback_snapshot.graph) + target_node_id = rollback_snapshot.target_node_id + rollback_graph = RollbackGraph(nodes=graph["nodes"], flows=graph["flows"]) + skip_rollback_nodes = json.loads(rollback_snapshot.skip_rollback_nodes) + in_degrees = rollback_graph.in_degrees() + + clearner = RollbackCleaner(rollback_snapshot) + + if 
node_access_record[self.node_id] >= in_degrees[self.node_id]: + # 对于不需要跳过的节点才会执行具体的回滚行为 + if self.node_id not in skip_rollback_nodes: + try: + # 设置节点状态为回滚中 + self.set_state(self.node_id, states.ROLLING_BACK) + # 执行同步回滚的操作 + result = self.execute_rollback() + except Exception as e: + logger.error( + "[TokenRollbackTaskHandler][rollback] execute rollback error," + "snapshot_id={}, node_id={}, err={}".format(self.snapshot_id, self.node_id, e) + ) + # 节点和流程重置为回滚失败的状态 + self.set_state(rollback_snapshot.root_pipeline_id, states.ROLL_BACK_FAILED) + # 回滚失败的节点将不再向下执行 + self.set_state(self.node_id, states.ROLL_BACK_FAILED) + return + + # 节点回滚成功 + if result: + self.set_state(self.node_id, states.ROLL_BACK_SUCCESS) + else: + logger.info( + "[TokenRollbackTaskHandler][rollback], execute rollback failed, " + "result=False, snapshot_id={}, node_id={}".format(self.snapshot_id, self.node_id) + ) + self.set_state(self.node_id, states.ROLL_BACK_FAILED) + # 回滚失败的节点将不再向下执行 + self.set_state(rollback_snapshot.root_pipeline_id, states.ROLL_BACK_FAILED) + return + + next_node = rollback_graph.next(self.node_id) + if list(next_node)[0] == constants.END_FLAG: + self.set_state(rollback_snapshot.root_pipeline_id, states.ROLL_BACK_SUCCESS) + try: + clearner.clear_data() + self.start_pipeline( + root_pipeline_id=rollback_snapshot.root_pipeline_id, target_node_id=target_node_id + ) + except Exception as e: + logger.error("[TokenRollbackTaskHandler][rollback] start_pipeline failed, err={}".format(e)) + return + return + + for node in next_node: + token_rollback.apply_async( + kwargs={ + "snapshot_id": self.snapshot_id, + "node_id": node, + }, + queue=ROLLBACK_QUEUE, + ) + + +class AnyRollbackHandler: + def __init__(self, snapshot_id): + self.snapshot_id = snapshot_id + self.runtime = BambooDjangoRuntime() + + def start_pipeline(self, root_pipeline_id, target_node_id): + """ + 启动pipeline + """ + + main_process = Process.objects.get(root_pipeline_id=root_pipeline_id, parent_id=-1) + 
main_process.current_node_id = target_node_id + main_process.save() + + # 重置该节点的状态信息 + self.runtime.set_state( + node_id=target_node_id, + to_state=states.READY, + is_retry=True, + refresh_version=True, + clear_started_time=True, + clear_archived_time=True, + ) + + process_info = self.runtime.get_process_info(main_process.id) + + # 如果PIPELINE_ENABLE_AUTO_EXECUTE_WHEN_ROLL_BACKED为False, 那么则会重制流程为暂停状态 + if not getattr(settings, "PIPELINE_ENABLE_AUTO_EXECUTE_WHEN_ROLL_BACKED", True): + self.runtime.set_state( + node_id=root_pipeline_id, + to_state=states.SUSPENDED, + ) + + self.runtime.execute( + process_id=process_info.process_id, + node_id=target_node_id, + root_pipeline_id=process_info.root_pipeline_id, + parent_pipeline_id=process_info.top_pipeline_id, + ) + + def rollback(self): + with transaction.atomic(): + rollback_snapshot = RollbackSnapshot.objects.get(id=self.snapshot_id, is_expired=False) + clearner = RollbackCleaner(rollback_snapshot) + try: + clearner.clear_data() + self.start_pipeline( + root_pipeline_id=rollback_snapshot.root_pipeline_id, target_node_id=rollback_snapshot.target_node_id + ) + except Exception as e: + logger.error( + "rollback failed: start pipeline, pipeline_id={}, target_node_id={}, error={}".format( + rollback_snapshot.root_pipeline_id, rollback_snapshot.target_node_id, str(e) + ) + ) + raise e + + +@task +def token_rollback(snapshot_id, node_id, retry=False, retry_data=None): + """ + snapshot_id 本次回滚的快照id + node_id 当前要回滚的节点id + """ + TokenRollbackTaskHandler(snapshot_id=snapshot_id, node_id=node_id, retry=retry, retry_data=retry_data).rollback() + + +@task +def any_rollback(snapshot_id): + AnyRollbackHandler(snapshot_id=snapshot_id).rollback() diff --git a/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py b/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py index 6cb32fb3..d0076b4b 100644 --- a/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py +++ 
b/runtime/bamboo-pipeline/pipeline/core/flow/activity/service_activity.py @@ -100,6 +100,9 @@ def need_run_hook(self): def schedule(self, data, parent_data, callback_data=None): return True + def rollback(self, data, parent_data, rollback_data=None): + return True + def finish_schedule(self): setattr(self, self.schedule_result_attr, True) diff --git a/runtime/bamboo-pipeline/pipeline/engine/tasks.py b/runtime/bamboo-pipeline/pipeline/engine/tasks.py index 956350f8..3a6470b1 100644 --- a/runtime/bamboo-pipeline/pipeline/engine/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/engine/tasks.py @@ -11,27 +11,28 @@ specific language governing permissions and limitations under the License. """ -import logging import datetime -from dateutil.relativedelta import relativedelta +import logging + from celery import task from celery.schedules import crontab from celery.task import periodic_task -from django.db import transaction, connection - +from dateutil.relativedelta import relativedelta +from django.apps import apps +from django.db import connection, transaction from pipeline.conf import default_settings from pipeline.core.pipeline import Pipeline from pipeline.engine import api, signals, states from pipeline.engine.core import runtime, schedule from pipeline.engine.health import zombie from pipeline.engine.models import ( + History, NodeCeleryTask, NodeRelationship, PipelineProcess, ProcessCeleryTask, - Status, ScheduleService, - History, + Status, ) from pipeline.models import PipelineInstance @@ -257,6 +258,15 @@ def _clean_pipeline_instance_data(instance_id, timestamp): delete_pipeline_process = ( "DELETE FROM `engine_pipelineprocess` " "WHERE `engine_pipelineprocess`.`root_pipeline_id` = %s" ) + + delete_rollback_plan = "DELETE FROM `rollback_rollbackplan` WHERE `rollback_rollbackplan`.`root_pipeline_id` = %s" + delete_rollback_snapshot = ( + "DELETE FROM `rollback_rollbacksnapshot` WHERE `rollback_rollbacksnapshot`.`root_pipeline_id` = %s" + ) + delete_rollback_token 
= ( + "DELETE FROM `rollback_rollbacktoken` WHERE `rollback_rollbacktoken`.`root_pipeline_id` = %s" + ) + with transaction.atomic(): with connection.cursor() as cursor: if pipeline_process_ids: @@ -289,6 +299,15 @@ def _clean_pipeline_instance_data(instance_id, timestamp): _raw_sql_execute(cursor, delete_pipeline_process, [instance_id], timestamp) PipelineInstance.objects.filter(instance_id=instance_id).update(is_expired=True) + try: + apps.get_model("rollback", "RollbackToken") + except Exception: + logger.error("[_clean_pipeline_rollback_data] delete error, the rollback app not installed") + return + _raw_sql_execute(cursor, delete_rollback_plan, [instance_id], timestamp) + _raw_sql_execute(cursor, delete_rollback_snapshot, [instance_id], timestamp) + _raw_sql_execute(cursor, delete_rollback_token, [instance_id], timestamp) + def _sql_log(sql, params, timestamp): if isinstance(params, list): diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py b/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py index a0be2298..23904fbd 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/hooks.py @@ -17,6 +17,8 @@ from pipeline.eri.models import LogEntry from pipeline.eri.signals import pipeline_event +from bamboo_engine.utils.constants import RuntimeSettings + class PipelineEvent: def __init__(self, event_type, data): @@ -64,6 +66,9 @@ def pre_prepare_run_pipeline( ) ) + if self.get_config(RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value): + self.set_pipeline_token(pipeline) + def post_prepare_run_pipeline( self, pipeline: dict, root_pipeline_data: dict, root_pipeline_context: dict, subprocess_context: dict, **options ): diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/node.py b/runtime/bamboo-pipeline/pipeline/eri/imp/node.py index aad414d3..01706562 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/imp/node.py +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/node.py @@ -13,24 +13,25 @@ import json +from 
pipeline.eri.models import Node as DBNode + from bamboo_engine import metrics from bamboo_engine.eri import ( - Node, - NodeType, - ServiceActivity, - SubProcess, - ExclusiveGateway, - ParallelGateway, + Condition, ConditionalParallelGateway, ConvergeGateway, - EmptyStartEvent, + DefaultCondition, EmptyEndEvent, + EmptyStartEvent, + ExclusiveGateway, ExecutableEndEvent, - Condition, DefaultCondition, + Node, + NodeType, + ParallelGateway, + ServiceActivity, + SubProcess, ) -from pipeline.eri.models import Node as DBNode - class NodeMixin: def _get_node(self, node: DBNode): @@ -47,6 +48,7 @@ def _get_node(self, node: DBNode): can_skip=node_detail["can_skip"], name=node_detail.get("name"), can_retry=node_detail["can_retry"], + reserve_rollback=node_detail.get("reserve_rollback", False), ) if node_type == NodeType.ServiceActivity.value: diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/process.py b/runtime/bamboo-pipeline/pipeline/eri/imp/process.py index 05aef601..730da35d 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/imp/process.py +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/process.py @@ -12,17 +12,17 @@ """ import json -from typing import List, Optional, Dict +from typing import Dict, List, Optional -from django.utils import timezone -from django.db.models import F +from django.conf import settings from django.db import transaction - -from bamboo_engine import metrics -from bamboo_engine.eri import ProcessInfo, SuspendedProcessInfo, DispatchProcess - +from django.db.models import F +from django.utils import timezone from pipeline.eri.models import Process +from bamboo_engine import metrics, states +from bamboo_engine.eri import DispatchProcess, ProcessInfo, SuspendedProcessInfo + class ProcessMixin: def beat(self, process_id: int): @@ -273,6 +273,23 @@ def fork( if not qs: raise Process.DoesNotExist("Process with id({}) does not exist".format(parent_id)) + if getattr(settings, "PIPELINE_ENABLE_ROLLBACK", False): + # 如果开启了回滚,则会自动删除相关的process信息,防止异常 + 
state = self.get_state(root_pipeline_id) + # 如果流程处在回滚中,才会删除 + if state.name == states.ROLLING_BACK: + for current_node, destination in from_to.items(): + Process.objects.filter( + parent_id=parent_id, + asleep=True, + destination_id=destination, + current_node_id=current_node, + root_pipeline_id=root_pipeline_id, + pipeline_stack=stack_json, + priority=qs[0].priority, + queue=qs[0].queue, + ).delete() + children = [ Process( parent_id=parent_id, diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py b/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py new file mode 100644 index 00000000..99ec147d --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +import json +import logging + +from django.apps import apps +from django.core.serializers.json import DjangoJSONEncoder + +from bamboo_engine.builder.builder import generate_pipeline_token + +logger = logging.getLogger("bamboo_engine") + + +class RollbackMixin: + def set_pipeline_token(self, pipeline_tree: dict): + """ + 设置pipeline token + """ + try: + # 引用成功说明pipeline rollback 这个 app 是安装过的 + RollbackToken = apps.get_model("rollback", "RollbackToken") + except Exception as e: + logger.error( + "[RollbackMixin][set_pipeline_token] import RollbackToken error, " + "Please check whether the rollback app is installed correctly, err={}".format(e) + ) + return + + root_pipeline_id = pipeline_tree["id"] + node_map = generate_pipeline_token(pipeline_tree) + + RollbackToken.objects.create(root_pipeline_id=root_pipeline_id, token=json.dumps(node_map)) + + def set_node_snapshot(self, root_pipeline_id, node_id, code, version, context_values, inputs, outputs): + """ + 创建一分节点快照 + """ + try: + RollbackNodeSnapshot = apps.get_model("rollback", "RollbackNodeSnapshot") + # 引用成功说明pipeline rollback 这个 app 是安装过的 + except Exception as e: + logger.error( + "[RollbackMixin][set_node_snapshot] import RollbackNodeSnapshot error, " + "Please check whether the rollback app 
is installed correctly, err={}".format(e) + ) + return + + RollbackNodeSnapshot.objects.create( + root_pipeline_id=root_pipeline_id, + node_id=node_id, + code=code, + version=version, + context_values=json.dumps(context_values, cls=DjangoJSONEncoder), + inputs=json.dumps(inputs, cls=DjangoJSONEncoder), + outputs=json.dumps(outputs, cls=DjangoJSONEncoder), + ) + + def start_rollback(self, root_pipeline_id, node_id): + """ + 新建一个回滚任务 + """ + try: + # 引用成功说明pipeline rollback 这个 app 是安装过的 + from pipeline.contrib.rollback.handler import RollbackDispatcher + + RollbackPlan = apps.get_model("rollback", "RollbackPlan") + except Exception as e: + logger.error( + "[RollbackMixin][set_pipeline_token] import RollbackDispatcher or RollbackPlan error, " + "Please check whether the rollback app is installed correctly, err={}".format(e) + ) + return + + try: + rollback_plan = RollbackPlan.objects.get( + root_pipeline_id=root_pipeline_id, start_node_id=node_id, is_expired=False + ) + handler = RollbackDispatcher(root_pipeline_id=root_pipeline_id, mode=rollback_plan.mode) + handler.rollback(rollback_plan.start_node_id, rollback_plan.target_node_id) + rollback_plan.is_expired = True + rollback_plan.save(update_fields=["is_expired"]) + except Exception as e: + logger.error("[RollbackMixin][start_rollback] start a rollback task error, err={}".format(e)) + return diff --git a/runtime/bamboo-pipeline/pipeline/eri/runtime.py b/runtime/bamboo-pipeline/pipeline/eri/runtime.py index 664157ff..a6e0494b 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/runtime.py +++ b/runtime/bamboo-pipeline/pipeline/eri/runtime.py @@ -29,6 +29,7 @@ from pipeline.eri.imp.node import NodeMixin from pipeline.eri.imp.plugin_manager import PipelinePluginManagerMixin from pipeline.eri.imp.process import ProcessMixin +from pipeline.eri.imp.rollback import RollbackMixin from pipeline.eri.imp.schedule import ScheduleMixin from pipeline.eri.imp.state import StateMixin from pipeline.eri.imp.task import TaskMixin @@ 
-68,10 +69,10 @@ class BambooDjangoRuntime( InterruptMixin, EventMixin, ConfigMixin, + RollbackMixin, EngineRuntimeInterface, ): - - ERI_SUPPORT_VERSION = 7 + ERI_SUPPORT_VERSION = 8 def __init__(self): try: diff --git a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_graph.py b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_graph.py new file mode 100644 index 00000000..f439b1dc --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_graph.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +from unittest import TestCase + +from pipeline.contrib.rollback.graph import RollbackGraphHandler + + +class TestGraph(TestCase): + def test_build_rollback_graph_with_cycle(self): + node_map = { + "start": { + "id": "start", + "type": "EmptyStartEvent", + "targets": {"lf67ec0280323668ba383bc61c92bdce": "node_1"}, + }, + "node_1": { + "id": "node_1", + "type": "ServiceActivity", + "targets": {"l5c6729c70b83c81bd98eebefe0c46e3": "node_2"}, + }, + "node_2": { + "id": "node_2", + "type": "ServiceActivity", + "targets": {"ld09dcdaaae53cd9868b652fd4b7b074": "node_3"}, + }, + "node_3": { + "id": "node_3", + "type": "ExclusiveGateway", + "targets": {"ld9beef12dd33812bb9b697afd5f2728": "node_4", "lffeab3bdb0139b69ac6978a415e3f54": "node_1"}, + }, + "node_4": { + "id": "node_4", + "type": "ServiceActivity", + "targets": {"l995fa16e367312e99a1f8b54458ed6a": "node_5"}, + }, + "node_5": { + "id": "node_5", + "type": "ServiceActivity", + "targets": {"l802b3f8e60e39518915f85d4c943a18": "node_6"}, + }, + "node_6": { + "id": "node_6", + "type": "ExclusiveGateway", + "targets": {"l8ff0721ec8c3745b6f2183a7006d2c6": "node_7", "l5df5ee5497f3616aec4347c0e5913b8": "node_5"}, + }, + "node_7": {"id": "node_7", "type": "EmptyEndEvent", "targets": {}}, + } + + rollback_graph = RollbackGraphHandler(node_map=node_map, start_id="node_5", target_id="node_1") + graph, other_nodes = rollback_graph.build_rollback_graph() + self.assertListEqual(other_nodes, ["node_3"]) + 
self.assertListEqual(graph.as_dict()["nodes"], ["node_5", "node_1", "END", "START", "node_2", "node_4"]) + self.assertListEqual( + graph.as_dict()["flows"], + [["node_1", "END"], ["START", "node_5"], ["node_2", "node_1"], ["node_4", "node_2"], ["node_5", "node_4"]], + ) + self.assertListEqual(list(graph.next("START")), ["node_5"]) diff --git a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py index e52b2982..9fc47f6b 100644 --- a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py +++ b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py @@ -1,80 +1,87 @@ -# -*- coding: utf-8 -*- -""" -Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community -Edition) available. -Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. -Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. -You may obtain a copy of the License at -http://opensource.org/licenses/MIT -Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -specific language governing permissions and limitations under the License. -""" +# # -*- coding: utf-8 -*- +# """ +# Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +# Edition) available. +# Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +# Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://opensource.org/licenses/MIT +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# """ import json import mock from django.test import TestCase -from django.utils import timezone from mock.mock import MagicMock from pipeline.contrib.rollback import api -from pipeline.contrib.rollback.handler import RollBackHandler +from pipeline.contrib.rollback.models import ( + RollbackPlan, + RollbackSnapshot, + RollbackToken, +) from pipeline.core.constants import PE from pipeline.eri.models import Node, Process, State from bamboo_engine import states from bamboo_engine.utils.string import unique_id -forced_fail_activity_mock = MagicMock() -forced_fail_activity_mock.result = True +token_rollback = MagicMock() +token_rollback.apply_async = MagicMock(return_value=True) class TestRollBackBase(TestCase): - def setUp(self) -> None: - self.started_time = timezone.now() - self.archived_time = timezone.now() - - @mock.patch("bamboo_engine.api.forced_fail_activity", MagicMock(return_value=forced_fail_activity_mock)) - @mock.patch("pipeline.eri.runtime.BambooDjangoRuntime.execute", MagicMock()) + @mock.patch("pipeline.contrib.rollback.handler.token_rollback", MagicMock(return_value=token_rollback)) def test_rollback(self): pipeline_id = unique_id("n") - State.objects.create( - node_id=pipeline_id, - root_id=pipeline_id, - parent_id=pipeline_id, - name=states.FINISHED, - version=unique_id("v"), - started_time=self.started_time, - archived_time=self.archived_time, + pipeline_state = State.objects.create( + node_id=pipeline_id, root_id=pipeline_id, parent_id=pipeline_id, name=states.FAILED, version=unique_id("v") ) - node_id_1 = unique_id("n") - node_id_2 = unique_id("n") - 
State.objects.create( - node_id=node_id_1, + start_node_id = unique_id("n") + start_state = State.objects.create( + node_id=start_node_id, root_id=pipeline_id, parent_id=pipeline_id, name=states.RUNNING, version=unique_id("v"), - started_time=self.started_time, - archived_time=self.archived_time, ) + target_node_id = unique_id("n") State.objects.create( - node_id=node_id_2, + node_id=target_node_id, root_id=pipeline_id, parent_id=pipeline_id, - name=states.RUNNING, + name=states.FINISHED, version=unique_id("v"), - started_time=self.started_time, - archived_time=self.archived_time, ) - node_id_1_detail = { - "id": "n0be4eaa13413f9184863776255312f1", - "type": PE.ParallelGateway, - "targets": {"l7895e18cd7c33b198d56534ca332227": node_id_2}, - "root_pipeline_id": "n3369d7ce884357f987af1631bda69cb", - "parent_pipeline_id": "n3369d7ce884357f987af1631bda69cb", + result = api.rollback(pipeline_id, start_node_id, target_node_id) + self.assertFalse(result.result) + message = ( + "rollback failed: the task of non-running state is not allowed to roll back," + " pipeline_id={}, state=FAILED".format(pipeline_id) + ) + self.assertEqual(str(result.exc), message) + pipeline_state.name = states.RUNNING + pipeline_state.save() + + token = RollbackToken.objects.create( + root_pipeline_id=pipeline_id, token=json.dumps({target_node_id: "xxx", start_node_id: "xsx"}) + ) + + result = api.rollback(pipeline_id, start_node_id, target_node_id) + self.assertFalse(result.result) + message = "rollback failed: node not exist, node={}".format(start_node_id) + self.assertEqual(str(result.exc), message) + + target_node_detail = { + "id": target_node_id, + "type": PE.ServiceActivity, + "targets": {target_node_id: start_node_id}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, "can_skip": True, "code": "bk_display", "version": "v1.0", @@ -82,14 +89,12 @@ def test_rollback(self): "can_retry": True, } - Node.objects.create(node_id=node_id_1, detail=json.dumps(node_id_1_detail)) 
- - node_id_2_detail = { - "id": "n0be4eaa13413f9184863776255312f1", - "type": PE.ParallelGateway, - "targets": {"l7895e18cd7c33b198d56534ca332227": unique_id("n")}, - "root_pipeline_id": "n3369d7ce884357f987af1631bda69cb", - "parent_pipeline_id": "n3369d7ce884357f987af1631bda69cb", + start_node_detail = { + "id": start_node_id, + "type": PE.ServiceActivity, + "targets": {}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, "can_skip": True, "code": "bk_display", "version": "v1.0", @@ -97,151 +102,163 @@ def test_rollback(self): "can_retry": True, } - Node.objects.create(node_id=node_id_2, detail=json.dumps(node_id_2_detail)) + Node.objects.create(node_id=target_node_id, detail=json.dumps(target_node_detail)) + Node.objects.create(node_id=start_node_id, detail=json.dumps(start_node_detail)) - # pipeline_id 非running的情况下会异常 - message = "rollback failed: the task of non-running state is not allowed to roll back, pipeline_id={}".format( - pipeline_id - ) - result = api.rollback(pipeline_id, pipeline_id) + result = api.rollback(pipeline_id, start_node_id, target_node_id) self.assertFalse(result.result) + message = "rollback failed: only allows rollback to finished node, allowed states ['FINISHED', 'FAILED']" self.assertEqual(str(result.exc), message) - State.objects.filter(node_id=pipeline_id).update(name=states.RUNNING) - # pipeline_id 非running的情况下会异常 - message = "rollback failed: only allows rollback to ServiceActivity type nodes" - result = api.rollback(pipeline_id, node_id_1) + start_state.name = states.FINISHED + start_state.save() + + result = api.rollback(pipeline_id, start_node_id, target_node_id) self.assertFalse(result.result) + message = "rollback failed: start node token must equal target node, pipeline_id={}".format(pipeline_id) self.assertEqual(str(result.exc), message) - node_id_1_detail["type"] = PE.ServiceActivity - Node.objects.filter(node_id=node_id_1).update(detail=json.dumps(node_id_1_detail)) + token.token = 
json.dumps({target_node_id: "xxx", start_node_id: "xxx"}) + token.save() + + result = api.rollback(pipeline_id, start_node_id, target_node_id) + self.assertTrue(result.result) + rollback_snapshot = RollbackSnapshot.objects.get(root_pipeline_id=pipeline_id) + self.assertEqual(json.loads(rollback_snapshot.skip_rollback_nodes), []) + self.assertEqual(len(json.loads(rollback_snapshot.graph)["nodes"]), 4) + + pipeline_state.refresh_from_db() + self.assertEqual(pipeline_state.name, states.ROLLING_BACK) - message = "rollback failed: only allows rollback to finished node" - result = api.rollback(pipeline_id, node_id_1) + def test_reserve_rollback(self): + pipeline_id = unique_id("n") + State.objects.create( + node_id=pipeline_id, + root_id=pipeline_id, + parent_id=pipeline_id, + name=states.RUNNING, + version=unique_id("v") + # noqa + ) + + start_node_id = unique_id("n") + start_state = State.objects.create( + node_id=start_node_id, + root_id=pipeline_id, + parent_id=pipeline_id, + name=states.FINISHED, + version=unique_id("v"), + ) + + target_node_id = unique_id("n") + State.objects.create( + node_id=target_node_id, + root_id=pipeline_id, + parent_id=pipeline_id, + name=states.FINISHED, + version=unique_id("v"), + ) + + Process.objects.create(root_pipeline_id=pipeline_id, current_node_id=start_node_id, priority=1) + result = api.reserve_rollback(pipeline_id, start_node_id, target_node_id) self.assertFalse(result.result) + message = "rollback failed: pipeline token not exist, pipeline_id={}".format(pipeline_id) self.assertEqual(str(result.exc), message) - State.objects.filter(node_id=node_id_1).update(name=states.FINISHED) - - p = Process.objects.create( - root_pipeline_id=pipeline_id, - parent_id=-1, - current_node_id=node_id_2, - pipeline_stack=json.dumps([pipeline_id]), - priority=1, + + RollbackToken.objects.create( + root_pipeline_id=pipeline_id, token=json.dumps({target_node_id: "xxx", start_node_id: "xxx"}) ) - result = api.rollback(pipeline_id, node_id_1) - 
self.assertTrue(result.result) + result = api.reserve_rollback(pipeline_id, start_node_id, target_node_id) + self.assertFalse(result.result) + message = "rollback failed: node not exist, node={}".format(target_node_id) + self.assertEqual(str(result.exc), message) - p.refresh_from_db() - self.assertEqual(p.current_node_id, node_id_1) - # 验证Node2 是不是被删除了 - self.assertFalse(State.objects.filter(node_id=node_id_2).exists()) - - state = State.objects.get(node_id=node_id_1) - self.assertEqual(state.name, states.READY) - - def test_compute_validate_nodes(self): - node_map = { - "node_1": { - "id": "node_1", - "type": "EmptyStartEvent", - "targets": {"n": "node_2"}, - }, - "node_2": { - "id": "node_2", - "type": "ServiceActivity", - "targets": {"n": "node_3"}, - }, - "node_3": { - "id": "node_3", - "type": "ServiceActivity", - "targets": {"n": "node_4"}, - }, - "node_4": { - "id": "node_4", - "type": "ParallelGateway", - "targets": {"n": "node_5", "n1": "node_6"}, - "converge_gateway_id": "node_7", - }, - "node_5": { - "id": "node_5", - "type": "ServiceActivity", - "targets": {"n": "node_7"}, - }, - "node_6": { - "id": "node_6", - "type": "ServiceActivity", - "targets": {"n": "node_7"}, - }, - "node_7": { - "id": "node_7", - "type": "ConvergeGateway", - "targets": {"n": "node_8"}, - }, - "node_8": { - "id": "node_8", - "type": "ExclusiveGateway", - "targets": {"n1": "node_13", "n2": "node_9", "n3": "node_3"}, - }, - "node_9": { - "id": "node_9", - "type": "ServiceActivity", - "targets": {"n": "node_10"}, - }, - "node_10": { - "id": "node_10", - "type": "ExclusiveGateway", - "targets": {"n": "node_11", "n2": "node_12"}, - }, + target_node_detail = { + "id": target_node_id, + "type": PE.ServiceActivity, + "targets": {target_node_id: start_node_id}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, + "can_skip": True, + "code": "bk_display", + "version": "v1.0", + "error_ignorable": True, + "can_retry": True, } - node_id = "node_1" - nodes = 
RollBackHandler("p", node_map)._compute_validate_nodes(node_id, node_map) - self.assertListEqual(nodes, ["node_2", "node_3", "node_9"]) + start_node_detail = { + "id": start_node_id, + "type": PE.ServiceActivity, + "targets": {}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, + "can_skip": True, + "code": "bk_display", + "version": "v1.0", + "error_ignorable": True, + "can_retry": True, + } - def test_get_allowed_rollback_node_id_list(self): - pipeline_id = unique_id("n") + Node.objects.create(node_id=target_node_id, detail=json.dumps(target_node_detail)) + Node.objects.create(node_id=start_node_id, detail=json.dumps(start_node_detail)) + result = api.reserve_rollback(pipeline_id, start_node_id, target_node_id) + self.assertFalse(result.result) + message = "reserve rollback failed, the node state is not Running, current state=FINISHED, node_id={}".format( + # noqa + start_node_id + ) + self.assertEqual(str(result.exc), message) + + start_state.name = states.RUNNING + start_state.save() + + result = api.reserve_rollback(pipeline_id, start_node_id, target_node_id) + self.assertTrue(result.result) + + plan = RollbackPlan.objects.get(root_pipeline_id=pipeline_id) + + self.assertEqual(plan.start_node_id, start_node_id) + self.assertEqual(plan.target_node_id, target_node_id) + + result = api.cancel_reserved_rollback(pipeline_id, start_node_id, target_node_id) + self.assertTrue(result.result) + + def test_allowed_rollback_node_id_list(self): + pipeline_id = unique_id("n") State.objects.create( node_id=pipeline_id, root_id=pipeline_id, parent_id=pipeline_id, name=states.RUNNING, - version=unique_id("v"), - started_time=self.started_time, - archived_time=self.archived_time, + version=unique_id("v") + # noqa ) - - node_id_1 = unique_id("n") - node_id_2 = unique_id("n") + start_node_id = unique_id("n") State.objects.create( - node_id=node_id_1, + node_id=start_node_id, root_id=pipeline_id, parent_id=pipeline_id, name=states.FINISHED, 
version=unique_id("v"), - started_time=self.started_time, - archived_time=self.archived_time, ) + target_node_id = unique_id("n") State.objects.create( - node_id=node_id_2, + node_id=target_node_id, root_id=pipeline_id, parent_id=pipeline_id, - name=states.RUNNING, + name=states.FINISHED, version=unique_id("v"), - started_time=self.started_time, - archived_time=self.archived_time, ) - node_id_1_detail = { - "id": "n0be4eaa13413f9184863776255312f1", + target_node_detail = { + "id": target_node_id, "type": PE.ServiceActivity, - "targets": {"l7895e18cd7c33b198d56534ca332227": node_id_2}, - "root_pipeline_id": "n3369d7ce884357f987af1631bda69cb", - "parent_pipeline_id": "n3369d7ce884357f987af1631bda69cb", + "targets": {target_node_id: start_node_id}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, "can_skip": True, "code": "bk_display", "version": "v1.0", @@ -249,14 +266,12 @@ def test_get_allowed_rollback_node_id_list(self): "can_retry": True, } - Node.objects.create(node_id=node_id_1, detail=json.dumps(node_id_1_detail)) - - node_id_2_detail = { - "id": "n0be4eaa13413f9184863776255312f1", + start_node_detail = { + "id": start_node_id, "type": PE.ServiceActivity, - "targets": {"l7895e18cd7c33b198d56534ca332227": unique_id("n")}, - "root_pipeline_id": "n3369d7ce884357f987af1631bda69cb", - "parent_pipeline_id": "n3369d7ce884357f987af1631bda69cb", + "targets": {}, + "root_pipeline_id": pipeline_id, + "parent_pipeline_id": pipeline_id, "can_skip": True, "code": "bk_display", "version": "v1.0", @@ -264,8 +279,62 @@ def test_get_allowed_rollback_node_id_list(self): "can_retry": True, } - Node.objects.create(node_id=node_id_2, detail=json.dumps(node_id_2_detail)) + Node.objects.create(node_id=target_node_id, detail=json.dumps(target_node_detail)) + Node.objects.create(node_id=start_node_id, detail=json.dumps(start_node_detail)) + + RollbackToken.objects.create( + root_pipeline_id=pipeline_id, token=json.dumps({target_node_id: "xxx", start_node_id: 
"xxx"}) + ) + + result = api.get_allowed_rollback_node_id_list(pipeline_id, start_node_id) + self.assertTrue(result.result) + self.assertEqual(len(result.data), 1) + self.assertEqual(result.data[0], target_node_id) + + @mock.patch("pipeline.contrib.rollback.handler.token_rollback", MagicMock(return_value=token_rollback)) + def test_retry_rollback_failed_node(self): + root_pipeline_id = unique_id("n") + pipeline_state = State.objects.create( + node_id=root_pipeline_id, + root_id=root_pipeline_id, + parent_id=root_pipeline_id, + name=states.RUNNING, + version=unique_id("v"), + ) + node_id = unique_id("n") + node_state = State.objects.create( + node_id=node_id, + root_id=root_pipeline_id, + parent_id=root_pipeline_id, + name=states.FINISHED, + version=unique_id("v"), + ) + result = api.retry_rollback_failed_node(root_pipeline_id, node_id) + self.assertFalse(result.result) + message = "rollback failed: only retry the failed pipeline, current_status=RUNNING" + self.assertEqual(str(result.exc), message) + + pipeline_state.name = states.ROLL_BACK_FAILED + pipeline_state.save() + + result = api.retry_rollback_failed_node(root_pipeline_id, node_id) + self.assertFalse(result.result) + message = "rollback failed: only retry the failed node, current_status=FINISHED" + self.assertEqual(str(result.exc), message) + + node_state.name = states.ROLL_BACK_FAILED + node_state.save() - result = api.get_allowed_rollback_node_id_list(pipeline_id) - self.assertEqual(result.result, True) - self.assertEqual(result.data, [node_id_1]) + result = api.retry_rollback_failed_node(root_pipeline_id, node_id) + self.assertFalse(result.result) + message = "rollback failed: the rollback snapshot is not exists, please check" + self.assertEqual(str(result.exc), message) + + RollbackSnapshot.objects.create( + root_pipeline_id=root_pipeline_id, + graph=json.dumps({}), + node_access_record=json.dumps({}), + ) + + result = api.retry_rollback_failed_node(root_pipeline_id, node_id) + 
self.assertTrue(result.result) diff --git a/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/control/test_rollback_node.py b/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/control/test_rollback_node.py deleted file mode 100644 index 992fdc99..00000000 --- a/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/control/test_rollback_node.py +++ /dev/null @@ -1,149 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community -Edition) available. -Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. -Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. -You may obtain a copy of the License at -http://opensource.org/licenses/MIT -Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -specific language governing permissions and limitations under the License. 
-""" -import time - -from bamboo_engine import Engine - -from bamboo_engine.builder import * # noqa -from pipeline.engine import states -from pipeline.eri.models import Process, State -from pipeline.eri.runtime import BambooDjangoRuntime - -from pipeline.contrib.rollback import api - - -def test_rollback_sample_pipeline(): - start = EmptyStartEvent() - act_0 = ServiceActivity(component_code="callback_node") - act_1 = ServiceActivity(component_code="callback_node") - end = EmptyEndEvent() - start.extend(act_0).extend(act_1).extend(end) - pipeline = build_tree(start) - runtime = BambooDjangoRuntime() - engine = Engine(runtime) - engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}) - time.sleep(3) - - state = runtime.get_state(act_0.id) - engine.callback(act_0.id, state.version, {"bit": 1}) - pipeline_id = pipeline["id"] - time.sleep(3) - assert State.objects.filter(node_id=act_0.id, name=states.FINISHED).exists() - api.rollback(root_pipeline_id=pipeline_id, node_id=act_0.id) - time.sleep(3) - process = Process.objects.get(root_pipeline_id=pipeline_id, parent_id=-1) - # 此时最新进程被指向了最新的node_id - assert process.current_node_id == act_0.id - # 此时第一个节点重回RUNNING状态 - assert State.objects.filter(node_id=act_0.id, name=states.RUNNING).exists() - - -def test_rollback_pipeline_with_exclusive_gateway(): - """ - -> act_0 - 开始 -> 分支网关 -> 汇聚网关 -> act_2 -> 结束 - -> act_1 - - 当执行到 act_2 时,此时回退到act_0 应当能够再次回到 act_2 - """ - - runtime = BambooDjangoRuntime() - - start = EmptyStartEvent() - eg = ExclusiveGateway( - conditions={0: "True == True", 1: "True == False"} - ) - act_0 = ServiceActivity(component_code="callback_node") - act_1 = ServiceActivity(component_code="callback_node") - act_2 = ServiceActivity(component_code="callback_node") - - cg = ConvergeGateway() - end = EmptyEndEvent() - - start.extend(eg).connect(act_0, act_1).converge(cg).extend(act_2).extend(end) - - pipeline = build_tree(start) - engine = Engine(BambooDjangoRuntime()) - 
engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}) - time.sleep(3) - - state = runtime.get_state(act_0.id) - engine.callback(act_0.id, state.version, {"bit": 1}) - - time.sleep(3) - pipeline_id = pipeline["id"] - - process = Process.objects.get(root_pipeline_id=pipeline_id, parent_id=-1) - - # 此时执行到了act_2 - assert process.current_node_id == act_2.id - - api.rollback(pipeline_id, act_0.id) - time.sleep(3) - - process.refresh_from_db() - # 此时最新进程被指向了最新的node_id - assert process.current_node_id == act_0.id - # 此时第一个节点重回RUNNING状态 - assert State.objects.filter(node_id=act_0.id, name=states.RUNNING).exists() - - -def test_rollback_pipeline_with_conditional_parallel(): - """ - -> act_1 - 开始 -> act_0 并行网关 -> 汇聚网关 -> act_3 -> 结束 - -> act_2 - - 当执行到 act_2 时,此时回退到act_0 应当能够再次回到 act_2 - """ - - runtime = BambooDjangoRuntime() - - start = EmptyStartEvent() - act_0 = ServiceActivity(component_code="debug_node") - pg = ParallelGateway() - - act_1 = ServiceActivity(component_code="debug_node") - act_2 = ServiceActivity(component_code="debug_node") - cg = ConvergeGateway() - act_3 = ServiceActivity(component_code="callback_node") - end = EmptyEndEvent() - - start.extend(act_0).extend(pg).connect(act_1, act_2).converge(cg).extend(act_3).extend(end) - - pipeline = build_tree(start) - engine = Engine(BambooDjangoRuntime()) - engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}) - - time.sleep(3) - pipeline_id = pipeline["id"] - - process = Process.objects.get(root_pipeline_id=pipeline_id, parent_id=-1) - # 此时执行到了act_2 - assert process.current_node_id == act_3.id - - # 此时回到开始节点 - api.rollback(pipeline_id, act_0.id) - time.sleep(3) - - process.refresh_from_db() - # 此时第二次执行到act_2 - assert process.current_node_id == act_3.id - # 此时第一个节点重回RUNNING状态 - assert State.objects.filter(node_id=act_3.id, name=states.RUNNING).exists() - # callback act_2 此时流程结束 - state = runtime.get_state(act_3.id) - engine.callback(act_3.id, state.version, {"bit": 1}) - time.sleep(3) - - assert 
State.objects.filter(node_id=pipeline_id, name=states.FINISHED).exists() diff --git a/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py b/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py index ed03f568..3b092716 100755 --- a/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py +++ b/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py @@ -13,13 +13,13 @@ # Build paths inside the project like this: os.path.join(BASE_DIR, ...) import os +from celery import Celery from pipeline.celery.queues import ScalableQueues # noqa from pipeline.celery.settings import * # noqa from pipeline.eri.celery import queues, step -from celery import Celery -CELERY_QUEUES.extend(queues.CELERY_QUEUES) -CELERY_QUEUES.extend(queues.QueueResolver("api").queues()) +CELERY_QUEUES.extend(queues.CELERY_QUEUES) # noqa +CELERY_QUEUES.extend(queues.QueueResolver("api").queues()) # noqa step.PromServerStep.port = 8002 @@ -55,6 +55,7 @@ "pipeline", "pipeline.log", "pipeline.engine", + "pipeline.contrib.rollback", "pipeline.component_framework", "pipeline.variable_framework", "pipeline.django_signal_valve", @@ -161,7 +162,7 @@ BROKER_URL = "amqp://guest:guest@localhost:5672//" -# BROKER_URL = 'redis://localhost:6379/0' +# BROKER_URL = "redis://localhost:6379/0" PIPELINE_DATA_BACKEND = "pipeline.engine.core.data.redis_backend.RedisDataBackend" diff --git a/tests/builder/__init__.py b/tests/builder/__init__.py new file mode 100644 index 00000000..26a6d1c2 --- /dev/null +++ b/tests/builder/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/tests/builder/test_token.py b/tests/builder/test_token.py new file mode 100644 index 00000000..52a1325c --- /dev/null +++ b/tests/builder/test_token.py @@ -0,0 +1,205 @@ +# -*- coding: utf-8 -*- + +from bamboo_engine.builder import ( + ConditionalParallelGateway, + ConvergeGateway, + EmptyEndEvent, + EmptyStartEvent, + ExclusiveGateway, + ParallelGateway, + ServiceActivity, + SubProcess, + build_tree, +) +from bamboo_engine.builder.builder import generate_pipeline_token + + +def get_node_token(tree, name: str, node_map): + # 根据 name 获取对应的token + if name.startswith("act"): + for activity_id, value in tree["activities"].items(): + if value["name"] == name: + return node_map[activity_id] + + if ( + name.startswith("ParallelGateway") + or name.startswith("ExclusiveGateway") + or name.startswith("ConvergeGateway") + or name.startswith("ConditionalParallelGateway") + ): + for gateway_id, value in tree["gateways"].items(): + if value["name"] == name: + return node_map[gateway_id] + + if name.startswith("start_event"): + return node_map[tree["start_event"]["id"]] + + if name.startswith("end_event"): + return node_map[tree["end_event"]["id"]] + + +def test_inject_pipeline_token_normal(): + start = EmptyStartEvent() + act = ServiceActivity(name="act_1", component_code="example_component") + end = EmptyEndEvent() + + start.extend(act).extend(end) + pipeline = build_tree(start) + + node_token_map = generate_pipeline_token(pipeline) + + assert get_node_token(pipeline, "act_1", node_token_map) == get_node_token(pipeline, "start_event", node_token_map) + assert 
get_node_token(pipeline, "start_event", node_token_map) == get_node_token( + pipeline, "end_event", node_token_map + ) + + +def test_inject_pipeline_token_parallel_gateway(): + start = EmptyStartEvent() + pg = ParallelGateway(name="ParallelGateway") + act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1") + act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2") + act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3") + cg = ConvergeGateway(name="ConvergeGateway") + end = EmptyEndEvent() + + start.extend(pg).connect(act_1, act_2, act_3).to(pg).converge(cg).extend(end) + + pipeline = build_tree(start) + node_token_map = generate_pipeline_token(pipeline) + + assert ( + get_node_token(pipeline, "start_event", node_token_map) + == get_node_token(pipeline, "ParallelGateway", node_token_map) + == get_node_token(pipeline, "ConvergeGateway", node_token_map) + == get_node_token(pipeline, "end_event", node_token_map) + != get_node_token(pipeline, "act_1", node_token_map) + ) + + assert ( + get_node_token(pipeline, "act_1", node_token_map) + != get_node_token(pipeline, "act_2", node_token_map) + != get_node_token(pipeline, "act_3", node_token_map) + ) + + +def test_inject_pipeline_token_exclusive_gateway(): + start = EmptyStartEvent() + act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1") + eg = ExclusiveGateway(conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0"}, name="ExclusiveGateway") + act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2") + act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3") + end = EmptyEndEvent() + start.extend(act_1).extend(eg).connect(act_2, act_3).to(eg).converge(end) + + pipeline = build_tree(start) + node_token_map = generate_pipeline_token(pipeline) + + assert ( + get_node_token(pipeline, "start_event", node_token_map) + == get_node_token(pipeline, "act_1", node_token_map) + == 
get_node_token(pipeline, "ExclusiveGateway", node_token_map) + == get_node_token(pipeline, "end_event", node_token_map) + != get_node_token(pipeline, "act_2", node_token_map) + ) + + assert get_node_token(pipeline, "act_2", node_token_map) == get_node_token(pipeline, "act_3", node_token_map) + + +def test_inject_pipeline_token_conditional_exclusive_gateway(): + start = EmptyStartEvent() + act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1") + cpg = ConditionalParallelGateway( + conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0", 2: "${act_1_output} >= 0"}, + name="ConditionalParallelGateway", + ) + act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2") + act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3") + act_4 = ServiceActivity(component_code="pipe_example_component", name="act_4") + cg = ConvergeGateway(name="ConvergeGateway") + end = EmptyEndEvent() + + start.extend(act_1).extend(cpg).connect(act_2, act_3, act_4).to(cpg).converge(cg).extend(end) + + pipeline = build_tree(start) + node_token_map = generate_pipeline_token(pipeline) + assert ( + get_node_token(pipeline, "start_event", node_token_map) + == get_node_token(pipeline, "act_1", node_token_map) + == get_node_token(pipeline, "ConditionalParallelGateway", node_token_map) + == get_node_token(pipeline, "ConvergeGateway", node_token_map) + == get_node_token(pipeline, "end_event", node_token_map) + != get_node_token(pipeline, "act_3", node_token_map) + ) + + assert ( + get_node_token(pipeline, "act_2", node_token_map) + != get_node_token(pipeline, "act_3", node_token_map) + != get_node_token(pipeline, "act_4", node_token_map) + ) + + +def test_inject_pipeline_token_subprocess(): + def sub_process(name): + subproc_start = EmptyStartEvent() + subproc_act = ServiceActivity(component_code="pipe_example_component", name="act_2") + subproc_end = EmptyEndEvent() + subproc_start.extend(subproc_act).extend(subproc_end) + return 
SubProcess(start=subproc_start, name=name) + + start = EmptyStartEvent() + act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1") + eg = ExclusiveGateway(conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0"}, name="ExclusiveGateway") + + subproc_1 = sub_process(name="act_3") + subproc_2 = sub_process(name="act_4") + end = EmptyEndEvent() + + start.extend(act_1).extend(eg).connect(subproc_1, subproc_2).converge(end) + + pipeline = build_tree(start) + node_token_map = generate_pipeline_token(pipeline) + + assert ( + get_node_token(pipeline, "start_event", node_token_map) + == get_node_token(pipeline, "end_event", node_token_map) + == get_node_token(pipeline, "ExclusiveGateway", node_token_map) + == get_node_token(pipeline, "end_event", node_token_map) + != get_node_token(pipeline, "act_3", node_token_map) + ) + + assert get_node_token(pipeline, "act_3", node_token_map) == get_node_token(pipeline, "act_4", node_token_map) + + subproc_pipeline = pipeline["activities"][subproc_1.id]["pipeline"] + + assert ( + get_node_token(subproc_pipeline, "start_event", node_token_map) + == get_node_token(subproc_pipeline, "end_event", node_token_map) + == get_node_token(subproc_pipeline, "act_2", node_token_map) + ) + + +def test_inject_pipeline_token_with_cycle(): + start = EmptyStartEvent() + act_1 = ServiceActivity(component_code="pipe_example_component", name="act_1") + eg = ExclusiveGateway( + conditions={0: "${act_1_output} < 0", 1: "${act_1_output} >= 0", 2: "${act_1_output} >= 0"}, + name="ExclusiveGateway", + ) + act_2 = ServiceActivity(component_code="pipe_example_component", name="act_2") + act_3 = ServiceActivity(component_code="pipe_example_component", name="act_3") + end = EmptyEndEvent() + start.extend(act_1).extend(eg).connect(act_2, act_3, act_1).to(eg).converge(end) + + pipeline = build_tree(start) + node_token_map = generate_pipeline_token(pipeline) + + assert ( + get_node_token(pipeline, "start_event", node_token_map) + == 
get_node_token(pipeline, "act_1", node_token_map) + == get_node_token(pipeline, "ExclusiveGateway", node_token_map) + == get_node_token(pipeline, "end_event", node_token_map) + != get_node_token(pipeline, "act_2", node_token_map) + ) + + assert get_node_token(pipeline, "act_2", node_token_map) == get_node_token(pipeline, "act_3", node_token_map) diff --git a/tests/engine/test_engine_execute.py b/tests/engine/test_engine_execute.py index 59cab573..362cfd2a 100644 --- a/tests/engine/test_engine_execute.py +++ b/tests/engine/test_engine_execute.py @@ -96,6 +96,7 @@ def node(node_id): parent_pipeline_id="root", code="", version="", + reserve_rollback=True, error_ignorable=False, ) @@ -380,7 +381,6 @@ def test_execute__rerun_and_have_to_sleep(node_id, pi, interrupter, node, state) def test_execute__have_to_sleep(node_id, pi, interrupter, node, state): - runtime = MagicMock() runtime.get_process_info = MagicMock(return_value=pi) runtime.batch_get_state_name = MagicMock(return_value={"root": states.RUNNING}) @@ -780,7 +780,6 @@ def test_execute__has_dispatch_processes(node_id, pi, interrupter, node, state): def test_execute__have_to_die(node_id, pi, interrupter, node, state): - runtime = MagicMock() runtime.get_process_info = MagicMock(return_value=pi) runtime.batch_get_state_name = MagicMock(return_value={"root": states.RUNNING}) @@ -851,6 +850,81 @@ def test_execute__have_to_die(node_id, pi, interrupter, node, state): assert interrupter.check_point.execute_result is not None +def test_execute__has_reversed_rollback_plan(node_id, pi, interrupter, node, state): + runtime = MagicMock() + runtime.get_process_info = MagicMock(return_value=pi) + runtime.batch_get_state_name = MagicMock(return_value={"root": states.RUNNING}) + runtime.get_node = MagicMock(return_value=node) + runtime.get_config = MagicMock(return_value=True) + runtime.start_rollback = MagicMock(return_value=True) + runtime.get_state_or_none = MagicMock(return_value=None) + runtime.get_state = 
MagicMock(return_value=state) + runtime.set_state = MagicMock(return_value=state.version) + handler = MagicMock() + handler.execute = MagicMock( + return_value=ExecuteResult( + should_sleep=False, + schedule_ready=False, + schedule_type=None, + schedule_after=-1, + dispatch_processes=[], + next_node_id=None, + should_die=True, + ) + ) + + get_handler = MagicMock(return_value=handler) + + engine = Engine(runtime=runtime) + + with mock.patch( + "bamboo_engine.engine.HandlerFactory.get_handler", + get_handler, + ): + engine.execute(pi.process_id, node_id, pi.root_pipeline_id, pi.top_pipeline_id, interrupter, {}) + + runtime.beat.assert_called_once_with(pi.process_id) + runtime.get_node.assert_called_once_with(node_id) + runtime.get_state_or_none.assert_called_once_with(node_id) + runtime.node_rerun_limit.assert_not_called() + runtime.set_state.assert_called_once_with( + node_id=node.id, + to_state=states.RUNNING, + version=None, + loop=1, + inner_loop=1, + root_id=pi.root_pipeline_id, + parent_id=pi.top_pipeline_id, + set_started_time=True, + reset_skip=False, + reset_retry=False, + reset_error_ignored=False, + refresh_version=False, + ignore_boring_set=False, + ) + runtime.start_rollback.assert_called_once_with(pi.root_pipeline_id, node_id) + runtime.sleep.assert_not_called() + runtime.set_schedule.assert_not_called() + runtime.schedule.assert_not_called() + runtime.execute.assert_not_called() + runtime.die.assert_called_once_with(pi.process_id) + + get_handler.assert_called_once_with(node, runtime, interrupter) + + handler.execute.assert_called_once_with( + process_info=pi, + loop=state.loop, + inner_loop=state.loop, + version=state.version, + recover_point=interrupter.recover_point, + ) + + assert interrupter.check_point.name == ExecuteKeyPoint.EXECUTE_NODE_DONE + assert interrupter.check_point.state_already_exist is False + assert interrupter.check_point.running_node_version == "v" + assert interrupter.check_point.execute_result is not None + + def 
test_execute__recover_with_state_not_exsit(node_id, pi, interrupter, node, state, recover_point): recover_point.state_already_exist = False recover_point.running_node_version = "set_running_return_version" diff --git a/tests/engine/test_engine_schedule.py b/tests/engine/test_engine_schedule.py index f3cd6ddc..32ca385c 100644 --- a/tests/engine/test_engine_schedule.py +++ b/tests/engine/test_engine_schedule.py @@ -10,7 +10,7 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ - +import copy import mock import pytest @@ -128,7 +128,6 @@ def recover_point(): def test_schedule__lock_get_failed(node_id, schedule_id, state, pi, schedule, interrupter): - runtime = MagicMock() runtime.get_process_info = MagicMock(return_value=pi) runtime.apply_schedule_lock = MagicMock(return_value=False) @@ -470,7 +469,7 @@ def test_schedule__schedule_done(node_id, state, pi, schedule, node, interrupter runtime.get_node.assert_called_once_with(node_id) runtime.get_execution_data.assert_called_once_with(node_id) runtime.set_execution_data.assert_called_once_with(node_id=node.id, data=execution_data) - runtime.get_data_inputs.assert_called_once_with(pi.root_pipeline_id) + runtime.get_data_inputs.assert_has_calls([call("root"), call("root")]) runtime.get_callback_data.assert_not_called() service.hook_dispatch.assert_called_once() handler.schedule.assert_called_once_with( @@ -496,6 +495,71 @@ def test_schedule__schedule_done(node_id, state, pi, schedule, node, interrupter assert interrupter.check_point.lock_released is True +def test_schedule__schedule_done_with_node_reserved_rollback(node_id, state, pi, schedule, node, interrupter): + node = copy.deepcopy(node) + node.reserve_rollback = True + + schedule.type = ScheduleType.POLL + runtime = MagicMock() + runtime.get_process_info = MagicMock(return_value=pi) + runtime.apply_schedule_lock = 
MagicMock(return_value=True) + runtime.get_schedule = MagicMock(return_value=schedule) + runtime.start_rollback = MagicMock(return_value=True) + runtime.get_config = MagicMock(return_value=True) + runtime.get_state = MagicMock(return_value=state) + runtime.get_node = MagicMock(return_value=node) + runtime.node_finish = MagicMock() + handler = MagicMock() + handler.schedule = MagicMock( + return_value=ScheduleResult( + has_next_schedule=False, + schedule_after=-1, + schedule_done=True, + next_node_id="nid2", + ) + ) + get_handler = MagicMock(return_value=handler) + + engine = Engine(runtime=runtime) + + with mock.patch( + "bamboo_engine.engine.HandlerFactory.get_handler", + get_handler, + ): + engine.schedule(pi.process_id, node_id, schedule.id, interrupter, headers={}) + + runtime.beat.assert_called_once_with(pi.process_id) + runtime.get_process_info.assert_called_once_with(pi.process_id) + runtime.apply_schedule_lock.assert_called_once_with(schedule.id) + runtime.schedule.assert_not_called() + + runtime.start_rollback.assert_called_once_with(pi.root_pipeline_id, node_id) + + runtime.get_schedule.assert_called_once_with(schedule.id) + runtime.node_finish.assert_called_once_with(pi.root_pipeline_id, node.id) + + runtime.get_state.assert_has_calls([call(node_id), call(node_id)]) + runtime.get_node.assert_called_once_with(node_id) + runtime.get_callback_data.assert_not_called() + handler.schedule.assert_called_once_with( + process_info=pi, + loop=state.loop, + inner_loop=state.inner_loop, + schedule=schedule, + callback_data=None, + recover_point=interrupter.recover_point, + ) + runtime.set_next_schedule.assert_not_called() + runtime.finish_schedule.assert_called_once_with(schedule.id) + + assert interrupter.check_point.name == ScheduleKeyPoint.RELEASE_LOCK_DONE + assert interrupter.check_point.version_mismatch is False + assert interrupter.check_point.node_not_running is False + assert interrupter.check_point.lock_get is True + assert 
interrupter.check_point.schedule_result is not None + assert interrupter.check_point.lock_released is True + + def test_schedule__recover_version_mismatch(node_id, pi, state, schedule, interrupter, recover_point): recover_point.version_mismatch = True interrupter.recover_point = recover_point diff --git a/tests/utils/test_graph.py b/tests/utils/test_graph.py new file mode 100644 index 00000000..599d362a --- /dev/null +++ b/tests/utils/test_graph.py @@ -0,0 +1,206 @@ +# -*- coding: utf-8 -*- +from bamboo_engine.utils.graph import RollbackGraph + + +def test_graph(): + graph1 = RollbackGraph([1, 2, 3, 4], [[1, 2], [2, 3], [3, 4]]) + assert not graph1.has_cycle() + assert graph1.get_cycle() == [] + graph2 = RollbackGraph([1, 2, 3, 4], [[1, 2], [2, 3], [3, 4], [4, 1]]) + assert graph2.has_cycle() + assert graph2.get_cycle() == [1, 2, 3, 4, 1] + graph3 = RollbackGraph([1, 2, 3, 4], [[1, 2], [2, 3], [3, 4], [4, 2]]) + assert graph3.has_cycle() + assert graph3.get_cycle() == [2, 3, 4, 2] + graph4 = RollbackGraph( + [ + "n20c4a0601193f268bfa168f1192eacd", + "nef42d10350b3961b53df7af67e16d9b", + "n0ada7b4abe63771a43052eaf188dc4b", + "n0cd3b95c714388bacdf1a486ab432fc", + "n1430047af8537f88710c4bbf3cbfb0f", + "n383748fe27434d582f0ca17af9d968a", + "n51426abd4be3a4691c80a73c3f93b3c", + "n854753a77933562ae72ec87c365f23d", + "n89f083892a731d7b9d7edb0f372006d", + "n8d4568db0ad364692b0387e86a2f1e0", + "n8daedbb02273a0fbc94cc118c90649f", + "n90b7ef55fe839b181879e036b4f8ffe", + "n99817348b4a36a6931854c93eed8c5f", + "na02956eba6f3a36ab9b0af2f2350213", + "nc3d0d49adf530bbaffe53630c184c0a", + "nca50848d1aa340f8c2b4776ce81868d", + "ncab9a48e79d357195dcee68dad3a31f", + "ncb4e013a6a8348bab087cc8500a3876", + "ne1f86f902a23e7fa4a67192e8b38a05", + "ne26def77df1385caa206c64e7e3ea53", + "nf3ebee137c53da28091ad7d140ce00c", + "nfc1dcdd7476393b9a81a988c113e1cf", + "n0197f8f210b3a1b8a7fc2f90e94744e", + "n01fb40259ad3cf285bb11a8bbbe59f2", + "n03f39191e8a32629145ba6a677ed040", + 
"n03ffc3b9e12316d8be63261cb9dec71", + "n07982b8985139249bca3a046f3a4379", + "n0b9e36e6b633ddb906d2044f658f110", + "n136c4fedebe3eb0ba932495aff6a945", + "n17cdc62c5d43976a413bda8f35634eb", + "n1d48483d8023439ad98d61d156c85fb", + "n26725bdcc0931fab0bc73e7244545ca", + "n2890db24f6c3cd1bbcd6b7d8cf2c045", + "n2ad9caac5b737bd897d4c8844c85f12", + "n2c88d1c1d8b35aebf883cbf259fb6bc", + "n302d25dfc9c369ab13104d5208e7119", + "n31688b7ab44338e9e6cb8dcaf259eef", + "n374443fbdc1313d98ebbe19d535fec2", + "n38c3dd0344a3f86bc7511c454bcdf4c", + "n3934eef90463940a6a9cf4ba2e63b1c", + "n40d5f0ca4bc3dd99c0b264cb186f00f", + "n476ddcb6dd33e2abac43596b08c2bc1", + "n4790f8aa48e335aa712e2af757e180b", + "n48bbfdc912334fc89c4f48c05e8969e", + "n5bef4f4532a382eaf79a0af70b2396b", + "n5ced56bcc863060ac4977755f35a5f5", + "n66a0562670e37648a3e05c243335bff", + "n6dc118cd3f7341d9ef8c97c63e2e9d9", + "n6e9d52e1ea53958a93e5b34022e7037", + "n786694b5ed33295a885b5bcd8c7c1ce", + "n7dccd56c80233469a4609f684ebe457", + "n8492d92ab6a3da48c2b49d6fcb8a479", + "n86a8b1a56f9399f90c4c227594a9d03", + "n8a805c0cd02307bad9f7828880b53dc", + "n8c7e35b0457300d9d6a96a6b1d18329", + "n91fdaed36403d06a07f4afe85e2892c", + "n9335d0718a937f9a39ec5b36d5637fe", + "n9372fb07ad936cba31f3d4e440f395a", + "n9ab96f926d83a93a5d3ebe2888fd343", + "na2a8a54e68033d0a276eb88dbff91c3", + "na493a7b5d5b3cc29f4070a6c4589cb7", + "nadfa68cb2503a39aac6626d6c72484a", + "nae1218ddd2e3448b562bc79dc084401", + "nc012287be793377b975b0230b35d713", + "ncb2e01f0c5336fe82b0e0e496f2612b", + "ncb5843900903b4c8a0a8302474d8c51", + "ncbf4db2c48f3348b2c7081f9e3b363a", + "nd4ee6c3248935ce9239e4bb20a81ab8", + "ndb1cf7af0e2319c9868530d0df8fd93", + "ne36a6858a733430bffa4fec053dc1ab", + "ne7af4a7c3613b3d81fe9e6046425a36", + "ne8035dd8de732758c1cc623f80f2fc8", + "ned91fdb914c35f3a21f320f62d72ffd", + "nf5448b3c66430f4a299d08208d313a6", + "nfaa0756a06f300495fb2e2e45e05ed3", + ], + [ + ["n8d4568db0ad364692b0387e86a2f1e0", "n5bef4f4532a382eaf79a0af70b2396b"], + 
["n8daedbb02273a0fbc94cc118c90649f", "nf5448b3c66430f4a299d08208d313a6"], + ["n01fb40259ad3cf285bb11a8bbbe59f2", "ne1f86f902a23e7fa4a67192e8b38a05"], + ["ncab9a48e79d357195dcee68dad3a31f", "n0197f8f210b3a1b8a7fc2f90e94744e"], + ["na493a7b5d5b3cc29f4070a6c4589cb7", "ne1f86f902a23e7fa4a67192e8b38a05"], + ["n89f083892a731d7b9d7edb0f372006d", "n136c4fedebe3eb0ba932495aff6a945"], + ["n51426abd4be3a4691c80a73c3f93b3c", "n9ab96f926d83a93a5d3ebe2888fd343"], + ["n89f083892a731d7b9d7edb0f372006d", "n8492d92ab6a3da48c2b49d6fcb8a479"], + ["n17cdc62c5d43976a413bda8f35634eb", "n6e9d52e1ea53958a93e5b34022e7037"], + ["n476ddcb6dd33e2abac43596b08c2bc1", "ne1f86f902a23e7fa4a67192e8b38a05"], + ["n6dc118cd3f7341d9ef8c97c63e2e9d9", "nfc1dcdd7476393b9a81a988c113e1cf"], + ["n91fdaed36403d06a07f4afe85e2892c", "ncb4e013a6a8348bab087cc8500a3876"], + ["n8a805c0cd02307bad9f7828880b53dc", "n3934eef90463940a6a9cf4ba2e63b1c"], + ["n2890db24f6c3cd1bbcd6b7d8cf2c045", "n0ada7b4abe63771a43052eaf188dc4b"], + ["ned91fdb914c35f3a21f320f62d72ffd", "n383748fe27434d582f0ca17af9d968a"], + ["n89f083892a731d7b9d7edb0f372006d", "n0b9e36e6b633ddb906d2044f658f110"], + ["nc3d0d49adf530bbaffe53630c184c0a", "na493a7b5d5b3cc29f4070a6c4589cb7"], + ["ncb2e01f0c5336fe82b0e0e496f2612b", "nc012287be793377b975b0230b35d713"], + ["n86a8b1a56f9399f90c4c227594a9d03", "nf3ebee137c53da28091ad7d140ce00c"], + ["nc3d0d49adf530bbaffe53630c184c0a", "nadfa68cb2503a39aac6626d6c72484a"], + ["na02956eba6f3a36ab9b0af2f2350213", "na2a8a54e68033d0a276eb88dbff91c3"], + ["n8daedbb02273a0fbc94cc118c90649f", "n07982b8985139249bca3a046f3a4379"], + ["n136c4fedebe3eb0ba932495aff6a945", "nfc1dcdd7476393b9a81a988c113e1cf"], + ["n9372fb07ad936cba31f3d4e440f395a", "n1430047af8537f88710c4bbf3cbfb0f"], + ["n8d4568db0ad364692b0387e86a2f1e0", "n91fdaed36403d06a07f4afe85e2892c"], + ["n854753a77933562ae72ec87c365f23d", "n40d5f0ca4bc3dd99c0b264cb186f00f"], + ["n854753a77933562ae72ec87c365f23d", "n1d48483d8023439ad98d61d156c85fb"], + 
["n9ab96f926d83a93a5d3ebe2888fd343", "n383748fe27434d582f0ca17af9d968a"], + ["ne36a6858a733430bffa4fec053dc1ab", "n0cd3b95c714388bacdf1a486ab432fc"], + ["n03ffc3b9e12316d8be63261cb9dec71", "nca50848d1aa340f8c2b4776ce81868d"], + ["ne8035dd8de732758c1cc623f80f2fc8", "n0ada7b4abe63771a43052eaf188dc4b"], + ["n51426abd4be3a4691c80a73c3f93b3c", "ned91fdb914c35f3a21f320f62d72ffd"], + ["nd4ee6c3248935ce9239e4bb20a81ab8", "nfaa0756a06f300495fb2e2e45e05ed3"], + ["n5bef4f4532a382eaf79a0af70b2396b", "ncb4e013a6a8348bab087cc8500a3876"], + ["ne26def77df1385caa206c64e7e3ea53", "n786694b5ed33295a885b5bcd8c7c1ce"], + ["n854753a77933562ae72ec87c365f23d", "ne8035dd8de732758c1cc623f80f2fc8"], + ["n374443fbdc1313d98ebbe19d535fec2", "ndb1cf7af0e2319c9868530d0df8fd93"], + ["nfaa0756a06f300495fb2e2e45e05ed3", "n8c7e35b0457300d9d6a96a6b1d18329"], + ["n90b7ef55fe839b181879e036b4f8ffe", "n26725bdcc0931fab0bc73e7244545ca"], + ["n8d4568db0ad364692b0387e86a2f1e0", "ncb2e01f0c5336fe82b0e0e496f2612b"], + ["ncb5843900903b4c8a0a8302474d8c51", "ncb4e013a6a8348bab087cc8500a3876"], + ["nf5448b3c66430f4a299d08208d313a6", "nf3ebee137c53da28091ad7d140ce00c"], + ["n20c4a0601193f268bfa168f1192eacd", "nd4ee6c3248935ce9239e4bb20a81ab8"], + ["nca50848d1aa340f8c2b4776ce81868d", "nc3d0d49adf530bbaffe53630c184c0a"], + ["na02956eba6f3a36ab9b0af2f2350213", "n03ffc3b9e12316d8be63261cb9dec71"], + ["n7dccd56c80233469a4609f684ebe457", "n8daedbb02273a0fbc94cc118c90649f"], + ["n0ada7b4abe63771a43052eaf188dc4b", "na02956eba6f3a36ab9b0af2f2350213"], + ["n9335d0718a937f9a39ec5b36d5637fe", "n99817348b4a36a6931854c93eed8c5f"], + ["n90b7ef55fe839b181879e036b4f8ffe", "n5ced56bcc863060ac4977755f35a5f5"], + ["ncb4e013a6a8348bab087cc8500a3876", "ne26def77df1385caa206c64e7e3ea53"], + ["na02956eba6f3a36ab9b0af2f2350213", "n4790f8aa48e335aa712e2af757e180b"], + ["nc012287be793377b975b0230b35d713", "ncb4e013a6a8348bab087cc8500a3876"], + ["n8d4568db0ad364692b0387e86a2f1e0", "ncb5843900903b4c8a0a8302474d8c51"], + 
["n40d5f0ca4bc3dd99c0b264cb186f00f", "n0ada7b4abe63771a43052eaf188dc4b"], + ["n38c3dd0344a3f86bc7511c454bcdf4c", "n17cdc62c5d43976a413bda8f35634eb"], + ["n6e9d52e1ea53958a93e5b34022e7037", "n90b7ef55fe839b181879e036b4f8ffe"], + ["nf3ebee137c53da28091ad7d140ce00c", "n51426abd4be3a4691c80a73c3f93b3c"], + ["n99817348b4a36a6931854c93eed8c5f", "n89f083892a731d7b9d7edb0f372006d"], + ["n89f083892a731d7b9d7edb0f372006d", "n6dc118cd3f7341d9ef8c97c63e2e9d9"], + ["n8daedbb02273a0fbc94cc118c90649f", "n66a0562670e37648a3e05c243335bff"], + ["nadfa68cb2503a39aac6626d6c72484a", "ne1f86f902a23e7fa4a67192e8b38a05"], + ["n383748fe27434d582f0ca17af9d968a", "nef42d10350b3961b53df7af67e16d9b"], + ["na02956eba6f3a36ab9b0af2f2350213", "n03f39191e8a32629145ba6a677ed040"], + ["nae1218ddd2e3448b562bc79dc084401", "n383748fe27434d582f0ca17af9d968a"], + ["n26725bdcc0931fab0bc73e7244545ca", "n1430047af8537f88710c4bbf3cbfb0f"], + ["n48bbfdc912334fc89c4f48c05e8969e", "n8a805c0cd02307bad9f7828880b53dc"], + ["ne7af4a7c3613b3d81fe9e6046425a36", "ncb4e013a6a8348bab087cc8500a3876"], + ["nfc1dcdd7476393b9a81a988c113e1cf", "n8d4568db0ad364692b0387e86a2f1e0"], + ["n0197f8f210b3a1b8a7fc2f90e94744e", "n99817348b4a36a6931854c93eed8c5f"], + ["n90b7ef55fe839b181879e036b4f8ffe", "n302d25dfc9c369ab13104d5208e7119"], + ["n1d48483d8023439ad98d61d156c85fb", "n0ada7b4abe63771a43052eaf188dc4b"], + ["na2a8a54e68033d0a276eb88dbff91c3", "nca50848d1aa340f8c2b4776ce81868d"], + ["n90b7ef55fe839b181879e036b4f8ffe", "n9372fb07ad936cba31f3d4e440f395a"], + ["ndb1cf7af0e2319c9868530d0df8fd93", "n2ad9caac5b737bd897d4c8844c85f12"], + ["n8492d92ab6a3da48c2b49d6fcb8a479", "nfc1dcdd7476393b9a81a988c113e1cf"], + ["n8d4568db0ad364692b0387e86a2f1e0", "ne7af4a7c3613b3d81fe9e6046425a36"], + ["n302d25dfc9c369ab13104d5208e7119", "n1430047af8537f88710c4bbf3cbfb0f"], + ["n51426abd4be3a4691c80a73c3f93b3c", "n2c88d1c1d8b35aebf883cbf259fb6bc"], + ["n786694b5ed33295a885b5bcd8c7c1ce", "n0cd3b95c714388bacdf1a486ab432fc"], + 
["n854753a77933562ae72ec87c365f23d", "n2890db24f6c3cd1bbcd6b7d8cf2c045"], + ["nc3d0d49adf530bbaffe53630c184c0a", "n476ddcb6dd33e2abac43596b08c2bc1"], + ["n2c88d1c1d8b35aebf883cbf259fb6bc", "n383748fe27434d582f0ca17af9d968a"], + ["n0cd3b95c714388bacdf1a486ab432fc", "n854753a77933562ae72ec87c365f23d"], + ["n51426abd4be3a4691c80a73c3f93b3c", "nae1218ddd2e3448b562bc79dc084401"], + ["nc3d0d49adf530bbaffe53630c184c0a", "n01fb40259ad3cf285bb11a8bbbe59f2"], + ["ne1f86f902a23e7fa4a67192e8b38a05", "n374443fbdc1313d98ebbe19d535fec2"], + ["n0b9e36e6b633ddb906d2044f658f110", "nfc1dcdd7476393b9a81a988c113e1cf"], + ["ncab9a48e79d357195dcee68dad3a31f", "ncbf4db2c48f3348b2c7081f9e3b363a"], + ["n8daedbb02273a0fbc94cc118c90649f", "n86a8b1a56f9399f90c4c227594a9d03"], + ["ncbf4db2c48f3348b2c7081f9e3b363a", "n99817348b4a36a6931854c93eed8c5f"], + ["n1430047af8537f88710c4bbf3cbfb0f", "ncab9a48e79d357195dcee68dad3a31f"], + ["n4790f8aa48e335aa712e2af757e180b", "nca50848d1aa340f8c2b4776ce81868d"], + ["ne26def77df1385caa206c64e7e3ea53", "ne36a6858a733430bffa4fec053dc1ab"], + ["ncab9a48e79d357195dcee68dad3a31f", "n31688b7ab44338e9e6cb8dcaf259eef"], + ["n07982b8985139249bca3a046f3a4379", "nf3ebee137c53da28091ad7d140ce00c"], + ["n66a0562670e37648a3e05c243335bff", "nf3ebee137c53da28091ad7d140ce00c"], + ["n03f39191e8a32629145ba6a677ed040", "nca50848d1aa340f8c2b4776ce81868d"], + ["n8c7e35b0457300d9d6a96a6b1d18329", "n38c3dd0344a3f86bc7511c454bcdf4c"], + ["n5ced56bcc863060ac4977755f35a5f5", "n1430047af8537f88710c4bbf3cbfb0f"], + ["n2ad9caac5b737bd897d4c8844c85f12", "n48bbfdc912334fc89c4f48c05e8969e"], + ["n31688b7ab44338e9e6cb8dcaf259eef", "n99817348b4a36a6931854c93eed8c5f"], + ["n3934eef90463940a6a9cf4ba2e63b1c", "n7dccd56c80233469a4609f684ebe457"], + ["ncab9a48e79d357195dcee68dad3a31f", "n9335d0718a937f9a39ec5b36d5637fe"], + ], + ) + assert not graph4.has_cycle() + assert graph4.get_cycle() == [] + graph5 = RollbackGraph([1, 2, 3, 4, 5], [[1, 2], [2, 3], [2, 4], [4, 5], [5, 2]]) + assert 
graph5.has_cycle() + assert graph5.get_cycle() == [2, 4, 5, 2] + + graph6 = graph5.reverse() + assert graph6.next(2), {1, 5}