Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new cache mode 'follow' #291

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
20 changes: 14 additions & 6 deletions lib/cradle.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ cradle.options = {
retries: 0,
retryTimeout: 10e3,
forceSave: true,
headers: {}
headers: {},
forceReconnect: false,
};

cradle.setup = function (settings) {
Expand Down Expand Up @@ -155,13 +156,14 @@ cradle.Connection.prototype.rawRequest = function (options, callback) {
options.query[k] = String(options.query[k]);
}
}
options.path += '?' + querystring.stringify(options.query);
options.uri = this._url(options.path + '?' + querystring.stringify(options.query));
}
else {
options.uri = this._url(options.path);
}

options.headers['Connection'] = options.headers['Connection'] || 'keep-alive';
options.agent = this.agent;
options.uri = this._url(options.path);
delete options.path;

return request(options, callback || function () { });
};
Expand Down Expand Up @@ -214,6 +216,12 @@ cradle.Connection.prototype.request = function (options, callback) {
return this.rawRequest(options, function _onResponse(err, res, body) {
attempts++;
if (err) {
if (self.options.forceReconnect && String(err.code).startsWith('ECONN')) {
return setTimeout(
self.rawRequest.bind(self, options, _onResponse),
self.options.retryTimeout
);
}
if (self.options.retries &&
(!options.method || options.method.toLowerCase() === 'get' || options.body) &&
String(err.code).indexOf('ECONN') === 0 && attempts <= self.options.retries
Expand Down Expand Up @@ -253,8 +261,8 @@ cradle.Connection.prototype.request = function (options, callback) {
// We return an object with database functions,
// closing around the `name` argument.
//
cradle.Connection.prototype.database = function (name) {
return new cradle.Database(name, this)
cradle.Connection.prototype.database = function (name, opts) {
return new cradle.Database(name, this, opts)
};

//
Expand Down
16 changes: 15 additions & 1 deletion lib/cradle/database/changes.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,21 @@ Database.prototype.changes = function (options, callback) {

response.emit.apply(response, ['data'].concat(Array.prototype.slice.call(arguments)));
});


var self = this;
// Keep a consistent object for return to the client, even if
// this feed is restarted due to error.
feed.on('error', function (err) {
if (feed.dead && options.follow !== false) {
console.error(self.name, 'ERROR: Cradle changes feed died, restarting', err.message || err);
setTimeout(function() {
console.error(self.name, 'RECOVERY: Restarting feed that died with', err.message || err);
feed.restart();
feed.emit('recover', err);
}, 1000);
}
});

if (options.follow !== false) {
feed.follow();
}
Expand Down
67 changes: 58 additions & 9 deletions lib/cradle/database/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,60 @@ var querystring = require('querystring'),
Args = require('vargs').Constructor,
cradle = require('../../cradle');

var Database = exports.Database = function (name, connection) {
var Database = exports.Database = function (name, connection, opts) {
this.connection = connection;
this.name = encodeURIComponent(name);
this.cache = new (cradle.Cache)(connection.options);
this.opts = {...connection.options};
if (opts && opts.disableCache) {
this.opts.cache = false;
this.opts.cacheSize = 0;
}

// Workaround https://issues.apache.org/jira/browse/COUCHDB-1415 by default
this.workaroundBug1415 = true;

this.cache = new (cradle.Cache)(this.opts);
this.cacheFeed = null;
if (this.opts.cache === 'follow') {
var self = this;
this.exists(function(err, result) {
if (result === true)
self.configureCacheFeed();
});
}
};

Database.prototype.configureCacheFeed = function () {
if (this.cacheFeed) {
this.cacheFeed.stop();
this.cacheFeed = null;
}
// For any entry already in the cache, update it if it changes
// remotely.
if (this.opts.cache === 'follow') {
var self = this;
this.changes({descending: true, limit: 0}, function (err, list) {
var lastSeq = 0;
if (list && 'last_seq' in list)
lastSeq = list.last_seq;
self.cacheFeed = self.changes({ since: lastSeq, include_docs: true });
self.cacheFeed.on('change', function (change) {
var id = change["id"];
if (id && 'doc' in change && self.cache.has(id))
self.cache.save(id, change["doc"]);
});
});
}
};

// A wrapper around `Connection.request`,
// which prepends the database name.
Database.prototype.query = function (options, callback) {

// XXX: Workaround https://issues.apache.org/jira/browse/COUCHDB-1415
if (options.body && options.method === 'POST' && this.workaroundBug1415)
options.body.$ts = Date.now();

options.path = [this.name, options.path].filter(Boolean).join('/');
return this.connection.request(options, callback);
};
Expand Down Expand Up @@ -39,7 +84,11 @@ Database.prototype.info = function (callback) {
};

Database.prototype.create = function (callback) {
this.query({ method: 'PUT' }, callback);
var self = this;
this.query({ method: 'PUT' }, function () {
self.configureCacheFeed();
callback.apply(this, arguments);
});
};

// Destroys a database with 'DELETE'
Expand All @@ -48,12 +97,12 @@ Database.prototype.create = function (callback) {
Database.prototype.destroy = function (callback) {
if (arguments.length > 1) {
throw new(Error)("destroy() doesn't take any additional arguments");
}
}

this.query({
method: 'DELETE',
path: '/',
}, callback);
method: 'DELETE',
path: '/',
}, callback);
};

//
Expand All @@ -62,4 +111,4 @@ Database.prototype.destroy = function (callback) {
require('./attachments');
require('./changes');
require('./documents');
require('./views');
require('./views');