Skip to content

Commit

Permalink
474 allow non on demandpermanent workers (#487)
Browse files Browse the repository at this point in the history
* added worker server start without anything else

* added host entry for permanent workers

* added state unknown for permanent nodes

* added on_demand key for groups and instances for ansible templating

* fixed wording

* temporary solution for custom execute list

* added documentation for onDemand
  • Loading branch information
XaverStiensmeier authored Apr 9, 2024
1 parent 9b0f632 commit d4d3fc4
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 27 deletions.
75 changes: 56 additions & 19 deletions bibigrid/core/actions/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ def __init__(self, providers, configurations, config_path, log, debug=False, clu
self.wireguard_security_group_name = WIREGUARD_SECURITY_GROUP_NAME.format(cluster_id=self.cluster_id)

self.worker_counter = 0
# permanents holds groups or single nodes that ansible playbook should be run for during startup
self.permanents = ["vpn"]
self.vpn_counter = 0
self.thread_lock = threading.Lock()
self.vpn_master_thread_lock = threading.Lock()
self.worker_thread_lock = threading.Lock()
self.use_master_with_public_ip = not configurations[0].get("gateway") and configurations[0].get(
"useMasterWithPublicIp", True)
self.log.debug("Keyname: %s", self.key_name)
Expand Down Expand Up @@ -151,16 +154,15 @@ def generate_security_groups(self):
for cidr in tmp_configuration['subnet_cidrs']:
rules.append(
{"direction": "ingress", "ethertype": "IPv4", "protocol": "tcp", "port_range_min": None,
"port_range_max": None, "remote_ip_prefix": cidr,
"remote_group_id": None})
"port_range_max": None, "remote_ip_prefix": cidr, "remote_group_id": None})
provider.append_rules_to_security_group(default_security_group_id, rules)
configuration["security_groups"] = [self.default_security_group_name] # store in configuration
# when running a multi-cloud setup create an additional wireguard group
if len(self.providers) > 1:
_ = provider.create_security_group(name=self.wireguard_security_group_name)["id"]
configuration["security_groups"].append(self.wireguard_security_group_name) # store in configuration

def start_vpn_or_master_instance(self, configuration, provider):
def start_vpn_or_master(self, configuration, provider):
"""
Start master/vpn-worker of a provider
@param configuration: dict configuration of said provider.
Expand All @@ -169,7 +171,7 @@ def start_vpn_or_master_instance(self, configuration, provider):
"""
identifier, instance_type, volumes = self.prepare_vpn_or_master_args(configuration, provider)
external_network = provider.get_external_network(configuration["network"])
with self.thread_lock:
with self.vpn_master_thread_lock:
if identifier == MASTER_IDENTIFIER: # pylint: disable=comparison-with-callable
name = identifier(cluster_id=self.cluster_id)
else:
Expand All @@ -185,6 +187,7 @@ def start_vpn_or_master_instance(self, configuration, provider):
# create a server and block until it is up and running
server = provider.create_server(name=name, flavor=flavor, key_name=self.key_name, image=image, network=network,
volumes=volumes, security_groups=configuration["security_groups"], wait=True)
print("MASTER", server)
configuration["private_v4"] = server["private_v4"]
self.log.debug(f"Created Server {name}: {server['private_v4']}.")
# get mac address for given private address
Expand All @@ -206,6 +209,27 @@ def start_vpn_or_master_instance(self, configuration, provider):
configuration["floating_ip"] = server["private_v4"] # pylint: enable=comparison-with-callable
configuration["volumes"] = provider.get_mount_info_from_server(server)

def start_worker(self, worker, worker_count, configuration, provider):
name = WORKER_IDENTIFIER(cluster_id=self.cluster_id, additional=worker_count)
self.log.info(f"Starting instance/server {name} on {provider.cloud_specification['identifier']}")
flavor = worker["type"]
network = configuration["network"]
image = image_selection.select_image(provider, worker["image"], self.log,
configuration.get("fallbackOnOtherImage"))

# create a server and block until it is up and running
server = provider.create_server(name=name, flavor=flavor, key_name=self.key_name, image=image, network=network,
volumes=None, security_groups=configuration["security_groups"], wait=True)
with self.worker_thread_lock:
self.permanents.append(name)
with open(a_rp.HOSTS_FILE, mode="r", encoding="utf-8") as hosts_file:
hosts = yaml.safe_load(hosts_file)
if not hosts or "host_entries" not in hosts:
self.log.info(f"Resetting host entries because {'first run' if hosts else 'broken'}.")
hosts = {"host_entries": {}}
hosts["host_entries"][name] = server["private_v4"]
ansible_configurator.write_yaml(a_rp.HOSTS_FILE, hosts, self.log)

