From e208bddefbda7a170f9d1a234ddca3f057d65303 Mon Sep 17 00:00:00 2001 From: Jacob Floyd Date: Sat, 9 Nov 2024 15:52:42 -0600 Subject: [PATCH] apply messaging.prefix for sensor_{wrapper,watcher} --- st2common/st2common/services/sensor_watcher.py | 3 ++- st2common/st2common/services/triggerwatcher.py | 3 ++- st2common/st2common/transport/reactor.py | 3 ++- st2reactor/st2reactor/container/sensor_wrapper.py | 7 +++++++ 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/st2common/st2common/services/sensor_watcher.py b/st2common/st2common/services/sensor_watcher.py index 1c54881663..363d73ab3e 100644 --- a/st2common/st2common/services/sensor_watcher.py +++ b/st2common/st2common/services/sensor_watcher.py @@ -21,6 +21,7 @@ import six from kombu.mixins import ConsumerMixin +from oslo_config import cfg from st2common import log as logging from st2common.transport import reactor, publishers @@ -126,7 +127,7 @@ def stop(self): @staticmethod def _get_queue(queue_suffix): queue_name = queue_utils.get_queue_name( - queue_name_base="st2.sensor.watch", + queue_name_base=f"{cfg.CONF.messaging.prefix}.sensor.watch", queue_name_suffix=queue_suffix, add_random_uuid_to_suffix=True, ) diff --git a/st2common/st2common/services/triggerwatcher.py b/st2common/st2common/services/triggerwatcher.py index b82a46043a..698bcadeba 100644 --- a/st2common/st2common/services/triggerwatcher.py +++ b/st2common/st2common/services/triggerwatcher.py @@ -18,6 +18,7 @@ import six from kombu.mixins import ConsumerMixin +from oslo_config import cfg from st2common import log as logging from st2common.persistence.trigger import Trigger @@ -163,7 +164,7 @@ def _load_triggers_from_db(self): @staticmethod def _get_queue(queue_suffix, exclusive): queue_name = queue_utils.get_queue_name( - queue_name_base="st2.trigger.watch", + queue_name_base=f"{cfg.CONF.messaging.prefix}.trigger.watch", queue_name_suffix=queue_suffix, add_random_uuid_to_suffix=True, ) diff --git a/st2common/st2common/transport/reactor.py b/st2common/st2common/transport/reactor.py index 2199a1b653..2ed01c8361 100644 --- a/st2common/st2common/transport/reactor.py +++ b/st2common/st2common/transport/reactor.py @@ -15,6 +15,7 @@ from __future__ import absolute_import from kombu import Exchange, Queue +from oslo_config import cfg from st2common import log as logging from st2common.constants.trace import TRACE_CONTEXT @@ -38,7 +39,7 @@ # Exchange for TriggerInstance events TRIGGER_INSTANCE_XCHG = Exchange("st2.trigger_instances_dispatch", type="topic") -# Exchane for Sensor CUD events +# Exchange for Sensor CUD events SENSOR_CUD_XCHG = Exchange("st2.sensor", type="topic") diff --git a/st2reactor/st2reactor/container/sensor_wrapper.py b/st2reactor/st2reactor/container/sensor_wrapper.py index cc4723043a..dcb0b55c03 100644 --- a/st2reactor/st2reactor/container/sensor_wrapper.py +++ b/st2reactor/st2reactor/container/sensor_wrapper.py @@ -15,6 +15,7 @@ from __future__ import absolute_import +from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG # Note: We need to perform monkey patching in the worker. If we do it in # the master process (gunicorn_config.py), it breaks tons of things # including shutdown @@ -241,6 +242,12 @@ def __init__( ssl_match_hostname=cfg.CONF.database.ssl_match_hostname, ) + # Apply prefix to RabbitMQ exchanges/queues (which are not bootstrapped in the wrapper) + prefix = cfg.CONF.messaging.prefix + if prefix != "st2": + for exchange in (TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG): + exchange.name = exchange.name.replace("st2.", f"{prefix}.", 1) + # 3. Instantiate the watcher self._trigger_watcher = TriggerWatcher( create_handler=self._handle_create_trigger,