From cf527ffdf1f90041f9a2c06bd1459a75f96543dd Mon Sep 17 00:00:00 2001 From: Adam Butcher Date: Tue, 11 Jun 2019 08:58:09 +0100 Subject: [PATCH] core, changes: Support infinite retry for local connections and recover broken changes feed. --- lib/cradle.js | 16 ++++++++++++---- lib/cradle/database/changes.js | 16 +++++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/lib/cradle.js b/lib/cradle.js index ec8f3b1..6c68c34 100644 --- a/lib/cradle.js +++ b/lib/cradle.js @@ -26,7 +26,8 @@ cradle.options = { retries: 0, retryTimeout: 10e3, forceSave: true, - headers: {} + headers: {}, + forceReconnect: false, }; cradle.setup = function (settings) { @@ -155,13 +156,14 @@ cradle.Connection.prototype.rawRequest = function (options, callback) { options.query[k] = String(options.query[k]); } } - options.path += '?' + querystring.stringify(options.query); + options.uri = this._url(options.path + '?' + querystring.stringify(options.query)); + } + else { + options.uri = this._url(options.path); } options.headers['Connection'] = options.headers['Connection'] || 'keep-alive'; options.agent = this.agent; - options.uri = this._url(options.path); - delete options.path; return request(options, callback || function () { }); }; @@ -214,6 +216,12 @@ cradle.Connection.prototype.request = function (options, callback) { return this.rawRequest(options, function _onResponse(err, res, body) { attempts++; if (err) { + if (self.options.forceReconnect && String(err.code).startsWith('ECONN')) { + return setTimeout( + self.rawRequest.bind(self, options, _onResponse), + self.options.retryTimeout + ); + } if (self.options.retries && (!options.method || options.method.toLowerCase() === 'get' || options.body) && String(err.code).indexOf('ECONN') === 0 && attempts <= self.options.retries diff --git a/lib/cradle/database/changes.js b/lib/cradle/database/changes.js index fe0ba8f..999f86c 100644 --- a/lib/cradle/database/changes.js +++ b/lib/cradle/database/changes.js @@ -50,7 +50,21 @@ Database.prototype.changes = function (options, callback) { response.emit.apply(response, ['data'].concat(Array.prototype.slice.call(arguments))); }); - + + var self = this; + // Keep a consistent object for return to the client, even if + // this feed is restarted due to error. + feed.on('error', function (err) { + if (feed.dead && options.follow !== false) { + console.error(self.name, 'ERROR: Cradle changes feed died, restarting', err.message || err); + setTimeout(function() { + console.error(self.name, 'RECOVERY: Restarting feed that died with', err.message || err); + feed.restart(); + feed.emit('recover', err); + }, 1000); + } + }); + if (options.follow !== false) { feed.follow(); }