diff --git a/src/encoding/armor.js b/src/encoding/armor.js index 3ebb1d3e..09ecd06e 100644 --- a/src/encoding/armor.js +++ b/src/encoding/armor.js @@ -212,57 +212,69 @@ function dearmor(input) { let headersDone; let text = []; let textDone; + let resolved = false; let checksum; let data = base64.decode(stream.transformPair(input, async (readable, writable) => { const reader = stream.getReader(readable); const writer = stream.getWriter(writable); while (true) { - await writer.ready; - let line = await reader.readLine(); - if (line === undefined) { - writer.abort('Misformed armored text'); - break; - } - // remove trailing whitespace at end of lines - // remove leading whitespace for compat with older versions of OpenPGP.js - line = line.trim(); - if (!type) { - if (reSplit.test(line)) { - type = getType(line); + if (resolved) await writer.ready; + try { + let line = await reader.readLine(); + if (line === undefined) { + throw new Error('Misformed armored text'); } - } else if (!headersDone) { - if (reSplit.test(line)) { - reject(new Error('Mandatory blank line missing between armor headers and armor data')); - } - if (!reEmptyLine.test(line)) { - lastHeaders.push(line); - } else { - verifyHeaders(lastHeaders); - headersDone = true; - if (textDone || type !== 2) resolve({ text, data, headers, type }); - } - } else if (!textDone && type === 2) { - if (!reSplit.test(line)) { - // Reverse dash-escaping for msg - text.push(line.replace(/^- /, '')); - } else { - text = text.join('\r\n'); - textDone = true; - verifyHeaders(lastHeaders); - lastHeaders = []; - headersDone = false; - } - } else { - if (!reSplit.test(line)) { - if (line[0] !== '=') { - writer.write(line); + // remove trailing whitespace at end of lines + // remove leading whitespace for compat with older versions of OpenPGP.js + line = line.trim(); + if (!type) { + if (reSplit.test(line)) { + type = getType(line); + } + } else if (!headersDone) { + if (reSplit.test(line)) { + reject(new Error('Mandatory blank line missing between armor headers and armor data')); + } + if (!reEmptyLine.test(line)) { + lastHeaders.push(line); } else { - checksum = line.substr(1); + verifyHeaders(lastHeaders); + headersDone = true; + if (textDone || type !== 2) { + resolve({ text, data, headers, type }); + resolved = true; + } + } + } else if (!textDone && type === 2) { + if (!reSplit.test(line)) { + // Reverse dash-escaping for msg + text.push(line.replace(/^- /, '')); + } else { + text = text.join('\r\n'); + textDone = true; + verifyHeaders(lastHeaders); + lastHeaders = []; + headersDone = false; } } else { - writer.close(); - break; + if (!reSplit.test(line)) { + if (line[0] !== '=') { + await writer.write(line); + } else { + checksum = line.substr(1); + } + } else { + await writer.close(); + break; + } } + } catch(e) { + if (resolved) { + await writer.abort(e); + } else { + reject(e); + } + break; } } })); @@ -275,10 +287,10 @@ function dearmor(input) { const writer = stream.getWriter(writable); await writer.ready; if (checksum !== checksumVerifiedString && (checksum || config.checksum_required)) { - writer.abort(new Error("Ascii armor integrity check on message failed: '" + checksum + "' should be '" + + await writer.abort(new Error("Ascii armor integrity check on message failed: '" + checksum + "' should be '" + checksumVerifiedString + "'")); } else { - writer.close(); + await writer.close(); } }); } catch(e) { diff --git a/src/packet/packetlist.js b/src/packet/packetlist.js index fcedb6ee..a96b3276 100644 --- a/src/packet/packetlist.js +++ b/src/packet/packetlist.js @@ -38,31 +38,35 @@ function List() { List.prototype.read = async function (bytes) { this.stream = stream.transformPair(bytes, async (readable, writable) => { const writer = stream.getWriter(writable); - while (true) { - await writer.ready; - const done = await packetParser.read(readable, async parsed => { - try { - const tag = enums.read(enums.packet, parsed.tag); - const packet = packets.newPacketFromTag(tag); - packet.packets = new List(); - packet.fromStream = util.isStream(parsed.packet); - await packet.read(parsed.packet); - await writer.write(packet); - } catch (e) { - if (!config.tolerant || - parsed.tag === enums.packet.symmetricallyEncrypted || - parsed.tag === enums.packet.literal || - parsed.tag === enums.packet.compressed) { - writer.abort(e); - } - util.print_debug_error(e); - } - }); - if (done) { + try { + while (true) { await writer.ready; - writer.close(); - return; + const done = await packetParser.read(readable, async parsed => { + try { + const tag = enums.read(enums.packet, parsed.tag); + const packet = packets.newPacketFromTag(tag); + packet.packets = new List(); + packet.fromStream = util.isStream(parsed.packet); + await packet.read(parsed.packet); + await writer.write(packet); + } catch (e) { + if (!config.tolerant || + parsed.tag === enums.packet.symmetricallyEncrypted || + parsed.tag === enums.packet.literal || + parsed.tag === enums.packet.compressed) { + await writer.abort(e); + } + util.print_debug_error(e); + } + }); + if (done) { + await writer.ready; + await writer.close(); + return; + } } + } catch(e) { + await writer.abort(e); } }); diff --git a/src/packet/sym_encrypted_aead_protected.js b/src/packet/sym_encrypted_aead_protected.js index 2458c879..785d8cd7 100644 --- a/src/packet/sym_encrypted_aead_protected.js +++ b/src/packet/sym_encrypted_aead_protected.js @@ -147,41 +147,45 @@ SymEncryptedAEADProtected.prototype.crypt = async function (fn, key, data) { return stream.transformPair(data, async (readable, writable) => { const reader = stream.getReader(readable); const writer = stream.getWriter(writable); - while (true) { - await writer.ready; - let chunk = await reader.readBytes(chunkSize + tagLengthIfDecrypting) || new Uint8Array(); - const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting); - chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting); - let cryptedPromise; - let done; - if (!chunkIndex || chunk.length) { - reader.unshift(finalChunk); - cryptedPromise = modeInstance[fn](chunk, mode.getNonce(iv, chunkIndexArray), adataArray); - } else { - // After the last chunk, we either encrypt a final, empty - // data chunk to get the final authentication tag or - // validate that final authentication tag. - adataView.setInt32(13 + 4, cryptedBytes); // Should be setInt64(13, ...) - cryptedPromise = modeInstance[fn](finalChunk, mode.getNonce(iv, chunkIndexArray), adataTagArray); - done = true; - } - cryptedBytes += chunk.length - tagLengthIfDecrypting; - queuedBytes += chunk.length - tagLengthIfDecrypting; - // eslint-disable-next-line no-loop-func - latestPromise = latestPromise.then(() => cryptedPromise).then(crypted => { - writer.write(crypted); - queuedBytes -= chunk.length; - }).catch(err => writer.abort(err)); - // console.log(fn, done, queuedBytes, writer.desiredSize); - if (done || queuedBytes > writer.desiredSize) { - await latestPromise; // Respect backpressure - } - if (!done) { - adataView.setInt32(5 + 4, ++chunkIndex); // Should be setInt64(5, ...) - } else { - writer.close(); - break; + try { + while (true) { + await writer.ready; + let chunk = await reader.readBytes(chunkSize + tagLengthIfDecrypting) || new Uint8Array(); + const finalChunk = chunk.subarray(chunk.length - tagLengthIfDecrypting); + chunk = chunk.subarray(0, chunk.length - tagLengthIfDecrypting); + let cryptedPromise; + let done; + if (!chunkIndex || chunk.length) { + reader.unshift(finalChunk); + cryptedPromise = modeInstance[fn](chunk, mode.getNonce(iv, chunkIndexArray), adataArray); + } else { + // After the last chunk, we either encrypt a final, empty + // data chunk to get the final authentication tag or + // validate that final authentication tag. + adataView.setInt32(13 + 4, cryptedBytes); // Should be setInt64(13, ...) + cryptedPromise = modeInstance[fn](finalChunk, mode.getNonce(iv, chunkIndexArray), adataTagArray); + done = true; + } + cryptedBytes += chunk.length - tagLengthIfDecrypting; + queuedBytes += chunk.length - tagLengthIfDecrypting; + // eslint-disable-next-line no-loop-func + latestPromise = latestPromise.then(() => cryptedPromise).then(async crypted => { + await writer.write(crypted); + queuedBytes -= chunk.length; + }).catch(err => writer.abort(err)); + // console.log(fn, done, queuedBytes, writer.desiredSize); + if (done || queuedBytes > writer.desiredSize) { + await latestPromise; // Respect backpressure + } + if (!done) { + adataView.setInt32(5 + 4, ++chunkIndex); // Should be setInt64(5, ...) + } else { + await writer.close(); + break; + } } + } catch(e) { + await writer.abort(e); } }); } else { diff --git a/src/stream.js b/src/stream.js index c003a6fb..3d028df4 100644 --- a/src/stream.js +++ b/src/stream.js @@ -53,7 +53,7 @@ async function pipe(input, target, options) { } writer.releaseLock(); } - return input.pipeTo(target, options); + return input.pipeTo(target, options).catch(function() {}); } function transformRaw(input, options) { @@ -125,22 +125,13 @@ function transformPair(input, fn) { } }); - const canceledErr = new Error('Readable side was canceled.'); - const pipeDonePromise = pipe(input, incoming.writable).catch(e => { - if (e !== canceledErr) { - throw e; - } - }); + const pipeDonePromise = pipe(input, incoming.writable); const outgoing = transformWithCancel(async function() { - incomingTransformController.error(canceledErr); + incomingTransformController.error(new Error('Readable side was canceled.')); await pipeDonePromise; }); - Promise.resolve(fn(incoming.readable, outgoing.writable)).catch(e => { - if (e !== canceledErr) { - throw e; - } - }); + fn(incoming.readable, outgoing.writable); return outgoing.readable; } @@ -183,16 +174,20 @@ function passiveClone(input) { const transformed = transformPair(input, async (readable, writable) => { const reader = getReader(readable); const writer = getWriter(writable); - while (true) { - await writer.ready; - const { done, value } = await reader.read(); - if (done) { - try { controller.close(); } catch(e) {} - await writer.close(); - return; + try { + while (true) { + await writer.ready; + const { done, value } = await reader.read(); + if (done) { + try { controller.close(); } catch(e) {} + await writer.close(); + return; + } + try { controller.enqueue(value); } catch(e) {} + await writer.write(value); } - try { controller.enqueue(value); } catch(e) {} - await writer.write(value); + } catch(e) { + await writer.abort(e); } }); overwrite(input, transformed); diff --git a/test/general/streaming.js b/test/general/streaming.js index b9baf718..ba73685e 100644 --- a/test/general/streaming.js +++ b/test/general/streaming.js @@ -416,11 +416,7 @@ describe('Streaming', function() { let plaintext = []; let i = 0; let canceled = false; - let controller; const data = new ReadableStream({ - start(_controller) { - controller = _controller; - }, async pull(controller) { await new Promise(setTimeout); if (i++ < 10) { @@ -435,16 +431,22 @@ describe('Streaming', function() { canceled = true; } }); - data.controller = controller; const transformed = stream.transformPair(stream.slice(data, 0, 5000), async (readable, writable) => { const reader = stream.getReader(readable); const writer = stream.getWriter(writable); - while (true) { - await writer.ready; - const { done, value } = await reader.read(); - if (done) return writer.close(); - writer.write(value); + try { + while (true) { + await writer.ready; + const { done, value } = await reader.read(); + if (done) { + await writer.close(); + break; + } + await writer.write(value); + } + } catch(e) { + await writer.abort(e); } }); await new Promise(resolve => setTimeout(resolve));