From de2971d84afe39b43a9cfaee7e02ade05fc36ebd Mon Sep 17 00:00:00 2001 From: Daniel Huigens Date: Wed, 6 Jun 2018 16:47:58 +0200 Subject: [PATCH] Use TransformStreams --- .eslintrc.js | 3 +- src/encoding/armor.js | 99 ++++++------- src/message.js | 72 +++++----- src/openpgp.js | 33 ++++- src/packet/clone.js | 9 +- src/packet/sym_encrypted_aead_protected.js | 51 ++++--- src/stream.js | 153 ++++++++++++++------- test/general/signature.js | 2 +- test/general/streaming.js | 5 +- 9 files changed, 267 insertions(+), 160 deletions(-) diff --git a/.eslintrc.js b/.eslintrc.js index 5436e689..22db6956 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -26,7 +26,8 @@ module.exports = { "unescape": true, "postMessage": true, "resolves": true, - "rejects": true + "rejects": true, + "TransformStream": true }, "rules": { diff --git a/src/encoding/armor.js b/src/encoding/armor.js index 9d2057af..5d7c764d 100644 --- a/src/encoding/armor.js +++ b/src/encoding/armor.js @@ -214,11 +214,10 @@ function dearmor(input) { let textDone; let reader; let controller; - let data = base64.decode(stream.from(input, { - start(_controller, _reader) { - controller = _controller; - reader = _reader; - } + let buffer = ''; + let data = base64.decode(stream.transformRaw(input, { + transform: (value, controller) => process(buffer + value, controller), + flush: controller => process(buffer, controller) })); let checksum; const checksumVerified = getCheckSum(stream.clone(data)); @@ -230,53 +229,59 @@ function dearmor(input) { checksumVerifiedString + "'"); } }); - while (true) { - let line = await reader.readLine(); - if (line === undefined) { - controller.error('Misformed armored text'); - break; - } - // remove trailing whitespace at end of lines - // remove leading whitespace for compat with older versions of OpenPGP.js - line = line.trim(); - if (!type) { - if (reSplit.test(line)) { - type = getType(line); - } - } else if (!headersDone) { - if (reSplit.test(line)) { - reject(new Error('Mandatory blank line missing between armor headers and armor data')); - } - if (!reEmptyLine.test(line)) { - lastHeaders.push(line); - } else { - verifyHeaders(lastHeaders); - headersDone = true; - if (textDone || type !== 2) resolve({ text, data, headers, type }); - } - } else if (!textDone && type === 2) { - if (!reSplit.test(line)) { - // Reverse dash-escaping for msg - text.push(line.replace(/^- /, '')); - } else { - text = text.join('\r\n'); - textDone = true; - verifyHeaders(lastHeaders); - lastHeaders = []; - headersDone = false; - } - } else { - if (!reSplit.test(line)) { - if (line[0] !== '=') { - controller.enqueue(line); + function process(value, controller) { + const lineEndIndex = value.indexOf('\n') + 1; + if (lineEndIndex) { + let line = value.substr(0, lineEndIndex); + // remove trailing whitespace at end of lines + // remove leading whitespace for compat with older versions of OpenPGP.js + line = line.trim(); + if (!type) { + if (reSplit.test(line)) { + type = getType(line); + } + } else if (!headersDone) { + if (reSplit.test(line)) { + reject(new Error('Mandatory blank line missing between armor headers and armor data')); + } + if (!reEmptyLine.test(line)) { + lastHeaders.push(line); } else { - checksum = line.substr(1); + verifyHeaders(lastHeaders); + headersDone = true; + if (textDone || type !== 2) resolve({ text, data, headers, type }); + } + } else if (!textDone && type === 2) { + if (!reSplit.test(line)) { + // Reverse dash-escaping for msg + text.push(line.replace(/^- /, '')); + } else { + text = text.join('\r\n'); + textDone = true; + verifyHeaders(lastHeaders); + lastHeaders = []; + headersDone = false; } } else { - controller.close(); - break; + if (!reSplit.test(line)) { + if (line[0] !== '=') { + controller.enqueue(line); + } else { + checksum = line.substr(1); + } + } else { + controller.close(); + return; + } } + process(value.substr(lineEndIndex), controller); + } else { + buffer = value; } + // if (line === undefined) { + // controller.error('Misformed armored text'); + // break; + // } } } catch(e) { reject(e); diff --git a/src/message.js b/src/message.js index 17610eda..21a31cdc 100644 --- a/src/message.js +++ b/src/message.js @@ -544,24 +544,19 @@ Message.prototype.verify = async function(keys, date=new Date()) { } if (msg.packets.stream) { let onePassSigList = msg.packets.filterByTag(enums.packet.onePassSignature); - onePassSigList = Array.from(onePassSigList).reverse(); onePassSigList.forEach(onePassSig => { onePassSig.signatureData = stream.fromAsync(() => new Promise(resolve => { onePassSig.signatureDataResolve = resolve; })); onePassSig.hashed = onePassSig.hash(literalDataList[0]); }); - const reader = stream.getReader(msg.packets.stream); - for (let i = 0; ; i++) { - const { done, value } = await reader.read(); - if (done) { - break; - } - onePassSigList[i].signatureDataResolve(value.signatureData); - value.hashed = onePassSigList[i].hashed; - value.hashedData = onePassSigList[i].hashedData; - msg.packets.push(value); - } + return stream.transform(msg.packets.stream, signature => { + const onePassSig = onePassSigList.pop(); + onePassSig.signatureDataResolve(signature.signatureData); + signature.hashed = onePassSig.hashed; + signature.hashedData = onePassSig.hashedData; + return createVerificationObject(signature, literalDataList, keys, date); + }); } const signatureList = msg.packets.filterByTag(enums.packet.signature); return createVerificationObjects(signatureList, literalDataList, keys, date); @@ -585,6 +580,39 @@ Message.prototype.verifyDetached = function(signature, keys, date=new Date()) { return createVerificationObjects(signatureList, literalDataList, keys, date); }; +/** + * Create object containing signer's keyid and validity of signature + * @param {module:packet.Signature} signature signature packets + * @param {Array} literalDataList array of literal data packets + * @param {Array} keys array of keys to verify signatures + * @param {Date} date Verify the signature against the given date, + * i.e. check signature creation time < date < expiration time + * @returns {Promise>} list of signer's keyid and validity of signature + * @async + */ +async function createVerificationObject(signature, literalDataList, keys, date=new Date()) { + let keyPacket = null; + await Promise.all(keys.map(async function(key) { + // Look for the unique key that matches issuerKeyId of signature + const result = await key.getSigningKey(signature.issuerKeyId, date); + if (result) { + keyPacket = result.keyPacket; + } + })); + + const verifiedSig = { + keyid: signature.issuerKeyId, + valid: keyPacket ? await signature.verify(keyPacket, literalDataList[0]) : null + }; + + const packetlist = new packet.List(); + packetlist.push(signature); + verifiedSig.signature = new Signature(packetlist); + + return verifiedSig; +} + /** * Create list of objects containing signer's keyid and validity of signature * @param {Array} signatureList array of signature packets @@ -598,25 +626,7 @@ Message.prototype.verifyDetached = function(signature, keys, date=new Date()) { */ export async function createVerificationObjects(signatureList, literalDataList, keys, date=new Date()) { return Promise.all(signatureList.map(async function(signature) { - let keyPacket = null; - await Promise.all(keys.map(async function(key) { - // Look for the unique key that matches issuerKeyId of signature - const result = await key.getSigningKey(signature.issuerKeyId, date); - if (result) { - keyPacket = result.keyPacket; - } - })); - - const verifiedSig = { - keyid: signature.issuerKeyId, - valid: keyPacket ? await signature.verify(keyPacket, literalDataList[0]) : null - }; - - const packetlist = new packet.List(); - packetlist.push(signature); - verifiedSig.signature = new Signature(packetlist); - - return verifiedSig; + return createVerificationObject(signature, literalDataList, keys, date); })); } diff --git a/src/openpgp.js b/src/openpgp.js index 6ea51b1e..cd9790d6 100644 --- a/src/openpgp.js +++ b/src/openpgp.js @@ -329,6 +329,7 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se if (armor) { result.data = encrypted.message.armor(); result.data = await convertStream(result.data, asStream); + // result.cancel = stream.cancel.bind(result.data); } else { result.message = encrypted.message; } @@ -370,11 +371,12 @@ export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKe } const result = {}; - result.signatures = signature ? message.verifyDetached(signature, publicKeys, date) : message.verify(publicKeys, date); - if (!asStream) result.signatures = await result.signatures; + result.signatures = signature ? await message.verifyDetached(signature, publicKeys, date) : await message.verify(publicKeys, date); result.data = format === 'binary' ? message.getLiteralData() : message.getText(); result.data = await convertStream(result.data, asStream); + result.signatures = await convertStreamArray(result.signatures, asStream); result.filename = message.getFilename(); + // result.cancel = stream.cancel.bind(message.packets); return result; }).catch(onError.bind(null, 'Error decrypting message')); } @@ -426,6 +428,7 @@ export function sign({ data, dataType, privateKeys, armor=true, asStream, detach if (armor) { result.data = message.armor(); result.data = await convertStream(result.data, asStream); + // result.cancel = stream.cancel.bind(result.data); } else { result.message = message; } @@ -457,10 +460,11 @@ export function verify({ message, publicKeys, asStream, signature=null, date=new return Promise.resolve().then(async function() { const result = {}; - result.signatures = signature ? message.verifyDetached(signature, publicKeys, date) : message.verify(publicKeys, date); - if (!asStream) result.signatures = await result.signatures; + result.signatures = signature ? await message.verifyDetached(signature, publicKeys, date) : await message.verify(publicKeys, date); result.data = message instanceof CleartextMessage ? message.getText() : message.getLiteralData(); result.data = await convertStream(result.data, asStream); + result.signatures = await convertStreamArray(result.signatures, asStream); + // result.cancel = stream.cancel.bind(message.packets); return result; }).catch(onError.bind(null, 'Error verifying cleartext signed message')); } @@ -618,6 +622,27 @@ async function convertStream(data, asStream) { return data; } +/** + * Convert data array to or from Stream + * @param {Object} data the data to convert + * @param {Boolean} asStream whether to return a ReadableStream + * @returns {Object} the parse data in the respective format + */ +async function convertStreamArray(data, asStream) { + if (!asStream && util.isStream(data)) { + return stream.readToEnd(data, arr => arr); + } + if (asStream && !util.isStream(data)) { + return new ReadableStream({ + start(controller) { + data.forEach(controller.enqueue.bind(controller)); + controller.close(); + } + }); + } + return data; +} + /** * Global error handler that logs the stack trace and rethrows a high lvl error message. diff --git a/src/packet/clone.js b/src/packet/clone.js index eb20c875..c7479790 100644 --- a/src/packet/clone.js +++ b/src/packet/clone.js @@ -69,9 +69,8 @@ export function clonePackets(options) { options.signature = options.signature.packets; } if (options.signatures) { - if (options.signatures instanceof Promise) { - const signatures = options.signatures; - options.signatures = stream.fromAsync(async () => (await signatures).map(verificationObjectToClone)); + if (util.isStream(options.signatures)) { + options.signatures = stream.transform(options.signatures, verificationObjectToClone); } else { options.signatures.forEach(verificationObjectToClone); } @@ -117,9 +116,7 @@ export function parseClonedPackets(options) { } if (options.signatures) { if (util.isStream(options.signatures)) { - options.signatures = stream.readToEnd(options.signatures, arr => arr).then(([signatures]) => { - return signatures.map(packetlistCloneToSignatures); - }); + options.signatures = stream.transform(options.signatures, packetlistCloneToSignatures); } else { options.signatures = options.signatures.map(packetlistCloneToSignatures); } diff --git a/src/packet/sym_encrypted_aead_protected.js b/src/packet/sym_encrypted_aead_protected.js index e9fdf5ee..ab72eade 100644 --- a/src/packet/sym_encrypted_aead_protected.js +++ b/src/packet/sym_encrypted_aead_protected.js @@ -58,20 +58,21 @@ export default SymEncryptedAEADProtected; * Parse an encrypted payload of bytes in the order: version, IV, ciphertext (see specification) */ SymEncryptedAEADProtected.prototype.read = async function (bytes) { - const reader = stream.getReader(bytes); - if (await reader.readByte() !== VERSION) { // The only currently defined value is 1. - throw new Error('Invalid packet version.'); - } - if (config.aead_protect_version === 4) { - this.cipherAlgo = await reader.readByte(); - this.aeadAlgo = await reader.readByte(); - this.chunkSizeByte = await reader.readByte(); - } else { - this.aeadAlgo = enums.aead.experimental_gcm; - } - const mode = crypto[enums.read(enums.aead, this.aeadAlgo)]; - this.iv = await reader.readBytes(mode.ivLength); - this.encrypted = reader.substream(); + await stream.parse(bytes, async reader => { + if (await reader.readByte() !== VERSION) { // The only currently defined value is 1. + throw new Error('Invalid packet version.'); + } + if (config.aead_protect_version === 4) { + this.cipherAlgo = await reader.readByte(); + this.aeadAlgo = await reader.readByte(); + this.chunkSizeByte = await reader.readByte(); + } else { + this.aeadAlgo = enums.aead.experimental_gcm; + } + const mode = crypto[enums.read(enums.aead, this.aeadAlgo)]; + this.iv = await reader.readBytes(mode.ivLength); + this.encrypted = reader.remainder(); + }); }; /** @@ -143,15 +144,23 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) { let cryptedBytes = 0; let queuedBytes = 0; const iv = this.iv; - return stream.from(data, { - async pull(controller, reader) { - let chunk = await reader.readBytes(chunkSize + tagLengthIfDecrypting) || new Uint8Array(); + let buffer = []; + return stream.transformRaw(data, { + transform: process, + flush: controller => process(undefined, controller, true) + }); + async function process(value, controller, final) { + if (!final) buffer.push(value); + while (buffer.reduce(((acc, value) => acc + value.length), 0) >= (final ? 0 : chunkSize) + tagLengthIfDecrypting) { + const bufferConcat = util.concatUint8Array(buffer); + let chunk = bufferConcat.subarray(0, chunkSize + tagLengthIfDecrypting); + buffer = [bufferConcat.subarray(chunkSize + tagLengthIfDecrypting)]; const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting); chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting); let cryptedPromise; let done; if (!chunkIndex || chunk.length) { - reader.unshift(finalChunk); + buffer.unshift(finalChunk); cryptedPromise = modeInstance[fn](chunk, mode.getNonce(iv, chunkIndexArray), adataArray); } else { // After the last chunk, we either encrypt a final, empty @@ -173,12 +182,12 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) { } if (!done) { adataView.setInt32(5 + 4, ++chunkIndex); // Should be setInt64(5, ...) - await this.pull(controller, reader); } else { - controller.close(); + controller.terminate(); + return; } } - }); + } } else { return modeInstance[fn](await stream.readToEnd(data), this.iv); } diff --git a/src/stream.js b/src/stream.js index bff34a02..251c67eb 100644 --- a/src/stream.js +++ b/src/stream.js @@ -6,29 +6,52 @@ import util from './util'; const nodeStream = util.getNodeStream(); -function concat(arrays) { - const readers = arrays.map(getReader); - let current = 0; +function toStream(input) { + if (util.isStream(input)) { + return input; + } return create({ - async pull(controller) { - try { - const { done, value } = await readers[current].read(); - if (!done) { - controller.enqueue(value); - } else if (++current === arrays.length) { - controller.close(); - } else { - await this.pull(controller); - } - } catch(e) { - controller.error(e); - } + start(controller) { + controller.enqueue(input); + controller.close(); + } + }); +} + +function pipeThrough(input, target, options) { + if (!util.isStream(input)) { + input = toStream(input); + } + return input.pipeThrough(target, options); +} + +function concat(arrays) { + arrays = arrays.map(toStream); + let controller; + const transform = new TransformStream({ + start(_controller) { + controller = _controller; }, - cancel() { - readers.forEach(reader => reader.releaseLock()); + cancel: () => { return Promise.all(arrays.map(cancel)); } }); + (async () => { + for (let i = 0; i < arrays.length; i++) { + // await new Promise(resolve => { + try { + await arrays[i].pipeTo(transform.writable, { + preventClose: i !== arrays.length - 1 + }); + } catch(e) { + console.log(e); + // controller.error(e); + return; + } + // }); + } + })(); + return transform.readable; } function getReader(input) { @@ -45,46 +68,47 @@ function create(options, extraArg) { options.start = wrap(options.start); options.pull = wrap(options.pull); const _cancel = options.cancel; - options.cancel = async controller => { + options.cancel = async reason => { try { - console.log('cancel wrapper', options); + console.log('cancel wrapper', reason, options); await promises.get(options.start); console.log('awaited start'); await promises.get(options.pull); console.log('awaited pull'); } finally { - if (_cancel) return _cancel.call(options, controller, extraArg); + if (_cancel) return _cancel.call(options, reason, extraArg); } }; options.options = options; return new ReadableStream(options); } -function from(input, options) { - const reader = getReader(input); - if (!options.cancel) { - options.cancel = (controller, reader) => { - console.log('from() cancel', stream, input); - reader.releaseLock(); - return cancel(input); - }; - } - options.from = input; - const stream = create(options, reader); - stream.from = input; - return stream; +function transformRaw(input, options) { + options.start = controller => { + if (input.externalBuffer) { + input.externalBuffer.forEach(chunk => { + options.transform(chunk, controller); + }); + } + }; + return toStream(input).pipeThrough(new TransformStream(options)); } function transform(input, process = () => undefined, finish = () => undefined) { if (util.isStream(input)) { - return from(input, { - async pull(controller, reader) { + return transformRaw(input, { + async transform(value, controller) { try { - const { done, value } = await reader.read(); - const result = await (!done ? process : finish)(value); + const result = await process(value); + if (result !== undefined) controller.enqueue(result); + } catch(e) { + controller.error(e); + } + }, + async flush(controller) { + try { + const result = await finish(); if (result !== undefined) controller.enqueue(result); - else if (!done) await this.pull(controller, reader); - if (done) controller.close(); } catch(e) { controller.error(e); } @@ -92,7 +116,7 @@ function transform(input, process = () => undefined, finish = () => undefined) { }); } const result1 = process(input); - const result2 = finish(undefined); + const result2 = finish(); if (result1 !== undefined && result2 !== undefined) return util.concat([result1, result2]); return result1 !== undefined ? result1 : result2; } @@ -130,15 +154,13 @@ function slice(input, begin=0, end=Infinity) { if (util.isStream(input)) { if (begin >= 0 && end >= 0) { let bytesRead = 0; - return from(input, { - async pull (controller, reader) { - const { done, value } = await reader.read(); - if (!done && bytesRead < end) { + return transformRaw(input, { + transform(value, controller) { + if (bytesRead < end) { if (bytesRead + value.length >= begin) { controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead)); } bytesRead += value.length; - await this.pull(controller, reader); // Only necessary if the above call to enqueue() didn't happen } else { controller.close(); } @@ -177,6 +199,41 @@ function slice(input, begin=0, end=Infinity) { return input.slice(begin, end); } +async function parse(input, parser) { + let controller; + const transformed = transformRaw(input, { + start(_controller) { + controller = _controller; + }, + cancel: cancel.bind(input) + }); + transformed[stream.cancelReadsSym] = controller.error.bind(controller); + toStream(input).pipeTo(target); + const reader = getReader(transformed.readable); + await parser(reader); + + + new ReadableStream({ + start(_controller) { + controller = _controller; + }, + pull: () => { + + }, + cancel: () => { + + } + }); + new ReadableStream({ + pull: () => { + + }, + cancel: () => { + + } + }); +} + async function readToEnd(input, join) { if (util.isStream(input)) { return getReader(input).readToEnd(join); @@ -273,7 +330,7 @@ if (nodeStream) { } -export default { concat, getReader, from, transform, clone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync }; +export default { toStream, concat, getReader, transformRaw, transform, clone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync, readerAcquiredMap }; const readerAcquiredMap = new Map(); @@ -442,6 +499,8 @@ Reader.prototype.substream = function() { return cancel(this.stream); } }), { from: this.stream }); + this.releaseLock(); + return this.stream; }; Reader.prototype.readToEnd = async function(join=util.concat) { diff --git a/test/general/signature.js b/test/general/signature.js index 5bedfc43..ba6ecddc 100644 --- a/test/general/signature.js +++ b/test/general/signature.js @@ -680,7 +680,7 @@ yYDnCgA= return openpgp.verify({ publicKeys:[pubKey], message:sMsg }).then(async function(cleartextSig) { expect(cleartextSig).to.exist; expect(openpgp.util.nativeEOL(openpgp.util.Uint8Array_to_str(await openpgp.stream.readToEnd(cleartextSig.data)))).to.equal(plaintext); - cleartextSig.signatures = await cleartextSig.signatures; + cleartextSig.signatures = await openpgp.stream.readToEnd(cleartextSig.signatures, arr => arr); expect(cleartextSig.signatures).to.have.length(1); expect(cleartextSig.signatures[0].valid).to.be.true; expect(cleartextSig.signatures[0].signature.packets.length).to.equal(1); diff --git a/test/general/streaming.js b/test/general/streaming.js index 866f4cd4..d6265b3c 100644 --- a/test/general/streaming.js +++ b/test/general/streaming.js @@ -222,10 +222,11 @@ describe('Streaming', function() { format: 'binary' }); expect(util.isStream(decrypted.data)).to.be.true; + expect(util.isStream(decrypted.signatures)).to.be.true; expect(await openpgp.stream.getReader(openpgp.stream.clone(decrypted.data)).readBytes(1024)).to.deep.equal(plaintext[0]); if (i > 10) throw new Error('Data did not arrive early.'); expect(await openpgp.stream.readToEnd(decrypted.data)).to.deep.equal(util.concatUint8Array(plaintext)); - expect(await decrypted.signatures).to.exist.and.have.length(0); + expect(await openpgp.stream.readToEnd(decrypted.signatures, arr => arr)).to.exist.and.have.length(0); } finally { openpgp.config.unsafe_stream = unsafe_streamValue; } @@ -363,7 +364,7 @@ describe('Streaming', function() { expect(await openpgp.stream.getReader(openpgp.stream.clone(decrypted.data)).readBytes(10)).not.to.deep.equal(plaintext[0]); if (i > 10) throw new Error('Data did not arrive early.'); await openpgp.stream.readToEnd(decrypted.data); - expect(decrypted.signatures).to.be.rejectedWith('Ascii armor integrity check on message failed'); + expect(openpgp.stream.readToEnd(decrypted.signatures, arr => arr)).to.be.rejectedWith('Ascii armor integrity check on message failed'); } finally { openpgp.config.unsafe_stream = unsafe_streamValue; }