diff --git a/resources/playbook/roles/bibigrid/files/slurm/create_server.py b/resources/playbook/roles/bibigrid/files/slurm/create_server.py
index 6a1bb51d..dddcac69 100644
--- a/resources/playbook/roles/bibigrid/files/slurm/create_server.py
+++ b/resources/playbook/roles/bibigrid/files/slurm/create_server.py
@@ -20,22 +20,17 @@ import yaml
 from openstack.exceptions import OpenStackCloudException
 
 
-print("Test")
-
 class ImageNotFoundException(Exception):
     """ Image not found exception"""
 
 
 LOGGER_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
 
-# Create a StreamHandler to log to stdout
-console_handler = logging.StreamHandler(sys.stdout)
-
-# Set the logging format for the StreamHandler
-console_handler.setFormatter(logging.Formatter(LOGGER_FORMAT))
-
-# Set up the logger with the StreamHandler
-logging.basicConfig(level=logging.INFO, handlers=[console_handler])
+logging.basicConfig(format=LOGGER_FORMAT, filename="/var/log/slurm/create_server.log", level=logging.INFO)
+# For Debugging
+# console_handler = logging.StreamHandler(sys.stdout)
+# console_handler.setFormatter(logging.Formatter(LOGGER_FORMAT))
+# logging.basicConfig(level=logging.INFO, handlers=[console_handler])
 
 HOSTS_FILE_PATH = "/opt/playbook/vars/hosts.yaml"
 
@@ -43,7 +38,7 @@ class ImageNotFoundException(Exception):
 start_time = time.time()
 
 if len(sys.argv) < 2:
-    logging.warning("usage: $0 instance1_name[,instance2_name,...]")
+    logging.warning("Not enough arguments!")
     logging.info("Your input %s with length %s", sys.argv, len(sys.argv))
     sys.exit(1)
 start_workers = sys.argv[1].split("\n")
@@ -76,31 +71,67 @@ class ImageNotFoundException(Exception):
     connections[cloud] = os_client_config.make_sdk(cloud=cloud, volume_api_version="3")
 
 
-def attach_volumes(provider, instance, name):
+def get_server_vars(name):
+    """
+    Load the host vars of server *name* from /opt/playbook/host_vars/{name}.yaml.
+
+    :param name: name of the server; determines the host_vars file name
+    :return: parsed host vars, or {"volumes": []} if the file is missing or empty
+    """
+    host_vars_path = f"/opt/playbook/host_vars/{name}.yaml"
+    server_vars = {"volumes": []}
+    if os.path.isfile(host_vars_path):
+        logging.info(f"Found host_vars file {host_vars_path}.")
+        with open(host_vars_path, mode="r", encoding="utf-8") as host_vars_file:
+            # safe_load returns None for an empty file; keep the default so callers can use .get()
+            server_vars = yaml.safe_load(host_vars_file) or server_vars
+            logging.info(f"Loaded Vars: {server_vars}")
+    else:
+        logging.info(f"No host vars exist (group vars still apply). Using {server_vars}")
+    return server_vars
+
+
+def attach_volumes(provider, host_vars, name):
+    """
+    Create one volume per entry in host_vars["volumes"] and store the generated names in the host_vars file.
+
+    :param provider: connection used to create the volumes (must offer create_volume)
+    :param host_vars: host vars of the server; each volume entry gets its "name" set in place
+    :param name: server name; volumes are named {name}-{index}
+    :return: list of the created volumes
+    """
     logging.info("Creating volumes ...")
+    attach_volumes = host_vars.get('volumes', [])
     volumes = []
