Skip to content

Commit

Permalink
Support node type based allocation
Browse files Browse the repository at this point in the history
  • Loading branch information
whynick1 committed Feb 2, 2019
1 parent 06d5e6b commit 3177db2
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 48 deletions.
5 changes: 2 additions & 3 deletions ducktape/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ def name(self):
return self.account.hostname

@property
def operating_system(self):
return self.account.operating_system

def host_type(self):
return self.account.host_type

class Cluster(object):
""" Interface for a cluster -- a collection of nodes with login credentials.
Expand Down
28 changes: 18 additions & 10 deletions ducktape/cluster/cluster_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,19 @@

WINDOWS = "windows"

SUPPORTED_OS_TYPES = [LINUX, WINDOWS]


class NodeSpec(object):
"""
The specification for a ducktape cluster node.
:param operating_system: The operating system of the node.
:param host_type: The type of the node.
"""
def __init__(self, operating_system=LINUX):
self.operating_system = operating_system
if self.operating_system not in SUPPORTED_OS_TYPES:
raise RuntimeError("Unsupported os type %s" % self.operating_system)
def __init__(self, host_type=LINUX):
self.host_type = host_type

def __str__(self):
dict = {
"os": self.operating_system,
"type": self.host_type,
}
return json.dumps(dict, sort_keys=True)

Expand All @@ -57,15 +53,27 @@ def simple_linux(num_nodes):
"""
Create a ClusterSpec containing some simple Linux nodes.
"""
node_specs = [NodeSpec(LINUX)] * num_nodes
node_specs_dict = {LINUX: num_nodes}
return ClusterSpec.from_dict(node_specs_dict)

@staticmethod
def from_dict(node_specs_dict):
"""
Create a ClusterSpec from a dict of nodes specifics.
etc. {'type1': 10, 'type2':5}
"""
node_specs = []
for node_type, num_nodes in node_specs_dict.iteritems():
for x in range(num_nodes):
node_specs.append(NodeSpec(node_type))
return ClusterSpec(node_specs)

@staticmethod
def from_nodes(nodes):
"""
Create a ClusterSpec describing a list of nodes.
"""
return ClusterSpec(ClusterSpec([NodeSpec(node.operating_system) for node in nodes]))
return ClusterSpec(ClusterSpec([NodeSpec(node.host_type) for node in nodes]))

def __init__(self, nodes=None):
"""
Expand Down
33 changes: 25 additions & 8 deletions ducktape/cluster/finite_subcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
from ducktape.cluster.cluster import Cluster
from ducktape.cluster.cluster_spec import ClusterSpec
from ducktape.cluster.node_container import NodeContainer
Expand All @@ -23,20 +24,36 @@ class FiniteSubcluster(Cluster):

def __init__(self, nodes):
self.nodes = nodes
self._available_nodes = NodeContainer(nodes)
self._in_use_nodes = NodeContainer()
self._available_nodes_dict = self._get_available_nodes_dict(nodes)
self._in_use_nodes_dict = defaultdict(NodeContainer)

def alloc(self, cluster_spec):
allocated = self._available_nodes.remove_spec(cluster_spec)
self._in_use_nodes.add_nodes(allocated)
for node_type in cluster_spec.nodes.type_to_nodes:
available_nodes = self._available_nodes_dict.get(node_type, NodeContainer())
allocated = available_nodes.remove_spec(cluster_spec)
self._in_use_nodes_dict[node_type].add_nodes(allocated)
return allocated

def free_single(self, node):
self._in_use_nodes.remove_node(node)
self._available_nodes.add_node(node)
self._in_use_nodes_dict[node.host_type].remove_node(node)
self._available_nodes_dict[node.host_type].add_node(node)

def available(self):
return ClusterSpec.from_nodes(self._available_nodes)
available_nodes = NodeContainer()
for container in self._available_nodes_dict.itervalues():
for node in container.elements():
available_nodes.add_node(node)
return ClusterSpec.from_nodes(available_nodes)

