diff --git a/st2common/st2common/services/sensor_watcher.py b/st2common/st2common/services/sensor_watcher.py index 1c54881663..363d73ab3e 100644 --- a/st2common/st2common/services/sensor_watcher.py +++ b/st2common/st2common/services/sensor_watcher.py @@ -21,6 +21,7 @@ import six from kombu.mixins import ConsumerMixin +from oslo_config import cfg from st2common import log as logging from st2common.transport import reactor, publishers @@ -126,7 +127,7 @@ def stop(self): @staticmethod def _get_queue(queue_suffix): queue_name = queue_utils.get_queue_name( - queue_name_base="st2.sensor.watch", + queue_name_base=f"{cfg.CONF.messaging.prefix}.sensor.watch", queue_name_suffix=queue_suffix, add_random_uuid_to_suffix=True, ) diff --git a/st2common/st2common/services/triggerwatcher.py b/st2common/st2common/services/triggerwatcher.py index b82a46043a..698bcadeba 100644 --- a/st2common/st2common/services/triggerwatcher.py +++ b/st2common/st2common/services/triggerwatcher.py @@ -18,6 +18,7 @@ import six from kombu.mixins import ConsumerMixin +from oslo_config import cfg from st2common import log as logging from st2common.persistence.trigger import Trigger @@ -163,7 +164,7 @@ def _load_triggers_from_db(self): @staticmethod def _get_queue(queue_suffix, exclusive): queue_name = queue_utils.get_queue_name( - queue_name_base="st2.trigger.watch", + queue_name_base=f"{cfg.CONF.messaging.prefix}.trigger.watch", queue_name_suffix=queue_suffix, add_random_uuid_to_suffix=True, ) diff --git a/st2reactor/st2reactor/container/sensor_wrapper.py b/st2reactor/st2reactor/container/sensor_wrapper.py index cc4723043a..9fa8df1881 100644 --- a/st2reactor/st2reactor/container/sensor_wrapper.py +++ b/st2reactor/st2reactor/container/sensor_wrapper.py @@ -15,6 +15,8 @@ from __future__ import absolute_import +from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG + # Note: We need to perform monkey patching in the worker. If we do it in # the master process (gunicorn_config.py), it breaks tons of things # including shutdown @@ -241,6 +243,12 @@ def __init__( ssl_match_hostname=cfg.CONF.database.ssl_match_hostname, ) + # Apply prefix to RabbitMQ exchanges/queues (which are not bootstrapped in the wrapper) + prefix = cfg.CONF.messaging.prefix + if prefix != "st2": + for exchange in (TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG): + exchange.name = exchange.name.replace("st2.", f"{prefix}.", 1) + # 3. Instantiate the watcher self._trigger_watcher = TriggerWatcher( create_handler=self._handle_create_trigger, diff --git a/tools/st2-inject-trigger-instances.py b/tools/st2-inject-trigger-instances.py index 79b0f18e25..6af46c3b2f 100755 --- a/tools/st2-inject-trigger-instances.py +++ b/tools/st2-inject-trigger-instances.py @@ -37,7 +37,7 @@ from st2common import config from st2common.util.monkey_patch import monkey_patch from st2common.util import date as date_utils -from st2common.transport.reactor import TriggerDispatcher +from st2common.transport.reactor import TRIGGER_INSTANCE_XCHG, TriggerDispatcher def do_register_cli_opts(opts, ignore_errors=False): @@ -126,6 +126,12 @@ def main(): do_register_cli_opts(cli_opts) config.parse_args() + # Apply prefix to RabbitMQ exchanges/queues (which have not been bootstrapped) + prefix = cfg.CONF.messaging.prefix + if prefix != "st2": + exchange = TRIGGER_INSTANCE_XCHG + exchange.name = exchange.name.replace("st2.", f"{prefix}.", 1) + # Get config values triggers = cfg.CONF.triggers trigger_payload_schema = {}