Skip to content

Commit

Permalink
feat: workerEmit
Browse files Browse the repository at this point in the history
  • Loading branch information
Akryum committed Jan 18, 2021
1 parent b0e7269 commit 805e9b6
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 19 deletions.
8 changes: 5 additions & 3 deletions src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ function Pool(script, options) {
* will be stringified and executed via the
* workers built-in function `run(fn, args)`.
* @param {Array} [params] Function arguments applied when calling the function
* @param {ExecOptions} [options] Options object
* @return {Promise.<*, Error>} result
*/
Pool.prototype.exec = function (method, params) {
Pool.prototype.exec = function (method, params, options) {
// validate type of arguments
if (params && !Array.isArray(params)) {
throw new TypeError('Array expected as argument "params"');
Expand All @@ -109,7 +110,8 @@ Pool.prototype.exec = function (method, params) {
method: method,
params: params,
resolver: resolver,
timeout: null
timeout: null,
options: options
};
tasks.push(task);

Expand Down Expand Up @@ -202,7 +204,7 @@ Pool.prototype._next = function () {
// check if the task is still pending (and not cancelled -> promise rejected)
if (task.resolver.promise.pending) {
// send the request to the worker
var promise = worker.exec(task.method, task.params, task.resolver)
var promise = worker.exec(task.method, task.params, task.resolver, task.options)
.then(me._boundNext)
.catch(function () {
// if the worker crashed and terminated, remove it from the pool
Expand Down
38 changes: 23 additions & 15 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,27 @@ function WorkerHandler(script, _options) {
var id = response.id;
var task = me.processing[id];
if (task !== undefined) {
// remove the task from the queue
delete me.processing[id];
if (response.eventType != null) {
if (task.options && typeof task.options.on === 'function') {
task.options.on(response.eventType, response.payload);
}
} else {
// remove the task from the queue
delete me.processing[id];

// test if we need to terminate
if (me.terminating === true) {
// complete worker termination if all tasks are finished
me.terminate();
}
// test if we need to terminate
if (me.terminating === true) {
// complete worker termination if all tasks are finished
me.terminate();
}

// resolve the task's promise
if (response.error) {
task.resolver.reject(objectToError(response.error));
}
else {
task.resolver.resolve(response.result);
// resolve the task's promise
if (response.error) {
task.resolver.reject(objectToError(response.error));
}
else {
task.resolver.resolve(response.result);
}
}
}
}
Expand Down Expand Up @@ -303,9 +309,10 @@ WorkerHandler.prototype.methods = function () {
* @param {String} method
* @param {Array} [params]
* @param {{resolve: Function, reject: Function}} [resolver]
* @param {ExecOptions} [options]
* @return {Promise.<*, Error>} result
*/
WorkerHandler.prototype.exec = function(method, params, resolver) {
WorkerHandler.prototype.exec = function(method, params, resolver, options) {
if (!resolver) {
resolver = Promise.defer();
}
Expand All @@ -316,7 +323,8 @@ WorkerHandler.prototype.exec = function(method, params, resolver) {
// register a new task as being in progress
this.processing[id] = {
id: id,
resolver: resolver
resolver: resolver,
options: options
};

// build a JSON-RPC request
Expand Down
10 changes: 10 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ exports.worker = function worker(methods) {
worker.add(methods);
};

/**
* Sends an event to the parent worker pool.
* @param {string} eventType
* @param {unknown} payload
*/
exports.workerEmit = function workerEmit(eventType, payload) {
var worker = require('./worker');
worker.emit(eventType, payload);
};

/**
* Create a promise.
* @type {Promise} promise
Expand Down
7 changes: 6 additions & 1 deletion src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@
* @property {*} [forkArgs]
* @property {*} [forkOpts]
* @property {number} [debugPortStart]
*/
*/

/**
* @typedef {Object} ExecOptions
* @property {(eventType: string, payload: unknown) => unknown} [on]
*/
19 changes: 19 additions & 0 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ worker.methods.methods = function methods() {
return Object.keys(worker.methods);
};

var currentRequestId = null;

worker.on('message', function (request) {
if (request === TERMINATE_METHOD_ID) {
return worker.exit(0);
Expand All @@ -117,6 +119,8 @@ worker.on('message', function (request) {
var method = worker.methods[request.method];

if (method) {
currentRequestId = request.id;

// execute the function
var result = method.apply(method, request.params);

Expand All @@ -129,13 +133,15 @@ worker.on('message', function (request) {
result: result,
error: null
});
currentRequestId = null;
})
.catch(function (err) {
worker.send({
id: request.id,
result: null,
error: convertError(err)
});
currentRequestId = null;
});
}
else {
Expand All @@ -145,6 +151,8 @@ worker.on('message', function (request) {
result: result,
error: null
});

currentRequestId = null;
}
}
else {
Expand Down Expand Up @@ -178,6 +186,17 @@ worker.register = function (methods) {

};

worker.emit = function (eventType, payload) {
if (currentRequestId) {
worker.send({
id: currentRequestId,
eventType,
payload
});
}
};

if (typeof exports !== 'undefined') {
exports.add = worker.register;
exports.emit = worker.emit;
}
26 changes: 26 additions & 0 deletions test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -976,4 +976,30 @@ describe('Pool', function () {
});
});

it('should receive events from worker', function (done) {
var pool = new Pool(__dirname + '/workers/emit.js');

var receivedEvent

pool.exec('sendEvent', [], {
on: function (eventType, payload) {
receivedEvent = [eventType, payload]
}
})
.then(function (result) {
assert.strictEqual(result, 'done');
assert.strictEqual(receivedEvent[0], 'test-event');
assert.deepStrictEqual(receivedEvent[1], {
foo: 'bar'
});

pool.terminate();
done();
})
.catch(function (err) {
console.log(err);
assert('Should not throw an error');
done(err);
});
});
});
16 changes: 16 additions & 0 deletions test/workers/emit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// a simple worker
var workerpool = require('../../');

function sendEvent() {
return new Promise(function (resolve, reject) {
workerpool.workerEmit('test-event', {
foo: 'bar'
});
resolve('done');
});
}

// create a worker and register some functions
workerpool.worker({
sendEvent: sendEvent
});

0 comments on commit 805e9b6

Please sign in to comment.