Skip to content

Commit

Permalink
adapt tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jean-Louis Leroy <[email protected]>
  • Loading branch information
jll63 committed Jan 19, 2024
1 parent 58f0166 commit 445ef6b
Show file tree
Hide file tree
Showing 28 changed files with 1,887 additions and 1,779 deletions.
54 changes: 11 additions & 43 deletions src/integration-tests/README.md
Original file line number Diff line number Diff line change
@@ -1,46 +1,14 @@
## Integration Tests
# BlazingMQ Integration Tests

This directory contains integration tests based on the `ito` and 'bmqit'
frameworks. It contains the following test suites:
[WIP]

To run the tests:

### `00breathing_test.py`

Provides a basic integration test suite.


### `20cluster_node_shutdown_test.py`

Integration test that shuts down a cluster node and confirms that the system
recovers and keeps working as expected.


### `20node_status_change_test.py`

Integration test that suspends a node and confirms that the system recovers
and performs as expected.

### `30leader_node_delay.py`

Integration test that temporarily suspends the leader node, resulting in
followers becoming leaderless. When leader is unpaused, they will re-discover
the leader but in PASSIVE state, and then in ACTIVE state once healing logic
kicks in.

### `50appids_test.py`

Integration test suite exercising AppIDs.


### `50broadcast_test.py`

Integration test suite exercising broadcast functionality.


### `50list_messages_test.py`

Integration test suite exercising list messages functionality.

### `50maxqueues_test.py`

Integration test suite exercising Max Queues functionality.
* (create and) activate a Python 3.8 (or above) `venv`
* `python -m venv /path/to/venv`
* `source /path/to/venv/bin/activate`
* install required modules
* `pip install -r src/python/requirements.txt`
* run the tests
* `cd src/integration-tests`
* `./run-tests [extra pytest options]`
43 changes: 39 additions & 4 deletions src/integration-tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import contextlib
import logging
import pytest

import bmq.dev.it.logging
import bmq.util.logging as bul
from bmq.dev.pytest import PYTEST_LOG_SPEC_VAR
import blazingmq.dev.it.logging
import blazingmq.util.logging as bul
from blazingmq.dev.pytest import PYTEST_LOG_SPEC_VAR


def pytest_addoption(parser):
Expand Down Expand Up @@ -89,9 +90,18 @@ def pytest_addoption(parser):
)
parser.addini(PYTEST_LOG_SPEC_VAR, help_, type=None, default=None)

help_ = "run only with the specified order"
parser.addoption(
"--bmq-wave",
type=int,
action="store",
metavar="WAVE",
help=help_,
)


def pytest_configure(config):
logging.setLoggerClass(bmq.dev.it.logging.BMQLogger)
logging.setLoggerClass(blazingmq.dev.it.logging.BMQLogger)

