feat: celery 5 & redis dependency package upgrade adaptation #230

Merged
5 changes: 3 additions & 2 deletions .github/workflows/engine_unittest.yml
@@ -34,6 +34,7 @@ jobs:
         run: python -m coverage xml

       - name: Upload coverage to Codecov
-        uses: codecov/codecov-action@v1
+        uses: codecov/codecov-action@v4
         with:
           fail_ci_if_error: true
+          token: ${{ secrets.CODECOV_TOKEN }}
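Note: unlike v1, codecov-action v4 uploads through the Codecov CLI, which in most setups requires a repository upload token, hence the new token input accompanying the version bump.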

2 changes: 1 addition & 1 deletion runtime/bamboo-pipeline/pipeline/__init__.py
@@ -13,4 +13,4 @@

 default_app_config = "pipeline.apps.PipelineConfig"

-__version__ = "3.29.2"
+__version__ = "3.29.3"
5 changes: 4 additions & 1 deletion runtime/bamboo-pipeline/pipeline/apps.py
@@ -19,7 +19,10 @@

 from django.apps import AppConfig
 from django.conf import settings
 from redis.sentinel import Sentinel
-from rediscluster import RedisCluster
+try:
+    from redis.cluster import RedisCluster
+except ImportError:
+    from rediscluster import RedisCluster

 logger = logging.getLogger("root")
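For context on the import fallback above: redis-py 4.1+ ships RedisCluster natively in redis.cluster, so the third-party redis-py-cluster package is only needed on older installs. A minimal sketch of the resulting client construction, assuming hypothetical host/port values:

    try:
        from redis.cluster import RedisCluster  # redis-py >= 4.1
    except ImportError:
        from rediscluster import RedisCluster  # legacy redis-py-cluster fallback

    client = RedisCluster(host="127.0.0.1", port=7000)  # placeholder address
    client.set("pipeline:example", "1")
    print(client.get("pipeline:example"))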
12 changes: 12 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/celery_tools/__init__.py
@@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
47 changes: 47 additions & 0 deletions runtime/bamboo-pipeline/pipeline/contrib/celery_tools/periodic.py
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from celery import Task, current_app
from celery.schedules import maybe_schedule


class PipelinePeriodicTask(Task):
    """A task that adds itself to the :setting:`beat_schedule` setting."""

    abstract = True
    ignore_result = True
    relative = False
    options = None
    compat = True

    def __init__(self):
        if not hasattr(self, 'run_every'):
            raise NotImplementedError(
                'Periodic tasks must have a run_every attribute')
        self.run_every = maybe_schedule(self.run_every, self.relative)
        super(PipelinePeriodicTask, self).__init__()

    @classmethod
    def on_bound(cls, app):
        app.conf.beat_schedule[cls.name] = {
            'task': cls.name,
            'schedule': cls.run_every,
            'args': (),
            'kwargs': {},
            'options': cls.options or {},
            'relative': cls.relative,
        }


def periodic_task(*args, **options):
    """Deprecated decorator, please use :setting:`beat_schedule`."""
    return current_app.task(**dict({'base': PipelinePeriodicTask}, **options))
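Celery 5 removed the PeriodicTask base class and periodic_task decorator that Celery 4 provided, and the module above restores them for pipeline: on_bound writes each task's schedule into app.conf.beat_schedule when the task class is bound to the app. A hypothetical usage sketch (the task name and interval are illustrative, not part of this PR):

    from datetime import timedelta

    from pipeline.contrib.celery_tools.periodic import periodic_task

    @periodic_task(run_every=timedelta(minutes=5), ignore_result=True)
    def heartbeat():
        # on_bound registers this task under its dotted name in beat_schedule
        print("tick")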
runtime/bamboo-pipeline/pipeline/contrib/node_timeout/tasks.py
@@ -13,7 +13,7 @@
 import json
 import logging

-from celery import task
+from celery import current_app

 from pipeline.contrib.node_timeout.settings import node_timeout_settings
 from pipeline.eri.models import State, Process
@@ -24,7 +24,7 @@
 logger = logging.getLogger("celery")


-@task(acks_late=True)
+@current_app.task(acks_late=True)
 def dispatch_timeout_nodes(record_id: int):
     record = TimeoutNodesRecord.objects.get(id=record_id)
     nodes = json.loads(record.timeout_nodes)
@@ -40,7 +40,7 @@ def dispatch_timeout_nodes(record_id: int):
     )


-@task(ignore_result=True)
+@current_app.task(ignore_result=True)
 def execute_node_timeout_strategy(node_id, version):
     timeout_config = TimeoutNodeConfig.objects.filter(node_id=node_id).only("root_pipeline_id", "action").first()
     if timeout_config is None:
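The change repeated across this and the remaining modules: Celery 5.0 removed the celery.task module (and with it the module-level @task decorator), so tasks must now be declared through an app instance, here via current_app.task. A minimal before/after sketch with an illustrative task body:

    from celery import current_app

    # Celery 4 style, no longer importable under Celery 5:
    #     from celery import task
    #     @task(ignore_result=True)
    #     def ping(): ...

    @current_app.task(ignore_result=True)  # Celery 5 replacement used in this PR
    def ping():
        return "pong"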
runtime/bamboo-pipeline/pipeline/contrib/node_timer_event/tasks.py
@@ -14,7 +14,7 @@
 import logging
 from typing import Any, Dict, List, Type, Union

-from celery import task
+from celery import current_app
 from pipeline.contrib.node_timer_event.adapter import NodeTimerEventBaseAdapter
 from pipeline.contrib.node_timer_event.handlers import ActionManager
 from pipeline.contrib.node_timer_event.models import ExpiredNodesRecord
@@ -24,7 +24,7 @@
 logger = logging.getLogger("celery")


-@task(acks_late=True)
+@current_app.task(acks_late=True)
 def dispatch_expired_nodes(record_id: int):
     record: ExpiredNodesRecord = ExpiredNodesRecord.objects.get(id=record_id)
     node_keys: List[str] = json.loads(record.nodes)
@@ -62,7 +62,7 @@ def dispatch_expired_nodes(record_id: int):
     logger.info("[dispatch_expired_nodes] record deleted: record -> %s", record_id)


-@task(ignore_result=True)
+@current_app.task(ignore_result=True)
 def execute_node_timer_event_action(node_id: str, version: str, index: int):

     adapter_class: Type[NodeTimerEventBaseAdapter] = node_timer_event_settings.adapter_class
35 changes: 26 additions & 9 deletions runtime/bamboo-pipeline/pipeline/contrib/periodic_task/tasks.py
@@ -16,7 +16,7 @@
 import traceback

 import pytz
-from celery import task
+from celery import current_app
 from django.utils import timezone
 from django.utils.module_loading import import_string
 from bamboo_engine import api as bamboo_engine_api
@@ -35,7 +35,7 @@
 logger = logging.getLogger("celery")


-@task(ignore_result=True)
+@current_app.task(ignore_result=True)
 def periodic_task_start(*args, **kwargs):
     try:
         periodic_task = PeriodicTask.objects.get(id=kwargs["period_task_id"])
@@ -69,19 +69,28 @@ def periodic_task_start(*args, **kwargs):
         )

         result = instance.start(
-            periodic_task.creator, check_workers=False, priority=periodic_task.priority, queue=periodic_task.queue,
+            periodic_task.creator,
+            check_workers=False,
+            priority=periodic_task.priority,
+            queue=periodic_task.queue,
         )
     except Exception:
         et = traceback.format_exc()
         logger.error(et)
         PeriodicTaskHistory.objects.record_schedule(
-            periodic_task=periodic_task, pipeline_instance=None, ex_data=et, start_success=False,
+            periodic_task=periodic_task,
+            pipeline_instance=None,
+            ex_data=et,
+            start_success=False,
         )
         return

     if not result.result:
         PeriodicTaskHistory.objects.record_schedule(
-            periodic_task=periodic_task, pipeline_instance=None, ex_data=result.message, start_success=False,
+            periodic_task=periodic_task,
+            pipeline_instance=None,
+            ex_data=result.message,
+            start_success=False,
         )
         return

@@ -93,7 +102,7 @@ def periodic_task_start(*args, **kwargs):
     PeriodicTaskHistory.objects.record_schedule(periodic_task=periodic_task, pipeline_instance=instance, ex_data="")


-@task(ignore_result=True)
+@current_app.task(ignore_result=True)
 def bamboo_engine_periodic_task_start(*args, **kwargs):
     try:
         periodic_task = PeriodicTask.objects.get(id=kwargs["period_task_id"])
@@ -147,16 +156,24 @@ def bamboo_engine_periodic_task_start(*args, **kwargs):
         et = traceback.format_exc()
         logger.error(et)
         PeriodicTaskHistory.objects.record_schedule(
-            periodic_task=periodic_task, pipeline_instance=None, ex_data=et, start_success=False,
+            periodic_task=periodic_task,
+            pipeline_instance=None,
+            ex_data=et,
+            start_success=False,
         )
         return

     if not result.result:
         PipelineInstance.objects.filter(id=instance.instance_id).update(
-            start_time=None, is_started=False, executor="",
+            start_time=None,
+            is_started=False,
+            executor="",
         )
         PeriodicTaskHistory.objects.record_schedule(
-            periodic_task=periodic_task, pipeline_instance=None, ex_data=result.message, start_success=False,
+            periodic_task=periodic_task,
+            pipeline_instance=None,
+            ex_data=result.message,
+            start_success=False,
         )
         return

runtime/bamboo-pipeline/pipeline/contrib/plugin_execute/tasks.py
@@ -14,7 +14,7 @@
 import logging
 import traceback

-from celery import task
+from celery import current_app
 from django.utils import timezone
 from pipeline.component_framework.library import ComponentLibrary
 from pipeline.conf.default_settings import PLUGIN_EXECUTE_QUEUE
@@ -25,7 +25,7 @@
 logger = logging.getLogger("celery")


-@task
+@current_app.task
 def execute(task_id):
     try:
         plugin_execute_task = PluginExecuteTask.objects.get(id=task_id)
@@ -89,7 +89,7 @@ def execute(task_id):
         )


-@task
+@current_app.task
 def schedule(task_id):
     try:
         plugin_execute_task = PluginExecuteTask.objects.get(id=task_id)
6 changes: 3 additions & 3 deletions runtime/bamboo-pipeline/pipeline/contrib/rollback/tasks.py
@@ -2,7 +2,7 @@
 import json
 import logging

-from celery import task
+from celery import current_app
 from django.conf import settings
 from django.db import transaction
 from pipeline.conf.default_settings import ROLLBACK_QUEUE
@@ -287,7 +287,7 @@ def rollback(self):
             raise e


-@task
+@current_app.task
 def token_rollback(snapshot_id, node_id, retry=False, retry_data=None):
     """
     snapshot_id: the snapshot ID for this rollback
@@ -296,6 +296,6 @@ def token_rollback(snapshot_id, node_id, retry=False, retry_data=None):
     TokenRollbackTaskHandler(snapshot_id=snapshot_id, node_id=node_id, retry=retry, retry_data=retry_data).rollback()


-@task
+@current_app.task
 def any_rollback(snapshot_id):
     AnyRollbackHandler(snapshot_id=snapshot_id).rollback()
6 changes: 3 additions & 3 deletions runtime/bamboo-pipeline/pipeline/contrib/statistics/tasks.py
@@ -16,7 +16,7 @@
 import ujson as json
 from copy import deepcopy

-from celery import task
+from celery import current_app
 from bamboo_engine import api as bamboo_engine_api

 from pipeline.component_framework.constants import LEGACY_PLUGINS_VERSION
@@ -112,7 +112,7 @@ def recursive_collect_components(activities, status_tree, instance_id, stack=None
     return component_list


-@task
+@current_app.task
 def pipeline_post_save_statistics_task(instance_id):
     instance = PipelineInstance.objects.get(instance_id=instance_id)
     # count the number of standard plugins, subprocesses, and gateways in the pipeline
@@ -134,7 +134,7 @@ def pipeline_post_save_statistics_task(instance_id):
     )


-@task
+@current_app.task
 def pipeline_archive_statistics_task(instance_id):
     instance = PipelineInstance.objects.get(instance_id=instance_id)
     engine_ver = 1