From b003003b4c5d068723aa6b4e2f1c1b18c110ecf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ari=20Perkki=C3=B6?= Date: Sat, 13 Apr 2024 12:02:51 +0300 Subject: [PATCH] fix(child_process): prevent writing to terminating process (#85) * test: rename test file * fix(child_process): prevent writing to terminating process --- src/runtime/process-worker.ts | 14 +++++++--- ...on-timeout.test.ts => termination.test.ts} | 26 +++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) rename test/{termination-timeout.test.ts => termination.test.ts} (61%) diff --git a/src/runtime/process-worker.ts b/src/runtime/process-worker.ts index fddd214..4c07919 100644 --- a/src/runtime/process-worker.ts +++ b/src/runtime/process-worker.ts @@ -18,6 +18,7 @@ export default class ProcessWorker implements TinypoolWorker { port?: MessagePort channel?: TinypoolChannel waitForExit!: Promise + isTerminating = false initialize(options: Parameters[0]) { this.process = fork( @@ -42,6 +43,7 @@ export default class ProcessWorker implements TinypoolWorker { } async terminate() { + this.isTerminating = true this.process.off('exit', this.onUnexpectedExit) const sigkillTimeout = setTimeout( @@ -61,10 +63,16 @@ export default class ProcessWorker implements TinypoolWorker { // Mirror channel's messages to process this.channel.onMessage((message: any) => { - this.process.send(message) + this.send(message) }) } + private send(message: Parameters>[0]) { + if (!this.isTerminating) { + this.process.send(message) + } + } + postMessage(message: any, transferListItem?: Readonly) { transferListItem?.forEach((item) => { if (item instanceof MessagePort) { @@ -75,7 +83,7 @@ export default class ProcessWorker implements TinypoolWorker { // Mirror port's messages to process if (this.port) { this.port.on('message', (message) => - this.process.send(>{ + this.send(>{ ...message, source: 'port', __tinypool_worker_message__, @@ -83,7 +91,7 @@ export default class ProcessWorker implements TinypoolWorker { ) } - return this.process.send(>{ + return this.send(>{ ...message, source: 'pool', __tinypool_worker_message__, diff --git a/test/termination-timeout.test.ts b/test/termination.test.ts similarity index 61% rename from test/termination-timeout.test.ts rename to test/termination.test.ts index e6146ea..b34f922 100644 --- a/test/termination-timeout.test.ts +++ b/test/termination.test.ts @@ -30,3 +30,29 @@ test('termination timeout throws when worker does not terminate in time', async 'Failed to terminate worker' ) }) + +test('writing to terminating worker does not crash', async () => { + const listeners: ((msg: any) => void)[] = [] + + const pool = new Tinypool({ + runtime: 'child_process', + filename: resolve(__dirname, 'fixtures/sleep.js'), + minThreads: 1, + maxThreads: 1, + }) + + await pool.run( + {}, + { + channel: { + onMessage: (listener) => listeners.push(listener), + postMessage: () => {}, + }, + } + ) + + const destroyed = pool.destroy() + listeners.forEach((listener) => listener('Hello from main thread')) + + await destroyed +})