-    logging.info(f"Instance Volumes {instance.get('volumes', [])}")
-    for i, attach_volume in enumerate(instance.get("volumes", [])):
-        logging.info(f"{i}: {attach_volume}")
-        volume_name = f"{name}-{i}"
-        logging.info(f"Creating volume {volume_name}")
-        volume = provider.create_volume(size=attach_volume.get("size", 50), name=volume_name)
-        attach_volume["name"] = volume_name
-        volumes.append(volume)
+    host_vars_path = f"/opt/playbook/host_vars/{name}.yaml"
+
+    with FileLock(f"{host_vars_path}.lock"):
+        logging.info(f"Instance Volumes {attach_volumes}")
+        for i, attach_volume in enumerate(attach_volumes):
+            logging.info(f"{i}: {attach_volume}")
+            volume_name = f"{name}-{i}"
+            logging.info(f"Creating volume {volume_name}")
+            volume = provider.create_volume(size=attach_volume.get("size", 50), name=volume_name)
+            attach_volume["name"] = volume_name
+            volumes.append(volume)
+        with open(host_vars_path, mode="w+", encoding="utf-8") as host_vars_file:
+            yaml.dump(host_vars, host_vars_file)
     return volumes
 
 
-def attached_volumes_host_vars_update(connection, server, instance):
+def attached_volumes_host_vars_update(connection, server, host_vars):
+    """
+    Add the device of every attached volume to host_vars and write host_vars to the server's host_vars file.
+
+    :param connection: cloud connection of the server (as selected in start_server)
+    :param server: server whose "name" and "volumes" are read
+    :param host_vars: host vars of the server; each volume entry gets its "device" set in place
+    """
+    logging.info("Updating host vars volume info")
     host_vars_path = f"/opt/playbook/host_vars/{server['name']}.yaml"
+
     with FileLock(f"{host_vars_path}.lock"):
         logging.info(f"{host_vars_path}.lock acquired")
-        # loading server_vars
-        server_vars = {"volumes": []}
-        if os.path.isfile(host_vars_path):
-            logging.info(f"Found host_vars file {host_vars_path}.")
-            with open(host_vars_path, mode="r", encoding="utf-8") as host_vars_file:
-                server_vars = yaml.safe_load(host_vars_file)
-                logging.info(f"Loaded Vars: {server_vars}")
         # get name and device info
         server_attachment = []
         for server_volume in server["volumes"]:
@@ -110,18 +141,22 @@ def attached_volumes_host_vars_update(connection, server, instance):
                     server_attachment.append({"name": volume["name"], "device": attachment["device"]})
                     break
         # add device info
-        attach_volumes = instance.get("volumes", [])
+        attach_volumes = host_vars.get("volumes", [])
         if attach_volumes:
             for attach_volume in attach_volumes:
+                logging.info(f"Finding device for {attach_volume['name']}.")
                 server_volume = next((server_volume for server_volume in server_attachment if
                                       server_volume["name"] == attach_volume["name"]), None)
+                if server_volume is None:
+                    # No matching attachment: warn and store device None instead of raising AttributeError.
+                    logging.warning(f"No attached device found for volume {attach_volume['name']}.")
+                    server_volume = {}
                 attach_volume["device"] = server_volume.get("device")
                 logging.debug(f"Added Configuration: Instance {server['name']} has volume {attach_volume['name']} "
                               f"as device {attach_volume['device']} that is going to be mounted to "
                               f"{attach_volume['mountPoint']}")
-                server_vars["volumes"].append(attach_volume)
             with open(host_vars_path, mode="w+", encoding="utf-8") as host_vars_file:
-                yaml.dump(server_vars, host_vars_file)
+                yaml.dump(host_vars, host_vars_file)
         logging.info(f"{host_vars_path}.lock released")
 
 
@@ -150,19 +185,19 @@ def select_image(start_worker_group, connection):
     return image
 
 
-def start_server(worker, start_worker_group, start_data):
+def start_server(name, start_worker_group, start_data):
     try:
-        logging.info("Create server %s.", worker)
+        logging.info("Create server %s.", name)
         connection = connections[start_worker_group["cloud_identifier"]]
         # check if running
-        already_running_server = connection.get_server(worker)
+        already_running_server = connection.get_server(name)
         if already_running_server:
             logging.warning(
-                f"Already running server {worker} on {start_worker_group['cloud_identifier']} (will be terminated): "
+                f"Already running server {name} on {start_worker_group['cloud_identifier']} (will be terminated): "
                 f"{already_running_server['name']}")