def used(self):
return ClusterSpec.from_nodes(self._in_use_nodes)
used_nodes = NodeContainer()
for container in self._in_use_nodes_dict.itervalues():
for node in container.elements():
used_nodes.add_node(node)
return ClusterSpec.from_nodes(used_nodes)

def _get_available_nodes_dict(self, nodes):
available_nodes_dict = defaultdict(NodeContainer)
for node in nodes:
available_nodes_dict[node.host_type].add_node(node)
return available_nodes_dict
38 changes: 19 additions & 19 deletions ducktape/cluster/node_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,40 @@ def __init__(self, nodes=None):
"""
Create a NodeContainer with the given nodes.
Node objects should implement at least an operating_system property.
Node objects should implement at least an host_type property.
:param nodes: A collection of node objects to add, or None to add nothing.
"""
self.os_to_nodes = {}
self.type_to_nodes = {}
if nodes is not None:
for node in nodes:
self.os_to_nodes.setdefault(node.operating_system, []).append(node)
self.type_to_nodes.setdefault(node.host_type, []).append(node)

def size(self):
"""
Returns the total number of nodes in the container.
"""
return sum([len(val) for val in self.os_to_nodes.values()])
return sum([len(val) for val in self.type_to_nodes.values()])

def __len__(self):
return self.size()

def __iter__(self):
return self.elements()

def elements(self, operating_system=None):
def elements(self, host_type=None):
"""
Yield the elements in this container.
:param operating_system: If this is non-None, we will iterate only over elements
which have this operating system.
:param host_type: If this is non-None, we will iterate only over nodes
with this type.
"""
if operating_system is None:
for node_list in self.os_to_nodes.values():
if host_type is None:
for node_list in self.type_to_nodes.values():
for node in node_list:
yield node
else:
for node in self.os_to_nodes.get(operating_system, []):
for node in self.type_to_nodes.get(host_type, []):
yield node

def add_node(self, node):
Expand All @@ -68,7 +68,7 @@ def add_node(self, node):
:param node: The node to add.
"""
self.os_to_nodes.setdefault(node.operating_system, []).append(node)
self.type_to_nodes.setdefault(node.host_type, []).append(node)

def add_nodes(self, nodes):
"""
Expand All @@ -88,7 +88,7 @@ def remove_node(self, node):
:throws NodeNotPresentError: If the node is not in the collection.
"""
try:
return self.os_to_nodes.get(node.operating_system, []).remove(node)
return self.type_to_nodes.get(node.host_type, []).remove(node)
except ValueError:
raise NodeNotPresentError

Expand All @@ -114,9 +114,9 @@ def remove_spec(self, cluster_spec):
if len(msg) > 0:
raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg)
removed = []
for os, node_specs in cluster_spec.nodes.os_to_nodes.iteritems():
for host_type, node_specs in cluster_spec.nodes.type_to_nodes.iteritems():
num_nodes = len(node_specs)
avail_nodes = self.os_to_nodes.get(os, [])
avail_nodes = self.type_to_nodes.get(host_type, [])
for i in range(0, num_nodes):
removed.append(avail_nodes.pop(0))
return removed
Expand All @@ -141,20 +141,20 @@ def attempt_remove_spec(self, cluster_spec):
an error string otherwise.
"""
msg = ""
for os, node_specs in cluster_spec.nodes.os_to_nodes.iteritems():
for host_type, node_specs in cluster_spec.nodes.type_to_nodes.iteritems():
num_nodes = len(node_specs)
avail_nodes = len(self.os_to_nodes.get(os, []))
avail_nodes = len(self.type_to_nodes.get(host_type, []))
if avail_nodes < num_nodes:
msg = msg + "%s nodes requested: %d. %s nodes available: %d" % \
(os, num_nodes, os, avail_nodes)
(host_type, num_nodes, host_type, avail_nodes)
return msg

