diff --git a/ducktape/cluster/node_container.py b/ducktape/cluster/node_container.py index e5412f2ab..a7aaec1bf 100644 --- a/ducktape/cluster/node_container.py +++ b/ducktape/cluster/node_container.py @@ -13,6 +13,7 @@ # limitations under the License. from six import iteritems +from operator import attrgetter class NodeNotPresentError(Exception): @@ -105,7 +106,14 @@ def remove_nodes(self, nodes): def remove_spec(self, cluster_spec): """ - Remove nodes matching a ClusterSpec from this NodeContainer. + Remove nodes matching a ClusterSpec from this NodeContainer. Nodes are allocated + based on machine type with following strategy: + + 1) To compare MachineType, different weight has been assigned to configuration + as cpu > mem > disk > additional_disk, which means node1:{mem:4G, disk:100G} + required more resource than node2:{mem:2G, disk:200G}. + 2) Always try to satisfy node specific that requires most resource. + 3) Always try to allocate machine with least resource as possible. :param cluster_spec: The cluster spec. This will not be modified. :returns: A list of the nodes that were removed. @@ -116,11 +124,17 @@ def remove_spec(self, cluster_spec): if len(msg) > 0: raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg) removed = [] - for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes): - num_nodes = len(node_specs) + for os, req_nodes in iteritems(cluster_spec.nodes.os_to_nodes): avail_nodes = self.os_to_nodes.get(os, []) - for i in range(0, num_nodes): - removed.append(avail_nodes.pop(0)) + sorted_req_nodes = NodeContainer.sort(nodes=req_nodes, reverse=True) + sorted_avail_nodes = NodeContainer.sort(nodes=avail_nodes) + for req_node in sorted_req_nodes[:]: + for avail_node in sorted_avail_nodes[:]: + if NodeContainer.satisfy(avail_node, req_node): + sorted_avail_nodes.remove(avail_node) + avail_nodes.remove(avail_node) + removed.append(avail_node) + break return removed def can_remove_spec(self, cluster_spec): @@ -136,21 +150,56 @@ def can_remove_spec(self, cluster_spec): def attempt_remove_spec(self, cluster_spec): """ - Attempt to remove a cluster_spec from this node container. + Attempt to remove a cluster_spec from this node container. Uses the same strategy + as remove_spec(self, cluster_spec). :param cluster_spec: The cluster spec. This will not be modified. :returns: An empty string if we can remove the nodes; an error string otherwise. """ msg = "" - for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes): - num_nodes = len(node_specs) - avail_nodes = len(self.os_to_nodes.get(os, [])) - if avail_nodes < num_nodes: - msg = msg + "%s nodes requested: %d. %s nodes available: %d" % \ - (os, num_nodes, os, avail_nodes) + for os, req_nodes in iteritems(cluster_spec.nodes.os_to_nodes): + avail_nodes = self.os_to_nodes.get(os, []) + num_avail_nodes = len(avail_nodes) + num_req_nodes = len(req_nodes) + if num_avail_nodes < num_req_nodes: + msg = msg + "%s nodes requested: %d. %s nodes available: %d" % (os, num_req_nodes, os, num_avail_nodes) + sorted_req_nodes = NodeContainer.sort(nodes=req_nodes, reverse=True) + sorted_avail_nodes = NodeContainer.sort(nodes=avail_nodes) + for req_node in sorted_req_nodes[:]: + for avail_node in sorted_avail_nodes[:]: + if NodeContainer.satisfy(avail_node, req_node): + sorted_req_nodes.remove(req_node) + sorted_avail_nodes.remove(avail_node) + break + # check unsatisfied nodes + for unsatisfied_node in sorted_req_nodes: + msg += "\ncannot satisfy minimum requirement for requested node: %s" % str(unsatisfied_node) return msg + @staticmethod + def satisfy(avail_node, req_node): + """ + Return true if available node satisfies the minimum requirement of requested node. + """ + if avail_node.machine_type.cpu_core < req_node.machine_type.cpu_core or \ + avail_node.machine_type.mem_size_gb < req_node.machine_type.mem_size_gb or \ + avail_node.machine_type.disk_size_gb < req_node.machine_type.disk_size_gb: + return False + for d_name, d_size in req_node.machine_type.additional_disks.iteritems(): + if avail_node.machine_type.additional_disks.get(d_name, 0) < d_size: + return False + return True + + @staticmethod + def sort(nodes, reverse=False): + """ + Return sorted list of nodes based on machine_type. + """ + return sorted(nodes, key=attrgetter('machine_type.cpu_core', 'machine_type.mem_size_gb', + 'machine_type.disk_size_gb', 'machine_type.additional_disks'), + reverse=reverse) + def clone(self): """ Returns a deep copy of this object. diff --git a/ducktape/tests/runner_client.py b/ducktape/tests/runner_client.py index 91858d406..9785a363f 100644 --- a/ducktape/tests/runner_client.py +++ b/ducktape/tests/runner_client.py @@ -17,16 +17,14 @@ import signal import time import traceback -import zmq -from six import iteritems +import zmq from ducktape.tests.event import ClientEventFactory from ducktape.tests.loader import TestLoader +from ducktape.tests.result import TestResult, IGNORE, PASS, FAIL from ducktape.tests.serde import SerDe from ducktape.tests.test import test_logger, TestContext - -from ducktape.tests.result import TestResult, IGNORE, PASS, FAIL from ducktape.utils.local_filesystem_utils import mkdir_p @@ -113,19 +111,11 @@ def run(self): self.log(logging.DEBUG, "Checking if there are enough nodes...") min_cluster_spec = self.test.min_cluster_spec() - os_to_num_nodes = {} - for node_spec in min_cluster_spec: - if not os_to_num_nodes.get(node_spec.operating_system): - os_to_num_nodes[node_spec.operating_system] = 1 - else: - os_to_num_nodes[node_spec.operating_system] = os_to_num_nodes[node_spec.operating_system] + 1 - for (operating_system, node_count) in iteritems(os_to_num_nodes): - num_avail = len(list(self.cluster.all().nodes.elements(operating_system=operating_system))) - if node_count > num_avail: - raise RuntimeError( - "There are not enough nodes available in the cluster to run this test. " - "Cluster size for %s: %d, Need at least: %d. Services currently registered: %s" % - (operating_system, num_avail, node_count, self.test_context.services)) + + # Check test resource + msg = self.cluster.all().nodes.attempt_remove_spec(min_cluster_spec) + if len(msg) > 0: + raise RuntimeError("There are not enough nodes available in the cluster to run this test. " + msg) # Run the test unit start_time = time.time()