diff --git a/README.md b/README.md index 84ea7320..3034fb59 100644 --- a/README.md +++ b/README.md @@ -393,6 +393,38 @@ workerpool.worker({ }); ``` +Tasks may configure an `abort handler` to perform cleanup operations when `timeout` or `cancel` is called on a `task`. the `abortListenerTimeout` option can be configured to control when cleanup should be aborted in the case an `abortHandler` never resolves. This timeout trigger will cause the given worker to be cleaned up. Allowing a new worker to be created if need be. + +```js +function asyncTimeout() { + var me = this; + return new Promise(function (resolve) { + let timeout = setTimeout(() => { + resolve(); + }, 5000); + + // An abort listener allows for cleanup for a given worker + // such that it may be resused for future tasks + // if an execption is thrown within scope of the handler + // the worker instance will be destroyed. + me.worker.addAbortListener(async function () { + clearTimeout(timeout); + resolve(); + }); + }); +} + +// create a worker and register public functions +workerpool.worker( + { + asyncTimeout: asyncTimeout, + }, + { + abortListenerTimeout: 1000 + } +); +``` + ### Events You can send data back from workers to the pool while the task is being executed using the `workerEmit` function: diff --git a/examples/abort.js b/examples/abort.js new file mode 100644 index 00000000..9345da15 --- /dev/null +++ b/examples/abort.js @@ -0,0 +1,46 @@ +const workerpool = require(".."); + +var workerCount = 0; + +// create a worker pool +const pool = workerpool.pool(__dirname + "/workers/cleanupAbort.js", { + // maximum time to wait for worker to cleanup it's resources + // on termination before forcefully stopping the worker + workerTerminateTimeout: 1000, + onCreateWorker: (args) => { + console.log("New worker created"); + workerCount += 1; + } +}); + +function add (a, b) { + return a + b; +} + +const main = async () => { + const cleanedUpTask = pool.exec('asyncTimeout', []).timeout(1_000).catch((err) => { + console.log("task timeout"); + console.log("timeout occured: ", err.message); + console.log("worker count ", workerCount); + return pool.exec(add, [1, 2]).then((sum) => { + console.log('add result', sum); + console.log("worker count: ", workerCount); + }); + }); + await cleanedUpTask; + + const canceledTask = pool.exec('asyncAbortHandlerNeverResolves').cancel().catch((err) => { + console.log("task canceled"); + console.log("cancel occured: ", err.message); + console.log("worker count ", workerCount); + return pool.exec(add, [1, 2]).then((sum) => { + console.log('add result', sum); + console.log("worker count: ", workerCount); + }); + }); + + await canceledTask; +} + + +main(); diff --git a/examples/workers/cleanupAbort.js b/examples/workers/cleanupAbort.js new file mode 100644 index 00000000..3c05a4b2 --- /dev/null +++ b/examples/workers/cleanupAbort.js @@ -0,0 +1,55 @@ +var workerpool = require("../.."); + +function asyncTimeout() { + var me = this; + return new Promise(function (resolve) { + let timeout = setTimeout(() => { + resolve(); + }, 5000); + + // An abort listener allows for cleanup for a given worker + // such that it may be resused for future tasks + // if an execption is thrown within scope of the handler + // the worker instance will be destroyed. + me.worker.addAbortListener(async function () { + clearTimeout(timeout); + resolve(); + }); + }); +} + +function asyncAbortHandlerNeverResolves() { + var me = this; + return new Promise((resolve) => { + let timeout = setTimeout(() => { + resolve(); + }, 5000); + + // An abort listener allows for cleanup for a given worker + // such that it may be resused for future tasks + // if an execption is thrown within scope of the handler + // the worker instance will be destroyed. + me.worker.addAbortListener(function () { + clearTimeout(timeout); + return new Promise((res) => { + setTimeout(() => { + res(); + resolve(); + // set the timeout high so it will not resolve before the external + // timeout triggers and exits the worker + }, 1_000_000_000); + }); + }); + }); +} + +// create a worker and register public functions +workerpool.worker( + { + asyncTimeout: asyncTimeout, + asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves, + }, + { + abortListenerTimeout: 1000 + } +); \ No newline at end of file diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 224c7645..1d855e41 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -10,6 +10,12 @@ const {validateOptions, forkOptsNames, workerThreadOptsNames, workerOptsNames} = */ var TERMINATE_METHOD_ID = '__workerpool-terminate__'; +/** + * Special message by parent which causes a child process worker to perform cleaup + * steps before determining if the child process worker should be terminated. + */ +var CLEANUP_METHOD_ID = '__workerpool-cleanup__'; + function ensureWorkerThreads() { var WorkerThreads = tryRequireWorkerThreads() if (!WorkerThreads) { @@ -294,6 +300,20 @@ function WorkerHandler(script, _options) { } } } + + if (response.method === CLEANUP_METHOD_ID) { + var trackedTask = me.tracking[response.id]; + if (trackedTask !== undefined) { + if (response.error) { + clearTimeout(trackedTask.timeoutId); + trackedTask.resolver.reject(objectToError(response.error)) + } else { + me.tracking && clearTimeout(trackedTask.timeoutId); + trackedTask.resolver.resolve(trackedTask.result); + } + } + delete me.tracking[id]; + } } }); @@ -306,6 +326,7 @@ function WorkerHandler(script, _options) { me.processing[id].resolver.reject(error); } } + me.processing = Object.create(null); } @@ -337,7 +358,7 @@ function WorkerHandler(script, _options) { }); this.processing = Object.create(null); // queue with tasks currently in progress - + this.tracking = Object.create(null); // queue with tasks being monitored for cleanup status this.terminating = false; this.terminated = false; this.cleaning = false; @@ -399,17 +420,50 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { var me = this; return resolver.promise.catch(function (error) { if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) { + me.tracking[id] = { + id, + resolver: Promise.defer() + }; + // remove this task from the queue. It is already rejected (hence this // catch event), and else it will be rejected again when terminating delete me.processing[id]; - // terminate worker - return me.terminateAndNotify(true) - .then(function() { - throw error; - }, function(err) { - throw err; - }); + me.tracking[id].resolver.promise = me.tracking[id].resolver.promise.catch(function(err) { + delete me.tracking[id]; + + var promise = me.terminateAndNotify(true) + .then(function() { + throw err; + }, function(err) { + throw err; + }); + + return promise; + }); + + me.worker.send({ + id, + method: CLEANUP_METHOD_ID + }); + + + /** + * Sets a timeout to reject the cleanup operation if the message sent to the worker + * does not receive a response. see worker.tryCleanup for worker cleanup operations. + * Here we use the workerTerminateTimeout as the worker will be terminated if the timeout does invoke. + * + * We need this timeout in either case of a Timeout or Cancellation Error as if + * the worker does not send a message we still need to give a window of time for a response. + * + * The workerTermniateTimeout is used here if this promise is rejected the worker cleanup + * operations will occure. + */ + me.tracking[id].timeoutId = setTimeout(function() { + me.tracking[id].resolver.reject(error); + }, me.workerTerminateTimeout); + + return me.tracking[id].resolver.promise; } else { throw error; } @@ -441,9 +495,18 @@ WorkerHandler.prototype.terminate = function (force, callback) { this.processing[id].resolver.reject(new Error('Worker terminated')); } } + this.processing = Object.create(null); } + // If we are terminating, cancel all tracked task for cleanup + for (var task of Object.values(me.tracking)) { + clearTimeout(task.timeoutId); + task.resolver.reject(new Error('Worker Terminating')); + } + + me.tracking = Object.create(null); + if (typeof callback === 'function') { this.terminationHandler = callback; } @@ -452,6 +515,7 @@ WorkerHandler.prototype.terminate = function (force, callback) { var cleanup = function(err) { me.terminated = true; me.cleaning = false; + if (me.worker != null && me.worker.removeAllListeners) { // removeAllListeners is only available for child_process me.worker.removeAllListeners('message'); diff --git a/src/generated/embeddedWorker.js b/src/generated/embeddedWorker.js index 5bf50d05..5b4c7390 100644 --- a/src/generated/embeddedWorker.js +++ b/src/generated/embeddedWorker.js @@ -3,4 +3,4 @@ * This file is automatically generated, * changes made in this file will be overwritten. */ -module.exports = "!function(e,n){\"object\"==typeof exports&&\"undefined\"!=typeof module?module.exports=n():\"function\"==typeof define&&define.amd?define(n):(e=\"undefined\"!=typeof globalThis?globalThis:e||self).worker=n()}(this,(function(){\"use strict\";function e(e){return e&&e.__esModule&&Object.prototype.hasOwnProperty.call(e,\"default\")?e.default:e}function n(e){return n=\"function\"==typeof Symbol&&\"symbol\"==typeof Symbol.iterator?function(e){return typeof e}:function(e){return e&&\"function\"==typeof Symbol&&e.constructor===Symbol&&e!==Symbol.prototype?\"symbol\":typeof e},n(e)}var t,r,o,i={};var s=(o||(o=1,function(e){var o=r?t:(r=1,t=function(e,n){this.message=e,this.transfer=n}),i={exit:function(){}};if(\"undefined\"!=typeof self&&\"function\"==typeof postMessage&&\"function\"==typeof addEventListener)i.on=function(e,n){addEventListener(e,(function(e){n(e.data)}))},i.send=function(e,n){n?postMessage(e,n):postMessage(e)};else{if(\"undefined\"==typeof process)throw new Error(\"Script must be executed as a worker\");var s;try{s=require(\"worker_threads\")}catch(e){if(\"object\"!==n(e)||null===e||\"MODULE_NOT_FOUND\"!==e.code)throw e}if(s&&null!==s.parentPort){var u=s.parentPort;i.send=u.postMessage.bind(u),i.on=u.on.bind(u),i.exit=process.exit.bind(process)}else i.on=process.on.bind(process),i.send=function(e){process.send(e)},i.on(\"disconnect\",(function(){process.exit(1)})),i.exit=process.exit.bind(process)}function d(e){return Object.getOwnPropertyNames(e).reduce((function(n,t){return Object.defineProperty(n,t,{value:e[t],enumerable:!0})}),{})}function f(e){return e&&\"function\"==typeof e.then&&\"function\"==typeof e.catch}i.methods={},i.methods.run=function(e,n){var t=new Function(\"return (\"+e+\").apply(null, arguments);\");return t.apply(t,n)},i.methods.methods=function(){return Object.keys(i.methods)},i.terminationHandler=void 0,i.cleanupAndExit=function(e){var n=function(){i.exit(e)};if(!i.terminationHandler)return n();var t=i.terminationHandler(e);f(t)?t.then(n,n):n()};var a=null;i.on(\"message\",(function(e){if(\"__workerpool-terminate__\"===e)return i.cleanupAndExit(0);try{var n=i.methods[e.method];if(!n)throw new Error('Unknown method \"'+e.method+'\"');a=e.id;var t=n.apply(n,e.params);f(t)?t.then((function(n){n instanceof o?i.send({id:e.id,result:n.message,error:null},n.transfer):i.send({id:e.id,result:n,error:null}),a=null})).catch((function(n){i.send({id:e.id,result:null,error:d(n)}),a=null})):(t instanceof o?i.send({id:e.id,result:t.message,error:null},t.transfer):i.send({id:e.id,result:t,error:null}),a=null)}catch(n){i.send({id:e.id,result:null,error:d(n)})}})),i.register=function(e,n){if(e)for(var t in e)e.hasOwnProperty(t)&&(i.methods[t]=e[t]);n&&(i.terminationHandler=n.onTerminate),i.send(\"ready\")},i.emit=function(e){if(a){if(e instanceof o)return void i.send({id:a,isEvent:!0,payload:e.message},e.transfer);i.send({id:a,isEvent:!0,payload:e})}},e.add=i.register,e.emit=i.emit}(i)),i);return e(s)}));\n//# sourceMappingURL=worker.min.js.map\n"; +module.exports = "!function(e,n){\"object\"==typeof exports&&\"undefined\"!=typeof module?module.exports=n():\"function\"==typeof define&&define.amd?define(n):(e=\"undefined\"!=typeof globalThis?globalThis:e||self).worker=n()}(this,(function(){\"use strict\";function e(n){return e=\"function\"==typeof Symbol&&\"symbol\"==typeof Symbol.iterator?function(e){return typeof e}:function(e){return e&&\"function\"==typeof Symbol&&e.constructor===Symbol&&e!==Symbol.prototype?\"symbol\":typeof e},e(n)}function n(e){return e&&e.__esModule&&Object.prototype.hasOwnProperty.call(e,\"default\")?e.default:e}var t={};var r=function(e,n){this.message=e,this.transfer=n},o={};function i(e,n){var t=this;if(!(this instanceof i))throw new SyntaxError(\"Constructor must be called with the new operator\");if(\"function\"!=typeof e)throw new SyntaxError(\"Function parameter handler(resolve, reject) missing\");var r=[],o=[];this.resolved=!1,this.rejected=!1,this.pending=!0;var a=function(e,n){r.push(e),o.push(n)};this.then=function(e,n){return new i((function(t,r){var o=e?u(e,t,r):t,i=n?u(n,t,r):r;a(o,i)}),t)};var f=function(e){return t.resolved=!0,t.rejected=!1,t.pending=!1,r.forEach((function(n){n(e)})),a=function(n,t){n(e)},f=d=function(){},t},d=function(e){return t.resolved=!1,t.rejected=!0,t.pending=!1,o.forEach((function(n){n(e)})),a=function(n,t){t(e)},f=d=function(){},t};this.cancel=function(){return n?n.cancel():d(new s),t},this.timeout=function(e){if(n)n.timeout(e);else{var r=setTimeout((function(){d(new c(\"Promise timed out after \"+e+\" ms\"))}),e);t.always((function(){clearTimeout(r)}))}return t},e((function(e){f(e)}),(function(e){d(e)}))}function u(e,n,t){return function(r){try{var o=e(r);o&&\"function\"==typeof o.then&&\"function\"==typeof o.catch?o.then(n,t):n(o)}catch(e){t(e)}}}function s(e){this.message=e||\"promise cancelled\",this.stack=(new Error).stack}function c(e){this.message=e||\"timeout exceeded\",this.stack=(new Error).stack}return i.prototype.catch=function(e){return this.then(null,e)},i.prototype.always=function(e){return this.then(e,e)},i.prototype.finally=function(e){var n=this,t=function(){return new i((function(e){return e()})).then(e).then((function(){return n}))};return this.then(t,t)},i.all=function(e){return new i((function(n,t){var r=e.length,o=[];r?e.forEach((function(e,i){e.then((function(e){o[i]=e,0==--r&&n(o)}),(function(e){r=0,t(e)}))})):n(o)}))},i.defer=function(){var e={};return e.promise=new i((function(n,t){e.resolve=n,e.reject=t})),e},s.prototype=new Error,s.prototype.constructor=Error,s.prototype.name=\"CancellationError\",i.CancellationError=s,c.prototype=new Error,c.prototype.constructor=Error,c.prototype.name=\"TimeoutError\",i.TimeoutError=c,o.Promise=i,function(n){var t=r,i=o.Promise,u=\"__workerpool-cleanup__\",s={exit:function(){}},c={addAbortListener:function(e){s.abortListeners.push(e)},emit:s.emit};if(\"undefined\"!=typeof self&&\"function\"==typeof postMessage&&\"function\"==typeof addEventListener)s.on=function(e,n){addEventListener(e,(function(e){n(e.data)}))},s.send=function(e,n){n?postMessage(e,n):postMessage(e)};else{if(\"undefined\"==typeof process)throw new Error(\"Script must be executed as a worker\");var a;try{a=require(\"worker_threads\")}catch(n){if(\"object\"!==e(n)||null===n||\"MODULE_NOT_FOUND\"!==n.code)throw n}if(a&&null!==a.parentPort){var f=a.parentPort;s.send=f.postMessage.bind(f),s.on=f.on.bind(f),s.exit=process.exit.bind(process)}else s.on=process.on.bind(process),s.send=function(e){process.send(e)},s.on(\"disconnect\",(function(){process.exit(1)})),s.exit=process.exit.bind(process)}function d(e){return Object.getOwnPropertyNames(e).reduce((function(n,t){return Object.defineProperty(n,t,{value:e[t],enumerable:!0})}),{})}function l(e){return e&&\"function\"==typeof e.then&&\"function\"==typeof e.catch}s.methods={},s.methods.run=function(e,n){var t=new Function(\"return (\"+e+\").apply(this, arguments);\");return t.worker=c,t.apply(t,n)},s.methods.methods=function(){return Object.keys(s.methods)},s.terminationHandler=void 0,s.abortListenerTimeout=1e3,s.abortListeners=[],s.cleanupAndExit=function(e){var n=function(){s.exit(e)};if(!s.terminationHandler)return n();var t=s.terminationHandler(e);l(t)?t.then(n,n):n()},s.tryCleanup=function(){if(s.abortListeners.length){var e,n=s.abortListeners.map((function(e){return e()})),t=new i((function(n,t){e=setTimeout(t,s.abortListenerTimeout)})),r=i.all(n).then((function(){clearTimeout(e),s.abortListeners.length||(s.abortListeners=[])}),(function(){clearTimeout(e),s.exit()}));return i.all([r,t])}return new i((function(e,n){n(new Error(\"Cleanup failed, exiting worker\"))}))};var p=null;s.on(\"message\",(function(e){if(\"__workerpool-terminate__\"===e)return s.cleanupAndExit(0);if(e.method===u)return s.tryCleanup().then((function(){s.send({id:e.id,method:u,error:null})})).catch((function(n){s.send({id:e.id,method:u,error:n?d(n):null})}));try{var n=s.methods[e.method];if(!n)throw new Error('Unknown method \"'+e.method+'\"');p=e.id;var r=n.apply(n,e.params);l(r)?r.then((function(n){n instanceof t?s.send({id:e.id,result:n.message,error:null},n.transfer):s.send({id:e.id,result:n,error:null}),p=null})).catch((function(n){s.send({id:e.id,result:null,error:d(n)}),p=null})):(r instanceof t?s.send({id:e.id,result:r.message,error:null},r.transfer):s.send({id:e.id,result:r,error:null}),p=null)}catch(n){s.send({id:e.id,result:null,error:d(n)})}})),s.register=function(e,n){if(e)for(var t in e)e.hasOwnProperty(t)&&(s.methods[t]=e[t],s.methods[t].worker=c);n&&(s.terminationHandler=n.onTerminate,s.abortListenerTimeout=n.abortListenerTimeout||1e3),s.send(\"ready\")},s.emit=function(e){if(p){if(e instanceof t)return void s.send({id:p,isEvent:!0,payload:e.message},e.transfer);s.send({id:p,isEvent:!0,payload:e})}},n.add=s.register,n.emit=s.emit}(t),n(t)}));\n//# sourceMappingURL=worker.min.js.map\n"; diff --git a/src/types.js b/src/types.js index d07658fc..c4709fe9 100644 --- a/src/types.js +++ b/src/types.js @@ -36,6 +36,7 @@ /** * @typedef {Object} WorkerRegisterOptions * @property {(code: number | undefined) => PromiseLike | void} [onTerminate] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The difference with pool's `onTerminateWorker` is that this callback runs in the worker context, while onTerminateWorker is executed on the main thread. + * @property {number} [abortListenerTimeout] The timeout in milliseconds to wait for a worker to clean up it's resources if an abort listener does not resolve, before stopping the worker forcefully. Default value is `1000`. */ /** diff --git a/src/worker.js b/src/worker.js index c0a025e7..3d8bad41 100644 --- a/src/worker.js +++ b/src/worker.js @@ -2,21 +2,49 @@ * worker must be started as a child process or a web worker. * It listens for RPC messages from the parent process. */ + var Transfer = require('./transfer'); +/** + * worker must handle async cleanup handlers. Use custom Promise implementation. +*/ +var Promise = require('./Promise').Promise; /** * Special message sent by parent which causes the worker to terminate itself. * Not a "message object"; this string is the entire message. */ var TERMINATE_METHOD_ID = '__workerpool-terminate__'; +/** + * Special message by parent which causes a child process worker to perform cleaup + * steps before determining if the child process worker should be terminated. +*/ +var CLEANUP_METHOD_ID = '__workerpool-cleanup__'; // var nodeOSPlatform = require('./environment').nodeOSPlatform; + +var TIMEOUT_DEFAULT = 1_000; + // create a worker API for sending and receiving messages which works both on // node.js and in the browser var worker = { exit: function() {} }; + +// api for in worker communication with parent process +// works in both node.js and the browser +var publicWorker = { + /** + * + * @param {() => Promise} listener + */ + addAbortListener: function(listener) { + worker.abortListeners.push(listener); + }, + + emit: worker.emit +}; + if (typeof self !== 'undefined' && typeof postMessage === 'function' && typeof addEventListener === 'function') { // worker in the browser worker.on = function (event, callback) { @@ -95,7 +123,8 @@ worker.methods = {}; * @returns {*} */ worker.methods.run = function run(fn, args) { - var f = new Function('return (' + fn + ').apply(null, arguments);'); + var f = new Function('return (' + fn + ').apply(this, arguments);'); + f.worker = publicWorker; return f.apply(f, args); }; @@ -112,12 +141,20 @@ worker.methods.methods = function methods() { */ worker.terminationHandler = undefined; +worker.abortListenerTimeout = TIMEOUT_DEFAULT; + +/** + * Abort handlers for resolving errors which may cause a timeout or cancellation + * to occur from a worker context + */ +worker.abortListeners = []; + /** * Cleanup and exit the worker. * @param {Number} code - * @returns + * @returns {Promise} */ -worker.cleanupAndExit = function(code) { +worker.terminateAndExit = function(code) { var _exit = function() { worker.exit(code); } @@ -125,21 +162,104 @@ worker.cleanupAndExit = function(code) { if(!worker.terminationHandler) { return _exit(); } - + var result = worker.terminationHandler(code); if (isPromise(result)) { result.then(_exit, _exit); + + return result; } else { _exit(); + return new Promise(function (_resolve, reject) { + reject(new Error("Worker terminating")); + }); + } +} + + + +/** + * Called within the worker message handler to run abort handlers if registered to perform cleanup operations. + * @param {Integer} [requestId] id of task which is currently executing in the worker + * @return {Promise} +*/ +worker.cleanup = function(requestId) { + + if (!worker.abortListeners.length) { + worker.send({ + id: requestId, + method: CLEANUP_METHOD_ID, + error: convertError(new Error('Worker terminating')), + }); + + // If there are no handlers registered, reject the promise with an error as we want the handler to be notified + // that cleanup should begin and the handler should be GCed. + return new Promise(function(resolve) { resolve(); }); } + + + var _exit = function() { + worker.exit(); + } + + var _abort = function() { + if (!worker.abortListeners.length) { + worker.abortListeners = []; + } + } + + const promises = worker.abortListeners.map(listener => listener()); + let timerId; + const timeoutPromise = new Promise((_resolve, reject) => { + timerId = setTimeout(function () { + reject(new Error('Timeout occured waiting for abort handler, killing worker')); + }, worker.abortListenerTimeout); + }); + + // Once a promise settles we need to clear the timeout to prevet fulfulling the promise twice + const settlePromise = Promise.all(promises).then(function() { + clearTimeout(timerId); + _abort(); + }, function() { + clearTimeout(timerId); + _exit(); + }); + + // Returns a promise which will result in one of the following cases + // - Resolve once all handlers resolve + // - Reject if one or more handlers exceed the 'abortListenerTimeout' interval + // - Reject if one or more handlers reject + // Upon one of the above cases a message will be sent to the handler with the result of the handler execution + // which will either kill the worker if the result contains an error, or + return Promise.all([ + settlePromise, + timeoutPromise + ]).then(function() { + worker.send({ + id: requestId, + method: CLEANUP_METHOD_ID, + error: null, + }); + }, function(err) { + worker.send({ + id: requestId, + method: CLEANUP_METHOD_ID, + error: err ? convertError(err) : null, + }); + }); } var currentRequestId = null; worker.on('message', function (request) { if (request === TERMINATE_METHOD_ID) { - return worker.cleanupAndExit(0); + return worker.terminateAndExit(0); + } + + if (request.method === CLEANUP_METHOD_ID) { + return worker.cleanup(request.id); } + try { var method = worker.methods[request.method]; @@ -172,7 +292,7 @@ worker.on('message', function (request) { worker.send({ id: request.id, result: null, - error: convertError(err) + error: convertError(err), }); currentRequestId = null; }); @@ -220,12 +340,15 @@ worker.register = function (methods, options) { for (var name in methods) { if (methods.hasOwnProperty(name)) { worker.methods[name] = methods[name]; + worker.methods[name].worker = publicWorker; } } } if (options) { worker.terminationHandler = options.onTerminate; + // register listener timeout or default to 1 second + worker.abortListenerTimeout = options.abortListenerTimeout || TIMEOUT_DEFAULT; } worker.send('ready'); @@ -250,6 +373,7 @@ worker.emit = function (payload) { } }; + if (typeof exports !== 'undefined') { exports.add = worker.register; exports.emit = worker.emit; diff --git a/test/Pool.test.js b/test/Pool.test.js index 87999647..8775c074 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -660,6 +660,7 @@ describe('Pool', function () { assert(err instanceof Promise.TimeoutError); // we cannot assert that no workers remain in the pool, because that happens // on a different promise chain (termination is now async) + }); }); @@ -729,7 +730,7 @@ describe('Pool', function () { }); }); - it('should handle crashed workers (1)', function () { + it('should handle crashed workers', function () { var pool = createPool({maxWorkers: 1}); var promise = pool.exec(add) @@ -1256,7 +1257,7 @@ describe('Pool', function () { }); }); - it('should call worker termination handler (worker_thread)', function () { + it('should call worker termination handler', function () { var pool = createPool(__dirname + '/workers/cleanup.js'); var handlerCalled = false; @@ -1275,7 +1276,7 @@ describe('Pool', function () { }); }); - it('should call worker termination async handler (worker_thread)', function () { + it('should call worker termination async handler', function () { var pool = createPool(__dirname + '/workers/cleanup-async.js'); var handlerCalled = false; @@ -1315,18 +1316,249 @@ describe('Pool', function () { }); }); - it('should terminate inside finally', function () { - const pool = new Pool() + + describe('abort handler', () => { + it('should not terminate worker if abort listener is defined dedicated worker with Timeout', function () { + var workerCount = 0; + var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + maxWorkers: 1, + onCreateWorker: () => { + workerCount += 1; + } + }); + + return pool.exec('asyncTimeout', []) + .timeout(200) + .catch(function (err) { + assert(err instanceof Promise.TimeoutError); + let stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + }).then(function() { + return pool.exec(add, [1, 2]) + }).then(function() { + var stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); - function forever() { - return new Promise(resolve => setTimeout(resolve, Infinity)) - } + }); + }); - return pool.exec(forever) - .timeout(50) - .finally(() => { - return pool.terminate() + it('should not terminate worker if abort listener is defined dedicated worker with Cancellation', function () { + var workerCount = 0; + var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + maxWorkers: 1, + onCreateWorker: () => { + workerCount += 1; + } }); + + let task = pool.exec('asyncTimeout', [], {}); + + // Wrap in a new promise which waits 50ms + // in order to allow the function executing in the + // worker to + return new Promise(function(resolve) { + setTimeout(function() { + resolve(); + }, 50); + }).then(function() { + return task + .cancel() + .catch(function (err) { + assert(err instanceof Promise.CancellationError); + let stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + }).then(function() { + return pool.exec(add, [1, 2]) + }).then(function() { + var stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + + }); + }); + }); + + + it('should not terminate worker if abort listener is defined inline worker with Timeout', function () { + var workerCount = 0; + var pool = createPool({ + onCreateWorker: () => { + workerCount += 1; + }, + maxWorkers: 1, + }); + function asyncTimeout() { + var me = this; + return new Promise(function () { + let timeout = setTimeout(function() { + resolve(); + }, 5000); + me.worker.addAbortListener(function () { + return new Promise(function (resolve) { + clearTimeout(timeout); + resolve(); + }); + }); + }); + } + function add(a, b) { } + return pool.exec(asyncTimeout, [], { + }) + .timeout(200) + .catch(function(err) { + assert(err instanceof Promise.TimeoutError); + var stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + }).always(function () { + return pool.exec(add, [1, 2]).then(function () { + var stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + + }); + }); + }); + + it('should not terminate worker if abort listener is defined inline worker with Cancellation', function () { + var workerCount = 0; + var pool = createPool({ + onCreateWorker: () => { + workerCount += 1; + }, + maxWorkers: 1, + }); + + function asyncTimeout() { + var me = this; + return new Promise(function (_resolve, reject) { + let timeout = setTimeout(function() { + reject(new Error("should not be thrown")); + }, 5000); + me.worker.addAbortListener(function () { + return new Promise(function (resolve) { + clearTimeout(timeout); + resolve(); + }); + }); + }); + } + function add(a, b) { } + const task = pool.exec(asyncTimeout, [], { + }) + return new Promise(function(resolve) { + setTimeout(function() { + resolve(); + }, 50); + }).then(function() { + return task + .cancel() + .catch(function(err) { + assert(err instanceof Promise.TimeoutError); + var stats = pool.stats(); + assert.strictEqual(stats.busyWorkers, 1); + assert.strictEqual(stats.totalWorkers, 1); + }).always(function () { + return pool.exec(add, [1, 2]).then(function () { + var stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + + }); + }); + }); + + }); + + it('should invoke timeout for abort handler if timeout period is reached with Timeout', function () { + var workerCount = 0; + var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + maxWorkers: 2, + onCreateWorker: function() { + workerCount += 1; + } + }); + + return pool.exec('asyncAbortHandlerNeverResolves', []) + .timeout(1000) + .catch(function (err) { + assert(err instanceof Promise.TimeoutError); + + var stats = pool.stats(); + assert.strictEqual(stats.busyWorkers, 1); + assert.strictEqual(stats.totalWorkers, 1); + }).always(function() { + var stats = pool.stats(); + assert.strictEqual(stats.busyWorkers, 0); + assert.strictEqual(stats.totalWorkers, 1); + return pool.exec(add, [1, 2]).then(function() { + var stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + + }); + }); + }); + + + it('should invoke timeout for abort handler if timeout period is reached with Cancellation', function () { + var workerCount = 0; + var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + maxWorkers: 1, + onCreateWorker: function() { + workerCount += 1; + } + }); + + const task = pool.exec('asyncAbortHandlerNeverResolves', []) + + return new Promise(function(resolve) { + setTimeout(function() { + resolve(); + }, 50); + }).then(function() { + return task.cancel() + .catch(function (err) { + assert(err instanceof Promise.TimeoutError); + var stats = pool.stats(); + assert(stats.busyWorkers === 1); + }).always(function() { + assert.strictEqual(workerCount, 1); + + var stats = pool.stats(); + assert.strictEqual(stats.busyWorkers, 0); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.totalWorkers, 1); + return pool.exec(add, [1, 2]).then(function() { + assert.strictEqual(workerCount, 1); + var stats = pool.stats(); + + assert.strictEqual(stats.busyWorkers, 0); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.totalWorkers, 1); + }); + }); + }); + }); }); describe('validate', () => { diff --git a/test/workers/cleanup-abort.js b/test/workers/cleanup-abort.js new file mode 100644 index 00000000..29709ed6 --- /dev/null +++ b/test/workers/cleanup-abort.js @@ -0,0 +1,46 @@ +var workerpool = require("../.."); + +function asyncTimeout() { + var me = this; + return new Promise(function (resolve) { + let timeout = setTimeout(() => { + resolve(); + }, 5000); + + me.worker.addAbortListener(async function () { + clearTimeout(timeout); + resolve(); + }); + }); +} + +function asyncAbortHandlerNeverResolves() { + var me = this; + return new Promise((resolve) => { + let timeout = setTimeout(() => { + resolve(); + }, 5000); + me.worker.addAbortListener(function () { + clearTimeout(timeout); + return new Promise((res) => { + setTimeout(() => { + res(); + resolve(); + // set the timeout high so it will not resolve before the external + // timeout triggers and exits the worker + }, 1_000_000_000); + }); + }); + }); +} + +// create a worker and register public functions +workerpool.worker( + { + asyncTimeout: asyncTimeout, + asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves, + }, + { + abortListenerTimeout: 1000 + } +); \ No newline at end of file