From d844b8b06c7cca5ca10d823fe2f73a4c5d1e5a35 Mon Sep 17 00:00:00 2001 From: Daniel Huigens Date: Fri, 6 Jul 2018 16:08:14 +0200 Subject: [PATCH] Add minimum AEAD buffer size This enables parallelism for streaming AEAD chunked encryption. The reason we can't do so at the very end of the pipe chain (e.g., in `readToEnd`) is because requests for increased buffering (i.e. `desiredSize > 1`) do not propagate backwards, only requests for backpressure (i.e. `desiredSize <= 0`) do. --- src/packet/sym_encrypted_aead_protected.js | 10 +++- src/util.js | 9 +++ test/general/packet.js | 64 ++++++++++++++++++++++ test/general/streaming.js | 3 + 4 files changed, 83 insertions(+), 3 deletions(-) diff --git a/src/packet/sym_encrypted_aead_protected.js b/src/packet/sym_encrypted_aead_protected.js index 8ea728a9..d7ce14fd 100644 --- a/src/packet/sym_encrypted_aead_protected.js +++ b/src/packet/sym_encrypted_aead_protected.js @@ -146,10 +146,14 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) { const iv = this.iv; return stream.transformPair(data, async (readable, writable) => { const reader = stream.getReader(readable); - const writer = stream.getWriter(writable); + const buffer = new TransformStream({}, { + highWaterMark: util.getHardwareConcurrency() * 2 ** (config.aead_chunk_size_byte + 6), + size: array => array.length + }); + stream.pipe(buffer.readable, writable); + const writer = stream.getWriter(buffer.writable); try { while (true) { - await writer.ready; let chunk = await reader.readBytes(chunkSize + tagLengthIfDecrypting) || new Uint8Array(); const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting); chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting); @@ -170,10 +174,10 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) { queuedBytes += chunk.length - tagLengthIfDecrypting; // eslint-disable-next-line no-loop-func latestPromise = latestPromise.then(() => cryptedPromise).then(async crypted => { + await writer.ready; await writer.write(crypted); queuedBytes -= chunk.length; }).catch(err => writer.abort(err)); - // console.log(fn, done, queuedBytes, writer.desiredSize); if (done || queuedBytes > writer.desiredSize) { await latestPromise; // Respect backpressure } diff --git a/src/util.js b/src/util.js index 64df40a2..963bf645 100644 --- a/src/util.js +++ b/src/util.js @@ -683,6 +683,15 @@ export default { return (util.nodeRequire('util') || {}).TextDecoder; }, + getHardwareConcurrency: function() { + if (util.detectNode()) { + const os = util.nodeRequire('os'); + return os.cpus().length; + } + + return navigator.hardwareConcurrency || 1; + }, + isEmailAddress: function(data) { if (!util.isString(data)) { return false; diff --git a/test/general/packet.js b/test/general/packet.js index fb93572b..71c8a930 100644 --- a/test/general/packet.js +++ b/test/general/packet.js @@ -181,6 +181,70 @@ describe("Packet", function() { }); }); + function cryptStub(webCrypto, method) { + const crypt = webCrypto[method]; + const cryptStub = stub(webCrypto, method); + let cryptCallsActive = 0; + cryptStub.onCall(0).callsFake(async function() { + cryptCallsActive++; + try { + return await crypt.apply(this, arguments); + } finally { + cryptCallsActive--; + } + }); + cryptStub.onCall(1).callsFake(function() { + expect(cryptCallsActive).to.equal(1); + return crypt.apply(this, arguments); + }); + cryptStub.callThrough(); + return cryptStub; + } + + it('Sym. encrypted AEAD protected packet is encrypted in parallel (GCM, draft04)', function() { + const webCrypto = openpgp.util.getWebCrypto(); + if (!webCrypto) return; + const encryptStub = cryptStub(webCrypto, 'encrypt'); + const decryptStub = cryptStub(webCrypto, 'decrypt'); + + let aead_protectVal = openpgp.config.aead_protect; + let aead_protect_versionVal = openpgp.config.aead_protect_version; + let aead_chunk_size_byteVal = openpgp.config.aead_chunk_size_byte; + openpgp.config.aead_protect = true; + openpgp.config.aead_protect_version = 4; + openpgp.config.aead_chunk_size_byte = 0; + const testText = input.createSomeMessage(); + + const key = new Uint8Array([1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2]); + const algo = 'aes256'; + + const literal = new openpgp.packet.Literal(); + const enc = new openpgp.packet.SymEncryptedAEADProtected(); + const msg = new openpgp.packet.List(); + enc.aeadAlgorithm = 'experimental_gcm'; + + msg.push(enc); + literal.setText(testText); + enc.packets.push(literal); + + const msg2 = new openpgp.packet.List(); + + return enc.encrypt(algo, key).then(async function() { + await msg2.read(msg.write()); + return msg2[0].decrypt(algo, key); + }).then(async function() { + expect(await openpgp.stream.readToEnd(msg2[0].packets[0].data)).to.deep.equal(literal.data); + expect(encryptStub.callCount > 1).to.be.true; + expect(decryptStub.callCount > 1).to.be.true; + }).finally(function() { + openpgp.config.aead_protect = aead_protectVal; + openpgp.config.aead_protect_version = aead_protect_versionVal; + openpgp.config.aead_chunk_size_byte = aead_chunk_size_byteVal; + encryptStub.restore(); + decryptStub.restore(); + }); + }); + it('Sym. encrypted AEAD protected packet test vector (draft04)', function() { // From https://gitlab.com/openpgp-wg/rfc4880bis/commit/00b20923e6233fb6ff1666ecd5acfefceb32907d diff --git a/test/general/streaming.js b/test/general/streaming.js index 6437132a..dec61991 100644 --- a/test/general/streaming.js +++ b/test/general/streaming.js @@ -789,6 +789,8 @@ describe('Streaming', function() { let aead_chunk_size_byteValue = openpgp.config.aead_chunk_size_byte; openpgp.config.aead_protect = true; openpgp.config.aead_chunk_size_byte = 4; + let coresStub = stub(openpgp.util, 'getHardwareConcurrency'); + coresStub.returns(1); try { let plaintext = []; let i = 0; @@ -824,6 +826,7 @@ describe('Streaming', function() { } finally { openpgp.config.aead_protect = aead_protectValue; openpgp.config.aead_chunk_size_byte = aead_chunk_size_byteValue; + coresStub.restore(); } });