From ddda6a0b1639ec27640f7afbca752f5f375b01d4 Mon Sep 17 00:00:00 2001
From: Daniel Huigens <d.huigens@protonmail.com>
Date: Fri, 8 Jun 2018 16:40:02 +0200
Subject: [PATCH] Implement cancellation by manually linking together input and
 output streams

---
 package.json                                  |   4 +-
 src/encoding/armor.js                         |  60 ++--
 src/message.js                                |   4 +-
 src/openpgp.js                                |  17 +-
 src/packet/compressed.js                      |  13 +-
 src/packet/literal.js                         |  17 +-
 src/packet/packet.js                          |   3 +-
 src/packet/packetlist.js                      |  50 ++--
 src/packet/sym_encrypted_aead_protected.js    |  34 +--
 .../sym_encrypted_integrity_protected.js      |  19 +-
 src/stream.js                                 | 262 +++++++-----------
 src/util.js                                   |  25 +-
 test/general/streaming.js                     |  58 +++-
 test/unittests.js                             |   6 +-
 14 files changed, 285 insertions(+), 287 deletions(-)

diff --git a/package.json b/package.json
index 4477f3a0..da3846f9 100644
--- a/package.json
+++ b/package.json
@@ -72,6 +72,7 @@
     "whatwg-fetch": "^2.0.3"
   },
   "dependencies": {
+    "@mattiasbuelens/web-streams-polyfill": "0.1.0-alpha.4",
     "address-rfc2822": "^2.0.3",
     "asmcrypto.js": "^0.22.0",
     "asn1.js": "^5.0.0",
@@ -82,8 +83,7 @@
     "hash.js": "^1.1.3",
     "node-fetch": "^2.1.2",
     "node-localstorage": "~1.3.0",
-    "pako": "^1.0.6",
-    "web-streams-polyfill": "^1.3.2"
+    "pako": "^1.0.6"
   },
   "repository": {
     "type": "git",
diff --git a/src/encoding/armor.js b/src/encoding/armor.js
index 5d7c764d..2d22739a 100644
--- a/src/encoding/armor.js
+++ b/src/encoding/armor.js
@@ -212,27 +212,17 @@ function dearmor(input) {
       let headersDone;
       let text = [];
       let textDone;
-      let reader;
-      let controller;
-      let buffer = '';
-      let data = base64.decode(stream.transformRaw(input, {
-        transform: (value, controller) => process(buffer + value, controller),
-        flush: controller => process(buffer, controller)
-      }));
       let checksum;
-      const checksumVerified = getCheckSum(stream.clone(data));
-      data = stream.getReader(data).substream(); // Convert to Stream
-      data = stream.transform(data, value => value, async () => {
-        const checksumVerifiedString = await stream.readToEnd(checksumVerified);
-        if (checksum !== checksumVerifiedString && (checksum || config.checksum_required)) {
-          throw new Error("Ascii armor integrity check on message failed: '" + checksum + "' should be '" +
-                  checksumVerifiedString + "'");
-        }
-      });
-      function process(value, controller) {
-        const lineEndIndex = value.indexOf('\n') + 1;
-        if (lineEndIndex) {
-          let line = value.substr(0, lineEndIndex);
+      let data = base64.decode(stream.transformPair(input, async (readable, writable) => {
+        const reader = stream.getReader(readable);
+        const writer = stream.getWriter(writable);
+        while (true) {
+          await writer.ready;
+          let line = await reader.readLine();
+          if (line === undefined) {
+            writer.abort('Misformed armored text');
+            break;
+          }
           // remove trailing whitespace at end of lines
           // remove leading whitespace for compat with older versions of OpenPGP.js
           line = line.trim();
@@ -265,24 +255,32 @@ function dearmor(input) {
           } else {
             if (!reSplit.test(line)) {
               if (line[0] !== '=') {
-                controller.enqueue(line);
+                writer.write(line);
               } else {
                 checksum = line.substr(1);
               }
             } else {
-              controller.close();
-              return;
+              writer.close();
+              break;
             }
           }
-          process(value.substr(lineEndIndex), controller);
-        } else {
-          buffer = value;
         }
-        // if (line === undefined) {
-        //   controller.error('Misformed armored text');
-        //   break;
-        // }
-      }
+      }));
+      data = stream.transformPair(data, async (readable, writable) => {
+        const checksumVerified = getCheckSum(stream.clone(readable));
+        stream.pipe(readable, writable, {
+          preventClose: true
+        });
+        const checksumVerifiedString = await stream.readToEnd(checksumVerified);
+        const writer = stream.getWriter(writable);
+        await writer.ready;
+        if (checksum !== checksumVerifiedString && (checksum || config.checksum_required)) {
+          writer.abort(new Error("Ascii armor integrity check on message failed: '" + checksum + "' should be '" +
+                  checksumVerifiedString + "'"));
+        } else {
+          writer.close();
+        }
+      });
     } catch(e) {
       reject(e);
     }
diff --git a/src/message.js b/src/message.js
index 21a31cdc..1516a07e 100644
--- a/src/message.js
+++ b/src/message.js
@@ -127,7 +127,7 @@ Message.prototype.decrypt = async function(privateKeys, passwords, sessionKeys)
       exception = e;
     }
   }
