diff --git a/README.rst b/README.rst
index 25bd1489..9867edfc 100644
--- a/README.rst
+++ b/README.rst
@@ -80,20 +80,133 @@ that worker and report the failure as usual. You can use the
 ``--max-worker-restart`` option to limit the number of workers that can be restarted, or disable
 restarting altogether using ``--max-worker-restart=0``.
 
-By default, the ``-n`` option will send pending tests to any worker that is available, without
-any guaranteed order, but you can control this with these options:
+Dividing tests up
+^^^^^^^^^^^^^^^^^
+
+In order to divide the tests up amongst the workers, ``pytest-xdist`` first puts sets of
+them into "test groups". The tests within a test group are all run together in one shot,
+so fixtures of larger scopes won't be run once for every single test. Instead, they'll
+be run as many times as needed for the tests within that test group. But once
+that test group is finished, it should be assumed that all cached fixture values from
+that test group's execution are destroyed.
+
+By default, there is no grouping logic and every individual test is placed in its own
+test group, so using the ``-n`` option will send pending tests to any worker that is
+available, without any guaranteed order. It should be assumed that when using this
+approach, every single test is run entirely in isolation from the others, meaning the
+tests can't rely on cached fixture values from larger-scoped fixtures.
+
+Provided test grouping options
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+By default, ``pytest-xdist`` doesn't group any tests together, but it provides some
+grouping options based on simple criteria about a test's nodeid, so you can guarantee
+that certain tests are run in the same process. When they're run in the same process,
+you guarantee that larger-scoped fixtures are only executed as many times as would
+normally be expected for the tests in the test group. But once that test group is
+finished, it should be assumed that all cached fixture values from that test group's
+execution are destroyed.
+
+Here are the options that are built in:
+
+* ``--dist=loadscope``: tests will be grouped by the **module** shown in each test's
+  nodeid for *test functions* and by the **class** shown in each test's nodeid for
+  *test methods*. This feature was added in version ``1.19``.
+
+* ``--dist=loadfile``: tests will be grouped by the **file** shown in each test's
+  nodeid. This feature was added in version ``1.21``.
+
+Defining custom load distribution logic
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+``pytest-xdist`` iterates over the entire list of collected tests and usually determines
+what group to put them in based on their nodeid. There is no set number of test
+groups, as it creates new groups as needed. You can tap into this system to define
+your own grouping logic by using the ``pytest_xdist_set_test_group_from_nodeid`` hook.
+
+If you define your own copy of that hook, it will be called once for every test, and the
+nodeid for each test will be passed in. Whatever it returns is the test group for that
+test. If a test group doesn't already exist with that name, then it will be created, so
+anything can be used.
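+
+As a minimal sketch, the hook can be defined in your project's ``conftest.py`` (the
+grouping rule below is only an illustration; it reproduces the ``--dist=loadfile``
+behaviour by grouping every test by its file):
+
+.. code-block:: python
+
+    # conftest.py -- hypothetical example
+    def pytest_xdist_set_test_group_from_nodeid(nodeid):
+        # Everything before the first "::" is the test's file path.
+        return nodeid.split("::", 1)[0]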
+
+For example, let's say you have the following tests::
+
+    test/test_something.py::test_form_upload[image-chrome]
+    test/test_something.py::test_form_upload[image-firefox]
+    test/test_something.py::test_form_upload[video-chrome]
+    test/test_something.py::test_form_upload[video-firefox]
+    test/test_something_else.py::test_form_upload[image-chrome]
+    test/test_something_else.py::test_form_upload[image-firefox]
+    test/test_something_else.py::test_form_upload[video-chrome]
+    test/test_something_else.py::test_form_upload[video-firefox]
+
+In order to have the ``chrome`` related tests run together and the ``firefox`` tests run
+together, but allow them to be separated by file, this could be done:
 
-* ``--dist=loadscope``: tests will be grouped by **module** for *test functions* and
-  by **class** for *test methods*, then each group will be sent to an available worker,
-  guaranteeing that all tests in a group run in the same process. This can be useful if you have
-  expensive module-level or class-level fixtures. Currently the groupings can't be customized,
-  with grouping by class takes priority over grouping by module.
-  This feature was added in version ``1.19``.
+.. code-block:: python
+
+    def pytest_xdist_set_test_group_from_nodeid(nodeid):
+        browser_names = ['chrome', 'firefox']
+        nodeid_params = nodeid.split('[', 1)[-1].rstrip(']').split('-')
+        for name in browser_names:
+            if name in nodeid_params:
+                return "{test_file}[{browser_name}]".format(
+                    test_file=nodeid.split("::", 1)[0],
+                    browser_name=name,
+                )
+
+The tests would then be divided into these test groups:
+
+.. code-block:: python
+
+    {
+        "test/test_something.py::test_form_upload[chrome]": [
+            "test/test_something.py::test_form_upload[image-chrome]",
+            "test/test_something.py::test_form_upload[video-chrome]"
+        ],
+        "test/test_something.py::test_form_upload[firefox]": [
+            "test/test_something.py::test_form_upload[image-firefox]",
+            "test/test_something.py::test_form_upload[video-firefox]"
+        ],
+        "test/test_something_else.py::test_form_upload[firefox]": [
+            "test/test_something_else.py::test_form_upload[image-firefox]",
+            "test/test_something_else.py::test_form_upload[video-firefox]"
+        ],
+        "test/test_something_else.py::test_form_upload[chrome]": [
+            "test/test_something_else.py::test_form_upload[image-chrome]",
+            "test/test_something_else.py::test_form_upload[video-chrome]"
+        ]
+    }
+
+You can also fall back on one of the default load distribution mechanisms by passing the
+corresponding arguments listed above when you call pytest. Because this example returns
+``None`` if the nodeid doesn't meet any of the criteria, it will defer to whichever
+mechanism you chose. So if you passed ``--dist=loadfile``, tests would otherwise be
+divided up by file name.
+
+Keep in mind, this is a means of optimization, not a means for determinism.
+
+Controlling test group execution order
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Sometimes you may want to have certain test groups start before or after others. Once
+the test groups have been determined, the ``OrderedDict`` they are stored in can have
+its order modified through the ``pytest_xdist_order_test_groups`` hook. For example, in
+order to move the test group named ``"groupA"`` to the end of the queue, this can be
+done:
+
+.. code-block:: python
+
+    def pytest_xdist_order_test_groups(workqueue):
+        workqueue.move_to_end("groupA")
 
-* ``--dist=loadfile``: tests will be grouped by file name, and then will be sent to an available
-  worker, guaranteeing that all tests in a group run in the same worker. This feature was added
-  in version ``1.21``.
+Keep in mind, this is a means of optimization, not a means for determinism or filtering.
+Removing test groups from this ``OrderedDict``, or adding new ones after the fact, can
+have unforeseen consequences.
+If you want to filter out which tests get run, it is recommended to either rely on test
+suite structure (so you can target the tests in specific locations), or to use marks
+(so you can select or filter out based on specific marks with the ``-m`` flag).
 
 Making session-scoped fixtures execute only once
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/changelog/18.feature.rst b/changelog/18.feature.rst
new file mode 100644
index 00000000..92db0d10
--- /dev/null
+++ b/changelog/18.feature.rst
@@ -0,0 +1 @@
+Allow defining custom logic for test distribution among test groups, and changing the order in which test groups are passed out to workers.
diff --git a/src/xdist/newhooks.py b/src/xdist/newhooks.py
index f389192c..9737c657 100644
--- a/src/xdist/newhooks.py
+++ b/src/xdist/newhooks.py
@@ -55,3 +55,55 @@ def pytest_xdist_node_collection_finished(node, ids):
 @pytest.mark.firstresult
 def pytest_xdist_make_scheduler(config, log):
     """ return a node scheduler implementation """
+
+
+@pytest.mark.trylast
+def pytest_xdist_set_test_group_from_nodeid(nodeid):
+    """Set the test group of a test using its nodeid.
+
+    This will determine which tests are grouped together and distributed to
+    workers at the same time. This will be called for every test, and whatever
+    is returned will be the name of the test group that test belongs to. In
+    order for tests to be grouped together, this function must return the same
+    value for each of their nodeids.
+
+    For example, given the following nodeids::
+
+        test/test_something.py::test_form_upload[image-chrome]
+        test/test_something.py::test_form_upload[image-firefox]
+        test/test_something.py::test_form_upload[video-chrome]
+        test/test_something.py::test_form_upload[video-firefox]
+        test/test_something_else.py::test_form_upload[image-chrome]
+        test/test_something_else.py::test_form_upload[image-firefox]
+        test/test_something_else.py::test_form_upload[video-chrome]
+        test/test_something_else.py::test_form_upload[video-firefox]
+
+    In order to have the ``chrome`` related tests run together and the
+    ``firefox`` tests run together, but allow them to be separated by file,
+    this could be done::
+
+        def pytest_xdist_set_test_group_from_nodeid(nodeid):
+            browser_names = ['chrome', 'firefox']
+            nodeid_params = nodeid.split('[', 1)[-1].rstrip(']').split('-')
+            for name in browser_names:
+                if name in nodeid_params:
+                    return "{test_file}[{browser_name}]".format(
+                        test_file=nodeid.split("::", 1)[0],
+                        browser_name=name,
+                    )
+
+    This would then defer to the default distribution logic for any tests this
+    can't apply to (i.e. if this would return ``None`` for a given ``nodeid``).
+    """
+
+@pytest.mark.trylast
+def pytest_xdist_order_test_groups(workqueue):
+    """Sort the queue of test groups to determine the order they will be executed in.
+
+    The ``workqueue`` is an ``OrderedDict`` containing all of the test groups in the
+    order they will be handed out to the workers. 
Groups that are listed first will be + handed out to workers first. The ``workqueue`` only needs to be modified and doesn't + need to be returned. + + This can be useful when you want to run longer tests first. + """ diff --git a/src/xdist/scheduler/load.py b/src/xdist/scheduler/load.py index 41169849..915efde7 100644 --- a/src/xdist/scheduler/load.py +++ b/src/xdist/scheduler/load.py @@ -1,265 +1,434 @@ -from itertools import cycle +from collections import OrderedDict -from py.log import Producer from _pytest.runner import CollectReport - -from xdist.workermanage import parse_spec_config +from py.log import Producer from xdist.report import report_collection_diff +from xdist.workermanage import parse_spec_config class LoadScheduling(object): """Implement load scheduling across nodes. - This distributes the tests collected across all nodes so each test - is run just once. All nodes collect and submit the test suite and - when all collections are received it is verified they are - identical collections. Then the collection gets divided up in - chunks and chunks get submitted to nodes. Whenever a node finishes - an item, it calls ``.mark_test_complete()`` which will trigger the - scheduler to assign more tests if the number of pending tests for - the node falls below a low-watermark. - - When created, ``numnodes`` defines how many nodes are expected to - submit a collection. This is used to know when all nodes have - finished collection or how large the chunks need to be created. + This distributes the tests collected across all nodes so each test is run + just once. All nodes collect and submit the list of tests and when all + collections are received it is verified they are identical collections. + Then the collection gets divided up in work units, and those work units get + submitted to nodes. Whenever a node finishes an item, it calls + ``.mark_test_complete()`` which will trigger the scheduler to assign more + work units if the number of pending tests for the node falls below a + low-watermark. + + When created, ``numnodes`` defines how many nodes are expected to submit a + collection. This is used to know when all nodes have finished collection. + + Work units can also be considered to be "test groups", as the tests inside + should not be split up across multiple workers, and should be run within + the same work cycle for a single worker. By default, this does not attempt + to group the tests in any way, so each work unit would only contain a + single test. This is designed to be extensible so that custom grouping + logic can be applied either by making a child class from this and + overriding the ``get_default_test_group`` method, or by defining the + ``pytest_xdist_set_test_group_from_nodeid``.hook. If the hook is used, but it returns + ``None`` for a given test, then this class's default grouping logic will be + used for that test. Attributes: - :numnodes: The expected number of nodes taking part. The actual - number of nodes will vary during the scheduler's lifetime as - nodes are added by the DSession as they are brought up and - removed either because of a dead node or normal shutdown. This - number is primarily used to know when the initial collection is - completed. - - :node2collection: Map of nodes and their test collection. All - collections should always be identical. - - :node2pending: Map of nodes and the indices of their pending - tests. The indices are an index into ``.pending`` (which is - identical to their own collection stored in - ``.node2collection``). 
- - :collection: The one collection once it is validated to be - identical between all the nodes. It is initialised to None - until ``.schedule()`` is called. - - :pending: List of indices of globally pending tests. These are - tests which have not yet been allocated to a chunk for a node - to process. + :numnodes: The expected number of nodes taking part. The actual number of + nodes will vary during the scheduler's lifetime as nodes are added by + the DSession as they are brought up and removed either because of a dead + node or normal shutdown. This number is primarily used to know when the + initial collection is completed. + + :collection: The final list of tests collected by all nodes once it is + validated to be identical between all the nodes. It is initialised to + None until ``.schedule()`` is called. + + :workqueue: Ordered dictionary that maps all available groups with their + associated tests (nodeid). Nodeids are in turn associated with their + completion status. One entry of the workqueue is called a work unit. + In turn, a collection of work unit is called a workload. All groups in + this ordered dictionary contain tests that have yet to be scheduled for + a worker node. + + :: + + workqueue = { + '///test_module.py': { + '///test_module.py::test_case1': False, + '///test_module.py::test_case2': False, + (...) + }, + (...) + } + + :assigned_work: Ordered dictionary that maps worker nodes with their + assigned work units. + + :: + + assigned_work = { + '': { + '///test_module.py': { + '///test_module.py::test_case1': False, + '///test_module.py::test_case2': False, + (...) + }, + (...) + }, + (...) + } + + :registered_collections: Ordered dictionary that maps worker nodes with + their collection of tests gathered during test discovery. + + :: + + registered_collections = { + '': [ + '///test_module.py::test_case1', + '///test_module.py::test_case2', + ], + (...) + } + :node2collection: Map of nodes and their test collection. All collections + should always be identical. This is an alias for + `.registered_collections``. + + :node2pending: Map of nodes and the names of their pending test groups. The + names correspond to the names of the work groups that were originally + stored in ``.workqueue``. :log: A py.log.Producer instance. :config: Config object, used for handling hooks. """ + _producer = "loadsched" + def __init__(self, config, log=None): self.numnodes = len(parse_spec_config(config)) - self.node2collection = {} - self.node2pending = {} - self.pending = [] self.collection = None + + self.workqueue = OrderedDict() + self.assigned_work = OrderedDict() + self.registered_collections = OrderedDict() + self.node2collection = self.registered_collections + if log is None: - self.log = Producer("loadsched") + self.log = Producer(self._producer) else: - self.log = log.loadsched + self.log = getattr(log, self._producer) + self.config = config @property def nodes(self): - """A list of all nodes in the scheduler.""" - return list(self.node2pending.keys()) + """A list of all active nodes in the scheduler.""" + return list(self.assigned_work.keys()) + + @property + def node2pending(self): + """Pending work groups for each node.""" + pending = {} + for node, work_groups in self.assigned_work.items(): + pending[node] = [group for group in work_groups.keys()] + return pending @property def collection_is_completed(self): - """Boolean indication initial test collection is complete. + """Booleanq indication initial test collection is complete. 
- This is a boolean indicating all initial participating nodes - have finished collection. The required number of initial - nodes is defined by ``.numnodes``. + This is a boolean indicating all initial participating nodes have + finished collection. The required number of initial nodes is defined + by ``.numnodes``. """ - return len(self.node2collection) >= self.numnodes + return len(self.registered_collections) >= self.numnodes @property def tests_finished(self): """Return True if all tests have been executed by the nodes.""" if not self.collection_is_completed: return False - if self.pending: + + if self.workqueue: return False - for pending in self.node2pending.values(): - if len(pending) >= 2: + + for assigned_unit in self.assigned_work.values(): + if self._pending_of(assigned_unit) >= 2: return False + return True @property def has_pending(self): - """Return True if there are pending test items + """Return True if there are pending test items. - This indicates that collection has finished and nodes are - still processing test items, so this can be thought of as + This indicates that collection has finished and nodes are still + processing test items, so this can be thought of as "the scheduler is active". """ - if self.pending: + if self.workqueue: return True - for pending in self.node2pending.values(): - if pending: + + for assigned_unit in self.assigned_work.values(): + if self._pending_of(assigned_unit) > 0: return True + return False + @property + def pending(self): + """Names of unscheduled work groups.""" + return list(self.workqueue.keys()) + def add_node(self, node): """Add a new node to the scheduler. - From now on the node will be allocated chunks of tests to - execute. + From now on the node will be assigned work units to be executed. + + Called by the ``DSession.worker_workerready`` hook when it successfully + bootstraps a new node. + """ + assert node not in self.assigned_work + self.assigned_work[node] = OrderedDict() + + def remove_node(self, node): + """Remove a node from the scheduler. + + This should be called either when the node crashed or at shutdown time. + In the former case any pending items assigned to the node will be + re-scheduled. + + Called by the hooks: + + - ``DSession.worker_workerfinished``. + - ``DSession.worker_errordown``. - Called by the ``DSession.worker_workerready`` hook when it - successfully bootstraps a new node. + Removes any completed test items from the test group being executed, + along with the first non-executed test item (as this is the test item + that crashed), and then returns the crashed test item's nodeid, or None + if there's no more pending test items. """ - assert node not in self.node2pending - self.node2pending[node] = [] + workload = self.assigned_work.pop(node) + if not self._pending_of(workload): + return None + + # The node crashed, identify test that crashed + for test_group, work_unit in workload.copy().items(): + for nodeid, completed in work_unit.copy().items(): + # Remove test items that already ran from the test group. + del workload[test_group][nodeid] + if completed: + continue + # Track the nodeid of the crashed test item. + crashitem = nodeid + if len(workload[test_group]) == 0: + # Remove the test group from the workload as there's no + # incomplete work left for it. + del workload[test_group] + break + else: + if len(workload[test_group]) == 0: + # Remove the test group from the workload as there's no + # incomplete work left for it. 
+ del workload[test_group] + continue + break + else: + raise RuntimeError( + "Unable to identify crashitem on a workload with pending items" + ) + + # Make uncompleted work unit available again + self.workqueue.update(workload) + + for node in self.assigned_work: + self._reschedule(node) + + return crashitem def add_node_collection(self, node, collection): - """Add the collected test items from a node + """Add the collected test items from a node. + + The collection is stored in the ``.registered_collections`` dictionary. + + Called by the hook: - The collection is stored in the ``.node2collection`` map. - Called by the ``DSession.worker_collectionfinish`` hook. + - ``DSession.worker_collectionfinish``. """ - assert node in self.node2pending + + # Check that add_node() was called on the node before + assert node in self.assigned_work + + # A new node has been added later, perhaps an original one died. if self.collection_is_completed: - # A new node has been added later, perhaps an original one died. - # .schedule() should have - # been called by now + + # Assert that .schedule() should have been called by now assert self.collection + + # Check that the new collection matches the official collection if collection != self.collection: - other_node = next(iter(self.node2collection.keys())) + + other_node = next(iter(self.registered_collections.keys())) + msg = report_collection_diff( self.collection, collection, other_node.gateway.id, node.gateway.id ) self.log(msg) return - self.node2collection[node] = list(collection) + + self.registered_collections[node] = list(collection) def mark_test_complete(self, node, item_index, duration=0): - """Mark test item as completed by node + """Mark test item as completed by node. - The duration it took to execute the item is used as a hint to - the scheduler. + Called by the hook: - This is called by the ``DSession.worker_testreport`` hook. + - ``DSession.worker_testreport``. """ - self.node2pending[node].remove(item_index) - self.check_schedule(node, duration=duration) + nodeid = self.registered_collections[node][item_index] + test_group = self.get_test_group(nodeid) - def check_schedule(self, node, duration=0): - """Maybe schedule new items on the node + self.assigned_work[node][test_group][nodeid] = True + self._reschedule(node) - If there are any globally pending nodes left then this will - check if the given node should be given any more tests. The - ``duration`` of the last test is optionally used as a - heuristic to influence how many tests the node is assigned. - """ - if node.shutting_down: - return + def _assign_work_unit(self, node): + """Assign a work unit to a node.""" + assert self.workqueue - if self.pending: - # how many nodes do we have? 
- num_nodes = len(self.node2pending) - # if our node goes below a heuristic minimum, fill it out to - # heuristic maximum - items_per_node_min = max(2, len(self.pending) // num_nodes // 4) - items_per_node_max = max(2, len(self.pending) // num_nodes // 2) - node_pending = self.node2pending[node] - if len(node_pending) < items_per_node_min: - if duration >= 0.1 and len(node_pending) >= 2: - # seems the node is doing long-running tests - # and has enough items to continue - # so let's rather wait with sending new items - return - num_send = items_per_node_max - len(node_pending) - self._send_tests(node, num_send) - else: - node.shutdown() + # Grab a unit of work + test_group, work_unit = self.workqueue.popitem(last=False) - self.log("num items waiting for node:", len(self.pending)) + # Keep track of the assigned work + assigned_to_node = self.assigned_work.setdefault(node, default=OrderedDict()) + assigned_to_node[test_group] = work_unit - def remove_node(self, node): - """Remove a node from the scheduler + # Ask the node to execute the workload + worker_collection = self.registered_collections[node] + nodeids_indexes = [ + worker_collection.index(nodeid) + for nodeid, completed in work_unit.items() + if not completed + ] + + node.send_runtest_some(nodeids_indexes) + + def get_default_test_group(self, nodeid): + """Determine the default test grouping of a nodeid. + + This doesn't group tests together. Every test is placed in its own test + group. + """ + return nodeid - This should be called either when the node crashed or at - shutdown time. In the former case any pending items assigned - to the node will be re-scheduled. Called by the - ``DSession.worker_workerfinished`` and - ``DSession.worker_errordown`` hooks. + def get_test_group(self, nodeid): + """Determine the test grouping of a nodeid. - Return the item which was being executing while the node - crashed or None if the node has no more pending items. + Every test is assigned a test grouping which is determined using the test's + nodeid. This test grouping effectively determines which tests should not be + separated from each other, and ensures they are run on the same worker during + the same work cycle for that worker. + """ + test_group = self.config.hook.pytest_xdist_set_test_group_from_nodeid(nodeid=nodeid) + if test_group: + return test_group[0] + return self.get_default_test_group(nodeid) + + def _pending_of(self, workload): + """Return the number of pending tests in a workload.""" + pending = sum(list(group.values()).count(False) for group in workload.values()) + return pending + + def _reschedule(self, node): + """Maybe schedule new items on the node. + If there are any globally pending work units left then this will check + if the given node should be given any more tests. 
""" - pending = self.node2pending.pop(node) - if not pending: + + # Do not add more work to a node shutting down + if node.shutting_down: return - # The node crashed, reassing pending items - crashitem = self.collection[pending.pop(0)] - self.pending.extend(pending) - for node in self.node2pending: - self.check_schedule(node) - return crashitem + # Check that more work is available + if not self.workqueue: + node.shutdown() + return + + self.log("Number of units waiting for node:", len(self.workqueue)) + + # Check that the node is almost depleted of work + # 2: Heuristic of minimum tests to enqueue more work + if self._pending_of(self.assigned_work[node]) > 2: + return + + # Pop one unit of work and assign it + self._assign_work_unit(node) def schedule(self): - """Initiate distribution of the test collection + """Initiate distribution of the test collection. + + Initiate scheduling of the items across the nodes. If this gets called + again later it behaves the same as calling ``._reschedule()`` on all + nodes so that newly added nodes will start to be used. - Initiate scheduling of the items across the nodes. If this - gets called again later it behaves the same as calling - ``.check_schedule()`` on all nodes so that newly added nodes - will start to be used. + If ``.collection_is_completed`` is True, this is called by the hook: - This is called by the ``DSession.worker_collectionfinish`` hook - if ``.collection_is_completed`` is True. + - ``DSession.worker_collectionfinish``. """ assert self.collection_is_completed # Initial distribution already happened, reschedule on all nodes if self.collection is not None: for node in self.nodes: - self.check_schedule(node) + self._reschedule(node) return - # XXX allow nodes to have different collections + # Check that all nodes collected the same tests if not self._check_nodes_have_same_collection(): self.log("**Different tests collected, aborting run**") return - # Collections are identical, create the index of pending items. - self.collection = list(self.node2collection.values())[0] - self.pending[:] = range(len(self.collection)) + # Collections are identical, create the final list of items + self.collection = list(next(iter(self.registered_collections.values()))) if not self.collection: return - # Send a batch of tests to run. If we don't have at least two - # tests per node, we have to send them all so that we can send - # shutdown signals and get all nodes working. 
- initial_batch = max(len(self.pending) // 4, 2 * len(self.nodes)) + # Determine chunks of work (test groups) + for nodeid in self.collection: + test_group = self.get_test_group(nodeid) + work_unit = self.workqueue.setdefault(test_group, default=OrderedDict()) + work_unit[nodeid] = False + + # allow customization of test group order + self.config.hook.pytest_xdist_order_test_groups(workqueue=self.workqueue) + + # Avoid having more workers than work + extra_nodes = len(self.nodes) - len(self.workqueue) - # distribute tests round-robin up to the batch size - # (or until we run out) - nodes = cycle(self.nodes) - for i in range(initial_batch): - self._send_tests(next(nodes), 1) + if extra_nodes > 0: + self.log("Shuting down {0} nodes".format(extra_nodes)) - if not self.pending: - # initial distribution sent all tests, start node shutdown + for _ in range(extra_nodes): + unused_node, assigned = self.assigned_work.popitem(last=True) + + self.log("Shuting down unused node {0}".format(unused_node)) + unused_node.shutdown() + + # Assign initial workload + for node in self.nodes: + self._assign_work_unit(node) + + # Ensure nodes start with at least two work units if possible (#277) + for node in self.nodes: + self._reschedule(node) + + # Initial distribution sent all tests, start node shutdown + if not self.workqueue: for node in self.nodes: node.shutdown() - def _send_tests(self, node, num): - tests_per_node = self.pending[:num] - if tests_per_node: - del self.pending[:num] - self.node2pending[node].extend(tests_per_node) - node.send_runtest_some(tests_per_node) - def _check_nodes_have_same_collection(self): """Return True if all nodes have collected the same items. @@ -267,20 +436,24 @@ def _check_nodes_have_same_collection(self): the collection differences and posting collection errors to pytest_collectreport hook. """ - node_collection_items = list(self.node2collection.items()) + node_collection_items = list(self.registered_collections.items()) first_node, col = node_collection_items[0] same_collection = True + for node, collection in node_collection_items[1:]: msg = report_collection_diff( col, collection, first_node.gateway.id, node.gateway.id ) - if msg: - same_collection = False - self.log(msg) - if self.config is not None: - rep = CollectReport( - node.gateway.id, "failed", longrepr=msg, result=[] - ) - self.config.hook.pytest_collectreport(report=rep) + if not msg: + continue + + same_collection = False + self.log(msg) + + if self.config is None: + continue + + rep = CollectReport(node.gateway.id, "failed", longrepr=msg, result=[]) + self.config.hook.pytest_collectreport(report=rep) return same_collection diff --git a/src/xdist/scheduler/loadfile.py b/src/xdist/scheduler/loadfile.py index 52a28b1e..eafd02bc 100644 --- a/src/xdist/scheduler/loadfile.py +++ b/src/xdist/scheduler/loadfile.py @@ -1,9 +1,9 @@ -from .loadscope import LoadScopeScheduling +from .load import LoadScheduling from py.log import Producer -class LoadFileScheduling(LoadScopeScheduling): - """Implement load scheduling across nodes, but grouping test test file. +class LoadFileScheduling(LoadScheduling): + """Implement load scheduling across nodes, but grouping test by file. This distributes the tests collected across all nodes so each test is run just once. All nodes collect and submit the list of tests and when all @@ -17,36 +17,14 @@ class LoadFileScheduling(LoadScopeScheduling): When created, ``numnodes`` defines how many nodes are expected to submit a collection. 
This is used to know when all nodes have finished collection. - This class behaves very much like LoadScopeScheduling, but with a file-level scope. + This groups tests by default based on their file. """ - def __init__(self, config, log=None): - super(LoadFileScheduling, self).__init__(config, log) - if log is None: - self.log = Producer("loadfilesched") - else: - self.log = log.loadfilesched + _producer = "loadfilesched" - def _split_scope(self, nodeid): - """Determine the scope (grouping) of a nodeid. + def get_default_test_group(self, nodeid): + """Determine the default test grouping of a nodeid, but based on file. - There are usually 3 cases for a nodeid:: - - example/loadsuite/test/test_beta.py::test_beta0 - example/loadsuite/test/test_delta.py::Delta1::test_delta0 - example/loadsuite/epsilon/__init__.py::epsilon.epsilon - - #. Function in a test module. - #. Method of a class in a test module. - #. Doctest in a function in a package. - - This function will group tests with the scope determined by splitting - the first ``::`` from the left. That is, test will be grouped in a - single work unit when they reside in the same file. - In the above example, scopes will be:: - - example/loadsuite/test/test_beta.py - example/loadsuite/test/test_delta.py - example/loadsuite/epsilon/__init__.py + Tests belonging to the same file will be put into the same test group. """ return nodeid.split("::", 1)[0] diff --git a/src/xdist/scheduler/loadscope.py b/src/xdist/scheduler/loadscope.py index 426d3407..67d9dadc 100644 --- a/src/xdist/scheduler/loadscope.py +++ b/src/xdist/scheduler/loadscope.py @@ -1,18 +1,14 @@ -from collections import OrderedDict - -from _pytest.runner import CollectReport +from .load import LoadScheduling from py.log import Producer -from xdist.report import report_collection_diff -from xdist.workermanage import parse_spec_config -class LoadScopeScheduling(object): +class LoadScopeScheduling(LoadScheduling): """Implement load scheduling across nodes, but grouping test by scope. This distributes the tests collected across all nodes so each test is run just once. All nodes collect and submit the list of tests and when all collections are received it is verified they are identical collections. - Then the collection gets divided up in work units, grouped by test scope, + Then the collection gets divided up in work units, grouped by test file, and those work units get submitted to nodes. Whenever a node finishes an item, it calls ``.mark_test_complete()`` which will trigger the scheduler to assign more work units if the number of pending tests for the node falls @@ -21,251 +17,13 @@ class LoadScopeScheduling(object): When created, ``numnodes`` defines how many nodes are expected to submit a collection. This is used to know when all nodes have finished collection. - Attributes: - - :numnodes: The expected number of nodes taking part. The actual number of - nodes will vary during the scheduler's lifetime as nodes are added by - the DSession as they are brought up and removed either because of a dead - node or normal shutdown. This number is primarily used to know when the - initial collection is completed. - - :collection: The final list of tests collected by all nodes once it is - validated to be identical between all the nodes. It is initialised to - None until ``.schedule()`` is called. - - :workqueue: Ordered dictionary that maps all available scopes with their - associated tests (nodeid). Nodeids are in turn associated with their - completion status. 
One entry of the workqueue is called a work unit. - In turn, a collection of work unit is called a workload. - - :: - - workqueue = { - '///test_module.py': { - '///test_module.py::test_case1': False, - '///test_module.py::test_case2': False, - (...) - }, - (...) - } - - :assigned_work: Ordered dictionary that maps worker nodes with their - assigned work units. - - :: - - assigned_work = { - '': { - '///test_module.py': { - '///test_module.py::test_case1': False, - '///test_module.py::test_case2': False, - (...) - }, - (...) - }, - (...) - } - - :registered_collections: Ordered dictionary that maps worker nodes with - their collection of tests gathered during test discovery. - - :: - - registered_collections = { - '': [ - '///test_module.py::test_case1', - '///test_module.py::test_case2', - ], - (...) - } - - :log: A py.log.Producer instance. - - :config: Config object, used for handling hooks. + This groups tests by default based on their scope. """ - def __init__(self, config, log=None): - self.numnodes = len(parse_spec_config(config)) - self.collection = None - - self.workqueue = OrderedDict() - self.assigned_work = OrderedDict() - self.registered_collections = OrderedDict() - - if log is None: - self.log = Producer("loadscopesched") - else: - self.log = log.loadscopesched - - self.config = config - - @property - def nodes(self): - """A list of all active nodes in the scheduler.""" - return list(self.assigned_work.keys()) - - @property - def collection_is_completed(self): - """Boolean indication initial test collection is complete. - - This is a boolean indicating all initial participating nodes have - finished collection. The required number of initial nodes is defined - by ``.numnodes``. - """ - return len(self.registered_collections) >= self.numnodes - - @property - def tests_finished(self): - """Return True if all tests have been executed by the nodes.""" - if not self.collection_is_completed: - return False - - if self.workqueue: - return False - - for assigned_unit in self.assigned_work.values(): - if self._pending_of(assigned_unit) >= 2: - return False - - return True - - @property - def has_pending(self): - """Return True if there are pending test items. - - This indicates that collection has finished and nodes are still - processing test items, so this can be thought of as - "the scheduler is active". - """ - if self.workqueue: - return True - - for assigned_unit in self.assigned_work.values(): - if self._pending_of(assigned_unit) > 0: - return True - - return False - - def add_node(self, node): - """Add a new node to the scheduler. - - From now on the node will be assigned work units to be executed. - - Called by the ``DSession.worker_workerready`` hook when it successfully - bootstraps a new node. - """ - assert node not in self.assigned_work - self.assigned_work[node] = OrderedDict() - - def remove_node(self, node): - """Remove a node from the scheduler. - - This should be called either when the node crashed or at shutdown time. - In the former case any pending items assigned to the node will be - re-scheduled. - - Called by the hooks: - - - ``DSession.worker_workerfinished``. - - ``DSession.worker_errordown``. - - Return the item being executed while the node crashed or None if the - node has no more pending items. 
- """ - workload = self.assigned_work.pop(node) - if not self._pending_of(workload): - return None - - # The node crashed, identify test that crashed - for work_unit in workload.values(): - for nodeid, completed in work_unit.items(): - if not completed: - crashitem = nodeid - break - else: - continue - break - else: - raise RuntimeError( - "Unable to identify crashitem on a workload with pending items" - ) - - # Made uncompleted work unit available again - self.workqueue.update(workload) - - for node in self.assigned_work: - self._reschedule(node) - - return crashitem - - def add_node_collection(self, node, collection): - """Add the collected test items from a node. - - The collection is stored in the ``.registered_collections`` dictionary. - - Called by the hook: - - - ``DSession.worker_collectionfinish``. - """ - - # Check that add_node() was called on the node before - assert node in self.assigned_work + _producer = "loadscopesched" - # A new node has been added later, perhaps an original one died. - if self.collection_is_completed: - - # Assert that .schedule() should have been called by now - assert self.collection - - # Check that the new collection matches the official collection - if collection != self.collection: - - other_node = next(iter(self.registered_collections.keys())) - - msg = report_collection_diff( - self.collection, collection, other_node.gateway.id, node.gateway.id - ) - self.log(msg) - return - - self.registered_collections[node] = list(collection) - - def mark_test_complete(self, node, item_index, duration=0): - """Mark test item as completed by node. - - Called by the hook: - - - ``DSession.worker_testreport``. - """ - nodeid = self.registered_collections[node][item_index] - scope = self._split_scope(nodeid) - - self.assigned_work[node][scope][nodeid] = True - self._reschedule(node) - - def _assign_work_unit(self, node): - """Assign a work unit to a node.""" - assert self.workqueue - - # Grab a unit of work - scope, work_unit = self.workqueue.popitem(last=False) - - # Keep track of the assigned work - assigned_to_node = self.assigned_work.setdefault(node, default=OrderedDict()) - assigned_to_node[scope] = work_unit - - # Ask the node to execute the workload - worker_collection = self.registered_collections[node] - nodeids_indexes = [ - worker_collection.index(nodeid) - for nodeid, completed in work_unit.items() - if not completed - ] - - node.send_runtest_some(nodeids_indexes) - - def _split_scope(self, nodeid): - """Determine the scope (grouping) of a nodeid. + def get_default_test_group(self, nodeid): + """Determine the default test grouping of a nodeid, but based on scope. There are usually 3 cases for a nodeid:: @@ -278,132 +36,12 @@ def _split_scope(self, nodeid): #. Doctest in a function in a package. This function will group tests with the scope determined by splitting - the first ``::`` from the right. That is, classes will be grouped in a - single work unit, and functions from a test module will be grouped by - their module. In the above example, scopes will be:: + the first ``::`` from the left. That is, test will be grouped in a + single work unit when they reside in the same file. 
+ In the above example, scopes will be:: example/loadsuite/test/test_beta.py - example/loadsuite/test/test_delta.py::Delta1 + example/loadsuite/test/test_delta.py example/loadsuite/epsilon/__init__.py """ - return nodeid.rsplit("::", 1)[0] - - def _pending_of(self, workload): - """Return the number of pending tests in a workload.""" - pending = sum(list(scope.values()).count(False) for scope in workload.values()) - return pending - - def _reschedule(self, node): - """Maybe schedule new items on the node. - - If there are any globally pending work units left then this will check - if the given node should be given any more tests. - """ - - # Do not add more work to a node shutting down - if node.shutting_down: - return - - # Check that more work is available - if not self.workqueue: - node.shutdown() - return - - self.log("Number of units waiting for node:", len(self.workqueue)) - - # Check that the node is almost depleted of work - # 2: Heuristic of minimum tests to enqueue more work - if self._pending_of(self.assigned_work[node]) > 2: - return - - # Pop one unit of work and assign it - self._assign_work_unit(node) - - def schedule(self): - """Initiate distribution of the test collection. - - Initiate scheduling of the items across the nodes. If this gets called - again later it behaves the same as calling ``._reschedule()`` on all - nodes so that newly added nodes will start to be used. - - If ``.collection_is_completed`` is True, this is called by the hook: - - - ``DSession.worker_collectionfinish``. - """ - assert self.collection_is_completed - - # Initial distribution already happened, reschedule on all nodes - if self.collection is not None: - for node in self.nodes: - self._reschedule(node) - return - - # Check that all nodes collected the same tests - if not self._check_nodes_have_same_collection(): - self.log("**Different tests collected, aborting run**") - return - - # Collections are identical, create the final list of items - self.collection = list(next(iter(self.registered_collections.values()))) - if not self.collection: - return - - # Determine chunks of work (scopes) - for nodeid in self.collection: - scope = self._split_scope(nodeid) - work_unit = self.workqueue.setdefault(scope, default=OrderedDict()) - work_unit[nodeid] = False - - # Avoid having more workers than work - extra_nodes = len(self.nodes) - len(self.workqueue) - - if extra_nodes > 0: - self.log("Shuting down {0} nodes".format(extra_nodes)) - - for _ in range(extra_nodes): - unused_node, assigned = self.assigned_work.popitem(last=True) - - self.log("Shuting down unused node {0}".format(unused_node)) - unused_node.shutdown() - - # Assign initial workload - for node in self.nodes: - self._assign_work_unit(node) - - # Ensure nodes start with at least two work units if possible (#277) - for node in self.nodes: - self._reschedule(node) - - # Initial distribution sent all tests, start node shutdown - if not self.workqueue: - for node in self.nodes: - node.shutdown() - - def _check_nodes_have_same_collection(self): - """Return True if all nodes have collected the same items. - - If collections differ, this method returns False while logging - the collection differences and posting collection errors to - pytest_collectreport hook. 
- """ - node_collection_items = list(self.registered_collections.items()) - first_node, col = node_collection_items[0] - same_collection = True - - for node, collection in node_collection_items[1:]: - msg = report_collection_diff( - col, collection, first_node.gateway.id, node.gateway.id - ) - if not msg: - continue - - same_collection = False - self.log(msg) - - if self.config is None: - continue - - rep = CollectReport(node.gateway.id, "failed", longrepr=msg, result=[]) - self.config.hook.pytest_collectreport(report=rep) - - return same_collection + return nodeid.split("::", 1)[0] diff --git a/testing/test_dsession.py b/testing/test_dsession.py index 8d0373e9..31f26313 100644 --- a/testing/test_dsession.py +++ b/testing/test_dsession.py @@ -118,70 +118,115 @@ def test_schedule_load_simple(self, testdir): assert sched.tests_finished def test_schedule_batch_size(self, testdir): + # should have 2 workers config = testdir.parseconfig("--tx=2*popen") + # create the scheduler sched = LoadScheduling(config) + # add the 3 worker nodes sched.add_node(MockNode()) sched.add_node(MockNode()) + # get references to all the worker nodes node1, node2 = sched.nodes - col = ["xyz"] * 6 + # generate the test collection with 6 tests + col = ["xyz{}".format(i) for i in range(6)] + # each worker node has the test collection sched.add_node_collection(node1, col) sched.add_node_collection(node2, col) + # do the initial dividing up of the work sched.schedule() - # assert not sched.tests_finished sent1 = node1.sent sent2 = node2.sent + # the work was divided up in a round robin fashion, and only two work groups + # were provided initially (and there's one test item per work group) assert sent1 == [0, 2] assert sent2 == [1, 3] - assert sched.pending == [4, 5] - assert sched.node2pending[node1] == sent1 - assert sched.node2pending[node2] == sent2 + # only all but the first 4 work groups are still pending, as only + # *2 work groups were initially scheduled + assert sched.pending == col[4:] assert len(sched.pending) == 2 + # the worker nodes show the correct pending work groups + assert sched.node2pending[node1] == col[:4:2] + assert sched.node2pending[node2] == col[1:4:2] + # mark the first test of the first worker node as completed sched.mark_test_complete(node1, 0) + # the first worker node was given the next work group in the queue assert node1.sent == [0, 2, 4] - assert sched.pending == [5] + # the second worker node was not given another work group as none of + # its tests were marked as complete. assert node2.sent == [1, 3] + # only all but the first 4 work groups are still pending, as only + # *2 work groups were initially scheduled and + # the first worker node was given another + assert sched.pending == col[5:] + assert len(sched.pending) == 1 + # mark the second test of the first worker node as completed sched.mark_test_complete(node1, 2) + # the first worker node was given the next work group in the queue assert node1.sent == [0, 2, 4, 5] + # the second worker node was not given another work group as none of + # its tests were marked as complete. 
+ assert node2.sent == [1, 3] + # there's no tests left that haven't been scheduled assert not sched.pending def test_schedule_fewer_tests_than_nodes(self, testdir): - config = testdir.parseconfig("--tx=2*popen") + # should have 3 workers + config = testdir.parseconfig("--tx=3*popen") + # create the scheduler sched = LoadScheduling(config) + # add the 3 worker nodes sched.add_node(MockNode()) sched.add_node(MockNode()) sched.add_node(MockNode()) + # get references to all the worker nodes node1, node2, node3 = sched.nodes - col = ["xyz"] * 2 + # generate the test collection with 2 tests + col = ["xyz{}".format(i) for i in range(2)] + # each worker node has the test collection sched.add_node_collection(node1, col) sched.add_node_collection(node2, col) + sched.add_node_collection(node3, col) + # do the initial dividing up of the work sched.schedule() # assert not sched.tests_finished sent1 = node1.sent sent2 = node2.sent sent3 = node3.sent + # the work was divided up in a round robin fashion assert sent1 == [0] assert sent2 == [1] assert sent3 == [] + # there's no tests left that haven't been scheduled assert not sched.pending def test_schedule_fewer_than_two_tests_per_node(self, testdir): - config = testdir.parseconfig("--tx=2*popen") + # should have 3 workers + config = testdir.parseconfig("--tx=3*popen") + # create the scheduler sched = LoadScheduling(config) + # add the 3 worker nodes sched.add_node(MockNode()) sched.add_node(MockNode()) sched.add_node(MockNode()) + # get references to all the worker nodes node1, node2, node3 = sched.nodes - col = ["xyz"] * 5 + # generate the test collection with 5 tests + col = ["xyz{}".format(i) for i in range(5)] + # each worker node has the test collection sched.add_node_collection(node1, col) sched.add_node_collection(node2, col) + sched.add_node_collection(node3, col) + # do the initial dividing up of the work sched.schedule() # assert not sched.tests_finished sent1 = node1.sent sent2 = node2.sent sent3 = node3.sent + # the work was divided up in a round robin fashion assert sent1 == [0, 3] assert sent2 == [1, 4] assert sent3 == [2] + # there's no tests left that haven't been scheduled assert not sched.pending def test_add_remove_node(self, testdir): diff --git a/testing/test_newhooks.py b/testing/test_newhooks.py index 22928fe1..31b2dff5 100644 --- a/testing/test_newhooks.py +++ b/testing/test_newhooks.py @@ -1,5 +1,8 @@ +import re import pytest +from xdist.scheduler import LoadScheduling + class TestHooks: @pytest.fixture(autouse=True) @@ -57,3 +60,209 @@ def pytest_xdist_node_collection_finished(node, ids): ["*HOOK: gw0 test_a, test_b, test_c", "*HOOK: gw1 test_a, test_b, test_c"] ) res.stdout.fnmatch_lines(["*3 passed*"]) + + +class MockGateway: + _count = 0 + + def __init__(self): + self.id = str(self._count) + self._count += 1 + + +class MockNode: + def __init__(self): + self.sent = [] + self.gateway = MockGateway() + self._shutdown = False + + def send_runtest_some(self, indices): + self.sent.extend(indices) + + def send_runtest_all(self): + self.sent.append("ALL") + + def shutdown(self): + self._shutdown = True + + @property + def shutting_down(self): + return self._shutdown + + +class TestSetTestGroupWithLoadScheduler: + + def test_gw0_contains_group_a_and_gw1_contains_group_b(self, testdir): + testdir.makeconftest( + """ + import pytest + + def pytest_xdist_set_test_group_from_nodeid(nodeid): + group_names = ['groupA', 'groupB'] + nodeid_params = nodeid.split('[', 1)[-1].rstrip(']').split('-') + for name in group_names: + if name in 
nodeid_params: + return name + """ + ) + testdir.makepyfile( + """ + import pytest + @pytest.fixture(params=["groupA", "groupB"]) + def group(request): + return request.param + @pytest.fixture(params=[1, 2, 3, 4, 5]) + def number(request): + return request.param + def test_with_group(number, group): pass + """ + ) + results = testdir.runpytest("-n3", "-v") + combos = parse_tests_and_workers_from_output(results.outlines) + groupings = {} + for worker_id, status, nodeid in combos: + groupings.setdefault(worker_id, []).append((nodeid, status)) + gw0_tests = "".join(group[0] for group in groupings["gw0"]) + gw1_tests = "".join(group[0] for group in groupings["gw1"]) + # there is only groupA tests in gw0's assigned work + assert gw0_tests.count("groupA") == 5 + assert gw0_tests.count("groupB") == 0 + # there is only groupB tests in gw0's assigned work + assert gw1_tests.count("groupA") == 0 + assert gw1_tests.count("groupB") == 5 + # the third worker node recieved no work groups + assert "gw2" not in groupings.keys() + # all tests passed + assert len(groupings["gw0"]) == 5 + assert len(groupings["gw1"]) == 5 + assert [group[1] for group in groupings["gw0"]].count("PASSED") == 5 + assert [group[1] for group in groupings["gw1"]].count("PASSED") == 5 + + def test_default_distribution_fallback(self, testdir): + testdir.makeconftest( + """ + import pytest + + def pytest_xdist_set_test_group_from_nodeid(nodeid): + group_names = ['groupA', 'groupB'] + nodeid_params = nodeid.split('[', 1)[-1].rstrip(']').split('-') + for name in group_names: + if name in nodeid_params: + return name + """ + ) + testdir.makepyfile( + """ + import pytest + @pytest.fixture(params=["groupA", "groupB"]) + def group(request): + return request.param + @pytest.fixture(params=[1, 2, 3, 4, 5]) + def number(request): + return request.param + def test_with_group(number, group): pass + def test_no_group_no_class(number): pass + class TestNoGroupClass: + def test_no_group_class(self, number): pass + class TestOtherNoGroupClass: + def test_no_group_other_class(self, number): pass + """ + ) + results = testdir.runpytest("-n3", "-v", "--dist=loadfile") + combos = parse_tests_and_workers_from_output(results.outlines) + groupings = {} + for worker_id, status, nodeid in combos: + groupings.setdefault(worker_id, []).append((nodeid, status)) + gw0_tests = "".join(group[0] for group in groupings["gw0"]) + gw1_tests = "".join(group[0] for group in groupings["gw1"]) + gw2_tests = "".join(group[0] for group in groupings["gw2"]) + # there is only groupA tests in gw0's assigned work + assert gw0_tests.count("groupA") == 5 + assert gw0_tests.count("groupB") == 0 + # there is only groupB tests in gw0's assigned work + assert gw1_tests.count("groupA") == 0 + assert gw1_tests.count("groupB") == 5 + # the third worker node recieved all remaining tests + assert gw2_tests.count("test_no_group_no_class") == 5 + assert gw2_tests.count("test_no_group_class") == 5 + assert gw2_tests.count("test_no_group_other_class") == 5 + # all tests passed + assert len(groupings["gw0"]) == 5 + assert len(groupings["gw1"]) == 5 + assert len(groupings["gw2"]) == 15 + assert [group[1] for group in groupings["gw0"]].count("PASSED") == 5 + assert [group[1] for group in groupings["gw1"]].count("PASSED") == 5 + assert [group[1] for group in groupings["gw2"]].count("PASSED") == 15 + + +class TestSetTestGroupWithLoadSchedulerOrderGroups: + + def test_gw0_contains_group_b_and_gw1_contains_group_a(self, testdir): + testdir.makeconftest( + """ + import pytest + + def 
pytest_xdist_set_test_group_from_nodeid(nodeid): + group_names = ['groupA', 'groupB'] + nodeid_params = nodeid.split('[', 1)[-1].rstrip(']').split('-') + for name in group_names: + if name in nodeid_params: + return name + + def pytest_xdist_order_test_groups(workqueue): + workqueue.move_to_end('groupA') + """ + ) + testdir.makepyfile( + """ + import pytest + @pytest.fixture(params=["groupA", "groupB"]) + def group(request): + return request.param + @pytest.fixture(params=[1, 2, 3, 4, 5]) + def number(request): + return request.param + def test_with_group(number, group): pass + """ + ) + results = testdir.runpytest("-n3", "-v") + combos = parse_tests_and_workers_from_output(results.outlines) + groupings = {} + for worker_id, status, nodeid in combos: + groupings.setdefault(worker_id, []).append((nodeid, status)) + gw0_tests = "".join(group[0] for group in groupings["gw0"]) + gw1_tests = "".join(group[0] for group in groupings["gw1"]) + # there is only groupA tests in gw0's assigned work + assert gw0_tests.count("groupB") == 5 + assert gw0_tests.count("groupA") == 0 + # there is only groupB tests in gw0's assigned work + assert gw1_tests.count("groupB") == 0 + assert gw1_tests.count("groupA") == 5 + # the third worker node recieved no work groups + assert "gw2" not in groupings.keys() + # all tests passed + assert len(groupings["gw0"]) == 5 + assert len(groupings["gw1"]) == 5 + assert [group[1] for group in groupings["gw0"]].count("PASSED") == 5 + assert [group[1] for group in groupings["gw1"]].count("PASSED") == 5 + + +def parse_tests_and_workers_from_output(lines): + result = [] + for line in lines: + # example match: "[gw0] PASSED test_a.py::test[7]" + m = re.match( + r""" + \[(gw\d)\] # worker + \s* + (?:\[\s*\d+%\])? # progress indicator (pytest >=3.3) + \s(.*?) # status string ("PASSED") + \s(.*::.*) # nodeid + """, + line.strip(), + re.VERBOSE, + ) + if m: + worker, status, nodeid = m.groups() + result.append((worker, status, nodeid)) + return result