Skip to content

Commit

Permalink
Merge branch 'dev' into feature-increase-ssh-timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
XaverStiensmeier authored Apr 10, 2024
2 parents 9ccd5a5 + d4d3fc4 commit 6ef3acb
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 19 deletions.
64 changes: 52 additions & 12 deletions bibigrid/core/actions/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ def __init__(self, providers, configurations, config_path, log, debug=False, clu
self.wireguard_security_group_name = WIREGUARD_SECURITY_GROUP_NAME.format(cluster_id=self.cluster_id)

self.worker_counter = 0
# permanents holds groups or single nodes that ansible playbook should be run for during startup
self.permanents = ["vpn"]
self.vpn_counter = 0
self.thread_lock = threading.Lock()
self.vpn_master_thread_lock = threading.Lock()
self.worker_thread_lock = threading.Lock()
self.use_master_with_public_ip = not configurations[0].get("gateway") and configurations[0].get(
"useMasterWithPublicIp", True)
self.log.debug("Keyname: %s", self.key_name)
Expand Down Expand Up @@ -160,7 +163,7 @@ def generate_security_groups(self):
_ = provider.create_security_group(name=self.wireguard_security_group_name)["id"]
configuration["security_groups"].append(self.wireguard_security_group_name) # store in configuration

def start_vpn_or_master_instance(self, configuration, provider):
def start_vpn_or_master(self, configuration, provider):
"""
Start master/vpn-worker of a provider
@param configuration: dict configuration of said provider.
Expand All @@ -169,7 +172,7 @@ def start_vpn_or_master_instance(self, configuration, provider):
"""
identifier, instance_type, volumes = self.prepare_vpn_or_master_args(configuration, provider)
external_network = provider.get_external_network(configuration["network"])
with self.thread_lock:
with self.vpn_master_thread_lock:
if identifier == MASTER_IDENTIFIER: # pylint: disable=comparison-with-callable
name = identifier(cluster_id=self.cluster_id)
else:
Expand All @@ -185,6 +188,7 @@ def start_vpn_or_master_instance(self, configuration, provider):
# create a server and block until it is up and running
server = provider.create_server(name=name, flavor=flavor, key_name=self.key_name, image=image, network=network,
volumes=volumes, security_groups=configuration["security_groups"], wait=True)
print("MASTER", server)
configuration["private_v4"] = server["private_v4"]
self.log.debug(f"Created Server {name}: {server['private_v4']}.")
# get mac address for given private address
Expand All @@ -206,6 +210,27 @@ def start_vpn_or_master_instance(self, configuration, provider):
configuration["floating_ip"] = server["private_v4"] # pylint: enable=comparison-with-callable
configuration["volumes"] = provider.get_mount_info_from_server(server)

def start_worker(self, worker, worker_count, configuration, provider):
    """
    Starts a single worker server, blocks until it is up and registers it:
    its name is appended to self.permanents and its private IPv4 address is
    stored under "host_entries" in the hosts file (a_rp.HOSTS_FILE).
    @param worker: dict worker group definition; "type" and "image" are read
    @param worker_count: number used to build the worker's name
    @param configuration: dict configuration of said provider; "network", "security_groups" and
    optionally "fallbackOnOtherImage" are read
    @param provider: provider the server is created on
    @return:
    """
    name = WORKER_IDENTIFIER(cluster_id=self.cluster_id, additional=worker_count)
    self.log.info(f"Starting instance/server {name} on {provider.cloud_specification['identifier']}")
    flavor = worker["type"]
    network = configuration["network"]
    image = image_selection.select_image(provider, worker["image"], self.log,
                                         configuration.get("fallbackOnOtherImage"))

    # create a server and block until it is up and running
    server = provider.create_server(name=name, flavor=flavor, key_name=self.key_name, image=image, network=network,
                                    volumes=None, security_groups=configuration["security_groups"], wait=True)
    # The hosts file is read, modified and written back. Holding the lock for the whole
    # cycle keeps worker threads that finish at the same time from overwriting each other's entry.
    with self.worker_thread_lock:
        self.permanents.append(name)
        with open(a_rp.HOSTS_FILE, mode="r", encoding="utf-8") as hosts_file:
            hosts = yaml.safe_load(hosts_file)
        if not hosts or "host_entries" not in hosts:
            # Nothing loaded (empty file) is treated as the first run; loaded content
            # without "host_entries" is treated as a broken file.
            self.log.info(f"Resetting host entries because {'broken' if hosts else 'first run'}.")
            hosts = {"host_entries": {}}
        hosts["host_entries"][name] = server["private_v4"]
        ansible_configurator.write_yaml(a_rp.HOSTS_FILE, hosts, self.log)

def prepare_vpn_or_master_args(self, configuration, provider):
"""
Prepares start_instance arguments for master/vpn
Expand Down Expand Up @@ -314,26 +339,41 @@ def upload_data(self):
if self.configurations[0].get("dontUploadCredentials"):
commands = ssh_handler.ANSIBLE_START
else:
ansible_start = ssh_handler.ANSIBLE_START
ansible_start[-1] = (ansible_start[-1][0].format(",".join(self.permanents)), ansible_start[-1][1])
self.log.debug(f"Starting playbook with {ansible_start}.")
commands = [ssh_handler.get_ac_command(self.providers, AC_NAME.format(
cluster_id=self.cluster_id))] + ssh_handler.ANSIBLE_START
ssh_data = {"floating_ip": self.master_ip, "private_key": KEY_FOLDER + self.key_name,
"username": self.ssh_user, "commands": commands, "filepaths": FILEPATHS,
"gateway": self.configurations[0].get("gateway", {}), "timeout": self.ssh_timeout}
ssh_handler.execute_ssh(ssh_data=ssh_data, log=self.log)

def start_start_server_threads(self):
    """
    Starts one thread per provider that starts its master/vpn server and one thread
    for every worker whose group sets onDemand to False, then joins all of them.
    Workers of on-demand groups are not started here, but their count is still added
    to the running worker number so that later workers keep their numbers.
    @return:
    """
    start_server_threads = []
    worker_count = 0
    for configuration, provider in zip(self.configurations, self.providers):
        start_server_thread = return_threading.ReturnThread(target=self.start_vpn_or_master,
                                                            args=[configuration, provider])
        start_server_thread.start()
        start_server_threads.append(start_server_thread)
        for worker in configuration.get("workerInstances", []):
            # count is optional in the configuration; an omitted count means one worker
            count = int(worker.get("count", 1))
            if not worker.get("onDemand", True):
                for _ in range(count):
                    start_server_thread = return_threading.ReturnThread(target=self.start_worker,
                                                                        args=[worker, worker_count, configuration,
                                                                              provider])
                    start_server_thread.start()
                    start_server_threads.append(start_server_thread)
                    worker_count += 1
            else:
                worker_count += count
    for start_server_thread in start_server_threads:
        start_server_thread.join()

def extended_network_configuration(self):
"""
Expand Down Expand Up @@ -374,7 +414,7 @@ def create(self): # pylint: disable=too-many-branches,too-many-statements
self.generate_keypair()
self.prepare_configurations()
self.generate_security_groups()
self.start_start_instance_threads()
self.start_start_server_threads()
self.extended_network_configuration()
self.initialize_instances()
self.upload_data()
Expand Down
2 changes: 1 addition & 1 deletion bibigrid/core/utility/ansible_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
MV_ANSIBLE_CONFIG = (
"sudo install -D /opt/playbook/ansible.cfg /etc/ansible/ansible.cfg", "Move ansible configuration.")
EXECUTE = (f"ansible-playbook {os.path.join(a_rp.PLAYBOOK_PATH_REMOTE, a_rp.SITE_YML)} -i "
f"{os.path.join(a_rp.PLAYBOOK_PATH_REMOTE, a_rp.ANSIBLE_HOSTS)} -l vpn",
f"{os.path.join(a_rp.PLAYBOOK_PATH_REMOTE, a_rp.ANSIBLE_HOSTS)} -l {{}}",
"Execute ansible playbook. Be patient.")

# ansible setup
Expand Down
10 changes: 6 additions & 4 deletions bibigrid/core/utility/ansible_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def generate_site_file_yaml(custom_roles):

def write_host_and_group_vars(configurations, providers, cluster_id, log): # pylint: disable=too-many-locals
"""
ToDo filter what information really is necessary. Determined by further development
Filters unnecessary information
@param configurations: configurations
@param providers: providers
Expand All @@ -96,7 +95,8 @@ def write_host_and_group_vars(configurations, providers, cluster_id, log): # py
worker_dict = {"name": name, "regexp": regexp, "image": worker["image"],
"network": configuration["network"], "flavor": flavor_dict,
"gateway_ip": configuration["private_v4"],
"cloud_identifier": configuration["cloud_identifier"]}
"cloud_identifier": configuration["cloud_identifier"],
"on_demand": worker.get("onDemand", True)}

worker_features = worker.get("features", [])
if isinstance(worker_features, str):
Expand All @@ -119,7 +119,8 @@ def write_host_and_group_vars(configurations, providers, cluster_id, log): # py
"floating_ip": configuration["floating_ip"], "private_v4": configuration["private_v4"],
"flavor": flavor_dict, "wireguard_ip": wireguard_ip,
"cloud_identifier": configuration["cloud_identifier"],
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False)}
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False),
"on_demand": False}
if configuration.get("wireguard_peer"):
vpngtw_dict["wireguard"] = {"ip": wireguard_ip, "peer": configuration.get("wireguard_peer")}
pass_through(configuration, vpngtw_dict, "waitForServices", "wait_for_services")
Expand All @@ -133,7 +134,8 @@ def write_host_and_group_vars(configurations, providers, cluster_id, log): # py
"network_cidrs": configuration["subnet_cidrs"], "floating_ip": configuration["floating_ip"],
"flavor": flavor_dict, "private_v4": configuration["private_v4"],
"cloud_identifier": configuration["cloud_identifier"], "volumes": configuration["volumes"],
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False)}
"fallback_on_other_image": configuration.get("fallbackOnOtherImage", False),
"on_demand": False}
if configuration.get("wireguard_peer"):
master_dict["wireguard"] = {"ip": "10.0.0.1", "peer": configuration.get("wireguard_peer")}
pass_through(configuration, master_dict, "waitForServices", "wait_for_services")
Expand Down
4 changes: 3 additions & 1 deletion documentation/markdown/features/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ Other infrastructures would be [AWS](https://aws.amazon.com/) and so on.
`cloud` decides which entry in the `clouds.yaml` is used. When using OpenStack the entry is named `openstack`.
You can read more about the `clouds.yaml` [here](cloud_specification_data.md).

#### workerInstances (optional)
#### workerInstances

`workerInstances` expects a list of worker groups (instance definitions with `count` key).
If `count` is omitted, `count: 1` is assumed.
Expand All @@ -200,11 +200,13 @@ workerInstance:
- type: de.NBI tiny
image: Ubuntu 22.04 LTS (2022-10-14)
count: 2
onDemand: True # optional; only supported on the master cloud for now. Default: True.
```

- `type` sets the instance's hardware configuration.
- `image` sets the bootable operating system to be installed on the instance.
- `count` sets how many workers of that `type` `image` combination are in this worker group.
- `onDemand` defines whether nodes in the worker group are scheduled on demand (True) or are started permanently (False). This option only works on the master cloud for now.

##### Find your active `images`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ SlurmdLogFile=/var/log/slurm/slurmd.log
{{ sl.update({node.cloud_identifier: []}) }}
{% endif %}
{% if node.name not in sl[node.cloud_identifier] %}
NodeName={{ node.name }} SocketsPerBoard={{ node.flavor.vcpus }} CoresPerSocket=1 RealMemory={{ mem - [mem // 2, 16000] | min }} State=CLOUD {{"Features="+node.features|join(",") if node.features is defined}}# {{ node.cloud_identifier }}
NodeName={{ node.name }} SocketsPerBoard={{ node.flavor.vcpus }} CoresPerSocket=1 RealMemory={{ mem - [mem // 2, 16000] | min }} State={{'CLOUD' if node.on_demand else 'UNKNOWN'}} {{"Features="+node.features|join(",") if node.features is defined}}# {{ node.cloud_identifier }}
{{ sl[node.cloud_identifier].append(node.name)}}
{{ all.nodes.append(node.name)}}
{% endif %}
Expand Down

0 comments on commit 6ef3acb

Please sign in to comment.