Don't depend on util in stream.js

This commit is contained in:
Daniel Huigens 2018-07-16 16:25:23 +02:00
parent bb15ffc2a0
commit 252da44419
2 changed files with 101 additions and 96 deletions

View File

@ -1,6 +1,24 @@
import util from './util'; const NodeReadableStream = typeof window === 'undefined' && require('stream').Readable;
const NodeReadableStream = util.getNodeStream(); const isIE11 = typeof navigator !== 'undefined' && !!navigator.userAgent.match(/Trident\/7\.0.*rv:([0-9.]+).*\).*Gecko$/);
/**
* Check whether data is a Stream
* @param {Any} input data to check
* @returns {Boolean}
*/
function isStream(input) {
return ReadableStream.prototype.isPrototypeOf(input);
}
/**
* Check whether data is a Uint8Array
* @param {Any} input data to check
* @returns {Boolean}
*/
function isUint8Array(input) {
return Uint8Array.prototype.isPrototypeOf(input);
}
/** /**
* Convert data to Stream * Convert data to Stream
@ -8,7 +26,7 @@ const NodeReadableStream = util.getNodeStream();
* @returns {ReadableStream} Converted data * @returns {ReadableStream} Converted data
*/ */
function toStream(input) { function toStream(input) {
if (util.isStream(input)) { if (isStream(input)) {
return input; return input;
} }
return new ReadableStream({ return new ReadableStream({
@ -20,17 +38,60 @@ function toStream(input) {
} }
/** /**
* Concat a list of Streams * Concat a list of Uint8Arrays, Strings or Streams
* @param {Array<ReadableStream|Uint8array|String>} list Array of Uint8Arrays/Strings/Streams to concatenate * The caller should not mix Uint8Arrays with Strings, but may mix Streams with non-Streams.
* @returns {ReadableStream} Concatenated array * @param {Array<Uint8array|String|ReadableStream>} Array of Uint8Arrays/Strings/Streams to concatenate
* @returns {Uint8array|String|ReadableStream} Concatenated array
*/ */
function concat(list) { function concat(list) {
if (list.some(isStream)) {
return concatStream(list);
}
if (typeof list[0] === 'string') {
return list.join('');
}
return concatUint8Array(list);
}
/**
* Concat Uint8Arrays
* @param {Array<Uint8array>} Array of Uint8Arrays to concatenate
* @returns {Uint8array} Concatenated array
*/
function concatUint8Array(arrays) {
if (arrays.length === 1) return arrays[0];
let totalLength = 0;
for (let i = 0; i < arrays.length; i++) {
if (!isUint8Array(arrays[i])) {
throw new Error('concatUint8Array: Data must be in the form of a Uint8Array');
}
totalLength += arrays[i].length;
}
const result = new Uint8Array(totalLength);
let pos = 0;
arrays.forEach(function (element) {
result.set(element, pos);
pos += element.length;
});
return result;
}
/**
* Concat a list of Streams
* @param {Array<ReadableStream|Uint8array|String>} list Array of Uint8Arrays/Strings/Streams to concatenate
* @returns {ReadableStream} Concatenated list
*/
function concatStream(list) {
list = list.map(toStream); list = list.map(toStream);
const transform = transformWithCancel(async function(reason) { const transform = transformWithCancel(async function(reason) {
await Promise.all(transforms.map(array => cancel(array, reason))); await Promise.all(transforms.map(stream => cancel(stream, reason)));
}); });
let prev = Promise.resolve(); let prev = Promise.resolve();
const transforms = list.map((array, i) => transformPair(array, (readable, writable) => { const transforms = list.map((stream, i) => transformPair(stream, (readable, writable) => {
prev = prev.then(() => pipe(readable, transform.writable, { prev = prev.then(() => pipe(readable, transform.writable, {
preventClose: i !== list.length - 1 preventClose: i !== list.length - 1
})); }));
@ -65,9 +126,7 @@ function getWriter(input) {
* @returns {Promise<undefined>} Promise indicating when piping has finished (input stream closed or errored) * @returns {Promise<undefined>} Promise indicating when piping has finished (input stream closed or errored)
*/ */
async function pipe(input, target, options) { async function pipe(input, target, options) {
if (!util.isStream(input)) { input = toStream(input);
input = toStream(input);
}
try { try {
if (input[externalBuffer]) { if (input[externalBuffer]) {
const writer = target.getWriter(); const writer = target.getWriter();
@ -77,10 +136,8 @@ async function pipe(input, target, options) {
} }
writer.releaseLock(); writer.releaseLock();
} }
return await input.pipeTo(target, options).catch(function() {}); return await input.pipeTo(target, options);
} catch(e) { } catch(e) {}
util.print_debug_error(e);
}
} }
/** /**
@ -144,7 +201,7 @@ function transformWithCancel(cancel) {
* @returns {ReadableStream|Uint8array|String} * @returns {ReadableStream|Uint8array|String}
*/ */
function transform(input, process = () => undefined, finish = () => undefined) { function transform(input, process = () => undefined, finish = () => undefined) {
if (util.isStream(input)) { if (isStream(input)) {
return transformRaw(input, { return transformRaw(input, {
async transform(value, controller) { async transform(value, controller) {
try { try {
@ -166,7 +223,7 @@ function transform(input, process = () => undefined, finish = () => undefined) {
} }
const result1 = process(input); const result1 = process(input);
const result2 = finish(); const result2 = finish();
if (result1 !== undefined && result2 !== undefined) return util.concat([result1, result2]); if (result1 !== undefined && result2 !== undefined) return concat([result1, result2]);
return result1 !== undefined ? result1 : result2; return result1 !== undefined ? result1 : result2;
} }
@ -229,7 +286,7 @@ function parse(input, fn) {
* @returns {Array<ReadableStream|Uint8array|String>} array containing two copies of input * @returns {Array<ReadableStream|Uint8array|String>} array containing two copies of input
*/ */
function tee(input) { function tee(input) {
if (util.isStream(input)) { if (isStream(input)) {
const teed = input.tee(); const teed = input.tee();
teed[0][externalBuffer] = teed[1][externalBuffer] = input[externalBuffer]; teed[0][externalBuffer] = teed[1][externalBuffer] = input[externalBuffer];
return teed; return teed;
@ -245,7 +302,7 @@ function tee(input) {
* @returns {ReadableStream|Uint8array|String} cloned input * @returns {ReadableStream|Uint8array|String} cloned input
*/ */
function clone(input) { function clone(input) {
if (util.isStream(input)) { if (isStream(input)) {
const teed = tee(input); const teed = tee(input);
overwrite(input, teed[0]); overwrite(input, teed[0]);
return teed[1]; return teed[1];
@ -262,7 +319,7 @@ function clone(input) {
* @returns {ReadableStream|Uint8array|String} cloned input * @returns {ReadableStream|Uint8array|String} cloned input
*/ */
function passiveClone(input) { function passiveClone(input) {
if (util.isStream(input)) { if (isStream(input)) {
return new ReadableStream({ return new ReadableStream({
start(controller) { start(controller) {
const transformed = transformPair(input, async (readable, writable) => { const transformed = transformPair(input, async (readable, writable) => {
@ -319,7 +376,7 @@ function overwrite(input, clone) {
* @returns {ReadableStream|Uint8array|String} clone * @returns {ReadableStream|Uint8array|String} clone
*/ */
function slice(input, begin=0, end=Infinity) { function slice(input, begin=0, end=Infinity) {
if (util.isStream(input)) { if (isStream(input)) {
if (begin >= 0 && end >= 0) { if (begin >= 0 && end >= 0) {
let bytesRead = 0; let bytesRead = 0;
return transformRaw(input, { return transformRaw(input, {
@ -340,12 +397,12 @@ function slice(input, begin=0, end=Infinity) {
return transform(input, value => { return transform(input, value => {
if (value.length >= -begin) lastBytes = [value]; if (value.length >= -begin) lastBytes = [value];
else lastBytes.push(value); else lastBytes.push(value);
}, () => slice(util.concat(lastBytes), begin, end)); }, () => slice(concat(lastBytes), begin, end));
} }
if (begin === 0 && end < 0) { if (begin === 0 && end < 0) {
let lastBytes; let lastBytes;
return transform(input, value => { return transform(input, value => {
const returnValue = lastBytes ? util.concat([lastBytes, value]) : value; const returnValue = lastBytes ? concat([lastBytes, value]) : value;
if (returnValue.length >= -end) { if (returnValue.length >= -end) {
lastBytes = slice(returnValue, end); lastBytes = slice(returnValue, end);
return slice(returnValue, begin, end); return slice(returnValue, begin, end);
@ -354,27 +411,26 @@ function slice(input, begin=0, end=Infinity) {
} }
}); });
} }
// TODO: Don't read entire stream into memory here. console.warn(`stream.slice(input, ${begin}, ${end}) not implemented efficiently.`);
util.print_debug_error(`stream.slice(input, ${begin}, ${end}) not implemented efficiently.`);
return fromAsync(async () => slice(await readToEnd(input), begin, end)); return fromAsync(async () => slice(await readToEnd(input), begin, end));
} }
if (input[externalBuffer]) { if (input[externalBuffer]) {
input = util.concat(input[externalBuffer].concat([input])); input = concat(input[externalBuffer].concat([input]));
} }
if (util.isUint8Array(input) && !util.isIE11) { // IE11 subarray is buggy if (isUint8Array(input) && !isIE11) { // IE11 subarray is buggy
return input.subarray(begin, end); return input.subarray(begin, end);
} }
return input.slice(begin, end); return input.slice(begin, end);
} }
/** /**
* Read a stream to the end and return its contents, concatenated by the concat function (defaults to util.concat). * Read a stream to the end and return its contents, concatenated by the concat function (defaults to concat).
* @param {ReadableStream|Uint8array|String} input * @param {ReadableStream|Uint8array|String} input
* @param {Function} concat * @param {Function} concat
* @returns {Uint8array|String|Any} the return value of concat() * @returns {Uint8array|String|Any} the return value of concat()
*/ */
async function readToEnd(input, concat) { async function readToEnd(input, concat) {
if (util.isStream(input)) { if (isStream(input)) {
return getReader(input).readToEnd(concat); return getReader(input).readToEnd(concat);
} }
return input; return input;
@ -387,7 +443,7 @@ async function readToEnd(input, concat) {
* @returns {Promise<Any>} indicates when the stream has been canceled * @returns {Promise<Any>} indicates when the stream has been canceled
*/ */
async function cancel(input, reason) { async function cancel(input, reason) {
if (util.isStream(input)) { if (isStream(input)) {
return input.cancel(reason); return input.cancel(reason);
} }
} }
@ -490,7 +546,7 @@ if (NodeReadableStream) {
} }
export default { toStream, concat, getReader, getWriter, pipe, transformRaw, transform, transformPair, parse, clone, passiveClone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync }; export default { isStream, isUint8Array, toStream, concatUint8Array, concatStream, concat, getReader, getWriter, pipe, transformRaw, transform, transformPair, parse, clone, passiveClone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync };
const doneReadingSet = new WeakSet(); const doneReadingSet = new WeakSet();
@ -500,7 +556,7 @@ function Reader(input) {
if (input[externalBuffer]) { if (input[externalBuffer]) {
this[externalBuffer] = input[externalBuffer].slice(); this[externalBuffer] = input[externalBuffer].slice();
} }
if (util.isStream(input)) { if (isStream(input)) {
const reader = input.getReader(); const reader = input.getReader();
this._read = reader.read.bind(reader); this._read = reader.read.bind(reader);
this._releaseLock = reader.releaseLock.bind(reader); this._releaseLock = reader.releaseLock.bind(reader);
@ -555,12 +611,12 @@ Reader.prototype.readLine = async function() {
while (!returnVal) { while (!returnVal) {
const { done, value } = await this.read(); const { done, value } = await this.read();
if (done) { if (done) {
if (buffer.length) return util.concat(buffer); if (buffer.length) return concat(buffer);
return; return;
} }
const lineEndIndex = value.indexOf('\n') + 1; const lineEndIndex = value.indexOf('\n') + 1;
if (lineEndIndex) { if (lineEndIndex) {
returnVal = util.concat(buffer.concat(value.substr(0, lineEndIndex))); returnVal = concat(buffer.concat(value.substr(0, lineEndIndex)));
buffer = []; buffer = [];
} }
if (lineEndIndex !== value.length) { if (lineEndIndex !== value.length) {
@ -593,13 +649,13 @@ Reader.prototype.readBytes = async function(length) {
while (true) { while (true) {
const { done, value } = await this.read(); const { done, value } = await this.read();
if (done) { if (done) {
if (buffer.length) return util.concat(buffer); if (buffer.length) return concat(buffer);
return; return;
} }
buffer.push(value); buffer.push(value);
bufferLength += value.length; bufferLength += value.length;
if (bufferLength >= length) { if (bufferLength >= length) {
const bufferConcat = util.concat(buffer); const bufferConcat = concat(buffer);
this.unshift(slice(bufferConcat, length)); this.unshift(slice(bufferConcat, length));
return slice(bufferConcat, 0, length); return slice(bufferConcat, 0, length);
} }
@ -628,16 +684,16 @@ Reader.prototype.unshift = function(...values) {
}; };
/** /**
* Read the stream to the end and return its contents, concatenated by the concat function (defaults to util.concat). * Read the stream to the end and return its contents, concatenated by the join function (defaults to concat).
* @param {Function} concat * @param {Function} join
* @returns {Uint8array|String|Any} the return value of concat() * @returns {Uint8array|String|Any} the return value of join()
*/ */
Reader.prototype.readToEnd = async function(concat=util.concat) { Reader.prototype.readToEnd = async function(join=concat) {
const result = []; const result = [];
while (true) { while (true) {
const { done, value } = await this.read(); const { done, value } = await this.read();
if (done) break; if (done) break;
result.push(value); result.push(value);
} }
return concat(result); return join(result);
}; };

View File

@ -31,11 +31,7 @@ import util from './util'; // re-import module to access util functions
import b64 from './encoding/base64'; import b64 from './encoding/base64';
import stream from './stream'; import stream from './stream';
const isIE11 = typeof navigator !== 'undefined' && !!navigator.userAgent.match(/Trident\/7\.0.*rv:([0-9.]+).*\).*Gecko$/);
export default { export default {
isIE11,
isString: function(data) { isString: function(data) {
return typeof data === 'string' || String.prototype.isPrototypeOf(data); return typeof data === 'string' || String.prototype.isPrototypeOf(data);
}, },
@ -44,13 +40,9 @@ export default {
return Array.prototype.isPrototypeOf(data); return Array.prototype.isPrototypeOf(data);
}, },
isUint8Array: function(data) { isUint8Array: stream.isUint8Array,
return Uint8Array.prototype.isPrototypeOf(data);
},
isStream: function(data) { isStream: stream.isStream,
return ReadableStream.prototype.isPrototypeOf(data);
},
/** /**
* Get transferable objects to pass buffers with zero copy (similar to "pass by reference" in C++) * Get transferable objects to pass buffers with zero copy (similar to "pass by reference" in C++)
@ -346,57 +338,14 @@ export default {
* @param {Array<Uint8array|String|ReadableStream>} Array of Uint8Arrays/Strings/Streams to concatenate * @param {Array<Uint8array|String|ReadableStream>} Array of Uint8Arrays/Strings/Streams to concatenate
* @returns {Uint8array|String|ReadableStream} Concatenated array * @returns {Uint8array|String|ReadableStream} Concatenated array
*/ */
concat: function (list) { concat: stream.concat,
if (list.some(util.isStream)) {
return stream.concat(list);
}
if (util.isString(list[0])) {
return list.join('');
}
return util.concatUint8Array(list);
},
/** /**
* Concat Uint8Arrays * Concat Uint8Arrays
* @param {Array<Uint8array>} Array of Uint8Arrays to concatenate * @param {Array<Uint8array>} Array of Uint8Arrays to concatenate
* @returns {Uint8array} Concatenated array * @returns {Uint8array} Concatenated array
*/ */
concatUint8Array: function (arrays) { concatUint8Array: stream.concatUint8Array,
if (arrays.length === 1) return arrays[0];
let totalLength = 0;
for (let i = 0; i < arrays.length; i++) {
if (!util.isUint8Array(arrays[i])) {
throw new Error('concatUint8Array: Data must be in the form of a Uint8Array');
}
totalLength += arrays[i].length;
}
const result = new Uint8Array(totalLength);
let pos = 0;
arrays.forEach(function (element) {
result.set(element, pos);
pos += element.length;
});
return result;
},
/**
* Deep copy Uint8Array
* @param {Uint8Array} Array to copy
* @returns {Uint8Array} new Uint8Array
*/
copyUint8Array: function (array) {
if (!util.isUint8Array(array)) {
throw new Error('Data must be in the form of a Uint8Array');
}
const copy = new Uint8Array(array.length);
copy.set(array);
return copy;
},
/** /**
* Check Uint8Array equality * Check Uint8Array equality