diff --git a/runtime/bamboo-pipeline/pipeline/conf/default_settings.py b/runtime/bamboo-pipeline/pipeline/conf/default_settings.py index 5f52c1a6..eda27c28 100644 --- a/runtime/bamboo-pipeline/pipeline/conf/default_settings.py +++ b/runtime/bamboo-pipeline/pipeline/conf/default_settings.py @@ -106,3 +106,4 @@ ENABLE_PIPELINE_EVENT_SIGNALS = getattr(settings, "ENABLE_PIPELINE_EVENT_SIGNALS", False) ROLLBACK_QUEUE = getattr(settings, "ROLLBACK_QUEUE", "rollback") +PLUGIN_EXECUTE_QUEUE = getattr(settings, "PLUGIN_EXECUTE_QUEUE", "plugin_execute") diff --git a/runtime/bamboo-pipeline/pipeline/contrib/exceptions.py b/runtime/bamboo-pipeline/pipeline/contrib/exceptions.py index 186ea958..a769bc4d 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/exceptions.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/exceptions.py @@ -20,3 +20,7 @@ class RollBackException(PipelineException): class UpdatePipelineContextException(PipelineException): pass + + +class PluginExecuteException(PipelineException): + pass diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/fields.py b/runtime/bamboo-pipeline/pipeline/contrib/fields.py similarity index 100% rename from runtime/bamboo-pipeline/pipeline/contrib/rollback/fields.py rename to runtime/bamboo-pipeline/pipeline/contrib/fields.py diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/__init__.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/__init__.py new file mode 100644 index 00000000..26a6d1c2 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/api.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/api.py new file mode 100644 index 00000000..1a03fc63 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/api.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from pipeline.contrib.plugin_execute.handler import PluginExecuteHandler +from pipeline.contrib.utils import ensure_return_pipeline_contrib_api_result + + +@ensure_return_pipeline_contrib_api_result +def run(component_code: str, version: str, inputs: dict, contexts: dict, runtime_attr: dict = None): + task_id = PluginExecuteHandler.run(component_code, version, inputs, contexts, runtime_attr) + return task_id + + +@ensure_return_pipeline_contrib_api_result +def get_state(task_id: int): + return PluginExecuteHandler.get_state(task_id) + + +@ensure_return_pipeline_contrib_api_result +def callback(task_id: int, callback_data: dict = None): + PluginExecuteHandler.callback(task_id, callback_data) + + +@ensure_return_pipeline_contrib_api_result +def forced_fail(task_id): + PluginExecuteHandler.forced_fail(task_id) + + +@ensure_return_pipeline_contrib_api_result +def retry(task_id: int, inputs: dict = None, context: dict = None, runtime_attr: dict = None): + PluginExecuteHandler.retry_node(task_id, inputs, context, runtime_attr) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/apps.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/apps.py new file mode 100644 index 00000000..8bbefc32 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/apps.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from django.apps import AppConfig + + +class RollbackConfig(AppConfig): + name = "pipeline.contrib.plugin_execute" + verbose_name = "PipelinePluginExecute" + + def ready(self): + from pipeline.contrib.plugin_execute import tasks # noqa diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/contants.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/contants.py new file mode 100644 index 00000000..008c9397 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/contants.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + + +class State: + READY = "READY" + RUNNING = "RUNNING" + FINISHED = "FINISHED" + FAILED = "FAILED" diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/handler.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/handler.py new file mode 100644 index 00000000..b784a220 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/handler.py @@ -0,0 +1,168 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging + +from pipeline.conf.default_settings import PLUGIN_EXECUTE_QUEUE +from pipeline.contrib.exceptions import PluginExecuteException +from pipeline.contrib.plugin_execute.contants import State +from pipeline.contrib.plugin_execute.models import PluginExecuteTask, get_schedule_lock +from pipeline.contrib.plugin_execute.tasks import execute, schedule + +logger = logging.getLogger("celery") + + +def _retry_once(action: callable): + try: + action() + except Exception: + try: + action() + except Exception as e: + raise e + + +class PluginExecuteHandler: + @classmethod + def run(cls, component_code: str, version: str, inputs: dict, contexts: dict, runtime_attrs: dict = None): + if runtime_attrs is None: + runtime_attrs = {} + + if not (isinstance(inputs, dict) and isinstance(contexts, dict) and isinstance(runtime_attrs, dict)): + raise PluginExecuteException("[plugin_execute_run] error, the inputs, contexts, runtime_attrs must be dict") + + plugin_execute_task = PluginExecuteTask.objects.create( + state=State.READY, + inputs=inputs, + version=version, + component_code=component_code, + contexts=contexts, + runtime_attrs=runtime_attrs, + ) + + def action(): + # 发送执行任务 + result = execute.apply_async(kwargs={"task_id": plugin_execute_task.id}, queue=PLUGIN_EXECUTE_QUEUE) + logger.info( + "[plugin_execute_run] send execute task, celery task_id = {}, plugin_execute_task_id = {}".format( + result.id, plugin_execute_task.id + ) + ) + + try: + _retry_once(action=action) + except Exception as e: + # 如果任务启动出现异常,则删除任务 + plugin_execute_task.delete() + raise e + + return plugin_execute_task.id + + @classmethod + def get_state(cls, task_id): + """ + 获取任务状态 + @param task_id: + @return: + """ + # 直接抛出异常让上层去捕获 + plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) + return { + "task_id": plugin_execute_task.id, + "state": plugin_execute_task.state, + "component_code": plugin_execute_task.component_code, + "version": plugin_execute_task.version, + "invoke_count": plugin_execute_task.invoke_count, + "inputs": plugin_execute_task.inputs, + "outputs": plugin_execute_task.outputs, + "contexts": plugin_execute_task.contexts, + "runtime_attrs": plugin_execute_task.runtime_attrs, + "create_at": plugin_execute_task.created_at, + "finish_at": plugin_execute_task.finish_at, + } + + @classmethod + def forced_fail(cls, task_id): + plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) + if plugin_execute_task.state != State.RUNNING: + raise PluginExecuteException( + "[forced_fail] error, the plugin_execute_task.state is not RUNNING, state={}".format( + plugin_execute_task.state + ) + ) + # 插件状态改成 FAILED, 在schdule会自动停止 + plugin_execute_task.state = State.FAILED + plugin_execute_task.save() + + @classmethod + def callback(cls, task_id: int, callback_data: dict = None): + + if callback_data is None: + callback_data = {} + + if not isinstance(callback_data, dict): + raise PluginExecuteException("[plugin_execute_callback] error, the callback must be dict") + + plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) + if plugin_execute_task.state != State.RUNNING: + raise PluginExecuteException( + "[callback] error, the plugin_execute_task.state is not RUNNING, state={}".format( + plugin_execute_task.state + ) + ) + + def action(): + # 需要加锁,防止流程处在回调的过程中 + with get_schedule_lock(task_id) as locked: + if not locked: + raise PluginExecuteException( + "[plugin_execute_callback] error, it`s have callbacking task, please try" + ) + plugin_execute_task.callback_data = callback_data + plugin_execute_task.save() + result = schedule.apply_async(kwargs={"task_id": plugin_execute_task.id}, queue=PLUGIN_EXECUTE_QUEUE) + logger.info("[plugin_execute_callback] send callback task, celery task_id = {}".format(result.id)) + + _retry_once(action=action) + + @classmethod + def retry_node(cls, task_id: int, inputs: dict = None, contexts: dict = None, runtime_attrs: dict = None): + + plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) + if plugin_execute_task.state != State.FAILED: + raise PluginExecuteException( + "[retry_node] error, the plugin_execute_task.state is not FAILED, state={}".format( + plugin_execute_task.state + ) + ) + + if contexts and isinstance(contexts, dict): + plugin_execute_task.contexts = contexts + if inputs and isinstance(inputs, dict): + plugin_execute_task.inputs = inputs + if runtime_attrs and isinstance(runtime_attrs, dict): + plugin_execute_task.runtime_attrs = runtime_attrs + + plugin_execute_task.state = State.READY + plugin_execute_task.inputs = inputs + plugin_execute_task.invoke_count += 1 + # 清空输出和callback_data + plugin_execute_task.outputs = {} + plugin_execute_task.callback_data = {} + plugin_execute_task.save() + + def action(): + result = execute.apply_async(kwargs={"task_id": plugin_execute_task.id}, queue=PLUGIN_EXECUTE_QUEUE) + logger.info("[plugin_execute_retry_node] send retry_node task, celery task_id = {}".format(result.id)) + + _retry_once(action=action) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/migrations/0001_initial.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/migrations/0001_initial.py new file mode 100644 index 00000000..7fd1dde3 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/migrations/0001_initial.py @@ -0,0 +1,33 @@ +# Generated by Django 3.2.18 on 2023-11-24 03:20 +import pipeline.contrib.fields +from django.db import migrations, models + + +class Migration(migrations.Migration): + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="PluginExecuteTask", + fields=[ + ("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")), + ("state", models.CharField(max_length=64, verbose_name="状态名")), + ("invoke_count", models.IntegerField(default=1, verbose_name="invoke count")), + ("component_code", models.CharField(db_index=True, max_length=255, verbose_name="组件编码")), + ("version", models.CharField(default="legacy", max_length=255, verbose_name="插件版本")), + ("inputs", pipeline.contrib.fields.SerializerField(default={}, verbose_name="node inputs")), + ("outputs", pipeline.contrib.fields.SerializerField(default={}, verbose_name="node outputs")), + ("callback_data", pipeline.contrib.fields.SerializerField(default={}, verbose_name="callback data")), + ( + "contexts", + pipeline.contrib.fields.SerializerField(default={}, verbose_name="pipeline context values"), + ), + ("runtime_attrs", pipeline.contrib.fields.SerializerField(default={}, verbose_name="runtime attr")), + ("scheduling", models.BooleanField(db_index=True, default=False, verbose_name="是否正在调度")), + ("created_at", models.DateTimeField(auto_now_add=True, verbose_name="create time")), + ("finish_at", models.DateTimeField(null=True, verbose_name="finish time")), + ], + ), + ] diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/migrations/__init__.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/migrations/__init__.py new file mode 100644 index 00000000..26a6d1c2 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/migrations/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/models.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/models.py new file mode 100644 index 00000000..ccd873ed --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/models.py @@ -0,0 +1,78 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from django.db import models +from django.utils.translation import ugettext_lazy as _ +from pipeline.contrib.fields import SerializerField + + +class ScheduleManger(models.Manager): + def apply_schedule_lock(self, task_id: int) -> bool: + """ + 获取 Schedule 对象的调度锁,返回是否成功获取锁 + + :return: True or False + """ + return self.filter(id=task_id, scheduling=False).update(scheduling=True) == 1 + + def release_schedule_lock(self, task_id: int) -> None: + """ + 释放指定 Schedule 的调度锁 + :return: + """ + self.filter(id=task_id, scheduling=True).update(scheduling=False) + + +class ScheduleLock(object): + def __init__(self, task_id: int): + self.task_id = task_id + self.locked = False + + def __enter__(self): + self.locked = PluginExecuteTask.objects.apply_schedule_lock(self.task_id) + + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.locked: + PluginExecuteTask.objects.release_schedule_lock(self.task_id) + + +def get_schedule_lock(task_id: int) -> ScheduleLock: + """ + 获取 schedule lock 的 context 对象 + :param task_id: + :return: + """ + return ScheduleLock(task_id) + + +class PluginExecuteTask(models.Model): + """ + 回滚配置token信息 + """ + + state = models.CharField(_("状态名"), null=False, max_length=64) + invoke_count = models.IntegerField("invoke count", default=1) + component_code = models.CharField(_("组件编码"), max_length=255, db_index=True) + version = models.CharField(_("插件版本"), max_length=255, default="legacy") + inputs = SerializerField(verbose_name=_("node inputs"), default={}) + outputs = SerializerField(verbose_name=_("node outputs"), default={}) + callback_data = SerializerField(verbose_name=_("callback data"), default={}) + contexts = SerializerField(verbose_name=_("pipeline context values"), default={}) + runtime_attrs = SerializerField(verbose_name=_("runtime attr"), default={}) + scheduling = models.BooleanField("是否正在调度", default=False, db_index=True) + created_at = models.DateTimeField("create time", auto_now_add=True) + finish_at = models.DateTimeField("finish time", null=True) + + objects = ScheduleManger() diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py new file mode 100644 index 00000000..f54df039 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +import logging +import traceback + +from celery import task +from django.utils import timezone +from pipeline.component_framework.library import ComponentLibrary +from pipeline.conf.default_settings import PLUGIN_EXECUTE_QUEUE +from pipeline.contrib.plugin_execute.contants import State +from pipeline.contrib.plugin_execute.models import PluginExecuteTask +from pipeline.core.data.base import DataObject + +logger = logging.getLogger("celery") + + +@task +def execute(task_id): + try: + plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) + except PluginExecuteTask.DoesNotExist: + logger.exception("[plugin_execute] execute error, task not exist, task_id={}".format(task_id)) + return + + # 更新插件的状态 + plugin_execute_task.state = State.RUNNING + plugin_execute_task.save(update_fields=["state"]) + + # 封装data + data = DataObject(inputs=plugin_execute_task.inputs, outputs={}) + parent_data = DataObject(inputs=plugin_execute_task.contexts, outputs={}) + + try: + # 获取 component + comp_cls = ComponentLibrary.get_component_class(plugin_execute_task.component_code, plugin_execute_task.version) + # 获取service + service = comp_cls.bound_service(name=plugin_execute_task.runtime_attrs.get("name", None)) + + # 封装运行时 + service.setup_runtime_attrs(**plugin_execute_task.runtime_attrs) + execute_success = service.execute(data, parent_data) + plugin_execute_task.outputs = data.outputs + plugin_execute_task.save() + except Exception as e: + # 处理异常情况 + ex_data = traceback.format_exc() + data.outputs.ex_data = ex_data + logger.exception("[plugin_execute] plugin execute failed, err={}".format(e)) + plugin_execute_task.outputs = data.outputs + plugin_execute_task.state = State.FAILED + plugin_execute_task.save() + return + + # 单纯的执行失败, 更新状态和输出信息 + if not execute_success: + plugin_execute_task.state = State.FAILED + plugin_execute_task.save() + return + + # 执行成功, 需要判断是否需要调度 + need_schedule = service.need_schedule() + if not need_schedule: + plugin_execute_task.state = State.FINISHED + plugin_execute_task.finish_at = timezone.now() + plugin_execute_task.save() + return + + # 需要调度,则调度自身 + if service.interval: + schedule.apply_async(kwargs={"task_id": task_id}, queue=PLUGIN_EXECUTE_QUEUE, countdown=service.interval.next()) + + +@task +def schedule(task_id): + try: + plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) + except PluginExecuteTask.DoesNotExist: + logger.exception("[plugin_execute] schedule error, task not exist, task_id={}".format(task_id)) + return + + # 只有处于运行状态的节点才允许被调度 + if plugin_execute_task.state != State.RUNNING: + logger.exception("[plugin_execute] schedule error, task not exist, task_id={}".format(task_id)) + return + + data = DataObject(inputs=plugin_execute_task.inputs, outputs=plugin_execute_task.outputs) + parent_data = DataObject(inputs=plugin_execute_task.contexts, outputs={}) + + try: + comp_cls = ComponentLibrary.get_component_class(plugin_execute_task.component_code, plugin_execute_task.version) + # 获取service + service = comp_cls.bound_service(name=plugin_execute_task.runtime_attrs.get("name", None)) + # 封装运行时 + service.setup_runtime_attrs(**plugin_execute_task.runtime_attrs) + schedule_success = service.schedule( + data=data, parent_data=parent_data, callback_data=plugin_execute_task.callback_data + ) + plugin_execute_task.outputs = data.outputs + plugin_execute_task.save() + except Exception as e: + # 处理异常情况 + ex_data = traceback.format_exc() + data.outputs.ex_data = ex_data + logger.exception("[plugin_execute] plugin execute failed, err={}".format(e)) + plugin_execute_task.outputs = data.outputs + plugin_execute_task.state = State.FAILED + plugin_execute_task.save() + return + + if not schedule_success: + plugin_execute_task.state = State.FAILED + plugin_execute_task.save() + return + + if service.is_schedule_finished(): + plugin_execute_task.state = State.FINISHED + plugin_execute_task.finish_at = timezone.now() + plugin_execute_task.save() + return + + # 还需要下一次的调度 + # 需要调度,则调度自身 + if service.interval: + schedule.apply_async(kwargs={"task_id": task_id}, queue=PLUGIN_EXECUTE_QUEUE, countdown=service.interval.next()) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0002_auto_20231020_1234.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0002_auto_20231020_1234.py index ae217a0f..26b00ac0 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0002_auto_20231020_1234.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/migrations/0002_auto_20231020_1234.py @@ -1,6 +1,6 @@ # Generated by Django 3.2.18 on 2023-10-20 12:34 -import pipeline.contrib.rollback.fields +import pipeline.contrib.fields from django.db import migrations @@ -14,16 +14,16 @@ class Migration(migrations.Migration): migrations.AlterField( model_name="rollbacknodesnapshot", name="context_values", - field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="pipeline context values"), + field=pipeline.contrib.fields.SerializerField(verbose_name="pipeline context values"), ), migrations.AlterField( model_name="rollbacknodesnapshot", name="inputs", - field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="node inputs"), + field=pipeline.contrib.fields.SerializerField(verbose_name="node inputs"), ), migrations.AlterField( model_name="rollbacknodesnapshot", name="outputs", - field=pipeline.contrib.rollback.fields.SerializerField(verbose_name="node outputs"), + field=pipeline.contrib.fields.SerializerField(verbose_name="node outputs"), ), ] diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py index ab452cfc..cefac0b9 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/models.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- from django.db import models from django.utils.translation import ugettext_lazy as _ +from pipeline.contrib.fields import SerializerField from pipeline.contrib.rollback.constants import TOKEN -from pipeline.contrib.rollback.fields import SerializerField class RollbackToken(models.Model): diff --git a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_plugin_execute.py b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_plugin_execute.py new file mode 100644 index 00000000..2ada1162 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_plugin_execute.py @@ -0,0 +1,117 @@ +# # -*- coding: utf-8 -*- +# """ +# Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +# Edition) available. +# Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +# Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://opensource.org/licenses/MIT +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# """ + +from unittest import TestCase + +from mock.mock import MagicMock +from pipeline.contrib.plugin_execute import api +from pipeline.contrib.plugin_execute.models import PluginExecuteTask +from pipeline.contrib.plugin_execute.tasks import execute, schedule +from pipeline.tests import mock + +mock_execute = MagicMock() +mock_execute.apply_async = MagicMock(return_value=True) + +mock_schedule = MagicMock() +mock_schedule.apply_async = MagicMock(return_value=True) + + +class TestPluginExecuteBase(TestCase): + @mock.patch("pipeline.contrib.plugin_execute.handler.execute", MagicMock(return_value=mock_execute)) + def test_run(self): + task_id = api.run("debug_callback_node", "legacy", {"hello": "world"}, {"hello": "world"}).data + task = PluginExecuteTask.objects.get(id=task_id) + self.assertEqual(task.state, "READY") + self.assertDictEqual(task.callback_data, {}) + self.assertDictEqual(task.contexts, {"hello": "world"}) + self.assertDictEqual(task.inputs, {"hello": "world"}) + + def test_get_state(self): + task_id = api.run("debug_callback_node", "legacy", {"hello": "world"}, {"hello": "world"}).data + state = api.get_state(task_id).data + + self.assertEqual(state["state"], "READY") + self.assertDictEqual(state["callback_data"], {}) + self.assertDictEqual(state["inputs"], {"hello": "world"}) + self.assertDictEqual(state["contexts"], {"hello": "world"}) + + @mock.patch("pipeline.contrib.plugin_execute.handler.execute", MagicMock(return_value=mock_execute)) + def test_retry(self): + task_id = api.run("debug_callback_node", "legacy", {"hello": "world"}, {"hello": "world"}).data + task = PluginExecuteTask.objects.get(id=task_id) + result = api.retry(task_id, {}) + + self.assertFalse(result.result) + + task.state = "FAILED" + task.save() + + result = api.retry(task_id, {"hello": "tim"}, {"hello": "jav"}) + self.assertEqual(result.result, True) + + task.refresh_from_db() + + self.assertEqual(task.state, "READY") + self.assertDictEqual(task.inputs, {"hello": "tim"}) + self.assertDictEqual(task.contexts, {"hello": "jav"}) + + @mock.patch("pipeline.contrib.plugin_execute.handler.schedule", MagicMock(return_value=mock_schedule)) + def test_callback(self): + task_id = api.run("debug_callback_node", "legacy", {"hello": "world"}, {"hello": "world"}).data + task = PluginExecuteTask.objects.get(id=task_id) + result = api.retry(task_id, {}) + + self.assertFalse(result.result) + + task.state = "RUNNING" + task.save() + + result = api.callback(task_id, {"hello": "sandri"}) + self.assertEqual(result.result, True) + + task.refresh_from_db() + self.assertDictEqual(task.callback_data, {"hello": "sandri"}) + + def test_force_fail(self): + task_id = api.run("debug_callback_node", "legacy", {"hello": "world"}, {"hello": "world"}).data + task = PluginExecuteTask.objects.get(id=task_id) + result = api.forced_fail(task_id) + + self.assertFalse(result.result) + + task.state = "RUNNING" + task.save() + + result = api.forced_fail(task_id) + self.assertEqual(result.result, True) + + task.refresh_from_db() + self.assertEqual(task.state, "FAILED") + + def test_execute_task(self): + task_id = api.run("interrupt_dummy_exec_node", "legacy", {"time": 1}, {}).data + execute(task_id) + task = PluginExecuteTask.objects.get(id=task_id) + self.assertEqual(task.state, "FINISHED") + self.assertDictEqual(task.outputs, {"execute_count": 1}) + + def test_schedule_task(self): + task_id = api.run("debug_callback_node", "legacy", {}, {}).data + task = PluginExecuteTask.objects.get(id=task_id) + task.callback_data = {"bit": 1} + task.save() + + execute(task_id) + schedule(task_id) + task.refresh_from_db() + self.assertEqual(task.state, "FINISHED") diff --git a/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/plugin_execute/__init__.py b/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/plugin_execute/__init__.py new file mode 100644 index 00000000..26a6d1c2 --- /dev/null +++ b/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/plugin_execute/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/plugin_execute/test_plugin_execute.py b/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/plugin_execute/test_plugin_execute.py new file mode 100644 index 00000000..28ad1ef1 --- /dev/null +++ b/runtime/bamboo-pipeline/test/eri_imp_test_use/tests/plugin_execute/test_plugin_execute.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +import time + +from pipeline.contrib.plugin_execute import api + + +def test_run_plugin_no_schedule(): + # 测试execute的情况 + task_id = api.run("debug_no_schedule_node", "legacy", {}, {}).data + state = api.get_state(task_id).data + assert state["state"] == "READY" + time.sleep(2) + state = api.get_state(task_id).data + assert state["state"] == "FINISHED" + + +def test_run_plugin_with_schedule(): + # 测试schedule的情况 + task_id = api.run("schedule_node", "legacy", {}, {}).data + state = api.get_state(task_id).data + assert state["state"] == "READY" + time.sleep(20) + state = api.get_state(task_id).data + assert state["state"] == "FINISHED" + assert state["count"] == 5 + + +def test_run_plugin_with_callback(): + # 测试callback的情况 + task_id = api.run("hook_callback_node", "legacy", {}, {}).data + state = api.get_state(task_id).data + assert state["state"] == "READY" + time.sleep(5) + state = api.get_state(task_id).data + assert state["state"] == "RUNNING" + + api.callback(task_id, {"bit": 0}) + time.sleep(10) + + state = api.get_state(task_id).data + assert state["state"] == "FAILED" + + api.retry(task_id, inputs={}) + time.sleep(5) + state = api.get_state(task_id).data + assert state["state"] == "RUNNING" + + api.callback(task_id, {"bit": 1}) + time.sleep(5) + state = api.get_state(task_id).data + assert state["state"] == "RUNNING" + + +def test_run_plugin_with_callback_success(): + task_id = api.run("debug_callback_node", "legacy", {}, {}).data + state = api.get_state(task_id).data + + assert state["state"] == "READY" + time.sleep(5) + state = api.get_state(task_id).data + assert state["state"] == "RUNNING" + + api.callback(task_id, {"bit": 1}) + time.sleep(10) + + state = api.get_state(task_id).data + assert state["state"] == "FINISHED" + + +def test_run_plugin_with_force_fail(): + task_id = api.run("debug_callback_node", "legacy", {}, {}).data + state = api.get_state(task_id).data + + assert state["state"] == "READY" + time.sleep(5) + state = api.get_state(task_id).data + assert state["state"] == "RUNNING" + + api.forced_fail(task_id) + time.sleep(3) + + state = api.get_state(task_id).data + assert state["state"] == "FAILED" diff --git a/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py b/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py index 3288539e..3d7e8ed3 100755 --- a/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py +++ b/runtime/bamboo-pipeline/test/pipeline_sdk_use/settings.py @@ -21,7 +21,6 @@ CELERY_QUEUES.extend(queues.CELERY_QUEUES) # noqa CELERY_QUEUES.extend(queues.QueueResolver("api").queues()) # noqa - step.PromServerStep.port = 8002 app = Celery("proj") app.config_from_object("django.conf:settings") @@ -55,13 +54,14 @@ "pipeline", "pipeline.log", "pipeline.engine", - "pipeline.contrib.rollback", "pipeline.contrib.node_timer_event", "pipeline.component_framework", "pipeline.variable_framework", "pipeline.django_signal_valve", "pipeline.contrib.periodic_task", "pipeline.contrib.node_timeout", + "pipeline.contrib.rollback", + "pipeline.contrib.plugin_execute", "django_celery_beat", "pipeline_test_use", "variable_app", @@ -156,7 +156,6 @@ STATIC_URL = "/static/" - ENABLE_EXAMPLE_COMPONENTS = True BROKER_VHOST = "/" @@ -181,3 +180,6 @@ # } # } # ] + + +PLUGIN_EXECUTE_QUEUE = "default" diff --git a/runtime/bamboo-pipeline/test/pipeline_test_use/components/collections/atom.py b/runtime/bamboo-pipeline/test/pipeline_test_use/components/collections/atom.py index e5fdba4e..b2ab9dc2 100755 --- a/runtime/bamboo-pipeline/test/pipeline_test_use/components/collections/atom.py +++ b/runtime/bamboo-pipeline/test/pipeline_test_use/components/collections/atom.py @@ -18,8 +18,9 @@ class HookMixin: __need_run_hook__ = True def recorder(self, hook: HookType, data, parent_data, callback_data=None): - self.logger.info("hook_debug_node({}) id: {}".format(hook.value, self.id)) - self.logger.info("hook_debug_node({}) root_pipeline_id: {}".format(hook.value, self.root_pipeline_id)) + if hasattr(hook.value, "id"): + self.logger.info("hook_debug_node({}) id: {}".format(hook.value, self.id)) + self.logger.info("hook_debug_node({}) root_pipeline_id: {}".format(hook.value, self.root_pipeline_id)) logger.info("hook_debug_node hook(%s) data %s ", hook.value, pprint.pformat(data.inputs)) logger.info("hook_debug_node hook(%s) parent data %s ", hook.value, pprint.pformat(parent_data.inputs)) logger.info("hook_debug_node hook(%s) output data %s ", hook.value, pprint.pformat(data.outputs)) @@ -441,6 +442,32 @@ class CallbackComponent(Component): form = "index.html" +class DebugCallbackService(Service): + __need_schedule__ = True + interval = None + + def execute(self, data, parent_data): + return True + + def schedule(self, data, parent_data, callback_data=None): + if callback_data: + if int(callback_data.get("bit", 1)) == 1: + self.finish_schedule() + return True + + return False + + def outputs_format(self): + return [] + + +class DebugCallbackComponent(Component): + name = "callback component" + code = "debug_callback_node" + bound_service = DebugCallbackService + form = "index.html" + + class HookCallbackService(HookMixin, CallbackService): pass @@ -469,7 +496,6 @@ def schedule(self, data, parent_data, callback_data=None): logger.info("[{}]: callback_data={}".format(_scheduled_times, callback_data)) if callback_data: if int(callback_data.get("bit", 0)) == 0: - print("hahacai") return False _scheduled_times += 1 @@ -646,7 +672,7 @@ def outputs_format(self): return [] -class InterruptScheduleComponent(Component): +class InterruptRaiseScheduleComponent(Component): name = "debug 组件" code = "interrupt_raise_test" bound_service = InterruptRaiseService