Implement cancellation by manually linking together input and output streams

This commit is contained in:
Daniel Huigens 2018-06-08 16:40:02 +02:00
parent de2971d84a
commit ddda6a0b16
14 changed files with 285 additions and 287 deletions

View File

@ -72,6 +72,7 @@
"whatwg-fetch": "^2.0.3" "whatwg-fetch": "^2.0.3"
}, },
"dependencies": { "dependencies": {
"@mattiasbuelens/web-streams-polyfill": "0.1.0-alpha.4",
"address-rfc2822": "^2.0.3", "address-rfc2822": "^2.0.3",
"asmcrypto.js": "^0.22.0", "asmcrypto.js": "^0.22.0",
"asn1.js": "^5.0.0", "asn1.js": "^5.0.0",
@ -82,8 +83,7 @@
"hash.js": "^1.1.3", "hash.js": "^1.1.3",
"node-fetch": "^2.1.2", "node-fetch": "^2.1.2",
"node-localstorage": "~1.3.0", "node-localstorage": "~1.3.0",
"pako": "^1.0.6", "pako": "^1.0.6"
"web-streams-polyfill": "^1.3.2"
}, },
"repository": { "repository": {
"type": "git", "type": "git",

View File

@ -212,27 +212,17 @@ function dearmor(input) {
let headersDone; let headersDone;
let text = []; let text = [];
let textDone; let textDone;
let reader;
let controller;
let buffer = '';
let data = base64.decode(stream.transformRaw(input, {
transform: (value, controller) => process(buffer + value, controller),
flush: controller => process(buffer, controller)
}));
let checksum; let checksum;
const checksumVerified = getCheckSum(stream.clone(data)); let data = base64.decode(stream.transformPair(input, async (readable, writable) => {
data = stream.getReader(data).substream(); // Convert to Stream const reader = stream.getReader(readable);
data = stream.transform(data, value => value, async () => { const writer = stream.getWriter(writable);
const checksumVerifiedString = await stream.readToEnd(checksumVerified); while (true) {
if (checksum !== checksumVerifiedString && (checksum || config.checksum_required)) { await writer.ready;
throw new Error("Ascii armor integrity check on message failed: '" + checksum + "' should be '" + let line = await reader.readLine();
checksumVerifiedString + "'"); if (line === undefined) {
writer.abort('Misformed armored text');
break;
} }
});
function process(value, controller) {
const lineEndIndex = value.indexOf('\n') + 1;
if (lineEndIndex) {
let line = value.substr(0, lineEndIndex);
// remove trailing whitespace at end of lines // remove trailing whitespace at end of lines
// remove leading whitespace for compat with older versions of OpenPGP.js // remove leading whitespace for compat with older versions of OpenPGP.js
line = line.trim(); line = line.trim();
@ -265,24 +255,32 @@ function dearmor(input) {
} else { } else {
if (!reSplit.test(line)) { if (!reSplit.test(line)) {
if (line[0] !== '=') { if (line[0] !== '=') {
controller.enqueue(line); writer.write(line);
} else { } else {
checksum = line.substr(1); checksum = line.substr(1);
} }
} else { } else {
controller.close(); writer.close();
return; break;
} }
} }
process(value.substr(lineEndIndex), controller); }
}));
data = stream.transformPair(data, async (readable, writable) => {
const checksumVerified = getCheckSum(stream.clone(readable));
stream.pipe(readable, writable, {
preventClose: true
});
const checksumVerifiedString = await stream.readToEnd(checksumVerified);
const writer = stream.getWriter(writable);
await writer.ready;
if (checksum !== checksumVerifiedString && (checksum || config.checksum_required)) {
writer.abort(new Error("Ascii armor integrity check on message failed: '" + checksum + "' should be '" +
checksumVerifiedString + "'"));
} else { } else {
buffer = value; writer.close();
}
// if (line === undefined) {
// controller.error('Misformed armored text');
// break;
// }
} }
});
} catch(e) { } catch(e) {
reject(e); reject(e);
} }

View File

