Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 修复差量同步主机忽略内网IP变更场景 (closed #2504) #2509

Open
wants to merge 1 commit into
base: v2.4.8-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions apps/mock_data/views_mkd/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@
],
"retention": 1,
}
# 因为MockClient中接口数据定义死了,如需调用使用,深拷贝后update原数据
JOB_REINSTALL_REQUEST_PARAMS = {
"job_type": constants.JobType.REINSTALL_AGENT,
"hosts": [
{
"bk_host_id": 14110,
"bk_cloud_id": constants.DEFAULT_CLOUD,
"ap_id": constants.DEFAULT_AP_ID,
"install_channel_id": None,
"bk_biz_id": 100001,
"os_type": constants.OsType.LINUX,
"inner_ip": host.DEFAULT_IP,
"inner_ipv6": host.DEFAULT_IPV6,
"account": constants.LINUX_ACCOUNT,
"port": settings.BKAPP_DEFAULT_SSH_PORT,
"auth_type": constants.AuthType.PASSWORD,
"password": "password",
}
],
}


JOB_OPERATE_REQUEST_PARAMS = {"job_type": constants.JobType.REINSTALL_AGENT, "bk_host_id": [host.DEFAULT_HOST_ID]}
Expand Down
66 changes: 53 additions & 13 deletions apps/node_man/periodic_tasks/sync_cmdb_host.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import ipaddress
import math
import typing

from celery.schedules import crontab
from celery.task import periodic_task
from django.conf import settings
from django.db import transaction
from django.db.models import Q

from apps.backend.celery import app
from apps.backend.utils.redis import REDIS_INST
Expand Down Expand Up @@ -374,28 +376,38 @@ def update_or_create_host_base(biz_id, ap_map_config, is_gse2_gray, task_id, cmd


def sync_biz_incremental_hosts(
bk_biz_id: int, ap_map_config: SyncHostApMapConfig, expected_bk_host_ids: typing.Iterable[int], is_gse2_gray: bool
bk_biz_id: int,
ap_map_config: SyncHostApMapConfig,
expected_bk_host_ids: typing.Iterable[int],
is_gse2_gray: bool,
inner_ips: typing.List[str],
):
"""
同步业务增量主机
:param bk_biz_id: 业务ID
:param ap_map_config:
:param expected_bk_host_ids: 期望得到的主机ID列表
:param is_gse2_gray:
:param inner_ips:内网IPv4/IPv6列表
:return:
"""
logger.info(
f"[sync_cmdb_host] sync_biz_incremental_hosts: "
f"bk_biz_id -> {bk_biz_id}, expected_bk_host_ids -> {expected_bk_host_ids}"
)
expected_bk_host_ids: typing.Set[int] = set(expected_bk_host_ids)
common_query_conditions = Q(bk_biz_id=bk_biz_id) & Q(bk_host_id__in=expected_bk_host_ids)
if inner_ips:
# 因经给序列化器校验后,必定有一个IP类型列表是有值的
ipv4_list, ipv6_list = filter_ipv4_and_ipv6(inner_ips)
common_query_conditions &= Q(inner_ip__in=ipv4_list) | Q(inner_ipv6__in=ipv6_list)

exists_host_ids: typing.Set[int] = set(
models.Host.objects.filter(bk_biz_id=bk_biz_id, bk_host_id__in=expected_bk_host_ids).values_list(
"bk_host_id", flat=True
)
models.Host.objects.filter(common_query_conditions).values_list("bk_host_id", flat=True)
)
# 计算出对比本地主机缓存,增量的主机 ID
incremental_host_ids: typing.List[int] = list(expected_bk_host_ids - exists_host_ids)
logger.info(f"need sync hosts id: {incremental_host_ids}, length -> {len(incremental_host_ids)}")
# 尝试获取增量主机信息
hosts: typing.List[typing.Dict] = query_biz_hosts(bk_biz_id=bk_biz_id, bk_host_ids=incremental_host_ids)
# 更新本地缓存
Expand All @@ -408,25 +420,31 @@ def sync_biz_incremental_hosts(
)


def bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.Iterable[int]]):
def bulk_differential_sync_biz_hosts(
expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.Iterable[int]],
inner_ips_gby_bk_biz_id: typing.Dict[int, typing.List[str]] = None,
):
"""
并发同步增量主机
:param expected_bk_host_ids_gby_bk_biz_id: 按业务ID聚合主机ID列表
:param inner_ips_gby_bk_biz_id: 按业务ID聚合主机内网IP列表
:return:
"""
params_list: typing.List[typing.Dict] = []
ap_map_config: SyncHostApMapConfig = get_sync_host_ap_map_config()
gray_tools: GrayTools = GrayTools()
# TODO 开始跳跃
for bk_biz_id, bk_host_ids in expected_bk_host_ids_gby_bk_biz_id.items():
params_list.append(
{
"bk_biz_id": bk_biz_id,
"ap_map_config": ap_map_config,
"expected_bk_host_ids": bk_host_ids,
"is_gse2_gray": gray_tools.is_gse2_gray(bk_biz_id=bk_biz_id),
}
)
params = {
"bk_biz_id": bk_biz_id,
"ap_map_config": ap_map_config,
"expected_bk_host_ids": bk_host_ids,
"is_gse2_gray": gray_tools.is_gse2_gray(bk_biz_id=bk_biz_id),
"inner_ips": None,
}
if inner_ips_gby_bk_biz_id:
params["inner_ips"] = inner_ips_gby_bk_biz_id.get(bk_biz_id)
params_list.append(params)
batch_call(func=sync_biz_incremental_hosts, params_list=params_list)


Expand Down Expand Up @@ -618,3 +636,25 @@ def clear_need_delete_host_ids_task():
"""
task_id = clear_need_delete_host_ids_task.request.id
clear_need_delete_host_ids(task_id)


def filter_ipv4_and_ipv6(ip_list):
"""
过滤出列表中的IPv4、IPv6地址
:param ip_list: 包含IP地址的列表
:return: 包含IPv4、IPv6地址的列表
"""
ipv4_list = []
ipv6_list = []
for ip in ip_list:
try:
# 尝试将字符串解析为IP地址对象
ip_obj = ipaddress.ip_address(ip)
if isinstance(ip_obj, ipaddress.IPv4Address):
ipv4_list.append(ip)
if isinstance(ip_obj, ipaddress.IPv6Address):
ipv6_list.append(ip)
except ValueError:
# 如果解析失败,则跳过该IP地址
continue
return ipv4_list, ipv6_list
10 changes: 9 additions & 1 deletion apps/node_man/serializers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ def validate(self, attrs):

bk_biz_ids = set()
expected_bk_host_ids_gby_bk_biz_id: typing.Dict[int, typing.List[int]] = defaultdict(list)
inner_ips_gby_bk_biz_id: typing.Dict[int, typing.List[str]] = defaultdict(list)
cipher = tools.HostTools.get_asymmetric_cipher()
fields_need_decrypt = ["password", "key"]
# 密码解密
Expand All @@ -325,10 +326,17 @@ def validate(self, attrs):
if "bk_biz_id" not in host:
raise ValidationError(_("主机信息缺少业务ID(bk_biz_id)"))
expected_bk_host_ids_gby_bk_biz_id[host["bk_biz_id"]].append(host["bk_host_id"])
if host.get("inner_ip"):
inner_ips_gby_bk_biz_id[host["bk_biz_id"]].append(host["inner_ip"])
elif host.get("inner_ipv6"):
inner_ips_gby_bk_biz_id[host["bk_biz_id"]].append(host["inner_ipv6"])

if attrs["op_type"] not in [constants.OpType.INSTALL, constants.OpType.REPLACE]:
# 差量同步主机
bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id)
bulk_differential_sync_biz_hosts(
expected_bk_host_ids_gby_bk_biz_id=expected_bk_host_ids_gby_bk_biz_id,
inner_ips_gby_bk_biz_id=inner_ips_gby_bk_biz_id,
)

set_agent_setup_info_to_attrs(attrs)

Expand Down
6 changes: 3 additions & 3 deletions apps/node_man/tests/test_handlers/test_install_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,6 @@ def test_install_channel_hidden(self):
hidden=True,
)

self.assertEqual(len(self.client.get("/api/install_channel/")["data"]), 10)
self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": False})["data"]), 10)
self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": True})["data"]), 11)
self.assertEqual(len(self.client.get("/api/install_channel/")["data"]), 11)
self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": False})["data"]), 11)
self.assertEqual(len(self.client.get("/api/install_channel/", {"with_hidden": True})["data"]), 12)
32 changes: 30 additions & 2 deletions apps/node_man/tests/test_views/test_job_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from apps.mock_data.common_unit import host
from apps.mock_data.views_mkd import job
from apps.node_man import constants
from apps.node_man.models import Host
from apps.node_man.tests.utils import Subscription
from apps.node_man.models import Host, IdentityData
from apps.node_man.tests.utils import MockClient, Subscription
from apps.utils.unittest.testcase import CustomAPITestCase, MockSuperUserMixin


Expand Down Expand Up @@ -94,3 +94,31 @@ def generate_install_job_request_params():
data["hosts"][0]["outer_ip"] = ""
data["hosts"][0]["outer_ipv6"] = ""
return data


class TestHostInfoNotUpdateCase(MockSuperUserMixin, CustomAPITestCase):
def setUp(self) -> None:
Host.objects.update_or_create(
defaults={
"bk_cloud_id": constants.DEFAULT_CLOUD,
"node_type": constants.NodeType.AGENT,
"bk_biz_id": 100001,
"inner_ip": host.DEFAULT_IP,
},
bk_host_id=14110,
)
identify_data = copy.deepcopy(host.IDENTITY_MODEL_DATA)
identify_data["bk_host_id"] = 14110
IdentityData.objects.create(**identify_data)
return super().setUp()

@patch("apps.node_man.handlers.job.JobHandler.create_subscription", Subscription.create_subscription)
@patch("apps.node_man.periodic_tasks.sync_cmdb_host.client_v2", MockClient)
def test_install(self):
data = copy.deepcopy(job.JOB_REINSTALL_REQUEST_PARAMS)
data["hosts"][0]["inner_ip"] = "2.1.2.52"

response = self.client.post(path="/api/job/install/", data=data)
# 成功创建安装任务
self.assertEqual(response["result"], True)
self.assertEqual(type(response["data"]["job_id"]), int)