Resolve caller.start() promise when passed functions finish
Instead of waiting for all queued functions to finish
This commit is contained in:
parent
790d6d3dd6
commit
bad9833896
|
@ -68,9 +68,9 @@ ConcurrentCaller = function (options = {}) {
|
||||||
|
|
||||||
this.stopOnError = options.stopOnError || false;
|
this.stopOnError = options.stopOnError || false;
|
||||||
this.onError = options.onError || null;
|
this.onError = options.onError || null;
|
||||||
|
this.numConcurrent = options.numConcurrent;
|
||||||
|
|
||||||
this._id = options.id;
|
this._id = options.id;
|
||||||
this._numConcurrent = options.numConcurrent;
|
|
||||||
this._numRunning = 0;
|
this._numRunning = 0;
|
||||||
this._queue = [];
|
this._queue = [];
|
||||||
this._logger = options.logger || null;
|
this._logger = options.logger || null;
|
||||||
|
@ -105,8 +105,8 @@ ConcurrentCaller.prototype.pause = function (ms) {
|
||||||
* Add a task to the queue without starting it
|
* Add a task to the queue without starting it
|
||||||
*
|
*
|
||||||
* @param {Function|Function[]} - One or more functions to run
|
* @param {Function|Function[]} - One or more functions to run
|
||||||
* @return {Promise[]} - An array of promises for passed functions, resolved once the queue is
|
* @return {Promise[]} - An array of promises for passed functions, resolved once they have all
|
||||||
* empty
|
* finished (even if other functions are still running)
|
||||||
*/
|
*/
|
||||||
ConcurrentCaller.prototype.add = function (func) {
|
ConcurrentCaller.prototype.add = function (func) {
|
||||||
if (Array.isArray(func)) {
|
if (Array.isArray(func)) {
|
||||||
|
@ -114,7 +114,7 @@ ConcurrentCaller.prototype.add = function (func) {
|
||||||
for (let i = 0; i < func.length; i++) {
|
for (let i = 0; i < func.length; i++) {
|
||||||
promises.push(this.start(func[i]));
|
promises.push(this.start(func[i]));
|
||||||
}
|
}
|
||||||
return this._deferred.promise.return(promises);
|
return Promise.settle(promises);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!this._deferred || !this._deferred.promise.isPending()) {
|
if (!this._deferred || !this._deferred.promise.isPending()) {
|
||||||
|
@ -126,20 +126,20 @@ ConcurrentCaller.prototype.add = function (func) {
|
||||||
func: Promise.method(func),
|
func: Promise.method(func),
|
||||||
deferred: deferred
|
deferred: deferred
|
||||||
});
|
});
|
||||||
return this._deferred.promise.return(deferred.promise);
|
return deferred.promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {Function|Function[]} - One or more functions to run
|
* @param {Function|Function[]} - One or more functions to run
|
||||||
* @return {Promise[]} - An array of promises for passed functions, resolved once the queue is
|
* @return {Promise[]} - An array of promises for passed functions, resolved once they have all
|
||||||
* empty
|
* finished (even if other functions are still running)
|
||||||
*/
|
*/
|
||||||
ConcurrentCaller.prototype.start = function (func) {
|
ConcurrentCaller.prototype.start = function (func) {
|
||||||
var promise = this.add(func);
|
var promise = this.add(func);
|
||||||
var run = this._processNext();
|
var run = this._processNext();
|
||||||
if (!run) {
|
if (!run) {
|
||||||
this._log("Already at " + this._numConcurrent + " -- queueing for later");
|
this._log("Already at " + this.numConcurrent + " -- queueing for later");
|
||||||
}
|
}
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
@ -187,7 +187,7 @@ ConcurrentCaller.prototype.stop = function () {
|
||||||
|
|
||||||
|
|
||||||
ConcurrentCaller.prototype._processNext = function () {
|
ConcurrentCaller.prototype._processNext = function () {
|
||||||
if (this._numRunning >= this._numConcurrent) {
|
if (this._numRunning >= this.numConcurrent) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,7 +205,7 @@ ConcurrentCaller.prototype._processNext = function () {
|
||||||
}
|
}
|
||||||
|
|
||||||
this._log("Running function ("
|
this._log("Running function ("
|
||||||
+ this._numRunning + "/" + this._numConcurrent + " running, "
|
+ this._numRunning + "/" + this.numConcurrent + " running, "
|
||||||
+ this._queue.length + " queued)");
|
+ this._queue.length + " queued)");
|
||||||
|
|
||||||
this._numRunning++;
|
this._numRunning++;
|
||||||
|
@ -213,7 +213,7 @@ ConcurrentCaller.prototype._processNext = function () {
|
||||||
this._numRunning--;
|
this._numRunning--;
|
||||||
|
|
||||||
this._log("Done with function ("
|
this._log("Done with function ("
|
||||||
+ this._numRunning + "/" + this._numConcurrent + " running, "
|
+ this._numRunning + "/" + this.numConcurrent + " running, "
|
||||||
+ this._queue.length + " queued)");
|
+ this._queue.length + " queued)");
|
||||||
|
|
||||||
this._waitForPause().bind(this).then(function () {
|
this._waitForPause().bind(this).then(function () {
|
||||||
|
@ -225,7 +225,7 @@ ConcurrentCaller.prototype._processNext = function () {
|
||||||
.catch(function (e) {
|
.catch(function (e) {
|
||||||
this._numRunning--;
|
this._numRunning--;
|
||||||
|
|
||||||
this._log("Error in function (" + this._numRunning + "/" + this._numConcurrent + ", "
|
this._log("Error in function (" + this._numRunning + "/" + this.numConcurrent + ", "
|
||||||
+ this._queue.length + " in queue)");
|
+ this._queue.length + " in queue)");
|
||||||
|
|
||||||
if (this.onError) {
|
if (this.onError) {
|
||||||
|
@ -234,7 +234,12 @@ ConcurrentCaller.prototype._processNext = function () {
|
||||||
|
|
||||||
if (this.stopOnError && this._queue.length) {
|
if (this.stopOnError && this._queue.length) {
|
||||||
this._log("Stopping on error: " + e);
|
this._log("Stopping on error: " + e);
|
||||||
|
this._oldQueue = this._queue;
|
||||||
this._queue = [];
|
this._queue = [];
|
||||||
|
for (let o of this._oldQueue) {
|
||||||
|
//this._log("Rejecting promise");
|
||||||
|
o.deferred.reject();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this._waitForPause().bind(this).then(function () {
|
this._waitForPause().bind(this).then(function () {
|
||||||
|
|
|
@ -51,6 +51,7 @@ describe("ConcurrentCaller", function () {
|
||||||
assert.equal(results.length, ids.length);
|
assert.equal(results.length, ids.length);
|
||||||
assert.equal(running, 0);
|
assert.equal(running, 0);
|
||||||
assert.equal(finished, ids.length);
|
assert.equal(finished, ids.length);
|
||||||
|
assert.isFalse(failed);
|
||||||
})
|
})
|
||||||
|
|
||||||
it("should add functions to existing queue and resolve when all are complete (waiting for earlier)", function* () {
|
it("should add functions to existing queue and resolve when all are complete (waiting for earlier)", function* () {
|
||||||
|
@ -59,9 +60,9 @@ describe("ConcurrentCaller", function () {
|
||||||
var finished = 0;
|
var finished = 0;
|
||||||
var failed = false;
|
var failed = false;
|
||||||
|
|
||||||
var ids1 = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'];
|
var ids1 = {"1": 5, "2": 10, "3": 7};
|
||||||
var ids2 = ['n', 'o', 'p', 'q'];
|
var ids2 = {"4": 50, "5": 50};
|
||||||
var makeFunc = function (id) {
|
var makeFunc = function (id, delay) {
|
||||||
return Zotero.Promise.coroutine(function* () {
|
return Zotero.Promise.coroutine(function* () {
|
||||||
if (logger) {
|
if (logger) {
|
||||||
Zotero.debug("Running " + id);
|
Zotero.debug("Running " + id);
|
||||||
|
@ -71,11 +72,7 @@ describe("ConcurrentCaller", function () {
|
||||||
failed = true;
|
failed = true;
|
||||||
throw new Error("Too many concurrent tasks");
|
throw new Error("Too many concurrent tasks");
|
||||||
}
|
}
|
||||||
var min = 10;
|
yield Zotero.Promise.delay(delay);
|
||||||
var max = 25;
|
|
||||||
yield Zotero.Promise.delay(
|
|
||||||
Math.floor(Math.random() * (max - min + 1)) + min
|
|
||||||
);
|
|
||||||
if (running > numConcurrent) {
|
if (running > numConcurrent) {
|
||||||
failed = true;
|
failed = true;
|
||||||
throw new Error("Too many concurrent tasks");
|
throw new Error("Too many concurrent tasks");
|
||||||
|
@ -88,24 +85,27 @@ describe("ConcurrentCaller", function () {
|
||||||
return id;
|
return id;
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
var funcs1 = ids1.map(makeFunc)
|
var keys1 = Object.keys(ids1);
|
||||||
var funcs2 = ids2.map(makeFunc)
|
var keys2 = Object.keys(ids2);
|
||||||
|
var funcs1 = Object.keys(ids1).map(id => makeFunc(id, ids1[id]));
|
||||||
|
var funcs2 = Object.keys(ids2).map(id => makeFunc(id, ids2[id]));
|
||||||
|
|
||||||
var caller = new ConcurrentCaller({
|
var caller = new ConcurrentCaller({
|
||||||
numConcurrent,
|
numConcurrent,
|
||||||
logger
|
logger
|
||||||
});
|
});
|
||||||
var promise1 = caller.start(funcs1);
|
var promise1 = caller.start(funcs1);
|
||||||
yield Zotero.Promise.delay(10);
|
yield Zotero.Promise.delay(1);
|
||||||
var promise2 = caller.start(funcs2);
|
var promise2 = caller.start(funcs2);
|
||||||
|
|
||||||
var results1 = yield promise1;
|
var results1 = yield promise1;
|
||||||
|
|
||||||
assert.isTrue(promise2.isFulfilled());
|
// Second set shouldn't be done yet
|
||||||
assert.equal(running, 0);
|
assert.isFalse(promise2.isFulfilled());
|
||||||
assert.equal(finished, ids1.length + ids2.length);
|
assert.equal(finished, keys1.length);
|
||||||
assert.equal(results1.length, ids1.length);
|
assert.equal(results1.length, keys1.length);
|
||||||
assert.sameMembers(results1.map(p => p.value()), ids1);
|
assert.sameMembers(results1.map(p => p.value()), keys1);
|
||||||
|
assert.isFalse(failed);
|
||||||
})
|
})
|
||||||
|
|
||||||
it("should add functions to existing queue and resolve when all are complete (waiting for later)", function* () {
|
it("should add functions to existing queue and resolve when all are complete (waiting for later)", function* () {
|
||||||
|
@ -114,9 +114,9 @@ describe("ConcurrentCaller", function () {
|
||||||
var finished = 0;
|
var finished = 0;
|
||||||
var failed = false;
|
var failed = false;
|
||||||
|
|
||||||
var ids1 = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm'];
|
var ids1 = {"1": 50, "2": 15, "3": 60};
|
||||||
var ids2 = ['n', 'o', 'p', 'q'];
|
var ids2 = {"4": 1, "5": 1};
|
||||||
var makeFunc = function (id) {
|
var makeFunc = function (id, delay) {
|
||||||
return Zotero.Promise.coroutine(function* () {
|
return Zotero.Promise.coroutine(function* () {
|
||||||
if (logger) {
|
if (logger) {
|
||||||
Zotero.debug("Running " + id);
|
Zotero.debug("Running " + id);
|
||||||
|
@ -126,11 +126,7 @@ describe("ConcurrentCaller", function () {
|
||||||
failed = true;
|
failed = true;
|
||||||
throw new Error("Too many concurrent tasks");
|
throw new Error("Too many concurrent tasks");
|
||||||
}
|
}
|
||||||
var min = 10;
|
yield Zotero.Promise.delay(delay);
|
||||||
var max = 25;
|
|
||||||
yield Zotero.Promise.delay(
|
|
||||||
Math.floor(Math.random() * (max - min + 1)) + min
|
|
||||||
);
|
|
||||||
if (running > numConcurrent) {
|
if (running > numConcurrent) {
|
||||||
failed = true;
|
failed = true;
|
||||||
throw new Error("Too many concurrent tasks");
|
throw new Error("Too many concurrent tasks");
|
||||||
|
@ -143,8 +139,10 @@ describe("ConcurrentCaller", function () {
|
||||||
return id;
|
return id;
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
var funcs1 = ids1.map(makeFunc)
|
var keys1 = Object.keys(ids1);
|
||||||
var funcs2 = ids2.map(makeFunc)
|
var keys2 = Object.keys(ids2);
|
||||||
|
var funcs1 = Object.keys(ids1).map(id => makeFunc(id, ids1[id]));
|
||||||
|
var funcs2 = Object.keys(ids2).map(id => makeFunc(id, ids2[id]));
|
||||||
|
|
||||||
var caller = new ConcurrentCaller({
|
var caller = new ConcurrentCaller({
|
||||||
numConcurrent,
|
numConcurrent,
|
||||||
|
@ -156,10 +154,14 @@ describe("ConcurrentCaller", function () {
|
||||||
|
|
||||||
var results2 = yield promise2;
|
var results2 = yield promise2;
|
||||||
|
|
||||||
assert.isTrue(promise1.isFulfilled());
|
// The second set should finish before the first
|
||||||
assert.equal(running, 0);
|
assert.isFalse(promise1.isFulfilled());
|
||||||
assert.equal(finished, ids1.length + ids2.length);
|
assert.equal(running, 1); // 3 should still be running
|
||||||
assert.equal(results2.length, ids2.length);
|
assert.equal(finished, 4); // 1, 2, 4, 5
|
||||||
|
assert.equal(results2.length, keys2.length);
|
||||||
|
assert.equal(results2[0].value(), keys2[0]);
|
||||||
|
assert.equal(results2[1].value(), keys2[1]);
|
||||||
|
assert.isFalse(failed);
|
||||||
})
|
})
|
||||||
|
|
||||||
it("should return a rejected promise if a single passed function fails", function* () {
|
it("should return a rejected promise if a single passed function fails", function* () {
|
||||||
|
@ -241,10 +243,10 @@ describe("ConcurrentCaller", function () {
|
||||||
assert.isTrue(results1[0].isFulfilled());
|
assert.isTrue(results1[0].isFulfilled());
|
||||||
// 'g' should be rejected
|
// 'g' should be rejected
|
||||||
assert.isTrue(results1[6].isRejected());
|
assert.isTrue(results1[6].isRejected());
|
||||||
// 'm' should be pending
|
// 'm' should be rejected
|
||||||
assert.isTrue(results1[12].isPending());
|
assert.isTrue(results1[12].isRejected());
|
||||||
// All promises in second batch should be pending
|
// All promises in second batch should be rejected
|
||||||
assert.isTrue(promise2.value().every(p => p.isPending()));
|
assert.isTrue(promise2.value().every(p => p.isRejected()));
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@ -302,21 +304,22 @@ describe("ConcurrentCaller", function () {
|
||||||
var promise1 = caller.start(funcs1);
|
var promise1 = caller.start(funcs1);
|
||||||
var promise2 = caller.start(funcs2);
|
var promise2 = caller.start(funcs2);
|
||||||
|
|
||||||
var results1 = yield promise1;
|
var results2 = yield promise2;
|
||||||
|
|
||||||
|
assert.isTrue(promise1.isFulfilled());
|
||||||
assert.isTrue(promise2.isFulfilled());
|
assert.isTrue(promise2.isFulfilled());
|
||||||
assert.equal(running, 0);
|
assert.equal(running, 0);
|
||||||
assert.equal(finished, ids1.length + ids2.length);
|
assert.equal(finished, ids1.length + ids2.length);
|
||||||
assert.equal(results1.length, ids1.length);
|
assert.equal(promise1.value().length, ids1.length);
|
||||||
assert.equal(promise2.value().length, ids2.length);
|
assert.equal(results2.length, ids2.length);
|
||||||
// 'a' should be fulfilled
|
// 'a' should be fulfilled
|
||||||
assert.isTrue(results1[0].isFulfilled());
|
assert.isTrue(promise1.value()[0].isFulfilled());
|
||||||
// 'g' should be rejected
|
// 'g' should be rejected
|
||||||
assert.isTrue(results1[6].isRejected());
|
assert.isTrue(promise1.value()[6].isRejected());
|
||||||
// 'm' should be fulfilled
|
// 'm' should be fulfilled
|
||||||
assert.isTrue(results1[12].isFulfilled());
|
assert.isTrue(promise1.value()[12].isFulfilled());
|
||||||
// All promises in second batch should be fulfilled
|
// All promises in second batch should be fulfilled
|
||||||
assert.isTrue(promise2.value().every(p => p.isFulfilled()));
|
assert.isTrue(results2.every(p => p.isFulfilled()));
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user