Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IT[BMQ]: Add tests to remove an appid with consumer still connected #512

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions src/integration-tests/test_appids.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,105 @@ def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster):
consumers[app_id].close(f"{tc.URI_FANOUT}?id={app_id}", block=True)
== Client.e_SUCCESS
)


def test_remove_appid_reconfigure(cluster: Cluster):
proxies = cluster.proxy_cycle()

producer = next(proxies).create_client("producer")
producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True)

# ---------------------------------------------------------------------
# Create a consumer for each authorized substream.

consumers = {}

app_ids = default_app_ids
victim_app_id = app_ids[0]

for app_id in app_ids:
consumer = next(proxies).create_client(app_id)
consumers[app_id] = consumer
consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True)

# ---------------------------------------------------------------------
# Post 5 messages.

producer.post(
tc.URI_FANOUT,
["msg1", "msg2", "msg3", "msg4", "msg5"],
succeed=True,
wait_ack=True,
)

# ---------------------------------------------------------------------
# Reconfigure domain to remove app_id foo.

app_ids.remove(victim_app_id)
set_app_ids(cluster, app_ids)

# ---------------------------------------------------------------------
# Consumer connected to foo receives the message and tries to confirm.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also verify that other consumers can confirm normally?


consumers[victim_app_id].wait_push_event(timeout=5)
assert (
consumers[victim_app_id].confirm(
tc.URI_FANOUT, "+1", succeed=True, no_except=True
)
== Client.e_UNKNOWN
)


def test_remove_appid_cluster_restart(cluster: Cluster):
leader = cluster.last_known_leader
proxies = cluster.proxy_cycle()

producer = next(proxies).create_client("producer")
producer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True)

# ---------------------------------------------------------------------
# Create a consumer for each authorized substream.

consumers = {}

app_ids = default_app_ids
victim_app_id = app_ids[0]

for app_id in app_ids:
consumer = next(proxies).create_client(app_id)
consumers[app_id] = consumer
consumer.open(f"{tc.URI_FANOUT}?id={app_id}", flags=["read"], succeed=True)

# ---------------------------------------------------------------------
# Post 5 messages.

producer.post(
tc.URI_FANOUT,
["msg1", "msg2", "msg3", "msg4", "msg5"],
succeed=True,
wait_ack=True,
)

# ---------------------------------------------------------------------
# Remove app_id foo and restart broker

victim_app_id = app_ids[0]
app_ids.remove(victim_app_id)

cluster.restart_nodes()

# ---------------------------------------------------------------------
# Broker can still see the messages

assert leader.capture_n("msg*", 5, timeout=1)

# ---------------------------------------------------------------------
# Consumer connected to foo receives the message and tries to confirm.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also verify that other consumers can confirm normally?


consumers[victim_app_id].wait_push_event(timeout=5)
assert (
consumers[victim_app_id].confirm(
tc.URI_FANOUT, "+1", succeed=True, no_except=True
)
== Client.e_UNKNOWN
)
Loading