From 88e426fa32e5495780dbc8b7476f212a883e8bb9 Mon Sep 17 00:00:00 2001 From: normal-wls Date: Tue, 16 Apr 2024 19:36:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20celery=205=20&=20redis=20=E4=BE=9D?= =?UTF-8?q?=E8=B5=96=E5=8C=85=E5=8D=87=E7=BA=A7=E9=80=82=E9=85=8D=20(#230)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: pipeline dependencies version update (#227) * feat: pipeline dependencies version update * minor: flake8 fix * minor: unit test fix * Update pr_check.yml * minor: git ci fix * minor: git ci fix --------- Co-authored-by: NULL * feat: celery 4 & 5 异步任务用法兼容 * feat: pipeline release 3.29.3 * fix: fix engine unittest coverage ci --------- Co-authored-by: NULL --- .github/workflows/engine_unittest.yml | 5 +- runtime/bamboo-pipeline/pipeline/__init__.py | 2 +- runtime/bamboo-pipeline/pipeline/apps.py | 5 +- .../pipeline/contrib/celery_tools/__init__.py | 12 + .../pipeline/contrib/celery_tools/periodic.py | 47 +++ .../pipeline/contrib/node_timeout/tasks.py | 6 +- .../contrib/node_timer_event/tasks.py | 6 +- .../pipeline/contrib/periodic_task/tasks.py | 35 ++- .../pipeline/contrib/plugin_execute/tasks.py | 6 +- .../pipeline/contrib/rollback/tasks.py | 6 +- .../pipeline/contrib/statistics/tasks.py | 6 +- .../pipeline/engine/models/core.py | 94 ++++-- .../bamboo-pipeline/pipeline/engine/tasks.py | 25 +- .../pipeline/eri/celery/tasks.py | 8 +- runtime/bamboo-pipeline/pipeline/log/tasks.py | 2 +- .../celery_version_adapter/__init__.py | 12 + .../celery_version_adapter/celery4.py | 44 +++ .../celery_version_adapter/celery5.py | 29 ++ .../celery_version_adapter/celerybeat4.py | 43 +++ .../celery_version_adapter/celerybeat5.py | 29 ++ .../pipeline/management/commands/celery.py | 35 +-- .../management/commands/celerybeat.py | 46 ++- runtime/bamboo-pipeline/poetry.lock | 276 +++++++++++++----- runtime/bamboo-pipeline/pyproject.toml | 25 +- .../test/eri_chaos/celery_tasks.py | 6 +- 25 files changed, 608 insertions(+), 202 deletions(-) create mode 100644 runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py create mode 100644 runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py create mode 100644 runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/__init__.py create mode 100644 runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery4.py create mode 100644 runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery5.py create mode 100644 runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat4.py create mode 100644 runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat5.py diff --git a/.github/workflows/engine_unittest.yml b/.github/workflows/engine_unittest.yml index 14ad9243..b9111181 100644 --- a/.github/workflows/engine_unittest.yml +++ b/.github/workflows/engine_unittest.yml @@ -34,6 +34,7 @@ jobs: run: python -m coverage xml - name: Upload coverage to Codecov - uses: codecov/codecov-action@v1 + uses: codecov/codecov-action@v4 with: - fail_ci_if_error: true + token: ${{ secrets.CODECOV_TOKEN }} + diff --git a/runtime/bamboo-pipeline/pipeline/__init__.py b/runtime/bamboo-pipeline/pipeline/__init__.py index 0db56ada..196e0695 100644 --- a/runtime/bamboo-pipeline/pipeline/__init__.py +++ b/runtime/bamboo-pipeline/pipeline/__init__.py @@ -13,4 +13,4 @@ default_app_config = "pipeline.apps.PipelineConfig" -__version__ = "3.29.2" +__version__ = "3.29.3" diff --git a/runtime/bamboo-pipeline/pipeline/apps.py b/runtime/bamboo-pipeline/pipeline/apps.py index f1f2f831..8e8eb166 100644 --- a/runtime/bamboo-pipeline/pipeline/apps.py +++ b/runtime/bamboo-pipeline/pipeline/apps.py @@ -19,7 +19,10 @@ from django.apps import AppConfig from django.conf import settings from redis.sentinel import Sentinel -from rediscluster import RedisCluster +try: + from redis.cluster import RedisCluster +except ImportError: + from rediscluster import RedisCluster logger = logging.getLogger("root") diff --git a/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py b/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py new file mode 100644 index 00000000..26a6d1c2 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py b/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py new file mode 100644 index 00000000..ce5d64f7 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from celery import Task, current_app +from celery.schedules import maybe_schedule + + +class PipelinePeriodicTask(Task): + """A task that adds itself to the :setting:`beat_schedule` setting.""" + + abstract = True + ignore_result = True + relative = False + options = None + compat = True + + def __init__(self): + if not hasattr(self, 'run_every'): + raise NotImplementedError( + 'Periodic tasks must have a run_every attribute') + self.run_every = maybe_schedule(self.run_every, self.relative) + super(PipelinePeriodicTask, self).__init__() + + @classmethod + def on_bound(cls, app): + app.conf.beat_schedule[cls.name] = { + 'task': cls.name, + 'schedule': cls.run_every, + 'args': (), + 'kwargs': {}, + 'options': cls.options or {}, + 'relative': cls.relative, + } + + +def periodic_task(*args, **options): + """Deprecated decorator, please use :setting:`beat_schedule`.""" + return current_app.task(**dict({'base': PipelinePeriodicTask}, **options)) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py index 7e63f6ab..b6d297db 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py @@ -13,7 +13,7 @@ import json import logging -from celery import task +from celery import current_app from pipeline.contrib.node_timeout.settings import node_timeout_settings from pipeline.eri.models import State, Process @@ -24,7 +24,7 @@ logger = logging.getLogger("celery") -@task(acks_late=True) +@current_app.task(acks_late=True) def dispatch_timeout_nodes(record_id: int): record = TimeoutNodesRecord.objects.get(id=record_id) nodes = json.loads(record.timeout_nodes) @@ -40,7 +40,7 @@ def dispatch_timeout_nodes(record_id: int): ) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def execute_node_timeout_strategy(node_id, version): timeout_config = TimeoutNodeConfig.objects.filter(node_id=node_id).only("root_pipeline_id", "action").first() if timeout_config is None: diff --git a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py index 44970a5a..fd904c55 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py @@ -14,7 +14,7 @@ import logging from typing import Any, Dict, List, Type, Union -from celery import task +from celery import current_app from pipeline.contrib.node_timer_event.adapter import NodeTimerEventBaseAdapter from pipeline.contrib.node_timer_event.handlers import ActionManager from pipeline.contrib.node_timer_event.models import ExpiredNodesRecord @@ -24,7 +24,7 @@ logger = logging.getLogger("celery") -@task(acks_late=True) +@current_app.task(acks_late=True) def dispatch_expired_nodes(record_id: int): record: ExpiredNodesRecord = ExpiredNodesRecord.objects.get(id=record_id) node_keys: List[str] = json.loads(record.nodes) @@ -62,7 +62,7 @@ def dispatch_expired_nodes(record_id: int): logger.info("[dispatch_expired_nodes] record deleted: record -> %s", record_id) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def execute_node_timer_event_action(node_id: str, version: str, index: int): adapter_class: Type[NodeTimerEventBaseAdapter] = node_timer_event_settings.adapter_class diff --git a/runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py index 90da26e2..76d0d366 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py @@ -16,7 +16,7 @@ import traceback import pytz -from celery import task +from celery import current_app from django.utils import timezone from django.utils.module_loading import import_string from bamboo_engine import api as bamboo_engine_api @@ -35,7 +35,7 @@ logger = logging.getLogger("celery") -@task(ignore_result=True) +@current_app.task(ignore_result=True) def periodic_task_start(*args, **kwargs): try: periodic_task = PeriodicTask.objects.get(id=kwargs["period_task_id"]) @@ -69,19 +69,28 @@ def periodic_task_start(*args, **kwargs): ) result = instance.start( - periodic_task.creator, check_workers=False, priority=periodic_task.priority, queue=periodic_task.queue, + periodic_task.creator, + check_workers=False, + priority=periodic_task.priority, + queue=periodic_task.queue, ) except Exception: et = traceback.format_exc() logger.error(et) PeriodicTaskHistory.objects.record_schedule( - periodic_task=periodic_task, pipeline_instance=None, ex_data=et, start_success=False, + periodic_task=periodic_task, + pipeline_instance=None, + ex_data=et, + start_success=False, ) return if not result.result: PeriodicTaskHistory.objects.record_schedule( - periodic_task=periodic_task, pipeline_instance=None, ex_data=result.message, start_success=False, + periodic_task=periodic_task, + pipeline_instance=None, + ex_data=result.message, + start_success=False, ) return @@ -93,7 +102,7 @@ def periodic_task_start(*args, **kwargs): PeriodicTaskHistory.objects.record_schedule(periodic_task=periodic_task, pipeline_instance=instance, ex_data="") -@task(ignore_result=True) +@current_app.task(ignore_result=True) def bamboo_engine_periodic_task_start(*args, **kwargs): try: periodic_task = PeriodicTask.objects.get(id=kwargs["period_task_id"]) @@ -147,16 +156,24 @@ def bamboo_engine_periodic_task_start(*args, **kwargs): et = traceback.format_exc() logger.error(et) PeriodicTaskHistory.objects.record_schedule( - periodic_task=periodic_task, pipeline_instance=None, ex_data=et, start_success=False, + periodic_task=periodic_task, + pipeline_instance=None, + ex_data=et, + start_success=False, ) return if not result.result: PipelineInstance.objects.filter(id=instance.instance_id).update( - start_time=None, is_started=False, executor="", + start_time=None, + is_started=False, + executor="", ) PeriodicTaskHistory.objects.record_schedule( - periodic_task=periodic_task, pipeline_instance=None, ex_data=result.message, start_success=False, + periodic_task=periodic_task, + pipeline_instance=None, + ex_data=result.message, + start_success=False, ) return diff --git a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py index b1898431..eccf1908 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py @@ -14,7 +14,7 @@ import logging import traceback -from celery import task +from celery import current_app from django.utils import timezone from pipeline.component_framework.library import ComponentLibrary from pipeline.conf.default_settings import PLUGIN_EXECUTE_QUEUE @@ -25,7 +25,7 @@ logger = logging.getLogger("celery") -@task +@current_app.task def execute(task_id): try: plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) @@ -89,7 +89,7 @@ def execute(task_id): ) -@task +@current_app.task def schedule(task_id): try: plugin_execute_task = PluginExecuteTask.objects.get(id=task_id) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py index 259df0cb..fde31d89 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py @@ -2,7 +2,7 @@ import json import logging -from celery import task +from celery import current_app from django.conf import settings from django.db import transaction from pipeline.conf.default_settings import ROLLBACK_QUEUE @@ -287,7 +287,7 @@ def rollback(self): raise e -@task +@current_app.task def token_rollback(snapshot_id, node_id, retry=False, retry_data=None): """ snapshot_id 本次回滚的快照id @@ -296,6 +296,6 @@ def token_rollback(snapshot_id, node_id, retry=False, retry_data=None): TokenRollbackTaskHandler(snapshot_id=snapshot_id, node_id=node_id, retry=retry, retry_data=retry_data).rollback() -@task +@current_app.task def any_rollback(snapshot_id): AnyRollbackHandler(snapshot_id=snapshot_id).rollback() diff --git a/runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py b/runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py index a18ab110..f6e921f1 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py @@ -16,7 +16,7 @@ import ujson as json from copy import deepcopy -from celery import task +from celery import current_app from bamboo_engine import api as bamboo_engine_api from pipeline.component_framework.constants import LEGACY_PLUGINS_VERSION @@ -112,7 +112,7 @@ def recursive_collect_components(activities, status_tree, instance_id, stack=Non return component_list -@task +@current_app.task def pipeline_post_save_statistics_task(instance_id): instance = PipelineInstance.objects.get(instance_id=instance_id) # 统计流程标准插件个数,子流程个数,网关个数 @@ -134,7 +134,7 @@ def pipeline_post_save_statistics_task(instance_id): ) -@task +@current_app.task def pipeline_archive_statistics_task(instance_id): instance = PipelineInstance.objects.get(instance_id=instance_id) engine_ver = 1 diff --git a/runtime/bamboo-pipeline/pipeline/engine/models/core.py b/runtime/bamboo-pipeline/pipeline/engine/models/core.py index aaa7551b..6b621559 100644 --- a/runtime/bamboo-pipeline/pipeline/engine/models/core.py +++ b/runtime/bamboo-pipeline/pipeline/engine/models/core.py @@ -17,7 +17,12 @@ import traceback from celery import current_app -from celery.task.control import revoke + +try: + from celery.task.control import revoke +except ModuleNotFoundError: + revoke = current_app.control.revoke + from django.db import models, transaction from django.utils import timezone from django.utils.translation import ugettext_lazy as _ @@ -89,10 +94,16 @@ def prepare_for_pipeline(self, pipeline): """ # init runtime info snapshot = ProcessSnapshot.objects.create_snapshot( - pipeline_stack=utils.Stack(), children=[], root_pipeline=pipeline, subprocess_stack=utils.Stack(), + pipeline_stack=utils.Stack(), + children=[], + root_pipeline=pipeline, + subprocess_stack=utils.Stack(), ) process = self.create( - id=node_uniqid(), root_pipeline_id=pipeline.id, current_node_id=pipeline.start_event.id, snapshot=snapshot, + id=node_uniqid(), + root_pipeline_id=pipeline.id, + current_node_id=pipeline.start_event.id, + snapshot=snapshot, ) process.push_pipeline(pipeline) process.save() @@ -362,7 +373,9 @@ def adjust_status(self, adjust_scope=None): if node_state in {states.FAILED, states.SUSPENDED}: # if current node failed or suspended Status.objects.batch_transit( - id_list=self.subprocess_stack, state=states.BLOCKED, from_state=states.RUNNING, + id_list=self.subprocess_stack, + state=states.BLOCKED, + from_state=states.RUNNING, ) Status.objects.transit(self.root_pipeline.id, to_state=states.BLOCKED, is_pipeline=True) elif states.SUSPENDED in set(subproc_states): @@ -372,7 +385,9 @@ def adjust_status(self, adjust_scope=None): elif pipeline_state == states.SUSPENDED: # if root pipeline suspended Status.objects.batch_transit( - id_list=self.subprocess_stack, state=pipeline_state, from_state=states.RUNNING, + id_list=self.subprocess_stack, + state=pipeline_state, + from_state=states.RUNNING, ) def wake_up(self): @@ -475,10 +490,14 @@ def destroy_and_wake_up_parent(self, destination_id): else: if parent.blocked_by_failure_or_suspended(): Status.objects.batch_transit( - id_list=self.subprocess_stack, state=states.BLOCKED, from_state=states.RUNNING, + id_list=self.subprocess_stack, + state=states.BLOCKED, + from_state=states.RUNNING, ) Status.objects.transit( - id=self.root_pipeline.id, to_state=states.BLOCKED, is_pipeline=True, + id=self.root_pipeline.id, + to_state=states.BLOCKED, + is_pipeline=True, ) parent.save(save_snapshot=False) @@ -540,7 +559,8 @@ def exit_gracefully(self, e): if not result.result: logger.error( "process({process_id}) exit_gracefully status transit failed, current_node :{node_id}".format( - process_id=self.id, node_id=current_node.id if current_node else self.current_node_id, + process_id=self.id, + node_id=current_node.id if current_node else self.current_node_id, ) ) self.sleep(adjust_status=True) @@ -615,7 +635,9 @@ def build_relationship(self, ancestor_id, descendant_id): relationships = [NodeRelationship(ancestor_id=descendant_id, descendant_id=descendant_id, distance=0)] for ancestor in ancestors: rel = NodeRelationship( - ancestor_id=ancestor.ancestor_id, descendant_id=descendant_id, distance=ancestor.distance + 1, + ancestor_id=ancestor.ancestor_id, + descendant_id=descendant_id, + distance=ancestor.distance + 1, ) relationships.append(rel) self.bulk_create(relationships) @@ -630,12 +652,26 @@ class NodeRelationship(models.Model): objects = RelationshipManager() def __unicode__(self): - return str("#{} -({})-> #{}".format(self.ancestor_id, self.distance, self.descendant_id,)) + return str( + "#{} -({})-> #{}".format( + self.ancestor_id, + self.distance, + self.descendant_id, + ) + ) class StatusManager(models.Manager): def transit( - self, id, to_state, is_pipeline=False, appoint=False, start=False, name="", version=None, unchanged_pass=False, + self, + id, + to_state, + is_pipeline=False, + appoint=False, + start=False, + name="", + version=None, + unchanged_pass=False, ): """ 尝试改变某个节点的状态 @@ -679,7 +715,10 @@ def transit( return ActionResult(result=True, message="success", extra=status) if states.can_transit( - from_state=status.state, to_state=to_state, is_pipeline=is_pipeline, appoint=appoint, + from_state=status.state, + to_state=to_state, + is_pipeline=is_pipeline, + appoint=appoint, ): # 在冻结状态下不能改变 pipeline 的状态 @@ -688,11 +727,17 @@ def transit( if subprocess_rel: process = PipelineProcess.objects.get(id=subprocess_rel[0].process_id) if process.is_frozen: - return ActionResult(result=False, message="engine is frozen, can not perform operation",) + return ActionResult( + result=False, + message="engine is frozen, can not perform operation", + ) processes = PipelineProcess.objects.filter(root_pipeline_id=id) if processes and processes[0].is_frozen: - return ActionResult(result=False, message="engine is frozen, can not perform operation",) + return ActionResult( + result=False, + message="engine is frozen, can not perform operation", + ) if name: status.name = name @@ -772,7 +817,9 @@ def prepare_for_pipeline(self, pipeline): cls_str = str(pipeline.__class__) cls_name = pipeline.__class__.__name__[:NAME_MAX_LENGTH] self.create( - id=pipeline.id, state=states.READY, name=cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name, + id=pipeline.id, + state=states.READY, + name=cls_str if len(cls_str) <= NAME_MAX_LENGTH else cls_name, ) def fail(self, node, ex_data): @@ -1287,7 +1334,11 @@ def record(self, name, kwargs, type, extra_kwargs, exec_trace): save_kwargs = json.dumps(save_kwargs) return self.create( - name=name, kwargs=save_kwargs, type=type, extra_kwargs=save_extra_kwargs, exec_trace=exec_trace, + name=name, + kwargs=save_kwargs, + type=type, + extra_kwargs=save_extra_kwargs, + exec_trace=exec_trace, ) def resend(self, id): @@ -1341,7 +1392,10 @@ def resend(self): ) elif self.type == self.TASK_TYPE_NODE: NodeCeleryTask.objects.start_task( - node_id=self.extra_kwargs_dict["node_id"], task=task, kwargs=self.kwargs_dict, record_error=False, + node_id=self.extra_kwargs_dict["node_id"], + task=task, + kwargs=self.kwargs_dict, + record_error=False, ) elif self.type == self.TASK_TYPE_SCHEDULE: ScheduleCeleryTask.objects.start_task( @@ -1366,7 +1420,11 @@ def watch(cls, name, kwargs, type, extra_kwargs): except Exception: logger.exception("celery task({}) watcher catch error.".format(name)) cls.objects.record( - name=name, kwargs=kwargs, type=type, extra_kwargs=extra_kwargs, exec_trace=traceback.format_exc(), + name=name, + kwargs=kwargs, + type=type, + extra_kwargs=extra_kwargs, + exec_trace=traceback.format_exc(), ) # raise specific exception to indicate that send fail task have been catched raise exceptions.CeleryFailedTaskCatchException(name) diff --git a/runtime/bamboo-pipeline/pipeline/engine/tasks.py b/runtime/bamboo-pipeline/pipeline/engine/tasks.py index 3a6470b1..7bee23db 100644 --- a/runtime/bamboo-pipeline/pipeline/engine/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/engine/tasks.py @@ -14,12 +14,12 @@ import datetime import logging -from celery import task +from celery import current_app from celery.schedules import crontab -from celery.task import periodic_task from dateutil.relativedelta import relativedelta from django.apps import apps -from django.db import connection, transaction +from django.db import transaction, connection + from pipeline.conf import default_settings from pipeline.core.pipeline import Pipeline from pipeline.engine import api, signals, states @@ -35,11 +35,12 @@ Status, ) from pipeline.models import PipelineInstance +from pipeline.contrib.celery_tools.periodic import periodic_task logger = logging.getLogger("celery") -@task(ignore_result=True) +@current_app.task(ignore_result=True) def process_unfreeze(process_id): process = PipelineProcess.objects.get(id=process_id) if not process.is_alive: @@ -49,7 +50,7 @@ def process_unfreeze(process_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def start(process_id): process = PipelineProcess.objects.get(id=process_id) if not process.is_alive: @@ -68,7 +69,7 @@ def start(process_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def dispatch(child_id): process = PipelineProcess.objects.get(id=child_id) if not process.is_alive: @@ -78,7 +79,7 @@ def dispatch(child_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def process_wake_up(process_id, current_node_id=None, call_from_child=False): process = PipelineProcess.objects.get(id=process_id) if not process.is_alive: @@ -104,7 +105,7 @@ def process_wake_up(process_id, current_node_id=None, call_from_child=False): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def wake_up(process_id): process = PipelineProcess.objects.get(id=process_id) if not process.is_alive: @@ -115,7 +116,7 @@ def wake_up(process_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def batch_wake_up(process_id_list, pipeline_id): # success_when_unchanged to deal with parallel gateway subprocess wake up action_result = Status.objects.transit(pipeline_id, to_state=states.RUNNING, is_pipeline=True, unchanged_pass=True) @@ -127,7 +128,7 @@ def batch_wake_up(process_id_list, pipeline_id): ProcessCeleryTask.objects.bind(process_id, task_id) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def wake_from_schedule(process_id, service_act_id): process = PipelineProcess.objects.get(id=process_id) process.wake_up() @@ -137,12 +138,12 @@ def wake_from_schedule(process_id, service_act_id): runtime.run_loop(process) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def service_schedule(process_id, schedule_id, data_id=None): schedule.schedule(process_id, schedule_id, data_id) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def node_timeout_check(node_id, version, root_pipeline_id): NodeCeleryTask.objects.destroy(node_id) state = Status.objects.state_for(node_id, version=version, may_not_exist=True) diff --git a/runtime/bamboo-pipeline/pipeline/eri/celery/tasks.py b/runtime/bamboo-pipeline/pipeline/eri/celery/tasks.py index 76bc4c6f..77c160da 100644 --- a/runtime/bamboo-pipeline/pipeline/eri/celery/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/eri/celery/tasks.py @@ -14,9 +14,9 @@ import logging from typing import Optional -from celery import task -from celery.decorators import periodic_task +from celery import current_app from celery.schedules import crontab +from pipeline.contrib.celery_tools.periodic import periodic_task from django.conf import settings from bamboo_engine import metrics @@ -47,7 +47,7 @@ def _observe_message_delay(metric: metrics.Histogram, headers: dict): logger.exception("%s observe err" % metric) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def execute( process_id: int, node_id: str, @@ -86,7 +86,7 @@ def execute( ) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def schedule( process_id: int, node_id: str, diff --git a/runtime/bamboo-pipeline/pipeline/log/tasks.py b/runtime/bamboo-pipeline/pipeline/log/tasks.py index 394a1d5b..a5aef126 100644 --- a/runtime/bamboo-pipeline/pipeline/log/tasks.py +++ b/runtime/bamboo-pipeline/pipeline/log/tasks.py @@ -13,11 +13,11 @@ import logging -from celery.decorators import periodic_task from celery.schedules import crontab from django.conf import settings from pipeline.log.models import LogEntry +from pipeline.contrib.celery_tools.periodic import periodic_task logger = logging.getLogger(__name__) diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/__init__.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/__init__.py new file mode 100644 index 00000000..26a6d1c2 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/__init__.py @@ -0,0 +1,12 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery4.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery4.py new file mode 100644 index 00000000..f34dc676 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery4.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from __future__ import absolute_import, unicode_literals + +from optparse import make_option as Option + +from celery.bin import celery + +from pipeline.management.commands.app import app +from pipeline.management.commands.base import CeleryCommand + +base = celery.CeleryCommand(app=app) + + +class Command(CeleryCommand): + """The celery command.""" + + help = "celery commands, see celery help" + options = ( + Option("-A", "--app", default=None), + Option("--broker", default=None), + Option("--loader", default=None), + Option("--config", default=None), + Option("--workdir", default=None, dest="working_directory"), + Option("--result-backend", default=None), + Option("--no-color", "-C", action="store_true", default=None), + Option("--quiet", "-q", action="store_true"), + ) + if base.get_options() is not None: + options = options + CeleryCommand.options + base.get_options() + + def run_from_argv(self, argv): + argv = self.handle_default_options(argv) + base.execute_from_commandline(["{0[0]} {0[1]}".format(argv)] + argv[2:],) diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery5.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery5.py new file mode 100644 index 00000000..49339fb1 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celery5.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from celery.bin.celery import celery +from click.exceptions import Exit +from django.core.management import BaseCommand + + +class Command(BaseCommand): + """The celery command.""" + + help = "celery commands, see celery help" + + def run_from_argv(self, argv): + try: + celery.main(args=argv[2:], standalone_mode=False) + except Exit as e: + print(f"celery command error: {e}") + return e.exit_code diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat4.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat4.py new file mode 100644 index 00000000..2ef3aef4 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat4.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from __future__ import absolute_import, unicode_literals + +from optparse import make_option as Option + +from celery.bin import beat + +from pipeline.management.commands.app import app +from pipeline.management.commands.base import CeleryCommand + +beat = beat.beat(app=app) + + +class Command(CeleryCommand): + """Run the celery periodic task scheduler.""" + + help = 'Old alias to the "celery beat" command.' + options = ( + Option("-A", "--app", default=None), + Option("--broker", default=None), + Option("--loader", default=None), + Option("--config", default=None), + Option("--workdir", default=None, dest="working_directory"), + Option("--result-backend", default=None), + Option("--no-color", "-C", action="store_true", default=None), + Option("--quiet", "-q", action="store_true"), + ) + if beat.get_options() is not None: + options = options + CeleryCommand.options + beat.get_options() + + def handle(self, *args, **options): + beat.run(*args, **options) diff --git a/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat5.py b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat5.py new file mode 100644 index 00000000..88ec7be5 --- /dev/null +++ b/runtime/bamboo-pipeline/pipeline/management/celery_version_adapter/celerybeat5.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +""" +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" + +from celery.bin.celery import celery +from click.exceptions import Exit +from django.core.management import BaseCommand + + +class Command(BaseCommand): + """The celery command.""" + + help = "celery commands, see celery help" + + def run_from_argv(self, argv): + try: + celery.main(args=["beat", *argv[2:]], standalone_mode=False) + except Exit as e: + print(f"celery command error: {e}") + return e.exit_code diff --git a/runtime/bamboo-pipeline/pipeline/management/commands/celery.py b/runtime/bamboo-pipeline/pipeline/management/commands/celery.py index f34dc676..4dd3bc9a 100644 --- a/runtime/bamboo-pipeline/pipeline/management/commands/celery.py +++ b/runtime/bamboo-pipeline/pipeline/management/commands/celery.py @@ -10,35 +10,14 @@ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. """ -from __future__ import absolute_import, unicode_literals +import celery -from optparse import make_option as Option +major_version = celery.VERSION.major -from celery.bin import celery -from pipeline.management.commands.app import app -from pipeline.management.commands.base import CeleryCommand +if major_version < 5: + from ..celery_version_adapter.celery4 import Command +else: + from ..celery_version_adapter.celery5 import Command -base = celery.CeleryCommand(app=app) - - -class Command(CeleryCommand): - """The celery command.""" - - help = "celery commands, see celery help" - options = ( - Option("-A", "--app", default=None), - Option("--broker", default=None), - Option("--loader", default=None), - Option("--config", default=None), - Option("--workdir", default=None, dest="working_directory"), - Option("--result-backend", default=None), - Option("--no-color", "-C", action="store_true", default=None), - Option("--quiet", "-q", action="store_true"), - ) - if base.get_options() is not None: - options = options + CeleryCommand.options + base.get_options() - - def run_from_argv(self, argv): - argv = self.handle_default_options(argv) - base.execute_from_commandline(["{0[0]} {0[1]}".format(argv)] + argv[2:],) +Command = Command diff --git a/runtime/bamboo-pipeline/pipeline/management/commands/celerybeat.py b/runtime/bamboo-pipeline/pipeline/management/commands/celerybeat.py index 73e0cfe7..4aadd198 100644 --- a/runtime/bamboo-pipeline/pipeline/management/commands/celerybeat.py +++ b/runtime/bamboo-pipeline/pipeline/management/commands/celerybeat.py @@ -1,36 +1,24 @@ +# -*- coding: utf-8 -*- """ - -Start the celery clock service from the Django management command. - +Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community +Edition) available. +Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. """ -from __future__ import absolute_import, unicode_literals - -from optparse import make_option as Option - -from celery.bin import beat - -from pipeline.management.commands.app import app -from pipeline.management.commands.base import CeleryCommand -beat = beat.beat(app=app) +import celery +major_version = celery.VERSION.major -class Command(CeleryCommand): - """Run the celery periodic task scheduler.""" - help = 'Old alias to the "celery beat" command.' - options = ( - Option("-A", "--app", default=None), - Option("--broker", default=None), - Option("--loader", default=None), - Option("--config", default=None), - Option("--workdir", default=None, dest="working_directory"), - Option("--result-backend", default=None), - Option("--no-color", "-C", action="store_true", default=None), - Option("--quiet", "-q", action="store_true"), - ) - if beat.get_options() is not None: - options = options + CeleryCommand.options + beat.get_options() +if major_version < 5: + from ..celery_version_adapter.celerybeat4 import Command +else: + from ..celery_version_adapter.celerybeat5 import Command - def handle(self, *args, **options): - beat.run(*args, **options) +Command = Command diff --git a/runtime/bamboo-pipeline/poetry.lock b/runtime/bamboo-pipeline/poetry.lock index 2cde3dfb..74ebac20 100644 --- a/runtime/bamboo-pipeline/poetry.lock +++ b/runtime/bamboo-pipeline/poetry.lock @@ -1,13 +1,13 @@ [[package]] name = "amqp" -version = "2.6.1" +version = "5.2.0" description = "Low-level AMQP client for Python (fork of amqplib)." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = ">=3.6" [package.dependencies] -vine = ">=1.1.3,<5.0.0a1" +vine = ">=5.0.0,<6.0.0" [[package]] name = "appdirs" @@ -31,6 +31,17 @@ typing-extensions = {version = "*", markers = "python_version < \"3.8\""} [package.extras] tests = ["pytest", "pytest-asyncio", "mypy (>=0.800)"] +[[package]] +name = "async-timeout" +version = "4.0.2" +description = "Timeout context manager for asyncio programs" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +typing-extensions = {version = ">=3.6.5", markers = "python_version < \"3.8\""} + [[package]] name = "atomicwrites" version = "1.4.1" @@ -132,44 +143,55 @@ urllib3 = ">=1.25.4,<1.27" [package.extras] crt = ["awscrt (==0.13.8)"] +[[package]] +name = "cached-property" +version = "1.5.2" +description = "A decorator for caching properties in classes." +category = "main" +optional = false +python-versions = "*" + [[package]] name = "celery" -version = "4.4.7" +version = "5.1.2" description = "Distributed Task Queue." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = ">=3.6," [package.dependencies] -billiard = ">=3.6.3.0,<4.0" -kombu = ">=4.6.10,<4.7" +billiard = ">=3.6.4.0,<4.0" +click = ">=7.0,<8.0" +click-didyoumean = ">=0.0.3" +click-plugins = ">=1.1.1" +click-repl = ">=0.1.6" +kombu = ">=5.1.0,<6.0" pytz = ">0.0-dev" -vine = "1.3.0" +vine = ">=5.0.0,<6.0" [package.extras] arangodb = ["pyArango (>=1.3.2)"] auth = ["cryptography"] -azureblockblob = ["azure-storage (==0.36.0)", "azure-common (==1.1.5)", "azure-storage-common (==1.1.0)"] +azureblockblob = ["azure-storage-blob (==12.6.0)"] brotli = ["brotli (>=1.0.0)", "brotlipy (>=0.7.0)"] cassandra = ["cassandra-driver (<3.21.0)"] -consul = ["python-consul"] +consul = ["python-consul2"] cosmosdbsql = ["pydocumentdb (==2.3.2)"] -couchbase = ["couchbase-cffi (<3.0.0)", "couchbase (<3.0.0)"] +couchbase = ["couchbase (>=3.0.0)"] couchdb = ["pycouchdb"] django = ["Django (>=1.11)"] dynamodb = ["boto3 (>=1.9.178)"] elasticsearch = ["elasticsearch"] -eventlet = ["eventlet (>=0.24.1)"] -gevent = ["gevent"] +eventlet = ["eventlet (>=0.26.1)"] +gevent = ["gevent (>=1.0.0)"] librabbitmq = ["librabbitmq (>=1.5.0)"] -lzma = ["backports.lzma"] memcache = ["pylibmc"] mongodb = ["pymongo[srv] (>=3.3.0)"] msgpack = ["msgpack"] pymemcache = ["python-memcached"] pyro = ["pyro4"] +pytest = ["pytest-celery"] redis = ["redis (>=3.2.0)"] -riak = ["riak (>=2.0)"] s3 = ["boto3 (>=1.9.125)"] slmq = ["softlayer-messaging (>=1.0.3)"] solar = ["ephem"] @@ -182,7 +204,7 @@ zstd = ["zstandard"] [[package]] name = "certifi" -version = "2022.12.7" +version = "2024.2.2" description = "Python package for providing Mozilla's CA Bundle." category = "main" optional = false @@ -201,15 +223,49 @@ unicode_backport = ["unicodedata2"] [[package]] name = "click" -version = "8.0.4" +version = "7.1.2" description = "Composable command line interface toolkit" -category = "dev" +category = "main" optional = false -python-versions = ">=3.6" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" + +[[package]] +name = "click-didyoumean" +version = "0.0.3" +description = "Enable git-like did-you-mean feature in click." +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +click = "*" + +[[package]] +name = "click-plugins" +version = "1.1.1" +description = "An extension module for click to enable registering CLI commands via setuptools entry-points." +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +click = ">=4.0" + +[package.extras] +dev = ["pytest (>=3.6)", "pytest-cov", "wheel", "coveralls"] + +[[package]] +name = "click-repl" +version = "0.2.0" +description = "REPL plugin for Click" +category = "main" +optional = false +python-versions = "*" [package.dependencies] -colorama = {version = "*", markers = "platform_system == \"Windows\""} -importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} +click = "*" +prompt-toolkit = "*" +six = "*" [[package]] name = "colorama" @@ -243,7 +299,7 @@ python-versions = ">=3.6, <3.7" [[package]] name = "django" -version = "3.2.18" +version = "3.2.25" description = "A high-level Python Web framework that encourages rapid development and clean, pragmatic design." category = "main" optional = false @@ -260,14 +316,14 @@ bcrypt = ["bcrypt"] [[package]] name = "django-celery-beat" -version = "2.2.0" +version = "2.2.1" description = "Database-backed Periodic Tasks." category = "main" optional = false python-versions = "*" [package.dependencies] -celery = ">=4.4,<6.0" +celery = ">=5.0,<6.0" Django = ">=2.2,<4.0" django-timezone-field = ">=4.1.0,<5.0" python-crontab = ">=2.3.4" @@ -312,7 +368,7 @@ typing-extensions = {version = ">=3.7.4.3", markers = "python_version < \"3.8\"" [[package]] name = "idna" -version = "3.4" +version = "3.7" description = "Internationalized Domain Names in Applications (IDNA)" category = "main" optional = false @@ -375,18 +431,20 @@ format = ["rfc3987", "strict-rfc3339", "webcolors"] [[package]] name = "kombu" -version = "4.6.11" +version = "5.1.0" description = "Messaging library for Python." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = ">=3.6" [package.dependencies] -amqp = ">=2.6.0,<2.7" +amqp = ">=5.0.6,<6.0.0" +cached-property = {version = "*", markers = "python_version < \"3.8\""} importlib-metadata = {version = ">=0.18", markers = "python_version < \"3.8\""} +vine = "*" [package.extras] -azureservicebus = ["azure-servicebus (>=0.21.1)"] +azureservicebus = ["azure-servicebus (>=7.0.0)"] azurestoragequeues = ["azure-storage-queue"] consul = ["python-consul (>=0.6.0)"] librabbitmq = ["librabbitmq (>=1.5.2)"] @@ -397,7 +455,7 @@ qpid = ["qpid-python (>=0.26)", "qpid-tools (>=0.26)"] redis = ["redis (>=3.3.11)"] slmq = ["softlayer-messaging (>=1.0.3)"] sqlalchemy = ["sqlalchemy"] -sqs = ["boto3 (>=1.4.4)", "pycurl (==7.43.0.2)"] +sqs = ["boto3 (>=1.4.4)", "pycurl (==7.43.0.2)", "urllib3 (<1.26)"] yaml = ["PyYAML (>=3.10)"] zookeeper = ["kazoo (>=1.3.1)"] @@ -457,7 +515,7 @@ python-versions = ">=3.5" name = "packaging" version = "21.3" description = "Core utilities for Python packages" -category = "dev" +category = "main" optional = false python-versions = ">=3.6" @@ -498,6 +556,17 @@ python-versions = "*" [package.extras] twisted = ["twisted"] +[[package]] +name = "prompt-toolkit" +version = "3.0.3" +description = "Library for building powerful interactive command lines in Python" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +wcwidth = "*" + [[package]] name = "py" version = "1.11.0" @@ -553,7 +622,7 @@ testing = ["django", "django-configurations (>=2.0)"] [[package]] name = "python-crontab" -version = "2.7.1" +version = "3.0.0" description = "Python Crontab API" category = "main" optional = false @@ -568,7 +637,7 @@ cron-schedule = ["croniter"] [[package]] name = "python-dateutil" -version = "2.8.2" +version = "2.9.0.post0" description = "Extensions to the standard Python datetime module" category = "main" optional = false @@ -587,32 +656,25 @@ python-versions = "*" [[package]] name = "redis" -version = "3.5.3" -description = "Python client for Redis key-value store" -category = "main" -optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" - -[package.extras] -hiredis = ["hiredis (>=0.1.3)"] - -[[package]] -name = "redis-py-cluster" -version = "2.1.0" -description = "Library for communicating with Redis Clusters. Built on top of redis-py lib" +version = "4.3.6" +description = "Python client for Redis database and key-value store" category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4" +python-versions = ">=3.6" [package.dependencies] -redis = ">=3.0.0,<4.0.0" +async-timeout = ">=4.0.2" +importlib-metadata = {version = ">=1.0", markers = "python_version < \"3.8\""} +packaging = ">=20.4" +typing-extensions = {version = "*", markers = "python_version < \"3.8\""} [package.extras] -hiredis = ["hiredis (>=0.1.3)"] +hiredis = ["hiredis (>=1.0.0)"] +ocsp = ["cryptography (>=36.0.1)", "pyopenssl (==20.0.1)", "requests (>=2.26.0)"] [[package]] name = "regex" -version = "2022.10.31" +version = "2023.8.8" description = "Alternative regular expression module, to replace re." category = "dev" optional = false @@ -660,12 +722,17 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*" [[package]] name = "sqlparse" -version = "0.4.3" +version = "0.4.4" description = "A non-validating SQL parser." category = "main" optional = false python-versions = ">=3.5" +[package.extras] +dev = ["flake8", "build"] +doc = ["sphinx"] +test = ["pytest", "pytest-cov"] + [[package]] name = "toml" version = "0.10.2" @@ -676,7 +743,7 @@ python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" [[package]] name = "typed-ast" -version = "1.5.4" +version = "1.5.5" description = "a fork of Python 2 and 3 ast modules with type comment support" category = "dev" optional = false @@ -692,7 +759,7 @@ python-versions = ">=3.6" [[package]] name = "ujson" -version = "4.1.0" +version = "4.3.0" description = "Ultra fast JSON encoder and decoder for Python" category = "main" optional = false @@ -700,24 +767,32 @@ python-versions = ">=3.6" [[package]] name = "urllib3" -version = "1.26.15" +version = "1.26.18" description = "HTTP library with thread-safe connection pooling, file post, and more." category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" [package.extras] -brotli = ["brotlicffi (>=0.8.0)", "brotli (>=1.0.9)", "brotlipy (>=0.6.0)"] +brotli = ["brotlicffi (>=0.8.0)", "brotli (==1.0.9)", "brotlipy (>=0.6.0)", "brotli (>=1.0.9)"] secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "urllib3-secure-extra", "ipaddress"] socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] [[package]] name = "vine" -version = "1.3.0" -description = "Promises, promises, promises." +version = "5.1.0" +description = "Python promises." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = ">=3.6" + +[[package]] +name = "wcwidth" +version = "0.2.13" +description = "Measures the displayed width of unicode strings in a terminal" +category = "main" +optional = false +python-versions = "*" [[package]] name = "werkzeug" @@ -746,12 +821,16 @@ testing = ["pytest (>=4.6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytes [metadata] lock-version = "1.1" python-versions = ">= 3.6, < 4" -content-hash = "120e412f8ba95fcebb08c720255fe9ff4c8fa17681a9eee4937e799a4512c1cf" +content-hash = "d235e59f2ce17520b71b66f5a9f8eac11ff16c9f8629422c5a7d63b590d5fa15" [metadata.files] amqp = [] appdirs = [] -asgiref = [] +asgiref = [ + {file = "asgiref-3.4.1-py3-none-any.whl", hash = "sha256:ffc141aa908e6f175673e7b1b3b7af4fdb0ecb738fc5c8b88f69f055c2415214"}, + {file = "asgiref-3.4.1.tar.gz", hash = "sha256:4ef1ab46b484e3c706329cedeff284a5d40824200638503f5768edb6de7d58e9"}, +] +async-timeout = [] atomicwrites = [] attrs = [] bamboo-engine = [] @@ -759,14 +838,30 @@ billiard = [] black = [] boto3 = [] botocore = [] -celery = [] +cached-property = [] +celery = [ + {file = "celery-5.1.2-py3-none-any.whl", hash = "sha256:9dab2170b4038f7bf10ef2861dbf486ddf1d20592290a1040f7b7a1259705d42"}, + {file = "celery-5.1.2.tar.gz", hash = "sha256:8d9a3de9162965e97f8e8cc584c67aad83b3f7a267584fa47701ed11c3e0d4b0"}, +] certifi = [] charset-normalizer = [] -click = [] +click = [ + {file = "click-7.1.2-py2.py3-none-any.whl", hash = "sha256:dacca89f4bfadd5de3d7489b7c8a566eee0d3676333fbb50030263894c38c0dc"}, + {file = "click-7.1.2.tar.gz", hash = "sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a"}, +] +click-didyoumean = [] +click-plugins = [ + {file = "click-plugins-1.1.1.tar.gz", hash = "sha256:46ab999744a9d831159c3411bb0c79346d94a444df9a3a3742e9ed63645f264b"}, + {file = "click_plugins-1.1.1-py2.py3-none-any.whl", hash = "sha256:5d262006d3222f5057fd81e1623d4443e41dcda5dc815c06b442aa3c02889fc8"}, +] +click-repl = [] colorama = [] coverage = [] dataclasses = [] -django = [] +django = [ + {file = "Django-3.2.25-py3-none-any.whl", hash = "sha256:a52ea7fcf280b16f7b739cec38fa6d3f8953a5456986944c3ca97e79882b4e38"}, + {file = "Django-3.2.25.tar.gz", hash = "sha256:7ca38a78654aee72378594d63e51636c04b8e28574f5505dff630895b5472777"}, +] django-celery-beat = [] django-timezone-field = [] factory-boy = [] @@ -787,6 +882,7 @@ packaging = [] pathspec = [] pluggy = [] prometheus-client = [] +prompt-toolkit = [] py = [] pyparsing = [] pytest = [] @@ -795,7 +891,6 @@ python-crontab = [] python-dateutil = [] pytz = [] redis = [] -redis-py-cluster = [] regex = [] requests = [] s3transfer = [] @@ -804,8 +899,57 @@ sqlparse = [] toml = [] typed-ast = [] typing-extensions = [] -ujson = [] +ujson = [ + {file = "ujson-4.3.0-cp310-cp310-macosx_10_14_x86_64.whl", hash = "sha256:3609e0514f6f721c6c9818b9374ec91b994e59fb193af2f924ca3f2f32009f1c"}, + {file = "ujson-4.3.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:de42986e2602b6a0baca452ff50e9cbe66faf256761295d5d07ae3f6757b487d"}, + {file = "ujson-4.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:843fd8b3246b2b20bbae48b2334d26507c9531b2b014533adfc6132e3ec8e60c"}, + {file = "ujson-4.3.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5d1083a0dcb39b43cfcd948f09e480c23eb4af66d7d08f6b36951f4c629c3bd1"}, + {file = "ujson-4.3.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:01d12df8eb25afb939a003284b5b5adca9788c1176c445641e5980fa892562ac"}, + {file = "ujson-4.3.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:b0b9cde57eebaac26de040f8ebf0541e06fe9bcf7e42872dc036d2ced7d99ccf"}, + {file = "ujson-4.3.0-cp310-cp310-win32.whl", hash = "sha256:3d8eaab72ad8129c12ed90ebf310230bd014b6bbf99145ebf2bc890238e0254f"}, + {file = "ujson-4.3.0-cp310-cp310-win_amd64.whl", hash = "sha256:85f28c38952b8a94183ab15ec6c6e89c117d00ceeae5d754ef1a33e01e28b845"}, + {file = "ujson-4.3.0-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:8a0d9dde58937976cd06cd776411b77b0e5d38db0a3c1be28ee8bb428ff5a42b"}, + {file = "ujson-4.3.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9f4a34386785a33600ac7442fec34c3d8b2d7e5309cfc94bc7c9ba93f12640c2"}, + {file = "ujson-4.3.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d8e2a52fbeee55db306b9306892f5cde7e78c56069c1212abf176d1886fff60a"}, + {file = "ujson-4.3.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9c5330692122b999997911252466a7d17e4e428d7d9a8db0b99ba81b8b9c010c"}, + {file = "ujson-4.3.0-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:9baa160ba1d3f712a356e77718251c9d9eee43ed548debdcc9d75b06a75b3e82"}, + {file = "ujson-4.3.0-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:a6c32356145d95a0403b5895d60c36798a48af13b8863e43ad7457a0361afad0"}, + {file = "ujson-4.3.0-cp36-cp36m-win32.whl", hash = "sha256:b72fadeea5727204674c9f77166da7feaafdf70f1ed50bb15bf321f7c39c7194"}, + {file = "ujson-4.3.0-cp36-cp36m-win_amd64.whl", hash = "sha256:1601354caaab0697a9b24815a31611ad013d29cf957d545fc1cd59835b82e3c1"}, + {file = "ujson-4.3.0-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:b80a35bad8fad1772f992bae8086b0cde788cd3b37f35d0d4506c93e6edad645"}, + {file = "ujson-4.3.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7a318df321d7adc3de876b29640cca8de1ad4d4e4fe7c4a76d64d9d6f1676304"}, + {file = "ujson-4.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fc9a508efb829bf0542be9b2578d8da08f0ab1fa712e086ebb777d6ec9e6d8d2"}, + {file = "ujson-4.3.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:43d2403451d7bd27b6a600f89d4bd2cf6e1b3494254509d8b5ef3c8e94ae4d8e"}, + {file = "ujson-4.3.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:fd0901db652a58f46550074596227dbddb7a02d2de744d3cd2358101f78037bb"}, + {file = "ujson-4.3.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:00fd67952b1a8a46cf5b0a51b3838187332d13d2e8d178423c5a5405c21d9e7c"}, + {file = "ujson-4.3.0-cp37-cp37m-win32.whl", hash = "sha256:b0e9510e867c72a87db2d16377c2bef912f29afd8381d1fdae332b9b7f697efa"}, + {file = "ujson-4.3.0-cp37-cp37m-win_amd64.whl", hash = "sha256:294e907f134fb5d83e0a4439cf4040d74da77157938b4db5730cd174621dcf8b"}, + {file = "ujson-4.3.0-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:087cd977f4f63f885a49607244e7e157801a22aadcc075a262d3c3633138573c"}, + {file = "ujson-4.3.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4f35dcf6d2a67e913a7135809006bd000d55ad5b5834b5dbe5b82dcf8db1ac05"}, + {file = "ujson-4.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f158fdb08e022f2f16f0fba317a80558b0cebc7e2c84ae783e5f75616d5c90d5"}, + {file = "ujson-4.3.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2a06006dad34c8cfaa734bd6458452e46702b368da53b56e7732351082aa0420"}, + {file = "ujson-4.3.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:6df94e675b05ecf4e7a57883a73b916ffcb5872d7b1298ac5cef8ac1cbce73c6"}, + {file = "ujson-4.3.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:47af81df5d575e36d4be9396db94f35c8f62de3077a405f9af94f9756255cef5"}, + {file = "ujson-4.3.0-cp38-cp38-win32.whl", hash = "sha256:e46c1462761db518fae51ab0d89a6256aeac148a795f7244d9084c459b477af5"}, + {file = "ujson-4.3.0-cp38-cp38-win_amd64.whl", hash = "sha256:bf199015910fcfa19b6e12881abeb462498791b2ab0111ff8b17095d0477e9d4"}, + {file = "ujson-4.3.0-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:32ee97ec37af31b35ca4395732d883bf74fb70309d38485f7fb9a5cc3332c53e"}, + {file = "ujson-4.3.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1f211c7c0c9377cbf4650aa990118d0c2cce3c5fad476c39ecd35b6714ba4463"}, + {file = "ujson-4.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0c81159d3f1bcb5729ba019e63e78ee6c91b556e1ac0e67c7579768720fd3c4e"}, + {file = "ujson-4.3.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b850029d64008e970cae04ada69aa33e1cd412106a1efde221269c1cda1b40cc"}, + {file = "ujson-4.3.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:327ec982bb89abe779fe463e1013c47aae6ed53b76600af7cb1e8b8cb0ee9f85"}, + {file = "ujson-4.3.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:103cbabe4e6fd70c957219519e37d65be612d7c74d91ef19022a2c8f8c5e4e82"}, + {file = "ujson-4.3.0-cp39-cp39-win32.whl", hash = "sha256:7b0a63865ec2978ebafb0906bf982eb52bea26fc98e2ae5e59b9d204afe2d762"}, + {file = "ujson-4.3.0-cp39-cp39-win_amd64.whl", hash = "sha256:18040475d997d93a6851d8bee474fba2ec94869e8f826dddd66cdae4aa3fdb92"}, + {file = "ujson-4.3.0-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:df481d4e13ca34d870d1fdf387742867edff3f78a1eea1bbcd72ea2fa68d9a6e"}, + {file = "ujson-4.3.0-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e7e73ec5ba1b42c2027773f69b70eff28df132907aa98b28166c39d3ea45e85b"}, + {file = "ujson-4.3.0-pp37-pypy37_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b270088e472f1d65a0a0aab3190010b9ac1a5b2969d39bf2b53c0fbf339bc87a"}, + {file = "ujson-4.3.0.tar.gz", hash = "sha256:baee56eca35cb5fbe02c28bd9c0936be41a96fa5c0812d9d4b7edeb5c3d568a0"}, +] urllib3 = [] -vine = [] +vine = [ + {file = "vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc"}, + {file = "vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0"}, +] +wcwidth = [] werkzeug = [] zipp = [] diff --git a/runtime/bamboo-pipeline/pyproject.toml b/runtime/bamboo-pipeline/pyproject.toml index a026b0e4..b7ef004f 100644 --- a/runtime/bamboo-pipeline/pyproject.toml +++ b/runtime/bamboo-pipeline/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "bamboo-pipeline" -version = "3.29.2" +version = "3.29.3" description = "runtime for bamboo-engine base on Django and Celery" authors = ["homholueng "] license = "MIT" @@ -10,23 +10,22 @@ packages = [ [tool.poetry.dependencies] python = ">= 3.6, < 4" -celery = "^4.4.0" -Django = "^2.2 || ^3.0" -requests = "^2.22.0" -django-celery-beat = "^2.1.0" +celery = ">=4.4.0, <6" +Django = ">=2.2, <5" +requests = "^2.22" +django-celery-beat = "^2.1" Mako = "^1.1.4" pytz = "2019.3" bamboo-engine = "^2.10.2" jsonschema = "^2.5.1" -ujson = "4.1.*" -pyparsing = "^2.2.0" -redis = "^3.2.0" -redis-py-cluster = "2.1.0" -django-timezone-field = "^4.0" -Werkzeug = "^1.0.0" -prometheus-client = "^0.9.0" +ujson = "^4" +pyparsing = "^2.2" +redis = ">=3.2.0, <6" +django-timezone-field = "^4" +Werkzeug = "^1" +prometheus-client = "^0.9" boto3 = "^1.9.130" -isodate = "^0.6.1" +isodate = "^0.6" [tool.poetry.dev-dependencies] pytest = "^6.2.2" diff --git a/runtime/bamboo-pipeline/test/eri_chaos/celery_tasks.py b/runtime/bamboo-pipeline/test/eri_chaos/celery_tasks.py index 7939a9a0..749178f5 100644 --- a/runtime/bamboo-pipeline/test/eri_chaos/celery_tasks.py +++ b/runtime/bamboo-pipeline/test/eri_chaos/celery_tasks.py @@ -14,7 +14,7 @@ import json from typing import Optional -from celery import task +from celery import current_app from bamboo_engine.eri import ExecuteInterruptPoint, ScheduleInterruptPoint from bamboo_engine.engine import Engine @@ -28,7 +28,7 @@ from .runtime import ChoasBambooDjangoRuntime -@task(ignore_result=True) +@current_app.task(ignore_result=True) def chaos_execute( process_id: int, node_id: str, @@ -70,7 +70,7 @@ def chaos_execute( ) -@task(ignore_result=True) +@current_app.task(ignore_result=True) def chaos_schedule( process_id: int, node_id: str,