This commit is contained in:
vss-devel 2019-02-26 20:31:09 +03:00
parent 845789ea31
commit 477e4a657e

127
zimmer.js
View File

@ -394,48 +394,33 @@ class Cluster {
// <2nd Blob> data n/a n/a data of the <2nd Blob> // <2nd Blob> data n/a n/a data of the <2nd Blob>
// ... data ... n/a ... // ... data ... n/a ...
async save () { async getData () {
//~ log( 'Cluster.prototype.save', this.compressible, this.blobs ) //~ log( 'Cluster.prototype.save', this.compressible, this.blobs )
var nBlobs = this.blobs.length
if ( nBlobs == 0 )
return Promise.resolve()
// generate blob offsets // generate blob offsets
var offsets = Buffer.alloc(( nBlobs + 1 ) * 4 ) const byteLength = 4
var blobOffset = offsets.length let blobOffset = ( this.blobs.length + 1 ) * byteLength
for ( var i=0; i < nBlobs; i++ ) { const offsetIndex = this.blobs.map(( blob, i, arr ) => {
offsets.writeUIntLE( blobOffset, i * 4, 4 ) const val = [ blobOffset, byteLength ]
blobOffset += this.blobs[ i ].length blobOffset += blob.length
} return val
//~ log( this.id,'generate blob offsets', nBlobs, offsets.length, i, blobOffset ) })
offsets.writeUIntLE( blobOffset, i * 4, 4 ) // final offset offsetIndex.push([ blobOffset, byteLength ]) // final offset
// join offsets and article data const chunks = offsetIndex.concat( this.blobs )
this.blobs.unshift( offsets )
var data = Buffer.concat( this.blobs )
var rawSize = data.length
var compression = this.compressible ? 4 : 0 let data = chunksToBuffer( chunks )
var id = this.id
if ( compression ) { if ( this.compressible ) {
// https://tukaani.org/lzma/benchmarks.html // https://tukaani.org/lzma/benchmarks.html
data = await lzma.compress( data, 7 ) // 3 | lzma.PRESET_EXTREME ) // https://catchchallenger.first-world.info/wiki/Quick_Benchmark:_Gzip_vs_Bzip2_vs_LZMA_vs_XZ_vs_LZ4_vs_LZO
data = await lzma.compress( data, 5 ) // 3 | lzma.PRESET_EXTREME )
log( 'Cluster lzma compressed' ) log( 'Cluster lzma compressed' )
} }
log( 'Cluster write', id, compression ) const compression = toBuffer( this.compressible ? 4 : 0, 1 )
const offset = await out.write( Buffer.concat([ Buffer.from([ compression ]), data ]))
log( 'Cluster saved', id, offset ) return Buffer.concat([ compression, data ])
return wikiDb.run(
'INSERT INTO clusters (id, offset) VALUES (?,?)',
[
id,
offset
]
)
} }
} }
@ -445,6 +430,7 @@ class Cluster {
class ClusterPool { class ClusterPool {
constructor () { constructor () {
this.holder = {} this.holder = {}
this.savePrefix = outPath + '.tmp'
this.pool = genericPool.createPool( this.pool = genericPool.createPool(
{ {
create () { return Promise.resolve( Symbol() ) }, create () { return Promise.resolve( Symbol() ) },
@ -465,25 +451,37 @@ class ClusterPool {
return cluster return cluster
} }
async append ( mimeType, data, id /* for debugging */ ) { async save ( cluster ) {
//~ log( 'ClusterWriter.append', arguments ) const data = await cluster.getData()
await fs.outputFile( osPath.join( this.savePrefix, `${cluster.id}` ), data )
await wikiDb.run(
'INSERT INTO clusters ( id, size ) VALUES ( ?,? )',
[
cluster.id,
data.length
]
)
log( 'Cluster saved', cluster.id, data.length )
return
}
var compressible = this.isCompressible( mimeType, data, id ) async append ( mimeType, data, path /* for debugging */ ) {
var compressible = this.isCompressible( mimeType, data, path )
var cluster = this.getCluster( compressible ) var cluster = this.getCluster( compressible )
var clusterNum = cluster.id var clusterNum = cluster.id
var blobNum = cluster.append( data ) var blobNum = cluster.append( data )
if ( blobNum === false ) { // store to a new cluster if ( blobNum === false ) { // save current cluster, create and store into a new cluster
this.removeCluster( compressible ) this.removeCluster( compressible )
const token = await this.pool.acquire() const token = await this.pool.acquire()
cluster.save() await this.save( cluster )
.then( () => this.pool.release( token )) this.pool.release( token )
return this.append( mimeType, data, id ) return this.append( mimeType, data, path )
} }
log( 'ClusterWriter.append', compressible, clusterNum, blobNum, data.length, id ) log( 'ClusterWriter.append', compressible, clusterNum, blobNum, data.length, path )
return [ clusterNum, blobNum ] return [ clusterNum, blobNum ]
} }
@ -508,31 +506,51 @@ class ClusterPool {
// ... integer ... 8 ... // ... integer ... 8 ...
async storeIndex () { async storeIndex () {
const byteLength = 8
const count = header.clusterCount
const start = await out.write( Buffer.alloc( 0 ))
let offset = start + count * byteLength
header.clusterPtrPos = await saveIndex ({ header.clusterPtrPos = await saveIndex ({
query: query:`
'SELECT ' + SELECT
'offset ' + size
'FROM clusters ' + FROM clusters
'ORDER BY id ' + ORDER BY id
';', ;`,
byteLength: 8, rowField: 'size',
count: header.clusterCount, byteLength,
count,
logPrefix: 'storeClusterIndex', logPrefix: 'storeClusterIndex',
rowCb: ( row, index ) => { rowCb: ( row, index ) => {
return row.offset const val = offset
offset += row.size
return val
}, },
}) })
} }
async storeClusters () {
for ( let i = 0; i < header.clusterCount; i++ ) {
const fname = osPath.join( this.savePrefix, `${i}` )
const data = await fs.readFile( fname )
const pos = await out.write( data )
log( 'storeClusters', i, pos )
await fs.remove( fname )
}
await fs.remove( this.savePrefix )
}
async finish () { async finish () {
//~ log( 'ClusterWriter.finish', ClusterWriter ) //~ log( 'ClusterWriter.finish', ClusterWriter )
for ( let i in this.holder ) { // save last clusters for ( let i in this.holder ) { // save last clusters
await this.holder[ i ].save() await this.save( this.holder[ i ] )
} }
await this.pool.drain() await this.pool.drain()
await this.pool.clear() await this.pool.clear()
return this.storeIndex() await this.storeIndex()
await this.storeClusters()
return
} }
} }
@ -1121,7 +1139,7 @@ async function openWikiDb( dbName ) {
); );
CREATE TABLE clusters ( CREATE TABLE clusters (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
offset INTEGER size INTEGER
); );
` `
) )
@ -1520,11 +1538,12 @@ async function loadRawArticles () {
} }
async function postProcess () { async function postProcess () {
await clusterWriter.finish()
await sortArticles() await sortArticles()
await resolveRedirects() await resolveRedirects()
await storeUrlIndex() await storeUrlIndex()
return storeTitleIndex() await storeTitleIndex()
await clusterWriter.finish()
return
} }
async function finalise () { async function finalise () {