Add minimum AEAD buffer size
This enables parallelism for streaming AEAD chunked encryption. The reason we can't do so at the very end of the pipe chain (e.g., in `readToEnd`) is because requests for increased buffering (i.e. `desiredSize > 1`) do not propagate backwards, only requests for backpressure (i.e. `desiredSize <= 0`) do.
This commit is contained in:
parent
54d5bd7d39
commit
d844b8b06c
|
@ -146,10 +146,14 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) {
|
|||
const iv = this.iv;
|
||||
return stream.transformPair(data, async (readable, writable) => {
|
||||
const reader = stream.getReader(readable);
|
||||
const writer = stream.getWriter(writable);
|
||||
const buffer = new TransformStream({}, {
|
||||
highWaterMark: util.getHardwareConcurrency() * 2 ** (config.aead_chunk_size_byte + 6),
|
||||
size: array => array.length
|
||||
});
|
||||
stream.pipe(buffer.readable, writable);
|
||||
const writer = stream.getWriter(buffer.writable);
|
||||
try {
|
||||
while (true) {
|
||||
await writer.ready;
|
||||
let chunk = await reader.readBytes(chunkSize + tagLengthIfDecrypting) || new Uint8Array();
|
||||
const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting);
|
||||
chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting);
|
||||
|
@ -170,10 +174,10 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) {
|
|||
queuedBytes += chunk.length - tagLengthIfDecrypting;
|
||||
// eslint-disable-next-line no-loop-func
|
||||
latestPromise = latestPromise.then(() => cryptedPromise).then(async crypted => {
|
||||
await writer.ready;
|
||||
await writer.write(crypted);
|
||||
queuedBytes -= chunk.length;
|
||||
}).catch(err => writer.abort(err));
|
||||
// console.log(fn, done, queuedBytes, writer.desiredSize);
|
||||
if (done || queuedBytes > writer.desiredSize) {
|
||||
await latestPromise; // Respect backpressure
|
||||
}
|
||||
|
|
|
@ -683,6 +683,15 @@ export default {
|
|||
return (util.nodeRequire('util') || {}).TextDecoder;
|
||||
},
|
||||
|
||||
getHardwareConcurrency: function() {
|
||||
if (util.detectNode()) {
|
||||
const os = util.nodeRequire('os');
|
||||
return os.cpus().length;
|
||||
}
|
||||
|
||||
return navigator.hardwareConcurrency || 1;
|
||||
},
|
||||
|
||||
isEmailAddress: function(data) {
|
||||
if (!util.isString(data)) {
|
||||
return false;
|
||||
|
|
|
@ -181,6 +181,70 @@ describe("Packet", function() {
|
|||
});
|
||||
});
|
||||
|
||||
function cryptStub(webCrypto, method) {
|
||||
const crypt = webCrypto[method];
|
||||
const cryptStub = stub(webCrypto, method);
|
||||
let cryptCallsActive = 0;
|
||||
cryptStub.onCall(0).callsFake(async function() {
|
||||
cryptCallsActive++;
|
||||
try {
|
||||
return await crypt.apply(this, arguments);
|
||||
} finally {
|
||||
cryptCallsActive--;
|
||||
}
|
||||
});
|
||||
cryptStub.onCall(1).callsFake(function() {
|
||||
expect(cryptCallsActive).to.equal(1);
|
||||
return crypt.apply(this, arguments);
|
||||
});
|
||||
cryptStub.callThrough();
|
||||
return cryptStub;
|
||||
}
|
||||
|
||||
it('Sym. encrypted AEAD protected packet is encrypted in parallel (GCM, draft04)', function() {
|
||||
const webCrypto = openpgp.util.getWebCrypto();
|
||||
if (!webCrypto) return;
|
||||
const encryptStub = cryptStub(webCrypto, 'encrypt');
|
||||
const decryptStub = cryptStub(webCrypto, 'decrypt');
|
||||
|
||||
let aead_protectVal = openpgp.config.aead_protect;
|
||||
let aead_protect_versionVal = openpgp.config.aead_protect_version;
|
||||
let aead_chunk_size_byteVal = openpgp.config.aead_chunk_size_byte;
|
||||
openpgp.config.aead_protect = true;
|
||||
openpgp.config.aead_protect_version = 4;
|
||||
openpgp.config.aead_chunk_size_byte = 0;
|
||||
const testText = input.createSomeMessage();
|
||||
|
||||
const key = new Uint8Array([1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0,1,2]);
|
||||
const algo = 'aes256';
|
||||
|
||||
const literal = new openpgp.packet.Literal();
|
||||
const enc = new openpgp.packet.SymEncryptedAEADProtected();
|
||||
const msg = new openpgp.packet.List();
|
||||
enc.aeadAlgorithm = 'experimental_gcm';
|
||||
|
||||
msg.push(enc);
|
||||
literal.setText(testText);
|
||||
enc.packets.push(literal);
|
||||
|
||||
const msg2 = new openpgp.packet.List();
|
||||
|
||||
return enc.encrypt(algo, key).then(async function() {
|
||||
await msg2.read(msg.write());
|
||||
return msg2[0].decrypt(algo, key);
|
||||
}).then(async function() {
|
||||
expect(await openpgp.stream.readToEnd(msg2[0].packets[0].data)).to.deep.equal(literal.data);
|
||||
expect(encryptStub.callCount > 1).to.be.true;
|
||||
expect(decryptStub.callCount > 1).to.be.true;
|
||||
}).finally(function() {
|
||||
openpgp.config.aead_protect = aead_protectVal;
|
||||
openpgp.config.aead_protect_version = aead_protect_versionVal;
|
||||
openpgp.config.aead_chunk_size_byte = aead_chunk_size_byteVal;
|
||||
encryptStub.restore();
|
||||
decryptStub.restore();
|
||||
});
|
||||
});
|
||||
|
||||
it('Sym. encrypted AEAD protected packet test vector (draft04)', function() {
|
||||
// From https://gitlab.com/openpgp-wg/rfc4880bis/commit/00b20923e6233fb6ff1666ecd5acfefceb32907d
|
||||
|
||||
|
|
|
@ -789,6 +789,8 @@ describe('Streaming', function() {
|
|||
let aead_chunk_size_byteValue = openpgp.config.aead_chunk_size_byte;
|
||||
openpgp.config.aead_protect = true;
|
||||
openpgp.config.aead_chunk_size_byte = 4;
|
||||
let coresStub = stub(openpgp.util, 'getHardwareConcurrency');
|
||||
coresStub.returns(1);
|
||||
try {
|
||||
let plaintext = [];
|
||||
let i = 0;
|
||||
|
@ -824,6 +826,7 @@ describe('Streaming', function() {
|
|||
} finally {
|
||||
openpgp.config.aead_protect = aead_protectValue;
|
||||
openpgp.config.aead_chunk_size_byte = aead_chunk_size_byteValue;
|
||||
coresStub.restore();
|
||||
}
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user