Allocate node by machine type
whynick1 committed Aug 31, 2019
1 parent b9496ce commit 5231b40
Showing 2 changed files with 68 additions and 29 deletions.
73 changes: 61 additions & 12 deletions ducktape/cluster/node_container.py
@@ -13,6 +13,7 @@
# limitations under the License.

from six import iteritems
+from operator import attrgetter


class NodeNotPresentError(Exception):
@@ -105,7 +106,14 @@ def remove_nodes(self, nodes):

    def remove_spec(self, cluster_spec):
        """
-        Remove nodes matching a ClusterSpec from this NodeContainer.
+        Remove nodes matching a ClusterSpec from this NodeContainer. Nodes are allocated
+        based on machine type with the following strategy:
+        1) To compare MachineType, weights are assigned to the configuration fields in the
+           order cpu > mem > disk > additional_disk, so node1:{mem:4G, disk:100G} requires
+           more resources than node2:{mem:2G, disk:200G}.
+        2) Always satisfy the node spec that requires the most resources first.
+        3) Always allocate the machine with the least resources that still satisfies the spec.

        :param cluster_spec: The cluster spec. This will not be modified.
        :returns: A list of the nodes that were removed.
@@ -116,11 +124,17 @@ def remove_spec(self, cluster_spec):
        if len(msg) > 0:
            raise InsufficientResourcesError("Not enough nodes available to allocate. " + msg)
        removed = []
-        for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes):
-            num_nodes = len(node_specs)
+        for os, req_nodes in iteritems(cluster_spec.nodes.os_to_nodes):
            avail_nodes = self.os_to_nodes.get(os, [])
-            for i in range(0, num_nodes):
-                removed.append(avail_nodes.pop(0))
+            # Hand the most demanding specs the smallest machines that fit them.
+            sorted_req_nodes = NodeContainer.sort(nodes=req_nodes, reverse=True)
+            sorted_avail_nodes = NodeContainer.sort(nodes=avail_nodes)
+            for req_node in sorted_req_nodes[:]:
+                for avail_node in sorted_avail_nodes[:]:
+                    if NodeContainer.satisfy(avail_node, req_node):
+                        sorted_avail_nodes.remove(avail_node)
+                        avail_nodes.remove(avail_node)
+                        removed.append(avail_node)
+                        break
        return removed

    def can_remove_spec(self, cluster_spec):
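For illustration, the strategy reads as follows in a minimal standalone sketch. Everything here is a hypothetical stand-in: MachineType is a plain namedtuple rather than ducktape's class, fits() mirrors the satisfy() helper introduced further down, and the sample sizes are invented.

from collections import namedtuple

MachineType = namedtuple('MachineType', ['cpu_core', 'mem_size_gb', 'disk_size_gb'])

def weight(m):
    # cpu outweighs mem outweighs disk, via left-to-right tuple comparison
    return (m.cpu_core, m.mem_size_gb, m.disk_size_gb)

def fits(avail, req):
    # Mirrors satisfy(): every field must meet the requested minimum.
    return (avail.cpu_core >= req.cpu_core and
            avail.mem_size_gb >= req.mem_size_gb and
            avail.disk_size_gb >= req.disk_size_gb)

def allocate(requested, available):
    pool = sorted(available, key=weight)                      # cheapest machines first
    allocated = []
    for req in sorted(requested, key=weight, reverse=True):   # most demanding spec first
        for machine in pool:
            if fits(machine, req):
                pool.remove(machine)                          # smallest machine that fits wins
                allocated.append(machine)
                break
    return allocated

requested = [MachineType(2, 4, 100), MachineType(1, 2, 50)]
available = [MachineType(1, 2, 50), MachineType(4, 8, 200), MachineType(2, 4, 100)]
print(allocate(requested, available))
# [MachineType(cpu_core=2, mem_size_gb=4, disk_size_gb=100),
#  MachineType(cpu_core=1, mem_size_gb=2, disk_size_gb=50)]

Note how the 4-core machine is left untouched: the 2-core request is served by the cheapest machine that still fits it.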
@@ -136,21 +150,56 @@ def can_remove_spec(self, cluster_spec):

    def attempt_remove_spec(self, cluster_spec):
        """
-        Attempt to remove a cluster_spec from this node container.
+        Attempt to remove a cluster_spec from this node container. Uses the same
+        allocation strategy as remove_spec(self, cluster_spec).

        :param cluster_spec: The cluster spec. This will not be modified.
        :returns: An empty string if we can remove the nodes;
                  an error string otherwise.
        """
        msg = ""
-        for os, node_specs in iteritems(cluster_spec.nodes.os_to_nodes):
-            num_nodes = len(node_specs)
-            avail_nodes = len(self.os_to_nodes.get(os, []))
-            if avail_nodes < num_nodes:
-                msg = msg + "%s nodes requested: %d. %s nodes available: %d" % \
-                    (os, num_nodes, os, avail_nodes)
+        for os, req_nodes in iteritems(cluster_spec.nodes.os_to_nodes):
+            avail_nodes = self.os_to_nodes.get(os, [])
+            num_avail_nodes = len(avail_nodes)
+            num_req_nodes = len(req_nodes)
+            if num_avail_nodes < num_req_nodes:
+                msg = msg + "%s nodes requested: %d. %s nodes available: %d" % (os, num_req_nodes, os, num_avail_nodes)
+            sorted_req_nodes = NodeContainer.sort(nodes=req_nodes, reverse=True)
+            sorted_avail_nodes = NodeContainer.sort(nodes=avail_nodes)
+            for req_node in sorted_req_nodes[:]:
+                for avail_node in sorted_avail_nodes[:]:
+                    if NodeContainer.satisfy(avail_node, req_node):
+                        sorted_req_nodes.remove(req_node)
+                        sorted_avail_nodes.remove(avail_node)
+                        break
+            # Any specs still in sorted_req_nodes could not be matched to a machine.
+            for unsatisfied_node in sorted_req_nodes:
+                msg += "\ncannot satisfy the minimum requirements of requested node: %s" % str(unsatisfied_node)
        return msg

+    @staticmethod
+    def satisfy(avail_node, req_node):
+        """
+        Return True if the available node satisfies the minimum requirements of the requested node.
+        """
+        if avail_node.machine_type.cpu_core < req_node.machine_type.cpu_core or \
+                avail_node.machine_type.mem_size_gb < req_node.machine_type.mem_size_gb or \
+                avail_node.machine_type.disk_size_gb < req_node.machine_type.disk_size_gb:
+            return False
+        # Use six's iteritems() (imported above) so this also runs on Python 3,
+        # where dicts have no .iteritems() method.
+        for d_name, d_size in iteritems(req_node.machine_type.additional_disks):
+            if avail_node.machine_type.additional_disks.get(d_name, 0) < d_size:
+                return False
+        return True

+    @staticmethod
+    def sort(nodes, reverse=False):
+        """
+        Return a list of nodes sorted by machine_type, weighting
+        cpu > mem > disk > additional disks.
+        """
+        key = attrgetter('machine_type.cpu_core', 'machine_type.mem_size_gb', 'machine_type.disk_size_gb')
+        # Dicts are not orderable on Python 3, so rank additional disks by total
+        # size instead of comparing the additional_disks dicts directly.
+        return sorted(nodes, key=lambda node: key(node) + (sum(node.machine_type.additional_disks.values()),),
+                      reverse=reverse)

    def clone(self):
        """
        Returns a deep copy of this object.
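Two details of the comparison are easy to miss. The snippet below is illustrative only (the field values and the 'export' disk name are invented): the cpu > mem > disk weighting falls out of Python's left-to-right tuple comparison, and each additional disk is matched by name with at least the requested size.

# Tuple comparison goes left to right, so an earlier field always dominates:
node1 = (1, 4, 100)   # (cpu_core, mem_size_gb, disk_size_gb): mem:4G, disk:100G
node2 = (1, 2, 200)   # mem:2G, disk:200G
print(node1 > node2)  # True: mem decides before disk is ever looked at

# Additional disks are matched by name, each with at least the requested size:
req_disks = {'export': 50}
avail_disks = {'export': 100, 'scratch': 20}
print(all(avail_disks.get(name, 0) >= size
          for name, size in req_disks.items()))  # True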
24 changes: 7 additions & 17 deletions ducktape/tests/runner_client.py
@@ -17,16 +17,14 @@
import signal
import time
import traceback
-import zmq

from six import iteritems
+import zmq

from ducktape.tests.event import ClientEventFactory
from ducktape.tests.loader import TestLoader
+from ducktape.tests.result import TestResult, IGNORE, PASS, FAIL
from ducktape.tests.serde import SerDe
from ducktape.tests.test import test_logger, TestContext
-
-from ducktape.tests.result import TestResult, IGNORE, PASS, FAIL
from ducktape.utils.local_filesystem_utils import mkdir_p

@@ -113,19 +111,11 @@ def run(self):

        self.log(logging.DEBUG, "Checking if there are enough nodes...")
        min_cluster_spec = self.test.min_cluster_spec()
-        os_to_num_nodes = {}
-        for node_spec in min_cluster_spec:
-            if not os_to_num_nodes.get(node_spec.operating_system):
-                os_to_num_nodes[node_spec.operating_system] = 1
-            else:
-                os_to_num_nodes[node_spec.operating_system] = os_to_num_nodes[node_spec.operating_system] + 1
-        for (operating_system, node_count) in iteritems(os_to_num_nodes):
-            num_avail = len(list(self.cluster.all().nodes.elements(operating_system=operating_system)))
-            if node_count > num_avail:
-                raise RuntimeError(
-                    "There are not enough nodes available in the cluster to run this test. "
-                    "Cluster size for %s: %d, Need at least: %d. Services currently registered: %s" %
-                    (operating_system, num_avail, node_count, self.test_context.services))
+        # Check that the cluster can satisfy the test's resource requirements,
+        # including machine types, not just per-OS node counts.
+        msg = self.cluster.all().nodes.attempt_remove_spec(min_cluster_spec)
+        if len(msg) > 0:
+            raise RuntimeError("There are not enough nodes available in the cluster to run this test. " + msg)

        # Run the test unit
        start_time = time.time()
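The same pre-flight check can be reused outside the runner. A minimal sketch, assuming container is a NodeContainer as defined above and spec is a ClusterSpec; ensure_capacity is a hypothetical helper, not part of ducktape:

def ensure_capacity(container, spec):
    # attempt_remove_spec() returns "" when every requested node can be
    # satisfied, and a human-readable error message otherwise.
    msg = container.attempt_remove_spec(spec)
    if len(msg) > 0:
        raise RuntimeError("There are not enough nodes available in the cluster "
                           "to run this test. " + msg)

Since attempt_remove_spec() only removes nodes from its sorted copies, the container itself is left untouched by the check.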
