pooledRequest fix

This commit is contained in:
v 2018-09-03 17:42:39 +03:00
parent d05b6c16c5
commit 5c0178b2f2

View File

@ -136,78 +136,74 @@ function pooledRequest( request, referenceUri, maxTokens = 1, interval = 10 ) {
} }
pause ( query ) { pause ( query ) {
if ( this.timer ) { clearTimeout( this.timer )
clearTimeout( this.timer ) this.timer = null
}
if ( this.supressTimer ) { clearTimeout( this.supressTimer )
clearTimeout( this.supressTimer )
}
this.supressTimer = setTimeout( this.supressTimer = setTimeout(
() => ( this.supressTimer = false, this.reshedule()), () => ( this.supressTimer = false, this.reshedule()),
query.retries * this.supressTimeout query.retries * this.supressTimeout
) )
if ( ! query.external ) {
this.interval = this.interval * 2
}
}
release () {
this.tokenCounter --
this.run()
} }
retry ( query, error ) { retry ( query, error ) {
this.pause( query ) const retryCause = retryStatusCodes.includes( error.statusCode ) ? error.statusCode :
if ( ++ query.retries <= ( query.external ? retryExternal : retryLimit )) { error.cause && retryErrorCodes.includes( error.cause.code ) ? error.cause.code : false
this.queue.push( query ) const maxRetries = query.external ? retryExternal : retryLimit
} else { if ( ! retryCause || query.retries > maxRetries)
query.reject( error )
}
}
acquire () {
let query
if ( this.timer || this.supressTimer || this.tokenCounter >= maxTokens || ! ( query = this.queue.shift()))
return false return false
this.tokenCounter ++
return query if ( query.retries > maxRetries / 2 ) {
this.interval = this.interval * 2
}
query.retries ++
log( 'retry request', query.retries, this.interval, error.name, retryCause, error.options.uri || error.options.url ) // , query )
this.queue.push( query )
this.pause( query )
return true
} }
submit ( query ) { submit ( query ) {
this.tokenCounter ++
return request( query ) return request( query )
.catch( error => {
const retryCause = retryStatusCodes.includes( error.statusCode ) ? error.statusCode :
error.cause && retryErrorCodes.includes( error.cause.code ) ? error.cause.code : false
if ( retryCause ) {
log( 'retry request', interval, error.name, retryCause, error.options.uri || error.options.url ) // , query )
this.retry( query, error )
return
}
warning( 'HTTP error', error.cause && error.cause.code || error.statusCode, error.options.uri || error.options.url ) // , error.error && error.error.toString().trim())
query.reject( error )
return
})
.then( reply => { .then( reply => {
this.release() this.tokenCounter --
return reply ? query.resolve( reply ) : query.reject( ) if ( reply )
query.resolve( reply )
else
query.reject( )
this.reshedule()
})
.catch( error => {
this.tokenCounter --
if ( ! this.retry( query, error )) {
warning( 'HTTP error', error.cause && error.cause.code || error.statusCode, error.options.uri || error.options.url )
query.reject( error )
this.reshedule()
}
}) })
} }
run () { run () {
let query = this.acquire() if ( this.timer || this.supressTimer || this.tokenCounter >= maxTokens )
return
const query = this.queue.shift()
if ( query ) { if ( query ) {
//~ if ( query.retries > 0 )
//~ debugger
this.submit( query ) this.submit( query )
this.reshedule() this.reshedule()
} }
} }
append ( query, priority ) { append ( query ) {
return new Promise(( resolve, reject ) => { return new Promise(( resolve, reject ) => {
query.resolve = resolve query.resolve = resolve
query.reject = reject query.reject = reject
query.retries = 0 query.retries = 0
if ( priority ) if ( query.priority )
this.queue.unshift( query ) this.queue.unshift( query )
else else
this.queue.push( query ) this.queue.push( query )
@ -244,14 +240,16 @@ function pooledRequest( request, referenceUri, maxTokens = 1, interval = 10 ) {
return query return query
} }
return function ( query, priority = false ) { return function ( query, queueId ) {
processOptions( query ) processOptions( query )
let queue = hostQueues[ query.host ] if ( ! queueId )
queueId = query.host
let queue = hostQueues[ queueId ]
if ( ! queue ) { if ( ! queue ) {
queue = new Queue queue = new Queue
hostQueues[ query.host ] = queue hostQueues[ queueId ] = queue
} }
return queue.append( query , priority ) return queue.append( query )
} }
} }