Streaming (de)compression (Web)

compressjs has a streaming API, but it is synchronous, so we can't use it
(at least in the browser).
This commit is contained in:
Daniel Huigens 2018-05-17 16:28:12 +02:00
parent 37014ecf30
commit d67526338e
2 changed files with 56 additions and 9 deletions

View File

@ -19,6 +19,7 @@
* @requires pako
* @requires config
* @requires enums
* @requires stream
* @requires util
* @requires compression/bzip2
*/
@ -26,6 +27,7 @@
import pako from 'pako';
import config from '../config';
import enums from '../enums';
import stream from '../stream';
import util from '../util';
import Bzip2 from '../compression/bzip2.build.js';
@ -68,11 +70,13 @@ function Compressed() {
* @param {String} bytes Payload of a tag 8 packet
*/
Compressed.prototype.read = async function (bytes) {
const reader = stream.getReader(bytes);
// One octet that gives the algorithm used to compress the packet.
this.algorithm = enums.read(enums.compression, bytes[0]);
this.algorithm = enums.read(enums.compression, await reader.readByte());
// Compressed data, which makes up the remainder of the packet.
this.compressed = bytes.subarray(1, bytes.length);
this.compressed = reader.substream();
await this.decompress();
};
@ -87,7 +91,7 @@ Compressed.prototype.write = function () {
this.compress();
}
return util.concatUint8Array([new Uint8Array([enums.write(enums.compression, this.algorithm)]), this.compressed]);
return util.concat([new Uint8Array([enums.write(enums.compression, this.algorithm)]), this.compressed]);
};
@ -135,9 +139,22 @@ function node_zlib(func, options = {}) {
function pako_zlib(constructor, options = {}) {
return function(data) {
const obj = new constructor(options);
obj.push(data, true);
const obj = new constructor(options);
return stream.transform(data, value => {
obj.push(value, pako.Z_SYNC_FLUSH);
return obj.result;
});
};
}
function bzip2(func) {
return function(data) {
return new ReadableStream({
async start(controller) {
controller.enqueue(func(await stream.readToEnd(data)));
controller.close();
}
});
};
}
@ -149,7 +166,7 @@ if (nodeZlib) { // Use Node native zlib for DEFLATE compression/decompression
zip: node_zlib(nodeZlib.deflateRawSync, { level: config.deflate_level }),
// eslint-disable-next-line no-sync
zlib: node_zlib(nodeZlib.deflateSync, { level: config.deflate_level }),
bzip2: Bzip2.compressFile
bzip2: bzip2(Bzip2.compressFile)
};
decompress_fns = {
@ -157,18 +174,18 @@ if (nodeZlib) { // Use Node native zlib for DEFLATE compression/decompression
zip: node_zlib(nodeZlib.inflateRawSync),
// eslint-disable-next-line no-sync
zlib: node_zlib(nodeZlib.inflateSync),
bzip2: Bzip2.decompressFile
bzip2: bzip2(Bzip2.decompressFile)
};
} else { // Use JS fallbacks
compress_fns = {
zip: pako_zlib(pako.Deflate, { raw: true, level: config.deflate_level }),
zlib: pako_zlib(pako.Deflate, { level: config.deflate_level }),
bzip2: Bzip2.compressFile
bzip2: bzip2(Bzip2.compressFile)
};
decompress_fns = {
zip: pako_zlib(pako.Inflate, { raw: true }),
zlib: pako_zlib(pako.Inflate),
bzip2: Bzip2.decompressFile
bzip2: bzip2(Bzip2.decompressFile)
};
}

View File

@ -2124,6 +2124,36 @@ describe('OpenPGP.js public api tests', function() {
verifyCompressionDecrypted(decrypted);
});
});
it('Streaming encrypt and decrypt small message roundtrip', async function() {
let plaintext = [];
let i = 0;
const data = new ReadableStream({
async pull(controller) {
if (i++ < 4) {
let randomBytes = await openpgp.crypto.random.getRandomBytes(10);
controller.enqueue(randomBytes);
plaintext.push(randomBytes);
} else {
controller.close();
}
}
});
const encrypted = await openpgp.encrypt(modifyCompressionEncryptOptions({
data,
passwords: ['test'],
}));
const msgAsciiArmored = encrypted.data;
const message = await openpgp.message.readArmored(msgAsciiArmored);
const decrypted = await openpgp.decrypt({
passwords: ['test'],
message,
format: 'binary'
});
expect(openpgp.util.isStream(decrypted.data)).to.be.true;
expect(await openpgp.stream.readToEnd(decrypted.data)).to.deep.equal(openpgp.util.concatUint8Array(plaintext));
});
});
});