Don't increase buffering in transformWithCancel

Keep backpressure the same as in default TransformStream().
This commit is contained in:
Daniel Huigens 2018-07-09 13:38:44 +02:00
parent 0b0112d1e6
commit 721e522b17
2 changed files with 13 additions and 5 deletions

View File

@ -101,7 +101,8 @@ function transformRaw(input, options) {
* @returns {TransformStream} * @returns {TransformStream}
*/ */
function transformWithCancel(cancel) { function transformWithCancel(cancel) {
let backpressureChangePromiseResolve = function() {}; let pulled = false;
let backpressureChangePromiseResolve;
let outputController; let outputController;
return { return {
readable: new ReadableStream({ readable: new ReadableStream({
@ -109,17 +110,24 @@ function transformWithCancel(cancel) {
outputController = controller; outputController = controller;
}, },
pull() { pull() {
backpressureChangePromiseResolve(); if (backpressureChangePromiseResolve) {
backpressureChangePromiseResolve();
} else {
pulled = true;
}
}, },
cancel cancel
}), }, {highWaterMark: 0}),
writable: new WritableStream({ writable: new WritableStream({
write: async function(chunk) { write: async function(chunk) {
outputController.enqueue(chunk); outputController.enqueue(chunk);
if (outputController.desiredSize <= 0) { if (!pulled) {
await new Promise(resolve => { await new Promise(resolve => {
backpressureChangePromiseResolve = resolve; backpressureChangePromiseResolve = resolve;
}); });
backpressureChangePromiseResolve = null;
} else {
pulled = false;
} }
}, },
close: outputController.close.bind(outputController), close: outputController.close.bind(outputController),

View File

@ -135,7 +135,7 @@ export default {
value.postMessage({ action: 'cancel' }); value.postMessage({ action: 'cancel' });
}); });
} }
}); }, {highWaterMark: 0});
return; return;
} }
util.restoreStreams(value); util.restoreStreams(value);