From 611b5ed90651ee3ac5e12b77fd60e0d8de8fbb5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Bo=CC=88hm?= Date: Sat, 2 Nov 2013 17:12:58 +0100 Subject: [PATCH] added Queue#changes now based on latest master --- queue.js | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/queue.js b/queue.js index 692279a..f85177d 100644 --- a/queue.js +++ b/queue.js @@ -278,6 +278,68 @@ Queue.prototype.receive = function(opts, callback) { }) } +Queue.prototype.changes = function(opts) { + var self = this + + if(typeof opts != 'object') + opts = {} + + var prefix = 'CQS/' + self.name + '/' + var feed = self.db.changes(opts) + var limit = opts.ConcurrencyLimit || 1e3 + var count = 0 + var vis_timeout = opts.VisibilityTimeout || self.VisibilityTimeout + + assert.ok(vis_timeout >= 0, "Visibility timeout is too low") + assert.ok(vis_timeout <= 43200, "Visibility timeout is too high") + + //TODO this is a rather ugly solution (as it consumes _all_ changes) + //follow also allows `filter` to be based on a view, which is probably much better suited + feed.filter = function(doc, req) { + return doc._id.substr(0, prefix.length) === prefix + && Date.parse(doc.visible_at) <= Date.now() + } + + feed.on('change', function(change) { + if(count++ > limit) feed.pause() + + var msg_opts = {} + lib.copy(change.doc, msg_opts, 'uppercase') + msg_opts.MessageId = change.id.substr(prefix.length) + + var msg = new message.Message(msg_opts) + msg.queue = self + msg.VisibilityTimeout = vis_timeout + msg.is_heartbeat = false + msg.mvcc = { _id: change.doc._id, _rev: change.doc._rev } + msg.receive(function(err) { + if(err) { + if(err.statusCode == 409 && err.error == 'conflict'); + else feed.emit('error', err) + reduceCount() + } else { + feed.emit('message', msg, callback) + } + + var called = false + function callback() { + if(!called){ + called = true + reduceCount() + } + } + }) + + function reduceCount() { + count -= 1 + if(feed.is_paused) feed.resume() + } + }) + + feed.follow() + + return feed +} Queue.prototype.scrub = function(opts, callback) { var self = this;