From acfca30a7b435d4903c2766f5e7df6721976a7df Mon Sep 17 00:00:00 2001 From: Hongyi Wang Date: Tue, 10 Dec 2019 15:03:32 -0800 Subject: [PATCH 1/2] Allocate node by machine type --- ducktape/cluster/cluster.py | 4 ++ ducktape/cluster/cluster_spec.py | 73 ++++++++++++++++++-- ducktape/cluster/json.py | 14 ++-- ducktape/cluster/linux_remoteaccount.py | 4 +- ducktape/cluster/localhost.py | 3 +- ducktape/cluster/node_container.py | 84 +++++++++++++++++++---- ducktape/cluster/remoteaccount.py | 70 ++++++++++++++++++- ducktape/cluster/vagrant.py | 5 +- ducktape/cluster/windows_remoteaccount.py | 4 +- ducktape/services/background_thread.py | 4 +- ducktape/tests/runner_client.py | 23 ++----- 11 files changed, 237 insertions(+), 51 deletions(-) diff --git a/ducktape/cluster/cluster.py b/ducktape/cluster/cluster.py index cf404e565..ba561a12c 100644 --- a/ducktape/cluster/cluster.py +++ b/ducktape/cluster/cluster.py @@ -29,6 +29,10 @@ def name(self): def operating_system(self): return self.account.operating_system + @property + def machine_type(self): + return self.account.machine_type + class Cluster(object): """ Interface for a cluster -- a collection of nodes with login credentials. diff --git a/ducktape/cluster/cluster_spec.py b/ducktape/cluster/cluster_spec.py index 684cb3aee..eea3873e8 100644 --- a/ducktape/cluster/cluster_spec.py +++ b/ducktape/cluster/cluster_spec.py @@ -15,8 +15,10 @@ from __future__ import absolute_import import json +import re from ducktape.cluster.node_container import NodeContainer +from ducktape.cluster.remoteaccount import MachineType LINUX = "linux" @@ -30,15 +32,21 @@ class NodeSpec(object): The specification for a ducktape cluster node. :param operating_system: The operating system of the node. + :param machine_type: The machine type of the node including required resource. """ - def __init__(self, operating_system=LINUX): + def __init__(self, operating_system=LINUX, machine_type=None): self.operating_system = operating_system if self.operating_system not in SUPPORTED_OS_TYPES: raise RuntimeError("Unsupported os type %s" % self.operating_system) + self.machine_type = machine_type or MachineType() def __str__(self): dict = { "os": self.operating_system, + "cpu": self.machine_type.cpu_core, + "mem(GB)": self.machine_type.mem_size_gb, + "disk(GB)": self.machine_type.disk_size_gb, + "additional_disks(GB)": self.machine_type.additional_disks } return json.dumps(dict, sort_keys=True) @@ -57,15 +65,72 @@ def simple_linux(num_nodes): """ Create a ClusterSpec containing some simple Linux nodes. """ - node_specs = [NodeSpec(LINUX)] * num_nodes - return ClusterSpec(node_specs) + node_specs_dict = {'os': LINUX, 'num_nodes': num_nodes} + return ClusterSpec.from_dict(node_specs_dict) + + @staticmethod + def from_dict(node_specs_dict): + """ + Create ClusterSpec from a dict of nodes specifics. Operation system defaults to + 'linux'. Number of nodes default to 1. + e.g. {'os':'linux', 'cpu':2, 'mem':'4GB', 'disk':'30GB', 'additional_disks':{'/dev/sdb':'100GB'}} + + :param node_specs_dict: The dictionary of node specifics + :return: ClusterSpec + """ + os = node_specs_dict.get('os', LINUX) + cpu_core = node_specs_dict.get('cpu') + mem_size = node_specs_dict.get('mem') + disk_size = node_specs_dict.get('disk') + addl_disks = node_specs_dict.get('additional_disks', {}) + addl_disks_gb = {d: ClusterSpec.to_gigabyte(d_size) for d, d_size in addl_disks.items()} + num_nodes = node_specs_dict.get('num_nodes', 1) + return ClusterSpec([NodeSpec(os, MachineType(cpu_core, ClusterSpec.to_gigabyte(mem_size), + ClusterSpec.to_gigabyte(disk_size), addl_disks_gb)) for _ in range(num_nodes)]) + + @staticmethod + def from_list(node_specs_dict_list): + """ + Create a ClusterSpec from a list of nodes specifics dictionaries. + e.g. [{'cpu':1, 'mem':'500MB', 'disk':'10GB'}, + {'cpu':2, 'mem':'4GB', 'disk':'30GB', 'num_nodes':2}] + + :param node_specs_dict_list: The list of node specifics dictionaries + :return: ClusterSpec + """ + node_specs = [] + for node_specs_dict in node_specs_dict_list: + cluster_spec = ClusterSpec.from_dict(node_specs_dict) + node_specs += cluster_spec.nodes + return ClusterSpec.from_nodes(node_specs) + + @staticmethod + def to_gigabyte(size): + """ + Return number of gigabytes parsing from size. + + :param size: The string representation of size in format of + :return: number of gigabytes + """ + if size is None: + return size + else: + unit_definitions = {'kb': 1024, 'k': 1024, + 'mb': 1024 ** 2, 'm': 1024 ** 2, + 'gb': 1024 ** 3, 'g': 1024 ** 3, + 'tb': 1024 ** 4, 't': 1024 ** 4} + m = re.match(r"(\d*\.?\d+|\d+)\s*(\w+)", size.lower(), re.I) + number = m.group(1) + unit = m.group(2) + num_bytes = float(number) * unit_definitions[unit] + return num_bytes / unit_definitions['gb'] @staticmethod def from_nodes(nodes): """ Create a ClusterSpec describing a list of nodes. """ - return ClusterSpec(ClusterSpec([NodeSpec(node.operating_system) for node in nodes])) + return ClusterSpec([NodeSpec(node.operating_system, node.machine_type) for node in nodes]) def __init__(self, nodes=None): """ diff --git a/ducktape/cluster/json.py b/ducktape/cluster/json.py index ed2b3ef0a..7111d739b 100644 --- a/ducktape/cluster/json.py +++ b/ducktape/cluster/json.py @@ -75,6 +75,7 @@ def __init__(self, cluster_json=None, *args, **kwargs): """ super(JsonCluster, self).__init__() + is_type_based = kwargs.get("is_type_based", True) self._available_accounts = NodeContainer() self._in_use_nodes = NodeContainer() if cluster_json is None: @@ -90,7 +91,7 @@ def __init__(self, cluster_json=None, *args, **kwargs): "Cluster json has a node without a ssh_config field: %s\n Cluster json: %s" % (ninfo, cluster_json) ssh_config = RemoteAccountSSHConfig(**ninfo.get("ssh_config", {})) - remote_account = JsonCluster.make_remote_account(ssh_config, ninfo.get("externally_routable_ip")) + remote_account = JsonCluster.make_remote_account(ssh_config, is_type_based, ninfo.get("externally_routable_ip")) if remote_account.externally_routable_ip is None: remote_account.externally_routable_ip = self._externally_routable_ip(remote_account) self._available_accounts.add_node(remote_account) @@ -100,15 +101,14 @@ def __init__(self, cluster_json=None, *args, **kwargs): self._id_supplier = 0 @staticmethod - def make_remote_account(ssh_config, externally_routable_ip=None): + def make_remote_account(ssh_config, is_type_based, externally_routable_ip=None): """Factory function for creating the correct RemoteAccount implementation.""" - if ssh_config.host and WINDOWS in ssh_config.host: - return WindowsRemoteAccount(ssh_config=ssh_config, - externally_routable_ip=externally_routable_ip) + return WindowsRemoteAccount(ssh_config=ssh_config, externally_routable_ip=externally_routable_ip, + is_type_based=is_type_based) else: - return LinuxRemoteAccount(ssh_config=ssh_config, - externally_routable_ip=externally_routable_ip) + return LinuxRemoteAccount(ssh_config=ssh_config, externally_routable_ip=externally_routable_ip, + is_type_based=is_type_based) def alloc(self, cluster_spec): allocated_accounts = self._available_accounts.remove_spec(cluster_spec) diff --git a/ducktape/cluster/linux_remoteaccount.py b/ducktape/cluster/linux_remoteaccount.py index d345551f2..dd16301c1 100644 --- a/ducktape/cluster/linux_remoteaccount.py +++ b/ducktape/cluster/linux_remoteaccount.py @@ -18,9 +18,9 @@ class LinuxRemoteAccount(RemoteAccount): - def __init__(self, ssh_config, externally_routable_ip=None, logger=None): + def __init__(self, ssh_config, externally_routable_ip=None, logger=None, is_type_based=True): super(LinuxRemoteAccount, self).__init__(ssh_config, externally_routable_ip=externally_routable_ip, - logger=logger) + logger=logger, is_type_based=is_type_based) self._ssh_client = None self._sftp_client = None self.os = LINUX diff --git a/ducktape/cluster/localhost.py b/ducktape/cluster/localhost.py index f75b15ad7..b3714611a 100644 --- a/ducktape/cluster/localhost.py +++ b/ducktape/cluster/localhost.py @@ -28,10 +28,11 @@ class LocalhostCluster(Cluster): def __init__(self, *args, **kwargs): num_nodes = kwargs.get("num_nodes", 1000) + is_type_based = kwargs.get("is_type_based", True) self._available_nodes = NodeContainer() for i in range(num_nodes): ssh_config = RemoteAccountSSHConfig("localhost%d" % i, hostname="localhost", port=22) - self._available_nodes.add_node(ClusterNode(LinuxRemoteAccount(ssh_config))) + self._available_nodes.add_node(ClusterNode(LinuxRemoteAccount(ssh_config=ssh_config, is_type_based=is_type_based))) self._in_use_nodes = NodeContainer() def alloc(self, cluster_spec): diff --git a/ducktape/cluster/node_container.py b/ducktape/cluster/node_container.py index e5412f2ab..41550bc70 100644 --- a/ducktape/cluster/node_container.py +++ b/ducktape/cluster/node_container.py @@ -13,6 +13,8 @@ # limitations under the License. from six import iteritems +from operator import attrgetter +from ducktape.cluster.remoteaccount import MachineType class NodeNotPresentError(Exception): @@ -105,7 +107,14 @@ def remove_nodes(self, nodes): def remove_spec(self, cluster_spec): """ - Remove nodes matching a ClusterSpec from this NodeContainer. + Remove nodes matching a ClusterSpec from this NodeContainer. Nodes are allocated + based on machine type with following strategy: + + 1) To compare MachineType, different weight has been assigned to configuration + as cpu > mem > disk > additional_disk, which means node1:{mem:4G, disk:100G} + required more resource than node2:{mem:2G, disk:200G}. + 2) Always try to satisfy node specific that requires most resource. + 3) Always try to allocate machine with least resource as possible. :param cluster_spec: The cluster spec. This will not be modified. :returns: A list of the nodes that were removed. @@ -116,11 +125,17 @@ def remove_spec(self, cluster_spec): if len(msg) > 0: raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg) removed = [] - for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes): - num_nodes = len(node_specs) + for os, req_nodes in iteritems(cluster_spec.nodes.os_to_nodes): avail_nodes = self.os_to_nodes.get(os, []) - for i in range(0, num_nodes): - removed.append(avail_nodes.pop(0)) + sorted_req_nodes = NodeContainer.sort(nodes=req_nodes, reverse=True) + sorted_avail_nodes = NodeContainer.sort(nodes=avail_nodes) + for req_node in sorted_req_nodes[:]: + for avail_node in sorted_avail_nodes[:]: + if NodeContainer.satisfy(avail_node, req_node): + sorted_avail_nodes.remove(avail_node) + avail_nodes.remove(avail_node) + removed.append(avail_node) + break return removed def can_remove_spec(self, cluster_spec): @@ -136,21 +151,66 @@ def can_remove_spec(self, cluster_spec): def attempt_remove_spec(self, cluster_spec): """ - Attempt to remove a cluster_spec from this node container. + Attempt to remove a cluster_spec from this node container. Uses the same strategy + as remove_spec(self, cluster_spec). :param cluster_spec: The cluster spec. This will not be modified. :returns: An empty string if we can remove the nodes; an error string otherwise. """ msg = "" - for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes): - num_nodes = len(node_specs) - avail_nodes = len(self.os_to_nodes.get(os, [])) - if avail_nodes < num_nodes: - msg = msg + "%s nodes requested: %d. %s nodes available: %d" % \ - (os, num_nodes, os, avail_nodes) + for os, req_nodes in iteritems(cluster_spec.nodes.os_to_nodes): + avail_nodes = self.os_to_nodes.get(os, []) + num_avail_nodes = len(avail_nodes) + num_req_nodes = len(req_nodes) + if num_avail_nodes < num_req_nodes: + msg = msg + "%s nodes requested: %d. %s nodes available: %d" % (os, num_req_nodes, os, num_avail_nodes) + sorted_req_nodes = NodeContainer.sort(nodes=req_nodes, reverse=True) + sorted_avail_nodes = NodeContainer.sort(nodes=avail_nodes) + for req_node in sorted_req_nodes[:]: + for avail_node in sorted_avail_nodes[:]: + if NodeContainer.satisfy(avail_node, req_node): + sorted_req_nodes.remove(req_node) + sorted_avail_nodes.remove(avail_node) + break + # check unsatisfied nodes + for unsatisfied_node in sorted_req_nodes: + msg += "\ncannot satisfy minimum requirement for requested node: %s" % str(unsatisfied_node) return msg + @staticmethod + def satisfy(avail_node, req_node): + """ + Return true if available node satisfies the minimum requirement of requested node. + """ + if req_node.machine_type is None: + return True + if avail_node.machine_type.cpu_core < req_node.machine_type.cpu_core or \ + avail_node.machine_type.mem_size_gb < req_node.machine_type.mem_size_gb or \ + avail_node.machine_type.disk_size_gb < req_node.machine_type.disk_size_gb: + return False + for d_name, d_size in req_node.machine_type.additional_disks.items(): + if avail_node.machine_type.additional_disks.get(d_name, 0) < d_size: + return False + return True + + @staticmethod + def sort(nodes, reverse=False): + """ + Return sorted list of nodes based on machine_type. + """ + sorted_nodes = [] + type_based_nodes = [] + for node in nodes: + if node.machine_type is None: + sorted_nodes.append(node) + else: + type_based_nodes.append(node) + + sorted_nodes.extend(sorted(type_based_nodes, key=attrgetter('machine_type.cpu_core', 'machine_type.mem_size_gb', + 'machine_type.disk_size_gb', 'machine_type.additional_disks'))) + return list(reversed(sorted_nodes)) if reverse else sorted_nodes + def clone(self): """ Returns a deep copy of this object. diff --git a/ducktape/cluster/remoteaccount.py b/ducktape/cluster/remoteaccount.py index 470248436..e255b65dc 100644 --- a/ducktape/cluster/remoteaccount.py +++ b/ducktape/cluster/remoteaccount.py @@ -15,6 +15,7 @@ from contextlib import contextmanager import logging import os +import re from paramiko import SSHClient, SSHConfig, MissingHostKeyPolicy import shutil import signal @@ -120,7 +121,7 @@ class RemoteAccount(HttpMixin): Each operating system has its own RemoteAccount implementation. """ - def __init__(self, ssh_config, externally_routable_ip=None, logger=None): + def __init__(self, ssh_config, externally_routable_ip=None, logger=None, is_type_based=True): # Instance of RemoteAccountSSHConfig - use this instead of a dict, because we need the entire object to # be hashable self.ssh_config = ssh_config @@ -139,11 +140,35 @@ def __init__(self, ssh_config, externally_routable_ip=None, logger=None): self.os = None self._ssh_client = None self._sftp_client = None + self.type = self._get_machine_type() if is_type_based else MachineType() @property def operating_system(self): return self.os + @property + def machine_type(self): + return self.type + + def _get_machine_type(self): + cpu_core_cmd = "lscpu | grep -oP '^CPU\(s\):\s*\K\d+'" + mem_size_cmd = "cat /proc/meminfo | grep -oP '^MemTotal:\s*\K\d+'" + disk_info_cmd = "sudo fdisk -l | grep -oP 'Disk\s+\K/dev/.+GB'" + boot_disk_cmd = "mount | grep -E '(/|/boot) ' | grep -oP '/dev/[a-z]+'" + + cpu_core = int(self.ssh_output(cpu_core_cmd)) + mem_size_gb = float(self.ssh_output(mem_size_cmd)) / (1024 ** 2) + disk_info = self.ssh_output(disk_info_cmd).strip() + boot_disk = self.ssh_output(boot_disk_cmd).strip() + disks = {} + for d in disk_info.splitlines(): + d_name = re.match(r"(/dev/[a-z]+)", d).group(1) + d_size = float(re.match(r"/dev/[a-z]+:\s*([\d|\.]+)\s*GB", d).group(1)) + disks[d_name] = d_size + additional_disks = {d: info for d, info in disks.items() if d != boot_disk} + + return MachineType(cpu_core, mem_size_gb, disks[boot_disk], additional_disks) + @property def logger(self): if self._logger: @@ -239,7 +264,7 @@ def wait_for_http_service(self, port, headers, timeout=20, path='/'): url = "http://%s:%s%s" % (self.externally_routable_ip, str(port), path) err_msg = "Timed out trying to contact service on %s. " % url + \ - "Either the service failed to start, or there is a problem with the url." + "Either the service failed to start, or there is a problem with the url." wait_until(lambda: self._can_ping_url(url, headers), timeout_sec=timeout, backoff_sec=.25, err_msg=err_msg) def _can_ping_url(self, url, headers): @@ -715,3 +740,44 @@ class IgnoreMissingHostKeyPolicy(MissingHostKeyPolicy): def missing_host_key(self, client, hostname, key): return + + +class MachineType(object): + """MachineType represents resource of a machine. + + The resource contains number of cpu cores, memory size, boot disk and additional disks size. + Each RemoteAccount has its own MachineType. + Each NodeSpec has it own MachineType. + Node allocation is based on MachineType between requested NodeSpec and available RemoteAccount. + """ + + DEFAULT_CPU_CORE = 0 + DEFAULT_MEM_SIZE = 0 + DEFAULT_DISK_SIZE = 0 + DEFAULT_ADDITIONAL_DISKS = {} + + def __init__(self, cpu_core=None, mem_size_gb=None, disk_size_gb=None, additional_disks=None): + """ + :param cpu_core: The number of cpu cores, default to 0 + :param mem_size_gb: The size of memory in gigabyte, default to 0 + :param disk_size_gb: The size of boot disk in gigabyte, default to 0 + :param additional_disks: The dictionary of additional disks, e.g. {'/dev/sdb':10.0, '/dev/sdc':50.0} + """ + self.cpu_core = cpu_core or self.DEFAULT_CPU_CORE + self.mem_size_gb = mem_size_gb or self.DEFAULT_MEM_SIZE + self.disk_size_gb = disk_size_gb or self.DEFAULT_DISK_SIZE + self.additional_disks = additional_disks or self.DEFAULT_ADDITIONAL_DISKS + + def __repr__(self): + return "'cpu':{}, 'mem(GB)':{}, 'disk(GB)':{}, 'additional_disks(GB)':{}" \ + .format(self.cpu_core, self.mem_size_gb, self.disk_size_gb, self.additional_disks) + + def __str__(self): + return "MachineType(cpu core:{}, memory(GB):{}, boot disk(GB):{}, additional disks(GB):{})" \ + .format(self.cpu_core, self.mem_size_gb, self.disk_size_gb, self.additional_disks) + + def __eq__(self, other): + return other and other.__dict__ == self.__dict__ + + def __hash__(self): + return hash(tuple(sorted(self.__dict__.items()))) diff --git a/ducktape/cluster/vagrant.py b/ducktape/cluster/vagrant.py index 462131a1a..8a26ec757 100644 --- a/ducktape/cluster/vagrant.py +++ b/ducktape/cluster/vagrant.py @@ -37,6 +37,7 @@ def __init__(self, *args, **kwargs): self._is_aws = None is_read_from_file = False + is_type_based = kwargs.get("is_type_based", True) cluster_file = kwargs.get("cluster_file") if cluster_file is not None: try: @@ -51,7 +52,7 @@ def __init__(self, *args, **kwargs): "nodes": self._get_nodes_from_vagrant() } - super(VagrantCluster, self).__init__(cluster_json) + super(VagrantCluster, self).__init__(cluster_json, is_type_based=is_type_based) # If cluster file is specified but the cluster info is not read from it, write the cluster info into the file if not is_read_from_file and cluster_file is not None: @@ -82,7 +83,7 @@ def _get_nodes_from_vagrant(self): account = None try: - account = JsonCluster.make_remote_account(ssh_config) + account = JsonCluster.make_remote_account(ssh_config, False) externally_routable_ip = account.fetch_externally_routable_ip(self.is_aws) finally: if account: diff --git a/ducktape/cluster/windows_remoteaccount.py b/ducktape/cluster/windows_remoteaccount.py index 68c7197ec..799b2c0bc 100644 --- a/ducktape/cluster/windows_remoteaccount.py +++ b/ducktape/cluster/windows_remoteaccount.py @@ -37,9 +37,9 @@ class WindowsRemoteAccount(RemoteAccount): WINRM_USERNAME = "Administrator" - def __init__(self, ssh_config, externally_routable_ip=None, logger=None): + def __init__(self, ssh_config, externally_routable_ip=None, logger=None, is_type_based=True): super(WindowsRemoteAccount, self).__init__(ssh_config, externally_routable_ip=externally_routable_ip, - logger=logger) + logger=logger, is_type_based=is_type_based) self.os = WINDOWS self._winrm_client = None diff --git a/ducktape/services/background_thread.py b/ducktape/services/background_thread.py index b204c454c..0a800739b 100644 --- a/ducktape/services/background_thread.py +++ b/ducktape/services/background_thread.py @@ -20,8 +20,8 @@ class BackgroundThreadService(Service): - def __init__(self, context, num_nodes): - super(BackgroundThreadService, self).__init__(context, num_nodes) + def __init__(self, context, num_nodes=None, cluster_spec=None): + super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec) self.worker_threads = {} self.worker_errors = {} self.errors = '' diff --git a/ducktape/tests/runner_client.py b/ducktape/tests/runner_client.py index 91858d406..ce84c6736 100644 --- a/ducktape/tests/runner_client.py +++ b/ducktape/tests/runner_client.py @@ -19,14 +19,11 @@ import traceback import zmq -from six import iteritems - from ducktape.tests.event import ClientEventFactory from ducktape.tests.loader import TestLoader +from ducktape.tests.result import TestResult, IGNORE, PASS, FAIL from ducktape.tests.serde import SerDe from ducktape.tests.test import test_logger, TestContext - -from ducktape.tests.result import TestResult, IGNORE, PASS, FAIL from ducktape.utils.local_filesystem_utils import mkdir_p @@ -113,19 +110,11 @@ def run(self): self.log(logging.DEBUG, "Checking if there are enough nodes...") min_cluster_spec = self.test.min_cluster_spec() - os_to_num_nodes = {} - for node_spec in min_cluster_spec: - if not os_to_num_nodes.get(node_spec.operating_system): - os_to_num_nodes[node_spec.operating_system] = 1 - else: - os_to_num_nodes[node_spec.operating_system] = os_to_num_nodes[node_spec.operating_system] + 1 - for (operating_system, node_count) in iteritems(os_to_num_nodes): - num_avail = len(list(self.cluster.all().nodes.elements(operating_system=operating_system))) - if node_count > num_avail: - raise RuntimeError( - "There are not enough nodes available in the cluster to run this test. " - "Cluster size for %s: %d, Need at least: %d. Services currently registered: %s" % - (operating_system, num_avail, node_count, self.test_context.services)) + + # Check test resource + msg = self.cluster.all().nodes.attempt_remove_spec(min_cluster_spec) + if len(msg) > 0: + raise RuntimeError("There are not enough nodes available in the cluster to run this test. " + msg) # Run the test unit start_time = time.time() From 1a1f62f78fac8a318b524343daa1d0b55286c2bb Mon Sep 17 00:00:00 2001 From: Hongyi Wang Date: Tue, 10 Dec 2019 15:04:36 -0800 Subject: [PATCH 2/2] Fix unit tests --- tests/cluster/check_cluster.py | 13 +++--- tests/cluster/check_cluster_spec.py | 21 ++++++++- tests/cluster/check_finite_subcluster.py | 4 ++ tests/cluster/check_json.py | 56 ++++++++++++++---------- tests/cluster/check_localhost.py | 4 +- tests/cluster/check_remoteaccount.py | 3 +- tests/cluster/check_vagrant.py | 8 ++-- tests/ducktape_mock.py | 7 ++- tests/runner/check_runner.py | 6 +-- tests/runner/check_runner_memory.py | 2 +- tests/services/check_service.py | 2 +- tests/tests/check_session.py | 2 +- 12 files changed, 84 insertions(+), 44 deletions(-) diff --git a/tests/cluster/check_cluster.py b/tests/cluster/check_cluster.py index 2c9ad3ebf..95a3b514c 100644 --- a/tests/cluster/check_cluster.py +++ b/tests/cluster/check_cluster.py @@ -16,20 +16,21 @@ from ducktape.cluster.cluster import ClusterNode from ducktape.cluster.cluster_spec import NodeSpec, ClusterSpec, LINUX, WINDOWS +from ducktape.cluster.remoteaccount import MachineType from tests.ducktape_mock import FakeCluster -FakeRemoteAccount = collections.namedtuple('FakeRemoteAccount', ['operating_system']) +FakeRemoteAccount = collections.namedtuple('FakeRemoteAccount', ['operating_system', 'machine_type']) class CheckCluster(object): def setup_method(self, _): self.cluster = FakeCluster(0) - self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=LINUX))) - self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=LINUX))) - self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=WINDOWS))) - self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=WINDOWS))) - self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=WINDOWS))) + self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=LINUX, machine_type=MachineType()))) + self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=LINUX, machine_type=MachineType()))) + self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=WINDOWS, machine_type=MachineType()))) + self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=WINDOWS, machine_type=MachineType()))) + self.cluster._available_nodes.add_node(ClusterNode(FakeRemoteAccount(operating_system=WINDOWS, machine_type=MachineType()))) def spec(self, linux_nodes, windows_nodes): nodes = [] diff --git a/tests/cluster/check_cluster_spec.py b/tests/cluster/check_cluster_spec.py index b50c9edbe..0cbdb4c20 100644 --- a/tests/cluster/check_cluster_spec.py +++ b/tests/cluster/check_cluster_spec.py @@ -25,4 +25,23 @@ def check_to_string(self): empty = ClusterSpec.empty() assert "[]" == str(empty) simple_linux_5 = ClusterSpec.simple_linux(5) - assert '[{"num_nodes": 5, "os": "linux"}]' == str(simple_linux_5) + assert '[{"additional_disks(GB)": {}, "cpu": 0, "disk(GB)": 0, "mem(GB)": 0, "num_nodes": 5, "os": "linux"}]' \ + == str(simple_linux_5) + + def check_from_dict(self): + empty = ClusterSpec.empty() + assert "[]" == str(empty) + node_specs_dict = {'cpu': 2, 'mem': '2GB', 'disk': '30GB', 'num_nodes': 2} + custom_linux_1 = ClusterSpec.from_dict(node_specs_dict) + assert '[{"additional_disks(GB)": {}, "cpu": 2, "disk(GB)": 30.0, "mem(GB)": 2.0, "num_nodes": 2, "os": "linux"}]' \ + == str(custom_linux_1) + + def check_from_list(self): + empty = ClusterSpec.empty() + assert "[]" == str(empty) + node_specs_dict_list = [{'cpu': 2, 'mem': '2GB', 'disk': '20GB', 'num_nodes': 2}, + {'cpu': 4, 'mem': '4GB', 'disk': '40GB', 'num_nodes': 4}] + custom_linux_2 = ClusterSpec.from_list(node_specs_dict_list) + assert '[{"additional_disks(GB)": {}, "cpu": 2, "disk(GB)": 20.0, "mem(GB)": 2.0, "num_nodes": 2, "os": "linux"},' \ + ' {"additional_disks(GB)": {}, "cpu": 4, "disk(GB)": 40.0, "mem(GB)": 4.0, "num_nodes": 4, "os": "linux"}]' \ + == str(custom_linux_2) \ No newline at end of file diff --git a/tests/cluster/check_finite_subcluster.py b/tests/cluster/check_finite_subcluster.py index 0ef4d6182..7e6f727d8 100644 --- a/tests/cluster/check_finite_subcluster.py +++ b/tests/cluster/check_finite_subcluster.py @@ -15,6 +15,7 @@ from ducktape.cluster.cluster_spec import LINUX from ducktape.cluster.finite_subcluster import FiniteSubcluster from ducktape.cluster.node_container import InsufficientResourcesError, NodeNotPresentError +from ducktape.cluster.remoteaccount import MachineType from ducktape.services.service import Service import pickle import pytest @@ -25,6 +26,9 @@ class MockFiniteSubclusterNode: def operating_system(self): return LINUX + @property + def machine_type(self): + return MachineType() class CheckFiniteSubcluster(object): single_node_cluster_json = {"nodes": [{"hostname": "localhost"}]} diff --git a/tests/cluster/check_json.py b/tests/cluster/check_json.py index 2205b61eb..fdac49526 100644 --- a/tests/cluster/check_json.py +++ b/tests/cluster/check_json.py @@ -31,42 +31,48 @@ class CheckJsonCluster(object): def check_invalid_json(self): # Missing list of nodes with pytest.raises(ValueError): - JsonCluster({}) + JsonCluster(cluster_json={}, is_type_based=False) # Missing hostname, which is required with pytest.raises(ValueError): - JsonCluster({"nodes": [{}]}) + JsonCluster(cluster_json={"nodes": [{}]}, is_type_based=False) @staticmethod def cluster_hostnames(nodes): return set([node.account.hostname for node in nodes]) def check_cluster_size(self): - cluster = JsonCluster({"nodes": []}) + cluster = JsonCluster(cluster_json={"nodes": []}, is_type_based=False) assert len(cluster) == 0 n = 10 cluster = JsonCluster( - {"nodes": [ - {"ssh_config": {"hostname": "localhost%d" % x}} for x in range(n)]}) + cluster_json={ + "nodes": [ + {"ssh_config": {"host": "localhost%d" % x}} for x in range(n)]}, + is_type_based=False) assert len(cluster) == n def check_pickleable(self): cluster = JsonCluster( - {"nodes": [ - {"ssh_config": {"host": "localhost1"}}, - {"ssh_config": {"host": "localhost2"}}, - {"ssh_config": {"host": "localhost3"}}]}) + cluster_json={ + "nodes": [ + {"ssh_config": {"host": "localhost1"}}, + {"ssh_config": {"host": "localhost2"}}, + {"ssh_config": {"host": "localhost3"}}]}, + is_type_based=False) pickle.dumps(cluster) def check_allocate_free(self): cluster = JsonCluster( - {"nodes": [ - {"ssh_config": {"host": "localhost1"}}, - {"ssh_config": {"host": "localhost2"}}, - {"ssh_config": {"host": "localhost3"}}]}) + cluster_json = { + "nodes": [ + {"ssh_config": {"host": "localhost1"}}, + {"ssh_config": {"host": "localhost2"}}, + {"ssh_config": {"host": "localhost3"}}]}, + is_type_based=False) assert len(cluster) == 3 assert(cluster.num_available_nodes() == 3) @@ -93,9 +99,10 @@ def check_parsing(self): """ Checks that RemoteAccounts are generated correctly from input JSON""" node = JsonCluster( - { + cluster_json={ "nodes": [ - {"ssh_config": {"host": "hostname"}}]}).alloc(Service.setup_cluster_spec(num_nodes=1))[0] + {"ssh_config": {"host": "hostname"}}]}, + is_type_based=False).alloc(Service.setup_cluster_spec(num_nodes=1))[0] assert node.account.hostname == "hostname" assert node.account.user is None @@ -106,9 +113,12 @@ def check_parsing(self): "hostname": "localhost", "port": 22 } - node = JsonCluster({"nodes": [{"hostname": "hostname", + node = JsonCluster( + cluster_json={ + "nodes": [{"hostname": "hostname", "user": "user", - "ssh_config": ssh_config}]}).alloc(Service.setup_cluster_spec(num_nodes=1))[0] + "ssh_config": ssh_config}]}, + is_type_based=False).alloc(Service.setup_cluster_spec(num_nodes=1))[0] assert node.account.hostname == "hostname" assert node.account.user == "user" @@ -120,16 +130,18 @@ def check_parsing(self): assert node.account.ssh_config.port == 22 def check_exhausts_supply(self): - cluster = JsonCluster(self.single_node_cluster_json) + cluster = JsonCluster(cluster_json=self.single_node_cluster_json, is_type_based=False) with pytest.raises(InsufficientResourcesError): cluster.alloc(Service.setup_cluster_spec(num_nodes=2)) def check_node_names(self): cluster = JsonCluster( - {"nodes": [ - {"ssh_config": {"host": "localhost1"}}, - {"ssh_config": {"host": "localhost2"}}, - {"ssh_config": {"host": "localhost3"}}]}) + cluster_json={ + "nodes": [ + {"ssh_config": {"host": "localhost1"}}, + {"ssh_config": {"host": "localhost2"}}, + {"ssh_config": {"host": "localhost3"}}]}, + is_type_based=False) hosts = set(["localhost1", "localhost2", "localhost3"]) nodes = cluster.alloc(cluster.available()) assert hosts == set(node.name for node in nodes) diff --git a/tests/cluster/check_localhost.py b/tests/cluster/check_localhost.py index b32779552..8f31a7535 100644 --- a/tests/cluster/check_localhost.py +++ b/tests/cluster/check_localhost.py @@ -20,13 +20,13 @@ class CheckLocalhostCluster(object): def setup_method(self, _): - self.cluster = LocalhostCluster() + self.cluster = LocalhostCluster(is_type_based=False) def check_size(self): len(self.cluster) >= 2 ** 31 - 1 def check_pickleable(self): - cluster = LocalhostCluster() + cluster = LocalhostCluster(is_type_based=False) pickle.dumps(cluster) def check_request_free(self): diff --git a/tests/cluster/check_remoteaccount.py b/tests/cluster/check_remoteaccount.py index 57c1c2f6a..6a76b8b9e 100644 --- a/tests/cluster/check_remoteaccount.py +++ b/tests/cluster/check_remoteaccount.py @@ -100,7 +100,8 @@ def check_remote_account_equality(self): kwargs = { "ssh_config": ssh_config, "externally_routable_ip": "345", - "logger": logging.getLogger(__name__) + "logger": logging.getLogger(__name__), + "is_type_based": False } r1 = RemoteAccount(**kwargs) r2 = RemoteAccount(**kwargs) diff --git a/tests/cluster/check_vagrant.py b/tests/cluster/check_vagrant.py index f5d68ebd0..aff420712 100644 --- a/tests/cluster/check_vagrant.py +++ b/tests/cluster/check_vagrant.py @@ -66,7 +66,7 @@ def _set_monkeypatch_attr(self, monkeypatch): def check_pickleable(self, monkeypatch): self._set_monkeypatch_attr(monkeypatch) - cluster = VagrantCluster() + cluster = VagrantCluster(is_type_based=False) pickle.dumps(cluster) def check_one_host_parsing(self, monkeypatch): @@ -75,7 +75,7 @@ def check_one_host_parsing(self, monkeypatch): """ self._set_monkeypatch_attr(monkeypatch) - cluster = VagrantCluster() + cluster = VagrantCluster(is_type_based=False) assert len(cluster) == 2 assert cluster.num_available_nodes() == 2 node1, node2 = cluster.alloc(Service.setup_cluster_spec(num_nodes=2)) @@ -96,7 +96,7 @@ def check_cluster_file_write(self, monkeypatch): self._set_monkeypatch_attr(monkeypatch) assert not os.path.exists(self.cluster_file) - cluster = VagrantCluster(cluster_file=self.cluster_file) + cluster = VagrantCluster(cluster_file=self.cluster_file, is_type_based=False) cluster_json_expected = {} nodes = [ { @@ -159,7 +159,7 @@ def check_cluster_file_read(self, monkeypatch): indent=2, separators=(',', ': '), sort_keys=True) # Load the cluster from the json file we just created - cluster = VagrantCluster(cluster_file=self.cluster_file) + cluster = VagrantCluster(cluster_file=self.cluster_file, is_type_based=False) assert len(cluster) == 2 assert cluster.num_available_nodes() == 2 diff --git a/tests/ducktape_mock.py b/tests/ducktape_mock.py index 3477b4d56..bfa22cd7d 100644 --- a/tests/ducktape_mock.py +++ b/tests/ducktape_mock.py @@ -18,7 +18,7 @@ from ducktape.tests.session import SessionContext from ducktape.tests.test import TestContext from ducktape.cluster.linux_remoteaccount import LinuxRemoteAccount -from ducktape.cluster.remoteaccount import RemoteAccountSSHConfig +from ducktape.cluster.remoteaccount import RemoteAccountSSHConfig, MachineType from mock import MagicMock @@ -35,6 +35,9 @@ class FakeClusterNode(object): def operating_system(self): return LINUX + @property + def machine_type(self): + return MachineType() class FakeCluster(Cluster): """A cluster class with counters, but no actual node objects""" @@ -95,4 +98,4 @@ def __init__(self): hostname="localhost", port=22) - super(MockAccount, self).__init__(ssh_config, externally_routable_ip="localhost", logger=None) + super(MockAccount, self).__init__(ssh_config, externally_routable_ip="localhost", is_type_based=False, logger=None) diff --git a/tests/runner/check_runner.py b/tests/runner/check_runner.py index 651b601e3..3a4d8ac55 100644 --- a/tests/runner/check_runner.py +++ b/tests/runner/check_runner.py @@ -65,7 +65,7 @@ def _do_expand(self, test_file, test_class, test_methods, cluster=None, session_ def check_simple_run(self): """Check expected behavior when running a single test.""" - mock_cluster = LocalhostCluster(num_nodes=1000) + mock_cluster = LocalhostCluster(num_nodes=1000, is_type_based=False) session_context = tests.ducktape_mock.session_context() test_methods = [TestThingy.test_pi, TestThingy.test_ignore1, TestThingy.test_ignore2] @@ -86,7 +86,7 @@ def check_exit_first(self): """Confirm that exit_first in session context has desired effect of preventing any tests from running after the first test failure. """ - mock_cluster = LocalhostCluster(num_nodes=1000) + mock_cluster = LocalhostCluster(num_nodes=1000, is_type_based=False) session_context = tests.ducktape_mock.session_context(**{"exit_first": True}) test_methods = [FailingTest.test_fail] @@ -100,7 +100,7 @@ def check_exit_first(self): def check_exits_if_failed_to_initialize(self): """Validate that runner exits correctly when tests failed to initialize. """ - mock_cluster = LocalhostCluster(num_nodes=1000) + mock_cluster = LocalhostCluster(num_nodes=1000, is_type_based=False) session_context = tests.ducktape_mock.session_context() ctx_list = self._do_expand(test_file=FAILS_TO_INIT_TEST_FILE, test_class=FailsToInitTest, diff --git a/tests/runner/check_runner_memory.py b/tests/runner/check_runner_memory.py index 5958d2d05..99fe56a70 100644 --- a/tests/runner/check_runner_memory.py +++ b/tests/runner/check_runner_memory.py @@ -62,7 +62,7 @@ def _run_single_test(self, test_context): class CheckMemoryUsage(object): def setup_method(self, _): - self.cluster = LocalhostCluster(num_nodes=100) + self.cluster = LocalhostCluster(num_nodes=100, is_type_based=False) self.session_context = tests.ducktape_mock.session_context() def check_for_inter_test_memory_leak(self): diff --git a/tests/services/check_service.py b/tests/services/check_service.py index 53c82f06a..acaee64c0 100644 --- a/tests/services/check_service.py +++ b/tests/services/check_service.py @@ -40,7 +40,7 @@ def idx(self, node): class CheckAllocateFree(object): def setup_method(self, _): - self.cluster = LocalhostCluster() + self.cluster = LocalhostCluster(is_type_based=False) self.session_context = session_context() self.context = test_context(self.session_context, cluster=self.cluster) diff --git a/tests/tests/check_session.py b/tests/tests/check_session.py index 6e3c8e60d..1cf41c1d1 100644 --- a/tests/tests/check_session.py +++ b/tests/tests/check_session.py @@ -40,7 +40,7 @@ def check_pickleable(self): kwargs = { "session_id": "hello-123", "results_dir": self.tempdir, - "cluster": LocalhostCluster(), + "cluster": LocalhostCluster(is_type_based=False), "globals": {} } session_context = SessionContext(**kwargs)