diff --git a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_policy.py b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_policy.py index f711b66..ba5db14 100644 --- a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_policy.py +++ b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_policy.py @@ -217,16 +217,7 @@ def infra(self, target_obj: dict, child_objs: List[dict]) -> dict: "id": target_obj.get("id"), "target_type": target_obj.get("resource_type"), "children": [ - { - o.get("resource_type"): { - "path": o.get("path"), - "parent_path": o.get("parent_path"), - "id": o.get("id"), - "resource_type": o.get("resource_type") - }, - "resource_type": "Child{}".format(o.get("resource_type")), - "marked_for_delete": o.get("marked_for_delete") - } for o in child_objs + { o } for o in child_objs ] } ] @@ -322,7 +313,16 @@ def segment_port(self, os_port, provider_port) -> dict: return segment_port - # NSX-T Group Members + def sg_members_child_container(self, sgs) -> dict: + sgs = [] + for sg in sgs: + tmp = { "resource_type": "ChildGroup", + "Group": self.sg_members_container(sg, {"paths": sg["member_paths"], "_revision": sg["_revision"]}), + } + tmp["Group"]["resource_type"] = "Group" + sgs.append(tmp) + return sgs + def sg_members_container(self, os_sg: dict, provider_sg: dict) -> dict: sg_id = os_sg.get("id") sg = { @@ -456,7 +456,6 @@ def _filter_out_ipv4_mapped_ipv6_nets(self, target): class Provider(base.Provider): - QOS = "Segment QoS" NETWORK = "Segment" PORT = "SegmentPort" @@ -618,6 +617,33 @@ def _wait_to_realize(self, resource_type, os_id): exporter.REALIZED.labels(resource_type, status).inc() raise Exception("{} ID: {} did not get realized for {}s", resource_type, os_id, until * pause) + def _realize_bulk(self, resource_type: str, data, nsxt_res_type, dsl): + report = "Bulk Operation: Resource: {} with ID: {} is going to be %s.".format(resource_type, lambda: data[0]["id"]) + + LOG.info(report, "created") + path = "{}".format(API.INFRA) + + res = self.client.patch(path=path, data=data) + res.raise_for_status() + + nsxt_objects = [child[nsxt_res_type] for child in data["children"][0]["children"]] + nsxt_objects = sorted(nsxt_objects, key=lambda x: x["id"]) + + # Fetch _revision data (needed to run put requests) for all updated objects + # This is needed to run put requests on objects since put relies on the correct _revision number + updated_nsxt_objs = self.client.get_all(path=API.SEARCH_DSL, params=API.SEARCH_DSL_QUERY(nsxt_res_type, dsl)) + updated_nsxt_objs = sorted(updated_nsxt_objs, key=lambda x: x["id"]) + + for curr_obj in zip(nsxt_objects, updated_nsxt_objs): + nsxt_objects['_revision'] = curr_obj['_revision'] + nsxt_objects['parent_path'] = curr_obj['parent_path'] + nsxt_objects['unique_id'] = curr_obj['unique_id'] + nsxt_objects['_last_modified_time'] = curr_obj['_last_modified_time'] + nsxt_objects['resource_type'] = curr_obj['resource_type'] + nsxt_objects['marked_for_delete'] = curr_obj['marked_for_delete'] + + meta = self.metadata_update_bulk(resource_type, nsxt_objects) + return meta # overrides @refresh_and_retry def _realize(self, resource_type: str, delete: bool, convertor: Callable, os_o: dict, provider_o: dict): @@ -768,7 +794,6 @@ def port_realize(self, os_port: dict, delete=False): LOG.info("Segment Port:%s already deleted.", port_id) return return self._delete_segment_port(os_port, port_meta) - # Realize the port via the Policy API provider_port = dict() parent_port_id = os_port.get("parent_id") @@ -821,6 +846,10 @@ def port_realize(self, os_port: dict, delete=False): LOG.info("Port: %s has %s security groups which is more than the maximum allowed %s. \ The port will be added to the security groups as a static member.", port_id, len(port_sgs), max_sg_tags) os_port["security_groups"] = None + port_data = self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port) + self.sg_members_realize_bulk(port_sgs, port_id) + return port_data + return self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port) def get_port(self, os_id): port = self.client.get_unique(path=API.SEARCH_QUERY, params={"query": API.SEARCH_Q_SEG_PORT.format(os_id)}) @@ -871,6 +900,33 @@ def qos_realize(self, qos: dict, delete=False): provider_o = {"id": meta.real_id, "_revision": meta.revision} return self._realize(Provider.QOS, delete, self.payload.qos, qos, provider_o) + def sg_members_realize_bulk(self, os_objects: list, port_id): + nsxt_objects = [] + dsl = port_id + + for os_obj in os_objects: + meta_data = self.metadata(self.SG_MEMBERS, os_obj) + + if meta_data: + sg_o = {"id": os_obj, + "cidrs": meta_data.sg_cidrs, + "member_paths": meta_data.sg_members, + "revision_number": meta_data.rev, + "_revision": meta_data.revision + } + else: + sg_o = {"id": os_obj, + "cidrs": [], + "member_paths": [], + "revision_number": 0, + "_revision": None + } + nsxt_objects.append(sg_o) + + container = self.payload.sg_members_child_container(nsxt_objects) + data = self.payload.infra({"id": "default", "resource_type": "Domain"}, container) + return self._realize_bulk(Provider.SG_MEMBERS, data,"Group", dsl) + def sg_members_realize(self, os_sg: dict, delete=False): os_id = os_sg.get("id") if delete and self.metadata(Provider.SG_RULES, os_id): @@ -903,6 +959,16 @@ def metadata_update(self, resource_type, provider_object) -> PolicyResourceMeta: self._metadata[resource_type].meta.update(res) return res.meta + def metadata_update_bulk(self, resource_type, provider_objects) -> PolicyResourceMeta: + if resource_type != Provider.SG_RULE: + with LockManager.get_lock(resource_type): + meta = [] + for po in provider_objects: + res = Resource(po) + self._metadata[resource_type].meta.update(res) + meta.append(res.meta) + return meta + # overrides def metadata_refresh(self, resource_type, params=dict()): provider = self._metadata[resource_type]