Skip to content

Commit

Permalink
Add non-conformant-attempts stats
Browse files Browse the repository at this point in the history
Adds stats about the number of times the take operation returned non-conformant
in the past.
  • Loading branch information
dafortune committed Apr 19, 2019
1 parent d4393be commit 6cb8155
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 5 deletions.
51 changes: 49 additions & 2 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ class LimitDBRedis extends EventEmitter {
remaining: bucketKeyConfig.size,
reset: Math.ceil(Date.now() / 1000),
limit: bucketKeyConfig.size,
delayed: false
delayed: false,
stats: {
nonConformantAttempts: 0
}
});
}

Expand All @@ -178,7 +181,10 @@ class LimitDBRedis extends EventEmitter {
remaining,
reset: this.calculateReset(bucketKeyConfig, remaining, parseInt(results[2], 10)),
limit: bucketKeyConfig.size,
delayed: false
delayed: false,
stats: {
nonConformantAttempts: results[3]
}
});
});
}
Expand Down Expand Up @@ -302,6 +308,47 @@ class LimitDBRedis extends EventEmitter {
});
}

/**
* Get elements in the bucket.
*
* @param {getParams} params - The params for take.
* @param {function(Error, getResult)} [callback].
*/
incrementStatsCounter(params, callback) {
callback = callback || _.noop;

const valError = this.validateParams(params, 'key');
if (valError) {
return process.nextTick(callback, valError);
}

const bucket = this.buckets[params.type];
const bucketKeyConfig = this.bucketKeyConfig(bucket, params.key);

if (bucketKeyConfig.unlimited) {
return process.nextTick(callback, null, {
remaining: bucketKeyConfig.size,
reset: Math.ceil(Date.now() / 1000),
limit: bucketKeyConfig.size
});
}

const key = `${params.type}:${params.key}`;
this.redis.hincrby(key, params.count,
(err, results) => {
if (err) {
return callback(err);
}

const remaining = parseInt(results[0], 10);
return callback(null, {
remaining: Number.isInteger(remaining) ? remaining : bucketKeyConfig.size,
reset: this.calculateReset(bucketKeyConfig, remaining, parseInt(results[1], 10)),
limit: bucketKeyConfig.size
});
});
}

/**
* Resets/re-fills all keys in all buckets.
* @param {function(Error)} [callback].
Expand Down
15 changes: 12 additions & 3 deletions lib/take.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ local ttl = tonumber(ARGV[4])
local current_time = redis.call('TIME')
local current_timestamp_ms = current_time[1] * 1000 + current_time[2] / 1000

local current = redis.pcall('HMGET', KEYS[1], 'd', 'r')
local current = redis.pcall('HMGET', KEYS[1], 'd', 'r', 'nca')

if current.err ~= nil then
current = {}
Expand All @@ -26,17 +26,26 @@ elseif current[1] and tokens_per_ms == 0 then
end

local enough_tokens = new_content >= tokens_to_take
local current_conformant_attempts = current[3] or 0;
local non_conformant_attempts = 0;

if enough_tokens then
new_content = math.min(new_content - tokens_to_take, bucket_size)
else
-- HINCRBY is the natural redis command to think about for this case
-- however this approach allows to use a single "HMSET" command instead
-- HINCRBY and "HMSET" which makes the code a bit cleaner and since LUA scripts
-- runs atomically it has the same guarantees as HINCRBY
non_conformant_attempts = current_conformant_attempts + 1
end

-- https://redis.io/commands/EVAL#replicating-commands-instead-of-scripts
redis.replicate_commands()

redis.call('HMSET', KEYS[1],
'd', current_timestamp_ms,
'r', new_content)
'r', new_content,
'nca', non_conformant_attempts)
redis.call('EXPIRE', KEYS[1], ttl)

return { new_content, enough_tokens, current_timestamp_ms }
return { new_content, enough_tokens, current_timestamp_ms, current_conformant_attempts }
42 changes: 42 additions & 0 deletions test/db.tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,48 @@ describe('LimitDBRedis', () => {
done();
});
});

it('should return non-conformant-attempts stats', (done) => {
async.waterfall([
cb => async.map(_.range(10), (i, done) => {
db.take({ type: 'ip', key: '8.8.8.8' }, done);
}, cb),

(results, cb) => {
assert.ok(results.every(r => r.conformant));

results.forEach((r, i) => {
assert.equal(r.stats.nonConformantAttempts, 0);
});

cb();
},

cb => async.map(_.range(10), (i, done) => {
db.take({ type: 'ip', key: '8.8.8.8' }, done);
}, cb),

(results, cb) => {
results.forEach((r, i) => {
assert.notOk(r.conformant);
assert.equal(r.stats.nonConformantAttempts, i);
});

cb();
},

cb => db.put({ type: 'ip', key: '8.8.8.8', count: 1 }, (err) => done(err)),

cb => db.take({ type: 'ip', key: '8.8.8.8' }, cb),

(result, cb) => {
assert.notOk(result.conformant);
assert.equal(result.stats.nonConformantAttempts, 10);

cb();
}
], done);
});
});

describe('PUT', function () {
Expand Down

0 comments on commit 6cb8155

Please sign in to comment.