Optimize reading large messages with lots of tiny partial body parts (#864)

* Fix pako decompression

* Optimize base64-decoding

* Don't stream-parse packets when not stream-reading data
Author: Daniel Huigens, 2019-02-21 17:33:55 +01:00 (committed by Sanjana Rajan)
parent 54fc1dde3d
commit 5dcaf85f5a
7 changed files with 36 additions and 24 deletions

src/encoding/base64.js

@@ -21,6 +21,13 @@ import stream from 'web-stream-tools';
 const b64s = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'; // Standard radix-64
 const b64u = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_'; // URL-safe radix-64
+const b64toByte = [];
+for (let i = 0; i < b64s.length; i++) {
+  b64toByte[b64s.charCodeAt(i)] = i;
+}
+b64toByte[b64u.charCodeAt(62)] = 62;
+b64toByte[b64u.charCodeAt(63)] = 63;
 /**
  * Convert binary array to radix-64
  * @param {Uint8Array | ReadableStream<Uint8Array>} t Uint8Array to convert
@@ -98,7 +105,6 @@ function s2r(t, u = false) {
  */
 function r2s(t, u) {
   // TODO check atob alternative
-  const b64 = u ? b64u : b64s;
   let c;
   let s = 0;
@@ -109,7 +115,7 @@ function r2s(t, u) {
   const r = new Uint8Array(Math.ceil(0.75 * tl));
   let index = 0;
   for (let n = 0; n < tl; n++) {
-    c = b64.indexOf(value.charAt(n));
+    c = b64toByte[value.charCodeAt(n)];
     if (c >= 0) {
       if (s) {
         r[index++] = a | ((c >> (6 - s)) & 255);
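
Note: the change above replaces a per-character b64.indexOf() scan with a single array lookup keyed by char code; because one table covers both alphabets, the old `const b64 = u ? b64u : b64s;` selection can be dropped. Below is a self-contained sketch of the same lookup-table technique (an illustration only, not the library's r2s internals):

// Build the lookup table once: char code -> 6-bit value, shared by the
// standard and URL-safe alphabets.
const b64s = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/';
const b64u = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_';
const b64toByte = [];
for (let i = 0; i < b64s.length; i++) {
  b64toByte[b64s.charCodeAt(i)] = i;
}
b64toByte[b64u.charCodeAt(62)] = 62; // '-' decodes like '+'
b64toByte[b64u.charCodeAt(63)] = 63; // '_' decodes like '/'

// Minimal decoder using the table; characters outside the table ('=',
// whitespace) come back as undefined and are skipped, mirroring the
// c >= 0 guard in the diff above.
function decode(str) {
  const out = new Uint8Array(Math.ceil(0.75 * str.length));
  let acc = 0;
  let bits = 0;
  let index = 0;
  for (let n = 0; n < str.length; n++) {
    const c = b64toByte[str.charCodeAt(n)];
    if (c === undefined) continue;
    acc = (acc << 6) | c;   // accumulate 6 bits per base64 character
    bits += 6;
    if (bits >= 8) {        // emit a byte whenever 8 bits are available
      bits -= 8;
      out[index++] = (acc >> bits) & 0xFF;
    }
  }
  return out.subarray(0, index);
}

console.log(decode('TWFu')); // Uint8Array [ 77, 97, 110 ] ("Man")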

src/message.js

@@ -761,7 +761,7 @@ export async function read(input, fromStream=util.isStream(input)) {
     input = stream.nodeToWeb(input);
   }
   const packetlist = new packet.List();
-  await packetlist.read(input);
+  await packetlist.read(input, fromStream);
   const message = new Message(packetlist);
   message.fromStream = fromStream;
   return message;
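
For context, the fromStream flag now threaded into packetlist.read() defaults to util.isStream(input), so callers passing a plain Uint8Array get the buffered (non-streaming) parsing path automatically. A hedged usage skeleton, assuming the openpgp.js v4 API this change landed in:

const openpgp = require('openpgp'); // v4 line, where openpgp.message.read(input, fromStream) lives

(async () => {
  // A plain Uint8Array: util.isStream(input) is false, so packet bodies are
  // buffered instead of stream-parsed.
  const binaryMessage = new Uint8Array([/* ... binary OpenPGP message bytes ... */]);
  const msg = await openpgp.message.read(binaryMessage);
  console.log(msg.packets.length);
})();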

src/packet/compressed.js

@@ -69,7 +69,7 @@ function Compressed() {
  * Parsing function for the packet.
  * @param {Uint8Array | ReadableStream<Uint8Array>} bytes Payload of a tag 8 packet
  */
-Compressed.prototype.read = async function (bytes) {
+Compressed.prototype.read = async function (bytes, streaming) {
   await stream.parse(bytes, async reader => {
     // One octet that gives the algorithm used to compress the packet.
@@ -78,7 +78,7 @@ Compressed.prototype.read = async function (bytes) {
     // Compressed data, which makes up the remainder of the packet.
     this.compressed = reader.remainder();
-    await this.decompress();
+    await this.decompress(streaming);
   });
 };
@@ -100,13 +100,13 @@ Compressed.prototype.write = function () {
  * Decompression method for decompressing the compressed data
  * read by read_packet
  */
-Compressed.prototype.decompress = async function () {
+Compressed.prototype.decompress = async function (streaming) {
   if (!decompress_fns[this.algorithm]) {
     throw new Error(this.algorithm + ' decompression not supported');
   }
-  await this.packets.read(decompress_fns[this.algorithm](this.compressed));
+  await this.packets.read(decompress_fns[this.algorithm](this.compressed), streaming);
 };
 /**
@@ -147,8 +147,10 @@ function pako_zlib(constructor, options = {}) {
       return obj.result;
     }
   }, () => {
+    if (constructor === pako.Deflate) {
       obj.push([], pako.Z_FINISH);
       return obj.result;
+    }
   });
  };
}
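
The guard added in the last hunk presumably matters because only the deflate direction needs an explicit final flush: an inflator finishes on its own once it reaches the end-of-stream marker embedded in the compressed data, so pushing a trailing empty Z_FINISH chunk into it is at best redundant. A standalone pako sketch of that asymmetry (an illustration, not the openpgp.js pako_zlib wrapper):

const pako = require('pako');

// Deflate: intermediate chunks are flushed with Z_SYNC_FLUSH, but the stream
// is only complete after an explicit Z_FINISH push.
const deflator = new pako.Deflate();
deflator.push(new Uint8Array([1, 2, 3]), pako.Z_SYNC_FLUSH);
deflator.push(new Uint8Array([4, 5, 6]), pako.Z_FINISH);
const compressed = deflator.result;

// Inflate: the end-of-stream marker is part of the compressed data itself,
// so no extra empty push is needed to terminate the stream.
const inflator = new pako.Inflate();
inflator.push(compressed);
console.log(inflator.result); // Uint8Array [ 1, 2, 3, 4, 5, 6 ]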

src/packet/packet.js

@@ -137,7 +137,7 @@ export default {
    * @param {Function} callback Function to call with the parsed packet
    * @returns {Boolean} Returns false if the stream was empty and parsing is done, and true otherwise.
    */
-  read: async function(input, callback) {
+  read: async function(input, streaming, callback) {
     const reader = stream.getReader(input);
     let writer;
     try {
@@ -166,14 +166,16 @@
         packet_length_type = headerByte & 0x03; // bit 1-0
       }
-      const streaming = this.supportsStreaming(tag);
+      const supportsStreaming = this.supportsStreaming(tag);
       let packet = null;
       let callbackReturned;
-      if (streaming) {
+      if (streaming && supportsStreaming) {
         const transform = new TransformStream();
         writer = stream.getWriter(transform.writable);
         packet = transform.readable;
         callbackReturned = callback({ tag, packet });
+      } else {
+        packet = [];
       }
       let wasPartialLength;
@@ -224,7 +226,7 @@
         } else if (lengthByte > 223 && lengthByte < 255) {
           packet_length = 1 << (lengthByte & 0x1F);
           wasPartialLength = true;
-          if (!streaming) {
+          if (!supportsStreaming) {
             throw new TypeError('This packet type does not support partial lengths.');
           }
         // 4.2.2.3. Five-Octet Lengths
@@ -233,35 +235,37 @@
             8) | await reader.readByte();
           }
         }
-        if (writer && packet_length > 0) {
+        if (packet_length >= 0) {
           let bytesRead = 0;
           while (true) {
-            await writer.ready;
+            if (writer) await writer.ready;
             const { done, value } = await reader.read();
             if (done) {
               if (packet_length === Infinity) break;
               throw new Error('Unexpected end of packet');
             }
-            await writer.write(value.slice(0, packet_length - bytesRead));
+            const chunk = value.subarray(0, packet_length - bytesRead);
+            if (writer) await writer.write(chunk);
+            else packet.push(chunk);
             bytesRead += value.length;
             if (bytesRead >= packet_length) {
-              reader.unshift(value.slice(packet_length - bytesRead + value.length));
+              reader.unshift(value.subarray(packet_length - bytesRead + value.length));
               break;
             }
           }
         }
       } while(wasPartialLength);
-      if (!streaming) {
-        packet = await reader.readBytes(packet_length);
+      if (!writer) {
+        packet = util.concatUint8Array(packet);
         await callback({ tag, packet });
       }
       const nextPacket = await reader.peekBytes(2);
       if (writer) {
         await writer.ready;
         await writer.close();
+        await callbackReturned;
       }
-      if (streaming) await callbackReturned;
       return !nextPacket || !nextPacket.length;
     } catch(e) {
       if (writer) {
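
In the new non-streaming branch (no writer), body chunks are pushed onto an array as they arrive and joined once with util.concatUint8Array, instead of being piped through a TransformStream even when the caller is not stream-reading the data. A small sketch of why that helps for messages made of many tiny partial body parts, using a concat helper analogous to util.concatUint8Array (illustration, not the library code):

// Join all chunks with a single allocation.
function concatUint8Array(chunks) {
  let total = 0;
  for (const chunk of chunks) total += chunk.length;
  const result = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    result.set(chunk, offset);
    offset += chunk.length;
  }
  return result;
}

// e.g. a packet split into 10,000 partial body parts of 512 bytes each:
// the chunks are collected as-is during parsing and concatenated once at
// the end, rather than flowing through a stream transform chunk by chunk.
const parts = Array.from({ length: 10000 }, () => new Uint8Array(512));
const packetBody = concatUint8Array(parts);
console.log(packetBody.length); // 5120000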

src/packet/packetlist.js

@@ -38,19 +38,19 @@ List.prototype = [];
  * Reads a stream of binary data and interprents it as a list of packets.
  * @param {Uint8Array | ReadableStream<Uint8Array>} A Uint8Array of bytes.
  */
-List.prototype.read = async function (bytes) {
+List.prototype.read = async function (bytes, streaming) {
   this.stream = stream.transformPair(bytes, async (readable, writable) => {
     const writer = stream.getWriter(writable);
     try {
       while (true) {
         await writer.ready;
-        const done = await packetParser.read(readable, async parsed => {
+        const done = await packetParser.read(readable, streaming, async parsed => {
           try {
             const tag = enums.read(enums.packet, parsed.tag);
             const packet = packets.newPacketFromTag(tag);
             packet.packets = new List();
             packet.fromStream = util.isStream(parsed.packet);
-            await packet.read(parsed.packet);
+            await packet.read(parsed.packet, streaming);
             await writer.write(packet);
           } catch (e) {
             if (!config.tolerant || packetParser.supportsStreaming(parsed.tag)) {
@@ -82,7 +82,7 @@ List.prototype.read = async function (bytes) {
       } else {
         this.stream = null;
       }
-      if (done || value.fromStream) {
+      if (done || packetParser.supportsStreaming(value.tag)) {
         break;
       }
     }

src/packet/sym_encrypted_aead_protected.js

@@ -99,7 +99,7 @@ SymEncryptedAEADProtected.prototype.decrypt = async function (sessionKeyAlgorith
   if (config.aead_protect_version !== 4) {
     this.cipherAlgo = enums.write(enums.symmetric, sessionKeyAlgorithm);
   }
-  await this.packets.read(await this.crypt('decrypt', key, stream.clone(this.encrypted), streaming));
+  await this.packets.read(await this.crypt('decrypt', key, stream.clone(this.encrypted), streaming), streaming);
   return true;
 };

src/packet/sym_encrypted_integrity_protected.js

@@ -132,7 +132,7 @@ SymEncryptedIntegrityProtected.prototype.decrypt = async function (sessionKeyAlg
   if (!util.isStream(encrypted) || !config.allow_unauthenticated_stream) {
     packetbytes = await stream.readToEnd(packetbytes);
   }
-  await this.packets.read(packetbytes);
+  await this.packets.read(packetbytes, streaming);
   return true;
 };