Skip to content

Commit

Permalink
Merge pull request #17 from jonathansamines/feature/fix-reconnections
Browse files Browse the repository at this point in the history
Fix graphite client reconnections on Node 16 and above
  • Loading branch information
Serphentas authored Jul 13, 2022
2 parents 6a59aa6 + 7700021 commit f513603
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@ node_js:
- 6
- 8
- 10
- 12
- 14
- 16
2 changes: 1 addition & 1 deletion lib/CarbonClient.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
var LazySocket = require('lazy-socket');
var LazySocket = require('./LazySocket');
var url = require('url');

module.exports = CarbonClient;
Expand Down
85 changes: 85 additions & 0 deletions lib/LazySocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
var net = require('net');

module.exports = LazySocket;
function LazySocket(properties) {
properties = properties || {};

this.port = properties.port;
this.host = properties.host;

this._socket = null;
this._closed = false;
this._callbacks = [];
}

LazySocket.createConnection = function(port, host) {
var socket = new this({port: port, host: host});
return socket;
};

LazySocket.prototype.write = function(/* data, encoding, cb */) {
var self = this;
var args = Array.prototype.slice.call(arguments);
var cb = args[args.length - 1];

if (typeof cb === 'function') {
var cbProxy = function() {
var index = self._callbacks.indexOf(cbProxy);
self._callbacks.splice(index, 1);

return cb.apply(this, arguments);
};

args[args.length - 1] = cbProxy;
this._callbacks.push(cbProxy);
}

this._lazyConnect();

try {
this._socket.write.apply(this._socket, args);
} catch (err) {
if (cbProxy) cbProxy(err);

this._socket.destroy();
this._socket = null;
}
};

LazySocket.prototype._lazyConnect = function() {
if (this._socket) return;

var self = this;

function onerror(err) {
self._socket = null;
self._callbacks.forEach(function(cb) {
cb(err);
});
}

function onend() {
// "end" is called when the socket connection is terminated, regardless of the termination being unexpected or not
// to distinguish between unexpected terminations (e.g need reconnection)
// from expected terminations (e.g calling LazySocket's .end() or .destroy()), the later are flagged as "closed"

if (!self._closed) {
self._socket = null;
}
}

this._socket = net
.createConnection(this.port, this.host)
.once('error', onerror)
.once('end', onend);
};

LazySocket.prototype.end = function() {
this._closed = true;
if (this._socket) this._socket.end();
};

LazySocket.prototype.destroy = function() {
this._closed = true;
if (this._socket) this._socket.destroy();
};
3 changes: 0 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
"engines": {
"node": ">=4"
},
"dependencies": {
"lazy-socket": "0.0.3"
},
"devDependencies": {
"sinon": "~7.1.1",
"urun": "0.0.8",
Expand Down
56 changes: 56 additions & 0 deletions test/integration/test-lazy-socket-connection-interrupt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
var common = require('../common');
var assert = require('assert');
var net = require('net');
var LazySocket = require('../../lib/LazySocket');
var data = '';

var num = 0;
var server = net.createServer(function(socket) {
socket
.on('data', function(chunk) {
data += chunk;
});

num++;
if (num === 1) {
socket
.on('end', sendSecondMessage)
.end();

server.close();
}

if (num === 2) {
socket.on('end', function() {
server.close();
});
}
});

server.listen(common.port, sendFirstMessage);

var socket = LazySocket.createConnection(common.port);
function sendFirstMessage() {
server.removeAllListeners('listening')
socket.write('first', 'utf-8', function(err) {
assert.ok(!err);
});
}

function sendSecondMessage() {
socket.write('second ', 'utf-8', function(err) {
assert.ok(err);
server.listen(common.port, sendThirdMessage);
});
}

function sendThirdMessage() {
socket.write('third', 'utf-8', function(err) {
assert.ok(!err);
socket.end();
});
}

process.on('exit', function() {
assert.equal(data, 'firstthird');
});
35 changes: 35 additions & 0 deletions test/integration/test-lazy-socket-failed-then-working-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
var common = require('../common');
var assert = require('assert');
var net = require('net');
var LazySocket = require('../../lib/LazySocket');
var data = '';

var server = net.createServer(function(socket) {
socket
.on('data', function(chunk) {
data += chunk;
})
.on('end', function() {
server.close();
});
});

var socket = LazySocket.createConnection(common.port);

var connectError;
socket.write('high', 'utf-8', function(err) {
connectError = err;

server.listen(common.port, function() {
socket.write('five', 'utf-8', function(err) {
assert.ok(!err);
socket.end();
});
});

});

process.on('exit', function() {
assert.ok(connectError);
assert.equal(data, 'five');
});
28 changes: 28 additions & 0 deletions test/integration/test-regular-connect-write-disconnect.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
var common = require('../common');
var assert = require('assert');
var net = require('net');
var LazySocket = require('../../lib/LazySocket');
var data = '';

var server = net.createServer(function(socket) {
socket
.on('data', function(chunk) {
data += chunk;
})
.on('end', function() {
server.close();
});
});

server.listen(common.port, function() {
var socket = LazySocket.createConnection(common.port);
socket.write('high ', 'utf-8');
socket.write('five', 'utf-8', function(err) {
assert.ok(!err);
socket.end();
});
});

process.on('exit', function() {
assert.equal(data, 'high five');
});
64 changes: 64 additions & 0 deletions test/unit/test-LazySocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
var test = require('utest');
var assert = require('assert');
var LazySocket = require('../../lib/LazySocket');
var sinon = require('sinon');
var net = require('net');

test('LazySocket#createConnection', {
'returns a new LazySocket': function() {
var socket = LazySocket.createConnection();
assert.ok(socket instanceof LazySocket);
},

'sets the passed host / port': function() {
var socket = LazySocket.createConnection(8080, 'example.org');
assert.equal(socket.port, 8080);
assert.equal(socket.host, 'example.org');
},
});

var socket;
var fakeSocket;
test('LazySocket', {
before: function() {
socket = new LazySocket();
fakeSocket = sinon.stub({
once: function() {},
destroy: function() {},
end: function() {},
write: function() {},
});

sinon.stub(net, 'createConnection').returns(fakeSocket);
fakeSocket.once.returns(fakeSocket);

// To establish a connection
socket.write();
},

after: function() {
net.createConnection.restore();
},

'#end when disconnected (does not blow up)': function() {
socket = new LazySocket();
socket.end();
},

'#end when connected': function() {
socket.end();

assert.ok(fakeSocket.end.calledOnce);
},

'#destroy when disconnected (does not blow up)': function() {
var socket = new LazySocket();
socket.destroy();
},

'#destroy when connected': function() {
socket.destroy();

assert.ok(fakeSocket.destroy.calledOnce);
},
});

0 comments on commit f513603

Please sign in to comment.