Skip to content

Commit

Permalink
fix: 修复差量同步主机忽略内网IP变更场景 (closed #2504)
Browse files Browse the repository at this point in the history
  • Loading branch information
Huayeaaa committed Dec 6, 2024
1 parent 847a5d7 commit 29ca544
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 20 deletions.
20 changes: 20 additions & 0 deletions apps/mock_data/views_mkd/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@
],
"retention": 1,
}
# 因为MockClient中接口数据定义死了,如需调用使用,深拷贝后update原数据
JOB_REINSTALL_REQUEST_PARAMS = {
"job_type": constants.JobType.REINSTALL_AGENT,
"hosts": [
{
"bk_host_id": 14110,
"bk_cloud_id": constants.DEFAULT_CLOUD,
"ap_id": constants.DEFAULT_AP_ID,
"install_channel_id": None,
"bk_biz_id": 100001,
"os_type": constants.OsType.LINUX,
"inner_ip": host.DEFAULT_IP,
"inner_ipv6": host.DEFAULT_IPV6,
"account": constants.LINUX_ACCOUNT,
"port": settings.BKAPP_DEFAULT_SSH_PORT,
"auth_type": constants.AuthType.PASSWORD,
"password": "password",
}
],
}


JOB_OPERATE_REQUEST_PARAMS = {"job_type": constants.JobType.REINSTALL_AGENT, "bk_host_id": [host.DEFAULT_HOST_ID]}
Expand Down
79 changes: 65 additions & 14 deletions apps/node_man/periodic_tasks/sync_cmdb_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import ipaddress
import math
import typing

from celery.schedules import crontab
from celery.task import periodic_task
from django.conf import settings
from django.db import transaction
from django.db.models import Q

from apps.backend.celery import app
from apps.backend.utils.redis import REDIS_INST
Expand Down Expand Up @@ -374,28 +376,49 @@ def update_or_create_host_base(biz_id, ap_map_config, is_gse2_gray, task_id, cmd


def sync_biz_incremental_hosts(
bk_biz_id: int, ap_map_config: SyncHostApMapConfig, expected_bk_host_ids: typing.Iterable[int], is_gse2_gray: bool
bk_biz_id: int,
ap_map_config: SyncHostApMapConfig,
expected_bk_host_ids: typing.Iterable[int],
is_gse2_gray: bool,
inner_ips: typing.List[str],
):
"""
同步业务增量主机
:param bk_biz_id: 业务ID
:param ap_map_config:
:param expected_bk_host_ids: 期望得到的主机ID列表
:param is_gse2_gray:
:param inner_ips:内网IPv4/IPv6列表
:return:
"""
logger.info(
f"[sync_cmdb_host] sync_biz_incremental_hosts: "
f"bk_biz_id -> {bk_biz_id}, expected_bk_host_ids -> {expected_bk_host_ids}"
)
expected_bk_host_ids: typing.Set[int] = set(expected_bk_host_ids)
exists_host_ids: typing.Set[int] = set(
models.Host.objects.filter(bk_biz_id=bk_biz_id, bk_host_id__in=expected_bk_host_ids).values_list(
"bk_host_id", flat=True
common_query_conditions = Q(bk_biz_id=bk_biz_id) & Q(bk_host_id__in=expected_bk_host_ids)
if not inner_ips:
exists_host_ids: typing.Set[int] = set(
models.Host.objects.filter(common_query_conditions).values_list("bk_host_id", flat=True)
)
else:
ipv4_list, ipv6_list = filter_ipv4_and_ipv6(inner_ips)
# 仅存在IPv4
if ipv4_list and not ipv6_list:
query_conditions = common_query_conditions & Q(inner_ip__in=ipv4_list)
# 仅存在IPv6
elif ipv6_list and not ipv4_list:
query_conditions = common_query_conditions & Q(inner_ipv6__in=ipv6_list)
# 两者都存在
else:
query_conditions = common_query_conditions & (Q(inner_ip__in=ipv4_list) | Q(inner_ipv6__in=ipv6_list))

exists_host_ids: typing.Set[int] = set(
models.Host.objects.filter(query_conditions).values_list("bk_host_id", flat=True)
)
)
# 计算出对比本地主机缓存,增量的主机 ID
incremental_host_ids: typing.List[int] = list(expected_bk_host_ids - exists_host_ids)
logger.info(f"need sync hosts: {incremental_host_ids}, length -> {len(incremental_host_ids)}")
# 尝试获取增量主机信息
hosts: typing.List[typing.Dict] = query_biz_hosts(bk_biz_id=bk_biz_id, bk_host_ids=incremental_host_ids)
# 更新本地缓存
Expand All @@ -408,25 +431,31 @@ def sync_biz_incremental_hosts(
)


def bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.Iterable[int]]):
def bulk_differential_sync_biz_hosts(
expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.Iterable[int]],
inner_ips_gby_bk_biz_id: typing.Dict[int, typing.List[str]] = None,
):
"""
并发同步增量主机
:param expected_bk_host_ids_gby_bk_biz_id: 按业务ID聚合主机ID列表
:param inner_ips_gby_bk_biz_id: 按业务ID聚合主机内网IP列表
:return:
"""
params_list: typing.List[typing.Dict] = []
ap_map_config: SyncHostApMapConfig = get_sync_host_ap_map_config()
gray_tools: GrayTools = GrayTools()
# TODO 开始跳跃
for bk_biz_id, bk_host_ids in expected_bk_host_ids_gby_bk_biz_id.items():
params_list.append(
{
"bk_biz_id": bk_biz_id,
"ap_map_config": ap_map_config,
"expected_bk_host_ids": bk_host_ids,
"is_gse2_gray": gray_tools.is_gse2_gray(bk_biz_id=bk_biz_id),
}
)
params = {
"bk_biz_id": bk_biz_id,
"ap_map_config": ap_map_config,
"expected_bk_host_ids": bk_host_ids,
"is_gse2_gray": gray_tools.is_gse2_gray(bk_biz_id=bk_biz_id),
"inner_ips": None,
}
if inner_ips_gby_bk_biz_id:
params["inner_ips"] = inner_ips_gby_bk_biz_id.get(bk_biz_id)
params_list.append(params)
batch_call(func=sync_biz_incremental_hosts, params_list=params_list)