level_spec = config.getoption(PYTEST_LOG_SPEC_VAR) or config.getini(
PYTEST_LOG_SPEC_VAR
Expand All @@ -103,3 +113,28 @@ def pytest_configure(config):
top_level = levels[0]
logging.getLogger("proc").setLevel(top_level)
logging.getLogger("test").setLevel(top_level)


def pytest_collection_modifyitems(config, items):
active_wave = config.getoption("bmq_wave")
if active_wave is None:
return

for item in items:
mark = None
for mark in item.iter_markers(name="order"):
pass

if mark is None:
order = 0
else:
order = int(mark.args[0])

if order == active_wave:
continue

item.add_marker(
pytest.mark.skip(
reason=f"order = {order}, running {active_wave} only"
)
)
6 changes: 3 additions & 3 deletions src/integration-tests/pytest.ini
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[pytest]
log_cli_format = %(bmqContext)-16s %(filename)s:%(lineno)d %(message)s
log_cli_format = %(bmqprocess)-16s %(filename)s:%(lineno)d %(message)s
log_level = INFO
log_format = %(bmqContext16)s %(asctime)s.%(msecs)03d (%(thread)15d) %(levelname)-8s %(name24)s %(filename)10s:%(lineno)-03d %(message)s
log_file_format = %(bmqContext16)s %(asctime)s.%(msecs)03d (%(thread)15d) %(levelname)-8s %(name24)s %(filename)10s:%(lineno)-03d %(message)s
log_format = %(bmqprocess16)s %(asctime)s.%(msecs)03d (%(thread)15d) %(levelname)-8s %(name24)s %(filename)10s:%(lineno)-03d %(message)s
log_file_format = %(bmqprocess16)s %(asctime)s.%(msecs)03d (%(thread)15d) %(levelname)-8s %(name24)s %(filename)10s:%(lineno)-03d %(message)s
log_file_level = INFO
log_file_date_format = %d%b%Y_%H:%M:%S
addopts = --strict-markers -p no:cacheprovider
Expand Down
11 changes: 11 additions & 0 deletions src/integration-tests/run-tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#! /usr/bin/env bash

set -e

repo_dir=$(realpath "$0")
repo_dir=${repo_dir%/src/*}

export PYTHONPATH=$repo_dir/src/python:$PYTHONPATH
cd "$repo_dir/src/integration-tests"

python -m pytest -m "not csl_mode and not fsm_mode" "$@"
8 changes: 4 additions & 4 deletions src/integration-tests/test_admin_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import json
import re

from bmq.dev.it.fixtures import Cluster, local_cluster # pylint: disable=unused-import
from bmq.dev.it.process.admin import AdminClient
from blazingmq.dev.it.fixtures import Cluster, single_node, order # pylint: disable=unused-import
from blazingmq.dev.it.process.admin import AdminClient


def test_admin(local_cluster: Cluster):
cluster: Cluster = local_cluster
def test_admin(single_node: Cluster):
cluster: Cluster = single_node
endpoint: str = cluster.config.definition.nodes[0].transport.tcp.endpoint # type: ignore

# Extract the (host, port) pair from the config
Expand Down
4 changes: 2 additions & 2 deletions src/integration-tests/test_alarms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

import time

import bmq.dev.it.testconstants as tc
from bmq.dev.it.fixtures import Cluster, cluster, tweak # pylint: disable=unused-import
import blazingmq.dev.it.testconstants as tc
from blazingmq.dev.it.fixtures import Cluster, cluster, order, tweak # pylint: disable=unused-import


@tweak.cluster.queue_operations.consumption_monitor_period_ms(500)
Expand Down
53 changes: 28 additions & 25 deletions src/integration-tests/test_appids.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import time
from typing import List

import bmq.dev.it.testconstants as tc
from bmq.dev.it.fixtures import ( # pylint: disable=unused-import
import blazingmq.dev.it.testconstants as tc
from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import
Cluster,
cluster,
logger,
standard_cluster,
test_logger,
order,
multi_node,
tweak,
)
from bmq.dev.it.process.client import Client
from bmq.dev.it.util import attempt, wait_until
from blazingmq.dev.it.process.client import Client
from blazingmq.dev.it.util import attempt, wait_until

pytestmark = order(3)

authorized_app_ids = ["foo", "bar", "baz"]
timeout = 60
Expand All @@ -23,10 +26,10 @@ def set_app_ids(cluster: Cluster, app_ids: List[str]): # noqa: F811
cluster.config.domains[
tc.DOMAIN_FANOUT
].definition.parameters.mode.fanout.app_ids = app_ids # type: ignore
cluster.reconfigure_domain_values(tc.DOMAIN_FANOUT, {}, succeed=True)
cluster.reconfigure_domain(tc.DOMAIN_FANOUT, succeed=True)


def test_open_alarm_authorize_post(cluster: Cluster, logger):
def test_open_alarm_authorize_post(cluster: Cluster):
leader = cluster.last_known_leader
proxies = cluster.proxy_cycle()

Expand Down Expand Up @@ -62,16 +65,16 @@ def test_open_alarm_authorize_post(cluster: Cluster, logger):

leader.dump_queue_internals(tc.DOMAIN_FANOUT, tc.TEST_QUEUE)

barStatus, bazStatus, fooStatus, quuxStatus = sorted(
bar_status, baz_status, foo_status, quuxStatus = sorted(
[
leader.capture(r"(\w+).*: status=(\w+)(?:, StorageIter.atEnd=(\w+))?", 60)
for i in all_app_ids
],
key=lambda m: m[1],
key=lambda match: match[1],
)
assert barStatus[2] == "alive"
assert bazStatus[2] == "alive"
assert fooStatus[2] == "alive"
assert bar_status[2] == "alive"
assert baz_status[2] == "alive"
assert foo_status[2] == "alive"
assert quuxStatus.group(2, 3) == ("unauthorized", None)

assert (
Expand All @@ -87,7 +90,7 @@ def test_open_alarm_authorize_post(cluster: Cluster, logger):

# ---------------------------------------------------------------------
# Check that 'quux' (unauthorized) client did not receive it.
logger.info('Check that "quux" has not seen any messages')
test_logger.info('Check that "quux" has not seen any messages')
assert not quux.wait_push_event(timeout=2, quiet=True)
assert len(quux.list(f"{tc.URI_FANOUT}?id=quux", block=True)) == 0

Expand Down Expand Up @@ -116,7 +119,7 @@ def test_open_alarm_authorize_post(cluster: Cluster, logger):
leader.dump_queue_internals(tc.DOMAIN_FANOUT, tc.TEST_QUEUE)
# pylint: disable=cell-var-from-loop; passing lambda to 'wait_until' is safe
for app_id in authorized_app_ids:
logger.info(f"Check if {app_id} has seen 2 messages")
test_logger.info(f"Check if {app_id} has seen 2 messages")
assert wait_until(
lambda: len(
consumers[app_id].list(f"{tc.URI_FANOUT}?id={app_id}", block=True)
Expand All @@ -125,7 +128,7 @@ def test_open_alarm_authorize_post(cluster: Cluster, logger):
3,
)

logger.info("Check if quux has seen 1 message")
test_logger.info("Check if quux has seen 1 message")
assert wait_until(
lambda: len(quux.list(f"{tc.URI_FANOUT}?id=quux", block=True)) == 1, 3
)
Expand Down Expand Up @@ -236,7 +239,7 @@ def _test_command_errors(cluster):
set_app_ids(cluster, authorized_app_ids)


def test_unregister_in_presence_of_queues(cluster: Cluster, logger):
def test_unregister_in_presence_of_queues(cluster: Cluster):
leader = cluster.last_known_leader
proxies = cluster.proxy_cycle()

Expand Down Expand Up @@ -266,7 +269,7 @@ def _():
assert leader.outputs_substr("Num virtual storages: 2")
assert leader.outputs_substr("foo: status=unauthorized")

logger.info("confirm msg 1 for bar, expecting 1 msg in storage")
test_logger.info("confirm msg 1 for bar, expecting 1 msg in storage")
time.sleep(1) # Let the message reach the proxy
bar.confirm(tc.URI_FANOUT_BAR, "+1", succeed=True)

Expand All @@ -275,7 +278,7 @@ def _():
leader.dump_queue_internals(tc.DOMAIN_FANOUT, tc.TEST_QUEUE)
assert leader.outputs_regex("Storage.*: 1 messages")

logger.info("confirm msg 1 for baz, expecting 0 msg in storage")
test_logger.info("confirm msg 1 for baz, expecting 0 msg in storage")
time.sleep(1) # Let the message reach the proxy
baz.confirm(tc.URI_FANOUT_BAZ, "+1", succeed=True)

Expand Down Expand Up @@ -489,28 +492,28 @@ def test_unauthorization(cluster: Cluster):
consumer.open(appid_uri, flags=["read"], succeed=True)


def test_two_consumers_of_unauthorized_app(standard_cluster: Cluster):
def test_two_consumers_of_unauthorized_app(multi_node: Cluster):
"""DRQS 167201621: First client open authorized and unauthorized apps;
second client opens unauthorized app.
Then, primary shuts down causing replica to issue wildcard close
requests to primary.
"""

leader = standard_cluster.last_known_leader
leader = multi_node.last_known_leader

replica1 = standard_cluster.nodes()[0]
replica1 = multi_node.nodes()[0]
if replica1 == leader:
replica1 = standard_cluster.nodes()[1]
replica1 = multi_node.nodes()[1]

# ---------------------------------------------------------------------
# Two "foo" and "unauthorized" consumers
consumer1 = replica1.create_client("consumer1")
consumer1.open(tc.URI_FANOUT_FOO, flags=["read"], succeed=True)
consumer1.open(f"{tc.URI_FANOUT}?id=unauthorized", flags=["read"], succeed=True)

replica2 = standard_cluster.nodes()[2]
replica2 = multi_node.nodes()[2]
if replica2 == leader:
replica2 = standard_cluster.nodes()[3]
replica2 = multi_node.nodes()[3]

consumer2 = replica2.create_client("consumer2")
consumer2.open(f"{tc.URI_FANOUT}?id=unauthorized", flags=["read"], succeed=True)
Expand Down
Loading

0 comments on commit 445ef6b

Please sign in to comment.