-  // We don't await stream.cancel here because... it sometimes hangs indefinitely. No clue why.
+  // We don't await stream.cancel here because it only returns when the other copy is canceled too.
   stream.cancel(symEncryptedPacket.encrypted); // Don't keep copy of encrypted data in memory.
   symEncryptedPacket.encrypted = null;
 
@@ -543,7 +543,7 @@ Message.prototype.verify = async function(keys, date=new Date()) {
     throw new Error('Can only verify message with one literal data packet.');
   }
   if (msg.packets.stream) {
-    let onePassSigList = msg.packets.filterByTag(enums.packet.onePassSignature);
+    const onePassSigList = msg.packets.filterByTag(enums.packet.onePassSignature);
     onePassSigList.forEach(onePassSig => {
       onePassSig.signatureData = stream.fromAsync(() => new Promise(resolve => {
         onePassSig.signatureDataResolve = resolve;
diff --git a/src/openpgp.js b/src/openpgp.js
index cd9790d6..cf2afc09 100644
--- a/src/openpgp.js
+++ b/src/openpgp.js
@@ -329,7 +329,6 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se
     if (armor) {
       result.data = encrypted.message.armor();
       result.data = await convertStream(result.data, asStream);
-      // result.cancel = stream.cancel.bind(result.data);
     } else {
       result.message = encrypted.message;
     }
@@ -365,18 +364,22 @@ export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKe
     return asyncProxy.delegate('decrypt', { message, privateKeys, passwords, sessionKeys, publicKeys, format, asStream, signature, date });
   }
 
-  return message.decrypt(privateKeys, passwords, sessionKeys).then(async function(message) {
+  return message.decrypt(privateKeys, passwords, sessionKeys).then(async function(decrypted) {
     if (!publicKeys) {
       publicKeys = [];
     }
 
     const result = {};
-    result.signatures = signature ? await message.verifyDetached(signature, publicKeys, date) : await message.verify(publicKeys, date);
-    result.data = format === 'binary' ? message.getLiteralData() : message.getText();
+    result.signatures = signature ? await decrypted.verifyDetached(signature, publicKeys, date) : await decrypted.verify(publicKeys, date);
+    result.data = format === 'binary' ? decrypted.getLiteralData() : decrypted.getText();
     result.data = await convertStream(result.data, asStream);
     result.signatures = await convertStreamArray(result.signatures, asStream);
-    result.filename = message.getFilename();
-    // result.cancel = stream.cancel.bind(message.packets);
+    if (asStream) {
+      result.data = stream.transformPair(message.packets.stream, async (readable, writable) => {
+        await stream.pipe(result.data, writable);
+      });
+    }
+    result.filename = decrypted.getFilename();
     return result;
   }).catch(onError.bind(null, 'Error decrypting message'));
 }
@@ -428,7 +431,6 @@ export function sign({ data, dataType, privateKeys, armor=true, asStream, detach
       if (armor) {
         result.data = message.armor();
         result.data = await convertStream(result.data, asStream);
-        // result.cancel = stream.cancel.bind(result.data);
       } else {
         result.message = message;
       }
@@ -464,7 +466,6 @@ export function verify({ message, publicKeys, asStream, signature=null, date=new
     result.data = message instanceof CleartextMessage ? message.getText() : message.getLiteralData();
     result.data = await convertStream(result.data, asStream);
     result.signatures = await convertStreamArray(result.signatures, asStream);
-    // result.cancel = stream.cancel.bind(message.packets);
     return result;
   }).catch(onError.bind(null, 'Error verifying cleartext signed message'));
 }
diff --git a/src/packet/compressed.js b/src/packet/compressed.js
index f20c9493..84dcacaa 100644
--- a/src/packet/compressed.js
+++ b/src/packet/compressed.js
@@ -70,15 +70,16 @@ function Compressed() {
  * @param {String} bytes Payload of a tag 8 packet
  */
 Compressed.prototype.read = async function (bytes) {
-  const reader = stream.getReader(bytes);
+  await stream.parse(bytes, async reader => {
 
-  // One octet that gives the algorithm used to compress the packet.
-  this.algorithm = enums.read(enums.compression, await reader.readByte());
+    // One octet that gives the algorithm used to compress the packet.
+    this.algorithm = enums.read(enums.compression, await reader.readByte());
 
-  // Compressed data, which makes up the remainder of the packet.
-  this.compressed = reader.substream();
+    // Compressed data, which makes up the remainder of the packet.
+    this.compressed = reader.remainder();
 
-  await this.decompress();
+    await this.decompress();
+  });
 };
 
 
diff --git a/src/packet/literal.js b/src/packet/literal.js
index d3c10f2c..3d9c7988 100644
--- a/src/packet/literal.js
+++ b/src/packet/literal.js
@@ -139,18 +139,19 @@ Literal.prototype.getFilename = function() {
  * @returns {module:packet.Literal} object representation
  */
 Literal.prototype.read = async function(bytes) {
-  const reader = stream.getReader(bytes);
-  // - A one-octet field that describes how the data is formatted.
-  const format = enums.read(enums.literal, await reader.readByte());
+  await stream.parse(bytes, async reader => {
+    // - A one-octet field that describes how the data is formatted.
+    const format = enums.read(enums.literal, await reader.readByte());
 
-  const filename_len = await reader.readByte();
-  this.filename = util.decode_utf8(util.Uint8Array_to_str(await reader.readBytes(filename_len)));
+    const filename_len = await reader.readByte();
+    this.filename = util.decode_utf8(util.Uint8Array_to_str(await reader.readBytes(filename_len)));
 
-  this.date = util.readDate(await reader.readBytes(4));
+    this.date = util.readDate(await reader.readBytes(4));
 
-  const data = reader.substream();
+    const data = reader.remainder();
 
-  this.setBytes(data, format);
+    this.setBytes(data, format);
+  });
 };
 
 /**
diff --git a/src/packet/packet.js b/src/packet/packet.js
index bc4e0b6d..fc84d28b 100644
--- a/src/packet/packet.js
+++ b/src/packet/packet.js
@@ -259,10 +259,11 @@ export default {
       if (controller) {
         controller.close();
       }
-      return !done && value && value.length;
+      return done || !value || !value.length;
     } catch(e) {
       if (controller) {
         controller.error(e);
+        return true;
       } else {
         throw e;
       }
diff --git a/src/packet/packetlist.js b/src/packet/packetlist.js
index 232e8780..8351ca1b 100644
--- a/src/packet/packetlist.js
+++ b/src/packet/packetlist.js
@@ -36,34 +36,34 @@ function List() {
  * @param {Uint8Array} A Uint8Array of bytes.
  */
 List.prototype.read = async function (bytes) {
-  this.stream = new ReadableStream({
-    pull: async controller => {
-      try {
-        if (!await packetParser.read(bytes, async parsed => {
-          try {
-            const tag = enums.read(enums.packet, parsed.tag);
-            const packet = packets.newPacketFromTag(tag);
-            packet.packets = new List();
-            packet.fromStream = util.isStream(parsed.packet);
-            await packet.read(parsed.packet);
-            controller.enqueue(packet);
-          } catch (e) {
-            if (!config.tolerant ||
-                parsed.tag === enums.packet.symmetricallyEncrypted ||
-                parsed.tag === enums.packet.literal ||
-                parsed.tag === enums.packet.compressed) {
-              controller.error(e);
-            }
-            util.print_debug_error(e);
+  this.stream = stream.transformPair(bytes, async (readable, writable) => {
+    const writer = stream.getWriter(writable);
+    while (true) {
+      await writer.ready;
+      const done = await packetParser.read(readable, async parsed => {
+        try {
+          const tag = enums.read(enums.packet, parsed.tag);
+          const packet = packets.newPacketFromTag(tag);
+          packet.packets = new List();
+          packet.fromStream = util.isStream(parsed.packet);
+          await packet.read(parsed.packet);
+          writer.write(packet);
+        } catch (e) {
+          if (!config.tolerant ||
+              parsed.tag === enums.packet.symmetricallyEncrypted ||
+              parsed.tag === enums.packet.literal ||
+              parsed.tag === enums.packet.compressed) {
+            writer.abort(e);
           }
-        })) {
-          controller.close();
+          util.print_debug_error(e);
         }
-      } catch(e) {
-        controller.error(e);
+      });
+      if (done) {
+        await writer.ready;
+        writer.close();
+        return;
       }
-    },
-    cancel: stream.cancel.bind(bytes)
+    }
   });
 
   // Wait until first few packets have been read
diff --git a/src/packet/sym_encrypted_aead_protected.js b/src/packet/sym_encrypted_aead_protected.js
index ab72eade..2458c879 100644
--- a/src/packet/sym_encrypted_aead_protected.js
+++ b/src/packet/sym_encrypted_aead_protected.js
@@ -144,23 +144,18 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) {
     let cryptedBytes = 0;
     let queuedBytes = 0;
     const iv = this.iv;
-    let buffer = [];
-    return stream.transformRaw(data, {
-      transform: process,
-      flush: controller => process(undefined, controller, true)
-    });
-    async function process(value, controller, final) {
-      if (!final) buffer.push(value);
-      while (buffer.reduce(((acc, value) => acc + value.length), 0) >= (final ? 0 : chunkSize) + tagLengthIfDecrypting) {
-        const bufferConcat = util.concatUint8Array(buffer);
-        let chunk = bufferConcat.subarray(0, chunkSize + tagLengthIfDecrypting);
-        buffer = [bufferConcat.subarray(chunkSize + tagLengthIfDecrypting)];
+    return stream.transformPair(data, async (readable, writable) => {
+      const reader = stream.getReader(readable);
+      const writer = stream.getWriter(writable);
+      while (true) {
+        await writer.ready;
+        let chunk = await reader.readBytes(chunkSize + tagLengthIfDecrypting) || new Uint8Array();
         const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting);
         chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting);
         let cryptedPromise;
         let done;
         if (!chunkIndex || chunk.length) {
-          buffer.unshift(finalChunk);
+          reader.unshift(finalChunk);
           cryptedPromise = modeInstance[fn](chunk, mode.getNonce(iv, chunkIndexArray), adataArray);
         } else {
           // After the last chunk, we either encrypt a final, empty
@@ -172,22 +167,23 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) {
         }
         cryptedBytes += chunk.length - tagLengthIfDecrypting;
         queuedBytes += chunk.length - tagLengthIfDecrypting;
+        // eslint-disable-next-line no-loop-func
         latestPromise = latestPromise.then(() => cryptedPromise).then(crypted => {
-          controller.enqueue(crypted);
+          writer.write(crypted);
           queuedBytes -= chunk.length;
-        }).catch(err => controller.error(err));
-        // console.log(fn, done, queuedBytes, controller.desiredSize);
-        if (done || queuedBytes > controller.desiredSize) {
+        }).catch(err => writer.abort(err));
+        // console.log(fn, done, queuedBytes, writer.desiredSize);
+        if (done || queuedBytes > writer.desiredSize) {
           await latestPromise; // Respect backpressure
         }
         if (!done) {
           adataView.setInt32(5 + 4, ++chunkIndex); // Should be setInt64(5, ...)
         } else {
-          controller.terminate();
-          return;
+          writer.close();
+          break;
         }
       }
-    }
+    });
   } else {
     return modeInstance[fn](await stream.readToEnd(data), this.iv);
   }
diff --git a/src/packet/sym_encrypted_integrity_protected.js b/src/packet/sym_encrypted_integrity_protected.js
index 12ebbea8..e03f01ef 100644
--- a/src/packet/sym_encrypted_integrity_protected.js
+++ b/src/packet/sym_encrypted_integrity_protected.js
@@ -65,17 +65,18 @@ function SymEncryptedIntegrityProtected() {
 }
 
 SymEncryptedIntegrityProtected.prototype.read = async function (bytes) {
-  const reader = stream.getReader(bytes);
+  await stream.parse(bytes, async reader => {
 
-  // - A one-octet version number. The only currently defined value is 1.
-  if (await reader.readByte() !== VERSION) {
-    throw new Error('Invalid packet version.');
-  }
+    // - A one-octet version number. The only currently defined value is 1.
+    if (await reader.readByte() !== VERSION) {
+      throw new Error('Invalid packet version.');
+    }
 
-  // - Encrypted data, the output of the selected symmetric-key cipher
-  //   operating in Cipher Feedback mode with shift amount equal to the
-  //   block size of the cipher (CFB-n where n is the block size).
-  this.encrypted = reader.substream();
+    // - Encrypted data, the output of the selected symmetric-key cipher
+    //   operating in Cipher Feedback mode with shift amount equal to the
+    //   block size of the cipher (CFB-n where n is the block size).
+    this.encrypted = reader.remainder();
+  });
 };
 
 SymEncryptedIntegrityProtected.prototype.write = function () {
diff --git a/src/stream.js b/src/stream.js
index 251c67eb..dadebc33 100644
--- a/src/stream.js
+++ b/src/stream.js
@@ -1,7 +1,7 @@
 import util from './util';
 
-// if (typeof ReadableStream === 'undefined') {
-  Object.assign(typeof window !== 'undefined' ? window : global, require('web-streams-polyfill'));
+// if (typeof TransformStream === 'undefined') {
+  Object.assign(typeof window !== 'undefined' ? window : global, require('@mattiasbuelens/web-streams-polyfill'));
 // }
 
 const nodeStream = util.getNodeStream();
@@ -18,39 +18,31 @@ function toStream(input) {
   });
 }
 
-function pipeThrough(input, target, options) {
-  if (!util.isStream(input)) {
-    input = toStream(input);
-  }
-  return input.pipeThrough(target, options);
-}
-
 function concat(arrays) {
   arrays = arrays.map(toStream);
-  let controller;
-  const transform = new TransformStream({
-    start(_controller) {
-      controller = _controller;
-    },
-    cancel: () => {
-      return Promise.all(arrays.map(cancel));
-    }
-  });
-  (async () => {
-    for (let i = 0; i < arrays.length; i++) {
-      // await new Promise(resolve => {
-      try {
-        await arrays[i].pipeTo(transform.writable, {
-          preventClose: i !== arrays.length - 1
-        });
-      } catch(e) {
-        console.log(e);
-        // controller.error(e);
-        return;
+  let outputController;
+  const transform = {
+    readable: new ReadableStream({
+      start(_controller) {
+        outputController = _controller;
+      },
+      async cancel(reason) {
+        await Promise.all(transforms.map(array => cancel(array, reason)));
       }
-      // });
-    }
-  })();
+    }),
+    writable: new WritableStream({
+      write: outputController.enqueue.bind(outputController),
+      close: outputController.close.bind(outputController),
+      abort: outputController.error.bind(outputController)
+    })
+  };
+  let prev = Promise.resolve();
+  const transforms = arrays.map((array, i) => transformPair(array, (readable, writable) => {
+    prev = prev.then(() => pipe(readable, transform.writable, {
+      preventClose: i !== arrays.length - 1
+    }));
+    return prev;
+  }));
   return transform.readable;
 }
 
@@ -58,6 +50,10 @@ function getReader(input) {
   return new Reader(input);
 }
 
+function getWriter(input) {
+  return input.getWriter();
+}
+
 function create(options, extraArg) {
   const promises = new Map();
   const wrap = fn => fn && (controller => {
@@ -65,33 +61,32 @@ function create(options, extraArg) {
     promises.set(fn, returnValue);
     return returnValue;
   });
+  options.options = Object.assign({}, options);
   options.start = wrap(options.start);
   options.pull = wrap(options.pull);
-  const _cancel = options.cancel;
-  options.cancel = async reason => {
-    try {
-      console.log('cancel wrapper', reason, options);
-      await promises.get(options.start);
-      console.log('awaited start');
-      await promises.get(options.pull);
-      console.log('awaited pull');
-    } finally {
-      if (_cancel) return _cancel.call(options, reason, extraArg);
-    }
-  };
-  options.options = options;
   return new ReadableStream(options);
 }
 
-function transformRaw(input, options) {
-  options.start = controller => {
-    if (input.externalBuffer) {
-      input.externalBuffer.forEach(chunk => {
-        options.transform(chunk, controller);
-      });
+async function pipe(input, target, options) {
+  if (!util.isStream(input)) {
+    input = toStream(input);
+  }
+  if (input.externalBuffer) {
+    const writer = target.getWriter();
+    for (let i = 0; i < input.externalBuffer.length; i++) {
+      await writer.ready;
+      writer.write(input.externalBuffer[i]);
     }
-  };
-  return toStream(input).pipeThrough(new TransformStream(options));
+    writer.releaseLock();
+  }
+  return input.pipeTo(target, options);
+}
+
+function transformRaw(input, options) {
+  options.cancel = cancel.bind(input);
+  const transformStream = new TransformStream(options);
+  pipe(input, transformStream.writable);
+  return transformStream.readable;
 }
 
 function transform(input, process = () => undefined, finish = () => undefined) {
@@ -121,6 +116,60 @@ function transform(input, process = () => undefined, finish = () => undefined) {
   return result1 !== undefined ? result1 : result2;
 }
 
+function transformPair(input, fn) {
+  let incomingTransformController;
+  const incoming = new TransformStream({
+    start(controller) {
+      incomingTransformController = controller;
+    }
+  });
+
+  const canceledErr = new Error('Readable side was canceled.');
+  const pipeDonePromise = pipe(input, incoming.writable).catch(e => {
+    if (e !== canceledErr) {
+      throw e;
+    }
+  });
+
+  let outputController;
+  const outgoing = {
+    readable: new ReadableStream({
+      start(_controller) {
+        outputController = _controller;
+      },
+      async cancel() {
+        incomingTransformController.error(canceledErr);
+        await pipeDonePromise;
+      }
+    }),
+    writable: new WritableStream({
+      write: outputController.enqueue.bind(outputController),
+      close: outputController.close.bind(outputController),
+      abort: outputController.error.bind(outputController)
+    })
+  };
+  Promise.resolve(fn(incoming.readable, outgoing.writable)).catch(e => {
+    if (e !== canceledErr) {
+      throw e;
+    }
+  });
+  return outgoing.readable;
+}
+
+function parse(input, fn) {
+  let returnValue;
+  const transformed = transformPair(input, (readable, writable) => {
+    const reader = getReader(readable);
+    reader.remainder = () => {
+      reader.releaseLock();
+      pipe(readable, writable);
+      return transformed;
+    };
+    returnValue = fn(reader);
+  });
+  return returnValue;
+}
+
 function tee(input) {
   if (util.isStream(input)) {
     const teed = input.tee();
@@ -162,7 +211,7 @@ function slice(input, begin=0, end=Infinity) {
             }
             bytesRead += value.length;
           } else {
-            controller.close();
+            controller.terminate();
           }
         }
       });
@@ -199,41 +248,6 @@ function slice(input, begin=0, end=Infinity) {
   return input.slice(begin, end);
 }
 
-async function parse(input, parser) {
-  let controller;
-  const transformed = transformRaw(input, {
-    start(_controller) {
-      controller = _controller;
-    },
-    cancel: cancel.bind(input)
-  });
-  transformed[stream.cancelReadsSym] = controller.error.bind(controller);
-  toStream(input).pipeTo(target);
-  const reader = getReader(transformed.readable);
-  await parser(reader);
-
-
-  new ReadableStream({
-    start(_controller) {
-      controller = _controller;
-    },
-    pull: () => {
-
-    },
-    cancel: () => {
-      
-    }
-  });
-  new ReadableStream({
-    pull: () => {
-
-    },
-    cancel: () => {
-
-    }
-  });
-}
-
 async function readToEnd(input, join) {
   if (util.isStream(input)) {
     return getReader(input).readToEnd(join);
@@ -241,9 +255,9 @@ async function readToEnd(input, join) {
   return input;
 }
 
-async function cancel(input) {
+async function cancel(input, reason) {
   if (util.isStream(input)) {
-    return input.cancel();
+    return input.cancel(reason);
   }
 }
 
@@ -330,52 +344,7 @@ if (nodeStream) {
 }
 
 
-export default { toStream, concat, getReader, transformRaw, transform, clone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync, readerAcquiredMap };
-
-
-const readerAcquiredMap = new Map();
-
-const _getReader = ReadableStream.prototype.getReader;
-ReadableStream.prototype.getReader = function() {
-  if (readerAcquiredMap.has(this)) {
-    console.error(readerAcquiredMap.get(this));
-  } else {
-    readerAcquiredMap.set(this, new Error('Reader for this ReadableStream already acquired here.'));
-  }
-  const _this = this;
-  const reader = _getReader.apply(this, arguments);
-  const _releaseLock = reader.releaseLock;
-  reader.releaseLock = function() {
-    try {
-      readerAcquiredMap.delete(_this);
-    } catch(e) {}
-    return _releaseLock.apply(this, arguments);
-  };
-  return reader;
-};
-
-const _tee = ReadableStream.prototype.tee;
-ReadableStream.prototype.tee = function() {
-  if (readerAcquiredMap.has(this)) {
-    console.error(readerAcquiredMap.get(this));
-  } else {
-    readerAcquiredMap.set(this, new Error('Reader for this ReadableStream already acquired here.'));
-  }
-  return _tee.apply(this, arguments);
-};
-
-const _cancel = ReadableStream.prototype.cancel;
-ReadableStream.prototype.cancel = function() {
-  try {
-    return _cancel.apply(this, arguments);
-  } finally {
-    if (readerAcquiredMap.has(this)) {
-      console.error(readerAcquiredMap.get(this));
-    } else {
-      readerAcquiredMap.set(this, new Error('Reader for this ReadableStream already acquired here.'));
-    }
-  }
-};
+export default { toStream, concat, getReader, getWriter, pipe, transformRaw, transform, transformPair, parse, clone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync };
 
 
 const doneReadingSet = new WeakSet();
@@ -484,25 +453,6 @@ Reader.prototype.unshift = function(...values) {
   this.externalBuffer.unshift(...values.filter(value => value && value.length));
 };
 
-Reader.prototype.substream = function() {
-  return Object.assign(create({
-    pull: async controller => {
-      const { done, value } = await this.read();
-      if (!done) {
-        controller.enqueue(value);
-      } else {
-        controller.close();
-      }
-    },
-    cancel: () => {
-      this.releaseLock();
-      return cancel(this.stream);
-    }
-  }), { from: this.stream });
-  this.releaseLock();
-  return this.stream;
-};
-
 Reader.prototype.readToEnd = async function(join=util.concat) {
   const result = [];
   while (true) {
diff --git a/src/util.js b/src/util.js
index 2507627c..33f0be0f 100644
--- a/src/util.js
+++ b/src/util.js
@@ -84,13 +84,16 @@ export default {
           if (value.locked) {
             obj[key] = null;
           } else {
-            const reader = stream.getReader(value);
-            const { port1, port2 } = new MessageChannel();
-            port1.onmessage = async function() {
-              port1.postMessage(await reader.read());
-            };
-            obj[key] = port2;
-            collection.push(port2);
+            const transformed = stream.transformPair(value, async readable => {
+              const reader = stream.getReader(readable);
+              const { port1, port2 } = new MessageChannel();
+              port1.onmessage = async function({ data: { action } }) {
+                if (action === 'read') port1.postMessage(await reader.read());
+                else if (action === 'cancel') port1.postMessage(await transformed.cancel());
+              };
+              obj[key] = port2;
+              collection.push(port2);
+            });
           }
           return;
         }
@@ -115,7 +118,13 @@ export default {
                   }
                   resolve();
                 };
-                value.postMessage(undefined);
+                value.postMessage({ action: 'read' });
+              });
+            },
+            cancel() {
+              return new Promise(resolve => {
+                value.onmessage = resolve;
+                value.postMessage({ action: 'cancel' });
               });
             }
           });
diff --git a/test/general/streaming.js b/test/general/streaming.js
index d6265b3c..01674dcd 100644
--- a/test/general/streaming.js
+++ b/test/general/streaming.js
@@ -6,7 +6,7 @@ chai.use(require('chai-as-promised'));
 
 const { expect } = chai;
 
-const { util } = openpgp;
+const { stream, util } = openpgp;
 
 const pub_key =
   ['-----BEGIN PGP PUBLIC KEY BLOCK-----',
@@ -102,6 +102,7 @@ describe('Streaming', function() {
     let i = 0;
     const data = new ReadableStream({
       async pull(controller) {
+        await new Promise(setTimeout);
         if (i++ < 10) {
           let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
           controller.enqueue(randomBytes);
@@ -115,7 +116,7 @@ describe('Streaming', function() {
       data,
       passwords: ['test'],
     });
-    expect(await openpgp.stream.getReader(openpgp.stream.clone(encrypted.data)).readBytes(1024)).to.match(/^-----BEGIN PGP MESSAGE-----\r\nVersion: OpenPGP.js VERSION\r\nComment: https:\/\/openpgpjs.org\r\n\r\n/);
+    expect(await openpgp.stream.getReader(openpgp.stream.clone(encrypted.data)).readBytes(1024)).to.match(/^-----BEGIN PGP MESSAGE-----\r\n/);
     if (i > 10) throw new Error('Data did not arrive early.');
     const msgAsciiArmored = await openpgp.stream.readToEnd(encrypted.data);
     const message = await openpgp.message.readArmored(msgAsciiArmored);
@@ -133,6 +134,7 @@ describe('Streaming', function() {
     let canceled = false;
     const data = new ReadableStream({
       async pull(controller) {
+        await new Promise(setTimeout);
         if (i++ < 10) {
           let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
           controller.enqueue(randomBytes);
@@ -150,9 +152,7 @@ describe('Streaming', function() {
       passwords: ['test'],
     });
     const reader = openpgp.stream.getReader(encrypted.data);
-    console.log('read start');
-    expect(await reader.readBytes(1024)).to.match(/^-----BEGIN PGP MESSAGE-----\r\nVersion: OpenPGP.js VERSION\r\nComment: https:\/\/openpgpjs.org\r\n\r\n/);
-    console.log('read end');
+    expect(await reader.readBytes(1024)).to.match(/^-----BEGIN PGP MESSAGE-----\r\n/);
     if (i > 10) throw new Error('Data did not arrive early.');
     reader.releaseLock();
     await openpgp.stream.cancel(encrypted.data);
@@ -380,7 +380,7 @@ describe('Streaming', function() {
       let i = 0;
       const data = new ReadableStream({
         async pull(controller) {
-          await new Promise(setTimeout);
+          await new Promise(resolve => setTimeout(resolve, 10));
           if (i++ < 10) {
             let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
             controller.enqueue(randomBytes);
@@ -412,6 +412,46 @@ describe('Streaming', function() {
     }
   });
 
+  it('stream.transformPair()', async function() {
+    let plaintext = [];
+    let i = 0;
+    let canceled = false;
+    let controller;
+    const data = new ReadableStream({
+      start(_controller) {
+        controller = _controller;
+      },
+      async pull(controller) {
+        await new Promise(setTimeout);
+        if (i++ < 10) {
+          let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
+          controller.enqueue(randomBytes);
+          plaintext.push(randomBytes);
+        } else {
+          controller.close();
+        }
+      },
+      cancel() {
+        canceled = true;
+      }
+    });
+    data.controller = controller;
+
+    const transformed = stream.transformPair(stream.slice(data, 0, 5000), async (readable, writable) => {
+      const reader = stream.getReader(readable);
+      const writer = stream.getWriter(writable);
+      while (true) {
+        await writer.ready;
+        const { done, value } = await reader.read();
+        if (done) return writer.close();
+        writer.write(value);
+      }
+    });
+    await new Promise(resolve => setTimeout(resolve));
+    await stream.cancel(transformed);
+    expect(canceled).to.be.true;
+  });
+
   it('Input stream should be canceled when canceling decrypted stream (draft04)', async function() {
     let aead_protectValue = openpgp.config.aead_protect;
     let aead_chunk_size_byteValue = openpgp.config.aead_chunk_size_byte;
@@ -423,7 +463,7 @@ describe('Streaming', function() {
       let canceled = false;
       const data = new ReadableStream({
         async pull(controller) {
-          await new Promise(setTimeout);
+          await new Promise(resolve => setTimeout(resolve, 10));
           if (i++ < 10) {
             let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
             controller.enqueue(randomBytes);
@@ -449,11 +489,11 @@ describe('Streaming', function() {
         format: 'binary'
       });
       expect(util.isStream(decrypted.data)).to.be.true;
-      const reader = openpgp.stream.getReader(openpgp.stream.clone(decrypted.data));
+      const reader = openpgp.stream.getReader(decrypted.data);
       expect(await reader.readBytes(1024)).to.deep.equal(plaintext[0]);
       if (i > 10) throw new Error('Data did not arrive early.');
       reader.releaseLock();
-      await openpgp.stream.cancel(decrypted.data);
+      await openpgp.stream.cancel(decrypted.data, new Error('canceled by test'));
       expect(canceled).to.be.true;
     } finally {
       openpgp.config.aead_protect = aead_protectValue;
diff --git a/test/unittests.js b/test/unittests.js
index fc1524fd..b2d10619 100644
--- a/test/unittests.js
+++ b/test/unittests.js
@@ -5,9 +5,9 @@ if (typeof Symbol === 'undefined') {
 if (typeof Promise === 'undefined') {
   require('core-js/fn/promise');
 }
-if (typeof ReadableStream === 'undefined') {
-  Object.assign(typeof window !== 'undefined' ? window : global, require('web-streams-polyfill'));
-}
+// if (typeof TransformStream === 'undefined') {
+  Object.assign(typeof window !== 'undefined' ? window : global, require('@mattiasbuelens/web-streams-polyfill'));
+// }
 
 (typeof window !== 'undefined' ? window : global).resolves = function(val) {
   return new Promise(function(res) { res(val); });