-            server_deleted = connection.delete_server(worker)
+            server_deleted = connection.delete_server(name)
             logging.info(
-                f"Server {worker} on {start_worker_group['cloud_identifier']} has been terminated ({server_deleted}). "
+                f"Server {name} on {start_worker_group['cloud_identifier']} has been terminated ({server_deleted}). "
                 f"Continuing startup.")
         # check for userdata
         userdata = ""
@@ -172,8 +207,9 @@ def start_server(worker, start_worker_group, start_data):
                 userdata = userdata_file.read()
         # create server and ...
         image = select_image(start_worker_group, connection)
-        volumes = attach_volumes(connection, start_worker_group, worker)
-        server = connection.create_server(name=worker, flavor=start_worker_group["flavor"]["name"], image=image,
+        host_vars = get_server_vars(name)
+        volumes = attach_volumes(connection, host_vars, name)
+        server = connection.create_server(name=name, flavor=start_worker_group["flavor"]["name"], image=image,
                                           network=start_worker_group["network"],
                                           key_name=f"tempKey_bibi-{common_config['cluster_id']}",
                                           security_groups=[f"default-{common_config['cluster_id']}"], userdata=userdata,
@@ -184,7 +220,7 @@ def start_server(worker, start_worker_group, start_data):
             connection.wait_for_server(server, auto_ip=False, timeout=600)
             server = connection.get_server(server["id"])
         except OpenStackCloudException as exc:
-            logging.warning("While creating %s the OpenStackCloudException %s occurred.", worker, exc)
+            logging.warning("While creating %s the OpenStackCloudException %s occurred.", name, exc)
             server_start_data["openstack_wait_exceptions"].append(server.name)
             return
         logging.info("%s is active. Checking ssh", server.name)
@@ -196,12 +232,12 @@ def start_server(worker, start_worker_group, start_data):
             logging.warning(f"{exc}: Couldn't connect to {server.name}.")
             server_start_data["connection_exceptions"].append(server.name)
         logging.info("Update hosts.yaml")
-        attached_volumes_host_vars_update(connection, server, start_worker_group)
+        attached_volumes_host_vars_update(connection, server, host_vars)
         update_hosts(server.name, server.private_v4)
 
     except OpenStackCloudException as exc:
-        logging.warning("While creating %s the OpenStackCloudException %s occurred. Worker ignored.", worker, exc)
-        server_start_data["other_openstack_exception"].append(worker)
+        logging.warning("While creating %s the OpenStackCloudException %s occurred. Worker ignored.", name, exc)
+        server_start_data["other_openstack_exception"].append(name)
 
 
 def check_ssh_active(private_ip, private_key="/opt/slurm/.ssh/id_ecdsa", username="ubuntu"):
@@ -294,22 +330,21 @@ def _run_playbook(cmdline_args):
 
 start_server_threads = []
 for worker_group in worker_groups:
-    for start_worker in start_workers:
+    for worker_name in start_workers:
         # start all servers that are part of the current worker group
         result = subprocess.run(["scontrol", "show", "hostname", worker_group["name"]], stdout=subprocess.PIPE,
                                 check=True)  # get all workers in worker_type
         possible_workers = result.stdout.decode("utf-8").strip().split("\n")
-        if start_worker in possible_workers:
+        if worker_name in possible_workers:
             start_worker_thread = threading.Thread(target=start_server,
-                                                   kwargs={"worker": start_worker, "start_worker_group": worker_group,
+                                                   kwargs={"name": worker_name, "start_worker_group": worker_group,
                                                            "start_data": server_start_data})
             start_worker_thread.start()
             start_server_threads.append(start_worker_thread)
 
 for start_server_thread in start_server_threads:
     start_server_thread.join()
-print("Server Started. Test Done")
-exit(0)
+
 # If no suitable server can be started: abort
 if len(server_start_data["available_servers"]) == 0:
     logging.warning("Couldn't make server available! Abort!")