diff --git a/src/stream.js b/src/stream.js index 5f099168..2fef58ef 100644 --- a/src/stream.js +++ b/src/stream.js @@ -1,6 +1,24 @@ -import util from './util'; +const NodeReadableStream = typeof window === 'undefined' && require('stream').Readable; -const NodeReadableStream = util.getNodeStream(); +const isIE11 = typeof navigator !== 'undefined' && !!navigator.userAgent.match(/Trident\/7\.0.*rv:([0-9.]+).*\).*Gecko$/); + +/** + * Check whether data is a Stream + * @param {Any} input data to check + * @returns {Boolean} + */ +function isStream(input) { + return ReadableStream.prototype.isPrototypeOf(input); +} + +/** + * Check whether data is a Uint8Array + * @param {Any} input data to check + * @returns {Boolean} + */ +function isUint8Array(input) { + return Uint8Array.prototype.isPrototypeOf(input); +} /** * Convert data to Stream @@ -8,7 +26,7 @@ const NodeReadableStream = util.getNodeStream(); * @returns {ReadableStream} Converted data */ function toStream(input) { - if (util.isStream(input)) { + if (isStream(input)) { return input; } return new ReadableStream({ @@ -20,17 +38,60 @@ function toStream(input) { } /** - * Concat a list of Streams - * @param {Array} list Array of Uint8Arrays/Strings/Streams to concatenate - * @returns {ReadableStream} Concatenated array + * Concat a list of Uint8Arrays, Strings or Streams + * The caller should not mix Uint8Arrays with Strings, but may mix Streams with non-Streams. + * @param {Array} Array of Uint8Arrays/Strings/Streams to concatenate + * @returns {Uint8array|String|ReadableStream} Concatenated array */ function concat(list) { + if (list.some(isStream)) { + return concatStream(list); + } + if (typeof list[0] === 'string') { + return list.join(''); + } + return concatUint8Array(list); +} + +/** + * Concat Uint8Arrays + * @param {Array} Array of Uint8Arrays to concatenate + * @returns {Uint8array} Concatenated array + */ +function concatUint8Array(arrays) { + if (arrays.length === 1) return arrays[0]; + + let totalLength = 0; + for (let i = 0; i < arrays.length; i++) { + if (!isUint8Array(arrays[i])) { + throw new Error('concatUint8Array: Data must be in the form of a Uint8Array'); + } + + totalLength += arrays[i].length; + } + + const result = new Uint8Array(totalLength); + let pos = 0; + arrays.forEach(function (element) { + result.set(element, pos); + pos += element.length; + }); + + return result; +} + +/** + * Concat a list of Streams + * @param {Array} list Array of Uint8Arrays/Strings/Streams to concatenate + * @returns {ReadableStream} Concatenated list + */ +function concatStream(list) { list = list.map(toStream); const transform = transformWithCancel(async function(reason) { - await Promise.all(transforms.map(array => cancel(array, reason))); + await Promise.all(transforms.map(stream => cancel(stream, reason))); }); let prev = Promise.resolve(); - const transforms = list.map((array, i) => transformPair(array, (readable, writable) => { + const transforms = list.map((stream, i) => transformPair(stream, (readable, writable) => { prev = prev.then(() => pipe(readable, transform.writable, { preventClose: i !== list.length - 1 })); @@ -65,9 +126,7 @@ function getWriter(input) { * @returns {Promise} Promise indicating when piping has finished (input stream closed or errored) */ async function pipe(input, target, options) { - if (!util.isStream(input)) { - input = toStream(input); - } + input = toStream(input); try { if (input[externalBuffer]) { const writer = target.getWriter(); @@ -77,10 +136,8 @@ async function pipe(input, target, options) { } writer.releaseLock(); } - return await input.pipeTo(target, options).catch(function() {}); - } catch(e) { - util.print_debug_error(e); - } + return await input.pipeTo(target, options); + } catch(e) {} } /** @@ -144,7 +201,7 @@ function transformWithCancel(cancel) { * @returns {ReadableStream|Uint8array|String} */ function transform(input, process = () => undefined, finish = () => undefined) { - if (util.isStream(input)) { + if (isStream(input)) { return transformRaw(input, { async transform(value, controller) { try { @@ -166,7 +223,7 @@ function transform(input, process = () => undefined, finish = () => undefined) { } const result1 = process(input); const result2 = finish(); - if (result1 !== undefined && result2 !== undefined) return util.concat([result1, result2]); + if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]); return result1 !== undefined ? result1 : result2; } @@ -229,7 +286,7 @@ function parse(input, fn) { * @returns {Array} array containing two copies of input */ function tee(input) { - if (util.isStream(input)) { + if (isStream(input)) { const teed = input.tee(); teed[0][externalBuffer] = teed[1][externalBuffer] = input[externalBuffer]; return teed; @@ -245,7 +302,7 @@ function tee(input) { * @returns {ReadableStream|Uint8array|String} cloned input */ function clone(input) { - if (util.isStream(input)) { + if (isStream(input)) { const teed = tee(input); overwrite(input, teed[0]); return teed[1]; @@ -262,7 +319,7 @@ function clone(input) { * @returns {ReadableStream|Uint8array|String} cloned input */ function passiveClone(input) { - if (util.isStream(input)) { + if (isStream(input)) { return new ReadableStream({ start(controller) { const transformed = transformPair(input, async (readable, writable) => { @@ -319,7 +376,7 @@ function overwrite(input, clone) { * @returns {ReadableStream|Uint8array|String} clone */ function slice(input, begin=0, end=Infinity) { - if (util.isStream(input)) { + if (isStream(input)) { if (begin >= 0 && end >= 0) { let bytesRead = 0; return transformRaw(input, { @@ -340,12 +397,12 @@ function slice(input, begin=0, end=Infinity) { return transform(input, value => { if (value.length >= -begin) lastBytes = [value]; else lastBytes.push(value); - }, () => slice(util.concat(lastBytes), begin, end)); + }, () => slice(concat(lastBytes), begin, end)); } if (begin === 0 && end < 0) { let lastBytes; return transform(input, value => { - const returnValue = lastBytes ? util.concat([lastBytes, value]) : value; + const returnValue = lastBytes ? concat([lastBytes, value]) : value; if (returnValue.length >= -end) { lastBytes = slice(returnValue, end); return slice(returnValue, begin, end); @@ -354,27 +411,26 @@ function slice(input, begin=0, end=Infinity) { } }); } - // TODO: Don't read entire stream into memory here. - util.print_debug_error(`stream.slice(input, ${begin}, ${end}) not implemented efficiently.`); + console.warn(`stream.slice(input, ${begin}, ${end}) not implemented efficiently.`); return fromAsync(async () => slice(await readToEnd(input), begin, end)); } if (input[externalBuffer]) { - input = util.concat(input[externalBuffer].concat([input])); + input = concat(input[externalBuffer].concat([input])); } - if (util.isUint8Array(input) && !util.isIE11) { // IE11 subarray is buggy + if (isUint8Array(input) && !isIE11) { // IE11 subarray is buggy return input.subarray(begin, end); } return input.slice(begin, end); } /** - * Read a stream to the end and return its contents, concatenated by the concat function (defaults to util.concat). + * Read a stream to the end and return its contents, concatenated by the concat function (defaults to concat). * @param {ReadableStream|Uint8array|String} input * @param {Function} concat * @returns {Uint8array|String|Any} the return value of concat() */ async function readToEnd(input, concat) { - if (util.isStream(input)) { + if (isStream(input)) { return getReader(input).readToEnd(concat); } return input; @@ -387,7 +443,7 @@ async function readToEnd(input, concat) { * @returns {Promise} indicates when the stream has been canceled */ async function cancel(input, reason) { - if (util.isStream(input)) { + if (isStream(input)) { return input.cancel(reason); } } @@ -490,7 +546,7 @@ if (NodeReadableStream) { } -export default { toStream, concat, getReader, getWriter, pipe, transformRaw, transform, transformPair, parse, clone, passiveClone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync }; +export default { isStream, isUint8Array, toStream, concatUint8Array, concatStream, concat, getReader, getWriter, pipe, transformRaw, transform, transformPair, parse, clone, passiveClone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync }; const doneReadingSet = new WeakSet(); @@ -500,7 +556,7 @@ function Reader(input) { if (input[externalBuffer]) { this[externalBuffer] = input[externalBuffer].slice(); } - if (util.isStream(input)) { + if (isStream(input)) { const reader = input.getReader(); this._read = reader.read.bind(reader); this._releaseLock = reader.releaseLock.bind(reader); @@ -555,12 +611,12 @@ Reader.prototype.readLine = async function() { while (!returnVal) { const { done, value } = await this.read(); if (done) { - if (buffer.length) return util.concat(buffer); + if (buffer.length) return concat(buffer); return; } const lineEndIndex = value.indexOf('\n') + 1; if (lineEndIndex) { - returnVal = util.concat(buffer.concat(value.substr(0, lineEndIndex))); + returnVal = concat(buffer.concat(value.substr(0, lineEndIndex))); buffer = []; } if (lineEndIndex !== value.length) { @@ -593,13 +649,13 @@ Reader.prototype.readBytes = async function(length) { while (true) { const { done, value } = await this.read(); if (done) { - if (buffer.length) return util.concat(buffer); + if (buffer.length) return concat(buffer); return; } buffer.push(value); bufferLength += value.length; if (bufferLength >= length) { - const bufferConcat = util.concat(buffer); + const bufferConcat = concat(buffer); this.unshift(slice(bufferConcat, length)); return slice(bufferConcat, 0, length); } @@ -628,16 +684,16 @@ Reader.prototype.unshift = function(...values) { }; /** - * Read the stream to the end and return its contents, concatenated by the concat function (defaults to util.concat). - * @param {Function} concat - * @returns {Uint8array|String|Any} the return value of concat() + * Read the stream to the end and return its contents, concatenated by the join function (defaults to concat). + * @param {Function} join + * @returns {Uint8array|String|Any} the return value of join() */ -Reader.prototype.readToEnd = async function(concat=util.concat) { +Reader.prototype.readToEnd = async function(join=concat) { const result = []; while (true) { const { done, value } = await this.read(); if (done) break; result.push(value); } - return concat(result); + return join(result); }; diff --git a/src/util.js b/src/util.js index 0954c4bc..fec6db77 100644 --- a/src/util.js +++ b/src/util.js @@ -31,11 +31,7 @@ import util from './util'; // re-import module to access util functions import b64 from './encoding/base64'; import stream from './stream'; -const isIE11 = typeof navigator !== 'undefined' && !!navigator.userAgent.match(/Trident\/7\.0.*rv:([0-9.]+).*\).*Gecko$/); - export default { - isIE11, - isString: function(data) { return typeof data === 'string' || String.prototype.isPrototypeOf(data); }, @@ -44,13 +40,9 @@ export default { return Array.prototype.isPrototypeOf(data); }, - isUint8Array: function(data) { - return Uint8Array.prototype.isPrototypeOf(data); - }, + isUint8Array: stream.isUint8Array, - isStream: function(data) { - return ReadableStream.prototype.isPrototypeOf(data); - }, + isStream: stream.isStream, /** * Get transferable objects to pass buffers with zero copy (similar to "pass by reference" in C++) @@ -346,57 +338,14 @@ export default { * @param {Array} Array of Uint8Arrays/Strings/Streams to concatenate * @returns {Uint8array|String|ReadableStream} Concatenated array */ - concat: function (list) { - if (list.some(util.isStream)) { - return stream.concat(list); - } - if (util.isString(list[0])) { - return list.join(''); - } - return util.concatUint8Array(list); - }, + concat: stream.concat, /** * Concat Uint8Arrays * @param {Array} Array of Uint8Arrays to concatenate * @returns {Uint8array} Concatenated array */ - concatUint8Array: function (arrays) { - if (arrays.length === 1) return arrays[0]; - - let totalLength = 0; - for (let i = 0; i < arrays.length; i++) { - if (!util.isUint8Array(arrays[i])) { - throw new Error('concatUint8Array: Data must be in the form of a Uint8Array'); - } - - totalLength += arrays[i].length; - } - - const result = new Uint8Array(totalLength); - let pos = 0; - arrays.forEach(function (element) { - result.set(element, pos); - pos += element.length; - }); - - return result; - }, - - /** - * Deep copy Uint8Array - * @param {Uint8Array} Array to copy - * @returns {Uint8Array} new Uint8Array - */ - copyUint8Array: function (array) { - if (!util.isUint8Array(array)) { - throw new Error('Data must be in the form of a Uint8Array'); - } - - const copy = new Uint8Array(array.length); - copy.set(array); - return copy; - }, + concatUint8Array: stream.concatUint8Array, /** * Check Uint8Array equality