Skip to content

Commit

Permalink
Fixes #14 issues with events emit order and missing events
Browse files Browse the repository at this point in the history
  • Loading branch information
harrisiirak committed Aug 14, 2016
1 parent cc75b67 commit 079ed81
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
43 changes: 23 additions & 20 deletions lib/webhdfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ WebHDFS.prototype.writeFile = function writeFile (path, data, append, opts, call
var remoteStream = this.createWriteStream(path, append, opts);

// Handle events
remoteStream.on('error', function onError (err) {
remoteStream.once('error', function onError (err) {
error = err;
});

remoteStream.on('finish', function onFinish () {
remoteStream.once('finish', function onFinish () {
return callback && callback(error);
});

Expand All @@ -434,7 +434,7 @@ WebHDFS.prototype.writeFile = function writeFile (path, data, append, opts, call
*
* @returns {Object}
*/
WebHDFS.prototype.appendFile = function writeFile (path, data, opts, callback) {
WebHDFS.prototype.appendFile = function appendFile (path, data, opts, callback) {
return this.writeFile(path, data, true, opts, callback);
};

Expand Down Expand Up @@ -514,6 +514,17 @@ WebHDFS.prototype.createWriteStream = function createWriteStream (path, append,
throw new Error('path must be a string');
}

var emitError = function (instance, err) {
const isErrorEmitted = instance.errorEmitted;

if (!isErrorEmitted) {
instance.emit('error', err);
instance.emit('finish');
}

instance.errorEmitted = true;
};

var endpoint = this._getOperationEndpoint(append ? 'append' : 'create', path, extend({
overwrite: true,
permissions: '0777'
Expand All @@ -533,10 +544,10 @@ WebHDFS.prototype.createWriteStream = function createWriteStream (path, append,
// Handle redirect only if there was not an error (e.g. res is defined)
if (res && self._isRedirect(res)) {
var upload = request(extend(params, { url: res.headers.location }), function (err, res, body) {
if (err) {
return req.emit('error', err);
} else if (self._isError(res)) {
return req.emit('error', self._parseError(body));
if (err || self._isError(res)) {
emitError(req, err || self._parseError(body));
req.end();
return;
}

if (res.headers.hasOwnProperty('location')) {
Expand All @@ -551,21 +562,13 @@ WebHDFS.prototype.createWriteStream = function createWriteStream (path, append,
stream.pipe(upload);
stream.resume();
}
});

// Handle possible server error
req.on('data', function onError (data) {
var error = self._parseError(data.toString());
if (error) {
stream.emit('error', error);
req.emit('error', error);
if (err || self._isError(res)) {
emitError(req, err || self._parseError(body));
return;
}
});

req.once('error', function onError (err) {
req.emit('finish'); // Request is finished
});

req.on('pipe', function onPipe (src) {
// Pause read stream
stream = src;
Expand Down Expand Up @@ -693,7 +696,7 @@ WebHDFS.prototype.createReadStream = function createReadStream (path, opts) {
*
* @returns {Object}
*/
WebHDFS.prototype.symlink = function writeFile (src, dest, createParent, callback) {
WebHDFS.prototype.symlink = function symlink (src, dest, createParent, callback) {
if (typeof createParent === 'function') {
callback = createParent;
createParent = false;
Expand Down Expand Up @@ -729,7 +732,7 @@ WebHDFS.prototype.symlink = function writeFile (src, dest, createParent, callbac
*
* @returns {Object}
*/
WebHDFS.prototype.unlink = function writeFile (path, recursive, callback) {
WebHDFS.prototype.unlink = function unlink (path, recursive, callback) {
if (typeof callback === 'undefined') {
callback = recursive;
recursive = null;
Expand Down
7 changes: 5 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "webhdfs",
"version": "1.0.0",
"version": "1.1.0",
"description": "Node.js WebHDFS REST API client",
"main": "lib/webhdfs.js",
"scripts": {
Expand Down Expand Up @@ -29,6 +29,9 @@
"dependencies": {
"buffer-stream-reader": "^0.1.1",
"extend": "^3.0.0",
"request": "^2.65.0"
"mocha": "^3.0.2",
"must": "^0.13.2",
"request": "^2.74.0",
"sinon": "^1.17.5"
}
}
1 change: 1 addition & 0 deletions test/webhdfs.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ describe('WebHDFS', function () {
done();
});
});

it('should open and read a file', function (done) {
hdfs.readFile(path + '/file-1', function (err, data) {
demand(err).be.null();
Expand Down

0 comments on commit 079ed81

Please sign in to comment.