From e583ef879a19cafe5bee91f3d5e4bad474ce2225 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Tue, 5 Mar 2013 15:53:19 +0200 Subject: [PATCH 01/20] Implement simple nack and remove _testing / message_processed hacks in consumer. --- vumi/service.py | 22 +++++++++++++++------- vumi/tests/fake_amqp.py | 31 ++++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index f58d1f968..9bdd526e2 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -289,9 +289,8 @@ class Consumer(object): def start(self, channel, queue): self.channel = channel self.queue = queue - self.keep_consuming = True - self._testing = hasattr(channel, 'message_processed') self.paused = self.start_paused + self.keep_consuming = True @inlineCallbacks def read_messages(): @@ -322,8 +321,6 @@ def unpause(self): def consume(self, message): result = yield self.consume_message(self.message_class.from_json( message.content.body)) - if self._testing: - self.channel.message_processed() if result is not False: returnValue(self.ack(message)) else: @@ -331,11 +328,22 @@ def consume(self, message): 'Not acknowledging AMQ message' % result) def consume_message(self, message): - """helper method, override in implementation""" + """Fallback consume method. + + Logs the message at the `info` logging level. + + Should be overridden by sub-classes. + """ log.msg("Received message: %s" % message) - def ack(self, message): - self.channel.basic_ack(message.delivery_tag, True) + def ack(self, message, multiple=False): + """Acknowledge a message as processed.""" + self.channel.basic_ack(message.delivery_tag, multiple=multiple) + + def nack(self, message, multiple=False, requeue=True): + """Reject a message as unprocessed.""" + self.channel.basic_reject(message.delivery_tag, multiple=multiple, + requeue=requeue) @inlineCallbacks def stop(self): diff --git a/vumi/tests/fake_amqp.py b/vumi/tests/fake_amqp.py index 136d76d6b..25a4dce42 100644 --- a/vumi/tests/fake_amqp.py +++ b/vumi/tests/fake_amqp.py @@ -165,6 +165,12 @@ def basic_get(self, queue): def basic_ack(self, queue, delivery_tag): self._get_queue(queue).ack(delivery_tag) + self._message_processed() + return None + + def basic_nack(self, queue, delivery_tag): + self._get_queue(queue).nack(delivery_tag) + self._message_processed() return None def deliver_to_channels(self): @@ -176,7 +182,7 @@ def deliver_to_channels(self): self.try_deliver_to_channel(channel) # Process the sentinel "message" we added in kick_delivery(). - self.message_processed() + self._message_processed() def try_deliver_to_channel(self, channel): if not channel.deliverable(): @@ -264,7 +270,7 @@ def publish_raw(self, exchange, routing_key, data): amq_message = Content(data) return self.basic_publish(exchange, routing_key, amq_message) - def message_processed(self): + def _message_processed(self): assert self._delivering is not None self._delivering['count'] -= 1 if self._delivering['count'] <= 0: @@ -340,6 +346,15 @@ def basic_ack(self, delivery_tag, multiple): if (dtag == delivery_tag): return resp + def basic_nack(self, delivery_tag, multiple, requeue): + assert delivery_tag in [d for d, _q in self.unacked] + for dtag, queue in self.unacked[:]: + if multiple or (dtag == delivery_tag): + self.unacked.remove((dtag, queue)) + resp = self.broker.basic_nack(queue, dtag) + if (dtag == delivery_tag): + return resp + def deliverable(self): if not self.flow_active: return False @@ -359,13 +374,6 @@ def basic_get(self, queue): msg['routing_key'], dtag) return Message(mkMethod("get-empty", 72)) - def message_processed(self): - """ - Notify the broker that a message has been processed, in order - to make delivery sane. - """ - self.broker.message_processed() - class FakeAMQPExchange(object): def __init__(self, name): @@ -447,6 +455,11 @@ def put(self, exchange, routing_key, content): def ack(self, delivery_tag): self.unacked_messages.pop(delivery_tag) + def nack(self, delivery_tag): + msg = self.unacked_messages.pop(delivery_tag) + self.put(msg['exchange'], msg['routing_key'], + msg['content']) + def get_message(self): try: msg = self.messages.pop(0) From 9962212df14dd1163f19d03dd7b3e9371453d8f4 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Tue, 5 Mar 2013 16:06:36 +0200 Subject: [PATCH 02/20] Re-order methods a bit for readability and hide internal method behind an underscore. --- vumi/service.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index 9bdd526e2..c0baa858d 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -301,7 +301,7 @@ def read_messages(): if isinstance(message, QueueCloseMarker): log.msg("Queue closed.") return - yield self.consume(message) + yield self._consume(message) except txamqp.queue.Closed, e: log.err("Queue has closed", e) @@ -309,16 +309,8 @@ def read_messages(): yield None returnValue(self) - def pause(self): - self.paused = True - return self.channel.channel_flow(active=False) - - def unpause(self): - self.paused = False - return self.channel.channel_flow(active=True) - @inlineCallbacks - def consume(self, message): + def _consume(self, message): result = yield self.consume_message(self.message_class.from_json( message.content.body)) if result is not False: @@ -336,6 +328,14 @@ def consume_message(self, message): """ log.msg("Received message: %s" % message) + def pause(self): + self.paused = True + return self.channel.channel_flow(active=False) + + def unpause(self): + self.paused = False + return self.channel.channel_flow(active=True) + def ack(self, message, multiple=False): """Acknowledge a message as processed.""" self.channel.basic_ack(message.delivery_tag, multiple=multiple) From 84018c79eac239526289cc7cef1ebb18d63ce397 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Tue, 5 Mar 2013 16:24:08 +0200 Subject: [PATCH 03/20] Clean up consume loop. --- vumi/service.py | 56 ++++++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 31 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index c0baa858d..b1ddd7129 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -6,7 +6,8 @@ from twisted.python import log from twisted.application.service import MultiService from twisted.application.internet import TCPClient -from twisted.internet.defer import inlineCallbacks, returnValue +from twisted.internet.defer import inlineCallbacks, returnValue, succeed +from twisted.internet.task import LoopingCall from twisted.internet import protocol, reactor from twisted.web.resource import Resource import txamqp @@ -285,39 +286,31 @@ class Consumer(object): message_class = Message start_paused = False - @inlineCallbacks + _consume_loop = None + def start(self, channel, queue): + log.msg("Consumer starting...") self.channel = channel self.queue = queue self.paused = self.start_paused - self.keep_consuming = True - - @inlineCallbacks - def read_messages(): - log.msg("Consumer starting...") - try: - while self.keep_consuming: - message = yield self.queue.get() - if isinstance(message, QueueCloseMarker): - log.msg("Queue closed.") - return - yield self._consume(message) - except txamqp.queue.Closed, e: - log.err("Queue has closed", e) - - read_messages() - yield None - returnValue(self) + self._consume_loop = LoopingCall(self._consume) + self._consume_done = self._consume_loop.start(0) + return succeed(self) @inlineCallbacks - def _consume(self, message): - result = yield self.consume_message(self.message_class.from_json( - message.content.body)) - if result is not False: - returnValue(self.ack(message)) - else: - log.msg('Received %s as a return value consume_message. ' - 'Not acknowledging AMQ message' % result) + def _consume(self): + message = yield self.queue.get() + if isinstance(message, QueueCloseMarker): + log.msg("Queue closed.") + self._consume_loop.stop() + return + try: + yield self.consume_message( + self.message_class.from_json(message.content.body)) + self.ack(message) + except Exception: + log.err() + self.nack(message) def consume_message(self, message): """Fallback consume method. @@ -348,13 +341,14 @@ def nack(self, message, multiple=False, requeue=True): @inlineCallbacks def stop(self): log.msg("Consumer stopping...") - self.keep_consuming = False # This actually closes the channel on the server yield self.channel.channel_close() # This just marks the channel as closed on the client self.channel.close(None) - self.queue.put(QueueCloseMarker()) - returnValue(self.keep_consuming) + # This waits for the client to consume its current messages + if self._consume_loop is not None: + self.queue.put(QueueCloseMarker()) + yield self._consume_done class DynamicConsumer(Consumer): From 2142fe3980a6d6f64a5c77ffe5daa338603625a6 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Tue, 5 Mar 2013 16:32:13 +0200 Subject: [PATCH 04/20] Fix my common subclasses misspelling. --- vumi/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vumi/service.py b/vumi/service.py index b1ddd7129..edc64fcd9 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -317,7 +317,7 @@ def consume_message(self, message): Logs the message at the `info` logging level. - Should be overridden by sub-classes. + Should be overridden by subclasses. """ log.msg("Received message: %s" % message) From bb981e490b52faea9ec6b9b5d53360d5ed1e8e90 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Tue, 5 Mar 2013 16:36:56 +0200 Subject: [PATCH 05/20] Remove unnecessary returning of deferred from Consumer.start(). --- vumi/service.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index edc64fcd9..c2c408db9 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -6,7 +6,7 @@ from twisted.python import log from twisted.application.service import MultiService from twisted.application.internet import TCPClient -from twisted.internet.defer import inlineCallbacks, returnValue, succeed +from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.task import LoopingCall from twisted.internet import protocol, reactor from twisted.web.resource import Resource @@ -295,7 +295,6 @@ def start(self, channel, queue): self.paused = self.start_paused self._consume_loop = LoopingCall(self._consume) self._consume_done = self._consume_loop.start(0) - return succeed(self) @inlineCallbacks def _consume(self): From ffceb800a085bd281764508ca7c0228e2b048738 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Tue, 5 Mar 2013 17:08:47 +0200 Subject: [PATCH 06/20] Commit non-inlineCallbacks version for comparison. --- vumi/service.py | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index c2c408db9..c3fadf893 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -6,9 +6,11 @@ from twisted.python import log from twisted.application.service import MultiService from twisted.application.internet import TCPClient -from twisted.internet.defer import inlineCallbacks, returnValue +from twisted.internet.defer import ( + inlineCallbacks, returnValue, maybeDeferred, fail) from twisted.internet.task import LoopingCall from twisted.internet import protocol, reactor +from twisted.python.failure import Failure from twisted.web.resource import Resource import txamqp from txamqp.client import TwistedDelegate @@ -299,10 +301,6 @@ def start(self, channel, queue): @inlineCallbacks def _consume(self): message = yield self.queue.get() - if isinstance(message, QueueCloseMarker): - log.msg("Queue closed.") - self._consume_loop.stop() - return try: yield self.consume_message( self.message_class.from_json(message.content.body)) @@ -311,6 +309,32 @@ def _consume(self): log.err() self.nack(message) + def _process_message(self, message): + if isinstance(message, QueueCloseMarker): + log.msg("Queue closed.") + self._consume_loop.stop() + else: + try: + message_obj = self.message_class.from_json( + message.content.body) + d = maybeDeferred(self.consume_message, message_obj) + except: + d = fail(Failure()) + d.addCallback(self._ack_message, message) + d.addErrback(self._nack_message, message) + return d + + def _ack_message(self, result, message): + self.ack(message) + + def _nack_message(self, f, message): + log.err(f) + self.nack(message) + + def _consume(self): + d = self.queue.get() + return d.addCallback(self._process_message) + def consume_message(self, message): """Fallback consume method. From f0904f1558710a5badfa92acfba252c536ecd6e4 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 6 Mar 2013 13:28:56 +0200 Subject: [PATCH 07/20] Add failing test for basic_ack. --- vumi/tests/test_fake_amqp.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/vumi/tests/test_fake_amqp.py b/vumi/tests/test_fake_amqp.py index c0ed46878..2061359f4 100644 --- a/vumi/tests/test_fake_amqp.py +++ b/vumi/tests/test_fake_amqp.py @@ -178,6 +178,13 @@ def test_basic_get(self): self.assertEqual('blah', self.chan1.basic_get('q1').content.body) self.assertEqual('get-empty', self.chan1.basic_get('q1').method.name) + def test_basic_ack(self): + self.set_up_broker() + self.chan1.queue_bind('q1', 'direct', 'routing.key') + self.chan1.basic_publish('direct', 'routing.key', mkmsg('blah')) + msg = self.chan1.basic_get('q1') + self.chan1.basic_ack(msg.delivery_tag, False) + def test_consumer_wrangling(self): self.set_up_broker() self.chan1.queue_bind('q1', 'direct', 'foo') From 801d88c1e5c5cc9157690a52a57e8270c024af1f Mon Sep 17 00:00:00 2001 From: Jeremy Thurgood Date: Wed, 6 Mar 2013 14:12:46 +0200 Subject: [PATCH 08/20] Properly handle delivery using basic_get(). --- vumi/tests/fake_amqp.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/vumi/tests/fake_amqp.py b/vumi/tests/fake_amqp.py index 25a4dce42..1ee8dad6e 100644 --- a/vumi/tests/fake_amqp.py +++ b/vumi/tests/fake_amqp.py @@ -161,7 +161,15 @@ def basic_publish(self, exchange, routing_key, content): return None def basic_get(self, queue): - return self._get_queue(queue).get_message() + dtag, msg = self._get_queue(queue).get_message() + if dtag is not None: + if self._delivering is None: + self._delivering = { + 'deferred': Deferred(), + 'count': 0, + } + self._delivering['count'] += 1 + return (dtag, msg) def basic_ack(self, queue, delivery_tag): self._get_queue(queue).ack(delivery_tag) From b709ff80a04875829ea6e534333696653499f83b Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 6 Mar 2013 14:34:48 +0200 Subject: [PATCH 09/20] Add tests for basic_nack and basic_ack. --- vumi/tests/fake_amqp.py | 15 ++++++++------- vumi/tests/test_fake_amqp.py | 26 ++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/vumi/tests/fake_amqp.py b/vumi/tests/fake_amqp.py index 1ee8dad6e..d9fe9a661 100644 --- a/vumi/tests/fake_amqp.py +++ b/vumi/tests/fake_amqp.py @@ -176,8 +176,8 @@ def basic_ack(self, queue, delivery_tag): self._message_processed() return None - def basic_nack(self, queue, delivery_tag): - self._get_queue(queue).nack(delivery_tag) + def basic_nack(self, queue, delivery_tag, requeue): + self._get_queue(queue).nack(delivery_tag, requeue) self._message_processed() return None @@ -359,7 +359,7 @@ def basic_nack(self, delivery_tag, multiple, requeue): for dtag, queue in self.unacked[:]: if multiple or (dtag == delivery_tag): self.unacked.remove((dtag, queue)) - resp = self.broker.basic_nack(queue, dtag) + resp = self.broker.basic_nack(queue, dtag, requeue) if (dtag == delivery_tag): return resp @@ -461,12 +461,13 @@ def put(self, exchange, routing_key, content): }) def ack(self, delivery_tag): - self.unacked_messages.pop(delivery_tag) + return self.unacked_messages.pop(delivery_tag) - def nack(self, delivery_tag): + def nack(self, delivery_tag, requeue): msg = self.unacked_messages.pop(delivery_tag) - self.put(msg['exchange'], msg['routing_key'], - msg['content']) + if requeue: + self.put(msg['exchange'], msg['routing_key'], + mkContent(msg['content'])) def get_message(self): try: diff --git a/vumi/tests/test_fake_amqp.py b/vumi/tests/test_fake_amqp.py index 2061359f4..365402216 100644 --- a/vumi/tests/test_fake_amqp.py +++ b/vumi/tests/test_fake_amqp.py @@ -183,7 +183,33 @@ def test_basic_ack(self): self.chan1.queue_bind('q1', 'direct', 'routing.key') self.chan1.basic_publish('direct', 'routing.key', mkmsg('blah')) msg = self.chan1.basic_get('q1') + self.assertTrue(msg.delivery_tag in self.q1.unacked_messages) self.chan1.basic_ack(msg.delivery_tag, False) + self.assertTrue(msg.delivery_tag not in self.q1.unacked_messages) + + def test_basic_nack_with_requeue(self): + self.set_up_broker() + self.chan1.queue_bind('q1', 'direct', 'routing.key') + self.chan1.basic_publish('direct', 'routing.key', mkmsg('blah')) + msg = self.chan1.basic_get('q1') + self.assertTrue(msg.delivery_tag in self.q1.unacked_messages) + self.assertEqual(self.q1.messages, []) + self.chan1.basic_nack(msg.delivery_tag, False, requeue=True) + self.assertTrue(msg.delivery_tag not in self.q1.unacked_messages) + self.assertEqual(self.q1.messages, + [{'content': 'blah', 'routing_key': 'routing.key', + 'exchange': 'direct'}]) + + def test_basic_nack_without_requeue(self): + self.set_up_broker() + self.chan1.queue_bind('q1', 'direct', 'routing.key') + self.chan1.basic_publish('direct', 'routing.key', mkmsg('blah')) + msg = self.chan1.basic_get('q1') + self.assertTrue(msg.delivery_tag in self.q1.unacked_messages) + self.assertEqual(self.q1.messages, []) + self.chan1.basic_nack(msg.delivery_tag, False, requeue=False) + self.assertTrue(msg.delivery_tag not in self.q1.unacked_messages) + self.assertEqual(self.q1.messages, []) def test_consumer_wrangling(self): self.set_up_broker() From cbbb147c09b52deb013d3e07583eccf859b97732 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 6 Mar 2013 14:59:25 +0200 Subject: [PATCH 10/20] Fix test_consume which was pre-populating log with the result consume was supposed to put in it. :/ --- vumi/tests/test_service.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vumi/tests/test_service.py b/vumi/tests/test_service.py index 1fcd1a94c..c2d9f534f 100644 --- a/vumi/tests/test_service.py +++ b/vumi/tests/test_service.py @@ -14,6 +14,7 @@ def setUp(self): def tearDown(self): pass + @inlineCallbacks def test_consume(self): """The consume helper should direct all incoming messages matching the specified routing_key, queue_name & exchange to the given callback""" @@ -22,15 +23,16 @@ def test_consume(self): worker = get_stubbed_worker(Worker) # buffer to check messages consumed - log = [Message.from_json(message.content.body)] + log = [] # consume all messages on the given routing key and append # them to the log - worker.consume('test.routing.key', lambda msg: log.append(msg)) + yield worker.consume('test.routing.key', lambda msg: log.append(msg)) # if all works well then the consume method should funnel the test # message straight to the callback, the callback will apend it to the # log and we can test it. worker._amqp_client.broker.basic_publish('vumi', 'test.routing.key', message.content) + yield worker._amqp_client.broker.wait_delivery() self.assertEquals(log, [Message(key="value")]) @inlineCallbacks From 8db0561b277b37cf1e57cbd22e13a73e6f67c2a9 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 6 Mar 2013 15:16:30 +0200 Subject: [PATCH 11/20] Add test for failing consume. --- vumi/service.py | 4 ++-- vumi/tests/test_service.py | 25 +++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index c3fadf893..f266d4338 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -358,8 +358,8 @@ def ack(self, message, multiple=False): def nack(self, message, multiple=False, requeue=True): """Reject a message as unprocessed.""" - self.channel.basic_reject(message.delivery_tag, multiple=multiple, - requeue=requeue) + self.channel.basic_nack(message.delivery_tag, multiple=multiple, + requeue=requeue) @inlineCallbacks def stop(self): diff --git a/vumi/tests/test_service.py b/vumi/tests/test_service.py index c2d9f534f..dda2efb33 100644 --- a/vumi/tests/test_service.py +++ b/vumi/tests/test_service.py @@ -35,6 +35,31 @@ def test_consume(self): yield worker._amqp_client.broker.wait_delivery() self.assertEquals(log, [Message(key="value")]) + @inlineCallbacks + def test_failed_consume(self): + class TestError(Exception): + pass + + logs, errors = [], [] + + def raise_error(msg): + # after failing, the message is requeued and retried + if not errors: + errors.append(msg) + raise TestError() + logs.append(msg) + + message = fake_amq_message({"key": "value"}) + worker = get_stubbed_worker(Worker) + + yield worker.consume('test.routing.key', raise_error) + worker._amqp_client.broker.basic_publish('vumi', 'test.routing.key', + message.content) + yield worker._amqp_client.broker.wait_delivery() + self.assertEqual(len(self.flushLoggedErrors(TestError)), 1) + self.assertEquals(logs, [Message(key="value")]) + self.assertEquals(errors, [Message(key="value")]) + @inlineCallbacks def test_start_publisher(self): """The publisher should publish""" From 539d663bafafd668370380ef7fcf0b851e5cc44f Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 6 Mar 2013 18:45:04 +0200 Subject: [PATCH 12/20] Remove non-inlineCallbacks version of _consume. --- vumi/service.py | 30 ++++-------------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index f266d4338..284d84bc2 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -301,6 +301,10 @@ def start(self, channel, queue): @inlineCallbacks def _consume(self): message = yield self.queue.get() + if isinstance(message, QueueCloseMarker): + log.msg("Queue closed.") + self._consume_loop.stop() + return try: yield self.consume_message( self.message_class.from_json(message.content.body)) @@ -309,32 +313,6 @@ def _consume(self): log.err() self.nack(message) - def _process_message(self, message): - if isinstance(message, QueueCloseMarker): - log.msg("Queue closed.") - self._consume_loop.stop() - else: - try: - message_obj = self.message_class.from_json( - message.content.body) - d = maybeDeferred(self.consume_message, message_obj) - except: - d = fail(Failure()) - d.addCallback(self._ack_message, message) - d.addErrback(self._nack_message, message) - return d - - def _ack_message(self, result, message): - self.ack(message) - - def _nack_message(self, f, message): - log.err(f) - self.nack(message) - - def _consume(self): - d = self.queue.get() - return d.addCallback(self._process_message) - def consume_message(self, message): """Fallback consume method. From a7f4912a1836f5f66dd53f7f960bf90e49bf6f66 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 6 Mar 2013 18:47:47 +0200 Subject: [PATCH 13/20] Log message content when a message fails. --- vumi/service.py | 1 + vumi/tests/test_service.py | 14 ++++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index 284d84bc2..7d14e1c53 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -311,6 +311,7 @@ def _consume(self): self.ack(message) except Exception: log.err() + log.msg(message.content.body) self.nack(message) def consume_message(self, message): diff --git a/vumi/tests/test_service.py b/vumi/tests/test_service.py index dda2efb33..9085470f8 100644 --- a/vumi/tests/test_service.py +++ b/vumi/tests/test_service.py @@ -2,7 +2,8 @@ from twisted.internet.defer import inlineCallbacks from vumi.service import Worker, WorkerCreator -from vumi.tests.utils import (fake_amq_message, get_stubbed_worker) +from vumi.tests.utils import ( + fake_amq_message, get_stubbed_worker, LogCatcher) from vumi.message import Message @@ -53,12 +54,17 @@ def raise_error(msg): worker = get_stubbed_worker(Worker) yield worker.consume('test.routing.key', raise_error) - worker._amqp_client.broker.basic_publish('vumi', 'test.routing.key', - message.content) - yield worker._amqp_client.broker.wait_delivery() + + lc = LogCatcher() + with lc: + worker._amqp_client.broker.basic_publish( + 'vumi', 'test.routing.key', message.content) + yield worker._amqp_client.broker.wait_delivery() + self.assertEqual(len(self.flushLoggedErrors(TestError)), 1) self.assertEquals(logs, [Message(key="value")]) self.assertEquals(errors, [Message(key="value")]) + self.assertEqual(lc.messages(), ['{"key": "value"}']) @inlineCallbacks def test_start_publisher(self): From 052200b07196ca3ddef23c7507667151cba8fcbe Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Thu, 11 Apr 2013 17:38:50 +0200 Subject: [PATCH 14/20] Remove unused imports. --- vumi/service.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index 7d14e1c53..13626df61 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -6,11 +6,9 @@ from twisted.python import log from twisted.application.service import MultiService from twisted.application.internet import TCPClient -from twisted.internet.defer import ( - inlineCallbacks, returnValue, maybeDeferred, fail) +from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet.task import LoopingCall from twisted.internet import protocol, reactor -from twisted.python.failure import Failure from twisted.web.resource import Resource import txamqp from txamqp.client import TwistedDelegate From 82ec3d214dbccadea975b68249d25b5b3b82ce11 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Thu, 11 Apr 2013 17:42:27 +0200 Subject: [PATCH 15/20] Add means for specifying custom consumer error handles. --- vumi/service.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index 13626df61..ddad69629 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -309,7 +309,7 @@ def _consume(self): self.ack(message) except Exception: log.err() - log.msg(message.content.body) + yield self.consume_error(message.content.body) self.nack(message) def consume_message(self, message): @@ -319,7 +319,16 @@ def consume_message(self, message): Should be overridden by subclasses. """ - log.msg("Received message: %s" % message) + log.msg("Received message: %s" % (message,)) + + def consume_error(self, message_body): + """Fallback error handling method. + + Logs the message body at the `info` logging level. + + Should be overridden by subclasses. + """ + log.msg("Logging failed message: %s" % (message_body,)) def pause(self): self.paused = True @@ -352,12 +361,16 @@ def stop(self): class DynamicConsumer(Consumer): - def __init__(self, callback): + def __init__(self, callback, errback=None): self.callback = callback + self.errback = errback if errback is not None else log.msg def consume_message(self, message): return self.callback(message) + def consume_error(self, message_body): + return self.errback(message_body) + class RoutingKeyError(Exception): def __init__(self, value): From 95c3cdf5f82f6cc7044a3d2d7e4bbc258663deff Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Thu, 11 Apr 2013 17:44:12 +0200 Subject: [PATCH 16/20] Add errback support to consume helper method. --- vumi/service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index ddad69629..813c981c8 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -201,7 +201,7 @@ def stopService(self): def routing_key_to_class_name(self, routing_key): return ''.join(map(lambda s: s.capitalize(), routing_key.split('.'))) - def consume(self, routing_key, callback, queue_name=None, + def consume(self, routing_key, callback, errback=None, queue_name=None, exchange_name='vumi', exchange_type='direct', durable=True, message_class=None, paused=False): @@ -221,7 +221,7 @@ def consume(self, routing_key, callback, queue_name=None, klass = type(class_name, (DynamicConsumer,), kwargs) if message_class is not None: klass.message_class = message_class - return self.start_consumer(klass, callback) + return self.start_consumer(klass, callback, errback=errback) def start_consumer(self, consumer_class, *args, **kw): return self._amqp_client.start_consumer(consumer_class, *args, **kw) From 15eb8e0feea915d986dc6473470863cd03bf2722 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 17 Apr 2013 12:11:22 +0200 Subject: [PATCH 17/20] Add more context to consumer_error (exception raised and full AMQP message). --- vumi/service.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index 830b351f9..ff331eb4a 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -309,9 +309,9 @@ def _consume(self): yield self.consume_message( self.message_class.from_json(message.content.body)) self.ack(message) - except Exception: + except Exception as err: log.err() - yield self.consume_error(message.content.body) + yield self.consume_error(err, message) self.nack(message) def consume_message(self, message): @@ -323,14 +323,14 @@ def consume_message(self, message): """ log.msg("Received message: %s" % (message,)) - def consume_error(self, message_body): + def consume_error(self, err, message): """Fallback error handling method. Logs the message body at the `info` logging level. Should be overridden by subclasses. """ - log.msg("Logging failed message: %s" % (message_body,)) + log.msg("Logging failed message: %r" % (message.content.body,)) def pause(self): self.paused = True @@ -365,13 +365,17 @@ def stop(self): class DynamicConsumer(Consumer): def __init__(self, callback, errback=None): self.callback = callback - self.errback = errback if errback is not None else log.msg + self.errback = (errback if errback is not None + else self._default_errback) def consume_message(self, message): return self.callback(message) - def consume_error(self, message_body): - return self.errback(message_body) + def _default_errback(self, err, message): + log.msg("Logging failed message: %r" % (message.content.body,)) + + def consume_error(self, err, message): + return self.errback(err, message) class RoutingKeyError(Exception): From 713953c48124b1006428a85e24b4c47c39f39818 Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 17 Apr 2013 20:27:10 +0200 Subject: [PATCH 18/20] Fix failing tests. --- vumi/service.py | 4 ++++ vumi/tests/test_connectors.py | 4 ++-- vumi/tests/test_service.py | 3 ++- vumi/tests/test_worker.py | 6 +++--- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/vumi/service.py b/vumi/service.py index ff331eb4a..73db8db60 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -349,6 +349,10 @@ def nack(self, message, multiple=False, requeue=True): self.channel.basic_nack(message.delivery_tag, multiple=multiple, requeue=requeue) + def running(self): + """Return True if the consumer loop is running and False otherwise.""" + return self._consume_done is not None and not self._consume_done.called + @inlineCallbacks def stop(self): log.msg("Consumer stopping...") diff --git a/vumi/tests/test_connectors.py b/vumi/tests/test_connectors.py index 9dffd16ce..45563e4a8 100644 --- a/vumi/tests/test_connectors.py +++ b/vumi/tests/test_connectors.py @@ -101,9 +101,9 @@ def test_setup_raises(self): @inlineCallbacks def test_teardown(self): conn, consumer = yield self.mk_consumer() - self.assertTrue(consumer.keep_consuming) + self.assertTrue(consumer.running()) yield conn.teardown() - self.assertFalse(consumer.keep_consuming) + self.assertFalse(consumer.running()) @inlineCallbacks def test_paused(self): diff --git a/vumi/tests/test_service.py b/vumi/tests/test_service.py index 9085470f8..801ec56e6 100644 --- a/vumi/tests/test_service.py +++ b/vumi/tests/test_service.py @@ -64,7 +64,8 @@ def raise_error(msg): self.assertEqual(len(self.flushLoggedErrors(TestError)), 1) self.assertEquals(logs, [Message(key="value")]) self.assertEquals(errors, [Message(key="value")]) - self.assertEqual(lc.messages(), ['{"key": "value"}']) + self.assertEqual(lc.messages(), + ['Logging failed message: \'{"key": "value"}\'']) @inlineCallbacks def test_start_publisher(self): diff --git a/vumi/tests/test_worker.py b/vumi/tests/test_worker.py index 0458de81d..9074765d2 100644 --- a/vumi/tests/test_worker.py +++ b/vumi/tests/test_worker.py @@ -110,7 +110,7 @@ def test_teardown_connectors(self): connector = yield self.worker.setup_ri_connector('foo') yield self.worker.teardown_connectors() self.assertTrue('foo' not in self.worker.connectors) - self.assertFalse(connector._consumers['inbound'].keep_consuming) + self.assertFalse(connector._consumers['inbound'].running()) def test_setup_worker_raises(self): worker = get_stubbed_worker(BaseWorker, {}, None) # None -> dummy AMQP @@ -171,7 +171,7 @@ def test_setup_connector(self): self.assertTrue('foo' in self.worker.connectors) self.assertTrue(isinstance(connector, ReceiveInboundConnector)) # test setup happened - self.assertTrue(connector._consumers['inbound'].keep_consuming) + self.assertTrue(connector._consumers['inbound'].running()) @inlineCallbacks def test_teardown_connector(self): @@ -180,7 +180,7 @@ def test_teardown_connector(self): yield self.worker.teardown_connector('foo') self.assertFalse('foo' in self.worker.connectors) # test teardown happened - self.assertFalse(connector._consumers['inbound'].keep_consuming) + self.assertFalse(connector._consumers['inbound'].running()) @inlineCallbacks def test_setup_ri_connector(self): From 430a458c7606df8abbb973e386132830bc96c01b Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 17 Apr 2013 21:18:24 +0200 Subject: [PATCH 19/20] Move exchange details into an Exchange class and set up a default dead letter exchange. --- vumi/blinkenlights/heartbeat/publisher.py | 7 +- vumi/blinkenlights/metrics.py | 10 +-- vumi/blinkenlights/metrics_workers.py | 26 +++---- vumi/service.py | 84 +++++++++++++---------- vumi/tests/fake_amqp.py | 2 +- 5 files changed, 66 insertions(+), 63 deletions(-) diff --git a/vumi/blinkenlights/heartbeat/publisher.py b/vumi/blinkenlights/heartbeat/publisher.py index fe1826b5c..c35202cb4 100644 --- a/vumi/blinkenlights/heartbeat/publisher.py +++ b/vumi/blinkenlights/heartbeat/publisher.py @@ -2,7 +2,7 @@ from twisted.internet.task import LoopingCall -from vumi.service import Publisher +from vumi.service import Publisher, Exchange from vumi.message import Message from vumi.log import log @@ -34,10 +34,11 @@ class HeartBeatPublisher(Publisher): HEARTBEAT_PERIOD_SECS = 10 + exchange = Exchange(name="vumi.health", + exchange_type="direct", durable=True) + def __init__(self, gen_attrs_func): self.routing_key = "heartbeat.inbound" - self.exchange_name = "vumi.health" - self.durable = True self._task = None self._gen_attrs_func = gen_attrs_func diff --git a/vumi/blinkenlights/metrics.py b/vumi/blinkenlights/metrics.py index f6ddc82b6..a9d2fa94c 100644 --- a/vumi/blinkenlights/metrics.py +++ b/vumi/blinkenlights/metrics.py @@ -8,7 +8,7 @@ from twisted.internet.task import LoopingCall from twisted.python import log -from vumi.service import Publisher, Consumer +from vumi.service import Publisher, Consumer, Exchange from vumi.blinkenlights.message20110818 import MetricMessage import time @@ -27,10 +27,8 @@ class MetricManager(Publisher): :param on_publish: Function to call immediately after metrics after published. """ - exchange_name = "vumi.metrics" - exchange_type = "direct" + exchange = Exchange("vumi.metrics", exchange_type="direct", durable=True) routing_key = "vumi.metrics" - durable = True auto_delete = False delivery_mode = 2 @@ -272,10 +270,8 @@ class MetricsConsumer(Consumer): aggregator (list of aggregator names) and values (a list of timestamp and value paits). """ - exchange_name = "vumi.metrics" - exchange_type = "direct" + exchange = Exchange("vumi.metrics", exchange_type="direct", durable=True) routing_key = "vumi.metrics" - durable = True def __init__(self, callback): self.callback = callback diff --git a/vumi/blinkenlights/metrics_workers.py b/vumi/blinkenlights/metrics_workers.py index dfc7e2e57..00ec9aaa3 100644 --- a/vumi/blinkenlights/metrics_workers.py +++ b/vumi/blinkenlights/metrics_workers.py @@ -11,7 +11,7 @@ from twisted.internet.task import LoopingCall from twisted.internet.protocol import DatagramProtocol -from vumi.service import Consumer, Publisher, Worker +from vumi.service import Consumer, Publisher, Worker, Exchange from vumi.blinkenlights.metrics import (MetricsConsumer, MetricManager, Count, Metric, Timer, Aggregator) from vumi.blinkenlights.message20110818 import MetricMessage @@ -27,9 +27,8 @@ class AggregatedMetricConsumer(Consumer): parameters are metric_name (str) and values (a list of timestamp and value pairs). """ - exchange_name = "vumi.metrics.aggregates" - exchange_type = "direct" - durable = True + exchange = Exchange("vumi.metrics.aggregates", + exchange_type="direct", durable=True) routing_key = "vumi.metrics.aggregates" def __init__(self, callback): @@ -45,9 +44,8 @@ def consume_message(self, vumi_message): class AggregatedMetricPublisher(Publisher): """Publishes aggregated metrics. """ - exchange_name = "vumi.metrics.aggregates" - exchange_type = "direct" - durable = True + exchange = Exchange("vumi.metrics.aggregates", + exchange_type="direct", durable=True) routing_key = "vumi.metrics.aggregates" def publish_aggregate(self, metric_name, timestamp, value): @@ -70,9 +68,8 @@ class TimeBucketConsumer(Consumer): aggregator (list of aggregator names) and values (a list of timestamp and value pairs). """ - exchange_name = "vumi.metrics.buckets" - exchange_type = "direct" - durable = True + exchange = Exchange("vumi.metrics.buckets", + exchange_type="direct", durable=True) ROUTING_KEY_TEMPLATE = "bucket.%d" def __init__(self, bucket, callback): @@ -97,9 +94,8 @@ class TimeBucketPublisher(Publisher): bucket_size : int, in seconds Size of each time bucket in seconds. """ - exchange_name = "vumi.metrics.buckets" - exchange_type = "direct" - durable = True + exchange = Exchange("vumi.metrics.buckets", + exchange_type="direct", durable=True) ROUTING_KEY_TEMPLATE = "bucket.%d" def __init__(self, buckets, bucket_size): @@ -281,9 +277,7 @@ def consume_metrics(self, metric_name, values): class GraphitePublisher(Publisher): """Publisher for sending messages to Graphite.""" - exchange_name = "graphite" - exchange_type = "topic" - durable = True + exchange = Exchange("graphite", exchange_type="topic", durable=True) auto_delete = False delivery_mode = 2 require_bind = False # Graphite uses a topic exchange diff --git a/vumi/service.py b/vumi/service.py index 73db8db60..1d3c7f0b3 100644 --- a/vumi/service.py +++ b/vumi/service.py @@ -105,13 +105,10 @@ def get_new_channel_id(self): """ return (max(self.channels) + 1) if self.channels else 0 - def _declare_exchange(self, source, channel): - # get the details for AMQP - exchange_name = source.exchange_name - exchange_type = source.exchange_type - durable = source.durable - return channel.exchange_declare(exchange=exchange_name, - type=exchange_type, durable=durable) + def _declare_exchange(self, exchange, channel): + return channel.exchange_declare(exchange=exchange.name, + type=exchange.exchange_type, + durable=exchange.durable) @inlineCallbacks def start_consumer(self, consumer_class, *args, **kwargs): @@ -122,18 +119,24 @@ def start_consumer(self, consumer_class, *args, **kwargs): consumer.vumi_options = self.vumi_options # get the details for AMQP - exchange_name = consumer.exchange_name - durable = consumer.durable + exchange = consumer.exchange queue_name = consumer.queue_name routing_key = consumer.routing_key + arguments = {} # declare the exchange, doesn't matter if it already exists - yield self._declare_exchange(consumer, channel) + yield self._declare_exchange(consumer.exchange, channel) + if consumer.dead_letter_exchange is not None: + yield self._declare_exchange(consumer.dead_letter_exchange, + channel) + arguments["x-dead-letter-exchange"] = ( + consumer.dead_letter_exchange.name) # declare the queue - yield channel.queue_declare(queue=queue_name, durable=durable) + yield channel.queue_declare(queue=queue_name, durable=exchange.durable, + arguments=arguments) # bind it to the exchange with the routing key - yield channel.queue_bind(queue=queue_name, exchange=exchange_name, + yield channel.queue_bind(queue=queue_name, exchange=exchange.name, routing_key=routing_key) # register the consumer reply = yield channel.basic_consume(queue=queue_name) @@ -152,7 +155,7 @@ def start_publisher(self, publisher_class, *args, **kwargs): publisher = publisher_class(*args, **kwargs) publisher.vumi_options = self.vumi_options # declare the exchange, doesn't matter if it already exists - yield self._declare_exchange(publisher, channel) + yield self._declare_exchange(publisher.exchange, channel) # start! yield publisher.start(channel) # return the publisher @@ -210,13 +213,12 @@ def consume(self, routing_key, callback, errback=None, queue_name=None, # use the routing key to generate the name for the class # amq.routing.key -> AmqRoutingKey dynamic_name = self.routing_key_to_class_name(routing_key) - class_name = "%sDynamicConsumer" % str(dynamic_name) + class_name = "%sDynamicConsumer" % dynamic_name kwargs = { 'routing_key': routing_key, 'queue_name': queue_name or routing_key, - 'exchange_name': exchange_name, - 'exchange_type': exchange_type, - 'durable': durable, + 'exchange': Exchange(exchange_name, exchange_type=exchange_type, + durable=durable), 'start_paused': paused, } log.msg('Starting %s with %s' % (class_name, kwargs)) @@ -231,15 +233,16 @@ def start_consumer(self, consumer_class, *args, **kw): def publish_to(self, routing_key, exchange_name='vumi', exchange_type='direct', durable=True, delivery_mode=2): - class_name = self.routing_key_to_class_name(routing_key) - publisher_class = type("%sDynamicPublisher" % class_name, (Publisher,), - { - "routing_key": routing_key, - "exchange_name": exchange_name, - "exchange_type": exchange_type, - "durable": durable, - "delivery_mode": delivery_mode, - }) + dynamic_name = self.routing_key_to_class_name(routing_key) + class_name = "%sDynamicPublisher" % dynamic_name + kwargs = { + "routing_key": routing_key, + "exchange": Exchange(exchange_name, exchange_type=exchange_type, + durable=durable), + "delivery_mode": delivery_mode, + } + log.msg('Starting %s with %s' % (class_name, kwargs)) + publisher_class = type(class_name, (Publisher,), kwargs) return self.start_publisher(publisher_class) def start_publisher(self, publisher_class, *args, **kw): @@ -276,11 +279,20 @@ class QueueCloseMarker(object): "This is a marker for closing consumer queues." +class Exchange(object): + def __init__(self, name, exchange_type, durable): + self.name = name + self.exchange_type = exchange_type + self.durable = durable + + class Consumer(object): - exchange_name = "vumi" - exchange_type = "direct" - durable = False + exchange = Exchange(name="vumi", exchange_type="direct", durable=False) + + # set dead_letter_exchange to None to disable + dead_letter_exchange = Exchange(name="vumi.dead_letter", + exchange_type="direct", durable=True) queue_name = "queue" routing_key = "routing_key" @@ -391,11 +403,11 @@ def __str__(self): class Publisher(object): - exchange_name = "vumi" - exchange_type = "direct" + + exchange = Exchange(name="vumi", exchange_type="direct", durable=False) + routing_key = "routing_key" require_bind = True - durable = False auto_delete = False delivery_mode = 2 # save to disk @@ -424,7 +436,7 @@ def list_bindings(self): bound_routing_keys = {} for b in bindings: if (b['vhost'] == self.vumi_options['vhost'] and - b['source'] == self.exchange_name): + b['source'] == self.exchange.name): bound_routing_keys[b['routing_key']] = \ bound_routing_keys.get(b['routing_key'], []) + \ [b['destination']] @@ -439,7 +451,7 @@ def routing_key_is_bound(self, key): # too many http calls to RabbitMQ Management will be required, # and the auto-generated queues & routing_keys are unlikley to # result in errors where routing keys are unbound - if self.exchange_name[-4:].lower() == '_rpc': + if self.exchange.name[-4:].lower() == '_rpc': returnValue(True) if (len(self.bound_routing_keys) == 1 and self.bound_routing_keys.get("bindings") == "undetected"): @@ -470,11 +482,11 @@ def check_routing_key(self, routing_key, require_bind): raise RoutingKeyError("The routing_key: %s is not bound to any" " queues in vhost: %s exchange: %s" % ( routing_key, self.vumi_options['vhost'], - self.exchange_name)) + self.exchange.name)) @inlineCallbacks def publish(self, message, **kwargs): - exchange_name = kwargs.get('exchange_name') or self.exchange_name + exchange_name = kwargs.get('exchange_name') or self.exchange.name routing_key = kwargs.get('routing_key') or self.routing_key require_bind = kwargs.get('require_bind', self.require_bind) yield self.check_routing_key(routing_key, require_bind) diff --git a/vumi/tests/fake_amqp.py b/vumi/tests/fake_amqp.py index 34c35a4d8..9a05387f8 100644 --- a/vumi/tests/fake_amqp.py +++ b/vumi/tests/fake_amqp.py @@ -326,7 +326,7 @@ def basic_qos(self, _prefetch_size, prefetch_count, _global): def exchange_declare(self, exchange, type, durable=None): return self.broker.exchange_declare(exchange, type) - def queue_declare(self, queue, durable=None): + def queue_declare(self, queue, durable=None, arguments=None): return self.broker.queue_declare(queue) def queue_bind(self, queue, exchange, routing_key): From a5dcbb152d8b26e4f030294ba6e07a422cfbb0ab Mon Sep 17 00:00:00 2001 From: Simon Cross Date: Wed, 17 Apr 2013 21:24:52 +0200 Subject: [PATCH 20/20] Add TODO for handling dead-lettering in fake_amqp. --- vumi/tests/fake_amqp.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/vumi/tests/fake_amqp.py b/vumi/tests/fake_amqp.py index 9a05387f8..7a67c7963 100644 --- a/vumi/tests/fake_amqp.py +++ b/vumi/tests/fake_amqp.py @@ -327,6 +327,9 @@ def exchange_declare(self, exchange, type, durable=None): return self.broker.exchange_declare(exchange, type) def queue_declare(self, queue, durable=None, arguments=None): + # TODO: handle arguments['x-dead-letter-exchange'] by + # setting a dead-letter exchange and using it from + # basic_nack / basic_reject if it exists. return self.broker.queue_declare(queue) def queue_bind(self, queue, exchange, routing_key):