Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: 修复节点快照某些值无法正确序列化的问题 #183

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
import codecs
import json
import pickle

from django.db.models import TextField


class SerializerField(TextField):
"""
特定的序列化类,用于兼容json和pickle两种序列化数据
"""

def to_python(self, value):
try:
return json.loads(value)
except Exception:
return pickle.loads(codecs.decode(value.encode(), "base64"))

def from_db_value(self, value, expression, connection, context=None):
try:
return json.loads(value)
except Exception:
return pickle.loads(codecs.decode(value.encode(), "base64"))

def get_prep_value(self, value):
try:
return json.dumps(value)
except TypeError:
return codecs.encode(pickle.dumps(value), "base64").decode()
pass
70 changes: 27 additions & 43 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,32 @@ def __init__(self, root_pipeline_id):
RollbackValidator.validate_pipeline(root_pipeline_id)

def get_allowed_rollback_node_id_list(self, start_node_id):
return []
"""
获取允许回滚的节点范围
规则:token 一致的节点允许回滚
"""
try:
rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id)
except RollbackToken.DoesNotExist:
raise RollBackException(
"rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id)
)
node_map = self._get_allowed_rollback_node_map()
service_activity_node_list = [
node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity
]

tokens = json.loads(rollback_token.token)
start_token = tokens.get(start_node_id)
if not start_token:
return []

nodes = []
for node_id, token in tokens.items():
if start_token == token and node_id != start_node_id and node_id in service_activity_node_list:
nodes.append(node_id)

return nodes

def _get_allowed_rollback_node_map(self):
# 不需要遍历整颗树,获取到现在已经执行成功和失败节点的所有列表
Expand Down Expand Up @@ -232,20 +257,6 @@ def cancel_reserved_rollback(self, start_node_id, target_node_id):
class AnyRollbackHandler(BaseRollbackHandler):
mode = ANY

def get_allowed_rollback_node_id_list(self, start_node_id):
node_map = self._get_allowed_rollback_node_map()
start_node_state = (
State.objects.filter(root_id=self.root_pipeline_id)
.exclude(node_id=self.root_pipeline_id)
.order_by("created_time")
.first()
)
target_node_id = start_node_state.node_id
rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id)
graph, _ = rollback_graph.build_rollback_graph()

return list(set(graph.nodes) - {constants.START_FLAG, constants.END_FLAG, start_node_id})

def retry_rollback_failed_node(self, node_id, retry_data):
""" """
raise RollBackException("rollback failed: when mode is any, not support retry")
Expand All @@ -268,6 +279,7 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None):
RollbackValidator.validate_start_node_id(self.root_pipeline_id, start_node_id)
RollbackValidator.validate_node(start_node_id, allow_failed=True)
RollbackValidator.validate_node(target_node_id)
RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id)

node_map = self._get_allowed_rollback_node_map()
rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id)
Expand All @@ -294,34 +306,6 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None):
class TokenRollbackHandler(BaseRollbackHandler):
mode = TOKEN

def get_allowed_rollback_node_id_list(self, start_node_id):
"""
获取允许回滚的节点范围
规则:token 一致的节点允许回滚
"""
try:
rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id)
except RollbackToken.DoesNotExist:
raise RollBackException(
"rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id)
)
node_map = self._get_allowed_rollback_node_map()
service_activity_node_list = [
node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity
]

tokens = json.loads(rollback_token.token)
start_token = tokens.get(start_node_id)
if not start_token:
return []

nodes = []
for node_id, token in tokens.items():
if start_token == token and node_id != start_node_id and node_id in service_activity_node_list:
nodes.append(node_id)

return nodes

def retry_rollback_failed_node(self, node_id, retry_data):
"""
重试回滚失败的节点
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 3.2.18 on 2023-10-20 12:34

import pipeline.contrib.rollback.fields
from django.db import migrations


class Migration(migrations.Migration):

dependencies = [
("rollback", "0001_initial"),
]

operations = [
migrations.AlterField(
model_name="rollbacknodesnapshot",
name="context_values",
field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="pipeline context values"),
),
migrations.AlterField(
model_name="rollbacknodesnapshot",
name="inputs",
field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="node inputs"),
),
migrations.AlterField(
model_name="rollbacknodesnapshot",
name="outputs",
field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="node outputs"),
),
]
7 changes: 4 additions & 3 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from django.db import models
from django.utils.translation import ugettext_lazy as _
from pipeline.contrib.rollback.constants import TOKEN
from pipeline.contrib.rollback.fields import SerializerField


class RollbackToken(models.Model):
Expand Down Expand Up @@ -38,9 +39,9 @@ class RollbackNodeSnapshot(models.Model):
node_id = models.CharField(verbose_name="node_id", max_length=64, db_index=True)
code = models.CharField(verbose_name="node_code", max_length=64)
version = models.CharField(verbose_name=_("version"), null=False, max_length=33)
inputs = models.TextField(verbose_name=_("node inputs"))
outputs = models.TextField(verbose_name=_("node outputs"))
context_values = models.TextField(verbose_name=_("pipeline context values"))
inputs = SerializerField(verbose_name=_("node inputs"))
outputs = SerializerField(verbose_name=_("node outputs"))
context_values = SerializerField(verbose_name=_("pipeline context values"))
rolled_back = models.BooleanField(_("whether the node rolls back"), default=False)


Expand Down
7 changes: 3 additions & 4 deletions runtime/bamboo-pipeline/pipeline/eri/imp/rollback.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging

from django.apps import apps
from django.core.serializers.json import DjangoJSONEncoder

from bamboo_engine.builder.builder import generate_pipeline_token

Expand Down Expand Up @@ -49,9 +48,9 @@ def set_node_snapshot(self, root_pipeline_id, node_id, code, version, context_va
node_id=node_id,
code=code,
version=version,
context_values=json.dumps(context_values, cls=DjangoJSONEncoder),
inputs=json.dumps(inputs, cls=DjangoJSONEncoder),
outputs=json.dumps(outputs, cls=DjangoJSONEncoder),
context_values=context_values,
inputs=inputs,
outputs=outputs,
)

def start_rollback(self, root_pipeline_id, node_id):
Expand Down
Loading