Skip to content

Commit

Permalink
feat: 超大订阅方案 (closed #2429)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Dec 6, 2024
1 parent e01e2eb commit d7844cb
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 14 deletions.
9 changes: 5 additions & 4 deletions apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
from django.db.models.functions import Concat
from django.utils import timezone
from django.utils.translation import ugettext as _
from django_redis import get_redis_connection

from apps.adapters.api.gse import GseApiBaseHelper, get_gse_api_helper
from apps.backend.api.constants import POLLING_TIMEOUT
from apps.backend.constants import ActionNameType
from apps.backend.subscription import errors
from apps.backend.utils.redis import REDIS_INST
from apps.core.files.storage import get_storage
from apps.exceptions import parse_exception
from apps.node_man import constants, models
Expand Down Expand Up @@ -271,7 +271,7 @@ class CommonData:
class RedisCommonData:
def __init__(self, *args, **kwargs):
self.uuid_key = uuid.uuid4().hex
self.client = get_redis_connection()
self.client = REDIS_INST

for k, v in dict(*args, **kwargs).items():
self.client.hset(self.uuid_key, k, str(pickle.dumps(v)))
Expand Down Expand Up @@ -341,7 +341,6 @@ class BaseService(Service, LogMixin, DBHelperMixin, PollingTimeoutMixin):
failed_subscription_instance_id_reason_map: Optional[Dict[int, Any]] = None
# 日志制作类实例
log_maker: Optional[LogMaker] = None
# is_multi_paralle_gateway: bool = False

def __init__(self, *args, **kwargs):
self.failed_subscription_instance_id_reason_map: Dict = {}
Expand Down Expand Up @@ -686,7 +685,9 @@ def inputs_format(self):
),
Service.InputItem(name="subscription_step_id", key="subscription_step_id", type="int", required=True),
Service.InputItem(name="blueking_language", key="blueking_language", type="str", required=True),
Service.InputItem(name="is_multi_paralle_gateway", key="is_multi_paralle_gateway", type="bool", required=True),
Service.InputItem(
name="is_multi_paralle_gateway", key="is_multi_paralle_gateway", type="bool", required=True
),
]

def outputs_format(self):
Expand Down
13 changes: 8 additions & 5 deletions apps/backend/subscription/steps/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ def handle_uninstall_instances(
def handle_new_add_instances(
self,
install_action: str,
instances: RedisDict,
instances: Union[RedisDict, Dict[str, Dict]],
instance_actions: Dict[str, str],
bk_host_id__host_map: Dict[int, models.Host],
group_id__host_key__proc_status_map: Dict[str, Dict[str, models.ProcessStatus]],
Expand Down Expand Up @@ -441,7 +441,7 @@ def handle_exceed_max_retry_times_instances(

def handle_manual_op_instances(
self,
instances: RedisDict,
instances: Union[RedisDict, Dict[str, Dict]],
instance_actions: Dict[str, str],
auto_trigger: bool,
push_migrate_reason_func: Callable,
Expand Down Expand Up @@ -492,7 +492,10 @@ def handle_manual_op_instances(
)

def handle_not_change_instances(
self, instances: RedisDict, migrate_reasons: Dict[str, Dict], push_migrate_reason_func: Callable
self,
instances: Union[RedisDict, Dict[str, Dict]],
migrate_reasons: Dict[str, Dict],
push_migrate_reason_func: Callable,
):
"""
处理无需变更实例,请在最后调用该钩子
Expand Down Expand Up @@ -584,7 +587,7 @@ def handle_check_and_skip_instances(
instance_actions: Dict[str, str],
push_migrate_reason_func: Callable,
bk_host_id__host_map: Dict[int, models.Host],
instances: RedisDict,
instances: Union[RedisDict, Dict[str, Dict]],
):
"""
插件状态及版本检查,确定是否执行安装
Expand Down Expand Up @@ -714,7 +717,7 @@ def get_action_dict(self) -> Dict[str, str]:

def make_instances_migrate_actions(
self,
instances: RedisDict,
instances: Union[RedisDict, Dict[str, Dict]],
auto_trigger: bool = False,
preview_only: bool = False,
**kwargs,
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/subscription/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def wrapper(subscription: models.Subscription, subscription_task: models.Subscri
def create_task(
subscription: models.Subscription,
subscription_task: models.SubscriptionTask,
instances: RedisDict,
instances: Union[RedisDict, Dict[str, Dict]],
instance_actions: Dict[str, Dict[str, str]],
preview_only: bool = False,
data_backend: str = None,
Expand Down
8 changes: 4 additions & 4 deletions apps/utils/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

from _collections_abc import dict_keys
from django.conf import settings
from django_redis import get_redis_connection

from apps.backend.utils.redis import REDIS_INST
from apps.node_man.constants import DataBackend, DCReturnType

# 过期时间为1小时
Expand All @@ -27,7 +27,7 @@

class RedisHashScanner:
def __init__(self, key: str, match: Optional[str] = None, count: int = REDIS_CACHE_DATA_LENGTH):
self.redis_client = get_redis_connection()
self.redis_client = REDIS_INST
self.key = key
self.match = match
self.count = count
Expand All @@ -51,7 +51,7 @@ def __iter__(self) -> Generator[Tuple[str, str], None, None]:

class RedisListIterator:
def __init__(self, key: str, batch_size: int = REDIS_CACHE_DATA_LENGTH):
self.redis_client = get_redis_connection()
self.redis_client = REDIS_INST
self.key = key
self.batch_size = batch_size

Expand All @@ -69,7 +69,7 @@ def __iter__(self) -> Generator[Tuple[str, str], None, None]:
class RedisDataBase:
def __init__(self):
self.uuid_key = f"{uuid.uuid4().hex}"
self.client = get_redis_connection()
self.client = REDIS_INST

def _update_redis_expiry(self):
self.client.expire(self.uuid_key, REDIS_CACHE_DATA_TIMEOUT)
Expand Down

0 comments on commit d7844cb

Please sign in to comment.