Skip to content

Commit 848d7ae

Browse files
authored
fix: hanging grpc handles after shutdown (#683)
Signed-off-by: Todd Baert <[email protected]>
1 parent 42fe06a commit 848d7ae

13 files changed

+57
-17
lines changed

libs/providers/flagd/jest.config.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ export default {
1313
moduleFileExtensions: ['ts', 'js', 'html'],
1414
// ignore e2e path
1515
testPathIgnorePatterns: ["/e2e/"],
16-
coverageDirectory: '../../../coverage/libs/providers/flagd',
16+
coverageDirectory: '../../../coverage/libs/providers/flagd'
1717
};

libs/providers/flagd/project.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@
7878
"executor": "nx:run-commands",
7979
"options": {
8080
"commands": [
81-
"npx jest --setupFiles './setup-rpc-provider.ts'",
82-
"npx jest --setupFiles './setup-in-process-provider.ts'"
81+
"npx jest --setupFiles './setup-rpc-provider.ts' --runInBand --detectOpenHandles",
82+
"npx jest --setupFiles './setup-in-process-provider.ts' --runInBand --detectOpenHandles"
8383
],
8484
"cwd": "libs/providers/flagd/src/e2e",
8585
"parallel": false

libs/providers/flagd/src/e2e/jest.config.ts

-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,4 @@ export default {
77
moduleNameMapper: {
88
'@openfeature/flagd-core': ['<rootDir>/../../../../shared/flagd-core/src'],
99
},
10-
globalTeardown: './tear-down.ts',
1110
};

libs/providers/flagd/src/e2e/step-definitions/evaluation.spec.ts

+4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ defineFeature(feature, (test) => {
2929
});
3030
});
3131

32+
afterAll(async () => {
33+
await OpenFeature.close();
34+
});
35+
3236
test('Resolves boolean value', ({ given, when, then }) => {
3337
let value: boolean;
3438
let flagKey: string;

libs/providers/flagd/src/e2e/step-definitions/flagd-json-evaluator.spec.ts

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ defineFeature(feature, (test) => {
3939
});
4040
});
4141

42+
afterAll(async () => {
43+
await OpenFeature.close();
44+
});
45+
4246
test('Evaluator reuse', evaluateStringFlagWithContext);
4347

4448
test('Fractional operator', ({ given, when, and, then }) => {

libs/providers/flagd/src/e2e/step-definitions/flagd-reconnect.unstable.spec.ts

+4
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ defineFeature(feature, (test) => {
2020
});
2121
});
2222

23+
afterAll(async () => {
24+
await OpenFeature.close();
25+
});
26+
2327
test('Provider reconnection', ({ given, when, then, and }) => {
2428
given('a flagd provider is set', () => {
2529
// handled in beforeAll

libs/providers/flagd/src/e2e/step-definitions/flagd.spec.ts

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ defineFeature(feature, (test) => {
1818
});
1919
});
2020

21+
afterAll(async () => {
22+
await OpenFeature.close();
23+
});
24+
2125
test('Provider ready event', ({ given, when, then }) => {
2226
let ran = false;
2327

libs/providers/flagd/src/e2e/tear-down.ts

-8
This file was deleted.

libs/providers/flagd/src/lib/flagd-provider.spec.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ describe(FlagdProvider.name, () => {
8181
}
8282
}),
8383
cancel: jest.fn(),
84+
removeAllListeners: jest.fn(),
85+
destroy: jest.fn(),
8486
};
8587
}),
8688
close: jest.fn(),
@@ -260,6 +262,8 @@ describe(FlagdProvider.name, () => {
260262
}
261263
}),
262264
cancel: jest.fn(),
265+
removeAllListeners: jest.fn(),
266+
destroy: jest.fn(),
263267
};
264268

265269
const mockChannel = {
@@ -529,6 +533,8 @@ describe(FlagdProvider.name, () => {
529533
}
530534
}),
531535
cancel: jest.fn(),
536+
removeAllListeners: jest.fn(),
537+
destroy: jest.fn(),
532538
};
533539
}),
534540
close: jest.fn(),
@@ -635,6 +641,8 @@ describe(FlagdProvider.name, () => {
635641
});
636642

