Support node type based allocation #205

Open · wants to merge 2 commits into base: master
4 changes: 4 additions & 0 deletions ducktape/cluster/cluster.py
@@ -29,6 +29,10 @@ def name(self):
def operating_system(self):
return self.account.operating_system

@property
def machine_type(self):
return self.account.machine_type


class Cluster(object):
""" Interface for a cluster -- a collection of nodes with login credentials.
73 changes: 69 additions & 4 deletions ducktape/cluster/cluster_spec.py
@@ -15,8 +15,10 @@
from __future__ import absolute_import

import json
import re

from ducktape.cluster.node_container import NodeContainer
from ducktape.cluster.remoteaccount import MachineType

LINUX = "linux"

@@ -30,15 +32,21 @@ class NodeSpec(object):
The specification for a ducktape cluster node.

:param operating_system: The operating system of the node.
:param machine_type: The machine type of the node, including its required resources.
"""
def __init__(self, operating_system=LINUX):
def __init__(self, operating_system=LINUX, machine_type=None):
self.operating_system = operating_system
if self.operating_system not in SUPPORTED_OS_TYPES:
raise RuntimeError("Unsupported os type %s" % self.operating_system)
self.machine_type = machine_type or MachineType()

def __str__(self):
dict = {
"os": self.operating_system,
"cpu": self.machine_type.cpu_core,
"mem(GB)": self.machine_type.mem_size_gb,
"disk(GB)": self.machine_type.disk_size_gb,
"additional_disks(GB)": self.machine_type.additional_disks
}
return json.dumps(dict, sort_keys=True)

@@ -57,15 +65,72 @@ def simple_linux(num_nodes):
"""
Create a ClusterSpec containing some simple Linux nodes.
"""
node_specs = [NodeSpec(LINUX)] * num_nodes
return ClusterSpec(node_specs)
node_specs_dict = {'os': LINUX, 'num_nodes': num_nodes}
return ClusterSpec.from_dict(node_specs_dict)

@staticmethod
def from_dict(node_specs_dict):
"""
Create a ClusterSpec from a dict of node specifics. Operating system defaults to
'linux'. Number of nodes defaults to 1.
e.g. {'os':'linux', 'cpu':2, 'mem':'4GB', 'disk':'30GB', 'additional_disks':{'/dev/sdb':'100GB'}}

:param node_specs_dict: The dictionary of node specifics
:return: ClusterSpec
"""
os = node_specs_dict.get('os', LINUX)
cpu_core = node_specs_dict.get('cpu')
mem_size = node_specs_dict.get('mem')
disk_size = node_specs_dict.get('disk')
addl_disks = node_specs_dict.get('additional_disks', {})
addl_disks_gb = {d: ClusterSpec.to_gigabyte(d_size) for d, d_size in addl_disks.items()}
num_nodes = node_specs_dict.get('num_nodes', 1)
return ClusterSpec([NodeSpec(os, MachineType(cpu_core, ClusterSpec.to_gigabyte(mem_size),
ClusterSpec.to_gigabyte(disk_size), addl_disks_gb)) for _ in range(num_nodes)])

@staticmethod
def from_list(node_specs_dict_list):
"""
Create a ClusterSpec from a list of node specifics dictionaries.
e.g. [{'cpu':1, 'mem':'500MB', 'disk':'10GB'},
{'cpu':2, 'mem':'4GB', 'disk':'30GB', 'num_nodes':2}]

:param node_specs_dict_list: The list of node specifics dictionaries
:return: ClusterSpec
"""
node_specs = []
for node_specs_dict in node_specs_dict_list:
cluster_spec = ClusterSpec.from_dict(node_specs_dict)
node_specs += cluster_spec.nodes
return ClusterSpec.from_nodes(node_specs)

@staticmethod
def to_gigabyte(size):
"""
Return the number of gigabytes parsed from size.

:param size: The string representation of the size, in the format <number>[TB|T|GB|G|MB|M|KB|K]
:return: number of gigabytes
"""
if size is None:
return size
else:
unit_definitions = {'kb': 1024, 'k': 1024,
'mb': 1024 ** 2, 'm': 1024 ** 2,
'gb': 1024 ** 3, 'g': 1024 ** 3,
'tb': 1024 ** 4, 't': 1024 ** 4}
m = re.match(r"(\d*\.?\d+|\d+)\s*(\w+)", size.lower(), re.I)
number = m.group(1)
unit = m.group(2)
num_bytes = float(number) * unit_definitions[unit]
return num_bytes / unit_definitions['gb']

@staticmethod
def from_nodes(nodes):
"""
Create a ClusterSpec describing a list of nodes.
"""
return ClusterSpec(ClusterSpec([NodeSpec(node.operating_system) for node in nodes]))
return ClusterSpec([NodeSpec(node.operating_system, node.machine_type) for node in nodes])

def __init__(self, nodes=None):
"""
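A short usage sketch (not part of this diff) of the new ClusterSpec helpers; the dict keys mirror the docstrings above, and the conversion arithmetic follows to_gigabyte:

# Hedged example: exercises ClusterSpec.from_dict / from_list / to_gigabyte as added in this PR.
from ducktape.cluster.cluster_spec import ClusterSpec

# One spec dict can describe several identical nodes via 'num_nodes'.
spec = ClusterSpec.from_dict({'os': 'linux', 'cpu': 2, 'mem': '4GB', 'disk': '30GB',
                              'additional_disks': {'/dev/sdb': '100GB'}, 'num_nodes': 2})

# Heterogeneous clusters combine several spec dicts.
mixed = ClusterSpec.from_list([{'cpu': 1, 'mem': '500MB', 'disk': '10GB'},
                               {'cpu': 2, 'mem': '4GB', 'disk': '30GB', 'num_nodes': 2}])

# Sizes are normalized to gigabytes: '500MB' -> 500 * 1024**2 / 1024**3 ≈ 0.488.
print(ClusterSpec.to_gigabyte('500MB'))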
14 changes: 7 additions & 7 deletions ducktape/cluster/json.py
@@ -75,6 +75,7 @@ def __init__(self, cluster_json=None, *args, **kwargs):

"""
super(JsonCluster, self).__init__()
is_type_based = kwargs.get("is_type_based", True)
self._available_accounts = NodeContainer()
self._in_use_nodes = NodeContainer()
if cluster_json is None:
@@ -90,7 +91,7 @@ def __init__(self, cluster_json=None, *args, **kwargs):
"Cluster json has a node without a ssh_config field: %s\n Cluster json: %s" % (ninfo, cluster_json)

ssh_config = RemoteAccountSSHConfig(**ninfo.get("ssh_config", {}))
remote_account = JsonCluster.make_remote_account(ssh_config, ninfo.get("externally_routable_ip"))
remote_account = JsonCluster.make_remote_account(ssh_config, is_type_based, ninfo.get("externally_routable_ip"))
if remote_account.externally_routable_ip is None:
remote_account.externally_routable_ip = self._externally_routable_ip(remote_account)
self._available_accounts.add_node(remote_account)
@@ -100,15 +101,14 @@ def __init__(self, cluster_json=None, *args, **kwargs):
self._id_supplier = 0

@staticmethod
def make_remote_account(ssh_config, externally_routable_ip=None):
def make_remote_account(ssh_config, is_type_based, externally_routable_ip=None):
"""Factory function for creating the correct RemoteAccount implementation."""

if ssh_config.host and WINDOWS in ssh_config.host:
return WindowsRemoteAccount(ssh_config=ssh_config,
externally_routable_ip=externally_routable_ip)
return WindowsRemoteAccount(ssh_config=ssh_config, externally_routable_ip=externally_routable_ip,
is_type_based=is_type_based)
else:
return LinuxRemoteAccount(ssh_config=ssh_config,
externally_routable_ip=externally_routable_ip)
return LinuxRemoteAccount(ssh_config=ssh_config, externally_routable_ip=externally_routable_ip,
is_type_based=is_type_based)

def alloc(self, cluster_spec):
allocated_accounts = self._available_accounts.remove_spec(cluster_spec)
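A hedged construction sketch showing the new is_type_based keyword on JsonCluster; the per-node fields (ssh_config, externally_routable_ip) follow what __init__ reads above, while the top-level "nodes" key and the ssh_config keys are assumptions based on ducktape's usual cluster-file layout:

# Hedged example: the "nodes" wrapper and the ssh_config keys are assumed, not shown in this diff.
from ducktape.cluster.json import JsonCluster

cluster_json = {"nodes": [{"ssh_config": {"host": "worker1", "hostname": "192.168.1.10", "port": 22},
                           "externally_routable_ip": "192.168.1.10"}]}

# is_type_based defaults to True and is forwarded to each remote account by make_remote_account.
cluster = JsonCluster(cluster_json=cluster_json, is_type_based=True)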
4 changes: 2 additions & 2 deletions ducktape/cluster/linux_remoteaccount.py
@@ -18,9 +18,9 @@

class LinuxRemoteAccount(RemoteAccount):

def __init__(self, ssh_config, externally_routable_ip=None, logger=None):
def __init__(self, ssh_config, externally_routable_ip=None, logger=None, is_type_based=True):
super(LinuxRemoteAccount, self).__init__(ssh_config, externally_routable_ip=externally_routable_ip,
logger=logger)
logger=logger, is_type_based=is_type_based)
self._ssh_client = None
self._sftp_client = None
self.os = LINUX
3 changes: 2 additions & 1 deletion ducktape/cluster/localhost.py
@@ -28,10 +28,11 @@ class LocalhostCluster(Cluster):

def __init__(self, *args, **kwargs):
num_nodes = kwargs.get("num_nodes", 1000)
is_type_based = kwargs.get("is_type_based", True)
self._available_nodes = NodeContainer()
for i in range(num_nodes):
ssh_config = RemoteAccountSSHConfig("localhost%d" % i, hostname="localhost", port=22)
self._available_nodes.add_node(ClusterNode(LinuxRemoteAccount(ssh_config)))
self._available_nodes.add_node(ClusterNode(LinuxRemoteAccount(ssh_config=ssh_config, is_type_based=is_type_based)))
self._in_use_nodes = NodeContainer()

def alloc(self, cluster_spec):
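A minimal sketch, assuming only the constructor keywords visible in this hunk, of creating a LocalhostCluster with the flag set explicitly:

# Hedged example: num_nodes defaults to 1000 and is_type_based to True, per the hunk above.
from ducktape.cluster.localhost import LocalhostCluster

cluster = LocalhostCluster(num_nodes=3, is_type_based=True)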
84 changes: 72 additions & 12 deletions ducktape/cluster/node_container.py
@@ -13,6 +13,8 @@
# limitations under the License.

from six import iteritems
from operator import attrgetter
from ducktape.cluster.remoteaccount import MachineType


class NodeNotPresentError(Exception):
@@ -105,7 +107,14 @@ def remove_nodes(self, nodes):

def remove_spec(self, cluster_spec):
"""
Remove nodes matching a ClusterSpec from this NodeContainer.
Remove nodes matching a ClusterSpec from this NodeContainer. Nodes are allocated
based on machine type with the following strategy:

1) When comparing MachineTypes, the fields are weighted as cpu > mem > disk > additional_disks,
which means node1:{mem:4G, disk:100G} requires more resources than node2:{mem:2G, disk:200G}.
2) Always satisfy the node spec that requests the most resources first.
3) Always allocate the available machine with the fewest resources that still satisfies the request.

:param cluster_spec: The cluster spec. This will not be modified.
:returns: A list of the nodes that were removed.
@@ -116,11 +125,17 @@ def remove_spec(self, cluster_spec):
if len(msg) > 0:
raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg)
removed = []
for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes):
num_nodes = len(node_specs)
for os, req_nodes in iteritems(cluster_spec.nodes.os_to_nodes):
avail_nodes = self.os_to_nodes.get(os, [])
for i in range(0, num_nodes):
removed.append(avail_nodes.pop(0))
sorted_req_nodes = NodeContainer.sort(nodes=req_nodes, reverse=True)
sorted_avail_nodes = NodeContainer.sort(nodes=avail_nodes)
for req_node in sorted_req_nodes[:]:
for avail_node in sorted_avail_nodes[:]:
if NodeContainer.satisfy(avail_node, req_node):
sorted_avail_nodes.remove(avail_node)
avail_nodes.remove(avail_node)
removed.append(avail_node)
break
return removed

def can_remove_spec(self, cluster_spec):
@@ -136,21 +151,66 @@ def can_remove_spec(self, cluster_spec):

def attempt_remove_spec(self, cluster_spec):
"""
Attempt to remove a cluster_spec from this node container.
Attempt to remove a cluster_spec from this node container. Uses the same strategy
as remove_spec(self, cluster_spec).

:param cluster_spec: The cluster spec. This will not be modified.
:returns: An empty string if we can remove the nodes;
an error string otherwise.
"""
msg = ""
for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes):
num_nodes = len(node_specs)
avail_nodes = len(self.os_to_nodes.get(os, []))
if avail_nodes < num_nodes:
msg = msg + "%s nodes requested: %d. %s nodes available: %d" % \
(os, num_nodes, os, avail_nodes)
for os, req_nodes in iteritems(cluster_spec.nodes.os_to_nodes):
avail_nodes = self.os_to_nodes.get(os, [])
num_avail_nodes = len(avail_nodes)
num_req_nodes = len(req_nodes)
if num_avail_nodes < num_req_nodes:
msg = msg + "%s nodes requested: %d. %s nodes available: %d" % (os, num_req_nodes, os, num_avail_nodes)
sorted_req_nodes = NodeContainer.sort(nodes=req_nodes, reverse=True)
sorted_avail_nodes = NodeContainer.sort(nodes=avail_nodes)
for req_node in sorted_req_nodes[:]:
for avail_node in sorted_avail_nodes[:]:
if NodeContainer.satisfy(avail_node, req_node):
sorted_req_nodes.remove(req_node)
sorted_avail_nodes.remove(avail_node)
break
# check unsatisfied nodes
for unsatisfied_node in sorted_req_nodes:
msg += "\ncannot satisfy minimum requirement for requested node: %s" % str(unsatisfied_node)
return msg

@staticmethod
def satisfy(avail_node, req_node):
"""
Return True if the available node satisfies the minimum requirements of the requested node.
"""
if req_node.machine_type is None:
return True
if avail_node.machine_type.cpu_core < req_node.machine_type.cpu_core or \
avail_node.machine_type.mem_size_gb < req_node.machine_type.mem_size_gb or \
avail_node.machine_type.disk_size_gb < req_node.machine_type.disk_size_gb:
return False
for d_name, d_size in req_node.machine_type.additional_disks.items():
if avail_node.machine_type.additional_disks.get(d_name, 0) < d_size:
return False
return True

@staticmethod
def sort(nodes, reverse=False):
"""
Return a sorted list of nodes based on machine_type.
"""
sorted_nodes = []
type_based_nodes = []
for node in nodes:
if node.machine_type is None:
sorted_nodes.append(node)
else:
type_based_nodes.append(node)

sorted_nodes.extend(sorted(type_based_nodes, key=attrgetter('machine_type.cpu_core', 'machine_type.mem_size_gb',
'machine_type.disk_size_gb', 'machine_type.additional_disks')))
return list(reversed(sorted_nodes)) if reverse else sorted_nodes

def clone(self):
"""
Returns a deep copy of this object.
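To illustrate the allocation strategy documented in remove_spec, a hedged sketch (not part of the PR) that exercises NodeContainer.satisfy and NodeContainer.sort directly on NodeSpec objects, which expose the machine_type attribute those helpers read; the positional MachineType arguments (cpu_core, mem_size_gb, disk_size_gb, additional_disks) are assumed from the call in ClusterSpec.from_dict:

# Hedged example: MachineType argument order is assumed from ClusterSpec.from_dict.
from ducktape.cluster.cluster_spec import LINUX, NodeSpec
from ducktape.cluster.node_container import NodeContainer
from ducktape.cluster.remoteaccount import MachineType

small = NodeSpec(LINUX, MachineType(1, 2, 20, {}))    # 1 core, 2 GB RAM, 20 GB disk
large = NodeSpec(LINUX, MachineType(4, 16, 100, {}))  # 4 cores, 16 GB RAM, 100 GB disk

# An available node satisfies a request only if cpu, mem, disk and every additional disk fit.
print(NodeContainer.satisfy(avail_node=large, req_node=small))  # True
print(NodeContainer.satisfy(avail_node=small, req_node=large))  # False

# sort() orders type-based nodes from least to most resources, so the allocator
# can hand out the smallest machine that still fits each request.
for node in NodeContainer.sort(nodes=[large, small]):
    print(node)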