more async/await

v 2018-11-28 18:48:28 +03:00
parent f8768360ee
commit 036f12441b

zimmer.js (477 lines changed)

@@ -317,10 +317,10 @@ class Writer {
// Cluster
//
// ClusterSizeThreshold = 8 * 1024 * 1024
var ClusterSizeThreshold = 4 * 1024 * 1024
// ClusterSizeThreshold = 2 * 1024 * 1024
// ClusterSizeThreshold = 2 * 1024
// var ClusterSizeThreshold = 8 * 1024 * 1024
//~ var ClusterSizeThreshold = 4 * 1024 * 1024
var ClusterSizeThreshold = 1 * 1024 * 1024
// var ClusterSizeThreshold = 2 * 1024 * 1024
class Cluster {
constructor ( compressible ) {
@@ -354,7 +354,7 @@ class Cluster {
// <2nd Blob> data n/a n/a data of the <2nd Blob>
// ... data ... n/a ...
save () {
async save () {
//~ log( 'Cluster.prototype.save', this.compressible, this.blobs )
var nBlobs = this.blobs.length
@@ -379,15 +379,14 @@ class Cluster {
var compression = this.compressible ? 4 : 0
var id = this.id
return Promise.coroutine( function* () {
if ( compression ) {
// https://tukaani.org/lzma/benchmarks.html
data = yield lzma.compress( data, 7 ) // 3 | lzma.PRESET_EXTREME )
data = await lzma.compress( data, 7 ) // 3 | lzma.PRESET_EXTREME )
log( 'Cluster lzma compressed' )
}
log( 'Cluster write', id, compression )
const offset = yield out.write( Buffer.concat([ Buffer.from([ compression ]), data ]))
const offset = await out.write( Buffer.concat([ Buffer.from([ compression ]), data ]))
log( 'Cluster saved', id, offset )
return indexerDb.run(
@@ -397,7 +396,6 @@ class Cluster {
offset
]
)
}) ()
}
}
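This hunk shows the pattern applied throughout the commit: a Bluebird Promise.coroutine( function* () { ... }) () wrapper becomes a plain async method, and each yield becomes an await. A minimal self-contained sketch of the before/after; the compress and write helpers below are illustrative stand-ins for lzma.compress and out.write, not the script's actual functions:

const Promise = require( 'bluebird' )

// illustrative stand-ins for lzma.compress and out.write
const compress = async data => data
const write = async data => data.length

// before: generator wrapped in Promise.coroutine and invoked immediately
function saveOld ( data ) {
    return Promise.coroutine( function* () {
        const compressed = yield compress( data )   // yield suspends on the promise
        return write( compressed )
    }) ()
}

// after: a plain async function; await replaces yield, no wrapper needed
async function saveNew ( data ) {
    const compressed = await compress( data )
    return write( compressed )
}

saveOld( Buffer.from( 'x' )).then( n => console.log( 'old:', n ))   // old: 1
saveNew( Buffer.from( 'x' )).then( n => console.log( 'new:', n ))   // new: 1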
@@ -415,7 +413,7 @@ var ClusterWriter = {
{ max: 8, }
),
append: function ( mimeType, data, id /* for debugging */ ) {
append: async function ( mimeType, data, id /* for debugging */ ) {
//~ log( 'ClusterWriter.append', arguments )
var compressible = ClusterWriter.isCompressible( mimeType, data, id )
@@ -425,14 +423,12 @@ var ClusterWriter = {
if ( blobNum === false ) { // store to a new cluster
ClusterWriter[ compressible ] = new Cluster( compressible )
const ready = ClusterWriter.pool.acquire()
const token = await ClusterWriter.pool.acquire()
ready.then( token => cluster.save()
cluster.save()
.then( () => ClusterWriter.pool.release( token ))
)
return ready
.then( () => ClusterWriter.append( mimeType, data, id ))
return ClusterWriter.append( mimeType, data, id )
}
log( 'ClusterWriter.append', compressible, clusterNum, blobNum, data.length, id )
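Note how the rewritten append handles the pool: it awaits acquire(), then kicks off cluster.save() without awaiting it, and releases the token only when the save completes. A minimal sketch of that back-pressure pattern, assuming a generic-pool style API (acquire()/release( token )); the factory below is illustrative, not the module's actual pool configuration:

const genericPool = require( 'generic-pool' )

// tokens carry no state; the pool only throttles how many saves run at once
const pool = genericPool.createPool(
    { create: async () => ({}), destroy: async () => {} },
    { max: 8 }
)

async function saveThrottled ( cluster ) {
    const token = await pool.acquire()          // blocks while 8 saves are in flight
    cluster.save()                              // deliberately not awaited
        .then( () => pool.release( token ))     // free the slot when the save lands
}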
@@ -458,28 +454,29 @@ var ClusterWriter = {
// <nth Cluster> integer (n-1)*8 8 pointer to the <nth Cluster>
// ... integer ... 8 ...
storeIndex: function () {
return saveIndex (
`
SELECT
offset
FROM clusters
ORDER BY id
;
`,
8, 'offset', header.clusterCount, 'storeClusterIndex'
)
storeIndex: async function () {
return saveIndex ({
query:
'SELECT ' +
'offset ' +
'FROM clusters ' +
'ORDER BY id ' +
';',
byteLength: 8,
rowField: 'offset',
count: header.clusterCount,
logInfo: 'storeClusterIndex',
})
.then( offset => header.clusterPtrPos = offset )
},
finish: function () {
finish: async function () {
//~ log( 'ClusterWriter.finish', ClusterWriter )
return ClusterWriter[ true ].save() // save last compressible cluster
.then( () => ClusterWriter[ false ].save()) // save last uncompressible cluster
.then( () => ClusterWriter.pool.drain())
.then( () => ClusterWriter.pool.clear())
.then( () => ClusterWriter.storeIndex())
await ClusterWriter[ true ].save() // save last compressible cluster
await ClusterWriter[ false ].save() // save last uncompressible cluster
await ClusterWriter.pool.drain()
await ClusterWriter.pool.clear()
return ClusterWriter.storeIndex()
},
}
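Alongside the async conversion, saveIndex trades five positional arguments for a single options object, so call sites such as storeIndex above read like documentation, and new fields (like the rowCb hook added further down) can appear without touching every caller. A sketch of the shape, with field names mirroring the diff and the body elided down to its logging:

// field names mirror the diff: query, byteLength, rowField, count, logInfo, rowCb
async function saveIndexSketch ({ query, byteLength, rowField, count, logInfo = 'saveIndex', rowCb }) {
    console.log( logInfo, 'start', count )
    // ... prepare `query`, write `byteLength` bytes of row[ rowField ] per row,
    // calling rowCb( row, i ) when the caller supplied one ...
}

saveIndexSketch({
    query: 'SELECT offset FROM clusters ORDER BY id;',
    byteLength: 8,
    rowField: 'offset',
    count: 42,                      // illustrative value
    logInfo: 'storeClusterIndex',
})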
@@ -530,8 +527,7 @@ class Item {
saveItemIndex () {
if ( ! this.path ) {
console.trace( 'Item no url', this )
process.exit( 1 )
fatal( 'Item no url', this )
}
const row = [
@@ -564,11 +560,10 @@ class Item {
// title string n/a zero terminated string with an title as refered in the Title pointer list or empty; in case it is empty, the URL is used as title
// parameter data see parameter len (not used) extra parameters
storeDirEntry ( clusterIdx, blobIdx, redirectTarget ) {
async storeDirEntry ( clusterIdx, blobIdx, redirectTarget ) {
if ( clusterIdx == null ) {
console.error( 'storeDirEntry error: clusterIdx == null', this )
pprocess.exit( 1 )
return Promise.resolve()
fatal( 'storeDirEntry error: clusterIdx == null', this )
return
}
header.articleCount++
@@ -589,30 +584,25 @@ class Item {
var urlBuf = Buffer.from( this.path + '\0' )
var titleBuf = Buffer.from( this.title + '\0' )
return out.write( Buffer.concat([ buf, urlBuf, titleBuf ]))
.then( offset => {
log( 'storeDirEntry done', offset, buf.length, this.path )
return this.dirEntry = offset
})
.then( offset => this.saveDirEntryIndex( offset ))
this.dirEntry = await out.write( Buffer.concat([ buf, urlBuf, titleBuf ]))
log( 'storeDirEntry done', this.dirEntry, buf.length, this.path )
return this.saveDirEntryIndex( this.dirEntry )
}
saveDirEntryIndex ( offset ) {
return this.getId()
.then( id => {
async saveDirEntryIndex ( offset ) {
const id = await this.getId()
try {
log( 'saveDirEntryIndex', id, offset, this.path )
return indexerDb.run(
return await indexerDb.run(
'INSERT INTO dirEntries (id, offset) VALUES (?,?)',
[
id,
offset,
]
)
.catch( err => {
console.error( 'saveDirEntryIndex error', err, this )
process.exit( 1 )
})
})
} catch ( err ) {
fatal( 'saveDirEntryIndex error', err, this )
}
}
}
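Several hunks in this commit collapse console.error(...)/console.trace(...) plus process.exit( 1 ) pairs into a single fatal(...) call. The helper itself is defined elsewhere in zimmer.js and not shown in this diff; what follows is only a plausible sketch of its shape, not the project's actual implementation:

function fatal ( ...args ) {
    console.trace( ...args )   // print the message with a stack trace
    process.exit( 1 )          // abort the build
}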
@@ -686,9 +676,8 @@ class Redirect extends Item {
return this.saveRedirectIndex()
}
saveRedirectIndex () {
return this.getId()
.then( id => {
async saveRedirectIndex () {
const id = await this.getId()
return indexerDb.run(
'INSERT INTO redirects (id, targetKey, fragment) VALUES (?,?,?)',
[
@@ -697,7 +686,6 @@ class Redirect extends Item {
this.target.fragment,
]
)
})
}
}
@@ -743,31 +731,28 @@ class DataItem extends Item {
super( params )
}
process () {
async process () {
//~ log( 'DataItem process', this.path )
return this.store()
.then( () => super.process())
.catch( err => {
try {
await this.store()
await super.process()
} catch ( err ) {
if ( err instanceof NoProcessingRequired )
return
console.error( 'Item process error', this.path, err )
process.exit( 1 )
})
fatal( 'Item process error', this.path, err )
}
}
store () {
return this.getData()
.then( data => {
async store () {
let data = await this.getData()
if ( data == null ) {
console.error( 'DataItem.store error: data == null', this )
process.exit( 1 )
fatal( 'DataItem.store error: data == null', this )
}
if ( !( data instanceof Buffer )) {
data = Buffer.from( data )
}
return ClusterWriter.append( this.mimeType, data, this.path )
.then( ([ clusterIdx, blobIdx ]) => Object.assign( this, { clusterIdx, blobIdx }))
})
const [ clusterIdx, blobIdx ] = await ClusterWriter.append( this.mimeType, data, this.path )
Object.assign( this, { clusterIdx, blobIdx })
}
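The rewritten process above replaces a .catch callback with try/await/catch, still letting the NoProcessingRequired marker pass silently while everything else is fatal. A generic sketch of that filtering pattern; the marker class here is a stand-in for the one zimmer.js defines elsewhere:

class NoProcessingRequired extends Error {}

async function processSafely ( item ) {
    try {
        await item.store()
    } catch ( err ) {
        if ( err instanceof NoProcessingRequired )
            return                  // benign: this item is skipped on purpose
        throw err                   // anything else propagates to the caller
    }
}

// usage: a store() that opts out simply throws the marker
processSafely({ store: async () => { throw new NoProcessingRequired() }})
    .then( () => console.log( 'skipped quietly' ))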
getData () {
@@ -812,39 +797,37 @@ class File extends DataItem {
}
}
processJpeg ( data ) {
async processJpeg ( data ) {
if ( ! argv.optimg )
return data
this.mimeType = 'image/jpeg'
return spawn(
try {
return await spawn(
mozjpeg,
[ '-quality', argv.jpegquality, data.length < 20000 ? '-baseline' : '-progressive' ],
data
)
.catch( err => {
} catch ( err ) {
log( 'Error otimizing jpeg', err, this )
return data
})
}
}
processImage ( data ) {
async processImage ( data ) {
if ( ! argv.optimg )
return data
return Promise.coroutine( function* () {
try {
const image = sharp( data )
const metadata = yield image.metadata()
if ( metadata.format == 'gif' && isAnimated( data ))
const metadata = await image.metadata()
if ( metadata.format == 'gif' && isAnimated( data )) {
return data
}
if ( metadata.hasAlpha && metadata.channels == 1 ) {
log( 'metadata.channels == 1', this.path )
} else if ( metadata.hasAlpha && metadata.channels > 1 ) {
if ( data.length > 20000 ) {
// Is this rather opaque?
const alpha = yield image
const alpha = await image
.clone()
.extractChannel( metadata.channels - 1 )
.raw()
@@ -856,7 +839,7 @@ class File extends DataItem {
if ( isOpaque ) { // convert to JPEG
log( 'isOpaque', this.path )
if ( metadata.format == 'gif' )
data = yield image.toBuffer()
data = await image.toBuffer()
return this.processJpeg ( data )
}
}
@@ -864,11 +847,10 @@ class File extends DataItem {
if ( metadata.format == 'gif' )
return data
return image.toBuffer()
}).call( this )
.catch( err => {
} catch ( err ) {
log( 'Error otimizing image', err, this )
return data
})
}
}
}
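processImage probes whether an image with an alpha channel is actually fully opaque before converting it to JPEG. A standalone sketch of that probe, assuming the sharp calls used above (metadata(), clone(), extractChannel(), raw(), toBuffer()); the reduction over the alpha plane happens outside this hunk, so the 255-everywhere test here is an assumption:

const sharp = require( 'sharp' )

async function isOpaque ( data ) {
    const image = sharp( data )
    const metadata = await image.metadata()
    if ( ! metadata.hasAlpha )
        return true
    const alpha = await image
        .clone()
        .extractChannel( metadata.channels - 1 )   // alpha is the last channel
        .raw()
        .toBuffer()
    return alpha.every( byte => byte === 255 )     // 255 everywhere → fully opaque
}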
@@ -890,18 +872,16 @@ class RawFile extends File {
return fullPath( this.path )
}
preProcess ( data ) {
return Promise.coroutine( function* () {
async preProcess ( data ) {
if ( ! this.mimeType ) {
this.mimeType = yield mimeFromData( data )
this.mimeType = await mimeFromData( data )
this.nameSpace = this.nameSpace || getNameSpace( this.mimeType )
}
if ( argv.inflateHtml && this.mimeType == 'text/html' )
data = yield zlib.inflate( data ) // inflateData
return data
}).call( this )
.then( data => this.preProcessHtml( data ))
.then( data => super.preProcess( data ))
if ( argv.inflateHtml && this.mimeType == 'text/html' ) {
data = await zlib.inflate( data ) // inflateData
}
await this.preProcessHtml( data )
return super.preProcess( data )
}
preProcessHtml ( data ) {
@@ -1018,7 +998,7 @@ class RawFile extends File {
//
// Favicon a favicon (48x48) is also mandatory and should be located at /-/favicon
function loadMetadata () {
function fillInMetadata () {
const outParsed = osPath.parse( outPath )
const metadata = [
[ 'Name', outParsed.base ],
@@ -1066,11 +1046,8 @@ function loadMetadata () {
return Promise.all( done )
}
function openMetadata( dbName ) {
return Promise.resolve()
.then( () => sqlite.open( dbName ))
.then( db => {
indexerDb = db
async function openMetadata( dbName ) {
indexerDb = await sqlite.open( dbName )
return indexerDb.exec(`
PRAGMA synchronous = OFF;
PRAGMA journal_mode = WAL;
@@ -1093,20 +1070,18 @@ function openMetadata( dbName ) {
`
)
}
)
}
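openMetadata now awaits sqlite.open directly instead of threading the handle through .then. A minimal sketch of the open-then-tune sequence, assuming the promise API of the sqlite package as used here (sqlite.open(), db.exec()):

const sqlite = require( 'sqlite' )

async function openDb ( dbName ) {
    const db = await sqlite.open( dbName )
    // trade durability for indexing speed, as the PRAGMAs above do
    await db.exec( 'PRAGMA synchronous = OFF; PRAGMA journal_mode = WAL;' )
    return db
}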
function newMetadata() {
async function newMetadata() {
var dbName = ''
if ( argv.verbose ) {
var parsed = osPath.parse( outPath )
dbName = osPath.join( parsed.dir, parsed.base + '.db' )
}
return fs.unlink( dbName )
.catch( () => null )
.then( () => sqlite.open( dbName ))
.then( db => {
indexerDb = db
try {
await fs.unlink( dbName )
} catch ( err ) {
}
indexerDb = await sqlite.open( dbName )
return indexerDb.exec(
'PRAGMA synchronous = OFF;' +
'PRAGMA journal_mode = OFF;' +
@@ -1135,8 +1110,6 @@ function newMetadata() {
');'
)
}
)
}
function sortArticles () {
return indexerDb.exec(`
@@ -1156,14 +1129,14 @@ function sortArticles () {
)
}
function loadRedirects () {
async function loadRedirects () {
var redirectsFile
if ( preProcessed )
redirectsFile = osPath.join( srcPath, 'redirects.csv' )
else if ( argv.redirects )
redirectsFile = expandHomeDir( argv.redirects )
else
return Promise.resolve()
return
const getRow = cvsReader( redirectsFile, {
columns:[ 'nameSpace', 'path', 'title', 'to' ],
@@ -1171,24 +1144,15 @@ function loadRedirects () {
relax_column_count: true
})
return Promise.coroutine( function* () {
while ( true ) {
const row = yield getRow()
let row
while ( row = await getRow() ) {
log( 'loadRedirects', row )
if ( row ) {
yield new Redirect( row )
.process()
} else {
return
await new Redirect( row ).process()
}
}
}) ()
}
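The rewritten redirect loop folds the fetch-test-break dance into an assignment inside the while condition. A generic sketch of draining an async row source this way:

// getRow resolves with the next row, or a falsy value when exhausted
async function drain ( getRow, handle ) {
    let row
    while (( row = await getRow() )) {
        await handle( row )        // rows are processed strictly one at a time
    }
}

// usage with an illustrative in-memory source
const rows = [ { to: 'A' }, { to: 'B' } ]
drain( async () => rows.shift(), async row => console.log( row.to ))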
function resolveRedirects () {
return Promise.coroutine( function* () {
var stmt = yield indexerDb.prepare( `
async function resolveRedirects () {
var stmt = await indexerDb.prepare( `
SELECT
src.id AS id,
src.urlKey AS urlKey,
@@ -1217,52 +1181,41 @@ function resolveRedirects () {
USING (id)
WHERE targetId IS NOT NULL
;`)
while ( true ) {
const row = yield stmt.get()
if ( ! row ) {
break
}
let row
while ( row = await stmt.get() ) {
var nameSpace = row.urlKey[ 0 ]
var path = row.urlKey.substr( 1 )
var title = ( row.titleKey == row.urlKey ) ? '' : row.titleKey.substr( 1 )
var target = row.targetRow - 1
if ( path == 'mainpage' ) {
mainPage.target = target
}
yield new ResolvedRedirect ( row.id, nameSpace, path, title, target, row.revision )
await new ResolvedRedirect ( row.id, nameSpace, path, title, target, row.revision )
.process()
}
yield stmt.finalize()
}) ()
return stmt.finalize()
}
function saveIndex ( query, byteLength, rowField, count, logInfo ) {
logInfo = logInfo || 'saveIndex'
log( logInfo, 'start', count )
async function saveIndex ( params ) {
const logInfo = params.logInfo || 'saveIndex'
log( logInfo, 'start', params.count )
return Promise.coroutine( function* () {
var startOffset
var stmt = await indexerDb.prepare( params.query )
var i = 0
var stmt = yield indexerDb.prepare( query )
while ( true ) {
const row = yield stmt.get()
if ( ! row )
break
for ( let row; row = await stmt.get(); i++ ) {
log( logInfo, i, row )
i++
var buf = Buffer.allocUnsafe( byteLength )
writeUIntLE( buf, row[ rowField ], 0, byteLength )
if ( params.rowCb )
params.rowCb( row, i )
var buf = Buffer.allocUnsafe( params.byteLength )
writeUIntLE( buf, row[ params.rowField ], 0, params.byteLength )
var offset = yield out.write( buf )
var offset = await out.write( buf )
if ( ! startOffset )
startOffset = offset
}
yield stmt.finalize()
await stmt.finalize()
log( logInfo, 'done', i, count, startOffset )
return Promise.resolve( startOffset )
}) ()
log( logInfo, 'done', i, params.count, startOffset )
return startOffset
}
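saveIndex writes each row's value as a little-endian integer via a writeUIntLE( buf, value, offset, byteLength ) helper that zimmer.js defines elsewhere (not shown in this diff). Buffer.prototype.writeUIntLE only supports up to 6 bytes, so the 8-byte cluster and URL pointers need a wrapper; a plausible sketch, not the project's actual implementation:

function writeUIntLE ( buf, value, offset, byteLength ) {
    if ( byteLength <= 6 )
        return buf.writeUIntLE( value, offset, byteLength )
    buf.writeUIntLE( value % 0x100000000, offset, 4 )            // low 32 bits
    return buf.writeUIntLE( Math.floor( value / 0x100000000 ),   // high bits
        offset + 4, byteLength - 4 )
}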
// URL Pointer List (urlPtrPos)
@@ -1278,21 +1231,28 @@ function saveIndex ( query, byteLength, rowField, count, logInfo ) {
// <nth URL> integer (n-1)*8 8 pointer to the directory entry of <nth URL>
// ... integer ... 8 ...
function storeUrlIndex () {
return saveIndex (`
SELECT
urlSorted.rowid,
id,
urlKey,
offset
FROM urlSorted
LEFT OUTER JOIN dirEntries
USING (id)
ORDER BY urlSorted.rowid
;`,
8, 'offset', header.articleCount, 'storeUrlIndex'
)
.then( offset => header.urlPtrPos = offset )
async function storeUrlIndex () {
header.urlPtrPos = await saveIndex ({
query:
'SELECT ' +
'urlSorted.rowid, ' +
'id, ' +
'urlKey, ' +
'offset ' +
'FROM urlSorted ' +
'LEFT OUTER JOIN dirEntries ' +
'USING (id) ' +
'ORDER BY urlSorted.rowid ' +
';',
byteLength: 8,
rowField: 'offset',
count: header.articleCount,
logInfo: 'storeUrlIndex',
rowCb: (row, index) => {
if ( row.urlKey == mainPage.urlKey )
mainPage.index = index
}
})
}
// Title Pointer List (titlePtrPos)
@@ -1304,8 +1264,9 @@ function storeUrlIndex () {
// <nth Title> integer (n-1)*4 4 pointer to the URL pointer of <nth Title>
// ... integer ... 4 ...
function storeTitleIndex () {
return saveIndex (
async function storeTitleIndex () {
header.titlePtrPos = await saveIndex ({
query:
'SELECT ' +
'titleKey, ' +
'urlSorted.rowid - 1 AS articleNumber ' +
@@ -1314,9 +1275,11 @@ function storeTitleIndex () {
'USING (id) ' +
'ORDER BY titleKey ' +
';',
4, 'articleNumber', header.articleCount, 'storeTitleIndex'
)
.then( offset => header.titlePtrPos = offset )
byteLength: 4,
rowField: 'articleNumber',
count: header.articleCount,
logInfo: 'storeTitleIndex',
})
}
// MIME Type List (mimeListPos)
@@ -1335,14 +1298,13 @@ function getMimeTypes () {
log( 'MimeTypes', mimeTypeList.length, buf.length )
if ( buf.length > maxMimeLength ) {
console.error( 'Error: mime type list length >', maxMimeLength )
process.exit( 1 )
fatal( 'Error: mime type list length >', maxMimeLength )
}
return buf
}
function getHeader () {
header.mainPage = mainPage.target || header.mainPage
header.mainPage = mainPage.index || header.mainPage
//~ log( 'Header', 'articleCount', header.articleCount, 'clusterCount', header.clusterCount, 'mainPage', mainPage )
log( 'Header', header )
@@ -1370,68 +1332,56 @@ function getHeader () {
return buf
}
function storeHeader() {
async function storeHeader() {
var buf = Buffer.concat([ getHeader(), getMimeTypes() ])
var fd = fs.openSync( outPath, 'r+' )
fs.writeSync( fd, buf, 0, buf.length, 0 )
fs.closeSync( fd )
return Promise.resolve()
var fd = await fs.open( outPath, 'r+' )
await fs.writeSync( fd, buf, 0, buf.length, 0 )
return fs.close( fd )
}
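One wrinkle in the new storeHeader: it awaits fs.writeSync, which is the synchronous call and returns a byte count rather than a promise, so the await is a no-op. Harmless, but the fully promise-based version would use an async write. A sketch assuming Node's fs.promises file-handle API (not the promisified fs wrapper the script itself uses):

const fsp = require( 'fs' ).promises

async function storeHeaderSketch ( outPath, buf ) {
    const fd = await fsp.open( outPath, 'r+' )
    try {
        await fd.write( buf, 0, buf.length, 0 )   // overwrite the reserved region at offset 0
    } finally {
        await fd.close()                          // close even if the write throws
    }
}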
function calculateFileHash () {
var outHash
var hash = crypto.createHash( 'md5' )
var stream = fs.createReadStream( outPath )
var resolve
stream.on( 'data', data => hash.update( data ))
stream.on( 'end', () => {
outHash = hash.digest()
log( 'outHash', outHash )
fs.appendFileSync( outPath, outHash )
resolve()
})
return new Promise( r => resolve = r )
return new Promise( (resolve, reject ) => stream.on( 'end', async () => {
outHash = hash.digest()
await fs.appendFile( outPath, outHash )
log( 'outHash', outHash )
resolve()
}))
}
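The rewritten calculateFileHash accepts a reject parameter but never uses it, so a read error would leave the promise pending forever. A sketch that also wires up the stream's 'error' event, using only standard crypto/fs/stream APIs:

const crypto = require( 'crypto' )
const fs = require( 'fs' )

function md5File ( path ) {
    return new Promise(( resolve, reject ) => {
        const hash = crypto.createHash( 'md5' )
        fs.createReadStream( path )
            .on( 'error', reject )                      // a disk failure now rejects
            .on( 'data', chunk => hash.update( chunk ))
            .on( 'end', () => resolve( hash.digest()))
    })
}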
function initialise () {
var stat = fs.statSync( srcPath ) // check source
async function initialise () {
var stat = await fs.stat( srcPath )
if ( ! stat.isDirectory() ) {
return Promise.reject( new Error( srcPath + ' is not a directory' ))
throw new Error( srcPath + ' is not a directory' )
}
out = new Writer( outPath ); // create output file
log( 'reserving space for header and mime type list' )
out.write( Buffer.alloc( headerLength + maxMimeLength ))
await out.write( Buffer.alloc( headerLength + maxMimeLength ))
var metadata = osPath.join( srcPath, 'metadata.db' )
if ( fs.existsSync( metadata )) {
if ( await fs.exists( metadata )) {
preProcessed = true
return openMetadata( metadata )
.then( () => loadMimeTypes())
try {
mainPage.urlKey = await fs.readFile( osPath.join( srcPath, 'mainpage' ))
} catch ( err ) {
}
await openMetadata( metadata )
return loadMimeTypes()
} else {
await newMetadata()
return fillInMetadata()
}
return newMetadata()
.then( () => loadMetadata())
}
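initialise now awaits fs.exists, which works with some promisified wrappers but is deprecated in Node itself (its callback takes no error argument, so it does not promisify cleanly). An access-based probe is the usual substitute; a sketch assuming fs.promises:

const fsp = require( 'fs' ).promises

// resolves true if the path is reachable, false otherwise
const exists = path => fsp.access( path ).then( () => true, () => false )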
function rawLoader () {
async function rawLoader () {
const dirs = [ '' ]
function scanDirectories ( path ) {
return Promise.coroutine( function* () {
for ( let path; ( path = dirs.shift()) != null; ) {
log( 'scanDirectory', path )
yield Promise.map(
fs.readdir( fullPath( path )),
fname => parseDirEntry( osPath.join( path, fname )),
{ concurrency: 8 }
)
}
}) ()
}
function parseDirEntry ( path ) {
if ( path == 'metadata.csv' || path == 'redirects.csv' )
return Promise.resolve()
@@ -1448,13 +1398,22 @@ function rawLoader () {
}
log( 'rawLoader start' )
return scanDirectories()
.then( () => log( 'rawLoader finished !!!!!!!!!' ))
// scan Directories
for ( let path; ( path = dirs.shift()) != null; ) {
log( 'scanDirectory', path )
await Promise.map(
fs.readdir( fullPath( path )),
fname => parseDirEntry( osPath.join( path, fname )),
{ concurrency: 8 }
)
}
function loadPreProcessedArticles () {
return Promise.coroutine( function* () {
var stmt = yield indexerDb.prepare( `
log( 'rawLoader finished !!!!!!!!!' )
}
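Promise.map( ..., { concurrency: 8 }) in the inlined directory scan is Bluebird's throttled map: it starts at most eight parseDirEntry calls at a time and resolves when all items have settled. A minimal sketch:

const Promise = require( 'bluebird' )

// at most 8 workers run concurrently; resolves with the mapped results
function mapLimited ( items, worker ) {
    return Promise.map( items, worker, { concurrency: 8 })
}

mapLimited([ 1, 2, 3 ], async n => n * 2 ).then( console.log )   // [ 2, 4, 6 ]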
async function loadPreProcessedArticles () {
var stmt = await indexerDb.prepare( `
SELECT
id ,
mimeId ,
@@ -1465,14 +1424,14 @@ function loadPreProcessedArticles () {
WHERE mimeId IS NOT 0xffff
;`)
while ( true ) {
const row = yield stmt.get()
const row = await stmt.get()
if ( ! row ) {
break
}
var nameSpace = row.urlKey[ 0 ]
var path = row.urlKey.substr( 1 )
var title = ( row.titleKey == row.urlKey ) ? '' : row.titleKey.substr( 1 )
yield new File( {
await new File( {
nameSpace,
path,
title,
@@ -1482,13 +1441,11 @@ function loadPreProcessedArticles () {
} )
.process()
}
yield stmt.finalize()
}) ()
return stmt.finalize()
}
function loadMimeTypes () {
return Promise.coroutine( function * () {
var stmt = yield indexerDb.prepare( `
async function loadMimeTypes () {
var stmt = await indexerDb.prepare( `
SELECT
id ,
value
@@ -1496,48 +1453,40 @@ function loadMimeTypes () {
ORDER BY id
;`)
while ( true ) {
const row = yield stmt.get()
const row = await stmt.get()
if ( ! row ) {
break
}
mimeTypeList.push( row.value )
}
yield stmt.finalize()
}) ()
return stmt.finalize()
}
function loadRawArticles () {
return Promise.resolve()
.then( () => rawLoader())
.then( () => loadRedirects())
async function loadRawArticles () {
await rawLoader()
return loadRedirects()
}
function postProcess () {
return Promise.coroutine( function* () {
yield ClusterWriter.finish()
yield sortArticles()
yield resolveRedirects()
yield storeUrlIndex()
yield storeTitleIndex()
}) ()
async function postProcess () {
await ClusterWriter.finish()
await sortArticles()
await resolveRedirects()
await storeUrlIndex()
return storeTitleIndex()
}
function finalise () {
return Promise.coroutine( function* () {
header.checksumPos = yield out.close() // close the output stream
yield indexerDb.close()
yield storeHeader()
yield calculateFileHash()
}) ()
async function finalise () {
header.checksumPos = await out.close() // close the output stream
await indexerDb.close()
await storeHeader()
return calculateFileHash()
}
function core () {
return Promise.coroutine( function* () {
yield initialise()
yield preProcessed ? loadPreProcessedArticles() : loadRawArticles()
yield postProcess()
yield finalise()
}) ()
async function core () {
await initialise()
await ( preProcessed ? loadPreProcessedArticles() : loadRawArticles() )
await postProcess()
await finalise()
}
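core is now an async function, so a rejection anywhere in the pipeline surfaces as an unhandled promise rejection unless the entry point catches it; the bare core () call at the end of main below does not. A guarded call, sketched with a stub in place of the real pipeline:

async function core () { /* the initialise → load → postProcess → finalise pipeline above */ }

core().catch( err => {
    console.trace( err )     // report where the pipeline failed
    process.exit( 1 )
})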
// Mandatory arguments:
@@ -1607,12 +1556,8 @@ function main () {
outPath = parsed.base + '.zim'
}
if ( argv.minChunkSize ) {
ClusterSizeThreshold = argv.minChunkSize * 1024
}
//~ mainPage = {
//~ title: argv.welcome
//~ if ( argv.minChunkSize ) {
//~ ClusterSizeThreshold = argv.minChunkSize * 1024
//~ }
core ()