Skip to content

Commit

Permalink
feat: 优化订阅任务按顺序执行而非抛错 (closed #2447)
Browse files Browse the repository at this point in the history
  • Loading branch information
Huayeaaa authored and wyyalt committed Nov 21, 2024
1 parent f14f01b commit 1305056
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 115 deletions.
20 changes: 20 additions & 0 deletions apps/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]:
# redis Gse Agent 配置缓存
REDIS_AGENT_CONF_KEY_TPL = f"{settings.APP_CODE}:backend:agent:config:" + "{file_name}:str:{sub_inst_id}"

# 更新订阅参数储存redis键名模板
UPDATE_SUBSCRIPTION_REDIS_KEY_TPL = f"{settings.APP_CODE}:backend:subscription:update_subscription:params"

# 执行订阅参数储存redis键名模板
RUN_SUBSCRIPTION_REDIS_KEY_TPL = f"{settings.APP_CODE}:backend:subscription:run_subscription:params"


class SubscriptionSwithBizAction(enum.EnhanceEnum):
ENABLE = "enable"
Expand Down Expand Up @@ -166,3 +172,17 @@ def needs_batch_request(self) -> bool:
DEFAULT_CLEAN_RECORD_LIMIT = 5000

POWERSHELL_SERVICE_CHECK_SSHD = "powershell -c Get-Service -Name sshd"

# 处理更新订阅任务间隔
UPDATE_SUBSCRIPTION_TASK_INTERVAL = 2 * 60

# 处理执行订阅任务间隔
RUN_SUBSCRIPTION_TASK_INTERVAL = 3 * 60
# 处理卸载残留订阅任务间隔
HANDLE_UNINSTALL_REST_SUBSCRIPTION_TASK_INTERVAL = 6 * 60 * 60

# 最大订阅任务数量
MAX_SUBSCRIPTION_TASK_COUNT = 50

# 订阅删除时间小时数
SUBSCRIPTION_DELETE_HOURS = 6
156 changes: 156 additions & 0 deletions apps/backend/periodic_tasks/schedule_running_subscription_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import json
from datetime import timedelta
from typing import Any, Dict, List, Set

from celery.task import periodic_task
from django.db.models import QuerySet
from django.utils import timezone

from apps.backend import constants
from apps.backend.subscription.handler import SubscriptionHandler
from apps.backend.utils.redis import REDIS_INST
from apps.node_man import constants as node_man_constants
from apps.node_man import models
from common.log import logger


def get_need_clean_subscription_app_code():
"""
获取配置需要清理的appcode
"""
app_codes: List[str] = models.GlobalSettings.get_config(
key=models.GlobalSettings.KeyEnum.NEED_CLEAN_SUBSCRIPTION_APP_CODE.value, default=[]
)
return app_codes


@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_update_subscription():
name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
# 先计算出要从redis取数据的长度
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
# 从redis中取出对应长度的数据
update_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
# 使用ltrim保留剩下的,可以保证redis中新push的值不会丢失
REDIS_INST.ltrim(name, 0, -length - 1)
# 翻转数据,先进的数据先处理
update_params.reverse()
results = []
if not update_params:
return
for update_param in update_params:
# redis取出为bytes类型,需进行解码后转字典
params = json.loads(update_param.decode())
subscription_id = params["subscription_id"]
try:
result: Dict[str, int] = SubscriptionHandler.update_subscription(params=params)
except Exception as e:
logger.exception(f"{subscription_id} update subscription failed with error: {e}")
result = {"subscription_id": subscription_id, "update_result": False}
results.append(result)
logger.info(f"update subscription with results: {results}, length -> {len(results)} ")


@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_run_subscription():
name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
run_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
REDIS_INST.ltrim(name, 0, -length - 1)
run_params.reverse()
results = []
if not run_params:
return
for run_param in run_params:
# redis取出为bytes类型,需进行解码后转字典
params = json.loads(run_param.decode())
subscription_id = params["subscription_id"]
scope = params["scope"]
actions = params["actions"]
try:
result: Dict[str, int] = SubscriptionHandler(subscription_id).run(scope=scope, actions=actions)
except Exception as e:
logger.exception(f"{subscription_id} run subscription failed with error: {e}")
result = {"subscription_id": subscription_id, "run_result": False}
results.append(result)
logger.info(f"run subscription with results: {results}, length -> {len(results)}")


@periodic_task(
run_every=constants.HANDLE_UNINSTALL_REST_SUBSCRIPTION_TASK_INTERVAL,
queue="default",
options={"queue": "default"},
)
def clean_deleted_subscription():
"""
清理被删除且有卸载残留的订阅
"""
query_kwargs: Dict[str, Any] = {
"is_deleted": True,
"from_system": "bkmonitorv3",
"deleted_time__range": (
timezone.now() - timedelta(hours=constants.SUBSCRIPTION_DELETE_HOURS),
timezone.now(),
),
}

