Skip to content

Commit

Permalink
apply messaging.prefix for sensor_{wrapper,watcher}
Browse files Browse the repository at this point in the history
  • Loading branch information
cognifloyd committed Nov 10, 2024
1 parent 396db76 commit 6bc8221
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 3 deletions.
3 changes: 2 additions & 1 deletion st2common/st2common/services/sensor_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import six
from kombu.mixins import ConsumerMixin
from oslo_config import cfg

from st2common import log as logging
from st2common.transport import reactor, publishers
Expand Down Expand Up @@ -126,7 +127,7 @@ def stop(self):
@staticmethod
def _get_queue(queue_suffix):
queue_name = queue_utils.get_queue_name(
queue_name_base="st2.sensor.watch",
queue_name_base=f"{cfg.CONF.messaging.prefix}.sensor.watch",
queue_name_suffix=queue_suffix,
add_random_uuid_to_suffix=True,
)
Expand Down
3 changes: 2 additions & 1 deletion st2common/st2common/services/triggerwatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import six
from kombu.mixins import ConsumerMixin
from oslo_config import cfg

from st2common import log as logging
from st2common.persistence.trigger import Trigger
Expand Down Expand Up @@ -163,7 +164,7 @@ def _load_triggers_from_db(self):
@staticmethod
def _get_queue(queue_suffix, exclusive):
queue_name = queue_utils.get_queue_name(
queue_name_base="st2.trigger.watch",
queue_name_base=f"{cfg.CONF.messaging.prefix}.trigger.watch",
queue_name_suffix=queue_suffix,
add_random_uuid_to_suffix=True,
)
Expand Down
8 changes: 8 additions & 0 deletions st2reactor/st2reactor/container/sensor_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from __future__ import absolute_import

from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG

# Note: We need to perform monkey patching in the worker. If we do it in
# the master process (gunicorn_config.py), it breaks tons of things
# including shutdown
Expand Down Expand Up @@ -241,6 +243,12 @@ def __init__(
ssl_match_hostname=cfg.CONF.database.ssl_match_hostname,
)

# Apply prefix to RabbitMQ exchanges/queues (which are not bootstrapped in the wrapper)
prefix = cfg.CONF.messaging.prefix
if prefix != "st2":
for exchange in (TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG):
exchange.name = exchange.name.replace("st2.", f"{prefix}.", 1)

# 3. Instantiate the watcher
self._trigger_watcher = TriggerWatcher(
create_handler=self._handle_create_trigger,
Expand Down
8 changes: 7 additions & 1 deletion tools/st2-inject-trigger-instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from st2common import config
from st2common.util.monkey_patch import monkey_patch
from st2common.util import date as date_utils
from st2common.transport.reactor import TriggerDispatcher
from st2common.transport.reactor import TRIGGER_INSTANCE_XCHG, TriggerDispatcher


def do_register_cli_opts(opts, ignore_errors=False):
Expand Down Expand Up @@ -126,6 +126,12 @@ def main():
do_register_cli_opts(cli_opts)
config.parse_args()

# Apply prefix to RabbitMQ exchanges/queues (which have not been bootstrapped)
prefix = cfg.CONF.messaging.prefix
if prefix != "st2":
exchange = TRIGGER_INSTANCE_XCHG
exchange.name = exchange.name.replace("st2.", f"{prefix}.", 1)

# Get config values
triggers = cfg.CONF.triggers
trigger_payload_schema = {}
Expand Down

0 comments on commit 6bc8221

Please sign in to comment.