diff --git a/lib/streams.js b/lib/streams.js index 4671312..308b792 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -218,9 +218,10 @@ function transformRaw(input, options) { * @param {Function} cancel * @returns {TransformStream} */ -function transformWithCancel(cancel) { +function transformWithCancel(customCancel) { let pulled = false; - let backpressureChangePromiseResolve; + let cancelled = false; + let backpressureChangePromiseResolve, backpressureChangePromiseReject; let outputController; return { readable: new ReadableStream({ @@ -234,16 +235,29 @@ function transformWithCancel(cancel) { pulled = true; } }, - cancel + async cancel(reason) { + cancelled = true; + if (customCancel) { + await customCancel(reason); + } + if (backpressureChangePromiseReject) { + backpressureChangePromiseReject(reason); + } + } }, {highWaterMark: 0}), writable: new WritableStream({ write: async function(chunk) { + if (cancelled) { + throw new Error('Stream is cancelled'); + } outputController.enqueue(chunk); if (!pulled) { - await new Promise(resolve => { + await new Promise((resolve, reject) => { backpressureChangePromiseResolve = resolve; + backpressureChangePromiseReject = reject; }); backpressureChangePromiseResolve = null; + backpressureChangePromiseReject = null; } else { pulled = false; }