# 卸载有残留的订阅开启订阅巡检的生命周期允许为12h,需要再次设置为软删,减少资源消耗
again_delete_query_kwargs: Dict[str, Any] = {
"enable": True,
"from_system": "bkmonitorv3",
"deleted_time__range": (
timezone.now() - timedelta(hours=3 * constants.SUBSCRIPTION_DELETE_HOURS),
timezone.now() - timedelta(hours=2 * constants.SUBSCRIPTION_DELETE_HOURS),
),
}

app_codes = get_need_clean_subscription_app_code()
if app_codes:
query_kwargs.pop("from_system")
query_kwargs["from_system__in"] = app_codes
again_delete_query_kwargs.pop("from_system")
again_delete_query_kwargs["from_system__in"] = app_codes
need_reset_deleted_subscription_qs: QuerySet = models.Subscription.objects.filter(**again_delete_query_kwargs)
if need_reset_deleted_subscription_qs.exists():
# 使用update方法,不会刷新删除时间
need_reset_deleted_subscription_qs.update(enable=False, is_deleted=True)
changed_subscription_ids = list(need_reset_deleted_subscription_qs.values_list("id", flat=True))
# 记录再次被软删除的订阅ID
logger.info(
f"reset subscription{changed_subscription_ids} is_deleted, length -> {len(changed_subscription_ids)}"
)
# 查询6个小时内被删除的订阅
subscription_qs: QuerySet = models.Subscription.objects.filter(**query_kwargs)

if not subscription_qs.exists():
# 没有被删除的订阅
return
# 被删除的订阅ID
deleted_subscription_ids: Set[int] = set(subscription_qs.values_list("id", flat=True))
# 被删除且卸载残留(失败)的订阅
failed_subscription_qs: QuerySet = models.SubscriptionInstanceRecord.objects.filter(
subscription_id__in=deleted_subscription_ids, is_latest=True, status=node_man_constants.StatusType.FAILED
)
if not failed_subscription_qs.exists():
# 没有失败的订阅实例
return
# 被删除且有卸载残留的订阅ID
failed_subscription_ids: Set[int] = set(failed_subscription_qs.values_list("subscription_id", flat=True))
# 将订阅下的实例更新为空,并且开启订阅巡检
models.Subscription.objects.filter(id__in=failed_subscription_ids, is_deleted=True).update(
nodes=[], is_deleted=False, enable=True
)

logger.info(
f"set {failed_subscription_ids} nodes be null and enable auto trigger, length -> {len(failed_subscription_ids)}"
)
6 changes: 6 additions & 0 deletions apps/backend/subscription/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,9 @@ class SubscriptionIncludeGrayBizError(AppBaseException):
ERROR_CODE = 19
MESSAGE = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")
MESSAGE_TPL = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")


class SubscriptionNotDeletedCantOperateError(AppBaseException):
ERROR_CODE = 20
MESSAGE = _("订阅未被删除,无法操作")
MESSAGE_TPL = _("订阅ID:{subscription_id}未被删除,无法进行清理操作,可增加参数is_force=true强制操作")
132 changes: 127 additions & 5 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
from __future__ import absolute_import, unicode_literals

import json
import logging
import random
from collections import Counter, defaultdict
Expand All @@ -18,13 +19,15 @@

from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from django.db.models import Max, Q, QuerySet, Value
from django.utils.translation import get_language
from django.utils.translation import ugettext as _

