Don't clone stream in packet.parse()

This commit is contained in:
Daniel Huigens 2018-06-12 18:55:00 +02:00
parent ddda6a0b16
commit 589b666ac7
2 changed files with 75 additions and 71 deletions

View File

@ -138,8 +138,8 @@ export default {
* @returns {Boolean} Returns false if the stream was empty and parsing is done, and true otherwise. * @returns {Boolean} Returns false if the stream was empty and parsing is done, and true otherwise.
*/ */
read: async function(input, callback) { read: async function(input, callback) {
let reader = stream.getReader(input); const reader = stream.getReader(input);
let controller; let writer;
try { try {
const peekedBytes = await reader.peekBytes(2); const peekedBytes = await reader.peekBytes(2);
// some sanity checks // some sanity checks
@ -168,42 +168,50 @@ export default {
const streaming = this.supportsStreaming(tag); const streaming = this.supportsStreaming(tag);
let packet = null; let packet = null;
if (!format) { let callbackReturned;
// 4.2.1. Old Format Packet Lengths if (streaming) {
switch (packet_length_type) { const transform = new TransformStream();
case 0: writer = stream.getWriter(transform.writable);
// The packet has a one-octet length. The header is 2 octets packet = transform.readable;
// long. callbackReturned = callback({ tag, packet });
packet_length = await reader.readByte(); }
break;
case 1: let wasPartialLength;
// The packet has a two-octet length. The header is 3 octets do {
// long. if (!format) {
packet_length = (await reader.readByte() << 8) | await reader.readByte(); // 4.2.1. Old Format Packet Lengths
break; switch (packet_length_type) {
case 2: case 0:
// The packet has a four-octet length. The header is 5 // The packet has a one-octet length. The header is 2 octets
// octets long. // long.
packet_length = (await reader.readByte() << 24) | (await reader.readByte() << 16) | (await reader.readByte() << packet_length = await reader.readByte();
8) | await reader.readByte(); break;
break; case 1:
default: // The packet has a two-octet length. The header is 3 octets
// 3 - The packet is of indeterminate length. The header is 1 // long.
// octet long, and the implementation must determine how long packet_length = (await reader.readByte() << 8) | await reader.readByte();
// the packet is. If the packet is in a file, this means that break;
// the packet extends until the end of the file. In general, case 2:
// an implementation SHOULD NOT use indeterminate-length // The packet has a four-octet length. The header is 5
// packets except where the end of the data will be clear // octets long.
// from the context, and even then it is better to use a packet_length = (await reader.readByte() << 24) | (await reader.readByte() << 16) | (await reader.readByte() <<
// definite length, or a new format header. The new format 8) | await reader.readByte();
// headers described below have a mechanism for precisely break;
// encoding data of indeterminate length. default:
packet_length = Infinity; // 3 - The packet is of indeterminate length. The header is 1
break; // octet long, and the implementation must determine how long
} // the packet is. If the packet is in a file, this means that
} else { // 4.2.2. New Format Packet Lengths // the packet extends until the end of the file. In general,
let wasPartialLength; // an implementation SHOULD NOT use indeterminate-length
do { // packets except where the end of the data will be clear
// from the context, and even then it is better to use a
// definite length, or a new format header. The new format
// headers described below have a mechanism for precisely
// encoding data of indeterminate length.
packet_length = Infinity;
break;
}
} else { // 4.2.2. New Format Packet Lengths
// 4.2.2.1. One-Octet Lengths // 4.2.2.1. One-Octet Lengths
const lengthByte = await reader.readByte(); const lengthByte = await reader.readByte();
wasPartialLength = false; wasPartialLength = false;
@ -218,51 +226,47 @@ export default {
wasPartialLength = true; wasPartialLength = true;
if (!streaming) { if (!streaming) {
throw new TypeError('This packet type does not support partial lengths.'); throw new TypeError('This packet type does not support partial lengths.');
} else if (!packet) {
packet = new ReadableStream({
// eslint-disable-next-line no-loop-func
async start(_controller) {
controller = _controller;
},
cancel: stream.cancel.bind(input)
});
callback({ tag, packet });
} }
// 4.2.2.3. Five-Octet Lengths // 4.2.2.3. Five-Octet Lengths
} else { } else {
packet_length = (await reader.readByte() << 24) | (await reader.readByte() << 16) | (await reader.readByte() << packet_length = (await reader.readByte() << 24) | (await reader.readByte() << 16) | (await reader.readByte() <<
8) | await reader.readByte(); 8) | await reader.readByte();
} }
if (controller) {
controller.enqueue(await reader.readBytes(packet_length));
}
} while(wasPartialLength);
}
if (!packet) {
if (streaming) {
// Send the remainder of the packet to the callback as a stream
reader.releaseLock();
packet = stream.slice(stream.clone(input), 0, packet_length);
await callback({ tag, packet });
// Read the entire packet before parsing the next one
reader = stream.getReader(input);
await reader.readBytes(packet_length);
} else {
packet = await reader.readBytes(packet_length);
await callback({ tag, packet });
} }
if (writer) {
let bytesRead = 0;
while (true) {
await writer.ready;
const { done, value } = await reader.read();
if (done) {
if (packet_length === Infinity) break;
throw new Error('Unexpected end of packet');
}
await writer.write(value.slice(0, packet_length - bytesRead));
bytesRead += value.length;
if (bytesRead >= packet_length) {
reader.unshift(value.slice(packet_length - bytesRead + value.length));
break;
}
}
}
} while(wasPartialLength);
if (!streaming) {
packet = await reader.readBytes(packet_length);
await callback({ tag, packet });
} }
const { done, value } = await reader.read(); const { done, value } = await reader.read();
if (!done) reader.unshift(value); if (!done) reader.unshift(value);
if (controller) { if (writer) {
controller.close(); await writer.ready;
await writer.close();
} }
if (streaming) await callbackReturned;
return done || !value || !value.length; return done || !value || !value.length;
} catch(e) { } catch(e) {
if (controller) { if (writer) {
controller.error(e); writer.abort(e);
return true; return true;
} else { } else {
throw e; throw e;

View File

@ -47,7 +47,7 @@ List.prototype.read = async function (bytes) {
packet.packets = new List(); packet.packets = new List();
packet.fromStream = util.isStream(parsed.packet); packet.fromStream = util.isStream(parsed.packet);
await packet.read(parsed.packet); await packet.read(parsed.packet);
writer.write(packet); await writer.write(packet);
} catch (e) { } catch (e) {
if (!config.tolerant || if (!config.tolerant ||
parsed.tag === enums.packet.symmetricallyEncrypted || parsed.tag === enums.packet.symmetricallyEncrypted ||