From 52d26c2fc6046219b8d09debe5429db714954352 Mon Sep 17 00:00:00 2001 From: jakecastelli Date: Fri, 18 Oct 2024 21:27:46 +1030 Subject: [PATCH] stream: catch and forward error from dest.write --- lib/internal/streams/readable.js | 13 ++-- ...tream-pipe-objectmode-to-non-objectmode.js | 71 +++++++++++++++++++ 2 files changed, 80 insertions(+), 4 deletions(-) create mode 100644 test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 26ff5ec17c6f0c..ca8b4bcc851684 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1004,10 +1004,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.on('data', ondata); function ondata(chunk) { debug('ondata'); - const ret = dest.write(chunk); - debug('dest.write', ret); - if (ret === false) { - pause(); + try { + const ret = dest.write(chunk); + debug('dest.write', ret); + + if (ret === false) { + pause(); + } + } catch (error) { + dest.destroy(error); } } diff --git a/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js b/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js new file mode 100644 index 00000000000000..bca932d907d130 --- /dev/null +++ b/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js @@ -0,0 +1,71 @@ +'use strict'; + +/** + * source is an object mode stream that only flows strings/buffer and the error is unrelated + * source is a binary stream and dest is a binary stream + */ +const common = require('../common'); +const assert = require('node:assert'); +const { Readable, Transform, Writable } = require('node:stream'); + +{ + const objectReadable = Readable.from([ + { hello: 'hello' }, + { world: 'world' }, + ]); + + const passThrough = new Transform({ + transform(chunk, _encoding, cb) { + this.push(chunk); + cb(null); + }, + }); + + passThrough.on('error', common.mustCall()); + + objectReadable.pipe(passThrough); + + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of passThrough); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +} + +{ + const stringReadable = Readable.from(['hello', 'world']); + + const passThrough = new Transform({ + transform(chunk, _encoding, cb) { + this.push(chunk); + throw new Error('something went wrong'); + }, + }); + + passThrough.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'something went wrong'); + })); + + stringReadable.pipe(passThrough); +} + +{ + const binaryData = Buffer.from('binary data'); + + const binaryReadable = new Readable({ + read() { + this.push(binaryData); + this.push(null); + } + }); + + const binaryWritable = new Writable({ + write(chunk, _encoding, cb) { + throw new Error('something went wrong'); + } + }); + + binaryWritable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'something went wrong'); + })); + binaryReadable.pipe(binaryWritable); +}