Skip to content

Commit

Permalink
feat: abort listener (#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshLong145 authored Oct 11, 2024
1 parent e487987 commit 20d20db
Show file tree
Hide file tree
Showing 9 changed files with 627 additions and 27 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,38 @@ workerpool.worker({
});
```

Tasks may configure an `abort handler` to perform cleanup operations when `timeout` or `cancel` is called on a `task`. the `abortListenerTimeout` option can be configured to control when cleanup should be aborted in the case an `abortHandler` never resolves. This timeout trigger will cause the given worker to be cleaned up. Allowing a new worker to be created if need be.

```js
function asyncTimeout() {
var me = this;
return new Promise(function (resolve) {
let timeout = setTimeout(() => {
resolve();
}, 5000);

// An abort listener allows for cleanup for a given worker
// such that it may be resused for future tasks
// if an execption is thrown within scope of the handler
// the worker instance will be destroyed.
me.worker.addAbortListener(async function () {
clearTimeout(timeout);
resolve();
});
});
}

// create a worker and register public functions
workerpool.worker(
{
asyncTimeout: asyncTimeout,
},
{
abortListenerTimeout: 1000
}
);
```

### Events

You can send data back from workers to the pool while the task is being executed using the `workerEmit` function:
Expand Down
46 changes: 46 additions & 0 deletions examples/abort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const workerpool = require("..");

var workerCount = 0;

// create a worker pool
const pool = workerpool.pool(__dirname + "/workers/cleanupAbort.js", {
// maximum time to wait for worker to cleanup it's resources
// on termination before forcefully stopping the worker
workerTerminateTimeout: 1000,
onCreateWorker: (args) => {
console.log("New worker created");
workerCount += 1;
}
});

function add (a, b) {
return a + b;
}

const main = async () => {
const cleanedUpTask = pool.exec('asyncTimeout', []).timeout(1_000).catch((err) => {
console.log("task timeout");
console.log("timeout occured: ", err.message);
console.log("worker count ", workerCount);
return pool.exec(add, [1, 2]).then((sum) => {
console.log('add result', sum);
console.log("worker count: ", workerCount);
});
});
await cleanedUpTask;

const canceledTask = pool.exec('asyncAbortHandlerNeverResolves').cancel().catch((err) => {
console.log("task canceled");
console.log("cancel occured: ", err.message);
console.log("worker count ", workerCount);
return pool.exec(add, [1, 2]).then((sum) => {
console.log('add result', sum);
console.log("worker count: ", workerCount);
});
});

await canceledTask;
}


main();
55 changes: 55 additions & 0 deletions examples/workers/cleanupAbort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
var workerpool = require("../..");

function asyncTimeout() {
var me = this;
return new Promise(function (resolve) {
let timeout = setTimeout(() => {
resolve();
}, 5000);

// An abort listener allows for cleanup for a given worker
// such that it may be resused for future tasks
// if an execption is thrown within scope of the handler
// the worker instance will be destroyed.
me.worker.addAbortListener(async function () {
clearTimeout(timeout);
resolve();
});
});
}

function asyncAbortHandlerNeverResolves() {
var me = this;
return new Promise((resolve) => {
let timeout = setTimeout(() => {
resolve();
}, 5000);

// An abort listener allows for cleanup for a given worker
// such that it may be resused for future tasks
// if an execption is thrown within scope of the handler
// the worker instance will be destroyed.
me.worker.addAbortListener(function () {
clearTimeout(timeout);
return new Promise((res) => {
setTimeout(() => {
res();
resolve();
// set the timeout high so it will not resolve before the external
// timeout triggers and exits the worker
}, 1_000_000_000);
});
});
});
}

// create a worker and register public functions
workerpool.worker(
{
asyncTimeout: asyncTimeout,
asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves,
},
{
abortListenerTimeout: 1000
}
);
80 changes: 72 additions & 8 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ const {validateOptions, forkOptsNames, workerThreadOptsNames, workerOptsNames} =
*/
var TERMINATE_METHOD_ID = '__workerpool-terminate__';

/**
* Special message by parent which causes a child process worker to perform cleaup
* steps before determining if the child process worker should be terminated.
*/
var CLEANUP_METHOD_ID = '__workerpool-cleanup__';

function ensureWorkerThreads() {
var WorkerThreads = tryRequireWorkerThreads()
if (!WorkerThreads) {
Expand Down Expand Up @@ -294,6 +300,20 @@ function WorkerHandler(script, _options) {
}
}
}

if (response.method === CLEANUP_METHOD_ID) {
var trackedTask = me.tracking[response.id];
if (trackedTask !== undefined) {
if (response.error) {
clearTimeout(trackedTask.timeoutId);
trackedTask.resolver.reject(objectToError(response.error))
} else {
me.tracking && clearTimeout(trackedTask.timeoutId);
trackedTask.resolver.resolve(trackedTask.result);
}
}
delete me.tracking[id];
}
}
});

Expand All @@ -306,6 +326,7 @@ function WorkerHandler(script, _options) {
me.processing[id].resolver.reject(error);
}
}

me.processing = Object.create(null);
}

Expand Down Expand Up @@ -337,7 +358,7 @@ function WorkerHandler(script, _options) {
});

this.processing = Object.create(null); // queue with tasks currently in progress

this.tracking = Object.create(null); // queue with tasks being monitored for cleanup status
this.terminating = false;
this.terminated = false;
this.cleaning = false;
Expand Down Expand Up @@ -399,17 +420,50 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
var me = this;
return resolver.promise.catch(function (error) {
if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) {
me.tracking[id] = {
id,
resolver: Promise.defer()
};

// remove this task from the queue. It is already rejected (hence this
// catch event), and else it will be rejected again when terminating
delete me.processing[id];

// terminate worker
return me.terminateAndNotify(true)
.then(function() {
throw error;
}, function(err) {
throw err;
});
me.tracking[id].resolver.promise = me.tracking[id].resolver.promise.catch(function(err) {
delete me.tracking[id];

var promise = me.terminateAndNotify(true)
.then(function() {
throw err;
}, function(err) {
throw err;
});

return promise;
});

me.worker.send({
id,
method: CLEANUP_METHOD_ID
});


/**
* Sets a timeout to reject the cleanup operation if the message sent to the worker
* does not receive a response. see worker.tryCleanup for worker cleanup operations.
* Here we use the workerTerminateTimeout as the worker will be terminated if the timeout does invoke.
*
* We need this timeout in either case of a Timeout or Cancellation Error as if
* the worker does not send a message we still need to give a window of time for a response.
*
* The workerTermniateTimeout is used here if this promise is rejected the worker cleanup
* operations will occure.
*/
me.tracking[id].timeoutId = setTimeout(function() {
me.tracking[id].resolver.reject(error);
}, me.workerTerminateTimeout);

return me.tracking[id].resolver.promise;
} else {
throw error;
}
Expand Down Expand Up @@ -441,9 +495,18 @@ WorkerHandler.prototype.terminate = function (force, callback) {
this.processing[id].resolver.reject(new Error('Worker terminated'));
}
}

this.processing = Object.create(null);
}

// If we are terminating, cancel all tracked task for cleanup
for (var task of Object.values(me.tracking)) {
clearTimeout(task.timeoutId);
task.resolver.reject(new Error('Worker Terminating'));
}

me.tracking = Object.create(null);

if (typeof callback === 'function') {
this.terminationHandler = callback;
}
Expand All @@ -452,6 +515,7 @@ WorkerHandler.prototype.terminate = function (force, callback) {
var cleanup = function(err) {
me.terminated = true;
me.cleaning = false;

if (me.worker != null && me.worker.removeAllListeners) {
// removeAllListeners is only available for child_process
me.worker.removeAllListeners('message');
Expand Down
2 changes: 1 addition & 1 deletion src/generated/embeddedWorker.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
/**
* @typedef {Object} WorkerRegisterOptions
* @property {(code: number | undefined) => PromiseLike<void> | void} [onTerminate] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The difference with pool's `onTerminateWorker` is that this callback runs in the worker context, while onTerminateWorker is executed on the main thread.
* @property {number} [abortListenerTimeout] The timeout in milliseconds to wait for a worker to clean up it's resources if an abort listener does not resolve, before stopping the worker forcefully. Default value is `1000`.
*/

/**
Expand Down
Loading

0 comments on commit 20d20db

Please sign in to comment.