diff --git a/src/packet/packet.js b/src/packet/packet.js index fc84d28b..00ab90fd 100644 --- a/src/packet/packet.js +++ b/src/packet/packet.js @@ -138,8 +138,8 @@ export default { * @returns {Boolean} Returns false if the stream was empty and parsing is done, and true otherwise. */ read: async function(input, callback) { - let reader = stream.getReader(input); - let controller; + const reader = stream.getReader(input); + let writer; try { const peekedBytes = await reader.peekBytes(2); // some sanity checks @@ -168,42 +168,50 @@ export default { const streaming = this.supportsStreaming(tag); let packet = null; - if (!format) { - // 4.2.1. Old Format Packet Lengths - switch (packet_length_type) { - case 0: - // The packet has a one-octet length. The header is 2 octets - // long. - packet_length = await reader.readByte(); - break; - case 1: - // The packet has a two-octet length. The header is 3 octets - // long. - packet_length = (await reader.readByte() << 8) | await reader.readByte(); - break; - case 2: - // The packet has a four-octet length. The header is 5 - // octets long. - packet_length = (await reader.readByte() << 24) | (await reader.readByte() << 16) | (await reader.readByte() << - 8) | await reader.readByte(); - break; - default: - // 3 - The packet is of indeterminate length. The header is 1 - // octet long, and the implementation must determine how long - // the packet is. If the packet is in a file, this means that - // the packet extends until the end of the file. In general, - // an implementation SHOULD NOT use indeterminate-length - // packets except where the end of the data will be clear - // from the context, and even then it is better to use a - // definite length, or a new format header. The new format - // headers described below have a mechanism for precisely - // encoding data of indeterminate length. - packet_length = Infinity; - break; - } - } else { // 4.2.2. New Format Packet Lengths - let wasPartialLength; - do { + let callbackReturned; + if (streaming) { + const transform = new TransformStream(); + writer = stream.getWriter(transform.writable); + packet = transform.readable; + callbackReturned = callback({ tag, packet }); + } + + let wasPartialLength; + do { + if (!format) { + // 4.2.1. Old Format Packet Lengths + switch (packet_length_type) { + case 0: + // The packet has a one-octet length. The header is 2 octets + // long. + packet_length = await reader.readByte(); + break; + case 1: + // The packet has a two-octet length. The header is 3 octets + // long. + packet_length = (await reader.readByte() << 8) | await reader.readByte(); + break; + case 2: + // The packet has a four-octet length. The header is 5 + // octets long. + packet_length = (await reader.readByte() << 24) | (await reader.readByte() << 16) | (await reader.readByte() << + 8) | await reader.readByte(); + break; + default: + // 3 - The packet is of indeterminate length. The header is 1 + // octet long, and the implementation must determine how long + // the packet is. If the packet is in a file, this means that + // the packet extends until the end of the file. In general, + // an implementation SHOULD NOT use indeterminate-length + // packets except where the end of the data will be clear + // from the context, and even then it is better to use a + // definite length, or a new format header. The new format + // headers described below have a mechanism for precisely + // encoding data of indeterminate length. + packet_length = Infinity; + break; + } + } else { // 4.2.2. New Format Packet Lengths // 4.2.2.1. One-Octet Lengths const lengthByte = await reader.readByte(); wasPartialLength = false; @@ -218,51 +226,47 @@ export default { wasPartialLength = true; if (!streaming) { throw new TypeError('This packet type does not support partial lengths.'); - } else if (!packet) { - packet = new ReadableStream({ - // eslint-disable-next-line no-loop-func - async start(_controller) { - controller = _controller; - }, - cancel: stream.cancel.bind(input) - }); - callback({ tag, packet }); } // 4.2.2.3. Five-Octet Lengths } else { packet_length = (await reader.readByte() << 24) | (await reader.readByte() << 16) | (await reader.readByte() << 8) | await reader.readByte(); } - if (controller) { - controller.enqueue(await reader.readBytes(packet_length)); - } - } while(wasPartialLength); - } - - if (!packet) { - if (streaming) { - // Send the remainder of the packet to the callback as a stream - reader.releaseLock(); - packet = stream.slice(stream.clone(input), 0, packet_length); - await callback({ tag, packet }); - - // Read the entire packet before parsing the next one - reader = stream.getReader(input); - await reader.readBytes(packet_length); - } else { - packet = await reader.readBytes(packet_length); - await callback({ tag, packet }); } + if (writer) { + let bytesRead = 0; + while (true) { + await writer.ready; + const { done, value } = await reader.read(); + if (done) { + if (packet_length === Infinity) break; + throw new Error('Unexpected end of packet'); + } + await writer.write(value.slice(0, packet_length - bytesRead)); + bytesRead += value.length; + if (bytesRead >= packet_length) { + reader.unshift(value.slice(packet_length - bytesRead + value.length)); + break; + } + } + } + } while(wasPartialLength); + + if (!streaming) { + packet = await reader.readBytes(packet_length); + await callback({ tag, packet }); } const { done, value } = await reader.read(); if (!done) reader.unshift(value); - if (controller) { - controller.close(); + if (writer) { + await writer.ready; + await writer.close(); } + if (streaming) await callbackReturned; return done || !value || !value.length; } catch(e) { - if (controller) { - controller.error(e); + if (writer) { + writer.abort(e); return true; } else { throw e; diff --git a/src/packet/packetlist.js b/src/packet/packetlist.js index 8351ca1b..fcedb6ee 100644 --- a/src/packet/packetlist.js +++ b/src/packet/packetlist.js @@ -47,7 +47,7 @@ List.prototype.read = async function (bytes) { packet.packets = new List(); packet.fromStream = util.isStream(parsed.packet); await packet.read(parsed.packet); - writer.write(packet); + await writer.write(packet); } catch (e) { if (!config.tolerant || parsed.tag === enums.packet.symmetricallyEncrypted ||