Skip to content

Commit

Permalink
Add MachineType
Browse files Browse the repository at this point in the history
  • Loading branch information
whynick1 committed Aug 31, 2019
1 parent f540a74 commit b9496ce
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 5 deletions.
4 changes: 4 additions & 0 deletions ducktape/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ def name(self):
def operating_system(self):
return self.account.operating_system

@property
def machine_type(self):
return self.account.machine_type


class Cluster(object):
""" Interface for a cluster -- a collection of nodes with login credentials.
Expand Down
70 changes: 66 additions & 4 deletions ducktape/cluster/cluster_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
from __future__ import absolute_import

import json
import re

from ducktape.cluster.node_container import NodeContainer
from ducktape.cluster.remoteaccount import MachineType

LINUX = "linux"

Expand All @@ -30,15 +32,21 @@ class NodeSpec(object):
The specification for a ducktape cluster node.
:param operating_system: The operating system of the node.
:param machine_type: The machine type of the node including required resource.
"""
def __init__(self, operating_system=LINUX):
def __init__(self, operating_system=LINUX, machine_type=None):
self.operating_system = operating_system
if self.operating_system not in SUPPORTED_OS_TYPES:
raise RuntimeError("Unsupported os type %s" % self.operating_system)
self.machine_type = machine_type or MachineType()

def __str__(self):
dict = {
"os": self.operating_system,
"cpu": self.machine_type.cpu_core,
"mem": self.machine_type.mem_size_gb,
"disk": self.machine_type.disk_size_gb,
"additional_disks": self.machine_type.additional_disks
}
return json.dumps(dict, sort_keys=True)

Expand All @@ -57,15 +65,69 @@ def simple_linux(num_nodes):
"""
Create a ClusterSpec containing some simple Linux nodes.
"""
node_specs = [NodeSpec(LINUX)] * num_nodes
return ClusterSpec(node_specs)
node_specs_dict = {'os': LINUX, 'num_nodes': num_nodes}
return ClusterSpec.from_dict(node_specs_dict)

@staticmethod
def from_dict(node_specs_dict):
"""
Create ClusterSpec from a dict of nodes specifics. Operation system defaults to
'linux'. Number of nodes default to 1.
e.g. {'os':'linux', 'cpu':2, 'mem':'4GB', 'disk':'30GB', 'additional_disks':{'/dev/sdb':'100GB'}}
:param node_specs_dict: The dictionary of node specifics
:return: ClusterSpec
"""
os = node_specs_dict.get('os', LINUX)
cpu_core = node_specs_dict.get('cpu')
mem_size = node_specs_dict.get('mem')
disk_size = node_specs_dict.get('disk')
addl_disks = node_specs_dict.get('additional_disks', {})
addl_disks_gb = {d: ClusterSpec.to_gigabyte(d_size) for d, d_size in addl_disks.iteritems()}
num_nodes = node_specs_dict.get('num_nodes', 1)
return ClusterSpec([NodeSpec(os, MachineType(cpu_core, ClusterSpec.to_gigabyte(mem_size),
ClusterSpec.to_gigabyte(disk_size), addl_disks_gb)) for _ in range(num_nodes)])

@staticmethod
def from_list(node_specs_dict_list):
"""
Create a ClusterSpec from a list of nodes specifics dictionaries.
e.g. [{'cpu':1, 'mem':'500MB', 'disk':'10GB'},
{'cpu':2, 'mem':'4GB', 'disk':'30GB', 'num_nodes':2}]
:param node_specs_dict_list: The list of node specifics dictionaries
:return: ClusterSpec
"""
node_specs = []
for node_specs_dict in node_specs_dict_list:
cluster_spec = ClusterSpec.from_dict(node_specs_dict)
node_specs += cluster_spec.nodes
return ClusterSpec.from_nodes(node_specs)

@staticmethod
def to_gigabyte(size):
"""
Return number of gigabytes parsing from size.
:param size: The string representation of size in format of <number+[TB|T|GB|G|MB|M|KB|K]>
:return: number of gigabytes
"""
unit_definitions = {'kb': 1024, 'k': 1024,
'mb': 1024 ** 2, 'm': 1024 ** 2,
'gb': 1024 ** 3, 'g': 1024 ** 3,
'tb': 1024 ** 4, 't': 1024 ** 4}
m = re.match(r"(\d*\.?\d+|\d+)\s*(\w+)", size.lower(), re.I)
number = m.group(1)
unit = m.group(2)
num_bytes = float(number) * unit_definitions[unit]
return num_bytes / unit_definitions['gb']

