Skip to content

Commit

Permalink
NSXT Bulk API for realizing SGs
Browse files Browse the repository at this point in the history
For ports with more than 27 SGs, the approach of adding them as static
member one by one results in a rpc-timout. Thus ports rely on the
update_sg_group method executed after a port binding.

With this commit, we use the nsxt bulk API to add the staticmembership
of ports directly after binding the port.
  • Loading branch information
sven-rosenzweig committed Jan 23, 2024
1 parent 0890df5 commit 58c4718
Showing 1 changed file with 79 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,7 @@ def infra(self, target_obj: dict, child_objs: List[dict]) -> dict:
"id": target_obj.get("id"),
"target_type": target_obj.get("resource_type"),
"children": [
{
o.get("resource_type"): {
"path": o.get("path"),
"parent_path": o.get("parent_path"),
"id": o.get("id"),
"resource_type": o.get("resource_type")
},
"resource_type": "Child{}".format(o.get("resource_type")),
"marked_for_delete": o.get("marked_for_delete")
} for o in child_objs
{ o } for o in child_objs
]
}
]
Expand Down Expand Up @@ -322,7 +313,16 @@ def segment_port(self, os_port, provider_port) -> dict:

return segment_port

# NSX-T Group Members
def sg_members_child_container(self, sgs) -> dict:
sgs = []
for sg in sgs:
tmp = { "resource_type": "ChildGroup",
"Group": self.sg_members_container(sg, {"paths": sg["member_paths"], "_revision": sg["_revision"]}),
}
tmp["Group"]["resource_type"] = "Group"
sgs.append(tmp)
return sgs

def sg_members_container(self, os_sg: dict, provider_sg: dict) -> dict:
sg_id = os_sg.get("id")
sg = {
Expand Down Expand Up @@ -456,7 +456,6 @@ def _filter_out_ipv4_mapped_ipv6_nets(self, target):


class Provider(base.Provider):

QOS = "Segment QoS"
NETWORK = "Segment"
PORT = "SegmentPort"
Expand Down Expand Up @@ -618,6 +617,33 @@ def _wait_to_realize(self, resource_type, os_id):
exporter.REALIZED.labels(resource_type, status).inc()
raise Exception("{} ID: {} did not get realized for {}s", resource_type, os_id, until * pause)

def _realize_bulk(self, resource_type: str, data, nsxt_res_type, dsl):
report = "Bulk Operation: Resource: {} with ID: {} is going to be %s.".format(resource_type, lambda: data[0]["id"])

LOG.info(report, "created")
path = "{}".format(API.INFRA)

res = self.client.patch(path=path, data=data)
res.raise_for_status()

nsxt_objects = [child[nsxt_res_type] for child in data["children"][0]["children"]]
nsxt_objects = sorted(nsxt_objects, key=lambda x: x["id"])

# Fetch _revision data (needed to run put requests) for all updated objects
# This is needed to run put requests on objects since put relies on the correct _revision number
updated_nsxt_objs = self.client.get_all(path=API.SEARCH_DSL, params=API.SEARCH_DSL_QUERY(nsxt_res_type, dsl))
updated_nsxt_objs = sorted(updated_nsxt_objs, key=lambda x: x["id"])

for curr_obj in zip(nsxt_objects, updated_nsxt_objs):
nsxt_objects['_revision'] = curr_obj['_revision']
nsxt_objects['parent_path'] = curr_obj['parent_path']
nsxt_objects['unique_id'] = curr_obj['unique_id']
nsxt_objects['_last_modified_time'] = curr_obj['_last_modified_time']
nsxt_objects['resource_type'] = curr_obj['resource_type']
nsxt_objects['marked_for_delete'] = curr_obj['marked_for_delete']

meta = self.metadata_update_bulk(resource_type, nsxt_objects)
return meta
# overrides
@refresh_and_retry
def _realize(self, resource_type: str, delete: bool, convertor: Callable, os_o: dict, provider_o: dict):
Expand Down Expand Up @@ -768,7 +794,6 @@ def port_realize(self, os_port: dict, delete=False):
LOG.info("Segment Port:%s already deleted.", port_id)
return
return self._delete_segment_port(os_port, port_meta)

# Realize the port via the Policy API
provider_port = dict()
parent_port_id = os_port.get("parent_id")
Expand Down Expand Up @@ -821,6 +846,10 @@ def port_realize(self, os_port: dict, delete=False):
LOG.info("Port: %s has %s security groups which is more than the maximum allowed %s. \
The port will be added to the security groups as a static member.", port_id, len(port_sgs), max_sg_tags)
os_port["security_groups"] = None
port_data = self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port)
self.sg_members_realize_bulk(port_sgs, port_id)
return port_data

return self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port)
def get_port(self, os_id):
port = self.client.get_unique(path=API.SEARCH_QUERY, params={"query": API.SEARCH_Q_SEG_PORT.format(os_id)})
Expand Down Expand Up @@ -871,6 +900,33 @@ def qos_realize(self, qos: dict, delete=False):
provider_o = {"id": meta.real_id, "_revision": meta.revision}
return self._realize(Provider.QOS, delete, self.payload.qos, qos, provider_o)

def sg_members_realize_bulk(self, os_objects: list, port_id):
nsxt_objects = []
dsl = port_id

for os_obj in os_objects:
meta_data = self.metadata(self.SG_MEMBERS, os_obj)

if meta_data:
sg_o = {"id": os_obj,
"cidrs": meta_data.sg_cidrs,
"member_paths": meta_data.sg_members,
"revision_number": meta_data.rev,
"_revision": meta_data.revision
}
else:
sg_o = {"id": os_obj,
"cidrs": [],
"member_paths": [],
"revision_number": 0,
"_revision": None
}
nsxt_objects.append(sg_o)

container = self.payload.sg_members_child_container(nsxt_objects)
data = self.payload.infra({"id": "default", "resource_type": "Domain"}, container)
return self._realize_bulk(Provider.SG_MEMBERS, data,"Group", dsl)

def sg_members_realize(self, os_sg: dict, delete=False):
os_id = os_sg.get("id")
if delete and self.metadata(Provider.SG_RULES, os_id):
Expand Down Expand Up @@ -903,6 +959,16 @@ def metadata_update(self, resource_type, provider_object) -> PolicyResourceMeta:
self._metadata[resource_type].meta.update(res)
return res.meta

def metadata_update_bulk(self, resource_type, provider_objects) -> PolicyResourceMeta:
if resource_type != Provider.SG_RULE:
with LockManager.get_lock(resource_type):
meta = []
for po in provider_objects:
res = Resource(po)
self._metadata[resource_type].meta.update(res)
meta.append(res.meta)
return meta

# overrides
def metadata_refresh(self, resource_type, params=dict()):
provider = self._metadata[resource_type]
Expand Down

0 comments on commit 58c4718

Please sign in to comment.