Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 上云周期任务添加最低任务间隔时间配置 --story=120737215 #7646

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,11 @@ def check_engine_admin_permission(request, *args, **kwargs):
# 周期任务消息通知类型
PERIODIC_TASK_REMINDER_NOTIFY_TYPE = env.PERIODIC_TASK_REMINDER_NOTIFY_TYPE

# 周期任务最短时间间隔
PERIODIC_TASK_SHORTEST_TIME = env.PERIODIC_TASK_SHORTEST_TIME
# 周期任务迭代次数
PERIODIC_TASK_ITERATION = env.PERIODIC_TASK_ITERATION

# bk_audit
ENABLE_BK_AUDIT = True if env.BK_AUDIT_DATA_TOKEN else False
BK_AUDIT_SETTINGS = {
Expand Down
5 changes: 5 additions & 0 deletions env.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@
# 周期任务消息通知类型
PERIODIC_TASK_REMINDER_NOTIFY_TYPE = json.loads(os.getenv("PERIODIC_TASK_REMINDER_NOTIFY_TYPE", '["email"]'))

# 周期任务最短时间间隔,以分钟为单位,例如:30,0 表示无限制
PERIODIC_TASK_SHORTEST_TIME = int(os.getenv("PERIODIC_TASK_SHORTEST_TIME", 0))
# 周期任务迭代次数
PERIODIC_TASK_ITERATION = int(os.getenv("PERIODIC_TASK_ITERATION", 10))

# bk_audit
BK_AUDIT_ENDPOINT = os.getenv("BK_AUDIT_ENDPOINT", None)
BK_AUDIT_DATA_TOKEN = os.getenv("BK_AUDIT_DATA_TOKEN", None)
Expand Down
30 changes: 27 additions & 3 deletions gcloud/core/apis/drf/serilaziers/periodic_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from rest_framework.validators import ValidationError

import env
from gcloud.utils.strings import inspect_time
from gcloud.conf import settings
from gcloud.constants import PROJECT
from gcloud.core.apis.drf.serilaziers.project import ProjectSerializer
from gcloud.core.models import Project, ProjectConfig
Expand Down Expand Up @@ -152,9 +154,27 @@ def check_cron_params(cron, project):
raise ValidationError("周期任务时间格式过长")


class CreatePeriodicTaskSerializer(serializers.ModelSerializer):
project = serializers.IntegerField(write_only=True)
class CronFieldSerializer(serializers.Serializer):
cron = serializers.DictField(write_only=True)

def inspect_cron(self, cron):
minute = cron.get("minute", "*")
hour = cron.get("hour", "*")
day_of_month = cron.get("day_of_month", "*")
month = cron.get("month", "*")
day_of_week = cron.get("day_of_week", "*")

cron_expression = f"{minute} {hour} {day_of_month} {month} {day_of_week}"

result = inspect_time(cron_expression, settings.PERIODIC_TASK_SHORTEST_TIME, settings.PERIODIC_TASK_ITERATION)
if not result:
raise serializers.ValidationError(
"The interval between tasks should be at least {} minutes".format(settings.PERIODIC_TASK_SHORTEST_TIME)
)


class CreatePeriodicTaskSerializer(CronFieldSerializer, serializers.ModelSerializer):
project = serializers.IntegerField(write_only=True)
template_source = serializers.CharField(required=False, default=PROJECT)
pipeline_tree = ReadWriteSerializerMethodField()
template_scheme_ids = ReadWriteSerializerMethodField()
Expand Down Expand Up @@ -193,19 +213,23 @@ def validate_project(self, value):

def validate(self, attrs):
check_cron_params(attrs.get("cron"), attrs.get("project"))
if settings.PERIODIC_TASK_SHORTEST_TIME and not self.context["request"].user.is_superuser:
self.inspect_cron(attrs.get("cron"))
return attrs

class Meta:
model = PeriodicTask
fields = ["project", "cron", "name", "template_id", "pipeline_tree", "template_source", "template_scheme_ids"]


class PatchUpdatePeriodicTaskSerializer(serializers.Serializer):
class PatchUpdatePeriodicTaskSerializer(CronFieldSerializer, serializers.Serializer):
cron = serializers.DictField(help_text="周期", required=False)
project = serializers.IntegerField(help_text="项目ID", required=False)
constants = serializers.DictField(help_text="执行参数", required=False)
name = serializers.CharField(help_text="任务名", required=False)

def validate(self, attrs):
check_cron_params(attrs.get("cron"), attrs.get("project"))
if settings.PERIODIC_TASK_SHORTEST_TIME and not self.context["request"].user.is_superuser:
self.inspect_cron(attrs.get("cron"))
return attrs
6 changes: 3 additions & 3 deletions gcloud/core/apis/drf/viewsets/periodic_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def destroy(self, request, *args, **kwargs):
return super(PeriodicTaskViewSet, self).destroy(request, *args, **kwargs)

def create(self, request, *args, **kwargs):
serializer = CreatePeriodicTaskSerializer(data=request.data)
serializer = CreatePeriodicTaskSerializer(data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)
try:
self._handle_serializer(request, serializer)
Expand All @@ -270,7 +270,7 @@ def create(self, request, *args, **kwargs):

def update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = CreatePeriodicTaskSerializer(instance, data=request.data)
serializer = CreatePeriodicTaskSerializer(instance, data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)
try:
self._handle_serializer(request, serializer)
Expand All @@ -287,7 +287,7 @@ def update(self, request, *args, **kwargs):

def partial_update(self, request, *args, **kwargs):
instance = self.get_object()
serializer = PatchUpdatePeriodicTaskSerializer(data=request.data)
serializer = PatchUpdatePeriodicTaskSerializer(data=request.data, context={"request": request})
serializer.is_valid(raise_exception=True)

with transaction.atomic():
Expand Down
41 changes: 41 additions & 0 deletions gcloud/tests/utils/strings/test_inspect_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017-2020 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from django.test import TestCase

from gcloud.utils.strings import inspect_time


class InspectTimeTestCase(TestCase):
def test_inspect_time(self):
cron = "* * * * *"
shortest_time = 30
iter_count = 10
self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count))