@ -127,7 +127,7 @@ Message.prototype.decrypt = async function(privateKeys, passwords, sessionKeys)
exception = e; exception = e;
} }
} }
// We don't await stream.cancel here because... it sometimes hangs indefinitely. No clue why. // We don't await stream.cancel here because it only returns when the other copy is canceled too.
stream.cancel(symEncryptedPacket.encrypted); // Don't keep copy of encrypted data in memory. stream.cancel(symEncryptedPacket.encrypted); // Don't keep copy of encrypted data in memory.
symEncryptedPacket.encrypted = null; symEncryptedPacket.encrypted = null;
@ -543,7 +543,7 @@ Message.prototype.verify = async function(keys, date=new Date()) {
throw new Error('Can only verify message with one literal data packet.'); throw new Error('Can only verify message with one literal data packet.');
} }
if (msg.packets.stream) { if (msg.packets.stream) {
let onePassSigList = msg.packets.filterByTag(enums.packet.onePassSignature); const onePassSigList = msg.packets.filterByTag(enums.packet.onePassSignature);
onePassSigList.forEach(onePassSig => { onePassSigList.forEach(onePassSig => {
onePassSig.signatureData = stream.fromAsync(() => new Promise(resolve => { onePassSig.signatureData = stream.fromAsync(() => new Promise(resolve => {
onePassSig.signatureDataResolve = resolve; onePassSig.signatureDataResolve = resolve;

View File

@ -329,7 +329,6 @@ export function encrypt({ data, dataType, publicKeys, privateKeys, passwords, se
if (armor) { if (armor) {
result.data = encrypted.message.armor(); result.data = encrypted.message.armor();
result.data = await convertStream(result.data, asStream); result.data = await convertStream(result.data, asStream);
// result.cancel = stream.cancel.bind(result.data);
} else { } else {
result.message = encrypted.message; result.message = encrypted.message;
} }
@ -365,18 +364,22 @@ export function decrypt({ message, privateKeys, passwords, sessionKeys, publicKe
return asyncProxy.delegate('decrypt', { message, privateKeys, passwords, sessionKeys, publicKeys, format, asStream, signature, date }); return asyncProxy.delegate('decrypt', { message, privateKeys, passwords, sessionKeys, publicKeys, format, asStream, signature, date });
} }
return message.decrypt(privateKeys, passwords, sessionKeys).then(async function(message) { return message.decrypt(privateKeys, passwords, sessionKeys).then(async function(decrypted) {
if (!publicKeys) { if (!publicKeys) {
publicKeys = []; publicKeys = [];
} }
const result = {}; const result = {};
result.signatures = signature ? await message.verifyDetached(signature, publicKeys, date) : await message.verify(publicKeys, date); result.signatures = signature ? await decrypted.verifyDetached(signature, publicKeys, date) : await decrypted.verify(publicKeys, date);
result.data = format === 'binary' ? message.getLiteralData() : message.getText(); result.data = format === 'binary' ? decrypted.getLiteralData() : decrypted.getText();
result.data = await convertStream(result.data, asStream); result.data = await convertStream(result.data, asStream);
result.signatures = await convertStreamArray(result.signatures, asStream); result.signatures = await convertStreamArray(result.signatures, asStream);
result.filename = message.getFilename(); if (asStream) {
// result.cancel = stream.cancel.bind(message.packets); result.data = stream.transformPair(message.packets.stream, async (readable, writable) => {
await stream.pipe(result.data, writable);
});
}
result.filename = decrypted.getFilename();
return result; return result;
}).catch(onError.bind(null, 'Error decrypting message')); }).catch(onError.bind(null, 'Error decrypting message'));
} }
@ -428,7 +431,6 @@ export function sign({ data, dataType, privateKeys, armor=true, asStream, detach
if (armor) { if (armor) {
result.data = message.armor(); result.data = message.armor();
result.data = await convertStream(result.data, asStream); result.data = await convertStream(result.data, asStream);
// result.cancel = stream.cancel.bind(result.data);
} else { } else {
result.message = message; result.message = message;
} }
@ -464,7 +466,6 @@ export function verify({ message, publicKeys, asStream, signature=null, date=new
result.data = message instanceof CleartextMessage ? message.getText() : message.getLiteralData(); result.data = message instanceof CleartextMessage ? message.getText() : message.getLiteralData();
result.data = await convertStream(result.data, asStream); result.data = await convertStream(result.data, asStream);
result.signatures = await convertStreamArray(result.signatures, asStream); result.signatures = await convertStreamArray(result.signatures, asStream);
// result.cancel = stream.cancel.bind(message.packets);
return result; return result;
}).catch(onError.bind(null, 'Error verifying cleartext signed message')); }).catch(onError.bind(null, 'Error verifying cleartext signed message'));
} }

View File

@ -70,15 +70,16 @@ function Compressed() {
* @param {String} bytes Payload of a tag 8 packet * @param {String} bytes Payload of a tag 8 packet
*/ */
Compressed.prototype.read = async function (bytes) { Compressed.prototype.read = async function (bytes) {
const reader = stream.getReader(bytes); await stream.parse(bytes, async reader => {
// One octet that gives the algorithm used to compress the packet. // One octet that gives the algorithm used to compress the packet.
this.algorithm = enums.read(enums.compression, await reader.readByte()); this.algorithm = enums.read(enums.compression, await reader.readByte());
// Compressed data, which makes up the remainder of the packet. // Compressed data, which makes up the remainder of the packet.
this.compressed = reader.substream(); this.compressed = reader.remainder();
await this.decompress(); await this.decompress();
});
}; };

View File

@ -139,7 +139,7 @@ Literal.prototype.getFilename = function() {
* @returns {module:packet.Literal} object representation * @returns {module:packet.Literal} object representation
*/ */
Literal.prototype.read = async function(bytes) { Literal.prototype.read = async function(bytes) {
const reader = stream.getReader(bytes); await stream.parse(bytes, async reader => {
// - A one-octet field that describes how the data is formatted. // - A one-octet field that describes how the data is formatted.
const format = enums.read(enums.literal, await reader.readByte()); const format = enums.read(enums.literal, await reader.readByte());
@ -148,9 +148,10 @@ Literal.prototype.read = async function(bytes) {
this.date = util.readDate(await reader.readBytes(4)); this.date = util.readDate(await reader.readBytes(4));
const data = reader.substream(); const data = reader.remainder();
this.setBytes(data, format); this.setBytes(data, format);
});
}; };
/** /**

View File

@ -259,10 +259,11 @@ export default {
if (controller) { if (controller) {
controller.close(); controller.close();
} }
return !done && value && value.length; return done || !value || !value.length;
} catch(e) { } catch(e) {
if (controller) { if (controller) {
controller.error(e); controller.error(e);
return true;
} else { } else {
throw e; throw e;
} }

View File

@ -36,34 +36,34 @@ function List() {
* @param {Uint8Array} A Uint8Array of bytes. * @param {Uint8Array} A Uint8Array of bytes.
*/ */
List.prototype.read = async function (bytes) { List.prototype.read = async function (bytes) {
this.stream = new ReadableStream({ this.stream = stream.transformPair(bytes, async (readable, writable) => {
pull: async controller => { const writer = stream.getWriter(writable);
try { while (true) {
if (!await packetParser.read(bytes, async parsed => { await writer.ready;
const done = await packetParser.read(readable, async parsed => {
try { try {
const tag = enums.read(enums.packet, parsed.tag); const tag = enums.read(enums.packet, parsed.tag);
const packet = packets.newPacketFromTag(tag); const packet = packets.newPacketFromTag(tag);
packet.packets = new List(); packet.packets = new List();
packet.fromStream = util.isStream(parsed.packet); packet.fromStream = util.isStream(parsed.packet);
await packet.read(parsed.packet); await packet.read(parsed.packet);
controller.enqueue(packet); writer.write(packet);
} catch (e) { } catch (e) {
if (!config.tolerant || if (!config.tolerant ||
parsed.tag === enums.packet.symmetricallyEncrypted || parsed.tag === enums.packet.symmetricallyEncrypted ||
parsed.tag === enums.packet.literal || parsed.tag === enums.packet.literal ||
parsed.tag === enums.packet.compressed) { parsed.tag === enums.packet.compressed) {
controller.error(e); writer.abort(e);
} }
util.print_debug_error(e); util.print_debug_error(e);
} }
})) { });
controller.close(); if (done) {
await writer.ready;
writer.close();
return;
} }
} catch(e) {
controller.error(e);
} }
},
cancel: stream.cancel.bind(bytes)
}); });
// Wait until first few packets have been read // Wait until first few packets have been read

View File

@ -144,23 +144,18 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) {
let cryptedBytes = 0; let cryptedBytes = 0;
let queuedBytes = 0; let queuedBytes = 0;
const iv = this.iv; const iv = this.iv;
let buffer = []; return stream.transformPair(data, async (readable, writable) => {
return stream.transformRaw(data, { const reader = stream.getReader(readable);
transform: process, const writer = stream.getWriter(writable);
flush: controller => process(undefined, controller, true) while (true) {
}); await writer.ready;
async function process(value, controller, final) { let chunk = await reader.readBytes(chunkSize + tagLengthIfDecrypting) || new Uint8Array();
if (!final) buffer.push(value);
while (buffer.reduce(((acc, value) => acc + value.length), 0) >= (final ? 0 : chunkSize) + tagLengthIfDecrypting) {
const bufferConcat = util.concatUint8Array(buffer);
let chunk = bufferConcat.subarray(0, chunkSize + tagLengthIfDecrypting);
buffer = [bufferConcat.subarray(chunkSize + tagLengthIfDecrypting)];
const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting); const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting);
chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting); chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting);
let cryptedPromise; let cryptedPromise;
let done; let done;
if (!chunkIndex || chunk.length) { if (!chunkIndex || chunk.length) {
buffer.unshift(finalChunk); reader.unshift(finalChunk);
cryptedPromise = modeInstance[fn](chunk, mode.getNonce(iv, chunkIndexArray), adataArray); cryptedPromise = modeInstance[fn](chunk, mode.getNonce(iv, chunkIndexArray), adataArray);
} else { } else {
// After the last chunk, we either encrypt a final, empty // After the last chunk, we either encrypt a final, empty
@ -172,22 +167,23 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) {
} }
cryptedBytes += chunk.length - tagLengthIfDecrypting; cryptedBytes += chunk.length - tagLengthIfDecrypting;
queuedBytes += chunk.length - tagLengthIfDecrypting; queuedBytes += chunk.length - tagLengthIfDecrypting;
// eslint-disable-next-line no-loop-func
latestPromise = latestPromise.then(() => cryptedPromise).then(crypted => { latestPromise = latestPromise.then(() => cryptedPromise).then(crypted => {
controller.enqueue(crypted); writer.write(crypted);
queuedBytes -= chunk.length; queuedBytes -= chunk.length;
}).catch(err => controller.error(err)); }).catch(err => writer.abort(err));
// console.log(fn, done, queuedBytes, controller.desiredSize); // console.log(fn, done, queuedBytes, writer.desiredSize);
if (done || queuedBytes > controller.desiredSize) { if (done || queuedBytes > writer.desiredSize) {
await latestPromise; // Respect backpressure await latestPromise; // Respect backpressure
} }
if (!done) { if (!done) {
adataView.setInt32(5 + 4, ++chunkIndex); // Should be setInt64(5, ...) adataView.setInt32(5 + 4, ++chunkIndex); // Should be setInt64(5, ...)
} else { } else {
controller.terminate(); writer.close();
return; break;
}
} }
} }
});
} else { } else {
return modeInstance[fn](await stream.readToEnd(data), this.iv); return modeInstance[fn](await stream.readToEnd(data), this.iv);
} }

View File

@ -65,7 +65,7 @@ function SymEncryptedIntegrityProtected() {
} }
SymEncryptedIntegrityProtected.prototype.read = async function (bytes) { SymEncryptedIntegrityProtected.prototype.read = async function (bytes) {
const reader = stream.getReader(bytes); await stream.parse(bytes, async reader => {
// - A one-octet version number. The only currently defined value is 1. // - A one-octet version number. The only currently defined value is 1.
if (await reader.readByte() !== VERSION) { if (await reader.readByte() !== VERSION) {
@ -75,7 +75,8 @@ SymEncryptedIntegrityProtected.prototype.read = async function (bytes) {
// - Encrypted data, the output of the selected symmetric-key cipher // - Encrypted data, the output of the selected symmetric-key cipher
// operating in Cipher Feedback mode with shift amount equal to the // operating in Cipher Feedback mode with shift amount equal to the
// block size of the cipher (CFB-n where n is the block size). // block size of the cipher (CFB-n where n is the block size).
this.encrypted = reader.substream(); this.encrypted = reader.remainder();
});
}; };
SymEncryptedIntegrityProtected.prototype.write = function () { SymEncryptedIntegrityProtected.prototype.write = function () {

View File

@ -1,7 +1,7 @@
import util from './util'; import util from './util';
// if (typeof ReadableStream === 'undefined') { // if (typeof TransformStream === 'undefined') {
Object.assign(typeof window !== 'undefined' ? window : global, require('web-streams-polyfill')); Object.assign(typeof window !== 'undefined' ? window : global, require('@mattiasbuelens/web-streams-polyfill'));
// } // }
const nodeStream = util.getNodeStream(); const nodeStream = util.getNodeStream();
@ -18,39 +18,31 @@ function toStream(input) {
}); });
} }
function pipeThrough(input, target, options) {
if (!util.isStream(input)) {
input = toStream(input);
}
return input.pipeThrough(target, options);
}
function concat(arrays) { function concat(arrays) {
arrays = arrays.map(toStream); arrays = arrays.map(toStream);
let controller; let outputController;
const transform = new TransformStream({ const transform = {
readable: new ReadableStream({
start(_controller) { start(_controller) {
controller = _controller; outputController = _controller;
}, },
cancel: () => { async cancel(reason) {
return Promise.all(arrays.map(cancel)); await Promise.all(transforms.map(array => cancel(array, reason)));
} }
}); }),
(async () => { writable: new WritableStream({
for (let i = 0; i < arrays.length; i++) { write: outputController.enqueue.bind(outputController),
// await new Promise(resolve => { close: outputController.close.bind(outputController),
try { abort: outputController.error.bind(outputController)
await arrays[i].pipeTo(transform.writable, { })
};
let prev = Promise.resolve();
const transforms = arrays.map((array, i) => transformPair(array, (readable, writable) => {
prev = prev.then(() => pipe(readable, transform.writable, {
preventClose: i !== arrays.length - 1 preventClose: i !== arrays.length - 1
}); }));
} catch(e) { return prev;
console.log(e); }));
// controller.error(e);
return;
}
// });
}
})();
return transform.readable; return transform.readable;
} }
@ -58,6 +50,10 @@ function getReader(input) {
return new Reader(input); return new Reader(input);
} }
function getWriter(input) {
return input.getWriter();
}
function create(options, extraArg) { function create(options, extraArg) {
const promises = new Map(); const promises = new Map();
const wrap = fn => fn && (controller => { const wrap = fn => fn && (controller => {
@ -65,33 +61,32 @@ function create(options, extraArg) {
promises.set(fn, returnValue); promises.set(fn, returnValue);
return returnValue; return returnValue;
}); });
options.options = Object.assign({}, options);
options.start = wrap(options.start); options.start = wrap(options.start);
options.pull = wrap(options.pull); options.pull = wrap(options.pull);
const _cancel = options.cancel;
options.cancel = async reason => {
try {
console.log('cancel wrapper', reason, options);
await promises.get(options.start);
console.log('awaited start');
await promises.get(options.pull);
console.log('awaited pull');
} finally {
if (_cancel) return _cancel.call(options, reason, extraArg);
}
};
options.options = options;
return new ReadableStream(options); return new ReadableStream(options);
} }
function transformRaw(input, options) { async function pipe(input, target, options) {
options.start = controller => { if (!util.isStream(input)) {
if (input.externalBuffer) { input = toStream(input);
input.externalBuffer.forEach(chunk => {
options.transform(chunk, controller);
});
} }
}; if (input.externalBuffer) {
return toStream(input).pipeThrough(new TransformStream(options)); const writer = target.getWriter();
for (let i = 0; i < input.externalBuffer.length; i++) {
await writer.ready;
writer.write(input.externalBuffer[i]);
}
writer.releaseLock();
}
return input.pipeTo(target, options);
}
function transformRaw(input, options) {
options.cancel = cancel.bind(input);
const transformStream = new TransformStream(options);
pipe(input, transformStream.writable);
return transformStream.readable;
} }
function transform(input, process = () => undefined, finish = () => undefined) { function transform(input, process = () => undefined, finish = () => undefined) {
@ -121,6 +116,60 @@ function transform(input, process = () => undefined, finish = () => undefined) {
return result1 !== undefined ? result1 : result2; return result1 !== undefined ? result1 : result2;
} }
function transformPair(input, fn) {
let incomingTransformController;
const incoming = new TransformStream({
start(controller) {
incomingTransformController = controller;
}
});
const canceledErr = new Error('Readable side was canceled.');
const pipeDonePromise = pipe(input, incoming.writable).catch(e => {
if (e !== canceledErr) {
throw e;
}
});
let outputController;
const outgoing = {
readable: new ReadableStream({
start(_controller) {
outputController = _controller;
},
async cancel() {
incomingTransformController.error(canceledErr);
await pipeDonePromise;
}
}),
writable: new WritableStream({
write: outputController.enqueue.bind(outputController),
close: outputController.close.bind(outputController),
abort: outputController.error.bind(outputController)
})
};
Promise.resolve(fn(incoming.readable, outgoing.writable)).catch(e => {
if (e !== canceledErr) {
throw e;
}
});
return outgoing.readable;
}
function parse(input, fn) {
let returnValue;
const transformed = transformPair(input, (readable, writable) => {
const reader = getReader(readable);
reader.remainder = () => {
reader.releaseLock();
pipe(readable, writable);
return transformed;
};
returnValue = fn(reader);
});
return returnValue;
}
function tee(input) { function tee(input) {
if (util.isStream(input)) { if (util.isStream(input)) {
const teed = input.tee(); const teed = input.tee();
@ -162,7 +211,7 @@ function slice(input, begin=0, end=Infinity) {
} }
bytesRead += value.length; bytesRead += value.length;
} else { } else {
controller.close(); controller.terminate();
} }
} }
}); });
@ -199,41 +248,6 @@ function slice(input, begin=0, end=Infinity) {
return input.slice(begin, end); return input.slice(begin, end);
} }
async function parse(input, parser) {
let controller;
const transformed = transformRaw(input, {
start(_controller) {
controller = _controller;
},
cancel: cancel.bind(input)
});
transformed[stream.cancelReadsSym] = controller.error.bind(controller);
toStream(input).pipeTo(target);
const reader = getReader(transformed.readable);
await parser(reader);
new ReadableStream({
start(_controller) {
controller = _controller;
},
pull: () => {
},
cancel: () => {
}
});
new ReadableStream({
pull: () => {
},
cancel: () => {
}
});
}
async function readToEnd(input, join) { async function readToEnd(input, join) {
if (util.isStream(input)) { if (util.isStream(input)) {
return getReader(input).readToEnd(join); return getReader(input).readToEnd(join);
@ -241,9 +255,9 @@ async function readToEnd(input, join) {
return input; return input;
} }
async function cancel(input) { async function cancel(input, reason) {
if (util.isStream(input)) { if (util.isStream(input)) {
return input.cancel(); return input.cancel(reason);
} }
} }
@ -330,52 +344,7 @@ if (nodeStream) {
} }
export default { toStream, concat, getReader, transformRaw, transform, clone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync, readerAcquiredMap }; export default { toStream, concat, getReader, getWriter, pipe, transformRaw, transform, transformPair, parse, clone, slice, readToEnd, cancel, nodeToWeb, webToNode, fromAsync };
const readerAcquiredMap = new Map();
const _getReader = ReadableStream.prototype.getReader;
ReadableStream.prototype.getReader = function() {
if (readerAcquiredMap.has(this)) {
console.error(readerAcquiredMap.get(this));
} else {
readerAcquiredMap.set(this, new Error('Reader for this ReadableStream already acquired here.'));
}
const _this = this;
const reader = _getReader.apply(this, arguments);
const _releaseLock = reader.releaseLock;
reader.releaseLock = function() {
try {
readerAcquiredMap.delete(_this);
} catch(e) {}
return _releaseLock.apply(this, arguments);
};
return reader;
};
const _tee = ReadableStream.prototype.tee;
ReadableStream.prototype.tee = function() {
if (readerAcquiredMap.has(this)) {
console.error(readerAcquiredMap.get(this));
} else {
readerAcquiredMap.set(this, new Error('Reader for this ReadableStream already acquired here.'));
}
return _tee.apply(this, arguments);
};
const _cancel = ReadableStream.prototype.cancel;
ReadableStream.prototype.cancel = function() {
try {
return _cancel.apply(this, arguments);
} finally {
if (readerAcquiredMap.has(this)) {
console.error(readerAcquiredMap.get(this));
} else {
readerAcquiredMap.set(this, new Error('Reader for this ReadableStream already acquired here.'));
}
}
};
const doneReadingSet = new WeakSet(); const doneReadingSet = new WeakSet();
@ -484,25 +453,6 @@ Reader.prototype.unshift = function(...values) {
this.externalBuffer.unshift(...values.filter(value => value && value.length)); this.externalBuffer.unshift(...values.filter(value => value && value.length));
}; };
Reader.prototype.substream = function() {
return Object.assign(create({
pull: async controller => {
const { done, value } = await this.read();
if (!done) {
controller.enqueue(value);
} else {
controller.close();
}
},
cancel: () => {
this.releaseLock();
return cancel(this.stream);
}
}), { from: this.stream });
this.releaseLock();
return this.stream;
};
Reader.prototype.readToEnd = async function(join=util.concat) { Reader.prototype.readToEnd = async function(join=util.concat) {
const result = []; const result = [];
while (true) { while (true) {

View File

@ -84,13 +84,16 @@ export default {
if (value.locked) { if (value.locked) {
obj[key] = null; obj[key] = null;
} else { } else {
const reader = stream.getReader(value); const transformed = stream.transformPair(value, async readable => {
const reader = stream.getReader(readable);
const { port1, port2 } = new MessageChannel(); const { port1, port2 } = new MessageChannel();
port1.onmessage = async function() { port1.onmessage = async function({ data: { action } }) {
port1.postMessage(await reader.read()); if (action === 'read') port1.postMessage(await reader.read());
else if (action === 'cancel') port1.postMessage(await transformed.cancel());
}; };
obj[key] = port2; obj[key] = port2;
collection.push(port2); collection.push(port2);
});
} }
return; return;
} }
@ -115,7 +118,13 @@ export default {
} }
resolve(); resolve();
}; };
value.postMessage(undefined); value.postMessage({ action: 'read' });
});
},
cancel() {
return new Promise(resolve => {
value.onmessage = resolve;
value.postMessage({ action: 'cancel' });
}); });
} }
}); });

