Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
Signed-off-by: Jean-Louis Leroy <[email protected]>
  • Loading branch information
jll63 committed Jan 19, 2024
1 parent 64a1de1 commit 58f0166
Show file tree
Hide file tree
Showing 17 changed files with 2,105 additions and 2,183 deletions.
57 changes: 26 additions & 31 deletions src/integration-tests/test_admin_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""
Testing admin session.
This suite of test cases exercises admin session connection and some basic
commands.
"""

import json
Expand All @@ -9,40 +10,34 @@
from bmq.dev.it.process.admin import AdminClient


class TestAdminClient:
"""
This suite of test cases exercises admin session connection and some basic
commands.
"""
def test_admin(local_cluster: Cluster):
cluster: Cluster = local_cluster
endpoint: str = cluster.config.definition.nodes[0].transport.tcp.endpoint # type: ignore

def test_admin(self, local_cluster: Cluster):
cluster: Cluster = local_cluster
endpoint: str = cluster.config.definition.nodes[0].transport.tcp.endpoint # type: ignore
# Extract the (host, port) pair from the config
m = re.match(r".+://(.+):(\d+)", endpoint) # tcp://host:port
assert m is not None
host, port = str(m.group(1)), int(m.group(2))

# Extract the (host, port) pair from the config
m = re.match(r".+://(.+):(\d+)", endpoint) # tcp://host:port
assert m is not None
host, port = str(m.group(1)), int(m.group(2))
# Start the admin client
admin = AdminClient()
admin.connect(host, port)

# Start the admin client
admin = AdminClient()
admin.connect(host, port)
# Check basic "help" command
assert (
"This process responds to the following CMD subcommands:"
in admin.send_admin("help")
)

# Check basic "help" command
assert (
"This process responds to the following CMD subcommands:"
in admin.send_admin("help")
)
# Check non-existing "invalid cmd" command
assert "Unable to decode command" in admin.send_admin("invalid cmd")

# Check non-existing "invalid cmd" command
assert "Unable to decode command" in admin.send_admin("invalid cmd")
# Check more complex "brokerconfig dump" command, expect valid json
# with the same "port" value as the one used for this connection
broker_config_str = admin.send_admin("brokerconfig dump")
broker_config = json.loads(broker_config_str)

# Check more complex "brokerconfig dump" command, expect valid json
# with the same "port" value as the one used for this connection
broker_config_str = admin.send_admin("brokerconfig dump")
broker_config = json.loads(broker_config_str)
assert broker_config["networkInterfaces"]["tcpInterface"]["port"] == port

assert broker_config["networkInterfaces"]["tcpInterface"]["port"] == port

# Stop the admin session
admin.stop()
# Stop the admin session
admin.stop()
77 changes: 38 additions & 39 deletions src/integration-tests/test_alarms.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,57 @@
"""
Testing broker ALARMS.
"""

import time

import bmq.dev.it.testconstants as tc
from bmq.dev.it.fixtures import Cluster, cluster, tweak # pylint: disable=unused-import


class TestAlarms:
@tweak.cluster.queue_operations.consumption_monitor_period_ms(500)
@tweak.domain.max_idle_time(3)
def test_no_alarms_for_a_slow_queue(cluster: Cluster):
"""
Testing broker ALARMS.
Test no broker ALARMS for a slowly moving queue.
"""
leader = cluster.last_known_leader
proxy = next(cluster.proxy_cycle())

@tweak.cluster.queue_operations.consumption_monitor_period_ms(500)
@tweak.domain.max_idle_time(3)
def test_no_alarms_for_a_slow_queue(self, cluster: Cluster):
"""
Test no broker ALARMS for a slowly moving queue.
"""
leader = cluster.last_known_leader
proxy = next(cluster.proxy_cycle())

producer = proxy.create_client("producer")
producer.open(tc.URI_PRIORITY, flags=["write,ack"], succeed=True)
producer = proxy.create_client("producer")
producer.open(tc.URI_PRIORITY, flags=["write,ack"], succeed=True)

consumer1 = proxy.create_client("consumer1")
consumer2 = proxy.create_client("consumer2")
consumer1.open(
tc.URI_PRIORITY, flags=["read"], max_unconfirmed_messages=1, succeed=True
)
consumer1 = proxy.create_client("consumer1")
consumer2 = proxy.create_client("consumer2")
consumer1.open(
tc.URI_PRIORITY, flags=["read"], max_unconfirmed_messages=1, succeed=True
)

producer.post(tc.URI_PRIORITY, ["msg1"], succeed=True, wait_ack=True)
producer.post(tc.URI_PRIORITY, ["msg1"], succeed=True, wait_ack=True)

consumer1.confirm(tc.URI_PRIORITY, "*", succeed=True)
consumer1.confirm(tc.URI_PRIORITY, "*", succeed=True)

producer.post(tc.URI_PRIORITY, ["msg1"], succeed=True, wait_ack=True)
producer.post(tc.URI_PRIORITY, ["msg1"], succeed=True, wait_ack=True)
producer.post(tc.URI_PRIORITY, ["msg1"], succeed=True, wait_ack=True)
producer.post(tc.URI_PRIORITY, ["msg1"], succeed=True, wait_ack=True)

time.sleep(4)
time.sleep(4)

# First, test the alarm
assert leader.alarms("QUEUE_CONSUMER_MONITOR", 1)
leader.drain()
# First, test the alarm
assert leader.alarms("QUEUE_CONSUMER_MONITOR", 1)
leader.drain()

# Then test no alarm while consumer1 slowly confirms
time.sleep(1)
consumer1.confirm(tc.URI_PRIORITY, "*", succeed=True)
# Then test no alarm while consumer1 slowly confirms
time.sleep(1)
consumer1.confirm(tc.URI_PRIORITY, "*", succeed=True)

time.sleep(1)
consumer1.confirm(tc.URI_PRIORITY, "*", succeed=True)
producer.post(tc.URI_PRIORITY, ["msg1"], succeed=True, wait_ack=True)
time.sleep(1)
consumer1.confirm(tc.URI_PRIORITY, "*", succeed=True)
producer.post(tc.URI_PRIORITY, ["msg1"], succeed=True, wait_ack=True)

time.sleep(1)
# Consumer2 picks the last message
consumer2.open(
tc.URI_PRIORITY, flags=["read"], max_unconfirmed_messages=1, succeed=True
)
time.sleep(1)
# Consumer2 picks the last message
consumer2.open(
tc.URI_PRIORITY, flags=["read"], max_unconfirmed_messages=1, succeed=True
)

time.sleep(1)
assert not leader.alarms("QUEUE_CONSUMER_MONITOR", 1)
time.sleep(1)
assert not leader.alarms("QUEUE_CONSUMER_MONITOR", 1)
Loading

0 comments on commit 58f0166

Please sign in to comment.