Skip to content

Commit

Permalink
Add runner service, that handles runs from queue
Browse files Browse the repository at this point in the history
Includes simple shell driver for testing purposes
  • Loading branch information
teferi committed Dec 29, 2016
1 parent 51e9b5a commit ed362ec
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 3 deletions.
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ flask-helpers==0.1
gunicorn==19.6.0
jsonschema>=2.0.0,!=2.5.0,<3.0.0 # MIT
six==1.10.0
schedule==0.4.2
eventlet==0.20.0
19 changes: 16 additions & 3 deletions runbook/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

CONF = {
"reader": None,
"writer": None
"writer": None,
"runner": None,
}

DEFAULT_CONF = {
Expand All @@ -50,7 +51,19 @@
"connection": [{"host": "127.0.0.1", "port": 9200}]
},
"regions": [],
}
},
"runner": {
"flask": {
"HOST": "0.0.0.0",
"PORT": 5002,
"DEBUG": False
},
"backend": {
"type": "elastic",
"connection": [{"host": "127.0.0.1", "port": 9200}]
},
"regions": [],
},
}


Expand Down Expand Up @@ -108,7 +121,7 @@ def get_config(api_type=None):
:returns: application config
:rtype: dict
"""
if api_type not in ["reader", "writer"]:
if api_type not in ["reader", "writer", "runner"]:
raise RuntimeError("Unknown api type '{}'".format(api_type))

global CONF
Expand Down
Empty file added runbook/drivers/__init__.py
Empty file.
30 changes: 30 additions & 0 deletions runbook/drivers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2016: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


class Driver(object):

interpreters = {
"bash": "/bin/bash",
"python": "/bin/python",
}

@classmethod
def initialise(cls):
raise NotImplemented

@classmethod
def run(cls, runbook, parameters):
raise NotImplemented
62 changes: 62 additions & 0 deletions runbook/drivers/shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2016: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import copy
import logging
import os
import subprocess
import tempfile

from runbook.drivers import base


class Driver(base.Driver):

@classmethod
def initialise(cls):
pass

@classmethod
def run(cls, runbook, parameters):
logging.info("Running runbook '{}' with parameters '{}'".format(
runbook, parameters))
f = tempfile.NamedTemporaryFile()
f.write(runbook["runbook"])
f.flush()

returncode = 0
interpreter = cls.interpreters.get(runbook.get("type"))
if not interpreter:
return {
"return_code": -1,
"output": "Don't know how to run '{}' type runbook".format(
runbook.get("type")),
}

env = copy.deepcopy(os.environ)
env.update(parameters)
try:
output = subprocess.check_output(
[interpreter, f.name],
stderr=subprocess.STDOUT,
env=env)
except subprocess.CalledProcessError as e:
output = e.output
returncode = e.returncode

return {
"return_code": returncode,
"output": output,
}
164 changes: 164 additions & 0 deletions runbook/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# Copyright 2016: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import copy
import datetime
import importlib
import logging
import sys
import time

import elasticsearch.exceptions
import eventlet
import schedule

from runbook import config
from runbook import storage

eventlet.monkey_patch()

API_TYPE = "runner"
CONF = config.get_config(API_TYPE)
POOL = eventlet.GreenPool()

LOG = logging.getLogger("runner")
LOG.setLevel(logging.INFO)

INNER_QUERY_WITH_PARENT = {
"has_parent": {
"inner_hits": {
"name": "parent",
"size": 1,
},
"type": "runbook",
"query": {
"match_all": {}
}
}
}

RUNS_QUERY = {
"version": True,
"query": {
"bool": {
"filter": [
INNER_QUERY_WITH_PARENT,
{"term": {"status": "scheduled"}},
]
}
}
}


def handle_hit(hit, index_name, driver):
es = storage.get_elasticsearch(API_TYPE)
try:
es.update(
index=index_name,
doc_type="run",
id=hit["_id"],
routing=hit["_routing"],
version=hit["_version"],
body={"doc": {"status": "started"}}
)
LOG.info("Handling run '{}'".format(hit['_id']))
except elasticsearch.exceptions.ConflictError:
LOG.warning("Ignoring run '{}' with id '{}'"
"It's already being handled by other thread".format(
hit["_source"], hit["_id"]))
return False

parameters = hit.get("parameters", {})
try:
runbook = hit['inner_hits']['parent']['hits']['hits'][0]["_source"]
except (IndexError, KeyError):
LOG.exception("Couldn't find runbook to run for {}".format(hit))
return False

run_result = {}
try:
run_result = driver.run(runbook, parameters)
except Exception:
LOG.exception("Got exception, when running '{}', marking "
"run as 'failed'".format(hit["_source"]))

LOG.info("Finished run '{}': '{}'".format(hit['_id'], run_result))

end_status = "finished" if run_result.get("return_code") == 0 else "failed"
# end_status = "scheduled" # FIXME

now = datetime.datetime.now()
try:
es.update(
index=index_name,
doc_type="run",
id=hit["_id"],
routing=hit["_routing"],
body={"doc": {
"status": end_status,
"output": run_result.get("output"),
"return_code": run_result.get("return_code"),
"updated_at": now,
}}
)
LOG.info("Updated run '{}' status".format(hit['_id']))
except elasticsearch.exceptions.TransportError:
LOG.exception("Got exception, when finilazing run "
"'{}'".format(hit["_source"]))
return False
return True


def job():
driver_name = CONF.get("driver", "shell")
try:
driver_module = importlib.import_module(
"runbook.drivers." + driver_name)
except ImportError:
LOG.critical("No driver named '{}'".format(driver_name))
return

driver = driver_module.Driver

es = storage.get_elasticsearch(API_TYPE)
for region in CONF["regions"]:
index_name = "ms_runbooks_{}".format(region)

query = copy.deepcopy(RUNS_QUERY)

result = es.search(index=index_name,
doc_type="run",
body=query)
for hit in result['hits']['hits']:
POOL.spawn_n(handle_hit, hit, index_name, driver)


def main():

run_every_seconds = CONF.get("run_every_seconds", 30)
schedule.every(run_every_seconds).seconds.do(job)

job()

while True:
schedule.run_pending()
time.sleep(1)


if __name__ == "__main__":
try:
sys.exit(main())
except KeyboardInterrupt:
LOG.error("Got KeyboardInterrupt, exiting.")

0 comments on commit ed362ec

Please sign in to comment.