Skip to content

Commit

Permalink
feature: 支持单节点执行方案
Browse files Browse the repository at this point in the history
  • Loading branch information
hanshuaikang committed Nov 24, 2023
1 parent 14cd46f commit 3e385ba
Show file tree
Hide file tree
Showing 19 changed files with 776 additions and 12 deletions.
1 change: 1 addition & 0 deletions runtime/bamboo-pipeline/pipeline/conf/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,4 @@
ENABLE_PIPELINE_EVENT_SIGNALS = getattr(settings, "ENABLE_PIPELINE_EVENT_SIGNALS", False)

ROLLBACK_QUEUE = getattr(settings, "ROLLBACK_QUEUE", "rollback")
PLUGIN_EXECUTE_QUEUE = getattr(settings, "PLUGIN_EXECUTE_QUEUE", "plugin_execute")
4 changes: 4 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ class RollBackException(PipelineException):

class UpdatePipelineContextException(PipelineException):
pass


class PluginExecuteException(PipelineException):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
41 changes: 41 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from pipeline.contrib.plugin_execute.handler import PluginExecuteHandler
from pipeline.contrib.utils import ensure_return_pipeline_contrib_api_result


@ensure_return_pipeline_contrib_api_result
def run(component_code: str, version: str, inputs: dict, contexts: dict, runtime_attr: dict = None):
task_id = PluginExecuteHandler.run(component_code, version, inputs, contexts, runtime_attr)
return task_id


@ensure_return_pipeline_contrib_api_result
def get_state(task_id: int):
return PluginExecuteHandler.get_state(task_id)


@ensure_return_pipeline_contrib_api_result
def callback(task_id: int, callback_data: dict = None):
PluginExecuteHandler.callback(task_id, callback_data)


@ensure_return_pipeline_contrib_api_result
def forced_fail(task_id):
PluginExecuteHandler.forced_fail(task_id)


@ensure_return_pipeline_contrib_api_result
def retry(task_id: int, inputs: dict = None, context: dict = None, runtime_attr: dict = None):
PluginExecuteHandler.retry_node(task_id, inputs, context, runtime_attr)
21 changes: 21 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from django.apps import AppConfig


class RollbackConfig(AppConfig):
name = "pipeline.contrib.plugin_execute"
verbose_name = "PipelinePluginExecute"

def ready(self):
from pipeline.contrib.plugin_execute import tasks # noqa
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""


class State:
READY = "READY"
RUNNING = "RUNNING"
FINISHED = "FINISHED"
FAILED = "FAILED"
168 changes: 168 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import logging

from pipeline.conf.default_settings import PLUGIN_EXECUTE_QUEUE
from pipeline.contrib.exceptions import PluginExecuteException
from pipeline.contrib.plugin_execute.contants import State
from pipeline.contrib.plugin_execute.models import PluginExecuteTask, get_schedule_lock
from pipeline.contrib.plugin_execute.tasks import execute, schedule

logger = logging.getLogger("celery")


def _retry_once(action: callable):
try:
action()
except Exception:
try:
action()
except Exception as e:
raise e


class PluginExecuteHandler:
@classmethod
def run(cls, component_code: str, version: str, inputs: dict, contexts: dict, runtime_attrs: dict = None):
if runtime_attrs is None:
runtime_attrs = {}

if not (isinstance(inputs, dict) and isinstance(contexts, dict) and isinstance(runtime_attrs, dict)):
raise PluginExecuteException("[plugin_execute_run] error, the inputs, contexts, runtime_attrs must be dict")

plugin_execute_task = PluginExecuteTask.objects.create(
state=State.READY,
inputs=inputs,
version=version,
component_code=component_code,
contexts=contexts,
runtime_attrs=runtime_attrs,
)

def action():
# 发送执行任务
result = execute.apply_async(kwargs={"task_id": plugin_execute_task.id}, queue=PLUGIN_EXECUTE_QUEUE)
logger.info(
"[plugin_execute_run] send execute task, celery task_id = {}, plugin_execute_task_id = {}".format(
result.id, plugin_execute_task.id
)
)

try:
_retry_once(action=action)
except Exception as e:
# 如果任务启动出现异常,则删除任务
plugin_execute_task.delete()
raise e

return plugin_execute_task.id

@classmethod
def get_state(cls, task_id):
"""
获取任务状态
@param task_id:
@return:
"""
# 直接抛出异常让上层去捕获
plugin_execute_task = PluginExecuteTask.objects.get(id=task_id)
return {
"task_id": plugin_execute_task.id,
"state": plugin_execute_task.state,
"component_code": plugin_execute_task.component_code,
"version": plugin_execute_task.version,
"invoke_count": plugin_execute_task.invoke_count,
"inputs": plugin_execute_task.inputs,
"outputs": plugin_execute_task.outputs,
"contexts": plugin_execute_task.contexts,
"runtime_attrs": plugin_execute_task.runtime_attrs,
"create_at": plugin_execute_task.created_at,
"finish_at": plugin_execute_task.finish_at,
}