def clone(self):
"""
Returns a deep copy of this object.
"""
container = NodeContainer()
for operating_system, nodes in self.os_to_nodes.iteritems():
for host_type, nodes in self.type_to_nodes.iteritems():
for node in nodes:
container.os_to_nodes.setdefault(operating_system, []).append(node)
container.type_to_nodes.setdefault(host_type, []).append(node)
return container
8 changes: 8 additions & 0 deletions ducktape/cluster/remoteaccount.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import stat
import tempfile
import warnings
import re

from ducktape.utils.http_utils import HttpMixin
from ducktape.utils.util import wait_until
Expand Down Expand Up @@ -144,6 +145,13 @@ def __init__(self, ssh_config, externally_routable_ip=None, logger=None):
def operating_system(self):
return self.os

@property
def host_type(self):
# If hostname is in format of (.+)-([0-9]+), extract host_type
# from it. Otherwise set to operating_system
m = re.search("(.+)-([0-9]+)", self.hostname)
return m.group(1) if m else self.operating_system

@property
def logger(self):
if self._logger:
Expand Down
2 changes: 2 additions & 0 deletions ducktape/mark/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

from ducktape.mark._mark import Mark

CLUSTER_SPEC_KEYWORD = "cluster_spec"
CLUSTER_SIZE_KEYWORD = "num_nodes"
NODES_SPEC_KEYWORD = "nodes_spec"


class ClusterUseMetadata(Mark):
Expand Down
14 changes: 7 additions & 7 deletions ducktape/tests/runner_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,19 @@ def run(self):

self.log(logging.DEBUG, "Checking if there are enough nodes...")
min_cluster_spec = self.test.min_cluster_spec()
os_to_num_nodes = {}
type_to_num_nodes = {}
for node_spec in min_cluster_spec:
if not os_to_num_nodes.get(node_spec.operating_system):
os_to_num_nodes[node_spec.operating_system] = 1
if not type_to_num_nodes.get(node_spec.host_type):
type_to_num_nodes[node_spec.host_type] = 1
else:
os_to_num_nodes[node_spec.operating_system] = os_to_num_nodes[node_spec.operating_system] + 1
for (operating_system, node_count) in os_to_num_nodes.iteritems():
num_avail = len(list(self.cluster.all().nodes.elements(operating_system=operating_system)))
type_to_num_nodes[node_spec.host_type] = type_to_num_nodes[node_spec.host_type] + 1
for (host_type, node_count) in type_to_num_nodes.iteritems():
num_avail = len(list(self.cluster.all().nodes.elements(host_type=host_type)))
if node_count > num_avail:
raise RuntimeError(
"There are not enough nodes available in the cluster to run this test. "
"Cluster size for %s: %d, Need at least: %d. Services currently registered: %s" %
(operating_system, num_avail, node_count, self.test_context.services))
(host_type, num_avail, node_count, self.test_context.services))

# Run the test unit
start_time = time.time()
Expand Down
10 changes: 9 additions & 1 deletion ducktape/tests/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
from ducktape.command_line.defaults import ConsoleDefaults
from ducktape.services.service_registry import ServiceRegistry
from ducktape.template import TemplateRenderer
from ducktape.mark.resource import CLUSTER_SPEC_KEYWORD
from ducktape.mark.resource import CLUSTER_SIZE_KEYWORD
from ducktape.mark.resource import NODES_SPEC_KEYWORD
from ducktape.tests.status import FAIL


Expand Down Expand Up @@ -384,9 +386,15 @@ def expected_cluster_spec(self):
:return: A ClusterSpec object.
"""
cluster_spec = self.cluster_use_metadata.get(CLUSTER_SPEC_KEYWORD)
cluster_size = self.cluster_use_metadata.get(CLUSTER_SIZE_KEYWORD)
if cluster_size is not None:
nodes_spec = self.cluster_use_metadata.get(NODES_SPEC_KEYWORD)
if cluster_spec is not None:
return cluster_spec
elif cluster_size is not None:
return ClusterSpec.simple_linux(cluster_size)
elif nodes_spec is not None:
return ClusterSpec.from_dict(nodes_spec)
elif self.cluster is None:
return ClusterSpec.empty()
else:
Expand Down

0 comments on commit 3177db2

Please sign in to comment.