Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NSXT Bulk API for realizing SGs #129

Open
wants to merge 1 commit into
base: stable/yoga-m3
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,7 @@ def infra(self, target_obj: dict, child_objs: List[dict]) -> dict:
"id": target_obj.get("id"),
"target_type": target_obj.get("resource_type"),
"children": [
{
o.get("resource_type"): {
"path": o.get("path"),
"parent_path": o.get("parent_path"),
"id": o.get("id"),
"resource_type": o.get("resource_type")
},
"resource_type": "Child{}".format(o.get("resource_type")),
"marked_for_delete": o.get("marked_for_delete")
} for o in child_objs
{ o } for o in child_objs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This simplification improvement does not work for the delete segment port scenario. Also, this is the reason why one of the functional tests failed. Suggest reviewing it to the old structure and adding the additional fields you need (if any). Also, it will be more readable.

]
}
]
Expand Down Expand Up @@ -322,7 +313,16 @@ def segment_port(self, os_port, provider_port) -> dict:

return segment_port

# NSX-T Group Members
def sg_members_child_container(self, sgs) -> dict:
sgs = []
for sg in sgs:
tmp = { "resource_type": "ChildGroup",
"Group": self.sg_members_container(sg, {"paths": sg["member_paths"], "_revision": sg["_revision"]}),
}
tmp["Group"]["resource_type"] = "Group"
sgs.append(tmp)
return sgs

def sg_members_container(self, os_sg: dict, provider_sg: dict) -> dict:
sg_id = os_sg.get("id")
sg = {
Expand Down Expand Up @@ -456,7 +456,6 @@ def _filter_out_ipv4_mapped_ipv6_nets(self, target):


class Provider(base.Provider):

QOS = "Segment QoS"
NETWORK = "Segment"
PORT = "SegmentPort"
Expand Down Expand Up @@ -618,6 +617,33 @@ def _wait_to_realize(self, resource_type, os_id):
exporter.REALIZED.labels(resource_type, status).inc()
raise Exception("{} ID: {} did not get realized for {}s", resource_type, os_id, until * pause)

def _realize_bulk(self, resource_type: str, data, nsxt_res_type, dsl):
report = "Bulk Operation: Resource: {} with ID: {} is going to be %s.".format(resource_type, lambda: data[0]["id"])

LOG.info(report, "created")
path = "{}".format(API.INFRA)

res = self.client.patch(path=path, data=data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a lock before the execution of the actual update/create operation. The challenge here is not to lock on the entire "SG_MEMBERS" type but for each group ID individually "member-{}".format(os_id). A semaphore implementation would be required.

res.raise_for_status()

nsxt_objects = [child[nsxt_res_type] for child in data["children"][0]["children"]]
nsxt_objects = sorted(nsxt_objects, key=lambda x: x["id"])

# Fetch _revision data (needed to run put requests) for all updated objects
# This is needed to run put requests on objects since put relies on the correct _revision number
updated_nsxt_objs = self.client.get_all(path=API.SEARCH_DSL, params=API.SEARCH_DSL_QUERY(nsxt_res_type, dsl))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the search API approach. Ideally, we need to use GET /infra as we use the same endpoint for the update/create operation.

updated_nsxt_objs = sorted(updated_nsxt_objs, key=lambda x: x["id"])

for curr_obj in zip(nsxt_objects, updated_nsxt_objs):
nsxt_objects['_revision'] = curr_obj['_revision']
nsxt_objects['parent_path'] = curr_obj['parent_path']
nsxt_objects['unique_id'] = curr_obj['unique_id']
nsxt_objects['_last_modified_time'] = curr_obj['_last_modified_time']
nsxt_objects['resource_type'] = curr_obj['resource_type']
nsxt_objects['marked_for_delete'] = curr_obj['marked_for_delete']

meta = self.metadata_update_bulk(resource_type, nsxt_objects)
return meta
# overrides
@refresh_and_retry
def _realize(self, resource_type: str, delete: bool, convertor: Callable, os_o: dict, provider_o: dict):
Expand Down Expand Up @@ -768,7 +794,6 @@ def port_realize(self, os_port: dict, delete=False):
LOG.info("Segment Port:%s already deleted.", port_id)
return
return self._delete_segment_port(os_port, port_meta)

# Realize the port via the Policy API
provider_port = dict()
parent_port_id = os_port.get("parent_id")
Expand Down Expand Up @@ -821,6 +846,10 @@ def port_realize(self, os_port: dict, delete=False):
LOG.info("Port: %s has %s security groups which is more than the maximum allowed %s. \
The port will be added to the security groups as a static member.", port_id, len(port_sgs), max_sg_tags)
os_port["security_groups"] = None
port_data = self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port)
self.sg_members_realize_bulk(port_sgs, port_id)
return port_data

return self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port)
def get_port(self, os_id):
port = self.client.get_unique(path=API.SEARCH_QUERY, params={"query": API.SEARCH_Q_SEG_PORT.format(os_id)})
Expand Down Expand Up @@ -871,6 +900,33 @@ def qos_realize(self, qos: dict, delete=False):
provider_o = {"id": meta.real_id, "_revision": meta.revision}
return self._realize(Provider.QOS, delete, self.payload.qos, qos, provider_o)

def sg_members_realize_bulk(self, os_objects: list, port_id):
nsxt_objects = []
dsl = port_id

for os_obj in os_objects:
meta_data = self.metadata(self.SG_MEMBERS, os_obj)

if meta_data:
sg_o = {"id": os_obj,
"cidrs": meta_data.sg_cidrs,
"member_paths": meta_data.sg_members,
"revision_number": meta_data.rev,
"_revision": meta_data.revision
}
else:
sg_o = {"id": os_obj,
"cidrs": [],
"member_paths": [],
"revision_number": 0,
"_revision": None
}
nsxt_objects.append(sg_o)

container = self.payload.sg_members_child_container(nsxt_objects)
data = self.payload.infra({"id": "default", "resource_type": "Domain"}, container)
return self._realize_bulk(Provider.SG_MEMBERS, data,"Group", dsl)

def sg_members_realize(self, os_sg: dict, delete=False):
os_id = os_sg.get("id")
if delete and self.metadata(Provider.SG_RULES, os_id):
Expand Down Expand Up @@ -903,6 +959,16 @@ def metadata_update(self, resource_type, provider_object) -> PolicyResourceMeta:
self._metadata[resource_type].meta.update(res)
return res.meta

def metadata_update_bulk(self, resource_type, provider_objects) -> PolicyResourceMeta:
if resource_type != Provider.SG_RULE:
with LockManager.get_lock(resource_type):
meta = []
for po in provider_objects:
res = Resource(po)
self._metadata[resource_type].meta.update(res)
meta.append(res.meta)
return meta

# overrides
def metadata_refresh(self, resource_type, params=dict()):
provider = self._metadata[resource_type]
Expand Down
Loading