Use TransformStreams

This commit is contained in:
Daniel Huigens 2018-06-06 16:47:58 +02:00
parent 51c897b073
commit de2971d84a
9 changed files with 267 additions and 160 deletions

View File

@ -26,7 +26,8 @@ module.exports = {
"unescape": true,
"postMessage": true,
"resolves": true,
"rejects": true
"rejects": true,
"TransformStream": true
},
"rules": {

View File

@ -214,11 +214,10 @@ function dearmor(input) {
let textDone;
let reader;
let controller;
let data = base64.decode(stream.from(input, {
start(_controller, _reader) {
controller = _controller;
reader = _reader;
}
let buffer = '';
let data = base64.decode(stream.transformRaw(input, {
transform: (value, controller) => process(buffer + value, controller),
flush: controller => process(buffer, controller)
}));
let checksum;
const checksumVerified = getCheckSum(stream.clone(data));
@ -230,53 +229,59 @@ function dearmor(input) {
checksumVerifiedString + "'");
}
});
while (true) {
let line = await reader.readLine();
if (line === undefined) {
controller.error('Misformed armored text');
break;
}
// remove trailing whitespace at end of lines
// remove leading whitespace for compat with older versions of OpenPGP.js
line = line.trim();
if (!type) {
if (reSplit.test(line)) {
type = getType(line);
}
} else if (!headersDone) {
if (reSplit.test(line)) {
reject(new Error('Mandatory blank line missing between armor headers and armor data'));
}
if (!reEmptyLine.test(line)) {
lastHeaders.push(line);
} else {
verifyHeaders(lastHeaders);
headersDone = true;
if (textDone || type !== 2) resolve({ text, data, headers, type });
}
} else if (!textDone && type === 2) {
if (!reSplit.test(line)) {
// Reverse dash-escaping for msg
text.push(line.replace(/^- /, ''));
} else {
text = text.join('\r\n');
textDone = true;
verifyHeaders(lastHeaders);
lastHeaders = [];
headersDone = false;
}
} else {
if (!reSplit.test(line)) {
if (line[0] !== '=') {
controller.enqueue(line);
function process(value, controller) {
const lineEndIndex = value.indexOf('\n') + 1;
if (lineEndIndex) {
let line = value.substr(0, lineEndIndex);
// remove trailing whitespace at end of lines
// remove leading whitespace for compat with older versions of OpenPGP.js
line = line.trim();
if (!type) {
if (reSplit.test(line)) {
type = getType(line);
}
} else if (!headersDone) {
if (reSplit.test(line)) {
reject(new Error('Mandatory blank line missing between armor headers and armor data'));
}
if (!reEmptyLine.test(line)) {
lastHeaders.push(line);
} else {
checksum = line.substr(1);
verifyHeaders(lastHeaders);
headersDone = true;
if (textDone || type !== 2) resolve({ text, data, headers, type });
}
} else if (!textDone && type === 2) {
if (!reSplit.test(line)) {
// Reverse dash-escaping for msg
text.push(line.replace(/^- /, ''));
} else {
text = text.join('\r\n');
textDone = true;
verifyHeaders(lastHeaders);
lastHeaders = [];
headersDone = false;
}
} else {
controller.close();
break;
if (!reSplit.test(line)) {
if (line[0] !== '=') {
controller.enqueue(line);
} else {
checksum = line.substr(1);
}
} else {
controller.close();
return;
}
}
process(value.substr(lineEndIndex), controller);
} else {
buffer = value;
}
// if (line === undefined) {
// controller.error('Misformed armored text');
// break;
// }
}
} catch(e) {
reject(e);

View File

@ -544,24 +544,19 @@ Message.prototype.verify = async function(keys, date=new Date()) {
}
if (msg.packets.stream) {
let onePassSigList = msg.packets.filterByTag(enums.packet.onePassSignature);
onePassSigList = Array.from(onePassSigList).reverse();
onePassSigList.forEach(onePassSig => {
onePassSig.signatureData = stream.fromAsync(() => new Promise(resolve => {
onePassSig.signatureDataResolve = resolve;
}));
onePassSig.hashed = onePassSig.hash(literalDataList[0]);
});
const reader = stream.getReader(msg.packets.stream);
for (let i = 0; ; i++) {
const { done, value } = await reader.read();
if (done) {
break;
}
onePassSigList[i].signatureDataResolve(value.signatureData);
value.hashed = onePassSigList[i].hashed;
value.hashedData = onePassSigList[i].hashedData;
msg.packets.push(value);
}
return stream.transform(msg.packets.stream, signature => {
const onePassSig = onePassSigList.pop();
onePassSig.signatureDataResolve(signature.signatureData);
signature.hashed = onePassSig.hashed;
signature.hashedData = onePassSig.hashedData;
return createVerificationObject(signature, literalDataList, keys, date);
});
}
const signatureList = msg.packets.filterByTag(enums.packet.signature);
return createVerificationObjects(signatureList, literalDataList, keys, date);
@ -585,6 +580,39 @@ Message.prototype.verifyDetached = function(signature, keys, date=new Date()) {
return createVerificationObjects(signatureList, literalDataList, keys, date);
};
/**
* Create object containing signer's keyid and validity of signature
* @param {module:packet.Signature} signature signature packets
* @param {Array<module:packet.Literal>} literalDataList array of literal data packets
* @param {Array<module:key.Key>} keys array of keys to verify signatures
* @param {Date} date Verify the signature against the given date,
* i.e. check signature creation time < date < expiration time
* @returns {Promise<Array<{keyid: module:type/keyid,
* valid: Boolean}>>} list of signer's keyid and validity of signature
* @async
*/
async function createVerificationObject(signature, literalDataList, keys, date=new Date()) {
let keyPacket = null;
await Promise.all(keys.map(async function(key) {
// Look for the unique key that matches issuerKeyId of signature
const result = await key.getSigningKey(signature.issuerKeyId, date);
if (result) {
keyPacket = result.keyPacket;
}
}));
const verifiedSig = {
keyid: signature.issuerKeyId,
valid: keyPacket ? await signature.verify(keyPacket, literalDataList[0]) : null
};
const packetlist = new packet.List();
packetlist.push(signature);
verifiedSig.signature = new Signature(packetlist);
return verifiedSig;
}
/**
* Create list of objects containing signer's keyid and validity of signature
* @param {Array<module:packet.Signature>} signatureList array of signature packets
@ -598,25 +626,7 @@ Message.prototype.verifyDetached = function(signature, keys, date=new Date()) {
*/
export async function createVerificationObjects(signatureList, literalDataList, keys, date=new Date()) {
return Promise.all(signatureList.map(async function(signature) {
let keyPacket = null;
await Promise.all(keys.map(async function(key) {
// Look for the unique key that matches issuerKeyId of signature
const result = await key.getSigningKey(signature.issuerKeyId, date);
if (result) {
keyPacket = result.keyPacket;
}
}));
const verifiedSig = {
keyid: signature.issuerKeyId,
valid: keyPacket ? await signature.verify(keyPacket, literalDataList[0]) : null
};
const packetlist = new packet.List();
packetlist.push(signature);
verifiedSig.signature = new Signature(packetlist);
return verifiedSig;
return createVerificationObject(signature, literalDataList, keys, date);
}));
}

View File

@ -329,6 +329,7 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se
if (armor) {
result.data = encrypted.message.armor();
result.data = await convertStream(result.data, asStream);
// result.cancel = stream.cancel.bind(result.data);
} else {
result.message = encrypted.message;
}
@ -370,11 +371,12 @@ export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKe
}
const result = {};
result.signatures = signature ? message.verifyDetached(signature, publicKeys, date) : message.verify(publicKeys, date);
if (!asStream) result.signatures = await result.signatures;
result.signatures = signature ? await message.verifyDetached(signature, publicKeys, date) : await message.verify(publicKeys, date);
result.data = format === 'binary' ? message.getLiteralData() : message.getText();
result.data = await convertStream(result.data, asStream);
result.signatures = await convertStreamArray(result.signatures, asStream);
result.filename = message.getFilename();
// result.cancel = stream.cancel.bind(message.packets);
return result;
}).catch(onError.bind(null, 'Error decrypting message'));
}
@ -426,6 +428,7 @@ export function sign({ data, dataType, privateKeys, armor=true, asStream, detach
if (armor) {
result.data = message.armor();
result.data = await convertStream(result.data, asStream);
// result.cancel = stream.cancel.bind(result.data);
} else {
result.message = message;
}
@ -457,10 +460,11 @@ export function verify({ message, publicKeys, asStream, signature=null, date=new
return Promise.resolve().then(async function() {
const result = {};
result.signatures = signature ? message.verifyDetached(signature, publicKeys, date) : message.verify(publicKeys, date);
if (!asStream) result.signatures = await result.signatures;
result.signatures = signature ? await message.verifyDetached(signature, publicKeys, date) : await message.verify(publicKeys, date);
result.data = message instanceof CleartextMessage ? message.getText() : message.getLiteralData();
result.data = await convertStream(result.data, asStream);
result.signatures = await convertStreamArray(result.signatures, asStream);
// result.cancel = stream.cancel.bind(message.packets);
return result;
}).catch(onError.bind(null, 'Error verifying cleartext signed message'));
}
@ -618,6 +622,27 @@ async function convertStream(data, asStream) {
return data;
}
/**
* Convert data array to or from Stream
* @param {Object} data the data to convert
* @param {Boolean} asStream whether to return a ReadableStream
* @returns {Object} the parse data in the respective format
*/
async function convertStreamArray(data, asStream) {
if (!asStream && util.isStream(data)) {
return stream.readToEnd(data, arr => arr);
}
if (asStream && !util.isStream(data)) {
return new ReadableStream({
start(controller) {
data.forEach(controller.enqueue.bind(controller));
controller.close();
}
});
}
return data;
}
/**
* Global error handler that logs the stack trace and rethrows a high lvl error message.

View File

@ -69,9 +69,8 @@ export function clonePackets(options) {
options.signature = options.signature.packets;
}
if (options.signatures) {
if (options.signatures instanceof Promise) {
const signatures = options.signatures;
options.signatures = stream.fromAsync(async () => (await signatures).map(verificationObjectToClone));
if (util.isStream(options.signatures)) {
options.signatures = stream.transform(options.signatures, verificationObjectToClone);
} else {
options.signatures.forEach(verificationObjectToClone);
}
@ -117,9 +116,7 @@ export function parseClonedPackets(options) {
}
if (options.signatures) {
if (util.isStream(options.signatures)) {
options.signatures = stream.readToEnd(options.signatures, arr => arr).then(([signatures]) => {
return signatures.map(packetlistCloneToSignatures);
});
options.signatures = stream.transform(options.signatures, packetlistCloneToSignatures);
} else {
options.signatures = options.signatures.map(packetlistCloneToSignatures);
}

View File

@ -58,20 +58,21 @@ export default SymEncryptedAEADProtected;
* Parse an encrypted payload of bytes in the order: version, IV, ciphertext (see specification)
*/
SymEncryptedAEADProtected.prototype.read = async function (bytes) {
const reader = stream.getReader(bytes);
if (await reader.readByte() !== VERSION) { // The only currently defined value is 1.
throw new Error('Invalid packet version.');
}
if (config.aead_protect_version === 4) {
this.cipherAlgo = await reader.readByte();
this.aeadAlgo = await reader.readByte();
this.chunkSizeByte = await reader.readByte();
} else {
this.aeadAlgo = enums.aead.experimental_gcm;
}
const mode = crypto[enums.read(enums.aead, this.aeadAlgo)];
this.iv = await reader.readBytes(mode.ivLength);
this.encrypted = reader.substream();
await stream.parse(bytes, async reader => {
if (await reader.readByte() !== VERSION) { // The only currently defined value is 1.
throw new Error('Invalid packet version.');
}
if (config.aead_protect_version === 4) {
this.cipherAlgo = await reader.readByte();
this.aeadAlgo = await reader.readByte();
this.chunkSizeByte = await reader.readByte();
} else {
this.aeadAlgo = enums.aead.experimental_gcm;
}
const mode = crypto[enums.read(enums.aead, this.aeadAlgo)];
this.iv = await reader.readBytes(mode.ivLength);
this.encrypted = reader.remainder();
});
};
/**
@ -143,15 +144,23 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) {
let cryptedBytes = 0;
let queuedBytes = 0;
const iv = this.iv;
return stream.from(data, {
async pull(controller, reader) {
let chunk = await reader.readBytes(chunkSize + tagLengthIfDecrypting) || new Uint8Array();
let buffer = [];
return stream.transformRaw(data, {
transform: process,
flush: controller => process(undefined, controller, true)
});
async function process(value, controller, final) {
if (!final) buffer.push(value);
while (buffer.reduce(((acc, value) => acc + value.length), 0) >= (final ? 0 : chunkSize) + tagLengthIfDecrypting) {
const bufferConcat = util.concatUint8Array(buffer);
let chunk = bufferConcat.subarray(0, chunkSize + tagLengthIfDecrypting);
buffer = [bufferConcat.subarray(chunkSize + tagLengthIfDecrypting)];
const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting);
chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting);
let cryptedPromise;
let done;
if (!chunkIndex || chunk.length) {
reader.unshift(finalChunk);
buffer.unshift(finalChunk);
cryptedPromise = modeInstance[fn](chunk, mode.getNonce(iv, chunkIndexArray), adataArray);
} else {
// After the last chunk, we either encrypt a final, empty
@ -173,12 +182,12 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) {
}
if (!done) {
adataView.setInt32(5 + 4, ++chunkIndex); // Should be setInt64(5, ...)
await this.pull(controller, reader);
} else {
controller.close();
controller.terminate();
return;
}
}
});
}
} else {
return modeInstance[fn](await stream.readToEnd(data), this.iv);
}

View File

@ -6,29 +6,52 @@ import util from './util';
const nodeStream = util.getNodeStream();
function concat(arrays) {
const readers = arrays.map(getReader);
let current = 0;
function toStream(input) {
if (util.isStream(input)) {
return input;
}
return create({
async pull(controller) {
try {
const { done, value } = await readers[current].read();
if (!done) {
controller.enqueue(value);
} else if (++current === arrays.length) {
controller.close();
} else {
await this.pull(controller);
}
} catch(e) {
controller.error(e);
}
start(controller) {
controller.enqueue(input);
controller.close();
}
});
}
function pipeThrough(input, target, options) {
if (!util.isStream(input)) {
input = toStream(input);
}
return input.pipeThrough(target, options);
}
function concat(arrays) {
arrays = arrays.map(toStream);
let controller;
const transform = new TransformStream({
start(_controller) {
controller = _controller;
},
cancel() {
readers.forEach(reader => reader.releaseLock());
cancel: () => {
return Promise.all(arrays.map(cancel));
}
});
(async () => {
for (let i = 0; i < arrays.length; i++) {
// await new Promise(resolve => {
try {
await arrays[i].pipeTo(transform.writable, {
preventClose: i !== arrays.length - 1
});
} catch(e) {
console.log(e);
// controller.error(e);
return;
}
// });
}
})();
return transform.readable;
}
function getReader(input) {
@ -45,46 +68,47 @@ function create(options, extraArg) {
options.start = wrap(options.start);
options.pull = wrap(options.pull);
const _cancel = options.cancel;
options.cancel = async controller => {
options.cancel = async reason => {
try {
console.log('cancel wrapper', options);
console.log('cancel wrapper', reason, options);
await promises.get(options.start);
console.log('awaited start');
await promises.get(options.pull);
console.log('awaited pull');
} finally {
if (_cancel) return _cancel.call(options, controller, extraArg);
if (_cancel) return _cancel.call(options, reason, extraArg);
}
};
options.options = options;
return new ReadableStream(options);
}
function from(input, options) {
const reader = getReader(input);
if (!options.cancel) {
options.cancel = (controller, reader) => {
console.log('from() cancel', stream, input);
reader.releaseLock();
return cancel(input);
};
}
options.from = input;
const stream = create(options, reader);
stream.from = input;
return stream;
function transformRaw(input, options) {
options.start = controller => {
if (input.externalBuffer) {
input.externalBuffer.forEach(chunk => {
options.transform(chunk, controller);
});
}
};
return toStream(input).pipeThrough(new TransformStream(options));
}
function transform(input, process = () => undefined, finish = () => undefined) {
if (util.isStream(input)) {
return from(input, {
async pull(controller, reader) {
return transformRaw(input, {
async transform(value, controller) {
try {
const { done, value } = await reader.read();
const result = await (!done ? process : finish)(value);
const result = await process(value);
if (result !== undefined) controller.enqueue(result);
} catch(e) {
controller.error(e);
}
},
async flush(controller) {
try {
const result = await finish();
if (result !== undefined) controller.enqueue(result);
else if (!done) await this.pull(controller, reader);
if (done) controller.close();
} catch(e) {
controller.error(e);
}
@ -92,7 +116,7 @@ function transform(input, process = () => undefined, finish = () => undefined) {
});
}
const result1 = process(input);
const result2 = finish(undefined);
const result2 = finish();
if (result1 !== undefined && result2 !== undefined) return util.concat([result1, result2]);
return result1 !== undefined ? result1 : result2;
}
@ -130,15 +154,13 @@ function slice(input, begin=0, end=Infinity) {
if (util.isStream(input)) {
if (begin >= 0 && end >= 0) {
let bytesRead = 0;
return from(input, {
async pull (controller, reader) {
const { done, value } = await reader.read();
if (!done && bytesRead < end) {
return transformRaw(input, {
transform(value, controller) {
if (bytesRead < end) {
if (bytesRead + value.length >= begin) {
controller.enqueue(slice(value, Math.max(begin - bytesRead, 0), end - bytesRead));
}
bytesRead += value.length;
await this.pull(controller, reader); // Only necessary if the above call to enqueue() didn't happen
} else {
controller.close();
}
@ -177,6 +199,41 @@ function slice(input, begin=0, end=Infinity) {
return input.slice(begin, end);
}
async function parse(input, parser) {
let controller;
const transformed = transformRaw(input, {
start(_controller) {
controller = _controller;
},
cancel: cancel.bind(input)
});
transformed[stream.cancelReadsSym] = controller.error.bind(controller);
toStream(input).pipeTo(target);
const reader = getReader(transformed.readable);
await parser(reader);
new ReadableStream({
start(_controller) {
controller = _controller;
},
pull: () => {
},
cancel: () => {
}
});
new ReadableStream({
pull: () => {
},
cancel: () => {
}
});
}
async function readToEnd(input, join) {
if (util.isStream(input)) {
return getReader(input).readToEnd(join);
@ -273,7 +330,7 @@ if (nodeStream) {
}
export default { concat, getReader, from, transform, clone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync };
export default { toStream, concat, getReader, transformRaw, transform, clone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync, readerAcquiredMap };
const readerAcquiredMap = new Map();
@ -442,6 +499,8 @@ Reader.prototype.substream = function() {
return cancel(this.stream);
}
}), { from: this.stream });
this.releaseLock();
return this.stream;
};
Reader.prototype.readToEnd = async function(join=util.concat) {

View File

@ -680,7 +680,7 @@ yYDnCgA=
return openpgp.verify({ publicKeys:[pubKey], message:sMsg }).then(async function(cleartextSig) {
expect(cleartextSig).to.exist;
expect(openpgp.util.nativeEOL(openpgp.util.Uint8Array_to_str(await openpgp.stream.readToEnd(cleartextSig.data)))).to.equal(plaintext);
cleartextSig.signatures = await cleartextSig.signatures;
cleartextSig.signatures = await openpgp.stream.readToEnd(cleartextSig.signatures, arr => arr);
expect(cleartextSig.signatures).to.have.length(1);
expect(cleartextSig.signatures[0].valid).to.be.true;
expect(cleartextSig.signatures[0].signature.packets.length).to.equal(1);

View File

@ -222,10 +222,11 @@ describe('Streaming', function() {
format: 'binary'
});
expect(util.isStream(decrypted.data)).to.be.true;
expect(util.isStream(decrypted.signatures)).to.be.true;
expect(await openpgp.stream.getReader(openpgp.stream.clone(decrypted.data)).readBytes(1024)).to.deep.equal(plaintext[0]);
if (i > 10) throw new Error('Data did not arrive early.');
expect(await openpgp.stream.readToEnd(decrypted.data)).to.deep.equal(util.concatUint8Array(plaintext));
expect(await decrypted.signatures).to.exist.and.have.length(0);
expect(await openpgp.stream.readToEnd(decrypted.signatures, arr => arr)).to.exist.and.have.length(0);
} finally {
openpgp.config.unsafe_stream = unsafe_streamValue;
}
@ -363,7 +364,7 @@ describe('Streaming', function() {
expect(await openpgp.stream.getReader(openpgp.stream.clone(decrypted.data)).readBytes(10)).not.to.deep.equal(plaintext[0]);
if (i > 10) throw new Error('Data did not arrive early.');
await openpgp.stream.readToEnd(decrypted.data);
expect(decrypted.signatures).to.be.rejectedWith('Ascii armor integrity check on message failed');
expect(openpgp.stream.readToEnd(decrypted.signatures, arr => arr)).to.be.rejectedWith('Ascii armor integrity check on message failed');
} finally {
openpgp.config.unsafe_stream = unsafe_streamValue;
}