From 2bb5db2cf414e1cea17273fb2dbeab4aaf036dbc Mon Sep 17 00:00:00 2001 From: Bart Butler Date: Mon, 5 Mar 2018 21:36:53 -0800 Subject: [PATCH] multiple web workers --- src/openpgp.js | 11 ++-- src/worker/async_proxy.js | 116 +++++++++++++++++++++++-------------- test/general/openpgp.js | 4 +- test/worker/async_proxy.js | 2 +- 4 files changed, 82 insertions(+), 51 deletions(-) diff --git a/src/openpgp.js b/src/openpgp.js index a7c0d048..e7c9c64e 100644 --- a/src/openpgp.js +++ b/src/openpgp.js @@ -55,12 +55,13 @@ let asyncProxy; // instance of the asyncproxy /** * Set the path for the web worker script and create an instance of the async proxy - * @param {String} path relative path to the worker scripts, default: 'openpgp.worker.js' - * @param {Object} worker alternative to path parameter: web worker initialized with 'openpgp.worker.js' + * @param {String} path relative path to the worker scripts, default: 'openpgp.worker.js' + * @param {Number} n number of workers to initialize + * @param {Array} workers alternative to path parameter: web workers initialized with 'openpgp.worker.js' */ -export function initWorker({ path='openpgp.worker.js', worker } = {}) { - if (worker || (typeof window !== 'undefined' && window.Worker)) { - asyncProxy = new AsyncProxy({ path, worker, config }); +export function initWorker({ path='openpgp.worker.js', n = 1, workers = [] } = {}) { + if (workers.length || (typeof window !== 'undefined' && window.Worker)) { + asyncProxy = new AsyncProxy({ path, n, workers, config }); return true; } } diff --git a/src/worker/async_proxy.js b/src/worker/async_proxy.js index e5f3f45a..d44f87e4 100644 --- a/src/worker/async_proxy.js +++ b/src/worker/async_proxy.js @@ -19,24 +19,69 @@ import util from '../util.js'; import crypto from '../crypto'; import packet from '../packet'; +/** + * Message handling + */ +function handleMessage(id) { + return function(event) { + const msg = event.data; + switch (msg.event) { + case 'method-return': + if (msg.err) { + // fail + const err = new Error(msg.err); + // add worker stack + err.workerStack = msg.stack; + this.tasks[msg.id].reject(err); + } else { + // success + this.tasks[msg.id].resolve(msg.data); + } + delete this.tasks[msg.id]; + this.workers[id].requests--; + break; + case 'request-seed': + this.seedRandom(id, msg.amount); + break; + default: + throw new Error('Unknown Worker Event.'); + } + }; +} + /** * Initializes a new proxy and loads the web worker * @constructor - * @param {String} path The path to the worker or 'openpgp.worker.js' by default - * @param {Object} config config The worker configuration - * @param {Object} worker alternative to path parameter: web worker initialized with 'openpgp.worker.js' + * @param {String} path The path to the worker or 'openpgp.worker.js' by default + * @param {Number} n number of workers to initialize + * @param {Object} config config The worker configuration + * @param {Array} worker alternative to path parameter: web worker initialized with 'openpgp.worker.js' * @return {Promise} */ -export default function AsyncProxy({ path='openpgp.worker.js', worker, config } = {}) { - this.worker = worker || new Worker(path); - this.worker.onmessage = this.onMessage.bind(this); - this.worker.onerror = e => { - throw new Error('Unhandled error in openpgp worker: ' + e.message + ' (' + e.filename + ':' + e.lineno + ')'); - }; +export default function AsyncProxy({ path='openpgp.worker.js', n = 1, workers = [], config } = {}) { - if (config) { - this.worker.postMessage({ event:'configure', config }); + if (workers.length) { + this.workers = workers; } + else { + this.workers = []; + while (this.workers.length < n) { + this.workers.push(new Worker(path)); + } + } + + let workerId = 0; + this.workers.forEach(worker => { + worker.requests = 0; + worker.onmessage = handleMessage(workerId++).bind(this); + worker.onerror = e => { + throw new Error('Unhandled error in openpgp worker: ' + e.message + ' (' + e.filename + ':' + e.lineno + ')'); + }; + + if (config) { + worker.postMessage({ event:'configure', config }); + } + }); // Cannot rely on task order being maintained, use object keyed by request ID to track tasks this.tasks = {}; @@ -51,47 +96,22 @@ AsyncProxy.prototype.getID = function() { return this.currentID++; }; -/** - * Message handling - */ -AsyncProxy.prototype.onMessage = function(event) { - const msg = event.data; - switch (msg.event) { - case 'method-return': - if (msg.err) { - // fail - const err = new Error(msg.err); - // add worker stack - err.workerStack = msg.stack; - this.tasks[msg.id].reject(err); - } else { - // success - this.tasks[msg.id].resolve(msg.data); - } - delete this.tasks[msg.id]; - break; - case 'request-seed': - this.seedRandom(msg.amount); - break; - default: - throw new Error('Unknown Worker Event.'); - } -}; - /** * Send message to worker with random data * @param {Integer} size Number of bytes to send */ -AsyncProxy.prototype.seedRandom = async function(size) { +AsyncProxy.prototype.seedRandom = async function(id, size) { const buf = await crypto.random.getRandomBytes(size); - this.worker.postMessage({ event:'seed-random', buf }, util.getTransferables(buf)); + this.workers[id].postMessage({ event:'seed-random', buf }, util.getTransferables(buf)); }; /** - * Terminates the worker + * Terminates the workers */ AsyncProxy.prototype.terminate = function() { - this.worker.terminate(); + this.workers.forEach(worker => { + worker.terminate(); + }); }; /** @@ -101,11 +121,21 @@ AsyncProxy.prototype.terminate = function() { * @return {Promise} see the corresponding public api functions for their return types */ AsyncProxy.prototype.delegate = function(method, options) { + const id = this.getID(); + const requests = this.workers.map(worker => worker.requests); + const minRequests = Math.min(requests); + let workerId = 0; + for(; workerId < this.workers.length; workerId++) { + if (this.workers[workerId].requests === minRequests) { + break; + } + } return new Promise((resolve, reject) => { // clone packets (for web worker structured cloning algorithm) - this.worker.postMessage({ id:id, event:method, options:packet.clone.clonePackets(options) }, util.getTransferables(options)); + this.workers[workerId].postMessage({ id:id, event:method, options:packet.clone.clonePackets(options) }, util.getTransferables(options)); + this.workers[workerId].requests++; // remember to handle parsing cloned packets from worker this.tasks[id] = { resolve: data => resolve(packet.clone.parseClonedPackets(data, method)), reject }; diff --git a/test/general/openpgp.js b/test/general/openpgp.js index fb72aebb..e678d292 100644 --- a/test/general/openpgp.js +++ b/test/general/openpgp.js @@ -363,7 +363,7 @@ describe('OpenPGP.js public api tests', function() { postMessage: function() {} }; openpgp.initWorker({ - worker: workerStub + workers: [workerStub] }); expect(openpgp.getWorker()).to.exist; openpgp.destroyWorker(); @@ -522,7 +522,7 @@ describe('OpenPGP.js public api tests', function() { postMessage: function() {} }; openpgp.initWorker({ - worker: workerStub + workers: [workerStub] }); const proxyGenStub = stub(openpgp.getWorker(), 'delegate'); getWebCryptoAllStub.returns(); diff --git a/test/worker/async_proxy.js b/test/worker/async_proxy.js index 6fb765c6..6c6d1c0b 100644 --- a/test/worker/async_proxy.js +++ b/test/worker/async_proxy.js @@ -50,7 +50,7 @@ function tests() { describe('Random number pipeline', function() { it('Random number buffer automatically reseeded', function() { const worker = new Worker('../dist/openpgp.worker.js'); - const wProxy = new openpgp.AsyncProxy({ path:'../dist/openpgp.worker.js', worker }); + const wProxy = new openpgp.AsyncProxy({ path:'../dist/openpgp.worker.js', workers: [worker] }); return wProxy.delegate('encrypt', { publicKeys:[pubKey], data:plaintext }); });