diff --git a/lib/db.js b/lib/db.js index 894e3db..aa43e07 100644 --- a/lib/db.js +++ b/lib/db.js @@ -158,7 +158,10 @@ class LimitDBRedis extends EventEmitter { remaining: bucketKeyConfig.size, reset: Math.ceil(Date.now() / 1000), limit: bucketKeyConfig.size, - delayed: false + delayed: false, + stats: { + nonConformantAttempts: 0 + } }); } @@ -178,7 +181,10 @@ class LimitDBRedis extends EventEmitter { remaining, reset: this.calculateReset(bucketKeyConfig, remaining, parseInt(results[2], 10)), limit: bucketKeyConfig.size, - delayed: false + delayed: false, + stats: { + nonConformantAttempts: results[3] + } }); }); } @@ -302,6 +308,47 @@ class LimitDBRedis extends EventEmitter { }); } + /** + * Get elements in the bucket. + * + * @param {getParams} params - The params for take. + * @param {function(Error, getResult)} [callback]. + */ + incrementStatsCounter(params, callback) { + callback = callback || _.noop; + + const valError = this.validateParams(params, 'key'); + if (valError) { + return process.nextTick(callback, valError); + } + + const bucket = this.buckets[params.type]; + const bucketKeyConfig = this.bucketKeyConfig(bucket, params.key); + + if (bucketKeyConfig.unlimited) { + return process.nextTick(callback, null, { + remaining: bucketKeyConfig.size, + reset: Math.ceil(Date.now() / 1000), + limit: bucketKeyConfig.size + }); + } + + const key = `${params.type}:${params.key}`; + this.redis.hincrby(key, params.count, + (err, results) => { + if (err) { + return callback(err); + } + + const remaining = parseInt(results[0], 10); + return callback(null, { + remaining: Number.isInteger(remaining) ? remaining : bucketKeyConfig.size, + reset: this.calculateReset(bucketKeyConfig, remaining, parseInt(results[1], 10)), + limit: bucketKeyConfig.size + }); + }); + } + /** * Resets/re-fills all keys in all buckets. * @param {function(Error)} [callback]. diff --git a/lib/take.lua b/lib/take.lua index 15ff3ab..e90e843 100644 --- a/lib/take.lua +++ b/lib/take.lua @@ -7,7 +7,7 @@ local ttl = tonumber(ARGV[4]) local current_time = redis.call('TIME') local current_timestamp_ms = current_time[1] * 1000 + current_time[2] / 1000 -local current = redis.pcall('HMGET', KEYS[1], 'd', 'r') +local current = redis.pcall('HMGET', KEYS[1], 'd', 'r', 'nca') if current.err ~= nil then current = {} @@ -26,9 +26,17 @@ elseif current[1] and tokens_per_ms == 0 then end local enough_tokens = new_content >= tokens_to_take +local current_conformant_attempts = current[3] or 0; +local non_conformant_attempts = 0; if enough_tokens then new_content = math.min(new_content - tokens_to_take, bucket_size) +else + -- HINCRBY is the natural redis command to think about for this case + -- however this approach allows to use a single "HMSET" command instead + -- HINCRBY and "HMSET" which makes the code a bit cleaner and since LUA scripts + -- runs atomically it has the same guarantees as HINCRBY + non_conformant_attempts = current_conformant_attempts + 1 end -- https://redis.io/commands/EVAL#replicating-commands-instead-of-scripts @@ -36,7 +44,8 @@ redis.replicate_commands() redis.call('HMSET', KEYS[1], 'd', current_timestamp_ms, - 'r', new_content) + 'r', new_content, + 'nca', non_conformant_attempts) redis.call('EXPIRE', KEYS[1], ttl) -return { new_content, enough_tokens, current_timestamp_ms } +return { new_content, enough_tokens, current_timestamp_ms, current_conformant_attempts } diff --git a/test/db.tests.js b/test/db.tests.js index 4a5e266..630e604 100644 --- a/test/db.tests.js +++ b/test/db.tests.js @@ -393,6 +393,48 @@ describe('LimitDBRedis', () => { done(); }); }); + + it('should return non-conformant-attempts stats', (done) => { + async.waterfall([ + cb => async.map(_.range(10), (i, done) => { + db.take({ type: 'ip', key: '8.8.8.8' }, done); + }, cb), + + (results, cb) => { + assert.ok(results.every(r => r.conformant)); + + results.forEach((r, i) => { + assert.equal(r.stats.nonConformantAttempts, 0); + }); + + cb(); + }, + + cb => async.map(_.range(10), (i, done) => { + db.take({ type: 'ip', key: '8.8.8.8' }, done); + }, cb), + + (results, cb) => { + results.forEach((r, i) => { + assert.notOk(r.conformant); + assert.equal(r.stats.nonConformantAttempts, i); + }); + + cb(); + }, + + cb => db.put({ type: 'ip', key: '8.8.8.8', count: 1 }, (err) => done(err)), + + cb => db.take({ type: 'ip', key: '8.8.8.8' }, cb), + + (result, cb) => { + assert.notOk(result.conformant); + assert.equal(result.stats.nonConformantAttempts, 10); + + cb(); + } + ], done); + }); }); describe('PUT', function () {