From 51713c80305aeb46d2810fad66eb97ea0a62e76a Mon Sep 17 00:00:00 2001 From: Dan Stillman Date: Tue, 13 Aug 2013 17:29:14 -0400 Subject: [PATCH] ConcurrentCaller updates --- resource/concurrent-caller.js | 91 ++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/resource/concurrent-caller.js b/resource/concurrent-caller.js index 66a92f534..b02824757 100644 --- a/resource/concurrent-caller.js +++ b/resource/concurrent-caller.js @@ -50,23 +50,28 @@ ConcurrentCaller = function (numConcurrent) { } this.stopOnError = false; + this.onError = null; - this.numConcurrent = numConcurrent; - this.numRunning = 0; - this.queue = []; - this.logger = null; - this.errorLogger = null; + this._numConcurrent = numConcurrent; + this._numRunning = 0; + this._queue = []; + this._logger = null; + this._interval = 0; +}; + + +/** + * Set the interval between the end of one function run and the beginning + * of another, in milliseconds + */ +ConcurrentCaller.prototype.setInterval = function (ms) { + this._interval = ms; }; ConcurrentCaller.prototype.setLogger = function (func) { - this.logger = func; -} - - -ConcurrentCaller.prototype.setErrorLogger = function (func) { - this.errorLogger = func; -} + this._logger = func; +}; /** @@ -79,47 +84,57 @@ ConcurrentCaller.prototype.fcall = function (func) { //this._log("Running fcall on function"); promises.push(this.fcall(func[i])); } - return this.stopOnError ? Q.all(promises) : Q.allSettled(promises); + return Q.allSettled(promises); } // If we're at the maximum number of concurrent functions, // queue this function for later - if (this.numRunning == this.numConcurrent) { - this._log("Already at " + this.numConcurrent + " -- queueing for later"); + if (this._numRunning == this._numConcurrent) { + this._log("Already at " + this._numConcurrent + " -- queueing for later"); var deferred = Q.defer(); - this.queue.push({ + this._queue.push({ func: Q.fbind(func), deferred: deferred }); return deferred.promise; } - this._log("Running function (" + this.numRunning + " current < " + this.numConcurrent + " max)"); + this._log("Running function (" + this._numRunning + " current < " + this._numConcurrent + " max)"); // Otherwise run it now - this.numRunning++; + this._numRunning++; return this._onFunctionDone(Q.fcall(func)); } +ConcurrentCaller.prototype.stop = function () { + self._log("Clearing queue"); + self._queue = []; +}; + + ConcurrentCaller.prototype._onFunctionDone = function (promise) { var self = this; return Q.when( promise, function (promise) { - self.numRunning--; + self._numRunning--; self._log("Done with function (" - + self.numRunning + "/" + self.numConcurrent + " running, " - + self.queue.length + " queued)"); + + self._numRunning + "/" + self._numConcurrent + " running, " + + self._queue.length + " queued)"); // If there's a function to call and we're under the concurrent limit, // run it now - let f = self.queue.shift(); - if (f && self.numRunning < self.numConcurrent) { - Q.delay(1) + let f = self._queue.shift(); + if (f && self._numRunning < self._numConcurrent) { + Q.delay(self._interval) .then(function () { - self.numRunning++; + self._log("Running new function (" + + self._numRunning + "/" + self._numConcurrent + " running, " + + self._queue.length + " queued)"); + + self._numRunning++; var p = self._onFunctionDone(f.func()); f.deferred.resolve(p); }); @@ -128,28 +143,28 @@ ConcurrentCaller.prototype._onFunctionDone = function (promise) { return promise; }, function (e) { - if (self.errorLogger) { - self.errorLogger(e); + self._numRunning--; + + self._log("Done with function (" + self._numRunning + "/" + self._numConcurrent + ", " + + self._queue.length + " in queue)"); + + if (self.onError) { + self.onError(e); } - self.numRunning--; - - self._log("Done with function (" + self.numRunning + "/" + self.numConcurrent + ", " - + self.queue.length + " in queue)"); - - if (self.stopOnError && self.queue.length) { + if (self.stopOnError && self._queue.length) { self._log("Stopping on error: " + e); - self.queue = []; + self._queue = []; } throw e; } ); -} +}; ConcurrentCaller.prototype._log = function (msg) { - if (this.logger) { - this.logger(msg); + if (this._logger) { + this._logger("[ConcurrentCaller] " + msg); } -} +};