Skip to content
This repository has been archived by the owner on Nov 26, 2018. It is now read-only.

Commit

Permalink
Example using Celery for plugin queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ipmb committed Jun 25, 2015
1 parent 487c870 commit ab2ac41
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 80 deletions.
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
web: manage.py runserver $WEB_PORT
plugins: manage.py run_plugins
plugins: celery -A botbot worker -l info
bot: botbot-bot -v=2 -logtostderr=true
5 changes: 5 additions & 0 deletions botbot/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
Empty file.
Empty file.
18 changes: 0 additions & 18 deletions botbot/apps/plugins/management/commands/run_plugins.py

This file was deleted.

67 changes: 12 additions & 55 deletions botbot/apps/plugins/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,7 @@ class PluginRunner(object):
Calls to plugins are done via greenlets
"""

def __init__(self, use_gevent=False):
if use_gevent:
import gevent
self.gevent = gevent
def __init__(self):
self.bot_bus = redis.StrictRedis.from_url(
settings.REDIS_PLUGIN_QUEUE_URL)
self.storage = redis.StrictRedis.from_url(
Expand Down Expand Up @@ -176,32 +173,15 @@ def register(self, plugin):
getattr(self, attr.route_rule[0] + '_router').setdefault(
plugin.slug, []).append((attr.route_rule[1], attr, plugin))

def listen(self):
"""Listens for incoming messages on the Redis queue"""
while 1:
val = None
try:
val = self.bot_bus.blpop('q', 1)

# Track q length
ql = self.bot_bus.llen('q')
statsd.gauge(".".join(["plugins", "q"]), ql)

if val:
_, val = val
LOG.debug('Recieved: %s', val)
line = Line(json.loads(val), self)

# Calculate the transport latency between go and the plugins.
delta = datetime.utcnow().replace(tzinfo=utc) - line._received
statsd.timing(".".join(["plugins", "latency"]),
delta.total_seconds() * 1000)
def process_line(self, line_json):
LOG.debug('Recieved: %s', line_json)
line = Line(json.loads(line_json), self)
# Calculate the transport latency between go and the plugins.
delta = datetime.utcnow().replace(tzinfo=utc) - line._received
statsd.timing(".".join(["plugins", "latency"]),
delta.total_seconds() * 1000)
self.dispatch(line)

self.dispatch(line)
except Exception:
LOG.error("Line Dispatch Failed", exc_info=True, extra={
"line": val
})

def dispatch(self, line):
"""Given a line, dispatch it to the right plugins & functions."""
Expand All @@ -214,16 +194,11 @@ def dispatch(self, line):
# firehose gets everything, no rule matching
LOG.info('Match: %s.%s', plugin_slug, func.__name__)
with statsd.timer(".".join(["plugins", plugin_slug])):
# FIXME: This will not have correct timing if go back to
# gevent.
channel_plugin = self.setup_plugin_for_channel(
plugin.__class__, line)
new_func = log_on_error(LOG, getattr(channel_plugin,
func.__name__))
if hasattr(self, 'gevent'):
self.gevent.Greenlet.spawn(new_func, line)
else:
channel_plugin.respond(new_func(line))
channel_plugin.respond(new_func(line))

# pass line to other routers
if line._is_message:
Expand Down Expand Up @@ -252,30 +227,12 @@ def check_for_plugin_route_matches(self, line, router):
if match:
LOG.info('Match: %s.%s', plugin_slug, func.__name__)
with statsd.timer(".".join(["plugins", plugin_slug])):
# FIXME: This will not have correct timing if go back to
# gevent.
# Instantiate a plugin specific to this channel
channel_plugin = self.setup_plugin_for_channel(
plugin.__class__, line)
# get the method from the channel-specific plugin
new_func = log_on_error(LOG, getattr(channel_plugin,
func.__name__))
if hasattr(self, 'gevent'):
grnlt = self.gevent.Greenlet(new_func, line,
**match.groupdict())
grnlt.link_value(channel_plugin.greenlet_respond)
grnlt.start()
else:
channel_plugin.respond(new_func(line,
**match.groupdict()))


def start_plugins(*args, **kwargs):
"""
Used by the management command to start-up plugin listener
and register the plugins.
"""
LOG.info('Starting plugins. Gevent=%s', kwargs['use_gevent'])
app = PluginRunner(**kwargs)
app.register_all_plugins()
app.listen()
channel_plugin.respond(new_func(line,
**match.groupdict()))
14 changes: 14 additions & 0 deletions botbot/apps/plugins/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from botbot.celery import app
from .runner import PluginRunner


runner = PluginRunner()
runner.register_all_plugins()

@app.task(bind=True)
def route_line(self, line_json):
try:
runner.process_line(line_json)
# For any error we retry after 10 seconds.
except Exception as exc:
raise self.retry(exc, countdown=10)
16 changes: 16 additions & 0 deletions botbot/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import absolute_import

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'botbot.settings')

from django.conf import settings

app = Celery('botbot')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
4 changes: 3 additions & 1 deletion botbot/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@
# Third party app settings
# ==============================================================================

# SOUTH_DATABASE_ADAPTERS = {'default': 'south.db.postgresql_psycopg2'}
CELERY_TASK_SERIALIZER='json'
CELERY_ACCEPT_CONTENT=['json']
BROKER_URL = REDIS_PLUGIN_QUEUE_URL

SOCIAL_AUTH_USER_MODEL = AUTH_USER_MODEL
SOCIAL_AUTH_PROTECTED_USER_FIELDS = ['email']
Expand Down
5 changes: 0 additions & 5 deletions manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@
import sys

if __name__ == "__main__":
if (len(sys.argv) > 1 and
'run_plugins' in sys.argv and '--with-gevent' in sys.argv):
# import gevent as soon as possible
from gevent import monkey; monkey.patch_all()
from psycogreen.gevent import patch_psycopg; patch_psycopg()

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "botbot.settings")
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
django==1.7.2
celery==3.1.18

pytz
psycopg2==2.5.2
Expand Down

0 comments on commit ab2ac41

Please sign in to comment.