external data -> separate queues

This commit is contained in:
v 2018-07-17 17:49:30 +03:00
parent 8a7c2ed5b2
commit a3990075c4

View File

@ -113,106 +113,107 @@ function pooledRequest( request, referenceUri, maxTokens = 1, interval = 10 ) {
const retryLimit = 10 const retryLimit = 10
const retryExternal = command.retryExternal == null ? retryLimit : command.retryExternal const retryExternal = command.retryExternal == null ? retryLimit : command.retryExternal
const requestTimeout = 5 * 60 * 1000 const requestTimeout = 5 * 60 * 1000
const queue = []
const refHost = urlconv.parse( referenceUri ).host const refHost = urlconv.parse( referenceUri ).host
let timer = null const hostQueues = {}
let supressTimer = null
let supressTimeout = 60 * 1000
let tokenCounter = 0
function setTimer () { class Queue {
if ( supressTimer ) constructor () {
return this.queue = []
timer = setTimeout( this.timer = null
() => ( timer = null, schedule() ), this.supressTimer = null
interval this.supressTimeout = 60 * 1000
) this.tokenCounter = 0
} this.interval = interval
function pause ( query ) {
if ( timer ) {
clearTimeout( timer )
} }
if ( supressTimer ) {
clearTimeout( supressTimer )
}
supressTimer = setTimeout(
() => ( supressTimer = false, setTimer()),
query.retries * supressTimeout
)
if ( ! query.external ) {
interval = interval * 4
}
}
function release () { reshedule () {
tokenCounter -- if ( this.supressTimer )
schedule()
}
function retry ( query, error ) {
pause( query )
if ( ++ query.retries <= ( query.external ? retryExternal : retryLimit )) {
queue.push( query )
return
}
query.reject( error )
return
}
function acquire () {
let query
if ( timer || supressTimer || tokenCounter >= maxTokens || ! ( query = queue.shift()))
return false
tokenCounter ++
return {
query,
release,
retry,
}
}
function schedule () {
let token = acquire()
if ( ! token )
return
runRequest( token )
setTimer()
}
function runRequest( token ) {
let query = token.query
return request( query )
.catch( error => {
const retryCause = retryStatusCodes.includes( error.statusCode ) ? error.statusCode :
error.cause && retryErrorCodes.includes( error.cause.code ) ? error.cause.code : false
if ( retryCause ) {
log( 'retry request', interval, error.name, retryCause, query )
token.retry( query, error )
return return
this.timer = setTimeout(
() => ( this.timer = null, this.run() ),
this.interval
)
}
pause ( query ) {
if ( this.timer ) {
clearTimeout( this.timer )
} }
query.reject( error ) if ( this.supressTimer ) {
return clearTimeout( this.supressTimer )
}) }
.then( reply => { this.supressTimer = setTimeout(
token.release() () => ( this.supressTimer = false, this.reshedule()),
return reply ? query.resolve( reply ) : null query.retries * this.supressTimeout
}) )
} if ( ! query.external ) {
this.interval = this.interval * 2
}
}
function append ( query, priority ) { release () {
return new Promise(( resolve, reject ) => { this.tokenCounter --
query.resolve = resolve this.run()
query.reject = reject }
query.retries = 0
if ( priority ) retry ( query, error ) {
queue.unshift( query ) this.pause( query )
else if ( ++ query.retries <= ( query.external ? retryExternal : retryLimit )) {
queue.push( query ) this.queue.push( query )
} else {
query.reject( error )
}
}
schedule() acquire () {
}) let query
if ( this.timer || this.supressTimer || this.tokenCounter >= maxTokens || ! ( query = this.queue.shift()))
return false
this.tokenCounter ++
return query
}
submit ( query ) {
return request( query )
.catch( error => {
const retryCause = retryStatusCodes.includes( error.statusCode ) ? error.statusCode :
error.cause && retryErrorCodes.includes( error.cause.code ) ? error.cause.code : false
if ( retryCause ) {
log( 'retry request', interval, error.name, retryCause, query )
this.retry( query, error )
return
}
query.reject( error )
return
})
.then( reply => {
this.release()
return reply ? query.resolve( reply ) : query.reject( )
})
}
run () {
let query = this.acquire()
if ( query ) {
this.submit( query )
this.reshedule()
}
}
append ( query, priority ) {
return new Promise(( resolve, reject ) => {
query.resolve = resolve
query.reject = reject
query.retries = 0
if ( priority )
this.queue.unshift( query )
else
this.queue.push( query )
this.run()
})
}
} }
function processOptions ( query ) { function processOptions ( query ) {
@ -226,8 +227,8 @@ function pooledRequest( request, referenceUri, maxTokens = 1, interval = 10 ) {
delete query.uri delete query.uri
} }
query.url = urlconv.resolve( referenceUri, url ) query.url = urlconv.resolve( referenceUri, url )
const queryHost = urlconv.parse( query.url ).host query.host = urlconv.parse( query.url ).host
query.external = queryHost != refHost query.external = query.host != refHost
if ( ! query.headers ) if ( ! query.headers )
query.headers = {} query.headers = {}
@ -243,7 +244,13 @@ function pooledRequest( request, referenceUri, maxTokens = 1, interval = 10 ) {
} }
return function ( query, priority = false ) { return function ( query, priority = false ) {
return append( processOptions( query ), priority ) processOptions( query )
let queue = hostQueues[ query.host ]
if ( ! queue ) {
queue = new Queue
hostQueues[ query.host ] = queue
}
return queue.append( query , priority )
} }
} }