From 27b8478f01c27edbd9ba8f22518df7447e83dbce Mon Sep 17 00:00:00 2001 From: eKatchko <47833533+eKatchko@users.noreply.github.com> Date: Mon, 19 Aug 2019 21:58:58 +0200 Subject: [PATCH] refactor(playbook): non-blocking subprocess. SIGTERM catched and playbook logs saved to redis --- .../VirtualMachineHandler.py | 90 +++++++++++-------- VirtualMachineService/VirtualMachineServer.py | 17 +++- VirtualMachineService/ancon/Playbook.py | 74 +++++++++++---- docker-compose.yml | 21 +++++ requirements.txt | 1 + 5 files changed, 145 insertions(+), 58 deletions(-) diff --git a/VirtualMachineService/VirtualMachineHandler.py b/VirtualMachineService/VirtualMachineHandler.py index 53d7b659..f80e3acf 100644 --- a/VirtualMachineService/VirtualMachineHandler.py +++ b/VirtualMachineService/VirtualMachineHandler.py @@ -45,27 +45,37 @@ import yaml import base64 from oslo_utils import encodeutils +import redis -osi_key_dict = dict() active_playbooks = dict() class VirtualMachineHandler(Iface): + """Handler which the PortalClient uses.""" - global osi_key_dict + global active_playbooks BUILD = "BUILD" ACTIVE = "ACTIVE" PREPARE_PLAYBOOK_BUILD = "PREPARE_PLAYBOOK_BUILD" BUILD_PLAYBOOK = "BUILD_PLAYBOOK" PLAYBOOK_FAILED = "PLAYBOOK_FAILED" + def keyboard_interrupt_handler_playbooks(self): + global active_playbooks + for k, v in active_playbooks.items(): + self.logger.info("Clearing traces of Playbook-VM for (openstack_id): {0}".format(k)) + self.delete_keypair(key_name=self.redis.hget(k, "name").decode("utf-8")) + v.stop(k) + self.delete_server(openstack_id=k) + raise SystemExit(0) + def create_connection(self): """ - Create connection to OpenStack. + Create connection to OpenStack. - :return: OpenStack connection instance - """ + :return: OpenStack connection instance + """ try: conn = connection.Connection( @@ -88,10 +98,10 @@ def create_connection(self): def __init__(self, config): """ - Initialize the handler. + Initialize the handler. - Read all config variables and creates a connection to OpenStack. - """ + Read all config variables and creates a connection to OpenStack. + """ # create logger with 'spam_application' self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.DEBUG) @@ -110,6 +120,11 @@ def __init__(self, config): # add the handlers to the logger self.logger.addHandler(self.fh) self.logger.addHandler(self.ch) + + # connection to redis. Uses a pool with 10 connections. + self.pool = redis.ConnectionPool(host='redis', port=6379) + self.redis = redis.Redis(connection_pool=self.pool, charset='utf-8') + self.USERNAME = os.environ["OS_USERNAME"] self.PASSWORD = os.environ["OS_PASSWORD"] self.PROJECT_NAME = os.environ["OS_PROJECT_NAME"] @@ -147,12 +162,12 @@ def __init__(self, config): @deprecated(version="1.0.0", reason="Not supported at the moment") def setUserPassword(self, user, password): """ - Set the password of a user. + Set the password of a user. - :param user: Elixir-Id of the user which wants to set a password - :param password: The new password. - :return: The new password - """ + :param user: Elixir-Id of the user which wants to set a password + :param password: The new password. + :return: The new password + """ if str(self.SET_PASSWORD) == "True": try: auth = v3.Password( @@ -163,8 +178,8 @@ def setUserPassword(self, user, password): user_domain_id="default", project_domain_id="default", ) - sess = session.Session(auth=auth) + sess = session.Session(auth=auth) def findUser(keystone, name): users = keystone.users.list() for user in users: @@ -633,9 +648,8 @@ def start_server_with_custom_key(self, flavor, image, servername, metadata, disk ) openstack_id = server.to_dict()["id"] - global osi_key_dict - osi_key_dict[openstack_id] = dict(key=private_key, name=servername, - status=self.PREPARE_PLAYBOOK_BUILD) + self.redis.hmset(openstack_id, dict(key=private_key, name=servername, + status=self.PREPARE_PLAYBOOK_BUILD)) return {"openstackid": openstack_id, "volumeId": volume_id, 'private_key': private_key} except Exception as e: self.delete_keypair(key_name=servername) @@ -643,35 +657,31 @@ def start_server_with_custom_key(self, flavor, image, servername, metadata, disk return {} def create_and_deploy_playbook(self, public_key, playbooks_information, openstack_id): + global active_playbooks self.logger.info(msg="Starting Playbook for (openstack_id): {0}" .format(openstack_id)) - # get ip and port for inventory fields = self.get_ip_ports(openstack_id=openstack_id) - global osi_key_dict - global active_playbooks - key_name = osi_key_dict[openstack_id]['name'] + key = self.redis.hget(openstack_id, "key").decode('utf-8') playbook = Playbook(fields["IP"], fields["PORT"], playbooks_information, - osi_key_dict[openstack_id]["key"], + key, public_key, - self.logger) + self.logger, + self.pool) + self.redis.hset(openstack_id, "status", self.BUILD_PLAYBOOK) + playbook.run_it() active_playbooks[openstack_id] = playbook - osi_key_dict[openstack_id]["status"] = self.BUILD_PLAYBOOK - status, stdout, stderr = playbook.run_it() - self.delete_keypair(key_name=key_name) - if status != 0: - osi_key_dict[openstack_id]["status"] = self.PLAYBOOK_FAILED - else: - osi_key_dict[openstack_id]["status"] = self.ACTIVE - return status + return 0 def get_playbook_logs(self, openstack_id): global active_playbooks - if openstack_id in active_playbooks: + if self.redis.exists(openstack_id) == 1 and openstack_id in active_playbooks: + key_name = self.redis.hget(openstack_id, 'name').decode('utf-8') playbook = active_playbooks.pop(openstack_id) status, stdout, stderr = playbook.get_logs() - playbook.cleanup() + playbook.cleanup(openstack_id) + self.delete_keypair(key_name=key_name) return PlaybookResult(status=status, stdout=stdout, stderr=stderr) else: return PlaybookResult(status=-2, stdout='', stderr='') @@ -754,7 +764,6 @@ def check_server_status(self, openstack_id, diskspace, volume_id): serv = server.to_dict() try: - global osi_key_dict if serv["status"] == self.ACTIVE: host = self.get_server(openstack_id).floating_ip port = self.SSH_PORT @@ -781,14 +790,19 @@ def check_server_status(self, openstack_id, diskspace, volume_id): server.status = "DESTROYED" return server - if openstack_id in osi_key_dict: - if osi_key_dict[openstack_id]["status"] == self.PREPARE_PLAYBOOK_BUILD: + if self.redis.exists(openstack_id) == 1: + global active_playbooks + if openstack_id in active_playbooks: + playbook = active_playbooks[openstack_id] + playbook.check_status(openstack_id) + status = self.redis.hget(openstack_id, "status").decode('utf-8') + if status == self.PREPARE_PLAYBOOK_BUILD: server.status = self.PREPARE_PLAYBOOK_BUILD return server - elif osi_key_dict[openstack_id]["status"] == self.BUILD_PLAYBOOK: + elif status == self.BUILD_PLAYBOOK: server.status = self.BUILD_PLAYBOOK return server - elif osi_key_dict[openstack_id]["status"] == self.PLAYBOOK_FAILED: + elif status == self.PLAYBOOK_FAILED: server.status = self.PLAYBOOK_FAILED return server else: diff --git a/VirtualMachineService/VirtualMachineServer.py b/VirtualMachineService/VirtualMachineServer.py index 11502d84..0a25f972 100644 --- a/VirtualMachineService/VirtualMachineServer.py +++ b/VirtualMachineService/VirtualMachineServer.py @@ -1,13 +1,15 @@ import os import sys +import threading + try: from VirtualMachineService.VirtualMachineService import Client, Processor except Exception: from VirtualMachineService import Client, Processor try: - from VirtualMachineService.VirtualMachineHandler import VirtualMachineHandler + from VirtualMachineService.VirtualMachineHandler import VirtualMachineHandler except Exception: from VirtualMachineHandler import VirtualMachineHandler @@ -17,6 +19,7 @@ from thrift.server import TServer import yaml import click +import signal USERNAME = 'OS_USERNAME' PASSWORD = 'OS_PASSWORD' @@ -40,10 +43,17 @@ @click.command() @click.argument('config') def startServer(config): + + def catch_shutdown(signal, frame): + click.echo("Caught SIGTERM. Shutting down. Signal: {0} Frame: {1}".format(signal, frame)) + handler.keyboard_interrupt_handler_playbooks() + click.echo("SIGTERM was handled. Exiting with Exitcode: -1.") + sys.exit(-1) + + signal.signal(signal.SIGTERM, catch_shutdown) click.echo("Start Cloud-Client-Portal Server") CONFIG_FILE = config - with open(CONFIG_FILE, 'r') as ymlfile: cfg = yaml.load(ymlfile, Loader=yaml.SafeLoader) HOST = cfg['openstack_connection']['host'] @@ -57,8 +67,9 @@ def startServer(config): tfactory = TTransport.TBufferedTransportFactory() pfactory = TBinaryProtocol.TBinaryProtocolFactory() server = TServer.TThreadPoolServer( - processor, transport, tfactory, pfactory) + processor, transport, tfactory, pfactory, daemon=True) server.setNumThreads(15) + server.serve() diff --git a/VirtualMachineService/ancon/Playbook.py b/VirtualMachineService/ancon/Playbook.py index be449f58..9a76d9fa 100644 --- a/VirtualMachineService/ancon/Playbook.py +++ b/VirtualMachineService/ancon/Playbook.py @@ -3,6 +3,7 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory import ruamel.yaml import subprocess +import redis BIOCONDA = "bioconda" THEIA = "theia" @@ -11,15 +12,19 @@ class Playbook(object): - def __init__(self, ip, port, playbooks_information, osi_private_key, public_key, logger): + ACTIVE = "ACTIVE" + PLAYBOOK_FAILED = "PLAYBOOK_FAILED" + + def __init__(self, ip, port, playbooks_information, osi_private_key, public_key, logger, pool): + self.redis = redis.Redis(connection_pool=pool) self.yaml_exec = ruamel.yaml.YAML() self.vars_files = [] self.tasks = [] self.logger = logger - # init return logs - self.status = -1 - self.stdout = '' - self.stderr = '' + self.process = None + self.returncode = -1 + self.stdout = "" + self.stderr = "" # init temporary directories and mandatory generic files self.ancon_dir = "/code/VirtualMachineService/ancon" # path to this directory self.playbooks_dir = self.ancon_dir + "/playbooks" # path to source playbooks @@ -28,6 +33,9 @@ def __init__(self, ip, port, playbooks_information, osi_private_key, public_key, self.private_key.write(osi_private_key) self.private_key.close() + self.log_file_stdout = NamedTemporaryFile(mode="w+", dir=self.directory.name, delete=False) + self.log_file_stderr = NamedTemporaryFile(mode="w+", dir=self.directory.name, delete=False) + # create the custom playbook and save its name self.playbook_exec_name = "generic_playbook.yml" self.copy_playbooks_and_init(playbooks_information, public_key) @@ -40,6 +48,7 @@ def __init__(self, ip, port, playbooks_information, osi_private_key, public_key, self.inventory.close() def copy_playbooks_and_init(self, playbooks_information, public_key): + # go through every wanted playbook for k, v in playbooks_information.items(): self.copy_and_init(k, v) @@ -114,18 +123,49 @@ def run_it(self): command_string = "/usr/local/bin/ansible-playbook -v -i {0} {1}/{2}"\ .format(self.inventory.name, self.directory.name, self.playbook_exec_name) command_string = shlex.split(command_string) - process = subprocess.run(command_string, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - universal_newlines=True) - self.stdout = process.stdout - if len(process.stderr) > 0: - self.stderr = process.stderr - self.status = process.returncode - return process.returncode, process.stdout, process.stderr + self.process = subprocess.Popen(command_string, + stdout=self.log_file_stdout, + stderr=self.log_file_stderr, + universal_newlines=True) + + def check_status(self, openstack_id): + done = self.process.poll() + if done is None: + self.logger.info("Playbook for (openstack_id) {0} still in progress.".format(openstack_id)) + elif done != 0: + self.logger.info("Playbook for (openstack_id) {0} has failed.".format(openstack_id)) + self.redis.hset(openstack_id, "status", self.PLAYBOOK_FAILED) + self.returncode = self.process.returncode + self.process.wait() + else: + self.logger.info("Playbook for (openstack_id) {0} is successful.".format(openstack_id)) + self.redis.hset(openstack_id, "status", self.ACTIVE) + self.returncode = self.process.returncode + self.process.wait() + return done def get_logs(self): - return self.status, self.stdout, self.stderr - - def cleanup(self): + self.log_file_stdout.seek(0, 0) + lines_stdout = self.log_file_stdout.readlines() + for line in lines_stdout: + self.stdout += line + self.log_file_stderr.seek(0, 0) + line_stderr = self.log_file_stderr.readlines() + for line in line_stderr: + self.stderr += line + return self.returncode, self.stdout, self.stderr + + def cleanup(self, openstack_id): self.directory.cleanup() + self.redis.delete(openstack_id) + + def stop(self, openstack_id): + self.process.terminate() + rc, stdout, stderr = self.get_logs() + logs_to_save = { + "returncode": rc, + "stdout": stdout, + "stderr": stderr + } + self.redis.hmset("pb_logs_{0}".format(openstack_id), logs_to_save) + self.cleanup(openstack_id) diff --git a/docker-compose.yml b/docker-compose.yml index 11334cf3..b7fda09c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -20,3 +20,24 @@ services: - ./VirtualMachineService/config/config.yml:/code/VirtualMachineService/config.yml - ./VirtualMachineService/keys/localhost/server.pem:/code/VirtualMachineService/keys/server.pem command: python3 VirtualMachineServer.py /code/VirtualMachineService/config.yml + networks: + - portal + + redis: + image: redis:5.0.5 + expose: + - "6379" + networks: + - portal + + +networks: + portal: + name: portal_default + driver: bridge + driver_opts: + com.docker.network.driver.mtu: 1440 + default: + driver: bridge + driver_opts: + com.docker.network.driver.mtu: 1440 diff --git a/requirements.txt b/requirements.txt index 7f988207..fc0f1a4e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ ansible==2.8.0 ruamel.yaml<0.16.00 paramiko==2.6.0 pyvim==2.0.24 +redis==3.3.6