Transfer Streams to Workers

Also, add a "asStream" parameter to high-level functions to control
whether the return value is a Stream; defaulting to whether the
parameter passed was a Stream.
This commit is contained in:
Daniel Huigens 2018-05-17 19:10:07 +02:00
parent 58eca571bf
commit 802e1b8d94
6 changed files with 103 additions and 45 deletions

View File

@ -359,7 +359,7 @@ function armor(messagetype, body, partindex, parttotal, customComment) {
break;
}
return stream.concat(result);
return util.concat(result);
}
export default {

View File

@ -134,8 +134,8 @@ export function generateKey({ userIds=[], passphrase="", numBits=2048, keyExpira
return {
key: key,
privateKeyArmored: await stream.readToEnd(key.armor()),
publicKeyArmored: await stream.readToEnd(key.toPublic().armor())
privateKeyArmored: await convertStream(key.armor()),
publicKeyArmored: await convertStream(key.toPublic().armor()),
revocationCertificate: revocationCertificate
};
@ -287,6 +287,7 @@ export function encryptKey({ privateKey, passphrase }) {
* @param {String} filename (optional) a filename for the literal data packet
* @param {module:enums.compression} compression (optional) which compression algorithm to compress the message with, defaults to what is specified in config
* @param {Boolean} armor (optional) if the return values should be ascii armored or the message/signature objects
* @param {Boolean} asStream (optional) whether to return data as a ReadableStream. Defaults to true if data is a Stream.
* @param {Boolean} detached (optional) if the signature should be detached (if true, signature will be added to returned object)
* @param {Signature} signature (optional) a detached signature to add to the encrypted message
* @param {Boolean} returnSessionKey (optional) if the unencrypted session key should be added to returned object
@ -300,11 +301,12 @@ export function encryptKey({ privateKey, passphrase }) {
* @async
* @static
*/
export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, sessionKey, filename, compression=config.compression, armor=true, detached=false, signature=null, returnSessionKey=false, wildcard=false, date=new Date(), fromUserId={}, toUserId={} }) {
export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, sessionKey, filename, compression=config.compression, armor=true, asStream, detached=false, signature=null, returnSessionKey=false, wildcard=false, date=new Date(), fromUserId={}, toUserId={} }) {
checkData(data); publicKeys = toArray(publicKeys); privateKeys = toArray(privateKeys); passwords = toArray(passwords);
if (asStream === undefined) asStream = util.isStream(data);
if (!nativeAEAD() && asyncProxy) { // use web worker if web crypto apis are not supported
return asyncProxy.delegate('encrypt', { data, dataType, publicKeys, privateKeys, passwords, sessionKey, filename, compression, armor, detached, signature, returnSessionKey, wildcard, date, fromUserId, toUserId });
return asyncProxy.delegate('encrypt', { data, dataType, publicKeys, privateKeys, passwords, sessionKey, filename, compression, armor, asStream, detached, signature, returnSessionKey, wildcard, date, fromUserId, toUserId });
}
const result = {};
return Promise.resolve().then(async function() {
@ -326,9 +328,7 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se
}).then(async encrypted => {
if (armor) {
result.data = encrypted.message.armor();
if (!util.isStream(data)) {
result.data = await stream.readToEnd(result.data);
}
result.data = await convertStream(result.data, asStream);
} else {
result.message = encrypted.message;
}
@ -348,6 +348,7 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se
* @param {Object|Array<Object>} sessionKeys (optional) session keys in the form: { data:Uint8Array, algorithm:String }
* @param {Key|Array<Key>} publicKeys (optional) array of public keys or single key, to verify signatures
* @param {String} format (optional) return data format either as 'utf8' or 'binary'
* @param {Boolean} asStream (optional) whether to return data as a ReadableStream. Defaults to true if message was created from a Stream.
* @param {Signature} signature (optional) detached signature for verification
* @param {Date} date (optional) use the given date for verification instead of the current time
* @returns {Promise<Object>} decrypted and verified message in the form:
@ -355,14 +356,14 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se
* @async
* @static
*/
export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKeys, format='utf8', signature=null, date=new Date() }) {
export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKeys, format='utf8', asStream, signature=null, date=new Date() }) {
checkMessage(message); publicKeys = toArray(publicKeys); privateKeys = toArray(privateKeys); passwords = toArray(passwords); sessionKeys = toArray(sessionKeys);
if (asStream === undefined) asStream = message.fromStream;
if (!nativeAEAD() && asyncProxy) { // use web worker if web crypto apis are not supported
return asyncProxy.delegate('decrypt', { message, privateKeys, passwords, sessionKeys, publicKeys, format, signature, date });
return asyncProxy.delegate('decrypt', { message, privateKeys, passwords, sessionKeys, publicKeys, format, asStream, signature, date });
}
const asStream = message.fromStream;
return message.decrypt(privateKeys, passwords, sessionKeys).then(async function(message) {
const result = await parseMessage(message, format, asStream);
@ -390,6 +391,7 @@ export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKe
* @param {utf8|binary|text|mime} dataType (optional) data packet type
* @param {Key|Array<Key>} privateKeys array of keys or single key with decrypted secret key data to sign cleartext
* @param {Boolean} armor (optional) if the return value should be ascii armored or the message object
* @param {Boolean} asStream (optional) whether to return data as a ReadableStream. Defaults to true if data is a Stream.
* @param {Boolean} detached (optional) if the return value should contain a detached signature
* @param {Date} date (optional) override the creation date signature
* @param {Object} fromUserId (optional) user ID to sign with, e.g. { name:'Steve Sender', email:'steve@openpgp.org' }
@ -399,13 +401,14 @@ export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKe
* @async
* @static
*/
export function sign({ data, dataType, privateKeys, armor=true, detached=false, date=new Date(), fromUserId={} }) {
export function sign({ data, dataType, privateKeys, armor=true, asStream, detached=false, date=new Date(), fromUserId={} }) {
checkData(data);
privateKeys = toArray(privateKeys);
if (asStream === undefined) asStream = util.isStream(data);
if (asyncProxy) { // use web worker if available
return asyncProxy.delegate('sign', {
data, dataType, privateKeys, armor, detached, date, fromUserId
data, dataType, privateKeys, armor, asStream, detached, date, fromUserId
});
}
@ -420,9 +423,7 @@ export function sign({ data, dataType, privateKeys, armor=true, detached=false,
message = await message.sign(privateKeys, undefined, date, fromUserId);
if (armor) {
result.data = message.armor();
if (!util.isStream(data)) {
result.data = await stream.readToEnd(result.data);
}
result.data = await convertStream(result.data, asStream);
} else {
result.message = message;
}
@ -435,6 +436,7 @@ export function sign({ data, dataType, privateKeys, armor=true, detached=false,
* Verifies signatures of cleartext signed message
* @param {Key|Array<Key>} publicKeys array of publicKeys or single key, to verify signatures
* @param {CleartextMessage} message cleartext message object with signatures
* @param {Boolean} asStream (optional) whether to return data as a ReadableStream. Defaults to true if message was created from a Stream.
* @param {Signature} signature (optional) detached signature for verification
* @param {Date} date (optional) use the given date for verification instead of the current time
* @returns {Promise<Object>} cleartext with status of verified signatures in the form of:
@ -442,20 +444,19 @@ export function sign({ data, dataType, privateKeys, armor=true, detached=false,
* @async
* @static
*/
export function verify({ message, publicKeys, signature=null, date=new Date() }) {
export function verify({ message, publicKeys, asStream, signature=null, date=new Date() }) {
checkCleartextOrMessage(message);
publicKeys = toArray(publicKeys);
if (asStream === undefined) asStream = message.fromStream;
if (asyncProxy) { // use web worker if available
return asyncProxy.delegate('verify', { message, publicKeys, signature, date });
return asyncProxy.delegate('verify', { message, publicKeys, asStream, signature, date });
}
return Promise.resolve().then(async function() {
const result = {};
result.data = message instanceof CleartextMessage ? message.getText() : message.getLiteralData();
if (!message.fromStream) {
result.data = await stream.readToEnd(result.data);
}
result.data = await convertStream(result.data, asStream);
result.signatures = signature ?
await message.verifyDetached(signature, publicKeys, date) :
await message.verify(publicKeys, date);
@ -599,7 +600,7 @@ function createMessage(data, filename, date=new Date(), type) {
* Parse the message given a certain format.
* @param {Message} message the message object to be parse
* @param {String} format the output format e.g. 'utf8' or 'binary'
* @param {Boolean} asStream whether to return a ReadableStream, if available
* @param {Boolean} asStream whether to return a ReadableStream
* @returns {Object} the parse data in the respective format
*/
async function parseMessage(message, format, asStream) {
@ -611,13 +612,33 @@ async function parseMessage(message, format, asStream) {
} else {
throw new Error('Invalid format');
}
if (!asStream && util.isStream(data)) {
data = await stream.readToEnd(data);
}
data = await convertStream(data, asStream);
const filename = message.getFilename();
return { data, filename };
}
/**
* Convert data to or from Stream
* @param {Object} data the data to convert
* @param {Boolean} asStream whether to return a ReadableStream
* @returns {Object} the parse data in the respective format
*/
async function convertStream(data, asStream) {
if (!asStream && util.isStream(data)) {
return stream.readToEnd(data);
}
if (asStream && !util.isStream(data)) {
return new ReadableStream({
start(controller) {
controller.enqueue(data);
controller.close();
}
});
}
return data;
}
/**
* Global error handler that logs the stack trace and rethrows a high lvl error message.
* @param {String} message A human readable high level error Message

View File

@ -57,17 +57,17 @@ export default {
* @param {Object} obj the options object to be passed to the web worker
* @returns {Array<ArrayBuffer>} an array of binary data to be passed
*/
prepareBuffers: async function(obj) {
getTransferables: function(obj) {
// Internet Explorer does not support Transferable objects.
if (isIE11) {
return undefined;
}
const transferables = [];
await util.collectBuffers(obj, transferables);
util.collectTransferables(obj, transferables);
return transferables.length ? transferables : undefined;
},
collectBuffers: async function(obj, collection) {
collectTransferables: function(obj, collection) {
if (!obj) {
return;
}
@ -79,15 +79,50 @@ export default {
return;
}
if (Object.prototype.isPrototypeOf(obj)) {
await Promise.all(Object.entries(obj).map(async ([key, value]) => { // recursively search all children
Object.entries(obj).forEach(([key, value]) => { // recursively search all children
if (util.isStream(value)) {
obj[key] = value = await stream.readToEnd(value);
const reader = stream.getReader(value);
const { port1, port2 } = new MessageChannel();
port1.onmessage = async function() {
port1.postMessage(await reader.read());
};
obj[key] = port2;
collection.push(port2);
return;
}
await util.collectBuffers(value, collection);
}));
util.collectTransferables(value, collection);
});
}
},
restoreStreams: function(obj) {
if (Object.prototype.isPrototypeOf(obj)) {
Object.entries(obj).forEach(([key, value]) => { // recursively search all children
if (MessagePort.prototype.isPrototypeOf(value)) {
obj[key] = new ReadableStream({
pull(controller) {
return new Promise(resolve => {
value.onmessage = evt => {
const { done, value } = evt.data;
if (!done) {
controller.enqueue(value);
} else {
controller.close();
}
resolve();
};
value.postMessage(undefined);
});
}
});
return;
}
util.restoreStreams(value);
});
}
return obj;
},
readNumber: function (bytes) {
let n = 0;
for (let i = 0; i < bytes.length; i++) {

View File

@ -112,7 +112,7 @@ AsyncProxy.prototype.getID = function() {
*/
AsyncProxy.prototype.seedRandom = async function(workerId, size) {
const buf = await crypto.random.getRandomBytes(size);
this.workers[workerId].postMessage({ event:'seed-random', buf }, await util.prepareBuffers(buf));
this.workers[workerId].postMessage({ event:'seed-random', buf }, util.getTransferables(buf));
};
/**
@ -145,12 +145,11 @@ AsyncProxy.prototype.delegate = function(method, options) {
return new Promise(async (resolve, reject) => {
// clone packets (for web worker structured cloning algorithm)
const transferables = await util.prepareBuffers(options);
this.workers[workerId].postMessage({ id:id, event:method, options:packet.clone.clonePackets(options) }, transferables);
this.workers[workerId].postMessage({ id:id, event:method, options:packet.clone.clonePackets(options) }, util.getTransferables(options));
this.workers[workerId].requests++;
// remember to handle parsing cloned packets from worker
this.tasks[id] = { resolve: data => resolve(packet.clone.parseClonedPackets(data, method)), reject };
this.tasks[id] = { resolve: data => resolve(util.restoreStreams(packet.clone.parseClonedPackets(data, method))), reject };
});
};

View File

@ -115,10 +115,13 @@ function delegate(id, method, options) {
}
// parse cloned packets
options = openpgp.packet.clone.parseClonedPackets(options, method);
// construct ReadableStreams from MessagePorts
openpgp.util.restoreStreams(options);
openpgp[method](options).then(function(data) {
// clone packets (for web worker structured cloning algorithm)
response({ id:id, event:'method-return', data:openpgp.packet.clone.clonePackets(data) });
}).catch(function(e) {
openpgp.util.print_debug_error(e);
response({
id:id, event:'method-return', err:e.message, stack:e.stack
});

View File

@ -116,7 +116,7 @@ describe('Util unit tests', function() {
});
});
describe('prepareBuffers', function() {
describe('getTransferables', function() {
let zero_copyVal;
const buf1 = new Uint8Array(1);
const buf2 = new Uint8Array(1);
@ -137,18 +137,18 @@ describe('Util unit tests', function() {
openpgp.config.zero_copy = zero_copyVal;
});
it('should return undefined when zero_copy is false', async function() {
it('should return undefined when zero_copy is false', function() {
openpgp.config.zero_copy = false;
expect(await openpgp.util.prepareBuffers(obj)).to.be.undefined;
expect(openpgp.util.getTransferables(obj)).to.be.undefined;
});
it('should return undefined for no input', async function() {
expect(await openpgp.util.prepareBuffers()).to.be.undefined;
it('should return undefined for no input', function() {
expect(openpgp.util.getTransferables()).to.be.undefined;
});
it('should return undefined for an empty oject', async function() {
expect(await openpgp.util.prepareBuffers({})).to.be.undefined;
it('should return undefined for an empty oject', function() {
expect(openpgp.util.getTransferables({})).to.be.undefined;
});
it('should return two buffers', async function() {
expect(await openpgp.util.prepareBuffers(obj)).to.deep.equal([buf1.buffer, buf2.buffer]);
it('should return two buffers', function() {
expect(openpgp.util.getTransferables(obj)).to.deep.equal([buf1.buffer, buf2.buffer]);
});
});