@staticmethod
def from_nodes(nodes):
"""
Create a ClusterSpec describing a list of nodes.
"""
return ClusterSpec(ClusterSpec([NodeSpec(node.operating_system) for node in nodes]))
return ClusterSpec([NodeSpec(node.operating_system, node.machine_type) for node in nodes])

def __init__(self, nodes=None):
"""
Expand Down
53 changes: 52 additions & 1 deletion ducktape/cluster/remoteaccount.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from contextlib import contextmanager
import logging
import os
import re
from paramiko import SSHClient, SSHConfig, MissingHostKeyPolicy
import shutil
import signal
Expand Down Expand Up @@ -139,11 +140,31 @@ def __init__(self, ssh_config, externally_routable_ip=None, logger=None):
self.os = None
self._ssh_client = None
self._sftp_client = None
self.machine_type = self._get_machine_type() if externally_routable_ip else None

@property
def operating_system(self):
return self.os

def _get_machine_type(self):
cpu_core_cmd = "lscpu | grep -oP '^CPU\(s\):\s*\K\d+'"
mem_size_cmd = "cat /proc/meminfo | grep -oP '^MemTotal:\s*\K\d+'"
disk_info_cmd = "sudo fdisk -l | grep -oP 'Disk\s+\K/dev/.+GB'"
boot_disk_cmd = "mount | grep -E '(/|/boot) ' | grep -oP '/dev/[a-z]+'"

cpu_core = int(self.ssh_output(cpu_core_cmd))
mem_size_gb = float(self.ssh_output(mem_size_cmd)) / (1024 ** 2)
disk_info = self.ssh_output(disk_info_cmd).strip()
boot_disk = self.ssh_output(boot_disk_cmd).strip()
disks = {}
for d in disk_info.splitlines():
d_name = re.match(r"(/dev/[a-z]+)", d).group(1)
d_size = float(re.match(r"/dev/[a-z]+:\s*([\d|\.]+)\s*GB", d).group(1))
disks[d_name] = d_size
additional_disks = {d: info for d, info in disks.iteritems() if d != boot_disk}

return MachineType(cpu_core, mem_size_gb, disks[boot_disk], additional_disks)

@property
def logger(self):
if self._logger:
Expand Down Expand Up @@ -239,7 +260,7 @@ def wait_for_http_service(self, port, headers, timeout=20, path='/'):
url = "http://%s:%s%s" % (self.externally_routable_ip, str(port), path)

err_msg = "Timed out trying to contact service on %s. " % url + \
"Either the service failed to start, or there is a problem with the url."
"Either the service failed to start, or there is a problem with the url."
wait_until(lambda: self._can_ping_url(url, headers), timeout_sec=timeout, backoff_sec=.25, err_msg=err_msg)

def _can_ping_url(self, url, headers):
Expand Down Expand Up @@ -715,3 +736,33 @@ class IgnoreMissingHostKeyPolicy(MissingHostKeyPolicy):

def missing_host_key(self, client, hostname, key):
return


class MachineType(object):
"""MachineType represents resource of a machine.
The resource contains number of cpu cores, memory size, boot disk and additional disks size.
Each RemoteAccount has its own MachineType.
Each NodeSpec has it own MachineType.
Node allocation is based on MachineType between requested NodeSpec and available RemoteAccount.
"""

def __init__(self, cpu_core=None, mem_size_gb=None, disk_size_gb=None, additional_disks=None):
"""
:param cpu_core: The number of cpu cores, default to 1
:param mem_size_gb: The size of memory in gigabyte, default to 1.0
:param disk_size_gb: The size of boot disk in gigabyte, default to 10.0
:param additional_disks: The dictionary of additional disks, e.g. {'/dev/sdb':10.0, '/dev/sdc':50.0}
"""
self.cpu_core = cpu_core or 1
self.mem_size_gb = mem_size_gb or 1.0
self.disk_size_gb = disk_size_gb or 10.0
self.additional_disks = additional_disks or {}

def __repr__(self):
return "'cpu':{}, 'mem(GB)':{}, 'disk(GB)':{}, 'additional_disks(GB)':{}" \
.format(self.cpu_core, self.mem_size_gb, self.disk_size_gb, self.additional_disks)

def __str__(self):
return "MachineType(cpu core:{}, memory(GB):{}, boot disk(GB):{}, additional disks(GB):{})" \
.format(self.cpu_core, self.mem_size_gb, self.disk_size_gb, self.additional_disks)

0 comments on commit b9496ce

Please sign in to comment.