diff --git a/zimmer.js b/zimmer.js
index 9b016da..2144c1b 100755
--- a/zimmer.js
+++ b/zimmer.js
@@ -394,48 +394,33 @@ class Cluster {
 
     // <2nd Blob>          data       n/a     n/a     data of the <2nd Blob>
     // ...                 data       n/a     ...
 
-    async save () {
+    async getData () {
         //~ log( 'Cluster.prototype.save', this.compressible, this.blobs )
 
-        var nBlobs = this.blobs.length
-        if ( nBlobs == 0 )
-            return Promise.resolve()
-        // generate blob offsets
-        var offsets = Buffer.alloc(( nBlobs + 1 ) * 4 )
-        var blobOffset = offsets.length
-        for ( var i=0; i < nBlobs; i++ ) {
-            offsets.writeUIntLE( blobOffset, i * 4, 4 )
-            blobOffset += this.blobs[ i ].length
-        }
-        //~ log( this.id,'generate blob offsets', nBlobs, offsets.length, i, blobOffset )
-        offsets.writeUIntLE( blobOffset, i * 4, 4 ) // final offset
+        const byteLength = 4
+        let blobOffset = ( this.blobs.length + 1 ) * byteLength
+        const offsetIndex = this.blobs.map(( blob, i, arr ) => {
+            const val = [ blobOffset, byteLength ]
+            blobOffset += blob.length
+            return val
+        })
+        offsetIndex.push([ blobOffset, byteLength ]) // final offset
 
-        // join offsets and article data
-        this.blobs.unshift( offsets )
-        var data = Buffer.concat( this.blobs )
-        var rawSize = data.length
+        const chunks = offsetIndex.concat( this.blobs )
 
-        var compression = this.compressible ? 4 : 0
-        var id = this.id
+        let data = chunksToBuffer( chunks )
 
-        if ( compression ) {
+        if ( this.compressible ) {
             // https://tukaani.org/lzma/benchmarks.html
-            data = await lzma.compress( data, 7 ) // 3 | lzma.PRESET_EXTREME )
+            // https://catchchallenger.first-world.info/wiki/Quick_Benchmark:_Gzip_vs_Bzip2_vs_LZMA_vs_XZ_vs_LZ4_vs_LZO
+            data = await lzma.compress( data, 5 ) // 3 | lzma.PRESET_EXTREME )
             log( 'Cluster lzma compressed' )
         }
+
+        const compression = toBuffer( this.compressible ? 4 : 0, 1 )
 
-        log( 'Cluster write', id, compression )
-        const offset = await out.write( Buffer.concat([ Buffer.from([ compression ]), data ]))
-
-        log( 'Cluster saved', id, offset )
-        return wikiDb.run(
-            'INSERT INTO clusters (id, offset) VALUES (?,?)',
-            [
-                id,
-                offset
-            ]
-        )
+        return Buffer.concat([ compression, data ])
     }
 }
 
@@ -445,6 +430,7 @@ class Cluster {
 class ClusterPool {
     constructor () {
         this.holder = {}
+        this.savePrefix = outPath + '.tmp'
         this.pool = genericPool.createPool(
             {
                 create () { return Promise.resolve( Symbol() ) },
@@ -465,25 +451,37 @@ class ClusterPool {
         return cluster
     }
 
-    async append ( mimeType, data, id /* for debugging */ ) {
-        //~ log( 'ClusterWriter.append', arguments )
+    async save ( cluster ) {
+        const data = await cluster.getData()
+        await fs.outputFile( osPath.join( this.savePrefix, `${cluster.id}` ), data )
+        await wikiDb.run(
+            'INSERT INTO clusters ( id, size ) VALUES ( ?,? )',
+            [
+                cluster.id,
+                data.length
+            ]
+        )
+        log( 'Cluster saved', cluster.id, data.length )
+        return
+    }
 
-        var compressible = this.isCompressible( mimeType, data, id )
+    async append ( mimeType, data, path /* for debugging */ ) {
+        var compressible = this.isCompressible( mimeType, data, path )
         var cluster = this.getCluster( compressible )
         var clusterNum = cluster.id
        var blobNum = cluster.append( data )
 
-        if ( blobNum === false ) { // store to a new cluster
+        if ( blobNum === false ) { // save current cluster, create and store into a new cluster
             this.removeCluster( compressible )
             const token = await this.pool.acquire()
-            cluster.save()
-            .then( () => this.pool.release( token ))
+            await this.save( cluster )
+            this.pool.release( token )
 
-            return this.append( mimeType, data, id )
+            return this.append( mimeType, data, path )
         }
 
-        log( 'ClusterWriter.append', compressible, clusterNum, blobNum, data.length, id )
+        log( 'ClusterWriter.append', compressible, clusterNum, blobNum, data.length, path )
         return [ clusterNum, blobNum ]
     }
 
@@ -508,31 +506,51 @@ class ClusterPool {
     // ...                                  integer     ...     8       ...
 
     async storeIndex () {
+        const byteLength = 8
+        const count = header.clusterCount
+        const start = await out.write( Buffer.alloc( 0 ))
+        let offset = start + count * byteLength
+
         header.clusterPtrPos = await saveIndex ({
-            query:
-                'SELECT ' +
-                'offset ' +
-                'FROM clusters ' +
-                'ORDER BY id ' +
-                ';',
-            byteLength: 8,
-            count: header.clusterCount,
+            query: `
+                SELECT
+                    size
+                FROM clusters
+                ORDER BY id
+                ;`,
+            rowField: 'size',
+            byteLength,
+            count,
             logPrefix: 'storeClusterIndex',
             rowCb: ( row, index ) => {
-                return row.offset
+                const val = offset
+                offset += row.size
+                return val
             },
-
         })
     }
 
+    async storeClusters () {
+        for ( let i = 0; i < header.clusterCount; i++ ) {
+            const fname = osPath.join( this.savePrefix, `${i}` )
+            const data = await fs.readFile( fname )
+            const pos = await out.write( data )
+            log( 'storeClusters', i, pos )
+            await fs.remove( fname )
+        }
+        await fs.remove( this.savePrefix )
+    }
+
     async finish () {
         //~ log( 'ClusterWriter.finish', ClusterWriter )
         for ( let i in this.holder ) { // save last clusters
-            await this.holder[ i ].save()
+            await this.save( this.holder[ i ] )
         }
         await this.pool.drain()
         await this.pool.clear()
-        return this.storeIndex()
+        await this.storeIndex()
+        await this.storeClusters()
+        return
     }
 }
 
@@ -1121,7 +1139,7 @@ async function openWikiDb( dbName ) {
             );
             CREATE TABLE clusters (
                 id INTEGER PRIMARY KEY,
-                offset INTEGER
+                size INTEGER
             );
             `
    )
@@ -1520,11 +1538,12 @@ async function loadRawArticles () {
 }
 
 async function postProcess () {
-    await clusterWriter.finish()
     await sortArticles()
     await resolveRedirects()
     await storeUrlIndex()
-    return storeTitleIndex()
+    await storeTitleIndex()
+    await clusterWriter.finish()
+    return
 }
 
 async function finalise () {
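Note: the new getData() leans on two helpers, toBuffer() and chunksToBuffer(), that are defined elsewhere in zimmer.js and are not part of this diff. A minimal sketch of the contract the new code appears to assume (little-endian integers, matching the old offsets.writeUIntLE() path; Node's writeUIntLE only handles up to 6 bytes, which covers the 4-byte blob offsets used here) -- an illustration, not the actual implementations:

    // Hypothetical sketches of helpers zimmer.js defines elsewhere.

    // Encode an unsigned integer as a little-endian Buffer of byteLength bytes.
    function toBuffer ( value, byteLength ) {
        const buf = Buffer.alloc( byteLength )
        buf.writeUIntLE( value, 0, byteLength )
        return buf
    }

    // Flatten a mixed list of Buffers and [ value, byteLength ] pairs
    // (the offsetIndex entries) into a single Buffer.
    function chunksToBuffer ( chunks ) {
        return Buffer.concat( chunks.map( chunk =>
            Buffer.isBuffer( chunk ) ? chunk : toBuffer( ...chunk )
        ))
    }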
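Note: since cluster offsets are no longer stored in SQLite, the rewritten storeIndex() derives each cluster's absolute file position from the recorded sizes: the first cluster body begins right after the pointer list itself ( start + clusterCount * 8 ), and each later pointer adds the preceding cluster's size. An illustrative run of the rowCb logic with made-up numbers:

    // Pointer list starts at byte 1000 and holds 3 eight-byte entries,
    // so the first cluster body begins at 1000 + 3 * 8 = 1024.
    const start = 1000
    const count = 3
    const sizes = [ 100, 250, 80 ] // from the clusters table, ORDER BY id

    let offset = start + count * 8
    const pointers = sizes.map( size => {
        const val = offset
        offset += size
        return val
    })
    // pointers => [ 1024, 1124, 1374 ]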