diff --git a/src/stream.js b/src/stream.js index a7ab0152..c0d23c5d 100644 --- a/src/stream.js +++ b/src/stream.js @@ -101,7 +101,8 @@ function transformRaw(input, options) { * @returns {TransformStream} */ function transformWithCancel(cancel) { - let backpressureChangePromiseResolve = function() {}; + let pulled = false; + let backpressureChangePromiseResolve; let outputController; return { readable: new ReadableStream({ @@ -109,17 +110,24 @@ function transformWithCancel(cancel) { outputController = controller; }, pull() { - backpressureChangePromiseResolve(); + if (backpressureChangePromiseResolve) { + backpressureChangePromiseResolve(); + } else { + pulled = true; + } }, cancel - }), + }, {highWaterMark: 0}), writable: new WritableStream({ write: async function(chunk) { outputController.enqueue(chunk); - if (outputController.desiredSize <= 0) { + if (!pulled) { await new Promise(resolve => { backpressureChangePromiseResolve = resolve; }); + backpressureChangePromiseResolve = null; + } else { + pulled = false; } }, close: outputController.close.bind(outputController), diff --git a/src/util.js b/src/util.js index 963bf645..50175cfe 100644 --- a/src/util.js +++ b/src/util.js @@ -135,7 +135,7 @@ export default { value.postMessage({ action: 'cancel' }); }); } - }); + }, {highWaterMark: 0}); return; } util.restoreStreams(value);