diff --git a/Gruntfile.js b/Gruntfile.js index 08979b2d..a78ddb6d 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -26,12 +26,12 @@ module.exports = function(grunt) { }, cacheFile: 'browserify-cache.json', // Don't bundle these packages with openpgp.js - external: ['crypto', 'zlib', 'node-localstorage', 'node-fetch', 'asn1.js'], + external: ['crypto', 'zlib', 'node-localstorage', 'node-fetch', 'asn1.js', 'stream'], transform: [ ["babelify", { global: true, - // Only babelify asmcrypto and address-rfc2822 in node_modules - only: /^(?:.*\/node_modules\/asmcrypto\.js\/|.*\/node_modules\/address-rfc2822\/|(?!.*\/node_modules\/)).*$/, + // Only babelify web-stream-tools, asmcrypto and address-rfc2822 in node_modules + only: /^(?:.*\/node_modules\/web-stream-tools\/|.*\/node_modules\/asmcrypto\.js\/|.*\/node_modules\/address-rfc2822\/|(?!.*\/node_modules\/)).*$/, plugins: ["transform-async-to-generator", "syntax-async-functions", "transform-regenerator", @@ -55,12 +55,12 @@ module.exports = function(grunt) { }, cacheFile: 'browserify-cache-debug.json', // Don't bundle these packages with openpgp.js - external: ['crypto', 'zlib', 'node-localstorage', 'node-fetch', 'asn1.js'], + external: ['crypto', 'zlib', 'node-localstorage', 'node-fetch', 'asn1.js', 'stream'], transform: [ ["babelify", { global: true, - // Only babelify asmcrypto and address-rfc2822 in node_modules - only: /^(?:.*\/node_modules\/asmcrypto\.js\/|.*\/node_modules\/address-rfc2822\/|(?!.*\/node_modules\/)).*$/, + // Only babelify web-stream-tools, asmcrypto and address-rfc2822 in node_modules + only: /^(?:.*\/node_modules\/web-stream-tools\/|.*\/node_modules\/asmcrypto\.js\/|.*\/node_modules\/address-rfc2822\/|(?!.*\/node_modules\/)).*$/, plugins: ["transform-async-to-generator", "syntax-async-functions", "transform-regenerator", diff --git a/package.json b/package.json index 35babc47..21d96db5 100644 --- a/package.json +++ b/package.json @@ -84,7 +84,8 @@ "hash.js": "^1.1.3", "node-fetch": "^2.1.2", "node-localstorage": "~1.3.0", - "pako": "^1.0.6" + "pako": "^1.0.6", + "web-stream-tools": "github:openpgpjs/web-stream-tools" }, "repository": { "type": "git", diff --git a/src/crypto/hash/index.js b/src/crypto/hash/index.js index 65a18bfa..66d56f27 100644 --- a/src/crypto/hash/index.js +++ b/src/crypto/hash/index.js @@ -4,8 +4,8 @@ * @see {@link https://github.com/indutny/hash.js|hash.js} * @requires asmcrypto.js * @requires hash.js + * @requires web-stream-tools * @requires crypto/hash/md5 - * @requires stream * @requires util * @module crypto/hash */ @@ -16,8 +16,8 @@ import { Sha512 } from 'asmcrypto.js/dist_es8/hash/sha512/sha512'; import sha224 from 'hash.js/lib/hash/sha/224'; import sha384 from 'hash.js/lib/hash/sha/384'; import { ripemd160 } from 'hash.js/lib/hash/ripemd'; +import stream from 'web-stream-tools'; import md5 from './md5'; -import stream from '../../stream'; import util from '../../util'; const nodeCrypto = util.getNodeCrypto(); diff --git a/src/crypto/public_key/elliptic/key.js b/src/crypto/public_key/elliptic/key.js index 4505bca7..5121ab8e 100644 --- a/src/crypto/public_key/elliptic/key.js +++ b/src/crypto/public_key/elliptic/key.js @@ -18,8 +18,8 @@ /** * @fileoverview Wrapper for a KeyPair of an Elliptic Curve * @requires bn.js + * @requires web-stream-tools * @requires crypto/public_key/elliptic/curves - * @requires stream * @requires util * @requires enums * @requires asn1.js @@ -27,8 +27,8 @@ */ import BN from 'bn.js'; +import stream from 'web-stream-tools'; import { webCurves } from './curves'; -import stream from '../../../stream'; import util from '../../../util'; import enums from '../../../enums'; diff --git a/src/encoding/armor.js b/src/encoding/armor.js index 622996de..fab22e9a 100644 --- a/src/encoding/armor.js +++ b/src/encoding/armor.js @@ -16,18 +16,18 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA /** + * @requires web-stream-tools * @requires encoding/base64 * @requires enums * @requires config - * @requires stream * @requires util * @module encoding/armor */ +import stream from 'web-stream-tools'; import base64 from './base64.js'; import enums from '../enums.js'; import config from '../config'; -import stream from '../stream'; import util from '../util'; /** diff --git a/src/encoding/base64.js b/src/encoding/base64.js index b6071bb6..3c5d4f1a 100644 --- a/src/encoding/base64.js +++ b/src/encoding/base64.js @@ -12,11 +12,11 @@ */ /** - * @requires stream + * @requires web-stream-tools * @module encoding/base64 */ -import stream from '../stream'; +import stream from 'web-stream-tools'; const b64s = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'; // Standard radix-64 const b64u = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'; // URL-safe radix-64 diff --git a/src/index.js b/src/index.js index f2245cdd..200d5dbe 100644 --- a/src/index.js +++ b/src/index.js @@ -101,10 +101,10 @@ export { default as KDFParams } from './type/kdf_params'; export { default as OID } from './type/oid'; /** - * @see module:stream + * @see streams * @name module:openpgp.stream */ -export { default as stream } from './stream'; +export { default as stream } from 'web-stream-tools'; /** * @see module:encoding/armor diff --git a/src/keyring/localstore.js b/src/keyring/localstore.js index eeb0ff4c..902b2fed 100644 --- a/src/keyring/localstore.js +++ b/src/keyring/localstore.js @@ -17,16 +17,16 @@ /** * @fileoverview Provides the LocalStore class + * @requires web-stream-tools * @requires config * @requires key - * @requires stream * @requires util * @module keyring/localstore */ +import stream from 'web-stream-tools'; import config from '../config'; import { readArmored } from '../key'; -import stream from '../stream'; import util from '../util'; /** diff --git a/src/message.js b/src/message.js index e4d9f294..b2ae676f 100644 --- a/src/message.js +++ b/src/message.js @@ -16,12 +16,12 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA /** + * @requires web-stream-tools * @requires encoding/armor * @requires type/keyid * @requires config * @requires crypto * @requires enums - * @requires stream * @requires util * @requires packet * @requires signature @@ -29,12 +29,12 @@ * @module message */ +import stream from 'web-stream-tools'; import armor from './encoding/armor'; import type_keyid from './type/keyid'; import config from './config'; import crypto from './crypto'; import enums from './enums'; -import stream from './stream'; import util from './util'; import packet from './packet'; import { Signature } from './signature'; diff --git a/src/openpgp.js b/src/openpgp.js index 7bcbfba5..09b36f42 100644 --- a/src/openpgp.js +++ b/src/openpgp.js @@ -19,12 +19,12 @@ * @fileoverview The openpgp base module should provide all of the functionality * to consume the openpgp.js library. All additional classes are documented * for extending and developing on top of the base library. + * @requires web-stream-tools * @requires message * @requires cleartext * @requires key * @requires config * @requires enums - * @requires stream * @requires util * @requires polyfills * @requires worker/async_proxy @@ -39,13 +39,13 @@ * {@link module:openpgp} */ +import stream from 'web-stream-tools'; import * as messageLib from './message'; import { CleartextMessage } from './cleartext'; import { generate, reformat } from './key'; import config from './config/config'; import enums from './enums'; import './polyfills'; -import stream from './stream'; import util from './util'; import AsyncProxy from './worker/async_proxy'; diff --git a/src/packet/clone.js b/src/packet/clone.js index 8376441b..70e5a848 100644 --- a/src/packet/clone.js +++ b/src/packet/clone.js @@ -22,13 +22,13 @@ * @module packet/clone */ +import stream from 'web-stream-tools'; import { Key } from '../key'; import { Message } from '../message'; import { CleartextMessage } from '../cleartext'; import { Signature } from '../signature'; import List from './packetlist'; import type_keyid from '../type/keyid'; -import stream from '../stream'; import util from '../util'; diff --git a/src/packet/compressed.js b/src/packet/compressed.js index f4c8948a..a9e4cd00 100644 --- a/src/packet/compressed.js +++ b/src/packet/compressed.js @@ -16,18 +16,18 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA /** + * @requires web-stream-tools * @requires pako * @requires config * @requires enums - * @requires stream * @requires util * @requires compression/bzip2 */ import pako from 'pako'; +import stream from 'web-stream-tools'; import config from '../config'; import enums from '../enums'; -import stream from '../stream'; import util from '../util'; import Bzip2 from '../compression/bzip2.build.js'; diff --git a/src/packet/literal.js b/src/packet/literal.js index f1e2b26c..bd2c3c88 100644 --- a/src/packet/literal.js +++ b/src/packet/literal.js @@ -16,13 +16,13 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA /** + * @requires web-stream-tools * @requires enums - * @requires stream * @requires util */ +import stream from 'web-stream-tools'; import enums from '../enums'; -import stream from '../stream'; import util from '../util'; /** diff --git a/src/packet/packet.js b/src/packet/packet.js index 181941f2..fa63e2e2 100644 --- a/src/packet/packet.js +++ b/src/packet/packet.js @@ -19,14 +19,14 @@ /** * @fileoverview Functions for reading and writing packets + * @requires web-stream-tools * @requires enums - * @requires stream * @requires util * @module packet/packet */ +import stream from 'web-stream-tools'; import enums from '../enums'; -import stream from '../stream'; import util from '../util'; export default { diff --git a/src/packet/packetlist.js b/src/packet/packetlist.js index bb311b61..12b09754 100644 --- a/src/packet/packetlist.js +++ b/src/packet/packetlist.js @@ -1,18 +1,18 @@ /* eslint-disable callback-return */ /** + * @requires web-stream-tools * @requires packet/all_packets * @requires packet/packet * @requires config * @requires enums - * @requires stream * @requires util */ +import stream from 'web-stream-tools'; import * as packets from './all_packets'; import packetParser from './packet'; import config from '../config'; import enums from '../enums'; -import stream from '../stream'; import util from '../util'; /** diff --git a/src/packet/signature.js b/src/packet/signature.js index b48eeba3..fbc75df1 100644 --- a/src/packet/signature.js +++ b/src/packet/signature.js @@ -16,21 +16,21 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA /** + * @requires web-stream-tools * @requires packet/packet * @requires type/keyid * @requires type/mpi * @requires crypto * @requires enums - * @requires stream * @requires util */ +import stream from 'web-stream-tools'; import packet from './packet'; import type_keyid from '../type/keyid.js'; import type_mpi from '../type/mpi.js'; import crypto from '../crypto'; import enums from '../enums'; -import stream from '../stream'; import util from '../util'; /** diff --git a/src/packet/sym_encrypted_aead_protected.js b/src/packet/sym_encrypted_aead_protected.js index 9d599796..a2e4a366 100644 --- a/src/packet/sym_encrypted_aead_protected.js +++ b/src/packet/sym_encrypted_aead_protected.js @@ -16,17 +16,17 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA /** + * @requires web-stream-tools * @requires config * @requires crypto * @requires enums - * @requires stream * @requires util */ +import stream from 'web-stream-tools'; import config from '../config'; import crypto from '../crypto'; import enums from '../enums'; -import stream from '../stream'; import util from '../util'; const VERSION = 1; // A one-octet version number of the data packet. diff --git a/src/packet/sym_encrypted_integrity_protected.js b/src/packet/sym_encrypted_integrity_protected.js index c305a2e8..382783bb 100644 --- a/src/packet/sym_encrypted_integrity_protected.js +++ b/src/packet/sym_encrypted_integrity_protected.js @@ -17,19 +17,19 @@ /** * @requires asmcrypto.js + * @requires web-stream-tools * @requires config * @requires crypto * @requires enums - * @requires stream * @requires util */ import { AES_CFB } from 'asmcrypto.js/dist_es8/aes/cfb'; +import stream from 'web-stream-tools'; import config from '../config'; import crypto from '../crypto'; import enums from '../enums'; -import stream from '../stream'; import util from '../util'; const nodeCrypto = util.getNodeCrypto(); diff --git a/src/packet/symmetrically_encrypted.js b/src/packet/symmetrically_encrypted.js index cbcbe263..953961b6 100644 --- a/src/packet/symmetrically_encrypted.js +++ b/src/packet/symmetrically_encrypted.js @@ -16,16 +16,16 @@ // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA /** + * @requires web-stream-tools * @requires config * @requires crypto * @requires enums - * @requires stream */ +import stream from 'web-stream-tools'; import config from '../config'; import crypto from '../crypto'; import enums from '../enums'; -import stream from '../stream'; /** * Implementation of the Symmetrically Encrypted Data Packet (Tag 9) diff --git a/src/stream.js b/src/stream.js deleted file mode 100644 index 2fef58ef..00000000 --- a/src/stream.js +++ /dev/null @@ -1,699 +0,0 @@ -const NodeReadableStream = typeof window === 'undefined' && require('stream').Readable; - -const isIE11 = typeof navigator !== 'undefined' && !!navigator.userAgent.match(/Trident\/7\.0.*rv:([0-9.]+).*\).*Gecko$/); - -/** - * Check whether data is a Stream - * @param {Any} input data to check - * @returns {Boolean} - */ -function isStream(input) { - return ReadableStream.prototype.isPrototypeOf(input); -} - -/** - * Check whether data is a Uint8Array - * @param {Any} input data to check - * @returns {Boolean} - */ -function isUint8Array(input) { - return Uint8Array.prototype.isPrototypeOf(input); -} - -/** - * Convert data to Stream - * @param {ReadableStream|Uint8array|String} input data to convert - * @returns {ReadableStream} Converted data - */ -function toStream(input) { - if (isStream(input)) { - return input; - } - return new ReadableStream({ - start(controller) { - controller.enqueue(input); - controller.close(); - } - }); -} - -/** - * Concat a list of Uint8Arrays, Strings or Streams - * The caller should not mix Uint8Arrays with Strings, but may mix Streams with non-Streams. - * @param {Array} Array of Uint8Arrays/Strings/Streams to concatenate - * @returns {Uint8array|String|ReadableStream} Concatenated array - */ -function concat(list) { - if (list.some(isStream)) { - return concatStream(list); - } - if (typeof list[0] === 'string') { - return list.join(''); - } - return concatUint8Array(list); -} - -/** - * Concat Uint8Arrays - * @param {Array} Array of Uint8Arrays to concatenate - * @returns {Uint8array} Concatenated array - */ -function concatUint8Array(arrays) { - if (arrays.length === 1) return arrays[0]; - - let totalLength = 0; - for (let i = 0; i < arrays.length; i++) { - if (!isUint8Array(arrays[i])) { - throw new Error('concatUint8Array: Data must be in the form of a Uint8Array'); - } - - totalLength += arrays[i].length; - } - - const result = new Uint8Array(totalLength); - let pos = 0; - arrays.forEach(function (element) { - result.set(element, pos); - pos += element.length; - }); - - return result; -} - -/** - * Concat a list of Streams - * @param {Array} list Array of Uint8Arrays/Strings/Streams to concatenate - * @returns {ReadableStream} Concatenated list - */ -function concatStream(list) { - list = list.map(toStream); - const transform = transformWithCancel(async function(reason) { - await Promise.all(transforms.map(stream => cancel(stream, reason))); - }); - let prev = Promise.resolve(); - const transforms = list.map((stream, i) => transformPair(stream, (readable, writable) => { - prev = prev.then(() => pipe(readable, transform.writable, { - preventClose: i !== list.length - 1 - })); - return prev; - })); - return transform.readable; -} - -/** - * Get a Reader - * @param {ReadableStream|Uint8array|String} input - * @returns {Reader} - */ -function getReader(input) { - return new Reader(input); -} - -/** - * Get a Writer - * @param {WritableStream} input - * @returns {WritableStreamDefaultWriter} - */ -function getWriter(input) { - return input.getWriter(); -} - -/** - * Pipe a readable stream to a writable stream. Don't throw on input stream errors, but forward them to the output stream. - * @param {ReadableStream|Uint8array|String} input - * @param {WritableStream} target - * @param {Object} (optional) options - * @returns {Promise} Promise indicating when piping has finished (input stream closed or errored) - */ -async function pipe(input, target, options) { - input = toStream(input); - try { - if (input[externalBuffer]) { - const writer = target.getWriter(); - for (let i = 0; i < input[externalBuffer].length; i++) { - await writer.ready; - await writer.write(input[externalBuffer][i]); - } - writer.releaseLock(); - } - return await input.pipeTo(target, options); - } catch(e) {} -} - -/** - * Pipe a readable stream through a transform stream. - * @param {ReadableStream|Uint8array|String} input - * @param {Object} (optional) options - * @returns {ReadableStream} transformed stream - */ -function transformRaw(input, options) { - const transformStream = new TransformStream(options); - pipe(input, transformStream.writable); - return transformStream.readable; -} - -/** - * Create a cancelable TransformStream. - * @param {Function} cancel - * @returns {TransformStream} - */ -function transformWithCancel(cancel) { - let pulled = false; - let backpressureChangePromiseResolve; - let outputController; - return { - readable: new ReadableStream({ - start(controller) { - outputController = controller; - }, - pull() { - if (backpressureChangePromiseResolve) { - backpressureChangePromiseResolve(); - } else { - pulled = true; - } - }, - cancel - }, {highWaterMark: 0}), - writable: new WritableStream({ - write: async function(chunk) { - outputController.enqueue(chunk); - if (!pulled) { - await new Promise(resolve => { - backpressureChangePromiseResolve = resolve; - }); - backpressureChangePromiseResolve = null; - } else { - pulled = false; - } - }, - close: outputController.close.bind(outputController), - abort: outputController.error.bind(outputController) - }) - }; -} - -/** - * Transform a stream using helper functions which are called on each chunk, and on stream close, respectively. - * @param {ReadableStream|Uint8array|String} input - * @param {Function} process - * @param {Function} finish - * @returns {ReadableStream|Uint8array|String} - */ -function transform(input, process = () => undefined, finish = () => undefined) { - if (isStream(input)) { - return transformRaw(input, { - async transform(value, controller) { - try { - const result = await process(value); - if (result !== undefined) controller.enqueue(result); - } catch(e) { - controller.error(e); - } - }, - async flush(controller) { - try { - const result = await finish(); - if (result !== undefined) controller.enqueue(result); - } catch(e) { - controller.error(e); - } - } - }); - } - const result1 = process(input); - const result2 = finish(); - if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]); - return result1 !== undefined ? result1 : result2; -} - -/** - * Transform a stream using a helper function which is passed a readable and a writable stream. - * This function also maintains the possibility to cancel the input stream, - * and does so on cancelation of the output stream, despite cancelation - * normally being impossible when the input stream is being read from. - * @param {ReadableStream|Uint8array|String} input - * @param {Function} fn - * @returns {ReadableStream} - */ -function transformPair(input, fn) { - let incomingTransformController; - const incoming = new TransformStream({ - start(controller) { - incomingTransformController = controller; - } - }); - - const pipeDonePromise = pipe(input, incoming.writable); - - const outgoing = transformWithCancel(async function() { - incomingTransformController.error(new Error('Readable side was canceled.')); - await pipeDonePromise; - await new Promise(setTimeout); - }); - fn(incoming.readable, outgoing.writable); - return outgoing.readable; -} - -/** - * Parse a stream using a helper function which is passed a Reader. - * The reader additionally has a remainder() method which returns a - * stream pointing to the remainder of input, and is linked to input - * for cancelation. - * @param {ReadableStream|Uint8array|String} input - * @param {Function} fn - * @returns {Any} the return value of fn() - */ -function parse(input, fn) { - let returnValue; - const transformed = transformPair(input, (readable, writable) => { - const reader = getReader(readable); - reader.remainder = () => { - reader.releaseLock(); - pipe(readable, writable); - return transformed; - }; - returnValue = fn(reader); - }); - return returnValue; -} - -/** - * Tee a Stream for reading it twice. The input stream can no longer be read after tee()ing. - * Reading either of the two returned streams will pull from the input stream. - * The input stream will only be canceled if both of the returned streams are canceled. - * @param {ReadableStream|Uint8array|String} input - * @returns {Array} array containing two copies of input - */ -function tee(input) { - if (isStream(input)) { - const teed = input.tee(); - teed[0][externalBuffer] = teed[1][externalBuffer] = input[externalBuffer]; - return teed; - } - return [slice(input), slice(input)]; -} - -/** - * Clone a Stream for reading it twice. The input stream can still be read after clone()ing. - * Reading from the clone will pull from the input stream. - * The input stream will only be canceled if both the clone and the input stream are canceled. - * @param {ReadableStream|Uint8array|String} input - * @returns {ReadableStream|Uint8array|String} cloned input - */ -function clone(input) { - if (isStream(input)) { - const teed = tee(input); - overwrite(input, teed[0]); - return teed[1]; - } - return slice(input); -} - -/** - * Clone a Stream for reading it twice. Data will arrive at the same rate as the input stream is being read. - * Reading from the clone will NOT pull from the input stream. Data only arrives when reading the input stream. - * The input stream will NOT be canceled if the clone is canceled, only if the input stream are canceled. - * If the input stream is canceled, the clone will be errored. - * @param {ReadableStream|Uint8array|String} input - * @returns {ReadableStream|Uint8array|String} cloned input - */ -function passiveClone(input) { - if (isStream(input)) { - return new ReadableStream({ - start(controller) { - const transformed = transformPair(input, async (readable, writable) => { - const reader = getReader(readable); - const writer = getWriter(writable); - try { - while (true) { - await writer.ready; - const { done, value } = await reader.read(); - if (done) { - try { controller.close(); } catch(e) {} - await writer.close(); - return; - } - try { controller.enqueue(value); } catch(e) {} - await writer.write(value); - } - } catch(e) { - controller.error(e); - await writer.abort(e); - } - }); - overwrite(input, transformed); - } - }); - } - return slice(input); -} - -/** - * Modify a stream object to point to a different stream object. - * This is used internally by clone() and passiveClone() to provide an abstraction over tee(). - * @param {ReadableStream} input - * @param {ReadableStream} clone - */ -function overwrite(input, clone) { - // Overwrite input.getReader, input.locked, etc to point to clone - Object.entries(Object.getOwnPropertyDescriptors(ReadableStream.prototype)).forEach(([name, descriptor]) => { - if (name === 'constructor') { - return; - } - if (descriptor.value) { - descriptor.value = descriptor.value.bind(clone); - } else { - descriptor.get = descriptor.get.bind(clone); - } - Object.defineProperty(input, name, descriptor); - }); -} - -/** - * Return a stream pointing to a part of the input stream. - * @param {ReadableStream|Uint8array|String} input - * @returns {ReadableStream|Uint8array|String} clone - */ -function slice(input, begin=0, end=Infinity) { - if (isStream(input)) { - if (begin >= 0 && end >= 0) { - let bytesRead = 0; - return transformRaw(input, { - transform(value, controller) { - if (bytesRead < end) { - if (bytesRead + value.length >= begin) { - controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead)); - } - bytesRead += value.length; - } else { - controller.terminate(); - } - } - }); - } - if (begin < 0 && (end < 0 || end === Infinity)) { - let lastBytes = []; - return transform(input, value => { - if (value.length >= -begin) lastBytes = [value]; - else lastBytes.push(value); - }, () => slice(concat(lastBytes), begin, end)); - } - if (begin === 0 && end < 0) { - let lastBytes; - return transform(input, value => { - const returnValue = lastBytes ? concat([lastBytes, value]) : value; - if (returnValue.length >= -end) { - lastBytes = slice(returnValue, end); - return slice(returnValue, begin, end); - } else { - lastBytes = returnValue; - } - }); - } - console.warn(`stream.slice(input, ${begin}, ${end}) not implemented efficiently.`); - return fromAsync(async () => slice(await readToEnd(input), begin, end)); - } - if (input[externalBuffer]) { - input = concat(input[externalBuffer].concat([input])); - } - if (isUint8Array(input) && !isIE11) { // IE11 subarray is buggy - return input.subarray(begin, end); - } - return input.slice(begin, end); -} - -/** - * Read a stream to the end and return its contents, concatenated by the concat function (defaults to concat). - * @param {ReadableStream|Uint8array|String} input - * @param {Function} concat - * @returns {Uint8array|String|Any} the return value of concat() - */ -async function readToEnd(input, concat) { - if (isStream(input)) { - return getReader(input).readToEnd(concat); - } - return input; -} - -/** - * Cancel a stream. - * @param {ReadableStream|Uint8array|String} input - * @param {Any} reason - * @returns {Promise} indicates when the stream has been canceled - */ -async function cancel(input, reason) { - if (isStream(input)) { - return input.cancel(reason); - } -} - -/** - * Convert an async function to a Stream. When the function returns, its return value is enqueued to the stream. - * @param {Function} fn - * @returns {ReadableStream} - */ -function fromAsync(fn) { - return new ReadableStream({ - pull: async controller => { - try { - controller.enqueue(await fn()); - controller.close(); - } catch(e) { - controller.error(e); - } - } - }); -} - - -/** - * Web / node stream conversion functions - * From https://github.com/gwicke/node-web-streams - */ - -let nodeToWeb; -let webToNode; - -if (NodeReadableStream) { - - /** - * Convert a Node Readable Stream to a Web ReadableStream - * @param {Readable} nodeStream - * @returns {ReadableStream} - */ - nodeToWeb = function(nodeStream) { - return new ReadableStream({ - start(controller) { - nodeStream.pause(); - nodeStream.on('data', chunk => { - controller.enqueue(chunk); - nodeStream.pause(); - }); - nodeStream.on('end', () => controller.close()); - nodeStream.on('error', e => controller.error(e)); - }, - pull() { - nodeStream.resume(); - }, - cancel() { - nodeStream.pause(); - } - }); - }; - - - class NodeReadable extends NodeReadableStream { - constructor(webStream, options) { - super(options); - this._webStream = webStream; - this._reader = getReader(webStream); - this._reading = false; - } - - _read(size) { - if (this._reading) { - return; - } - this._reading = true; - const doRead = () => { - this._reader.read() - .then(res => { - if (res.done) { - this.push(null); - return; - } - if (this.push(res.value)) { - return doRead(size); - } else { - this._reading = false; - } - }); - }; - doRead(); - } - } - - /** - * Convert a Web ReadableStream to a Node Readable Stream - * @param {ReadableStream} webStream - * @returns {Readable} - */ - webToNode = function(webStream) { - return new NodeReadable(webStream); - }; - -} - - -export default { isStream, isUint8Array, toStream, concatUint8Array, concatStream, concat, getReader, getWriter, pipe, transformRaw, transform, transformPair, parse, clone, passiveClone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync }; - - -const doneReadingSet = new WeakSet(); -const externalBuffer = Symbol('externalBuffer'); -function Reader(input) { - this.stream = input; - if (input[externalBuffer]) { - this[externalBuffer] = input[externalBuffer].slice(); - } - if (isStream(input)) { - const reader = input.getReader(); - this._read = reader.read.bind(reader); - this._releaseLock = reader.releaseLock.bind(reader); - return; - } - let doneReading = false; - this._read = async () => { - if (doneReading || doneReadingSet.has(input)) { - return { value: undefined, done: true }; - } - doneReading = true; - return { value: input, done: false }; - }; - this._releaseLock = () => { - if (doneReading) { - try { - doneReadingSet.add(input); - } catch(e) {} - } - }; -} - -/** - * Read a chunk of data. - * @returns {Object} Either { done: false, value: Uint8Array | String } or { done: true, value: undefined } - */ -Reader.prototype.read = async function() { - if (this[externalBuffer] && this[externalBuffer].length) { - const value = this[externalBuffer].shift(); - return { done: false, value }; - } - return this._read(); -}; - -/** - * Allow others to read the stream. - */ -Reader.prototype.releaseLock = function() { - if (this[externalBuffer]) { - this.stream[externalBuffer] = this[externalBuffer]; - } - this._releaseLock(); -}; - -/** - * Read up to and including the first \n character. - * @returns {String|Undefined} - */ -Reader.prototype.readLine = async function() { - let buffer = []; - let returnVal; - while (!returnVal) { - const { done, value } = await this.read(); - if (done) { - if (buffer.length) return concat(buffer); - return; - } - const lineEndIndex = value.indexOf('\n') + 1; - if (lineEndIndex) { - returnVal = concat(buffer.concat(value.substr(0, lineEndIndex))); - buffer = []; - } - if (lineEndIndex !== value.length) { - buffer.push(value.substr(lineEndIndex)); - } - } - this.unshift(...buffer); - return returnVal; -}; - -/** - * Read a single byte/character. - * @returns {Number|String|Undefined} - */ -Reader.prototype.readByte = async function() { - const { done, value } = await this.read(); - if (done) return; - const byte = value[0]; - this.unshift(slice(value, 1)); - return byte; -}; - -/** - * Read a specific amount of bytes/characters, unless the stream ends before that amount. - * @returns {Uint8Array|String|Undefined} - */ -Reader.prototype.readBytes = async function(length) { - const buffer = []; - let bufferLength = 0; - while (true) { - const { done, value } = await this.read(); - if (done) { - if (buffer.length) return concat(buffer); - return; - } - buffer.push(value); - bufferLength += value.length; - if (bufferLength >= length) { - const bufferConcat = concat(buffer); - this.unshift(slice(bufferConcat, length)); - return slice(bufferConcat, 0, length); - } - } -}; - -/** - * Peek (look ahead) a specific amount of bytes/characters, unless the stream ends before that amount. - * @returns {Uint8Array|String|Undefined} - */ -Reader.prototype.peekBytes = async function(length) { - const bytes = await this.readBytes(length); - this.unshift(bytes); - return bytes; -}; - -/** - * Push data to the front of the stream. - * @param {Uint8Array|String|Undefined} ...values - */ -Reader.prototype.unshift = function(...values) { - if (!this[externalBuffer]) { - this[externalBuffer] = []; - } - this[externalBuffer].unshift(...values.filter(value => value && value.length)); -}; - -/** - * Read the stream to the end and return its contents, concatenated by the join function (defaults to concat). - * @param {Function} join - * @returns {Uint8array|String|Any} the return value of join() - */ -Reader.prototype.readToEnd = async function(join=concat) { - const result = []; - while (true) { - const { done, value } = await this.read(); - if (done) break; - result.push(value); - } - return join(result); -}; diff --git a/src/util.js b/src/util.js index fec6db77..6b85474c 100644 --- a/src/util.js +++ b/src/util.js @@ -20,16 +20,17 @@ /** * This object contains utility functions * @requires address-rfc2822 + * @requires web-stream-tools * @requires config * @requires encoding/base64 * @module util */ import rfc2822 from 'address-rfc2822'; +import stream from 'web-stream-tools'; import config from './config'; import util from './util'; // re-import module to access util functions import b64 from './encoding/base64'; -import stream from './stream'; export default { isString: function(data) {