Skip to content

fix(NODE-5064): consolidate connection cleanup logic and ensure socket is always closed #3572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 17, 2023
Merged
118 changes: 63 additions & 55 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
address: string;
socketTimeoutMS: number;
monitorCommands: boolean;
/** Indicates that the connection (including underlying TCP socket) has been closed. */
closed: boolean;
destroyed: boolean;
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
Expand Down Expand Up @@ -204,7 +204,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.monitorCommands = options.monitorCommands;
this.serverApi = options.serverApi;
this.closed = false;
this.destroyed = false;
this[kHello] = null;
this[kClusterTime] = null;

Expand Down Expand Up @@ -297,33 +296,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
if (this.closed) {
return;
}

this[kStream].destroy(error);

this.closed = true;

for (const op of this[kQueue].values()) {
op.cb(error);
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
this.cleanup(false, error);
}

onClose() {
if (this.closed) {
return;
}

this.closed = true;

const message = `connection ${this.id} to ${this.address} closed`;
for (const op of this[kQueue].values()) {
op.cb(new MongoNetworkError(message));
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
this.cleanup(false, new MongoNetworkError(message));
}

onTimeout() {
Expand All @@ -332,18 +310,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

this[kDelayedTimeoutId] = setTimeout(() => {
this[kStream].destroy();

this.closed = true;

const message = `connection ${this.id} to ${this.address} timed out`;
const beforeHandshake = this.hello == null;
for (const op of this[kQueue].values()) {
op.cb(new MongoNetworkTimeoutError(message, { beforeHandshake }));
}

this[kQueue].clear();
this.emit(Connection.CLOSE);
this.cleanup(false, new MongoNetworkTimeoutError(message, { beforeHandshake }));
}, 1).unref(); // No need for this timer to hold the event loop open
}

Expand All @@ -364,7 +333,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

// First check if the map is of invalid size
if (this[kQueue].size > 1) {
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
this.cleanup(true, new MongoRuntimeError(INVALID_QUEUE_SIZE));
} else {
// Get the first orphaned operation description.
const entry = this[kQueue].entries().next();
Expand Down Expand Up @@ -444,34 +413,73 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
}

destroy(options: DestroyOptions, callback?: Callback): void {
this.removeAllListeners(Connection.PINNED);
this.removeAllListeners(Connection.UNPINNED);

if (this[kStream] == null || this.destroyed) {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}
if (this.closed) {
process.nextTick(() => callback?.());
return;
}
if (typeof callback === 'function') {
this.once('close', () => process.nextTick(() => callback()));
}
const message = `connection ${this.id} to ${this.address} closed`;
this.cleanup(options.force, new MongoNetworkError(message));
}

/**
* A method that cleans up the connection. When `force` is true, this method
* forcibly destroys the socket.
*
* If an error is provided, any in-flight operations will be closed with the error.
*
* This method does nothing if the connection is already closed.
*/
private cleanup(force: boolean, error?: Error): void {
if (this.closed) {
return;
}

if (options.force) {
this[kStream].destroy();
this.destroyed = true;
if (typeof callback === 'function') {
callback();
this.closed = true;

const completeCleanup = () => {
for (const op of this[kQueue].values()) {
op.cb(error);
}

this[kQueue].clear();

this.emit(Connection.CLOSE);
};

for (const event of [
Connection.MESSAGE,
Connection.PINNED,
Connection.UNPINNED,
Connection.COMMAND_STARTED,
Connection.COMMAND_FAILED,
Connection.COMMAND_SUCCEEDED,
Connection.CLUSTER_TIME_RECEIVED
]) {
this.removeAllListeners(event);
}

this[kStream].removeAllListeners();
this[kMessageStream].removeAllListeners();

this[kMessageStream].destroy();

if (force) {
this[kStream].destroy();
completeCleanup();
return;
}

this[kStream].end(() => {
this.destroyed = true;
if (typeof callback === 'function') {
callback();
}
});
if (!this[kStream].writableEnded) {
this[kStream].end(() => {
this[kStream].destroy();
completeCleanup();
});
} else {
completeCleanup();
}
}

command(
Expand Down
Loading