Streaming support on Node

This commit is contained in:
Daniel Huigens 2018-05-17 20:40:37 +02:00
parent 8658816b90
commit fb155ffae0
5 changed files with 95 additions and 15 deletions

View File

@ -30,8 +30,9 @@ const Buffer = util.getNodeBuffer();
function node_hash(type) { function node_hash(type) {
return function (data) { return function (data) {
const shasum = nodeCrypto.createHash(type); const shasum = nodeCrypto.createHash(type);
shasum.update(new Buffer(data)); return stream.transform(data, value => {
return new Uint8Array(shasum.digest()); shasum.update(new Buffer(value));
}, () => new Uint8Array(shasum.digest()));
}; };
} }

View File

@ -133,7 +133,7 @@ const nodeZlib = util.getNodeZlib();
function node_zlib(func, options = {}) { function node_zlib(func, options = {}) {
return function (data) { return function (data) {
return func(data, options); return stream.nodeToWeb(stream.webToNode(data).pipe(func(options)));
}; };
} }
@ -163,17 +163,17 @@ let decompress_fns;
if (nodeZlib) { // Use Node native zlib for DEFLATE compression/decompression if (nodeZlib) { // Use Node native zlib for DEFLATE compression/decompression
compress_fns = { compress_fns = {
// eslint-disable-next-line no-sync // eslint-disable-next-line no-sync
zip: node_zlib(nodeZlib.deflateRawSync, { level: config.deflate_level }), zip: node_zlib(nodeZlib.createDeflateRaw, { level: config.deflate_level }),
// eslint-disable-next-line no-sync // eslint-disable-next-line no-sync
zlib: node_zlib(nodeZlib.deflateSync, { level: config.deflate_level }), zlib: node_zlib(nodeZlib.createDeflate, { level: config.deflate_level }),
bzip2: bzip2(Bzip2.compressFile) bzip2: bzip2(Bzip2.compressFile)
}; };
decompress_fns = { decompress_fns = {
// eslint-disable-next-line no-sync // eslint-disable-next-line no-sync
zip: node_zlib(nodeZlib.inflateRawSync), zip: node_zlib(nodeZlib.createInflateRaw),
// eslint-disable-next-line no-sync // eslint-disable-next-line no-sync
zlib: node_zlib(nodeZlib.inflateSync), zlib: node_zlib(nodeZlib.createInflate),
bzip2: bzip2(Bzip2.decompressFile) bzip2: bzip2(Bzip2.decompressFile)
}; };
} else { // Use JS fallbacks } else { // Use JS fallbacks

View File

@ -172,19 +172,16 @@ function aesDecrypt(algo, ct, key) {
return stream.subarray(pt, crypto.cipher[algo].blockSize + 2); // Remove random prefix return stream.subarray(pt, crypto.cipher[algo].blockSize + 2); // Remove random prefix
} }
function nodeEncrypt(algo, prefix, pt, key) { function nodeEncrypt(algo, pt, key) {
key = new Buffer(key); key = new Buffer(key);
const iv = new Buffer(new Uint8Array(crypto.cipher[algo].blockSize)); const iv = new Buffer(new Uint8Array(crypto.cipher[algo].blockSize));
const cipherObj = new nodeCrypto.createCipheriv('aes-' + algo.substr(3, 3) + '-cfb', key, iv); const cipherObj = new nodeCrypto.createCipheriv('aes-' + algo.substr(3, 3) + '-cfb', key, iv);
const ct = cipherObj.update(new Buffer(util.concat([prefix, pt]))); return stream.transform(pt, value => new Uint8Array(cipherObj.update(new Buffer(value))));
return new Uint8Array(ct);
} }
function nodeDecrypt(algo, ct, key) { function nodeDecrypt(algo, ct, key) {
ct = new Buffer(ct);
key = new Buffer(key); key = new Buffer(key);
const iv = new Buffer(new Uint8Array(crypto.cipher[algo].blockSize)); const iv = new Buffer(new Uint8Array(crypto.cipher[algo].blockSize));
const decipherObj = new nodeCrypto.createDecipheriv('aes-' + algo.substr(3, 3) + '-cfb', key, iv); const decipherObj = new nodeCrypto.createDecipheriv('aes-' + algo.substr(3, 3) + '-cfb', key, iv);
const pt = decipherObj.update(ct); return stream.transform(ct, value => new Uint8Array(decipherObj.update(new Buffer(value))));
return new Uint8Array(pt);
} }

View File

@ -4,6 +4,8 @@ if (typeof ReadableStream === 'undefined') {
Object.assign(typeof window !== 'undefined' ? window : global, require('web-streams-polyfill')); Object.assign(typeof window !== 'undefined' ? window : global, require('web-streams-polyfill'));
} }
const nodeStream = util.getNodeStream();
function concat(arrays) { function concat(arrays) {
const readers = arrays.map(getReader); const readers = arrays.map(getReader);
let current = 0; let current = 0;
@ -104,7 +106,76 @@ async function readToEnd(input, join) {
} }
export default { concat, getReader, transform, clone, subarray, readToEnd }; /**
* Web / node stream conversion functions
* From https://github.com/gwicke/node-web-streams
*/
let nodeToWeb;
let webToNode;
if (nodeStream) {
nodeToWeb = function(nodeStream) {
return new ReadableStream({
start(controller) {
nodeStream.pause();
nodeStream.on('data', chunk => {
controller.enqueue(chunk);
nodeStream.pause();
});
nodeStream.on('end', () => controller.close());
nodeStream.on('error', e => controller.error(e));
},
pull() {
nodeStream.resume();
},
cancel() {
nodeStream.pause();
}
});
};
class NodeReadable extends nodeStream.Readable {
constructor(webStream, options) {
super(options);
this._webStream = webStream;
this._reader = getReader(webStream);
this._reading = false;
}
_read(size) {
if (this._reading) {
return;
}
this._reading = true;
const doRead = () => {
this._reader.read()
.then(res => {
if (res.done) {
this.push(null);
return;
}
if (this.push(res.value)) {
return doRead(size);
} else {
this._reading = false;
}
});
};
doRead();
}
}
webToNode = function(webStream) {
return new NodeReadable(webStream);
};
}
export default { concat, getReader, transform, clone, subarray, readToEnd, nodeToWeb, webToNode };
/*const readerAcquiredMap = new Map(); /*const readerAcquiredMap = new Map();

View File

@ -630,7 +630,7 @@ export default {
// This "hack" allows us to access the native node buffer module. // This "hack" allows us to access the native node buffer module.
// otherwise, it gets replaced with the browserified version // otherwise, it gets replaced with the browserified version
// eslint-disable-next-line no-useless-concat, import/no-dynamic-require // eslint-disable-next-line no-useless-concat, import/no-dynamic-require
return require('buf'+'fer').Buffer; return require('buffer'+'').Buffer;
}, },
getNodeZlib: function() { getNodeZlib: function() {
@ -641,6 +641,17 @@ export default {
return require('zlib'); return require('zlib');
}, },
getNodeStream: function() {
if (!util.detectNode()) {
return;
}
// This "hack" allows us to access the native node buffer module.
// otherwise, it gets replaced with the browserified version
// eslint-disable-next-line no-useless-concat, import/no-dynamic-require
return require('stream'+'');
},
isEmailAddress: function(data) { isEmailAddress: function(data) {
if (!util.isString(data)) { if (!util.isString(data)) {
return false; return false;