-
-
Notifications
You must be signed in to change notification settings - Fork 694
/
Copy pathdynamicFlushScheduler.server.ts
70 lines (60 loc) · 1.95 KB
/
dynamicFlushScheduler.server.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import { nanoid } from "nanoid";
export type DynamicFlushSchedulerConfig<T> = {
batchSize: number;
flushInterval: number;
callback: (flushId: string, batch: T[]) => Promise<void>;
};
export class DynamicFlushScheduler<T> {
private batchQueue: T[][]; // Adjust the type according to your data structure
private currentBatch: T[]; // Adjust the type according to your data structure
private readonly BATCH_SIZE: number;
private readonly FLUSH_INTERVAL: number;
private flushTimer: NodeJS.Timeout | null;
private readonly callback: (flushId: string, batch: T[]) => Promise<void>;
constructor(config: DynamicFlushSchedulerConfig<T>) {
this.batchQueue = [];
this.currentBatch = [];
this.BATCH_SIZE = config.batchSize;
this.FLUSH_INTERVAL = config.flushInterval;
this.callback = config.callback;
this.flushTimer = null;
this.startFlushTimer();
}
addToBatch(items: T[]): void {
this.currentBatch.push(...items);
if (this.currentBatch.length >= this.BATCH_SIZE) {
this.batchQueue.push(this.currentBatch);
this.currentBatch = [];
this.flushNextBatch();
this.resetFlushTimer();
}
}
private startFlushTimer(): void {
this.flushTimer = setInterval(() => this.checkAndFlush(), this.FLUSH_INTERVAL);
}
private resetFlushTimer(): void {
if (this.flushTimer) {
clearInterval(this.flushTimer);
}
this.startFlushTimer();
}
private checkAndFlush(): void {
if (this.currentBatch.length > 0) {
this.batchQueue.push(this.currentBatch);
this.currentBatch = [];
}
this.flushNextBatch();
}
private async flushNextBatch(): Promise<void> {
if (this.batchQueue.length === 0) return;
const batchToFlush = this.batchQueue.shift();
try {
await this.callback(nanoid(), batchToFlush!);
if (this.batchQueue.length > 0) {
this.flushNextBatch();
}
} catch (error) {
console.error("Error inserting batch:", error);
}
}
}