From 05d5c5e7d3b1f53ace36eb4510e92d955b95fb52 Mon Sep 17 00:00:00 2001 From: Emelia Lei Date: Wed, 13 Nov 2024 10:47:07 -0500 Subject: [PATCH] IT[BMQ]: Add tests to remove an appid with consumer still connected Signed-off-by: Emelia Lei --- src/integration-tests/test_appids.py | 102 +++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/src/integration-tests/test_appids.py b/src/integration-tests/test_appids.py index d0e333433..c439ce322 100644 --- a/src/integration-tests/test_appids.py +++ b/src/integration-tests/test_appids.py @@ -641,3 +641,105 @@ def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster): consumers[app_id].close(f"{tc.URI_FANOUT}?id={app_id}", block=True) == Client.e_SUCCESS ) + + +def test_remove_appid_reconfigure(cluster: Cluster): + proxies = cluster.proxy_cycle() + + producer = next(proxies).create_client("producer") + producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True) + + # --------------------------------------------------------------------- + # Create a consumer for each authorized substream. + + consumers = {} + + app_ids = default_app_ids + victim_app_id = app_ids[0] + + for app_id in app_ids: + consumer = next(proxies).create_client(app_id) + consumers[app_id] = consumer + consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True) + + # --------------------------------------------------------------------- + # Post 5 messages. + + producer.post( + tc.URI_FANOUT, + ["msg1", "msg2", "msg3", "msg4", "msg5"], + succeed=True, + wait_ack=True, + ) + + # --------------------------------------------------------------------- + # Reconfigure domain to remove app_id foo. + + app_ids.remove(victim_app_id) + set_app_ids(cluster, app_ids) + + # --------------------------------------------------------------------- + # Consumer connected to foo receives the message and tries to confirm. + + consumers[victim_app_id].wait_push_event(timeout=5) + assert ( + consumers[victim_app_id].confirm( + tc.URI_FANOUT, "+1", succeed=True, no_except=True + ) + == Client.e_UNKNOWN + ) + + +def test_remove_appid_cluster_restart(cluster: Cluster): + leader = cluster.last_known_leader + proxies = cluster.proxy_cycle() + + producer = next(proxies).create_client("producer") + producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True) + + # --------------------------------------------------------------------- + # Create a consumer for each authorized substream. + + consumers = {} + + app_ids = default_app_ids + victim_app_id = app_ids[0] + + for app_id in app_ids: + consumer = next(proxies).create_client(app_id) + consumers[app_id] = consumer + consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True) + + # --------------------------------------------------------------------- + # Post 5 messages. + + producer.post( + tc.URI_FANOUT, + ["msg1", "msg2", "msg3", "msg4", "msg5"], + succeed=True, + wait_ack=True, + ) + + # --------------------------------------------------------------------- + # Remove app_id foo and restart broker + + victim_app_id = app_ids[0] + app_ids.remove(victim_app_id) + + cluster.restart_nodes() + + # --------------------------------------------------------------------- + # Broker can still see the messages + + assert leader.capture_n("msg*", 5, timeout=1) + + # --------------------------------------------------------------------- + # Consumer connected to foo receives the message and tries to confirm. + + consumers[victim_app_id].wait_push_event(timeout=5) + assert ( + consumers[victim_app_id].confirm( + tc.URI_FANOUT, "+1", succeed=True, no_except=True + ) + == Client.e_UNKNOWN + )