diff --git a/ducktape/cluster/cluster.py b/ducktape/cluster/cluster.py index cf404e565..9a7b6cd99 100644 --- a/ducktape/cluster/cluster.py +++ b/ducktape/cluster/cluster.py @@ -26,9 +26,8 @@ def name(self): return self.account.hostname @property - def operating_system(self): - return self.account.operating_system - + def host_type(self): + return self.account.host_type class Cluster(object): """ Interface for a cluster -- a collection of nodes with login credentials. diff --git a/ducktape/cluster/cluster_spec.py b/ducktape/cluster/cluster_spec.py index 684cb3aee..996f39f76 100644 --- a/ducktape/cluster/cluster_spec.py +++ b/ducktape/cluster/cluster_spec.py @@ -22,23 +22,19 @@ WINDOWS = "windows" -SUPPORTED_OS_TYPES = [LINUX, WINDOWS] - class NodeSpec(object): """ The specification for a ducktape cluster node. - :param operating_system: The operating system of the node. + :param host_type: The type of the node. """ - def __init__(self, operating_system=LINUX): - self.operating_system = operating_system - if self.operating_system not in SUPPORTED_OS_TYPES: - raise RuntimeError("Unsupported os type %s" % self.operating_system) + def __init__(self, host_type=LINUX): + self.host_type = host_type def __str__(self): dict = { - "os": self.operating_system, + "type": self.host_type, } return json.dumps(dict, sort_keys=True) @@ -57,7 +53,19 @@ def simple_linux(num_nodes): """ Create a ClusterSpec containing some simple Linux nodes. """ - node_specs = [NodeSpec(LINUX)] * num_nodes + node_specs_dict = {LINUX: num_nodes} + return ClusterSpec.from_dict(node_specs_dict) + + @staticmethod + def from_dict(node_specs_dict): + """ + Create a ClusterSpec from a dict of nodes specifics. + etc. {'type1': 10, 'type2':5} + """ + node_specs = [] + for node_type, num_nodes in node_specs_dict.iteritems(): + for x in range(num_nodes): + node_specs.append(NodeSpec(node_type)) return ClusterSpec(node_specs) @staticmethod @@ -65,7 +73,7 @@ def from_nodes(nodes): """ Create a ClusterSpec describing a list of nodes. """ - return ClusterSpec(ClusterSpec([NodeSpec(node.operating_system) for node in nodes])) + return ClusterSpec(ClusterSpec([NodeSpec(node.host_type) for node in nodes])) def __init__(self, nodes=None): """ diff --git a/ducktape/cluster/finite_subcluster.py b/ducktape/cluster/finite_subcluster.py index 2899c3040..9f27cdb8e 100644 --- a/ducktape/cluster/finite_subcluster.py +++ b/ducktape/cluster/finite_subcluster.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections import defaultdict from ducktape.cluster.cluster import Cluster from ducktape.cluster.cluster_spec import ClusterSpec from ducktape.cluster.node_container import NodeContainer @@ -23,20 +24,36 @@ class FiniteSubcluster(Cluster): def __init__(self, nodes): self.nodes = nodes - self._available_nodes = NodeContainer(nodes) - self._in_use_nodes = NodeContainer() + self._available_nodes_dict = self._get_available_nodes_dict(nodes) + self._in_use_nodes_dict = defaultdict(NodeContainer) def alloc(self, cluster_spec): - allocated = self._available_nodes.remove_spec(cluster_spec) - self._in_use_nodes.add_nodes(allocated) + for node_type in cluster_spec.nodes.type_to_nodes: + available_nodes = self._available_nodes_dict.get(node_type, NodeContainer()) + allocated = available_nodes.remove_spec(cluster_spec) + self._in_use_nodes_dict[node_type].add_nodes(allocated) return allocated def free_single(self, node): - self._in_use_nodes.remove_node(node) - self._available_nodes.add_node(node) + self._in_use_nodes_dict[node.host_type].remove_node(node) + self._available_nodes_dict[node.host_type].add_node(node) def available(self): - return ClusterSpec.from_nodes(self._available_nodes) + available_nodes = NodeContainer() + for container in self._available_nodes_dict.itervalues(): + for node in container.elements(): + available_nodes.add_node(node) + return ClusterSpec.from_nodes(available_nodes) def used(self): - return ClusterSpec.from_nodes(self._in_use_nodes) + used_nodes = NodeContainer() + for container in self._in_use_nodes_dict.itervalues(): + for node in container.elements(): + used_nodes.add_node(node) + return ClusterSpec.from_nodes(used_nodes) + + def _get_available_nodes_dict(self, nodes): + available_nodes_dict = defaultdict(NodeContainer) + for node in nodes: + available_nodes_dict[node.host_type].add_node(node) + return available_nodes_dict diff --git a/ducktape/cluster/node_container.py b/ducktape/cluster/node_container.py index 9ff73e94f..a952c1de5 100644 --- a/ducktape/cluster/node_container.py +++ b/ducktape/cluster/node_container.py @@ -26,20 +26,20 @@ def __init__(self, nodes=None): """ Create a NodeContainer with the given nodes. - Node objects should implement at least an operating_system property. + Node objects should implement at least an host_type property. :param nodes: A collection of node objects to add, or None to add nothing. """ - self.os_to_nodes = {} + self.type_to_nodes = {} if nodes is not None: for node in nodes: - self.os_to_nodes.setdefault(node.operating_system, []).append(node) + self.type_to_nodes.setdefault(node.host_type, []).append(node) def size(self): """ Returns the total number of nodes in the container. """ - return sum([len(val) for val in self.os_to_nodes.values()]) + return sum([len(val) for val in self.type_to_nodes.values()]) def __len__(self): return self.size() @@ -47,19 +47,19 @@ def __len__(self): def __iter__(self): return self.elements() - def elements(self, operating_system=None): + def elements(self, host_type=None): """ Yield the elements in this container. - :param operating_system: If this is non-None, we will iterate only over elements - which have this operating system. + :param host_type: If this is non-None, we will iterate only over nodes + with this type. """ - if operating_system is None: - for node_list in self.os_to_nodes.values(): + if host_type is None: + for node_list in self.type_to_nodes.values(): for node in node_list: yield node else: - for node in self.os_to_nodes.get(operating_system, []): + for node in self.type_to_nodes.get(host_type, []): yield node def add_node(self, node): @@ -68,7 +68,7 @@ def add_node(self, node): :param node: The node to add. """ - self.os_to_nodes.setdefault(node.operating_system, []).append(node) + self.type_to_nodes.setdefault(node.host_type, []).append(node) def add_nodes(self, nodes): """ @@ -88,7 +88,7 @@ def remove_node(self, node): :throws NodeNotPresentError: If the node is not in the collection. """ try: - return self.os_to_nodes.get(node.operating_system, []).remove(node) + return self.type_to_nodes.get(node.host_type, []).remove(node) except ValueError: raise NodeNotPresentError @@ -114,9 +114,9 @@ def remove_spec(self, cluster_spec): if len(msg) > 0: raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg) removed = [] - for os, node_specs in cluster_spec.nodes.os_to_nodes.iteritems(): + for host_type, node_specs in cluster_spec.nodes.type_to_nodes.iteritems(): num_nodes = len(node_specs) - avail_nodes = self.os_to_nodes.get(os, []) + avail_nodes = self.type_to_nodes.get(host_type, []) for i in range(0, num_nodes): removed.append(avail_nodes.pop(0)) return removed @@ -141,12 +141,12 @@ def attempt_remove_spec(self, cluster_spec): an error string otherwise. """ msg = "" - for os, node_specs in cluster_spec.nodes.os_to_nodes.iteritems(): + for host_type, node_specs in cluster_spec.nodes.type_to_nodes.iteritems(): num_nodes = len(node_specs) - avail_nodes = len(self.os_to_nodes.get(os, [])) + avail_nodes = len(self.type_to_nodes.get(host_type, [])) if avail_nodes < num_nodes: msg = msg + "%s nodes requested: %d. %s nodes available: %d" % \ - (os, num_nodes, os, avail_nodes) + (host_type, num_nodes, host_type, avail_nodes) return msg def clone(self): @@ -154,7 +154,7 @@ def clone(self): Returns a deep copy of this object. """ container = NodeContainer() - for operating_system, nodes in self.os_to_nodes.iteritems(): + for host_type, nodes in self.type_to_nodes.iteritems(): for node in nodes: - container.os_to_nodes.setdefault(operating_system, []).append(node) + container.type_to_nodes.setdefault(host_type, []).append(node) return container diff --git a/ducktape/cluster/remoteaccount.py b/ducktape/cluster/remoteaccount.py index fdae01083..22cefae8b 100644 --- a/ducktape/cluster/remoteaccount.py +++ b/ducktape/cluster/remoteaccount.py @@ -22,6 +22,7 @@ import stat import tempfile import warnings +import re from ducktape.utils.http_utils import HttpMixin from ducktape.utils.util import wait_until @@ -144,6 +145,13 @@ def __init__(self, ssh_config, externally_routable_ip=None, logger=None): def operating_system(self): return self.os + @property + def host_type(self): + # If hostname is in format of (.+)-([0-9]+), extract host_type + # from it. Otherwise set to operating_system + m = re.search("(.+)-([0-9]+)", self.hostname) + return m.group(1) if m else self.operating_system + @property def logger(self): if self._logger: diff --git a/ducktape/mark/resource.py b/ducktape/mark/resource.py index 6addd36d7..bcac4d5d3 100644 --- a/ducktape/mark/resource.py +++ b/ducktape/mark/resource.py @@ -16,7 +16,9 @@ from ducktape.mark._mark import Mark +CLUSTER_SPEC_KEYWORD = "cluster_spec" CLUSTER_SIZE_KEYWORD = "num_nodes" +NODES_SPEC_KEYWORD = "nodes_spec" class ClusterUseMetadata(Mark): diff --git a/ducktape/tests/runner_client.py b/ducktape/tests/runner_client.py index 3a7a94c4e..e6e334ca5 100644 --- a/ducktape/tests/runner_client.py +++ b/ducktape/tests/runner_client.py @@ -111,19 +111,19 @@ def run(self): self.log(logging.DEBUG, "Checking if there are enough nodes...") min_cluster_spec = self.test.min_cluster_spec() - os_to_num_nodes = {} + type_to_num_nodes = {} for node_spec in min_cluster_spec: - if not os_to_num_nodes.get(node_spec.operating_system): - os_to_num_nodes[node_spec.operating_system] = 1 + if not type_to_num_nodes.get(node_spec.host_type): + type_to_num_nodes[node_spec.host_type] = 1 else: - os_to_num_nodes[node_spec.operating_system] = os_to_num_nodes[node_spec.operating_system] + 1 - for (operating_system, node_count) in os_to_num_nodes.iteritems(): - num_avail = len(list(self.cluster.all().nodes.elements(operating_system=operating_system))) + type_to_num_nodes[node_spec.host_type] = type_to_num_nodes[node_spec.host_type] + 1 + for (host_type, node_count) in type_to_num_nodes.iteritems(): + num_avail = len(list(self.cluster.all().nodes.elements(host_type=host_type))) if node_count > num_avail: raise RuntimeError( "There are not enough nodes available in the cluster to run this test. " "Cluster size for %s: %d, Need at least: %d. Services currently registered: %s" % - (operating_system, num_avail, node_count, self.test_context.services)) + (host_type, num_avail, node_count, self.test_context.services)) # Run the test unit start_time = time.time() diff --git a/ducktape/tests/test.py b/ducktape/tests/test.py index 99967760c..6cf2836ee 100644 --- a/ducktape/tests/test.py +++ b/ducktape/tests/test.py @@ -27,7 +27,9 @@ from ducktape.command_line.defaults import ConsoleDefaults from ducktape.services.service_registry import ServiceRegistry from ducktape.template import TemplateRenderer +from ducktape.mark.resource import CLUSTER_SPEC_KEYWORD from ducktape.mark.resource import CLUSTER_SIZE_KEYWORD +from ducktape.mark.resource import NODES_SPEC_KEYWORD from ducktape.tests.status import FAIL @@ -384,9 +386,15 @@ def expected_cluster_spec(self): :return: A ClusterSpec object. """ + cluster_spec = self.cluster_use_metadata.get(CLUSTER_SPEC_KEYWORD) cluster_size = self.cluster_use_metadata.get(CLUSTER_SIZE_KEYWORD) - if cluster_size is not None: + nodes_spec = self.cluster_use_metadata.get(NODES_SPEC_KEYWORD) + if cluster_spec is not None: + return cluster_spec + elif cluster_size is not None: return ClusterSpec.simple_linux(cluster_size) + elif nodes_spec is not None: + return ClusterSpec.from_dict(nodes_spec) elif self.cluster is None: return ClusterSpec.empty() else: