Skip to content

Commit

Permalink
bugfix: 修复节点快照某些值无法正确序列化的问题 (#183)
Browse files Browse the repository at this point in the history
* bugfix: 修复节点快照某些值无法正确序列化的问题

* minor: any 模式下的回滚遵循token校验

* minor: any 模式增加token校验
  • Loading branch information
hanshuaikang authored Oct 24, 2023
1 parent 6efaf05 commit bdd6a09
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 62 deletions.
31 changes: 31 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
import codecs
import json
import pickle

from django.db.models import TextField


class SerializerField(TextField):
"""
特定的序列化类,用于兼容json和pickle两种序列化数据
"""

def to_python(self, value):
try:
return json.loads(value)
except Exception:
return pickle.loads(codecs.decode(value.encode(), "base64"))

def from_db_value(self, value, expression, connection, context=None):
try:
return json.loads(value)
except Exception:
return pickle.loads(codecs.decode(value.encode(), "base64"))

def get_prep_value(self, value):
try:
return json.dumps(value)
except TypeError:
return codecs.encode(pickle.dumps(value), "base64").decode()
pass
82 changes: 27 additions & 55 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,32 @@ def __init__(self, root_pipeline_id):
RollbackValidator.validate_pipeline(root_pipeline_id)

def get_allowed_rollback_node_id_list(self, start_node_id):
return []
"""
获取允许回滚的节点范围
规则:token 一致的节点允许回滚
"""
try:
rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id)
except RollbackToken.DoesNotExist:
raise RollBackException(
"rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id)
)
node_map = self._get_allowed_rollback_node_map()
service_activity_node_list = [
node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity
]

tokens = json.loads(rollback_token.token)
start_token = tokens.get(start_node_id)
if not start_token:
return []

nodes = []
for node_id, token in tokens.items():
if start_token == token and node_id != start_node_id and node_id in service_activity_node_list:
nodes.append(node_id)

return nodes

def _get_allowed_rollback_node_map(self):
# 不需要遍历整颗树,获取到现在已经执行成功和失败节点的所有列表
Expand Down Expand Up @@ -232,42 +257,17 @@ def cancel_reserved_rollback(self, start_node_id, target_node_id):
class AnyRollbackHandler(BaseRollbackHandler):
mode = ANY

def get_allowed_rollback_node_id_list(self, start_node_id):
node_map = self._get_allowed_rollback_node_map()
start_node_state = (
State.objects.filter(root_id=self.root_pipeline_id)
.exclude(node_id=self.root_pipeline_id)
.order_by("created_time")
.first()
)
target_node_id = start_node_state.node_id
rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id)
graph, _ = rollback_graph.build_rollback_graph()

return list(set(graph.nodes) - {constants.START_FLAG, constants.END_FLAG, start_node_id})

def retry_rollback_failed_node(self, node_id, retry_data):
""" """
raise RollBackException("rollback failed: when mode is any, not support retry")

def reserve_rollback(self, start_node_id, target_node_id):
"""
预约回滚
"""
self._reserve(start_node_id, target_node_id)

def cancel_reserved_rollback(self, start_node_id, target_node_id):
"""
取消预约回滚
"""
self._reserve(start_node_id, target_node_id, reserve_rollback=False)

def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None):
RollbackValidator.validate_node_state_by_any_mode(self.root_pipeline_id)
# 回滚的开始节点运行失败的情况
RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id)
RollbackValidator.validate_node(start_node_id, allow_failed=True)
RollbackValidator.validate_node(target_node_id)
RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id)

node_map = self._get_allowed_rollback_node_map()
rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id)
Expand All @@ -294,34 +294,6 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None):
class TokenRollbackHandler(BaseRollbackHandler):
mode = TOKEN

def get_allowed_rollback_node_id_list(self, start_node_id):
"""
获取允许回滚的节点范围
规则:token 一致的节点允许回滚
"""
try:
rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id)
except RollbackToken.DoesNotExist:
raise RollBackException(
"rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id)
)
node_map = self._get_allowed_rollback_node_map()
service_activity_node_list = [
node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity
]

tokens = json.loads(rollback_token.token)
start_token = tokens.get(start_node_id)
if not start_token:
return []

nodes = []
for node_id, token in tokens.items():
if start_token == token and node_id != start_node_id and node_id in service_activity_node_list:
nodes.append(node_id)

return nodes

def retry_rollback_failed_node(self, node_id, retry_data):
"""
重试回滚失败的节点
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 3.2.18 on 2023-10-20 12:34

import pipeline.contrib.rollback.fields
from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
("rollback", "0001_initial"),
]

operations = [
migrations.AlterField(
model_name="rollbacknodesnapshot",
name="context_values",
field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="pipeline context values"),
),
migrations.AlterField(
model_name="rollbacknodesnapshot",
name="inputs",
field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="node inputs"),
),
migrations.AlterField(
model_name="rollbacknodesnapshot",
name="outputs",
field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="node outputs"),
),
]
7 changes: 4 additions & 3 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from django.db import models
from django.utils.translation import ugettext_lazy as _
from pipeline.contrib.rollback.constants import TOKEN
from pipeline.contrib.rollback.fields import SerializerField


class RollbackToken(models.Model):
Expand Down Expand Up @@ -38,9 +39,9 @@ class RollbackNodeSnapshot(models.Model):
node_id = models.CharField(verbose_name="node_id", max_length=64, db_index=True)
code = models.CharField(verbose_name="node_code", max_length=64)
version = models.CharField(verbose_name=_("version"), null=False, max_length=33)
inputs = models.TextField(verbose_name=_("node inputs"))
outputs = models.TextField(verbose_name=_("node outputs"))
context_values = models.TextField(verbose_name=_("pipeline context values"))
inputs = SerializerField(verbose_name=_("node inputs"))
outputs = SerializerField(verbose_name=_("node outputs"))
context_values = SerializerField(verbose_name=_("pipeline context values"))
rolled_back = models.BooleanField(_("whether the node rolls back"), default=False)


Expand Down
7 changes: 3 additions & 4 deletions runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging

from django.apps import apps
from django.core.serializers.json import DjangoJSONEncoder

from bamboo_engine.builder.builder import generate_pipeline_token

Expand Down Expand Up @@ -49,9 +48,9 @@ def set_node_snapshot(self, root_pipeline_id, node_id, code, version, context_va
node_id=node_id,
code=code,
version=version,
context_values=json.dumps(context_values, cls=DjangoJSONEncoder),
inputs=json.dumps(inputs, cls=DjangoJSONEncoder),
outputs=json.dumps(outputs, cls=DjangoJSONEncoder),
context_values=context_values,
inputs=inputs,
outputs=outputs,
)

def start_rollback(self, root_pipeline_id, node_id):
Expand Down

0 comments on commit bdd6a09

Please sign in to comment.