From bdd6a094e216a8880d734955c9da2495159c5120 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E6=95=B0?= <33194175+hanshuaikang@users.noreply.github.com> Date: Tue, 24 Oct 2023 14:43:27 +0800 Subject: [PATCH] =?UTF-8?q?bugfix:=20=E4=BF=AE=E5=A4=8D=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E5=BF=AB=E7=85=A7=E6=9F=90=E4=BA=9B=E5=80=BC=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E6=AD=A3=E7=A1=AE=E5=BA=8F=E5=88=97=E5=8C=96=E7=9A=84=E9=97=AE?= =?UTF-8?q?=E9=A2=98=20(#183)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * bugfix: 修复节点快照某些值无法正确序列化的问题 * minor: any 模式下的回滚遵循token校验 * minor: any 模式增加token校验 --- .../pipeline/contrib/rollback/fields.py | 31 +++++++ .../pipeline/contrib/rollback/handler.py | 82 ++++++------------- .../migrations/0002_auto_20231020_1234.py | 29 +++++++ .../pipeline/contrib/rollback/models.py | 7 +- .../pipeline/eri/imp/rollback.py | 7 +- 5 files changed, 94 insertions(+), 62 deletions(-) create mode 100644 runtime/bamboo-pipeline/pipeline/contrib/rollback/fields.py create mode 100644 runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0002_auto_20231020_1234.py diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/fields.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/fields.py new file mode 100644 index 0000000..8f6462c --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/fields.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +import codecs +import json +import pickle + +from django.db.models import TextField + + +class SerializerField(TextField): + """ + 特定的序列化类,用于兼容json和pickle两种序列化数据 + """ + + def to_python(self, value): + try: + return json.loads(value) + except Exception: + return pickle.loads(codecs.decode(value.encode(), "base64")) + + def from_db_value(self, value, expression, connection, context=None): + try: + return json.loads(value) + except Exception: + return pickle.loads(codecs.decode(value.encode(), "base64")) + + def get_prep_value(self, value): + try: + return json.dumps(value) + except TypeError: + return codecs.encode(pickle.dumps(value), "base64").decode() + pass diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py index a639404..691cff2 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py @@ -153,7 +153,32 @@ def __init__(self, root_pipeline_id): RollbackValidator.validate_pipeline(root_pipeline_id) def get_allowed_rollback_node_id_list(self, start_node_id): - return [] + """ + 获取允许回滚的节点范围 + 规则:token 一致的节点允许回滚 + """ + try: + rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id) + except RollbackToken.DoesNotExist: + raise RollBackException( + "rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id) + ) + node_map = self._get_allowed_rollback_node_map() + service_activity_node_list = [ + node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity + ] + + tokens = json.loads(rollback_token.token) + start_token = tokens.get(start_node_id) + if not start_token: + return [] + + nodes = [] + for node_id, token in tokens.items(): + if start_token == token and node_id != start_node_id and node_id in service_activity_node_list: + nodes.append(node_id) + + return nodes def _get_allowed_rollback_node_map(self): # 不需要遍历整颗树,获取到现在已经执行成功和失败节点的所有列表 @@ -232,42 +257,17 @@ def cancel_reserved_rollback(self, start_node_id, target_node_id): class AnyRollbackHandler(BaseRollbackHandler): mode = ANY - def get_allowed_rollback_node_id_list(self, start_node_id): - node_map = self._get_allowed_rollback_node_map() - start_node_state = ( - State.objects.filter(root_id=self.root_pipeline_id) - .exclude(node_id=self.root_pipeline_id) - .order_by("created_time") - .first() - ) - target_node_id = start_node_state.node_id - rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) - graph, _ = rollback_graph.build_rollback_graph() - - return list(set(graph.nodes) - {constants.START_FLAG, constants.END_FLAG, start_node_id}) - def retry_rollback_failed_node(self, node_id, retry_data): """ """ raise RollBackException("rollback failed: when mode is any, not support retry") - def reserve_rollback(self, start_node_id, target_node_id): - """ - 预约回滚 - """ - self._reserve(start_node_id, target_node_id) - - def cancel_reserved_rollback(self, start_node_id, target_node_id): - """ - 取消预约回滚 - """ - self._reserve(start_node_id, target_node_id, reserve_rollback=False) - def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): RollbackValidator.validate_node_state_by_any_mode(self.root_pipeline_id) # 回滚的开始节点运行失败的情况 RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id) RollbackValidator.validate_node(start_node_id, allow_failed=True) RollbackValidator.validate_node(target_node_id) + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) node_map = self._get_allowed_rollback_node_map() rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) @@ -294,34 +294,6 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): class TokenRollbackHandler(BaseRollbackHandler): mode = TOKEN - def get_allowed_rollback_node_id_list(self, start_node_id): - """ - 获取允许回滚的节点范围 - 规则:token 一致的节点允许回滚 - """ - try: - rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id) - except RollbackToken.DoesNotExist: - raise RollBackException( - "rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id) - ) - node_map = self._get_allowed_rollback_node_map() - service_activity_node_list = [ - node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity - ] - - tokens = json.loads(rollback_token.token) - start_token = tokens.get(start_node_id) - if not start_token: - return [] - - nodes = [] - for node_id, token in tokens.items(): - if start_token == token and node_id != start_node_id and node_id in service_activity_node_list: - nodes.append(node_id) - - return nodes - def retry_rollback_failed_node(self, node_id, retry_data): """ 重试回滚失败的节点 diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0002_auto_20231020_1234.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0002_auto_20231020_1234.py new file mode 100644 index 0000000..ae217a0 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0002_auto_20231020_1234.py @@ -0,0 +1,29 @@ +# Generated by Django 3.2.18 on 2023-10-20 12:34 + +import pipeline.contrib.rollback.fields +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("rollback", "0001_initial"), + ] + + operations = [ + migrations.AlterField( + model_name="rollbacknodesnapshot", + name="context_values", + field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="pipeline context values"), + ), + migrations.AlterField( + model_name="rollbacknodesnapshot", + name="inputs", + field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="node inputs"), + ), + migrations.AlterField( + model_name="rollbacknodesnapshot", + name="outputs", + field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="node outputs"), + ), + ] diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py index b482f0a..ab452cf 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py @@ -2,6 +2,7 @@ from django.db import models from django.utils.translation import ugettext_lazy as _ from pipeline.contrib.rollback.constants import TOKEN +from pipeline.contrib.rollback.fields import SerializerField class RollbackToken(models.Model): @@ -38,9 +39,9 @@ class RollbackNodeSnapshot(models.Model): node_id = models.CharField(verbose_name="node_id", max_length=64, db_index=True) code = models.CharField(verbose_name="node_code", max_length=64) version = models.CharField(verbose_name=_("version"), null=False, max_length=33) - inputs = models.TextField(verbose_name=_("node inputs")) - outputs = models.TextField(verbose_name=_("node outputs")) - context_values = models.TextField(verbose_name=_("pipeline context values")) + inputs = SerializerField(verbose_name=_("node inputs")) + outputs = SerializerField(verbose_name=_("node outputs")) + context_values = SerializerField(verbose_name=_("pipeline context values")) rolled_back = models.BooleanField(_("whether the node rolls back"), default=False) diff --git a/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py b/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py index 99ec147..ea1a15a 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py +++ b/runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py @@ -3,7 +3,6 @@ import logging from django.apps import apps -from django.core.serializers.json import DjangoJSONEncoder from bamboo_engine.builder.builder import generate_pipeline_token @@ -49,9 +48,9 @@ def set_node_snapshot(self, root_pipeline_id, node_id, code, version, context_va node_id=node_id, code=code, version=version, - context_values=json.dumps(context_values, cls=DjangoJSONEncoder), - inputs=json.dumps(inputs, cls=DjangoJSONEncoder), - outputs=json.dumps(outputs, cls=DjangoJSONEncoder), + context_values=context_values, + inputs=inputs, + outputs=outputs, ) def start_rollback(self, root_pipeline_id, node_id):