From ed362ec764e274373e4aa3cb4425a0868edaa47b Mon Sep 17 00:00:00 2001 From: Kirill Zaitsev Date: Thu, 29 Dec 2016 16:17:13 +0300 Subject: [PATCH] Add runner service, that handles runs from queue Includes simple shell driver for testing purposes --- requirements.txt | 2 + runbook/config.py | 19 ++++- runbook/drivers/__init__.py | 0 runbook/drivers/base.py | 30 +++++++ runbook/drivers/shell.py | 62 ++++++++++++++ runbook/runner.py | 164 ++++++++++++++++++++++++++++++++++++ 6 files changed, 274 insertions(+), 3 deletions(-) create mode 100644 runbook/drivers/__init__.py create mode 100644 runbook/drivers/base.py create mode 100644 runbook/drivers/shell.py create mode 100644 runbook/runner.py diff --git a/requirements.txt b/requirements.txt index 61b4099..270d822 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,5 @@ flask-helpers==0.1 gunicorn==19.6.0 jsonschema>=2.0.0,!=2.5.0,<3.0.0 # MIT six==1.10.0 +schedule==0.4.2 +eventlet==0.20.0 diff --git a/runbook/config.py b/runbook/config.py index 8c40db9..9d26b73 100644 --- a/runbook/config.py +++ b/runbook/config.py @@ -23,7 +23,8 @@ CONF = { "reader": None, - "writer": None + "writer": None, + "runner": None, } DEFAULT_CONF = { @@ -50,7 +51,19 @@ "connection": [{"host": "127.0.0.1", "port": 9200}] }, "regions": [], - } + }, + "runner": { + "flask": { + "HOST": "0.0.0.0", + "PORT": 5002, + "DEBUG": False + }, + "backend": { + "type": "elastic", + "connection": [{"host": "127.0.0.1", "port": 9200}] + }, + "regions": [], + }, } @@ -108,7 +121,7 @@ def get_config(api_type=None): :returns: application config :rtype: dict """ - if api_type not in ["reader", "writer"]: + if api_type not in ["reader", "writer", "runner"]: raise RuntimeError("Unknown api type '{}'".format(api_type)) global CONF diff --git a/runbook/drivers/__init__.py b/runbook/drivers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/runbook/drivers/base.py b/runbook/drivers/base.py new file mode 100644 index 0000000..2b79e8d --- /dev/null +++ b/runbook/drivers/base.py @@ -0,0 +1,30 @@ +# Copyright 2016: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class Driver(object): + + interpreters = { + "bash": "/bin/bash", + "python": "/bin/python", + } + + @classmethod + def initialise(cls): + raise NotImplemented + + @classmethod + def run(cls, runbook, parameters): + raise NotImplemented diff --git a/runbook/drivers/shell.py b/runbook/drivers/shell.py new file mode 100644 index 0000000..1d9b8b1 --- /dev/null +++ b/runbook/drivers/shell.py @@ -0,0 +1,62 @@ +# Copyright 2016: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import logging +import os +import subprocess +import tempfile + +from runbook.drivers import base + + +class Driver(base.Driver): + + @classmethod + def initialise(cls): + pass + + @classmethod + def run(cls, runbook, parameters): + logging.info("Running runbook '{}' with parameters '{}'".format( + runbook, parameters)) + f = tempfile.NamedTemporaryFile() + f.write(runbook["runbook"]) + f.flush() + + returncode = 0 + interpreter = cls.interpreters.get(runbook.get("type")) + if not interpreter: + return { + "return_code": -1, + "output": "Don't know how to run '{}' type runbook".format( + runbook.get("type")), + } + + env = copy.deepcopy(os.environ) + env.update(parameters) + try: + output = subprocess.check_output( + [interpreter, f.name], + stderr=subprocess.STDOUT, + env=env) + except subprocess.CalledProcessError as e: + output = e.output + returncode = e.returncode + + return { + "return_code": returncode, + "output": output, + } diff --git a/runbook/runner.py b/runbook/runner.py new file mode 100644 index 0000000..c986a2f --- /dev/null +++ b/runbook/runner.py @@ -0,0 +1,164 @@ +# Copyright 2016: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import datetime +import importlib +import logging +import sys +import time + +import elasticsearch.exceptions +import eventlet +import schedule + +from runbook import config +from runbook import storage + +eventlet.monkey_patch() + +API_TYPE = "runner" +CONF = config.get_config(API_TYPE) +POOL = eventlet.GreenPool() + +LOG = logging.getLogger("runner") +LOG.setLevel(logging.INFO) + +INNER_QUERY_WITH_PARENT = { + "has_parent": { + "inner_hits": { + "name": "parent", + "size": 1, + }, + "type": "runbook", + "query": { + "match_all": {} + } + } +} + +RUNS_QUERY = { + "version": True, + "query": { + "bool": { + "filter": [ + INNER_QUERY_WITH_PARENT, + {"term": {"status": "scheduled"}}, + ] + } + } +} + + +def handle_hit(hit, index_name, driver): + es = storage.get_elasticsearch(API_TYPE) + try: + es.update( + index=index_name, + doc_type="run", + id=hit["_id"], + routing=hit["_routing"], + version=hit["_version"], + body={"doc": {"status": "started"}} + ) + LOG.info("Handling run '{}'".format(hit['_id'])) + except elasticsearch.exceptions.ConflictError: + LOG.warning("Ignoring run '{}' with id '{}'" + "It's already being handled by other thread".format( + hit["_source"], hit["_id"])) + return False + + parameters = hit.get("parameters", {}) + try: + runbook = hit['inner_hits']['parent']['hits']['hits'][0]["_source"] + except (IndexError, KeyError): + LOG.exception("Couldn't find runbook to run for {}".format(hit)) + return False + + run_result = {} + try: + run_result = driver.run(runbook, parameters) + except Exception: + LOG.exception("Got exception, when running '{}', marking " + "run as 'failed'".format(hit["_source"])) + + LOG.info("Finished run '{}': '{}'".format(hit['_id'], run_result)) + + end_status = "finished" if run_result.get("return_code") == 0 else "failed" + # end_status = "scheduled" # FIXME + + now = datetime.datetime.now() + try: + es.update( + index=index_name, + doc_type="run", + id=hit["_id"], + routing=hit["_routing"], + body={"doc": { + "status": end_status, + "output": run_result.get("output"), + "return_code": run_result.get("return_code"), + "updated_at": now, + }} + ) + LOG.info("Updated run '{}' status".format(hit['_id'])) + except elasticsearch.exceptions.TransportError: + LOG.exception("Got exception, when finilazing run " + "'{}'".format(hit["_source"])) + return False + return True + + +def job(): + driver_name = CONF.get("driver", "shell") + try: + driver_module = importlib.import_module( + "runbook.drivers." + driver_name) + except ImportError: + LOG.critical("No driver named '{}'".format(driver_name)) + return + + driver = driver_module.Driver + + es = storage.get_elasticsearch(API_TYPE) + for region in CONF["regions"]: + index_name = "ms_runbooks_{}".format(region) + + query = copy.deepcopy(RUNS_QUERY) + + result = es.search(index=index_name, + doc_type="run", + body=query) + for hit in result['hits']['hits']: + POOL.spawn_n(handle_hit, hit, index_name, driver) + + +def main(): + + run_every_seconds = CONF.get("run_every_seconds", 30) + schedule.every(run_every_seconds).seconds.do(job) + + job() + + while True: + schedule.run_pending() + time.sleep(1) + + +if __name__ == "__main__": + try: + sys.exit(main()) + except KeyboardInterrupt: + LOG.error("Got KeyboardInterrupt, exiting.")