diff --git a/queue.js b/queue.js index 692279a..f85177d 100644 --- a/queue.js +++ b/queue.js @@ -278,6 +278,68 @@ Queue.prototype.receive = function(opts, callback) { }) } +Queue.prototype.changes = function(opts) { + var self = this + + if(typeof opts != 'object') + opts = {} + + var prefix = 'CQS/' + self.name + '/' + var feed = self.db.changes(opts) + var limit = opts.ConcurrencyLimit || 1e3 + var count = 0 + var vis_timeout = opts.VisibilityTimeout || self.VisibilityTimeout + + assert.ok(vis_timeout >= 0, "Visibility timeout is too low") + assert.ok(vis_timeout <= 43200, "Visibility timeout is too high") + + //TODO this is a rather ugly solution (as it consumes _all_ changes) + //follow also allows `filter` to be based on a view, which is probably much better suited + feed.filter = function(doc, req) { + return doc._id.substr(0, prefix.length) === prefix + && Date.parse(doc.visible_at) <= Date.now() + } + + feed.on('change', function(change) { + if(count++ > limit) feed.pause() + + var msg_opts = {} + lib.copy(change.doc, msg_opts, 'uppercase') + msg_opts.MessageId = change.id.substr(prefix.length) + + var msg = new message.Message(msg_opts) + msg.queue = self + msg.VisibilityTimeout = vis_timeout + msg.is_heartbeat = false + msg.mvcc = { _id: change.doc._id, _rev: change.doc._rev } + msg.receive(function(err) { + if(err) { + if(err.statusCode == 409 && err.error == 'conflict'); + else feed.emit('error', err) + reduceCount() + } else { + feed.emit('message', msg, callback) + } + + var called = false + function callback() { + if(!called){ + called = true + reduceCount() + } + } + }) + + function reduceCount() { + count -= 1 + if(feed.is_paused) feed.resume() + } + }) + + feed.follow() + + return feed +} Queue.prototype.scrub = function(opts, callback) { var self = this; diff --git a/test/tap/cqs.js b/test/tap/cqs.js index 4e9954c..56d4b10 100644 --- a/test/tap/cqs.js +++ b/test/tap/cqs.js @@ -532,3 +532,54 @@ test('Receive conflict', function(t) { } }) }) + +test('Follow changes', function(t) { + var changes = state.foo.changes() + + var unexpectedMessage = function() { + throw new Error('unexpected message') + } + + var receiveChange = function() { + changes.once('message', function(msg) { + t.equal(msg.Body, 'msg2') + msg.del(function(er) { + if(er) throw er; + changes.stop(); + t.end() + }) + }) + changes.removeListener('message', unexpectedMessage); + changes.resume(); + } + + var testPaused = function() { + changes.pause() + + state.foo.send('msg2', function(er) { + if(er) throw er; + + setTimeout(receiveChange, 100); + }) + } + + changes.once('message', function(msg) { + t.equal(msg.Body, 'msg1') + + changes.on('message', unexpectedMessage) + + msg.del(function(er) { + if(er) throw er; + + //should not receive messages from other queues + state.bar.send('noMsg', function(er){ + if(er) throw er; + testPaused(); + }) + }) + }) + + state.foo.send('msg1', function(er) { + if(er) throw er; + }) +})