def prepare_vpn_or_master_args(self, configuration, provider):
"""
Prepares start_instance arguments for master/vpn
Expand Down Expand Up @@ -314,25 +338,40 @@ def upload_data(self):
if self.configurations[0].get("dontUploadCredentials"):
commands = ssh_handler.ANSIBLE_START
else:
ansible_start = ssh_handler.ANSIBLE_START
ansible_start[-1] = (ansible_start[-1][0].format(",".join(self.permanents)), ansible_start[-1][1])
self.log.debug(f"Starting playbook with {ansible_start}.")
commands = [ssh_handler.get_ac_command(self.providers, AC_NAME.format(
cluster_id=self.cluster_id))] + ssh_handler.ANSIBLE_START
ssh_handler.execute_ssh(floating_ip=self.master_ip, private_key=KEY_FOLDER + self.key_name,
username=self.ssh_user, filepaths=FILEPATHS, commands=commands, log=self.log,
gateway=self.configurations[0].get("gateway", {}))

def start_start_instance_threads(self):
def start_start_server_threads(self):
"""
Starts for each provider a start_instances thread and joins them.
@return:
"""
start_instance_threads = []
start_server_threads = []
worker_count = 0
for configuration, provider in zip(self.configurations, self.providers):
start_instance_thread = return_threading.ReturnThread(target=self.start_vpn_or_master_instance,
args=[configuration, provider])
start_instance_thread.start()
start_instance_threads.append(start_instance_thread)
for start_instance_thread in start_instance_threads:
start_instance_thread.join()
start_server_thread = return_threading.ReturnThread(target=self.start_vpn_or_master,
args=[configuration, provider])
start_server_thread.start()
start_server_threads.append(start_server_thread)
for worker in configuration.get("workerInstances", []):
if not worker.get("onDemand", True):
for _ in range(int(worker["count"])):
start_server_thread = return_threading.ReturnThread(target=self.start_worker,
args=[worker, worker_count, configuration,
provider])
start_server_thread.start()
start_server_threads.append(start_server_thread)
worker_count += 1
else:
worker_count += worker["count"]
for start_server_thread in start_server_threads:
start_server_thread.join()

def extended_network_configuration(self):
"""
Expand All @@ -354,8 +393,7 @@ def extended_network_configuration(self):
f"{configuration_b['subnet_cidrs']})")
# add provider_b network as allowed network
for cidr in configuration_b["subnet_cidrs"]:
allowed_addresses.append(
{'ip_address': cidr, 'mac_address': configuration_a["mac_addr"]})
allowed_addresses.append({'ip_address': cidr, 'mac_address': configuration_a["mac_addr"]})
# configure security group rules
provider_a.append_rules_to_security_group(self.wireguard_security_group_name, [
{"direction": "ingress", "ethertype": "IPv4", "protocol": "udp", "port_range_min": 51820,
Expand All @@ -374,7 +412,7 @@ def create(self): # pylint: disable=too-many-branches,too-many-statements
self.generate_keypair()
self.prepare_configurations()
self.generate_security_groups()
self.start_start_instance_threads()
self.start_start_server_threads()
self.extended_network_configuration()
self.initialize_instances()
self.upload_data()
Expand Down Expand Up @@ -443,9 +481,8 @@ def log_cluster_start_info(self):
port = int(sympy.sympify(gateway["portFunction"]).subs(dict(octets)))
ssh_ip = gateway["ip"]
self.log.log(42, f"Cluster {self.cluster_id} with master {self.master_ip} up and running!")
self.log.log(42,
f"SSH: ssh -i '{KEY_FOLDER}{self.key_name}' {self.ssh_user}@{ssh_ip}"
f"{f' -p {port}' if gateway else ''}")
self.log.log(42, f"SSH: ssh -i '{KEY_FOLDER}{self.key_name}' {self.ssh_user}@{ssh_ip}"
f"{f' -p {port}' if gateway else ''}")
self.log.log(42, f"Terminate cluster: ./bibigrid.sh -i '{self.config_path}' -t -cid {self.cluster_id}")
self.log.log(42, f"Detailed cluster info: ./bibigrid.sh -i '{self.config_path}' -l -cid {self.cluster_id}")
if self.configurations[0].get("ide"):
Expand Down
2 changes: 1 addition & 1 deletion bibigrid/core/utility/ansible_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
MV_ANSIBLE_CONFIG = (
"sudo install -D /opt/playbook/ansible.cfg /etc/ansible/ansible.cfg", "Move ansible configuration.")
EXECUTE = (f"ansible-playbook {os.path.join(a_rp.PLAYBOOK_PATH_REMOTE, a_rp.SITE_YML)} -i "
f"{os.path.join(a_rp.PLAYBOOK_PATH_REMOTE, a_rp.ANSIBLE_HOSTS)} -l vpn",
f"{os.path.join(a_rp.PLAYBOOK_PATH_REMOTE, a_rp.ANSIBLE_HOSTS)} -l {{}}",
"Execute ansible playbook. Be patient.")

# ansible setup
Expand Down
10 changes: 6 additions & 4 deletions bibigrid/core/utility/ansible_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def generate_site_file_yaml(custom_roles):

def write_host_and_group_vars(configurations, providers, cluster_id, log): # pylint: disable=too-many-locals
"""
ToDo filter what information really is necessary. Determined by further development
Filters unnecessary information
@param configurations: configurations
@param providers: providers
Expand All @@ -95,7 +94,8 @@ def write_host_and_group_vars(configurations, providers, cluster_id, log): # py
worker_dict = {"name": name, "regexp": regexp, "image": worker["image"],
"network": configuration["network"], "flavor": flavor_dict,
"gateway_ip": configuration["private_v4"],
"cloud_identifier": configuration["cloud_identifier"]}
"cloud_identifier": configuration["cloud_identifier"],
"on_demand": worker.get("onDemand", True)}

worker_features = worker.get("features", [])
if isinstance(worker_features, str):
Expand All @@ -118,7 +118,8 @@ def write_host_and_group_vars(configurations, providers, cluster_id, log): # py
"floating_ip": configuration["floating_ip"], "private_v4": configuration["private_v4"],
"flavor": flavor_dict, "wireguard_ip": wireguard_ip,
"cloud_identifier": configuration["cloud_identifier"],
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False)}
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False),
"on_demand": False}
if configuration.get("wireguard_peer"):
vpngtw_dict["wireguard"] = {"ip": wireguard_ip, "peer": configuration.get("wireguard_peer")}
pass_through(configuration, vpngtw_dict, "waitForServices", "wait_for_services")
Expand All @@ -132,7 +133,8 @@ def write_host_and_group_vars(configurations, providers, cluster_id, log): # py
"network_cidrs": configuration["subnet_cidrs"], "floating_ip": configuration["floating_ip"],
"flavor": flavor_dict, "private_v4": configuration["private_v4"],
"cloud_identifier": configuration["cloud_identifier"], "volumes": configuration["volumes"],
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False)}
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False),
"on_demand": False}
if configuration.get("wireguard_peer"):
master_dict["wireguard"] = {"ip": "10.0.0.1", "peer": configuration.get("wireguard_peer")}
pass_through(configuration, master_dict, "waitForServices", "wait_for_services")
Expand Down
2 changes: 1 addition & 1 deletion bibigrid/core/utility/handler/ssh_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def execute_ssh_cml_commands(client, commands, log):

def ansible_preparation(floating_ip, private_key, username, log, gateway, commands=None, filepaths=None):
"""
Installs python and pip. Then installs ansible over pip.
Installs python and pip. Then installs ansible via pip.
Copies private key to instance so cluster-nodes are reachable and sets permission as necessary.
Copies additional files and executes additional commands if given.
The playbook is copied later, because it needs all servers setup and is not time intensive.
Expand Down
4 changes: 3 additions & 1 deletion documentation/markdown/features/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ Other infrastructures would be [AWS](https://aws.amazon.com/) and so on.
`cloud` decides which entry in the `clouds.yaml` is used. When using OpenStack the entry is named `openstack`.
You can read more about the `clouds.yaml` [here](cloud_specification_data.md).

#### workerInstances (optional)
#### workerInstances

`workerInstances` expects a list of worker groups (instance definitions with `count` key).
If `count` is omitted, `count: 1` is assumed.
Expand All @@ -189,11 +189,13 @@ workerInstance:
- type: de.NBI tiny
image: Ubuntu 22.04 LTS (2022-10-14)
count: 2
onDemand: True # optional only on master cloud for now. Default True.
```

- `type` sets the instance's hardware configuration.
- `image` sets the bootable operating system to be installed on the instance.
- `count` sets how many workers of that `type` `image` combination are in this work group
- `onDemand` defines whether nodes in the worker group are scheduled on demand (True) or are started permanently (False). This option only works on the master cloud for now.

##### Find your active `images`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ SlurmdLogFile=/var/log/slurm/slurmd.log
{{ sl.update({node.cloud_identifier: []}) }}
{% endif %}
{% if node.name not in sl[node.cloud_identifier] %}
NodeName={{ node.name }} SocketsPerBoard={{ node.flavor.vcpus }} CoresPerSocket=1 RealMemory={{ mem - [mem // 2, 16000] | min }} State=CLOUD {{"Features="+node.features|join(",") if node.features is defined}}# {{ node.cloud_identifier }}
NodeName={{ node.name }} SocketsPerBoard={{ node.flavor.vcpus }} CoresPerSocket=1 RealMemory={{ mem - [mem // 2, 16000] | min }} State={{'CLOUD' if node.on_demand else 'UNKNOWN'}} {{"Features="+node.features|join(",") if node.features is defined}}# {{ node.cloud_identifier }}
{{ sl[node.cloud_identifier].append(node.name)}}
{{ all.nodes.append(node.name)}}
{% endif %}
Expand Down

0 comments on commit d4d3fc4

Please sign in to comment.