Skip to content

Commit

Permalink
improved readability fixed minor bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
XaverStiensmeier committed Oct 17, 2024
1 parent 5e6a25a commit 3f488a5
Showing 1 changed file with 58 additions and 48 deletions.
106 changes: 58 additions & 48 deletions resources/playbook/roles/bibigrid/files/slurm/create_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,25 @@
import yaml
from openstack.exceptions import OpenStackCloudException

print("Test")


class ImageNotFoundException(Exception):
""" Image not found exception"""


LOGGER_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
# Create a StreamHandler to log to stdout
console_handler = logging.StreamHandler(sys.stdout)

# Set the logging format for the StreamHandler
console_handler.setFormatter(logging.Formatter(LOGGER_FORMAT))

# Set up the logger with the StreamHandler
logging.basicConfig(level=logging.INFO, handlers=[console_handler])
logging.basicConfig(format=LOGGER_FORMAT, filename="/var/log/slurm/create_server.log", level=logging.INFO)
# For Debugging
# console_handler = logging.StreamHandler(sys.stdout)
# console_handler.setFormatter(logging.Formatter(LOGGER_FORMAT))
# logging.basicConfig(level=logging.INFO, handlers=[console_handler])

HOSTS_FILE_PATH = "/opt/playbook/vars/hosts.yaml"

logging.info("create_server.py started")
start_time = time.time()

if len(sys.argv) < 2:
logging.warning("usage: $0 instance1_name[,instance2_name,...]")
logging.warning("Not enough arguments!")
logging.info("Your input %s with length %s", sys.argv, len(sys.argv))
sys.exit(1)
start_workers = sys.argv[1].split("\n")
Expand Down Expand Up @@ -76,31 +71,46 @@ class ImageNotFoundException(Exception):
connections[cloud] = os_client_config.make_sdk(cloud=cloud, volume_api_version="3")


def attach_volumes(provider, instance, name):
def get_server_vars(name):
# loading server_vars
host_vars_path = f"/opt/playbook/host_vars/{name}.yaml"
server_vars = {"volumes": []}
if os.path.isfile(host_vars_path):
logging.info(f"Found host_vars file {host_vars_path}.")
with open(host_vars_path, mode="r", encoding="utf-8") as host_vars_file:
server_vars = yaml.safe_load(host_vars_file)
logging.info(f"Loaded Vars: {server_vars}")
else:
logging.info(f"No host vars exist (group vars still apply). Using {server_vars}")
return server_vars


def attach_volumes(provider, host_vars, name):
logging.info("Creating volumes ...")
attach_volumes = host_vars.get('volumes', [])
volumes = []
logging.info(f"Instance Volumes {instance.get('volumes', [])}")
for i, attach_volume in enumerate(instance.get("volumes", [])):
logging.info(f"{i}: {attach_volume}")
volume_name = f"{name}-{i}"
logging.info(f"Creating volume {volume_name}")
volume = provider.create_volume(size=attach_volume.get("size", 50), name=volume_name)
attach_volume["name"] = volume_name
volumes.append(volume)
host_vars_path = f"/opt/playbook/host_vars/{name}.yaml"

with FileLock(f"{host_vars_path}.lock"):
logging.info(f"Instance Volumes {attach_volumes}")
for i, attach_volume in enumerate(attach_volumes):
logging.info(f"{i}: {attach_volume}")
volume_name = f"{name}-{i}"
logging.info(f"Creating volume {volume_name}")
volume = provider.create_volume(size=attach_volume.get("size", 50), name=volume_name)
attach_volume["name"] = volume_name
volumes.append(volume)
with open(host_vars_path, mode="w+", encoding="utf-8") as host_vars_file:
yaml.dump(host_vars, host_vars_file)
return volumes


def attached_volumes_host_vars_update(connection, server, instance):
def attached_volumes_host_vars_update(connection, server, host_vars):
logging.info("Updating host vars volume info")
host_vars_path = f"/opt/playbook/host_vars/{server['name']}.yaml"

with FileLock(f"{host_vars_path}.lock"):
logging.info(f"{host_vars_path}.lock acquired")
# loading server_vars
server_vars = {"volumes": []}
if os.path.isfile(host_vars_path):
logging.info(f"Found host_vars file {host_vars_path}.")
with open(host_vars_path, mode="r", encoding="utf-8") as host_vars_file:
server_vars = yaml.safe_load(host_vars_file)
logging.info(f"Loaded Vars: {server_vars}")
# get name and device info
server_attachment = []
for server_volume in server["volumes"]:
Expand All @@ -110,18 +120,18 @@ def attached_volumes_host_vars_update(connection, server, instance):
server_attachment.append({"name": volume["name"], "device": attachment["device"]})
break
# add device info
attach_volumes = instance.get("volumes", [])
attach_volumes = host_vars.get("volumes", [])
if attach_volumes:
for attach_volume in attach_volumes:
logging.info(f"Finding device for {attach_volume['name']}.")
server_volume = next((server_volume for server_volume in server_attachment if
server_volume["name"] == attach_volume["name"]), None)
attach_volume["device"] = server_volume.get("device")
logging.debug(f"Added Configuration: Instance {server['name']} has volume {attach_volume['name']} "
f"as device {attach_volume['device']} that is going to be mounted to "
f"{attach_volume['mountPoint']}")
server_vars["volumes"].append(attach_volume)
with open(host_vars_path, mode="w+", encoding="utf-8") as host_vars_file:
yaml.dump(server_vars, host_vars_file)
yaml.dump(host_vars, host_vars_file)
logging.info(f"{host_vars_path}.lock released")