Expand Down Expand Up @@ -618,3 +647,25 @@ def clear_need_delete_host_ids_task():
"""
task_id = clear_need_delete_host_ids_task.request.id
clear_need_delete_host_ids(task_id)


def filter_ipv4_and_ipv6(ip_list):
"""
过滤出列表中的IPv4、IPv6地址
:param ip_list: 包含IP地址的列表
:return: 包含IPv4、IPv6地址的列表
"""
ipv4_list = []
ipv6_list = []
for ip in ip_list:
try:
# 尝试将字符串解析为IP地址对象
ip_obj = ipaddress.ip_address(ip)
if isinstance(ip_obj, ipaddress.IPv4Address):
ipv4_list.append(ip)
if isinstance(ip_obj, ipaddress.IPv6Address):
ipv6_list.append(ip)
except ValueError:
# 如果解析失败,则跳过该IP地址
continue
return ipv4_list, ipv6_list
10 changes: 9 additions & 1 deletion apps/node_man/serializers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ def validate(self, attrs):

bk_biz_ids = set()
expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.List[int]] = defaultdict(list)
inner_ips_gby_bk_biz_id: typing.Dict[int, typing.List[str]] = defaultdict(list)
cipher = tools.HostTools.get_asymmetric_cipher()
fields_need_decrypt = ["password", "key"]
# 密码解密
Expand All @@ -325,10 +326,17 @@ def validate(self, attrs):
if "bk_biz_id" not in host:
raise ValidationError(_("主机信息缺少业务ID(bk_biz_id)"))
expected_bk_host_ids_gby_bk_biz_id[host["bk_biz_id"]].append(host["bk_host_id"])
if host.get("inner_ip"):
inner_ips_gby_bk_biz_id[host["bk_biz_id"]].append(host["inner_ip"])
elif host.get("inner_ipv6"):
inner_ips_gby_bk_biz_id[host["bk_biz_id"]].append(host["inner_ipv6"])

if attrs["op_type"] not in [constants.OpType.INSTALL, constants.OpType.REPLACE]:
# 差量同步主机
bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id)
bulk_differential_sync_biz_hosts(
expected_bk_host_ids_gby_bk_biz_id=expected_bk_host_ids_gby_bk_biz_id,
inner_ips_gby_bk_biz_id=inner_ips_gby_bk_biz_id,
)

set_agent_setup_info_to_attrs(attrs)

Expand Down
6 changes: 3 additions & 3 deletions apps/node_man/tests/test_handlers/test_install_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ def test_install_channel_hidden(self):
hidden=True,
)

self.assertEqual(len(self.client.get("/api/install_channel/")["data"]), 10)
self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": False})["data"]), 10)
self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": True})["data"]), 11)
self.assertEqual(len(self.client.get("/api/install_channel/")["data"]), 11)
self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": False})["data"]), 11)
self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": True})["data"]), 12)
32 changes: 30 additions & 2 deletions apps/node_man/tests/test_views/test_job_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from apps.mock_data.common_unit import host
from apps.mock_data.views_mkd import job
from apps.node_man import constants
from apps.node_man.models import Host
from apps.node_man.tests.utils import Subscription
from apps.node_man.models import Host, IdentityData
from apps.node_man.tests.utils import MockClient, Subscription
from apps.utils.unittest.testcase import CustomAPITestCase, MockSuperUserMixin


Expand Down Expand Up @@ -94,3 +94,31 @@ def generate_install_job_request_params():
data["hosts"][0]["outer_ip"] = ""
data["hosts"][0]["outer_ipv6"] = ""
return data


class TestHostInfoNotUpdateCase(MockSuperUserMixin, CustomAPITestCase):
def setUp(self) -> None:
Host.objects.update_or_create(
defaults={
"bk_cloud_id": constants.DEFAULT_CLOUD,
"node_type": constants.NodeType.AGENT,
"bk_biz_id": 100001,
"inner_ip": host.DEFAULT_IP,
},
bk_host_id=14110,
)
identify_data = copy.deepcopy(host.IDENTITY_MODEL_DATA)
identify_data["bk_host_id"] = 14110
IdentityData.objects.create(**identify_data)
return super().setUp()

@patch("apps.node_man.handlers.job.JobHandler.create_subscription", Subscription.create_subscription)
@patch("apps.node_man.periodic_tasks.sync_cmdb_host.client_v2", MockClient)
def test_install(self):
data = copy.deepcopy(job.JOB_REINSTALL_REQUEST_PARAMS)
data["hosts"][0]["inner_ip"] = "2.1.2.52"

response = self.client.post(path="/api/job/install/", data=data)
# 成功创建安装任务
self.assertEqual(response["result"], True)
self.assertEqual(type(response["data"]["job_id"]), int)

0 comments on commit 29ca544

Please sign in to comment.