Skip to content

Commit

Permalink
apply messaging.prefix for sensor_{wrapper,watcher}
Browse files Browse the repository at this point in the history
  • Loading branch information
cognifloyd committed Nov 9, 2024
1 parent a812230 commit e208bdd
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 3 deletions.
3 changes: 2 additions & 1 deletion st2common/st2common/services/sensor_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import six
from kombu.mixins import ConsumerMixin
from oslo_config import cfg

from st2common import log as logging
from st2common.transport import reactor, publishers
Expand Down Expand Up @@ -126,7 +127,7 @@ def stop(self):
@staticmethod
def _get_queue(queue_suffix):
queue_name = queue_utils.get_queue_name(
queue_name_base="st2.sensor.watch",
queue_name_base=f"{cfg.CONF.messaging.prefix}.sensor.watch",
queue_name_suffix=queue_suffix,
add_random_uuid_to_suffix=True,
)
Expand Down
3 changes: 2 additions & 1 deletion st2common/st2common/services/triggerwatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import six
from kombu.mixins import ConsumerMixin
from oslo_config import cfg

from st2common import log as logging
from st2common.persistence.trigger import Trigger
Expand Down Expand Up @@ -163,7 +164,7 @@ def _load_triggers_from_db(self):
@staticmethod
def _get_queue(queue_suffix, exclusive):
queue_name = queue_utils.get_queue_name(
queue_name_base="st2.trigger.watch",
queue_name_base=f"{cfg.CONF.messaging.prefix}.trigger.watch",
queue_name_suffix=queue_suffix,
add_random_uuid_to_suffix=True,
)
Expand Down
3 changes: 2 additions & 1 deletion st2common/st2common/transport/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from __future__ import absolute_import
from kombu import Exchange, Queue
from oslo_config import cfg

from st2common import log as logging
from st2common.constants.trace import TRACE_CONTEXT
Expand All @@ -38,7 +39,7 @@
# Exchange for TriggerInstance events
TRIGGER_INSTANCE_XCHG = Exchange("st2.trigger_instances_dispatch", type="topic")

# Exchane for Sensor CUD events
# Exchange for Sensor CUD events
SENSOR_CUD_XCHG = Exchange("st2.sensor", type="topic")


Expand Down
7 changes: 7 additions & 0 deletions st2reactor/st2reactor/container/sensor_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from __future__ import absolute_import

from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG
# Note: We need to perform monkey patching in the worker. If we do it in
# the master process (gunicorn_config.py), it breaks tons of things
# including shutdown
Expand Down Expand Up @@ -241,6 +242,12 @@ def __init__(
ssl_match_hostname=cfg.CONF.database.ssl_match_hostname,
)

# Apply prefix to RabbitMQ exchanges/queues (which are not bootstrapped in the wrapper)
prefix = cfg.CONF.messaging.prefix
if prefix != "st2":
for exchange in (TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG):
exchange.name = exchange.name.replace("st2.", f"{prefix}.", 1)

# 3. Instantiate the watcher
self._trigger_watcher = TriggerWatcher(
create_handler=self._handle_create_trigger,
Expand Down

0 comments on commit e208bdd

Please sign in to comment.