From 802e1b8d9475565e635d8d9b917880856b0de6cc Mon Sep 17 00:00:00 2001 From: Daniel Huigens Date: Thu, 17 May 2018 19:10:07 +0200 Subject: [PATCH] Transfer Streams to Workers Also, add a "asStream" parameter to high-level functions to control whether the return value is a Stream; defaulting to whether the parameter passed was a Stream. --- src/encoding/armor.js | 2 +- src/openpgp.js | 69 +++++++++++++++++++++++++-------------- src/util.js | 49 +++++++++++++++++++++++---- src/worker/async_proxy.js | 7 ++-- src/worker/worker.js | 3 ++ test/general/util.js | 18 +++++----- 6 files changed, 103 insertions(+), 45 deletions(-) diff --git a/src/encoding/armor.js b/src/encoding/armor.js index 713ead63..65da3d79 100644 --- a/src/encoding/armor.js +++ b/src/encoding/armor.js @@ -359,7 +359,7 @@ function armor(messagetype, body, partindex, parttotal, customComment) { break; } - return stream.concat(result); + return util.concat(result); } export default { diff --git a/src/openpgp.js b/src/openpgp.js index a330771e..5132565f 100644 --- a/src/openpgp.js +++ b/src/openpgp.js @@ -134,8 +134,8 @@ export function generateKey({ userIds=[], passphrase="", numBits=2048, keyExpira return { key: key, - privateKeyArmored: await stream.readToEnd(key.armor()), - publicKeyArmored: await stream.readToEnd(key.toPublic().armor()) + privateKeyArmored: await convertStream(key.armor()), + publicKeyArmored: await convertStream(key.toPublic().armor()), revocationCertificate: revocationCertificate }; @@ -287,6 +287,7 @@ export function encryptKey({ privateKey, passphrase }) { * @param {String} filename (optional) a filename for the literal data packet * @param {module:enums.compression} compression (optional) which compression algorithm to compress the message with, defaults to what is specified in config * @param {Boolean} armor (optional) if the return values should be ascii armored or the message/signature objects + * @param {Boolean} asStream (optional) whether to return data as a ReadableStream. Defaults to true if data is a Stream. * @param {Boolean} detached (optional) if the signature should be detached (if true, signature will be added to returned object) * @param {Signature} signature (optional) a detached signature to add to the encrypted message * @param {Boolean} returnSessionKey (optional) if the unencrypted session key should be added to returned object @@ -300,11 +301,12 @@ export function encryptKey({ privateKey, passphrase }) { * @async * @static */ -export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, sessionKey, filename, compression=config.compression, armor=true, detached=false, signature=null, returnSessionKey=false, wildcard=false, date=new Date(), fromUserId={}, toUserId={} }) { +export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, sessionKey, filename, compression=config.compression, armor=true, asStream, detached=false, signature=null, returnSessionKey=false, wildcard=false, date=new Date(), fromUserId={}, toUserId={} }) { checkData(data); publicKeys = toArray(publicKeys); privateKeys = toArray(privateKeys); passwords = toArray(passwords); + if (asStream === undefined) asStream = util.isStream(data); if (!nativeAEAD() && asyncProxy) { // use web worker if web crypto apis are not supported - return asyncProxy.delegate('encrypt', { data, dataType, publicKeys, privateKeys, passwords, sessionKey, filename, compression, armor, detached, signature, returnSessionKey, wildcard, date, fromUserId, toUserId }); + return asyncProxy.delegate('encrypt', { data, dataType, publicKeys, privateKeys, passwords, sessionKey, filename, compression, armor, asStream, detached, signature, returnSessionKey, wildcard, date, fromUserId, toUserId }); } const result = {}; return Promise.resolve().then(async function() { @@ -326,9 +328,7 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se }).then(async encrypted => { if (armor) { result.data = encrypted.message.armor(); - if (!util.isStream(data)) { - result.data = await stream.readToEnd(result.data); - } + result.data = await convertStream(result.data, asStream); } else { result.message = encrypted.message; } @@ -348,6 +348,7 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se * @param {Object|Array} sessionKeys (optional) session keys in the form: { data:Uint8Array, algorithm:String } * @param {Key|Array} publicKeys (optional) array of public keys or single key, to verify signatures * @param {String} format (optional) return data format either as 'utf8' or 'binary' + * @param {Boolean} asStream (optional) whether to return data as a ReadableStream. Defaults to true if message was created from a Stream. * @param {Signature} signature (optional) detached signature for verification * @param {Date} date (optional) use the given date for verification instead of the current time * @returns {Promise} decrypted and verified message in the form: @@ -355,14 +356,14 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se * @async * @static */ -export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKeys, format='utf8', signature=null, date=new Date() }) { +export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKeys, format='utf8', asStream, signature=null, date=new Date() }) { checkMessage(message); publicKeys = toArray(publicKeys); privateKeys = toArray(privateKeys); passwords = toArray(passwords); sessionKeys = toArray(sessionKeys); + if (asStream === undefined) asStream = message.fromStream; if (!nativeAEAD() && asyncProxy) { // use web worker if web crypto apis are not supported - return asyncProxy.delegate('decrypt', { message, privateKeys, passwords, sessionKeys, publicKeys, format, signature, date }); + return asyncProxy.delegate('decrypt', { message, privateKeys, passwords, sessionKeys, publicKeys, format, asStream, signature, date }); } - const asStream = message.fromStream; return message.decrypt(privateKeys, passwords, sessionKeys).then(async function(message) { const result = await parseMessage(message, format, asStream); @@ -390,6 +391,7 @@ export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKe * @param {utf8|binary|text|mime} dataType (optional) data packet type * @param {Key|Array} privateKeys array of keys or single key with decrypted secret key data to sign cleartext * @param {Boolean} armor (optional) if the return value should be ascii armored or the message object + * @param {Boolean} asStream (optional) whether to return data as a ReadableStream. Defaults to true if data is a Stream. * @param {Boolean} detached (optional) if the return value should contain a detached signature * @param {Date} date (optional) override the creation date signature * @param {Object} fromUserId (optional) user ID to sign with, e.g. { name:'Steve Sender', email:'steve@openpgp.org' } @@ -399,13 +401,14 @@ export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKe * @async * @static */ -export function sign({ data, dataType, privateKeys, armor=true, detached=false, date=new Date(), fromUserId={} }) { +export function sign({ data, dataType, privateKeys, armor=true, asStream, detached=false, date=new Date(), fromUserId={} }) { checkData(data); privateKeys = toArray(privateKeys); + if (asStream === undefined) asStream = util.isStream(data); if (asyncProxy) { // use web worker if available return asyncProxy.delegate('sign', { - data, dataType, privateKeys, armor, detached, date, fromUserId + data, dataType, privateKeys, armor, asStream, detached, date, fromUserId }); } @@ -420,9 +423,7 @@ export function sign({ data, dataType, privateKeys, armor=true, detached=false, message = await message.sign(privateKeys, undefined, date, fromUserId); if (armor) { result.data = message.armor(); - if (!util.isStream(data)) { - result.data = await stream.readToEnd(result.data); - } + result.data = await convertStream(result.data, asStream); } else { result.message = message; } @@ -435,6 +436,7 @@ export function sign({ data, dataType, privateKeys, armor=true, detached=false, * Verifies signatures of cleartext signed message * @param {Key|Array} publicKeys array of publicKeys or single key, to verify signatures * @param {CleartextMessage} message cleartext message object with signatures + * @param {Boolean} asStream (optional) whether to return data as a ReadableStream. Defaults to true if message was created from a Stream. * @param {Signature} signature (optional) detached signature for verification * @param {Date} date (optional) use the given date for verification instead of the current time * @returns {Promise} cleartext with status of verified signatures in the form of: @@ -442,20 +444,19 @@ export function sign({ data, dataType, privateKeys, armor=true, detached=false, * @async * @static */ -export function verify({ message, publicKeys, signature=null, date=new Date() }) { +export function verify({ message, publicKeys, asStream, signature=null, date=new Date() }) { checkCleartextOrMessage(message); publicKeys = toArray(publicKeys); + if (asStream === undefined) asStream = message.fromStream; if (asyncProxy) { // use web worker if available - return asyncProxy.delegate('verify', { message, publicKeys, signature, date }); + return asyncProxy.delegate('verify', { message, publicKeys, asStream, signature, date }); } return Promise.resolve().then(async function() { const result = {}; result.data = message instanceof CleartextMessage ? message.getText() : message.getLiteralData(); - if (!message.fromStream) { - result.data = await stream.readToEnd(result.data); - } + result.data = await convertStream(result.data, asStream); result.signatures = signature ? await message.verifyDetached(signature, publicKeys, date) : await message.verify(publicKeys, date); @@ -599,7 +600,7 @@ function createMessage(data, filename, date=new Date(), type) { * Parse the message given a certain format. * @param {Message} message the message object to be parse * @param {String} format the output format e.g. 'utf8' or 'binary' - * @param {Boolean} asStream whether to return a ReadableStream, if available + * @param {Boolean} asStream whether to return a ReadableStream * @returns {Object} the parse data in the respective format */ async function parseMessage(message, format, asStream) { @@ -611,13 +612,33 @@ async function parseMessage(message, format, asStream) { } else { throw new Error('Invalid format'); } - if (!asStream && util.isStream(data)) { - data = await stream.readToEnd(data); - } + data = await convertStream(data, asStream); const filename = message.getFilename(); return { data, filename }; } +/** + * Convert data to or from Stream + * @param {Object} data the data to convert + * @param {Boolean} asStream whether to return a ReadableStream + * @returns {Object} the parse data in the respective format + */ +async function convertStream(data, asStream) { + if (!asStream && util.isStream(data)) { + return stream.readToEnd(data); + } + if (asStream && !util.isStream(data)) { + return new ReadableStream({ + start(controller) { + controller.enqueue(data); + controller.close(); + } + }); + } + return data; +} + + /** * Global error handler that logs the stack trace and rethrows a high lvl error message. * @param {String} message A human readable high level error Message diff --git a/src/util.js b/src/util.js index 2c951914..6e922e7d 100644 --- a/src/util.js +++ b/src/util.js @@ -57,17 +57,17 @@ export default { * @param {Object} obj the options object to be passed to the web worker * @returns {Array} an array of binary data to be passed */ - prepareBuffers: async function(obj) { + getTransferables: function(obj) { // Internet Explorer does not support Transferable objects. if (isIE11) { return undefined; } const transferables = []; - await util.collectBuffers(obj, transferables); + util.collectTransferables(obj, transferables); return transferables.length ? transferables : undefined; }, - collectBuffers: async function(obj, collection) { + collectTransferables: function(obj, collection) { if (!obj) { return; } @@ -79,15 +79,50 @@ export default { return; } if (Object.prototype.isPrototypeOf(obj)) { - await Promise.all(Object.entries(obj).map(async ([key, value]) => { // recursively search all children + Object.entries(obj).forEach(([key, value]) => { // recursively search all children if (util.isStream(value)) { - obj[key] = value = await stream.readToEnd(value); + const reader = stream.getReader(value); + const { port1, port2 } = new MessageChannel(); + port1.onmessage = async function() { + port1.postMessage(await reader.read()); + }; + obj[key] = port2; + collection.push(port2); + return; } - await util.collectBuffers(value, collection); - })); + util.collectTransferables(value, collection); + }); } }, + restoreStreams: function(obj) { + if (Object.prototype.isPrototypeOf(obj)) { + Object.entries(obj).forEach(([key, value]) => { // recursively search all children + if (MessagePort.prototype.isPrototypeOf(value)) { + obj[key] = new ReadableStream({ + pull(controller) { + return new Promise(resolve => { + value.onmessage = evt => { + const { done, value } = evt.data; + if (!done) { + controller.enqueue(value); + } else { + controller.close(); + } + resolve(); + }; + value.postMessage(undefined); + }); + } + }); + return; + } + util.restoreStreams(value); + }); + } + return obj; + }, + readNumber: function (bytes) { let n = 0; for (let i = 0; i < bytes.length; i++) { diff --git a/src/worker/async_proxy.js b/src/worker/async_proxy.js index 9b82d07b..28de7f41 100644 --- a/src/worker/async_proxy.js +++ b/src/worker/async_proxy.js @@ -112,7 +112,7 @@ AsyncProxy.prototype.getID = function() { */ AsyncProxy.prototype.seedRandom = async function(workerId, size) { const buf = await crypto.random.getRandomBytes(size); - this.workers[workerId].postMessage({ event:'seed-random', buf }, await util.prepareBuffers(buf)); + this.workers[workerId].postMessage({ event:'seed-random', buf }, util.getTransferables(buf)); }; /** @@ -145,12 +145,11 @@ AsyncProxy.prototype.delegate = function(method, options) { return new Promise(async (resolve, reject) => { // clone packets (for web worker structured cloning algorithm) - const transferables = await util.prepareBuffers(options); - this.workers[workerId].postMessage({ id:id, event:method, options:packet.clone.clonePackets(options) }, transferables); + this.workers[workerId].postMessage({ id:id, event:method, options:packet.clone.clonePackets(options) }, util.getTransferables(options)); this.workers[workerId].requests++; // remember to handle parsing cloned packets from worker - this.tasks[id] = { resolve: data => resolve(packet.clone.parseClonedPackets(data, method)), reject }; + this.tasks[id] = { resolve: data => resolve(util.restoreStreams(packet.clone.parseClonedPackets(data, method))), reject }; }); }; diff --git a/src/worker/worker.js b/src/worker/worker.js index 0dc704f9..69df6096 100644 --- a/src/worker/worker.js +++ b/src/worker/worker.js @@ -115,10 +115,13 @@ function delegate(id, method, options) { } // parse cloned packets options = openpgp.packet.clone.parseClonedPackets(options, method); + // construct ReadableStreams from MessagePorts + openpgp.util.restoreStreams(options); openpgp[method](options).then(function(data) { // clone packets (for web worker structured cloning algorithm) response({ id:id, event:'method-return', data:openpgp.packet.clone.clonePackets(data) }); }).catch(function(e) { + openpgp.util.print_debug_error(e); response({ id:id, event:'method-return', err:e.message, stack:e.stack }); diff --git a/test/general/util.js b/test/general/util.js index 67e36647..a611590a 100644 --- a/test/general/util.js +++ b/test/general/util.js @@ -116,7 +116,7 @@ describe('Util unit tests', function() { }); }); - describe('prepareBuffers', function() { + describe('getTransferables', function() { let zero_copyVal; const buf1 = new Uint8Array(1); const buf2 = new Uint8Array(1); @@ -137,18 +137,18 @@ describe('Util unit tests', function() { openpgp.config.zero_copy = zero_copyVal; }); - it('should return undefined when zero_copy is false', async function() { + it('should return undefined when zero_copy is false', function() { openpgp.config.zero_copy = false; - expect(await openpgp.util.prepareBuffers(obj)).to.be.undefined; + expect(openpgp.util.getTransferables(obj)).to.be.undefined; }); - it('should return undefined for no input', async function() { - expect(await openpgp.util.prepareBuffers()).to.be.undefined; + it('should return undefined for no input', function() { + expect(openpgp.util.getTransferables()).to.be.undefined; }); - it('should return undefined for an empty oject', async function() { - expect(await openpgp.util.prepareBuffers({})).to.be.undefined; + it('should return undefined for an empty oject', function() { + expect(openpgp.util.getTransferables({})).to.be.undefined; }); - it('should return two buffers', async function() { - expect(await openpgp.util.prepareBuffers(obj)).to.deep.equal([buf1.buffer, buf2.buffer]); + it('should return two buffers', function() { + expect(openpgp.util.getTransferables(obj)).to.deep.equal([buf1.buffer, buf2.buffer]); }); });