Skip to content

Commit e94b8c5

Browse files
authored
Merge pull request #2421 from murgatroid99/grpc-js_waitForReady_fix
grpc-js: Fix connectivity state change event sequencing
2 parents 0c8616c + 43d42dc commit e94b8c5

File tree

3 files changed

+165
-32
lines changed

3 files changed

+165
-32
lines changed

packages/grpc-js/src/load-balancer-pick-first.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -322,12 +322,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
322322
);
323323
}
324324
this.currentPick = subchannel;
325-
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
326325
subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener);
327326
subchannel.ref();
328327
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
329328
this.resetSubchannelList();
330329
clearTimeout(this.connectionDelayTimeout);
330+
this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel));
331331
}
332332

333333
private updateState(newState: ConnectivityState, picker: Picker) {

packages/grpc-js/src/subchannel.ts

+34-31
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export class Subchannel {
6060
* state changes. Will be modified by `addConnectivityStateListener` and
6161
* `removeConnectivityStateListener`
6262
*/
63-
private stateListeners: ConnectivityStateListener[] = [];
63+
private stateListeners: Set<ConnectivityStateListener> = new Set();
6464

6565
private backoffTimeout: BackoffTimeout;
6666

@@ -227,6 +227,8 @@ export class Subchannel {
227227
}
228228
const previousState = this.connectivityState;
229229
this.connectivityState = newState;
230+
process.nextTick(() => {
231+
});
230232
switch (newState) {
231233
case ConnectivityState.READY:
232234
this.stopBackoff();
@@ -261,9 +263,7 @@ export class Subchannel {
261263
default:
262264
throw new Error(`Invalid state: unknown ConnectivityState ${newState}`);
263265
}
264-
/* We use a shallow copy of the stateListeners array in case a listener
265-
* is removed during this iteration */
266-
for (const listener of [...this.stateListeners]) {
266+
for (const listener of this.stateListeners) {
267267
listener(this, previousState, newState, this.keepaliveTime);
268268
}
269269
return true;
@@ -291,13 +291,15 @@ export class Subchannel {
291291
if (this.channelzEnabled) {
292292
this.channelzTrace.addTrace('CT_INFO', 'Shutting down');
293293
}
294-
this.transitionToState(
295-
[ConnectivityState.CONNECTING, ConnectivityState.READY],
296-
ConnectivityState.IDLE
297-
);
298294
if (this.channelzEnabled) {
299295
unregisterChannelzRef(this.channelzRef);
300296
}
297+
process.nextTick(() => {
298+
this.transitionToState(
299+
[ConnectivityState.CONNECTING, ConnectivityState.READY],
300+
ConnectivityState.IDLE
301+
);
302+
});
301303
}
302304
}
303305

@@ -339,20 +341,22 @@ export class Subchannel {
339341
* Otherwise, do nothing.
340342
*/
341343
startConnecting() {
342-
/* First, try to transition from IDLE to connecting. If that doesn't happen
343-
* because the state is not currently IDLE, check if it is
344-
* TRANSIENT_FAILURE, and if so indicate that it should go back to
345-
* connecting after the backoff timer ends. Otherwise do nothing */
346-
if (
347-
!this.transitionToState(
348-
[ConnectivityState.IDLE],
349-
ConnectivityState.CONNECTING
350-
)
351-
) {
352-
if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
353-
this.continueConnecting = true;
344+
process.nextTick(() => {
345+
/* First, try to transition from IDLE to connecting. If that doesn't happen
346+
* because the state is not currently IDLE, check if it is
347+
* TRANSIENT_FAILURE, and if so indicate that it should go back to
348+
* connecting after the backoff timer ends. Otherwise do nothing */
349+
if (
350+
!this.transitionToState(
351+
[ConnectivityState.IDLE],
352+
ConnectivityState.CONNECTING
353+
)
354+
) {
355+
if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) {
356+
this.continueConnecting = true;
357+
}
354358
}
355-
}
359+
});
356360
}
357361

358362
/**
@@ -368,7 +372,7 @@ export class Subchannel {
368372
* @param listener
369373
*/
370374
addConnectivityStateListener(listener: ConnectivityStateListener) {
371-
this.stateListeners.push(listener);
375+
this.stateListeners.add(listener);
372376
}
373377

374378
/**
@@ -377,21 +381,20 @@ export class Subchannel {
377381
* `addConnectivityStateListener`
378382
*/
379383
removeConnectivityStateListener(listener: ConnectivityStateListener) {
380-
const listenerIndex = this.stateListeners.indexOf(listener);
381-
if (listenerIndex > -1) {
382-
this.stateListeners.splice(listenerIndex, 1);
383-
}
384+
this.stateListeners.delete(listener);
384385
}
385386

386387
/**
387388
* Reset the backoff timeout, and immediately start connecting if in backoff.
388389
*/
389390
resetBackoff() {
390-
this.backoffTimeout.reset();
391-
this.transitionToState(
392-
[ConnectivityState.TRANSIENT_FAILURE],
393-
ConnectivityState.CONNECTING
394-
);
391+
process.nextTick(() => {
392+
this.backoffTimeout.reset();
393+
this.transitionToState(
394+
[ConnectivityState.TRANSIENT_FAILURE],
395+
ConnectivityState.CONNECTING
396+
);
397+
});
395398
}
396399

397400
getAddress(): string {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2023 gRPC authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
import * as assert from 'assert';
19+
import * as path from 'path';
20+
21+
import * as grpc from '../src';
22+
import {sendUnaryData, Server, ServerCredentials, ServerUnaryCall, ServiceClientConstructor, ServiceError} from '../src';
23+
24+
import {loadProtoFile} from './common';
25+
26+
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
27+
const echoService =
28+
loadProtoFile(protoFile).EchoService as ServiceClientConstructor;
29+
30+
describe.only('Global subchannel pool', () => {
31+
let server: Server;
32+
let serverPort: number;
33+
34+
let client1: InstanceType<grpc.ServiceClientConstructor>;
35+
let client2: InstanceType<grpc.ServiceClientConstructor>;
36+
37+
let promises: Promise<any>[];
38+
39+
before(done => {
40+
server = new Server();
41+
server.addService(echoService.service, {
42+
echo(call: ServerUnaryCall<any, any>, callback: sendUnaryData<any>) {
43+
callback(null, call.request);
44+
},
45+
});
46+
47+
server.bindAsync(
48+
'localhost:0', ServerCredentials.createInsecure(), (err, port) => {
49+
assert.ifError(err);
50+
serverPort = port;
51+
server.start();
52+
done();
53+
});
54+
});
55+
56+
beforeEach(() => {
57+
promises = [];
58+
})
59+
60+
after(done => {
61+
server.tryShutdown(done);
62+
});
63+
64+
function callService(client: InstanceType<grpc.ServiceClientConstructor>) {
65+
return new Promise<void>((resolve) => {
66+
const request = {value: 'test value', value2: 3};
67+
68+
client.echo(request, (error: ServiceError, response: any) => {
69+
assert.ifError(error);
70+
assert.deepStrictEqual(response, request);
71+
resolve();
72+
});
73+
})
74+
}
75+
76+
function connect() {
77+
const grpcOptions = {
78+
'grpc.use_local_subchannel_pool': 0,
79+
}
80+
81+
client1 = new echoService(
82+
`127.0.0.1:${serverPort}`, grpc.credentials.createInsecure(),
83+
grpcOptions);
84+
85+
client2 = new echoService(
86+
`127.0.0.1:${serverPort}`, grpc.credentials.createInsecure(),
87+
grpcOptions);
88+
}
89+
90+
/* This is a regression test for a bug where client1.close in the
91+
* waitForReady callback would cause the subchannel to transition to IDLE
92+
* even though client2 is also using it. */
93+
it('Should handle client.close calls in waitForReady',
94+
done => {
95+
connect();
96+
97+
promises.push(new Promise<void>((resolve) => {
98+
client1.waitForReady(Date.now() + 50, (error) => {
99+
assert.ifError(error);
100+
client1.close();
101+
resolve();
102+
});
103+
}))
104+
105+
promises.push(new Promise<void>((resolve) => {
106+
client2.waitForReady(Date.now() + 50, (error) => {
107+
assert.ifError(error);
108+
resolve();
109+
});
110+
}))
111+
112+
Promise.all(promises).then(() => {done()});
113+
})
114+
115+
it('Call the service', done => {
116+
promises.push(callService(client2));
117+
118+
Promise.all(promises).then(() => {
119+
done();
120+
});
121+
})
122+
123+
it('Should complete the client lifecycle without error', done => {
124+
setTimeout(() => {
125+
client1.close();
126+
client2.close();
127+
done()
128+
}, 500);
129+
});
130+
});

0 commit comments

Comments
 (0)