Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better error handling in base consumer implementation. #479

Open
wants to merge 23 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e583ef8
Implement simple nack and remove _testing / message_processed hacks i…
hodgestar Mar 5, 2013
9962212
Re-order methods a bit for readability and hide internal method behin…
hodgestar Mar 5, 2013
84018c7
Clean up consume loop.
hodgestar Mar 5, 2013
2142fe3
Fix my common subclasses misspelling.
hodgestar Mar 5, 2013
bb981e4
Remove unnecessary returning of deferred from Consumer.start().
hodgestar Mar 5, 2013
ffceb80
Commit non-inlineCallbacks version for comparison.
hodgestar Mar 5, 2013
f0904f1
Add failing test for basic_ack.
hodgestar Mar 6, 2013
801d88c
Properly handle delivery using basic_get().
Mar 6, 2013
b709ff8
Add tests for basic_nack and basic_ack.
hodgestar Mar 6, 2013
cbbb147
Fix test_consume which was pre-populating log with the result consume…
hodgestar Mar 6, 2013
8db0561
Add test for failing consume.
hodgestar Mar 6, 2013
539d663
Remove non-inlineCallbacks version of _consume.
hodgestar Mar 6, 2013
a7f4912
Log message content when a message fails.
hodgestar Mar 6, 2013
052200b
Remove unused imports.
hodgestar Apr 11, 2013
82ec3d2
Add means for specifying custom consumer error handles.
hodgestar Apr 11, 2013
95c3cdf
Add errback support to consume helper method.
hodgestar Apr 11, 2013
90c463d
Merge branch 'develop' into feature/issue-479-better-error-handling-i…
hodgestar Apr 16, 2013
35b8bf9
Merge branch 'develop' into feature/issue-479-better-error-handling-i…
hodgestar Apr 16, 2013
15eb8e0
Add more context to consumer_error (exception raised and full AMQP me…
hodgestar Apr 17, 2013
713953c
Fix failing tests.
hodgestar Apr 17, 2013
430a458
Move exchange details into an Exchange class and set up a default dea…
hodgestar Apr 17, 2013
a5dcbb1
Add TODO for handling dead-lettering in fake_amqp.
hodgestar Apr 17, 2013
e3432aa
Merge branch 'develop' into feature/issue-479-better-error-handling-i…
hodgestar Jun 21, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions vumi/blinkenlights/heartbeat/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from twisted.internet.task import LoopingCall

from vumi.service import Publisher
from vumi.service import Publisher, Exchange
from vumi.message import Message
from vumi import log

Expand Down Expand Up @@ -38,10 +38,11 @@ class HeartBeatPublisher(Publisher):

HEARTBEAT_PERIOD_SECS = 10

exchange = Exchange(name="vumi.health",
exchange_type="direct", durable=True)

def __init__(self, gen_attrs_func):
self.routing_key = "heartbeat.inbound"
self.exchange_name = "vumi.health"
self.durable = True
self._task = None
self._gen_attrs_func = gen_attrs_func

Expand Down
10 changes: 3 additions & 7 deletions vumi/blinkenlights/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from twisted.internet.task import LoopingCall
from twisted.python import log

from vumi.service import Publisher, Consumer
from vumi.service import Publisher, Consumer, Exchange
from vumi.blinkenlights.message20110818 import MetricMessage

import time
Expand All @@ -27,10 +27,8 @@ class MetricManager(Publisher):
:param on_publish:
Function to call immediately after metrics after published.
"""
exchange_name = "vumi.metrics"
exchange_type = "direct"
exchange = Exchange("vumi.metrics", exchange_type="direct", durable=True)
routing_key = "vumi.metrics"
durable = True
auto_delete = False
delivery_mode = 2

Expand Down Expand Up @@ -272,10 +270,8 @@ class MetricsConsumer(Consumer):
aggregator (list of aggregator names) and values (a
list of timestamp and value paits).
"""
exchange_name = "vumi.metrics"
exchange_type = "direct"
exchange = Exchange("vumi.metrics", exchange_type="direct", durable=True)
routing_key = "vumi.metrics"
durable = True

def __init__(self, callback):
self.callback = callback
Expand Down
26 changes: 10 additions & 16 deletions vumi/blinkenlights/metrics_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from twisted.internet.task import LoopingCall
from twisted.internet.protocol import DatagramProtocol

from vumi.service import Consumer, Publisher, Worker
from vumi.service import Consumer, Publisher, Worker, Exchange
from vumi.blinkenlights.metrics import (MetricsConsumer, MetricManager, Count,
Metric, Timer, Aggregator)
from vumi.blinkenlights.message20110818 import MetricMessage
Expand All @@ -27,9 +27,8 @@ class AggregatedMetricConsumer(Consumer):
parameters are metric_name (str) and values (a list of
timestamp and value pairs).
"""
exchange_name = "vumi.metrics.aggregates"
exchange_type = "direct"
durable = True
exchange = Exchange("vumi.metrics.aggregates",
exchange_type="direct", durable=True)
routing_key = "vumi.metrics.aggregates"

def __init__(self, callback):
Expand All @@ -45,9 +44,8 @@ def consume_message(self, vumi_message):
class AggregatedMetricPublisher(Publisher):
"""Publishes aggregated metrics.
"""
exchange_name = "vumi.metrics.aggregates"
exchange_type = "direct"
durable = True
exchange = Exchange("vumi.metrics.aggregates",
exchange_type="direct", durable=True)
routing_key = "vumi.metrics.aggregates"

def publish_aggregate(self, metric_name, timestamp, value):
Expand All @@ -70,9 +68,8 @@ class TimeBucketConsumer(Consumer):
aggregator (list of aggregator names) and values (a
list of timestamp and value pairs).
"""
exchange_name = "vumi.metrics.buckets"
exchange_type = "direct"
durable = True
exchange = Exchange("vumi.metrics.buckets",
exchange_type="direct", durable=True)
ROUTING_KEY_TEMPLATE = "bucket.%d"

def __init__(self, bucket, callback):
Expand All @@ -97,9 +94,8 @@ class TimeBucketPublisher(Publisher):
bucket_size : int, in seconds
Size of each time bucket in seconds.
"""
exchange_name = "vumi.metrics.buckets"
exchange_type = "direct"
durable = True
exchange = Exchange("vumi.metrics.buckets",
exchange_type="direct", durable=True)
ROUTING_KEY_TEMPLATE = "bucket.%d"

def __init__(self, buckets, bucket_size):
Expand Down Expand Up @@ -281,9 +277,7 @@ def consume_metrics(self, metric_name, values):
class GraphitePublisher(Publisher):
"""Publisher for sending messages to Graphite."""

exchange_name = "graphite"
exchange_type = "topic"
durable = True
exchange = Exchange("graphite", exchange_type="topic", durable=True)
auto_delete = False
delivery_mode = 2
require_bind = False # Graphite uses a topic exchange
Expand Down
Loading