-
Notifications
You must be signed in to change notification settings - Fork 152
/
Copy pathPromiseQueue.ts
98 lines (90 loc) · 2.94 KB
/
PromiseQueue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// tslint:disable:no-floating-promises
import * as async from "async";
import * as debugGenerator from "debug";
// Module-wide debug logger obtained from the `debug` package under the label "css-blocks".
const debug = debugGenerator("css-blocks");
/**
 * Bookkeeping record for a single job handed to the underlying queue.
 * It pairs the caller's work item with a job id and, once the job has
 * been processed successfully, the value it produced.
 */
interface PendingWork<WorkItem, Result> {
  /** Job number assigned when the work is enqueued; used in debug messages. */
  id: number;
  /** The caller-supplied input that the processor function receives. */
  item: WorkItem;
  /** Value produced by the processor; absent until the job has succeeded. */
  result?: Result;
}
// Module-level counter: each PromiseQueue constructor takes the current value
// as its queue id (shown in debug output) and then increments it.
let queueInstanceId = 1;
/**
 * A work queue with bounded concurrency whose jobs are promise-returning
 * calls to a single processor function.
 *
 * When a job fails, the promise returned by `enqueue` for that job is not
 * rejected until the queue has drained, so all concurrent work is completed
 * before the error is propagated. New work is refused while the queue is
 * draining and can be added again once draining has finished.
 */
export class PromiseQueue<WorkItem, Result> {
  private queue: async.AsyncQueue<PendingWork<WorkItem, Result>>;
  private queueId: number;
  private jobId: number;
  private draining: boolean;
  private promiseProcessor: (item: WorkItem) => Promise<Result>;

  /**
   * @param concurrency Maximum number of jobs the underlying queue runs at once.
   * @param processor Function invoked once per enqueued item; its promise
   *   decides whether the job succeeds (and with what result) or fails.
   */
  constructor(concurrency: number, processor: (item: WorkItem) => Promise<Result>) {
    this.promiseProcessor = processor;
    this.queue = async.queue<PendingWork<WorkItem, Result>, Error>(this.processWork.bind(this), concurrency);
    this.queueId = queueInstanceId++;
    this.jobId = 0;
    this.draining = false;
  }

  /**
   * Worker handed to the underlying queue: runs the processor on one job,
   * stores a successful result on the work record and reports the outcome
   * through `callback`.
   */
  private processWork(work: PendingWork<WorkItem, Result>, callback: (err?: Error | null | undefined) => void): void {
    this.debug(`[Job:${work.id}] Starting job.`);
    let outcome: Promise<Result>;
    try {
      outcome = this.promiseProcessor(work.item);
    } catch (error) {
      // A processor that throws before returning its promise must still
      // complete the job; otherwise the queue callback is never invoked and
      // the promise returned by `enqueue` never settles.
      this.debug(`[Job:${work.id}] Errored.`);
      callback(error as Error);
      return;
    }
    outcome.then(
      (result: Result) => {
        this.debug(`[Job:${work.id}] Finished. Recording result.`);
        work.result = result;
        callback();
      },
      (error: Error | null | undefined) => {
        this.debug(`[Job:${work.id}] Errored.`);
        callback(error);
      });
  }

  /** Number of jobs the underlying queue is currently running. */
  get activeJobCount(): number {
    return this.queue.running();
  }

  /** Concurrency limit of the underlying queue. */
  get concurrency(): number {
    return this.queue.concurrency;
  }

  /** Writes `message` to the module's debug logger, prefixed with this queue's id. */
  debug(message: string): void {
    debug(`[Queue:${this.queueId}] ${message}`);
  }

  /**
   * Stops accepting new work until the queue has no running or waiting jobs.
   *
   * @returns A promise that resolves once the queue is empty, at which point
   *   `enqueue` accepts work again.
   */
  drain(): Promise<void> {
    this.draining = true;
    // The underlying queue only signals "drain" when its last job finishes.
    // If nothing is running or waiting, that signal never arrives, and since
    // `enqueue` refuses work while draining the flag could never clear.
    const drained = this.queue.idle() ? Promise.resolve() : this.queue.drain();
    return drained.then(() => {
      this.draining = false;
    });
  }

  /** Calls `resume()` on the underlying queue. Nothing in this class pauses it. */
  restart(): void {
    this.queue.resume();
  }

  /**
   * Adds a work item to the queue.
   *
   * @param item The value passed to the processor function.
   * @returns A promise resolving to the processor's result. It rejects
   *   immediately if the queue is draining, and otherwise rejects with the
   *   job's error only after the queue has drained. It also rejects with an
   *   internal error if the job is reported as successful but no result was
   *   recorded (for example when the processor rejects with `null` or
   *   `undefined`).
   */
  enqueue(item: WorkItem): Promise<Result> {
    const id = this.jobId++;
    return new Promise<Result>((resolve, reject) => {
      if (this.draining) {
        const message = "Queue is draining, cannot enqueue job.";
        this.debug(`[Job:${id}] ${message}`);
        reject(new Error(message));
        return;
      }
      this.debug(`[Job:${id}] Added to queue.`);
      const work: PendingWork<WorkItem, Result> = { id, item };
      this.queue.push(work, (err) => {
        if (err) {
          this.debug(`[Job:${id}] Failed.`);
          // Let every in-flight job finish before surfacing the failure.
          this.drain().then(() => {
            this.debug(`[Job:${id}] Done draining. Rejecting promise.`);
            reject(err);
          });
          return;
        }
        // Test for the key, not the value: a processor may legitimately
        // produce `undefined`, `0` or another falsy result.
        if ("result" in work) {
          this.debug(`[Job:${id}] Complete. Resolving promise.`);
          resolve(work.result as Result);
        } else {
          this.debug(`[Job:${id}] WTF! Result missing.`);
          reject(new Error("there's no result to return. this is an internal error."));
        }
      });
    });
  }
}