Skip to content

Commit 41039ce

Browse files
committed
add async pool
1 parent 4d23e52 commit 41039ce

File tree

1 file changed

+42
-27
lines changed

1 file changed

+42
-27
lines changed

js/stack_queue.js

+42-27
Original file line numberDiff line numberDiff line change
@@ -38,37 +38,52 @@ function pairTest(str) {
3838

3939
console.log(pairTest('sdf\<asdsdfeesf{sdfefi{esadf{aefw}sdfw}sd}\>'));
4040

41-
function waterfall(tasks, callback) {
42-
43-
// var err = null;
44-
// var res = null;
45-
// var current;
46-
// while (fnArray.length !== 0) {
47-
// current = fnArray.shift();
48-
// current.apply(null, args);
49-
// }
50-
51-
if (!tasks.length) return callback();
52-
var taskIndex = 0;
41+
/**
42+
* 异步操作池
43+
* @param {Array<Promise>} tasks
44+
* @param {Number} limit 最大并发数,默认为1表示串行
45+
* 两种错误处理:1、其中一个任务出错就停止所有;2、执行完所有任务后收集所有错误
46+
*/
47+
function asyncPoolByThunk(tasks, limit = 1, cb = () => {}, options = {}) {
48+
const stopImmediate = options.stopImmediate;
49+
let index = limit - 1;
50+
let errs = [];
5351

54-
function nextTask(args) {
55-
if (taskIndex === tasks.length) {
56-
return callback.apply(null, [null].concat(args));
52+
function next(err) {
53+
if (err) {
54+
errs.push(err);
55+
if (stopImmediate) {
56+
cb(errs);
57+
return;
58+
}
5759
}
58-
59-
var taskCallback = onlyOnce(rest(function(err, args) {
60-
if (err) {
61-
return callback.apply(null, [err].concat(args));
60+
if (index > tasks.length) {
61+
cb(errs);
62+
return;
63+
}
64+
const current = tasks[++index];
65+
if (typeof current === 'function') {
66+
if (stopImmediate && errs.length > 0) {
67+
return;
6268
}
63-
nextTask(args);
64-
}));
65-
66-
args.push(taskCallback);
69+
current(next);
70+
}
71+
}
6772

68-
var task = tasks[taskIndex++];
69-
task.apply(null, args);
73+
for (let k = 0; k < limit; k++) {
74+
tasks[k](next);
7075
}
7176

72-
nextTask([]);
77+
}
7378

74-
}
79+
function asyncFn(params) {
80+
return function (next) {
81+
setTimeout(() => {
82+
console.log('async', params);
83+
next(params === '4' ? '5555' : null);
84+
}, 1000);
85+
}
86+
}
87+
asyncPoolByThunk('123456789'.split('').map(el => asyncFn(el)), 3, function (errs) {
88+
console.log('errs', errs);
89+
}, { stopImmediate: true });

0 commit comments

Comments
 (0)