def test_fail_inspect_time(self):
cron = "*/15 * * * *"
shortest_time = 30
iter_count = 10
self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count))

def test_success_inspect_time(self):
cron = "15 2 * * *"
shortest_time = 30
iter_count = 10
self.assertTrue(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count))

def test_iter_count_inspect_time(self):
cron = "*/15 * * * *"
shortest_time = 30
iter_count = 100
self.assertFalse(inspect_time(cron=cron, shortest_time=shortest_time, iter_count=iter_count))
24 changes: 23 additions & 1 deletion gcloud/utils/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

import re
import logging

from croniter import croniter
from datetime import datetime, timedelta
from gcloud.constants import TEMPLATE_NODE_NAME_MAX_LENGTH, AE

logger = logging.getLogger("root")
Expand Down Expand Up @@ -116,3 +117,24 @@ def django_celery_beat_cron_time_format_fit(cron_str):
cron_config = {time_format: cron_time for time_format, cron_time in zip(time_formats, cron_times)}
result_cron_list = [cron_config[unit] for unit in unit_order] + ["({})".format("/".join(unit_order))] + time_zone
return " ".join(result_cron_list).strip()


def inspect_time(cron, shortest_time, iter_count):
"""检查定时任务时间间隔是否符合要求
:param cron: 定时任务配置
:type cron str
:param shortest_time: 最短时间间隔,以分钟为单位,例如 30
:type shortest_time int
:param iter_count: 迭代次数
:type iter_count int
"""

schedule_iter = croniter(cron)
# 计算指定次数内的最短时间间隔
next_times = [schedule_iter.get_next(datetime) for _ in range(iter_count)]
min_interval = min((next_times[i] - next_times[i - 1] for i in range(1, len(next_times))))

if min_interval < timedelta(minutes=shortest_time):
return False

return True
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ opentelemetry-instrumentation-logging==0.30b1
opentelemetry-instrumentation-requests==0.30b1

bk-notice-sdk==1.3.0
croniter==1.4.1
Loading