From 6e3329b5e96bd3900c06d939cd52cd759ec73cf2 Mon Sep 17 00:00:00 2001 From: jakecastelli Date: Fri, 18 Oct 2024 21:27:46 +1030 Subject: [PATCH] stream: catch and forward error from dest.write --- lib/internal/streams/readable.js | 13 ++-- ...tream-pipe-objectmode-to-non-objectmode.js | 67 +++++++++++++++++++ 2 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index 26ff5ec17c6f0c..ca8b4bcc851684 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1004,10 +1004,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.on('data', ondata); function ondata(chunk) { debug('ondata'); - const ret = dest.write(chunk); - debug('dest.write', ret); - if (ret === false) { - pause(); + try { + const ret = dest.write(chunk); + debug('dest.write', ret); + + if (ret === false) { + pause(); + } + } catch (error) { + dest.destroy(error); } } diff --git a/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js b/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js new file mode 100644 index 00000000000000..fda25c2e3cc167 --- /dev/null +++ b/test/parallel/test-stream-pipe-objectmode-to-non-objectmode.js @@ -0,0 +1,67 @@ +'use strict'; + +const common = require('../common'); +const assert = require('node:assert'); +const { Readable, Transform, Writable } = require('node:stream'); + +{ + const objectReadable = Readable.from([ + { hello: 'hello' }, + { world: 'world' }, + ]); + + const passThrough = new Transform({ + transform(chunk, _encoding, cb) { + this.push(chunk); + cb(null); + }, + }); + + passThrough.on('error', common.mustCall()); + + objectReadable.pipe(passThrough); + + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of passThrough); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +} + +{ + const stringReadable = Readable.from(['hello', 'world']); + + const passThrough = new Transform({ + transform(chunk, _encoding, cb) { + this.push(chunk); + throw new Error('something went wrong'); + }, + }); + + passThrough.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'something went wrong'); + })); + + stringReadable.pipe(passThrough); +} + +{ + const binaryData = Buffer.from('binary data'); + + const binaryReadable = new Readable({ + read() { + this.push(binaryData); + this.push(null); + } + }); + + const binaryWritable = new Writable({ + write(chunk, _encoding, cb) { + throw new Error('something went wrong'); + } + }); + + binaryWritable.on('error', common.mustCall((err) => { + assert.strictEqual(err.message, 'something went wrong'); + })); + binaryReadable.pipe(binaryWritable); +}