ClusterPool

author v, 2018-11-29 19:47:48 +03:00
parent 05d7b83eba
commit 818c344d2f

@@ -426,42 +426,54 @@ class Cluster {
 }
 //
-// ClusterWriter
+// ClusterPool
 //
-var ClusterWriter = {
-    true: new Cluster( true ), // compressible cluster
-    false: new Cluster( false ), // uncompressible cluster
-    pool: genericPool.createPool(
+class ClusterPool {
+    constructor () {
+        this.holder = {}
+        this.pool = genericPool.createPool(
         {
            create () { return Promise.resolve( Symbol() ) },
            destroy ( resource ) { return Promise.resolve() },
         },
         { max: 8, }
-    ),
-    append: async function ( mimeType, data, id /* for debugging */ ) {
+        )
+    }
+    removeCluster ( type ) {
+        delete this.holder[ type ]
+    }
+    getCluster ( type ) {
+        let cluster = this.holder[ type ]
+        if ( ! cluster )
+            cluster = this.holder[ type ] = new Cluster( type )
+        return cluster
+    }
+    async append ( mimeType, data, id /* for debugging */ ) {
         //~ log( 'ClusterWriter.append', arguments )
-        var compressible = ClusterWriter.isCompressible( mimeType, data, id )
-        var cluster = ClusterWriter[ compressible ]
+        var compressible = this.isCompressible( mimeType, data, id )
+        var cluster = this.getCluster( compressible )
         var clusterNum = cluster.id
         var blobNum = cluster.append( data )
         if ( blobNum === false ) { // store to a new cluster
-            ClusterWriter[ compressible ] = new Cluster( compressible )
-            const token = await ClusterWriter.pool.acquire()
+            this.removeCluster( compressible )
+            const token = await this.pool.acquire()
             cluster.save()
-            .then( () => ClusterWriter.pool.release( token ))
-            return ClusterWriter.append( mimeType, data, id )
+            .then( () => this.pool.release( token ))
+            return this.append( mimeType, data, id )
         }
         log( 'ClusterWriter.append', compressible, clusterNum, blobNum, data.length, id )
-        return Promise.resolve([ clusterNum, blobNum ])
-    },
-    isCompressible: function ( mimeType, data, id ) {
+        return [ clusterNum, blobNum ]
+    }
+    isCompressible ( mimeType, data, id ) {
         if ( data == null || data.length == 0 )
             return false
         if ( !mimeType ) {
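
The { max: 8 } generic-pool above acts purely as a concurrency throttle: append() does not await cluster.save(), it acquires an opaque token first and releases it only when the save settles, so at most eight cluster saves (and their compression) run in flight at once. A minimal standalone sketch of that pattern, assuming the generic-pool v3 API (saveThrottled and slowTask are illustrative names, not part of this commit):

    const genericPool = require( 'generic-pool' )

    const pool = genericPool.createPool(
        {
            create () { return Promise.resolve( Symbol() ) }, // tokens carry no state
            destroy ( resource ) { return Promise.resolve() },
        },
        { max: 8 } // at most 8 tasks in flight
    )

    async function saveThrottled ( slowTask ) {
        const token = await pool.acquire() // waits while 8 tasks are pending
        slowTask()
            .then( () => pool.release( token )) // free the slot when done
    }
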
@@ -471,7 +483,7 @@ var ClusterWriter = {
         if ( mimeType == 'image/svg+xml' || mimeType.split( '/' )[ 0 ] == 'text' )
             return true
         return !! ( mimeDb[ mimeType ] && mimeDb[ mimeType ].compressible )
-    },
+    }
     // The cluster pointer list is a list of 8 byte offsets which point to all data clusters in a ZIM file.
     // Field Name       Type     Offset   Length  Description
@@ -480,8 +492,8 @@ var ClusterWriter = {
     // <nth Cluster>    integer  (n-1)*8  8       pointer to the <nth Cluster>
     // ...              integer  ...      8       ...
-    storeIndex: async function () {
-        return saveIndex ({
+    async storeIndex () {
+        header.clusterPtrPos = await saveIndex ({
             query:
                 'SELECT ' +
                     'offset ' +
@@ -493,17 +505,17 @@ var ClusterWriter = {
             count: header.clusterCount,
             logInfo: 'storeClusterIndex',
         })
-        .then( offset => header.clusterPtrPos = offset )
-    },
-    finish: async function () {
+    }
+    async finish () {
         //~ log( 'ClusterWriter.finish', ClusterWriter )
-        await ClusterWriter[ true ].save() // save last compressible cluster
-        await ClusterWriter[ false ].save() // save last uncompressible cluster
-        await ClusterWriter.pool.drain()
-        await ClusterWriter.pool.clear()
-        return ClusterWriter.storeIndex()
-    },
+        for ( let i in this.holder ) { // save last clusters
+            await this.holder[ i ].save()
+        }
+        await this.pool.drain()
+        await this.pool.clear()
+        return this.storeIndex()
+    }
 }
 class NoProcessingRequired extends Error {
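
For reference, the pointer list documented above is nothing more than header.clusterCount 8-byte integers laid end to end, so saveIndex() only has to concatenate the offsets it selects. A hypothetical packing helper (packClusterPtrList and the sample offsets are made up for illustration; ZIM stores its integers little-endian):

    function packClusterPtrList ( offsets ) {
        const buf = Buffer.alloc( offsets.length * 8 )
        offsets.forEach( ( offset, i ) =>
            buf.writeBigUInt64LE( BigInt( offset ), i * 8 )) // <nth Cluster> at (n-1)*8
        return buf
    }

    //~ packClusterPtrList([ 1024, 70144, 131072 ]) -> 24-byte Buffer
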
@@ -777,7 +789,7 @@ class DataItem extends Item {
         if ( !( data instanceof Buffer )) {
             data = Buffer.from( data )
         }
-        const [ clusterIdx, blobIdx ] = await ClusterWriter.append( this.mimeType, data, this.path )
+        const [ clusterIdx, blobIdx ] = await clusterWriter.append( this.mimeType, data, this.path )
         Object.assign( this, { clusterIdx, blobIdx })
     }
@@ -1381,6 +1393,8 @@ function calculateFileHash () {
 }
 async function initialise () {
+    clusterWriter = new ClusterPool
     var stat = await fs.stat( srcPath )
     if ( ! stat.isDirectory() ) {
         throw new Error( srcPath + ' is not a directory' )
@@ -1491,7 +1505,7 @@ async function loadRawArticles () {
 }
 async function postProcess () {
-    await ClusterWriter.finish()
+    await clusterWriter.finish()
     await sortArticles()
     await resolveRedirects()
     await storeUrlIndex()
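
Taken together, the lifecycle after this refactoring is: initialise() instantiates the shared writer (new ClusterPool without an argument list is the same as new ClusterPool()), each DataItem hands its blob to clusterWriter.append() and keeps the returned [ clusterIdx, blobIdx ] pair, and postProcess() calls clusterWriter.finish(), which flushes the remaining open clusters, drains the pool, and writes the cluster pointer index. A condensed sketch of that flow (storeBlob is a made-up stand-in for DataItem, and the snippet is not runnable outside zimmer, since storeIndex() leans on the module's header and saveIndex):

    const clusterWriter = new ClusterPool()

    async function storeBlob ( mimeType, data, path ) {
        // append() routes the blob into a compressible or uncompressible cluster
        const [ clusterIdx, blobIdx ] = await clusterWriter.append( mimeType, data, path )
        return { clusterIdx, blobIdx } // later recorded in the URL index
    }

    async function main () {
        await storeBlob( 'text/html', Buffer.from( '<p>hi</p>' ), 'A/index.html' )
        await clusterWriter.finish() // flush clusters, drain the pool, store the index
    }
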