From 43d42dcf3f2409c53a623b46ab72c89af353e4e8 Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Tue, 11 Apr 2023 14:24:25 -0700 Subject: [PATCH] grpc-js: Fix connectivity state change event sequencing --- .../grpc-js/src/load-balancer-pick-first.ts | 2 +- packages/grpc-js/src/subchannel.ts | 65 ++++----- .../test/test-global-subchannel-pool.ts | 130 ++++++++++++++++++ 3 files changed, 165 insertions(+), 32 deletions(-) create mode 100644 packages/grpc-js/test/test-global-subchannel-pool.ts diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index a501b1f7d..41d21a2ea 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -322,12 +322,12 @@ export class PickFirstLoadBalancer implements LoadBalancer { ); } this.currentPick = subchannel; - this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel)); subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener); subchannel.ref(); this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); this.resetSubchannelList(); clearTimeout(this.connectionDelayTimeout); + this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel)); } private updateState(newState: ConnectivityState, picker: Picker) { diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index c93e0c451..de420cc97 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -60,7 +60,7 @@ export class Subchannel { * state changes. Will be modified by `addConnectivityStateListener` and * `removeConnectivityStateListener` */ - private stateListeners: ConnectivityStateListener[] = []; + private stateListeners: Set = new Set(); private backoffTimeout: BackoffTimeout; @@ -227,6 +227,8 @@ export class Subchannel { } const previousState = this.connectivityState; this.connectivityState = newState; + process.nextTick(() => { + }); switch (newState) { case ConnectivityState.READY: this.stopBackoff(); @@ -261,9 +263,7 @@ export class Subchannel { default: throw new Error(`Invalid state: unknown ConnectivityState ${newState}`); } - /* We use a shallow copy of the stateListeners array in case a listener - * is removed during this iteration */ - for (const listener of [...this.stateListeners]) { + for (const listener of this.stateListeners) { listener(this, previousState, newState, this.keepaliveTime); } return true; @@ -291,13 +291,15 @@ export class Subchannel { if (this.channelzEnabled) { this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); } - this.transitionToState( - [ConnectivityState.CONNECTING, ConnectivityState.READY], - ConnectivityState.IDLE - ); if (this.channelzEnabled) { unregisterChannelzRef(this.channelzRef); } + process.nextTick(() => { + this.transitionToState( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + ConnectivityState.IDLE + ); + }); } } @@ -339,20 +341,22 @@ export class Subchannel { * Otherwise, do nothing. */ startConnecting() { - /* First, try to transition from IDLE to connecting. If that doesn't happen - * because the state is not currently IDLE, check if it is - * TRANSIENT_FAILURE, and if so indicate that it should go back to - * connecting after the backoff timer ends. Otherwise do nothing */ - if ( - !this.transitionToState( - [ConnectivityState.IDLE], - ConnectivityState.CONNECTING - ) - ) { - if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) { - this.continueConnecting = true; + process.nextTick(() => { + /* First, try to transition from IDLE to connecting. If that doesn't happen + * because the state is not currently IDLE, check if it is + * TRANSIENT_FAILURE, and if so indicate that it should go back to + * connecting after the backoff timer ends. Otherwise do nothing */ + if ( + !this.transitionToState( + [ConnectivityState.IDLE], + ConnectivityState.CONNECTING + ) + ) { + if (this.connectivityState === ConnectivityState.TRANSIENT_FAILURE) { + this.continueConnecting = true; + } } - } + }); } /** @@ -368,7 +372,7 @@ export class Subchannel { * @param listener */ addConnectivityStateListener(listener: ConnectivityStateListener) { - this.stateListeners.push(listener); + this.stateListeners.add(listener); } /** @@ -377,21 +381,20 @@ export class Subchannel { * `addConnectivityStateListener` */ removeConnectivityStateListener(listener: ConnectivityStateListener) { - const listenerIndex = this.stateListeners.indexOf(listener); - if (listenerIndex > -1) { - this.stateListeners.splice(listenerIndex, 1); - } + this.stateListeners.delete(listener); } /** * Reset the backoff timeout, and immediately start connecting if in backoff. */ resetBackoff() { - this.backoffTimeout.reset(); - this.transitionToState( - [ConnectivityState.TRANSIENT_FAILURE], - ConnectivityState.CONNECTING - ); + process.nextTick(() => { + this.backoffTimeout.reset(); + this.transitionToState( + [ConnectivityState.TRANSIENT_FAILURE], + ConnectivityState.CONNECTING + ); + }); } getAddress(): string { diff --git a/packages/grpc-js/test/test-global-subchannel-pool.ts b/packages/grpc-js/test/test-global-subchannel-pool.ts new file mode 100644 index 000000000..c19125687 --- /dev/null +++ b/packages/grpc-js/test/test-global-subchannel-pool.ts @@ -0,0 +1,130 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; +import * as path from 'path'; + +import * as grpc from '../src'; +import {sendUnaryData, Server, ServerCredentials, ServerUnaryCall, ServiceClientConstructor, ServiceError} from '../src'; + +import {loadProtoFile} from './common'; + +const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); +const echoService = + loadProtoFile(protoFile).EchoService as ServiceClientConstructor; + +describe.only('Global subchannel pool', () => { + let server: Server; + let serverPort: number; + + let client1: InstanceType; + let client2: InstanceType; + + let promises: Promise[]; + + before(done => { + server = new Server(); + server.addService(echoService.service, { + echo(call: ServerUnaryCall, callback: sendUnaryData) { + callback(null, call.request); + }, + }); + + server.bindAsync( + 'localhost:0', ServerCredentials.createInsecure(), (err, port) => { + assert.ifError(err); + serverPort = port; + server.start(); + done(); + }); + }); + + beforeEach(() => { + promises = []; + }) + + after(done => { + server.tryShutdown(done); + }); + + function callService(client: InstanceType) { + return new Promise((resolve) => { + const request = {value: 'test value', value2: 3}; + + client.echo(request, (error: ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, request); + resolve(); + }); + }) + } + + function connect() { + const grpcOptions = { + 'grpc.use_local_subchannel_pool': 0, + } + + client1 = new echoService( + `127.0.0.1:${serverPort}`, grpc.credentials.createInsecure(), + grpcOptions); + + client2 = new echoService( + `127.0.0.1:${serverPort}`, grpc.credentials.createInsecure(), + grpcOptions); + } + + /* This is a regression test for a bug where client1.close in the + * waitForReady callback would cause the subchannel to transition to IDLE + * even though client2 is also using it. */ + it('Should handle client.close calls in waitForReady', + done => { + connect(); + + promises.push(new Promise((resolve) => { + client1.waitForReady(Date.now() + 50, (error) => { + assert.ifError(error); + client1.close(); + resolve(); + }); + })) + + promises.push(new Promise((resolve) => { + client2.waitForReady(Date.now() + 50, (error) => { + assert.ifError(error); + resolve(); + }); + })) + + Promise.all(promises).then(() => {done()}); + }) + + it('Call the service', done => { + promises.push(callService(client2)); + + Promise.all(promises).then(() => { + done(); + }); + }) + + it('Should complete the client lifecycle without error', done => { + setTimeout(() => { + client1.close(); + client2.close(); + done() + }, 500); + }); +});