from apps.backend import constants as backend_constants
from apps.backend.subscription import errors, task_tools, tasks, tools
from apps.backend.subscription.errors import InstanceTaskIsRunning
from apps.backend.utils.pipeline_parser import PipelineParser
from apps.backend.utils.redis import REDIS_INST
from apps.core.concurrent import controller
from apps.node_man import constants, models
from apps.utils import concurrent
Expand Down Expand Up @@ -432,17 +435,19 @@ def run(self, scope: Dict = None, actions: Dict[str, str] = None) -> Dict[str, i
subscription = models.Subscription.objects.get(id=self.subscription_id)
except models.Subscription.DoesNotExist:
raise errors.SubscriptionNotExist({"subscription_id": self.subscription_id})

if subscription.is_running():
raise InstanceTaskIsRunning()

if tools.check_subscription_is_disabled(
subscription_identity=f"subscription -> [{subscription.id}]",
scope=subscription.scope,
steps=subscription.steps,
):
raise errors.SubscriptionIncludeGrayBizError()

if subscription.is_running():
params = json.dumps({"subscription_id": subscription.id, "scope": scope, "actions": actions})
REDIS_INST.lpush(backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL, params)
logger.info(f"run subscription[{subscription.id}] store params into redis: {params}")
return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")}

subscription_task = models.SubscriptionTask.objects.create(
subscription_id=subscription.id, scope=subscription.scope, actions={}
)
Expand Down Expand Up @@ -670,3 +675,120 @@ def instance_status(subscription_id_list: List[int], show_task_detail: bool) ->
result.append({"subscription_id": subscription.id, "instances": subscription_result})

return result

def clean_subscription(self, execute_actions: Dict[str, str]):
"""
:param execute_actions: {"bk-beat": "STOP", "exporter": "STOP"}
"""
try:
# 3.调用执行订阅的方法
result = self.run(actions=execute_actions)
except Exception as e:
result = {"result": False, "message": str(e)}
# 4.删除订阅,使用delete()方法才会记录删除时间
models.Subscription.objects.filter(id=self.subscription_id).delete()
return result

@staticmethod
def update_subscription(params: Dict[str, Any]):
scope = params["scope"]
try:
subscription = models.Subscription.objects.get(id=params["subscription_id"], is_deleted=False)
except models.Subscription.DoesNotExist:
raise errors.SubscriptionNotExist({"subscription_id": params["subscription_id"]})
# 更新订阅不在序列化器中做校验,因为获取更新订阅的类型 step 需要查一次表
if tools.check_subscription_is_disabled(
subscription_identity=f"subscription -> [{subscription.id}]",
steps=subscription.steps,
scope=scope,
):
raise errors.SubscriptionIncludeGrayBizError()
if subscription.is_running():
REDIS_INST.lpush(backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL, json.dumps(params))
logger.info(f"update subscription[{subscription.id}] store params into redis: {params}")
return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")}

with transaction.atomic():
subscription.name = params.get("name", "")
subscription.node_type = scope["node_type"]
subscription.nodes = scope["nodes"]
subscription.bk_biz_id = scope.get("bk_biz_id")
# 避免空列表误判
if scope.get("instance_selector") is not None:
subscription.instance_selector = scope["instance_selector"]
# 策略部署新增
subscription.plugin_name = params.get("plugin_name")
subscription.bk_biz_scope = params.get("bk_biz_scope")
# 指定操作进程用户新增
if params.get("system_account"):
params["operate_info"].insert(0, params["system_account"])
subscription.operate_info = params["operate_info"]
subscription.save()

step_ids: Set[str] = set()
step_id__obj_map: Dict[str, models.SubscriptionStep] = {
step_obj.step_id: step_obj for step_obj in subscription.steps
}
step_objs_to_be_created: List[models.SubscriptionStep] = []
step_objs_to_be_updated: List[models.SubscriptionStep] = []

for index, step_info in enumerate(params["steps"]):

if step_info["id"] in step_id__obj_map:
# 存在则更新
step_obj: models.SubscriptionStep = step_id__obj_map[step_info["id"]]
step_obj.params = step_info["params"]
if "config" in step_info:
step_obj.config = step_info["config"]
step_obj.index = index
step_objs_to_be_updated.append(step_obj)
else:
# 新增场景
try:
step_obj_to_be_created: models.SubscriptionStep = models.SubscriptionStep(
subscription_id=subscription.id,
index=index,
step_id=step_info["id"],
type=step_info["type"],
config=step_info["config"],
params=step_info["params"],
)
except KeyError as e:
logger.warning(
f"update subscription[{subscription.id}] to add step[{step_info['id']}] error: "
f"err_msg -> {e}"
)
raise errors.SubscriptionUpdateError(
{
"subscription_id": subscription.id,
"msg": _("新增订阅步骤[{step_id}] 需要提供 type & config,错误信息 -> {err_msg}").format(
step_id=step_info["id"], err_msg=e
),
}
)
step_objs_to_be_created.append(step_obj_to_be_created)
step_ids.add(step_info["id"])

# 删除更新后不存在的 step
models.SubscriptionStep.objects.filter(
subscription_id=subscription.id, step_id__in=set(step_id__obj_map.keys()) - step_ids
).delete()
models.SubscriptionStep.objects.bulk_update(step_objs_to_be_updated, fields=["config", "params", "index"])
models.SubscriptionStep.objects.bulk_create(step_objs_to_be_created)
# 更新 steps 需要移除缓存
if hasattr(subscription, "_steps"):
delattr(subscription, "_steps")

result = {"subscription_id": subscription.id}

run_immediately = params["run_immediately"]
if run_immediately:
subscription_task = models.SubscriptionTask.objects.create(
subscription_id=subscription.id, scope=subscription.scope, actions={}
)
tasks.run_subscription_task_and_create_instance.delay(
subscription, subscription_task, language=get_language()
)
result["task_id"] = subscription_task.id

return result
6 changes: 6 additions & 0 deletions apps/backend/subscription/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,9 @@ class QueryHostSubscriptionsSerializer(TargetHostSerializer):
class SubscriptionSwitchBizSerializer(serializers.Serializer):
bk_biz_ids = serializers.ListField(child=serializers.IntegerField())
action = serializers.ChoiceField(choices=SubscriptionSwithBizAction.list_choices())


class ClearnSubscriptionSerializer(serializers.Serializer):
subscription_id_list = serializers.ListField(required=True, label=_("订阅ID列表"), child=serializers.IntegerField())
action_type = serializers.ChoiceField(choices=constants.OpType, default="STOP", label=_("执行动作类型"))
is_force = serializers.BooleanField(default=False, label=_("是否强制清理"))
Loading

0 comments on commit 1305056

Please sign in to comment.