Skip to content

Commit

Permalink
transformWithCancel: fix race condition in cancellation (#43)
Browse files Browse the repository at this point in the history
Relevant for e.g.passiveClones.
  • Loading branch information
larabr authored Jun 18, 2024
1 parent 3636c58 commit f655016
Showing 1 changed file with 18 additions and 4 deletions.
22 changes: 18 additions & 4 deletions lib/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,10 @@ function transformRaw(input, options) {
* @param {Function} cancel
* @returns {TransformStream}
*/
function transformWithCancel(cancel) {
function transformWithCancel(customCancel) {
let pulled = false;
let backpressureChangePromiseResolve;
let cancelled = false;
let backpressureChangePromiseResolve, backpressureChangePromiseReject;
let outputController;
return {
readable: new ReadableStream({
Expand All @@ -234,16 +235,29 @@ function transformWithCancel(cancel) {
pulled = true;
}
},
cancel
async cancel(reason) {
cancelled = true;
if (customCancel) {
await customCancel(reason);
}
if (backpressureChangePromiseReject) {
backpressureChangePromiseReject(reason);
}
}
}, {highWaterMark: 0}),
writable: new WritableStream({
write: async function(chunk) {
if (cancelled) {
throw new Error('Stream is cancelled');
}
outputController.enqueue(chunk);
if (!pulled) {
await new Promise(resolve => {
await new Promise((resolve, reject) => {
backpressureChangePromiseResolve = resolve;
backpressureChangePromiseReject = reject;
});
backpressureChangePromiseResolve = null;
backpressureChangePromiseReject = null;
} else {
pulled = false;
}
Expand Down

0 comments on commit f655016

Please sign in to comment.