Expand Down Expand Up @@ -150,19 +160,19 @@ def select_image(start_worker_group, connection):
return image


def start_server(worker, start_worker_group, start_data):
def start_server(name, start_worker_group, start_data):
try:
logging.info("Create server %s.", worker)
logging.info("Create server %s.", name)
connection = connections[start_worker_group["cloud_identifier"]]
# check if running
already_running_server = connection.get_server(worker)
already_running_server = connection.get_server(name)
if already_running_server:
logging.warning(
f"Already running server {worker} on {start_worker_group['cloud_identifier']} (will be terminated): "
f"Already running server {name} on {start_worker_group['cloud_identifier']} (will be terminated): "
f"{already_running_server['name']}")
server_deleted = connection.delete_server(worker)
server_deleted = connection.delete_server(name)
logging.info(
f"Server {worker} on {start_worker_group['cloud_identifier']} has been terminated ({server_deleted}). "
f"Server {name} on {start_worker_group['cloud_identifier']} has been terminated ({server_deleted}). "
f"Continuing startup.")
# check for userdata
userdata = ""
Expand All @@ -172,8 +182,9 @@ def start_server(worker, start_worker_group, start_data):
userdata = userdata_file.read()
# create server and ...
image = select_image(start_worker_group, connection)
volumes = attach_volumes(connection, start_worker_group, worker)
server = connection.create_server(name=worker, flavor=start_worker_group["flavor"]["name"], image=image,
host_vars = get_server_vars(name)
volumes = attach_volumes(connection, host_vars, name)
server = connection.create_server(name=name, flavor=start_worker_group["flavor"]["name"], image=image,
network=start_worker_group["network"],
key_name=f"tempKey_bibi-{common_config['cluster_id']}",
security_groups=[f"default-{common_config['cluster_id']}"], userdata=userdata,
Expand All @@ -184,7 +195,7 @@ def start_server(worker, start_worker_group, start_data):
connection.wait_for_server(server, auto_ip=False, timeout=600)
server = connection.get_server(server["id"])
except OpenStackCloudException as exc:
logging.warning("While creating %s the OpenStackCloudException %s occurred.", worker, exc)
logging.warning("While creating %s the OpenStackCloudException %s occurred.", name, exc)
server_start_data["openstack_wait_exceptions"].append(server.name)
return
logging.info("%s is active. Checking ssh", server.name)
Expand All @@ -196,12 +207,12 @@ def start_server(worker, start_worker_group, start_data):
logging.warning(f"{exc}: Couldn't connect to {server.name}.")
server_start_data["connection_exceptions"].append(server.name)
logging.info("Update hosts.yaml")
attached_volumes_host_vars_update(connection, server, start_worker_group)
attached_volumes_host_vars_update(connection, server, host_vars)
update_hosts(server.name, server.private_v4)

except OpenStackCloudException as exc:
logging.warning("While creating %s the OpenStackCloudException %s occurred. Worker ignored.", worker, exc)
server_start_data["other_openstack_exception"].append(worker)
logging.warning("While creating %s the OpenStackCloudException %s occurred. Worker ignored.", name, exc)
server_start_data["other_openstack_exception"].append(name)


def check_ssh_active(private_ip, private_key="/opt/slurm/.ssh/id_ecdsa", username="ubuntu"):
Expand Down Expand Up @@ -294,22 +305,21 @@ def _run_playbook(cmdline_args):

start_server_threads = []
for worker_group in worker_groups:
for start_worker in start_workers:
for worker_name in start_workers:
# start all servers that are part of the current worker group
result = subprocess.run(["scontrol", "show", "hostname", worker_group["name"]], stdout=subprocess.PIPE,
check=True) # get all workers in worker_type
possible_workers = result.stdout.decode("utf-8").strip().split("\n")
if start_worker in possible_workers:
if worker_name in possible_workers:
start_worker_thread = threading.Thread(target=start_server,
kwargs={"worker": start_worker, "start_worker_group": worker_group,
kwargs={"name": worker_name, "start_worker_group": worker_group,
"start_data": server_start_data})
start_worker_thread.start()
start_server_threads.append(start_worker_thread)

for start_server_thread in start_server_threads:
start_server_thread.join()
print("Server Started. Test Done")
exit(0)

# If no suitable server can be started: abort
if len(server_start_data["available_servers"]) == 0:
logging.warning("Couldn't make server available! Abort!")
Expand Down

0 comments on commit 3f488a5

Please sign in to comment.