Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use context provided by request #121

Open
wants to merge 3 commits into
base: stable/yoga-m3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions networking_nsxv3/api/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def __init__(self, context):

self.context = context
self.rpc = rpc.get_client(target)

def _get_call_context(self, host=None):
topic = topics.get_topic_name(
topics.AGENT, nsxv3_constants.NSXV3, topics.UPDATE, host)
Expand Down Expand Up @@ -133,7 +132,11 @@ def __init__(self):
self.context = neutron_context.get_admin_context()
self.client = rpc.get_client(target)
self.host = cfg.CONF.host


def _choose_context(self, request_context):
if not request_context:
return self.context
return request_context
@log_helpers.log_method_call
def get_ports_with_revisions(self, limit, cursor):
cctxt = self.client.prepare()
Expand All @@ -153,38 +156,38 @@ def get_security_groups_with_revisions(self, limit, cursor):
host=self.host, limit=limit, cursor=cursor)

@log_helpers.log_method_call
def get_security_group(self, security_group_id):
def get_security_group(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_security_group',
return cctxt.call(self._choose_context(request_context), 'get_security_group',
host=self.host, security_group_id=security_group_id)

@log_helpers.log_method_call
def get_qos(self, qos_id):
def get_qos(self, qos_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_qos', host=self.host, qos_id=qos_id)
return cctxt.call(self._choose_context(request_context), 'get_qos', host=self.host, qos_id=qos_id)

@log_helpers.log_method_call
def get_port(self, port_id):
def get_port(self, port_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_port', host=self.host, port_id=port_id)
return cctxt.call(self._choose_context(request_context), 'get_port', host=self.host, port_id=port_id)

@log_helpers.log_method_call
def get_rules_for_security_group_id(self, security_group_id):
def get_rules_for_security_group_id(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_rules_for_security_group_id',
return cctxt.call(self._choose_context(request_context), 'get_rules_for_security_group_id',
security_group_id=security_group_id)

@log_helpers.log_method_call
def get_security_group_members_effective_ips(self, security_group_id):
def get_security_group_members_effective_ips(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context,
return cctxt.call(self._choose_context(request_context),
'get_security_group_members_effective_ips',
security_group_id=security_group_id)

@log_helpers.log_method_call
def get_security_group_port_ids(self, security_group_id):
def get_security_group_port_ids(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'get_security_group_port_ids',
return cctxt.call(self._choose_context(request_context), 'get_security_group_port_ids',
host=self.host, security_group_id=security_group_id)

@log_helpers.log_method_call
Expand All @@ -200,9 +203,9 @@ def get_remote_security_groups_for_host(self, limit, cursor):
host=self.host, limit=limit, cursor=cursor)

@log_helpers.log_method_call
def has_security_group_used_by_host(self, security_group_id):
def has_security_group_used_by_host(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'has_security_group_used_by_host',
return cctxt.call(self._choose_context(request_context), 'has_security_group_used_by_host',
host=self.host, security_group_id=security_group_id)

@log_helpers.log_method_call
Expand All @@ -211,9 +214,9 @@ def get_port_logging(self, port_id):
return cctxt.call(self.context, 'get_port_logging', port_id=port_id)

@log_helpers.log_method_call
def has_security_group_logging(self, security_group_id):
def has_security_group_logging(self, security_group_id, request_context):
cctxt = self.client.prepare()
return cctxt.call(self.context, 'has_security_group_logging',
return cctxt.call(self._choose_context(request_context), 'has_security_group_logging',
security_group_id=security_group_id)


Expand Down
12 changes: 7 additions & 5 deletions networking_nsxv3/common/synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import functools
import collections

from oslo_context import context

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,10 +70,11 @@ def retry_next(self):

class Runnable(object):

def __init__(self, idn, fn, priority=Priority.LOWEST):
def __init__(self, idn, fn, priority=Priority.LOWEST, context=None):
self.priority = priority
self.idn = idn
self.fn = fn
self.context = context

def __repr__(self):
return str(self.idn)
Expand Down Expand Up @@ -137,7 +139,7 @@ def __init__(self, active_size=INFINITY, passive_size=INFINITY,
self._idle = workers_size
self._state = "not started"

def run(self, priority, ids, fn):
def run(self, priority, ids, fn, context=None):
""" Submit a job with priority

Keyword arguments:
Expand All @@ -154,7 +156,7 @@ def run(self, priority, ids, fn):
try:
LOG.info(MESSAGE.format("Enqueued", jid, priority.name, fn.__name__))

job = Runnable(jid, fn, priority.value)
job = Runnable(jid, fn, priority.value, context)
if priority.value == Priority.HIGHEST:
self._active.put_nowait(job)
else:
Expand All @@ -172,8 +174,8 @@ def _start(self):
self._active.put_nowait(self._passive.get_nowait())
self._passive.task_done()
job = self._active.get(block=True, timeout=TIMEOUT)
LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__))
self._workers.spawn(job.fn, job.idn)#.wait()
LOG.info(MESSAGE.format("Processing", job.idn, Priority(job.priority).name, job.fn.__name__), context=job.context)
self._workers.spawn(job.fn, job.idn, job.context)#.wait()
self._active.task_done()
except eventlet.queue.Empty:
LOG.info("No activity for the last {} seconds.".format(TIMEOUT))
Expand Down
34 changes: 17 additions & 17 deletions networking_nsxv3/plugins/ml2/drivers/nsxv3/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,61 +72,61 @@ def get_network_bridge(self, context, current, network_segments, network_current
seg_id = ns.get("segmentation_id")
net_type = ns.get("network_type")
if seg_id and net_type in nsxv3_constants.NSXV3_AGENT_NETWORK_TYPES:
network_meta = self.realizer.network(seg_id)
network_meta = self.realizer.network(seg_id, context=context)
break

if try_create_port and bool(network_meta.get("nsx-logical-switch-id")):
self.realizer.precreate_port(current["id"], network_meta)
self.realizer.precreate_port(current["id"], network_meta, context=context)

return network_meta

def security_groups_member_updated(self, context, **kwargs):
self.callback(kwargs["security_groups"], self.realizer.security_group_members)
self.callback(kwargs["security_groups"], self.realizer.security_group_members, context)

def security_groups_rule_updated(self, context, **kwargs):
self.callback(kwargs["security_groups"], self.realizer.security_group_rules)
self.callback(kwargs["security_groups"], self.realizer.security_group_rules, context)

def port_create(self, **kwargs):
self.realizer.port(kwargs["port"]["id"])

def port_update(self, context, **kwargs):
# Ensure security groups attached to the port are synced first
for sg in kwargs["port"].get("security_groups", []):
self.callback(sg, self.realizer.security_group_rules)
self.callback(sg, self.realizer.security_group_rules, context)
# Also ensure allowed_address_pairs are re-processed
self.callback(sg, self.realizer.security_group_members)
self.callback(kwargs["port"]["id"], self.realizer.port)
self.callback(sg, self.realizer.security_group_members, context)
self.callback(kwargs["port"]["id"], self.realizer.port, context)

def port_delete(self, context, **kwargs):
# Ports removed by the background synchronization
pass

def create_policy(self, context, policy):
self.update_policy(context, policy)
self.update_policy(context, policy, context)

def delete_policy(self, context, policy):
self.update_policy(context, policy)
self.update_policy(context, policy, context)

def update_policy(self, context, policy):
self.callback(policy["id"], self.realizer.qos)
self.callback(policy["id"], self.realizer.qos, context)

def validate_policy(self, context, policy):
pass

def create_log(self, context, log_obj):
self.callback(log_obj, self.realizer.enable_policy_logging)
self.callback(log_obj, self.realizer.enable_policy_logging, context)

def create_log_precommit(self, context, log_obj):
pass

def update_log(self, context, log_obj):
self.callback(log_obj, self.realizer.update_policy_logging)
self.callback(log_obj, self.realizer.update_policy_logging, context)

def update_log_precommit(self, context, log_obj):
pass

def delete_log(self, context, log_obj):
self.callback(log_obj, self.realizer.disable_policy_logging)
self.callback(log_obj, self.realizer.disable_policy_logging, context)

def delete_log_precommit(self, context, log_obj):
pass
Expand Down Expand Up @@ -162,15 +162,15 @@ def _sync_all(self):
except Exception as err:
LOG.error("Synchronization has failed. Error: %s", err)

def _sync_immediate(self, os_ids, realizer):
def _sync_immediate(self, os_ids, realizer, context=None):
ids = list(os_ids) if isinstance(os_ids, set) else os_ids
ids = ids if isinstance(ids, list) else [ids]
self.runner.run(sync.Priority.HIGHEST, ids, realizer)
self.runner.run(sync.Priority.HIGHEST, ids, realizer, context)

def _sync_delayed(self, os_ids, realizer):
def _sync_delayed(self, os_ids, realizer, context=None):
ids = list(os_ids) if isinstance(os_ids, set) else os_ids
ids = ids if isinstance(ids, list) else [ids]
self.runner.run(sync.Priority.HIGH, ids, realizer)
self.runner.run(sync.Priority.HIGH, ids, realizer, context)

def kpi(self):
return {"active": self.runner.active(), "passive": self.runner.passive()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ def metadata(self, resource_type, os_id) -> ResourceMeta:
def meta_provider(self, resource_type) -> base.MetaProvider:
return self._metadata.get(resource_type)

def _realize(self, resource_type, delete, convertor, os_o, provider_o):
def _realize(self, resource_type, delete, convertor, os_o, provider_o, context=None):
os_id = os_o.get("id")

begin_report = "[{}] Resource: {} with ID: {} is going to be %s.".format(self.provider, resource_type, os_id)
Expand All @@ -601,24 +601,24 @@ def _realize(self, resource_type, delete, convertor, os_o, provider_o):
if resource_type == Provider.PORT:
res = self.client.get(path=path)
if res.status_code == 404:
LOG.info(end_report, "rescheduled due 404: not found")
LOG.info(end_report, "rescheduled due 404: not found", context=context)
return metadata

o = res.json()
if o.get("attachment", {}).get("context", {}).get("vif_type") != "CHILD":
stamp = int(o.get("_last_modified_time")) / 1000

if not self.orphan_ports_tmout_passed(stamp):
LOG.info(end_report, "rescheduled for deletion")
LOG.info(end_report, "rescheduled for deletion", context=context)
return metadata

self.client.delete(path=path, params=params)

LOG.info(end_report, "deleted")
LOG.info(end_report, "deleted", context=context)

return self.metadata_delete(resource_type, os_id)
else:
LOG.info(begin_report, "updated")
LOG.info(begin_report, "updated", context=context)
if resource_type == Provider.SG_RULES_EXT:
LOG.debug(
"Skipping update of NSGroup:%s",
Expand All @@ -627,15 +627,15 @@ def _realize(self, resource_type, delete, convertor, os_o, provider_o):
if metadata.revision != None:
data["_revision"] = metadata.revision
o = self.client.put(path=path, data=data)
LOG.info(end_report, "updated")
LOG.info(end_report, "updated", context=context)
return self.metadata_update(resource_type, o.json())
else:
if not delete:
LOG.info(begin_report, "created")
LOG.info(begin_report, "created", context=context)
o = self.client.post(path=path, data=convertor(os_o, provider_o))
LOG.info(end_report, "created")
LOG.info(end_report, "created", context=context)
return self.metadata_update(resource_type, o.json())
LOG.info(end_report, "already deleted")
LOG.info(end_report, "already deleted", context=context)

def outdated(self, resource_type: str, os_meta):
self.metadata_refresh(resource_type)
Expand Down Expand Up @@ -698,7 +698,7 @@ def get_port(self, os_id):
return self.metadata_update(Provider.PORT, port), port
return None

def port_realize(self, os_port: dict, delete=False):
def port_realize(self, os_port: dict, delete=False, context=None):
provider_port = dict()

if delete:
Expand All @@ -711,39 +711,39 @@ def port_realize(self, os_port: dict, delete=False):
if parent_port and parent_port[0]:
provider_port["parent_id"] = parent_port[0].id
else:
LOG.warning("Not found. Parent Port:%s for Child Port:%s", os_port.get("parent_id"), os_port.get("id"))
LOG.warning("Not found. Parent Port:%s for Child Port:%s", os_port.get("parent_id"), os_port.get("id"), context=context)
return
else:
# Parent port is NOT always created externally
port = self.get_port(os_port.get("id"))
if port and port[0]:
provider_port["id"] = port[0].id
else:
LOG.warning("Not found. Port: %s", os_port.get("id"))
LOG.warning("Not found. Port: %s", os_port.get("id"), context=context)

if os_port.get("qos_policy_id"):
meta_qos = self.metadata(Provider.QOS, os_port.get("qos_policy_id"))
if meta_qos:
provider_port["qos_policy_id"] = meta_qos.id
else:
LOG.warning("Not found. QoS:%s for Port:%s", os_port.get("qos_policy_id"), os_port.get("id"))
LOG.warning("Not found. QoS:%s for Port:%s", os_port.get("qos_policy_id"), os_port.get("id"), context=context)

provider_port["switching_profile_ids"] = copy.deepcopy(self.switch_profiles)

return self._realize(Provider.PORT, False, self.payload.port, os_port, provider_port)
return self._realize(Provider.PORT, False, self.payload.port, os_port, provider_port, context=context)

def qos_realize(self, qos, delete=False):
return self._realize(Provider.QOS, delete, self.payload.qos, qos, dict())
def qos_realize(self, qos, delete=False, context=None):
return self._realize(Provider.QOS, delete, self.payload.qos, qos, dict(), context)

def sg_members_realize(self, sg, delete=False):
def sg_members_realize(self, sg, delete=False, context=None):
if delete and self.metadata(Provider.SG_RULES, sg.get("id")):
LOG.warning(
"Resource: %s with ID: %s deletion is rescheduled due to dependency.", Provider.SG_MEMBERS, sg.get("id")
)
return
return self._realize(Provider.SG_MEMBERS, delete, self.payload.sg_members_container, sg, dict())
return self._realize(Provider.SG_MEMBERS, delete, self.payload.sg_members_container, sg, dict(), context)

def sg_rules_realize(self, os_sg, delete=False, logged=False):
def sg_rules_realize(self, os_sg, delete=False, logged=False, context=None):
provider_sg = dict()

nsg_args = [Provider.SG_RULES_EXT, delete, self.payload.sg_rules_ext_container, os_sg, dict()]
Expand All @@ -764,7 +764,7 @@ def sg_rules_realize(self, os_sg, delete=False, logged=False):

# Update section tags(revision) when all rules applied successfully
provider_sg["tags_update"] = True
self._realize(*sec_args)
self._realize(*sec_args, context=context)

def _sg_rules_realize(self, os_sg, meta_sg: ResourceMeta, logged=False):

Expand Down Expand Up @@ -824,7 +824,7 @@ def _create_sg_provider_rule_remote_prefix(self, cidr):
def _delete_sg_provider_rule_remote_prefix(self, id):
self.client.delete(path=API.IPSET.format(id))

def network_realize(self, segmentation_id):
def network_realize(self, segmentation_id, context=None):
meta = self.metadata(self.NETWORK, segmentation_id)
if not meta:
os_net = {"id": "{}-{}".format(self.zone_name, segmentation_id), "segmentation_id": segmentation_id}
Expand Down
Loading