From e16c94e69ac60ca7d08f398b6b1b23b65b4a3c54 Mon Sep 17 00:00:00 2001
From: Sven Rosenzweig
Date: Fri, 11 Aug 2023 14:57:35 +0200
Subject: [PATCH 1/3] Use context provided by request for client -> agent RPC

Scoping all API and RPC calls to the same context makes debugging far
easier, since the originating request can be traced through the logs.
Oslo_log obtains the context either through explicit passing to the
logger or by reading it from the thread-local store. The latter does not
work here, because some requests enter a queue and are later processed
by a different thread. Therefore the context is attached to every job
entering the queue and passed explicitly to every log call that needs it.

---
 networking_nsxv3/common/synchronization.py    |  12 ++-
 .../plugins/ml2/drivers/nsxv3/agent/agent.py  |  34 +++---
 .../drivers/nsxv3/agent/provider_nsx_mgmt.py  |  42 ++++----
 .../nsxv3/agent/provider_nsx_policy.py        | 100 +++++++++---------
 .../ml2/drivers/nsxv3/agent/realization.py    |  62 +++++------
 5 files changed, 127 insertions(+), 123 deletions(-)

diff --git a/networking_nsxv3/common/synchronization.py b/networking_nsxv3/common/synchronization.py
index 31e25cc5..f0dbde85 100644
--- a/networking_nsxv3/common/synchronization.py
+++ b/networking_nsxv3/common/synchronization.py
@@ -16,6 +16,7 @@
 import functools
 import collections
+from oslo_context import context
 
 LOG = logging.getLogger(__name__)
 
@@ -69,10 +70,11 @@ def retry_next(self):
 
 class Runnable(object):
 
-    def __init__(self, idn, fn, priority=Priority.LOWEST):
+    def __init__(self, idn, fn, priority=Priority.LOWEST, context=None):
         self.priority = priority
         self.idn = idn
         self.fn = fn
+        self.context = context
 
     def __repr__(self):
         return str(self.idn)
 
@@ -137,7 +139,7 @@ def __init__(self, active_size=INFINITY, passive_size=INFINITY,
         self._idle = workers_size
         self._state = "not started"
 
-    def run(self, priority, ids, fn):
+    def run(self, priority, ids, fn, context=None):
         """
         Submit a job with priority
 
         Keyword arguments:
@@ -154,7 +156,7 @@
         try:
             LOG.info(MESSAGE.format("Enqueued", jid, priority.name, fn.__name__))
-            job = Runnable(jid, fn, priority.value)
+            job = Runnable(jid, fn, priority.value, context)
             if priority.value == Priority.HIGHEST:
                 self._active.put_nowait(job)
             else:
@@ -172,8 +174,8 @@ def _start(self):
                     self._active.put_nowait(self._passive.get_nowait())
                     self._passive.task_done()
                 job = self._active.get(block=True, timeout=TIMEOUT)
-                LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__))
-                self._workers.spawn(job.fn, job.idn)#.wait()
+                LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__), context=job.context)
+                self._workers.spawn(job.fn, job.idn, job.context)#.wait()
                 self._active.task_done()
             except eventlet.queue.Empty:
                 LOG.info("No activity for the last {} seconds.".format(TIMEOUT))
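The queueing change above is the heart of patch 1/3: a job must carry the
request context of the call that enqueued it, because the worker that
eventually runs it is a different (green) thread with an empty thread-local
store. A minimal, self-contained sketch of the same pattern, using the
standard library's queue and threads in place of eventlet (Job, worker and
the callback are illustrative names, not code from this repo):

    import logging
    import queue
    import threading

    from oslo_context import context as oslo_ctx  # assumes oslo.context is installed

    logging.basicConfig(level=logging.INFO)
    LOG = logging.getLogger(__name__)


    class Job:
        """A unit of work that remembers the request context it was created under."""

        def __init__(self, idn, fn, ctx=None):
            self.idn = idn
            self.fn = fn
            self.ctx = ctx


    def worker(q):
        while True:
            job = q.get()
            # Log with the job's own context instead of the worker thread's
            # (empty) thread-local store; with oslo_log this would be
            # LOG.info(..., context=job.ctx) and the request_id would show
            # up in the formatted line.
            LOG.info("Processing job %s (request_id=%s)", job.idn,
                     job.ctx.request_id if job.ctx else None)
            job.fn(job.idn, job.ctx)
            q.task_done()


    jobs = queue.Queue()
    threading.Thread(target=worker, args=(jobs,), daemon=True).start()

    ctx = oslo_ctx.RequestContext()  # normally the incoming API request's context
    jobs.put(Job("port-1", lambda idn, ctx: None, ctx))
    jobs.join()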
diff --git a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/agent.py b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/agent.py
index 8f9c6693..f0ca1311 100644
--- a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/agent.py
+++ b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/agent.py
@@ -72,19 +72,19 @@ def get_network_bridge(self, context, current, network_segments, network_current
             seg_id = ns.get("segmentation_id")
             net_type = ns.get("network_type")
             if seg_id and net_type in nsxv3_constants.NSXV3_AGENT_NETWORK_TYPES:
-                network_meta = self.realizer.network(seg_id)
+                network_meta = self.realizer.network(seg_id, context=context)
                 break
 
         if try_create_port and bool(network_meta.get("nsx-logical-switch-id")):
-            self.realizer.precreate_port(current["id"], network_meta)
+            self.realizer.precreate_port(current["id"], network_meta, context=context)
 
         return network_meta
 
     def security_groups_member_updated(self, context, **kwargs):
-        self.callback(kwargs["security_groups"], self.realizer.security_group_members)
+        self.callback(kwargs["security_groups"], self.realizer.security_group_members, context)
 
     def security_groups_rule_updated(self, context, **kwargs):
-        self.callback(kwargs["security_groups"], self.realizer.security_group_rules)
+        self.callback(kwargs["security_groups"], self.realizer.security_group_rules, context)
 
     def port_create(self, **kwargs):
         self.realizer.port(kwargs["port"]["id"])
 
@@ -92,41 +92,41 @@ def port_update(self, context, **kwargs):
         # Ensure security groups attached to the port are synced first
         for sg in kwargs["port"].get("security_groups", []):
-            self.callback(sg, self.realizer.security_group_rules)
+            self.callback(sg, self.realizer.security_group_rules, context)
             # Also ensure allowed_address_pairs are re-processed
-            self.callback(sg, self.realizer.security_group_members)
+            self.callback(sg, self.realizer.security_group_members, context)
-        self.callback(kwargs["port"]["id"], self.realizer.port)
+        self.callback(kwargs["port"]["id"], self.realizer.port, context)
 
     def port_delete(self, context, **kwargs):
         # Ports removed by the background synchronization
         pass
 
     def create_policy(self, context, policy):
         self.update_policy(context, policy)
 
     def delete_policy(self, context, policy):
         self.update_policy(context, policy)
 
     def update_policy(self, context, policy):
-        self.callback(policy["id"], self.realizer.qos)
+        self.callback(policy["id"], self.realizer.qos, context)
 
     def validate_policy(self, context, policy):
         pass
 
     def create_log(self, context, log_obj):
-        self.callback(log_obj, self.realizer.enable_policy_logging)
+        self.callback(log_obj, self.realizer.enable_policy_logging, context)
 
     def create_log_precommit(self, context, log_obj):
         pass
 
     def update_log(self, context, log_obj):
-        self.callback(log_obj, self.realizer.update_policy_logging)
+        self.callback(log_obj, self.realizer.update_policy_logging, context)
 
     def update_log_precommit(self, context, log_obj):
         pass
 
     def delete_log(self, context, log_obj):
-        self.callback(log_obj, self.realizer.disable_policy_logging)
+        self.callback(log_obj, self.realizer.disable_policy_logging, context)
 
     def delete_log_precommit(self, context, log_obj):
         pass
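All of the driver entry points above receive the Neutron request context as
their first RPC argument; forwarding it into self.callback() is what later
lets the queued job log under the caller's request ID. Why the thread-local
fallback is not enough can be shown in a few lines (assumes oslo.context is
installed):

    import threading

    from oslo_context import context

    ctx = context.RequestContext()        # also stored as this thread's "current"
    assert context.get_current() is ctx   # implicit lookup works in the same thread


    def queue_worker():
        # The job runs on another thread: nothing was stored there, so a
        # logger relying on get_current() finds no context at all.
        assert context.get_current() is None


    t = threading.Thread(target=queue_worker)
    t.start()
    t.join()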
@@ -162,15 +162,15 @@ def _sync_all(self):
         except Exception as err:
             LOG.error("Synchronization has failed. Error: %s", err)
 
-    def _sync_immediate(self, os_ids, realizer):
+    def _sync_immediate(self, os_ids, realizer, context=None):
         ids = list(os_ids) if isinstance(os_ids, set) else os_ids
         ids = ids if isinstance(ids, list) else [ids]
-        self.runner.run(sync.Priority.HIGHEST, ids, realizer)
+        self.runner.run(sync.Priority.HIGHEST, ids, realizer, context)
 
-    def _sync_delayed(self, os_ids, realizer):
+    def _sync_delayed(self, os_ids, realizer, context=None):
         ids = list(os_ids) if isinstance(os_ids, set) else os_ids
         ids = ids if isinstance(ids, list) else [ids]
-        self.runner.run(sync.Priority.HIGH, ids, realizer)
+        self.runner.run(sync.Priority.HIGH, ids, realizer, context)
 
     def kpi(self):
         return {"active": self.runner.active(), "passive": self.runner.passive()}

diff --git a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_mgmt.py b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_mgmt.py
index 3c8f9524..66446dce 100644
--- a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_mgmt.py
+++ b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_mgmt.py
@@ -575,7 +575,7 @@ def metadata(self, resource_type, os_id) -> ResourceMeta:
     def meta_provider(self, resource_type) -> base.MetaProvider:
         return self._metadata.get(resource_type)
 
-    def _realize(self, resource_type, delete, convertor, os_o, provider_o):
+    def _realize(self, resource_type, delete, convertor, os_o, provider_o, context=None):
         os_id = os_o.get("id")
 
         begin_report = "[{}] Resource: {} with ID: {} is going to be %s.".format(self.provider, resource_type, os_id)
 
@@ -601,7 +601,7 @@
             if resource_type == Provider.PORT:
                 res = self.client.get(path=path)
                 if res.status_code == 404:
-                    LOG.info(end_report, "rescheduled due 404: not found")
+                    LOG.info(end_report, "rescheduled due to 404: not found", context=context)
                     return metadata
                 o = res.json()
 
@@ -609,16 +609,16 @@
                 stamp = int(o.get("_last_modified_time")) / 1000
                 if not self.orphan_ports_tmout_passed(stamp):
-                    LOG.info(end_report, "rescheduled for deletion")
+                    LOG.info(end_report, "rescheduled for deletion", context=context)
                     return metadata
 
             self.client.delete(path=path, params=params)
-            LOG.info(end_report, "deleted")
+            LOG.info(end_report, "deleted", context=context)
             return self.metadata_delete(resource_type, os_id)
         else:
-            LOG.info(begin_report, "updated")
+            LOG.info(begin_report, "updated", context=context)
             if resource_type == Provider.SG_RULES_EXT:
                 LOG.debug(
                     "Skipping update of NSGroup:%s",
             if metadata.revision != None:
                 data["_revision"] = metadata.revision
             o = self.client.put(path=path, data=data)
-            LOG.info(end_report, "updated")
+            LOG.info(end_report, "updated", context=context)
             return self.metadata_update(resource_type, o.json())
         else:
             if not delete:
-                LOG.info(begin_report, "created")
+                LOG.info(begin_report, "created", context=context)
                 o = self.client.post(path=path, data=convertor(os_o, provider_o))
-                LOG.info(end_report, "created")
+                LOG.info(end_report, "created", context=context)
                 return self.metadata_update(resource_type, o.json())
-            LOG.info(end_report, "already deleted")
+            LOG.info(end_report, "already deleted", context=context)
 
     def outdated(self, resource_type: str, os_meta):
         self.metadata_refresh(resource_type)
@@ -698,7 +698,7 @@ def get_port(self, os_id):
             return self.metadata_update(Provider.PORT, port), port
         return None
 
-    def port_realize(self, os_port: dict, delete=False):
+    def port_realize(self, os_port: dict, delete=False, context=None):
         provider_port = dict()
 
         if delete:
@@ -711,7 +711,7 @@
             if parent_port and parent_port[0]:
                 provider_port["parent_id"] = parent_port[0].id
             else:
-                LOG.warning("Not found. Parent Port:%s for Child Port:%s", os_port.get("parent_id"), os_port.get("id"))
+                LOG.warning("Not found. Parent Port:%s for Child Port:%s", os_port.get("parent_id"), os_port.get("id"), context=context)
                 return
         else:
             # Parent port is NOT always created externally
@@ -719,31 +719,31 @@
             if port and port[0]:
                 provider_port["id"] = port[0].id
             else:
-                LOG.warning("Not found. Port: %s", os_port.get("id"))
+                LOG.warning("Not found. Port: %s", os_port.get("id"), context=context)
 
         if os_port.get("qos_policy_id"):
             meta_qos = self.metadata(Provider.QOS, os_port.get("qos_policy_id"))
             if meta_qos:
                 provider_port["qos_policy_id"] = meta_qos.id
             else:
-                LOG.warning("Not found. QoS:%s for Port:%s", os_port.get("qos_policy_id"), os_port.get("id"))
+                LOG.warning("Not found. QoS:%s for Port:%s", os_port.get("qos_policy_id"), os_port.get("id"), context=context)
 
         provider_port["switching_profile_ids"] = copy.deepcopy(self.switch_profiles)
-        return self._realize(Provider.PORT, False, self.payload.port, os_port, provider_port)
+        return self._realize(Provider.PORT, False, self.payload.port, os_port, provider_port, context=context)
 
-    def qos_realize(self, qos, delete=False):
-        return self._realize(Provider.QOS, delete, self.payload.qos, qos, dict())
+    def qos_realize(self, qos, delete=False, context=None):
+        return self._realize(Provider.QOS, delete, self.payload.qos, qos, dict(), context)
 
-    def sg_members_realize(self, sg, delete=False):
+    def sg_members_realize(self, sg, delete=False, context=None):
         if delete and self.metadata(Provider.SG_RULES, sg.get("id")):
             LOG.warning(
                 "Resource: %s with ID: %s deletion is rescheduled due to dependency.", Provider.SG_MEMBERS, sg.get("id")
             )
             return
-        return self._realize(Provider.SG_MEMBERS, delete, self.payload.sg_members_container, sg, dict())
+        return self._realize(Provider.SG_MEMBERS, delete, self.payload.sg_members_container, sg, dict(), context)
 
-    def sg_rules_realize(self, os_sg, delete=False, logged=False):
+    def sg_rules_realize(self, os_sg, delete=False, logged=False, context=None):
         provider_sg = dict()
 
         nsg_args = [Provider.SG_RULES_EXT, delete, self.payload.sg_rules_ext_container, os_sg, dict()]
@@ -764,7 +764,7 @@
         # Update section tags(revision) when all rules applied successfully
         provider_sg["tags_update"] = True
-        self._realize(*sec_args)
+        self._realize(*sec_args, context=context)
 
     def _sg_rules_realize(self, os_sg, meta_sg: ResourceMeta, logged=False):
 
@@ -824,7 +824,7 @@ def _create_sg_provider_rule_remote_prefix(self, cidr):
     def _delete_sg_provider_rule_remote_prefix(self, id):
         self.client.delete(path=API.IPSET.format(id))
 
-    def network_realize(self, segmentation_id):
+    def network_realize(self, segmentation_id, context=None):
         meta = self.metadata(self.NETWORK, segmentation_id)
         if not meta:
             os_net = {"id": "{}-{}".format(self.zone_name, segmentation_id), "segmentation_id": segmentation_id}
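One detail worth noting in the signatures above: the new parameter is
appended after existing defaults such as delete=False, so call sites must
either fill every earlier argument or pass context by keyword; handing it
over positionally in the wrong slot would silently land it in delete. A
hypothetical illustration of the pitfall:

    def sg_members_realize(sg, delete=False, context=None):
        return ("delete" if delete else "realize", context)


    ctx = object()  # stands in for a RequestContext

    # Positional: ctx binds to `delete`; a truthy context turns a realize
    # into a delete and the context itself is lost.
    assert sg_members_realize({"id": "sg-1"}, ctx) == ("delete", None)

    # Keyword: unambiguous, and robust against future signature growth.
    assert sg_members_realize({"id": "sg-1"}, context=ctx) == ("realize", ctx)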
diff --git a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_policy.py b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_policy.py
index 941f5318..e681aa04 100644
--- a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_policy.py
+++ b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/provider_nsx_policy.py
@@ -558,7 +558,7 @@ def _is_valid_vlan(self, res: Resource) -> bool:
         return False
 
     @exporter.IN_REALIZATION.track_inprogress()
-    def _wait_to_realize(self, resource_type, os_id):
+    def _wait_to_realize(self, resource_type, os_id, context):
         if resource_type == Provider.SG_RULES:
             path = API.POLICY.format(os_id)
         elif resource_type == Provider.SG_MEMBERS:
@@ -576,11 +576,11 @@
             o = self.client.get(path=API.STATUS, params=params).json()
             status = o.get("consolidated_status", {}).get("consolidated_status")
             if status == "SUCCESS":
-                LOG.info("%s ID: %s in Status: %s", resource_type, os_id, status)
+                LOG.info("%s ID: %s in Status: %s", resource_type, os_id, status, context=context)
                 exporter.REALIZED.labels(resource_type, status).inc()
                 return True
             else:
-                LOG.info("%s ID: %s in Status: %s for %ss", resource_type, os_id, status, attempt * pause)
+                LOG.info("%s ID: %s in Status: %s for %ss", resource_type, os_id, status, attempt * pause, context=context)
                 eventlet.sleep(pause)
         # When multiple policies did not get realized in the defined timeframe,
         # this is a symptom for another issue.
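_wait_to_realize is bounded polling of the NSX-T consolidated status; the
control flow, stripped of the client and the Prometheus metrics, looks like
this (function names and timing values are illustrative, not the repo's):

    import time


    def wait_to_realize(get_status, attempts=12, pause=5.0):
        """Poll until the backend reports SUCCESS or the attempts run out."""
        for attempt in range(1, attempts + 1):
            status = get_status()
            if status == "SUCCESS":
                return True
            print("status=%s after %ss" % (status, attempt * pause))
            time.sleep(pause)
        # Several resources timing out together usually points to a broader
        # backend problem rather than one bad object.
        raise RuntimeError("resource not realized in time")


    # Fake status source that succeeds on the third poll:
    polls = iter(["IN_PROGRESS", "IN_PROGRESS", "SUCCESS"])
    wait_to_realize(lambda: next(polls), pause=0.01)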
@@ -590,7 +590,7 @@
 
     # overrides
     @refresh_and_retry
-    def _realize(self, resource_type: str, delete: bool, convertor: Callable, os_o: dict, provider_o: dict):
+    def _realize(self, resource_type: str, delete: bool, convertor: Callable, os_o: dict, provider_o: dict, context=None):
         os_id = os_o.get("id")
 
         report = "Resource: {} with ID: {} is going to be %s.".format(resource_type, os_id)
 
@@ -598,17 +598,17 @@
         if meta:
             if delete:
                 try:
-                    LOG.info(report, "deleted")
+                    LOG.info(report, "deleted", context=context)
                     self.client.delete(path="{}{}".format(API.POLICY_BASE, meta.path))
                     return self.metadata_delete(resource_type, os_id)
                 except RuntimeError as e:
                     if re.match("cannot be deleted as either it has children or it is being referenced", str(e)):
-                        LOG.warning(self.RESCHEDULE_WARN_MSG, resource_type, os_id)
+                        LOG.warning(self.RESCHEDULE_WARN_MSG, resource_type, os_id, context=context)
                         return
                     else:
                         raise e
             else:
-                LOG.info(report, "updated")
+                LOG.info(report, "updated", context=context)
                 provider_o["_revision"] = meta.revision
                 data = convertor(os_o, provider_o)
                 path = "{}{}".format(API.POLICY_BASE, data.get("path"))
@@ -617,11 +617,11 @@
                 data = res.json()  # NSX-T applies desired state, no need to fetch after put
                 meta = self.metadata_update(resource_type, data)
-                self._wait_to_realize(resource_type, os_id)
+                self._wait_to_realize(resource_type, os_id, context)
                 return meta
         else:
             if not delete:
-                LOG.info(report, "created")
+                LOG.info(report, "created", context=context)
                 provider_o["_revision"] = None
                 data = convertor(os_o, provider_o)
                 path = "{}{}".format(API.POLICY_BASE, data.get("path"))
@@ -630,11 +630,11 @@
                 data = res.json()  # NSX-T applies desired state, no need to fetch after put
                 meta = self.metadata_update(resource_type, data)
-                self._wait_to_realize(resource_type, os_id)
+                self._wait_to_realize(resource_type, os_id, context)
                 return meta
 
-        LOG.info("Resource: %s with ID: %s already deleted.", resource_type, os_id)
+        LOG.info("Resource: %s with ID: %s already deleted.", resource_type, os_id, context=context)
 
-    def _delete_segment_port(self, os_port: dict, port_meta: PolicyResourceMeta) -> None:
+    def _delete_segment_port(self, os_port: dict, port_meta: PolicyResourceMeta, context) -> None:
         os_id = os_port.get("id")
         nsx_segment_id = port_meta.parent_path.replace(API.SEGMENT_PATH.format(""), "")
         target_o = {"id": nsx_segment_id, "resource_type": Provider.NETWORK}
@@ -651,11 +651,11 @@
             match = re.search(r'referenced by other objects path=\[([\w\/\-\,]+)\]', err_msg)
             LOG.warning(self.RESCHEDULE_WARN_MSG, Provider.PORT, os_id)
             if match:
-                self._realize_sg_members_after_port_deletion(child_o, match)
+                self._realize_sg_members_after_port_deletion(child_o, match, context)
                 return
 
         self.metadata_delete(Provider.PORT, os_id)
 
-    def _realize_sg_members_after_port_deletion(self, child_o, match):
+    def _realize_sg_members_after_port_deletion(self, child_o, match, context):
         refs = match.groups()[0].split(",")
         sg_paths = filter(lambda r: re.match(r'\/infra\/domains\/default\/groups\/', r), refs)
         meta = self._metadata[Provider.SG_MEMBERS].meta
@@ -671,9 +671,9 @@
                 except:
                     pass
             else:
-                self.sg_members_realize({"id": sg_id, "member_paths": sg_m.sg_members})
+                self.sg_members_realize({"id": sg_id, "member_paths": sg_m.sg_members}, context=context)
 
-    def _sg_logged_drop_rules_realize(self, os_sg, delete=False, logged=False):
+    def _sg_logged_drop_rules_realize(self, os_sg, delete=False, logged=False, context=None):
         logged_drop_policy_rules = self.client.get_all(path=API.RULES.format(DEFAULT_APPLICATION_DROP_POLICY["id"]))
         is_logged = [rule for rule in logged_drop_policy_rules if rule["id"] == os_sg["id"]]
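The deletion-recovery path above works by parsing NSX-T's "referenced by
other objects" error text to learn which groups still reference the port.
The regular expressions from the hunk can be exercised on their own (the
error string below is a shortened, made-up sample):

    import re

    err_msg = ("cannot be deleted as either it has children or it is being "
               "referenced by other objects "
               "path=[/infra/domains/default/groups/sg-1,/infra/segments/seg-1]")

    match = re.search(r'referenced by other objects path=\[([\w\/\-\,]+)\]', err_msg)
    if match:
        refs = match.groups()[0].split(",")
        sg_paths = [r for r in refs
                    if re.match(r'\/infra\/domains\/default\/groups\/', r)]
        print(sg_paths)  # ['/infra/domains/default/groups/sg-1']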
@@ -698,15 +698,15 @@
                 path=API.RULES_CREATE.format(DEFAULT_APPLICATION_DROP_POLICY["id"], is_logged[0]["id"]))
 
     # overrides
-    def port_realize(self, os_port: dict, delete=False):
+    def port_realize(self, os_port: dict, delete=False, context=None):
         port_id = os_port.get("id")
         port_meta = self.metadata(Provider.PORT, port_id)
 
         if delete:
             if not port_meta:
-                LOG.info("Segment Port:%s already deleted.", port_id)
+                LOG.info("Segment Port:%s already deleted.", port_id, context=context)
                 return
-            return self._delete_segment_port(os_port, port_meta)
+            return self._delete_segment_port(os_port, port_meta, context)
 
         # Realize the port via the Policy API
         provider_port = dict()
@@ -718,7 +718,7 @@
             if parent_meta:
                 provider_port["parent_id"] = parent_meta.real_id
             else:
-                LOG.warning("Not found. Parent Segment Port:%s for Child Port:%s.", parent_port_id, port_id)
+                LOG.warning("Not found. Parent Segment Port:%s for Child Port:%s.", parent_port_id, port_id, context=context)
                 return
 
         if port_meta:
@@ -726,7 +726,7 @@
             provider_port["path"] = port_meta.path
             provider_port["_revision"] = port_meta.revision
         else:
-            LOG.warning("Not found. Segment Port: %s", port_id)
+            LOG.warning("Not found. Segment Port: %s", port_id, context=context)
 
         os_qos_id = os_port.get("qos_policy_id")
 
@@ -750,24 +750,24 @@
         # In case the port already exists, realize the static group membership before the port is updated
         if port_meta:
-            self.realize_sg_static_members(port_sgs, port_meta)
+            self.realize_sg_static_members(port_sgs, port_meta, context)
 
             # Realize the port with empty security groups tags
-            updated_port_meta = self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port)
+            updated_port_meta = self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port, context)
 
             # If the port was not existing, realize the static group membership after the port was created
-            return updated_port_meta if port_meta is not None else self.realize_sg_static_members(port_sgs, updated_port_meta)
+            return updated_port_meta if port_meta is not None else self.realize_sg_static_members(port_sgs, updated_port_meta, context)
 
-        return self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port)
+        return self._realize(Provider.PORT, False, self.payload.segment_port, os_port, provider_port, context)
 
-    def realize_sg_static_members(self, port_sgs: List[str], port_meta: PolicyResourceMeta):
+    def realize_sg_static_members(self, port_sgs: List[str], port_meta: PolicyResourceMeta, context):
         for sg_id in port_sgs:
             with LockManager.get_lock("member-{}".format(sg_id)):
                 sg_meta = self.metadata(self.SG_MEMBERS, sg_id)
                 if not sg_meta:
                     # Realize the Security Group if it does not exist with empty members
                     sg_meta = self.sg_members_realize(
-                        {"id": sg_id, "cidrs": [], "revision_number": 0, "member_paths": []})
+                        {"id": sg_id, "cidrs": [], "revision_number": 0, "member_paths": []}, context=context)
                 if not port_meta.path:
                     raise RuntimeError(f"Not found path in Metadata for port: {port_meta.real_id}")
                 if port_meta.path not in sg_meta.sg_members:
@@ -775,7 +775,7 @@
                     self.sg_members_realize({"id": sg_id,
                                              "cidrs": sg_meta.sg_cidrs,
                                              "member_paths": sg_meta.sg_members,
-                                             "revision_number": sg_meta.revision or 0})
+                                             "revision_number": sg_meta.revision or 0}, context=context)
 
     def get_port(self, os_id):
         port = self.client.get_unique(path=API.SEARCH_QUERY, params={"query": API.SEARCH_Q_SEG_PORT.format(os_id)})
 
@@ -792,12 +792,12 @@ def get_port_meta_by_ids(self, port_ids: Set[str]) -> Set[PolicyResourceMeta]:
         return segment_ports
 
     # overrides
-    def network_realize(self, segmentation_id: int) -> PolicyResourceMeta:
+    def network_realize(self, segmentation_id: int, context=None) -> PolicyResourceMeta:
         segment = self.metadata(Provider.NETWORK, segmentation_id)
         if not segment or segment.real_id is None:
             os_net = {"id": "{}-{}".format(self.zone_name, segmentation_id), "segmentation_id": segmentation_id}
             provider_net = {"transport_zone_id": self.zone_id}
-            segment = self._realize(Provider.NETWORK, False, self.payload.segment, os_net, provider_net)
+            segment = self._realize(Provider.NETWORK, False, self.payload.segment, os_net, provider_net, context)
         return segment
 
     def get_non_default_switching_profiles(self) -> list:
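realize_sg_static_members serializes all membership updates for a given
group behind a "member-<sg-id>" lock, so concurrent port realizations cannot
interleave their read-modify-write of the member list. The locking idiom,
with a plain-threading stand-in for the repo's LockManager:

    import threading

    _registry_guard = threading.Lock()
    _locks = {}


    def get_lock(name):
        # Guard the registry itself so two threads cannot mint two
        # different locks for the same name.
        with _registry_guard:
            return _locks.setdefault(name, threading.Lock())


    def realize_static_members(sg_ids, port_path):
        for sg_id in sg_ids:
            with get_lock("member-{}".format(sg_id)):
                # The group's member list is read and rewritten only while
                # holding that group's own lock.
                print("updating", sg_id, "with", port_path)


    realize_static_members(["sg-1", "sg-2"], "/infra/segments/seg-1/ports/p-1")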
@@ -806,36 +806,36 @@
         return [p for p in prfls if p and p.get("id").find("default") == -1]
 
     # overrides
-    def sg_rules_realize(self, os_sg, delete=False, logged=False):
+    def sg_rules_realize(self, os_sg, delete=False, logged=False, context=None):
         os_id = os_sg.get("id")
         logged = bool(logged)
 
-        self._sg_logged_drop_rules_realize(os_sg, delete, logged)
+        self._sg_logged_drop_rules_realize(os_sg, delete, logged, context)
 
         if delete:
-            self._realize(Provider.SG_RULES, delete, None, os_sg, dict())
+            self._realize(Provider.SG_RULES, delete, None, os_sg, dict(), context)
             return
 
         provider_sg = self._create_provider_sg(os_sg, os_id, logged=logged)
-        return self._realize(Provider.SG_RULES, delete, self.payload.sg_rules_container, os_sg, provider_sg)
+        return self._realize(Provider.SG_RULES, delete, self.payload.sg_rules_container, os_sg, provider_sg, context)
 
-    def qos_realize(self, qos: dict, delete=False):
+    def qos_realize(self, qos: dict, delete=False, context=None):
         qos_id = qos.get("id")
         meta = self.metadata(Provider.QOS, qos_id)
         if not meta:
             return None
 
         provider_o = {"id": meta.real_id, "_revision": meta.revision}
-        return self._realize(Provider.QOS, delete, self.payload.qos, qos, provider_o)
+        return self._realize(Provider.QOS, delete, self.payload.qos, qos, provider_o, context)
 
-    def sg_members_realize(self, os_sg: dict, delete=False):
+    def sg_members_realize(self, os_sg: dict, delete=False, context=None):
         os_id = os_sg.get("id")
         if delete and self.metadata(Provider.SG_RULES, os_id):
             provider_group = {"paths": [], "_revision": None}
-            self._realize(Provider.SG_MEMBERS, False, self.payload.sg_members_container, os_sg, provider_group)
-            LOG.warning(self.RESCHEDULE_WARN_MSG, Provider.SG_MEMBERS, os_id)
+            self._realize(Provider.SG_MEMBERS, False, self.payload.sg_members_container, os_sg, provider_group, context)
+            LOG.warning(self.RESCHEDULE_WARN_MSG, Provider.SG_MEMBERS, os_id, context=context)
             return
 
         provider_group = {"paths": os_sg.get("member_paths"), "_revision": None}
-        return self._realize(Provider.SG_MEMBERS, delete, self.payload.sg_members_container, os_sg, provider_group)
+        return self._realize(Provider.SG_MEMBERS, delete, self.payload.sg_members_container, os_sg, provider_group, context)
 
     # overrides
     def metadata(self, resource_type: str, os_id: str) -> PolicyResourceMeta:
 
@@ -974,12 +974,12 @@ def remove_orphan_service(provider_id):
             sanitize.append((service.get("id"), remove_orphan_service))
 
         return sanitize
 
-    def set_policy_logging(self, log_obj, enable_logging):
-        LOG.debug(f"PROVIDER: set_policy_logging: {json.dumps(log_obj, indent=2)} as {enable_logging}")
+    def set_policy_logging(self, log_obj, enable_logging, context):
+        LOG.debug(f"PROVIDER: set_policy_logging: {json.dumps(log_obj, indent=2)} as {enable_logging}", context=context)
 
         # Check for a valid request
         if log_obj['resource_type'] != 'security_group':
-            LOG.error(f"set_policy_logging: incompatible resource type: {log_obj['resource_type']}")
+            LOG.error(f"set_policy_logging: incompatible resource type: {log_obj['resource_type']}", context=context)
             return
 
         # Get current rules configuration
@@ -997,19 +997,19 @@ def set_policy_logging(self, log_obj, enable_logging):
         # Update the logging state
         res = self.client.patch(path=API.POLICY.format(log_obj['resource_id']), data=data)
         res.raise_for_status()
-        self._sg_logged_drop_rules_realize({"id": log_obj['resource_id']}, False, enable_logging)
+        self._sg_logged_drop_rules_realize({"id": log_obj['resource_id']}, False, enable_logging, context)
 
-    def enable_policy_logging(self, log_obj):
+    def enable_policy_logging(self, log_obj, context=None):
         LOG.debug(f"PROVIDER: enable_policy_logging")
-        return self.set_policy_logging(log_obj, True)
+        return self.set_policy_logging(log_obj, True, context)
 
-    def disable_policy_logging(self, log_obj):
+    def disable_policy_logging(self, log_obj, context=None):
         LOG.debug(f"PROVIDER: disable_policy_logging")
-        return self.set_policy_logging(log_obj, False)
+        return self.set_policy_logging(log_obj, False, context)
 
-    def update_policy_logging(self, log_obj):
+    def update_policy_logging(self, log_obj, context=None):
         LOG.debug(f"PROVIDER: update_policy_logging")
-        return self.set_policy_logging(log_obj, log_obj['enabled'])
+        return self.set_policy_logging(log_obj, log_obj['enabled'], context)
 
     def tag_transport_zone(self, scope, tag):
         tz = self._get_tz()

diff --git a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py
index 37639835..98d463ae 100644
--- a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py
+++ b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py
@@ -194,7 +194,7 @@ def _age_cycle(self, _slice, port_current, sgr_current, qos_current, sgm_maybe_o
 
         self.AGE = int(time.time())
 
-    def security_group_members(self, os_id: str, reference=False):
+    def security_group_members(self, os_id: str, reference=False, context=None):
         """
         Realize security group members state.
         Realization will happen only if the group has active ports on the host
@@ -217,11 +217,12 @@
             paths = [p.path for p in segment_ports]
 
             # SG Members are not revisionable, use default "0"
-            pp.sg_members_realize({"id": os_id, "cidrs": cidrs, "revision_number": 0, "member_paths": paths})
+            pp.sg_members_realize({"id": os_id, "cidrs": cidrs, "revision_number": 0, "member_paths": paths},
+                                  context=context)
         else:
-            pp.sg_members_realize({"id": os_id}, delete=True)
+            pp.sg_members_realize({"id": os_id}, delete=True, context=context)
 
-    def security_group_rules(self, os_id: str):
+    def security_group_rules(self, os_id: str, context=None):
         """
         Realize security group rules state.
         Realization will happen only if the group has active ports on the host.
@@ -235,24 +236,25 @@
 
         if os_sg and os_sg.get("ports"):
             # Create Members Container
-            self.security_group_members(os_id, reference=True)
+            self.security_group_members(os_id, reference=True, context=context)
 
             os_sg["rules"] = self.rpc.get_rules_for_security_group_id(os_id)
 
             for os_rule in os_sg["rules"]:
                 remote_id = os_rule.get("remote_group_id")
                 if remote_id:
-                    self.security_group_members(remote_id, reference=True)
+                    self.security_group_members(remote_id, reference=True, context=context)
 
             logged = self.rpc.has_security_group_logging(os_id)
-            LOG.info(f"Neutron DB logged flag for {os_id}: rpc.has_security_group_logging(os_id): {logged}")
-            self.plcy_provider.sg_rules_realize(os_sg, logged=logged)
+            LOG.info(f"Neutron DB logged flag for {os_id}: rpc.has_security_group_logging(os_id): {logged}",
+                     context=context)
+            self.plcy_provider.sg_rules_realize(os_sg, logged=logged, context=context)
         else:
-            self.plcy_provider.sg_rules_realize({"id": os_id}, delete=True)
+            self.plcy_provider.sg_rules_realize({"id": os_id}, delete=True, context=context)
             # Skip members as they can be used as references
 
-    def precreate_port(self, os_id: str, network_meta: dict):
+    def precreate_port(self, os_id: str, network_meta: dict, context=None):
         """
         Try to precreate port on first binding request.
         :os_id: -- OpenStack ID of the Port
@@ -266,13 +268,13 @@
         if port:
             os_qid = port.get("qos_policy_id")
             if os_qid:
-                self.qos(os_qid, reference=True)
+                self.qos(os_qid, reference=True, context=context)
             if not port.get("vif_details") and network_meta:
                 port["vif_details"] = network_meta
-            self._port_realize(port)
+            self._port_realize(port, context=context)
 
-    def port(self, os_id: str):
+    def port(self, os_id: str, context=None):
         """
         Realize port state.
         :os_id: -- OpenStack ID of the Port
         """
@@ -286,11 +288,11 @@
             os_qid = port.get("qos_policy_id")
             if os_qid:
                 self.qos(os_qid, reference=True)
-            self._port_realize(port)
+            self._port_realize(port, context=context)
         else:
-            self._port_realize({"id": os_id}, delete=True)
+            self._port_realize({"id": os_id}, delete=True, context=context)
 
-    def qos(self, os_id: str, reference=False):
+    def qos(self, os_id: str, reference=False, context=None):
         """
         Realize QoS Policy state.
         :os_id: -- OpenStack ID of the QoS Policy
         """
@@ -307,11 +309,11 @@
         if not (reference and meta):
             qos = self.rpc.get_qos(os_id)
             if qos:
-                self._qos_realize(os_qos=qos)
+                self._qos_realize(os_qos=qos, context=context)
             else:
-                self._qos_realize(os_qos={"id": os_id}, delete=True)
+                self._qos_realize(os_qos={"id": os_id}, delete=True, context=context)
 
-    def network(self, os_seg_id: str):
+    def network(self, os_seg_id: str, context=None):
         """
         Realize Network state.
         :os_seg_id: -- OpenStack Network Segmentation ID
         """
@@ -323,47 +325,47 @@
         with LockManager.get_lock("network-{}".format(os_seg_id)):
             # TODO: mngr has to be removed after POLICY is fully supported
             provider = self.plcy_provider if self.USE_POLICY_API else self.mngr_provider
-            meta = provider.network_realize(os_seg_id)
+            meta = provider.network_realize(os_seg_id, context=context)
             return {"nsx-logical-switch-id": meta.unique_id, "external-id": meta.id, "segmentation_id": os_seg_id}
 
-    def enable_policy_logging(self, log_obj: dict):
+    def enable_policy_logging(self, log_obj: dict, context=None):
         """
         Realize security policy logging state enablement.
         :os_seg_id: -- OpenStack Security Group ID
         :return: -- None
         """
         with LockManager.get_lock("rules-{}".format(log_obj['resource_id'])):
-            self.plcy_provider.enable_policy_logging(log_obj)
+            self.plcy_provider.enable_policy_logging(log_obj, context)
 
-    def disable_policy_logging(self, log_obj: dict):
+    def disable_policy_logging(self, log_obj: dict, context=None):
         """
         Realize security policy logging state disablement.
         :os_seg_id: -- OpenStack Security Group ID
         :return: -- None
         """
         with LockManager.get_lock("rules-{}".format(log_obj['resource_id'])):
-            self.plcy_provider.disable_policy_logging(log_obj)
+            self.plcy_provider.disable_policy_logging(log_obj, context)
 
-    def update_policy_logging(self, log_obj: dict):
+    def update_policy_logging(self, log_obj: dict, context=None):
         """
         Realize security policy logging state update.
         :os_seg_id: -- OpenStack Security Group ID
         :return: -- None
         """
         with LockManager.get_lock("rules-{}".format(log_obj['resource_id'])):
-            self.plcy_provider.update_policy_logging(log_obj)
+            self.plcy_provider.update_policy_logging(log_obj, context)
 
-    def _qos_realize(self, os_qos: dict, delete=False):
+    def _qos_realize(self, os_qos: dict, delete=False, context=None):
         # TODO: mngr has to be removed after POLICY is fully supported
         provider = self.plcy_provider if self.USE_POLICY_API else self.mngr_provider
-        return provider.qos_realize(os_qos, delete)
+        return provider.qos_realize(os_qos, delete, context)
 
-    def _port_realize(self, os_port: dict, delete: bool = False):
+    def _port_realize(self, os_port: dict, delete: bool = False, context=None):
         # TODO: mngr has to be removed after POLICY is fully supported
         provider = self.plcy_provider if self.USE_POLICY_API else self.mngr_provider
-        return provider.port_realize(os_port, delete)
+        return provider.port_realize(os_port, delete, context=context)
 
     def _check_mp2policy_support(self):
         """Check if MP-to-Policy is forced, check if NSX-T version is supported
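Everything in patch 1/3 relies on an oslo_log logger accepting the context
as a keyword on each call and then formatting the line with the
context-aware format string, which includes the request ID. A runnable
configuration in miniature (the project name 'demo' is arbitrary; assumes
oslo.config, oslo.log and oslo.context are installed):

    from oslo_config import cfg
    from oslo_context import context
    from oslo_log import log as logging

    CONF = cfg.CONF
    logging.register_options(CONF)
    CONF([], project='demo')
    logging.setup(CONF, 'demo')

    LOG = logging.getLogger(__name__)

    ctx = context.RequestContext()
    # The formatted line now carries ctx.request_id, exactly like the
    # LOG.info(..., context=context) calls introduced by this patch.
    LOG.info("realizing port %s", "some-port-id", context=ctx)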
From 9929c9701e554cc130ff5c64bbdae156d081e9fb Mon Sep 17 00:00:00 2001
From: Sven Rosenzweig
Date: Wed, 16 Aug 2023 12:58:28 +0200
Subject: [PATCH 2/3] Use provided context for agent --> server RPC

Calls from the agent to the neutron server that were initially triggered
by a driver request used a context created once at startup instead of
the context associated with the request. This led to a single request ID
for all calls made by the agent, instead of each call being tied to the
client request for which it was made.

Calls from the agent to the neutron server that are triggered by the
sync-loop still use the context created at startup.

---
 networking_nsxv3/api/rpc.py                   | 39 ++++++++++---------
 .../ml2/drivers/nsxv3/agent/realization.py    | 18 ++++-----
 networking_nsxv3/tests/unit/openstack.py      | 16 ++++----
 3 files changed, 38 insertions(+), 35 deletions(-)
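The whole of this patch hangs on one small fallback helper, introduced in
the rpc.py hunk below: prefer the caller's context when one was handed in,
otherwise keep the admin context created at startup (the sync-loop case).
In isolation (RpcClientStub is a stand-in for the real client class, and a
plain RequestContext stands in for neutron's get_admin_context()):

    from oslo_context import context


    class RpcClientStub:
        def __init__(self):
            # Created once at startup; before this patch it was used for
            # every call.
            self.context = context.RequestContext()

        def _choose_context(self, request_context):
            if not request_context:
                return self.context
            return request_context


    client = RpcClientStub()
    req_ctx = context.RequestContext()

    assert client._choose_context(req_ctx) is req_ctx      # driver-triggered call
    assert client._choose_context(None) is client.context  # sync-loop call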
diff --git a/networking_nsxv3/api/rpc.py b/networking_nsxv3/api/rpc.py
index b07febae..56cf8c2d 100644
--- a/networking_nsxv3/api/rpc.py
+++ b/networking_nsxv3/api/rpc.py
@@ -27,7 +27,6 @@ def __init__(self, context):
         self.context = context
         self.rpc = rpc.get_client(target)
 
-
     def _get_call_context(self, host=None):
         topic = topics.get_topic_name(
             topics.AGENT, nsxv3_constants.NSXV3, topics.UPDATE, host)
@@ -133,7 +132,11 @@ def __init__(self):
         self.context = neutron_context.get_admin_context()
         self.client = rpc.get_client(target)
         self.host = cfg.CONF.host
-
+
+    def _choose_context(self, request_context):
+        if not request_context:
+            return self.context
+        return request_context
 
     @log_helpers.log_method_call
     def get_ports_with_revisions(self, limit, cursor):
         cctxt = self.client.prepare()
@@ -153,38 +156,38 @@ def get_security_groups_with_revisions(self, limit, cursor):
                           host=self.host, limit=limit, cursor=cursor)
 
     @log_helpers.log_method_call
-    def get_security_group(self, security_group_id):
+    def get_security_group(self, security_group_id, request_context):
         cctxt = self.client.prepare()
-        return cctxt.call(self.context, 'get_security_group',
+        return cctxt.call(self._choose_context(request_context), 'get_security_group',
                           host=self.host, security_group_id=security_group_id)
 
     @log_helpers.log_method_call
-    def get_qos(self, qos_id):
+    def get_qos(self, qos_id, request_context):
         cctxt = self.client.prepare()
-        return cctxt.call(self.context, 'get_qos', host=self.host, qos_id=qos_id)
+        return cctxt.call(self._choose_context(request_context), 'get_qos', host=self.host, qos_id=qos_id)
 
     @log_helpers.log_method_call
-    def get_port(self, port_id):
+    def get_port(self, port_id, request_context):
         cctxt = self.client.prepare()
-        return cctxt.call(self.context, 'get_port', host=self.host, port_id=port_id)
+        return cctxt.call(self._choose_context(request_context), 'get_port', host=self.host, port_id=port_id)
 
     @log_helpers.log_method_call
-    def get_rules_for_security_group_id(self, security_group_id):
+    def get_rules_for_security_group_id(self, security_group_id, request_context):
         cctxt = self.client.prepare()
-        return cctxt.call(self.context, 'get_rules_for_security_group_id',
+        return cctxt.call(self._choose_context(request_context), 'get_rules_for_security_group_id',
                           security_group_id=security_group_id)
 
     @log_helpers.log_method_call
-    def get_security_group_members_effective_ips(self, security_group_id):
+    def get_security_group_members_effective_ips(self, security_group_id, request_context):
         cctxt = self.client.prepare()
-        return cctxt.call(self.context,
+        return cctxt.call(self._choose_context(request_context),
                           'get_security_group_members_effective_ips',
                           security_group_id=security_group_id)
 
     @log_helpers.log_method_call
-    def get_security_group_port_ids(self, security_group_id):
+    def get_security_group_port_ids(self, security_group_id, request_context):
         cctxt = self.client.prepare()
-        return cctxt.call(self.context, 'get_security_group_port_ids',
+        return cctxt.call(self._choose_context(request_context), 'get_security_group_port_ids',
                           host=self.host, security_group_id=security_group_id)
 
     @log_helpers.log_method_call
@@ -200,9 +203,9 @@ def get_remote_security_groups_for_host(self, limit, cursor):
                           host=self.host, limit=limit, cursor=cursor)
 
     @log_helpers.log_method_call
-    def has_security_group_used_by_host(self, security_group_id):
+    def has_security_group_used_by_host(self, security_group_id, request_context):
         cctxt = self.client.prepare()
-        return cctxt.call(self.context, 'has_security_group_used_by_host',
+        return cctxt.call(self._choose_context(request_context), 'has_security_group_used_by_host',
                           host=self.host, security_group_id=security_group_id)
 
     @log_helpers.log_method_call
@@ -211,9 +214,9 @@ def get_port_logging(self, port_id):
         cctxt = self.client.prepare()
         return cctxt.call(self.context, 'get_port_logging', port_id=port_id)
 
     @log_helpers.log_method_call
-    def has_security_group_logging(self, security_group_id):
+    def has_security_group_logging(self, security_group_id, request_context):
         cctxt = self.client.prepare()
-        return cctxt.call(self.context, 'has_security_group_logging',
+        return cctxt.call(self._choose_context(request_context), 'has_security_group_logging',
                           security_group_id=security_group_id)

diff --git a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py
index 98d463ae..19f7bb2f 100644
--- a/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py
+++ b/networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/realization.py
@@ -209,9 +209,9 @@ def security_group_members(self, os_id: str, reference=False, context=None):
         pp = self.plcy_provider
         meta = pp.metadata(pp.SG_MEMBERS, os_id)
         if not (reference and meta):
-            if self.rpc.has_security_group_used_by_host(os_id):
-                cidrs = self.rpc.get_security_group_members_effective_ips(os_id)
-                port_ids = set(self.rpc.get_security_group_port_ids(os_id))
+            if self.rpc.has_security_group_used_by_host(os_id, context):
+                cidrs = self.rpc.get_security_group_members_effective_ips(os_id, context)
+                port_ids = set(self.rpc.get_security_group_port_ids(os_id, context))
                 segment_ports = pp.get_port_meta_by_ids(port_ids)
                 paths = [p.path for p in segment_ports]
 
@@ -232,20 +232,20 @@ def security_group_rules(self, os_id: str, context=None):
             LOG.info(f"{self.MIGR_IN_PROGRESS_MSG.format('security_group_rules realization')}")
             return
         with LockManager.get_lock("rules-{}".format(os_id)):
-            os_sg = self.rpc.get_security_group(os_id)
+            os_sg = self.rpc.get_security_group(os_id, context)
 
             if os_sg and os_sg.get("ports"):
                 # Create Members Container
                 self.security_group_members(os_id, reference=True, context=context)
 
-                os_sg["rules"] = self.rpc.get_rules_for_security_group_id(os_id)
+                os_sg["rules"] = self.rpc.get_rules_for_security_group_id(os_id, context)
 
                 for os_rule in os_sg["rules"]:
                     remote_id = os_rule.get("remote_group_id")
                     if remote_id:
                         self.security_group_members(remote_id, reference=True, context=context)
 
-                logged = self.rpc.has_security_group_logging(os_id)
+                logged = self.rpc.has_security_group_logging(os_id, context)
                 LOG.info(f"Neutron DB logged flag for {os_id}: rpc.has_security_group_logging(os_id): {logged}",
                          context=context)
                 self.plcy_provider.sg_rules_realize(os_sg, logged=logged, context=context)
@@ -264,7 +264,7 @@ def precreate_port(self, os_id: str, network_meta: dict, context=None):
             LOG.info(f"{self.MIGR_IN_PROGRESS_MSG.format('port realization')}")
             return
         with LockManager.get_lock("port-{}".format(os_id)):
-            port: dict = self.rpc.get_port(os_id)
+            port: dict = self.rpc.get_port(os_id, context)
             if port:
                 os_qid = port.get("qos_policy_id")
                 if os_qid:
@@ -283,7 +283,7 @@ def port(self, os_id: str, context=None):
             LOG.info(f"{self.MIGR_IN_PROGRESS_MSG.format('port realization')}")
             return
         with LockManager.get_lock("port-{}".format(os_id)):
-            port: dict = self.rpc.get_port(os_id)
+            port: dict = self.rpc.get_port(os_id, context)
             if port:
                 os_qid = port.get("qos_policy_id")
                 if os_qid:
@@ -307,7 +307,7 @@ def qos(self, os_id: str, reference=False, context=None):
         meta = provider.metadata(provider.QOS, os_id)
 
         if not (reference and meta):
-            qos = self.rpc.get_qos(os_id)
+            qos = self.rpc.get_qos(os_id, context)
             if qos:
                 self._qos_realize(os_qos=qos, context=context)
             else:
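Because the RPC facade grew a new parameter, the test doubles in
openstack.py below have to grow with it. Giving the doubles a context=None
default keeps them compatible both with the new two-argument calls and with
older test code. A minimal shape of that change (NeutronMockStub and its
inventory are illustrative, not the repo's classes):

    class NeutronMockStub:
        def __init__(self, inventory):
            self.inventory = inventory

        # context=None accepts the context passed positionally by the agent
        # while remaining callable without one.
        def get_port(self, port_id, context=None):
            return self.inventory.get(port_id)


    mock = NeutronMockStub({"p-1": {"id": "p-1"}})
    assert mock.get_port("p-1") == {"id": "p-1"}
    assert mock.get_port("p-1", object()) == {"id": "p-1"}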
diff --git a/networking_nsxv3/tests/unit/openstack.py b/networking_nsxv3/tests/unit/openstack.py
index 976b7811..38f91fd9 100644
--- a/networking_nsxv3/tests/unit/openstack.py
+++ b/networking_nsxv3/tests/unit/openstack.py
@@ -327,7 +327,7 @@ def get_security_groups_with_revisions(self, limit, offset):
             effective_sgs.append((id, rev, cursor))
         return effective_sgs
 
-    def has_security_group_used_by_host(self, os_id):
+    def has_security_group_used_by_host(self, os_id, context=None):
         sgs = set()
         for _, port in self.inventory.get_all(NeutronMock.PORT):
             port_sgs = port.get("security_groups")
@@ -341,7 +341,7 @@
                 return True
         return False
 
-    def get_security_group_port_ids(self, os_id):
+    def get_security_group_port_ids(self, os_id, context=None):
         ports = set()
         for port_id, port in self.inventory.get_all(NeutronMock.PORT):
             port_sgs = port.get("security_groups")
@@ -349,7 +349,7 @@
                 ports.update(port_id)
         return ports
 
-    def get_security_group_members_effective_ips(self, os_id):
+    def get_security_group_members_effective_ips(self, os_id, context=None):
         sg = self.inventory.get_by_id(NeutronMock.SECURITY_GROUP, os_id)
         if not sg:
             return []
@@ -365,7 +365,7 @@
             effective_ips.extend(ips_allowed)
         return effective_ips
 
-    def get_security_group(self, os_id):
+    def get_security_group(self, os_id, context=None):
         sg = self.inventory.get_by_id(NeutronMock.SECURITY_GROUP, os_id)
         if not sg:
             return None
@@ -373,14 +373,14 @@
         sg["ports"] = [o.get("id") for _, o in id_o if os_id in o.get("security_groups")]
         return sg
 
-    def get_rules_for_security_group_id(self, os_id):
+    def get_rules_for_security_group_id(self, os_id, context=None):
         id_o = self.inventory.get_all(NeutronMock.SECURITY_GROUP_RULE)
         return [o for _, o in id_o if os_id == o.get("security_group_id")]
 
-    def get_port(self, id):
+    def get_port(self, id, context=None):
         return self.inventory.get_by_id(NeutronMock.PORT, id)
 
-    def get_qos(self, os_id):
+    def get_qos(self, os_id, context=None):
         """
         Return QoS only if associated with port
         """
         id_o = self.inventory.get_all(NeutronMock.PORT)
         if [o for _, o in id_o if o.get("qos_policy_id") == os_id]:
             return self.inventory.get_by_id(NeutronMock.QOS, os_id)
 
-    def has_security_group_logging(self, security_group_id):
+    def has_security_group_logging(self, security_group_id, context=None):
         g = self.inventory.get_by_id(NeutronMock.SECURITY_GROUP, security_group_id)
         return g is not None and g.get("logged")

From 0b058c12ccbb39c976aca92f1183416405200887 Mon Sep 17 00:00:00 2001
From: Sven Rosenzweig
Date: Wed, 16 Aug 2023 16:35:28 +0200
Subject: [PATCH 3/3] retrigger checks
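Patch 3/3 carries no diff; it exists only to re-run CI. For completeness:
the reason handing the request context to cctxt.call() preserves the
request ID across the message bus is that oslo.context serializes to a dict
and can be rebuilt on the receiving side. A minimal round-trip (assumes
oslo.context is installed):

    from oslo_context import context

    ctx = context.RequestContext()
    wire = ctx.to_dict()  # what effectively travels with the RPC message
    restored = context.RequestContext.from_dict(wire)

    assert restored.request_id == ctx.request_id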