View File

@ -6,7 +6,7 @@ chai.use(require('chai-as-promised'));
const { expect } = chai; const { expect } = chai;
const { util } = openpgp; const { stream, util } = openpgp;
const pub_key = const pub_key =
['-----BEGIN PGP PUBLIC KEY BLOCK-----', ['-----BEGIN PGP PUBLIC KEY BLOCK-----',
@ -102,6 +102,7 @@ describe('Streaming', function() {
let i = 0; let i = 0;
const data = new ReadableStream({ const data = new ReadableStream({
async pull(controller) { async pull(controller) {
await new Promise(setTimeout);
if (i++ < 10) { if (i++ < 10) {
let randomBytes = await openpgp.crypto.random.getRandomBytes(1024); let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
controller.enqueue(randomBytes); controller.enqueue(randomBytes);
@ -115,7 +116,7 @@ describe('Streaming', function() {
data, data,
passwords: ['test'], passwords: ['test'],
}); });
expect(await openpgp.stream.getReader(openpgp.stream.clone(encrypted.data)).readBytes(1024)).to.match(/^-----BEGIN PGP MESSAGE-----\r\nVersion: OpenPGP.js VERSION\r\nComment: https:\/\/openpgpjs.org\r\n\r\n/); expect(await openpgp.stream.getReader(openpgp.stream.clone(encrypted.data)).readBytes(1024)).to.match(/^-----BEGIN PGP MESSAGE-----\r\n/);
if (i > 10) throw new Error('Data did not arrive early.'); if (i > 10) throw new Error('Data did not arrive early.');
const msgAsciiArmored = await openpgp.stream.readToEnd(encrypted.data); const msgAsciiArmored = await openpgp.stream.readToEnd(encrypted.data);
const message = await openpgp.message.readArmored(msgAsciiArmored); const message = await openpgp.message.readArmored(msgAsciiArmored);
@ -133,6 +134,7 @@ describe('Streaming', function() {
let canceled = false; let canceled = false;
const data = new ReadableStream({ const data = new ReadableStream({
async pull(controller) { async pull(controller) {
await new Promise(setTimeout);
if (i++ < 10) { if (i++ < 10) {
let randomBytes = await openpgp.crypto.random.getRandomBytes(1024); let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
controller.enqueue(randomBytes); controller.enqueue(randomBytes);
@ -150,9 +152,7 @@ describe('Streaming', function() {
passwords: ['test'], passwords: ['test'],
}); });
const reader = openpgp.stream.getReader(encrypted.data); const reader = openpgp.stream.getReader(encrypted.data);
console.log('read start'); expect(await reader.readBytes(1024)).to.match(/^-----BEGIN PGP MESSAGE-----\r\n/);
expect(await reader.readBytes(1024)).to.match(/^-----BEGIN PGP MESSAGE-----\r\nVersion: OpenPGP.js VERSION\r\nComment: https:\/\/openpgpjs.org\r\n\r\n/);
console.log('read end');
if (i > 10) throw new Error('Data did not arrive early.'); if (i > 10) throw new Error('Data did not arrive early.');
reader.releaseLock(); reader.releaseLock();
await openpgp.stream.cancel(encrypted.data); await openpgp.stream.cancel(encrypted.data);
@ -380,7 +380,7 @@ describe('Streaming', function() {
let i = 0; let i = 0;
const data = new ReadableStream({ const data = new ReadableStream({
async pull(controller) { async pull(controller) {
await new Promise(setTimeout); await new Promise(resolve => setTimeout(resolve, 10));
if (i++ < 10) { if (i++ < 10) {
let randomBytes = await openpgp.crypto.random.getRandomBytes(1024); let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
controller.enqueue(randomBytes); controller.enqueue(randomBytes);
@ -412,6 +412,46 @@ describe('Streaming', function() {
} }
}); });
it('stream.transformPair()', async function() {
let plaintext = [];
let i = 0;
let canceled = false;
let controller;
const data = new ReadableStream({
start(_controller) {
controller = _controller;
},
async pull(controller) {
await new Promise(setTimeout);
if (i++ < 10) {
let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
controller.enqueue(randomBytes);
plaintext.push(randomBytes);
} else {
controller.close();
}
},
cancel() {
canceled = true;
}
});
data.controller = controller;
const transformed = stream.transformPair(stream.slice(data, 0, 5000), async (readable, writable) => {
const reader = stream.getReader(readable);
const writer = stream.getWriter(writable);
while (true) {
await writer.ready;
const { done, value } = await reader.read();
if (done) return writer.close();
writer.write(value);
}
});
await new Promise(resolve => setTimeout(resolve));
await stream.cancel(transformed);
expect(canceled).to.be.true;
});
it('Input stream should be canceled when canceling decrypted stream (draft04)', async function() { it('Input stream should be canceled when canceling decrypted stream (draft04)', async function() {
let aead_protectValue = openpgp.config.aead_protect; let aead_protectValue = openpgp.config.aead_protect;
let aead_chunk_size_byteValue = openpgp.config.aead_chunk_size_byte; let aead_chunk_size_byteValue = openpgp.config.aead_chunk_size_byte;
@ -423,7 +463,7 @@ describe('Streaming', function() {
let canceled = false; let canceled = false;
const data = new ReadableStream({ const data = new ReadableStream({
async pull(controller) { async pull(controller) {
await new Promise(setTimeout); await new Promise(resolve => setTimeout(resolve, 10));
if (i++ < 10) { if (i++ < 10) {
let randomBytes = await openpgp.crypto.random.getRandomBytes(1024); let randomBytes = await openpgp.crypto.random.getRandomBytes(1024);
controller.enqueue(randomBytes); controller.enqueue(randomBytes);
@ -449,11 +489,11 @@ describe('Streaming', function() {
format: 'binary' format: 'binary'
}); });
expect(util.isStream(decrypted.data)).to.be.true; expect(util.isStream(decrypted.data)).to.be.true;
const reader = openpgp.stream.getReader(openpgp.stream.clone(decrypted.data)); const reader = openpgp.stream.getReader(decrypted.data);
expect(await reader.readBytes(1024)).to.deep.equal(plaintext[0]); expect(await reader.readBytes(1024)).to.deep.equal(plaintext[0]);
if (i > 10) throw new Error('Data did not arrive early.'); if (i > 10) throw new Error('Data did not arrive early.');
reader.releaseLock(); reader.releaseLock();
await openpgp.stream.cancel(decrypted.data); await openpgp.stream.cancel(decrypted.data, new Error('canceled by test'));
expect(canceled).to.be.true; expect(canceled).to.be.true;
} finally { } finally {
openpgp.config.aead_protect = aead_protectValue; openpgp.config.aead_protect = aead_protectValue;

View File

@ -5,9 +5,9 @@ if (typeof Symbol === 'undefined') {
if (typeof Promise === 'undefined') { if (typeof Promise === 'undefined') {
require('core-js/fn/promise'); require('core-js/fn/promise');
} }
if (typeof ReadableStream === 'undefined') { // if (typeof TransformStream === 'undefined') {
Object.assign(typeof window !== 'undefined' ? window : global, require('web-streams-polyfill')); Object.assign(typeof window !== 'undefined' ? window : global, require('@mattiasbuelens/web-streams-polyfill'));
} // }
(typeof window !== 'undefined' ? window : global).resolves = function(val) { (typeof window !== 'undefined' ? window : global).resolves = function(val) {
return new Promise(function(res) { res(val); }); return new Promise(function(res) { res(val); });