637643
describe('shutdown', () => {
644+
const removeAllListenersMock = jest.fn();
645+
const destroyMock = jest.fn();
638646
const cancelMock = jest.fn();
639647
const closeMock = jest.fn();
640648

@@ -648,6 +656,8 @@ describe(FlagdProvider.name, () => {
648656
}
649657
}),
650658
cancel: cancelMock,
659+
removeAllListeners: removeAllListenersMock,
660+
destroy: destroyMock,
651661
};
652662
}),
653663
close: closeMock,
@@ -668,7 +678,8 @@ describe(FlagdProvider.name, () => {
668678
it('should call service disconnect', async () => {
669679
await OpenFeature.close();
670680
expect(cancelMock).toHaveBeenCalled();
671-
expect(closeMock).toHaveBeenCalled();
681+
expect(removeAllListenersMock).toHaveBeenCalled();
682+
expect(destroyMock).toHaveBeenCalled();
672683
});
673684
});
674685
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { ClientReadableStream } from '@grpc/grpc-js';
2+
3+
export const closeStreamIfDefined = (stream: ClientReadableStream<unknown> | undefined) => {
4+
/**
5+
* cancel() is necessary to prevent calls from hanging the process, so we need to we need to remove all the
6+
* handlers, and add a no-op for 'error' so we can cancel without bubbling up an exception
7+
*/
8+
if (stream) {
9+
stream.removeAllListeners();
10+
stream.on('error', () => {
11+
// swallow errors after closed
12+
});
13+
stream.cancel();
14+
stream.destroy();
15+
}
16+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './grpc-util';

libs/providers/flagd/src/lib/service/grpc/grpc-service.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import { Config } from '../../configuration';
3232
import { DEFAULT_MAX_CACHE_SIZE, EVENT_CONFIGURATION_CHANGE, EVENT_PROVIDER_READY } from '../../constants';
3333
import { FlagdProvider } from '../../flagd-provider';
3434
import { Service } from '../service';
35+
import { closeStreamIfDefined } from '../common';
3536

3637
type AnyResponse =
3738
| ResolveBooleanResponse
@@ -104,8 +105,7 @@ export class GRPCService implements Service {
104105
}
105106

106107
async disconnect(): Promise<void> {
107-
// cancel the stream and close the connection
108-
this._eventStream?.cancel();
108+
closeStreamIfDefined(this._eventStream);
109109
this._client.close();
110110
}
111111

@@ -153,6 +153,10 @@ export class GRPCService implements Service {
153153
rejectConnect?: (reason: Error) => void,
154154
) {
155155
this.logger?.debug(`${FlagdProvider.name}: connecting stream...`);
156+
157+
// close the previous stream if we're reconnecting
158+
closeStreamIfDefined(this._eventStream);
159+
156160
const stream = this._client.eventStream({}, {});
157161
stream.on('error', (err: Error) => {
158162
rejectConnect?.(err);

libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { GeneralError } from '@openfeature/server-sdk';
44
import { FlagSyncServiceClient, SyncFlagsRequest, SyncFlagsResponse } from '../../../../proto/ts/sync/v1/sync_service';
55
import { Config } from '../../../configuration';
66
import { DataFetch } from '../data-fetch';
7+
import { closeStreamIfDefined } from '../../common';
78

89
/**
910
* Implements the gRPC sync contract to fetch flag data.
@@ -34,15 +35,14 @@ export class GrpcFetch implements DataFetch {
3435
changedCallback: (flagsChanged: string[]) => void,
3536
disconnectCallback: () => void,
3637
): Promise<void> {
37-
// note that we never reject the promise as sync is a long-running operation
3838
return new Promise((resolve, reject) =>
3939
this.listen(dataFillCallback, reconnectCallback, changedCallback, disconnectCallback, resolve, reject),
4040
);
4141
}
4242

4343
disconnect() {
4444
this._logger?.debug('Disconnecting gRPC sync connection');
45-
this._syncStream?.destroy();
45+
closeStreamIfDefined(this._syncStream);
4646
this._syncClient.close();
4747
}
4848

@@ -54,6 +54,7 @@ export class GrpcFetch implements DataFetch {
5454
resolveConnect?: () => void,
5555
rejectConnect?: (reason: Error) => void,
5656
) {
57+
closeStreamIfDefined(this._syncStream);
5758
this._syncStream = this._syncClient.syncFlags(this._request);
5859

5960
this._syncStream.on('data', (data: SyncFlagsResponse) => {

0 commit comments

Comments
 (0)