From bad983389639cc923b622cde546e57d842e1db98 Mon Sep 17 00:00:00 2001 From: Dan Stillman Date: Wed, 11 Nov 2015 02:14:30 -0500 Subject: [PATCH] Resolve caller.start() promise when passed functions finish Instead of waiting for all queued functions to finish --- resource/concurrentCaller.js | 29 +++++----- test/tests/concurrentCallerTest.js | 85 ++++++++++++++++-------------- 2 files changed, 61 insertions(+), 53 deletions(-) diff --git a/resource/concurrentCaller.js b/resource/concurrentCaller.js index 0bfe6e6aa..e4155eb60 100644 --- a/resource/concurrentCaller.js +++ b/resource/concurrentCaller.js @@ -68,9 +68,9 @@ ConcurrentCaller = function (options = {}) { this.stopOnError = options.stopOnError || false; this.onError = options.onError || null; + this.numConcurrent = options.numConcurrent; this._id = options.id; - this._numConcurrent = options.numConcurrent; this._numRunning = 0; this._queue = []; this._logger = options.logger || null; @@ -105,8 +105,8 @@ ConcurrentCaller.prototype.pause = function (ms) { * Add a task to the queue without starting it * * @param {Function|Function[]} - One or more functions to run - * @return {Promise[]} - An array of promises for passed functions, resolved once the queue is - * empty + * @return {Promise[]} - An array of promises for passed functions, resolved once they have all + * finished (even if other functions are still running) */ ConcurrentCaller.prototype.add = function (func) { if (Array.isArray(func)) { @@ -114,7 +114,7 @@ ConcurrentCaller.prototype.add = function (func) { for (let i = 0; i < func.length; i++) { promises.push(this.start(func[i])); } - return this._deferred.promise.return(promises); + return Promise.settle(promises); } if (!this._deferred || !this._deferred.promise.isPending()) { @@ -126,20 +126,20 @@ ConcurrentCaller.prototype.add = function (func) { func: Promise.method(func), deferred: deferred }); - return this._deferred.promise.return(deferred.promise); + return deferred.promise; } /** * @param {Function|Function[]} - One or more functions to run - * @return {Promise[]} - An array of promises for passed functions, resolved once the queue is - * empty + * @return {Promise[]} - An array of promises for passed functions, resolved once they have all + * finished (even if other functions are still running) */ ConcurrentCaller.prototype.start = function (func) { var promise = this.add(func); var run = this._processNext(); if (!run) { - this._log("Already at " + this._numConcurrent + " -- queueing for later"); + this._log("Already at " + this.numConcurrent + " -- queueing for later"); } return promise; } @@ -187,7 +187,7 @@ ConcurrentCaller.prototype.stop = function () { ConcurrentCaller.prototype._processNext = function () { - if (this._numRunning >= this._numConcurrent) { + if (this._numRunning >= this.numConcurrent) { return false; } @@ -205,7 +205,7 @@ ConcurrentCaller.prototype._processNext = function () { } this._log("Running function (" - + this._numRunning + "/" + this._numConcurrent + " running, " + + this._numRunning + "/" + this.numConcurrent + " running, " + this._queue.length + " queued)"); this._numRunning++; @@ -213,7 +213,7 @@ ConcurrentCaller.prototype._processNext = function () { this._numRunning--; this._log("Done with function (" - + this._numRunning + "/" + this._numConcurrent + " running, " + + this._numRunning + "/" + this.numConcurrent + " running, " + this._queue.length + " queued)"); this._waitForPause().bind(this).then(function () { @@ -225,7 +225,7 @@ ConcurrentCaller.prototype._processNext = function () { .catch(function (e) { this._numRunning--; - this._log("Error in function (" + this._numRunning + "/" + this._numConcurrent + ", " + this._log("Error in function (" + this._numRunning + "/" + this.numConcurrent + ", " + this._queue.length + " in queue)"); if (this.onError) { @@ -234,7 +234,12 @@ ConcurrentCaller.prototype._processNext = function () { if (this.stopOnError && this._queue.length) { this._log("Stopping on error: " + e); + this._oldQueue = this._queue; this._queue = []; + for (let o of this._oldQueue) { + //this._log("Rejecting promise"); + o.deferred.reject(); + } } this._waitForPause().bind(this).then(function () { diff --git a/test/tests/concurrentCallerTest.js b/test/tests/concurrentCallerTest.js index d237bdf00..1cea84602 100644 --- a/test/tests/concurrentCallerTest.js +++ b/test/tests/concurrentCallerTest.js @@ -51,6 +51,7 @@ describe("ConcurrentCaller", function () { assert.equal(results.length, ids.length); assert.equal(running, 0); assert.equal(finished, ids.length); + assert.isFalse(failed); }) it("should add functions to existing queue and resolve when all are complete (waiting for earlier)", function* () { @@ -59,9 +60,9 @@ describe("ConcurrentCaller", function () { var finished = 0; var failed = false; - var ids1 = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm']; - var ids2 = ['n', 'o', 'p', 'q']; - var makeFunc = function (id) { + var ids1 = {"1": 5, "2": 10, "3": 7}; + var ids2 = {"4": 50, "5": 50}; + var makeFunc = function (id, delay) { return Zotero.Promise.coroutine(function* () { if (logger) { Zotero.debug("Running " + id); @@ -71,11 +72,7 @@ describe("ConcurrentCaller", function () { failed = true; throw new Error("Too many concurrent tasks"); } - var min = 10; - var max = 25; - yield Zotero.Promise.delay( - Math.floor(Math.random() * (max - min + 1)) + min - ); + yield Zotero.Promise.delay(delay); if (running > numConcurrent) { failed = true; throw new Error("Too many concurrent tasks"); @@ -88,24 +85,27 @@ describe("ConcurrentCaller", function () { return id; }); }; - var funcs1 = ids1.map(makeFunc) - var funcs2 = ids2.map(makeFunc) + var keys1 = Object.keys(ids1); + var keys2 = Object.keys(ids2); + var funcs1 = Object.keys(ids1).map(id => makeFunc(id, ids1[id])); + var funcs2 = Object.keys(ids2).map(id => makeFunc(id, ids2[id])); var caller = new ConcurrentCaller({ numConcurrent, logger }); var promise1 = caller.start(funcs1); - yield Zotero.Promise.delay(10); + yield Zotero.Promise.delay(1); var promise2 = caller.start(funcs2); var results1 = yield promise1; - assert.isTrue(promise2.isFulfilled()); - assert.equal(running, 0); - assert.equal(finished, ids1.length + ids2.length); - assert.equal(results1.length, ids1.length); - assert.sameMembers(results1.map(p => p.value()), ids1); + // Second set shouldn't be done yet + assert.isFalse(promise2.isFulfilled()); + assert.equal(finished, keys1.length); + assert.equal(results1.length, keys1.length); + assert.sameMembers(results1.map(p => p.value()), keys1); + assert.isFalse(failed); }) it("should add functions to existing queue and resolve when all are complete (waiting for later)", function* () { @@ -114,9 +114,9 @@ describe("ConcurrentCaller", function () { var finished = 0; var failed = false; - var ids1 = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm']; - var ids2 = ['n', 'o', 'p', 'q']; - var makeFunc = function (id) { + var ids1 = {"1": 50, "2": 15, "3": 60}; + var ids2 = {"4": 1, "5": 1}; + var makeFunc = function (id, delay) { return Zotero.Promise.coroutine(function* () { if (logger) { Zotero.debug("Running " + id); @@ -126,11 +126,7 @@ describe("ConcurrentCaller", function () { failed = true; throw new Error("Too many concurrent tasks"); } - var min = 10; - var max = 25; - yield Zotero.Promise.delay( - Math.floor(Math.random() * (max - min + 1)) + min - ); + yield Zotero.Promise.delay(delay); if (running > numConcurrent) { failed = true; throw new Error("Too many concurrent tasks"); @@ -143,8 +139,10 @@ describe("ConcurrentCaller", function () { return id; }); }; - var funcs1 = ids1.map(makeFunc) - var funcs2 = ids2.map(makeFunc) + var keys1 = Object.keys(ids1); + var keys2 = Object.keys(ids2); + var funcs1 = Object.keys(ids1).map(id => makeFunc(id, ids1[id])); + var funcs2 = Object.keys(ids2).map(id => makeFunc(id, ids2[id])); var caller = new ConcurrentCaller({ numConcurrent, @@ -156,10 +154,14 @@ describe("ConcurrentCaller", function () { var results2 = yield promise2; - assert.isTrue(promise1.isFulfilled()); - assert.equal(running, 0); - assert.equal(finished, ids1.length + ids2.length); - assert.equal(results2.length, ids2.length); + // The second set should finish before the first + assert.isFalse(promise1.isFulfilled()); + assert.equal(running, 1); // 3 should still be running + assert.equal(finished, 4); // 1, 2, 4, 5 + assert.equal(results2.length, keys2.length); + assert.equal(results2[0].value(), keys2[0]); + assert.equal(results2[1].value(), keys2[1]); + assert.isFalse(failed); }) it("should return a rejected promise if a single passed function fails", function* () { @@ -241,10 +243,10 @@ describe("ConcurrentCaller", function () { assert.isTrue(results1[0].isFulfilled()); // 'g' should be rejected assert.isTrue(results1[6].isRejected()); - // 'm' should be pending - assert.isTrue(results1[12].isPending()); - // All promises in second batch should be pending - assert.isTrue(promise2.value().every(p => p.isPending())); + // 'm' should be rejected + assert.isTrue(results1[12].isRejected()); + // All promises in second batch should be rejected + assert.isTrue(promise2.value().every(p => p.isRejected())); }) @@ -302,21 +304,22 @@ describe("ConcurrentCaller", function () { var promise1 = caller.start(funcs1); var promise2 = caller.start(funcs2); - var results1 = yield promise1; + var results2 = yield promise2; + assert.isTrue(promise1.isFulfilled()); assert.isTrue(promise2.isFulfilled()); assert.equal(running, 0); assert.equal(finished, ids1.length + ids2.length); - assert.equal(results1.length, ids1.length); - assert.equal(promise2.value().length, ids2.length); + assert.equal(promise1.value().length, ids1.length); + assert.equal(results2.length, ids2.length); // 'a' should be fulfilled - assert.isTrue(results1[0].isFulfilled()); + assert.isTrue(promise1.value()[0].isFulfilled()); // 'g' should be rejected - assert.isTrue(results1[6].isRejected()); + assert.isTrue(promise1.value()[6].isRejected()); // 'm' should be fulfilled - assert.isTrue(results1[12].isFulfilled()); + assert.isTrue(promise1.value()[12].isFulfilled()); // All promises in second batch should be fulfilled - assert.isTrue(promise2.value().every(p => p.isFulfilled())); + assert.isTrue(results2.every(p => p.isFulfilled())); }) })