diff --git a/wikizimmer.js b/wikizimmer.js index 84c9783..bbc5b2b 100755 --- a/wikizimmer.js +++ b/wikizimmer.js @@ -113,106 +113,107 @@ function pooledRequest( request, referenceUri, maxTokens = 1, interval = 10 ) { const retryLimit = 10 const retryExternal = command.retryExternal == null ? retryLimit : command.retryExternal const requestTimeout = 5 * 60 * 1000 - const queue = [] const refHost = urlconv.parse( referenceUri ).host - let timer = null - let supressTimer = null - let supressTimeout = 60 * 1000 - let tokenCounter = 0 + const hostQueues = {} - function setTimer () { - if ( supressTimer ) - return - timer = setTimeout( - () => ( timer = null, schedule() ), - interval - ) - } - - function pause ( query ) { - if ( timer ) { - clearTimeout( timer ) + class Queue { + constructor () { + this.queue = [] + this.timer = null + this.supressTimer = null + this.supressTimeout = 60 * 1000 + this.tokenCounter = 0 + this.interval = interval } - if ( supressTimer ) { - clearTimeout( supressTimer ) - } - supressTimer = setTimeout( - () => ( supressTimer = false, setTimer()), - query.retries * supressTimeout - ) - if ( ! query.external ) { - interval = interval * 4 - } - } - function release () { - tokenCounter -- - schedule() - } - - function retry ( query, error ) { - pause( query ) - if ( ++ query.retries <= ( query.external ? retryExternal : retryLimit )) { - queue.push( query ) - return - } - query.reject( error ) - return - } - - function acquire () { - let query - if ( timer || supressTimer || tokenCounter >= maxTokens || ! ( query = queue.shift())) - return false - tokenCounter ++ - return { - query, - release, - retry, - } - } - - function schedule () { - let token = acquire() - if ( ! token ) - return - runRequest( token ) - setTimer() - } - - function runRequest( token ) { - let query = token.query - return request( query ) - .catch( error => { - const retryCause = retryStatusCodes.includes( error.statusCode ) ? error.statusCode : - error.cause && retryErrorCodes.includes( error.cause.code ) ? error.cause.code : false - if ( retryCause ) { - log( 'retry request', interval, error.name, retryCause, query ) - token.retry( query, error ) + reshedule () { + if ( this.supressTimer ) return + this.timer = setTimeout( + () => ( this.timer = null, this.run() ), + this.interval + ) + } + + pause ( query ) { + if ( this.timer ) { + clearTimeout( this.timer ) } - query.reject( error ) - return - }) - .then( reply => { - token.release() - return reply ? query.resolve( reply ) : null - }) - } + if ( this.supressTimer ) { + clearTimeout( this.supressTimer ) + } + this.supressTimer = setTimeout( + () => ( this.supressTimer = false, this.reshedule()), + query.retries * this.supressTimeout + ) + if ( ! query.external ) { + this.interval = this.interval * 2 + } + } - function append ( query, priority ) { - return new Promise(( resolve, reject ) => { - query.resolve = resolve - query.reject = reject - query.retries = 0 + release () { + this.tokenCounter -- + this.run() + } - if ( priority ) - queue.unshift( query ) - else - queue.push( query ) + retry ( query, error ) { + this.pause( query ) + if ( ++ query.retries <= ( query.external ? retryExternal : retryLimit )) { + this.queue.push( query ) + } else { + query.reject( error ) + } + } - schedule() - }) + acquire () { + let query + if ( this.timer || this.supressTimer || this.tokenCounter >= maxTokens || ! ( query = this.queue.shift())) + return false + this.tokenCounter ++ + return query + } + + submit ( query ) { + return request( query ) + .catch( error => { + const retryCause = retryStatusCodes.includes( error.statusCode ) ? error.statusCode : + error.cause && retryErrorCodes.includes( error.cause.code ) ? error.cause.code : false + if ( retryCause ) { + log( 'retry request', interval, error.name, retryCause, query ) + this.retry( query, error ) + return + } + query.reject( error ) + return + }) + .then( reply => { + this.release() + return reply ? query.resolve( reply ) : query.reject( ) + }) + } + + run () { + let query = this.acquire() + if ( query ) { + this.submit( query ) + this.reshedule() + } + } + + append ( query, priority ) { + return new Promise(( resolve, reject ) => { + query.resolve = resolve + query.reject = reject + query.retries = 0 + + if ( priority ) + this.queue.unshift( query ) + else + this.queue.push( query ) + + this.run() + }) + } } function processOptions ( query ) { @@ -226,8 +227,8 @@ function pooledRequest( request, referenceUri, maxTokens = 1, interval = 10 ) { delete query.uri } query.url = urlconv.resolve( referenceUri, url ) - const queryHost = urlconv.parse( query.url ).host - query.external = queryHost != refHost + query.host = urlconv.parse( query.url ).host + query.external = query.host != refHost if ( ! query.headers ) query.headers = {} @@ -243,7 +244,13 @@ function pooledRequest( request, referenceUri, maxTokens = 1, interval = 10 ) { } return function ( query, priority = false ) { - return append( processOptions( query ), priority ) + processOptions( query ) + let queue = hostQueues[ query.host ] + if ( ! queue ) { + queue = new Queue + hostQueues[ query.host ] = queue + } + return queue.append( query , priority ) } }