Skip to content

Commit 99205ec

Browse files
authored
feat: use weighted round robin in balancedPool (#1069)
* fixes * more fixes * add test * remove console.log * rename startingWeightPerServer to maxWeightPerServer * add another test
1 parent 5b57e8c commit 99205ec

File tree

2 files changed

+390
-11
lines changed

2 files changed

+390
-11
lines changed

lib/balanced-pool.js

+81-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,17 @@ const { parseOrigin } = require('./core/util')
1818
const kFactory = Symbol('factory')
1919

2020
const kOptions = Symbol('options')
21+
const kGreatestCommonDivisor = Symbol('kGreatestCommonDivisor')
22+
const kCurrentWeight = Symbol('kCurrentWeight')
23+
const kIndex = Symbol('kIndex')
24+
const kWeight = Symbol('kWeight')
25+
const kMaxWeightPerServer = Symbol('kMaxWeightPerServer')
26+
const kErrorPenalty = Symbol('kErrorPenalty')
27+
28+
function getGreatestCommonDivisor (a, b) {
29+
if (b === 0) return a
30+
return getGreatestCommonDivisor(b, a % b)
31+
}
2132

2233
function defaultFactory (origin, opts) {
2334
return new Pool(origin, opts)
@@ -28,6 +39,11 @@ class BalancedPool extends PoolBase {
2839
super()
2940

3041
this[kOptions] = opts
42+
this[kIndex] = -1
43+
this[kCurrentWeight] = 0
44+
45+
this[kMaxWeightPerServer] = this[kOptions].maxWeightPerServer || 100
46+
this[kErrorPenalty] = this[kOptions].errorPenalty || 15
3147

3248
if (!Array.isArray(upstreams)) {
3349
upstreams = [upstreams]
@@ -42,6 +58,7 @@ class BalancedPool extends PoolBase {
4258
for (const upstream of upstreams) {
4359
this.addUpstream(upstream)
4460
}
61+
this._updateBalancedPoolStats()
4562
}
4663

4764
addUpstream (upstream) {
@@ -54,12 +71,40 @@ class BalancedPool extends PoolBase {
5471
))) {
5572
return this
5673
}
74+
const pool = this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions]))
75+
76+
this[kAddClient](pool)
77+
pool.on('connect', () => {
78+
pool[kWeight] = Math.min(this[kMaxWeightPerServer], pool[kWeight] + this[kErrorPenalty])
79+
})
80+
81+
pool.on('connectionError', () => {
82+
pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
83+
this._updateBalancedPoolStats()
84+
})
85+
86+
pool.on('disconnect', (...args) => {
87+
const err = args[2]
88+
if (err && err.code === 'UND_ERR_SOCKET') {
89+
// decrease the weight of the pool.
90+
pool[kWeight] = Math.max(1, pool[kWeight] - this[kErrorPenalty])
91+
this._updateBalancedPoolStats()
92+
}
93+
})
94+
95+
for (const client of this[kClients]) {
96+
client[kWeight] = this[kMaxWeightPerServer]
97+
}
5798

58-
this[kAddClient](this[kFactory](upstreamOrigin, Object.assign({}, this[kOptions])))
99+
this._updateBalancedPoolStats()
59100

60101
return this
61102
}
62103

104+
_updateBalancedPoolStats () {
105+
this[kGreatestCommonDivisor] = this[kClients].map(p => p[kWeight]).reduce(getGreatestCommonDivisor, 0)
106+
}
107+
63108
removeUpstream (upstream) {
64109
const upstreamOrigin = parseOrigin(upstream).origin
65110

@@ -100,10 +145,42 @@ class BalancedPool extends PoolBase {
100145
return
101146
}
102147

103-
this[kClients].splice(this[kClients].indexOf(dispatcher), 1)
104-
this[kClients].push(dispatcher)
148+
const allClientsBusy = this[kClients].map(pool => pool[kNeedDrain]).reduce((a, b) => a && b, true)
149+
150+
if (allClientsBusy) {
151+
return
152+
}
153+
154+
let counter = 0
155+
156+
let maxWeightIndex = this[kClients].findIndex(pool => !pool[kNeedDrain])
157+
158+
while (counter++ < this[kClients].length) {
159+
this[kIndex] = (this[kIndex] + 1) % this[kClients].length
160+
const pool = this[kClients][this[kIndex]]
161+
162+
// find pool index with the largest weight
163+
if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) {
164+
maxWeightIndex = this[kIndex]
165+
}
166+
167+
// decrease the current weight every `this[kClients].length`.
168+
if (this[kIndex] === 0) {
169+
// Set the current weight to the next lower weight.
170+
this[kCurrentWeight] = this[kCurrentWeight] - this[kGreatestCommonDivisor]
171+
172+
if (this[kCurrentWeight] <= 0) {
173+
this[kCurrentWeight] = this[kMaxWeightPerServer]
174+
}
175+
}
176+
if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) {
177+
return pool
178+
}
179+
}
105180

106-
return dispatcher
181+
this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight]
182+
this[kIndex] = maxWeightIndex
183+
return this[kClients][maxWeightIndex]
107184
}
108185
}
109186

0 commit comments

Comments
 (0)