@classmethod
def forced_fail(cls, task_id):
plugin_execute_task = PluginExecuteTask.objects.get(id=task_id)
if plugin_execute_task.state != State.RUNNING:
raise PluginExecuteException(
"[forced_fail] error, the plugin_execute_task.state is not RUNNING, state={}".format(
plugin_execute_task.state
)
)
# 插件状态改成 FAILED, 在schdule会自动停止
plugin_execute_task.state = State.FAILED
plugin_execute_task.save()

@classmethod
def callback(cls, task_id: int, callback_data: dict = None):

if callback_data is None:
callback_data = {}

if not isinstance(callback_data, dict):
raise PluginExecuteException("[plugin_execute_callback] error, the callback must be dict")

plugin_execute_task = PluginExecuteTask.objects.get(id=task_id)
if plugin_execute_task.state != State.RUNNING:
raise PluginExecuteException(
"[callback] error, the plugin_execute_task.state is not RUNNING, state={}".format(
plugin_execute_task.state
)
)

def action():
# 需要加锁,防止流程处在回调的过程中
with get_schedule_lock(task_id) as locked:
if not locked:
raise PluginExecuteException(
"[plugin_execute_callback] error, it`s have callbacking task, please try"
)
plugin_execute_task.callback_data = callback_data
plugin_execute_task.save()
result = schedule.apply_async(kwargs={"task_id": plugin_execute_task.id}, queue=PLUGIN_EXECUTE_QUEUE)
logger.info("[plugin_execute_callback] send callback task, celery task_id = {}".format(result.id))

_retry_once(action=action)

@classmethod
def retry_node(cls, task_id: int, inputs: dict = None, contexts: dict = None, runtime_attrs: dict = None):

plugin_execute_task = PluginExecuteTask.objects.get(id=task_id)
if plugin_execute_task.state != State.FAILED:
raise PluginExecuteException(
"[retry_node] error, the plugin_execute_task.state is not FAILED, state={}".format(
plugin_execute_task.state
)
)

if contexts and isinstance(contexts, dict):
plugin_execute_task.contexts = contexts
if inputs and isinstance(inputs, dict):
plugin_execute_task.inputs = inputs
if runtime_attrs and isinstance(runtime_attrs, dict):
plugin_execute_task.runtime_attrs = runtime_attrs

plugin_execute_task.state = State.READY
plugin_execute_task.inputs = inputs
plugin_execute_task.invoke_count += 1
# 清空输出和callback_data
plugin_execute_task.outputs = {}
plugin_execute_task.callback_data = {}
plugin_execute_task.save()

def action():
result = execute.apply_async(kwargs={"task_id": plugin_execute_task.id}, queue=PLUGIN_EXECUTE_QUEUE)
logger.info("[plugin_execute_retry_node] send retry_node task, celery task_id = {}".format(result.id))

_retry_once(action=action)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 3.2.18 on 2023-11-24 03:20
import pipeline.contrib.fields
from django.db import migrations, models


class Migration(migrations.Migration):
initial = True

dependencies = []

operations = [
migrations.CreateModel(
name="PluginExecuteTask",
fields=[
("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("state", models.CharField(max_length=64, verbose_name="状态名")),
("invoke_count", models.IntegerField(default=1, verbose_name="invoke count")),
("component_code", models.CharField(db_index=True, max_length=255, verbose_name="组件编码")),
("version", models.CharField(default="legacy", max_length=255, verbose_name="插件版本")),
("inputs", pipeline.contrib.fields.SerializerField(default={}, verbose_name="node inputs")),
("outputs", pipeline.contrib.fields.SerializerField(default={}, verbose_name="node outputs")),
("callback_data", pipeline.contrib.fields.SerializerField(default={}, verbose_name="callback data")),
(
"contexts",
pipeline.contrib.fields.SerializerField(default={}, verbose_name="pipeline context values"),
),
("runtime_attrs", pipeline.contrib.fields.SerializerField(default={}, verbose_name="runtime attr")),
("scheduling", models.BooleanField(db_index=True, default=False, verbose_name="是否正在调度")),
("created_at", models.DateTimeField(auto_now_add=True, verbose_name="create time")),
("finish_at", models.DateTimeField(null=True, verbose_name="finish time")),
],
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
Loading

0 comments on commit 3e385ba

Please sign in to comment.